fips-core 0.3.3

Reusable FIPS mesh, endpoint, transport, and protocol library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
//! MMP derived metrics.
//!
//! `MmpMetrics` processes incoming ReceiverReports (from our peer) and
//! maintains derived metrics: SRTT, loss rate, goodput, ETX, and dual
//! EWMA trend indicators. Updated by the sender side when it receives
//! a ReceiverReport about its own traffic.

use crate::mmp::algorithms::{DualEwma, SrttEstimator, compute_etx};
use crate::mmp::report::ReceiverReport;
use std::time::Instant;
use tracing::trace;

/// Derived MMP metrics, updated from incoming ReceiverReports.
///
/// This lives on the sender side: when we receive a ReceiverReport from
/// our peer describing what they observed about our traffic, we process
/// it here to compute RTT, loss, goodput, and trend indicators.
pub struct MmpMetrics {
    /// Smoothed RTT from timestamp echo.
    pub srtt: SrttEstimator,

    /// Dual EWMA trend detectors.
    pub rtt_trend: DualEwma,
    pub loss_trend: DualEwma,
    pub goodput_trend: DualEwma,
    pub jitter_trend: DualEwma,
    pub etx_trend: DualEwma,

    /// Forward delivery ratio (what fraction of our frames the peer received).
    pub delivery_ratio_forward: f64,
    /// Reverse delivery ratio (set when we compute from our own receiver state).
    pub delivery_ratio_reverse: f64,
    /// ETX computed from bidirectional delivery ratios.
    pub etx: f64,

    /// Smoothed goodput in bytes/sec (forward direction: what the peer received from us).
    pub goodput_bps: f64,

    // --- State for delta computation ---
    /// Previous ReceiverReport's cumulative counters (for computing interval deltas).
    prev_rr_cum_packets: u64,
    prev_rr_cum_bytes: u64,
    prev_rr_highest_counter: u64,
    prev_rr_ecn_ce: u32,
    prev_rr_reorder: u32,
    /// Time of previous ReceiverReport (for goodput rate computation).
    prev_rr_time: Option<Instant>,
    /// Whether we have a previous ReceiverReport for delta computation.
    has_prev_rr: bool,

    // --- State for reverse delivery ratio delta computation ---
    /// Previous reverse-side cumulative packets received (our receiver state).
    prev_reverse_packets: u64,
    /// Previous reverse-side highest counter (our receiver state).
    prev_reverse_highest: u64,
    /// Whether we have a previous reverse-side snapshot for delta computation.
    has_prev_reverse: bool,
}

impl MmpMetrics {
    /// Reset state derived from ReceiverReport counters for rekey cutover.
    ///
    /// The new session starts with counter 0, so the prev_rr deltas must
    /// be reset to avoid computing bogus loss/goodput from the counter
    /// discontinuity. RTT (SRTT) is preserved since it remains valid.
    pub fn reset_for_rekey(&mut self) {
        self.prev_rr_cum_packets = 0;
        self.prev_rr_cum_bytes = 0;
        self.prev_rr_highest_counter = 0;
        self.prev_rr_ecn_ce = 0;
        self.prev_rr_reorder = 0;
        self.prev_rr_time = None;
        self.has_prev_rr = false;
        self.delivery_ratio_forward = 1.0;
        self.prev_reverse_packets = 0;
        self.prev_reverse_highest = 0;
        self.has_prev_reverse = false;
        // Keep srtt, etx, trends, goodput_bps — they'll refresh from data
    }

    pub fn new() -> Self {
        Self {
            srtt: SrttEstimator::new(),
            rtt_trend: DualEwma::new(),
            loss_trend: DualEwma::new(),
            goodput_trend: DualEwma::new(),
            jitter_trend: DualEwma::new(),
            etx_trend: DualEwma::new(),
            delivery_ratio_forward: 1.0,
            delivery_ratio_reverse: 1.0,
            etx: 1.0,
            goodput_bps: 0.0,
            prev_rr_cum_packets: 0,
            prev_rr_cum_bytes: 0,
            prev_rr_highest_counter: 0,
            prev_rr_ecn_ce: 0,
            prev_rr_reorder: 0,
            prev_rr_time: None,
            has_prev_rr: false,
            prev_reverse_packets: 0,
            prev_reverse_highest: 0,
            has_prev_reverse: false,
        }
    }

