use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
use tracing::debug;
use super::protocol::{BackendCommand, CommandPriority, PrioritizedCommand};
/// Depth limit that `CommandQueue::default()` passes to `CommandQueue::new`.
const DEFAULT_QUEUE_MAX_DEPTH: usize = 1000;
/// A command held in the queue together with the bookkeeping used for
/// ordering, expiry and retries.
#[derive(Debug)]
pub struct QueueEntry {
/// The queued command, including its priority, optional deadline and optional retry policy.
pub command: PrioritizedCommand,
/// Instant captured by `QueueEntry::new` when the entry was created.
pub enqueued_at: Instant,
/// Absolute expiry time derived from `command.deadline_ms` at creation; `None` means the entry never expires.
pub deadline: Option<Instant>,
/// Number of retries recorded so far; starts at 0 and is advanced by `increment_retry`.
pub retry_attempt: u32,
/// Insertion number supplied by the creator; the `Ord` impl uses it to break ties between equal priorities, ranking lower sequences ahead.
pub sequence: u64,
}
impl QueueEntry {
    /// Wraps `command` in a new entry stamped with the current time.
    ///
    /// `sequence` is the caller-assigned insertion number used for tie-breaking
    /// between entries of equal priority. When the command carries
    /// `deadline_ms`, the absolute deadline is "now + `deadline_ms`"; if that
    /// instant cannot be represented the entry is created without a deadline
    /// rather than panicking.
    pub fn new(command: PrioritizedCommand, sequence: u64) -> Self {
        let now = Instant::now();
        // `Instant + Duration` panics on overflow, so use the checked form.
        let deadline = command
            .deadline_ms
            .and_then(|ms| now.checked_add(Duration::from_millis(ms)));
        Self {
            command,
            enqueued_at: now,
            deadline,
            retry_attempt: 0,
            sequence,
        }
    }

    /// Returns `true` once the current time is strictly later than the deadline.
    ///
    /// Entries without a deadline never expire.
    pub fn is_expired(&self) -> bool {
        match self.deadline {
            Some(deadline) => Instant::now() > deadline,
            None => false,
        }
    }

    /// Returns the time left before the deadline.
    ///
    /// Yields `None` when the entry has no deadline or when the deadline has
    /// been reached or passed. Note that at the exact deadline instant this
    /// returns `None` while [`is_expired`](Self::is_expired) is still `false`.
    pub fn time_until_deadline(&self) -> Option<Duration> {
        let deadline = self.deadline?;
        let now = Instant::now();
        if now < deadline {
            Some(deadline - now)
        } else {
            None
        }
    }

    /// Computes the backoff delay to wait before the next retry.
    ///
    /// The delay is `initial_delay_ms * backoff_multiplier ^ retry_attempt`,
    /// truncated to whole milliseconds. Returns `None` when the command has no
    /// retry policy or all attempts have been used.
    pub fn next_retry_delay(&self) -> Option<Duration> {
        let policy = self.command.retry_policy.as_ref()?;
        if self.retry_attempt >= policy.max_attempts {
            return None;
        }
        // Clamp instead of letting `as i32` wrap a huge counter into a
        // negative exponent (which would shrink the delay).
        let exponent = self.retry_attempt.min(i32::MAX as u32) as i32;
        let delay_ms = policy.initial_delay_ms as f32 * policy.backoff_multiplier.powi(exponent);
        // Float-to-integer `as` casts saturate: NaN and negative values become
        // 0 and overly large values become `u64::MAX`.
        Some(Duration::from_millis(delay_ms as u64))
    }

    /// Records one more retry attempt.
    ///
    /// The counter saturates at `u32::MAX`; a wrapping or panicking increment
    /// would either reset the retry budget or abort the caller.
    pub fn increment_retry(&mut self) {
        self.retry_attempt = self.retry_attempt.saturating_add(1);
    }

