1#![allow(dead_code)]
3
4use std::collections::VecDeque;
5
/// Limits and defaults that govern a proxy scheduler.
#[derive(Debug, Clone)]
pub struct ProxySchedulerConfig {
    /// Upper bound on the number of tasks held in the running set at once.
    pub max_concurrent: usize,
    /// Upper bound on the number of tasks allowed to wait in the queue.
    pub max_queue_depth: usize,
    /// Priority value intended for tasks that do not carry an explicit one.
    pub default_priority: u8,
}

impl ProxySchedulerConfig {
    /// Concurrency limit used by [`Default`].
    const DEFAULT_MAX_CONCURRENT: usize = 4;
    /// Queue depth used by every constructor.
    const DEFAULT_MAX_QUEUE_DEPTH: usize = 256;
    /// Task priority used by every constructor.
    const DEFAULT_PRIORITY: u8 = 50;

    /// Creates a configuration with the given concurrency limit.
    ///
    /// The queue depth is set to 256 and the default priority to 50.
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            max_concurrent,
            ..Self::default()
        }
    }

    /// Returns the maximum number of concurrently running tasks.
    pub fn max_concurrent(&self) -> usize {
        let Self { max_concurrent, .. } = self;
        *max_concurrent
    }
}

impl Default for ProxySchedulerConfig {
    /// Returns a configuration allowing 4 concurrent tasks, a queue depth of
    /// 256 and a default priority of 50.
    fn default() -> Self {
        Self {
            max_concurrent: Self::DEFAULT_MAX_CONCURRENT,
            max_queue_depth: Self::DEFAULT_MAX_QUEUE_DEPTH,
            default_priority: Self::DEFAULT_PRIORITY,
        }
    }
}
38
/// One unit of proxy work: a source path, a destination path and the
/// scheduling metadata attached to it.
#[derive(Debug, Clone)]
pub struct ProxyTask {
    /// Identifier supplied by whoever creates the task.
    pub id: u64,
    /// Path of the input, stored as an owned string.
    pub source_path: String,
    /// Path of the output, stored as an owned string.
    pub dest_path: String,
    /// Caller-supplied time estimate, in seconds.
    pub estimated_secs: f64,
    /// Priority value; starts at 50 unless changed via [`ProxyTask::with_priority`].
    pub priority: u8,
}

impl ProxyTask {
    /// Builds a task from its id, paths and time estimate.
    ///
    /// Both paths are copied into owned strings and the priority is set to 50.
    pub fn new(id: u64, source_path: &str, dest_path: &str, estimated_secs: f64) -> Self {
        let source_path = String::from(source_path);
        let dest_path = String::from(dest_path);
        Self {
            id,
            source_path,
            dest_path,
            estimated_secs,
            priority: 50,
        }
    }

    /// Returns the time estimate stored on this task, in seconds.
    pub fn estimated_secs(&self) -> f64 {
        self.estimated_secs
    }

    /// Consumes the task and returns it with `priority` replaced.
    pub fn with_priority(self, priority: u8) -> Self {
        Self { priority, ..self }
    }
}
77
/// Running totals describing what a scheduler has processed so far.
#[derive(Debug, Clone, Default)]
pub struct ProxySchedulerStats {
    /// Number of tasks recorded as completed.
    pub completed: u64,
    /// Accumulated elapsed time, in seconds.
    pub elapsed_secs: f64,
    /// Number of tasks recorded as failed.
    pub failed: u64,
}

impl ProxySchedulerStats {
    /// Returns completed tasks per hour, derived from `completed` and
    /// `elapsed_secs`.
    ///
    /// Yields `0.0` when `elapsed_secs` is zero, negative or NaN, so a
    /// degenerate elapsed time never produces a NaN or infinite rate.
    // The u64 -> f64 conversion may round for counts above 2^53; that loss is
    // irrelevant for a throughput figure, hence the lint is silenced.
    #[allow(clippy::cast_precision_loss)]
    pub fn throughput_per_hour(&self) -> f64 {
        // NaN compares false against everything, so it needs its own check.
        if self.elapsed_secs.is_nan() || self.elapsed_secs <= 0.0 {
            return 0.0;
        }
        (self.completed as f64 / self.elapsed_secs) * 3600.0
    }

