1#![allow(dead_code)]
9
10use std::collections::HashSet;
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13
14use crate::{Result, TranscodeConfig, TranscodeError};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum PostProcessAction {
19 Leave,
21 MoveToDone,
23 Delete,
25}
26
27#[derive(Debug, Clone)]
29pub enum OutputLocation {
30 Fixed(PathBuf),
32 SiblingWithExtension(String),
34 DoneSubDir,
36}
37
38#[derive(Debug, Clone)]
40pub struct WatchConfig {
41 pub watch_dir: PathBuf,
43 pub accepted_extensions: Vec<String>,
45 pub output_location: OutputLocation,
47 pub on_success: PostProcessAction,
49 pub on_failure: PostProcessAction,
51 pub poll_interval_ms: u64,
53 pub base_config: TranscodeConfig,
55 pub max_concurrent: usize,
57}
58
59impl WatchConfig {
60 #[must_use]
65 pub fn new(watch_dir: impl Into<PathBuf>) -> Self {
66 Self {
67 watch_dir: watch_dir.into(),
68 accepted_extensions: vec![
69 "mp4".into(),
70 "mkv".into(),
71 "mov".into(),
72 "avi".into(),
73 "webm".into(),
74 "mxf".into(),
75 "ts".into(),
76 "m2ts".into(),
77 ],
78 output_location: OutputLocation::DoneSubDir,
79 on_success: PostProcessAction::MoveToDone,
80 on_failure: PostProcessAction::Leave,
81 poll_interval_ms: 5_000,
82 base_config: TranscodeConfig::default(),
83 max_concurrent: 2,
84 }
85 }
86
87 #[must_use]
89 pub fn output_location(mut self, loc: OutputLocation) -> Self {
90 self.output_location = loc;
91 self
92 }
93
94 #[must_use]
96 pub fn on_success(mut self, action: PostProcessAction) -> Self {
97 self.on_success = action;
98 self
99 }
100
101 #[must_use]
103 pub fn on_failure(mut self, action: PostProcessAction) -> Self {
104 self.on_failure = action;
105 self
106 }
107
108 #[must_use]
110 pub fn poll_interval_ms(mut self, ms: u64) -> Self {
111 self.poll_interval_ms = ms;
112 self
113 }
114
115 #[must_use]
117 pub fn base_config(mut self, config: TranscodeConfig) -> Self {
118 self.base_config = config;
119 self
120 }
121
122 #[must_use]
124 pub fn max_concurrent(mut self, n: usize) -> Self {
125 self.max_concurrent = n;
126 self
127 }
128
129 pub fn validate(&self) -> Result<()> {
135 if !self.watch_dir.exists() {
136 return Err(TranscodeError::InvalidInput(format!(
137 "Watch directory does not exist: {}",
138 self.watch_dir.display()
139 )));
140 }
141 if self.max_concurrent == 0 {
142 return Err(TranscodeError::InvalidInput(
143 "max_concurrent must be at least 1".into(),
144 ));
145 }
146 Ok(())
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum WatchFileStatus {
153 Pending,
155 Processing,
157 Done,
159 Failed(String),
161}
162
163#[derive(Debug, Clone)]
165pub struct WatchEntry {
166 pub source: PathBuf,
168 pub output: PathBuf,
170 pub status: WatchFileStatus,
172}
173
174impl WatchEntry {
175 #[must_use]
177 pub fn new(source: PathBuf, output: PathBuf) -> Self {
178 Self {
179 source,
180 output,
181 status: WatchFileStatus::Pending,
182 }
183 }
184}
185
186pub struct TranscodeWatcher {
192 config: WatchConfig,
193 seen: HashSet<PathBuf>,
195 queue: Vec<WatchEntry>,
197}
198
199impl TranscodeWatcher {
200 #[must_use]
202 pub fn new(config: WatchConfig) -> Self {
203 Self {
204 config,
205 seen: HashSet::new(),
206 queue: Vec::new(),
207 }
208 }
209
210 #[must_use]
212 pub fn config(&self) -> &WatchConfig {
213 &self.config
214 }
215
216 #[must_use]
218 pub fn poll_interval(&self) -> Duration {
219 Duration::from_millis(self.config.poll_interval_ms)
220 }
221
222 pub fn scan(&mut self) -> Result<usize> {
230 let entries = std::fs::read_dir(&self.config.watch_dir).map_err(|e| {
231 TranscodeError::IoError(format!(
232 "Cannot read watch dir '{}': {e}",
233 self.config.watch_dir.display()
234 ))
235 })?;
236
237 let mut new_count = 0usize;
238
239 for entry in entries.flatten() {
240 let path = entry.path();
241 if !path.is_file() {
242 continue;
243 }
244 if self.seen.contains(&path) {
245 continue;
246 }
247 let ext = path
248 .extension()
249 .and_then(|e| e.to_str())
250 .map(str::to_lowercase)
251 .unwrap_or_default();
252 if !self.config.accepted_extensions.iter().any(|a| a == &ext) {
253 continue;
254 }
255
256 let output = self.resolve_output(&path);
257 self.seen.insert(path.clone());
258 self.queue.push(WatchEntry::new(path, output));
259 new_count += 1;
260 }
261
262 Ok(new_count)
263 }
264
265 #[must_use]
267 pub fn pending(&self) -> Vec<&WatchEntry> {
268 self.queue
269 .iter()
270 .filter(|e| e.status == WatchFileStatus::Pending)
271 .collect()
272 }
273
274 pub fn drain_pending(&mut self) -> Vec<(WatchEntry, TranscodeConfig)> {
277 let mut out = Vec::new();
278
279 for entry in &mut self.queue {
280 if entry.status != WatchFileStatus::Pending {
281 continue;
282 }
283 entry.status = WatchFileStatus::Processing;
284
285 let mut job = self.config.base_config.clone();
286 job.input = entry.source.to_str().map(String::from);
287 job.output = entry.output.to_str().map(String::from);
288
289 out.push((entry.clone(), job));
290 }
291
292 out
293 }
294
295 pub fn mark_done(&mut self, source: &Path) -> Result<()> {
302 self.update_status(source, WatchFileStatus::Done);
303
304 match self.config.on_success {
305 PostProcessAction::Leave => {}
306 PostProcessAction::Delete => {
307 std::fs::remove_file(source).map_err(|e| {
308 TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
309 })?;
310 }
311 PostProcessAction::MoveToDone => {
312 self.move_to_done_dir(source)?;
313 }
314 }
315
316 Ok(())
317 }
318
319 pub fn mark_failed(&mut self, source: &Path, reason: &str) -> Result<()> {
325 self.update_status(source, WatchFileStatus::Failed(reason.to_string()));
326
327 match self.config.on_failure {
328 PostProcessAction::Leave => {}
329 PostProcessAction::Delete => {
330 std::fs::remove_file(source).map_err(|e| {
331 TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
332 })?;
333 }
334 PostProcessAction::MoveToDone => {
335 self.move_to_done_dir(source)?;
336 }
337 }
338
339 Ok(())
340 }
341
342 #[must_use]
344 pub fn queue_len(&self) -> usize {
345 self.queue.len()
346 }
347
348 #[must_use]
350 pub fn status_counts(&self) -> WatchStatusCounts {
351 let mut counts = WatchStatusCounts::default();
352 for entry in &self.queue {
353 match entry.status {
354 WatchFileStatus::Pending => counts.pending += 1,
355 WatchFileStatus::Processing => counts.processing += 1,
356 WatchFileStatus::Done => counts.done += 1,
357 WatchFileStatus::Failed(_) => counts.failed += 1,
358 }
359 }
360 counts
361 }
362
363 fn update_status(&mut self, source: &Path, new_status: WatchFileStatus) {
366 for entry in &mut self.queue {
367 if entry.source == source {
368 entry.status = new_status;
369 return;
370 }
371 }
372 }
373
374 fn resolve_output(&self, source: &Path) -> PathBuf {
375 match &self.config.output_location {
376 OutputLocation::Fixed(dir) => {
377 let filename = source
378 .file_name()
379 .map(PathBuf::from)
380 .unwrap_or_else(|| PathBuf::from("output.mkv"));
381 dir.join(filename)
382 }
383 OutputLocation::SiblingWithExtension(ext) => {
384 let mut out = source.to_path_buf();
385 out.set_extension(ext.trim_start_matches('.'));
386 out
387 }
388 OutputLocation::DoneSubDir => {
389 let done_dir = self.config.watch_dir.join("done");
390 let filename = source
391 .file_name()
392 .map(PathBuf::from)
393 .unwrap_or_else(|| PathBuf::from("output.mkv"));
394 done_dir.join(filename)
395 }
396 }
397 }
398
399 fn move_to_done_dir(&self, source: &Path) -> Result<()> {
400 let done_dir = self.config.watch_dir.join("done");
401 std::fs::create_dir_all(&done_dir)
402 .map_err(|e| TranscodeError::IoError(format!("Cannot create done dir: {e}")))?;
403 let dest = done_dir.join(
404 source
405 .file_name()
406 .unwrap_or_else(|| std::ffi::OsStr::new("moved_file")),
407 );
408 std::fs::rename(source, &dest).map_err(|e| {
409 TranscodeError::IoError(format!(
410 "Cannot move '{}' → '{}': {e}",
411 source.display(),
412 dest.display()
413 ))
414 })
415 }
416}
417
418#[derive(Debug, Clone, Default)]
420pub struct WatchStatusCounts {
421 pub pending: usize,
423 pub processing: usize,
425 pub done: usize,
427 pub failed: usize,
429}
430
431#[derive(Debug, Clone)]
439pub struct FileStabilityConfig {
440 pub required_stable_checks: u32,
443 pub check_interval_ms: u64,
445 pub min_file_size: u64,
447}
448
449impl Default for FileStabilityConfig {
450 fn default() -> Self {
451 Self {
452 required_stable_checks: 3,
453 check_interval_ms: 2_000,
454 min_file_size: 1024,
455 }
456 }
457}
458
459impl FileStabilityConfig {
460 #[must_use]
462 pub fn new() -> Self {
463 Self::default()
464 }
465
466 #[must_use]
468 pub fn required_checks(mut self, n: u32) -> Self {
469 self.required_stable_checks = n;
470 self
471 }
472
473 #[must_use]
475 pub fn check_interval_ms(mut self, ms: u64) -> Self {
476 self.check_interval_ms = ms;
477 self
478 }
479
480 #[must_use]
482 pub fn min_file_size(mut self, size: u64) -> Self {
483 self.min_file_size = size;
484 self
485 }
486}
487
488#[derive(Debug, Clone)]
490pub struct FileStabilityTracker {
491 path: PathBuf,
493 last_size: u64,
495 stable_count: u32,
497 is_stable: bool,
499}
500
501impl FileStabilityTracker {
502 #[must_use]
504 pub fn new(path: PathBuf) -> Self {
505 Self {
506 path,
507 last_size: 0,
508 stable_count: 0,
509 is_stable: false,
510 }
511 }
512
513 pub fn check(&mut self, config: &FileStabilityConfig) -> bool {
517 if self.is_stable {
518 return true;
519 }
520 let current_size = std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0);
521
522 if current_size < config.min_file_size {
523 self.stable_count = 0;
524 self.last_size = current_size;
525 return false;
526 }
527
528 if current_size == self.last_size {
529 self.stable_count += 1;
530 } else {
531 self.stable_count = 0;
532 }
533 self.last_size = current_size;
534
535 if self.stable_count >= config.required_stable_checks {
536 self.is_stable = true;
537 }
538 self.is_stable
539 }
540
541 #[must_use]
543 pub fn is_stable(&self) -> bool {
544 self.is_stable
545 }
546
547 #[must_use]
549 pub fn path(&self) -> &Path {
550 &self.path
551 }
552
553 #[must_use]
555 pub fn last_size(&self) -> u64 {
556 self.last_size
557 }
558}
559
560#[derive(Debug, Clone)]
569pub struct HotFolderChain {
570 stages: Vec<WatchConfig>,
572}
573
574impl HotFolderChain {
575 #[must_use]
577 pub fn new() -> Self {
578 Self { stages: Vec::new() }
579 }
580
581 pub fn add_stage(&mut self, config: WatchConfig) {
586 self.stages.push(config);
587 }
588
589 #[must_use]
591 pub fn stage_count(&self) -> usize {
592 self.stages.len()
593 }
594
595 #[must_use]
597 pub fn stages(&self) -> &[WatchConfig] {
598 &self.stages
599 }
600
601 pub fn validate(&self) -> Result<()> {
610 if self.stages.is_empty() {
611 return Err(TranscodeError::InvalidInput(
612 "Hot folder chain has no stages".into(),
613 ));
614 }
615
616 for i in 0..self.stages.len().saturating_sub(1) {
617 let current = &self.stages[i];
618 let next = &self.stages[i + 1];
619
620 let output_dir = match ¤t.output_location {
621 OutputLocation::Fixed(dir) => Some(dir.clone()),
622 OutputLocation::DoneSubDir => Some(current.watch_dir.join("done")),
623 OutputLocation::SiblingWithExtension(_) => None,
624 };
625
626 if let Some(out_dir) = output_dir {
627 if out_dir != next.watch_dir {
628 return Err(TranscodeError::InvalidInput(format!(
629 "Stage {} output dir '{}' does not match stage {} watch dir '{}'",
630 i,
631 out_dir.display(),
632 i + 1,
633 next.watch_dir.display()
634 )));
635 }
636 }
637 }
638
639 Ok(())
640 }
641}
642
643impl Default for HotFolderChain {
644 fn default() -> Self {
645 Self::new()
646 }
647}
648
649#[derive(Debug, Clone)]
656pub struct FilenamePattern {
657 pattern: String,
659 case_insensitive: bool,
661}
662
663impl FilenamePattern {
664 #[must_use]
666 pub fn new(pattern: impl Into<String>) -> Self {
667 Self {
668 pattern: pattern.into(),
669 case_insensitive: true,
670 }
671 }
672
673 #[must_use]
675 pub fn case_insensitive(mut self, ci: bool) -> Self {
676 self.case_insensitive = ci;
677 self
678 }
679
680 #[must_use]
684 pub fn matches(&self, filename: &str) -> bool {
685 let (pat, name) = if self.case_insensitive {
686 (self.pattern.to_lowercase(), filename.to_lowercase())
687 } else {
688 (self.pattern.clone(), filename.to_string())
689 };
690 Self::glob_match(&pat, &name)
691 }
692
693 fn glob_match(pattern: &str, text: &str) -> bool {
695 let pat_chars: Vec<char> = pattern.chars().collect();
696 let txt_chars: Vec<char> = text.chars().collect();
697 let (plen, tlen) = (pat_chars.len(), txt_chars.len());
698
699 let mut dp = vec![vec![false; tlen + 1]; plen + 1];
701 dp[0][0] = true;
702
703 for (i, &pc) in pat_chars.iter().enumerate() {
705 if pc == '*' {
706 dp[i + 1][0] = dp[i][0];
707 } else {
708 break;
709 }
710 }
711
712 for i in 1..=plen {
713 for j in 1..=tlen {
714 if pat_chars[i - 1] == '*' {
715 dp[i][j] = dp[i - 1][j] || dp[i][j - 1];
716 } else if pat_chars[i - 1] == '?' || pat_chars[i - 1] == txt_chars[j - 1] {
717 dp[i][j] = dp[i - 1][j - 1];
718 }
719 }
720 }
721
722 dp[plen][tlen]
723 }
724
725 #[must_use]
727 pub fn pattern(&self) -> &str {
728 &self.pattern
729 }
730}
731
732#[derive(Debug, Clone)]
736pub struct RetryConfig {
737 pub max_retries: u32,
739 pub initial_delay_ms: u64,
741 pub backoff_multiplier: f64,
743 pub max_delay_ms: u64,
745}
746
747impl Default for RetryConfig {
748 fn default() -> Self {
749 Self {
750 max_retries: 3,
751 initial_delay_ms: 1_000,
752 backoff_multiplier: 2.0,
753 max_delay_ms: 30_000,
754 }
755 }
756}
757
758impl RetryConfig {
759 #[must_use]
761 pub fn new() -> Self {
762 Self::default()
763 }
764
765 #[must_use]
767 pub fn max_retries(mut self, n: u32) -> Self {
768 self.max_retries = n;
769 self
770 }
771
772 #[must_use]
774 pub fn initial_delay_ms(mut self, ms: u64) -> Self {
775 self.initial_delay_ms = ms;
776 self
777 }
778
779 #[must_use]
781 pub fn backoff_multiplier(mut self, m: f64) -> Self {
782 self.backoff_multiplier = m;
783 self
784 }
785
786 #[must_use]
788 pub fn max_delay_ms(mut self, ms: u64) -> Self {
789 self.max_delay_ms = ms;
790 self
791 }
792
793 #[must_use]
795 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
796 if attempt == 0 {
797 return Duration::from_millis(self.initial_delay_ms);
798 }
799 let delay = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
800 let clamped = delay.min(self.max_delay_ms as f64) as u64;
801 Duration::from_millis(clamped)
802 }
803}
804
805#[derive(Debug, Clone)]
807pub struct RetryTracker {
808 pub path: PathBuf,
810 pub attempts: u32,
812 pub last_error: Option<String>,
814}
815
816impl RetryTracker {
817 #[must_use]
819 pub fn new(path: PathBuf) -> Self {
820 Self {
821 path,
822 attempts: 0,
823 last_error: None,
824 }
825 }
826
827 pub fn record_failure(&mut self, error: &str) {
829 self.attempts += 1;
830 self.last_error = Some(error.to_string());
831 }
832
833 #[must_use]
835 pub fn can_retry(&self, config: &RetryConfig) -> bool {
836 self.attempts < config.max_retries
837 }
838
839 #[must_use]
841 pub fn next_delay(&self, config: &RetryConfig) -> Duration {
842 config.delay_for_attempt(self.attempts)
843 }
844}
845
846#[derive(Debug, Clone, Default)]
850pub struct WatchFolderStats {
851 pub processed_count: u64,
853 pub error_count: u64,
855 pub total_processing_time_ms: u64,
857 pub total_bytes_processed: u64,
859 pub min_processing_time_ms: Option<u64>,
861 pub max_processing_time_ms: Option<u64>,
863}
864
865impl WatchFolderStats {
866 #[must_use]
868 pub fn new() -> Self {
869 Self::default()
870 }
871
872 pub fn record_success(&mut self, processing_time_ms: u64, file_size_bytes: u64) {
874 self.processed_count += 1;
875 self.total_processing_time_ms += processing_time_ms;
876 self.total_bytes_processed += file_size_bytes;
877
878 self.min_processing_time_ms = Some(
879 self.min_processing_time_ms
880 .map_or(processing_time_ms, |m| m.min(processing_time_ms)),
881 );
882 self.max_processing_time_ms = Some(
883 self.max_processing_time_ms
884 .map_or(processing_time_ms, |m| m.max(processing_time_ms)),
885 );
886 }
887
888 pub fn record_error(&mut self) {
890 self.error_count += 1;
891 }
892
893 #[must_use]
895 pub fn avg_processing_time_ms(&self) -> Option<u64> {
896 if self.processed_count == 0 {
897 return None;
898 }
899 Some(self.total_processing_time_ms / self.processed_count)
900 }
901
902 #[must_use]
904 pub fn success_rate(&self) -> f64 {
905 let total = self.processed_count + self.error_count;
906 if total == 0 {
907 return 1.0;
908 }
909 self.processed_count as f64 / total as f64
910 }
911
912 #[must_use]
914 pub fn avg_throughput_bps(&self) -> Option<f64> {
915 if self.total_processing_time_ms == 0 || self.total_bytes_processed == 0 {
916 return None;
917 }
918 let secs = self.total_processing_time_ms as f64 / 1000.0;
919 Some(self.total_bytes_processed as f64 / secs)
920 }
921}
922
923#[cfg(test)]
926mod tests {
927 use super::*;
928 use std::env::temp_dir;
929 use std::fs;
930
931 fn make_temp_dir(suffix: &str) -> PathBuf {
932 let dir = temp_dir().join(format!("oximedia_watch_test_{suffix}"));
933 fs::create_dir_all(&dir).expect("create temp dir");
934 dir
935 }
936
937 fn touch(dir: &Path, name: &str) -> PathBuf {
938 let path = dir.join(name);
939 fs::write(&path, b"fake media").expect("create temp file");
940 path
941 }
942
943 #[test]
944 fn test_watch_config_new() {
945 let cfg = WatchConfig::new("/tmp/watch");
946 assert!(!cfg.accepted_extensions.is_empty());
947 assert_eq!(cfg.max_concurrent, 2);
948 assert_eq!(cfg.poll_interval_ms, 5_000);
949 }
950
951 #[test]
952 fn test_watch_config_validate_missing_dir() {
953 let cfg = WatchConfig::new("/nonexistent/path/for/oximedia_test");
954 assert!(cfg.validate().is_err());
955 }
956
957 #[test]
958 fn test_watch_config_validate_ok() {
959 let dir = make_temp_dir("cfg_ok");
960 let cfg = WatchConfig::new(&dir);
961 assert!(cfg.validate().is_ok());
962 fs::remove_dir_all(&dir).ok();
963 }
964
965 #[test]
966 fn test_scan_detects_new_files() {
967 let dir = make_temp_dir("scan");
968 touch(&dir, "video.mp4");
969 touch(&dir, "clip.mkv");
970 touch(&dir, "readme.txt"); let cfg = WatchConfig::new(&dir);
973 let mut watcher = TranscodeWatcher::new(cfg);
974 let count = watcher.scan().expect("scan ok");
975 assert_eq!(count, 2);
976 assert_eq!(watcher.queue_len(), 2);
977
978 let count2 = watcher.scan().expect("scan ok");
980 assert_eq!(count2, 0);
981
982 fs::remove_dir_all(&dir).ok();
983 }
984
985 #[test]
986 fn test_drain_pending_creates_configs() {
987 let dir = make_temp_dir("drain");
988 touch(&dir, "a.mp4");
989
990 let cfg = WatchConfig::new(&dir);
991 let mut watcher = TranscodeWatcher::new(cfg);
992 watcher.scan().expect("scan ok");
993
994 let drained = watcher.drain_pending();
995 assert_eq!(drained.len(), 1);
996 let (entry, job) = &drained[0];
997 assert!(entry.source.ends_with("a.mp4"));
998 assert!(job.input.is_some());
999 assert!(job.output.is_some());
1000
1001 assert_eq!(watcher.pending().len(), 0);
1003
1004 fs::remove_dir_all(&dir).ok();
1005 }
1006
1007 #[test]
1008 fn test_mark_done_updates_status() {
1009 let dir = make_temp_dir("mark_done");
1010 let file = touch(&dir, "b.mp4");
1011
1012 let cfg = WatchConfig::new(&dir).on_success(PostProcessAction::Leave);
1013 let mut watcher = TranscodeWatcher::new(cfg);
1014 watcher.scan().expect("scan ok");
1015 watcher.drain_pending();
1016
1017 watcher.mark_done(&file).expect("mark done ok");
1018
1019 let counts = watcher.status_counts();
1020 assert_eq!(counts.done, 1);
1021 assert_eq!(counts.failed, 0);
1022
1023 fs::remove_dir_all(&dir).ok();
1024 }
1025
1026 #[test]
1027 fn test_mark_failed_updates_status() {
1028 let dir = make_temp_dir("mark_failed");
1029 let file = touch(&dir, "c.mp4");
1030
1031 let cfg = WatchConfig::new(&dir).on_failure(PostProcessAction::Leave);
1032 let mut watcher = TranscodeWatcher::new(cfg);
1033 watcher.scan().expect("scan ok");
1034 watcher.drain_pending();
1035
1036 watcher
1037 .mark_failed(&file, "codec not found")
1038 .expect("mark failed ok");
1039
1040 let counts = watcher.status_counts();
1041 assert_eq!(counts.failed, 1);
1042
1043 fs::remove_dir_all(&dir).ok();
1044 }
1045
1046 #[test]
1047 fn test_status_counts() {
1048 let dir = make_temp_dir("counts");
1049 touch(&dir, "x.mp4");
1050 touch(&dir, "y.mkv");
1051
1052 let cfg = WatchConfig::new(&dir);
1053 let mut watcher = TranscodeWatcher::new(cfg);
1054 watcher.scan().expect("scan ok");
1055
1056 let counts = watcher.status_counts();
1057 assert_eq!(counts.pending, 2);
1058 assert_eq!(counts.processing, 0);
1059
1060 fs::remove_dir_all(&dir).ok();
1061 }
1062
1063 #[test]
1064 fn test_poll_interval() {
1065 let cfg = WatchConfig::new("/tmp").poll_interval_ms(2000);
1066 let watcher = TranscodeWatcher::new(cfg);
1067 assert_eq!(watcher.poll_interval(), Duration::from_secs(2));
1068 }
1069
1070 #[test]
1071 fn test_output_location_sibling() {
1072 let dir = make_temp_dir("sibling");
1073 touch(&dir, "d.mp4");
1074
1075 let cfg = WatchConfig::new(&dir)
1076 .output_location(OutputLocation::SiblingWithExtension("mkv".into()));
1077 let mut watcher = TranscodeWatcher::new(cfg);
1078 watcher.scan().expect("scan ok");
1079
1080 let entry = &watcher.queue[0];
1081 assert!(entry
1082 .output
1083 .extension()
1084 .map(|e| e == "mkv")
1085 .unwrap_or(false));
1086
1087 fs::remove_dir_all(&dir).ok();
1088 }
1089
1090 #[test]
1093 fn test_stability_config_defaults() {
1094 let cfg = FileStabilityConfig::default();
1095 assert_eq!(cfg.required_stable_checks, 3);
1096 assert_eq!(cfg.check_interval_ms, 2000);
1097 assert_eq!(cfg.min_file_size, 1024);
1098 }
1099
1100 #[test]
1101 fn test_stability_config_builder() {
1102 let cfg = FileStabilityConfig::new()
1103 .required_checks(5)
1104 .check_interval_ms(1000)
1105 .min_file_size(4096);
1106 assert_eq!(cfg.required_stable_checks, 5);
1107 assert_eq!(cfg.check_interval_ms, 1000);
1108 assert_eq!(cfg.min_file_size, 4096);
1109 }
1110
1111 #[test]
1112 fn test_stability_tracker_stable_file() {
1113 let dir = make_temp_dir("stability");
1114 let path = dir.join("stable.mp4");
1115 fs::write(&path, vec![0u8; 2048]).expect("write ok");
1117
1118 let cfg = FileStabilityConfig::new().required_checks(2);
1119 let mut tracker = FileStabilityTracker::new(path);
1120
1121 assert!(!tracker.check(&cfg));
1123 assert!(!tracker.check(&cfg));
1125 assert!(tracker.check(&cfg));
1127 assert!(tracker.is_stable());
1128 assert_eq!(tracker.last_size(), 2048);
1129
1130 fs::remove_dir_all(&dir).ok();
1131 }
1132
1133 #[test]
1134 fn test_stability_tracker_growing_file() {
1135 let dir = make_temp_dir("growing");
1136 let path = dir.join("growing.mp4");
1137 fs::write(&path, vec![0u8; 2048]).expect("write ok");
1138
1139 let cfg = FileStabilityConfig::new().required_checks(2);
1140 let mut tracker = FileStabilityTracker::new(path.clone());
1141
1142 tracker.check(&cfg); tracker.check(&cfg); fs::write(&path, vec![0u8; 4096]).expect("grow ok");
1147 assert!(!tracker.check(&cfg)); fs::remove_dir_all(&dir).ok();
1150 }
1151
1152 #[test]
1153 fn test_stability_tracker_too_small() {
1154 let dir = make_temp_dir("small");
1155 let path = dir.join("tiny.mp4");
1156 fs::write(&path, b"x").expect("write ok");
1157
1158 let cfg = FileStabilityConfig::new().min_file_size(1024);
1159 let mut tracker = FileStabilityTracker::new(path);
1160
1161 for _ in 0..10 {
1162 assert!(!tracker.check(&cfg));
1163 }
1164
1165 fs::remove_dir_all(&dir).ok();
1166 }
1167
1168 #[test]
1171 fn test_hot_folder_chain_empty() {
1172 let chain = HotFolderChain::new();
1173 assert_eq!(chain.stage_count(), 0);
1174 assert!(chain.validate().is_err());
1175 }
1176
1177 #[test]
1178 fn test_hot_folder_chain_single_stage() {
1179 let dir = make_temp_dir("chain1");
1180 let mut chain = HotFolderChain::new();
1181 chain.add_stage(WatchConfig::new(&dir));
1182 assert_eq!(chain.stage_count(), 1);
1183 assert!(chain.validate().is_ok());
1184 fs::remove_dir_all(&dir).ok();
1185 }
1186
1187 #[test]
1188 fn test_hot_folder_chain_two_stages_aligned() {
1189 let dir1 = make_temp_dir("chain2a");
1190 let dir2 = dir1.join("done");
1191 fs::create_dir_all(&dir2).expect("create done dir");
1192
1193 let mut chain = HotFolderChain::new();
1194 chain.add_stage(WatchConfig::new(&dir1)); chain.add_stage(WatchConfig::new(&dir2)); assert_eq!(chain.stage_count(), 2);
1197 assert!(chain.validate().is_ok());
1198
1199 fs::remove_dir_all(&dir1).ok();
1200 }
1201
1202 #[test]
1203 fn test_hot_folder_chain_misaligned() {
1204 let dir1 = make_temp_dir("chain3a");
1205 let dir2 = make_temp_dir("chain3b");
1206
1207 let mut chain = HotFolderChain::new();
1208 chain.add_stage(WatchConfig::new(&dir1)); chain.add_stage(WatchConfig::new(&dir2)); assert!(chain.validate().is_err());
1211
1212 fs::remove_dir_all(&dir1).ok();
1213 fs::remove_dir_all(&dir2).ok();
1214 }
1215
1216 #[test]
1219 fn test_filename_pattern_exact() {
1220 let p = FilenamePattern::new("video.mp4");
1221 assert!(p.matches("video.mp4"));
1222 assert!(p.matches("VIDEO.MP4")); assert!(!p.matches("audio.mp4"));
1224 }
1225
1226 #[test]
1227 fn test_filename_pattern_wildcard() {
1228 let p = FilenamePattern::new("*.mp4");
1229 assert!(p.matches("video.mp4"));
1230 assert!(p.matches("CLIP.MP4"));
1231 assert!(!p.matches("video.mkv"));
1232 }
1233
1234 #[test]
1235 fn test_filename_pattern_wildcard_prefix() {
1236 let p = FilenamePattern::new("raw_*");
1237 assert!(p.matches("raw_clip.mp4"));
1238 assert!(p.matches("raw_"));
1239 assert!(!p.matches("clip_raw.mp4"));
1240 }
1241
1242 #[test]
1243 fn test_filename_pattern_multiple_wildcards() {
1244 let p = FilenamePattern::new("*_final_*");
1245 assert!(p.matches("clip_final_v2.mp4"));
1246 assert!(!p.matches("clip_draft_v2.mp4"));
1247 }
1248
1249 #[test]
1250 fn test_filename_pattern_case_sensitive() {
1251 let p = FilenamePattern::new("Video.mp4").case_insensitive(false);
1252 assert!(p.matches("Video.mp4"));
1253 assert!(!p.matches("video.mp4"));
1254 }
1255
1256 #[test]
1259 fn test_retry_config_defaults() {
1260 let cfg = RetryConfig::default();
1261 assert_eq!(cfg.max_retries, 3);
1262 assert_eq!(cfg.initial_delay_ms, 1000);
1263 assert!((cfg.backoff_multiplier - 2.0).abs() < 1e-6);
1264 }
1265
1266 #[test]
1267 fn test_retry_delay_exponential() {
1268 let cfg = RetryConfig::new()
1269 .initial_delay_ms(1000)
1270 .backoff_multiplier(2.0)
1271 .max_delay_ms(10_000);
1272
1273 assert_eq!(cfg.delay_for_attempt(0), Duration::from_secs(1));
1274 assert_eq!(cfg.delay_for_attempt(1), Duration::from_secs(2));
1275 assert_eq!(cfg.delay_for_attempt(2), Duration::from_secs(4));
1276 assert_eq!(cfg.delay_for_attempt(3), Duration::from_secs(8));
1277 assert_eq!(cfg.delay_for_attempt(4), Duration::from_secs(10));
1279 }
1280
1281 #[test]
1282 fn test_retry_tracker() {
1283 let cfg = RetryConfig::new().max_retries(3);
1284 let mut tracker = RetryTracker::new(PathBuf::from("/tmp/test.mp4"));
1285
1286 assert!(tracker.can_retry(&cfg));
1287 assert_eq!(tracker.attempts, 0);
1288
1289 tracker.record_failure("codec error");
1290 assert_eq!(tracker.attempts, 1);
1291 assert_eq!(tracker.last_error.as_deref(), Some("codec error"));
1292 assert!(tracker.can_retry(&cfg));
1293
1294 tracker.record_failure("timeout");
1295 tracker.record_failure("timeout");
1296 assert!(!tracker.can_retry(&cfg));
1297 }
1298
1299 #[test]
1302 fn test_stats_empty() {
1303 let stats = WatchFolderStats::new();
1304 assert_eq!(stats.processed_count, 0);
1305 assert_eq!(stats.error_count, 0);
1306 assert!(stats.avg_processing_time_ms().is_none());
1307 assert!((stats.success_rate() - 1.0).abs() < 1e-6);
1308 }
1309
1310 #[test]
1311 fn test_stats_record_success() {
1312 let mut stats = WatchFolderStats::new();
1313 stats.record_success(1000, 10_000_000);
1314 stats.record_success(2000, 20_000_000);
1315
1316 assert_eq!(stats.processed_count, 2);
1317 assert_eq!(stats.total_processing_time_ms, 3000);
1318 assert_eq!(stats.avg_processing_time_ms(), Some(1500));
1319 assert_eq!(stats.min_processing_time_ms, Some(1000));
1320 assert_eq!(stats.max_processing_time_ms, Some(2000));
1321 assert_eq!(stats.total_bytes_processed, 30_000_000);
1322 }
1323
1324 #[test]
1325 fn test_stats_success_rate() {
1326 let mut stats = WatchFolderStats::new();
1327 stats.record_success(100, 1000);
1328 stats.record_success(100, 1000);
1329 stats.record_error();
1330
1331 let rate = stats.success_rate();
1332 assert!((rate - 2.0 / 3.0).abs() < 1e-6);
1333 }
1334
1335 #[test]
1336 fn test_stats_throughput() {
1337 let mut stats = WatchFolderStats::new();
1338 stats.record_success(1000, 1_000_000); let bps = stats.avg_throughput_bps().expect("should have throughput");
1341 assert!((bps - 1_000_000.0).abs() < 1.0);
1342 }
1343}