// oximedia_proxy/proxy_scheduler.rs
1//! Proxy task scheduler for managing concurrent proxy generation jobs.
2#![allow(dead_code)]
3
4use std::collections::VecDeque;
5
/// Configuration for the proxy scheduler.
#[derive(Debug, Clone)]
pub struct ProxySchedulerConfig {
    /// Maximum number of concurrent proxy generation tasks.
    pub max_concurrent: usize,
    /// Maximum queue depth before new tasks are rejected.
    pub max_queue_depth: usize,
    /// Default priority for tasks that don't specify one.
    pub default_priority: u8,
}

impl ProxySchedulerConfig {
    /// Build a config with the given concurrency cap and the standard
    /// queue depth (256) and default priority (50).
    pub fn new(max_concurrent: usize) -> Self {
        ProxySchedulerConfig {
            max_queue_depth: 256,
            default_priority: 50,
            max_concurrent,
        }
    }

    /// Concurrency cap: how many tasks may run at once.
    pub fn max_concurrent(&self) -> usize {
        self.max_concurrent
    }
}

impl Default for ProxySchedulerConfig {
    /// Default configuration: four concurrent tasks.
    fn default() -> Self {
        ProxySchedulerConfig::new(4)
    }
}
38
/// A single proxy generation task.
#[derive(Debug, Clone)]
pub struct ProxyTask {
    /// Unique task identifier.
    pub id: u64,
    /// Source file path.
    pub source_path: String,
    /// Destination file path.
    pub dest_path: String,
    /// Estimated duration in seconds.
    pub estimated_secs: f64,
    /// Task priority (0 = lowest, 255 = highest).
    pub priority: u8,
}

impl ProxyTask {
    /// Priority assigned to tasks that don't specify one explicitly.
    pub const DEFAULT_PRIORITY: u8 = 50;

    /// Create a new proxy task with [`Self::DEFAULT_PRIORITY`].
    pub fn new(id: u64, source_path: &str, dest_path: &str, estimated_secs: f64) -> Self {
        Self {
            id,
            source_path: source_path.to_owned(),
            dest_path: dest_path.to_owned(),
            estimated_secs,
            priority: Self::DEFAULT_PRIORITY,
        }
    }

    /// Return the estimated processing time in seconds.
    #[must_use]
    pub fn estimated_secs(&self) -> f64 {
        self.estimated_secs
    }

    /// Set task priority (builder style).
    ///
    /// Consumes and returns `self`; ignoring the return value discards the
    /// priority change, hence `#[must_use]`.
    #[must_use]
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }
}
77
/// Statistics collected by the proxy scheduler.
#[derive(Debug, Clone, Default)]
pub struct ProxySchedulerStats {
    /// Total tasks completed.
    pub completed: u64,
    /// Total wall-clock seconds elapsed since creation.
    pub elapsed_secs: f64,
    /// Total tasks that failed.
    pub failed: u64,
}

impl ProxySchedulerStats {
    /// Calculate approximate throughput as tasks per hour.
    ///
    /// Returns 0.0 when no time has elapsed (or elapsed time is negative),
    /// avoiding a division by zero.
    #[allow(clippy::cast_precision_loss)]
    pub fn throughput_per_hour(&self) -> f64 {
        if self.elapsed_secs <= 0.0 {
            return 0.0;
        }
        (self.completed as f64 / self.elapsed_secs) * 3600.0
    }

