1use crate::{NtpTimestamp, RtpPacket};
2use jitter_buffer::{guess_timestamp, JitterBuffer};
3use rtcp_types::{
4 CompoundBuilder, ReceiverReport, ReportBlock, RtcpPacketWriterExt, RtcpWriteError, SdesBuilder,
5 SdesChunkBuilder, SdesItem, SdesItemBuilder, SenderReport,
6};
7use std::time::{Duration, Instant};
8use time::ext::InstantExt;
9
10mod jitter_buffer;
11
/// How far behind "now" the playout point of the jitter buffer lies: `pop_rtp` only considers
/// RTP timestamps that map to an instant at least this long ago.
const JITTERBUFFER_LENGTH: Duration = Duration::from_millis(100);
13
14pub struct Session {
19 ssrc: u32,
20 clock_rate: u32,
21 cname: String,
22 sender: Option<SenderState>,
23 receiver: Vec<ReceiverStatus>,
24}
25
26struct SenderState {
27 ntp_timestamp: NtpTimestamp,
28 rtp_timestamp: u64,
29
30 sender_pkg_count: u32,
31 sender_octet_count: u32,
32}
33
34#[derive(Default)]
35struct ReceiverStatus {
36 ssrc: u32,
37
38 jitter_buffer: JitterBuffer,
39
40 last_rtp_received: Option<(Instant, u64)>,
41 jitter: f32,
42
43 last_sr: Option<NtpTimestamp>,
44 total_lost: u64,
45}
46
47impl Session {
48 pub fn new(ssrc: u32, clock_rate: u32, cname: String) -> Self {
49 Self {
50 ssrc,
51 clock_rate,
52 cname,
53 sender: None,
54 receiver: vec![],
55 }
56 }
57
58 pub fn ssrc(&self) -> u32 {
60 self.ssrc
61 }
62
63 pub fn clock_rate(&self) -> u32 {
65 self.clock_rate
66 }
67
68 pub fn send_rtp(&mut self, packet: &RtpPacket) {
70 let packet = packet.get();
71
72 let sender_status = self.sender.get_or_insert(SenderState {
73 ntp_timestamp: NtpTimestamp::ZERO,
74 rtp_timestamp: 0,
75
76 sender_pkg_count: 0,
77 sender_octet_count: 0,
78 });
79
80 sender_status.ntp_timestamp = NtpTimestamp::now();
81 sender_status.rtp_timestamp =
82 guess_timestamp(sender_status.rtp_timestamp, packet.timestamp());
83
84 sender_status.sender_pkg_count += 1;
85 sender_status.sender_octet_count += packet.payload_len() as u32;
86 }
87
88 pub fn recv_rtp(&mut self, rtp_packet: RtpPacket) {
92 let packet = rtp_packet.get();
93
94 let receiver_status = if let Some(receiver_status) =
95 self.receiver.iter_mut().find(|r| r.ssrc == packet.ssrc())
96 {
97 receiver_status
98 } else {
99 self.receiver.push(ReceiverStatus {
100 ssrc: packet.ssrc(),
101 jitter_buffer: JitterBuffer::default(),
102 last_rtp_received: None,
103 jitter: 0.0,
104 last_sr: None,
105 total_lost: 0,
106 });
107
108 self.receiver.last_mut().unwrap()
109 };
110
111 let now = Instant::now();
112
113 let timestamp = if let Some((last_rtp_instant, last_rtp_timestamp)) =
115 receiver_status.last_rtp_received
116 {
117 let a = now - last_rtp_instant;
119 let a = (a.as_secs_f32() * self.clock_rate as f32) as i64;
120
121 let b = packet.timestamp() as i64 - last_rtp_timestamp as i64;
123
124 let d = a.abs_diff(b);
126
127 receiver_status.jitter =
128 receiver_status.jitter + ((d as f32).abs() - receiver_status.jitter) / 16.;
129
130 guess_timestamp(last_rtp_timestamp, packet.timestamp())
131 } else {
132 packet.timestamp() as u64
133 };
134
135 receiver_status.last_rtp_received = Some((now, timestamp));
136
137 receiver_status.jitter_buffer.push(rtp_packet);
138 }
139
140 pub fn pop_rtp(&mut self) -> Option<RtpPacket> {
141 let pop_earliest = Instant::now() - JITTERBUFFER_LENGTH;
142
143 for receiver in &mut self.receiver {
144 let Some((last_rtp_received_instant, last_rtp_received_timestamp)) =
145 receiver.last_rtp_received
146 else {
147 continue;
148 };
149
150 let max_timestamp = map_instant_to_rtp_timestamp(
151 last_rtp_received_instant,
152 last_rtp_received_timestamp,
153 self.clock_rate,
154 pop_earliest,
155 );
156
157 if let Some(packet) = receiver.jitter_buffer.pop(max_timestamp) {
158 return Some(packet);
159 }
160 }
161
162 None
163 }
164
165 pub fn recv_rtcp(&mut self, packet: rtcp_types::Packet<'_>) {
166 if let rtcp_types::Packet::Sr(sr) = packet {
168 if let Some(receiver) = self
169 .receiver
170 .iter_mut()
171 .find(|status| status.ssrc == sr.ssrc())
172 {
173 receiver.last_sr = Some(NtpTimestamp::now());
174 }
175 }
176 }
177
178 pub fn write_rtcp_report(&mut self, dst: &mut [u8]) -> Result<usize, RtcpWriteError> {
182 let now = NtpTimestamp::now();
183
184 let mut report_blocks = vec![];
185
186 for receiver in &mut self.receiver {
187 let lost = receiver.jitter_buffer.lost;
188 let received = receiver.jitter_buffer.received;
189
190 receiver.total_lost += lost;
191 receiver.jitter_buffer.lost = 0;
192 receiver.jitter_buffer.received = 0;
193
194 let fraction_lost = (lost as f64 / (received + lost) as f64) * 255.0;
195 let fraction_lost = fraction_lost as u32;
196
197 let (last_sr, delay) = if let Some(last_sr) = receiver.last_sr {
198 let delay = now - last_sr;
199 let delay = (delay.as_seconds_f64() * 65536.0) as u32;
200
201 let last_sr = last_sr.to_fixed_u32();
202
203 (last_sr, delay)
204 } else {
205 (0, 0)
206 };
207
208 let last_sequence_number = receiver
209 .jitter_buffer
210 .last_sequence_number()
211 .unwrap_or_default();
212
213 let report_block = ReportBlock::builder(receiver.ssrc)
214 .fraction_lost(fraction_lost as u8)
215 .cumulative_lost(receiver.total_lost as u32)
216 .extended_sequence_number(lower_32bits(last_sequence_number))
217 .interarrival_jitter(receiver.jitter as u32)
218 .last_sender_report_timestamp(last_sr)
219 .delay_since_last_sender_report_timestamp(delay);
220
221 report_blocks.push(report_block);
222 }
223
224 let mut compound = CompoundBuilder::default();
225
226 if let Some(sender_info) = &self.sender {
228 let rtp_timestamp = {
229 let offset = (self.clock_rate * (now - sender_info.ntp_timestamp)).as_seconds_f64()
230 * self.clock_rate as f64;
231 sender_info.rtp_timestamp + offset as u64
232 };
233
234 let mut sr = SenderReport::builder(self.ssrc)
235 .ntp_timestamp(now.to_fixed_u64())
236 .rtp_timestamp(lower_32bits(rtp_timestamp))
237 .packet_count(sender_info.sender_pkg_count)
238 .octet_count(sender_info.sender_octet_count);
239
240 for report_blocks in report_blocks {
241 sr = sr.add_report_block(report_blocks);
242 }
243
244 compound = compound.add_packet(sr);
245 } else {
246 let mut rr = ReceiverReport::builder(self.ssrc);
247
248 for report_blocks in report_blocks {
249 rr = rr.add_report_block(report_blocks);
250 }
251
252 compound = compound.add_packet(rr);
253 }
254
255 let sdes = SdesBuilder::default().add_chunk(
257 SdesChunkBuilder::new(self.ssrc)
258 .add_item(SdesItemBuilder::new(SdesItem::CNAME, &self.cname)),
259 );
260
261 compound.add_packet(sdes).write_into(dst)
263 }
264}
265
266fn map_instant_to_rtp_timestamp(
267 reference_instant: Instant,
268 reference_timestamp: u64,
269 clock_rate: u32,
270 instant: Instant,
271) -> u64 {
272 let delta = instant.signed_duration_since(reference_instant);
273 let delta_in_rtp_timesteps = (delta.as_seconds_f32() * clock_rate as f32) as i64;
274 (reference_timestamp as i64 + delta_in_rtp_timesteps) as u64
275}
276
/// Returns the 32 least significant bits of `i`, discarding the upper half.
fn lower_32bits(i: u64) -> u32 {
    // Truncation is the whole point here: `as` keeps exactly the low 32 bits.
    i as u32
}