1use std::time::Duration;
8
9use reqwest::StatusCode;
10use reqwest::blocking::Client;
11use url::Url;
12
13use crate::metrics::{Metrics, WindowMetrics};
14use crate::prometheus::{
15 PrometheusEncoder, render_interval_prometheus as render_prometheus, render_window_prometheus,
16 validate_metric_prefix,
17};
18use crate::{Error, ErrorKind, Result};
19
/// Backoff used before the first retry; `retry_delay` doubles it for every further attempt.
const PUSH_RETRY_BASE_DELAY: Duration = Duration::from_millis(100);
/// Ceiling that `retry_delay` clamps the exponential backoff to.
const PUSH_RETRY_MAX_DELAY: Duration = Duration::from_secs(1);
22
/// Settings used to build a [`PushGateway`] client.
///
/// Create one with [`PushGatewayConfig::new`] and adjust it through the
/// builder-style methods. [`PushGateway::new`] calls
/// [`PushGatewayConfig::validate`] before using the values.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PushGatewayConfig {
    /// Base URL of the Pushgateway. The `/metrics/job/...` grouping path is
    /// appended to this URL's path (trailing slashes are trimmed first).
    pub endpoint: Url,
    /// Job name; becomes the first grouping segment after `/metrics/job/`.
    pub job: String,
    /// Extra grouping labels, appended to the URL path as `/name/value` pairs
    /// in the order given.
    pub labels: Vec<(String, String)>,
    /// Timeout handed to the HTTP client builder; must be non-zero.
    pub timeout: Duration,
    /// Number of additional attempts after a retryable failure (transport
    /// error, HTTP 429 or 5xx). At most [`PushGatewayConfig::MAX_RETRIES`].
    pub retries: u32,
    /// Value handed to the HTTP client builder as the `User-Agent`.
    pub user_agent: String,
    /// Prefix passed to the Prometheus renderers for every metric name.
    pub metric_prefix: String,
    /// Flag exposed to the rest of the crate via `PushGateway::delete_on_finish`.
    /// NOTE(review): presumably the caller deletes the pushed group when the
    /// run finishes — confirm against the call site.
    pub delete_on_finish: bool,
}
47
48impl PushGatewayConfig {
49 pub const DEFAULT_JOB: &'static str = "iperf3";
51 pub const DEFAULT_METRIC_PREFIX: &'static str = PrometheusEncoder::DEFAULT_PREFIX;
53 pub const DEFAULT_RETRIES: u32 = 0;
55 pub const MAX_RETRIES: u32 = 10;
57
58 pub fn new(endpoint: Url) -> Self {
61 Self {
62 endpoint,
63 job: Self::DEFAULT_JOB.to_owned(),
64 labels: Vec::new(),
65 timeout: Self::default_timeout(),
66 retries: Self::DEFAULT_RETRIES,
67 user_agent: Self::default_user_agent(),
68 metric_prefix: Self::DEFAULT_METRIC_PREFIX.to_owned(),
69 delete_on_finish: false,
70 }
71 }
72
73 pub fn parse_endpoint(raw: &str) -> Result<Url> {
75 let raw = raw.trim();
76 if raw.is_empty() {
77 return Err(Error::invalid_argument(
78 "Pushgateway endpoint must not be empty",
79 ));
80 }
81
82 let with_scheme = if raw.starts_with("http://") || raw.starts_with("https://") {
85 raw.to_owned()
86 } else {
87 format!("http://{raw}")
88 };
89 Url::parse(&with_scheme).map_err(|err| {
90 Error::with_source(
91 ErrorKind::InvalidArgument,
92 "invalid Pushgateway endpoint URL",
93 err,
94 )
95 })
96 }
97
98 pub const fn default_timeout() -> Duration {
100 Duration::from_secs(5)
101 }
102
103 pub fn default_user_agent() -> String {
105 format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
106 }
107
108 pub fn job(mut self, job: impl Into<String>) -> Self {
110 self.job = job.into();
111 self
112 }
113
114 pub fn label(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
116 self.labels.push((name.into(), value.into()));
117 self
118 }
119
120 pub fn labels<I, K, V>(mut self, labels: I) -> Self
122 where
123 I: IntoIterator<Item = (K, V)>,
124 K: Into<String>,
125 V: Into<String>,
126 {
127 self.labels = labels
128 .into_iter()
129 .map(|(name, value)| (name.into(), value.into()))
130 .collect();
131 self
132 }
133
134 pub fn timeout(mut self, timeout: Duration) -> Self {
136 self.timeout = timeout;
137 self
138 }
139
140 pub fn retries(mut self, retries: u32) -> Self {
142 self.retries = retries;
143 self
144 }
145
146 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
148 self.user_agent = user_agent.into();
149 self
150 }
151
152 pub fn metric_prefix(mut self, metric_prefix: impl Into<String>) -> Self {
154 self.metric_prefix = metric_prefix.into();
155 self
156 }
157
158 pub fn delete_on_finish(mut self, delete: bool) -> Self {
160 self.delete_on_finish = delete;
161 self
162 }
163
164 pub fn validate(&self) -> Result<()> {
166 validate_endpoint(&self.endpoint)?;
167 validate_job(&self.job)?;
168 validate_labels(&self.labels)?;
169 validate_timeout(self.timeout)?;
170 validate_retries(self.retries)?;
171 validate_user_agent(&self.user_agent)?;
172 validate_metric_prefix(&self.metric_prefix)?;
173 Ok(())
174 }
175}
176
/// Blocking Pushgateway client bound to one grouping URL.
///
/// Built from a validated [`PushGatewayConfig`] by [`PushGateway::new`].
pub struct PushGateway {
    /// HTTP client configured with the timeout and `User-Agent` from the config.
    client: Client,
    /// Endpoint URL with the `/metrics/job/...` grouping path already appended.
    url: Url,
    /// Number of additional attempts after a retryable failure.
    retries: u32,
    /// Prefix handed to the Prometheus renderers.
    metric_prefix: String,
    /// Copied from the config and exposed through `delete_on_finish()`.
    delete_on_finish: bool,
}
185
186impl PushGateway {
187 pub fn new(config: PushGatewayConfig) -> Result<Self> {
189 config.validate()?;
190
191 let PushGatewayConfig {
192 endpoint,
193 job,
194 labels,
195 timeout,
196 retries,
197 user_agent,
198 metric_prefix,
199 delete_on_finish,
200 } = config;
201 let url = grouping_url(endpoint, &job, &labels);
202 let client = build_http_client(timeout, user_agent)?;
203
204 Ok(Self {
205 client,
206 url,
207 retries,
208 metric_prefix,
209 delete_on_finish,
210 })
211 }
212
213 pub fn push(&self, metrics: &Metrics) -> Result<()> {
215 let body = render_prometheus(metrics, &self.metric_prefix);
216 self.push_body(&body)
217 }
218
219 pub fn push_window(&self, metrics: &WindowMetrics) -> Result<()> {
221 let body = render_window_prometheus(metrics, &self.metric_prefix);
222 self.push_body(&body)
223 }
224
225 pub fn delete(&self) -> Result<()> {
227 for attempt in 0..=self.retries {
228 match self.delete_once() {
229 Ok(()) => return Ok(()),
230 Err(err) if err.retryable && attempt < self.retries => {
231 std::thread::sleep(retry_delay(attempt));
232 }
233 Err(err) => return Err(err.error),
234 }
235 }
236
237 unreachable!("delete retry loop always returns")
238 }
239
240 pub(crate) fn delete_on_finish(&self) -> bool {
241 self.delete_on_finish
242 }
243
244 #[cfg(test)]
245 fn url(&self) -> &Url {
246 &self.url
247 }
248
249 fn push_body(&self, body: &str) -> Result<()> {
250 for attempt in 0..=self.retries {
251 match self.push_once(body) {
252 Ok(()) => return Ok(()),
253 Err(err) if err.retryable && attempt < self.retries => {
254 std::thread::sleep(retry_delay(attempt));
255 }
256 Err(err) => return Err(err.error),
257 }
258 }
259
260 unreachable!("push retry loop always returns")
261 }
262
263 fn push_once(&self, body: &str) -> std::result::Result<(), PushAttemptError> {
264 let response = self
265 .client
266 .put(self.url.clone())
267 .header("content-type", "text/plain; version=0.0.4; charset=utf-8")
268 .body(body.to_owned())
269 .send()
270 .map_err(|err| PushAttemptError {
271 error: Error::with_source(
272 ErrorKind::PushGateway,
273 "failed to send Pushgateway request",
274 err,
275 ),
276 retryable: true,
277 })?;
278
279 if !response.status().is_success() {
280 let status = response.status();
281 return Err(PushAttemptError {
282 error: Error::pushgateway(format!("Pushgateway returned {status}")),
283 retryable: is_retryable_status(status),
284 });
285 }
286
287 Ok(())
288 }
289
290 fn delete_once(&self) -> std::result::Result<(), PushAttemptError> {
291 let response =
292 self.client
293 .delete(self.url.clone())
294 .send()
295 .map_err(|err| PushAttemptError {
296 error: Error::with_source(
297 ErrorKind::PushGateway,
298 "failed to send Pushgateway delete request",
299 err,
300 ),
301 retryable: true,
302 })?;
303
304 if !response.status().is_success() {
305 let status = response.status();
306 return Err(PushAttemptError {
307 error: Error::pushgateway(format!("Pushgateway delete returned {status}")),
308 retryable: is_retryable_status(status),
309 });
310 }
311
312 Ok(())
313 }
314}
315
316fn grouping_url(mut endpoint: Url, job: &str, labels: &[(String, String)]) -> Url {
317 let path = grouping_path(endpoint.path(), job, labels);
318 endpoint.set_path(&path);
319 endpoint
320}
321
322fn grouping_path(base_path: &str, job: &str, labels: &[(String, String)]) -> String {
323 let mut path = base_path.trim_end_matches('/').to_owned();
324 path.push_str("/metrics/job/");
327 path.push_str(&encode_path_segment(job));
328 for (name, value) in labels {
329 path.push('/');
330 path.push_str(&encode_path_segment(name));
331 path.push('/');
332 path.push_str(&encode_path_segment(value));
333 }
334 path
335}
336
337fn build_http_client(timeout: Duration, user_agent: String) -> Result<Client> {
338 Client::builder()
339 .timeout(timeout)
342 .user_agent(user_agent)
343 .build()
344 .map_err(|err| {
345 Error::with_source(ErrorKind::PushGateway, "failed to build HTTP client", err)
346 })
347}
348
349fn validate_endpoint(endpoint: &Url) -> Result<()> {
350 match endpoint.scheme() {
351 "http" | "https" => {}
352 scheme => {
353 return Err(Error::invalid_argument(format!(
354 "Pushgateway endpoint scheme must be http or https, got '{scheme}'"
355 )));
356 }
357 }
358 if endpoint.host_str().is_none() {
359 return Err(Error::invalid_argument(
360 "Pushgateway endpoint must include a host",
361 ));
362 }
363 Ok(())
364}
365
366fn validate_job(job: &str) -> Result<()> {
367 if job.is_empty() {
368 return Err(Error::invalid_argument(
369 "Pushgateway job name must not be empty",
370 ));
371 }
372 Ok(())
373}
374
375fn validate_labels(labels: &[(String, String)]) -> Result<()> {
376 for (name, value) in labels {
377 validate_label(name, value)?;
378 }
379 reject_duplicate_labels(labels)?;
380 Ok(())
381}
382
383pub(crate) fn validate_label(name: &str, value: &str) -> Result<()> {
384 if !is_valid_label_name(name) {
385 return Err(Error::invalid_argument(format!(
386 "invalid Pushgateway label name '{name}'"
387 )));
388 }
389 if is_reserved_label_name(name) {
390 return Err(Error::invalid_argument(format!(
391 "Pushgateway label name '{name}' is reserved"
392 )));
393 }
394 if value.is_empty() {
395 return Err(Error::invalid_argument(format!(
396 "Pushgateway label value for '{name}' must not be empty"
397 )));
398 }
399 Ok(())
400}
401
402fn reject_duplicate_labels(labels: &[(String, String)]) -> Result<()> {
403 for (index, (name, _)) in labels.iter().enumerate() {
404 if labels[..index]
405 .iter()
406 .any(|(previous_name, _)| previous_name == name)
407 {
408 return Err(Error::invalid_argument(format!(
409 "duplicate Pushgateway label name '{name}'"
410 )));
411 }
412 }
413 Ok(())
414}
415
416fn validate_timeout(timeout: Duration) -> Result<()> {
417 if timeout.is_zero() {
418 return Err(Error::invalid_argument(
419 "Pushgateway timeout must be greater than zero",
420 ));
421 }
422 Ok(())
423}
424
425pub(crate) fn validate_retries(retries: u32) -> Result<()> {
426 if retries > PushGatewayConfig::MAX_RETRIES {
427 return Err(Error::invalid_argument(format!(
428 "Pushgateway retries must be at most {}",
429 PushGatewayConfig::MAX_RETRIES
430 )));
431 }
432 Ok(())
433}
434
435pub(crate) fn validate_user_agent(value: &str) -> Result<()> {
436 if value.is_empty() {
437 return Err(Error::invalid_argument(
438 "Pushgateway User-Agent must not be empty",
439 ));
440 }
441 if value.chars().any(char::is_control) {
442 return Err(Error::invalid_argument(
443 "Pushgateway User-Agent must not contain control characters",
444 ));
445 }
446 Ok(())
447}
448
449pub(crate) fn is_valid_label_name(name: &str) -> bool {
450 is_valid_label_name_bytes(name.as_bytes())
451}
452
453pub(crate) fn is_reserved_label_name(name: &str) -> bool {
454 is_reserved_label_name_bytes(name.as_bytes())
455}
456
/// Returns `true` only for the exact name `job`, which the grouping path
/// already uses for the job segment.
pub(crate) fn is_reserved_label_name_bytes(name: &[u8]) -> bool {
    matches!(name, b"job")
}
460
/// Returns `true` when `name` is non-empty, starts with an ASCII letter or
/// `_`, and continues with ASCII letters, digits or `_` only.
pub(crate) fn is_valid_label_name_bytes(name: &[u8]) -> bool {
    match name {
        [] => false,
        [head, tail @ ..] => {
            let head_ok = head.is_ascii_alphabetic() || *head == b'_';
            head_ok
                && tail
                    .iter()
                    .all(|symbol| symbol.is_ascii_alphanumeric() || *symbol == b'_')
        }
    }
}
475
/// Outcome of one failed push or delete attempt.
#[derive(Debug)]
struct PushAttemptError {
    /// Error returned to the caller once no further attempt is made.
    error: Error,
    /// Whether the retry loop may try again after this failure.
    retryable: bool,
}
481
482fn is_retryable_status(status: StatusCode) -> bool {
483 is_retryable_status_code(status.as_u16())
484}
485
/// Returns `true` for HTTP 429 and for every 5xx status code.
fn is_retryable_status_code(status: u16) -> bool {
    matches!(status, 429 | 500..=599)
}
489
490fn retry_delay(attempt: u32) -> Duration {
491 PUSH_RETRY_BASE_DELAY
492 .saturating_mul(2_u32.saturating_pow(attempt))
493 .min(PUSH_RETRY_MAX_DELAY)
494}
495
496fn encode_path_segment(raw: &str) -> String {
497 let mut encoded = String::new();
500 for byte in raw.bytes() {
501 let encoded_byte = encode_path_byte(byte);
502 for &byte in &encoded_byte.bytes[..encoded_byte.len] {
503 encoded.push(byte as char);
504 }
505 }
506 encoded
507}
508
/// Fixed-size result of encoding one input byte for a URL path segment.
#[derive(Debug, Clone, Copy)]
struct EncodedPathByte {
    /// Output buffer; only the first `len` bytes are meaningful.
    bytes: [u8; 3],
    /// Number of valid bytes in `bytes`: 1 for a pass-through byte, 3 for `%XX`.
    len: usize,
}
514
515fn encode_path_byte(byte: u8) -> EncodedPathByte {
516 const HEX: &[u8; 16] = b"0123456789ABCDEF";
517
518 if is_unreserved_path_byte(byte) {
519 return EncodedPathByte {
520 bytes: [byte, 0, 0],
521 len: 1,
522 };
523 }
524
525 EncodedPathByte {
526 bytes: [b'%', HEX[(byte >> 4) as usize], HEX[(byte & 0x0f) as usize]],
527 len: 3,
528 }
529}
530
/// Returns `true` for bytes emitted without percent-encoding: ASCII letters,
/// ASCII digits, `-`, `_`, `.` and `~`.
fn is_unreserved_path_byte(byte: u8) -> bool {
    byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~')
}
537
// Kani proof harnesses for the pure helpers in this file; compiled only
// under `cfg(kani)`.
#[cfg(kani)]
mod verification {
    use super::*;

