1#![allow(dead_code)]
2use std::collections::VecDeque;
9use std::time::Instant;
10
/// Handle identifying one upload request.
///
/// A thin newtype over the raw `u64` value; the inner field is public so callers can read or
/// construct the identifier directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UploadId(pub u64);
17
/// Urgency of an upload request.
///
/// The derived ordering follows the discriminants, so
/// `Low < Normal < High < Critical`. `Normal` is the `Default` value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum UploadPriority {
    /// Least urgent level (discriminant `0`).
    Low = 0,
    /// Default level (discriminant `1`).
    #[default]
    Normal = 1,
    /// Above-normal level (discriminant `2`).
    High = 2,
    /// Most urgent level (discriminant `3`).
    Critical = 3,
}
31
/// Lifecycle state of an upload request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UploadState {
    /// Waiting in the queue; the state every new request starts in.
    Queued,
    /// Transfer in progress.
    // NOTE(review): no code in this file ever assigns this variant — confirm whether it is
    // reserved for a future asynchronous transfer path.
    Transferring,
    /// The payload fitted its target and was processed by a flush.
    Completed,
    /// The payload did not fit its target when flushed.
    Failed,
    /// Removed from the queue before being processed.
    Cancelled,
}
46
/// Destination region of an upload: which buffer, where in it, and how large the buffer is.
#[derive(Debug, Clone)]
pub struct UploadTarget {
    /// Identifier of the destination buffer.
    pub buffer_id: u64,
    /// Offset into the buffer at which the payload starts.
    pub offset: usize,
    /// Total size of the destination buffer.
    pub buffer_size: usize,
}

impl UploadTarget {
    /// Creates a target describing `buffer_size` units of buffer `buffer_id`, written from
    /// `offset` onwards.
    #[must_use]
    pub fn new(buffer_id: u64, offset: usize, buffer_size: usize) -> Self {
        Self {
            buffer_id,
            offset,
            buffer_size,
        }
    }

    /// Returns `true` when `data_size` units starting at `offset` end at or before
    /// `buffer_size`.
    ///
    /// The end position is computed with overflow checking: an `offset + data_size` that does
    /// not fit in `usize` can never be inside the buffer, so it yields `false` instead of
    /// panicking (debug builds) or wrapping around to a small, seemingly valid end (release
    /// builds).
    #[must_use]
    pub fn fits(&self, data_size: usize) -> bool {
        matches!(
            self.offset.checked_add(data_size),
            Some(end) if end <= self.buffer_size
        )
    }
}
75
76#[derive(Debug, Clone)]
78pub struct UploadRequest {
79 pub id: UploadId,
81 pub data: Vec<u8>,
83 pub target: UploadTarget,
85 pub priority: UploadPriority,
87 pub state: UploadState,
89 pub enqueue_time: Instant,
91 pub complete_time: Option<Instant>,
93}
94
95impl UploadRequest {
96 #[must_use]
98 pub fn new(
99 id: UploadId,
100 data: Vec<u8>,
101 target: UploadTarget,
102 priority: UploadPriority,
103 ) -> Self {
104 Self {
105 id,
106 data,
107 target,
108 priority,
109 state: UploadState::Queued,
110 enqueue_time: Instant::now(),
111 complete_time: None,
112 }
113 }
114
115 #[must_use]
117 pub fn data_size(&self) -> usize {
118 self.data.len()
119 }
120
121 #[must_use]
123 pub fn latency(&self) -> Option<std::time::Duration> {
124 self.complete_time
125 .map(|t| t.duration_since(self.enqueue_time))
126 }
127}
128
/// Tunable limits and behavior switches for an upload queue.
#[derive(Debug, Clone)]
pub struct UploadQueueConfig {
    /// Maximum number of requests that may wait in the queue at once.
    pub max_pending: usize,
    /// Maximum total payload size, in bytes, that may wait in the queue at once.
    pub max_queued_bytes: usize,
    /// Maximum number of requests handled by a single flush.
    pub flush_batch_size: usize,
    /// Whether a flush orders the queue by priority before processing.
    pub priority_sort: bool,
}

