use crate::common::errors::*;
use image::{DynamicImage, ImageBuffer, Rgb};
use ffmpeg_next as ffmpeg;
use ffmpeg_next::format::{input, input_with_dictionary, Pixel};
use ffmpeg_next::media::Type;
use ffmpeg_next::software::scaling::{context::Context as ScalingContext, flag::Flags};
use ffmpeg_next::util::frame::video::Video as FfmpegFrame;
use ffmpeg_next::Dictionary;
use std::sync::Once;
static FFMPEG_INIT: Once = Once::new();
fn ensure_ffmpeg_init() {
FFMPEG_INIT.call_once(|| {
ffmpeg::init().expect("Failed to initialize ffmpeg");
ffmpeg::log::set_level(ffmpeg::log::Level::Fatal);
});
}
pub fn is_stream_url(path: &str) -> bool {
const STREAM_SCHEMES: &[&str] = &[
"http://",
"https://",
"rtsp://",
"rtsps://",
"rtmp://",
"rtmps://",
"rtmpe://",
"rtmpte://",
"srt://",
"udp://",
"tcp://",
"rtp://",
"mms://",
"mmsh://",
"mmst://",
"hls+http://",
"hls+https://",
];
STREAM_SCHEMES.iter().any(|scheme| path.starts_with(scheme))
}
#[allow(dead_code)]
pub struct VideoDecoder {
input_ctx: ffmpeg::format::context::Input,
video_stream_index: usize,
decoder: ffmpeg::decoder::Video,
scaler: ScalingContext,
total_frames: i64,
time_base: ffmpeg::Rational,
stream_duration: i64,
current_frame: i64,
fps: f64,
eof: bool,
is_streaming: bool,
}
unsafe impl Send for VideoDecoder {}
impl VideoDecoder {
pub fn open(path: &str) -> Result<Self, MyError> {
ensure_ffmpeg_init();
let is_streaming = path.starts_with("http://") || path.starts_with("https://");
let input_ctx = if is_streaming {
let mut opts = Dictionary::new();
opts.set("buffer_size", "5242880");
opts.set("analyzeduration", "10000000");
opts.set("probesize", "5000000");
input_with_dictionary(&path, opts)
} else {
input(&path)
}
.map_err(|e| {
MyError::Application(format!("{}: {} ({:?})", ERROR_OPENING_VIDEO, path, e))
})?;
let stream = input_ctx.streams().best(Type::Video).ok_or_else(|| {
MyError::Application(format!("{}: no video stream", ERROR_OPENING_VIDEO))
})?;
let video_stream_index = stream.index();
let time_base = stream.time_base();
let stream_duration = stream.duration();
let total_frames = stream.frames();
let context_decoder = ffmpeg::codec::context::Context::from_parameters(stream.parameters())
.map_err(|e| MyError::Application(format!("{}: {:?}", ERROR_OPENING_VIDEO, e)))?;
let decoder = context_decoder
.decoder()
.video()
.map_err(|e| MyError::Application(format!("{}: {:?}", ERROR_OPENING_VIDEO, e)))?;
let fps = {
let r = stream.avg_frame_rate();
if r.denominator() != 0 {
r.numerator() as f64 / r.denominator() as f64
} else {
30.0 }
};
let scaler = ScalingContext::get(
decoder.format(),
decoder.width(),
decoder.height(),
Pixel::RGB24,
decoder.width(),
decoder.height(),
Flags::BILINEAR,
)
.map_err(|e| {
MyError::Application(format!(
"{}: failed to create scaler ({:?})",
ERROR_OPENING_VIDEO, e
))
})?;
Ok(Self {
input_ctx,
video_stream_index,
decoder,
scaler,
total_frames,
time_base,
stream_duration,
current_frame: 0,
fps,
eof: false,
is_streaming,
})
}
#[allow(dead_code)]
pub fn fps(&self) -> f64 {
self.fps
}
pub fn dimensions(&self) -> (u32, u32) {
(self.decoder.width(), self.decoder.height())
}
pub fn is_streaming(&self) -> bool {
self.is_streaming
}
pub fn duration_secs(&self) -> Option<f64> {
if self.stream_duration > 0 && self.time_base.numerator() > 0 {
return Some(
self.stream_duration as f64 * self.time_base.numerator() as f64
/ self.time_base.denominator() as f64,
);
}
let fmt_duration = self.input_ctx.duration();
if fmt_duration > 0 {
return Some(fmt_duration as f64 / 1_000_000.0);
}
None
}
pub fn next_frame(&mut self) -> Option<DynamicImage> {
if self.eof {
return None;
}
if let Some(img) = self.receive_frame() {
return Some(img);
}
loop {
match self.next_video_packet() {
Some(packet) => {
if self.decoder.send_packet(&packet).is_err() {
continue;
}
if let Some(img) = self.receive_frame() {
return Some(img);
}
}
None => {
let _ = self.decoder.send_eof();
let img = self.receive_frame();
self.eof = true;
return img;
}
}
}
}
fn receive_frame(&mut self) -> Option<DynamicImage> {
let mut decoded = FfmpegFrame::empty();
if self.decoder.receive_frame(&mut decoded).is_ok() {
if let Some(pts) = decoded.pts() {
let secs = pts as f64 * self.time_base.numerator() as f64
/ self.time_base.denominator() as f64;
self.current_frame = (secs * self.fps).round() as i64;
} else {
self.current_frame += 1;
}
let mut rgb_frame = FfmpegFrame::empty();
self.scaler.run(&decoded, &mut rgb_frame).ok()?;
frame_to_image(&rgb_frame)
} else {
None
}
}
fn next_video_packet(&mut self) -> Option<ffmpeg::Packet> {
loop {
let mut packet_iter = self.input_ctx.packets();
match packet_iter.next() {
Some((stream, packet)) => {
if stream.index() == self.video_stream_index {
return Some(packet);
}
}
None => return None,
}
}
}
pub fn skip_frames(&mut self, n: usize) {
for _ in 0..n {
if self.next_frame().is_none() {
break;
}
}
}
pub fn reset(&mut self) {
let _ = self.input_ctx.seek(0, ..i64::MAX);
self.decoder.flush();
self.current_frame = 0;
self.eof = false;
}
pub fn is_at_end(&self) -> bool {
self.eof
}
pub fn seek_to_seconds_abs(&mut self, seconds: f64) -> bool {
self.seek_to_seconds(seconds.max(0.0))
}
pub fn seek_seconds(&mut self, seconds: f64) -> bool {
let current_secs = self.current_frame as f64 / self.fps;
let target_secs = (current_secs + seconds).max(0.0);
self.seek_to_seconds(target_secs)
}
fn seek_to_seconds(&mut self, target_secs: f64) -> bool {
let seek_ts = (target_secs * 1_000_000.0) as i64;
let stream_ts = (target_secs * self.time_base.denominator() as f64
/ self.time_base.numerator() as f64) as i64;
let result = self.input_ctx.seek(seek_ts, ..seek_ts + 1).is_ok();
if result {
self.decoder.flush();
self.eof = false;
self.decode_forward_to(stream_ts);
}
result
}
fn decode_forward_to(&mut self, target_ts: i64) {
loop {
match self.next_video_packet() {
Some(packet) => {
if self.decoder.send_packet(&packet).is_err() {
continue;
}
let mut decoded = FfmpegFrame::empty();
while self.decoder.receive_frame(&mut decoded).is_ok() {
let pts = decoded.pts().unwrap_or(0);
let secs = pts as f64 * self.time_base.numerator() as f64
/ self.time_base.denominator() as f64;
self.current_frame = (secs * self.fps).round() as i64;
if pts >= target_ts {
return;
}
}
}
None => {
self.eof = true;
return;
}
}
}
}
pub fn seek_to_frame(&mut self, target_frame: usize) {
let target_secs = target_frame as f64 / self.fps;
self.seek_to_seconds(target_secs);
}
pub fn get_position_frames(&self) -> i64 {
self.current_frame
}
}
fn frame_to_image(frame: &FfmpegFrame) -> Option<DynamicImage> {
let width = frame.width();
let height = frame.height();
let stride = frame.stride(0);
let data = frame.data(0);
let row_bytes = (width * 3) as usize;
let mut pixels = Vec::with_capacity(row_bytes * height as usize);
for y in 0..height as usize {
let row_start = y * stride;
pixels.extend_from_slice(&data[row_start..row_start + row_bytes]);
}
ImageBuffer::<Rgb<u8>, _>::from_raw(width, height, pixels).map(DynamicImage::ImageRgb8)
}
#[cfg(test)]
mod tests {
use super::*;
use std::process::Command;
use tempfile::NamedTempFile;
fn create_test_video() -> NamedTempFile {
let tmp = tempfile::Builder::new()
.suffix(".mp4")
.tempfile()
.expect("Failed to create temp file");
let path = tmp.path().to_str().unwrap().to_string();
let status = Command::new("ffmpeg")
.args([
"-y",
"-f",
"lavfi",
"-i",
"color=c=red:s=16x16:r=10:d=1",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-t",
"1",
&path,
])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.expect("Failed to run ffmpeg to create test video");
assert!(status.success(), "ffmpeg failed to create test video");
tmp
}
#[test]
fn test_open_invalid_path_returns_error() {
let result = VideoDecoder::open("/nonexistent/path/to/video.mp4");
assert!(result.is_err());
}
#[test]
fn test_open_valid_video() {
let tmp = create_test_video();
let decoder = VideoDecoder::open(tmp.path().to_str().unwrap());
assert!(decoder.is_ok());
let decoder = decoder.unwrap();
assert!(decoder.fps() > 0.0);
assert!(!decoder.is_at_end());
assert_eq!(decoder.get_position_frames(), 0);
}
#[test]
fn test_next_frame_returns_valid_image() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
let frame = decoder.next_frame();
assert!(frame.is_some());
let img = frame.unwrap();
assert_eq!(img.width(), 16);
assert_eq!(img.height(), 16);
assert_eq!(decoder.get_position_frames(), 1);
}
#[test]
fn test_next_frame_rgb_values() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
let frame = decoder.next_frame().unwrap();
let rgb = frame.to_rgb8();
let pixel = rgb.get_pixel(8, 8);
assert!(
pixel[0] > 200,
"Red channel should be high, got {}",
pixel[0]
);
assert!(
pixel[1] < 50,
"Green channel should be low, got {}",
pixel[1]
);
assert!(
pixel[2] < 50,
"Blue channel should be low, got {}",
pixel[2]
);
}
#[test]
fn test_skip_frames_advances_position() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
decoder.skip_frames(3);
assert_eq!(decoder.get_position_frames(), 3);
}
#[test]
fn test_eof_after_all_frames() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
while decoder.next_frame().is_some() {}
assert!(decoder.is_at_end());
assert!(decoder.next_frame().is_none());
}
#[test]
fn test_reset_returns_to_start() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
for _ in 0..5 {
decoder.next_frame();
}
assert!(decoder.get_position_frames() > 0);
decoder.reset();
assert_eq!(decoder.get_position_frames(), 0);
assert!(!decoder.is_at_end());
let frame = decoder.next_frame();
assert!(frame.is_some());
}
#[test]
fn test_seek_seconds_forward() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
let result = decoder.seek_seconds(0.5);
assert!(result);
assert!(!decoder.is_at_end());
let frame = decoder.next_frame();
assert!(frame.is_some());
}
#[test]
fn test_seek_seconds_backward_clamps_to_zero() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
for _ in 0..5 {
decoder.next_frame();
}
let result = decoder.seek_seconds(-100.0);
assert!(result);
assert_eq!(decoder.get_position_frames(), 0);
}
#[test]
fn test_seek_to_frame() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
decoder.seek_to_frame(5);
assert!(!decoder.is_at_end());
let frame = decoder.next_frame();
assert!(frame.is_some());
}
#[test]
fn test_reset_after_eof() {
let tmp = create_test_video();
let mut decoder = VideoDecoder::open(tmp.path().to_str().unwrap()).unwrap();
while decoder.next_frame().is_some() {}
assert!(decoder.is_at_end());
decoder.reset();
assert!(!decoder.is_at_end());
assert_eq!(decoder.get_position_frames(), 0);
let frame = decoder.next_frame();
assert!(frame.is_some());
}
}