#![allow(clippy::cast_precision_loss)]
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use crate::{Result, TranscodeConfig, TranscodeError};
/// A named transcoding preset: a base [`TranscodeConfig`] plus the rules used
/// to derive an output path for each source file.
#[derive(Debug, Clone)]
pub struct TranscodeProfile {
/// Profile name as supplied to `TranscodeProfile::new`.
pub name: String,
/// Free-form description; `new` leaves it empty.
pub description: String,
/// Extension appended (after a `.`) to the source file stem when building
/// output paths; `new` sets it to `"mkv"`.
pub output_extension: String,
/// Directory that receives outputs. When `None`, `output_path_for` places the
/// output next to the source file.
pub output_dir: Option<PathBuf>,
/// Base configuration that `create_job_config` clones for every job.
pub config: TranscodeConfig,
}
impl TranscodeProfile {
    /// Creates a profile called `name` around the base `config`.
    ///
    /// The description starts empty, the output extension is `"mkv"` and no
    /// output directory is set.
    #[must_use]
    pub fn new(name: impl Into<String>, config: TranscodeConfig) -> Self {
        let name = name.into();
        Self {
            name,
            description: String::new(),
            output_extension: String::from("mkv"),
            output_dir: None,
            config,
        }
    }

    /// Returns the profile with its description replaced by `desc`.
    #[must_use]
    pub fn with_description(self, desc: impl Into<String>) -> Self {
        Self {
            description: desc.into(),
            ..self
        }
    }

    /// Returns the profile with its output extension replaced by `ext`.
    ///
    /// The value is stored verbatim; `output_path_for` inserts the separating
    /// `.` itself.
    #[must_use]
    pub fn with_output_extension(self, ext: impl Into<String>) -> Self {
        Self {
            output_extension: ext.into(),
            ..self
        }
    }

    /// Returns the profile with outputs redirected into `dir`.
    #[must_use]
    pub fn with_output_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self {
            output_dir: Some(dir.into()),
            ..self
        }
    }

    /// Computes the output path for `source`.
    ///
    /// The file name is `<stem>.<output_extension>`, where `<stem>` is the
    /// source's file stem, or the literal `"output"` when the stem is missing
    /// or not valid UTF-8. The file is placed in `output_dir` when one is set,
    /// otherwise in the parent of `source` (falling back to `.`).
    ///
    /// NOTE(review): with no `output_dir` and a source that already carries
    /// `output_extension`, the returned path is identical to `source`.
    #[must_use]
    pub fn output_path_for(&self, source: &Path) -> PathBuf {
        let stem = match source.file_stem().and_then(|s| s.to_str()) {
            Some(valid) => valid,
            None => "output",
        };
        let target_dir = match self.output_dir.as_deref() {
            Some(dir) => dir,
            None => source.parent().unwrap_or_else(|| Path::new(".")),
        };
        target_dir.join(format!("{stem}.{}", self.output_extension))
    }

    /// Clones the base configuration and fills in `input`/`output` for `source`.
    ///
    /// Either field becomes `None` when the corresponding path is not valid
    /// UTF-8, because both are converted through `Path::to_str`.
    #[must_use]
    pub fn create_job_config(&self, source: &Path) -> TranscodeConfig {
        let destination = self.output_path_for(source);
        let mut job_config = self.config.clone();
        job_config.input = source.to_str().map(str::to_owned);
        job_config.output = destination.to_str().map(str::to_owned);
        job_config
    }
}
/// A shell-style wildcard pattern supporting `*` (any run of characters,
/// including none) and `?` (exactly one character).
///
/// Matching is case-insensitive unless [`GlobPattern::case_sensitive`] is
/// applied.
#[derive(Debug, Clone)]
pub struct GlobPattern {
    /// Pattern text exactly as given to `new`.
    pattern: String,
    /// When `true`, pattern and candidate are lowercased before comparison.
    case_insensitive: bool,
}

impl GlobPattern {
    /// Creates a case-insensitive pattern from `pattern`.
    #[must_use]
    pub fn new(pattern: impl Into<String>) -> Self {
        let pattern = pattern.into();
        Self {
            pattern,
            case_insensitive: true,
        }
    }

