Skip to main content

videocall_codecs/
jitter_buffer.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19//! The JitterBuffer, which reorders, buffers, and prepares frames for the decoder.
20
21use crate::decoder::Decodable;
22use crate::frame::{FrameBuffer, FrameType, VideoFrame};
23use crate::jitter_estimator::JitterEstimator;
24use std::collections::BTreeMap;
25
// --- Playout Delay Constants ---
/// The minimum delay we will allow. Prevents the buffer from becoming completely empty.
/// It is also the initial playout delay target of a new `JitterBuffer` and the lower bound
/// used when clamping the raw target.
const MIN_PLAYOUT_DELAY_MS: f64 = 10.0;
/// The maximum delay. Prevents the delay from growing indefinitely.
/// Used as the upper bound when clamping the raw target.
const MAX_PLAYOUT_DELAY_MS: f64 = 500.0;
/// A multiplier applied to the jitter estimate to provide a safety margin.
/// A value of 3.0 means we buffer enough to handle jitter up to 3x the running average.
const JITTER_MULTIPLIER: f64 = 3.0;
/// A smoothing factor for delay updates to prevent rapid, jarring changes.
/// It is the weight given to the previous target on each update; the freshly clamped target
/// contributes the remaining `1.0 - DELAY_SMOOTHING_FACTOR`.
const DELAY_SMOOTHING_FACTOR: f64 = 0.99;

/// The maximum number of frames the buffer will hold before rejecting new ones.
/// A delta frame arriving while the buffer is at this size is rejected; a keyframe clears the
/// buffer instead and is then inserted.
const MAX_BUFFER_SIZE: usize = 200;
/// The number of consecutive old frames (sequence number at or below the last decoded one)
/// that is tolerated; receiving more than this many in a row flushes the buffer.
// From libwebrtc's jitter_buffer_common.h
const MAX_CONSECUTIVE_OLD_FRAMES: u64 = 300;
/// If an incoming keyframe is more than this many sequence numbers behind the last decoded
/// frame, we assume the stream restarted (e.g., camera switch) and flush immediately. Smaller
/// rollbacks are treated as harmless reordering.
const STREAM_RESTART_BACKTRACK_THRESHOLD: u64 = 30;
45
/// Reorders incoming video frames by sequence number, holds them for an adaptive playout
/// delay, and hands them to a decoder in order.
///
/// `T` is the decoder's output frame type (`Decodable::Frame`).
pub struct JitterBuffer<T> {
    /// Frames that have been received but not yet handed to the decoder.
    /// A BTreeMap is used to keep them sorted by sequence number automatically.
    buffered_frames: BTreeMap<u64, FrameBuffer>,

    /// The sequence number of the last frame that was sent to the decoder.
    /// `None` until the first frame is sent, and again after a flush.
    last_decoded_sequence_number: Option<u64>,

    /// The jitter estimator for monitoring network conditions.
    jitter_estimator: JitterEstimator,

    /// The current adaptive target for playout delay, in milliseconds.
    target_playout_delay_ms: f64,

    /// A counter for frames removed from the buffer without being decoded, either because a
    /// keyframe jumped past them or because the whole buffer was cleared.
    dropped_frames_count: u64,

    /// A counter for consecutive old frames to detect stream corruption.
    num_consecutive_old_frames: u64,

