nomad_protocol/transport/
pacing.rs

1//! Frame pacing and rate limiting.
2//!
3//! Implements the frame pacing algorithm from 2-TRANSPORT.md to prevent
4//! buffer bloat and network congestion.
5
6use std::time::{Duration, Instant};
7
/// Timing and retry parameters taken from the protocol specification.
pub mod constants {
    use std::time::Duration;

    /// Floor for the gap between two consecutive frames.
    /// The effective minimum is max(SRTT/2, 20ms).
    pub const MIN_FRAME_INTERVAL_FLOOR: Duration = Duration::from_millis(20);

    /// How long to hold a state change before sending, so that rapid
    /// successive changes end up batched together.
    pub const COLLECTION_INTERVAL: Duration = Duration::from_millis(8);

    /// Longest time an ack-only frame may be held back.
    pub const DELAYED_ACK_TIMEOUT: Duration = Duration::from_millis(100);

    /// Absolute ceiling on the frame rate (50 Hz, i.e. 20ms between frames).
    pub const MAX_FRAME_RATE_HZ: u32 = 50;

    /// A keepalive is sent when no data has been sent for this long.
    pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(25);

    /// The connection is considered dead when no frames have been received
    /// for this long.
    pub const DEAD_INTERVAL: Duration = Duration::from_secs(60);

    /// Number of retransmits after which we give up.
    pub const MAX_RETRANSMITS: u32 = 10;

