Skip to main content

dsfb_gray/
residual.rs

1//! Residual sign computation: the primary inferential object.
2//!
3//! A [`ResidualSign`] encodes the instantaneous structural state of a residual
4//! trajectory at a single observation point. It captures not just the magnitude
5//! of deviation (which scalar thresholds already use) but the **direction**
6//! (drift), **curvature** (slew), and **temporal persistence** of that deviation.
7//!
8//! This is the core insight of DSFB: residuals carry structured temporal
9//! information that scalar alarm methods discard.
10
11/// Source type for a residual measurement.
12///
13/// Each variant represents a distinct telemetry channel from which residuals
14/// are derived. The observer computes residual signs independently per source,
15/// then correlates across sources in the grammar layer.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
17pub enum ResidualSource {
18    /// Request/response latency (e.g., p50, p95, p99 in nanoseconds).
19    Latency,
20    /// Operations per second or bytes per second throughput.
21    Throughput,
22    /// Error rate (errors per total requests in a window).
23    ErrorRate,
24    /// Queue depth for bounded channels or task queues.
25    QueueDepth,
26    /// Consensus heartbeat round-trip time (e.g., Raft leader→follower).
27    HeartbeatRtt,
28    /// Async runtime task poll duration.
29    PollDuration,
30    /// Resident set size or heap allocation rate.
31    MemoryUsage,
32    /// Serialization/deserialization throughput or latency.
33    SerdeLatency,
34    /// gRPC/HTTP2 flow control window utilization.
35    FlowControlWindow,
36    /// DNS resolution latency.
37    DnsLatency,
38    /// Custom user-defined source with a static label.
39    Custom(&'static str),
40}
41
42/// A single residual measurement sample from the observed system.
43///
44/// This is the **input** to the DSFB observer. It is accepted as an immutable
45/// reference — no mutation of the sample or its source is possible through
46/// this type.
47#[derive(Debug, Clone, Copy)]
48pub struct ResidualSample {
49    /// The measured value (e.g., latency in nanoseconds).
50    pub value: f64,
51    /// The expected/baseline value for this source under current regime.
52    pub baseline: f64,
53    /// Monotonic timestamp in nanoseconds (or cycle index).
54    pub timestamp_ns: u64,
55    /// Which telemetry channel produced this sample.
56    pub source: ResidualSource,
57}
58
59impl ResidualSample {
60    /// Compute the raw residual: measured minus baseline.
61    #[inline]
62    pub fn residual(&self) -> f64 {
63        self.value - self.baseline
64    }
65}
66
67/// The residual sign: DSFB's primary inferential object.
68///
69/// Captures the instantaneous structural state of a residual trajectory:
70/// - `residual` (r): raw deviation from baseline
71/// - `drift` (ω): first derivative — direction and rate of change
72/// - `slew` (α): second derivative — curvature / acceleration of change
73///
74/// These three quantities, together with the admissibility envelope and
75/// grammar state, fully determine the structural interpretation.
76#[derive(Debug, Clone, Copy)]
77pub struct ResidualSign {
78    /// Raw residual: r(k) = measured(k) - baseline(k).
79    pub residual: f64,
80    /// Drift: estimated first derivative of the residual trajectory.
81    /// Positive drift = residual growing (degradation direction).
82    pub drift: f64,
83    /// Slew: estimated second derivative of the residual trajectory.
84    /// Positive slew = drift accelerating (worsening curvature).
85    pub slew: f64,
86    /// Monotonic timestamp (nanoseconds) of this observation.
87    pub timestamp_ns: u64,
88    /// Source channel that produced this residual.
89    pub source: ResidualSource,
90}
91
92/// Windowed residual sign estimator.
93///
94/// Computes drift and slew from a sliding window of residual samples
95/// using finite differences. The window size `P` controls the persistence
96/// requirement: drift must be sustained over P samples to register.
97///
98/// ## Failure Mode FM-02: Insufficient Persistence Window
99///
100/// If P is too small, transient noise triggers drift detection.
101/// If P is too large, early degradation is filtered out.
102/// The recommended range is P ∈ [20, 100] for typical distributed
103/// system telemetry at 1-second sampling intervals.
104pub struct ResidualEstimator {
105    /// Ring buffer of recent residuals (fixed-size, stack-allocated).
106    window: [f64; 128],
107    /// Ring buffer of timestamps.
108    timestamps: [u64; 128],
109    /// Current write position in the ring buffer.
110    head: usize,
111    /// Number of samples received (saturates at window size).
112    count: usize,
113    /// Persistence window size (how many samples for drift/slew estimation).
114    persistence_window: usize,
115    /// Source channel this estimator tracks.
116    source: ResidualSource,
117}
118
119impl ResidualEstimator {
120    /// Create a new estimator for the given source with persistence window P.
121    ///
122    /// # Panics
123    ///
124    /// Panics if `persistence_window` is 0 or greater than 128.
125    pub fn new(source: ResidualSource, persistence_window: usize) -> Self {
126        assert!(
127            persistence_window > 0 && persistence_window <= 128,
128            "Persistence window must be in [1, 128], got {}",
129            persistence_window
130        );
131        Self {
132            window: [0.0; 128],
133            timestamps: [0; 128],
134            head: 0,
135            count: 0,
136            persistence_window,
137            source,
138        }
139    }
140
141    /// Ingest a new sample and compute the current residual sign.
142    ///
143    /// Accepts the sample as an immutable reference — the observer never
144    /// modifies the sample or its source.
145    pub fn observe(&mut self, sample: &ResidualSample) -> ResidualSign {
146        let r = sample.residual();
147
148        // Write into ring buffer
149        self.window[self.head] = r;
150        self.timestamps[self.head] = sample.timestamp_ns;
151        self.head = (self.head + 1) % 128;
152        if self.count < 128 {
153            self.count += 1;
154        }
155
156        let drift = self.estimate_drift();
157        let slew = self.estimate_slew();
158
159        ResidualSign {
160            residual: r,
161            drift,
162            slew,
163            timestamp_ns: sample.timestamp_ns,
164            source: self.source,
165        }
166    }
167
168    /// Estimate drift (first derivative) via least-squares slope over the
169    /// persistence window.
170    ///
171    /// Uses the standard formula: slope = (n·Σ(xy) - Σx·Σy) / (n·Σ(x²) - (Σx)²)
172    /// where x is the sample index and y is the residual value.
173    fn estimate_drift(&self) -> f64 {
174        let n = self.count.min(self.persistence_window);
175        if n < 2 {
176            return 0.0;
177        }
178
179        let mut sum_x: f64 = 0.0;
180        let mut sum_y: f64 = 0.0;
181        let mut sum_xy: f64 = 0.0;
182        let mut sum_x2: f64 = 0.0;
183
184        for i in 0..n {
185            let idx = if self.head >= n {
186                self.head - n + i
187            } else {
188                (128 + self.head - n + i) % 128
189            };
190            let x = i as f64;
191            let y = self.window[idx];
192            sum_x += x;
193            sum_y += y;
194            sum_xy += x * y;
195            sum_x2 += x * x;
196        }
197
198        let nf = n as f64;
199        let denom = nf * sum_x2 - sum_x * sum_x;
200        if denom.abs() < 1e-15 {
201            return 0.0;
202        }
203        (nf * sum_xy - sum_x * sum_y) / denom
204    }
205
206    /// Estimate slew (second derivative) via difference of two half-window drifts.
207    ///
208    /// Splits the persistence window into two halves, computes drift on each,
209    /// and reports the difference. This is a robust finite-difference approximation
210    /// that avoids noise amplification from direct second differences.
211    fn estimate_slew(&self) -> f64 {
212        let n = self.count.min(self.persistence_window);
213        if n < 4 {
214            return 0.0;
215        }
216
217        let half = n / 2;
218
219        // Drift of first half
220        let drift_first = self.half_window_drift(0, half);
221        // Drift of second half
222        let drift_second = self.half_window_drift(half, n);
223
224        // Slew is the change in drift
225        drift_second - drift_first
226    }
227
228    /// Compute least-squares drift on a sub-window [start_offset, end_offset)
229    /// relative to the current persistence window.
230    fn half_window_drift(&self, start_offset: usize, end_offset: usize) -> f64 {
231        let n = self.count.min(self.persistence_window);
232        let sub_n = end_offset - start_offset;
233        if sub_n < 2 {
234            return 0.0;
235        }
236
237        let mut sum_x: f64 = 0.0;
238        let mut sum_y: f64 = 0.0;
239        let mut sum_xy: f64 = 0.0;
240        let mut sum_x2: f64 = 0.0;
241
242        for i in 0..sub_n {
243            let global_i = start_offset + i;
244            let idx = if self.head >= n {
245                self.head - n + global_i
246            } else {
247                (128 + self.head - n + global_i) % 128
248            };
249            let x = i as f64;
250            let y = self.window[idx];
251            sum_x += x;
252            sum_y += y;
253            sum_xy += x * y;
254            sum_x2 += x * x;
255        }
256
257        let nf = sub_n as f64;
258        let denom = nf * sum_x2 - sum_x * sum_x;
259        if denom.abs() < 1e-15 {
260            return 0.0;
261        }
262        (nf * sum_xy - sum_x * sum_y) / denom
263    }
264
265    /// Reset the estimator state. Used when the observed system restarts
266    /// or when a new workload phase begins.
267    pub fn reset(&mut self) {
268        self.window = [0.0; 128];
269        self.timestamps = [0; 128];
270        self.head = 0;
271        self.count = 0;
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278
279    #[test]
280    fn test_zero_residual_produces_zero_drift_slew() {
281        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
282        for i in 0..30 {
283            let sample = ResidualSample {
284                value: 100.0,
285                baseline: 100.0,
286                timestamp_ns: i * 1_000_000_000,
287                source: ResidualSource::Latency,
288            };
289            let sign = est.observe(&sample);
290            assert!((sign.residual).abs() < 1e-10);
291        }
292        // After enough samples, drift and slew should be ~0
293        let sample = ResidualSample {
294            value: 100.0,
295            baseline: 100.0,
296            timestamp_ns: 30_000_000_000,
297            source: ResidualSource::Latency,
298        };
299        let sign = est.observe(&sample);
300        assert!(sign.drift.abs() < 1e-10);
301        assert!(sign.slew.abs() < 1e-10);
302    }
303
304    #[test]
305    fn test_linear_drift_detected() {
306        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
307        // Feed linearly increasing residuals: baseline=100, value = 100 + 0.5*i
308        for i in 0..30u64 {
309            let sample = ResidualSample {
310                value: 100.0 + 0.5 * i as f64,
311                baseline: 100.0,
312                timestamp_ns: i * 1_000_000_000,
313                source: ResidualSource::Latency,
314            };
315            est.observe(&sample);
316        }
317        let sample = ResidualSample {
318            value: 100.0 + 15.5,
319            baseline: 100.0,
320            timestamp_ns: 31_000_000_000,
321            source: ResidualSource::Latency,
322        };
323        let sign = est.observe(&sample);
324        // Drift should be approximately 0.5 (the slope)
325        assert!(
326            (sign.drift - 0.5).abs() < 0.1,
327            "Expected drift ~0.5, got {}",
328            sign.drift
329        );
330        // Slew should be ~0 (constant drift)
331        assert!(sign.slew.abs() < 0.2, "Expected slew ~0, got {}", sign.slew);
332    }
333
334    #[test]
335    fn test_accelerating_drift_produces_positive_slew() {
336        let mut est = ResidualEstimator::new(ResidualSource::Latency, 40);
337        // Feed quadratically increasing residuals: r(k) = 0.01 * k^2
338        for i in 0..50u64 {
339            let sample = ResidualSample {
340                value: 100.0 + 0.01 * (i as f64) * (i as f64),
341                baseline: 100.0,
342                timestamp_ns: i * 1_000_000_000,
343                source: ResidualSource::Latency,
344            };
345            est.observe(&sample);
346        }
347        let sample = ResidualSample {
348            value: 100.0 + 0.01 * 50.0 * 50.0,
349            baseline: 100.0,
350            timestamp_ns: 50_000_000_000,
351            source: ResidualSource::Latency,
352        };
353        let sign = est.observe(&sample);
354        // Slew should be positive (accelerating drift)
355        assert!(
356            sign.slew > 0.0,
357            "Expected positive slew for quadratic growth, got {}",
358            sign.slew
359        );
360    }
361
362    #[test]
363    fn test_source_preserved() {
364        let mut est = ResidualEstimator::new(ResidualSource::HeartbeatRtt, 10);
365        let sample = ResidualSample {
366            value: 50.0,
367            baseline: 40.0,
368            timestamp_ns: 0,
369            source: ResidualSource::HeartbeatRtt,
370        };
371        let sign = est.observe(&sample);
372        assert_eq!(sign.source, ResidualSource::HeartbeatRtt);
373    }
374}