    /// Returns `true` when a retry policy exists and fewer than
    /// `max_attempts` retries have been recorded.
    pub fn should_retry(&self) -> bool {
        match self.command.retry_policy.as_ref() {
            Some(policy) => self.retry_attempt < policy.max_attempts,
            None => false,
        }
    }
}
impl PartialEq for QueueEntry {
    /// Two entries are equal when both their priority and their sequence
    /// number match; the command payload and timing fields are not compared.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (&self.command.priority, self.sequence);
        let rhs = (&other.command.priority, other.sequence);
        lhs == rhs
    }
}
/// Marker impl declaring `QueueEntry` equality a full equivalence relation;
/// `Ord` requires it, and `Ord` is what `BinaryHeap<QueueEntry>` orders by.
impl Eq for QueueEntry {}
impl PartialOrd for QueueEntry {
    /// Always returns `Some`: the ordering is total and is delegated to the
    /// `Ord` implementation so the two can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl Ord for QueueEntry {
    /// Orders entries for use in a max-heap.
    ///
    /// Both comparisons are deliberately reversed (`other` against `self`):
    /// the entry whose priority compares lower ranks greater, and among equal
    /// priorities the entry with the lower sequence number ranks greater, so
    /// it is popped first.
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .command
            .priority
            .cmp(&self.command.priority)
            .then_with(|| other.sequence.cmp(&self.sequence))
    }
}
/// Priority queue of pending commands backed by a binary heap.
pub struct CommandQueue {
/// Heap of entries, ordered by `QueueEntry`'s `Ord` implementation.
queue: BinaryHeap<QueueEntry>,
/// Sequence number that will be given to the next entry pushed.
sequence: u64,
/// Depth at which `enqueue` starts rejecting non-critical commands.
max_depth: usize,
}
impl CommandQueue {
pub fn new(max_depth: usize) -> Self {
Self {
queue: BinaryHeap::new(),
sequence: 0,
max_depth,
}
}
pub fn enqueue(&mut self, command: PrioritizedCommand) -> Result<(), QueueError> {
if self.queue.len() >= self.max_depth {
if command.priority != CommandPriority::Critical {
return Err(QueueError::QueueFull);
}
}
let entry = QueueEntry::new(command, self.sequence);
self.sequence = self.sequence.wrapping_add(1);
self.queue.push(entry);
Ok(())
}
pub fn enqueue_simple(&mut self, command: BackendCommand) -> Result<(), QueueError> {
self.enqueue(PrioritizedCommand {
command,
priority: CommandPriority::Normal,
deadline_ms: None,
retry_policy: None,
})
}
pub fn dequeue(&mut self) -> Option<QueueEntry> {
self.remove_expired();
self.queue.pop()
}
pub fn peek(&self) -> Option<&QueueEntry> {
self.queue.peek()
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
fn remove_expired(&mut self) {
let mut temp = BinaryHeap::new();
while let Some(entry) = self.queue.pop() {
if !entry.is_expired() {
temp.push(entry);
} else {
debug!(
"Removed expired command: {:?}",
std::mem::discriminant(&entry.command.command)
);
}
}
self.queue = temp;
}
pub fn requeue_for_retry(&mut self, mut entry: QueueEntry) -> Result<(), QueueError> {
if !entry.should_retry() {
return Err(QueueError::MaxRetriesExceeded);
}
entry.increment_retry();
entry.sequence = self.sequence;
self.sequence = self.sequence.wrapping_add(1);
self.queue.push(entry);
Ok(())
}
pub fn stats(&self) -> QueueStats {
let mut critical = 0;
let mut high = 0;
let mut normal = 0;
let mut low = 0;
for entry in self.queue.iter() {
match entry.command.priority {
CommandPriority::Critical => critical += 1,
CommandPriority::High => high += 1,
CommandPriority::Normal => normal += 1,
CommandPriority::Low => low += 1,
}
}
QueueStats {
total: self.queue.len(),
critical,
high,
normal,
low,
}
}
}
impl Default for CommandQueue {
fn default() -> Self {
Self::new(DEFAULT_QUEUE_MAX_DEPTH)
}
}
/// Snapshot of how many entries a `CommandQueue` holds, in total and per
/// priority level, as produced by `CommandQueue::stats`.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared with `==` and
/// `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of entries stored in the queue.
    pub total: usize,
    /// Entries with `CommandPriority::Critical`.
    pub critical: usize,
    /// Entries with `CommandPriority::High`.
    pub high: usize,
    /// Entries with `CommandPriority::Normal`.
    pub normal: usize,
    /// Entries with `CommandPriority::Low`.
    pub low: usize,
}
/// Errors returned by `CommandQueue` operations.
#[derive(Debug, thiserror::Error)]
pub enum QueueError {
/// Returned by `CommandQueue::enqueue` when the queue is at or above its maximum depth and the command is not `CommandPriority::Critical`.
#[error("Queue is full")]
QueueFull,
/// Returned by `CommandQueue::requeue_for_retry` when `QueueEntry::should_retry` is false: the command has no retry policy or all attempts are used.
#[error("Maximum retries exceeded")]
MaxRetriesExceeded,
}
#[cfg(test)]
mod tests {
    use super::super::protocol::RetryPolicy;
    use super::*;

