use std::path::Path;
use std::sync::Arc;
use pixelflow_core::{
ClipFormat, ClipMedia, ClipResolution, ErrorCategory, ErrorCode, FrameCount, Graph, LogLevel,
Logger, NodeId, NodeKind, PixelFlowError, RenderExecutorMap, Result,
};
use super::backend::{BackendProgress, Ffms2Backend, SourceVideo, SystemFfms2Backend};
use super::cache::{SourceFingerprint, read_cache, write_cache};
use super::executor::Ffms2SourceExecutor;
use super::options::{Ffms2SourceOptions, VfrMode};
/// Caller-supplied state shared by every source handled in one indexing pass.
pub struct SourceIndexContext<'a> {
/// Optional script directory; handed unchanged to `Ffms2SourceOptions::from_request`
/// for each source request (presumably the base for relative source paths).
script_dir: Option<&'a Path>,
/// Logger that receives the warning emitted when a VFR source is implicitly
/// normalized to CFR.
logger: Logger,
}
impl<'a> SourceIndexContext<'a> {
#[must_use]
pub const fn new(script_dir: Option<&'a Path>, logger: Logger) -> Self {
Self { script_dir, logger }
}
#[cfg(test)]
pub(crate) fn for_tests(script_dir: Option<&'a Path>) -> Self {
Self::new(script_dir, Logger::default())
}
#[cfg(test)]
pub(crate) const fn for_tests_with_logger(
script_dir: Option<&'a Path>,
logger: Logger,
) -> Self {
Self::new(script_dir, logger)
}
}
/// Observer notified while reachable sources are being indexed.
///
/// For each source the indexer in this module calls `begin_source` first. It then
/// either reports `cache_hit` (a cached index was found) or forwards the backend's
/// `progress` ticks while a fresh index is built. `finish_source` is called only
/// after the source was fully processed; it is not called when indexing that
/// source fails.
pub trait IndexProgressSink: Send {
    /// Called once for a source before its fingerprint or cache is consulted.
    fn begin_source(&mut self, path: &Path);

    /// Called with the backend's `current`/`total` progress values while a fresh
    /// index is being built for `path`.
    fn progress(&mut self, path: &Path, current: usize, total: usize);

    /// Called when a cached index for `path` is reused instead of re-indexing.
    fn cache_hit(&mut self, path: &Path);

    /// Called after the source at `path` has been opened and its executor registered.
    fn finish_source(&mut self, path: &Path);
}

/// An [`IndexProgressSink`] that ignores every notification.
///
/// Derives the common traits so callers can construct it with `Default`, copy it
/// freely, and embed it in `Debug`-deriving types.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopIndexProgressSink;

impl IndexProgressSink for NoopIndexProgressSink {
    fn begin_source(&mut self, _path: &Path) {}

    fn progress(&mut self, _path: &Path, _current: usize, _total: usize) {}

    fn cache_hit(&mut self, _path: &Path) {}

    fn finish_source(&mut self, _path: &Path) {}
}
/// Result of a successful indexing pass over a graph's reachable sources.
pub struct IndexedSources {
/// The input graph after `with_source_media` was applied for every reachable source.
graph: Graph,
/// Executors inserted during indexing: one `Ffms2SourceExecutor` per reachable
/// source node, keyed by that node's id.
executors: RenderExecutorMap,
}
impl IndexedSources {
    /// Borrows the graph produced by indexing.
    #[must_use]
    pub const fn graph(&self) -> &Graph {
        let Self { graph, .. } = self;
        graph
    }

    /// Borrows the executors registered for the indexed source nodes.
    #[must_use]
    pub const fn executors(&self) -> &RenderExecutorMap {
        let Self { executors, .. } = self;
        executors
    }

