use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::config::ScenarioEntry;
use crate::schedule::launch::{launch_scenario, prepare_entries};
use crate::{RuntimeError, SondaError};
#[cfg(feature = "config")]
use crate::compiler::compile_after::CompiledFile;
#[cfg(feature = "config")]
use crate::compiler::prepare::translate_entry;
#[cfg(feature = "config")]
use crate::config::aliases::desugar_entry;
#[cfg(feature = "config")]
use crate::config::expand_entry;
#[cfg(feature = "config")]
use crate::schedule::core_loop::GateContext;
#[cfg(feature = "config")]
use crate::schedule::gate_bus::{GateBus, SubscriptionSpec, WhileSpec};
#[cfg(feature = "config")]
use crate::schedule::launch::{launch_scenario_with_gates, validate_entry};
#[cfg(feature = "config")]
use std::collections::HashMap;
pub fn run_multi(entries: Vec<ScenarioEntry>, shutdown: Arc<AtomicBool>) -> Result<(), SondaError> {
let prepared = prepare_entries(entries)?;
let mut handles = Vec::with_capacity(prepared.len());
for (i, prepared_entry) in prepared.into_iter().enumerate() {
let id = format!("multi-{i}");
let handle = launch_scenario(
id,
prepared_entry.entry,
Arc::clone(&shutdown),
prepared_entry.start_delay,
)?;
handles.push(handle);
}
let mut errors: Vec<String> = Vec::new();
for mut handle in handles {
match handle.join(None) {
Ok(()) => {}
Err(e) => errors.push(e.to_string()),
}
}
if errors.is_empty() {
Ok(())
} else {
Err(SondaError::Runtime(RuntimeError::ScenariosFailed(
errors.join("; "),
)))
}
}
/// Requests shutdown by clearing the given flag.
///
/// The flag is written with `false` using sequentially consistent ordering.
/// Calling this more than once leaves the flag `false`.
pub fn signal_shutdown(shutdown: &AtomicBool) {
    // The flag is cleared (not set) to request a stop.
    let keep_running = false;
    shutdown.store(keep_running, Ordering::SeqCst);
}
/// Launches every entry of a compiled scenario file and returns the handles
/// without waiting for them to finish.
///
/// The work happens in three stages:
///
/// 1. One [`GateBus`] is created for each id that some entry's `while:` clause
///    refers to (as reported by `while_upstream_ids`).
/// 2. Every compiled entry is run through `translate_entry`, `expand_entry`,
///    `desugar_entry` and `validate_entry`, and a `LaunchPlan` is recorded for
///    it. An entry whose expansion is empty is skipped. If the entry's own id
///    has a bus, that bus is attached as its upstream bus; if the entry has a
///    `while:` clause, it subscribes to the bus of the referenced id. A
///    `phase_offset` string is parsed into the start delay.
/// 3. Only once every entry has been planned are the scenarios launched, in
///    plan order, via `launch_scenario_with_gates`. An entry without an id is
///    launched as `multi-{n}`, where `n` is its position among the plans
///    (skipped entries are not counted).
///
/// # Errors
///
/// * Translation, expansion, desugaring, validation and `phase_offset` parsing
///   failures are returned before anything is launched.
/// * An entry that expands to more than one scenario is rejected.
///   NOTE(review): the message blames `while:`, but the check is applied to
///   every entry whether or not it has a `while:` clause — confirm intended.
/// * A `while:` clause whose `ref` has no bus is rejected.
/// * If a launch fails, every scenario already started is stopped and joined
///   with a one-second timeout each, then the launch error is returned.
#[cfg(feature = "config")]
pub fn launch_multi_compiled(
    file: CompiledFile,
    shutdown: Arc<AtomicBool>,
) -> Result<Vec<crate::schedule::handle::ScenarioHandle>, SondaError> {
    // All configuration failures raised here share the same wrapping.
    let invalid = |msg: String| SondaError::Config(crate::ConfigError::invalid(msg));

    let CompiledFile {
        scenario_name,
        entries,
        ..
    } = file;

    // One bus per id that at least one `while:` clause points at.
    let buses: HashMap<String, Arc<GateBus>> = while_upstream_ids(&entries)
        .into_iter()
        .map(|bus_id| (bus_id, Arc::new(GateBus::new())))
        .collect();

    let mut plans: Vec<LaunchPlan> = Vec::with_capacity(entries.len());
    for compiled in entries {
        // `translate_entry` takes the compiled entry by value, so copy out the
        // pieces that are still needed afterwards.
        let id = compiled.id.clone();
        let while_clause = compiled.while_clause.clone();
        let delay_clause = compiled.delay_clause.clone();
        let phase_offset = compiled.phase_offset.clone();

        let translated =
            translate_entry(compiled).map_err(|e| invalid(format!("compile prepare: {e}")))?;

        let mut expanded = expand_entry(translated)?;
        if expanded.is_empty() {
            continue;
        }
        if expanded.len() > 1 {
            return Err(invalid(format!(
                "scenario id {:?}: csv_replay multi-column expansion is not supported \
                 when `while:` is in use; specify a single column or remove the gate",
                id.as_deref().unwrap_or("(anonymous)"),
            )));
        }
        let entry = desugar_entry(expanded.remove(0))?;
        validate_entry(&entry)?;

        // The bus registered under this entry's own id, if any.
        let upstream_bus = id
            .as_ref()
            .and_then(|name| buses.get(name))
            .map(Arc::clone);

        // Subscribe to the bus named by the `while:` clause, if there is one.
        let gate_ctx = match while_clause.as_ref() {
            None => None,
            Some(clause) => {
                let upstream = buses.get(&clause.ref_id).ok_or_else(|| {
                    invalid(format!(
                        "while: ref '{}' not found among scenario ids",
                        clause.ref_id
                    ))
                })?;
                let (gate_rx, initial) = upstream.subscribe(SubscriptionSpec {
                    after: None,
                    while_: Some(WhileSpec {
                        op: clause.op,
                        threshold: clause.value,
                    }),
                });
                Some(GateContext {
                    gate_rx,
                    initial,
                    delay: delay_clause,
                    // This path only ever builds `while:` gates.
                    has_after: false,
                    has_while: true,
                    close_emit: None,
                })
            }
        };

        let start_delay = match phase_offset {
            None => None,
            Some(raw) => crate::config::validate::parse_phase_offset(&raw)
                .map_err(|e| invalid(format!("phase_offset: {e}")))?,
        };

        plans.push(LaunchPlan {
            id,
            entry,
            gate_ctx,
            upstream_bus,
            start_delay,
        });
    }

    let mut handles = Vec::with_capacity(plans.len());
    for (idx, plan) in plans.into_iter().enumerate() {
        let LaunchPlan {
            id,
            entry,
            gate_ctx,
            upstream_bus,
            start_delay,
        } = plan;
        let launched = launch_scenario_with_gates(
            id.unwrap_or_else(|| format!("multi-{idx}")),
            scenario_name.clone(),
            entry,
            Arc::clone(&shutdown),
            start_delay,
            upstream_bus,
            gate_ctx,
        );
        match launched {
            Ok(handle) => handles.push(handle),
            Err(e) => {
                // Stop everything first, then wait for each with a timeout.
                for started in &handles {
                    started.stop();
                }
                for mut started in handles {
                    let _ = started.join_timeout(std::time::Duration::from_secs(1));
                }
                return Err(e);
            }
        }
    }
    Ok(handles)
}
/// Launches a compiled scenario file and blocks until every scenario ends.
///
/// Launching is delegated to `launch_multi_compiled`; its error is returned
/// unchanged. Each handle is then joined in launch order with no timeout.
///
/// # Errors
///
/// When one or more joins fail, their messages are concatenated with `"; "`
/// and returned as [`RuntimeError::ScenariosFailed`].
#[cfg(feature = "config")]
pub fn run_multi_compiled(file: CompiledFile, shutdown: Arc<AtomicBool>) -> Result<(), SondaError> {
    let failures: Vec<String> = launch_multi_compiled(file, shutdown)?
        .into_iter()
        .filter_map(|mut handle| handle.join(None).err())
        .map(|e| e.to_string())
        .collect();

    if failures.is_empty() {
        return Ok(());
    }
    Err(SondaError::Runtime(RuntimeError::ScenariosFailed(
        failures.join("; "),
    )))
}
/// Returns the ids referenced by any entry's `while:` clause.
///
/// The result is sorted and contains each id once, however many entries
/// reference it. Entries without a `while:` clause contribute nothing.
#[cfg(feature = "config")]
fn while_upstream_ids(entries: &[crate::compiler::compile_after::CompiledEntry]) -> Vec<String> {
    // An ordered set gives both the de-duplication and the sorted order.
    let unique: std::collections::BTreeSet<String> = entries
        .iter()
        .filter_map(|entry| entry.while_clause.as_ref())
        .map(|clause| clause.ref_id.clone())
        .collect();
    unique.into_iter().collect()
}
/// Everything needed to start one compiled entry.
///
/// `launch_multi_compiled` builds one plan per entry before it launches
/// anything, then consumes the plans in order.
#[cfg(feature = "config")]
struct LaunchPlan {
/// Id copied from the compiled entry. When absent, `launch_multi_compiled`
/// launches the entry as `multi-{index}`.
id: Option<String>,
/// The entry after translation, expansion, desugaring and validation.
entry: ScenarioEntry,
/// Gate subscription built from the entry's `while:` clause; `None` when the
/// entry has no such clause.
gate_ctx: Option<GateContext>,
/// The bus registered under this entry's own id, i.e. present when some
/// `while:` clause references this entry.
upstream_bus: Option<Arc<GateBus>>,
/// Start delay parsed from the entry's `phase_offset`, if any.
start_delay: Option<std::time::Duration>,
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crate::config::{BaseScheduleConfig, LogScenarioConfig, ScenarioConfig, ScenarioEntry};
use crate::encoder::EncoderConfig;
use crate::generator::{GeneratorConfig, LogGeneratorConfig, TemplateConfig};
use crate::sink::SinkConfig;
#[cfg(feature = "config")]
use super::launch_multi_compiled;
use super::{run_multi, signal_shutdown};
/// Builds a metrics scenario named `name` that writes to stdout at rate 10.0
/// for a configured duration of `"100ms"`, using a constant generator (1.0)
/// and the Prometheus text encoder. All optional schedule settings are unset.
fn metrics_entry_stdout(name: &str) -> ScenarioEntry {
    let base = BaseScheduleConfig {
        name: name.to_owned(),
        rate: 10.0,
        duration: Some(String::from("100ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::Stdout,
        phase_offset: None,
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let config = ScenarioConfig {
        base,
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    };
    ScenarioEntry::Metrics(config)
}
/// Builds a logs scenario named `name` that writes to stdout at rate 10.0 for
/// a configured duration of `"100ms"`. It uses a single fixed template with
/// seed 42 and the JSON-lines encoder. All optional schedule settings are unset.
fn logs_entry_stdout(name: &str) -> ScenarioEntry {
    let base = BaseScheduleConfig {
        name: name.to_owned(),
        rate: 10.0,
        duration: Some(String::from("100ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::Stdout,
        phase_offset: None,
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let template = TemplateConfig {
        message: String::from("test log event"),
        field_pools: std::collections::BTreeMap::new(),
    };
    let generator = LogGeneratorConfig::Template {
        templates: vec![template],
        severity_weights: None,
        seed: Some(42),
    };
    ScenarioEntry::Logs(LogScenarioConfig {
        base,
        generator,
        encoder: EncoderConfig::JsonLines { precision: None },
    })
}
/// With no entries there is nothing to launch, so `run_multi` must succeed.
#[test]
fn run_multi_with_empty_scenarios_returns_ok() {
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(Vec::new(), running);
    assert!(outcome.is_ok(), "empty scenario list should return Ok");
}
/// A single short metrics scenario must run to completion without error.
#[test]
fn run_multi_with_single_metrics_scenario_returns_ok() {
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![metrics_entry_stdout("single_metric")], running);
    assert!(
        outcome.is_ok(),
        "single metrics scenario should complete without error"
    );
}
/// A single short logs scenario must run to completion without error.
#[test]
fn run_multi_with_single_logs_scenario_returns_ok() {
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![logs_entry_stdout("single_logs")], running);
    assert!(
        outcome.is_ok(),
        "single logs scenario should complete without error"
    );
}
/// One metrics and one logs scenario launched together must both finish.
#[test]
fn run_multi_with_metrics_and_logs_both_complete() {
    let metrics = metrics_entry_stdout("concurrent_metrics");
    let logs = logs_entry_stdout("concurrent_logs");
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![metrics, logs], running);
    assert!(
        outcome.is_ok(),
        "both concurrent scenarios should complete without error"
    );
}
/// Two metrics scenarios plus one logs scenario must all finish cleanly.
#[test]
fn run_multi_three_concurrent_scenarios_all_complete() {
    let mut entries = Vec::with_capacity(3);
    entries.push(metrics_entry_stdout("m1"));
    entries.push(metrics_entry_stdout("m2"));
    entries.push(logs_entry_stdout("l1"));
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(entries, running);
    assert!(
        outcome.is_ok(),
        "three concurrent scenarios should all complete without error"
    );
}
/// Two scenarios with no `duration` are started, the shutdown flag is cleared
/// from another thread after 50 ms, and `run_multi` must then return `Ok`
/// within two seconds.
#[test]
fn run_multi_shutdown_flag_stops_all_threads_within_two_seconds() {
    // Schedule settings shared by both scenarios. `duration` is `None` —
    // presumably unbounded, which is why the test relies on the shutdown flag.
    fn unbounded_base(name: &str) -> BaseScheduleConfig {
        BaseScheduleConfig {
            name: name.to_owned(),
            rate: 10.0,
            duration: None,
            gaps: None,
            bursts: None,
            cardinality_spikes: None,
            dynamic_labels: None,
            labels: None,
            sink: SinkConfig::Stdout,
            phase_offset: None,
            clock_group: None,
            clock_group_is_auto: None,
            jitter: None,
            jitter_seed: None,
            on_sink_error: crate::OnSinkError::Warn,
        }
    }

    let metrics = ScenarioEntry::Metrics(ScenarioConfig {
        base: unbounded_base("shutdown_test_metric"),
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    });
    let logs = ScenarioEntry::Logs(LogScenarioConfig {
        base: unbounded_base("shutdown_test_logs"),
        generator: LogGeneratorConfig::Template {
            templates: vec![TemplateConfig {
                message: String::from("shutdown test"),
                field_pools: std::collections::BTreeMap::new(),
            }],
            severity_weights: None,
            seed: Some(0),
        },
        encoder: EncoderConfig::JsonLines { precision: None },
    });

    let running = Arc::new(AtomicBool::new(true));
    let flag_for_stopper = Arc::clone(&running);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        signal_shutdown(&flag_for_stopper);
    });

    let started = Instant::now();
    let outcome = run_multi(vec![metrics, logs], running);
    let elapsed = started.elapsed();

    assert!(outcome.is_ok(), "shutdown should not produce an error");
    assert!(
        elapsed < Duration::from_secs(2),
        "run_multi should return within 2 seconds of shutdown signal, took {:?}",
        elapsed
    );
}
/// After `signal_shutdown`, a flag that started `true` must read `false`.
#[test]
fn signal_shutdown_stores_false_with_seqcst_ordering() {
    let running = AtomicBool::new(true);
    signal_shutdown(&running);
    let still_running = running.load(Ordering::SeqCst);
    assert!(
        !still_running,
        "signal_shutdown should set the flag to false"
    );
}
/// A file sink pointing at a path under `/proc` (presumably not creatable)
/// must make `run_multi` return an error with a non-empty message.
#[test]
fn run_multi_with_invalid_sink_config_returns_err() {
    let base = BaseScheduleConfig {
        name: String::from("error_test"),
        rate: 10.0,
        duration: Some(String::from("100ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::File {
            path: String::from("/proc/sonda_test_cannot_create_this_file_27.txt"),
        },
        phase_offset: None,
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let entry = ScenarioEntry::Metrics(ScenarioConfig {
        base,
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    });

    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![entry], running);
    assert!(
        outcome.is_err(),
        "scenario with an invalid sink path should return Err"
    );

    let err_msg = outcome.unwrap_err().to_string();
    assert!(
        !err_msg.is_empty(),
        "error message should be non-empty, got: {err_msg}"
    );
}
/// Two scenarios that both fail must be reported together: the combined error
/// message has to contain the `;` separator.
#[test]
fn run_multi_collects_all_thread_errors() {
    // Metrics scenario whose file sink targets `path`.
    fn failing_entry(name: &str, path: &str) -> ScenarioEntry {
        ScenarioEntry::Metrics(ScenarioConfig {
            base: BaseScheduleConfig {
                name: name.to_owned(),
                rate: 10.0,
                duration: Some(String::from("100ms")),
                gaps: None,
                bursts: None,
                cardinality_spikes: None,
                dynamic_labels: None,
                labels: None,
                sink: SinkConfig::File {
                    path: path.to_owned(),
                },
                phase_offset: None,
                clock_group: None,
                clock_group_is_auto: None,
                jitter: None,
                jitter_seed: None,
                on_sink_error: crate::OnSinkError::Warn,
            },
            generator: GeneratorConfig::Constant { value: 1.0 },
            encoder: EncoderConfig::PrometheusText { precision: None },
        })
    }

    let entries = vec![
        failing_entry("err_a", "/proc/sonda_err_a_27.txt"),
        failing_entry("err_b", "/proc/sonda_err_b_27.txt"),
    ];
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(entries, running);
    assert!(outcome.is_err(), "two failing scenarios should return Err");

    let err_msg = outcome.unwrap_err().to_string();
    assert!(
        err_msg.contains(';'),
        "combined error should separate errors with ';', got: {err_msg}"
    );
}
/// A scenario that fails while running must surface as
/// `SondaError::Runtime(RuntimeError::ScenariosFailed(_))`, not a config error.
#[test]
fn run_multi_thread_errors_produce_runtime_not_config_variant() {
    let base = BaseScheduleConfig {
        name: String::from("variant_test"),
        rate: 10.0,
        duration: Some(String::from("100ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::File {
            path: String::from("/proc/sonda_variant_test_27.txt"),
        },
        phase_offset: None,
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let entry = ScenarioEntry::Metrics(ScenarioConfig {
        base,
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    });

    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![entry], running);
    assert!(outcome.is_err(), "invalid sink must produce an error");

    let err = outcome.unwrap_err();
    let is_scenarios_failed = matches!(
        err,
        crate::SondaError::Runtime(crate::RuntimeError::ScenariosFailed(_))
    );
    assert!(
        is_scenarios_failed,
        "thread join errors must be Runtime::ScenariosFailed, not Config; got: {err:?}"
    );
}
/// A `"1ms"` phase offset on a `"200ms"` scenario must still finish
/// successfully and in under two seconds.
#[test]
fn run_multi_with_minimal_phase_offset_emits_almost_immediately() {
    let base = BaseScheduleConfig {
        name: String::from("minimal_offset"),
        rate: 10.0,
        duration: Some(String::from("200ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::Stdout,
        phase_offset: Some(String::from("1ms")),
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let entry = ScenarioEntry::Metrics(ScenarioConfig {
        base,
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    });

    let running = Arc::new(AtomicBool::new(true));
    let started = Instant::now();
    let outcome = run_multi(vec![entry], running);
    let elapsed = started.elapsed();

    assert!(outcome.is_ok(), "minimal phase_offset should complete ok");
    assert!(
        elapsed < Duration::from_secs(2),
        "minimal phase_offset must not add significant delay, took {:?}",
        elapsed
    );
}
/// A phase offset of `"0s"` must be accepted rather than rejected.
#[test]
fn run_multi_accepts_zero_phase_offset() {
    let base = BaseScheduleConfig {
        name: String::from("zero_offset"),
        rate: 10.0,
        duration: Some(String::from("200ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::Stdout,
        phase_offset: Some(String::from("0s")),
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let entry = ScenarioEntry::Metrics(ScenarioConfig {
        base,
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    });

    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![entry], running);
    assert!(
        outcome.is_ok(),
        "phase_offset '0s' should succeed (treated as no delay): {:?}",
        outcome.err()
    );
}
/// A scenario with no `phase_offset` at all must still run successfully.
#[test]
fn run_multi_with_no_phase_offset_preserves_behavior() {
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![metrics_entry_stdout("no_offset")], running);
    assert!(
        outcome.is_ok(),
        "scenario without phase_offset should work as before"
    );
}
/// With one immediate scenario and one delayed by `"500ms"`, the whole run
/// must take at least 400 ms, showing the offset was actually waited for.
#[test]
fn run_multi_respects_phase_offset_between_scenarios() {
    // 100 ms stdout metrics scenario with a configurable offset and value.
    fn entry(name: &str, phase_offset: Option<&str>, value: f64) -> ScenarioEntry {
        ScenarioEntry::Metrics(ScenarioConfig {
            base: BaseScheduleConfig {
                name: name.to_owned(),
                rate: 10.0,
                duration: Some(String::from("100ms")),
                gaps: None,
                bursts: None,
                cardinality_spikes: None,
                dynamic_labels: None,
                labels: None,
                sink: SinkConfig::Stdout,
                phase_offset: phase_offset.map(str::to_owned),
                clock_group: None,
                clock_group_is_auto: None,
                jitter: None,
                jitter_seed: None,
                on_sink_error: crate::OnSinkError::Warn,
            },
            generator: GeneratorConfig::Constant { value },
            encoder: EncoderConfig::PrometheusText { precision: None },
        })
    }

    let entries = vec![
        entry("first_immediate", None, 1.0),
        entry("second_delayed", Some("500ms"), 2.0),
    ];
    let running = Arc::new(AtomicBool::new(true));
    let started = Instant::now();
    let outcome = run_multi(entries, running);
    let elapsed = started.elapsed();

    assert!(outcome.is_ok(), "phase_offset multi-scenario should succeed");
    assert!(
        elapsed >= Duration::from_millis(400),
        "total run time must include the phase_offset delay, took {:?}",
        elapsed
    );
}
/// One scenario starts at once and another has a `"10s"` phase offset; the
/// shutdown flag is cleared after 100 ms. `run_multi` must return `Ok` in
/// under two seconds, i.e. without waiting out the offset.
#[test]
fn run_multi_shutdown_during_phase_offset_exits_cleanly() {
    // Stdout metrics scenario with no `duration` and a configurable offset.
    fn entry(name: &str, phase_offset: Option<&str>, value: f64) -> ScenarioEntry {
        ScenarioEntry::Metrics(ScenarioConfig {
            base: BaseScheduleConfig {
                name: name.to_owned(),
                rate: 10.0,
                duration: None,
                gaps: None,
                bursts: None,
                cardinality_spikes: None,
                dynamic_labels: None,
                labels: None,
                sink: SinkConfig::Stdout,
                phase_offset: phase_offset.map(str::to_owned),
                clock_group: None,
                clock_group_is_auto: None,
                jitter: None,
                jitter_seed: None,
                on_sink_error: crate::OnSinkError::Warn,
            },
            generator: GeneratorConfig::Constant { value },
            encoder: EncoderConfig::PrometheusText { precision: None },
        })
    }

    let entries = vec![
        entry("immediate_indef", None, 1.0),
        entry("long_delay", Some("10s"), 2.0),
    ];

    let running = Arc::new(AtomicBool::new(true));
    let flag_for_stopper = Arc::clone(&running);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        signal_shutdown(&flag_for_stopper);
    });

    let started = Instant::now();
    let outcome = run_multi(entries, running);
    let elapsed = started.elapsed();

    assert!(
        outcome.is_ok(),
        "shutdown during phase_offset should not produce an error"
    );
    assert!(
        elapsed < Duration::from_secs(2),
        "run_multi must exit promptly when shutdown during phase_offset, took {:?}",
        elapsed
    );
}
/// An unparseable `phase_offset` must make `run_multi` fail with a message
/// that mentions `phase_offset`.
#[test]
fn run_multi_rejects_invalid_phase_offset() {
    let base = BaseScheduleConfig {
        name: String::from("bad_offset"),
        rate: 10.0,
        duration: Some(String::from("100ms")),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::Stdout,
        phase_offset: Some(String::from("not_a_duration")),
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };
    let entry = ScenarioEntry::Metrics(ScenarioConfig {
        base,
        generator: GeneratorConfig::Constant { value: 1.0 },
        encoder: EncoderConfig::PrometheusText { precision: None },
    });

    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(vec![entry], running);
    assert!(
        outcome.is_err(),
        "invalid phase_offset must cause run_multi to return Err"
    );

    let err_msg = outcome.unwrap_err().to_string();
    assert!(
        err_msg.contains("phase_offset"),
        "error message should mention phase_offset, got: {err_msg}"
    );
}
/// Two scenarios sharing the clock group `test-group`, one of them offset by
/// `"200ms"`, must both complete successfully.
#[test]
fn run_multi_with_clock_group_and_offsets() {
    // 100 ms stdout metrics scenario in `test-group` with a configurable offset.
    fn grouped(name: &str, phase_offset: Option<&str>, value: f64) -> ScenarioEntry {
        ScenarioEntry::Metrics(ScenarioConfig {
            base: BaseScheduleConfig {
                name: name.to_owned(),
                rate: 10.0,
                duration: Some(String::from("100ms")),
                gaps: None,
                bursts: None,
                cardinality_spikes: None,
                dynamic_labels: None,
                labels: None,
                sink: SinkConfig::Stdout,
                phase_offset: phase_offset.map(str::to_owned),
                clock_group: Some(String::from("test-group")),
                clock_group_is_auto: None,
                jitter: None,
                jitter_seed: None,
                on_sink_error: crate::OnSinkError::Warn,
            },
            generator: GeneratorConfig::Constant { value },
            encoder: EncoderConfig::PrometheusText { precision: None },
        })
    }

    let entries = vec![
        grouped("grouped_a", None, 1.0),
        grouped("grouped_b", Some("200ms"), 2.0),
    ];
    let running = Arc::new(AtomicBool::new(true));
    let outcome = run_multi(entries, running);
    assert!(
        outcome.is_ok(),
        "scenarios with clock_group and offsets should complete"
    );
}
/// Checks that `while_upstream_ids` reports only ids that some `while:` clause
/// references. Of the four scenarios in the YAML, only `upstream_a` is named
/// as a `ref`, so it must be the sole id returned.
#[cfg(feature = "config")]
#[test]
fn while_upstream_ids_returns_only_entries_referenced_by_a_while_clause() {
use super::while_upstream_ids;
use crate::compile_scenario_file_compiled;
use crate::compiler::expand::InMemoryPackResolver;
// NOTE(review): as shown here the YAML literal carries no nesting
// indentation; YAML structure is indentation-sensitive, so confirm this
// matches the checked-in file before relying on the test.
let yaml = "\
version: 2
defaults:
rate: 5
duration: 1s
encoder:
type: prometheus_text
sink:
type: stdout
scenarios:
- id: upstream_a
signal_type: metrics
name: upstream_a
generator:
type: sawtooth
min: 0.0
max: 100.0
period_secs: 60.0
- id: middle_b
signal_type: metrics
name: middle_b
generator:
type: constant
value: 1.0
while:
ref: upstream_a
op: '>'
value: 50.0
- id: lonely_c
signal_type: metrics
name: lonely_c
generator:
type: constant
value: 1.0
- id: lonely_d
signal_type: metrics
name: lonely_d
generator:
type: constant
value: 1.0
";
// Compile with an empty in-memory pack resolver; nothing is launched.
let resolver = InMemoryPackResolver::new();
let compiled =
compile_scenario_file_compiled(yaml, &resolver).expect("compile must succeed");
let ids = while_upstream_ids(&compiled.entries);
// `middle_b` is the only entry with a `while:` clause and it refers to
// `upstream_a`; `lonely_c` and `lonely_d` are referenced by nobody.
assert_eq!(
ids,
vec!["upstream_a".to_string()],
"only entries referenced by some while: clause must get a bus, got {ids:?}"
);
}
/// Launches two scenarios via `launch_multi_compiled`, then runs the same
/// stop-then-join sequence that `launch_multi_compiled` uses on a failed
/// launch, and checks that every thread exits.
///
/// NOTE(review): both launches succeed here, so the error branch inside
/// `launch_multi_compiled` itself is not executed by this test; only the
/// `stop()` / `join` behavior it depends on is covered.
#[cfg(feature = "config")]
#[test]
fn launch_multi_compiled_partial_cleanup_stops_already_launched_handles() {
use crate::compile_scenario_file_compiled;
use crate::compiler::expand::InMemoryPackResolver;
// NOTE(review): as shown here the YAML literal carries no nesting
// indentation; confirm it matches the checked-in file.
let yaml = "\
version: 2
defaults:
rate: 50
duration: 10s
encoder:
type: prometheus_text
sink:
type: stdout
scenarios:
- id: cleanup_a
signal_type: metrics
name: cleanup_a
generator:
type: constant
value: 1.0
- id: cleanup_b
signal_type: metrics
name: cleanup_b
generator:
type: constant
value: 2.0
";
let resolver = InMemoryPackResolver::new();
let compiled =
compile_scenario_file_compiled(yaml, &resolver).expect("compile must succeed");
let shutdown = Arc::new(AtomicBool::new(true));
let mut handles =
launch_multi_compiled(compiled, Arc::clone(&shutdown)).expect("launch must succeed");
assert_eq!(handles.len(), 2, "must launch both entries");
// The configured duration is 10s, so both should still be running here.
assert!(
handles.iter().all(|h| h.is_alive()),
"both threads must be alive immediately after launch"
);
// Signal every handle before waiting on any of them.
for handle in &handles {
handle.stop();
}
// Poll for up to two seconds instead of sleeping a fixed amount.
let deadline = Instant::now() + Duration::from_secs(2);
while Instant::now() < deadline && handles.iter().any(|h| h.is_alive()) {
thread::sleep(Duration::from_millis(20));
}
assert!(
handles.iter().all(|h| !h.is_alive()),
"every handle must exit after stop() — partial-launch cleanup must not leak threads"
);
// The threads have already exited, so a bounded join must succeed.
for handle in &mut handles {
handle
.join(Some(Duration::from_secs(1)))
.expect("join must succeed after stop");
}
}
}