rvoip_rtp_core/session/
stream.rs

1use bytes::Bytes;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, Instant};
5use tracing::{debug, warn};
6
7use crate::error::Error;
8use crate::packet::RtpPacket;
9use crate::{Result, RtpSequenceNumber, RtpSsrc, RtpTimestamp};
10
/// Receive-side state for one RTP stream: sequence tracking, loss and
/// duplicate counters, an interarrival jitter estimate and an optional
/// reordering (jitter) buffer.
pub struct RtpStream {
    /// SSRC of the stream
    pub ssrc: RtpSsrc,

    /// Sequence number of the most recently processed packet
    /// (not necessarily the highest one seen)
    latest_seq: RtpSequenceNumber,

    /// Highest extended sequence number received: the wraparound cycle
    /// count in the upper 16 bits, the 16-bit sequence number in the lower
    highest_seq: u32,

    /// Base sequence number (first received)
    base_seq: RtpSequenceNumber,

    /// Whether the stream has been initialized with a first sequence number
    initialized: bool,

    /// Last time a packet was received (set to the creation time until then)
    last_packet_time: Instant,

    /// Number of packets received
    packets_received: u64,

    /// Number of bytes received (sum of `RtpPacket::size()` over all packets)
    bytes_received: u64,

    /// Number of packets lost (based on sequence gaps)
    packets_lost: u64,

    /// Number of duplicate packets
    duplicates: u64,

    /// Interarrival jitter estimate (RFC 3550), kept in seconds
    jitter: f64,

    /// Last arrival time used for jitter calculation
    last_arrival: Option<Instant>,

    /// Last RTP timestamp used for jitter calculation
    last_timestamp: Option<RtpTimestamp>,

    /// Clock rate (Hz) used to convert RTP timestamp differences to seconds
    clock_rate: u32,

    /// Jitter buffer for reordering packets (`None` when disabled)
    jitter_buffer: Option<Arc<Mutex<VecDeque<RtpPacket>>>>,

    /// Maximum jitter buffer size in packets
    max_jitter_size: usize,

    /// Maximum packet age in the jitter buffer
    // NOTE(review): this value is stored but not read by any code visible in
    // this file - confirm whether age-based release is implemented elsewhere.
    max_packet_age: Duration,

    /// Sequence cycle count (for wraparound handling)
    seq_cycles: u16,

    /// RTCP SR timestamp (middle 32 bits of NTP timestamp)
    last_sr_timestamp: Option<u32>,

