use std::collections::HashMap;
use std::rc::Rc;
use std::sync::atomic::{AtomicU32, Ordering};
use js_sys::Date;
use videocall_types::protos::diagnostics_packet::DiagnosticsPacket;
use videocall_types::protos::media_packet::media_packet::MediaType;
/// Length, in seconds, of the per-peer diagnostics window handed to
/// `DiagnosticPackets::new` by the bitrate controller.
const WINDOW_DURATION_SEC: u32 = 10;
/// Seconds without a new diagnostics packet after which a peer's window is
/// discarded; also handed to `DiagnosticPackets::new` by the bitrate controller.
const INACTIVE_TIMEOUT_SEC: u32 = 20;
/// Control message for an encoder.
///
/// Nothing in the visible part of this file constructs or consumes it; it is
/// exported for use elsewhere.
#[derive(Debug, Clone)]
pub enum EncoderControl {
    /// Carries a new target bitrate.
    UpdateBitrate {
        /// Target bitrate in kilobits per second.
        target_bitrate_kbps: u32,
    },
}
/// Time-bounded buffer of the diagnostics packets recorded for one peer.
pub struct DiagnosticPacketWindow {
    /// Buffered packets in insertion order, each paired with the timestamp
    /// (milliseconds) it was added under.
    packets: Vec<(f64, DiagnosticsPacket)>,
    /// Window length in milliseconds; a cleanup pass drops entries older than
    /// this relative to the cleanup time.
    window_duration_ms: f64,
    /// Reference time (milliseconds) of the most recent cleanup pass; seeded
    /// when the window is constructed.
    last_cleanup: f64,
}
impl DiagnosticPacketWindow {
    /// Minimum spacing, in milliseconds, between two automatic cleanup passes.
    const CLEANUP_INTERVAL_MS: f64 = 1000.0;

    /// Creates an empty window that keeps packets for `window_duration_sec`
    /// seconds.
    pub fn new(window_duration_sec: u32) -> Self {
        Self {
            packets: Vec::new(),
            window_duration_ms: f64::from(window_duration_sec) * 1000.0,
            last_cleanup: Date::now(),
        }
    }

    /// Appends `packet` under `timestamp` (milliseconds) and prunes expired
    /// entries when a cleanup pass is due.
    pub fn add_packet(&mut self, timestamp: f64, packet: DiagnosticsPacket) {
        self.packets.push((timestamp, packet));

        // `last_cleanup` is seeded from the wall clock in `new`, while
        // `timestamp` is supplied by the caller and may come from a different
        // (for example injected or simulated) clock. A negative gap means the
        // two are not comparable, or the clock went backwards; without
        // re-anchoring here the gate below would never open and the buffer
        // would grow without bound.
        let since_cleanup = timestamp - self.last_cleanup;
        if since_cleanup > Self::CLEANUP_INTERVAL_MS || since_cleanup < 0.0 {
            self.cleanup(timestamp);
        }
    }

    /// Drops every packet older than the window relative to `current_time`
    /// and records `current_time` as the last cleanup time.
    fn cleanup(&mut self, current_time: f64) {
        let cutoff = current_time - self.window_duration_ms;
        self.packets.retain(|(ts, _)| *ts >= cutoff);
        self.last_cleanup = current_time;
    }

    /// Extracts the received frame rate from a packet.
    ///
    /// Video and screen packets read `video_metrics`, audio packets read
    /// `audio_metrics`. Returns `None` for any other media type or when the
    /// matching metrics message is absent.
    fn packet_fps(packet: &DiagnosticsPacket) -> Option<f64> {
        match packet.media_type.enum_value_or_default() {
            MediaType::VIDEO | MediaType::SCREEN => packet
                .video_metrics
                .as_ref()
                .map(|m| m.fps_received as f64),
            MediaType::AUDIO => packet
                .audio_metrics
                .as_ref()
                .map(|m| m.fps_received as f64),
            _ => None,
        }
    }

