use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use crate::config::LogScenarioConfig;
use crate::encoder::create_encoder;
use crate::generator::create_log_generator;
use crate::model::metric::Labels;
use crate::schedule::core_loop::{self, GateContext, TickContext, TickResult};
use crate::schedule::is_in_spike;
use crate::schedule::stats::ScenarioStats;
use crate::schedule::ParsedSchedule;
use crate::sink::{create_sink, Sink};
use crate::SondaError;
pub fn run_logs(config: &LogScenarioConfig) -> Result<(), SondaError> {
let mut sink = create_sink(&config.sink, config.labels.as_ref())?;
run_logs_with_sink(config, sink.as_mut(), None, None)
}
pub fn run_logs_with_sink(
config: &LogScenarioConfig,
sink: &mut dyn Sink,
shutdown: Option<&AtomicBool>,
stats: Option<Arc<RwLock<ScenarioStats>>>,
) -> Result<(), SondaError> {
run_logs_with_sink_gated(config, sink, shutdown, stats, None)
}
/// Runs a log scenario against `sink`, optionally under a gate context.
///
/// Setup happens in a fixed order — schedule, generator, encoder, static
/// labels — and the first failure is returned immediately. A per-tick closure
/// then generates one log event, attaches labels, encodes it into a reused
/// buffer and writes that buffer to the sink.
///
/// # Parameters
///
/// * `config` – scenario configuration (schedule, generator, encoder, labels).
/// * `sink` – destination for the encoded events; flushed once after the loop.
/// * `shutdown` – optional flag handed unchanged to the core loop.
/// * `stats` – optional shared stats handle; one clone goes to the loop, the
///   other to the flush policy.
/// * `gate_ctx` – `Some` selects `core_loop::gated_loop`, `None` selects
///   `core_loop::run_schedule_loop`.
///
/// # Errors
///
/// Returns setup errors, the loop error if the loop failed (the flush result
/// is dropped in that case), or otherwise the outcome of
/// `core_loop::apply_flush_policy` applied to the flush result.
pub fn run_logs_with_sink_gated(
    config: &LogScenarioConfig,
    sink: &mut dyn Sink,
    shutdown: Option<&AtomicBool>,
    stats: Option<Arc<RwLock<ScenarioStats>>>,
    gate_ctx: Option<GateContext>,
) -> Result<(), SondaError> {
    let schedule = ParsedSchedule::from_base_config(&config.base)?;
    let generator = create_log_generator(&config.generator)?;
    let encoder = create_encoder(&config.encoder)?;

    // Static labels are resolved once; every tick starts from a clone of them.
    let labels: Labels = match config.labels {
        Some(ref label_map) => {
            let pairs: Vec<(&str, &str)> = label_map
                .iter()
                .map(|(key, value)| (key.as_str(), value.as_str()))
                .collect();
            Labels::from_pairs(&pairs)?
        }
        None => Labels::default(),
    };

    // Encode buffer shared by all ticks; cleared before each encode.
    let mut buf: Vec<u8> = Vec::with_capacity(512);

    let mut tick_fn =
        |ctx: &TickContext<'_>, out: &mut dyn Sink| -> Result<TickResult, SondaError> {
            let mut event = generator.generate(ctx.tick);

            // Start from the static labels and overlay per-tick ones. With no
            // dynamic labels and no spike windows both loops are no-ops, so
            // the event simply receives a plain clone.
            let mut tick_labels = labels.clone();
            for dynamic in ctx.dynamic_labels {
                tick_labels.insert(dynamic.key.clone(), dynamic.label_value_for_tick(ctx.tick));
            }
            // Spike labels are applied after dynamic ones, and only while the
            // elapsed time falls inside the spike window.
            for window in ctx.spike_windows {
                if !is_in_spike(ctx.elapsed, window) {
                    continue;
                }
                tick_labels.insert(window.label.clone(), window.label_value_for_tick(ctx.tick));
            }
            event.labels = tick_labels;

            buf.clear();
            encoder.encode_log(&event, &mut buf)?;
            out.write(&buf)?;
            Ok(TickResult {
                bytes_written: buf.len() as u64,
                metric_event: None,
            })
        };

    // Keep a handle for the flush policy; `stats` itself moves into the loop.
    let stats_for_flush = stats.clone();
    let loop_result = if let Some(gate) = gate_ctx {
        core_loop::gated_loop(
            &schedule,
            config.rate,
            shutdown,
            stats,
            gate,
            sink,
            &mut tick_fn,
        )
    } else {
        core_loop::run_schedule_loop(
            &schedule,
            config.rate,
            shutdown,
            stats,
            sink,
            &mut tick_fn,
        )
    };

    // Flush regardless of how the loop ended; a loop error still wins.
    let flush_result = sink.flush();
    loop_result?;
    core_loop::apply_flush_policy(&schedule, stats_for_flush.as_ref(), flush_result)
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashMap};
use super::*;
use crate::config::{BaseScheduleConfig, GapConfig, LogScenarioConfig};
use crate::encoder::EncoderConfig;
use crate::generator::{LogGeneratorConfig, TemplateConfig};
use crate::sink::memory::MemorySink;
use crate::sink::SinkConfig;
/// Builds the baseline log scenario used by the tests in this module.
///
/// The scenario is named `test_logs`, writes JSON lines, and uses a single
/// template with no field pools and a generator seed of 0. Every optional
/// schedule feature is left unset; callers set `rate` and `duration` and
/// override other fields on the returned value as needed.
fn make_config(rate: f64, duration: Option<&str>) -> LogScenarioConfig {
    let base = BaseScheduleConfig {
        name: String::from("test_logs"),
        rate,
        duration: duration.map(str::to_owned),
        gaps: None,
        bursts: None,
        cardinality_spikes: None,
        dynamic_labels: None,
        labels: None,
        sink: SinkConfig::Stdout,
        phase_offset: None,
        clock_group: None,
        clock_group_is_auto: None,
        jitter: None,
        jitter_seed: None,
        on_sink_error: crate::OnSinkError::Warn,
    };

    let only_template = TemplateConfig {
        message: String::from("synthetic log event"),
        field_pools: BTreeMap::new(),
    };
    let generator = LogGeneratorConfig::Template {
        templates: vec![only_template],
        severity_weights: None,
        seed: Some(0),
    };

    let encoder = EncoderConfig::JsonLines { precision: None };

    LogScenarioConfig {
        base,
        generator,
        encoder,
    }
}
/// A 10 events/s scenario run for one second must emit between 7 and 13 lines.
#[test]
fn run_logs_with_sink_rate_10_duration_1s_produces_approx_10_lines() {
    let mut sink = MemorySink::new();
    let config = make_config(10.0, Some("1s"));
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let line_count = output.lines().count();
    // Accept a band around the nominal 10 lines rather than an exact count.
    let within_band = matches!(line_count, 7..=13);
    assert!(within_band, "expected ~10 log lines, got {line_count}");
}
/// Every emitted line must parse as JSON and carry a `message` key.
#[test]
fn run_logs_with_sink_each_line_is_valid_json() {
    let mut sink = MemorySink::new();
    let config = make_config(10.0, Some("1s"));
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    output.lines().for_each(|line| {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let has_message = parsed.get("message").is_some();
        assert!(
            has_message,
            "each JSON line must contain a 'message' key; line: {line}"
        );
    });
}
/// Flipping the shutdown flag from another thread must make the runner
/// return `Ok`.
#[test]
fn run_logs_with_sink_shutdown_flag_stops_runner() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    // No duration is configured, so the test relies on the flag to end the run.
    let config = make_config(5.0, None);
    let mut sink = MemorySink::new();

    // The flag starts as `true` and is set to `false` after 300 ms.
    let shutdown = Arc::new(AtomicBool::new(true));
    let stopper = {
        let flag_clone = Arc::clone(&shutdown);
        move || {
            thread::sleep(Duration::from_millis(300));
            flag_clone.store(false, Ordering::SeqCst);
        }
    };
    thread::spawn(stopper);

    let result = run_logs_with_sink(&config, &mut sink, Some(shutdown.as_ref()), None);
    let stopped_cleanly = result.is_ok();
    assert!(
        stopped_cleanly,
        "runner must return Ok when stopped via shutdown flag"
    );
}
/// A configured gap must keep the line count below 150 for a 100 events/s,
/// two-second run.
#[test]
fn run_logs_with_sink_gap_suppresses_output() {
    // Gap of 9s in every 10s cycle.
    let gap = GapConfig {
        every: String::from("10s"),
        r#for: String::from("9s"),
    };
    let mut config = make_config(100.0, Some("2s"));
    config.gaps = Some(gap);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("valid UTF-8");
    let line_count = output.lines().count();
    let suppressed = line_count < 150;
    assert!(
        suppressed,
        "gap should suppress events: expected < 150 lines, got {line_count}"
    );
}
/// A scenario with a 500 ms duration must return in under two seconds.
#[test]
fn run_logs_with_sink_duration_500ms_exits_promptly() {
    use std::time::Instant;

    let mut sink = MemorySink::new();
    let config = make_config(5.0, Some("500ms"));

    let started = Instant::now();
    run_logs_with_sink(&config, &mut sink, None, None).expect("must not error");
    let elapsed = started.elapsed();

    // Generous upper bound: only guards against the runner failing to stop.
    let exited_promptly = elapsed < std::time::Duration::from_secs(2);
    assert!(
        exited_promptly,
        "runner should have exited after ~500ms, elapsed={elapsed:?}"
    );
}
/// A template-generator scenario written as YAML must deserialize into
/// [`LogScenarioConfig`] with the expected name, rate, duration, encoder and
/// sink.
#[cfg(feature = "config")]
#[test]
fn log_scenario_config_deserializes_template_yaml() {
    // YAML block structure is defined by indentation: the generator, encoder
    // and sink settings must be nested under their parent keys, otherwise
    // `type` appears several times at the top level and the parents are null.
    let yaml = r#"
name: app_logs_template
rate: 10
duration: 60s
generator:
  type: template
  templates:
    - message: "Request from {ip} to {endpoint}"
      field_pools:
        ip:
          - "10.0.0.1"
          - "10.0.0.2"
        endpoint:
          - "/api/v1/health"
          - "/api/v1/metrics"
  severity_weights:
    info: 0.7
    warn: 0.2
    error: 0.1
  seed: 42
encoder:
  type: json_lines
sink:
  type: stdout
"#;
    let config: LogScenarioConfig =
        serde_yaml_ng::from_str(yaml).expect("log-template YAML must deserialize");
    assert_eq!(config.name, "app_logs_template");
    assert_eq!(config.rate, 10.0);
    assert_eq!(config.duration.as_deref(), Some("60s"));
    assert!(matches!(config.encoder, EncoderConfig::JsonLines { .. }));
    assert!(matches!(config.sink, SinkConfig::Stdout));
}
/// A replay-generator scenario written as YAML must deserialize into
/// [`LogScenarioConfig`] and select the `Replay` generator variant.
#[cfg(feature = "config")]
#[test]
fn log_scenario_config_deserializes_replay_yaml() {
    // Nested settings are indented under `generator`, `encoder` and `sink`;
    // flattened to column 0 they would collide as duplicate `type` keys.
    let yaml = r#"
name: app_logs_replay
rate: 5
duration: 30s
generator:
  type: replay
  file: /var/log/app.log
encoder:
  type: json_lines
sink:
  type: stdout
"#;
    let config: LogScenarioConfig =
        serde_yaml_ng::from_str(yaml).expect("log-replay YAML must deserialize");
    assert_eq!(config.name, "app_logs_replay");
    assert_eq!(config.rate, 5.0);
    assert!(matches!(
        config.generator,
        LogGeneratorConfig::Replay { .. }
    ));
}
/// When the YAML omits `encoder`, the deserialized config must use the
/// JSON-lines encoder.
#[cfg(feature = "config")]
#[test]
fn log_scenario_config_default_encoder_is_json_lines() {
    // Only `name`, `rate` and `generator` are given. The generator settings
    // must be nested under `generator`, or that key deserializes as null.
    let yaml = r#"
name: defaults_test
rate: 1
generator:
  type: template
  templates:
    - message: "hello"
      field_pools: {}
"#;
    let config: LogScenarioConfig =
        serde_yaml_ng::from_str(yaml).expect("minimal log YAML must deserialize");
    assert!(
        matches!(config.encoder, EncoderConfig::JsonLines { .. }),
        "default encoder must be json_lines, got {:?}",
        config.encoder
    );
}
/// When the YAML omits `sink`, the deserialized config must use the stdout
/// sink.
#[cfg(feature = "config")]
#[test]
fn log_scenario_config_default_sink_is_stdout() {
    // Only `name`, `rate` and `generator` are given. The generator settings
    // must be nested under `generator`, or that key deserializes as null.
    let yaml = r#"
name: defaults_test
rate: 1
generator:
  type: template
  templates:
    - message: "hello"
      field_pools: {}
"#;
    let config: LogScenarioConfig =
        serde_yaml_ng::from_str(yaml).expect("minimal log YAML must deserialize");
    assert!(
        matches!(config.sink, SinkConfig::Stdout),
        "default sink must be stdout, got {:?}",
        config.sink
    );
}
/// A scenario YAML that configures both `gaps` and `bursts` must deserialize
/// with every gap and burst field preserved.
#[cfg(feature = "config")]
#[test]
fn log_scenario_config_with_gaps_and_bursts_deserializes() {
    // `every`/`for` occur under both `gaps` and `bursts`, and `type` under
    // generator, encoder and sink, so each group must be indented beneath its
    // parent key to avoid duplicate top-level keys.
    let yaml = r#"
name: full_config
rate: 20
duration: 120s
generator:
  type: template
  templates:
    - message: "event"
      field_pools: {}
gaps:
  every: 10s
  for: 2s
bursts:
  every: 5s
  for: 1s
  multiplier: 10.0
encoder:
  type: syslog
  hostname: myhost
  app_name: myapp
sink:
  type: stdout
"#;
    let config: LogScenarioConfig =
        serde_yaml_ng::from_str(yaml).expect("full log YAML must deserialize");

    let gaps = config.gaps.as_ref().expect("gaps must be present");
    assert_eq!(gaps.every, "10s");
    assert_eq!(gaps.r#for, "2s");

    let bursts = config.bursts.as_ref().expect("bursts must be present");
    assert_eq!(bursts.every, "5s");
    assert_eq!(bursts.r#for, "1s");
    assert_eq!(bursts.multiplier, 10.0);
}
/// Static labels from the config must appear under `labels` in every JSON
/// line.
#[test]
fn run_logs_with_sink_labels_appear_in_json_output() {
    let label_map: HashMap<String, String> = HashMap::from([
        ("device".to_string(), "wlan0".to_string()),
        ("hostname".to_string(), "router_01".to_string()),
    ]);
    let mut config = make_config(10.0, Some("1s"));
    config.labels = Some(label_map);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let lines: Vec<&str> = output.lines().collect();
    let produced_output = !lines.is_empty();
    assert!(
        produced_output,
        "runner must produce at least one line of output"
    );

    for line in lines.iter().copied() {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let emitted = &parsed["labels"];
        assert_eq!(
            emitted["device"], "wlan0",
            "every JSON line must contain label device=wlan0; line: {line}"
        );
        assert_eq!(
            emitted["hostname"], "router_01",
            "every JSON line must contain label hostname=router_01; line: {line}"
        );
    }
}
/// With no labels configured, every JSON line must carry an empty `labels`
/// object.
#[test]
fn run_logs_with_sink_no_labels_produces_empty_labels_object() {
    let mut sink = MemorySink::new();
    let config = make_config(10.0, Some("500ms"));
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let empty_object = serde_json::json!({});
    output.lines().for_each(|line| {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        assert_eq!(
            parsed["labels"], empty_object,
            "when no labels configured, labels must be empty object; line: {line}"
        );
    });
}
/// With the syslog encoder, static labels must show up as a
/// `[sonda key="value"]` element in every line.
#[test]
fn run_logs_with_sink_labels_appear_in_syslog_output() {
    let label_map: HashMap<String, String> =
        HashMap::from([("env".to_string(), "prod".to_string())]);

    let mut config = make_config(10.0, Some("500ms"));
    config.encoder = EncoderConfig::Syslog {
        hostname: None,
        app_name: None,
    };
    config.labels = Some(label_map);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let lines: Vec<&str> = output.lines().collect();
    let produced_output = !lines.is_empty();
    assert!(
        produced_output,
        "runner must produce at least one syslog line"
    );

    for line in lines.iter().copied() {
        let has_structured_data = line.contains("[sonda env=\"prod\"]");
        assert!(
            has_structured_data,
            "every syslog line must contain structured data with labels; line: {line}"
        );
    }
}
/// `LogScenarioConfig` must be cloneable and produce a recognisable `Debug`
/// rendering.
#[test]
fn log_scenario_config_is_clone_and_debug() {
    let config = make_config(10.0, Some("1s"));

    // Clone must preserve the fields checked here.
    let cloned = config.clone();
    assert_eq!(cloned.name, config.name);
    assert_eq!(cloned.rate, config.rate);

    // Debug output must mention either the type name or the scenario name.
    let rendered = format!("{config:?}");
    let recognisable = ["LogScenarioConfig", "test_logs"]
        .iter()
        .any(|needle| rendered.contains(needle));
    assert!(recognisable);
}
/// Returns the baseline config from `make_config` with `spike` installed as
/// its only cardinality spike.
fn make_config_with_spike(
    rate: f64,
    duration: Option<&str>,
    spike: crate::config::CardinalitySpikeConfig,
) -> LogScenarioConfig {
    let spikes = Vec::from([spike]);
    let mut spiked = make_config(rate, duration);
    spiked.cardinality_spikes = Some(spikes);
    spiked
}
/// While a cardinality spike is active, every JSON line must carry the spike
/// label with the configured prefix.
#[test]
fn run_logs_with_sink_spike_labels_appear_during_spike_window() {
    // Spike lasting 9s of every 10s; the run itself is only 1s long.
    let config = make_config_with_spike(
        10.0,
        Some("1s"),
        crate::config::CardinalitySpikeConfig {
            label: String::from("pod_name"),
            every: String::from("10s"),
            r#for: String::from("9s"),
            cardinality: 5,
            strategy: crate::config::SpikeStrategy::Counter,
            prefix: Some(String::from("pod-")),
            seed: None,
        },
    );

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let lines: Vec<&str> = output.lines().collect();
    let produced_output = !lines.is_empty();
    assert!(
        produced_output,
        "runner must produce at least one line of output"
    );

    for line in lines.iter().copied() {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let spike_label = &parsed["labels"]["pod_name"];
        assert!(
            spike_label.is_string(),
            "every JSON line during spike must contain pod_name label; line: {line}"
        );
        let pod_val = spike_label.as_str().unwrap();
        assert!(
            pod_val.starts_with("pod-"),
            "spike label value must start with prefix 'pod-', got: {pod_val}"
        );
    }
}
/// Without any spike configuration, `pod_name` must be absent from the
/// labels of every JSON line.
#[test]
fn run_logs_with_sink_no_spike_config_produces_no_spike_labels() {
    let mut sink = MemorySink::new();
    let config = make_config(10.0, Some("500ms"));
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    output.lines().for_each(|line| {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        // Indexing a missing key on a JSON value yields `Null`.
        let absent = parsed["labels"]["pod_name"].is_null();
        assert!(
            absent,
            "without spike config, pod_name must not appear in labels; line: {line}"
        );
    });
}
/// Returns the baseline config from `make_config` with the given dynamic
/// labels installed.
fn make_config_with_dynamic_labels(
    rate: f64,
    duration: Option<&str>,
    dynamic_labels: Vec<crate::config::DynamicLabelConfig>,
) -> LogScenarioConfig {
    let mut labelled = make_config(rate, duration);
    labelled.dynamic_labels = Some(dynamic_labels);
    labelled
}
/// A counter-strategy dynamic label must appear in every JSON line with the
/// configured prefix.
#[test]
fn run_logs_dynamic_labels_counter_appear_in_output() {
    let counter_label = crate::config::DynamicLabelConfig {
        key: String::from("pod_name"),
        strategy: crate::config::DynamicLabelStrategy::Counter {
            prefix: Some(String::from("pod-")),
            cardinality: 5,
        },
    };
    let config = make_config_with_dynamic_labels(10.0, Some("1s"), vec![counter_label]);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let lines: Vec<&str> = output.lines().collect();
    let produced_output = !lines.is_empty();
    assert!(produced_output, "runner must produce output");

    for line in lines.iter().copied() {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let dynamic_label = &parsed["labels"]["pod_name"];
        assert!(
            dynamic_label.is_string(),
            "every JSON line must contain dynamic label pod_name; line: {line}"
        );
        let val = dynamic_label.as_str().unwrap();
        assert!(
            val.starts_with("pod-"),
            "dynamic label value must start with prefix 'pod-', got: {val}"
        );
    }
}
/// A values-list dynamic label must appear in every line, and at least two of
/// its three values must be observed over the run.
#[test]
fn run_logs_dynamic_labels_values_list_cycle_in_output() {
    let region_values: Vec<String> = ["alpha", "beta", "gamma"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let region_label = crate::config::DynamicLabelConfig {
        key: String::from("region"),
        strategy: crate::config::DynamicLabelStrategy::ValuesList {
            values: region_values,
        },
    };
    let config = make_config_with_dynamic_labels(10.0, Some("1s"), vec![region_label]);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    let lines: Vec<&str> = output.lines().collect();
    assert!(!lines.is_empty());

    // Single pass: check the label on each line and record the values seen.
    let mut distinct_values = std::collections::HashSet::new();
    for line in lines.iter().copied() {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let region = &parsed["labels"]["region"];
        assert!(
            region.is_string(),
            "every JSON line must contain dynamic label region; line: {line}"
        );
        if let Some(v) = region.as_str() {
            distinct_values.insert(v.to_string());
        }
    }

    let cycled = distinct_values.len() >= 2;
    assert!(
        cycled,
        "with 3-element values list, at least 2 distinct values should appear: {distinct_values:?}"
    );
}
/// A counter dynamic label with cardinality 3 must yield exactly three
/// distinct values across the run.
#[test]
fn run_logs_dynamic_labels_respects_cardinality_ceiling() {
    let pod_label = crate::config::DynamicLabelConfig {
        key: String::from("pod"),
        strategy: crate::config::DynamicLabelStrategy::Counter {
            prefix: Some(String::from("pod-")),
            cardinality: 3,
        },
    };
    let config = make_config_with_dynamic_labels(50.0, Some("1s"), vec![pod_label]);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    // Collect every `pod` value seen; lines without a string value are skipped.
    let distinct_values: std::collections::HashSet<String> = output
        .lines()
        .map(|line| serde_json::from_str::<serde_json::Value>(line).unwrap())
        .filter_map(|parsed| parsed["labels"]["pod"].as_str().map(String::from))
        .collect();

    assert_eq!(
        distinct_values.len(),
        3,
        "with cardinality=3, exactly 3 distinct values must appear, got {:?}",
        distinct_values
    );
}
/// Static and dynamic labels with different keys must both be present in
/// every JSON line.
#[test]
fn run_logs_dynamic_labels_and_static_labels_coexist() {
    let host_label = crate::config::DynamicLabelConfig {
        key: String::from("hostname"),
        strategy: crate::config::DynamicLabelStrategy::Counter {
            prefix: Some(String::from("host-")),
            cardinality: 5,
        },
    };
    let label_map: HashMap<String, String> =
        HashMap::from([("env".to_string(), "staging".to_string())]);

    let mut config = make_config_with_dynamic_labels(10.0, Some("1s"), vec![host_label]);
    config.labels = Some(label_map);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    output.lines().for_each(|line| {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let emitted = &parsed["labels"];
        assert_eq!(
            emitted["env"], "staging",
            "static label must be present; line: {line}"
        );
        assert!(
            emitted["hostname"].is_string(),
            "dynamic label must be present; line: {line}"
        );
    });
}
/// When a static and a dynamic label share a key, the emitted value must be
/// the dynamic one.
#[test]
fn run_logs_dynamic_label_wins_on_key_collision() {
    let colliding_label = crate::config::DynamicLabelConfig {
        key: String::from("hostname"),
        strategy: crate::config::DynamicLabelStrategy::Counter {
            prefix: Some(String::from("dynamic-")),
            cardinality: 3,
        },
    };
    let label_map: HashMap<String, String> =
        HashMap::from([("hostname".to_string(), "static-value".to_string())]);

    let mut config =
        make_config_with_dynamic_labels(10.0, Some("500ms"), vec![colliding_label]);
    config.labels = Some(label_map);

    let mut sink = MemorySink::new();
    run_logs_with_sink(&config, &mut sink, None, None).expect("log runner must not error");

    let output = String::from_utf8(sink.buffer.clone()).expect("output must be valid UTF-8");
    output.lines().for_each(|line| {
        let parsed = match serde_json::from_str::<serde_json::Value>(line) {
            Ok(value) => value,
            Err(e) => panic!("line is not valid JSON: {e}\nline: {line}"),
        };
        let val = parsed["labels"]["hostname"].as_str().unwrap();
        let dynamic_won = val.starts_with("dynamic-");
        assert!(dynamic_won, "dynamic label must overwrite static; got: {val}");
    });
}
}