use std::collections::VecDeque;
use std::num::NonZeroUsize;
use lru::LruCache;
use crate::normalize::NormalizedEvent;
/// Tunable limits for a [`TraceWindow`].
#[derive(Debug, Clone)]
pub struct WindowConfig {
    /// Upper bound on the events buffered for one trace; once a push takes a
    /// trace past this bound, its oldest event is dropped.
    pub max_events_per_trace: usize,
    /// Idle time in milliseconds after which a trace counts as expired. A
    /// trace expires only when strictly more than this has elapsed since its
    /// last push.
    pub trace_ttl_ms: u64,
    /// Capacity handed to the LRU cache that holds the active traces.
    pub max_active_traces: NonZeroUsize,
}

impl Default for WindowConfig {
    /// Defaults: 1000 events per trace, a 30 000 ms TTL and 10 000 active
    /// traces.
    fn default() -> Self {
        const EVENTS_PER_TRACE: usize = 1000;
        const TTL_MS: u64 = 30_000;
        const ACTIVE_TRACES: usize = 10_000;

        // `ACTIVE_TRACES` is a non-zero literal, so this `expect` cannot fire.
        let max_active_traces = NonZeroUsize::new(ACTIVE_TRACES).expect("10000 > 0");

        WindowConfig {
            max_events_per_trace: EVENTS_PER_TRACE,
            trace_ttl_ms: TTL_MS,
            max_active_traces,
        }
    }
}
/// Per-trace state stored as the value type of the cache inside `TraceWindow`.
struct TraceBuffer {
/// Events buffered for this trace, oldest at the front. `TraceWindow::push`
/// appends at the back and pops from the front once the per-trace cap is
/// exceeded.
events: VecDeque<NormalizedEvent>,
/// The `now_ms` value supplied by the most recent `TraceWindow::push` for
/// this trace; TTL expiry is measured from it.
last_seen_ms: u64,
}
/// Bounded collection of in-flight traces: events are grouped by trace id and
/// kept in an LRU cache, with a per-trace event cap and a TTL taken from
/// `WindowConfig`.
pub struct TraceWindow {
/// Limits this window was built with (per-trace event cap, TTL, cache
/// capacity).
config: WindowConfig,
/// Active traces keyed by trace id; created with capacity
/// `config.max_active_traces`.
traces: LruCache<String, TraceBuffer>,
}
impl TraceWindow {
    /// Creates an empty window whose trace cache is sized from
    /// `config.max_active_traces`.
    #[must_use]
    pub fn new(config: WindowConfig) -> Self {
        let traces = LruCache::new(config.max_active_traces);
        Self { config, traces }
    }

    /// Records `event` under its trace id and stamps that trace with `now_ms`.
    ///
    /// * Trace already buffered: the event is appended; if the buffer then
    ///   holds more than `max_events_per_trace` events, the oldest one is
    ///   dropped. Returns `None`.
    /// * Trace not yet buffered: a fresh buffer holding just `event` is
    ///   inserted into the cache. If the cache hands back a displaced entry,
    ///   it is returned as `(trace_id, events)`; otherwise `None`.
    pub fn push(
        &mut self,
        event: NormalizedEvent,
        now_ms: u64,
    ) -> Option<(String, Vec<NormalizedEvent>)> {
        let per_trace_cap = self.config.max_events_per_trace;

        match self.traces.get_mut(event.event.trace_id.as_str()) {
            Some(existing) => {
                existing.events.push_back(event);
                existing.last_seen_ms = now_ms;
                // One event came in, so at most one has to go out.
                if existing.events.len() > per_trace_cap {
                    existing.events.pop_front();
                }
                None
            }
            None => {
                let id = event.event.trace_id.clone();
                let mut events = VecDeque::with_capacity(8);
                events.push_back(event);
                let fresh = TraceBuffer {
                    events,
                    last_seen_ms: now_ms,
                };
                let displaced = self.traces.push(id, fresh);
                displaced.map(|(old_id, old)| (old_id, Vec::from(old.events)))
            }
        }
    }

    /// Removes every trace that has expired as of `now_ms`, discarding its
    /// events. Use [`TraceWindow::evict_expired`] to get the events back.
    pub fn evict(&mut self, now_ms: u64) {
        let stale = self.collect_expired_keys(now_ms);
        for id in &stale {
            self.traces.pop(id);
        }
    }

