// dsfb_database/streaming.rs
1//! Phase-C2: streaming residual construction.
2//!
3//! The batch adapter path calls [`ResidualStream::push`] followed by
4//! [`ResidualStream::sort`]; it produces the bytewise-identical stream
5//! the four fingerprint locks pin. That path is unchanged and remains
6//! the canonical construction for reproduction.
7//!
8//! [`ResidualStream::push`]: crate::residual::ResidualStream::push
9//! [`ResidualStream::sort`]: crate::residual::ResidualStream::sort
10//!
11//! This module adds a **parallel, additive** API for a live-ingestion
12//! deployment where samples arrive one at a time and a single terminal
13//! `.sort()` call over a materialised 10⁶-sample buffer is not
14//! acceptable. The streaming path preserves time-ordering via a
15//! bounded **reorder buffer**: every incoming sample is staged in a
16//! small heap, and any sample older than `newest_t − reorder_window_s`
17//! is flushed to the underlying stream in sorted order. At
18//! [`StreamingIngestor::finish`] the remaining buffer tail is drained.
19//!
20//! The trade-off is explicit: if a sample arrives with a time delta
21//! greater than `reorder_window_s` behind the current newest sample,
22//! it is dropped and the drop is counted — `dropped_out_of_window` is
23//! part of the closing summary. A production deployment sizes
24//! `reorder_window_s` to be larger than the engine's maximum
25//! telemetry-pipeline jitter (we default to 10 s; PostgreSQL's
26//! `pg_stat_statements` polling cadence at 60 s makes 10 s a ~6×
27//! safety margin).
28//!
29//! Determinism: given the same input stream and `reorder_window_s`,
30//! the flushed sample order and the `dropped_out_of_window` count are
31//! deterministic. The streaming path is **not** expected to produce
32//! the same fingerprint as the batch path for real-world jitter-bearing
33//! inputs — that is the honest reason batch is pinned and streaming is
34//! parallel, not a replacement.
35
36use crate::residual::{ResidualSample, ResidualStream};
37use std::collections::BinaryHeap;
38
/// Default reorder-buffer window in seconds. Sized for
/// `pg_stat_statements`-class telemetry jitter. Tune up for slower
/// engines, tune down for well-behaved log tails.
/// Must be finite and non-negative (enforced by a debug assertion in
/// [`StreamingIngestor::with_window`]).
pub const DEFAULT_REORDER_WINDOW_S: f64 = 10.0;
43
/// Streaming ingestor that accepts one [`ResidualSample`] at a time and
/// flushes a correctly-ordered prefix into an owned [`ResidualStream`]
/// as the reorder window slides forward.
///
/// Invariant: after every [`StreamingIngestor::push`] or
/// [`StreamingIngestor::finish`], the underlying
/// `self.stream.samples` is time-ordered (`t` non-decreasing).
pub struct StreamingIngestor {
    stream: ResidualStream,
    reorder_window_s: f64,
    /// Min-heap on `t`. `BinaryHeap` is a max-heap by default; here the
    /// min-heap behaviour comes from `Staged`'s reversed `Ord` impl
    /// (no `std::cmp::Reverse` wrapper is involved), so `pop()` yields
    /// the oldest staged sample first.
    buf: BinaryHeap<Staged>,
    /// Largest `t` observed so far; starts at `NEG_INFINITY` so the
    /// first pushed sample always advances it.
    newest_t: f64,
    /// Samples whose timestamp fell more than `reorder_window_s` behind
    /// the already-flushed frontier at arrival. A production runbook
    /// should alert on any non-zero value.
    dropped_out_of_window: u64,
}
63
/// Heap entry: orders by `t` in reverse so `BinaryHeap::pop` returns
/// the *oldest* sample. We compare strictly on `t`; `ResidualSample`
/// is not `Ord`-worthy directly because `value` is `f64`, and we do
/// not want a stable secondary key to leak into the ordering.
struct Staged(ResidualSample);
69
70impl PartialEq for Staged {
71    fn eq(&self, other: &Self) -> bool {
72        self.0.t == other.0.t
73    }
74}
75impl Eq for Staged {}
76impl PartialOrd for Staged {
77    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
78        Some(self.cmp(other))
79    }
80}
81impl Ord for Staged {
82    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
83        // Reverse: smaller `t` is "greater" so pop() returns oldest.
84        other
85            .0
86            .t
87            .partial_cmp(&self.0.t)
88            .unwrap_or(std::cmp::Ordering::Equal)
89    }
90}
91
92impl StreamingIngestor {
93    /// Construct an ingestor with the default reorder window.
94    pub fn new(source: impl Into<String>) -> Self {
95        Self::with_window(source, DEFAULT_REORDER_WINDOW_S)
96    }
97
98    /// Construct an ingestor with a custom reorder window. A zero
99    /// window degenerates to strictly-in-order ingest (any
100    /// out-of-order sample is dropped).
101    pub fn with_window(source: impl Into<String>, reorder_window_s: f64) -> Self {
102        debug_assert!(
103            reorder_window_s >= 0.0 && reorder_window_s.is_finite(),
104            "reorder window must be finite and non-negative"
105        );
106        Self {
107            stream: ResidualStream::new(source),
108            reorder_window_s,
109            buf: BinaryHeap::new(),
110            newest_t: f64::NEG_INFINITY,
111            dropped_out_of_window: 0,
112        }
113    }
114
115    /// Ingest one sample. If `sample.t` is more than `reorder_window_s`
116    /// behind the already-flushed frontier, the sample is dropped and
117    /// `dropped_out_of_window` is incremented. Otherwise the sample is
118    /// staged and the internal buffer is drained up to the new
119    /// frontier.
120    pub fn push(&mut self, sample: ResidualSample) {
121        debug_assert!(sample.t.is_finite(), "residual t must be finite");
122        debug_assert!(sample.value.is_finite(), "residual value must be finite");
123        let flushed_frontier = self
124            .stream
125            .samples
126            .last()
127            .map(|s| s.t)
128            .unwrap_or(f64::NEG_INFINITY);
129        if sample.t + self.reorder_window_s < flushed_frontier {
130            self.dropped_out_of_window += 1;
131            return;
132        }
133        if sample.t > self.newest_t {
134            self.newest_t = sample.t;
135        }
136        self.buf.push(Staged(sample));
137        self.drain_ready();
138    }
139
140    /// Flush every staged sample whose `t` is at least
141    /// `reorder_window_s` behind the newest observed `t`.
142    fn drain_ready(&mut self) {
143        let cutoff = self.newest_t - self.reorder_window_s;
144        while let Some(top) = self.buf.peek() {
145            if top.0.t <= cutoff {
146                let Staged(s) = self.buf.pop().expect("peek succeeded");
147                self.stream.samples.push(s);
148            } else {
149                break;
150            }
151        }
152    }
153
154    /// Drain the reorder buffer and return the completed stream
155    /// together with the count of samples dropped during ingest.
156    pub fn finish(mut self) -> (ResidualStream, u64) {
157        while let Some(Staged(s)) = self.buf.pop() {
158            self.stream.samples.push(s);
159        }
160        debug_assert!(
161            self.stream.samples.windows(2).all(|w| w[0].t <= w[1].t),
162            "finish invariant: samples time-ordered"
163        );
164        (self.stream, self.dropped_out_of_window)
165    }
166
167    /// Number of samples already flushed to the owned stream.
168    pub fn flushed(&self) -> usize {
169        self.stream.samples.len()
170    }
171
172    /// Number of samples currently staged in the reorder buffer.
173    pub fn staged(&self) -> usize {
174        self.buf.len()
175    }
176
177    /// Running count of out-of-window drops. Production deployments
178    /// should expose this as a Prometheus counter (Phase-C4) and alert
179    /// on any non-zero value.
180    pub fn dropped_out_of_window(&self) -> u64 {
181        self.dropped_out_of_window
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::residual::ResidualClass;

    /// Shorthand constructor: every test sample uses the same class so
    /// only `t` (and occasionally `value`) matter to the assertions.
    fn s(t: f64, value: f64) -> ResidualSample {
        ResidualSample::new(t, ResidualClass::PlanRegression, value)
    }

    #[test]
    fn in_order_ingest_matches_batch() {
        let mut ing = StreamingIngestor::with_window("test", 5.0);
        for t in [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0] {
            ing.push(s(t, 0.1));
        }
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 0);
        let ts: Vec<f64> = stream.samples.iter().map(|s| s.t).collect();
        assert_eq!(ts, vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0]);
    }

    #[test]
    fn out_of_order_within_window_is_sorted() {
        let mut ing = StreamingIngestor::with_window("test", 5.0);
        // Jittered arrival order; every sample is within 5 s of the
        // running newest, so nothing is dropped and the output is the
        // fully sorted sequence.
        for t in [0.0, 2.0, 1.0, 4.0, 3.0, 5.0, 7.0, 6.0] {
            ing.push(s(t, 0.1));
        }
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 0);
        let ts: Vec<f64> = stream.samples.iter().map(|s| s.t).collect();
        assert_eq!(ts, vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]);
    }

    #[test]
    fn sample_older_than_window_is_dropped() {
        let mut ing = StreamingIngestor::with_window("test", 1.0);
        for t in [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] {
            ing.push(s(t, 0.1));
        }
        // This one is 9 s behind the newest sample (t = 10) and 8 s
        // behind the flushed frontier (t = 9) → dropped.
        ing.push(s(1.0, 0.1));
        let (_, dropped) = ing.finish();
        assert_eq!(dropped, 1);
    }

    #[test]
    fn empty_ingest_produces_empty_stream() {
        let ing = StreamingIngestor::new("test");
        let (stream, dropped) = ing.finish();
        assert!(stream.samples.is_empty());
        assert_eq!(dropped, 0);
    }

    #[test]
    fn zero_window_drops_any_out_of_order() {
        let mut ing = StreamingIngestor::with_window("test", 0.0);
        ing.push(s(1.0, 0.1));
        ing.push(s(2.0, 0.1));
        ing.push(s(1.5, 0.1)); // exactly 0.5 behind frontier 2.0 → dropped
        let (stream, dropped) = ing.finish();
        assert_eq!(dropped, 1);
        assert_eq!(stream.samples.len(), 2);
    }
}