atm0s_custom_str0m/rtp/rtcp/
twcc.rs

1use std::collections::vec_deque;
2use std::collections::VecDeque;
3use std::fmt;
4use std::ops::RangeInclusive;
5use std::time::{Duration, Instant};
6
7use super::{extend_u16, FeedbackMessageType, RtcpHeader, RtcpPacket};
8use super::{RtcpType, SeqNo, Ssrc, TransportType};
9
10/// Transport Wide Congestion Control.
11///
12/// Sent in response to every RTP packet, but does ranges of packets to respond to.
13#[derive(Clone, PartialEq, Eq)]
14pub struct Twcc {
15    /// Sender of this feedback. Mostly irrelevant, but part of RTCP packets.
16    pub sender_ssrc: Ssrc,
17    /// The SSRC this report is for.
18    pub ssrc: Ssrc,
19    /// Start sequence number.
20    pub base_seq: u16,
21    /// Number of reported statuses.
22    pub status_count: u16,
23    /// Clock time this report was produced. Used for RTT measurement.
24    pub reference_time: u32, // 24 bit
25    /// Increasing counter for each TWCC. For deduping.
26    pub feedback_count: u8, // counter for each Twcc
27    /// Ranges received.
28    pub chunks: VecDeque<PacketChunk>,
29    /// Delta times for the ranges received.
30    pub delta: VecDeque<Delta>,
31}
32
33impl Twcc {
34    fn chunks_byte_len(&self) -> usize {
35        self.chunks.len() * 2
36    }
37
38    fn delta_byte_len(&self) -> usize {
39        self.delta.iter().map(|d| d.byte_len()).sum()
40    }
41
42    /// Iterate over the reported sequences.
43    pub fn into_iter(self, time_zero: Instant, extend_from: SeqNo) -> TwccIter {
44        let millis = self.reference_time as u64 * 64;
45        let time_base = time_zero + Duration::from_millis(millis);
46        let base_seq = extend_u16(Some(*extend_from), self.base_seq);
47        TwccIter {
48            base_seq,
49            time_base,
50            index: 0,
51            twcc: self,
52        }
53    }
54}
55
56pub struct TwccIter {
57    base_seq: u64,
58    time_base: Instant,
59    index: usize,
60    twcc: Twcc,
61}
62
63impl Iterator for TwccIter {
64    type Item = (SeqNo, PacketStatus, Option<Instant>);
65
66    fn next(&mut self) -> Option<Self::Item> {
67        let head = self.twcc.chunks.front()?;
68
69        let (status, amount) = match head {
70            PacketChunk::Run(s, n) => {
71                use PacketStatus::*;
72                let status = match s {
73                    NotReceived | Unknown => NotReceived,
74                    ReceivedSmallDelta => ReceivedSmallDelta,
75                    PacketStatus::ReceivedLargeOrNegativeDelta => ReceivedLargeOrNegativeDelta,
76                };
77                (status, *n)
78            }
79            PacketChunk::VectorSingle(v, n) => {
80                let status = if 1 << (13 - self.index) & v > 0 {
81                    PacketStatus::ReceivedSmallDelta
82                } else {
83                    PacketStatus::NotReceived
84                };
85                (status, *n)
86            }
87            PacketChunk::VectorDouble(v, n) => {
88                let e = ((v >> (12 - self.index * 2)) & 0b11) as u8;
89                let status = PacketStatus::from(e);
90                (status, *n)
91            }
92        };
93
94        let instant = match status {
95            PacketStatus::NotReceived => None,
96            PacketStatus::ReceivedSmallDelta => match self.twcc.delta.pop_front()? {
97                Delta::Small(v) => Some(self.time_base + Duration::from_micros(250 * v as u64)),
98                Delta::Large(_) => panic!("Incorrect large delta size"),
99            },
100            PacketStatus::ReceivedLargeOrNegativeDelta => match self.twcc.delta.pop_front()? {
101                Delta::Small(_) => panic!("Incorrect small delta size"),
102                Delta::Large(v) => {
103                    let dur = Duration::from_micros(250 * v.unsigned_abs() as u64);
104                    Some(if v < 0 {
105                        self.time_base.checked_sub(dur).unwrap()
106                    } else {
107                        self.time_base + dur
108                    })
109                }
110            },
111            _ => unreachable!(),
112        };
113
114        if let Some(new_timebase) = instant {
115            self.time_base = new_timebase;
116        }
117        let seq: SeqNo = (self.base_seq + self.index as u64).into();
118
119        self.index += 1;
120        if self.index == amount as usize {
121            self.twcc.chunks.pop_front();
122            self.base_seq = *seq + 1;
123            self.index = 0;
124        }
125
126        Some((seq, status, instant))
127    }
128}
129
130impl RtcpPacket for Twcc {
131    fn header(&self) -> RtcpHeader {
132        RtcpHeader {
133            rtcp_type: RtcpType::TransportLayerFeedback,
134            feedback_message_type: FeedbackMessageType::TransportFeedback(
135                TransportType::TransportWide,
136            ),
137            words_less_one: (self.length_words() - 1) as u16,
138        }
139    }
140
141    fn length_words(&self) -> usize {
142        // header: 1
143        // sender ssrc: 1
144        // ssrc: 1
145        // base seq + packet status: 1
146        // ref time + feedback count: 1
147        // chunks byte len + delta byte len + padding
148
149        let mut total = self.chunks_byte_len() + self.delta_byte_len();
150
151        let pad = 4 - total % 4;
152        if pad < 4 {
153            total += pad;
154        }
155
156        assert!(total % 4 == 0);
157
158        let total_words = total / 4;
159
160        5 + total_words
161    }
162
163    fn write_to(&self, buf: &mut [u8]) -> usize {
164        let len_start = buf.len();
165
166        let mut total = {
167            let buf = &mut buf[..];
168
169            self.header().write_to(buf);
170            buf[4..8].copy_from_slice(&self.sender_ssrc.to_be_bytes());
171            buf[8..12].copy_from_slice(&self.ssrc.to_be_bytes());
172
173            buf[12..14].copy_from_slice(&self.base_seq.to_be_bytes());
174            buf[14..16].copy_from_slice(&self.status_count.to_be_bytes());
175
176            let ref_time = self.reference_time.to_be_bytes();
177            buf[16..19].copy_from_slice(&ref_time[1..4]);
178            buf[19] = self.feedback_count;
179
180            let mut buf = &mut buf[20..];
181            for p in &self.chunks {
182                p.write_to(buf);
183                buf = &mut buf[2..];
184            }
185
186            for d in &self.delta {
187                let n = d.write_to(buf);
188                buf = &mut buf[n..];
189            }
190
191            len_start - buf.len()
192        };
193
194        let pad = 4 - total % 4;
195        if pad < 4 {
196            for i in 0..pad {
197                buf[total + i] = 0;
198            }
199            buf[total + pad - 1] = pad as u8;
200
201            total += pad;
202            // Toggle padding bit
203            buf[0] |= 0b00_1_00000;
204        }
205
206        total
207    }
208}
209
210#[derive(Debug)]
211pub struct TwccRecvRegister {
212    // How many packets to keep when they are reported. This is to handle packets arriving out
213    // of order and where two consecutive calls to `build_report` needs to go "backwards" in
214    // base_seq.
215    keep_reported: usize,
216
217    /// Queue of packets to form Twcc reports of.
218    ///
219    /// Once the queue has some content, we will always keep at least one entry to "remember" for the
220    /// next report.
221    queue: VecDeque<Receiption>,
222
223    /// Index into queue from where we start reporting on next build_report().
224    report_from: usize,
225
226    /// Interims built in this for every build_report.
227    interims: VecDeque<ChunkInterim>,
228
229    /// The point in time we consider 0. All reported values are offset from this. Set to first
230    /// unreported packet in first `build_reported`.
231    ///
232    // https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#page-5
233    // reference time: 24 bits Signed integer indicating an absolute
234    // reference time in some (unknown) time base chosen by the
235    // sender of the feedback packets.
236    time_start: Option<Instant>,
237
238    /// Counter that increases by one for each report generated.
239    generated_reports: u64,
240
241    /// Data to calculate received loss.
242    receive_window: ReceiveWindow,
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq)]
246struct Receiption {
247    seq: SeqNo,
248    time: Instant,
249}
250
251impl TwccRecvRegister {
252    pub fn new(keep_reported: usize) -> Self {
253        TwccRecvRegister {
254            keep_reported,
255            queue: VecDeque::new(),
256            report_from: 0,
257            interims: VecDeque::new(),
258            time_start: None,
259            generated_reports: 0,
260            receive_window: ReceiveWindow::default(),
261        }
262    }
263
264    pub fn max_seq(&self) -> SeqNo {
265        // The highest seq must be the last since update_seq inserts values
266        // using a binary search.
267        self.queue.back().map(|r| r.seq).unwrap_or_else(|| 0.into())
268    }
269
270    pub fn update_seq(&mut self, seq: SeqNo, time: Instant) {
271        self.receive_window.record_seq(seq);
272
273        match self.queue.binary_search_by_key(&seq, |r| r.seq) {
274            Ok(_) => {
275                // Exact same SeqNo found. This is an error where the sender potentially
276                // used the same twcc sequence number for two packets. Let's ignore it.
277            }
278            Err(idx) => {
279                if let Some(time_start) = self.time_start {
280                    // If time goes back more than 8192 millis from the time point we've
281                    // chosen as our 0, we can't represent that in the report. Let's just
282                    // forget about it and hope for the best.
283                    if time_start - time >= Duration::from_millis(8192) {
284                        return;
285                    }
286                }
287
288                self.queue.insert(idx, Receiption { seq, time });
289
290                if idx < self.report_from {
291                    self.report_from = idx;
292                }
293            }
294        }
295    }
296
297    pub fn build_report(&mut self, max_byte_size: usize) -> Option<Twcc> {
298        if max_byte_size > 10_000 {
299            warn!("Refuse to build too large Twcc report");
300            return None;
301        }
302
303        // First unreported is the self.time_start relative offset of the next Twcc.
304        let first = self.queue.get(self.report_from);
305        let first = first?;
306
307        // Set once on first ever built report.
308        if self.time_start.is_none() {
309            self.time_start = Some(first.time);
310        }
311
312        let (base_seq, first_time) = (first.seq, first.time);
313        let time_start = self.time_start.expect("a start time");
314
315        // The difference between our Twcc reference time and the first ever report start time.
316        let first_time_rel = first_time - time_start;
317
318        // The value is to be interpreted in multiples of 64ms.
319        let reference_time = (first_time_rel.as_micros() as u64 / 64_000) as u32;
320
321        let mut twcc = Twcc {
322            sender_ssrc: 0.into(),
323            ssrc: 0.into(),
324            feedback_count: self.generated_reports as u8,
325            base_seq: *base_seq as u16,
326            reference_time,
327            status_count: 0,
328            chunks: VecDeque::new(),
329            delta: VecDeque::new(),
330        };
331
332        // Because reference time is in steps of 64ms, the first reported packet might have an
333        // offset (packet time resolution is 250us). This base_time is calculated backwards from
334        // reference time so that we can offset all packets from the "truncated" 64ms steps.
335        // The RFC says:
336        // The first recv delta in this packet is relative to the reference time.
337        let base_time = time_start + Duration::from_micros(reference_time as u64 * 64_000);
338
339        // The ChunkInterim are helpers structures that hold the deltas between
340        // the registered receptions.
341        build_interims(
342            &self.queue,
343            self.report_from,
344            base_seq,
345            base_time,
346            &mut self.interims,
347        );
348        let interims = &mut self.interims;
349
350        // 20 bytes is the size of the fixed fields in Twcc.
351        let mut bytes_left = max_byte_size - 20;
352
353        while !interims.is_empty() {
354            // 2 chunk + 2 large delta + 3 padding
355            const MIN_RUN_SIZE: usize = 2 + 2 + 3;
356
357            if bytes_left < MIN_RUN_SIZE {
358                break;
359            }
360
361            // Chose the packet chunk type that can fit the most interims.
362            let (mut chunk, max) = {
363                let first_status = interims.front().expect("at least one interim").status();
364
365                let c_run = PacketChunk::Run(first_status, 0);
366                let c_single = PacketChunk::VectorSingle(0, 0);
367                let c_double = PacketChunk::VectorDouble(0, 0);
368
369                let max_run = c_run.append_max(interims.iter());
370                let max_single = c_single.append_max(interims.iter());
371                let max_double = c_double.append_max(interims.iter());
372
373                let max = max_run.max(max_single).max(max_double);
374
375                // 2 chunk + 14 small delta + 3 padding
376                const MAX_SINGLE_SIZE: usize = 2 + 14 + 3;
377                // 2 chunk + 7 large delta  + 3 padding
378                const MAX_DOUBLE_SIZE: usize = 2 + 14 + 3;
379
380                if max == max_run {
381                    (c_run, max_run)
382                } else if max == max_single && bytes_left >= MAX_SINGLE_SIZE {
383                    (c_single, max_single)
384                } else if max == max_double && bytes_left >= MAX_DOUBLE_SIZE {
385                    (c_double, max_double)
386                } else {
387                    // fallback, since we can always do runs.
388                    (c_run, max_run)
389                }
390            };
391
392            // we should _definitely_ be able to fit this many reported.
393            let mut todo = max;
394
395            loop {
396                if bytes_left < MIN_RUN_SIZE {
397                    break;
398                }
399
400                if todo == 0 {
401                    break;
402                }
403
404                let i = match interims.front_mut() {
405                    Some(v) => v,
406                    None => break,
407                };
408
409                let appended = chunk.append(i);
410                assert!(appended > 0);
411                todo -= appended;
412                twcc.status_count += appended;
413
414                if i.consume(appended) {
415                    // it was fully consumed.
416                    if matches!(i, ChunkInterim::Received(_, _, _)) {
417                        self.report_from += 1;
418                    }
419
420                    if let Some(delta) = i.delta() {
421                        twcc.delta.push_back(delta);
422                        bytes_left -= delta.byte_len();
423                    }
424
425                    // move on to next interim
426                    interims.pop_front();
427                } else {
428                    // not fully consumed, then we must have run out of space in the chunk.
429                    assert!(todo == 0);
430                }
431            }
432
433            let free = chunk.free();
434            if chunk.must_be_full() && free > 0 {
435                // this must be at the end where we can shift in missing
436                assert!(interims.is_empty());
437                chunk.append(&ChunkInterim::Missing(free));
438            }
439
440            twcc.chunks.push_back(chunk);
441            bytes_left -= 2;
442        }
443
444        // libWebRTC demands at least one chunk, or it will warn with
445        // "Buffer too small (16 bytes) to fit a FeedbackPacket. Minimum size = 18"
446        // (18 bytes here is not including the RTCP header).
447        if twcc.chunks.is_empty() {
448            return None;
449        }
450
451        self.generated_reports += 1;
452
453        // clean up
454        if self.report_from > self.keep_reported {
455            let to_remove = self.report_from - self.keep_reported;
456            self.queue.drain(..to_remove);
457            self.report_from -= to_remove;
458        }
459
460        Some(twcc)
461    }
462
463    pub fn has_unreported(&self) -> bool {
464        self.queue.len() > self.report_from
465    }
466
467    /// Calculate the fraction of lost packets since the last call.
468    ///
469    /// To get periodic stats call this method at fixed intervals.
470    pub fn loss(&mut self) -> Option<f32> {
471        // Based on the algorithm described in
472        // [RFC 3550 Appendix A](https://www.rfc-editor.org/rfc/rfc3550#appendix-A.3), but instead
473        // of applying it to an individual RTP stream it's applied to the whole session using TWCC
474        // sequence numbers rather than RTP sequence numbers.
475        let max_seq = self.receive_window.max_seq?;
476        let base_seq = self.receive_window.base_seq?;
477        let expected = *max_seq - *base_seq + 1_u64;
478
479        let expected_interval = expected - self.receive_window.expected_prior;
480        self.receive_window.expected_prior = expected;
481
482        let received_interval = self.receive_window.received - self.receive_window.received_prior;
483        self.receive_window.received_prior = self.receive_window.received;
484
485        let lost_interval = expected_interval.saturating_sub(received_interval);
486
487        (expected_interval != 0).then_some(lost_interval as f32 / expected_interval as f32)
488    }
489}
490
491/// Interims are deltas between `Receiption` which is an intermediary format before
492/// we populate the Twcc report.
493fn build_interims(
494    queue: &VecDeque<Receiption>,
495    report_from: usize,
496    base_seq: SeqNo,
497    base_time: Instant,
498    interims: &mut VecDeque<ChunkInterim>,
499) {
500    interims.clear();
501    let report_from = queue.iter().enumerate().skip(report_from);
502
503    let mut prev = (base_seq, base_time);
504
505    for (index, r) in report_from {
506        let diff_seq = *r.seq - *prev.0;
507
508        if diff_seq > 1 {
509            let mut todo = diff_seq - 1;
510            while todo > 0 {
511                // max 2^13 run length in each missing chunk
512                let n = todo.min(8192);
513                interims.push_back(ChunkInterim::Missing(n as u16));
514                todo -= n;
515            }
516        }
517
518        let diff_time = if r.time < prev.1 {
519            // negative
520            let dur = prev.1 - r.time;
521            -(dur.as_micros() as i32)
522        } else {
523            let dur = r.time - prev.1;
524            dur.as_micros() as i32
525        };
526
527        let (status, time) = if diff_time < -8_192_000 || diff_time > 8_191_750 {
528            // This is too large to be representable in deltas.
529            // Abort, make a report of what we got, and start anew.
530            break;
531        } else if diff_time < 0 || diff_time > 63_750 {
532            let t = diff_time / 250;
533            assert!(t >= -32_765 && t <= 32_767);
534            (PacketStatus::ReceivedLargeOrNegativeDelta, t as i16)
535        } else {
536            let t = diff_time / 250;
537            assert!(t >= 0 && t <= 255);
538            (PacketStatus::ReceivedSmallDelta, t as i16)
539        };
540
541        interims.push_back(ChunkInterim::Received(index, status, time));
542        prev = (r.seq, r.time);
543    }
544}
545
546#[derive(Debug, Clone, Copy)]
547enum ChunkInterim {
548    Missing(u16), // max 2^13 (one run length)
549    Received(usize, PacketStatus, i16),
550}
551
552#[derive(Debug, Default)]
553struct ReceiveWindow {
554    /// The base seq num, set on the first receive.
555    base_seq: Option<SeqNo>,
556
557    /// The large seq num received.
558    max_seq: Option<SeqNo>,
559
560    /// The previous number of packets expected, used to calculate a delta for loss.
561    expected_prior: u64,
562
563    /// The total number of packets received
564    received: u64,
565
566    /// The previous number of packets received, used to calculate a delta for loss.
567    received_prior: u64,
568}
569
570impl ReceiveWindow {
571    fn record_seq(&mut self, seq: SeqNo) {
572        if self.base_seq.is_none() {
573            self.base_seq = Some(seq);
574        }
575
576        self.received += 1;
577        self.max_seq = self.max_seq.max(Some(seq));
578    }
579}
580
581impl ChunkInterim {
582    fn status(&self) -> PacketStatus {
583        match self {
584            ChunkInterim::Missing(_) => PacketStatus::NotReceived,
585            ChunkInterim::Received(_, s, _) => *s,
586        }
587    }
588
589    fn delta(&self) -> Option<Delta> {
590        match self {
591            ChunkInterim::Missing(_) => None,
592            ChunkInterim::Received(_, s, d) => match *s {
593                PacketStatus::ReceivedSmallDelta => Some(Delta::Small(*d as u8)),
594                PacketStatus::ReceivedLargeOrNegativeDelta => Some(Delta::Large(*d)),
595                _ => unreachable!(),
596            },
597        }
598    }
599
600    fn consume(&mut self, n: u16) -> bool {
601        match self {
602            ChunkInterim::Missing(c) => {
603                *c -= n;
604                *c == 0
605            }
606            ChunkInterim::Received(_, _, _) => {
607                assert!(n <= 1);
608                n == 1
609            }
610        }
611    }
612}
613
614#[derive(Debug, Clone, Copy, PartialEq, Eq)]
615pub enum PacketChunk {
616    Run(PacketStatus, u16), // 13 bit repeat
617    VectorSingle(u16, u16),
618    VectorDouble(u16, u16),
619}
620
621impl PacketChunk {
622    fn append_max<'a>(&self, iter: impl Iterator<Item = &'a ChunkInterim>) -> u16 {
623        let mut to_fill = *self;
624
625        let mut reached_end = true;
626
627        for i in iter {
628            if to_fill.free() == 0 {
629                reached_end = false;
630                break;
631            }
632
633            // The status is not possible to add in this chunk. This could be
634            // a large delta in a single, or a mismatching run.
635            if !to_fill.can_append_status(i.status()) {
636                reached_end = false;
637                break;
638            }
639
640            to_fill.append(i);
641        }
642
643        // As a special case, single/double must be completely filled. However if
644        // we reached the end of the interims, we can shift in "missing" to make
645        // them full.
646        if to_fill.must_be_full() && to_fill.free() > 0 && !reached_end {
647            return 0;
648        }
649
650        self.free() - to_fill.free()
651    }
652
653    fn append(&mut self, i: &ChunkInterim) -> u16 {
654        use ChunkInterim::*;
655        use PacketChunk::*;
656        let free = self.free();
657        match (self, i) {
658            (Run(s, n), Missing(c)) => {
659                if *s != PacketStatus::NotReceived {
660                    return 0;
661                }
662                let max = free.min(*c);
663                *n += max;
664                max
665            }
666            (Run(s, n), Received(_, s2, _)) => {
667                if *s != *s2 {
668                    return 0;
669                }
670                let max = free.min(1);
671                *n += max;
672                max
673            }
674            (VectorSingle(n, f), Missing(c)) => {
675                let max = free.min(*c);
676                *n <<= max;
677                *f += max;
678                max
679            }
680            (VectorSingle(n, f), Received(_, s2, _)) => {
681                if *s2 == PacketStatus::ReceivedLargeOrNegativeDelta {
682                    return 0;
683                }
684                let max = free.min(1);
685                if max == 1 {
686                    *n <<= 1;
687                    *n |= 1;
688                    *f += 1;
689                }
690                max
691            }
692            (VectorDouble(n, f), Missing(c)) => {
693                let max = free.min(*c);
694                *n <<= max * 2;
695                *f += max;
696                max
697            }
698            (VectorDouble(n, f), Received(_, s2, _)) => {
699                let max = free.min(1);
700                if max == 1 {
701                    *n <<= 2;
702                    *n |= *s2 as u16;
703                    *f += 1;
704                }
705                max
706            }
707        }
708    }
709
710    fn must_be_full(&self) -> bool {
711        match self {
712            PacketChunk::Run(_, _) => false,
713            PacketChunk::VectorSingle(_, _) => true,
714            PacketChunk::VectorDouble(_, _) => true,
715        }
716    }
717
718    fn free(&self) -> u16 {
719        match self {
720            PacketChunk::Run(_, n) => 8192 - *n,
721            PacketChunk::VectorSingle(_, filled) => 14 - *filled,
722            PacketChunk::VectorDouble(_, filled) => 7 - *filled,
723        }
724    }
725
726    fn write_to(&self, buf: &mut [u8]) {
727        let x = match self {
728            //     0                   1
729            //     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
730            //    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
731            //    |T| S |       Run Length        |
732            //    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
733            // chunk type (T):  1 bit A zero identifies this as a run length chunk.
734            // packet status symbol (S):  2 bits The symbol repeated in this run.
735            //             See above.
736            // run length (L):  13 bits An unsigned integer denoting the run length.
737            PacketChunk::Run(s, n) => {
738                let mut x = 0_u16;
739                x |= (*s as u16) << 13;
740                assert!(*n <= 8192);
741                x |= n;
742                x
743            }
744
745            // Corrected according to email exchange at the bottom..
746            //
747            //         0                   1
748            //         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
749            //        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
750            //        |T|S|       symbol list         |
751            //        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
752            //    chunk type (T):  1 bit A one identifies this as a status vector
753            //                chunk.
754            //    symbol size (S):  1 bit A zero means this vector contains only
755            //                "packet received" (1) and "packet not received" (0)
756            //                symbols.  This means we can compress each symbol to just
757            //                one bit, 14 in total.  A one means this vector contains
758            //                the normal 2-bit symbols, 7 in total.
759            //    symbol list:  14 bits A list of packet status symbols, 7 or 14 in
760            //                total.
761            PacketChunk::VectorSingle(n, fill) => {
762                assert!(*fill == 14);
763                let mut x: u16 = 1 << 15;
764                assert!(*n <= 16384);
765                x |= *n;
766                x
767            }
768            PacketChunk::VectorDouble(n, fill) => {
769                assert!(*fill == 7);
770                let mut x: u16 = 1 << 15;
771                assert!(*n <= 16384);
772                x |= 1 << 14;
773                x |= *n;
774                x
775            }
776        };
777        buf[..2].copy_from_slice(&x.to_be_bytes());
778    }
779
780    fn max_possible_status_count(&self) -> usize {
781        match self {
782            PacketChunk::Run(_, n) => *n as usize,
783            PacketChunk::VectorSingle(_, _) => 14,
784            PacketChunk::VectorDouble(_, _) => 7,
785        }
786    }
787
788    fn can_append_status(&self, status: PacketStatus) -> bool {
789        match self {
790            PacketChunk::Run(s, _) => *s == status,
791            PacketChunk::VectorSingle(_, _) => status != PacketStatus::ReceivedLargeOrNegativeDelta,
792            PacketChunk::VectorDouble(_, _) => true,
793        }
794    }
795}
796
797impl Delta {
798    fn write_to(&self, buf: &mut [u8]) -> usize {
799        match self {
800            Delta::Small(v) => {
801                buf[0] = *v;
802                1
803            }
804            Delta::Large(v) => {
805                buf[..2].copy_from_slice(&v.to_be_bytes());
806                2
807            }
808        }
809    }
810
811    fn byte_len(&self) -> usize {
812        match self {
813            Delta::Small(_) => 1,
814            Delta::Large(_) => 2,
815        }
816    }
817}
818
/// Status of a packet as reported in a TWCC feedback. The discriminants are the 2 bit symbols
/// used on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketStatus {
    /// The packet was not received.
    NotReceived = 0,
    /// The packet was received and has a small (one byte) delta.
    ReceivedSmallDelta = 1,
    /// The packet was received and has a large or negative (two byte) delta.
    ReceivedLargeOrNegativeDelta = 2,
    /// The remaining 2 bit symbol.
    Unknown = 3,
}
826
/// Receive time delta of a received packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delta {
    /// Delta that fits one unsigned byte.
    Small(u8),
    /// Delta that needs two bytes, possibly negative.
    Large(i16),
}
832
833impl From<PacketStatus> for u8 {
834    fn from(val: PacketStatus) -> Self {
835        val as usize as u8
836    }
837}
838
839impl From<u8> for PacketStatus {
840    fn from(v: u8) -> Self {
841        match v {
842            0b00 => Self::NotReceived,
843            0b01 => Self::ReceivedSmallDelta,
844            0b10 => Self::ReceivedLargeOrNegativeDelta,
845            _ => Self::Unknown,
846        }
847    }
848}
849
850impl<'a> TryFrom<&'a [u8]> for Twcc {
851    type Error = &'static str;
852
853    fn try_from(buf: &'a [u8]) -> Result<Self, Self::Error> {
854        if buf.len() < 16 {
855            return Err("Less than 16 bytes for start of Twcc");
856        }
857
858        let sender_ssrc = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]).into();
859        let ssrc = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]).into();
860        let base_seq = u16::from_be_bytes([buf[8], buf[9]]);
861        let status_count = u16::from_be_bytes([buf[10], buf[11]]);
862        let reference_time = u32::from_be_bytes([0, buf[12], buf[13], buf[14]]);
863        let feedback_count = buf[15];
864
865        let mut twcc = Twcc {
866            sender_ssrc,
867            ssrc,
868            base_seq,
869            status_count,
870            reference_time,
871            feedback_count,
872            chunks: VecDeque::new(),
873            delta: VecDeque::new(),
874        };
875
876        let mut todo = status_count as isize;
877        let mut buf = &buf[16..];
878        loop {
879            if todo <= 0 {
880                break;
881            }
882
883            let chunk: PacketChunk = buf.try_into()?;
884
885            todo -= chunk.max_possible_status_count() as isize;
886
887            twcc.chunks.push_back(chunk);
888            buf = &buf[2..];
889        }
890
891        if twcc.chunks.is_empty() {
892            return Ok(twcc);
893        }
894
895        fn read_delta_small(
896            buf: &[u8],
897            n: usize,
898        ) -> Result<impl Iterator<Item = Delta> + '_, &'static str> {
899            if buf.len() < n {
900                return Err("Not enough buf for small deltas");
901            }
902            Ok((0..n).map(|i| Delta::Small(buf[i])))
903        }
904
905        fn read_delta_large(
906            buf: &[u8],
907            n: usize,
908        ) -> Result<impl Iterator<Item = Delta> + '_, &'static str> {
909            if buf.len() < n * 2 {
910                return Err("Not enough buf for large deltas");
911            }
912            Ok((0..(n * 2))
913                .step_by(2)
914                .map(|i| Delta::Large(i16::from_be_bytes([buf[i], buf[i + 1]]))))
915        }
916
917        for c in &twcc.chunks {
918            match c {
919                PacketChunk::Run(PacketStatus::ReceivedSmallDelta, n) => {
920                    let n = *n as usize;
921                    twcc.delta.extend(read_delta_small(buf, n)?);
922                    buf = &buf[n..];
923                }
924                PacketChunk::Run(PacketStatus::ReceivedLargeOrNegativeDelta, n) => {
925                    let n = *n as usize;
926                    twcc.delta.extend(read_delta_large(buf, n)?);
927                    buf = &buf[n..];
928                }
929                PacketChunk::VectorSingle(v, _) => {
930                    let n = v.count_ones() as usize;
931                    twcc.delta.extend(read_delta_small(buf, n)?);
932                    buf = &buf[n..];
933                }
934                PacketChunk::VectorDouble(v, _) => {
935                    for n in (0..=12).step_by(2) {
936                        let x = (*v >> (12 - n)) & 0b11;
937                        match PacketStatus::from(x as u8) {
938                            PacketStatus::ReceivedSmallDelta => {
939                                twcc.delta.extend(read_delta_small(buf, 1)?);
940                                buf = &buf[1..];
941                            }
942                            PacketStatus::ReceivedLargeOrNegativeDelta => {
943                                twcc.delta.extend(read_delta_large(buf, 1)?);
944                                buf = &buf[2..];
945                            }
946                            _ => {}
947                        }
948                    }
949                }
950                _ => {}
951            }
952        }
953
954        Ok(twcc)
955    }
956}
957
958impl<'a> TryFrom<&'a [u8]> for PacketChunk {
959    type Error = &'static str;
960
961    fn try_from(buf: &'a [u8]) -> Result<Self, Self::Error> {
962        if buf.len() < 2 {
963            return Err("Less than 2 bytes for PacketChunk");
964        }
965
966        let x = u16::from_be_bytes([buf[0], buf[1]]);
967
968        let is_vec = (x & 0b1000_0000_0000_0000) > 0;
969
970        let p = if is_vec {
971            let is_double = (x & 0b0100_0000_0000_0000) > 0;
972            let n = x & 0b0011_1111_1111_1111;
973            if is_double {
974                PacketChunk::VectorDouble(n, 7)
975            } else {
976                PacketChunk::VectorSingle(n, 14)
977            }
978        } else {
979            let s: PacketStatus = ((x >> 13) as u8).into();
980            let n = x & 0b0001_1111_1111_1111;
981            PacketChunk::Run(s, n)
982        };
983
984        Ok(p)
985    }
986}
987
/// Register of the packets we have sent, to be matched against incoming TWCC reports.
///
/// Records are added with `register_seq` and updated with receive information by
/// `apply_report`. Only the newest `keep` records are retained.
#[derive(Debug)]
pub struct TwccSendRegister {
    /// How many send records to keep.
    keep: usize,