    /// Process an incoming ReceiverReport (from the peer about our traffic).
    ///
    /// `our_timestamp_ms` is the current session-relative time in ms (for RTT).
    /// `now` is the current monotonic time (for goodput rate computation).
    ///
    /// Returns `true` if this report produced the first SRTT measurement
    /// (transition from uninitialized to initialized).
    pub fn process_receiver_report(
        &mut self,
        rr: &ReceiverReport,
        our_timestamp_ms: u32,
        now: Instant,
    ) -> bool {
        let had_srtt = self.srtt.initialized();

        // --- RTT from timestamp echo ---
        // RTT = now - echoed_timestamp - dwell_time
        if rr.timestamp_echo > 0 {
            let echo_ms = rr.timestamp_echo;
            let dwell_ms = rr.dwell_time as u32;
            // Guard against timestamp wrap or bogus values
            if our_timestamp_ms > echo_ms + dwell_ms {
                let rtt_ms = our_timestamp_ms - echo_ms - dwell_ms;
                let rtt_us = (rtt_ms as i64) * 1000;
                trace!(
                    our_ts = our_timestamp_ms,
                    echo = echo_ms,
                    dwell = dwell_ms,
                    rtt_ms = rtt_ms,
                    srtt_ms = self.srtt.srtt_us() as f64 / 1000.0,
                    "RTT sample from timestamp echo"
                );
                self.srtt.update(rtt_us);
                self.rtt_trend.update(rtt_us as f64);
            }
        }

        // --- Loss rate from cumulative counters ---
        // Delta: frames the peer should have received vs. actually received
        if self.has_prev_rr {
            let counter_span = rr
                .highest_counter
                .saturating_sub(self.prev_rr_highest_counter);
            let packets_delta = rr
                .cumulative_packets_recv
                .saturating_sub(self.prev_rr_cum_packets);

            if counter_span > 0 {
                let delivery = (packets_delta as f64) / (counter_span as f64);
                self.delivery_ratio_forward = delivery.clamp(0.0, 1.0);
                let loss_rate = 1.0 - self.delivery_ratio_forward;
                self.loss_trend.update(loss_rate);
                self.etx = compute_etx(self.delivery_ratio_forward, self.delivery_ratio_reverse);
                self.etx_trend.update(self.etx);
            }
        }

        // --- Goodput from cumulative bytes + time delta ---
        if self.has_prev_rr {
            let bytes_delta = rr
                .cumulative_bytes_recv
                .saturating_sub(self.prev_rr_cum_bytes);
            self.goodput_trend.update(bytes_delta as f64);

            // Compute bytes/sec if we have a time reference
            if let Some(prev_time) = self.prev_rr_time {
                let elapsed = now.duration_since(prev_time);
                let secs = elapsed.as_secs_f64();
                if secs > 0.0 {
                    let bps = bytes_delta as f64 / secs;
                    // EWMA smoothing: α = 1/4
                    if self.goodput_bps == 0.0 {
                        self.goodput_bps = bps;
                    } else {
                        self.goodput_bps += (bps - self.goodput_bps) * 0.25;
                    }
                }
            }
        }

        // --- Jitter trend ---
        self.jitter_trend.update(rr.jitter as f64);

        // --- Save for next delta ---
        self.prev_rr_cum_packets = rr.cumulative_packets_recv;
        self.prev_rr_cum_bytes = rr.cumulative_bytes_recv;
        self.prev_rr_highest_counter = rr.highest_counter;
        self.prev_rr_ecn_ce = rr.ecn_ce_count;
        self.prev_rr_reorder = rr.cumulative_reorder_count;
        self.prev_rr_time = Some(now);
        self.has_prev_rr = true;

        !had_srtt && self.srtt.initialized()
    }

