1use crate::GapPayload;
14use std::collections::HashMap;
15use std::collections::VecDeque;
16
17struct SourceState {
19 waiting: VecDeque<GapPayload>,
21 next: Option<u32>,
23}
24
25impl SourceState {
26 fn new() -> Self {
27 Self {
28 waiting: VecDeque::new(),
29 next: None,
30 }
31 }
32
33 fn insert(&mut self, p: GapPayload, capacity: usize) -> Option<GapPayload> {
34 if self
35 .waiting
36 .iter()
37 .any(|q| q.rtp_sequence == p.rtp_sequence)
38 {
39 return None;
40 }
41 let pos = self
44 .waiting
45 .iter()
46 .position(|q| q.rtp_sequence > p.rtp_sequence)
47 .unwrap_or(self.waiting.len());
48 self.waiting.insert(pos, p);
49 if self.waiting.len() > capacity {
50 return self.waiting.pop_front();
51 }
52 None
53 }
54}
55
56#[derive(Debug)]
58pub enum JitterPush {
59 Accepted,
61 Late,
64 Evicted(GapPayload),
67}
68
69pub struct JitterBuffer {
71 capacity_per_source: usize,
72 sources: HashMap<u32, SourceState>,
73}
74
75impl JitterBuffer {
76 pub fn new(capacity_per_source: usize) -> Self {
82 assert!(capacity_per_source > 0, "capacity must be > 0");
83 Self {
84 capacity_per_source,
85 sources: HashMap::new(),
86 }
87 }
88
89 pub fn push(&mut self, frame: GapPayload) -> JitterPush {
91 let state = self
92 .sources
93 .entry(frame.media_source_id)
94 .or_insert_with(SourceState::new);
95 if let Some(next) = state.next
96 && frame.rtp_sequence < next
97 {
98 return JitterPush::Late;
99 }
100 match state.insert(frame, self.capacity_per_source) {
101 Some(evicted) => JitterPush::Evicted(evicted),
102 None => JitterPush::Accepted,
103 }
104 }
105
106 pub fn pop_in_order(&mut self, media_source_id: u32) -> Option<GapPayload> {
111 let state = self.sources.get_mut(&media_source_id)?;
112 let head = state.waiting.front()?;
113 if let Some(next) = state.next
114 && head.rtp_sequence != next
115 {
116 return None;
117 }
118 let popped = state.waiting.pop_front()?;
119 state.next = Some(popped.rtp_sequence.wrapping_add(1));
120 Some(popped)
121 }
122
123 pub fn pop_force(&mut self, media_source_id: u32) -> Option<GapPayload> {
128 let state = self.sources.get_mut(&media_source_id)?;
129 let popped = state.waiting.pop_front()?;
130 state.next = Some(popped.rtp_sequence.wrapping_add(1));
131 Some(popped)
132 }
133
134 pub fn len_for(&self, media_source_id: u32) -> usize {
136 self.sources
137 .get(&media_source_id)
138 .map(|s| s.waiting.len())
139 .unwrap_or(0)
140 }
141
142 pub fn clear(&mut self) {
144 self.sources.clear();
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use super::*;
151
152 fn frame(src: u32, seq: u32) -> GapPayload {
153 GapPayload::opus_20ms(src, seq as u16, 0, vec![seq as u8])
154 }
155
156 #[test]
157 fn reorders_out_of_order_pushes() {
158 let mut jb = JitterBuffer::new(8);
159 for s in [3u32, 1, 2, 4] {
160 assert!(matches!(jb.push(frame(1, s)), JitterPush::Accepted));
161 }
162 let order: Vec<u32> = std::iter::from_fn(|| jb.pop_in_order(1))
163 .map(|f| f.rtp_sequence)
164 .collect();
165 assert_eq!(order, vec![1, 2, 3, 4]);
166 }
167
168 #[test]
169 fn rejects_late_frames() {
170 let mut jb = JitterBuffer::new(4);
171 jb.push(frame(1, 5));
172 jb.pop_in_order(1).unwrap();
173 let r = jb.push(frame(1, 4));
174 assert!(matches!(r, JitterPush::Late));
175 }
176
177 #[test]
178 fn pop_force_skips_missing() {
179 let mut jb = JitterBuffer::new(4);
180 jb.push(frame(1, 1));
181 jb.push(frame(1, 3));
182 assert_eq!(jb.pop_in_order(1).unwrap().rtp_sequence, 1);
183 assert!(jb.pop_in_order(1).is_none());
184 assert_eq!(jb.pop_force(1).unwrap().rtp_sequence, 3);
185 }
186}