Skip to main content

rtp_engine/rtp/
stats.rs

1//! RTP/RTCP statistics tracking.
2//!
3//! Implements RFC 3550 extended sequence number tracking with proper
4//! rollover handling for 16-bit sequence numbers.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
8
9/// RTP/RTCP statistics snapshot.
10#[derive(Debug, Clone, Default)]
11pub struct RtpStats {
12    /// Total RTP packets sent.
13    pub packets_sent: u64,
14    /// Total RTP packets received.
15    pub packets_received: u64,
16    /// Total payload bytes sent.
17    pub bytes_sent: u64,
18    /// Total payload bytes received.
19    pub bytes_received: u64,
20    /// Estimated packets lost.
21    pub packets_lost: u64,
22    /// Interarrival jitter in milliseconds.
23    pub jitter_ms: f64,
24    /// Codec name.
25    pub codec_name: String,
26    /// Extended highest sequence number (includes rollover cycles).
27    pub extended_highest_seq: u32,
28    /// Number of sequence number rollovers.
29    pub seq_cycles: u16,
30}
31
32/// Thread-safe counters for tracking RTP statistics.
33#[derive(Clone)]
34pub struct RtpCounters {
35    /// Packets sent.
36    pub packets_sent: Arc<AtomicU64>,
37    /// Packets received.
38    pub packets_received: Arc<AtomicU64>,
39    /// Bytes sent.
40    pub bytes_sent: Arc<AtomicU64>,
41    /// Bytes received.
42    pub bytes_received: Arc<AtomicU64>,
43    /// Packets lost.
44    pub packets_lost: Arc<AtomicU64>,
45    /// Jitter in microseconds.
46    pub jitter_us: Arc<AtomicU64>,
47    /// Codec name.
48    pub codec_name: String,
49    /// Extended highest sequence number (upper 16 bits = cycles, lower 16 = seq).
50    pub highest_seq: Arc<AtomicU32>,
51    /// Expected packets based on sequence numbers.
52    pub expected_packets: Arc<AtomicU64>,
53    /// Whether we've received the first packet (to initialize tracking).
54    initialized: Arc<AtomicBool>,
55    /// Base (first) sequence number received.
56    base_seq: Arc<AtomicU32>,
57}
58
59impl RtpCounters {
60    /// Create a new set of counters.
61    pub fn new(codec_name: &str) -> Self {
62        Self {
63            packets_sent: Arc::new(AtomicU64::new(0)),
64            packets_received: Arc::new(AtomicU64::new(0)),
65            bytes_sent: Arc::new(AtomicU64::new(0)),
66            bytes_received: Arc::new(AtomicU64::new(0)),
67            packets_lost: Arc::new(AtomicU64::new(0)),
68            jitter_us: Arc::new(AtomicU64::new(0)),
69            codec_name: codec_name.to_string(),
70            highest_seq: Arc::new(AtomicU32::new(0)),
71            expected_packets: Arc::new(AtomicU64::new(0)),
72            initialized: Arc::new(AtomicBool::new(false)),
73            base_seq: Arc::new(AtomicU32::new(0)),
74        }
75    }
76
77    /// Take a snapshot of the current statistics.
78    pub fn snapshot(&self) -> RtpStats {
79        let received = self.packets_received.load(Ordering::Relaxed);
80        let expected = self.expected_packets.load(Ordering::Relaxed);
81        let lost = expected.saturating_sub(received);
82        self.packets_lost.store(lost, Ordering::Relaxed);
83        let highest = self.highest_seq.load(Ordering::Relaxed);
84
85        RtpStats {
86            packets_sent: self.packets_sent.load(Ordering::Relaxed),
87            packets_received: received,
88            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
89            bytes_received: self.bytes_received.load(Ordering::Relaxed),
90            packets_lost: lost,
91            jitter_ms: self.jitter_us.load(Ordering::Relaxed) as f64 / 1000.0,
92            codec_name: self.codec_name.clone(),
93            extended_highest_seq: highest,
94            seq_cycles: (highest >> 16) as u16,
95        }
96    }
97
98    /// Record a sent packet.
99    pub fn record_sent(&self, bytes: u64) {
100        self.packets_sent.fetch_add(1, Ordering::Relaxed);
101        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
102    }
103
104    /// Record a received packet with proper sequence rollover handling.
105    ///
106    /// This implements RFC 3550 Appendix A.1 extended sequence number algorithm.
107    pub fn record_received(&self, bytes: u64, seq: u16) {
108        self.packets_received.fetch_add(1, Ordering::Relaxed);
109        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
110
111        if !self.initialized.swap(true, Ordering::Relaxed) {
112            // First packet - initialize tracking
113            self.base_seq.store(seq as u32, Ordering::Relaxed);
114            self.highest_seq.store(seq as u32, Ordering::Relaxed);
115            self.expected_packets.store(1, Ordering::Relaxed);
116            return;
117        }
118
119        let prev_extended = self.highest_seq.load(Ordering::Relaxed);
120        let prev_seq = (prev_extended & 0xFFFF) as u16;
121        let cycles = prev_extended >> 16;
122
123        // RFC 3550: detect rollover by checking if sequence wrapped
124        let new_cycles = if seq < prev_seq && (prev_seq.wrapping_sub(seq)) > 0x8000 {
125            // Sequence wrapped forward (65535 -> 0)
126            cycles.wrapping_add(1)
127        } else if seq > prev_seq && (seq.wrapping_sub(prev_seq)) > 0x8000 {
128            // Late/reordered packet from before rollover
129            cycles.wrapping_sub(1)
130        } else {
131            cycles
132        };
133
134        let new_extended = (new_cycles << 16) | (seq as u32);
135
136        // Update if this is a higher extended sequence number
137        if new_extended > prev_extended || (new_cycles > cycles) {
138            self.highest_seq.store(new_extended, Ordering::Relaxed);
139
140            // Update expected packets count
141            let base = self.base_seq.load(Ordering::Relaxed);
142            let expected = new_extended.wrapping_sub(base).wrapping_add(1) as u64;
143            self.expected_packets.store(expected, Ordering::Relaxed);
144        }
145    }
146
147    /// Update jitter calculation (RFC 3550 algorithm).
148    pub fn update_jitter(&self, transit_diff_us: u64) {
149        let prev_jitter = self.jitter_us.load(Ordering::Relaxed) as f64;
150        let d = transit_diff_us as f64;
151        let new_jitter = prev_jitter + (d - prev_jitter) / 16.0;
152        self.jitter_us.store(new_jitter as u64, Ordering::Relaxed);
153    }
154
155    /// Reset all counters.
156    pub fn reset(&self) {
157        self.packets_sent.store(0, Ordering::Relaxed);
158        self.packets_received.store(0, Ordering::Relaxed);
159        self.bytes_sent.store(0, Ordering::Relaxed);
160        self.bytes_received.store(0, Ordering::Relaxed);
161        self.packets_lost.store(0, Ordering::Relaxed);
162        self.jitter_us.store(0, Ordering::Relaxed);
163        self.highest_seq.store(0, Ordering::Relaxed);
164        self.expected_packets.store(0, Ordering::Relaxed);
165        self.initialized.store(false, Ordering::Relaxed);
166        self.base_seq.store(0, Ordering::Relaxed);
167    }
168
169    /// Get the extended highest sequence number (cycles << 16 | seq).
170    pub fn extended_highest_seq(&self) -> u32 {
171        self.highest_seq.load(Ordering::Relaxed)
172    }
173
174    /// Get the number of sequence cycles (rollovers).
175    pub fn seq_cycles(&self) -> u16 {
176        (self.highest_seq.load(Ordering::Relaxed) >> 16) as u16
177    }
178}
179
180impl Default for RtpCounters {
181    fn default() -> Self {
182        Self::new("unknown")
183    }
184}
185
186impl std::fmt::Debug for RtpCounters {
187    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188        f.debug_struct("RtpCounters")
189            .field("codec", &self.codec_name)
190            .field("sent", &self.packets_sent.load(Ordering::Relaxed))
191            .field("received", &self.packets_received.load(Ordering::Relaxed))
192            .finish()
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199
200    #[test]
201    fn test_counters_basic() {
202        let counters = RtpCounters::new("PCMU");
203
204        counters.record_sent(172);
205        counters.record_sent(172);
206        counters.record_received(172, 1);
207        counters.record_received(172, 2);
208
209        let stats = counters.snapshot();
210        assert_eq!(stats.packets_sent, 2);
211        assert_eq!(stats.packets_received, 2);
212        assert_eq!(stats.bytes_sent, 344);
213        assert_eq!(stats.bytes_received, 344);
214        assert_eq!(stats.codec_name, "PCMU");
215    }
216
217    #[test]
218    fn test_jitter_calculation() {
219        let counters = RtpCounters::new("PCMU");
220
221        // Simulate varying transit times
222        counters.update_jitter(1000);
223        counters.update_jitter(2000);
224        counters.update_jitter(500);
225
226        let stats = counters.snapshot();
227        assert!(stats.jitter_ms > 0.0);
228    }
229
230    #[test]
231    fn test_reset() {
232        let counters = RtpCounters::new("PCMU");
233
234        counters.record_sent(100);
235        counters.record_received(100, 1);
236        counters.reset();
237
238        let stats = counters.snapshot();
239        assert_eq!(stats.packets_sent, 0);
240        assert_eq!(stats.packets_received, 0);
241    }
242
243    #[test]
244    fn test_sequence_rollover_forward() {
245        let counters = RtpCounters::new("PCMU");
246
247        // Start near the rollover point
248        counters.record_received(100, 65534);
249        assert_eq!(counters.seq_cycles(), 0);
250        assert_eq!(counters.extended_highest_seq(), 65534);
251
252        counters.record_received(100, 65535);
253        assert_eq!(counters.seq_cycles(), 0);
254        assert_eq!(counters.extended_highest_seq(), 65535);
255
256        // Rollover: 65535 -> 0
257        counters.record_received(100, 0);
258        assert_eq!(counters.seq_cycles(), 1);
259        assert_eq!(counters.extended_highest_seq(), 1 << 16); // cycle 1, seq 0
260
261        counters.record_received(100, 1);
262        assert_eq!(counters.seq_cycles(), 1);
263        assert_eq!(counters.extended_highest_seq(), (1 << 16) | 1);
264
265        // Continue normally in cycle 1
266        counters.record_received(100, 2);
267        counters.record_received(100, 3);
268        assert_eq!(counters.seq_cycles(), 1);
269        assert_eq!(counters.extended_highest_seq(), (1 << 16) | 3);
270    }
271
272    #[test]
273    fn test_second_rollover_sequential() {
274        let counters = RtpCounters::new("PCMU");
275
276        // Start in cycle 0 near rollover
277        counters.record_received(100, 65534);
278        counters.record_received(100, 65535);
279        counters.record_received(100, 0); // -> cycle 1
280        assert_eq!(counters.seq_cycles(), 1);
281
282        // Progress sequentially through cycle 1
283        // (In real RTP, packets arrive sequentially)
284        for seq in 1u16..=65535 {
285            counters.record_received(100, seq);
286        }
287        // Now rollover again
288        counters.record_received(100, 0); // -> cycle 2
289        assert_eq!(counters.seq_cycles(), 2);
290        assert_eq!(counters.extended_highest_seq(), 2 << 16); // cycle 2, seq 0
291    }
292
293    #[test]
294    fn test_small_gap_near_rollover() {
295        let counters = RtpCounters::new("PCMU");
296
297        // Test small gaps (realistic packet loss) near rollover
298        counters.record_received(100, 65530);
299        counters.record_received(100, 65531);
300        // Skip 65532 (lost)
301        counters.record_received(100, 65533);
302        counters.record_received(100, 65534);
303        counters.record_received(100, 65535);
304        // Rollover
305        counters.record_received(100, 0);
306        assert_eq!(counters.seq_cycles(), 1);
307        // Skip 1 (lost)
308        counters.record_received(100, 2);
309        counters.record_received(100, 3);
310
311        assert_eq!(counters.seq_cycles(), 1);
312        assert_eq!(counters.extended_highest_seq(), (1 << 16) | 3);
313    }
314
315    #[test]
316    fn test_sequence_reorder_near_rollover() {
317        let counters = RtpCounters::new("PCMU");
318
319        // Receive packet 65534
320        counters.record_received(100, 65534);
321        assert_eq!(counters.seq_cycles(), 0);
322
323        // Receive packet 0 (rollover)
324        counters.record_received(100, 0);
325        assert_eq!(counters.seq_cycles(), 1);
326
327        // Late arrival of 65535 from before rollover
328        // Should not increment cycles further
329        counters.record_received(100, 65535);
330        assert_eq!(counters.seq_cycles(), 1);
331
332        // Continue normally
333        counters.record_received(100, 1);
334        counters.record_received(100, 2);
335        assert_eq!(counters.extended_highest_seq(), (1 << 16) | 2);
336    }
337
338    #[test]
339    fn test_expected_packets_with_rollover() {
340        let counters = RtpCounters::new("PCMU");
341
342        // Start at 65530
343        counters.record_received(100, 65530);
344
345        // Go to 5 (across rollover)
346        for seq in 65531..=65535 {
347            counters.record_received(100, seq);
348        }
349        for seq in 0..=5 {
350            counters.record_received(100, seq);
351        }
352
353        let stats = counters.snapshot();
354        // Expected: 65530 to 65535 (6) + 0 to 5 (6) = 12 packets
355        // But we also count from base_seq to highest, so:
356        // base = 65530, highest = (1 << 16) | 5 = 65541
357        // expected = 65541 - 65530 + 1 = 12
358        assert_eq!(stats.packets_received, 12);
359        assert_eq!(stats.extended_highest_seq, (1 << 16) | 5);
360    }
361
362    #[test]
363    fn test_multiple_rollovers() {
364        let counters = RtpCounters::new("PCMU");
365
366        counters.record_received(100, 0);
367
368        // Simulate 3 full cycles
369        for cycle in 0..3 {
370            for seq in 1..=65535u16 {
371                counters.record_received(100, seq);
372            }
373            counters.record_received(100, 0);
374            assert_eq!(
375                counters.seq_cycles(),
376                cycle + 1,
377                "After cycle {}, expected {} cycles",
378                cycle,
379                cycle + 1
380            );
381        }
382
383        assert_eq!(counters.seq_cycles(), 3);
384    }
385
386    #[test]
387    fn test_packet_loss_calculation() {
388        let counters = RtpCounters::new("PCMU");
389
390        // Receive packets 0, 1, 2, 5, 6 (missing 3, 4)
391        counters.record_received(100, 0);
392        counters.record_received(100, 1);
393        counters.record_received(100, 2);
394        counters.record_received(100, 5);
395        counters.record_received(100, 6);
396
397        let stats = counters.snapshot();
398        assert_eq!(stats.packets_received, 5);
399        // Expected: 0 to 6 = 7 packets
400        // Lost: 7 - 5 = 2
401        assert_eq!(stats.packets_lost, 2);
402    }
403}