videocall_codecs/
jitter_buffer.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19//! The JitterBuffer, which reorders, buffers, and prepares frames for the decoder.
20
21use crate::decoder::Decodable;
22use crate::frame::{FrameBuffer, FrameType, VideoFrame};
23use crate::jitter_estimator::JitterEstimator;
24use std::collections::BTreeMap;
25
// --- Adaptive playout delay tuning ---
/// Lower bound, in milliseconds, for the adaptive playout delay target. It is also the value
/// the target starts at when a buffer is created.
const MIN_PLAYOUT_DELAY_MS: f64 = 10.0;
/// Upper bound, in milliseconds, for the adaptive playout delay target, so that a large jitter
/// estimate cannot push the delay up without limit.
const MAX_PLAYOUT_DELAY_MS: f64 = 500.0;
/// Safety margin applied to the jitter estimate: the raw delay target is the estimate
/// multiplied by this factor (3.0 => room for jitter up to three times the estimate).
const JITTER_MULTIPLIER: f64 = 3.0;
/// Weight given to the previous delay target when blending in a new one. The new clamped
/// target receives the remaining `1.0 - DELAY_SMOOTHING_FACTOR`, so the delay changes gradually.
const DELAY_SMOOTHING_FACTOR: f64 = 0.99;

// --- Buffer capacity and stream-health limits ---
/// Number of buffered frames at which the buffer is considered full. A full buffer rejects
/// incoming delta frames; an incoming keyframe clears it instead.
const MAX_BUFFER_SIZE: usize = 200;
/// Once more than this many old (already decoded or earlier) frames arrive back to back, the
/// buffer is flushed. Value taken from libwebrtc's `jitter_buffer_common.h`.
const MAX_CONSECUTIVE_OLD_FRAMES: u64 = 300;
/// How far behind the last decoded frame (in sequence numbers) an incoming keyframe must be
/// before it is treated as a stream restart (e.g. a camera switch) and triggers an immediate
/// flush. Keyframes that are behind by this amount or less are handled as ordinary old frames.
const STREAM_RESTART_BACKTRACK_THRESHOLD: u64 = 30;
45
46pub struct JitterBuffer<T> {
47    /// Frames that have been received but are not yet continuous with the last decoded frame.
48    /// A BTreeMap is used to keep them sorted by sequence number automatically.
49    buffered_frames: BTreeMap<u64, FrameBuffer>,
50
51    /// The sequence number of the last frame that was sent to the decoder.
52    last_decoded_sequence_number: Option<u64>,
53
54    /// The jitter estimator for monitoring network conditions.
55    jitter_estimator: JitterEstimator,
56
57    /// The current adaptive target for playout delay, in milliseconds.
58    target_playout_delay_ms: f64,
59
60    /// A counter for frames that were dropped due to being stale.
61    dropped_frames_count: u64,
62
63    /// A counter for consecutive old frames to detect stream corruption.
64    num_consecutive_old_frames: u64,
65
66    // --- Decoder Interface ---
67    /// The abstract decoder that will receive frames ready for decoding.
68    decoder: Box<dyn Decodable<Frame = T>>,
69}
70
71impl<T> JitterBuffer<T> {
72    pub fn new(decoder: Box<dyn Decodable<Frame = T>>) -> Self {
73        Self {
74            buffered_frames: BTreeMap::new(),
75            last_decoded_sequence_number: None,
76            jitter_estimator: JitterEstimator::new(),
77            target_playout_delay_ms: MIN_PLAYOUT_DELAY_MS,
78            dropped_frames_count: 0,
79            num_consecutive_old_frames: 0,
80            decoder,
81        }
82    }
83
84    /// Returns the current number of frames buffered and waiting in the jitter buffer.
85    pub fn buffered_frames_len(&self) -> usize {
86        self.buffered_frames.len()
87    }
88
89    /// The main entry point for a new frame arriving from the network.
90    pub fn insert_frame(&mut self, frame: VideoFrame, arrival_time_ms: u128) {
91        let seq = frame.sequence_number;
92        println!("[JITTER_BUFFER] Inserting frame: {seq}");
93
94        // --- Pre-insertion checks ---
95        // 1. Ignore frames that are too old.
96        if let Some(last_decoded) = self.last_decoded_sequence_number {
97            if seq <= last_decoded {
98                // Special case: if the old frame is a KEYFRAME, it likely indicates the sender has
99                // restarted (e.g., camera switch). Flush immediately so we can start decoding from
100                // this new keyframe without waiting for the old-frame counter threshold.
101                if frame.frame_type == FrameType::KeyFrame
102                    && last_decoded.saturating_sub(seq) > STREAM_RESTART_BACKTRACK_THRESHOLD
103                {
104                    println!(
105                        "[JITTER_BUFFER] Detected keyframe with older sequence ({seq} <= {last_decoded}). Assuming stream restart – flushing buffer."
106                    );
107                    self.flush();
108                } else {
109                    println!("[JITTER_BUFFER] Ignoring old frame: {seq}");
110                    self.num_consecutive_old_frames += 1;
111                    if self.num_consecutive_old_frames > MAX_CONSECUTIVE_OLD_FRAMES {
112                        println!(
113                            "[JITTER_BUFFER] Received {} consecutive old frames. Flushing buffer.",
114                            self.num_consecutive_old_frames
115                        );
116                        self.flush();
117                    }
118                }
119                return;
120            }
121        }
122
123        // If we received a valid frame, reset the counter.
124        self.num_consecutive_old_frames = 0;
125
126        // 2. Check if the buffer is full.
127        if self.buffered_frames.len() >= MAX_BUFFER_SIZE {
128            // Allow a keyframe to clear the buffer if it's full.
129            if frame.frame_type == FrameType::KeyFrame {
130                println!("[JITTER_BUFFER] Buffer full, but received keyframe. Clearing buffer.");
131                self.drop_all_frames();
132            } else {
133                println!("[JITTER_BUFFER] Buffer full. Rejecting frame: {seq}");
134                return; // Reject the frame.
135            }
136        }
137
138        println!("[JITTER_BUFFER] Received frame: {seq}");
139
140        self.jitter_estimator.update_estimate(seq, arrival_time_ms);
141        self.update_target_playout_delay();
142
143        let fb = FrameBuffer::new(frame, arrival_time_ms);
144        self.buffered_frames.insert(seq, fb);
145
146        self.find_and_move_continuous_frames(arrival_time_ms);
147    }
148
149    /// Updates the target playout delay based on the current jitter estimate.
150    fn update_target_playout_delay(&mut self) {
151        let jitter_estimate = self.jitter_estimator.get_jitter_estimate_ms();
152
153        // Calculate the raw target delay with a safety margin.
154        let raw_target = jitter_estimate * JITTER_MULTIPLIER;
155
156        // Clamp the target to our defined min/max bounds.
157        let clamped_target = raw_target.clamp(MIN_PLAYOUT_DELAY_MS, MAX_PLAYOUT_DELAY_MS);
158
159        // Smooth the transition to the new target to avoid sudden changes.
160        self.target_playout_delay_ms = (self.target_playout_delay_ms * DELAY_SMOOTHING_FACTOR)
161            + (clamped_target * (1.0 - DELAY_SMOOTHING_FACTOR));
162    }
163
164    /// Checks the buffered frames and moves any continuous frames to the decodable queue.
165    pub fn find_and_move_continuous_frames(&mut self, current_time_ms: u128) {
166        let mut frames_were_moved = false;
167
168        println!(
169            "[JB_POLL] Checking buffer. Last decoded: {:?}, Buffer size: {}, Target delay: {:.2}ms",
170            self.last_decoded_sequence_number,
171            self.buffered_frames.len(),
172            self.target_playout_delay_ms
173        );
174
175        loop {
176            let mut found_frame_to_move = false;
177
178            let next_decodable_key: Option<u64> = if let Some(last_seq) =
179                self.last_decoded_sequence_number
180            {
181                // CASE 1: We are in a continuous stream. Look for the next frame.
182                let next_continuous_seq = last_seq + 1;
183                if self.buffered_frames.contains_key(&next_continuous_seq) {
184                    println!("[JB_POLL] Seeking next continuous frame: {next_continuous_seq}");
185                    Some(next_continuous_seq)
186                } else {
187                    // CASE 2: Gap detected. Look for the next keyframe after the gap.
188                    let keyframe = self
189                        .buffered_frames
190                        .iter()
191                        .find(|(&s, f)| s > next_continuous_seq && f.is_keyframe())
192                        .map(|(&s, _)| s);
193                    if let Some(k) = keyframe {
194                        println!(
195                            "[JB_POLL] Gap after {last_seq}. Seeking next keyframe. Found: {k}"
196                        );
197                    } else {
198                        println!("[JB_POLL] Gap after {last_seq}. No subsequent keyframe found.");
199                    }
200                    keyframe
201                }
202            } else {
203                // CASE 3: We have never decoded. We MUST start with a keyframe.
204                let keyframe = self
205                    .buffered_frames
206                    .iter()
207                    .find(|(_, f)| f.is_keyframe())
208                    .map(|(&s, _)| s);
209                if let Some(k) = keyframe {
210                    println!("[JB_POLL] Seeking first keyframe. Found: {k}");
211                } else {
212                    println!("[JB_POLL] Seeking first keyframe. None found in buffer.");
213                }
214                keyframe
215            };
216
217            if let Some(key) = next_decodable_key {
218                if let Some(frame) = self.buffered_frames.get(&key) {
219                    let time_in_buffer_ms = (current_time_ms - frame.arrival_time_ms) as f64;
220
221                    let is_ready = time_in_buffer_ms >= self.target_playout_delay_ms;
222                    println!(
223                        "[JB_POLL] Candidate {key}: Time in buffer: {time_in_buffer_ms:.2}ms, Target: {:.2}ms -> Ready: {is_ready}",
224                        self.target_playout_delay_ms
225                    );
226
227                    if is_ready {
228                        let frame_to_move = self.buffered_frames.remove(&key).unwrap();
229
230                        // If we're jumping to a keyframe to recover, drop everything before it.
231                        if frame_to_move.is_keyframe() {
232                            let is_first_frame = self.last_decoded_sequence_number.is_none();
233                            let is_gap_recovery = self
234                                .last_decoded_sequence_number
235                                .is_some_and(|last_seq| key > last_seq + 1);
236
237                            if is_first_frame || is_gap_recovery {
238                                println!(
239                                    "[JITTER_BUFFER] Keyframe {key} recovery. Dropping frames before it."
240                                );
241                                self.drop_frames_before(key);
242                            }
243                        }
244
245                        self.push_to_decoder(frame_to_move);
246                        self.last_decoded_sequence_number = Some(key);
247                        frames_were_moved = true;
248                        found_frame_to_move = true;
249                    }
250                }
251            } else {
252                println!("[JB_POLL] No decodable frame found in buffer.");
253            }
254
255            if !found_frame_to_move {
256                break;
257            }
258        }
259
260        if frames_were_moved {
261            // NOTE: No need to notify a condvar anymore. The decoder manages its own thread.
262        }
263    }
264
265    /// Pushes a single frame to the shared decodable queue.
266    fn push_to_decoder(&mut self, frame: FrameBuffer) {
267        let seq = frame.sequence_number();
268        println!("[JITTER_BUFFER] Pushing frame {seq} to decoder.");
269        self.decoder.decode(frame);
270    }
271
272    /// Checks if the jitter buffer is currently waiting for a keyframe to continue.
273    pub fn is_waiting_for_keyframe(&self) -> bool {
274        self.last_decoded_sequence_number.is_none()
275    }
276
277    /// Removes all frames from the buffer with a sequence number less than the given one.
278    fn drop_frames_before(&mut self, sequence_number: u64) {
279        let keys_to_drop: Vec<u64> = self
280            .buffered_frames
281            .keys()
282            .cloned()
283            .filter(|&k| k < sequence_number)
284            .collect();
285
286        self.dropped_frames_count += keys_to_drop.len() as u64;
287        for key in keys_to_drop {
288            println!("[JITTER_BUFFER] Dropping stale frame: {key}");
289            self.buffered_frames.remove(&key);
290        }
291    }
292
293    /// Removes all frames from the buffer. Used when a keyframe arrives and the buffer is full.
294    pub fn drop_all_frames(&mut self) {
295        let num_dropped = self.buffered_frames.len() as u64;
296        self.buffered_frames.clear();
297        self.dropped_frames_count += num_dropped;
298        println!("[JITTER_BUFFER] Dropped all {num_dropped} frames.");
299    }
300
301    /// Flushes the jitter buffer, resetting its state completely.
302    pub fn flush(&mut self) {
303        self.drop_all_frames();
304        self.last_decoded_sequence_number = None;
305        self.num_consecutive_old_frames = 0;
306        // Consider resetting jitter estimator as well if needed
307        self.jitter_estimator = JitterEstimator::new();
308    }
309
310    pub fn get_jitter_estimate_ms(&self) -> f64 {
311        self.jitter_estimator.get_jitter_estimate_ms()
312    }
313
314    pub fn get_target_playout_delay_ms(&self) -> f64 {
315        self.target_playout_delay_ms
316    }
317
318    pub fn get_dropped_frames_count(&self) -> u64 {
319        self.dropped_frames_count
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate::decoder::DecodedFrame;
    use crate::frame::{FrameType, VideoFrame};
    use std::sync::Arc;
    use std::sync::Mutex;

    /// Shared list of the frames the mock decoder has received, in decode order.
    type DecodedLog = Arc<Mutex<Vec<DecodedFrame>>>;

    /// A mock decoder for testing purposes. It stores decoded frames in a shared Vec.
    struct MockDecoder {
        decoded_frames: DecodedLog,
    }

    impl Decodable for MockDecoder {
        /// The decoded frame type for mock decoder in tests.
        type Frame = crate::decoder::DecodedFrame;

        // Native targets: the callback must be `Send + Sync`.
        #[cfg(not(target_arch = "wasm32"))]
        fn new(
            _codec: crate::decoder::VideoCodec,
            _on_decoded_frame: Box<dyn Fn(DecodedFrame) + Send + Sync>,
        ) -> Self {
            panic!("Use `new_with_vec` for this mock.");
        }

        // wasm targets: the callback carries no thread-safety bounds.
        #[cfg(target_arch = "wasm32")]
        fn new(
            _codec: crate::decoder::VideoCodec,
            _on_decoded_frame: Box<dyn Fn(DecodedFrame)>,
        ) -> Self {
            panic!("Use `new_with_vec` for this mock.");
        }

        fn decode(&self, frame: FrameBuffer) {
            let mut frames = self.decoded_frames.lock().unwrap();
            frames.push(DecodedFrame {
                sequence_number: frame.sequence_number(),
                width: 0,
                height: 0,
                data: frame.frame.data.to_vec(),
            });
        }
    }

    impl MockDecoder {
        fn new_with_vec(decoded_frames: DecodedLog) -> Self {
            Self { decoded_frames }
        }
    }

    /// A helper to create a JitterBuffer with a mock decoder for testing.
    fn create_test_jitter_buffer() -> (JitterBuffer<crate::decoder::DecodedFrame>, DecodedLog) {
        let decoded_frames = Arc::new(Mutex::new(Vec::new()));
        let mock_decoder = Box::new(MockDecoder::new_with_vec(decoded_frames.clone()));
        let jitter_buffer = JitterBuffer::new(mock_decoder);
        (jitter_buffer, decoded_frames)
    }

    fn create_test_frame(seq: u64, frame_type: FrameType) -> VideoFrame {
        VideoFrame {
            sequence_number: seq,
            frame_type,
            data: vec![0; 10],
            timestamp: 0.0,
        }
    }

    /// Snapshots the sequence numbers received by the mock decoder so far, in decode order.
    /// The lock is released before returning, so no guard outlives the call.
    fn decoded_sequence_numbers(decoded_frames: &DecodedLog) -> Vec<u64> {
        decoded_frames
            .lock()
            .unwrap()
            .iter()
            .map(|f| f.sequence_number)
            .collect()
    }

    #[test]
    fn insert_in_order() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        // Playout delay requires us to simulate time passing.
        let mut time = 1000;

        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        time += 100; // Elapse time to overcome playout delay
        jb.find_and_move_continuous_frames(time);
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1]);

        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time);
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 2]);
    }

    #[test]
    fn insert_out_of_order() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Insert 3, then 1, then 2.
        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);

        // Advance time enough for all frames to pass the playout delay.
        time += 100;
        jb.find_and_move_continuous_frames(time);

        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 2, 3]);
    }

    #[test]
    fn keyframe_recovers_from_gap() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Insert 1, then 3 (KeyFrame). Frame 2 is "lost".
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time); // Frame 1 is moved.

        jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time); // Frame 3 is moved.

        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 3]);
        assert_eq!(jb.last_decoded_sequence_number, Some(3));
    }

    #[test]
    fn stale_frames_are_dropped_on_keyframe() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;
        assert_eq!(jb.get_dropped_frames_count(), 0);

        // Insert frames that will become stale.
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
        assert!(jb.buffered_frames.contains_key(&2));
        assert!(jb.buffered_frames.contains_key(&3));

        // At this point, nothing is decodable because we haven't seen a keyframe.
        jb.find_and_move_continuous_frames(time);
        assert!(decoded_sequence_numbers(&decoded_frames).is_empty());

        // Insert a keyframe that jumps over the stale frames.
        jb.insert_frame(create_test_frame(4, FrameType::KeyFrame), time);

        // Advance time to allow the keyframe to be decoded.
        time += 100;
        jb.find_and_move_continuous_frames(time);

        // Only the keyframe should have reached the decoder.
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![4]);

        // The stale frames should be gone from the internal buffer.
        assert!(!jb.buffered_frames.contains_key(&2));
        assert!(!jb.buffered_frames.contains_key(&3));

        // The dropped frame counter should be updated.
        assert_eq!(jb.get_dropped_frames_count(), 2);
    }

    #[test]
    fn old_frames_are_ignored() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Decode sequence 1 and 2
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time);
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time);

        // At this point, frames 1 and 2 should have been decoded.
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 2]);
        assert_eq!(jb.last_decoded_sequence_number, Some(2));

        // Now, insert an old frame (seq 1) and a current frame (seq 2).
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);

        // No new frames should have reached the decoder.
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 2]);

        // And the internal buffer should be empty.
        assert!(jb.buffered_frames.is_empty());
    }

    #[test]
    fn buffer_capacity_is_enforced() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let time = 1000;

        // Fill the buffer up to its capacity. These frames are not continuous.
        for i in 1..=MAX_BUFFER_SIZE {
            jb.insert_frame(create_test_frame(i as u64 * 2, FrameType::DeltaFrame), time);
        }

        assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);

        // Try to insert another delta frame. It should be rejected.
        let next_seq = (MAX_BUFFER_SIZE + 1) as u64 * 2;
        jb.insert_frame(create_test_frame(next_seq, FrameType::DeltaFrame), time);
        assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
        assert!(!jb.buffered_frames.contains_key(&next_seq));

        // No frames should have been moved.
        assert!(decoded_sequence_numbers(&decoded_frames).is_empty());

        // Now, insert a keyframe. It should clear the buffer and insert itself.
        let keyframe_seq = (MAX_BUFFER_SIZE + 2) as u64 * 2;
        jb.insert_frame(create_test_frame(keyframe_seq, FrameType::KeyFrame), time);

        assert_eq!(jb.buffered_frames.len(), 1);
        assert!(jb.buffered_frames.contains_key(&keyframe_seq));
        assert_eq!(jb.get_dropped_frames_count(), MAX_BUFFER_SIZE as u64);
    }

    #[test]
    fn playout_delay_holds_frame() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Insert a keyframe. The initial playout delay is MIN_PLAYOUT_DELAY_MS (10ms).
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);

        // Advance time, but not enough to meet the delay.
        time += (MIN_PLAYOUT_DELAY_MS / 2.0) as u128;
        jb.find_and_move_continuous_frames(time);

        // The frame should NOT have been released yet.
        assert!(decoded_sequence_numbers(&decoded_frames).is_empty());

        // Advance time past the minimum delay.
        time += (MIN_PLAYOUT_DELAY_MS as u128) + 1;
        jb.find_and_move_continuous_frames(time);

        // NOW the frame should have been released.
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1]);
    }

    #[test]
    fn advances_decodable_frame_on_extraction() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Insert the first frame.
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);

        // Advance time to decode it.
        time += 100;
        jb.find_and_move_continuous_frames(time);

        // Verify only frame 1 is in the queue.
        assert_eq!(
            decoded_sequence_numbers(&decoded_frames),
            vec![1],
            "Queue should have frame 1"
        );

        // Simulate extraction by the decoder by updating our last decoded number
        // and clearing the queue for the next check.
        jb.last_decoded_sequence_number = Some(1);
        decoded_frames.lock().unwrap().clear();

        // Insert the second frame.
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);

        // Advance time to decode it.
        time += 100;
        jb.find_and_move_continuous_frames(time);

        // Verify only frame 2 is in the queue.
        assert_eq!(
            decoded_sequence_numbers(&decoded_frames),
            vec![2],
            "Queue should have frame 2"
        );

        // Simulate extraction of frame 2.
        jb.last_decoded_sequence_number = Some(2);
        decoded_frames.lock().unwrap().clear();

        // Insert the third frame.
        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);

        // Advance time to decode it.
        time += 100;
        jb.find_and_move_continuous_frames(time);

        // Verify only frame 3 is in the queue.
        assert_eq!(
            decoded_sequence_numbers(&decoded_frames),
            vec![3],
            "Queue should have frame 3"
        );
    }

    #[test]
    fn complex_reordering_pattern() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Insert odd frames first
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
        jb.insert_frame(create_test_frame(5, FrameType::DeltaFrame), time);

        // Then insert even frames
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
        jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);

        // Advance time to allow all to be decoded
        time += 100;
        jb.find_and_move_continuous_frames(time);

        assert_eq!(
            decoded_sequence_numbers(&decoded_frames),
            vec![1, 2, 3, 4, 5]
        );
    }

    #[test]
    fn in_order_keyframe_does_not_disrupt_flow() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);

        time += 100;
        jb.find_and_move_continuous_frames(time);
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 2]);
        assert_eq!(jb.get_dropped_frames_count(), 0);

        // Insert another Keyframe, but it's in order, so no frames should be dropped.
        jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);

        time += 100;
        jb.find_and_move_continuous_frames(time);

        assert_eq!(
            decoded_sequence_numbers(&decoded_frames),
            vec![1, 2, 3],
            "All three frames should be in the queue"
        );
        assert_eq!(
            jb.get_dropped_frames_count(),
            0,
            "No frames should have been dropped"
        );
    }

    #[test]
    fn sequence_starting_at_high_number() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;
        let start_seq: u64 = 10000;

        // Insert frames starting from a high sequence number
        jb.insert_frame(create_test_frame(start_seq, FrameType::KeyFrame), time);
        jb.insert_frame(
            create_test_frame(start_seq + 2, FrameType::DeltaFrame),
            time,
        );
        jb.insert_frame(
            create_test_frame(start_seq + 1, FrameType::DeltaFrame),
            time,
        );

        // Advance time enough for all frames to pass the playout delay.
        time += 100;
        jb.find_and_move_continuous_frames(time);

        assert_eq!(
            decoded_sequence_numbers(&decoded_frames),
            vec![start_seq, start_seq + 1, start_seq + 2]
        );
    }

    #[test]
    fn flush_on_too_many_consecutive_old_frames() {
        let (mut jb, decoded_frames) = create_test_jitter_buffer();
        let mut time = 1000;

        // Decode sequence 1 and 2
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time);
        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time);
        assert_eq!(jb.last_decoded_sequence_number, Some(2));
        assert_eq!(jb.buffered_frames.len(), 0);

        // Insert a frame into the buffer that won't be decoded
        jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);
        assert_eq!(jb.buffered_frames.len(), 1);

        // Send a stream of old packets
        for _ in 0..=MAX_CONSECUTIVE_OLD_FRAMES {
            // Send old frame with sequence number 1
            jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
        }

        // The buffer should now be flushed
        assert_eq!(
            jb.last_decoded_sequence_number, None,
            "Last decoded sequence number should be reset"
        );
        assert_eq!(
            jb.buffered_frames.len(),
            0,
            "Buffer should be empty after flush"
        );
        assert_eq!(
            jb.num_consecutive_old_frames, 0,
            "Consecutive old frames counter should be reset"
        );

        // It should now be waiting for a keyframe again
        assert!(jb.is_waiting_for_keyframe());

        // Verify that even if we send another delta frame, it doesn't get decoded:
        // the decoder must still have seen exactly the two frames from before the flush.
        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
        time += 100;
        jb.find_and_move_continuous_frames(time);
        assert_eq!(decoded_sequence_numbers(&decoded_frames), vec![1, 2]);
    }
}