use std::collections::VecDeque;
use serde::Serialize;
use crate::model::metric::MetricEvent;
/// Maximum number of events that `ScenarioStats::push_metric` keeps in
/// `ScenarioStats::recent_metrics`; once the buffer holds this many, pushing a new
/// event evicts the oldest one.
pub const MAX_RECENT_METRICS: usize = 100;
/// State tag carried by `ScenarioStats::state`.
///
/// Serialized by serde as the lowercase variant name (`rename_all = "lowercase"`).
/// The derived `Default` is `Pending`. Marked `#[non_exhaustive]`, so code outside this
/// crate must include a wildcard arm when matching.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant names;
/// the code that drives the transitions is not in this file — confirm against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum ScenarioState {
/// Default state; presumably the scenario has not started yet.
#[default]
Pending,
/// Presumably the scenario is actively running.
Running,
/// Presumably the scenario is temporarily suspended.
Paused,
/// Presumably the scenario has completed.
Finished,
}
/// Serializable statistics record for a scenario.
///
/// Every field starts at its type's default (zero counters, `false` flags, `None`
/// options, empty buffer, `ScenarioState::Pending`) via the derived `Default`.
/// All fields except `recent_metrics` are included in the serde output.
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with a
/// struct literal and should start from `ScenarioStats::default()`.
///
/// NOTE(review): the counters and flags are written by code outside this file; the
/// field descriptions below are inferred from names and types — confirm against the writer.
#[derive(Debug, Clone, Default, Serialize)]
#[non_exhaustive]
pub struct ScenarioStats {
/// Running count of events (presumably events emitted by the scenario).
pub total_events: u64,
/// Running count of bytes (presumably bytes written to the sink).
pub bytes_emitted: u64,
/// Most recent rate value; unit is not established here (presumably events per second).
pub current_rate: f64,
/// Running count of errors.
pub errors: u64,
/// Flag; presumably `true` while the scenario is inside a configured gap window.
pub in_gap: bool,
/// Flag; presumably `true` while the scenario is inside a configured burst window.
pub in_burst: bool,
/// Flag; presumably `true` while a cardinality spike is active.
pub in_cardinality_spike: bool,
/// Presumably the number of sink failures since the last successful write.
pub consecutive_failures: u64,
/// Presumably the total number of sink failures over the scenario's lifetime.
pub total_sink_failures: u64,
/// Text of the most recent sink error, if any; serialized as `null` when `None`.
pub last_sink_error: Option<String>,
/// Time of the last successful write, if any; serialized as `null` when `None`.
/// NOTE(review): unit is not established here — the tests use a value that would be
/// nanoseconds since the Unix epoch; confirm.
pub last_successful_write_at: Option<u64>,
/// Buffer of recently pushed metric events, oldest at the front. Excluded from
/// serialization. `push_metric` caps it at `MAX_RECENT_METRICS` entries and
/// `drain_recent_metrics` empties it.
#[serde(skip)]
pub recent_metrics: VecDeque<MetricEvent>,
/// Current state tag of the scenario; defaults to `ScenarioState::Pending`.
pub state: ScenarioState,
}
impl ScenarioStats {
    /// Appends `event` to the recent-metrics buffer as its newest entry.
    ///
    /// The buffer is capped at [`MAX_RECENT_METRICS`] entries: when it is already full,
    /// the oldest entry is discarded from the front before the new one is added.
    ///
    /// `recent_metrics` is a public field, so it can be grown past the cap without going
    /// through this method. In that case every surplus entry is discarded (not just one),
    /// so the cap holds again once this call returns.
    pub fn push_metric(&mut self, event: MetricEvent) {
        let len = self.recent_metrics.len();
        if len >= MAX_RECENT_METRICS {
            // Leave room for exactly one more entry. `min(len)` keeps the drain range
            // valid even if the cap were ever configured as zero.
            let surplus = (len + 1).saturating_sub(MAX_RECENT_METRICS).min(len);
            self.recent_metrics.drain(..surplus);
        }
        self.recent_metrics.push_back(event);
    }

