Skip to main content

fips_core/mmp/
sender.rs

1//! MMP sender state machine.
2//!
3//! Tracks what this node has sent to a specific peer and produces
4//! SenderReport messages on demand. One `SenderState` per active peer.
5
6use std::time::{Duration, Instant};
7
8use crate::mmp::report::SenderReport;
9use crate::mmp::{
10    COLD_START_SAMPLES, DEFAULT_COLD_START_INTERVAL_MS, MAX_REPORT_INTERVAL_MS,
11    MIN_REPORT_INTERVAL_MS,
12};
13
14/// Per-peer sender-side MMP state.
15///
16/// Records cumulative and interval counters for every frame transmitted
17/// to this peer. Produces `SenderReport` snapshots on demand.
/// Per-peer sender-side MMP state.
///
/// Records cumulative and interval counters for every frame transmitted
/// to this peer. Produces `SenderReport` snapshots on demand.
///
/// The state is split into four groups: lifetime totals, the counters of the
/// interval currently being accumulated, report pacing, and bookkeeping for
/// send-failure backoff and cold-start.
#[derive(Debug, Clone)]
pub struct SenderState {
    // --- Cumulative (lifetime) ---
    /// Total number of frames recorded; never reset by a report.
    cumulative_packets_sent: u64,
    /// Total number of bytes recorded; never reset by a report.
    cumulative_bytes_sent: u64,

    // --- Current interval ---
    /// Counter of the first frame recorded in the current interval.
    interval_start_counter: u64,
    /// Timestamp of the first frame recorded in the current interval.
    interval_start_timestamp: u32,
    /// Bytes recorded in the current interval (saturating).
    interval_bytes_sent: u32,
    /// Counter of the most recently sent frame.
    last_counter: u64,
    /// Timestamp of the most recently sent frame.
    last_timestamp: u32,
    /// Whether any frames have been sent in the current interval.
    interval_has_data: bool,

    // --- Report timing ---
    /// When the last report was built; `None` until the first report.
    last_report_time: Option<Instant>,
    /// Base spacing between reports, before send-failure backoff is applied.
    report_interval: Duration,

    // --- Send failure backoff ---
    /// Consecutive send failure count for backoff calculation.
    consecutive_send_failures: u32,

