cros-codecs 0.0.6

Hardware-accelerated codecs for Linux
Documentation
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use nix::errno::Errno;
use nix::sys::eventfd::EfdFlags;
use nix::sys::eventfd::EventFd;

use thiserror::Error;

use std::collections::VecDeque;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::vec::Vec;

use crate::decoder::StreamInfo;
use crate::video_frame::VideoFrame;
use crate::Fourcc;

pub mod c2_decoder;
pub mod c2_encoder;
#[cfg(feature = "v4l2")]
pub mod c2_v4l2_decoder;
#[cfg(feature = "v4l2")]
pub mod c2_v4l2_encoder;
#[cfg(feature = "vaapi")]
pub mod c2_vaapi_decoder;
#[cfg(feature = "vaapi")]
pub mod c2_vaapi_encoder;

#[derive(Debug, Default, PartialEq, Eq, Copy, Clone)]
pub enum DrainMode {
    // Not draining
    #[default]
    NoDrain = -1,
    // Drain the C2 component and signal an EOS. Currently we also change the state to stop.
    EOSDrain = 0,
    // Drain the C2 component, but keep accepting new jobs in the queue immediately after.
    NoEOSDrain = 1,
}

#[derive(Debug)]
pub struct C2DecodeJob<V: VideoFrame> {
    // Compressed input data
    // TODO: Use VideoFrame for input too
    pub input: Vec<u8>,
    // Decompressed output frame. Note that this needs to be reference counted because we may still
    // use this frame as a reference frame even while we're displaying it.
    pub output: Option<Arc<V>>,
    pub drain: DrainMode,
    // TODO: Add output delay and color aspect support as needed.
}

impl<V> Job for C2DecodeJob<V>
where
    V: VideoFrame,
{
    type Frame = V;

    fn set_drain(&mut self, drain: DrainMode) {
        self.drain = drain;
    }

    fn get_drain(&self) -> DrainMode {
        self.drain
    }
}

impl<V: VideoFrame> Default for C2DecodeJob<V> {
    fn default() -> Self {
        Self { input: vec![], output: None, drain: DrainMode::NoDrain }
    }
}

pub trait Job: Send + 'static {
    type Frame: VideoFrame;

    fn set_drain(&mut self, drain: DrainMode) -> ();
    fn get_drain(&self) -> DrainMode;
}

#[derive(Debug)]
pub struct C2EncodeJob<V: VideoFrame> {
    pub input: Option<V>,
    // TODO: Use VideoFrame for output too
    pub output: Vec<u8>,
    // In microseconds.
    pub timestamp: u64,
    // TODO: only support CBR right now, follow up with VBR support.
    pub bitrate: u64,
    // Framerate is actually negotiated, so the encoder can change this value
    // based on the timestamps of the frames it receives.
    pub framerate: Arc<AtomicU32>,
    pub drain: DrainMode,
}

impl<V: VideoFrame> Default for C2EncodeJob<V> {
    fn default() -> Self {
        Self {
            input: None,
            output: vec![],
            timestamp: 0,
            bitrate: 0,
            framerate: Arc::new(AtomicU32::new(0)),
            drain: DrainMode::NoDrain,
        }
    }
}

impl<V> Job for C2EncodeJob<V>
where
    V: VideoFrame,
{
    type Frame = V;

    fn set_drain(&mut self, drain: DrainMode) {
        self.drain = drain;
    }

    fn get_drain(&self) -> DrainMode {
        self.drain
    }
}

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum C2State {
    C2Running,
    C2Stopped,
    // Note that on state C2Error, stop() must be called before we can start()
    // again.
    C2Error,
}

// This is not a very "Rust-y" way of doing error handling, but it will
// hopefully make the FFI easier to write. Numerical values taken from
// frameworks/av/media/codec2/core/include/C2.h
// TODO: Improve error handling by adding more statuses.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum C2Status {
    C2Ok = 0,
    C2BadState = 1,  // EPERM
    C2BadValue = 22, // EINVAL
}

// J should be either C2DecodeJob or C2EncodeJob.
pub trait C2Worker<J>
where
    J: Send + Job + 'static,
{
    type Options: Clone + Send + 'static;

    fn new(
        input_fourcc: Fourcc,
        output_fourcc: Fourcc,
        awaiting_job_event: Arc<EventFd>,
        error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
        work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>,
        work_queue: Arc<Mutex<VecDeque<J>>>,
        state: Arc<Mutex<C2State>>,
        framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
        alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>,
        options: Self::Options,
    ) -> Result<Self, String>
    where
        Self: Sized;

    fn process_loop(&mut self);
}

#[derive(Debug, Error)]
pub enum C2WrapperError {
    #[error("failed to create EventFd for awaiting job event: {0}")]
    AwaitingJobEventFd(Errno),
}

// Note that we do not guarantee thread safety in C2CrosCodecsWrapper.
pub struct C2Wrapper<J, W>
where
    J: Send + Default + Job + 'static,
    W: C2Worker<J>,
{
    awaiting_job_event: Arc<EventFd>,
    input_fourcc: Fourcc,
    output_fourcc: Fourcc,
    error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
    work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>,
    work_queue: Arc<Mutex<VecDeque<J>>>,
    state: Arc<Mutex<C2State>>,
    framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
    alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>,
    options: <W as C2Worker<J>>::Options,
    worker_thread: Option<JoinHandle<()>>,
    // The instance of V actually lives in the thread creation closure, not
    // this struct.
    _phantom: PhantomData<W>,
}