    /// Removes every buffered event and returns them in oldest-first order.
    ///
    /// The buffer is empty afterwards; calling this on an empty buffer returns an
    /// empty `Vec`.
    pub fn drain_recent_metrics(&mut self) -> Vec<MetricEvent> {
        self.recent_metrics.drain(..).collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-constructed value has zeroed counters, cleared flags and no sink history.
    #[test]
    fn default_stats_has_zero_counters_and_false_flags() {
        let s = ScenarioStats::default();
        assert_eq!(s.total_events, 0, "total_events must start at zero");
        assert_eq!(s.bytes_emitted, 0, "bytes_emitted must start at zero");
        assert_eq!(s.current_rate, 0.0, "current_rate must start at zero");
        assert_eq!(s.errors, 0, "errors must start at zero");
        assert!(!s.in_gap, "in_gap must start as false");
        assert!(!s.in_burst, "in_burst must start as false");
        assert!(
            !s.in_cardinality_spike,
            "in_cardinality_spike must start as false"
        );
        assert_eq!(s.consecutive_failures, 0);
        assert_eq!(s.total_sink_failures, 0);
        assert!(s.last_sink_error.is_none());
        assert!(s.last_successful_write_at.is_none());
    }

    /// The default state is `Pending` and it serializes as the lowercase variant name.
    #[test]
    fn default_state_is_pending_and_serializes_lowercase() {
        let s = ScenarioStats::default();
        assert_eq!(s.state, ScenarioState::Pending);
        let json = serde_json::to_string(&s).expect("must serialize");
        assert!(
            json.contains("\"state\":\"pending\""),
            "state must serialize as a lowercase string, got: {json}"
        );
    }

    /// Populated sink-failure fields appear in the JSON output with their values.
    #[test]
    fn sink_failure_fields_serialize_to_json() {
        let s = ScenarioStats {
            consecutive_failures: 3,
            total_sink_failures: 12,
            last_sink_error: Some("connection refused".to_string()),
            last_successful_write_at: Some(1_700_000_000_000_000_000),
            ..Default::default()
        };
        let json = serde_json::to_string(&s).expect("must serialize");
        assert!(json.contains("\"consecutive_failures\":3"));
        assert!(json.contains("\"total_sink_failures\":12"));
        assert!(json.contains("\"last_sink_error\":\"connection refused\""));
        assert!(json.contains("\"last_successful_write_at\":1700000000000000000"));
    }

    /// Unset optional sink fields are emitted as JSON `null` rather than omitted.
    #[test]
    fn last_sink_error_serializes_as_null_when_none() {
        let s = ScenarioStats::default();
        let json = serde_json::to_string(&s).expect("must serialize");
        assert!(json.contains("\"last_sink_error\":null"));
        assert!(json.contains("\"last_successful_write_at\":null"));
    }

    /// Mutating a clone leaves the original untouched.
    #[test]
    fn clone_produces_independent_copy() {
        let original = ScenarioStats {
            total_events: 42,
            bytes_emitted: 1024,
            current_rate: 10.5,
            errors: 3,
            in_gap: true,
            in_burst: false,
            ..Default::default()
        };
        let mut cloned = original.clone();
        cloned.total_events = 99;
        cloned.in_gap = false;
        assert_eq!(original.total_events, 42);
        assert!(original.in_gap);
        assert_eq!(cloned.total_events, 99);
        assert!(!cloned.in_gap);
    }

    /// The derived `Debug` output names the struct.
    #[test]
    fn debug_format_contains_struct_name() {
        let s = ScenarioStats::default();
        let debug_str = format!("{s:?}");
        assert!(
            debug_str.contains("ScenarioStats"),
            "Debug output must name the struct, got: {debug_str}"
        );
    }

    /// Every serialized field of the struct shows up as a key in the JSON output.
    #[test]
    fn serializes_to_json_with_all_fields_present() {
        let s = ScenarioStats {
            total_events: 7,
            bytes_emitted: 512,
            // Deliberately not 3.14: Clippy's `approx_constant` lint rejects literals
            // that approximate PI.
            current_rate: 2.5,
            errors: 1,
            in_gap: false,
            in_burst: true,
            ..Default::default()
        };
        let json = serde_json::to_string(&s).expect("ScenarioStats must serialize to JSON");
        // Every field except `recent_metrics`, which is marked `#[serde(skip)]`.
        let expected_keys = [
            "total_events",
            "bytes_emitted",
            "current_rate",
            "errors",
            "in_gap",
            "in_burst",
            "in_cardinality_spike",
            "consecutive_failures",
            "total_sink_failures",
            "last_sink_error",
            "last_successful_write_at",
            "state",
        ];
        for key in expected_keys {
            assert!(
                json.contains(&format!("\"{key}\"")),
                "JSON must contain {key}, got: {json}"
            );
        }
    }

    /// Compile-time check that the struct can be shared across threads.
    #[test]
    fn scenario_stats_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<ScenarioStats>();
    }

