1use crate::GapPayload;
14use std::collections::HashMap;
15use std::collections::VecDeque;
16
17struct SourceState {
19 waiting: VecDeque<GapPayload>,
21 next: Option<u32>,
23}
24
25impl SourceState {
26 fn new() -> Self {
27 Self { waiting: VecDeque::new(), next: None }
28 }
29
30 fn insert(&mut self, p: GapPayload, capacity: usize) -> Option<GapPayload> {
31 if self.waiting.iter().any(|q| q.rtp_sequence == p.rtp_sequence) {
32 return None;
33 }
34 let pos = self
37 .waiting
38 .iter()
39 .position(|q| q.rtp_sequence > p.rtp_sequence)
40 .unwrap_or(self.waiting.len());
41 self.waiting.insert(pos, p);
42 if self.waiting.len() > capacity {
43 return self.waiting.pop_front();
44 }
45 None
46 }
47}
48
49#[derive(Debug)]
51pub enum JitterPush {
52 Accepted,
54 Late,
57 Evicted(GapPayload),
60}
61
62pub struct JitterBuffer {
64 capacity_per_source: usize,
65 sources: HashMap<u32, SourceState>,
66}
67
68impl JitterBuffer {
69 pub fn new(capacity_per_source: usize) -> Self {
75 assert!(capacity_per_source > 0, "capacity must be > 0");
76 Self {
77 capacity_per_source,
78 sources: HashMap::new(),
79 }
80 }
81
82 pub fn push(&mut self, frame: GapPayload) -> JitterPush {
84 let state = self
85 .sources
86 .entry(frame.media_source_id)
87 .or_insert_with(SourceState::new);
88 if let Some(next) = state.next
89 && frame.rtp_sequence < next
90 {
91 return JitterPush::Late;
92 }
93 match state.insert(frame, self.capacity_per_source) {
94 Some(evicted) => JitterPush::Evicted(evicted),
95 None => JitterPush::Accepted,
96 }
97 }
98
99 pub fn pop_in_order(&mut self, media_source_id: u32) -> Option<GapPayload> {
104 let state = self.sources.get_mut(&media_source_id)?;
105 let head = state.waiting.front()?;
106 if let Some(next) = state.next
107 && head.rtp_sequence != next
108 {
109 return None;
110 }
111 let popped = state.waiting.pop_front()?;
112 state.next = Some(popped.rtp_sequence.wrapping_add(1));
113 Some(popped)
114 }
115
116 pub fn pop_force(&mut self, media_source_id: u32) -> Option<GapPayload> {
121 let state = self.sources.get_mut(&media_source_id)?;
122 let popped = state.waiting.pop_front()?;
123 state.next = Some(popped.rtp_sequence.wrapping_add(1));
124 Some(popped)
125 }
126
127 pub fn len_for(&self, media_source_id: u32) -> usize {
129 self.sources
130 .get(&media_source_id)
131 .map(|s| s.waiting.len())
132 .unwrap_or(0)
133 }
134
135 pub fn clear(&mut self) {
137 self.sources.clear();
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144
145 fn frame(src: u32, seq: u32) -> GapPayload {
146 GapPayload::opus_20ms(src, seq as u16, 0, vec![seq as u8])
147 }
148
149 #[test]
150 fn reorders_out_of_order_pushes() {
151 let mut jb = JitterBuffer::new(8);
152 for s in [3u32, 1, 2, 4] {
153 assert!(matches!(jb.push(frame(1, s)), JitterPush::Accepted));
154 }
155 let order: Vec<u32> = std::iter::from_fn(|| jb.pop_in_order(1))
156 .map(|f| f.rtp_sequence)
157 .collect();
158 assert_eq!(order, vec![1, 2, 3, 4]);
159 }
160
161 #[test]
162 fn rejects_late_frames() {
163 let mut jb = JitterBuffer::new(4);
164 jb.push(frame(1, 5));
165 jb.pop_in_order(1).unwrap();
166 let r = jb.push(frame(1, 4));
167 assert!(matches!(r, JitterPush::Late));
168 }
169
170 #[test]
171 fn pop_force_skips_missing() {
172 let mut jb = JitterBuffer::new(4);
173 jb.push(frame(1, 1));
174 jb.push(frame(1, 3));
175 assert_eq!(jb.pop_in_order(1).unwrap().rtp_sequence, 1);
176 assert!(jb.pop_in_order(1).is_none());
177 assert_eq!(jb.pop_force(1).unwrap().rtp_sequence, 3);
178 }
179}