1use crate::error::ExecutorError;
5use crate::item::ExecutableItem;
6use crate::trigger::{TriggerDecl, TriggerDeclarer};
7
8#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
10pub struct Vertex(pub(crate) usize);
11
12#[allow(clippy::redundant_pub_crate)]
20pub(crate) struct Graph {
21 pub(crate) items: Vec<Box<dyn ExecutableItem>>,
22 pub(crate) successors: Vec<Vec<usize>>, pub(crate) in_degree: Vec<usize>, pub(crate) root: usize,
25 pub(crate) decls: Vec<TriggerDecl>,
26
27 vertex_ptrs: Vec<VertexPtr>,
33 counters: Vec<AtomicUsize>,
37 pending: AtomicUsize,
39 stop_flag: AtomicBool,
41 stop_chain_seen: AtomicBool,
43 first_err: Mutex<Option<crate::error::ItemError>>,
45 done_cv: (Mutex<()>, Condvar),
47 ready_ring: crate::ready_ring::ReadyRing,
51 vertex_jobs: Vec<Box<dyn FnMut() + Send + 'static>>,
58}
59
60impl core::fmt::Debug for Graph {
61 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62 f.debug_struct("Graph")
63 .field("n_items", &self.items.len())
64 .field("successors", &self.successors)
65 .field("in_degree", &self.in_degree)
66 .field("root", &self.root)
67 .finish_non_exhaustive()
68 }
69}
70
71impl Graph {
72 pub(crate) fn root_task_id(&self) -> Option<&str> {
74 self.items[self.root].task_id()
75 }
76}
77
78pub struct GraphBuilder {
80 items: Vec<Box<dyn ExecutableItem>>,
81 edges: Vec<(usize, usize)>,
82 root: Option<usize>,
83}
84
85impl GraphBuilder {
86 pub(crate) fn new() -> Self {
87 Self {
88 items: Vec::new(),
89 edges: Vec::new(),
90 root: None,
91 }
92 }
93
94 pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> Vertex {
96 let idx = self.items.len();
97 self.items.push(Box::new(item));
98 Vertex(idx)
99 }
100
101 pub fn edge(&mut self, from: Vertex, to: Vertex) -> &mut Self {
103 self.edges.push((from.0, to.0));
104 self
105 }
106
107 pub const fn root(&mut self, v: Vertex) -> &mut Self {
109 self.root = Some(v.0);
110 self
111 }
112
113 pub(crate) fn finish(mut self) -> Result<Graph, ExecutorError> {
121 let n = self.items.len();
122 let root = Self::validate_root(self.root, n)?;
123 let (successors, in_degree) = Self::build_adjacency(&self.edges, n)?;
124 Self::assert_acyclic(&successors, &in_degree, n)?;
125 Self::assert_reachable(&successors, root, n)?;
126 let decls = self.collect_root_decls(root)?;
127 Ok(Self::assemble(
128 self.items, successors, in_degree, root, decls,
129 ))
130 }
131
132 fn validate_root(root: Option<usize>, n: usize) -> Result<usize, ExecutorError> {
138 if n == 0 {
139 return Err(ExecutorError::InvalidGraph("graph has no vertices".into()));
140 }
141 let root = root.ok_or_else(|| ExecutorError::InvalidGraph("no root vertex set".into()))?;
142 if root >= n {
143 return Err(ExecutorError::InvalidGraph(
144 "root index out of bounds".into(),
145 ));
146 }
147 Ok(root)
148 }
149
150 fn build_adjacency(
154 edges: &[(usize, usize)],
155 n: usize,
156 ) -> Result<(Vec<Vec<usize>>, Vec<usize>), ExecutorError> {
157 let mut successors = vec![Vec::<usize>::new(); n];
158 let mut in_degree = vec![0_usize; n];
159 for &(from, to) in edges {
160 if from >= n || to >= n {
161 return Err(ExecutorError::InvalidGraph(
162 "edge index out of bounds".into(),
163 ));
164 }
165 if from == to {
166 return Err(ExecutorError::InvalidGraph(
167 "self-loops are not allowed".into(),
168 ));
169 }
170 successors[from].push(to);
171 in_degree[to] += 1;
172 }
173 Ok((successors, in_degree))
174 }
175
176 fn assert_acyclic(
179 successors: &[Vec<usize>],
180 in_degree: &[usize],
181 n: usize,
182 ) -> Result<(), ExecutorError> {
183 let mut k_in = in_degree.to_vec();
184 let mut queue: Vec<usize> = k_in
185 .iter()
186 .enumerate()
187 .filter_map(|(i, d)| (*d == 0).then_some(i))
188 .collect();
189 let mut visited = 0_usize;
190 while let Some(u) = queue.pop() {
191 visited += 1;
192 for &v in &successors[u] {
193 k_in[v] -= 1;
194 if k_in[v] == 0 {
195 queue.push(v);
196 }
197 }
198 }
199 if visited != n {
200 return Err(ExecutorError::InvalidGraph("graph contains a cycle".into()));
201 }
202 Ok(())
203 }
204
205 fn assert_reachable(
207 successors: &[Vec<usize>],
208 root: usize,
209 n: usize,
210 ) -> Result<(), ExecutorError> {
211 let mut reach = vec![false; n];
212 let mut stack = vec![root];
213 while let Some(u) = stack.pop() {
214 if reach[u] {
215 continue;
216 }
217 reach[u] = true;
218 for &v in &successors[u] {
219 stack.push(v);
220 }
221 }
222 if reach.iter().any(|r| !*r) {
223 return Err(ExecutorError::InvalidGraph(
224 "every vertex must be reachable from the root".into(),
225 ));
226 }
227 Ok(())
228 }
229
230 fn collect_root_decls(&mut self, root: usize) -> Result<Vec<TriggerDecl>, ExecutorError> {
234 let mut decl = TriggerDeclarer::new_internal();
235 self.items[root].declare_triggers(&mut decl)?;
236 let decls = decl.into_decls();
237
238 for (i, body) in self.items.iter_mut().enumerate() {
239 if i == root {
240 continue;
241 }
242 let mut spurious = TriggerDeclarer::new_internal();
243 let _ = body.declare_triggers(&mut spurious);
244 if !spurious.is_empty() {
245 #[cfg(feature = "tracing")]
246 tracing::warn!(target: "taktora-executor", vertex = i,
247 "non-root graph vertex declared triggers; ignored");
248 }
249 }
250 Ok(decls)
251 }
252
253 fn assemble(
256 mut items: Vec<Box<dyn ExecutableItem>>,
257 successors: Vec<Vec<usize>>,
258 in_degree: Vec<usize>,
259 root: usize,
260 decls: Vec<TriggerDecl>,
261 ) -> Graph {
262 let n_items = items.len();
263 #[allow(unsafe_code)]
266 let vertex_ptrs: Vec<VertexPtr> = items
267 .iter_mut()
268 .map(|b| VertexPtr(std::ptr::from_mut(b.as_mut())))
269 .collect();
270 let counters: Vec<AtomicUsize> = in_degree.iter().map(|d| AtomicUsize::new(*d)).collect();
271
272 Graph {
273 items,
274 successors,
275 in_degree,
276 root,
277 decls,
278 vertex_ptrs,
279 counters,
280 pending: AtomicUsize::new(n_items),
281 stop_flag: AtomicBool::new(false),
282 stop_chain_seen: AtomicBool::new(false),
283 first_err: Mutex::new(None),
284 done_cv: (Mutex::new(()), Condvar::new()),
285 ready_ring: crate::ready_ring::ReadyRing::new(n_items),
286 vertex_jobs: Vec::new(),
287 }
288 }
289}
290
291use crate::context::Stoppable;
294use crate::monitor::ExecutionMonitor;
295use crate::observer::Observer;
296use crate::pool::Pool;
297use crate::task_id::TaskId;
298use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
299use std::sync::{Arc, Condvar, Mutex};
300
301#[allow(clippy::redundant_pub_crate)]
303pub(crate) struct GraphRunOutcome {
304 #[allow(clippy::redundant_pub_crate)]
305 pub(crate) error: Option<crate::error::ItemError>,
306 #[allow(clippy::redundant_pub_crate)]
307 pub(crate) stopped_chain: bool,
308}
309
310struct VertexPtr(*mut dyn ExecutableItem);
312
313#[allow(unsafe_code)]
318unsafe impl Send for VertexPtr {}
319#[allow(unsafe_code)]
320unsafe impl Sync for VertexPtr {}
321
322#[allow(unsafe_code)]
329#[derive(Copy, Clone)]
330struct SendGraphPtr(*const Graph);
331
332impl SendGraphPtr {
333 const fn get(&self) -> *const Graph {
337 self.0
338 }
339}
340
341#[allow(unsafe_code)]
342unsafe impl Send for SendGraphPtr {}
343#[allow(unsafe_code)]
344unsafe impl Sync for SendGraphPtr {}
345
346impl Graph {
347 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
348 fn finalise_skipped(&self, i: usize) {
349 if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
350 self.notify_done();
351 return;
352 }
353 for &j in &self.successors[i] {
354 self.cancel_subtree(j);
355 }
356 }
357
358 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
359 fn cancel_subtree(&self, root: usize) {
360 let mut stack = vec![root];
368 while let Some(u) = stack.pop() {
369 let prev = self.counters[u].swap(usize::MAX, Ordering::AcqRel);
370 if prev != usize::MAX {
371 if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
372 self.notify_done();
373 return;
374 }
375 for &v in &self.successors[u] {
376 stack.push(v);
377 }
378 }
379 }
380 }
381
382 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
383 fn notify_done(&self) {
384 #[allow(clippy::unwrap_used)]
387 let _g = self.done_cv.0.lock().unwrap();
388 self.done_cv.1.notify_all();
389 }
390}
391
392impl Graph {
393 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
403 #[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
404 pub(crate) fn prepare_dispatch(
405 self: &mut Box<Self>,
406 task_id: TaskId,
407 stop: Stoppable,
408 observer: Arc<dyn Observer>,
409 monitor: Arc<dyn ExecutionMonitor>,
410 err_slot: Arc<Mutex<Option<crate::error::ExecutorError>>>,
411 ) {
412 let n = self.items.len();
413 #[allow(unsafe_code)]
421 let graph_ptr = SendGraphPtr(std::ptr::from_ref::<Self>(self.as_ref()));
422
423 let mut jobs: Vec<Box<dyn FnMut() + Send + 'static>> = Vec::with_capacity(n);
424 for i in 0..n {
425 let task_id = task_id.clone();
426 let stop = stop.clone();
427 let observer = Arc::clone(&observer);
428 let monitor = Arc::clone(&monitor);
429 let err_slot = Arc::clone(&err_slot);
430 let job: Box<dyn FnMut() + Send + 'static> = Box::new(move || {
431 #[allow(unsafe_code)]
435 let g: &Self = unsafe { &*graph_ptr.get() };
436
437 if g.stop_flag.load(Ordering::Acquire) {
438 g.finalise_skipped(i);
439 return;
440 }
441 let mut ctx = crate::context::Context::new(&task_id, &stop, observer.as_ref());
442 let ptr = g.vertex_ptrs[i].0;
443 #[allow(unsafe_code)]
448 let app_id = unsafe { (*ptr).app_id() };
449 #[allow(unsafe_code)]
450 let app_inst = unsafe { (*ptr).app_instance_id() };
451 if let Some(aid) = app_id {
452 observer.on_app_start(task_id.clone(), aid, app_inst);
453 }
454 let started = std::time::Instant::now();
455 monitor.pre_execute(task_id.clone(), started);
456 #[allow(unsafe_code)]
457 let res =
458 crate::executor::run_item_catch_unwind_external(unsafe { &mut *ptr }, &mut ctx);
459 let took = started.elapsed();
460 monitor.post_execute(task_id.clone(), started, took, res.is_ok());
461 if let Err(ref e) = res {
462 observer.on_app_error(task_id.clone(), e.as_ref());
463 }
464 if app_id.is_some() {
465 observer.on_app_stop(task_id.clone());
466 }
467 match &res {
468 Ok(crate::ControlFlow::Continue) => {}
469 Ok(crate::ControlFlow::StopChain) => {
470 g.stop_chain_seen.store(true, Ordering::Release);
471 g.stop_flag.store(true, Ordering::Release);
472 }
473 Err(_) => g.stop_flag.store(true, Ordering::Release),
474 }
475 if let Err(e) = res {
476 #[allow(clippy::unwrap_used)]
480 let mut fe = g.first_err.lock().unwrap();
481 if fe.is_none() {
482 *fe = Some(e);
483 }
484 }
485 if g.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
486 g.notify_done();
487 } else if g.stop_flag.load(Ordering::Acquire) {
488 for &j in &g.successors[i] {
489 g.cancel_subtree(j);
490 }
491 } else {
492 for &j in &g.successors[i] {
493 if g.counters[j].fetch_sub(1, Ordering::AcqRel) == 1 {
494 #[allow(clippy::expect_used)]
498 g.ready_ring
499 .push(j)
500 .expect("ready_ring sized to n_vertices");
501 }
502 }
503 }
504 let _ = &err_slot; });
507 jobs.push(job);
508 }
509 self.vertex_jobs = jobs;
510 }
511
512 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
517 #[allow(unsafe_code)]
518 pub(crate) fn run_once_borrowed(&mut self, pool: &Pool) -> GraphRunOutcome {
519 let n = self.items.len();
520
521 for (c, d) in self.counters.iter().zip(self.in_degree.iter()) {
523 c.store(*d, Ordering::Relaxed);
524 }
525 self.pending.store(n, Ordering::Relaxed);
526 self.stop_flag.store(false, Ordering::Relaxed);
527 self.stop_chain_seen.store(false, Ordering::Relaxed);
528 #[allow(clippy::unwrap_used)]
531 {
532 *self.first_err.lock().unwrap() = None;
533 }
534 self.ready_ring.reset();
535
536 for i in 0..n {
546 if self.in_degree[i] == 0 {
547 self.dispatch_vertex(pool, i);
548 }
549 }
550
551 loop {
553 while let Some(i) = self.ready_ring.pop() {
554 self.dispatch_vertex(pool, i);
555 }
556 if self.pending.load(Ordering::Acquire) == 0 {
557 break;
558 }
559 #[allow(clippy::unwrap_used)]
562 let guard = self.done_cv.0.lock().unwrap();
563 if self.pending.load(Ordering::Acquire) == 0 {
564 drop(guard);
565 break;
566 }
567 #[allow(clippy::unwrap_used)]
570 drop(
571 self.done_cv
572 .1
573 .wait_timeout(guard, std::time::Duration::from_millis(5))
574 .unwrap()
575 .0,
576 );
577 }
578 while self.ready_ring.pop().is_some() {}
580
581 #[allow(clippy::unwrap_used)]
584 let mut first_err = self.first_err.lock().unwrap();
585 GraphRunOutcome {
586 error: first_err.take(),
587 stopped_chain: self.stop_chain_seen.load(Ordering::Acquire),
588 }
589 }
590
591 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
594 #[allow(unsafe_code)]
595 fn dispatch_vertex(&mut self, pool: &Pool, i: usize) {
596 let job_ptr: *mut (dyn FnMut() + Send) =
597 std::ptr::from_mut::<dyn FnMut() + Send>(self.vertex_jobs[i].as_mut());
598 unsafe {
607 pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
608 }
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615 use crate::{ControlFlow, item};
616
617 #[test]
618 fn empty_graph_rejected() {
619 let b = GraphBuilder::new();
620 let err = b.finish().expect_err("empty graph");
621 assert!(format!("{err}").contains("no vertices"));
622 }
623
624 #[test]
625 fn missing_root_rejected() {
626 let mut b = GraphBuilder::new();
627 b.vertex(item(|_| Ok(ControlFlow::Continue)));
628 let err = b.finish().expect_err("missing root");
629 assert!(format!("{err}").contains("no root"));
630 }
631
632 #[test]
633 fn cycle_rejected() {
634 let mut b = GraphBuilder::new();
635 let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
636 let v = b.vertex(item(|_| Ok(ControlFlow::Continue)));
637 b.edge(a, v).edge(v, a).root(a);
638 let err = b.finish().expect_err("cycle");
639 assert!(format!("{err}").contains("cycle"));
640 }
641
642 #[test]
643 fn unreachable_vertex_rejected() {
644 let mut b = GraphBuilder::new();
645 let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
646 let _orphan = b.vertex(item(|_| Ok(ControlFlow::Continue)));
647 b.root(a);
648 let err = b.finish().expect_err("unreachable");
649 assert!(format!("{err}").contains("reachable"));
650 }
651
652 #[test]
653 #[allow(clippy::many_single_char_names)]
654 fn diamond_graph_builds() {
655 let mut b = GraphBuilder::new();
656 let r = b.vertex(item(|_| Ok(ControlFlow::Continue)));
657 let l = b.vertex(item(|_| Ok(ControlFlow::Continue)));
658 let rt = b.vertex(item(|_| Ok(ControlFlow::Continue)));
659 let m = b.vertex(item(|_| Ok(ControlFlow::Continue)));
660 b.edge(r, l).edge(r, rt).edge(l, m).edge(rt, m).root(r);
661 let g = b.finish().expect("diamond");
662 assert_eq!(g.successors[r.0], vec![l.0, rt.0]);
663 assert_eq!(g.in_degree[m.0], 2);
664 }
665}