use super::classification::DataType;
use super::QoSClass;
use crate::{Error, Result};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
/// A payload waiting to be synchronized, together with its scheduling metadata.
#[derive(Debug, Clone)]
pub struct PendingSync {
    /// Raw bytes to synchronize; its length is what [`PendingSync::size`] reports.
    pub data: Vec<u8>,
    /// QoS class the item currently belongs to.
    pub qos_class: QoSClass,
    /// Kind of data carried by the payload.
    pub data_type: DataType,
    /// Instant at which this value was constructed (not when it was enqueued).
    pub queued_at: Instant,
    /// Factor applied to the waiting time when evaluating aging.
    pub priority_multiplier: f32,
}

impl PendingSync {
    /// Creates an item timestamped "now" with a neutral aging multiplier of `1.0`.
    pub fn new(data: Vec<u8>, qos_class: QoSClass, data_type: DataType) -> Self {
        Self::with_multiplier(data, qos_class, data_type, 1.0)
    }

    /// Creates an item timestamped "now" whose waiting time is scaled by
    /// `multiplier` when aging is evaluated.
    pub fn with_multiplier(
        data: Vec<u8>,
        qos_class: QoSClass,
        data_type: DataType,
        multiplier: f32,
    ) -> Self {
        let queued_at = Instant::now();
        Self {
            data,
            qos_class,
            data_type,
            queued_at,
            priority_multiplier: multiplier,
        }
    }

    /// Time elapsed since the item was constructed.
    pub fn queue_duration(&self) -> Duration {
        self.queued_at.elapsed()
    }

    /// Class the item should be treated as once aging is taken into account.
    ///
    /// The waiting time in hours is scaled by `priority_multiplier`; a `Bulk`
    /// item becomes `Low` at 1 scaled hour and a `Low` item becomes `Normal`
    /// at 2 scaled hours. Every other combination keeps the stored class, so
    /// at most one step of promotion is reported per call.
    pub fn effective_class(&self) -> QoSClass {
        let aged_hours =
            self.priority_multiplier * (self.queue_duration().as_secs_f32() / 3600.0);
        match (self.qos_class, aged_hours) {
            (QoSClass::Bulk, hours) if hours >= 1.0 => QoSClass::Low,
            (QoSClass::Low, hours) if hours >= 2.0 => QoSClass::Normal,
            (unchanged, _) => unchanged,
        }
    }

    /// Returns `true` when aging yields a class different from the stored one.
    pub fn should_promote(&self) -> bool {
        self.qos_class != self.effective_class()
    }

