cros-codecs 0.0.6

Hardware-accelerated codecs for Linux
Documentation
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::collections::BTreeSet;
use std::collections::VecDeque;

use thiserror::Error;

use crate::encoder::CodedBitstreamBuffer;
use crate::encoder::EncodeError;
use crate::encoder::EncodeResult;
use crate::encoder::FrameMetadata;
use crate::encoder::Tunings;
use crate::encoder::VideoEncoder;

pub mod h264;
pub mod h265;
pub mod vp8;
pub mod vp9;

/// Errors that a [`StatefulVideoEncoderBackend`] implementation can report.
#[derive(Debug, Error)]
pub enum StatefulBackendError {
    /// The backend ended up in a state it does not expect to reach; as the message says, this
    /// most likely indicates a bug.
    #[error("invalid internal state. This is likely a bug.")]
    InvalidInternalState,
    /// Any other failure, carried as an [`anyhow::Error`] whose message is forwarded unchanged.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

/// Shorthand for a [`Result`] whose error type is [`StatefulBackendError`].
pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>;

/// Unique identifier of a [`BackendRequest`].
///
/// Transparent wrapper around a `usize`; equality and ordering follow the wrapped value.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct BackendRequestId(usize);

/// Request package that is offered to a [`StatefulVideoEncoderBackend`] for processing.
pub struct BackendRequest<Handle> {
    /// Request's unique identifier. The matching [`BackendOutput`] carries the same value.
    pub request_id: BackendRequestId,
    /// Frame's metadata
    pub meta: FrameMetadata,
    /// Frame's handle
    pub handle: Handle,
    /// Tunings set for the request
    pub tunings: Tunings,
}

/// Result package that a [`StatefulVideoEncoderBackend`] yields once it has finished processing
/// a [`BackendRequest`].
pub struct BackendOutput {
    /// Request's unique identifier corresponding to [`BackendRequest`]
    pub request_id: BackendRequestId,
    /// Result of the request. [`CodedBitstreamBuffer`] containing the encoded frame
    pub buffer: CodedBitstreamBuffer,
}

/// Generic trait for stateful encoder backends
pub trait StatefulVideoEncoderBackend<Handle> {
    /// Try to submit an encode request to the backend. The backend may not be able to accept the
    /// request, e.g. if there are not enough available resources or the backend desires to finish
    /// a previous request first. The function shall not block.
    ///
    /// If the backend accepts the request for processing, it shall take the `request` (i.e. take
    /// ownership of the [`BackendRequest`]) and leave [`None`] behind the mutable reference. A
    /// request that is still [`Some`] after the call is treated by the caller as not accepted.
    fn consume_request(
        &mut self,
        request: &mut Option<BackendRequest<Handle>>,
    ) -> StatefulBackendResult<()>;

    /// Function shall block until the backend can accept a request with [`consume_request`] or
    /// has finished processing of some [`BackendRequest`] and [`poll`] can be used to fetch its
    /// result.
    ///
    /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
    /// [`poll`]: StatefulVideoEncoderBackend::poll
    fn sync(&mut self) -> StatefulBackendResult<()>;

    /// Blocking function. It shall return once the backend has finished processing every
    /// [`BackendRequest`] it has accepted, and it shall return the outputs of all those requests.
    fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>;