    /// Returns the number of tasks that finished, successfully or not.
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing, because both
    /// counters are public and may hold arbitrary values.
    pub fn total_processed(&self) -> u64 {
        self.completed.saturating_add(self.failed)
    }
}
104
105#[derive(Debug)]
107pub struct ProxyScheduler {
108 config: ProxySchedulerConfig,
109 queue: VecDeque<ProxyTask>,
110 running: Vec<ProxyTask>,
111 stats: ProxySchedulerStats,
112 next_id: u64,
113}
114
115impl ProxyScheduler {
116 pub fn new(config: ProxySchedulerConfig) -> Self {
118 Self {
119 config,
120 queue: VecDeque::new(),
121 running: Vec::new(),
122 stats: ProxySchedulerStats::default(),
123 next_id: 1,
124 }
125 }
126
127 pub fn submit(&mut self, source: &str, dest: &str, estimated_secs: f64) -> Option<u64> {
129 if self.queue.len() >= self.config.max_queue_depth {
130 return None;
131 }
132 let id = self.next_id;
133 self.next_id += 1;
134 let task = ProxyTask::new(id, source, dest, estimated_secs);
135 self.queue.push_back(task);
136 self.pump();
137 Some(id)
138 }
139
140 pub fn complete_one(&mut self) {
143 if !self.running.is_empty() {
144 self.running.remove(0);
145 self.stats.completed += 1;
146 self.pump();
147 }
148 }
149
150 fn pump(&mut self) {
152 while self.running.len() < self.config.max_concurrent {
153 if let Some(task) = self.queue.pop_front() {
154 self.running.push(task);
155 } else {
156 break;
157 }
158 }
159 }
160
161 pub fn running_count(&self) -> usize {
163 self.running.len()
164 }
165
166 pub fn queued_count(&self) -> usize {
168 self.queue.len()
169 }
170
171 pub fn stats(&self) -> &ProxySchedulerStats {
173 &self.stats
174 }
175
176 pub fn record_elapsed(&mut self, secs: f64) {
178 self.stats.elapsed_secs += secs;
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Panic message for submissions that are expected to be accepted.
    const ACCEPTED: &str = "should succeed in test";

    /// Builds a scheduler whose only customised setting is the concurrency limit.
    fn make_scheduler(max: usize) -> ProxyScheduler {
        let config = ProxySchedulerConfig::new(max);
        ProxyScheduler::new(config)
    }

    /// Submits a task that must be accepted and returns its id.
    fn submit_ok(sched: &mut ProxyScheduler, source: &str, dest: &str, secs: f64) -> u64 {
        sched.submit(source, dest, secs).expect(ACCEPTED)
    }

    /// Returns true when `actual` is strictly within `tolerance` of `expected`.
    fn close(actual: f64, expected: f64, tolerance: f64) -> bool {
        (actual - expected).abs() < tolerance
    }

    #[test]
    fn test_config_max_concurrent() {
        assert_eq!(ProxySchedulerConfig::new(8).max_concurrent(), 8);
    }

    #[test]
    fn test_config_default() {
        assert_eq!(ProxySchedulerConfig::default().max_concurrent(), 4);
    }

    #[test]
    fn test_task_estimated_secs() {
        let estimate = ProxyTask::new(1, "a.mov", "a_proxy.mp4", 42.5).estimated_secs();
        assert!(close(estimate, 42.5, 1e-9));
    }

    #[test]
    fn test_task_priority_default() {
        let ProxyTask { priority, .. } = ProxyTask::new(1, "a.mov", "b.mp4", 10.0);
        assert_eq!(priority, 50);
    }

    #[test]
    fn test_task_with_priority() {
        let base = ProxyTask::new(1, "a.mov", "b.mp4", 10.0);
        let raised = base.with_priority(200);
        assert_eq!(raised.priority, 200);
    }

    #[test]
    fn test_submit_increments_id() {
        let mut sched = make_scheduler(2);
        let first = submit_ok(&mut sched, "a.mov", "a_p.mp4", 5.0);
        let second = submit_ok(&mut sched, "b.mov", "b_p.mp4", 5.0);
        assert!(second > first);
    }

    #[test]
    fn test_submit_fills_running_slots() {
        let mut sched = make_scheduler(2);
        submit_ok(&mut sched, "a.mov", "a_p.mp4", 5.0);
        submit_ok(&mut sched, "b.mov", "b_p.mp4", 5.0);
        assert_eq!(sched.running_count(), 2);
        assert_eq!(sched.queued_count(), 0);
    }

    #[test]
    fn test_submit_queues_excess() {
        let mut sched = make_scheduler(1);
        submit_ok(&mut sched, "a.mov", "a_p.mp4", 5.0);
        submit_ok(&mut sched, "b.mov", "b_p.mp4", 5.0);
        assert_eq!(sched.running_count(), 1);
        assert_eq!(sched.queued_count(), 1);
    }

    #[test]
    fn test_complete_one_promotes_queued() {
        let mut sched = make_scheduler(1);
        submit_ok(&mut sched, "a.mov", "a_p.mp4", 5.0);
        submit_ok(&mut sched, "b.mov", "b_p.mp4", 5.0);
        sched.complete_one();
        assert_eq!(sched.running_count(), 1);
        assert_eq!(sched.queued_count(), 0);
        assert_eq!(sched.stats().completed, 1);
    }

    #[test]
    fn test_queue_depth_limit() {
        let cfg = ProxySchedulerConfig {
            max_queue_depth: 2,
            ..ProxySchedulerConfig::new(1)
        };
        let mut sched = ProxyScheduler::new(cfg);
        // One task runs and two wait; the fourth finds the queue full.
        let accepted = [
            ("a.mov", "a_p.mp4"),
            ("b.mov", "b_p.mp4"),
            ("c.mov", "c_p.mp4"),
        ];
        for &(source, dest) in accepted.iter() {
            submit_ok(&mut sched, source, dest, 1.0);
        }
        let rejected = sched.submit("d.mov", "d_p.mp4", 1.0);
        assert!(rejected.is_none());
    }

    #[test]
    fn test_stats_throughput_zero_elapsed() {
        let rate = ProxySchedulerStats::default().throughput_per_hour();
        assert!(close(rate, 0.0, 1e-9));
    }

    #[test]
    fn test_stats_throughput_nonzero() {
        let stats = ProxySchedulerStats {
            completed: 3600,
            elapsed_secs: 3600.0,
            ..ProxySchedulerStats::default()
        };
        assert!(close(stats.throughput_per_hour(), 3600.0, 1e-3));
    }

    #[test]
    fn test_stats_total_processed() {
        let stats = ProxySchedulerStats {
            completed: 10,
            failed: 3,
            elapsed_secs: 60.0,
        };
        assert_eq!(stats.total_processed(), 13);
    }

    #[test]
    fn test_record_elapsed_accumulates() {
        let mut sched = make_scheduler(2);
        for _ in 0..2 {
            sched.record_elapsed(30.0);
        }
        assert!(close(sched.stats().elapsed_secs, 60.0, 1e-9));
    }
}