use crate::residual::{ResidualSample, ResidualStream};
use std::collections::BinaryHeap;
/// Reorder window used by `StreamingIngestor::new`.
///
/// Expressed in the same units as `ResidualSample::t`; the `_S` suffix suggests
/// seconds (NOTE(review): confirm against the producer of `t`).
pub const DEFAULT_REORDER_WINDOW_S: f64 = 10.0;
/// Incrementally builds a [`ResidualStream`] from samples that may arrive out of
/// `t` order.
///
/// Incoming samples are staged in a heap that yields the earliest `t` first. A
/// staged sample is appended to the output stream once the newest accepted
/// timestamp is at least `reorder_window_s` past it. Samples that `push` judges
/// too late are discarded and counted instead of being staged.
pub struct StreamingIngestor {
    /// Output stream that flushed samples are appended to.
    stream: ResidualStream,
    /// How far (in `t` units) the newest timestamp must run ahead of a staged
    /// sample before that sample is flushed.
    reorder_window_s: f64,
    /// Staged, not-yet-flushed samples. `Staged` reverses the ordering so this
    /// max-heap pops the smallest `t` first.
    buf: BinaryHeap<Staged>,
    /// Largest `t` accepted so far; `NEG_INFINITY` before the first sample.
    newest_t: f64,
    /// Number of samples `push` rejected as too late.
    dropped_out_of_window: u64,
}
/// Heap entry wrapping a [`ResidualSample`], ordered by *reversed* `t`.
///
/// `BinaryHeap` is a max-heap, so reversing the comparison makes it pop the
/// earliest sample first. The order is built on `f64::total_cmp`, which gives a
/// lawful total order even for NaN or signed zeros. NaN values sort at the
/// extremes according to their sign bit instead of comparing "equal" to
/// everything. This matters in release builds, where the finiteness checks are
/// only `debug_assert!`s.
struct Staged(ResidualSample);

impl PartialEq for Staged {
    // Defined through `cmp` so `Eq` and `Ord` can never disagree (a requirement
    // of the `Ord` contract); plain `==` on f64 treats NaN != NaN.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for Staged {}

impl PartialOrd for Staged {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Staged {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose: larger `t` compares as "smaller",
        // turning the max-heap into a min-heap on `t`.
        other.0.t.total_cmp(&self.0.t)
    }
}
impl StreamingIngestor {
    /// Creates an ingestor for `source` using [`DEFAULT_REORDER_WINDOW_S`].
    pub fn new(source: impl Into<String>) -> Self {
        Self::with_window(source, DEFAULT_REORDER_WINDOW_S)
    }

    /// Creates an ingestor for `source` with a custom reorder window.
    ///
    /// A staged sample is flushed to the output stream once
    /// `newest_t - reorder_window_s >= sample.t`, where `newest_t` is the largest
    /// timestamp accepted so far.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `reorder_window_s` is negative or not finite.
    /// Release builds do not check this.
    pub fn with_window(source: impl Into<String>, reorder_window_s: f64) -> Self {
        debug_assert!(
            reorder_window_s >= 0.0 && reorder_window_s.is_finite(),
            "reorder window must be finite and non-negative"
        );
        Self {
            stream: ResidualStream::new(source),
            reorder_window_s,
            buf: BinaryHeap::new(),
            newest_t: f64::NEG_INFINITY,
            dropped_out_of_window: 0,
        }
    }

    /// Timestamp of the most recently flushed sample, or `NEG_INFINITY` if
    /// nothing has been flushed yet.
    fn flushed_frontier(&self) -> f64 {
        self.stream
            .samples
            .last()
            .map_or(f64::NEG_INFINITY, |s| s.t)
    }

    /// Ingests one sample, then flushes every staged sample that has aged out of
    /// the reorder window.
    ///
    /// A sample whose `t` is earlier than the last flushed sample can no longer
    /// be placed in time order. It is dropped and counted in
    /// [`dropped_out_of_window`](Self::dropped_out_of_window). Ties with the last
    /// flushed sample are accepted.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `sample.t` or `sample.value` is not finite.
    pub fn push(&mut self, sample: ResidualSample) {
        debug_assert!(sample.t.is_finite(), "residual t must be finite");
        debug_assert!(sample.value.is_finite(), "residual value must be finite");
        // Flushed output is final. Anything older than its last timestamp would
        // be appended after a later-stamped sample. Comparing against
        // `frontier - window` instead would admit such samples, and
        // `drain_ready` would flush them immediately, out of order.
        if sample.t < self.flushed_frontier() {
            self.dropped_out_of_window += 1;
            return;
        }
        if sample.t > self.newest_t {
            self.newest_t = sample.t;
        }
        self.buf.push(Staged(sample));
        self.drain_ready();
    }