    /// Return total tasks processed (completed + failed).
    ///
    /// Uses saturating addition so counters near `u64::MAX` clamp instead of
    /// panicking in debug builds.
    pub fn total_processed(&self) -> u64 {
        self.completed.saturating_add(self.failed)
    }
}
104
105/// Proxy task scheduler.
106#[derive(Debug)]
107pub struct ProxyScheduler {
108    config: ProxySchedulerConfig,
109    queue: VecDeque<ProxyTask>,
110    running: Vec<ProxyTask>,
111    stats: ProxySchedulerStats,
112    next_id: u64,
113}
114
115impl ProxyScheduler {
116    /// Create a new scheduler with the given config.
117    pub fn new(config: ProxySchedulerConfig) -> Self {
118        Self {
119            config,
120            queue: VecDeque::new(),
121            running: Vec::new(),
122            stats: ProxySchedulerStats::default(),
123            next_id: 1,
124        }
125    }
126
127    /// Submit a task to the scheduler queue. Returns the assigned task id.
128    pub fn submit(&mut self, source: &str, dest: &str, estimated_secs: f64) -> Option<u64> {
129        if self.queue.len() >= self.config.max_queue_depth {
130            return None;
131        }
132        let id = self.next_id;
133        self.next_id += 1;
134        let task = ProxyTask::new(id, source, dest, estimated_secs);
135        self.queue.push_back(task);
136        self.pump();
137        Some(id)
138    }
139
140    /// Advance running tasks — mark the first running task as complete.
141    /// In a real implementation this would be driven by async I/O callbacks.
142    pub fn complete_one(&mut self) {
143        if !self.running.is_empty() {
144            self.running.remove(0);
145            self.stats.completed += 1;
146            self.pump();
147        }
148    }
149
150    /// Internal: move queued tasks into running slots.
151    fn pump(&mut self) {
152        while self.running.len() < self.config.max_concurrent {
153            if let Some(task) = self.queue.pop_front() {
154                self.running.push(task);
155            } else {
156                break;
157            }
158        }
159    }
160
161    /// Return the number of currently running tasks.
162    pub fn running_count(&self) -> usize {
163        self.running.len()
164    }
165
166    /// Return the number of queued (pending) tasks.
167    pub fn queued_count(&self) -> usize {
168        self.queue.len()
169    }
170
171    /// Return a reference to scheduler statistics.
172    pub fn stats(&self) -> &ProxySchedulerStats {
173        &self.stats
174    }
175
176    /// Record elapsed time for statistics calculation.
177    pub fn record_elapsed(&mut self, secs: f64) {
178        self.stats.elapsed_secs += secs;
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a scheduler whose config allows `max` concurrent tasks.
    fn sched(max: usize) -> ProxyScheduler {
        let cfg = ProxySchedulerConfig::new(max);
        ProxyScheduler::new(cfg)
    }

    #[test]
    fn test_config_max_concurrent() {
        assert_eq!(ProxySchedulerConfig::new(8).max_concurrent(), 8);
    }

    #[test]
    fn test_config_default() {
        assert_eq!(ProxySchedulerConfig::default().max_concurrent(), 4);
    }

    #[test]
    fn test_task_estimated_secs() {
        let task = ProxyTask::new(1, "a.mov", "a_proxy.mp4", 42.5);
        assert!((task.estimated_secs() - 42.5).abs() < 1e-9);
    }

    #[test]
    fn test_task_priority_default() {
        assert_eq!(ProxyTask::new(1, "a.mov", "b.mp4", 10.0).priority, 50);
    }

    #[test]
    fn test_task_with_priority() {
        let task = ProxyTask::new(1, "a.mov", "b.mp4", 10.0).with_priority(200);
        assert_eq!(task.priority, 200);
    }

    #[test]
    fn test_submit_increments_id() {
        let mut s = sched(2);
        let first = s.submit("a.mov", "a_p.mp4", 5.0).expect("should succeed in test");
        let second = s.submit("b.mov", "b_p.mp4", 5.0).expect("should succeed in test");
        assert!(second > first);
    }

    #[test]
    fn test_submit_fills_running_slots() {
        let mut s = sched(2);
        s.submit("a.mov", "a_p.mp4", 5.0).expect("should succeed in test");
        s.submit("b.mov", "b_p.mp4", 5.0).expect("should succeed in test");
        assert_eq!(s.running_count(), 2);
        assert_eq!(s.queued_count(), 0);
    }

    #[test]
    fn test_submit_queues_excess() {
        let mut s = sched(1);
        s.submit("a.mov", "a_p.mp4", 5.0).expect("should succeed in test");
        s.submit("b.mov", "b_p.mp4", 5.0).expect("should succeed in test");
        assert_eq!(s.running_count(), 1);
        assert_eq!(s.queued_count(), 1);
    }

    #[test]
    fn test_complete_one_promotes_queued() {
        let mut s = sched(1);
        s.submit("a.mov", "a_p.mp4", 5.0).expect("should succeed in test");
        s.submit("b.mov", "b_p.mp4", 5.0).expect("should succeed in test");
        s.complete_one();
        assert_eq!(s.running_count(), 1);
        assert_eq!(s.queued_count(), 0);
        assert_eq!(s.stats().completed, 1);
    }

    #[test]
    fn test_queue_depth_limit() {
        // One running slot, two queue slots; the fourth submit must be rejected.
        let cfg = ProxySchedulerConfig {
            max_queue_depth: 2,
            ..ProxySchedulerConfig::new(1)
        };
        let mut s = ProxyScheduler::new(cfg);
        s.submit("a.mov", "a_p.mp4", 1.0).expect("should succeed in test"); // running
        s.submit("b.mov", "b_p.mp4", 1.0).expect("should succeed in test"); // queue slot 1
        s.submit("c.mov", "c_p.mp4", 1.0).expect("should succeed in test"); // queue slot 2
        assert!(s.submit("d.mov", "d_p.mp4", 1.0).is_none());
    }

    #[test]
    fn test_stats_throughput_zero_elapsed() {
        assert!(ProxySchedulerStats::default().throughput_per_hour().abs() < 1e-9);
    }

    #[test]
    fn test_stats_throughput_nonzero() {
        let stats = ProxySchedulerStats {
            elapsed_secs: 3600.0,
            completed: 3600,
            failed: 0,
        };
        assert!((stats.throughput_per_hour() - 3600.0).abs() < 1e-3);
    }

    #[test]
    fn test_stats_total_processed() {
        let stats = ProxySchedulerStats {
            elapsed_secs: 60.0,
            completed: 10,
            failed: 3,
        };
        assert_eq!(stats.total_processed(), 13);
    }

    #[test]
    fn test_record_elapsed_accumulates() {
        let mut s = sched(2);
        s.record_elapsed(30.0);
        s.record_elapsed(30.0);
        assert!((s.stats().elapsed_secs - 60.0).abs() < 1e-9);
    }
}