1use anyhow::Result;
7use std::collections::VecDeque;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
11use brainwires_core::Task;
12pub use brainwires_core::TaskPriority;
14
15#[derive(Debug, Clone)]
17pub struct QueuedTask {
18 pub task: Task,
20 pub priority: TaskPriority,
22 pub queued_at: std::time::SystemTime,
24 pub assigned_to: Option<String>,
26}
27
28impl QueuedTask {
29 pub fn new(task: Task, priority: TaskPriority) -> Self {
31 Self {
32 task,
33 priority,
34 queued_at: std::time::SystemTime::now(),
35 assigned_to: None,
36 }
37 }
38
39 pub fn assign_to(&mut self, worker_id: String) {
41 self.assigned_to = Some(worker_id);
42 }
43
44 pub fn is_assigned(&self) -> bool {
46 self.assigned_to.is_some()
47 }
48}
49
50pub struct TaskQueue {
52 queues: Arc<Mutex<PriorityQueues>>,
53 max_size: usize,
54}
55
56struct PriorityQueues {
57 urgent: VecDeque<QueuedTask>,
58 high: VecDeque<QueuedTask>,
59 normal: VecDeque<QueuedTask>,
60 low: VecDeque<QueuedTask>,
61}
62
63impl TaskQueue {
64 pub fn new(max_size: usize) -> Self {
66 Self {
67 queues: Arc::new(Mutex::new(PriorityQueues {
68 urgent: VecDeque::new(),
69 high: VecDeque::new(),
70 normal: VecDeque::new(),
71 low: VecDeque::new(),
72 })),
73 max_size,
74 }
75 }
76
77 pub async fn enqueue(&self, task: Task, priority: TaskPriority) -> Result<()> {
79 let mut queues = self.queues.lock().await;
80
81 if self.total_size(&queues) >= self.max_size {
83 anyhow::bail!("Task queue is full (max: {})", self.max_size);
84 }
85
86 let queued_task = QueuedTask::new(task, priority);
87
88 match priority {
89 TaskPriority::Urgent => queues.urgent.push_back(queued_task),
90 TaskPriority::High => queues.high.push_back(queued_task),
91 TaskPriority::Normal => queues.normal.push_back(queued_task),
92 TaskPriority::Low => queues.low.push_back(queued_task),
93 }
94
95 Ok(())
96 }
97
98 pub async fn dequeue(&self) -> Option<QueuedTask> {
100 let mut queues = self.queues.lock().await;
101
102 queues
104 .urgent
105 .pop_front()
106 .or_else(|| queues.high.pop_front())
107 .or_else(|| queues.normal.pop_front())
108 .or_else(|| queues.low.pop_front())
109 }
110
111 pub async fn dequeue_and_assign(&self, worker_id: String) -> Option<QueuedTask> {
113 let mut queues = self.queues.lock().await;
114
115 let mut task = queues
117 .urgent
118 .pop_front()
119 .or_else(|| queues.high.pop_front())
120 .or_else(|| queues.normal.pop_front())
121 .or_else(|| queues.low.pop_front());
122
123 if let Some(ref mut t) = task {
124 t.assign_to(worker_id);
125 }
126
127 task
128 }
129
130 pub async fn peek(&self) -> Option<QueuedTask> {
132 let queues = self.queues.lock().await;
133
134 queues
135 .urgent
136 .front()
137 .or_else(|| queues.high.front())
138 .or_else(|| queues.normal.front())
139 .or_else(|| queues.low.front())
140 .cloned()
141 }
142
143 pub async fn size(&self) -> usize {
145 let queues = self.queues.lock().await;
146 self.total_size(&queues)
147 }
148
149 pub async fn size_by_priority(&self) -> (usize, usize, usize, usize) {
151 let queues = self.queues.lock().await;
152 (
153 queues.urgent.len(),
154 queues.high.len(),
155 queues.normal.len(),
156 queues.low.len(),
157 )
158 }
159
160 pub async fn is_empty(&self) -> bool {
162 self.size().await == 0
163 }
164
165 pub async fn is_full(&self) -> bool {
167 self.size().await >= self.max_size
168 }
169
170 pub async fn clear(&self) {
172 let mut queues = self.queues.lock().await;
173 queues.urgent.clear();
174 queues.high.clear();
175 queues.normal.clear();
176 queues.low.clear();
177 }
178
179 pub async fn all_tasks(&self) -> Vec<QueuedTask> {
181 let queues = self.queues.lock().await;
182 let mut tasks = Vec::new();
183
184 tasks.extend(queues.urgent.iter().cloned());
185 tasks.extend(queues.high.iter().cloned());
186 tasks.extend(queues.normal.iter().cloned());
187 tasks.extend(queues.low.iter().cloned());
188
189 tasks
190 }
191
192 pub async fn find_by_status(&self, status: brainwires_core::TaskStatus) -> Vec<QueuedTask> {
194 let all_tasks = self.all_tasks().await;
195 all_tasks
196 .into_iter()
197 .filter(|qt| qt.task.status == status)
198 .collect()
199 }
200
201 pub async fn remove_by_id(&self, task_id: &str) -> Option<QueuedTask> {
203 let mut queues = self.queues.lock().await;
204
205 if let Some(pos) = queues.urgent.iter().position(|t| t.task.id == task_id) {
207 return queues.urgent.remove(pos);
208 }
209 if let Some(pos) = queues.high.iter().position(|t| t.task.id == task_id) {
210 return queues.high.remove(pos);
211 }
212 if let Some(pos) = queues.normal.iter().position(|t| t.task.id == task_id) {
213 return queues.normal.remove(pos);
214 }
215 if let Some(pos) = queues.low.iter().position(|t| t.task.id == task_id) {
216 return queues.low.remove(pos);
217 }
218
219 None
220 }
221
222 fn total_size(&self, queues: &PriorityQueues) -> usize {
224 queues.urgent.len() + queues.high.len() + queues.normal.len() + queues.low.len()
225 }
226}
227
228impl Default for TaskQueue {
229 fn default() -> Self {
230 Self::new(100) }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Task` from two string literals.
    fn make_task(id: &str, label: &str) -> Task {
        Task::new(id.to_owned(), label.to_owned())
    }

    /// Enqueues a freshly built task and panics if the queue rejects it.
    async fn push(queue: &TaskQueue, id: &str, label: &str, priority: TaskPriority) {
        queue
            .enqueue(make_task(id, label), priority)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let queue = TaskQueue::new(10);

        push(&queue, "test-1", "Test task", TaskPriority::Normal).await;
        assert_eq!(queue.size().await, 1);

        let dequeued = queue
            .dequeue()
            .await
            .expect("queue should yield the task that was just enqueued");
        assert_eq!(dequeued.task.id, "test-1");
        assert_eq!(queue.size().await, 0);
    }

    #[tokio::test]
    async fn test_priority_ordering() {
        let queue = TaskQueue::new(10);

        // Insert from lowest to highest priority so that dequeue order must
        // differ from insertion order.
        let inserted = [
            ("low-1", "Low priority", TaskPriority::Low),
            ("normal-1", "Normal priority", TaskPriority::Normal),
            ("high-1", "High priority", TaskPriority::High),
            ("urgent-1", "Urgent priority", TaskPriority::Urgent),
        ];
        for (id, label, priority) in inserted {
            push(&queue, id, label, priority).await;
        }

        for expected_id in ["urgent-1", "high-1", "normal-1", "low-1"] {
            let next = queue.dequeue().await.unwrap();
            assert_eq!(next.task.id, expected_id);
        }
    }

    #[tokio::test]
    async fn test_max_size() {
        let queue = TaskQueue::new(2);

        push(&queue, "1", "Task 1", TaskPriority::Normal).await;
        push(&queue, "2", "Task 2", TaskPriority::Normal).await;

        // A third task exceeds the capacity of two.
        let overflow = queue
            .enqueue(make_task("3", "Task 3"), TaskPriority::Normal)
            .await;
        assert!(overflow.is_err());
    }

    #[tokio::test]
    async fn test_remove_by_id() {
        let queue = TaskQueue::new(10);

        push(&queue, "1", "Task 1", TaskPriority::Normal).await;
        push(&queue, "2", "Task 2", TaskPriority::High).await;
        assert_eq!(queue.size().await, 2);

        let removed = queue
            .remove_by_id("1")
            .await
            .expect("task 1 should be found and removed");
        assert_eq!(removed.task.id, "1");
        assert_eq!(queue.size().await, 1);
    }

    #[tokio::test]
    async fn test_assign_to_worker() {
        let queue = TaskQueue::new(10);
        push(&queue, "test-1", "Test task", TaskPriority::Normal).await;

        let qt = queue
            .dequeue_and_assign("worker-1".to_string())
            .await
            .expect("queue should yield the task that was just enqueued");

        assert!(qt.is_assigned());
        assert_eq!(qt.assigned_to.as_deref(), Some("worker-1"));
    }

    #[tokio::test]
    async fn test_peek() {
        let queue = TaskQueue::new(10);
        push(&queue, "test-1", "Test task", TaskPriority::High).await;

        let peeked = queue
            .peek()
            .await
            .expect("peek should see the queued task");
        assert_eq!(peeked.task.id, "test-1");

        // Peeking must not consume the entry.
        assert_eq!(queue.size().await, 1);
    }

    #[tokio::test]
    async fn test_is_empty_and_full() {
        let queue = TaskQueue::new(2);

        assert!(queue.is_empty().await);
        assert!(!queue.is_full().await);

        push(&queue, "1", "Task 1", TaskPriority::Normal).await;
        assert!(!queue.is_empty().await);
        assert!(!queue.is_full().await);

        push(&queue, "2", "Task 2", TaskPriority::Normal).await;
        assert!(!queue.is_empty().await);
        assert!(queue.is_full().await);
    }

    #[tokio::test]
    async fn test_clear() {
        let queue = TaskQueue::new(10);

        push(&queue, "1", "Task 1", TaskPriority::Normal).await;
        push(&queue, "2", "Task 2", TaskPriority::High).await;
        assert_eq!(queue.size().await, 2);

        queue.clear().await;

        assert_eq!(queue.size().await, 0);
        assert!(queue.is_empty().await);
    }

    #[tokio::test]
    async fn test_size_by_priority() {
        let queue = TaskQueue::new(10);

        let inserted = [
            ("1", "T1", TaskPriority::Urgent),
            ("2", "T2", TaskPriority::High),
            ("3", "T3", TaskPriority::High),
            ("4", "T4", TaskPriority::Normal),
        ];
        for (id, label, priority) in inserted {
            push(&queue, id, label, priority).await;
        }

        let (urgent, high, normal, low) = queue.size_by_priority().await;
        assert_eq!(urgent, 1);
        assert_eq!(high, 2);
        assert_eq!(normal, 1);
        assert_eq!(low, 0);
    }

    #[tokio::test]
    async fn test_default_queue() {
        let queue = TaskQueue::default();
        assert_eq!(queue.max_size, 100);
    }

    #[test]
    fn test_task_priority_ordering() {
        // Each level must compare strictly greater than the one before it.
        let ascending = [
            TaskPriority::Low,
            TaskPriority::Normal,
            TaskPriority::High,
            TaskPriority::Urgent,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    #[test]
    fn test_queued_task_new() {
        let task = make_task("test", "Test task");
        let expected_id = task.id.clone();

        let queued = QueuedTask::new(task, TaskPriority::High);

        assert_eq!(queued.task.id, expected_id);
        assert_eq!(queued.priority, TaskPriority::High);
        assert!(!queued.is_assigned());
    }
}