    /// Moves every staged sample with `t <= newest_t - reorder_window_s` into the
    /// output stream, in ascending `t` order.
    fn drain_ready(&mut self) {
        let cutoff = self.newest_t - self.reorder_window_s;
        while let Some(top) = self.buf.peek() {
            if top.0.t > cutoff {
                break;
            }
            let Staged(s) = self.buf.pop().expect("peek returned Some");
            self.stream.samples.push(s);
        }
    }

    /// Flushes all remaining staged samples and returns the finished stream
    /// together with the number of samples dropped as too late.
    pub fn finish(mut self) -> (ResidualStream, u64) {
        // Every staged sample is newer than anything already flushed, and the
        // heap pops in ascending `t`, so appending keeps the stream ordered.
        while let Some(Staged(s)) = self.buf.pop() {
            self.stream.samples.push(s);
        }
        debug_assert!(
            self.stream.samples.windows(2).all(|w| w[0].t <= w[1].t),
            "finish invariant: samples time-ordered"
        );
        (self.stream, self.dropped_out_of_window)
    }

    /// Number of samples already written to the output stream.
    pub fn flushed(&self) -> usize {
        self.stream.samples.len()
    }

    /// Number of samples currently waiting in the reorder buffer.
    pub fn staged(&self) -> usize {
        self.buf.len()
    }

    /// Number of samples rejected so far because they arrived too late.
    pub fn dropped_out_of_window(&self) -> u64 {
        self.dropped_out_of_window
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::residual::ResidualClass;

    fn s(t: f64, value: f64) -> ResidualSample {
        ResidualSample::new(t, ResidualClass::PlanRegression, value)
    }

    fn times(stream: &ResidualStream) -> Vec<f64> {
        stream.samples.iter().map(|s| s.t).collect()
    }

    #[test]
    fn in_order_ingest_matches_batch() {
        let mut ing = StreamingIngestor::with_window("test", 5.0);
        for t in [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0] {
            ing.push(s(t, 0.1));
        }
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 0);
        assert_eq!(times(&stream), vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0]);
    }

    #[test]
    fn out_of_order_within_window_is_sorted() {
        let mut ing = StreamingIngestor::with_window("test", 5.0);
        for t in [0.0, 2.0, 1.0, 4.0, 3.0, 5.0, 7.0, 6.0] {
            ing.push(s(t, 0.1));
        }
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 0);
        assert_eq!(times(&stream), vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]);
    }

    #[test]
    fn sample_older_than_window_is_dropped() {
        let mut ing = StreamingIngestor::with_window("test", 1.0);
        for t in [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] {
            ing.push(s(t, 0.1));
        }
        ing.push(s(1.0, 0.1));
        let (_, dropped) = ing.finish();
        assert_eq!(dropped, 1);
    }

    #[test]
    fn sample_just_behind_flush_frontier_is_dropped_not_misordered() {
        // After pushing 0..=10 with a 1.0 window, 0..=9 are flushed. 8.5 is
        // within `window` of the frontier but can no longer be placed in order.
        let mut ing = StreamingIngestor::with_window("test", 1.0);
        for t in 0..=10 {
            ing.push(s(f64::from(t), 0.1));
        }
        ing.push(s(8.5, 0.1));
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 1);
        let expected: Vec<f64> = (0..=10).map(f64::from).collect();
        assert_eq!(times(&stream), expected);
    }

    #[test]
    fn late_sample_at_or_after_frontier_is_kept() {
        let mut ing = StreamingIngestor::with_window("test", 1.0);
        for t in [0.0, 10.0, 5.0] {
            ing.push(s(t, 0.1));
        }
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 0);
        assert_eq!(times(&stream), vec![0.0, 5.0, 10.0]);
    }

    #[test]
    fn empty_ingest_produces_empty_stream() {
        let ing = StreamingIngestor::new("test");
        let (stream, dropped) = ing.finish();
        assert!(stream.samples.is_empty());
        assert_eq!(dropped, 0);
    }

    #[test]
    fn zero_window_drops_any_out_of_order() {
        let mut ing = StreamingIngestor::with_window("test", 0.0);
        ing.push(s(1.0, 0.1));
        ing.push(s(2.0, 0.1));
        ing.push(s(1.5, 0.1));
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 1);
        assert_eq!(stream.samples.len(), 2);
    }
}