cros_codecs/encoder/
stateless.rs

1// Copyright 2024 The ChromiumOS Authors
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::VecDeque;
6
7use thiserror::Error;
8
9use crate::encoder::CodedBitstreamBuffer;
10use crate::encoder::EncodeError;
11use crate::encoder::EncodeResult;
12use crate::encoder::FrameMetadata;
13use crate::encoder::Tunings;
14use crate::encoder::VideoEncoder;
15use crate::BlockingMode;
16
17pub mod av1;
18pub mod h264;
19pub(crate) mod predictor;
20pub mod vp9;
21
22#[derive(Error, Debug)]
23pub enum StatelessBackendError {
24    #[error("unsupported profile")]
25    UnsupportedProfile,
26    #[error("unsupported format")]
27    UnsupportedFormat,
28    #[error("not enough resources to proceed with the operation now")]
29    OutOfResources,
30    #[error(transparent)]
31    Other(#[from] anyhow::Error),
32}
33
34pub type StatelessBackendResult<T> = Result<T, StatelessBackendError>;
35
36/// Trait for representing pending encoder output.
37pub trait BackendPromise {
38    type Output;
39
40    /// Return coded result of the processing. Blocks if processing is not finished yet.
41    fn sync(self) -> StatelessBackendResult<Self::Output>;
42
43    /// Return true whenever the underlaying processing is done
44    fn is_ready(&self) -> bool;
45}
46
47pub struct ReadyPromise<T>(T);
48
49impl<T> From<T> for ReadyPromise<T> {
50    fn from(value: T) -> Self {
51        ReadyPromise(value)
52    }
53}
54
55impl<T> BackendPromise for ReadyPromise<T> {
56    type Output = T;
57
58    fn sync(self) -> StatelessBackendResult<Self::Output> {
59        Ok(self.0)
60    }
61
62    fn is_ready(&self) -> bool {
63        true
64    }
65}
66
67/// Wrapper type for [`BackendPromise<Output = Vec<u8>>`], with additional
68/// metadata.
69pub struct BitstreamPromise<P>
70where
71    P: BackendPromise<Output = Vec<u8>>,
72{
73    /// Slice data and reconstructed surface promise
74    bitstream: P,
75
76    /// Input frame metadata, for [`CodedBitstreamBuffer`]
77    meta: FrameMetadata,
78}
79
80impl<P> BackendPromise for BitstreamPromise<P>
81where
82    P: BackendPromise<Output = Vec<u8>>,
83{
84    type Output = CodedBitstreamBuffer;
85
86    fn is_ready(&self) -> bool {
87        self.bitstream.is_ready()
88    }
89
90    fn sync(self) -> StatelessBackendResult<Self::Output> {
91        let coded_data = self.bitstream.sync()?;
92
93        log::trace!("synced bitstream size={}", coded_data.len());
94
95        Ok(CodedBitstreamBuffer::new(self.meta, coded_data))
96    }
97}
98
99/// Internal structure representing all current processing represented using promises and allowing
100/// polling for finished promises.
101pub(crate) struct OutputQueue<O>
102where
103    O: BackendPromise,
104{
105    /// True if the every single polling call shall be blocking
106    blocking: BlockingMode,
107
108    /// Queue of currently pending [`BackendPromise`]
109    promises: VecDeque<O>,
110}
111
112impl<O> OutputQueue<O>
113where
114    O: BackendPromise,
115{
116    pub(crate) fn new(blocking: BlockingMode) -> Self {
117        Self {
118            blocking,
119            promises: Default::default(),
120        }
121    }
122
123    /// Add new pending job to the queue. Which will be returned to client if it is done.
124    pub(crate) fn add_promise(&mut self, pending: O) {
125        self.promises.push_back(pending);
126    }
127
128    /// Returns the result of an oldest [`BackendPromise`] if it is done processing. If `force_block`
129    /// is true, then the function will block till processing of the oldest [`BackendPromise`] is
130    /// finished and return it's result.
131    pub(crate) fn poll(&mut self, mode: BlockingMode) -> StatelessBackendResult<Option<O::Output>> {
132        let block = self.blocking == BlockingMode::Blocking || mode == BlockingMode::Blocking;
133
134        match self.promises.pop_front() {
135            Some(o) if block || o.is_ready() => Ok(Some(o.sync()?)),
136            Some(o) => {
137                self.promises.push_front(o);
138                Ok(None)
139            }
140            None => Ok(None),
141        }
142    }
143
144    /// Returns true if queue is empty ie. no [`BackendPromise`] is pending.
145    pub(crate) fn is_empty(&self) -> bool {
146        self.promises.is_empty()
147    }
148}
149
150/// Predictor is responsible for yielding stream parameter sets and creating requests to backend.
151/// It accepts the frames and reconstructed frames and returns [`Request`]s for execution. For
152/// example [`Predictor`] may hold frames from processing until enough is supplied to create a
153/// specific prediction structure. [`Predictor::drain`] may be called to force predictor to
154/// yield requests.
155pub(super) trait Predictor<Picture, Reference, Request> {
156    /// Called by encoder when there is new frame to encode. The predictor may return empty vector
157    /// to postpone processing or a set of requests to process frames (it does not have to be a frame
158    /// specified in parameters)
159    fn new_frame(
160        &mut self,
161        backend_pic: Picture,
162        meta: FrameMetadata,
163    ) -> EncodeResult<Vec<Request>>;
164
165    /// This function is called by the encoder, with reconstructed frame when backend finished
166    /// processing the frame. the [`Predictor`] may choose to return [`Request`]s to submit to
167    /// backend, if reconstructed was required for creating that request.
168    fn reconstructed(&mut self, recon: Reference) -> EncodeResult<Vec<Request>>;
169
170    /// Requests the change of dynamic parameters (aka [`Tunings`]) for the stream. The predictor
171    /// may choose to delay the change until entire or some part of the structure had been encoded.
172    /// However in such case the predictor is responsible for ensuring the change will be
173    /// successful.
174    fn tune(&mut self, tunings: Tunings) -> EncodeResult<()>;
175
176    /// Force [`Predictor`] to pop at least one frame from internal queue and return a [`Request`]s
177    fn drain(&mut self) -> EncodeResult<Vec<Request>>;
178}
179
180/// Generic trait for stateless encoder backends
181pub trait StatelessVideoEncoderBackend<Codec>: Sized
182where
183    Codec: StatelessCodec<Self>,
184{
185    /// Backend's specific representation of the input frame, transformed with [`import_picture`].
186    /// Might be a wrapper of the input handle with additional backend specific data or a copy of
187    /// an input frame in internal backend's representation.
188    ///
189    /// [`import_picture`]: StatelessEncoderBackendImport::import_picture
190    type Picture: 'static;
191
192    /// Backend's reconstructed frame handle.
193    type Reconstructed: 'static;
194
195    /// Backend's specific [`BackendPromise`] for bitstream, a result of
196    /// [`StatelessCodec::Request`] submission.
197    type CodedPromise: BackendPromise<Output = Vec<u8>>;
198
199    /// Backend's specific [`BackendPromise`] for [`StatelessVideoEncoderBackend::Reconstructed`],
200    /// a result of [`StatelessCodec::Request`] submission.
201    type ReconPromise: BackendPromise<Output = Self::Reconstructed>;
202}
203
204pub trait StatelessEncoderBackendImport<Handle, Picture> {
205    /// Imports the input `handle` from client and transforms into `Picture`
206    fn import_picture(
207        &mut self,
208        metadata: &FrameMetadata,
209        handle: Handle,
210    ) -> StatelessBackendResult<Picture>;
211}
212
213/// Trait helping contain all codec specific and backend specific types
214pub trait StatelessCodec<Backend>: Sized
215where
216    Backend: StatelessVideoEncoderBackend<Self>,
217{
218    /// Codec specific representation of frame reference wrapping a backend reference type
219    /// containing a codec specific frame metadata
220    type Reference;
221
222    /// A request type that will be delivered to codec specific stateless encoder backend
223    type Request;
224
225    /// Codec specific [`BackendPromise`] for [`CodedBitstreamBuffer`] wrapping a backend specific
226    /// [`StatelessVideoEncoderBackend::CodedPromise`]
227    type CodedPromise: BackendPromise<Output = CodedBitstreamBuffer>;
228
229    /// Codec specific [`BackendPromise`] for [`StatelessCodec::Reference`] wrapping a
230    /// backend speficic [`StatelessVideoEncoderBackend::ReconPromise`]
231    type ReferencePromise: BackendPromise<Output = Self::Reference>;
232}
233
234/// Helper aliases for codec and backend specific types
235type Picture<C, B> = <B as StatelessVideoEncoderBackend<C>>::Picture;
236
237type Reference<C, B> = <C as StatelessCodec<B>>::Reference;
238
239type Request<C, B> = <C as StatelessCodec<B>>::Request;
240
241type CodedPromise<C, B> = <C as StatelessCodec<B>>::CodedPromise;
242
243type ReferencePromise<C, B> = <C as StatelessCodec<B>>::ReferencePromise;
244
245type BoxPredictor<C, B> = Box<dyn Predictor<Picture<C, B>, Reference<C, B>, Request<C, B>>>;
246
247pub struct StatelessEncoder<Codec, Handle, Backend>
248where
249    Backend: StatelessVideoEncoderBackend<Codec>,
250    Codec: StatelessCodec<Backend>,
251{
252    /// Pending frame output promise queue
253    output_queue: OutputQueue<CodedPromise<Codec, Backend>>,
254
255    /// Pending reconstructed pictures promise queue
256    recon_queue: OutputQueue<ReferencePromise<Codec, Backend>>,
257
258    /// [`Predictor`] instance responsible for the encoder decision making
259    predictor: BoxPredictor<Codec, Backend>,
260
261    // predictor: Box<dyn Predictor<B::Picture, B::Reference>>,
262    coded_queue: VecDeque<CodedBitstreamBuffer>,
263
264    /// Number of the currently held frames by the predictor
265    predictor_frame_count: usize,
266
267    /// [`StatelessVideoEncoderBackend`] instance to delegate work to
268    backend: Backend,
269
270    _phantom: std::marker::PhantomData<Handle>,
271}
272
273/// A bridge trait between [`StatelessEncoder`] and codec specific backend trait (eg.
274/// [`h264::StatelessH264EncoderBackend`] or [`vp9::StatelessVP9EncoderBackend`]). Accepts
275/// `Request` and is responsible for adding resutling [`BackendPromise`] to [`StatelessEncoder`]
276/// internal queues and  decrementing the internal predictor frame counter if the backend moved the
277/// frame outside predictor ownership.
278pub trait StatelessEncoderExecute<Codec, Handle, Backend>
279where
280    Backend: StatelessVideoEncoderBackend<Codec>,
281    Codec: StatelessCodec<Backend>,
282{
283    fn execute(&mut self, request: Request<Codec, Backend>) -> EncodeResult<()>;
284}
285
286impl<Codec, Handle, Backend> StatelessEncoder<Codec, Handle, Backend>
287where
288    Codec: StatelessCodec<Backend>,
289    Backend: StatelessVideoEncoderBackend<Codec>,
290    Self: StatelessEncoderExecute<Codec, Handle, Backend>,
291{
292    fn new(
293        backend: Backend,
294        mode: BlockingMode,
295        predictor: BoxPredictor<Codec, Backend>,
296    ) -> EncodeResult<Self> {
297        Ok(Self {
298            backend,
299            predictor,
300            predictor_frame_count: 0,
301            coded_queue: Default::default(),
302            output_queue: OutputQueue::new(mode),
303            recon_queue: OutputQueue::new(mode),
304            _phantom: Default::default(),
305        })
306    }
307
308    fn poll_pending(&mut self, mode: BlockingMode) -> EncodeResult<()> {
309        // Poll the output queue once and then continue polling while new promise is submitted
310        while let Some(coded) = self.output_queue.poll(mode)? {
311            self.coded_queue.push_back(coded);
312        }
313
314        while let Some(recon) = self.recon_queue.poll(mode)? {
315            let requests = self.predictor.reconstructed(recon)?;
316            if requests.is_empty() {
317                // No promise was submitted, therefore break
318                break;
319            }
320
321            for request in requests {
322                self.execute(request)?;
323            }
324        }
325
326        Ok(())
327    }
328}
329
330impl<Codec, Handle, Backend> VideoEncoder<Handle> for StatelessEncoder<Codec, Handle, Backend>
331where
332    Codec: StatelessCodec<Backend>,
333    Backend: StatelessVideoEncoderBackend<Codec>,
334    Backend: StatelessEncoderBackendImport<Handle, Backend::Picture>,
335    Self: StatelessEncoderExecute<Codec, Handle, Backend>,
336{
337    fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
338        self.predictor.tune(tunings)
339    }
340
341    fn encode(&mut self, metadata: FrameMetadata, handle: Handle) -> EncodeResult<()> {
342        log::trace!(
343            "encode: timestamp={} layout={:?}",
344            metadata.timestamp,
345            metadata.layout
346        );
347
348        // Import `handle` to backends representation
349        let backend_pic = self.backend.import_picture(&metadata, handle)?;
350
351        // Increase the number of frames that predictor holds, before handing one to it
352        self.predictor_frame_count += 1;
353
354        // Ask predictor to decide on the next move and execute it
355        let requests = self.predictor.new_frame(backend_pic, metadata)?;
356        for request in requests {
357            self.execute(request)?;
358        }
359
360        Ok(())
361    }
362
363    fn drain(&mut self) -> EncodeResult<()> {
364        log::trace!("currently predictor holds {}", self.predictor_frame_count);
365
366        // Drain the predictor
367        while self.predictor_frame_count > 0 || !self.recon_queue.is_empty() {
368            if self.output_queue.is_empty() && self.recon_queue.is_empty() {
369                // The OutputQueue is empty and predictor holds frames, force it to yield a request
370                // to empty it's internal queue.
371                let requests = self.predictor.drain()?;
372                if requests.is_empty() {
373                    log::error!("failed to drain predictor, no request was returned");
374                    return Err(EncodeError::InvalidInternalState);
375                }
376
377                for request in requests {
378                    self.execute(request)?;
379                }
380            }
381
382            self.poll_pending(BlockingMode::Blocking)?;
383        }
384
385        // There are still some requests being processed. Continue on polling them.
386        while !self.output_queue.is_empty() {
387            self.poll_pending(BlockingMode::Blocking)?;
388        }
389
390        Ok(())
391    }
392
393    fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
394        // Poll on output queue without blocking and try to dueue from coded queue
395        self.poll_pending(BlockingMode::NonBlocking)?;
396        Ok(self.coded_queue.pop_front())
397    }
398}