Skip to main content

peat_protocol/qos/
sync_queue.rs

1//! Priority-based sync queue (ADR-019 Phase 3)
2//!
3//! This module provides a priority-ordered queue for pending synchronization
4//! operations. Data is dequeued in priority order (P1 Critical first) with
5//! support for priority aging to prevent starvation.
6//!
7//! # Architecture
8//!
9//! The queue maintains separate internal queues for each QoS class:
10//! - P1 Critical: Emergency alerts, contact reports
11//! - P2 High: Target imagery, mission retasking
12//! - P3 Normal: Health status, capability changes
13//! - P4 Low: Position updates, heartbeats
14//! - P5 Bulk: Model updates, debug logs
15//!
16//! # Priority Aging
17//!
18//! To prevent starvation of low-priority data, items are promoted after
19//! extended wait times:
20//! - P5 → P4 after 1 hour
21//! - P4 → P3 after 2 hours
22//!
23//! # Example
24//!
25//! ```
26//! use peat_protocol::qos::{QoSClass, DataType, PrioritySyncQueue, PendingSync};
27//! use std::time::Instant;
28//!
29//! let mut queue = PrioritySyncQueue::new(10 * 1024 * 1024); // 10 MB max
30//!
31//! // Enqueue some data
32//! let sync = PendingSync {
33//!     data: vec![1, 2, 3],
34//!     qos_class: QoSClass::Critical,
35//!     data_type: DataType::ContactReport,
36//!     queued_at: Instant::now(),
37//!     priority_multiplier: 1.0,
38//! };
39//!
40//! queue.enqueue(sync).unwrap();
41//!
42//! // Dequeue highest priority
43//! if let Some(item) = queue.dequeue_highest() {
44//!     assert_eq!(item.qos_class, QoSClass::Critical);
45//! }
46//! ```
47
48use super::classification::DataType;
49use super::QoSClass;
50use crate::{Error, Result};
51use std::collections::VecDeque;
52use std::sync::atomic::{AtomicUsize, Ordering};
53use std::time::{Duration, Instant};
54
55/// A pending synchronization item
56#[derive(Debug, Clone)]
57pub struct PendingSync {
58    /// The data to synchronize
59    pub data: Vec<u8>,
60
61    /// QoS class for this data
62    pub qos_class: QoSClass,
63
64    /// Type of data being synced
65    pub data_type: DataType,
66
67    /// When this item was enqueued
68    pub queued_at: Instant,
69
70    /// Priority multiplier for aging (1.0 = normal)
71    ///
72    /// Values > 1.0 accelerate aging promotion.
73    /// Values < 1.0 slow down aging promotion.
74    pub priority_multiplier: f32,
75}
76
77impl PendingSync {
78    /// Create a new pending sync item
79    pub fn new(data: Vec<u8>, qos_class: QoSClass, data_type: DataType) -> Self {
80        Self {
81            data,
82            qos_class,
83            data_type,
84            queued_at: Instant::now(),
85            priority_multiplier: 1.0,
86        }
87    }
88
89    /// Create with custom priority multiplier
90    pub fn with_multiplier(
91        data: Vec<u8>,
92        qos_class: QoSClass,
93        data_type: DataType,
94        multiplier: f32,
95    ) -> Self {
96        Self {
97            data,
98            qos_class,
99            data_type,
100            queued_at: Instant::now(),
101            priority_multiplier: multiplier,
102        }
103    }
104
105    /// Get the time this item has been queued
106    pub fn queue_duration(&self) -> Duration {
107        self.queued_at.elapsed()
108    }
109
110    /// Get effective priority considering aging
111    ///
112    /// Returns the potentially-promoted QoS class based on wait time.
113    pub fn effective_class(&self) -> QoSClass {
114        let wait_hours = self.queue_duration().as_secs_f32() / 3600.0;
115        let adjusted_hours = wait_hours * self.priority_multiplier;
116
117        match self.qos_class {
118            QoSClass::Bulk if adjusted_hours >= 1.0 => QoSClass::Low,
119            QoSClass::Low if adjusted_hours >= 2.0 => QoSClass::Normal,
120            other => other,
121        }
122    }
123
124    /// Check if this item should be promoted due to aging
125    pub fn should_promote(&self) -> bool {
126        self.effective_class() != self.qos_class
127    }
128
129    /// Get size in bytes
130    pub fn size(&self) -> usize {
131        self.data.len()
132    }
133}
134
135/// Priority-ordered sync queue
136///
137/// Maintains separate queues per QoS class with automatic aging promotion
138/// to prevent starvation of low-priority data.
139#[derive(Debug)]
140pub struct PrioritySyncQueue {
141    /// Internal queues, indexed by QoS class (1-5)
142    queues: [VecDeque<PendingSync>; 5],
143
144    /// Total bytes currently queued
145    total_bytes: AtomicUsize,
146
147    /// Maximum queue size in bytes
148    max_bytes: usize,
149
150    /// Number of items promoted due to aging
151    aging_promotions: AtomicUsize,
152}
153
154impl PrioritySyncQueue {
155    /// Create a new priority sync queue
156    ///
157    /// # Arguments
158    ///
159    /// * `max_bytes` - Maximum total size in bytes the queue can hold
160    pub fn new(max_bytes: usize) -> Self {
161        Self {
162            queues: [
163                VecDeque::new(), // P1 Critical (index 0)
164                VecDeque::new(), // P2 High (index 1)
165                VecDeque::new(), // P3 Normal (index 2)
166                VecDeque::new(), // P4 Low (index 3)
167                VecDeque::new(), // P5 Bulk (index 4)
168            ],
169            total_bytes: AtomicUsize::new(0),
170            max_bytes,
171            aging_promotions: AtomicUsize::new(0),
172        }
173    }
174
175    /// Default queue with 10 MB capacity
176    pub fn default_capacity() -> Self {
177        Self::new(10 * 1024 * 1024)
178    }
179
180    /// Get the queue index for a QoS class
181    #[inline]
182    fn queue_index(class: QoSClass) -> usize {
183        (class.as_u8() - 1) as usize
184    }
185
186    /// Enqueue a sync item
187    ///
188    /// Returns an error if the queue is full.
189    pub fn enqueue(&mut self, sync: PendingSync) -> Result<()> {
190        let size = sync.size();
191        let current_bytes = self.total_bytes.load(Ordering::Relaxed);
192
193        if current_bytes + size > self.max_bytes {
194            return Err(Error::Internal(format!(
195                "Queue full: {} + {} > {} bytes",
196                current_bytes, size, self.max_bytes
197            )));
198        }
199
200        let idx = Self::queue_index(sync.qos_class);
201        self.queues[idx].push_back(sync);
202        self.total_bytes.fetch_add(size, Ordering::Relaxed);
203
204        Ok(())
205    }
206
207    /// Dequeue the highest priority item
208    ///
209    /// Returns None if all queues are empty.
210    pub fn dequeue_highest(&mut self) -> Option<PendingSync> {
211        // Check queues in priority order (P1 first)
212        for idx in 0..5 {
213            if let Some(sync) = self.queues[idx].pop_front() {
214                self.total_bytes.fetch_sub(sync.size(), Ordering::Relaxed);
215                return Some(sync);
216            }
217        }
218        None
219    }
220
221    /// Peek at the highest priority item without removing
222    pub fn peek_highest(&self) -> Option<&PendingSync> {
223        for idx in 0..5 {
224            if let Some(sync) = self.queues[idx].front() {
225                return Some(sync);
226            }
227        }
228        None
229    }
230
231    /// Apply aging promotion to queued items
232    ///
233    /// Moves items that have waited long enough to higher priority queues.
234    /// Returns the number of items promoted.
235    pub fn apply_aging(&mut self) -> usize {
236        let mut promoted = 0;
237
238        // Process P5 Bulk → P4 Low promotions
239        let bulk_idx = Self::queue_index(QoSClass::Bulk);
240        let low_idx = Self::queue_index(QoSClass::Low);
241
242        let mut to_promote_bulk = Vec::new();
243        self.queues[bulk_idx].retain(|sync| {
244            if sync.should_promote() && sync.effective_class() == QoSClass::Low {
245                to_promote_bulk.push(sync.clone());
246                false
247            } else {
248                true
249            }
250        });
251
252        for mut sync in to_promote_bulk {
253            sync.qos_class = QoSClass::Low;
254            self.queues[low_idx].push_back(sync);
255            promoted += 1;
256        }
257
258        // Process P4 Low → P3 Normal promotions
259        let normal_idx = Self::queue_index(QoSClass::Normal);
260
261        let mut to_promote_low = Vec::new();
262        self.queues[low_idx].retain(|sync| {
263            if sync.should_promote() && sync.effective_class() == QoSClass::Normal {
264                to_promote_low.push(sync.clone());
265                false
266            } else {
267                true
268            }
269        });
270
271        for mut sync in to_promote_low {
272            sync.qos_class = QoSClass::Normal;
273            self.queues[normal_idx].push_back(sync);
274            promoted += 1;
275        }
276
277        if promoted > 0 {
278            self.aging_promotions.fetch_add(promoted, Ordering::Relaxed);
279        }
280
281        promoted
282    }
283
284    /// Get queue depth for a specific class
285    pub fn queue_depth(&self, class: QoSClass) -> usize {
286        let idx = Self::queue_index(class);
287        self.queues[idx].len()
288    }
289
290    /// Get total bytes currently queued
291    pub fn total_bytes_queued(&self) -> usize {
292        self.total_bytes.load(Ordering::Relaxed)
293    }
294
295    /// Get total items currently queued
296    pub fn total_items(&self) -> usize {
297        self.queues.iter().map(|q| q.len()).sum()
298    }
299
300    /// Check if the queue is empty
301    pub fn is_empty(&self) -> bool {
302        self.queues.iter().all(|q| q.is_empty())
303    }
304
305    /// Check if the queue is full
306    pub fn is_full(&self) -> bool {
307        self.total_bytes.load(Ordering::Relaxed) >= self.max_bytes
308    }
309
310    /// Get available capacity in bytes
311    pub fn available_bytes(&self) -> usize {
312        let current = self.total_bytes.load(Ordering::Relaxed);
313        self.max_bytes.saturating_sub(current)
314    }
315
316    /// Get max capacity in bytes
317    pub fn max_bytes(&self) -> usize {
318        self.max_bytes
319    }
320
321    /// Get queue statistics
322    pub fn stats(&self) -> QueueStats {
323        QueueStats {
324            total_items: self.total_items(),
325            total_bytes: self.total_bytes_queued(),
326            max_bytes: self.max_bytes,
327            depth_critical: self.queue_depth(QoSClass::Critical),
328            depth_high: self.queue_depth(QoSClass::High),
329            depth_normal: self.queue_depth(QoSClass::Normal),
330            depth_low: self.queue_depth(QoSClass::Low),
331            depth_bulk: self.queue_depth(QoSClass::Bulk),
332            aging_promotions: self.aging_promotions.load(Ordering::Relaxed),
333        }
334    }
335
336    /// Clear all queued items
337    pub fn clear(&mut self) {
338        for queue in &mut self.queues {
339            queue.clear();
340        }
341        self.total_bytes.store(0, Ordering::Relaxed);
342    }
343
344    /// Drain all items from a specific class
345    pub fn drain_class(&mut self, class: QoSClass) -> Vec<PendingSync> {
346        let idx = Self::queue_index(class);
347        let items: Vec<_> = self.queues[idx].drain(..).collect();
348
349        let bytes: usize = items.iter().map(|s| s.size()).sum();
350        self.total_bytes.fetch_sub(bytes, Ordering::Relaxed);
351
352        items
353    }
354
355    /// Remove items older than a threshold
356    ///
357    /// Returns the number of items removed.
358    pub fn remove_stale(&mut self, max_age: Duration) -> usize {
359        let mut removed = 0;
360        let mut bytes_removed = 0;
361
362        for queue in &mut self.queues {
363            let old_len = queue.len();
364            queue.retain(|sync| {
365                let keep = sync.queue_duration() < max_age;
366                if !keep {
367                    bytes_removed += sync.size();
368                }
369                keep
370            });
371            removed += old_len - queue.len();
372        }
373
374        if bytes_removed > 0 {
375            self.total_bytes.fetch_sub(bytes_removed, Ordering::Relaxed);
376        }
377
378        removed
379    }
380
381    /// Get oldest item across all queues (for monitoring)
382    pub fn oldest_item_age(&self) -> Option<Duration> {
383        self.queues
384            .iter()
385            .filter_map(|q| q.front())
386            .map(|s| s.queue_duration())
387            .max()
388    }
389
390    /// Dequeue up to N items, respecting priority order
391    pub fn dequeue_batch(&mut self, max_items: usize) -> Vec<PendingSync> {
392        let mut batch = Vec::with_capacity(max_items);
393
394        while batch.len() < max_items {
395            if let Some(sync) = self.dequeue_highest() {
396                batch.push(sync);
397            } else {
398                break;
399            }
400        }
401
402        batch
403    }
404
405    /// Dequeue items up to a byte limit, respecting priority order
406    pub fn dequeue_bytes(&mut self, max_bytes: usize) -> Vec<PendingSync> {
407        let mut batch = Vec::new();
408        let mut total_bytes = 0;
409
410        while total_bytes < max_bytes {
411            // Peek first to check size
412            if let Some(peek) = self.peek_highest() {
413                if total_bytes + peek.size() > max_bytes && !batch.is_empty() {
414                    break; // Would exceed limit
415                }
416            }
417
418            if let Some(sync) = self.dequeue_highest() {
419                total_bytes += sync.size();
420                batch.push(sync);
421            } else {
422                break;
423            }
424        }
425
426        batch
427    }
428}
429
/// Point-in-time snapshot of a sync queue's counters.
#[derive(Debug, Clone, Copy)]
pub struct QueueStats {
    /// Number of items across all classes
    pub total_items: usize,

