use crate::grammar::{Episode, MotifClass, MotifEngine, MotifGrammar};
use crate::residual::{ResidualSample, ResidualStream};
use std::collections::HashSet;
/// Identity used to recognise the same episode across repeated engine scans:
/// `(motif class, channel, start time in whole milliseconds)`.
type EpisodeKey = (MotifClass, Option<String>, i64);

/// Builds the deduplication key for `ep`.
///
/// The start time is scaled by 1000 and converted with an `as` cast, which
/// truncates toward zero (and saturates / maps NaN to 0 per Rust cast rules).
fn episode_key(ep: &Episode) -> EpisodeKey {
    let start_ms = (ep.t_start * 1000.0) as i64;
    let channel = ep.channel.clone();
    (ep.motif, channel, start_ms)
}
/// Incrementally feeds residual samples to a [`MotifEngine`] and reports each
/// detected episode only once.
///
/// Every push re-runs the engine over the whole retained buffer, so the same
/// episode is usually re-detected on many consecutive pushes; `emitted`
/// remembers which episodes have already been returned.
pub struct LiveEmitter {
    /// Retained samples; re-sorted with `ResidualStream::sort` after each push.
    buffer: ResidualStream,
    engine: MotifEngine,
    /// Keys of already-returned episodes whose start still lies inside the
    /// retained buffer. Older keys are pruned in `trim` so the set stays bounded.
    emitted: HashSet<EpisodeKey>,
    /// Lifetime number of episodes returned by `push_samples`.
    total_emitted: usize,
    /// Samples with `t < newest_t - retention_window_s` are dropped on each push
    /// (same units as sample `t`; seconds per the field name).
    retention_window_s: f64,
    /// Hard cap on the number of retained samples; the oldest are dropped beyond it.
    max_samples: usize,
}

impl LiveEmitter {
    /// Creates an emitter with an empty buffer.
    ///
    /// * `grammar` – motif grammar handed to the underlying [`MotifEngine`].
    /// * `retention_window_s` – how far behind the newest sample data is kept.
    /// * `max_samples` – hard cap on retained samples.
    pub fn new(grammar: MotifGrammar, retention_window_s: f64, max_samples: usize) -> Self {
        Self {
            buffer: ResidualStream::new("live-postgres"),
            engine: MotifEngine::new(grammar),
            emitted: HashSet::new(),
            total_emitted: 0,
            retention_window_s,
            max_samples,
        }
    }

    /// Appends `samples`, rescans the buffer, and returns only the episodes that
    /// have not been returned before.
    ///
    /// An empty `samples` vector is a no-op: the engine is not run and nothing
    /// is trimmed.
    pub fn push_samples(&mut self, samples: Vec<ResidualSample>) -> Vec<Episode> {
        if samples.is_empty() {
            return Vec::new();
        }
        for s in samples {
            self.buffer.push(s);
        }
        self.buffer.sort();

        let detected = self.engine.run(&self.buffer);
        // `HashSet::insert` returns true only for keys not seen before, which is
        // exactly the "not yet emitted" condition.
        let fresh: Vec<Episode> = detected
            .iter()
            .filter(|ep| self.emitted.insert(episode_key(ep)))
            .cloned()
            .collect();
        self.total_emitted += fresh.len();

        self.trim();
        fresh
    }

