#![allow(dead_code)]
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::{Result, TranscodeConfig, TranscodeError};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostProcessAction {
Leave,
MoveToDone,
Delete,
}
#[derive(Debug, Clone)]
pub enum OutputLocation {
Fixed(PathBuf),
SiblingWithExtension(String),
DoneSubDir,
}
#[derive(Debug, Clone)]
pub struct WatchConfig {
pub watch_dir: PathBuf,
pub accepted_extensions: Vec<String>,
pub output_location: OutputLocation,
pub on_success: PostProcessAction,
pub on_failure: PostProcessAction,
pub poll_interval_ms: u64,
pub base_config: TranscodeConfig,
pub max_concurrent: usize,
}
impl WatchConfig {
#[must_use]
pub fn new(watch_dir: impl Into<PathBuf>) -> Self {
Self {
watch_dir: watch_dir.into(),
accepted_extensions: vec![
"mp4".into(),
"mkv".into(),
"mov".into(),
"avi".into(),
"webm".into(),
"mxf".into(),
"ts".into(),
"m2ts".into(),
],
output_location: OutputLocation::DoneSubDir,
on_success: PostProcessAction::MoveToDone,
on_failure: PostProcessAction::Leave,
poll_interval_ms: 5_000,
base_config: TranscodeConfig::default(),
max_concurrent: 2,
}
}
#[must_use]
pub fn output_location(mut self, loc: OutputLocation) -> Self {
self.output_location = loc;
self
}
#[must_use]
pub fn on_success(mut self, action: PostProcessAction) -> Self {
self.on_success = action;
self
}
#[must_use]
pub fn on_failure(mut self, action: PostProcessAction) -> Self {
self.on_failure = action;
self
}
#[must_use]
pub fn poll_interval_ms(mut self, ms: u64) -> Self {
self.poll_interval_ms = ms;
self
}
#[must_use]
pub fn base_config(mut self, config: TranscodeConfig) -> Self {
self.base_config = config;
self
}
#[must_use]
pub fn max_concurrent(mut self, n: usize) -> Self {
self.max_concurrent = n;
self
}
pub fn validate(&self) -> Result<()> {
if !self.watch_dir.exists() {
return Err(TranscodeError::InvalidInput(format!(
"Watch directory does not exist: {}",
self.watch_dir.display()
)));
}
if self.max_concurrent == 0 {
return Err(TranscodeError::InvalidInput(
"max_concurrent must be at least 1".into(),
));
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchFileStatus {
Pending,
Processing,
Done,
Failed(String),
}
#[derive(Debug, Clone)]
pub struct WatchEntry {
pub source: PathBuf,
pub output: PathBuf,
pub status: WatchFileStatus,
}
impl WatchEntry {
#[must_use]
pub fn new(source: PathBuf, output: PathBuf) -> Self {
Self {
source,
output,
status: WatchFileStatus::Pending,
}
}
}
pub struct TranscodeWatcher {
config: WatchConfig,
seen: HashSet<PathBuf>,
queue: Vec<WatchEntry>,
}
impl TranscodeWatcher {
#[must_use]
pub fn new(config: WatchConfig) -> Self {
Self {
config,
seen: HashSet::new(),
queue: Vec::new(),
}
}
#[must_use]
pub fn config(&self) -> &WatchConfig {
&self.config
}
#[must_use]
pub fn poll_interval(&self) -> Duration {
Duration::from_millis(self.config.poll_interval_ms)
}
pub fn scan(&mut self) -> Result<usize> {
let entries = std::fs::read_dir(&self.config.watch_dir).map_err(|e| {
TranscodeError::IoError(format!(
"Cannot read watch dir '{}': {e}",
self.config.watch_dir.display()
))
})?;
let mut new_count = 0usize;
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
if self.seen.contains(&path) {
continue;
}
let ext = path
.extension()
.and_then(|e| e.to_str())
.map(str::to_lowercase)
.unwrap_or_default();
if !self.config.accepted_extensions.iter().any(|a| a == &ext) {
continue;
}
let output = self.resolve_output(&path);
self.seen.insert(path.clone());
self.queue.push(WatchEntry::new(path, output));
new_count += 1;
}
Ok(new_count)
}
#[must_use]
pub fn pending(&self) -> Vec<&WatchEntry> {
self.queue
.iter()
.filter(|e| e.status == WatchFileStatus::Pending)
.collect()
}
pub fn drain_pending(&mut self) -> Vec<(WatchEntry, TranscodeConfig)> {
let mut out = Vec::new();
for entry in &mut self.queue {
if entry.status != WatchFileStatus::Pending {
continue;
}
entry.status = WatchFileStatus::Processing;
let mut job = self.config.base_config.clone();
job.input = entry.source.to_str().map(String::from);
job.output = entry.output.to_str().map(String::from);
out.push((entry.clone(), job));
}
out
}
pub fn mark_done(&mut self, source: &Path) -> Result<()> {
self.update_status(source, WatchFileStatus::Done);
match self.config.on_success {
PostProcessAction::Leave => {}
PostProcessAction::Delete => {
std::fs::remove_file(source).map_err(|e| {
TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
})?;
}
PostProcessAction::MoveToDone => {
self.move_to_done_dir(source)?;
}
}
Ok(())
}
pub fn mark_failed(&mut self, source: &Path, reason: &str) -> Result<()> {
self.update_status(source, WatchFileStatus::Failed(reason.to_string()));
match self.config.on_failure {
PostProcessAction::Leave => {}
PostProcessAction::Delete => {
std::fs::remove_file(source).map_err(|e| {
TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
})?;
}
PostProcessAction::MoveToDone => {
self.move_to_done_dir(source)?;
}
}
Ok(())
}
#[must_use]
pub fn queue_len(&self) -> usize {
self.queue.len()
}
#[must_use]
pub fn status_counts(&self) -> WatchStatusCounts {
let mut counts = WatchStatusCounts::default();
for entry in &self.queue {
match entry.status {
WatchFileStatus::Pending => counts.pending += 1,
WatchFileStatus::Processing => counts.processing += 1,
WatchFileStatus::Done => counts.done += 1,
WatchFileStatus::Failed(_) => counts.failed += 1,
}
}
counts
}
fn update_status(&mut self, source: &Path, new_status: WatchFileStatus) {
for entry in &mut self.queue {
if entry.source == source {
entry.status = new_status;
return;
}
}
}
fn resolve_output(&self, source: &Path) -> PathBuf {
match &self.config.output_location {
OutputLocation::Fixed(dir) => {
let filename = source
.file_name()
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("output.mkv"));
dir.join(filename)
}
OutputLocation::SiblingWithExtension(ext) => {
let mut out = source.to_path_buf();
out.set_extension(ext.trim_start_matches('.'));
out
}
OutputLocation::DoneSubDir => {
let done_dir = self.config.watch_dir.join("done");
let filename = source
.file_name()
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("output.mkv"));
done_dir.join(filename)
}
}
}
fn move_to_done_dir(&self, source: &Path) -> Result<()> {
let done_dir = self.config.watch_dir.join("done");
std::fs::create_dir_all(&done_dir)
.map_err(|e| TranscodeError::IoError(format!("Cannot create done dir: {e}")))?;
let dest = done_dir.join(
source
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new("moved_file")),
);
std::fs::rename(source, &dest).map_err(|e| {
TranscodeError::IoError(format!(
"Cannot move '{}' → '{}': {e}",
source.display(),
dest.display()
))
})
}
}
#[derive(Debug, Clone, Default)]
pub struct WatchStatusCounts {
pub pending: usize,
pub processing: usize,
pub done: usize,
pub failed: usize,
}
#[derive(Debug, Clone)]
pub struct FileStabilityConfig {
pub required_stable_checks: u32,
pub check_interval_ms: u64,
pub min_file_size: u64,
}
impl Default for FileStabilityConfig {
fn default() -> Self {
Self {
required_stable_checks: 3,
check_interval_ms: 2_000,
min_file_size: 1024,
}
}
}
impl FileStabilityConfig {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn required_checks(mut self, n: u32) -> Self {
self.required_stable_checks = n;
self
}
#[must_use]
pub fn check_interval_ms(mut self, ms: u64) -> Self {
self.check_interval_ms = ms;
self
}
#[must_use]
pub fn min_file_size(mut self, size: u64) -> Self {
self.min_file_size = size;
self
}
}
#[derive(Debug, Clone)]
pub struct FileStabilityTracker {
path: PathBuf,
last_size: u64,
stable_count: u32,
is_stable: bool,
}
impl FileStabilityTracker {
#[must_use]
pub fn new(path: PathBuf) -> Self {
Self {
path,
last_size: 0,
stable_count: 0,
is_stable: false,
}
}
pub fn check(&mut self, config: &FileStabilityConfig) -> bool {
if self.is_stable {
return true;
}
let current_size = std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0);
if current_size < config.min_file_size {
self.stable_count = 0;
self.last_size = current_size;
return false;
}
if current_size == self.last_size {
self.stable_count += 1;
} else {
self.stable_count = 0;
}
self.last_size = current_size;
if self.stable_count >= config.required_stable_checks {
self.is_stable = true;
}
self.is_stable
}
#[must_use]
pub fn is_stable(&self) -> bool {
self.is_stable
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
#[must_use]
pub fn last_size(&self) -> u64 {
self.last_size
}
}
#[derive(Debug, Clone)]
pub struct HotFolderChain {
stages: Vec<WatchConfig>,
}
impl HotFolderChain {
#[must_use]
pub fn new() -> Self {
Self { stages: Vec::new() }
}
pub fn add_stage(&mut self, config: WatchConfig) {
self.stages.push(config);
}
#[must_use]
pub fn stage_count(&self) -> usize {
self.stages.len()
}
#[must_use]
pub fn stages(&self) -> &[WatchConfig] {
&self.stages
}
pub fn validate(&self) -> Result<()> {
if self.stages.is_empty() {
return Err(TranscodeError::InvalidInput(
"Hot folder chain has no stages".into(),
));
}
for i in 0..self.stages.len().saturating_sub(1) {
let current = &self.stages[i];
let next = &self.stages[i + 1];
let output_dir = match ¤t.output_location {
OutputLocation::Fixed(dir) => Some(dir.clone()),
OutputLocation::DoneSubDir => Some(current.watch_dir.join("done")),
OutputLocation::SiblingWithExtension(_) => None,
};
if let Some(out_dir) = output_dir {
if out_dir != next.watch_dir {
return Err(TranscodeError::InvalidInput(format!(
"Stage {} output dir '{}' does not match stage {} watch dir '{}'",
i,
out_dir.display(),
i + 1,
next.watch_dir.display()
)));
}
}
}
Ok(())
}
}
impl Default for HotFolderChain {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct FilenamePattern {
pattern: String,
case_insensitive: bool,
}
impl FilenamePattern {
#[must_use]
pub fn new(pattern: impl Into<String>) -> Self {
Self {
pattern: pattern.into(),
case_insensitive: true,
}
}
#[must_use]
pub fn case_insensitive(mut self, ci: bool) -> Self {
self.case_insensitive = ci;
self
}
#[must_use]
pub fn matches(&self, filename: &str) -> bool {
let (pat, name) = if self.case_insensitive {
(self.pattern.to_lowercase(), filename.to_lowercase())
} else {
(self.pattern.clone(), filename.to_string())
};
Self::glob_match(&pat, &name)
}
fn glob_match(pattern: &str, text: &str) -> bool {
let pat_chars: Vec<char> = pattern.chars().collect();
let txt_chars: Vec<char> = text.chars().collect();
let (plen, tlen) = (pat_chars.len(), txt_chars.len());
let mut dp = vec![vec![false; tlen + 1]; plen + 1];
dp[0][0] = true;
for (i, &pc) in pat_chars.iter().enumerate() {
if pc == '*' {
dp[i + 1][0] = dp[i][0];
} else {
break;
}
}
for i in 1..=plen {
for j in 1..=tlen {
if pat_chars[i - 1] == '*' {
dp[i][j] = dp[i - 1][j] || dp[i][j - 1];
} else if pat_chars[i - 1] == '?' || pat_chars[i - 1] == txt_chars[j - 1] {
dp[i][j] = dp[i - 1][j - 1];
}
}
}
dp[plen][tlen]
}
#[must_use]
pub fn pattern(&self) -> &str {
&self.pattern
}
}
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub max_retries: u32,
pub initial_delay_ms: u64,
pub backoff_multiplier: f64,
pub max_delay_ms: u64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_delay_ms: 1_000,
backoff_multiplier: 2.0,
max_delay_ms: 30_000,
}
}
}
impl RetryConfig {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn max_retries(mut self, n: u32) -> Self {
self.max_retries = n;
self
}
#[must_use]
pub fn initial_delay_ms(mut self, ms: u64) -> Self {
self.initial_delay_ms = ms;
self
}
#[must_use]
pub fn backoff_multiplier(mut self, m: f64) -> Self {
self.backoff_multiplier = m;
self
}
#[must_use]
pub fn max_delay_ms(mut self, ms: u64) -> Self {
self.max_delay_ms = ms;
self
}
#[must_use]
pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
if attempt == 0 {
return Duration::from_millis(self.initial_delay_ms);
}
let delay = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
let clamped = delay.min(self.max_delay_ms as f64) as u64;
Duration::from_millis(clamped)
}
}
#[derive(Debug, Clone)]
pub struct RetryTracker {
pub path: PathBuf,
pub attempts: u32,
pub last_error: Option<String>,
}
impl RetryTracker {
#[must_use]
pub fn new(path: PathBuf) -> Self {
Self {
path,
attempts: 0,
last_error: None,
}
}
pub fn record_failure(&mut self, error: &str) {
self.attempts += 1;
self.last_error = Some(error.to_string());
}
#[must_use]
pub fn can_retry(&self, config: &RetryConfig) -> bool {
self.attempts < config.max_retries
}
#[must_use]
pub fn next_delay(&self, config: &RetryConfig) -> Duration {
config.delay_for_attempt(self.attempts)
}
}
#[derive(Debug, Clone, Default)]
pub struct WatchFolderStats {
pub processed_count: u64,
pub error_count: u64,
pub total_processing_time_ms: u64,
pub total_bytes_processed: u64,
pub min_processing_time_ms: Option<u64>,
pub max_processing_time_ms: Option<u64>,
}
impl WatchFolderStats {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn record_success(&mut self, processing_time_ms: u64, file_size_bytes: u64) {
self.processed_count += 1;
self.total_processing_time_ms += processing_time_ms;
self.total_bytes_processed += file_size_bytes;
self.min_processing_time_ms = Some(
self.min_processing_time_ms
.map_or(processing_time_ms, |m| m.min(processing_time_ms)),
);
self.max_processing_time_ms = Some(
self.max_processing_time_ms
.map_or(processing_time_ms, |m| m.max(processing_time_ms)),
);
}
pub fn record_error(&mut self) {
self.error_count += 1;
}
#[must_use]
pub fn avg_processing_time_ms(&self) -> Option<u64> {
if self.processed_count == 0 {
return None;
}
Some(self.total_processing_time_ms / self.processed_count)
}
#[must_use]
pub fn success_rate(&self) -> f64 {
let total = self.processed_count + self.error_count;
if total == 0 {
return 1.0;
}
self.processed_count as f64 / total as f64
}
#[must_use]
pub fn avg_throughput_bps(&self) -> Option<f64> {
if self.total_processing_time_ms == 0 || self.total_bytes_processed == 0 {
return None;
}
let secs = self.total_processing_time_ms as f64 / 1000.0;
Some(self.total_bytes_processed as f64 / secs)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env::temp_dir;
use std::fs;
fn make_temp_dir(suffix: &str) -> PathBuf {
let dir = temp_dir().join(format!("oximedia_watch_test_{suffix}"));
fs::create_dir_all(&dir).expect("create temp dir");
dir
}
fn touch(dir: &Path, name: &str) -> PathBuf {
let path = dir.join(name);
fs::write(&path, b"fake media").expect("create temp file");
path
}
#[test]
fn test_watch_config_new() {
let cfg = WatchConfig::new("/tmp/watch");
assert!(!cfg.accepted_extensions.is_empty());
assert_eq!(cfg.max_concurrent, 2);
assert_eq!(cfg.poll_interval_ms, 5_000);
}
#[test]
fn test_watch_config_validate_missing_dir() {
let cfg = WatchConfig::new("/nonexistent/path/for/oximedia_test");
assert!(cfg.validate().is_err());
}
#[test]
fn test_watch_config_validate_ok() {
let dir = make_temp_dir("cfg_ok");
let cfg = WatchConfig::new(&dir);
assert!(cfg.validate().is_ok());
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_scan_detects_new_files() {
let dir = make_temp_dir("scan");
touch(&dir, "video.mp4");
touch(&dir, "clip.mkv");
touch(&dir, "readme.txt");
let cfg = WatchConfig::new(&dir);
let mut watcher = TranscodeWatcher::new(cfg);
let count = watcher.scan().expect("scan ok");
assert_eq!(count, 2);
assert_eq!(watcher.queue_len(), 2);
let count2 = watcher.scan().expect("scan ok");
assert_eq!(count2, 0);
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_drain_pending_creates_configs() {
let dir = make_temp_dir("drain");
touch(&dir, "a.mp4");
let cfg = WatchConfig::new(&dir);
let mut watcher = TranscodeWatcher::new(cfg);
watcher.scan().expect("scan ok");
let drained = watcher.drain_pending();
assert_eq!(drained.len(), 1);
let (entry, job) = &drained[0];
assert!(entry.source.ends_with("a.mp4"));
assert!(job.input.is_some());
assert!(job.output.is_some());
assert_eq!(watcher.pending().len(), 0);
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_mark_done_updates_status() {
let dir = make_temp_dir("mark_done");
let file = touch(&dir, "b.mp4");
let cfg = WatchConfig::new(&dir).on_success(PostProcessAction::Leave);
let mut watcher = TranscodeWatcher::new(cfg);
watcher.scan().expect("scan ok");
watcher.drain_pending();
watcher.mark_done(&file).expect("mark done ok");
let counts = watcher.status_counts();
assert_eq!(counts.done, 1);
assert_eq!(counts.failed, 0);
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_mark_failed_updates_status() {
let dir = make_temp_dir("mark_failed");
let file = touch(&dir, "c.mp4");
let cfg = WatchConfig::new(&dir).on_failure(PostProcessAction::Leave);
let mut watcher = TranscodeWatcher::new(cfg);
watcher.scan().expect("scan ok");
watcher.drain_pending();
watcher
.mark_failed(&file, "codec not found")
.expect("mark failed ok");
let counts = watcher.status_counts();
assert_eq!(counts.failed, 1);
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_status_counts() {
let dir = make_temp_dir("counts");
touch(&dir, "x.mp4");
touch(&dir, "y.mkv");
let cfg = WatchConfig::new(&dir);
let mut watcher = TranscodeWatcher::new(cfg);
watcher.scan().expect("scan ok");
let counts = watcher.status_counts();
assert_eq!(counts.pending, 2);
assert_eq!(counts.processing, 0);
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_poll_interval() {
let cfg = WatchConfig::new("/tmp").poll_interval_ms(2000);
let watcher = TranscodeWatcher::new(cfg);
assert_eq!(watcher.poll_interval(), Duration::from_secs(2));
}
#[test]
fn test_output_location_sibling() {
let dir = make_temp_dir("sibling");
touch(&dir, "d.mp4");
let cfg = WatchConfig::new(&dir)
.output_location(OutputLocation::SiblingWithExtension("mkv".into()));
let mut watcher = TranscodeWatcher::new(cfg);
watcher.scan().expect("scan ok");
let entry = &watcher.queue[0];
assert!(entry
.output
.extension()
.map(|e| e == "mkv")
.unwrap_or(false));
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_stability_config_defaults() {
let cfg = FileStabilityConfig::default();
assert_eq!(cfg.required_stable_checks, 3);
assert_eq!(cfg.check_interval_ms, 2000);
assert_eq!(cfg.min_file_size, 1024);
}
#[test]
fn test_stability_config_builder() {
let cfg = FileStabilityConfig::new()
.required_checks(5)
.check_interval_ms(1000)
.min_file_size(4096);
assert_eq!(cfg.required_stable_checks, 5);
assert_eq!(cfg.check_interval_ms, 1000);
assert_eq!(cfg.min_file_size, 4096);
}
#[test]
fn test_stability_tracker_stable_file() {
let dir = make_temp_dir("stability");
let path = dir.join("stable.mp4");
fs::write(&path, vec![0u8; 2048]).expect("write ok");
let cfg = FileStabilityConfig::new().required_checks(2);
let mut tracker = FileStabilityTracker::new(path);
assert!(!tracker.check(&cfg));
assert!(!tracker.check(&cfg));
assert!(tracker.check(&cfg));
assert!(tracker.is_stable());
assert_eq!(tracker.last_size(), 2048);
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_stability_tracker_growing_file() {
let dir = make_temp_dir("growing");
let path = dir.join("growing.mp4");
fs::write(&path, vec![0u8; 2048]).expect("write ok");
let cfg = FileStabilityConfig::new().required_checks(2);
let mut tracker = FileStabilityTracker::new(path.clone());
tracker.check(&cfg); tracker.check(&cfg);
fs::write(&path, vec![0u8; 4096]).expect("grow ok");
assert!(!tracker.check(&cfg));
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_stability_tracker_too_small() {
let dir = make_temp_dir("small");
let path = dir.join("tiny.mp4");
fs::write(&path, b"x").expect("write ok");
let cfg = FileStabilityConfig::new().min_file_size(1024);
let mut tracker = FileStabilityTracker::new(path);
for _ in 0..10 {
assert!(!tracker.check(&cfg));
}
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_hot_folder_chain_empty() {
let chain = HotFolderChain::new();
assert_eq!(chain.stage_count(), 0);
assert!(chain.validate().is_err());
}
#[test]
fn test_hot_folder_chain_single_stage() {
let dir = make_temp_dir("chain1");
let mut chain = HotFolderChain::new();
chain.add_stage(WatchConfig::new(&dir));
assert_eq!(chain.stage_count(), 1);
assert!(chain.validate().is_ok());
fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_hot_folder_chain_two_stages_aligned() {
let dir1 = make_temp_dir("chain2a");
let dir2 = dir1.join("done");
fs::create_dir_all(&dir2).expect("create done dir");
let mut chain = HotFolderChain::new();
chain.add_stage(WatchConfig::new(&dir1)); chain.add_stage(WatchConfig::new(&dir2)); assert_eq!(chain.stage_count(), 2);
assert!(chain.validate().is_ok());
fs::remove_dir_all(&dir1).ok();
}
#[test]
fn test_hot_folder_chain_misaligned() {
let dir1 = make_temp_dir("chain3a");
let dir2 = make_temp_dir("chain3b");
let mut chain = HotFolderChain::new();
chain.add_stage(WatchConfig::new(&dir1)); chain.add_stage(WatchConfig::new(&dir2)); assert!(chain.validate().is_err());
fs::remove_dir_all(&dir1).ok();
fs::remove_dir_all(&dir2).ok();
}
#[test]
fn test_filename_pattern_exact() {
let p = FilenamePattern::new("video.mp4");
assert!(p.matches("video.mp4"));
assert!(p.matches("VIDEO.MP4")); assert!(!p.matches("audio.mp4"));
}
#[test]
fn test_filename_pattern_wildcard() {
let p = FilenamePattern::new("*.mp4");
assert!(p.matches("video.mp4"));
assert!(p.matches("CLIP.MP4"));
assert!(!p.matches("video.mkv"));
}
#[test]
fn test_filename_pattern_wildcard_prefix() {
let p = FilenamePattern::new("raw_*");
assert!(p.matches("raw_clip.mp4"));
assert!(p.matches("raw_"));
assert!(!p.matches("clip_raw.mp4"));
}
#[test]
fn test_filename_pattern_multiple_wildcards() {
let p = FilenamePattern::new("*_final_*");
assert!(p.matches("clip_final_v2.mp4"));
assert!(!p.matches("clip_draft_v2.mp4"));
}
#[test]
fn test_filename_pattern_case_sensitive() {
let p = FilenamePattern::new("Video.mp4").case_insensitive(false);
assert!(p.matches("Video.mp4"));
assert!(!p.matches("video.mp4"));
}
#[test]
fn test_retry_config_defaults() {
let cfg = RetryConfig::default();
assert_eq!(cfg.max_retries, 3);
assert_eq!(cfg.initial_delay_ms, 1000);
assert!((cfg.backoff_multiplier - 2.0).abs() < 1e-6);
}
#[test]
fn test_retry_delay_exponential() {
let cfg = RetryConfig::new()
.initial_delay_ms(1000)
.backoff_multiplier(2.0)
.max_delay_ms(10_000);
assert_eq!(cfg.delay_for_attempt(0), Duration::from_secs(1));
assert_eq!(cfg.delay_for_attempt(1), Duration::from_secs(2));
assert_eq!(cfg.delay_for_attempt(2), Duration::from_secs(4));
assert_eq!(cfg.delay_for_attempt(3), Duration::from_secs(8));
assert_eq!(cfg.delay_for_attempt(4), Duration::from_secs(10));
}
#[test]
fn test_retry_tracker() {
let cfg = RetryConfig::new().max_retries(3);
let mut tracker = RetryTracker::new(PathBuf::from("/tmp/test.mp4"));
assert!(tracker.can_retry(&cfg));
assert_eq!(tracker.attempts, 0);
tracker.record_failure("codec error");
assert_eq!(tracker.attempts, 1);
assert_eq!(tracker.last_error.as_deref(), Some("codec error"));
assert!(tracker.can_retry(&cfg));
tracker.record_failure("timeout");
tracker.record_failure("timeout");
assert!(!tracker.can_retry(&cfg));
}
#[test]
fn test_stats_empty() {
let stats = WatchFolderStats::new();
assert_eq!(stats.processed_count, 0);
assert_eq!(stats.error_count, 0);
assert!(stats.avg_processing_time_ms().is_none());
assert!((stats.success_rate() - 1.0).abs() < 1e-6);
}
#[test]
fn test_stats_record_success() {
let mut stats = WatchFolderStats::new();
stats.record_success(1000, 10_000_000);
stats.record_success(2000, 20_000_000);
assert_eq!(stats.processed_count, 2);
assert_eq!(stats.total_processing_time_ms, 3000);
assert_eq!(stats.avg_processing_time_ms(), Some(1500));
assert_eq!(stats.min_processing_time_ms, Some(1000));
assert_eq!(stats.max_processing_time_ms, Some(2000));
assert_eq!(stats.total_bytes_processed, 30_000_000);
}
#[test]
fn test_stats_success_rate() {
let mut stats = WatchFolderStats::new();
stats.record_success(100, 1000);
stats.record_success(100, 1000);
stats.record_error();
let rate = stats.success_rate();
assert!((rate - 2.0 / 3.0).abs() < 1e-6);
}
#[test]
fn test_stats_throughput() {
let mut stats = WatchFolderStats::new();
stats.record_success(1000, 1_000_000);
let bps = stats.avg_throughput_bps().expect("should have throughput");
assert!((bps - 1_000_000.0).abs() < 1.0);
}
}