use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::config::validate::parse_duration;
use crate::config::LogScenarioConfig;
use crate::encoder::create_encoder;
use crate::generator::create_log_generator;
use crate::schedule::stats::ScenarioStats;
use crate::schedule::{is_in_burst, is_in_gap, time_until_gap_end, BurstWindow, GapWindow};
use crate::sink::{create_sink, Sink};
use crate::SondaError;
pub fn run_logs(config: &LogScenarioConfig) -> Result<(), SondaError> {
let mut sink = create_sink(&config.sink)?;
run_logs_with_sink(config, sink.as_mut(), None, None)
}
pub fn run_logs_with_sink(
config: &LogScenarioConfig,
sink: &mut dyn Sink,
shutdown: Option<&AtomicBool>,
stats: Option<Arc<RwLock<ScenarioStats>>>,
) -> Result<(), SondaError> {
let total_duration: Option<Duration> =
config.duration.as_deref().map(parse_duration).transpose()?;
let gap_window: Option<GapWindow> = config
.gaps
.as_ref()
.map(|g| -> Result<GapWindow, SondaError> {
Ok(GapWindow {
every: parse_duration(&g.every)?,
duration: parse_duration(&g.r#for)?,
})
})
.transpose()?;
let burst_window: Option<BurstWindow> = config
.bursts
.as_ref()
.map(|b| -> Result<BurstWindow, SondaError> {
Ok(BurstWindow {
every: parse_duration(&b.every)?,
duration: parse_duration(&b.r#for)?,
multiplier: b.multiplier,
})
})
.transpose()?;
let generator = create_log_generator(&config.generator)?;
let encoder = create_encoder(&config.encoder);
let base_interval = Duration::from_secs_f64(1.0 / config.rate);
let mut buf: Vec<u8> = Vec::with_capacity(512);
let start = Instant::now();
let mut next_deadline = start;
let mut tick: u64 = 0;
let mut rate_window_tick: u64 = 0;
let mut rate_window_start = start;
let loop_result = (|| -> Result<(), SondaError> {
loop {
if let Some(flag) = shutdown {
if !flag.load(Ordering::SeqCst) {
break;
}
}
let elapsed = start.elapsed();
if let Some(total) = total_duration {
if elapsed >= total {
break;
}
}
let currently_in_gap = if let Some(ref gap) = gap_window {
if is_in_gap(elapsed, gap) {
if let Some(ref s) = stats {
if let Ok(mut st) = s.write() {
st.in_gap = true;
st.in_burst = false;
}
}
let sleep_for = time_until_gap_end(elapsed, gap);
if sleep_for > Duration::ZERO {
thread::sleep(sleep_for);
}
let now = Instant::now();
next_deadline = now;
tick = (start.elapsed().as_secs_f64() / base_interval.as_secs_f64()) as u64;
continue;
} else {
false
}
} else {
false
};
let currently_in_burst;
let effective_interval = if let Some(ref burst) = burst_window {
if let Some(multiplier) = is_in_burst(elapsed, burst) {
currently_in_burst = true;
Duration::from_secs_f64(base_interval.as_secs_f64() / multiplier)
} else {
currently_in_burst = false;
base_interval
}
} else {
currently_in_burst = false;
base_interval
};
let now = Instant::now();
if now < next_deadline {
thread::sleep(next_deadline - now);
}
let event = generator.generate(tick);
buf.clear();
encoder.encode_log(&event, &mut buf)?;
let bytes_written = buf.len() as u64;
sink.write(&buf)?;
if let Some(ref s) = stats {
let window_elapsed = rate_window_start.elapsed();
let current_rate = if window_elapsed >= Duration::from_secs(1) {
let events_in_window = tick - rate_window_tick;
let rate = events_in_window as f64 / window_elapsed.as_secs_f64();
rate_window_tick = tick;
rate_window_start = Instant::now();
rate
} else {
s.read().map(|st| st.current_rate).unwrap_or(0.0)
};
if let Ok(mut st) = s.write() {
st.total_events += 1;
st.bytes_emitted += bytes_written;
st.current_rate = current_rate;
st.in_gap = currently_in_gap;
st.in_burst = currently_in_burst;
}
}
next_deadline += effective_interval;
tick += 1;
}
Ok(())
})();
let flush_result = sink.flush();
match loop_result {
Ok(()) => flush_result,
Err(e) => Err(e),
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use crate::config::{GapConfig, LogScenarioConfig};
use crate::encoder::EncoderConfig;
use crate::generator::{LogGeneratorConfig, TemplateConfig};
use crate::sink::memory::MemorySink;
use crate::sink::SinkConfig;
fn make_config(rate: f64, duration: Option<&str>) -> LogScenarioConfig {
LogScenarioConfig {
name: "test_logs".to_string(),
rate,
duration: duration.map(|s| s.to_string()),
generator: LogGeneratorConfig::Template {
templates: vec![TemplateConfig {
message: "synthetic log event".to_string(),
field_pools: HashMap::new(),
}],
severity_weights: None,
seed: Some(0),
},
gaps: None,
bursts: None,
encoder: EncoderConfig::JsonLines,
sink: SinkConfig::Stdout,
}
}
#[test]
fn run_logs_with_sink_rate_10_duration_1s_produces_approx_10_lines() {
let config = make_config(10.0, Some("1s"));
let mut sink = MemorySink::new();
run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");
let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
let line_count = output.lines().count();
assert!(
(7..=13).contains(&line_count),
"expected ~10 log lines, got {line_count}"
);
}
#[test]
fn run_logs_with_sink_each_line_is_valid_json() {
let config = make_config(10.0, Some("1s"));
let mut sink = MemorySink::new();
run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");
let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
for line in output.lines() {
let parsed: serde_json::Value = serde_json::from_str(line)
.unwrap_or_else(|e| panic!("line is not valid JSON: {e}\nline: {line}"));
assert!(
parsed.get("message").is_some(),
"each JSON line must contain a 'message' key; line: {line}"
);
}
}
#[test]
fn run_logs_with_sink_shutdown_flag_stops_runner() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
let config = make_config(5.0, None); let mut sink = MemorySink::new();
let shutdown = Arc::new(AtomicBool::new(true));
let flag_clone = Arc::clone(&shutdown);
thread::spawn(move || {
thread::sleep(Duration::from_millis(300));
flag_clone.store(false, Ordering::SeqCst);
});
let result = run_logs_with_sink(&config, &mut sink, Some(shutdown.as_ref()), None);
assert!(
result.is_ok(),
"runner must return Ok when stopped via shutdown flag"
);
}
#[test]
fn run_logs_with_sink_gap_suppresses_output() {
let mut config = make_config(100.0, Some("2s"));
config.gaps = Some(GapConfig {
every: "10s".to_string(),
r#for: "9s".to_string(), });
let mut sink = MemorySink::new();
run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");
let output = String::from_utf8(sink.buffer.clone()).expect("valid UTF-8");
let line_count = output.lines().count();
assert!(
line_count < 150,
"gap should suppress events: expected < 150 lines, got {line_count}"
);
}
#[test]
fn run_logs_with_sink_duration_500ms_exits_promptly() {
use std::time::Instant;
let config = make_config(5.0, Some("500ms"));
let mut sink = MemorySink::new();
let t0 = Instant::now();
run_logs_with_sink(&config, &mut sink, None, None).expect("must not error");
let elapsed = t0.elapsed();
assert!(
elapsed.as_secs() < 2,
"runner should have exited after ~500ms, elapsed={elapsed:?}"
);
}
#[test]
fn log_scenario_config_deserializes_template_yaml() {
let yaml = r#"
name: app_logs_template
rate: 10
duration: 60s
generator:
type: template
templates:
- message: "Request from {ip} to {endpoint}"
field_pools:
ip:
- "10.0.0.1"
- "10.0.0.2"
endpoint:
- "/api/v1/health"
- "/api/v1/metrics"
severity_weights:
info: 0.7
warn: 0.2
error: 0.1
seed: 42
encoder:
type: json_lines
sink:
type: stdout
"#;
let config: LogScenarioConfig =
serde_yaml::from_str(yaml).expect("log-template YAML must deserialize");
assert_eq!(config.name, "app_logs_template");
assert_eq!(config.rate, 10.0);
assert_eq!(config.duration.as_deref(), Some("60s"));
assert!(matches!(config.encoder, EncoderConfig::JsonLines));
assert!(matches!(config.sink, SinkConfig::Stdout));
}
#[test]
fn log_scenario_config_deserializes_replay_yaml() {
let yaml = r#"
name: app_logs_replay
rate: 5
duration: 30s
generator:
type: replay
file: /var/log/app.log
encoder:
type: json_lines
sink:
type: stdout
"#;
let config: LogScenarioConfig =
serde_yaml::from_str(yaml).expect("log-replay YAML must deserialize");
assert_eq!(config.name, "app_logs_replay");
assert_eq!(config.rate, 5.0);
assert!(matches!(
config.generator,
LogGeneratorConfig::Replay { .. }
));
}
#[test]
fn log_scenario_config_default_encoder_is_json_lines() {
let yaml = r#"
name: defaults_test
rate: 1
generator:
type: template
templates:
- message: "hello"
field_pools: {}
"#;
let config: LogScenarioConfig =
serde_yaml::from_str(yaml).expect("minimal log YAML must deserialize");
assert!(
matches!(config.encoder, EncoderConfig::JsonLines),
"default encoder must be json_lines, got {:?}",
config.encoder
);
}
#[test]
fn log_scenario_config_default_sink_is_stdout() {
let yaml = r#"
name: defaults_test
rate: 1
generator:
type: template
templates:
- message: "hello"
field_pools: {}
"#;
let config: LogScenarioConfig =
serde_yaml::from_str(yaml).expect("minimal log YAML must deserialize");
assert!(
matches!(config.sink, SinkConfig::Stdout),
"default sink must be stdout, got {:?}",
config.sink
);
}
#[test]
fn log_scenario_config_with_gaps_and_bursts_deserializes() {
let yaml = r#"
name: full_config
rate: 20
duration: 120s
generator:
type: template
templates:
- message: "event"
field_pools: {}
gaps:
every: 10s
for: 2s
bursts:
every: 5s
for: 1s
multiplier: 10.0
encoder:
type: syslog
hostname: myhost
app_name: myapp
sink:
type: stdout
"#;
let config: LogScenarioConfig =
serde_yaml::from_str(yaml).expect("full log YAML must deserialize");
let gaps = config.gaps.as_ref().expect("gaps must be present");
assert_eq!(gaps.every, "10s");
assert_eq!(gaps.r#for, "2s");
let bursts = config.bursts.as_ref().expect("bursts must be present");
assert_eq!(bursts.every, "5s");
assert_eq!(bursts.r#for, "1s");
assert_eq!(bursts.multiplier, 10.0);
}
#[test]
fn log_scenario_config_is_clone_and_debug() {
let config = make_config(10.0, Some("1s"));
let cloned = config.clone();
assert_eq!(cloned.name, config.name);
assert_eq!(cloned.rate, config.rate);
let s = format!("{config:?}");
assert!(s.contains("LogScenarioConfig") || s.contains("test_logs"));
}
}