Skip to main content

ezk_rtp/session/
mod.rs

1use crate::{NtpTimestamp, RtpPacket};
2use jitter_buffer::{guess_timestamp, JitterBuffer};
3use rtcp_types::{
4    CompoundBuilder, ReceiverReport, ReportBlock, RtcpPacketWriterExt, RtcpWriteError, SdesBuilder,
5    SdesChunkBuilder, SdesItem, SdesItemBuilder, SenderReport,
6};
7use std::time::{Duration, Instant};
8use time::ext::InstantExt;
9
10mod jitter_buffer;
11
/// How far behind the current instant `Session::pop_rtp` places its cut-off.
///
/// `pop_rtp` subtracts this duration from `Instant::now()`, maps the result into the
/// RTP timestamp domain of each source and passes it to `JitterBuffer::pop` as the
/// maximum timestamp.
const JITTERBUFFER_LENGTH: Duration = Duration::from_millis(100);
13
/// Single RTP session (one sender, many receivers).
///
/// This can be used to publish a single RTP source and receive others.
/// It manages a jitter buffer for every remote SSRC and can generate RTCP reports.
pub struct Session {
    /// SSRC of the local source; also used as the SSRC of the RTCP packets
    /// written by `write_rtcp_report`.
    ssrc: u32,
    /// RTP timestamp units per second, used to convert between elapsed time and
    /// RTP timestamps.
    clock_rate: u32,
    /// CNAME put into the SDES packet of every RTCP report.
    cname: String,
    /// Sender statistics; `None` until `send_rtp` has been called for the first time.
    sender: Option<SenderState>,
    /// Reception state, one entry per remote SSRC seen by `recv_rtp`.
    receiver: Vec<ReceiverStatus>,
}
25
/// Statistics about the locally sent stream, used to fill RTCP sender reports.
struct SenderState {
    /// Time at which the most recent packet was registered via `Session::send_rtp`.
    ntp_timestamp: NtpTimestamp,
    /// RTP timestamp of the most recently registered packet, widened to 64 bits
    /// by `guess_timestamp`.
    rtp_timestamp: u64,

    /// Number of packets registered via `Session::send_rtp`.
    sender_pkg_count: u32,
    /// Sum of the payload lengths of all registered packets.
    sender_octet_count: u32,
}
33
/// Reception state kept for a single remote SSRC.
#[derive(Default)]
struct ReceiverStatus {
    /// SSRC of the remote source this state belongs to.
    ssrc: u32,

    /// Buffer every packet received from this source is pushed into.
    jitter_buffer: JitterBuffer,

    /// Arrival instant and 64-bit RTP timestamp of the most recently received packet.
    last_rtp_received: Option<(Instant, u64)>,
    /// Running interarrival jitter estimate, in RTP timestamp units.
    jitter: f32,

