taskgraph/executor/
sync.rs1use crate::error::{Result, TaskError};
5use crate::core::task::{TaskStore, TaskStatus, TaskBody};
6use core::sync::atomic::Ordering;
7
8pub struct SyncExecutor<'a, S: TaskStore> {
10 storage: &'a mut S,
11}
12
13impl<'a, S: TaskStore> SyncExecutor<'a, S> {
14 pub fn new(storage: &'a mut S) -> Self {
16 Self { storage }
17 }
18
19 pub fn run(&mut self) -> Result<()> {
21 let count = self.storage.task_count();
22 let mut completed_count = 0;
23
24 while completed_count < count {
25 let mut made_progress = false;
26
27 for i in 0..count {
28 let (ready, to_run) = {
29 if let Some(task) = self.storage.get_task(i) {
30 (
31 task.remaining_deps.load(Ordering::SeqCst) == 0,
32 task.status == TaskStatus::Pending || task.status == TaskStatus::Retrying
33 )
34 } else {
35 (false, false)
36 }
37 };
38
39 if ready && to_run {
40 self.storage.update_status(i, TaskStatus::Running)?;
42
43 let res = self.execute_task(i);
45
46 match res {
47 Ok(()) => {
48 self.storage.update_status(i, TaskStatus::Completed)?;
49 completed_count += 1;
50 made_progress = true;
51
52 self.propagate_completion(i)?;
54 }
55 Err(e) => {
56 self.handle_failure(i, e)?;
57 made_progress = true; }
59 }
60 }
61 }
62
63 if !made_progress && completed_count < count {
64 return Err(TaskError::InvalidState);
67 }
68 }
69
70 Ok(())
71 }
72
73 fn execute_task(&self, id: usize) -> Result<()> {
75 let task = self.storage.get_task(id).ok_or(TaskError::InvalidState)?;
76
77 #[cfg(feature = "std")]
78 {
79 let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
80 match &task.body {
81 TaskBody::Static(f) => f(),
82 #[cfg(feature = "alloc")]
83 TaskBody::Dynamic(f) => f(),
84 #[cfg(all(feature = "alloc", feature = "async"))]
85 TaskBody::Async(_) => Err(crate::error::TaskError::InvalidState),
86 }
87 }));
88
89 match res {
90 Ok(inner_res) => inner_res,
91 Err(_) => Err(TaskError::Panic), }
93 }
94
95 #[cfg(not(feature = "std"))]
96 {
97 match &task.body {
98 TaskBody::Static(f) => f(),
99 #[cfg(feature = "alloc")]
100 TaskBody::Dynamic(f) => f(),
101 #[cfg(all(feature = "alloc", feature = "async"))]
102 TaskBody::Async(_) => Err(crate::error::TaskError::InvalidState),
103 }
104 }
105 }
106
107
108 fn propagate_completion(&mut self, completed_id: usize) -> Result<()> {
110 let successors_mask = self.storage.get_successors(completed_id);
111
112 for target_id in 0..64 {
114 if (successors_mask >> target_id) & 1 == 1 {
115 if let Some(target_task) = self.storage.get_task(target_id) {
116 target_task.remaining_deps.fetch_sub(1, Ordering::SeqCst);
119 }
120 }
121 }
122
123 Ok(())
124 }
125
126 fn handle_failure(&mut self, id: usize, err: TaskError) -> Result<()> {
128 let (can_retry, _current) = {
129 let task = self.storage.get_task(id).ok_or(TaskError::InvalidState)?;
130 let current = task.current_retry.fetch_add(1, Ordering::SeqCst);
131 (current < task.retries, current)
132 };
133
134 if can_retry {
135 self.storage.update_status(id, TaskStatus::Retrying)?;
136 Ok(())
139 } else {
140 self.storage.update_status(id, TaskStatus::Failed)?;
141 Err(err)
142 }
143 }
144}