use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result, bail};
use serde::Deserialize;
use crate::job::RungArtifact;
use crate::settings::{
TranscodeSettings, parse_audio, parse_bit_depth, parse_color, parse_gpu_family, parse_mode,
parse_rung, parse_seam,
};
use crate::spec::OutputMode;
/// One job entry of a batch manifest (also used for the manifest-wide `defaults`).
///
/// Every field is optional so that a job can be layered over the defaults with
/// `JobSpec::over`. Unknown keys are rejected at deserialization time
/// (`deny_unknown_fields`). String-valued options are kept as raw text here and
/// only parsed into typed settings by `JobSpec::to_settings`.
#[derive(Debug, Default, Clone, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub struct JobSpec {
/// Input file path or glob pattern; relative values are joined to the manifest's
/// base directory. Never inherited from `defaults` (see `over`).
pub input: Option<String>,
/// Explicit output target. A trailing `/` or `\` marks it as a directory in
/// `resolve_output`. Never inherited from `defaults` (see `over`).
pub output: Option<String>,
/// Output mode name, parsed with `parse_mode`.
pub mode: Option<String>,
/// Rung names, each parsed with `parse_rung`.
#[serde(default)]
pub rungs: Option<Vec<String>>,
/// Copied into `TranscodeSettings::ladder`; treated as `false` when absent.
pub ladder: Option<bool>,
/// Copied verbatim into `TranscodeSettings::max_short_side`.
pub max_short_side: Option<u32>,
/// Copied verbatim into `TranscodeSettings::segment_seconds`.
pub segment_seconds: Option<f32>,
/// Copied verbatim into `TranscodeSettings::crf` (no range check is done here).
pub crf: Option<u8>,
/// Copied verbatim into `TranscodeSettings::speed` (no range check is done here).
pub speed: Option<u8>,
/// Audio option, parsed with `parse_audio`.
pub audio: Option<String>,
/// Color option, parsed with `parse_color`.
pub color: Option<String>,
/// Bit-depth option, parsed with `parse_bit_depth`. Also accepted under the
/// manifest key `pixel_format`.
#[serde(alias = "pixel_format")]
pub bit_depth: Option<String>,
/// Seam option, parsed with `parse_seam`.
pub seam: Option<String>,
/// Copied verbatim into `TranscodeSettings::max_fps`.
pub max_fps: Option<f64>,
/// Copied verbatim into `TranscodeSettings::gpu` (presumably a device index — confirm).
pub gpu: Option<u32>,
/// GPU family name, parsed with `parse_gpu_family`.
pub gpu_family: Option<String>,
/// Copied into `TranscodeSettings::single_gpu`; treated as `false` when absent.
pub single_gpu: Option<bool>,
/// Copied verbatim into `TranscodeSettings::decode_gpu`.
pub decode_gpu: Option<u32>,
/// Copied verbatim into `TranscodeSettings::width`.
pub width: Option<u32>,
/// Copied verbatim into `TranscodeSettings::height`.
pub height: Option<u32>,
/// Filter description; turned into `TranscodeSettings::filters` via
/// `FilterSpec::resolve`.
pub filter: Option<codec::filter::FilterSpec>,
}
impl JobSpec {
    /// Builds the effective spec for a job by layering `self` on top of `base`.
    ///
    /// For every option, the job's own value wins and `base` only fills in what
    /// the job left unset. `input` and `output` are deliberately taken from
    /// `self` alone and are never inherited.
    fn over(&self, base: &JobSpec) -> JobSpec {
        // Job value if present, otherwise the value from `base`.
        macro_rules! inherit {
            ($field:ident) => {
                self.$field.as_ref().or(base.$field.as_ref()).cloned()
            };
        }
        JobSpec {
            input: self.input.clone(),
            output: self.output.clone(),
            mode: inherit!(mode),
            rungs: inherit!(rungs),
            ladder: inherit!(ladder),
            max_short_side: inherit!(max_short_side),
            segment_seconds: inherit!(segment_seconds),
            crf: inherit!(crf),
            speed: inherit!(speed),
            audio: inherit!(audio),
            color: inherit!(color),
            bit_depth: inherit!(bit_depth),
            seam: inherit!(seam),
            max_fps: inherit!(max_fps),
            gpu: inherit!(gpu),
            gpu_family: inherit!(gpu_family),
            single_gpu: inherit!(single_gpu),
            decode_gpu: inherit!(decode_gpu),
            width: inherit!(width),
            height: inherit!(height),
            filter: inherit!(filter),
        }
    }

