Skip to main content

brainwires_agents/
task_queue.rs

1//! Priority-based task queue for agent scheduling
2//!
3//! Provides a thread-safe queue with priority levels (Urgent, High, Normal, Low)
4//! for scheduling tasks across worker agents.
5
6use anyhow::Result;
7use std::collections::VecDeque;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
11use brainwires_core::Task;
12// Re-export from core to maintain public API compatibility
13pub use brainwires_core::TaskPriority;
14
15/// A queued task with priority and metadata
16#[derive(Debug, Clone)]
17pub struct QueuedTask {
18    /// The underlying task.
19    pub task: Task,
20    /// Priority level.
21    pub priority: TaskPriority,
22    /// When the task was queued.
23    pub queued_at: std::time::SystemTime,
24    /// Worker ID if assigned.
25    pub assigned_to: Option<String>,
26}
27
28impl QueuedTask {
29    /// Create a new queued task
30    pub fn new(task: Task, priority: TaskPriority) -> Self {
31        Self {
32            task,
33            priority,
34            queued_at: std::time::SystemTime::now(),
35            assigned_to: None,
36        }
37    }
38
39    /// Assign this task to a worker
40    pub fn assign_to(&mut self, worker_id: String) {
41        self.assigned_to = Some(worker_id);
42    }
43
44    /// Check if task is assigned
45    pub fn is_assigned(&self) -> bool {
46        self.assigned_to.is_some()
47    }
48}
49
50/// Thread-safe task queue with priority support
51pub struct TaskQueue {
52    queues: Arc<Mutex<PriorityQueues>>,
53    max_size: usize,
54}
55
56struct PriorityQueues {
57    urgent: VecDeque<QueuedTask>,
58    high: VecDeque<QueuedTask>,
59    normal: VecDeque<QueuedTask>,
60    low: VecDeque<QueuedTask>,
61}
62
63impl TaskQueue {
64    /// Create a new task queue
65    pub fn new(max_size: usize) -> Self {
66        Self {
67            queues: Arc::new(Mutex::new(PriorityQueues {
68                urgent: VecDeque::new(),
69                high: VecDeque::new(),
70                normal: VecDeque::new(),
71                low: VecDeque::new(),
72            })),
73            max_size,
74        }
75    }
76
77    /// Add a task to the queue
78    pub async fn enqueue(&self, task: Task, priority: TaskPriority) -> Result<()> {
79        let mut queues = self.queues.lock().await;
80
81        // Check if queue is full
82        if self.total_size(&queues) >= self.max_size {
83            anyhow::bail!("Task queue is full (max: {})", self.max_size);
84        }
85
86        let queued_task = QueuedTask::new(task, priority);
87
88        match priority {
89            TaskPriority::Urgent => queues.urgent.push_back(queued_task),
90            TaskPriority::High => queues.high.push_back(queued_task),
91            TaskPriority::Normal => queues.normal.push_back(queued_task),
92            TaskPriority::Low => queues.low.push_back(queued_task),
93        }
94
95        Ok(())
96    }
97
98    /// Dequeue the highest priority task
99    pub async fn dequeue(&self) -> Option<QueuedTask> {
100        let mut queues = self.queues.lock().await;
101
102        // Try to dequeue from highest priority first
103        queues
104            .urgent
105            .pop_front()
106            .or_else(|| queues.high.pop_front())
107            .or_else(|| queues.normal.pop_front())
108            .or_else(|| queues.low.pop_front())
109    }
110
111    /// Dequeue a task and assign it to a worker
112    pub async fn dequeue_and_assign(&self, worker_id: String) -> Option<QueuedTask> {
113        let mut queues = self.queues.lock().await;
114
115        // Try to dequeue from highest priority first
116        let mut task = queues
117            .urgent
118            .pop_front()
119            .or_else(|| queues.high.pop_front())
120            .or_else(|| queues.normal.pop_front())
121            .or_else(|| queues.low.pop_front());
122
123        if let Some(ref mut t) = task {
124            t.assign_to(worker_id);
125        }
126
127        task
128    }
129
130    /// Peek at the next task without removing it
131    pub async fn peek(&self) -> Option<QueuedTask> {
132        let queues = self.queues.lock().await;
133
134        queues
135            .urgent
136            .front()
137            .or_else(|| queues.high.front())
138            .or_else(|| queues.normal.front())
139            .or_else(|| queues.low.front())
140            .cloned()
141    }
142
143    /// Get the total number of tasks in the queue
144    pub async fn size(&self) -> usize {
145        let queues = self.queues.lock().await;
146        self.total_size(&queues)
147    }
148
149    /// Get the number of tasks at each priority level
150    pub async fn size_by_priority(&self) -> (usize, usize, usize, usize) {
151        let queues = self.queues.lock().await;
152        (
153            queues.urgent.len(),
154            queues.high.len(),
155            queues.normal.len(),
156            queues.low.len(),
157        )
158    }
159
160    /// Check if the queue is empty
161    pub async fn is_empty(&self) -> bool {
162        self.size().await == 0
163    }
164
165    /// Check if the queue is full
166    pub async fn is_full(&self) -> bool {
167        self.size().await >= self.max_size
168    }
169
170    /// Clear all tasks from the queue
171    pub async fn clear(&self) {
172        let mut queues = self.queues.lock().await;
173        queues.urgent.clear();
174        queues.high.clear();
175        queues.normal.clear();
176        queues.low.clear();
177    }
178
179    /// Get all tasks (for inspection/debugging)
180    pub async fn all_tasks(&self) -> Vec<QueuedTask> {
181        let queues = self.queues.lock().await;
182        let mut tasks = Vec::new();
183
184        tasks.extend(queues.urgent.iter().cloned());
185        tasks.extend(queues.high.iter().cloned());
186        tasks.extend(queues.normal.iter().cloned());
187        tasks.extend(queues.low.iter().cloned());
188
189        tasks
190    }
191
192    /// Find tasks by status
193    pub async fn find_by_status(&self, status: brainwires_core::TaskStatus) -> Vec<QueuedTask> {
194        let all_tasks = self.all_tasks().await;
195        all_tasks
196            .into_iter()
197            .filter(|qt| qt.task.status == status)
198            .collect()
199    }
200
201    /// Remove a specific task by ID
202    pub async fn remove_by_id(&self, task_id: &str) -> Option<QueuedTask> {
203        let mut queues = self.queues.lock().await;
204
205        // Try to find and remove from each queue
206        if let Some(pos) = queues.urgent.iter().position(|t| t.task.id == task_id) {
207            return queues.urgent.remove(pos);
208        }
209        if let Some(pos) = queues.high.iter().position(|t| t.task.id == task_id) {
210            return queues.high.remove(pos);
211        }
212        if let Some(pos) = queues.normal.iter().position(|t| t.task.id == task_id) {
213            return queues.normal.remove(pos);
214        }
215        if let Some(pos) = queues.low.iter().position(|t| t.task.id == task_id) {
216            return queues.low.remove(pos);
217        }
218
219        None
220    }
221
222    /// Helper to calculate total size
223    fn total_size(&self, queues: &PriorityQueues) -> usize {
224        queues.urgent.len() + queues.high.len() + queues.normal.len() + queues.low.len()
225    }
226}
227
228impl Default for TaskQueue {
229    fn default() -> Self {
230        Self::new(100) // Default max size of 100 tasks
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237
238    #[tokio::test]
239    async fn test_queue_enqueue_dequeue() {
240        let queue = TaskQueue::new(10);
241        let task = Task::new("test-1".to_string(), "Test task".to_string());
242
243        queue
244            .enqueue(task.clone(), TaskPriority::Normal)
245            .await
246            .unwrap();
247        assert_eq!(queue.size().await, 1);
248
249        let dequeued = queue.dequeue().await;
250        assert!(dequeued.is_some());
251        assert_eq!(dequeued.unwrap().task.id, "test-1");
252        assert_eq!(queue.size().await, 0);
253    }
254
255    #[tokio::test]
256    async fn test_priority_ordering() {
257        let queue = TaskQueue::new(10);
258
259        let low = Task::new("low-1".to_string(), "Low priority".to_string());
260        let normal = Task::new("normal-1".to_string(), "Normal priority".to_string());
261        let high = Task::new("high-1".to_string(), "High priority".to_string());
262        let urgent = Task::new("urgent-1".to_string(), "Urgent priority".to_string());
263
264        // Enqueue in reverse priority order
265        queue.enqueue(low, TaskPriority::Low).await.unwrap();
266        queue.enqueue(normal, TaskPriority::Normal).await.unwrap();
267        queue.enqueue(high, TaskPriority::High).await.unwrap();
268        queue.enqueue(urgent, TaskPriority::Urgent).await.unwrap();
269
270        // Should dequeue in priority order: Urgent, High, Normal, Low
271        assert_eq!(queue.dequeue().await.unwrap().task.id, "urgent-1");
272        assert_eq!(queue.dequeue().await.unwrap().task.id, "high-1");
273        assert_eq!(queue.dequeue().await.unwrap().task.id, "normal-1");
274        assert_eq!(queue.dequeue().await.unwrap().task.id, "low-1");
275    }
276
277    #[tokio::test]
278    async fn test_max_size() {
279        let queue = TaskQueue::new(2);
280
281        let task1 = Task::new("1".to_string(), "Task 1".to_string());
282        let task2 = Task::new("2".to_string(), "Task 2".to_string());
283        let task3 = Task::new("3".to_string(), "Task 3".to_string());
284
285        queue.enqueue(task1, TaskPriority::Normal).await.unwrap();
286        queue.enqueue(task2, TaskPriority::Normal).await.unwrap();
287
288        // Should fail - queue is full
289        let result = queue.enqueue(task3, TaskPriority::Normal).await;
290        assert!(result.is_err());
291    }
292
293    #[tokio::test]
294    async fn test_remove_by_id() {
295        let queue = TaskQueue::new(10);
296
297        let task1 = Task::new("1".to_string(), "Task 1".to_string());
298        let task2 = Task::new("2".to_string(), "Task 2".to_string());
299
300        queue.enqueue(task1, TaskPriority::Normal).await.unwrap();
301        queue.enqueue(task2, TaskPriority::High).await.unwrap();
302
303        assert_eq!(queue.size().await, 2);
304
305        let removed = queue.remove_by_id("1").await;
306        assert!(removed.is_some());
307        assert_eq!(removed.unwrap().task.id, "1");
308        assert_eq!(queue.size().await, 1);
309    }
310
311    #[tokio::test]
312    async fn test_assign_to_worker() {
313        let queue = TaskQueue::new(10);
314        let task = Task::new("test-1".to_string(), "Test task".to_string());
315
316        queue.enqueue(task, TaskPriority::Normal).await.unwrap();
317
318        let dequeued = queue.dequeue_and_assign("worker-1".to_string()).await;
319        assert!(dequeued.is_some());
320
321        let qt = dequeued.unwrap();
322        assert!(qt.is_assigned());
323        assert_eq!(qt.assigned_to.unwrap(), "worker-1");
324    }
325
326    #[tokio::test]
327    async fn test_peek() {
328        let queue = TaskQueue::new(10);
329        let task = Task::new("test-1".to_string(), "Test task".to_string());
330
331        queue
332            .enqueue(task.clone(), TaskPriority::High)
333            .await
334            .unwrap();
335
336        let peeked = queue.peek().await;
337        assert!(peeked.is_some());
338        assert_eq!(peeked.unwrap().task.id, "test-1");
339
340        // Size should still be 1 after peek
341        assert_eq!(queue.size().await, 1);
342    }
343
344    #[tokio::test]
345    async fn test_is_empty_and_full() {
346        let queue = TaskQueue::new(2);
347
348        assert!(queue.is_empty().await);
349        assert!(!queue.is_full().await);
350
351        let task1 = Task::new("1".to_string(), "Task 1".to_string());
352        let task2 = Task::new("2".to_string(), "Task 2".to_string());
353
354        queue.enqueue(task1, TaskPriority::Normal).await.unwrap();
355        assert!(!queue.is_empty().await);
356        assert!(!queue.is_full().await);
357
358        queue.enqueue(task2, TaskPriority::Normal).await.unwrap();
359        assert!(!queue.is_empty().await);
360        assert!(queue.is_full().await);
361    }
362
363    #[tokio::test]
364    async fn test_clear() {
365        let queue = TaskQueue::new(10);
366        let task1 = Task::new("1".to_string(), "Task 1".to_string());
367        let task2 = Task::new("2".to_string(), "Task 2".to_string());
368
369        queue.enqueue(task1, TaskPriority::Normal).await.unwrap();
370        queue.enqueue(task2, TaskPriority::High).await.unwrap();
371
372        assert_eq!(queue.size().await, 2);
373
374        queue.clear().await;
375
376        assert_eq!(queue.size().await, 0);
377        assert!(queue.is_empty().await);
378    }
379
380    #[tokio::test]
381    async fn test_size_by_priority() {
382        let queue = TaskQueue::new(10);
383
384        queue
385            .enqueue(
386                Task::new("1".to_string(), "T1".to_string()),
387                TaskPriority::Urgent,
388            )
389            .await
390            .unwrap();
391        queue
392            .enqueue(
393                Task::new("2".to_string(), "T2".to_string()),
394                TaskPriority::High,
395            )
396            .await
397            .unwrap();
398        queue
399            .enqueue(
400                Task::new("3".to_string(), "T3".to_string()),
401                TaskPriority::High,
402            )
403            .await
404            .unwrap();
405        queue
406            .enqueue(
407                Task::new("4".to_string(), "T4".to_string()),
408                TaskPriority::Normal,
409            )
410            .await
411            .unwrap();
412
413        let (urgent, high, normal, low) = queue.size_by_priority().await;
414        assert_eq!(urgent, 1);
415        assert_eq!(high, 2);
416        assert_eq!(normal, 1);
417        assert_eq!(low, 0);
418    }
419
420    #[tokio::test]
421    async fn test_default_queue() {
422        let queue = TaskQueue::default();
423        assert_eq!(queue.max_size, 100);
424    }
425
426    #[test]
427    fn test_task_priority_ordering() {
428        assert!(TaskPriority::Urgent > TaskPriority::High);
429        assert!(TaskPriority::High > TaskPriority::Normal);
430        assert!(TaskPriority::Normal > TaskPriority::Low);
431    }
432
433    #[test]
434    fn test_queued_task_new() {
435        let task = Task::new("test".to_string(), "Test task".to_string());
436        let queued = QueuedTask::new(task.clone(), TaskPriority::High);
437
438        assert_eq!(queued.task.id, task.id);
439        assert_eq!(queued.priority, TaskPriority::High);
440        assert!(!queued.is_assigned());
441    }
442}