Skip to main content

brainwires_network/remote/
command_queue.rs

1//! Priority Command Queue
2//!
3//! Implements a priority queue for remote commands with deadline tracking
4//! and retry logic.
5
6use std::cmp::Ordering;
7use std::collections::BinaryHeap;
8use std::time::{Duration, Instant};
9use tracing::debug;
10
11use super::protocol::{BackendCommand, CommandPriority, PrioritizedCommand};
12
/// Depth limit applied to queues built through `CommandQueue::default()`.
const DEFAULT_QUEUE_MAX_DEPTH: usize = 1_000;
14
15/// Entry in the priority queue
16#[derive(Debug)]
17pub struct QueueEntry {
18    /// The prioritized command
19    pub command: PrioritizedCommand,
20    /// When the command was enqueued
21    pub enqueued_at: Instant,
22    /// Deadline instant (if set)
23    pub deadline: Option<Instant>,
24    /// Current retry attempt (0-based)
25    pub retry_attempt: u32,
26    /// Sequence number for FIFO within same priority
27    pub sequence: u64,
28}
29
30impl QueueEntry {
31    /// Create a new queue entry
32    pub fn new(command: PrioritizedCommand, sequence: u64) -> Self {
33        let now = Instant::now();
34        let deadline = command
35            .deadline_ms
36            .map(|ms| now + Duration::from_millis(ms));
37
38        Self {
39            command,
40            enqueued_at: now,
41            deadline,
42            retry_attempt: 0,
43            sequence,
44        }
45    }
46
47    /// Check if the command has expired
48    pub fn is_expired(&self) -> bool {
49        self.deadline.map(|d| Instant::now() > d).unwrap_or(false)
50    }
51
52    /// Get time until deadline (if set)
53    pub fn time_until_deadline(&self) -> Option<Duration> {
54        self.deadline.and_then(|d| {
55            let now = Instant::now();
56            if now < d { Some(d - now) } else { None }
57        })
58    }
59
60    /// Calculate next retry delay
61    pub fn next_retry_delay(&self) -> Option<Duration> {
62        self.command.retry_policy.as_ref().and_then(|policy| {
63            if self.retry_attempt >= policy.max_attempts {
64                None
65            } else {
66                let delay_ms = policy.initial_delay_ms as f32
67                    * policy.backoff_multiplier.powi(self.retry_attempt as i32);
68                Some(Duration::from_millis(delay_ms as u64))
69            }
70        })
71    }
72
73    /// Increment retry attempt
74    pub fn increment_retry(&mut self) {
75        self.retry_attempt += 1;
76    }
77
78    /// Check if should retry
79    pub fn should_retry(&self) -> bool {
80        self.command
81            .retry_policy
82            .as_ref()
83            .map(|p| self.retry_attempt < p.max_attempts)
84            .unwrap_or(false)
85    }
86}
87
88// Implement ordering for BinaryHeap (max-heap, so we reverse for min priority)
89impl PartialEq for QueueEntry {
90    fn eq(&self, other: &Self) -> bool {
91        self.command.priority == other.command.priority && self.sequence == other.sequence
92    }
93}
94
95impl Eq for QueueEntry {}
96
97impl PartialOrd for QueueEntry {
98    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
99        Some(self.cmp(other))
100    }
101}
102
103impl Ord for QueueEntry {
104    fn cmp(&self, other: &Self) -> Ordering {
105        // Lower priority value = higher priority (Critical=0 is highest)
106        // Reverse ordering because BinaryHeap is a max-heap
107        match other.command.priority.cmp(&self.command.priority) {
108            Ordering::Equal => {
109                // Within same priority, use sequence (FIFO)
110                // Lower sequence = earlier = should be processed first
111                other.sequence.cmp(&self.sequence)
112            }
113            ord => ord,
114        }
115    }
116}
117
118/// Priority command queue
119pub struct CommandQueue {
120    /// The priority queue
121    queue: BinaryHeap<QueueEntry>,
122    /// Sequence counter for FIFO ordering within priority
123    sequence: u64,
124    /// Maximum queue depth
125    max_depth: usize,
126}
127
128impl CommandQueue {
129    /// Create a new command queue
130    pub fn new(max_depth: usize) -> Self {
131        Self {
132            queue: BinaryHeap::new(),
133            sequence: 0,
134            max_depth,
135        }
136    }
137
138    /// Enqueue a command with priority
139    pub fn enqueue(&mut self, command: PrioritizedCommand) -> Result<(), QueueError> {
140        // Check queue depth
141        if self.queue.len() >= self.max_depth {
142            // For critical commands, we allow exceeding the limit
143            if command.priority != CommandPriority::Critical {
144                return Err(QueueError::QueueFull);
145            }
146        }
147
148        let entry = QueueEntry::new(command, self.sequence);
149        self.sequence = self.sequence.wrapping_add(1);
150        self.queue.push(entry);
151        Ok(())
152    }
153
154    /// Enqueue a simple command (no priority metadata)
155    pub fn enqueue_simple(&mut self, command: BackendCommand) -> Result<(), QueueError> {
156        self.enqueue(PrioritizedCommand {
157            command,
158            priority: CommandPriority::Normal,
159            deadline_ms: None,
160            retry_policy: None,
161        })
162    }
163
164    /// Dequeue the highest priority command
165    pub fn dequeue(&mut self) -> Option<QueueEntry> {
166        // Remove expired entries
167        self.remove_expired();
168
169        self.queue.pop()
170    }
171
172    /// Peek at the highest priority command without removing it
173    pub fn peek(&self) -> Option<&QueueEntry> {
174        self.queue.peek()
175    }
176
177    /// Get current queue depth
178    pub fn len(&self) -> usize {
179        self.queue.len()
180    }
181
182    /// Check if queue is empty
183    pub fn is_empty(&self) -> bool {
184        self.queue.is_empty()
185    }
186
187    /// Remove expired entries
188    fn remove_expired(&mut self) {
189        let mut temp = BinaryHeap::new();
190        while let Some(entry) = self.queue.pop() {
191            if !entry.is_expired() {
192                temp.push(entry);
193            } else {
194                debug!(
195                    "Removed expired command: {:?}",
196                    std::mem::discriminant(&entry.command.command)
197                );
198            }
199        }
200        self.queue = temp;
201    }
202
203    /// Re-enqueue a command for retry
204    pub fn requeue_for_retry(&mut self, mut entry: QueueEntry) -> Result<(), QueueError> {
205        if !entry.should_retry() {
206            return Err(QueueError::MaxRetriesExceeded);
207        }
208
209        entry.increment_retry();
210        // Update sequence to maintain fairness
211        entry.sequence = self.sequence;
212        self.sequence = self.sequence.wrapping_add(1);
213        self.queue.push(entry);
214        Ok(())
215    }
216
217    /// Get queue statistics
218    pub fn stats(&self) -> QueueStats {
219        let mut critical = 0;
220        let mut high = 0;
221        let mut normal = 0;
222        let mut low = 0;
223
224        for entry in self.queue.iter() {
225            match entry.command.priority {
226                CommandPriority::Critical => critical += 1,
227                CommandPriority::High => high += 1,
228                CommandPriority::Normal => normal += 1,
229                CommandPriority::Low => low += 1,
230            }
231        }
232
233        QueueStats {
234            total: self.queue.len(),
235            critical,
236            high,
237            normal,
238            low,
239        }
240    }
241}
242
243impl Default for CommandQueue {
244    fn default() -> Self {
245        Self::new(DEFAULT_QUEUE_MAX_DEPTH)
246    }
247}
248
/// Snapshot of how many commands the queue holds, overall and per priority.
///
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly, for
/// example in assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Total number of commands in the queue.
    pub total: usize,
    /// Number of critical priority commands.
    pub critical: usize,
    /// Number of high priority commands.
    pub high: usize,
    /// Number of normal priority commands.
    pub normal: usize,
    /// Number of low priority commands.
    pub low: usize,
}
263
264/// Queue errors
265#[derive(Debug, thiserror::Error)]
266pub enum QueueError {
267    /// The queue has reached its maximum depth.
268    #[error("Queue is full")]
269    QueueFull,
270    /// The command has exceeded its maximum retry attempts.
271    #[error("Maximum retries exceeded")]
272    MaxRetriesExceeded,
273}
274
#[cfg(test)]
mod tests {
    use super::super::protocol::RetryPolicy;
    use super::*;

