use std::collections::BTreeSet;
use std::collections::VecDeque;
use thiserror::Error;
use crate::encoder::CodedBitstreamBuffer;
use crate::encoder::EncodeError;
use crate::encoder::EncodeResult;
use crate::encoder::FrameMetadata;
use crate::encoder::Tunings;
use crate::encoder::VideoEncoder;
pub mod h264;
pub mod h265;
pub mod vp8;
pub mod vp9;
#[derive(Debug, Error)]
pub enum StatefulBackendError {
#[error("invalid internal state. This is likely a bug.")]
InvalidInternalState,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>;
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct BackendRequestId(usize);
pub struct BackendRequest<Handle> {
pub request_id: BackendRequestId,
pub meta: FrameMetadata,
pub handle: Handle,
pub tunings: Tunings,
}
pub struct BackendOutput {
pub request_id: BackendRequestId,
pub buffer: CodedBitstreamBuffer,
}
pub trait StatefulVideoEncoderBackend<Handle> {
fn consume_request(
&mut self,
request: &mut Option<BackendRequest<Handle>>,
) -> StatefulBackendResult<()>;
fn sync(&mut self) -> StatefulBackendResult<()>;
fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>;
fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>;
}
pub struct StatefulEncoder<Handle, Backend>
where
Backend: StatefulVideoEncoderBackend<Handle>,
{
queue: VecDeque<BackendRequest<Handle>>,
request_counter: usize,
tunings: Tunings,
coded_queue: VecDeque<CodedBitstreamBuffer>,
processing: BTreeSet<BackendRequestId>,
backend: Backend,
}
impl<Handle, Backend> StatefulEncoder<Handle, Backend>
where
Backend: StatefulVideoEncoderBackend<Handle>,
{
#[allow(dead_code)]
fn create(tunings: Tunings, backend: Backend) -> Self {
Self {
queue: Default::default(),
request_counter: 0,
tunings,
coded_queue: Default::default(),
processing: Default::default(),
backend,
}
}
fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> {
log::debug!(
"Backend yieled output buffer for request id={:?} timestamp={} bytes={}",
output.request_id,
output.buffer.metadata.timestamp,
output.buffer.bitstream.len()
);
if !self.processing.remove(&output.request_id) {
log::warn!("Coded buffer returned for non existing or already processed request id={:?} timestamp={}",
output.request_id,
output.buffer.metadata.timestamp,
);
}
self.coded_queue.push_back(output.buffer);
Ok(())
}
fn poll_backend(&mut self) -> EncodeResult<()> {
while let Some(output) = self.backend.poll()? {
self.handle_output(output)?;
}
Ok(())
}
fn process(&mut self) -> EncodeResult<()> {
log::debug!(
"Pending requests: {}, currently processed: {:?}, pending coded buffer: {}",
self.queue.len(),
self.processing,
self.coded_queue.len()
);
if !self.processing.is_empty() {
self.poll_backend()?;
}
while let Some(request) = self.queue.pop_front() {
let request_id = request.request_id;
let timestamp = request.meta.timestamp;
let mut request = Some(request);
log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}");
self.backend.consume_request(&mut request)?;
if let Some(request) = request {
log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}");
self.queue.push_front(request);
break;
} else {
log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}");
self.processing.insert(request_id);
}
}
Ok(())
}
pub fn backend(&mut self) -> &Backend {
&self.backend
}
}
impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend>
where
Backend: StatefulVideoEncoderBackend<Handle>,
{
fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
self.tunings = tunings;
Ok(())
}
fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> {
let request_id = BackendRequestId(self.request_counter);
self.request_counter = self.request_counter.wrapping_add(1);
log::trace!("Got new request id={request_id:?} timestamp={}", meta.timestamp);
let request = BackendRequest { request_id, meta, handle, tunings: self.tunings.clone() };
self.queue.push_back(request);
self.process()?;
Ok(())
}
fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
if !self.queue.is_empty() || !self.processing.is_empty() {
self.process()?;
}
if let Some(buffer) = self.coded_queue.pop_front() {
log::debug!("Returning coded buffer timestamp={}", buffer.metadata.timestamp);
return Ok(Some(buffer));
}
Ok(None)
}
fn drain(&mut self) -> EncodeResult<()> {
log::debug!(
"Got drain request. Pending in queue: {}. Currently processed: {:?}",
self.queue.len(),
self.processing
);
while !self.queue.is_empty() {
self.process()?;
if !self.queue.is_empty() {
self.backend.sync()?;
}
}
if self.processing.is_empty() {
log::debug!("Skipping drain request to backend, everything is drained");
}
log::debug!("Sending drain request to backend");
for output in self.backend.drain()? {
self.handle_output(output)?;
}
Ok(())
}
}