videocall_codecs/
jitter_buffer.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19//! The JitterBuffer, which reorders, buffers, and prepares frames for the decoder.
20
21use crate::decoder::Decodable;
22use crate::frame::{FrameBuffer, FrameType, VideoFrame};
23use crate::jitter_estimator::JitterEstimator;
24use std::collections::BTreeMap;
25
// --- Playout Delay Constants ---
/// Lower bound, in milliseconds, for the target playout delay. It is also the value the
/// target starts at when a `JitterBuffer` is created.
const MIN_PLAYOUT_DELAY_MS: f64 = 10.0;
/// Upper bound, in milliseconds, for the target playout delay. Prevents the delay from
/// growing indefinitely.
const MAX_PLAYOUT_DELAY_MS: f64 = 500.0;
/// A multiplier applied to the jitter estimate to provide a safety margin.
/// A value of 3.0 means we buffer enough to handle jitter up to 3x the running average.
const JITTER_MULTIPLIER: f64 = 3.0;
/// Weight given to the previous target delay on every update; the freshly computed target
/// contributes the remaining `1.0 - DELAY_SMOOTHING_FACTOR`. Values close to 1.0 make the
/// delay change slowly, preventing rapid, jarring changes.
const DELAY_SMOOTHING_FACTOR: f64 = 0.99;

/// The maximum number of frames the buffer will hold before rejecting new ones.
/// A keyframe arriving at a full buffer clears the buffer instead of being rejected.
const MAX_BUFFER_SIZE: usize = 200;
/// Number of consecutive old (already decoded or earlier) frames tolerated before the buffer
/// is flushed. From libwebrtc's jitter_buffer_common.h.
const MAX_CONSECUTIVE_OLD_FRAMES: u64 = 300;
/// If an incoming keyframe is more than this many sequence numbers behind the last decoded
/// frame, we assume the stream restarted (e.g., camera switch) and flush immediately. Smaller
/// rollbacks are treated as harmless reordering.
const STREAM_RESTART_BACKTRACK_THRESHOLD: u64 = 30;
45
/// Reorders incoming video frames by sequence number and hands them to a decoder once they
/// are continuous with the last decoded frame (or are a keyframe decoding can restart from)
/// and have waited in the buffer for the adaptive playout delay.
///
/// `T` is the frame type produced by the attached [`Decodable`] implementation.
pub struct JitterBuffer<T> {
    /// Frames that have been received but not yet handed to the decoder.
    /// A BTreeMap is used to keep them sorted by sequence number automatically.
    buffered_frames: BTreeMap<u64, FrameBuffer>,

    /// The sequence number of the last frame that was sent to the decoder, or `None` if
    /// nothing has been sent since construction or the last flush.
    last_decoded_sequence_number: Option<u64>,

    /// The jitter estimator for monitoring network conditions.
    jitter_estimator: JitterEstimator,

    /// The current adaptive target for playout delay, in milliseconds.
    target_playout_delay_ms: f64,

    /// Number of buffered frames removed without being decoded: stale frames skipped by a
    /// keyframe jump, plus everything cleared when the buffer is emptied.
    dropped_frames_count: u64,

    /// A counter for consecutive old frames to detect stream corruption.
    num_consecutive_old_frames: u64,