    /// Converts the raw manifest values into typed [`TranscodeSettings`].
    ///
    /// Starts from `TranscodeSettings::default()` and overwrites a parsed field
    /// only when the spec provides a value for it. Plain numeric fields are
    /// copied as-is; `ladder` and `single_gpu` become `false` when absent.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by one of the `parse_*` helpers or by
    /// resolving the filter description. The checks run in the order: mode,
    /// rungs, audio, color, bit depth, seam, GPU family, filter.
    pub fn to_settings(&self) -> Result<TranscodeSettings> {
        let mut settings = TranscodeSettings::default();

        if let Some(mode) = self.mode.as_ref() {
            let parsed = parse_mode(mode)?;
            settings.mode = Some(parsed);
        }
        if let Some(names) = self.rungs.as_ref() {
            for name in names {
                let rung = parse_rung(name)?;
                settings.rungs.push(rung);
            }
        }
        settings.ladder = self.ladder == Some(true);
        settings.max_short_side = self.max_short_side;
        settings.segment_seconds = self.segment_seconds;
        settings.crf = self.crf;
        settings.speed = self.speed;

        if let Some(audio) = self.audio.as_ref() {
            let parsed = parse_audio(audio)?;
            settings.audio = Some(parsed);
        }
        if let Some(color) = self.color.as_ref() {
            let parsed = parse_color(color)?;
            settings.color = Some(parsed);
        }
        if let Some(depth) = self.bit_depth.as_ref() {
            let parsed = parse_bit_depth(depth)?;
            settings.bit_depth = Some(parsed);
        }
        if let Some(seam) = self.seam.as_ref() {
            let parsed = parse_seam(seam)?;
            settings.seam = Some(parsed);
        }
        settings.max_fps = self.max_fps;
        settings.gpu = self.gpu;

        if let Some(family) = self.gpu_family.as_ref() {
            let parsed = parse_gpu_family(family)?;
            settings.gpu_family = Some(parsed);
        }
        settings.single_gpu = self.single_gpu == Some(true);
        settings.decode_gpu = self.decode_gpu;
        settings.width = self.width;
        settings.height = self.height;

        if let Some(filter) = self.filter.as_ref() {
            settings.filters = filter.resolve().context("resolving filter")?;
        }
        Ok(settings)
    }
}
/// Top-level batch manifest as deserialized from YAML or JSON.
///
/// Unknown top-level keys are rejected (`deny_unknown_fields`). `jobs` has no
/// serde default, so a manifest without that key fails to deserialize.
#[derive(Debug, Default, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub struct Manifest {
/// Optional manifest version. NOTE(review): not consulted by the code visible
/// in this module — confirm whether it is validated elsewhere.
#[serde(default)]
pub version: Option<u32>,
/// Default output root, joined to the manifest's base directory in
/// `run_manifest`; used for jobs that do not set their own `output`.
#[serde(default)]
pub output_dir: Option<String>,
/// Failure policy. Only the exact value `"stop"` is recognized by
/// `run_manifest`; any other value (or none) lets the batch continue.
#[serde(default)]
pub on_error: Option<String>,
/// Options applied to every job unless the job overrides them
/// (see `JobSpec::over`).
#[serde(default)]
pub defaults: JobSpec,
/// The jobs to run; `parse_manifest` rejects an empty list.
pub jobs: Vec<JobSpec>,
}
/// Serialization format of a manifest file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Yaml,
    Json,
}

