dsfb_database/streaming.rs
1//! Phase-C2: streaming residual construction.
2//!
3//! The batch adapter path calls [`ResidualStream::push`] followed by
4//! [`ResidualStream::sort`]; it produces the bytewise-identical stream
5//! the four fingerprint locks pin. That path is unchanged and remains
6//! the canonical construction for reproduction.
7//!
8//! [`ResidualStream::push`]: crate::residual::ResidualStream::push
9//! [`ResidualStream::sort`]: crate::residual::ResidualStream::sort
10//!
11//! This module adds a **parallel, additive** API for a live-ingestion
12//! deployment where samples arrive one at a time and a single terminal
13//! `.sort()` call over a materialised 10⁶-sample buffer is not
14//! acceptable. The streaming path preserves time-ordering via a
15//! bounded **reorder buffer**: every incoming sample is staged in a
16//! small heap, and any sample older than `newest_t − reorder_window_s`
17//! is flushed to the underlying stream in sorted order. At
18//! [`StreamingIngestor::finish`] the remaining buffer tail is drained.
19//!
20//! The trade-off is explicit: if a sample arrives with a time delta
21//! greater than `reorder_window_s` behind the current newest sample,
22//! it is dropped and the drop is counted — `dropped_out_of_window` is
23//! part of the closing summary. A production deployment sizes
24//! `reorder_window_s` to be larger than the engine's maximum
25//! telemetry-pipeline jitter (we default to 10 s; PostgreSQL's
26//! `pg_stat_statements` polling cadence at 60 s makes 10 s a ~6×
27//! safety margin).
28//!
29//! Determinism: given the same input stream and `reorder_window_s`,
30//! the flushed sample order and the `dropped_out_of_window` count are
31//! deterministic. The streaming path is **not** expected to produce
32//! the same fingerprint as the batch path for real-world jitter-bearing
33//! inputs — that is the honest reason batch is pinned and streaming is
34//! parallel, not a replacement.
35
36use crate::residual::{ResidualSample, ResidualStream};
37use std::collections::BinaryHeap;
38
/// Reorder-buffer window, in seconds, used by [`StreamingIngestor::new`].
///
/// The value targets the arrival jitter of `pg_stat_statements`-class
/// telemetry. Pass a larger window to [`StreamingIngestor::with_window`]
/// for slower engines, or a smaller one for well-behaved log tails.
pub const DEFAULT_REORDER_WINDOW_S: f64 = 10.0;
43
44/// Streaming ingestor that accepts one [`ResidualSample`] at a time and
45/// flushes a correctly-ordered prefix into an owned [`ResidualStream`]
46/// as the reorder window slides forward.
47///
48/// Invariant: after every [`StreamingIngestor::push`] or
49/// [`StreamingIngestor::finish`], the underlying
50/// `self.stream.samples` is time-ordered (`t` non-decreasing).
51pub struct StreamingIngestor {
52 stream: ResidualStream,
53 reorder_window_s: f64,
54 /// Min-heap on `t`; the `Reverse` wrapper is standard because
55 /// `BinaryHeap` is a max-heap by default.
56 buf: BinaryHeap<Staged>,
57 newest_t: f64,
58 /// Samples whose timestamp fell more than `reorder_window_s` behind
59 /// the already-flushed frontier at arrival. A production runbook
60 /// should alert on any non-zero value.
61 dropped_out_of_window: u64,
62}
63
64/// Heap entry: orders by `t` in reverse so `BinaryHeap::pop` returns
65/// the *oldest* sample. We compare strictly on `t`; `ResidualSample`
66/// is not `Ord`-worthy directly because `value` is `f64`, and we do
67/// not want a stable secondary key to leak into the ordering.
68struct Staged(ResidualSample);
69
70impl PartialEq for Staged {
71 fn eq(&self, other: &Self) -> bool {
72 self.0.t == other.0.t
73 }
74}
75impl Eq for Staged {}
76impl PartialOrd for Staged {
77 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
78 Some(self.cmp(other))
79 }
80}
81impl Ord for Staged {
82 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
83 // Reverse: smaller `t` is "greater" so pop() returns oldest.
84 other
85 .0
86 .t
87 .partial_cmp(&self.0.t)
88 .unwrap_or(std::cmp::Ordering::Equal)
89 }
90}
91
92impl StreamingIngestor {
93 /// Construct an ingestor with the default reorder window.
94 pub fn new(source: impl Into<String>) -> Self {
95 Self::with_window(source, DEFAULT_REORDER_WINDOW_S)
96 }
97
98 /// Construct an ingestor with a custom reorder window. A zero
99 /// window degenerates to strictly-in-order ingest (any
100 /// out-of-order sample is dropped).
101 pub fn with_window(source: impl Into<String>, reorder_window_s: f64) -> Self {
102 debug_assert!(
103 reorder_window_s >= 0.0 && reorder_window_s.is_finite(),
104 "reorder window must be finite and non-negative"
105 );
106 Self {
107 stream: ResidualStream::new(source),
108 reorder_window_s,
109 buf: BinaryHeap::new(),
110 newest_t: f64::NEG_INFINITY,
111 dropped_out_of_window: 0,
112 }
113 }
114
115 /// Ingest one sample. If `sample.t` is more than `reorder_window_s`
116 /// behind the already-flushed frontier, the sample is dropped and
117 /// `dropped_out_of_window` is incremented. Otherwise the sample is
118 /// staged and the internal buffer is drained up to the new
119 /// frontier.
120 pub fn push(&mut self, sample: ResidualSample) {
121 debug_assert!(sample.t.is_finite(), "residual t must be finite");
122 debug_assert!(sample.value.is_finite(), "residual value must be finite");
123 let flushed_frontier = self
124 .stream
125 .samples
126 .last()
127 .map(|s| s.t)
128 .unwrap_or(f64::NEG_INFINITY);
129 if sample.t + self.reorder_window_s < flushed_frontier {
130 self.dropped_out_of_window += 1;
131 return;
132 }
133 if sample.t > self.newest_t {
134 self.newest_t = sample.t;
135 }
136 self.buf.push(Staged(sample));
137 self.drain_ready();
138 }
139
140 /// Flush every staged sample whose `t` is at least
141 /// `reorder_window_s` behind the newest observed `t`.
142 fn drain_ready(&mut self) {
143 let cutoff = self.newest_t - self.reorder_window_s;
144 while let Some(top) = self.buf.peek() {
145 if top.0.t <= cutoff {
146 let Staged(s) = self.buf.pop().expect("peek succeeded");
147 self.stream.samples.push(s);
148 } else {
149 break;
150 }
151 }
152 }
153
154 /// Drain the reorder buffer and return the completed stream
155 /// together with the count of samples dropped during ingest.
156 pub fn finish(mut self) -> (ResidualStream, u64) {
157 while let Some(Staged(s)) = self.buf.pop() {
158 self.stream.samples.push(s);
159 }
160 debug_assert!(
161 self.stream.samples.windows(2).all(|w| w[0].t <= w[1].t),
162 "finish invariant: samples time-ordered"
163 );
164 (self.stream, self.dropped_out_of_window)
165 }
166
167 /// Number of samples already flushed to the owned stream.
168 pub fn flushed(&self) -> usize {
169 self.stream.samples.len()
170 }
171
172 /// Number of samples currently staged in the reorder buffer.
173 pub fn staged(&self) -> usize {
174 self.buf.len()
175 }
176
177 /// Running count of out-of-window drops. Production deployments
178 /// should expose this as a Prometheus counter (Phase-C4) and alert
179 /// on any non-zero value.
180 pub fn dropped_out_of_window(&self) -> u64 {
181 self.dropped_out_of_window
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::residual::ResidualClass;

    /// Build a plan-regression sample at time `t` carrying `value`.
    fn s(t: f64, value: f64) -> ResidualSample {
        ResidualSample::new(t, ResidualClass::PlanRegression, value)
    }

    /// Create an ingestor with the given window and push one sample per
    /// entry of `arrivals`, in that order.
    fn ingest(window_s: f64, arrivals: &[f64]) -> StreamingIngestor {
        let mut ingestor = StreamingIngestor::with_window("test", window_s);
        for &t in arrivals {
            ingestor.push(s(t, 0.1));
        }
        ingestor
    }

    /// Timestamps of the finished stream, in stored order.
    fn timestamps(stream: &ResidualStream) -> Vec<f64> {
        stream.samples.iter().map(|sample| sample.t).collect()
    }

    #[test]
    fn in_order_ingest_matches_batch() {
        let arrivals = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0];
        let (stream, dropped) = ingest(5.0, &arrivals).finish();
        assert_eq!(dropped, 0);
        assert_eq!(
            timestamps(&stream),
            vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0]
        );
    }

    #[test]
    fn out_of_order_within_window_is_sorted() {
        // Arrival order is jittered, but never by more than the window.
        let arrivals = [0.0, 2.0, 1.0, 4.0, 3.0, 5.0, 7.0, 6.0];
        let (stream, dropped) = ingest(5.0, &arrivals).finish();
        assert_eq!(dropped, 0);
        assert_eq!(
            timestamps(&stream),
            vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
        );
    }

    #[test]
    fn sample_older_than_window_is_dropped() {
        let arrivals = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let mut ingestor = ingest(1.0, &arrivals);
        // 9 s behind the newest sample with a 1 s window → dropped.
        ingestor.push(s(1.0, 0.1));
        let (_, dropped) = ingestor.finish();
        assert_eq!(dropped, 1);
    }

    #[test]
    fn empty_ingest_produces_empty_stream() {
        let (stream, dropped) = StreamingIngestor::new("test").finish();
        assert!(stream.samples.is_empty());
        assert_eq!(dropped, 0);
    }

    #[test]
    fn zero_window_drops_any_out_of_order() {
        // The last arrival (1.5) is behind the newest timestamp 2.0, and a
        // zero window tolerates no reordering → dropped.
        let (stream, dropped) = ingest(0.0, &[1.0, 2.0, 1.5]).finish();
        assert_eq!(dropped, 1);
        assert_eq!(stream.samples.len(), 2);
    }
}
249}