#![no_main]
use std::cell::RefCell;
use videocall_codecs::decoder::{Decodable, DecodedFrame, VideoCodec};
use videocall_codecs::frame::{FrameBuffer, FrameCodec, VideoFrame};
use videocall_codecs::jitter_buffer::JitterBuffer;
use videocall_codecs::messages::{VideoStatsMessage, WorkerMessage};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::{
console, DedicatedWorkerGlobalScope, EncodedVideoChunk, EncodedVideoChunkInit,
EncodedVideoChunkType, VideoDecoder, VideoDecoderConfig, VideoDecoderInit,
VideoFrame as WebVideoFrame,
};
/// WebCodecs-backed decoder used inside the worker.
///
/// The underlying `VideoDecoder` is created lazily by `initialize_decoder`
/// and torn down by `destroy_decoder`; both go through the `RefCell`s because
/// the `Decodable::decode` entry point only receives `&self`.
struct WebDecoder {
// The live WebCodecs decoder, or `None` before first use / after teardown.
decoder: RefCell<Option<VideoDecoder>>,
// Codec recorded the last time a decoder was successfully configured.
current_codec: RefCell<Option<FrameCodec>>,
// Worker global scope; used to post decoded frames and schedule timeouts.
self_scope: DedicatedWorkerGlobalScope,
}
// SAFETY (NOTE(review) — not proven by this file): `WebDecoder` holds
// `RefCell`s and JS handles, neither of which is thread-safe on its own.
// These impls are only sound as long as a `WebDecoder` is created, used and
// dropped on a single thread. Presumably they exist to satisfy bounds on the
// decoder handed to `JitterBuffer::new` — confirm against that API.
unsafe impl Send for WebDecoder {}
unsafe impl Sync for WebDecoder {}
impl WebDecoder {
fn new(self_scope: DedicatedWorkerGlobalScope) -> Self {
Self {
decoder: RefCell::new(None),
current_codec: RefCell::new(None),
self_scope,
}
}
fn initialize_decoder(&self, codec: FrameCodec) -> Result<(), String> {
let codec_str = match codec.as_webcodecs_str() {
Some(s) => s,
None => return Err("Unknown codec - cannot decode".to_string()),
};
let mut decoder_ref = self.decoder.borrow_mut();
let mut codec_ref = self.current_codec.borrow_mut();
if decoder_ref.is_some() && *codec_ref == Some(codec) {
return Ok(());
}
if decoder_ref.is_some() && *codec_ref != Some(codec) {
console::log_1(
&format!(
"[WORKER] Codec changed from {codec_ref:?} to {codec:?}, reconfiguring decoder"
)
.into(),
);
if let Some(decoder) = decoder_ref.take() {
let _ = decoder.close();
}
}
let self_scope = self.self_scope.clone();
let on_output = {
let global_scope = self_scope.clone();
Closure::wrap(Box::new(move |video_frame: JsValue| {
let video_frame = video_frame.dyn_into::<WebVideoFrame>().unwrap();
if let Err(e) = global_scope.post_message(&video_frame) {
console::error_1(
&format!("[WORKER] Error posting decoded frame: {e:?}").into(),
);
}
video_frame.close();
}) as Box<dyn FnMut(_)>)
};
let on_error = Closure::wrap(Box::new(move |e: JsValue| {
console::error_1(&"[WORKER] WebCodecs decoder error:".into());
console::error_1(&e);
}) as Box<dyn FnMut(_)>);
let init = VideoDecoderInit::new(
on_error.as_ref().unchecked_ref(),
on_output.as_ref().unchecked_ref(),
);
let decoder =
VideoDecoder::new(&init).map_err(|e| format!("Failed to create decoder: {e:?}"))?;
console::log_1(&format!("[WORKER] Configuring decoder with codec: {codec_str}").into());
let config = VideoDecoderConfig::new(codec_str);
decoder
.configure(&config)
.map_err(|e| format!("Failed to configure decoder: {e:?}"))?;
on_output.forget();
on_error.forget();
*decoder_ref = Some(decoder);
*codec_ref = Some(codec);
console::log_1(
&format!("[WORKER] WebCodecs decoder initialized with codec: {codec:?}").into(),
);
Ok(())
}
fn destroy_decoder(&self) {
let mut decoder_ref = self.decoder.borrow_mut();
if let Some(decoder) = decoder_ref.take() {
if let Err(e) = decoder.close() {
console::error_1(
&format!("[WORKER] Failed to close decoder cleanly: {e:?}").into(),
);
} else {
console::log_1(&"[WORKER] Video decoder closed".into());
}
console::log_1(&"[WORKER] Video decoder destroyed".into());
}
}
fn reset_pipeline(&self) {
self.destroy_decoder();
let self_scope = self.self_scope.clone();
let cb = Closure::once_into_js(move || {
reset_jitter_buffer();
});
let _ = self_scope
.set_timeout_with_callback_and_timeout_and_arguments_0(cb.as_ref().unchecked_ref(), 0);
}
}
impl Decodable for WebDecoder {
type Frame = DecodedFrame;
fn new(_codec: VideoCodec, _on_decoded_frame: Box<dyn Fn(Self::Frame) + Send + Sync>) -> Self {
panic!("Use WebDecoder::new(self_scope) in worker context");
}
fn decode(&self, frame: FrameBuffer) {
let frame_codec = frame.frame.codec;
if let Err(e) = self.initialize_decoder(frame_codec) {
console::error_1(&format!("[WORKER] Failed to initialize decoder: {e:?}").into());
return;
}
let decoder_ref = self.decoder.borrow();
if let Some(decoder) = decoder_ref.as_ref() {
let chunk_type = match frame.frame.frame_type {
videocall_codecs::frame::FrameType::KeyFrame => EncodedVideoChunkType::Key,
videocall_codecs::frame::FrameType::DeltaFrame => EncodedVideoChunkType::Delta,
};
let data = js_sys::Uint8Array::from(frame.frame.data.as_slice());
let init = EncodedVideoChunkInit::new(&data.into(), frame.frame.timestamp, chunk_type);
match EncodedVideoChunk::new(&init) {
Ok(chunk) => {
if let Err(e) = decoder.decode(&chunk) {
console::error_1(&format!("[WORKER] Decoder error: {e:?}").into());
drop(decoder_ref);
self.reset_pipeline();
}
}
Err(e) => {
console::error_1(&format!("[WORKER] Failed to create chunk: {e:?}").into());
}
}
}
}
}
// Per-thread worker state. Everything lives in `RefCell`s so the free
// functions below can mutate it from JS callbacks.
thread_local! {
/// Jitter buffer; created on first inserted frame, cleared by `reset_jitter_buffer`.
static JITTER_BUFFER: RefCell<Option<JitterBuffer<DecodedFrame>>> = const { RefCell::new(None) };
/// Id returned by `setInterval` for the periodic jitter-buffer check.
static INTERVAL_ID: RefCell<Option<i32>> = const { RefCell::new(None) };
/// `from_peer` value of the most recent `WorkerMessage::SetContext`.
static CONTEXT_FROM: RefCell<Option<String>> = const { RefCell::new(None) };
/// `to_peer` value of the most recent `WorkerMessage::SetContext`.
static CONTEXT_TO: RefCell<Option<String>> = const { RefCell::new(None) };
/// `Date.now()` timestamp (ms) of the last diagnostics emission; 0.0 until the first one.
static LAST_DIAGNOSTIC_EMIT_MS: RefCell<f64> = const { RefCell::new(0.0) };
}
/// Period, in milliseconds, of the timer that polls the jitter buffer.
const JITTER_BUFFER_CHECK_INTERVAL_MS: i32 = 10;
/// Minimum gap, in milliseconds, between two diagnostics emissions.
const DIAGNOSTIC_EMIT_INTERVAL_MS: f64 = 1000.0;
/// Worker entry point: installs the panic hook, wires the `onmessage`
/// handler and starts the periodic jitter-buffer check.
///
/// Panics if the global object is not a `DedicatedWorkerGlobalScope`.
#[wasm_bindgen(start)]
pub fn main() {
    console_error_panic_hook::set_once();
    // NOTE(review): only the max level is raised here; no logger is installed
    // in the visible code — confirm one is set up elsewhere.
    log::set_max_level(log::LevelFilter::Debug);
    log::info!("Starting worker decoder with jitter buffer and message handling");

    let worker_scope: DedicatedWorkerGlobalScope = js_sys::global().dyn_into().unwrap();

    let message_handler = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
        let message = match serde_wasm_bindgen::from_value::<WorkerMessage>(event.data()) {
            Ok(message) => message,
            Err(e) => {
                console::error_1(&format!("[WORKER] Failed to deserialize message: {e:?}").into());
                return;
            }
        };
        handle_worker_message(message);
    }) as Box<dyn FnMut(web_sys::MessageEvent)>);

    worker_scope.set_onmessage(Some(message_handler.as_ref().unchecked_ref()));
    // The handler must outlive this function, so it is leaked on purpose.
    message_handler.forget();

    start_jitter_buffer_interval();
}
/// Dispatches one message received from the owner of this worker.
///
/// * `DecodeFrame` — queues the frame in the jitter buffer.
/// * `Flush` — flushes the jitter buffer.
/// * `Reset` — drops the jitter buffer.
/// * `SetContext` — stores the peer ids used for diagnostics.
fn handle_worker_message(message: WorkerMessage) {
    match message {
        WorkerMessage::DecodeFrame(encoded) => insert_frame_to_jitter_buffer(encoded),
        WorkerMessage::Flush => {
            console::log_1(&"[WORKER] Flushing jitter buffer and decoder".into());
            flush_jitter_buffer();
        }
        WorkerMessage::Reset => {
            console::log_1(&"[WORKER] Resetting jitter buffer and decoder state".into());
            reset_jitter_buffer();
        }
        WorkerMessage::SetContext { from_peer, to_peer } => {
            CONTEXT_FROM.with(|slot| {
                slot.borrow_mut().replace(from_peer);
            });
            CONTEXT_TO.with(|slot| {
                slot.borrow_mut().replace(to_peer);
            });
            console::log_1(&"[WORKER] Set diagnostics context (from_peer,to_peer)".into());
        }
    }
}
/// Inserts an encoded frame into the jitter buffer, creating the buffer on
/// first use.
///
/// If the buffer cannot be created the error is logged and the frame is
/// dropped. The insertion time passed to the buffer is `Date.now()` in
/// milliseconds.
fn insert_frame_to_jitter_buffer(mut frame: FrameBuffer) {
    JITTER_BUFFER.with(|jb_cell| {
        let mut jb_opt = jb_cell.borrow_mut();
        if jb_opt.is_none() {
            match initialize_jitter_buffer() {
                Ok(jb) => *jb_opt = Some(jb),
                Err(e) => {
                    console::error_1(
                        &format!("[WORKER] Failed to initialize jitter buffer: {e:?}").into(),
                    );
                    return;
                }
            }
        }
        if let Some(jb) = jb_opt.as_mut() {
            // Read everything that needs the intact frame before its payload
            // is moved out below.
            let sequence_number = frame.sequence_number();
            let video_frame = VideoFrame {
                sequence_number,
                frame_type: frame.frame.frame_type,
                codec: frame.frame.codec,
                // We own `frame`, so move the payload instead of copying the
                // whole encoded buffer for every frame.
                data: std::mem::take(&mut frame.frame.data),
                timestamp: frame.frame.timestamp,
            };
            let current_time_ms = js_sys::Date::now() as u128;
            jb.insert_frame(video_frame, current_time_ms);
        }
    });
}
fn start_jitter_buffer_interval() {
let self_scope = js_sys::global()
.dyn_into::<DedicatedWorkerGlobalScope>()
.unwrap();
let interval_callback = Closure::wrap(Box::new(move || {
check_jitter_buffer_for_ready_frames();
}) as Box<dyn FnMut()>);
let interval_id = self_scope
.set_interval_with_callback_and_timeout_and_arguments_0(
interval_callback.as_ref().unchecked_ref(),
JITTER_BUFFER_CHECK_INTERVAL_MS,
)
.expect("Failed to set interval");
interval_callback.forget();
INTERVAL_ID.with(|id_cell| {
*id_cell.borrow_mut() = Some(interval_id);
});
console::log_1(
&format!("[WORKER] Started jitter buffer check interval with {JITTER_BUFFER_CHECK_INTERVAL_MS}ms interval")
.into(),
);
}
/// Timer callback: lets the jitter buffer release frames that are ready and,
/// when the `wasm` feature is enabled, periodically reports how many frames
/// are buffered.
///
/// Diagnostics are emitted at most once per `DIAGNOSTIC_EMIT_INTERVAL_MS`
/// and only after both peer ids have been provided via `SetContext`.
fn check_jitter_buffer_for_ready_frames() {
    JITTER_BUFFER.with(|jb_cell| {
        let mut jb_opt = jb_cell.borrow_mut();
        if let Some(jb) = jb_opt.as_mut() {
            let current_time_ms = js_sys::Date::now() as u128;
            jb.find_and_move_continuous_frames(current_time_ms);
            let buffered = jb.buffered_frames_len() as u64;
            #[cfg(feature = "wasm")]
            {
                use videocall_diagnostics::{global_sender, metric, now_ms, DiagEvent};
                let now = js_sys::Date::now();
                // Check the throttle first so the peer ids are only cloned on
                // ticks that can actually emit, not on every timer tick.
                let due = LAST_DIAGNOSTIC_EMIT_MS
                    .with(|last_emit| now - *last_emit.borrow() >= DIAGNOSTIC_EMIT_INTERVAL_MS);
                if due {
                    let context = CONTEXT_FROM
                        .with(|from| from.borrow().clone())
                        .zip(CONTEXT_TO.with(|to| to.borrow().clone()));
                    if let Some((from_peer, to_peer)) = context {
                        LAST_DIAGNOSTIC_EMIT_MS.with(|last_emit| *last_emit.borrow_mut() = now);
                        let evt = DiagEvent {
                            subsystem: "video",
                            stream_id: None,
                            ts_ms: now_ms(),
                            metrics: vec![
                                metric!("from_peer", from_peer.clone()),
                                metric!("to_peer", to_peer.clone()),
                                metric!("frames_buffered", buffered),
                            ],
                        };
                        // Best effort: a full or closed channel is not an error here.
                        let _ = global_sender().try_broadcast(evt);
                        if let Ok(scope) =
                            js_sys::global().dyn_into::<DedicatedWorkerGlobalScope>()
                        {
                            let msg = VideoStatsMessage::new(from_peer, to_peer, buffered);
                            if let Ok(val) = serde_wasm_bindgen::to_value(&msg) {
                                let _ = scope.post_message(&val);
                            }
                        }
                    }
                }
            }
        }
    });
}
/// Builds a jitter buffer backed by a fresh `WebDecoder`.
///
/// # Errors
/// Returns a message if the global object is not a
/// `DedicatedWorkerGlobalScope`, instead of panicking, so the caller's
/// existing error branch handles it.
fn initialize_jitter_buffer() -> Result<JitterBuffer<DecodedFrame>, String> {
    let self_scope = js_sys::global()
        .dyn_into::<DedicatedWorkerGlobalScope>()
        .map_err(|_| "Global scope is not a DedicatedWorkerGlobalScope".to_string())?;
    let boxed_decoder = Box::new(WebDecoder::new(self_scope));
    console::log_1(&"[WORKER] Initializing jitter buffer with WebCodecs decoder".into());
    Ok(JitterBuffer::new(boxed_decoder))
}
/// Flushes the jitter buffer if one exists and logs which case applied.
fn flush_jitter_buffer() {
    JITTER_BUFFER.with(|cell| {
        let mut slot = cell.borrow_mut();
        let outcome = match slot.as_mut() {
            Some(buffer) => {
                buffer.flush();
                "[WORKER] Jitter buffer flushed"
            }
            None => "[WORKER] No jitter buffer to flush",
        };
        console::log_1(&outcome.into());
    });
}
/// Drops the current jitter buffer (if any); the next inserted frame will
/// create a new one.
fn reset_jitter_buffer() {
    JITTER_BUFFER.with(|cell| {
        let mut slot = cell.borrow_mut();
        // The old buffer is dropped while the cell is still borrowed.
        drop(slot.take());
    });
    console::log_1(&"[WORKER] Jitter buffer reset to initial state".into());
}