use std::collections::VecDeque;
/// Upper bound on the number of frames buffered while unconsumed packets are
/// replayed into the software decoder during a fallback. When the replay
/// would produce more frames than this, the fallback is aborted with an
/// `ENOMEM` error.
const SW_REPLAY_FRAME_CAP: usize = 64;
use ffmpeg_next::{codec::Parameters, frame};
use mediadecode::{Timebase, decoder::VideoStreamDecoder, frame::VideoFrame, packet::VideoPacket};
use crate::{
Error, Ffmpeg, FfmpegBuffer, Frame, VideoDecoder, boundary,
convert::{self, ConvertError},
decoder::{build_codec_context, try_clone_parameters},
error::FallbackFailed,
extras::{VideoFrameExtra, VideoPacketExtra},
frame::alloc_av_video_frame,
};
/// Video stream decoder that uses a hardware-backed [`VideoDecoder`] when one
/// can be opened and switches to a plain software decoder when every hardware
/// backend has failed.
pub struct FfmpegVideoStreamDecoder {
/// Backend currently in use (hardware or software).
state: DecodeState,
/// Owned copy of the codec parameters; reused to open the software decoder
/// when falling back.
parameters: Parameters,
/// Frame the hardware decoder receives into before conversion.
hw_scratch: Frame,
/// Frame the software decoder receives into before conversion.
sw_scratch: frame::Video,
/// Frames produced while replaying unconsumed packets during a fallback.
/// `receive_frame` hands these out before asking the decoder for new frames;
/// `flush` discards them.
sw_replay_frames: VecDeque<frame::Video>,
/// Whether end-of-stream has been signalled. A fallback forwards EOF to the
/// new software decoder when this is set; `flush` resets it.
eof_sent: bool,
/// Time base supplied at `open`, passed to the frame conversion.
time_base: Timebase,
}
/// The decode backend a [`FfmpegVideoStreamDecoder`] is currently driving.
enum DecodeState {
/// Hardware-backed decoder.
Hw(VideoDecoder),
/// Software decoder, either chosen at `open` or installed by a fallback.
Sw(ffmpeg_next::decoder::Video),
}
impl FfmpegVideoStreamDecoder {
pub fn open(parameters: Parameters, time_base: Timebase) -> Result<Self, Error> {
let owned_parameters = try_clone_parameters(¶meters).map_err(Error::Ffmpeg)?;
let hw_scratch = Frame::empty()?;
let sw_scratch = alloc_av_video_frame()?;
let state =
match VideoDecoder::open(try_clone_parameters(&owned_parameters).map_err(Error::Ffmpeg)?) {
Ok(hw) => DecodeState::Hw(hw),
Err(Error::AllBackendsFailed(_)) => {
let sw = open_sw_decoder(&owned_parameters)?;
DecodeState::Sw(sw)
}
Err(other) => return Err(other),
};
Ok(Self {
state,
parameters: owned_parameters,
hw_scratch,
sw_scratch,
sw_replay_frames: VecDeque::new(),
eof_sent: false,
time_base,
})
}
/// Returns `true` when frames are currently produced by the software decoder.
#[cfg_attr(not(tarpaulin), inline(always))]
pub const fn is_software(&self) -> bool {
    match &self.state {
        DecodeState::Sw(_) => true,
        DecodeState::Hw(_) => false,
    }
}
/// Returns `true` when frames are currently produced by the hardware decoder.
#[cfg_attr(not(tarpaulin), inline(always))]
pub const fn is_hardware(&self) -> bool {
    match &self.state {
        DecodeState::Hw(_) => true,
        DecodeState::Sw(_) => false,
    }
}
/// Borrows the underlying hardware decoder, or returns `None` when the
/// software decoder is the active backend.
#[cfg_attr(not(tarpaulin), inline(always))]
pub const fn hardware_inner(&self) -> Option<&VideoDecoder> {
    if let DecodeState::Hw(hw) = &self.state {
        Some(hw)
    } else {
        None
    }
}
/// Returns the time base this decoder was opened with; the same value is
/// passed to the frame conversion for every delivered frame.
#[cfg_attr(not(tarpaulin), inline(always))]
pub const fn time_base(&self) -> Timebase {
self.time_base
}
fn fall_back_to_sw(
&mut self,
unconsumed_packets: std::vec::Vec<ffmpeg_next::Packet>,
) -> Result<(), Error> {
tracing::info!(
packets_replayed = unconsumed_packets.len(),
eof_pending = self.eof_sent,
"mediadecode-ffmpeg: HW probe exhausted, falling back to software decode",
);
match self.fall_back_to_sw_inner(&unconsumed_packets) {
Ok(()) => Ok(()),
Err(source) => Err(Error::FallbackFailed(FallbackFailed::new(
Box::new(source),
unconsumed_packets,
))),
}
}
fn fall_back_to_sw_inner(
&mut self,
unconsumed_packets: &[ffmpeg_next::Packet],
) -> Result<(), Error> {
let mut sw = open_sw_decoder(&self.parameters)?;
let mut local_replay: VecDeque<frame::Video> = VecDeque::new();
fn drain_into(
sw: &mut ffmpeg_next::decoder::Video,
local_replay: &mut VecDeque<frame::Video>,
) -> std::result::Result<(), Error> {
loop {
let mut tmp = alloc_av_video_frame()?;
match sw.receive_frame(&mut tmp) {
Ok(()) => {
if local_replay.len() >= SW_REPLAY_FRAME_CAP {
tracing::error!(
cap = SW_REPLAY_FRAME_CAP,
"mediadecode-ffmpeg: SW fallback replay produced more frames than the \
replay cap allows; aborting fallback (no frames dropped — they're \
still in the SW decoder's internal queue and will be released when \
it drops)",
);
return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
errno: libc::ENOMEM,
}));
}
local_replay.push_back(tmp);
}
Err(_) => break,
}
}
Ok(())
}
for pkt in unconsumed_packets {
let mut attempts: u32 = 0;
loop {
match sw.send_packet(pkt) {
Ok(()) => break,
Err(ffmpeg_next::Error::Other { errno }) if errno == ffmpeg_next::error::EAGAIN => {
drain_into(&mut sw, &mut local_replay)?;
attempts += 1;
if attempts > 16 {
return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
errno: ffmpeg_next::error::EAGAIN,
}));
}
}
Err(other) => return Err(Error::Ffmpeg(other)),
}
}
}
if self.eof_sent {
let mut attempts: u32 = 0;
loop {
match sw.send_eof() {
Ok(()) => break,
Err(ffmpeg_next::Error::Other { errno }) if errno == ffmpeg_next::error::EAGAIN => {
drain_into(&mut sw, &mut local_replay)?;
attempts += 1;
if attempts > 16 {
return Err(Error::Ffmpeg(ffmpeg_next::Error::Other {
errno: ffmpeg_next::error::EAGAIN,
}));
}
}
Err(other) => return Err(Error::Ffmpeg(other)),
}
}
}
self.sw_replay_frames.append(&mut local_replay);
self.state = DecodeState::Sw(sw);
Ok(())
}
/// Converts the scratch frame of the active backend and stores the result in
/// `dst`.
///
/// `dst` is left unchanged when the conversion fails.
///
/// # Errors
///
/// Returns `VideoDecodeError::Convert` when the frame conversion fails.
fn deliver_frame(
    &mut self,
    dst: &mut VideoFrame<mediadecode::PixelFormat, VideoFrameExtra, FfmpegBuffer>,
) -> Result<(), VideoDecodeError> {
    let time_base = self.time_base;

    // Each backend receives into its own scratch frame; pick the one that
    // belongs to the backend currently in use.
    let raw_frame = match &self.state {
        DecodeState::Hw(_) => unsafe { self.hw_scratch.as_inner_mut().as_ptr() },
        DecodeState::Sw(_) => unsafe { self.sw_scratch.as_ptr() },
    };

    // SAFETY (NOTE(review)): the pointer comes from a scratch frame owned by
    // `self`, which outlives this call; both call sites invoke this right
    // after a successful `receive_frame` into that scratch frame. Confirm
    // against the contract of `av_frame_to_video_frame`.
    *dst = unsafe { convert::av_frame_to_video_frame(raw_frame, time_base) }
        .map_err(VideoDecodeError::Convert)?;
    Ok(())
}
}
impl VideoStreamDecoder for FfmpegVideoStreamDecoder {
type Adapter = Ffmpeg;
type Buffer = FfmpegBuffer;
type Error = VideoDecodeError;
fn send_packet(
&mut self,
packet: &VideoPacket<VideoPacketExtra, Self::Buffer>,
) -> Result<(), Self::Error> {
let av_pkt = boundary::ffmpeg_packet_from_video_packet(packet)
.map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e)))?;
match &mut self.state {
DecodeState::Hw(hw) => match hw.send_packet(&av_pkt) {
Ok(()) => Ok(()),
Err(Error::AllBackendsFailed(p)) => {
let unconsumed_packets = p.into_unconsumed_packets();
self
.fall_back_to_sw(unconsumed_packets)
.map_err(VideoDecodeError::Decode)?;
if let DecodeState::Sw(sw) = &mut self.state {
sw.send_packet(&av_pkt)
.map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e)))?;
}
Ok(())
}
Err(other) => Err(VideoDecodeError::Decode(other)),
},
DecodeState::Sw(sw) => sw
.send_packet(&av_pkt)
.map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e))),
}
}
fn receive_frame(
&mut self,
dst: &mut VideoFrame<mediadecode::PixelFormat, VideoFrameExtra, Self::Buffer>,
) -> Result<(), Self::Error> {
if let Some(replayed) = self.sw_replay_frames.pop_front() {
let new_frame =
unsafe { convert::av_frame_to_video_frame(replayed.as_ptr(), self.time_base) }
.map_err(VideoDecodeError::Convert)?;
*dst = new_frame;
return Ok(());
}
loop {
match &mut self.state {
DecodeState::Hw(hw) => match hw.receive_frame(&mut self.hw_scratch) {
Ok(()) => return self.deliver_frame(dst),
Err(Error::AllBackendsFailed(p)) => {
let unconsumed_packets = p.into_unconsumed_packets();
self
.fall_back_to_sw(unconsumed_packets)
.map_err(VideoDecodeError::Decode)?;
if let Some(replayed) = self.sw_replay_frames.pop_front() {
let new_frame =
unsafe { convert::av_frame_to_video_frame(replayed.as_ptr(), self.time_base) }
.map_err(VideoDecodeError::Convert)?;
*dst = new_frame;
return Ok(());
}
}
Err(other) => return Err(VideoDecodeError::Decode(other)),
},
DecodeState::Sw(sw) => {
return match sw.receive_frame(&mut self.sw_scratch) {
Ok(()) => self.deliver_frame(dst),
Err(e) => Err(VideoDecodeError::Decode(Error::Ffmpeg(e))),
};
}
}
}
}
/// Signals end-of-stream to the active decoder.
///
/// When the hardware decoder reports that every backend failed, the decoder
/// falls back to software. `eof_sent` is raised before the fallback so that
/// the replay forwards end-of-stream to the new software decoder.
///
/// `eof_sent` ends up `true` only when this call succeeds. If the fallback
/// fails, the flag is restored to the value it had on entry, so a failed
/// call does not leave the decoder believing EOF was delivered.
///
/// # Errors
///
/// Returns `VideoDecodeError::Decode` when the active decoder rejects the
/// EOF or when the fallback fails.
fn send_eof(&mut self) -> Result<(), Self::Error> {
    let outcome = match &mut self.state {
        DecodeState::Hw(hw) => match hw.send_eof() {
            Ok(()) => Ok(()),
            Err(Error::AllBackendsFailed(p)) => {
                let unconsumed_packets = p.into_unconsumed_packets();
                let previous_eof_sent = self.eof_sent;
                // Must be set before the fallback: the replay reads this flag
                // to decide whether to forward EOF to the software decoder.
                self.eof_sent = true;
                match self.fall_back_to_sw(unconsumed_packets) {
                    Ok(()) => Ok(()),
                    Err(source) => {
                        // The EOF was not delivered; undo the provisional flag.
                        self.eof_sent = previous_eof_sent;
                        Err(VideoDecodeError::Decode(source))
                    }
                }
            }
            Err(other) => Err(VideoDecodeError::Decode(other)),
        },
        DecodeState::Sw(sw) => sw
            .send_eof()
            .map_err(|e| VideoDecodeError::Decode(Error::Ffmpeg(e))),
    };
    if outcome.is_ok() {
        self.eof_sent = true;
    }
    outcome
}
fn flush(&mut self) -> Result<(), Self::Error> {
self.sw_replay_frames.clear();
self.eof_sent = false;
match &mut self.state {
DecodeState::Hw(hw) => hw.flush(),
DecodeState::Sw(sw) => sw.flush(),
}
Ok(())
}
}
fn open_sw_decoder(parameters: &Parameters) -> Result<ffmpeg_next::decoder::Video, Error> {
let ctx = build_codec_context(parameters)?;
ctx.decoder().video().map_err(Error::Ffmpeg)
}
/// Error type of the [`VideoStreamDecoder`] implementation for
/// [`FfmpegVideoStreamDecoder`].
#[derive(thiserror::Error, Debug)]
pub enum VideoDecodeError {
/// Failure on the decode path: packet conversion, the active decoder, or the
/// hardware-to-software fallback.
#[error(transparent)]
Decode(#[from] Error),
/// Failure converting a decoded FFmpeg frame into a `VideoFrame`.
#[error(transparent)]
Convert(#[from] ConvertError),
}