1#![allow(dead_code)]
2use std::collections::VecDeque;
9use std::time::Instant;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
13pub struct UploadId(
14 pub u64,
16);
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
20pub enum UploadPriority {
21 Low = 0,
23 #[default]
25 Normal = 1,
26 High = 2,
28 Critical = 3,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum UploadState {
35 Queued,
37 Transferring,
39 Completed,
41 Failed,
43 Cancelled,
45}
46
47#[derive(Debug, Clone)]
49pub struct UploadTarget {
50 pub buffer_id: u64,
52 pub offset: usize,
54 pub buffer_size: usize,
56}
57
58impl UploadTarget {
59 #[must_use]
61 pub fn new(buffer_id: u64, offset: usize, buffer_size: usize) -> Self {
62 Self {
63 buffer_id,
64 offset,
65 buffer_size,
66 }
67 }
68
69 #[must_use]
71 pub fn fits(&self, data_size: usize) -> bool {
72 self.offset + data_size <= self.buffer_size
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct UploadRequest {
79 pub id: UploadId,
81 pub data: Vec<u8>,
83 pub target: UploadTarget,
85 pub priority: UploadPriority,
87 pub state: UploadState,
89 pub enqueue_time: Instant,
91 pub complete_time: Option<Instant>,
93}
94
95impl UploadRequest {
96 #[must_use]
98 pub fn new(
99 id: UploadId,
100 data: Vec<u8>,
101 target: UploadTarget,
102 priority: UploadPriority,
103 ) -> Self {
104 Self {
105 id,
106 data,
107 target,
108 priority,
109 state: UploadState::Queued,
110 enqueue_time: Instant::now(),
111 complete_time: None,
112 }
113 }
114
115 #[must_use]
117 pub fn data_size(&self) -> usize {
118 self.data.len()
119 }
120
121 #[must_use]
123 pub fn latency(&self) -> Option<std::time::Duration> {
124 self.complete_time
125 .map(|t| t.duration_since(self.enqueue_time))
126 }
127}
128
129#[derive(Debug, Clone)]
131pub struct UploadQueueConfig {
132 pub max_pending: usize,
134 pub max_queued_bytes: usize,
136 pub flush_batch_size: usize,
138 pub priority_sort: bool,
140}
141
142impl Default for UploadQueueConfig {
143 fn default() -> Self {
144 Self {
145 max_pending: 256,
146 max_queued_bytes: 256 * 1024 * 1024, flush_batch_size: 32,
148 priority_sort: true,
149 }
150 }
151}
152
153#[derive(Debug, Clone, Default)]
155pub struct UploadQueueStats {
156 pub queued_count: usize,
158 pub queued_bytes: usize,
160 pub total_processed: u64,
162 pub total_bytes_transferred: u64,
164 pub failed_count: u64,
166 pub cancelled_count: u64,
168 pub avg_throughput_bps: f64,
170}
171
172impl UploadQueueStats {
173 #[allow(clippy::cast_precision_loss)]
175 #[must_use]
176 pub fn queued_mb(&self) -> f64 {
177 self.queued_bytes as f64 / (1024.0 * 1024.0)
178 }
179
180 #[allow(clippy::cast_precision_loss)]
182 #[must_use]
183 pub fn transferred_mb(&self) -> f64 {
184 self.total_bytes_transferred as f64 / (1024.0 * 1024.0)
185 }
186}
187
188pub struct UploadQueue {
190 queue: VecDeque<UploadRequest>,
192 completed: Vec<UploadRequest>,
194 config: UploadQueueConfig,
196 next_id: u64,
198 total_bytes_transferred: u64,
200 failed_count: u64,
202 cancelled_count: u64,
204}
205
206impl UploadQueue {
207 #[must_use]
209 pub fn new() -> Self {
210 Self::with_config(UploadQueueConfig::default())
211 }
212
213 #[must_use]
215 pub fn with_config(config: UploadQueueConfig) -> Self {
216 Self {
217 queue: VecDeque::new(),
218 completed: Vec::new(),
219 config,
220 next_id: 0,
221 total_bytes_transferred: 0,
222 failed_count: 0,
223 cancelled_count: 0,
224 }
225 }
226
227 pub fn enqueue(
230 &mut self,
231 data: Vec<u8>,
232 target: UploadTarget,
233 priority: UploadPriority,
234 ) -> Option<UploadId> {
235 let current_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
236 if self.queue.len() >= self.config.max_pending {
237 return None;
238 }
239 if current_bytes + data.len() > self.config.max_queued_bytes {
240 return None;
241 }
242 let id = UploadId(self.next_id);
243 self.next_id += 1;
244 let request = UploadRequest::new(id, data, target, priority);
245 self.queue.push_back(request);
246 Some(id)
247 }
248
249 pub fn flush(&mut self) -> Vec<UploadId> {
252 if self.config.priority_sort {
253 let mut items: Vec<UploadRequest> = self.queue.drain(..).collect();
254 items.sort_by(|a, b| b.priority.cmp(&a.priority));
255 self.queue = items.into_iter().collect();
256 }
257
258 let batch_size = self.config.flush_batch_size.min(self.queue.len());
259 let mut processed = Vec::with_capacity(batch_size);
260
261 for _ in 0..batch_size {
262 if let Some(mut request) = self.queue.pop_front() {
263 if request.target.fits(request.data.len()) {
264 request.state = UploadState::Completed;
265 request.complete_time = Some(Instant::now());
266 self.total_bytes_transferred += request.data.len() as u64;
267 } else {
268 request.state = UploadState::Failed;
269 self.failed_count += 1;
270 }
271 processed.push(request.id);
272 self.completed.push(request);
273 }
274 }
275
276 processed
277 }
278
279 pub fn cancel(&mut self, id: UploadId) -> bool {
281 if let Some(pos) = self.queue.iter().position(|r| r.id == id) {
282 let mut request = self.queue.remove(pos).unwrap();
283 request.state = UploadState::Cancelled;
284 self.cancelled_count += 1;
285 self.completed.push(request);
286 true
287 } else {
288 false
289 }
290 }
291
292 #[must_use]
294 pub fn pending_count(&self) -> usize {
295 self.queue.len()
296 }
297
298 #[must_use]
300 pub fn queued_bytes(&self) -> usize {
301 self.queue.iter().map(|r| r.data.len()).sum()
302 }
303
304 #[must_use]
306 pub fn is_empty(&self) -> bool {
307 self.queue.is_empty()
308 }
309
310 #[must_use]
312 pub fn request_state(&self, id: UploadId) -> Option<UploadState> {
313 self.queue
314 .iter()
315 .find(|r| r.id == id)
316 .map(|r| r.state)
317 .or_else(|| self.completed.iter().find(|r| r.id == id).map(|r| r.state))
318 }
319
320 #[allow(clippy::cast_precision_loss)]
322 #[must_use]
323 pub fn stats(&self) -> UploadQueueStats {
324 let queued_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
325 let total_processed = self.completed.len() as u64;
326 let latencies: Vec<f64> = self
327 .completed
328 .iter()
329 .filter_map(UploadRequest::latency)
330 .map(|d| d.as_secs_f64())
331 .collect();
332 let avg_throughput_bps = if latencies.is_empty() {
333 0.0
334 } else {
335 let total_secs: f64 = latencies.iter().sum();
336 if total_secs > 0.0 {
337 self.total_bytes_transferred as f64 / total_secs
338 } else {
339 0.0
340 }
341 };
342
343 UploadQueueStats {
344 queued_count: self.queue.len(),
345 queued_bytes,
346 total_processed,
347 total_bytes_transferred: self.total_bytes_transferred,
348 failed_count: self.failed_count,
349 cancelled_count: self.cancelled_count,
350 avg_throughput_bps,
351 }
352 }
353
354 pub fn clear(&mut self) {
356 while let Some(mut req) = self.queue.pop_front() {
357 req.state = UploadState::Cancelled;
358 self.cancelled_count += 1;
359 self.completed.push(req);
360 }
361 }
362
363 #[must_use]
365 pub fn peek_next(&self) -> Option<&UploadRequest> {
366 self.queue.front()
367 }
368}
369
370impl Default for UploadQueue {
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379
380 fn make_target(size: usize) -> UploadTarget {
381 UploadTarget::new(1, 0, size)
382 }
383
384 #[test]
385 fn test_create_queue() {
386 let queue = UploadQueue::new();
387 assert!(queue.is_empty());
388 assert_eq!(queue.pending_count(), 0);
389 }
390
391 #[test]
392 fn test_enqueue_request() {
393 let mut queue = UploadQueue::new();
394 let data = vec![0u8; 1024];
395 let target = make_target(2048);
396 let id = queue.enqueue(data, target, UploadPriority::Normal);
397 assert!(id.is_some());
398 assert_eq!(queue.pending_count(), 1);
399 }
400
401 #[test]
402 fn test_enqueue_limit() {
403 let config = UploadQueueConfig {
404 max_pending: 2,
405 ..Default::default()
406 };
407 let mut queue = UploadQueue::with_config(config);
408 let t1 = make_target(4096);
409 let t2 = make_target(4096);
410 let t3 = make_target(4096);
411 assert!(queue
412 .enqueue(vec![0; 100], t1, UploadPriority::Normal)
413 .is_some());
414 assert!(queue
415 .enqueue(vec![0; 100], t2, UploadPriority::Normal)
416 .is_some());
417 assert!(queue
418 .enqueue(vec![0; 100], t3, UploadPriority::Normal)
419 .is_none());
420 }
421
422 #[test]
423 fn test_enqueue_byte_limit() {
424 let config = UploadQueueConfig {
425 max_queued_bytes: 200,
426 ..Default::default()
427 };
428 let mut queue = UploadQueue::with_config(config);
429 let t1 = make_target(4096);
430 let t2 = make_target(4096);
431 assert!(queue
432 .enqueue(vec![0; 150], t1, UploadPriority::Normal)
433 .is_some());
434 assert!(queue
435 .enqueue(vec![0; 100], t2, UploadPriority::Normal)
436 .is_none());
437 }
438
439 #[test]
440 fn test_flush_batch() {
441 let mut queue = UploadQueue::new();
442 for i in 0..5 {
443 let target = make_target(4096);
444 queue.enqueue(vec![0u8; 100], target, UploadPriority::Normal);
445 let _ = i;
446 }
447 let processed = queue.flush();
448 assert!(!processed.is_empty());
449 }
450
451 #[test]
452 fn test_flush_priority_ordering() {
453 let config = UploadQueueConfig {
454 priority_sort: true,
455 flush_batch_size: 1,
456 ..Default::default()
457 };
458 let mut queue = UploadQueue::with_config(config);
459 let t1 = make_target(4096);
460 let t2 = make_target(4096);
461 let _low_id = queue.enqueue(vec![1], t1, UploadPriority::Low).unwrap();
462 let high_id = queue
463 .enqueue(vec![2], t2, UploadPriority::Critical)
464 .unwrap();
465 let processed = queue.flush();
466 assert_eq!(processed.len(), 1);
467 assert_eq!(processed[0], high_id);
468 }
469
470 #[test]
471 fn test_cancel_request() {
472 let mut queue = UploadQueue::new();
473 let target = make_target(4096);
474 let id = queue
475 .enqueue(vec![0; 100], target, UploadPriority::Normal)
476 .unwrap();
477 assert!(queue.cancel(id));
478 assert!(queue.is_empty());
479 assert_eq!(queue.request_state(id), Some(UploadState::Cancelled));
480 }
481
482 #[test]
483 fn test_cancel_nonexistent() {
484 let mut queue = UploadQueue::new();
485 assert!(!queue.cancel(UploadId(999)));
486 }
487
488 #[test]
489 fn test_request_state_tracking() {
490 let mut queue = UploadQueue::new();
491 let target = make_target(4096);
492 let id = queue
493 .enqueue(vec![0; 100], target, UploadPriority::Normal)
494 .unwrap();
495 assert_eq!(queue.request_state(id), Some(UploadState::Queued));
496 queue.flush();
497 assert_eq!(queue.request_state(id), Some(UploadState::Completed));
498 }
499
500 #[test]
501 fn test_failed_transfer() {
502 let mut queue = UploadQueue::new();
503 let target = UploadTarget::new(1, 0, 10);
505 let id = queue
506 .enqueue(vec![0; 100], target, UploadPriority::Normal)
507 .unwrap();
508 queue.flush();
509 assert_eq!(queue.request_state(id), Some(UploadState::Failed));
510 }
511
512 #[test]
513 fn test_queued_bytes() {
514 let mut queue = UploadQueue::new();
515 let t1 = make_target(4096);
516 let t2 = make_target(4096);
517 queue.enqueue(vec![0; 100], t1, UploadPriority::Normal);
518 queue.enqueue(vec![0; 200], t2, UploadPriority::Normal);
519 assert_eq!(queue.queued_bytes(), 300);
520 }
521
522 #[test]
523 fn test_stats() {
524 let mut queue = UploadQueue::new();
525 let target = make_target(4096);
526 queue.enqueue(vec![0; 1024], target, UploadPriority::Normal);
527 queue.flush();
528 let stats = queue.stats();
529 assert_eq!(stats.total_processed, 1);
530 assert_eq!(stats.total_bytes_transferred, 1024);
531 }
532
533 #[test]
534 fn test_stats_mb_conversion() {
535 let stats = UploadQueueStats {
536 queued_bytes: 1_048_576,
537 total_bytes_transferred: 2_097_152,
538 ..Default::default()
539 };
540 assert!((stats.queued_mb() - 1.0).abs() < 0.001);
541 assert!((stats.transferred_mb() - 2.0).abs() < 0.001);
542 }
543
544 #[test]
545 fn test_clear_queue() {
546 let mut queue = UploadQueue::new();
547 for _ in 0..5 {
548 let target = make_target(4096);
549 queue.enqueue(vec![0; 100], target, UploadPriority::Normal);
550 }
551 queue.clear();
552 assert!(queue.is_empty());
553 assert_eq!(queue.stats().cancelled_count, 5);
554 }
555
556 #[test]
557 fn test_peek_next() {
558 let mut queue = UploadQueue::new();
559 assert!(queue.peek_next().is_none());
560 let target = make_target(4096);
561 let id = queue
562 .enqueue(vec![0; 100], target, UploadPriority::Normal)
563 .unwrap();
564 let peeked = queue.peek_next().unwrap();
565 assert_eq!(peeked.id, id);
566 }
567
568 #[test]
569 fn test_upload_target_fits() {
570 let target = UploadTarget::new(1, 10, 100);
571 assert!(target.fits(90));
572 assert!(target.fits(0));
573 assert!(!target.fits(91));
574 }
575}