    /// Builds a `Ping` command at `priority` with no deadline and no retry policy.
    fn make_command(priority: CommandPriority) -> PrioritizedCommand {
        PrioritizedCommand {
            command: BackendCommand::Ping { timestamp: 0 },
            priority,
            deadline_ms: None,
            retry_policy: None,
        }
    }

    /// Commands enqueued in mixed order come out from most to least urgent.
    #[test]
    fn test_priority_ordering() {
        let mut queue = CommandQueue::new(100);

        let arrival_order = vec![
            CommandPriority::Low,
            CommandPriority::High,
            CommandPriority::Normal,
            CommandPriority::Critical,
        ];
        for priority in arrival_order {
            queue.enqueue(make_command(priority)).unwrap();
        }

        let expected_order = vec![
            CommandPriority::Critical,
            CommandPriority::High,
            CommandPriority::Normal,
            CommandPriority::Low,
        ];
        for expected in expected_order {
            let entry = queue.dequeue().unwrap();
            assert_eq!(entry.command.priority, expected);
        }
    }

    /// Commands sharing a priority are dequeued in the order they arrived.
    #[test]
    fn test_fifo_within_priority() {
        let mut queue = CommandQueue::new(100);

        // Tag each normal-priority ping with its arrival index.
        for i in 0..5 {
            let ping = PrioritizedCommand {
                command: BackendCommand::Ping { timestamp: i },
                priority: CommandPriority::Normal,
                deadline_ms: None,
                retry_policy: None,
            };
            queue.enqueue(ping).unwrap();
        }

        // The tags must come back in the same order.
        for i in 0..5 {
            match queue.dequeue().unwrap().command.command {
                BackendCommand::Ping { timestamp } => assert_eq!(timestamp, i),
                _ => panic!("Expected Ping command"),
            }
        }
    }

