Skip to main content

taktora_executor/
graph.rs

1//! Parallel-execution graph: a DAG of [`ExecutableItem`]s rooted at a single
2//! vertex whose triggers gate the whole graph.
3
4use crate::error::ExecutorError;
5use crate::item::ExecutableItem;
6use crate::trigger::{TriggerDecl, TriggerDeclarer};
7
/// Lightweight, copyable handle naming one vertex of a graph under
/// construction. Obtained from [`GraphBuilder::vertex`] and handed back to
/// [`GraphBuilder::edge`] / [`GraphBuilder::root`]; the wrapped value is
/// the vertex's insertion index within the builder that issued it.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct Vertex(pub(crate) usize);
11
12/// Internal graph storage.
13///
14/// Stored inside `TaskKind::Graph(Box<Graph>)` to guarantee a stable heap
15/// address — the per-vertex dispatch closures capture a `*const Graph`
16/// pointing back into this struct, and would dangle if the `Graph` moved.
17/// All runtime state below is pre-allocated at `finish()` time and reset
18/// in place each `run_once_borrowed` call. Required for `REQ_0060`.
19#[allow(clippy::redundant_pub_crate)]
20pub(crate) struct Graph {
21    pub(crate) items: Vec<Box<dyn ExecutableItem>>,
22    pub(crate) successors: Vec<Vec<usize>>, // adjacency list
23    pub(crate) in_degree: Vec<usize>,       // initial in-degree
24    pub(crate) root: usize,
25    pub(crate) decls: Vec<TriggerDecl>,
26
27    // ── Pre-allocated dispatch state (REQ_0060) ────────────────────────
28    /// Stable raw pointers into each item's heap-allocated `Box`.
29    /// Populated once in `finish`. The `Box` contents do not move when
30    /// the outer `Vec` resizes, so these pointers stay valid for the
31    /// lifetime of the `Graph`.
32    vertex_ptrs: Vec<VertexPtr>,
33    /// Per-vertex in-degree counter; reset to `in_degree[i]` at the top
34    /// of every `run_once_borrowed`. `usize::MAX` is used as a "cancelled"
35    /// sentinel during stop-flag propagation.
36    counters: Vec<AtomicUsize>,
37    /// Number of vertices still pending in the current run.
38    pending: AtomicUsize,
39    /// Stop request observed during this run.
40    stop_flag: AtomicBool,
41    /// `ControlFlow::StopChain` observed during this run.
42    stop_chain_seen: AtomicBool,
43    /// First per-vertex error observed during this run.
44    first_err: Mutex<Option<crate::error::ItemError>>,
45    /// Completion condvar; signalled when `pending` reaches zero.
46    done_cv: (Mutex<()>, Condvar),
47    /// Re-dispatch ring — completed pool workers push ready successors;
48    /// the `WaitSet` thread drains and re-dispatches. Sized to
49    /// `next_power_of_two(n_vertices)` at `finish`. Required for `REQ_0060`.
50    ready_ring: crate::ready_ring::ReadyRing,
51    /// Per-vertex pre-built dispatch closures. Empty after `finish`,
52    /// populated by `prepare_dispatch` when the graph is registered with
53    /// an executor (it needs `task_id`/`stop`/`obs`/`mon`/`err_slot` from
54    /// the executor). Used by `run_once_borrowed` via
55    /// `Pool::submit_borrowed`, avoiding the per-vertex `Box` allocation
56    /// that `Pool::submit` requires.
57    vertex_jobs: Vec<Box<dyn FnMut() + Send + 'static>>,
58}
59
60impl core::fmt::Debug for Graph {
61    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62        f.debug_struct("Graph")
63            .field("n_items", &self.items.len())
64            .field("successors", &self.successors)
65            .field("in_degree", &self.in_degree)
66            .field("root", &self.root)
67            .finish_non_exhaustive()
68    }
69}
70
71impl Graph {
72    /// Return the root vertex's `task_id()` override, if any.
73    pub(crate) fn root_task_id(&self) -> Option<&str> {
74        self.items[self.root].task_id()
75    }
76}
77
78/// Builder for a graph.
79pub struct GraphBuilder {
80    items: Vec<Box<dyn ExecutableItem>>,
81    edges: Vec<(usize, usize)>,
82    root: Option<usize>,
83}
84
85impl GraphBuilder {
86    pub(crate) fn new() -> Self {
87        Self {
88            items: Vec::new(),
89            edges: Vec::new(),
90            root: None,
91        }
92    }
93
94    /// Add a vertex; returns its handle.
95    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> Vertex {
96        let idx = self.items.len();
97        self.items.push(Box::new(item));
98        Vertex(idx)
99    }
100
101    /// Add a directed edge `from -> to`.
102    pub fn edge(&mut self, from: Vertex, to: Vertex) -> &mut Self {
103        self.edges.push((from.0, to.0));
104        self
105    }
106
107    /// Designate the root vertex (whose triggers gate the graph).
108    pub const fn root(&mut self, v: Vertex) -> &mut Self {
109        self.root = Some(v.0);
110        self
111    }
112
113    /// Build, validating connectedness, acyclicity, and exactly-one root.
114    #[allow(clippy::too_many_lines)]
115    pub(crate) fn finish(mut self) -> Result<Graph, ExecutorError> {
116        let n = self.items.len();
117        if n == 0 {
118            return Err(ExecutorError::InvalidGraph("graph has no vertices".into()));
119        }
120        let root = self
121            .root
122            .ok_or_else(|| ExecutorError::InvalidGraph("no root vertex set".into()))?;
123        if root >= n {
124            return Err(ExecutorError::InvalidGraph(
125                "root index out of bounds".into(),
126            ));
127        }
128
129        let mut successors = vec![Vec::<usize>::new(); n];
130        let mut in_degree = vec![0_usize; n];
131        for &(from, to) in &self.edges {
132            if from >= n || to >= n {
133                return Err(ExecutorError::InvalidGraph(
134                    "edge index out of bounds".into(),
135                ));
136            }
137            if from == to {
138                return Err(ExecutorError::InvalidGraph(
139                    "self-loops are not allowed".into(),
140                ));
141            }
142            successors[from].push(to);
143            in_degree[to] += 1;
144        }
145
146        // Acyclicity via Kahn's algorithm — clone in_degree because we mutate.
147        let mut k_in = in_degree.clone();
148        let mut queue: Vec<usize> = k_in
149            .iter()
150            .enumerate()
151            .filter_map(|(i, d)| (*d == 0).then_some(i))
152            .collect();
153        let mut visited = 0_usize;
154        while let Some(u) = queue.pop() {
155            visited += 1;
156            for &v in &successors[u] {
157                k_in[v] -= 1;
158                if k_in[v] == 0 {
159                    queue.push(v);
160                }
161            }
162        }
163        if visited != n {
164            return Err(ExecutorError::InvalidGraph("graph contains a cycle".into()));
165        }
166
167        // Reachability from root (DFS).
168        let mut reach = vec![false; n];
169        let mut stack = vec![root];
170        while let Some(u) = stack.pop() {
171            if reach[u] {
172                continue;
173            }
174            reach[u] = true;
175            for &v in &successors[u] {
176                stack.push(v);
177            }
178        }
179        if reach.iter().any(|r| !*r) {
180            return Err(ExecutorError::InvalidGraph(
181                "every vertex must be reachable from the root".into(),
182            ));
183        }
184
185        // Root's triggers gate the graph.
186        let mut decl = TriggerDeclarer::new_internal();
187        self.items[root].declare_triggers(&mut decl)?;
188        let decls = decl.into_decls();
189
190        // Warn if non-root vertices declared triggers (ignored).
191        for (i, body) in self.items.iter_mut().enumerate() {
192            if i == root {
193                continue;
194            }
195            let mut spurious = TriggerDeclarer::new_internal();
196            let _ = body.declare_triggers(&mut spurious);
197            if !spurious.is_empty() {
198                #[cfg(feature = "tracing")]
199                tracing::warn!(target: "taktora-executor", vertex = i,
200                    "non-root graph vertex declared triggers; ignored");
201            }
202        }
203
204        let n_items = self.items.len();
205        let mut items = self.items;
206        // SAFETY: each `Box<dyn ExecutableItem>` is heap-allocated; its
207        // contents do not move when the outer Vec resizes. Stable.
208        #[allow(unsafe_code)]
209        let vertex_ptrs: Vec<VertexPtr> = items
210            .iter_mut()
211            .map(|b| VertexPtr(std::ptr::from_mut(b.as_mut())))
212            .collect();
213        let counters: Vec<AtomicUsize> = in_degree.iter().map(|d| AtomicUsize::new(*d)).collect();
214
215        Ok(Graph {
216            items,
217            successors,
218            in_degree,
219            root,
220            decls,
221            vertex_ptrs,
222            counters,
223            pending: AtomicUsize::new(n_items),
224            stop_flag: AtomicBool::new(false),
225            stop_chain_seen: AtomicBool::new(false),
226            first_err: Mutex::new(None),
227            done_cv: (Mutex::new(()), Condvar::new()),
228            ready_ring: crate::ready_ring::ReadyRing::new(n_items),
229            vertex_jobs: Vec::new(),
230        })
231    }
232}
233
234// ── Graph scheduler (Task 14) ─────────────────────────────────────────────────
235
236use crate::context::Stoppable;
237use crate::monitor::ExecutionMonitor;
238use crate::observer::Observer;
239use crate::pool::Pool;
240use crate::task_id::TaskId;
241use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
242use std::sync::{Arc, Condvar, Mutex};
243
244/// Outcome of running a graph once.
245#[allow(clippy::redundant_pub_crate)]
246pub(crate) struct GraphRunOutcome {
247    #[allow(clippy::redundant_pub_crate)]
248    pub(crate) error: Option<crate::error::ItemError>,
249    #[allow(clippy::redundant_pub_crate)]
250    pub(crate) stopped_chain: bool,
251}
252
253/// Wrapper around `*mut dyn ExecutableItem` that asserts Send+Sync.
254struct VertexPtr(*mut dyn ExecutableItem);
255
256// SAFETY: the executor guarantees a vertex runs on at most one thread at
257// a time (the in-degree counter sequences dispatches), and the pointer is
258// stable for the lifetime of `Graph::run_once` (the underlying Box is not
259// moved while we hold &mut self in run_once).
260#[allow(unsafe_code)]
261unsafe impl Send for VertexPtr {}
262#[allow(unsafe_code)]
263unsafe impl Sync for VertexPtr {}
264
265/// Send-able raw pointer back into a `Box<Graph>`. Used by the per-vertex
266/// dispatch closures to reach the graph's atomics and the ready ring
267/// without an `Arc`. Sound because the `Graph` is owned by
268/// `TaskKind::Graph(Box<Graph>)`, which keeps it at a stable heap
269/// address, and `pool.barrier()` (in `dispatch_loop`) serialises the
270/// closure's invocation with the executor thread's own access.
271#[allow(unsafe_code)]
272#[derive(Copy, Clone)]
273struct SendGraphPtr(*const Graph);
274
275impl SendGraphPtr {
276    /// Return the underlying pointer. Method form so Rust 2021 per-field
277    /// capture analysis grabs the whole `SendGraphPtr` (which is `Send +
278    /// Sync`) rather than `self.0` (a `*const`, which is not).
279    const fn get(&self) -> *const Graph {
280        self.0
281    }
282}
283
284#[allow(unsafe_code)]
285unsafe impl Send for SendGraphPtr {}
286#[allow(unsafe_code)]
287unsafe impl Sync for SendGraphPtr {}
288
289impl Graph {
290    fn finalise_skipped(&self, i: usize) {
291        if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
292            self.notify_done();
293            return;
294        }
295        for &j in &self.successors[i] {
296            self.cancel_subtree(j);
297        }
298    }
299
300    fn cancel_subtree(&self, root: usize) {
301        // Iterative DFS using a small stack on the heap. The stack is
302        // bounded by the number of vertices and used only on the stop
303        // path; the steady-state happy path (REQ_0060) never enters
304        // here, so this stack's allocation does not violate the
305        // requirement. A pre-allocated scratch stack would be needed if
306        // cancellation were ever a hot path; document as future work
307        // when that becomes relevant.
308        let mut stack = vec![root];
309        while let Some(u) = stack.pop() {
310            let prev = self.counters[u].swap(usize::MAX, Ordering::AcqRel);
311            if prev != usize::MAX {
312                if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
313                    self.notify_done();
314                    return;
315                }
316                for &v in &self.successors[u] {
317                    stack.push(v);
318                }
319            }
320        }
321    }
322
323    fn notify_done(&self) {
324        let _g = self.done_cv.0.lock().unwrap();
325        self.done_cv.1.notify_all();
326    }
327}
328
329impl Graph {
330    /// Build per-vertex dispatch closures and stash them on the graph.
331    /// Called once, when the graph is registered with an executor via
332    /// `ExecutorGraphBuilder::build`. The graph must already live inside
333    /// its `Box<Graph>` — closures capture `*const Graph` and rely on
334    /// that pointer remaining valid for the graph's lifetime.
335    ///
336    /// All captures are `Arc::clone`s (refcount-only at build time)
337    /// and `Copy` primitives; no per-iteration allocation occurs in
338    /// the resulting closures. Required for `REQ_0060`.
339    #[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
340    pub(crate) fn prepare_dispatch(
341        self: &mut Box<Self>,
342        task_id: TaskId,
343        stop: Stoppable,
344        observer: Arc<dyn Observer>,
345        monitor: Arc<dyn ExecutionMonitor>,
346        err_slot: Arc<Mutex<Option<crate::error::ExecutorError>>>,
347    ) {
348        let n = self.items.len();
349        // SAFETY: we deref through the Box, getting a `*const Graph`
350        // that points at the Box's heap allocation. The Box's contents
351        // do not move while we hold the Box, so this pointer is stable
352        // for the lifetime of `self`. The pointer is shared with every
353        // per-vertex closure; the closures access only `&self`-style
354        // immutable atomics / Mutex slots on `Graph` (no aliasing
355        // mutation through this pointer).
356        #[allow(unsafe_code)]
357        let graph_ptr = SendGraphPtr(std::ptr::from_ref::<Self>(self.as_ref()));
358
359        let mut jobs: Vec<Box<dyn FnMut() + Send + 'static>> = Vec::with_capacity(n);
360        for i in 0..n {
361            let task_id = task_id.clone();
362            let stop = stop.clone();
363            let observer = Arc::clone(&observer);
364            let monitor = Arc::clone(&monitor);
365            let err_slot = Arc::clone(&err_slot);
366            let job: Box<dyn FnMut() + Send + 'static> = Box::new(move || {
367                // SAFETY: see SendGraphPtr doc — pointer is stable, no
368                // aliasing mutation; pool.barrier() serialises the
369                // closure with the executor thread's own graph access.
370                #[allow(unsafe_code)]
371                let g: &Self = unsafe { &*graph_ptr.get() };
372
373                if g.stop_flag.load(Ordering::Acquire) {
374                    g.finalise_skipped(i);
375                    return;
376                }
377                let mut ctx = crate::context::Context::new(&task_id, &stop, observer.as_ref());
378                let ptr = g.vertex_ptrs[i].0;
379                // SAFETY: vertex_ptrs hold stable raw pointers into the
380                // graph's `items` Boxes (see VertexPtr). In-degree
381                // counters sequence pool dispatches so at most one
382                // thread executes vertex `i` at a time.
383                #[allow(unsafe_code)]
384                let app_id = unsafe { (*ptr).app_id() };
385                #[allow(unsafe_code)]
386                let app_inst = unsafe { (*ptr).app_instance_id() };
387                if let Some(aid) = app_id {
388                    observer.on_app_start(task_id.clone(), aid, app_inst);
389                }
390                let started = std::time::Instant::now();
391                monitor.pre_execute(task_id.clone(), started);
392                #[allow(unsafe_code)]
393                let res =
394                    crate::executor::run_item_catch_unwind_external(unsafe { &mut *ptr }, &mut ctx);
395                let took = started.elapsed();
396                monitor.post_execute(task_id.clone(), started, took, res.is_ok());
397                if let Err(ref e) = res {
398                    observer.on_app_error(task_id.clone(), e.as_ref());
399                }
400                if app_id.is_some() {
401                    observer.on_app_stop(task_id.clone());
402                }
403                match &res {
404                    Ok(crate::ControlFlow::Continue) => {}
405                    Ok(crate::ControlFlow::StopChain) => {
406                        g.stop_chain_seen.store(true, Ordering::Release);
407                        g.stop_flag.store(true, Ordering::Release);
408                    }
409                    Err(_) => g.stop_flag.store(true, Ordering::Release),
410                }
411                if let Err(e) = res {
412                    let mut fe = g.first_err.lock().unwrap();
413                    if fe.is_none() {
414                        *fe = Some(e);
415                    }
416                }
417                if g.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
418                    g.notify_done();
419                } else if g.stop_flag.load(Ordering::Acquire) {
420                    for &j in &g.successors[i] {
421                        g.cancel_subtree(j);
422                    }
423                } else {
424                    for &j in &g.successors[i] {
425                        if g.counters[j].fetch_sub(1, Ordering::AcqRel) == 1 {
426                            // Ring is sized to `next_power_of_two(n)` so
427                            // every vertex becoming ready exactly once
428                            // fits. If this fires the graph's
429                            // accounting is broken.
430                            g.ready_ring
431                                .push(j)
432                                .expect("ready_ring sized to n_vertices");
433                        }
434                    }
435                }
436                let _ = &err_slot; // currently unused on the vertex path
437                // (errors are bubbled via first_err / GraphRunOutcome)
438            });
439            jobs.push(job);
440        }
441        self.vertex_jobs = jobs;
442    }
443
444    /// Dispatch this graph once and block until completion. Allocation-free
445    /// in the steady state — runtime state was pre-allocated by
446    /// `Graph::finish` and per-vertex closures by `prepare_dispatch`.
447    /// Required by `REQ_0060`.
448    #[allow(unsafe_code)]
449    pub(crate) fn run_once_borrowed(&mut self, pool: &Pool) -> GraphRunOutcome {
450        let n = self.items.len();
451
452        // Reset per-iteration state in place.
453        for (c, d) in self.counters.iter().zip(self.in_degree.iter()) {
454            c.store(*d, Ordering::Relaxed);
455        }
456        self.pending.store(n, Ordering::Relaxed);
457        self.stop_flag.store(false, Ordering::Relaxed);
458        self.stop_chain_seen.store(false, Ordering::Relaxed);
459        *self.first_err.lock().unwrap() = None;
460        self.ready_ring.reset();
461
462        // Seed: dispatch every initially-ready vertex (those whose
463        // **initial** in-degree is zero). Race-free — `in_degree` is
464        // built once at finish() and never mutated, so we can't be
465        // tricked by a worker that has already started running root
466        // and decremented `counters[succ]` to zero before the seed
467        // loop reaches `succ`. Reading `counters[i]` here would race
468        // with the worker, redispatching the successor a second time
469        // (the worker's own push to `ready_ring` is the legitimate
470        // dispatch path).
471        for i in 0..n {
472            if self.in_degree[i] == 0 {
473                self.dispatch_vertex(pool, i);
474            }
475        }
476
477        // Drain ready_ring until pending hits 0.
478        loop {
479            while let Some(i) = self.ready_ring.pop() {
480                self.dispatch_vertex(pool, i);
481            }
482            if self.pending.load(Ordering::Acquire) == 0 {
483                break;
484            }
485            let guard = self.done_cv.0.lock().unwrap();
486            if self.pending.load(Ordering::Acquire) == 0 {
487                drop(guard);
488                break;
489            }
490            drop(
491                self.done_cv
492                    .1
493                    .wait_timeout(guard, std::time::Duration::from_millis(5))
494                    .unwrap()
495                    .0,
496            );
497        }
498        // Final drain.
499        while self.ready_ring.pop().is_some() {}
500
501        let mut first_err = self.first_err.lock().unwrap();
502        GraphRunOutcome {
503            error: first_err.take(),
504            stopped_chain: self.stop_chain_seen.load(Ordering::Acquire),
505        }
506    }
507
508    /// Submit vertex `i`'s pre-built closure to the pool. Allocation-free
509    /// (uses `Pool::submit_borrowed`).
510    #[allow(unsafe_code)]
511    fn dispatch_vertex(&mut self, pool: &Pool, i: usize) {
512        let job_ptr: *mut (dyn FnMut() + Send) =
513            std::ptr::from_mut::<dyn FnMut() + Send>(self.vertex_jobs[i].as_mut());
514        // SAFETY: closure lives on this Graph, which lives inside
515        // `Box<Graph>` inside `TaskEntry`. `pool.barrier()` (called by
516        // the WaitSet thread at the end of every callback) ensures the
517        // closure has finished executing before the next iteration's
518        // callback can touch the graph again. We hold `&mut self`
519        // throughout `run_once_borrowed`, so the WaitSet thread is the
520        // sole user of the graph state outside of the pool worker's
521        // closure invocation.
522        unsafe {
523            pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
524        }
525    }
526}
527
#[cfg(test)]
mod tests {
    //! Builder validation tests: every rejection path in
    //! `GraphBuilder::finish` plus a few well-formed shapes.

