1#![allow(dead_code)]
11
12use rayon::prelude::*;
13use serde::{Deserialize, Serialize};
14use std::collections::VecDeque;
15use std::sync::{Arc, Mutex};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ProxySpec {
20 pub resolution: (u32, u32),
22 pub codec: String,
24 pub bitrate_kbps: u32,
26}
27
28impl ProxySpec {
29 #[must_use]
31 pub fn new(resolution: (u32, u32), codec: impl Into<String>, bitrate_kbps: u32) -> Self {
32 Self {
33 resolution,
34 codec: codec.into(),
35 bitrate_kbps,
36 }
37 }
38
39 #[must_use]
41 pub fn h264_hd() -> Self {
42 Self::new((1920, 1080), "h264", 8_000)
43 }
44
45 #[must_use]
47 pub fn prores_proxy() -> Self {
48 Self::new((1920, 1080), "prores_proxy", 45_000)
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ProxyRequest {
55 pub id: String,
57 pub source_path: String,
59 pub proxy_spec: ProxySpec,
61 pub priority: u8,
63 pub submitted_at_ms: u64,
65}
66
67impl ProxyRequest {
68 #[must_use]
70 pub fn new(
71 id: impl Into<String>,
72 source_path: impl Into<String>,
73 proxy_spec: ProxySpec,
74 priority: u8,
75 submitted_at_ms: u64,
76 ) -> Self {
77 Self {
78 id: id.into(),
79 source_path: source_path.into(),
80 proxy_spec,
81 priority,
82 submitted_at_ms,
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
89pub enum JobStatus {
90 Queued,
92 Running,
94 Completed,
96 Failed,
98 Cancelled,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct ProxyTranscodeJob {
105 pub request: ProxyRequest,
107 pub status: JobStatus,
109 pub started_at_ms: Option<u64>,
111 pub completed_at_ms: Option<u64>,
113 pub output_path: Option<String>,
115 pub error: Option<String>,
117}
118
119impl ProxyTranscodeJob {
120 #[must_use]
122 pub fn new(request: ProxyRequest) -> Self {
123 Self {
124 request,
125 status: JobStatus::Queued,
126 started_at_ms: None,
127 completed_at_ms: None,
128 output_path: None,
129 error: None,
130 }
131 }
132
133 #[must_use]
135 pub fn wait_duration_ms(&self) -> Option<u64> {
136 self.started_at_ms
137 .map(|start| start.saturating_sub(self.request.submitted_at_ms))
138 }
139
140 #[must_use]
142 pub fn processing_duration_ms(&self) -> Option<u64> {
143 match (self.started_at_ms, self.completed_at_ms) {
144 (Some(start), Some(end)) => Some(end.saturating_sub(start)),
145 _ => None,
146 }
147 }
148}
149
150#[derive(Debug, Default)]
155pub struct ProxyTranscodeQueue {
156 jobs: std::collections::HashMap<String, ProxyTranscodeJob>,
158 order: VecDeque<String>,
160}
161
162impl ProxyTranscodeQueue {
163 #[must_use]
165 pub fn new() -> Self {
166 Self::default()
167 }
168
169 pub fn submit(&mut self, request: ProxyRequest) -> String {
171 let id = request.id.clone();
172 let job = ProxyTranscodeJob::new(request);
173 let priority = job.request.priority;
175 let submitted = job.request.submitted_at_ms;
176 let pos = self
178 .order
179 .iter()
180 .position(|existing_id| {
181 if let Some(j) = self.jobs.get(existing_id) {
182 let ep = j.request.priority;
183 let es = j.request.submitted_at_ms;
184 ep < priority || (ep == priority && es > submitted)
187 } else {
188 false
189 }
190 })
191 .unwrap_or(self.order.len());
192 self.order.insert(pos, id.clone());
193 self.jobs.insert(id.clone(), job);
194 id
195 }
196
197 #[must_use]
199 pub fn next_job(&mut self) -> Option<&mut ProxyTranscodeJob> {
200 let next_id = self
202 .order
203 .iter()
204 .find(|id| {
205 self.jobs
206 .get(*id)
207 .map(|j| j.status == JobStatus::Queued)
208 .unwrap_or(false)
209 })
210 .cloned();
211 next_id.and_then(|id| self.jobs.get_mut(&id))
212 }
213
214 pub fn start_job(&mut self, id: &str, started_at_ms: u64) {
216 if let Some(job) = self.jobs.get_mut(id) {
217 if job.status == JobStatus::Queued {
218 job.status = JobStatus::Running;
219 job.started_at_ms = Some(started_at_ms);
220 }
221 }
222 }
223
224 pub fn complete_job(&mut self, id: &str, output: &str) {
226 if let Some(job) = self.jobs.get_mut(id) {
227 job.status = JobStatus::Completed;
228 job.output_path = Some(output.to_string());
229 if job.completed_at_ms.is_none() {
231 job.completed_at_ms = job.started_at_ms.map(|s| s + 1);
232 }
233 }
234 }
235
236 pub fn fail_job(&mut self, id: &str, error: &str) {
238 if let Some(job) = self.jobs.get_mut(id) {
239 job.status = JobStatus::Failed;
240 job.error = Some(error.to_string());
241 }
242 }
243
244 pub fn cancel_job(&mut self, id: &str) {
246 if let Some(job) = self.jobs.get_mut(id) {
247 if job.status == JobStatus::Queued {
248 job.status = JobStatus::Cancelled;
249 }
250 }
251 }
252
253 #[must_use]
255 pub fn get(&self, id: &str) -> Option<&ProxyTranscodeJob> {
256 self.jobs.get(id)
257 }
258
259 #[must_use]
261 pub fn len(&self) -> usize {
262 self.jobs.len()
263 }
264
265 #[must_use]
267 pub fn is_empty(&self) -> bool {
268 self.jobs.is_empty()
269 }
270
271 pub fn iter(&self) -> impl Iterator<Item = &ProxyTranscodeJob> {
273 self.jobs.values()
274 }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct QueueStats {
280 pub pending: u32,
282 pub running: u32,
284 pub completed: u64,
286 pub failed: u64,
288 pub avg_wait_ms: f64,
290}
291
292impl QueueStats {
293 #[must_use]
295 pub fn compute(queue: &ProxyTranscodeQueue) -> Self {
296 let mut pending = 0u32;
297 let mut running = 0u32;
298 let mut completed = 0u64;
299 let mut failed = 0u64;
300 let mut total_wait_ms = 0u64;
301 let mut wait_count = 0u32;
302
303 for job in queue.iter() {
304 match job.status {
305 JobStatus::Queued => pending += 1,
306 JobStatus::Running => running += 1,
307 JobStatus::Completed => completed += 1,
308 JobStatus::Failed => failed += 1,
309 JobStatus::Cancelled => {}
310 }
311 if let Some(wait) = job.wait_duration_ms() {
312 total_wait_ms += wait;
313 wait_count += 1;
314 }
315 }
316
317 let avg_wait_ms = if wait_count == 0 {
318 0.0
319 } else {
320 total_wait_ms as f64 / wait_count as f64
321 };
322
323 Self {
324 pending,
325 running,
326 completed,
327 failed,
328 avg_wait_ms,
329 }
330 }
331}
332
333#[derive(Debug, Clone, Serialize, Deserialize)]
335pub struct ProxyBatchRequest {
336 pub source_paths: Vec<String>,
338 pub spec: ProxySpec,
340 pub concurrent_limit: u32,
342}
343
344impl ProxyBatchRequest {
345 #[must_use]
347 pub fn new(source_paths: Vec<String>, spec: ProxySpec, concurrent_limit: u32) -> Self {
348 Self {
349 source_paths,
350 spec,
351 concurrent_limit,
352 }
353 }
354
355 #[must_use]
360 pub fn estimate_duration_mins(items: usize, fps: f32) -> f32 {
361 if fps <= 0.0 || items == 0 {
362 return 0.0;
363 }
364 let per_item_mins = 1.0 / 2.0; let fps_factor = fps / 25.0; items as f32 * per_item_mins * fps_factor
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376
377 fn make_request(id: &str, priority: u8, submitted_at_ms: u64) -> ProxyRequest {
378 ProxyRequest::new(
379 id,
380 format!("/source/{id}.mov"),
381 ProxySpec::h264_hd(),
382 priority,
383 submitted_at_ms,
384 )
385 }
386
387 #[test]
388 fn test_proxy_spec_new() {
389 let spec = ProxySpec::new((1280, 720), "h264", 5_000);
390 assert_eq!(spec.resolution, (1280, 720));
391 assert_eq!(spec.codec, "h264");
392 assert_eq!(spec.bitrate_kbps, 5_000);
393 }
394
395 #[test]
396 fn test_proxy_spec_presets() {
397 let hd = ProxySpec::h264_hd();
398 assert_eq!(hd.codec, "h264");
399 assert_eq!(hd.resolution, (1920, 1080));
400
401 let prores = ProxySpec::prores_proxy();
402 assert_eq!(prores.codec, "prores_proxy");
403 }
404
405 #[test]
406 fn test_submit_returns_id() {
407 let mut queue = ProxyTranscodeQueue::new();
408 let req = make_request("job_001", 100, 1000);
409 let id = queue.submit(req);
410 assert_eq!(id, "job_001");
411 assert_eq!(queue.len(), 1);
412 }
413
414 #[test]
415 fn test_priority_ordering() {
416 let mut queue = ProxyTranscodeQueue::new();
417 queue.submit(make_request("low", 10, 1000));
418 queue.submit(make_request("high", 200, 2000));
419 queue.submit(make_request("mid", 100, 1500));
420
421 let next = queue.next_job().expect("should succeed in test");
423 assert_eq!(next.request.id, "high");
424 }
425
426 #[test]
427 fn test_complete_job() {
428 let mut queue = ProxyTranscodeQueue::new();
429 queue.submit(make_request("j1", 50, 1000));
430 queue.start_job("j1", 1100);
431 queue.complete_job("j1", "/proxy/j1.mp4");
432 let job = queue.get("j1").expect("should succeed in test");
433 assert_eq!(job.status, JobStatus::Completed);
434 assert_eq!(job.output_path.as_deref(), Some("/proxy/j1.mp4"));
435 }
436
437 #[test]
438 fn test_fail_job() {
439 let mut queue = ProxyTranscodeQueue::new();
440 queue.submit(make_request("j2", 50, 1000));
441 queue.start_job("j2", 1100);
442 queue.fail_job("j2", "codec error");
443 let job = queue.get("j2").expect("should succeed in test");
444 assert_eq!(job.status, JobStatus::Failed);
445 assert_eq!(job.error.as_deref(), Some("codec error"));
446 }
447
448 #[test]
449 fn test_cancel_job() {
450 let mut queue = ProxyTranscodeQueue::new();
451 queue.submit(make_request("j3", 50, 1000));
452 queue.cancel_job("j3");
453 let job = queue.get("j3").expect("should succeed in test");
454 assert_eq!(job.status, JobStatus::Cancelled);
455 }
456
457 #[test]
458 fn test_queue_stats() {
459 let mut queue = ProxyTranscodeQueue::new();
460 queue.submit(make_request("a", 10, 0));
461 queue.submit(make_request("b", 10, 0));
462 queue.submit(make_request("c", 10, 0));
463 queue.start_job("a", 100);
464 queue.complete_job("a", "/out/a.mp4");
465 queue.start_job("b", 100);
466 queue.fail_job("b", "err");
467
468 let stats = QueueStats::compute(&queue);
469 assert_eq!(stats.pending, 1);
470 assert_eq!(stats.completed, 1);
471 assert_eq!(stats.failed, 1);
472 }
473
474 #[test]
475 fn test_queue_is_empty() {
476 let queue = ProxyTranscodeQueue::new();
477 assert!(queue.is_empty());
478 }
479
480 #[test]
481 fn test_wait_duration_ms() {
482 let mut queue = ProxyTranscodeQueue::new();
483 queue.submit(make_request("w", 10, 1000));
484 queue.start_job("w", 2000);
485 let job = queue.get("w").expect("should succeed in test");
486 assert_eq!(job.wait_duration_ms(), Some(1000));
487 }
488
489 #[test]
490 fn test_batch_estimate_duration_zero_fps() {
491 assert!((ProxyBatchRequest::estimate_duration_mins(10, 0.0) - 0.0).abs() < f32::EPSILON);
492 }
493
494 #[test]
495 fn test_batch_estimate_duration_zero_items() {
496 assert!((ProxyBatchRequest::estimate_duration_mins(0, 25.0) - 0.0).abs() < f32::EPSILON);
497 }
498
499 #[test]
500 fn test_batch_estimate_duration_positive() {
501 let mins = ProxyBatchRequest::estimate_duration_mins(10, 25.0);
502 assert!(mins > 0.0);
503 }
504}
505
506#[derive(Debug, Clone)]
512pub struct ParallelJobResult {
513 pub id: String,
515 pub success: bool,
517 pub output_path: Option<String>,
519 pub error: Option<String>,
521 pub duration_ms: u64,
523}
524
525pub type TranscodeFn =
531 Arc<dyn Fn(&str, &ProxySpec) -> std::result::Result<String, String> + Send + Sync>;
532
533#[derive(Clone)]
535pub struct ParallelExecutorConfig {
536 pub thread_count: usize,
539 pub transcode_fn: TranscodeFn,
541}
542
543impl ParallelExecutorConfig {
544 pub fn new(thread_count: usize, transcode_fn: TranscodeFn) -> Self {
546 Self {
547 thread_count,
548 transcode_fn,
549 }
550 }
551
552 #[must_use]
554 pub fn stub() -> Self {
555 let fn_: TranscodeFn = Arc::new(|src, spec| {
556 Ok(format!(
557 "/proxy/{}_{}x{}.mp4",
558 std::path::Path::new(src)
559 .file_stem()
560 .and_then(|s| s.to_str())
561 .unwrap_or("clip"),
562 spec.resolution.0,
563 spec.resolution.1,
564 ))
565 });
566 Self {
567 thread_count: 0,
568 transcode_fn: fn_,
569 }
570 }
571}
572
573pub struct ParallelTranscodeExecutor {
579 config: ParallelExecutorConfig,
580}
581
582impl ParallelTranscodeExecutor {
583 pub fn new(config: ParallelExecutorConfig) -> Self {
585 Self { config }
586 }
587
588 pub fn execute_batch(&self, queue: &ProxyTranscodeQueue) -> Vec<ParallelJobResult> {
599 let jobs: Vec<(String, String, ProxySpec)> = queue
601 .iter()
602 .filter(|j| j.status == JobStatus::Queued)
603 .map(|j| {
604 (
605 j.request.id.clone(),
606 j.request.source_path.clone(),
607 j.request.proxy_spec.clone(),
608 )
609 })
610 .collect();
611
612 let transcode_fn = Arc::clone(&self.config.transcode_fn);
613 let results: Arc<Mutex<Vec<ParallelJobResult>>> = Arc::new(Mutex::new(Vec::new()));
614
615 let threads = if self.config.thread_count == 0 {
616 num_cpus::get()
617 } else {
618 self.config.thread_count
619 };
620
621 let pool_opt: Option<rayon::ThreadPool> = rayon::ThreadPoolBuilder::new()
626 .num_threads(threads)
627 .build()
628 .ok();
629
630 let results_clone = Arc::clone(&results);
631 let run_jobs = move || {
632 jobs.par_iter().for_each(|(id, src, spec)| {
633 let start = std::time::Instant::now();
634 let outcome = (transcode_fn)(src, spec);
635 let duration_ms = start.elapsed().as_millis() as u64;
636
637 let result = match outcome {
638 Ok(out) => ParallelJobResult {
639 id: id.clone(),
640 success: true,
641 output_path: Some(out),
642 error: None,
643 duration_ms,
644 },
645 Err(e) => ParallelJobResult {
646 id: id.clone(),
647 success: false,
648 output_path: None,
649 error: Some(e),
650 duration_ms,
651 },
652 };
653
654 if let Ok(mut guard) = results_clone.lock() {
655 guard.push(result);
656 }
657 });
658 };
659
660 match pool_opt {
661 Some(pool) => pool.install(run_jobs),
662 None => run_jobs(),
664 }
665
666 Arc::try_unwrap(results)
667 .ok()
668 .and_then(|m| m.into_inner().ok())
669 .unwrap_or_default()
670 }
671
672 pub fn apply_results(queue: &mut ProxyTranscodeQueue, results: &[ParallelJobResult]) {
674 for r in results {
675 if r.success {
676 queue.start_job(&r.id, 0);
677 queue.complete_job(&r.id, r.output_path.as_deref().unwrap_or(""));
678 } else {
679 queue.start_job(&r.id, 0);
680 queue.fail_job(&r.id, r.error.as_deref().unwrap_or("unknown error"));
681 }
682 }
683 }
684}
685
686#[cfg(test)]
687mod parallel_tests {
688 use super::*;
689
690 fn make_queue(n: usize) -> ProxyTranscodeQueue {
691 let mut q = ProxyTranscodeQueue::new();
692 for i in 0..n {
693 q.submit(ProxyRequest::new(
694 format!("job_{i}"),
695 format!("/src/clip_{i}.mov"),
696 ProxySpec::h264_hd(),
697 100,
698 i as u64 * 10,
699 ));
700 }
701 q
702 }
703
704 #[test]
705 fn test_parallel_executor_all_succeed() {
706 let config = ParallelExecutorConfig::stub();
707 let executor = ParallelTranscodeExecutor::new(config);
708 let queue = make_queue(8);
709
710 let results = executor.execute_batch(&queue);
711 assert_eq!(results.len(), 8);
712 assert!(results.iter().all(|r| r.success));
713 assert!(results.iter().all(|r| r.output_path.is_some()));
714 }
715
716 #[test]
717 fn test_parallel_executor_error_fn() {
718 let fail_fn: TranscodeFn =
719 Arc::new(|_src, _spec| Err("simulated transcode failure".to_string()));
720 let config = ParallelExecutorConfig::new(2, fail_fn);
721 let executor = ParallelTranscodeExecutor::new(config);
722 let queue = make_queue(4);
723
724 let results = executor.execute_batch(&queue);
725 assert_eq!(results.len(), 4);
726 assert!(results.iter().all(|r| !r.success));
727 assert!(results.iter().all(|r| r.error.is_some()));
728 }
729
730 #[test]
731 fn test_parallel_executor_empty_queue() {
732 let config = ParallelExecutorConfig::stub();
733 let executor = ParallelTranscodeExecutor::new(config);
734 let queue = ProxyTranscodeQueue::new();
735
736 let results = executor.execute_batch(&queue);
737 assert!(results.is_empty());
738 }
739
740 #[test]
741 fn test_apply_results_updates_statuses() {
742 let config = ParallelExecutorConfig::stub();
743 let executor = ParallelTranscodeExecutor::new(config);
744 let mut queue = make_queue(3);
745
746 let results = executor.execute_batch(&queue);
747 ParallelTranscodeExecutor::apply_results(&mut queue, &results);
748
749 let stats = QueueStats::compute(&queue);
750 assert_eq!(stats.completed, 3);
751 assert_eq!(stats.pending, 0);
752 }
753
754 #[test]
755 fn test_apply_results_marks_failures() {
756 let fail_fn: TranscodeFn = Arc::new(|_src, _spec| Err("err".to_string()));
757 let config = ParallelExecutorConfig::new(1, fail_fn);
758 let executor = ParallelTranscodeExecutor::new(config);
759 let mut queue = make_queue(2);
760
761 let results = executor.execute_batch(&queue);
762 ParallelTranscodeExecutor::apply_results(&mut queue, &results);
763
764 let stats = QueueStats::compute(&queue);
765 assert_eq!(stats.failed, 2);
766 }
767
768 #[test]
769 fn test_parallel_result_has_duration() {
770 let config = ParallelExecutorConfig::stub();
771 let executor = ParallelTranscodeExecutor::new(config);
772 let queue = make_queue(2);
773 let results = executor.execute_batch(&queue);
774 assert!(results.iter().all(|r| r.duration_ms < u64::MAX));
776 }
777
778 #[test]
779 fn test_parallel_skips_non_queued_jobs() {
780 let config = ParallelExecutorConfig::stub();
781 let executor = ParallelTranscodeExecutor::new(config);
782 let mut queue = make_queue(4);
783 queue.cancel_job("job_0");
785 queue.cancel_job("job_1");
786
787 let results = executor.execute_batch(&queue);
788 assert_eq!(results.len(), 2);
790 }
791}