    /// Returns the pattern switched to case-sensitive matching.
    #[must_use]
    pub fn case_sensitive(self) -> Self {
        Self {
            case_insensitive: false,
            ..self
        }
    }

    /// Reports whether the whole of `text` matches this pattern.
    #[must_use]
    pub fn matches(&self, text: &str) -> bool {
        if !self.case_insensitive {
            return Self::glob_match(&self.pattern, text);
        }
        let folded_pattern = self.pattern.to_lowercase();
        let folded_text = text.to_lowercase();
        Self::glob_match(&folded_pattern, &folded_text)
    }

    /// Returns the pattern text as originally supplied.
    #[must_use]
    pub fn pattern(&self) -> &str {
        &self.pattern
    }

    /// Full-string wildcard match over `char`s using dynamic programming.
    ///
    /// `above[j]` holds whether the pattern prefix consumed so far matches the
    /// first `j` characters of `text`; `row` is the same for the prefix
    /// extended by the current pattern character.
    fn glob_match(pattern: &str, text: &str) -> bool {
        let candidate: Vec<char> = text.chars().collect();
        let width = candidate.len() + 1;

        // The empty pattern matches only the empty text.
        let mut above = vec![false; width];
        above[0] = true;
        let mut row = vec![false; width];

        for symbol in pattern.chars() {
            // An empty text is matched only by a prefix made entirely of `*`.
            row[0] = symbol == '*' && above[0];
            for (col, &ch) in candidate.iter().enumerate() {
                row[col + 1] = match symbol {
                    // `*` matches nothing (above) or absorbs one more char (left).
                    '*' => above[col + 1] || row[col],
                    '?' => above[col],
                    literal => literal == ch && above[col],
                };
            }
            // Every cell of `row` is rewritten on the next pass, so the stale
            // contents left behind by the swap are never read.
            std::mem::swap(&mut above, &mut row);
        }
        above[width - 1]
    }
}
/// Per-file bookkeeping used while a detected file is still being debounced.
#[derive(Debug, Clone)]
struct DebounceEntry {
/// File size in bytes observed at the most recent check.
last_size: u64,
/// Set when the entry is created and refreshed whenever the observed size
/// differs from `last_size`. NOTE(review): written but never read in the
/// visible code.
last_change: Instant,
/// Number of consecutive checks in which the size was unchanged; reset to 0
/// when the size changes or is below the configured minimum.
stable_count: u32,
}
/// Tuning knobs that decide when a freshly detected file counts as fully
/// written.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
    /// Intended quiet period for a file; defaults to 3 seconds.
    /// NOTE(review): nothing in the visible code reads this field.
    pub stable_duration: Duration,
    /// Number of consecutive unchanged-size checks required; defaults to 2.
    pub required_stable_checks: u32,
    /// Files smaller than this many bytes are never considered stable;
    /// defaults to 1024.
    pub min_file_size: u64,
}

impl Default for DebounceConfig {
    /// 3 second stable duration, 2 required checks, 1024 byte minimum size.
    fn default() -> Self {
        let stable_duration = Duration::from_secs(3);
        Self {
            stable_duration,
            required_stable_checks: 2,
            min_file_size: 1024,
        }
    }
}

impl DebounceConfig {
    /// Equivalent to [`DebounceConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the config with `stable_duration` replaced by `dur`.
    #[must_use]
    pub fn with_stable_duration(self, dur: Duration) -> Self {
        Self {
            stable_duration: dur,
            ..self
        }
    }

    /// Returns the config with `required_stable_checks` replaced by `n`.
    #[must_use]
    pub fn with_required_checks(self, n: u32) -> Self {
        Self {
            required_stable_checks: n,
            ..self
        }
    }