    /// If the processing of any [`BackendRequest`] accepted through [`consume_request`] is
    /// finished, then the function should yield its corresponding [`BackendOutput`].
    ///
    /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
    fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>;
}

/// Encoder front-end that queues incoming frames as [`BackendRequest`]s, delegates the actual
/// encoding to a [`StatefulVideoEncoderBackend`] and queues the resulting coded buffers for the
/// client to poll.
pub struct StatefulEncoder<Handle, Backend>
where
    Backend: StatefulVideoEncoderBackend<Handle>,
{
    /// Pending queue of frames to be encoded by the backend
    queue: VecDeque<BackendRequest<Handle>>,

    /// Counter from which the next unique request identifier is generated
    request_counter: usize,

    /// Latest [`Tunings`], that will be cloned into each newly created request
    tunings: Tunings,

    /// Processed encoded bitstream queue for client to poll
    coded_queue: VecDeque<CodedBitstreamBuffer>,

    /// Identifiers of the requests currently processed by the backend
    processing: BTreeSet<BackendRequestId>,

    /// [`StatefulVideoEncoderBackend`] instance to delegate [`BackendRequest`] to
    backend: Backend,
}

impl<Handle, Backend> StatefulEncoder<Handle, Backend>
where
    Backend: StatefulVideoEncoderBackend<Handle>,
{
    /// Utility function that creates a new [`StatefulEncoder`] from the initial [`Tunings`] and a
    /// [`StatefulVideoEncoderBackend`] instance.
    ///
    /// All queues start empty and the request counter starts at zero.
    // NOTE(review): the lint is silenced presumably because the callers live in codec modules
    // that are not compiled in every configuration — confirm.
    #[allow(dead_code)]
    fn create(tunings: Tunings, backend: Backend) -> Self {
        Self {
            queue: Default::default(),
            request_counter: 0,
            tunings,
            coded_queue: Default::default(),
            processing: Default::default(),
            backend,
        }
    }

    /// Handles a [`BackendOutput`] from the backend: marks its request as no longer being
    /// processed and appends the coded buffer to the queue the client polls.
    ///
    /// An output whose request id is not tracked as "processing" is only reported with a warning;
    /// its buffer is still queued. Currently this never returns an error.
    fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> {
        log::debug!(
            "Backend yieled output buffer for request id={:?} timestamp={} bytes={}",
            output.request_id,
            output.buffer.metadata.timestamp,
            output.buffer.bitstream.len()
        );

        let was_tracked = self.processing.remove(&output.request_id);
        if !was_tracked {
            log::warn!(
                "Coded buffer returned for non existing or already processed request id={:?} timestamp={}",
                output.request_id,
                output.buffer.metadata.timestamp,
            );
        }

        self.coded_queue.push_back(output.buffer);
        Ok(())
    }

    /// Polls the backend until it has no more finished outputs, handling each one as it arrives.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `poll`.
    fn poll_backend(&mut self) -> EncodeResult<()> {
        while let Some(output) = self.backend.poll()? {
            self.handle_output(output)?;
        }

        Ok(())
    }

    /// Performs essential processing: collects finished outputs from the backend and then offers
    /// the pending requests to it, in order, until the queue is empty or the backend stalls.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend while polling or consuming a request.
    fn process(&mut self) -> EncodeResult<()> {
        log::debug!(
            "Pending requests: {}, currently processed: {:?}, pending coded buffer: {}",
            self.queue.len(),
            self.processing,
            self.coded_queue.len()
        );

        // The backend is only polled while it holds at least one accepted request.
        if !self.processing.is_empty() {
            self.poll_backend()?;
        }

        while let Some(request) = self.queue.pop_front() {
            // Remember what is needed for logging and tracking: the backend may take the request.
            let request_id = request.request_id;
            let timestamp = request.meta.timestamp;
            let mut slot = Some(request);

            log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}");
            self.backend.consume_request(&mut slot)?;

            match slot {
                // The backend left the request in place: put it back at the front so that the
                // submission order is preserved, and retry on a later call.
                Some(stalled) => {
                    log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}");
                    self.queue.push_front(stalled);
                    break;
                }
                None => {
                    log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}");
                    self.processing.insert(request_id);
                }
            }
        }

        Ok(())
    }

    /// Returns a shared reference to the [`StatefulVideoEncoderBackend`] instance.
    pub fn backend(&self) -> &Backend {
        &self.backend
    }
}

impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend>
where
    Backend: StatefulVideoEncoderBackend<Handle>,
{
    /// Replaces the stored [`Tunings`].
    ///
    /// Only requests created by later `encode` calls receive the new value; requests that are
    /// already queued keep the tunings they were created with.
    fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
        self.tunings = tunings;
        Ok(())
    }

    /// Wraps the frame into a [`BackendRequest`] with a fresh identifier and a clone of the
    /// current [`Tunings`], appends it to the pending queue and runs one processing pass.
    ///
    /// The identifier counter wraps around on overflow.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the backend during the processing pass.
    fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> {
        let request_id = BackendRequestId(self.request_counter);
        self.request_counter = self.request_counter.wrapping_add(1);

        log::trace!("Got new request id={request_id:?} timestamp={}", meta.timestamp);

        self.queue.push_back(BackendRequest {
            request_id,
            meta,
            handle,
            tunings: self.tunings.clone(),
        });

        self.process()
    }

    /// Returns the oldest coded buffer that is ready, or `None` if there is none.
    ///
    /// A processing pass is run first whenever there is still work pending or in flight.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the backend during the processing pass.
    fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
        let has_outstanding_work = !self.queue.is_empty() || !self.processing.is_empty();
        if has_outstanding_work {
            self.process()?;
        }

        let buffer = self.coded_queue.pop_front();
        if let Some(buffer) = &buffer {
            log::debug!("Returning coded buffer timestamp={}", buffer.metadata.timestamp);
        }
        Ok(buffer)
    }

    /// Submits every pending request to the backend and then asks the backend to drain, queueing
    /// all outputs it returns for the client to poll.
    ///
    /// If the backend holds no request once the pending queue is empty, the drain request to the
    /// backend is skipped.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the backend while processing, syncing or draining.
    fn drain(&mut self) -> EncodeResult<()> {
        log::debug!(
            "Got drain request. Pending in queue: {}. Currently processed: {:?}",
            self.queue.len(),
            self.processing
        );

        // Hand every pending request over to the backend. Whenever the backend stalls, block in
        // `sync` until it can accept another request or has an output ready.
        while !self.queue.is_empty() {
            self.process()?;

            if !self.queue.is_empty() {
                self.backend.sync()?;
            }
        }

        // Every accepted request has already produced its output, so there is nothing for the
        // backend to flush.
        if self.processing.is_empty() {
            log::debug!("Skipping drain request to backend, everything is drained");
            return Ok(());
        }

        log::debug!("Sending drain request to backend");
        for output in self.backend.drain()? {
            self.handle_output(output)?;
        }

        Ok(())
    }
}