1use crate::error::ExecutorError;
5use crate::item::ExecutableItem;
6use crate::trigger::{TriggerDecl, TriggerDeclarer};
7
/// Opaque handle to a vertex that was added to a [`GraphBuilder`].
///
/// The wrapped value is the vertex's index, i.e. the number of vertices that
/// had been added before it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Vertex(pub(crate) usize);
11
12#[allow(clippy::redundant_pub_crate)]
20pub(crate) struct Graph {
21 pub(crate) items: Vec<Box<dyn ExecutableItem>>,
22 pub(crate) successors: Vec<Vec<usize>>, pub(crate) in_degree: Vec<usize>, pub(crate) root: usize,
25 pub(crate) decls: Vec<TriggerDecl>,
26
27 vertex_ptrs: Vec<VertexPtr>,
33 counters: Vec<AtomicUsize>,
37 pending: AtomicUsize,
39 stop_flag: AtomicBool,
41 stop_chain_seen: AtomicBool,
43 first_err: Mutex<Option<crate::error::ItemError>>,
45 done_cv: (Mutex<()>, Condvar),
47 ready_ring: crate::ready_ring::ReadyRing,
51 vertex_jobs: Vec<Box<dyn FnMut() + Send + 'static>>,
58}
59
60impl core::fmt::Debug for Graph {
61 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62 f.debug_struct("Graph")
63 .field("n_items", &self.items.len())
64 .field("successors", &self.successors)
65 .field("in_degree", &self.in_degree)
66 .field("root", &self.root)
67 .finish_non_exhaustive()
68 }
69}
70
71impl Graph {
72 pub(crate) fn root_task_id(&self) -> Option<&str> {
74 self.items[self.root].task_id()
75 }
76}
77
78pub struct GraphBuilder {
80 items: Vec<Box<dyn ExecutableItem>>,
81 edges: Vec<(usize, usize)>,
82 root: Option<usize>,
83}
84
85impl GraphBuilder {
86 pub(crate) fn new() -> Self {
87 Self {
88 items: Vec::new(),
89 edges: Vec::new(),
90 root: None,
91 }
92 }
93
94 pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> Vertex {
96 let idx = self.items.len();
97 self.items.push(Box::new(item));
98 Vertex(idx)
99 }
100
101 pub fn edge(&mut self, from: Vertex, to: Vertex) -> &mut Self {
103 self.edges.push((from.0, to.0));
104 self
105 }
106
107 pub const fn root(&mut self, v: Vertex) -> &mut Self {
109 self.root = Some(v.0);
110 self
111 }
112
113 #[allow(clippy::too_many_lines)]
115 pub(crate) fn finish(mut self) -> Result<Graph, ExecutorError> {
116 let n = self.items.len();
117 if n == 0 {
118 return Err(ExecutorError::InvalidGraph("graph has no vertices".into()));
119 }
120 let root = self
121 .root
122 .ok_or_else(|| ExecutorError::InvalidGraph("no root vertex set".into()))?;
123 if root >= n {
124 return Err(ExecutorError::InvalidGraph(
125 "root index out of bounds".into(),
126 ));
127 }
128
129 let mut successors = vec![Vec::<usize>::new(); n];
130 let mut in_degree = vec![0_usize; n];
131 for &(from, to) in &self.edges {
132 if from >= n || to >= n {
133 return Err(ExecutorError::InvalidGraph(
134 "edge index out of bounds".into(),
135 ));
136 }
137 if from == to {
138 return Err(ExecutorError::InvalidGraph(
139 "self-loops are not allowed".into(),
140 ));
141 }
142 successors[from].push(to);
143 in_degree[to] += 1;
144 }
145
146 let mut k_in = in_degree.clone();
148 let mut queue: Vec<usize> = k_in
149 .iter()
150 .enumerate()
151 .filter_map(|(i, d)| (*d == 0).then_some(i))
152 .collect();
153 let mut visited = 0_usize;
154 while let Some(u) = queue.pop() {
155 visited += 1;
156 for &v in &successors[u] {
157 k_in[v] -= 1;
158 if k_in[v] == 0 {
159 queue.push(v);
160 }
161 }
162 }
163 if visited != n {
164 return Err(ExecutorError::InvalidGraph("graph contains a cycle".into()));
165 }
166
167 let mut reach = vec![false; n];
169 let mut stack = vec![root];
170 while let Some(u) = stack.pop() {
171 if reach[u] {
172 continue;
173 }
174 reach[u] = true;
175 for &v in &successors[u] {
176 stack.push(v);
177 }
178 }
179 if reach.iter().any(|r| !*r) {
180 return Err(ExecutorError::InvalidGraph(
181 "every vertex must be reachable from the root".into(),
182 ));
183 }
184
185 let mut decl = TriggerDeclarer::new_internal();
187 self.items[root].declare_triggers(&mut decl)?;
188 let decls = decl.into_decls();
189
190 for (i, body) in self.items.iter_mut().enumerate() {
192 if i == root {
193 continue;
194 }
195 let mut spurious = TriggerDeclarer::new_internal();
196 let _ = body.declare_triggers(&mut spurious);
197 if !spurious.is_empty() {
198 #[cfg(feature = "tracing")]
199 tracing::warn!(target: "taktora-executor", vertex = i,
200 "non-root graph vertex declared triggers; ignored");
201 }
202 }
203
204 let n_items = self.items.len();
205 let mut items = self.items;
206 #[allow(unsafe_code)]
209 let vertex_ptrs: Vec<VertexPtr> = items
210 .iter_mut()
211 .map(|b| VertexPtr(std::ptr::from_mut(b.as_mut())))
212 .collect();
213 let counters: Vec<AtomicUsize> = in_degree.iter().map(|d| AtomicUsize::new(*d)).collect();
214
215 Ok(Graph {
216 items,
217 successors,
218 in_degree,
219 root,
220 decls,
221 vertex_ptrs,
222 counters,
223 pending: AtomicUsize::new(n_items),
224 stop_flag: AtomicBool::new(false),
225 stop_chain_seen: AtomicBool::new(false),
226 first_err: Mutex::new(None),
227 done_cv: (Mutex::new(()), Condvar::new()),
228 ready_ring: crate::ready_ring::ReadyRing::new(n_items),
229 vertex_jobs: Vec::new(),
230 })
231 }
232}
233
234use crate::context::Stoppable;
237use crate::monitor::ExecutionMonitor;
238use crate::observer::Observer;
239use crate::pool::Pool;
240use crate::task_id::TaskId;
241use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
242use std::sync::{Arc, Condvar, Mutex};
243
244#[allow(clippy::redundant_pub_crate)]
246pub(crate) struct GraphRunOutcome {
247 #[allow(clippy::redundant_pub_crate)]
248 pub(crate) error: Option<crate::error::ItemError>,
249 #[allow(clippy::redundant_pub_crate)]
250 pub(crate) stopped_chain: bool,
251}
252
253struct VertexPtr(*mut dyn ExecutableItem);
255
256#[allow(unsafe_code)]
261unsafe impl Send for VertexPtr {}
262#[allow(unsafe_code)]
263unsafe impl Sync for VertexPtr {}
264
265#[allow(unsafe_code)]
272#[derive(Copy, Clone)]
273struct SendGraphPtr(*const Graph);
274
275impl SendGraphPtr {
276 const fn get(&self) -> *const Graph {
280 self.0
281 }
282}
283
284#[allow(unsafe_code)]
285unsafe impl Send for SendGraphPtr {}
286#[allow(unsafe_code)]
287unsafe impl Sync for SendGraphPtr {}
288
289impl Graph {
290 fn finalise_skipped(&self, i: usize) {
291 if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
292 self.notify_done();
293 return;
294 }
295 for &j in &self.successors[i] {
296 self.cancel_subtree(j);
297 }
298 }
299
300 fn cancel_subtree(&self, root: usize) {
301 let mut stack = vec![root];
309 while let Some(u) = stack.pop() {
310 let prev = self.counters[u].swap(usize::MAX, Ordering::AcqRel);
311 if prev != usize::MAX {
312 if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
313 self.notify_done();
314 return;
315 }
316 for &v in &self.successors[u] {
317 stack.push(v);
318 }
319 }
320 }
321 }
322
323 fn notify_done(&self) {
324 let _g = self.done_cv.0.lock().unwrap();
325 self.done_cv.1.notify_all();
326 }
327}
328
329impl Graph {
330 #[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
340 pub(crate) fn prepare_dispatch(
341 self: &mut Box<Self>,
342 task_id: TaskId,
343 stop: Stoppable,
344 observer: Arc<dyn Observer>,
345 monitor: Arc<dyn ExecutionMonitor>,
346 err_slot: Arc<Mutex<Option<crate::error::ExecutorError>>>,
347 ) {
348 let n = self.items.len();
349 #[allow(unsafe_code)]
357 let graph_ptr = SendGraphPtr(std::ptr::from_ref::<Self>(self.as_ref()));
358
359 let mut jobs: Vec<Box<dyn FnMut() + Send + 'static>> = Vec::with_capacity(n);
360 for i in 0..n {
361 let task_id = task_id.clone();
362 let stop = stop.clone();
363 let observer = Arc::clone(&observer);
364 let monitor = Arc::clone(&monitor);
365 let err_slot = Arc::clone(&err_slot);
366 let job: Box<dyn FnMut() + Send + 'static> = Box::new(move || {
367 #[allow(unsafe_code)]
371 let g: &Self = unsafe { &*graph_ptr.get() };
372
373 if g.stop_flag.load(Ordering::Acquire) {
374 g.finalise_skipped(i);
375 return;
376 }
377 let mut ctx = crate::context::Context::new(&task_id, &stop, observer.as_ref());
378 let ptr = g.vertex_ptrs[i].0;
379 #[allow(unsafe_code)]
384 let app_id = unsafe { (*ptr).app_id() };
385 #[allow(unsafe_code)]
386 let app_inst = unsafe { (*ptr).app_instance_id() };
387 if let Some(aid) = app_id {
388 observer.on_app_start(task_id.clone(), aid, app_inst);
389 }
390 let started = std::time::Instant::now();
391 monitor.pre_execute(task_id.clone(), started);
392 #[allow(unsafe_code)]
393 let res =
394 crate::executor::run_item_catch_unwind_external(unsafe { &mut *ptr }, &mut ctx);
395 let took = started.elapsed();
396 monitor.post_execute(task_id.clone(), started, took, res.is_ok());
397 if let Err(ref e) = res {
398 observer.on_app_error(task_id.clone(), e.as_ref());
399 }
400 if app_id.is_some() {
401 observer.on_app_stop(task_id.clone());
402 }
403 match &res {
404 Ok(crate::ControlFlow::Continue) => {}
405 Ok(crate::ControlFlow::StopChain) => {
406 g.stop_chain_seen.store(true, Ordering::Release);
407 g.stop_flag.store(true, Ordering::Release);
408 }
409 Err(_) => g.stop_flag.store(true, Ordering::Release),
410 }
411 if let Err(e) = res {
412 let mut fe = g.first_err.lock().unwrap();
413 if fe.is_none() {
414 *fe = Some(e);
415 }
416 }
417 if g.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
418 g.notify_done();
419 } else if g.stop_flag.load(Ordering::Acquire) {
420 for &j in &g.successors[i] {
421 g.cancel_subtree(j);
422 }
423 } else {
424 for &j in &g.successors[i] {
425 if g.counters[j].fetch_sub(1, Ordering::AcqRel) == 1 {
426 g.ready_ring
431 .push(j)
432 .expect("ready_ring sized to n_vertices");
433 }
434 }
435 }
436 let _ = &err_slot; });
439 jobs.push(job);
440 }
441 self.vertex_jobs = jobs;
442 }
443
444 #[allow(unsafe_code)]
449 pub(crate) fn run_once_borrowed(&mut self, pool: &Pool) -> GraphRunOutcome {
450 let n = self.items.len();
451
452 for (c, d) in self.counters.iter().zip(self.in_degree.iter()) {
454 c.store(*d, Ordering::Relaxed);
455 }
456 self.pending.store(n, Ordering::Relaxed);
457 self.stop_flag.store(false, Ordering::Relaxed);
458 self.stop_chain_seen.store(false, Ordering::Relaxed);
459 *self.first_err.lock().unwrap() = None;
460 self.ready_ring.reset();
461
462 for i in 0..n {
472 if self.in_degree[i] == 0 {
473 self.dispatch_vertex(pool, i);
474 }
475 }
476
477 loop {
479 while let Some(i) = self.ready_ring.pop() {
480 self.dispatch_vertex(pool, i);
481 }
482 if self.pending.load(Ordering::Acquire) == 0 {
483 break;
484 }
485 let guard = self.done_cv.0.lock().unwrap();
486 if self.pending.load(Ordering::Acquire) == 0 {
487 drop(guard);
488 break;
489 }
490 drop(
491 self.done_cv
492 .1
493 .wait_timeout(guard, std::time::Duration::from_millis(5))
494 .unwrap()
495 .0,
496 );
497 }
498 while self.ready_ring.pop().is_some() {}
500
501 let mut first_err = self.first_err.lock().unwrap();
502 GraphRunOutcome {
503 error: first_err.take(),
504 stopped_chain: self.stop_chain_seen.load(Ordering::Acquire),
505 }
506 }
507
508 #[allow(unsafe_code)]
511 fn dispatch_vertex(&mut self, pool: &Pool, i: usize) {
512 let job_ptr: *mut (dyn FnMut() + Send) =
513 std::ptr::from_mut::<dyn FnMut() + Send>(self.vertex_jobs[i].as_mut());
514 unsafe {
523 pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
524 }
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ControlFlow, item};

    /// Adds a vertex whose item always returns `ControlFlow::Continue`.
    fn add_noop(builder: &mut GraphBuilder) -> Vertex {
        builder.vertex(item(|_| Ok(ControlFlow::Continue)))
    }

    /// A builder with no vertices is rejected.
    #[test]
    fn empty_graph_rejected() {
        let builder = GraphBuilder::new();
        let err = builder.finish().expect_err("empty graph");
        assert!(err.to_string().contains("no vertices"));
    }

    /// A builder with vertices but no root is rejected.
    #[test]
    fn missing_root_rejected() {
        let mut builder = GraphBuilder::new();
        add_noop(&mut builder);
        let err = builder.finish().expect_err("missing root");
        assert!(err.to_string().contains("no root"));
    }

    /// Two vertices pointing at each other form a cycle and are rejected.
    #[test]
    fn cycle_rejected() {
        let mut builder = GraphBuilder::new();
        let first = add_noop(&mut builder);
        let second = add_noop(&mut builder);
        builder.edge(first, second).edge(second, first).root(first);
        let err = builder.finish().expect_err("cycle");
        assert!(err.to_string().contains("cycle"));
    }

    /// A vertex with no path from the root is rejected.
    #[test]
    fn unreachable_vertex_rejected() {
        let mut builder = GraphBuilder::new();
        let root = add_noop(&mut builder);
        let _orphan = add_noop(&mut builder);
        builder.root(root);
        let err = builder.finish().expect_err("unreachable");
        assert!(err.to_string().contains("reachable"));
    }

    /// A diamond (root -> left/right -> merge) builds with the expected
    /// adjacency and in-degree.
    #[test]
    #[allow(clippy::many_single_char_names)]
    fn diamond_graph_builds() {
        let mut builder = GraphBuilder::new();
        let root = add_noop(&mut builder);
        let left = add_noop(&mut builder);
        let right = add_noop(&mut builder);
        let merge = add_noop(&mut builder);
        builder
            .edge(root, left)
            .edge(root, right)
            .edge(left, merge)
            .edge(right, merge)
            .root(root);
        let graph = builder.finish().expect("diamond");
        assert_eq!(graph.successors[root.0], vec![left.0, right.0]);
        assert_eq!(graph.in_degree[merge.0], 2);
    }
}