1use bytes::Bytes;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, Instant};
5use tracing::{debug, warn};
6
7use crate::error::Error;
8use crate::packet::RtpPacket;
9use crate::{Result, RtpSequenceNumber, RtpSsrc, RtpTimestamp};
10
11pub struct RtpStream {
13 pub ssrc: RtpSsrc,
15
16 latest_seq: RtpSequenceNumber,
18
19 highest_seq: u32,
21
22 base_seq: RtpSequenceNumber,
24
25 initialized: bool,
27
28 last_packet_time: Instant,
30
31 packets_received: u64,
33
34 bytes_received: u64,
36
37 packets_lost: u64,
39
40 duplicates: u64,
42
43 jitter: f64,
45
46 last_arrival: Option<Instant>,
48
49 last_timestamp: Option<RtpTimestamp>,
51
52 clock_rate: u32,
54
55 jitter_buffer: Option<Arc<Mutex<VecDeque<RtpPacket>>>>,
57
58 max_jitter_size: usize,
60
61 max_packet_age: Duration,
63
64 seq_cycles: u16,
66
67 last_sr_timestamp: Option<u32>,
69
70 last_sr_time: Option<Instant>,
72}
73
74impl RtpStream {
75 pub fn new(ssrc: RtpSsrc, clock_rate: u32) -> Self {
77 Self {
78 ssrc,
79 latest_seq: 0,
80 highest_seq: 0,
81 base_seq: 0,
82 initialized: false,
83 last_packet_time: Instant::now(),
84 packets_received: 0,
85 bytes_received: 0,
86 packets_lost: 0,
87 duplicates: 0,
88 jitter: 0.0,
89 last_arrival: None,
90 last_timestamp: None,
91 clock_rate,
92 jitter_buffer: None,
93 max_jitter_size: 50, max_packet_age: Duration::from_millis(200), seq_cycles: 0,
96 last_sr_timestamp: None,
97 last_sr_time: None,
98 }
99 }
100
101 pub fn with_jitter_buffer(
103 ssrc: RtpSsrc,
104 clock_rate: u32,
105 buffer_size: usize,
106 max_age_ms: u64
107 ) -> Self {
108 let mut stream = Self::new(ssrc, clock_rate);
109 stream.enable_jitter_buffer(buffer_size, max_age_ms);
110 stream
111 }
112
113 pub fn enable_jitter_buffer(&mut self, size: usize, max_age_ms: u64) {
115 self.jitter_buffer = Some(Arc::new(Mutex::new(VecDeque::with_capacity(size))));
116 self.max_jitter_size = size;
117 self.max_packet_age = Duration::from_millis(max_age_ms);
118 }
119
120 pub fn disable_jitter_buffer(&mut self) {
122 self.jitter_buffer = None;
123 }
124
125 pub fn process_packet(&mut self, packet: RtpPacket) -> Option<RtpPacket> {
129 let now = Instant::now();
130 self.last_packet_time = now;
131
132 let seq = packet.header.sequence_number;
133 let timestamp = packet.header.timestamp;
134
135 self.packets_received += 1;
137 self.bytes_received += packet.size() as u64;
138
139 if !self.initialized {
141 self.init_sequence(seq);
142 self.last_timestamp = Some(timestamp);
143 self.last_arrival = Some(now);
144 self.initialized = true;
145 return Some(packet);
146 }
147
148 self.update_sequence(seq);
150
151 if let (Some(last_arrival), Some(last_ts)) = (self.last_arrival, self.last_timestamp) {
153 let arrival_diff = now.duration_since(last_arrival).as_secs_f64();
154 let ts_diff = ((timestamp as i32 - last_ts as i32).abs() as f64) / (self.clock_rate as f64);
155
156 let d = arrival_diff - ts_diff;
158 self.jitter += (d.abs() - self.jitter) / 16.0;
159 }
160
161 self.last_arrival = Some(now);
162 self.last_timestamp = Some(timestamp);
163
164 if let Some(buffer) = &self.jitter_buffer {
166 self.add_to_jitter_buffer(packet, buffer.clone());
167 self.get_next_from_jitter_buffer(buffer.clone())
168 } else {
169 Some(packet)
171 }
172 }
173
174 fn init_sequence(&mut self, seq: RtpSequenceNumber) {
176 self.base_seq = seq;
177 self.latest_seq = seq;
178 self.highest_seq = seq as u32;
179 debug!("Initialized RTP stream with seq={}", seq);
180 }
181
182 fn update_sequence(&mut self, seq: RtpSequenceNumber) {
184 if seq < 0x1000 && self.latest_seq > 0xf000 {
186 debug!("Detected sequence wraparound: {} -> {}", self.latest_seq, seq);
187 self.seq_cycles += 1;
188 }
189
190 if seq == self.latest_seq {
192 self.duplicates += 1;
193 return;
194 }
195
196 let extended_seq = (self.seq_cycles as u32) << 16 | (seq as u32);
198
199 if extended_seq > self.highest_seq {
201 let expected_seq = (self.latest_seq as u32 + 1) & 0xFFFF;
203 if seq != expected_seq as u16 {
204 let gap = if seq > expected_seq as u16 {
206 seq - expected_seq as u16
207 } else {
208 ((0xFFFF as u32 + 1) - expected_seq as u32) as u16 + seq
210 };
211
212 if gap > 0 {
213 self.packets_lost += gap as u64;
214 debug!("Detected sequence gap: expected={}, got={}, lost={}",
215 expected_seq, seq, gap);
216 }
217 }
218
219 self.highest_seq = extended_seq;
220 } else {
221 debug!("Out of order packet: seq={}, highest={}", seq, self.highest_seq & 0xFFFF);
223 }
224
225 self.latest_seq = seq;
226 }
227
228 fn add_to_jitter_buffer(&self, packet: RtpPacket, buffer: Arc<Mutex<VecDeque<RtpPacket>>>) {
230 if let Ok(mut buffer_lock) = buffer.lock() {
231 if buffer_lock.len() >= self.max_jitter_size {
232 buffer_lock.pop_front();
234 warn!("Jitter buffer full, dropping oldest packet");
235 }
236
237 let seq = packet.header.sequence_number;
239 let pos = buffer_lock.iter().position(|p| {
240 let p_seq = p.header.sequence_number;
241 is_sequence_newer(seq, p_seq)
242 });
243
244 if let Some(pos) = pos {
245 buffer_lock.insert(pos, packet);
246 } else {
247 buffer_lock.push_back(packet);
249 }
250 }
251 }
252
253 fn get_next_from_jitter_buffer(&self, buffer: Arc<Mutex<VecDeque<RtpPacket>>>) -> Option<RtpPacket> {
255 if let Ok(mut buffer_lock) = buffer.lock() {
256 if buffer_lock.is_empty() {
257 return None;
258 }
259
260 let first_packet = buffer_lock.front()?;
262 let expected_seq = (self.latest_seq as u32 + 1) & 0xFFFF;
263
264 if first_packet.header.sequence_number == expected_seq as u16 {
266 return buffer_lock.pop_front();
267 }
268
269 if buffer_lock.len() > self.max_jitter_size / 2 {
271 return buffer_lock.pop_front();
272 }
273 }
274
275 None
276 }
277
278 pub fn get_jitter_ms(&self) -> f64 {
280 self.jitter * 1000.0
281 }
282
283 pub fn get_stats(&self) -> RtpStreamStats {
285 RtpStreamStats {
286 ssrc: self.ssrc,
287 packets_received: self.packets_received,
288 bytes_received: self.bytes_received,
289 packets_lost: self.packets_lost,
290 duplicates: self.duplicates,
291 last_packet_time: Some(self.last_packet_time),
292 jitter: self.jitter as u32,
293 first_seq: self.base_seq as u32,
294 highest_seq: self.highest_seq,
295 received: self.packets_received as u32,
296 }
297 }
298
299 pub fn ensure_initialized(&mut self, seq: u16) {
303 if !self.initialized {
304 self.init_sequence(seq as RtpSequenceNumber);
305
306 self.packets_received = 0;
308 self.highest_seq = seq as u32;
309 self.latest_seq = seq as RtpSequenceNumber;
310 self.packets_lost = 0;
311 self.duplicates = 0;
312 self.jitter = 0.0;
313 self.last_arrival = None;
314 self.last_timestamp = None;
315
316 self.initialized = true;
317 debug!("Initialized RTP stream with seq={}", seq);
318 }
319 }
320
321 pub fn update_last_sr_info(&mut self, sr_timestamp: u32, time: Instant) {
323 self.last_sr_timestamp = Some(sr_timestamp);
324 self.last_sr_time = Some(time);
325 }
326
327 pub fn get_last_sr_info(&self) -> (Option<u32>, Option<Instant>) {
329 (self.last_sr_timestamp, self.last_sr_time)
330 }
331
332 pub fn calculate_delay_since_last_sr(&self) -> u32 {
334 if let (Some(timestamp), Some(time)) = (self.last_sr_timestamp, self.last_sr_time) {
335 let delay_secs = Instant::now().duration_since(time).as_secs_f64();
337 (delay_secs * 65536.0) as u32
338 } else {
339 0
340 }
341 }
342}
343
344fn is_sequence_newer(a: RtpSequenceNumber, b: RtpSequenceNumber) -> bool {
346 let half_range = 0x8000;
347 if b < a {
349 (a - b) <= half_range
350 } else {
351 (b - a) > half_range
352 }
353}
354
355#[derive(Debug, Clone, Default)]
357pub struct RtpStreamStats {
358 pub ssrc: RtpSsrc,
360
361 pub packets_received: u64,
363
364 pub bytes_received: u64,
366
367 pub packets_lost: u64,
369
370 pub duplicates: u64,
372
373 pub last_packet_time: Option<Instant>,
375
376 pub jitter: u32,
378
379 pub first_seq: u32,
381
382 pub highest_seq: u32,
384
385 pub received: u32,
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use crate::packet::RtpHeader;

    /// Builds a small test packet carrying the given sequence number and
    /// timestamp.
    fn create_test_packet(seq: RtpSequenceNumber, ts: RtpTimestamp) -> RtpPacket {
        RtpPacket::new(
            RtpHeader::new(96, seq, ts, 0x12345678),
            Bytes::from_static(b"test"),
        )
    }

    /// Pushes one packet through `stream`, discarding the returned packet.
    fn feed(stream: &mut RtpStream, seq: RtpSequenceNumber, ts: RtpTimestamp) {
        stream.process_packet(create_test_packet(seq, ts));
    }

    #[test]
    fn test_sequence_tracking() {
        let mut stream = RtpStream::new(0x12345678, 8000);

        // First packet seeds the sequence state.
        feed(&mut stream, 1000, 80000);
        assert_eq!(stream.base_seq, 1000);
        assert_eq!(stream.highest_seq, 1000);
        assert_eq!(stream.packets_received, 1);
        assert_eq!(stream.packets_lost, 0);

        // Next packet in order.
        feed(&mut stream, 1001, 80160);
        assert_eq!(stream.highest_seq, 1001);
        assert_eq!(stream.packets_received, 2);
        assert_eq!(stream.packets_lost, 0);

        // Jump ahead: 1002, 1003 and 1004 are missing.
        feed(&mut stream, 1005, 80800);
        assert_eq!(stream.highest_seq, 1005);
        assert_eq!(stream.packets_received, 3);
        assert_eq!(stream.packets_lost, 3);

        // Same packet again counts as a duplicate.
        feed(&mut stream, 1005, 80800);
        assert_eq!(stream.highest_seq, 1005);
        assert_eq!(stream.packets_received, 4);
        assert_eq!(stream.duplicates, 1);

        // A late packet does not move the highest sequence number and does
        // not change the loss counter.
        feed(&mut stream, 1003, 80480);
        assert_eq!(stream.highest_seq, 1005);
        assert_eq!(stream.packets_received, 5);
        assert_eq!(stream.packets_lost, 3);
    }

    #[test]
    fn test_sequence_wraparound() {
        let mut stream = RtpStream::new(0x12345678, 8000);

        feed(&mut stream, 65530, 80000);
        assert_eq!(stream.base_seq, 65530);
        assert_eq!(stream.highest_seq, 65530);
        assert_eq!(stream.seq_cycles, 0);

        // Walk up to the last sequence number before the wrap.
        for &(seq, ts) in &[
            (65531, 80160),
            (65532, 80320),
            (65533, 80480),
            (65534, 80640),
            (65535, 80800),
        ] {
            feed(&mut stream, seq, ts);
        }
        assert_eq!(stream.highest_seq, 65535);
        assert_eq!(stream.seq_cycles, 0);

        // Wrapping to 0 starts the second cycle.
        feed(&mut stream, 0, 80960);
        assert_eq!(stream.highest_seq, 65536);
        assert_eq!(stream.seq_cycles, 1);

        // Further packets keep extending within the same cycle.
        feed(&mut stream, 1, 81120);
        feed(&mut stream, 2, 81280);
        assert_eq!(stream.highest_seq, 65538);
        assert_eq!(stream.seq_cycles, 1);
    }

    #[test]
    fn test_is_sequence_newer() {
        // Plain ordering without wraparound.
        assert!(is_sequence_newer(101, 100));
        assert!(!is_sequence_newer(100, 101));

        // Equal sequence numbers are not newer.
        assert!(!is_sequence_newer(100, 100));

        // Immediately across the wrap.
        assert!(is_sequence_newer(0, 65535));
        assert!(!is_sequence_newer(65535, 0));

        // Further across the wrap.
        assert!(is_sequence_newer(1, 65000));
        assert!(!is_sequence_newer(65000, 1));

        // Exactly half the sequence space apart.
        assert!(is_sequence_newer(32768, 0));
        assert!(!is_sequence_newer(0, 32768));
    }
}