1use super::classification::DataType;
49use super::QoSClass;
50use crate::{Error, Result};
51use std::collections::VecDeque;
52use std::sync::atomic::{AtomicUsize, Ordering};
53use std::time::{Duration, Instant};
54
55#[derive(Debug, Clone)]
57pub struct PendingSync {
58 pub data: Vec<u8>,
60
61 pub qos_class: QoSClass,
63
64 pub data_type: DataType,
66
67 pub queued_at: Instant,
69
70 pub priority_multiplier: f32,
75}
76
77impl PendingSync {
78 pub fn new(data: Vec<u8>, qos_class: QoSClass, data_type: DataType) -> Self {
80 Self {
81 data,
82 qos_class,
83 data_type,
84 queued_at: Instant::now(),
85 priority_multiplier: 1.0,
86 }
87 }
88
89 pub fn with_multiplier(
91 data: Vec<u8>,
92 qos_class: QoSClass,
93 data_type: DataType,
94 multiplier: f32,
95 ) -> Self {
96 Self {
97 data,
98 qos_class,
99 data_type,
100 queued_at: Instant::now(),
101 priority_multiplier: multiplier,
102 }
103 }
104
105 pub fn queue_duration(&self) -> Duration {
107 self.queued_at.elapsed()
108 }
109
110 pub fn effective_class(&self) -> QoSClass {
114 let wait_hours = self.queue_duration().as_secs_f32() / 3600.0;
115 let adjusted_hours = wait_hours * self.priority_multiplier;
116
117 match self.qos_class {
118 QoSClass::Bulk if adjusted_hours >= 1.0 => QoSClass::Low,
119 QoSClass::Low if adjusted_hours >= 2.0 => QoSClass::Normal,
120 other => other,
121 }
122 }
123
124 pub fn should_promote(&self) -> bool {
126 self.effective_class() != self.qos_class
127 }
128
129 pub fn size(&self) -> usize {
131 self.data.len()
132 }
133}
134
135#[derive(Debug)]
140pub struct PrioritySyncQueue {
141 queues: [VecDeque<PendingSync>; 5],
143
144 total_bytes: AtomicUsize,
146
147 max_bytes: usize,
149
150 aging_promotions: AtomicUsize,
152}
153
154impl PrioritySyncQueue {
155 pub fn new(max_bytes: usize) -> Self {
161 Self {
162 queues: [
163 VecDeque::new(), VecDeque::new(), VecDeque::new(), VecDeque::new(), VecDeque::new(), ],
169 total_bytes: AtomicUsize::new(0),
170 max_bytes,
171 aging_promotions: AtomicUsize::new(0),
172 }
173 }
174
175 pub fn default_capacity() -> Self {
177 Self::new(10 * 1024 * 1024)
178 }
179
180 #[inline]
182 fn queue_index(class: QoSClass) -> usize {
183 (class.as_u8() - 1) as usize
184 }
185
186 pub fn enqueue(&mut self, sync: PendingSync) -> Result<()> {
190 let size = sync.size();
191 let current_bytes = self.total_bytes.load(Ordering::Relaxed);
192
193 if current_bytes + size > self.max_bytes {
194 return Err(Error::Internal(format!(
195 "Queue full: {} + {} > {} bytes",
196 current_bytes, size, self.max_bytes
197 )));
198 }
199
200 let idx = Self::queue_index(sync.qos_class);
201 self.queues[idx].push_back(sync);
202 self.total_bytes.fetch_add(size, Ordering::Relaxed);
203
204 Ok(())
205 }
206
207 pub fn dequeue_highest(&mut self) -> Option<PendingSync> {
211 for idx in 0..5 {
213 if let Some(sync) = self.queues[idx].pop_front() {
214 self.total_bytes.fetch_sub(sync.size(), Ordering::Relaxed);
215 return Some(sync);
216 }
217 }
218 None
219 }
220
221 pub fn peek_highest(&self) -> Option<&PendingSync> {
223 for idx in 0..5 {
224 if let Some(sync) = self.queues[idx].front() {
225 return Some(sync);
226 }
227 }
228 None
229 }
230
231 pub fn apply_aging(&mut self) -> usize {
236 let mut promoted = 0;
237
238 let bulk_idx = Self::queue_index(QoSClass::Bulk);
240 let low_idx = Self::queue_index(QoSClass::Low);
241
242 let mut to_promote_bulk = Vec::new();
243 self.queues[bulk_idx].retain(|sync| {
244 if sync.should_promote() && sync.effective_class() == QoSClass::Low {
245 to_promote_bulk.push(sync.clone());
246 false
247 } else {
248 true
249 }
250 });
251
252 for mut sync in to_promote_bulk {
253 sync.qos_class = QoSClass::Low;
254 self.queues[low_idx].push_back(sync);
255 promoted += 1;
256 }
257
258 let normal_idx = Self::queue_index(QoSClass::Normal);
260
261 let mut to_promote_low = Vec::new();
262 self.queues[low_idx].retain(|sync| {
263 if sync.should_promote() && sync.effective_class() == QoSClass::Normal {
264 to_promote_low.push(sync.clone());
265 false
266 } else {
267 true
268 }
269 });
270
271 for mut sync in to_promote_low {
272 sync.qos_class = QoSClass::Normal;
273 self.queues[normal_idx].push_back(sync);
274 promoted += 1;
275 }
276
277 if promoted > 0 {
278 self.aging_promotions.fetch_add(promoted, Ordering::Relaxed);
279 }
280
281 promoted
282 }
283
284 pub fn queue_depth(&self, class: QoSClass) -> usize {
286 let idx = Self::queue_index(class);
287 self.queues[idx].len()
288 }
289
290 pub fn total_bytes_queued(&self) -> usize {
292 self.total_bytes.load(Ordering::Relaxed)
293 }
294
295 pub fn total_items(&self) -> usize {
297 self.queues.iter().map(|q| q.len()).sum()
298 }
299
300 pub fn is_empty(&self) -> bool {
302 self.queues.iter().all(|q| q.is_empty())
303 }
304
305 pub fn is_full(&self) -> bool {
307 self.total_bytes.load(Ordering::Relaxed) >= self.max_bytes
308 }
309
310 pub fn available_bytes(&self) -> usize {
312 let current = self.total_bytes.load(Ordering::Relaxed);
313 self.max_bytes.saturating_sub(current)
314 }
315
316 pub fn max_bytes(&self) -> usize {
318 self.max_bytes
319 }
320
321 pub fn stats(&self) -> QueueStats {
323 QueueStats {
324 total_items: self.total_items(),
325 total_bytes: self.total_bytes_queued(),
326 max_bytes: self.max_bytes,
327 depth_critical: self.queue_depth(QoSClass::Critical),
328 depth_high: self.queue_depth(QoSClass::High),
329 depth_normal: self.queue_depth(QoSClass::Normal),
330 depth_low: self.queue_depth(QoSClass::Low),
331 depth_bulk: self.queue_depth(QoSClass::Bulk),
332 aging_promotions: self.aging_promotions.load(Ordering::Relaxed),
333 }
334 }
335
336 pub fn clear(&mut self) {
338 for queue in &mut self.queues {
339 queue.clear();
340 }
341 self.total_bytes.store(0, Ordering::Relaxed);
342 }
343
344 pub fn drain_class(&mut self, class: QoSClass) -> Vec<PendingSync> {
346 let idx = Self::queue_index(class);
347 let items: Vec<_> = self.queues[idx].drain(..).collect();
348
349 let bytes: usize = items.iter().map(|s| s.size()).sum();
350 self.total_bytes.fetch_sub(bytes, Ordering::Relaxed);
351
352 items
353 }
354
355 pub fn remove_stale(&mut self, max_age: Duration) -> usize {
359 let mut removed = 0;
360 let mut bytes_removed = 0;
361
362 for queue in &mut self.queues {
363 let old_len = queue.len();
364 queue.retain(|sync| {
365 let keep = sync.queue_duration() < max_age;
366 if !keep {
367 bytes_removed += sync.size();
368 }
369 keep
370 });
371 removed += old_len - queue.len();
372 }
373
374 if bytes_removed > 0 {
375 self.total_bytes.fetch_sub(bytes_removed, Ordering::Relaxed);
376 }
377
378 removed
379 }
380
381 pub fn oldest_item_age(&self) -> Option<Duration> {
383 self.queues
384 .iter()
385 .filter_map(|q| q.front())
386 .map(|s| s.queue_duration())
387 .max()
388 }
389
390 pub fn dequeue_batch(&mut self, max_items: usize) -> Vec<PendingSync> {
392 let mut batch = Vec::with_capacity(max_items);
393
394 while batch.len() < max_items {
395 if let Some(sync) = self.dequeue_highest() {
396 batch.push(sync);
397 } else {
398 break;
399 }
400 }
401
402 batch
403 }
404
405 pub fn dequeue_bytes(&mut self, max_bytes: usize) -> Vec<PendingSync> {
407 let mut batch = Vec::new();
408 let mut total_bytes = 0;
409
410 while total_bytes < max_bytes {
411 if let Some(peek) = self.peek_highest() {
413 if total_bytes + peek.size() > max_bytes && !batch.is_empty() {
414 break; }
416 }
417
418 if let Some(sync) = self.dequeue_highest() {
419 total_bytes += sync.size();
420 batch.push(sync);
421 } else {
422 break;
423 }
424 }
425
426 batch
427 }
428}
429
/// Point-in-time snapshot of a queue's counters.
#[derive(Debug, Clone, Copy)]
pub struct QueueStats {
    /// Number of items queued across all classes.
    pub total_items: usize,