    // --- Cold-start tracking ---
    /// Number of SRTT-based interval updates received.
    srtt_sample_count: u32,
}
46
47impl SenderState {
48    pub fn new() -> Self {
49        Self::new_with_cold_start(DEFAULT_COLD_START_INTERVAL_MS)
50    }
51
52    /// Create with a custom cold-start interval (ms).
53    ///
54    /// Used by session-layer MMP which needs a longer initial interval
55    /// since reports consume bandwidth on every transit link.
56    pub fn new_with_cold_start(cold_start_ms: u64) -> Self {
57        Self {
58            cumulative_packets_sent: 0,
59            cumulative_bytes_sent: 0,
60            interval_start_counter: 0,
61            interval_start_timestamp: 0,
62            interval_bytes_sent: 0,
63            last_counter: 0,
64            last_timestamp: 0,
65            interval_has_data: false,
66            last_report_time: None,
67            report_interval: Duration::from_millis(cold_start_ms),
68            consecutive_send_failures: 0,
69            srtt_sample_count: 0,
70        }
71    }
72
73    /// Record a frame sent to this peer.
74    ///
75    /// Called on the TX path for every encrypted link message.
76    /// `counter` is the AEAD nonce/counter, `timestamp` is the inner header
77    /// session-relative timestamp (ms), `bytes` is the wire payload size.
78    pub fn record_sent(&mut self, counter: u64, timestamp: u32, bytes: usize) {
79        if !self.interval_has_data {
80            self.interval_start_counter = counter;
81            self.interval_start_timestamp = timestamp;
82            self.interval_has_data = true;
83        }
84        self.last_counter = counter;
85        self.last_timestamp = timestamp;
86        self.interval_bytes_sent = self.interval_bytes_sent.saturating_add(bytes as u32);
87        self.cumulative_packets_sent += 1;
88        self.cumulative_bytes_sent += bytes as u64;
89    }
90
91    /// Build a SenderReport from current state and reset the interval.
92    ///
93    /// Returns `None` if no frames have been sent since the last report.
94    pub fn build_report(&mut self, now: Instant) -> Option<SenderReport> {
95        if !self.interval_has_data {
96            return None;
97        }
98
99        let report = SenderReport {
100            interval_start_counter: self.interval_start_counter,
101            interval_end_counter: self.last_counter,
102            interval_start_timestamp: self.interval_start_timestamp,
103            interval_end_timestamp: self.last_timestamp,
104            interval_bytes_sent: self.interval_bytes_sent,
105            cumulative_packets_sent: self.cumulative_packets_sent,
106            cumulative_bytes_sent: self.cumulative_bytes_sent,
107        };
108
109        // Reset interval
110        self.interval_has_data = false;
111        self.interval_bytes_sent = 0;
112        self.last_report_time = Some(now);
113
114        Some(report)
115    }
116
117    /// Check if it's time to send a report.
118    ///
119    /// When consecutive send failures have occurred, the effective interval
120    /// is multiplied by an exponential backoff factor (2^failures, capped at 32×).
121    pub fn should_send_report(&self, now: Instant) -> bool {
122        if !self.interval_has_data {
123            return false;
124        }
125        match self.last_report_time {
126            None => true, // Never sent a report — send immediately
127            Some(last) => {
128                let effective = self
129                    .report_interval
130                    .mul_f64(self.send_failure_backoff_multiplier());
131                now.duration_since(last) >= effective
132            }
133        }
134    }
135
136    /// Record a send failure. Returns the new consecutive failure count.
137    pub fn record_send_failure(&mut self) -> u32 {
138        self.consecutive_send_failures += 1;
139        self.consecutive_send_failures
140    }
141
142    /// Record a successful send. Returns the previous failure count (for summary logging).
143    pub fn record_send_success(&mut self) -> u32 {
144        let prev = self.consecutive_send_failures;
145        self.consecutive_send_failures = 0;
146        prev
147    }
148
149    /// Get the backoff multiplier based on consecutive failures.
150    ///
151    /// Returns 1.0 for no failures, 2.0 for 1 failure, 4.0 for 2, ...
152    /// capped at 32.0 (5 failures).
153    pub fn send_failure_backoff_multiplier(&self) -> f64 {
154        if self.consecutive_send_failures == 0 {
155            1.0
156        } else {
157            2.0_f64.powi(self.consecutive_send_failures.min(5) as i32)
158        }
159    }
160
161    /// Update the report interval based on SRTT (link-layer defaults).
162    ///
163    /// Sender reports at 2× SRTT clamped to [floor, MAX]. During cold-start
164    /// (first `COLD_START_SAMPLES` updates), the floor is the cold-start
165    /// interval (200ms) for fast SRTT convergence. After that, it rises to
166    /// `MIN_REPORT_INTERVAL_MS` (1000ms) for steady-state efficiency.
167    pub fn update_report_interval_from_srtt(&mut self, srtt_us: i64) {
168        self.srtt_sample_count = self.srtt_sample_count.saturating_add(1);
169        let floor = if self.srtt_sample_count <= COLD_START_SAMPLES {
170            DEFAULT_COLD_START_INTERVAL_MS
171        } else {
172            MIN_REPORT_INTERVAL_MS
173        };
174        self.update_report_interval_with_bounds(srtt_us, floor, MAX_REPORT_INTERVAL_MS);
175    }
176
177    /// Update the report interval based on SRTT with custom bounds.
178    ///
179    /// Used by session-layer MMP which needs higher clamp values since
180    /// each report consumes bandwidth on every transit link.
181    pub fn update_report_interval_with_bounds(&mut self, srtt_us: i64, min_ms: u64, max_ms: u64) {
182        if srtt_us <= 0 {
183            return;
184        }
185        let interval_us = (srtt_us * 2) as u64;
186        let interval_ms = (interval_us / 1000).clamp(min_ms, max_ms);
187        self.report_interval = Duration::from_millis(interval_ms);
188    }
189
190    // --- Accessors ---
191
192    pub fn cumulative_packets_sent(&self) -> u64 {
193        self.cumulative_packets_sent
194    }
195
196    pub fn cumulative_bytes_sent(&self) -> u64 {
197        self.cumulative_bytes_sent
198    }
199
200    pub fn report_interval(&self) -> Duration {
201        self.report_interval
202    }
203
204    pub fn consecutive_send_failures(&self) -> u32 {
205        self.consecutive_send_failures
206    }
207}
208
209impl Default for SenderState {
210    fn default() -> Self {
211        Self::new()
212    }
213}
214
215// ============================================================================
216// Tests
217// ============================================================================
218
#[cfg(test)]
mod tests {
    //! Unit tests for `SenderState`: counter bookkeeping, report building,
    //! report pacing, SRTT-driven interval updates and send-failure backoff.

    use super::*;

