argentor_orchestrator/
task_queue.rs1use crate::types::{Task, TaskStatus};
2use chrono::Utc;
3use std::collections::HashMap;
4use uuid::Uuid;
5
6pub struct TaskQueue {
8 tasks: HashMap<Uuid, Task>,
9 completed: Vec<Uuid>,
10}
11
12impl TaskQueue {
13 pub fn new() -> Self {
15 Self {
16 tasks: HashMap::new(),
17 completed: Vec::new(),
18 }
19 }
20
21 pub fn add(&mut self, task: Task) -> Uuid {
23 let id = task.id;
24 self.tasks.insert(id, task);
25 id
26 }
27
28 pub fn next_ready(&self) -> Option<&Task> {
31 let mut ready: Vec<&Task> = self
32 .tasks
33 .values()
34 .filter(|t| t.is_ready(&self.completed))
35 .collect();
36 ready.sort_by_key(|t| t.created_at);
37 ready.into_iter().next()
38 }
39
40 pub fn all_ready(&self) -> Vec<&Task> {
42 let mut ready: Vec<&Task> = self
43 .tasks
44 .values()
45 .filter(|t| t.is_ready(&self.completed))
46 .collect();
47 ready.sort_by_key(|t| t.created_at);
48 ready
49 }
50
51 pub fn mark_running(&mut self, id: Uuid) -> bool {
53 if let Some(task) = self.tasks.get_mut(&id) {
54 task.status = TaskStatus::Running;
55 true
56 } else {
57 false
58 }
59 }
60
61 pub fn mark_completed(&mut self, id: Uuid) -> bool {
63 if let Some(task) = self.tasks.get_mut(&id) {
64 task.status = TaskStatus::Completed;
65 task.completed_at = Some(Utc::now());
66 self.completed.push(id);
67 true
68 } else {
69 false
70 }
71 }
72
73 pub fn mark_failed(&mut self, id: Uuid, reason: String) -> bool {
75 if let Some(task) = self.tasks.get_mut(&id) {
76 task.status = TaskStatus::Failed { reason };
77 true
78 } else {
79 false
80 }
81 }
82
83 pub fn mark_needs_review(&mut self, id: Uuid) -> bool {
85 if let Some(task) = self.tasks.get_mut(&id) {
86 task.status = TaskStatus::NeedsHumanReview;
87 true
88 } else {
89 false
90 }
91 }
92
93 pub fn get(&self, id: Uuid) -> Option<&Task> {
95 self.tasks.get(&id)
96 }
97
98 pub fn get_mut(&mut self, id: Uuid) -> Option<&mut Task> {
100 self.tasks.get_mut(&id)
101 }
102
103 pub fn all_tasks(&self) -> Vec<&Task> {
105 let mut tasks: Vec<&Task> = self.tasks.values().collect();
106 tasks.sort_by_key(|t| t.created_at);
107 tasks
108 }
109
110 pub fn pending_count(&self) -> usize {
112 self.tasks
113 .values()
114 .filter(|t| t.status == TaskStatus::Pending)
115 .count()
116 }
117
118 pub fn completed_count(&self) -> usize {
120 self.completed.len()
121 }
122
123 pub fn total_count(&self) -> usize {
125 self.tasks.len()
126 }
127
128 pub fn is_done(&self) -> bool {
130 self.tasks.values().all(|t| {
131 matches!(
132 t.status,
133 TaskStatus::Completed | TaskStatus::Failed { .. } | TaskStatus::NeedsHumanReview
134 )
135 })
136 }
137
138 pub fn needs_review_count(&self) -> usize {
140 self.tasks
141 .values()
142 .filter(|t| t.status == TaskStatus::NeedsHumanReview)
143 .count()
144 }
145
146 pub fn has_cycle(&self) -> bool {
149 let mut visited = HashMap::new();
150 for &id in self.tasks.keys() {
151 if self.dfs_cycle(id, &mut visited) {
152 return true;
153 }
154 }
155 false
156 }
157
158 fn dfs_cycle(&self, id: Uuid, visited: &mut HashMap<Uuid, u8>) -> bool {
159 match visited.get(&id) {
160 Some(1) => return true, Some(2) => return false, _ => {}
163 }
164 visited.insert(id, 1); if let Some(task) = self.tasks.get(&id) {
166 for dep in &task.dependencies {
167 if self.dfs_cycle(*dep, visited) {
168 return true;
169 }
170 }
171 }
172 visited.insert(id, 2); false
174 }
175}
176
177impl Default for TaskQueue {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183#[cfg(test)]
184#[allow(clippy::unwrap_used, clippy::expect_used)]
185mod tests {
186 use super::*;
187 use crate::types::AgentRole;
188
189 #[test]
190 fn test_empty_queue() {
191 let queue = TaskQueue::new();
192 assert_eq!(queue.total_count(), 0);
193 assert_eq!(queue.pending_count(), 0);
194 assert!(queue.is_done());
195 assert!(queue.next_ready().is_none());
196 }
197
198 #[test]
199 fn test_add_and_retrieve() {
200 let mut queue = TaskQueue::new();
201 let task = Task::new("Test task", AgentRole::Coder);
202 let id = task.id;
203 queue.add(task);
204
205 assert_eq!(queue.total_count(), 1);
206 assert!(queue.get(id).is_some());
207 assert_eq!(queue.get(id).unwrap().description, "Test task");
208 }
209
210 #[test]
211 fn test_next_ready_no_deps() {
212 let mut queue = TaskQueue::new();
213 let task = Task::new("Ready task", AgentRole::Spec);
214 queue.add(task);
215
216 let ready = queue.next_ready();
217 assert!(ready.is_some());
218 assert_eq!(ready.unwrap().description, "Ready task");
219 }
220
221 #[test]
222 fn test_next_ready_with_deps() {
223 let mut queue = TaskQueue::new();
224
225 let t1 = Task::new("First", AgentRole::Spec);
226 let t1_id = t1.id;
227 queue.add(t1);
228
229 let t2 = Task::new("Second", AgentRole::Coder).with_dependencies(vec![t1_id]);
230 queue.add(t2);
231
232 let ready = queue.next_ready();
234 assert_eq!(ready.unwrap().description, "First");
235
236 queue.mark_running(t1_id);
238 queue.mark_completed(t1_id);
239
240 let ready = queue.next_ready();
242 assert_eq!(ready.unwrap().description, "Second");
243 }
244
245 #[test]
246 fn test_all_ready_parallel() {
247 let mut queue = TaskQueue::new();
248
249 let t1 = Task::new("Task A", AgentRole::Spec);
250 queue.add(t1);
251 let t2 = Task::new("Task B", AgentRole::Coder);
252 queue.add(t2);
253 let t3 = Task::new("Task C", AgentRole::Tester);
254 queue.add(t3);
255
256 let ready = queue.all_ready();
257 assert_eq!(ready.len(), 3);
258 }
259
260 #[test]
261 fn test_mark_completed() {
262 let mut queue = TaskQueue::new();
263 let task = Task::new("Complete me", AgentRole::Coder);
264 let id = task.id;
265 queue.add(task);
266
267 queue.mark_running(id);
268 assert_eq!(queue.get(id).unwrap().status, TaskStatus::Running);
269
270 queue.mark_completed(id);
271 assert_eq!(queue.get(id).unwrap().status, TaskStatus::Completed);
272 assert_eq!(queue.completed_count(), 1);
273 assert!(queue.is_done());
274 }
275
276 #[test]
277 fn test_mark_failed() {
278 let mut queue = TaskQueue::new();
279 let task = Task::new("Fail me", AgentRole::Tester);
280 let id = task.id;
281 queue.add(task);
282
283 queue.mark_failed(id, "compilation error".to_string());
284 assert!(matches!(
285 queue.get(id).unwrap().status,
286 TaskStatus::Failed { .. }
287 ));
288 }
289
290 #[test]
291 fn test_mark_needs_review() {
292 let mut queue = TaskQueue::new();
293 let task = Task::new("Review me", AgentRole::Reviewer);
294 let id = task.id;
295 queue.add(task);
296
297 queue.mark_needs_review(id);
298 assert_eq!(queue.get(id).unwrap().status, TaskStatus::NeedsHumanReview);
299 }
300
301 #[test]
302 fn test_dependency_chain() {
303 let mut queue = TaskQueue::new();
304
305 let t1 = Task::new("Spec", AgentRole::Spec);
306 let t1_id = t1.id;
307 queue.add(t1);
308
309 let t2 = Task::new("Code", AgentRole::Coder).with_dependencies(vec![t1_id]);
310 let t2_id = t2.id;
311 queue.add(t2);
312
313 let t3 = Task::new("Test", AgentRole::Tester).with_dependencies(vec![t2_id]);
314 let t3_id = t3.id;
315 queue.add(t3);
316
317 let t4 = Task::new("Review", AgentRole::Reviewer).with_dependencies(vec![t2_id, t3_id]);
318 queue.add(t4);
319
320 assert_eq!(queue.all_ready().len(), 1);
322
323 queue.mark_running(t1_id);
324 queue.mark_completed(t1_id);
325 assert_eq!(queue.all_ready().len(), 1);
327
328 queue.mark_running(t2_id);
329 queue.mark_completed(t2_id);
330 assert_eq!(queue.all_ready().len(), 1);
332
333 queue.mark_running(t3_id);
334 queue.mark_completed(t3_id);
335 assert_eq!(queue.all_ready().len(), 1);
337 }
338
339 #[test]
340 fn test_no_cycle() {
341 let mut queue = TaskQueue::new();
342 let t1 = Task::new("A", AgentRole::Spec);
343 let t1_id = t1.id;
344 queue.add(t1);
345 let t2 = Task::new("B", AgentRole::Coder).with_dependencies(vec![t1_id]);
346 queue.add(t2);
347 assert!(!queue.has_cycle());
348 }
349
350 #[test]
351 fn test_cycle_detection() {
352 let mut queue = TaskQueue::new();
353 let id1 = Uuid::new_v4();
354 let id2 = Uuid::new_v4();
355
356 let mut t1 = Task::new("A", AgentRole::Spec);
357 t1.id = id1;
358 t1.dependencies = vec![id2];
359
360 let mut t2 = Task::new("B", AgentRole::Coder);
361 t2.id = id2;
362 t2.dependencies = vec![id1];
363
364 queue.add(t1);
365 queue.add(t2);
366 assert!(queue.has_cycle());
367 }
368
369 #[test]
370 fn test_is_done() {
371 let mut queue = TaskQueue::new();
372 let task = Task::new("Only task", AgentRole::Spec);
373 let id = task.id;
374 queue.add(task);
375
376 assert!(!queue.is_done());
377 queue.mark_completed(id);
378 assert!(queue.is_done());
379 }
380
381 #[test]
382 fn test_is_done_with_needs_review() {
383 let mut queue = TaskQueue::new();
384 let t1 = Task::new("Task 1", AgentRole::Coder);
385 let t1_id = t1.id;
386 let t2 = Task::new("Task 2", AgentRole::Reviewer);
387 let t2_id = t2.id;
388 queue.add(t1);
389 queue.add(t2);
390
391 queue.mark_completed(t1_id);
392 assert!(!queue.is_done());
393
394 queue.mark_needs_review(t2_id);
395 assert!(queue.is_done());
396 }
397
398 #[test]
399 fn test_is_done_with_failed() {
400 let mut queue = TaskQueue::new();
401 let task = Task::new("Failing task", AgentRole::Tester);
402 let id = task.id;
403 queue.add(task);
404
405 queue.mark_failed(id, "error".into());
406 assert!(queue.is_done());
407 }
408
409 #[test]
410 fn test_needs_review_count() {
411 let mut queue = TaskQueue::new();
412 let t1 = Task::new("Task 1", AgentRole::Coder);
413 let t1_id = t1.id;
414 let t2 = Task::new("Task 2", AgentRole::Reviewer);
415 let t2_id = t2.id;
416 queue.add(t1);
417 queue.add(t2);
418
419 assert_eq!(queue.needs_review_count(), 0);
420 queue.mark_needs_review(t1_id);
421 assert_eq!(queue.needs_review_count(), 1);
422 queue.mark_needs_review(t2_id);
423 assert_eq!(queue.needs_review_count(), 2);
424 }
425}