use crate::progress::BatchProgress;
use crate::transcode::{self, TranscodeOptions};
use anyhow::{Context, Result};
use colored::Colorize;
use rayon::prelude::*;
use serde::Deserialize;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::{debug, error, info, warn};
/// Caller-supplied settings for one batch run (consumed by `batch_process`).
#[derive(Debug, Clone)]
pub struct BatchOptions {
/// Directory scanned for input files; validation requires it to exist and be a directory.
pub input_dir: PathBuf,
/// Root under which outputs are written; created during validation if it does not exist.
pub output_dir: PathBuf,
/// Path of the TOML file that is deserialized into `BatchConfig`.
pub config: PathBuf,
/// Number of parallel jobs; `0` means "use `rayon::current_num_threads()`".
pub jobs: usize,
/// When `false`, `batch_process` returns an error if at least one job failed.
pub continue_on_error: bool,
/// When `true`, the planned jobs are only printed and nothing is executed.
pub dry_run: bool,
}
/// Batch configuration deserialized from the TOML file named by `BatchOptions::config`.
///
/// The codec/bitrate/filter fields are copied unchanged into each job's
/// `TranscodeOptions` by `create_job_queue`.
#[derive(Debug, Deserialize)]
pub struct BatchConfig {
/// Include patterns, matched against the file name only (not the full path). Required key.
pub patterns: Vec<String>,
/// Exclude patterns, matched against the file name; empty when the key is absent.
#[serde(default)]
pub exclude: Vec<String>,
/// Forwarded to `TranscodeOptions::video_codec`.
pub video_codec: Option<String>,
/// Forwarded to `TranscodeOptions::audio_codec`.
pub audio_codec: Option<String>,
/// Forwarded to `TranscodeOptions::video_bitrate`.
pub video_bitrate: Option<String>,
/// Forwarded to `TranscodeOptions::audio_bitrate`.
pub audio_bitrate: Option<String>,
/// Forwarded to `TranscodeOptions::scale`.
pub scale: Option<String>,
/// Forwarded to `TranscodeOptions::video_filter`.
pub video_filter: Option<String>,
/// Forwarded to `TranscodeOptions::preset`; defaults to `"medium"`.
#[serde(default = "default_preset")]
pub preset: String,
/// Forwarded to `TranscodeOptions::two_pass`; defaults to `false`.
#[serde(default)]
pub two_pass: bool,
/// Forwarded to `TranscodeOptions::crf`.
pub crf: Option<u32>,
/// Forwarded to `TranscodeOptions::threads`; defaults to `0`.
#[serde(default)]
pub threads: usize,
/// Extension applied to every output path; defaults to `"webm"`.
#[serde(default = "default_extension")]
pub output_extension: String,
/// When `false` (the default), a job whose output file already exists is skipped.
#[serde(default)]
pub overwrite: bool,
/// Whether discovery descends into subdirectories; defaults to `true`.
#[serde(default = "default_recursive")]
pub recursive: bool,
}
/// Serde fallback for `BatchConfig::preset` when the key is missing from the config.
fn default_preset() -> String {
    String::from("medium")
}
/// Serde fallback for `BatchConfig::output_extension` when the key is missing from the config.
fn default_extension() -> String {
    String::from("webm")
}
/// Serde fallback for `BatchConfig::recursive`: discovery is recursive unless the
/// config explicitly sets the key.
fn default_recursive() -> bool {
true
}
/// One planned transcode: a discovered input, its computed output path, and the
/// fully populated options handed to `transcode::transcode`.
#[derive(Debug, Clone)]
struct BatchJob {
// Source file found during discovery.
#[allow(dead_code)]
input_path: PathBuf,
// Destination produced by `compute_output_path`.
#[allow(dead_code)]
output_path: PathBuf,
// Per-job options; `input`/`output` inside mirror the two paths above.
options: TranscodeOptions,
}
/// Outcome of a single `BatchJob`, as produced by `process_job`.
#[derive(Debug)]
enum JobResult {
/// `transcode::transcode` returned `Ok(())`.
Success {
// Stored for `Debug` output; not read anywhere in the visible code.
#[allow(dead_code)]
input: PathBuf,
// Stored for `Debug` output; not read anywhere in the visible code.
#[allow(dead_code)]
output: PathBuf,
},
/// The job could not be completed: the output directory or the tokio runtime
/// could not be created, or the transcode itself returned an error.
Failed {
/// Input file of the failed job.
input: PathBuf,
/// Human-readable error text shown in the summary.
error: String,
},
/// The job was not attempted (the output already exists and overwrite is off).
Skipped {
/// Input file of the skipped job.
input: PathBuf,
/// Why the job was skipped; shown in the summary.
reason: String,
},
}
/// Runs a whole batch: validate directories, load the TOML config, discover
/// inputs, build the job list, then either print a dry run or execute the jobs
/// and print a summary.
///
/// # Errors
///
/// Returns an error when validation, config loading, discovery or job execution
/// setup fails, and also when at least one job failed while
/// `options.continue_on_error` is `false`. Finding no matching files is not an
/// error; it logs a warning and returns `Ok(())`.
pub async fn batch_process(options: BatchOptions) -> Result<()> {
    info!("Starting batch processing");
    debug!("Batch options: {:?}", options);

    validate_directories(&options)?;

    let batch_config =
        load_config(&options.config).context("Failed to load batch configuration")?;
    debug!("Loaded config: {:?}", batch_config);

    let candidates = discover_files(&options.input_dir, &batch_config)
        .context("Failed to discover input files")?;
    if candidates.is_empty() {
        warn!("No files found matching the patterns");
        return Ok(());
    }
    info!("Found {} files to process", candidates.len());

    let queue = create_job_queue(&candidates, &options, &batch_config)?;
    if options.dry_run {
        // Dry run: describe the work and stop before touching any output.
        print_dry_run(&queue);
        return Ok(());
    }

    let outcomes = execute_jobs(queue, &options).await?;
    print_batch_summary(&outcomes);

    let failed_count = outcomes
        .iter()
        .filter(|outcome| matches!(outcome, JobResult::Failed { .. }))
        .count();

    // Failures only turn into an error when the caller did not opt into tolerating them.
    if failed_count == 0 || options.continue_on_error {
        return Ok(());
    }
    Err(anyhow::anyhow!("{} jobs failed", failed_count))
}
/// Checks the input and output locations before any work is planned.
///
/// The input path must exist and be a directory. The output path is created
/// (including parents) when it is missing; when it already exists it must be a
/// directory.
///
/// # Errors
///
/// Returns an error if the input directory is missing or not a directory, if
/// the output path exists but is not a directory, or if the output directory
/// cannot be created.
fn validate_directories(options: &BatchOptions) -> Result<()> {
    let input_dir = &options.input_dir;
    if !input_dir.exists() {
        return Err(anyhow::anyhow!(
            "Input directory does not exist: {}",
            input_dir.display()
        ));
    }
    if !input_dir.is_dir() {
        return Err(anyhow::anyhow!(
            "Input path is not a directory: {}",
            input_dir.display()
        ));
    }

    let output_dir = &options.output_dir;
    if output_dir.exists() {
        // An existing regular file here would otherwise pass validation and
        // make every job fail later when it tries to create its parent directory.
        if !output_dir.is_dir() {
            return Err(anyhow::anyhow!(
                "Output path is not a directory: {}",
                output_dir.display()
            ));
        }
    } else {
        fs::create_dir_all(output_dir).context("Failed to create output directory")?;
    }
    Ok(())
}
/// Reads the file at `path` and deserializes it as a TOML `BatchConfig`.
///
/// # Errors
///
/// Returns an error if the file cannot be read or its contents are not a valid
/// `BatchConfig` TOML document.
fn load_config(path: &Path) -> Result<BatchConfig> {
    let raw = fs::read_to_string(path).context("Failed to read configuration file")?;
    let parsed: BatchConfig =
        toml::from_str(&raw).context("Failed to parse TOML configuration")?;
    Ok(parsed)
}
fn discover_files(input_dir: &Path, config: &BatchConfig) -> Result<Vec<PathBuf>> {
let mut files = Vec::new();
if config.recursive {
walk_dir_recursive(input_dir, &mut files, config)?;
} else {
walk_dir_shallow(input_dir, &mut files, config)?;
}
files.sort();
Ok(files)
}
fn walk_dir_recursive(dir: &Path, files: &mut Vec<PathBuf>, config: &BatchConfig) -> Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
walk_dir_recursive(&path, files, config)?;
} else if path.is_file() && matches_patterns(&path, config) {
files.push(path);
}
}
Ok(())
}
fn walk_dir_shallow(dir: &Path, files: &mut Vec<PathBuf>, config: &BatchConfig) -> Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_file() && matches_patterns(&path, config) {
files.push(path);
}
}
Ok(())
}
/// Decides whether `path` belongs in the batch: its file name must match at
/// least one include pattern and no exclude pattern.
///
/// A path without a file name, or whose name is not valid UTF-8, is matched as
/// the empty string.
fn matches_patterns(path: &Path, config: &BatchConfig) -> bool {
    let name = match path.file_name().and_then(|os_name| os_name.to_str()) {
        Some(utf8_name) => utf8_name,
        None => "",
    };
    let hits_any = |globs: &[String]| globs.iter().any(|glob| glob_match(name, glob));

    // `&&` keeps the original short-circuit: excludes are only consulted for included names.
    hits_any(config.patterns.as_slice()) && !hits_any(config.exclude.as_slice())
}
/// Matches `text` against a simple glob `pattern` in which `*` stands for any
/// (possibly empty) run of characters; every other character is literal.
///
/// Any number of `*` may appear anywhere in the pattern, so `*.mkv`, `clip*`,
/// `*final*` and `s01*.mkv` all work. A pattern without `*` must equal `text`
/// exactly. Matching is case-sensitive.
fn glob_match(text: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return text == pattern,
    };

    // Everything before the first `*` is an anchored prefix.
    let after_head = match text.strip_prefix(head) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Everything after the last `*` is an anchored suffix; what lies between
    // the first and last `*` is a list of segments that must appear in order.
    let (middle, tail) = match rest.rsplit_once('*') {
        Some((middle, tail)) => (middle, tail),
        None => ("", rest),
    };

    // Stripping the suffix from what is left after the prefix guarantees the
    // two anchors never overlap (e.g. "a" does not match "a*a").
    let mut remaining = match after_head.strip_suffix(tail) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Greedy leftmost search is sufficient for `*`-only globs: taking the
    // earliest occurrence of each segment leaves the most room for later ones.
    for segment in middle.split('*').filter(|segment| !segment.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }
    true
}
/// Builds one `BatchJob` per discovered file, in the same order as `files`.
///
/// Each job's output path comes from `compute_output_path`, and its
/// `TranscodeOptions` are filled from `config`; the options that the batch
/// config does not expose are left at `None` / `false`.
fn create_job_queue(
    files: &[PathBuf],
    options: &BatchOptions,
    config: &BatchConfig,
) -> Result<Vec<BatchJob>> {
    let queue = files
        .iter()
        .map(|source| {
            let destination = compute_output_path(source, options, config);
            BatchJob {
                input_path: source.clone(),
                output_path: destination.clone(),
                options: TranscodeOptions {
                    input: source.clone(),
                    output: destination,
                    preset_name: None,
                    video_codec: config.video_codec.clone(),
                    audio_codec: config.audio_codec.clone(),
                    video_bitrate: config.video_bitrate.clone(),
                    audio_bitrate: config.audio_bitrate.clone(),
                    scale: config.scale.clone(),
                    video_filter: config.video_filter.clone(),
                    audio_filter: None,
                    start_time: None,
                    duration: None,
                    framerate: None,
                    preset: config.preset.clone(),
                    two_pass: config.two_pass,
                    crf: config.crf,
                    threads: config.threads,
                    overwrite: config.overwrite,
                    resume: false,
                },
            }
        })
        .collect();
    Ok(queue)
}
/// Maps an input file to its destination: the path relative to
/// `options.input_dir` is re-rooted under `options.output_dir` and its
/// extension is replaced with `config.output_extension`.
///
/// If `input_path` does not start with the input directory, the whole
/// `input_path` is joined onto the output directory instead.
fn compute_output_path(input_path: &Path, options: &BatchOptions, config: &BatchConfig) -> PathBuf {
    let relative = match input_path.strip_prefix(&options.input_dir) {
        Ok(inside_input_dir) => inside_input_dir,
        Err(_) => input_path,
    };
    options
        .output_dir
        .join(relative)
        .with_extension(&config.output_extension)
}
/// Prints the planned jobs to stdout without running them: a numbered entry per
/// job with its input, output and (when configured) video/audio codec, followed
/// by the total job count.
fn print_dry_run(jobs: &[BatchJob]) {
    let rule = "=".repeat(60);

    println!("{}", "Batch Dry Run".yellow().bold());
    println!("{}", rule);
    println!("The following operations would be performed:\n");

    for (index, job) in jobs.iter().enumerate() {
        let ordinal = index + 1;
        println!("{}. {}", ordinal, "Job".cyan());
        println!(" {:<12} {}", "Input:", job.input_path.display());
        println!(" {:<12} {}", "Output:", job.output_path.display());

        // Codec lines appear only for codecs that were actually configured.
        let codecs = [
            ("Video:", job.options.video_codec.as_ref()),
            ("Audio:", job.options.audio_codec.as_ref()),
        ];
        for (label, maybe_codec) in codecs.iter() {
            if let Some(codec) = maybe_codec {
                println!(" {:<12} {}", label, codec);
            }
        }
        println!();
    }

    println!("{}", rule);
    println!("Total: {} jobs", jobs.len());
}
/// Runs every job on a dedicated rayon thread pool and returns one `JobResult`
/// per job, in the same order as `jobs`.
///
/// The pool size is `options.jobs`, or rayon's current thread count when that
/// is `0`. Progress is updated after each job; skipped jobs are counted as
/// successes by the progress tracker.
///
/// NOTE(review): `options.continue_on_error` is not consulted here, so every
/// job runs even after a failure. Confirm this is the intended behaviour.
///
/// # Errors
///
/// Returns an error only if the thread pool cannot be built; individual job
/// failures are reported as `JobResult::Failed` entries.
async fn execute_jobs(jobs: Vec<BatchJob>, options: &BatchOptions) -> Result<Vec<JobResult>> {
    let total_jobs = jobs.len();
    let progress = Arc::new(Mutex::new(BatchProgress::new(total_jobs)));

    let num_threads = if options.jobs == 0 {
        rayon::current_num_threads()
    } else {
        options.jobs
    };
    info!("Processing with {} parallel jobs", num_threads);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .context("Failed to build thread pool")?;

    // Collecting from an indexed parallel iterator keeps the input order, so the
    // summary is deterministic and no shared results vector (or lock) is needed.
    let results: Vec<JobResult> = pool.install(|| {
        jobs.par_iter()
            .map(|job| {
                let result = process_job(job);
                {
                    // A poisoned lock only means another job panicked mid-update;
                    // the counters are still usable, so recover the guard.
                    let mut tracker = progress.lock().unwrap_or_else(|e| e.into_inner());
                    match &result {
                        JobResult::Success { .. } => tracker.inc_success(),
                        JobResult::Failed { .. } => tracker.inc_failed(),
                        JobResult::Skipped { .. } => tracker.inc_success(),
                    }
                }
                result
            })
            .collect()
    });

    progress.lock().unwrap_or_else(|e| e.into_inner()).finish();
    Ok(results)
}
/// Executes one job synchronously and reports how it ended.
///
/// Steps, in order: create the output file's parent directory, skip the job if
/// the output already exists and overwriting is disabled, then build a tokio
/// runtime and block on `transcode::transcode`. Every failure is converted into
/// `JobResult::Failed`; this function does not return an error.
fn process_job(job: &BatchJob) -> JobResult {
    // Shared constructor for the three failure exits below.
    let failed = |message: String| JobResult::Failed {
        input: job.input_path.clone(),
        error: message,
    };

    if let Some(parent) = job.output_path.parent() {
        if let Err(e) = fs::create_dir_all(parent) {
            return failed(format!("Failed to create output directory: {}", e));
        }
    }

    if job.output_path.exists() && !job.options.overwrite {
        return JobResult::Skipped {
            input: job.input_path.clone(),
            reason: "Output file already exists".to_string(),
        };
    }

    let runtime = match tokio::runtime::Runtime::new() {
        Ok(rt) => rt,
        Err(e) => return failed(format!("Failed to create tokio runtime: {e}")),
    };

    let outcome = runtime.block_on(transcode::transcode(job.options.clone()));
    if let Err(e) = outcome {
        error!("Job failed for {}: {}", job.input_path.display(), e);
        return failed(e.to_string());
    }

    JobResult::Success {
        input: job.input_path.clone(),
        output: job.output_path.clone(),
    }
}
fn print_batch_summary(results: &[JobResult]) {
let success_count = results
.iter()
.filter(|r| matches!(r, JobResult::Success { .. }))
.count();
let failed_count = results
.iter()
.filter(|r| matches!(r, JobResult::Failed { .. }))
.count();
let skipped_count = results
.iter()
.filter(|r| matches!(r, JobResult::Skipped { .. }))
.count();
println!();
println!("{}", "Batch Processing Summary".green().bold());
println!("{}", "=".repeat(60));
println!("{:20} {}", "Total jobs:", results.len());
println!("{:20} {}", "Succeeded:", success_count.to_string().green());
println!("{:20} {}", "Failed:", failed_count.to_string().red());
println!("{:20} {}", "Skipped:", skipped_count.to_string().yellow());
println!("{}", "=".repeat(60));
if failed_count > 0 {
println!();
println!("{}", "Failed Jobs:".red().bold());
for result in results {
if let JobResult::Failed { input, error } = result {
println!(" {} {}", "✗".red(), input.display());
println!(" {}", error.dimmed());
}
}
}
if skipped_count > 0 {
println!();
println!("{}", "Skipped Jobs:".yellow().bold());
for result in results {
if let JobResult::Skipped { input, reason } = result {
println!(" {} {} - {}", "⊘".yellow(), input.display(), reason);
}
}
}
}
// Unit tests for the pure helpers in this module (pattern matching and
// config deserialization defaults).
#[cfg(test)]
mod tests {
use super::*;
// Covers suffix patterns, a non-matching suffix, the bare `*`, and a prefix pattern.
#[test]
fn test_glob_match() {
assert!(glob_match("video.mkv", "*.mkv"));
assert!(glob_match("test.mp4", "*.mp4"));
assert!(!glob_match("video.mkv", "*.mp4"));
assert!(glob_match("anything", "*"));
assert!(glob_match("prefix_test.mkv", "prefix*"));
}
// A config that only sets `patterns` and `video_codec` must pick up the
// serde defaults for every omitted key checked below.
#[test]
fn test_config_defaults() {
let toml = r#"
patterns = ["*.mkv"]
video_codec = "vp9"
"#;
let config: BatchConfig = toml::from_str(toml).expect("toml::from_str should succeed");
assert_eq!(config.preset, "medium");
assert_eq!(config.output_extension, "webm");
assert!(config.recursive);
assert!(!config.two_pass);
}
}