    // --- Decoder Interface ---
    /// The abstract decoder that will receive frames ready for decoding.
    decoder: Box<dyn Decodable<Frame = T>>,
}
70
71impl<T> JitterBuffer<T> {
72    pub fn new(decoder: Box<dyn Decodable<Frame = T>>) -> Self {
73        Self {
74            buffered_frames: BTreeMap::new(),
75            last_decoded_sequence_number: None,
76            jitter_estimator: JitterEstimator::new(),
77            target_playout_delay_ms: MIN_PLAYOUT_DELAY_MS,
78            dropped_frames_count: 0,
79            num_consecutive_old_frames: 0,
80            decoder,
81        }
82    }
83
84    /// The main entry point for a new frame arriving from the network.
85    pub fn insert_frame(&mut self, frame: VideoFrame, arrival_time_ms: u128) {
86        let seq = frame.sequence_number;
87        println!("[JITTER_BUFFER] Inserting frame: {seq}");
88
89        // --- Pre-insertion checks ---
90        // 1. Ignore frames that are too old.
91        if let Some(last_decoded) = self.last_decoded_sequence_number {
92            if seq <= last_decoded {
93                // Special case: if the old frame is a KEYFRAME, it likely indicates the sender has
94                // restarted (e.g., camera switch). Flush immediately so we can start decoding from
95                // this new keyframe without waiting for the old-frame counter threshold.
96                if frame.frame_type == FrameType::KeyFrame
97                    && last_decoded.saturating_sub(seq) > STREAM_RESTART_BACKTRACK_THRESHOLD
98                {
99                    println!(
100                        "[JITTER_BUFFER] Detected keyframe with older sequence ({seq} <= {last_decoded}). Assuming stream restart – flushing buffer."
101                    );
102                    self.flush();
103                } else {
104                    println!("[JITTER_BUFFER] Ignoring old frame: {seq}");
105                    self.num_consecutive_old_frames += 1;
106                    if self.num_consecutive_old_frames > MAX_CONSECUTIVE_OLD_FRAMES {
107                        println!(
108                            "[JITTER_BUFFER] Received {} consecutive old frames. Flushing buffer.",
109                            self.num_consecutive_old_frames
110                        );
111                        self.flush();
112                    }
113                }
114                return;
115            }
116        }
117
118        // If we received a valid frame, reset the counter.
119        self.num_consecutive_old_frames = 0;
120
121        // 2. Check if the buffer is full.
122        if self.buffered_frames.len() >= MAX_BUFFER_SIZE {
123            // Allow a keyframe to clear the buffer if it's full.
124            if frame.frame_type == FrameType::KeyFrame {
125                println!("[JITTER_BUFFER] Buffer full, but received keyframe. Clearing buffer.");
126                self.drop_all_frames();
127            } else {
128                println!("[JITTER_BUFFER] Buffer full. Rejecting frame: {seq}");
129                return; // Reject the frame.
130            }
131        }
132
133        println!("[JITTER_BUFFER] Received frame: {seq}");
134
135        self.jitter_estimator.update_estimate(seq, arrival_time_ms);
136        self.update_target_playout_delay();
137
138        let fb = FrameBuffer::new(frame, arrival_time_ms);
139        self.buffered_frames.insert(seq, fb);
140
141        self.find_and_move_continuous_frames(arrival_time_ms);
142    }
143
144    /// Updates the target playout delay based on the current jitter estimate.
145    fn update_target_playout_delay(&mut self) {
146        let jitter_estimate = self.jitter_estimator.get_jitter_estimate_ms();
147
148        // Calculate the raw target delay with a safety margin.
149        let raw_target = jitter_estimate * JITTER_MULTIPLIER;
150
151        // Clamp the target to our defined min/max bounds.
152        let clamped_target = raw_target.clamp(MIN_PLAYOUT_DELAY_MS, MAX_PLAYOUT_DELAY_MS);
153
154        // Smooth the transition to the new target to avoid sudden changes.
155        self.target_playout_delay_ms = (self.target_playout_delay_ms * DELAY_SMOOTHING_FACTOR)
156            + (clamped_target * (1.0 - DELAY_SMOOTHING_FACTOR));
157    }
158
159    /// Checks the buffered frames and moves any continuous frames to the decodable queue.
160    pub fn find_and_move_continuous_frames(&mut self, current_time_ms: u128) {
161        let mut frames_were_moved = false;
162
163        println!(
164            "[JB_POLL] Checking buffer. Last decoded: {:?}, Buffer size: {}, Target delay: {:.2}ms",
165            self.last_decoded_sequence_number,
166            self.buffered_frames.len(),
167            self.target_playout_delay_ms
168        );
169
170        loop {
171            let mut found_frame_to_move = false;
172
173            let next_decodable_key: Option<u64> = if let Some(last_seq) =
174                self.last_decoded_sequence_number
175            {
176                // CASE 1: We are in a continuous stream. Look for the next frame.
177                let next_continuous_seq = last_seq + 1;
178                if self.buffered_frames.contains_key(&next_continuous_seq) {
179                    println!("[JB_POLL] Seeking next continuous frame: {next_continuous_seq}");
180                    Some(next_continuous_seq)
181                } else {
182                    // CASE 2: Gap detected. Look for the next keyframe after the gap.
183                    let keyframe = self
184                        .buffered_frames
185                        .iter()
186                        .find(|(&s, f)| s > next_continuous_seq && f.is_keyframe())
187                        .map(|(&s, _)| s);
188                    if let Some(k) = keyframe {
189                        println!(
190                            "[JB_POLL] Gap after {last_seq}. Seeking next keyframe. Found: {k}"
191                        );
192                    } else {
193                        println!("[JB_POLL] Gap after {last_seq}. No subsequent keyframe found.");
194                    }
195                    keyframe
196                }
197            } else {
198                // CASE 3: We have never decoded. We MUST start with a keyframe.
199                let keyframe = self
200                    .buffered_frames
201                    .iter()
202                    .find(|(_, f)| f.is_keyframe())
203                    .map(|(&s, _)| s);
204                if let Some(k) = keyframe {
205                    println!("[JB_POLL] Seeking first keyframe. Found: {k}");
206                } else {
207                    println!("[JB_POLL] Seeking first keyframe. None found in buffer.");
208                }
209                keyframe
210            };
211
212            if let Some(key) = next_decodable_key {
213                if let Some(frame) = self.buffered_frames.get(&key) {
214                    let time_in_buffer_ms = (current_time_ms - frame.arrival_time_ms) as f64;
215
216                    let is_ready = time_in_buffer_ms >= self.target_playout_delay_ms;
217                    println!(
218                        "[JB_POLL] Candidate {}: Time in buffer: {:.2}ms, Target: {:.2}ms -> Ready: {}",
219                        key, time_in_buffer_ms, self.target_playout_delay_ms, is_ready
220                    );
221
222                    if is_ready {
223                        let frame_to_move = self.buffered_frames.remove(&key).unwrap();
224
225                        // If we're jumping to a keyframe to recover, drop everything before it.
226                        if frame_to_move.is_keyframe() {
227                            let is_first_frame = self.last_decoded_sequence_number.is_none();
228                            let is_gap_recovery = self
229                                .last_decoded_sequence_number
230                                .is_some_and(|last_seq| key > last_seq + 1);
231
232                            if is_first_frame || is_gap_recovery {
233                                println!(
234                                    "[JITTER_BUFFER] Keyframe {key} recovery. Dropping frames before it."
235                                );
236                                self.drop_frames_before(key);
237                            }
238                        }
239
240                        self.push_to_decoder(frame_to_move);
241                        self.last_decoded_sequence_number = Some(key);
242                        frames_were_moved = true;
243                        found_frame_to_move = true;
244                    }
245                }
246            } else {
247                println!("[JB_POLL] No decodable frame found in buffer.");
248            }
249
250            if !found_frame_to_move {
251                break;
252            }
253        }
254
255        if frames_were_moved {
256            // NOTE: No need to notify a condvar anymore. The decoder manages its own thread.
257        }
258    }
259
260    /// Pushes a single frame to the shared decodable queue.
261    fn push_to_decoder(&mut self, frame: FrameBuffer) {
262        println!(
263            "[JITTER_BUFFER] Pushing frame {} to decoder.",
264            frame.sequence_number()
265        );
266        self.decoder.decode(frame);
267    }
268
269    /// Checks if the jitter buffer is currently waiting for a keyframe to continue.
270    pub fn is_waiting_for_keyframe(&self) -> bool {
271        self.last_decoded_sequence_number.is_none()
272    }
273
274    /// Removes all frames from the buffer with a sequence number less than the given one.
275    fn drop_frames_before(&mut self, sequence_number: u64) {
276        let keys_to_drop: Vec<u64> = self
277            .buffered_frames
278            .keys()
279            .cloned()
280            .filter(|&k| k < sequence_number)
281            .collect();
282
283        self.dropped_frames_count += keys_to_drop.len() as u64;
284        for key in keys_to_drop {
285            println!("[JITTER_BUFFER] Dropping stale frame: {key}");
286            self.buffered_frames.remove(&key);
287        }
288    }
289
290    /// Removes all frames from the buffer. Used when a keyframe arrives and the buffer is full.
291    pub fn drop_all_frames(&mut self) {
292        let num_dropped = self.buffered_frames.len() as u64;
293        self.buffered_frames.clear();
294        self.dropped_frames_count += num_dropped;
295        println!("[JITTER_BUFFER] Dropped all {num_dropped} frames.");
296    }
297
298    /// Flushes the jitter buffer, resetting its state completely.
299    pub fn flush(&mut self) {
300        self.drop_all_frames();
301        self.last_decoded_sequence_number = None;
302        self.num_consecutive_old_frames = 0;
303        // Consider resetting jitter estimator as well if needed
304        self.jitter_estimator = JitterEstimator::new();
305    }
306
    /// Returns the current jitter estimate, in milliseconds, as reported by the internal
    /// jitter estimator.
    pub fn get_jitter_estimate_ms(&self) -> f64 {
        self.jitter_estimator.get_jitter_estimate_ms()
    }
310
    /// Returns the current (smoothed) target playout delay, in milliseconds.
    pub fn get_target_playout_delay_ms(&self) -> f64 {
        self.target_playout_delay_ms
    }
314
    /// Returns how many buffered frames have been discarded without being decoded.
    ///
    /// Frames rejected on arrival (too old, or the buffer was full) never enter the buffer
    /// and are not included. The total is not reset by `flush`.
    pub fn get_dropped_frames_count(&self) -> u64 {
        self.dropped_frames_count
    }
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323    use crate::decoder::DecodedFrame;
324    use crate::frame::{FrameType, VideoFrame};
325    use std::sync::Arc;
326    use std::sync::Mutex;
327
    /// A mock decoder for testing purposes. It stores decoded frames in a shared Vec.
    struct MockDecoder {
        /// Sink shared with the test body; `decode` appends one entry per frame it receives.
        decoded_frames: Arc<Mutex<Vec<DecodedFrame>>>,
    }
332
333    // This impl is for native targets
334    #[cfg(not(target_arch = "wasm32"))]
335    impl Decodable for MockDecoder {
336        /// The decoded frame type for mock decoder in tests.
337        type Frame = crate::decoder::DecodedFrame;
338        fn new(
339            _codec: crate::decoder::VideoCodec,
340            _on_decoded_frame: Box<dyn Fn(DecodedFrame) + Send + Sync>,
341        ) -> Self {
342            panic!("Use `new_with_vec` for this mock.");
343        }
344        fn decode(&self, frame: FrameBuffer) {
345            let mut frames = self.decoded_frames.lock().unwrap();
346            frames.push(DecodedFrame {
347                sequence_number: frame.sequence_number(),
348                width: 0,
349                height: 0,
350                data: frame.frame.data.to_vec(),
351            });
352        }
353    }
354
355    // This impl is for wasm targets
356    #[cfg(target_arch = "wasm32")]
357    impl Decodable for MockDecoder {
358        /// The decoded frame type for mock decoder in tests.
359        type Frame = crate::decoder::DecodedFrame;
360        fn new(
361            _codec: crate::decoder::VideoCodec,
362            _on_decoded_frame: Box<dyn Fn(DecodedFrame)>,
363        ) -> Self {
364            panic!("Use `new_with_vec` for this mock.");
365        }
366        fn decode(&self, frame: FrameBuffer) {
367            let mut frames = self.decoded_frames.lock().unwrap();
368            frames.push(DecodedFrame {
369                sequence_number: frame.sequence_number(),
370                width: 0,
371                height: 0,
372                data: frame.frame.data.to_vec(),
373            });
374        }
375    }
376
377    impl MockDecoder {
378        fn new_with_vec(decoded_frames: Arc<Mutex<Vec<DecodedFrame>>>) -> Self {
379            Self { decoded_frames }
380        }
381    }
382
383    /// A helper to create a JitterBuffer with a mock decoder for testing.
384    fn create_test_jitter_buffer() -> (
385        JitterBuffer<crate::decoder::DecodedFrame>,
386        Arc<Mutex<Vec<DecodedFrame>>>,
387    ) {
388        let decoded_frames = Arc::new(Mutex::new(Vec::new()));
389        let mock_decoder = Box::new(MockDecoder::new_with_vec(decoded_frames.clone()));
390        let jitter_buffer = JitterBuffer::new(mock_decoder);
391        (jitter_buffer, decoded_frames)
392    }
393
394    fn create_test_frame(seq: u64, frame_type: FrameType) -> VideoFrame {
395        VideoFrame {
396            sequence_number: seq,
397            frame_type,
398            data: vec![0; 10],
399            timestamp: 0.0,
400        }
401    }
402
403    #[test]
404    fn insert_in_order() {
405        let (mut jb, decoded_frames) = create_test_jitter_buffer();
406        // Playout delay requires us to simulate time passing.
407        let mut time = 1000;
408
409        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
410        time += 100; // Elapse time to overcome playout delay
411        jb.find_and_move_continuous_frames(time);
412
413        {
414            let queue = decoded_frames.lock().unwrap();
415            assert_eq!(queue.len(), 1);
416            assert_eq!(queue[0].sequence_number, 1);
417        }
418
419        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
420        time += 100;
421        jb.find_and_move_continuous_frames(time);
422
423        let queue = decoded_frames.lock().unwrap();
424        assert_eq!(queue.len(), 2);
425        assert_eq!(queue[1].sequence_number, 2);
426    }
427
428    #[test]
429    fn insert_out_of_order() {
430        let (mut jb, decoded_frames) = create_test_jitter_buffer();
431        let mut time = 1000;
432
433        // Insert 3, then 1, then 2.
434        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
435        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
436        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
437
438        // Advance time enough for all frames to pass the playout delay.
439        time += 100;
440        jb.find_and_move_continuous_frames(time);
441
442        let queue = decoded_frames.lock().unwrap();
443        assert_eq!(queue.len(), 3);
444        assert_eq!(queue[0].sequence_number, 1);
445        assert_eq!(queue[1].sequence_number, 2);
446        assert_eq!(queue[2].sequence_number, 3);
447    }
448
449    #[test]
450    fn keyframe_recovers_from_gap() {
451        let (mut jb, decoded_frames) = create_test_jitter_buffer();
452        let mut time = 1000;
453
454        // Insert 1, then 3 (KeyFrame). Frame 2 is "lost".
455        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
456        time += 100;
457        jb.find_and_move_continuous_frames(time); // Frame 1 is moved.
458
459        jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
460        time += 100;
461        jb.find_and_move_continuous_frames(time); // Frame 3 is moved.
462
463        let queue = decoded_frames.lock().unwrap();
464        assert_eq!(queue.len(), 2);
465        assert_eq!(queue[0].sequence_number, 1);
466        assert_eq!(queue[1].sequence_number, 3);
467        assert_eq!(jb.last_decoded_sequence_number, Some(3));
468    }
469
470    #[test]
471    fn stale_frames_are_dropped_on_keyframe() {
472        let (mut jb, decoded_frames) = create_test_jitter_buffer();
473        let mut time = 1000;
474        assert_eq!(jb.get_dropped_frames_count(), 0);
475
476        // Insert frames that will become stale.
477        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
478        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
479        assert!(jb.buffered_frames.contains_key(&2));
480        assert!(jb.buffered_frames.contains_key(&3));
481
482        // At this point, nothing is decodable because we haven't seen a keyframe.
483        jb.find_and_move_continuous_frames(time);
484        assert!(decoded_frames.lock().unwrap().is_empty());
485
486        // Insert a keyframe that jumps over the stale frames.
487        jb.insert_frame(create_test_frame(4, FrameType::KeyFrame), time);
488
489        // Advance time to allow the keyframe to be decoded.
490        time += 100;
491        jb.find_and_move_continuous_frames(time);
492
493        // The keyframe should be ready to decode.
494        let queue = decoded_frames.lock().unwrap();
495        assert_eq!(queue.len(), 1);
496        assert_eq!(queue[0].sequence_number, 4);
497
498        // The stale frames should be gone from the internal buffer.
499        assert!(!jb.buffered_frames.contains_key(&2));
500        assert!(!jb.buffered_frames.contains_key(&3));
501
502        // The dropped frame counter should be updated.
503        assert_eq!(jb.get_dropped_frames_count(), 2);
504    }
505
506    #[test]
507    fn old_frames_are_ignored() {
508        let (mut jb, decoded_frames) = create_test_jitter_buffer();
509        let mut time = 1000;
510
511        // Decode sequence 1 and 2
512        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
513        time += 100;
514        jb.find_and_move_continuous_frames(time);
515        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
516        time += 100;
517        jb.find_and_move_continuous_frames(time);
518
519        // At this point, frames 1 and 2 should be in the queue.
520        assert_eq!(decoded_frames.lock().unwrap().len(), 2);
521        assert_eq!(jb.last_decoded_sequence_number, Some(2));
522
523        // Now, insert an old frame (seq 1) and a current frame (seq 2).
524        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
525        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
526
527        // No new frames should have been added to the queue.
528        assert_eq!(decoded_frames.lock().unwrap().len(), 2);
529
530        // And the internal buffer should be empty.
531        assert!(jb.buffered_frames.is_empty());
532    }
533
534    #[test]
535    fn buffer_capacity_is_enforced() {
536        let (mut jb, decoded_frames) = create_test_jitter_buffer();
537        let time = 1000;
538
539        // Fill the buffer up to its capacity. These frames are not continuous.
540        for i in 1..=MAX_BUFFER_SIZE {
541            jb.insert_frame(create_test_frame(i as u64 * 2, FrameType::DeltaFrame), time);
542        }
543
544        assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
545
546        // Try to insert another delta frame. It should be rejected.
547        let next_seq = (MAX_BUFFER_SIZE + 1) as u64 * 2;
548        jb.insert_frame(create_test_frame(next_seq, FrameType::DeltaFrame), time);
549        assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
550        assert!(!jb.buffered_frames.contains_key(&next_seq));
551
552        // No frames should have been moved.
553        assert_eq!(decoded_frames.lock().unwrap().len(), 0);
554
555        // Now, insert a keyframe. It should clear the buffer and insert itself.
556        let keyframe_seq = (MAX_BUFFER_SIZE + 2) as u64 * 2;
557        jb.insert_frame(create_test_frame(keyframe_seq, FrameType::KeyFrame), time);
558
559        assert_eq!(jb.buffered_frames.len(), 1);
560        assert!(jb.buffered_frames.contains_key(&keyframe_seq));
561        assert_eq!(jb.get_dropped_frames_count(), MAX_BUFFER_SIZE as u64);
562    }
563
564    #[test]
565    fn playout_delay_holds_frame() {
566        let (mut jb, decoded_frames) = create_test_jitter_buffer();
567        let mut time = 1000;
568
569        // Insert a keyframe. The initial playout delay is MIN_PLAYOUT_DELAY_MS (10ms).
570        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
571
572        // Advance time, but not enough to meet the delay.
573        time += (MIN_PLAYOUT_DELAY_MS / 2.0) as u128;
574        jb.find_and_move_continuous_frames(time);
575
576        // The frame should NOT be in the decodable queue yet.
577        assert!(decoded_frames.lock().unwrap().is_empty());
578
579        // Advance time past the minimum delay.
580        time += (MIN_PLAYOUT_DELAY_MS as u128) + 1;
581        jb.find_and_move_continuous_frames(time);
582
583        // NOW the frame should be in the queue.
584        let queue = decoded_frames.lock().unwrap();
585        assert_eq!(queue.len(), 1);
586        assert_eq!(queue[0].sequence_number, 1);
587    }
588
589    #[test]
590    fn advances_decodable_frame_on_extraction() {
591        let (mut jb, decoded_frames) = create_test_jitter_buffer();
592        let mut time = 1000;
593
594        // Insert the first frame.
595        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
596
597        // Advance time to decode it.
598        time += 100;
599        jb.find_and_move_continuous_frames(time);
600
601        // Verify only frame 1 is in the queue.
602        {
603            let queue = decoded_frames.lock().unwrap();
604            assert_eq!(queue.len(), 1, "Queue should have frame 1");
605            assert_eq!(queue[0].sequence_number, 1);
606        }
607
608        // Simulate extraction by the decoder by updating our last decoded number
609        // and clearing the queue for the next check.
610        jb.last_decoded_sequence_number = Some(1);
611        decoded_frames.lock().unwrap().clear();
612
613        // Insert the second frame.
614        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
615
616        // Advance time to decode it.
617        time += 100;
618        jb.find_and_move_continuous_frames(time);
619
620        // Verify only frame 2 is in the queue.
621        {
622            let queue = decoded_frames.lock().unwrap();
623            assert_eq!(queue.len(), 1, "Queue should have frame 2");
624            assert_eq!(queue[0].sequence_number, 2);
625        }
626
627        // Simulate extraction of frame 2.
628        jb.last_decoded_sequence_number = Some(2);
629        decoded_frames.lock().unwrap().clear();
630
631        // Insert the third frame.
632        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
633
634        // Advance time to decode it.
635        time += 100;
636        jb.find_and_move_continuous_frames(time);
637
638        // Verify only frame 3 is in the queue.
639        {
640            let queue = decoded_frames.lock().unwrap();
641            assert_eq!(queue.len(), 1, "Queue should have frame 3");
642            assert_eq!(queue[0].sequence_number, 3);
643        }
644    }
645
646    #[test]
647    fn complex_reordering_pattern() {
648        let (mut jb, decoded_frames) = create_test_jitter_buffer();
649        let mut time = 1000;
650
651        // Insert odd frames first
652        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
653        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
654        jb.insert_frame(create_test_frame(5, FrameType::DeltaFrame), time);
655
656        // Then insert even frames
657        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
658        jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);
659
660        // Advance time to allow all to be decoded
661        time += 100;
662        jb.find_and_move_continuous_frames(time);
663
664        let queue = decoded_frames.lock().unwrap();
665        assert_eq!(queue.len(), 5);
666        for i in 0..5 {
667            assert_eq!(queue[i].sequence_number, (i + 1) as u64);
668        }
669    }
670
671    #[test]
672    fn in_order_keyframe_does_not_disrupt_flow() {
673        let (mut jb, decoded_frames) = create_test_jitter_buffer();
674        let mut time = 1000;
675
676        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
677        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
678
679        time += 100;
680        jb.find_and_move_continuous_frames(time);
681        assert_eq!(decoded_frames.lock().unwrap().len(), 2);
682        assert_eq!(jb.get_dropped_frames_count(), 0);
683
684        // Insert another Keyframe, but it's in order, so no frames should be dropped.
685        jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
686
687        time += 100;
688        jb.find_and_move_continuous_frames(time);
689
690        let queue = decoded_frames.lock().unwrap();
691        assert_eq!(queue.len(), 3, "All three frames should be in the queue");
692        assert_eq!(queue[2].sequence_number, 3);
693        assert_eq!(
694            jb.get_dropped_frames_count(),
695            0,
696            "No frames should have been dropped"
697        );
698    }
699
700    #[test]
701    fn sequence_starting_at_high_number() {
702        let (mut jb, decoded_frames) = create_test_jitter_buffer();
703        let mut time = 1000;
704        let start_seq = 10000;
705
706        // Insert frames starting from a high sequence number
707        jb.insert_frame(create_test_frame(start_seq, FrameType::KeyFrame), time);
708        jb.insert_frame(
709            create_test_frame(start_seq + 2, FrameType::DeltaFrame),
710            time,
711        );
712        jb.insert_frame(
713            create_test_frame(start_seq + 1, FrameType::DeltaFrame),
714            time,
715        );
716
717        // Advance time enough for all frames to pass the playout delay.
718        time += 100;
719        jb.find_and_move_continuous_frames(time);
720
721        let queue = decoded_frames.lock().unwrap();
722        assert_eq!(queue.len(), 3);
723        assert_eq!(queue[0].sequence_number, start_seq);
724        assert_eq!(queue[1].sequence_number, start_seq + 1);
725        assert_eq!(queue[2].sequence_number, start_seq + 2);
726    }
727
728    #[test]
729    fn flush_on_too_many_consecutive_old_frames() {
730        let (mut jb, decoded_frames) = create_test_jitter_buffer();
731        let mut time = 1000;
732
733        // Decode sequence 1 and 2
734        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
735        time += 100;
736        jb.find_and_move_continuous_frames(time);
737        jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
738        time += 100;
739        jb.find_and_move_continuous_frames(time);
740        assert_eq!(jb.last_decoded_sequence_number, Some(2));
741        assert_eq!(jb.buffered_frames.len(), 0);
742
743        // Insert a frame into the buffer that won't be decoded
744        jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);
745        assert_eq!(jb.buffered_frames.len(), 1);
746
747        // Send a stream of old packets
748        for _ in 0..=MAX_CONSECUTIVE_OLD_FRAMES {
749            // Send old frame with sequence number 1
750            jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
751        }
752
753        // The buffer should now be flushed
754        assert_eq!(
755            jb.last_decoded_sequence_number, None,
756            "Last decoded sequence number should be reset"
757        );
758        assert_eq!(
759            jb.buffered_frames.len(),
760            0,
761            "Buffer should be empty after flush"
762        );
763        assert_eq!(
764            jb.num_consecutive_old_frames, 0,
765            "Consecutive old frames counter should be reset"
766        );
767
768        // It should now be waiting for a keyframe again
769        assert!(jb.is_waiting_for_keyframe());
770
771        // Verify that even if we send another delta frame, it doesn't get decoded
772        jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
773        time += 100;
774        jb.find_and_move_continuous_frames(time);
775        assert!(decoded_frames.lock().unwrap().len() <= 2); // Should not have increased
776    }
777}