1use super::flow::{FlowError, TaskExecState, TaskFlow};
7use super::graph::TaskGraph;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum ScheduleResult {
13 Ready(Vec<Uuid>),
15 Complete,
17 Blocked,
19 HasFailures(Vec<Uuid>),
21}
22
23pub struct Scheduler<'a> {
25 graph: &'a TaskGraph,
26 flow: &'a mut TaskFlow,
27}
28
29impl<'a> Scheduler<'a> {
30 pub fn new(graph: &'a TaskGraph, flow: &'a mut TaskFlow) -> Self {
32 Self { graph, flow }
33 }
34
35 pub fn update_readiness(&mut self) -> Vec<Uuid> {
39 let mut newly_ready = Vec::new();
40
41 for task_id in self.graph.tasks.keys() {
42 if let Some(exec) = self.flow.get_task_execution(*task_id) {
43 if exec.state != TaskExecState::Pending {
44 continue;
45 }
46 } else {
47 continue;
48 }
49
50 if self.dependencies_satisfied(*task_id)
52 && self
53 .flow
54 .transition_task(*task_id, TaskExecState::Ready)
55 .is_ok()
56 {
57 newly_ready.push(*task_id);
58 }
59 }
60
61 newly_ready
62 }
63
64 fn dependencies_satisfied(&self, task_id: Uuid) -> bool {
66 let Some(deps) = self.graph.dependencies.get(&task_id) else {
67 return true;
68 };
69
70 for dep_id in deps {
71 if let Some(exec) = self.flow.get_task_execution(*dep_id) {
72 if exec.state != TaskExecState::Success {
73 return false;
74 }
75 } else {
76 return false;
77 }
78 }
79
80 true
81 }
82
83 pub fn schedule(&mut self) -> ScheduleResult {
85 self.update_readiness();
87
88 let failed: Vec<_> = self.flow.tasks_in_state(TaskExecState::Failed);
90 let escalated: Vec<_> = self.flow.tasks_in_state(TaskExecState::Escalated);
91 if !failed.is_empty() || !escalated.is_empty() {
92 let mut failures = failed;
93 failures.extend(escalated);
94 return ScheduleResult::HasFailures(failures);
95 }
96
97 let ready: Vec<_> = self.flow.tasks_in_state(TaskExecState::Ready);
99 if !ready.is_empty() {
100 return ScheduleResult::Ready(ready);
101 }
102
103 let pending = self.flow.tasks_in_state(TaskExecState::Pending);
105 let running = self.flow.tasks_in_state(TaskExecState::Running);
106 let verifying = self.flow.tasks_in_state(TaskExecState::Verifying);
107 let retry = self.flow.tasks_in_state(TaskExecState::Retry);
108
109 if pending.is_empty() && running.is_empty() && verifying.is_empty() && retry.is_empty() {
110 return ScheduleResult::Complete;
111 }
112
113 ScheduleResult::Blocked
115 }
116
117 pub fn start_task(&mut self, task_id: Uuid) -> Result<(), FlowError> {
119 let exec = self
120 .flow
121 .get_task_execution(task_id)
122 .ok_or(FlowError::TaskNotFound(task_id))?;
123
124 if exec.state != TaskExecState::Ready && exec.state != TaskExecState::Retry {
126 return Err(FlowError::InvalidTransition {
127 from: exec.state,
128 to: TaskExecState::Running,
129 });
130 }
131
132 self.flow.transition_task(task_id, TaskExecState::Running)
133 }
134
135 pub fn complete_execution(&mut self, task_id: Uuid) -> Result<(), FlowError> {
137 self.flow.transition_task(task_id, TaskExecState::Verifying)
138 }
139
140 pub fn record_verification(
142 &mut self,
143 task_id: Uuid,
144 passed: bool,
145 can_retry: bool,
146 ) -> Result<(), FlowError> {
147 let new_state = if passed {
148 TaskExecState::Success
149 } else if can_retry {
150 TaskExecState::Retry
151 } else {
152 TaskExecState::Failed
153 };
154
155 self.flow.transition_task(task_id, new_state)
156 }
157
158 pub fn escalate(&mut self, task_id: Uuid) -> Result<(), FlowError> {
160 let exec = self
161 .flow
162 .get_task_execution(task_id)
163 .ok_or(FlowError::TaskNotFound(task_id))?;
164
165 if exec.state != TaskExecState::Failed {
166 return Err(FlowError::InvalidTransition {
167 from: exec.state,
168 to: TaskExecState::Escalated,
169 });
170 }
171
172 self.flow.transition_task(task_id, TaskExecState::Escalated)
173 }
174
175 pub fn blocked_by(&self, task_id: Uuid) -> Vec<Uuid> {
177 self.graph.dependents(task_id)
178 }
179
180 pub fn can_complete(&self) -> bool {
182 let pending = self.flow.tasks_in_state(TaskExecState::Pending);
183 let running = self.flow.tasks_in_state(TaskExecState::Running);
184 let verifying = self.flow.tasks_in_state(TaskExecState::Verifying);
185 let retry = self.flow.tasks_in_state(TaskExecState::Retry);
186 let ready = self.flow.tasks_in_state(TaskExecState::Ready);
187
188 pending.is_empty()
189 && running.is_empty()
190 && verifying.is_empty()
191 && retry.is_empty()
192 && ready.is_empty()
193 }
194}
195
196#[derive(Debug, Clone)]
198pub struct Attempt {
199 pub id: Uuid,
201 pub task_id: Uuid,
203 pub attempt_number: u32,
205 pub started_at: chrono::DateTime<chrono::Utc>,
207 pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
209 pub outcome: Option<AttemptOutcome>,
211}
212
213#[derive(Debug, Clone, Copy, PartialEq, Eq)]
215pub enum AttemptOutcome {
216 Success,
218 Failed,
220 Crashed,
222}
223
224impl Attempt {
225 pub fn new(task_id: Uuid, attempt_number: u32) -> Self {
227 Self {
228 id: Uuid::new_v4(),
229 task_id,
230 attempt_number,
231 started_at: chrono::Utc::now(),
232 ended_at: None,
233 outcome: None,
234 }
235 }
236
237 pub fn complete(&mut self, outcome: AttemptOutcome) {
239 self.ended_at = Some(chrono::Utc::now());
240 self.outcome = Some(outcome);
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247 use crate::core::graph::{GraphTask, SuccessCriteria, TaskGraph};
248
249 fn setup_test() -> (TaskGraph, TaskFlow) {
250 let project_id = Uuid::new_v4();
251 let mut graph = TaskGraph::new(project_id, "test");
252
253 let t1 = graph
254 .add_task(GraphTask::new("Task 1", SuccessCriteria::new("Done")))
255 .unwrap();
256 let t2 = graph
257 .add_task(GraphTask::new("Task 2", SuccessCriteria::new("Done")))
258 .unwrap();
259 let t3 = graph
260 .add_task(GraphTask::new("Task 3", SuccessCriteria::new("Done")))
261 .unwrap();
262
263 graph.add_dependency(t2, t1).unwrap();
265 graph.add_dependency(t3, t2).unwrap();
266
267 let task_ids: Vec<_> = graph.tasks.keys().copied().collect();
268 let flow = TaskFlow::new(graph.id, project_id, &task_ids);
269
270 (graph, flow)
271 }
272
273 #[test]
274 fn initial_readiness() {
275 let (graph, mut flow) = setup_test();
276 let mut scheduler = Scheduler::new(&graph, &mut flow);
277
278 let ready = scheduler.update_readiness();
279
280 assert_eq!(ready.len(), 1);
282
283 let root_tasks = graph.root_tasks();
284 assert!(ready.iter().all(|id| root_tasks.contains(id)));
285 }
286
287 #[test]
288 fn dependency_chain_execution() {
289 let (graph, mut flow) = setup_test();
290 let task_ids: Vec<_> = graph.topological_order();
291 let t1 = task_ids[0];
292 let t2 = task_ids[1];
293 let t3 = task_ids[2];
294
295 {
296 let mut scheduler = Scheduler::new(&graph, &mut flow);
297
298 let result = scheduler.schedule();
300 assert!(matches!(result, ScheduleResult::Ready(ref tasks) if tasks.contains(&t1)));
301
302 scheduler.start_task(t1).unwrap();
304 }
305
306 {
307 let mut scheduler = Scheduler::new(&graph, &mut flow);
308
309 let result = scheduler.schedule();
311 assert!(matches!(result, ScheduleResult::Blocked));
312
313 scheduler.complete_execution(t1).unwrap();
315
316 scheduler.record_verification(t1, true, false).unwrap();
318 }
319
320 {
321 let mut scheduler = Scheduler::new(&graph, &mut flow);
322
323 let result = scheduler.schedule();
325 assert!(matches!(result, ScheduleResult::Ready(ref tasks) if tasks.contains(&t2)));
326
327 scheduler.start_task(t2).unwrap();
329 scheduler.complete_execution(t2).unwrap();
330 scheduler.record_verification(t2, true, false).unwrap();
331 }
332
333 {
334 let mut scheduler = Scheduler::new(&graph, &mut flow);
335
336 let result = scheduler.schedule();
338 assert!(matches!(result, ScheduleResult::Ready(ref tasks) if tasks.contains(&t3)));
339
340 scheduler.start_task(t3).unwrap();
342 scheduler.complete_execution(t3).unwrap();
343 scheduler.record_verification(t3, true, false).unwrap();
344 }
345
346 {
347 let mut scheduler = Scheduler::new(&graph, &mut flow);
348
349 let result = scheduler.schedule();
351 assert!(matches!(result, ScheduleResult::Complete));
352 }
353 }
354
355 #[test]
356 fn retry_cycle() {
357 let (graph, mut flow) = setup_test();
358 let t1 = graph.root_tasks()[0];
359
360 {
361 let mut scheduler = Scheduler::new(&graph, &mut flow);
362 scheduler.update_readiness();
363 scheduler.start_task(t1).unwrap();
364 scheduler.complete_execution(t1).unwrap();
365 scheduler.record_verification(t1, false, true).unwrap();
367 }
368
369 let exec = flow.get_task_execution(t1).unwrap();
370 assert_eq!(exec.state, TaskExecState::Retry);
371
372 {
373 let mut scheduler = Scheduler::new(&graph, &mut flow);
374 scheduler.start_task(t1).unwrap();
376 }
377
378 let exec = flow.get_task_execution(t1).unwrap();
379 assert_eq!(exec.attempt_count, 2);
380 }
381
382 #[test]
383 fn failure_handling() {
384 let (graph, mut flow) = setup_test();
385 let t1 = graph.root_tasks()[0];
386
387 let mut scheduler = Scheduler::new(&graph, &mut flow);
388
389 scheduler.update_readiness();
390 scheduler.start_task(t1).unwrap();
391 scheduler.complete_execution(t1).unwrap();
392
393 scheduler.record_verification(t1, false, false).unwrap();
395
396 let result = scheduler.schedule();
397 assert!(matches!(result, ScheduleResult::HasFailures(_)));
398 }
399
400 #[test]
401 fn escalation() {
402 let (graph, mut flow) = setup_test();
403 let t1 = graph.root_tasks()[0];
404
405 let mut scheduler = Scheduler::new(&graph, &mut flow);
406
407 scheduler.update_readiness();
408 scheduler.start_task(t1).unwrap();
409 scheduler.complete_execution(t1).unwrap();
410 scheduler.record_verification(t1, false, false).unwrap();
411
412 scheduler.escalate(t1).unwrap();
414
415 let exec = flow.get_task_execution(t1).unwrap();
416 assert_eq!(exec.state, TaskExecState::Escalated);
417 }
418
419 #[test]
420 fn attempt_tracking() {
421 let task_id = Uuid::new_v4();
422 let mut attempt = Attempt::new(task_id, 1);
423
424 assert_eq!(attempt.attempt_number, 1);
425 assert!(attempt.ended_at.is_none());
426 assert!(attempt.outcome.is_none());
427
428 attempt.complete(AttemptOutcome::Success);
429
430 assert!(attempt.ended_at.is_some());
431 assert_eq!(attempt.outcome, Some(AttemptOutcome::Success));
432 }
433}