// oximedia_proxy/transcode_queue.rs

//! Proxy transcode queue management.
//!
//! Provides a priority-based queue for scheduling and tracking proxy transcode
//! jobs, along with batch request support and queue statistics.
//!
//! The `ParallelTranscodeExecutor` uses Rayon's work-stealing thread pool to
//! transcode multiple proxy jobs concurrently, saturating available CPU cores
//! without spawning excessive threads.

#![allow(dead_code)]

use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
/// Specification for a proxy transcode output.
///
/// Serde derives allow specs to be persisted alongside queue state or sent
/// across process boundaries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxySpec {
    /// Target resolution as (width, height).
    pub resolution: (u32, u32),
    /// Codec identifier (e.g. "h264", "prores_proxy").
    pub codec: String,
    /// Target bitrate in kilobits per second.
    pub bitrate_kbps: u32,
}
27
28impl ProxySpec {
29    /// Create a new proxy spec.
30    #[must_use]
31    pub fn new(resolution: (u32, u32), codec: impl Into<String>, bitrate_kbps: u32) -> Self {
32        Self {
33            resolution,
34            codec: codec.into(),
35            bitrate_kbps,
36        }
37    }
38
39    /// Standard H.264 HD proxy.
40    #[must_use]
41    pub fn h264_hd() -> Self {
42        Self::new((1920, 1080), "h264", 8_000)
43    }
44
45    /// Standard ProRes Proxy.
46    #[must_use]
47    pub fn prores_proxy() -> Self {
48        Self::new((1920, 1080), "prores_proxy", 45_000)
49    }
50}
51
/// A proxy transcode request submitted to the queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyRequest {
    /// Unique request identifier; doubles as the job ID after submission.
    pub id: String,
    /// Path to the source media file.
    pub source_path: String,
    /// Desired proxy specification.
    pub proxy_spec: ProxySpec,
    /// Priority (0 = lowest, 255 = highest); higher priorities dispatch first.
    pub priority: u8,
    /// Submission timestamp in milliseconds since epoch; used for FIFO
    /// tie-breaking between requests of equal priority.
    pub submitted_at_ms: u64,
}
66
67impl ProxyRequest {
68    /// Create a new proxy request.
69    #[must_use]
70    pub fn new(
71        id: impl Into<String>,
72        source_path: impl Into<String>,
73        proxy_spec: ProxySpec,
74        priority: u8,
75        submitted_at_ms: u64,
76    ) -> Self {
77        Self {
78            id: id.into(),
79            source_path: source_path.into(),
80            proxy_spec,
81            priority,
82            submitted_at_ms,
83        }
84    }
85}
86
/// Status of a proxy transcode job.
///
/// Lifecycle: `Queued` -> `Running` -> `Completed` | `Failed`; a job can
/// only move to `Cancelled` while it is still `Queued`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JobStatus {
    /// Waiting in the queue.
    Queued,
    /// Currently being transcoded.
    Running,
    /// Successfully completed.
    Completed,
    /// Failed due to an error.
    Failed,
    /// Cancelled before execution (only reachable from `Queued`).
    Cancelled,
}
101
/// A proxy transcode job with its current status and timing information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyTranscodeJob {
    /// The original request.
    pub request: ProxyRequest,
    /// Current job status.
    pub status: JobStatus,
    /// When the job started, in milliseconds since epoch (`None` until started).
    pub started_at_ms: Option<u64>,
    /// When the job completed (or failed), in milliseconds since epoch.
    pub completed_at_ms: Option<u64>,
    /// Path to the output proxy file (set on completion).
    pub output_path: Option<String>,
    /// Error message if the job failed.
    pub error: Option<String>,
}
118
119impl ProxyTranscodeJob {
120    /// Create a new job in the `Queued` state.
121    #[must_use]
122    pub fn new(request: ProxyRequest) -> Self {
123        Self {
124            request,
125            status: JobStatus::Queued,
126            started_at_ms: None,
127            completed_at_ms: None,
128            output_path: None,
129            error: None,
130        }
131    }
132
133    /// Duration the job waited in the queue before starting, in milliseconds.
134    #[must_use]
135    pub fn wait_duration_ms(&self) -> Option<u64> {
136        self.started_at_ms
137            .map(|start| start.saturating_sub(self.request.submitted_at_ms))
138    }
139
140    /// Total processing duration, in milliseconds.
141    #[must_use]
142    pub fn processing_duration_ms(&self) -> Option<u64> {
143        match (self.started_at_ms, self.completed_at_ms) {
144            (Some(start), Some(end)) => Some(end.saturating_sub(start)),
145            _ => None,
146        }
147    }
148}
149
/// Priority-based proxy transcode queue.
///
/// Higher-priority jobs are dispatched first. Within the same priority,
/// earlier submission time wins (FIFO).
#[derive(Debug, Default)]
pub struct ProxyTranscodeQueue {
    /// All jobs indexed by their ID.  Jobs are never removed, only re-stated.
    jobs: std::collections::HashMap<String, ProxyTranscodeJob>,
    /// Dispatch order: job IDs sorted by (priority desc, submitted_at_ms asc).
    /// Every ID here is expected to have a corresponding `jobs` entry.
    order: VecDeque<String>,
}
161
162impl ProxyTranscodeQueue {
163    /// Create an empty transcode queue.
164    #[must_use]
165    pub fn new() -> Self {
166        Self::default()
167    }
168
169    /// Submit a new request and return the job ID.
170    pub fn submit(&mut self, request: ProxyRequest) -> String {
171        let id = request.id.clone();
172        let job = ProxyTranscodeJob::new(request);
173        // Insert into order queue maintaining priority order
174        let priority = job.request.priority;
175        let submitted = job.request.submitted_at_ms;
176        // Find insertion point: higher priority first, then earlier submission
177        let pos = self
178            .order
179            .iter()
180            .position(|existing_id| {
181                if let Some(j) = self.jobs.get(existing_id) {
182                    let ep = j.request.priority;
183                    let es = j.request.submitted_at_ms;
184                    // Insert before this entry if we have higher priority,
185                    // or equal priority and earlier submission
186                    ep < priority || (ep == priority && es > submitted)
187                } else {
188                    false
189                }
190            })
191            .unwrap_or(self.order.len());
192        self.order.insert(pos, id.clone());
193        self.jobs.insert(id.clone(), job);
194        id
195    }
196
197    /// Get the next queued job (highest priority, earliest submitted).
198    #[must_use]
199    pub fn next_job(&mut self) -> Option<&mut ProxyTranscodeJob> {
200        // Find the first job in order that is still Queued
201        let next_id = self
202            .order
203            .iter()
204            .find(|id| {
205                self.jobs
206                    .get(*id)
207                    .map(|j| j.status == JobStatus::Queued)
208                    .unwrap_or(false)
209            })
210            .cloned();
211        next_id.and_then(|id| self.jobs.get_mut(&id))
212    }
213
214    /// Mark a job as started at the given timestamp.
215    pub fn start_job(&mut self, id: &str, started_at_ms: u64) {
216        if let Some(job) = self.jobs.get_mut(id) {
217            if job.status == JobStatus::Queued {
218                job.status = JobStatus::Running;
219                job.started_at_ms = Some(started_at_ms);
220            }
221        }
222    }
223
224    /// Mark a job as successfully completed.
225    pub fn complete_job(&mut self, id: &str, output: &str) {
226        if let Some(job) = self.jobs.get_mut(id) {
227            job.status = JobStatus::Completed;
228            job.output_path = Some(output.to_string());
229            // Use a synthetic completion time if not already set
230            if job.completed_at_ms.is_none() {
231                job.completed_at_ms = job.started_at_ms.map(|s| s + 1);
232            }
233        }
234    }
235
236    /// Mark a job as failed with an error message.
237    pub fn fail_job(&mut self, id: &str, error: &str) {
238        if let Some(job) = self.jobs.get_mut(id) {
239            job.status = JobStatus::Failed;
240            job.error = Some(error.to_string());
241        }
242    }
243
244    /// Cancel a queued job.
245    pub fn cancel_job(&mut self, id: &str) {
246        if let Some(job) = self.jobs.get_mut(id) {
247            if job.status == JobStatus::Queued {
248                job.status = JobStatus::Cancelled;
249            }
250        }
251    }
252
253    /// Get a job by ID (immutable).
254    #[must_use]
255    pub fn get(&self, id: &str) -> Option<&ProxyTranscodeJob> {
256        self.jobs.get(id)
257    }
258
259    /// Total number of jobs (all statuses).
260    #[must_use]
261    pub fn len(&self) -> usize {
262        self.jobs.len()
263    }
264
265    /// Whether the queue has any jobs at all.
266    #[must_use]
267    pub fn is_empty(&self) -> bool {
268        self.jobs.is_empty()
269    }
270
271    /// Iterate over all jobs.
272    pub fn iter(&self) -> impl Iterator<Item = &ProxyTranscodeJob> {
273        self.jobs.values()
274    }
275}
276
/// Aggregated statistics for a `ProxyTranscodeQueue`.
///
/// Cancelled jobs are intentionally excluded from every counter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    /// Number of jobs currently queued (waiting).
    pub pending: u32,
    /// Number of jobs currently running.
    pub running: u32,
    /// Total jobs completed successfully.
    pub completed: u64,
    /// Total jobs that failed.
    pub failed: u64,
    /// Average wait time from submission to start, in milliseconds;
    /// 0.0 when no job has started yet.
    pub avg_wait_ms: f64,
}
291
292impl QueueStats {
293    /// Compute statistics from the current state of the queue.
294    #[must_use]
295    pub fn compute(queue: &ProxyTranscodeQueue) -> Self {
296        let mut pending = 0u32;
297        let mut running = 0u32;
298        let mut completed = 0u64;
299        let mut failed = 0u64;
300        let mut total_wait_ms = 0u64;
301        let mut wait_count = 0u32;
302
303        for job in queue.iter() {
304            match job.status {
305                JobStatus::Queued => pending += 1,
306                JobStatus::Running => running += 1,
307                JobStatus::Completed => completed += 1,
308                JobStatus::Failed => failed += 1,
309                JobStatus::Cancelled => {}
310            }
311            if let Some(wait) = job.wait_duration_ms() {
312                total_wait_ms += wait;
313                wait_count += 1;
314            }
315        }
316
317        let avg_wait_ms = if wait_count == 0 {
318            0.0
319        } else {
320            total_wait_ms as f64 / wait_count as f64
321        };
322
323        Self {
324            pending,
325            running,
326            completed,
327            failed,
328            avg_wait_ms,
329        }
330    }
331}
332
/// A batch proxy request: transcode many sources with the same spec.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyBatchRequest {
    /// Source file paths to transcode.
    pub source_paths: Vec<String>,
    /// Proxy specification to apply to all sources.
    pub spec: ProxySpec,
    /// Maximum number of concurrent transcode jobs.
    /// NOTE(review): not consulted by `estimate_duration_mins`, which models
    /// a single slot — confirm how callers are expected to use it.
    pub concurrent_limit: u32,
}
343
344impl ProxyBatchRequest {
345    /// Create a new batch request.
346    #[must_use]
347    pub fn new(source_paths: Vec<String>, spec: ProxySpec, concurrent_limit: u32) -> Self {
348        Self {
349            source_paths,
350            spec,
351            concurrent_limit,
352        }
353    }
354
355    /// Estimate the total time in minutes to transcode `items` files at `fps` frames-per-second.
356    ///
357    /// Assumes each source file has ~1 minute of content at `fps` fps, and that
358    /// the transcode runs at 2× real-time per concurrent slot.
359    #[must_use]
360    pub fn estimate_duration_mins(items: usize, fps: f32) -> f32 {
361        if fps <= 0.0 || items == 0 {
362            return 0.0;
363        }
364        // Simple model: each item takes 0.5 real-time minutes (2× speed),
365        // batched by concurrent_limit which defaults to 1 here for the pure fn.
366        let per_item_mins = 1.0 / 2.0; // 1 minute of source → 0.5 min at 2× speed
367                                       // fps affects CPU cost (higher fps = proportionally more work)
368        let fps_factor = fps / 25.0; // normalise to 25 fps
369        items as f32 * per_item_mins * fps_factor
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a request with a derived source path and the H.264 HD preset.
    fn make_request(id: &str, priority: u8, submitted_at_ms: u64) -> ProxyRequest {
        ProxyRequest::new(
            id,
            format!("/source/{id}.mov"),
            ProxySpec::h264_hd(),
            priority,
            submitted_at_ms,
        )
    }

    #[test]
    fn test_proxy_spec_new() {
        let spec = ProxySpec::new((1280, 720), "h264", 5_000);
        assert_eq!(spec.resolution, (1280, 720));
        assert_eq!(spec.codec, "h264");
        assert_eq!(spec.bitrate_kbps, 5_000);
    }

    #[test]
    fn test_proxy_spec_presets() {
        let hd = ProxySpec::h264_hd();
        assert_eq!(hd.codec, "h264");
        assert_eq!(hd.resolution, (1920, 1080));

        let prores = ProxySpec::prores_proxy();
        assert_eq!(prores.codec, "prores_proxy");
    }

    #[test]
    fn test_submit_returns_id() {
        let mut queue = ProxyTranscodeQueue::new();
        let req = make_request("job_001", 100, 1000);
        let id = queue.submit(req);
        // submit echoes the request's own ID back as the job ID
        assert_eq!(id, "job_001");
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_priority_ordering() {
        let mut queue = ProxyTranscodeQueue::new();
        // Submitted out of priority order on purpose
        queue.submit(make_request("low", 10, 1000));
        queue.submit(make_request("high", 200, 2000));
        queue.submit(make_request("mid", 100, 1500));

        // next_job should return "high" (priority 200)
        let next = queue.next_job().expect("should succeed in test");
        assert_eq!(next.request.id, "high");
    }

    #[test]
    fn test_complete_job() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("j1", 50, 1000));
        queue.start_job("j1", 1100);
        queue.complete_job("j1", "/proxy/j1.mp4");
        let job = queue.get("j1").expect("should succeed in test");
        assert_eq!(job.status, JobStatus::Completed);
        assert_eq!(job.output_path.as_deref(), Some("/proxy/j1.mp4"));
    }

    #[test]
    fn test_fail_job() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("j2", 50, 1000));
        queue.start_job("j2", 1100);
        queue.fail_job("j2", "codec error");
        let job = queue.get("j2").expect("should succeed in test");
        assert_eq!(job.status, JobStatus::Failed);
        assert_eq!(job.error.as_deref(), Some("codec error"));
    }

    #[test]
    fn test_cancel_job() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("j3", 50, 1000));
        queue.cancel_job("j3");
        let job = queue.get("j3").expect("should succeed in test");
        assert_eq!(job.status, JobStatus::Cancelled);
    }

    #[test]
    fn test_queue_stats() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("a", 10, 0));
        queue.submit(make_request("b", 10, 0));
        queue.submit(make_request("c", 10, 0));
        queue.start_job("a", 100);
        queue.complete_job("a", "/out/a.mp4");
        queue.start_job("b", 100);
        queue.fail_job("b", "err");

        // One job per terminal state plus "c" still pending
        let stats = QueueStats::compute(&queue);
        assert_eq!(stats.pending, 1);
        assert_eq!(stats.completed, 1);
        assert_eq!(stats.failed, 1);
    }

    #[test]
    fn test_queue_is_empty() {
        let queue = ProxyTranscodeQueue::new();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_wait_duration_ms() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("w", 10, 1000));
        queue.start_job("w", 2000);
        let job = queue.get("w").expect("should succeed in test");
        // Waited from submission (1000) to start (2000)
        assert_eq!(job.wait_duration_ms(), Some(1000));
    }

    #[test]
    fn test_batch_estimate_duration_zero_fps() {
        assert!((ProxyBatchRequest::estimate_duration_mins(10, 0.0) - 0.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_batch_estimate_duration_zero_items() {
        assert!((ProxyBatchRequest::estimate_duration_mins(0, 25.0) - 0.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_batch_estimate_duration_positive() {
        let mins = ProxyBatchRequest::estimate_duration_mins(10, 25.0);
        assert!(mins > 0.0);
    }
}
505
// ============================================================================
// Parallel Transcode Executor (Rayon work-stealing)
// ============================================================================

/// Result of a single parallel transcode task.
#[derive(Debug, Clone)]
pub struct ParallelJobResult {
    /// Job ID from the original request.
    pub id: String,
    /// Whether the job succeeded.
    pub success: bool,
    /// Output path on success.
    pub output_path: Option<String>,
    /// Error message on failure.
    pub error: Option<String>,
    /// Wall-clock processing duration in milliseconds, measured around the
    /// transcode call (may be 0 for very fast jobs).
    pub duration_ms: u64,
}
524
/// User-supplied transcode function signature.
///
/// The function receives `(source_path, spec)` and should return either
/// `Ok(output_path)` or `Err(error_message)`.  It will be called from Rayon
/// worker threads, so it must be `Send + Sync`; the `Arc` wrapper lets the
/// same callback be shared cheaply across those threads.
pub type TranscodeFn =
    Arc<dyn Fn(&str, &ProxySpec) -> std::result::Result<String, String> + Send + Sync>;
532
/// Configuration for the parallel executor.
///
/// Derives only `Clone` — the opaque `transcode_fn` closure has no `Debug`.
#[derive(Clone)]
pub struct ParallelExecutorConfig {
    /// Maximum number of concurrent transcode threads.
    /// `0` means use all available logical CPUs.
    pub thread_count: usize,
    /// User-supplied transcode function.
    pub transcode_fn: TranscodeFn,
}
542
543impl ParallelExecutorConfig {
544    /// Create a config with a custom transcode function and thread count.
545    pub fn new(thread_count: usize, transcode_fn: TranscodeFn) -> Self {
546        Self {
547            thread_count,
548            transcode_fn,
549        }
550    }
551
552    /// Create a config that uses a no-op (stub) transcode for testing.
553    #[must_use]
554    pub fn stub() -> Self {
555        let fn_: TranscodeFn = Arc::new(|src, spec| {
556            Ok(format!(
557                "/proxy/{}_{}x{}.mp4",
558                std::path::Path::new(src)
559                    .file_stem()
560                    .and_then(|s| s.to_str())
561                    .unwrap_or("clip"),
562                spec.resolution.0,
563                spec.resolution.1,
564            ))
565        });
566        Self {
567            thread_count: 0,
568            transcode_fn: fn_,
569        }
570    }
571}
572
/// Parallel batch proxy transcode executor powered by Rayon.
///
/// Queued jobs from a `ProxyTranscodeQueue` are dispatched to a bounded (or
/// global) Rayon thread pool using work-stealing, and the per-job results
/// are returned to the caller as a vector.
pub struct ParallelTranscodeExecutor {
    /// Executor configuration (thread count + transcode callback).
    config: ParallelExecutorConfig,
}
581
582impl ParallelTranscodeExecutor {
583    /// Create a new executor with the given configuration.
584    pub fn new(config: ParallelExecutorConfig) -> Self {
585        Self { config }
586    }
587
588    /// Execute all `Queued` jobs from `queue` in parallel and return results.
589    ///
590    /// The queue is not mutated; callers should apply results using
591    /// [`ParallelTranscodeExecutor::apply_results`] if they wish to update
592    /// statuses.
593    ///
594    /// # Errors
595    ///
596    /// Never errors at the executor level; individual job failures are captured
597    /// in [`ParallelJobResult::success`] / [`ParallelJobResult::error`].
598    pub fn execute_batch(&self, queue: &ProxyTranscodeQueue) -> Vec<ParallelJobResult> {
599        // Collect all queued jobs into an owned snapshot (no queue mutation)
600        let jobs: Vec<(String, String, ProxySpec)> = queue
601            .iter()
602            .filter(|j| j.status == JobStatus::Queued)
603            .map(|j| {
604                (
605                    j.request.id.clone(),
606                    j.request.source_path.clone(),
607                    j.request.proxy_spec.clone(),
608                )
609            })
610            .collect();
611
612        let transcode_fn = Arc::clone(&self.config.transcode_fn);
613        let results: Arc<Mutex<Vec<ParallelJobResult>>> = Arc::new(Mutex::new(Vec::new()));
614
615        let threads = if self.config.thread_count == 0 {
616            num_cpus::get()
617        } else {
618            self.config.thread_count
619        };
620
621        // Build a private thread pool limited to `threads` workers.
622        // If construction fails (extremely rare — only on resource exhaustion),
623        // fall back to running jobs sequentially on the calling thread so that
624        // we always return results and never panic.
625        let pool_opt: Option<rayon::ThreadPool> = rayon::ThreadPoolBuilder::new()
626            .num_threads(threads)
627            .build()
628            .ok();
629
630        let results_clone = Arc::clone(&results);
631        let run_jobs = move || {
632            jobs.par_iter().for_each(|(id, src, spec)| {
633                let start = std::time::Instant::now();
634                let outcome = (transcode_fn)(src, spec);
635                let duration_ms = start.elapsed().as_millis() as u64;
636
637                let result = match outcome {
638                    Ok(out) => ParallelJobResult {
639                        id: id.clone(),
640                        success: true,
641                        output_path: Some(out),
642                        error: None,
643                        duration_ms,
644                    },
645                    Err(e) => ParallelJobResult {
646                        id: id.clone(),
647                        success: false,
648                        output_path: None,
649                        error: Some(e),
650                        duration_ms,
651                    },
652                };
653
654                if let Ok(mut guard) = results_clone.lock() {
655                    guard.push(result);
656                }
657            });
658        };
659
660        match pool_opt {
661            Some(pool) => pool.install(run_jobs),
662            // Fallback: global rayon pool (uses all CPUs, still parallel)
663            None => run_jobs(),
664        }
665
666        Arc::try_unwrap(results)
667            .ok()
668            .and_then(|m| m.into_inner().ok())
669            .unwrap_or_default()
670    }
671
672    /// Apply a batch of results back to a mutable queue, updating job statuses.
673    pub fn apply_results(queue: &mut ProxyTranscodeQueue, results: &[ParallelJobResult]) {
674        for r in results {
675            if r.success {
676                queue.start_job(&r.id, 0);
677                queue.complete_job(&r.id, r.output_path.as_deref().unwrap_or(""));
678            } else {
679                queue.start_job(&r.id, 0);
680                queue.fail_job(&r.id, r.error.as_deref().unwrap_or("unknown error"));
681            }
682        }
683    }
684}
685
#[cfg(test)]
mod parallel_tests {
    use super::*;

    /// Build a queue holding `n` equal-priority queued jobs.
    fn make_queue(n: usize) -> ProxyTranscodeQueue {
        let mut q = ProxyTranscodeQueue::new();
        for i in 0..n {
            q.submit(ProxyRequest::new(
                format!("job_{i}"),
                format!("/src/clip_{i}.mov"),
                ProxySpec::h264_hd(),
                100,
                i as u64 * 10,
            ));
        }
        q
    }

    #[test]
    fn test_parallel_executor_all_succeed() {
        let config = ParallelExecutorConfig::stub();
        let executor = ParallelTranscodeExecutor::new(config);
        let queue = make_queue(8);

        let results = executor.execute_batch(&queue);
        assert_eq!(results.len(), 8);
        assert!(results.iter().all(|r| r.success));
        assert!(results.iter().all(|r| r.output_path.is_some()));
    }

    #[test]
    fn test_parallel_executor_error_fn() {
        // A transcode function that always fails must yield failed results
        // for every job, without aborting the batch.
        let fail_fn: TranscodeFn =
            Arc::new(|_src, _spec| Err("simulated transcode failure".to_string()));
        let config = ParallelExecutorConfig::new(2, fail_fn);
        let executor = ParallelTranscodeExecutor::new(config);
        let queue = make_queue(4);

        let results = executor.execute_batch(&queue);
        assert_eq!(results.len(), 4);
        assert!(results.iter().all(|r| !r.success));
        assert!(results.iter().all(|r| r.error.is_some()));
    }

    #[test]
    fn test_parallel_executor_empty_queue() {
        let config = ParallelExecutorConfig::stub();
        let executor = ParallelTranscodeExecutor::new(config);
        let queue = ProxyTranscodeQueue::new();

        let results = executor.execute_batch(&queue);
        assert!(results.is_empty());
    }

    #[test]
    fn test_apply_results_updates_statuses() {
        let config = ParallelExecutorConfig::stub();
        let executor = ParallelTranscodeExecutor::new(config);
        let mut queue = make_queue(3);

        let results = executor.execute_batch(&queue);
        ParallelTranscodeExecutor::apply_results(&mut queue, &results);

        // Every job should now be Completed and nothing left pending.
        let stats = QueueStats::compute(&queue);
        assert_eq!(stats.completed, 3);
        assert_eq!(stats.pending, 0);
    }

    #[test]
    fn test_apply_results_marks_failures() {
        let fail_fn: TranscodeFn = Arc::new(|_src, _spec| Err("err".to_string()));
        let config = ParallelExecutorConfig::new(1, fail_fn);
        let executor = ParallelTranscodeExecutor::new(config);
        let mut queue = make_queue(2);

        let results = executor.execute_batch(&queue);
        ParallelTranscodeExecutor::apply_results(&mut queue, &results);

        let stats = QueueStats::compute(&queue);
        assert_eq!(stats.failed, 2);
    }

    #[test]
    fn test_parallel_result_has_duration() {
        let config = ParallelExecutorConfig::stub();
        let executor = ParallelTranscodeExecutor::new(config);
        let queue = make_queue(2);
        let results = executor.execute_batch(&queue);
        // duration_ms may be 0 on fast machines but field must exist
        assert!(results.iter().all(|r| r.duration_ms < u64::MAX));
    }

    #[test]
    fn test_parallel_skips_non_queued_jobs() {
        let config = ParallelExecutorConfig::stub();
        let executor = ParallelTranscodeExecutor::new(config);
        let mut queue = make_queue(4);
        // Pre-cancel two jobs so they aren't in Queued state
        queue.cancel_job("job_0");
        queue.cancel_job("job_1");

        let results = executor.execute_batch(&queue);
        // Only 2 queued jobs should be executed
        assert_eq!(results.len(), 2);
    }
}