Skip to main content

iperf3_rs/
command.rs

1//! High-level command API for running libiperf tests from Rust.
2//!
3//! `IperfCommand` deliberately accepts argv-style iperf arguments rather than a
4//! typed Rust clone of every upstream option. This keeps compatibility anchored
5//! to libiperf's own parser while still giving Rust callers structured results
6//! and live metric streams.
7
8use std::path::Path;
9use std::sync::{Mutex, OnceLock};
10use std::thread::{self, JoinHandle};
11use std::time::{Duration, Instant};
12
13use crossbeam_channel::{Sender, bounded};
14
15use crate::iperf::{IperfTest, Role};
16#[cfg(feature = "pushgateway")]
17use crate::metrics::IntervalMetricsReporter;
18use crate::metrics::{
19    CallbackMetricsReporter, MetricEvent, MetricsMode, MetricsStream, metric_event_stream,
20};
21#[cfg(feature = "pushgateway")]
22use crate::pushgateway::{PushGateway, PushGatewayConfig};
23use crate::{Error, Result};
24
25static RUN_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
26
27/// Builder for running an iperf test through libiperf.
28///
29/// The typed helpers append ordinary iperf arguments such as `-c`, `-p`, and
30/// `-t`. The lower-level [`IperfCommand::arg`] and [`IperfCommand::args`]
31/// methods remain available for upstream options that do not have a dedicated
32/// Rust helper.
33///
34/// Library runs suppress libiperf's normal stdout output by default. Use
35/// [`IperfCommand::inherit_output`] for upstream-style terminal output or
36/// [`IperfCommand::logfile`] to send that output to a file.
37///
38/// # Execution model
39///
40/// High-level runs are serialized inside the process because libiperf keeps
41/// process-global state. [`RunningIperf`] is therefore a completion observer,
42/// not a cancellation handle. Dropping it detaches the worker thread, and
43/// [`RunningIperf::wait_timeout`] only stops waiting. It does not stop libiperf
44/// or release this crate's process-wide run lock.
45///
46/// Use one-off server mode for library servers, or run long-lived and
47/// externally cancellable iperf workloads in a helper process that the
48/// embedding application can terminate.
49///
50/// # Examples
51///
52/// ```no_run
53/// use std::time::Duration;
54///
55/// use iperf3_rs::{IperfCommand, Result};
56///
57/// fn main() -> Result<()> {
58///     let mut command = IperfCommand::client("127.0.0.1");
59///     command.duration(Duration::from_secs(5));
60///
61///     let result = command.run()?;
62///     println!("{:?}", result.role());
63///     Ok(())
64/// }
65/// ```
66#[derive(Debug, Clone)]
67pub struct IperfCommand {
68    program: String,
69    args: Vec<String>,
70    metrics_mode: MetricsMode,
71    #[cfg(feature = "pushgateway")]
72    pushgateway: Option<PushGatewayRun>,
73    allow_unbounded_server: bool,
74    suppress_output: bool,
75}
76
77#[cfg(feature = "pushgateway")]
78#[derive(Debug, Clone)]
79struct PushGatewayRun {
80    config: PushGatewayConfig,
81    mode: MetricsMode,
82}
83
84impl IperfCommand {
85    /// Create a command with no iperf role selected yet.
86    pub fn new() -> Self {
87        Self {
88            program: "iperf3-rs".to_owned(),
89            args: Vec::new(),
90            metrics_mode: MetricsMode::Disabled,
91            #[cfg(feature = "pushgateway")]
92            pushgateway: None,
93            allow_unbounded_server: false,
94            suppress_output: true,
95        }
96    }
97
98    /// Create a client command equivalent to `iperf3 -c HOST`.
99    pub fn client(host: impl Into<String>) -> Self {
100        let mut command = Self::new();
101        command.arg("-c").arg(host);
102        command
103    }
104
105    /// Create a one-off server command equivalent to `iperf3 -s -1`.
106    ///
107    /// This is the preferred server constructor for library code because the
108    /// run exits after one accepted test and releases the process-wide libiperf
109    /// lock.
110    pub fn server_once() -> Self {
111        let mut command = Self::new();
112        command.args(["-s", "-1"]);
113        command
114    }
115
116    /// Create a long-lived server command equivalent to `iperf3 -s`.
117    ///
118    /// Long-lived servers keep the high-level libiperf lock held until the
119    /// server exits. The library does not provide in-process cancellation for
120    /// an active libiperf server, and dropping [`RunningIperf`] detaches rather
121    /// than stops it. Use this only for a process dedicated to serving tests or
122    /// for a helper process that the parent application can terminate from the
123    /// outside.
124    pub fn server_unbounded() -> Self {
125        let mut command = Self::new();
126        command.arg("-s").allow_unbounded_server(true);
127        command
128    }
129
130    /// Override the program name passed as `argv[0]` to libiperf.
131    pub fn program(&mut self, program: impl Into<String>) -> &mut Self {
132        self.program = program.into();
133        self
134    }
135
136    /// Append one iperf argument.
137    pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
138        self.args.push(arg.into());
139        self
140    }
141
142    /// Append several iperf arguments.
143    pub fn args<I, S>(&mut self, args: I) -> &mut Self
144    where
145        I: IntoIterator<Item = S>,
146        S: Into<String>,
147    {
148        self.args.extend(args.into_iter().map(Into::into));
149        self
150    }
151
152    /// Set the server port with iperf's `-p` option.
153    pub fn port(&mut self, port: u16) -> &mut Self {
154        self.arg("-p").arg(port.to_string())
155    }
156
157    /// Set client test duration with iperf's `-t` option.
158    ///
159    /// Upstream iperf parses `-t` as whole seconds. Sub-second durations are
160    /// rounded up so the typed API does not silently truncate a nonzero
161    /// [`Duration`] to `0`.
162    pub fn duration(&mut self, duration: Duration) -> &mut Self {
163        self.arg("-t").arg(whole_seconds_arg(duration))
164    }
165
166    /// Set reporting interval with iperf's `-i` option.
167    pub fn report_interval(&mut self, interval: Duration) -> &mut Self {
168        self.arg("-i").arg(decimal_seconds_arg(interval))
169    }
170
171    /// Send iperf output to a log file with iperf's `--logfile` option.
172    ///
173    /// Explicit log files are honored even though high-level library runs are
174    /// quiet by default.
175    pub fn logfile(&mut self, path: impl AsRef<Path>) -> &mut Self {
176        self.arg("--logfile")
177            .arg(path.as_ref().to_string_lossy().into_owned())
178    }
179
180    /// Suppress libiperf's normal text or JSON writes to the process stdout.
181    ///
182    /// This is the default for `IperfCommand` so library use does not
183    /// unexpectedly write to the embedding application's terminal. Retained
184    /// JSON remains available through [`IperfResult::json_output`] when
185    /// [`IperfCommand::json`] is enabled.
186    pub fn quiet(&mut self) -> &mut Self {
187        self.suppress_output = true;
188        self
189    }
190
191    /// Let libiperf write normal output to this process' stdout.
192    ///
193    /// This matches upstream `iperf3` output behavior for applications that
194    /// intentionally want human-readable output from library runs.
195    pub fn inherit_output(&mut self) -> &mut Self {
196        self.suppress_output = false;
197        self
198    }
199
200    /// Set control connection setup timeout with iperf's `--connect-timeout`.
201    ///
202    /// Upstream iperf expects milliseconds. Nonzero sub-millisecond durations
203    /// are rounded up so intent is not lost.
204    pub fn connect_timeout(&mut self, timeout: Duration) -> &mut Self {
205        self.arg("--connect-timeout").arg(milliseconds_arg(timeout))
206    }
207
208    /// Omit pre-test statistics for the given duration with iperf's `-O`.
209    pub fn omit(&mut self, duration: Duration) -> &mut Self {
210        self.arg("-O").arg(decimal_seconds_arg(duration))
211    }
212
213    /// Bind to a local address or `address%device` with iperf's `-B`.
214    pub fn bind(&mut self, address: impl Into<String>) -> &mut Self {
215        self.arg("-B").arg(address)
216    }
217
218    /// Enable UDP mode with iperf's `-u` option.
219    pub fn udp(&mut self) -> &mut Self {
220        self.arg("-u")
221    }
222
223    /// Enable SCTP mode with iperf's `--sctp` option.
224    ///
225    /// This option is available only when the linked libiperf was built with
226    /// SCTP support; otherwise libiperf reports the unsupported option.
227    pub fn sctp(&mut self) -> &mut Self {
228        self.arg("--sctp")
229    }
230
231    /// Set target bitrate in bits per second with iperf's `-b` option.
232    pub fn bitrate_bits_per_second(&mut self, bits_per_second: u64) -> &mut Self {
233        self.arg("-b").arg(bits_per_second.to_string())
234    }
235
236    /// Set the number of parallel client streams with iperf's `-P` option.
237    pub fn parallel_streams(&mut self, streams: u16) -> &mut Self {
238        self.arg("-P").arg(streams.to_string())
239    }
240
241    /// Enable reverse mode with iperf's `-R` option.
242    pub fn reverse(&mut self) -> &mut Self {
243        self.arg("-R")
244    }
245
246    /// Enable bidirectional mode with iperf's `--bidir` option.
247    pub fn bidirectional(&mut self) -> &mut Self {
248        self.arg("--bidir")
249    }
250
251    /// Disable Nagle's algorithm with iperf's `-N` option.
252    pub fn no_delay(&mut self) -> &mut Self {
253        self.arg("-N")
254    }
255
256    /// Use zero-copy send with iperf's `-Z` option.
257    pub fn zerocopy(&mut self) -> &mut Self {
258        self.arg("-Z")
259    }
260
261    /// Set TCP congestion control algorithm with iperf's `-C` option.
262    ///
263    /// Upstream support depends on the operating system and linked libiperf
264    /// build; unsupported values are reported by libiperf when the command
265    /// parses or runs.
266    pub fn congestion_control(&mut self, algorithm: impl Into<String>) -> &mut Self {
267        self.arg("-C").arg(algorithm)
268    }
269
270    /// Request retained JSON output with iperf's `-J` option.
271    pub fn json(&mut self) -> &mut Self {
272        self.arg("-J")
273    }
274
275    /// Enable or disable callback metrics for this run.
276    ///
277    /// [`MetricsMode::Interval`] and [`MetricsMode::Window`] preserve every
278    /// emitted event. Callers that spawn a metrics stream must keep draining it
279    /// for the lifetime of the run. Blocking [`IperfCommand::run`] collects all
280    /// emitted events in memory before returning, so it is intended for bounded
281    /// client runs or one-off servers rather than long-lived servers.
282    pub fn metrics(&mut self, mode: MetricsMode) -> &mut Self {
283        self.metrics_mode = mode;
284        self
285    }
286
287    /// Push live metrics for this run directly to a Pushgateway.
288    ///
289    /// `MetricsMode::Interval` uses the same freshness-oriented queue as the
290    /// CLI's immediate push mode. `MetricsMode::Window(duration)` uses the same
291    /// aggregation behavior as `--push.interval`. `MetricsMode::Disabled` is
292    /// rejected when the command is started.
293    ///
294    /// Direct Pushgateway delivery and [`IperfCommand::spawn_with_metrics`] are
295    /// currently mutually exclusive for one run because libiperf exposes a
296    /// single reporter callback. Use [`IperfCommand::spawn_with_metrics`] plus
297    /// [`PushGateway::push`] or [`PushGateway::push_window`] when application
298    /// code needs both live inspection and custom push behavior.
299    ///
300    /// Delivery is best-effort: Pushgateway push/delete failures are logged to
301    /// stderr and do not make the iperf run fail. Applications that require
302    /// strict delivery should use [`IperfCommand::spawn_with_metrics`] and call
303    /// [`PushGateway::push`] or [`PushGateway::push_window`] themselves.
304    #[cfg(feature = "pushgateway")]
305    pub fn pushgateway(&mut self, config: PushGatewayConfig, mode: MetricsMode) -> &mut Self {
306        self.pushgateway = Some(PushGatewayRun { config, mode });
307        self
308    }
309
310    /// Disable direct Pushgateway delivery for this command.
311    #[cfg(feature = "pushgateway")]
312    pub fn clear_pushgateway(&mut self) -> &mut Self {
313        self.pushgateway = None;
314        self
315    }
316
317    /// Run the iperf test to completion while pushing metrics to Pushgateway.
318    ///
319    /// Pushgateway delivery uses the same best-effort policy as the CLI. This
320    /// method returns an error for setup or libiperf execution failures, but
321    /// transient push/delete failures are logged and do not change the result.
322    /// Use [`IperfCommand::spawn_with_metrics`] plus [`PushGateway::push`] or
323    /// [`PushGateway::push_window`] when delivery failures must be handled
324    /// strictly by application code.
325    #[cfg(feature = "pushgateway")]
326    pub fn run_with_pushgateway(
327        &self,
328        config: PushGatewayConfig,
329        mode: MetricsMode,
330    ) -> Result<IperfResult> {
331        let mut command = self.clone();
332        command.pushgateway = Some(PushGatewayRun { config, mode });
333        command.run()
334    }
335
336    /// Run iperf on a worker thread while pushing metrics to Pushgateway.
337    ///
338    /// Pushgateway delivery is best-effort for this convenience method; use
339    /// [`IperfCommand::spawn_with_metrics`] and call [`PushGateway`] directly
340    /// when the application must treat delivery errors as run failures.
341    #[cfg(feature = "pushgateway")]
342    pub fn spawn_with_pushgateway(
343        &self,
344        config: PushGatewayConfig,
345        mode: MetricsMode,
346    ) -> Result<RunningIperf> {
347        let mut command = self.clone();
348        command.pushgateway = Some(PushGatewayRun { config, mode });
349        command.spawn()
350    }
351
352    /// Allow `-s` server runs that do not include iperf's one-off option.
353    ///
354    /// Long-lived servers keep libiperf running on the worker thread and keep
355    /// this crate's process-wide libiperf lock held. The default is therefore
356    /// conservative: server mode must use `-1`/`--one-off` unless this opt-in is
357    /// set. The CLI does not use this high-level API, so normal `iperf3-rs -s`
358    /// behavior is unchanged. Once started, an unbounded in-process server must
359    /// exit through libiperf itself; this API intentionally does not expose an
360    /// in-process cancellation primitive. Prefer a dedicated helper process
361    /// whenever the owner must enforce an external timeout or stop policy.
362    pub fn allow_unbounded_server(&mut self, allow: bool) -> &mut Self {
363        self.allow_unbounded_server = allow;
364        self
365    }
366
367    /// Run the iperf test to completion and collect metric events in memory.
368    ///
369    /// When metrics are enabled, every emitted event is retained in the returned
370    /// [`IperfResult`]. Keep metrics disabled for long-running or unbounded
371    /// runs, or use [`IperfCommand::spawn_with_metrics`] and drain the stream
372    /// while the run is active.
373    pub fn run(&self) -> Result<IperfResult> {
374        run_command(self.clone(), None)
375    }
376
377    /// Run iperf on a worker thread and optionally stream metric events live.
378    ///
379    /// If metrics are enabled, call [`RunningIperf::take_metrics`] before
380    /// [`RunningIperf::wait`] to consume live events. Dropping the returned
381    /// handle detaches the worker and does not cancel the underlying libiperf
382    /// run. If you keep the metrics stream, drain it continuously for the
383    /// lifetime of the run; every-sample streams use unbounded queues so the
384    /// libiperf reporting callback never blocks on application code.
385    pub fn spawn(&self) -> Result<RunningIperf> {
386        let command = self.clone();
387        let (ready_tx, ready_rx) = bounded::<ReadyMessage>(1);
388        let handle = thread::spawn(move || run_command(command, Some(ready_tx)));
389
390        match ready_rx.recv() {
391            Ok(Ok(metrics)) => Ok(RunningIperf {
392                handle: Some(handle),
393                metrics,
394            }),
395            Ok(Err(err)) => {
396                let _ = handle.join();
397                Err(Error::worker(err))
398            }
399            Err(err) => {
400                let _ = handle.join();
401                Err(Error::worker(format!(
402                    "iperf worker exited before setup completed: {err}"
403                )))
404            }
405        }
406    }
407
408    /// Run iperf on a worker thread and return the live metric stream.
409    ///
410    /// This is a convenience wrapper around [`IperfCommand::metrics`],
411    /// [`IperfCommand::spawn`], and [`RunningIperf::take_metrics`] for callers
412    /// that know they want metrics for this run. The returned stream is part of
413    /// the run contract: drain it until it closes, or drop it if metrics are no
414    /// longer needed. Keeping it alive but unread can grow memory on long runs.
415    pub fn spawn_with_metrics(&self, mode: MetricsMode) -> Result<(RunningIperf, MetricsStream)> {
416        let mut command = self.clone();
417        command.metrics(mode);
418        let mut running = command.spawn()?;
419        let metrics = running
420            .take_metrics()
421            .ok_or_else(|| Error::internal("metrics stream was not created"))?;
422        Ok((running, metrics))
423    }
424
425    fn argv(&self) -> Vec<String> {
426        let mut argv = Vec::with_capacity(self.args.len() + 1);
427        argv.push(self.program.clone());
428        argv.extend(self.args.iter().cloned());
429        argv
430    }
431
432    fn should_suppress_output(&self) -> bool {
433        self.suppress_output && !self.has_logfile_arg()
434    }
435
436    fn has_logfile_arg(&self) -> bool {
437        self.args
438            .iter()
439            .any(|arg| arg == "--logfile" || arg.starts_with("--logfile="))
440    }
441}
442
443impl Default for IperfCommand {
444    fn default() -> Self {
445        Self::new()
446    }
447}
448
449/// Completed result from a blocking or spawned iperf run.
450#[derive(Debug)]
451pub struct IperfResult {
452    role: Role,
453    json_output: Option<String>,
454    metrics: Vec<MetricEvent>,
455}
456
457impl IperfResult {
458    /// Role selected by libiperf after parsing the supplied arguments.
459    pub fn role(&self) -> Role {
460        self.role
461    }
462
463    /// Upstream JSON result if JSON output was requested and libiperf retained it.
464    pub fn json_output(&self) -> Option<&str> {
465        self.json_output.as_deref()
466    }
467
468    /// Parse the retained upstream JSON result as a [`serde_json::Value`].
469    ///
470    /// Returns `None` when JSON output was not requested with
471    /// [`IperfCommand::json`]. The raw string remains available through
472    /// [`IperfResult::json_output`] for callers that prefer their own parser.
473    #[cfg(feature = "serde")]
474    pub fn json_value(&self) -> Option<std::result::Result<serde_json::Value, serde_json::Error>> {
475        self.json_output.as_deref().map(serde_json::from_str)
476    }
477
478    /// Metric events collected by `IperfCommand::run`.
479    ///
480    /// Spawned commands deliver live metrics through `RunningIperf` instead, so
481    /// their completed result does not duplicate the stream contents.
482    pub fn metrics(&self) -> &[MetricEvent] {
483        &self.metrics
484    }
485}
486
487/// Handle for an iperf run executing on a worker thread.
488///
489/// This handle observes completion; it does not own a safe cancellation
490/// mechanism for the underlying libiperf run. Dropping it detaches the worker.
491/// `try_wait`, `wait_timeout`, and `wait` only observe or join the worker; none
492/// of them requests libiperf shutdown. Use one-off server mode or a dedicated
493/// helper process when a run must be externally stopped, isolated from hangs, or
494/// allowed to coexist with other runs in the parent process.
495#[derive(Debug)]
496#[must_use = "dropping RunningIperf detaches the worker; call wait to observe the iperf result"]
497pub struct RunningIperf {
498    handle: Option<JoinHandle<Result<IperfResult>>>,
499    metrics: Option<MetricsStream>,
500}
501
502impl RunningIperf {
503    /// Borrow the live metric stream, if metrics were enabled.
504    pub fn metrics(&self) -> Option<&MetricsStream> {
505        self.metrics.as_ref()
506    }
507
508    /// Take ownership of the live metric stream.
509    pub fn take_metrics(&mut self) -> Option<MetricsStream> {
510        self.metrics.take()
511    }
512
513    /// Return `true` if the worker thread has finished.
514    pub fn is_finished(&self) -> bool {
515        self.handle
516            .as_ref()
517            .map(JoinHandle::is_finished)
518            .unwrap_or(true)
519    }
520
521    /// Return the result if the worker has finished, without blocking.
522    ///
523    /// After this returns `Ok(Some(_))`, the worker result has been consumed and
524    /// later calls to `try_wait`, `wait_timeout`, or `wait` will report that the
525    /// run was already observed.
526    pub fn try_wait(&mut self) -> Result<Option<IperfResult>> {
527        if !self.is_finished() {
528            return Ok(None);
529        }
530        self.take_finished_result().map(Some)
531    }
532
533    /// Wait up to `timeout` for the worker to finish.
534    ///
535    /// Returns `Ok(None)` when the timeout expires before the iperf run exits.
536    /// A zero timeout performs a single nonblocking poll. Timeout expiration
537    /// does not stop the iperf run; call this again, call [`RunningIperf::wait`],
538    /// or manage cancellation outside this in-process API.
539    pub fn wait_timeout(&mut self, timeout: Duration) -> Result<Option<IperfResult>> {
540        let deadline = Instant::now()
541            .checked_add(timeout)
542            .unwrap_or_else(Instant::now);
543        loop {
544            if self.is_finished() {
545                return self.take_finished_result().map(Some);
546            }
547            if timeout.is_zero() || Instant::now() >= deadline {
548                return Ok(None);
549            }
550            thread::sleep(
551                Duration::from_millis(10).min(deadline.saturating_duration_since(Instant::now())),
552            );
553        }
554    }
555
556    /// Wait until the iperf worker exits.
557    pub fn wait(mut self) -> Result<IperfResult> {
558        self.take_handle()?
559            .join()
560            .map_err(|_| Error::worker("iperf worker thread panicked"))?
561    }
562
563    fn take_finished_result(&mut self) -> Result<IperfResult> {
564        self.take_handle()?
565            .join()
566            .map_err(|_| Error::worker("iperf worker thread panicked"))?
567    }
568
569    fn take_handle(&mut self) -> Result<JoinHandle<Result<IperfResult>>> {
570        self.handle
571            .take()
572            .ok_or_else(|| Error::worker("iperf worker result was already observed"))
573    }
574}
575
576type ReadyMessage = std::result::Result<Option<MetricsStream>, String>;
577
578struct RunSetup {
579    test: IperfTest,
580    role: Role,
581    callback: Option<CallbackMetricsReporter>,
582    stream: Option<MetricsStream>,
583    worker: Option<JoinHandle<()>>,
584    #[cfg(feature = "pushgateway")]
585    push_reporter: Option<IntervalMetricsReporter>,
586}
587
588fn run_command(command: IperfCommand, ready: Option<Sender<ReadyMessage>>) -> Result<IperfResult> {
589    let _guard = run_lock()
590        .lock()
591        .map_err(|_| Error::internal("libiperf run lock is poisoned"))?;
592
593    let mut setup = match setup_run(command) {
594        Ok(setup) => setup,
595        Err(err) => {
596            notify_ready(ready, Err(format!("{err:#}")));
597            return Err(err);
598        }
599    };
600
601    let ready_stream = if ready.is_some() {
602        setup.stream.take()
603    } else {
604        None
605    };
606    notify_ready(ready, Ok(ready_stream));
607
608    let result = setup.test.run();
609    let json_output = setup.test.json_output();
610
611    // Removing the callback first closes the raw metrics channel, allowing the
612    // event worker to flush any final window and exit before the result returns.
613    drop(setup.callback.take());
614    if let Some(worker) = setup.worker.take() {
615        let _ = worker.join();
616    }
617    #[cfg(feature = "pushgateway")]
618    let push_result = setup
619        .push_reporter
620        .take()
621        .map(IntervalMetricsReporter::finish)
622        .transpose();
623
624    let metrics = setup
625        .stream
626        .map(|stream| stream.collect())
627        .unwrap_or_default();
628
629    result?;
630    #[cfg(feature = "pushgateway")]
631    push_result?;
632    Ok(IperfResult {
633        role: setup.role,
634        json_output,
635        metrics,
636    })
637}
638
639fn setup_run(command: IperfCommand) -> Result<RunSetup> {
640    validate_metrics_mode(command.metrics_mode)?;
641    #[cfg(feature = "pushgateway")]
642    validate_pushgateway_request(&command)?;
643
644    let mut test = IperfTest::new()?;
645    test.parse_arguments(&command.argv())?;
646    if command.should_suppress_output() {
647        test.suppress_output()?;
648    }
649    let role = test.role();
650    validate_server_lifecycle(&command, &test, role)?;
651
652    #[cfg(feature = "pushgateway")]
653    let (callback, stream, worker, push_reporter) =
654        if let Some(queue) = command.metrics_mode.callback_queue() {
655            let (callback, rx) = CallbackMetricsReporter::attach(&mut test, queue)?;
656            let (stream, worker) = metric_event_stream(rx, command.metrics_mode);
657            (Some(callback), Some(stream), Some(worker), None)
658        } else if let Some(pushgateway) = command.pushgateway {
659            let sink = PushGateway::new(pushgateway.config)?;
660            let reporter =
661                IntervalMetricsReporter::attach(&mut test, sink, pushgateway.mode.push_interval())?;
662            (None, None, None, Some(reporter))
663        } else {
664            (None, None, None, None)
665        };
666    #[cfg(not(feature = "pushgateway"))]
667    let (callback, stream, worker) = match command.metrics_mode.callback_queue() {
668        Some(queue) => {
669            let (callback, rx) = CallbackMetricsReporter::attach(&mut test, queue)?;
670            let (stream, worker) = metric_event_stream(rx, command.metrics_mode);
671            (Some(callback), Some(stream), Some(worker))
672        }
673        None => (None, None, None),
674    };
675
676    Ok(RunSetup {
677        test,
678        role,
679        callback,
680        stream,
681        worker,
682        #[cfg(feature = "pushgateway")]
683        push_reporter,
684    })
685}
686
687fn notify_ready(ready: Option<Sender<ReadyMessage>>, message: ReadyMessage) {
688    if let Some(ready) = ready {
689        let _ = ready.send(message);
690    }
691}
692
693fn run_lock() -> &'static Mutex<()> {
694    // libiperf still has process-global state, including its current error and
695    // signal/output hooks. The first public API keeps high-level runs
696    // serialized so callers do not accidentally depend on best-effort
697    // in-process parallelism that libiperf does not clearly promise.
698    //
699    // If parallel library runs become important, prefer adding a process-backed
700    // runner first. A helper process gives each libiperf instance its own
701    // globals, lets the parent enforce real kill/timeout policy, and avoids
702    // wedging later in-process runs behind a detached or hung worker. Removing
703    // this lock for true in-process concurrency should come only after upstream
704    // and shim state are audited and covered by stress tests.
705    RUN_LOCK.get_or_init(|| Mutex::new(()))
706}
707
708fn validate_metrics_mode(mode: MetricsMode) -> Result<()> {
709    if metrics_mode_is_valid(mode) {
710        Ok(())
711    } else {
712        Err(Error::invalid_metrics_mode(
713            "metrics window interval must be greater than zero",
714        ))
715    }
716}
717
718#[cfg(feature = "pushgateway")]
719fn validate_pushgateway_request(command: &IperfCommand) -> Result<()> {
720    let Some(pushgateway) = &command.pushgateway else {
721        return Ok(());
722    };
723    if command.metrics_mode.is_enabled() {
724        return Err(Error::invalid_argument(
725            "direct Pushgateway delivery cannot be combined with a MetricsStream in the same IperfCommand run",
726        ));
727    }
728    validate_pushgateway_mode(pushgateway.mode)
729}
730
731#[cfg(feature = "pushgateway")]
732fn validate_pushgateway_mode(mode: MetricsMode) -> Result<()> {
733    match mode {
734        MetricsMode::Disabled => Err(Error::invalid_metrics_mode(
735            "Pushgateway metrics mode must be Interval or Window",
736        )),
737        MetricsMode::Interval => Ok(()),
738        MetricsMode::Window(interval) if interval.is_zero() => Err(Error::invalid_metrics_mode(
739            "metrics window interval must be greater than zero",
740        )),
741        MetricsMode::Window(_) => Ok(()),
742    }
743}
744
745#[cfg(feature = "pushgateway")]
746impl MetricsMode {
747    fn push_interval(self) -> Option<Duration> {
748        match self {
749            MetricsMode::Disabled | MetricsMode::Interval => None,
750            MetricsMode::Window(interval) => Some(interval),
751        }
752    }
753}
754
755fn metrics_mode_is_valid(mode: MetricsMode) -> bool {
756    !matches!(mode, MetricsMode::Window(interval) if interval.is_zero())
757}
758
759fn whole_seconds_arg(duration: Duration) -> String {
760    let seconds = if duration.subsec_nanos() == 0 {
761        duration.as_secs()
762    } else {
763        duration.as_secs().saturating_add(1)
764    };
765    seconds.to_string()
766}
767
768fn decimal_seconds_arg(duration: Duration) -> String {
769    let seconds = duration.as_secs();
770    let nanos = duration.subsec_nanos();
771    if nanos == 0 {
772        return seconds.to_string();
773    }
774
775    let mut value = format!("{seconds}.{nanos:09}");
776    while value.ends_with('0') {
777        value.pop();
778    }
779    value
780}
781
782fn milliseconds_arg(duration: Duration) -> String {
783    let millis = duration.as_millis();
784    let has_fractional_millis = !duration.subsec_nanos().is_multiple_of(1_000_000);
785    if has_fractional_millis {
786        millis.saturating_add(1).to_string()
787    } else {
788        millis.to_string()
789    }
790}
791
792fn validate_server_lifecycle(command: &IperfCommand, test: &IperfTest, role: Role) -> Result<()> {
793    if role == Role::Server && !test.one_off() && !command.allow_unbounded_server {
794        return Err(Error::invalid_argument(
795            "IperfCommand server mode must use -1/--one-off or opt in with allow_unbounded_server(true)",
796        ));
797    }
798    Ok(())
799}
800
801#[cfg(kani)]
802mod verification {
803    use std::time::Duration;
804
805    use super::*;
806
807    #[kani::proof]
808    fn zero_window_interval_is_the_only_invalid_metrics_mode() {
809        let seconds: u8 = kani::any();
810        let mode = MetricsMode::Window(Duration::from_secs(u64::from(seconds)));
811
812        assert_eq!(metrics_mode_is_valid(mode), seconds != 0);
813        assert!(metrics_mode_is_valid(MetricsMode::Disabled));
814        assert!(metrics_mode_is_valid(MetricsMode::Interval));
815    }
816}
817
818#[cfg(test)]
819mod tests {
820    use std::time::Duration;
821
822    use crate::ErrorKind;
823    #[cfg(feature = "pushgateway")]
824    use url::Url;
825
826    use super::*;
827
828    #[test]
829    fn argv_includes_program_name_before_iperf_arguments() {
830        let mut command = IperfCommand::new();
831        command.arg("-c").arg("127.0.0.1");
832
833        assert_eq!(
834            command.argv(),
835            vec![
836                "iperf3-rs".to_owned(),
837                "-c".to_owned(),
838                "127.0.0.1".to_owned()
839            ]
840        );
841    }
842
843    #[test]
844    fn custom_program_name_is_used_as_argv_zero() {
845        let mut command = IperfCommand::new();
846        command.program("iperf3").arg("-v");
847
848        assert_eq!(command.argv(), vec!["iperf3".to_owned(), "-v".to_owned()]);
849    }
850
851    #[test]
852    fn library_output_is_quiet_by_default() {
853        let command = IperfCommand::new();
854
855        assert!(command.should_suppress_output());
856    }
857
858    #[test]
859    fn inherit_output_disables_library_quiet_default() {
860        let mut command = IperfCommand::new();
861        command.inherit_output();
862
863        assert!(!command.should_suppress_output());
864
865        command.quiet();
866        assert!(command.should_suppress_output());
867    }
868
869    #[test]
870    fn explicit_logfile_disables_null_output_sink() {
871        let mut typed = IperfCommand::new();
872        typed.logfile("iperf.log");
873
874        let mut raw_split = IperfCommand::new();
875        raw_split.arg("--logfile").arg("iperf.log");
876
877        let mut raw_equals = IperfCommand::new();
878        raw_equals.arg("--logfile=iperf.log");
879
880        assert!(!typed.should_suppress_output());
881        assert!(!raw_split.should_suppress_output());
882        assert!(!raw_equals.should_suppress_output());
883    }
884
885    #[test]
886    fn typed_client_builder_appends_iperf_arguments() {
887        let mut command = IperfCommand::client("192.0.2.10");
888        command
889            .port(5202)
890            .duration(Duration::from_secs(3))
891            .report_interval(Duration::from_millis(500))
892            .udp()
893            .bitrate_bits_per_second(1_000_000)
894            .parallel_streams(4)
895            .reverse()
896            .json()
897            .arg("--get-server-output");
898
899        assert_eq!(
900            command.argv(),
901            vec![
902                "iperf3-rs".to_owned(),
903                "-c".to_owned(),
904                "192.0.2.10".to_owned(),
905                "-p".to_owned(),
906                "5202".to_owned(),
907                "-t".to_owned(),
908                "3".to_owned(),
909                "-i".to_owned(),
910                "0.5".to_owned(),
911                "-u".to_owned(),
912                "-b".to_owned(),
913                "1000000".to_owned(),
914                "-P".to_owned(),
915                "4".to_owned(),
916                "-R".to_owned(),
917                "-J".to_owned(),
918                "--get-server-output".to_owned(),
919            ]
920        );
921    }
922
923    #[test]
924    fn typed_operational_helpers_append_iperf_arguments() {
925        let mut command = IperfCommand::client("192.0.2.10");
926        command
927            .logfile("iperf.log")
928            .connect_timeout(Duration::from_millis(1500))
929            .omit(Duration::from_millis(250))
930            .bind("127.0.0.1%lo0")
931            .no_delay()
932            .zerocopy()
933            .congestion_control("cubic");
934
935        assert_eq!(
936            command.argv(),
937            vec![
938                "iperf3-rs".to_owned(),
939                "-c".to_owned(),
940                "192.0.2.10".to_owned(),
941                "--logfile".to_owned(),
942                "iperf.log".to_owned(),
943                "--connect-timeout".to_owned(),
944                "1500".to_owned(),
945                "-O".to_owned(),
946                "0.25".to_owned(),
947                "-B".to_owned(),
948                "127.0.0.1%lo0".to_owned(),
949                "-N".to_owned(),
950                "-Z".to_owned(),
951                "-C".to_owned(),
952                "cubic".to_owned(),
953            ]
954        );
955    }
956
957    #[test]
958    fn typed_server_constructors_select_expected_lifecycle() {
959        let one_off = IperfCommand::server_once();
960        assert_eq!(
961            one_off.argv(),
962            vec!["iperf3-rs".to_owned(), "-s".to_owned(), "-1".to_owned()]
963        );
964        assert!(!one_off.allow_unbounded_server);
965
966        let unbounded = IperfCommand::server_unbounded();
967        assert_eq!(
968            unbounded.argv(),
969            vec!["iperf3-rs".to_owned(), "-s".to_owned()]
970        );
971        assert!(unbounded.allow_unbounded_server);
972    }
973
974    #[test]
975    fn bidirectional_helper_appends_long_option() {
976        let mut command = IperfCommand::client("192.0.2.10");
977        command.bidirectional();
978
979        assert_eq!(
980            command.argv(),
981            vec![
982                "iperf3-rs".to_owned(),
983                "-c".to_owned(),
984                "192.0.2.10".to_owned(),
985                "--bidir".to_owned()
986            ]
987        );
988    }
989
990    #[test]
991    fn sctp_helper_appends_long_option() {
992        let mut command = IperfCommand::client("192.0.2.10");
993        command.sctp();
994
995        assert_eq!(
996            command.argv(),
997            vec![
998                "iperf3-rs".to_owned(),
999                "-c".to_owned(),
1000                "192.0.2.10".to_owned(),
1001                "--sctp".to_owned()
1002            ]
1003        );
1004    }
1005
1006    #[cfg(feature = "pushgateway")]
1007    #[test]
1008    fn pushgateway_helper_records_delivery_config() {
1009        let config = PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap())
1010            .label("scenario", "library");
1011        let mut command = IperfCommand::client("192.0.2.10");
1012        command.pushgateway(config, MetricsMode::Window(Duration::from_secs(5)));
1013
1014        let pushgateway = command.pushgateway.as_ref().unwrap();
1015        assert_eq!(
1016            pushgateway.mode,
1017            MetricsMode::Window(Duration::from_secs(5))
1018        );
1019        assert_eq!(
1020            pushgateway.config.labels,
1021            [("scenario".to_owned(), "library".to_owned())]
1022        );
1023
1024        command.clear_pushgateway();
1025        assert!(command.pushgateway.is_none());
1026    }
1027
1028    #[cfg(feature = "pushgateway")]
1029    #[test]
1030    fn pushgateway_convenience_helpers_do_not_persist_config() {
1031        let mut command = IperfCommand::new();
1032        command.metrics(MetricsMode::Window(Duration::ZERO));
1033
1034        let result = command.run_with_pushgateway(
1035            PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
1036            MetricsMode::Interval,
1037        );
1038
1039        assert!(result.is_err());
1040        assert!(command.pushgateway.is_none());
1041    }
1042
1043    #[test]
1044    fn spawn_with_metrics_does_not_persist_metrics_mode() {
1045        let command = IperfCommand::new();
1046
1047        let err = command
1048            .spawn_with_metrics(MetricsMode::Window(Duration::ZERO))
1049            .unwrap_err();
1050
1051        assert!(err.to_string().contains("greater than zero"), "{err:#}");
1052        assert_eq!(command.metrics_mode, MetricsMode::Disabled);
1053    }
1054
1055    #[test]
1056    fn duration_helpers_preserve_nonzero_subsecond_intent() {
1057        assert_eq!(whole_seconds_arg(Duration::ZERO), "0");
1058        assert_eq!(whole_seconds_arg(Duration::from_millis(1)), "1");
1059        assert_eq!(whole_seconds_arg(Duration::from_millis(1500)), "2");
1060        assert_eq!(decimal_seconds_arg(Duration::ZERO), "0");
1061        assert_eq!(decimal_seconds_arg(Duration::from_millis(250)), "0.25");
1062        assert_eq!(decimal_seconds_arg(Duration::new(1, 1)), "1.000000001");
1063        assert_eq!(milliseconds_arg(Duration::ZERO), "0");
1064        assert_eq!(milliseconds_arg(Duration::from_nanos(1)), "1");
1065        assert_eq!(milliseconds_arg(Duration::from_millis(1500)), "1500");
1066        assert_eq!(milliseconds_arg(Duration::new(1, 1)), "1001");
1067    }
1068
1069    #[test]
1070    fn unbounded_server_mode_is_rejected_by_default() {
1071        let command = {
1072            let mut command = IperfCommand::new();
1073            command.arg("-s");
1074            command
1075        };
1076
1077        let err = match setup_run(command) {
1078            Ok(_) => panic!("unbounded server should be rejected"),
1079            Err(err) => err,
1080        };
1081        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
1082        assert!(err.to_string().contains("allow_unbounded_server"));
1083    }
1084
1085    #[test]
1086    fn one_off_server_mode_is_allowed() {
1087        let command = {
1088            let mut command = IperfCommand::new();
1089            command.args(["-s", "-1"]);
1090            command
1091        };
1092
1093        let setup = setup_run(command).unwrap();
1094        assert_eq!(setup.role, Role::Server);
1095    }
1096
1097    #[test]
1098    fn unbounded_server_mode_can_be_explicitly_allowed() {
1099        let command = {
1100            let mut command = IperfCommand::new();
1101            command.arg("-s").allow_unbounded_server(true);
1102            command
1103        };
1104
1105        let setup = setup_run(command).unwrap();
1106        assert_eq!(setup.role, Role::Server);
1107    }
1108
1109    #[test]
1110    fn zero_metrics_window_interval_is_rejected_before_running_iperf() {
1111        let mut command = IperfCommand::new();
1112        command.metrics(MetricsMode::Window(Duration::ZERO));
1113
1114        let err = command.run().unwrap_err();
1115        assert_eq!(err.kind(), ErrorKind::InvalidMetricsMode);
1116        assert!(err.to_string().contains("greater than zero"));
1117    }
1118
1119    #[cfg(feature = "pushgateway")]
1120    #[test]
1121    fn direct_pushgateway_rejects_disabled_or_zero_window_mode() {
1122        for mode in [MetricsMode::Disabled, MetricsMode::Window(Duration::ZERO)] {
1123            let command = {
1124                let mut command = IperfCommand::new();
1125                command.arg("-s").arg("-1").pushgateway(
1126                    PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
1127                    mode,
1128                );
1129                command
1130            };
1131
1132            let err = match setup_run(command) {
1133                Ok(_) => panic!("invalid Pushgateway mode should be rejected"),
1134                Err(err) => err,
1135            };
1136            assert_eq!(err.kind(), ErrorKind::InvalidMetricsMode);
1137        }
1138    }
1139
1140    #[cfg(feature = "pushgateway")]
1141    #[test]
1142    fn direct_pushgateway_is_rejected_when_metrics_stream_is_enabled() {
1143        let command = {
1144            let mut command = IperfCommand::new();
1145            command
1146                .arg("-s")
1147                .arg("-1")
1148                .metrics(MetricsMode::Interval)
1149                .pushgateway(
1150                    PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
1151                    MetricsMode::Interval,
1152                );
1153            command
1154        };
1155
1156        let err = match setup_run(command) {
1157            Ok(_) => panic!("direct Pushgateway and MetricsStream should be rejected together"),
1158            Err(err) => err,
1159        };
1160        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
1161        assert!(err.to_string().contains("cannot be combined"));
1162    }
1163
1164    #[test]
1165    fn running_iperf_try_wait_observes_finished_worker_once() {
1166        let mut running = RunningIperf {
1167            handle: Some(thread::spawn(|| Ok(test_result()))),
1168            metrics: None,
1169        };
1170
1171        let result = running
1172            .wait_timeout(Duration::from_secs(1))
1173            .unwrap()
1174            .expect("worker should finish");
1175        assert_eq!(result.role(), Role::Client);
1176        assert_eq!(running.try_wait().unwrap_err().kind(), ErrorKind::Worker);
1177    }
1178
1179    #[test]
1180    fn running_iperf_try_wait_returns_none_while_worker_is_running() {
1181        let (release_tx, release_rx) = bounded::<()>(1);
1182        let mut running = RunningIperf {
1183            handle: Some(thread::spawn(move || {
1184                release_rx.recv().unwrap();
1185                Ok(test_result())
1186            })),
1187            metrics: None,
1188        };
1189
1190        assert!(!running.is_finished());
1191        assert!(running.try_wait().unwrap().is_none());
1192        assert!(running.wait_timeout(Duration::ZERO).unwrap().is_none());
1193
1194        release_tx.send(()).unwrap();
1195        assert!(
1196            running
1197                .wait_timeout(Duration::from_secs(1))
1198                .unwrap()
1199                .is_some()
1200        );
1201    }
1202
1203    #[test]
1204    fn run_without_client_or_server_role_fails_fast() {
1205        let command = IperfCommand::new();
1206
1207        let err = command.run().unwrap_err();
1208        assert_eq!(err.kind(), ErrorKind::Libiperf);
1209        assert!(
1210            err.to_string().contains("client (-c) or server (-s)"),
1211            "{err:#}"
1212        );
1213    }
1214
1215    fn test_result() -> IperfResult {
1216        IperfResult {
1217            role: Role::Client,
1218            json_output: None,
1219            metrics: Vec::new(),
1220        }
1221    }
1222}