1use crate::{Result, TranscodeConfig, TranscodeError, TranscodeOutput};
4use serde::{Deserialize, Serialize};
5use std::time::{Duration, SystemTime};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
9pub enum TranscodeStatus {
10 Queued,
12 Running,
14 Completed,
16 Failed,
18 Cancelled,
20 Paused,
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
26pub enum JobPriority {
27 Low = 0,
29 #[default]
31 Normal = 1,
32 High = 2,
34 Critical = 3,
36}
37
38#[derive(Debug, Clone)]
40pub struct TranscodeJobConfig {
41 pub config: TranscodeConfig,
43 pub priority: JobPriority,
45 pub max_retries: u32,
47 pub timeout: Option<Duration>,
49 pub metadata: std::collections::HashMap<String, String>,
51}
52
53impl TranscodeJobConfig {
54 #[must_use]
56 pub fn new(config: TranscodeConfig) -> Self {
57 Self {
58 config,
59 priority: JobPriority::Normal,
60 max_retries: 3,
61 timeout: None,
62 metadata: std::collections::HashMap::new(),
63 }
64 }
65
66 #[must_use]
68 pub fn with_priority(mut self, priority: JobPriority) -> Self {
69 self.priority = priority;
70 self
71 }
72
73 #[must_use]
75 pub fn with_max_retries(mut self, retries: u32) -> Self {
76 self.max_retries = retries;
77 self
78 }
79
80 #[must_use]
82 pub fn with_timeout(mut self, timeout: Duration) -> Self {
83 self.timeout = Some(timeout);
84 self
85 }
86
87 #[must_use]
89 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
90 self.metadata.insert(key.into(), value.into());
91 self
92 }
93}
94
95#[derive(Debug, Clone)]
97pub struct TranscodeJob {
98 pub id: String,
100 pub config: TranscodeJobConfig,
102 pub status: TranscodeStatus,
104 pub retry_count: u32,
106 pub created_at: SystemTime,
108 pub started_at: Option<SystemTime>,
110 pub completed_at: Option<SystemTime>,
112 pub error: Option<String>,
114 pub output: Option<TranscodeOutput>,
116 pub progress: f64,
118}
119
120impl TranscodeJob {
121 #[must_use]
123 pub fn new(config: TranscodeJobConfig) -> Self {
124 Self {
125 id: Self::generate_id(),
126 config,
127 status: TranscodeStatus::Queued,
128 retry_count: 0,
129 created_at: SystemTime::now(),
130 started_at: None,
131 completed_at: None,
132 error: None,
133 output: None,
134 progress: 0.0,
135 }
136 }
137
138 fn generate_id() -> String {
140 use std::time::UNIX_EPOCH;
141
142 let timestamp = SystemTime::now()
143 .duration_since(UNIX_EPOCH)
144 .unwrap_or_default()
145 .as_micros();
146
147 format!("job_{timestamp}")
148 }
149
150 pub fn start(&mut self) {
152 self.status = TranscodeStatus::Running;
153 self.started_at = Some(SystemTime::now());
154 }
155
156 pub fn complete(&mut self, output: TranscodeOutput) {
158 self.status = TranscodeStatus::Completed;
159 self.completed_at = Some(SystemTime::now());
160 self.output = Some(output);
161 self.progress = 100.0;
162 }
163
164 pub fn fail(&mut self, error: impl Into<String>) {
166 self.status = TranscodeStatus::Failed;
167 self.completed_at = Some(SystemTime::now());
168 self.error = Some(error.into());
169 }
170
171 pub fn cancel(&mut self) {
173 self.status = TranscodeStatus::Cancelled;
174 self.completed_at = Some(SystemTime::now());
175 }
176
177 pub fn pause(&mut self) {
179 if self.status == TranscodeStatus::Running {
180 self.status = TranscodeStatus::Paused;
181 }
182 }
183
184 pub fn resume(&mut self) {
186 if self.status == TranscodeStatus::Paused {
187 self.status = TranscodeStatus::Running;
188 }
189 }
190
191 pub fn update_progress(&mut self, progress: f64) {
193 self.progress = progress.clamp(0.0, 100.0);
194 }
195
196 pub fn increment_retry(&mut self) {
198 self.retry_count += 1;
199 }
200
201 #[must_use]
203 pub fn can_retry(&self) -> bool {
204 self.status == TranscodeStatus::Failed && self.retry_count < self.config.max_retries
205 }
206
207 #[must_use]
209 pub fn elapsed_time(&self) -> Option<Duration> {
210 self.started_at
211 .and_then(|start| SystemTime::now().duration_since(start).ok())
212 }
213
214 #[must_use]
216 pub fn total_time(&self) -> Option<Duration> {
217 self.completed_at
218 .and_then(|end| end.duration_since(self.created_at).ok())
219 }
220
221 #[must_use]
223 pub fn is_timed_out(&self) -> bool {
224 if let Some(timeout) = self.config.timeout {
225 if let Some(elapsed) = self.elapsed_time() {
226 return elapsed > timeout;
227 }
228 }
229 false
230 }
231
232 #[must_use]
234 pub fn status_string(&self) -> String {
235 match self.status {
236 TranscodeStatus::Queued => "Queued".to_string(),
237 TranscodeStatus::Running => format!("Running ({:.1}%)", self.progress),
238 TranscodeStatus::Completed => "Completed".to_string(),
239 TranscodeStatus::Failed => {
240 if let Some(ref error) = self.error {
241 format!("Failed: {error}")
242 } else {
243 "Failed".to_string()
244 }
245 }
246 TranscodeStatus::Cancelled => "Cancelled".to_string(),
247 TranscodeStatus::Paused => format!("Paused ({:.1}%)", self.progress),
248 }
249 }
250}
251
252pub struct JobQueue {
254 jobs: Vec<TranscodeJob>,
255 max_concurrent: usize,
256}
257
258impl JobQueue {
259 #[must_use]
261 pub fn new(max_concurrent: usize) -> Self {
262 Self {
263 jobs: Vec::new(),
264 max_concurrent,
265 }
266 }
267
268 pub fn enqueue(&mut self, job: TranscodeJob) {
270 self.jobs.push(job);
271 self.sort_by_priority();
272 }
273
274 #[must_use]
276 pub fn dequeue(&mut self) -> Option<TranscodeJob> {
277 let running_count = self
278 .jobs
279 .iter()
280 .filter(|j| j.status == TranscodeStatus::Running)
281 .count();
282
283 if running_count >= self.max_concurrent {
284 return None;
285 }
286
287 if let Some(index) = self
289 .jobs
290 .iter()
291 .position(|j| j.status == TranscodeStatus::Queued)
292 {
293 let mut job = self.jobs.remove(index);
294 job.start();
295 self.jobs.push(job.clone());
296 Some(job)
297 } else {
298 None
299 }
300 }
301
302 #[must_use]
304 pub fn len(&self) -> usize {
305 self.jobs.len()
306 }
307
308 #[must_use]
310 pub fn is_empty(&self) -> bool {
311 self.jobs.is_empty()
312 }
313
314 #[must_use]
316 pub fn running_count(&self) -> usize {
317 self.jobs
318 .iter()
319 .filter(|j| j.status == TranscodeStatus::Running)
320 .count()
321 }
322
323 #[must_use]
325 pub fn queued_count(&self) -> usize {
326 self.jobs
327 .iter()
328 .filter(|j| j.status == TranscodeStatus::Queued)
329 .count()
330 }
331
332 #[allow(dead_code)]
334 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
335 if let Some(job) = self.jobs.iter_mut().find(|j| j.id == job_id) {
336 job.cancel();
337 Ok(())
338 } else {
339 Err(TranscodeError::JobError(format!("Job not found: {job_id}")))
340 }
341 }
342
343 #[must_use]
345 pub fn get_job(&self, job_id: &str) -> Option<&TranscodeJob> {
346 #[allow(dead_code)]
347 self.jobs.iter().find(|j| j.id == job_id)
348 }
349
350 pub fn clear_finished(&mut self) {
352 self.jobs.retain(|j| {
353 !matches!(
354 j.status,
355 TranscodeStatus::Completed | TranscodeStatus::Failed | TranscodeStatus::Cancelled
356 )
357 });
358 }
359
360 fn sort_by_priority(&mut self) {
361 self.jobs.sort_by(|a, b| {
362 b.config
363 .priority
364 .cmp(&a.config.priority)
365 .then_with(|| a.created_at.cmp(&b.created_at))
366 });
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Job configuration wrapping the default transcode settings.
    fn default_config() -> TranscodeJobConfig {
        TranscodeJobConfig::new(TranscodeConfig::default())
    }

    /// Freshly created job using [`default_config`].
    fn default_job() -> TranscodeJob {
        TranscodeJob::new(default_config())
    }

    /// Fixed transcode result used wherever a job has to be completed.
    fn sample_output() -> TranscodeOutput {
        TranscodeOutput {
            output_path: "test.mp4".to_string(),
            file_size: 1000,
            duration: 60.0,
            video_bitrate: 5_000_000,
            audio_bitrate: 128_000,
            encoding_time: 30.0,
            speed_factor: 2.0,
        }
    }

    #[test]
    fn test_job_creation() {
        let job = default_job();

        assert_eq!(job.status, TranscodeStatus::Queued);
        assert_eq!(job.retry_count, 0);
        assert_eq!(job.progress, 0.0);
        assert!(job.started_at.is_none());
        assert!(job.completed_at.is_none());
    }

    #[test]
    fn test_job_lifecycle() {
        let mut job = default_job();

        // Queued -> Running
        job.start();
        assert_eq!(job.status, TranscodeStatus::Running);
        assert!(job.started_at.is_some());

        // Progress is stored as given while inside the valid range.
        job.update_progress(50.0);
        assert_eq!(job.progress, 50.0);

        // Running -> Completed
        job.complete(sample_output());
        assert_eq!(job.status, TranscodeStatus::Completed);
        assert_eq!(job.progress, 100.0);
        assert!(job.completed_at.is_some());
        assert!(job.output.is_some());
    }

    #[test]
    fn test_job_failure() {
        let mut job = default_job();

        job.start();
        job.fail("Test error");

        assert_eq!(job.status, TranscodeStatus::Failed);
        assert_eq!(job.error, Some("Test error".to_string()));
        assert!(job.completed_at.is_some());
    }

    #[test]
    fn test_job_retry() {
        let mut job = TranscodeJob::new(default_config().with_max_retries(3));

        job.fail("Error");
        assert!(job.can_retry());

        job.increment_retry();
        assert_eq!(job.retry_count, 1);
        assert!(job.can_retry());

        // Two more retries exhaust the budget of three.
        job.increment_retry();
        job.increment_retry();
        assert_eq!(job.retry_count, 3);
        assert!(!job.can_retry());
    }

    #[test]
    fn test_job_pause_resume() {
        let mut job = default_job();

        job.start();
        assert_eq!(job.status, TranscodeStatus::Running);

        job.pause();
        assert_eq!(job.status, TranscodeStatus::Paused);

        job.resume();
        assert_eq!(job.status, TranscodeStatus::Running);
    }

    #[test]
    fn test_job_queue() {
        let mut queue = JobQueue::new(2);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        queue.enqueue(default_job());
        queue.enqueue(default_job());

        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());
        assert_eq!(queue.queued_count(), 2);
        assert_eq!(queue.running_count(), 0);
    }

    #[test]
    fn test_job_queue_priority() {
        let mut queue = JobQueue::new(5);

        // Deliberately enqueued out of priority order.
        for priority in [JobPriority::Low, JobPriority::High, JobPriority::Normal] {
            queue.enqueue(TranscodeJob::new(default_config().with_priority(priority)));
        }

        let next = queue.dequeue().expect("should succeed in test");
        assert_eq!(next.config.priority, JobPriority::High);
    }

    #[test]
    fn test_job_queue_clear_finished() {
        let mut queue = JobQueue::new(5);

        let mut job = default_job();
        job.complete(sample_output());

        queue.enqueue(job);
        assert_eq!(queue.len(), 1);

        queue.clear_finished();
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_job_config_builder() {
        let config = default_config()
            .with_priority(JobPriority::High)
            .with_max_retries(5)
            .with_timeout(Duration::from_secs(3600))
            .with_metadata("user", "test_user")
            .with_metadata("project", "test_project");

        assert_eq!(config.priority, JobPriority::High);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.timeout, Some(Duration::from_secs(3600)));
        assert_eq!(
            config.metadata.get("user").map(String::as_str),
            Some("test_user")
        );
        assert_eq!(
            config.metadata.get("project").map(String::as_str),
            Some("test_project")
        );
    }
}