Skip to main content

argentor_orchestrator/
task_queue.rs

1use crate::types::{Task, TaskStatus};
2use chrono::Utc;
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// A task queue with dependency resolution (topological ordering).
7pub struct TaskQueue {
8    tasks: HashMap<Uuid, Task>,
9    completed: Vec<Uuid>,
10}
11
12impl TaskQueue {
13    /// Create a new empty task queue.
14    pub fn new() -> Self {
15        Self {
16            tasks: HashMap::new(),
17            completed: Vec::new(),
18        }
19    }
20
21    /// Add a task to the queue.
22    pub fn add(&mut self, task: Task) -> Uuid {
23        let id = task.id;
24        self.tasks.insert(id, task);
25        id
26    }
27
28    /// Get the next ready task (all dependencies resolved, status == Pending).
29    /// Returns tasks in creation-time order among those that are ready.
30    pub fn next_ready(&self) -> Option<&Task> {
31        let mut ready: Vec<&Task> = self
32            .tasks
33            .values()
34            .filter(|t| t.is_ready(&self.completed))
35            .collect();
36        ready.sort_by_key(|t| t.created_at);
37        ready.into_iter().next()
38    }
39
40    /// Get all tasks that are ready to execute.
41    pub fn all_ready(&self) -> Vec<&Task> {
42        let mut ready: Vec<&Task> = self
43            .tasks
44            .values()
45            .filter(|t| t.is_ready(&self.completed))
46            .collect();
47        ready.sort_by_key(|t| t.created_at);
48        ready
49    }
50
51    /// Mark a task as running.
52    pub fn mark_running(&mut self, id: Uuid) -> bool {
53        if let Some(task) = self.tasks.get_mut(&id) {
54            task.status = TaskStatus::Running;
55            true
56        } else {
57            false
58        }
59    }
60
61    /// Mark a task as completed.
62    pub fn mark_completed(&mut self, id: Uuid) -> bool {
63        if let Some(task) = self.tasks.get_mut(&id) {
64            task.status = TaskStatus::Completed;
65            task.completed_at = Some(Utc::now());
66            self.completed.push(id);
67            true
68        } else {
69            false
70        }
71    }
72
73    /// Mark a task as failed.
74    pub fn mark_failed(&mut self, id: Uuid, reason: String) -> bool {
75        if let Some(task) = self.tasks.get_mut(&id) {
76            task.status = TaskStatus::Failed { reason };
77            true
78        } else {
79            false
80        }
81    }
82
83    /// Mark a task as needing human review (HITL).
84    pub fn mark_needs_review(&mut self, id: Uuid) -> bool {
85        if let Some(task) = self.tasks.get_mut(&id) {
86            task.status = TaskStatus::NeedsHumanReview;
87            true
88        } else {
89            false
90        }
91    }
92
93    /// Get a task by ID.
94    pub fn get(&self, id: Uuid) -> Option<&Task> {
95        self.tasks.get(&id)
96    }
97
98    /// Get a mutable reference to a task.
99    pub fn get_mut(&mut self, id: Uuid) -> Option<&mut Task> {
100        self.tasks.get_mut(&id)
101    }
102
103    /// List all tasks.
104    pub fn all_tasks(&self) -> Vec<&Task> {
105        let mut tasks: Vec<&Task> = self.tasks.values().collect();
106        tasks.sort_by_key(|t| t.created_at);
107        tasks
108    }
109
110    /// Count of pending tasks.
111    pub fn pending_count(&self) -> usize {
112        self.tasks
113            .values()
114            .filter(|t| t.status == TaskStatus::Pending)
115            .count()
116    }
117
118    /// Count of completed tasks.
119    pub fn completed_count(&self) -> usize {
120        self.completed.len()
121    }
122
123    /// Total number of tasks.
124    pub fn total_count(&self) -> usize {
125        self.tasks.len()
126    }
127
128    /// Check if all tasks are in a terminal state (completed, failed, or awaiting review).
129    pub fn is_done(&self) -> bool {
130        self.tasks.values().all(|t| {
131            matches!(
132                t.status,
133                TaskStatus::Completed | TaskStatus::Failed { .. } | TaskStatus::NeedsHumanReview
134            )
135        })
136    }
137
138    /// Count of tasks awaiting human review.
139    pub fn needs_review_count(&self) -> usize {
140        self.tasks
141            .values()
142            .filter(|t| t.status == TaskStatus::NeedsHumanReview)
143            .count()
144    }
145
146    /// Check for cycles in the dependency graph.
147    /// Returns true if a cycle is detected.
148    pub fn has_cycle(&self) -> bool {
149        let mut visited = HashMap::new();
150        for &id in self.tasks.keys() {
151            if self.dfs_cycle(id, &mut visited) {
152                return true;
153            }
154        }
155        false
156    }
157
158    fn dfs_cycle(&self, id: Uuid, visited: &mut HashMap<Uuid, u8>) -> bool {
159        match visited.get(&id) {
160            Some(1) => return true,  // back edge = cycle
161            Some(2) => return false, // already processed
162            _ => {}
163        }
164        visited.insert(id, 1); // mark as in progress
165        if let Some(task) = self.tasks.get(&id) {
166            for dep in &task.dependencies {
167                if self.dfs_cycle(*dep, visited) {
168                    return true;
169                }
170            }
171        }
172        visited.insert(id, 2); // mark as done
173        false
174    }
175}
176
177impl Default for TaskQueue {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183#[cfg(test)]
184#[allow(clippy::unwrap_used, clippy::expect_used)]
185mod tests {
186    use super::*;
187    use crate::types::AgentRole;
188
189    #[test]
190    fn test_empty_queue() {
191        let queue = TaskQueue::new();
192        assert_eq!(queue.total_count(), 0);
193        assert_eq!(queue.pending_count(), 0);
194        assert!(queue.is_done());
195        assert!(queue.next_ready().is_none());
196    }
197
198    #[test]
199    fn test_add_and_retrieve() {
200        let mut queue = TaskQueue::new();
201        let task = Task::new("Test task", AgentRole::Coder);
202        let id = task.id;
203        queue.add(task);
204
205        assert_eq!(queue.total_count(), 1);
206        assert!(queue.get(id).is_some());
207        assert_eq!(queue.get(id).unwrap().description, "Test task");
208    }
209
210    #[test]
211    fn test_next_ready_no_deps() {
212        let mut queue = TaskQueue::new();
213        let task = Task::new("Ready task", AgentRole::Spec);
214        queue.add(task);
215
216        let ready = queue.next_ready();
217        assert!(ready.is_some());
218        assert_eq!(ready.unwrap().description, "Ready task");
219    }
220
221    #[test]
222    fn test_next_ready_with_deps() {
223        let mut queue = TaskQueue::new();
224
225        let t1 = Task::new("First", AgentRole::Spec);
226        let t1_id = t1.id;
227        queue.add(t1);
228
229        let t2 = Task::new("Second", AgentRole::Coder).with_dependencies(vec![t1_id]);
230        queue.add(t2);
231
232        // Only t1 should be ready
233        let ready = queue.next_ready();
234        assert_eq!(ready.unwrap().description, "First");
235
236        // Complete t1
237        queue.mark_running(t1_id);
238        queue.mark_completed(t1_id);
239
240        // Now t2 should be ready
241        let ready = queue.next_ready();
242        assert_eq!(ready.unwrap().description, "Second");
243    }
244
245    #[test]
246    fn test_all_ready_parallel() {
247        let mut queue = TaskQueue::new();
248
249        let t1 = Task::new("Task A", AgentRole::Spec);
250        queue.add(t1);
251        let t2 = Task::new("Task B", AgentRole::Coder);
252        queue.add(t2);
253        let t3 = Task::new("Task C", AgentRole::Tester);
254        queue.add(t3);
255
256        let ready = queue.all_ready();
257        assert_eq!(ready.len(), 3);
258    }
259
260    #[test]
261    fn test_mark_completed() {
262        let mut queue = TaskQueue::new();
263        let task = Task::new("Complete me", AgentRole::Coder);
264        let id = task.id;
265        queue.add(task);
266
267        queue.mark_running(id);
268        assert_eq!(queue.get(id).unwrap().status, TaskStatus::Running);
269
270        queue.mark_completed(id);
271        assert_eq!(queue.get(id).unwrap().status, TaskStatus::Completed);
272        assert_eq!(queue.completed_count(), 1);
273        assert!(queue.is_done());
274    }
275
276    #[test]
277    fn test_mark_failed() {
278        let mut queue = TaskQueue::new();
279        let task = Task::new("Fail me", AgentRole::Tester);
280        let id = task.id;
281        queue.add(task);
282
283        queue.mark_failed(id, "compilation error".to_string());
284        assert!(matches!(
285            queue.get(id).unwrap().status,
286            TaskStatus::Failed { .. }
287        ));
288    }
289
290    #[test]
291    fn test_mark_needs_review() {
292        let mut queue = TaskQueue::new();
293        let task = Task::new("Review me", AgentRole::Reviewer);
294        let id = task.id;
295        queue.add(task);
296
297        queue.mark_needs_review(id);
298        assert_eq!(queue.get(id).unwrap().status, TaskStatus::NeedsHumanReview);
299    }
300
301    #[test]
302    fn test_dependency_chain() {
303        let mut queue = TaskQueue::new();
304
305        let t1 = Task::new("Spec", AgentRole::Spec);
306        let t1_id = t1.id;
307        queue.add(t1);
308
309        let t2 = Task::new("Code", AgentRole::Coder).with_dependencies(vec![t1_id]);
310        let t2_id = t2.id;
311        queue.add(t2);
312
313        let t3 = Task::new("Test", AgentRole::Tester).with_dependencies(vec![t2_id]);
314        let t3_id = t3.id;
315        queue.add(t3);
316
317        let t4 = Task::new("Review", AgentRole::Reviewer).with_dependencies(vec![t2_id, t3_id]);
318        queue.add(t4);
319
320        // Only t1 ready initially
321        assert_eq!(queue.all_ready().len(), 1);
322
323        queue.mark_running(t1_id);
324        queue.mark_completed(t1_id);
325        // Now t2 ready
326        assert_eq!(queue.all_ready().len(), 1);
327
328        queue.mark_running(t2_id);
329        queue.mark_completed(t2_id);
330        // Now t3 ready
331        assert_eq!(queue.all_ready().len(), 1);
332
333        queue.mark_running(t3_id);
334        queue.mark_completed(t3_id);
335        // Now t4 ready (both deps complete)
336        assert_eq!(queue.all_ready().len(), 1);
337    }
338
339    #[test]
340    fn test_no_cycle() {
341        let mut queue = TaskQueue::new();
342        let t1 = Task::new("A", AgentRole::Spec);
343        let t1_id = t1.id;
344        queue.add(t1);
345        let t2 = Task::new("B", AgentRole::Coder).with_dependencies(vec![t1_id]);
346        queue.add(t2);
347        assert!(!queue.has_cycle());
348    }
349
350    #[test]
351    fn test_cycle_detection() {
352        let mut queue = TaskQueue::new();
353        let id1 = Uuid::new_v4();
354        let id2 = Uuid::new_v4();
355
356        let mut t1 = Task::new("A", AgentRole::Spec);
357        t1.id = id1;
358        t1.dependencies = vec![id2];
359
360        let mut t2 = Task::new("B", AgentRole::Coder);
361        t2.id = id2;
362        t2.dependencies = vec![id1];
363
364        queue.add(t1);
365        queue.add(t2);
366        assert!(queue.has_cycle());
367    }
368
369    #[test]
370    fn test_is_done() {
371        let mut queue = TaskQueue::new();
372        let task = Task::new("Only task", AgentRole::Spec);
373        let id = task.id;
374        queue.add(task);
375
376        assert!(!queue.is_done());
377        queue.mark_completed(id);
378        assert!(queue.is_done());
379    }
380
381    #[test]
382    fn test_is_done_with_needs_review() {
383        let mut queue = TaskQueue::new();
384        let t1 = Task::new("Task 1", AgentRole::Coder);
385        let t1_id = t1.id;
386        let t2 = Task::new("Task 2", AgentRole::Reviewer);
387        let t2_id = t2.id;
388        queue.add(t1);
389        queue.add(t2);
390
391        queue.mark_completed(t1_id);
392        assert!(!queue.is_done());
393
394        queue.mark_needs_review(t2_id);
395        assert!(queue.is_done());
396    }
397
398    #[test]
399    fn test_is_done_with_failed() {
400        let mut queue = TaskQueue::new();
401        let task = Task::new("Failing task", AgentRole::Tester);
402        let id = task.id;
403        queue.add(task);
404
405        queue.mark_failed(id, "error".into());
406        assert!(queue.is_done());
407    }
408
409    #[test]
410    fn test_needs_review_count() {
411        let mut queue = TaskQueue::new();
412        let t1 = Task::new("Task 1", AgentRole::Coder);
413        let t1_id = t1.id;
414        let t2 = Task::new("Task 2", AgentRole::Reviewer);
415        let t2_id = t2.id;
416        queue.add(t1);
417        queue.add(t2);
418
419        assert_eq!(queue.needs_review_count(), 0);
420        queue.mark_needs_review(t1_id);
421        assert_eq!(queue.needs_review_count(), 1);
422        queue.mark_needs_review(t2_id);
423        assert_eq!(queue.needs_review_count(), 2);
424    }
425}