    /// Returns the config with `min_file_size` replaced by `size` (bytes).
    #[must_use]
    pub fn with_min_size(self, size: u64) -> Self {
        Self {
            min_file_size: size,
            ..self
        }
    }
}
/// Settings for a [`WatchTranscoder`]: which directory to scan, which files to
/// accept, and how accepted files are debounced and transcoded.
#[derive(Debug, Clone)]
pub struct WatchTranscoderConfig {
/// Directory listed by `WatchTranscoder::scan`.
pub watch_dir: PathBuf,
/// File-name patterns that are accepted; an empty list accepts every file
/// that is not excluded.
pub include_patterns: Vec<GlobPattern>,
/// File-name patterns that are rejected; checked before the include list.
pub exclude_patterns: Vec<GlobPattern>,
/// Rules for deciding when a detected file has finished being written.
pub debounce: DebounceConfig,
/// Profile used to derive output paths and per-job configurations.
pub profile: TranscodeProfile,
/// Interval between scans. NOTE(review): not read in the visible code;
/// presumably consumed by whatever drives `scan` — confirm.
pub poll_interval: Duration,
/// Upper bound on jobs handed out by `drain_ready` and not yet finished.
pub max_concurrent: usize,
/// NOTE(review): not read in the visible code; `scan` currently picks up
/// files regardless of this flag — confirm intended behaviour.
pub process_existing: bool,
}
impl WatchTranscoderConfig {
    /// Creates a configuration that watches `watch_dir` and transcodes with
    /// `profile`.
    ///
    /// Defaults: common video extensions are included; hidden files, editor
    /// backups and partial downloads (`.*`, `*~`, `*.part`, `*.tmp`) are
    /// excluded; default debounce settings; 5 second poll interval; at most
    /// 2 concurrent jobs; `process_existing` off.
    #[must_use]
    pub fn new(watch_dir: impl Into<PathBuf>, profile: TranscodeProfile) -> Self {
        const DEFAULT_INCLUDES: [&str; 7] = [
            "*.mp4", "*.mkv", "*.mov", "*.avi", "*.webm", "*.mxf", "*.ts",
        ];
        const DEFAULT_EXCLUDES: [&str; 4] = [".*", "*~", "*.part", "*.tmp"];

        Self {
            watch_dir: watch_dir.into(),
            include_patterns: DEFAULT_INCLUDES
                .iter()
                .map(|glob| GlobPattern::new(*glob))
                .collect(),
            exclude_patterns: DEFAULT_EXCLUDES
                .iter()
                .map(|glob| GlobPattern::new(*glob))
                .collect(),
            debounce: DebounceConfig::default(),
            profile,
            poll_interval: Duration::from_secs(5),
            max_concurrent: 2,
            process_existing: false,
        }
    }

    /// Appends `pattern` to the include list.
    #[must_use]
    pub fn include(mut self, pattern: impl Into<String>) -> Self {
        self.include_patterns.push(GlobPattern::new(pattern));
        self
    }

    /// Appends `pattern` to the exclude list.
    #[must_use]
    pub fn exclude(mut self, pattern: impl Into<String>) -> Self {
        self.exclude_patterns.push(GlobPattern::new(pattern));
        self
    }

    /// Replaces the debounce settings.
    #[must_use]
    pub fn with_debounce(self, debounce: DebounceConfig) -> Self {
        Self { debounce, ..self }
    }

    /// Replaces the poll interval.
    #[must_use]
    pub fn with_poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Replaces the concurrency limit. `validate` rejects a limit of 0.
    #[must_use]
    pub fn with_max_concurrent(self, n: usize) -> Self {
        Self {
            max_concurrent: n,
            ..self
        }
    }

    /// Sets the `process_existing` flag.
    #[must_use]
    pub fn with_process_existing(self, enable: bool) -> Self {
        Self {
            process_existing: enable,
            ..self
        }
    }