    /// Builds a `Ping` command with the given priority, no deadline and no
    /// retry policy.
    fn make_command(priority: CommandPriority) -> PrioritizedCommand {
        let command = BackendCommand::Ping { timestamp: 0 };
        PrioritizedCommand {
            command,
            priority,
            deadline_ms: None,
            retry_policy: None,
        }
    }

    /// Commands come out from `Critical` down to `Low` regardless of the
    /// order in which they were inserted.
    #[test]
    fn test_priority_ordering() {
        let mut queue = CommandQueue::new(100);
        let insertion_order = vec![
            CommandPriority::Low,
            CommandPriority::High,
            CommandPriority::Normal,
            CommandPriority::Critical,
        ];
        for priority in insertion_order {
            queue.enqueue(make_command(priority)).unwrap();
        }

        let expected_order = vec![
            CommandPriority::Critical,
            CommandPriority::High,
            CommandPriority::Normal,
            CommandPriority::Low,
        ];
        for expected in expected_order {
            assert_eq!(queue.dequeue().unwrap().command.priority, expected);
        }
    }

    /// Commands sharing a priority are dequeued in insertion order.
    #[test]
    fn test_fifo_within_priority() {
        let mut queue = CommandQueue::new(100);
        for stamp in 0..5 {
            let ping = PrioritizedCommand {
                command: BackendCommand::Ping { timestamp: stamp },
                priority: CommandPriority::Normal,
                deadline_ms: None,
                retry_policy: None,
            };
            queue.enqueue(ping).unwrap();
        }

        for expected in 0..5 {
            let entry = queue.dequeue().unwrap();
            match entry.command.command {
                BackendCommand::Ping { timestamp } => assert_eq!(timestamp, expected),
                _ => panic!("Expected Ping command"),
            }
        }
    }

    /// A full queue rejects normal commands but still accepts critical ones.
    #[test]
    fn test_queue_full() {
        let mut queue = CommandQueue::new(2);
        for _ in 0..2 {
            queue
                .enqueue(make_command(CommandPriority::Normal))
                .unwrap();
        }

        let overflow = queue.enqueue(make_command(CommandPriority::Normal));
        assert!(matches!(overflow, Err(QueueError::QueueFull)));

        let critical = queue.enqueue(make_command(CommandPriority::Critical));
        assert!(critical.is_ok());
    }

    /// An entry can be requeued until its retry budget is used up, after
    /// which requeueing fails.
    #[test]
    fn test_retry_logic() {
        let mut queue = CommandQueue::new(100);
        let policy = RetryPolicy {
            max_attempts: 3,
            backoff_multiplier: 2.0,
            initial_delay_ms: 100,
        };
        queue
            .enqueue(PrioritizedCommand {
                command: BackendCommand::Ping { timestamp: 42 },
                priority: CommandPriority::Normal,
                deadline_ms: None,
                retry_policy: Some(policy),
            })
            .unwrap();

        let mut entry = queue.dequeue().unwrap();
        for expected_attempt in 1..=3 {
            assert!(entry.should_retry());
            queue.requeue_for_retry(entry).unwrap();
            entry = queue.dequeue().unwrap();
            assert_eq!(entry.retry_attempt, expected_attempt);
        }

        assert!(!entry.should_retry());
        let exhausted = queue.requeue_for_retry(entry);
        assert!(matches!(exhausted, Err(QueueError::MaxRetriesExceeded)));
    }
}