    /// The recent-metrics buffer starts out empty.
    #[test]
    fn default_stats_has_empty_recent_metrics_buffer() {
        let s = ScenarioStats::default();
        assert!(
            s.recent_metrics.is_empty(),
            "recent_metrics buffer must be empty on default construction"
        );
    }

    /// Builds a metric event with the given name and value and default labels.
    fn make_metric_event(name: &str, value: f64) -> crate::model::metric::MetricEvent {
        crate::model::metric::MetricEvent::new(
            name.to_string(),
            value,
            crate::model::metric::Labels::default(),
        )
        .expect("test metric name must be valid")
    }

    /// A single push stores exactly one event.
    #[test]
    fn push_metric_adds_event_to_buffer() {
        let mut s = ScenarioStats::default();
        let event = make_metric_event("up", 1.0);
        s.push_metric(event);
        assert_eq!(
            s.recent_metrics.len(),
            1,
            "buffer must contain exactly 1 event after one push"
        );
    }

    /// Events are stored oldest-first in the order they were pushed.
    #[test]
    fn push_metric_preserves_insertion_order() {
        let mut s = ScenarioStats::default();
        s.push_metric(make_metric_event("up", 10.0));
        s.push_metric(make_metric_event("up", 20.0));
        s.push_metric(make_metric_event("up", 30.0));
        assert_eq!(s.recent_metrics.len(), 3);
        assert_eq!(
            s.recent_metrics[0].value, 10.0,
            "first event must be the oldest (value=10.0)"
        );
        assert_eq!(
            s.recent_metrics[1].value, 20.0,
            "second event must be value=20.0"
        );
        assert_eq!(
            s.recent_metrics[2].value, 30.0,
            "third event must be the newest (value=30.0)"
        );
    }

    /// Pushing exactly `MAX_RECENT_METRICS` events fills the buffer without eviction.
    #[test]
    fn push_metric_fills_buffer_to_max_capacity() {
        let mut s = ScenarioStats::default();
        for i in 0..MAX_RECENT_METRICS {
            s.push_metric(make_metric_event("up", i as f64));
        }
        assert_eq!(
            s.recent_metrics.len(),
            MAX_RECENT_METRICS,
            "buffer must hold exactly MAX_RECENT_METRICS events"
        );
    }

    /// Pushing into a full buffer drops the oldest event and keeps the length at the cap.
    #[test]
    fn push_metric_evicts_oldest_when_full() {
        let mut s = ScenarioStats::default();
        for i in 0..MAX_RECENT_METRICS {
            s.push_metric(make_metric_event("up", i as f64));
        }
        assert_eq!(s.recent_metrics.len(), MAX_RECENT_METRICS);
        assert_eq!(
            s.recent_metrics.front().unwrap().value,
            0.0,
            "oldest event before eviction must be value=0.0"
        );
        s.push_metric(make_metric_event("up", 999.0));
        assert_eq!(
            s.recent_metrics.len(),
            MAX_RECENT_METRICS,
            "buffer must not grow beyond MAX_RECENT_METRICS after eviction"
        );
        assert_eq!(
            s.recent_metrics.front().unwrap().value,
            1.0,
            "oldest event after eviction must be value=1.0"
        );
        assert_eq!(
            s.recent_metrics.back().unwrap().value,
            999.0,
            "newest event after eviction must be value=999.0"
        );
    }