    /// Frame rate reported by the most recently added packet, if it carries one.
    pub fn latest_fps(&self) -> Option<f64> {
        self.packets
            .last()
            .and_then(|(_, packet)| Self::packet_fps(packet))
    }

    /// Lowest frame rate reported by any buffered packet.
    ///
    /// Returns `None` when the buffer is empty or no buffered packet carries a
    /// frame rate. Every buffered packet is considered, including entries that
    /// have aged out of the window but have not been pruned yet.
    pub fn min_fps(&self) -> Option<f64> {
        let mut rates = self
            .packets
            .iter()
            .filter_map(|(_, packet)| Self::packet_fps(packet))
            .peekable();
        // Bail out before folding so "no samples" is reported as `None`
        // rather than as the fold's infinite seed.
        rates.peek()?;
        Some(rates.fold(f64::INFINITY, f64::min))
    }

    /// Timestamp of the most recently added packet, if any.
    pub fn latest_timestamp(&self) -> Option<f64> {
        self.packets.last().map(|(ts, _)| *ts)
    }

    /// Number of packets currently buffered.
    pub fn len(&self) -> usize {
        self.packets.len()
    }

    /// Whether no packets are currently buffered.
    pub fn is_empty(&self) -> bool {
        self.packets.is_empty()
    }
}
/// Collection of per-peer diagnostics windows, keyed by each packet's
/// `target_id`.
pub struct DiagnosticPackets {
    /// One window per peer id.
    peer_windows: HashMap<String, DiagnosticPacketWindow>,
    /// Window length, in seconds, given to every newly created peer window.
    window_duration_sec: u32,
    /// Seconds since a peer's latest packet after which its window is removed.
    inactive_timeout_sec: u32,
}
impl DiagnosticPackets {
    /// Creates an empty collection.
    ///
    /// `window_duration_sec` is the length of each peer window and
    /// `inactive_timeout_sec` is how long a peer may stay silent before its
    /// window is removed.
    pub fn new(window_duration_sec: u32, inactive_timeout_sec: u32) -> Self {
        Self {
            peer_windows: HashMap::new(),
            window_duration_sec,
            inactive_timeout_sec,
        }
    }

    /// Stores `packet` in the window of the peer named by its `target_id`
    /// (creating the window on first sight), then removes peers that have
    /// been silent for longer than the inactivity timeout as of `now`.
    pub fn process_packet(&mut self, packet: DiagnosticsPacket, now: f64) {
        let window_secs = self.window_duration_sec;
        self.peer_windows
            .entry(packet.target_id.clone())
            .or_insert_with(|| DiagnosticPacketWindow::new(window_secs))
            .add_packet(now, packet);
        self.remove_inactive_peers(now);
    }

    /// Keeps only peers whose latest packet is no older than the inactivity
    /// timeout relative to `now`; windows without any packet are removed too.
    fn remove_inactive_peers(&mut self, now: f64) {
        let timeout_ms = f64::from(self.inactive_timeout_sec) * 1000.0;
        let oldest_allowed = now - timeout_ms;
        self.peer_windows.retain(|_, window| {
            window
                .latest_timestamp()
                .map_or(false, |latest| latest >= oldest_allowed)
        });
    }

    /// Returns the peer whose window holds the lowest frame rate, together
    /// with that frame rate.
    ///
    /// Returns `None` when no peer has a frame rate strictly below infinity.
    pub fn get_worst_fps_peer(&self) -> Option<(String, f64)> {
        self.peer_windows
            .iter()
            .filter_map(|(peer_id, window)| window.min_fps().map(|fps| (peer_id, fps)))
            .fold(None::<(&String, f64)>, |worst, (peer_id, fps)| {
                // Strict comparison: on a tie the peer seen first is kept.
                let lowest_so_far = worst.map_or(f64::INFINITY, |(_, lowest)| lowest);
                if fps < lowest_so_far {
                    Some((peer_id, fps))
                } else {
                    worst
                }
            })
            .map(|(peer_id, fps)| (peer_id.clone(), fps))
    }

