Skip to main content

fips_core/mmp/
algorithms.rs

1//! MMP algorithmic building blocks.
2//!
3//! Pure computational types with no dependency on peer or node state.
4//! Each is independently testable.
5
6use std::collections::VecDeque;
7use std::time::Instant;
8
9use crate::mmp::{EWMA_LONG_ALPHA, EWMA_SHORT_ALPHA};
10
11// ============================================================================
12// Jitter Estimator (RFC 3550 §6.4.1)
13// ============================================================================
14
15/// Interarrival jitter estimator using RFC 3550 algorithm.
16///
17/// Maintains a smoothed jitter estimate (α = 1/16) from the absolute
18/// difference in one-way transit times between consecutive frames.
19/// Uses integer arithmetic scaled by 16 to avoid floating-point.
20pub struct JitterEstimator {
21    /// Scaled jitter estimate (×16 for integer arithmetic).
22    jitter_q4: i64,
23}
24
25impl JitterEstimator {
26    pub fn new() -> Self {
27        Self { jitter_q4: 0 }
28    }
29
30    /// Update with transit time delta between consecutive frames.
31    ///
32    /// `transit_delta` = (R_i - R_{i-1}) - (S_i - S_{i-1}) in microseconds.
33    pub fn update(&mut self, transit_delta: i32) {
34        // RFC 3550: J = J + (1/16)(|D(i)| - J)
35        // Scaled: J_q4 = J_q4 + (|D| - J_q4/16)
36        //       = J_q4 + |D| - J_q4 >> 4
37        let abs_d = (transit_delta as i64).unsigned_abs() as i64;
38        self.jitter_q4 += abs_d - (self.jitter_q4 >> 4);
39    }
40
41    /// Current jitter estimate in microseconds.
42    pub fn jitter_us(&self) -> u32 {
43        (self.jitter_q4 >> 4) as u32
44    }
45}
46
47impl Default for JitterEstimator {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53// ============================================================================
54// SRTT Estimator (Jacobson, RFC 6298)
55// ============================================================================
56
57/// Smoothed RTT estimator using Jacobson's algorithm.
58///
59/// SRTT and RTTVAR are maintained in microseconds using integer arithmetic.
60pub struct SrttEstimator {
61    /// Smoothed RTT (microseconds).
62    srtt_us: i64,
63    /// RTT variance (microseconds).
64    rttvar_us: i64,
65    /// Whether the first sample has been applied.
66    initialized: bool,
67}
68
69impl SrttEstimator {
70    pub fn new() -> Self {
71        Self {
72            srtt_us: 0,
73            rttvar_us: 0,
74            initialized: false,
75        }
76    }
77
78    /// Feed an RTT sample in microseconds.
79    pub fn update(&mut self, rtt_us: i64) {
80        if !self.initialized {
81            // RFC 6298 §2.2: first measurement
82            self.srtt_us = rtt_us;
83            self.rttvar_us = rtt_us / 2;
84            self.initialized = true;
85        } else {
86            // RFC 6298 §2.3:
87            // RTTVAR = (1 - β) * RTTVAR + β * |SRTT - R'|    β = 1/4
88            // SRTT   = (1 - α) * SRTT   + α * R'             α = 1/8
89            let err = (self.srtt_us - rtt_us).abs();
90            self.rttvar_us = self.rttvar_us - (self.rttvar_us >> 2) + (err >> 2);
91            self.srtt_us = self.srtt_us - (self.srtt_us >> 3) + (rtt_us >> 3);
92        }
93    }
94
95    pub fn srtt_us(&self) -> i64 {
96        self.srtt_us
97    }
98
99    pub fn rttvar_us(&self) -> i64 {
100        self.rttvar_us
101    }
102
103    pub fn initialized(&self) -> bool {
104        self.initialized
105    }
106
107    /// Retransmission timeout = SRTT + max(4 * RTTVAR, 1s), floored at 1s.
108    pub fn rto_us(&self) -> i64 {
109        let rto = self.srtt_us + (self.rttvar_us << 2).max(1_000_000);
110        rto.max(1_000_000)
111    }
112}
113
114impl Default for SrttEstimator {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120// ============================================================================
121// Dual EWMA Trend Detector
122// ============================================================================
123
124/// Dual EWMA for trend detection on a single metric.
125///
126/// Short-term (α=1/4) tracks recent conditions; long-term (α=1/32)
127/// establishes a stable baseline. Divergence indicates trend direction.
128pub struct DualEwma {
129    short: f64,
130    long: f64,
131    initialized: bool,
132}
133
134impl DualEwma {
135    pub fn new() -> Self {
136        Self {
137            short: 0.0,
138            long: 0.0,
139            initialized: false,
140        }
141    }
142
143    pub fn update(&mut self, sample: f64) {
144        if !self.initialized {
145            self.short = sample;
146            self.long = sample;
147            self.initialized = true;
148        } else {
149            self.short += EWMA_SHORT_ALPHA * (sample - self.short);
150            self.long += EWMA_LONG_ALPHA * (sample - self.long);
151        }
152    }
153
154    pub fn short(&self) -> f64 {
155        self.short
156    }
157
158    pub fn long(&self) -> f64 {
159        self.long
160    }
161
162    pub fn initialized(&self) -> bool {
163        self.initialized
164    }
165}
166
167impl Default for DualEwma {
168    fn default() -> Self {
169        Self::new()
170    }
171}
172
173// ============================================================================
174// One-Way Delay Trend Detector
175// ============================================================================
176
177/// OWD trend detector using linear regression over a ring buffer.
178///
179/// Stores (sequence, owd_us) samples and computes the slope via
180/// least-squares regression. The slope (µs/s) indicates whether
181/// queuing delay is increasing (congestion) or stable.
182pub struct OwdTrendDetector {
183    samples: VecDeque<(u32, i64)>,
184    capacity: usize,
185}
186
187impl OwdTrendDetector {
188    pub fn new(capacity: usize) -> Self {
189        Self {
190            samples: VecDeque::with_capacity(capacity),
191            capacity,
192        }
193    }
194
195    /// Clear all samples, keeping the same capacity.
196    pub fn clear(&mut self) {
197        self.samples.clear();
198    }
199
200    /// Add an OWD sample.
201    ///
202    /// `seq` is a monotonic sequence number (e.g., truncated frame counter).
203    /// `owd_us` is the relative one-way delay in microseconds (R_i - S_i).
204    pub fn push(&mut self, seq: u32, owd_us: i64) {
205        if self.samples.len() == self.capacity {
206            self.samples.pop_front();
207        }
208        self.samples.push_back((seq, owd_us));
209    }
210
211    /// Compute the OWD trend as a slope in µs/second.
212    ///
213    /// Uses simple linear regression: slope = Σ((x-x̄)(y-ȳ)) / Σ((x-x̄)²)
214    /// where x = sequence number and y = owd_us.
215    ///
216    /// Returns 0 if fewer than 2 samples.
217    pub fn trend_us_per_sec(&self) -> i32 {
218        let n = self.samples.len();
219        if n < 2 {
220            return 0;
221        }
222
223        let n_f = n as f64;
224        let sum_x: f64 = self.samples.iter().map(|(s, _)| *s as f64).sum();
225        let sum_y: f64 = self.samples.iter().map(|(_, y)| *y as f64).sum();
226        let mean_x = sum_x / n_f;
227        let mean_y = sum_y / n_f;
228
229        let mut num = 0.0;
230        let mut den = 0.0;
231        for &(x, y) in &self.samples {
232            let dx = x as f64 - mean_x;
233            let dy = y as f64 - mean_y;
234            num += dx * dy;
235            den += dx * dx;
236        }
237
238        if den.abs() < f64::EPSILON {
239            return 0;
240        }
241
242        // slope is in µs/packet. Convert to µs/second assuming ~1ms inter-packet
243        // spacing as a rough estimate. The raw slope per packet is more useful
244        // for trend detection than an absolute rate, but the wire format specifies
245        // µs/s. We report the raw per-packet slope scaled by 1000.
246        let slope_per_packet = num / den;
247        (slope_per_packet * 1000.0) as i32
248    }
249
250    pub fn len(&self) -> usize {
251        self.samples.len()
252    }
253
254    pub fn is_empty(&self) -> bool {
255        self.samples.is_empty()
256    }
257}
258
259// ============================================================================
260// ETX
261// ============================================================================
262
263/// Compute Expected Transmission Count from bidirectional delivery ratios.
264///
265/// ETX = 1 / (d_f × d_r) where d_f and d_r are forward and reverse
266/// delivery probabilities (1.0 = perfect, 0.0 = no delivery).
267///
268/// Clamped to [1.0, 100.0].
269pub fn compute_etx(d_forward: f64, d_reverse: f64) -> f64 {
270    let product = d_forward * d_reverse;
271    if product <= 0.0 {
272        return 100.0;
273    }
274    (1.0 / product).clamp(1.0, 100.0)
275}
276
277// ============================================================================
278// Spin Bit
279// ============================================================================
280
281/// Spin bit state for passive RTT estimation.
282///
283/// Uses asymmetric roles (initiator/responder) per the MMP design:
284/// - **Initiator**: flips spin value on each received frame; measures RTT
285///   from edge-to-edge intervals.
286/// - **Responder**: copies received spin bit into outgoing frames, with a
287///   counter guard to filter reordered frames.
288pub struct SpinBitState {
289    is_initiator: bool,
290    current_value: bool,
291    /// Highest counter observed with a spin edge (responder guard).
292    highest_counter_for_spin: u64,
293    /// Time of last spin edge (initiator only, for RTT measurement).
294    last_edge_time: Option<Instant>,
295}
296
297impl SpinBitState {
298    pub fn new(is_initiator: bool) -> Self {
299        Self {
300            is_initiator,
301            current_value: false,
302            highest_counter_for_spin: 0,
303            last_edge_time: None,
304        }
305    }
306
307    /// Check if this is the spin bit initiator.
308    pub fn is_initiator(&self) -> bool {
309        self.is_initiator
310    }
311
312    /// Get the spin bit value to set on an outgoing frame.
313    pub fn tx_bit(&self) -> bool {
314        self.current_value
315    }
316
317    /// Process a received frame's spin bit.
318    ///
319    /// Returns an RTT sample duration if an edge was detected (initiator only).
320    pub fn rx_observe(
321        &mut self,
322        received_bit: bool,
323        counter: u64,
324        now: Instant,
325    ) -> Option<std::time::Duration> {
326        if self.is_initiator {
327            // Initiator: when the reflected bit matches what we sent,
328            // that completes a round trip. Record the edge time, then
329            // flip for the next cycle.
330            if received_bit == self.current_value {
331                let rtt = self.last_edge_time.map(|t| now.duration_since(t));
332                self.last_edge_time = Some(now);
333                self.current_value = !self.current_value;
334                rtt
335            } else {
336                None
337            }
338        } else {
339            // Responder: copy received bit, but only if counter is higher
340            // (reordering guard)
341            if counter > self.highest_counter_for_spin {
342                self.highest_counter_for_spin = counter;
343                self.current_value = received_bit;
344            }
345            None
346        }
347    }
348}
349
350// ============================================================================
351// Tests
352// ============================================================================
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357
358    #[test]
359    fn test_jitter_zero_input() {
360        let mut j = JitterEstimator::new();
361        j.update(0);
362        assert_eq!(j.jitter_us(), 0);
363    }
364
365    #[test]
366    fn test_jitter_convergence() {
367        let mut j = JitterEstimator::new();
368        // Feed constant transit delta of 1000µs
369        for _ in 0..200 {
370            j.update(1000);
371        }
372        // Should converge near 1000µs
373        let jitter = j.jitter_us();
374        assert!(
375            jitter > 900 && jitter < 1100,
376            "jitter={jitter}, expected ~1000"
377        );
378    }
379
380    #[test]
381    fn test_srtt_first_sample() {
382        let mut s = SrttEstimator::new();
383        s.update(10_000); // 10ms
384        assert_eq!(s.srtt_us(), 10_000);
385        assert_eq!(s.rttvar_us(), 5_000);
386        assert!(s.initialized());
387    }
388
389    #[test]
390    fn test_srtt_convergence() {
391        let mut s = SrttEstimator::new();
392        // Feed constant 50ms RTT
393        for _ in 0..100 {
394            s.update(50_000);
395        }
396        let srtt = s.srtt_us();
397        assert!((srtt - 50_000).abs() < 1000, "srtt={srtt}, expected ~50000");
398    }
399
400    #[test]
401    fn test_dual_ewma_initialization() {
402        let mut e = DualEwma::new();
403        assert!(!e.initialized());
404        e.update(100.0);
405        assert!(e.initialized());
406        assert_eq!(e.short(), 100.0);
407        assert_eq!(e.long(), 100.0);
408    }
409
410    #[test]
411    fn test_dual_ewma_short_tracks_faster() {
412        let mut e = DualEwma::new();
413        // Initialize at 0
414        e.update(0.0);
415        // Jump to 100
416        for _ in 0..20 {
417            e.update(100.0);
418        }
419        // Short should be closer to 100 than long
420        assert!(
421            e.short() > e.long(),
422            "short={} long={}",
423            e.short(),
424            e.long()
425        );
426    }
427
428    #[test]
429    fn test_owd_trend_flat() {
430        let mut d = OwdTrendDetector::new(32);
431        for i in 0..20 {
432            d.push(i, 5000); // constant OWD
433        }
434        let trend = d.trend_us_per_sec();
435        assert_eq!(trend, 0, "flat OWD should have zero trend");
436    }
437
438    #[test]
439    fn test_owd_trend_increasing() {
440        let mut d = OwdTrendDetector::new(32);
441        for i in 0..20 {
442            d.push(i, 5000 + (i as i64) * 100); // increasing by 100µs per packet
443        }
444        let trend = d.trend_us_per_sec();
445        assert!(
446            trend > 0,
447            "increasing OWD should have positive trend, got {trend}"
448        );
449    }
450
451    #[test]
452    fn test_owd_trend_insufficient_samples() {
453        let mut d = OwdTrendDetector::new(32);
454        d.push(0, 5000);
455        assert_eq!(d.trend_us_per_sec(), 0);
456    }
457
458    #[test]
459    fn test_etx_perfect_link() {
460        assert!((compute_etx(1.0, 1.0) - 1.0).abs() < f64::EPSILON);
461    }
462
463    #[test]
464    fn test_etx_lossy_link() {
465        // 10% forward loss, 5% reverse loss
466        let etx = compute_etx(0.9, 0.95);
467        assert!(etx > 1.0 && etx < 2.0, "etx={etx}");
468    }
469
470    #[test]
471    fn test_etx_zero_delivery() {
472        assert_eq!(compute_etx(0.0, 1.0), 100.0);
473        assert_eq!(compute_etx(1.0, 0.0), 100.0);
474    }
475
476    #[test]
477    fn test_spin_bit_initiator_rtt() {
478        let mut initiator = SpinBitState::new(true);
479        let mut responder = SpinBitState::new(false);
480
481        let t0 = Instant::now();
482        let t1 = t0 + std::time::Duration::from_millis(10);
483        let t2 = t0 + std::time::Duration::from_millis(20);
484
485        // Initiator sends with spin=false (initial)
486        let bit_to_send = initiator.tx_bit();
487        assert!(!bit_to_send);
488
489        // Responder receives, copies bit
490        responder.rx_observe(bit_to_send, 1, t0);
491        assert!(!responder.tx_bit());
492
493        // Responder sends back, initiator receives
494        let resp_bit = responder.tx_bit();
495        let rtt1 = initiator.rx_observe(resp_bit, 2, t1);
496        // First edge: no previous edge to compare
497        assert!(rtt1.is_none());
498
499        // Now initiator's spin flipped to true
500        let bit2 = initiator.tx_bit();
501        assert!(bit2);
502
503        // Responder receives new bit
504        responder.rx_observe(bit2, 3, t1);
505        assert!(responder.tx_bit());
506
507        // Responder sends back, initiator receives
508        let resp_bit2 = responder.tx_bit();
509        let rtt2 = initiator.rx_observe(resp_bit2, 4, t2);
510        // Second edge: should produce an RTT sample
511        assert!(rtt2.is_some());
512    }
513
514    #[test]
515    fn test_spin_bit_responder_counter_guard() {
516        let mut responder = SpinBitState::new(false);
517
518        // Receive counter=5 with spin=true
519        responder.rx_observe(true, 5, Instant::now());
520        assert!(responder.tx_bit());
521
522        // Reordered packet with counter=3 and spin=false should be ignored
523        responder.rx_observe(false, 3, Instant::now());
524        assert!(responder.tx_bit()); // unchanged
525    }
526}