use std::fs;
use std::path::Path;
use std::thread;
#[cfg(test)]
use std::path::PathBuf;
#[cfg(test)]
use std::sync::{Arc, Mutex};
use ffms2::{FFMS2, frame::Resizers, track::TrackType, video::SeekMode};
use pixelflow_core::{
ErrorCategory, ErrorCode, FormatDescriptor, PixelFlowError, Rational, Result,
};
use semisafe::slice::get as semisafe_get;
use tempfile::tempdir;
pub(crate) trait BackendProgress: Send {
fn update(&mut self, current: usize, total: usize);
}
pub(crate) trait SourceVideo {
fn properties(&self) -> VideoProperties;
fn frame(&mut self, frame_number: usize) -> Result<DecodedFrame>;
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct VideoProperties {
pub width: usize,
pub height: usize,
pub frame_count: usize,
pub frame_rate: Option<Rational>,
pub variable_timestamps: bool,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct DecodedFrame {
pub format: FormatDescriptor,
pub width: usize,
pub height: usize,
pub planes: Vec<DecodedPlane>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct DecodedPlane {
pub width: usize,
pub height: usize,
pub stride_bytes: usize,
pub bytes: Vec<u8>,
}
pub(crate) trait Ffms2Backend {
type Index: Send;
type Video: SourceVideo + Send + 'static;
fn read_index_from_bytes(&self, bytes: &[u8]) -> Result<Self::Index>;
fn write_index_to_bytes(&self, source_path: &Path, index: &Self::Index) -> Result<Vec<u8>>;
fn index(
&self,
source_path: &Path,
track: Option<usize>,
progress: &mut dyn BackendProgress,
) -> Result<Self::Index>;
fn open_video(
&self,
source_path: &Path,
index: &Self::Index,
track: Option<usize>,
threads: usize,
format: &FormatDescriptor,
) -> Result<Self::Video>;
}
pub(crate) struct SystemFfms2Backend;
impl SystemFfms2Backend {
pub(crate) const fn new() -> Self {
Self
}
}
pub(crate) struct SystemIndex {
inner: ffms2::index::Index,
}
pub(crate) struct SystemVideo {
video: ffms2::video::VideoSource,
properties: VideoProperties,
format: FormatDescriptor,
}
impl Ffms2Backend for SystemFfms2Backend {
type Index = SystemIndex;
type Video = SystemVideo;
fn read_index_from_bytes(&self, bytes: &[u8]) -> Result<Self::Index> {
FFMS2::Init();
let temp = tempdir().map_err(|error| {
ffms2_index_error(format!(
"failed to create temporary index directory: {error}"
))
})?;
let path = temp.path().join("cache.ffindex");
fs::write(&path, bytes).map_err(|error| {
ffms2_index_error(format!("failed to materialize cached index: {error}"))
})?;
let inner = ffms2::index::Index::new(&path).map_err(|error| {
ffms2_index_error(format!("failed to read cached index: {error:?}"))
})?;
Ok(SystemIndex { inner })
}
fn write_index_to_bytes(&self, _source_path: &Path, index: &Self::Index) -> Result<Vec<u8>> {
let temp = tempdir().map_err(|error| {
ffms2_index_error(format!(
"failed to create temporary index directory: {error}"
))
})?;
let path = temp.path().join("cache.ffindex");
index.inner.WriteIndex(&path).map_err(|error| {
ffms2_index_error(format!("failed to write FFMS2 index: {error:?}"))
})?;
fs::read(&path).map_err(|error| {
ffms2_index_error(format!("failed to read written FFMS2 index: {error}"))
})
}
fn index(
&self,
source_path: &Path,
track: Option<usize>,
progress: &mut dyn BackendProgress,
) -> Result<Self::Index> {
FFMS2::Init();
let indexer = ffms2::index::Indexer::new(source_path).map_err(|error| {
ffms2_index_error(format!("failed to create FFMS2 indexer: {error:?}"))
})?;
configure_indexer(&indexer, track);
let (tx, rx) = std::sync::mpsc::channel::<Option<(usize, usize)>>();
let mut callback_state = 0usize;
let callback_tx = tx.clone();
indexer.ProgressCallback(
move |current, total, _| {
let _ = callback_tx.send(Some((current, total)));
0
},
&mut callback_state,
);
let inner = thread::scope(|scope| {
let progress_handle = scope.spawn(move || {
while let Ok(update) = rx.recv() {
let Some((current, total)) = update else {
break;
};
progress.update(current, total);
}
});
let result = indexer
.DoIndexing2(ffms2::IndexErrorHandling::IEH_ABORT)
.map_err(|error| {
ffms2_index_error(format!("failed to build FFMS2 index: {error:?}"))
});
let _ = tx.send(None);
let _ = progress_handle.join();
result
})?;
Ok(SystemIndex { inner })
}
fn open_video(
&self,
source_path: &Path,
index: &Self::Index,
track: Option<usize>,
threads: usize,
format: &FormatDescriptor,
) -> Result<Self::Video> {
FFMS2::Init();
let track = match track {
Some(track) => track,
None => index
.inner
.FirstIndexedTrackOfType(TrackType::TYPE_VIDEO)
.map_err(|error| {
ffms2_open_error(format!("failed to locate indexed video track: {error:?}"))
})?,
};
let mut video = ffms2::video::VideoSource::new(
source_path,
track,
&index.inner,
threads,
SeekMode::SEEK_NORMAL,
)
.map_err(|error| {
ffms2_open_error(format!("failed to open FFMS2 video source: {error:?}"))
})?;
let probe = ffms2::frame::Frame::GetFrame(&mut video, 0).map_err(|error| {
ffms2_decode_error(format!("failed to decode probe frame: {error:?}"))
})?;
let resolution = probe.get_frame_resolution();
let width = usize::try_from(resolution.width)
.map_err(|_| ffms2_decode_error("decoded frame width does not fit platform usize"))?;
let height = usize::try_from(resolution.height)
.map_err(|_| ffms2_decode_error("decoded frame height does not fit platform usize"))?;
let pixel_format = ffms2::frame::Frame::GetPixFmt(ffmpeg_pixel_format_name(format)?);
if pixel_format < 0 {
return Err(ffms2_open_error(format!(
"failed to resolve FFmpeg pixel format for '{}'",
format.name()
)));
}
let mut target_formats = vec![pixel_format];
video
.SetOutputFormatV2(&mut target_formats, width, height, Resizers::RESIZER_POINT)
.map_err(|error| {
ffms2_open_error(format!(
"failed to set FFMS2 output format '{}': {error:?}",
format.name()
))
})?;
let props = video.GetVideoProperties();
let frame_rate = (props.FPSNumerator > 0 && props.FPSDenominator > 0).then(|| Rational {
numerator: i64::from(props.FPSNumerator),
denominator: i64::from(props.FPSDenominator),
});
let frame_count = usize::try_from(props.NumFrames)
.map_err(|_| ffms2_open_error("FFMS2 reported negative frame count"))?;
let variable_timestamps = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
is_variable_timestamps(&mut video, frame_count)
}))
.unwrap_or(false);
Ok(SystemVideo {
video,
properties: VideoProperties {
width,
height,
frame_count,
frame_rate,
variable_timestamps,
},
format: format.clone(),
})
}
}
impl SourceVideo for SystemVideo {
fn properties(&self) -> VideoProperties {
self.properties
}
fn frame(&mut self, frame_number: usize) -> Result<DecodedFrame> {
let frame =
ffms2::frame::Frame::GetFrame(&mut self.video, frame_number).map_err(|error| {
ffms2_decode_error(format!("failed to decode frame {frame_number}: {error:?}"))
})?;
let plane_bytes = frame.get_pixel_data().ok_or_else(|| {
ffms2_decode_error(format!(
"failed to inspect decoded pixel planes for frame {frame_number}"
))
})?;
let mut planes = Vec::with_capacity(self.format.planes().len());
for (index, descriptor) in self.format.planes().iter().enumerate() {
let bytes = plane_bytes
.get(index)
.and_then(|plane| plane.as_ref())
.ok_or_else(|| {
ffms2_decode_error(format!(
"decoded frame {frame_number} is missing plane {index}"
))
})?;
let stride_bytes =
usize::try_from(*semisafe_get(&frame.Linesize, index)).map_err(|_| {
ffms2_decode_error(format!(
"decoded frame {frame_number} plane {index} has negative stride"
))
})?;
let width = self.properties.width.div_ceil(descriptor.width_divisor);
let height = self.properties.height.div_ceil(descriptor.height_divisor);
planes.push(DecodedPlane {
width,
height,
stride_bytes,
bytes: bytes.to_vec(),
});
}
Ok(DecodedFrame {
format: self.format.clone(),
width: self.properties.width,
height: self.properties.height,
planes,
})
}
}
fn configure_indexer(indexer: &ffms2::index::Indexer, track: Option<usize>) {
for current_track in 0..indexer.NumTracksI() {
let index_flag = match indexer.TrackTypeI(current_track) {
TrackType::TYPE_VIDEO => usize::from(track.is_none_or(|track| track == current_track)),
_ => 0,
};
indexer.TrackIndexSettings(current_track, index_flag);
}
}
fn is_variable_timestamps(video: &mut ffms2::video::VideoSource, frame_count: usize) -> bool {
if frame_count < 3 {
return false;
}
let track = ffms2::track::Track::TrackFromVideo(video);
let mut previous_pts = None;
let mut previous_delta = None;
for frame_number in 0..frame_count.min(32) {
let pts = track.FrameInfo(frame_number).PTS;
if let Some(previous_pts) = previous_pts {
let delta = pts - previous_pts;
if let Some(previous_delta) = previous_delta {
if delta != previous_delta {
return true;
}
} else {
previous_delta = Some(delta);
}
}
previous_pts = Some(pts);
}
false
}
fn ffmpeg_pixel_format_name(format: &FormatDescriptor) -> Result<&'static str> {
match format.name() {
"gray8" => Ok("gray"),
"gray10" => Ok("gray10le"),
"gray12" => Ok("gray12le"),
"gray16" => Ok("gray16le"),
"yuv420p8" => Ok("yuv420p"),
"yuv420p10" => Ok("yuv420p10le"),
"yuv420p12" => Ok("yuv420p12le"),
"yuv420p16" => Ok("yuv420p16le"),
"yuv422p8" => Ok("yuv422p"),
"yuv422p10" => Ok("yuv422p10le"),
"yuv422p12" => Ok("yuv422p12le"),
"yuv422p16" => Ok("yuv422p16le"),
"yuv444p8" => Ok("yuv444p"),
"yuv444p10" => Ok("yuv444p10le"),
"yuv444p12" => Ok("yuv444p12le"),
"yuv444p16" => Ok("yuv444p16le"),
other => Err(PixelFlowError::new(
ErrorCategory::Format,
ErrorCode::new("format.unsupported_alias"),
format!("FFMS2 source cannot output format '{other}'"),
)),
}
}
fn ffms2_index_error(message: impl Into<String>) -> PixelFlowError {
PixelFlowError::new(
ErrorCategory::Source,
ErrorCode::new("source.ffms2_index"),
message,
)
}
fn ffms2_open_error(message: impl Into<String>) -> PixelFlowError {
PixelFlowError::new(
ErrorCategory::Source,
ErrorCode::new("source.ffms2_open"),
message,
)
}
fn ffms2_decode_error(message: impl Into<String>) -> PixelFlowError {
PixelFlowError::new(
ErrorCategory::Source,
ErrorCode::new("source.ffms2_decode"),
message,
)
}
#[cfg(test)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct FakeVideoProperties {
pub width: usize,
pub height: usize,
pub frame_count: usize,
pub frame_rate: Option<Rational>,
pub variable_timestamps: bool,
}
#[cfg(test)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct FakeIndex {
path: PathBuf,
}
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct FakeBackend {
state: Arc<Mutex<FakeBackendState>>,
}
#[cfg(test)]
#[derive(Debug)]
struct FakeBackendState {
properties: FakeVideoProperties,
index_count: usize,
read_index_count: usize,
indexed_paths: Vec<PathBuf>,
}
#[cfg(test)]
impl FakeBackend {
pub(crate) fn new(properties: FakeVideoProperties) -> Self {
Self {
state: Arc::new(Mutex::new(FakeBackendState {
properties,
index_count: 0,
read_index_count: 0,
indexed_paths: Vec::new(),
})),
}
}
pub(crate) fn default_with_cfr() -> Self {
Self::new(FakeVideoProperties {
width: 4,
height: 2,
frame_count: 2,
frame_rate: Some(Rational {
numerator: 24,
denominator: 1,
}),
variable_timestamps: false,
})
}
pub(crate) fn default_with_unknown_rate() -> Self {
Self::new(FakeVideoProperties {
width: 4,
height: 2,
frame_count: 2,
frame_rate: None,
variable_timestamps: false,
})
}
pub(crate) fn default_with_vfr() -> Self {
Self::new(FakeVideoProperties {
width: 4,
height: 2,
frame_count: 2,
frame_rate: Some(Rational {
numerator: 24,
denominator: 1,
}),
variable_timestamps: true,
})
}
pub(crate) fn indexed_paths(&self) -> Vec<PathBuf> {
self.state
.lock()
.expect("state lock poisoned")
.indexed_paths
.clone()
}
pub(crate) fn index_count(&self) -> usize {
self.state.lock().expect("state lock poisoned").index_count
}
pub(crate) fn read_index_count(&self) -> usize {
self.state
.lock()
.expect("state lock poisoned")
.read_index_count
}
pub(crate) fn reset_counts(&self) {
let mut state = self.state.lock().expect("state lock poisoned");
state.index_count = 0;
state.read_index_count = 0;
state.indexed_paths.clear();
}
}
#[cfg(test)]
impl Ffms2Backend for FakeBackend {
type Index = FakeIndex;
type Video = FakeVideo;
fn read_index_from_bytes(&self, bytes: &[u8]) -> Result<Self::Index> {
let mut state = self.state.lock().expect("state lock poisoned");
state.read_index_count += 1;
Ok(FakeIndex {
path: PathBuf::from(String::from_utf8_lossy(bytes).into_owned()),
})
}
fn write_index_to_bytes(&self, source_path: &Path, _index: &Self::Index) -> Result<Vec<u8>> {
Ok(source_path.to_string_lossy().as_bytes().to_vec())
}
fn index(
&self,
source_path: &Path,
_track: Option<usize>,
progress: &mut dyn BackendProgress,
) -> Result<Self::Index> {
progress.update(1, 2);
progress.update(2, 2);
let mut state = self.state.lock().expect("state lock poisoned");
state.index_count += 1;
state.indexed_paths.push(source_path.to_path_buf());
Ok(FakeIndex {
path: source_path.to_path_buf(),
})
}
fn open_video(
&self,
_source_path: &Path,
_index: &Self::Index,
_track: Option<usize>,
_threads: usize,
format: &FormatDescriptor,
) -> Result<Self::Video> {
let state = self.state.lock().expect("state lock poisoned");
Ok(FakeVideo::new(
VideoProperties {
width: state.properties.width,
height: state.properties.height,
frame_count: state.properties.frame_count,
frame_rate: state.properties.frame_rate,
variable_timestamps: state.properties.variable_timestamps,
},
build_fake_frames(
format,
state.properties.width,
state.properties.height,
state.properties.frame_count,
),
))
}
}
#[cfg(test)]
pub(crate) struct FakeVideo {
properties: VideoProperties,
frames: Vec<DecodedFrame>,
}
#[cfg(test)]
impl FakeVideo {
pub(crate) fn new(properties: VideoProperties, frames: Vec<DecodedFrame>) -> Self {
Self { properties, frames }
}
}
#[cfg(test)]
impl SourceVideo for FakeVideo {
fn properties(&self) -> VideoProperties {
self.properties
}
fn frame(&mut self, frame_number: usize) -> Result<DecodedFrame> {
self.frames.get(frame_number).cloned().ok_or_else(|| {
PixelFlowError::new(
ErrorCategory::Source,
ErrorCode::new("source.ffms2_decode"),
format!("missing fake frame {frame_number}"),
)
})
}
}
#[cfg(test)]
fn build_fake_frames(
format: &FormatDescriptor,
width: usize,
height: usize,
frame_count: usize,
) -> Vec<DecodedFrame> {
let mut frames = Vec::with_capacity(frame_count);
for frame_number in 0..frame_count {
let planes = format
.planes()
.iter()
.enumerate()
.map(|(plane_index, descriptor)| {
let plane_width = width.div_ceil(descriptor.width_divisor);
let plane_height = height.div_ceil(descriptor.height_divisor);
let bytes_per_sample = descriptor.sample_type.bytes_per_sample();
let stride_bytes = plane_width * bytes_per_sample;
let fill = u8::try_from(frame_number + plane_index).unwrap_or(u8::MAX);
DecodedPlane {
width: plane_width,
height: plane_height,
stride_bytes,
bytes: vec![fill; stride_bytes * plane_height],
}
})
.collect();
frames.push(DecodedFrame {
format: format.clone(),
width,
height,
planes,
});
}
frames
}
#[cfg(test)]
#[derive(Default)]
pub(crate) struct NoopProgress;
#[cfg(test)]
impl BackendProgress for NoopProgress {
fn update(&mut self, _current: usize, _total: usize) {}
}
#[cfg(test)]
mod tests {
#![expect(clippy::indexing_slicing, reason = "allow in tests")]
use std::path::Path;
use pixelflow_core::resolve_format_alias;
use super::{FakeBackend, FakeVideoProperties, Ffms2Backend, NoopProgress, SourceVideo};
#[test]
fn fake_backend_reports_properties_and_mutable_internal_frames() {
let backend = FakeBackend::new(FakeVideoProperties {
width: 4,
height: 2,
frame_count: 2,
frame_rate: Some(pixelflow_core::Rational {
numerator: 24,
denominator: 1,
}),
variable_timestamps: false,
});
let index = backend
.index(Path::new("input.mkv"), None, &mut NoopProgress)
.expect("index");
let mut video = backend
.open_video(
Path::new("input.mkv"),
&index,
None,
1,
&resolve_format_alias("gray8").expect("gray8 format"),
)
.expect("video");
let frame = video.frame(0).expect("frame");
assert_eq!(video.properties().frame_count, 2);
assert_eq!(frame.width, 4);
assert_eq!(frame.planes[0].bytes, vec![0; 8]);
}
}