    /// Send records, oldest at the front. Entries beyond `keep` are popped from the front.
    queue: VecDeque<TwccSendRecord>,

    /// 0 offset for remote time in Twcc structs. Set when the first report is applied.
    time_zero: Option<Instant>,

    /// Last registered Twcc number. Used to extend the 16 bit sequence numbers in reports.
    last_registered: SeqNo,
}
1002
/// Borrowing iteration over the retained send records, in queue order (oldest first).
impl<'a> IntoIterator for &'a TwccSendRegister {
    type Item = &'a TwccSendRecord;
    type IntoIter = vec_deque::Iter<'a, TwccSendRecord>;

    fn into_iter(self) -> Self::IntoIter {
        // Delegate directly to the underlying queue.
        self.queue.iter()
    }
}
1011
/// Record for a send entry in twcc.
#[derive(Debug)]
pub struct TwccSendRecord {
    /// Twcc sequence number for a packet we sent.
    seq: SeqNo,

    /// The (local) time we sent the packet represented by seq.
    local_send_time: Instant,

    /// Size in bytes of the payload sent.
    size: u16,

    /// Receive information taken from a TWCC report. `None` until a report covering `seq`
    /// has been applied.
    recv_report: Option<TwccRecvReport>,
}
1026
1027impl TwccSendRecord {
1028    /// The twcc sequence number of the packet we sent.
1029    pub fn seq(&self) -> SeqNo {
1030        self.seq
1031    }
1032
1033    /// The time we sent the packet.
1034    pub fn local_send_time(&self) -> Instant {
1035        self.local_send_time
1036    }
1037
1038    pub fn size(&self) -> usize {
1039        self.size as usize
1040    }
1041
1042    /// The time indicated by the remote side for when they received the packet.
1043    pub fn remote_recv_time(&self) -> Option<Instant> {
1044        self.recv_report.as_ref().and_then(|r| r.remote_recv_time)
1045    }
1046
1047    /// The rtt time between sending the packet and receiving the twcc report response.
1048    pub fn rtt(&self) -> Option<Duration> {
1049        let recv_report = self.recv_report.as_ref()?;
1050        Some(recv_report.local_recv_time - self.local_send_time)
1051    }
1052}
1053
/// Receive information for one sent packet, created when a TWCC report is applied.
#[derive(Debug)]
pub struct TwccRecvReport {
    ///  The (local) time we received confirmation the other side received the seq.
    local_recv_time: Instant,

