use std::num::NonZeroU32;
use mediadecode::{
Timebase, Timestamp,
channel::AudioChannelLayout,
frame::{AudioFrame, Plane},
future::local::AudioStreamDecoder,
packet::{AudioPacket, PacketFlags},
};
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen_futures::{JsFuture, spawn_local};
use crate::{
adapter::WebCodecs,
buffer::WebCodecsBuffer,
codec_id::AudioCodecId,
codec_string,
dispatch::{
allocate_value_handler, allocate_void_handler, free_value_handler, free_void_handler,
make_value_trampoline, make_void_trampoline,
},
error::{AudioDecodeError, Error},
extras::{AudioFrameExtra, AudioPacketExtra},
sample_format::SampleFormat,
state::{DecodedFrame, PendingOutput, SharedState},
video::{MAX_INPUT_INFLIGHT_BYTES, MAX_INPUT_PACKET_BYTES},
};
/// Maximum number of encoded chunks allowed in flight at once
/// (decoder's internal queue + spawned copy tasks) before `send_packet`
/// parks waiting for room.
pub const MAX_PENDING_DECODE: u32 = 64;
/// Hard cap on the total copied-out byte size of a single decoded frame.
pub const MAX_FRAME_ALLOCATION_BYTES: u32 = 1024 * 1024;
/// Budget for decoded output bytes in flight (queued + mid-copy).
pub const MAX_INFLIGHT_BYTES: u64 = 64 * 1024 * 1024;
/// Maximum number of decoded frames held in the output queue.
pub const MAX_QUEUED_OUTPUT: u32 = 64;
/// A decoded audio frame fully copied out of JS into Rust-owned buffers.
pub(crate) struct DecodedAudioFrame {
    /// Presentation timestamp carried over from the submitting packet.
    pts: Option<Timestamp>,
    /// Frame duration in the microsecond timebase, from `AudioData.duration`.
    duration: Option<Timestamp>,
    /// Sample rate reported by the `AudioData`.
    sample_rate: u32,
    /// Samples per channel (`AudioData.numberOfFrames`).
    nb_samples: u32,
    /// Channel count; validated to fit `u8` at copy time.
    channel_count: u8,
    /// Sample format mapped from the WebCodecs format name.
    format: SampleFormat,
    /// (buffer, stride) pairs; entries at index >= `plane_count` are the
    /// empty placeholders set before copying.
    planes: [(WebCodecsBuffer, u32); 8],
    /// Populated plane count: channel count for planar formats, 1 otherwise.
    plane_count: u8,
    /// Keyframe flag carried over from the submitting packet.
    key: bool,
    /// Total bytes across all populated planes (queue accounting).
    byte_size: u32,
}
impl DecodedAudioFrame {
    /// Bundles the copied-out planes and metadata for one decoded frame.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pts: Option<Timestamp>,
        duration: Option<Timestamp>,
        sample_rate: u32,
        nb_samples: u32,
        channel_count: u8,
        format: SampleFormat,
        planes: [(WebCodecsBuffer, u32); 8],
        plane_count: u8,
        key: bool,
        byte_size: u32,
    ) -> Self {
        Self {
            pts,
            duration,
            sample_rate,
            nb_samples,
            channel_count,
            format,
            planes,
            plane_count,
            key,
            byte_size,
        }
    }
    /// Total bytes across all populated planes.
    pub const fn byte_size(&self) -> u32 {
        self.byte_size
    }
    /// Presentation timestamp from the submitting packet, if any.
    pub const fn pts(&self) -> Option<Timestamp> {
        self.pts
    }
    /// Frame duration, if known.
    pub const fn duration(&self) -> Option<Timestamp> {
        self.duration
    }
    /// Sample rate in Hz.
    pub const fn sample_rate(&self) -> u32 {
        self.sample_rate
    }
    /// Samples per channel.
    pub const fn nb_samples(&self) -> u32 {
        self.nb_samples
    }
    /// Number of audio channels.
    pub const fn channel_count(&self) -> u8 {
        self.channel_count
    }
    /// Sample format of the copied data.
    pub const fn format(&self) -> SampleFormat {
        self.format
    }
    /// Consumes the frame, yielding its (buffer, stride) plane array.
    pub fn into_planes(self) -> [(WebCodecsBuffer, u32); 8] {
        self.planes
    }
    /// Number of populated entries in the plane array.
    pub const fn plane_count(&self) -> u8 {
        self.plane_count
    }
    /// Keyframe flag from the submitting packet.
    pub const fn key(&self) -> bool {
        self.key
    }
}
/// Microsecond timebase (1 / 1_000_000), built at compile time.
/// The `match` is the const-compatible way to unwrap `NonZeroU32::new`;
/// the `None` arm can never be taken for a non-zero literal.
const MICROS: Timebase = match NonZeroU32::new(1_000_000) {
    Some(d) => Timebase::new(1, d),
    None => unreachable!(),
};
/// Audio stream decoder backed by the browser's WebCodecs `AudioDecoder`.
pub struct WebCodecsAudioStreamDecoder {
    /// The live JS decoder object.
    decoder: web_sys::AudioDecoder,
    /// Saved configuration, reused by `flush()` to rebuild the decoder.
    config: web_sys::AudioDecoderConfig,
    /// State shared with the output/error/dequeue callbacks and copy tasks.
    state: SharedState<DecodedAudioFrame>,
    /// Timebase supplied at open; exposed via `time_base()`.
    time_base: Timebase,
    /// True for PCM/ulaw/alaw: these outputs are matched FIFO (with a
    /// sample-count check) instead of by submission id.
    rebases_timestamps: bool,
    /// Channel count supplied at open; used for PCM sample-count math.
    channel_count: u8,
    /// `Some` for PCM-family codecs; drives `expected_samples` derivation.
    pcm_bytes_per_sample: Option<u32>,
    /// Trampoline slot holding the boxed output handler (freed on drop/flush).
    output_slot_id: u64,
    /// Trampoline slot holding the boxed error handler.
    error_slot_id: u64,
    /// Trampoline slot holding the boxed dequeue handler.
    dequeue_slot_id: u64,
    /// Set by `send_eof`; cleared by `flush`.
    eof: bool,
}
impl WebCodecsAudioStreamDecoder {
    /// Opens a decoder for a known codec id.
    ///
    /// Derives the WebCodecs codec string from `codec` (validating
    /// `audio_specific_config` where required) and defers to
    /// [`Self::open_with_codec_string`].
    pub fn open(
        codec: AudioCodecId,
        audio_specific_config: Option<&[u8]>,
        sample_rate: u32,
        channel_count: u8,
        time_base: Timebase,
    ) -> Result<Self, AudioDecodeError> {
        let codec_string = codec_string::for_audio(codec, audio_specific_config)?;
        Self::open_with_codec_string(
            &codec_string,
            audio_specific_config,
            sample_rate,
            channel_count,
            time_base,
        )
    }
    /// Opens a decoder directly from a WebCodecs codec string.
    ///
    /// Builds the shared state (sized for the full output queue plus every
    /// possible in-flight decode), fills the `AudioDecoderConfig` (copying
    /// `description` into a fresh `Uint8Array`), and installs the decoder
    /// with its output/error/dequeue callbacks.
    ///
    /// # Errors
    /// Fails if the description exceeds the size cap, if a
    /// timestamp-rebasing codec has no bytes-per-sample mapping (its
    /// outputs could not be matched safely, so it is rejected up front),
    /// or if any JS-side allocation/configure step fails.
    pub fn open_with_codec_string(
        codec_string: &str,
        description: Option<&[u8]>,
        sample_rate: u32,
        channel_count: u8,
        time_base: Timebase,
    ) -> Result<Self, AudioDecodeError> {
        let state = SharedState::<DecodedAudioFrame>::try_new(
            MAX_QUEUED_OUTPUT as usize + MAX_PENDING_DECODE as usize,
        )
        .map_err(AudioDecodeError::Js)?;
        let config = web_sys::AudioDecoderConfig::new(codec_string, channel_count as u32, sample_rate);
        if let Some(bytes) = description {
            if bytes.len() > crate::video::MAX_CODEC_DESCRIPTION_BYTES {
                return Err(AudioDecodeError::Js(Error::from_static(
                    "codec description exceeds MAX_CODEC_DESCRIPTION_BYTES",
                )));
            }
            // Copy the description into a JS-owned array for the config.
            let arr = crate::video::try_new_uint8_array(bytes.len() as u32)
                .map_err(|e| AudioDecodeError::Js(Error::from_js(e)))?;
            arr.copy_from(bytes);
            config.set_description_u8_array(&arr);
        }
        let rebases_timestamps = codec_rebases_timestamps(codec_string);
        let pcm_bytes_per_sample = pcm_bytes_per_sample_for(codec_string);
        if rebases_timestamps && pcm_bytes_per_sample.is_none() {
            return Err(AudioDecodeError::Js(Error::from_js(JsValue::from_str(
                &format!(
                    "audio codec {codec_string:?} rebases timestamps but the adapter \
                     has no bytes-per-sample mapping for it; cannot safely match \
                     decoded outputs (rejected at open)"
                ),
            ))));
        }
        let installed = install_audio_decoder(&state, &config, rebases_timestamps)?;
        Ok(Self {
            decoder: installed.decoder,
            config,
            state,
            time_base,
            rebases_timestamps,
            channel_count,
            pcm_bytes_per_sample,
            output_slot_id: installed.output_slot_id,
            error_slot_id: installed.error_slot_id,
            dequeue_slot_id: installed.dequeue_slot_id,
            eof: false,
        })
    }
    /// The timebase supplied when the decoder was opened.
    pub const fn time_base(&self) -> Timebase {
        self.time_base
    }
    /// Fails fast with the recorded terminal error if the stream closed.
    fn check_closed(&self) -> Result<(), AudioDecodeError> {
        if let Some(err) = self.state.borrow().last_error_clone() {
            return Err(AudioDecodeError::Closed(err));
        }
        Ok(())
    }
    /// Waits until the decoder can accept one more encoded chunk.
    ///
    /// Admission checks, re-evaluated on every pass:
    /// - closed state -> `Closed`;
    /// - output queue at `MAX_QUEUED_OUTPUT` -> `OutputFull` (the caller
    ///   must drain frames first);
    /// - in-flight bytes (queued + mid-copy + last measured frame) over
    ///   `MAX_INFLIGHT_BYTES` -> `OutputFull`;
    /// - otherwise returns once the pending-decode count is below
    ///   `MAX_PENDING_DECODE`.
    async fn await_decode_room(&self) -> Result<(), AudioDecodeError> {
        loop {
            {
                let inner = self.state.borrow();
                if let Some(err) = inner.last_error_clone() {
                    return Err(AudioDecodeError::Closed(err));
                }
                if inner.queue_len() >= MAX_QUEUED_OUTPUT as usize {
                    return Err(AudioDecodeError::OutputFull);
                }
                let in_flight_bytes = inner
                    .queue_bytes()
                    .saturating_add(inner.pending_copy_bytes())
                    .saturating_add(inner.last_measured_frame_bytes());
                if in_flight_bytes > MAX_INFLIGHT_BYTES {
                    return Err(AudioDecodeError::OutputFull);
                }
                if pending_decode(&self.decoder, &inner) < MAX_PENDING_DECODE as usize {
                    return Ok(());
                }
            }
            // Park until the dequeue callback fires. The condition is
            // sampled both before and after registering the waker so a
            // wakeup landing in between cannot be lost (double-check).
            let _guard = self.state.dequeue_waker_guard();
            let state = self.state.clone();
            let decoder = self.decoder.clone();
            core::future::poll_fn(move |cx| {
                let (closed, busy, queue_full) = {
                    let inner = state.borrow();
                    (
                        inner.is_closed(),
                        pending_decode(&decoder, &inner) >= MAX_PENDING_DECODE as usize,
                        inner.queue_len() >= MAX_QUEUED_OUTPUT as usize,
                    )
                };
                if closed || !busy || queue_full {
                    return core::task::Poll::Ready(());
                }
                state.borrow_mut().set_dequeue_waker(cx.waker().clone());
                // Re-sample after installing the waker: if anything changed
                // in the window, unregister and complete immediately.
                let (closed_2, busy_2, queue_full_2) = {
                    let inner = state.borrow();
                    (
                        inner.is_closed(),
                        pending_decode(&decoder, &inner) >= MAX_PENDING_DECODE as usize,
                        inner.queue_len() >= MAX_QUEUED_OUTPUT as usize,
                    )
                };
                if closed_2 || !busy_2 || queue_full_2 {
                    state.borrow_mut().clear_dequeue_waker();
                    core::task::Poll::Ready(())
                } else {
                    core::task::Poll::Pending
                }
            })
            .await;
        }
    }
}
/// Total decode work in flight: chunks still inside the JS decoder's queue
/// plus `AudioData` copies that have been admitted but not yet finished.
fn pending_decode(
    decoder: &web_sys::AudioDecoder,
    inner: &crate::state::Inner<DecodedAudioFrame>,
) -> usize {
    let queued = decoder.decode_queue_size() as usize;
    let copying = inner.pending_copies() as usize;
    queued.saturating_add(copying)
}
impl Drop for WebCodecsAudioStreamDecoder {
    fn drop(&mut self) {
        // Order matters: bumping the epoch first marks every in-flight
        // callback/copy as stale before the JS decoder is torn down.
        self.state.bump_epoch();
        self.decoder.set_ondequeue(None);
        // close() may fail (e.g. already closed); deliberately ignored.
        let _ = self.decoder.close();
        self.state.clear_close_hook();
        // Release the trampoline slots so the boxed handlers are freed.
        free_value_handler(self.output_slot_id);
        free_value_handler(self.error_slot_id);
        free_void_handler(self.dequeue_slot_id);
    }
}
impl AudioStreamDecoder for WebCodecsAudioStreamDecoder {
    type Adapter = WebCodecs;
    type Buffer = WebCodecsBuffer;
    type Error = AudioDecodeError;
    /// Submits one encoded packet to the decoder.
    ///
    /// Waits for backpressure room, enforces the input byte caps, stores a
    /// pending-output record keyed by a synthetic submission id (smuggled
    /// through the chunk's timestamp field), then calls `decode()`.
    ///
    /// # Errors
    /// `AtEof` after `send_eof`; `Closed` on terminal conditions (which
    /// also record the error and run the close hook); `Js` for recoverable
    /// per-packet failures (malformed PCM size, synchronous decode errors).
    async fn send_packet(
        &mut self,
        packet: &AudioPacket<AudioPacketExtra, Self::Buffer>,
    ) -> Result<(), Self::Error> {
        self.check_closed()?;
        if self.eof {
            return Err(AudioDecodeError::AtEof);
        }
        // Backpressure: wait for decode-queue room and byte budgets.
        self.await_decode_room().await?;
        let key = packet.flags().contains(PacketFlags::KEY);
        // NOTE(review): every chunk is submitted as Key even when the
        // packet lacks the KEY flag; the real flag travels in the pending
        // record instead. Presumably safe for all-keyframe audio codecs —
        // confirm for any codec with true delta frames.
        let chunk_type = web_sys::EncodedAudioChunkType::Key;
        let bytes = packet.data().as_ref();
        // Oversized packet is terminal: record close, run the hook, wake
        // everyone, report Closed.
        if bytes.len() > MAX_INPUT_PACKET_BYTES {
            let err = Error::from_js(JsValue::from_str(
                "encoded audio packet exceeds MAX_INPUT_PACKET_BYTES",
            ));
            self.state.borrow_mut().record_close(err.clone());
            self.state.invoke_close_hook();
            self.state.wake_all();
            return Err(AudioDecodeError::Closed(err));
        }
        {
            let inner = self.state.borrow();
            if inner
                .pending_input_bytes()
                .saturating_add(bytes.len() as u64)
                > MAX_INPUT_INFLIGHT_BYTES
            {
                // Drop the shared borrow before taking borrow_mut below.
                drop(inner);
                let err = Error::from_js(JsValue::from_str(
                    "encoded audio pending_input_bytes would exceed MAX_INPUT_INFLIGHT_BYTES; \
                     flush() to recover",
                ));
                self.state.borrow_mut().record_close(err.clone());
                self.state.invoke_close_hook();
                self.state.wake_all();
                return Err(AudioDecodeError::Closed(err));
            }
        }
        // For PCM-family codecs the decoded sample count is derivable from
        // the packet size; it is stored on the pending record so the output
        // handler can validate FIFO matches for timestamp-rebasing codecs.
        let expected_samples = match self.pcm_bytes_per_sample {
            Some(bps) if self.channel_count > 0 => {
                let denom = bps as usize * self.channel_count as usize;
                if bytes.len() % denom != 0 {
                    return Err(AudioDecodeError::Js(Error::from_static(
                        "malformed PCM packet: byte length is not a multiple of \
                         bytes_per_sample × channel_count",
                    )));
                }
                u32::try_from(bytes.len() / denom).map_err(|_| {
                    AudioDecodeError::Js(Error::from_static("PCM packet sample count exceeds u32"))
                })?
            }
            _ => 0,
        };
        let buf = match crate::video::try_new_uint8_array(bytes.len() as u32) {
            Ok(b) => b,
            Err(err) => {
                // JS-side allocation failure is treated as terminal.
                let err = Error::from_js(err);
                self.state.borrow_mut().record_close(err.clone());
                self.state.invoke_close_hook();
                self.state.wake_all();
                return Err(AudioDecodeError::Closed(err));
            }
        };
        buf.copy_from(bytes);
        // The chunk "timestamp" carries the submission id, not a real
        // time; the output callback reads it back to find the record.
        let submission_id = self.state.borrow_mut().next_submission_id();
        let init = web_sys::EncodedAudioChunkInit::new(&buf.into(), 0, chunk_type);
        init.set_timestamp_f64(submission_id as f64);
        if let Some(d) = packet.duration() {
            let duration_us = d.rescale_to(MICROS).pts();
            init.set_duration_f64(duration_us as f64);
        }
        let chunk = web_sys::EncodedAudioChunk::new(&init).map_err(Error::from_js)?;
        // Register the pending record before calling decode() so the
        // output callback can never observe a missing entry.
        let insert_res = {
            let mut inner = self.state.borrow_mut();
            let epoch = inner.epoch();
            inner.insert_pending_output(
                submission_id,
                PendingOutput::new(
                    epoch,
                    packet.pts(),
                    key,
                    bytes.len() as u32,
                    expected_samples,
                ),
            )
        };
        if let Err(err) = insert_res {
            self.state.invoke_close_hook();
            self.state.wake_all();
            return Err(AudioDecodeError::Closed(err));
        }
        if let Err(e) = self.decoder.decode(&chunk) {
            // decode() rejected synchronously: roll back the record.
            self.state.borrow_mut().remove_pending_output(submission_id);
            return Err(AudioDecodeError::Js(Error::from_js(e)));
        }
        Ok(())
    }
    /// Moves the next decoded frame into `dst`.
    ///
    /// Parks while decode work could still produce a frame; otherwise
    /// reports `NoFrameReady` (or `Eof` after `send_eof`). Plane slots
    /// beyond `plane_count` hold the empty placeholders set at copy time.
    async fn receive_frame(
        &mut self,
        dst: &mut AudioFrame<SampleFormat, AudioChannelLayout, AudioFrameExtra, Self::Buffer>,
    ) -> Result<(), Self::Error> {
        let frame = wait_for_frame(&self.state, &self.decoder, self.eof).await?;
        let sample_rate = frame.sample_rate();
        let nb_samples = frame.nb_samples();
        let channel_count = frame.channel_count();
        let format = frame.format();
        let plane_count = frame.plane_count();
        let key = frame.key();
        let pts = frame.pts();
        let duration = frame.duration();
        // Destructure all 8 (buffer, stride) slots and rewrap as Planes.
        let [p0, p1, p2, p3, p4, p5, p6, p7] = frame.into_planes();
        let planes: [Plane<WebCodecsBuffer>; 8] = [
            Plane::new(p0.0, p0.1),
            Plane::new(p1.0, p1.1),
            Plane::new(p2.0, p2.1),
            Plane::new(p3.0, p3.1),
            Plane::new(p4.0, p4.1),
            Plane::new(p5.0, p5.1),
            Plane::new(p6.0, p6.1),
            Plane::new(p7.0, p7.1),
        ];
        *dst = AudioFrame::new(
            sample_rate,
            nb_samples,
            channel_count,
            format,
            AudioChannelLayout::new(channel_count as u32),
            planes,
            plane_count,
            AudioFrameExtra::new(key),
        )
        .with_pts(pts)
        .with_duration(duration);
        Ok(())
    }
    /// Flushes the decoder and marks the stream at EOF.
    ///
    /// If this future is dropped mid-await, the guard records a terminal
    /// close (a half-completed WebCodecs flush cannot be resumed);
    /// `flush()` must then be used to rebuild the decoder.
    async fn send_eof(&mut self) -> Result<(), Self::Error> {
        self.check_closed()?;
        // Drop guard: converts cancellation of the await below into a
        // recorded close instead of a silently inconsistent decoder.
        struct EofCancelGuard<F> {
            state: SharedState<F>,
            completed: bool,
        }
        impl<F> Drop for EofCancelGuard<F> {
            fn drop(&mut self) {
                if !self.completed {
                    self.state.bump_epoch();
                    self
                        .state
                        .borrow_mut()
                        .record_close(Error::from_js(wasm_bindgen::JsValue::from_str(
                            "send_eof was cancelled — call flush() to recover",
                        )));
                    self.state.invoke_close_hook();
                }
                self.state.wake_all();
            }
        }
        let mut guard = EofCancelGuard {
            state: self.state.clone(),
            completed: false,
        };
        let promise = self.decoder.flush();
        JsFuture::from(promise).await.map_err(Error::from_js)?;
        {
            // After a successful flush every output has been delivered;
            // leftover records would never be matched, so drop them.
            let mut inner = self.state.borrow_mut();
            inner.clear_pending_outputs();
        }
        self.eof = true;
        guard.completed = true;
        Ok(())
    }
    /// Resets the decoder: tears the JS object down and rebuilds it from
    /// the saved config; clears EOF and any recorded error on success.
    async fn flush(&mut self) -> Result<(), Self::Error> {
        // Invalidate every in-flight callback/copy of the old decoder.
        self.state.bump_epoch();
        self.eof = false;
        let _ = self.decoder.close();
        free_value_handler(self.output_slot_id);
        free_value_handler(self.error_slot_id);
        free_void_handler(self.dequeue_slot_id);
        // Drop guard: if the rebuild fails or is cancelled, record a
        // terminal close so callers never hold a dangling decoder.
        struct FlushGuard<'a> {
            state: &'a SharedState<DecodedAudioFrame>,
            completed: bool,
        }
        impl Drop for FlushGuard<'_> {
            fn drop(&mut self) {
                if !self.completed {
                    let just_closed =
                        self
                            .state
                            .borrow_mut()
                            .record_close(Error::from_js(JsValue::from_str(
                                "audio flush did not complete (cancelled or rebuild failed); \
                                 decoder is closed",
                            )));
                    // Only the first closer runs the hook.
                    if just_closed {
                        self.state.invoke_close_hook();
                    }
                    self.state.wake_all();
                }
            }
        }
        let mut guard = FlushGuard {
            state: &self.state,
            completed: false,
        };
        let installed = install_audio_decoder(&self.state, &self.config, self.rebases_timestamps)?;
        self.decoder = installed.decoder;
        self.output_slot_id = installed.output_slot_id;
        self.error_slot_id = installed.error_slot_id;
        self.dequeue_slot_id = installed.dequeue_slot_id;
        guard.completed = true;
        self.state.borrow_mut().clear_last_error();
        self.state.wake_dequeue();
        Ok(())
    }
}
/// Pops the next decoded frame, parking while decode work is in flight.
///
/// Returns `NoFrameReady` (or `Eof` when `eof` is set) only when the queue
/// is empty AND nothing (pending copies or decoder-queue entries) could
/// still produce a frame.
async fn wait_for_frame(
    state: &SharedState<DecodedAudioFrame>,
    decoder: &web_sys::AudioDecoder,
    eof: bool,
) -> Result<DecodedAudioFrame, AudioDecodeError> {
    loop {
        let popped = {
            let mut inner = state.borrow_mut();
            if let Some(err) = inner.last_error_clone() {
                return Err(AudioDecodeError::Closed(err));
            }
            // pop_queue takes the head's byte size so queue-bytes
            // accounting can be decremented in the same call.
            let head_bytes = inner
                .peek_queue_head()
                .map(|f| f.byte_size() as u64)
                .unwrap_or(0);
            let frame = inner.pop_queue(head_bytes);
            if frame.is_none() {
                let active_decode_work = inner.pending_copies() > 0 || decoder.decode_queue_size() > 0;
                if !active_decode_work {
                    if eof {
                        return Err(AudioDecodeError::Eof);
                    }
                    return Err(AudioDecodeError::NoFrameReady);
                }
            }
            frame
        };
        if let Some(frame) = popped.map(DecodedFrame::into_frame) {
            // Queue room just opened; unblock senders waiting for space.
            state.wake_dequeue();
            return Ok(frame);
        }
        // Park until a copy completes or the decoder queue drains. The
        // check and the waker installation happen under one borrow, so a
        // wakeup cannot slip between them.
        let _guard = state.receiver_waker_guard();
        core::future::poll_fn(|cx| {
            let mut inner = state.borrow_mut();
            let active_decode_work = inner.pending_copies() > 0 || decoder.decode_queue_size() > 0;
            if !inner.queue_is_empty() || inner.is_closed() || !active_decode_work {
                core::task::Poll::Ready(())
            } else {
                inner.set_receiver_waker(cx.waker().clone());
                core::task::Poll::Pending
            }
        })
        .await;
    }
}
/// Ticket proving an `AudioData` was admitted against the pending-copy
/// byte budget; rolls the budget back (and closes the data) if dropped
/// before `disarm` hands the data to the copy task.
struct AudioCopyAdmission {
    /// Shared state holding the pending-copy accounting.
    state: SharedState<DecodedAudioFrame>,
    /// The admitted `AudioData`; `None` once `disarm` has taken it.
    data: Option<web_sys::AudioData>,
    /// Bytes charged to the pending-copy budget at admission time.
    byte_estimate: u64,
    /// Cleared by `disarm`; while set, Drop performs the rollback.
    armed: bool,
}
impl Drop for AudioCopyAdmission {
    fn drop(&mut self) {
        // Disarmed: ownership passed to the copy task; nothing to undo.
        if !self.armed {
            return;
        }
        // Rollback path (copy task never ran): close the JS object and
        // refund the byte budget, then wake anyone waiting on it.
        if let Some(d) = self.data.take() {
            d.close();
        }
        self.state.borrow_mut().sub_pending_copy(self.byte_estimate);
        self.state.wake_all();
    }
}
impl AudioCopyAdmission {
    /// Hands the admitted `AudioData` to the caller and disables the
    /// Drop-time rollback; from here the caller owns both the data and
    /// the responsibility to refund the pending-copy bytes.
    ///
    /// # Panics
    /// Panics if the data was already taken.
    fn disarm(&mut self) -> web_sys::AudioData {
        self.armed = false;
        let data = self.data.take();
        data.expect("AudioCopyAdmission::disarm called twice")
    }
}
/// Background task: copies one admitted `AudioData` out of JS and pushes
/// the result into the output queue.
///
/// `byte_estimate` was added to the pending-copy budget at admission; it
/// is subtracted exactly once on every path here (stale epoch, error,
/// success) — or by `AudioCopyAdmission`'s Drop if this task never runs.
async fn handle_audio_data(
    mut admission: AudioCopyAdmission,
    state: SharedState<DecodedAudioFrame>,
    epoch: u64,
    record: PendingOutput,
    byte_estimate: u64,
) {
    // Take ownership of the AudioData; JsAudioGuard then guarantees
    // close() even on early return.
    let data = admission.disarm();
    drop(admission);
    let data_guard = JsAudioGuard(Some(data));
    if state.epoch() != epoch {
        // flush()/drop happened since admission: discard silently.
        state.borrow_mut().sub_pending_copy(byte_estimate);
        state.wake_all();
        return;
    }
    let result = copy_audio_data(data_guard.data(), record.user_pts(), record.key()).await;
    // Close the JS AudioData as soon as the copy is done.
    drop(data_guard);
    // Decided under the borrow; acted on outside it (close hook and wakes
    // must not run while the state is borrowed).
    enum Outcome {
        Pushed,
        Errored {
            just_closed: bool,
        },
        Stale,
    }
    let outcome = {
        let mut inner = state.borrow_mut();
        inner.sub_pending_copy(byte_estimate);
        if inner.epoch() != epoch {
            Outcome::Stale
        } else if inner.is_closed() {
            Outcome::Stale
        } else {
            match result {
                Ok(decoded) => {
                    let bytes = decoded.byte_size() as u64;
                    let projected = inner.queue_bytes().saturating_add(bytes);
                    if projected > MAX_INFLIGHT_BYTES {
                        let just_closed = inner.record_close(Error::from_js(JsValue::from_str(
                            "audio output queue exceeded MAX_INFLIGHT_BYTES at copy completion",
                        )));
                        Outcome::Errored { just_closed }
                    } else if let Err(_returned) = inner.push_queue(DecodedFrame::new(decoded), bytes) {
                        let just_closed = inner.record_close(Error::from_static(
                            "audio decoded-frame queue reached capacity; \
                             admission gate failed to bound the output queue",
                        ));
                        Outcome::Errored { just_closed }
                    } else {
                        Outcome::Pushed
                    }
                }
                Err(err) => {
                    let just_closed = inner.record_close(err);
                    Outcome::Errored { just_closed }
                }
            }
        }
    };
    match outcome {
        Outcome::Pushed => state.wake_all(),
        Outcome::Errored { just_closed } => {
            // Only the first closer runs the hook.
            if just_closed {
                state.invoke_close_hook();
            }
            state.wake_all();
        }
        Outcome::Stale => state.wake_all(),
    }
}
/// RAII wrapper guaranteeing `AudioData::close()` runs even if the copy
/// path returns early.
struct JsAudioGuard(Option<web_sys::AudioData>);
impl JsAudioGuard {
    /// Borrows the wrapped data.
    ///
    /// # Panics
    /// Panics if the data was already taken (in this file only Drop
    /// takes it, so this cannot fire on a live guard).
    fn data(&self) -> &web_sys::AudioData {
        self.0.as_ref().expect("audio data already taken")
    }
}
impl Drop for JsAudioGuard {
    fn drop(&mut self) {
        // Release the browser-held buffer promptly.
        if let Some(data) = self.0.take() {
            data.close();
        }
    }
}
/// Copies an `AudioData` into Rust-owned plane buffers.
///
/// Validates the sample format, the channel count (non-zero, fits `u8`,
/// at most 8 for planar layouts), and that every plane's `allocationSize`
/// matches the size computed from samples × bytes-per-sample (× channels
/// for interleaved). Total size is capped by `MAX_FRAME_ALLOCATION_BYTES`
/// and each plane's Vec is allocated fallibly via `try_reserve_exact`.
///
/// `user_pts`/`key` come from the matched submission record; the duration
/// is taken from the `AudioData` itself (microseconds).
async fn copy_audio_data(
    data: &web_sys::AudioData,
    user_pts: Option<Timestamp>,
    key: bool,
) -> Result<DecodedAudioFrame, Error> {
    let format_str = data
        .format()
        .ok_or_else(|| Error::from_js(JsValue::from_str("AudioData.format is null")))?;
    let format = SampleFormat::from_spec_name(audio_sample_format_name(format_str))
        .ok_or_else(|| Error::from_js(JsValue::from_str("unknown AudioSampleFormat")))?;
    let sample_rate = data.sample_rate() as u32;
    let nb_samples = data.number_of_frames();
    let channel_count_u32 = data.number_of_channels();
    let channel_count: u8 = u8::try_from(channel_count_u32).map_err(|_| {
        Error::from_js(JsValue::from_str(&format!(
            "AudioData.numberOfChannels = {channel_count_u32} exceeds the 255 the AudioFrame channel_count field encodes"
        )))
    })?;
    if channel_count == 0 {
        return Err(Error::from_js(JsValue::from_str(
            "AudioData reports zero channels",
        )));
    }
    // The pts recorded at submission wins over AudioData.timestamp, which
    // carries this adapter's synthetic submission id.
    let pts = user_pts;
    let duration = Some(Timestamp::new(data.duration() as i64, MICROS));
    let plane_count = if format.is_planar() {
        if channel_count as usize > 8 {
            return Err(Error::from_js(JsValue::from_str(&format!(
                "{channel_count}-channel planar audio exceeds the 8-plane cap of mediadecode::AudioFrame"
            ))));
        }
        channel_count as usize
    } else {
        // Interleaved formats pack every channel into one plane.
        1
    };
    let bytes_per_sample = audio_sample_format_bytes(format_str);
    if bytes_per_sample == 0 {
        return Err(Error::from_js(JsValue::from_str(
            "AudioData has unknown bytes-per-sample for plane size validation",
        )));
    }
    let per_plane_samples = nb_samples;
    // Expected plane size: a planar plane holds one channel; the single
    // interleaved plane holds all channels.
    let expected_plane_bytes: u32 = if format.is_planar() {
        per_plane_samples.saturating_mul(bytes_per_sample)
    } else {
        per_plane_samples
            .saturating_mul(bytes_per_sample)
            .saturating_mul(channel_count_u32)
    };
    let mut planes: [(WebCodecsBuffer, u32); 8] = [
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
        (WebCodecsBuffer::empty(), 0),
    ];
    let mut total_bytes: u32 = 0;
    for (plane_index, slot) in planes.iter_mut().enumerate().take(plane_count) {
        let opts = web_sys::AudioDataCopyToOptions::new(plane_index as u32);
        let plane_size = data.allocation_size(&opts).map_err(Error::from_js)?;
        // Reject any browser-reported size that disagrees with the math
        // above — a mismatch would corrupt stride/sample accounting.
        if plane_size != expected_plane_bytes {
            return Err(Error::from_js(JsValue::from_str(&format!(
                "AudioData plane {plane_index} allocation_size = {plane_size} \
                 does not match expected {expected_plane_bytes} bytes \
                 (nb_samples = {nb_samples}, bps = {bytes_per_sample}, \
                 channels = {channel_count_u32}, planar = {})",
                format.is_planar(),
            ))));
        }
        total_bytes = total_bytes.saturating_add(plane_size);
        if total_bytes > MAX_FRAME_ALLOCATION_BYTES {
            return Err(Error::from_js(JsValue::from_str(&format!(
                "AudioData total allocation_size > MAX_FRAME_ALLOCATION_BYTES = {MAX_FRAME_ALLOCATION_BYTES}"
            ))));
        }
        let size = plane_size as usize;
        // Fallible allocation: a huge plane must surface as an error,
        // not an OOM abort.
        let mut bytes: Vec<u8> = Vec::new();
        bytes
            .try_reserve_exact(size)
            .map_err(|_| Error::from_static("Rust allocation for AudioData plane copy failed"))?;
        bytes.resize(size, 0);
        data
            .copy_to_with_u8_slice(&mut bytes, &opts)
            .map_err(Error::from_js)?;
        // Stride equals the full plane size (tightly packed copy).
        let stride = size as u32;
        *slot = (WebCodecsBuffer::from_bytes(bytes), stride);
    }
    Ok(DecodedAudioFrame::new(
        pts,
        duration,
        sample_rate,
        nb_samples,
        channel_count,
        format,
        planes,
        plane_count as u8,
        key,
        total_bytes,
    ))
}
/// True for codec families whose WebCodecs implementations do not echo the
/// submitted chunk timestamp back on outputs (PCM variants, µ-law, A-law);
/// their outputs must be matched FIFO instead of by submission id.
fn codec_rebases_timestamps(codec_string: &str) -> bool {
    matches!(codec_string, "ulaw" | "alaw") || codec_string.starts_with("pcm-")
}
/// Bytes per encoded sample for PCM-family codec strings.
///
/// Used by `send_packet` to derive a packet's exact decoded sample count
/// (`bytes / (bps * channels)`), stored on the pending record so the
/// output handler can validate FIFO matches for timestamp-rebasing
/// codecs. Returns `None` for codecs without a mapping;
/// `open_with_codec_string` rejects rebasing codecs that land in the
/// `None` case.
fn pcm_bytes_per_sample_for(codec_string: &str) -> Option<u32> {
    match codec_string {
        "pcm-u8" | "ulaw" | "alaw" => Some(1),
        "pcm-s16" => Some(2),
        // "pcm-s24" is in the WebCodecs codec registry at 3 bytes per
        // sample; it was previously unmapped, which made open() reject a
        // valid codec string. The FIFO sample-count check works the same
        // regardless of the browser's decoded output format.
        "pcm-s24" => Some(3),
        "pcm-s32" | "pcm-f32" => Some(4),
        _ => None,
    }
}
/// A freshly constructed and configured decoder plus the trampoline slot
/// ids its three callbacks occupy (needed later to free them).
struct InstalledAudioDecoder {
    decoder: web_sys::AudioDecoder,
    /// Slot of the boxed `output` (AudioData) handler.
    output_slot_id: u64,
    /// Slot of the boxed `error` handler.
    error_slot_id: u64,
    /// Slot of the boxed `dequeue` handler.
    dequeue_slot_id: u64,
}
/// Creates, wires up, and configures a new `web_sys::AudioDecoder`.
///
/// The three callbacks are boxed into trampoline slots. Every failure
/// path manually frees the slots allocated so far (and closes the decoder
/// once it exists) so partial construction leaks nothing. On success the
/// decoder is registered as the shared state's close hook.
fn install_audio_decoder(
    state: &SharedState<DecodedAudioFrame>,
    config: &web_sys::AudioDecoderConfig,
    rebases_timestamps: bool,
) -> Result<InstalledAudioDecoder, AudioDecodeError> {
    let output_handler = make_audio_output_handler(state.clone(), rebases_timestamps);
    let error_handler = make_audio_error_handler(state.clone());
    let dequeue_handler = make_audio_dequeue_handler(state.clone());
    let output_slot_id = allocate_value_handler(output_handler).map_err(AudioDecodeError::Js)?;
    let error_slot_id = match allocate_value_handler(error_handler) {
        Ok(id) => id,
        Err(err) => {
            // Roll back the one slot allocated so far.
            free_value_handler(output_slot_id);
            return Err(AudioDecodeError::Js(err));
        }
    };
    let dequeue_slot_id = match allocate_void_handler(dequeue_handler) {
        Ok(id) => id,
        Err(err) => {
            free_value_handler(output_slot_id);
            free_value_handler(error_slot_id);
            return Err(AudioDecodeError::Js(err));
        }
    };
    // Trampolines are the JS-callable functions that dispatch into the
    // boxed handlers by slot id.
    let output_trampoline = make_value_trampoline(output_slot_id);
    let error_trampoline = make_value_trampoline(error_slot_id);
    let dequeue_trampoline = make_void_trampoline(dequeue_slot_id);
    let init = web_sys::AudioDecoderInit::new(&error_trampoline, &output_trampoline);
    let decoder = match web_sys::AudioDecoder::new(&init) {
        Ok(d) => d,
        Err(err) => {
            free_value_handler(output_slot_id);
            free_value_handler(error_slot_id);
            free_void_handler(dequeue_slot_id);
            return Err(AudioDecodeError::Js(Error::from_js(err)));
        }
    };
    decoder.set_ondequeue(Some(&dequeue_trampoline));
    if let Err(err) = decoder.configure(config) {
        // configure() failed: close the decoder and free all three slots.
        let _ = decoder.close();
        free_value_handler(output_slot_id);
        free_value_handler(error_slot_id);
        free_void_handler(dequeue_slot_id);
        return Err(AudioDecodeError::Js(Error::from_js(err)));
    }
    // Let the shared state close this decoder whenever a terminal error
    // is recorded from any path.
    state.set_close_hook_audio(decoder.clone());
    Ok(InstalledAudioDecoder {
        decoder,
        output_slot_id,
        error_slot_id,
        dequeue_slot_id,
    })
}
/// Builds the WebCodecs `output` callback.
///
/// Runs synchronously per decoded `AudioData`: matches it to its pending
/// submission record (FIFO for rebasing codecs, side-map lookup
/// otherwise), admits it against the copy budgets, then spawns
/// `handle_audio_data` to do the actual copy. On any admission failure
/// the data is closed immediately, usually along with a recorded close.
fn make_audio_output_handler(
    state: SharedState<DecodedAudioFrame>,
    rebases_timestamps: bool,
) -> Box<dyn FnMut(JsValue)> {
    Box::new(move |value: JsValue| {
        let Ok(data) = value.dyn_into::<web_sys::AudioData>() else {
            return;
        };
        // send_packet stored the submission id in the chunk timestamp;
        // timestamp-preserving codecs echo it back here.
        let submission_id = data.timestamp() as i64;
        // The labeled block yields (admitted record or None, just_closed).
        let (resolved, just_closed) = 'budget: {
            let mut inner = state.borrow_mut();
            if inner.is_closed() {
                break 'budget (None, false);
            }
            let current_epoch = inner.epoch();
            let floor = inner.epoch_id_floor();
            let mut missing_close: bool = false;
            let record_opt = if rebases_timestamps {
                // Rebasing codecs do not preserve chunk timestamps, so
                // match FIFO — but only when the decoded sample count
                // equals what the oldest pending chunk encoded; otherwise
                // the pairing cannot be trusted.
                let popped = inner.pop_oldest_pending_output();
                match popped {
                    Some((_, record)) if record.epoch() == current_epoch => {
                        let expected = record.expected_samples();
                        if expected == 0 {
                            let just_closed = inner.record_close(Error::from_js(JsValue::from_str(
                                "WebCodecs audio output: rebasing-codec record has no \
                                 expected_samples — cannot validate against multi-output \
                                 split; refusing FIFO match",
                            )));
                            break 'budget (None, just_closed);
                        }
                        let actual = data.number_of_frames();
                        if actual != expected {
                            let just_closed = inner.record_close(Error::from_js(JsValue::from_str(&format!(
                                "WebCodecs audio output: AudioData reported {actual} samples \
                                 but the matching pending chunk encoded {expected} samples \
                                 (suggests a multi-output codec split — refusing to attach \
                                 stale audio to a wrong record)"
                            ))));
                            break 'budget (None, just_closed);
                        }
                        Some(record)
                    }
                    // Record from an older epoch: drop it silently.
                    Some(_) => None,
                    None => None,
                }
            } else {
                // Timestamp-preserving codecs: exact side-map lookup.
                match inner.remove_pending_output(submission_id) {
                    Some(record) if record.epoch() == current_epoch => Some(record),
                    Some(_) => None,
                    // Below the floor = output from a pre-flush submission.
                    None if submission_id < floor => None,
                    None => {
                        missing_close = inner.record_close(Error::from_js(JsValue::from_str(
                            "WebCodecs audio output: current-generation submission_id \
                             has no side-map entry (multi-output codec spec deviation)",
                        )));
                        None
                    }
                }
            };
            let Some(record) = record_opt else {
                break 'budget (None, missing_close);
            };
            // Measure copy-out size up front so budget checks happen
            // before any allocation.
            let measurement = measure_audio_data_bytes(&data);
            let total_bytes = match measurement {
                Some(t) if t <= MAX_FRAME_ALLOCATION_BYTES => t,
                Some(t) => {
                    let just_closed = inner.record_close(Error::from_js(JsValue::from_str(&format!(
                        "AudioData total allocation_size {t} > \
                         MAX_FRAME_ALLOCATION_BYTES = {MAX_FRAME_ALLOCATION_BYTES}; \
                         refusing admission"
                    ))));
                    break 'budget (None, just_closed);
                }
                None => {
                    let just_closed = inner.record_close(Error::from_js(JsValue::from_str(
                        "AudioData allocation_size measurement failed \
                         (format absent, unknown, zero channels, or planar channel \
                         count exceeds 8); refusing admission",
                    )));
                    break 'budget (None, just_closed);
                }
            };
            // NOTE(review): the estimate is double the measured size —
            // presumably headroom for the JS-held copy plus the Rust
            // destination buffers coexisting during the copy; confirm.
            let new_frame_bytes: u64 = (total_bytes as u64).saturating_mul(2);
            let projected_bytes = inner
                .queue_bytes()
                .saturating_add(inner.pending_copy_bytes())
                .saturating_add(new_frame_bytes);
            if inner.pending_copies() >= MAX_PENDING_DECODE {
                let just_closed = inner.record_close(Error::from_js(JsValue::from_str(
                    "WebCodecs output burst exceeded MAX_PENDING_DECODE; \
                     audio frames would be lost",
                )));
                break 'budget (None, just_closed);
            }
            if projected_bytes > MAX_INFLIGHT_BYTES {
                let just_closed = inner.record_close(Error::from_js(JsValue::from_str(
                    "WebCodecs output burst would exceed MAX_INFLIGHT_BYTES; \
                     audio frames would be lost",
                )));
                break 'budget (None, just_closed);
            }
            // Admission granted: charge the budget before dropping the borrow.
            inner.add_pending_copy(new_frame_bytes);
            (Some((record, current_epoch, new_frame_bytes)), false)
        };
        let Some((record, captured_epoch, new_frame_bytes)) = resolved else {
            // Not admitted: the AudioData must still be closed here.
            data.close();
            if just_closed {
                state.invoke_close_hook();
            }
            state.wake_all();
            return;
        };
        // The admission ticket refunds the budget (and closes the data)
        // if the spawned task is dropped before running.
        let admission = AudioCopyAdmission {
            state: state.clone(),
            data: Some(data),
            byte_estimate: new_frame_bytes,
            armed: true,
        };
        spawn_local(handle_audio_data(
            admission,
            state.clone(),
            captured_epoch,
            record,
            new_frame_bytes,
        ));
    })
}
fn make_audio_error_handler(state: SharedState<DecodedAudioFrame>) -> Box<dyn FnMut(JsValue)> {
Box::new(move |value: JsValue| {
state.borrow_mut().record_close(Error::from_js(value));
state.wake_all();
})
}
/// Builds the WebCodecs `dequeue` callback: its only job is to wake every
/// parked task whenever the decoder's internal queue drains.
fn make_audio_dequeue_handler(state: SharedState<DecodedAudioFrame>) -> Box<dyn FnMut()> {
    Box::new(move || state.wake_all())
}
/// Planar formats get one plane per channel; this cap mirrors the 8-plane
/// limit of `mediadecode::AudioFrame`.
const MAX_PLANAR_CHANNELS_FOR_MEASUREMENT: u32 = 8;
/// Measures the total copy-out size of an `AudioData`, in bytes.
///
/// Yields `None` when the format is absent or unknown, the channel count
/// is zero, a planar layout would need more than 8 planes, any
/// `allocationSize` call fails, or the byte total overflows `u32`.
fn measure_audio_data_bytes(data: &web_sys::AudioData) -> Option<u32> {
    let format = SampleFormat::from_spec_name(audio_sample_format_name(data.format()?))?;
    let channels = data.number_of_channels();
    if channels == 0 {
        return None;
    }
    let plane_count: u32 = if !format.is_planar() {
        // Interleaved: everything lives in a single plane.
        1
    } else if channels <= MAX_PLANAR_CHANNELS_FOR_MEASUREMENT {
        channels
    } else {
        return None;
    };
    // Sum the per-plane allocation sizes with overflow checking; any
    // failed measurement or overflow aborts the whole sum.
    (0..plane_count).try_fold(0u32, |total, plane| {
        let opts = web_sys::AudioDataCopyToOptions::new(plane);
        let plane_size = data.allocation_size(&opts).ok()?;
        total.checked_add(plane_size)
    })
}
/// Bytes per sample for a WebCodecs `AudioSampleFormat`.
///
/// Returns 0 for unknown/unhandled variants so callers can reject them
/// explicitly instead of dividing by a bad size.
fn audio_sample_format_bytes(fmt: web_sys::AudioSampleFormat) -> u32 {
    use web_sys::AudioSampleFormat as W;
    match fmt {
        W::U8 => 1,
        W::U8Planar => 1,
        W::S16 => 2,
        W::S16Planar => 2,
        W::S32 => 4,
        W::S32Planar => 4,
        W::F32 => 4,
        W::F32Planar => 4,
        _ => 0,
    }
}
/// Maps a WebCodecs `AudioSampleFormat` to its spec name, as consumed by
/// `SampleFormat::from_spec_name`. The empty string marks variants this
/// adapter does not recognize (the lookup then fails upstream).
fn audio_sample_format_name(fmt: web_sys::AudioSampleFormat) -> &'static str {
    use web_sys::AudioSampleFormat as W;
    // Arms paired as interleaved / planar per base type.
    match fmt {
        W::U8 => "u8",
        W::U8Planar => "u8-planar",
        W::S16 => "s16",
        W::S16Planar => "s16-planar",
        W::S32 => "s32",
        W::S32Planar => "s32-planar",
        W::F32 => "f32",
        W::F32Planar => "f32-planar",
        _ => "",
    }
}