    #[test]
    fn test_new_sender_state() {
        let s = SenderState::new();
        assert_eq!(s.cumulative_packets_sent(), 0);
        assert_eq!(s.cumulative_bytes_sent(), 0);
        assert_eq!(s.consecutive_send_failures(), 0);
        assert_eq!(
            s.report_interval(),
            Duration::from_millis(DEFAULT_COLD_START_INTERVAL_MS)
        );
    }

    #[test]
    fn test_new_with_cold_start_sets_initial_interval() {
        let s = SenderState::new_with_cold_start(750);
        assert_eq!(s.report_interval(), Duration::from_millis(750));
        assert_eq!(s.cumulative_packets_sent(), 0);
        assert_eq!(s.cumulative_bytes_sent(), 0);
    }

    #[test]
    fn test_default_matches_new() {
        let d = SenderState::default();
        let n = SenderState::new();
        assert_eq!(d.report_interval(), n.report_interval());
        assert_eq!(d.cumulative_packets_sent(), n.cumulative_packets_sent());
        assert_eq!(d.cumulative_bytes_sent(), n.cumulative_bytes_sent());
        assert_eq!(d.consecutive_send_failures(), n.consecutive_send_failures());
    }

    #[test]
    fn test_record_sent() {
        let mut s = SenderState::new();
        s.record_sent(1, 100, 500);
        s.record_sent(2, 200, 600);
        assert_eq!(s.cumulative_packets_sent(), 2);
        assert_eq!(s.cumulative_bytes_sent(), 1100);
    }

    #[test]
    fn test_build_report_empty() {
        let mut s = SenderState::new();
        assert!(s.build_report(Instant::now()).is_none());
    }

    #[test]
    fn test_build_report() {
        let mut s = SenderState::new();
        s.record_sent(10, 1000, 500);
        s.record_sent(11, 1100, 600);
        s.record_sent(12, 1200, 400);

        let report = s.build_report(Instant::now()).unwrap();
        assert_eq!(report.interval_start_counter, 10);
        assert_eq!(report.interval_end_counter, 12);
        assert_eq!(report.interval_start_timestamp, 1000);
        assert_eq!(report.interval_end_timestamp, 1200);
        assert_eq!(report.interval_bytes_sent, 1500);
        assert_eq!(report.cumulative_packets_sent, 3);
        assert_eq!(report.cumulative_bytes_sent, 1500);
    }

    #[test]
    fn test_build_report_resets_interval() {
        let mut s = SenderState::new();
        s.record_sent(1, 100, 500);
        let _ = s.build_report(Instant::now());

        // Second report with no new data returns None
        assert!(s.build_report(Instant::now()).is_none());

        // New data starts a fresh interval
        s.record_sent(2, 200, 300);
        let report = s.build_report(Instant::now()).unwrap();
        assert_eq!(report.interval_start_counter, 2);
        assert_eq!(report.interval_bytes_sent, 300);
        // Cumulative continues
        assert_eq!(report.cumulative_packets_sent, 2);
        assert_eq!(report.cumulative_bytes_sent, 800);
    }

    #[test]
    fn test_should_send_report_no_data() {
        let s = SenderState::new();
        assert!(!s.should_send_report(Instant::now()));
    }

    #[test]
    fn test_should_send_report_first_time() {
        let mut s = SenderState::new();
        s.record_sent(1, 100, 500);
        assert!(s.should_send_report(Instant::now()));
    }

    #[test]
    fn test_should_send_report_respects_interval() {
        let mut s = SenderState::new();
        let t0 = Instant::now();
        s.record_sent(1, 100, 500);
        let _ = s.build_report(t0);

        s.record_sent(2, 200, 500);
        // Immediately after report — should not send
        assert!(!s.should_send_report(t0));

        // After interval elapses
        let t1 = t0 + s.report_interval() + Duration::from_millis(1);
        assert!(s.should_send_report(t1));
    }

    #[test]
    fn test_update_report_interval_cold_start() {
        let mut s = SenderState::new();
        // During cold-start, the floor is DEFAULT_COLD_START_INTERVAL_MS.
        // 50ms RTT → 100ms sender interval (2× SRTT), raised to the cold-start floor.
        s.update_report_interval_from_srtt(50_000);
        assert_eq!(
            s.report_interval(),
            Duration::from_millis(DEFAULT_COLD_START_INTERVAL_MS)
        );

        // 500ms RTT → 1000ms sender interval (above cold-start floor)
        s.update_report_interval_from_srtt(500_000);
        assert_eq!(s.report_interval(), Duration::from_millis(1000));
    }