    /// Ids of all peers that currently have a window, in no particular order.
    pub fn get_peer_ids(&self) -> Vec<String> {
        let mut ids = Vec::with_capacity(self.peer_windows.len());
        ids.extend(self.peer_windows.keys().cloned());
        ids
    }

    /// Number of peers that currently have a window.
    pub fn peer_count(&self) -> usize {
        self.peer_windows.len()
    }
}
/// Derives an encoder bitrate from the frame rates that peers report in
/// their diagnostics packets.
pub struct EncoderBitrateController {
    /// PID controller fed with the frame-rate error on every correction.
    pid: pidgeon::PidController,
    /// Timestamp (milliseconds) of the previous non-throttled evaluation;
    /// used to compute the time step handed to the PID controller.
    last_update: f64,
    /// Baseline bitrate in kbps; the output bounds are derived from it.
    _ideal_bitrate_kbps: u32,
    /// Shared target frame rate, read on every evaluation.
    _current_fps: Rc<AtomicU32>,
    /// Recent frame-rate samples (worst peer, capped at the target), oldest first.
    fps_history: std::collections::VecDeque<f64>,
    /// Maximum number of samples kept in `fps_history`.
    max_history_size: usize,
    /// Most recent frame-rate error (target minus received).
    last_error: f64,
    /// Set once enough samples have been collected for corrections to start.
    initialization_complete: bool,
    /// Per-peer diagnostics windows.
    diagnostic_packets: DiagnosticPackets,
    /// Timestamp (milliseconds) of the last evaluation that returned a bitrate.
    last_correction_time: f64,
    /// Minimum spacing, in milliseconds, between two returned bitrates.
    correction_throttle_ms: f64,
}
impl EncoderBitrateController {
    /// Creates a controller around `ideal_bitrate_kbps`.
    ///
    /// `current_fps` is the shared target frame rate; it is re-read on every
    /// evaluation, so later changes made by its owner are picked up.
    pub fn new(ideal_bitrate_kbps: u32, current_fps: Rc<AtomicU32>) -> Self {
        let controller_config = pidgeon::ControllerConfig::default()
            .with_kp(0.2)
            .with_ki(0.05)
            .with_kd(0.02)
            .with_setpoint(0.0)
            .with_deadband(0.5)
            .with_output_limits(0.0, 50.0)
            .with_anti_windup(true);
        let pid = pidgeon::PidController::new(controller_config);
        let diagnostic_packets = DiagnosticPackets::new(WINDOW_DURATION_SEC, INACTIVE_TIMEOUT_SEC);
        Self {
            pid,
            last_update: Date::now(),
            _ideal_bitrate_kbps: ideal_bitrate_kbps,
            _current_fps: current_fps,
            fps_history: std::collections::VecDeque::with_capacity(10),
            max_history_size: 10,
            last_error: 0.0,
            initialization_complete: false,
            diagnostic_packets,
            last_correction_time: 0.0,
            correction_throttle_ms: 1000.0,
        }
    }

    /// Mean absolute difference between consecutive samples in `fps_history`.
    ///
    /// Returns `0.0` when fewer than two samples are available.
    fn calculate_jitter(&self) -> f64 {
        let samples = self.fps_history.len();
        if samples < 2 {
            return 0.0;
        }
        // Summed directly from the iterator; no temporary buffer is needed.
        let total: f64 = self
            .fps_history
            .iter()
            .zip(self.fps_history.iter().skip(1))
            .map(|(&previous, &next)| (next - previous).abs())
            .sum();
        total / (samples - 1) as f64
    }