    /// Checks that the configuration can actually be used.
    ///
    /// # Errors
    ///
    /// Returns `TranscodeError::InvalidInput` when the watch path does not
    /// exist, exists but is not a directory, or when `max_concurrent` is 0.
    pub fn validate(&self) -> Result<()> {
        let dir = &self.watch_dir;
        if !dir.exists() {
            return Err(TranscodeError::InvalidInput(format!(
                "Watch directory does not exist: {}",
                dir.display()
            )));
        }
        // A regular file passes `exists()` but cannot be listed by a scan, so
        // reject it here rather than at the first `read_dir` call.
        if !dir.is_dir() {
            return Err(TranscodeError::InvalidInput(format!(
                "Watch path is not a directory: {}",
                dir.display()
            )));
        }
        if self.max_concurrent == 0 {
            return Err(TranscodeError::InvalidInput(
                "max_concurrent must be >= 1".into(),
            ));
        }
        Ok(())
    }
}
/// Lifecycle state of a [`WatchJob`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchJobStatus {
/// Detected; waiting for the file size to stop changing.
Debouncing,
/// Debounce finished and a job configuration has been prepared.
Ready,
/// Handed out by `drain_ready` and counted against the concurrency limit.
Transcoding,
/// Reported finished via `mark_completed`.
Completed,
/// Reported failed via `mark_failed`; carries the supplied reason.
Failed(String),
/// Counted by `status_summary`. NOTE(review): never assigned in the visible
/// code.
Excluded,
}
/// One source file tracked by the watcher, together with its planned output.
#[derive(Debug, Clone)]
pub struct WatchJob {
/// Path of the detected source file.
pub source: PathBuf,
/// Output path derived from the profile when the file was detected.
pub output: PathBuf,
/// Current lifecycle state.
pub status: WatchJobStatus,
/// Moment the job record was created.
pub detected_at: Instant,
/// Per-job configuration; `None` until the job is promoted to `Ready`.
pub config: Option<TranscodeConfig>,
}
impl WatchJob {
    /// Creates a job in the `Debouncing` state, stamped with the current
    /// instant and without a prepared configuration.
    fn new(source: PathBuf, output: PathBuf) -> Self {
        let detected_at = Instant::now();
        Self {
            source,
            output,
            detected_at,
            status: WatchJobStatus::Debouncing,
            config: None,
        }
    }
}
/// Polling-style watcher that turns files found in a directory into transcode
/// jobs once their size has stopped changing.
pub struct WatchTranscoder {
/// Settings supplied at construction.
config: WatchTranscoderConfig,
/// Every job created so far, in detection order; also used by `scan` to
/// avoid queuing the same source twice.
jobs: Vec<WatchJob>,
/// Debounce bookkeeping keyed by source path; entries are inserted by `scan`.
debounce_state: HashMap<PathBuf, DebounceEntry>,
/// Jobs handed out by `drain_ready` that have not yet been reported via
/// `mark_completed` or `mark_failed`.
active_count: usize,
}
impl WatchTranscoder {
    /// Creates a watcher with no tracked jobs and no active transcodes.
    #[must_use]
    pub fn new(config: WatchTranscoderConfig) -> Self {
        Self {
            config,
            jobs: Vec::new(),
            debounce_state: HashMap::new(),
            active_count: 0,
        }
    }

    /// Returns the configuration this watcher was built with.
    #[must_use]
    pub fn config(&self) -> &WatchTranscoderConfig {
        &self.config
    }

    /// Returns every job created so far, in detection order.
    #[must_use]
    pub fn jobs(&self) -> &[WatchJob] {
        &self.jobs
    }

    /// Returns the number of jobs handed out by `drain_ready` that have not
    /// yet been marked completed or failed.
    #[must_use]
    pub fn active_count(&self) -> usize {
        self.active_count
    }

    /// Decides whether a file called `filename` should be queued.
    ///
    /// Exclude patterns win over include patterns; an empty include list
    /// accepts every non-excluded name.
    #[must_use]
    pub fn should_process(&self, filename: &str) -> bool {
        let cfg = &self.config;
        if cfg.exclude_patterns.iter().any(|p| p.matches(filename)) {
            return false;
        }
        cfg.include_patterns.is_empty()
            || cfg.include_patterns.iter().any(|p| p.matches(filename))
    }

