sfu/interceptor/report/
receiver_stream.rs

1use std::time::Instant;
2
/// Per-stream reception state used to build RTCP receiver reports.
///
/// The state is fed by incoming RTP packets and sender reports, and is read
/// when a receiver report is generated.
pub(crate) struct ReceiverStream {
    /// SSRC of the media stream being received; placed in the reception report.
    ssrc: u32,
    /// SSRC this receiver reports under; chosen randomly at construction.
    receiver_ssrc: u32,
    /// RTP clock rate, kept as `f64` because it scales elapsed seconds in the
    /// jitter computation.
    clock_rate: f64,

    /// Bitmap of recently received sequence numbers, one bit per packet.
    packets: Vec<u64>,
    /// `false` until the first RTP packet has been processed.
    started: bool,
    /// Number of times the 16-bit sequence number has been seen to wrap.
    seq_num_cycles: u16,
    /// Highest sequence number accepted so far (stored widened to `i32`).
    last_seq_num: i32,
    /// Value of `last_seq_num` when the previous report was generated.
    last_report_seq_num: i32,
    /// RTP timestamp of the most recently processed packet.
    last_rtp_time_rtp: u32,
    /// Local arrival time of the most recently processed packet.
    last_rtp_time_time: Instant,
    /// Running interarrival jitter estimate, in RTP clock units.
    jitter: f64,
    /// `ntp_time >> 16` (truncated to 32 bits) of the last sender report
    /// processed; `0` until one has been processed.
    last_sender_report: u32,
    /// Local time at which the last sender report was processed.
    last_sender_report_time: Instant,
    /// Running count of lost packets, capped to 24 bits when reported.
    total_lost: u32,
}
20
21impl ReceiverStream {
22    pub(crate) fn new(ssrc: u32, clock_rate: u32) -> Self {
23        Self {
24            ssrc,
25            receiver_ssrc: rand::random::<u32>(),
26            clock_rate: clock_rate as f64,
27
28            packets: vec![0u64; 128],
29            started: false,
30            seq_num_cycles: 0,
31            last_seq_num: 0,
32            last_report_seq_num: 0,
33            last_rtp_time_rtp: 0,
34            last_rtp_time_time: Instant::now(),
35            jitter: 0.0,
36            last_sender_report: 0,
37            last_sender_report_time: Instant::now(),
38            total_lost: 0,
39        }
40    }
41
42    fn set_received(&mut self, seq: u16) {
43        let pos = (seq as usize) % self.packets.len();
44        self.packets[pos / 64] |= 1 << (pos % 64);
45    }
46
47    fn del_received(&mut self, seq: u16) {
48        let pos = (seq as usize) % self.packets.len();
49        self.packets[pos / 64] &= u64::MAX ^ (1u64 << (pos % 64));
50    }
51
52    fn get_received(&self, seq: u16) -> bool {
53        let pos = (seq as usize) % self.packets.len();
54        (self.packets[pos / 64] & (1 << (pos % 64))) != 0
55    }
56
57    pub(crate) fn process_rtp(&mut self, now: Instant, pkt: &rtp::packet::Packet) {
58        if !self.started {
59            // first frame
60            self.started = true;
61            self.set_received(pkt.header.sequence_number);
62            self.last_seq_num = pkt.header.sequence_number as i32;
63            self.last_report_seq_num = pkt.header.sequence_number as i32 - 1;
64        } else {
65            // following frames
66            self.set_received(pkt.header.sequence_number);
67
68            let diff = pkt.header.sequence_number as i32 - self.last_seq_num;
69            if !(-0x0FFF..=0).contains(&diff) {
70                // overflow
71                if diff < -0x0FFF {
72                    self.seq_num_cycles += 1;
73                }
74
75                // set missing packets as missing
76                for i in self.last_seq_num + 1..pkt.header.sequence_number as i32 {
77                    self.del_received(i as u16);
78                }
79
80                self.last_seq_num = pkt.header.sequence_number as i32;
81            }
82
83            // compute jitter
84            // https://tools.ietf.org/html/rfc3550#page-39
85            let d = now.duration_since(self.last_rtp_time_time).as_secs_f64() * self.clock_rate
86                - (pkt.header.timestamp as f64 - self.last_rtp_time_rtp as f64);
87            self.jitter += (d.abs() - self.jitter) / 16.0;
88        }
89
90        self.last_rtp_time_rtp = pkt.header.timestamp;
91        self.last_rtp_time_time = now;
92    }
93
94    pub(crate) fn process_sender_report(
95        &mut self,
96        now: Instant,
97        sr: &rtcp::sender_report::SenderReport,
98    ) {
99        self.last_sender_report = (sr.ntp_time >> 16) as u32;
100        self.last_sender_report_time = now;
101    }
102
103    pub(crate) fn generate_report(
104        &mut self,
105        now: Instant,
106    ) -> rtcp::receiver_report::ReceiverReport {
107        let total_since_report = (self.last_seq_num - self.last_report_seq_num) as u16;
108        let mut total_lost_since_report = {
109            if self.last_seq_num == self.last_report_seq_num {
110                0
111            } else {
112                let mut ret = 0u32;
113                let mut i = (self.last_report_seq_num + 1) as u16;
114                while i != self.last_seq_num as u16 {
115                    if !self.get_received(i) {
116                        ret += 1;
117                    }
118                    i = i.wrapping_add(1);
119                }
120                ret
121            }
122        };
123
124        self.total_lost += total_lost_since_report;
125
126        // allow up to 24 bits
127        if total_lost_since_report > 0xFFFFFF {
128            total_lost_since_report = 0xFFFFFF;
129        }
130        if self.total_lost > 0xFFFFFF {
131            self.total_lost = 0xFFFFFF
132        }
133
134        let r = rtcp::receiver_report::ReceiverReport {
135            ssrc: self.receiver_ssrc,
136            reports: vec![rtcp::reception_report::ReceptionReport {
137                ssrc: self.ssrc,
138                last_sequence_number: (self.seq_num_cycles as u32) << 16
139                    | (self.last_seq_num as u32),
140                last_sender_report: self.last_sender_report,
141                fraction_lost: ((total_lost_since_report * 256) as f64 / total_since_report as f64)
142                    as u8,
143                total_lost: self.total_lost,
144                delay: (now
145                    .duration_since(self.last_sender_report_time)
146                    .as_secs_f64()
147                    * 65536.0) as u32,
148                jitter: self.jitter as u32,
149            }],
150            ..Default::default()
151        };
152
153        self.last_report_seq_num = self.last_seq_num;
154
155        r
156    }
157}