use crate::decoder::Decodable;
use crate::frame::{FrameBuffer, FrameType, VideoFrame};
use crate::jitter_estimator::JitterEstimator;
use std::collections::BTreeMap;
/// Lower bound, in milliseconds, applied when clamping the raw playout-delay target;
/// also the initial target delay of a new buffer.
const MIN_PLAYOUT_DELAY_MS: f64 = 10.0;
/// Upper bound, in milliseconds, applied when clamping the raw playout-delay target.
const MAX_PLAYOUT_DELAY_MS: f64 = 500.0;
/// Factor the jitter estimate is multiplied by to obtain the raw playout-delay target.
const JITTER_MULTIPLIER: f64 = 3.0;
/// Weight kept from the previous target delay on each update; the newly computed
/// target contributes `1.0 - DELAY_SMOOTHING_FACTOR`.
const DELAY_SMOOTHING_FACTOR: f64 = 0.99;
/// Number of buffered frames at which further non-keyframes are rejected.
const MAX_BUFFER_SIZE: usize = 200;
/// Number of consecutive already-decoded ("old") frames tolerated on insert; exceeding
/// it flushes the buffer.
const MAX_CONSECUTIVE_OLD_FRAMES: u64 = 300;
/// A keyframe whose sequence number lies more than this many steps behind the last
/// decoded one is interpreted as a stream restart.
const STREAM_RESTART_BACKTRACK_THRESHOLD: u64 = 30;
/// Buffers incoming video frames by sequence number and hands them to a decoder in
/// order once they have spent the target playout delay in the buffer.
///
/// `T` is the frame type produced by the wrapped decoder.
pub struct JitterBuffer<T> {
/// Frames waiting to be released, keyed (and therefore ordered) by sequence number.
buffered_frames: BTreeMap<u64, FrameBuffer>,
/// Sequence number of the most recent frame pushed to the decoder; `None` until the
/// first frame is released and again after `flush`.
last_decoded_sequence_number: Option<u64>,
/// Source of the jitter estimate that drives the target playout delay.
jitter_estimator: JitterEstimator,
/// Smoothed time, in milliseconds, a frame must have spent in the buffer before it
/// is released.
target_playout_delay_ms: f64,
/// Running total of frames removed by `drop_frames_before` and `drop_all_frames`.
/// Frames rejected on insert are not counted here.
dropped_frames_count: u64,
/// Number of frames in a row rejected on insert for being not newer than the last
/// decoded frame; reset when a frame is accepted or the buffer is flushed.
num_consecutive_old_frames: u64,
/// Decoder that receives released frames through `decode`.
decoder: Box<dyn Decodable<Frame = T>>,
}
impl<T> JitterBuffer<T> {
/// Creates an empty jitter buffer that forwards released frames to `decoder`.
///
/// The buffer starts with no decoded frame on record, a fresh jitter estimator,
/// zeroed counters and a target playout delay of `MIN_PLAYOUT_DELAY_MS`.
pub fn new(decoder: Box<dyn Decodable<Frame = T>>) -> Self {
Self {
buffered_frames: BTreeMap::new(),
last_decoded_sequence_number: None,
jitter_estimator: JitterEstimator::new(),
target_playout_delay_ms: MIN_PLAYOUT_DELAY_MS,
dropped_frames_count: 0,
num_consecutive_old_frames: 0,
decoder,
}
}
/// Returns the number of frames currently held in the buffer.
pub fn buffered_frames_len(&self) -> usize {
self.buffered_frames.len()
}
/// Offers a newly received frame to the jitter buffer.
///
/// The frame is discarded when:
/// * its sequence number is not newer than the last decoded one, unless it is a
///   keyframe more than `STREAM_RESTART_BACKTRACK_THRESHOLD` behind, which is treated
///   as a stream restart; or
/// * the buffer already holds `MAX_BUFFER_SIZE` frames and the frame is not a
///   keyframe (a keyframe clears the buffer instead).
///
/// On a stream restart the buffer is flushed and the restart keyframe itself is then
/// inserted as the first frame of the new stream, so decoding can resume without
/// waiting for yet another keyframe.
///
/// For every accepted frame the jitter estimate and target playout delay are updated,
/// the frame is stored, and any frames that have become releasable are pushed to the
/// decoder.
///
/// `arrival_time_ms` is the receive time of `frame`; it is also used as the current
/// time for the release check performed at the end of this call.
pub fn insert_frame(&mut self, frame: VideoFrame, arrival_time_ms: u128) {
    let seq = frame.sequence_number;
    println!("[JITTER_BUFFER] Inserting frame: {seq}");

    if let Some(last_decoded) = self.last_decoded_sequence_number {
        if seq <= last_decoded {
            let is_stream_restart = frame.frame_type == FrameType::KeyFrame
                && last_decoded.saturating_sub(seq) > STREAM_RESTART_BACKTRACK_THRESHOLD;

            if !is_stream_restart {
                println!("[JITTER_BUFFER] Ignoring old frame: {seq}");
                self.num_consecutive_old_frames += 1;
                if self.num_consecutive_old_frames > MAX_CONSECUTIVE_OLD_FRAMES {
                    println!(
                        "[JITTER_BUFFER] Received {} consecutive old frames. Flushing buffer.",
                        self.num_consecutive_old_frames
                    );
                    self.flush();
                }
                return;
            }

            println!(
                "[JITTER_BUFFER] Detected keyframe with older sequence ({seq} <= {last_decoded}). Assuming stream restart – flushing buffer."
            );
            self.flush();
            // Deliberately no `return`: after the flush nothing has been decoded, so
            // this keyframe is a valid first frame and continues down the normal path.
        }
    }

    self.num_consecutive_old_frames = 0;

    if self.buffered_frames.len() >= MAX_BUFFER_SIZE {
        if frame.frame_type == FrameType::KeyFrame {
            // A keyframe can restart decoding on its own, so make room for it.
            println!("[JITTER_BUFFER] Buffer full, but received keyframe. Clearing buffer.");
            self.drop_all_frames();
        } else {
            println!("[JITTER_BUFFER] Buffer full. Rejecting frame: {seq}");
            return;
        }
    }

    println!("[JITTER_BUFFER] Received frame: {seq}");
    self.jitter_estimator.update_estimate(seq, arrival_time_ms);
    self.update_target_playout_delay();

    let fb = FrameBuffer::new(frame, arrival_time_ms);
    self.buffered_frames.insert(seq, fb);

    self.find_and_move_continuous_frames(arrival_time_ms);
}
/// Recomputes the smoothed target playout delay from the current jitter estimate.
///
/// The jitter estimate is scaled by `JITTER_MULTIPLIER`, clamped to
/// `[MIN_PLAYOUT_DELAY_MS, MAX_PLAYOUT_DELAY_MS]`, and blended into the previous
/// target, which keeps a weight of `DELAY_SMOOTHING_FACTOR`.
fn update_target_playout_delay(&mut self) {
    let desired_delay_ms = (self.jitter_estimator.get_jitter_estimate_ms() * JITTER_MULTIPLIER)
        .clamp(MIN_PLAYOUT_DELAY_MS, MAX_PLAYOUT_DELAY_MS);

    let retained_part = self.target_playout_delay_ms * DELAY_SMOOTHING_FACTOR;
    let incoming_part = desired_delay_ms * (1.0 - DELAY_SMOOTHING_FACTOR);
    self.target_playout_delay_ms = retained_part + incoming_part;
}
pub fn find_and_move_continuous_frames(&mut self, current_time_ms: u128) {
let mut frames_were_moved = false;
println!(
"[JB_POLL] Checking buffer. Last decoded: {:?}, Buffer size: {}, Target delay: {:.2}ms",
self.last_decoded_sequence_number,
self.buffered_frames.len(),
self.target_playout_delay_ms
);
loop {
let mut found_frame_to_move = false;
let next_decodable_key: Option<u64> = if let Some(last_seq) =
self.last_decoded_sequence_number
{
let next_continuous_seq = last_seq + 1;
if self.buffered_frames.contains_key(&next_continuous_seq) {
println!("[JB_POLL] Seeking next continuous frame: {next_continuous_seq}");
Some(next_continuous_seq)
} else {
let keyframe = self
.buffered_frames
.iter()
.find(|(&s, f)| s > next_continuous_seq && f.is_keyframe())
.map(|(&s, _)| s);
if let Some(k) = keyframe {
println!(
"[JB_POLL] Gap after {last_seq}. Seeking next keyframe. Found: {k}"
);
} else {
println!("[JB_POLL] Gap after {last_seq}. No subsequent keyframe found.");
}
keyframe
}
} else {
let keyframe = self
.buffered_frames
.iter()
.find(|(_, f)| f.is_keyframe())
.map(|(&s, _)| s);
if let Some(k) = keyframe {
println!("[JB_POLL] Seeking first keyframe. Found: {k}");
} else {
println!("[JB_POLL] Seeking first keyframe. None found in buffer.");
}
keyframe
};
if let Some(key) = next_decodable_key {
if let Some(frame) = self.buffered_frames.get(&key) {
let time_in_buffer_ms = (current_time_ms - frame.arrival_time_ms) as f64;
let is_ready = time_in_buffer_ms >= self.target_playout_delay_ms;
println!(
"[JB_POLL] Candidate {key}: Time in buffer: {time_in_buffer_ms:.2}ms, Target: {:.2}ms -> Ready: {is_ready}",
self.target_playout_delay_ms
);
if is_ready {
let frame_to_move = self.buffered_frames.remove(&key).unwrap();
if frame_to_move.is_keyframe() {
let is_first_frame = self.last_decoded_sequence_number.is_none();
let is_gap_recovery = self
.last_decoded_sequence_number
.is_some_and(|last_seq| key > last_seq + 1);
if is_first_frame || is_gap_recovery {
println!(
"[JITTER_BUFFER] Keyframe {key} recovery. Dropping frames before it."
);
self.drop_frames_before(key);
}
}
self.push_to_decoder(frame_to_move);
self.last_decoded_sequence_number = Some(key);
frames_were_moved = true;
found_frame_to_move = true;
}
}
} else {
println!("[JB_POLL] No decodable frame found in buffer.");
}
if !found_frame_to_move {
break;
}
}
if frames_were_moved {
}
}
/// Logs the frame's sequence number and hands the frame to the decoder.
fn push_to_decoder(&mut self, frame: FrameBuffer) {
let seq = frame.sequence_number();
println!("[JITTER_BUFFER] Pushing frame {seq} to decoder.");
self.decoder.decode(frame);
}
/// Returns `true` while no decoded frame is on record, i.e. before the first frame
/// has been released and after a `flush`. In that state only a keyframe can be
/// released next.
pub fn is_waiting_for_keyframe(&self) -> bool {
self.last_decoded_sequence_number.is_none()
}
/// Removes every buffered frame whose sequence number is lower than
/// `sequence_number`, logging each one and adding the number removed to the
/// dropped-frame counter.
fn drop_frames_before(&mut self, sequence_number: u64) {
    let len_before = self.buffered_frames.len();

    // `retain` visits keys in ascending order, so stale frames are logged from the
    // oldest to the newest.
    self.buffered_frames.retain(|&key, _| {
        let is_stale = key < sequence_number;
        if is_stale {
            println!("[JITTER_BUFFER] Dropping stale frame: {key}");
        }
        !is_stale
    });

    let removed = len_before - self.buffered_frames.len();
    self.dropped_frames_count += removed as u64;
}
/// Discards every buffered frame and adds their number to the dropped-frame counter.
///
/// The last decoded sequence number is left untouched.
pub fn drop_all_frames(&mut self) {
    let discarded = self.buffered_frames.len() as u64;
    self.dropped_frames_count += discarded;
    self.buffered_frames.clear();
    println!("[JITTER_BUFFER] Dropped all {discarded} frames.");
}
/// Resets the buffer to its "waiting for a keyframe" state.
///
/// All buffered frames are dropped (and counted as dropped), the last decoded
/// sequence number is cleared, the consecutive-old-frame counter is zeroed and the
/// jitter estimator is replaced with a fresh one. The target playout delay and the
/// dropped-frame total are kept.
pub fn flush(&mut self) {
self.drop_all_frames();
self.last_decoded_sequence_number = None;
self.num_consecutive_old_frames = 0;
self.jitter_estimator = JitterEstimator::new();
}
/// Returns the current jitter estimate, in milliseconds, as reported by the
/// underlying jitter estimator.
pub fn get_jitter_estimate_ms(&self) -> f64 {
self.jitter_estimator.get_jitter_estimate_ms()
}
/// Returns the current smoothed target playout delay in milliseconds.
pub fn get_target_playout_delay_ms(&self) -> f64 {
self.target_playout_delay_ms
}
/// Returns the total number of frames removed from the buffer without being decoded
/// (via `drop_frames_before` or `drop_all_frames`).
pub fn get_dropped_frames_count(&self) -> u64 {
self.dropped_frames_count
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::decoder::DecodedFrame;
use crate::frame::{FrameType, VideoFrame};
use std::sync::Arc;
use std::sync::Mutex;
/// Test double for a decoder: records every frame it is asked to decode in a shared
/// vector that the test can inspect.
struct MockDecoder {
// Shared with the test so decoded frames can be read back after the buffer runs.
decoded_frames: Arc<Mutex<Vec<DecodedFrame>>>,
}
// `Decodable` implementation used on non-wasm targets, where the callback passed to
// `new` is declared `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
impl Decodable for MockDecoder {
type Frame = crate::decoder::DecodedFrame;
// The trait constructor is not usable for the mock; tests build it with
// `new_with_vec` instead, so calling this panics.
fn new(
_codec: crate::decoder::VideoCodec,
_on_decoded_frame: Box<dyn Fn(DecodedFrame) + Send + Sync>,
) -> Self {
panic!("Use `new_with_vec` for this mock.");
}
// Records the frame as a `DecodedFrame` with zero width/height and a copy of the
// frame's data.
fn decode(&self, frame: FrameBuffer) {
let mut frames = self.decoded_frames.lock().unwrap();
frames.push(DecodedFrame {
sequence_number: frame.sequence_number(),
width: 0,
height: 0,
data: frame.frame.data.to_vec(),
});
}
}
// `Decodable` implementation used on wasm32, where the callback passed to `new` has
// no `Send + Sync` bound. `decode` is identical to the non-wasm implementation.
#[cfg(target_arch = "wasm32")]
impl Decodable for MockDecoder {
type Frame = crate::decoder::DecodedFrame;
// The trait constructor is not usable for the mock; tests build it with
// `new_with_vec` instead, so calling this panics.
fn new(
_codec: crate::decoder::VideoCodec,
_on_decoded_frame: Box<dyn Fn(DecodedFrame)>,
) -> Self {
panic!("Use `new_with_vec` for this mock.");
}
// Records the frame as a `DecodedFrame` with zero width/height and a copy of the
// frame's data.
fn decode(&self, frame: FrameBuffer) {
let mut frames = self.decoded_frames.lock().unwrap();
frames.push(DecodedFrame {
sequence_number: frame.sequence_number(),
width: 0,
height: 0,
data: frame.frame.data.to_vec(),
});
}
}
impl MockDecoder {
/// Builds a mock decoder that appends every decoded frame to `decoded_frames`.
fn new_with_vec(decoded_frames: Arc<Mutex<Vec<DecodedFrame>>>) -> Self {
Self { decoded_frames }
}
}
/// Builds a jitter buffer wired to a `MockDecoder` and returns it together with the
/// shared vector in which the mock records decoded frames.
fn create_test_jitter_buffer() -> (
    JitterBuffer<crate::decoder::DecodedFrame>,
    Arc<Mutex<Vec<DecodedFrame>>>,
) {
    let sink = Arc::new(Mutex::new(Vec::new()));
    let decoder = Box::new(MockDecoder::new_with_vec(Arc::clone(&sink)));
    (JitterBuffer::new(decoder), sink)
}
/// Builds a `VideoFrame` with the given sequence number and type, a 10-byte zeroed
/// payload and a timestamp of 0.
fn create_test_frame(seq: u64, frame_type: FrameType) -> VideoFrame {
VideoFrame {
sequence_number: seq,
frame_type,
data: vec![0; 10],
timestamp: 0.0,
}
}
/// A keyframe followed by the next delta frame is decoded in arrival order, each one
/// after 100 ms have passed since its insertion.
#[test]
fn insert_in_order() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
time += 100; jb.find_and_move_continuous_frames(time);
{
// Scoped so the lock is released before the buffer decodes again.
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 1);
assert_eq!(queue[0].sequence_number, 1);
}
jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 2);
assert_eq!(queue[1].sequence_number, 2);
}
/// Frames that arrive as 3, 1 (keyframe), 2 must reach the decoder as 1, 2, 3.
#[test]
fn insert_out_of_order() {
    let (mut jb, decoded_frames) = create_test_jitter_buffer();
    let arrival = 1000;
    let arrivals = [
        (3, FrameType::DeltaFrame),
        (1, FrameType::KeyFrame),
        (2, FrameType::DeltaFrame),
    ];
    for (seq, frame_type) in arrivals {
        jb.insert_frame(create_test_frame(seq, frame_type), arrival);
    }

    // Poll once after enough time has passed for every frame to be playable.
    jb.find_and_move_continuous_frames(arrival + 100);

    let decoded_order: Vec<_> = decoded_frames
        .lock()
        .unwrap()
        .iter()
        .map(|frame| frame.sequence_number)
        .collect();
    assert_eq!(decoded_order, [1, 2, 3]);
}
/// After keyframe 1 is decoded, frame 2 never arrives; a later keyframe 3 must still
/// be decoded and become the last decoded sequence number.
#[test]
fn keyframe_recovers_from_gap() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 2);
assert_eq!(queue[0].sequence_number, 1);
assert_eq!(queue[1].sequence_number, 3);
assert_eq!(jb.last_decoded_sequence_number, Some(3));
}
/// Delta frames buffered before any keyframe cannot be decoded; once keyframe 4 is
/// released they are removed from the buffer and counted as dropped.
#[test]
fn stale_frames_are_dropped_on_keyframe() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
assert_eq!(jb.get_dropped_frames_count(), 0);
jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
assert!(jb.buffered_frames.contains_key(&2));
assert!(jb.buffered_frames.contains_key(&3));
jb.find_and_move_continuous_frames(time);
// No keyframe yet, so nothing may have been decoded.
assert!(decoded_frames.lock().unwrap().is_empty());
jb.insert_frame(create_test_frame(4, FrameType::KeyFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 1);
assert_eq!(queue[0].sequence_number, 4);
assert!(!jb.buffered_frames.contains_key(&2));
assert!(!jb.buffered_frames.contains_key(&3));
assert_eq!(jb.get_dropped_frames_count(), 2);
}
/// Once frames 1 and 2 have been decoded, re-inserting them neither decodes them
/// again nor stores them in the buffer.
#[test]
fn old_frames_are_ignored() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
assert_eq!(decoded_frames.lock().unwrap().len(), 2);
assert_eq!(jb.last_decoded_sequence_number, Some(2));
// Both of these are at or below the last decoded sequence number.
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
assert_eq!(decoded_frames.lock().unwrap().len(), 2);
assert!(jb.buffered_frames.is_empty());
}
/// A full buffer rejects further delta frames, while a keyframe clears the buffer,
/// is stored itself, and the cleared frames are counted as dropped.
#[test]
fn buffer_capacity_is_enforced() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let time = 1000;
// Only delta frames are inserted, so nothing is decoded and the buffer fills up.
for i in 1..=MAX_BUFFER_SIZE {
jb.insert_frame(create_test_frame(i as u64 * 2, FrameType::DeltaFrame), time);
}
assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
let next_seq = (MAX_BUFFER_SIZE + 1) as u64 * 2;
jb.insert_frame(create_test_frame(next_seq, FrameType::DeltaFrame), time);
assert_eq!(jb.buffered_frames.len(), MAX_BUFFER_SIZE);
assert!(!jb.buffered_frames.contains_key(&next_seq));
assert_eq!(decoded_frames.lock().unwrap().len(), 0);
let keyframe_seq = (MAX_BUFFER_SIZE + 2) as u64 * 2;
jb.insert_frame(create_test_frame(keyframe_seq, FrameType::KeyFrame), time);
assert_eq!(jb.buffered_frames.len(), 1);
assert!(jb.buffered_frames.contains_key(&keyframe_seq));
assert_eq!(jb.get_dropped_frames_count(), MAX_BUFFER_SIZE as u64);
}
/// A frame is held back while it has spent less than the playout delay in the buffer
/// and released once enough time has passed.
#[test]
fn playout_delay_holds_frame() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
// Half of the minimum delay: too early to release.
time += (MIN_PLAYOUT_DELAY_MS / 2.0) as u128;
jb.find_and_move_continuous_frames(time);
assert!(decoded_frames.lock().unwrap().is_empty());
time += (MIN_PLAYOUT_DELAY_MS as u128) + 1;
jb.find_and_move_continuous_frames(time);
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 1);
assert_eq!(queue[0].sequence_number, 1);
}
/// Frames 1, 2 and 3 are inserted one at a time; after each poll exactly the newly
/// inserted frame must have been decoded.
#[test]
fn advances_decodable_frame_on_extraction() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
{
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 1, "Queue should have frame 1");
assert_eq!(queue[0].sequence_number, 1);
}
// NOTE(review): this assignment looks redundant, since releasing frame 1 already
// records it as the last decoded sequence number — confirm before removing.
jb.last_decoded_sequence_number = Some(1);
// Clear the recorded frames so the next assertion only sees new output.
decoded_frames.lock().unwrap().clear();
jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
{
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 1, "Queue should have frame 2");
assert_eq!(queue[0].sequence_number, 2);
}
jb.last_decoded_sequence_number = Some(2);
decoded_frames.lock().unwrap().clear();
jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
{
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 1, "Queue should have frame 3");
assert_eq!(queue[0].sequence_number, 3);
}
}
/// A keyframe followed by delta frames arriving as 3, 5, 2, 4 must be decoded in
/// strictly ascending order 1 through 5.
#[test]
fn complex_reordering_pattern() {
    let (mut jb, decoded_frames) = create_test_jitter_buffer();
    let arrival = 1000;

    jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), arrival);
    for seq in [3, 5, 2, 4] {
        jb.insert_frame(create_test_frame(seq, FrameType::DeltaFrame), arrival);
    }

    jb.find_and_move_continuous_frames(arrival + 100);

    let decoded_order: Vec<u64> = decoded_frames
        .lock()
        .unwrap()
        .iter()
        .map(|frame| frame.sequence_number)
        .collect();
    let expected_order: Vec<u64> = (1..=5).collect();
    assert_eq!(decoded_order, expected_order);
}
/// A keyframe that directly follows the last decoded frame is decoded like any other
/// frame and must not cause frames to be dropped.
#[test]
fn in_order_keyframe_does_not_disrupt_flow() {
let (mut jb, decoded_frames) = create_test_jitter_buffer();
let mut time = 1000;
jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
assert_eq!(decoded_frames.lock().unwrap().len(), 2);
assert_eq!(jb.get_dropped_frames_count(), 0);
// Sequence 3 is exactly last decoded + 1, so no gap recovery is involved.
jb.insert_frame(create_test_frame(3, FrameType::KeyFrame), time);
time += 100;
jb.find_and_move_continuous_frames(time);
let queue = decoded_frames.lock().unwrap();
assert_eq!(queue.len(), 3, "All three frames should be in the queue");
assert_eq!(queue[2].sequence_number, 3);
assert_eq!(
jb.get_dropped_frames_count(),
0,
"No frames should have been dropped"
);
}
/// Ordering works the same when the stream starts at a large sequence number:
/// frames arriving as N, N+2, N+1 are decoded as N, N+1, N+2.
#[test]
fn sequence_starting_at_high_number() {
    let (mut jb, decoded_frames) = create_test_jitter_buffer();
    let arrival = 1000;
    let start_seq = 10000;

    jb.insert_frame(create_test_frame(start_seq, FrameType::KeyFrame), arrival);
    for offset in [2, 1] {
        jb.insert_frame(
            create_test_frame(start_seq + offset, FrameType::DeltaFrame),
            arrival,
        );
    }

    jb.find_and_move_continuous_frames(arrival + 100);

    let decoded_order: Vec<_> = decoded_frames
        .lock()
        .unwrap()
        .iter()
        .map(|frame| frame.sequence_number)
        .collect();
    assert_eq!(decoded_order, [start_seq, start_seq + 1, start_seq + 2]);
}
/// Receiving more than `MAX_CONSECUTIVE_OLD_FRAMES` old frames in a row flushes the
/// buffer: buffered frames are discarded, the decode position is forgotten, and no
/// further frame is decoded until a keyframe arrives.
#[test]
fn flush_on_too_many_consecutive_old_frames() {
    let (mut jb, decoded_frames) = create_test_jitter_buffer();
    let mut time = 1000;

    jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
    time += 100;
    jb.find_and_move_continuous_frames(time);
    jb.insert_frame(create_test_frame(2, FrameType::DeltaFrame), time);
    time += 100;
    jb.find_and_move_continuous_frames(time);
    assert_eq!(jb.last_decoded_sequence_number, Some(2));
    assert_eq!(jb.buffered_frames.len(), 0);

    // Frame 4 stays buffered because frame 3 is missing; the flush must remove it.
    jb.insert_frame(create_test_frame(4, FrameType::DeltaFrame), time);
    assert_eq!(jb.buffered_frames.len(), 1);

    // One more old frame than the tolerated maximum triggers the flush.
    for _ in 0..=MAX_CONSECUTIVE_OLD_FRAMES {
        jb.insert_frame(create_test_frame(1, FrameType::KeyFrame), time);
    }
    assert_eq!(
        jb.last_decoded_sequence_number, None,
        "Last decoded sequence number should be reset"
    );
    assert_eq!(
        jb.buffered_frames.len(),
        0,
        "Buffer should be empty after flush"
    );
    assert_eq!(
        jb.num_consecutive_old_frames, 0,
        "Consecutive old frames counter should be reset"
    );
    assert!(jb.is_waiting_for_keyframe());

    jb.insert_frame(create_test_frame(3, FrameType::DeltaFrame), time);
    time += 100;
    jb.find_and_move_continuous_frames(time);

    // Frame 3 is a delta frame and the buffer is waiting for a keyframe, so only the
    // two frames decoded before the flush may be present.
    assert_eq!(
        decoded_frames.lock().unwrap().len(),
        2,
        "No frame should be decoded after the flush until a keyframe arrives"
    );
}
}