Skip to main content

mediadecode_ffmpeg/
video.rs

1//! `mediadecode::VideoStreamDecoder` impl with HW + SW fallback.
2//!
3//! [`FfmpegVideoStreamDecoder`] starts on the hardware path: an inner
4//! [`crate::VideoDecoder`] that auto-probes VideoToolbox / VAAPI /
5//! NVDEC / D3D11VA. When every HW backend fails — at `open` time
6//! (no backend opens) or mid-stream ([`crate::Error::AllBackendsFailed`]
7//! from `send_packet` / `receive_frame` / `send_eof`) — we transparently
8//! fall back to a **software** `ffmpeg::decoder::Video` opened from the
9//! same `Parameters`. The rescued `unconsumed_packets` from the HW
10//! probe are replayed through the SW decoder before the new packet (or
11//! the next `receive_frame` poll) is processed, so non-seekable inputs
12//! survive a mid-stream HW exhaustion without losing any compressed
13//! data.
14//!
15//! After the transition the decoder stays on SW for the rest of its
16//! life — there's no probe-back-to-HW logic; once we've decided the
17//! stream isn't HW-decodable, that decision is sticky.
18//!
19//! Frames produced by either path are converted via
20//! [`crate::convert::av_frame_to_video_frame`] so the consumer sees
21//! the same `mediadecode::VideoFrame<PixelFormat, VideoFrameExtra,
22//! FfmpegBuffer>` shape regardless of which backend produced it.
23
24use std::collections::VecDeque;
25
/// Upper bound on how many decoded frames the SW fallback replay may
/// hold while it drains the freshly opened software decoder.
///
/// Pushing a backlog of compressed packets (and possibly EOF) through
/// SW can emit a long run of frames before the fallback commits.
/// Left uncapped, that backlog is all resident memory — a 4K frame is
/// on the order of 12 MB, so a few hundred of them add up fast. 64
/// slots are intended to cover the reorder/lookahead depth of
/// realistic codecs while keeping the worst case bounded.
const SW_REPLAY_FRAME_CAP: usize = 64;
35
36use ffmpeg_next::{codec::Parameters, frame};
37use mediadecode::{Timebase, decoder::VideoStreamDecoder, frame::VideoFrame, packet::VideoPacket};
38
39use crate::{
40  Error, Ffmpeg, FfmpegBuffer, Frame, VideoDecoder, boundary,
41  convert::{self, ConvertError},
42  decoder::{build_codec_context, try_clone_parameters},
43  error::FallbackFailed,
44  extras::{VideoFrameExtra, VideoPacketExtra},
45  frame::alloc_av_video_frame,
46};
47
48/// `mediadecode::VideoStreamDecoder` impl with transparent HW → SW
49/// fallback.
50pub struct FfmpegVideoStreamDecoder {
51  state: DecodeState,
52  /// Codec parameters retained so we can open a software
53  /// `ffmpeg::decoder::Video` if the HW probe exhausts.
54  parameters: Parameters,
55  /// HW-side scratch frame (filled by [`VideoDecoder::receive_frame`]).
56  hw_scratch: Frame,
57  /// SW-side scratch frame (filled by `ffmpeg::decoder::Video::receive_frame`).
58  sw_scratch: frame::Video,
59  /// Frames produced while draining the SW decoder during fallback
60  /// replay (see [`Self::fall_back_to_sw`]). The trait's
61  /// `receive_frame` delivers from this queue before pulling new
62  /// frames from the SW decoder. Empty in steady-state operation.
63  sw_replay_frames: VecDeque<frame::Video>,
64  /// `true` once `send_eof` has been called on the active decoder.
65  /// Used to propagate EOF to the SW decoder when fallback fires
66  /// during the drain phase — without this, codecs that hold tail
67  /// frames at EOF would hang waiting for an EOF they already saw on
68  /// the HW path.
69  eof_sent: bool,
70  /// Source-stream time base, used to label produced frames.
71  time_base: Timebase,
72}
73
74/// Internal: which backend is currently driving the decode.
75enum DecodeState {
76  /// Hardware-backed decoder (auto-probe). May transition to `Sw` on
77  /// `AllBackendsFailed`.
78  Hw(VideoDecoder),
79  /// Software decoder. Terminal state.
80  Sw(ffmpeg_next::decoder::Video),
81}
82
83impl FfmpegVideoStreamDecoder {
84  /// Opens a decoder for the given codec parameters with the default
85  /// HW backend probe order. If the HW probe can't open any backend,
86  /// falls back to a software `ffmpeg::decoder::Video` immediately —
87  /// `open` only returns `Err` when both paths fail.
88  ///
89  /// Subsequent mid-stream `AllBackendsFailed` from the HW path
90  /// triggers the same SW fallback (with rescued packets replayed).
91  pub fn open(parameters: Parameters, time_base: Timebase) -> Result<Self, Error> {
92    // ffmpeg-next's `Parameters` carries an optional `owner: Rc<dyn Any>`
93    // (when constructed from `stream.parameters()` it points back at
94    // the demuxer's `AVStream`). Upstream marks the type `Send`
95    // anyway, which is unsound the moment a non-`None` owner is in
96    // play — moving such a value across threads moves the `Rc`. We
97    // sidestep this by always storing a deep-cloned `Parameters`
98    // (`avcodec_parameters_copy` produces an owner-free copy), so
99    // the `FfmpegVideoStreamDecoder`'s `Send` reachability never
100    // depends on the caller's owner discipline.
101    //
102    // Use `try_clone_parameters` instead of `Parameters::clone` —
103    // ffmpeg-next's `clone` calls `Parameters::new()` which can
104    // return a `Parameters` whose inner pointer is null on OOM
105    // (`avcodec_parameters_alloc` returns null without indication);
106    // the subsequent `avcodec_parameters_copy` against that null
107    // destination is C UB. Our checked helper surfaces the OOM as
108    // an error instead.
109    let owned_parameters = try_clone_parameters(&parameters).map_err(Error::Ffmpeg)?;
110    let hw_scratch = Frame::empty()?;
111    let sw_scratch = alloc_av_video_frame()?;
112    let state =
113      match VideoDecoder::open(try_clone_parameters(&owned_parameters).map_err(Error::Ffmpeg)?) {
114        Ok(hw) => DecodeState::Hw(hw),
115        Err(Error::AllBackendsFailed(_)) => {
116          // Open-time HW exhaustion: no rescued packets (open didn't
117          // see any). Just open SW directly from our owned copy.
118          let sw = open_sw_decoder(&owned_parameters)?;
119          DecodeState::Sw(sw)
120        }
121        Err(other) => return Err(other),
122      };
123    Ok(Self {
124      state,
125      parameters: owned_parameters,
126      hw_scratch,
127      sw_scratch,
128      sw_replay_frames: VecDeque::new(),
129      eof_sent: false,
130      time_base,
131    })
132  }
133
134  /// Returns `true` when this decoder has fallen back to the software
135  /// path. `false` while still on the HW probe (the initial state).
136  #[cfg_attr(not(tarpaulin), inline(always))]
137  pub const fn is_software(&self) -> bool {
138    matches!(self.state, DecodeState::Sw(_))
139  }
140
141  /// Returns `true` while the HW probe is still active.
142  #[cfg_attr(not(tarpaulin), inline(always))]
143  pub const fn is_hardware(&self) -> bool {
144    matches!(self.state, DecodeState::Hw(_))
145  }
146
147  /// Borrow the inner [`VideoDecoder`] when this decoder is still on
148  /// the HW path. Returns `None` after the SW fallback has fired.
149  #[cfg_attr(not(tarpaulin), inline(always))]
150  pub const fn hardware_inner(&self) -> Option<&VideoDecoder> {
151    match &self.state {
152      DecodeState::Hw(hw) => Some(hw),
153      DecodeState::Sw(_) => None,
154    }
155  }
156
157  /// Returns the time base associated with the source stream.
158  #[cfg_attr(not(tarpaulin), inline(always))]
159  pub const fn time_base(&self) -> Timebase {
160    self.time_base
161  }
162
163  /// Internal: transition from HW to SW. Replays the rescued packets
164  /// (already accepted by the HW probe but not yet decoded) through
165  /// the new SW decoder so the stream resumes seamlessly.
166  ///
167  /// **Transactional**: drained replay frames accumulate in a local
168  /// queue; we only commit them to `self.sw_replay_frames` and switch
169  /// `self.state` to `Sw` after the replay (and EOF re-forwarding, if
170  /// needed) succeed. On failure, the SW decoder, the local frame
171  /// queue, and (where reachable) any consumed packets are dropped —
172  /// `self` is left in its prior state.
173  ///
174  /// **EOF-aware**: when EOF was already accepted on the HW path
175  /// (`self.eof_sent`), the new SW decoder also receives `send_eof()`
176  /// after replay. Without this, codecs that delay tail frames hang
177  /// forever in the drain phase.
178  ///
179  /// **EAGAIN-aware**: if SW's `send_packet` returns EAGAIN during
180  /// replay, drain produced frames into the local queue and retry.
181  fn fall_back_to_sw(
182    &mut self,
183    unconsumed_packets: std::vec::Vec<ffmpeg_next::Packet>,
184  ) -> Result<(), Error> {
185    tracing::info!(
186      packets_replayed = unconsumed_packets.len(),
187      eof_pending = self.eof_sent,
188      "mediadecode-ffmpeg: HW probe exhausted, falling back to software decode",
189    );
190    // Wrap the internal worker so any failure path returns the
191    // rescued packets to the caller via `Error::FallbackFailed`.
192    // Without this, non-seekable streams (live feeds, pipes) would
193    // lose every compressed byte the HW path had consumed when a
194    // fallback transition fails partway.
195    match self.fall_back_to_sw_inner(&unconsumed_packets) {
196      Ok(()) => Ok(()),
197      Err(source) => Err(Error::FallbackFailed(FallbackFailed::new(
198        Box::new(source),
199        unconsumed_packets,
200      ))),
201    }
202  }
203
204  /// Worker for [`Self::fall_back_to_sw`]. Returns the rescued packets
205  /// untouched on the borrowed slice; the wrapper takes ownership of
206  /// them and surfaces them in `FallbackFailed` if this returns Err.
207  fn fall_back_to_sw_inner(
208    &mut self,
209    unconsumed_packets: &[ffmpeg_next::Packet],
210  ) -> Result<(), Error> {
211    let mut sw = open_sw_decoder(&self.parameters)?;
212    let mut local_replay: VecDeque<frame::Video> = VecDeque::new();
213    // Helper: drain SW into the local replay queue, capped at
214    // `SW_REPLAY_FRAME_CAP`. Returns an error when the cap is
215    // exceeded — the fallback caller treats this as a non-recoverable
216    // failure rather than silently dropping decoded frames.
217    fn drain_into(
218      sw: &mut ffmpeg_next::decoder::Video,
219      local_replay: &mut VecDeque<frame::Video>,
220    ) -> std::result::Result<(), Error> {
221      loop {
222        let mut tmp = alloc_av_video_frame()?;
223        match sw.receive_frame(&mut tmp) {
224          Ok(()) => {
225            if local_replay.len() >= SW_REPLAY_FRAME_CAP {
226              tracing::error!(
227                cap = SW_REPLAY_FRAME_CAP,
228                "mediadecode-ffmpeg: SW fallback replay produced more frames than the \
229                 replay cap allows; aborting fallback (no frames dropped — they're \
230                 still in the SW decoder's internal queue and will be released when \
231                 it drops)",
232              );
233              return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
234                errno: libc::ENOMEM,
235              }));
236            }
237            local_replay.push_back(tmp);
238          }
239          Err(_) => break,
240        }
241      }
242      Ok(())
243    }
244
245    for pkt in unconsumed_packets {
246      let mut attempts: u32 = 0;
247      loop {
248        match sw.send_packet(pkt) {
249          Ok(()) => break,
250          Err(ffmpeg_next::Error::Other { errno }) if errno == ffmpeg_next::error::EAGAIN => {
251            drain_into(&mut sw, &mut local_replay)?;
252            attempts += 1;
253            if attempts > 16 {
254              return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
255                errno: ffmpeg_next::error::EAGAIN,
256              }));
257            }
258          }
259          Err(other) => return Err(Error::Ffmpeg(other)),
260        }
261      }
262    }
263    // Re-forward EOF if the HW path already saw it. SW EOF can also
264    // return EAGAIN until prior output is drained — mirror the
265    // packet-replay loop.
266    if self.eof_sent {
267      let mut attempts: u32 = 0;
268      loop {
269        match sw.send_eof() {
270          Ok(()) => break,
271          Err(ffmpeg_next::Error::Other { errno }) if errno == ffmpeg_next::error::EAGAIN => {
272            drain_into(&mut sw, &mut local_replay)?;
273            attempts += 1;
274            if attempts > 16 {
275              return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
276                errno: ffmpeg_next::error::EAGAIN,
277              }));
278            }
279          }
280          Err(other) => return Err(Error::Ffmpeg(other)),
281        }
282      }
283    }
284    // Commit: only after replay (and any EOF forwarding) succeeded
285    // do we move the new SW decoder and queue into `self`.
286    self.sw_replay_frames.append(&mut local_replay);
287    self.state = DecodeState::Sw(sw);
288    Ok(())
289  }
290
291  /// Internal: convert the active scratch frame into a
292  /// `mediadecode::VideoFrame` and write into `dst`.
293  fn deliver_frame(
294    &mut self,
295    dst: &mut VideoFrame<mediadecode::PixelFormat, VideoFrameExtra, FfmpegBuffer>,
296  ) -> Result<(), VideoDecodeError> {
297    let av_frame = match &mut self.state {
298      DecodeState::Hw(_) => unsafe { self.hw_scratch.as_inner_mut().as_ptr() },
299      DecodeState::Sw(_) => unsafe { self.sw_scratch.as_ptr() },
300    };
301    // SAFETY: the scratch frame is live (just filled by the inner
302    // decoder's `receive_frame`); convert bumps refcounts on each
303    // plane buffer it pulls into the produced VideoFrame so the
304    // scratch can be reused on the next call.
305    let new_frame = unsafe { convert::av_frame_to_video_frame(av_frame, self.time_base) }
306      .map_err(VideoDecodeError::Convert)?;
307    *dst = new_frame;
308    Ok(())
309  }
310}
311
312impl VideoStreamDecoder for FfmpegVideoStreamDecoder {
313  type Adapter = Ffmpeg;
314  type Buffer = FfmpegBuffer;
315  type Error = VideoDecodeError;
316
317  fn send_packet(
318    &mut self,
319    packet: &VideoPacket<VideoPacketExtra, Self::Buffer>,
320  ) -> Result<(), Self::Error> {
321    let av_pkt = boundary::ffmpeg_packet_from_video_packet(packet)
322      .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e)))?;
323    match &mut self.state {
324      DecodeState::Hw(hw) => match hw.send_packet(&av_pkt) {
325        Ok(()) => Ok(()),
326        Err(Error::AllBackendsFailed(p)) => {
327          let unconsumed_packets = p.into_unconsumed_packets();
328          self
329            .fall_back_to_sw(unconsumed_packets)
330            .map_err(VideoDecodeError::Decode)?;
331          // Now route the *new* packet to the freshly-opened SW
332          // decoder — the rescued packets were already replayed.
333          if let DecodeState::Sw(sw) = &mut self.state {
334            sw.send_packet(&av_pkt)
335              .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e)))?;
336          }
337          Ok(())
338        }
339        Err(other) => Err(VideoDecodeError::Decode(other)),
340      },
341      DecodeState::Sw(sw) => sw
342        .send_packet(&av_pkt)
343        .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e))),
344    }
345  }
346
347  fn receive_frame(
348    &mut self,
349    dst: &mut VideoFrame<mediadecode::PixelFormat, VideoFrameExtra, Self::Buffer>,
350  ) -> Result<(), Self::Error> {
351    // Deliver any frames produced during SW fallback replay before
352    // pulling new ones from the SW decoder. This is the queue
353    // populated by `fall_back_to_sw` when SW returned EAGAIN during
354    // packet replay.
355    if let Some(replayed) = self.sw_replay_frames.pop_front() {
356      // SAFETY: `replayed` is a live AVFrame owned by us; convert
357      // bumps refcounts on each plane buffer.
358      let new_frame =
359        unsafe { convert::av_frame_to_video_frame(replayed.as_ptr(), self.time_base) }
360          .map_err(VideoDecodeError::Convert)?;
361      *dst = new_frame;
362      return Ok(());
363    }
364    loop {
365      match &mut self.state {
366        DecodeState::Hw(hw) => match hw.receive_frame(&mut self.hw_scratch) {
367          Ok(()) => return self.deliver_frame(dst),
368          Err(Error::AllBackendsFailed(p)) => {
369            let unconsumed_packets = p.into_unconsumed_packets();
370            // Probe exhausted at frame-time. Open SW, replay packets,
371            // loop back so the SW path tries to receive_frame.
372            self
373              .fall_back_to_sw(unconsumed_packets)
374              .map_err(VideoDecodeError::Decode)?;
375            // If the replay produced any drained frames, return one
376            // immediately — preserves stream order vs. whatever the
377            // SW decoder will produce next.
378            if let Some(replayed) = self.sw_replay_frames.pop_front() {
379              // SAFETY: see above.
380              let new_frame =
381                unsafe { convert::av_frame_to_video_frame(replayed.as_ptr(), self.time_base) }
382                  .map_err(VideoDecodeError::Convert)?;
383              *dst = new_frame;
384              return Ok(());
385            }
386            // Fall through to the loop; next iteration takes the Sw arm.
387          }
388          Err(other) => return Err(VideoDecodeError::Decode(other)),
389        },
390        DecodeState::Sw(sw) => {
391          return match sw.receive_frame(&mut self.sw_scratch) {
392            Ok(()) => self.deliver_frame(dst),
393            Err(e) => Err(VideoDecodeError::Decode(Error::Ffmpeg(e))),
394          };
395        }
396      }
397    }
398  }
399
400  fn send_eof(&mut self) -> Result<(), Self::Error> {
401    let outcome = match &mut self.state {
402      DecodeState::Hw(hw) => match hw.send_eof() {
403        Ok(()) => Ok(()),
404        Err(Error::AllBackendsFailed(p)) => {
405          let unconsumed_packets = p.into_unconsumed_packets();
406          // Mark EOF as already accepted *before* fallback so that
407          // `fall_back_to_sw` forwards it to the new SW decoder
408          // transactionally — packet replay, EOF replay, and the
409          // bounded EAGAIN-drain retry all happen inside the
410          // FallbackFailed-wrapping helper. Without this flag set
411          // first, the SW EOF would happen outside the transaction
412          // and a failure there would lose the rescued packets.
413          self.eof_sent = true;
414          self
415            .fall_back_to_sw(unconsumed_packets)
416            .map_err(VideoDecodeError::Decode)?;
417          Ok(())
418        }
419        Err(other) => Err(VideoDecodeError::Decode(other)),
420      },
421      DecodeState::Sw(sw) => sw
422        .send_eof()
423        .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e))),
424    };
425    if outcome.is_ok() {
426      self.eof_sent = true;
427    }
428    outcome
429  }
430
431  fn flush(&mut self) -> Result<(), Self::Error> {
432    // Drop any frames buffered during SW fallback replay before
433    // flushing the inner decoder — otherwise a seek/reset would
434    // surface stale pre-flush frames on the next `receive_frame`.
435    self.sw_replay_frames.clear();
436    // Flush ends the drain phase; the decoder accepts new packets
437    // after this, so reset EOF tracking.
438    self.eof_sent = false;
439    match &mut self.state {
440      DecodeState::Hw(hw) => hw.flush(),
441      DecodeState::Sw(sw) => sw.flush(),
442    }
443    Ok(())
444  }
445}
446
447fn open_sw_decoder(parameters: &Parameters) -> Result<ffmpeg_next::decoder::Video, Error> {
448  // Use the checked codec-context builder — ffmpeg-next's
449  // `Context::from_parameters` calls `Context::new()` which doesn't
450  // null-check `avcodec_alloc_context3`'s return value before
451  // running `avcodec_parameters_to_context` against it. Under
452  // memory pressure that's C-level UB; `build_codec_context`
453  // surfaces the OOM as an error instead.
454  let ctx = build_codec_context(parameters)?;
455  ctx.decoder().video().map_err(Error::Ffmpeg)
456}
457
458/// Error type for [`FfmpegVideoStreamDecoder`].
459#[derive(thiserror::Error, Debug)]
460pub enum VideoDecodeError {
461  /// The wrapped decoder (HW or SW) reported an error.
462  #[error(transparent)]
463  Decode(#[from] Error),
464  /// Frame conversion from FFmpeg's native types to mediadecode's
465  /// types failed.
466  #[error(transparent)]
467  Convert(#[from] ConvertError),
468}