Skip to main content

taskgraph/executor/
sync.rs

1//! Synchronous executor for task graphs.
2//! Executes tasks sequentially according to the DAG order.
3
4use crate::error::{Result, TaskError};
5use crate::core::task::{TaskStore, TaskStatus, TaskBody};
6use core::sync::atomic::Ordering;
7
8/// Synchronous runner for a task graph.
9pub struct SyncExecutor<'a, S: TaskStore> {
10    storage: &'a mut S,
11}
12
13impl<'a, S: TaskStore> SyncExecutor<'a, S> {
14    /// Create a new synchronous executor.
15    pub fn new(storage: &'a mut S) -> Self {
16        Self { storage }
17    }
18
19    /// Run the graph until completion or first error.
20    pub fn run(&mut self) -> Result<()> {
21        let count = self.storage.task_count();
22        let mut completed_count = 0;
23
24        while completed_count < count {
25            let mut made_progress = false;
26
27            for i in 0..count {
28                let (ready, to_run) = {
29                    if let Some(task) = self.storage.get_task(i) {
30                        (
31                            task.remaining_deps.load(Ordering::SeqCst) == 0,
32                            task.status == TaskStatus::Pending || task.status == TaskStatus::Retrying
33                        )
34                    } else {
35                        (false, false)
36                    }
37                };
38
39                if ready && to_run {
40                    // Start task
41                    self.storage.update_status(i, TaskStatus::Running)?;
42                    
43                    // Execute task logic
44                    let res = self.execute_task(i);
45                    
46                    match res {
47                        Ok(()) => {
48                            self.storage.update_status(i, TaskStatus::Completed)?;
49                            completed_count += 1;
50                            made_progress = true;
51                            
52                            // Propagate completion (decrement dependents)
53                            self.propagate_completion(i)?;
54                        }
55                        Err(e) => {
56                            self.handle_failure(i, e)?;
57                            made_progress = true; // Retry counts as progress for the loop
58                        }
59                    }
60                }
61            }
62
63            if !made_progress && completed_count < count {
64                // If we didn't finish but no progress was made, it means there's a problem.
65                // Normally validate_graph catches cycles, but let's be safe.
66                return Err(TaskError::InvalidState);
67            }
68        }
69
70        Ok(())
71    }
72
73    /// Internal: Execute a specific task based on its ID.
74    fn execute_task(&self, id: usize) -> Result<()> {
75        let task = self.storage.get_task(id).ok_or(TaskError::InvalidState)?;
76
77        #[cfg(feature = "std")]
78        {
79            let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
80                match &task.body {
81                    TaskBody::Static(f) => f(),
82                    #[cfg(feature = "alloc")]
83                    TaskBody::Dynamic(f) => f(),
84                    #[cfg(all(feature = "alloc", feature = "async"))]
85                    TaskBody::Async(_) => Err(crate::error::TaskError::InvalidState),
86                }
87            }));
88
89            match res {
90                Ok(inner_res) => inner_res,
91                Err(_) => Err(TaskError::Panic), // Task panicked
92            }
93        }
94
95        #[cfg(not(feature = "std"))]
96        {
97            match &task.body {
98                TaskBody::Static(f) => f(),
99                #[cfg(feature = "alloc")]
100                TaskBody::Dynamic(f) => f(),
101                #[cfg(all(feature = "alloc", feature = "async"))]
102                TaskBody::Async(_) => Err(crate::error::TaskError::InvalidState),
103            }
104        }
105    }
106
107
108    /// Internal: Decrement dependencies of tasks that depend on the completed one.
109    fn propagate_completion(&mut self, completed_id: usize) -> Result<()> {
110        let successors_mask = self.storage.get_successors(completed_id);
111        
112        // Iterate over bits set in the mask (up to 64)
113        for target_id in 0..64 {
114            if (successors_mask >> target_id) & 1 == 1 {
115                if let Some(target_task) = self.storage.get_task(target_id) {
116                    // Atomic decrement.
117                    // If target_task.remaining_deps was 0, it means the graph was invalid or logic error.
118                    target_task.remaining_deps.fetch_sub(1, Ordering::SeqCst);
119                }
120            }
121        }
122        
123        Ok(())
124    }
125
126    /// Internal: Handle task failure with optional retry.
127    fn handle_failure(&mut self, id: usize, err: TaskError) -> Result<()> {
128        let (can_retry, _current) = {
129            let task = self.storage.get_task(id).ok_or(TaskError::InvalidState)?;
130            let current = task.current_retry.fetch_add(1, Ordering::SeqCst);
131            (current < task.retries, current)
132        };
133
134        if can_retry {
135            self.storage.update_status(id, TaskStatus::Retrying)?;
136            // In a sync executor, we just try again immediately (simple strategy)
137            // or we could requeue it.
138            Ok(())
139        } else {
140            self.storage.update_status(id, TaskStatus::Failed)?;
141            Err(err)
142        }
143    }
144}