Skip to main content

oximedia_proxy/
transcode_queue.rs

1//! Proxy transcode queue management.
2//!
3//! Provides a priority-based queue for scheduling and tracking proxy transcode
4//! jobs, along with batch request support and queue statistics.
5
6#![allow(dead_code)]
7
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10
11/// Specification for a proxy transcode output.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProxySpec {
14    /// Target resolution (width, height).
15    pub resolution: (u32, u32),
16    /// Codec identifier (e.g. "h264", "prores_proxy").
17    pub codec: String,
18    /// Target bitrate in kilobits per second.
19    pub bitrate_kbps: u32,
20}
21
22impl ProxySpec {
23    /// Create a new proxy spec.
24    #[must_use]
25    pub fn new(resolution: (u32, u32), codec: impl Into<String>, bitrate_kbps: u32) -> Self {
26        Self {
27            resolution,
28            codec: codec.into(),
29            bitrate_kbps,
30        }
31    }
32
33    /// Standard H.264 HD proxy.
34    #[must_use]
35    pub fn h264_hd() -> Self {
36        Self::new((1920, 1080), "h264", 8_000)
37    }
38
39    /// Standard ProRes Proxy.
40    #[must_use]
41    pub fn prores_proxy() -> Self {
42        Self::new((1920, 1080), "prores_proxy", 45_000)
43    }
44}
45
46/// A proxy transcode request submitted to the queue.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ProxyRequest {
49    /// Unique request identifier.
50    pub id: String,
51    /// Path to the source media file.
52    pub source_path: String,
53    /// Desired proxy specification.
54    pub proxy_spec: ProxySpec,
55    /// Priority (0 = lowest, 255 = highest).
56    pub priority: u8,
57    /// Submission timestamp in milliseconds since epoch.
58    pub submitted_at_ms: u64,
59}
60
61impl ProxyRequest {
62    /// Create a new proxy request.
63    #[must_use]
64    pub fn new(
65        id: impl Into<String>,
66        source_path: impl Into<String>,
67        proxy_spec: ProxySpec,
68        priority: u8,
69        submitted_at_ms: u64,
70    ) -> Self {
71        Self {
72            id: id.into(),
73            source_path: source_path.into(),
74            proxy_spec,
75            priority,
76            submitted_at_ms,
77        }
78    }
79}
80
81/// Status of a proxy transcode job.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
83pub enum JobStatus {
84    /// Waiting in the queue.
85    Queued,
86    /// Currently being transcoded.
87    Running,
88    /// Successfully completed.
89    Completed,
90    /// Failed due to an error.
91    Failed,
92    /// Cancelled before execution.
93    Cancelled,
94}
95
96/// A proxy transcode job with its current status and timing information.
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ProxyTranscodeJob {
99    /// The original request.
100    pub request: ProxyRequest,
101    /// Current job status.
102    pub status: JobStatus,
103    /// When the job started, in milliseconds since epoch.
104    pub started_at_ms: Option<u64>,
105    /// When the job completed (or failed), in milliseconds since epoch.
106    pub completed_at_ms: Option<u64>,
107    /// Path to the output proxy file (set on completion).
108    pub output_path: Option<String>,
109    /// Error message if the job failed.
110    pub error: Option<String>,
111}
112
113impl ProxyTranscodeJob {
114    /// Create a new job in the `Queued` state.
115    #[must_use]
116    pub fn new(request: ProxyRequest) -> Self {
117        Self {
118            request,
119            status: JobStatus::Queued,
120            started_at_ms: None,
121            completed_at_ms: None,
122            output_path: None,
123            error: None,
124        }
125    }
126
127    /// Duration the job waited in the queue before starting, in milliseconds.
128    #[must_use]
129    pub fn wait_duration_ms(&self) -> Option<u64> {
130        self.started_at_ms
131            .map(|start| start.saturating_sub(self.request.submitted_at_ms))
132    }
133
134    /// Total processing duration, in milliseconds.
135    #[must_use]
136    pub fn processing_duration_ms(&self) -> Option<u64> {
137        match (self.started_at_ms, self.completed_at_ms) {
138            (Some(start), Some(end)) => Some(end.saturating_sub(start)),
139            _ => None,
140        }
141    }
142}
143
144/// Priority-based proxy transcode queue.
145///
146/// Higher-priority jobs are dispatched first. Within the same priority,
147/// earlier submission time wins (FIFO).
148#[derive(Debug, Default)]
149pub struct ProxyTranscodeQueue {
150    /// All jobs indexed by their ID.
151    jobs: std::collections::HashMap<String, ProxyTranscodeJob>,
152    /// Queue ordering: sorted by (priority desc, submitted_at_ms asc).
153    order: VecDeque<String>,
154}
155
156impl ProxyTranscodeQueue {
157    /// Create an empty transcode queue.
158    #[must_use]
159    pub fn new() -> Self {
160        Self::default()
161    }
162
163    /// Submit a new request and return the job ID.
164    pub fn submit(&mut self, request: ProxyRequest) -> String {
165        let id = request.id.clone();
166        let job = ProxyTranscodeJob::new(request);
167        // Insert into order queue maintaining priority order
168        let priority = job.request.priority;
169        let submitted = job.request.submitted_at_ms;
170        // Find insertion point: higher priority first, then earlier submission
171        let pos = self
172            .order
173            .iter()
174            .position(|existing_id| {
175                if let Some(j) = self.jobs.get(existing_id) {
176                    let ep = j.request.priority;
177                    let es = j.request.submitted_at_ms;
178                    // Insert before this entry if we have higher priority,
179                    // or equal priority and earlier submission
180                    ep < priority || (ep == priority && es > submitted)
181                } else {
182                    false
183                }
184            })
185            .unwrap_or(self.order.len());
186        self.order.insert(pos, id.clone());
187        self.jobs.insert(id.clone(), job);
188        id
189    }
190
191    /// Get the next queued job (highest priority, earliest submitted).
192    #[must_use]
193    pub fn next_job(&mut self) -> Option<&mut ProxyTranscodeJob> {
194        // Find the first job in order that is still Queued
195        let next_id = self
196            .order
197            .iter()
198            .find(|id| {
199                self.jobs
200                    .get(*id)
201                    .map(|j| j.status == JobStatus::Queued)
202                    .unwrap_or(false)
203            })
204            .cloned();
205        next_id.and_then(|id| self.jobs.get_mut(&id))
206    }
207
208    /// Mark a job as started at the given timestamp.
209    pub fn start_job(&mut self, id: &str, started_at_ms: u64) {
210        if let Some(job) = self.jobs.get_mut(id) {
211            if job.status == JobStatus::Queued {
212                job.status = JobStatus::Running;
213                job.started_at_ms = Some(started_at_ms);
214            }
215        }
216    }
217
218    /// Mark a job as successfully completed.
219    pub fn complete_job(&mut self, id: &str, output: &str) {
220        if let Some(job) = self.jobs.get_mut(id) {
221            job.status = JobStatus::Completed;
222            job.output_path = Some(output.to_string());
223            // Use a synthetic completion time if not already set
224            if job.completed_at_ms.is_none() {
225                job.completed_at_ms = job.started_at_ms.map(|s| s + 1);
226            }
227        }
228    }
229
230    /// Mark a job as failed with an error message.
231    pub fn fail_job(&mut self, id: &str, error: &str) {
232        if let Some(job) = self.jobs.get_mut(id) {
233            job.status = JobStatus::Failed;
234            job.error = Some(error.to_string());
235        }
236    }
237
238    /// Cancel a queued job.
239    pub fn cancel_job(&mut self, id: &str) {
240        if let Some(job) = self.jobs.get_mut(id) {
241            if job.status == JobStatus::Queued {
242                job.status = JobStatus::Cancelled;
243            }
244        }
245    }
246
247    /// Get a job by ID (immutable).
248    #[must_use]
249    pub fn get(&self, id: &str) -> Option<&ProxyTranscodeJob> {
250        self.jobs.get(id)
251    }
252
253    /// Total number of jobs (all statuses).
254    #[must_use]
255    pub fn len(&self) -> usize {
256        self.jobs.len()
257    }
258
259    /// Whether the queue has any jobs at all.
260    #[must_use]
261    pub fn is_empty(&self) -> bool {
262        self.jobs.is_empty()
263    }
264
265    /// Iterate over all jobs.
266    pub fn iter(&self) -> impl Iterator<Item = &ProxyTranscodeJob> {
267        self.jobs.values()
268    }
269}
270
271/// Aggregated statistics for a `ProxyTranscodeQueue`.
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct QueueStats {
274    /// Number of jobs currently queued (waiting).
275    pub pending: u32,
276    /// Number of jobs currently running.
277    pub running: u32,
278    /// Total jobs completed successfully.
279    pub completed: u64,
280    /// Total jobs that failed.
281    pub failed: u64,
282    /// Average wait time from submission to start, in milliseconds.
283    pub avg_wait_ms: f64,
284}
285
286impl QueueStats {
287    /// Compute statistics from the current state of the queue.
288    #[must_use]
289    pub fn compute(queue: &ProxyTranscodeQueue) -> Self {
290        let mut pending = 0u32;
291        let mut running = 0u32;
292        let mut completed = 0u64;
293        let mut failed = 0u64;
294        let mut total_wait_ms = 0u64;
295        let mut wait_count = 0u32;
296
297        for job in queue.iter() {
298            match job.status {
299                JobStatus::Queued => pending += 1,
300                JobStatus::Running => running += 1,
301                JobStatus::Completed => completed += 1,
302                JobStatus::Failed => failed += 1,
303                JobStatus::Cancelled => {}
304            }
305            if let Some(wait) = job.wait_duration_ms() {
306                total_wait_ms += wait;
307                wait_count += 1;
308            }
309        }
310
311        let avg_wait_ms = if wait_count == 0 {
312            0.0
313        } else {
314            total_wait_ms as f64 / wait_count as f64
315        };
316
317        Self {
318            pending,
319            running,
320            completed,
321            failed,
322            avg_wait_ms,
323        }
324    }
325}
326
327/// A batch proxy request: transcode many sources with the same spec.
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ProxyBatchRequest {
330    /// Source file paths to transcode.
331    pub source_paths: Vec<String>,
332    /// Proxy specification to apply to all sources.
333    pub spec: ProxySpec,
334    /// Maximum number of concurrent transcode jobs.
335    pub concurrent_limit: u32,
336}
337
338impl ProxyBatchRequest {
339    /// Create a new batch request.
340    #[must_use]
341    pub fn new(source_paths: Vec<String>, spec: ProxySpec, concurrent_limit: u32) -> Self {
342        Self {
343            source_paths,
344            spec,
345            concurrent_limit,
346        }
347    }
348
349    /// Estimate the total time in minutes to transcode `items` files at `fps` frames-per-second.
350    ///
351    /// Assumes each source file has ~1 minute of content at `fps` fps, and that
352    /// the transcode runs at 2× real-time per concurrent slot.
353    #[must_use]
354    pub fn estimate_duration_mins(items: usize, fps: f32) -> f32 {
355        if fps <= 0.0 || items == 0 {
356            return 0.0;
357        }
358        // Simple model: each item takes 0.5 real-time minutes (2× speed),
359        // batched by concurrent_limit which defaults to 1 here for the pure fn.
360        let per_item_mins = 1.0 / 2.0; // 1 minute of source → 0.5 min at 2× speed
361                                       // fps affects CPU cost (higher fps = proportionally more work)
362        let fps_factor = fps / 25.0; // normalise to 25 fps
363        items as f32 * per_item_mins * fps_factor
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    fn make_request(id: &str, priority: u8, submitted_at_ms: u64) -> ProxyRequest {
372        ProxyRequest::new(
373            id,
374            format!("/source/{id}.mov"),
375            ProxySpec::h264_hd(),
376            priority,
377            submitted_at_ms,
378        )
379    }
380
381    #[test]
382    fn test_proxy_spec_new() {
383        let spec = ProxySpec::new((1280, 720), "h264", 5_000);
384        assert_eq!(spec.resolution, (1280, 720));
385        assert_eq!(spec.codec, "h264");
386        assert_eq!(spec.bitrate_kbps, 5_000);
387    }
388
389    #[test]
390    fn test_proxy_spec_presets() {
391        let hd = ProxySpec::h264_hd();
392        assert_eq!(hd.codec, "h264");
393        assert_eq!(hd.resolution, (1920, 1080));
394
395        let prores = ProxySpec::prores_proxy();
396        assert_eq!(prores.codec, "prores_proxy");
397    }
398
399    #[test]
400    fn test_submit_returns_id() {
401        let mut queue = ProxyTranscodeQueue::new();
402        let req = make_request("job_001", 100, 1000);
403        let id = queue.submit(req);
404        assert_eq!(id, "job_001");
405        assert_eq!(queue.len(), 1);
406    }
407
408    #[test]
409    fn test_priority_ordering() {
410        let mut queue = ProxyTranscodeQueue::new();
411        queue.submit(make_request("low", 10, 1000));
412        queue.submit(make_request("high", 200, 2000));
413        queue.submit(make_request("mid", 100, 1500));
414
415        // next_job should return "high" (priority 200)
416        let next = queue.next_job().expect("should succeed in test");
417        assert_eq!(next.request.id, "high");
418    }
419
420    #[test]
421    fn test_complete_job() {
422        let mut queue = ProxyTranscodeQueue::new();
423        queue.submit(make_request("j1", 50, 1000));
424        queue.start_job("j1", 1100);
425        queue.complete_job("j1", "/proxy/j1.mp4");
426        let job = queue.get("j1").expect("should succeed in test");
427        assert_eq!(job.status, JobStatus::Completed);
428        assert_eq!(job.output_path.as_deref(), Some("/proxy/j1.mp4"));
429    }
430
431    #[test]
432    fn test_fail_job() {
433        let mut queue = ProxyTranscodeQueue::new();
434        queue.submit(make_request("j2", 50, 1000));
435        queue.start_job("j2", 1100);
436        queue.fail_job("j2", "codec error");
437        let job = queue.get("j2").expect("should succeed in test");
438        assert_eq!(job.status, JobStatus::Failed);
439        assert_eq!(job.error.as_deref(), Some("codec error"));
440    }
441
442    #[test]
443    fn test_cancel_job() {
444        let mut queue = ProxyTranscodeQueue::new();
445        queue.submit(make_request("j3", 50, 1000));
446        queue.cancel_job("j3");
447        let job = queue.get("j3").expect("should succeed in test");
448        assert_eq!(job.status, JobStatus::Cancelled);
449    }
450
451    #[test]
452    fn test_queue_stats() {
453        let mut queue = ProxyTranscodeQueue::new();
454        queue.submit(make_request("a", 10, 0));
455        queue.submit(make_request("b", 10, 0));
456        queue.submit(make_request("c", 10, 0));
457        queue.start_job("a", 100);
458        queue.complete_job("a", "/out/a.mp4");
459        queue.start_job("b", 100);
460        queue.fail_job("b", "err");
461
462        let stats = QueueStats::compute(&queue);
463        assert_eq!(stats.pending, 1);
464        assert_eq!(stats.completed, 1);
465        assert_eq!(stats.failed, 1);
466    }
467
468    #[test]
469    fn test_queue_is_empty() {
470        let queue = ProxyTranscodeQueue::new();
471        assert!(queue.is_empty());
472    }
473
474    #[test]
475    fn test_wait_duration_ms() {
476        let mut queue = ProxyTranscodeQueue::new();
477        queue.submit(make_request("w", 10, 1000));
478        queue.start_job("w", 2000);
479        let job = queue.get("w").expect("should succeed in test");
480        assert_eq!(job.wait_duration_ms(), Some(1000));
481    }
482
483    #[test]
484    fn test_batch_estimate_duration_zero_fps() {
485        assert!((ProxyBatchRequest::estimate_duration_mins(10, 0.0) - 0.0).abs() < f32::EPSILON);
486    }
487
488    #[test]
489    fn test_batch_estimate_duration_zero_items() {
490        assert!((ProxyBatchRequest::estimate_duration_mins(0, 25.0) - 0.0).abs() < f32::EPSILON);
491    }
492
493    #[test]
494    fn test_batch_estimate_duration_positive() {
495        let mins = ProxyBatchRequest::estimate_duration_mins(10, 25.0);
496        assert!(mins > 0.0);
497    }
498}