1use crate::metrics::{Metrics, WindowGaugeStats, WindowMetrics};
4use crate::{Error, Result};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct PrometheusEncoder {
13 metric_prefix: String,
14 labels: Vec<(String, String)>,
15}
16
17impl PrometheusEncoder {
18 pub const DEFAULT_PREFIX: &'static str = "iperf3";
21
22 pub fn new(metric_prefix: impl Into<String>) -> Result<Self> {
24 let metric_prefix = metric_prefix.into();
25 validate_metric_prefix(&metric_prefix)?;
26 Ok(Self {
27 metric_prefix,
28 labels: Vec::new(),
29 })
30 }
31
32 pub fn with_labels<I, K, V>(metric_prefix: impl Into<String>, labels: I) -> Result<Self>
34 where
35 I: IntoIterator<Item = (K, V)>,
36 K: Into<String>,
37 V: Into<String>,
38 {
39 let metric_prefix = metric_prefix.into();
40 let labels = labels
41 .into_iter()
42 .map(|(name, value)| (name.into(), value.into()))
43 .collect::<Vec<_>>();
44 validate_metric_prefix(&metric_prefix)?;
45 validate_labels(&labels)?;
46 Ok(Self {
47 metric_prefix,
48 labels,
49 })
50 }
51
52 pub fn metric_prefix(&self) -> &str {
54 &self.metric_prefix
55 }
56
57 pub fn labels(&self) -> &[(String, String)] {
59 &self.labels
60 }
61
62 pub fn encode_interval(&self, metrics: &Metrics) -> String {
64 render_interval_prometheus_with_labels(metrics, &self.metric_prefix, &self.labels)
65 }
66
67 pub fn encode_window(&self, metrics: &WindowMetrics) -> String {
69 render_window_prometheus_with_labels(metrics, &self.metric_prefix, &self.labels)
70 }
71}
72
73impl Default for PrometheusEncoder {
74 fn default() -> Self {
75 Self {
76 metric_prefix: Self::DEFAULT_PREFIX.to_owned(),
77 labels: Vec::new(),
78 }
79 }
80}
81
82pub(crate) fn validate_metric_prefix(prefix: &str) -> Result<()> {
83 if !is_valid_metric_prefix(prefix) {
84 return Err(Error::invalid_argument(format!(
85 "invalid Prometheus metric prefix '{prefix}'"
86 )));
87 }
88 Ok(())
89}
90
91fn is_valid_metric_prefix(prefix: &str) -> bool {
92 is_valid_metric_prefix_bytes(prefix.as_bytes())
93}
94
95fn is_valid_metric_prefix_bytes(prefix: &[u8]) -> bool {
96 let Some((&first, rest)) = prefix.split_first() else {
97 return false;
98 };
99 if !(first.is_ascii_alphabetic() || first == b'_') {
100 return false;
101 }
102 for &byte in rest {
103 if !(byte.is_ascii_alphanumeric() || byte == b'_') {
104 return false;
105 }
106 }
107 true
108}
109
110fn validate_labels(labels: &[(String, String)]) -> Result<()> {
111 for (name, value) in labels {
112 if !is_valid_label_name(name) {
113 return Err(Error::invalid_argument(format!(
114 "invalid Prometheus label name '{name}'"
115 )));
116 }
117 if value.is_empty() {
118 return Err(Error::invalid_argument(format!(
119 "Prometheus label value for '{name}' must not be empty"
120 )));
121 }
122 }
123 for (index, (name, _)) in labels.iter().enumerate() {
124 if labels[..index]
125 .iter()
126 .any(|(previous_name, _)| previous_name == name)
127 {
128 return Err(Error::invalid_argument(format!(
129 "duplicate Prometheus label name '{name}'"
130 )));
131 }
132 }
133 Ok(())
134}
135
136fn is_valid_label_name(name: &str) -> bool {
137 is_valid_label_name_bytes(name.as_bytes())
138}
139
140fn is_valid_label_name_bytes(name: &[u8]) -> bool {
141 let Some((&first, rest)) = name.split_first() else {
142 return false;
143 };
144 if !(first.is_ascii_alphabetic() || first == b'_') {
145 return false;
146 }
147 for &byte in rest {
148 if !(byte.is_ascii_alphanumeric() || byte == b'_') {
149 return false;
150 }
151 }
152 true
153}
154
155#[cfg(feature = "pushgateway")]
156pub(crate) fn render_interval_prometheus(metrics: &Metrics, prefix: &str) -> String {
157 render_interval_prometheus_with_labels(metrics, prefix, &[])
158}
159
160fn render_interval_prometheus_with_labels(
161 metrics: &Metrics,
162 prefix: &str,
163 labels: &[(String, String)],
164) -> String {
165 let mut out = String::new();
166 let label_set = label_set(labels);
167 gauge(
168 &mut out,
169 &metric_name(prefix, "transferred_bytes"),
170 metrics.transferred_bytes,
171 &label_set,
172 );
173 gauge(
174 &mut out,
175 &metric_name(prefix, "bandwidth_bits_per_second"),
176 metrics.bandwidth_bits_per_second,
177 &label_set,
178 );
179 gauge(
180 &mut out,
181 &metric_name(prefix, "stream_count"),
182 metrics.stream_count as f64,
183 &label_set,
184 );
185 gauge_option(
186 &mut out,
187 &metric_name(prefix, "tcp_retransmits"),
188 metrics.tcp_retransmits,
189 &label_set,
190 );
191 gauge_option(
192 &mut out,
193 &metric_name(prefix, "tcp_rtt_seconds"),
194 metrics.tcp_rtt_seconds,
195 &label_set,
196 );
197 gauge_option(
198 &mut out,
199 &metric_name(prefix, "tcp_rttvar_seconds"),
200 metrics.tcp_rttvar_seconds,
201 &label_set,
202 );
203 gauge_option(
204 &mut out,
205 &metric_name(prefix, "tcp_snd_cwnd_bytes"),
206 metrics.tcp_snd_cwnd_bytes,
207 &label_set,
208 );
209 gauge_option(
210 &mut out,
211 &metric_name(prefix, "tcp_snd_wnd_bytes"),
212 metrics.tcp_snd_wnd_bytes,
213 &label_set,
214 );
215 gauge_option(
216 &mut out,
217 &metric_name(prefix, "tcp_pmtu_bytes"),
218 metrics.tcp_pmtu_bytes,
219 &label_set,
220 );
221 gauge_option(
222 &mut out,
223 &metric_name(prefix, "tcp_reorder_events"),
224 metrics.tcp_reorder_events,
225 &label_set,
226 );
227 gauge_option(
228 &mut out,
229 &metric_name(prefix, "udp_packets"),
230 metrics.udp_packets,
231 &label_set,
232 );
233 gauge_option(
234 &mut out,
235 &metric_name(prefix, "udp_lost_packets"),
236 metrics.udp_lost_packets,
237 &label_set,
238 );
239 gauge_option(
240 &mut out,
241 &metric_name(prefix, "udp_jitter_seconds"),
242 metrics.udp_jitter_seconds,
243 &label_set,
244 );
245 gauge_option(
246 &mut out,
247 &metric_name(prefix, "udp_out_of_order_packets"),
248 metrics.udp_out_of_order_packets,
249 &label_set,
250 );
251 gauge(
252 &mut out,
253 &metric_name(prefix, "omitted_intervals"),
254 if metrics.omitted { 1.0 } else { 0.0 },
255 &label_set,
256 );
257 out
258}
259
260#[cfg(feature = "pushgateway")]
261pub(crate) fn render_window_prometheus(metrics: &WindowMetrics, prefix: &str) -> String {
262 render_window_prometheus_with_labels(metrics, prefix, &[])
263}
264
265fn render_window_prometheus_with_labels(
266 metrics: &WindowMetrics,
267 prefix: &str,
268 labels: &[(String, String)],
269) -> String {
270 let mut out = String::new();
271 let label_set = label_set(labels);
272 gauge(
273 &mut out,
274 &metric_name(prefix, "window_duration_seconds"),
275 metrics.duration_seconds,
276 &label_set,
277 );
278 gauge(
279 &mut out,
280 &metric_name(prefix, "window_transferred_bytes"),
281 metrics.transferred_bytes,
282 &label_set,
283 );
284 gauge(
285 &mut out,
286 &metric_name(prefix, "window_stream_count"),
287 metrics.stream_count as f64,
288 &label_set,
289 );
290 gauge_stats(
291 &mut out,
292 prefix,
293 "window_bandwidth",
294 "bits_per_second",
295 metrics.bandwidth_bits_per_second,
296 &label_set,
297 );
298 gauge_stats(
299 &mut out,
300 prefix,
301 "window_tcp_rtt",
302 "seconds",
303 metrics.tcp_rtt_seconds,
304 &label_set,
305 );
306 gauge_stats(
307 &mut out,
308 prefix,
309 "window_tcp_rttvar",
310 "seconds",
311 metrics.tcp_rttvar_seconds,
312 &label_set,
313 );
314 gauge_stats(
315 &mut out,
316 prefix,
317 "window_tcp_snd_cwnd",
318 "bytes",
319 metrics.tcp_snd_cwnd_bytes,
320 &label_set,
321 );
322 gauge_stats(
323 &mut out,
324 prefix,
325 "window_tcp_snd_wnd",
326 "bytes",
327 metrics.tcp_snd_wnd_bytes,
328 &label_set,
329 );
330 gauge_stats(
331 &mut out,
332 prefix,
333 "window_tcp_pmtu",
334 "bytes",
335 metrics.tcp_pmtu_bytes,
336 &label_set,
337 );
338 gauge_stats(
339 &mut out,
340 prefix,
341 "window_udp_jitter",
342 "seconds",
343 metrics.udp_jitter_seconds,
344 &label_set,
345 );
346 gauge_option(
347 &mut out,
348 &metric_name(prefix, "window_tcp_retransmits"),
349 metrics.tcp_retransmits,
350 &label_set,
351 );
352 gauge_option(
353 &mut out,
354 &metric_name(prefix, "window_tcp_reorder_events"),
355 metrics.tcp_reorder_events,
356 &label_set,
357 );
358 gauge_option(
359 &mut out,
360 &metric_name(prefix, "window_udp_packets"),
361 metrics.udp_packets,
362 &label_set,
363 );
364 gauge_option(
365 &mut out,
366 &metric_name(prefix, "window_udp_lost_packets"),
367 metrics.udp_lost_packets,
368 &label_set,
369 );
370 gauge_option(
371 &mut out,
372 &metric_name(prefix, "window_udp_out_of_order_packets"),
373 metrics.udp_out_of_order_packets,
374 &label_set,
375 );
376 gauge(
377 &mut out,
378 &metric_name(prefix, "window_omitted_intervals"),
379 metrics.omitted_intervals,
380 &label_set,
381 );
382 out
383}
384
385fn metric_name(prefix: &str, suffix: &str) -> String {
386 format!("{prefix}_{suffix}")
387}
388
389fn label_set(labels: &[(String, String)]) -> String {
390 if labels.is_empty() {
391 return String::new();
392 }
393
394 let mut out = String::from("{");
395 for (index, (name, value)) in labels.iter().enumerate() {
396 if index > 0 {
397 out.push(',');
398 }
399 out.push_str(name);
400 out.push_str("=\"");
401 push_escaped_label_value(&mut out, value);
402 out.push('"');
403 }
404 out.push('}');
405 out
406}
407
408fn push_escaped_label_value(out: &mut String, value: &str) {
409 for ch in value.chars() {
410 match ch {
411 '\\' => out.push_str(r"\\"),
412 '"' => out.push_str(r#"\""#),
413 '\n' => out.push_str(r"\n"),
414 _ => out.push(ch),
415 }
416 }
417}
418
419fn gauge_stats(
420 out: &mut String,
421 prefix: &str,
422 stem: &str,
423 unit: &str,
424 stats: WindowGaugeStats,
425 label_set: &str,
426) {
427 if stats.samples == 0 {
428 return;
429 }
430 gauge(
431 out,
432 &metric_name(prefix, &format!("{stem}_mean_{unit}")),
433 stats.mean,
434 label_set,
435 );
436 gauge(
437 out,
438 &metric_name(prefix, &format!("{stem}_min_{unit}")),
439 stats.min,
440 label_set,
441 );
442 gauge(
443 out,
444 &metric_name(prefix, &format!("{stem}_max_{unit}")),
445 stats.max,
446 label_set,
447 );
448}
449
450fn gauge(out: &mut String, name: &str, value: f64, label_set: &str) {
451 out.push_str("# TYPE ");
452 out.push_str(name);
453 out.push_str(" gauge\n");
454 out.push_str(name);
455 out.push_str(label_set);
456 out.push(' ');
457 out.push_str(&value.to_string());
458 out.push('\n');
459}
460
461fn gauge_option(out: &mut String, name: &str, value: Option<f64>, label_set: &str) {
462 if let Some(value) = value {
463 gauge(out, name, value, label_set);
464 }
465}
466
467#[cfg(kani)]
468mod verification {
469 use super::*;
470
471 #[kani::proof]
472 #[kani::unwind(6)]
473 fn metric_prefix_matches_documented_shape_for_bounded_ascii() {
474 let len: usize = kani::any();
475 kani::assume(len <= 5);
476 let bytes: [u8; 5] = kani::any();
477 let raw = &bytes[..len];
478
479 let expected = if let Some((&first, rest)) = raw.split_first() {
480 let mut ok = first.is_ascii_alphabetic() || first == b'_';
481 for &byte in rest {
482 ok &= byte.is_ascii_alphanumeric() || byte == b'_';
483 }
484 ok
485 } else {
486 false
487 };
488
489 assert_eq!(is_valid_metric_prefix_bytes(raw), expected);
490 }
491
492 #[kani::proof]
493 #[kani::unwind(6)]
494 fn label_name_matches_documented_shape_for_bounded_ascii() {
495 let len: usize = kani::any();
496 kani::assume(len <= 5);
497 let bytes: [u8; 5] = kani::any();
498 let raw = &bytes[..len];
499
500 let expected = if let Some((&first, rest)) = raw.split_first() {
501 let mut ok = first.is_ascii_alphabetic() || first == b'_';
502 for &byte in rest {
503 ok &= byte.is_ascii_alphanumeric() || byte == b'_';
504 }
505 ok
506 } else {
507 false
508 };
509
510 assert_eq!(is_valid_label_name_bytes(raw), expected);
511 }
512}
513
514#[cfg(test)]
515mod tests {
516 use super::*;
517
518 #[test]
519 fn default_encoder_renders_prometheus_gauges() {
520 let rendered = PrometheusEncoder::default().encode_interval(&Metrics {
521 transferred_bytes: 1.0,
522 bandwidth_bits_per_second: 8.0,
523 tcp_retransmits: Some(5.0),
524 tcp_rtt_seconds: Some(0.006),
525 tcp_rttvar_seconds: Some(0.007),
526 tcp_snd_cwnd_bytes: Some(8.0),
527 tcp_snd_wnd_bytes: Some(9.0),
528 tcp_pmtu_bytes: Some(10.0),
529 tcp_reorder_events: Some(11.0),
530 udp_packets: Some(2.0),
531 udp_lost_packets: Some(3.0),
532 udp_jitter_seconds: Some(0.004),
533 udp_out_of_order_packets: Some(12.0),
534 interval_duration_seconds: 1.0,
535 omitted: true,
536 ..Metrics::default()
537 });
538
539 assert!(rendered.contains("iperf3_transferred_bytes 1\n"));
540 assert!(rendered.contains("iperf3_stream_count 0\n"));
541 assert!(rendered.contains("iperf3_tcp_rtt_seconds 0.006\n"));
542 assert!(rendered.contains("iperf3_udp_packets 2\n"));
543 assert!(rendered.contains("iperf3_udp_lost_packets 3\n"));
544 assert!(rendered.contains("iperf3_udp_jitter_seconds 0.004\n"));
545 assert!(rendered.contains("iperf3_udp_out_of_order_packets 12\n"));
546 assert!(rendered.contains("iperf3_omitted_intervals 1\n"));
547 }
548
549 #[test]
550 fn custom_prefix_changes_metric_names() {
551 let rendered = PrometheusEncoder::new("nettest")
552 .unwrap()
553 .encode_interval(&Metrics::default());
554
555 assert!(rendered.contains("# TYPE nettest_transferred_bytes gauge\n"));
556 assert!(rendered.contains("nettest_bandwidth_bits_per_second 0\n"));
557 assert!(!rendered.contains("iperf3_transferred_bytes"));
558 }
559
560 #[test]
561 fn fixed_labels_are_rendered_on_samples() {
562 let rendered = PrometheusEncoder::with_labels(
563 "nettest",
564 [("site", "ci"), ("case", "quote\"slash\\line\n")],
565 )
566 .unwrap()
567 .encode_interval(&Metrics::default());
568
569 assert!(rendered.contains("# TYPE nettest_transferred_bytes gauge\n"));
570 assert!(rendered.contains(
571 "nettest_transferred_bytes{site=\"ci\",case=\"quote\\\"slash\\\\line\\n\"} 0\n"
572 ));
573 }
574
575 #[test]
576 fn invalid_prefix_is_rejected() {
577 let err = PrometheusEncoder::new("bad-prefix").unwrap_err();
578
579 assert!(err.to_string().contains("metric prefix"));
580 }
581
582 #[test]
583 fn invalid_labels_are_rejected() {
584 for labels in [
585 vec![("9bad", "value")],
586 vec![("ok", "")],
587 vec![("dup", "one"), ("dup", "two")],
588 ] {
589 let err = PrometheusEncoder::with_labels("iperf3", labels).unwrap_err();
590 assert!(err.to_string().contains("label"));
591 }
592 }
593
594 #[test]
595 fn renders_all_expected_metric_names() {
596 let rendered = PrometheusEncoder::default().encode_interval(&Metrics::default());
597
598 for name in [
599 "iperf3_transferred_bytes",
600 "iperf3_bandwidth_bits_per_second",
601 "iperf3_stream_count",
602 "iperf3_omitted_intervals",
603 ] {
604 assert!(rendered.contains(&format!("# TYPE {name} gauge\n")));
605 assert!(rendered.contains(&format!("{name} 0\n")));
606 }
607
608 for name in [
609 "iperf3_tcp_retransmits",
610 "iperf3_tcp_rtt_seconds",
611 "iperf3_tcp_rttvar_seconds",
612 "iperf3_tcp_snd_cwnd_bytes",
613 "iperf3_tcp_snd_wnd_bytes",
614 "iperf3_tcp_pmtu_bytes",
615 "iperf3_tcp_reorder_events",
616 "iperf3_udp_packets",
617 "iperf3_udp_lost_packets",
618 "iperf3_udp_jitter_seconds",
619 "iperf3_udp_out_of_order_packets",
620 ] {
621 assert!(!rendered.contains(&format!("# TYPE {name} gauge\n")));
622 }
623 }
624
625 #[test]
626 fn renders_window_prometheus_gauges() {
627 let rendered = PrometheusEncoder::default().encode_window(&WindowMetrics {
628 duration_seconds: 10.0,
629 transferred_bytes: 1000.0,
630 bandwidth_bits_per_second: WindowGaugeStats {
631 samples: 2,
632 mean: 100.0,
633 min: 90.0,
634 max: 110.0,
635 },
636 tcp_rtt_seconds: WindowGaugeStats {
637 samples: 2,
638 mean: 0.010,
639 min: 0.005,
640 max: 0.020,
641 },
642 tcp_retransmits: Some(3.0),
643 udp_packets: Some(4.0),
644 udp_lost_packets: Some(1.0),
645 omitted_intervals: 2.0,
646 ..WindowMetrics::default()
647 });
648
649 assert!(rendered.contains("iperf3_window_duration_seconds 10\n"));
650 assert!(rendered.contains("iperf3_window_transferred_bytes 1000\n"));
651 assert!(rendered.contains("iperf3_window_stream_count 0\n"));
652 assert!(rendered.contains("iperf3_window_bandwidth_mean_bits_per_second 100\n"));
653 assert!(rendered.contains("iperf3_window_bandwidth_min_bits_per_second 90\n"));
654 assert!(rendered.contains("iperf3_window_bandwidth_max_bits_per_second 110\n"));
655 assert!(rendered.contains("iperf3_window_tcp_rtt_mean_seconds 0.01\n"));
656 assert!(rendered.contains("iperf3_window_tcp_rtt_min_seconds 0.005\n"));
657 assert!(rendered.contains("iperf3_window_tcp_rtt_max_seconds 0.02\n"));
658 assert!(rendered.contains("iperf3_window_tcp_retransmits 3\n"));
659 assert!(rendered.contains("iperf3_window_udp_packets 4\n"));
660 assert!(rendered.contains("iperf3_window_udp_lost_packets 1\n"));
661 assert!(rendered.contains("iperf3_window_omitted_intervals 2\n"));
662 }
663
664 #[test]
665 fn renders_all_expected_window_metric_names() {
666 let rendered = PrometheusEncoder::default().encode_window(&WindowMetrics::default());
667
668 for name in [
669 "iperf3_window_duration_seconds",
670 "iperf3_window_transferred_bytes",
671 "iperf3_window_stream_count",
672 "iperf3_window_omitted_intervals",
673 ] {
674 assert!(rendered.contains(&format!("# TYPE {name} gauge\n")));
675 assert!(rendered.contains(&format!("{name} 0\n")));
676 }
677
678 for name in [
679 "iperf3_window_bandwidth_mean_bits_per_second",
680 "iperf3_window_bandwidth_min_bits_per_second",
681 "iperf3_window_bandwidth_max_bits_per_second",
682 "iperf3_window_tcp_rtt_mean_seconds",
683 "iperf3_window_tcp_rtt_min_seconds",
684 "iperf3_window_tcp_rtt_max_seconds",
685 "iperf3_window_tcp_rttvar_mean_seconds",
686 "iperf3_window_tcp_rttvar_min_seconds",
687 "iperf3_window_tcp_rttvar_max_seconds",
688 "iperf3_window_tcp_snd_cwnd_mean_bytes",
689 "iperf3_window_tcp_snd_cwnd_min_bytes",
690 "iperf3_window_tcp_snd_cwnd_max_bytes",
691 "iperf3_window_tcp_snd_wnd_mean_bytes",
692 "iperf3_window_tcp_snd_wnd_min_bytes",
693 "iperf3_window_tcp_snd_wnd_max_bytes",
694 "iperf3_window_tcp_pmtu_mean_bytes",
695 "iperf3_window_tcp_pmtu_min_bytes",
696 "iperf3_window_tcp_pmtu_max_bytes",
697 "iperf3_window_udp_jitter_mean_seconds",
698 "iperf3_window_udp_jitter_min_seconds",
699 "iperf3_window_udp_jitter_max_seconds",
700 "iperf3_window_tcp_retransmits",
701 "iperf3_window_tcp_reorder_events",
702 "iperf3_window_udp_packets",
703 "iperf3_window_udp_lost_packets",
704 "iperf3_window_udp_out_of_order_packets",
705 ] {
706 assert!(!rendered.contains(&format!("# TYPE {name} gauge\n")));
707 }
708 }
709}