1#![allow(dead_code)]
7
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProxySpec {
14 pub resolution: (u32, u32),
16 pub codec: String,
18 pub bitrate_kbps: u32,
20}
21
22impl ProxySpec {
23 #[must_use]
25 pub fn new(resolution: (u32, u32), codec: impl Into<String>, bitrate_kbps: u32) -> Self {
26 Self {
27 resolution,
28 codec: codec.into(),
29 bitrate_kbps,
30 }
31 }
32
33 #[must_use]
35 pub fn h264_hd() -> Self {
36 Self::new((1920, 1080), "h264", 8_000)
37 }
38
39 #[must_use]
41 pub fn prores_proxy() -> Self {
42 Self::new((1920, 1080), "prores_proxy", 45_000)
43 }
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ProxyRequest {
49 pub id: String,
51 pub source_path: String,
53 pub proxy_spec: ProxySpec,
55 pub priority: u8,
57 pub submitted_at_ms: u64,
59}
60
61impl ProxyRequest {
62 #[must_use]
64 pub fn new(
65 id: impl Into<String>,
66 source_path: impl Into<String>,
67 proxy_spec: ProxySpec,
68 priority: u8,
69 submitted_at_ms: u64,
70 ) -> Self {
71 Self {
72 id: id.into(),
73 source_path: source_path.into(),
74 proxy_spec,
75 priority,
76 submitted_at_ms,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
83pub enum JobStatus {
84 Queued,
86 Running,
88 Completed,
90 Failed,
92 Cancelled,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ProxyTranscodeJob {
99 pub request: ProxyRequest,
101 pub status: JobStatus,
103 pub started_at_ms: Option<u64>,
105 pub completed_at_ms: Option<u64>,
107 pub output_path: Option<String>,
109 pub error: Option<String>,
111}
112
113impl ProxyTranscodeJob {
114 #[must_use]
116 pub fn new(request: ProxyRequest) -> Self {
117 Self {
118 request,
119 status: JobStatus::Queued,
120 started_at_ms: None,
121 completed_at_ms: None,
122 output_path: None,
123 error: None,
124 }
125 }
126
127 #[must_use]
129 pub fn wait_duration_ms(&self) -> Option<u64> {
130 self.started_at_ms
131 .map(|start| start.saturating_sub(self.request.submitted_at_ms))
132 }
133
134 #[must_use]
136 pub fn processing_duration_ms(&self) -> Option<u64> {
137 match (self.started_at_ms, self.completed_at_ms) {
138 (Some(start), Some(end)) => Some(end.saturating_sub(start)),
139 _ => None,
140 }
141 }
142}
143
144#[derive(Debug, Default)]
149pub struct ProxyTranscodeQueue {
150 jobs: std::collections::HashMap<String, ProxyTranscodeJob>,
152 order: VecDeque<String>,
154}
155
156impl ProxyTranscodeQueue {
157 #[must_use]
159 pub fn new() -> Self {
160 Self::default()
161 }
162
163 pub fn submit(&mut self, request: ProxyRequest) -> String {
165 let id = request.id.clone();
166 let job = ProxyTranscodeJob::new(request);
167 let priority = job.request.priority;
169 let submitted = job.request.submitted_at_ms;
170 let pos = self
172 .order
173 .iter()
174 .position(|existing_id| {
175 if let Some(j) = self.jobs.get(existing_id) {
176 let ep = j.request.priority;
177 let es = j.request.submitted_at_ms;
178 ep < priority || (ep == priority && es > submitted)
181 } else {
182 false
183 }
184 })
185 .unwrap_or(self.order.len());
186 self.order.insert(pos, id.clone());
187 self.jobs.insert(id.clone(), job);
188 id
189 }
190
191 #[must_use]
193 pub fn next_job(&mut self) -> Option<&mut ProxyTranscodeJob> {
194 let next_id = self
196 .order
197 .iter()
198 .find(|id| {
199 self.jobs
200 .get(*id)
201 .map(|j| j.status == JobStatus::Queued)
202 .unwrap_or(false)
203 })
204 .cloned();
205 next_id.and_then(|id| self.jobs.get_mut(&id))
206 }
207
208 pub fn start_job(&mut self, id: &str, started_at_ms: u64) {
210 if let Some(job) = self.jobs.get_mut(id) {
211 if job.status == JobStatus::Queued {
212 job.status = JobStatus::Running;
213 job.started_at_ms = Some(started_at_ms);
214 }
215 }
216 }
217
218 pub fn complete_job(&mut self, id: &str, output: &str) {
220 if let Some(job) = self.jobs.get_mut(id) {
221 job.status = JobStatus::Completed;
222 job.output_path = Some(output.to_string());
223 if job.completed_at_ms.is_none() {
225 job.completed_at_ms = job.started_at_ms.map(|s| s + 1);
226 }
227 }
228 }
229
230 pub fn fail_job(&mut self, id: &str, error: &str) {
232 if let Some(job) = self.jobs.get_mut(id) {
233 job.status = JobStatus::Failed;
234 job.error = Some(error.to_string());
235 }
236 }
237
238 pub fn cancel_job(&mut self, id: &str) {
240 if let Some(job) = self.jobs.get_mut(id) {
241 if job.status == JobStatus::Queued {
242 job.status = JobStatus::Cancelled;
243 }
244 }
245 }
246
247 #[must_use]
249 pub fn get(&self, id: &str) -> Option<&ProxyTranscodeJob> {
250 self.jobs.get(id)
251 }
252
253 #[must_use]
255 pub fn len(&self) -> usize {
256 self.jobs.len()
257 }
258
259 #[must_use]
261 pub fn is_empty(&self) -> bool {
262 self.jobs.is_empty()
263 }
264
265 pub fn iter(&self) -> impl Iterator<Item = &ProxyTranscodeJob> {
267 self.jobs.values()
268 }
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct QueueStats {
274 pub pending: u32,
276 pub running: u32,
278 pub completed: u64,
280 pub failed: u64,
282 pub avg_wait_ms: f64,
284}
285
286impl QueueStats {
287 #[must_use]
289 pub fn compute(queue: &ProxyTranscodeQueue) -> Self {
290 let mut pending = 0u32;
291 let mut running = 0u32;
292 let mut completed = 0u64;
293 let mut failed = 0u64;
294 let mut total_wait_ms = 0u64;
295 let mut wait_count = 0u32;
296
297 for job in queue.iter() {
298 match job.status {
299 JobStatus::Queued => pending += 1,
300 JobStatus::Running => running += 1,
301 JobStatus::Completed => completed += 1,
302 JobStatus::Failed => failed += 1,
303 JobStatus::Cancelled => {}
304 }
305 if let Some(wait) = job.wait_duration_ms() {
306 total_wait_ms += wait;
307 wait_count += 1;
308 }
309 }
310
311 let avg_wait_ms = if wait_count == 0 {
312 0.0
313 } else {
314 total_wait_ms as f64 / wait_count as f64
315 };
316
317 Self {
318 pending,
319 running,
320 completed,
321 failed,
322 avg_wait_ms,
323 }
324 }
325}
326
327#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ProxyBatchRequest {
330 pub source_paths: Vec<String>,
332 pub spec: ProxySpec,
334 pub concurrent_limit: u32,
336}
337
338impl ProxyBatchRequest {
339 #[must_use]
341 pub fn new(source_paths: Vec<String>, spec: ProxySpec, concurrent_limit: u32) -> Self {
342 Self {
343 source_paths,
344 spec,
345 concurrent_limit,
346 }
347 }
348
349 #[must_use]
354 pub fn estimate_duration_mins(items: usize, fps: f32) -> f32 {
355 if fps <= 0.0 || items == 0 {
356 return 0.0;
357 }
358 let per_item_mins = 1.0 / 2.0; let fps_factor = fps / 25.0; items as f32 * per_item_mins * fps_factor
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a request for `id` using the H.264 HD preset and a source path
    /// derived from the id.
    fn make_request(id: &str, priority: u8, submitted_at_ms: u64) -> ProxyRequest {
        let source_path = format!("/source/{id}.mov");
        ProxyRequest::new(id, source_path, ProxySpec::h264_hd(), priority, submitted_at_ms)
    }

    /// `ProxySpec::new` stores each argument in the matching field.
    #[test]
    fn test_proxy_spec_new() {
        let ProxySpec {
            resolution,
            codec,
            bitrate_kbps,
        } = ProxySpec::new((1280, 720), "h264", 5_000);
        assert_eq!(resolution, (1280, 720));
        assert_eq!(codec, "h264");
        assert_eq!(bitrate_kbps, 5_000);
    }

    /// The presets carry the expected codec names (and HD resolution for H.264).
    #[test]
    fn test_proxy_spec_presets() {
        let h264 = ProxySpec::h264_hd();
        assert_eq!(h264.codec, "h264");
        assert_eq!(h264.resolution, (1920, 1080));

        assert_eq!(ProxySpec::prores_proxy().codec, "prores_proxy");
    }

    /// `submit` echoes the request id and grows the queue by one.
    #[test]
    fn test_submit_returns_id() {
        let mut queue = ProxyTranscodeQueue::new();
        let returned = queue.submit(make_request("job_001", 100, 1000));
        assert_eq!(returned, "job_001");
        assert_eq!(queue.len(), 1);
    }

    /// The highest-priority job is served first regardless of submission order.
    #[test]
    fn test_priority_ordering() {
        let mut queue = ProxyTranscodeQueue::new();
        let submissions = [("low", 10, 1000), ("high", 200, 2000), ("mid", 100, 1500)];
        for &(id, priority, submitted_at_ms) in &submissions {
            queue.submit(make_request(id, priority, submitted_at_ms));
        }

        let first = queue.next_job().expect("should succeed in test");
        assert_eq!(first.request.id, "high");
    }

    /// Completing a started job records the status and the output path.
    #[test]
    fn test_complete_job() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("j1", 50, 1000));
        queue.start_job("j1", 1100);
        queue.complete_job("j1", "/proxy/j1.mp4");

        let finished = queue.get("j1").expect("should succeed in test");
        assert_eq!(finished.status, JobStatus::Completed);
        assert_eq!(finished.output_path.as_deref(), Some("/proxy/j1.mp4"));
    }

    /// Failing a started job records the status and the error message.
    #[test]
    fn test_fail_job() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("j2", 50, 1000));
        queue.start_job("j2", 1100);
        queue.fail_job("j2", "codec error");

        let failed = queue.get("j2").expect("should succeed in test");
        assert_eq!(failed.status, JobStatus::Failed);
        assert_eq!(failed.error.as_deref(), Some("codec error"));
    }

    /// A job that is still queued can be cancelled.
    #[test]
    fn test_cancel_job() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("j3", 50, 1000));
        queue.cancel_job("j3");

        let cancelled = queue.get("j3").expect("should succeed in test");
        assert_eq!(cancelled.status, JobStatus::Cancelled);
    }

    /// One completed, one failed and one untouched job are tallied correctly.
    #[test]
    fn test_queue_stats() {
        let mut queue = ProxyTranscodeQueue::new();
        for id in &["a", "b", "c"] {
            queue.submit(make_request(id, 10, 0));
        }
        queue.start_job("a", 100);
        queue.complete_job("a", "/out/a.mp4");
        queue.start_job("b", 100);
        queue.fail_job("b", "err");

        let QueueStats {
            pending,
            completed,
            failed,
            ..
        } = QueueStats::compute(&queue);
        assert_eq!(pending, 1);
        assert_eq!(completed, 1);
        assert_eq!(failed, 1);
    }

    /// A freshly created queue reports itself as empty.
    #[test]
    fn test_queue_is_empty() {
        assert!(ProxyTranscodeQueue::new().is_empty());
    }

    /// The wait duration is the gap between submission and start.
    #[test]
    fn test_wait_duration_ms() {
        let mut queue = ProxyTranscodeQueue::new();
        queue.submit(make_request("w", 10, 1000));
        queue.start_job("w", 2000);

        let started = queue.get("w").expect("should succeed in test");
        assert_eq!(started.wait_duration_ms(), Some(1000));
    }

    /// A zero frame rate yields a zero estimate.
    #[test]
    fn test_batch_estimate_duration_zero_fps() {
        let estimate = ProxyBatchRequest::estimate_duration_mins(10, 0.0);
        assert!(estimate.abs() < f32::EPSILON);
    }

    /// An empty batch yields a zero estimate.
    #[test]
    fn test_batch_estimate_duration_zero_items() {
        let estimate = ProxyBatchRequest::estimate_duration_mins(0, 25.0);
        assert!(estimate.abs() < f32::EPSILON);
    }

    /// A non-empty batch at a valid frame rate yields a positive estimate.
    #[test]
    fn test_batch_estimate_duration_positive() {
        let estimate = ProxyBatchRequest::estimate_duration_mins(10, 25.0);
        assert!(estimate > 0.0);
    }
}