Skip to main content

gap/
jitter.rs

1//! Bounded reorder buffer for GAP audio frames.
2//!
3//! Real-time voice traffic arrives out of order. GAP §7 RECOMMENDS a small
4//! jitter buffer at the receiver: hold incoming frames briefly so the
5//! decoder consumes them in `rtp_sequence` order, drop frames that arrive
6//! after their playout deadline.
7//!
8//! [`JitterBuffer`] is a tiny, allocation-free reorder window keyed by
9//! `media_source_id`. It does not own a clock — the application calls
10//! [`JitterBuffer::pop_in_order`] from its render thread to drain the
11//! next-expected frame.
12
13use crate::GapPayload;
14use std::collections::HashMap;
15use std::collections::VecDeque;
16
17/// Per-source state held by [`JitterBuffer`].
18struct SourceState {
19    /// Frames waiting in the reorder window, sorted by `rtp_sequence` ascending.
20    waiting: VecDeque<GapPayload>,
21    /// Next sequence number we expect to pop.
22    next: Option<u32>,
23}
24
25impl SourceState {
26    fn new() -> Self {
27        Self {
28            waiting: VecDeque::new(),
29            next: None,
30        }
31    }
32
33    fn insert(&mut self, p: GapPayload, capacity: usize) -> Option<GapPayload> {
34        if self
35            .waiting
36            .iter()
37            .any(|q| q.rtp_sequence == p.rtp_sequence)
38        {
39            return None;
40        }
41        // O(n) linear scan — n ≤ 16 in practice (Opus frame window), so this
42        // is cheaper than a BTreeMap for tiny N.
43        let pos = self
44            .waiting
45            .iter()
46            .position(|q| q.rtp_sequence > p.rtp_sequence)
47            .unwrap_or(self.waiting.len());
48        self.waiting.insert(pos, p);
49        if self.waiting.len() > capacity {
50            return self.waiting.pop_front();
51        }
52        None
53    }
54}
55
56/// Outcome of [`JitterBuffer::push`].
57#[derive(Debug)]
58pub enum JitterPush {
59    /// Frame was buffered.
60    Accepted,
61    /// Frame is older than the next expected sequence — dropped without
62    /// inserting.
63    Late,
64    /// Buffer was full; the oldest queued frame was evicted to make room
65    /// (returned here so the application can still meter / log it).
66    Evicted(GapPayload),
67}
68
69/// Bounded reorder window for GAP audio frames.
70pub struct JitterBuffer {
71    capacity_per_source: usize,
72    sources: HashMap<u32, SourceState>,
73}
74
75impl JitterBuffer {
76    /// Builds a jitter buffer that retains up to `capacity_per_source`
77    /// frames for each `media_source_id`.
78    ///
79    /// # Panics
80    /// Panics if `capacity_per_source == 0`.
81    pub fn new(capacity_per_source: usize) -> Self {
82        assert!(capacity_per_source > 0, "capacity must be > 0");
83        Self {
84            capacity_per_source,
85            sources: HashMap::new(),
86        }
87    }
88
89    /// Inserts a frame into the buffer.
90    pub fn push(&mut self, frame: GapPayload) -> JitterPush {
91        let state = self
92            .sources
93            .entry(frame.media_source_id)
94            .or_insert_with(SourceState::new);
95        if let Some(next) = state.next
96            && frame.rtp_sequence < next
97        {
98            return JitterPush::Late;
99        }
100        match state.insert(frame, self.capacity_per_source) {
101            Some(evicted) => JitterPush::Evicted(evicted),
102            None => JitterPush::Accepted,
103        }
104    }
105
106    /// Pops the next frame for `media_source_id` if it is the next one
107    /// expected (i.e. its `rtp_sequence` is contiguous with the previous
108    /// pop). Returns `None` if the head of the queue is from the future
109    /// (caller should wait or skip via [`JitterBuffer::pop_force`]).
110    pub fn pop_in_order(&mut self, media_source_id: u32) -> Option<GapPayload> {
111        let state = self.sources.get_mut(&media_source_id)?;
112        let head = state.waiting.front()?;
113        if let Some(next) = state.next
114            && head.rtp_sequence != next
115        {
116            return None;
117        }
118        let popped = state.waiting.pop_front()?;
119        state.next = Some(popped.rtp_sequence.wrapping_add(1));
120        Some(popped)
121    }
122
123    /// Forces the next frame out regardless of contiguity. Use this when the
124    /// playout deadline is reached and the next-expected packet is still
125    /// missing — the caller will play the next available frame and skip the
126    /// gap.
127    pub fn pop_force(&mut self, media_source_id: u32) -> Option<GapPayload> {
128        let state = self.sources.get_mut(&media_source_id)?;
129        let popped = state.waiting.pop_front()?;
130        state.next = Some(popped.rtp_sequence.wrapping_add(1));
131        Some(popped)
132    }
133
134    /// Returns the number of frames buffered for the given source.
135    pub fn len_for(&self, media_source_id: u32) -> usize {
136        self.sources
137            .get(&media_source_id)
138            .map(|s| s.waiting.len())
139            .unwrap_or(0)
140    }
141
142    /// Drops every queued frame and forgets every source.
143    pub fn clear(&mut self) {
144        self.sources.clear();
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151
152    fn frame(src: u32, seq: u32) -> GapPayload {
153        GapPayload::opus_20ms(src, seq as u16, 0, vec![seq as u8])
154    }
155
156    #[test]
157    fn reorders_out_of_order_pushes() {
158        let mut jb = JitterBuffer::new(8);
159        for s in [3u32, 1, 2, 4] {
160            assert!(matches!(jb.push(frame(1, s)), JitterPush::Accepted));
161        }
162        let order: Vec<u32> = std::iter::from_fn(|| jb.pop_in_order(1))
163            .map(|f| f.rtp_sequence)
164            .collect();
165        assert_eq!(order, vec![1, 2, 3, 4]);
166    }
167
168    #[test]
169    fn rejects_late_frames() {
170        let mut jb = JitterBuffer::new(4);
171        jb.push(frame(1, 5));
172        jb.pop_in_order(1).unwrap();
173        let r = jb.push(frame(1, 4));
174        assert!(matches!(r, JitterPush::Late));
175    }
176
177    #[test]
178    fn pop_force_skips_missing() {
179        let mut jb = JitterBuffer::new(4);
180        jb.push(frame(1, 1));
181        jb.push(frame(1, 3));
182        assert_eq!(jb.pop_in_order(1).unwrap().rtp_sequence, 1);
183        assert!(jb.pop_in_order(1).is_none());
184        assert_eq!(jb.pop_force(1).unwrap().rtp_sequence, 3);
185    }
186}