impl Default for UploadQueueConfig {
    /// Defaults: 256 pending requests, a 256 MiB byte budget, batches of 32, priority sorting
    /// enabled.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        Self {
            max_pending: 256,
            max_queued_bytes: 256 * MIB,
            flush_batch_size: 32,
            priority_sort: true,
        }
    }
}
152
/// Point-in-time counters describing an upload queue.
#[derive(Debug, Clone, Default)]
pub struct UploadQueueStats {
    /// Number of requests currently waiting.
    pub queued_count: usize,
    /// Total payload bytes currently waiting.
    pub queued_bytes: usize,
    /// Number of requests that have left the queue.
    pub total_processed: u64,
    /// Total payload bytes of successfully completed requests.
    pub total_bytes_transferred: u64,
    /// Number of requests that failed.
    pub failed_count: u64,
    /// Number of requests that were cancelled.
    pub cancelled_count: u64,
    /// Average throughput figure.
    // NOTE(review): the queue computes this as bytes divided by seconds, so despite the `bps`
    // suffix the unit appears to be bytes (not bits) per second — confirm with consumers.
    pub avg_throughput_bps: f64,
}

impl UploadQueueStats {
    /// Divisor converting a byte count to megabytes (1024 * 1024 bytes each).
    const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

    /// `queued_bytes` expressed in megabytes of 1024 * 1024 bytes.
    // Precision loss only matters above 2^53 bytes, far beyond any realistic queue size.
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn queued_mb(&self) -> f64 {
        let bytes = self.queued_bytes as f64;
        bytes / Self::BYTES_PER_MB
    }

