cros_codecs/encoder/
stateful.rs1use std::collections::BTreeSet;
6use std::collections::VecDeque;
7
8use thiserror::Error;
9
10use crate::encoder::CodedBitstreamBuffer;
11use crate::encoder::EncodeError;
12use crate::encoder::EncodeResult;
13use crate::encoder::FrameMetadata;
14use crate::encoder::Tunings;
15use crate::encoder::VideoEncoder;
16
17pub mod h264;
18pub mod h265;
19pub mod vp8;
20pub mod vp9;
21
22#[derive(Debug, Error)]
23pub enum StatefulBackendError {
24 #[error("invalid internal state. This is likely a bug.")]
25 InvalidInternalState,
26 #[error(transparent)]
27 Other(#[from] anyhow::Error),
28}
29
30pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>;
31
32#[repr(transparent)]
34#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
35pub struct BackendRequestId(usize);
36
37pub struct BackendRequest<Handle> {
39 pub request_id: BackendRequestId,
41 pub meta: FrameMetadata,
43 pub handle: Handle,
45 pub tunings: Tunings,
47}
48
49pub struct BackendOutput {
50 pub request_id: BackendRequestId,
52 pub buffer: CodedBitstreamBuffer,
54}
55
56pub trait StatefulVideoEncoderBackend<Handle> {
58 fn consume_request(
64 &mut self,
65 request: &mut Option<BackendRequest<Handle>>,
66 ) -> StatefulBackendResult<()>;
67
68 fn sync(&mut self) -> StatefulBackendResult<()>;
75
76 fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>;
79
80 fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>;
85}
86
87pub struct StatefulEncoder<Handle, Backend>
88where
89 Backend: StatefulVideoEncoderBackend<Handle>,
90{
91 queue: VecDeque<BackendRequest<Handle>>,
93
94 request_counter: usize,
96
97 tunings: Tunings,
99
100 coded_queue: VecDeque<CodedBitstreamBuffer>,
102
103 processing: BTreeSet<BackendRequestId>,
105
106 backend: Backend,
108}
109
110impl<Handle, Backend> StatefulEncoder<Handle, Backend>
111where
112 Backend: StatefulVideoEncoderBackend<Handle>,
113{
114 #[allow(dead_code)]
117 fn create(tunings: Tunings, backend: Backend) -> Self {
118 Self {
119 queue: Default::default(),
120 request_counter: 0,
121 tunings,
122 coded_queue: Default::default(),
123 processing: Default::default(),
124 backend,
125 }
126 }
127
128 fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> {
130 log::debug!(
131 "Backend yieled output buffer for request id={:?} timestamp={} bytes={}",
132 output.request_id,
133 output.buffer.metadata.timestamp,
134 output.buffer.bitstream.len()
135 );
136 if !self.processing.remove(&output.request_id) {
137 log::warn!("Coded buffer returned for non existing or already processed request id={:?} timestamp={}",
138 output.request_id,
139 output.buffer.metadata.timestamp,
140 );
141 }
142 self.coded_queue.push_back(output.buffer);
143 Ok(())
144 }
145
146 fn poll_backend(&mut self) -> EncodeResult<()> {
148 while let Some(output) = self.backend.poll()? {
149 self.handle_output(output)?;
150 }
151
152 Ok(())
153 }
154
155 fn process(&mut self) -> EncodeResult<()> {
158 log::debug!(
159 "Pending requests: {}, currently processed: {:?}, pending coded buffer: {}",
160 self.queue.len(),
161 self.processing,
162 self.coded_queue.len()
163 );
164
165 if !self.processing.is_empty() {
166 self.poll_backend()?;
167 }
168
169 while let Some(request) = self.queue.pop_front() {
170 let request_id = request.request_id;
171 let timestamp = request.meta.timestamp;
172 let mut request = Some(request);
173
174 log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}");
175 self.backend.consume_request(&mut request)?;
176
177 if let Some(request) = request {
178 log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}");
179 self.queue.push_front(request);
180 break;
181 } else {
182 log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}");
183 self.processing.insert(request_id);
184 }
185 }
186
187 Ok(())
188 }
189
190 pub fn backend(&mut self) -> &Backend {
192 &self.backend
193 }
194}
195
196impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend>
197where
198 Backend: StatefulVideoEncoderBackend<Handle>,
199{
200 fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
201 self.tunings = tunings;
202 Ok(())
203 }
204
205 fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> {
206 let request_id = BackendRequestId(self.request_counter);
207 self.request_counter = self.request_counter.wrapping_add(1);
208
209 log::trace!(
210 "Got new request id={request_id:?} timestamp={}",
211 meta.timestamp
212 );
213
214 let request = BackendRequest {
215 request_id,
216 meta,
217 handle,
218 tunings: self.tunings.clone(),
219 };
220
221 self.queue.push_back(request);
222 self.process()?;
223
224 Ok(())
225 }
226
227 fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
228 if !self.queue.is_empty() || !self.processing.is_empty() {
229 self.process()?;
230 }
231
232 if let Some(buffer) = self.coded_queue.pop_front() {
233 log::debug!(
234 "Returning coded buffer timestamp={}",
235 buffer.metadata.timestamp
236 );
237 return Ok(Some(buffer));
238 }
239 Ok(None)
240 }
241
242 fn drain(&mut self) -> EncodeResult<()> {
243 log::debug!(
244 "Got drain request. Pending in queue: {}. Currently processed: {:?}",
245 self.queue.len(),
246 self.processing
247 );
248
249 while !self.queue.is_empty() {
250 self.process()?;
251
252 if !self.queue.is_empty() {
253 self.backend.sync()?;
254 }
255 }
256
257 if self.processing.is_empty() {
258 log::debug!("Skipping drain request to backend, everything is drained");
259 }
260
261 log::debug!("Sending drain request to backend");
262 for output in self.backend.drain()? {
263 self.handle_output(output)?;
264 }
265
266 Ok(())
267 }
268}