brainwires_network/remote/
command_queue.rs1use std::cmp::Ordering;
7use std::collections::BinaryHeap;
8use std::time::{Duration, Instant};
9use tracing::debug;
10
11use super::protocol::{BackendCommand, CommandPriority, PrioritizedCommand};
12
/// Queue depth limit used by `CommandQueue::default()`.
const DEFAULT_QUEUE_MAX_DEPTH: usize = 1_000;
14
15#[derive(Debug)]
17pub struct QueueEntry {
18 pub command: PrioritizedCommand,
20 pub enqueued_at: Instant,
22 pub deadline: Option<Instant>,
24 pub retry_attempt: u32,
26 pub sequence: u64,
28}
29
30impl QueueEntry {
31 pub fn new(command: PrioritizedCommand, sequence: u64) -> Self {
33 let now = Instant::now();
34 let deadline = command
35 .deadline_ms
36 .map(|ms| now + Duration::from_millis(ms));
37
38 Self {
39 command,
40 enqueued_at: now,
41 deadline,
42 retry_attempt: 0,
43 sequence,
44 }
45 }
46
47 pub fn is_expired(&self) -> bool {
49 self.deadline.map(|d| Instant::now() > d).unwrap_or(false)
50 }
51
52 pub fn time_until_deadline(&self) -> Option<Duration> {
54 self.deadline.and_then(|d| {
55 let now = Instant::now();
56 if now < d { Some(d - now) } else { None }
57 })
58 }
59
60 pub fn next_retry_delay(&self) -> Option<Duration> {
62 self.command.retry_policy.as_ref().and_then(|policy| {
63 if self.retry_attempt >= policy.max_attempts {
64 None
65 } else {
66 let delay_ms = policy.initial_delay_ms as f32
67 * policy.backoff_multiplier.powi(self.retry_attempt as i32);
68 Some(Duration::from_millis(delay_ms as u64))
69 }
70 })
71 }
72
73 pub fn increment_retry(&mut self) {
75 self.retry_attempt += 1;
76 }
77
78 pub fn should_retry(&self) -> bool {
80 self.command
81 .retry_policy
82 .as_ref()
83 .map(|p| self.retry_attempt < p.max_attempts)
84 .unwrap_or(false)
85 }
86}
87
88impl PartialEq for QueueEntry {
90 fn eq(&self, other: &Self) -> bool {
91 self.command.priority == other.command.priority && self.sequence == other.sequence
92 }
93}
94
95impl Eq for QueueEntry {}
96
97impl PartialOrd for QueueEntry {
98 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
99 Some(self.cmp(other))
100 }
101}
102
103impl Ord for QueueEntry {
104 fn cmp(&self, other: &Self) -> Ordering {
105 match other.command.priority.cmp(&self.command.priority) {
108 Ordering::Equal => {
109 other.sequence.cmp(&self.sequence)
112 }
113 ord => ord,
114 }
115 }
116}
117
118pub struct CommandQueue {
120 queue: BinaryHeap<QueueEntry>,
122 sequence: u64,
124 max_depth: usize,
126}
127
128impl CommandQueue {
129 pub fn new(max_depth: usize) -> Self {
131 Self {
132 queue: BinaryHeap::new(),
133 sequence: 0,
134 max_depth,
135 }
136 }
137
138 pub fn enqueue(&mut self, command: PrioritizedCommand) -> Result<(), QueueError> {
140 if self.queue.len() >= self.max_depth {
142 if command.priority != CommandPriority::Critical {
144 return Err(QueueError::QueueFull);
145 }
146 }
147
148 let entry = QueueEntry::new(command, self.sequence);
149 self.sequence = self.sequence.wrapping_add(1);
150 self.queue.push(entry);
151 Ok(())
152 }
153
154 pub fn enqueue_simple(&mut self, command: BackendCommand) -> Result<(), QueueError> {
156 self.enqueue(PrioritizedCommand {
157 command,
158 priority: CommandPriority::Normal,
159 deadline_ms: None,
160 retry_policy: None,
161 })
162 }
163
164 pub fn dequeue(&mut self) -> Option<QueueEntry> {
166 self.remove_expired();
168
169 self.queue.pop()
170 }
171
172 pub fn peek(&self) -> Option<&QueueEntry> {
174 self.queue.peek()
175 }
176
177 pub fn len(&self) -> usize {
179 self.queue.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
184 self.queue.is_empty()
185 }
186
187 fn remove_expired(&mut self) {
189 let mut temp = BinaryHeap::new();
190 while let Some(entry) = self.queue.pop() {
191 if !entry.is_expired() {
192 temp.push(entry);
193 } else {
194 debug!(
195 "Removed expired command: {:?}",
196 std::mem::discriminant(&entry.command.command)
197 );
198 }
199 }
200 self.queue = temp;
201 }
202
203 pub fn requeue_for_retry(&mut self, mut entry: QueueEntry) -> Result<(), QueueError> {
205 if !entry.should_retry() {
206 return Err(QueueError::MaxRetriesExceeded);
207 }
208
209 entry.increment_retry();
210 entry.sequence = self.sequence;
212 self.sequence = self.sequence.wrapping_add(1);
213 self.queue.push(entry);
214 Ok(())
215 }
216
217 pub fn stats(&self) -> QueueStats {
219 let mut critical = 0;
220 let mut high = 0;
221 let mut normal = 0;
222 let mut low = 0;
223
224 for entry in self.queue.iter() {
225 match entry.command.priority {
226 CommandPriority::Critical => critical += 1,
227 CommandPriority::High => high += 1,
228 CommandPriority::Normal => normal += 1,
229 CommandPriority::Low => low += 1,
230 }
231 }
232
233 QueueStats {
234 total: self.queue.len(),
235 critical,
236 high,
237 normal,
238 low,
239 }
240 }
241}
242
243impl Default for CommandQueue {
244 fn default() -> Self {
245 Self::new(DEFAULT_QUEUE_MAX_DEPTH)
246 }
247}
248
/// Snapshot of how many entries a queue holds, overall and per priority.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total number of stored entries.
    pub total: usize,
    /// Entries with `Critical` priority.
    pub critical: usize,
    /// Entries with `High` priority.
    pub high: usize,
    /// Entries with `Normal` priority.
    pub normal: usize,
    /// Entries with `Low` priority.
    pub low: usize,
}
263
264#[derive(Debug, thiserror::Error)]
266pub enum QueueError {
267 #[error("Queue is full")]
269 QueueFull,
270 #[error("Maximum retries exceeded")]
272 MaxRetriesExceeded,
273}
274
#[cfg(test)]
mod tests {
    use super::super::protocol::RetryPolicy;
    use super::*;