    #[test]
    fn test_update_report_interval_after_cold_start() {
        let mut s = SenderState::new();
        // Burn through all COLD_START_SAMPLES cold-start samples
        for _ in 0..COLD_START_SAMPLES {
            s.update_report_interval_from_srtt(500_000);
        }

        // Next sample: now in steady state, floor is MIN_REPORT_INTERVAL_MS
        // 50ms RTT → 100ms sender interval (2× SRTT), raised to the floor
        s.update_report_interval_from_srtt(50_000);
        assert_eq!(
            s.report_interval(),
            Duration::from_millis(MIN_REPORT_INTERVAL_MS)
        );

        // 3s RTT → 6s, clamped to MAX_REPORT_INTERVAL_MS
        s.update_report_interval_from_srtt(3_000_000);
        assert_eq!(
            s.report_interval(),
            Duration::from_millis(MAX_REPORT_INTERVAL_MS)
        );
    }

    #[test]
    fn test_update_report_interval_with_custom_bounds() {
        let mut s = SenderState::new();

        // 50ms RTT → 100ms, raised to the custom floor
        s.update_report_interval_with_bounds(50_000, 500, 10_000);
        assert_eq!(s.report_interval(), Duration::from_millis(500));

        // 2s RTT → 4s, inside the bounds
        s.update_report_interval_with_bounds(2_000_000, 500, 10_000);
        assert_eq!(s.report_interval(), Duration::from_millis(4_000));

        // 20s RTT → 40s, lowered to the custom ceiling
        s.update_report_interval_with_bounds(20_000_000, 500, 10_000);
        assert_eq!(s.report_interval(), Duration::from_millis(10_000));
    }

    #[test]
    fn test_update_report_interval_ignores_non_positive_srtt() {
        let mut s = SenderState::new_with_cold_start(750);

        // Zero and negative samples leave the interval untouched
        s.update_report_interval_with_bounds(0, 100, 5_000);
        assert_eq!(s.report_interval(), Duration::from_millis(750));
        s.update_report_interval_with_bounds(-5, 100, 5_000);
        assert_eq!(s.report_interval(), Duration::from_millis(750));
        s.update_report_interval_from_srtt(0);
        assert_eq!(s.report_interval(), Duration::from_millis(750));
    }

    #[test]
    fn test_backoff_multiplier_progression() {
        let mut s = SenderState::new();

        // No failures → multiplier 1.0
        assert_eq!(s.send_failure_backoff_multiplier(), 1.0);
        assert_eq!(s.consecutive_send_failures(), 0);

        // Progressive failures: 2^1, 2^2, 2^3, 2^4, 2^5
        let expected = [2.0, 4.0, 8.0, 16.0, 32.0];
        for (i, &exp) in expected.iter().enumerate() {
            let count = s.record_send_failure();
            assert_eq!(count, (i + 1) as u32);
            assert_eq!(s.send_failure_backoff_multiplier(), exp);
        }

        // Beyond 5 failures: stays capped at 32.0
        s.record_send_failure(); // 6th
        assert_eq!(s.send_failure_backoff_multiplier(), 32.0);
        s.record_send_failure(); // 7th
        assert_eq!(s.send_failure_backoff_multiplier(), 32.0);
    }

    #[test]
    fn test_backoff_reset_on_success() {
        let mut s = SenderState::new();

        // Accumulate failures
        s.record_send_failure();
        s.record_send_failure();
        s.record_send_failure();
        assert_eq!(s.consecutive_send_failures(), 3);
        assert_eq!(s.send_failure_backoff_multiplier(), 8.0);

        // Success resets and returns previous count
        let prev = s.record_send_success();
        assert_eq!(prev, 3);
        assert_eq!(s.consecutive_send_failures(), 0);
        assert_eq!(s.send_failure_backoff_multiplier(), 1.0);
    }

    #[test]
    fn test_backoff_success_with_no_prior_failures() {
        let mut s = SenderState::new();

        // Success with no failures returns 0
        let prev = s.record_send_success();
        assert_eq!(prev, 0);
        assert_eq!(s.consecutive_send_failures(), 0);
    }

    #[test]
    fn test_should_send_report_respects_backoff() {
        let mut s = SenderState::new();
        let t0 = Instant::now();
        s.record_sent(1, 100, 500);
        let _ = s.build_report(t0);

        // Record a failure: multiplier becomes 2.0
        s.record_send_failure();

        s.record_sent(2, 200, 500);

        // At 1× interval: should NOT send (backoff requires 2×)
        let t1 = t0 + s.report_interval() + Duration::from_millis(1);
        assert!(!s.should_send_report(t1));

        // At 2× interval: should send
        let t2 = t0 + s.report_interval() * 2 + Duration::from_millis(1);
        assert!(s.should_send_report(t2));
    }
}