    /// Drops samples outside the retention window, enforces `max_samples`, and
    /// forgets dedup keys that can no longer be re-detected.
    ///
    /// Every episode found by the latest scan is recorded in `emitted` before
    /// this runs, so there is no un-emitted ("open") episode whose samples need
    /// protecting: the cutoff is simply the retention horizon.
    fn trim(&mut self) {
        let newest_t = self.buffer.samples.last().map_or(0.0, |s| s.t);
        let cutoff = newest_t - self.retention_window_s;
        self.buffer.samples.retain(|s| s.t >= cutoff);

        if self.buffer.samples.len() > self.max_samples {
            let excess = self.buffer.samples.len() - self.max_samples;
            eprintln!(
                "warning: live buffer exceeded max_samples ({}); dropping {} oldest samples — open episodes may be truncated",
                self.max_samples, excess
            );
            self.buffer.samples.drain(..excess);
        }

        // Without pruning, `emitted` grows for the whole lifetime of the emitter.
        // A key whose start precedes the oldest retained sample cannot be produced
        // again by a rescan of this buffer. The `as i64` conversion is monotone,
        // so comparing in milliseconds matches `episode_key`.
        // NOTE(review): assumes the engine never reports an episode starting
        // before the first buffered sample; a late-arriving sample older than the
        // horizon could re-trigger an episode whose key was already pruned.
        if let Some(oldest) = self.buffer.samples.first() {
            let horizon_ms = (oldest.t * 1000.0) as i64;
            self.emitted.retain(|&(_, _, start_ms)| start_ms >= horizon_ms);
        }
    }

    /// Number of samples currently retained.
    pub fn buffer_len(&self) -> usize {
        self.buffer.samples.len()
    }

    /// Number of episodes returned by `push_samples` over the emitter's lifetime.
    pub fn emitted_count(&self) -> usize {
        self.total_emitted
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::residual::{plan_regression, ResidualClass};

    fn grammar() -> MotifGrammar {
        MotifGrammar::default()
    }

    /// Pushes a single latency observation for query "qA" at time `t` and
    /// returns whatever the emitter reports as new.
    fn push_one(em: &mut LiveEmitter, t: f64, latency: f64, baseline: f64) -> Vec<Episode> {
        let mut tmp = ResidualStream::new("");
        plan_regression::push_latency(&mut tmp, t, "qA", latency, baseline);
        em.push_samples(tmp.samples)
    }

    #[test]
    fn emits_episode_once_across_rescans() {
        let mut em = LiveEmitter::new(grammar(), 3600.0, 1_000_000);
        let mut seen = std::collections::HashSet::new();
        let mut total_emitted = 0;
        for i in 0..60 {
            let baseline = 10.0;
            let latency = if i >= 20 { 30.0 } else { baseline };
            for ep in push_one(&mut em, i as f64, latency, baseline) {
                // The whole point of the emitter: a rescan must never re-report
                // an episode it has already returned.
                assert!(
                    seen.insert(episode_key(&ep)),
                    "episode starting at {} emitted twice",
                    ep.t_start
                );
                total_emitted += 1;
            }
        }
        assert!(total_emitted >= 1);
        assert_eq!(em.emitted_count(), total_emitted);
    }

    #[test]
    fn emitter_is_idempotent_on_empty_push() {
        let mut em = LiveEmitter::new(grammar(), 3600.0, 1_000_000);
        let out = em.push_samples(Vec::new());
        assert!(out.is_empty());
        assert_eq!(em.buffer_len(), 0);
        assert_eq!(em.emitted_count(), 0);
    }

    #[test]
    fn trim_respects_retention_window() {
        let mut em = LiveEmitter::new(grammar(), 10.0, 1_000_000);
        for i in 0..50 {
            push_one(&mut em, i as f64, 10.0, 10.0);
        }
        assert!(em.buffer_len() <= 15, "buffer should be trimmed to ~10 s: {}", em.buffer_len());
    }

    #[test]
    fn residual_class_of_emitted_matches_motif_class() {
        let mut em = LiveEmitter::new(grammar(), 3600.0, 1_000_000);
        let mut emitted = 0;
        for i in 0..80 {
            let latency = if i >= 20 { 40.0 } else { 10.0 };
            for ep in push_one(&mut em, i as f64, latency, 10.0) {
                assert_eq!(ep.motif.residual_class(), ResidualClass::PlanRegression);
                emitted += 1;
            }
        }
        // Guard against a vacuous pass when nothing is emitted at all.
        assert!(emitted > 0, "expected at least one episode to be emitted");
    }
}