use std::collections::VecDeque;
use thiserror::Error;
use crate::encoder::CodedBitstreamBuffer;
use crate::encoder::EncodeError;
use crate::encoder::EncodeResult;
use crate::encoder::FrameMetadata;
use crate::encoder::Tunings;
use crate::encoder::VideoEncoder;
use crate::BlockingMode;
#[cfg(feature = "vaapi")]
pub mod av1;
#[cfg(feature = "vaapi")]
pub mod h264;
#[cfg(feature = "vaapi")]
pub(crate) mod predictor;
#[cfg(feature = "vaapi")]
pub mod vp9;
#[derive(Error, Debug)]
pub enum StatelessBackendError {
#[error("unsupported profile")]
UnsupportedProfile,
#[error("unsupported format")]
UnsupportedFormat,
#[error("not enough resources to proceed with the operation now")]
OutOfResources,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
pub type StatelessBackendResult<T> = Result<T, StatelessBackendError>;
pub trait BackendPromise {
type Output;
fn sync(self) -> StatelessBackendResult<Self::Output>;
fn is_ready(&self) -> bool;
}
pub struct ReadyPromise<T>(T);
impl<T> From<T> for ReadyPromise<T> {
fn from(value: T) -> Self {
ReadyPromise(value)
}
}
impl<T> BackendPromise for ReadyPromise<T> {
type Output = T;
fn sync(self) -> StatelessBackendResult<Self::Output> {
Ok(self.0)
}
fn is_ready(&self) -> bool {
true
}
}
pub struct BitstreamPromise<P>
where
P: BackendPromise<Output = Vec<u8>>,
{
bitstream: P,
meta: FrameMetadata,
}
impl<P> BackendPromise for BitstreamPromise<P>
where
P: BackendPromise<Output = Vec<u8>>,
{
type Output = CodedBitstreamBuffer;
fn is_ready(&self) -> bool {
self.bitstream.is_ready()
}
fn sync(self) -> StatelessBackendResult<Self::Output> {
let coded_data = self.bitstream.sync()?;
log::trace!("synced bitstream size={}", coded_data.len());
Ok(CodedBitstreamBuffer::new(self.meta, coded_data))
}
}
pub(crate) struct OutputQueue<O>
where
O: BackendPromise,
{
blocking: BlockingMode,
promises: VecDeque<O>,
}
impl<O> OutputQueue<O>
where
O: BackendPromise,
{
#[allow(dead_code)]
pub(crate) fn new(blocking: BlockingMode) -> Self {
Self { blocking, promises: Default::default() }
}
#[allow(dead_code)]
pub(crate) fn add_promise(&mut self, pending: O) {
self.promises.push_back(pending);
}
pub(crate) fn poll(&mut self, mode: BlockingMode) -> StatelessBackendResult<Option<O::Output>> {
let block = self.blocking == BlockingMode::Blocking || mode == BlockingMode::Blocking;
match self.promises.pop_front() {
Some(o) if block || o.is_ready() => Ok(Some(o.sync()?)),
Some(o) => {
self.promises.push_front(o);
Ok(None)
}
None => Ok(None),
}
}
pub(crate) fn is_empty(&self) -> bool {
self.promises.is_empty()
}
}
pub(super) trait Predictor<Picture, Reference, Request> {
fn new_frame(
&mut self,
backend_pic: Picture,
meta: FrameMetadata,
) -> EncodeResult<Vec<Request>>;
fn reconstructed(&mut self, recon: Reference) -> EncodeResult<Vec<Request>>;
fn tune(&mut self, tunings: Tunings) -> EncodeResult<()>;
fn drain(&mut self) -> EncodeResult<Vec<Request>>;
}
pub trait StatelessVideoEncoderBackend<Codec>: Sized
where
Codec: StatelessCodec<Self>,
{
type Picture: 'static;
type Reconstructed: 'static;
type CodedPromise: BackendPromise<Output = Vec<u8>>;
type ReconPromise: BackendPromise<Output = Self::Reconstructed>;
}
pub trait StatelessEncoderBackendImport<Handle, Picture> {
fn import_picture(
&mut self,
metadata: &FrameMetadata,
handle: Handle,
) -> StatelessBackendResult<Picture>;
}
pub trait StatelessCodec<Backend>: Sized
where
Backend: StatelessVideoEncoderBackend<Self>,
{
type Reference;
type Request;
type CodedPromise: BackendPromise<Output = CodedBitstreamBuffer>;
type ReferencePromise: BackendPromise<Output = Self::Reference>;
}
type Picture<C, B> = <B as StatelessVideoEncoderBackend<C>>::Picture;
type Reference<C, B> = <C as StatelessCodec<B>>::Reference;
type Request<C, B> = <C as StatelessCodec<B>>::Request;
type CodedPromise<C, B> = <C as StatelessCodec<B>>::CodedPromise;
type ReferencePromise<C, B> = <C as StatelessCodec<B>>::ReferencePromise;
type BoxPredictor<C, B> = Box<dyn Predictor<Picture<C, B>, Reference<C, B>, Request<C, B>>>;
pub struct StatelessEncoder<Codec, Handle, Backend>
where
Backend: StatelessVideoEncoderBackend<Codec>,
Codec: StatelessCodec<Backend>,
{
output_queue: OutputQueue<CodedPromise<Codec, Backend>>,
recon_queue: OutputQueue<ReferencePromise<Codec, Backend>>,
predictor: BoxPredictor<Codec, Backend>,
coded_queue: VecDeque<CodedBitstreamBuffer>,
predictor_frame_count: usize,
backend: Backend,
_phantom: std::marker::PhantomData<Handle>,
}
pub trait StatelessEncoderExecute<Codec, Handle, Backend>
where
Backend: StatelessVideoEncoderBackend<Codec>,
Codec: StatelessCodec<Backend>,
{
fn execute(&mut self, request: Request<Codec, Backend>) -> EncodeResult<()>;
}
impl<Codec, Handle, Backend> StatelessEncoder<Codec, Handle, Backend>
where
Codec: StatelessCodec<Backend>,
Backend: StatelessVideoEncoderBackend<Codec>,
Self: StatelessEncoderExecute<Codec, Handle, Backend>,
{
#[allow(dead_code)]
fn new(
backend: Backend,
mode: BlockingMode,
predictor: BoxPredictor<Codec, Backend>,
) -> EncodeResult<Self> {
Ok(Self {
backend,
predictor,
predictor_frame_count: 0,
coded_queue: Default::default(),
output_queue: OutputQueue::new(mode),
recon_queue: OutputQueue::new(mode),
_phantom: Default::default(),
})
}
fn poll_pending(&mut self, mode: BlockingMode) -> EncodeResult<()> {
while let Some(coded) = self.output_queue.poll(mode)? {
self.coded_queue.push_back(coded);
}
while let Some(recon) = self.recon_queue.poll(mode)? {
let requests = self.predictor.reconstructed(recon)?;
if requests.is_empty() {
break;
}
for request in requests {
self.execute(request)?;
}
}
Ok(())
}
}
impl<Codec, Handle, Backend> VideoEncoder<Handle> for StatelessEncoder<Codec, Handle, Backend>
where
Codec: StatelessCodec<Backend>,
Backend: StatelessVideoEncoderBackend<Codec>,
Backend: StatelessEncoderBackendImport<Handle, Backend::Picture>,
Self: StatelessEncoderExecute<Codec, Handle, Backend>,
{
fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
self.predictor.tune(tunings)
}
fn encode(&mut self, metadata: FrameMetadata, handle: Handle) -> EncodeResult<()> {
log::trace!("encode: timestamp={} layout={:?}", metadata.timestamp, metadata.layout);
let backend_pic = self.backend.import_picture(&metadata, handle)?;
self.predictor_frame_count += 1;
let requests = self.predictor.new_frame(backend_pic, metadata)?;
for request in requests {
self.execute(request)?;
}
Ok(())
}
fn drain(&mut self) -> EncodeResult<()> {
log::trace!("currently predictor holds {}", self.predictor_frame_count);
while self.predictor_frame_count > 0 || !self.recon_queue.is_empty() {
if self.output_queue.is_empty() && self.recon_queue.is_empty() {
let requests = self.predictor.drain()?;
if requests.is_empty() {
log::error!("failed to drain predictor, no request was returned");
return Err(EncodeError::InvalidInternalState);
}
for request in requests {
self.execute(request)?;
}
}
self.poll_pending(BlockingMode::Blocking)?;
}
while !self.output_queue.is_empty() {
self.poll_pending(BlockingMode::Blocking)?;
}
Ok(())
}
fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
self.poll_pending(BlockingMode::NonBlocking)?;
Ok(self.coded_queue.pop_front())
}
}