    /// `total_bytes_transferred` expressed in megabytes of 1024 * 1024 bytes.
    // Precision loss only matters above 2^53 bytes; acceptable for a display-oriented figure.
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn transferred_mb(&self) -> f64 {
        let bytes = self.total_bytes_transferred as f64;
        bytes / Self::BYTES_PER_MB
    }
}
187
188pub struct UploadQueue {
190 queue: VecDeque<UploadRequest>,
192 completed: Vec<UploadRequest>,
194 config: UploadQueueConfig,
196 next_id: u64,
198 total_bytes_transferred: u64,
200 failed_count: u64,
202 cancelled_count: u64,
204}
205
206impl UploadQueue {
207 #[must_use]
209 pub fn new() -> Self {
210 Self::with_config(UploadQueueConfig::default())
211 }
212
213 #[must_use]
215 pub fn with_config(config: UploadQueueConfig) -> Self {
216 Self {
217 queue: VecDeque::new(),
218 completed: Vec::new(),
219 config,
220 next_id: 0,
221 total_bytes_transferred: 0,
222 failed_count: 0,
223 cancelled_count: 0,
224 }
225 }
226
227 pub fn enqueue(
230 &mut self,
231 data: Vec<u8>,
232 target: UploadTarget,
233 priority: UploadPriority,
234 ) -> Option<UploadId> {
235 let current_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
236 if self.queue.len() >= self.config.max_pending {
237 return None;
238 }
239 if current_bytes + data.len() > self.config.max_queued_bytes {
240 return None;
241 }
242 let id = UploadId(self.next_id);
243 self.next_id += 1;
244 let request = UploadRequest::new(id, data, target, priority);
245 self.queue.push_back(request);
246 Some(id)
247 }
248
249 pub fn flush(&mut self) -> Vec<UploadId> {
252 if self.config.priority_sort {
253 let mut items: Vec<UploadRequest> = self.queue.drain(..).collect();
254 items.sort_by(|a, b| b.priority.cmp(&a.priority));
255 self.queue = items.into_iter().collect();
256 }
257
258 let batch_size = self.config.flush_batch_size.min(self.queue.len());
259 let mut processed = Vec::with_capacity(batch_size);
260
261 for _ in 0..batch_size {
262 if let Some(mut request) = self.queue.pop_front() {
263 if request.target.fits(request.data.len()) {
264 request.state = UploadState::Completed;
265 request.complete_time = Some(Instant::now());
266 self.total_bytes_transferred += request.data.len() as u64;
267 } else {
268 request.state = UploadState::Failed;
269 self.failed_count += 1;
270 }
271 processed.push(request.id);
272 self.completed.push(request);
273 }
274 }
275
276 processed
277 }
278
279 pub fn cancel(&mut self, id: UploadId) -> bool {
281 if let Some(pos) = self.queue.iter().position(|r| r.id == id) {
282 let mut request = match self.queue.remove(pos) {
283 Some(r) => r,
284 None => return false,
285 };
286 request.state = UploadState::Cancelled;
287 self.cancelled_count += 1;
288 self.completed.push(request);
289 true
290 } else {
291 false
292 }
293 }
294
295 #[must_use]
297 pub fn pending_count(&self) -> usize {
298 self.queue.len()
299 }
300
301 #[must_use]
303 pub fn queued_bytes(&self) -> usize {
304 self.queue.iter().map(|r| r.data.len()).sum()
305 }
306
307 #[must_use]
309 pub fn is_empty(&self) -> bool {
310 self.queue.is_empty()
311 }
312
313 #[must_use]
315 pub fn request_state(&self, id: UploadId) -> Option<UploadState> {
316 self.queue
317 .iter()
318 .find(|r| r.id == id)
319 .map(|r| r.state)
320 .or_else(|| self.completed.iter().find(|r| r.id == id).map(|r| r.state))
321 }
322
323 #[allow(clippy::cast_precision_loss)]
325 #[must_use]
326 pub fn stats(&self) -> UploadQueueStats {
327 let queued_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
328 let total_processed = self.completed.len() as u64;
329 let latencies: Vec<f64> = self
330 .completed
331 .iter()
332 .filter_map(UploadRequest::latency)
333 .map(|d| d.as_secs_f64())
334 .collect();
335 let avg_throughput_bps = if latencies.is_empty() {
336 0.0
337 } else {
338 let total_secs: f64 = latencies.iter().sum();
339 if total_secs > 0.0 {
340 self.total_bytes_transferred as f64 / total_secs
341 } else {
342 0.0
343 }
344 };
345
346 UploadQueueStats {
347 queued_count: self.queue.len(),
348 queued_bytes,
349 total_processed,
350 total_bytes_transferred: self.total_bytes_transferred,
351 failed_count: self.failed_count,
352 cancelled_count: self.cancelled_count,
353 avg_throughput_bps,
354 }
355 }
356
357 pub fn clear(&mut self) {
359 while let Some(mut req) = self.queue.pop_front() {
360 req.state = UploadState::Cancelled;
361 self.cancelled_count += 1;
362 self.completed.push(req);
363 }
364 }
365
366 #[must_use]
368 pub fn peek_next(&self) -> Option<&UploadRequest> {
369 self.queue.front()
370 }
371}
372
373impl Default for UploadQueue {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Capacity used for targets that must comfortably hold every test payload.
    const ROOMY: usize = 4096;

    /// Builds a target at offset 0 of buffer 1 with the given capacity.
    fn make_target(size: usize) -> UploadTarget {
        UploadTarget::new(1, 0, size)
    }

    /// Enqueues `len` zero bytes at normal priority into a roomy target.
    fn enqueue_zeroes(queue: &mut UploadQueue, len: usize) -> Option<UploadId> {
        queue.enqueue(vec![0u8; len], make_target(ROOMY), UploadPriority::Normal)
    }

    #[test]
    fn test_create_queue() {
        let queue = UploadQueue::new();
        assert!(queue.is_empty());
        assert_eq!(queue.pending_count(), 0);
    }

    #[test]
    fn test_enqueue_request() {
        let mut queue = UploadQueue::new();
        let id = queue.enqueue(vec![0u8; 1024], make_target(2048), UploadPriority::Normal);
        assert!(id.is_some());
        assert_eq!(queue.pending_count(), 1);
        assert!(!queue.is_empty());
    }

    #[test]
    fn test_enqueue_limit() {
        let config = UploadQueueConfig {
            max_pending: 2,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        assert!(enqueue_zeroes(&mut queue, 100).is_some());
        assert!(enqueue_zeroes(&mut queue, 100).is_some());
        assert!(enqueue_zeroes(&mut queue, 100).is_none());
        assert_eq!(queue.pending_count(), 2);
    }

    #[test]
    fn test_enqueue_byte_limit() {
        let config = UploadQueueConfig {
            max_queued_bytes: 200,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        assert!(enqueue_zeroes(&mut queue, 150).is_some());
        assert!(enqueue_zeroes(&mut queue, 100).is_none());
        // A rejected request must leave the queue untouched.
        assert_eq!(queue.pending_count(), 1);
        assert_eq!(queue.queued_bytes(), 150);
    }

    #[test]
    fn test_flush_batch() {
        let mut queue = UploadQueue::new();
        for _ in 0..5 {
            enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed");
        }
        // Five requests are below the default batch size, so one flush handles all of them.
        let processed = queue.flush();
        assert_eq!(processed.len(), 5);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_flush_respects_batch_size() {
        let config = UploadQueueConfig {
            flush_batch_size: 2,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        for _ in 0..5 {
            enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed");
        }
        assert_eq!(queue.flush().len(), 2);
        assert_eq!(queue.pending_count(), 3);
        assert_eq!(queue.flush().len(), 2);
        assert_eq!(queue.flush().len(), 1);
        assert!(queue.flush().is_empty());
    }

    #[test]
    fn test_flush_priority_ordering() {
        let config = UploadQueueConfig {
            priority_sort: true,
            flush_batch_size: 1,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        let low_id = queue
            .enqueue(vec![1], make_target(ROOMY), UploadPriority::Low)
            .expect("enqueue should succeed");
        let high_id = queue
            .enqueue(vec![2], make_target(ROOMY), UploadPriority::Critical)
            .expect("enqueue should succeed");
        assert_eq!(queue.flush(), vec![high_id]);
        assert_eq!(queue.flush(), vec![low_id]);
    }

    #[test]
    fn test_flush_keeps_fifo_order_within_priority() {
        let mut queue = UploadQueue::new();
        let ids: Vec<UploadId> = (0..3)
            .map(|_| enqueue_zeroes(&mut queue, 10).expect("enqueue should succeed"))
            .collect();
        assert_eq!(queue.flush(), ids);
    }

    #[test]
    fn test_cancel_request() {
        let mut queue = UploadQueue::new();
        let id = enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed");
        assert!(queue.cancel(id));
        assert!(queue.is_empty());
        assert_eq!(queue.request_state(id), Some(UploadState::Cancelled));
        // A request can only be cancelled once.
        assert!(!queue.cancel(id));
    }

    #[test]
    fn test_cancel_nonexistent() {
        let mut queue = UploadQueue::new();
        assert!(!queue.cancel(UploadId(999)));
    }

    #[test]
    fn test_request_state_tracking() {
        let mut queue = UploadQueue::new();
        let id = enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed");
        assert_eq!(queue.request_state(id), Some(UploadState::Queued));
        queue.flush();
        assert_eq!(queue.request_state(id), Some(UploadState::Completed));
        assert_eq!(queue.request_state(UploadId(999)), None);
    }

    #[test]
    fn test_failed_transfer() {
        let mut queue = UploadQueue::new();
        // 100 bytes cannot fit a 10-byte target, so the flush must mark the request failed.
        let id = queue
            .enqueue(vec![0; 100], make_target(10), UploadPriority::Normal)
            .expect("enqueue should succeed");
        queue.flush();
        assert_eq!(queue.request_state(id), Some(UploadState::Failed));
        let stats = queue.stats();
        assert_eq!(stats.failed_count, 1);
        assert_eq!(stats.total_bytes_transferred, 0);
    }

    #[test]
    fn test_queued_bytes() {
        let mut queue = UploadQueue::new();
        enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed");
        enqueue_zeroes(&mut queue, 200).expect("enqueue should succeed");
        assert_eq!(queue.queued_bytes(), 300);
    }

    #[test]
    fn test_stats() {
        let mut queue = UploadQueue::new();
        enqueue_zeroes(&mut queue, 1024).expect("enqueue should succeed");
        queue.flush();
        let stats = queue.stats();
        assert_eq!(stats.total_processed, 1);
        assert_eq!(stats.total_bytes_transferred, 1024);
        assert_eq!(stats.queued_count, 0);
        assert_eq!(stats.queued_bytes, 0);
    }

    #[test]
    fn test_stats_mb_conversion() {
        let stats = UploadQueueStats {
            queued_bytes: 1_048_576,
            total_bytes_transferred: 2_097_152,
            ..Default::default()
        };
        assert!((stats.queued_mb() - 1.0).abs() < 0.001);
        assert!((stats.transferred_mb() - 2.0).abs() < 0.001);
    }

    #[test]
    fn test_clear_queue() {
        let mut queue = UploadQueue::new();
        let ids: Vec<UploadId> = (0..5)
            .map(|_| enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed"))
            .collect();
        queue.clear();
        assert!(queue.is_empty());
        assert_eq!(queue.stats().cancelled_count, 5);
        for id in ids {
            assert_eq!(queue.request_state(id), Some(UploadState::Cancelled));
        }
    }

    #[test]
    fn test_peek_next() {
        let mut queue = UploadQueue::new();
        assert!(queue.peek_next().is_none());
        let id = enqueue_zeroes(&mut queue, 100).expect("enqueue should succeed");
        let peeked = queue.peek_next().expect("peek should return next item");
        assert_eq!(peeked.id, id);
        // Peeking must not consume the request.
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn test_upload_target_fits() {
        let target = UploadTarget::new(1, 10, 100);
        assert!(target.fits(90));
        assert!(target.fits(0));
        assert!(!target.fits(91));
    }
}