    /// For every possible byte: unreserved bytes pass through unchanged,
    /// all others become `%` plus two hex digits, and the encoded output
    /// never contains a raw `/` or space.
    #[kani::proof]
    #[kani::unwind(4)]
    fn path_byte_encoding_escapes_reserved_bytes() {
        let byte: u8 = kani::any();
        let encoded = encode_path_byte(byte);

        if is_unreserved_path_byte(byte) {
            assert_eq!(encoded.len, 1);
            assert_eq!(encoded.bytes[0], byte);
        } else {
            assert_eq!(encoded.len, 3);
            assert_eq!(encoded.bytes[0], b'%');
            assert!(encoded.bytes[1].is_ascii_hexdigit());
            assert!(encoded.bytes[2].is_ascii_hexdigit());
        }

        // Only the first `len` bytes are part of the output.
        for i in 0..encoded.len {
            assert_ne!(encoded.bytes[i], b'/');
            assert_ne!(encoded.bytes[i], b' ');
        }
    }

    /// For every `u16`: retryable exactly when the code is 429 or in 500..=599.
    #[kani::proof]
    fn retryable_status_codes_match_pushgateway_retry_policy() {
        let status: u16 = kani::any();
        let expected = status == 429 || (500..=599).contains(&status);

        assert_eq!(is_retryable_status_code(status), expected);
    }