    /// Lists the watch directory once, queues newly seen files and advances
    /// the debounce state of every job that is still `Debouncing`.
    ///
    /// Returns the number of jobs created by this call. Directory entries that
    /// cannot be read are skipped. A path that is already the `source` or the
    /// `output` of a tracked job is ignored, so transcode results written back
    /// into the watch directory are not queued as new inputs.
    ///
    /// NOTE(review): `config.process_existing` is not consulted here; files
    /// present before the first scan are queued like any others.
    ///
    /// # Errors
    ///
    /// Returns `TranscodeError::IoError` when the watch directory cannot be
    /// read.
    pub fn scan(&mut self) -> Result<usize> {
        let entries = std::fs::read_dir(&self.config.watch_dir).map_err(|e| {
            TranscodeError::IoError(format!(
                "Cannot read watch dir '{}': {e}",
                self.config.watch_dir.display()
            ))
        })?;

        let mut new_count = 0usize;
        for entry in entries.flatten() {
            let path = entry.path();
            if !path.is_file() {
                continue;
            }
            // Skip files we already track, and files that are the product of
            // one of our own jobs (otherwise an output whose extension matches
            // an include pattern would be fed back in as a new source).
            let already_known = self
                .jobs
                .iter()
                .any(|j| j.source == path || j.output == path);
            if already_known {
                continue;
            }
            // Names that are not valid UTF-8 are matched as the empty string.
            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
            if !self.should_process(filename) {
                continue;
            }

            let output = self.config.profile.output_path_for(&path);
            self.jobs.push(WatchJob::new(path.clone(), output));
            // An unreadable file is recorded with size 0.
            let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
            self.debounce_state.insert(
                path,
                DebounceEntry {
                    last_size: size,
                    last_change: Instant::now(),
                    stable_count: 0,
                },
            );
            new_count += 1;
        }

        // Collect first: `update_debounce` needs `&mut self`.
        let debouncing: Vec<PathBuf> = self
            .jobs
            .iter()
            .filter(|j| j.status == WatchJobStatus::Debouncing)
            .map(|j| j.source.clone())
            .collect();
        for path in &debouncing {
            self.update_debounce(path);
        }
        Ok(new_count)
    }

    /// Performs one debounce check for `path` and promotes its job to `Ready`
    /// once the size has been unchanged for the required number of checks.
    ///
    /// Files below `min_file_size` (including files whose metadata cannot be
    /// read, which count as size 0) never accumulate stable checks.
    ///
    /// NOTE(review): `debounce.stable_duration` and `last_change` are not
    /// consulted; stability is judged purely by the check count.
    fn update_debounce(&mut self, path: &Path) {
        let current_size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);

        let stable = match self.debounce_state.get_mut(path) {
            None => return,
            Some(entry) => {
                if current_size < self.config.debounce.min_file_size {
                    entry.stable_count = 0;
                    entry.last_size = current_size;
                    return;
                }
                if current_size == entry.last_size {
                    entry.stable_count += 1;
                } else {
                    entry.stable_count = 0;
                    entry.last_change = Instant::now();
                }
                entry.last_size = current_size;
                entry.stable_count >= self.config.debounce.required_stable_checks
            }
        };
        if !stable {
            return;
        }

