1use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
8
9#[derive(Debug, Clone, Default)]
11pub struct RtpStats {
12 pub packets_sent: u64,
14 pub packets_received: u64,
16 pub bytes_sent: u64,
18 pub bytes_received: u64,
20 pub packets_lost: u64,
22 pub jitter_ms: f64,
24 pub codec_name: String,
26 pub extended_highest_seq: u32,
28 pub seq_cycles: u16,
30}
31
32#[derive(Clone)]
34pub struct RtpCounters {
35 pub packets_sent: Arc<AtomicU64>,
37 pub packets_received: Arc<AtomicU64>,
39 pub bytes_sent: Arc<AtomicU64>,
41 pub bytes_received: Arc<AtomicU64>,
43 pub packets_lost: Arc<AtomicU64>,
45 pub jitter_us: Arc<AtomicU64>,
47 pub codec_name: String,
49 pub highest_seq: Arc<AtomicU32>,
51 pub expected_packets: Arc<AtomicU64>,
53 initialized: Arc<AtomicBool>,
55 base_seq: Arc<AtomicU32>,
57}
58
59impl RtpCounters {
60 pub fn new(codec_name: &str) -> Self {
62 Self {
63 packets_sent: Arc::new(AtomicU64::new(0)),
64 packets_received: Arc::new(AtomicU64::new(0)),
65 bytes_sent: Arc::new(AtomicU64::new(0)),
66 bytes_received: Arc::new(AtomicU64::new(0)),
67 packets_lost: Arc::new(AtomicU64::new(0)),
68 jitter_us: Arc::new(AtomicU64::new(0)),
69 codec_name: codec_name.to_string(),
70 highest_seq: Arc::new(AtomicU32::new(0)),
71 expected_packets: Arc::new(AtomicU64::new(0)),
72 initialized: Arc::new(AtomicBool::new(false)),
73 base_seq: Arc::new(AtomicU32::new(0)),
74 }
75 }
76
77 pub fn snapshot(&self) -> RtpStats {
79 let received = self.packets_received.load(Ordering::Relaxed);
80 let expected = self.expected_packets.load(Ordering::Relaxed);
81 let lost = expected.saturating_sub(received);
82 self.packets_lost.store(lost, Ordering::Relaxed);
83 let highest = self.highest_seq.load(Ordering::Relaxed);
84
85 RtpStats {
86 packets_sent: self.packets_sent.load(Ordering::Relaxed),
87 packets_received: received,
88 bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
89 bytes_received: self.bytes_received.load(Ordering::Relaxed),
90 packets_lost: lost,
91 jitter_ms: self.jitter_us.load(Ordering::Relaxed) as f64 / 1000.0,
92 codec_name: self.codec_name.clone(),
93 extended_highest_seq: highest,
94 seq_cycles: (highest >> 16) as u16,
95 }
96 }
97
98 pub fn record_sent(&self, bytes: u64) {
100 self.packets_sent.fetch_add(1, Ordering::Relaxed);
101 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
102 }
103
104 pub fn record_received(&self, bytes: u64, seq: u16) {
108 self.packets_received.fetch_add(1, Ordering::Relaxed);
109 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
110
111 if !self.initialized.swap(true, Ordering::Relaxed) {
112 self.base_seq.store(seq as u32, Ordering::Relaxed);
114 self.highest_seq.store(seq as u32, Ordering::Relaxed);
115 self.expected_packets.store(1, Ordering::Relaxed);
116 return;
117 }
118
119 let prev_extended = self.highest_seq.load(Ordering::Relaxed);
120 let prev_seq = (prev_extended & 0xFFFF) as u16;
121 let cycles = prev_extended >> 16;
122
123 let new_cycles = if seq < prev_seq && (prev_seq.wrapping_sub(seq)) > 0x8000 {
125 cycles.wrapping_add(1)
127 } else if seq > prev_seq && (seq.wrapping_sub(prev_seq)) > 0x8000 {
128 cycles.wrapping_sub(1)
130 } else {
131 cycles
132 };
133
134 let new_extended = (new_cycles << 16) | (seq as u32);
135
136 if new_extended > prev_extended || (new_cycles > cycles) {
138 self.highest_seq.store(new_extended, Ordering::Relaxed);
139
140 let base = self.base_seq.load(Ordering::Relaxed);
142 let expected = new_extended.wrapping_sub(base).wrapping_add(1) as u64;
143 self.expected_packets.store(expected, Ordering::Relaxed);
144 }
145 }
146
147 pub fn update_jitter(&self, transit_diff_us: u64) {
149 let prev_jitter = self.jitter_us.load(Ordering::Relaxed) as f64;
150 let d = transit_diff_us as f64;
151 let new_jitter = prev_jitter + (d - prev_jitter) / 16.0;
152 self.jitter_us.store(new_jitter as u64, Ordering::Relaxed);
153 }
154
155 pub fn reset(&self) {
157 self.packets_sent.store(0, Ordering::Relaxed);
158 self.packets_received.store(0, Ordering::Relaxed);
159 self.bytes_sent.store(0, Ordering::Relaxed);
160 self.bytes_received.store(0, Ordering::Relaxed);
161 self.packets_lost.store(0, Ordering::Relaxed);
162 self.jitter_us.store(0, Ordering::Relaxed);
163 self.highest_seq.store(0, Ordering::Relaxed);
164 self.expected_packets.store(0, Ordering::Relaxed);
165 self.initialized.store(false, Ordering::Relaxed);
166 self.base_seq.store(0, Ordering::Relaxed);
167 }
168
169 pub fn extended_highest_seq(&self) -> u32 {
171 self.highest_seq.load(Ordering::Relaxed)
172 }
173
174 pub fn seq_cycles(&self) -> u16 {
176 (self.highest_seq.load(Ordering::Relaxed) >> 16) as u16
177 }
178}
179
180impl Default for RtpCounters {
181 fn default() -> Self {
182 Self::new("unknown")
183 }
184}
185
186impl std::fmt::Debug for RtpCounters {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 f.debug_struct("RtpCounters")
189 .field("codec", &self.codec_name)
190 .field("sent", &self.packets_sent.load(Ordering::Relaxed))
191 .field("received", &self.packets_received.load(Ordering::Relaxed))
192 .finish()
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199
200 #[test]
201 fn test_counters_basic() {
202 let counters = RtpCounters::new("PCMU");
203
204 counters.record_sent(172);
205 counters.record_sent(172);
206 counters.record_received(172, 1);
207 counters.record_received(172, 2);
208
209 let stats = counters.snapshot();
210 assert_eq!(stats.packets_sent, 2);
211 assert_eq!(stats.packets_received, 2);
212 assert_eq!(stats.bytes_sent, 344);
213 assert_eq!(stats.bytes_received, 344);
214 assert_eq!(stats.codec_name, "PCMU");
215 }
216
217 #[test]
218 fn test_jitter_calculation() {
219 let counters = RtpCounters::new("PCMU");
220
221 counters.update_jitter(1000);
223 counters.update_jitter(2000);
224 counters.update_jitter(500);
225
226 let stats = counters.snapshot();
227 assert!(stats.jitter_ms > 0.0);
228 }
229
230 #[test]
231 fn test_reset() {
232 let counters = RtpCounters::new("PCMU");
233
234 counters.record_sent(100);
235 counters.record_received(100, 1);
236 counters.reset();
237
238 let stats = counters.snapshot();
239 assert_eq!(stats.packets_sent, 0);
240 assert_eq!(stats.packets_received, 0);
241 }
242
243 #[test]
244 fn test_sequence_rollover_forward() {
245 let counters = RtpCounters::new("PCMU");
246
247 counters.record_received(100, 65534);
249 assert_eq!(counters.seq_cycles(), 0);
250 assert_eq!(counters.extended_highest_seq(), 65534);
251
252 counters.record_received(100, 65535);
253 assert_eq!(counters.seq_cycles(), 0);
254 assert_eq!(counters.extended_highest_seq(), 65535);
255
256 counters.record_received(100, 0);
258 assert_eq!(counters.seq_cycles(), 1);
259 assert_eq!(counters.extended_highest_seq(), 1 << 16); counters.record_received(100, 1);
262 assert_eq!(counters.seq_cycles(), 1);
263 assert_eq!(counters.extended_highest_seq(), (1 << 16) | 1);
264
265 counters.record_received(100, 2);
267 counters.record_received(100, 3);
268 assert_eq!(counters.seq_cycles(), 1);
269 assert_eq!(counters.extended_highest_seq(), (1 << 16) | 3);
270 }
271
272 #[test]
273 fn test_second_rollover_sequential() {
274 let counters = RtpCounters::new("PCMU");
275
276 counters.record_received(100, 65534);
278 counters.record_received(100, 65535);
279 counters.record_received(100, 0); assert_eq!(counters.seq_cycles(), 1);
281
282 for seq in 1u16..=65535 {
285 counters.record_received(100, seq);
286 }
287 counters.record_received(100, 0); assert_eq!(counters.seq_cycles(), 2);
290 assert_eq!(counters.extended_highest_seq(), 2 << 16); }
292
293 #[test]
294 fn test_small_gap_near_rollover() {
295 let counters = RtpCounters::new("PCMU");
296
297 counters.record_received(100, 65530);
299 counters.record_received(100, 65531);
300 counters.record_received(100, 65533);
302 counters.record_received(100, 65534);
303 counters.record_received(100, 65535);
304 counters.record_received(100, 0);
306 assert_eq!(counters.seq_cycles(), 1);
307 counters.record_received(100, 2);
309 counters.record_received(100, 3);
310
311 assert_eq!(counters.seq_cycles(), 1);
312 assert_eq!(counters.extended_highest_seq(), (1 << 16) | 3);
313 }
314
315 #[test]
316 fn test_sequence_reorder_near_rollover() {
317 let counters = RtpCounters::new("PCMU");
318
319 counters.record_received(100, 65534);
321 assert_eq!(counters.seq_cycles(), 0);
322
323 counters.record_received(100, 0);
325 assert_eq!(counters.seq_cycles(), 1);
326
327 counters.record_received(100, 65535);
330 assert_eq!(counters.seq_cycles(), 1);
331
332 counters.record_received(100, 1);
334 counters.record_received(100, 2);
335 assert_eq!(counters.extended_highest_seq(), (1 << 16) | 2);
336 }
337
338 #[test]
339 fn test_expected_packets_with_rollover() {
340 let counters = RtpCounters::new("PCMU");
341
342 counters.record_received(100, 65530);
344
345 for seq in 65531..=65535 {
347 counters.record_received(100, seq);
348 }
349 for seq in 0..=5 {
350 counters.record_received(100, seq);
351 }
352
353 let stats = counters.snapshot();
354 assert_eq!(stats.packets_received, 12);
359 assert_eq!(stats.extended_highest_seq, (1 << 16) | 5);
360 }
361
362 #[test]
363 fn test_multiple_rollovers() {
364 let counters = RtpCounters::new("PCMU");
365
366 counters.record_received(100, 0);
367
368 for cycle in 0..3 {
370 for seq in 1..=65535u16 {
371 counters.record_received(100, seq);
372 }
373 counters.record_received(100, 0);
374 assert_eq!(
375 counters.seq_cycles(),
376 cycle + 1,
377 "After cycle {}, expected {} cycles",
378 cycle,
379 cycle + 1
380 );
381 }
382
383 assert_eq!(counters.seq_cycles(), 3);
384 }
385
386 #[test]
387 fn test_packet_loss_calculation() {
388 let counters = RtpCounters::new("PCMU");
389
390 counters.record_received(100, 0);
392 counters.record_received(100, 1);
393 counters.record_received(100, 2);
394 counters.record_received(100, 5);
395 counters.record_received(100, 6);
396
397 let stats = counters.snapshot();
398 assert_eq!(stats.packets_received, 5);
399 assert_eq!(stats.packets_lost, 2);
402 }
403}