1#![allow(dead_code)]
9
10use std::collections::HashSet;
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13
14use crate::{Result, TranscodeConfig, TranscodeError};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum PostProcessAction {
19 Leave,
21 MoveToDone,
23 Delete,
25}
26
27#[derive(Debug, Clone)]
29pub enum OutputLocation {
30 Fixed(PathBuf),
32 SiblingWithExtension(String),
34 DoneSubDir,
36}
37
38#[derive(Debug, Clone)]
40pub struct WatchConfig {
41 pub watch_dir: PathBuf,
43 pub accepted_extensions: Vec<String>,
45 pub output_location: OutputLocation,
47 pub on_success: PostProcessAction,
49 pub on_failure: PostProcessAction,
51 pub poll_interval_ms: u64,
53 pub base_config: TranscodeConfig,
55 pub max_concurrent: usize,
57}
58
59impl WatchConfig {
60 #[must_use]
65 pub fn new(watch_dir: impl Into<PathBuf>) -> Self {
66 Self {
67 watch_dir: watch_dir.into(),
68 accepted_extensions: vec![
69 "mp4".into(),
70 "mkv".into(),
71 "mov".into(),
72 "avi".into(),
73 "webm".into(),
74 "mxf".into(),
75 "ts".into(),
76 "m2ts".into(),
77 ],
78 output_location: OutputLocation::DoneSubDir,
79 on_success: PostProcessAction::MoveToDone,
80 on_failure: PostProcessAction::Leave,
81 poll_interval_ms: 5_000,
82 base_config: TranscodeConfig::default(),
83 max_concurrent: 2,
84 }
85 }
86
87 #[must_use]
89 pub fn output_location(mut self, loc: OutputLocation) -> Self {
90 self.output_location = loc;
91 self
92 }
93
94 #[must_use]
96 pub fn on_success(mut self, action: PostProcessAction) -> Self {
97 self.on_success = action;
98 self
99 }
100
101 #[must_use]
103 pub fn on_failure(mut self, action: PostProcessAction) -> Self {
104 self.on_failure = action;
105 self
106 }
107
108 #[must_use]
110 pub fn poll_interval_ms(mut self, ms: u64) -> Self {
111 self.poll_interval_ms = ms;
112 self
113 }
114
115 #[must_use]
117 pub fn base_config(mut self, config: TranscodeConfig) -> Self {
118 self.base_config = config;
119 self
120 }
121
122 #[must_use]
124 pub fn max_concurrent(mut self, n: usize) -> Self {
125 self.max_concurrent = n;
126 self
127 }
128
129 pub fn validate(&self) -> Result<()> {
135 if !self.watch_dir.exists() {
136 return Err(TranscodeError::InvalidInput(format!(
137 "Watch directory does not exist: {}",
138 self.watch_dir.display()
139 )));
140 }
141 if self.max_concurrent == 0 {
142 return Err(TranscodeError::InvalidInput(
143 "max_concurrent must be at least 1".into(),
144 ));
145 }
146 Ok(())
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum WatchFileStatus {
153 Pending,
155 Processing,
157 Done,
159 Failed(String),
161}
162
163#[derive(Debug, Clone)]
165pub struct WatchEntry {
166 pub source: PathBuf,
168 pub output: PathBuf,
170 pub status: WatchFileStatus,
172}
173
174impl WatchEntry {
175 #[must_use]
177 pub fn new(source: PathBuf, output: PathBuf) -> Self {
178 Self {
179 source,
180 output,
181 status: WatchFileStatus::Pending,
182 }
183 }
184}
185
186pub struct TranscodeWatcher {
192 config: WatchConfig,
193 seen: HashSet<PathBuf>,
195 queue: Vec<WatchEntry>,
197}
198
199impl TranscodeWatcher {
200 #[must_use]
202 pub fn new(config: WatchConfig) -> Self {
203 Self {
204 config,
205 seen: HashSet::new(),
206 queue: Vec::new(),
207 }
208 }
209
210 #[must_use]
212 pub fn config(&self) -> &WatchConfig {
213 &self.config
214 }
215
216 #[must_use]
218 pub fn poll_interval(&self) -> Duration {
219 Duration::from_millis(self.config.poll_interval_ms)
220 }
221
222 pub fn scan(&mut self) -> Result<usize> {
230 let entries = std::fs::read_dir(&self.config.watch_dir).map_err(|e| {
231 TranscodeError::IoError(format!(
232 "Cannot read watch dir '{}': {e}",
233 self.config.watch_dir.display()
234 ))
235 })?;
236
237 let mut new_count = 0usize;
238
239 for entry in entries.flatten() {
240 let path = entry.path();
241 if !path.is_file() {
242 continue;
243 }
244 if self.seen.contains(&path) {
245 continue;
246 }
247 let ext = path
248 .extension()
249 .and_then(|e| e.to_str())
250 .map(str::to_lowercase)
251 .unwrap_or_default();
252 if !self.config.accepted_extensions.iter().any(|a| a == &ext) {
253 continue;
254 }
255
256 let output = self.resolve_output(&path);
257 self.seen.insert(path.clone());
258 self.queue.push(WatchEntry::new(path, output));
259 new_count += 1;
260 }
261
262 Ok(new_count)
263 }
264
265 #[must_use]
267 pub fn pending(&self) -> Vec<&WatchEntry> {
268 self.queue
269 .iter()
270 .filter(|e| e.status == WatchFileStatus::Pending)
271 .collect()
272 }
273
274 pub fn drain_pending(&mut self) -> Vec<(WatchEntry, TranscodeConfig)> {
277 let mut out = Vec::new();
278
279 for entry in &mut self.queue {
280 if entry.status != WatchFileStatus::Pending {
281 continue;
282 }
283 entry.status = WatchFileStatus::Processing;
284
285 let mut job = self.config.base_config.clone();
286 job.input = entry.source.to_str().map(String::from);
287 job.output = entry.output.to_str().map(String::from);
288
289 out.push((entry.clone(), job));
290 }
291
292 out
293 }
294
295 pub fn mark_done(&mut self, source: &Path) -> Result<()> {
302 self.update_status(source, WatchFileStatus::Done);
303
304 match self.config.on_success {
305 PostProcessAction::Leave => {}
306 PostProcessAction::Delete => {
307 std::fs::remove_file(source).map_err(|e| {
308 TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
309 })?;
310 }
311 PostProcessAction::MoveToDone => {
312 self.move_to_done_dir(source)?;
313 }
314 }
315
316 Ok(())
317 }
318
319 pub fn mark_failed(&mut self, source: &Path, reason: &str) -> Result<()> {
325 self.update_status(source, WatchFileStatus::Failed(reason.to_string()));
326
327 match self.config.on_failure {
328 PostProcessAction::Leave => {}
329 PostProcessAction::Delete => {
330 std::fs::remove_file(source).map_err(|e| {
331 TranscodeError::IoError(format!("Failed to delete '{}': {e}", source.display()))
332 })?;
333 }
334 PostProcessAction::MoveToDone => {
335 self.move_to_done_dir(source)?;
336 }
337 }
338
339 Ok(())
340 }
341
342 #[must_use]
344 pub fn queue_len(&self) -> usize {
345 self.queue.len()
346 }
347
348 #[must_use]
350 pub fn status_counts(&self) -> WatchStatusCounts {
351 let mut counts = WatchStatusCounts::default();
352 for entry in &self.queue {
353 match entry.status {
354 WatchFileStatus::Pending => counts.pending += 1,
355 WatchFileStatus::Processing => counts.processing += 1,
356 WatchFileStatus::Done => counts.done += 1,
357 WatchFileStatus::Failed(_) => counts.failed += 1,
358 }
359 }
360 counts
361 }
362
363 fn update_status(&mut self, source: &Path, new_status: WatchFileStatus) {
366 for entry in &mut self.queue {
367 if entry.source == source {
368 entry.status = new_status;
369 return;
370 }
371 }
372 }
373
374 fn resolve_output(&self, source: &Path) -> PathBuf {
375 match &self.config.output_location {
376 OutputLocation::Fixed(dir) => {
377 let filename = source
378 .file_name()
379 .map(PathBuf::from)
380 .unwrap_or_else(|| PathBuf::from("output.mkv"));
381 dir.join(filename)
382 }
383 OutputLocation::SiblingWithExtension(ext) => {
384 let mut out = source.to_path_buf();
385 out.set_extension(ext.trim_start_matches('.'));
386 out
387 }
388 OutputLocation::DoneSubDir => {
389 let done_dir = self.config.watch_dir.join("done");
390 let filename = source
391 .file_name()
392 .map(PathBuf::from)
393 .unwrap_or_else(|| PathBuf::from("output.mkv"));
394 done_dir.join(filename)
395 }
396 }
397 }
398
399 fn move_to_done_dir(&self, source: &Path) -> Result<()> {
400 let done_dir = self.config.watch_dir.join("done");
401 std::fs::create_dir_all(&done_dir)
402 .map_err(|e| TranscodeError::IoError(format!("Cannot create done dir: {e}")))?;
403 let dest = done_dir.join(
404 source
405 .file_name()
406 .unwrap_or_else(|| std::ffi::OsStr::new("moved_file")),
407 );
408 std::fs::rename(source, &dest).map_err(|e| {
409 TranscodeError::IoError(format!(
410 "Cannot move '{}' → '{}': {e}",
411 source.display(),
412 dest.display()
413 ))
414 })
415 }
416}
417
418#[derive(Debug, Clone, Default)]
420pub struct WatchStatusCounts {
421 pub pending: usize,
423 pub processing: usize,
425 pub done: usize,
427 pub failed: usize,
429}
430
431#[derive(Debug, Clone)]
439pub struct FileStabilityConfig {
440 pub required_stable_checks: u32,
443 pub check_interval_ms: u64,
445 pub min_file_size: u64,
447}
448
449impl Default for FileStabilityConfig {
450 fn default() -> Self {
451 Self {
452 required_stable_checks: 3,
453 check_interval_ms: 2_000,
454 min_file_size: 1024,
455 }
456 }
457}
458
459impl FileStabilityConfig {
460 #[must_use]
462 pub fn new() -> Self {
463 Self::default()
464 }
465
466 #[must_use]
468 pub fn required_checks(mut self, n: u32) -> Self {
469 self.required_stable_checks = n;
470 self
471 }
472
473 #[must_use]
475 pub fn check_interval_ms(mut self, ms: u64) -> Self {
476 self.check_interval_ms = ms;
477 self
478 }
479
480 #[must_use]
482 pub fn min_file_size(mut self, size: u64) -> Self {
483 self.min_file_size = size;
484 self
485 }
486}
487
488#[derive(Debug, Clone)]
490pub struct FileStabilityTracker {
491 path: PathBuf,
493 last_size: u64,
495 stable_count: u32,
497 is_stable: bool,
499}
500
501impl FileStabilityTracker {
502 #[must_use]
504 pub fn new(path: PathBuf) -> Self {
505 Self {
506 path,
507 last_size: 0,
508 stable_count: 0,
509 is_stable: false,
510 }
511 }
512
513 pub fn check(&mut self, config: &FileStabilityConfig) -> bool {
517 if self.is_stable {
518 return true;
519 }
520 let current_size = std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0);
521
522 if current_size < config.min_file_size {
523 self.stable_count = 0;
524 self.last_size = current_size;
525 return false;
526 }
527
528 if current_size == self.last_size {
529 self.stable_count += 1;
530 } else {
531 self.stable_count = 0;
532 }
533 self.last_size = current_size;
534
535 if self.stable_count >= config.required_stable_checks {
536 self.is_stable = true;
537 }
538 self.is_stable
539 }
540
541 #[must_use]
543 pub fn is_stable(&self) -> bool {
544 self.is_stable
545 }
546
547 #[must_use]
549 pub fn path(&self) -> &Path {
550 &self.path
551 }
552
553 #[must_use]
555 pub fn last_size(&self) -> u64 {
556 self.last_size
557 }
558}
559
560#[derive(Debug, Clone)]
569pub struct HotFolderChain {
570 stages: Vec<WatchConfig>,
572}
573
574impl HotFolderChain {
575 #[must_use]
577 pub fn new() -> Self {
578 Self { stages: Vec::new() }
579 }
580
581 pub fn add_stage(&mut self, config: WatchConfig) {
586 self.stages.push(config);
587 }
588
589 #[must_use]
591 pub fn stage_count(&self) -> usize {
592 self.stages.len()
593 }
594
595 #[must_use]
597 pub fn stages(&self) -> &[WatchConfig] {
598 &self.stages
599 }
600
601 pub fn validate(&self) -> Result<()> {
610 if self.stages.is_empty() {
611 return Err(TranscodeError::InvalidInput(
612 "Hot folder chain has no stages".into(),
613 ));
614 }
615
616 for i in 0..self.stages.len().saturating_sub(1) {
617 let current = &self.stages[i];
618 let next = &self.stages[i + 1];
619
620 let output_dir = match ¤t.output_location {
621 OutputLocation::Fixed(dir) => Some(dir.clone()),
622 OutputLocation::DoneSubDir => Some(current.watch_dir.join("done")),
623 OutputLocation::SiblingWithExtension(_) => None,
624 };
625
626 if let Some(out_dir) = output_dir {
627 if out_dir != next.watch_dir {
628 return Err(TranscodeError::InvalidInput(format!(
629 "Stage {} output dir '{}' does not match stage {} watch dir '{}'",
630 i,
631 out_dir.display(),
632 i + 1,
633 next.watch_dir.display()
634 )));
635 }
636 }
637 }
638
639 Ok(())
640 }
641}
642
643impl Default for HotFolderChain {
644 fn default() -> Self {
645 Self::new()
646 }
647}
648
649#[derive(Debug, Clone)]
656pub struct FilenamePattern {
657 pattern: String,
659 case_insensitive: bool,
661}
662
663impl FilenamePattern {
664 #[must_use]
666 pub fn new(pattern: impl Into<String>) -> Self {
667 Self {
668 pattern: pattern.into(),
669 case_insensitive: true,
670 }
671 }
672
673 #[must_use]
675 pub fn case_insensitive(mut self, ci: bool) -> Self {
676 self.case_insensitive = ci;
677 self
678 }
679
680 #[must_use]
684 pub fn matches(&self, filename: &str) -> bool {
685 let (pat, name) = if self.case_insensitive {
686 (self.pattern.to_lowercase(), filename.to_lowercase())
687 } else {
688 (self.pattern.clone(), filename.to_string())
689 };
690 Self::glob_match(&pat, &name)
691 }
692
693 fn glob_match(pattern: &str, text: &str) -> bool {
695 let pat_chars: Vec<char> = pattern.chars().collect();
696 let txt_chars: Vec<char> = text.chars().collect();
697 let (plen, tlen) = (pat_chars.len(), txt_chars.len());
698
699 let mut dp = vec![vec![false; tlen + 1]; plen + 1];
701 dp[0][0] = true;
702
703 for (i, &pc) in pat_chars.iter().enumerate() {
705 if pc == '*' {
706 dp[i + 1][0] = dp[i][0];
707 } else {
708 break;
709 }
710 }
711
712 for i in 1..=plen {
713 for j in 1..=tlen {
714 if pat_chars[i - 1] == '*' {
715 dp[i][j] = dp[i - 1][j] || dp[i][j - 1];
716 } else if pat_chars[i - 1] == '?' || pat_chars[i - 1] == txt_chars[j - 1] {
717 dp[i][j] = dp[i - 1][j - 1];
718 }
719 }
720 }
721
722 dp[plen][tlen]
723 }
724
725 #[must_use]
727 pub fn pattern(&self) -> &str {
728 &self.pattern
729 }
730}
731
732#[derive(Debug, Clone)]
736pub struct RetryConfig {
737 pub max_retries: u32,
739 pub initial_delay_ms: u64,
741 pub backoff_multiplier: f64,
743 pub max_delay_ms: u64,
745}
746
747impl Default for RetryConfig {
748 fn default() -> Self {
749 Self {
750 max_retries: 3,
751 initial_delay_ms: 1_000,
752 backoff_multiplier: 2.0,
753 max_delay_ms: 30_000,
754 }
755 }
756}
757
758impl RetryConfig {
759 #[must_use]
761 pub fn new() -> Self {
762 Self::default()
763 }
764
765 #[must_use]
767 pub fn max_retries(mut self, n: u32) -> Self {
768 self.max_retries = n;
769 self
770 }
771
772 #[must_use]
774 pub fn initial_delay_ms(mut self, ms: u64) -> Self {
775 self.initial_delay_ms = ms;
776 self
777 }
778
779 #[must_use]
781 pub fn backoff_multiplier(mut self, m: f64) -> Self {
782 self.backoff_multiplier = m;
783 self
784 }
785
786 #[must_use]
788 pub fn max_delay_ms(mut self, ms: u64) -> Self {
789 self.max_delay_ms = ms;
790 self
791 }
792
793 #[must_use]
795 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
796 if attempt == 0 {
797 return Duration::from_millis(self.initial_delay_ms);
798 }
799 let delay = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
800 let clamped = delay.min(self.max_delay_ms as f64) as u64;
801 Duration::from_millis(clamped)
802 }
803}
804
805#[derive(Debug, Clone)]
807pub struct RetryTracker {
808 pub path: PathBuf,
810 pub attempts: u32,
812 pub last_error: Option<String>,
814}
815
816impl RetryTracker {
817 #[must_use]
819 pub fn new(path: PathBuf) -> Self {
820 Self {
821 path,
822 attempts: 0,
823 last_error: None,
824 }
825 }
826
827 pub fn record_failure(&mut self, error: &str) {
829 self.attempts += 1;
830 self.last_error = Some(error.to_string());
831 }
832
833 #[must_use]
835 pub fn can_retry(&self, config: &RetryConfig) -> bool {
836 self.attempts < config.max_retries
837 }
838
839 #[must_use]
841 pub fn next_delay(&self, config: &RetryConfig) -> Duration {
842 config.delay_for_attempt(self.attempts)
843 }
844}
845
846#[derive(Debug, Clone, Default)]
850pub struct WatchFolderStats {
851 pub processed_count: u64,
853 pub error_count: u64,
855 pub total_processing_time_ms: u64,
857 pub total_bytes_processed: u64,
859 pub min_processing_time_ms: Option<u64>,
861 pub max_processing_time_ms: Option<u64>,
863}
864
865impl WatchFolderStats {
866 #[must_use]
868 pub fn new() -> Self {
869 Self::default()
870 }
871
872 pub fn record_success(&mut self, processing_time_ms: u64, file_size_bytes: u64) {
874 self.processed_count += 1;
875 self.total_processing_time_ms += processing_time_ms;
876 self.total_bytes_processed += file_size_bytes;
877
878 self.min_processing_time_ms = Some(
879 self.min_processing_time_ms
880 .map_or(processing_time_ms, |m| m.min(processing_time_ms)),
881 );
882 self.max_processing_time_ms = Some(
883 self.max_processing_time_ms
884 .map_or(processing_time_ms, |m| m.max(processing_time_ms)),
885 );
886 }
887
888 pub fn record_error(&mut self) {
890 self.error_count += 1;
891 }
892
893 #[must_use]
895 pub fn avg_processing_time_ms(&self) -> Option<u64> {
896 if self.processed_count == 0 {
897 return None;
898 }
899 Some(self.total_processing_time_ms / self.processed_count)
900 }
901
902 #[must_use]
904 pub fn success_rate(&self) -> f64 {
905 let total = self.processed_count + self.error_count;
906 if total == 0 {
907 return 1.0;
908 }
909 self.processed_count as f64 / total as f64
910 }
911
912 #[must_use]
914 pub fn avg_throughput_bps(&self) -> Option<f64> {
915 if self.total_processing_time_ms == 0 || self.total_bytes_processed == 0 {
916 return None;
917 }
918 let secs = self.total_processing_time_ms as f64 / 1000.0;
919 Some(self.total_bytes_processed as f64 / secs)
920 }
921}
922
923#[cfg(test)]
926mod tests {
927 use super::*;
928 use std::env::temp_dir;
929 use std::fs;
930
931 fn make_temp_dir(suffix: &str) -> PathBuf {
932 let dir = temp_dir().join(format!("oximedia_watch_test_{suffix}"));
933 fs::create_dir_all(&dir).expect("create temp dir");
934 dir
935 }
936
937 fn touch(dir: &Path, name: &str) -> PathBuf {
938 let path = dir.join(name);
939 fs::write(&path, b"fake media").expect("create temp file");
940 path
941 }
942
943 #[test]
944 fn test_watch_config_new() {
945 let cfg =
946 WatchConfig::new(std::env::temp_dir().join("oximedia-transcode-watch-folder-watch"));
947 assert!(!cfg.accepted_extensions.is_empty());
948 assert_eq!(cfg.max_concurrent, 2);
949 assert_eq!(cfg.poll_interval_ms, 5_000);
950 }
951
952 #[test]
953 fn test_watch_config_validate_missing_dir() {
954 let cfg = WatchConfig::new("/nonexistent/path/for/oximedia_test");
955 assert!(cfg.validate().is_err());
956 }
957
958 #[test]
959 fn test_watch_config_validate_ok() {
960 let dir = make_temp_dir("cfg_ok");
961 let cfg = WatchConfig::new(&dir);
962 assert!(cfg.validate().is_ok());
963 fs::remove_dir_all(&dir).ok();
964 }
965
966 #[test]
967 fn test_scan_detects_new_files() {
968 let dir = make_temp_dir("scan");
969 touch(&dir, "video.mp4");
970 touch(&dir, "clip.mkv");
971 touch(&dir, "readme.txt"); let cfg = WatchConfig::new(&dir);
974 let mut watcher = TranscodeWatcher::new(cfg);
975 let count = watcher.scan().expect("scan ok");
976 assert_eq!(count, 2);
977 assert_eq!(watcher.queue_len(), 2);
978
979 let count2 = watcher.scan().expect("scan ok");
981 assert_eq!(count2, 0);
982
983 fs::remove_dir_all(&dir).ok();
984 }
985
986 #[test]
987 fn test_drain_pending_creates_configs() {
988 let dir = make_temp_dir("drain");
989 touch(&dir, "a.mp4");
990
991 let cfg = WatchConfig::new(&dir);
992 let mut watcher = TranscodeWatcher::new(cfg);
993 watcher.scan().expect("scan ok");
994
995 let drained = watcher.drain_pending();
996 assert_eq!(drained.len(), 1);
997 let (entry, job) = &drained[0];
998 assert!(entry.source.ends_with("a.mp4"));
999 assert!(job.input.is_some());
1000 assert!(job.output.is_some());
1001
1002 assert_eq!(watcher.pending().len(), 0);
1004
1005 fs::remove_dir_all(&dir).ok();
1006 }
1007
1008 #[test]
1009 fn test_mark_done_updates_status() {
1010 let dir = make_temp_dir("mark_done");
1011 let file = touch(&dir, "b.mp4");
1012
1013 let cfg = WatchConfig::new(&dir).on_success(PostProcessAction::Leave);
1014 let mut watcher = TranscodeWatcher::new(cfg);
1015 watcher.scan().expect("scan ok");
1016 watcher.drain_pending();
1017
1018 watcher.mark_done(&file).expect("mark done ok");
1019
1020 let counts = watcher.status_counts();
1021 assert_eq!(counts.done, 1);
1022 assert_eq!(counts.failed, 0);
1023
1024 fs::remove_dir_all(&dir).ok();
1025 }
1026
1027 #[test]
1028 fn test_mark_failed_updates_status() {
1029 let dir = make_temp_dir("mark_failed");
1030 let file = touch(&dir, "c.mp4");
1031
1032 let cfg = WatchConfig::new(&dir).on_failure(PostProcessAction::Leave);
1033 let mut watcher = TranscodeWatcher::new(cfg);
1034 watcher.scan().expect("scan ok");
1035 watcher.drain_pending();
1036
1037 watcher
1038 .mark_failed(&file, "codec not found")
1039 .expect("mark failed ok");
1040
1041 let counts = watcher.status_counts();
1042 assert_eq!(counts.failed, 1);
1043
1044 fs::remove_dir_all(&dir).ok();
1045 }
1046
1047 #[test]
1048 fn test_status_counts() {
1049 let dir = make_temp_dir("counts");
1050 touch(&dir, "x.mp4");
1051 touch(&dir, "y.mkv");
1052
1053 let cfg = WatchConfig::new(&dir);
1054 let mut watcher = TranscodeWatcher::new(cfg);
1055 watcher.scan().expect("scan ok");
1056
1057 let counts = watcher.status_counts();
1058 assert_eq!(counts.pending, 2);
1059 assert_eq!(counts.processing, 0);
1060
1061 fs::remove_dir_all(&dir).ok();
1062 }
1063
1064 #[test]
1065 fn test_poll_interval() {
1066 let cfg = WatchConfig::new("/tmp").poll_interval_ms(2000);
1067 let watcher = TranscodeWatcher::new(cfg);
1068 assert_eq!(watcher.poll_interval(), Duration::from_secs(2));
1069 }
1070
1071 #[test]
1072 fn test_output_location_sibling() {
1073 let dir = make_temp_dir("sibling");
1074 touch(&dir, "d.mp4");
1075
1076 let cfg = WatchConfig::new(&dir)
1077 .output_location(OutputLocation::SiblingWithExtension("mkv".into()));
1078 let mut watcher = TranscodeWatcher::new(cfg);
1079 watcher.scan().expect("scan ok");
1080
1081 let entry = &watcher.queue[0];
1082 assert!(entry
1083 .output
1084 .extension()
1085 .map(|e| e == "mkv")
1086 .unwrap_or(false));
1087
1088 fs::remove_dir_all(&dir).ok();
1089 }
1090
1091 #[test]
1094 fn test_stability_config_defaults() {
1095 let cfg = FileStabilityConfig::default();
1096 assert_eq!(cfg.required_stable_checks, 3);
1097 assert_eq!(cfg.check_interval_ms, 2000);
1098 assert_eq!(cfg.min_file_size, 1024);
1099 }
1100
1101 #[test]
1102 fn test_stability_config_builder() {
1103 let cfg = FileStabilityConfig::new()
1104 .required_checks(5)
1105 .check_interval_ms(1000)
1106 .min_file_size(4096);
1107 assert_eq!(cfg.required_stable_checks, 5);
1108 assert_eq!(cfg.check_interval_ms, 1000);
1109 assert_eq!(cfg.min_file_size, 4096);
1110 }
1111
1112 #[test]
1113 fn test_stability_tracker_stable_file() {
1114 let dir = make_temp_dir("stability");
1115 let path = dir.join("stable.mp4");
1116 fs::write(&path, vec![0u8; 2048]).expect("write ok");
1118
1119 let cfg = FileStabilityConfig::new().required_checks(2);
1120 let mut tracker = FileStabilityTracker::new(path);
1121
1122 assert!(!tracker.check(&cfg));
1124 assert!(!tracker.check(&cfg));
1126 assert!(tracker.check(&cfg));
1128 assert!(tracker.is_stable());
1129 assert_eq!(tracker.last_size(), 2048);
1130
1131 fs::remove_dir_all(&dir).ok();
1132 }
1133
1134 #[test]
1135 fn test_stability_tracker_growing_file() {
1136 let dir = make_temp_dir("growing");
1137 let path = dir.join("growing.mp4");
1138 fs::write(&path, vec![0u8; 2048]).expect("write ok");
1139
1140 let cfg = FileStabilityConfig::new().required_checks(2);
1141 let mut tracker = FileStabilityTracker::new(path.clone());
1142
1143 tracker.check(&cfg); tracker.check(&cfg); fs::write(&path, vec![0u8; 4096]).expect("grow ok");
1148 assert!(!tracker.check(&cfg)); fs::remove_dir_all(&dir).ok();
1151 }
1152
1153 #[test]
1154 fn test_stability_tracker_too_small() {
1155 let dir = make_temp_dir("small");
1156 let path = dir.join("tiny.mp4");
1157 fs::write(&path, b"x").expect("write ok");
1158
1159 let cfg = FileStabilityConfig::new().min_file_size(1024);
1160 let mut tracker = FileStabilityTracker::new(path);
1161
1162 for _ in 0..10 {
1163 assert!(!tracker.check(&cfg));
1164 }
1165
1166 fs::remove_dir_all(&dir).ok();
1167 }
1168
1169 #[test]
1172 fn test_hot_folder_chain_empty() {
1173 let chain = HotFolderChain::new();
1174 assert_eq!(chain.stage_count(), 0);
1175 assert!(chain.validate().is_err());
1176 }
1177
1178 #[test]
1179 fn test_hot_folder_chain_single_stage() {
1180 let dir = make_temp_dir("chain1");
1181 let mut chain = HotFolderChain::new();
1182 chain.add_stage(WatchConfig::new(&dir));
1183 assert_eq!(chain.stage_count(), 1);
1184 assert!(chain.validate().is_ok());
1185 fs::remove_dir_all(&dir).ok();
1186 }
1187
1188 #[test]
1189 fn test_hot_folder_chain_two_stages_aligned() {
1190 let dir1 = make_temp_dir("chain2a");
1191 let dir2 = dir1.join("done");
1192 fs::create_dir_all(&dir2).expect("create done dir");
1193
1194 let mut chain = HotFolderChain::new();
1195 chain.add_stage(WatchConfig::new(&dir1)); chain.add_stage(WatchConfig::new(&dir2)); assert_eq!(chain.stage_count(), 2);
1198 assert!(chain.validate().is_ok());
1199
1200 fs::remove_dir_all(&dir1).ok();
1201 }
1202
1203 #[test]
1204 fn test_hot_folder_chain_misaligned() {
1205 let dir1 = make_temp_dir("chain3a");
1206 let dir2 = make_temp_dir("chain3b");
1207
1208 let mut chain = HotFolderChain::new();
1209 chain.add_stage(WatchConfig::new(&dir1)); chain.add_stage(WatchConfig::new(&dir2)); assert!(chain.validate().is_err());
1212
1213 fs::remove_dir_all(&dir1).ok();
1214 fs::remove_dir_all(&dir2).ok();
1215 }
1216
1217 #[test]
1220 fn test_filename_pattern_exact() {
1221 let p = FilenamePattern::new("video.mp4");
1222 assert!(p.matches("video.mp4"));
1223 assert!(p.matches("VIDEO.MP4")); assert!(!p.matches("audio.mp4"));
1225 }
1226
1227 #[test]
1228 fn test_filename_pattern_wildcard() {
1229 let p = FilenamePattern::new("*.mp4");
1230 assert!(p.matches("video.mp4"));
1231 assert!(p.matches("CLIP.MP4"));
1232 assert!(!p.matches("video.mkv"));
1233 }
1234
1235 #[test]
1236 fn test_filename_pattern_wildcard_prefix() {
1237 let p = FilenamePattern::new("raw_*");
1238 assert!(p.matches("raw_clip.mp4"));
1239 assert!(p.matches("raw_"));
1240 assert!(!p.matches("clip_raw.mp4"));
1241 }
1242
1243 #[test]
1244 fn test_filename_pattern_multiple_wildcards() {
1245 let p = FilenamePattern::new("*_final_*");
1246 assert!(p.matches("clip_final_v2.mp4"));
1247 assert!(!p.matches("clip_draft_v2.mp4"));
1248 }
1249
1250 #[test]
1251 fn test_filename_pattern_case_sensitive() {
1252 let p = FilenamePattern::new("Video.mp4").case_insensitive(false);
1253 assert!(p.matches("Video.mp4"));
1254 assert!(!p.matches("video.mp4"));
1255 }
1256
1257 #[test]
1260 fn test_retry_config_defaults() {
1261 let cfg = RetryConfig::default();
1262 assert_eq!(cfg.max_retries, 3);
1263 assert_eq!(cfg.initial_delay_ms, 1000);
1264 assert!((cfg.backoff_multiplier - 2.0).abs() < 1e-6);
1265 }
1266
1267 #[test]
1268 fn test_retry_delay_exponential() {
1269 let cfg = RetryConfig::new()
1270 .initial_delay_ms(1000)
1271 .backoff_multiplier(2.0)
1272 .max_delay_ms(10_000);
1273
1274 assert_eq!(cfg.delay_for_attempt(0), Duration::from_secs(1));
1275 assert_eq!(cfg.delay_for_attempt(1), Duration::from_secs(2));
1276 assert_eq!(cfg.delay_for_attempt(2), Duration::from_secs(4));
1277 assert_eq!(cfg.delay_for_attempt(3), Duration::from_secs(8));
1278 assert_eq!(cfg.delay_for_attempt(4), Duration::from_secs(10));
1280 }
1281
1282 #[test]
1283 fn test_retry_tracker() {
1284 let cfg = RetryConfig::new().max_retries(3);
1285 let mut tracker = RetryTracker::new(
1286 std::env::temp_dir().join("oximedia-transcode-watch-folder-test.mp4"),
1287 );
1288
1289 assert!(tracker.can_retry(&cfg));
1290 assert_eq!(tracker.attempts, 0);
1291
1292 tracker.record_failure("codec error");
1293 assert_eq!(tracker.attempts, 1);
1294 assert_eq!(tracker.last_error.as_deref(), Some("codec error"));
1295 assert!(tracker.can_retry(&cfg));
1296
1297 tracker.record_failure("timeout");
1298 tracker.record_failure("timeout");
1299 assert!(!tracker.can_retry(&cfg));
1300 }
1301
1302 #[test]
1305 fn test_stats_empty() {
1306 let stats = WatchFolderStats::new();
1307 assert_eq!(stats.processed_count, 0);
1308 assert_eq!(stats.error_count, 0);
1309 assert!(stats.avg_processing_time_ms().is_none());
1310 assert!((stats.success_rate() - 1.0).abs() < 1e-6);
1311 }
1312
1313 #[test]
1314 fn test_stats_record_success() {
1315 let mut stats = WatchFolderStats::new();
1316 stats.record_success(1000, 10_000_000);
1317 stats.record_success(2000, 20_000_000);
1318
1319 assert_eq!(stats.processed_count, 2);
1320 assert_eq!(stats.total_processing_time_ms, 3000);
1321 assert_eq!(stats.avg_processing_time_ms(), Some(1500));
1322 assert_eq!(stats.min_processing_time_ms, Some(1000));
1323 assert_eq!(stats.max_processing_time_ms, Some(2000));
1324 assert_eq!(stats.total_bytes_processed, 30_000_000);
1325 }
1326
1327 #[test]
1328 fn test_stats_success_rate() {
1329 let mut stats = WatchFolderStats::new();
1330 stats.record_success(100, 1000);
1331 stats.record_success(100, 1000);
1332 stats.record_error();
1333
1334 let rate = stats.success_rate();
1335 assert!((rate - 2.0 / 3.0).abs() < 1e-6);
1336 }
1337
1338 #[test]
1339 fn test_stats_throughput() {
1340 let mut stats = WatchFolderStats::new();
1341 stats.record_success(1000, 1_000_000); let bps = stats.avg_throughput_bps().expect("should have throughput");
1344 assert!((bps - 1_000_000.0).abs() < 1.0);
1345 }
1346}