1use std::path::Path;
9use std::sync::{Mutex, OnceLock};
10use std::thread::{self, JoinHandle};
11use std::time::{Duration, Instant};
12
13use crossbeam_channel::{Sender, bounded};
14
15use crate::iperf::{IperfTest, Role};
16#[cfg(feature = "pushgateway")]
17use crate::metrics::IntervalMetricsReporter;
18use crate::metrics::{
19 CallbackMetricsReporter, MetricEvent, MetricsMode, MetricsStream, metric_event_stream,
20};
21#[cfg(feature = "pushgateway")]
22use crate::pushgateway::{PushGateway, PushGatewayConfig};
23use crate::{Error, Result};
24
/// Process-wide lock acquired by `run_command` for the full length of a run,
/// so runs started through this module execute one at a time.
///
/// Initialized lazily through `run_lock`.
static RUN_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
26
/// Builder describing a single iperf run: the argument vector handed to the
/// iperf argument parser plus library-level options that are not expressed as
/// iperf arguments.
#[derive(Debug, Clone)]
pub struct IperfCommand {
    /// Value placed first in the argument vector (`argv[0]`).
    program: String,
    /// iperf arguments that follow the program name, in insertion order.
    args: Vec<String>,
    /// Metrics mode validated during setup; decides whether a callback
    /// metrics reporter and stream are attached to the run.
    metrics_mode: MetricsMode,
    /// Optional direct Pushgateway delivery for the run.
    #[cfg(feature = "pushgateway")]
    pushgateway: Option<PushGatewayRun>,
    /// When `false`, a server-role run that is not one-off is rejected
    /// during setup.
    allow_unbounded_server: bool,
    /// When `true` (and no `--logfile` argument is present), setup asks the
    /// test to suppress its output.
    suppress_output: bool,
}
76
/// Pushgateway delivery settings recorded on an [`IperfCommand`].
#[cfg(feature = "pushgateway")]
#[derive(Debug, Clone)]
struct PushGatewayRun {
    /// Configuration used to construct the Pushgateway sink during setup.
    config: PushGatewayConfig,
    /// Delivery mode; validated to be `Interval` or a non-zero `Window`
    /// before the run starts.
    mode: MetricsMode,
}
83
84impl IperfCommand {
85 pub fn new() -> Self {
87 Self {
88 program: "iperf3-rs".to_owned(),
89 args: Vec::new(),
90 metrics_mode: MetricsMode::Disabled,
91 #[cfg(feature = "pushgateway")]
92 pushgateway: None,
93 allow_unbounded_server: false,
94 suppress_output: true,
95 }
96 }
97
98 pub fn client(host: impl Into<String>) -> Self {
100 let mut command = Self::new();
101 command.arg("-c").arg(host);
102 command
103 }
104
105 pub fn server_once() -> Self {
111 let mut command = Self::new();
112 command.args(["-s", "-1"]);
113 command
114 }
115
116 pub fn server_unbounded() -> Self {
125 let mut command = Self::new();
126 command.arg("-s").allow_unbounded_server(true);
127 command
128 }
129
130 pub fn program(&mut self, program: impl Into<String>) -> &mut Self {
132 self.program = program.into();
133 self
134 }
135
136 pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
138 self.args.push(arg.into());
139 self
140 }
141
142 pub fn args<I, S>(&mut self, args: I) -> &mut Self
144 where
145 I: IntoIterator<Item = S>,
146 S: Into<String>,
147 {
148 self.args.extend(args.into_iter().map(Into::into));
149 self
150 }
151
152 pub fn port(&mut self, port: u16) -> &mut Self {
154 self.arg("-p").arg(port.to_string())
155 }
156
157 pub fn duration(&mut self, duration: Duration) -> &mut Self {
163 self.arg("-t").arg(whole_seconds_arg(duration))
164 }
165
166 pub fn report_interval(&mut self, interval: Duration) -> &mut Self {
168 self.arg("-i").arg(decimal_seconds_arg(interval))
169 }
170
171 pub fn logfile(&mut self, path: impl AsRef<Path>) -> &mut Self {
176 self.arg("--logfile")
177 .arg(path.as_ref().to_string_lossy().into_owned())
178 }
179
180 pub fn quiet(&mut self) -> &mut Self {
187 self.suppress_output = true;
188 self
189 }
190
191 pub fn inherit_output(&mut self) -> &mut Self {
196 self.suppress_output = false;
197 self
198 }
199
200 pub fn connect_timeout(&mut self, timeout: Duration) -> &mut Self {
205 self.arg("--connect-timeout").arg(milliseconds_arg(timeout))
206 }
207
208 pub fn omit(&mut self, duration: Duration) -> &mut Self {
210 self.arg("-O").arg(decimal_seconds_arg(duration))
211 }
212
213 pub fn bind(&mut self, address: impl Into<String>) -> &mut Self {
215 self.arg("-B").arg(address)
216 }
217
218 pub fn udp(&mut self) -> &mut Self {
220 self.arg("-u")
221 }
222
223 pub fn sctp(&mut self) -> &mut Self {
228 self.arg("--sctp")
229 }
230
231 pub fn bitrate_bits_per_second(&mut self, bits_per_second: u64) -> &mut Self {
233 self.arg("-b").arg(bits_per_second.to_string())
234 }
235
236 pub fn parallel_streams(&mut self, streams: u16) -> &mut Self {
238 self.arg("-P").arg(streams.to_string())
239 }
240
241 pub fn reverse(&mut self) -> &mut Self {
243 self.arg("-R")
244 }
245
246 pub fn bidirectional(&mut self) -> &mut Self {
248 self.arg("--bidir")
249 }
250
251 pub fn no_delay(&mut self) -> &mut Self {
253 self.arg("-N")
254 }
255
256 pub fn zerocopy(&mut self) -> &mut Self {
258 self.arg("-Z")
259 }
260
261 pub fn congestion_control(&mut self, algorithm: impl Into<String>) -> &mut Self {
267 self.arg("-C").arg(algorithm)
268 }
269
270 pub fn json(&mut self) -> &mut Self {
272 self.arg("-J")
273 }
274
275 pub fn metrics(&mut self, mode: MetricsMode) -> &mut Self {
283 self.metrics_mode = mode;
284 self
285 }
286
287 #[cfg(feature = "pushgateway")]
305 pub fn pushgateway(&mut self, config: PushGatewayConfig, mode: MetricsMode) -> &mut Self {
306 self.pushgateway = Some(PushGatewayRun { config, mode });
307 self
308 }
309
310 #[cfg(feature = "pushgateway")]
312 pub fn clear_pushgateway(&mut self) -> &mut Self {
313 self.pushgateway = None;
314 self
315 }
316
317 #[cfg(feature = "pushgateway")]
326 pub fn run_with_pushgateway(
327 &self,
328 config: PushGatewayConfig,
329 mode: MetricsMode,
330 ) -> Result<IperfResult> {
331 let mut command = self.clone();
332 command.pushgateway = Some(PushGatewayRun { config, mode });
333 command.run()
334 }
335
336 #[cfg(feature = "pushgateway")]
342 pub fn spawn_with_pushgateway(
343 &self,
344 config: PushGatewayConfig,
345 mode: MetricsMode,
346 ) -> Result<RunningIperf> {
347 let mut command = self.clone();
348 command.pushgateway = Some(PushGatewayRun { config, mode });
349 command.spawn()
350 }
351
352 pub fn allow_unbounded_server(&mut self, allow: bool) -> &mut Self {
363 self.allow_unbounded_server = allow;
364 self
365 }
366
367 pub fn run(&self) -> Result<IperfResult> {
374 run_command(self.clone(), None)
375 }
376
377 pub fn spawn(&self) -> Result<RunningIperf> {
386 let command = self.clone();
387 let (ready_tx, ready_rx) = bounded::<ReadyMessage>(1);
388 let handle = thread::spawn(move || run_command(command, Some(ready_tx)));
389
390 match ready_rx.recv() {
391 Ok(Ok(metrics)) => Ok(RunningIperf {
392 handle: Some(handle),
393 metrics,
394 }),
395 Ok(Err(err)) => {
396 let _ = handle.join();
397 Err(Error::worker(err))
398 }
399 Err(err) => {
400 let _ = handle.join();
401 Err(Error::worker(format!(
402 "iperf worker exited before setup completed: {err}"
403 )))
404 }
405 }
406 }
407
408 pub fn spawn_with_metrics(&self, mode: MetricsMode) -> Result<(RunningIperf, MetricsStream)> {
416 let mut command = self.clone();
417 command.metrics(mode);
418 let mut running = command.spawn()?;
419 let metrics = running
420 .take_metrics()
421 .ok_or_else(|| Error::internal("metrics stream was not created"))?;
422 Ok((running, metrics))
423 }
424
425 fn argv(&self) -> Vec<String> {
426 let mut argv = Vec::with_capacity(self.args.len() + 1);
427 argv.push(self.program.clone());
428 argv.extend(self.args.iter().cloned());
429 argv
430 }
431
432 fn should_suppress_output(&self) -> bool {
433 self.suppress_output && !self.has_logfile_arg()
434 }
435
436 fn has_logfile_arg(&self) -> bool {
437 self.args
438 .iter()
439 .any(|arg| arg == "--logfile" || arg.starts_with("--logfile="))
440 }
441}
442
443impl Default for IperfCommand {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
/// Outcome of a completed iperf run.
#[derive(Debug)]
pub struct IperfResult {
    /// Role the test ran in, as reported by the test after argument parsing.
    role: Role,
    /// JSON output retrieved from the test after it ran, if any.
    json_output: Option<String>,
    /// Metric events collected from the run's stream; empty when no stream
    /// was kept by the run (for example when it was handed to the spawner).
    metrics: Vec<MetricEvent>,
}
456
457impl IperfResult {
458 pub fn role(&self) -> Role {
460 self.role
461 }
462
463 pub fn json_output(&self) -> Option<&str> {
465 self.json_output.as_deref()
466 }
467
468 #[cfg(feature = "serde")]
474 pub fn json_value(&self) -> Option<std::result::Result<serde_json::Value, serde_json::Error>> {
475 self.json_output.as_deref().map(serde_json::from_str)
476 }
477
478 pub fn metrics(&self) -> &[MetricEvent] {
483 &self.metrics
484 }
485}
486
/// Handle to an iperf run executing on a worker thread.
#[derive(Debug)]
#[must_use = "dropping RunningIperf detaches the worker; call wait to observe the iperf result"]
pub struct RunningIperf {
    /// Worker join handle; `None` once the result has been observed.
    handle: Option<JoinHandle<Result<IperfResult>>>,
    /// Metrics stream handed over by the worker after setup, if any.
    metrics: Option<MetricsStream>,
}
501
502impl RunningIperf {
503 pub fn metrics(&self) -> Option<&MetricsStream> {
505 self.metrics.as_ref()
506 }
507
508 pub fn take_metrics(&mut self) -> Option<MetricsStream> {
510 self.metrics.take()
511 }
512
513 pub fn is_finished(&self) -> bool {
515 self.handle
516 .as_ref()
517 .map(JoinHandle::is_finished)
518 .unwrap_or(true)
519 }
520
521 pub fn try_wait(&mut self) -> Result<Option<IperfResult>> {
527 if !self.is_finished() {
528 return Ok(None);
529 }
530 self.take_finished_result().map(Some)
531 }
532
533 pub fn wait_timeout(&mut self, timeout: Duration) -> Result<Option<IperfResult>> {
540 let deadline = Instant::now()
541 .checked_add(timeout)
542 .unwrap_or_else(Instant::now);
543 loop {
544 if self.is_finished() {
545 return self.take_finished_result().map(Some);
546 }
547 if timeout.is_zero() || Instant::now() >= deadline {
548 return Ok(None);
549 }
550 thread::sleep(
551 Duration::from_millis(10).min(deadline.saturating_duration_since(Instant::now())),
552 );
553 }
554 }
555
556 pub fn wait(mut self) -> Result<IperfResult> {
558 self.take_handle()?
559 .join()
560 .map_err(|_| Error::worker("iperf worker thread panicked"))?
561 }
562
563 fn take_finished_result(&mut self) -> Result<IperfResult> {
564 self.take_handle()?
565 .join()
566 .map_err(|_| Error::worker("iperf worker thread panicked"))?
567 }
568
569 fn take_handle(&mut self) -> Result<JoinHandle<Result<IperfResult>>> {
570 self.handle
571 .take()
572 .ok_or_else(|| Error::worker("iperf worker result was already observed"))
573 }
574}
575
/// Message a spawned worker sends once setup has finished: the metrics stream
/// (if one was created) on success, or the formatted setup error on failure.
type ReadyMessage = std::result::Result<Option<MetricsStream>, String>;
577
/// Everything `setup_run` prepares before the test is executed.
struct RunSetup {
    /// The configured test, ready to run.
    test: IperfTest,
    /// Role reported by the test after argument parsing.
    role: Role,
    /// Callback metrics reporter attached to the test, when metrics are on.
    callback: Option<CallbackMetricsReporter>,
    /// Stream of metric events produced for this run, when metrics are on.
    stream: Option<MetricsStream>,
    /// Thread returned alongside the metrics stream; joined after the run.
    worker: Option<JoinHandle<()>>,
    /// Interval reporter delivering to Pushgateway, when configured.
    #[cfg(feature = "pushgateway")]
    push_reporter: Option<IntervalMetricsReporter>,
}
587
588fn run_command(command: IperfCommand, ready: Option<Sender<ReadyMessage>>) -> Result<IperfResult> {
589 let _guard = run_lock()
590 .lock()
591 .map_err(|_| Error::internal("libiperf run lock is poisoned"))?;
592
593 let mut setup = match setup_run(command) {
594 Ok(setup) => setup,
595 Err(err) => {
596 notify_ready(ready, Err(format!("{err:#}")));
597 return Err(err);
598 }
599 };
600
601 let ready_stream = if ready.is_some() {
602 setup.stream.take()
603 } else {
604 None
605 };
606 notify_ready(ready, Ok(ready_stream));
607
608 let result = setup.test.run();
609 let json_output = setup.test.json_output();
610
611 drop(setup.callback.take());
614 if let Some(worker) = setup.worker.take() {
615 let _ = worker.join();
616 }
617 #[cfg(feature = "pushgateway")]
618 let push_result = setup
619 .push_reporter
620 .take()
621 .map(IntervalMetricsReporter::finish)
622 .transpose();
623
624 let metrics = setup
625 .stream
626 .map(|stream| stream.collect())
627 .unwrap_or_default();
628
629 result?;
630 #[cfg(feature = "pushgateway")]
631 push_result?;
632 Ok(IperfResult {
633 role: setup.role,
634 json_output,
635 metrics,
636 })
637}
638
639fn setup_run(command: IperfCommand) -> Result<RunSetup> {
640 validate_metrics_mode(command.metrics_mode)?;
641 #[cfg(feature = "pushgateway")]
642 validate_pushgateway_request(&command)?;
643
644 let mut test = IperfTest::new()?;
645 test.parse_arguments(&command.argv())?;
646 if command.should_suppress_output() {
647 test.suppress_output()?;
648 }
649 let role = test.role();
650 validate_server_lifecycle(&command, &test, role)?;
651
652 #[cfg(feature = "pushgateway")]
653 let (callback, stream, worker, push_reporter) =
654 if let Some(queue) = command.metrics_mode.callback_queue() {
655 let (callback, rx) = CallbackMetricsReporter::attach(&mut test, queue)?;
656 let (stream, worker) = metric_event_stream(rx, command.metrics_mode);
657 (Some(callback), Some(stream), Some(worker), None)
658 } else if let Some(pushgateway) = command.pushgateway {
659 let sink = PushGateway::new(pushgateway.config)?;
660 let reporter =
661 IntervalMetricsReporter::attach(&mut test, sink, pushgateway.mode.push_interval())?;
662 (None, None, None, Some(reporter))
663 } else {
664 (None, None, None, None)
665 };
666 #[cfg(not(feature = "pushgateway"))]
667 let (callback, stream, worker) = match command.metrics_mode.callback_queue() {
668 Some(queue) => {
669 let (callback, rx) = CallbackMetricsReporter::attach(&mut test, queue)?;
670 let (stream, worker) = metric_event_stream(rx, command.metrics_mode);
671 (Some(callback), Some(stream), Some(worker))
672 }
673 None => (None, None, None),
674 };
675
676 Ok(RunSetup {
677 test,
678 role,
679 callback,
680 stream,
681 worker,
682 #[cfg(feature = "pushgateway")]
683 push_reporter,
684 })
685}
686
687fn notify_ready(ready: Option<Sender<ReadyMessage>>, message: ReadyMessage) {
688 if let Some(ready) = ready {
689 let _ = ready.send(message);
690 }
691}
692
693fn run_lock() -> &'static Mutex<()> {
694 RUN_LOCK.get_or_init(|| Mutex::new(()))
706}
707
708fn validate_metrics_mode(mode: MetricsMode) -> Result<()> {
709 if metrics_mode_is_valid(mode) {
710 Ok(())
711 } else {
712 Err(Error::invalid_metrics_mode(
713 "metrics window interval must be greater than zero",
714 ))
715 }
716}
717
/// Checks the command's Pushgateway request, if it has one.
///
/// A command without Pushgateway settings is always accepted. Otherwise the
/// command must not also enable a metrics stream, and the Pushgateway mode
/// must itself be valid.
#[cfg(feature = "pushgateway")]
fn validate_pushgateway_request(command: &IperfCommand) -> Result<()> {
    match &command.pushgateway {
        None => Ok(()),
        Some(_) if command.metrics_mode.is_enabled() => Err(Error::invalid_argument(
            "direct Pushgateway delivery cannot be combined with a MetricsStream in the same IperfCommand run",
        )),
        Some(pushgateway) => validate_pushgateway_mode(pushgateway.mode),
    }
}
730
/// Accepts only `Interval` or a `Window` with a non-zero interval as a
/// Pushgateway delivery mode.
#[cfg(feature = "pushgateway")]
fn validate_pushgateway_mode(mode: MetricsMode) -> Result<()> {
    let problem = match mode {
        MetricsMode::Interval => return Ok(()),
        MetricsMode::Window(interval) if !interval.is_zero() => return Ok(()),
        MetricsMode::Window(_) => "metrics window interval must be greater than zero",
        MetricsMode::Disabled => "Pushgateway metrics mode must be Interval or Window",
    };
    Err(Error::invalid_metrics_mode(problem))
}
744
#[cfg(feature = "pushgateway")]
impl MetricsMode {
    /// Returns the window interval for `Window` mode and `None` for every
    /// other mode.
    fn push_interval(self) -> Option<Duration> {
        if let MetricsMode::Window(interval) = self {
            Some(interval)
        } else {
            None
        }
    }
}
754
755fn metrics_mode_is_valid(mode: MetricsMode) -> bool {
756 !matches!(mode, MetricsMode::Window(interval) if interval.is_zero())
757}
758
/// Formats `duration` as a whole number of seconds, rounding any sub-second
/// remainder up (saturating at `u64::MAX`).
fn whole_seconds_arg(duration: Duration) -> String {
    let round_up = u64::from(duration.subsec_nanos() != 0);
    duration.as_secs().saturating_add(round_up).to_string()
}
767
/// Formats `duration` as decimal seconds with nanosecond precision and no
/// trailing zeros; whole seconds are printed without a decimal point.
fn decimal_seconds_arg(duration: Duration) -> String {
    match duration.subsec_nanos() {
        0 => duration.as_secs().to_string(),
        nanos => {
            let padded = format!("{}.{nanos:09}", duration.as_secs());
            // `nanos` is non-zero, so trimming stops at a non-zero fractional
            // digit and never reaches the decimal point.
            padded.trim_end_matches('0').to_owned()
        }
    }
}
781
/// Formats `duration` as a whole number of milliseconds, rounding any
/// fractional millisecond up.
fn milliseconds_arg(duration: Duration) -> String {
    // Ceiling division on total nanoseconds equals "whole millis, plus one
    // when a fractional millisecond remains".
    duration.as_nanos().div_ceil(1_000_000).to_string()
}
791
792fn validate_server_lifecycle(command: &IperfCommand, test: &IperfTest, role: Role) -> Result<()> {
793 if role == Role::Server && !test.one_off() && !command.allow_unbounded_server {
794 return Err(Error::invalid_argument(
795 "IperfCommand server mode must use -1/--one-off or opt in with allow_unbounded_server(true)",
796 ));
797 }
798 Ok(())
799}
800
#[cfg(kani)]
mod verification {
    use std::time::Duration;

