Skip to main content

iperf3_rs/
pushgateway.rs

1//! Pushgateway HTTP delivery helpers.
2//!
3//! The CLI uses these types internally, and library users can also construct a
4//! [`PushGateway`] when they want to push [`crate::Metrics`] or
5//! [`crate::WindowMetrics`] collected from [`crate::MetricsStream`].
6
7use std::time::Duration;
8
9use reqwest::StatusCode;
10use reqwest::blocking::Client;
11use url::Url;
12
13use crate::metrics::{Metrics, WindowMetrics};
14use crate::prometheus::{
15    PrometheusEncoder, render_interval_prometheus as render_prometheus, render_window_prometheus,
16    validate_metric_prefix,
17};
18use crate::{Error, ErrorKind, Result};
19
20const PUSH_RETRY_BASE_DELAY: Duration = Duration::from_millis(100);
21const PUSH_RETRY_MAX_DELAY: Duration = Duration::from_secs(1);
22
23/// Configuration for a [`PushGateway`] sink.
24#[derive(Debug, Clone)]
25#[non_exhaustive]
26pub struct PushGatewayConfig {
27    /// Base Pushgateway URL.
28    ///
29    /// The final request path is built as
30    /// `/metrics/job/{job}/{label}/{value}/...`.
31    pub endpoint: Url,
32    /// Pushgateway job name.
33    pub job: String,
34    /// Grouping labels encoded into the Pushgateway request path.
35    pub labels: Vec<(String, String)>,
36    /// Per-request HTTP timeout.
37    pub timeout: Duration,
38    /// Number of retries after the first failed request.
39    pub retries: u32,
40    /// HTTP `User-Agent` header.
41    pub user_agent: String,
42    /// Prefix for emitted Prometheus metric names.
43    pub metric_prefix: String,
44    /// Delete this grouping key from Pushgateway after the run finishes.
45    pub delete_on_finish: bool,
46}
47
48impl PushGatewayConfig {
49    /// Default Pushgateway job name used by the CLI and builder.
50    pub const DEFAULT_JOB: &'static str = "iperf3";
51    /// Default metric prefix used by the CLI and builder.
52    pub const DEFAULT_METRIC_PREFIX: &'static str = PrometheusEncoder::DEFAULT_PREFIX;
53    /// Default number of Pushgateway retries after the first failed request.
54    pub const DEFAULT_RETRIES: u32 = 0;
55    /// Maximum supported retry count.
56    pub const MAX_RETRIES: u32 = 10;
57
58    /// Build a config with production-safe defaults for every field except the
59    /// Pushgateway endpoint.
60    pub fn new(endpoint: Url) -> Self {
61        Self {
62            endpoint,
63            job: Self::DEFAULT_JOB.to_owned(),
64            labels: Vec::new(),
65            timeout: Self::default_timeout(),
66            retries: Self::DEFAULT_RETRIES,
67            user_agent: Self::default_user_agent(),
68            metric_prefix: Self::DEFAULT_METRIC_PREFIX.to_owned(),
69            delete_on_finish: false,
70        }
71    }
72
73    /// Parse a Pushgateway endpoint, defaulting bare `host:port` values to HTTP.
74    pub fn parse_endpoint(raw: &str) -> Result<Url> {
75        let raw = raw.trim();
76        if raw.is_empty() {
77            return Err(Error::invalid_argument(
78                "Pushgateway endpoint must not be empty",
79            ));
80        }
81
82        // Keep local development terse: `localhost:9091` means the normal HTTP
83        // Pushgateway endpoint unless a scheme is explicitly provided.
84        let with_scheme = if raw.starts_with("http://") || raw.starts_with("https://") {
85            raw.to_owned()
86        } else {
87            format!("http://{raw}")
88        };
89        Url::parse(&with_scheme).map_err(|err| {
90            Error::with_source(
91                ErrorKind::InvalidArgument,
92                "invalid Pushgateway endpoint URL",
93                err,
94            )
95        })
96    }
97
98    /// Default per-request timeout.
99    pub const fn default_timeout() -> Duration {
100        Duration::from_secs(5)
101    }
102
103    /// Default HTTP `User-Agent`.
104    pub fn default_user_agent() -> String {
105        format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
106    }
107
108    /// Set the Pushgateway job name.
109    pub fn job(mut self, job: impl Into<String>) -> Self {
110        self.job = job.into();
111        self
112    }
113
114    /// Add one grouping label.
115    pub fn label(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
116        self.labels.push((name.into(), value.into()));
117        self
118    }
119
120    /// Replace grouping labels.
121    pub fn labels<I, K, V>(mut self, labels: I) -> Self
122    where
123        I: IntoIterator<Item = (K, V)>,
124        K: Into<String>,
125        V: Into<String>,
126    {
127        self.labels = labels
128            .into_iter()
129            .map(|(name, value)| (name.into(), value.into()))
130            .collect();
131        self
132    }
133
134    /// Set the per-request HTTP timeout.
135    pub fn timeout(mut self, timeout: Duration) -> Self {
136        self.timeout = timeout;
137        self
138    }
139
140    /// Set the number of retries after the first failed request.
141    pub fn retries(mut self, retries: u32) -> Self {
142        self.retries = retries;
143        self
144    }
145
146    /// Set the HTTP `User-Agent`.
147    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
148        self.user_agent = user_agent.into();
149        self
150    }
151
152    /// Set the Prometheus metric name prefix.
153    pub fn metric_prefix(mut self, metric_prefix: impl Into<String>) -> Self {
154        self.metric_prefix = metric_prefix.into();
155        self
156    }
157
158    /// Delete this grouping key from Pushgateway after direct delivery finishes.
159    pub fn delete_on_finish(mut self, delete: bool) -> Self {
160        self.delete_on_finish = delete;
161        self
162    }
163
164    /// Validate this config before it is used for HTTP delivery.
165    pub fn validate(&self) -> Result<()> {
166        validate_endpoint(&self.endpoint)?;
167        validate_job(&self.job)?;
168        validate_labels(&self.labels)?;
169        validate_timeout(self.timeout)?;
170        validate_retries(self.retries)?;
171        validate_user_agent(&self.user_agent)?;
172        validate_metric_prefix(&self.metric_prefix)?;
173        Ok(())
174    }
175}
176
177/// HTTP sink for pushing iperf metrics to Prometheus Pushgateway.
178pub struct PushGateway {
179    client: Client,
180    url: Url,
181    retries: u32,
182    metric_prefix: String,
183    delete_on_finish: bool,
184}
185
186impl PushGateway {
187    /// Build a Pushgateway sink from validated configuration.
188    pub fn new(config: PushGatewayConfig) -> Result<Self> {
189        config.validate()?;
190
191        let PushGatewayConfig {
192            endpoint,
193            job,
194            labels,
195            timeout,
196            retries,
197            user_agent,
198            metric_prefix,
199            delete_on_finish,
200        } = config;
201        let url = grouping_url(endpoint, &job, &labels);
202        let client = build_http_client(timeout, user_agent)?;
203
204        Ok(Self {
205            client,
206            url,
207            retries,
208            metric_prefix,
209            delete_on_finish,
210        })
211    }
212
213    /// Push one immediate interval sample.
214    pub fn push(&self, metrics: &Metrics) -> Result<()> {
215        let body = render_prometheus(metrics, &self.metric_prefix);
216        self.push_body(&body)
217    }
218
219    /// Push one aggregated window summary.
220    pub fn push_window(&self, metrics: &WindowMetrics) -> Result<()> {
221        let body = render_window_prometheus(metrics, &self.metric_prefix);
222        self.push_body(&body)
223    }
224
225    /// Delete this sink's grouping key from Pushgateway.
226    pub fn delete(&self) -> Result<()> {
227        for attempt in 0..=self.retries {
228            match self.delete_once() {
229                Ok(()) => return Ok(()),
230                Err(err) if err.retryable && attempt < self.retries => {
231                    std::thread::sleep(retry_delay(attempt));
232                }
233                Err(err) => return Err(err.error),
234            }
235        }
236
237        unreachable!("delete retry loop always returns")
238    }
239
240    pub(crate) fn delete_on_finish(&self) -> bool {
241        self.delete_on_finish
242    }
243
244    #[cfg(test)]
245    fn url(&self) -> &Url {
246        &self.url
247    }
248
249    fn push_body(&self, body: &str) -> Result<()> {
250        for attempt in 0..=self.retries {
251            match self.push_once(body) {
252                Ok(()) => return Ok(()),
253                Err(err) if err.retryable && attempt < self.retries => {
254                    std::thread::sleep(retry_delay(attempt));
255                }
256                Err(err) => return Err(err.error),
257            }
258        }
259
260        unreachable!("push retry loop always returns")
261    }
262
263    fn push_once(&self, body: &str) -> std::result::Result<(), PushAttemptError> {
264        let response = self
265            .client
266            .put(self.url.clone())
267            .header("content-type", "text/plain; version=0.0.4; charset=utf-8")
268            .body(body.to_owned())
269            .send()
270            .map_err(|err| PushAttemptError {
271                error: Error::with_source(
272                    ErrorKind::PushGateway,
273                    "failed to send Pushgateway request",
274                    err,
275                ),
276                retryable: true,
277            })?;
278
279        if !response.status().is_success() {
280            let status = response.status();
281            return Err(PushAttemptError {
282                error: Error::pushgateway(format!("Pushgateway returned {status}")),
283                retryable: is_retryable_status(status),
284            });
285        }
286
287        Ok(())
288    }
289
290    fn delete_once(&self) -> std::result::Result<(), PushAttemptError> {
291        let response =
292            self.client
293                .delete(self.url.clone())
294                .send()
295                .map_err(|err| PushAttemptError {
296                    error: Error::with_source(
297                        ErrorKind::PushGateway,
298                        "failed to send Pushgateway delete request",
299                        err,
300                    ),
301                    retryable: true,
302                })?;
303
304        if !response.status().is_success() {
305            let status = response.status();
306            return Err(PushAttemptError {
307                error: Error::pushgateway(format!("Pushgateway delete returned {status}")),
308                retryable: is_retryable_status(status),
309            });
310        }
311
312        Ok(())
313    }
314}
315
316fn grouping_url(mut endpoint: Url, job: &str, labels: &[(String, String)]) -> Url {
317    let path = grouping_path(endpoint.path(), job, labels);
318    endpoint.set_path(&path);
319    endpoint
320}
321
322fn grouping_path(base_path: &str, job: &str, labels: &[(String, String)]) -> String {
323    let mut path = base_path.trim_end_matches('/').to_owned();
324    // Pushgateway represents grouping labels as path segments:
325    // /metrics/job/<job>/<label>/<value>/...
326    path.push_str("/metrics/job/");
327    path.push_str(&encode_path_segment(job));
328    for (name, value) in labels {
329        path.push('/');
330        path.push_str(&encode_path_segment(name));
331        path.push('/');
332        path.push_str(&encode_path_segment(value));
333    }
334    path
335}
336
337fn build_http_client(timeout: Duration, user_agent: String) -> Result<Client> {
338    Client::builder()
339        // Metrics are best-effort; a stuck gateway should not hold the iperf
340        // process indefinitely.
341        .timeout(timeout)
342        .user_agent(user_agent)
343        .build()
344        .map_err(|err| {
345            Error::with_source(ErrorKind::PushGateway, "failed to build HTTP client", err)
346        })
347}
348
349fn validate_endpoint(endpoint: &Url) -> Result<()> {
350    match endpoint.scheme() {
351        "http" | "https" => {}
352        scheme => {
353            return Err(Error::invalid_argument(format!(
354                "Pushgateway endpoint scheme must be http or https, got '{scheme}'"
355            )));
356        }
357    }
358    if endpoint.host_str().is_none() {
359        return Err(Error::invalid_argument(
360            "Pushgateway endpoint must include a host",
361        ));
362    }
363    Ok(())
364}
365
366fn validate_job(job: &str) -> Result<()> {
367    if job.is_empty() {
368        return Err(Error::invalid_argument(
369            "Pushgateway job name must not be empty",
370        ));
371    }
372    Ok(())
373}
374
375fn validate_labels(labels: &[(String, String)]) -> Result<()> {
376    for (name, value) in labels {
377        validate_label(name, value)?;
378    }
379    reject_duplicate_labels(labels)?;
380    Ok(())
381}
382
383pub(crate) fn validate_label(name: &str, value: &str) -> Result<()> {
384    if !is_valid_label_name(name) {
385        return Err(Error::invalid_argument(format!(
386            "invalid Pushgateway label name '{name}'"
387        )));
388    }
389    if is_reserved_label_name(name) {
390        return Err(Error::invalid_argument(format!(
391            "Pushgateway label name '{name}' is reserved"
392        )));
393    }
394    if value.is_empty() {
395        return Err(Error::invalid_argument(format!(
396            "Pushgateway label value for '{name}' must not be empty"
397        )));
398    }
399    Ok(())
400}
401
402fn reject_duplicate_labels(labels: &[(String, String)]) -> Result<()> {
403    for (index, (name, _)) in labels.iter().enumerate() {
404        if labels[..index]
405            .iter()
406            .any(|(previous_name, _)| previous_name == name)
407        {
408            return Err(Error::invalid_argument(format!(
409                "duplicate Pushgateway label name '{name}'"
410            )));
411        }
412    }
413    Ok(())
414}
415
416fn validate_timeout(timeout: Duration) -> Result<()> {
417    if timeout.is_zero() {
418        return Err(Error::invalid_argument(
419            "Pushgateway timeout must be greater than zero",
420        ));
421    }
422    Ok(())
423}
424
425pub(crate) fn validate_retries(retries: u32) -> Result<()> {
426    if retries > PushGatewayConfig::MAX_RETRIES {
427        return Err(Error::invalid_argument(format!(
428            "Pushgateway retries must be at most {}",
429            PushGatewayConfig::MAX_RETRIES
430        )));
431    }
432    Ok(())
433}
434
435pub(crate) fn validate_user_agent(value: &str) -> Result<()> {
436    if value.is_empty() {
437        return Err(Error::invalid_argument(
438            "Pushgateway User-Agent must not be empty",
439        ));
440    }
441    if value.chars().any(char::is_control) {
442        return Err(Error::invalid_argument(
443            "Pushgateway User-Agent must not contain control characters",
444        ));
445    }
446    Ok(())
447}
448
449pub(crate) fn is_valid_label_name(name: &str) -> bool {
450    is_valid_label_name_bytes(name.as_bytes())
451}
452
453pub(crate) fn is_reserved_label_name(name: &str) -> bool {
454    is_reserved_label_name_bytes(name.as_bytes())
455}
456
457pub(crate) fn is_reserved_label_name_bytes(name: &[u8]) -> bool {
458    name == b"job"
459}
460
461pub(crate) fn is_valid_label_name_bytes(name: &[u8]) -> bool {
462    let Some((&first, rest)) = name.split_first() else {
463        return false;
464    };
465    if !(first.is_ascii_alphabetic() || first == b'_') {
466        return false;
467    }
468    for &byte in rest {
469        if !(byte.is_ascii_alphanumeric() || byte == b'_') {
470            return false;
471        }
472    }
473    true
474}
475
476#[derive(Debug)]
477struct PushAttemptError {
478    error: Error,
479    retryable: bool,
480}
481
482fn is_retryable_status(status: StatusCode) -> bool {
483    is_retryable_status_code(status.as_u16())
484}
485
486fn is_retryable_status_code(status: u16) -> bool {
487    (500..=599).contains(&status) || status == 429
488}
489
490fn retry_delay(attempt: u32) -> Duration {
491    PUSH_RETRY_BASE_DELAY
492        .saturating_mul(2_u32.saturating_pow(attempt))
493        .min(PUSH_RETRY_MAX_DELAY)
494}
495
496fn encode_path_segment(raw: &str) -> String {
497    // Path segments cannot be delegated to Url::path_segments_mut here because
498    // the Pushgateway grouping path is assembled onto any existing base path.
499    let mut encoded = String::new();
500    for byte in raw.bytes() {
501        let encoded_byte = encode_path_byte(byte);
502        for &byte in &encoded_byte.bytes[..encoded_byte.len] {
503            encoded.push(byte as char);
504        }
505    }
506    encoded
507}
508
509#[derive(Debug, Clone, Copy)]
510struct EncodedPathByte {
511    bytes: [u8; 3],
512    len: usize,
513}
514
515fn encode_path_byte(byte: u8) -> EncodedPathByte {
516    const HEX: &[u8; 16] = b"0123456789ABCDEF";
517
518    if is_unreserved_path_byte(byte) {
519        return EncodedPathByte {
520            bytes: [byte, 0, 0],
521            len: 1,
522        };
523    }
524
525    EncodedPathByte {
526        bytes: [b'%', HEX[(byte >> 4) as usize], HEX[(byte & 0x0f) as usize]],
527        len: 3,
528    }
529}
530
531fn is_unreserved_path_byte(byte: u8) -> bool {
532    matches!(
533        byte,
534        b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~'
535    )
536}
537
538#[cfg(kani)]
539mod verification {
540    use super::*;
541
542    #[kani::proof]
543    #[kani::unwind(4)]
544    fn path_byte_encoding_escapes_reserved_bytes() {
545        let byte: u8 = kani::any();
546        let encoded = encode_path_byte(byte);
547
548        if is_unreserved_path_byte(byte) {
549            assert_eq!(encoded.len, 1);
550            assert_eq!(encoded.bytes[0], byte);
551        } else {
552            assert_eq!(encoded.len, 3);
553            assert_eq!(encoded.bytes[0], b'%');
554            assert!(encoded.bytes[1].is_ascii_hexdigit());
555            assert!(encoded.bytes[2].is_ascii_hexdigit());
556        }
557
558        for i in 0..encoded.len {
559            assert_ne!(encoded.bytes[i], b'/');
560            assert_ne!(encoded.bytes[i], b' ');
561        }
562    }
563
564    #[kani::proof]
565    fn retryable_status_codes_match_pushgateway_retry_policy() {
566        let status: u16 = kani::any();
567        let expected = status == 429 || (500..=599).contains(&status);
568
569        assert_eq!(is_retryable_status_code(status), expected);
570    }
571
572    #[kani::proof]
573    #[kani::unwind(12)]
574    fn retry_delay_is_bounded_for_configured_retry_counts() {
575        let attempt: u32 = kani::any();
576        kani::assume(attempt <= 10);
577
578        let delay = retry_delay(attempt);
579
580        assert!(delay >= PUSH_RETRY_BASE_DELAY);
581        assert!(delay <= PUSH_RETRY_MAX_DELAY);
582        assert!(!delay.is_zero());
583    }
584}
585
586#[cfg(test)]
587mod tests {
588    use std::io::{Read, Write};
589    use std::net::TcpListener;
590    use std::thread;
591
592    use crate::metrics::WindowGaugeStats;
593
594    use super::*;
595
596    #[test]
597    fn encodes_grouping_segments() {
598        assert_eq!(encode_path_segment("a b/c"), "a%20b%2Fc");
599    }
600
601    #[test]
602    fn renders_prometheus_gauges() {
603        let rendered = render_prometheus(
604            &Metrics {
605                transferred_bytes: 1.0,
606                bandwidth_bits_per_second: 8.0,
607                tcp_retransmits: Some(5.0),
608                tcp_rtt_seconds: Some(0.006),
609                tcp_rttvar_seconds: Some(0.007),
610                tcp_snd_cwnd_bytes: Some(8.0),
611                tcp_snd_wnd_bytes: Some(9.0),
612                tcp_pmtu_bytes: Some(10.0),
613                tcp_reorder_events: Some(11.0),
614                udp_packets: Some(2.0),
615                udp_lost_packets: Some(3.0),
616                udp_jitter_seconds: Some(0.004),
617                udp_out_of_order_packets: Some(12.0),
618                interval_duration_seconds: 1.0,
619                omitted: true,
620                ..Metrics::default()
621            },
622            "iperf3",
623        );
624        assert!(rendered.contains("iperf3_transferred_bytes 1\n"));
625        assert!(rendered.contains("iperf3_stream_count 0\n"));
626        assert!(rendered.contains("iperf3_tcp_rtt_seconds 0.006\n"));
627        assert!(rendered.contains("iperf3_udp_packets 2\n"));
628        assert!(rendered.contains("iperf3_udp_lost_packets 3\n"));
629        assert!(rendered.contains("iperf3_udp_jitter_seconds 0.004\n"));
630        assert!(rendered.contains("iperf3_udp_out_of_order_packets 12\n"));
631        assert!(rendered.contains("iperf3_omitted_intervals 1\n"));
632    }
633
634    #[test]
635    fn builds_pushgateway_grouping_url() {
636        let config = PushGatewayConfig::new(Url::parse("http://127.0.0.1:9091/base/").unwrap())
637            .job("iperf job")
638            .label("test", "test/one")
639            .label("scenario", "sample#1")
640            .label("mode", "client")
641            .user_agent("iperf3-rs/test");
642        let gateway = PushGateway::new(config).unwrap();
643
644        assert_eq!(
645            gateway.url().as_str(),
646            "http://127.0.0.1:9091/base/metrics/job/iperf%20job/test/test%2Fone/scenario/sample%231/mode/client"
647        );
648    }
649
650    #[test]
651    fn config_builder_sets_defaults() {
652        let config = PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap());
653
654        assert_eq!(config.job, PushGatewayConfig::DEFAULT_JOB);
655        assert!(config.labels.is_empty());
656        assert_eq!(config.timeout, PushGatewayConfig::default_timeout());
657        assert_eq!(config.retries, PushGatewayConfig::DEFAULT_RETRIES);
658        assert_eq!(config.user_agent, PushGatewayConfig::default_user_agent());
659        assert_eq!(
660            config.metric_prefix,
661            PushGatewayConfig::DEFAULT_METRIC_PREFIX
662        );
663        assert!(!config.delete_on_finish);
664        config.validate().unwrap();
665
666        let delete_config = config.delete_on_finish(true);
667        assert!(delete_config.delete_on_finish);
668    }
669
670    #[test]
671    fn config_validation_rejects_values_cli_would_reject() {
672        let endpoint = Url::parse("http://localhost:9091").unwrap();
673
674        for (label, config, expected) in [
675            (
676                "empty job",
677                PushGatewayConfig::new(endpoint.clone()).job(""),
678                "job name",
679            ),
680            (
681                "bad label",
682                PushGatewayConfig::new(endpoint.clone()).label("9bad", "value"),
683                "label name",
684            ),
685            (
686                "reserved label",
687                PushGatewayConfig::new(endpoint.clone()).label("job", "value"),
688                "reserved",
689            ),
690            (
691                "duplicate label",
692                PushGatewayConfig::new(endpoint.clone())
693                    .label("site", "a")
694                    .label("site", "b"),
695                "duplicate",
696            ),
697            (
698                "zero timeout",
699                PushGatewayConfig::new(endpoint.clone()).timeout(Duration::ZERO),
700                "timeout",
701            ),
702            (
703                "too many retries",
704                PushGatewayConfig::new(endpoint.clone())
705                    .retries(PushGatewayConfig::MAX_RETRIES + 1),
706                "retries",
707            ),
708            (
709                "bad prefix",
710                PushGatewayConfig::new(endpoint.clone()).metric_prefix("bad-prefix"),
711                "metric prefix",
712            ),
713        ] {
714            let err = config.validate().expect_err(label);
715            assert!(
716                err.to_string().contains(expected),
717                "{label} should mention {expected:?}, got {err}"
718            );
719        }
720    }
721
722    #[test]
723    fn parses_bare_pushgateway_endpoint_as_http() {
724        let url = PushGatewayConfig::parse_endpoint("localhost:9091").unwrap();
725
726        assert_eq!(url.as_str(), "http://localhost:9091/");
727    }
728
729    #[test]
730    fn delete_sends_http_delete_to_grouping_url() {
731        let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap();
732        let endpoint = Url::parse(&format!("http://{}", listener.local_addr().unwrap())).unwrap();
733        let handle = thread::spawn(move || {
734            let (mut stream, _) = listener.accept().unwrap();
735            let mut buffer = [0_u8; 1024];
736            let n = stream.read(&mut buffer).unwrap();
737            stream
738                .write_all(b"HTTP/1.1 202 Accepted\r\nContent-Length: 0\r\n\r\n")
739                .unwrap();
740            String::from_utf8_lossy(&buffer[..n]).into_owned()
741        });
742
743        let gateway =
744            PushGateway::new(PushGatewayConfig::new(endpoint).label("scenario", "delete")).unwrap();
745        gateway.delete().unwrap();
746
747        let request = handle.join().unwrap();
748        assert!(request.starts_with("DELETE /metrics/job/iperf3/scenario/delete HTTP/1.1"));
749    }
750
751    #[test]
752    fn renders_all_expected_metric_names() {
753        let rendered = render_prometheus(&Metrics::default(), "iperf3");
754
755        for name in [
756            "iperf3_transferred_bytes",
757            "iperf3_bandwidth_bits_per_second",
758            "iperf3_stream_count",
759            "iperf3_omitted_intervals",
760        ] {
761            assert!(rendered.contains(&format!("# TYPE {name} gauge\n")));
762            assert!(rendered.contains(&format!("{name} 0\n")));
763        }
764
765        for name in [
766            "iperf3_tcp_retransmits",
767            "iperf3_tcp_rtt_seconds",
768            "iperf3_tcp_rttvar_seconds",
769            "iperf3_tcp_snd_cwnd_bytes",
770            "iperf3_tcp_snd_wnd_bytes",
771            "iperf3_tcp_pmtu_bytes",
772            "iperf3_tcp_reorder_events",
773            "iperf3_udp_packets",
774            "iperf3_udp_lost_packets",
775            "iperf3_udp_jitter_seconds",
776            "iperf3_udp_out_of_order_packets",
777        ] {
778            assert!(!rendered.contains(&format!("# TYPE {name} gauge\n")));
779        }
780    }
781
782    #[test]
783    fn renders_window_prometheus_gauges() {
784        let rendered = render_window_prometheus(
785            &WindowMetrics {
786                duration_seconds: 10.0,
787                transferred_bytes: 1000.0,
788                bandwidth_bits_per_second: WindowGaugeStats {
789                    samples: 2,
790                    mean: 100.0,
791                    min: 90.0,
792                    max: 110.0,
793                },
794                tcp_rtt_seconds: WindowGaugeStats {
795                    samples: 2,
796                    mean: 0.010,
797                    min: 0.005,
798                    max: 0.020,
799                },
800                tcp_retransmits: Some(3.0),
801                udp_packets: Some(4.0),
802                udp_lost_packets: Some(1.0),
803                omitted_intervals: 2.0,
804                ..WindowMetrics::default()
805            },
806            "iperf3",
807        );
808
809        assert!(rendered.contains("iperf3_window_duration_seconds 10\n"));
810        assert!(rendered.contains("iperf3_window_transferred_bytes 1000\n"));
811        assert!(rendered.contains("iperf3_window_stream_count 0\n"));
812        assert!(rendered.contains("iperf3_window_bandwidth_mean_bits_per_second 100\n"));
813        assert!(rendered.contains("iperf3_window_bandwidth_min_bits_per_second 90\n"));
814        assert!(rendered.contains("iperf3_window_bandwidth_max_bits_per_second 110\n"));
815        assert!(rendered.contains("iperf3_window_tcp_rtt_mean_seconds 0.01\n"));
816        assert!(rendered.contains("iperf3_window_tcp_rtt_min_seconds 0.005\n"));
817        assert!(rendered.contains("iperf3_window_tcp_rtt_max_seconds 0.02\n"));
818        assert!(rendered.contains("iperf3_window_tcp_retransmits 3\n"));
819        assert!(rendered.contains("iperf3_window_udp_packets 4\n"));
820        assert!(rendered.contains("iperf3_window_udp_lost_packets 1\n"));
821        assert!(rendered.contains("iperf3_window_omitted_intervals 2\n"));
822    }
823
824    #[test]
825    fn renders_all_expected_window_metric_names() {
826        let rendered = render_window_prometheus(&WindowMetrics::default(), "iperf3");
827
828        for name in [
829            "iperf3_window_duration_seconds",
830            "iperf3_window_transferred_bytes",
831            "iperf3_window_stream_count",
832            "iperf3_window_omitted_intervals",
833        ] {
834            assert!(rendered.contains(&format!("# TYPE {name} gauge\n")));
835            assert!(rendered.contains(&format!("{name} 0\n")));
836        }
837
838        for name in [
839            "iperf3_window_bandwidth_mean_bits_per_second",
840            "iperf3_window_bandwidth_min_bits_per_second",
841            "iperf3_window_bandwidth_max_bits_per_second",
842            "iperf3_window_tcp_rtt_mean_seconds",
843            "iperf3_window_tcp_rtt_min_seconds",
844            "iperf3_window_tcp_rtt_max_seconds",
845            "iperf3_window_tcp_rttvar_mean_seconds",
846            "iperf3_window_tcp_rttvar_min_seconds",
847            "iperf3_window_tcp_rttvar_max_seconds",
848            "iperf3_window_tcp_snd_cwnd_mean_bytes",
849            "iperf3_window_tcp_snd_cwnd_min_bytes",
850            "iperf3_window_tcp_snd_cwnd_max_bytes",
851            "iperf3_window_tcp_snd_wnd_mean_bytes",
852            "iperf3_window_tcp_snd_wnd_min_bytes",
853            "iperf3_window_tcp_snd_wnd_max_bytes",
854            "iperf3_window_tcp_pmtu_mean_bytes",
855            "iperf3_window_tcp_pmtu_min_bytes",
856            "iperf3_window_tcp_pmtu_max_bytes",
857            "iperf3_window_udp_jitter_mean_seconds",
858            "iperf3_window_udp_jitter_min_seconds",
859            "iperf3_window_udp_jitter_max_seconds",
860            "iperf3_window_tcp_retransmits",
861            "iperf3_window_tcp_reorder_events",
862            "iperf3_window_udp_packets",
863            "iperf3_window_udp_lost_packets",
864            "iperf3_window_udp_out_of_order_packets",
865        ] {
866            assert!(!rendered.contains(&format!("# TYPE {name} gauge\n")));
867        }
868    }
869
870    #[test]
871    fn renders_metric_names_with_custom_prefix() {
872        let rendered = render_prometheus(&Metrics::default(), "nettest");
873
874        assert!(rendered.contains("# TYPE nettest_transferred_bytes gauge\n"));
875        assert!(rendered.contains("nettest_bandwidth_bits_per_second 0\n"));
876        assert!(!rendered.contains("iperf3_transferred_bytes"));
877    }
878
879    #[test]
880    fn identifies_retryable_statuses() {
881        assert!(is_retryable_status(StatusCode::TOO_MANY_REQUESTS));
882        assert!(is_retryable_status(StatusCode::BAD_GATEWAY));
883        assert!(!is_retryable_status(StatusCode::BAD_REQUEST));
884    }
885
886    #[test]
887    fn caps_retry_delay() {
888        assert_eq!(retry_delay(0), Duration::from_millis(100));
889        assert_eq!(retry_delay(1), Duration::from_millis(200));
890        assert_eq!(retry_delay(10), Duration::from_secs(1));
891    }
892}