    /// Payload size in bytes.
    pub fn size(&self) -> usize {
        self.data.len()
    }
}
#[derive(Debug)]
pub struct PrioritySyncQueue {
queues: [VecDeque<PendingSync>; 5],
total_bytes: AtomicUsize,
max_bytes: usize,
aging_promotions: AtomicUsize,
}
impl PrioritySyncQueue {
pub fn new(max_bytes: usize) -> Self {
Self {
queues: [
VecDeque::new(), VecDeque::new(), VecDeque::new(), VecDeque::new(), VecDeque::new(), ],
total_bytes: AtomicUsize::new(0),
max_bytes,
aging_promotions: AtomicUsize::new(0),
}
}
pub fn default_capacity() -> Self {
Self::new(10 * 1024 * 1024)
}
#[inline]
fn queue_index(class: QoSClass) -> usize {
(class.as_u8() - 1) as usize
}
pub fn enqueue(&mut self, sync: PendingSync) -> Result<()> {
let size = sync.size();
let current_bytes = self.total_bytes.load(Ordering::Relaxed);
if current_bytes + size > self.max_bytes {
return Err(Error::Internal(format!(
"Queue full: {} + {} > {} bytes",
current_bytes, size, self.max_bytes
)));
}
let idx = Self::queue_index(sync.qos_class);
self.queues[idx].push_back(sync);
self.total_bytes.fetch_add(size, Ordering::Relaxed);
Ok(())
}
pub fn dequeue_highest(&mut self) -> Option<PendingSync> {
for idx in 0..5 {
if let Some(sync) = self.queues[idx].pop_front() {
self.total_bytes.fetch_sub(sync.size(), Ordering::Relaxed);
return Some(sync);
}
}
None
}
pub fn peek_highest(&self) -> Option<&PendingSync> {
for idx in 0..5 {
if let Some(sync) = self.queues[idx].front() {
return Some(sync);
}
}
None
}
pub fn apply_aging(&mut self) -> usize {
let mut promoted = 0;
let bulk_idx = Self::queue_index(QoSClass::Bulk);
let low_idx = Self::queue_index(QoSClass::Low);
let mut to_promote_bulk = Vec::new();
self.queues[bulk_idx].retain(|sync| {
if sync.should_promote() && sync.effective_class() == QoSClass::Low {
to_promote_bulk.push(sync.clone());
false
} else {
true
}
});
for mut sync in to_promote_bulk {
sync.qos_class = QoSClass::Low;
self.queues[low_idx].push_back(sync);
promoted += 1;
}
let normal_idx = Self::queue_index(QoSClass::Normal);
let mut to_promote_low = Vec::new();
self.queues[low_idx].retain(|sync| {
if sync.should_promote() && sync.effective_class() == QoSClass::Normal {
to_promote_low.push(sync.clone());
false
} else {
true
}
});
for mut sync in to_promote_low {
sync.qos_class = QoSClass::Normal;
self.queues[normal_idx].push_back(sync);
promoted += 1;
}
if promoted > 0 {
self.aging_promotions.fetch_add(promoted, Ordering::Relaxed);
}
promoted
}
pub fn queue_depth(&self, class: QoSClass) -> usize {
let idx = Self::queue_index(class);
self.queues[idx].len()
}
pub fn total_bytes_queued(&self) -> usize {
self.total_bytes.load(Ordering::Relaxed)
}
pub fn total_items(&self) -> usize {
self.queues.iter().map(|q| q.len()).sum()
}
pub fn is_empty(&self) -> bool {
self.queues.iter().all(|q| q.is_empty())
}
pub fn is_full(&self) -> bool {
self.total_bytes.load(Ordering::Relaxed) >= self.max_bytes
}
pub fn available_bytes(&self) -> usize {
let current = self.total_bytes.load(Ordering::Relaxed);
self.max_bytes.saturating_sub(current)
}
pub fn max_bytes(&self) -> usize {
self.max_bytes
}
pub fn stats(&self) -> QueueStats {
QueueStats {
total_items: self.total_items(),
total_bytes: self.total_bytes_queued(),
max_bytes: self.max_bytes,
depth_critical: self.queue_depth(QoSClass::Critical),
depth_high: self.queue_depth(QoSClass::High),
depth_normal: self.queue_depth(QoSClass::Normal),
depth_low: self.queue_depth(QoSClass::Low),
depth_bulk: self.queue_depth(QoSClass::Bulk),
aging_promotions: self.aging_promotions.load(Ordering::Relaxed),
}
}
pub fn clear(&mut self) {
for queue in &mut self.queues {
queue.clear();
}
self.total_bytes.store(0, Ordering::Relaxed);
}
pub fn drain_class(&mut self, class: QoSClass) -> Vec<PendingSync> {
let idx = Self::queue_index(class);
let items: Vec<_> = self.queues[idx].drain(..).collect();
let bytes: usize = items.iter().map(|s| s.size()).sum();
self.total_bytes.fetch_sub(bytes, Ordering::Relaxed);
items
}
pub fn remove_stale(&mut self, max_age: Duration) -> usize {
let mut removed = 0;
let mut bytes_removed = 0;
for queue in &mut self.queues {
let old_len = queue.len();
queue.retain(|sync| {
let keep = sync.queue_duration() < max_age;
if !keep {
bytes_removed += sync.size();
}
keep
});
removed += old_len - queue.len();
}
if bytes_removed > 0 {
self.total_bytes.fetch_sub(bytes_removed, Ordering::Relaxed);
}
removed
}
pub fn oldest_item_age(&self) -> Option<Duration> {
self.queues
.iter()
.filter_map(|q| q.front())
.map(|s| s.queue_duration())
.max()
}
pub fn dequeue_batch(&mut self, max_items: usize) -> Vec<PendingSync> {
let mut batch = Vec::with_capacity(max_items);
while batch.len() < max_items {
if let Some(sync) = self.dequeue_highest() {
batch.push(sync);
} else {
break;
}
}
batch
}
pub fn dequeue_bytes(&mut self, max_bytes: usize) -> Vec<PendingSync> {
let mut batch = Vec::new();
let mut total_bytes = 0;
while total_bytes < max_bytes {
if let Some(peek) = self.peek_highest() {
if total_bytes + peek.size() > max_bytes && !batch.is_empty() {
break; }
}
if let Some(sync) = self.dequeue_highest() {
total_bytes += sync.size();
batch.push(sync);
} else {
break;
}
}
batch
}
}
/// Point-in-time counters describing a priority sync queue.
#[derive(Debug, Clone, Copy)]
pub struct QueueStats {
    /// Number of queued items across all classes.
    pub total_items: usize,
    /// Payload bytes currently queued.
    pub total_bytes: usize,
    /// Configured payload budget in bytes.
    pub max_bytes: usize,
    /// Items queued in the critical class.
    pub depth_critical: usize,
    /// Items queued in the high class.
    pub depth_high: usize,
    /// Items queued in the normal class.
    pub depth_normal: usize,
    /// Items queued in the low class.
    pub depth_low: usize,
    /// Items queued in the bulk class.
    pub depth_bulk: usize,
    /// Number of aging promotions recorded so far.
    pub aging_promotions: usize,
}