    use super::*;
    use crate::{ControlFlow, item};

    #[test]
    fn empty_graph_rejected() {
        let b = GraphBuilder::new();
        let err = b.finish().expect_err("empty graph");
        assert!(format!("{err}").contains("no vertices"));
    }

    #[test]
    fn missing_root_rejected() {
        let mut b = GraphBuilder::new();
        b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let err = b.finish().expect_err("missing root");
        assert!(format!("{err}").contains("no root"));
    }

    #[test]
    fn root_out_of_bounds_rejected() {
        let mut b = GraphBuilder::new();
        b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.root(Vertex(5));
        let err = b.finish().expect_err("root out of bounds");
        assert!(format!("{err}").contains("root index out of bounds"));
    }

    #[test]
    fn edge_out_of_bounds_rejected() {
        let mut b = GraphBuilder::new();
        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.edge(a, Vertex(9)).root(a);
        let err = b.finish().expect_err("edge out of bounds");
        assert!(format!("{err}").contains("edge index out of bounds"));
    }

    #[test]
    fn self_loop_rejected() {
        let mut b = GraphBuilder::new();
        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.edge(a, a).root(a);
        let err = b.finish().expect_err("self-loop");
        assert!(format!("{err}").contains("self-loop"));
    }

    #[test]
    fn cycle_rejected() {
        let mut b = GraphBuilder::new();
        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let v = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.edge(a, v).edge(v, a).root(a);
        let err = b.finish().expect_err("cycle");
        assert!(format!("{err}").contains("cycle"));
    }

