1use std::time::{Duration, Instant};
7
8use crate::mmp::algorithms::{JitterEstimator, OwdTrendDetector};
9use crate::mmp::report::ReceiverReport;
10use crate::mmp::{
11 COLD_START_SAMPLES, DEFAULT_COLD_START_INTERVAL_MS, DEFAULT_OWD_WINDOW_SIZE,
12 MAX_REPORT_INTERVAL_MS, MIN_REPORT_INTERVAL_MS,
13};
14
15const REKEY_JITTER_GRACE_SECS: u64 = 15;
22
23struct GapTracker {
32 expected_next: Option<u64>,
34 in_burst: bool,
36 current_burst_len: u16,
38
39 burst_count: u32,
42 max_burst_len: u16,
44 total_burst_len: u64,
46}
47
48impl GapTracker {
49 fn new() -> Self {
50 Self {
51 expected_next: None,
52 in_burst: false,
53 current_burst_len: 0,
54 burst_count: 0,
55 max_burst_len: 0,
56 total_burst_len: 0,
57 }
58 }
59
60 fn observe(&mut self, counter: u64) -> u64 {
63 let Some(expected) = self.expected_next else {
64 self.expected_next = Some(counter + 1);
66 return 0;
67 };
68
69 let lost = if counter > expected {
70 let gap = counter - expected;
72 if self.in_burst {
73 self.current_burst_len = self.current_burst_len.saturating_add(gap as u16);
75 } else {
76 self.in_burst = true;
78 self.current_burst_len = gap as u16;
79 self.burst_count += 1;
80 }
81 gap
82 } else {
83 if self.in_burst {
85 self.finish_burst();
87 }
88 0
89 };
90
91 if counter >= expected {
94 self.expected_next = Some(counter + 1);
95 }
96
97 lost
98 }
99
100 fn finish_burst(&mut self) {
102 if self.in_burst {
103 self.max_burst_len = self.max_burst_len.max(self.current_burst_len);
104 self.total_burst_len += self.current_burst_len as u64;
105 self.in_burst = false;
106 self.current_burst_len = 0;
107 }
108 }
109
110 fn take_interval_stats(&mut self) -> (u32, u16, u16) {
112 self.finish_burst();
114
115 let count = self.burst_count;
116 let max_len = self.max_burst_len;
117 let mean_len = if count > 0 {
118 let mean_f = (self.total_burst_len as f64) / (count as f64);
120 (mean_f * 256.0) as u16
121 } else {
122 0
123 };
124
125 self.burst_count = 0;
127 self.max_burst_len = 0;
128 self.total_burst_len = 0;
129
130 (count, max_len, mean_len)
131 }
132}
133
134pub struct ReceiverState {
142 cumulative_packets_recv: u64,
144 cumulative_bytes_recv: u64,
145 cumulative_reorder_count: u64,
146
147 highest_counter: u64,
149
150 interval_packets_recv: u32,
152 interval_bytes_recv: u32,
153
154 jitter: JitterEstimator,
156
157 owd_trend: OwdTrendDetector,
159 owd_seq: u32,
161
162 gap_tracker: GapTracker,
164
165 ecn_ce_count: u32,
167
168 last_sender_timestamp: u32,
171 last_recv_time: Option<Instant>,
173
174 rekey_jitter_grace_until: Option<Instant>,
178
179 last_report_time: Option<Instant>,
181 report_interval: Duration,
182 interval_has_data: bool,
184
185 srtt_sample_count: u32,
188}
189
190impl ReceiverState {
191 pub fn new(owd_window_size: usize) -> Self {
192 Self::new_with_cold_start(owd_window_size, DEFAULT_COLD_START_INTERVAL_MS)
193 }
194
195 pub fn new_with_cold_start(owd_window_size: usize, cold_start_ms: u64) -> Self {
200 Self {
201 cumulative_packets_recv: 0,
202 cumulative_bytes_recv: 0,
203 cumulative_reorder_count: 0,
204 highest_counter: 0,
205 interval_packets_recv: 0,
206 interval_bytes_recv: 0,
207 jitter: JitterEstimator::new(),
208 owd_trend: OwdTrendDetector::new(owd_window_size),
209 owd_seq: 0,
210 gap_tracker: GapTracker::new(),
211 ecn_ce_count: 0,
212 last_sender_timestamp: 0,
213 last_recv_time: None,
214 rekey_jitter_grace_until: None,
215 last_report_time: None,
216 report_interval: Duration::from_millis(cold_start_ms),
217 interval_has_data: false,
218 srtt_sample_count: 0,
219 }
220 }
221
222 pub fn reset_for_rekey(&mut self, now: Instant) {
228 self.highest_counter = 0;
229 self.cumulative_reorder_count = 0;
230 self.gap_tracker = GapTracker::new();
231 self.interval_packets_recv = 0;
232 self.interval_bytes_recv = 0;
233 self.jitter = JitterEstimator::new();
234 self.owd_trend.clear();
235 self.owd_seq = 0;
236 self.last_sender_timestamp = 0;
237 self.last_recv_time = None;
238 self.rekey_jitter_grace_until = Some(now + Duration::from_secs(REKEY_JITTER_GRACE_SECS));
239 self.ecn_ce_count = 0;
240 self.interval_has_data = false;
241 }
244
245 pub fn record_recv(
255 &mut self,
256 counter: u64,
257 sender_timestamp_ms: u32,
258 bytes: usize,
259 ce_flag: bool,
260 now: Instant,
261 ) {
262 self.interval_has_data = true;
263 self.cumulative_packets_recv += 1;
264 self.cumulative_bytes_recv += bytes as u64;
265 self.interval_packets_recv = self.interval_packets_recv.saturating_add(1);
266 self.interval_bytes_recv = self.interval_bytes_recv.saturating_add(bytes as u32);
267
268 if counter < self.highest_counter {
270 self.cumulative_reorder_count += 1;
271 } else {
272 self.highest_counter = counter;
273 }
274
275 let _lost = self.gap_tracker.observe(counter);
277
278 if ce_flag {
280 self.ecn_ce_count = self.ecn_ce_count.saturating_add(1);
281 }
282
283 let sender_us = (sender_timestamp_ms as i64) * 1000;
287 let in_grace = self
291 .rekey_jitter_grace_until
292 .is_some_and(|deadline| now < deadline);
293 if !in_grace {
294 self.rekey_jitter_grace_until = None; if let Some(prev_recv) = self.last_recv_time {
296 let recv_delta_us = now.duration_since(prev_recv).as_micros() as i64;
297 let send_delta_us = sender_us - (self.last_sender_timestamp as i64 * 1000);
298 let transit_delta = (recv_delta_us - send_delta_us) as i32;
299 self.jitter.update(transit_delta);
300 }
301 }
302
303 if let Some(first_recv) = self.last_recv_time.or(Some(now)) {
307 let recv_offset_us = now.duration_since(first_recv).as_micros() as i64;
308 let owd_us = recv_offset_us - sender_us;
309 self.owd_seq = self.owd_seq.wrapping_add(1);
310 self.owd_trend.push(self.owd_seq, owd_us);
311 }
312
313 self.last_sender_timestamp = sender_timestamp_ms;
315 self.last_recv_time = Some(now);
316 }
317
318 pub fn build_report(&mut self, now: Instant) -> Option<ReceiverReport> {
322 if !self.interval_has_data {
323 return None;
324 }
325
326 let (timestamp_echo, dwell_time) = self
330 .last_recv_time
331 .map(|t| {
332 let dwell_ms = now.duration_since(t).as_millis();
333 if dwell_ms > u128::from(u16::MAX) {
334 (0, u16::MAX)
335 } else {
336 (self.last_sender_timestamp, dwell_ms as u16)
337 }
338 })
339 .unwrap_or((0, 0));
340
341 let (burst_count, max_burst, mean_burst) = self.gap_tracker.take_interval_stats();
342
343 let report = ReceiverReport {
344 highest_counter: self.highest_counter,
345 cumulative_packets_recv: self.cumulative_packets_recv,
346 cumulative_bytes_recv: self.cumulative_bytes_recv,
347 timestamp_echo,
348 dwell_time,
349 max_burst_loss: max_burst,
350 mean_burst_loss: mean_burst,
351 jitter: self.jitter.jitter_us(),
352 ecn_ce_count: self.ecn_ce_count,
353 owd_trend: self.owd_trend.trend_us_per_sec(),
354 burst_loss_count: burst_count,
355 cumulative_reorder_count: self.cumulative_reorder_count as u32,
356 interval_packets_recv: self.interval_packets_recv,
357 interval_bytes_recv: self.interval_bytes_recv,
358 };
359
360 self.interval_packets_recv = 0;
362 self.interval_bytes_recv = 0;
363 self.interval_has_data = false;
364 self.last_report_time = Some(now);
365
366 Some(report)
367 }
368
369 pub fn should_send_report(&self, now: Instant) -> bool {
371 if !self.interval_has_data {
372 return false;
373 }
374 match self.last_report_time {
375 None => true,
376 Some(last) => now.duration_since(last) >= self.report_interval,
377 }
378 }
379
380 pub fn update_report_interval_from_srtt(&mut self, srtt_us: i64) {
387 self.srtt_sample_count = self.srtt_sample_count.saturating_add(1);
388 let floor = if self.srtt_sample_count <= COLD_START_SAMPLES {
389 DEFAULT_COLD_START_INTERVAL_MS
390 } else {
391 MIN_REPORT_INTERVAL_MS
392 };
393 self.update_report_interval_with_bounds(srtt_us, floor, MAX_REPORT_INTERVAL_MS);
394 }
395
396 pub fn update_report_interval_with_bounds(&mut self, srtt_us: i64, min_ms: u64, max_ms: u64) {
401 if srtt_us <= 0 {
402 return;
403 }
404 let interval_ms = ((srtt_us as u64) / 1000).clamp(min_ms, max_ms);
405 self.report_interval = Duration::from_millis(interval_ms);
406 }
407
408 pub fn cumulative_packets_recv(&self) -> u64 {
411 self.cumulative_packets_recv
412 }
413
414 pub fn cumulative_bytes_recv(&self) -> u64 {
415 self.cumulative_bytes_recv
416 }
417
418 pub fn highest_counter(&self) -> u64 {
419 self.highest_counter
420 }
421
422 pub fn jitter_us(&self) -> u32 {
423 self.jitter.jitter_us()
424 }
425
426 pub fn report_interval(&self) -> Duration {
427 self.report_interval
428 }
429
430 pub fn last_recv_time(&self) -> Option<Instant> {
431 self.last_recv_time
432 }
433
434 pub fn ecn_ce_count(&self) -> u32 {
435 self.ecn_ce_count
436 }
437}
438
439impl Default for ReceiverState {
440 fn default() -> Self {
441 Self::new(DEFAULT_OWD_WINDOW_SIZE)
442 }
443}
444
445#[cfg(test)]
450mod tests {
451 use super::*;
452
453 #[test]
454 fn test_new_receiver_state() {
455 let r = ReceiverState::new(32);
456 assert_eq!(r.cumulative_packets_recv(), 0);
457 assert_eq!(r.cumulative_bytes_recv(), 0);
458 assert_eq!(r.highest_counter(), 0);
459 }
460
461 #[test]
462 fn test_record_recv_basic() {
463 let mut r = ReceiverState::new(32);
464 let now = Instant::now();
465 r.record_recv(1, 100, 500, false, now);
466 r.record_recv(2, 200, 600, false, now + Duration::from_millis(100));
467
468 assert_eq!(r.cumulative_packets_recv(), 2);
469 assert_eq!(r.cumulative_bytes_recv(), 1100);
470 assert_eq!(r.highest_counter(), 2);
471 }
472
473 #[test]
474 fn test_reorder_detection() {
475 let mut r = ReceiverState::new(32);
476 let now = Instant::now();
477 r.record_recv(5, 500, 100, false, now);
478 r.record_recv(3, 300, 100, false, now + Duration::from_millis(10));
479
480 assert_eq!(r.cumulative_reorder_count, 1);
481 assert_eq!(r.highest_counter(), 5); }
483
484 #[test]
485 fn test_ecn_counting() {
486 let mut r = ReceiverState::new(32);
487 let now = Instant::now();
488 r.record_recv(1, 100, 100, true, now);
489 r.record_recv(2, 200, 100, false, now);
490 r.record_recv(3, 300, 100, true, now);
491
492 assert_eq!(r.ecn_ce_count, 2);
493 }
494
495 #[test]
496 fn test_build_report_empty() {
497 let mut r = ReceiverState::new(32);
498 assert!(r.build_report(Instant::now()).is_none());
499 }
500
501 #[test]
502 fn test_build_report() {
503 let mut r = ReceiverState::new(32);
504 let t0 = Instant::now();
505 r.record_recv(1, 100, 500, false, t0);
506 r.record_recv(2, 200, 600, false, t0 + Duration::from_millis(100));
507
508 let report = r.build_report(t0 + Duration::from_millis(150)).unwrap();
509 assert_eq!(report.highest_counter, 2);
510 assert_eq!(report.cumulative_packets_recv, 2);
511 assert_eq!(report.cumulative_bytes_recv, 1100);
512 assert_eq!(report.timestamp_echo, 200); assert_eq!(report.interval_packets_recv, 2);
514 assert_eq!(report.interval_bytes_recv, 1100);
515 }
516
517 #[test]
518 fn test_build_report_suppresses_rtt_echo_when_dwell_overflows() {
519 let mut r = ReceiverState::new(32);
520 let t0 = Instant::now();
521 r.record_recv(1, 100, 500, false, t0);
522
523 let report = r
524 .build_report(t0 + Duration::from_millis(u64::from(u16::MAX) + 1))
525 .unwrap();
526
527 assert_eq!(report.timestamp_echo, 0);
528 assert_eq!(report.dwell_time, u16::MAX);
529 assert_eq!(report.cumulative_packets_recv, 1);
530 }
531
532 #[test]
533 fn test_build_report_resets_interval() {
534 let mut r = ReceiverState::new(32);
535 let t0 = Instant::now();
536 r.record_recv(1, 100, 500, false, t0);
537 let _ = r.build_report(t0);
538
539 assert!(r.build_report(t0).is_none());
541
542 r.record_recv(2, 200, 300, false, t0 + Duration::from_millis(100));
544 let report = r.build_report(t0 + Duration::from_millis(150)).unwrap();
545 assert_eq!(report.interval_packets_recv, 1);
546 assert_eq!(report.interval_bytes_recv, 300);
547 assert_eq!(report.cumulative_packets_recv, 2);
549 }
550
551 #[test]
552 fn test_gap_tracker_no_loss() {
553 let mut g = GapTracker::new();
554 g.observe(1);
555 g.observe(2);
556 g.observe(3);
557 let (count, max, mean) = g.take_interval_stats();
558 assert_eq!(count, 0);
559 assert_eq!(max, 0);
560 assert_eq!(mean, 0);
561 }
562
563 #[test]
564 fn test_gap_tracker_single_burst() {
565 let mut g = GapTracker::new();
566 g.observe(1);
567 g.observe(4);
569 g.observe(5);
570 let (count, max, _mean) = g.take_interval_stats();
571 assert_eq!(count, 1);
572 assert_eq!(max, 2);
573 }
574
575 #[test]
576 fn test_gap_tracker_multiple_bursts() {
577 let mut g = GapTracker::new();
578 g.observe(1);
579 g.observe(4); g.observe(5);
581 g.observe(8); g.observe(9);
583 let (count, max, mean) = g.take_interval_stats();
584 assert_eq!(count, 2);
585 assert_eq!(max, 2);
586 assert_eq!(mean, 512);
588 }
589
590 #[test]
591 fn test_should_send_report_timing() {
592 let mut r = ReceiverState::new(32);
593 let t0 = Instant::now();
594
595 assert!(!r.should_send_report(t0)); r.record_recv(1, 100, 500, false, t0);
598 assert!(r.should_send_report(t0)); let _ = r.build_report(t0);
601 r.record_recv(2, 200, 500, false, t0);
602 assert!(!r.should_send_report(t0)); let t1 = t0 + r.report_interval() + Duration::from_millis(1);
605 assert!(r.should_send_report(t1));
606 }
607
608 #[test]
609 fn test_update_report_interval_cold_start() {
610 let mut r = ReceiverState::new(32);
611 r.update_report_interval_from_srtt(50_000);
614 assert_eq!(r.report_interval(), Duration::from_millis(200));
615
616 r.update_report_interval_from_srtt(500_000);
618 assert_eq!(r.report_interval(), Duration::from_millis(500));
619 }
620
621 #[test]
622 fn test_update_report_interval_after_cold_start() {
623 let mut r = ReceiverState::new(32);
624 for _ in 0..COLD_START_SAMPLES {
626 r.update_report_interval_from_srtt(500_000);
627 }
628
629 r.update_report_interval_from_srtt(50_000);
632 assert_eq!(
633 r.report_interval(),
634 Duration::from_millis(MIN_REPORT_INTERVAL_MS)
635 );
636
637 r.update_report_interval_from_srtt(3_000_000);
639 assert_eq!(r.report_interval(), Duration::from_millis(3000));
640 }
641
642 #[test]
643 fn test_rekey_jitter_grace_suppresses_spikes() {
644 let mut r = ReceiverState::new(32);
645 let t0 = Instant::now();
646
647 r.record_recv(1, 1000, 100, false, t0);
649 r.record_recv(2, 2000, 100, false, t0 + Duration::from_secs(1));
650 assert_eq!(r.jitter_us(), 0); r.reset_for_rekey(t0 + Duration::from_secs(2));
656
657 r.record_recv(0, 120_000, 100, false, t0 + Duration::from_secs(3));
659 r.record_recv(1, 100, 100, false, t0 + Duration::from_secs(4));
661 assert_eq!(r.jitter_us(), 0);
663
664 let after_grace =
666 t0 + Duration::from_secs(2) + Duration::from_secs(REKEY_JITTER_GRACE_SECS + 1);
667 r.record_recv(2, 200, 100, false, after_grace);
668 r.record_recv(3, 300, 100, false, after_grace + Duration::from_millis(100));
669 assert!(r.jitter_us() < 1_000_000); }
673}