use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::config::validate::parse_duration;
use crate::config::ScenarioConfig;
use crate::encoder::create_encoder;
use crate::generator::create_generator;
use crate::model::metric::{Labels, MetricEvent};
use crate::schedule::stats::ScenarioStats;
use crate::schedule::{is_in_burst, is_in_gap, time_until_gap_end, BurstWindow, GapWindow};
use crate::sink::{create_sink, Sink};
use crate::SondaError;
pub fn run(config: &ScenarioConfig) -> Result<(), SondaError> {
let mut sink = create_sink(&config.sink)?;
run_with_sink(config, sink.as_mut(), None, None)
}
/// Runs a scenario against a caller-supplied sink until it finishes or is stopped.
///
/// Each loop iteration emits one metric event: a value is taken from the
/// generator for the current tick, encoded into a reusable buffer and written
/// to `sink`. Events are paced by a deadline that advances by one interval per
/// event (`1 / config.rate` seconds, divided by the burst multiplier while a
/// burst window is active). While a gap window is active nothing is emitted.
///
/// # Parameters
///
/// * `config` - scenario description (name, rate, optional duration, gaps,
///   bursts, labels, generator and encoder settings).
/// * `sink` - destination for encoded events; it is flushed before returning.
/// * `shutdown` - optional stop flag. Despite the name, the loop keeps running
///   while the flag reads `true` and stops once it reads `false`.
/// * `stats` - optional shared statistics, updated after every emitted event
///   and when a gap is entered. A poisoned lock is skipped silently so that
///   statistics never abort the run.
///
/// # Errors
///
/// Returns an error if a duration string (`duration`, `gaps`, `bursts`) cannot
/// be parsed, if the labels are rejected, or if building, encoding or writing
/// an event fails. The sink is flushed even when the loop fails; a loop error
/// takes precedence over a flush error.
///
/// # Panics
///
/// `Duration::from_secs_f64` panics on negative, non-finite or overflowing
/// input, so `config.rate` and any burst multiplier must be positive and
/// finite. NOTE(review): presumably enforced by config validation - confirm.
pub fn run_with_sink(
    config: &ScenarioConfig,
    sink: &mut dyn Sink,
    shutdown: Option<&AtomicBool>,
    stats: Option<Arc<RwLock<ScenarioStats>>>,
) -> Result<(), SondaError> {
    // Longest single sleep inside a gap. Sleeping in slices lets the loop top
    // re-check the stop flag and the total duration while a gap is active.
    const GAP_POLL_INTERVAL: Duration = Duration::from_millis(100);
    // Minimum span over which the emission rate is averaged before publishing.
    const RATE_WINDOW: Duration = Duration::from_secs(1);

    let total_duration: Option<Duration> =
        config.duration.as_deref().map(parse_duration).transpose()?;

    let gap_window: Option<GapWindow> = config
        .gaps
        .as_ref()
        .map(|g| -> Result<GapWindow, SondaError> {
            Ok(GapWindow {
                every: parse_duration(&g.every)?,
                duration: parse_duration(&g.r#for)?,
            })
        })
        .transpose()?;

    let burst_window: Option<BurstWindow> = config
        .bursts
        .as_ref()
        .map(|b| -> Result<BurstWindow, SondaError> {
            Ok(BurstWindow {
                every: parse_duration(&b.every)?,
                duration: parse_duration(&b.r#for)?,
                multiplier: b.multiplier,
            })
        })
        .transpose()?;

    let generator = create_generator(&config.generator, config.rate);
    let encoder = create_encoder(&config.encoder);

    // No configured labels is the same as an empty pair list.
    let label_pairs: Vec<(&str, &str)> = config
        .labels
        .iter()
        .flatten()
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect();
    let labels: Labels = Labels::from_pairs(&label_pairs)?;

    let name = config.name.clone();
    let base_interval = Duration::from_secs_f64(1.0 / config.rate);
    // Reused for every event so encoding does not allocate per iteration.
    let mut buf: Vec<u8> = Vec::with_capacity(256);

    let start = Instant::now();
    let mut next_deadline = start;
    let mut tick: u64 = 0;
    // Rate bookkeeping uses its own event counter. `tick` is rewritten after a
    // gap and may move backwards relative to a burst, so differences of `tick`
    // are not a reliable event count (and could underflow).
    let mut events_in_window: u64 = 0;
    let mut rate_window_start = start;

    let loop_result = (|| -> Result<(), SondaError> {
        loop {
            // The flag means "keep running": stop once it reads false.
            if let Some(flag) = shutdown {
                if !flag.load(Ordering::SeqCst) {
                    break;
                }
            }

            let elapsed = start.elapsed();
            if let Some(total) = total_duration {
                if elapsed >= total {
                    break;
                }
            }

            // Gaps are checked before bursts, so a gap suppresses output even
            // when a burst window overlaps it.
            if let Some(ref gap) = gap_window {
                if is_in_gap(elapsed, gap) {
                    if let Some(ref s) = stats {
                        if let Ok(mut st) = s.write() {
                            st.in_gap = true;
                            st.in_burst = false;
                        }
                    }

                    // Never sleep past the end of the gap, past the end of the
                    // run, or longer than one poll slice.
                    let mut sleep_for = time_until_gap_end(elapsed, gap).min(GAP_POLL_INTERVAL);
                    if let Some(total) = total_duration {
                        sleep_for = sleep_for.min(total.saturating_sub(elapsed));
                    }
                    if sleep_for > Duration::ZERO {
                        thread::sleep(sleep_for);
                    }

                    // Resume pacing from "now" instead of replaying the
                    // deadlines that passed during the gap, and set the tick to
                    // the number of base intervals elapsed since start.
                    let now = Instant::now();
                    next_deadline = now;
                    tick = (start.elapsed().as_secs_f64() / base_interval.as_secs_f64()) as u64;
                    // Start a fresh rate window so gap time is not averaged in.
                    rate_window_start = now;
                    events_in_window = 0;
                    continue;
                }
            }

            // `is_in_burst` yields the multiplier while a burst is active.
            let burst_multiplier = burst_window
                .as_ref()
                .and_then(|burst| is_in_burst(elapsed, burst));
            let currently_in_burst = burst_multiplier.is_some();
            let effective_interval = match burst_multiplier {
                Some(multiplier) => {
                    Duration::from_secs_f64(base_interval.as_secs_f64() / multiplier)
                }
                None => base_interval,
            };

            // NOTE(review): this sleep is not sliced, so at very low rates the
            // stop flag and total duration are only re-checked once per event.
            let now = Instant::now();
            if now < next_deadline {
                thread::sleep(next_deadline - now);
            }

            let wall_now = std::time::SystemTime::now();
            let value = generator.value(tick);
            let event = MetricEvent::with_timestamp(name.clone(), value, labels.clone(), wall_now)?;
            buf.clear();
            encoder.encode_metric(&event, &mut buf)?;
            let bytes_written = buf.len() as u64;
            sink.write(&buf)?;

            if let Some(ref s) = stats {
                // Publish a new rate once the window is at least RATE_WINDOW
                // long; otherwise the previously published value stays.
                let window_elapsed = rate_window_start.elapsed();
                let refreshed_rate = if window_elapsed >= RATE_WINDOW {
                    let rate = events_in_window as f64 / window_elapsed.as_secs_f64();
                    events_in_window = 0;
                    rate_window_start = Instant::now();
                    Some(rate)
                } else {
                    None
                };
                // The current event belongs to the window that starts now.
                events_in_window += 1;

                if let Ok(mut st) = s.write() {
                    st.total_events += 1;
                    st.bytes_emitted += bytes_written;
                    if let Some(rate) = refreshed_rate {
                        st.current_rate = rate;
                    }
                    st.in_gap = false;
                    st.in_burst = currently_in_burst;
                }
            }

            next_deadline += effective_interval;
            tick += 1;
        }
        Ok(())
    })();

    // Always flush, but report the loop error first if there was one.
    let flush_result = sink.flush();
    loop_result.and(flush_result)
}
#[cfg(test)]
mod tests {
    use crate::config::{BurstConfig, GapConfig, ScenarioConfig};
    use crate::encoder::EncoderConfig;
    use crate::generator::GeneratorConfig;
    use crate::sink::memory::MemorySink;
    use crate::sink::SinkConfig;

    /// Builds a burst-free scenario emitting the constant metric `up`.
    fn make_config(rate: f64, duration: &str, gaps: Option<GapConfig>) -> ScenarioConfig {
        make_config_with_burst(rate, duration, gaps, None)
    }

    /// Builds a scenario emitting the constant metric `up`, with optional
    /// gap and burst windows.
    fn make_config_with_burst(
        rate: f64,
        duration: &str,
        gaps: Option<GapConfig>,
        bursts: Option<BurstConfig>,
    ) -> ScenarioConfig {
        ScenarioConfig {
            name: String::from("up"),
            rate,
            duration: Some(duration.to_owned()),
            generator: GeneratorConfig::Constant { value: 1.0 },
            gaps,
            bursts,
            labels: None,
            encoder: EncoderConfig::PrometheusText,
            sink: SinkConfig::Stdout,
        }
    }

    /// Gap recurring every `every` and lasting `length`.
    fn gap(every: &str, length: &str) -> GapConfig {
        GapConfig {
            every: every.to_owned(),
            r#for: length.to_owned(),
        }
    }

    /// Burst recurring every `every`, lasting `length`, with a 5x multiplier.
    fn burst_x5(every: &str, length: &str) -> BurstConfig {
        BurstConfig {
            every: every.to_owned(),
            r#for: length.to_owned(),
            multiplier: 5.0,
        }
    }

    /// Runs `config` to completion into a fresh in-memory sink.
    fn run_to_memory(config: &ScenarioConfig) -> MemorySink {
        let mut sink = MemorySink::new();
        super::run_with_sink(config, &mut sink, None, None).expect("run must succeed");
        sink
    }

    /// Counts emitted events as the number of newline bytes in the sink.
    fn count_events(sink: &MemorySink) -> usize {
        sink.buffer.iter().filter(|byte| **byte == b'\n').count()
    }

    #[test]
    fn run_completes_without_error_for_short_duration() {
        let result = super::run(&make_config(100.0, "100ms", None));
        assert!(
            result.is_ok(),
            "run must succeed for valid config: {result:?}"
        );
    }

    #[test]
    fn integration_rate_100_duration_1s_emits_approximately_100_events() {
        let sink = run_to_memory(&make_config(100.0, "1s", None));
        let newlines = count_events(&sink);
        assert!(
            (80..=120).contains(&newlines),
            "expected ~100 events (80–120), got {newlines}"
        );
    }

    #[test]
    fn integration_output_lines_start_with_metric_name() {
        let sink = run_to_memory(&make_config(50.0, "200ms", None));
        let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
        output.lines().for_each(|line| {
            assert!(
                line.starts_with("up"),
                "each line must start with metric name 'up', got: {line:?}"
            );
        });
    }

    #[test]
    fn integration_output_ends_with_newline() {
        let sink = run_to_memory(&make_config(50.0, "200ms", None));
        assert!(
            sink.buffer.last() == Some(&b'\n'),
            "output must end with a newline"
        );
    }

    #[test]
    fn integration_gap_suppresses_events() {
        // 100 events/s for 5s would be 500 events without the gap.
        let sink = run_to_memory(&make_config(100.0, "5s", Some(gap("3s", "1s"))));
        let newlines = count_events(&sink);
        assert!(
            newlines < 500,
            "gap must suppress events: expected < 500, got {newlines}"
        );
        assert!(
            newlines > 0,
            "some events must be emitted outside of gaps, got {newlines}"
        );
    }

    #[test]
    fn run_with_invalid_duration_returns_err() {
        let config = ScenarioConfig {
            duration: Some("not_a_duration".to_string()),
            ..make_config(100.0, "bad_duration", None)
        };
        let result = super::run(&config);
        assert!(result.is_err(), "invalid duration must return Err");
    }

    #[test]
    fn run_with_invalid_gap_every_returns_err() {
        let config = make_config(100.0, "1s", Some(gap("bad", "1s")));
        let result = super::run(&config);
        assert!(result.is_err(), "invalid gap.every must return Err");
    }

    #[test]
    fn integration_labels_appear_in_output() {
        let label_map =
            std::collections::HashMap::from([("host".to_string(), "server1".to_string())]);
        let config = ScenarioConfig {
            labels: Some(label_map),
            ..make_config(50.0, "100ms", None)
        };
        let sink = run_to_memory(&config);
        let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
        assert!(
            output.contains("host=\"server1\""),
            "label must appear in output, got:\n{output}"
        );
    }

    #[test]
    fn integration_burst_increases_event_count() {
        let config = make_config_with_burst(10.0, "1s", None, Some(burst_x5("10s", "9s")));
        let sink = run_to_memory(&config);
        let newlines = count_events(&sink);
        assert!(
            newlines > 15,
            "burst must increase event count above base rate: expected >15, got {newlines}"
        );
        assert!(
            newlines < 100,
            "event count must be sane (not runaway): expected <100, got {newlines}"
        );
    }

    #[test]
    fn integration_burst_then_normal_produces_mixed_rate() {
        let config = make_config_with_burst(100.0, "2s", None, Some(burst_x5("10s", "1s")));
        let sink = run_to_memory(&config);
        let newlines = count_events(&sink);
        assert!(
            newlines > 200,
            "burst phase must produce more than base rate alone: expected >200, got {newlines}"
        );
        assert!(
            newlines <= 900,
            "total event count must be in expected range, got {newlines}"
        );
    }

    #[test]
    fn integration_gap_wins_over_burst_suppresses_events() {
        let config = make_config_with_burst(
            100.0,
            "3s",
            Some(gap("3s", "2s")),
            Some(burst_x5("3s", "2500ms")),
        );
        let sink = run_to_memory(&config);
        let newlines = count_events(&sink);
        assert!(
            newlines < 1000,
            "gap must suppress many events that burst would have produced: expected <1000, got {newlines}"
        );
        assert!(
            newlines > 0,
            "some events must be emitted outside of gaps, got {newlines}"
        );
    }

    // The assertion only requires the gap run to emit fewer events than the
    // burst-only run; it does not require exactly zero events.
    #[test]
    fn integration_gap_covering_full_window_produces_zero_events_even_with_burst() {
        let config_no_gap_burst =
            make_config_with_burst(1000.0, "500ms", None, Some(burst_x5("1s", "900ms")));
        let sink_no_gap = run_to_memory(&config_no_gap_burst);
        let events_burst_only = count_events(&sink_no_gap);

        let config_gap_and_burst = make_config_with_burst(
            1000.0,
            "500ms",
            Some(gap("1s", "900ms")),
            Some(burst_x5("1s", "900ms")),
        );
        let sink_gap_burst = run_to_memory(&config_gap_and_burst);
        let events_gap_and_burst = count_events(&sink_gap_burst);

        assert!(
            events_gap_and_burst < events_burst_only,
            "gap must suppress burst events: gap+burst={events_gap_and_burst} must be < burst-only={events_burst_only}"
        );
    }

    #[test]
    fn shutdown_flag_stops_run_during_burst() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        let config = make_config_with_burst(1000.0, "60s", None, Some(burst_x5("10s", "9s")));

        // The flag means "keep running": the run stops once it is set to false.
        let running = Arc::new(AtomicBool::new(true));
        let stopper = {
            let flag = Arc::clone(&running);
            std::thread::spawn(move || {
                std::thread::sleep(std::time::Duration::from_millis(200));
                flag.store(false, Ordering::SeqCst);
            })
        };

        let mut sink = MemorySink::new();
        super::run_with_sink(&config, &mut sink, Some(&running), None).expect("run must succeed");
        stopper.join().expect("thread must complete");

        let newlines = count_events(&sink);
        assert!(
            newlines > 0,
            "some events must have been emitted before shutdown"
        );
    }
}