// dsfb_gray/residual.rs
//! Residual sign computation: the primary inferential object.
//!
//! A [`ResidualSign`] encodes the instantaneous structural state of a residual
//! trajectory at a single observation point. It captures not just the magnitude
//! of deviation (which scalar thresholds already use) but the **direction**
//! (drift), **curvature** (slew), and **temporal persistence** of that deviation.
//!
//! This is the core insight of DSFB: residuals carry structured temporal
//! information that scalar alarm methods discard.

/// Source type for a residual measurement.
///
/// Each variant names a distinct telemetry channel from which residuals are
/// derived. The observer computes residual signs independently per source,
/// then correlates across sources in the grammar layer.
///
/// The type is `Copy` and hashable, so it can be used directly as a map key
/// when keeping one estimator per channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResidualSource {
    /// Request/response latency (e.g., p50, p95, p99 in nanoseconds).
    Latency,
    /// Operations per second or bytes per second throughput.
    Throughput,
    /// Error rate (errors per total requests in a window).
    ErrorRate,
    /// Queue depth for bounded channels or task queues.
    QueueDepth,
    /// Consensus heartbeat round-trip time (e.g., Raft leader→follower).
    HeartbeatRtt,
    /// Async runtime task poll duration.
    PollDuration,
    /// Resident set size or heap allocation rate.
    MemoryUsage,
    /// Serialization/deserialization throughput or latency.
    SerdeLatency,
    /// gRPC/HTTP2 flow control window utilization.
    FlowControlWindow,
    /// DNS resolution latency.
    DnsLatency,
    /// Custom user-defined source with a static label.
    ///
    /// Two custom sources compare equal exactly when their labels are equal.
    Custom(&'static str),
}