    /// A full queue rejects normal commands but still accepts critical ones.
    #[test]
    fn test_queue_full() {
        let mut queue = CommandQueue::new(2);

        // Fill the queue to its depth limit.
        for _ in 0..2 {
            queue
                .enqueue(make_command(CommandPriority::Normal))
                .unwrap();
        }

        // One more normal command must be refused.
        let overflow = queue.enqueue(make_command(CommandPriority::Normal));
        assert!(matches!(overflow, Err(QueueError::QueueFull)));

        // A critical command is allowed to exceed the limit.
        let critical = queue.enqueue(make_command(CommandPriority::Critical));
        assert!(critical.is_ok());
    }

    /// An entry can be requeued exactly `max_attempts` times and no more.
    #[test]
    fn test_retry_logic() {
        let mut queue = CommandQueue::new(100);

        let policy = RetryPolicy {
            max_attempts: 3,
            backoff_multiplier: 2.0,
            initial_delay_ms: 100,
        };
        queue
            .enqueue(PrioritizedCommand {
                command: BackendCommand::Ping { timestamp: 42 },
                priority: CommandPriority::Normal,
                deadline_ms: None,
                retry_policy: Some(policy),
            })
            .unwrap();

        // Three dequeue/requeue rounds succeed, with the counter rising each time.
        for expected_attempt in 0..3 {
            let entry = queue.dequeue().unwrap();
            assert_eq!(entry.retry_attempt, expected_attempt);
            assert!(entry.should_retry());
            queue.requeue_for_retry(entry).unwrap();
        }

        // After the third retry the budget is spent.
        let exhausted = queue.dequeue().unwrap();
        assert_eq!(exhausted.retry_attempt, 3);
        assert!(!exhausted.should_retry());

        // A further requeue attempt must be rejected.
        let outcome = queue.requeue_for_retry(exhausted);
        assert!(matches!(outcome, Err(QueueError::MaxRetriesExceeded)));
    }
}
414}