    /// Update the reverse delivery ratio from our own receiver state.
    ///
    /// Computes a per-interval delta (same as forward ratio) rather than
    /// a lifetime cumulative ratio, so ETX responds to recent conditions.
    pub fn update_reverse_delivery(&mut self, our_recv_packets: u64, peer_highest: u64) {
        if self.has_prev_reverse {
            let counter_span = peer_highest.saturating_sub(self.prev_reverse_highest);
            let packets_delta = our_recv_packets.saturating_sub(self.prev_reverse_packets);

            if counter_span > 0 {
                let delivery = (packets_delta as f64) / (counter_span as f64);
                self.delivery_ratio_reverse = delivery.clamp(0.0, 1.0);
                self.etx = compute_etx(self.delivery_ratio_forward, self.delivery_ratio_reverse);
                self.etx_trend.update(self.etx);
            }
        }

        self.prev_reverse_packets = our_recv_packets;
        self.prev_reverse_highest = peer_highest;
        self.has_prev_reverse = true;
    }

    /// Current smoothed RTT in milliseconds, or `None` if not yet measured.
    pub fn srtt_ms(&self) -> Option<f64> {
        if self.srtt.initialized() {
            Some(self.srtt.srtt_us() as f64 / 1000.0)
        } else {
            None
        }
    }

    /// Current loss rate (0.0 = no loss, 1.0 = total loss).
    pub fn loss_rate(&self) -> f64 {
        1.0 - self.delivery_ratio_forward
    }

    /// Smoothed loss rate (long-term EWMA), or `None` if not yet initialized.
    pub fn smoothed_loss(&self) -> Option<f64> {
        if self.loss_trend.initialized() {
            Some(self.loss_trend.long())
        } else {
            None
        }
    }

    /// Smoothed ETX (long-term EWMA), or `None` if not yet initialized.
    pub fn smoothed_etx(&self) -> Option<f64> {
        if self.etx_trend.initialized() {
            Some(self.etx_trend.long())
        } else {
            None
        }
    }

    /// Current smoothed goodput in bytes/sec, or 0 if not yet measured.
    pub fn goodput_bps(&self) -> f64 {
        self.goodput_bps
    }

    /// Cumulative ECN CE count from the most recent ReceiverReport.
    pub fn last_ecn_ce_count(&self) -> u32 {
        self.prev_rr_ecn_ce
    }
}

impl Default for MmpMetrics {
    fn default() -> Self {
        Self::new()
    }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn make_rr(
        highest_counter: u64,
        cum_packets: u64,
        cum_bytes: u64,
        timestamp_echo: u32,
        dwell: u16,
        jitter: u32,
    ) -> ReceiverReport {
        ReceiverReport {
            highest_counter,
            cumulative_packets_recv: cum_packets,
            cumulative_bytes_recv: cum_bytes,
            timestamp_echo,
            dwell_time: dwell,
            max_burst_loss: 0,
            mean_burst_loss: 0,
            jitter,
            ecn_ce_count: 0,
            owd_trend: 0,
            burst_loss_count: 0,
            cumulative_reorder_count: 0,
            interval_packets_recv: 0,
            interval_bytes_recv: 0,
        }
    }

    #[test]
    fn test_rtt_from_echo() {
        let mut m = MmpMetrics::new();
        let now = Instant::now();
        // Peer echoes timestamp 1000ms, dwell=5ms, our current time=1050ms
        let rr = make_rr(10, 10, 5000, 1000, 5, 0);
        m.process_receiver_report(&rr, 1050, now);

        assert!(m.srtt.initialized());
        // RTT = 1050 - 1000 - 5 = 45ms
        let srtt_ms = m.srtt_ms().unwrap();
        assert!((srtt_ms - 45.0).abs() < 1.0, "srtt={srtt_ms}, expected ~45");
    }

    #[test]
    fn test_loss_rate_computation() {
        let mut m = MmpMetrics::new();
        let t0 = Instant::now();

        // First report: baseline
        let rr1 = make_rr(100, 100, 50000, 0, 0, 0);
        m.process_receiver_report(&rr1, 0, t0);

        // Second report: 200 counters sent, 190 received (5% loss)
        let rr2 = make_rr(300, 290, 145000, 0, 0, 0);
        m.process_receiver_report(&rr2, 0, t0 + Duration::from_secs(1));

        let loss = m.loss_rate();
        assert!((loss - 0.05).abs() < 0.01, "loss={loss}, expected ~0.05");
    }