42/// A single residual measurement sample from the observed system.
43///
44/// This is the **input** to the DSFB observer. It is accepted as an immutable
45/// reference — no mutation of the sample or its source is possible through
46/// this type.
47#[derive(Debug, Clone, Copy)]
48pub struct ResidualSample {
49 /// The measured value (e.g., latency in nanoseconds).
50 pub value: f64,
51 /// The expected/baseline value for this source under current regime.
52 pub baseline: f64,
53 /// Monotonic timestamp in nanoseconds (or cycle index).
54 pub timestamp_ns: u64,
55 /// Which telemetry channel produced this sample.
56 pub source: ResidualSource,
57}
58
59impl ResidualSample {
60 /// Compute the raw residual: measured minus baseline.
61 #[inline]
62 pub fn residual(&self) -> f64 {
63 self.value - self.baseline
64 }
65}
66
67/// The residual sign: DSFB's primary inferential object.
68///
69/// Captures the instantaneous structural state of a residual trajectory:
70/// - `residual` (r): raw deviation from baseline
71/// - `drift` (ω): first derivative — direction and rate of change
72/// - `slew` (α): second derivative — curvature / acceleration of change
73///
74/// These three quantities, together with the admissibility envelope and
75/// grammar state, fully determine the structural interpretation.
76#[derive(Debug, Clone, Copy)]
77pub struct ResidualSign {
78 /// Raw residual: r(k) = measured(k) - baseline(k).
79 pub residual: f64,
80 /// Drift: estimated first derivative of the residual trajectory.
81 /// Positive drift = residual growing (degradation direction).
82 pub drift: f64,
83 /// Slew: estimated second derivative of the residual trajectory.
84 /// Positive slew = drift accelerating (worsening curvature).
85 pub slew: f64,
86 /// Monotonic timestamp (nanoseconds) of this observation.
87 pub timestamp_ns: u64,
88 /// Source channel that produced this residual.
89 pub source: ResidualSource,
90}
91
92/// Windowed residual sign estimator.
93///
94/// Computes drift and slew from a sliding window of residual samples
95/// using finite differences. The window size `P` controls the persistence
96/// requirement: drift must be sustained over P samples to register.
97///
98/// ## Failure Mode FM-02: Insufficient Persistence Window
99///
100/// If P is too small, transient noise triggers drift detection.
101/// If P is too large, early degradation is filtered out.
102/// The recommended range is P ∈ [20, 100] for typical distributed
103/// system telemetry at 1-second sampling intervals.
104pub struct ResidualEstimator {
105 /// Ring buffer of recent residuals (fixed-size, stack-allocated).
106 window: [f64; 128],
107 /// Ring buffer of timestamps.
108 timestamps: [u64; 128],
109 /// Current write position in the ring buffer.
110 head: usize,
111 /// Number of samples received (saturates at window size).
112 count: usize,
113 /// Persistence window size (how many samples for drift/slew estimation).
114 persistence_window: usize,
115 /// Source channel this estimator tracks.
116 source: ResidualSource,
117}
118
119impl ResidualEstimator {
120 /// Create a new estimator for the given source with persistence window P.
121 ///
122 /// # Panics
123 ///
124 /// Panics if `persistence_window` is 0 or greater than 128.
125 pub fn new(source: ResidualSource, persistence_window: usize) -> Self {
126 assert!(
127 persistence_window > 0 && persistence_window <= 128,
128 "Persistence window must be in [1, 128], got {}",
129 persistence_window
130 );
131 Self {
132 window: [0.0; 128],
133 timestamps: [0; 128],
134 head: 0,
135 count: 0,
136 persistence_window,
137 source,
138 }
139 }
140
141 /// Ingest a new sample and compute the current residual sign.
142 ///
143 /// Accepts the sample as an immutable reference — the observer never
144 /// modifies the sample or its source.
145 pub fn observe(&mut self, sample: &ResidualSample) -> ResidualSign {
146 let r = sample.residual();
147
148 // Write into ring buffer
149 self.window[self.head] = r;
150 self.timestamps[self.head] = sample.timestamp_ns;
151 self.head = (self.head + 1) % 128;
152 if self.count < 128 {
153 self.count += 1;
154 }
155
156 let drift = self.estimate_drift();
157 let slew = self.estimate_slew();
158
159 ResidualSign {
160 residual: r,
161 drift,
162 slew,
163 timestamp_ns: sample.timestamp_ns,
164 source: self.source,
165 }
166 }
167
168 /// Estimate drift (first derivative) via least-squares slope over the
169 /// persistence window.
170 ///
171 /// Uses the standard formula: slope = (n·Σ(xy) - Σx·Σy) / (n·Σ(x²) - (Σx)²)
172 /// where x is the sample index and y is the residual value.
173 fn estimate_drift(&self) -> f64 {
174 let n = self.count.min(self.persistence_window);
175 if n < 2 {
176 return 0.0;
177 }
178
179 let mut sum_x: f64 = 0.0;
180 let mut sum_y: f64 = 0.0;
181 let mut sum_xy: f64 = 0.0;
182 let mut sum_x2: f64 = 0.0;
183
184 for i in 0..n {
185 let idx = if self.head >= n {
186 self.head - n + i
187 } else {
188 (128 + self.head - n + i) % 128
189 };
190 let x = i as f64;
191 let y = self.window[idx];
192 sum_x += x;
193 sum_y += y;
194 sum_xy += x * y;
195 sum_x2 += x * x;
196 }
197
198 let nf = n as f64;
199 let denom = nf * sum_x2 - sum_x * sum_x;
200 if denom.abs() < 1e-15 {
201 return 0.0;
202 }
203 (nf * sum_xy - sum_x * sum_y) / denom
204 }
205
206 /// Estimate slew (second derivative) via difference of two half-window drifts.
207 ///
208 /// Splits the persistence window into two halves, computes drift on each,
209 /// and reports the difference. This is a robust finite-difference approximation
210 /// that avoids noise amplification from direct second differences.
211 fn estimate_slew(&self) -> f64 {
212 let n = self.count.min(self.persistence_window);
213 if n < 4 {
214 return 0.0;
215 }
216
217 let half = n / 2;
218
219 // Drift of first half
220 let drift_first = self.half_window_drift(0, half);
221 // Drift of second half
222 let drift_second = self.half_window_drift(half, n);
223
224 // Slew is the change in drift
225 drift_second - drift_first
226 }
227
228 /// Compute least-squares drift on a sub-window [start_offset, end_offset)
229 /// relative to the current persistence window.
230 fn half_window_drift(&self, start_offset: usize, end_offset: usize) -> f64 {
231 let n = self.count.min(self.persistence_window);
232 let sub_n = end_offset - start_offset;
233 if sub_n < 2 {
234 return 0.0;
235 }
236
237 let mut sum_x: f64 = 0.0;
238 let mut sum_y: f64 = 0.0;
239 let mut sum_xy: f64 = 0.0;
240 let mut sum_x2: f64 = 0.0;
241
242 for i in 0..sub_n {
243 let global_i = start_offset + i;
244 let idx = if self.head >= n {
245 self.head - n + global_i
246 } else {
247 (128 + self.head - n + global_i) % 128
248 };
249 let x = i as f64;
250 let y = self.window[idx];
251 sum_x += x;
252 sum_y += y;
253 sum_xy += x * y;
254 sum_x2 += x * x;
255 }
256
257 let nf = sub_n as f64;
258 let denom = nf * sum_x2 - sum_x * sum_x;
259 if denom.abs() < 1e-15 {
260 return 0.0;
261 }
262 (nf * sum_xy - sum_x * sum_y) / denom
263 }
264
265 /// Reset the estimator state. Used when the observed system restarts
266 /// or when a new workload phase begins.
267 pub fn reset(&mut self) {
268 self.window = [0.0; 128];
269 self.timestamps = [0; 128];
270 self.head = 0;
271 self.count = 0;
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a latency sample for step `k` (one step per second) whose
    /// residual against a baseline of 100.0 is `residual`.
    fn latency_sample(k: u64, residual: f64) -> ResidualSample {
        ResidualSample {
            value: 100.0 + residual,
            baseline: 100.0,
            timestamp_ns: k * 1_000_000_000,
            source: ResidualSource::Latency,
        }
    }

    #[test]
    fn test_zero_residual_produces_zero_drift_slew() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
        for k in 0..30u64 {
            let sign = est.observe(&latency_sample(k, 0.0));
            assert!((sign.residual).abs() < 1e-10);
        }
        // After enough samples, drift and slew should be ~0
        let sign = est.observe(&latency_sample(30, 0.0));
        assert!(sign.drift.abs() < 1e-10);
        assert!(sign.slew.abs() < 1e-10);
    }

    #[test]
    fn test_linear_drift_detected() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 20);
        // Feed linearly increasing residuals: baseline=100, value = 100 + 0.5*k
        for k in 0..30u64 {
            est.observe(&latency_sample(k, 0.5 * k as f64));
        }
        // The final sample continues the same line at step 30 (no skipped
        // step), so the fit is exact up to floating-point rounding.
        let sign = est.observe(&latency_sample(30, 0.5 * 30.0));
        // Drift should be 0.5 (the slope)
        assert!(
            (sign.drift - 0.5).abs() < 1e-9,
            "Expected drift ~0.5, got {}",
            sign.drift
        );
        // Slew should be ~0 (constant drift)
        assert!(sign.slew.abs() < 1e-9, "Expected slew ~0, got {}", sign.slew);
    }

    #[test]
    fn test_accelerating_drift_produces_positive_slew() {
        let mut est = ResidualEstimator::new(ResidualSource::Latency, 40);
        // Feed quadratically increasing residuals: r(k) = 0.01 * k^2
        for k in 0..50u64 {
            let kf = k as f64;
            est.observe(&latency_sample(k, 0.01 * kf * kf));
        }
        let sign = est.observe(&latency_sample(50, 0.01 * 50.0 * 50.0));
        // Slew should be positive (accelerating drift)
        assert!(
            sign.slew > 0.0,
            "Expected positive slew for quadratic growth, got {}",
            sign.slew
        );
    }

    #[test]
    fn test_source_preserved() {
        let mut est = ResidualEstimator::new(ResidualSource::HeartbeatRtt, 10);
        let sample = ResidualSample {
            value: 50.0,
            baseline: 40.0,
            timestamp_ns: 0,
            source: ResidualSource::HeartbeatRtt,
        };
        let sign = est.observe(&sample);
        assert_eq!(sign.source, ResidualSource::HeartbeatRtt);
    }
}
374}