    /// Local time at which the most recent sender report from this source arrived.
    // NOTE(review): this is the local reception time, not the NTP timestamp carried
    // in the sender report itself - confirm that is what the LSR report field needs.
    last_sr: Option<NtpTimestamp>,
    /// Lost packets accumulated from the jitter buffer's counter on every RTCP report.
    total_lost: u64,
}
46
47impl Session {
48    pub fn new(ssrc: u32, clock_rate: u32, cname: String) -> Self {
49        Self {
50            ssrc,
51            clock_rate,
52            cname,
53            sender: None,
54            receiver: vec![],
55        }
56    }
57
    /// Returns the SSRC of this session's sender, as passed to [`Session::new`].
    pub fn ssrc(&self) -> u32 {
        self.ssrc
    }
62
    /// Returns the clock rate of the RTP timestamp (timestamp units per second),
    /// as passed to [`Session::new`].
    pub fn clock_rate(&self) -> u32 {
        self.clock_rate
    }
67
68    /// Register an RTP packet before sending it out
69    pub fn send_rtp(&mut self, packet: &RtpPacket) {
70        let packet = packet.get();
71
72        let sender_status = self.sender.get_or_insert(SenderState {
73            ntp_timestamp: NtpTimestamp::ZERO,
74            rtp_timestamp: 0,
75
76            sender_pkg_count: 0,
77            sender_octet_count: 0,
78        });
79
80        sender_status.ntp_timestamp = NtpTimestamp::now();
81        sender_status.rtp_timestamp =
82            guess_timestamp(sender_status.rtp_timestamp, packet.timestamp());
83
84        sender_status.sender_pkg_count += 1;
85        sender_status.sender_octet_count += packet.payload_len() as u32;
86    }
87
88    /// Receive an RTP packet.
89    ///
90    /// The session consumes the packet and puts in into a internal jitterbuffer to fix potential reordering.
91    pub fn recv_rtp(&mut self, rtp_packet: RtpPacket) {
92        let packet = rtp_packet.get();
93
94        let receiver_status = if let Some(receiver_status) =
95            self.receiver.iter_mut().find(|r| r.ssrc == packet.ssrc())
96        {
97            receiver_status
98        } else {
99            self.receiver.push(ReceiverStatus {
100                ssrc: packet.ssrc(),
101                jitter_buffer: JitterBuffer::default(),
102                last_rtp_received: None,
103                jitter: 0.0,
104                last_sr: None,
105                total_lost: 0,
106            });
107
108            self.receiver.last_mut().unwrap()
109        };
110
111        let now = Instant::now();
112
113        // Update jitter and find extended timestamp
114        let timestamp = if let Some((last_rtp_instant, last_rtp_timestamp)) =
115            receiver_status.last_rtp_received
116        {
117            // Rj - Ri
118            let a = now - last_rtp_instant;
119            let a = (a.as_secs_f32() * self.clock_rate as f32) as i64;
120
121            // Sj - Si
122            let b = packet.timestamp() as i64 - last_rtp_timestamp as i64;
123
124            // (Rj - Ri) - (Sj - Si)
125            let d = a.abs_diff(b);
126
127            receiver_status.jitter =
128                receiver_status.jitter + ((d as f32).abs() - receiver_status.jitter) / 16.;
129
130            guess_timestamp(last_rtp_timestamp, packet.timestamp())
131        } else {
132            packet.timestamp() as u64
133        };
134
135        receiver_status.last_rtp_received = Some((now, timestamp));
136
137        receiver_status.jitter_buffer.push(rtp_packet);
138    }
139
140    pub fn pop_rtp(&mut self) -> Option<RtpPacket> {
141        let pop_earliest = Instant::now() - JITTERBUFFER_LENGTH;
142
143        for receiver in &mut self.receiver {
144            let Some((last_rtp_received_instant, last_rtp_received_timestamp)) =
145                receiver.last_rtp_received
146            else {
147                continue;
148            };
149
150            let max_timestamp = map_instant_to_rtp_timestamp(
151                last_rtp_received_instant,
152                last_rtp_received_timestamp,
153                self.clock_rate,
154                pop_earliest,
155            );
156
157            if let Some(packet) = receiver.jitter_buffer.pop(max_timestamp) {
158                return Some(packet);
159            }
160        }
161
162        None
163    }
164
165    pub fn recv_rtcp(&mut self, packet: rtcp_types::Packet<'_>) {
166        // TODO: read reports
167        if let rtcp_types::Packet::Sr(sr) = packet {
168            if let Some(receiver) = self
169                .receiver
170                .iter_mut()
171                .find(|status| status.ssrc == sr.ssrc())
172            {
173                receiver.last_sr = Some(NtpTimestamp::now());
174            }
175        }
176    }
177
178    /// Generate RTCP sender or receiver report packet.
179    ///
180    /// This resets the internal received & lost packets counter for every receiver.
181    pub fn write_rtcp_report(&mut self, dst: &mut [u8]) -> Result<usize, RtcpWriteError> {
182        let now = NtpTimestamp::now();
183
184        let mut report_blocks = vec![];
185
186        for receiver in &mut self.receiver {
187            let lost = receiver.jitter_buffer.lost;
188            let received = receiver.jitter_buffer.received;
189
190            receiver.total_lost += lost;
191            receiver.jitter_buffer.lost = 0;
192            receiver.jitter_buffer.received = 0;
193
194            let fraction_lost = (lost as f64 / (received + lost) as f64) * 255.0;
195            let fraction_lost = fraction_lost as u32;
196
197            let (last_sr, delay) = if let Some(last_sr) = receiver.last_sr {
198                let delay = now - last_sr;
199                let delay = (delay.as_seconds_f64() * 65536.0) as u32;
200
201                let last_sr = last_sr.to_fixed_u32();
202
203                (last_sr, delay)
204            } else {
205                (0, 0)
206            };
207
208            let last_sequence_number = receiver
209                .jitter_buffer
210                .last_sequence_number()
211                .unwrap_or_default();
212
213            let report_block = ReportBlock::builder(receiver.ssrc)
214                .fraction_lost(fraction_lost as u8)
215                .cumulative_lost(receiver.total_lost as u32)
216                .extended_sequence_number(lower_32bits(last_sequence_number))
217                .interarrival_jitter(receiver.jitter as u32)
218                .last_sender_report_timestamp(last_sr)
219                .delay_since_last_sender_report_timestamp(delay);
220
221            report_blocks.push(report_block);
222        }
223
224        let mut compound = CompoundBuilder::default();
225
226        // Add report block
227        if let Some(sender_info) = &self.sender {
228            let rtp_timestamp = {
229                let offset = (self.clock_rate * (now - sender_info.ntp_timestamp)).as_seconds_f64()
230                    * self.clock_rate as f64;
231                sender_info.rtp_timestamp + offset as u64
232            };
233
234            let mut sr = SenderReport::builder(self.ssrc)
235                .ntp_timestamp(now.to_fixed_u64())
236                .rtp_timestamp(lower_32bits(rtp_timestamp))
237                .packet_count(sender_info.sender_pkg_count)
238                .octet_count(sender_info.sender_octet_count);
239
240            for report_blocks in report_blocks {
241                sr = sr.add_report_block(report_blocks);
242            }
243
244            compound = compound.add_packet(sr);
245        } else {
246            let mut rr = ReceiverReport::builder(self.ssrc);
247
248            for report_blocks in report_blocks {
249                rr = rr.add_report_block(report_blocks);
250            }
251
252            compound = compound.add_packet(rr);
253        }
254
255        // Add source description block
256        let sdes = SdesBuilder::default().add_chunk(
257            SdesChunkBuilder::new(self.ssrc)
258                .add_item(SdesItemBuilder::new(SdesItem::CNAME, &self.cname)),
259        );
260
261        // Add block and write into dst
262        compound.add_packet(sdes).write_into(dst)
263    }
264}
265
266fn map_instant_to_rtp_timestamp(
267    reference_instant: Instant,
268    reference_timestamp: u64,
269    clock_rate: u32,
270    instant: Instant,
271) -> u64 {
272    let delta = instant.signed_duration_since(reference_instant);
273    let delta_in_rtp_timesteps = (delta.as_seconds_f32() * clock_rate as f32) as i64;
274    (reference_timestamp as i64 + delta_in_rtp_timesteps) as u64
275}
276
/// Returns the least significant 32 bits of `i`, discarding the upper half.
fn lower_32bits(i: u64) -> u32 {
    const LOW_HALF: u64 = 0xFFFF_FFFF;

    let masked = i & LOW_HALF;
    masked as u32
}