    /// Consumes the result and hands back the graph together with its executors.
    #[must_use]
    pub fn into_graph_and_executors(self) -> (Graph, RenderExecutorMap) {
        let Self { graph, executors } = self;
        (graph, executors)
    }
}
/// Indexes every source reachable from the graph's outputs using the system FFMS2
/// backend.
///
/// This is a thin wrapper over `index_reachable_sources_with_backend` that supplies
/// `SystemFfms2Backend::new()` as the backend.
///
/// # Errors
///
/// Propagates any error returned by `index_reachable_sources_with_backend`.
pub fn index_reachable_sources(
    graph: Graph,
    context: &SourceIndexContext<'_>,
    progress: &mut dyn IndexProgressSink,
) -> Result<IndexedSources> {
    let system_backend = SystemFfms2Backend::new();
    index_reachable_sources_with_backend(graph, context, &system_backend, progress)
}
/// Indexes every reachable source node of `graph` through `backend`.
///
/// For each reachable source this:
/// 1. parses the node's request into `Ffms2SourceOptions`,
/// 2. reuses a cached index when `read_cache` returns one for the file's fingerprint,
///    otherwise builds a fresh index and writes it to the cache,
/// 3. opens the video, resolves its frame rate (explicit override first, then the
///    rate reported by the backend),
/// 4. replaces the node's media in the graph and registers an `Ffms2SourceExecutor`.
///
/// `progress` is told about the start, cache hit / indexing progress, and completion
/// of each source.
///
/// # Errors
///
/// Returns the first error raised by graph validation, option parsing,
/// fingerprinting, cache access, the backend, or the graph media update. Returns
/// `source.unknown_frame_rate` when neither an override nor a detected frame rate
/// is available.
pub(crate) fn index_reachable_sources_with_backend<B: Ffms2Backend>(
    mut graph: Graph,
    context: &SourceIndexContext<'_>,
    backend: &B,
    progress: &mut dyn IndexProgressSink,
) -> Result<IndexedSources>
where
    B::Video: 'static,
{
    // Copy the ids out first: the loop below replaces `graph` on every iteration.
    let source_ids = graph.validation_plan()?.reachable_sources().to_vec();
    let mut executors = RenderExecutorMap::new();

    for node_id in source_ids {
        let request = source_request(&graph, node_id)?;
        let options = Ffms2SourceOptions::from_request(&request, context.script_dir)?;
        let source_path = options.source_path();
        let output_format = options.output_format();

        progress.begin_source(source_path);
        let fingerprint = SourceFingerprint::from_file(source_path)?;

        let index = match read_cache(options.cache_path(), &fingerprint)? {
            Some(cached) => {
                progress.cache_hit(source_path);
                backend.read_index_from_bytes(&cached)?
            }
            None => {
                // Forward backend progress ticks to the caller's sink, tagged with the path.
                let mut adapter = ProgressAdapter {
                    path: source_path,
                    sink: progress,
                };
                let fresh = backend.index(source_path, options.track(), &mut adapter)?;
                let serialized = backend.write_index_to_bytes(source_path, &fresh)?;
                write_cache(options.cache_path(), &fingerprint, &serialized)?;
                fresh
            }
        };

        let video = backend.open_video(
            source_path,
            &index,
            options.track(),
            options.threads(),
            output_format,
        )?;
        let properties = video.properties();

        // An explicit override wins over whatever the backend detected.
        let Some(frame_rate) = options.frame_rate_override().or(properties.frame_rate) else {
            return Err(PixelFlowError::new(
                ErrorCategory::Source,
                ErrorCode::new("source.unknown_frame_rate"),
                format!(
                    "source '{}' does not report a usable frame rate; supply fps explicitly",
                    source_path.display()
                ),
            ));
        };

        let warn_about_vfr = properties.variable_timestamps
            && matches!(options.vfr_mode(), VfrMode::ImplicitNormalize);
        if warn_about_vfr {
            context.logger.log(
                LogLevel::Warn,
                "pixelflow_source_ffms2",
                format!(
                    "normalizing VFR source '{}' to CFR using detected or explicit frame rate",
                    source_path.display()
                ),
            );
        }

        let resolution = ClipResolution::Fixed {
            width: properties.width,
            height: properties.height,
        };
        let media = ClipMedia::new(
            ClipFormat::Fixed(output_format.clone()),
            resolution,
            FrameCount::Finite(properties.frame_count),
            pixelflow_core::FrameRate::Cfr(frame_rate),
        );
        graph = graph.with_source_media(node_id, media)?;

        let executor = Ffms2SourceExecutor::new(
            Box::new(video),
            output_format.clone(),
            properties.width,
            properties.height,
            source_path.to_path_buf(),
        );
        executors.insert(node_id, Arc::new(executor));

        progress.finish_source(source_path);
    }

    Ok(IndexedSources { graph, executors })
}
/// Returns a clone of the source request stored on node `node_id`.
///
/// # Errors
///
/// Returns `graph.invalid_clip` when the node does not exist in `graph` or when it
/// is not a `NodeKind::Source`.
fn source_request(graph: &Graph, node_id: NodeId) -> Result<pixelflow_core::SourceRequest> {
    // Both failure modes share the same category and code; only the message differs.
    let invalid_clip = |message: String| {
        PixelFlowError::new(
            ErrorCategory::Graph,
            ErrorCode::new("graph.invalid_clip"),
            message,
        )
    };

    let Some(node) = graph.node(node_id) else {
        return Err(invalid_clip(format!(
            "reachable source node {} is missing",
            node_id.index()
        )));
    };

    if let NodeKind::Source { request, .. } = node.kind() {
        Ok(request.clone())
    } else {
        Err(invalid_clip(format!(
            "reachable node {} is not a source",
            node_id.index()
        )))
    }
}
/// Bridges the backend's `BackendProgress` callback to an `IndexProgressSink`
/// by attaching the path of the source currently being indexed.
struct ProgressAdapter<'a> {
/// Path reported with every forwarded progress update.
path: &'a Path,
/// Sink that receives the forwarded updates.
sink: &'a mut dyn IndexProgressSink,
}
impl BackendProgress for ProgressAdapter<'_> {
    /// Relays one backend progress tick to the sink, tagged with the source path.
    fn update(&mut self, current: usize, total: usize) {
        let path = self.path;
        self.sink.progress(path, current, total);
    }
}
#[cfg(test)]
mod tests {
#![expect(clippy::indexing_slicing, reason = "allow in tests")]
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use pixelflow_core::{
ClipFormat, ClipMedia, ClipResolution, ErrorCode, FilterChangeSet, FilterCompatibility,
FrameCount, FrameRate, LogLevel, LogRecord, LogSink, Logger, NodeKind, Rational,
resolve_format_alias,
};
use crate::ffms2_source::backend::FakeBackend;
use super::{IndexProgressSink, SourceIndexContext, index_reachable_sources_with_backend};
/// Test sink that records every `progress` notification it receives.
#[derive(Default)]
struct RecordingProgress {
/// Recorded `(path, current, total)` triples, in arrival order.
updates: Vec<(PathBuf, usize, usize)>,
}
impl IndexProgressSink for RecordingProgress {
    fn begin_source(&mut self, _path: &Path) {}