    /// Payload bytes across all classes
    pub total_bytes: usize,

    /// Configured byte capacity
    pub max_bytes: usize,

    /// Depth of the P1 Critical queue
    pub depth_critical: usize,

    /// Depth of the P2 High queue
    pub depth_high: usize,

    /// Depth of the P3 Normal queue
    pub depth_normal: usize,

    /// Depth of the P4 Low queue
    pub depth_low: usize,

    /// Depth of the P5 Bulk queue
    pub depth_bulk: usize,

    /// Count of aging promotions recorded so far
    pub aging_promotions: usize,
}

impl QueueStats {
    /// Fraction of the byte capacity in use (`total_bytes / max_bytes`).
    ///
    /// Returns 0.0 when `max_bytes` is zero, so a zero-capacity queue never
    /// produces a division by zero.
    pub fn utilization(&self) -> f64 {
        match self.max_bytes {
            0 => 0.0,
            capacity => self.total_bytes as f64 / capacity as f64,
        }
    }
}
471
#[cfg(test)]
mod tests {
    use super::*;

    /// Enqueue a freshly created item, panicking if the queue rejects it.
    fn push(queue: &mut PrioritySyncQueue, data: Vec<u8>, class: QoSClass, data_type: DataType) {
        queue
            .enqueue(PendingSync::new(data, class, data_type))
            .unwrap();
    }