impl Format {
    /// Picks the manifest format from the file extension.
    ///
    /// An extension equal to `json` (ASCII case-insensitive) selects
    /// [`Format::Json`]; every other extension, a missing extension, or a
    /// non-UTF-8 extension falls back to [`Format::Yaml`].
    pub fn from_path(path: &Path) -> Format {
        let extension = path.extension().and_then(|ext| ext.to_str());
        if matches!(extension, Some(ext) if ext.eq_ignore_ascii_case("json")) {
            Format::Json
        } else {
            Format::Yaml
        }
    }
}
/// Deserializes manifest `text` in the given `format`.
///
/// # Errors
///
/// Fails when the text cannot be deserialized into a [`Manifest`] (the error
/// carries a "parsing JSON manifest" / "parsing YAML manifest" context), or
/// when the resulting manifest contains no jobs.
pub fn parse_manifest(text: &str, format: Format) -> Result<Manifest> {
    let parsed: Result<Manifest> = match format {
        Format::Yaml => {
            serde_yaml_ng::from_str::<Manifest>(text).context("parsing YAML manifest")
        }
        Format::Json => serde_json::from_str::<Manifest>(text).context("parsing JSON manifest"),
    };
    let manifest = parsed?;
    if manifest.jobs.is_empty() {
        bail!("manifest has no `jobs`");
    }
    Ok(manifest)
}
/// Result record for a single planned job, as collected by `run_manifest`.
#[derive(Debug)]
pub struct JobOutcome {
/// The input file the job was run on.
pub input: PathBuf,
/// Output file or directory reported by the job; `None` when the job failed.
pub output: Option<PathBuf>,
/// Frame count reported by the job; `0` when the job failed.
pub frames: u64,
/// Byte count reported by the job; `0` when the job failed.
pub bytes: u64,
/// Whether the job succeeded, and the error text if it did not.
pub status: JobStatus,
}
/// Final state of one job in a batch.
#[derive(Debug)]
pub enum JobStatus {
    /// The job completed successfully.
    Ok,
    /// The job failed; the payload is the formatted error message.
    Failed(String),
}

impl JobStatus {
    /// Returns `true` only for [`JobStatus::Ok`].
    pub fn is_ok(&self) -> bool {
        match self {
            JobStatus::Ok => true,
            JobStatus::Failed(_) => false,
        }
    }
}
/// Outcomes of a batch run, in the order the jobs were executed.
#[derive(Debug, Default)]
pub struct BatchReport {
/// One entry per job that was actually run (a batch stopped by
/// `on_error: stop` contains no entries for the jobs that were skipped).
pub outcomes: Vec<JobOutcome>,
}
impl BatchReport {
    /// Number of recorded jobs whose status is `Ok`.
    pub fn ok_count(&self) -> usize {
        self.outcomes
            .iter()
            .map(|outcome| usize::from(outcome.status.is_ok()))
            .sum()
    }

    /// Number of recorded jobs whose status is not `Ok`.
    pub fn failed_count(&self) -> usize {
        self.outcomes
            .iter()
            .filter(|outcome| !outcome.status.is_ok())
            .count()
    }