    #[test]
    fn unreachable_vertex_rejected() {
        let mut b = GraphBuilder::new();
        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let _orphan = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.root(a);
        let err = b.finish().expect_err("unreachable");
        assert!(format!("{err}").contains("reachable"));
    }

    #[test]
    fn second_source_vertex_rejected() {
        // `s` has no incoming edge, so it cannot be reached from the root
        // even though it feeds into the root's subtree.
        let mut b = GraphBuilder::new();
        let r = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let s = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let x = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.edge(r, x).edge(s, x).root(r);
        let err = b.finish().expect_err("second source");
        assert!(format!("{err}").contains("reachable"));
    }

    #[test]
    fn single_vertex_graph_builds() {
        let mut b = GraphBuilder::new();
        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.root(a);
        let g = b.finish().expect("single vertex");
        assert_eq!(g.root, a.0);
        assert_eq!(g.in_degree, vec![0]);
        assert!(g.successors[a.0].is_empty());
    }

    #[test]
    #[allow(clippy::many_single_char_names)]
    fn diamond_graph_builds() {
        let mut b = GraphBuilder::new();
        let r = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let l = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let rt = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        let m = b.vertex(item(|_| Ok(ControlFlow::Continue)));
        b.edge(r, l).edge(r, rt).edge(l, m).edge(rt, m).root(r);
        let g = b.finish().expect("diamond");
        assert_eq!(g.successors[r.0], vec![l.0, rt.0]);
        assert_eq!(g.in_degree[m.0], 2);
    }
}