use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use super::{AsyncDecoder, Chunk, Result};
#[cfg(with_dav1d)]
use crate::{VideoDataDescription, decode::FrameResult};
use crate::{Receiver, Sender};
enum Command {
Chunk(Chunk),
Reset(Box<VideoDataDescription>),
Stop,
}
impl re_byte_size::SizeBytes for Command {
fn heap_size_bytes(&self) -> u64 {
match self {
Self::Chunk(chunk) => chunk.heap_size_bytes(),
Self::Reset(video_data_description) => video_data_description.heap_size_bytes(),
Self::Stop => 0,
}
}
}
#[derive(Clone)]
struct Comms {
should_stop: Arc<AtomicBool>,
num_outstanding_resets: Arc<AtomicU64>,
}
impl Default for Comms {
fn default() -> Self {
Self {
should_stop: Arc::new(AtomicBool::new(false)),
num_outstanding_resets: Arc::new(AtomicU64::new(0)),
}
}
}
#[cfg(with_dav1d)]
pub trait SyncDecoder {
fn submit_chunk(
&mut self,
should_stop: &std::sync::atomic::AtomicBool,
chunk: Chunk,
output_sender: &Sender<FrameResult>,
);
fn reset(&mut self, video_data_description: &crate::VideoDataDescription);
}
pub struct AsyncDecoderWrapper {
_thread: std::thread::JoinHandle<()>,
command_tx: Sender<Command>,
comms: Comms,
}
impl AsyncDecoderWrapper {
pub fn new(
debug_name: String,
mut sync_decoder: Box<dyn SyncDecoder + Send>,
output_sender: Sender<FrameResult>,
) -> Self {
re_tracing::profile_function!();
let (command_tx, command_rx) = crate::channel(format!("{debug_name}-channel"));
let comms = Comms::default();
let thread = std::thread::Builder::new()
.name(format!("decoder of {debug_name}"))
.spawn({
let comms = comms.clone();
move || {
econtext::econtext_data!("Video", debug_name.clone());
decoder_thread(sync_decoder.as_mut(), &comms, &command_rx, &output_sender);
re_log::debug!("Closing decoder thread for {debug_name}");
}
})
.expect("failed to spawn decoder thread");
Self {
_thread: thread,
command_tx,
comms,
}
}
}
impl AsyncDecoder for AsyncDecoderWrapper {
fn submit_chunk(&mut self, chunk: Chunk) -> Result<()> {
re_tracing::profile_function!();
self.command_tx.send(Command::Chunk(chunk)).ok();
Ok(())
}
fn reset(&mut self, video_data_description: &VideoDataDescription) -> Result<()> {
re_tracing::profile_function!();
self.comms
.num_outstanding_resets
.fetch_add(1, Ordering::Release);
self.command_tx
.send(Command::Reset(Box::new(video_data_description.clone())))
.ok();
Ok(())
}
}
impl Drop for AsyncDecoderWrapper {
fn drop(&mut self) {
re_tracing::profile_function!();
self.comms.should_stop.store(true, Ordering::Release);
self.command_tx.send(Command::Stop).ok();
}
}
fn decoder_thread(
decoder: &mut dyn SyncDecoder,
comms: &Comms,
command_rx: &Receiver<Command>,
output_sender: &Sender<FrameResult>,
) {
while let Ok(command) = command_rx.recv() {
if comms.should_stop.load(Ordering::Acquire) {
re_log::debug!("Should stop");
return;
}
let has_outstanding_reset = 0 < comms.num_outstanding_resets.load(Ordering::Acquire);
match command {
Command::Chunk(chunk) => {
if !has_outstanding_reset {
decoder.submit_chunk(&comms.should_stop, chunk, output_sender);
}
}
Command::Reset(video_data_description) => {
decoder.reset(&video_data_description);
comms.num_outstanding_resets.fetch_sub(1, Ordering::Release);
}
Command::Stop => {
re_log::debug!("Stop");
return;
}
}
}
re_log::debug!("Disconnected");
}