use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::config::ScenarioEntry;
use crate::schedule::launch::{launch_scenario, prepare_entries};
use crate::{RuntimeError, SondaError};
pub fn run_multi(entries: Vec<ScenarioEntry>, shutdown: Arc<AtomicBool>) -> Result<(), SondaError> {
let prepared = prepare_entries(entries)?;
let mut handles = Vec::with_capacity(prepared.len());
for (i, prepared_entry) in prepared.into_iter().enumerate() {
let id = format!("multi-{i}");
let handle = launch_scenario(
id,
prepared_entry.entry,
Arc::clone(&shutdown),
prepared_entry.start_delay,
)?;
handles.push(handle);
}
let mut errors: Vec<String> = Vec::new();
for mut handle in handles {
match handle.join(None) {
Ok(()) => {}
Err(e) => errors.push(e.to_string()),
}
}
if errors.is_empty() {
Ok(())
} else {
Err(SondaError::Runtime(RuntimeError::ScenariosFailed(
errors.join("; "),
)))
}
}
pub fn signal_shutdown(shutdown: &AtomicBool) {
shutdown.store(false, Ordering::SeqCst);
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crate::config::{BaseScheduleConfig, LogScenarioConfig, ScenarioConfig, ScenarioEntry};
use crate::encoder::EncoderConfig;
use crate::generator::{GeneratorConfig, LogGeneratorConfig, TemplateConfig};
use crate::sink::SinkConfig;
use super::{run_multi, signal_shutdown};
fn metrics_entry_stdout(name: &str) -> ScenarioEntry {
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: name.to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
})
}
fn logs_entry_stdout(name: &str) -> ScenarioEntry {
ScenarioEntry::Logs(LogScenarioConfig {
base: BaseScheduleConfig {
name: name.to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: LogGeneratorConfig::Template {
templates: vec![TemplateConfig {
message: "test log event".to_string(),
field_pools: std::collections::BTreeMap::new(),
}],
severity_weights: None,
seed: Some(42),
},
encoder: EncoderConfig::JsonLines { precision: None },
})
}
#[test]
fn run_multi_with_empty_scenarios_returns_ok() {
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(vec![], shutdown);
assert!(result.is_ok(), "empty scenario list should return Ok");
}
#[test]
fn run_multi_with_single_metrics_scenario_returns_ok() {
let entries = vec![metrics_entry_stdout("single_metric")];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"single metrics scenario should complete without error"
);
}
#[test]
fn run_multi_with_single_logs_scenario_returns_ok() {
let entries = vec![logs_entry_stdout("single_logs")];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"single logs scenario should complete without error"
);
}
#[test]
fn run_multi_with_metrics_and_logs_both_complete() {
let entries = vec![
metrics_entry_stdout("concurrent_metrics"),
logs_entry_stdout("concurrent_logs"),
];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"both concurrent scenarios should complete without error"
);
}
#[test]
fn run_multi_three_concurrent_scenarios_all_complete() {
let entries = vec![
metrics_entry_stdout("m1"),
metrics_entry_stdout("m2"),
logs_entry_stdout("l1"),
];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"three concurrent scenarios should all complete without error"
);
}
#[test]
fn run_multi_shutdown_flag_stops_all_threads_within_two_seconds() {
let entries = vec![
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "shutdown_test_metric".to_string(),
rate: 10.0,
duration: None, gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
ScenarioEntry::Logs(LogScenarioConfig {
base: BaseScheduleConfig {
name: "shutdown_test_logs".to_string(),
rate: 10.0,
duration: None, gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: LogGeneratorConfig::Template {
templates: vec![TemplateConfig {
message: "shutdown test".to_string(),
field_pools: std::collections::BTreeMap::new(),
}],
severity_weights: None,
seed: Some(0),
},
encoder: EncoderConfig::JsonLines { precision: None },
}),
];
let shutdown = Arc::new(AtomicBool::new(true));
let shutdown_for_thread = Arc::clone(&shutdown);
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
signal_shutdown(&shutdown_for_thread);
});
let start = Instant::now();
let result = run_multi(entries, shutdown);
let elapsed = start.elapsed();
assert!(result.is_ok(), "shutdown should not produce an error");
assert!(
elapsed < Duration::from_secs(2),
"run_multi should return within 2 seconds of shutdown signal, took {:?}",
elapsed
);
}
#[test]
fn signal_shutdown_stores_false_with_seqcst_ordering() {
let flag = AtomicBool::new(true);
signal_shutdown(&flag);
assert!(
!flag.load(Ordering::SeqCst),
"signal_shutdown should set the flag to false"
);
}
#[test]
fn run_multi_with_invalid_sink_config_returns_err() {
let entries = vec![ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "error_test".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::File {
path: "/proc/sonda_test_cannot_create_this_file_27.txt".to_string(),
},
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
})];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_err(),
"scenario with an invalid sink path should return Err"
);
let err_msg = result.unwrap_err().to_string();
assert!(
!err_msg.is_empty(),
"error message should be non-empty, got: {err_msg}"
);
}
#[test]
fn run_multi_collects_all_thread_errors() {
let entries = vec![
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "err_a".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::File {
path: "/proc/sonda_err_a_27.txt".to_string(),
},
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "err_b".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::File {
path: "/proc/sonda_err_b_27.txt".to_string(),
},
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(result.is_err(), "two failing scenarios should return Err");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains(';'),
"combined error should separate errors with ';', got: {err_msg}"
);
}
#[test]
fn run_multi_thread_errors_produce_runtime_not_config_variant() {
let entries = vec![ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "variant_test".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::File {
path: "/proc/sonda_variant_test_27.txt".to_string(),
},
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
})];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(result.is_err(), "invalid sink must produce an error");
let err = result.unwrap_err();
assert!(
matches!(
err,
crate::SondaError::Runtime(crate::RuntimeError::ScenariosFailed(_))
),
"thread join errors must be Runtime::ScenariosFailed, not Config; got: {err:?}"
);
}
#[test]
fn run_multi_with_minimal_phase_offset_emits_almost_immediately() {
let entries = vec![ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "minimal_offset".to_string(),
rate: 10.0,
duration: Some("200ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: Some("1ms".to_string()),
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
})];
let shutdown = Arc::new(AtomicBool::new(true));
let start = Instant::now();
let result = run_multi(entries, shutdown);
let elapsed = start.elapsed();
assert!(result.is_ok(), "minimal phase_offset should complete ok");
assert!(
elapsed < Duration::from_secs(2),
"minimal phase_offset must not add significant delay, took {:?}",
elapsed
);
}
#[test]
fn run_multi_accepts_zero_phase_offset() {
let entries = vec![ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "zero_offset".to_string(),
rate: 10.0,
duration: Some("200ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: Some("0s".to_string()),
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
})];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"phase_offset '0s' should succeed (treated as no delay): {:?}",
result.err()
);
}
#[test]
fn run_multi_with_no_phase_offset_preserves_behavior() {
let entries = vec![metrics_entry_stdout("no_offset")];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"scenario without phase_offset should work as before"
);
}
#[test]
fn run_multi_respects_phase_offset_between_scenarios() {
let entries = vec![
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "first_immediate".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "second_delayed".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: Some("500ms".to_string()),
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 2.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
];
let shutdown = Arc::new(AtomicBool::new(true));
let start = Instant::now();
let result = run_multi(entries, shutdown);
let elapsed = start.elapsed();
assert!(result.is_ok(), "phase_offset multi-scenario should succeed");
assert!(
elapsed >= Duration::from_millis(400),
"total run time must include the phase_offset delay, took {:?}",
elapsed
);
}
#[test]
fn run_multi_shutdown_during_phase_offset_exits_cleanly() {
let entries = vec![
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "immediate_indef".to_string(),
rate: 10.0,
duration: None,
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "long_delay".to_string(),
rate: 10.0,
duration: None,
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: Some("10s".to_string()),
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 2.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
];
let shutdown = Arc::new(AtomicBool::new(true));
let shutdown_for_thread = Arc::clone(&shutdown);
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
signal_shutdown(&shutdown_for_thread);
});
let start = Instant::now();
let result = run_multi(entries, shutdown);
let elapsed = start.elapsed();
assert!(
result.is_ok(),
"shutdown during phase_offset should not produce an error"
);
assert!(
elapsed < Duration::from_secs(2),
"run_multi must exit promptly when shutdown during phase_offset, took {:?}",
elapsed
);
}
#[test]
fn run_multi_rejects_invalid_phase_offset() {
let entries = vec![ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "bad_offset".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: Some("not_a_duration".to_string()),
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
})];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_err(),
"invalid phase_offset must cause run_multi to return Err"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("phase_offset"),
"error message should mention phase_offset, got: {err_msg}"
);
}
#[test]
fn run_multi_with_clock_group_and_offsets() {
let entries = vec![
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "grouped_a".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: Some("test-group".to_string()),
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
ScenarioEntry::Metrics(ScenarioConfig {
base: BaseScheduleConfig {
name: "grouped_b".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: Some("200ms".to_string()),
clock_group: Some("test-group".to_string()),
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 2.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}),
];
let shutdown = Arc::new(AtomicBool::new(true));
let result = run_multi(entries, shutdown);
assert!(
result.is_ok(),
"scenarios with clock_group and offsets should complete"
);
}
}