    /// Removes every trace that has expired as of `now_ms` and returns each
    /// one as `(trace_id, events)`.
    pub fn evict_expired(&mut self, now_ms: u64) -> Vec<(String, Vec<NormalizedEvent>)> {
        let stale = self.collect_expired_keys(now_ms);
        let mut removed = Vec::with_capacity(stale.len());
        let cache = &mut self.traces;
        removed.extend(stale.into_iter().filter_map(|id| {
            let (_, buf) = cache.pop_entry(&id)?;
            Some((id, Vec::from(buf.events)))
        }));
        removed
    }

    /// Returns the ids of all traces whose idle time (`now_ms` minus the
    /// trace's last push time, floored at zero) is strictly greater than
    /// `trace_ttl_ms`.
    fn collect_expired_keys(&self, now_ms: u64) -> Vec<String> {
        let ttl = self.config.trace_ttl_ms;
        self.traces
            .iter()
            .filter_map(|(id, buf)| {
                let idle_ms = now_ms.saturating_sub(buf.last_seen_ms);
                (idle_ms > ttl).then(|| id.clone())
            })
            .collect()
    }

    /// Empties the window, returning every trace as `(trace_id, events)` in
    /// the order `LruCache::pop_lru` yields them.
    pub fn drain_all(&mut self) -> Vec<(String, Vec<NormalizedEvent>)> {
        let mut drained = Vec::with_capacity(self.traces.len());
        let cache = &mut self.traces;
        drained.extend(
            std::iter::from_fn(|| cache.pop_lru()).map(|(id, buf)| (id, Vec::from(buf.events))),
        );
        drained
    }

    /// Number of traces currently buffered.
    #[must_use]
    pub fn active_traces(&self) -> usize {
        self.traces.len()
    }

    /// Returns a copy of the events buffered for `trace_id`, oldest first, or
    /// `None` when that trace is not in the window. The lookup goes through
    /// `LruCache::peek` and leaves the buffered events in place.
    #[must_use]
    pub fn peek_clone(&self, trace_id: &str) -> Option<Vec<NormalizedEvent>> {
        let buf = self.traces.peek(trace_id)?;
        Some(buf.events.iter().cloned().collect())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventSource, EventType, SpanEvent};
    use crate::normalize;

    /// Builds a normalized SQL event for `trace_id` whose `target` is the
    /// given statement; every other field is a fixed placeholder.
    fn make_event(trace_id: &str, target: &str) -> NormalizedEvent {
        let source = EventSource {
            endpoint: "GET /test".to_string(),
            method: "Test::test".to_string(),
        };
        normalize::normalize(SpanEvent {
            timestamp: "2025-07-10T14:32:01.123Z".to_string(),
            trace_id: trace_id.to_string(),
            span_id: "span-1".to_string(),
            parent_span_id: None,
            service: "test".to_string(),
            cloud_region: None,
            event_type: EventType::Sql,
            operation: "SELECT".to_string(),
            target: target.to_string(),
            duration_us: 100,
            source,
            status_code: None,
            response_size_bytes: None,
            code_function: None,
            code_filepath: None,
            code_lineno: None,
            code_namespace: None,
        })
    }

    /// Window with default limits except for the given TTL.
    fn window_with_ttl(trace_ttl_ms: u64) -> TraceWindow {
        TraceWindow::new(WindowConfig {
            trace_ttl_ms,
            ..Default::default()
        })
    }

    /// Window with default limits except for the given trace capacity.
    fn window_with_capacity(max_traces: usize) -> TraceWindow {
        TraceWindow::new(WindowConfig {
            max_active_traces: NonZeroUsize::new(max_traces).unwrap(),
            ..Default::default()
        })
    }

    /// Whether `trace_id` is currently held in the window's cache.
    fn is_buffered(window: &TraceWindow, trace_id: &str) -> bool {
        window.traces.peek(trace_id).is_some()
    }

    /// Keeps only the trace id of whatever `push` handed back.
    fn evicted_id(evicted: Option<(String, Vec<NormalizedEvent>)>) -> Option<String> {
        evicted.map(|(id, _)| id)
    }