    // --- Decoder Interface ---
    /// The abstract decoder that will receive frames ready for decoding.
    decoder: Box<dyn Decodable<Frame = T>>,
}
70
71impl<T> JitterBuffer<T> {
72    pub fn new(decoder: Box<dyn Decodable<Frame = T>>) -> Self {
73        Self {
74            buffered_frames: BTreeMap::new(),
75            last_decoded_sequence_number: None,
76            jitter_estimator: JitterEstimator::new(),
77            target_playout_delay_ms: MIN_PLAYOUT_DELAY_MS,
78            dropped_frames_count: 0,
79            num_consecutive_old_frames: 0,
80            decoder,
81        }
82    }
83
84    /// Returns the current number of frames buffered and waiting in the jitter buffer.
85    pub fn buffered_frames_len(&self) -> usize {
86        self.buffered_frames.len()
87    }
88
89    /// The main entry point for a new frame arriving from the network.
90    pub fn insert_frame(&mut self, frame: VideoFrame, arrival_time_ms: u128) {
91        let seq = frame.sequence_number;
92        println!("[JITTER_BUFFER] Inserting frame: {seq}");
93
94        // --- Pre-insertion checks ---
95        // 1. Ignore frames that are too old.
96        if let Some(last_decoded) = self.last_decoded_sequence_number {
97            if seq <= last_decoded {
98                // Special case: if the old frame is a KEYFRAME, it likely indicates the sender has
99                // restarted (e.g., camera switch). Flush immediately so we can start decoding from
100                // this new keyframe without waiting for the old-frame counter threshold.
101                if frame.frame_type == FrameType::KeyFrame
102                    && last_decoded.saturating_sub(seq) > STREAM_RESTART_BACKTRACK_THRESHOLD
103                {
104                    println!(
105                        "[JITTER_BUFFER] Detected keyframe with older sequence ({seq} <= {last_decoded}). Assuming stream restart – flushing buffer."
106                    );
107                    self.flush();
108                } else {
109                    println!("[JITTER_BUFFER] Ignoring old frame: {seq}");
110                    self.num_consecutive_old_frames += 1;
111                    if self.num_consecutive_old_frames > MAX_CONSECUTIVE_OLD_FRAMES {
112                        println!(
113                            "[JITTER_BUFFER] Received {} consecutive old frames. Flushing buffer.",
114                            self.num_consecutive_old_frames
115                        );
116                        self.flush();
117                    }
118                }
119                return;
120            }
121        }
122
123        // If we received a valid frame, reset the counter.
124        self.num_consecutive_old_frames = 0;
125
126        // 2. Check if the buffer is full.
127        if self.buffered_frames.len() >= MAX_BUFFER_SIZE {
128            // Allow a keyframe to clear the buffer if it's full.
129            if frame.frame_type == FrameType::KeyFrame {
130                println!("[JITTER_BUFFER] Buffer full, but received keyframe. Clearing buffer.");
131                self.drop_all_frames();
132            } else {
133                println!("[JITTER_BUFFER] Buffer full. Rejecting frame: {seq}");
134                return; // Reject the frame.
135            }
136        }
137
138        println!("[JITTER_BUFFER] Received frame: {seq}");
139
140        self.jitter_estimator.update_estimate(seq, arrival_time_ms);
141        self.update_target_playout_delay();
142
143        let fb = FrameBuffer::new(frame, arrival_time_ms);
144        self.buffered_frames.insert(seq, fb);
145
146        self.find_and_move_continuous_frames(arrival_time_ms);
147    }
148
149    /// Updates the target playout delay based on the current jitter estimate.
150    fn update_target_playout_delay(&mut self) {
151        let jitter_estimate = self.jitter_estimator.get_jitter_estimate_ms();
152
153        // Calculate the raw target delay with a safety margin.
154        let raw_target = jitter_estimate * JITTER_MULTIPLIER;
155
156        // Clamp the target to our defined min/max bounds.
157        let clamped_target = raw_target.clamp(MIN_PLAYOUT_DELAY_MS, MAX_PLAYOUT_DELAY_MS);
158
159        // Smooth the transition to the new target to avoid sudden changes.
160        self.target_playout_delay_ms = (self.target_playout_delay_ms * DELAY_SMOOTHING_FACTOR)
161            + (clamped_target * (1.0 - DELAY_SMOOTHING_FACTOR));
162    }
163
164    /// Checks the buffered frames and moves any continuous frames to the decodable queue.
165    pub fn find_and_move_continuous_frames(&mut self, current_time_ms: u128) {
166        let mut frames_were_moved = false;
167
168        println!(
169            "[JB_POLL] Checking buffer. Last decoded: {:?}, Buffer size: {}, Target delay: {:.2}ms",
170            self.last_decoded_sequence_number,
171            self.buffered_frames.len(),
172            self.target_playout_delay_ms
173        );
174
175        loop {
176            let mut found_frame_to_move = false;
177
178            let next_decodable_key: Option<u64> = if let Some(last_seq) =
179                self.last_decoded_sequence_number
180            {
181                // CASE 1: We are in a continuous stream. Look for the next frame.
182                let next_continuous_seq = last_seq + 1;
183                if self.buffered_frames.contains_key(&next_continuous_seq) {
184                    println!("[JB_POLL] Seeking next continuous frame: {next_continuous_seq}");
185                    Some(next_continuous_seq)
186                } else {
187                    // CASE 2: Gap detected. Look for the next keyframe after the gap.
188                    let keyframe = self
189                        .buffered_frames
190                        .iter()
191                        .find(|(&s, f)| s > next_continuous_seq && f.is_keyframe())
192                        .map(|(&s, _)| s);
193                    if let Some(k) = keyframe {
194                        println!(
195                            "[JB_POLL] Gap after {last_seq}. Seeking next keyframe. Found: {k}"
196                        );
197                    } else {
198                        println!("[JB_POLL] Gap after {last_seq}. No subsequent keyframe found.");
199                    }
200                    keyframe
201                }
202            } else {
203                // CASE 3: We have never decoded. We MUST start with a keyframe.
204                let keyframe = self
205                    .buffered_frames
206                    .iter()
207                    .find(|(_, f)| f.is_keyframe())
208                    .map(|(&s, _)| s);
209                if let Some(k) = keyframe {
210                    println!("[JB_POLL] Seeking first keyframe. Found: {k}");
211                } else {
212                    println!("[JB_POLL] Seeking first keyframe. None found in buffer.");
213                }
214                keyframe
215            };
216
217            if let Some(key) = next_decodable_key {
218                if let Some(frame) = self.buffered_frames.get(&key) {
219                    let time_in_buffer_ms = (current_time_ms - frame.arrival_time_ms) as f64;
220
221                    let is_ready = time_in_buffer_ms >= self.target_playout_delay_ms;
222                    println!(
223                        "[JB_POLL] Candidate {key}: Time in buffer: {time_in_buffer_ms:.2}ms, Target: {:.2}ms -> Ready: {is_ready}",
224                        self.target_playout_delay_ms
225                    );
226
227                    if is_ready {
228                        let frame_to_move = self.buffered_frames.remove(&key).unwrap();
229
230                        // If we're jumping to a keyframe to recover, drop everything before it.
231                        if frame_to_move.is_keyframe() {
232                            let is_first_frame = self.last_decoded_sequence_number.is_none();
233                            let is_gap_recovery = self
234                                .last_decoded_sequence_number
235                                .is_some_and(|last_seq| key > last_seq + 1);
236
237                            if is_first_frame || is_gap_recovery {
238                                println!(
239                                    "[JITTER_BUFFER] Keyframe {key} recovery. Dropping frames before it."
240                                );
241                                self.drop_frames_before(key);
242                            }
243                        }
244
245                        self.push_to_decoder(frame_to_move);
246                        self.last_decoded_sequence_number = Some(key);
247                        frames_were_moved = true;
248                        found_frame_to_move = true;
249                    }
250                }
251            } else {
252                println!("[JB_POLL] No decodable frame found in buffer.");
253            }
254
255            if !found_frame_to_move {
256                break;
257            }
258        }
259
260        if frames_were_moved {
261            // NOTE: No need to notify a condvar anymore. The decoder manages its own thread.
262        }
263    }
264
265    /// Pushes a single frame to the shared decodable queue.
266    fn push_to_decoder(&mut self, frame: FrameBuffer) {
267        let seq = frame.sequence_number();
268        println!("[JITTER_BUFFER] Pushing frame {seq} to decoder.");
269        self.decoder.decode(frame);
270    }
271
272    /// Checks if the jitter buffer is currently waiting for a keyframe to continue.
273    pub fn is_waiting_for_keyframe(&self) -> bool {
274        self.last_decoded_sequence_number.is_none()
275    }
276
277    /// Removes all frames from the buffer with a sequence number less than the given one.
278    fn drop_frames_before(&mut self, sequence_number: u64) {
279        let keys_to_drop: Vec<u64> = self
280            .buffered_frames
281            .keys()
282            .cloned()
283            .filter(|&k| k < sequence_number)
284            .collect();
285
286        self.dropped_frames_count += keys_to_drop.len() as u64;
287        for key in keys_to_drop {
288            println!("[JITTER_BUFFER] Dropping stale frame: {key}");
289            self.buffered_frames.remove(&key);
290        }
291    }
292
293    /// Removes all frames from the buffer. Used when a keyframe arrives and the buffer is full.
294    pub fn drop_all_frames(&mut self) {
295        let num_dropped = self.buffered_frames.len() as u64;
296        self.buffered_frames.clear();
297        self.dropped_frames_count += num_dropped;
298        println!("[JITTER_BUFFER] Dropped all {num_dropped} frames.");
299    }
300
301    /// Flushes the jitter buffer, resetting its state completely.
302    pub fn flush(&mut self) {
303        self.drop_all_frames();
304        self.last_decoded_sequence_number = None;
305        self.num_consecutive_old_frames = 0;
306        // Consider resetting jitter estimator as well if needed
307        self.jitter_estimator = JitterEstimator::new();
308    }
309
310    pub fn get_jitter_estimate_ms(&self) -> f64 {
311        self.jitter_estimator.get_jitter_estimate_ms()
312    }
313
314    pub fn get_target_playout_delay_ms(&self) -> f64 {
315        self.target_playout_delay_ms
316    }
317
318    pub fn get_dropped_frames_count(&self) -> u64 {
319        self.dropped_frames_count
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326    use crate::decoder::DecodedFrame;
327    use crate::frame::{FrameType, VideoFrame};
328    use std::sync::Arc;
329    use std::sync::Mutex;
330
    /// A mock decoder for testing purposes. It stores decoded frames in a shared Vec.
    struct MockDecoder {
        /// Every frame passed to `decode`, in call order. Shared with the test body so it can
        /// inspect what the jitter buffer released.
        decoded_frames: Arc<Mutex<Vec<DecodedFrame>>>,
    }
335
336    // This impl is for native targets
337    #[cfg(not(target_arch = "wasm32"))]
338    impl Decodable for MockDecoder {
339        /// The decoded frame type for mock decoder in tests.
340        type Frame = crate::decoder::DecodedFrame;
341        fn new(
342            _codec: crate::decoder::VideoCodec,
343            _on_decoded_frame: Box<dyn Fn(DecodedFrame) + Send + Sync>,
344        ) -> Self {
345            panic!("Use `new_with_vec` for this mock.");
346        }
347        fn decode(&self, frame: FrameBuffer) {
348            let mut frames = self.decoded_frames.lock().unwrap();
349            frames.push(DecodedFrame {
350                sequence_number: frame.sequence_number(),
351                width: 0,
352                height: 0,
353                data: frame.frame.data.to_vec(),
354            });
355        }
356    }
357
358    // This impl is for wasm targets
359    #[cfg(target_arch = "wasm32")]
360    impl Decodable for MockDecoder {
361        /// The decoded frame type for mock decoder in tests.
362        type Frame = crate::decoder::DecodedFrame;
363        fn new(
364            _codec: crate::decoder::VideoCodec,
365            _on_decoded_frame: Box<dyn Fn(DecodedFrame)>,
366        ) -> Self {
367            panic!("Use `new_with_vec` for this mock.");
368        }
369        fn decode(&self, frame: FrameBuffer) {
370            let mut frames = self.decoded_frames.lock().unwrap();
371            frames.push(DecodedFrame {
372                sequence_number: frame.sequence_number(),
373                width: 0,
374                height: 0,
375                data: frame.frame.data.to_vec(),
376            });
377        }
378    }
379
380    impl MockDecoder {
381        fn new_with_vec(decoded_frames: Arc<Mutex<Vec<DecodedFrame>>>) -> Self {
382            Self { decoded_frames }
383        }
384    }
385
386    /// A helper to create a JitterBuffer with a mock decoder for testing.
387    fn create_test_jitter_buffer() -> (
388        JitterBuffer<crate::decoder::DecodedFrame>,
389        Arc<Mutex<Vec<DecodedFrame>>>,
390    ) {
391        let decoded_frames = Arc::new(Mutex::new(Vec::new()));
392        let mock_decoder = Box::new(MockDecoder::new_with_vec(decoded_frames.clone()));
393        let jitter_buffer = JitterBuffer::new(mock_decoder);
394        (jitter_buffer, decoded_frames)
395    }
396
397    fn create_test_frame(seq: u64, frame_type: FrameType) -> VideoFrame {
398        VideoFrame {
399            sequence_number: seq,
400            frame_type,
401            codec: crate::frame::FrameCodec::default(),
402            data: vec![0; 10],
403            timestamp: 0.0,
404        }
405    }
406
407    #[test]
408    fn insert_in_order() {
409        let (mut jb, decoded_frames) = create_test_jitter_buffer();
410        // Playout delay requires us to simulate time passing.
411        let mut time = 1000;
412
413        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
414        time += 100; // Elapse time to overcome playout delay
415        jb.find_and_move_continuous_frames(time);
416
417        {
418            let queue = decoded_frames.lock().unwrap();
419            assert_eq!(queue.len(), 1);
420            assert_eq!(queue[0].sequence_number, 1);
421        }
422
423        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
424        time += 100;
425        jb.find_and_move_continuous_frames(time);
426
427        let queue = decoded_frames.lock().unwrap();
428        assert_eq!(queue.len(), 2);
429        assert_eq!(queue[1].sequence_number, 2);
430    }
431
432    #[test]
433    fn insert_out_of_order() {
434        let (mut jb, decoded_frames) = create_test_jitter_buffer();
435        let mut time = 1000;
436
437        // Insert 3, then 1, then 2.
438        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
439        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
440        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
441
442        // Advance time enough for all frames to pass the playout delay.
443        time += 100;
444        jb.find_and_move_continuous_frames(time);
445
446        let queue = decoded_frames.lock().unwrap();
447        assert_eq!(queue.len(), 3);
448        assert_eq!(queue[0].sequence_number, 1);
449        assert_eq!(queue[1].sequence_number, 2);
450        assert_eq!(queue[2].sequence_number, 3);
451    }
452
453    #[test]
454    fn keyframe_recovers_from_gap() {
455        let (mut jb, decoded_frames) = create_test_jitter_buffer();
456        let mut time = 1000;
457
458        // Insert 1, then 3 (KeyFrame). Frame 2 is "lost".
459        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
460        time += 100;
461        jb.find_and_move_continuous_frames(time); // Frame 1 is moved.
462
463        jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
464        time += 100;
465        jb.find_and_move_continuous_frames(time); // Frame 3 is moved.
466
467        let queue = decoded_frames.lock().unwrap();
468        assert_eq!(queue.len(), 2);
469        assert_eq!(queue[0].sequence_number, 1);
470        assert_eq!(queue[1].sequence_number, 3);
471        assert_eq!(jb.last_decoded_sequence_number, Some(3));
472    }
473
474    #[test]
475    fn stale_frames_are_dropped_on_keyframe() {
476        let (mut jb, decoded_frames) = create_test_jitter_buffer();
477        let mut time = 1000;
478        assert_eq!(jb.get_dropped_frames_count(), 0);
479
480        // Insert frames that will become stale.
481        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
482        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
483        assert!(jb.buffered_frames.contains_key(&2));
484        assert!(jb.buffered_frames.contains_key(&3));
485
486        // At this point, nothing is decodable because we haven't seen a keyframe.
487        jb.find_and_move_continuous_frames(time);
488        assert!(decoded_frames.lock().unwrap().is_empty());
489
490        // Insert a keyframe that jumps over the stale frames.
491        jb.insert_frame(create_test_frame(4, FrameType::KeyFrame), time);
492
493        // Advance time to allow the keyframe to be decoded.
494        time += 100;
495        jb.find_and_move_continuous_frames(time);
496
497        // The keyframe should be ready to decode.
498        let queue = decoded_frames.lock().unwrap();
499        assert_eq!(queue.len(), 1);
500        assert_eq!(queue[0].sequence_number, 4);
501
502        // The stale frames should be gone from the internal buffer.
503        assert!(!jb.buffered_frames.contains_key(&2));
504        assert!(!jb.buffered_frames.contains_key(&3));
505
506        // The dropped frame counter should be updated.
507        assert_eq!(jb.get_dropped_frames_count(), 2);
508    }
509
510    #[test]
511    fn old_frames_are_ignored() {
512        let (mut jb, decoded_frames) = create_test_jitter_buffer();
513        let mut time = 1000;
514
515        // Decode sequence 1 and 2
516        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
517        time += 100;
518        jb.find_and_move_continuous_frames(time);
519        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
520        time += 100;
521        jb.find_and_move_continuous_frames(time);
522
523        // At this point, frames 1 and 2 should be in the queue.
524        assert_eq!(decoded_frames.lock().unwrap().len(), 2);
525        assert_eq!(jb.last_decoded_sequence_number, Some(2));
526
527        // Now, insert an old frame (seq 1) and a current frame (seq 2).
528        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
529        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
530
531        // No new frames should have been added to the queue.
532        assert_eq!(decoded_frames.lock().unwrap().len(), 2);
533
534        // And the internal buffer should be empty.
535        assert!(jb.buffered_frames.is_empty());
536    }
537
538    #[test]
539    fn buffer_capacity_is_enforced() {
540        let (mut jb, decoded_frames) = create_test_jitter_buffer();
541        let time = 1000;
542
543        // Fill the buffer up to its capacity. These frames are not continuous.
544        for i in 1..=MAX_BUFFER_SIZE {
545            jb.insert_frame(create_test_frame(i as u64 * 2, FrameType::DeltaFrame), time);
546        }
547
548        assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
549
550        // Try to insert another delta frame. It should be rejected.
551        let next_seq = (MAX_BUFFER_SIZE + 1) as u64 * 2;
552        jb.insert_frame(create_test_frame(next_seq, FrameType::DeltaFrame), time);
553        assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
554        assert!(!jb.buffered_frames.contains_key(&next_seq));
555
556        // No frames should have been moved.
557        assert_eq!(decoded_frames.lock().unwrap().len(), 0);
558
559        // Now, insert a keyframe. It should clear the buffer and insert itself.
560        let keyframe_seq = (MAX_BUFFER_SIZE + 2) as u64 * 2;
561        jb.insert_frame(create_test_frame(keyframe_seq, FrameType::KeyFrame), time);
562
563        assert_eq!(jb.buffered_frames.len(), 1);
564        assert!(jb.buffered_frames.contains_key(&keyframe_seq));
565        assert_eq!(jb.get_dropped_frames_count(), MAX_BUFFER_SIZE as u64);
566    }
567
568    #[test]
569    fn playout_delay_holds_frame() {
570        let (mut jb, decoded_frames) = create_test_jitter_buffer();
571        let mut time = 1000;
572
573        // Insert a keyframe. The initial playout delay is MIN_PLAYOUT_DELAY_MS (10ms).
574        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
575
576        // Advance time, but not enough to meet the delay.
577        time += (MIN_PLAYOUT_DELAY_MS / 2.0) as u128;
578        jb.find_and_move_continuous_frames(time);
579
580        // The frame should NOT be in the decodable queue yet.
581        assert!(decoded_frames.lock().unwrap().is_empty());
582
583        // Advance time past the minimum delay.
584        time += (MIN_PLAYOUT_DELAY_MS as u128) + 1;
585        jb.find_and_move_continuous_frames(time);
586
587        // NOW the frame should be in the queue.
588        let queue = decoded_frames.lock().unwrap();
589        assert_eq!(queue.len(), 1);
590        assert_eq!(queue[0].sequence_number, 1);
591    }
592
593    #[test]
594    fn advances_decodable_frame_on_extraction() {
595        let (mut jb, decoded_frames) = create_test_jitter_buffer();
596        let mut time = 1000;
597
598        // Insert the first frame.
599        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
600
601        // Advance time to decode it.
602        time += 100;
603        jb.find_and_move_continuous_frames(time);
604
605        // Verify only frame 1 is in the queue.
606        {
607            let queue = decoded_frames.lock().unwrap();
608            assert_eq!(queue.len(), 1, "Queue should have frame 1");
609            assert_eq!(queue[0].sequence_number, 1);
610        }
611
612        // Simulate extraction by the decoder by updating our last decoded number
613        // and clearing the queue for the next check.
614        jb.last_decoded_sequence_number = Some(1);
615        decoded_frames.lock().unwrap().clear();
616
617        // Insert the second frame.
618        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
619
620        // Advance time to decode it.
621        time += 100;
622        jb.find_and_move_continuous_frames(time);
623
624        // Verify only frame 2 is in the queue.
625        {
626            let queue = decoded_frames.lock().unwrap();
627            assert_eq!(queue.len(), 1, "Queue should have frame 2");
628            assert_eq!(queue[0].sequence_number, 2);
629        }
630
631        // Simulate extraction of frame 2.
632        jb.last_decoded_sequence_number = Some(2);
633        decoded_frames.lock().unwrap().clear();
634
635        // Insert the third frame.
636        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
637
638        // Advance time to decode it.
639        time += 100;
640        jb.find_and_move_continuous_frames(time);
641
642        // Verify only frame 3 is in the queue.
643        {
644            let queue = decoded_frames.lock().unwrap();
645            assert_eq!(queue.len(), 1, "Queue should have frame 3");
646            assert_eq!(queue[0].sequence_number, 3);
647        }
648    }
649
650    #[test]
651    fn complex_reordering_pattern() {
652        let (mut jb, decoded_frames) = create_test_jitter_buffer();
653        let mut time = 1000;
654
655        // Insert odd frames first
656        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
657        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
658        jb.insert_frame(create_test_frame(5, FrameType::DeltaFrame), time);
659
660        // Then insert even frames
661        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
662        jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);
663
664        // Advance time to allow all to be decoded
665        time += 100;
666        jb.find_and_move_continuous_frames(time);
667
668        let queue = decoded_frames.lock().unwrap();
669        assert_eq!(queue.len(), 5);
670        for i in 0..5 {
671            assert_eq!(queue[i].sequence_number, (i + 1) as u64);
672        }
673    }
674
675    #[test]
676    fn in_order_keyframe_does_not_disrupt_flow() {
677        let (mut jb, decoded_frames) = create_test_jitter_buffer();
678        let mut time = 1000;
679
680        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
681        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
682
683        time += 100;
684        jb.find_and_move_continuous_frames(time);
685        assert_eq!(decoded_frames.lock().unwrap().len(), 2);
686        assert_eq!(jb.get_dropped_frames_count(), 0);
687
688        // Insert another Keyframe, but it's in order, so no frames should be dropped.
689        jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
690
691        time += 100;
692        jb.find_and_move_continuous_frames(time);
693
694        let queue = decoded_frames.lock().unwrap();
695        assert_eq!(queue.len(), 3, "All three frames should be in the queue");
696        assert_eq!(queue[2].sequence_number, 3);
697        assert_eq!(
698            jb.get_dropped_frames_count(),
699            0,
700            "No frames should have been dropped"
701        );
702    }
703
704    #[test]
705    fn sequence_starting_at_high_number() {
706        let (mut jb, decoded_frames) = create_test_jitter_buffer();
707        let mut time = 1000;
708        let start_seq = 10000;
709
710        // Insert frames starting from a high sequence number
711        jb.insert_frame(create_test_frame(start_seq, FrameType::KeyFrame), time);
712        jb.insert_frame(
713            create_test_frame(start_seq + 2, FrameType::DeltaFrame),
714            time,
715        );
716        jb.insert_frame(
717            create_test_frame(start_seq + 1, FrameType::DeltaFrame),
718            time,
719        );
720
721        // Advance time enough for all frames to pass the playout delay.
722        time += 100;
723        jb.find_and_move_continuous_frames(time);
724
725        let queue = decoded_frames.lock().unwrap();
726        assert_eq!(queue.len(), 3);
727        assert_eq!(queue[0].sequence_number, start_seq);
728        assert_eq!(queue[1].sequence_number, start_seq + 1);
729        assert_eq!(queue[2].sequence_number, start_seq + 2);
730    }
731
732    #[test]
733    fn flush_on_too_many_consecutive_old_frames() {
734        let (mut jb, decoded_frames) = create_test_jitter_buffer();
735        let mut time = 1000;
736
737        // Decode sequence 1 and 2
738        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
739        time += 100;
740        jb.find_and_move_continuous_frames(time);
741        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
742        time += 100;
743        jb.find_and_move_continuous_frames(time);
744        assert_eq!(jb.last_decoded_sequence_number, Some(2));
745        assert_eq!(jb.buffered_frames.len(), 0);
746
747        // Insert a frame into the buffer that won't be decoded
748        jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);
749        assert_eq!(jb.buffered_frames.len(), 1);
750
751        // Send a stream of old packets
752        for _ in 0..=MAX_CONSECUTIVE_OLD_FRAMES {
753            // Send old frame with sequence number 1
754            jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
755        }
756
757        // The buffer should now be flushed
758        assert_eq!(
759            jb.last_decoded_sequence_number, None,
760            "Last decoded sequence number should be reset"
761        );
762        assert_eq!(
763            jb.buffered_frames.len(),
764            0,
765            "Buffer should be empty after flush"
766        );
767        assert_eq!(
768            jb.num_consecutive_old_frames, 0,
769            "Consecutive old frames counter should be reset"
770        );
771
772        // It should now be waiting for a keyframe again
773        assert!(jb.is_waiting_for_keyframe());
774
775        // Verify that even if we send another delta frame, it doesn't get decoded
776        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
777        time += 100;
778        jb.find_and_move_continuous_frames(time);
779        assert!(decoded_frames.lock().unwrap().len() <= 2); // Should not have increased
780    }
781}