    /// Factor applied to the retransmit timeout on each backoff step.
    pub const RETRANSMIT_BACKOFF: u32 = 2;
}
37
/// Why a frame is being sent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SendReason {
    /// Local state changed and has to be synchronized.
    StateChange,
    /// Received data has to be acknowledged.
    Ack,
    /// Keepalive, sent to prevent a timeout.
    Keepalive,
    /// Unacknowledged data is being sent again.
    Retransmit,
}
50
/// What the pacer recommends doing next.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PacerAction {
    /// A frame should be sent immediately.
    SendNow,
    /// Hold off sending until the given instant.
    WaitUntil(Instant),
    /// Nothing to send, so nothing to do.
    Idle,
}
61
/// Decides when frames are allowed to go out.
///
/// The pacer enforces:
/// - a minimum gap between frames (the greater of SRTT/2 and 20ms)
/// - a collection interval that batches rapid state changes (8ms)
/// - delayed ACKs, so they can piggyback on data frames (100ms max)
/// - a frame rate cap of 50 Hz
#[derive(Clone, Debug)]
pub struct FramePacer {
    /// Time of the most recent frame we sent, if any.
    last_frame_sent: Option<Instant>,
    /// Time of the state change that is still waiting to be sent.
    state_change_time: Option<Instant>,
    /// Time at which an ACK became pending.
    ack_pending_since: Option<Instant>,
    /// True when there is data to send, as opposed to only an ACK.
    data_pending: bool,
    /// Smoothed RTT in milliseconds, as reported by the RTT estimator.
    srtt_ms: f64,
}
82
83impl Default for FramePacer {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl FramePacer {
90    /// Create a new frame pacer.
91    pub fn new() -> Self {
92        Self {
93            last_frame_sent: None,
94            state_change_time: None,
95            ack_pending_since: None,
96            data_pending: false,
97            srtt_ms: 0.0,
98        }
99    }
100
101    /// Update the SRTT from the RTT estimator.
102    pub fn set_srtt(&mut self, srtt: Duration) {
103        self.srtt_ms = srtt.as_secs_f64() * 1000.0;
104    }
105
106    /// Notify the pacer that local state has changed.
107    pub fn on_state_change(&mut self) {
108        if self.state_change_time.is_none() {
109            self.state_change_time = Some(Instant::now());
110        }
111        self.data_pending = true;
112    }
113
114    /// Notify the pacer that we received a frame and should send an ACK.
115    pub fn on_ack_needed(&mut self) {
116        if self.ack_pending_since.is_none() {
117            self.ack_pending_since = Some(Instant::now());
118        }
119    }
120
121    /// Notify the pacer that a frame was sent.
122    pub fn on_frame_sent(&mut self) {
123        self.last_frame_sent = Some(Instant::now());
124        self.state_change_time = None;
125        self.ack_pending_since = None;
126        self.data_pending = false;
127    }
128
129    /// Clear pending state (e.g., after receiving ACK).
130    pub fn clear_pending(&mut self) {
131        self.data_pending = false;
132        self.state_change_time = None;
133    }
134
135    /// Calculate the minimum frame interval based on SRTT.
136    fn min_frame_interval(&self) -> Duration {
137        let srtt_half_ms = self.srtt_ms / 2.0;
138        let floor_ms = constants::MIN_FRAME_INTERVAL_FLOOR.as_millis() as f64;
139        let interval_ms = f64::max(srtt_half_ms, floor_ms);
140
141        // Also respect the hard frame rate cap
142        let max_interval_ms = 1000.0 / constants::MAX_FRAME_RATE_HZ as f64;
143        let interval_ms = f64::max(interval_ms, max_interval_ms);
144
145        Duration::from_secs_f64(interval_ms / 1000.0)
146    }
147
148    /// Determine what action to take based on current state.
149    pub fn poll(&self) -> PacerAction {
150        let now = Instant::now();
151
152        // Check if we need to send anything at all
153        let needs_send = self.data_pending || self.ack_pending_since.is_some();
154        if !needs_send {
155            return PacerAction::Idle;
156        }
157
158        // Check minimum frame interval
159        if let Some(last_sent) = self.last_frame_sent {
160            let min_interval = self.min_frame_interval();
161            let next_allowed = last_sent + min_interval;
162            if now < next_allowed {
163                return PacerAction::WaitUntil(next_allowed);
164            }
165        }
166
167        // Check collection interval for state changes
168        if let Some(state_time) = self.state_change_time {
169            let collection_end = state_time + constants::COLLECTION_INTERVAL;
170            if now < collection_end && self.ack_pending_since.is_none() {
171                // Wait for collection interval, unless we have an ACK to send
172                return PacerAction::WaitUntil(collection_end);
173            }
174        }
175
176        // Check delayed ACK timeout
177        if !self.data_pending
178            && let Some(ack_time) = self.ack_pending_since
179        {
180            let ack_deadline = ack_time + constants::DELAYED_ACK_TIMEOUT;
181            if now < ack_deadline {
182                // Still within delayed ACK window, wait for data
183                return PacerAction::WaitUntil(ack_deadline);
184            }
185        }
186
187        // All checks passed, send now
188        PacerAction::SendNow
189    }
190
191    /// Check if we should send a keepalive.
192    pub fn needs_keepalive(&self, last_received: Instant) -> bool {
193        if let Some(last_sent) = self.last_frame_sent {
194            let now = Instant::now();
195            let since_sent = now.duration_since(last_sent);
196            let since_received = now.duration_since(last_received);
197
198            // Send keepalive if we haven't sent anything recently
199            // and the connection is still alive
200            since_sent >= constants::KEEPALIVE_INTERVAL
201                && since_received < constants::DEAD_INTERVAL
202        } else {
203            false
204        }
205    }
206
207    /// Check if the connection should be considered dead.
208    pub fn is_connection_dead(&self, last_received: Instant) -> bool {
209        Instant::now().duration_since(last_received) >= constants::DEAD_INTERVAL
210    }
211}
212
/// Controller for retransmissions.
///
/// Keeps the retransmission state and the timeout that grows with
/// exponential backoff.
#[derive(Clone, Debug)]
pub struct RetransmitController {
    /// How many times the current data has been retransmitted.
    retransmit_count: u32,
    /// When the most recent retransmit happened, if any.
    last_retransmit: Option<Instant>,
    /// Timeout currently in effect, backoff included.
    current_timeout: Duration,
    /// Un-backed-off RTO supplied by the RTT estimator.
    base_rto: Duration,
}
227
228impl RetransmitController {
229    /// Create a new retransmit controller.
230    pub fn new(initial_rto: Duration) -> Self {
231        Self {
232            retransmit_count: 0,
233            last_retransmit: None,
234            current_timeout: initial_rto,
235            base_rto: initial_rto,
236        }
237    }
238
239    /// Update the base RTO from RTT estimator.
240    pub fn set_rto(&mut self, rto: Duration) {
241        self.base_rto = rto;
242        // Only update current_timeout if we're not in backoff
243        if self.retransmit_count == 0 {
244            self.current_timeout = rto;
245        }
246    }
247
248    /// Check if we should retransmit now.
249    pub fn should_retransmit(&self, unacked_data: bool) -> bool {
250        if !unacked_data {
251            return false;
252        }
253
254        if self.retransmit_count >= constants::MAX_RETRANSMITS {
255            return false; // Give up
256        }
257
258        match self.last_retransmit {
259            Some(last) => Instant::now().duration_since(last) >= self.current_timeout,
260            None => true, // First transmission
261        }
262    }
263
264    /// Record that we're retransmitting.
265    pub fn on_retransmit(&mut self) {
266        self.retransmit_count += 1;
267        self.last_retransmit = Some(Instant::now());
268
269        // Exponential backoff
270        let new_timeout = self.current_timeout * constants::RETRANSMIT_BACKOFF;
271        self.current_timeout = new_timeout.min(super::timing::constants::MAX_RTO);
272    }
273
274    /// Reset after successful acknowledgment.
275    pub fn on_ack(&mut self) {
276        self.retransmit_count = 0;
277        self.last_retransmit = None;
278        self.current_timeout = self.base_rto;
279    }
280
281    /// Get the current retransmit count.
282    pub fn retransmit_count(&self) -> u32 {
283        self.retransmit_count
284    }
285
286    /// Check if we've exceeded max retransmits.
287    pub fn is_failed(&self) -> bool {
288        self.retransmit_count >= constants::MAX_RETRANSMITS
289    }
290
291    /// Get time until next retransmit is allowed.
292    pub fn time_until_retransmit(&self) -> Option<Duration> {
293        self.last_retransmit.map(|last| {
294            let elapsed = Instant::now().duration_since(last);
295            self.current_timeout.saturating_sub(elapsed)
296        })
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Fails with the shared message unless `action` is a `WaitUntil`.
    fn expect_wait(action: PacerAction) {
        if !matches!(action, PacerAction::WaitUntil(_)) {
            panic!("Expected WaitUntil, got {:?}", action);
        }
    }

    #[test]
    fn test_pacer_initial_state() {
        // A fresh pacer has nothing pending.
        assert_eq!(FramePacer::new().poll(), PacerAction::Idle);
    }

    #[test]
    fn test_pacer_state_change() {
        let mut pacer = FramePacer::new();
        pacer.on_state_change();

        // Right after the change the pacer asks us to wait.
        expect_wait(pacer.poll());

        // Once the collection interval is over, the frame may go out.
        let pause = constants::COLLECTION_INTERVAL + Duration::from_millis(1);
        std::thread::sleep(pause);
        assert_eq!(pacer.poll(), PacerAction::SendNow);
    }

    #[test]
    fn test_pacer_ack_only() {
        let mut pacer = FramePacer::new();
        pacer.on_ack_needed();

        // With only an ACK pending, the pacer holds it back.
        expect_wait(pacer.poll());
    }

    #[test]
    fn test_pacer_ack_with_data() {
        let mut pacer = FramePacer::new();
        pacer.on_ack_needed();
        pacer.on_state_change();

        // ACK plus data: sendable once the collection interval has passed,
        // well before the delayed ACK timeout.
        let pause = constants::COLLECTION_INTERVAL + Duration::from_millis(1);
        std::thread::sleep(pause);
        assert_eq!(pacer.poll(), PacerAction::SendNow);
    }

    #[test]
    fn test_pacer_min_interval() {
        let mut pacer = FramePacer::new();
        // SRTT of 100ms
        pacer.set_srtt(Duration::from_millis(100));

        // SRTT/2 = 50ms exceeds the 20ms floor, so it determines the interval.
        assert!(pacer.min_frame_interval() >= Duration::from_millis(50));
    }

    #[test]
    fn test_pacer_frame_sent_clears_state() {
        let mut pacer = FramePacer::new();
        pacer.on_state_change();
        pacer.on_ack_needed();

        pacer.on_frame_sent();

        // Sending wipes everything that was pending.
        assert_eq!(pacer.poll(), PacerAction::Idle);
    }

    #[test]
    fn test_retransmit_controller() {
        let mut controller = RetransmitController::new(Duration::from_millis(100));

        // Unacked data is eligible right away; no unacked data never is.
        assert!(controller.should_retransmit(true));
        assert!(!controller.should_retransmit(false));

        // Having just retransmitted, the timeout has to elapse first.
        controller.on_retransmit();
        assert!(!controller.should_retransmit(true));

        // An ACK resets the counter.
        controller.on_ack();
        assert_eq!(controller.retransmit_count(), 0);
    }

    #[test]
    fn test_retransmit_max_attempts() {
        let mut controller = RetransmitController::new(Duration::from_millis(1));

        (0..constants::MAX_RETRANSMITS).for_each(|_| controller.on_retransmit());

        assert!(controller.is_failed());
        assert!(!controller.should_retransmit(true));
    }

    #[test]
    fn test_keepalive_check() {
        let pacer = FramePacer::new();

        // Nothing has been sent yet, so no keepalive is requested.
        assert!(!pacer.needs_keepalive(Instant::now()));
    }

    #[test]
    fn test_connection_dead() {
        let pacer = FramePacer::new();

        // Activity just now means the connection is alive.
        assert!(!pacer.is_connection_dead(Instant::now()));

        // The dead case needs a very old timestamp, which is hard to
        // produce here without mocking time.
    }
}