    /// Records `packet` and, unless throttled, computes a new bitrate as of
    /// `now` (milliseconds).
    ///
    /// Returns `None` when less than `correction_throttle_ms` has passed since
    /// the last returned bitrate, or when no peer has reported a frame rate.
    /// Otherwise returns a bitrate in kbps:
    ///
    /// * the ideal bitrate while the target frame rate is zero or fewer than
    ///   three samples have been collected;
    /// * afterwards, the ideal bitrate reduced by the scaled PID output and by
    ///   a jitter penalty, clamped to 10%–150% of the ideal bitrate.
    pub fn process_diagnostics_packet_with_time(
        &mut self,
        packet: DiagnosticsPacket,
        now: f64,
    ) -> Option<f64> {
        // The packet is always recorded, even when the correction is throttled.
        self.diagnostic_packets.process_packet(packet, now);

        let time_since_last_correction = now - self.last_correction_time;
        if time_since_last_correction < self.correction_throttle_ms {
            log::debug!(
                "Throttling bitrate correction: {:.0}ms since last correction (throttle: {:.0}ms)",
                time_since_last_correction,
                self.correction_throttle_ms
            );
            return None;
        }

        let (_, worst_fps) = self.diagnostic_packets.get_worst_fps_peer()?;

        let base_bitrate = f64::from(self._ideal_bitrate_kbps);
        let target_fps = f64::from(self._current_fps.load(Ordering::Relaxed));
        if target_fps <= 0.0 {
            // No meaningful target to compare against (and it would be a zero
            // divisor below), so hand back the baseline.
            self.last_correction_time = now;
            return Some(base_bitrate);
        }

        // A peer cannot usefully exceed the target; cap so the error is never negative.
        let fps_received = worst_fps.min(target_fps);
        self.fps_history.push_back(fps_received);
        while self.fps_history.len() > self.max_history_size {
            self.fps_history.pop_front();
        }
        let jitter = self.calculate_jitter();

        // NOTE(review): `dt` is in milliseconds, like `now`; confirm that this
        // is the unit pidgeon's `compute` expects.
        let dt = now - self.last_update;
        self.last_update = now;

        let current_error = target_fps - fps_received;
        self.last_error = current_error;
        if !self.initialization_complete {
            if self.fps_history.len() < 3 {
                // Still warming up: report the baseline until three samples exist.
                self.last_correction_time = now;
                return Some(base_bitrate);
            }
            self.initialization_complete = true;
        }

        let fps_error_output = self.pid.compute(current_error, dt);

        // Jitter relative to the target, scaled so that 20% jitter saturates the factor.
        let normalized_jitter = jitter / target_fps;
        let jitter_factor = (normalized_jitter * 5.0).min(1.0);

        let fps_adjustment = fps_error_output * 3_000.0;
        let after_pid = base_bitrate - fps_adjustment;
        // At most 90% of the PID-adjusted bitrate is removed for jitter.
        let jitter_reduction = after_pid * (jitter_factor * 0.9);
        let corrected_bitrate = after_pid - jitter_reduction;

        let min_bitrate = base_bitrate * 0.1;
        let max_bitrate = base_bitrate * 1.5;
        log::debug!(
            "FPS: target={:.1} received={:.1} error={:.1} | PID output={:.2} | Jitter={:.2} factor={:.2} | Bitrate: base={:.0} kbps pid_adj={:.0} jitter_adj={:.0} final={:.0} kbps | Peers: {}",
            target_fps, fps_received, current_error,
            fps_error_output, jitter, jitter_factor,
            base_bitrate, fps_adjustment, jitter_reduction, corrected_bitrate,
            self.diagnostic_packets.peer_count()
        );

        // `contains` is false for NaN, so this single test covers both cases.
        let final_bitrate = if (min_bitrate..=max_bitrate).contains(&corrected_bitrate) {
            corrected_bitrate
        } else {
            log::warn!("Bitrate out of bounds or NaN: {corrected_bitrate:.0} kbps (min: {min_bitrate:.0} kbps, max: {max_bitrate:.0} kbps)");
            if corrected_bitrate.is_nan() {
                // Nothing sensible to clamp; fall back to the baseline.
                base_bitrate
            } else {
                // Clamp to the nearest bound. Falling back to the baseline
                // here would turn an undershoot (poor frame rate) into a jump
                // back up to full bitrate.
                corrected_bitrate.clamp(min_bitrate, max_bitrate)
            }
        };

        self.last_correction_time = now;
        Some(final_bitrate)
    }