    /// Builds a `Ping` command with the given priority, no deadline and no
    /// retry policy.
    fn make_command(priority: CommandPriority) -> PrioritizedCommand {
        PrioritizedCommand {
            command: BackendCommand::Ping { timestamp: 0 },
            priority,
            deadline_ms: None,
            retry_policy: None,
        }
    }

    /// Entries come out in priority order regardless of insertion order.
    #[test]
    fn test_priority_ordering() {
        let mut queue = CommandQueue::new(100);

        let insertion_order = [
            CommandPriority::Low,
            CommandPriority::High,
            CommandPriority::Normal,
            CommandPriority::Critical,
        ];
        for priority in insertion_order {
            queue.enqueue(make_command(priority)).unwrap();
        }

        let expected_order = [
            CommandPriority::Critical,
            CommandPriority::High,
            CommandPriority::Normal,
            CommandPriority::Low,
        ];
        for expected in expected_order {
            assert_eq!(queue.dequeue().unwrap().command.priority, expected);
        }
    }

    /// Entries of equal priority come out in the order they were inserted.
    #[test]
    fn test_fifo_within_priority() {
        let mut queue = CommandQueue::new(100);

        for i in 0..5 {
            let command = PrioritizedCommand {
                command: BackendCommand::Ping { timestamp: i },
                priority: CommandPriority::Normal,
                deadline_ms: None,
                retry_policy: None,
            };
            queue.enqueue(command).unwrap();
        }

        for i in 0..5 {
            let entry = queue.dequeue().unwrap();
            match entry.command.command {
                BackendCommand::Ping { timestamp } => assert_eq!(timestamp, i),
                _ => panic!("Expected Ping command"),
            }
        }
    }

    /// A full queue rejects normal commands but still accepts critical ones.
    #[test]
    fn test_queue_full() {
        let mut queue = CommandQueue::new(2);

        // Fill the queue up to its depth limit.
        for _ in 0..2 {
            queue
                .enqueue(make_command(CommandPriority::Normal))
                .unwrap();
        }

        let rejected = queue.enqueue(make_command(CommandPriority::Normal));
        assert!(matches!(rejected, Err(QueueError::QueueFull)));

        let accepted = queue.enqueue(make_command(CommandPriority::Critical));
        assert!(accepted.is_ok());
    }

    /// An entry can be requeued `max_attempts` times; the next requeue fails.
    #[test]
    fn test_retry_logic() {
        let mut queue = CommandQueue::new(100);

        let policy = RetryPolicy {
            max_attempts: 3,
            backoff_multiplier: 2.0,
            initial_delay_ms: 100,
        };
        queue
            .enqueue(PrioritizedCommand {
                command: BackendCommand::Ping { timestamp: 42 },
                priority: CommandPriority::Normal,
                deadline_ms: None,
                retry_policy: Some(policy),
            })
            .unwrap();

        // First pass: the fresh entry is eligible for a retry.
        let first = queue.dequeue().unwrap();
        assert!(first.should_retry());
        queue.requeue_for_retry(first).unwrap();

        // Retries 1 and 2 are still within the budget.
        for expected_attempt in 1..=2 {
            let retried = queue.dequeue().unwrap();
            assert_eq!(retried.retry_attempt, expected_attempt);
            assert!(retried.should_retry());
            queue.requeue_for_retry(retried).unwrap();
        }

        // Retry 3 exhausts the budget, so a further requeue is refused.
        let exhausted = queue.dequeue().unwrap();
        assert_eq!(exhausted.retry_attempt, 3);
        assert!(!exhausted.should_retry());
        assert!(matches!(
            queue.requeue_for_retry(exhausted),
            Err(QueueError::MaxRetriesExceeded)
        ));
    }
}