        // The entry is only needed while the job is debouncing; dropping it
        // here keeps the map from growing with every file ever processed.
        self.debounce_state.remove(path);
        for job in &mut self.jobs {
            if job.source == path && job.status == WatchJobStatus::Debouncing {
                job.status = WatchJobStatus::Ready;
                job.config = Some(self.config.profile.create_job_config(path));
            }
        }
    }

    /// Hands out `Ready` jobs, in detection order, until the concurrency
    /// limit is reached.
    ///
    /// Each returned job is switched to `Transcoding` and counted as active;
    /// the caller is expected to report the outcome through `mark_completed`
    /// or `mark_failed`.
    pub fn drain_ready(&mut self) -> Vec<(PathBuf, TranscodeConfig)> {
        let limit = self.config.max_concurrent;
        let mut dispatched = Vec::new();
        for job in &mut self.jobs {
            if self.active_count >= limit {
                break;
            }
            if job.status != WatchJobStatus::Ready {
                continue;
            }
            if let Some(job_config) = job.config.clone() {
                dispatched.push((job.source.clone(), job_config));
                job.status = WatchJobStatus::Transcoding;
                self.active_count += 1;
            }
        }
        dispatched
    }

    /// Finds the first `Transcoding` job for `source`, frees its concurrency
    /// slot and returns it so the caller can record the final status.
    fn release_slot(&mut self, source: &Path) -> Option<&mut WatchJob> {
        let job = self
            .jobs
            .iter_mut()
            .find(|j| j.source == source && j.status == WatchJobStatus::Transcoding)?;
        self.active_count = self.active_count.saturating_sub(1);
        Some(job)
    }

    /// Marks the `Transcoding` job for `source` as `Completed` and frees its
    /// slot. Does nothing when no such job exists.
    pub fn mark_completed(&mut self, source: &Path) {
        if let Some(job) = self.release_slot(source) {
            job.status = WatchJobStatus::Completed;
        }
    }

    /// Marks the `Transcoding` job for `source` as `Failed(reason)` and frees
    /// its slot. Does nothing when no such job exists.
    pub fn mark_failed(&mut self, source: &Path, reason: impl Into<String>) {
        if let Some(job) = self.release_slot(source) {
            job.status = WatchJobStatus::Failed(reason.into());
        }
    }

    /// Counts the tracked jobs by status.
    #[must_use]
    pub fn status_summary(&self) -> WatchStatusSummary {
        let mut summary = WatchStatusSummary::default();
        for job in &self.jobs {
            let counter = match &job.status {
                WatchJobStatus::Debouncing => &mut summary.debouncing,
                WatchJobStatus::Ready => &mut summary.ready,
                WatchJobStatus::Transcoding => &mut summary.transcoding,
                WatchJobStatus::Completed => &mut summary.completed,
                WatchJobStatus::Failed(_) => &mut summary.failed,
                WatchJobStatus::Excluded => &mut summary.excluded,
            };
            *counter += 1;
        }
        summary
    }

    /// Returns the number of jobs created so far, in any state.
    #[must_use]
    pub fn total_jobs(&self) -> usize {
        self.jobs.len()
    }
}
/// Per-status job counts as produced by `WatchTranscoder::status_summary`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WatchStatusSummary {
/// Jobs in `WatchJobStatus::Debouncing`.
pub debouncing: usize,
/// Jobs in `WatchJobStatus::Ready`.
pub ready: usize,
/// Jobs in `WatchJobStatus::Transcoding`.
pub transcoding: usize,
/// Jobs in `WatchJobStatus::Completed`.
pub completed: usize,
/// Jobs in `WatchJobStatus::Failed`.
pub failed: usize,
/// Jobs in `WatchJobStatus::Excluded`.
pub excluded: usize,
}
// Unit tests for glob matching, profile path derivation and the watcher's
// scan / debounce / dispatch cycle. The filesystem-backed tests each use
// their own sub-directory of the system temp dir.
#[cfg(test)]
mod tests {
use super::*;
// Shared fixture: default config, "webm" output extension, no output dir.
fn make_test_profile() -> TranscodeProfile {
TranscodeProfile::new("test", TranscodeConfig::default())
.with_output_extension("webm")
.with_description("Test profile")
}
// `*` matches any prefix; matching is case-insensitive by default.
#[test]
fn test_glob_pattern_basic() {
let pat = GlobPattern::new("*.mp4");
assert!(pat.matches("video.mp4"));
assert!(pat.matches("Video.MP4")); assert!(!pat.matches("video.mkv"));
}
// `?` matches exactly one character, never two.
#[test]
fn test_glob_pattern_question_mark() {
let pat = GlobPattern::new("file?.mp4");
assert!(pat.matches("file1.mp4"));
assert!(pat.matches("fileA.mp4"));
assert!(!pat.matches("file12.mp4"));
}
// After `case_sensitive()`, differing case no longer matches.
#[test]
fn test_glob_pattern_case_sensitive() {
let pat = GlobPattern::new("*.MP4").case_sensitive();
assert!(pat.matches("video.MP4"));
assert!(!pat.matches("video.mp4"));
}
// `*` in the middle may match an empty run as well as a non-empty one.
#[test]
fn test_glob_pattern_wildcard_middle() {
let pat = GlobPattern::new("pre*suf.txt");
assert!(pat.matches("pre_middle_suf.txt"));
assert!(pat.matches("presuf.txt"));
assert!(!pat.matches("prefix.txt"));
}
// Default include/exclude lists: video extensions pass; other types, hidden
// files, backups and partial downloads are rejected.
#[test]
fn test_should_process_include_exclude() {
let profile = make_test_profile();
let config = WatchTranscoderConfig::new(std::env::temp_dir(), profile);
let watcher = WatchTranscoder::new(config);
assert!(watcher.should_process("video.mp4"));
assert!(watcher.should_process("clip.mkv"));
assert!(watcher.should_process("clip.mov"));
assert!(!watcher.should_process("document.pdf"));
assert!(!watcher.should_process("image.png"));
assert!(!watcher.should_process(".hidden.mp4"));
assert!(!watcher.should_process("backup.mp4~"));
assert!(!watcher.should_process("download.mp4.part"));
assert!(!watcher.should_process("temp.mp4.tmp"));
}
// Without an output dir the output lands next to the source.
#[test]
fn test_transcode_profile_output_path() {
let profile = make_test_profile();
let source = Path::new("/media/input/clip.mp4");
let output = profile.output_path_for(source);
assert_eq!(output, PathBuf::from("/media/input/clip.webm"));
}
// With an output dir the output is redirected there.
#[test]
fn test_transcode_profile_output_dir() {
let profile = make_test_profile()
.with_output_dir("/media/output");
let source = Path::new("/media/input/clip.mp4");
let output = profile.output_path_for(source);
assert_eq!(output, PathBuf::from("/media/output/clip.webm"));
}
// The job config carries both the source path and the derived output path.
#[test]
fn test_transcode_profile_create_job() {
let profile = make_test_profile();
let source = Path::new("/media/clip.mp4");
let config = profile.create_job_config(source);
assert_eq!(config.input, Some("/media/clip.mp4".to_string()));
assert_eq!(config.output, Some("/media/clip.webm".to_string()));
}
// A scan of a real directory queues the video file only; the job is still
// debouncing after the first scan because the file is below the default
// minimum size.
#[test]
fn test_watcher_scan_with_real_dir() {
let temp_dir = std::env::temp_dir().join("oximedia_watch_test_scan");
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).expect("create temp dir");
let mp4_path = temp_dir.join("test.mp4");
let txt_path = temp_dir.join("readme.txt");
std::fs::write(&mp4_path, b"fake mp4 content that is long enough to pass min size check 1234567890 1234567890").expect("write mp4");
std::fs::write(&txt_path, b"not a video").expect("write txt");
let profile = make_test_profile();
let config = WatchTranscoderConfig::new(&temp_dir, profile)
.with_process_existing(true);
let mut watcher = WatchTranscoder::new(config);
let new_count = watcher.scan().expect("scan should succeed");
assert_eq!(new_count, 1);
assert_eq!(watcher.total_jobs(), 1);
assert_eq!(watcher.jobs()[0].status, WatchJobStatus::Debouncing);
let _ = std::fs::remove_dir_all(&temp_dir);
}
// With two required checks, the first scan leaves the job debouncing and the
// second scan promotes it to Ready (each scan performs one debounce check).
#[test]
fn test_debounce_promotes_to_ready() {
let temp_dir = std::env::temp_dir().join("oximedia_watch_test_debounce");
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).expect("create temp dir");
let mp4_path = temp_dir.join("stable.mp4");
std::fs::write(&mp4_path, vec![0u8; 2048]).expect("write mp4");
let profile = make_test_profile();
let debounce = DebounceConfig::new()
.with_required_checks(2)
.with_min_size(512);
let config = WatchTranscoderConfig::new(&temp_dir, profile)
.with_debounce(debounce);
let mut watcher = WatchTranscoder::new(config);
watcher.scan().expect("scan 1");
assert_eq!(watcher.jobs()[0].status, WatchJobStatus::Debouncing);
watcher.scan().expect("scan 2");
assert_eq!(watcher.jobs()[0].status, WatchJobStatus::Ready);
let _ = std::fs::remove_dir_all(&temp_dir);
}
// Five ready files but a concurrency limit of two: only two are handed out.
#[test]
fn test_drain_ready_respects_concurrency() {
let temp_dir = std::env::temp_dir().join("oximedia_watch_test_drain");
let _ = std::fs::remove_dir_all(&temp_dir);
std::fs::create_dir_all(&temp_dir).expect("create temp dir");
for i in 0..5 {
let path = temp_dir.join(format!("file{i}.mp4"));
std::fs::write(&path, vec![0u8; 2048]).expect("write");
}
let profile = make_test_profile();
let debounce = DebounceConfig::new()
.with_required_checks(1)
.with_min_size(512);
let config = WatchTranscoderConfig::new(&temp_dir, profile)
.with_debounce(debounce)
.with_max_concurrent(2);
let mut watcher = WatchTranscoder::new(config);
watcher.scan().expect("scan 1");
watcher.scan().expect("scan 2");
let ready = watcher.drain_ready();
assert_eq!(ready.len(), 2);
assert_eq!(watcher.active_count(), 2);
let _ = std::fs::remove_dir_all(&temp_dir);
}
// Completing or failing a transcoding job updates its status and releases
// one concurrency slot each. Jobs are injected directly; no filesystem use.
#[test]
fn test_mark_completed_and_failed() {
let profile = make_test_profile();
let config = WatchTranscoderConfig::new(std::env::temp_dir(), profile);
let mut watcher = WatchTranscoder::new(config);
let source1 = PathBuf::from("/tmp/a.mp4");
let source2 = PathBuf::from("/tmp/b.mp4");
watcher.jobs.push(WatchJob {
source: source1.clone(),
output: PathBuf::from("/tmp/a.webm"),
status: WatchJobStatus::Transcoding,
detected_at: Instant::now(),
config: None,
});
watcher.jobs.push(WatchJob {
source: source2.clone(),
output: PathBuf::from("/tmp/b.webm"),
status: WatchJobStatus::Transcoding,
detected_at: Instant::now(),
config: None,
});
watcher.active_count = 2;
watcher.mark_completed(&source1);
assert_eq!(watcher.active_count(), 1);
assert_eq!(watcher.jobs[0].status, WatchJobStatus::Completed);
watcher.mark_failed(&source2, "codec error");
assert_eq!(watcher.active_count(), 0);
assert_eq!(
watcher.jobs[1].status,
WatchJobStatus::Failed("codec error".to_string())
);
}
// The summary counts one job per injected status and zero for the rest.
#[test]
fn test_status_summary() {
let profile = make_test_profile();
let config = WatchTranscoderConfig::new(std::env::temp_dir(), profile);
let mut watcher = WatchTranscoder::new(config);
watcher.jobs.push(WatchJob {
source: PathBuf::from("/a.mp4"),
output: PathBuf::from("/a.webm"),
status: WatchJobStatus::Debouncing,
detected_at: Instant::now(),
config: None,
});
watcher.jobs.push(WatchJob {
source: PathBuf::from("/b.mp4"),
output: PathBuf::from("/b.webm"),
status: WatchJobStatus::Ready,
detected_at: Instant::now(),
config: None,
});
watcher.jobs.push(WatchJob {
source: PathBuf::from("/c.mp4"),
output: PathBuf::from("/c.webm"),
status: WatchJobStatus::Completed,
detected_at: Instant::now(),
config: None,
});
let summary = watcher.status_summary();
assert_eq!(summary.debouncing, 1);
assert_eq!(summary.ready, 1);
assert_eq!(summary.completed, 1);
assert_eq!(summary.transcoding, 0);
assert_eq!(summary.failed, 0);
}
// Each builder method overrides exactly the field it names.
#[test]
fn test_debounce_config_builder() {
let config = DebounceConfig::new()
.with_stable_duration(Duration::from_secs(10))
.with_required_checks(5)
.with_min_size(4096);
assert_eq!(config.stable_duration, Duration::from_secs(10));
assert_eq!(config.required_stable_checks, 5);
assert_eq!(config.min_file_size, 4096);
}
// Validation rejects a missing watch directory and a zero concurrency limit.
#[test]
fn test_watch_config_validation() {
let profile = make_test_profile();
let config = WatchTranscoderConfig::new("/nonexistent/path/12345", profile.clone());
assert!(config.validate().is_err());
let config2 = WatchTranscoderConfig::new(std::env::temp_dir(), profile)
.with_max_concurrent(0);
assert!(config2.validate().is_err());
}
}