    /// Repeated overflow keeps discarding from the old end of the buffer.
    #[test]
    fn push_metric_multiple_evictions_discard_oldest() {
        let mut s = ScenarioStats::default();
        let total = MAX_RECENT_METRICS + 5;
        for i in 0..total {
            s.push_metric(make_metric_event("up", i as f64));
        }
        assert_eq!(s.recent_metrics.len(), MAX_RECENT_METRICS);
        assert_eq!(
            s.recent_metrics.front().unwrap().value,
            5.0,
            "after MAX+5 pushes, oldest event must be value=5.0"
        );
        assert_eq!(
            s.recent_metrics.back().unwrap().value,
            (total - 1) as f64,
            "newest event must be the last pushed value"
        );
    }

    /// Draining hands back everything that was buffered and leaves the buffer empty.
    #[test]
    fn drain_recent_metrics_returns_all_events_and_empties_buffer() {
        let mut s = ScenarioStats::default();
        s.push_metric(make_metric_event("up", 1.0));
        s.push_metric(make_metric_event("up", 2.0));
        s.push_metric(make_metric_event("up", 3.0));
        let drained = s.drain_recent_metrics();
        assert_eq!(drained.len(), 3, "drain must return all 3 buffered events");
        assert!(
            s.recent_metrics.is_empty(),
            "buffer must be empty after drain"
        );
    }

    /// Drained events come back oldest-first.
    #[test]
    fn drain_recent_metrics_returns_oldest_first_order() {
        let mut s = ScenarioStats::default();
        s.push_metric(make_metric_event("up", 100.0));
        s.push_metric(make_metric_event("up", 200.0));
        s.push_metric(make_metric_event("up", 300.0));
        let drained = s.drain_recent_metrics();
        assert_eq!(drained[0].value, 100.0, "first drained must be oldest");
        assert_eq!(drained[1].value, 200.0, "second drained must be middle");
        assert_eq!(drained[2].value, 300.0, "third drained must be newest");
    }

    /// Draining an empty buffer yields an empty `Vec`.
    #[test]
    fn drain_recent_metrics_on_empty_buffer_returns_empty_vec() {
        let mut s = ScenarioStats::default();
        let drained = s.drain_recent_metrics();
        assert!(
            drained.is_empty(),
            "draining an empty buffer must return an empty Vec"
        );
    }

    /// The buffer is reusable after a drain and only holds events pushed afterwards.
    #[test]
    fn drain_then_push_starts_fresh_buffer() {
        let mut s = ScenarioStats::default();
        s.push_metric(make_metric_event("up", 1.0));
        s.push_metric(make_metric_event("up", 2.0));
        let first_drain = s.drain_recent_metrics();
        assert_eq!(first_drain.len(), 2);
        assert!(s.recent_metrics.is_empty());
        s.push_metric(make_metric_event("up", 10.0));
        assert_eq!(s.recent_metrics.len(), 1);
        let second_drain = s.drain_recent_metrics();
        assert_eq!(second_drain.len(), 1);
        assert_eq!(
            second_drain[0].value, 10.0,
            "new event after drain must be retrievable"
        );
    }

    /// A second drain with no pushes in between returns nothing.
    #[test]
    fn drain_twice_returns_empty_on_second_call() {
        let mut s = ScenarioStats::default();
        s.push_metric(make_metric_event("up", 42.0));
        let first = s.drain_recent_metrics();
        assert_eq!(first.len(), 1);
        let second = s.drain_recent_metrics();
        assert!(
            second.is_empty(),
            "second drain must return empty Vec after first drain consumed all events"
        );
    }

    /// The `#[serde(skip)]` buffer never appears in the JSON output, even when non-empty.
    #[test]
    fn recent_metrics_buffer_is_not_serialized_to_json() {
        let mut s = ScenarioStats::default();
        s.push_metric(make_metric_event("up", 1.0));
        s.push_metric(make_metric_event("up", 2.0));
        let json = serde_json::to_string(&s).expect("must serialize");
        assert!(
            !json.contains("recent_metrics"),
            "recent_metrics must not appear in JSON output (serde skip): {json}"
        );
    }
}