    /// Stores the update; the other notifications are ignored.
    fn progress(&mut self, path: &Path, current: usize, total: usize) {
        let update = (path.to_owned(), current, total);
        self.updates.push(update);
    }

    fn cache_hit(&mut self, _path: &Path) {}

    fn finish_source(&mut self, _path: &Path) {}
}
/// Test log sink that keeps a copy of every record logged through it.
#[derive(Default)]
struct RecordingSink {
/// Captured records; behind a mutex because `LogSink::log` takes `&self`.
records: Mutex<Vec<LogRecord>>,
}
impl RecordingSink {
    /// Returns a snapshot of everything logged so far.
    fn records(&self) -> Vec<LogRecord> {
        let captured = self.records.lock().expect("record lock poisoned");
        captured.clone()
    }
}
impl LogSink for RecordingSink {
    /// Appends a clone of `record` to the captured list.
    fn log(&self, record: &LogRecord) {
        let mut captured = self.records.lock().expect("record lock poisoned");
        captured.push(record.clone());
    }
}
/// Placeholder media for a not-yet-indexed source: 1x1 `yuv420p8` with unknown
/// frame count and frame rate.
fn unknown_source_media() -> ClipMedia {
    let format = ClipFormat::Fixed(resolve_format_alias("yuv420p8").expect("format"));
    let resolution = ClipResolution::Fixed {
        width: 1,
        height: 1,
    };
    ClipMedia::new(format, resolution, FrameCount::Unknown, FrameRate::Unknown)
}
/// Builds a graph whose only node is a source requesting `path`, set as the output.
fn single_source_graph(path: &str) -> pixelflow_core::Graph {
    let request = pixelflow_core::SourceRequest::new(path);
    let mut builder = pixelflow_core::GraphBuilder::new();
    let clip = builder.source_with_request(request, unknown_source_media());
    builder.set_output(clip);
    builder.build()
}
/// A source that is not connected to the output must never reach the backend.
#[test]
fn indexes_only_reachable_sources() {
    let workdir = tempdir().expect("tempdir");
    let used_path = workdir.path().join("used.mkv");
    std::fs::write(&used_path, b"media").expect("source");

    let mut builder = pixelflow_core::GraphBuilder::new();
    let unused = builder.source_with_request(
        pixelflow_core::SourceRequest::new("unused.mkv"),
        unknown_source_media(),
    );
    let used = builder.source_with_request(
        pixelflow_core::SourceRequest::new("used.mkv"),
        unknown_source_media(),
    );
    builder.set_output(used);
    let graph = builder.build();
    assert_ne!(unused.node_id(), used.node_id());

    let backend = FakeBackend::default_with_cfr();
    let context = SourceIndexContext::for_tests(Some(workdir.path()));
    let mut progress = RecordingProgress::default();
    let indexed = index_reachable_sources_with_backend(graph, &context, &backend, &mut progress)
        .expect("indexing succeeds");

    assert_eq!(backend.indexed_paths(), vec![used_path]);
    let plan = indexed
        .graph()
        .validation_plan()
        .expect("validation plan");
    assert_eq!(plan.reachable_sources(), &[used.node_id()]);
}
/// The second pass over an unchanged file must read the cached index instead of
/// re-indexing.
#[test]
fn reuses_cache_when_fingerprint_matches() {
    let workdir = tempdir().expect("tempdir");
    std::fs::write(workdir.path().join("input.mkv"), b"media").expect("source");
    let graph = single_source_graph("input.mkv");
    let backend = FakeBackend::default_with_cfr();
    let context = SourceIndexContext::for_tests(Some(workdir.path()));

    let mut first_progress = RecordingProgress::default();
    index_reachable_sources_with_backend(graph.clone(), &context, &backend, &mut first_progress)
        .expect("first index");

    // Only the second pass should be counted.
    backend.reset_counts();

    let mut second_progress = RecordingProgress::default();
    index_reachable_sources_with_backend(graph, &context, &backend, &mut second_progress)
        .expect("second index");

    assert_eq!(backend.index_count(), 0);
    assert_eq!(backend.read_index_count(), 1);
}
/// Without an fps override, a source that reports no frame rate is rejected with
/// `source.unknown_frame_rate`.
#[test]
fn unknown_frame_rate_errors_without_override() {
    let workdir = tempdir().expect("tempdir");
    std::fs::write(workdir.path().join("input.mkv"), b"media").expect("source");
    let graph = single_source_graph("input.mkv");
    let backend = FakeBackend::default_with_unknown_rate();
    let context = SourceIndexContext::for_tests(Some(workdir.path()));
    let mut progress = RecordingProgress::default();

    let outcome = index_reachable_sources_with_backend(graph, &context, &backend, &mut progress);

    let error = match outcome {
        Ok(_) => panic!("unknown rate should fail"),
        Err(error) => error,
    };
    assert_eq!(error.code(), ErrorCode::new("source.unknown_frame_rate"));
}
/// Indexing a VFR source with the fake VFR backend must log a warning about the
/// implicit CFR normalization.
#[test]
fn implicit_vfr_normalization_emits_warning() {
    let workdir = tempdir().expect("tempdir");
    std::fs::write(workdir.path().join("input.mkv"), b"media").expect("source");
    let graph = single_source_graph("input.mkv");
    let backend = FakeBackend::default_with_vfr();

    let sink = Arc::new(RecordingSink::default());
    let logger = Logger::new(sink.clone());
    let context = SourceIndexContext::for_tests_with_logger(Some(workdir.path()), logger);
    let mut progress = RecordingProgress::default();

    index_reachable_sources_with_backend(graph, &context, &backend, &mut progress)
        .expect("indexing succeeds");

    let warned = sink.records().iter().any(|entry| {
        entry.level() == LogLevel::Warn && entry.message().contains("normalizing VFR source")
    });
    assert!(warned);
}
/// After indexing, the source node carries the frame count and frame rate reported
/// by the backend instead of the unknown placeholders.
#[test]
fn indexed_graph_replaces_unknown_media_with_fixed_properties() {
    let workdir = tempdir().expect("tempdir");
    std::fs::write(workdir.path().join("input.mkv"), b"media").expect("source");
    let backend = FakeBackend::default_with_cfr();
    let context = SourceIndexContext::for_tests(Some(workdir.path()));
    let mut progress = RecordingProgress::default();

    let indexed = index_reachable_sources_with_backend(
        single_source_graph("input.mkv"),
        &context,
        &backend,
        &mut progress,
    )
    .expect("indexing succeeds");

    let graph = indexed.graph();
    let output_id = graph.outputs()[0].node_id();
    let node = graph.node(output_id).expect("source exists");
    if !matches!(node.kind(), NodeKind::Source { .. }) {
        panic!("expected source node");
    }

    assert_eq!(node.media().frame_count(), FrameCount::Finite(2));
    let expected_rate = FrameRate::Cfr(Rational {
        numerator: 24,
        denominator: 1,
    });
    assert_eq!(node.media().frame_rate(), expected_rate);
}
/// A source -> resize -> convert_format chain must still validate once the source's
/// placeholder media has been replaced by indexing.
#[test]
fn indexed_source_media_propagates_to_downstream_filters() {
    let workdir = tempdir().expect("tempdir");
    std::fs::write(workdir.path().join("input.mkv"), b"media").expect("source");

    let mut builder = pixelflow_core::GraphBuilder::new();
    let source = builder.source_with_request(
        pixelflow_core::SourceRequest::new("input.mkv"),
        unknown_source_media(),
    );

    // Resize: only the resolution is allowed to change.
    let resized_media = ClipMedia::new(
        ClipFormat::Fixed(resolve_format_alias("yuv420p8").expect("format")),
        ClipResolution::Fixed {
            width: 8,
            height: 8,
        },
        FrameCount::Unknown,
        FrameRate::Unknown,
    );
    let resolution_only = FilterCompatibility::AllowChanges(FilterChangeSet {
        format: false,
        resolution: true,
        frame_count: false,
        frame_rate: false,
    });
    let resized = builder
        .filter("resize", &[source], resized_media, resolution_only)
        .expect("resize filter");

    // Format conversion: only the format is allowed to change.
    let converted_media = ClipMedia::new(
        ClipFormat::Fixed(resolve_format_alias("yuv420p10").expect("format")),
        ClipResolution::Fixed {
            width: 8,
            height: 8,
        },
        FrameCount::Unknown,
        FrameRate::Unknown,
    );
    let format_only = FilterCompatibility::AllowChanges(FilterChangeSet {
        format: true,
        resolution: false,
        frame_count: false,
        frame_rate: false,
    });
    let converted = builder
        .filter("convert_format", &[resized], converted_media, format_only)
        .expect("convert filter");
    builder.set_output(converted);

    let backend = FakeBackend::default_with_cfr();
    let context = SourceIndexContext::for_tests(Some(workdir.path()));
    let mut progress = RecordingProgress::default();
    let indexed =
        index_reachable_sources_with_backend(builder.build(), &context, &backend, &mut progress)
            .expect("indexing succeeds");

    indexed
        .graph()
        .validate()
        .expect("downstream filter media should validate after indexing");
}
}