use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use crate::config::SummaryScenarioConfig;
use crate::encoder::create_encoder;
use crate::generator::histogram::to_distribution;
use crate::generator::summary::{SummaryGenerator, DEFAULT_SUMMARY_QUANTILES};
use crate::model::metric::{Labels, MetricEvent, ValidatedMetricName};
use crate::schedule::core_loop::{self, GateContext, TickContext, TickResult};
use crate::schedule::is_in_spike;
use crate::schedule::stats::ScenarioStats;
use crate::schedule::ParsedSchedule;
use crate::sink::{create_sink, Sink};
use crate::SondaError;
pub fn run(config: &SummaryScenarioConfig) -> Result<(), SondaError> {
let mut sink = create_sink(&config.sink, None)?;
run_with_sink(config, sink.as_mut(), None, None)
}
pub fn run_with_sink(
config: &SummaryScenarioConfig,
sink: &mut dyn Sink,
shutdown: Option<&AtomicBool>,
stats: Option<Arc<RwLock<ScenarioStats>>>,
) -> Result<(), SondaError> {
run_with_sink_gated(config, sink, shutdown, stats, None)
}
pub fn run_with_sink_gated(
config: &SummaryScenarioConfig,
sink: &mut dyn Sink,
shutdown: Option<&AtomicBool>,
stats: Option<Arc<RwLock<ScenarioStats>>>,
gate_ctx: Option<GateContext>,
) -> Result<(), SondaError> {
let schedule = ParsedSchedule::from_base_config(&config.base)?;
let quantiles: Vec<f64> = config
.quantiles
.clone()
.unwrap_or_else(|| DEFAULT_SUMMARY_QUANTILES.to_vec());
let distribution = to_distribution(&config.distribution);
let observations_per_tick = config.observations_per_tick.unwrap_or(100);
let mean_shift_per_sec = config.mean_shift_per_sec.unwrap_or(0.0);
let seed = config.seed.unwrap_or(0);
let mut summary_gen = SummaryGenerator::new(
quantiles.clone(),
distribution,
observations_per_tick,
mean_shift_per_sec,
seed,
config.rate,
);
let encoder = create_encoder(&config.encoder)?;
let labels: Arc<Labels> = {
let inner = if let Some(ref label_map) = config.labels {
let pairs: Vec<(&str, &str)> = label_map
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
Labels::from_pairs(&pairs)?
} else {
Labels::from_pairs(&[])?
};
Arc::new(inner)
};
let base_name = ValidatedMetricName::new(&config.name)?;
let count_name = ValidatedMetricName::new(&format!("{}_count", config.name))?;
let sum_name = ValidatedMetricName::new(&format!("{}_sum", config.name))?;
let quantile_strings: Vec<String> = quantiles.iter().map(|q| format!("{}", q)).collect();
let prebuilt_quantile_labels: Vec<Arc<Labels>> = quantile_strings
.iter()
.map(|q_val| {
let mut ql = (*labels).clone();
ql.insert("quantile".to_string(), q_val.clone());
Arc::new(ql)
})
.collect();
let prebuilt_count_sum_labels: Arc<Labels> = Arc::clone(&labels);
let mut buf: Vec<u8> = Vec::with_capacity(1024);
let mut tick_fn =
|ctx: &TickContext<'_>, sink: &mut dyn Sink| -> Result<TickResult, SondaError> {
let wall_now = std::time::SystemTime::now();
let sample = summary_gen.observe(ctx.tick);
let needs_dynamic = !ctx.dynamic_labels.is_empty();
let has_active_spike = ctx
.spike_windows
.iter()
.any(|sw| is_in_spike(ctx.elapsed, sw));
let needs_clone = needs_dynamic || has_active_spike;
let mut total_bytes: u64 = 0;
for (i, &(_q_target, q_value)) in sample.quantiles.iter().enumerate() {
let quantile_labels = if needs_clone {
let mut ql = (*prebuilt_quantile_labels[i]).clone();
for dl in ctx.dynamic_labels {
ql.insert(dl.key.clone(), dl.label_value_for_tick(ctx.tick));
}
for sw in ctx.spike_windows {
if is_in_spike(ctx.elapsed, sw) {
ql.insert(sw.label.clone(), sw.label_value_for_tick(ctx.tick));
}
}
Arc::new(ql)
} else {
Arc::clone(&prebuilt_quantile_labels[i])
};
let event =
MetricEvent::from_parts(base_name.clone(), q_value, quantile_labels, wall_now);
buf.clear();
encoder.encode_metric(&event, &mut buf)?;
total_bytes += buf.len() as u64;
sink.write(&buf)?;
}
let count_sum_labels = if needs_clone {
let mut bl = (*prebuilt_count_sum_labels).clone();
for dl in ctx.dynamic_labels {
bl.insert(dl.key.clone(), dl.label_value_for_tick(ctx.tick));
}
for sw in ctx.spike_windows {
if is_in_spike(ctx.elapsed, sw) {
bl.insert(sw.label.clone(), sw.label_value_for_tick(ctx.tick));
}
}
Arc::new(bl)
} else {
Arc::clone(&prebuilt_count_sum_labels)
};
let count_event = MetricEvent::from_parts(
count_name.clone(),
sample.count as f64,
Arc::clone(&count_sum_labels),
wall_now,
);
buf.clear();
encoder.encode_metric(&count_event, &mut buf)?;
total_bytes += buf.len() as u64;
sink.write(&buf)?;
let sum_event =
MetricEvent::from_parts(sum_name.clone(), sample.sum, count_sum_labels, wall_now);
buf.clear();
encoder.encode_metric(&sum_event, &mut buf)?;
total_bytes += buf.len() as u64;
sink.write(&buf)?;
Ok(TickResult {
bytes_written: total_bytes,
metric_event: Some(count_event),
})
};
let stats_for_flush = stats.clone();
let loop_result = match gate_ctx {
None => core_loop::run_schedule_loop(
&schedule,
config.rate,
shutdown,
stats,
sink,
&mut tick_fn,
),
Some(ctx) => core_loop::gated_loop(
&schedule,
config.rate,
shutdown,
stats,
ctx,
sink,
&mut tick_fn,
),
};
let flush_result = sink.flush();
match loop_result {
Ok(()) => core_loop::apply_flush_policy(&schedule, stats_for_flush.as_ref(), flush_result),
Err(e) => Err(e),
}
}
#[cfg(test)]
mod tests {
use crate::config::{BaseScheduleConfig, DistributionConfig, SummaryScenarioConfig};
use crate::encoder::EncoderConfig;
use crate::sink::memory::MemorySink;
use crate::sink::SinkConfig;
fn make_config(
rate: f64,
duration: &str,
quantiles: Option<Vec<f64>>,
) -> SummaryScenarioConfig {
SummaryScenarioConfig {
base: BaseScheduleConfig {
name: "rpc_duration_seconds".to_string(),
rate,
duration: Some(duration.to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
quantiles,
distribution: DistributionConfig::Normal {
mean: 0.1,
stddev: 0.02,
},
observations_per_tick: Some(100),
mean_shift_per_sec: None,
seed: Some(42),
encoder: EncoderConfig::PrometheusText { precision: None },
}
}
#[test]
fn run_completes_for_short_duration() {
let config = make_config(50.0, "200ms", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("summary run must succeed");
assert!(!sink.buffer.is_empty(), "summary run must produce output");
}
#[test]
fn output_contains_quantile_count_sum_series() {
let config = make_config(50.0, "200ms", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
assert!(
output.contains("rpc_duration_seconds{"),
"output must contain base name quantile events"
);
assert!(
output.contains("rpc_duration_seconds_count"),
"output must contain _count events"
);
assert!(
output.contains("rpc_duration_seconds_sum"),
"output must contain _sum events"
);
}
#[test]
fn quantile_events_have_quantile_label() {
let config = make_config(50.0, "100ms", Some(vec![0.5, 0.99]));
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
assert!(
output.contains("quantile=\"0.5\""),
"output must contain quantile=\"0.5\""
);
assert!(
output.contains("quantile=\"0.99\""),
"output must contain quantile=\"0.99\""
);
}
#[test]
fn gap_suppresses_summary_output() {
let mut config = make_config(100.0, "2s", None);
config.base.gaps = Some(crate::config::GapConfig {
every: "1s".to_string(),
r#for: "500ms".to_string(),
});
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
assert!(
!sink.buffer.is_empty(),
"summary with gaps must still produce some output"
);
}
#[test]
fn config_labels_appear_in_output() {
let mut config = make_config(50.0, "100ms", Some(vec![0.5]));
let mut label_map = std::collections::HashMap::new();
label_map.insert("service".to_string(), "auth".to_string());
config.base.labels = Some(label_map);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
assert!(
output.contains("service=\"auth\""),
"config labels must appear in output"
);
}
}