cros_codecs/encoder/
stateful.rs

1// Copyright 2024 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::BTreeSet;
6use std::collections::VecDeque;
7
8use thiserror::Error;
9
10use crate::encoder::CodedBitstreamBuffer;
11use crate::encoder::EncodeError;
12use crate::encoder::EncodeResult;
13use crate::encoder::FrameMetadata;
14use crate::encoder::Tunings;
15use crate::encoder::VideoEncoder;
16
17pub mod h264;
18pub mod h265;
19pub mod vp8;
20pub mod vp9;
21
22#[derive(Debug, Error)]
23pub enum StatefulBackendError {
24    #[error("invalid internal state. This is likely a bug.")]
25    InvalidInternalState,
26    #[error(transparent)]
27    Other(#[from] anyhow::Error),
28}
29
30pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>;
31
/// Identifier that uniquely names a [`BackendRequest`].
///
/// A transparent newtype over `usize`; ids are compared and ordered by that inner value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct BackendRequestId(usize);
36
37/// Request package that is offered to [`StatefulVideoEncoderBackend`] for processing
38pub struct BackendRequest<Handle> {
39    /// Request's unique identifier
40    pub request_id: BackendRequestId,
41    /// Frame's metadata
42    pub meta: FrameMetadata,
43    /// Frame's handle
44    pub handle: Handle,
45    /// Tunings set for the request
46    pub tunings: Tunings,
47}
48
49pub struct BackendOutput {
50    /// Request's unique identifier corresponding to [`BackendRequest`]
51    pub request_id: BackendRequestId,
52    /// Result of the request. [`CodedBitstreamBuffer`] containing encoded frame
53    pub buffer: CodedBitstreamBuffer,
54}
55
56/// Generic trait for stateful encoder backends
57pub trait StatefulVideoEncoderBackend<Handle> {
58    /// Try to submit encode request to the backend. The backend may not be able to accept the
59    /// request eg. if there are not enough available resources or backend desires to finish
60    /// previous request first. The function shall not be blocking.
61    /// If backend accepts the request for processing it shall take the `request` (take ownership of
62    /// [`BackendRequest`] and set ref mut to [`None`].
63    fn consume_request(
64        &mut self,
65        request: &mut Option<BackendRequest<Handle>>,
66    ) -> StatefulBackendResult<()>;
67
68    /// Function shall block, until the backend can accept request with [`consume_request`] or
69    /// will finished processing of some [`BackendRequest`] and [`poll`] can be used to
70    /// fetch is result.
71    ///
72    /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
73    /// [`poll`]: StatefulVideoEncoderBackend::poll
74    fn sync(&mut self) -> StatefulBackendResult<()>;
75
76    /// Blocking function, until the backend finishes processing all [`BackendRequest`], that the
77    /// backend has accepted and all outputs of those requests are returned.
78    fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>;
79
80    /// If the processing of any [`BackendRequest`] is finished then the function should yield it's
81    /// corresponding [`BackendOutput`].
82    ///
83    /// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
84    fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>;
85}
86
87pub struct StatefulEncoder<Handle, Backend>
88where
89    Backend: StatefulVideoEncoderBackend<Handle>,
90{
91    /// Pending queue of frames to encoded by the backend
92    queue: VecDeque<BackendRequest<Handle>>,
93
94    /// Unique request identifier continue
95    request_counter: usize,
96
97    /// Latest [`Tunings`], that will be cloned in to request
98    tunings: Tunings,
99
100    /// Processed encoded bitstream queue for client to poll
101    coded_queue: VecDeque<CodedBitstreamBuffer>,
102
103    /// Currently processed requests by the backend
104    processing: BTreeSet<BackendRequestId>,
105
106    // [`StatefulVideoEncoderBackend`] instance to delegate [`BackendRequest`] to
107    backend: Backend,
108}
109
110impl<Handle, Backend> StatefulEncoder<Handle, Backend>
111where
112    Backend: StatefulVideoEncoderBackend<Handle>,
113{
114    /// Utility function that creates an new [`StatefulEncoder`] with [`Tunings`] and
115    /// [`StatefulVideoEncoderBackend`] instance.
116    #[allow(dead_code)]
117    fn create(tunings: Tunings, backend: Backend) -> Self {
118        Self {
119            queue: Default::default(),
120            request_counter: 0,
121            tunings,
122            coded_queue: Default::default(),
123            processing: Default::default(),
124            backend,
125        }
126    }
127
128    /// Handles the [`BackendOutput`] from the backend, ie add to the queue for client to poll.
129    fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> {
130        log::debug!(
131            "Backend yieled output buffer for request id={:?} timestamp={} bytes={}",
132            output.request_id,
133            output.buffer.metadata.timestamp,
134            output.buffer.bitstream.len()
135        );
136        if !self.processing.remove(&output.request_id) {
137            log::warn!("Coded buffer returned for non existing or already processed request id={:?} timestamp={}",
138                output.request_id,
139                output.buffer.metadata.timestamp,
140            );
141        }
142        self.coded_queue.push_back(output.buffer);
143        Ok(())
144    }
145
146    /// Poll the backend for outputs and handles them
147    fn poll_backend(&mut self) -> EncodeResult<()> {
148        while let Some(output) = self.backend.poll()? {
149            self.handle_output(output)?;
150        }
151
152        Ok(())
153    }
154
155    /// Performs essential processing. Poll the backend for outputs and tries to submit requests to
156    /// backends.
157    fn process(&mut self) -> EncodeResult<()> {
158        log::debug!(
159            "Pending requests: {}, currently processed: {:?}, pending coded buffer: {}",
160            self.queue.len(),
161            self.processing,
162            self.coded_queue.len()
163        );
164
165        if !self.processing.is_empty() {
166            self.poll_backend()?;
167        }
168
169        while let Some(request) = self.queue.pop_front() {
170            let request_id = request.request_id;
171            let timestamp = request.meta.timestamp;
172            let mut request = Some(request);
173
174            log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}");
175            self.backend.consume_request(&mut request)?;
176
177            if let Some(request) = request {
178                log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}");
179                self.queue.push_front(request);
180                break;
181            } else {
182                log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}");
183                self.processing.insert(request_id);
184            }
185        }
186
187        Ok(())
188    }
189
190    /// [`StatefulVideoEncoderBackend`]'s instance
191    pub fn backend(&mut self) -> &Backend {
192        &self.backend
193    }
194}
195
196impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend>
197where
198    Backend: StatefulVideoEncoderBackend<Handle>,
199{
200    fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
201        self.tunings = tunings;
202        Ok(())
203    }
204
205    fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> {
206        let request_id = BackendRequestId(self.request_counter);
207        self.request_counter = self.request_counter.wrapping_add(1);
208
209        log::trace!(
210            "Got new request id={request_id:?} timestamp={}",
211            meta.timestamp
212        );
213
214        let request = BackendRequest {
215            request_id,
216            meta,
217            handle,
218            tunings: self.tunings.clone(),
219        };
220
221        self.queue.push_back(request);
222        self.process()?;
223
224        Ok(())
225    }
226
227    fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
228        if !self.queue.is_empty() || !self.processing.is_empty() {
229            self.process()?;
230        }
231
232        if let Some(buffer) = self.coded_queue.pop_front() {
233            log::debug!(
234                "Returning coded buffer timestamp={}",
235                buffer.metadata.timestamp
236            );
237            return Ok(Some(buffer));
238        }
239        Ok(None)
240    }
241
242    fn drain(&mut self) -> EncodeResult<()> {
243        log::debug!(
244            "Got drain request. Pending in queue: {}. Currently processed: {:?}",
245            self.queue.len(),
246            self.processing
247        );
248
249        while !self.queue.is_empty() {
250            self.process()?;
251
252            if !self.queue.is_empty() {
253                self.backend.sync()?;
254            }
255        }
256
257        if self.processing.is_empty() {
258            log::debug!("Skipping drain request to backend, everything is drained");
259        }
260
261        log::debug!("Sending drain request to backend");
262        for output in self.backend.drain()? {
263            self.handle_output(output)?;
264        }
265
266        Ok(())
267    }
268}