Skip to main content

hivemind/core/
scheduler.rs

//! Scheduler - dependency resolution and task execution coordination.
//!
//! The scheduler releases tasks for execution once all of their
//! dependencies have succeeded, and coordinates the execution flow.

6use super::flow::{FlowError, TaskExecState, TaskFlow};
7use super::graph::TaskGraph;
8use uuid::Uuid;
9
10/// Result of a scheduling decision.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum ScheduleResult {
13    /// Tasks are ready for execution.
14    Ready(Vec<Uuid>),
15    /// All tasks are complete.
16    Complete,
17    /// Flow is blocked (waiting on running/verifying tasks).
18    Blocked,
19    /// Flow has failed tasks.
20    HasFailures(Vec<Uuid>),
21}
22
23/// Scheduler for `TaskFlow` execution.
24pub struct Scheduler<'a> {
25    graph: &'a TaskGraph,
26    flow: &'a mut TaskFlow,
27}
28
29impl<'a> Scheduler<'a> {
30    /// Creates a new scheduler.
31    pub fn new(graph: &'a TaskGraph, flow: &'a mut TaskFlow) -> Self {
32        Self { graph, flow }
33    }
34
35    /// Updates task readiness based on dependencies.
36    ///
37    /// Returns the list of tasks that became ready.
38    pub fn update_readiness(&mut self) -> Vec<Uuid> {
39        let mut newly_ready = Vec::new();
40
41        for task_id in self.graph.tasks.keys() {
42            if let Some(exec) = self.flow.get_task_execution(*task_id) {
43                if exec.state != TaskExecState::Pending {
44                    continue;
45                }
46            } else {
47                continue;
48            }
49
50            // Check if all dependencies are satisfied
51            if self.dependencies_satisfied(*task_id)
52                && self
53                    .flow
54                    .transition_task(*task_id, TaskExecState::Ready)
55                    .is_ok()
56            {
57                newly_ready.push(*task_id);
58            }
59        }
60
61        newly_ready
62    }
63
64    /// Checks if all dependencies for a task are satisfied (SUCCESS).
65    fn dependencies_satisfied(&self, task_id: Uuid) -> bool {
66        let Some(deps) = self.graph.dependencies.get(&task_id) else {
67            return true;
68        };
69
70        for dep_id in deps {
71            if let Some(exec) = self.flow.get_task_execution(*dep_id) {
72                if exec.state != TaskExecState::Success {
73                    return false;
74                }
75            } else {
76                return false;
77            }
78        }
79
80        true
81    }
82
83    /// Gets the current scheduling decision.
84    pub fn schedule(&mut self) -> ScheduleResult {
85        // First, update readiness
86        self.update_readiness();
87
88        // Check for failures
89        let failed: Vec<_> = self.flow.tasks_in_state(TaskExecState::Failed);
90        let escalated: Vec<_> = self.flow.tasks_in_state(TaskExecState::Escalated);
91        if !failed.is_empty() || !escalated.is_empty() {
92            let mut failures = failed;
93            failures.extend(escalated);
94            return ScheduleResult::HasFailures(failures);
95        }
96
97        // Check for ready tasks
98        let ready: Vec<_> = self.flow.tasks_in_state(TaskExecState::Ready);
99        if !ready.is_empty() {
100            return ScheduleResult::Ready(ready);
101        }
102
103        // Check if all tasks are complete
104        let pending = self.flow.tasks_in_state(TaskExecState::Pending);
105        let running = self.flow.tasks_in_state(TaskExecState::Running);
106        let verifying = self.flow.tasks_in_state(TaskExecState::Verifying);
107        let retry = self.flow.tasks_in_state(TaskExecState::Retry);
108
109        if pending.is_empty() && running.is_empty() && verifying.is_empty() && retry.is_empty() {
110            return ScheduleResult::Complete;
111        }
112
113        // Otherwise, we're blocked waiting
114        ScheduleResult::Blocked
115    }
116
117    /// Marks a task as started (Running).
118    pub fn start_task(&mut self, task_id: Uuid) -> Result<(), FlowError> {
119        let exec = self
120            .flow
121            .get_task_execution(task_id)
122            .ok_or(FlowError::TaskNotFound(task_id))?;
123
124        // Can start from Ready or Retry state
125        if exec.state != TaskExecState::Ready && exec.state != TaskExecState::Retry {
126            return Err(FlowError::InvalidTransition {
127                from: exec.state,
128                to: TaskExecState::Running,
129            });
130        }
131
132        self.flow.transition_task(task_id, TaskExecState::Running)
133    }
134
135    /// Marks a task as completed execution (moves to Verifying).
136    pub fn complete_execution(&mut self, task_id: Uuid) -> Result<(), FlowError> {
137        self.flow.transition_task(task_id, TaskExecState::Verifying)
138    }
139
140    /// Records verification result.
141    pub fn record_verification(
142        &mut self,
143        task_id: Uuid,
144        passed: bool,
145        can_retry: bool,
146    ) -> Result<(), FlowError> {
147        let new_state = if passed {
148            TaskExecState::Success
149        } else if can_retry {
150            TaskExecState::Retry
151        } else {
152            TaskExecState::Failed
153        };
154
155        self.flow.transition_task(task_id, new_state)
156    }
157
158    /// Escalates a failed task to human intervention.
159    pub fn escalate(&mut self, task_id: Uuid) -> Result<(), FlowError> {
160        let exec = self
161            .flow
162            .get_task_execution(task_id)
163            .ok_or(FlowError::TaskNotFound(task_id))?;
164
165        if exec.state != TaskExecState::Failed {
166            return Err(FlowError::InvalidTransition {
167                from: exec.state,
168                to: TaskExecState::Escalated,
169            });
170        }
171
172        self.flow.transition_task(task_id, TaskExecState::Escalated)
173    }
174
175    /// Gets tasks that are blocked by a specific task.
176    pub fn blocked_by(&self, task_id: Uuid) -> Vec<Uuid> {
177        self.graph.dependents(task_id)
178    }
179
180    /// Checks if the flow can complete (all tasks in terminal state or can reach terminal).
181    pub fn can_complete(&self) -> bool {
182        let pending = self.flow.tasks_in_state(TaskExecState::Pending);
183        let running = self.flow.tasks_in_state(TaskExecState::Running);
184        let verifying = self.flow.tasks_in_state(TaskExecState::Verifying);
185        let retry = self.flow.tasks_in_state(TaskExecState::Retry);
186        let ready = self.flow.tasks_in_state(TaskExecState::Ready);
187
188        pending.is_empty()
189            && running.is_empty()
190            && verifying.is_empty()
191            && retry.is_empty()
192            && ready.is_empty()
193    }
194}
195
196/// Attempt tracking for a task execution.
197#[derive(Debug, Clone)]
198pub struct Attempt {
199    /// Unique attempt ID.
200    pub id: Uuid,
201    /// Task this attempt is for.
202    pub task_id: Uuid,
203    /// Attempt number (1-indexed).
204    pub attempt_number: u32,
205    /// Start timestamp.
206    pub started_at: chrono::DateTime<chrono::Utc>,
207    /// End timestamp.
208    pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
209    /// Outcome.
210    pub outcome: Option<AttemptOutcome>,
211}
212
/// How an [`Attempt`] ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AttemptOutcome {
    /// The attempt finished successfully.
    Success,
    /// The attempt finished unsuccessfully; the task may be retried.
    Failed,
    /// The attempt terminated abnormally.
    Crashed,
}
223
224impl Attempt {
225    /// Creates a new attempt.
226    pub fn new(task_id: Uuid, attempt_number: u32) -> Self {
227        Self {
228            id: Uuid::new_v4(),
229            task_id,
230            attempt_number,
231            started_at: chrono::Utc::now(),
232            ended_at: None,
233            outcome: None,
234        }
235    }
236
237    /// Completes the attempt with an outcome.
238    pub fn complete(&mut self, outcome: AttemptOutcome) {
239        self.ended_at = Some(chrono::Utc::now());
240        self.outcome = Some(outcome);
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::graph::{GraphTask, SuccessCriteria, TaskGraph};

    /// Builds a three-task linear chain (each task depends on the previous
    /// one) together with a fresh flow covering every task in the graph.
    fn setup_test() -> (TaskGraph, TaskFlow) {
        let project_id = Uuid::new_v4();
        let mut graph = TaskGraph::new(project_id, "test");

        let mut chain = Vec::new();
        for title in ["Task 1", "Task 2", "Task 3"] {
            let id = graph
                .add_task(GraphTask::new(title, SuccessCriteria::new("Done")))
                .unwrap();
            chain.push(id);
        }

        // Link each task to its predecessor: Task 2 -> Task 1, Task 3 -> Task 2.
        for pair in chain.windows(2) {
            graph.add_dependency(pair[1], pair[0]).unwrap();
        }

        let all_ids: Vec<_> = graph.tasks.keys().copied().collect();
        let flow = TaskFlow::new(graph.id, project_id, &all_ids);

        (graph, flow)
    }

    /// Returns `true` if `result` is a `Ready` decision that includes `task`.
    fn ready_contains(result: &ScheduleResult, task: Uuid) -> bool {
        matches!(result, ScheduleResult::Ready(tasks) if tasks.contains(&task))
    }

    /// Runs `task` through start, execution and a passing verification.
    fn drive_to_success(scheduler: &mut Scheduler<'_>, task: Uuid) {
        scheduler.start_task(task).unwrap();
        scheduler.complete_execution(task).unwrap();
        scheduler.record_verification(task, true, false).unwrap();
    }

    /// Readies, starts and executes `task`, then fails verification with no retry.
    fn fail_permanently(scheduler: &mut Scheduler<'_>, task: Uuid) {
        scheduler.update_readiness();
        scheduler.start_task(task).unwrap();
        scheduler.complete_execution(task).unwrap();
        scheduler.record_verification(task, false, false).unwrap();
    }

    #[test]
    fn initial_readiness() {
        let (graph, mut flow) = setup_test();
        let newly_ready = Scheduler::new(&graph, &mut flow).update_readiness();

        // Only the head of the chain has no dependencies.
        assert_eq!(newly_ready.len(), 1);

        let roots = graph.root_tasks();
        assert!(newly_ready.iter().all(|id| roots.contains(id)));
    }

    #[test]
    fn dependency_chain_execution() {
        let (graph, mut flow) = setup_test();
        let order = graph.topological_order();
        let (first, second, third) = (order[0], order[1], order[2]);

        let mut scheduler = Scheduler::new(&graph, &mut flow);

        // Initially only the head of the chain can start.
        assert!(ready_contains(&scheduler.schedule(), first));
        scheduler.start_task(first).unwrap();

        // While the head is running nothing else can start.
        assert!(matches!(scheduler.schedule(), ScheduleResult::Blocked));
        scheduler.complete_execution(first).unwrap();
        scheduler.record_verification(first, true, false).unwrap();

        // Each success unlocks the next link of the chain.
        for next in [second, third] {
            assert!(ready_contains(&scheduler.schedule(), next));
            drive_to_success(&mut scheduler, next);
        }

        assert!(matches!(scheduler.schedule(), ScheduleResult::Complete));
    }

    #[test]
    fn retry_cycle() {
        let (graph, mut flow) = setup_test();
        let root = graph.root_tasks()[0];

        {
            let mut scheduler = Scheduler::new(&graph, &mut flow);
            scheduler.update_readiness();
            scheduler.start_task(root).unwrap();
            scheduler.complete_execution(root).unwrap();
            // A failed check that may be retried parks the task in Retry.
            scheduler.record_verification(root, false, true).unwrap();
        }
        assert_eq!(flow.get_task_execution(root).unwrap().state, TaskExecState::Retry);

        // Restarting from Retry counts as the second attempt.
        Scheduler::new(&graph, &mut flow).start_task(root).unwrap();
        assert_eq!(flow.get_task_execution(root).unwrap().attempt_count, 2);
    }

    #[test]
    fn failure_handling() {
        let (graph, mut flow) = setup_test();
        let root = graph.root_tasks()[0];
        let mut scheduler = Scheduler::new(&graph, &mut flow);

        fail_permanently(&mut scheduler, root);

        assert!(matches!(scheduler.schedule(), ScheduleResult::HasFailures(_)));
    }

    #[test]
    fn escalation() {
        let (graph, mut flow) = setup_test();
        let root = graph.root_tasks()[0];

        {
            let mut scheduler = Scheduler::new(&graph, &mut flow);
            fail_permanently(&mut scheduler, root);
            scheduler.escalate(root).unwrap();
        }

        assert_eq!(flow.get_task_execution(root).unwrap().state, TaskExecState::Escalated);
    }

    #[test]
    fn attempt_tracking() {
        let mut attempt = Attempt::new(Uuid::new_v4(), 1);

        assert_eq!(attempt.attempt_number, 1);
        assert!(attempt.ended_at.is_none());
        assert!(attempt.outcome.is_none());

        attempt.complete(AttemptOutcome::Success);

        assert!(attempt.ended_at.is_some());
        assert_eq!(attempt.outcome, Some(AttemptOutcome::Success));
    }
}