    /// `true` when no recorded job failed (also `true` for an empty report).
    pub fn all_ok(&self) -> bool {
        self.outcomes.iter().all(|outcome| outcome.status.is_ok())
    }
}
/// A single concrete unit of work produced by `plan_manifest`.
#[derive(Debug)]
pub struct PlannedJob {
/// One concrete input file (a glob in the manifest yields one `PlannedJob`
/// per matched file).
pub input: PathBuf,
/// The job's spec after merging with the manifest defaults; its `input`
/// field still holds the original manifest text (path or glob).
pub spec: JobSpec,
}
pub fn plan_manifest(manifest: &Manifest, base_dir: &Path) -> Result<Vec<PlannedJob>> {
let mut planned = Vec::new();
for (i, job) in manifest.jobs.iter().enumerate() {
let merged = job.over(&manifest.defaults);
let input = merged
.input
.clone()
.with_context(|| format!("job #{} has no `input`", i + 1))?;
for path in expand_input(&input, base_dir)? {
planned.push(PlannedJob {
input: path,
spec: merged.clone(),
});
}
}
Ok(planned)
}
/// Reads the manifest at `path`, parses it, and runs every job in it.
///
/// The format is chosen from the file extension, and relative paths inside the
/// manifest are resolved against the manifest file's parent directory (`.` if
/// the path has no parent).
///
/// # Errors
///
/// Fails when the file cannot be read, when the manifest does not parse, or
/// when `run_manifest` itself fails.
pub fn run_manifest_file(path: &Path) -> Result<BatchReport> {
    let manifest_text = fs::read_to_string(path)
        .with_context(|| format!("reading manifest {}", path.display()))?;
    let format = Format::from_path(path);
    let manifest = parse_manifest(&manifest_text, format)?;
    let base_dir = match path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    run_manifest(&manifest, base_dir)
}
/// Plans and runs every job of `manifest`, collecting one outcome per job.
///
/// Relative paths are resolved against `base_dir`. A failing job is recorded
/// as `JobStatus::Failed` and does not abort the batch unless the manifest's
/// `on_error` is exactly `"stop"`, in which case the loop ends after the first
/// failure and the remaining jobs are not run.
///
/// # Errors
///
/// Only planning errors are returned as `Err`; per-job failures are reported
/// inside the returned [`BatchReport`].
pub fn run_manifest(manifest: &Manifest, base_dir: &Path) -> Result<BatchReport> {
    let stop_on_error = manifest.on_error.as_deref() == Some("stop");
    let manifest_out_dir = manifest
        .output_dir
        .as_deref()
        .map(|dir| base_dir.join(dir));
    let planned = plan_manifest(manifest, base_dir)?;
    let total = planned.len();
    tracing::info!(jobs = total, "batch: starting");

    let mut report = BatchReport::default();
    for (job, n) in planned.iter().zip(1usize..) {
        tracing::info!(
            "batch: [{n}/{total}] {} -> converting",
            job.input.display()
        );
        let result = run_one(&job.input, &job.spec, manifest_out_dir.as_deref(), base_dir);
        let outcome = match result {
            Err(e) => {
                // `{:#}` flattens the whole anyhow context chain into one line.
                let msg = format!("{e:#}");
                tracing::error!("batch: [{n}/{total}] {} FAILED: {msg}", job.input.display());
                JobOutcome {
                    input: job.input.clone(),
                    output: None,
                    frames: 0,
                    bytes: 0,
                    status: JobStatus::Failed(msg),
                }
            }
            Ok((output, frames, bytes)) => {
                tracing::info!(
                    "batch: [{n}/{total}] {} -> {} ({} frames, {} bytes)",
                    job.input.display(),
                    output.display(),
                    frames,
                    bytes
                );
                JobOutcome {
                    input: job.input.clone(),
                    output: Some(output),
                    frames,
                    bytes,
                    status: JobStatus::Ok,
                }
            }
        };
        let failed = matches!(outcome.status, JobStatus::Failed(_));
        report.outcomes.push(outcome);
        if stop_on_error && failed {
            tracing::warn!("batch: on_error=stop — aborting after a failed job");
            break;
        }
    }
    Ok(report)
}
/// Transcodes one input file and writes its output(s).
///
/// The whole input is read into memory, probed, and combined with the job's
/// settings into an output spec. Overlay filter image paths are resolved
/// relative to `base_dir`. The output location comes from `resolve_output`:
///
/// * directory plan — the directory is created and passed to the transcoder;
///   every rung that comes back as an in-memory file is written there as
///   `<label>.mp4`;
/// * single-file plan — the first rung that comes back as an in-memory file is
///   written to the target path (its parent directory is created first).
///
/// Returns `(output path, frames, bytes)`. For a directory plan, `frames` is
/// summed over all rungs and `bytes` counts only the files written by this
/// function.
///
/// # Errors
///
/// Fails on any I/O, probing, settings, or transcoding error, and when a
/// single-file plan yields no in-memory file artifact.
fn run_one(
    input: &Path,
    spec: &JobSpec,
    manifest_out_dir: Option<&Path>,
    base_dir: &Path,
) -> Result<(PathBuf, u64, u64)> {
    let source = fs::read(input).with_context(|| format!("reading {}", input.display()))?;
    let info = crate::probe_bytes(&source).context("probing input")?;
    let mut output_spec = spec
        .to_settings()?
        .into_spec(info.width, info.height)
        .context("building output spec")?;

    // Overlay images are named relative to the manifest, not the process cwd.
    for filter in &mut output_spec.filters {
        if let codec::filter::VideoFilter::Overlay { image, .. } = filter {
            let resolved = join_rel(base_dir, image);
            *image = resolved.to_string_lossy().into_owned();
        }
    }

    let wants_hls = matches!(output_spec.mode, OutputMode::Hls { .. });
    let has_many_rungs = output_spec.rungs.len() > 1;
    let plan = resolve_output(
        spec.output.as_deref(),
        manifest_out_dir,
        base_dir,
        input,
        wants_hls,
        has_many_rungs,
    );
    // Progress events are discarded for batch runs.
    let sink = Arc::new(crate::fn_sink(|_p| {}));

    match plan {
        OutputPlan::SingleFile(target) => {
            if let Some(parent) = target.parent() {
                fs::create_dir_all(parent)
                    .with_context(|| format!("creating {}", parent.display()))?;
            }
            let job = crate::run_job_blocking(&source, &output_spec, None, sink)
                .with_context(|| format!("transcoding {}", input.display()))?;

            // Take the first rung that produced an in-memory file.
            let mut picked = None;
            for rung in job.rungs {
                if let RungArtifact::File(data) = rung.artifact {
                    picked = Some((data, rung.frames));
                    break;
                }
            }
            let (data, frames) = picked.context("no single-file output produced")?;

            fs::write(&target, &data).with_context(|| format!("writing {}", target.display()))?;
            Ok((target, frames, data.len() as u64))
        }
        OutputPlan::Directory(dir) => {
            fs::create_dir_all(&dir)
                .with_context(|| format!("creating output dir {}", dir.display()))?;
            let job = crate::run_job_blocking(&source, &output_spec, Some(&dir), sink)
                .with_context(|| format!("transcoding {}", input.display()))?;

            let (mut frames, mut written) = (0u64, 0u64);
            for rung in job.rungs {
                frames += rung.frames;
                if let RungArtifact::File(data) = rung.artifact {
                    written += data.len() as u64;
                    let file = dir.join(format!("{}.mp4", rung.label));
                    fs::write(&file, &data)
                        .with_context(|| format!("writing {}", file.display()))?;
                }
            }
            Ok((dir, frames, written))
        }
    }
}
/// Where a job's output goes, as decided by `resolve_output`.
enum OutputPlan {
/// Write exactly one file at this path.
SingleFile(PathBuf),
/// Write the job's outputs inside this directory.
Directory(PathBuf),
}
/// Decides where a job's output is written.
///
/// A directory is wanted when the job is HLS or has more than one rung;
/// otherwise a single `.mp4` file is produced. `stem` below is the input's
/// file stem (or `"output"` if it has none).
///
/// With an explicit `job_output` (resolved against `base_dir`):
///
/// | wants directory | ends with `/` or `\` | result                      |
/// |-----------------|----------------------|-----------------------------|
/// | yes             | yes                  | directory `<output>/<stem>` |
/// | yes             | no                   | directory `<output>`        |
/// | no              | yes                  | file `<output>/<stem>.mp4`  |
/// | no              | no                   | file `<output>`             |
///
/// Without `job_output`, the root is `manifest_out_dir` if given, else the
/// input's parent directory, and the result is `<root>/<stem>` (directory) or
/// `<root>/<stem>.mp4` (file).
fn resolve_output(
    job_output: Option<&str>,
    manifest_out_dir: Option<&Path>,
    base_dir: &Path,
    input: &Path,
    is_hls: bool,
    multi: bool,
) -> OutputPlan {
    let stem = match input.file_stem() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::from("output"),
    };
    let wants_dir = is_hls || multi;

    match job_output {
        None => {
            let root = match manifest_out_dir {
                Some(dir) => dir.to_path_buf(),
                None => input.parent().map(Path::to_path_buf).unwrap_or_default(),
            };
            if wants_dir {
                OutputPlan::Directory(root.join(&stem))
            } else {
                OutputPlan::SingleFile(root.join(format!("{stem}.mp4")))
            }
        }
        Some(explicit) => {
            let trailing_sep = matches!(explicit.chars().last(), Some('/') | Some('\\'));
            let target = join_rel(base_dir, explicit);
            match (wants_dir, trailing_sep) {
                (true, true) => OutputPlan::Directory(target.join(&stem)),
                (true, false) => OutputPlan::Directory(target),
                (false, true) => OutputPlan::SingleFile(target.join(format!("{stem}.mp4"))),
                (false, false) => OutputPlan::SingleFile(target),
            }
        }
    }
}
/// Resolves `p` against `base_dir`: an absolute `p` is returned unchanged,
/// anything else is joined onto `base_dir`.
fn join_rel(base_dir: &Path, p: &str) -> PathBuf {
    let candidate = Path::new(p);
    if candidate.is_absolute() {
        return candidate.to_path_buf();
    }
    base_dir.join(candidate)
}
/// Turns a manifest `input` value into the list of concrete input files.
///
/// Resolution order:
///
/// 1. If `input` (resolved against `base_dir`) names an existing file, that
///    file is returned as-is. This is checked first so that file names which
///    happen to contain glob metacharacters — e.g. `[Group] clip.mkv` — can
///    still be selected literally.
/// 2. If `input` contains none of `*`, `?`, `[`, it is a plain path and the
///    missing file is an error.
/// 3. Otherwise `input` is expanded as a glob. Only the `input` part is
///    treated as a pattern: `base_dir` is escaped, so metacharacters in the
///    manifest's own directory name cannot alter the match. Matches that are
///    not regular files are skipped, unreadable entries are logged and
///    skipped, and the result is sorted.
///
/// A glob that matches nothing yields an empty list and a warning, not an
/// error.
///
/// # Errors
///
/// Fails when a plain path does not exist as a file, or when the glob pattern
/// is malformed.
fn expand_input(input: &str, base_dir: &Path) -> Result<Vec<PathBuf>> {
    let literal = join_rel(base_dir, input);
    if literal.is_file() {
        return Ok(vec![literal]);
    }

    let has_glob = input.contains(['*', '?', '[']);
    if !has_glob {
        bail!("input not found: {}", literal.display());
    }

    // `Path::join` replaces the base when `input` is absolute, matching
    // `join_rel`; escaping only the base keeps `input` a live pattern.
    let escaped_base = glob::Pattern::escape(&base_dir.to_string_lossy());
    let pattern = Path::new(&escaped_base).join(input);
    let pattern = pattern.to_string_lossy();

    let mut out = Vec::new();
    for entry in glob::glob(&pattern).with_context(|| format!("bad glob: {input}"))? {
        match entry {
            Ok(p) if p.is_file() => out.push(p),
            Ok(_) => {}
            // Best effort: an unreadable directory must not abort the whole batch.
            Err(e) => tracing::warn!("batch: glob entry error: {e}"),
        }
    }
    out.sort();
    if out.is_empty() {
        tracing::warn!("batch: glob matched no files: {input}");
    }
    Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::settings;

    // Shared fixture. The nesting indentation is significant: `crf`/`color`
    // belong to `defaults`, and the remaining keys belong to their list item.
    // The raw string is kept flush-left so the YAML indentation is exact.
    const YAML: &str = r#"
output_dir: out
defaults:
  crf: 28
  color: sdr
jobs:
  - input: in/a.mkv
    output: out/a.mp4
    crf: 24
  - input: in/b.mov
    mode: hls
    ladder: true
    pixel_format: 10bit
"#;

    /// Job values override defaults; unset values are inherited; the
    /// `pixel_format` alias lands in `bit_depth`.
    #[test]
    fn parses_yaml_and_merges_defaults() {
        let m = parse_manifest(YAML, Format::Yaml).unwrap();
        assert_eq!(m.jobs.len(), 2);
        assert_eq!(m.output_dir.as_deref(), Some("out"));
        let j1 = m.jobs[0].over(&m.defaults);
        assert_eq!(j1.crf, Some(24));
        assert_eq!(j1.color.as_deref(), Some("sdr"));
        let j2 = m.jobs[1].over(&m.defaults);
        assert_eq!(j2.crf, Some(28));
        assert_eq!(j2.bit_depth.as_deref(), Some("10bit"));
        assert_eq!(j2.mode.as_deref(), Some("hls"));
    }

    /// The JSON form of a manifest merges the same way as the YAML form.
    #[test]
    fn parses_equivalent_json() {
        let json = r#"
{ "defaults": { "crf": 28 },
  "jobs": [ { "input": "a.mkv", "output": "a.mp4", "crf": 24 } ] }"#;
        let m = parse_manifest(json, Format::Json).unwrap();
        assert_eq!(m.jobs.len(), 1);
        assert_eq!(m.jobs[0].over(&m.defaults).crf, Some(24));
    }

    /// A merged spec converts into typed settings.
    #[test]
    fn settings_build_from_spec() {
        let m = parse_manifest(YAML, Format::Yaml).unwrap();
        let s = m.jobs[1].over(&m.defaults).to_settings().unwrap();
        assert_eq!(s.mode, Some(settings::Mode::Hls));
        assert!(s.ladder);
        assert_eq!(s.crf, Some(28));
    }

    /// A misspelled job key must be rejected by `deny_unknown_fields`; the
    /// document is otherwise well-formed so the failure is the unknown key.
    #[test]
    fn unknown_field_is_rejected() {
        let bad = "jobs:\n  - input: a.mkv\n    crff: 24\n";
        assert!(parse_manifest(bad, Format::Yaml).is_err());
    }

    /// The structured and the string form of `filter` resolve to the same
    /// filter list, and an invalid filter value fails in `to_settings`.
    #[test]
    fn filter_structured_objects_and_string_resolve_equal() {
        use codec::filter::VideoFilter::{Crop, HFlip, Rotate};
        let expect = vec![Crop { w: 1280, h: 720, x: None, y: None }, HFlip, Rotate(90)];
        let structured = "jobs:\n  - input: a.mkv\n    output: a.mp4\n    filter:\n      - crop:\n          w: 1280\n          h: 720\n      - hflip\n      - rotate: 90\n";
        let s = parse_manifest(structured, Format::Yaml).unwrap().jobs[0]
            .to_settings()
            .unwrap();
        assert_eq!(s.filters, expect);
        let string = "jobs:\n  - input: a.mkv\n    output: a.mp4\n    filter: \"crop=1280:720,hflip,rotate=90\"\n";
        let s2 = parse_manifest(string, Format::Yaml).unwrap().jobs[0]
            .to_settings()
            .unwrap();
        assert_eq!(s2.filters, expect);
        let bad = "jobs:\n  - input: a.mkv\n    filter:\n      - rotate: 45\n";
        assert!(parse_manifest(bad, Format::Yaml).unwrap().jobs[0]
            .to_settings()
            .is_err());
    }

    /// A manifest with an empty job list is an error.
    #[test]
    fn empty_jobs_rejected() {
        assert!(parse_manifest("jobs: []", Format::Yaml).is_err());
    }

    /// Only `.json` selects JSON; YAML is the fallback.
    #[test]
    fn format_from_extension() {
        assert_eq!(Format::from_path(Path::new("m.json")), Format::Json);
        assert_eq!(Format::from_path(Path::new("m.yaml")), Format::Yaml);
        assert_eq!(Format::from_path(Path::new("m.yml")), Format::Yaml);
    }

    /// Output resolution for explicit files, explicit directories, HLS
    /// directories, and the manifest-level output directory.
    #[test]
    fn output_rules() {
        let base = Path::new("/b");
        let inp = Path::new("/b/clip.mkv");
        assert!(matches!(
            resolve_output(Some("out/a.mp4"), None, base, inp, false, false),
            OutputPlan::SingleFile(p) if p.ends_with("out/a.mp4")
        ));
        assert!(matches!(
            resolve_output(Some("out/"), None, base, inp, false, false),
            OutputPlan::SingleFile(p) if p.ends_with("clip.mp4")
        ));
        assert!(matches!(
            resolve_output(Some("out/hls"), None, base, inp, true, false),
            OutputPlan::Directory(p) if p.ends_with("out/hls")
        ));
        assert!(matches!(
            resolve_output(None, Some(Path::new("/out")), base, inp, false, false),
            OutputPlan::SingleFile(p) if p == Path::new("/out/clip.mp4")
        ));
    }
}