Skip to main content

taskgraph/core/
scheduler.rs

1//! Graph validation and topological sort (Kahn's algorithm).
2//! Ensures the graph is a valid DAG before execution.
3
4use crate::error::{Result, TaskError};
5use crate::core::task::{TaskStore, TaskStatus};
6use core::sync::atomic::Ordering;
7
8/// Check for cycles in the task graph using Kahn's algorithm.
9/// This is a pre-execution validation step.
10pub fn validate_graph<S: TaskStore>(storage: &mut S) -> Result<()> {
11    let count = storage.task_count();
12    if count == 0 {
13        return Ok(());
14    }
15
16    // 1. Initialize: Mark all tasks as Pending for validation.
17    // NOTE: We rely on the initial state of remaining_deps.
18    for i in 0..count {
19        storage.update_status(i, TaskStatus::Pending)?;
20    }
21
22    let mut visited_count = 0;
23    let mut changed = true;
24
25    while changed {
26        changed = false;
27        for i in 0..count {
28            let (is_ready, is_pending) = {
29                let task = storage.get_task(i).ok_or(TaskError::InvalidState)?;
30                (
31                    task.remaining_deps.load(Ordering::SeqCst) == 0, 
32                    task.status == TaskStatus::Pending
33                )
34            };
35
36            if is_ready && is_pending {
37                storage.update_status(i, TaskStatus::Completed)?;
38                visited_count += 1;
39                changed = true;
40                
41                // Virtual propagation: Decrement successors' remaining_deps
42                let successors = storage.get_successors(i);
43                for target_id in 0..64 {
44                    if (successors >> target_id) & 1 == 1 {
45                        if let Some(target_task) = storage.get_task(target_id) {
46                            target_task.remaining_deps.fetch_sub(1, Ordering::SeqCst);
47                        }
48                    }
49                }
50            }
51        }
52    }
53
54    // 2. Reset the state for actual execution
55    // We must RESTORE remaining_deps from initial_deps.
56    for i in 0..count {
57        if let Some(task) = storage.get_task_mut(i) {
58            task.remaining_deps.store(task.initial_deps, Ordering::SeqCst);
59            task.status = TaskStatus::Pending;
60        }
61    }
62
63    if visited_count == count {
64        Ok(())
65    } else {
66        Err(TaskError::CycleDetected)
67    }
68}