subx_cli/core/parallel/
task.rs

1//! Task definition and utilities for parallel processing
2use async_trait::async_trait;
3use std::fmt;
4
5/// Trait defining a unit of work that can be executed asynchronously.
6///
7/// All tasks in the parallel processing system must implement this trait
8/// to provide execution logic and metadata.
9#[async_trait]
10pub trait Task: Send + Sync {
11    /// Executes the task and returns the result.
12    async fn execute(&self) -> TaskResult;
13    /// Returns the type identifier for this task.
14    fn task_type(&self) -> &'static str;
15    /// Returns a unique identifier for this specific task instance.
16    fn task_id(&self) -> String;
17    /// Returns an estimated duration for the task execution.
18    fn estimated_duration(&self) -> Option<std::time::Duration> {
19        None
20    }
21    /// Returns a human-readable description of the task.
22    fn description(&self) -> String {
23        format!("{} task", self.task_type())
24    }
25}
26
/// Result of task execution indicating success, failure, cancellation,
/// or partial completion.
///
/// Each variant that carries data holds the descriptive message(s) that are
/// shown to the user when the result is displayed.
///
/// `PartialEq`/`Eq` are derived so that outcomes can be compared directly
/// (for example with `assert_eq!`) rather than only pattern-matched.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskResult {
    /// Task completed successfully with a result message
    Success(String),
    /// Task failed with an error message
    Failed(String),
    /// Task was cancelled before completion
    Cancelled,
    /// Task partially completed: the first string is the success message,
    /// the second the accompanying warning/failure message
    PartialSuccess(String, String),
}
42
43/// Current execution status of a task in the system.
44///
45/// Tracks the lifecycle of a task from initial queuing through completion
46/// or failure, providing detailed status information.
47#[derive(Debug, Clone)]
48pub enum TaskStatus {
49    /// Task is queued and waiting for execution
50    Pending,
51    /// Task is currently being executed
52    Running,
53    /// Task completed successfully or with partial success
54    Completed(TaskResult),
55    /// Task failed during execution
56    Failed(String),
57    /// Task was cancelled before or during execution
58    Cancelled,
59}
60
61impl fmt::Display for TaskResult {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self {
64            TaskResult::Success(msg) => write!(f, "✓ {}", msg),
65            TaskResult::Failed(msg) => write!(f, "✗ {}", msg),
66            TaskResult::Cancelled => write!(f, "⚠ Task cancelled"),
67            TaskResult::PartialSuccess(success, warn) => {
68                write!(f, "⚠ {} (warning: {})", success, warn)
69            }
70        }
71    }
72}
73
74impl fmt::Display for TaskStatus {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        match self {
77            TaskStatus::Pending => write!(f, "Pending"),
78            TaskStatus::Running => write!(f, "Running"),
79            TaskStatus::Completed(result) => write!(f, "Completed: {}", result),
80            TaskStatus::Failed(msg) => write!(f, "Failed: {}", msg),
81            TaskStatus::Cancelled => write!(f, "Cancelled"),
82        }
83    }
84}
85
86/// Task for processing files (convert, sync, match, validate).
87///
88/// Represents a file processing operation that can be executed
89/// asynchronously in the parallel processing system.
90pub struct FileProcessingTask {
91    /// Path to the input file to be processed
92    pub input_path: std::path::PathBuf,
93    /// Optional output path for the processed file
94    pub output_path: Option<std::path::PathBuf>,
95    /// The specific operation to perform on the file
96    pub operation: ProcessingOperation,
97}
98
/// Supported operations for file processing tasks.
///
/// Defines the different types of operations that can be performed
/// on subtitle and video files in the processing system.
///
/// `PartialEq`/`Eq` are derived (field-by-field comparison) so operations
/// can be compared directly and, together with the type's `Hash`
/// implementation, used as keys in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessingOperation {
    /// Convert subtitle format from one type to another
    ConvertFormat {
        /// Source format (e.g., "srt", "ass")
        from: String,
        /// Target format (e.g., "srt", "ass")
        to: String,
    },
    /// Synchronize subtitle timing with audio
    SyncSubtitle {
        /// Path to the audio file for synchronization
        audio_path: std::path::PathBuf,
    },
    /// Match subtitle files with video files
    MatchFiles {
        /// Whether to search recursively in subdirectories
        recursive: bool,
    },
    /// Validate subtitle file format and structure
    ValidateFormat,
}
125
126#[async_trait]
127impl Task for FileProcessingTask {
128    async fn execute(&self) -> TaskResult {
129        match &self.operation {
130            ProcessingOperation::ConvertFormat { from, to } => {
131                match self.convert_format(from, to).await {
132                    Ok(path) => TaskResult::Success(format!(
133                        "Successfully converted {} -> {}: {}",
134                        from,
135                        to,
136                        path.display()
137                    )),
138                    Err(e) => TaskResult::Failed(format!(
139                        "Conversion failed {}: {}",
140                        self.input_path.display(),
141                        e
142                    )),
143                }
144            }
145            ProcessingOperation::SyncSubtitle { .. } => {
146                // Sync not supported in parallel tasks
147                TaskResult::Failed("Sync functionality not implemented".to_string())
148            }
149            ProcessingOperation::MatchFiles { recursive } => {
150                match self.match_files(*recursive).await {
151                    Ok(m) => TaskResult::Success(format!(
152                        "File matching completed: found {} matches",
153                        m.len()
154                    )),
155                    Err(e) => TaskResult::Failed(format!("Matching failed: {}", e)),
156                }
157            }
158            ProcessingOperation::ValidateFormat => match self.validate_format().await {
159                Ok(true) => TaskResult::Success(format!(
160                    "Format validation passed: {}",
161                    self.input_path.display()
162                )),
163                Ok(false) => TaskResult::Failed(format!(
164                    "Format validation failed: {}",
165                    self.input_path.display()
166                )),
167                Err(e) => TaskResult::Failed(format!("Validation error: {}", e)),
168            },
169        }
170    }
171
172    fn task_type(&self) -> &'static str {
173        match &self.operation {
174            ProcessingOperation::ConvertFormat { .. } => "convert",
175            ProcessingOperation::SyncSubtitle { .. } => "sync",
176            ProcessingOperation::MatchFiles { .. } => "match",
177            ProcessingOperation::ValidateFormat => "validate",
178        }
179    }
180
181    fn task_id(&self) -> String {
182        use std::collections::hash_map::DefaultHasher;
183        use std::hash::{Hash, Hasher};
184        let mut hasher = DefaultHasher::new();
185        self.input_path.hash(&mut hasher);
186        self.operation.hash(&mut hasher);
187        format!("{}_{:x}", self.task_type(), hasher.finish())
188    }
189
190    fn estimated_duration(&self) -> Option<std::time::Duration> {
191        if let Ok(meta) = std::fs::metadata(&self.input_path) {
192            let size_mb = meta.len() as f64 / 1_048_576.0;
193            let secs = match &self.operation {
194                ProcessingOperation::ConvertFormat { .. } => size_mb * 0.1,
195                ProcessingOperation::SyncSubtitle { .. } => size_mb * 0.5,
196                ProcessingOperation::MatchFiles { .. } => 2.0,
197                ProcessingOperation::ValidateFormat => size_mb * 0.05,
198            };
199            Some(std::time::Duration::from_secs_f64(secs))
200        } else {
201            None
202        }
203    }
204
205    fn description(&self) -> String {
206        match &self.operation {
207            ProcessingOperation::ConvertFormat { from, to } => {
208                format!(
209                    "Convert {} from {} to {}",
210                    self.input_path.display(),
211                    from,
212                    to
213                )
214            }
215            ProcessingOperation::SyncSubtitle { audio_path } => format!(
216                "Sync subtitle {} with audio {}",
217                self.input_path.display(),
218                audio_path.display()
219            ),
220            ProcessingOperation::MatchFiles { recursive } => format!(
221                "Match files in {}{}",
222                self.input_path.display(),
223                if *recursive { " (recursive)" } else { "" }
224            ),
225            ProcessingOperation::ValidateFormat => {
226                format!("Validate format of {}", self.input_path.display())
227            }
228        }
229    }
230}
231
232impl FileProcessingTask {
233    async fn convert_format(&self, _from: &str, _to: &str) -> crate::Result<std::path::PathBuf> {
234        // Stub convert: simply return input path
235        Ok(self.input_path.clone())
236    }
237
238    async fn sync_subtitle(
239        &self,
240        _audio_path: &std::path::Path,
241    ) -> crate::Result<crate::core::sync::SyncResult> {
242        // Stub implementation: sync not available
243        Err(crate::error::SubXError::parallel_processing(
244            "sync_subtitle not implemented".to_string(),
245        ))
246    }
247
248    async fn match_files(&self, _recursive: bool) -> crate::Result<Vec<()>> {
249        // Stub implementation: no actual matching
250        Ok(Vec::new())
251    }
252
253    async fn validate_format(&self) -> crate::Result<bool> {
254        // Stub validate: always succeed
255        Ok(true)
256    }
257}
258
259// impl Hash for ProcessingOperation to support task_id generation
260impl std::hash::Hash for ProcessingOperation {
261    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
262        match self {
263            ProcessingOperation::ConvertFormat { from, to } => {
264                "convert".hash(state);
265                from.hash(state);
266                to.hash(state);
267            }
268            ProcessingOperation::SyncSubtitle { audio_path } => {
269                "sync".hash(state);
270                audio_path.hash(state);
271            }
272            ProcessingOperation::MatchFiles { recursive } => {
273                "match".hash(state);
274                recursive.hash(state);
275            }
276            ProcessingOperation::ValidateFormat => {
277                "validate".hash(state);
278            }
279        }
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Validating an existing subtitle file yields a success result.
    #[tokio::test]
    async fn test_file_processing_task_validate_format() {
        let workdir = TempDir::new().unwrap();
        let subtitle = workdir.path().join("test.srt");
        tokio::fs::write(&subtitle, "1\n00:00:01,000 --> 00:00:02,000\nTest\n")
            .await
            .unwrap();

        let task = FileProcessingTask {
            input_path: subtitle,
            output_path: None,
            operation: ProcessingOperation::ValidateFormat,
        };

        let outcome = task.execute().await;
        assert!(matches!(outcome, TaskResult::Success(_)));
    }

    /// Checks the metadata accessors of a task and then executes it.
    #[tokio::test]
    async fn test_task_lifecycle() {
        let workdir = TempDir::new().unwrap();
        let subtitle = workdir.path().join("lifecycle.srt");
        let body = "1\n00:00:01,000 --> 00:00:02,000\nLifecycle test\n";
        tokio::fs::write(&subtitle, body).await.unwrap();

        let task = FileProcessingTask {
            input_path: subtitle,
            output_path: None,
            operation: ProcessingOperation::ValidateFormat,
        };

        // Metadata exposed before the task runs.
        assert_eq!(task.task_type(), "validate");
        assert!(!task.task_id().is_empty());
        let summary = task.description();
        assert!(summary.contains("Validate format"));
        assert!(summary.contains("lifecycle.srt"));
        assert!(
            task.estimated_duration().is_some(),
            "Should estimate duration for existing file"
        );

        // Running the task.
        let outcome = task.execute().await;
        assert!(matches!(outcome, TaskResult::Success(_)));
    }

    /// Every `TaskResult` variant renders its expected display text.
    #[test]
    fn test_task_result_display() {
        let cases = [
            (
                TaskResult::Success("Operation completed".to_string()),
                "✓ Operation completed",
            ),
            (
                TaskResult::Failed("Operation failed".to_string()),
                "✗ Operation failed",
            ),
            (TaskResult::Cancelled, "⚠ Task cancelled"),
            (
                TaskResult::PartialSuccess("Mostly worked".to_string(), "Minor issue".to_string()),
                "⚠ Mostly worked (warning: Minor issue)",
            ),
        ];

        for (value, expected) in cases.iter() {
            assert_eq!(value.to_string(), *expected);
        }
    }

    /// Every `TaskStatus` variant renders its expected display text.
    #[test]
    fn test_task_status_display() {
        let cases = [
            (TaskStatus::Pending, "Pending"),
            (TaskStatus::Running, "Running"),
            (
                TaskStatus::Completed(TaskResult::Success("Done".to_string())),
                "Completed: ✓ Done",
            ),
            (
                TaskStatus::Failed("Error occurred".to_string()),
                "Failed: Error occurred",
            ),
            (TaskStatus::Cancelled, "Cancelled"),
        ];

        for (status, expected) in cases.iter() {
            assert_eq!(status.to_string(), *expected);
        }
    }

    /// A conversion task over a valid SRT file reports success.
    #[tokio::test]
    async fn test_format_conversion_task() {
        let workdir = TempDir::new().unwrap();
        let source = workdir.path().join("input.srt");
        let target = workdir.path().join("output.ass");

        // Two well-formed SRT cues.
        let srt_content = "1\n00:00:01,000 --> 00:00:03,000\nFirst subtitle\n\n\
                           2\n00:00:04,000 --> 00:00:06,000\nSecond subtitle\n";
        tokio::fs::write(&source, srt_content).await.unwrap();

        let task = FileProcessingTask {
            input_path: source.clone(),
            output_path: Some(target),
            operation: ProcessingOperation::ConvertFormat {
                from: "srt".to_string(),
                to: "ass".to_string(),
            },
        };

        let outcome = task.execute().await;
        assert!(matches!(outcome, TaskResult::Success(_)));

        // `convert_format` is a stub that returns the input path, so no
        // output file is produced; only the input is checked to still exist.
        let input_meta = tokio::fs::metadata(&source).await;
        assert!(input_meta.is_ok());
    }

    /// A matching task over a directory with a video and a subtitle succeeds.
    #[tokio::test]
    async fn test_file_matching_task() {
        let workdir = TempDir::new().unwrap();
        let video = workdir.path().join("movie.mkv");
        let subtitle = workdir.path().join("movie.srt");

        // Populate the directory with one video and one subtitle.
        tokio::fs::write(&video, b"fake video content")
            .await
            .unwrap();
        tokio::fs::write(&subtitle, "1\n00:00:01,000 --> 00:00:02,000\nTest\n")
            .await
            .unwrap();

        let task = FileProcessingTask {
            input_path: workdir.path().to_path_buf(),
            output_path: None,
            operation: ProcessingOperation::MatchFiles { recursive: false },
        };

        let outcome = task.execute().await;
        assert!(matches!(outcome, TaskResult::Success(_)));
    }

    /// A sync task is expected to fail even when both files exist.
    #[tokio::test]
    async fn test_sync_subtitle_task() {
        let workdir = TempDir::new().unwrap();
        let audio = workdir.path().join("audio.wav");
        let subtitle = workdir.path().join("subtitle.srt");

        tokio::fs::write(&audio, b"fake audio content")
            .await
            .unwrap();
        tokio::fs::write(&subtitle, "1\n00:00:01,000 --> 00:00:02,000\nTest\n")
            .await
            .unwrap();

        let task = FileProcessingTask {
            input_path: subtitle,
            output_path: None,
            operation: ProcessingOperation::SyncSubtitle { audio_path: audio },
        };

        // Sync is not implemented, so the task must report a failure.
        let outcome = task.execute().await;
        assert!(matches!(outcome, TaskResult::Failed(_)));
    }

    /// A failing operation surfaces as `TaskResult::Failed`.
    #[tokio::test]
    async fn test_task_error_handling() {
        // The sync operation always fails in the stub implementation; the
        // referenced files are deliberately never created.
        let workdir = TempDir::new().unwrap();
        let missing_subtitle = workdir.path().join("test.srt");
        let missing_audio = workdir.path().join("audio.wav");

        let task = FileProcessingTask {
            input_path: missing_subtitle,
            output_path: None,
            operation: ProcessingOperation::SyncSubtitle {
                audio_path: missing_audio,
            },
        };

        let outcome = task.execute().await;
        assert!(matches!(outcome, TaskResult::Failed(_)));
    }

    /// A slow task reports its estimate and takes about that long to run.
    #[tokio::test]
    async fn test_task_timeout() {
        use async_trait::async_trait;

        struct SlowTask {
            delay: Duration,
        }

        #[async_trait]
        impl Task for SlowTask {
            async fn execute(&self) -> TaskResult {
                tokio::time::sleep(self.delay).await;
                TaskResult::Success("Slow task completed".to_string())
            }
            fn task_type(&self) -> &'static str {
                "slow"
            }
            fn task_id(&self) -> String {
                "slow_task_1".to_string()
            }
            fn estimated_duration(&self) -> Option<Duration> {
                Some(self.delay)
            }
        }

        let expected_delay = Duration::from_millis(100);
        let slow_task = SlowTask {
            delay: expected_delay,
        };

        // The estimate mirrors the configured delay.
        assert_eq!(slow_task.estimated_duration(), Some(expected_delay));

        // Execution takes at least roughly that long.
        let started = std::time::Instant::now();
        let outcome = slow_task.execute().await;
        let elapsed = started.elapsed();

        assert!(matches!(outcome, TaskResult::Success(_)));
        assert!(elapsed >= Duration::from_millis(90)); // Allow some variance
    }

    /// Every operation variant supports `Debug`, and `Clone` preserves it.
    #[test]
    fn test_processing_operation_variants() {
        let convert_op = ProcessingOperation::ConvertFormat {
            from: "srt".to_string(),
            to: "ass".to_string(),
        };
        let sync_op = ProcessingOperation::SyncSubtitle {
            audio_path: std::path::PathBuf::from("audio.wav"),
        };
        let match_op = ProcessingOperation::MatchFiles { recursive: true };
        let validate_op = ProcessingOperation::ValidateFormat;

        // Debug output names the variant.
        let expectations = [
            (&convert_op, "ConvertFormat"),
            (&sync_op, "SyncSubtitle"),
            (&match_op, "MatchFiles"),
            (&validate_op, "ValidateFormat"),
        ];
        for (op, variant_name) in expectations.iter() {
            assert!(format!("{:?}", op).contains(variant_name));
        }

        // A clone keeps the same variant.
        let duplicate = convert_op.clone();
        assert!(format!("{:?}", duplicate).contains("ConvertFormat"));
    }

    /// A user-defined `Task` implementation works for success and failure.
    #[tokio::test]
    async fn test_custom_task_implementation() {
        use async_trait::async_trait;

        struct CustomTask {
            id: String,
            should_succeed: bool,
        }

        #[async_trait]
        impl Task for CustomTask {
            async fn execute(&self) -> TaskResult {
                if !self.should_succeed {
                    return TaskResult::Failed(format!("Custom task {} failed", self.id));
                }
                TaskResult::Success(format!("Custom task {} succeeded", self.id))
            }

            fn task_type(&self) -> &'static str {
                "custom"
            }

            fn task_id(&self) -> String {
                self.id.clone()
            }

            fn description(&self) -> String {
                format!("Custom task with ID: {}", self.id)
            }

            fn estimated_duration(&self) -> Option<Duration> {
                Some(Duration::from_millis(1))
            }
        }

        // Succeeding task: check the metadata, then the result.
        let succeeding = CustomTask {
            id: "success_1".to_string(),
            should_succeed: true,
        };

        assert_eq!(succeeding.task_type(), "custom");
        assert_eq!(succeeding.task_id(), "success_1");
        assert_eq!(succeeding.description(), "Custom task with ID: success_1");
        assert_eq!(
            succeeding.estimated_duration(),
            Some(Duration::from_millis(1))
        );

        let outcome = succeeding.execute().await;
        assert!(matches!(outcome, TaskResult::Success(_)));

        // Failing task.
        let failing = CustomTask {
            id: "fail_1".to_string(),
            should_succeed: false,
        };

        let outcome = failing.execute().await;
        assert!(matches!(outcome, TaskResult::Failed(_)));
    }
}