    /// Payload bytes queued across all classes.
    pub total_bytes: usize,

    /// Byte budget of the queue the snapshot was taken from.
    pub max_bytes: usize,

    /// Items queued under the `Critical` class.
    pub depth_critical: usize,

    /// Items queued under the `High` class.
    pub depth_high: usize,

    /// Items queued under the `Normal` class.
    pub depth_normal: usize,

    /// Items queued under the `Low` class.
    pub depth_low: usize,

    /// Items queued under the `Bulk` class.
    pub depth_bulk: usize,

    /// Cumulative count of aging promotions at snapshot time.
    pub aging_promotions: usize,
}

impl QueueStats {
    /// Fraction of the byte budget in use (`total_bytes / max_bytes`).
    ///
    /// Returns `0.0` when `max_bytes` is zero so the ratio is never a division by zero.
    pub fn utilization(&self) -> f64 {
        match self.max_bytes {
            0 => 0.0,
            budget => self.total_bytes as f64 / budget as f64,
        }
    }
}
471
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an item with `PendingSync::new` and enqueues it, panicking if the
    /// queue rejects it.
    fn push(queue: &mut PrioritySyncQueue, data: Vec<u8>, class: QoSClass, kind: DataType) {
        queue
            .enqueue(PendingSync::new(data, class, kind))
            .unwrap();
    }

    // A freshly built item reports its payload size, class and neutral multiplier.
    #[test]
    fn test_pending_sync_creation() {
        let item = PendingSync::new(vec![1, 2, 3], QoSClass::Critical, DataType::ContactReport);

        assert_eq!(item.size(), 3);
        assert_eq!(item.qos_class, QoSClass::Critical);
        assert_eq!(item.priority_multiplier, 1.0);
    }

    // A new queue is empty and exposes its full byte budget.
    #[test]
    fn test_queue_creation() {
        let q = PrioritySyncQueue::new(1024);

        assert!(q.is_empty());
        assert_eq!(q.max_bytes(), 1024);
        assert_eq!(q.available_bytes(), 1024);
    }

    // Enqueue updates the counters; dequeue returns the item and empties the queue.
    #[test]
    fn test_enqueue_dequeue() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![1, 2, 3], QoSClass::Normal, DataType::HealthStatus);

        assert_eq!(q.total_items(), 1);
        assert_eq!(q.total_bytes_queued(), 3);

        let taken = q.dequeue_highest().unwrap();
        assert_eq!(taken.qos_class, QoSClass::Normal);
        assert!(q.is_empty());
    }

    // Items come out by class priority regardless of insertion order.
    #[test]
    fn test_priority_ordering() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![5], QoSClass::Bulk, DataType::DebugLog);
        push(&mut q, vec![1], QoSClass::Critical, DataType::ContactReport);
        push(&mut q, vec![3], QoSClass::Normal, DataType::HealthStatus);

        for expected in [QoSClass::Critical, QoSClass::Normal, QoSClass::Bulk] {
            assert_eq!(q.dequeue_highest().unwrap().qos_class, expected);
        }
    }

    // An item that would exceed the byte budget is rejected.
    #[test]
    fn test_queue_full() {
        let mut q = PrioritySyncQueue::new(10);
        push(&mut q, vec![0; 8], QoSClass::Normal, DataType::HealthStatus);

        let overflow = PendingSync::new(vec![0; 5], QoSClass::Normal, DataType::HealthStatus);
        assert!(q.enqueue(overflow).is_err());
    }

    // Depth is tracked per class.
    #[test]
    fn test_queue_depth() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![1], QoSClass::Critical, DataType::ContactReport);
        push(&mut q, vec![2], QoSClass::Critical, DataType::EmergencyAlert);
        push(&mut q, vec![3], QoSClass::Normal, DataType::HealthStatus);

        assert_eq!(q.queue_depth(QoSClass::Critical), 2);
        assert_eq!(q.queue_depth(QoSClass::Normal), 1);
        assert_eq!(q.queue_depth(QoSClass::Bulk), 0);
    }

    // Peeking shows the highest-priority item without removing anything.
    #[test]
    fn test_peek_highest() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![3], QoSClass::Normal, DataType::HealthStatus);
        push(&mut q, vec![1], QoSClass::Critical, DataType::ContactReport);

        let head = q.peek_highest().unwrap();
        assert_eq!(head.qos_class, QoSClass::Critical);
        assert_eq!(q.total_items(), 2);
    }

    // Clearing drops all items and resets the byte total.
    #[test]
    fn test_clear() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![1; 100], QoSClass::Normal, DataType::HealthStatus);
        push(&mut q, vec![2; 100], QoSClass::High, DataType::TargetImage);

        q.clear();

        assert!(q.is_empty());
        assert_eq!(q.total_bytes_queued(), 0);
    }

    // Draining one class leaves the other classes untouched.
    #[test]
    fn test_drain_class() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![1], QoSClass::Normal, DataType::HealthStatus);
        push(&mut q, vec![2], QoSClass::Normal, DataType::CapabilityChange);
        push(&mut q, vec![3], QoSClass::High, DataType::TargetImage);

        let normal_items = q.drain_class(QoSClass::Normal);
        assert_eq!(normal_items.len(), 2);
        assert_eq!(q.queue_depth(QoSClass::Normal), 0);
        assert_eq!(q.queue_depth(QoSClass::High), 1);
    }

    // The stats snapshot mirrors the queue counters.
    #[test]
    fn test_stats() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![0; 100], QoSClass::Critical, DataType::ContactReport);
        push(&mut q, vec![0; 50], QoSClass::Bulk, DataType::DebugLog);

        let snapshot = q.stats();
        assert_eq!(snapshot.total_items, 2);
        assert_eq!(snapshot.total_bytes, 150);
        assert_eq!(snapshot.depth_critical, 1);
        assert_eq!(snapshot.depth_bulk, 1);
        assert!((snapshot.utilization() - 150.0 / 1024.0).abs() < 0.001);
    }

    // A batch dequeue takes at most the requested number of items.
    #[test]
    fn test_dequeue_batch() {
        let mut q = PrioritySyncQueue::new(1024);
        for byte in 0..5 {
            push(&mut q, vec![byte], QoSClass::Normal, DataType::HealthStatus);
        }

        let taken = q.dequeue_batch(3);
        assert_eq!(taken.len(), 3);
        assert_eq!(q.total_items(), 2);
    }

    // A byte-bounded dequeue returns at least one item.
    #[test]
    fn test_dequeue_bytes() {
        let mut q = PrioritySyncQueue::new(1024);
        push(&mut q, vec![0; 100], QoSClass::Critical, DataType::ContactReport);
        push(&mut q, vec![0; 100], QoSClass::High, DataType::TargetImage);
        push(&mut q, vec![0; 100], QoSClass::Normal, DataType::HealthStatus);

        let taken = q.dequeue_bytes(150);
        assert!(!taken.is_empty());
    }

    // A just-created Bulk item has not aged and must not be promoted.
    #[test]
    fn test_effective_class_no_aging() {
        let item = PendingSync::new(vec![1], QoSClass::Bulk, DataType::DebugLog);

        assert_eq!(item.effective_class(), QoSClass::Bulk);
        assert!(!item.should_promote());
    }

    // Oldest age is absent for an empty queue and small for a fresh item.
    #[test]
    fn test_oldest_item_age() {
        let mut q = PrioritySyncQueue::new(1024);
        assert!(q.oldest_item_age().is_none());

        push(&mut q, vec![1], QoSClass::Normal, DataType::HealthStatus);

        let oldest = q.oldest_item_age().unwrap();
        assert!(oldest < Duration::from_secs(1));
    }

    // Available bytes shrink by the size of what was enqueued.
    #[test]
    fn test_available_bytes() {
        let mut q = PrioritySyncQueue::new(1000);
        push(&mut q, vec![0; 300], QoSClass::Normal, DataType::HealthStatus);

        assert_eq!(q.available_bytes(), 700);
    }
}