    /// For attempts 0..=10 the delay stays within [base delay, max delay]
    /// and is never zero.
    #[kani::proof]
    #[kani::unwind(12)]
    fn retry_delay_is_bounded_for_configured_retry_counts() {
        let attempt: u32 = kani::any();
        kani::assume(attempt <= 10);

        let delay = retry_delay(attempt);

        assert!(delay >= PUSH_RETRY_BASE_DELAY);
        assert!(delay <= PUSH_RETRY_MAX_DELAY);
        assert!(!delay.is_zero());
    }
}
585
// Unit tests for URL construction, configuration validation, rendering with
// the configured prefix, and the retry helpers.
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::net::TcpListener;
    use std::thread;

    use crate::metrics::WindowGaugeStats;

    use super::*;

    /// Spaces and slashes inside a segment are percent-encoded.
    #[test]
    fn encodes_grouping_segments() {
        assert_eq!(encode_path_segment("a b/c"), "a%20b%2Fc");
    }

    /// Interval metrics render as `<prefix>_<name> <value>` lines.
    #[test]
    fn renders_prometheus_gauges() {
        let rendered = render_prometheus(
            &Metrics {
                transferred_bytes: 1.0,
                bandwidth_bits_per_second: 8.0,
                tcp_retransmits: Some(5.0),
                tcp_rtt_seconds: Some(0.006),
                tcp_rttvar_seconds: Some(0.007),
                tcp_snd_cwnd_bytes: Some(8.0),
                tcp_snd_wnd_bytes: Some(9.0),
                tcp_pmtu_bytes: Some(10.0),
                tcp_reorder_events: Some(11.0),
                udp_packets: Some(2.0),
                udp_lost_packets: Some(3.0),
                udp_jitter_seconds: Some(0.004),
                udp_out_of_order_packets: Some(12.0),
                interval_duration_seconds: 1.0,
                omitted: true,
                ..Metrics::default()
            },
            "iperf3",
        );
        assert!(rendered.contains("iperf3_transferred_bytes 1\n"));
        assert!(rendered.contains("iperf3_stream_count 0\n"));
        assert!(rendered.contains("iperf3_tcp_rtt_seconds 0.006\n"));
        assert!(rendered.contains("iperf3_udp_packets 2\n"));
        assert!(rendered.contains("iperf3_udp_lost_packets 3\n"));
        assert!(rendered.contains("iperf3_udp_jitter_seconds 0.004\n"));
        assert!(rendered.contains("iperf3_udp_out_of_order_packets 12\n"));
        assert!(rendered.contains("iperf3_omitted_intervals 1\n"));
    }

    /// The grouping URL keeps the base path, encodes each segment and keeps
    /// labels in insertion order.
    #[test]
    fn builds_pushgateway_grouping_url() {
        let config = PushGatewayConfig::new(Url::parse("http://127.0.0.1:9091/base/").unwrap())
            .job("iperf job")
            .label("test", "test/one")
            .label("scenario", "sample#1")
            .label("mode", "client")
            .user_agent("iperf3-rs/test");
        let gateway = PushGateway::new(config).unwrap();

        assert_eq!(
            gateway.url().as_str(),
            "http://127.0.0.1:9091/base/metrics/job/iperf%20job/test/test%2Fone/scenario/sample%231/mode/client"
        );
    }

    /// `new` fills in the documented defaults, and those defaults validate.
    #[test]
    fn config_builder_sets_defaults() {
        let config = PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap());

        assert_eq!(config.job, PushGatewayConfig::DEFAULT_JOB);
        assert!(config.labels.is_empty());
        assert_eq!(config.timeout, PushGatewayConfig::default_timeout());
        assert_eq!(config.retries, PushGatewayConfig::DEFAULT_RETRIES);
        assert_eq!(config.user_agent, PushGatewayConfig::default_user_agent());
        assert_eq!(
            config.metric_prefix,
            PushGatewayConfig::DEFAULT_METRIC_PREFIX
        );
        assert!(!config.delete_on_finish);
        config.validate().unwrap();

        let delete_config = config.delete_on_finish(true);
        assert!(delete_config.delete_on_finish);
    }

    /// Each invalid setting is rejected with a message naming the setting.
    #[test]
    fn config_validation_rejects_values_cli_would_reject() {
        let endpoint = Url::parse("http://localhost:9091").unwrap();

        // (case description, invalid config, substring expected in the error)
        for (label, config, expected) in [
            (
                "empty job",
                PushGatewayConfig::new(endpoint.clone()).job(""),
                "job name",
            ),
            (
                "bad label",
                PushGatewayConfig::new(endpoint.clone()).label("9bad", "value"),
                "label name",
            ),
            (
                "reserved label",
                PushGatewayConfig::new(endpoint.clone()).label("job", "value"),
                "reserved",
            ),
            (
                "duplicate label",
                PushGatewayConfig::new(endpoint.clone())
                    .label("site", "a")
                    .label("site", "b"),
                "duplicate",
            ),
            (
                "zero timeout",
                PushGatewayConfig::new(endpoint.clone()).timeout(Duration::ZERO),
                "timeout",
            ),
            (
                "too many retries",
                PushGatewayConfig::new(endpoint.clone())
                    .retries(PushGatewayConfig::MAX_RETRIES + 1),
                "retries",
            ),
            (
                "bad prefix",
                PushGatewayConfig::new(endpoint.clone()).metric_prefix("bad-prefix"),
                "metric prefix",
            ),
        ] {
            let err = config.validate().expect_err(label);
            assert!(
                err.to_string().contains(expected),
                "{label} should mention {expected:?}, got {err}"
            );
        }
    }

    /// A bare `host:port` is given the `http://` scheme.
    #[test]
    fn parses_bare_pushgateway_endpoint_as_http() {
        let url = PushGatewayConfig::parse_endpoint("localhost:9091").unwrap();

        assert_eq!(url.as_str(), "http://localhost:9091/");
    }

    /// `delete` issues an HTTP DELETE against the grouping path.
    #[test]
    fn delete_sends_http_delete_to_grouping_url() {
        // One-shot local server: accept a single connection, capture what
        // was read from it, and answer 202 with an empty body.
        let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap();
        let endpoint = Url::parse(&format!("http://{}", listener.local_addr().unwrap())).unwrap();
        let handle = thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            let mut buffer = [0_u8; 1024];
            let n = stream.read(&mut buffer).unwrap();
            stream
                .write_all(b"HTTP/1.1 202 Accepted\r\nContent-Length: 0\r\n\r\n")
                .unwrap();
            String::from_utf8_lossy(&buffer[..n]).into_owned()
        });

        let gateway =
            PushGateway::new(PushGatewayConfig::new(endpoint).label("scenario", "delete")).unwrap();
        gateway.delete().unwrap();

        let request = handle.join().unwrap();
        assert!(request.starts_with("DELETE /metrics/job/iperf3/scenario/delete HTTP/1.1"));
    }

    /// Default metrics render the always-present gauges and omit the
    /// optional TCP/UDP ones.
    #[test]
    fn renders_all_expected_metric_names() {
        let rendered = render_prometheus(&Metrics::default(), "iperf3");

        // Always rendered, with value 0 for default metrics.
        for name in [
            "iperf3_transferred_bytes",
            "iperf3_bandwidth_bits_per_second",
            "iperf3_stream_count",
            "iperf3_omitted_intervals",
        ] {
            assert!(rendered.contains(&format!("# TYPE {name} gauge\n")));
            assert!(rendered.contains(&format!("{name} 0\n")));
        }

        // Not rendered for default metrics.
        for name in [
            "iperf3_tcp_retransmits",
            "iperf3_tcp_rtt_seconds",
            "iperf3_tcp_rttvar_seconds",
            "iperf3_tcp_snd_cwnd_bytes",
            "iperf3_tcp_snd_wnd_bytes",
            "iperf3_tcp_pmtu_bytes",
            "iperf3_tcp_reorder_events",
            "iperf3_udp_packets",
            "iperf3_udp_lost_packets",
            "iperf3_udp_jitter_seconds",
            "iperf3_udp_out_of_order_packets",
        ] {
            assert!(!rendered.contains(&format!("# TYPE {name} gauge\n")));
        }
    }

    /// Window metrics render mean/min/max gauges and window totals.
    #[test]
    fn renders_window_prometheus_gauges() {
        let rendered = render_window_prometheus(
            &WindowMetrics {
                duration_seconds: 10.0,
                transferred_bytes: 1000.0,
                bandwidth_bits_per_second: WindowGaugeStats {
                    samples: 2,
                    mean: 100.0,
                    min: 90.0,
                    max: 110.0,
                },
                tcp_rtt_seconds: WindowGaugeStats {
                    samples: 2,
                    mean: 0.010,
                    min: 0.005,
                    max: 0.020,
                },
                tcp_retransmits: Some(3.0),
                udp_packets: Some(4.0),
                udp_lost_packets: Some(1.0),
                omitted_intervals: 2.0,
                ..WindowMetrics::default()
            },
            "iperf3",
        );

        assert!(rendered.contains("iperf3_window_duration_seconds 10\n"));
        assert!(rendered.contains("iperf3_window_transferred_bytes 1000\n"));
        assert!(rendered.contains("iperf3_window_stream_count 0\n"));
        assert!(rendered.contains("iperf3_window_bandwidth_mean_bits_per_second 100\n"));
        assert!(rendered.contains("iperf3_window_bandwidth_min_bits_per_second 90\n"));
        assert!(rendered.contains("iperf3_window_bandwidth_max_bits_per_second 110\n"));
        assert!(rendered.contains("iperf3_window_tcp_rtt_mean_seconds 0.01\n"));
        assert!(rendered.contains("iperf3_window_tcp_rtt_min_seconds 0.005\n"));
        assert!(rendered.contains("iperf3_window_tcp_rtt_max_seconds 0.02\n"));
        assert!(rendered.contains("iperf3_window_tcp_retransmits 3\n"));
        assert!(rendered.contains("iperf3_window_udp_packets 4\n"));
        assert!(rendered.contains("iperf3_window_udp_lost_packets 1\n"));
        assert!(rendered.contains("iperf3_window_omitted_intervals 2\n"));
    }

    /// Default window metrics render the always-present gauges and omit
    /// the sample-based and optional ones.
    #[test]
    fn renders_all_expected_window_metric_names() {
        let rendered = render_window_prometheus(&WindowMetrics::default(), "iperf3");

        // Always rendered, with value 0 for default window metrics.
        for name in [
            "iperf3_window_duration_seconds",
            "iperf3_window_transferred_bytes",
            "iperf3_window_stream_count",
            "iperf3_window_omitted_intervals",
        ] {
            assert!(rendered.contains(&format!("# TYPE {name} gauge\n")));
            assert!(rendered.contains(&format!("{name} 0\n")));
        }

        // Not rendered for default window metrics.
        for name in [
            "iperf3_window_bandwidth_mean_bits_per_second",
            "iperf3_window_bandwidth_min_bits_per_second",
            "iperf3_window_bandwidth_max_bits_per_second",
            "iperf3_window_tcp_rtt_mean_seconds",
            "iperf3_window_tcp_rtt_min_seconds",
            "iperf3_window_tcp_rtt_max_seconds",
            "iperf3_window_tcp_rttvar_mean_seconds",
            "iperf3_window_tcp_rttvar_min_seconds",
            "iperf3_window_tcp_rttvar_max_seconds",
            "iperf3_window_tcp_snd_cwnd_mean_bytes",
            "iperf3_window_tcp_snd_cwnd_min_bytes",
            "iperf3_window_tcp_snd_cwnd_max_bytes",
            "iperf3_window_tcp_snd_wnd_mean_bytes",
            "iperf3_window_tcp_snd_wnd_min_bytes",
            "iperf3_window_tcp_snd_wnd_max_bytes",
            "iperf3_window_tcp_pmtu_mean_bytes",
            "iperf3_window_tcp_pmtu_min_bytes",
            "iperf3_window_tcp_pmtu_max_bytes",
            "iperf3_window_udp_jitter_mean_seconds",
            "iperf3_window_udp_jitter_min_seconds",
            "iperf3_window_udp_jitter_max_seconds",
            "iperf3_window_tcp_retransmits",
            "iperf3_window_tcp_reorder_events",
            "iperf3_window_udp_packets",
            "iperf3_window_udp_lost_packets",
            "iperf3_window_udp_out_of_order_packets",
        ] {
            assert!(!rendered.contains(&format!("# TYPE {name} gauge\n")));
        }
    }

    /// A custom prefix replaces the default one in every metric name.
    #[test]
    fn renders_metric_names_with_custom_prefix() {
        let rendered = render_prometheus(&Metrics::default(), "nettest");

        assert!(rendered.contains("# TYPE nettest_transferred_bytes gauge\n"));
        assert!(rendered.contains("nettest_bandwidth_bits_per_second 0\n"));
        assert!(!rendered.contains("iperf3_transferred_bytes"));
    }

    /// 429 and 5xx are retryable; other client errors are not.
    #[test]
    fn identifies_retryable_statuses() {
        assert!(is_retryable_status(StatusCode::TOO_MANY_REQUESTS));
        assert!(is_retryable_status(StatusCode::BAD_GATEWAY));
        assert!(!is_retryable_status(StatusCode::BAD_REQUEST));
    }

    /// The backoff doubles from 100 ms and is capped at one second.
    #[test]
    fn caps_retry_delay() {
        assert_eq!(retry_delay(0), Duration::from_millis(100));
        assert_eq!(retry_delay(1), Duration::from_millis(200));
        assert_eq!(retry_delay(10), Duration::from_secs(1));
    }
}