use std::path::Path;
use std::sync::{Mutex, OnceLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crossbeam_channel::{Sender, bounded};
use crate::iperf::{IperfTest, Role};
#[cfg(feature = "pushgateway")]
use crate::metrics::IntervalMetricsReporter;
use crate::metrics::{
CallbackMetricsReporter, MetricEvent, MetricsMode, MetricsStream, metric_event_stream,
};
#[cfg(feature = "pushgateway")]
use crate::pushgateway::{PushGateway, PushGatewayConfig};
use crate::{Error, Result};
static RUN_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
#[derive(Debug, Clone)]
pub struct IperfCommand {
program: String,
args: Vec<String>,
metrics_mode: MetricsMode,
#[cfg(feature = "pushgateway")]
pushgateway: Option<PushGatewayRun>,
allow_unbounded_server: bool,
suppress_output: bool,
}
#[cfg(feature = "pushgateway")]
#[derive(Debug, Clone)]
struct PushGatewayRun {
config: PushGatewayConfig,
mode: MetricsMode,
}
impl IperfCommand {
pub fn new() -> Self {
Self {
program: "iperf3-rs".to_owned(),
args: Vec::new(),
metrics_mode: MetricsMode::Disabled,
#[cfg(feature = "pushgateway")]
pushgateway: None,
allow_unbounded_server: false,
suppress_output: true,
}
}
pub fn client(host: impl Into<String>) -> Self {
let mut command = Self::new();
command.arg("-c").arg(host);
command
}
pub fn server_once() -> Self {
let mut command = Self::new();
command.args(["-s", "-1"]);
command
}
pub fn server_unbounded() -> Self {
let mut command = Self::new();
command.arg("-s").allow_unbounded_server(true);
command
}
pub fn program(&mut self, program: impl Into<String>) -> &mut Self {
self.program = program.into();
self
}
pub fn arg(&mut self, arg: impl Into<String>) -> &mut Self {
self.args.push(arg.into());
self
}
pub fn args<I, S>(&mut self, args: I) -> &mut Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.args.extend(args.into_iter().map(Into::into));
self
}
pub fn port(&mut self, port: u16) -> &mut Self {
self.arg("-p").arg(port.to_string())
}
pub fn duration(&mut self, duration: Duration) -> &mut Self {
self.arg("-t").arg(whole_seconds_arg(duration))
}
pub fn report_interval(&mut self, interval: Duration) -> &mut Self {
self.arg("-i").arg(decimal_seconds_arg(interval))
}
pub fn logfile(&mut self, path: impl AsRef<Path>) -> &mut Self {
self.arg("--logfile")
.arg(path.as_ref().to_string_lossy().into_owned())
}
pub fn quiet(&mut self) -> &mut Self {
self.suppress_output = true;
self
}
pub fn inherit_output(&mut self) -> &mut Self {
self.suppress_output = false;
self
}
pub fn connect_timeout(&mut self, timeout: Duration) -> &mut Self {
self.arg("--connect-timeout").arg(milliseconds_arg(timeout))
}
pub fn omit(&mut self, duration: Duration) -> &mut Self {
self.arg("-O").arg(decimal_seconds_arg(duration))
}
pub fn bind(&mut self, address: impl Into<String>) -> &mut Self {
self.arg("-B").arg(address)
}
pub fn udp(&mut self) -> &mut Self {
self.arg("-u")
}
pub fn sctp(&mut self) -> &mut Self {
self.arg("--sctp")
}
pub fn bitrate_bits_per_second(&mut self, bits_per_second: u64) -> &mut Self {
self.arg("-b").arg(bits_per_second.to_string())
}
pub fn parallel_streams(&mut self, streams: u16) -> &mut Self {
self.arg("-P").arg(streams.to_string())
}
pub fn reverse(&mut self) -> &mut Self {
self.arg("-R")
}
pub fn bidirectional(&mut self) -> &mut Self {
self.arg("--bidir")
}
pub fn no_delay(&mut self) -> &mut Self {
self.arg("-N")
}
pub fn zerocopy(&mut self) -> &mut Self {
self.arg("-Z")
}
pub fn congestion_control(&mut self, algorithm: impl Into<String>) -> &mut Self {
self.arg("-C").arg(algorithm)
}
pub fn json(&mut self) -> &mut Self {
self.arg("-J")
}
pub fn metrics(&mut self, mode: MetricsMode) -> &mut Self {
self.metrics_mode = mode;
self
}
#[cfg(feature = "pushgateway")]
pub fn pushgateway(&mut self, config: PushGatewayConfig, mode: MetricsMode) -> &mut Self {
self.pushgateway = Some(PushGatewayRun { config, mode });
self
}
#[cfg(feature = "pushgateway")]
pub fn clear_pushgateway(&mut self) -> &mut Self {
self.pushgateway = None;
self
}
#[cfg(feature = "pushgateway")]
pub fn run_with_pushgateway(
&self,
config: PushGatewayConfig,
mode: MetricsMode,
) -> Result<IperfResult> {
let mut command = self.clone();
command.pushgateway = Some(PushGatewayRun { config, mode });
command.run()
}
#[cfg(feature = "pushgateway")]
pub fn spawn_with_pushgateway(
&self,
config: PushGatewayConfig,
mode: MetricsMode,
) -> Result<RunningIperf> {
let mut command = self.clone();
command.pushgateway = Some(PushGatewayRun { config, mode });
command.spawn()
}
pub fn allow_unbounded_server(&mut self, allow: bool) -> &mut Self {
self.allow_unbounded_server = allow;
self
}
pub fn run(&self) -> Result<IperfResult> {
run_command(self.clone(), None)
}
pub fn spawn(&self) -> Result<RunningIperf> {
let command = self.clone();
let (ready_tx, ready_rx) = bounded::<ReadyMessage>(1);
let handle = thread::spawn(move || run_command(command, Some(ready_tx)));
match ready_rx.recv() {
Ok(Ok(metrics)) => Ok(RunningIperf {
handle: Some(handle),
metrics,
}),
Ok(Err(err)) => {
let _ = handle.join();
Err(Error::worker(err))
}
Err(err) => {
let _ = handle.join();
Err(Error::worker(format!(
"iperf worker exited before setup completed: {err}"
)))
}
}
}
pub fn spawn_with_metrics(&self, mode: MetricsMode) -> Result<(RunningIperf, MetricsStream)> {
let mut command = self.clone();
command.metrics(mode);
let mut running = command.spawn()?;
let metrics = running
.take_metrics()
.ok_or_else(|| Error::internal("metrics stream was not created"))?;
Ok((running, metrics))
}
fn argv(&self) -> Vec<String> {
let mut argv = Vec::with_capacity(self.args.len() + 1);
argv.push(self.program.clone());
argv.extend(self.args.iter().cloned());
argv
}
fn should_suppress_output(&self) -> bool {
self.suppress_output && !self.has_logfile_arg()
}
fn has_logfile_arg(&self) -> bool {
self.args
.iter()
.any(|arg| arg == "--logfile" || arg.starts_with("--logfile="))
}
}
impl Default for IperfCommand {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct IperfResult {
role: Role,
json_output: Option<String>,
metrics: Vec<MetricEvent>,
}
impl IperfResult {
pub fn role(&self) -> Role {
self.role
}
pub fn json_output(&self) -> Option<&str> {
self.json_output.as_deref()
}
#[cfg(feature = "serde")]
pub fn json_value(&self) -> Option<std::result::Result<serde_json::Value, serde_json::Error>> {
self.json_output.as_deref().map(serde_json::from_str)
}
pub fn metrics(&self) -> &[MetricEvent] {
&self.metrics
}
}
#[derive(Debug)]
#[must_use = "dropping RunningIperf detaches the worker; call wait to observe the iperf result"]
pub struct RunningIperf {
handle: Option<JoinHandle<Result<IperfResult>>>,
metrics: Option<MetricsStream>,
}
impl RunningIperf {
pub fn metrics(&self) -> Option<&MetricsStream> {
self.metrics.as_ref()
}
pub fn take_metrics(&mut self) -> Option<MetricsStream> {
self.metrics.take()
}
pub fn is_finished(&self) -> bool {
self.handle
.as_ref()
.map(JoinHandle::is_finished)
.unwrap_or(true)
}
pub fn try_wait(&mut self) -> Result<Option<IperfResult>> {
if !self.is_finished() {
return Ok(None);
}
self.take_finished_result().map(Some)
}
pub fn wait_timeout(&mut self, timeout: Duration) -> Result<Option<IperfResult>> {
let deadline = Instant::now()
.checked_add(timeout)
.unwrap_or_else(Instant::now);
loop {
if self.is_finished() {
return self.take_finished_result().map(Some);
}
if timeout.is_zero() || Instant::now() >= deadline {
return Ok(None);
}
thread::sleep(
Duration::from_millis(10).min(deadline.saturating_duration_since(Instant::now())),
);
}
}
pub fn wait(mut self) -> Result<IperfResult> {
self.take_handle()?
.join()
.map_err(|_| Error::worker("iperf worker thread panicked"))?
}
fn take_finished_result(&mut self) -> Result<IperfResult> {
self.take_handle()?
.join()
.map_err(|_| Error::worker("iperf worker thread panicked"))?
}
fn take_handle(&mut self) -> Result<JoinHandle<Result<IperfResult>>> {
self.handle
.take()
.ok_or_else(|| Error::worker("iperf worker result was already observed"))
}
}
type ReadyMessage = std::result::Result<Option<MetricsStream>, String>;
struct RunSetup {
test: IperfTest,
role: Role,
callback: Option<CallbackMetricsReporter>,
stream: Option<MetricsStream>,
worker: Option<JoinHandle<()>>,
#[cfg(feature = "pushgateway")]
push_reporter: Option<IntervalMetricsReporter>,
}
fn run_command(command: IperfCommand, ready: Option<Sender<ReadyMessage>>) -> Result<IperfResult> {
let _guard = run_lock()
.lock()
.map_err(|_| Error::internal("libiperf run lock is poisoned"))?;
let mut setup = match setup_run(command) {
Ok(setup) => setup,
Err(err) => {
notify_ready(ready, Err(format!("{err:#}")));
return Err(err);
}
};
let ready_stream = if ready.is_some() {
setup.stream.take()
} else {
None
};
notify_ready(ready, Ok(ready_stream));
let result = setup.test.run();
let json_output = setup.test.json_output();
drop(setup.callback.take());
if let Some(worker) = setup.worker.take() {
let _ = worker.join();
}
#[cfg(feature = "pushgateway")]
let push_result = setup
.push_reporter
.take()
.map(IntervalMetricsReporter::finish)
.transpose();
let metrics = setup
.stream
.map(|stream| stream.collect())
.unwrap_or_default();
result?;
#[cfg(feature = "pushgateway")]
push_result?;
Ok(IperfResult {
role: setup.role,
json_output,
metrics,
})
}
fn setup_run(command: IperfCommand) -> Result<RunSetup> {
validate_metrics_mode(command.metrics_mode)?;
#[cfg(feature = "pushgateway")]
validate_pushgateway_request(&command)?;
let mut test = IperfTest::new()?;
test.parse_arguments(&command.argv())?;
if command.should_suppress_output() {
test.suppress_output()?;
}
let role = test.role();
validate_server_lifecycle(&command, &test, role)?;
#[cfg(feature = "pushgateway")]
let (callback, stream, worker, push_reporter) =
if let Some(queue) = command.metrics_mode.callback_queue() {
let (callback, rx) = CallbackMetricsReporter::attach(&mut test, queue)?;
let (stream, worker) = metric_event_stream(rx, command.metrics_mode);
(Some(callback), Some(stream), Some(worker), None)
} else if let Some(pushgateway) = command.pushgateway {
let sink = PushGateway::new(pushgateway.config)?;
let reporter =
IntervalMetricsReporter::attach(&mut test, sink, pushgateway.mode.push_interval())?;
(None, None, None, Some(reporter))
} else {
(None, None, None, None)
};
#[cfg(not(feature = "pushgateway"))]
let (callback, stream, worker) = match command.metrics_mode.callback_queue() {
Some(queue) => {
let (callback, rx) = CallbackMetricsReporter::attach(&mut test, queue)?;
let (stream, worker) = metric_event_stream(rx, command.metrics_mode);
(Some(callback), Some(stream), Some(worker))
}
None => (None, None, None),
};
Ok(RunSetup {
test,
role,
callback,
stream,
worker,
#[cfg(feature = "pushgateway")]
push_reporter,
})
}
fn notify_ready(ready: Option<Sender<ReadyMessage>>, message: ReadyMessage) {
if let Some(ready) = ready {
let _ = ready.send(message);
}
}
fn run_lock() -> &'static Mutex<()> {
RUN_LOCK.get_or_init(|| Mutex::new(()))
}
fn validate_metrics_mode(mode: MetricsMode) -> Result<()> {
if metrics_mode_is_valid(mode) {
Ok(())
} else {
Err(Error::invalid_metrics_mode(
"metrics window interval must be greater than zero",
))
}
}
#[cfg(feature = "pushgateway")]
fn validate_pushgateway_request(command: &IperfCommand) -> Result<()> {
let Some(pushgateway) = &command.pushgateway else {
return Ok(());
};
if command.metrics_mode.is_enabled() {
return Err(Error::invalid_argument(
"direct Pushgateway delivery cannot be combined with a MetricsStream in the same IperfCommand run",
));
}
validate_pushgateway_mode(pushgateway.mode)
}
#[cfg(feature = "pushgateway")]
fn validate_pushgateway_mode(mode: MetricsMode) -> Result<()> {
match mode {
MetricsMode::Disabled => Err(Error::invalid_metrics_mode(
"Pushgateway metrics mode must be Interval or Window",
)),
MetricsMode::Interval => Ok(()),
MetricsMode::Window(interval) if interval.is_zero() => Err(Error::invalid_metrics_mode(
"metrics window interval must be greater than zero",
)),
MetricsMode::Window(_) => Ok(()),
}
}
#[cfg(feature = "pushgateway")]
impl MetricsMode {
fn push_interval(self) -> Option<Duration> {
match self {
MetricsMode::Disabled | MetricsMode::Interval => None,
MetricsMode::Window(interval) => Some(interval),
}
}
}
fn metrics_mode_is_valid(mode: MetricsMode) -> bool {
!matches!(mode, MetricsMode::Window(interval) if interval.is_zero())
}
fn whole_seconds_arg(duration: Duration) -> String {
let seconds = if duration.subsec_nanos() == 0 {
duration.as_secs()
} else {
duration.as_secs().saturating_add(1)
};
seconds.to_string()
}
fn decimal_seconds_arg(duration: Duration) -> String {
let seconds = duration.as_secs();
let nanos = duration.subsec_nanos();
if nanos == 0 {
return seconds.to_string();
}
let mut value = format!("{seconds}.{nanos:09}");
while value.ends_with('0') {
value.pop();
}
value
}
fn milliseconds_arg(duration: Duration) -> String {
let millis = duration.as_millis();
let has_fractional_millis = !duration.subsec_nanos().is_multiple_of(1_000_000);
if has_fractional_millis {
millis.saturating_add(1).to_string()
} else {
millis.to_string()
}
}
fn validate_server_lifecycle(command: &IperfCommand, test: &IperfTest, role: Role) -> Result<()> {
if role == Role::Server && !test.one_off() && !command.allow_unbounded_server {
return Err(Error::invalid_argument(
"IperfCommand server mode must use -1/--one-off or opt in with allow_unbounded_server(true)",
));
}
Ok(())
}
#[cfg(kani)]
mod verification {
use std::time::Duration;
use super::*;
#[kani::proof]
fn zero_window_interval_is_the_only_invalid_metrics_mode() {
let seconds: u8 = kani::any();
let mode = MetricsMode::Window(Duration::from_secs(u64::from(seconds)));
assert_eq!(metrics_mode_is_valid(mode), seconds != 0);
assert!(metrics_mode_is_valid(MetricsMode::Disabled));
assert!(metrics_mode_is_valid(MetricsMode::Interval));
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::ErrorKind;
#[cfg(feature = "pushgateway")]
use url::Url;
use super::*;
#[test]
fn argv_includes_program_name_before_iperf_arguments() {
let mut command = IperfCommand::new();
command.arg("-c").arg("127.0.0.1");
assert_eq!(
command.argv(),
vec![
"iperf3-rs".to_owned(),
"-c".to_owned(),
"127.0.0.1".to_owned()
]
);
}
#[test]
fn custom_program_name_is_used_as_argv_zero() {
let mut command = IperfCommand::new();
command.program("iperf3").arg("-v");
assert_eq!(command.argv(), vec!["iperf3".to_owned(), "-v".to_owned()]);
}
#[test]
fn library_output_is_quiet_by_default() {
let command = IperfCommand::new();
assert!(command.should_suppress_output());
}
#[test]
fn inherit_output_disables_library_quiet_default() {
let mut command = IperfCommand::new();
command.inherit_output();
assert!(!command.should_suppress_output());
command.quiet();
assert!(command.should_suppress_output());
}
#[test]
fn explicit_logfile_disables_null_output_sink() {
let mut typed = IperfCommand::new();
typed.logfile("iperf.log");
let mut raw_split = IperfCommand::new();
raw_split.arg("--logfile").arg("iperf.log");
let mut raw_equals = IperfCommand::new();
raw_equals.arg("--logfile=iperf.log");
assert!(!typed.should_suppress_output());
assert!(!raw_split.should_suppress_output());
assert!(!raw_equals.should_suppress_output());
}
#[test]
fn typed_client_builder_appends_iperf_arguments() {
let mut command = IperfCommand::client("192.0.2.10");
command
.port(5202)
.duration(Duration::from_secs(3))
.report_interval(Duration::from_millis(500))
.udp()
.bitrate_bits_per_second(1_000_000)
.parallel_streams(4)
.reverse()
.json()
.arg("--get-server-output");
assert_eq!(
command.argv(),
vec![
"iperf3-rs".to_owned(),
"-c".to_owned(),
"192.0.2.10".to_owned(),
"-p".to_owned(),
"5202".to_owned(),
"-t".to_owned(),
"3".to_owned(),
"-i".to_owned(),
"0.5".to_owned(),
"-u".to_owned(),
"-b".to_owned(),
"1000000".to_owned(),
"-P".to_owned(),
"4".to_owned(),
"-R".to_owned(),
"-J".to_owned(),
"--get-server-output".to_owned(),
]
);
}
#[test]
fn typed_operational_helpers_append_iperf_arguments() {
let mut command = IperfCommand::client("192.0.2.10");
command
.logfile("iperf.log")
.connect_timeout(Duration::from_millis(1500))
.omit(Duration::from_millis(250))
.bind("127.0.0.1%lo0")
.no_delay()
.zerocopy()
.congestion_control("cubic");
assert_eq!(
command.argv(),
vec![
"iperf3-rs".to_owned(),
"-c".to_owned(),
"192.0.2.10".to_owned(),
"--logfile".to_owned(),
"iperf.log".to_owned(),
"--connect-timeout".to_owned(),
"1500".to_owned(),
"-O".to_owned(),
"0.25".to_owned(),
"-B".to_owned(),
"127.0.0.1%lo0".to_owned(),
"-N".to_owned(),
"-Z".to_owned(),
"-C".to_owned(),
"cubic".to_owned(),
]
);
}
#[test]
fn typed_server_constructors_select_expected_lifecycle() {
let one_off = IperfCommand::server_once();
assert_eq!(
one_off.argv(),
vec!["iperf3-rs".to_owned(), "-s".to_owned(), "-1".to_owned()]
);
assert!(!one_off.allow_unbounded_server);
let unbounded = IperfCommand::server_unbounded();
assert_eq!(
unbounded.argv(),
vec!["iperf3-rs".to_owned(), "-s".to_owned()]
);
assert!(unbounded.allow_unbounded_server);
}
#[test]
fn bidirectional_helper_appends_long_option() {
let mut command = IperfCommand::client("192.0.2.10");
command.bidirectional();
assert_eq!(
command.argv(),
vec![
"iperf3-rs".to_owned(),
"-c".to_owned(),
"192.0.2.10".to_owned(),
"--bidir".to_owned()
]
);
}
#[test]
fn sctp_helper_appends_long_option() {
let mut command = IperfCommand::client("192.0.2.10");
command.sctp();
assert_eq!(
command.argv(),
vec![
"iperf3-rs".to_owned(),
"-c".to_owned(),
"192.0.2.10".to_owned(),
"--sctp".to_owned()
]
);
}
#[cfg(feature = "pushgateway")]
#[test]
fn pushgateway_helper_records_delivery_config() {
let config = PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap())
.label("scenario", "library");
let mut command = IperfCommand::client("192.0.2.10");
command.pushgateway(config, MetricsMode::Window(Duration::from_secs(5)));
let pushgateway = command.pushgateway.as_ref().unwrap();
assert_eq!(
pushgateway.mode,
MetricsMode::Window(Duration::from_secs(5))
);
assert_eq!(
pushgateway.config.labels,
[("scenario".to_owned(), "library".to_owned())]
);
command.clear_pushgateway();
assert!(command.pushgateway.is_none());
}
#[cfg(feature = "pushgateway")]
#[test]
fn pushgateway_convenience_helpers_do_not_persist_config() {
let mut command = IperfCommand::new();
command.metrics(MetricsMode::Window(Duration::ZERO));
let result = command.run_with_pushgateway(
PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
MetricsMode::Interval,
);
assert!(result.is_err());
assert!(command.pushgateway.is_none());
}
#[test]
fn spawn_with_metrics_does_not_persist_metrics_mode() {
let command = IperfCommand::new();
let err = command
.spawn_with_metrics(MetricsMode::Window(Duration::ZERO))
.unwrap_err();
assert!(err.to_string().contains("greater than zero"), "{err:#}");
assert_eq!(command.metrics_mode, MetricsMode::Disabled);
}
#[test]
fn duration_helpers_preserve_nonzero_subsecond_intent() {
assert_eq!(whole_seconds_arg(Duration::ZERO), "0");
assert_eq!(whole_seconds_arg(Duration::from_millis(1)), "1");
assert_eq!(whole_seconds_arg(Duration::from_millis(1500)), "2");
assert_eq!(decimal_seconds_arg(Duration::ZERO), "0");
assert_eq!(decimal_seconds_arg(Duration::from_millis(250)), "0.25");
assert_eq!(decimal_seconds_arg(Duration::new(1, 1)), "1.000000001");
assert_eq!(milliseconds_arg(Duration::ZERO), "0");
assert_eq!(milliseconds_arg(Duration::from_nanos(1)), "1");
assert_eq!(milliseconds_arg(Duration::from_millis(1500)), "1500");
assert_eq!(milliseconds_arg(Duration::new(1, 1)), "1001");
}
#[test]
fn unbounded_server_mode_is_rejected_by_default() {
let command = {
let mut command = IperfCommand::new();
command.arg("-s");
command
};
let err = match setup_run(command) {
Ok(_) => panic!("unbounded server should be rejected"),
Err(err) => err,
};
assert_eq!(err.kind(), ErrorKind::InvalidArgument);
assert!(err.to_string().contains("allow_unbounded_server"));
}
#[test]
fn one_off_server_mode_is_allowed() {
let command = {
let mut command = IperfCommand::new();
command.args(["-s", "-1"]);
command
};
let setup = setup_run(command).unwrap();
assert_eq!(setup.role, Role::Server);
}
#[test]
fn unbounded_server_mode_can_be_explicitly_allowed() {
let command = {
let mut command = IperfCommand::new();
command.arg("-s").allow_unbounded_server(true);
command
};
let setup = setup_run(command).unwrap();
assert_eq!(setup.role, Role::Server);
}
#[test]
fn zero_metrics_window_interval_is_rejected_before_running_iperf() {
let mut command = IperfCommand::new();
command.metrics(MetricsMode::Window(Duration::ZERO));
let err = command.run().unwrap_err();
assert_eq!(err.kind(), ErrorKind::InvalidMetricsMode);
assert!(err.to_string().contains("greater than zero"));
}
#[cfg(feature = "pushgateway")]
#[test]
fn direct_pushgateway_rejects_disabled_or_zero_window_mode() {
for mode in [MetricsMode::Disabled, MetricsMode::Window(Duration::ZERO)] {
let command = {
let mut command = IperfCommand::new();
command.arg("-s").arg("-1").pushgateway(
PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
mode,
);
command
};
let err = match setup_run(command) {
Ok(_) => panic!("invalid Pushgateway mode should be rejected"),
Err(err) => err,
};
assert_eq!(err.kind(), ErrorKind::InvalidMetricsMode);
}
}
#[cfg(feature = "pushgateway")]
#[test]
fn direct_pushgateway_is_rejected_when_metrics_stream_is_enabled() {
let command = {
let mut command = IperfCommand::new();
command
.arg("-s")
.arg("-1")
.metrics(MetricsMode::Interval)
.pushgateway(
PushGatewayConfig::new(Url::parse("http://localhost:9091").unwrap()),
MetricsMode::Interval,
);
command
};
let err = match setup_run(command) {
Ok(_) => panic!("direct Pushgateway and MetricsStream should be rejected together"),
Err(err) => err,
};
assert_eq!(err.kind(), ErrorKind::InvalidArgument);
assert!(err.to_string().contains("cannot be combined"));
}
#[test]
fn running_iperf_try_wait_observes_finished_worker_once() {
let mut running = RunningIperf {
handle: Some(thread::spawn(|| Ok(test_result()))),
metrics: None,
};
let result = running
.wait_timeout(Duration::from_secs(1))
.unwrap()
.expect("worker should finish");
assert_eq!(result.role(), Role::Client);
assert_eq!(running.try_wait().unwrap_err().kind(), ErrorKind::Worker);
}
#[test]
fn running_iperf_try_wait_returns_none_while_worker_is_running() {
let (release_tx, release_rx) = bounded::<()>(1);
let mut running = RunningIperf {
handle: Some(thread::spawn(move || {
release_rx.recv().unwrap();
Ok(test_result())
})),
metrics: None,
};
assert!(!running.is_finished());
assert!(running.try_wait().unwrap().is_none());
assert!(running.wait_timeout(Duration::ZERO).unwrap().is_none());
release_tx.send(()).unwrap();
assert!(
running
.wait_timeout(Duration::from_secs(1))
.unwrap()
.is_some()
);
}
#[test]
fn run_without_client_or_server_role_fails_fast() {
let command = IperfCommand::new();
let err = command.run().unwrap_err();
assert_eq!(err.kind(), ErrorKind::Libiperf);
assert!(
err.to_string().contains("client (-c) or server (-s)"),
"{err:#}"
);
}
fn test_result() -> IperfResult {
IperfResult {
role: Role::Client,
json_output: None,
metrics: Vec::new(),
}
}
}