1use std::collections::VecDeque;
6
7use thiserror::Error;
8
9use crate::encoder::CodedBitstreamBuffer;
10use crate::encoder::EncodeError;
11use crate::encoder::EncodeResult;
12use crate::encoder::FrameMetadata;
13use crate::encoder::Tunings;
14use crate::encoder::VideoEncoder;
15use crate::BlockingMode;
16
17pub mod av1;
18pub mod h264;
19pub(crate) mod predictor;
20pub mod vp9;
21
22#[derive(Error, Debug)]
23pub enum StatelessBackendError {
24 #[error("unsupported profile")]
25 UnsupportedProfile,
26 #[error("unsupported format")]
27 UnsupportedFormat,
28 #[error("not enough resources to proceed with the operation now")]
29 OutOfResources,
30 #[error(transparent)]
31 Other(#[from] anyhow::Error),
32}
33
34pub type StatelessBackendResult<T> = Result<T, StatelessBackendError>;
35
36pub trait BackendPromise {
38 type Output;
39
40 fn sync(self) -> StatelessBackendResult<Self::Output>;
42
43 fn is_ready(&self) -> bool;
45}
46
47pub struct ReadyPromise<T>(T);
48
49impl<T> From<T> for ReadyPromise<T> {
50 fn from(value: T) -> Self {
51 ReadyPromise(value)
52 }
53}
54
55impl<T> BackendPromise for ReadyPromise<T> {
56 type Output = T;
57
58 fn sync(self) -> StatelessBackendResult<Self::Output> {
59 Ok(self.0)
60 }
61
62 fn is_ready(&self) -> bool {
63 true
64 }
65}
66
67pub struct BitstreamPromise<P>
70where
71 P: BackendPromise<Output = Vec<u8>>,
72{
73 bitstream: P,
75
76 meta: FrameMetadata,
78}
79
80impl<P> BackendPromise for BitstreamPromise<P>
81where
82 P: BackendPromise<Output = Vec<u8>>,
83{
84 type Output = CodedBitstreamBuffer;
85
86 fn is_ready(&self) -> bool {
87 self.bitstream.is_ready()
88 }
89
90 fn sync(self) -> StatelessBackendResult<Self::Output> {
91 let coded_data = self.bitstream.sync()?;
92
93 log::trace!("synced bitstream size={}", coded_data.len());
94
95 Ok(CodedBitstreamBuffer::new(self.meta, coded_data))
96 }
97}
98
99pub(crate) struct OutputQueue<O>
102where
103 O: BackendPromise,
104{
105 blocking: BlockingMode,
107
108 promises: VecDeque<O>,
110}
111
112impl<O> OutputQueue<O>
113where
114 O: BackendPromise,
115{
116 pub(crate) fn new(blocking: BlockingMode) -> Self {
117 Self {
118 blocking,
119 promises: Default::default(),
120 }
121 }
122
123 pub(crate) fn add_promise(&mut self, pending: O) {
125 self.promises.push_back(pending);
126 }
127
128 pub(crate) fn poll(&mut self, mode: BlockingMode) -> StatelessBackendResult<Option<O::Output>> {
132 let block = self.blocking == BlockingMode::Blocking || mode == BlockingMode::Blocking;
133
134 match self.promises.pop_front() {
135 Some(o) if block || o.is_ready() => Ok(Some(o.sync()?)),
136 Some(o) => {
137 self.promises.push_front(o);
138 Ok(None)
139 }
140 None => Ok(None),
141 }
142 }
143
144 pub(crate) fn is_empty(&self) -> bool {
146 self.promises.is_empty()
147 }
148}
149
150pub(super) trait Predictor<Picture, Reference, Request> {
156 fn new_frame(
160 &mut self,
161 backend_pic: Picture,
162 meta: FrameMetadata,
163 ) -> EncodeResult<Vec<Request>>;
164
165 fn reconstructed(&mut self, recon: Reference) -> EncodeResult<Vec<Request>>;
169
170 fn tune(&mut self, tunings: Tunings) -> EncodeResult<()>;
175
176 fn drain(&mut self) -> EncodeResult<Vec<Request>>;
178}
179
180pub trait StatelessVideoEncoderBackend<Codec>: Sized
182where
183 Codec: StatelessCodec<Self>,
184{
185 type Picture: 'static;
191
192 type Reconstructed: 'static;
194
195 type CodedPromise: BackendPromise<Output = Vec<u8>>;
198
199 type ReconPromise: BackendPromise<Output = Self::Reconstructed>;
202}
203
204pub trait StatelessEncoderBackendImport<Handle, Picture> {
205 fn import_picture(
207 &mut self,
208 metadata: &FrameMetadata,
209 handle: Handle,
210 ) -> StatelessBackendResult<Picture>;
211}
212
213pub trait StatelessCodec<Backend>: Sized
215where
216 Backend: StatelessVideoEncoderBackend<Self>,
217{
218 type Reference;
221
222 type Request;
224
225 type CodedPromise: BackendPromise<Output = CodedBitstreamBuffer>;
228
229 type ReferencePromise: BackendPromise<Output = Self::Reference>;
232}
233
234type Picture<C, B> = <B as StatelessVideoEncoderBackend<C>>::Picture;
236
237type Reference<C, B> = <C as StatelessCodec<B>>::Reference;
238
239type Request<C, B> = <C as StatelessCodec<B>>::Request;
240
241type CodedPromise<C, B> = <C as StatelessCodec<B>>::CodedPromise;
242
243type ReferencePromise<C, B> = <C as StatelessCodec<B>>::ReferencePromise;
244
245type BoxPredictor<C, B> = Box<dyn Predictor<Picture<C, B>, Reference<C, B>, Request<C, B>>>;
246
247pub struct StatelessEncoder<Codec, Handle, Backend>
248where
249 Backend: StatelessVideoEncoderBackend<Codec>,
250 Codec: StatelessCodec<Backend>,
251{
252 output_queue: OutputQueue<CodedPromise<Codec, Backend>>,
254
255 recon_queue: OutputQueue<ReferencePromise<Codec, Backend>>,
257
258 predictor: BoxPredictor<Codec, Backend>,
260
261 coded_queue: VecDeque<CodedBitstreamBuffer>,
263
264 predictor_frame_count: usize,
266
267 backend: Backend,
269
270 _phantom: std::marker::PhantomData<Handle>,
271}
272
273pub trait StatelessEncoderExecute<Codec, Handle, Backend>
279where
280 Backend: StatelessVideoEncoderBackend<Codec>,
281 Codec: StatelessCodec<Backend>,
282{
283 fn execute(&mut self, request: Request<Codec, Backend>) -> EncodeResult<()>;
284}
285
286impl<Codec, Handle, Backend> StatelessEncoder<Codec, Handle, Backend>
287where
288 Codec: StatelessCodec<Backend>,
289 Backend: StatelessVideoEncoderBackend<Codec>,
290 Self: StatelessEncoderExecute<Codec, Handle, Backend>,
291{
292 fn new(
293 backend: Backend,
294 mode: BlockingMode,
295 predictor: BoxPredictor<Codec, Backend>,
296 ) -> EncodeResult<Self> {
297 Ok(Self {
298 backend,
299 predictor,
300 predictor_frame_count: 0,
301 coded_queue: Default::default(),
302 output_queue: OutputQueue::new(mode),
303 recon_queue: OutputQueue::new(mode),
304 _phantom: Default::default(),
305 })
306 }
307
308 fn poll_pending(&mut self, mode: BlockingMode) -> EncodeResult<()> {
309 while let Some(coded) = self.output_queue.poll(mode)? {
311 self.coded_queue.push_back(coded);
312 }
313
314 while let Some(recon) = self.recon_queue.poll(mode)? {
315 let requests = self.predictor.reconstructed(recon)?;
316 if requests.is_empty() {
317 break;
319 }
320
321 for request in requests {
322 self.execute(request)?;
323 }
324 }
325
326 Ok(())
327 }
328}
329
330impl<Codec, Handle, Backend> VideoEncoder<Handle> for StatelessEncoder<Codec, Handle, Backend>
331where
332 Codec: StatelessCodec<Backend>,
333 Backend: StatelessVideoEncoderBackend<Codec>,
334 Backend: StatelessEncoderBackendImport<Handle, Backend::Picture>,
335 Self: StatelessEncoderExecute<Codec, Handle, Backend>,
336{
337 fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
338 self.predictor.tune(tunings)
339 }
340
341 fn encode(&mut self, metadata: FrameMetadata, handle: Handle) -> EncodeResult<()> {
342 log::trace!(
343 "encode: timestamp={} layout={:?}",
344 metadata.timestamp,
345 metadata.layout
346 );
347
348 let backend_pic = self.backend.import_picture(&metadata, handle)?;
350
351 self.predictor_frame_count += 1;
353
354 let requests = self.predictor.new_frame(backend_pic, metadata)?;
356 for request in requests {
357 self.execute(request)?;
358 }
359
360 Ok(())
361 }
362
363 fn drain(&mut self) -> EncodeResult<()> {
364 log::trace!("currently predictor holds {}", self.predictor_frame_count);
365
366 while self.predictor_frame_count > 0 || !self.recon_queue.is_empty() {
368 if self.output_queue.is_empty() && self.recon_queue.is_empty() {
369 let requests = self.predictor.drain()?;
372 if requests.is_empty() {
373 log::error!("failed to drain predictor, no request was returned");
374 return Err(EncodeError::InvalidInternalState);
375 }
376
377 for request in requests {
378 self.execute(request)?;
379 }
380 }
381
382 self.poll_pending(BlockingMode::Blocking)?;
383 }
384
385 while !self.output_queue.is_empty() {
387 self.poll_pending(BlockingMode::Blocking)?;
388 }
389
390 Ok(())
391 }
392
393 fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
394 self.poll_pending(BlockingMode::NonBlocking)?;
396 Ok(self.coded_queue.pop_front())
397 }
398}