mediadecode_ffmpeg/video.rs
1//! `mediadecode::VideoStreamDecoder` impl with HW + SW fallback.
2//!
3//! [`FfmpegVideoStreamDecoder`] starts on the hardware path: an inner
4//! [`crate::VideoDecoder`] that auto-probes VideoToolbox / VAAPI /
5//! NVDEC / D3D11VA. When every HW backend fails — at `open` time
6//! (no backend opens) or mid-stream ([`crate::Error::AllBackendsFailed`]
7//! from `send_packet` / `receive_frame` / `send_eof`) — we transparently
8//! fall back to a **software** `ffmpeg::decoder::Video` opened from the
9//! same `Parameters`. The rescued `unconsumed_packets` from the HW
10//! probe are replayed through the SW decoder before the new packet (or
11//! the next `receive_frame` poll) is processed, so non-seekable inputs
12//! survive a mid-stream HW exhaustion without losing any compressed
13//! data.
14//!
15//! After the transition the decoder stays on SW for the rest of its
16//! life — there's no probe-back-to-HW logic; once we've decided the
17//! stream isn't HW-decodable, that decision is sticky.
18//!
19//! Frames produced by either path are converted via
20//! [`crate::convert::av_frame_to_video_frame`] so the consumer sees
21//! the same `mediadecode::VideoFrame<PixelFormat, VideoFrameExtra,
22//! FfmpegBuffer>` shape regardless of which backend produced it.
23
24use std::collections::VecDeque;
25
/// Upper bound on how many decoded frames the SW fallback replay path
/// may hold in its local queue while draining the freshly opened SW
/// decoder during packet/EOF replay.
///
/// Pushing a backlog of compressed packets through SW can yield
/// hundreds of decoded frames before the fallback commits. Left
/// uncapped, resident memory would grow without limit (e.g. 4K frames
/// at ~12 MB each × 100s of frames). 64 slots leave enough room for
/// every realistic codec's reorder/lookahead window while keeping the
/// worst case bounded.
const SW_REPLAY_FRAME_CAP: usize = 64;
35
36use ffmpeg_next::{codec::Parameters, frame};
37use mediadecode::{Timebase, decoder::VideoStreamDecoder, frame::VideoFrame, packet::VideoPacket};
38
39use crate::{
40 Error, Ffmpeg, FfmpegBuffer, Frame, VideoDecoder, boundary,
41 convert::{self, ConvertError},
42 decoder::{build_codec_context, try_clone_parameters},
43 error::FallbackFailed,
44 extras::{VideoFrameExtra, VideoPacketExtra},
45 frame::alloc_av_video_frame,
46};
47
48/// `mediadecode::VideoStreamDecoder` impl with transparent HW → SW
49/// fallback.
50pub struct FfmpegVideoStreamDecoder {
51 state: DecodeState,
52 /// Codec parameters retained so we can open a software
53 /// `ffmpeg::decoder::Video` if the HW probe exhausts.
54 parameters: Parameters,
55 /// HW-side scratch frame (filled by [`VideoDecoder::receive_frame`]).
56 hw_scratch: Frame,
57 /// SW-side scratch frame (filled by `ffmpeg::decoder::Video::receive_frame`).
58 sw_scratch: frame::Video,
59 /// Frames produced while draining the SW decoder during fallback
60 /// replay (see [`Self::fall_back_to_sw`]). The trait's
61 /// `receive_frame` delivers from this queue before pulling new
62 /// frames from the SW decoder. Empty in steady-state operation.
63 sw_replay_frames: VecDeque<frame::Video>,
64 /// `true` once `send_eof` has been called on the active decoder.
65 /// Used to propagate EOF to the SW decoder when fallback fires
66 /// during the drain phase — without this, codecs that hold tail
67 /// frames at EOF would hang waiting for an EOF they already saw on
68 /// the HW path.
69 eof_sent: bool,
70 /// Source-stream time base, used to label produced frames.
71 time_base: Timebase,
72}
73
74/// Internal: which backend is currently driving the decode.
75enum DecodeState {
76 /// Hardware-backed decoder (auto-probe). May transition to `Sw` on
77 /// `AllBackendsFailed`.
78 Hw(VideoDecoder),
79 /// Software decoder. Terminal state.
80 Sw(ffmpeg_next::decoder::Video),
81}
82
83impl FfmpegVideoStreamDecoder {
84 /// Opens a decoder for the given codec parameters with the default
85 /// HW backend probe order. If the HW probe can't open any backend,
86 /// falls back to a software `ffmpeg::decoder::Video` immediately —
87 /// `open` only returns `Err` when both paths fail.
88 ///
89 /// Subsequent mid-stream `AllBackendsFailed` from the HW path
90 /// triggers the same SW fallback (with rescued packets replayed).
91 pub fn open(parameters: Parameters, time_base: Timebase) -> Result<Self, Error> {
92 // ffmpeg-next's `Parameters` carries an optional `owner: Rc<dyn Any>`
93 // (when constructed from `stream.parameters()` it points back at
94 // the demuxer's `AVStream`). Upstream marks the type `Send`
95 // anyway, which is unsound the moment a non-`None` owner is in
96 // play — moving such a value across threads moves the `Rc`. We
97 // sidestep this by always storing a deep-cloned `Parameters`
98 // (`avcodec_parameters_copy` produces an owner-free copy), so
99 // the `FfmpegVideoStreamDecoder`'s `Send` reachability never
100 // depends on the caller's owner discipline.
101 //
102 // Use `try_clone_parameters` instead of `Parameters::clone` —
103 // ffmpeg-next's `clone` calls `Parameters::new()` which can
104 // return a `Parameters` whose inner pointer is null on OOM
105 // (`avcodec_parameters_alloc` returns null without indication);
106 // the subsequent `avcodec_parameters_copy` against that null
107 // destination is C UB. Our checked helper surfaces the OOM as
108 // an error instead.
109 let owned_parameters = try_clone_parameters(¶meters).map_err(Error::Ffmpeg)?;
110 let hw_scratch = Frame::empty()?;
111 let sw_scratch = alloc_av_video_frame()?;
112 let state =
113 match VideoDecoder::open(try_clone_parameters(&owned_parameters).map_err(Error::Ffmpeg)?) {
114 Ok(hw) => DecodeState::Hw(hw),
115 Err(Error::AllBackendsFailed(_)) => {
116 // Open-time HW exhaustion: no rescued packets (open didn't
117 // see any). Just open SW directly from our owned copy.
118 let sw = open_sw_decoder(&owned_parameters)?;
119 DecodeState::Sw(sw)
120 }
121 Err(other) => return Err(other),
122 };
123 Ok(Self {
124 state,
125 parameters: owned_parameters,
126 hw_scratch,
127 sw_scratch,
128 sw_replay_frames: VecDeque::new(),
129 eof_sent: false,
130 time_base,
131 })
132 }
133
134 /// Returns `true` when this decoder has fallen back to the software
135 /// path. `false` while still on the HW probe (the initial state).
136 #[cfg_attr(not(tarpaulin), inline(always))]
137 pub const fn is_software(&self) -> bool {
138 matches!(self.state, DecodeState::Sw(_))
139 }
140
141 /// Returns `true` while the HW probe is still active.
142 #[cfg_attr(not(tarpaulin), inline(always))]
143 pub const fn is_hardware(&self) -> bool {
144 matches!(self.state, DecodeState::Hw(_))
145 }
146
147 /// Borrow the inner [`VideoDecoder`] when this decoder is still on
148 /// the HW path. Returns `None` after the SW fallback has fired.
149 #[cfg_attr(not(tarpaulin), inline(always))]
150 pub const fn hardware_inner(&self) -> Option<&VideoDecoder> {
151 match &self.state {
152 DecodeState::Hw(hw) => Some(hw),
153 DecodeState::Sw(_) => None,
154 }
155 }
156
157 /// Returns the time base associated with the source stream.
158 #[cfg_attr(not(tarpaulin), inline(always))]
159 pub const fn time_base(&self) -> Timebase {
160 self.time_base
161 }
162
163 /// Internal: transition from HW to SW. Replays the rescued packets
164 /// (already accepted by the HW probe but not yet decoded) through
165 /// the new SW decoder so the stream resumes seamlessly.
166 ///
167 /// **Transactional**: drained replay frames accumulate in a local
168 /// queue; we only commit them to `self.sw_replay_frames` and switch
169 /// `self.state` to `Sw` after the replay (and EOF re-forwarding, if
170 /// needed) succeed. On failure, the SW decoder, the local frame
171 /// queue, and (where reachable) any consumed packets are dropped —
172 /// `self` is left in its prior state.
173 ///
174 /// **EOF-aware**: when EOF was already accepted on the HW path
175 /// (`self.eof_sent`), the new SW decoder also receives `send_eof()`
176 /// after replay. Without this, codecs that delay tail frames hang
177 /// forever in the drain phase.
178 ///
179 /// **EAGAIN-aware**: if SW's `send_packet` returns EAGAIN during
180 /// replay, drain produced frames into the local queue and retry.
181 fn fall_back_to_sw(
182 &mut self,
183 unconsumed_packets: std::vec::Vec<ffmpeg_next::Packet>,
184 ) -> Result<(), Error> {
185 tracing::info!(
186 packets_replayed = unconsumed_packets.len(),
187 eof_pending = self.eof_sent,
188 "mediadecode-ffmpeg: HW probe exhausted, falling back to software decode",
189 );
190 // Wrap the internal worker so any failure path returns the
191 // rescued packets to the caller via `Error::FallbackFailed`.
192 // Without this, non-seekable streams (live feeds, pipes) would
193 // lose every compressed byte the HW path had consumed when a
194 // fallback transition fails partway.
195 match self.fall_back_to_sw_inner(&unconsumed_packets) {
196 Ok(()) => Ok(()),
197 Err(source) => Err(Error::FallbackFailed(FallbackFailed::new(
198 Box::new(source),
199 unconsumed_packets,
200 ))),
201 }
202 }
203
204 /// Worker for [`Self::fall_back_to_sw`]. Returns the rescued packets
205 /// untouched on the borrowed slice; the wrapper takes ownership of
206 /// them and surfaces them in `FallbackFailed` if this returns Err.
207 fn fall_back_to_sw_inner(
208 &mut self,
209 unconsumed_packets: &[ffmpeg_next::Packet],
210 ) -> Result<(), Error> {
211 let mut sw = open_sw_decoder(&self.parameters)?;
212 let mut local_replay: VecDeque<frame::Video> = VecDeque::new();
213 // Helper: drain SW into the local replay queue, capped at
214 // `SW_REPLAY_FRAME_CAP`. Returns an error when the cap is
215 // exceeded — the fallback caller treats this as a non-recoverable
216 // failure rather than silently dropping decoded frames.
217 fn drain_into(
218 sw: &mut ffmpeg_next::decoder::Video,
219 local_replay: &mut VecDeque<frame::Video>,
220 ) -> std::result::Result<(), Error> {
221 loop {
222 let mut tmp = alloc_av_video_frame()?;
223 match sw.receive_frame(&mut tmp) {
224 Ok(()) => {
225 if local_replay.len() >= SW_REPLAY_FRAME_CAP {
226 tracing::error!(
227 cap = SW_REPLAY_FRAME_CAP,
228 "mediadecode-ffmpeg: SW fallback replay produced more frames than the \
229 replay cap allows; aborting fallback (no frames dropped — they're \
230 still in the SW decoder's internal queue and will be released when \
231 it drops)",
232 );
233 return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
234 errno: libc::ENOMEM,
235 }));
236 }
237 local_replay.push_back(tmp);
238 }
239 Err(_) => break,
240 }
241 }
242 Ok(())
243 }
244
245 for pkt in unconsumed_packets {
246 let mut attempts: u32 = 0;
247 loop {
248 match sw.send_packet(pkt) {
249 Ok(()) => break,
250 Err(ffmpeg_next::Error::Other { errno }) if errno == ffmpeg_next::error::EAGAIN => {
251 drain_into(&mut sw, &mut local_replay)?;
252 attempts += 1;
253 if attempts > 16 {
254 return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
255 errno: ffmpeg_next::error::EAGAIN,
256 }));
257 }
258 }
259 Err(other) => return Err(Error::Ffmpeg(other)),
260 }
261 }
262 }
263 // Re-forward EOF if the HW path already saw it. SW EOF can also
264 // return EAGAIN until prior output is drained — mirror the
265 // packet-replay loop.
266 if self.eof_sent {
267 let mut attempts: u32 = 0;
268 loop {
269 match sw.send_eof() {
270 Ok(()) => break,
271 Err(ffmpeg_next::Error::Other { errno }) if errno == ffmpeg_next::error::EAGAIN => {
272 drain_into(&mut sw, &mut local_replay)?;
273 attempts += 1;
274 if attempts > 16 {
275 return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
276 errno: ffmpeg_next::error::EAGAIN,
277 }));
278 }
279 }
280 Err(other) => return Err(Error::Ffmpeg(other)),
281 }
282 }
283 }
284 // Commit: only after replay (and any EOF forwarding) succeeded
285 // do we move the new SW decoder and queue into `self`.
286 self.sw_replay_frames.append(&mut local_replay);
287 self.state = DecodeState::Sw(sw);
288 Ok(())
289 }
290
291 /// Internal: convert the active scratch frame into a
292 /// `mediadecode::VideoFrame` and write into `dst`.
293 fn deliver_frame(
294 &mut self,
295 dst: &mut VideoFrame<mediadecode::PixelFormat, VideoFrameExtra, FfmpegBuffer>,
296 ) -> Result<(), VideoDecodeError> {
297 let av_frame = match &mut self.state {
298 DecodeState::Hw(_) => unsafe { self.hw_scratch.as_inner_mut().as_ptr() },
299 DecodeState::Sw(_) => unsafe { self.sw_scratch.as_ptr() },
300 };
301 // SAFETY: the scratch frame is live (just filled by the inner
302 // decoder's `receive_frame`); convert bumps refcounts on each
303 // plane buffer it pulls into the produced VideoFrame so the
304 // scratch can be reused on the next call.
305 let new_frame = unsafe { convert::av_frame_to_video_frame(av_frame, self.time_base) }
306 .map_err(VideoDecodeError::Convert)?;
307 *dst = new_frame;
308 Ok(())
309 }
310}
311
312impl VideoStreamDecoder for FfmpegVideoStreamDecoder {
313 type Adapter = Ffmpeg;
314 type Buffer = FfmpegBuffer;
315 type Error = VideoDecodeError;
316
317 fn send_packet(
318 &mut self,
319 packet: &VideoPacket<VideoPacketExtra, Self::Buffer>,
320 ) -> Result<(), Self::Error> {
321 let av_pkt = boundary::ffmpeg_packet_from_video_packet(packet)
322 .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e)))?;
323 match &mut self.state {
324 DecodeState::Hw(hw) => match hw.send_packet(&av_pkt) {
325 Ok(()) => Ok(()),
326 Err(Error::AllBackendsFailed(p)) => {
327 let unconsumed_packets = p.into_unconsumed_packets();
328 self
329 .fall_back_to_sw(unconsumed_packets)
330 .map_err(VideoDecodeError::Decode)?;
331 // Now route the *new* packet to the freshly-opened SW
332 // decoder — the rescued packets were already replayed.
333 if let DecodeState::Sw(sw) = &mut self.state {
334 sw.send_packet(&av_pkt)
335 .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e)))?;
336 }
337 Ok(())
338 }
339 Err(other) => Err(VideoDecodeError::Decode(other)),
340 },
341 DecodeState::Sw(sw) => sw
342 .send_packet(&av_pkt)
343 .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e))),
344 }
345 }
346
347 fn receive_frame(
348 &mut self,
349 dst: &mut VideoFrame<mediadecode::PixelFormat, VideoFrameExtra, Self::Buffer>,
350 ) -> Result<(), Self::Error> {
351 // Deliver any frames produced during SW fallback replay before
352 // pulling new ones from the SW decoder. This is the queue
353 // populated by `fall_back_to_sw` when SW returned EAGAIN during
354 // packet replay.
355 if let Some(replayed) = self.sw_replay_frames.pop_front() {
356 // SAFETY: `replayed` is a live AVFrame owned by us; convert
357 // bumps refcounts on each plane buffer.
358 let new_frame =
359 unsafe { convert::av_frame_to_video_frame(replayed.as_ptr(), self.time_base) }
360 .map_err(VideoDecodeError::Convert)?;
361 *dst = new_frame;
362 return Ok(());
363 }
364 loop {
365 match &mut self.state {
366 DecodeState::Hw(hw) => match hw.receive_frame(&mut self.hw_scratch) {
367 Ok(()) => return self.deliver_frame(dst),
368 Err(Error::AllBackendsFailed(p)) => {
369 let unconsumed_packets = p.into_unconsumed_packets();
370 // Probe exhausted at frame-time. Open SW, replay packets,
371 // loop back so the SW path tries to receive_frame.
372 self
373 .fall_back_to_sw(unconsumed_packets)
374 .map_err(VideoDecodeError::Decode)?;
375 // If the replay produced any drained frames, return one
376 // immediately — preserves stream order vs. whatever the
377 // SW decoder will produce next.
378 if let Some(replayed) = self.sw_replay_frames.pop_front() {
379 // SAFETY: see above.
380 let new_frame =
381 unsafe { convert::av_frame_to_video_frame(replayed.as_ptr(), self.time_base) }
382 .map_err(VideoDecodeError::Convert)?;
383 *dst = new_frame;
384 return Ok(());
385 }
386 // Fall through to the loop; next iteration takes the Sw arm.
387 }
388 Err(other) => return Err(VideoDecodeError::Decode(other)),
389 },
390 DecodeState::Sw(sw) => {
391 return match sw.receive_frame(&mut self.sw_scratch) {
392 Ok(()) => self.deliver_frame(dst),
393 Err(e) => Err(VideoDecodeError::Decode(Error::Ffmpeg(e))),
394 };
395 }
396 }
397 }
398 }
399
400 fn send_eof(&mut self) -> Result<(), Self::Error> {
401 let outcome = match &mut self.state {
402 DecodeState::Hw(hw) => match hw.send_eof() {
403 Ok(()) => Ok(()),
404 Err(Error::AllBackendsFailed(p)) => {
405 let unconsumed_packets = p.into_unconsumed_packets();
406 // Mark EOF as already accepted *before* fallback so that
407 // `fall_back_to_sw` forwards it to the new SW decoder
408 // transactionally — packet replay, EOF replay, and the
409 // bounded EAGAIN-drain retry all happen inside the
410 // FallbackFailed-wrapping helper. Without this flag set
411 // first, the SW EOF would happen outside the transaction
412 // and a failure there would lose the rescued packets.
413 self.eof_sent = true;
414 self
415 .fall_back_to_sw(unconsumed_packets)
416 .map_err(VideoDecodeError::Decode)?;
417 Ok(())
418 }
419 Err(other) => Err(VideoDecodeError::Decode(other)),
420 },
421 DecodeState::Sw(sw) => sw
422 .send_eof()
423 .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e))),
424 };
425 if outcome.is_ok() {
426 self.eof_sent = true;
427 }
428 outcome
429 }
430
431 fn flush(&mut self) -> Result<(), Self::Error> {
432 // Drop any frames buffered during SW fallback replay before
433 // flushing the inner decoder — otherwise a seek/reset would
434 // surface stale pre-flush frames on the next `receive_frame`.
435 self.sw_replay_frames.clear();
436 // Flush ends the drain phase; the decoder accepts new packets
437 // after this, so reset EOF tracking.
438 self.eof_sent = false;
439 match &mut self.state {
440 DecodeState::Hw(hw) => hw.flush(),
441 DecodeState::Sw(sw) => sw.flush(),
442 }
443 Ok(())
444 }
445}
446
447fn open_sw_decoder(parameters: &Parameters) -> Result<ffmpeg_next::decoder::Video, Error> {
448 // Use the checked codec-context builder — ffmpeg-next's
449 // `Context::from_parameters` calls `Context::new()` which doesn't
450 // null-check `avcodec_alloc_context3`'s return value before
451 // running `avcodec_parameters_to_context` against it. Under
452 // memory pressure that's C-level UB; `build_codec_context`
453 // surfaces the OOM as an error instead.
454 let ctx = build_codec_context(parameters)?;
455 ctx.decoder().video().map_err(Error::Ffmpeg)
456}
457
458/// Error type for [`FfmpegVideoStreamDecoder`].
459#[derive(thiserror::Error, Debug)]
460pub enum VideoDecodeError {
461 /// The wrapped decoder (HW or SW) reported an error.
462 #[error(transparent)]
463 Decode(#[from] Error),
464 /// Frame conversion from FFmpeg's native types to mediadecode's
465 /// types failed.
466 #[error(transparent)]
467 Convert(#[from] ConvertError),
468}