    /// Same as [`Self::process_diagnostics_packet_with_time`], using the
    /// current wall-clock time.
    pub fn process_diagnostics_packet(&mut self, packet: DiagnosticsPacket) -> Option<f64> {
        self.process_diagnostics_packet_with_time(packet, Date::now())
    }

    /// Number of peers currently tracked.
    pub fn peer_count(&self) -> usize {
        self.diagnostic_packets.peer_count()
    }

    /// Ids of the peers currently tracked, in no particular order.
    pub fn peer_ids(&self) -> Vec<String> {
        self.diagnostic_packets.get_peer_ids()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::rc::Rc;
use std::sync::atomic::AtomicU32;
use videocall_types::protos::diagnostics_packet::{
AudioMetrics, DiagnosticsPacket, VideoMetrics,
};
use wasm_bindgen_test::*;
/// Builds a VIDEO diagnostics packet from `sender_id` to `target_id` whose
/// video metrics report `fps` and `bitrate_kbps`.
fn create_test_packet(
    sender_id: &str,
    target_id: &str,
    fps: f32,
    bitrate_kbps: u32,
) -> DiagnosticsPacket {
    let mut metrics = VideoMetrics::new();
    metrics.fps_received = fps;
    metrics.bitrate_kbps = bitrate_kbps;

    let mut packet = DiagnosticsPacket::new();
    packet.sender_id = sender_id.to_owned();
    packet.target_id = target_id.to_owned();
    packet.timestamp_ms = js_sys::Date::now() as u64;
    packet.media_type =
        videocall_types::protos::media_packet::media_packet::MediaType::VIDEO.into();
    packet.video_metrics = ::protobuf::MessageField::some(metrics);
    packet
}
/// Builds an AUDIO diagnostics packet from `sender_id` to `target_id` whose
/// audio metrics report `fps` and `bitrate_kbps`.
fn create_test_audio_packet(
    sender_id: &str,
    target_id: &str,
    fps: f32,
    bitrate_kbps: u32,
) -> DiagnosticsPacket {
    let mut metrics = AudioMetrics::new();
    metrics.fps_received = fps;
    metrics.bitrate_kbps = bitrate_kbps;

    let mut packet = DiagnosticsPacket::new();
    packet.sender_id = sender_id.to_owned();
    packet.target_id = target_id.to_owned();
    packet.timestamp_ms = js_sys::Date::now() as u64;
    packet.media_type =
        videocall_types::protos::media_packet::media_packet::MediaType::AUDIO.into();
    packet.audio_metrics = ::protobuf::MessageField::some(metrics);
    packet
}
/// One peer that always receives the full target frame rate: the bitrate must
/// stay at the ideal value and the jitter at zero.
#[wasm_bindgen_test]
fn test_happy_path() {
    let ideal_bitrate_kbps = 500;
    let fps_target = Rc::new(AtomicU32::new(30));
    let mut controller = EncoderBitrateController::new(ideal_bitrate_kbps, fps_target.clone());
    let base_time = 1000.0;

    // Packets are 1100 ms apart so none of them falls inside the throttle period.
    for step in 0..10u32 {
        let now = base_time + f64::from(step) * 1100.0;
        let packet = create_test_packet("peer1", "self", 30.0, 500);
        let result = controller.process_diagnostics_packet_with_time(packet, now);
        assert!(result.is_some(), "Packet should return a bitrate");
        let bitrate = result.unwrap();
        assert!(
            (bitrate - f64::from(ideal_bitrate_kbps)).abs() < 10.0,
            "Expected bitrate close to base ({ideal_bitrate_kbps} kbps), got {bitrate} kbps"
        );
    }

    assert_eq!(controller.fps_history.len(), 10);

    let jitter = controller.calculate_jitter();
    assert!(
        jitter < 0.1,
        "Expected near-zero jitter in happy path, got {jitter}"
    );

    // Peers are keyed by the packet's target id, not its sender id.
    assert_eq!(controller.peer_count(), 1);
    assert_eq!(controller.peer_ids(), vec!["self"]);
}
/// Three peers with good, average and poor frame rates: the poorest one must
/// pull the resulting bitrate well below the ideal value.
#[wasm_bindgen_test]
fn test_multiple_peers() {
    let ideal_bitrate_kbps = 500;
    let fps_target = Rc::new(AtomicU32::new(30));
    let mut controller = EncoderBitrateController::new(ideal_bitrate_kbps, fps_target.clone());
    let base_time = 1000.0;

    // (sender, target, fps, offset from base_time in ms, failure message)
    let peers = [
        (
            "good_sender",
            "peer1",
            30.0,
            0.0,
            "First packet should return a bitrate",
        ),
        (
            "avg_sender",
            "peer2",
            20.0,
            1100.0,
            "Second packet should return a bitrate",
        ),
        (
            "poor_sender",
            "peer3",
            5.0,
            2200.0,
            "Third packet should return a bitrate",
        ),
    ];
    for (sender, target, fps, offset_ms, failure) in peers {
        let outcome = controller.process_diagnostics_packet_with_time(
            create_test_packet(sender, target, fps, 500),
            base_time + offset_ms,
        );
        assert!(outcome.is_some(), "{failure}");
    }

    assert_eq!(controller.peer_count(), 3);
    for expected in ["peer1", "peer2", "peer3"] {
        assert!(controller.peer_ids().contains(&expected.to_string()));
    }

    let probe = create_test_packet("test_sender", "test_target", 30.0, 500);
    let result4 = controller.process_diagnostics_packet_with_time(probe, base_time + 3300.0);
    assert!(result4.is_some(), "Fourth packet should return a bitrate");
    let bitrate = result4.unwrap();
    assert!(
        bitrate < f64::from(ideal_bitrate_kbps) * 0.7,
        "Expected reduced bitrate due to poor peer, got {bitrate} bps (ideal: {ideal_bitrate_kbps} bps)"
    );
}
#[wasm_bindgen_test]
fn test_peer_cleanup() {
let target_fps = Rc::new(AtomicU32::new(30));
let ideal_bitrate_kbps = 500;
let mut controller = EncoderBitrateController::new(ideal_bitrate_kbps, target_fps.clone());
let base_time = 1000.0;
controller.process_diagnostics_packet_with_time(
create_test_packet("sender1", "peer1", 30.0, 500),
base_time,
);
controller.process_diagnostics_packet_with_time(
create_test_packet("sender2", "peer2", 28.0, 500),
base_time + 1100.0,
);
controller.process_diagnostics_packet_with_time(
create_test_packet("sender3", "peer3", 25.0, 500),
base_time + 2200.0,
);
assert_eq!(controller.peer_count(), 3);
controller.process_diagnostics_packet_with_time(
create_test_packet("sender1", "peer1", 29.0, 500),
base_time + 20_000.0,
);
controller.process_diagnostics_packet_with_time(
create_test_packet("sender2", "peer2", 27.0, 500),
base_time + 21_100.0, );
assert_eq!(controller.peer_count(), 3);
controller.process_diagnostics_packet_with_time(
create_test_packet("sender1", "peer1", 29.0, 500),
base_time + 35_000.0,
);
assert_eq!(controller.peer_count(), 2);
assert!(controller.peer_ids().contains(&"peer1".to_string()));
assert!(controller.peer_ids().contains(&"peer2".to_string()));
assert!(!controller.peer_ids().contains(&"peer3".to_string()));
controller.process_diagnostics_packet_with_time(
create_test_packet("sender1", "peer1", 29.0, 500),
base_time + 55_000.0,
);
assert_eq!(controller.peer_count(), 1);
assert!(controller.peer_ids().contains(&"peer1".to_string()));
}
/// Exercises a single window directly: insertion, minimum frame rate and
/// explicit cleanup passes.
#[wasm_bindgen_test]
fn test_diagnostic_packet_window() {
    let base_time = 1000.0;
    let mut window = DiagnosticPacketWindow::new(10);

    // (offset from base_time in ms, reported fps)
    for (offset_ms, fps) in [(0.0, 30.0), (1000.0, 25.0), (2000.0, 20.0)] {
        window.add_packet(
            base_time + offset_ms,
            create_test_packet("sender1", "peer1", fps, 500),
        );
    }
    assert_eq!(window.len(), 3);
    assert_eq!(window.latest_timestamp(), Some(base_time + 2000.0));
    assert_eq!(window.min_fps(), Some(20.0));

    // 10.5 s after the first packet only that first packet is outside the 10 s window.
    window.cleanup(base_time + 10500.0);
    assert_eq!(window.len(), 2);

    // 15 s in, every packet has expired.
    window.cleanup(base_time + 15000.0);
    assert_eq!(window.len(), 0);
    assert_eq!(window.min_fps(), None);
}
/// Video and audio packets from different peers are all accepted and tracked.
#[wasm_bindgen_test]
fn test_different_media_types() {
    let ideal_bitrate_kbps = 500;
    let fps_target = Rc::new(AtomicU32::new(30));
    let mut controller = EncoderBitrateController::new(ideal_bitrate_kbps, fps_target.clone());
    let base_time = 1000.0;

    let video_packet = create_test_packet("sender1", "peer1", 30.0, 500);
    let result1 = controller.process_diagnostics_packet_with_time(video_packet, base_time);
    assert!(result1.is_some(), "First packet should return a bitrate");

    let audio_packet = create_test_audio_packet("sender2", "peer2", 40.0, 100);
    let result2 =
        controller.process_diagnostics_packet_with_time(audio_packet, base_time + 1100.0);
    assert!(result2.is_some(), "Second packet should return a bitrate");
    assert_eq!(controller.peer_count(), 2);

    let second_video_packet = create_test_packet("sender3", "peer3", 25.0, 400);
    let result3 =
        controller.process_diagnostics_packet_with_time(second_video_packet, base_time + 2200.0);
    assert!(result3.is_some(), "Third packet should return a bitrate");
}
/// A sustained drop in received frame rate must lower the bitrate while
/// keeping it inside the 10%–150% band around the ideal value.
#[wasm_bindgen_test]
fn test_bandwidth_drop() {
    let ideal_bitrate_kbps = 500;
    let fps_target = Rc::new(AtomicU32::new(30));
    let mut controller = EncoderBitrateController::new(ideal_bitrate_kbps, fps_target.clone());
    let base_time = 1000.0;

    // Baseline: one healthy peer.
    let good_result = controller.process_diagnostics_packet_with_time(
        create_test_packet("good_sender", "peer1", 30.0, 500),
        base_time,
    );
    assert!(
        good_result.is_some(),
        "First packet should return a bitrate"
    );
    let good_bitrate = good_result.unwrap();

    // Five consecutive reports from a struggling peer, 1100 ms apart.
    for round in 1..=5u32 {
        let time = base_time + 1100.0 * f64::from(round);
        let result = controller.process_diagnostics_packet_with_time(
            create_test_packet("poor_sender", "peer2", 5.0, 500),
            time,
        );
        assert!(result.is_some(), "Packet should return a bitrate");
    }

    let final_result = controller.process_diagnostics_packet_with_time(
        create_test_packet("test_sender", "test_peer", 15.0, 500),
        base_time + 6600.0,
    );
    assert!(
        final_result.is_some(),
        "Final packet should return a bitrate"
    );
    let poor_bitrate = final_result.unwrap();

    assert!(
        poor_bitrate < good_bitrate,
        "Expected bitrate to decrease when FPS drops. Good: {good_bitrate}, Poor: {poor_bitrate}"
    );

    let min_bitrate = f64::from(ideal_bitrate_kbps) * 0.1;
    let max_bitrate = f64::from(ideal_bitrate_kbps) * 1.5;
    assert!(
        poor_bitrate >= min_bitrate,
        "Poor bitrate {poor_bitrate} bps should be greater than or equal to minimum bitrate {min_bitrate} bps"
    );
    assert!(
        poor_bitrate <= max_bitrate,
        "Poor bitrate {poor_bitrate} bps should be less than or equal to maximum bitrate {max_bitrate} bps"
    );
}
/// Checks the jitter computation on hand-built frame-rate histories.
#[wasm_bindgen_test]
fn test_calculate_jitter() {
    let fps_target = Rc::new(AtomicU32::new(30));
    let mut controller = EncoderBitrateController::new(500, fps_target);

    assert_eq!(
        controller.calculate_jitter(),
        0.0,
        "Empty history should return 0 jitter"
    );

    controller.fps_history.push_back(30.0);
    assert_eq!(
        controller.calculate_jitter(),
        0.0,
        "Single value should return 0 jitter"
    );

    controller.fps_history.extend([30.0, 30.0]);
    assert_eq!(
        controller.calculate_jitter(),
        0.0,
        "Constant values should have 0 jitter"
    );

    // Alternating 10/20: every step differs by 10.
    controller.fps_history.clear();
    controller.fps_history.extend([10.0, 20.0, 10.0, 20.0]);
    let expected_jitter = 10.0;
    let actual_jitter = controller.calculate_jitter();
    assert!(
        (actual_jitter - expected_jitter).abs() < 0.001,
        "Expected jitter {expected_jitter}, got {actual_jitter}"
    );

    // Steady ramp: every step differs by 2.
    controller.fps_history.clear();
    controller.fps_history.extend([10.0, 12.0, 14.0, 16.0]);
    let expected_jitter = 2.0;
    let actual_jitter = controller.calculate_jitter();
    assert!(
        (actual_jitter - expected_jitter).abs() < 0.001,
        "Expected jitter {expected_jitter}, got {actual_jitter}"
    );
}
/// A packet arriving inside the throttle period is recorded but yields no
/// bitrate; the next one after the period does.
#[wasm_bindgen_test]
fn test_throttling_basic() {
    let ideal_bitrate_kbps = 500;
    let fps_target = Rc::new(AtomicU32::new(30));
    let mut controller = EncoderBitrateController::new(ideal_bitrate_kbps, fps_target.clone());
    let base_time = 1000.0;

    let result1 = controller.process_diagnostics_packet_with_time(
        create_test_packet("sender1", "peer1", 25.0, 500),
        base_time,
    );
    assert!(result1.is_some(), "First packet should return a bitrate");

    // Only 500 ms later: inside the throttle period.
    let result2 = controller.process_diagnostics_packet_with_time(
        create_test_packet("sender2", "peer2", 20.0, 500),
        base_time + 500.0,
    );
    assert!(
        result2.is_none(),
        "Packet within throttle period should not return a bitrate"
    );
    // The throttled packet was still stored.
    assert_eq!(controller.peer_count(), 2);

    // 1100 ms after the first correction: past the throttle period.
    let result3 = controller.process_diagnostics_packet_with_time(
        create_test_packet("sender3", "peer3", 15.0, 500),
        base_time + 1100.0,
    );
    assert!(
        result3.is_some(),
        "Packet after throttle period should return a bitrate"
    );
    assert_eq!(controller.peer_count(), 3);
}
}