    /// Events recorded for `trace_id` in a drained result; panics if absent.
    fn events_of<'a>(
        drained: &'a [(String, Vec<NormalizedEvent>)],
        trace_id: &str,
    ) -> &'a [NormalizedEvent] {
        let (_, events) = drained.iter().find(|(id, _)| id == trace_id).unwrap();
        events
    }

    #[test]
    fn accumulates_events_by_trace() {
        let mut window = TraceWindow::new(WindowConfig::default());
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t1", "SELECT 2"), 10);
        window.push(make_event("t2", "SELECT 3"), 20);
        assert_eq!(window.active_traces(), 2);

        let drained = window.drain_all();
        assert_eq!(events_of(&drained, "t1").len(), 2);
    }

    #[test]
    fn ring_buffer_overflow() {
        let mut window = TraceWindow::new(WindowConfig {
            max_events_per_trace: 3,
            ..Default::default()
        });
        for i in 0..5_u64 {
            window.push(make_event("t1", &format!("SELECT {i}")), i);
        }

        let drained = window.drain_all();
        let kept = events_of(&drained, "t1");
        assert_eq!(kept.len(), 3);
        assert_eq!(kept[0].event.target, "SELECT 2");
        assert_eq!(kept[2].event.target, "SELECT 4");
    }

    #[test]
    fn ttl_eviction() {
        let mut window = window_with_ttl(100);
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t2", "SELECT 2"), 50);

        window.evict(150);
        assert_eq!(window.active_traces(), 1);

        let drained = window.drain_all();
        let first_id = drained.first().map(|(id, _)| id.as_str());
        assert_eq!(first_id, Some("t2"));
    }

    #[test]
    fn lru_eviction() {
        let mut window = window_with_capacity(2);
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t2", "SELECT 2"), 10);

        let evicted = window.push(make_event("t3", "SELECT 3"), 20);
        assert_eq!(evicted_id(evicted).as_deref(), Some("t1"));
        assert_eq!(window.active_traces(), 2);
        assert!(is_buffered(&window, "t2"));
        assert!(is_buffered(&window, "t3"));
        assert!(!is_buffered(&window, "t1"));
    }

    #[test]
    fn drain_empties_window() {
        let mut window = TraceWindow::new(WindowConfig::default());
        window.push(make_event("t1", "SELECT 1"), 0);

        assert_eq!(window.drain_all().len(), 1);
        assert_eq!(window.active_traces(), 0);
    }

    #[test]
    fn lru_touch_prevents_eviction() {
        let mut window = window_with_capacity(2);
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t2", "SELECT 2"), 10);
        // Touch t1 again so that t2 becomes the least recently used trace.
        window.push(make_event("t1", "SELECT 1b"), 20);

        let evicted = window.push(make_event("t3", "SELECT 3"), 30);
        assert_eq!(evicted_id(evicted).as_deref(), Some("t2"));
        assert_eq!(window.active_traces(), 2);
        assert!(is_buffered(&window, "t1"));
        assert!(is_buffered(&window, "t3"));
        assert!(!is_buffered(&window, "t2"));
    }

    #[test]
    fn evict_on_empty_window() {
        let mut window = TraceWindow::new(WindowConfig::default());
        window.evict(1000);
        assert_eq!(window.active_traces(), 0);
    }

    #[test]
    fn ttl_evicts_all_expired() {
        let mut window = window_with_ttl(50);
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t2", "SELECT 2"), 10);

        window.evict(200);
        assert_eq!(window.active_traces(), 0);
    }

    #[test]
    fn drain_empty_window() {
        let mut window = TraceWindow::new(WindowConfig::default());
        assert!(window.drain_all().is_empty());
    }

    #[test]
    fn lru_eviction_chain() {
        let mut window = window_with_capacity(1);

        let first = window.push(make_event("t1", "SELECT 1"), 0);
        assert!(first.is_none());

        let second = window.push(make_event("t2", "SELECT 2"), 10);
        assert_eq!(evicted_id(second).as_deref(), Some("t1"));
        assert_eq!(window.active_traces(), 1);
        assert!(is_buffered(&window, "t2"));

        let third = window.push(make_event("t3", "SELECT 3"), 20);
        assert_eq!(evicted_id(third).as_deref(), Some("t2"));
        assert_eq!(window.active_traces(), 1);
        assert!(is_buffered(&window, "t3"));
    }

    #[test]
    fn evict_expired_returns_traces() {
        let mut window = window_with_ttl(100);
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t2", "SELECT 2"), 50);

        // Nothing has been idle for more than 100 ms yet.
        let none_yet = window.evict_expired(50);
        assert!(none_yet.is_empty());
        assert_eq!(window.active_traces(), 2);

        // t1 has now been idle for 150 ms, t2 for exactly 100 ms (not expired).
        let expired = window.evict_expired(150);
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0].0, "t1");
        assert_eq!(window.active_traces(), 1);
    }

    #[test]
    fn push_returns_evicted_events() {
        let mut window = window_with_capacity(1);
        window.push(make_event("t1", "SELECT 1"), 0);
        window.push(make_event("t1", "SELECT 2"), 5);

        let evicted = window.push(make_event("t2", "SELECT 3"), 10);
        let (id, events) = evicted.unwrap();
        assert_eq!(id, "t1");
        assert_eq!(events.len(), 2);
    }
}