use std::{cell::RefCell, collections::VecDeque, rc::Rc, task::Waker};
use mediadecode::Timestamp;
use wasm_bindgen::JsValue;
use crate::error::Error;
pub(crate) struct DecodedFrame<F> {
frame: F,
}
impl<F> DecodedFrame<F> {
pub const fn new(frame: F) -> Self {
Self { frame }
}
pub const fn frame(&self) -> &F {
&self.frame
}
pub fn into_frame(self) -> F {
self.frame
}
}
#[derive(Clone, Copy)]
pub(crate) struct PendingOutput {
epoch: u64,
user_pts: Option<Timestamp>,
key: bool,
input_bytes: u32,
expected_samples: u32,
}
impl PendingOutput {
pub const fn new(
epoch: u64,
user_pts: Option<Timestamp>,
key: bool,
input_bytes: u32,
expected_samples: u32,
) -> Self {
Self {
epoch,
user_pts,
key,
input_bytes,
expected_samples,
}
}
pub const fn expected_samples(&self) -> u32 {
self.expected_samples
}
pub const fn epoch(&self) -> u64 {
self.epoch
}
pub const fn user_pts(&self) -> Option<Timestamp> {
self.user_pts
}
pub const fn key(&self) -> bool {
self.key
}
}
pub(crate) struct Inner<F> {
queue: VecDeque<DecodedFrame<F>>,
queue_bytes: u64,
last_error: Option<Error>,
pending_outputs: PendingOutputMap,
next_submission_id: i64,
epoch_id_floor: i64,
pending_input_bytes: u64,
pending_copies: u32,
pending_copy_bytes: u64,
last_measured_frame_bytes: u64,
epoch: u64,
next_output_sequence: u32,
next_push_sequence: u32,
pending_pushes: PendingPushMap<F>,
pending_push_bytes: u64,
receiver_waker: Option<Waker>,
dequeue_waker: Option<Waker>,
}
pub(crate) struct PendingOutputMap {
entries: Vec<(i64, PendingOutput)>,
}
impl PendingOutputMap {
pub fn try_with_capacity(cap: usize) -> Result<Self, Error> {
let mut entries = Vec::new();
entries
.try_reserve_exact(cap)
.map_err(|_| Error::from_static("out of memory: pending_outputs"))?;
Ok(Self { entries })
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn insert(&mut self, id: i64, record: PendingOutput) {
debug_assert!(
self.entries.len() < self.entries.capacity()
|| self.entries.binary_search_by_key(&id, |(k, _)| *k).is_ok(),
"PendingOutputMap::insert at capacity would reallocate"
);
match self.entries.binary_search_by_key(&id, |(k, _)| *k) {
Ok(idx) => self.entries[idx].1 = record,
Err(idx) => self.entries.insert(idx, (id, record)),
}
}
pub fn remove(&mut self, id: i64) -> Option<PendingOutput> {
let idx = self.entries.binary_search_by_key(&id, |(k, _)| *k).ok()?;
Some(self.entries.remove(idx).1)
}
pub fn pop_first(&mut self) -> Option<(i64, PendingOutput)> {
if self.entries.is_empty() {
None
} else {
Some(self.entries.remove(0))
}
}
pub fn clear(&mut self) {
self.entries.clear();
}
}
pub(crate) struct PendingPushMap<F> {
entries: Vec<(u32, PendingPush<F>)>,
}
impl<F> PendingPushMap<F> {
pub fn try_with_capacity(cap: usize) -> Result<Self, Error> {
let mut entries = Vec::new();
entries
.try_reserve_exact(cap)
.map_err(|_| Error::from_static("out of memory: pending_pushes"))?;
Ok(Self { entries })
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn try_insert(&mut self, sequence: u32, push: PendingPush<F>) -> Result<(), PendingPush<F>> {
match self.entries.binary_search_by_key(&sequence, |(k, _)| *k) {
Ok(idx) => {
self.entries[idx].1 = push;
Ok(())
}
Err(idx) => {
if self.entries.len() >= self.entries.capacity() {
return Err(push);
}
self.entries.insert(idx, (sequence, push));
Ok(())
}
}
}
pub fn remove(&mut self, sequence: u32) -> Option<PendingPush<F>> {
let idx = self
.entries
.binary_search_by_key(&sequence, |(k, _)| *k)
.ok()?;
Some(self.entries.remove(idx).1)
}
pub fn clear(&mut self) {
self.entries.clear();
}
}
pub(crate) enum PendingPush<F> {
Ready(DecodedFrame<F>, u64),
Skipped,
}
impl<F> Inner<F> {
fn try_new(max_queue: usize) -> Result<Self, Error> {
let mut queue = VecDeque::new();
queue
.try_reserve_exact(max_queue)
.map_err(|_| Error::from_static("out of memory: decoded-frame queue"))?;
Ok(Self {
queue,
queue_bytes: 0,
last_error: None,
pending_outputs: PendingOutputMap::try_with_capacity(MAX_PENDING_OUTPUTS)?,
pending_input_bytes: 0,
next_submission_id: BASE_SUBMISSION_ID,
epoch_id_floor: BASE_SUBMISSION_ID,
pending_copies: 0,
pending_copy_bytes: 0,
last_measured_frame_bytes: 0,
epoch: 0,
next_output_sequence: 0,
next_push_sequence: 0,
pending_pushes: PendingPushMap::try_with_capacity(MAX_PENDING_PUSHES)?,
pending_push_bytes: 0,
receiver_waker: None,
dequeue_waker: None,
})
}
pub fn queue_len(&self) -> usize {
self.queue.len()
}
pub fn queue_is_empty(&self) -> bool {
self.queue.is_empty()
}
pub fn push_queue(
&mut self,
frame: DecodedFrame<F>,
byte_size: u64,
) -> Result<(), DecodedFrame<F>> {
if self.queue.len() >= self.queue.capacity() {
return Err(frame);
}
self.queue_bytes = self.queue_bytes.saturating_add(byte_size);
self.queue.push_back(frame);
Ok(())
}
pub fn pop_queue(&mut self, byte_size: u64) -> Option<DecodedFrame<F>> {
let frame = self.queue.pop_front()?;
self.queue_bytes = self.queue_bytes.saturating_sub(byte_size);
Some(frame)
}
pub fn peek_queue_head(&self) -> Option<&F> {
self.queue.front().map(DecodedFrame::frame)
}
pub const fn queue_bytes(&self) -> u64 {
self.queue_bytes
}
pub const fn is_closed(&self) -> bool {
self.last_error.is_some()
}
pub fn last_error_clone(&self) -> Option<Error> {
self.last_error.clone()
}
pub fn clear_last_error(&mut self) {
self.last_error = None;
}
pub fn next_submission_id(&mut self) -> i64 {
let id = self.next_submission_id;
self.next_submission_id = self.next_submission_id.wrapping_add(1);
id
}
pub fn insert_pending_output(&mut self, id: i64, record: PendingOutput) -> Result<(), Error> {
if self.pending_outputs.len() >= MAX_PENDING_OUTPUTS {
let err = Error::from_js(JsValue::from_str(
"WebCodecs pending_outputs reached MAX_PENDING_OUTPUTS; \
decoder is producing far fewer outputs than inputs",
));
self.record_close(err.clone());
return Err(err);
}
self.pending_input_bytes = self
.pending_input_bytes
.saturating_add(record.input_bytes as u64);
self.pending_outputs.insert(id, record);
Ok(())
}
pub fn remove_pending_output(&mut self, id: i64) -> Option<PendingOutput> {
let record = self.pending_outputs.remove(id)?;
self.pending_input_bytes = self
.pending_input_bytes
.saturating_sub(record.input_bytes as u64);
Some(record)
}
pub fn pop_oldest_pending_output(&mut self) -> Option<(i64, PendingOutput)> {
let (id, record) = self.pending_outputs.pop_first()?;
self.pending_input_bytes = self
.pending_input_bytes
.saturating_sub(record.input_bytes as u64);
Some((id, record))
}
pub const fn pending_input_bytes(&self) -> u64 {
self.pending_input_bytes
}
pub const fn pending_copies(&self) -> u32 {
self.pending_copies
}
pub const fn pending_copy_bytes(&self) -> u64 {
self.pending_copy_bytes
}
pub const fn pending_push_bytes(&self) -> u64 {
self.pending_push_bytes
}
pub fn pending_pushes_len(&self) -> usize {
self.pending_pushes.len()
}
pub fn add_pending_copy(&mut self, byte_estimate: u64) {
self.pending_copies = self.pending_copies.saturating_add(1);
self.pending_copy_bytes = self.pending_copy_bytes.saturating_add(byte_estimate);
self.last_measured_frame_bytes = byte_estimate;
}
pub const fn last_measured_frame_bytes(&self) -> u64 {
self.last_measured_frame_bytes
}
pub const fn epoch_id_floor(&self) -> i64 {
self.epoch_id_floor
}
pub fn sub_pending_copy(&mut self, byte_estimate: u64) {
self.pending_copies = self.pending_copies.saturating_sub(1);
self.pending_copy_bytes = self.pending_copy_bytes.saturating_sub(byte_estimate);
}
pub const fn epoch(&self) -> u64 {
self.epoch
}
pub fn set_receiver_waker(&mut self, waker: Waker) {
self.receiver_waker = Some(waker);
}
pub fn set_dequeue_waker(&mut self, waker: Waker) {
self.dequeue_waker = Some(waker);
}
pub fn clear_dequeue_waker(&mut self) {
self.dequeue_waker = None;
}
pub fn clear_pending_outputs(&mut self) {
self.pending_outputs.clear();
self.pending_input_bytes = 0;
}
pub fn record_close(&mut self, err: Error) -> bool {
let just_closed = self.last_error.is_none();
self.last_error.get_or_insert(err);
self.queue.clear();
self.queue_bytes = 0;
self.pending_outputs.clear();
self.pending_input_bytes = 0;
self.pending_pushes.clear();
self.pending_push_bytes = 0;
self.next_output_sequence = 0;
self.next_push_sequence = 0;
just_closed
}
}
pub const MAX_PENDING_OUTPUTS: usize = 256;
pub const MAX_PENDING_PUSHES: usize = 32;
pub const BASE_SUBMISSION_ID: i64 = 1 << 50;
#[derive(Default, Clone)]
pub(crate) enum CloseHookTarget {
#[default]
None,
Video(web_sys::VideoDecoder),
Audio(web_sys::AudioDecoder),
}
impl CloseHookTarget {
fn fire(&self) {
match self {
Self::None => {}
Self::Video(d) => {
let _ = d.reset();
}
Self::Audio(d) => {
let _ = d.reset();
}
}
}
}
pub(crate) struct SharedState<F> {
inner: Rc<RefCell<Inner<F>>>,
close_hook: Rc<RefCell<CloseHookTarget>>,
}
impl<F> SharedState<F> {
pub fn try_new(max_queue: usize) -> Result<Self, Error> {
Ok(Self {
inner: Rc::new(RefCell::new(Inner::try_new(max_queue)?)),
close_hook: Rc::new(RefCell::new(CloseHookTarget::None)),
})
}
pub fn set_close_hook_video(&self, decoder: web_sys::VideoDecoder) {
*self.close_hook.borrow_mut() = CloseHookTarget::Video(decoder);
}
pub fn set_close_hook_audio(&self, decoder: web_sys::AudioDecoder) {
*self.close_hook.borrow_mut() = CloseHookTarget::Audio(decoder);
}
pub fn invoke_close_hook(&self) {
self.close_hook.borrow().fire();
}
pub fn clear_close_hook(&self) {
*self.close_hook.borrow_mut() = CloseHookTarget::None;
}
pub fn borrow(&self) -> std::cell::Ref<'_, Inner<F>> {
self.inner.borrow()
}
pub fn borrow_mut(&self) -> std::cell::RefMut<'_, Inner<F>> {
self.inner.borrow_mut()
}
pub fn epoch(&self) -> u64 {
self.inner.borrow().epoch
}
pub fn bump_epoch(&self) -> u64 {
let mut inner = self.inner.borrow_mut();
inner.epoch = inner.epoch.wrapping_add(1);
inner.queue.clear();
inner.queue_bytes = 0;
inner.pending_outputs.clear();
inner.pending_input_bytes = 0;
inner.pending_pushes.clear();
inner.pending_push_bytes = 0;
inner.next_output_sequence = 0;
inner.next_push_sequence = 0;
inner.epoch_id_floor = inner.next_submission_id;
inner.epoch
}
pub fn allocate_output_sequence(&self) -> u32 {
let mut inner = self.inner.borrow_mut();
let s = inner.next_output_sequence;
inner.next_output_sequence = inner.next_output_sequence.wrapping_add(1);
s
}
pub fn deliver_pending_push(&self, sequence: u32, push: PendingPush<F>) {
let (pushed_any, overflow_close): (bool, Option<bool>) = {
let mut inner = self.inner.borrow_mut();
if inner.is_closed() {
(false, None)
} else {
let added_bytes = match &push {
PendingPush::Ready(_, byte_size) => *byte_size,
PendingPush::Skipped => 0,
};
inner.pending_push_bytes = inner.pending_push_bytes.saturating_add(added_bytes);
if let Err(_returned) = inner.pending_pushes.try_insert(sequence, push) {
inner.pending_push_bytes = inner.pending_push_bytes.saturating_sub(added_bytes);
let err = Error::from_static(
"WebCodecs pending_pushes reached capacity; \
admission gate failed to bound reorder buffer",
);
let just_closed = inner.record_close(err);
drop(inner);
if just_closed {
self.invoke_close_hook();
}
self.wake_all();
return;
}
let mut pushed = false;
let mut queue_overflow: Option<bool> = None;
loop {
let next = inner.next_push_sequence;
let Some(entry) = inner.pending_pushes.remove(next) else {
break;
};
match entry {
PendingPush::Ready(frame, byte_size) => {
inner.pending_push_bytes = inner.pending_push_bytes.saturating_sub(byte_size);
if let Err(_returned) = inner.push_queue(frame, byte_size) {
let err = Error::from_static(
"WebCodecs decoded-frame queue reached capacity; \
admission gate failed to bound the output queue",
);
queue_overflow = Some(inner.record_close(err));
break;
}
pushed = true;
}
PendingPush::Skipped => {}
}
inner.next_push_sequence = next.wrapping_add(1);
}
(pushed, queue_overflow)
}
};
if let Some(just_closed) = overflow_close {
if just_closed {
self.invoke_close_hook();
}
self.wake_all();
return;
}
if pushed_any {
self.wake_receiver();
}
}
fn wake_receiver(&self) {
let waker = self.inner.borrow_mut().receiver_waker.take();
if let Some(w) = waker {
w.wake();
}
}
pub fn wake_dequeue(&self) {
let waker = self.inner.borrow_mut().dequeue_waker.take();
if let Some(w) = waker {
w.wake();
}
}
pub fn wake_all(&self) {
let (rx, dq) = {
let mut inner = self.inner.borrow_mut();
(inner.receiver_waker.take(), inner.dequeue_waker.take())
};
if let Some(w) = rx {
w.wake();
}
if let Some(w) = dq {
w.wake();
}
}
}
impl<F> Clone for SharedState<F> {
fn clone(&self) -> Self {
Self {
inner: Rc::clone(&self.inner),
close_hook: Rc::clone(&self.close_hook),
}
}
}
pub(crate) struct ReceiverWakerGuard<'a, F> {
state: &'a SharedState<F>,
}
impl<F> Drop for ReceiverWakerGuard<'_, F> {
fn drop(&mut self) {
self.state.inner.borrow_mut().receiver_waker = None;
}
}
pub(crate) struct DequeueWakerGuard<'a, F> {
state: &'a SharedState<F>,
}
impl<F> Drop for DequeueWakerGuard<'_, F> {
fn drop(&mut self) {
self.state.inner.borrow_mut().dequeue_waker = None;
}
}
impl<F> SharedState<F> {
pub fn receiver_waker_guard(&self) -> ReceiverWakerGuard<'_, F> {
ReceiverWakerGuard { state: self }
}
pub fn dequeue_waker_guard(&self) -> DequeueWakerGuard<'_, F> {
DequeueWakerGuard { state: self }
}
}