taskgraph/core/
scheduler.rs1use crate::error::{Result, TaskError};
5use crate::core::task::{TaskStore, TaskStatus};
6use core::sync::atomic::Ordering;
7
8pub fn validate_graph<S: TaskStore>(storage: &mut S) -> Result<()> {
11 let count = storage.task_count();
12 if count == 0 {
13 return Ok(());
14 }
15
16 for i in 0..count {
19 storage.update_status(i, TaskStatus::Pending)?;
20 }
21
22 let mut visited_count = 0;
23 let mut changed = true;
24
25 while changed {
26 changed = false;
27 for i in 0..count {
28 let (is_ready, is_pending) = {
29 let task = storage.get_task(i).ok_or(TaskError::InvalidState)?;
30 (
31 task.remaining_deps.load(Ordering::SeqCst) == 0,
32 task.status == TaskStatus::Pending
33 )
34 };
35
36 if is_ready && is_pending {
37 storage.update_status(i, TaskStatus::Completed)?;
38 visited_count += 1;
39 changed = true;
40
41 let successors = storage.get_successors(i);
43 for target_id in 0..64 {
44 if (successors >> target_id) & 1 == 1 {
45 if let Some(target_task) = storage.get_task(target_id) {
46 target_task.remaining_deps.fetch_sub(1, Ordering::SeqCst);
47 }
48 }
49 }
50 }
51 }
52 }
53
54 for i in 0..count {
57 if let Some(task) = storage.get_task_mut(i) {
58 task.remaining_deps.store(task.initial_deps, Ordering::SeqCst);
59 task.status = TaskStatus::Pending;
60 }
61 }
62
63 if visited_count == count {
64 Ok(())
65 } else {
66 Err(TaskError::CycleDetected)
67 }
68}