    /// Time when the last SR was received
    last_sr_time: Option<Instant>,
}
73
74impl RtpStream {
75    /// Create a new RTP stream
76    pub fn new(ssrc: RtpSsrc, clock_rate: u32) -> Self {
77        Self {
78            ssrc,
79            latest_seq: 0,
80            highest_seq: 0,
81            base_seq: 0,
82            initialized: false,
83            last_packet_time: Instant::now(),
84            packets_received: 0,
85            bytes_received: 0,
86            packets_lost: 0,
87            duplicates: 0,
88            jitter: 0.0,
89            last_arrival: None,
90            last_timestamp: None,
91            clock_rate,
92            jitter_buffer: None,
93            max_jitter_size: 50, // Default size
94            max_packet_age: Duration::from_millis(200), // Default 200ms
95            seq_cycles: 0,
96            last_sr_timestamp: None,
97            last_sr_time: None,
98        }
99    }
100    
101    /// Create a new RTP stream with jitter buffer
102    pub fn with_jitter_buffer(
103        ssrc: RtpSsrc, 
104        clock_rate: u32, 
105        buffer_size: usize, 
106        max_age_ms: u64
107    ) -> Self {
108        let mut stream = Self::new(ssrc, clock_rate);
109        stream.enable_jitter_buffer(buffer_size, max_age_ms);
110        stream
111    }
112    
113    /// Enable jitter buffer for this stream
114    pub fn enable_jitter_buffer(&mut self, size: usize, max_age_ms: u64) {
115        self.jitter_buffer = Some(Arc::new(Mutex::new(VecDeque::with_capacity(size))));
116        self.max_jitter_size = size;
117        self.max_packet_age = Duration::from_millis(max_age_ms);
118    }
119    
120    /// Disable jitter buffer
121    pub fn disable_jitter_buffer(&mut self) {
122        self.jitter_buffer = None;
123    }
124    
125    /// Process a received RTP packet
126    /// Returns the packet if it should be processed immediately,
127    /// or None if it was placed in the jitter buffer
128    pub fn process_packet(&mut self, packet: RtpPacket) -> Option<RtpPacket> {
129        let now = Instant::now();
130        self.last_packet_time = now;
131        
132        let seq = packet.header.sequence_number;
133        let timestamp = packet.header.timestamp;
134        
135        // Update basic stats
136        self.packets_received += 1;
137        self.bytes_received += packet.size() as u64;
138        
139        // Initialize sequence tracking if this is the first packet
140        if !self.initialized {
141            self.init_sequence(seq);
142            self.last_timestamp = Some(timestamp);
143            self.last_arrival = Some(now);
144            self.initialized = true;
145            return Some(packet);
146        }
147        
148        // Update sequence tracking
149        self.update_sequence(seq);
150        
151        // Update jitter estimate
152        if let (Some(last_arrival), Some(last_ts)) = (self.last_arrival, self.last_timestamp) {
153            let arrival_diff = now.duration_since(last_arrival).as_secs_f64();
154            let ts_diff = ((timestamp as i32 - last_ts as i32).abs() as f64) / (self.clock_rate as f64);
155            
156            // RFC 3550 jitter calculation
157            let d = arrival_diff - ts_diff;
158            self.jitter += (d.abs() - self.jitter) / 16.0;
159        }
160        
161        self.last_arrival = Some(now);
162        self.last_timestamp = Some(timestamp);
163        
164        // If using jitter buffer, add to buffer and return ordered packets
165        if let Some(buffer) = &self.jitter_buffer {
166            self.add_to_jitter_buffer(packet, buffer.clone());
167            self.get_next_from_jitter_buffer(buffer.clone())
168        } else {
169            // No jitter buffer, return packet immediately
170            Some(packet)
171        }
172    }
173    
174    /// Initialize sequence tracking
175    fn init_sequence(&mut self, seq: RtpSequenceNumber) {
176        self.base_seq = seq;
177        self.latest_seq = seq;
178        self.highest_seq = seq as u32;
179        debug!("Initialized RTP stream with seq={}", seq);
180    }
181    
182    /// Update sequence tracking with a new sequence number
183    fn update_sequence(&mut self, seq: RtpSequenceNumber) {
184        // Detect sequence number cycle (wraparound from 65535 to 0)
185        if seq < 0x1000 && self.latest_seq > 0xf000 {
186            debug!("Detected sequence wraparound: {} -> {}", self.latest_seq, seq);
187            self.seq_cycles += 1;
188        }
189        
190        // Check for duplicate
191        if seq == self.latest_seq {
192            self.duplicates += 1;
193            return;
194        }
195        
196        // Calculate extended sequence (with cycle count)
197        let extended_seq = (self.seq_cycles as u32) << 16 | (seq as u32);
198        
199        // Check if this is the highest sequence seen
200        if extended_seq > self.highest_seq {
201            // Calculate lost packets (gap in sequence)
202            let expected_seq = (self.latest_seq as u32 + 1) & 0xFFFF;
203            if seq != expected_seq as u16 {
204                // There's a gap - calculate how many packets were lost
205                let gap = if seq > expected_seq as u16 {
206                    seq - expected_seq as u16
207                } else {
208                    // Handle sequence number wraparound
209                    ((0xFFFF as u32 + 1) - expected_seq as u32) as u16 + seq
210                };
211                
212                if gap > 0 {
213                    self.packets_lost += gap as u64;
214                    debug!("Detected sequence gap: expected={}, got={}, lost={}", 
215                           expected_seq, seq, gap);
216                }
217            }
218            
219            self.highest_seq = extended_seq;
220        } else {
221            // Out of order packet (older than highest)
222            debug!("Out of order packet: seq={}, highest={}", seq, self.highest_seq & 0xFFFF);
223        }
224        
225        self.latest_seq = seq;
226    }
227    
228    /// Add a packet to the jitter buffer
229    fn add_to_jitter_buffer(&self, packet: RtpPacket, buffer: Arc<Mutex<VecDeque<RtpPacket>>>) {
230        if let Ok(mut buffer_lock) = buffer.lock() {
231            if buffer_lock.len() >= self.max_jitter_size {
232                // Buffer is full, remove oldest packet
233                buffer_lock.pop_front();
234                warn!("Jitter buffer full, dropping oldest packet");
235            }
236            
237            // Find the correct position to insert this packet (sorted by sequence number)
238            let seq = packet.header.sequence_number;
239            let pos = buffer_lock.iter().position(|p| {
240                let p_seq = p.header.sequence_number;
241                is_sequence_newer(seq, p_seq)
242            });
243            
244            if let Some(pos) = pos {
245                buffer_lock.insert(pos, packet);
246            } else {
247                // Add to end
248                buffer_lock.push_back(packet);
249            }
250        }
251    }
252    
253    /// Get the next packet from the jitter buffer if it's ready to be processed
254    fn get_next_from_jitter_buffer(&self, buffer: Arc<Mutex<VecDeque<RtpPacket>>>) -> Option<RtpPacket> {
255        if let Ok(mut buffer_lock) = buffer.lock() {
256            if buffer_lock.is_empty() {
257                return None;
258            }
259            
260            // Check if the oldest packet is old enough to be released
261            let first_packet = buffer_lock.front()?;
262            let expected_seq = (self.latest_seq as u32 + 1) & 0xFFFF;
263            
264            // Release packet if it's the next expected one, or if it's old
265            if first_packet.header.sequence_number == expected_seq as u16 {
266                return buffer_lock.pop_front();
267            }
268            
269            // If buffer has enough packets or packet is too old, release it
270            if buffer_lock.len() > self.max_jitter_size / 2 {
271                return buffer_lock.pop_front();
272            }
273        }
274        
275        None
276    }
277    
278    /// Get the current jitter estimate in milliseconds
279    pub fn get_jitter_ms(&self) -> f64 {
280        self.jitter * 1000.0
281    }
282    
283    /// Get statistics for this stream
284    pub fn get_stats(&self) -> RtpStreamStats {
285        RtpStreamStats {
286            ssrc: self.ssrc,
287            packets_received: self.packets_received,
288            bytes_received: self.bytes_received,
289            packets_lost: self.packets_lost,
290            duplicates: self.duplicates,
291            last_packet_time: Some(self.last_packet_time),
292            jitter: self.jitter as u32,
293            first_seq: self.base_seq as u32,
294            highest_seq: self.highest_seq,
295            received: self.packets_received as u32,
296        }
297    }
298    
299    /// Ensure the stream is initialized with the given sequence number
300    /// This is useful when packets might be held in the jitter buffer
301    /// or discarded, but we still want to track the stream.
302    pub fn ensure_initialized(&mut self, seq: u16) {
303        if !self.initialized {
304            self.init_sequence(seq as RtpSequenceNumber);
305            
306            // Make sure all required state is properly initialized
307            self.packets_received = 0;
308            self.highest_seq = seq as u32;
309            self.latest_seq = seq as RtpSequenceNumber;
310            self.packets_lost = 0;
311            self.duplicates = 0;
312            self.jitter = 0.0;
313            self.last_arrival = None;
314            self.last_timestamp = None;
315            
316            self.initialized = true;
317            debug!("Initialized RTP stream with seq={}", seq);
318        }
319    }
320    
321    /// Update the last SR information
322    pub fn update_last_sr_info(&mut self, sr_timestamp: u32, time: Instant) {
323        self.last_sr_timestamp = Some(sr_timestamp);
324        self.last_sr_time = Some(time);
325    }
326    
327    /// Get the last SR information
328    pub fn get_last_sr_info(&self) -> (Option<u32>, Option<Instant>) {
329        (self.last_sr_timestamp, self.last_sr_time)
330    }
331    
332    /// Calculate the delay since last SR in 1/65536 seconds units
333    pub fn calculate_delay_since_last_sr(&self) -> u32 {
334        if let (Some(timestamp), Some(time)) = (self.last_sr_timestamp, self.last_sr_time) {
335            // Calculate delay in seconds, then convert to 1/65536 seconds
336            let delay_secs = Instant::now().duration_since(time).as_secs_f64();
337            (delay_secs * 65536.0) as u32
338        } else {
339            0
340        }
341    }
342}
343
344/// Determines if sequence a is newer than sequence b, accounting for wraparound
345fn is_sequence_newer(a: RtpSequenceNumber, b: RtpSequenceNumber) -> bool {
346    let half_range = 0x8000;
347    // If the difference is larger than half the range, it's due to wraparound
348    if b < a {
349        (a - b) <= half_range
350    } else {
351        (b - a) > half_range
352    }
353}
354
355/// Stats for an RTP stream
356#[derive(Debug, Clone, Default)]
357pub struct RtpStreamStats {
358    /// SSRC of the stream
359    pub ssrc: RtpSsrc,
360    
361    /// Number of packets received
362    pub packets_received: u64,
363    
364    /// Number of bytes received
365    pub bytes_received: u64,
366    
367    /// Number of packets lost (based on sequence gaps)
368    pub packets_lost: u64,
369    
370    /// Number of duplicate packets
371    pub duplicates: u64,
372    
373    /// Last time a packet was received
374    pub last_packet_time: Option<Instant>,
375    
376    /// Interarrival jitter (RFC 3550)
377    pub jitter: u32,
378    
379    /// First sequence number received
380    pub first_seq: u32,
381    
382    /// Highest sequence number received
383    pub highest_seq: u32,
384    
385    /// Number of packets actually received (may differ from packets_received due to jitter buffer)
386    pub received: u32,
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use crate::packet::RtpHeader;

    /// Builds a small RTP packet (payload type 96, fixed SSRC and payload)
    /// carrying the given sequence number and timestamp.
    fn create_test_packet(seq: RtpSequenceNumber, ts: RtpTimestamp) -> RtpPacket {
        RtpPacket::new(
            RtpHeader::new(96, seq, ts, 0x12345678),
            Bytes::from_static(b"test"),
        )
    }

    /// In-order delivery, a gap, a duplicate and a late packet each update
    /// the counters as expected.
    #[test]
    fn test_sequence_tracking() {
        let mut rx = RtpStream::new(0x12345678, 8000);

        // The first packet seeds the sequence state
        rx.process_packet(create_test_packet(1000, 80000));
        assert_eq!(rx.base_seq, 1000);
        assert_eq!(rx.highest_seq, 1000);
        assert_eq!(rx.packets_received, 1);
        assert_eq!(rx.packets_lost, 0);

        // Directly following packet: no loss
        rx.process_packet(create_test_packet(1001, 80160));
        assert_eq!(rx.highest_seq, 1001);
        assert_eq!(rx.packets_received, 2);
        assert_eq!(rx.packets_lost, 0);

        // Jump ahead: 1002, 1003 and 1004 are missing
        rx.process_packet(create_test_packet(1005, 80800));
        assert_eq!(rx.highest_seq, 1005);
        assert_eq!(rx.packets_received, 3);
        assert_eq!(rx.packets_lost, 3);

        // Same packet again counts as a duplicate
        rx.process_packet(create_test_packet(1005, 80800));
        assert_eq!(rx.highest_seq, 1005);
        assert_eq!(rx.packets_received, 4);
        assert_eq!(rx.duplicates, 1);

        // A late packet leaves the highest sequence and loss count untouched
        rx.process_packet(create_test_packet(1003, 80480));
        assert_eq!(rx.highest_seq, 1005);
        assert_eq!(rx.packets_received, 5);
        assert_eq!(rx.packets_lost, 3);
    }

    /// Crossing 65535 -> 0 bumps the cycle count and extends `highest_seq`.
    #[test]
    fn test_sequence_wraparound() {
        let mut rx = RtpStream::new(0x12345678, 8000);

        // Begin just below the 16-bit limit
        rx.process_packet(create_test_packet(65530, 80000));
        assert_eq!(rx.base_seq, 65530);
        assert_eq!(rx.highest_seq, 65530);
        assert_eq!(rx.seq_cycles, 0);

        // Walk up to the last sequence number before the wrap
        let run_up = [
            (65531, 80160),
            (65532, 80320),
            (65533, 80480),
            (65534, 80640),
            (65535, 80800),
        ];
        for &(seq, ts) in run_up.iter() {
            rx.process_packet(create_test_packet(seq, ts));
        }
        assert_eq!(rx.highest_seq, 65535);
        assert_eq!(rx.seq_cycles, 0);

        // First packet of the next cycle: 1 cycle + sequence 0
        rx.process_packet(create_test_packet(0, 80960));
        assert_eq!(rx.highest_seq, 65536);
        assert_eq!(rx.seq_cycles, 1);

        // Continue inside the new cycle: 1 cycle + sequence 2
        for &(seq, ts) in [(1, 81120), (2, 81280)].iter() {
            rx.process_packet(create_test_packet(seq, ts));
        }
        assert_eq!(rx.highest_seq, 65538);
        assert_eq!(rx.seq_cycles, 1);
    }

    /// Table of `(a, b, a_is_newer_than_b)` expectations.
    #[test]
    fn test_is_sequence_newer() {
        let cases = [
            // Normal cases
            (101, 100, true),
            (100, 101, false),
            // Equal case
            (100, 100, false),
            // Wraparound cases
            (0, 65535, true),
            (65535, 0, false),
            // Edge cases around wraparound
            (1, 65000, true),
            (65000, 1, false),
            // Edge cases within half range
            (32768, 0, true),
            (0, 32768, false),
        ];

        for &(a, b, expected) in cases.iter() {
            assert_eq!(is_sequence_newer(a, b), expected);
        }
    }
}