sfu/interceptor/report/
receiver_stream.rs1use std::time::Instant;
2
3pub(crate) struct ReceiverStream {
4 ssrc: u32,
5 receiver_ssrc: u32,
6 clock_rate: f64,
7
8 packets: Vec<u64>,
9 started: bool,
10 seq_num_cycles: u16,
11 last_seq_num: i32,
12 last_report_seq_num: i32,
13 last_rtp_time_rtp: u32,
14 last_rtp_time_time: Instant,
15 jitter: f64,
16 last_sender_report: u32,
17 last_sender_report_time: Instant,
18 total_lost: u32,
19}
20
21impl ReceiverStream {
22 pub(crate) fn new(ssrc: u32, clock_rate: u32) -> Self {
23 Self {
24 ssrc,
25 receiver_ssrc: rand::random::<u32>(),
26 clock_rate: clock_rate as f64,
27
28 packets: vec![0u64; 128],
29 started: false,
30 seq_num_cycles: 0,
31 last_seq_num: 0,
32 last_report_seq_num: 0,
33 last_rtp_time_rtp: 0,
34 last_rtp_time_time: Instant::now(),
35 jitter: 0.0,
36 last_sender_report: 0,
37 last_sender_report_time: Instant::now(),
38 total_lost: 0,
39 }
40 }
41
42 fn set_received(&mut self, seq: u16) {
43 let pos = (seq as usize) % self.packets.len();
44 self.packets[pos / 64] |= 1 << (pos % 64);
45 }
46
47 fn del_received(&mut self, seq: u16) {
48 let pos = (seq as usize) % self.packets.len();
49 self.packets[pos / 64] &= u64::MAX ^ (1u64 << (pos % 64));
50 }
51
52 fn get_received(&self, seq: u16) -> bool {
53 let pos = (seq as usize) % self.packets.len();
54 (self.packets[pos / 64] & (1 << (pos % 64))) != 0
55 }
56
57 pub(crate) fn process_rtp(&mut self, now: Instant, pkt: &rtp::packet::Packet) {
58 if !self.started {
59 self.started = true;
61 self.set_received(pkt.header.sequence_number);
62 self.last_seq_num = pkt.header.sequence_number as i32;
63 self.last_report_seq_num = pkt.header.sequence_number as i32 - 1;
64 } else {
65 self.set_received(pkt.header.sequence_number);
67
68 let diff = pkt.header.sequence_number as i32 - self.last_seq_num;
69 if !(-0x0FFF..=0).contains(&diff) {
70 if diff < -0x0FFF {
72 self.seq_num_cycles += 1;
73 }
74
75 for i in self.last_seq_num + 1..pkt.header.sequence_number as i32 {
77 self.del_received(i as u16);
78 }
79
80 self.last_seq_num = pkt.header.sequence_number as i32;
81 }
82
83 let d = now.duration_since(self.last_rtp_time_time).as_secs_f64() * self.clock_rate
86 - (pkt.header.timestamp as f64 - self.last_rtp_time_rtp as f64);
87 self.jitter += (d.abs() - self.jitter) / 16.0;
88 }
89
90 self.last_rtp_time_rtp = pkt.header.timestamp;
91 self.last_rtp_time_time = now;
92 }
93
94 pub(crate) fn process_sender_report(
95 &mut self,
96 now: Instant,
97 sr: &rtcp::sender_report::SenderReport,
98 ) {
99 self.last_sender_report = (sr.ntp_time >> 16) as u32;
100 self.last_sender_report_time = now;
101 }
102
103 pub(crate) fn generate_report(
104 &mut self,
105 now: Instant,
106 ) -> rtcp::receiver_report::ReceiverReport {
107 let total_since_report = (self.last_seq_num - self.last_report_seq_num) as u16;
108 let mut total_lost_since_report = {
109 if self.last_seq_num == self.last_report_seq_num {
110 0
111 } else {
112 let mut ret = 0u32;
113 let mut i = (self.last_report_seq_num + 1) as u16;
114 while i != self.last_seq_num as u16 {
115 if !self.get_received(i) {
116 ret += 1;
117 }
118 i = i.wrapping_add(1);
119 }
120 ret
121 }
122 };
123
124 self.total_lost += total_lost_since_report;
125
126 if total_lost_since_report > 0xFFFFFF {
128 total_lost_since_report = 0xFFFFFF;
129 }
130 if self.total_lost > 0xFFFFFF {
131 self.total_lost = 0xFFFFFF
132 }
133
134 let r = rtcp::receiver_report::ReceiverReport {
135 ssrc: self.receiver_ssrc,
136 reports: vec![rtcp::reception_report::ReceptionReport {
137 ssrc: self.ssrc,
138 last_sequence_number: (self.seq_num_cycles as u32) << 16
139 | (self.last_seq_num as u32),
140 last_sender_report: self.last_sender_report,
141 fraction_lost: ((total_lost_since_report * 256) as f64 / total_since_report as f64)
142 as u8,
143 total_lost: self.total_lost,
144 delay: (now
145 .duration_since(self.last_sender_report_time)
146 .as_secs_f64()
147 * 65536.0) as u32,
148 jitter: self.jitter as u32,
149 }],
150 ..Default::default()
151 };
152
153 self.last_report_seq_num = self.last_seq_num;
154
155 r
156 }
157}