Skip to main content

subx_cli/core/parallel/
task.rs

1//! Task definition and utilities for parallel processing
2use crate::core::fs_util::{atomic_create_file, validate_write_target};
3use async_trait::async_trait;
4use std::fmt;
5use std::fs::File;
6use std::io;
7use std::path::Path;
8
/// Reports whether `err` signals that a rename crossed a filesystem/volume
/// boundary, which is the cue to fall back from `rename` to copy + delete.
///
/// Detection uses the raw OS error code where one is known:
///
/// * Unix: `EXDEV` (18)
/// * Windows: `ERROR_NOT_SAME_DEVICE` (17)
///
/// As a last resort, an error whose kind is [`io::ErrorKind::Unsupported`] is
/// also treated as a cross-device failure.
fn is_cross_device_error(err: &io::Error) -> bool {
    // EXDEV == 18 on Linux and most Unixes.
    #[cfg(unix)]
    const CROSS_DEVICE_CODE: Option<i32> = Some(18);
    // ERROR_NOT_SAME_DEVICE == 17 in the Windows system error code table.
    #[cfg(windows)]
    const CROSS_DEVICE_CODE: Option<i32> = Some(17);
    // No known raw code on other targets; rely on the kind-based fallback.
    #[cfg(not(any(unix, windows)))]
    const CROSS_DEVICE_CODE: Option<i32> = None;

    let raw_code_matches = matches!(
        (err.raw_os_error(), CROSS_DEVICE_CODE),
        (Some(actual), Some(expected)) if actual == expected
    );

    raw_code_matches || matches!(err.kind(), io::ErrorKind::Unsupported)
}
22
23/// Resolve filename conflicts by atomically creating the target.
24///
25/// Returns the resolved path together with the open file handle. Callers
26/// should write through this handle to avoid TOCTOU races between conflict
27/// resolution and file creation.
28fn resolve_filename_conflict(
29    target: std::path::PathBuf,
30) -> Result<(std::path::PathBuf, File), Box<dyn std::error::Error + Send + Sync>> {
31    match atomic_create_file(&target) {
32        Ok(f) => return Ok((target, f)),
33        Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {}
34        Err(e) => return Err(e.into()),
35    }
36
37    let file_stem = target
38        .file_stem()
39        .and_then(|s| s.to_str())
40        .unwrap_or("file");
41    let extension = target.extension().and_then(|s| s.to_str()).unwrap_or("");
42    let parent = target.parent().unwrap_or_else(|| std::path::Path::new("."));
43
44    for i in 1..1000 {
45        let new_name = if extension.is_empty() {
46            format!("{}.{}", file_stem, i)
47        } else {
48            format!("{}.{}.{}", file_stem, i, extension)
49        };
50        let new_path = parent.join(new_name);
51        match atomic_create_file(&new_path) {
52            Ok(f) => return Ok((new_path, f)),
53            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => continue,
54            Err(e) => return Err(e.into()),
55        }
56    }
57
58    Err("Could not resolve filename conflict".into())
59}
60
61/// Trait defining a unit of work that can be executed asynchronously.
62///
63/// All tasks in the parallel processing system must implement this trait
64/// to provide execution logic and metadata.
65#[async_trait]
66pub trait Task: Send + Sync {
67    /// Executes the task and returns the result.
68    async fn execute(&self) -> TaskResult;
69    /// Returns the type identifier for this task.
70    fn task_type(&self) -> &'static str;
71    /// Returns a unique identifier for this specific task instance.
72    fn task_id(&self) -> String;
73    /// Returns an estimated duration for the task execution.
74    fn estimated_duration(&self) -> Option<std::time::Duration> {
75        None
76    }
77    /// Returns a human-readable description of the task.
78    fn description(&self) -> String {
79        format!("{} task", self.task_type())
80    }
81}
82
/// Outcome reported by a task once it has finished running.
///
/// Each variant carries the descriptive message(s) that accompany the
/// outcome: full success, failure, cancellation, or partial completion.
#[derive(Clone, Debug)]
pub enum TaskResult {
    /// The task finished successfully; holds the result message.
    Success(String),
    /// The task failed; holds the error message.
    Failed(String),
    /// The task was cancelled before it completed.
    Cancelled,
    /// The task completed only in part; holds the success message followed
    /// by the failure message.
    PartialSuccess(String, String),
}
98
99/// Current execution status of a task in the system.
100///
101/// Tracks the lifecycle of a task from initial queuing through completion
102/// or failure, providing detailed status information.
103#[derive(Debug, Clone)]
104pub enum TaskStatus {
105    /// Task is queued and waiting for execution
106    Pending,
107    /// Task is currently being executed
108    Running,
109    /// Task completed successfully or with partial success
110    Completed(TaskResult),
111    /// Task failed during execution
112    Failed(String),
113    /// Task was cancelled before or during execution
114    Cancelled,
115}
116
117impl fmt::Display for TaskResult {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        match self {
120            TaskResult::Success(msg) => write!(f, "✓ {}", msg),
121            TaskResult::Failed(msg) => write!(f, "✗ {}", msg),
122            TaskResult::Cancelled => write!(f, "⚠ Task cancelled"),
123            TaskResult::PartialSuccess(success, warn) => {
124                write!(f, "⚠ {} (warning: {})", success, warn)
125            }
126        }
127    }
128}
129
130impl fmt::Display for TaskStatus {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        match self {
133            TaskStatus::Pending => write!(f, "Pending"),
134            TaskStatus::Running => write!(f, "Running"),
135            TaskStatus::Completed(result) => write!(f, "Completed: {}", result),
136            TaskStatus::Failed(msg) => write!(f, "Failed: {}", msg),
137            TaskStatus::Cancelled => write!(f, "Cancelled"),
138        }
139    }
140}
141
142/// Task for processing files (convert, sync, match, validate).
143///
144/// Represents a file processing operation that can be executed
145/// asynchronously in the parallel processing system.
146pub struct FileProcessingTask {
147    /// Path to the input file to be processed
148    pub input_path: std::path::PathBuf,
149    /// Optional output path for the processed file
150    pub output_path: Option<std::path::PathBuf>,
151    /// The specific operation to perform on the file
152    pub operation: ProcessingOperation,
153}
154
/// The operations a [`FileProcessingTask`] can perform.
///
/// Each variant names one kind of work on subtitle or video files and
/// carries the parameters that work needs.
#[derive(Clone, Debug)]
pub enum ProcessingOperation {
    /// Convert a subtitle from one format to another.
    ConvertFormat {
        /// Format to convert from (e.g., "srt", "ass").
        from: String,
        /// Format to convert to (e.g., "srt", "ass").
        to: String,
    },
    /// Synchronize subtitle timing against an audio track.
    SyncSubtitle {
        /// Audio file used as the synchronization reference.
        audio_path: std::path::PathBuf,
    },
    /// Match subtitle files with video files.
    MatchFiles {
        /// Whether subdirectories are searched recursively.
        recursive: bool,
    },
    /// Validate the format and structure of a subtitle file.
    ValidateFormat,
    /// Copy a subtitle file into the video folder.
    CopyToVideoFolder {
        /// Subtitle file to copy.
        source: std::path::PathBuf,
        /// Destination in the video folder.
        target: std::path::PathBuf,
    },
    /// Move a subtitle file into the video folder.
    MoveToVideoFolder {
        /// Subtitle file to move.
        source: std::path::PathBuf,
        /// Destination in the video folder.
        target: std::path::PathBuf,
    },
    /// Copy a file under a new name (local copy).
    CopyWithRename {
        /// File to copy.
        source: std::path::PathBuf,
        /// Path of the copy.
        target: std::path::PathBuf,
    },
    /// Create a backup of a file.
    CreateBackup {
        /// File to back up.
        source: std::path::PathBuf,
        /// Path of the backup file.
        backup: std::path::PathBuf,
    },
    /// Rename (move) a file.
    RenameFile {
        /// Current path of the file.
        source: std::path::PathBuf,
        /// Path of the file after the rename.
        target: std::path::PathBuf,
    },
}
216
217#[async_trait]
218impl Task for FileProcessingTask {
219    async fn execute(&self) -> TaskResult {
220        match &self.operation {
221            ProcessingOperation::ConvertFormat { from, to } => {
222                match self.convert_format(from, to).await {
223                    Ok(path) => TaskResult::Success(format!(
224                        "Successfully converted {} -> {}: {}",
225                        from,
226                        to,
227                        path.display()
228                    )),
229                    Err(e) => TaskResult::Failed(format!(
230                        "Conversion failed {}: {}",
231                        self.input_path.display(),
232                        e
233                    )),
234                }
235            }
236            ProcessingOperation::SyncSubtitle { .. } => {
237                // Sync not supported in parallel tasks
238                TaskResult::Failed("Sync functionality not implemented".to_string())
239            }
240            ProcessingOperation::MatchFiles { recursive } => {
241                match self.match_files(*recursive).await {
242                    Ok(m) => TaskResult::Success(format!(
243                        "File matching completed: found {} matches",
244                        m.len()
245                    )),
246                    Err(e) => TaskResult::Failed(format!("Matching failed: {}", e)),
247                }
248            }
249            ProcessingOperation::ValidateFormat => match self.validate_format().await {
250                Ok(true) => TaskResult::Success(format!(
251                    "Format validation passed: {}",
252                    self.input_path.display()
253                )),
254                Ok(false) => TaskResult::Failed(format!(
255                    "Format validation failed: {}",
256                    self.input_path.display()
257                )),
258                Err(e) => TaskResult::Failed(format!("Validation error: {}", e)),
259            },
260            ProcessingOperation::CopyToVideoFolder { source, target } => {
261                match self.execute_copy_operation(source, target).await {
262                    Ok(_) => TaskResult::Success(format!(
263                        "Copied: {} -> {}",
264                        source.display(),
265                        target.display()
266                    )),
267                    Err(e) => TaskResult::Failed(format!("Copy failed: {}", e)),
268                }
269            }
270            ProcessingOperation::MoveToVideoFolder { source, target } => {
271                match self.execute_move_operation(source, target).await {
272                    Ok(_) => TaskResult::Success(format!(
273                        "Moved: {} -> {}",
274                        source.display(),
275                        target.display()
276                    )),
277                    Err(e) => TaskResult::Failed(format!("Move failed: {}", e)),
278                }
279            }
280            ProcessingOperation::CopyWithRename { source, target } => {
281                match self
282                    .execute_copy_with_rename_operation(source, target)
283                    .await
284                {
285                    Ok(_) => TaskResult::Success(format!(
286                        "Copied: {} -> {}",
287                        source.display(),
288                        target.display()
289                    )),
290                    Err(e) => TaskResult::Failed(format!("Copy failed: {}", e)),
291                }
292            }
293            ProcessingOperation::CreateBackup { source, backup } => {
294                match self.execute_create_backup_operation(source, backup).await {
295                    Ok(_) => TaskResult::Success(format!(
296                        "Backup created: {} -> {}",
297                        source.display(),
298                        backup.display()
299                    )),
300                    Err(e) => TaskResult::Failed(format!("Backup failed: {}", e)),
301                }
302            }
303            ProcessingOperation::RenameFile { source, target } => {
304                match self.execute_rename_file_operation(source, target).await {
305                    Ok(_) => TaskResult::Success(format!(
306                        "Renamed: {} -> {}",
307                        source.display(),
308                        target.display()
309                    )),
310                    Err(e) => TaskResult::Failed(format!("Rename failed: {}", e)),
311                }
312            }
313        }
314    }
315
316    fn task_type(&self) -> &'static str {
317        match &self.operation {
318            ProcessingOperation::ConvertFormat { .. } => "convert",
319            ProcessingOperation::SyncSubtitle { .. } => "sync",
320            ProcessingOperation::MatchFiles { .. } => "match",
321            ProcessingOperation::ValidateFormat => "validate",
322            ProcessingOperation::CopyToVideoFolder { .. } => "copy_to_video_folder",
323            ProcessingOperation::MoveToVideoFolder { .. } => "move_to_video_folder",
324            ProcessingOperation::CopyWithRename { .. } => "copy_with_rename",
325            ProcessingOperation::CreateBackup { .. } => "create_backup",
326            ProcessingOperation::RenameFile { .. } => "rename_file",
327        }
328    }
329
330    fn task_id(&self) -> String {
331        use std::collections::hash_map::DefaultHasher;
332        use std::hash::{Hash, Hasher};
333        let mut hasher = DefaultHasher::new();
334        self.input_path.hash(&mut hasher);
335        self.operation.hash(&mut hasher);
336        format!("{}_{:x}", self.task_type(), hasher.finish())
337    }
338
339    fn estimated_duration(&self) -> Option<std::time::Duration> {
340        if let Ok(meta) = std::fs::metadata(&self.input_path) {
341            let size_mb = meta.len() as f64 / 1_048_576.0;
342            let secs = match &self.operation {
343                ProcessingOperation::ConvertFormat { .. } => size_mb * 0.1,
344                ProcessingOperation::SyncSubtitle { .. } => size_mb * 0.5,
345                ProcessingOperation::MatchFiles { .. } => 2.0,
346                ProcessingOperation::ValidateFormat => size_mb * 0.05,
347                ProcessingOperation::CopyToVideoFolder { .. } => size_mb * 0.01, // Fast copy
348                ProcessingOperation::MoveToVideoFolder { .. } => size_mb * 0.005, // Even faster move
349                ProcessingOperation::CopyWithRename { .. } => size_mb * 0.01,
350                ProcessingOperation::CreateBackup { .. } => size_mb * 0.01,
351                ProcessingOperation::RenameFile { .. } => size_mb * 0.005,
352            };
353            Some(std::time::Duration::from_secs_f64(secs))
354        } else {
355            None
356        }
357    }
358
359    fn description(&self) -> String {
360        match &self.operation {
361            ProcessingOperation::ConvertFormat { from, to } => {
362                format!(
363                    "Convert {} from {} to {}",
364                    self.input_path.display(),
365                    from,
366                    to
367                )
368            }
369            ProcessingOperation::SyncSubtitle { audio_path } => format!(
370                "Sync subtitle {} with audio {}",
371                self.input_path.display(),
372                audio_path.display()
373            ),
374            ProcessingOperation::MatchFiles { recursive } => format!(
375                "Match files in {}{}",
376                self.input_path.display(),
377                if *recursive { " (recursive)" } else { "" }
378            ),
379            ProcessingOperation::ValidateFormat => {
380                format!("Validate format of {}", self.input_path.display())
381            }
382            ProcessingOperation::CopyToVideoFolder { source, target } => {
383                format!("Copy {} to {}", source.display(), target.display())
384            }
385            ProcessingOperation::MoveToVideoFolder { source, target } => {
386                format!("Move {} to {}", source.display(), target.display())
387            }
388            ProcessingOperation::CopyWithRename { source, target } => {
389                format!(
390                    "CopyWithRename {} to {}",
391                    source.display(),
392                    target.display()
393                )
394            }
395            ProcessingOperation::CreateBackup { source, backup } => {
396                format!("CreateBackup {} to {}", source.display(), backup.display())
397            }
398            ProcessingOperation::RenameFile { source, target } => {
399                format!("Rename {} to {}", source.display(), target.display())
400            }
401        }
402    }
403}
404
405impl FileProcessingTask {
406    /// Create a new file processing task with operation
407    pub fn new(
408        input_path: std::path::PathBuf,
409        output_path: Option<std::path::PathBuf>,
410        operation: ProcessingOperation,
411    ) -> Self {
412        FileProcessingTask {
413            input_path,
414            output_path,
415            operation,
416        }
417    }
418
419    /// Execute copy operation for file relocation
420    async fn execute_copy_operation(
421        &self,
422        source: &Path,
423        target: &Path,
424    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
425        let source = source.to_path_buf();
426        let target = target.to_path_buf();
427        tokio::task::spawn_blocking(
428            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
429                // Create target directory if it doesn't exist
430                if let Some(parent) = target.parent() {
431                    std::fs::create_dir_all(parent)?;
432                }
433
434                // Atomically resolve filename conflicts and obtain an open file handle.
435                let (final_target, mut file) = resolve_filename_conflict(target)?;
436
437                if let Some(parent) = final_target.parent() {
438                    validate_write_target(&final_target, parent)?;
439                }
440
441                // Stream the source file contents through the exclusive handle.
442                let mut src = std::fs::File::open(&source)?;
443                std::io::copy(&mut src, &mut file)?;
444                Ok(())
445            },
446        )
447        .await?
448    }
449
450    /// Execute move operation for file relocation
451    async fn execute_move_operation(
452        &self,
453        source: &Path,
454        target: &Path,
455    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
456        let source = source.to_path_buf();
457        let target = target.to_path_buf();
458        tokio::task::spawn_blocking(
459            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
460                // Create target directory if it doesn't exist
461                if let Some(parent) = target.parent() {
462                    std::fs::create_dir_all(parent)?;
463                }
464
465                // Try a fast in-filesystem rename first; fall back to copy+delete otherwise.
466                if !target.exists() {
467                    match std::fs::rename(&source, &target) {
468                        Ok(_) => return Ok(()),
469                        Err(e) if is_cross_device_error(&e) => {}
470                        Err(_) => { /* fall through to copy+delete path */ }
471                    }
472                }
473
474                let (final_target, mut file) = resolve_filename_conflict(target)?;
475
476                if let Some(parent) = final_target.parent() {
477                    validate_write_target(&final_target, parent)?;
478                }
479
480                let mut src = std::fs::File::open(&source)?;
481                std::io::copy(&mut src, &mut file)?;
482                file.sync_all()?;
483                drop(file);
484                std::fs::remove_file(&source)?;
485                Ok(())
486            },
487        )
488        .await?
489    }
490
491    /// Execute a copy with rename operation (local copy) using CIFS-safe copy
492    async fn execute_copy_with_rename_operation(
493        &self,
494        source: &Path,
495        target: &Path,
496    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
497        let source = source.to_path_buf();
498        let target = target.to_path_buf();
499        tokio::task::spawn_blocking(
500            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
501                if let Some(parent) = target.parent() {
502                    std::fs::create_dir_all(parent)?;
503                }
504                crate::core::fs_util::copy_file_cifs_safe(&source, &target)?;
505                Ok(())
506            },
507        )
508        .await?
509    }
510
511    /// Execute a create backup operation using an atomically created destination.
512    async fn execute_create_backup_operation(
513        &self,
514        source: &Path,
515        backup: &Path,
516    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
517        let source = source.to_path_buf();
518        let backup = backup.to_path_buf();
519        tokio::task::spawn_blocking(
520            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
521                if let Some(parent) = backup.parent() {
522                    std::fs::create_dir_all(parent)?;
523                }
524
525                let (final_target, mut file) = resolve_filename_conflict(backup)?;
526
527                if let Some(parent) = final_target.parent() {
528                    validate_write_target(&final_target, parent)?;
529                }
530
531                let mut src = std::fs::File::open(&source)?;
532                std::io::copy(&mut src, &mut file)?;
533                Ok(())
534            },
535        )
536        .await?
537    }
538
539    /// Execute a file rename operation
540    async fn execute_rename_file_operation(
541        &self,
542        source: &Path,
543        target: &Path,
544    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
545        let source = source.to_path_buf();
546        let target = target.to_path_buf();
547        tokio::task::spawn_blocking(
548            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
549                if let Some(parent) = target.parent() {
550                    std::fs::create_dir_all(parent)?;
551                }
552
553                if !target.exists() {
554                    match std::fs::rename(&source, &target) {
555                        Ok(_) => return Ok(()),
556                        Err(e) if is_cross_device_error(&e) => {}
557                        Err(_) => { /* fall through to copy+delete path */ }
558                    }
559                }
560
561                let (final_target, mut file) = resolve_filename_conflict(target)?;
562
563                if let Some(parent) = final_target.parent() {
564                    validate_write_target(&final_target, parent)?;
565                }
566
567                let mut src = std::fs::File::open(&source)?;
568                std::io::copy(&mut src, &mut file)?;
569                file.sync_all()?;
570                drop(file);
571                std::fs::remove_file(&source)?;
572                Ok(())
573            },
574        )
575        .await?
576    }
577
578    async fn convert_format(&self, _from: &str, _to: &str) -> crate::Result<std::path::PathBuf> {
579        // Stub convert: simply return input path
580        Ok(self.input_path.clone())
581    }
582
583    async fn sync_subtitle(
584        &self,
585        _audio_path: &std::path::Path,
586    ) -> crate::Result<crate::core::sync::SyncResult> {
587        // Stub implementation: sync not available
588        Err(crate::error::SubXError::parallel_processing(
589            "sync_subtitle not implemented".to_string(),
590        ))
591    }
592
593    async fn match_files(&self, _recursive: bool) -> crate::Result<Vec<()>> {
594        // Stub implementation: no actual matching
595        Ok(Vec::new())
596    }
597
598    async fn validate_format(&self) -> crate::Result<bool> {
599        // Stub validate: always succeed
600        Ok(true)
601    }
602}
603
604// impl Hash for ProcessingOperation to support task_id generation
605impl std::hash::Hash for ProcessingOperation {
606    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
607        match self {
608            ProcessingOperation::ConvertFormat { from, to } => {
609                "convert".hash(state);
610                from.hash(state);
611                to.hash(state);
612            }
613            ProcessingOperation::SyncSubtitle { audio_path } => {
614                "sync".hash(state);
615                audio_path.hash(state);
616            }
617            ProcessingOperation::MatchFiles { recursive } => {
618                "match".hash(state);
619                recursive.hash(state);
620            }
621            ProcessingOperation::ValidateFormat => {
622                "validate".hash(state);
623            }
624            ProcessingOperation::CopyToVideoFolder { source, target } => {
625                "copy_to_video_folder".hash(state);
626                source.hash(state);
627                target.hash(state);
628            }
629            ProcessingOperation::MoveToVideoFolder { source, target } => {
630                "move_to_video_folder".hash(state);
631                source.hash(state);
632                target.hash(state);
633            }
634            ProcessingOperation::CopyWithRename { source, target } => {
635                "copy_with_rename".hash(state);
636                source.hash(state);
637                target.hash(state);
638            }
639            ProcessingOperation::CreateBackup { source, backup } => {
640                "create_backup".hash(state);
641                source.hash(state);
642                backup.hash(state);
643            }
644            ProcessingOperation::RenameFile { source, target } => {
645                "rename_file".hash(state);
646                source.hash(state);
647                target.hash(state);
648            }
649        }
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656    use std::time::Duration;
657    use tempfile::TempDir;
658
659    #[tokio::test]
660    async fn test_file_processing_task_validate_format() {
661        let tmp = TempDir::new().unwrap();
662        let test_file = tmp.path().join("test.srt");
663        tokio::fs::write(&test_file, "1\n00:00:01,000 --> 00:00:02,000\nTest\n")
664            .await
665            .unwrap();
666        let task = FileProcessingTask {
667            input_path: test_file.clone(),
668            output_path: None,
669            operation: ProcessingOperation::ValidateFormat,
670        };
671        let result = task.execute().await;
672        assert!(matches!(result, TaskResult::Success(_)));
673    }
674
675    #[tokio::test]
676    async fn test_file_processing_task_copy_with_rename() {
677        let tmp = TempDir::new().unwrap();
678        let src = tmp.path().join("orig.txt");
679        let dst = tmp.path().join("copy.txt");
680        tokio::fs::write(&src, b"hello").await.unwrap();
681        let task = FileProcessingTask::new(
682            src.clone(),
683            Some(dst.clone()),
684            ProcessingOperation::CopyWithRename {
685                source: src.clone(),
686                target: dst.clone(),
687            },
688        );
689        let result = task.execute().await;
690        assert!(matches!(result, TaskResult::Success(_)));
691        let data = tokio::fs::read(&dst).await.unwrap();
692        assert_eq!(data, b"hello");
693    }
694
695    #[tokio::test]
696    async fn test_file_processing_task_create_backup() {
697        let tmp = TempDir::new().unwrap();
698        let src = tmp.path().join("orig.txt");
699        let backup = tmp.path().join("orig.txt.bak");
700        tokio::fs::write(&src, b"backup").await.unwrap();
701        let task = FileProcessingTask::new(
702            src.clone(),
703            Some(backup.clone()),
704            ProcessingOperation::CreateBackup {
705                source: src.clone(),
706                backup: backup.clone(),
707            },
708        );
709        let result = task.execute().await;
710        assert!(matches!(result, TaskResult::Success(_)));
711        let data = tokio::fs::read(&backup).await.unwrap();
712        assert_eq!(data, b"backup");
713    }
714
715    #[tokio::test]
716    async fn test_file_processing_task_rename_file() {
717        let tmp = TempDir::new().unwrap();
718        let src = tmp.path().join("a.txt");
719        let dst = tmp.path().join("b.txt");
720        tokio::fs::write(&src, b"rename").await.unwrap();
721        let task = FileProcessingTask::new(
722            src.clone(),
723            Some(dst.clone()),
724            ProcessingOperation::RenameFile {
725                source: src.clone(),
726                target: dst.clone(),
727            },
728        );
729        let result = task.execute().await;
730        assert!(matches!(result, TaskResult::Success(_)));
731        assert!(tokio::fs::metadata(&src).await.is_err());
732        let data = tokio::fs::read(&dst).await.unwrap();
733        assert_eq!(data, b"rename");
734    }
735
736    /// Test task lifecycle and status transitions
737    #[tokio::test]
738    async fn test_task_lifecycle() {
739        let tmp = TempDir::new().unwrap();
740        let test_file = tmp.path().join("lifecycle.srt");
741        tokio::fs::write(
742            &test_file,
743            "1\n00:00:01,000 --> 00:00:02,000\nLifecycle test\n",
744        )
745        .await
746        .unwrap();
747
748        let task = FileProcessingTask {
749            input_path: test_file.clone(),
750            output_path: None,
751            operation: ProcessingOperation::ValidateFormat,
752        };
753
754        // Test initial task properties
755        assert_eq!(task.task_type(), "validate");
756        assert!(!task.task_id().is_empty());
757        assert!(task.description().contains("Validate format"));
758        assert!(task.description().contains("lifecycle.srt"));
759        assert!(
760            task.estimated_duration().is_some(),
761            "Should estimate duration for existing file"
762        );
763
764        // Test execution
765        let result = task.execute().await;
766        assert!(matches!(result, TaskResult::Success(_)));
767    }
768
769    /// Test task result serialization and display
770    #[test]
771    fn test_task_result_display() {
772        let success = TaskResult::Success("Operation completed".to_string());
773        let failed = TaskResult::Failed("Operation failed".to_string());
774        let cancelled = TaskResult::Cancelled;
775        let partial =
776            TaskResult::PartialSuccess("Mostly worked".to_string(), "Minor issue".to_string());
777
778        assert_eq!(format!("{}", success), "✓ Operation completed");
779        assert_eq!(format!("{}", failed), "✗ Operation failed");
780        assert_eq!(format!("{}", cancelled), "⚠ Task cancelled");
781        assert_eq!(
782            format!("{}", partial),
783            "⚠ Mostly worked (warning: Minor issue)"
784        );
785    }
786
/// Checks the `Display` text produced for each `TaskStatus` variant.
#[test]
fn test_task_status_display() {
    // Each status is paired with the exact string its `Display` impl must yield.
    let cases = [
        (TaskStatus::Pending, "Pending"),
        (TaskStatus::Running, "Running"),
        (
            TaskStatus::Completed(TaskResult::Success("Done".to_string())),
            "Completed: ✓ Done",
        ),
        (
            TaskStatus::Failed("Error occurred".to_string()),
            "Failed: Error occurred",
        ),
        (TaskStatus::Cancelled, "Cancelled"),
    ];

    for (status, rendered) in cases {
        assert_eq!(status.to_string(), rendered);
    }
}
802
803    /// Test format conversion task
804    #[tokio::test]
805    async fn test_format_conversion_task() {
806        let tmp = TempDir::new().unwrap();
807        let input_file = tmp.path().join("input.srt");
808        let output_file = tmp.path().join("output.ass");
809
810        // Create valid SRT content
811        let srt_content = r#"1
81200:00:01,000 --> 00:00:03,000
813First subtitle
814
8152
81600:00:04,000 --> 00:00:06,000
817Second subtitle
818"#;
819
820        tokio::fs::write(&input_file, srt_content).await.unwrap();
821
822        let task = FileProcessingTask {
823            input_path: input_file.clone(),
824            output_path: Some(output_file.clone()),
825            operation: ProcessingOperation::ConvertFormat {
826                from: "srt".to_string(),
827                to: "ass".to_string(),
828            },
829        };
830
831        let result = task.execute().await;
832        assert!(matches!(result, TaskResult::Success(_)));
833
834        // Note: The convert_format method is a stub that returns the input path
835        // In a real implementation, this would create an actual output file
836        assert!(tokio::fs::metadata(&input_file).await.is_ok());
837    }
838
839    /// Test file matching task
840    #[tokio::test]
841    async fn test_file_matching_task() {
842        let tmp = TempDir::new().unwrap();
843        let video_file = tmp.path().join("movie.mkv");
844        let subtitle_file = tmp.path().join("movie.srt");
845
846        // Create test files
847        tokio::fs::write(&video_file, b"fake video content")
848            .await
849            .unwrap();
850        tokio::fs::write(&subtitle_file, "1\n00:00:01,000 --> 00:00:02,000\nTest\n")
851            .await
852            .unwrap();
853
854        let task = FileProcessingTask {
855            input_path: tmp.path().to_path_buf(),
856            output_path: None,
857            operation: ProcessingOperation::MatchFiles { recursive: false },
858        };
859
860        let result = task.execute().await;
861        assert!(matches!(result, TaskResult::Success(_)));
862    }
863
864    /// Test sync subtitle task (expected to fail)
865    #[tokio::test]
866    async fn test_sync_subtitle_task() {
867        let tmp = TempDir::new().unwrap();
868        let audio_file = tmp.path().join("audio.wav");
869        let subtitle_file = tmp.path().join("subtitle.srt");
870
871        tokio::fs::write(&audio_file, b"fake audio content")
872            .await
873            .unwrap();
874        tokio::fs::write(&subtitle_file, "1\n00:00:01,000 --> 00:00:02,000\nTest\n")
875            .await
876            .unwrap();
877
878        let task = FileProcessingTask {
879            input_path: subtitle_file.clone(),
880            output_path: None,
881            operation: ProcessingOperation::SyncSubtitle {
882                audio_path: audio_file,
883            },
884        };
885
886        let result = task.execute().await;
887        // Sync is not implemented, so should fail
888        assert!(matches!(result, TaskResult::Failed(_)));
889    }
890
891    /// Test task error handling
892    #[tokio::test]
893    async fn test_task_error_handling() {
894        // Test with sync operation which always fails in stub implementation
895        let tmp = TempDir::new().unwrap();
896        let test_file = tmp.path().join("test.srt");
897
898        let task = FileProcessingTask {
899            input_path: test_file,
900            output_path: None,
901            operation: ProcessingOperation::SyncSubtitle {
902                audio_path: tmp.path().join("audio.wav"),
903            },
904        };
905
906        let result = task.execute().await;
907        assert!(matches!(result, TaskResult::Failed(_)));
908    }
909
910    /// Test task timeout handling
911    #[tokio::test]
912    async fn test_task_timeout() {
913        use async_trait::async_trait;
914
915        struct SlowTask {
916            duration: Duration,
917        }
918
919        #[async_trait]
920        impl Task for SlowTask {
921            async fn execute(&self) -> TaskResult {
922                tokio::time::sleep(self.duration).await;
923                TaskResult::Success("Slow task completed".to_string())
924            }
925            fn task_type(&self) -> &'static str {
926                "slow"
927            }
928            fn task_id(&self) -> String {
929                "slow_task_1".to_string()
930            }
931            fn estimated_duration(&self) -> Option<Duration> {
932                Some(self.duration)
933            }
934        }
935
936        let slow_task = SlowTask {
937            duration: Duration::from_millis(100),
938        };
939
940        // Test estimated duration
941        assert_eq!(
942            slow_task.estimated_duration(),
943            Some(Duration::from_millis(100))
944        );
945
946        // Test execution
947        let start = std::time::Instant::now();
948        let result = slow_task.execute().await;
949        let elapsed = start.elapsed();
950
951        assert!(matches!(result, TaskResult::Success(_)));
952        assert!(elapsed >= Duration::from_millis(90)); // Allow some variance
953    }
954
/// Constructs every `ProcessingOperation` variant and checks that its `Debug`
/// output names the variant, and that a clone does the same.
#[test]
fn test_processing_operation_variants() {
    // True when the `Debug` rendering of `op` mentions `variant`.
    let debug_mentions =
        |op: &ProcessingOperation, variant: &str| format!("{:?}", op).contains(variant);

    let convert_op = ProcessingOperation::ConvertFormat {
        from: "srt".to_string(),
        to: "ass".to_string(),
    };
    let sync_op = ProcessingOperation::SyncSubtitle {
        audio_path: std::path::PathBuf::from("audio.wav"),
    };
    let match_op = ProcessingOperation::MatchFiles { recursive: true };
    let validate_op = ProcessingOperation::ValidateFormat;

    // Debug formatting
    assert!(debug_mentions(&convert_op, "ConvertFormat"));
    assert!(debug_mentions(&sync_op, "SyncSubtitle"));
    assert!(debug_mentions(&match_op, "MatchFiles"));
    assert!(debug_mentions(&validate_op, "ValidateFormat"));

    // Cloning
    let convert_copy = convert_op.clone();
    assert!(debug_mentions(&convert_copy, "ConvertFormat"));
}
980
981    /// Test custom task implementation
982    #[tokio::test]
983    async fn test_custom_task_implementation() {
984        use async_trait::async_trait;
985
986        struct CustomTask {
987            id: String,
988            should_succeed: bool,
989        }
990
991        #[async_trait]
992        impl Task for CustomTask {
993            async fn execute(&self) -> TaskResult {
994                if self.should_succeed {
995                    TaskResult::Success(format!("Custom task {} succeeded", self.id))
996                } else {
997                    TaskResult::Failed(format!("Custom task {} failed", self.id))
998                }
999            }
1000
1001            fn task_type(&self) -> &'static str {
1002                "custom"
1003            }
1004
1005            fn task_id(&self) -> String {
1006                self.id.clone()
1007            }
1008
1009            fn description(&self) -> String {
1010                format!("Custom task with ID: {}", self.id)
1011            }
1012
1013            fn estimated_duration(&self) -> Option<Duration> {
1014                Some(Duration::from_millis(1))
1015            }
1016        }
1017
1018        // Test successful custom task
1019        let success_task = CustomTask {
1020            id: "success_1".to_string(),
1021            should_succeed: true,
1022        };
1023
1024        assert_eq!(success_task.task_type(), "custom");
1025        assert_eq!(success_task.task_id(), "success_1");
1026        assert_eq!(success_task.description(), "Custom task with ID: success_1");
1027        assert_eq!(
1028            success_task.estimated_duration(),
1029            Some(Duration::from_millis(1))
1030        );
1031
1032        let result = success_task.execute().await;
1033        assert!(matches!(result, TaskResult::Success(_)));
1034
1035        // Test failing custom task
1036        let fail_task = CustomTask {
1037            id: "fail_1".to_string(),
1038            should_succeed: false,
1039        };
1040
1041        let result = fail_task.execute().await;
1042        assert!(matches!(result, TaskResult::Failed(_)));
1043    }
1044
1045    #[tokio::test]
1046    async fn test_resolve_filename_conflict_sequential_suffixes() {
1047        let tmp = TempDir::new().unwrap();
1048        let base = tmp.path().join("x.txt");
1049        tokio::fs::write(&base, b"first").await.unwrap();
1050
1051        let (p1, f1) = resolve_filename_conflict(base.clone()).unwrap();
1052        assert_eq!(p1.file_name().unwrap(), "x.1.txt");
1053        drop(f1);
1054
1055        let (p2, _f2) = resolve_filename_conflict(base.clone()).unwrap();
1056        assert_eq!(p2.file_name().unwrap(), "x.2.txt");
1057    }
1058
1059    #[tokio::test]
1060    async fn test_execute_copy_operation_atomic() {
1061        let tmp = TempDir::new().unwrap();
1062        let src = tmp.path().join("src.txt");
1063        let dst = tmp.path().join("dst.txt");
1064        tokio::fs::write(&src, b"payload").await.unwrap();
1065
1066        let task = FileProcessingTask {
1067            input_path: src.clone(),
1068            output_path: None,
1069            operation: ProcessingOperation::ValidateFormat,
1070        };
1071        task.execute_copy_operation(&src, &dst).await.unwrap();
1072        assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"payload");
1073    }
1074
1075    #[tokio::test]
1076    async fn test_execute_move_operation_deletes_source() {
1077        let tmp = TempDir::new().unwrap();
1078        let src = tmp.path().join("from.txt");
1079        let dst = tmp.path().join("to.txt");
1080        tokio::fs::write(&src, b"moved").await.unwrap();
1081
1082        let task = FileProcessingTask {
1083            input_path: src.clone(),
1084            output_path: None,
1085            operation: ProcessingOperation::ValidateFormat,
1086        };
1087        task.execute_move_operation(&src, &dst).await.unwrap();
1088        assert!(tokio::fs::metadata(&src).await.is_err());
1089        assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"moved");
1090    }
1091}