    #[test]
    fn test_pending_sync_creation() {
        let sync = PendingSync::new(vec![1, 2, 3], QoSClass::Critical, DataType::ContactReport);

        assert_eq!(sync.size(), 3);
        assert_eq!(sync.qos_class, QoSClass::Critical);
        assert_eq!(sync.priority_multiplier, 1.0);
    }

    #[test]
    fn test_queue_creation() {
        let queue = PrioritySyncQueue::new(1024);

        assert!(queue.is_empty());
        assert_eq!(queue.max_bytes(), 1024);
        assert_eq!(queue.available_bytes(), 1024);
    }

    #[test]
    fn test_enqueue_dequeue() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![1, 2, 3], QoSClass::Normal, DataType::HealthStatus);

        assert_eq!(queue.total_items(), 1);
        assert_eq!(queue.total_bytes_queued(), 3);

        let dequeued = queue.dequeue_highest().unwrap();
        assert_eq!(dequeued.qos_class, QoSClass::Normal);
        assert!(queue.is_empty());
        assert_eq!(queue.total_bytes_queued(), 0);
    }

    #[test]
    fn test_priority_ordering() {
        let mut queue = PrioritySyncQueue::new(1024);

        // Enqueue in reverse priority order
        push(&mut queue, vec![5], QoSClass::Bulk, DataType::DebugLog);
        push(&mut queue, vec![1], QoSClass::Critical, DataType::ContactReport);
        push(&mut queue, vec![3], QoSClass::Normal, DataType::HealthStatus);

        // Should dequeue in priority order
        assert_eq!(
            queue.dequeue_highest().unwrap().qos_class,
            QoSClass::Critical
        );
        assert_eq!(queue.dequeue_highest().unwrap().qos_class, QoSClass::Normal);
        assert_eq!(queue.dequeue_highest().unwrap().qos_class, QoSClass::Bulk);
        assert!(queue.dequeue_highest().is_none());
    }

    #[test]
    fn test_queue_full() {
        let mut queue = PrioritySyncQueue::new(10);

        push(&mut queue, vec![0; 8], QoSClass::Normal, DataType::HealthStatus);

        // This should fail - would exceed capacity
        let sync2 = PendingSync::new(vec![0; 5], QoSClass::Normal, DataType::HealthStatus);
        assert!(queue.enqueue(sync2).is_err());

        // A rejected item must not be counted
        assert_eq!(queue.total_items(), 1);
        assert_eq!(queue.total_bytes_queued(), 8);
    }

    #[test]
    fn test_queue_depth() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![1], QoSClass::Critical, DataType::ContactReport);
        push(&mut queue, vec![2], QoSClass::Critical, DataType::EmergencyAlert);
        push(&mut queue, vec![3], QoSClass::Normal, DataType::HealthStatus);

        assert_eq!(queue.queue_depth(QoSClass::Critical), 2);
        assert_eq!(queue.queue_depth(QoSClass::Normal), 1);
        assert_eq!(queue.queue_depth(QoSClass::Bulk), 0);
    }

    #[test]
    fn test_peek_highest() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![3], QoSClass::Normal, DataType::HealthStatus);
        push(&mut queue, vec![1], QoSClass::Critical, DataType::ContactReport);

        // Peek should not remove
        let peeked = queue.peek_highest().unwrap();
        assert_eq!(peeked.qos_class, QoSClass::Critical);
        assert_eq!(queue.total_items(), 2);
    }

    #[test]
    fn test_clear() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![1; 100], QoSClass::Normal, DataType::HealthStatus);
        push(&mut queue, vec![2; 100], QoSClass::High, DataType::TargetImage);

        queue.clear();

        assert!(queue.is_empty());
        assert_eq!(queue.total_bytes_queued(), 0);
    }

    #[test]
    fn test_drain_class() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![1], QoSClass::Normal, DataType::HealthStatus);
        push(&mut queue, vec![2], QoSClass::Normal, DataType::CapabilityChange);
        push(&mut queue, vec![3], QoSClass::High, DataType::TargetImage);

        let drained = queue.drain_class(QoSClass::Normal);
        assert_eq!(drained.len(), 2);
        assert_eq!(queue.queue_depth(QoSClass::Normal), 0);
        assert_eq!(queue.queue_depth(QoSClass::High), 1);
        assert_eq!(queue.total_bytes_queued(), 1);
    }

    #[test]
    fn test_stats() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![0; 100], QoSClass::Critical, DataType::ContactReport);
        push(&mut queue, vec![0; 50], QoSClass::Bulk, DataType::DebugLog);

        let stats = queue.stats();
        assert_eq!(stats.total_items, 2);
        assert_eq!(stats.total_bytes, 150);
        assert_eq!(stats.depth_critical, 1);
        assert_eq!(stats.depth_bulk, 1);
        assert!((stats.utilization() - 150.0 / 1024.0).abs() < 0.001);
    }

    #[test]
    fn test_dequeue_batch() {
        let mut queue = PrioritySyncQueue::new(1024);

        for i in 0..5 {
            push(&mut queue, vec![i], QoSClass::Normal, DataType::HealthStatus);
        }

        let batch = queue.dequeue_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.total_items(), 2);
    }

    #[test]
    fn test_dequeue_bytes() {
        let mut queue = PrioritySyncQueue::new(1024);

        push(&mut queue, vec![0; 100], QoSClass::Critical, DataType::ContactReport);
        push(&mut queue, vec![0; 100], QoSClass::High, DataType::TargetImage);
        push(&mut queue, vec![0; 100], QoSClass::Normal, DataType::HealthStatus);

        // With a 150-byte budget only the first 100-byte item fits: taking the
        // second would reach 200 bytes, and going over the limit is only
        // allowed while the batch is still empty.
        let batch = queue.dequeue_bytes(150);
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].qos_class, QoSClass::Critical);
        assert_eq!(queue.total_items(), 2);
        assert_eq!(queue.total_bytes_queued(), 200);

        // A single item larger than the budget is still returned when the
        // batch would otherwise be empty.
        let oversized = queue.dequeue_bytes(50);
        assert_eq!(oversized.len(), 1);
        assert_eq!(oversized[0].qos_class, QoSClass::High);
        assert_eq!(queue.total_bytes_queued(), 100);
    }

    #[test]
    fn test_effective_class_no_aging() {
        let sync = PendingSync::new(vec![1], QoSClass::Bulk, DataType::DebugLog);

        // Fresh item should not be promoted
        assert_eq!(sync.effective_class(), QoSClass::Bulk);
        assert!(!sync.should_promote());
    }

    #[test]
    fn test_apply_aging_promotes_bulk() {
        // Back-date one item by 90 minutes; skip if the platform clock cannot
        // represent an instant that far in the past.
        let queued_at = match Instant::now().checked_sub(Duration::from_secs(90 * 60)) {
            Some(instant) => instant,
            None => return,
        };

        let mut queue = PrioritySyncQueue::new(1024);

        let mut aged = PendingSync::new(vec![0; 10], QoSClass::Bulk, DataType::DebugLog);
        aged.queued_at = queued_at;
        assert!(aged.should_promote());
        queue.enqueue(aged).unwrap();
        push(&mut queue, vec![0; 5], QoSClass::Bulk, DataType::DebugLog);

        assert_eq!(queue.apply_aging(), 1);
        assert_eq!(queue.queue_depth(QoSClass::Bulk), 1);
        assert_eq!(queue.queue_depth(QoSClass::Low), 1);
        assert_eq!(queue.total_bytes_queued(), 15);
        assert_eq!(queue.stats().aging_promotions, 1);
    }

    #[test]
    fn test_oldest_item_age() {
        let mut queue = PrioritySyncQueue::new(1024);

        assert!(queue.oldest_item_age().is_none());

        push(&mut queue, vec![1], QoSClass::Normal, DataType::HealthStatus);

        let age = queue.oldest_item_age().unwrap();
        assert!(age < Duration::from_secs(1));
    }

    #[test]
    fn test_available_bytes() {
        let mut queue = PrioritySyncQueue::new(1000);

        push(&mut queue, vec![0; 300], QoSClass::Normal, DataType::HealthStatus);

        assert_eq!(queue.available_bytes(), 700);
    }
}