Skip to main content

oximedia_transcode/
transcode_job.rs

1//! Job management and queuing for transcode operations.
2
3use crate::{Result, TranscodeConfig, TranscodeError, TranscodeOutput};
4use serde::{Deserialize, Serialize};
5use std::time::{Duration, SystemTime};
6
7/// Status of a transcode job.
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
9pub enum TranscodeStatus {
10    /// Job is queued and waiting to start.
11    Queued,
12    /// Job is currently running.
13    Running,
14    /// Job completed successfully.
15    Completed,
16    /// Job failed with an error.
17    Failed,
18    /// Job was cancelled.
19    Cancelled,
20    /// Job is paused.
21    Paused,
22}
23
24/// Priority levels for transcode jobs.
25#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
26pub enum JobPriority {
27    /// Low priority (background processing).
28    Low = 0,
29    /// Normal priority (default).
30    #[default]
31    Normal = 1,
32    /// High priority (time-sensitive).
33    High = 2,
34    /// Critical priority (highest).
35    Critical = 3,
36}
37
38/// Configuration for a transcode job.
39#[derive(Debug, Clone)]
40pub struct TranscodeJobConfig {
41    /// The transcode configuration.
42    pub config: TranscodeConfig,
43    /// Job priority.
44    pub priority: JobPriority,
45    /// Maximum number of retry attempts.
46    pub max_retries: u32,
47    /// Timeout for the job (None = no timeout).
48    pub timeout: Option<Duration>,
49    /// Job metadata (tags, labels, etc.).
50    pub metadata: std::collections::HashMap<String, String>,
51}
52
53impl TranscodeJobConfig {
54    /// Creates a new job configuration.
55    #[must_use]
56    pub fn new(config: TranscodeConfig) -> Self {
57        Self {
58            config,
59            priority: JobPriority::Normal,
60            max_retries: 3,
61            timeout: None,
62            metadata: std::collections::HashMap::new(),
63        }
64    }
65
66    /// Sets the job priority.
67    #[must_use]
68    pub fn with_priority(mut self, priority: JobPriority) -> Self {
69        self.priority = priority;
70        self
71    }
72
73    /// Sets the maximum number of retries.
74    #[must_use]
75    pub fn with_max_retries(mut self, retries: u32) -> Self {
76        self.max_retries = retries;
77        self
78    }
79
80    /// Sets the job timeout.
81    #[must_use]
82    pub fn with_timeout(mut self, timeout: Duration) -> Self {
83        self.timeout = Some(timeout);
84        self
85    }
86
87    /// Adds metadata to the job.
88    #[must_use]
89    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
90        self.metadata.insert(key.into(), value.into());
91        self
92    }
93}
94
95/// A transcode job with state tracking.
96#[derive(Debug, Clone)]
97pub struct TranscodeJob {
98    /// Unique job ID.
99    pub id: String,
100    /// Job configuration.
101    pub config: TranscodeJobConfig,
102    /// Current status.
103    pub status: TranscodeStatus,
104    /// Number of retry attempts made.
105    pub retry_count: u32,
106    /// Time when the job was created.
107    pub created_at: SystemTime,
108    /// Time when the job started.
109    pub started_at: Option<SystemTime>,
110    /// Time when the job completed or failed.
111    pub completed_at: Option<SystemTime>,
112    /// Error message if the job failed.
113    pub error: Option<String>,
114    /// Output if the job completed successfully.
115    pub output: Option<TranscodeOutput>,
116    /// Progress percentage (0-100).
117    pub progress: f64,
118}
119
120impl TranscodeJob {
121    /// Creates a new transcode job.
122    #[must_use]
123    pub fn new(config: TranscodeJobConfig) -> Self {
124        Self {
125            id: Self::generate_id(),
126            config,
127            status: TranscodeStatus::Queued,
128            retry_count: 0,
129            created_at: SystemTime::now(),
130            started_at: None,
131            completed_at: None,
132            error: None,
133            output: None,
134            progress: 0.0,
135        }
136    }
137
138    /// Generates a unique job ID.
139    fn generate_id() -> String {
140        use std::time::UNIX_EPOCH;
141
142        let timestamp = SystemTime::now()
143            .duration_since(UNIX_EPOCH)
144            .unwrap_or_default()
145            .as_micros();
146
147        format!("job_{timestamp}")
148    }
149
150    /// Marks the job as started.
151    pub fn start(&mut self) {
152        self.status = TranscodeStatus::Running;
153        self.started_at = Some(SystemTime::now());
154    }
155
156    /// Marks the job as completed with output.
157    pub fn complete(&mut self, output: TranscodeOutput) {
158        self.status = TranscodeStatus::Completed;
159        self.completed_at = Some(SystemTime::now());
160        self.output = Some(output);
161        self.progress = 100.0;
162    }
163
164    /// Marks the job as failed with an error message.
165    pub fn fail(&mut self, error: impl Into<String>) {
166        self.status = TranscodeStatus::Failed;
167        self.completed_at = Some(SystemTime::now());
168        self.error = Some(error.into());
169    }
170
171    /// Marks the job as cancelled.
172    pub fn cancel(&mut self) {
173        self.status = TranscodeStatus::Cancelled;
174        self.completed_at = Some(SystemTime::now());
175    }
176
177    /// Pauses the job.
178    pub fn pause(&mut self) {
179        if self.status == TranscodeStatus::Running {
180            self.status = TranscodeStatus::Paused;
181        }
182    }
183
184    /// Resumes the job.
185    pub fn resume(&mut self) {
186        if self.status == TranscodeStatus::Paused {
187            self.status = TranscodeStatus::Running;
188        }
189    }
190
191    /// Updates the job progress.
192    pub fn update_progress(&mut self, progress: f64) {
193        self.progress = progress.clamp(0.0, 100.0);
194    }
195
196    /// Increments the retry count.
197    pub fn increment_retry(&mut self) {
198        self.retry_count += 1;
199    }
200
201    /// Checks if the job can be retried.
202    #[must_use]
203    pub fn can_retry(&self) -> bool {
204        self.status == TranscodeStatus::Failed && self.retry_count < self.config.max_retries
205    }
206
207    /// Gets the elapsed time since the job started.
208    #[must_use]
209    pub fn elapsed_time(&self) -> Option<Duration> {
210        self.started_at
211            .and_then(|start| SystemTime::now().duration_since(start).ok())
212    }
213
214    /// Gets the total time from creation to completion.
215    #[must_use]
216    pub fn total_time(&self) -> Option<Duration> {
217        self.completed_at
218            .and_then(|end| end.duration_since(self.created_at).ok())
219    }
220
221    /// Checks if the job has timed out.
222    #[must_use]
223    pub fn is_timed_out(&self) -> bool {
224        if let Some(timeout) = self.config.timeout {
225            if let Some(elapsed) = self.elapsed_time() {
226                return elapsed > timeout;
227            }
228        }
229        false
230    }
231
232    /// Gets a human-readable status description.
233    #[must_use]
234    pub fn status_string(&self) -> String {
235        match self.status {
236            TranscodeStatus::Queued => "Queued".to_string(),
237            TranscodeStatus::Running => format!("Running ({:.1}%)", self.progress),
238            TranscodeStatus::Completed => "Completed".to_string(),
239            TranscodeStatus::Failed => {
240                if let Some(ref error) = self.error {
241                    format!("Failed: {error}")
242                } else {
243                    "Failed".to_string()
244                }
245            }
246            TranscodeStatus::Cancelled => "Cancelled".to_string(),
247            TranscodeStatus::Paused => format!("Paused ({:.1}%)", self.progress),
248        }
249    }
250}
251
252/// Job queue for managing multiple transcode jobs.
253pub struct JobQueue {
254    jobs: Vec<TranscodeJob>,
255    max_concurrent: usize,
256}
257
258impl JobQueue {
259    /// Creates a new job queue.
260    #[must_use]
261    pub fn new(max_concurrent: usize) -> Self {
262        Self {
263            jobs: Vec::new(),
264            max_concurrent,
265        }
266    }
267
268    /// Adds a job to the queue.
269    pub fn enqueue(&mut self, job: TranscodeJob) {
270        self.jobs.push(job);
271        self.sort_by_priority();
272    }
273
274    /// Gets the next job to execute.
275    #[must_use]
276    pub fn dequeue(&mut self) -> Option<TranscodeJob> {
277        let running_count = self
278            .jobs
279            .iter()
280            .filter(|j| j.status == TranscodeStatus::Running)
281            .count();
282
283        if running_count >= self.max_concurrent {
284            return None;
285        }
286
287        // Find first queued job
288        if let Some(index) = self
289            .jobs
290            .iter()
291            .position(|j| j.status == TranscodeStatus::Queued)
292        {
293            let mut job = self.jobs.remove(index);
294            job.start();
295            self.jobs.push(job.clone());
296            Some(job)
297        } else {
298            None
299        }
300    }
301
302    /// Gets the number of jobs in the queue.
303    #[must_use]
304    pub fn len(&self) -> usize {
305        self.jobs.len()
306    }
307
308    /// Checks if the queue is empty.
309    #[must_use]
310    pub fn is_empty(&self) -> bool {
311        self.jobs.is_empty()
312    }
313
314    /// Gets the number of running jobs.
315    #[must_use]
316    pub fn running_count(&self) -> usize {
317        self.jobs
318            .iter()
319            .filter(|j| j.status == TranscodeStatus::Running)
320            .count()
321    }
322
323    /// Gets the number of queued jobs.
324    #[must_use]
325    pub fn queued_count(&self) -> usize {
326        self.jobs
327            .iter()
328            .filter(|j| j.status == TranscodeStatus::Queued)
329            .count()
330    }
331
332    /// Cancels a job by ID.
333    #[allow(dead_code)]
334    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
335        if let Some(job) = self.jobs.iter_mut().find(|j| j.id == job_id) {
336            job.cancel();
337            Ok(())
338        } else {
339            Err(TranscodeError::JobError(format!("Job not found: {job_id}")))
340        }
341    }
342
343    /// Gets a job by ID.
344    #[must_use]
345    pub fn get_job(&self, job_id: &str) -> Option<&TranscodeJob> {
346        #[allow(dead_code)]
347        self.jobs.iter().find(|j| j.id == job_id)
348    }
349
350    /// Clears completed and failed jobs.
351    pub fn clear_finished(&mut self) {
352        self.jobs.retain(|j| {
353            !matches!(
354                j.status,
355                TranscodeStatus::Completed | TranscodeStatus::Failed | TranscodeStatus::Cancelled
356            )
357        });
358    }
359
360    fn sort_by_priority(&mut self) {
361        self.jobs.sort_by(|a, b| {
362            b.config
363                .priority
364                .cmp(&a.config.priority)
365                .then_with(|| a.created_at.cmp(&b.created_at))
366        });
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Job configuration wrapping the default transcode settings.
    fn base_config() -> TranscodeJobConfig {
        TranscodeJobConfig::new(TranscodeConfig::default())
    }

    /// Freshly created job built from [`base_config`].
    fn fresh_job() -> TranscodeJob {
        TranscodeJob::new(base_config())
    }

    /// Freshly created job with the given priority.
    fn job_with_priority(priority: JobPriority) -> TranscodeJob {
        TranscodeJob::new(base_config().with_priority(priority))
    }

    /// Fixed transcode output shared by the completion tests.
    fn sample_output() -> TranscodeOutput {
        TranscodeOutput {
            output_path: "test.mp4".to_string(),
            file_size: 1000,
            duration: 60.0,
            video_bitrate: 5_000_000,
            audio_bitrate: 128_000,
            encoding_time: 30.0,
            speed_factor: 2.0,
        }
    }

    #[test]
    fn test_job_creation() {
        let job = fresh_job();

        assert_eq!(job.status, TranscodeStatus::Queued);
        assert_eq!(job.retry_count, 0);
        assert_eq!(job.progress, 0.0);
        assert!(job.started_at.is_none());
        assert!(job.completed_at.is_none());
    }

    #[test]
    fn test_job_lifecycle() {
        let mut job = fresh_job();

        // Starting moves the job to `Running` and stamps the start time.
        job.start();
        assert_eq!(job.status, TranscodeStatus::Running);
        assert!(job.started_at.is_some());

        // Progress updates are stored as given when within range.
        job.update_progress(50.0);
        assert_eq!(job.progress, 50.0);

        // Completing stores the output and forces progress to 100.
        job.complete(sample_output());

        assert_eq!(job.status, TranscodeStatus::Completed);
        assert_eq!(job.progress, 100.0);
        assert!(job.completed_at.is_some());
        assert!(job.output.is_some());
    }

    #[test]
    fn test_job_failure() {
        let mut job = fresh_job();

        job.start();
        job.fail("Test error");

        assert_eq!(job.status, TranscodeStatus::Failed);
        assert_eq!(job.error, Some("Test error".to_string()));
        assert!(job.completed_at.is_some());
    }

    #[test]
    fn test_job_retry() {
        let mut job = TranscodeJob::new(base_config().with_max_retries(3));

        job.fail("Error");
        assert!(job.can_retry());

        job.increment_retry();
        assert_eq!(job.retry_count, 1);
        assert!(job.can_retry());

        // Two more attempts exhaust the budget of three.
        for _ in 0..2 {
            job.increment_retry();
        }
        assert_eq!(job.retry_count, 3);
        assert!(!job.can_retry());
    }

    #[test]
    fn test_job_pause_resume() {
        let mut job = fresh_job();

        job.start();
        assert_eq!(job.status, TranscodeStatus::Running);

        job.pause();
        assert_eq!(job.status, TranscodeStatus::Paused);

        job.resume();
        assert_eq!(job.status, TranscodeStatus::Running);
    }

    #[test]
    fn test_job_queue() {
        let mut queue = JobQueue::new(2);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        let first = base_config();
        let second = base_config();

        queue.enqueue(TranscodeJob::new(first));
        queue.enqueue(TranscodeJob::new(second));

        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());
        assert_eq!(queue.queued_count(), 2);
        assert_eq!(queue.running_count(), 0);
    }

    #[test]
    fn test_job_queue_priority() {
        let mut queue = JobQueue::new(5);

        // Enqueue out of priority order on purpose.
        queue.enqueue(job_with_priority(JobPriority::Low));
        queue.enqueue(job_with_priority(JobPriority::High));
        queue.enqueue(job_with_priority(JobPriority::Normal));

        // The high-priority job must be handed out first.
        let next = queue.dequeue().expect("should succeed in test");
        assert_eq!(next.config.priority, JobPriority::High);
    }

    #[test]
    fn test_job_queue_clear_finished() {
        let mut queue = JobQueue::new(5);

        let mut job = fresh_job();
        job.complete(sample_output());

        queue.enqueue(job);
        assert_eq!(queue.len(), 1);

        queue.clear_finished();
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_job_config_builder() {
        let config = base_config()
            .with_priority(JobPriority::High)
            .with_max_retries(5)
            .with_timeout(Duration::from_secs(3600))
            .with_metadata("user", "test_user")
            .with_metadata("project", "test_project");

        assert_eq!(config.priority, JobPriority::High);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.timeout, Some(Duration::from_secs(3600)));
        assert_eq!(config.metadata.get("user"), Some(&"test_user".to_string()));
        assert_eq!(
            config.metadata.get("project"),
            Some(&"test_project".to_string())
        );
    }
}