    use super::*;

    /// Proves that a window mode is valid exactly when its interval is
    /// non-zero, and that the non-window modes are always valid.
    #[kani::proof]
    fn zero_window_interval_is_the_only_invalid_metrics_mode() {
        let seconds: u8 = kani::any();
        let window = Duration::from_secs(seconds.into());

        assert_eq!(
            metrics_mode_is_valid(MetricsMode::Window(window)),
            seconds != 0
        );
        for mode in [MetricsMode::Disabled, MetricsMode::Interval] {
            assert!(metrics_mode_is_valid(mode));
        }
    }
}
817
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::ErrorKind;
    #[cfg(feature = "pushgateway")]
    use url::Url;

    use super::*;

    /// Minimal successful result used by the `RunningIperf` tests.
    fn test_result() -> IperfResult {
        IperfResult {
            role: Role::Client,
            json_output: None,
            metrics: Vec::new(),
        }
    }

    #[test]
    fn argv_includes_program_name_before_iperf_arguments() {
        let mut command = IperfCommand::new();
        command.arg("-c").arg("127.0.0.1");

        assert_eq!(command.argv(), ["iperf3-rs", "-c", "127.0.0.1"]);
    }

    #[test]
    fn custom_program_name_is_used_as_argv_zero() {
        let mut command = IperfCommand::new();
        command.program("iperf3").arg("-v");

        assert_eq!(command.argv(), ["iperf3", "-v"]);
    }

    #[test]
    fn library_output_is_quiet_by_default() {
        assert!(IperfCommand::new().should_suppress_output());
    }

    #[test]
    fn inherit_output_disables_library_quiet_default() {
        let mut command = IperfCommand::new();

        command.inherit_output();
        assert!(!command.should_suppress_output());

        command.quiet();
        assert!(command.should_suppress_output());
    }

    #[test]
    fn explicit_logfile_disables_null_output_sink() {
        let mut typed = IperfCommand::new();
        typed.logfile("iperf.log");
        assert!(!typed.should_suppress_output());

        let mut raw_split = IperfCommand::new();
        raw_split.arg("--logfile").arg("iperf.log");
        assert!(!raw_split.should_suppress_output());

        let mut raw_equals = IperfCommand::new();
        raw_equals.arg("--logfile=iperf.log");
        assert!(!raw_equals.should_suppress_output());
    }

    #[test]
    fn typed_client_builder_appends_iperf_arguments() {
        let mut command = IperfCommand::client("192.0.2.10");
        command
            .port(5202)
            .duration(Duration::from_secs(3))
            .report_interval(Duration::from_millis(500))
            .udp()
            .bitrate_bits_per_second(1_000_000)
            .parallel_streams(4)
            .reverse()
            .json()
            .arg("--get-server-output");

        assert_eq!(
            command.argv(),
            [
                "iperf3-rs",
                "-c",
                "192.0.2.10",
                "-p",
                "5202",
                "-t",
                "3",
                "-i",
                "0.5",
                "-u",
                "-b",
                "1000000",
                "-P",
                "4",
                "-R",
                "-J",
                "--get-server-output",
            ]
        );
    }

    #[test]
    fn typed_operational_helpers_append_iperf_arguments() {
        let mut command = IperfCommand::client("192.0.2.10");
        command
            .logfile("iperf.log")
            .connect_timeout(Duration::from_millis(1500))
            .omit(Duration::from_millis(250))
            .bind("127.0.0.1%lo0")
            .no_delay()
            .zerocopy()
            .congestion_control("cubic");

        assert_eq!(
            command.argv(),
            [
                "iperf3-rs",
                "-c",
                "192.0.2.10",
                "--logfile",
                "iperf.log",
                "--connect-timeout",
                "1500",
                "-O",
                "0.25",
                "-B",
                "127.0.0.1%lo0",
                "-N",
                "-Z",
                "-C",
                "cubic",
            ]
        );
    }

    #[test]
    fn typed_server_constructors_select_expected_lifecycle() {
        let one_off = IperfCommand::server_once();
        assert_eq!(one_off.argv(), ["iperf3-rs", "-s", "-1"]);
        assert!(!one_off.allow_unbounded_server);

        let unbounded = IperfCommand::server_unbounded();
        assert_eq!(unbounded.argv(), ["iperf3-rs", "-s"]);
        assert!(unbounded.allow_unbounded_server);
    }

    #[test]
    fn bidirectional_helper_appends_long_option() {
        let mut command = IperfCommand::client("192.0.2.10");
        command.bidirectional();

        assert_eq!(command.argv(), ["iperf3-rs", "-c", "192.0.2.10", "--bidir"]);
    }

    #[test]
    fn sctp_helper_appends_long_option() {
        let mut command = IperfCommand::client("192.0.2.10");
        command.sctp();

        assert_eq!(command.argv(), ["iperf3-rs", "-c", "192.0.2.10", "--sctp"]);
    }

    #[cfg(feature = "pushgateway")]
    #[test]
    fn pushgateway_helper_records_delivery_config() {
        let window = MetricsMode::Window(Duration::from_secs(5));
        let config = PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap())
            .label("scenario", "library");
        let mut command = IperfCommand::client("192.0.2.10");
        command.pushgateway(config, window);

        let recorded = command.pushgateway.as_ref().unwrap();
        assert_eq!(recorded.mode, window);
        assert_eq!(
            recorded.config.labels,
            [("scenario".to_owned(), "library".to_owned())]
        );

        command.clear_pushgateway();
        assert!(command.pushgateway.is_none());
    }

    #[cfg(feature = "pushgateway")]
    #[test]
    fn pushgateway_convenience_helpers_do_not_persist_config() {
        let mut command = IperfCommand::new();
        command.metrics(MetricsMode::Window(Duration::ZERO));

        let config = PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap());
        let result = command.run_with_pushgateway(config, MetricsMode::Interval);

        assert!(result.is_err());
        assert!(command.pushgateway.is_none());
    }

    #[test]
    fn spawn_with_metrics_does_not_persist_metrics_mode() {
        let command = IperfCommand::new();

        let err = command
            .spawn_with_metrics(MetricsMode::Window(Duration::ZERO))
            .unwrap_err();

        assert!(err.to_string().contains("greater than zero"), "{err:#}");
        assert_eq!(command.metrics_mode, MetricsMode::Disabled);
    }

    #[test]
    fn duration_helpers_preserve_nonzero_subsecond_intent() {
        assert_eq!(whole_seconds_arg(Duration::ZERO), "0");
        assert_eq!(whole_seconds_arg(Duration::from_millis(1)), "1");
        assert_eq!(whole_seconds_arg(Duration::from_millis(1500)), "2");

        assert_eq!(decimal_seconds_arg(Duration::ZERO), "0");
        assert_eq!(decimal_seconds_arg(Duration::from_millis(250)), "0.25");
        assert_eq!(decimal_seconds_arg(Duration::new(1, 1)), "1.000000001");

        assert_eq!(milliseconds_arg(Duration::ZERO), "0");
        assert_eq!(milliseconds_arg(Duration::from_nanos(1)), "1");
        assert_eq!(milliseconds_arg(Duration::from_millis(1500)), "1500");
        assert_eq!(milliseconds_arg(Duration::new(1, 1)), "1001");
    }

    #[test]
    fn unbounded_server_mode_is_rejected_by_default() {
        let mut command = IperfCommand::new();
        command.arg("-s");

        let Err(err) = setup_run(command) else {
            panic!("unbounded server should be rejected");
        };
        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
        assert!(err.to_string().contains("allow_unbounded_server"));
    }

    #[test]
    fn one_off_server_mode_is_allowed() {
        let mut command = IperfCommand::new();
        command.args(["-s", "-1"]);

        let setup = setup_run(command).unwrap();
        assert_eq!(setup.role, Role::Server);
    }

    #[test]
    fn unbounded_server_mode_can_be_explicitly_allowed() {
        let mut command = IperfCommand::new();
        command.arg("-s").allow_unbounded_server(true);

        let setup = setup_run(command).unwrap();
        assert_eq!(setup.role, Role::Server);
    }

    #[test]
    fn zero_metrics_window_interval_is_rejected_before_running_iperf() {
        let mut command = IperfCommand::new();
        command.metrics(MetricsMode::Window(Duration::ZERO));

        let err = command.run().unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidMetricsMode);
        assert!(err.to_string().contains("greater than zero"));
    }

    #[cfg(feature = "pushgateway")]
    #[test]
    fn direct_pushgateway_rejects_disabled_or_zero_window_mode() {
        for mode in [MetricsMode::Disabled, MetricsMode::Window(Duration::ZERO)] {
            let mut command = IperfCommand::new();
            command.arg("-s").arg("-1").pushgateway(
                PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
                mode,
            );

            let Err(err) = setup_run(command) else {
                panic!("invalid Pushgateway mode should be rejected");
            };
            assert_eq!(err.kind(), ErrorKind::InvalidMetricsMode);
        }
    }

    #[cfg(feature = "pushgateway")]
    #[test]
    fn direct_pushgateway_is_rejected_when_metrics_stream_is_enabled() {
        let mut command = IperfCommand::new();
        command
            .arg("-s")
            .arg("-1")
            .metrics(MetricsMode::Interval)
            .pushgateway(
                PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
                MetricsMode::Interval,
            );

        let Err(err) = setup_run(command) else {
            panic!("direct Pushgateway and MetricsStream should be rejected together");
        };
        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
        assert!(err.to_string().contains("cannot be combined"));
    }

    #[test]
    fn running_iperf_try_wait_observes_finished_worker_once() {
        let worker = thread::spawn(|| Ok(test_result()));
        let mut running = RunningIperf {
            handle: Some(worker),
            metrics: None,
        };

        let result = running
            .wait_timeout(Duration::from_secs(1))
            .unwrap()
            .expect("worker should finish");
        assert_eq!(result.role(), Role::Client);

        // The result can be observed only once.
        let second = running.try_wait().unwrap_err();
        assert_eq!(second.kind(), ErrorKind::Worker);
    }

    #[test]
    fn running_iperf_try_wait_returns_none_while_worker_is_running() {
        let (release_tx, release_rx) = bounded::<()>(1);
        let worker = thread::spawn(move || {
            release_rx.recv().unwrap();
            Ok(test_result())
        });
        let mut running = RunningIperf {
            handle: Some(worker),
            metrics: None,
        };

        assert!(!running.is_finished());
        assert!(running.try_wait().unwrap().is_none());
        assert!(running.wait_timeout(Duration::ZERO).unwrap().is_none());

        release_tx.send(()).unwrap();
        let finished = running.wait_timeout(Duration::from_secs(1)).unwrap();
        assert!(finished.is_some());
    }

    #[test]
    fn run_without_client_or_server_role_fails_fast() {
        let err = IperfCommand::new().run().unwrap_err();

        assert_eq!(err.kind(), ErrorKind::Libiperf);
        assert!(
            err.to_string().contains("client (-c) or server (-s)"),
            "{err:#}"
        );
    }
}