impl QueueStats {
    /// Ratio of queued bytes to the byte budget.
    ///
    /// A zero budget reports `0.0` instead of dividing by zero.
    pub fn utilization(&self) -> f64 {
        match self.max_bytes {
            0 => 0.0,
            capacity => self.total_bytes as f64 / capacity as f64,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue with the given byte budget and enqueues one item per
    /// `(payload, class, data type)` triple, in order.
    fn queue_with(
        max_bytes: usize,
        items: Vec<(Vec<u8>, QoSClass, DataType)>,
    ) -> PrioritySyncQueue {
        let mut queue = PrioritySyncQueue::new(max_bytes);
        for (payload, class, kind) in items {
            queue
                .enqueue(PendingSync::new(payload, class, kind))
                .unwrap();
        }
        queue
    }

    #[test]
    fn test_pending_sync_creation() {
        let sync = PendingSync::new(vec![1, 2, 3], QoSClass::Critical, DataType::ContactReport);
        assert_eq!(sync.size(), 3);
        assert_eq!(sync.qos_class, QoSClass::Critical);
        assert_eq!(sync.priority_multiplier, 1.0);
    }

    #[test]
    fn test_queue_creation() {
        let queue = PrioritySyncQueue::new(1024);
        assert!(queue.is_empty());
        assert_eq!(queue.max_bytes(), 1024);
        assert_eq!(queue.available_bytes(), 1024);
    }

    #[test]
    fn test_enqueue_dequeue() {
        let mut queue = queue_with(
            1024,
            vec![(vec![1, 2, 3], QoSClass::Normal, DataType::HealthStatus)],
        );
        assert_eq!(queue.total_items(), 1);
        assert_eq!(queue.total_bytes_queued(), 3);

        let dequeued = queue.dequeue_highest().unwrap();
        assert_eq!(dequeued.qos_class, QoSClass::Normal);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_ordering() {
        let mut queue = queue_with(
            1024,
            vec![
                (vec![5], QoSClass::Bulk, DataType::DebugLog),
                (vec![1], QoSClass::Critical, DataType::ContactReport),
                (vec![3], QoSClass::Normal, DataType::HealthStatus),
            ],
        );
        assert_eq!(
            queue.dequeue_highest().unwrap().qos_class,
            QoSClass::Critical
        );
        assert_eq!(queue.dequeue_highest().unwrap().qos_class, QoSClass::Normal);
        assert_eq!(queue.dequeue_highest().unwrap().qos_class, QoSClass::Bulk);
    }

    #[test]
    fn test_queue_full() {
        let mut queue = queue_with(
            10,
            vec![(vec![0; 8], QoSClass::Normal, DataType::HealthStatus)],
        );
        let overflow = PendingSync::new(vec![0; 5], QoSClass::Normal, DataType::HealthStatus);
        assert!(queue.enqueue(overflow).is_err());
    }

    #[test]
    fn test_queue_depth() {
        let queue = queue_with(
            1024,
            vec![
                (vec![1], QoSClass::Critical, DataType::ContactReport),
                (vec![2], QoSClass::Critical, DataType::EmergencyAlert),
                (vec![3], QoSClass::Normal, DataType::HealthStatus),
            ],
        );
        assert_eq!(queue.queue_depth(QoSClass::Critical), 2);
        assert_eq!(queue.queue_depth(QoSClass::Normal), 1);
        assert_eq!(queue.queue_depth(QoSClass::Bulk), 0);
    }

    #[test]
    fn test_peek_highest() {
        let queue = queue_with(
            1024,
            vec![
                (vec![3], QoSClass::Normal, DataType::HealthStatus),
                (vec![1], QoSClass::Critical, DataType::ContactReport),
            ],
        );
        let peeked = queue.peek_highest().unwrap();
        assert_eq!(peeked.qos_class, QoSClass::Critical);
        // Peeking must not remove anything.
        assert_eq!(queue.total_items(), 2);
    }

    #[test]
    fn test_clear() {
        let mut queue = queue_with(
            1024,
            vec![
                (vec![1; 100], QoSClass::Normal, DataType::HealthStatus),
                (vec![2; 100], QoSClass::High, DataType::TargetImage),
            ],
        );
        queue.clear();
        assert!(queue.is_empty());
        assert_eq!(queue.total_bytes_queued(), 0);
    }

    #[test]
    fn test_drain_class() {
        let mut queue = queue_with(
            1024,
            vec![
                (vec![1], QoSClass::Normal, DataType::HealthStatus),
                (vec![2], QoSClass::Normal, DataType::CapabilityChange),
                (vec![3], QoSClass::High, DataType::TargetImage),
            ],
        );
        let drained = queue.drain_class(QoSClass::Normal);
        assert_eq!(drained.len(), 2);
        assert_eq!(queue.queue_depth(QoSClass::Normal), 0);
        assert_eq!(queue.queue_depth(QoSClass::High), 1);
    }

    #[test]
    fn test_stats() {
        let queue = queue_with(
            1024,
            vec![
                (vec![0; 100], QoSClass::Critical, DataType::ContactReport),
                (vec![0; 50], QoSClass::Bulk, DataType::DebugLog),
            ],
        );
        let stats = queue.stats();
        assert_eq!(stats.total_items, 2);
        assert_eq!(stats.total_bytes, 150);
        assert_eq!(stats.depth_critical, 1);
        assert_eq!(stats.depth_bulk, 1);
        assert!((stats.utilization() - 150.0 / 1024.0).abs() < 0.001);
    }

    #[test]
    fn test_dequeue_batch() {
        let items = (0..5u8)
            .map(|i| (vec![i], QoSClass::Normal, DataType::HealthStatus))
            .collect();
        let mut queue = queue_with(1024, items);
        let batch = queue.dequeue_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.total_items(), 2);
    }

    #[test]
    fn test_dequeue_bytes() {
        let mut queue = queue_with(
            1024,
            vec![
                (vec![0; 100], QoSClass::Critical, DataType::ContactReport),
                (vec![0; 100], QoSClass::High, DataType::TargetImage),
                (vec![0; 100], QoSClass::Normal, DataType::HealthStatus),
            ],
        );
        let batch = queue.dequeue_bytes(150);
        assert!(!batch.is_empty());
    }

    #[test]
    fn test_effective_class_no_aging() {
        let sync = PendingSync::new(vec![1], QoSClass::Bulk, DataType::DebugLog);
        assert_eq!(sync.effective_class(), QoSClass::Bulk);
        assert!(!sync.should_promote());
    }

    #[test]
    fn test_oldest_item_age() {
        let mut queue = PrioritySyncQueue::new(1024);
        assert!(queue.oldest_item_age().is_none());

        queue
            .enqueue(PendingSync::new(
                vec![1],
                QoSClass::Normal,
                DataType::HealthStatus,
            ))
            .unwrap();
        let age = queue.oldest_item_age().unwrap();
        assert!(age < Duration::from_secs(1));
    }

    #[test]
    fn test_available_bytes() {
        let queue = queue_with(
            1000,
            vec![(vec![0; 300], QoSClass::Normal, DataType::HealthStatus)],
        );
        assert_eq!(queue.available_bytes(), 700);
    }
}