nomad_protocol/transport/
pacing.rs1use std::time::{Duration, Instant};
7
8pub mod constants {
10 use std::time::Duration;
11
12 pub const MIN_FRAME_INTERVAL_FLOOR: Duration = Duration::from_millis(20);
15
16 pub const COLLECTION_INTERVAL: Duration = Duration::from_millis(8);
18
19 pub const DELAYED_ACK_TIMEOUT: Duration = Duration::from_millis(100);
21
22 pub const MAX_FRAME_RATE_HZ: u32 = 50;
24
25 pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(25);
27
28 pub const DEAD_INTERVAL: Duration = Duration::from_secs(60);
30
31 pub const MAX_RETRANSMITS: u32 = 10;
33
34 pub const RETRANSMIT_BACKOFF: u32 = 2;
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum SendReason {
41 StateChange,
43 Ack,
45 Keepalive,
47 Retransmit,
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum PacerAction {
54 SendNow,
56 WaitUntil(Instant),
58 Idle,
60}
61
62#[derive(Debug, Clone)]
70pub struct FramePacer {
71 last_frame_sent: Option<Instant>,
73 state_change_time: Option<Instant>,
75 ack_pending_since: Option<Instant>,
77 data_pending: bool,
79 srtt_ms: f64,
81}
82
83impl Default for FramePacer {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89impl FramePacer {
90 pub fn new() -> Self {
92 Self {
93 last_frame_sent: None,
94 state_change_time: None,
95 ack_pending_since: None,
96 data_pending: false,
97 srtt_ms: 0.0,
98 }
99 }
100
101 pub fn set_srtt(&mut self, srtt: Duration) {
103 self.srtt_ms = srtt.as_secs_f64() * 1000.0;
104 }
105
106 pub fn on_state_change(&mut self) {
108 if self.state_change_time.is_none() {
109 self.state_change_time = Some(Instant::now());
110 }
111 self.data_pending = true;
112 }
113
114 pub fn on_ack_needed(&mut self) {
116 if self.ack_pending_since.is_none() {
117 self.ack_pending_since = Some(Instant::now());
118 }
119 }
120
121 pub fn on_frame_sent(&mut self) {
123 self.last_frame_sent = Some(Instant::now());
124 self.state_change_time = None;
125 self.ack_pending_since = None;
126 self.data_pending = false;
127 }
128
129 pub fn clear_pending(&mut self) {
131 self.data_pending = false;
132 self.state_change_time = None;
133 }
134
135 fn min_frame_interval(&self) -> Duration {
137 let srtt_half_ms = self.srtt_ms / 2.0;
138 let floor_ms = constants::MIN_FRAME_INTERVAL_FLOOR.as_millis() as f64;
139 let interval_ms = f64::max(srtt_half_ms, floor_ms);
140
141 let max_interval_ms = 1000.0 / constants::MAX_FRAME_RATE_HZ as f64;
143 let interval_ms = f64::max(interval_ms, max_interval_ms);
144
145 Duration::from_secs_f64(interval_ms / 1000.0)
146 }
147
148 pub fn poll(&self) -> PacerAction {
150 let now = Instant::now();
151
152 let needs_send = self.data_pending || self.ack_pending_since.is_some();
154 if !needs_send {
155 return PacerAction::Idle;
156 }
157
158 if let Some(last_sent) = self.last_frame_sent {
160 let min_interval = self.min_frame_interval();
161 let next_allowed = last_sent + min_interval;
162 if now < next_allowed {
163 return PacerAction::WaitUntil(next_allowed);
164 }
165 }
166
167 if let Some(state_time) = self.state_change_time {
169 let collection_end = state_time + constants::COLLECTION_INTERVAL;
170 if now < collection_end && self.ack_pending_since.is_none() {
171 return PacerAction::WaitUntil(collection_end);
173 }
174 }
175
176 if !self.data_pending
178 && let Some(ack_time) = self.ack_pending_since
179 {
180 let ack_deadline = ack_time + constants::DELAYED_ACK_TIMEOUT;
181 if now < ack_deadline {
182 return PacerAction::WaitUntil(ack_deadline);
184 }
185 }
186
187 PacerAction::SendNow
189 }
190
191 pub fn needs_keepalive(&self, last_received: Instant) -> bool {
193 if let Some(last_sent) = self.last_frame_sent {
194 let now = Instant::now();
195 let since_sent = now.duration_since(last_sent);
196 let since_received = now.duration_since(last_received);
197
198 since_sent >= constants::KEEPALIVE_INTERVAL
201 && since_received < constants::DEAD_INTERVAL
202 } else {
203 false
204 }
205 }
206
207 pub fn is_connection_dead(&self, last_received: Instant) -> bool {
209 Instant::now().duration_since(last_received) >= constants::DEAD_INTERVAL
210 }
211}
212
213#[derive(Debug, Clone)]
217pub struct RetransmitController {
218 retransmit_count: u32,
220 last_retransmit: Option<Instant>,
222 current_timeout: Duration,
224 base_rto: Duration,
226}
227
228impl RetransmitController {
229 pub fn new(initial_rto: Duration) -> Self {
231 Self {
232 retransmit_count: 0,
233 last_retransmit: None,
234 current_timeout: initial_rto,
235 base_rto: initial_rto,
236 }
237 }
238
239 pub fn set_rto(&mut self, rto: Duration) {
241 self.base_rto = rto;
242 if self.retransmit_count == 0 {
244 self.current_timeout = rto;
245 }
246 }
247
248 pub fn should_retransmit(&self, unacked_data: bool) -> bool {
250 if !unacked_data {
251 return false;
252 }
253
254 if self.retransmit_count >= constants::MAX_RETRANSMITS {
255 return false; }
257
258 match self.last_retransmit {
259 Some(last) => Instant::now().duration_since(last) >= self.current_timeout,
260 None => true, }
262 }
263
264 pub fn on_retransmit(&mut self) {
266 self.retransmit_count += 1;
267 self.last_retransmit = Some(Instant::now());
268
269 let new_timeout = self.current_timeout * constants::RETRANSMIT_BACKOFF;
271 self.current_timeout = new_timeout.min(super::timing::constants::MAX_RTO);
272 }
273
274 pub fn on_ack(&mut self) {
276 self.retransmit_count = 0;
277 self.last_retransmit = None;
278 self.current_timeout = self.base_rto;
279 }
280
281 pub fn retransmit_count(&self) -> u32 {
283 self.retransmit_count
284 }
285
286 pub fn is_failed(&self) -> bool {
288 self.retransmit_count >= constants::MAX_RETRANSMITS
289 }
290
291 pub fn time_until_retransmit(&self) -> Option<Duration> {
293 self.last_retransmit.map(|last| {
294 let elapsed = Instant::now().duration_since(last);
295 self.current_timeout.saturating_sub(elapsed)
296 })
297 }
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303
304 #[test]
305 fn test_pacer_initial_state() {
306 let pacer = FramePacer::new();
307 assert_eq!(pacer.poll(), PacerAction::Idle);
308 }
309
310 #[test]
311 fn test_pacer_state_change() {
312 let mut pacer = FramePacer::new();
313 pacer.on_state_change();
314
315 match pacer.poll() {
317 PacerAction::WaitUntil(_) => {}
318 other => panic!("Expected WaitUntil, got {:?}", other),
319 }
320
321 std::thread::sleep(constants::COLLECTION_INTERVAL + Duration::from_millis(1));
323 assert_eq!(pacer.poll(), PacerAction::SendNow);
324 }
325
326 #[test]
327 fn test_pacer_ack_only() {
328 let mut pacer = FramePacer::new();
329 pacer.on_ack_needed();
330
331 match pacer.poll() {
333 PacerAction::WaitUntil(_) => {}
334 other => panic!("Expected WaitUntil, got {:?}", other),
335 }
336 }
337
338 #[test]
339 fn test_pacer_ack_with_data() {
340 let mut pacer = FramePacer::new();
341 pacer.on_ack_needed();
342 pacer.on_state_change();
343
344 std::thread::sleep(constants::COLLECTION_INTERVAL + Duration::from_millis(1));
346 assert_eq!(pacer.poll(), PacerAction::SendNow);
347 }
348
349 #[test]
350 fn test_pacer_min_interval() {
351 let mut pacer = FramePacer::new();
352 pacer.set_srtt(Duration::from_millis(100)); let min_interval = pacer.min_frame_interval();
356 assert!(min_interval >= Duration::from_millis(50));
357 }
358
359 #[test]
360 fn test_pacer_frame_sent_clears_state() {
361 let mut pacer = FramePacer::new();
362 pacer.on_state_change();
363 pacer.on_ack_needed();
364
365 pacer.on_frame_sent();
366
367 assert_eq!(pacer.poll(), PacerAction::Idle);
369 }
370
371 #[test]
372 fn test_retransmit_controller() {
373 let mut controller = RetransmitController::new(Duration::from_millis(100));
374
375 assert!(controller.should_retransmit(true));
377 assert!(!controller.should_retransmit(false));
378
379 controller.on_retransmit();
381 assert!(!controller.should_retransmit(true)); controller.on_ack();
385 assert_eq!(controller.retransmit_count(), 0);
386 }
387
388 #[test]
389 fn test_retransmit_max_attempts() {
390 let mut controller = RetransmitController::new(Duration::from_millis(1));
391
392 for _ in 0..constants::MAX_RETRANSMITS {
393 controller.on_retransmit();
394 }
395
396 assert!(controller.is_failed());
397 assert!(!controller.should_retransmit(true));
398 }
399
400 #[test]
401 fn test_keepalive_check() {
402 let pacer = FramePacer::new();
403
404 assert!(!pacer.needs_keepalive(Instant::now()));
406 }
407
408 #[test]
409 fn test_connection_dead() {
410 let pacer = FramePacer::new();
411
412 assert!(!pacer.is_connection_dead(Instant::now()));
414
415 }
418}