impl<J, W> C2Wrapper<J, W>
where
    J: Send + Default + Job + 'static,
    W: C2Worker<J>,
{
    pub fn new(
        input_fourcc: Fourcc,
        output_fourcc: Fourcc,
        error_cb: impl FnMut(C2Status) + Send + 'static,
        work_done_cb: impl FnMut(J) + Send + 'static,
        framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static,
        alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static,
        options: <W as C2Worker<J>>::Options,
    ) -> Self {
        Self {
            awaiting_job_event: Arc::new(
                EventFd::from_flags(EfdFlags::EFD_SEMAPHORE)
                    .map_err(C2WrapperError::AwaitingJobEventFd)
                    .unwrap(),
            ),
            input_fourcc: input_fourcc,
            output_fourcc: output_fourcc,
            error_cb: Arc::new(Mutex::new(error_cb)),
            work_done_cb: Arc::new(Mutex::new(work_done_cb)),
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            state: Arc::new(Mutex::new(C2State::C2Stopped)),
            framepool_hint_cb: Arc::new(Mutex::new(framepool_hint_cb)),
            alloc_cb: Arc::new(Mutex::new(alloc_cb)),
            options: options,
            worker_thread: None,
            _phantom: Default::default(),
        }
    }

    // This isn't part of C2, but it's convenient to check if our worker thread
    // is still running.
    pub fn is_alive(&self) -> bool {
        match &self.worker_thread {
            Some(worker_thread) => !worker_thread.is_finished(),
            None => false,
        }
    }

    // Start the decoder/encoder.
    // State will be C2Running after this call.
    pub fn start(&mut self) -> C2Status {
        {
            let mut state = self.state.lock().unwrap();
            if *state != C2State::C2Stopped {
                (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
                return C2Status::C2BadState;
            }
            *state = C2State::C2Running;
        }

        let input_fourcc = self.input_fourcc.clone();
        let output_fourcc = self.output_fourcc.clone();
        let error_cb = self.error_cb.clone();
        let work_done_cb = self.work_done_cb.clone();
        let work_queue = self.work_queue.clone();
        let state = self.state.clone();
        let options = self.options.clone();
        let awaiting_job_event = self.awaiting_job_event.clone();
        let framepool_hint_cb = self.framepool_hint_cb.clone();
        let alloc_cb = self.alloc_cb.clone();
        self.worker_thread = Some(thread::spawn(move || {
            let worker = W::new(
                input_fourcc,
                output_fourcc,
                awaiting_job_event,
                error_cb.clone(),
                work_done_cb,
                work_queue,
                state.clone(),
                framepool_hint_cb,
                alloc_cb,
                options,
            );
            match worker {
                Ok(mut worker) => worker.process_loop(),
                Err(msg) => {
                    log::debug!("Error instantiating C2Worker {}", msg);
                    *state.lock().unwrap() = C2State::C2Error;
                    (*error_cb.lock().unwrap())(C2Status::C2BadValue);
                }
            };
        }));

        C2Status::C2Ok
    }

    // Stop the decoder/encoder and abandon in-flight work.
    // C2's reset() function is equivalent for our purposes.
    // Note that in event of error, stop() must be called before we can start()
    // again. This is to ensure we clear out the work queue.
    // State will be C2Stopped after this call.
    pub fn stop(&mut self) -> C2Status {
        *self.state.lock().unwrap() = C2State::C2Stopped;
        let mut worker_thread: Option<JoinHandle<()>> = None;
        std::mem::swap(&mut worker_thread, &mut self.worker_thread);
        self.worker_thread = match worker_thread {
            Some(worker_thread) => {
                let _ = worker_thread.join();
                None
            }
            None => None,
        };

        self.work_queue.lock().unwrap().drain(..);

        self.awaiting_job_event.write(1).unwrap();

        C2Status::C2Ok
    }

    // Add work to the work queue.
    // State must be C2Running or this function is invalid.
    // State will remain C2Running.
    pub fn queue(&mut self, work_items: Vec<J>) -> C2Status {
        if *self.state.lock().unwrap() != C2State::C2Running {
            (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
            return C2Status::C2BadState;
        }

        self.work_queue.lock().unwrap().extend(work_items.into_iter());

        self.awaiting_job_event.write(1).unwrap();

        C2Status::C2Ok
    }

    // Flush work from the queue and return it as |flushed_work|.
    // State will not change after this call.
    // TODO: Support different flush modes.
    pub fn flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status {
        if *self.state.lock().unwrap() != C2State::C2Running {
            (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
            return C2Status::C2BadState;
        }

        let mut tmp = self.work_queue.lock().unwrap().drain(..).collect::<Vec<J>>();
        flushed_work.append(&mut tmp);

        C2Status::C2Ok
    }

    // Signal to the decoder/encoder that it does not need to wait for
    // additional work to begin processing. This is an unusual name for this
    // function, but it is the convention that C2 uses.
    // State must be C2Running or this function is invalid.
    // State will remain C2Running until the last frames drain, at which point
    // the state will change to C2Stopped.
    // TODO: Support different drain modes.
    pub fn drain(&mut self, mode: DrainMode) -> C2Status {
        if *self.state.lock().unwrap() != C2State::C2Running {
            (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
            return C2Status::C2BadState;
        }

        let mut drain_job: J = Default::default();
        drain_job.set_drain(mode);
        self.work_queue.lock().unwrap().push_back(drain_job);

        self.awaiting_job_event.write(1).unwrap();

        C2Status::C2Ok
    }
}

// Instead of C2's release() function, we implement Drop and use RAII to
// accomplish the same thing
impl<J, W> Drop for C2Wrapper<J, W>
where
    J: Send + Default + Job + 'static,
    W: C2Worker<J>,
{
    fn drop(&mut self) {
        self.stop();
    }
}