1use std::collections::vec_deque;
2use std::collections::VecDeque;
3use std::fmt;
4use std::ops::RangeInclusive;
5use std::time::{Duration, Instant};
6
7use super::{extend_u16, FeedbackMessageType, RtcpHeader, RtcpPacket};
8use super::{RtcpType, SeqNo, Ssrc, TransportType};
9
10#[derive(Clone, PartialEq, Eq)]
14pub struct Twcc {
15 pub sender_ssrc: Ssrc,
17 pub ssrc: Ssrc,
19 pub base_seq: u16,
21 pub status_count: u16,
23 pub reference_time: u32, pub feedback_count: u8, pub chunks: VecDeque<PacketChunk>,
29 pub delta: VecDeque<Delta>,
31}
32
33impl Twcc {
34 fn chunks_byte_len(&self) -> usize {
35 self.chunks.len() * 2
36 }
37
38 fn delta_byte_len(&self) -> usize {
39 self.delta.iter().map(|d| d.byte_len()).sum()
40 }
41
42 pub fn into_iter(self, time_zero: Instant, extend_from: SeqNo) -> TwccIter {
44 let millis = self.reference_time as u64 * 64;
45 let time_base = time_zero + Duration::from_millis(millis);
46 let base_seq = extend_u16(Some(*extend_from), self.base_seq);
47 TwccIter {
48 base_seq,
49 time_base,
50 index: 0,
51 twcc: self,
52 }
53 }
54}
55
56pub struct TwccIter {
57 base_seq: u64,
58 time_base: Instant,
59 index: usize,
60 twcc: Twcc,
61}
62
63impl Iterator for TwccIter {
64 type Item = (SeqNo, PacketStatus, Option<Instant>);
65
66 fn next(&mut self) -> Option<Self::Item> {
67 let head = self.twcc.chunks.front()?;
68
69 let (status, amount) = match head {
70 PacketChunk::Run(s, n) => {
71 use PacketStatus::*;
72 let status = match s {
73 NotReceived | Unknown => NotReceived,
74 ReceivedSmallDelta => ReceivedSmallDelta,
75 PacketStatus::ReceivedLargeOrNegativeDelta => ReceivedLargeOrNegativeDelta,
76 };
77 (status, *n)
78 }
79 PacketChunk::VectorSingle(v, n) => {
80 let status = if 1 << (13 - self.index) & v > 0 {
81 PacketStatus::ReceivedSmallDelta
82 } else {
83 PacketStatus::NotReceived
84 };
85 (status, *n)
86 }
87 PacketChunk::VectorDouble(v, n) => {
88 let e = ((v >> (12 - self.index * 2)) & 0b11) as u8;
89 let status = PacketStatus::from(e);
90 (status, *n)
91 }
92 };
93
94 let instant = match status {
95 PacketStatus::NotReceived => None,
96 PacketStatus::ReceivedSmallDelta => match self.twcc.delta.pop_front()? {
97 Delta::Small(v) => Some(self.time_base + Duration::from_micros(250 * v as u64)),
98 Delta::Large(_) => panic!("Incorrect large delta size"),
99 },
100 PacketStatus::ReceivedLargeOrNegativeDelta => match self.twcc.delta.pop_front()? {
101 Delta::Small(_) => panic!("Incorrect small delta size"),
102 Delta::Large(v) => {
103 let dur = Duration::from_micros(250 * v.unsigned_abs() as u64);
104 Some(if v < 0 {
105 self.time_base.checked_sub(dur).unwrap()
106 } else {
107 self.time_base + dur
108 })
109 }
110 },
111 _ => unreachable!(),
112 };
113
114 if let Some(new_timebase) = instant {
115 self.time_base = new_timebase;
116 }
117 let seq: SeqNo = (self.base_seq + self.index as u64).into();
118
119 self.index += 1;
120 if self.index == amount as usize {
121 self.twcc.chunks.pop_front();
122 self.base_seq = *seq + 1;
123 self.index = 0;
124 }
125
126 Some((seq, status, instant))
127 }
128}
129
130impl RtcpPacket for Twcc {
131 fn header(&self) -> RtcpHeader {
132 RtcpHeader {
133 rtcp_type: RtcpType::TransportLayerFeedback,
134 feedback_message_type: FeedbackMessageType::TransportFeedback(
135 TransportType::TransportWide,
136 ),
137 words_less_one: (self.length_words() - 1) as u16,
138 }
139 }
140
141 fn length_words(&self) -> usize {
142 let mut total = self.chunks_byte_len() + self.delta_byte_len();
150
151 let pad = 4 - total % 4;
152 if pad < 4 {
153 total += pad;
154 }
155
156 assert!(total % 4 == 0);
157
158 let total_words = total / 4;
159
160 5 + total_words
161 }
162
163 fn write_to(&self, buf: &mut [u8]) -> usize {
164 let len_start = buf.len();
165
166 let mut total = {
167 let buf = &mut buf[..];
168
169 self.header().write_to(buf);
170 buf[4..8].copy_from_slice(&self.sender_ssrc.to_be_bytes());
171 buf[8..12].copy_from_slice(&self.ssrc.to_be_bytes());
172
173 buf[12..14].copy_from_slice(&self.base_seq.to_be_bytes());
174 buf[14..16].copy_from_slice(&self.status_count.to_be_bytes());
175
176 let ref_time = self.reference_time.to_be_bytes();
177 buf[16..19].copy_from_slice(&ref_time[1..4]);
178 buf[19] = self.feedback_count;
179
180 let mut buf = &mut buf[20..];
181 for p in &self.chunks {
182 p.write_to(buf);
183 buf = &mut buf[2..];
184 }
185
186 for d in &self.delta {
187 let n = d.write_to(buf);
188 buf = &mut buf[n..];
189 }
190
191 len_start - buf.len()
192 };
193
194 let pad = 4 - total % 4;
195 if pad < 4 {
196 for i in 0..pad {
197 buf[total + i] = 0;
198 }
199 buf[total + pad - 1] = pad as u8;
200
201 total += pad;
202 buf[0] |= 0b00_1_00000;
204 }
205
206 total
207 }
208}
209
210#[derive(Debug)]
211pub struct TwccRecvRegister {
212 keep_reported: usize,
216
217 queue: VecDeque<Receiption>,
222
223 report_from: usize,
225
226 interims: VecDeque<ChunkInterim>,
228
229 time_start: Option<Instant>,
237
238 generated_reports: u64,
240
241 receive_window: ReceiveWindow,
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq)]
246struct Receiption {
247 seq: SeqNo,
248 time: Instant,
249}
250
251impl TwccRecvRegister {
252 pub fn new(keep_reported: usize) -> Self {
253 TwccRecvRegister {
254 keep_reported,
255 queue: VecDeque::new(),
256 report_from: 0,
257 interims: VecDeque::new(),
258 time_start: None,
259 generated_reports: 0,
260 receive_window: ReceiveWindow::default(),
261 }
262 }
263
264 pub fn max_seq(&self) -> SeqNo {
265 self.queue.back().map(|r| r.seq).unwrap_or_else(|| 0.into())
268 }
269
270 pub fn update_seq(&mut self, seq: SeqNo, time: Instant) {
271 self.receive_window.record_seq(seq);
272
273 match self.queue.binary_search_by_key(&seq, |r| r.seq) {
274 Ok(_) => {
275 }
278 Err(idx) => {
279 if let Some(time_start) = self.time_start {
280 if time_start - time >= Duration::from_millis(8192) {
284 return;
285 }
286 }
287
288 self.queue.insert(idx, Receiption { seq, time });
289
290 if idx < self.report_from {
291 self.report_from = idx;
292 }
293 }
294 }
295 }
296
297 pub fn build_report(&mut self, max_byte_size: usize) -> Option<Twcc> {
298 if max_byte_size > 10_000 {
299 warn!("Refuse to build too large Twcc report");
300 return None;
301 }
302
303 let first = self.queue.get(self.report_from);
305 let first = first?;
306
307 if self.time_start.is_none() {
309 self.time_start = Some(first.time);
310 }
311
312 let (base_seq, first_time) = (first.seq, first.time);
313 let time_start = self.time_start.expect("a start time");
314
315 let first_time_rel = first_time - time_start;
317
318 let reference_time = (first_time_rel.as_micros() as u64 / 64_000) as u32;
320
321 let mut twcc = Twcc {
322 sender_ssrc: 0.into(),
323 ssrc: 0.into(),
324 feedback_count: self.generated_reports as u8,
325 base_seq: *base_seq as u16,
326 reference_time,
327 status_count: 0,
328 chunks: VecDeque::new(),
329 delta: VecDeque::new(),
330 };
331
332 let base_time = time_start + Duration::from_micros(reference_time as u64 * 64_000);
338
339 build_interims(
342 &self.queue,
343 self.report_from,
344 base_seq,
345 base_time,
346 &mut self.interims,
347 );
348 let interims = &mut self.interims;
349
350 let mut bytes_left = max_byte_size - 20;
352
353 while !interims.is_empty() {
354 const MIN_RUN_SIZE: usize = 2 + 2 + 3;
356
357 if bytes_left < MIN_RUN_SIZE {
358 break;
359 }
360
361 let (mut chunk, max) = {
363 let first_status = interims.front().expect("at least one interim").status();
364
365 let c_run = PacketChunk::Run(first_status, 0);
366 let c_single = PacketChunk::VectorSingle(0, 0);
367 let c_double = PacketChunk::VectorDouble(0, 0);
368
369 let max_run = c_run.append_max(interims.iter());
370 let max_single = c_single.append_max(interims.iter());
371 let max_double = c_double.append_max(interims.iter());
372
373 let max = max_run.max(max_single).max(max_double);
374
375 const MAX_SINGLE_SIZE: usize = 2 + 14 + 3;
377 const MAX_DOUBLE_SIZE: usize = 2 + 14 + 3;
379
380 if max == max_run {
381 (c_run, max_run)
382 } else if max == max_single && bytes_left >= MAX_SINGLE_SIZE {
383 (c_single, max_single)
384 } else if max == max_double && bytes_left >= MAX_DOUBLE_SIZE {
385 (c_double, max_double)
386 } else {
387 (c_run, max_run)
389 }
390 };
391
392 let mut todo = max;
394
395 loop {
396 if bytes_left < MIN_RUN_SIZE {
397 break;
398 }
399
400 if todo == 0 {
401 break;
402 }
403
404 let i = match interims.front_mut() {
405 Some(v) => v,
406 None => break,
407 };
408
409 let appended = chunk.append(i);
410 assert!(appended > 0);
411 todo -= appended;
412 twcc.status_count += appended;
413
414 if i.consume(appended) {
415 if matches!(i, ChunkInterim::Received(_, _, _)) {
417 self.report_from += 1;
418 }
419
420 if let Some(delta) = i.delta() {
421 twcc.delta.push_back(delta);
422 bytes_left -= delta.byte_len();
423 }
424
425 interims.pop_front();
427 } else {
428 assert!(todo == 0);
430 }
431 }
432
433 let free = chunk.free();
434 if chunk.must_be_full() && free > 0 {
435 assert!(interims.is_empty());
437 chunk.append(&ChunkInterim::Missing(free));
438 }
439
440 twcc.chunks.push_back(chunk);
441 bytes_left -= 2;
442 }
443
444 if twcc.chunks.is_empty() {
448 return None;
449 }
450
451 self.generated_reports += 1;
452
453 if self.report_from > self.keep_reported {
455 let to_remove = self.report_from - self.keep_reported;
456 self.queue.drain(..to_remove);
457 self.report_from -= to_remove;
458 }
459
460 Some(twcc)
461 }
462
463 pub fn has_unreported(&self) -> bool {
464 self.queue.len() > self.report_from
465 }
466
467 pub fn loss(&mut self) -> Option<f32> {
471 let max_seq = self.receive_window.max_seq?;
476 let base_seq = self.receive_window.base_seq?;
477 let expected = *max_seq - *base_seq + 1_u64;
478
479 let expected_interval = expected - self.receive_window.expected_prior;
480 self.receive_window.expected_prior = expected;
481
482 let received_interval = self.receive_window.received - self.receive_window.received_prior;
483 self.receive_window.received_prior = self.receive_window.received;
484
485 let lost_interval = expected_interval.saturating_sub(received_interval);
486
487 (expected_interval != 0).then_some(lost_interval as f32 / expected_interval as f32)
488 }
489}
490
491fn build_interims(
494 queue: &VecDeque<Receiption>,
495 report_from: usize,
496 base_seq: SeqNo,
497 base_time: Instant,
498 interims: &mut VecDeque<ChunkInterim>,
499) {
500 interims.clear();
501 let report_from = queue.iter().enumerate().skip(report_from);
502
503 let mut prev = (base_seq, base_time);
504
505 for (index, r) in report_from {
506 let diff_seq = *r.seq - *prev.0;
507
508 if diff_seq > 1 {
509 let mut todo = diff_seq - 1;
510 while todo > 0 {
511 let n = todo.min(8192);
513 interims.push_back(ChunkInterim::Missing(n as u16));
514 todo -= n;
515 }
516 }
517
518 let diff_time = if r.time < prev.1 {
519 let dur = prev.1 - r.time;
521 -(dur.as_micros() as i32)
522 } else {
523 let dur = r.time - prev.1;
524 dur.as_micros() as i32
525 };
526
527 let (status, time) = if diff_time < -8_192_000 || diff_time > 8_191_750 {
528 break;
531 } else if diff_time < 0 || diff_time > 63_750 {
532 let t = diff_time / 250;
533 assert!(t >= -32_765 && t <= 32_767);
534 (PacketStatus::ReceivedLargeOrNegativeDelta, t as i16)
535 } else {
536 let t = diff_time / 250;
537 assert!(t >= 0 && t <= 255);
538 (PacketStatus::ReceivedSmallDelta, t as i16)
539 };
540
541 interims.push_back(ChunkInterim::Received(index, status, time));
542 prev = (r.seq, r.time);
543 }
544}
545
546#[derive(Debug, Clone, Copy)]
547enum ChunkInterim {
548 Missing(u16), Received(usize, PacketStatus, i16),
550}
551
552#[derive(Debug, Default)]
553struct ReceiveWindow {
554 base_seq: Option<SeqNo>,
556
557 max_seq: Option<SeqNo>,
559
560 expected_prior: u64,
562
563 received: u64,
565
566 received_prior: u64,
568}
569
570impl ReceiveWindow {
571 fn record_seq(&mut self, seq: SeqNo) {
572 if self.base_seq.is_none() {
573 self.base_seq = Some(seq);
574 }
575
576 self.received += 1;
577 self.max_seq = self.max_seq.max(Some(seq));
578 }
579}
580
581impl ChunkInterim {
582 fn status(&self) -> PacketStatus {
583 match self {
584 ChunkInterim::Missing(_) => PacketStatus::NotReceived,
585 ChunkInterim::Received(_, s, _) => *s,
586 }
587 }
588
589 fn delta(&self) -> Option<Delta> {
590 match self {
591 ChunkInterim::Missing(_) => None,
592 ChunkInterim::Received(_, s, d) => match *s {
593 PacketStatus::ReceivedSmallDelta => Some(Delta::Small(*d as u8)),
594 PacketStatus::ReceivedLargeOrNegativeDelta => Some(Delta::Large(*d)),
595 _ => unreachable!(),
596 },
597 }
598 }
599
600 fn consume(&mut self, n: u16) -> bool {
601 match self {
602 ChunkInterim::Missing(c) => {
603 *c -= n;
604 *c == 0
605 }
606 ChunkInterim::Received(_, _, _) => {
607 assert!(n <= 1);
608 n == 1
609 }
610 }
611 }
612}
613
614#[derive(Debug, Clone, Copy, PartialEq, Eq)]
615pub enum PacketChunk {
616 Run(PacketStatus, u16), VectorSingle(u16, u16),
618 VectorDouble(u16, u16),
619}
620
621impl PacketChunk {
622 fn append_max<'a>(&self, iter: impl Iterator<Item = &'a ChunkInterim>) -> u16 {
623 let mut to_fill = *self;
624
625 let mut reached_end = true;
626
627 for i in iter {
628 if to_fill.free() == 0 {
629 reached_end = false;
630 break;
631 }
632
633 if !to_fill.can_append_status(i.status()) {
636 reached_end = false;
637 break;
638 }
639
640 to_fill.append(i);
641 }
642
643 if to_fill.must_be_full() && to_fill.free() > 0 && !reached_end {
647 return 0;
648 }
649
650 self.free() - to_fill.free()
651 }
652
653 fn append(&mut self, i: &ChunkInterim) -> u16 {
654 use ChunkInterim::*;
655 use PacketChunk::*;
656 let free = self.free();
657 match (self, i) {
658 (Run(s, n), Missing(c)) => {
659 if *s != PacketStatus::NotReceived {
660 return 0;
661 }
662 let max = free.min(*c);
663 *n += max;
664 max
665 }
666 (Run(s, n), Received(_, s2, _)) => {
667 if *s != *s2 {
668 return 0;
669 }
670 let max = free.min(1);
671 *n += max;
672 max
673 }
674 (VectorSingle(n, f), Missing(c)) => {
675 let max = free.min(*c);
676 *n <<= max;
677 *f += max;
678 max
679 }
680 (VectorSingle(n, f), Received(_, s2, _)) => {
681 if *s2 == PacketStatus::ReceivedLargeOrNegativeDelta {
682 return 0;
683 }
684 let max = free.min(1);
685 if max == 1 {
686 *n <<= 1;
687 *n |= 1;
688 *f += 1;
689 }
690 max
691 }
692 (VectorDouble(n, f), Missing(c)) => {
693 let max = free.min(*c);
694 *n <<= max * 2;
695 *f += max;
696 max
697 }
698 (VectorDouble(n, f), Received(_, s2, _)) => {
699 let max = free.min(1);
700 if max == 1 {
701 *n <<= 2;
702 *n |= *s2 as u16;
703 *f += 1;
704 }
705 max
706 }
707 }
708 }
709
710 fn must_be_full(&self) -> bool {
711 match self {
712 PacketChunk::Run(_, _) => false,
713 PacketChunk::VectorSingle(_, _) => true,
714 PacketChunk::VectorDouble(_, _) => true,
715 }
716 }
717
718 fn free(&self) -> u16 {
719 match self {
720 PacketChunk::Run(_, n) => 8192 - *n,
721 PacketChunk::VectorSingle(_, filled) => 14 - *filled,
722 PacketChunk::VectorDouble(_, filled) => 7 - *filled,
723 }
724 }
725
726 fn write_to(&self, buf: &mut [u8]) {
727 let x = match self {
728 PacketChunk::Run(s, n) => {
738 let mut x = 0_u16;
739 x |= (*s as u16) << 13;
740 assert!(*n <= 8192);
741 x |= n;
742 x
743 }
744
745 PacketChunk::VectorSingle(n, fill) => {
762 assert!(*fill == 14);
763 let mut x: u16 = 1 << 15;
764 assert!(*n <= 16384);
765 x |= *n;
766 x
767 }
768 PacketChunk::VectorDouble(n, fill) => {
769 assert!(*fill == 7);
770 let mut x: u16 = 1 << 15;
771 assert!(*n <= 16384);
772 x |= 1 << 14;
773 x |= *n;
774 x
775 }
776 };
777 buf[..2].copy_from_slice(&x.to_be_bytes());
778 }
779
780 fn max_possible_status_count(&self) -> usize {
781 match self {
782 PacketChunk::Run(_, n) => *n as usize,
783 PacketChunk::VectorSingle(_, _) => 14,
784 PacketChunk::VectorDouble(_, _) => 7,
785 }
786 }
787
788 fn can_append_status(&self, status: PacketStatus) -> bool {
789 match self {
790 PacketChunk::Run(s, _) => *s == status,
791 PacketChunk::VectorSingle(_, _) => status != PacketStatus::ReceivedLargeOrNegativeDelta,
792 PacketChunk::VectorDouble(_, _) => true,
793 }
794 }
795}
796
797impl Delta {
798 fn write_to(&self, buf: &mut [u8]) -> usize {
799 match self {
800 Delta::Small(v) => {
801 buf[0] = *v;
802 1
803 }
804 Delta::Large(v) => {
805 buf[..2].copy_from_slice(&v.to_be_bytes());
806 2
807 }
808 }
809 }
810
811 fn byte_len(&self) -> usize {
812 match self {
813 Delta::Small(_) => 1,
814 Delta::Large(_) => 2,
815 }
816 }
817}
818
819#[derive(Debug, Clone, Copy, PartialEq, Eq)]
820pub enum PacketStatus {
821 NotReceived = 0b00,
822 ReceivedSmallDelta = 0b01,
823 ReceivedLargeOrNegativeDelta = 0b10,
824 Unknown = 0b11,
825}
826
827#[derive(Debug, Clone, Copy, PartialEq, Eq)]
828pub enum Delta {
829 Small(u8),
830 Large(i16),
831}
832
833impl From<PacketStatus> for u8 {
834 fn from(val: PacketStatus) -> Self {
835 val as usize as u8
836 }
837}
838
839impl From<u8> for PacketStatus {
840 fn from(v: u8) -> Self {
841 match v {
842 0b00 => Self::NotReceived,
843 0b01 => Self::ReceivedSmallDelta,
844 0b10 => Self::ReceivedLargeOrNegativeDelta,
845 _ => Self::Unknown,
846 }
847 }
848}
849
850impl<'a> TryFrom<&'a [u8]> for Twcc {
851 type Error = &'static str;
852
853 fn try_from(buf: &'a [u8]) -> Result<Self, Self::Error> {
854 if buf.len() < 16 {
855 return Err("Less than 16 bytes for start of Twcc");
856 }
857
858 let sender_ssrc = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]).into();
859 let ssrc = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]).into();
860 let base_seq = u16::from_be_bytes([buf[8], buf[9]]);
861 let status_count = u16::from_be_bytes([buf[10], buf[11]]);
862 let reference_time = u32::from_be_bytes([0, buf[12], buf[13], buf[14]]);
863 let feedback_count = buf[15];
864
865 let mut twcc = Twcc {
866 sender_ssrc,
867 ssrc,
868 base_seq,
869 status_count,
870 reference_time,
871 feedback_count,
872 chunks: VecDeque::new(),
873 delta: VecDeque::new(),
874 };
875
876 let mut todo = status_count as isize;
877 let mut buf = &buf[16..];
878 loop {
879 if todo <= 0 {
880 break;
881 }
882
883 let chunk: PacketChunk = buf.try_into()?;
884
885 todo -= chunk.max_possible_status_count() as isize;
886
887 twcc.chunks.push_back(chunk);
888 buf = &buf[2..];
889 }
890
891 if twcc.chunks.is_empty() {
892 return Ok(twcc);
893 }
894
895 fn read_delta_small(
896 buf: &[u8],
897 n: usize,
898 ) -> Result<impl Iterator<Item = Delta> + '_, &'static str> {
899 if buf.len() < n {
900 return Err("Not enough buf for small deltas");
901 }
902 Ok((0..n).map(|i| Delta::Small(buf[i])))
903 }
904
905 fn read_delta_large(
906 buf: &[u8],
907 n: usize,
908 ) -> Result<impl Iterator<Item = Delta> + '_, &'static str> {
909 if buf.len() < n * 2 {
910 return Err("Not enough buf for large deltas");
911 }
912 Ok((0..(n * 2))
913 .step_by(2)
914 .map(|i| Delta::Large(i16::from_be_bytes([buf[i], buf[i + 1]]))))
915 }
916
917 for c in &twcc.chunks {
918 match c {
919 PacketChunk::Run(PacketStatus::ReceivedSmallDelta, n) => {
920 let n = *n as usize;
921 twcc.delta.extend(read_delta_small(buf, n)?);
922 buf = &buf[n..];
923 }
924 PacketChunk::Run(PacketStatus::ReceivedLargeOrNegativeDelta, n) => {
925 let n = *n as usize;
926 twcc.delta.extend(read_delta_large(buf, n)?);
927 buf = &buf[n..];
928 }
929 PacketChunk::VectorSingle(v, _) => {
930 let n = v.count_ones() as usize;
931 twcc.delta.extend(read_delta_small(buf, n)?);
932 buf = &buf[n..];
933 }
934 PacketChunk::VectorDouble(v, _) => {
935 for n in (0..=12).step_by(2) {
936 let x = (*v >> (12 - n)) & 0b11;
937 match PacketStatus::from(x as u8) {
938 PacketStatus::ReceivedSmallDelta => {
939 twcc.delta.extend(read_delta_small(buf, 1)?);
940 buf = &buf[1..];
941 }
942 PacketStatus::ReceivedLargeOrNegativeDelta => {
943 twcc.delta.extend(read_delta_large(buf, 1)?);
944 buf = &buf[2..];
945 }
946 _ => {}
947 }
948 }
949 }
950 _ => {}
951 }
952 }
953
954 Ok(twcc)
955 }
956}
957
958impl<'a> TryFrom<&'a [u8]> for PacketChunk {
959 type Error = &'static str;
960
961 fn try_from(buf: &'a [u8]) -> Result<Self, Self::Error> {
962 if buf.len() < 2 {
963 return Err("Less than 2 bytes for PacketChunk");
964 }
965
966 let x = u16::from_be_bytes([buf[0], buf[1]]);
967
968 let is_vec = (x & 0b1000_0000_0000_0000) > 0;
969
970 let p = if is_vec {
971 let is_double = (x & 0b0100_0000_0000_0000) > 0;
972 let n = x & 0b0011_1111_1111_1111;
973 if is_double {
974 PacketChunk::VectorDouble(n, 7)
975 } else {
976 PacketChunk::VectorSingle(n, 14)
977 }
978 } else {
979 let s: PacketStatus = ((x >> 13) as u8).into();
980 let n = x & 0b0001_1111_1111_1111;
981 PacketChunk::Run(s, n)
982 };
983
984 Ok(p)
985 }
986}
987
988#[derive(Debug)]
989pub struct TwccSendRegister {
990 keep: usize,
992
993 queue: VecDeque<TwccSendRecord>,
995
996 time_zero: Option<Instant>,
998
999 last_registered: SeqNo,
1001}
1002
1003impl<'a> IntoIterator for &'a TwccSendRegister {
1004 type Item = &'a TwccSendRecord;
1005 type IntoIter = vec_deque::Iter<'a, TwccSendRecord>;
1006
1007 fn into_iter(self) -> Self::IntoIter {
1008 self.queue.iter()
1009 }
1010}
1011
1012#[derive(Debug)]
1014pub struct TwccSendRecord {
1015 seq: SeqNo,
1017
1018 local_send_time: Instant,
1020
1021 size: u16,
1023
1024 recv_report: Option<TwccRecvReport>,
1025}
1026
1027impl TwccSendRecord {
1028 pub fn seq(&self) -> SeqNo {
1030 self.seq
1031 }
1032
1033 pub fn local_send_time(&self) -> Instant {
1035 self.local_send_time
1036 }
1037
1038 pub fn size(&self) -> usize {
1039 self.size as usize
1040 }
1041
1042 pub fn remote_recv_time(&self) -> Option<Instant> {
1044 self.recv_report.as_ref().and_then(|r| r.remote_recv_time)
1045 }
1046
1047 pub fn rtt(&self) -> Option<Duration> {
1049 let recv_report = self.recv_report.as_ref()?;
1050 Some(recv_report.local_recv_time - self.local_send_time)
1051 }
1052}
1053
1054#[derive(Debug)]
1055pub struct TwccRecvReport {
1056 local_recv_time: Instant,
1058
1059 remote_recv_time: Option<Instant>,
1061}
1062
1063impl TwccSendRegister {
1064 pub fn new(keep: usize) -> Self {
1065 TwccSendRegister {
1066 keep,
1067 queue: VecDeque::new(),
1068 time_zero: None,
1069 last_registered: 0.into(),
1070 }
1071 }
1072
1073 pub fn register_seq(&mut self, seq: SeqNo, now: Instant, size: usize) {
1074 self.last_registered = seq;
1075 self.queue.push_back(TwccSendRecord {
1076 seq,
1077 local_send_time: now,
1078 size: size as u16,
1081 recv_report: None,
1083 });
1084 while self.queue.len() > self.keep {
1085 self.queue.pop_front();
1086 }
1087 }
1088
1089 pub fn apply_report(&mut self, twcc: Twcc, now: Instant) -> Option<RangeInclusive<SeqNo>> {
1094 if self.time_zero.is_none() {
1095 self.time_zero = Some(now);
1096 }
1097
1098 let time_zero = self.time_zero.unwrap();
1099 let head_seq = self.queue.front().map(|r| r.seq)?;
1100
1101 let mut iter = twcc
1102 .into_iter(time_zero, self.last_registered)
1103 .skip_while(|(seq, _, _)| seq < &head_seq);
1104 let (first_seq_no, _, first_instant) = iter.next()?;
1105
1106 let mut iter2 = self.queue.iter_mut().skip_while(|r| *r.seq < *first_seq_no);
1107 let first_record = iter2.next()?;
1108
1109 fn update(
1110 now: Instant,
1111 r: &mut TwccSendRecord,
1112 seq: SeqNo,
1113 instant: Option<Instant>,
1114 ) -> bool {
1115 if r.seq != seq {
1116 return false;
1117 }
1118 let recv_report = TwccRecvReport {
1119 local_recv_time: now,
1120 remote_recv_time: instant,
1121 };
1122 r.recv_report = Some(recv_report);
1123
1124 true
1125 }
1126
1127 if first_record.seq != first_seq_no {
1128 return None;
1130 }
1131
1132 let mut problematic_seq = None;
1133
1134 if !update(now, first_record, first_seq_no, first_instant) {
1135 problematic_seq = Some((first_record.seq, first_seq_no));
1136 }
1137
1138 let mut last_seq_no = first_seq_no;
1139 for ((seq, _, instant), record) in iter.zip(iter2) {
1140 if problematic_seq.is_some() {
1141 break;
1142 }
1143
1144 if !update(now, record, seq, instant) {
1145 problematic_seq = Some((record.seq, seq));
1146 }
1147 last_seq_no = seq;
1148 }
1149
1150 if let Some((record_seq, report_seq)) = problematic_seq {
1151 let queue_tail: Vec<_> = self.queue.iter().rev().take(100).collect();
1152 panic!(
1153 "Unexpected TWCC sequence when applying TWCC report. \
1154 Send Record Seq({record_seq}) does not match Report Seq({report_seq}).\
1155 \nLast 100 entires in queue: {queue_tail:?}."
1156 );
1157 }
1158
1159 Some(first_seq_no..=last_seq_no)
1160 }
1161
1162 pub fn send_record(&self, seq: SeqNo) -> Option<&TwccSendRecord> {
1163 let index = self.queue.binary_search_by_key(&seq, |r| r.seq).ok()?;
1164
1165 Some(&self.queue[index])
1166 }
1167
1168 pub fn loss(&self, duration: Duration, now: Instant) -> Option<f32> {
1174 let lower_bound = now - duration;
1176
1177 let packets = self
1178 .queue
1179 .iter()
1180 .rev()
1181 .filter(|s| s.recv_report.is_some())
1185 .take_while(|s| s.local_send_time >= lower_bound);
1186
1187 let (total, lost) = packets.fold((0, 0), |(total, lost), s| {
1188 let was_lost = s
1189 .recv_report
1190 .as_ref()
1191 .map(|rr| rr.remote_recv_time.is_none())
1192 .unwrap_or(true);
1193
1194 (total + 1, lost + u64::from(was_lost))
1195 });
1196
1197 if total == 0 {
1198 return None;
1199 }
1200
1201 Some((lost as f32) / (total as f32))
1202 }
1203
1204 pub fn send_records(
1206 &self,
1207 range: RangeInclusive<SeqNo>,
1208 ) -> Option<impl Iterator<Item = &TwccSendRecord>> {
1209 let first_index = self
1210 .queue
1211 .binary_search_by_key(range.start(), |r| r.seq)
1212 .ok()?;
1213
1214 let current = *range.start();
1215
1216 Some(TwccSendRecordsIter {
1217 range,
1218 index: first_index,
1219 current,
1220 queue: &self.queue,
1221 })
1222 }
1223}
1224
1225#[derive()]
1226struct TwccSendRecordsIter<'a> {
1227 range: RangeInclusive<SeqNo>,
1228 current: SeqNo,
1229 index: usize,
1230 queue: &'a VecDeque<TwccSendRecord>,
1231}
1232
1233impl<'a> Iterator for TwccSendRecordsIter<'a> {
1234 type Item = &'a TwccSendRecord;
1235
1236 fn next(&mut self) -> Option<Self::Item> {
1237 if self.current > *self.range.end() || self.current < *self.range.start() {
1238 return None;
1239 }
1240
1241 let item = &self.queue[self.index];
1242 assert!(self.current == item.seq);
1243 self.current.inc();
1244 self.index += 1;
1245
1246 Some(item)
1247 }
1248}
1249
1250impl fmt::Debug for Twcc {
1334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1335 f.debug_struct("Twcc")
1336 .field("sender_ssrc", &self.sender_ssrc)
1337 .field("ssrc", &self.ssrc)
1338 .field("base_seq", &self.base_seq)
1339 .field("status_count", &self.status_count)
1340 .field("reference_time", &self.reference_time)
1341 .field("feedback_count", &self.feedback_count)
1342 .field("chunks", &self.chunks)
1343 .field("delta", &self.delta.len())
1344 .finish()
1345 }
1346}
1347
1348#[allow(clippy::assign_op_pattern)]
1349#[cfg(test)]
1350mod test {
1351 use std::time::Duration;
1352
1353 use super::*;
1354
1355 use Delta::*;
1356 use PacketChunk::*;
1357 use PacketStatus::*;
1358
1359 #[test]
1360 fn register_write_parse_small_delta() {
1361 let mut reg = TwccRecvRegister::new(100);
1362
1363 let now = Instant::now();
1364
1365 reg.update_seq(10.into(), now + Duration::from_millis(0));
1366 reg.update_seq(11.into(), now + Duration::from_millis(12));
1367 reg.update_seq(12.into(), now + Duration::from_millis(23));
1368 reg.update_seq(13.into(), now + Duration::from_millis(43));
1369
1370 let report = reg.build_report(1000).unwrap();
1371 let mut buf = vec![0_u8; 1500];
1372 let n = report.write_to(&mut buf[..]);
1373 buf.truncate(n);
1374
1375 let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1376 let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1377
1378 assert_eq!(header, report.header());
1379 assert_eq!(parsed, report);
1380 }
1381
1382 #[test]
1383 fn register_write_parse_small_delta_missing() {
1384 let mut reg = TwccRecvRegister::new(100);
1385
1386 let now = Instant::now();
1387
1388 reg.update_seq(10.into(), now + Duration::from_millis(0));
1389 reg.update_seq(11.into(), now + Duration::from_millis(12));
1390 reg.update_seq(12.into(), now + Duration::from_millis(23));
1391 reg.update_seq(14.into(), now + Duration::from_millis(43));
1393
1394 let report = reg.build_report(1000).unwrap();
1395 let mut buf = vec![0_u8; 1500];
1396 let n = report.write_to(&mut buf[..]);
1397 buf.truncate(n);
1398
1399 let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1400 let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1401
1402 assert_eq!(header, report.header());
1403 assert_eq!(parsed, report);
1404 }
1405
1406 #[test]
1407 fn register_write_parse_large_delta() {
1408 let mut reg = TwccRecvRegister::new(100);
1409
1410 let now = Instant::now();
1411
1412 reg.update_seq(10.into(), now + Duration::from_millis(0));
1413 reg.update_seq(11.into(), now + Duration::from_millis(70));
1414 reg.update_seq(12.into(), now + Duration::from_millis(140));
1415 reg.update_seq(13.into(), now + Duration::from_millis(210));
1416
1417 let report = reg.build_report(1000).unwrap();
1418 let mut buf = vec![0_u8; 1500];
1419 let n = report.write_to(&mut buf[..]);
1420 buf.truncate(n);
1421
1422 let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1423 let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1424
1425 assert_eq!(header, report.header());
1426 assert_eq!(parsed, report);
1427 }
1428
1429 #[test]
1430 fn register_write_parse_mixed_delta() {
1431 let mut reg = TwccRecvRegister::new(100);
1432
1433 let now = Instant::now();
1434
1435 reg.update_seq(10.into(), now + Duration::from_millis(0));
1436 reg.update_seq(11.into(), now + Duration::from_millis(12));
1437 reg.update_seq(12.into(), now + Duration::from_millis(140));
1438 reg.update_seq(13.into(), now + Duration::from_millis(152));
1439
1440 let report = reg.build_report(1000).unwrap();
1441 let mut buf = vec![0_u8; 1500];
1442 let n = report.write_to(&mut buf[..]);
1443 buf.truncate(n);
1444
1445 let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1446 let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1447
1448 assert_eq!(header, report.header());
1449 assert_eq!(parsed, report);
1450 }
1451
1452 #[test]
1453 fn too_big_time_gap_requires_two_reports() {
1454 let mut reg = TwccRecvRegister::new(100);
1455
1456 let now = Instant::now();
1457
1458 reg.update_seq(10.into(), now + Duration::from_millis(0));
1459 reg.update_seq(11.into(), now + Duration::from_millis(12));
1460 reg.update_seq(12.into(), now + Duration::from_millis(9000));
1461
1462 let _ = reg.build_report(1000).unwrap();
1463 let report2 = reg.build_report(1000).unwrap();
1464
1465 assert_eq!(report2.reference_time, 140);
1468
1469 assert_eq!(report2.delta[0], Small(160));
1472 }
1473
1474 #[test]
1475 fn report_padded_to_even_word() {
1476 let mut reg = TwccRecvRegister::new(100);
1477
1478 let now = Instant::now();
1479
1480 reg.update_seq(10.into(), now + Duration::from_millis(0));
1481
1482 let report = reg.build_report(1000).unwrap();
1483 let mut buf = vec![0_u8; 1500];
1484 let n = report.write_to(&mut buf[..]);
1485
1486 assert!(n % 4 == 0);
1487 }
1488
1489 #[test]
1490 fn report_truncated_to_max_byte_size() {
1491 let mut reg = TwccRecvRegister::new(100);
1492
1493 let now = Instant::now();
1494
1495 reg.update_seq(10.into(), now + Duration::from_millis(0));
1496 reg.update_seq(11.into(), now + Duration::from_millis(12));
1497 reg.update_seq(12.into(), now + Duration::from_millis(140));
1498 reg.update_seq(13.into(), now + Duration::from_millis(152));
1499
1500 let report = reg.build_report(28).unwrap();
1501
1502 assert_eq!(report.status_count, 2);
1503 assert_eq!(report.chunks, vec![Run(ReceivedSmallDelta, 2)]);
1504 assert_eq!(report.delta, vec![Small(0), Small(48)]);
1505
1506 let report = reg.build_report(28).unwrap();
1507
1508 assert_eq!(report.status_count, 2);
1509 assert_eq!(report.chunks, vec![Run(ReceivedSmallDelta, 2)]);
1510 assert_eq!(report.delta, vec![Small(48), Small(48)]);
1511 }
1512
1513 #[test]
1514 fn truncated_counts_gaps_correctly() {
1515 let mut reg = TwccRecvRegister::new(100);
1516
1517 let now = Instant::now();
1518
1519 reg.update_seq(10.into(), now + Duration::from_millis(0));
1520 reg.update_seq(13.into(), now + Duration::from_millis(12));
1522 reg.update_seq(14.into(), now + Duration::from_millis(140));
1523 reg.update_seq(15.into(), now + Duration::from_millis(152));
1524
1525 let report = reg.build_report(32).unwrap();
1526
1527 assert_eq!(report.status_count, 4);
1528 assert_eq!(
1529 report.chunks,
1530 vec![
1531 Run(ReceivedSmallDelta, 1),
1532 Run(NotReceived, 2),
1533 Run(ReceivedSmallDelta, 1)
1534 ]
1535 );
1536 assert_eq!(report.delta, vec![Small(0), Small(48)]);
1537 }
1538
1539 #[test]
1540 fn run_max_is_8192() {
1541 let mut reg = TwccRecvRegister::new(100);
1542
1543 let now = Instant::now();
1544
1545 reg.update_seq(0.into(), now + Duration::from_millis(0));
1546 reg.update_seq(8194.into(), now + Duration::from_millis(10));
1547
1548 let report = reg.build_report(1000).unwrap();
1549
1550 assert_eq!(report.status_count, 8195);
1551 assert_eq!(
1552 report.chunks,
1553 vec![
1554 VectorSingle(8192, 14),
1555 Run(NotReceived, 8180),
1556 Run(ReceivedSmallDelta, 1)
1557 ]
1558 );
1559 }
1560
1561 #[test]
1562 fn single_followed_by_missing() {
1563 let mut reg = TwccRecvRegister::new(100);
1564
1565 let now = Instant::now();
1566
1567 reg.update_seq(10.into(), now + Duration::from_millis(0));
1568 reg.update_seq(12.into(), now + Duration::from_millis(10));
1569 reg.update_seq(100.into(), now + Duration::from_millis(20));
1570
1571 let report = reg.build_report(2016).unwrap();
1572
1573 assert_eq!(report.status_count, 91);
1574 assert_eq!(
1575 report.chunks,
1576 vec![
1577 VectorSingle(10240, 14),
1578 Run(NotReceived, 76),
1579 Run(ReceivedSmallDelta, 1)
1580 ]
1581 );
1582 assert_eq!(report.delta, vec![Small(0), Small(40), Small(40)]);
1583 }
1584
1585 #[test]
1586 fn time_jump_small_back_for_second_report() {
1587 let mut reg = TwccRecvRegister::new(100);
1588
1589 let now = Instant::now();
1590
1591 reg.update_seq(10.into(), now + Duration::from_millis(8000));
1592 let _ = reg.build_report(2016).unwrap();
1593
1594 reg.update_seq(9.into(), now + Duration::from_millis(0));
1595 let report = reg.build_report(2016).unwrap();
1596
1597 assert_eq!(report.status_count, 2);
1598 assert_eq!(report.chunks, vec![Run(ReceivedLargeOrNegativeDelta, 2)]);
1599 assert_eq!(report.delta, vec![Large(-32000), Large(32000)]);
1600 }
1601
1602 #[test]
1603 fn time_jump_large_back_for_second_report() {
1604 let mut reg = TwccRecvRegister::new(100);
1605
1606 let now = Instant::now();
1607
1608 reg.update_seq(10.into(), now + Duration::from_millis(9000));
1609 let _ = reg.build_report(2016).unwrap();
1610
1611 reg.update_seq(9.into(), now + Duration::from_millis(0));
1612 assert!(reg.build_report(2016).is_none());
1613
1614 assert_eq!(reg.queue.len(), 1);
1615 }
1616
1617 #[test]
1618 fn empty_twcc() {
1619 let twcc = Twcc {
1620 sender_ssrc: 0.into(),
1621 ssrc: 0.into(),
1622 base_seq: 0,
1623 status_count: 0,
1624 reference_time: 0,
1625 feedback_count: 0,
1626 chunks: VecDeque::new(),
1627 delta: VecDeque::new(),
1628 };
1629
1630 let mut buf = vec![0_u8; 1500];
1631 let n = twcc.write_to(&mut buf[..]);
1632 buf.truncate(n);
1633
1634 let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1635 let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1636
1637 assert_eq!(header, twcc.header());
1638 assert_eq!(parsed, twcc);
1639 }
1640
1641 #[test]
1642 fn negative_deltas() {
1643 let mut reg = TwccRecvRegister::new(100);
1644
1645 let now = Instant::now();
1646
1647 reg.update_seq(10.into(), now + Duration::from_millis(12));
1648 reg.update_seq(11.into(), now + Duration::from_millis(0));
1649 reg.update_seq(12.into(), now + Duration::from_millis(23));
1650
1651 let report = reg.build_report(1000).unwrap();
1652
1653 assert_eq!(report.status_count, 3);
1654 assert_eq!(report.base_seq, 10);
1655 assert_eq!(report.reference_time, 0);
1656 assert_eq!(report.chunks, vec![VectorDouble(6400, 7)]);
1657 assert_eq!(report.delta, vec![Small(0), Large(-48), Small(92)]);
1658
1659 let base = reg.time_start.unwrap();
1660
1661 let mut iter = report.into_iter(base, 10.into());
1662 assert_eq!(
1663 iter.next(),
1664 Some((
1665 10.into(),
1666 PacketStatus::ReceivedSmallDelta,
1667 Some(base + Duration::from_millis(0))
1668 ))
1669 );
1670 assert_eq!(
1671 iter.next(),
1672 Some((
1673 11.into(),
1674 PacketStatus::ReceivedLargeOrNegativeDelta,
1675 Some(base.checked_sub(Duration::from_millis(12)).unwrap())
1676 ))
1677 );
1678 assert_eq!(
1679 iter.next(),
1680 Some((
1681 12.into(),
1682 PacketStatus::ReceivedSmallDelta,
1683 Some(base + Duration::from_millis(11))
1684 ))
1685 );
1686 }
1687
1688 #[test]
1689 fn twcc_fuzz_fail() {
1690 let mut reg = TwccRecvRegister::new(100);
1691
1692 let now = Instant::now();
1693
1694 reg.update_seq(4542.into(), now + Duration::from_millis(2373281424));
1697 reg.update_seq(15918.into(), now + Duration::from_millis(2373862820));
1698 reg.update_seq(8405.into(), now + Duration::from_millis(2379074367));
1699
1700 let report = reg.build_report(43).unwrap();
1701
1702 let mut buf = vec![0_u8; 1500];
1703 let n = report.write_to(&mut buf[..]);
1704 buf.truncate(n);
1705
1706 let header: RtcpHeader = match (&buf[..]).try_into() {
1707 Ok(v) => v,
1708 Err(_) => return,
1709 };
1710 let parsed: Twcc = match (&buf[4..]).try_into() {
1711 Ok(v) => v,
1712 Err(_) => return,
1713 };
1714
1715 assert_eq!(header, report.header());
1716 assert_eq!(parsed, report);
1717 }
1718
1719 #[test]
1720 fn test_send_register_apply_report_for_old_seq_numbers() {
1721 let mut reg = TwccSendRegister::new(25);
1722 let mut now = Instant::now();
1723
1724 for i in 0..50 {
1725 reg.register_seq(i.into(), now, 0);
1726 now = now + Duration::from_micros(15);
1727 }
1728
1729 reg.apply_report(
1733 Twcc {
1734 sender_ssrc: Ssrc::new(),
1735 ssrc: Ssrc::new(),
1736 base_seq: 0,
1737 status_count: 0,
1738 reference_time: 0,
1739 feedback_count: 0,
1740 chunks: [].into(),
1741 delta: [].into(),
1742 },
1743 now,
1744 );
1745 now = now + Duration::from_millis(35);
1746
1747 reg.apply_report(
1748 Twcc {
1749 sender_ssrc: Ssrc::new(),
1750 ssrc: Ssrc::new(),
1751 base_seq: 20,
1752 status_count: 8,
1753 reference_time: 35,
1754 feedback_count: 0,
1755 chunks: [PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 8)].into(),
1756 delta: [
1757 Delta::Small(10),
1758 Delta::Small(10),
1759 Delta::Small(10),
1760 Delta::Small(10),
1761 Delta::Small(10),
1762 Delta::Small(10),
1763 Delta::Small(10),
1764 Delta::Small(10),
1765 ]
1766 .into(),
1767 },
1768 now,
1769 );
1770
1771 for seq in 25..=27 {
1772 let record = reg
1773 .send_record(seq.into())
1774 .unwrap_or_else(|| panic!("Should have send record for seq {seq}"));
1775
1776 assert!(
1777 record.recv_report.is_some(),
1778 "Report should have recorded recv_report for {seq}"
1779 );
1780 }
1781 }
1782
1783 #[test]
1784 fn test_twcc_iter_correct_deltas() {
1785 let twcc = Twcc {
1786 sender_ssrc: 0.into(),
1787 ssrc: 0.into(),
1788 base_seq: 1,
1789 status_count: 12,
1790 reference_time: 1337,
1791 feedback_count: 0,
1792 chunks: [
1793 PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 2),
1794 PacketChunk::VectorDouble(0b1101_0010_0101_0000, 7),
1795 PacketChunk::Run(PacketStatus::ReceivedLargeOrNegativeDelta, 3),
1796 ]
1797 .into(),
1798 delta: [
1799 Delta::Small(10),
1801 Delta::Small(15),
1802 Delta::Small(7),
1804 Delta::Large(280),
1805 Delta::Small(3),
1806 Delta::Small(13),
1807 Delta::Large(-37),
1809 Delta::Large(32),
1810 Delta::Large(89),
1811 ]
1812 .into(),
1813 };
1814
1815 let now = Instant::now();
1816 let base = now + Duration::from_millis(1337 * 64);
1817 let expected = vec![
1818 (
1819 1.into(),
1820 PacketStatus::ReceivedSmallDelta,
1821 Some(base + Duration::from_micros(10 * 250)),
1822 ),
1823 (
1824 2.into(),
1825 PacketStatus::ReceivedSmallDelta,
1826 Some(base + Duration::from_micros(25 * 250)),
1827 ),
1828 (
1829 3.into(),
1830 PacketStatus::ReceivedSmallDelta,
1831 Some(base + Duration::from_micros(32 * 250)),
1832 ),
1833 (4.into(), PacketStatus::NotReceived, None),
1834 (
1835 5.into(),
1836 PacketStatus::ReceivedLargeOrNegativeDelta,
1837 Some(base + Duration::from_micros(312 * 250)),
1838 ),
1839 (
1840 6.into(),
1841 PacketStatus::ReceivedSmallDelta,
1842 Some(base + Duration::from_micros(315 * 250)),
1843 ),
1844 (
1845 7.into(),
1846 PacketStatus::ReceivedSmallDelta,
1847 Some(base + Duration::from_micros(328 * 250)),
1848 ),
1849 (8.into(), PacketStatus::NotReceived, None),
1850 (9.into(), PacketStatus::NotReceived, None),
1851 (
1852 10.into(),
1853 PacketStatus::ReceivedLargeOrNegativeDelta,
1854 Some(base + Duration::from_micros(291 * 250)),
1855 ),
1856 (
1857 11.into(),
1858 PacketStatus::ReceivedLargeOrNegativeDelta,
1859 Some(base + Duration::from_micros(323 * 250)),
1860 ),
1861 (
1862 12.into(),
1863 PacketStatus::ReceivedLargeOrNegativeDelta,
1864 Some(base + Duration::from_micros(412 * 250)),
1865 ),
1866 ];
1867
1868 let result: Vec<_> = twcc.into_iter(now, 1.into()).collect();
1869
1870 assert_eq!(result, expected);
1871 }
1872
1873 #[test]
1874 fn test_twcc_register_send_records() {
1875 let mut reg = TwccSendRegister::new(25);
1876 let mut now = Instant::now();
1877 for i in 0..25 {
1878 reg.register_seq(i.into(), now, 0);
1879 now = now + Duration::from_micros(15);
1880 }
1881
1882 let range = reg
1883 .apply_report(
1884 Twcc {
1885 sender_ssrc: Ssrc::new(),
1886 ssrc: Ssrc::new(),
1887 base_seq: 0,
1888 status_count: 8,
1889 reference_time: 35,
1890 feedback_count: 0,
1891 chunks: [PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 8)].into(),
1892 delta: [
1893 Delta::Small(10),
1894 Delta::Small(10),
1895 Delta::Small(10),
1896 Delta::Small(10),
1897 Delta::Small(10),
1898 Delta::Small(10),
1899 Delta::Small(10),
1900 Delta::Small(10),
1901 ]
1902 .into(),
1903 },
1904 now,
1905 )
1906 .expect("apply_report to return Some(_)");
1907
1908 assert_eq!(range, 0.into()..=7.into());
1909
1910 let iter = reg
1911 .send_records(range)
1912 .expect("send_records to return Some(_)");
1913 assert_eq!(
1914 iter.map(|r| *r.seq).collect::<Vec<_>>(),
1915 vec![0, 1, 2, 3, 4, 5, 6, 7]
1916 );
1917 }
1918
1919 #[test]
1920 fn test_twcc_send_register_loss() {
1921 let mut reg = TwccSendRegister::new(25);
1922 let mut now = Instant::now();
1923 for i in 0..9 {
1924 reg.register_seq(i.into(), now, 0);
1925 now = now + Duration::from_millis(15);
1926 }
1927
1928 now = now + Duration::from_millis(5);
1929 reg.apply_report(
1930 Twcc {
1931 sender_ssrc: Ssrc::new(),
1932 ssrc: Ssrc::new(),
1933 base_seq: 0,
1934 status_count: 9,
1935 reference_time: 35,
1936 feedback_count: 0,
1937 chunks: [
1938 PacketChunk::VectorDouble(0b11_01_01_01_00_01_00_01, 7),
1939 PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 2),
1940 ]
1941 .into(),
1942 delta: [
1943 Delta::Small(10),
1944 Delta::Small(10),
1945 Delta::Small(10),
1946 Delta::Small(10),
1947 Delta::Small(10),
1948 Delta::Small(10),
1949 Delta::Small(10),
1950 ]
1951 .into(),
1952 },
1953 now,
1954 )
1955 .expect("apply_report to return Some(_)");
1956
1957 now = now + Duration::from_millis(20);
1958 let loss = reg
1959 .loss(Duration::from_millis(150), now)
1960 .expect("Should be able to calcualte loss");
1961
1962 let pct = (loss * 100.0).floor() as u32;
1963
1964 assert_eq!(
1965 pct, 25,
1966 "The loss percentage should be 25 as 2 out of 8 packets are lost"
1967 );
1968 }
1969
1970 #[test]
1971 fn test_twcc_recv_register_loss() {
1972 let mut reg = TwccRecvRegister::new(25);
1973 let mut now = Instant::now();
1974
1975 for i in 0..10 {
1976 if i == 3 || i == 7 {
1977 continue;
1979 }
1980 reg.update_seq(i.into(), now);
1981 now = now + Duration::from_millis(50);
1982 }
1983
1984 assert_eq!(reg.loss(), Some(2.0 / 10.0));
1985
1986 for i in 10..20 {
1987 if i == 11 || i == 13 || i == 15 || i == 17 {
1988 continue;
1990 }
1991 reg.update_seq(i.into(), now);
1992 now = now + Duration::from_millis(50);
1993 }
1994
1995 assert_eq!(reg.loss(), Some(4.0 / 10.0));
1996 }
1997}