1use std::collections::BTreeMap;
31use std::time::{Duration, Instant};
32
33#[derive(Debug, Clone, Copy, PartialEq)]
35pub enum JitterMode {
36 Fixed {
38 delay_ms: u32,
40 },
41 Adaptive {
43 target_ms: u32,
45 min_ms: u32,
47 max_ms: u32,
49 },
50}
51
52impl Default for JitterMode {
53 fn default() -> Self {
54 Self::Adaptive {
55 target_ms: 60,
56 min_ms: 20,
57 max_ms: 200,
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
64pub struct JitterConfig {
65 pub mode: JitterMode,
67 pub clock_rate: u32,
69 pub max_packets: usize,
71}
72
73impl Default for JitterConfig {
74 fn default() -> Self {
75 Self {
76 mode: JitterMode::default(),
77 clock_rate: 8000,
78 max_packets: 50,
79 }
80 }
81}
82
83#[derive(Debug, Clone)]
85pub struct BufferedPacket {
86 pub seq: u16,
88 pub timestamp: u32,
90 pub payload: Vec<u8>,
92 pub received_at: Instant,
94 pub synthesized: bool,
96}
97
98#[derive(Debug, Clone, Default)]
100pub struct JitterStats {
101 pub packets_received: u64,
103 pub packets_dropped: u64,
105 pub packets_lost: u64,
107 pub packets_played: u64,
109 pub buffer_depth: usize,
111 pub current_delay_ms: u32,
113 pub observed_jitter_ms: f64,
115}
116
117pub struct JitterBuffer {
119 config: JitterConfig,
120 packets: BTreeMap<u32, BufferedPacket>,
122 playout_seq: Option<u32>,
124 seq_cycles: u16,
126 last_seq: Option<u16>,
128 current_delay_ms: u32,
130 jitter_estimate: f64,
132 last_transit: Option<i64>,
134 stats: JitterStats,
136 base_timestamp: Option<u32>,
138 base_time: Option<Instant>,
140 playing: bool,
142}
143
144impl JitterBuffer {
145 pub fn new(config: JitterConfig) -> Self {
147 let initial_delay = match config.mode {
148 JitterMode::Fixed { delay_ms } => delay_ms,
149 JitterMode::Adaptive { target_ms, .. } => target_ms,
150 };
151
152 Self {
153 config,
154 packets: BTreeMap::new(),
155 playout_seq: None,
156 seq_cycles: 0,
157 last_seq: None,
158 current_delay_ms: initial_delay,
159 jitter_estimate: 0.0,
160 last_transit: None,
161 stats: JitterStats::default(),
162 base_timestamp: None,
163 base_time: None,
164 playing: false,
165 }
166 }
167
168 pub fn push(&mut self, seq: u16, timestamp: u32, payload: Vec<u8>) -> bool {
172 let now = Instant::now();
173 self.stats.packets_received += 1;
174
175 if self.base_timestamp.is_none() {
177 self.base_timestamp = Some(timestamp);
178 self.base_time = Some(now);
179 self.last_seq = Some(seq);
180 }
181
182 let extended_seq = self.extend_seq(seq);
184
185 self.update_jitter(timestamp, now);
187
188 if let Some(playout) = self.playout_seq
190 && extended_seq < playout
191 {
192 self.stats.packets_dropped += 1;
193 return false;
194 }
195
196 if self.packets.len() >= self.config.max_packets {
198 if let Some(&oldest_seq) = self.packets.keys().next() {
200 self.packets.remove(&oldest_seq);
201 self.stats.packets_dropped += 1;
202 }
203 }
204
205 self.packets.insert(
207 extended_seq,
208 BufferedPacket {
209 seq,
210 timestamp,
211 payload,
212 received_at: now,
213 synthesized: false,
214 },
215 );
216
217 self.last_seq = Some(seq);
218 true
219 }
220
221 pub fn pop(&mut self) -> Option<BufferedPacket> {
225 let now = Instant::now();
226
227 let target_seq = if let Some(seq) = self.playout_seq {
229 seq
230 } else {
231 if !self.should_start_playout(now) {
233 return None;
234 }
235 let first_seq = *self.packets.keys().next()?;
237 self.playout_seq = Some(first_seq);
238 self.playing = true;
239 first_seq
240 };
241
242 let packet = if let Some(pkt) = self.packets.remove(&target_seq) {
244 Some(pkt)
245 } else {
246 self.stats.packets_lost += 1;
248 Some(BufferedPacket {
250 seq: (target_seq & 0xFFFF) as u16,
251 timestamp: self.estimate_timestamp(target_seq),
252 payload: Vec::new(), received_at: now,
254 synthesized: true,
255 })
256 };
257
258 self.stats.packets_played += 1;
259
260 self.playout_seq = Some(target_seq.wrapping_add(1));
262
263 self.stats.buffer_depth = self.packets.len();
265 self.stats.current_delay_ms = self.current_delay_ms;
266 self.stats.observed_jitter_ms = self.jitter_ms();
267
268 if matches!(self.config.mode, JitterMode::Adaptive { .. }) {
270 self.adapt_delay();
271 }
272
273 packet
274 }
275
276 pub fn stats(&self) -> JitterStats {
278 let mut stats = self.stats.clone();
279 stats.buffer_depth = self.packets.len();
280 stats.current_delay_ms = self.current_delay_ms;
281 stats.observed_jitter_ms = self.jitter_ms();
282 stats
283 }
284
285 pub fn delay_ms(&self) -> u32 {
287 self.current_delay_ms
288 }
289
290 pub fn jitter_ms(&self) -> f64 {
292 (self.jitter_estimate / self.config.clock_rate as f64) * 1000.0
294 }
295
296 pub fn reset(&mut self) {
298 self.packets.clear();
299 self.playout_seq = None;
300 self.seq_cycles = 0;
301 self.last_seq = None;
302 self.jitter_estimate = 0.0;
303 self.last_transit = None;
304 self.base_timestamp = None;
305 self.base_time = None;
306 self.playing = false;
307 self.stats = JitterStats::default();
308
309 self.current_delay_ms = match self.config.mode {
311 JitterMode::Fixed { delay_ms } => delay_ms,
312 JitterMode::Adaptive { target_ms, .. } => target_ms,
313 };
314 }
315
316 pub fn flush(&mut self) -> Vec<BufferedPacket> {
318 let packets: Vec<_> = self.packets.values().cloned().collect();
319 self.packets.clear();
320 packets
321 }
322
323 pub fn is_empty(&self) -> bool {
325 self.packets.is_empty()
326 }
327
328 pub fn len(&self) -> usize {
330 self.packets.len()
331 }
332
333 fn extend_seq(&mut self, seq: u16) -> u32 {
336 if let Some(last) = self.last_seq {
337 if seq < last && (last.wrapping_sub(seq)) > 0x8000 {
339 self.seq_cycles = self.seq_cycles.wrapping_add(1);
340 } else if seq > last && (seq.wrapping_sub(last)) > 0x8000 {
341 self.seq_cycles = self.seq_cycles.wrapping_sub(1);
342 }
343 }
344 ((self.seq_cycles as u32) << 16) | (seq as u32)
345 }
346
347 fn update_jitter(&mut self, timestamp: u32, now: Instant) {
348 let base_ts = match self.base_timestamp {
349 Some(ts) => ts,
350 None => return,
351 };
352 let base_time = match self.base_time {
353 Some(t) => t,
354 None => return,
355 };
356
357 let arrival_ts = now.duration_since(base_time).as_micros() as i64
359 * self.config.clock_rate as i64
360 / 1_000_000;
361 let send_ts = timestamp.wrapping_sub(base_ts) as i64;
362 let transit = arrival_ts - send_ts;
363
364 if let Some(last_transit) = self.last_transit {
365 let d = (transit - last_transit).abs() as f64;
367 self.jitter_estimate += (d - self.jitter_estimate) / 16.0;
368 }
369
370 self.last_transit = Some(transit);
371 }
372
373 fn should_start_playout(&self, now: Instant) -> bool {
374 if self.packets.is_empty() {
376 return false;
377 }
378
379 if let Some(base_time) = self.base_time {
381 let elapsed = now.duration_since(base_time);
382 let delay = Duration::from_millis(self.current_delay_ms as u64);
383 return elapsed >= delay;
384 }
385
386 false
387 }
388
389 fn adapt_delay(&mut self) {
390 let JitterMode::Adaptive {
391 min_ms,
392 max_ms,
393 target_ms: _,
394 } = self.config.mode
395 else {
396 return;
397 };
398
399 let jitter_ms = self.jitter_ms();
401 let target = (jitter_ms * 2.0) as u32;
402 let target = target.clamp(min_ms, max_ms);
403
404 if target > self.current_delay_ms {
406 self.current_delay_ms = self
408 .current_delay_ms
409 .saturating_add(((target - self.current_delay_ms) / 4).max(1));
410 } else if target < self.current_delay_ms {
411 self.current_delay_ms = self
413 .current_delay_ms
414 .saturating_sub(((self.current_delay_ms - target) / 8).max(1));
415 }
416
417 self.current_delay_ms = self.current_delay_ms.clamp(min_ms, max_ms);
418 }
419
420 fn estimate_timestamp(&self, extended_seq: u32) -> u32 {
421 let samples_per_frame = self.config.clock_rate / 50; let base = self.base_timestamp.unwrap_or(0);
425 let seq_offset = extended_seq.wrapping_sub(self.playout_seq.unwrap_or(extended_seq));
426 base.wrapping_add(seq_offset * samples_per_frame)
427 }
428}
429
430impl std::fmt::Debug for JitterBuffer {
431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432 f.debug_struct("JitterBuffer")
433 .field("mode", &self.config.mode)
434 .field("buffered", &self.packets.len())
435 .field("delay_ms", &self.current_delay_ms)
436 .field("jitter_ms", &self.jitter_ms())
437 .field("playing", &self.playing)
438 .finish()
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445 use std::thread::sleep;
446
447 #[test]
448 fn test_fixed_jitter_buffer_basic() {
449 let config = JitterConfig {
450 mode: JitterMode::Fixed { delay_ms: 20 },
451 clock_rate: 8000,
452 max_packets: 10,
453 };
454 let mut jitter = JitterBuffer::new(config);
455
456 assert!(jitter.push(0, 0, vec![1, 2, 3]));
458 assert!(jitter.push(1, 160, vec![4, 5, 6]));
459 assert!(jitter.push(2, 320, vec![7, 8, 9]));
460
461 assert_eq!(jitter.len(), 3);
462
463 sleep(Duration::from_millis(25));
465
466 let p1 = jitter.pop().unwrap();
468 assert_eq!(p1.seq, 0);
469 assert_eq!(p1.payload, vec![1, 2, 3]);
470
471 let p2 = jitter.pop().unwrap();
472 assert_eq!(p2.seq, 1);
473
474 let p3 = jitter.pop().unwrap();
475 assert_eq!(p3.seq, 2);
476 }
477
478 #[test]
479 fn test_packet_reordering() {
480 let config = JitterConfig {
481 mode: JitterMode::Fixed { delay_ms: 10 },
482 clock_rate: 8000,
483 max_packets: 10,
484 };
485 let mut jitter = JitterBuffer::new(config);
486
487 jitter.push(2, 320, vec![3]);
489 jitter.push(0, 0, vec![1]);
490 jitter.push(1, 160, vec![2]);
491
492 sleep(Duration::from_millis(15));
493
494 assert_eq!(jitter.pop().unwrap().seq, 0);
496 assert_eq!(jitter.pop().unwrap().seq, 1);
497 assert_eq!(jitter.pop().unwrap().seq, 2);
498 }
499
500 #[test]
501 fn test_packet_loss_detection() {
502 let config = JitterConfig {
503 mode: JitterMode::Fixed { delay_ms: 10 },
504 clock_rate: 8000,
505 max_packets: 10,
506 };
507 let mut jitter = JitterBuffer::new(config);
508
509 jitter.push(0, 0, vec![1]);
511 jitter.push(2, 320, vec![3]);
512
513 sleep(Duration::from_millis(15));
514
515 let p1 = jitter.pop().unwrap();
517 assert_eq!(p1.seq, 0);
518 assert!(!p1.synthesized);
519
520 let p2 = jitter.pop().unwrap();
522 assert_eq!(p2.seq, 1);
523 assert!(p2.synthesized);
524 assert!(p2.payload.is_empty());
525
526 let p3 = jitter.pop().unwrap();
528 assert_eq!(p3.seq, 2);
529 assert!(!p3.synthesized);
530
531 let stats = jitter.stats();
532 assert_eq!(stats.packets_lost, 1);
533 }
534
535 #[test]
536 fn test_late_packet_dropped() {
537 let config = JitterConfig {
538 mode: JitterMode::Fixed { delay_ms: 5 },
539 clock_rate: 8000,
540 max_packets: 10,
541 };
542 let mut jitter = JitterBuffer::new(config);
543
544 jitter.push(0, 0, vec![1]);
545 jitter.push(1, 160, vec![2]);
546
547 sleep(Duration::from_millis(10));
548
549 jitter.pop();
551
552 assert!(!jitter.push(0, 0, vec![1]));
554
555 let stats = jitter.stats();
556 assert_eq!(stats.packets_dropped, 1);
557 }
558
559 #[test]
560 fn test_adaptive_jitter_buffer() {
561 let config = JitterConfig {
562 mode: JitterMode::Adaptive {
563 target_ms: 40,
564 min_ms: 20,
565 max_ms: 200,
566 },
567 clock_rate: 8000,
568 max_packets: 20,
569 };
570 let mut jitter = JitterBuffer::new(config);
571
572 assert_eq!(jitter.delay_ms(), 40); for i in 0..10u16 {
576 jitter.push(i, i as u32 * 160, vec![i as u8]);
577 sleep(Duration::from_millis(5)); }
579
580 sleep(Duration::from_millis(50));
581
582 for _ in 0..5 {
584 jitter.pop();
585 }
586
587 let delay = jitter.delay_ms();
589 assert!(delay >= 20 && delay <= 200);
590 }
591
592 #[test]
593 fn test_sequence_rollover_in_jitter_buffer() {
594 let config = JitterConfig {
595 mode: JitterMode::Fixed { delay_ms: 5 },
596 clock_rate: 8000,
597 max_packets: 10,
598 };
599 let mut jitter = JitterBuffer::new(config);
600
601 jitter.push(65534, 0, vec![1]);
603 jitter.push(65535, 160, vec![2]);
604 jitter.push(0, 320, vec![3]); jitter.push(1, 480, vec![4]);
606
607 sleep(Duration::from_millis(10));
608
609 assert_eq!(jitter.pop().unwrap().seq, 65534);
611 assert_eq!(jitter.pop().unwrap().seq, 65535);
612 assert_eq!(jitter.pop().unwrap().seq, 0);
613 assert_eq!(jitter.pop().unwrap().seq, 1);
614 }
615
616 #[test]
617 fn test_buffer_overflow() {
618 let config = JitterConfig {
619 mode: JitterMode::Fixed { delay_ms: 100 },
620 clock_rate: 8000,
621 max_packets: 3,
622 };
623 let mut jitter = JitterBuffer::new(config);
624
625 jitter.push(0, 0, vec![1]);
627 jitter.push(1, 160, vec![2]);
628 jitter.push(2, 320, vec![3]);
629 jitter.push(3, 480, vec![4]); assert_eq!(jitter.len(), 3);
632
633 let stats = jitter.stats();
634 assert_eq!(stats.packets_dropped, 1);
635 }
636
637 #[test]
638 fn test_reset() {
639 let config = JitterConfig::default();
640 let mut jitter = JitterBuffer::new(config);
641
642 jitter.push(0, 0, vec![1]);
643 jitter.push(1, 160, vec![2]);
644
645 jitter.reset();
646
647 assert!(jitter.is_empty());
648 assert_eq!(jitter.stats().packets_received, 0);
649 }
650
651 #[test]
652 fn test_flush() {
653 let config = JitterConfig::default();
654 let mut jitter = JitterBuffer::new(config);
655
656 jitter.push(0, 0, vec![1]);
657 jitter.push(1, 160, vec![2]);
658 jitter.push(2, 320, vec![3]);
659
660 let flushed = jitter.flush();
661 assert_eq!(flushed.len(), 3);
662 assert!(jitter.is_empty());
663 }
664
665 #[test]
666 fn test_jitter_calculation() {
667 let config = JitterConfig {
668 mode: JitterMode::Fixed { delay_ms: 10 },
669 clock_rate: 8000,
670 max_packets: 20,
671 };
672 let mut jitter = JitterBuffer::new(config);
673
674 jitter.push(0, 0, vec![1]);
676 sleep(Duration::from_millis(20));
677 jitter.push(1, 160, vec![2]);
678 sleep(Duration::from_millis(25));
679 jitter.push(2, 320, vec![3]);
680 sleep(Duration::from_millis(15));
681 jitter.push(3, 480, vec![4]);
682
683 let jitter_ms = jitter.jitter_ms();
685 assert!(jitter_ms >= 0.0);
686 }
687
688 #[test]
689 fn test_stats() {
690 let config = JitterConfig {
691 mode: JitterMode::Fixed { delay_ms: 5 },
692 clock_rate: 8000,
693 max_packets: 10,
694 };
695 let mut jitter = JitterBuffer::new(config);
696
697 jitter.push(0, 0, vec![1]);
698 jitter.push(1, 160, vec![2]);
699 jitter.push(3, 480, vec![4]);
701
702 sleep(Duration::from_millis(10));
703
704 jitter.pop(); jitter.pop(); jitter.pop(); jitter.pop(); let stats = jitter.stats();
710 assert_eq!(stats.packets_received, 3);
711 assert_eq!(stats.packets_played, 4);
712 assert_eq!(stats.packets_lost, 1);
713 }
714}