1use std::time::{Duration, Instant};
7
8use crate::mmp::algorithms::{JitterEstimator, OwdTrendDetector};
9use crate::mmp::report::ReceiverReport;
10use crate::mmp::{
11 COLD_START_SAMPLES, DEFAULT_COLD_START_INTERVAL_MS, DEFAULT_OWD_WINDOW_SIZE,
12 MAX_REPORT_INTERVAL_MS, MIN_REPORT_INTERVAL_MS,
13};
14
15const REKEY_JITTER_GRACE_SECS: u64 = 15;
22
23struct GapTracker {
32 expected_next: Option<u64>,
34 in_burst: bool,
36 current_burst_len: u16,
38
39 burst_count: u32,
42 max_burst_len: u16,
44 total_burst_len: u64,
46}
47
48impl GapTracker {
49 fn new() -> Self {
50 Self {
51 expected_next: None,
52 in_burst: false,
53 current_burst_len: 0,
54 burst_count: 0,
55 max_burst_len: 0,
56 total_burst_len: 0,
57 }
58 }
59
60 fn observe(&mut self, counter: u64) -> u64 {
63 let Some(expected) = self.expected_next else {
64 self.expected_next = Some(counter + 1);
66 return 0;
67 };
68
69 let lost = if counter > expected {
70 let gap = counter - expected;
72 if self.in_burst {
73 self.current_burst_len = self.current_burst_len.saturating_add(gap as u16);
75 } else {
76 self.in_burst = true;
78 self.current_burst_len = gap as u16;
79 self.burst_count += 1;
80 }
81 gap
82 } else {
83 if self.in_burst {
85 self.finish_burst();
87 }
88 0
89 };
90
91 if counter >= expected {
94 self.expected_next = Some(counter + 1);
95 }
96
97 lost
98 }
99
100 fn finish_burst(&mut self) {
102 if self.in_burst {
103 self.max_burst_len = self.max_burst_len.max(self.current_burst_len);
104 self.total_burst_len += self.current_burst_len as u64;
105 self.in_burst = false;
106 self.current_burst_len = 0;
107 }
108 }
109
110 fn take_interval_stats(&mut self) -> (u32, u16, u16) {
112 self.finish_burst();
114
115 let count = self.burst_count;
116 let max_len = self.max_burst_len;
117 let mean_len = if count > 0 {
118 let mean_f = (self.total_burst_len as f64) / (count as f64);
120 (mean_f * 256.0) as u16
121 } else {
122 0
123 };
124
125 self.burst_count = 0;
127 self.max_burst_len = 0;
128 self.total_burst_len = 0;
129
130 (count, max_len, mean_len)
131 }
132}
133
134pub struct ReceiverState {
142 cumulative_packets_recv: u64,
144 cumulative_bytes_recv: u64,
145 cumulative_reorder_count: u64,
146
147 highest_counter: u64,
149
150 interval_packets_recv: u32,
152 interval_bytes_recv: u32,
153
154 jitter: JitterEstimator,
156
157 owd_trend: OwdTrendDetector,
159 owd_seq: u32,
161
162 gap_tracker: GapTracker,
164
165 ecn_ce_count: u32,
167
168 last_sender_timestamp: u32,
171 last_recv_time: Option<Instant>,
173
174 rekey_jitter_grace_until: Option<Instant>,
178
179 last_report_time: Option<Instant>,
181 report_interval: Duration,
182 interval_has_data: bool,
184
185 srtt_sample_count: u32,
188}
189
190impl ReceiverState {
191 pub fn new(owd_window_size: usize) -> Self {
192 Self::new_with_cold_start(owd_window_size, DEFAULT_COLD_START_INTERVAL_MS)
193 }
194
195 pub fn new_with_cold_start(owd_window_size: usize, cold_start_ms: u64) -> Self {
200 Self {
201 cumulative_packets_recv: 0,
202 cumulative_bytes_recv: 0,
203 cumulative_reorder_count: 0,
204 highest_counter: 0,
205 interval_packets_recv: 0,
206 interval_bytes_recv: 0,
207 jitter: JitterEstimator::new(),
208 owd_trend: OwdTrendDetector::new(owd_window_size),
209 owd_seq: 0,
210 gap_tracker: GapTracker::new(),
211 ecn_ce_count: 0,
212 last_sender_timestamp: 0,
213 last_recv_time: None,
214 rekey_jitter_grace_until: None,
215 last_report_time: None,
216 report_interval: Duration::from_millis(cold_start_ms),
217 interval_has_data: false,
218 srtt_sample_count: 0,
219 }
220 }
221
222 pub fn reset_for_rekey(&mut self, now: Instant) {
228 self.highest_counter = 0;
229 self.cumulative_reorder_count = 0;
230 self.gap_tracker = GapTracker::new();
231 self.interval_packets_recv = 0;
232 self.interval_bytes_recv = 0;
233 self.jitter = JitterEstimator::new();
234 self.owd_trend.clear();
235 self.owd_seq = 0;
236 self.last_sender_timestamp = 0;
237 self.last_recv_time = None;
238 self.rekey_jitter_grace_until = Some(now + Duration::from_secs(REKEY_JITTER_GRACE_SECS));
239 self.ecn_ce_count = 0;
240 self.interval_has_data = false;
241 }
244
245 pub fn record_recv(
255 &mut self,
256 counter: u64,
257 sender_timestamp_ms: u32,
258 bytes: usize,
259 ce_flag: bool,
260 now: Instant,
261 ) {
262 self.interval_has_data = true;
263 self.cumulative_packets_recv += 1;
264 self.cumulative_bytes_recv += bytes as u64;
265 self.interval_packets_recv = self.interval_packets_recv.saturating_add(1);
266 self.interval_bytes_recv = self.interval_bytes_recv.saturating_add(bytes as u32);
267
268 if counter < self.highest_counter {
270 self.cumulative_reorder_count += 1;
271 } else {
272 self.highest_counter = counter;
273 }
274
275 let _lost = self.gap_tracker.observe(counter);
277
278 if ce_flag {
280 self.ecn_ce_count = self.ecn_ce_count.saturating_add(1);
281 }
282
283 let sender_us = (sender_timestamp_ms as i64) * 1000;
287 let in_grace = self
291 .rekey_jitter_grace_until
292 .is_some_and(|deadline| now < deadline);
293 if !in_grace {
294 self.rekey_jitter_grace_until = None; if let Some(prev_recv) = self.last_recv_time {
296 let recv_delta_us = now.duration_since(prev_recv).as_micros() as i64;
297 let send_delta_us = sender_us - (self.last_sender_timestamp as i64 * 1000);
298 let transit_delta = (recv_delta_us - send_delta_us) as i32;
299 self.jitter.update(transit_delta);
300 }
301 }
302
303 if let Some(first_recv) = self.last_recv_time.or(Some(now)) {
307 let recv_offset_us = now.duration_since(first_recv).as_micros() as i64;
308 let owd_us = recv_offset_us - sender_us;
309 self.owd_seq = self.owd_seq.wrapping_add(1);
310 self.owd_trend.push(self.owd_seq, owd_us);
311 }
312
313 self.last_sender_timestamp = sender_timestamp_ms;
315 self.last_recv_time = Some(now);
316 }
317
318 pub fn build_report(&mut self, now: Instant) -> Option<ReceiverReport> {
322 if !self.interval_has_data {
323 return None;
324 }
325
326 let dwell_time = self
328 .last_recv_time
329 .map(|t| now.duration_since(t).as_millis() as u16)
330 .unwrap_or(0);
331
332 let (burst_count, max_burst, mean_burst) = self.gap_tracker.take_interval_stats();
333
334 let report = ReceiverReport {
335 highest_counter: self.highest_counter,
336 cumulative_packets_recv: self.cumulative_packets_recv,
337 cumulative_bytes_recv: self.cumulative_bytes_recv,
338 timestamp_echo: self.last_sender_timestamp,
339 dwell_time,
340 max_burst_loss: max_burst,
341 mean_burst_loss: mean_burst,
342 jitter: self.jitter.jitter_us(),
343 ecn_ce_count: self.ecn_ce_count,
344 owd_trend: self.owd_trend.trend_us_per_sec(),
345 burst_loss_count: burst_count,
346 cumulative_reorder_count: self.cumulative_reorder_count as u32,
347 interval_packets_recv: self.interval_packets_recv,
348 interval_bytes_recv: self.interval_bytes_recv,
349 };
350
351 self.interval_packets_recv = 0;
353 self.interval_bytes_recv = 0;
354 self.interval_has_data = false;
355 self.last_report_time = Some(now);
356
357 Some(report)
358 }
359
360 pub fn should_send_report(&self, now: Instant) -> bool {
362 if !self.interval_has_data {
363 return false;
364 }
365 match self.last_report_time {
366 None => true,
367 Some(last) => now.duration_since(last) >= self.report_interval,
368 }
369 }
370
371 pub fn update_report_interval_from_srtt(&mut self, srtt_us: i64) {
378 self.srtt_sample_count = self.srtt_sample_count.saturating_add(1);
379 let floor = if self.srtt_sample_count <= COLD_START_SAMPLES {
380 DEFAULT_COLD_START_INTERVAL_MS
381 } else {
382 MIN_REPORT_INTERVAL_MS
383 };
384 self.update_report_interval_with_bounds(srtt_us, floor, MAX_REPORT_INTERVAL_MS);
385 }
386
387 pub fn update_report_interval_with_bounds(&mut self, srtt_us: i64, min_ms: u64, max_ms: u64) {
392 if srtt_us <= 0 {
393 return;
394 }
395 let interval_ms = ((srtt_us as u64) / 1000).clamp(min_ms, max_ms);
396 self.report_interval = Duration::from_millis(interval_ms);
397 }
398
399 pub fn cumulative_packets_recv(&self) -> u64 {
402 self.cumulative_packets_recv
403 }
404
405 pub fn cumulative_bytes_recv(&self) -> u64 {
406 self.cumulative_bytes_recv
407 }
408
409 pub fn highest_counter(&self) -> u64 {
410 self.highest_counter
411 }
412
413 pub fn jitter_us(&self) -> u32 {
414 self.jitter.jitter_us()
415 }
416
417 pub fn report_interval(&self) -> Duration {
418 self.report_interval
419 }
420
421 pub fn last_recv_time(&self) -> Option<Instant> {
422 self.last_recv_time
423 }
424
425 pub fn ecn_ce_count(&self) -> u32 {
426 self.ecn_ce_count
427 }
428}
429
430impl Default for ReceiverState {
431 fn default() -> Self {
432 Self::new(DEFAULT_OWD_WINDOW_SIZE)
433 }
434}
435
436#[cfg(test)]
441mod tests {
442 use super::*;
443
444 #[test]
445 fn test_new_receiver_state() {
446 let r = ReceiverState::new(32);
447 assert_eq!(r.cumulative_packets_recv(), 0);
448 assert_eq!(r.cumulative_bytes_recv(), 0);
449 assert_eq!(r.highest_counter(), 0);
450 }
451
452 #[test]
453 fn test_record_recv_basic() {
454 let mut r = ReceiverState::new(32);
455 let now = Instant::now();
456 r.record_recv(1, 100, 500, false, now);
457 r.record_recv(2, 200, 600, false, now + Duration::from_millis(100));
458
459 assert_eq!(r.cumulative_packets_recv(), 2);
460 assert_eq!(r.cumulative_bytes_recv(), 1100);
461 assert_eq!(r.highest_counter(), 2);
462 }
463
464 #[test]
465 fn test_reorder_detection() {
466 let mut r = ReceiverState::new(32);
467 let now = Instant::now();
468 r.record_recv(5, 500, 100, false, now);
469 r.record_recv(3, 300, 100, false, now + Duration::from_millis(10));
470
471 assert_eq!(r.cumulative_reorder_count, 1);
472 assert_eq!(r.highest_counter(), 5); }
474
475 #[test]
476 fn test_ecn_counting() {
477 let mut r = ReceiverState::new(32);
478 let now = Instant::now();
479 r.record_recv(1, 100, 100, true, now);
480 r.record_recv(2, 200, 100, false, now);
481 r.record_recv(3, 300, 100, true, now);
482
483 assert_eq!(r.ecn_ce_count, 2);
484 }
485
486 #[test]
487 fn test_build_report_empty() {
488 let mut r = ReceiverState::new(32);
489 assert!(r.build_report(Instant::now()).is_none());
490 }
491
492 #[test]
493 fn test_build_report() {
494 let mut r = ReceiverState::new(32);
495 let t0 = Instant::now();
496 r.record_recv(1, 100, 500, false, t0);
497 r.record_recv(2, 200, 600, false, t0 + Duration::from_millis(100));
498
499 let report = r.build_report(t0 + Duration::from_millis(150)).unwrap();
500 assert_eq!(report.highest_counter, 2);
501 assert_eq!(report.cumulative_packets_recv, 2);
502 assert_eq!(report.cumulative_bytes_recv, 1100);
503 assert_eq!(report.timestamp_echo, 200); assert_eq!(report.interval_packets_recv, 2);
505 assert_eq!(report.interval_bytes_recv, 1100);
506 }
507
508 #[test]
509 fn test_build_report_resets_interval() {
510 let mut r = ReceiverState::new(32);
511 let t0 = Instant::now();
512 r.record_recv(1, 100, 500, false, t0);
513 let _ = r.build_report(t0);
514
515 assert!(r.build_report(t0).is_none());
517
518 r.record_recv(2, 200, 300, false, t0 + Duration::from_millis(100));
520 let report = r.build_report(t0 + Duration::from_millis(150)).unwrap();
521 assert_eq!(report.interval_packets_recv, 1);
522 assert_eq!(report.interval_bytes_recv, 300);
523 assert_eq!(report.cumulative_packets_recv, 2);
525 }
526
527 #[test]
528 fn test_gap_tracker_no_loss() {
529 let mut g = GapTracker::new();
530 g.observe(1);
531 g.observe(2);
532 g.observe(3);
533 let (count, max, mean) = g.take_interval_stats();
534 assert_eq!(count, 0);
535 assert_eq!(max, 0);
536 assert_eq!(mean, 0);
537 }
538
539 #[test]
540 fn test_gap_tracker_single_burst() {
541 let mut g = GapTracker::new();
542 g.observe(1);
543 g.observe(4);
545 g.observe(5);
546 let (count, max, _mean) = g.take_interval_stats();
547 assert_eq!(count, 1);
548 assert_eq!(max, 2);
549 }
550
551 #[test]
552 fn test_gap_tracker_multiple_bursts() {
553 let mut g = GapTracker::new();
554 g.observe(1);
555 g.observe(4); g.observe(5);
557 g.observe(8); g.observe(9);
559 let (count, max, mean) = g.take_interval_stats();
560 assert_eq!(count, 2);
561 assert_eq!(max, 2);
562 assert_eq!(mean, 512);
564 }
565
566 #[test]
567 fn test_should_send_report_timing() {
568 let mut r = ReceiverState::new(32);
569 let t0 = Instant::now();
570
571 assert!(!r.should_send_report(t0)); r.record_recv(1, 100, 500, false, t0);
574 assert!(r.should_send_report(t0)); let _ = r.build_report(t0);
577 r.record_recv(2, 200, 500, false, t0);
578 assert!(!r.should_send_report(t0)); let t1 = t0 + r.report_interval() + Duration::from_millis(1);
581 assert!(r.should_send_report(t1));
582 }
583
584 #[test]
585 fn test_update_report_interval_cold_start() {
586 let mut r = ReceiverState::new(32);
587 r.update_report_interval_from_srtt(50_000);
590 assert_eq!(r.report_interval(), Duration::from_millis(200));
591
592 r.update_report_interval_from_srtt(500_000);
594 assert_eq!(r.report_interval(), Duration::from_millis(500));
595 }
596
597 #[test]
598 fn test_update_report_interval_after_cold_start() {
599 let mut r = ReceiverState::new(32);
600 for _ in 0..COLD_START_SAMPLES {
602 r.update_report_interval_from_srtt(500_000);
603 }
604
605 r.update_report_interval_from_srtt(50_000);
608 assert_eq!(
609 r.report_interval(),
610 Duration::from_millis(MIN_REPORT_INTERVAL_MS)
611 );
612
613 r.update_report_interval_from_srtt(3_000_000);
615 assert_eq!(r.report_interval(), Duration::from_millis(3000));
616 }
617
618 #[test]
619 fn test_rekey_jitter_grace_suppresses_spikes() {
620 let mut r = ReceiverState::new(32);
621 let t0 = Instant::now();
622
623 r.record_recv(1, 1000, 100, false, t0);
625 r.record_recv(2, 2000, 100, false, t0 + Duration::from_secs(1));
626 assert_eq!(r.jitter_us(), 0); r.reset_for_rekey(t0 + Duration::from_secs(2));
632
633 r.record_recv(0, 120_000, 100, false, t0 + Duration::from_secs(3));
635 r.record_recv(1, 100, 100, false, t0 + Duration::from_secs(4));
637 assert_eq!(r.jitter_us(), 0);
639
640 let after_grace =
642 t0 + Duration::from_secs(2) + Duration::from_secs(REKEY_JITTER_GRACE_SECS + 1);
643 r.record_recv(2, 200, 100, false, after_grace);
644 r.record_recv(3, 300, 100, false, after_grace + Duration::from_millis(100));
645 assert!(r.jitter_us() < 1_000_000); }
649}