use tokio::sync::mpsc;
use crate::EncodeError;
pub(crate) trait SyncEncoder<F> {
fn push_frame(&mut self, frame: &F) -> Result<(), EncodeError>;
fn drain_and_finish(self) -> Result<(), EncodeError>;
}
enum WorkerMsg<F> {
Frame(F),
Finish,
}
pub(crate) struct AsyncEncoder<F> {
sender: mpsc::Sender<WorkerMsg<F>>,
join_handle: Option<std::thread::JoinHandle<Result<(), EncodeError>>>,
}
impl<F: Send + 'static> AsyncEncoder<F> {
pub(crate) fn new<E>(encoder: E) -> Self
where
E: SyncEncoder<F> + Send + 'static,
{
let (tx, rx) = mpsc::channel::<WorkerMsg<F>>(8);
let handle = std::thread::spawn(move || -> Result<(), EncodeError> {
let mut enc = encoder;
let mut rx = rx;
#[allow(clippy::while_let_loop)]
loop {
match rx.blocking_recv() {
Some(WorkerMsg::Frame(frame)) => enc.push_frame(&frame)?,
Some(WorkerMsg::Finish) | None => break,
}
}
enc.drain_and_finish()
});
Self {
sender: tx,
join_handle: Some(handle),
}
}
pub(crate) async fn push(&mut self, frame: F) -> Result<(), EncodeError> {
self.sender
.send(WorkerMsg::Frame(frame))
.await
.map_err(|_| EncodeError::WorkerPanicked)
}
pub(crate) async fn finish(self) -> Result<(), EncodeError> {
let Self {
sender,
join_handle,
} = self;
sender
.send(WorkerMsg::Finish)
.await
.map_err(|_| EncodeError::WorkerPanicked)?;
drop(sender);
if let Some(handle) = join_handle {
tokio::task::spawn_blocking(move || {
handle.join().map_err(|_| EncodeError::WorkerPanicked)?
})
.await
.map_err(|_| EncodeError::WorkerPanicked)?
} else {
Ok(())
}
}
}