Skip to main content

oximedia_transcode/
watch_folder.rs

1//! Watch folder automation for directory-based transcode monitoring.
2//!
3//! `TranscodeWatcher` polls a source directory for new media files and
4//! dispatches transcode jobs automatically.  The implementation is pure Rust
5//! (no `inotify`/`kqueue` bindings required) using a polling loop with
6//! configurable interval.
7
8#![allow(dead_code)]
9
10use std::collections::HashSet;
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13
14use crate::{Result, TranscodeConfig, TranscodeError};
15
16/// Action to take when a file has been processed.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum PostProcessAction {
19    /// Leave the file in the watch folder.
20    Leave,
21    /// Move the file to the `done` sub-directory.
22    MoveToDone,
23    /// Delete the file from the watch folder.
24    Delete,
25}
26
27/// Policy for selecting the output directory of each transcode job.
28#[derive(Debug, Clone)]
29pub enum OutputLocation {
30    /// Place output in a fixed directory.
31    Fixed(PathBuf),
32    /// Place output next to the input, using the same name with a new extension.
33    SiblingWithExtension(String),
34    /// Place output in a `done/` sub-directory of the watch folder.
35    DoneSubDir,
36}
37
38/// Configuration for `TranscodeWatcher`.
39#[derive(Debug, Clone)]
40pub struct WatchConfig {
41    /// Directory to monitor for incoming files.
42    pub watch_dir: PathBuf,
43    /// File extensions to accept (lower-case, without leading dot).
44    pub accepted_extensions: Vec<String>,
45    /// How to determine the output path for each new file.
46    pub output_location: OutputLocation,
47    /// What to do with an input file after a successful transcode.
48    pub on_success: PostProcessAction,
49    /// What to do with an input file after a failed transcode.
50    pub on_failure: PostProcessAction,
51    /// How often to scan the watch directory (milliseconds).
52    pub poll_interval_ms: u64,
53    /// Base `TranscodeConfig` applied to every discovered file.
54    pub base_config: TranscodeConfig,
55    /// Maximum number of concurrent jobs.
56    pub max_concurrent: usize,
57}
58
59impl WatchConfig {
60    /// Creates a `WatchConfig` with sensible defaults.
61    ///
62    /// Accepts common video extensions, moves successful files to `done/`,
63    /// and polls every 5 seconds.
64    #[must_use]
65    pub fn new(watch_dir: impl Into<PathBuf>) -> Self {
66        Self {
67            watch_dir: watch_dir.into(),
68            accepted_extensions: vec![
69                "mp4".into(),
70                "mkv".into(),
71                "mov".into(),
72                "avi".into(),
73                "webm".into(),
74                "mxf".into(),
75                "ts".into(),
76                "m2ts".into(),
77            ],
78            output_location: OutputLocation::DoneSubDir,
79            on_success: PostProcessAction::MoveToDone,
80            on_failure: PostProcessAction::Leave,
81            poll_interval_ms: 5_000,
82            base_config: TranscodeConfig::default(),
83            max_concurrent: 2,
84        }
85    }
86
87    /// Sets the output location policy.
88    #[must_use]
89    pub fn output_location(mut self, loc: OutputLocation) -> Self {
90        self.output_location = loc;
91        self
92    }
93
94    /// Sets what happens to the source file after a successful transcode.
95    #[must_use]
96    pub fn on_success(mut self, action: PostProcessAction) -> Self {
97        self.on_success = action;
98        self
99    }
100
101    /// Sets what happens to the source file after a failed transcode.
102    #[must_use]
103    pub fn on_failure(mut self, action: PostProcessAction) -> Self {
104        self.on_failure = action;
105        self
106    }
107
108    /// Sets the polling interval in milliseconds.
109    #[must_use]
110    pub fn poll_interval_ms(mut self, ms: u64) -> Self {
111        self.poll_interval_ms = ms;
112        self
113    }
114
115    /// Overrides the base `TranscodeConfig`.
116    #[must_use]
117    pub fn base_config(mut self, config: TranscodeConfig) -> Self {
118        self.base_config = config;
119        self
120    }
121
122    /// Sets the maximum number of concurrent jobs.
123    #[must_use]
124    pub fn max_concurrent(mut self, n: usize) -> Self {
125        self.max_concurrent = n;
126        self
127    }
128
129    /// Validates the configuration.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the watch directory does not exist.
134    pub fn validate(&self) -> Result<()> {
135        if !self.watch_dir.exists() {
136            return Err(TranscodeError::InvalidInput(format!(
137                "Watch directory does not exist: {}",
138                self.watch_dir.display()
139            )));
140        }
141        if self.max_concurrent == 0 {
142            return Err(TranscodeError::InvalidInput(
143                "max_concurrent must be at least 1".into(),
144            ));
145        }
146        Ok(())
147    }
148}
149
150/// Status of a single watched file.
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum WatchFileStatus {
153    /// Waiting to be processed.
154    Pending,
155    /// Currently being transcoded.
156    Processing,
157    /// Transcoded successfully.
158    Done,
159    /// Transcode failed; message contains the reason.
160    Failed(String),
161}
162
163/// A discovered file entry in the watch queue.
164#[derive(Debug, Clone)]
165pub struct WatchEntry {
166    /// Original source file path.
167    pub source: PathBuf,
168    /// Computed output path.
169    pub output: PathBuf,
170    /// Current status.
171    pub status: WatchFileStatus,
172}
173
174impl WatchEntry {
175    /// Creates a new watch entry.
176    #[must_use]
177    pub fn new(source: PathBuf, output: PathBuf) -> Self {
178        Self {
179            source,
180            output,
181            status: WatchFileStatus::Pending,
182        }
183    }
184}
185
186/// Directory-based transcode watcher.
187///
188/// Call [`TranscodeWatcher::scan`] to detect new files, and
189/// [`TranscodeWatcher::drain_pending`] to obtain `TranscodeConfig` values
190/// ready for submission to the job queue.
191pub struct TranscodeWatcher {
192    config: WatchConfig,
193    /// Paths already seen (regardless of processing status).
194    seen: HashSet<PathBuf>,
195    /// Queue of watch entries waiting to be processed.
196    queue: Vec<WatchEntry>,
197}
198
199impl TranscodeWatcher {
200    /// Creates a new watcher from `config`.
201    #[must_use]
202    pub fn new(config: WatchConfig) -> Self {
203        Self {
204            config,
205            seen: HashSet::new(),
206            queue: Vec::new(),
207        }
208    }
209
210    /// Returns the watcher configuration.
211    #[must_use]
212    pub fn config(&self) -> &WatchConfig {
213        &self.config
214    }
215
216    /// Returns the poll interval as a [`Duration`].
217    #[must_use]
218    pub fn poll_interval(&self) -> Duration {
219        Duration::from_millis(self.config.poll_interval_ms)
220    }
221
222    /// Scans the watch directory for new eligible files.
223    ///
224    /// Returns the number of new files enqueued.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if the directory cannot be read.
229    pub fn scan(&mut self) -> Result<usize> {
230        let entries = std::fs::read_dir(&self.config.watch_dir).map_err(|e| {
231            TranscodeError::IoError(format!(
232                "Cannot read watch dir '{}': {e}",
233                self.config.watch_dir.display()
234            ))
235        })?;
236
237        let mut new_count = 0usize;
238
239        for entry in entries.flatten() {
240            let path = entry.path();
241            if !path.is_file() {
242                continue;
243            }
244            if self.seen.contains(&path) {
245                continue;
246            }
247            let ext = path
248                .extension()
249                .and_then(|e| e.to_str())
250                .map(str::to_lowercase)
251                .unwrap_or_default();
252            if !self.config.accepted_extensions.iter().any(|a| a == &ext) {
253                continue;
254            }
255
256            let output = self.resolve_output(&path);
257            self.seen.insert(path.clone());
258            self.queue.push(WatchEntry::new(path, output));
259            new_count += 1;
260        }
261
262        Ok(new_count)
263    }
264
265    /// Returns all pending entries (without removing them from the queue).
266    #[must_use]
267    pub fn pending(&self) -> Vec<&WatchEntry> {
268        self.queue
269            .iter()
270            .filter(|e| e.status == WatchFileStatus::Pending)
271            .collect()
272    }
273
274    /// Drains all pending entries into a `Vec<TranscodeConfig>` for submission
275    /// to the job queue, marking each entry as `Processing`.
276    pub fn drain_pending(&mut self) -> Vec<(WatchEntry, TranscodeConfig)> {
277        let mut out = Vec::new();
278
279        for entry in &mut self.queue {
280            if entry.status != WatchFileStatus::Pending {
281                continue;
282            }
283            entry.status = WatchFileStatus::Processing;
284
285            let mut job = self.config.base_config.clone();
286            job.input = entry.source.to_str().map(String::from);
287            job.output = entry.output.to_str().map(String::from);
288
289            out.push((entry.clone(), job));
290        }
291
292        out
293    }
294
295    /// Marks a watch entry as successfully processed and applies the configured
296    /// post-process action (move / delete / leave).
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the file move or delete operation fails.
301    pub fn mark_done(&mut self, source: &Path) -> Result<()> {
302        self.update_status(source, WatchFileStatus::Done);
303
304        match self.config.on_success {
305            PostProcessAction::Leave => {}
306            PostProcessAction::Delete => {
307                std::fs::remove_file(source).map_err(|e| {
308                    TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
309                })?;
310            }
311            PostProcessAction::MoveToDone => {
312                self.move_to_done_dir(source)?;
313            }
314        }
315
316        Ok(())
317    }
318
319    /// Marks a watch entry as failed and applies the configured on-failure action.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the file operation fails.
324    pub fn mark_failed(&mut self, source: &Path, reason: &str) -> Result<()> {
325        self.update_status(source, WatchFileStatus::Failed(reason.to_string()));
326
327        match self.config.on_failure {
328            PostProcessAction::Leave => {}
329            PostProcessAction::Delete => {
330                std::fs::remove_file(source).map_err(|e| {
331                    TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
332                })?;
333            }
334            PostProcessAction::MoveToDone => {
335                self.move_to_done_dir(source)?;
336            }
337        }
338
339        Ok(())
340    }
341
342    /// Returns the total number of queued entries.
343    #[must_use]
344    pub fn queue_len(&self) -> usize {
345        self.queue.len()
346    }
347
348    /// Returns the number of entries in each status category.
349    #[must_use]
350    pub fn status_counts(&self) -> WatchStatusCounts {
351        let mut counts = WatchStatusCounts::default();
352        for entry in &self.queue {
353            match entry.status {
354                WatchFileStatus::Pending => counts.pending += 1,
355                WatchFileStatus::Processing => counts.processing += 1,
356                WatchFileStatus::Done => counts.done += 1,
357                WatchFileStatus::Failed(_) => counts.failed += 1,
358            }
359        }
360        counts
361    }
362
363    // ── Internal helpers ──────────────────────────────────────────────────────
364
365    fn update_status(&mut self, source: &Path, new_status: WatchFileStatus) {
366        for entry in &mut self.queue {
367            if entry.source == source {
368                entry.status = new_status;
369                return;
370            }
371        }
372    }
373
374    fn resolve_output(&self, source: &Path) -> PathBuf {
375        match &self.config.output_location {
376            OutputLocation::Fixed(dir) => {
377                let filename = source
378                    .file_name()
379                    .map(PathBuf::from)
380                    .unwrap_or_else(|| PathBuf::from("output.mkv"));
381                dir.join(filename)
382            }
383            OutputLocation::SiblingWithExtension(ext) => {
384                let mut out = source.to_path_buf();
385                out.set_extension(ext.trim_start_matches('.'));
386                out
387            }
388            OutputLocation::DoneSubDir => {
389                let done_dir = self.config.watch_dir.join("done");
390                let filename = source
391                    .file_name()
392                    .map(PathBuf::from)
393                    .unwrap_or_else(|| PathBuf::from("output.mkv"));
394                done_dir.join(filename)
395            }
396        }
397    }
398
399    fn move_to_done_dir(&self, source: &Path) -> Result<()> {
400        let done_dir = self.config.watch_dir.join("done");
401        std::fs::create_dir_all(&done_dir)
402            .map_err(|e| TranscodeError::IoError(format!("Cannot create done dir: {e}")))?;
403        let dest = done_dir.join(
404            source
405                .file_name()
406                .unwrap_or_else(|| std::ffi::OsStr::new("moved_file")),
407        );
408        std::fs::rename(source, &dest).map_err(|e| {
409            TranscodeError::IoError(format!(
410                "Cannot move '{}' → '{}': {e}",
411                source.display(),
412                dest.display()
413            ))
414        })
415    }
416}
417
418/// Snapshot of watch queue status counts.
419#[derive(Debug, Clone, Default)]
420pub struct WatchStatusCounts {
421    /// Number of entries awaiting processing.
422    pub pending: usize,
423    /// Number of entries currently being transcoded.
424    pub processing: usize,
425    /// Number of successfully completed entries.
426    pub done: usize,
427    /// Number of failed entries.
428    pub failed: usize,
429}
430
431// ─── File stability detection ─────────────────────────────────────────────────
432
433/// Configuration for file stability detection.
434///
435/// Waits until a file has stopped growing before considering it ready
436/// for processing. This prevents partial files (still being copied or
437/// written by another process) from entering the transcode queue.
438#[derive(Debug, Clone)]
439pub struct FileStabilityConfig {
440    /// Number of consecutive stable checks required before a file is
441    /// considered complete.
442    pub required_stable_checks: u32,
443    /// Interval between stability checks in milliseconds.
444    pub check_interval_ms: u64,
445    /// Minimum file size in bytes before stability checks begin.
446    pub min_file_size: u64,
447}
448
449impl Default for FileStabilityConfig {
450    fn default() -> Self {
451        Self {
452            required_stable_checks: 3,
453            check_interval_ms: 2_000,
454            min_file_size: 1024,
455        }
456    }
457}
458
459impl FileStabilityConfig {
460    /// Creates a new stability config with default values.
461    #[must_use]
462    pub fn new() -> Self {
463        Self::default()
464    }
465
466    /// Sets the number of required stable checks.
467    #[must_use]
468    pub fn required_checks(mut self, n: u32) -> Self {
469        self.required_stable_checks = n;
470        self
471    }
472
473    /// Sets the check interval in milliseconds.
474    #[must_use]
475    pub fn check_interval_ms(mut self, ms: u64) -> Self {
476        self.check_interval_ms = ms;
477        self
478    }
479
480    /// Sets the minimum file size.
481    #[must_use]
482    pub fn min_file_size(mut self, size: u64) -> Self {
483        self.min_file_size = size;
484        self
485    }
486}
487
488/// Tracks stability state for a single file.
489#[derive(Debug, Clone)]
490pub struct FileStabilityTracker {
491    /// Path being tracked.
492    path: PathBuf,
493    /// Last observed file size.
494    last_size: u64,
495    /// Number of consecutive stable readings.
496    stable_count: u32,
497    /// Whether the file has been declared stable.
498    is_stable: bool,
499}
500
501impl FileStabilityTracker {
502    /// Creates a new tracker for the given path.
503    #[must_use]
504    pub fn new(path: PathBuf) -> Self {
505        Self {
506            path,
507            last_size: 0,
508            stable_count: 0,
509            is_stable: false,
510        }
511    }
512
513    /// Checks the file and updates stability state.
514    ///
515    /// Returns `true` if the file is now considered stable.
516    pub fn check(&mut self, config: &FileStabilityConfig) -> bool {
517        if self.is_stable {
518            return true;
519        }
520        let current_size = std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0);
521
522        if current_size < config.min_file_size {
523            self.stable_count = 0;
524            self.last_size = current_size;
525            return false;
526        }
527
528        if current_size == self.last_size {
529            self.stable_count += 1;
530        } else {
531            self.stable_count = 0;
532        }
533        self.last_size = current_size;
534
535        if self.stable_count >= config.required_stable_checks {
536            self.is_stable = true;
537        }
538        self.is_stable
539    }
540
541    /// Returns true if the file has been declared stable.
542    #[must_use]
543    pub fn is_stable(&self) -> bool {
544        self.is_stable
545    }
546
547    /// Returns the path being tracked.
548    #[must_use]
549    pub fn path(&self) -> &Path {
550        &self.path
551    }
552
553    /// Returns the last observed file size.
554    #[must_use]
555    pub fn last_size(&self) -> u64 {
556        self.last_size
557    }
558}
559
560// ─── Hot folder chains ────────────────────────────────────────────────────────
561
562/// A chain of watch folders where the output of one feeds into the next.
563///
564/// This enables multi-step processing workflows, for example:
565/// 1. Ingest folder → transcode to intermediate format
566/// 2. Intermediate folder → apply effects / normalisation
567/// 3. Final folder → encode to delivery format
568#[derive(Debug, Clone)]
569pub struct HotFolderChain {
570    /// Ordered list of watch configurations forming the chain.
571    stages: Vec<WatchConfig>,
572}
573
574impl HotFolderChain {
575    /// Creates a new empty chain.
576    #[must_use]
577    pub fn new() -> Self {
578        Self { stages: Vec::new() }
579    }
580
581    /// Appends a stage to the chain.
582    ///
583    /// The output directory of the previous stage should match the watch
584    /// directory of this stage for seamless chaining.
585    pub fn add_stage(&mut self, config: WatchConfig) {
586        self.stages.push(config);
587    }
588
589    /// Returns the number of stages in the chain.
590    #[must_use]
591    pub fn stage_count(&self) -> usize {
592        self.stages.len()
593    }
594
595    /// Returns the stages as a slice.
596    #[must_use]
597    pub fn stages(&self) -> &[WatchConfig] {
598        &self.stages
599    }
600
601    /// Validates that the chain is well-formed.
602    ///
603    /// Checks that each stage's output directory matches the next stage's
604    /// watch directory (for `DoneSubDir` and `Fixed` output locations).
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if the chain is empty or directories don't align.
609    pub fn validate(&self) -> Result<()> {
610        if self.stages.is_empty() {
611            return Err(TranscodeError::InvalidInput(
612                "Hot folder chain has no stages".into(),
613            ));
614        }
615
616        for i in 0..self.stages.len().saturating_sub(1) {
617            let current = &self.stages[i];
618            let next = &self.stages[i + 1];
619
620            let output_dir = match &current.output_location {
621                OutputLocation::Fixed(dir) => Some(dir.clone()),
622                OutputLocation::DoneSubDir => Some(current.watch_dir.join("done")),
623                OutputLocation::SiblingWithExtension(_) => None,
624            };
625
626            if let Some(out_dir) = output_dir {
627                if out_dir != next.watch_dir {
628                    return Err(TranscodeError::InvalidInput(format!(
629                        "Stage {} output dir '{}' does not match stage {} watch dir '{}'",
630                        i,
631                        out_dir.display(),
632                        i + 1,
633                        next.watch_dir.display()
634                    )));
635                }
636            }
637        }
638
639        Ok(())
640    }
641}
642
643impl Default for HotFolderChain {
644    fn default() -> Self {
645        Self::new()
646    }
647}
648
649// ─── Filename pattern matching ────────────────────────────────────────────────
650
651/// Pattern-based file filter for selective watch folder processing.
652///
653/// Uses simple glob-like patterns (not full regex, to avoid a regex dependency)
654/// to match filenames. Supports `*` wildcard and case-insensitive matching.
655#[derive(Debug, Clone)]
656pub struct FilenamePattern {
657    /// The raw pattern string.
658    pattern: String,
659    /// Whether matching is case-insensitive.
660    case_insensitive: bool,
661}
662
663impl FilenamePattern {
664    /// Creates a new filename pattern.
665    #[must_use]
666    pub fn new(pattern: impl Into<String>) -> Self {
667        Self {
668            pattern: pattern.into(),
669            case_insensitive: true,
670        }
671    }
672
673    /// Sets case sensitivity.
674    #[must_use]
675    pub fn case_insensitive(mut self, ci: bool) -> Self {
676        self.case_insensitive = ci;
677        self
678    }
679
680    /// Tests whether the given filename matches this pattern.
681    ///
682    /// Supports `*` as a wildcard matching zero or more characters.
683    #[must_use]
684    pub fn matches(&self, filename: &str) -> bool {
685        let (pat, name) = if self.case_insensitive {
686            (self.pattern.to_lowercase(), filename.to_lowercase())
687        } else {
688            (self.pattern.clone(), filename.to_string())
689        };
690        Self::glob_match(&pat, &name)
691    }
692
693    /// Simple glob matching with `*` wildcard.
694    fn glob_match(pattern: &str, text: &str) -> bool {
695        let pat_chars: Vec<char> = pattern.chars().collect();
696        let txt_chars: Vec<char> = text.chars().collect();
697        let (plen, tlen) = (pat_chars.len(), txt_chars.len());
698
699        // DP approach for wildcard matching
700        let mut dp = vec![vec![false; tlen + 1]; plen + 1];
701        dp[0][0] = true;
702
703        // Handle leading *
704        for (i, &pc) in pat_chars.iter().enumerate() {
705            if pc == '*' {
706                dp[i + 1][0] = dp[i][0];
707            } else {
708                break;
709            }
710        }
711
712        for i in 1..=plen {
713            for j in 1..=tlen {
714                if pat_chars[i - 1] == '*' {
715                    dp[i][j] = dp[i - 1][j] || dp[i][j - 1];
716                } else if pat_chars[i - 1] == '?' || pat_chars[i - 1] == txt_chars[j - 1] {
717                    dp[i][j] = dp[i - 1][j - 1];
718                }
719            }
720        }
721
722        dp[plen][tlen]
723    }
724
725    /// Returns the raw pattern string.
726    #[must_use]
727    pub fn pattern(&self) -> &str {
728        &self.pattern
729    }
730}
731
732// ─── Retry with exponential backoff ───────────────────────────────────────────
733
734/// Configuration for retry with exponential backoff.
735#[derive(Debug, Clone)]
736pub struct RetryConfig {
737    /// Maximum number of retry attempts.
738    pub max_retries: u32,
739    /// Initial delay before the first retry (milliseconds).
740    pub initial_delay_ms: u64,
741    /// Multiplier applied to the delay after each retry.
742    pub backoff_multiplier: f64,
743    /// Maximum delay between retries (milliseconds).
744    pub max_delay_ms: u64,
745}
746
747impl Default for RetryConfig {
748    fn default() -> Self {
749        Self {
750            max_retries: 3,
751            initial_delay_ms: 1_000,
752            backoff_multiplier: 2.0,
753            max_delay_ms: 30_000,
754        }
755    }
756}
757
758impl RetryConfig {
759    /// Creates a new retry config with default values.
760    #[must_use]
761    pub fn new() -> Self {
762        Self::default()
763    }
764
765    /// Sets the maximum number of retries.
766    #[must_use]
767    pub fn max_retries(mut self, n: u32) -> Self {
768        self.max_retries = n;
769        self
770    }
771
772    /// Sets the initial delay in milliseconds.
773    #[must_use]
774    pub fn initial_delay_ms(mut self, ms: u64) -> Self {
775        self.initial_delay_ms = ms;
776        self
777    }
778
779    /// Sets the backoff multiplier.
780    #[must_use]
781    pub fn backoff_multiplier(mut self, m: f64) -> Self {
782        self.backoff_multiplier = m;
783        self
784    }
785
786    /// Sets the maximum delay in milliseconds.
787    #[must_use]
788    pub fn max_delay_ms(mut self, ms: u64) -> Self {
789        self.max_delay_ms = ms;
790        self
791    }
792
793    /// Computes the delay for the given attempt number (0-based).
794    #[must_use]
795    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
796        if attempt == 0 {
797            return Duration::from_millis(self.initial_delay_ms);
798        }
799        let delay = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
800        let clamped = delay.min(self.max_delay_ms as f64) as u64;
801        Duration::from_millis(clamped)
802    }
803}
804
805/// Tracks retry state for a single file.
806#[derive(Debug, Clone)]
807pub struct RetryTracker {
808    /// Path of the file being retried.
809    pub path: PathBuf,
810    /// Number of attempts made so far.
811    pub attempts: u32,
812    /// Last error message.
813    pub last_error: Option<String>,
814}
815
816impl RetryTracker {
817    /// Creates a new retry tracker.
818    #[must_use]
819    pub fn new(path: PathBuf) -> Self {
820        Self {
821            path,
822            attempts: 0,
823            last_error: None,
824        }
825    }
826
827    /// Records a failed attempt.
828    pub fn record_failure(&mut self, error: &str) {
829        self.attempts += 1;
830        self.last_error = Some(error.to_string());
831    }
832
833    /// Returns whether more retries are allowed given the config.
834    #[must_use]
835    pub fn can_retry(&self, config: &RetryConfig) -> bool {
836        self.attempts < config.max_retries
837    }
838
839    /// Returns the delay before the next retry.
840    #[must_use]
841    pub fn next_delay(&self, config: &RetryConfig) -> Duration {
842        config.delay_for_attempt(self.attempts)
843    }
844}
845
846// ─── Watch folder statistics ──────────────────────────────────────────────────
847
848/// Statistics for a watch folder's processing activity.
849#[derive(Debug, Clone, Default)]
850pub struct WatchFolderStats {
851    /// Total number of files processed successfully.
852    pub processed_count: u64,
853    /// Total number of files that failed processing.
854    pub error_count: u64,
855    /// Total processing time in milliseconds across all successful jobs.
856    pub total_processing_time_ms: u64,
857    /// Total bytes processed (input file sizes).
858    pub total_bytes_processed: u64,
859    /// Minimum processing time in milliseconds.
860    pub min_processing_time_ms: Option<u64>,
861    /// Maximum processing time in milliseconds.
862    pub max_processing_time_ms: Option<u64>,
863}
864
865impl WatchFolderStats {
866    /// Creates a new empty statistics tracker.
867    #[must_use]
868    pub fn new() -> Self {
869        Self::default()
870    }
871
872    /// Records a successful processing event.
873    pub fn record_success(&mut self, processing_time_ms: u64, file_size_bytes: u64) {
874        self.processed_count += 1;
875        self.total_processing_time_ms += processing_time_ms;
876        self.total_bytes_processed += file_size_bytes;
877
878        self.min_processing_time_ms = Some(
879            self.min_processing_time_ms
880                .map_or(processing_time_ms, |m| m.min(processing_time_ms)),
881        );
882        self.max_processing_time_ms = Some(
883            self.max_processing_time_ms
884                .map_or(processing_time_ms, |m| m.max(processing_time_ms)),
885        );
886    }
887
888    /// Records a failed processing event.
889    pub fn record_error(&mut self) {
890        self.error_count += 1;
891    }
892
893    /// Returns the average processing time in milliseconds, or `None` if no files processed.
894    #[must_use]
895    pub fn avg_processing_time_ms(&self) -> Option<u64> {
896        if self.processed_count == 0 {
897            return None;
898        }
899        Some(self.total_processing_time_ms / self.processed_count)
900    }
901
902    /// Returns the success rate as a fraction [0.0, 1.0].
903    #[must_use]
904    pub fn success_rate(&self) -> f64 {
905        let total = self.processed_count + self.error_count;
906        if total == 0 {
907            return 1.0;
908        }
909        self.processed_count as f64 / total as f64
910    }
911
912    /// Returns the average throughput in bytes per second, or `None` if no data.
913    #[must_use]
914    pub fn avg_throughput_bps(&self) -> Option<f64> {
915        if self.total_processing_time_ms == 0 || self.total_bytes_processed == 0 {
916            return None;
917        }
918        let secs = self.total_processing_time_ms as f64 / 1000.0;
919        Some(self.total_bytes_processed as f64 / secs)
920    }
921}
922
923// ─── Tests ────────────────────────────────────────────────────────────────────
924
925#[cfg(test)]
926mod tests {
927    use super::*;
928    use std::env::temp_dir;
929    use std::fs;
930
931    fn make_temp_dir(suffix: &str) -> PathBuf {
932        let dir = temp_dir().join(format!("oximedia_watch_test_{suffix}"));
933        fs::create_dir_all(&dir).expect("create temp dir");
934        dir
935    }
936
937    fn touch(dir: &Path, name: &str) -> PathBuf {
938        let path = dir.join(name);
939        fs::write(&path, b"fake media").expect("create temp file");
940        path
941    }
942
943    #[test]
944    fn test_watch_config_new() {
945        let cfg = WatchConfig::new("/tmp/watch");
946        assert!(!cfg.accepted_extensions.is_empty());
947        assert_eq!(cfg.max_concurrent, 2);
948        assert_eq!(cfg.poll_interval_ms, 5_000);
949    }
950
951    #[test]
952    fn test_watch_config_validate_missing_dir() {
953        let cfg = WatchConfig::new("/nonexistent/path/for/oximedia_test");
954        assert!(cfg.validate().is_err());
955    }
956
957    #[test]
958    fn test_watch_config_validate_ok() {
959        let dir = make_temp_dir("cfg_ok");
960        let cfg = WatchConfig::new(&dir);
961        assert!(cfg.validate().is_ok());
962        fs::remove_dir_all(&dir).ok();
963    }
964
965    #[test]
966    fn test_scan_detects_new_files() {
967        let dir = make_temp_dir("scan");
968        touch(&dir, "video.mp4");
969        touch(&dir, "clip.mkv");
970        touch(&dir, "readme.txt"); // ignored
971
972        let cfg = WatchConfig::new(&dir);
973        let mut watcher = TranscodeWatcher::new(cfg);
974        let count = watcher.scan().expect("scan ok");
975        assert_eq!(count, 2);
976        assert_eq!(watcher.queue_len(), 2);
977
978        // Second scan should not re-enqueue
979        let count2 = watcher.scan().expect("scan ok");
980        assert_eq!(count2, 0);
981
982        fs::remove_dir_all(&dir).ok();
983    }
984
985    #[test]
986    fn test_drain_pending_creates_configs() {
987        let dir = make_temp_dir("drain");
988        touch(&dir, "a.mp4");
989
990        let cfg = WatchConfig::new(&dir);
991        let mut watcher = TranscodeWatcher::new(cfg);
992        watcher.scan().expect("scan ok");
993
994        let drained = watcher.drain_pending();
995        assert_eq!(drained.len(), 1);
996        let (entry, job) = &drained[0];
997        assert!(entry.source.ends_with("a.mp4"));
998        assert!(job.input.is_some());
999        assert!(job.output.is_some());
1000
1001        // After drain, pending count should be 0
1002        assert_eq!(watcher.pending().len(), 0);
1003
1004        fs::remove_dir_all(&dir).ok();
1005    }
1006
1007    #[test]
1008    fn test_mark_done_updates_status() {
1009        let dir = make_temp_dir("mark_done");
1010        let file = touch(&dir, "b.mp4");
1011
1012        let cfg = WatchConfig::new(&dir).on_success(PostProcessAction::Leave);
1013        let mut watcher = TranscodeWatcher::new(cfg);
1014        watcher.scan().expect("scan ok");
1015        watcher.drain_pending();
1016
1017        watcher.mark_done(&file).expect("mark done ok");
1018
1019        let counts = watcher.status_counts();
1020        assert_eq!(counts.done, 1);
1021        assert_eq!(counts.failed, 0);
1022
1023        fs::remove_dir_all(&dir).ok();
1024    }
1025
1026    #[test]
1027    fn test_mark_failed_updates_status() {
1028        let dir = make_temp_dir("mark_failed");
1029        let file = touch(&dir, "c.mp4");
1030
1031        let cfg = WatchConfig::new(&dir).on_failure(PostProcessAction::Leave);
1032        let mut watcher = TranscodeWatcher::new(cfg);
1033        watcher.scan().expect("scan ok");
1034        watcher.drain_pending();
1035
1036        watcher
1037            .mark_failed(&file, "codec not found")
1038            .expect("mark failed ok");
1039
1040        let counts = watcher.status_counts();
1041        assert_eq!(counts.failed, 1);
1042
1043        fs::remove_dir_all(&dir).ok();
1044    }
1045
1046    #[test]
1047    fn test_status_counts() {
1048        let dir = make_temp_dir("counts");
1049        touch(&dir, "x.mp4");
1050        touch(&dir, "y.mkv");
1051
1052        let cfg = WatchConfig::new(&dir);
1053        let mut watcher = TranscodeWatcher::new(cfg);
1054        watcher.scan().expect("scan ok");
1055
1056        let counts = watcher.status_counts();
1057        assert_eq!(counts.pending, 2);
1058        assert_eq!(counts.processing, 0);
1059
1060        fs::remove_dir_all(&dir).ok();
1061    }
1062
1063    #[test]
1064    fn test_poll_interval() {
1065        let cfg = WatchConfig::new("/tmp").poll_interval_ms(2000);
1066        let watcher = TranscodeWatcher::new(cfg);
1067        assert_eq!(watcher.poll_interval(), Duration::from_secs(2));
1068    }
1069
1070    #[test]
1071    fn test_output_location_sibling() {
1072        let dir = make_temp_dir("sibling");
1073        touch(&dir, "d.mp4");
1074
1075        let cfg = WatchConfig::new(&dir)
1076            .output_location(OutputLocation::SiblingWithExtension("mkv".into()));
1077        let mut watcher = TranscodeWatcher::new(cfg);
1078        watcher.scan().expect("scan ok");
1079
1080        let entry = &watcher.queue[0];
1081        assert!(entry
1082            .output
1083            .extension()
1084            .map(|e| e == "mkv")
1085            .unwrap_or(false));
1086
1087        fs::remove_dir_all(&dir).ok();
1088    }
1089
1090    // ── File stability tests ─────────────────────────────────────────────────
1091
1092    #[test]
1093    fn test_stability_config_defaults() {
1094        let cfg = FileStabilityConfig::default();
1095        assert_eq!(cfg.required_stable_checks, 3);
1096        assert_eq!(cfg.check_interval_ms, 2000);
1097        assert_eq!(cfg.min_file_size, 1024);
1098    }
1099
1100    #[test]
1101    fn test_stability_config_builder() {
1102        let cfg = FileStabilityConfig::new()
1103            .required_checks(5)
1104            .check_interval_ms(1000)
1105            .min_file_size(4096);
1106        assert_eq!(cfg.required_stable_checks, 5);
1107        assert_eq!(cfg.check_interval_ms, 1000);
1108        assert_eq!(cfg.min_file_size, 4096);
1109    }
1110
1111    #[test]
1112    fn test_stability_tracker_stable_file() {
1113        let dir = make_temp_dir("stability");
1114        let path = dir.join("stable.mp4");
1115        // Write a file larger than default min_file_size
1116        fs::write(&path, vec![0u8; 2048]).expect("write ok");
1117
1118        let cfg = FileStabilityConfig::new().required_checks(2);
1119        let mut tracker = FileStabilityTracker::new(path);
1120
1121        // First check: sets baseline
1122        assert!(!tracker.check(&cfg));
1123        // Second check: size unchanged → stable_count = 1
1124        assert!(!tracker.check(&cfg));
1125        // Third check: stable_count = 2 → stable!
1126        assert!(tracker.check(&cfg));
1127        assert!(tracker.is_stable());
1128        assert_eq!(tracker.last_size(), 2048);
1129
1130        fs::remove_dir_all(&dir).ok();
1131    }
1132
1133    #[test]
1134    fn test_stability_tracker_growing_file() {
1135        let dir = make_temp_dir("growing");
1136        let path = dir.join("growing.mp4");
1137        fs::write(&path, vec![0u8; 2048]).expect("write ok");
1138
1139        let cfg = FileStabilityConfig::new().required_checks(2);
1140        let mut tracker = FileStabilityTracker::new(path.clone());
1141
1142        tracker.check(&cfg); // baseline
1143        tracker.check(&cfg); // stable 1
1144
1145        // File grows
1146        fs::write(&path, vec![0u8; 4096]).expect("grow ok");
1147        assert!(!tracker.check(&cfg)); // reset
1148
1149        fs::remove_dir_all(&dir).ok();
1150    }
1151
1152    #[test]
1153    fn test_stability_tracker_too_small() {
1154        let dir = make_temp_dir("small");
1155        let path = dir.join("tiny.mp4");
1156        fs::write(&path, b"x").expect("write ok");
1157
1158        let cfg = FileStabilityConfig::new().min_file_size(1024);
1159        let mut tracker = FileStabilityTracker::new(path);
1160
1161        for _ in 0..10 {
1162            assert!(!tracker.check(&cfg));
1163        }
1164
1165        fs::remove_dir_all(&dir).ok();
1166    }
1167
1168    // ── Hot folder chain tests ───────────────────────────────────────────────
1169
1170    #[test]
1171    fn test_hot_folder_chain_empty() {
1172        let chain = HotFolderChain::new();
1173        assert_eq!(chain.stage_count(), 0);
1174        assert!(chain.validate().is_err());
1175    }
1176
1177    #[test]
1178    fn test_hot_folder_chain_single_stage() {
1179        let dir = make_temp_dir("chain1");
1180        let mut chain = HotFolderChain::new();
1181        chain.add_stage(WatchConfig::new(&dir));
1182        assert_eq!(chain.stage_count(), 1);
1183        assert!(chain.validate().is_ok());
1184        fs::remove_dir_all(&dir).ok();
1185    }
1186
1187    #[test]
1188    fn test_hot_folder_chain_two_stages_aligned() {
1189        let dir1 = make_temp_dir("chain2a");
1190        let dir2 = dir1.join("done");
1191        fs::create_dir_all(&dir2).expect("create done dir");
1192
1193        let mut chain = HotFolderChain::new();
1194        chain.add_stage(WatchConfig::new(&dir1)); // output = dir1/done
1195        chain.add_stage(WatchConfig::new(&dir2)); // watch = dir1/done
1196        assert_eq!(chain.stage_count(), 2);
1197        assert!(chain.validate().is_ok());
1198
1199        fs::remove_dir_all(&dir1).ok();
1200    }
1201
1202    #[test]
1203    fn test_hot_folder_chain_misaligned() {
1204        let dir1 = make_temp_dir("chain3a");
1205        let dir2 = make_temp_dir("chain3b");
1206
1207        let mut chain = HotFolderChain::new();
1208        chain.add_stage(WatchConfig::new(&dir1)); // output = dir1/done
1209        chain.add_stage(WatchConfig::new(&dir2)); // watch = dir2 (mismatch)
1210        assert!(chain.validate().is_err());
1211
1212        fs::remove_dir_all(&dir1).ok();
1213        fs::remove_dir_all(&dir2).ok();
1214    }
1215
1216    // ── Filename pattern tests ───────────────────────────────────────────────
1217
1218    #[test]
1219    fn test_filename_pattern_exact() {
1220        let p = FilenamePattern::new("video.mp4");
1221        assert!(p.matches("video.mp4"));
1222        assert!(p.matches("VIDEO.MP4")); // case insensitive
1223        assert!(!p.matches("audio.mp4"));
1224    }
1225
1226    #[test]
1227    fn test_filename_pattern_wildcard() {
1228        let p = FilenamePattern::new("*.mp4");
1229        assert!(p.matches("video.mp4"));
1230        assert!(p.matches("CLIP.MP4"));
1231        assert!(!p.matches("video.mkv"));
1232    }
1233
1234    #[test]
1235    fn test_filename_pattern_wildcard_prefix() {
1236        let p = FilenamePattern::new("raw_*");
1237        assert!(p.matches("raw_clip.mp4"));
1238        assert!(p.matches("raw_"));
1239        assert!(!p.matches("clip_raw.mp4"));
1240    }
1241
1242    #[test]
1243    fn test_filename_pattern_multiple_wildcards() {
1244        let p = FilenamePattern::new("*_final_*");
1245        assert!(p.matches("clip_final_v2.mp4"));
1246        assert!(!p.matches("clip_draft_v2.mp4"));
1247    }
1248
1249    #[test]
1250    fn test_filename_pattern_case_sensitive() {
1251        let p = FilenamePattern::new("Video.mp4").case_insensitive(false);
1252        assert!(p.matches("Video.mp4"));
1253        assert!(!p.matches("video.mp4"));
1254    }
1255
1256    // ── Retry config tests ───────────────────────────────────────────────────
1257
1258    #[test]
1259    fn test_retry_config_defaults() {
1260        let cfg = RetryConfig::default();
1261        assert_eq!(cfg.max_retries, 3);
1262        assert_eq!(cfg.initial_delay_ms, 1000);
1263        assert!((cfg.backoff_multiplier - 2.0).abs() < 1e-6);
1264    }
1265
1266    #[test]
1267    fn test_retry_delay_exponential() {
1268        let cfg = RetryConfig::new()
1269            .initial_delay_ms(1000)
1270            .backoff_multiplier(2.0)
1271            .max_delay_ms(10_000);
1272
1273        assert_eq!(cfg.delay_for_attempt(0), Duration::from_secs(1));
1274        assert_eq!(cfg.delay_for_attempt(1), Duration::from_secs(2));
1275        assert_eq!(cfg.delay_for_attempt(2), Duration::from_secs(4));
1276        assert_eq!(cfg.delay_for_attempt(3), Duration::from_secs(8));
1277        // Clamped to max
1278        assert_eq!(cfg.delay_for_attempt(4), Duration::from_secs(10));
1279    }
1280
1281    #[test]
1282    fn test_retry_tracker() {
1283        let cfg = RetryConfig::new().max_retries(3);
1284        let mut tracker = RetryTracker::new(PathBuf::from("/tmp/test.mp4"));
1285
1286        assert!(tracker.can_retry(&cfg));
1287        assert_eq!(tracker.attempts, 0);
1288
1289        tracker.record_failure("codec error");
1290        assert_eq!(tracker.attempts, 1);
1291        assert_eq!(tracker.last_error.as_deref(), Some("codec error"));
1292        assert!(tracker.can_retry(&cfg));
1293
1294        tracker.record_failure("timeout");
1295        tracker.record_failure("timeout");
1296        assert!(!tracker.can_retry(&cfg));
1297    }
1298
1299    // ── Watch folder statistics tests ────────────────────────────────────────
1300
1301    #[test]
1302    fn test_stats_empty() {
1303        let stats = WatchFolderStats::new();
1304        assert_eq!(stats.processed_count, 0);
1305        assert_eq!(stats.error_count, 0);
1306        assert!(stats.avg_processing_time_ms().is_none());
1307        assert!((stats.success_rate() - 1.0).abs() < 1e-6);
1308    }
1309
1310    #[test]
1311    fn test_stats_record_success() {
1312        let mut stats = WatchFolderStats::new();
1313        stats.record_success(1000, 10_000_000);
1314        stats.record_success(2000, 20_000_000);
1315
1316        assert_eq!(stats.processed_count, 2);
1317        assert_eq!(stats.total_processing_time_ms, 3000);
1318        assert_eq!(stats.avg_processing_time_ms(), Some(1500));
1319        assert_eq!(stats.min_processing_time_ms, Some(1000));
1320        assert_eq!(stats.max_processing_time_ms, Some(2000));
1321        assert_eq!(stats.total_bytes_processed, 30_000_000);
1322    }
1323
1324    #[test]
1325    fn test_stats_success_rate() {
1326        let mut stats = WatchFolderStats::new();
1327        stats.record_success(100, 1000);
1328        stats.record_success(100, 1000);
1329        stats.record_error();
1330
1331        let rate = stats.success_rate();
1332        assert!((rate - 2.0 / 3.0).abs() < 1e-6);
1333    }
1334
1335    #[test]
1336    fn test_stats_throughput() {
1337        let mut stats = WatchFolderStats::new();
1338        stats.record_success(1000, 1_000_000); // 1 MB in 1 second
1339
1340        let bps = stats.avg_throughput_bps().expect("should have throughput");
1341        assert!((bps - 1_000_000.0).abs() < 1.0);
1342    }
1343}