    #[test]
    fn test_etx_updates() {
        let mut m = MmpMetrics::new();
        assert_eq!(m.etx, 1.0); // initial: perfect

        // Simulate some loss via forward ratio
        m.delivery_ratio_forward = 0.9;

        // First call establishes the baseline (no ETX update yet)
        m.update_reverse_delivery(100, 100);
        assert_eq!(m.etx, 1.0); // still perfect — baseline only

        // Second call: 190 of 200 frames received (5% loss)
        m.update_reverse_delivery(290, 300);
        assert!(m.etx > 1.0);
        assert!(m.etx < 2.0);
    }

    #[test]
    fn test_no_rtt_without_echo() {
        let mut m = MmpMetrics::new();
        let now = Instant::now();
        let rr = make_rr(10, 10, 5000, 0, 0, 0);
        m.process_receiver_report(&rr, 1000, now);
        assert!(m.srtt_ms().is_none());
    }

    #[test]
    fn test_jitter_trend() {
        let mut m = MmpMetrics::new();
        let t0 = Instant::now();
        let rr1 = make_rr(10, 10, 5000, 0, 0, 100);
        m.process_receiver_report(&rr1, 0, t0);

        let rr2 = make_rr(20, 20, 10000, 0, 0, 500);
        m.process_receiver_report(&rr2, 0, t0 + Duration::from_secs(1));

        assert!(m.jitter_trend.initialized());
        // Short-term should be closer to 500 than long-term
        assert!(m.jitter_trend.short() > m.jitter_trend.long());
    }

    #[test]
    fn test_goodput_bps() {
        let mut m = MmpMetrics::new();
        let t0 = Instant::now();

        // First report: baseline (50KB received)
        let rr1 = make_rr(100, 100, 50_000, 0, 0, 0);
        m.process_receiver_report(&rr1, 0, t0);
        assert_eq!(m.goodput_bps(), 0.0); // no rate yet (first report)

        // Second report 1s later: 150KB total (100KB delta in 1s = 100KB/s)
        let rr2 = make_rr(300, 290, 150_000, 0, 0, 0);
        m.process_receiver_report(&rr2, 0, t0 + Duration::from_secs(1));
        assert!(
            m.goodput_bps() > 90_000.0,
            "goodput={}, expected ~100000",
            m.goodput_bps()
        );
        assert!(
            m.goodput_bps() < 110_000.0,
            "goodput={}, expected ~100000",
            m.goodput_bps()
        );
    }

    #[test]
    fn test_reverse_delivery_delta() {
        let mut m = MmpMetrics::new();

        // First call: baseline only, no ratio update
        m.update_reverse_delivery(100, 100);
        assert_eq!(m.delivery_ratio_reverse, 1.0); // unchanged from default

        // Second call: perfect delivery (200 new frames, all received)
        m.update_reverse_delivery(300, 300);
        assert!((m.delivery_ratio_reverse - 1.0).abs() < 0.001);

        // Third call: 50% loss (100 frames sent, 50 received)
        m.update_reverse_delivery(350, 400);
        assert!(
            (m.delivery_ratio_reverse - 0.5).abs() < 0.001,
            "reverse={}, expected 0.5",
            m.delivery_ratio_reverse
        );
    }

    #[test]
    fn test_reverse_delivery_rekey_reset() {
        let mut m = MmpMetrics::new();

        // Establish baseline and one measurement
        m.update_reverse_delivery(100, 100);
        m.update_reverse_delivery(300, 300);
        assert!((m.delivery_ratio_reverse - 1.0).abs() < 0.001);

        // Rekey resets reverse state
        m.reset_for_rekey();

        // First call after rekey: baseline only
        m.update_reverse_delivery(50, 50);
        // delivery_ratio_reverse was reset to 1.0 by reset_for_rekey's
        // clearing of delivery_ratio_forward; reverse is not explicitly
        // reset — but the delta state is, so next call computes fresh.
        assert_eq!(m.delivery_ratio_reverse, 1.0);

        // Second call after rekey: 80% delivery
        m.update_reverse_delivery(90, 100);
        assert!(
            (m.delivery_ratio_reverse - 0.8).abs() < 0.001,
            "reverse={}, expected 0.8",
            m.delivery_ratio_reverse
        );
    }
}