    /// The remote time the other side received the seq. A `None` here is counted as a lost
    /// packet by `TwccSendRegister::loss`.
    remote_recv_time: Option<Instant>,
}
1062
1063impl TwccSendRegister {
1064    pub fn new(keep: usize) -> Self {
1065        TwccSendRegister {
1066            keep,
1067            queue: VecDeque::new(),
1068            time_zero: None,
1069            last_registered: 0.into(),
1070        }
1071    }
1072
1073    pub fn register_seq(&mut self, seq: SeqNo, now: Instant, size: usize) {
1074        self.last_registered = seq;
1075        self.queue.push_back(TwccSendRecord {
1076            seq,
1077            local_send_time: now,
1078            // In practice the max sizes is constrained by the MTU and will max out around 1200
1079            // bytes, hence this cast is fine.
1080            size: size as u16,
1081            // The recv report, derived from TWCC feedback later.
1082            recv_report: None,
1083        });
1084        while self.queue.len() > self.keep {
1085            self.queue.pop_front();
1086        }
1087    }
1088
1089    /// Apply a TWCC RTCP report.
1090    ///
1091    /// Returns a range of the sequence numbers for the applied packets if the report was
1092    /// successfully applied.
1093    pub fn apply_report(&mut self, twcc: Twcc, now: Instant) -> Option<RangeInclusive<SeqNo>> {
1094        if self.time_zero.is_none() {
1095            self.time_zero = Some(now);
1096        }
1097
1098        let time_zero = self.time_zero.unwrap();
1099        let head_seq = self.queue.front().map(|r| r.seq)?;
1100
1101        let mut iter = twcc
1102            .into_iter(time_zero, self.last_registered)
1103            .skip_while(|(seq, _, _)| seq < &head_seq);
1104        let (first_seq_no, _, first_instant) = iter.next()?;
1105
1106        let mut iter2 = self.queue.iter_mut().skip_while(|r| *r.seq < *first_seq_no);
1107        let first_record = iter2.next()?;
1108
1109        fn update(
1110            now: Instant,
1111            r: &mut TwccSendRecord,
1112            seq: SeqNo,
1113            instant: Option<Instant>,
1114        ) -> bool {
1115            if r.seq != seq {
1116                return false;
1117            }
1118            let recv_report = TwccRecvReport {
1119                local_recv_time: now,
1120                remote_recv_time: instant,
1121            };
1122            r.recv_report = Some(recv_report);
1123
1124            true
1125        }
1126
1127        if first_record.seq != first_seq_no {
1128            // Old report for which we no longer have any send records.
1129            return None;
1130        }
1131
1132        let mut problematic_seq = None;
1133
1134        if !update(now, first_record, first_seq_no, first_instant) {
1135            problematic_seq = Some((first_record.seq, first_seq_no));
1136        }
1137
1138        let mut last_seq_no = first_seq_no;
1139        for ((seq, _, instant), record) in iter.zip(iter2) {
1140            if problematic_seq.is_some() {
1141                break;
1142            }
1143
1144            if !update(now, record, seq, instant) {
1145                problematic_seq = Some((record.seq, seq));
1146            }
1147            last_seq_no = seq;
1148        }
1149
1150        if let Some((record_seq, report_seq)) = problematic_seq {
1151            let queue_tail: Vec<_> = self.queue.iter().rev().take(100).collect();
1152            panic!(
1153                "Unexpected TWCC sequence when applying TWCC report. \
1154                    Send Record Seq({record_seq}) does not match Report Seq({report_seq}).\
1155                    \nLast 100 entires in queue: {queue_tail:?}."
1156            );
1157        }
1158
1159        Some(first_seq_no..=last_seq_no)
1160    }
1161
1162    pub fn send_record(&self, seq: SeqNo) -> Option<&TwccSendRecord> {
1163        let index = self.queue.binary_search_by_key(&seq, |r| r.seq).ok()?;
1164
1165        Some(&self.queue[index])
1166    }
1167
1168    /// Calculate the egress loss for given time window.
1169    ///
1170    /// **Note:** The register only keeps a limited number of records and using `duration` values
1171    /// larger than ~1-2 seconds is liable to be inaccurate since some packets sent might have already
1172    /// been evicted from the register.
1173    pub fn loss(&self, duration: Duration, now: Instant) -> Option<f32> {
1174        // Consider only packets in the span specified by the caller
1175        let lower_bound = now - duration;
1176
1177        let packets = self
1178            .queue
1179            .iter()
1180            .rev()
1181            // If there's ingress loss but no egress loss, there's a chance the TWCC reports
1182            // themselves are lost. In this case considering packets that haven't been reported as
1183            // lost will incorrectly conclude that there is in fact egress loss.
1184            .filter(|s| s.recv_report.is_some())
1185            .take_while(|s| s.local_send_time >= lower_bound);
1186
1187        let (total, lost) = packets.fold((0, 0), |(total, lost), s| {
1188            let was_lost = s
1189                .recv_report
1190                .as_ref()
1191                .map(|rr| rr.remote_recv_time.is_none())
1192                .unwrap_or(true);
1193
1194            (total + 1, lost + u64::from(was_lost))
1195        });
1196
1197        if total == 0 {
1198            return None;
1199        }
1200
1201        Some((lost as f32) / (total as f32))
1202    }
1203
1204    /// Get all send records in a range.
1205    pub fn send_records(
1206        &self,
1207        range: RangeInclusive<SeqNo>,
1208    ) -> Option<impl Iterator<Item = &TwccSendRecord>> {
1209        let first_index = self
1210            .queue
1211            .binary_search_by_key(range.start(), |r| r.seq)
1212            .ok()?;
1213
1214        let current = *range.start();
1215
1216        Some(TwccSendRecordsIter {
1217            range,
1218            index: first_index,
1219            current,
1220            queue: &self.queue,
1221        })
1222    }
1223}
1224
1225#[derive()]
1226struct TwccSendRecordsIter<'a> {
1227    range: RangeInclusive<SeqNo>,
1228    current: SeqNo,
1229    index: usize,
1230    queue: &'a VecDeque<TwccSendRecord>,
1231}
1232
1233impl<'a> Iterator for TwccSendRecordsIter<'a> {
1234    type Item = &'a TwccSendRecord;
1235
1236    fn next(&mut self) -> Option<Self::Item> {
1237        if self.current > *self.range.end() || self.current < *self.range.start() {
1238            return None;
1239        }
1240
1241        let item = &self.queue[self.index];
1242        assert!(self.current == item.seq);
1243        self.current.inc();
1244        self.index += 1;
1245
1246        Some(item)
1247    }
1248}
1249
1250// Below is a clarification of the RFC draft from an email exchange with Erik Språng (one of the authors).
1251//
1252// > I'm trying to implement the draft spec
1253// > https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
1254// > I found a number of errors/inconsistencies in the RFC, and wonder who I should address this to.
1255// > I think the RFC could benefit from another revision.
1256// >
1257// > There are three problems listed below.
1258// >
1259// > 1. There's a contradiction between section 3.1.1 and the example in 3.1.3. First the RFC tells
1260// > me 11 is Reserved, later it shows an example using 11 saying it is a run of packets received w/o
1261// > recv delta. Which one is right?
1262// >
1263// > Section 3.1.1
1264// > ...
1265// > The status of a packet is described using a 2-bit symbol:
1266// > ...
1267// > 11 [Reserved]
1268// >
1269// > Section 3.1.3
1270// > ...
1271// > Example 2:
1272// >
1273// >       0                   1
1274// >       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
1275// >      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1276// >      |0|1 1|0 0 0 0 0 0 0 0 1 1 0 0 0|
1277// >      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1278// >
1279// >
1280// >   This is a run of the "packet received, w/o recv delta" status of
1281// >   length 24.
1282//
1283// I believe this example is in error. Packets without receive deltas was a proposal for the v1
1284// protocol but was dropped iirc. Note that there is a newer header extension available, which
1285// when negotiated allows send-side control over when feedback is generated  - and that provides
1286// the option to omit all receive deltas from the feedback.
1287//
// > 2. In section 3.1.4 when using a 1-bit vector to indicate packet received or not received,
// > there's a contradiction between the definition and the example. The definition says
// > "packet received" (0) and "packet not received" (1), while the example is the opposite way
// > around: 0 is packet not received. Which way around is it?
1292// >
1293// > symbol size (S):  1 bit A zero means this vector contains only
1294// >               "packet received" (0) and "packet not received" (1)
1295// >               symbols.  This means we can compress each symbol to just
1296// >               one bit, 14 in total.  A one means this vector contains
1297// >               the normal 2-bit symbols, 7 in total.
1298// > ...
1299// > Example 1:
1300// >
1301// >        0                   1
1302// >        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
1303// >       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1304// >       |1|0|0 1 1 1 1 1 0 0 0 1 1 1 0 0|
1305// >       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1306// >
1307// >   This chunk contains, in order:
1308// >
1309// >      1x "packet not received"
1310// >
1311// >      5x "packet received"
1312//
1313// I believe the definition is wrong in this case. The intent is to just truncate the 2-bit values:
1314//
1315// 3.1.1.  Packet Status Symbols
1316//
1317//    The status of a packet is described using a 2-bit symbol:
1318//
1319//       00 Packet not received
1320//
1321//       01 Packet received, small delta
1322//
1323// So (0) for not received and (1) for received, small delta.
1324// This also matches what the libwebrtc source code does.
1325//
1326// > 3. In section 3.1.4 when using a 1-bit vector, the RFC doesn't say what a "packet received" in that
1327// > vector should be accompanied by in receive delta size. Is it an 8 bit delta or 16 bit
1328// > delta per "packet received"?
1329//
1330// Same as the question above, this is a truncation to (0) for not received and (1) for
1331// received, small delta.
1332
1333impl fmt::Debug for Twcc {
1334    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1335        f.debug_struct("Twcc")
1336            .field("sender_ssrc", &self.sender_ssrc)
1337            .field("ssrc", &self.ssrc)
1338            .field("base_seq", &self.base_seq)
1339            .field("status_count", &self.status_count)
1340            .field("reference_time", &self.reference_time)
1341            .field("feedback_count", &self.feedback_count)
1342            .field("chunks", &self.chunks)
1343            .field("delta", &self.delta.len())
1344            .finish()
1345    }
1346}
1347
1348#[allow(clippy::assign_op_pattern)]
1349#[cfg(test)]
1350mod test {
1351    use std::time::Duration;
1352
1353    use super::*;
1354
1355    use Delta::*;
1356    use PacketChunk::*;
1357    use PacketStatus::*;
1358
1359    #[test]
1360    fn register_write_parse_small_delta() {
1361        let mut reg = TwccRecvRegister::new(100);
1362
1363        let now = Instant::now();
1364
1365        reg.update_seq(10.into(), now + Duration::from_millis(0));
1366        reg.update_seq(11.into(), now + Duration::from_millis(12));
1367        reg.update_seq(12.into(), now + Duration::from_millis(23));
1368        reg.update_seq(13.into(), now + Duration::from_millis(43));
1369
1370        let report = reg.build_report(1000).unwrap();
1371        let mut buf = vec![0_u8; 1500];
1372        let n = report.write_to(&mut buf[..]);
1373        buf.truncate(n);
1374
1375        let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1376        let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1377
1378        assert_eq!(header, report.header());
1379        assert_eq!(parsed, report);
1380    }
1381
1382    #[test]
1383    fn register_write_parse_small_delta_missing() {
1384        let mut reg = TwccRecvRegister::new(100);
1385
1386        let now = Instant::now();
1387
1388        reg.update_seq(10.into(), now + Duration::from_millis(0));
1389        reg.update_seq(11.into(), now + Duration::from_millis(12));
1390        reg.update_seq(12.into(), now + Duration::from_millis(23));
1391        // 13 is not there
1392        reg.update_seq(14.into(), now + Duration::from_millis(43));
1393
1394        let report = reg.build_report(1000).unwrap();
1395        let mut buf = vec![0_u8; 1500];
1396        let n = report.write_to(&mut buf[..]);
1397        buf.truncate(n);
1398
1399        let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1400        let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1401
1402        assert_eq!(header, report.header());
1403        assert_eq!(parsed, report);
1404    }
1405
1406    #[test]
1407    fn register_write_parse_large_delta() {
1408        let mut reg = TwccRecvRegister::new(100);
1409
1410        let now = Instant::now();
1411
1412        reg.update_seq(10.into(), now + Duration::from_millis(0));
1413        reg.update_seq(11.into(), now + Duration::from_millis(70));
1414        reg.update_seq(12.into(), now + Duration::from_millis(140));
1415        reg.update_seq(13.into(), now + Duration::from_millis(210));
1416
1417        let report = reg.build_report(1000).unwrap();
1418        let mut buf = vec![0_u8; 1500];
1419        let n = report.write_to(&mut buf[..]);
1420        buf.truncate(n);
1421
1422        let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1423        let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1424
1425        assert_eq!(header, report.header());
1426        assert_eq!(parsed, report);
1427    }
1428
1429    #[test]
1430    fn register_write_parse_mixed_delta() {
1431        let mut reg = TwccRecvRegister::new(100);
1432
1433        let now = Instant::now();
1434
1435        reg.update_seq(10.into(), now + Duration::from_millis(0));
1436        reg.update_seq(11.into(), now + Duration::from_millis(12));
1437        reg.update_seq(12.into(), now + Duration::from_millis(140));
1438        reg.update_seq(13.into(), now + Duration::from_millis(152));
1439
1440        let report = reg.build_report(1000).unwrap();
1441        let mut buf = vec![0_u8; 1500];
1442        let n = report.write_to(&mut buf[..]);
1443        buf.truncate(n);
1444
1445        let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1446        let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1447
1448        assert_eq!(header, report.header());
1449        assert_eq!(parsed, report);
1450    }
1451
1452    #[test]
1453    fn too_big_time_gap_requires_two_reports() {
1454        let mut reg = TwccRecvRegister::new(100);
1455
1456        let now = Instant::now();
1457
1458        reg.update_seq(10.into(), now + Duration::from_millis(0));
1459        reg.update_seq(11.into(), now + Duration::from_millis(12));
1460        reg.update_seq(12.into(), now + Duration::from_millis(9000));
1461
1462        let _ = reg.build_report(1000).unwrap();
1463        let report2 = reg.build_report(1000).unwrap();
1464
1465        // 9000 milliseconds is not possible to set as exact reference time which
1466        // is in multiples of 64ms. 9000/64 = 140.625.
1467        assert_eq!(report2.reference_time, 140);
1468
1469        // 140 * 64 = 8960
1470        // So the first offset must be 40ms, i.e. 40_000us / 250us = 160
1471        assert_eq!(report2.delta[0], Small(160));
1472    }
1473
1474    #[test]
1475    fn report_padded_to_even_word() {
1476        let mut reg = TwccRecvRegister::new(100);
1477
1478        let now = Instant::now();
1479
1480        reg.update_seq(10.into(), now + Duration::from_millis(0));
1481
1482        let report = reg.build_report(1000).unwrap();
1483        let mut buf = vec![0_u8; 1500];
1484        let n = report.write_to(&mut buf[..]);
1485
1486        assert!(n % 4 == 0);
1487    }
1488
1489    #[test]
1490    fn report_truncated_to_max_byte_size() {
1491        let mut reg = TwccRecvRegister::new(100);
1492
1493        let now = Instant::now();
1494
1495        reg.update_seq(10.into(), now + Duration::from_millis(0));
1496        reg.update_seq(11.into(), now + Duration::from_millis(12));
1497        reg.update_seq(12.into(), now + Duration::from_millis(140));
1498        reg.update_seq(13.into(), now + Duration::from_millis(152));
1499
1500        let report = reg.build_report(28).unwrap();
1501
1502        assert_eq!(report.status_count, 2);
1503        assert_eq!(report.chunks, vec![Run(ReceivedSmallDelta, 2)]);
1504        assert_eq!(report.delta, vec![Small(0), Small(48)]);
1505
1506        let report = reg.build_report(28).unwrap();
1507
1508        assert_eq!(report.status_count, 2);
1509        assert_eq!(report.chunks, vec![Run(ReceivedSmallDelta, 2)]);
1510        assert_eq!(report.delta, vec![Small(48), Small(48)]);
1511    }
1512
1513    #[test]
1514    fn truncated_counts_gaps_correctly() {
1515        let mut reg = TwccRecvRegister::new(100);
1516
1517        let now = Instant::now();
1518
1519        reg.update_seq(10.into(), now + Duration::from_millis(0));
1520        // gap
1521        reg.update_seq(13.into(), now + Duration::from_millis(12));
1522        reg.update_seq(14.into(), now + Duration::from_millis(140));
1523        reg.update_seq(15.into(), now + Duration::from_millis(152));
1524
1525        let report = reg.build_report(32).unwrap();
1526
1527        assert_eq!(report.status_count, 4);
1528        assert_eq!(
1529            report.chunks,
1530            vec![
1531                Run(ReceivedSmallDelta, 1),
1532                Run(NotReceived, 2),
1533                Run(ReceivedSmallDelta, 1)
1534            ]
1535        );
1536        assert_eq!(report.delta, vec![Small(0), Small(48)]);
1537    }
1538
1539    #[test]
1540    fn run_max_is_8192() {
1541        let mut reg = TwccRecvRegister::new(100);
1542
1543        let now = Instant::now();
1544
1545        reg.update_seq(0.into(), now + Duration::from_millis(0));
1546        reg.update_seq(8194.into(), now + Duration::from_millis(10));
1547
1548        let report = reg.build_report(1000).unwrap();
1549
1550        assert_eq!(report.status_count, 8195);
1551        assert_eq!(
1552            report.chunks,
1553            vec![
1554                VectorSingle(8192, 14),
1555                Run(NotReceived, 8180),
1556                Run(ReceivedSmallDelta, 1)
1557            ]
1558        );
1559    }
1560
1561    #[test]
1562    fn single_followed_by_missing() {
1563        let mut reg = TwccRecvRegister::new(100);
1564
1565        let now = Instant::now();
1566
1567        reg.update_seq(10.into(), now + Duration::from_millis(0));
1568        reg.update_seq(12.into(), now + Duration::from_millis(10));
1569        reg.update_seq(100.into(), now + Duration::from_millis(20));
1570
1571        let report = reg.build_report(2016).unwrap();
1572
1573        assert_eq!(report.status_count, 91);
1574        assert_eq!(
1575            report.chunks,
1576            vec![
1577                VectorSingle(10240, 14),
1578                Run(NotReceived, 76),
1579                Run(ReceivedSmallDelta, 1)
1580            ]
1581        );
1582        assert_eq!(report.delta, vec![Small(0), Small(40), Small(40)]);
1583    }
1584
1585    #[test]
1586    fn time_jump_small_back_for_second_report() {
1587        let mut reg = TwccRecvRegister::new(100);
1588
1589        let now = Instant::now();
1590
1591        reg.update_seq(10.into(), now + Duration::from_millis(8000));
1592        let _ = reg.build_report(2016).unwrap();
1593
1594        reg.update_seq(9.into(), now + Duration::from_millis(0));
1595        let report = reg.build_report(2016).unwrap();
1596
1597        assert_eq!(report.status_count, 2);
1598        assert_eq!(report.chunks, vec![Run(ReceivedLargeOrNegativeDelta, 2)]);
1599        assert_eq!(report.delta, vec![Large(-32000), Large(32000)]);
1600    }
1601
1602    #[test]
1603    fn time_jump_large_back_for_second_report() {
1604        let mut reg = TwccRecvRegister::new(100);
1605
1606        let now = Instant::now();
1607
1608        reg.update_seq(10.into(), now + Duration::from_millis(9000));
1609        let _ = reg.build_report(2016).unwrap();
1610
1611        reg.update_seq(9.into(), now + Duration::from_millis(0));
1612        assert!(reg.build_report(2016).is_none());
1613
1614        assert_eq!(reg.queue.len(), 1);
1615    }
1616
1617    #[test]
1618    fn empty_twcc() {
1619        let twcc = Twcc {
1620            sender_ssrc: 0.into(),
1621            ssrc: 0.into(),
1622            base_seq: 0,
1623            status_count: 0,
1624            reference_time: 0,
1625            feedback_count: 0,
1626            chunks: VecDeque::new(),
1627            delta: VecDeque::new(),
1628        };
1629
1630        let mut buf = vec![0_u8; 1500];
1631        let n = twcc.write_to(&mut buf[..]);
1632        buf.truncate(n);
1633
1634        let header: RtcpHeader = (&buf[..]).try_into().unwrap();
1635        let parsed: Twcc = (&buf[4..]).try_into().unwrap();
1636
1637        assert_eq!(header, twcc.header());
1638        assert_eq!(parsed, twcc);
1639    }
1640
1641    #[test]
1642    fn negative_deltas() {
1643        let mut reg = TwccRecvRegister::new(100);
1644
1645        let now = Instant::now();
1646
1647        reg.update_seq(10.into(), now + Duration::from_millis(12));
1648        reg.update_seq(11.into(), now + Duration::from_millis(0));
1649        reg.update_seq(12.into(), now + Duration::from_millis(23));
1650
1651        let report = reg.build_report(1000).unwrap();
1652
1653        assert_eq!(report.status_count, 3);
1654        assert_eq!(report.base_seq, 10);
1655        assert_eq!(report.reference_time, 0);
1656        assert_eq!(report.chunks, vec![VectorDouble(6400, 7)]);
1657        assert_eq!(report.delta, vec![Small(0), Large(-48), Small(92)]);
1658
1659        let base = reg.time_start.unwrap();
1660
1661        let mut iter = report.into_iter(base, 10.into());
1662        assert_eq!(
1663            iter.next(),
1664            Some((
1665                10.into(),
1666                PacketStatus::ReceivedSmallDelta,
1667                Some(base + Duration::from_millis(0))
1668            ))
1669        );
1670        assert_eq!(
1671            iter.next(),
1672            Some((
1673                11.into(),
1674                PacketStatus::ReceivedLargeOrNegativeDelta,
1675                Some(base.checked_sub(Duration::from_millis(12)).unwrap())
1676            ))
1677        );
1678        assert_eq!(
1679            iter.next(),
1680            Some((
1681                12.into(),
1682                PacketStatus::ReceivedSmallDelta,
1683                Some(base + Duration::from_millis(11))
1684            ))
1685        );
1686    }
1687
1688    #[test]
1689    fn twcc_fuzz_fail() {
1690        let mut reg = TwccRecvRegister::new(100);
1691
1692        let now = Instant::now();
1693
1694        // [Register(, ), Register(, ), Register(, ), BuildReport(43)]
1695
1696        reg.update_seq(4542.into(), now + Duration::from_millis(2373281424));
1697        reg.update_seq(15918.into(), now + Duration::from_millis(2373862820));
1698        reg.update_seq(8405.into(), now + Duration::from_millis(2379074367));
1699
1700        let report = reg.build_report(43).unwrap();
1701
1702        let mut buf = vec![0_u8; 1500];
1703        let n = report.write_to(&mut buf[..]);
1704        buf.truncate(n);
1705
1706        let header: RtcpHeader = match (&buf[..]).try_into() {
1707            Ok(v) => v,
1708            Err(_) => return,
1709        };
1710        let parsed: Twcc = match (&buf[4..]).try_into() {
1711            Ok(v) => v,
1712            Err(_) => return,
1713        };
1714
1715        assert_eq!(header, report.header());
1716        assert_eq!(parsed, report);
1717    }
1718
1719    #[test]
1720    fn test_send_register_apply_report_for_old_seq_numbers() {
1721        let mut reg = TwccSendRegister::new(25);
1722        let mut now = Instant::now();
1723
1724        for i in 0..50 {
1725            reg.register_seq(i.into(), now, 0);
1726            now = now + Duration::from_micros(15);
1727        }
1728
1729        // At this point the front of the internal queue should be seq no 25.
1730        //
1731        // Set time zero base with empty packet
1732        reg.apply_report(
1733            Twcc {
1734                sender_ssrc: Ssrc::new(),
1735                ssrc: Ssrc::new(),
1736                base_seq: 0,
1737                status_count: 0,
1738                reference_time: 0,
1739                feedback_count: 0,
1740                chunks: [].into(),
1741                delta: [].into(),
1742            },
1743            now,
1744        );
1745        now = now + Duration::from_millis(35);
1746
1747        reg.apply_report(
1748            Twcc {
1749                sender_ssrc: Ssrc::new(),
1750                ssrc: Ssrc::new(),
1751                base_seq: 20,
1752                status_count: 8,
1753                reference_time: 35,
1754                feedback_count: 0,
1755                chunks: [PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 8)].into(),
1756                delta: [
1757                    Delta::Small(10),
1758                    Delta::Small(10),
1759                    Delta::Small(10),
1760                    Delta::Small(10),
1761                    Delta::Small(10),
1762                    Delta::Small(10),
1763                    Delta::Small(10),
1764                    Delta::Small(10),
1765                ]
1766                .into(),
1767            },
1768            now,
1769        );
1770
1771        for seq in 25..=27 {
1772            let record = reg
1773                .send_record(seq.into())
1774                .unwrap_or_else(|| panic!("Should have send record for seq {seq}"));
1775
1776            assert!(
1777                record.recv_report.is_some(),
1778                "Report should have recorded recv_report for {seq}"
1779            );
1780        }
1781    }
1782
1783    #[test]
1784    fn test_twcc_iter_correct_deltas() {
1785        let twcc = Twcc {
1786            sender_ssrc: 0.into(),
1787            ssrc: 0.into(),
1788            base_seq: 1,
1789            status_count: 12,
1790            reference_time: 1337,
1791            feedback_count: 0,
1792            chunks: [
1793                PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 2),
1794                PacketChunk::VectorDouble(0b1101_0010_0101_0000, 7),
1795                PacketChunk::Run(PacketStatus::ReceivedLargeOrNegativeDelta, 3),
1796            ]
1797            .into(),
1798            delta: [
1799                // Run of length 2
1800                Delta::Small(10),
1801                Delta::Small(15),
1802                // Double status vector with 4 deltas
1803                Delta::Small(7),
1804                Delta::Large(280),
1805                Delta::Small(3),
1806                Delta::Small(13),
1807                // Run of length 3
1808                Delta::Large(-37),
1809                Delta::Large(32),
1810                Delta::Large(89),
1811            ]
1812            .into(),
1813        };
1814
1815        let now = Instant::now();
1816        let base = now + Duration::from_millis(1337 * 64);
1817        let expected = vec![
1818            (
1819                1.into(),
1820                PacketStatus::ReceivedSmallDelta,
1821                Some(base + Duration::from_micros(10 * 250)),
1822            ),
1823            (
1824                2.into(),
1825                PacketStatus::ReceivedSmallDelta,
1826                Some(base + Duration::from_micros(25 * 250)),
1827            ),
1828            (
1829                3.into(),
1830                PacketStatus::ReceivedSmallDelta,
1831                Some(base + Duration::from_micros(32 * 250)),
1832            ),
1833            (4.into(), PacketStatus::NotReceived, None),
1834            (
1835                5.into(),
1836                PacketStatus::ReceivedLargeOrNegativeDelta,
1837                Some(base + Duration::from_micros(312 * 250)),
1838            ),
1839            (
1840                6.into(),
1841                PacketStatus::ReceivedSmallDelta,
1842                Some(base + Duration::from_micros(315 * 250)),
1843            ),
1844            (
1845                7.into(),
1846                PacketStatus::ReceivedSmallDelta,
1847                Some(base + Duration::from_micros(328 * 250)),
1848            ),
1849            (8.into(), PacketStatus::NotReceived, None),
1850            (9.into(), PacketStatus::NotReceived, None),
1851            (
1852                10.into(),
1853                PacketStatus::ReceivedLargeOrNegativeDelta,
1854                Some(base + Duration::from_micros(291 * 250)),
1855            ),
1856            (
1857                11.into(),
1858                PacketStatus::ReceivedLargeOrNegativeDelta,
1859                Some(base + Duration::from_micros(323 * 250)),
1860            ),
1861            (
1862                12.into(),
1863                PacketStatus::ReceivedLargeOrNegativeDelta,
1864                Some(base + Duration::from_micros(412 * 250)),
1865            ),
1866        ];
1867
1868        let result: Vec<_> = twcc.into_iter(now, 1.into()).collect();
1869
1870        assert_eq!(result, expected);
1871    }
1872
1873    #[test]
1874    fn test_twcc_register_send_records() {
1875        let mut reg = TwccSendRegister::new(25);
1876        let mut now = Instant::now();
1877        for i in 0..25 {
1878            reg.register_seq(i.into(), now, 0);
1879            now = now + Duration::from_micros(15);
1880        }
1881
1882        let range = reg
1883            .apply_report(
1884                Twcc {
1885                    sender_ssrc: Ssrc::new(),
1886                    ssrc: Ssrc::new(),
1887                    base_seq: 0,
1888                    status_count: 8,
1889                    reference_time: 35,
1890                    feedback_count: 0,
1891                    chunks: [PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 8)].into(),
1892                    delta: [
1893                        Delta::Small(10),
1894                        Delta::Small(10),
1895                        Delta::Small(10),
1896                        Delta::Small(10),
1897                        Delta::Small(10),
1898                        Delta::Small(10),
1899                        Delta::Small(10),
1900                        Delta::Small(10),
1901                    ]
1902                    .into(),
1903                },
1904                now,
1905            )
1906            .expect("apply_report to return Some(_)");
1907
1908        assert_eq!(range, 0.into()..=7.into());
1909
1910        let iter = reg
1911            .send_records(range)
1912            .expect("send_records to return Some(_)");
1913        assert_eq!(
1914            iter.map(|r| *r.seq).collect::<Vec<_>>(),
1915            vec![0, 1, 2, 3, 4, 5, 6, 7]
1916        );
1917    }
1918
1919    #[test]
1920    fn test_twcc_send_register_loss() {
1921        let mut reg = TwccSendRegister::new(25);
1922        let mut now = Instant::now();
1923        for i in 0..9 {
1924            reg.register_seq(i.into(), now, 0);
1925            now = now + Duration::from_millis(15);
1926        }
1927
1928        now = now + Duration::from_millis(5);
1929        reg.apply_report(
1930            Twcc {
1931                sender_ssrc: Ssrc::new(),
1932                ssrc: Ssrc::new(),
1933                base_seq: 0,
1934                status_count: 9,
1935                reference_time: 35,
1936                feedback_count: 0,
1937                chunks: [
1938                    PacketChunk::VectorDouble(0b11_01_01_01_00_01_00_01, 7),
1939                    PacketChunk::Run(PacketStatus::ReceivedSmallDelta, 2),
1940                ]
1941                .into(),
1942                delta: [
1943                    Delta::Small(10),
1944                    Delta::Small(10),
1945                    Delta::Small(10),
1946                    Delta::Small(10),
1947                    Delta::Small(10),
1948                    Delta::Small(10),
1949                    Delta::Small(10),
1950                ]
1951                .into(),
1952            },
1953            now,
1954        )
1955        .expect("apply_report to return Some(_)");
1956
1957        now = now + Duration::from_millis(20);
1958        let loss = reg
1959            .loss(Duration::from_millis(150), now)
1960            .expect("Should be able to calcualte loss");
1961
1962        let pct = (loss * 100.0).floor() as u32;
1963
1964        assert_eq!(
1965            pct, 25,
1966            "The loss percentage should be 25 as 2 out of 8 packets are lost"
1967        );
1968    }
1969
1970    #[test]
1971    fn test_twcc_recv_register_loss() {
1972        let mut reg = TwccRecvRegister::new(25);
1973        let mut now = Instant::now();
1974
1975        for i in 0..10 {
1976            if i == 3 || i == 7 {
1977                // simulate loss
1978                continue;
1979            }
1980            reg.update_seq(i.into(), now);
1981            now = now + Duration::from_millis(50);
1982        }
1983
1984        assert_eq!(reg.loss(), Some(2.0 / 10.0));
1985
1986        for i in 10..20 {
1987            if i == 11 || i == 13 || i == 15 || i == 17 {
1988                // simulate loss
1989                continue;
1990            }
1991            reg.update_seq(i.into(), now);
1992            now = now + Duration::from_millis(50);
1993        }
1994
1995        assert_eq!(reg.loss(), Some(4.0 / 10.0));
1996    }
1997}