Skip to main content

gap/
jitter.rs

1//! Bounded reorder buffer for GAP audio frames.
2//!
3//! Real-time voice traffic arrives out of order. GAP §7 RECOMMENDS a small
4//! jitter buffer at the receiver: hold incoming frames briefly so the
5//! decoder consumes them in `rtp_sequence` order, drop frames that arrive
6//! after their playout deadline.
7//!
8//! [`JitterBuffer`] is a tiny, allocation-free reorder window keyed by
9//! `media_source_id`. It does not own a clock — the application calls
10//! [`JitterBuffer::pop_in_order`] from its render thread to drain the
11//! next-expected frame.
12
13use crate::GapPayload;
14use std::collections::HashMap;
15use std::collections::VecDeque;
16
17/// Per-source state held by [`JitterBuffer`].
18struct SourceState {
19    /// Frames waiting in the reorder window, sorted by `rtp_sequence` ascending.
20    waiting: VecDeque<GapPayload>,
21    /// Next sequence number we expect to pop.
22    next: Option<u32>,
23}
24
25impl SourceState {
26    fn new() -> Self {
27        Self { waiting: VecDeque::new(), next: None }
28    }
29
30    fn insert(&mut self, p: GapPayload, capacity: usize) -> Option<GapPayload> {
31        if self.waiting.iter().any(|q| q.rtp_sequence == p.rtp_sequence) {
32            return None;
33        }
34        // O(n) linear scan — n ≤ 16 in practice (Opus frame window), so this
35        // is cheaper than a BTreeMap for tiny N.
36        let pos = self
37            .waiting
38            .iter()
39            .position(|q| q.rtp_sequence > p.rtp_sequence)
40            .unwrap_or(self.waiting.len());
41        self.waiting.insert(pos, p);
42        if self.waiting.len() > capacity {
43            return self.waiting.pop_front();
44        }
45        None
46    }
47}
48
49/// Outcome of [`JitterBuffer::push`].
50#[derive(Debug)]
51pub enum JitterPush {
52    /// Frame was buffered.
53    Accepted,
54    /// Frame is older than the next expected sequence — dropped without
55    /// inserting.
56    Late,
57    /// Buffer was full; the oldest queued frame was evicted to make room
58    /// (returned here so the application can still meter / log it).
59    Evicted(GapPayload),
60}
61
62/// Bounded reorder window for GAP audio frames.
63pub struct JitterBuffer {
64    capacity_per_source: usize,
65    sources: HashMap<u32, SourceState>,
66}
67
68impl JitterBuffer {
69    /// Builds a jitter buffer that retains up to `capacity_per_source`
70    /// frames for each `media_source_id`.
71    ///
72    /// # Panics
73    /// Panics if `capacity_per_source == 0`.
74    pub fn new(capacity_per_source: usize) -> Self {
75        assert!(capacity_per_source > 0, "capacity must be > 0");
76        Self {
77            capacity_per_source,
78            sources: HashMap::new(),
79        }
80    }
81
82    /// Inserts a frame into the buffer.
83    pub fn push(&mut self, frame: GapPayload) -> JitterPush {
84        let state = self
85            .sources
86            .entry(frame.media_source_id)
87            .or_insert_with(SourceState::new);
88        if let Some(next) = state.next
89            && frame.rtp_sequence < next
90        {
91            return JitterPush::Late;
92        }
93        match state.insert(frame, self.capacity_per_source) {
94            Some(evicted) => JitterPush::Evicted(evicted),
95            None => JitterPush::Accepted,
96        }
97    }
98
99    /// Pops the next frame for `media_source_id` if it is the next one
100    /// expected (i.e. its `rtp_sequence` is contiguous with the previous
101    /// pop). Returns `None` if the head of the queue is from the future
102    /// (caller should wait or skip via [`JitterBuffer::pop_force`]).
103    pub fn pop_in_order(&mut self, media_source_id: u32) -> Option<GapPayload> {
104        let state = self.sources.get_mut(&media_source_id)?;
105        let head = state.waiting.front()?;
106        if let Some(next) = state.next
107            && head.rtp_sequence != next
108        {
109            return None;
110        }
111        let popped = state.waiting.pop_front()?;
112        state.next = Some(popped.rtp_sequence.wrapping_add(1));
113        Some(popped)
114    }
115
116    /// Forces the next frame out regardless of contiguity. Use this when the
117    /// playout deadline is reached and the next-expected packet is still
118    /// missing — the caller will play the next available frame and skip the
119    /// gap.
120    pub fn pop_force(&mut self, media_source_id: u32) -> Option<GapPayload> {
121        let state = self.sources.get_mut(&media_source_id)?;
122        let popped = state.waiting.pop_front()?;
123        state.next = Some(popped.rtp_sequence.wrapping_add(1));
124        Some(popped)
125    }
126
127    /// Returns the number of frames buffered for the given source.
128    pub fn len_for(&self, media_source_id: u32) -> usize {
129        self.sources
130            .get(&media_source_id)
131            .map(|s| s.waiting.len())
132            .unwrap_or(0)
133    }
134
135    /// Drops every queued frame and forgets every source.
136    pub fn clear(&mut self) {
137        self.sources.clear();
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144
145    fn frame(src: u32, seq: u32) -> GapPayload {
146        GapPayload::opus_20ms(src, seq as u16, 0, vec![seq as u8])
147    }
148
149    #[test]
150    fn reorders_out_of_order_pushes() {
151        let mut jb = JitterBuffer::new(8);
152        for s in [3u32, 1, 2, 4] {
153            assert!(matches!(jb.push(frame(1, s)), JitterPush::Accepted));
154        }
155        let order: Vec<u32> = std::iter::from_fn(|| jb.pop_in_order(1))
156            .map(|f| f.rtp_sequence)
157            .collect();
158        assert_eq!(order, vec![1, 2, 3, 4]);
159    }
160
161    #[test]
162    fn rejects_late_frames() {
163        let mut jb = JitterBuffer::new(4);
164        jb.push(frame(1, 5));
165        jb.pop_in_order(1).unwrap();
166        let r = jb.push(frame(1, 4));
167        assert!(matches!(r, JitterPush::Late));
168    }
169
170    #[test]
171    fn pop_force_skips_missing() {
172        let mut jb = JitterBuffer::new(4);
173        jb.push(frame(1, 1));
174        jb.push(frame(1, 3));
175        assert_eq!(jb.pop_in_order(1).unwrap().rtp_sequence, 1);
176        assert!(jb.pop_in_order(1).is_none());
177        assert_eq!(jb.pop_force(1).unwrap().rtp_sequence, 3);
178    }
179}