Skip to main content

taktora_executor/
graph.rs

1//! Parallel-execution graph: a DAG of [`ExecutableItem`]s rooted at a single
2//! vertex whose triggers gate the whole graph.
3
4use crate::error::ExecutorError;
5use crate::item::ExecutableItem;
6use crate::trigger::{TriggerDecl, TriggerDeclarer};
7
8/// Opaque handle to a graph vertex. Returned by [`GraphBuilder::vertex`].
9#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
10pub struct Vertex(pub(crate) usize);
11
12/// Internal graph storage.
13///
14/// Stored inside `TaskKind::Graph(Box<Graph>)` to guarantee a stable heap
15/// address — the per-vertex dispatch closures capture a `*const Graph`
16/// pointing back into this struct, and would dangle if the `Graph` moved.
17/// All runtime state below is pre-allocated at `finish()` time and reset
18/// in place each `run_once_borrowed` call. Required for `REQ_0060`.
19#[allow(clippy::redundant_pub_crate)]
20pub(crate) struct Graph {
21    pub(crate) items: Vec<Box<dyn ExecutableItem>>,
22    pub(crate) successors: Vec<Vec<usize>>, // adjacency list
23    pub(crate) in_degree: Vec<usize>,       // initial in-degree
24    pub(crate) root: usize,
25    pub(crate) decls: Vec<TriggerDecl>,
26
27    // ── Pre-allocated dispatch state (REQ_0060) ────────────────────────
28    /// Stable raw pointers into each item's heap-allocated `Box`.
29    /// Populated once in `finish`. The `Box` contents do not move when
30    /// the outer `Vec` resizes, so these pointers stay valid for the
31    /// lifetime of the `Graph`.
32    vertex_ptrs: Vec<VertexPtr>,
33    /// Per-vertex in-degree counter; reset to `in_degree[i]` at the top
34    /// of every `run_once_borrowed`. `usize::MAX` is used as a "cancelled"
35    /// sentinel during stop-flag propagation.
36    counters: Vec<AtomicUsize>,
37    /// Number of vertices still pending in the current run.
38    pending: AtomicUsize,
39    /// Stop request observed during this run.
40    stop_flag: AtomicBool,
41    /// `ControlFlow::StopChain` observed during this run.
42    stop_chain_seen: AtomicBool,
43    /// First per-vertex error observed during this run.
44    first_err: Mutex<Option<crate::error::ItemError>>,
45    /// Completion condvar; signalled when `pending` reaches zero.
46    done_cv: (Mutex<()>, Condvar),
47    /// Re-dispatch ring — completed pool workers push ready successors;
48    /// the `WaitSet` thread drains and re-dispatches. Sized to
49    /// `next_power_of_two(n_vertices)` at `finish`. Required for `REQ_0060`.
50    ready_ring: crate::ready_ring::ReadyRing,
51    /// Per-vertex pre-built dispatch closures. Empty after `finish`,
52    /// populated by `prepare_dispatch` when the graph is registered with
53    /// an executor (it needs `task_id`/`stop`/`obs`/`mon`/`err_slot` from
54    /// the executor). Used by `run_once_borrowed` via
55    /// `Pool::submit_borrowed`, avoiding the per-vertex `Box` allocation
56    /// that `Pool::submit` requires.
57    vertex_jobs: Vec<Box<dyn FnMut() + Send + 'static>>,
58}
59
60impl core::fmt::Debug for Graph {
61    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62        f.debug_struct("Graph")
63            .field("n_items", &self.items.len())
64            .field("successors", &self.successors)
65            .field("in_degree", &self.in_degree)
66            .field("root", &self.root)
67            .finish_non_exhaustive()
68    }
69}
70
71impl Graph {
72    /// Return the root vertex's `task_id()` override, if any.
73    pub(crate) fn root_task_id(&self) -> Option<&str> {
74        self.items[self.root].task_id()
75    }
76}
77
78/// Builder for a graph.
79pub struct GraphBuilder {
80    items: Vec<Box<dyn ExecutableItem>>,
81    edges: Vec<(usize, usize)>,
82    root: Option<usize>,
83}
84
85impl GraphBuilder {
86    pub(crate) fn new() -> Self {
87        Self {
88            items: Vec::new(),
89            edges: Vec::new(),
90            root: None,
91        }
92    }
93
94    /// Add a vertex; returns its handle.
95    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> Vertex {
96        let idx = self.items.len();
97        self.items.push(Box::new(item));
98        Vertex(idx)
99    }
100
101    /// Add a directed edge `from -> to`.
102    pub fn edge(&mut self, from: Vertex, to: Vertex) -> &mut Self {
103        self.edges.push((from.0, to.0));
104        self
105    }
106
107    /// Designate the root vertex (whose triggers gate the graph).
108    pub const fn root(&mut self, v: Vertex) -> &mut Self {
109        self.root = Some(v.0);
110        self
111    }
112
113    /// Build, validating connectedness, acyclicity, and exactly-one root.
114    ///
115    /// The validation steps run in the same precedence as before — non-empty,
116    /// root presence/bounds, edge validity, acyclicity, reachability, then
117    /// trigger collection — each delegated to a private helper that surfaces
118    /// the first failing condition via `?`. Behaviour is identical to the
119    /// previous inline form: the earliest violating check still wins.
120    pub(crate) fn finish(mut self) -> Result<Graph, ExecutorError> {
121        let n = self.items.len();
122        let root = Self::validate_root(self.root, n)?;
123        let (successors, in_degree) = Self::build_adjacency(&self.edges, n)?;
124        Self::assert_acyclic(&successors, &in_degree, n)?;
125        Self::assert_reachable(&successors, root, n)?;
126        let decls = self.collect_root_decls(root)?;
127        Ok(Self::assemble(
128            self.items, successors, in_degree, root, decls,
129        ))
130    }
131
132    /// Validate that the graph is non-empty and that a root vertex was set and
133    /// is in bounds. Returns the validated root index.
134    ///
135    /// Preserves the original precedence: empty-graph rejection wins over the
136    /// missing-root check, which wins over the out-of-bounds check.
137    fn validate_root(root: Option<usize>, n: usize) -> Result<usize, ExecutorError> {
138        if n == 0 {
139            return Err(ExecutorError::InvalidGraph("graph has no vertices".into()));
140        }
141        let root = root.ok_or_else(|| ExecutorError::InvalidGraph("no root vertex set".into()))?;
142        if root >= n {
143            return Err(ExecutorError::InvalidGraph(
144                "root index out of bounds".into(),
145            ));
146        }
147        Ok(root)
148    }
149
150    /// Build the adjacency list and per-vertex initial in-degree from `edges`,
151    /// rejecting out-of-bounds endpoints and self-loops (in that precedence,
152    /// matching the original inline loop).
153    fn build_adjacency(
154        edges: &[(usize, usize)],
155        n: usize,
156    ) -> Result<(Vec<Vec<usize>>, Vec<usize>), ExecutorError> {
157        let mut successors = vec![Vec::<usize>::new(); n];
158        let mut in_degree = vec![0_usize; n];
159        for &(from, to) in edges {
160            if from >= n || to >= n {
161                return Err(ExecutorError::InvalidGraph(
162                    "edge index out of bounds".into(),
163                ));
164            }
165            if from == to {
166                return Err(ExecutorError::InvalidGraph(
167                    "self-loops are not allowed".into(),
168                ));
169            }
170            successors[from].push(to);
171            in_degree[to] += 1;
172        }
173        Ok((successors, in_degree))
174    }
175
176    /// Reject graphs that contain a cycle, via Kahn's algorithm. `in_degree`
177    /// is cloned internally because the algorithm mutates it.
178    fn assert_acyclic(
179        successors: &[Vec<usize>],
180        in_degree: &[usize],
181        n: usize,
182    ) -> Result<(), ExecutorError> {
183        let mut k_in = in_degree.to_vec();
184        let mut queue: Vec<usize> = k_in
185            .iter()
186            .enumerate()
187            .filter_map(|(i, d)| (*d == 0).then_some(i))
188            .collect();
189        let mut visited = 0_usize;
190        while let Some(u) = queue.pop() {
191            visited += 1;
192            for &v in &successors[u] {
193                k_in[v] -= 1;
194                if k_in[v] == 0 {
195                    queue.push(v);
196                }
197            }
198        }
199        if visited != n {
200            return Err(ExecutorError::InvalidGraph("graph contains a cycle".into()));
201        }
202        Ok(())
203    }
204
205    /// Reject graphs in which some vertex is unreachable from `root`, via DFS.
206    fn assert_reachable(
207        successors: &[Vec<usize>],
208        root: usize,
209        n: usize,
210    ) -> Result<(), ExecutorError> {
211        let mut reach = vec![false; n];
212        let mut stack = vec![root];
213        while let Some(u) = stack.pop() {
214            if reach[u] {
215                continue;
216            }
217            reach[u] = true;
218            for &v in &successors[u] {
219                stack.push(v);
220            }
221        }
222        if reach.iter().any(|r| !*r) {
223            return Err(ExecutorError::InvalidGraph(
224                "every vertex must be reachable from the root".into(),
225            ));
226        }
227        Ok(())
228    }
229
230    /// Collect the root vertex's trigger declarations (which gate the whole
231    /// graph) and warn about any triggers declared by non-root vertices, which
232    /// are ignored. Propagates a declaration error from the root vertex.
233    fn collect_root_decls(&mut self, root: usize) -> Result<Vec<TriggerDecl>, ExecutorError> {
234        let mut decl = TriggerDeclarer::new_internal();
235        self.items[root].declare_triggers(&mut decl)?;
236        let decls = decl.into_decls();
237
238        for (i, body) in self.items.iter_mut().enumerate() {
239            if i == root {
240                continue;
241            }
242            let mut spurious = TriggerDeclarer::new_internal();
243            let _ = body.declare_triggers(&mut spurious);
244            if !spurious.is_empty() {
245                #[cfg(feature = "tracing")]
246                tracing::warn!(target: "taktora-executor", vertex = i,
247                    "non-root graph vertex declared triggers; ignored");
248            }
249        }
250        Ok(decls)
251    }
252
253    /// Assemble the validated pieces into a `Graph`, pre-allocating the runtime
254    /// dispatch state (`REQ_0060`).
255    fn assemble(
256        mut items: Vec<Box<dyn ExecutableItem>>,
257        successors: Vec<Vec<usize>>,
258        in_degree: Vec<usize>,
259        root: usize,
260        decls: Vec<TriggerDecl>,
261    ) -> Graph {
262        let n_items = items.len();
263        // SAFETY: each `Box<dyn ExecutableItem>` is heap-allocated; its
264        // contents do not move when the outer Vec resizes. Stable.
265        #[allow(unsafe_code)]
266        let vertex_ptrs: Vec<VertexPtr> = items
267            .iter_mut()
268            .map(|b| VertexPtr(std::ptr::from_mut(b.as_mut())))
269            .collect();
270        let counters: Vec<AtomicUsize> = in_degree.iter().map(|d| AtomicUsize::new(*d)).collect();
271
272        Graph {
273            items,
274            successors,
275            in_degree,
276            root,
277            decls,
278            vertex_ptrs,
279            counters,
280            pending: AtomicUsize::new(n_items),
281            stop_flag: AtomicBool::new(false),
282            stop_chain_seen: AtomicBool::new(false),
283            first_err: Mutex::new(None),
284            done_cv: (Mutex::new(()), Condvar::new()),
285            ready_ring: crate::ready_ring::ReadyRing::new(n_items),
286            vertex_jobs: Vec::new(),
287        }
288    }
289}
290
291// ── Graph scheduler (Task 14) ─────────────────────────────────────────────────
292
293use crate::context::Stoppable;
294use crate::monitor::ExecutionMonitor;
295use crate::observer::Observer;
296use crate::pool::Pool;
297use crate::task_id::TaskId;
298use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
299use std::sync::{Arc, Condvar, Mutex};
300
301/// Outcome of running a graph once.
302#[allow(clippy::redundant_pub_crate)]
303pub(crate) struct GraphRunOutcome {
304    #[allow(clippy::redundant_pub_crate)]
305    pub(crate) error: Option<crate::error::ItemError>,
306    #[allow(clippy::redundant_pub_crate)]
307    pub(crate) stopped_chain: bool,
308}
309
310/// Wrapper around `*mut dyn ExecutableItem` that asserts Send+Sync.
311struct VertexPtr(*mut dyn ExecutableItem);
312
313// SAFETY: the executor guarantees a vertex runs on at most one thread at
314// a time (the in-degree counter sequences dispatches), and the pointer is
315// stable for the lifetime of `Graph::run_once` (the underlying Box is not
316// moved while we hold &mut self in run_once).
317#[allow(unsafe_code)]
318unsafe impl Send for VertexPtr {}
319#[allow(unsafe_code)]
320unsafe impl Sync for VertexPtr {}
321
322/// Send-able raw pointer back into a `Box<Graph>`. Used by the per-vertex
323/// dispatch closures to reach the graph's atomics and the ready ring
324/// without an `Arc`. Sound because the `Graph` is owned by
325/// `TaskKind::Graph(Box<Graph>)`, which keeps it at a stable heap
326/// address, and `pool.barrier()` (in `dispatch_loop`) serialises the
327/// closure's invocation with the executor thread's own access.
328#[allow(unsafe_code)]
329#[derive(Copy, Clone)]
330struct SendGraphPtr(*const Graph);
331
332impl SendGraphPtr {
333    /// Return the underlying pointer. Method form so Rust 2021 per-field
334    /// capture analysis grabs the whole `SendGraphPtr` (which is `Send +
335    /// Sync`) rather than `self.0` (a `*const`, which is not).
336    const fn get(&self) -> *const Graph {
337        self.0
338    }
339}
340
341#[allow(unsafe_code)]
342unsafe impl Send for SendGraphPtr {}
343#[allow(unsafe_code)]
344unsafe impl Sync for SendGraphPtr {}
345
346impl Graph {
347    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
348    fn finalise_skipped(&self, i: usize) {
349        if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
350            self.notify_done();
351            return;
352        }
353        for &j in &self.successors[i] {
354            self.cancel_subtree(j);
355        }
356    }
357
358    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
359    fn cancel_subtree(&self, root: usize) {
360        // Iterative DFS using a small stack on the heap. The stack is
361        // bounded by the number of vertices and used only on the stop
362        // path; the steady-state happy path (REQ_0060) never enters
363        // here, so this stack's allocation does not violate the
364        // requirement. A pre-allocated scratch stack would be needed if
365        // cancellation were ever a hot path; document as future work
366        // when that becomes relevant.
367        let mut stack = vec![root];
368        while let Some(u) = stack.pop() {
369            let prev = self.counters[u].swap(usize::MAX, Ordering::AcqRel);
370            if prev != usize::MAX {
371                if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
372                    self.notify_done();
373                    return;
374                }
375                for &v in &self.successors[u] {
376                    stack.push(v);
377                }
378            }
379        }
380    }
381
382    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
383    fn notify_done(&self) {
384        // fail-fast: poison is unreachable — a holder panic aborts the process
385        // before any other thread observes the lock (ADR_0065)
386        #[allow(clippy::unwrap_used)]
387        let _g = self.done_cv.0.lock().unwrap();
388        self.done_cv.1.notify_all();
389    }
390}
391
392impl Graph {
393    /// Build per-vertex dispatch closures and stash them on the graph.
394    /// Called once, when the graph is registered with an executor via
395    /// `ExecutorGraphBuilder::build`. The graph must already live inside
396    /// its `Box<Graph>` — closures capture `*const Graph` and rely on
397    /// that pointer remaining valid for the graph's lifetime.
398    ///
399    /// All captures are `Arc::clone`s (refcount-only at build time)
400    /// and `Copy` primitives; no per-iteration allocation occurs in
401    /// the resulting closures. Required for `REQ_0060`.
402    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
403    #[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
404    pub(crate) fn prepare_dispatch(
405        self: &mut Box<Self>,
406        task_id: TaskId,
407        stop: Stoppable,
408        observer: Arc<dyn Observer>,
409        monitor: Arc<dyn ExecutionMonitor>,
410        err_slot: Arc<Mutex<Option<crate::error::ExecutorError>>>,
411    ) {
412        let n = self.items.len();
413        // SAFETY: we deref through the Box, getting a `*const Graph`
414        // that points at the Box's heap allocation. The Box's contents
415        // do not move while we hold the Box, so this pointer is stable
416        // for the lifetime of `self`. The pointer is shared with every
417        // per-vertex closure; the closures access only `&self`-style
418        // immutable atomics / Mutex slots on `Graph` (no aliasing
419        // mutation through this pointer).
420        #[allow(unsafe_code)]
421        let graph_ptr = SendGraphPtr(std::ptr::from_ref::<Self>(self.as_ref()));
422
423        let mut jobs: Vec<Box<dyn FnMut() + Send + 'static>> = Vec::with_capacity(n);
424        for i in 0..n {
425            let task_id = task_id.clone();
426            let stop = stop.clone();
427            let observer = Arc::clone(&observer);
428            let monitor = Arc::clone(&monitor);
429            let err_slot = Arc::clone(&err_slot);
430            let job: Box<dyn FnMut() + Send + 'static> = Box::new(move || {
431                // SAFETY: see SendGraphPtr doc — pointer is stable, no
432                // aliasing mutation; pool.barrier() serialises the
433                // closure with the executor thread's own graph access.
434                #[allow(unsafe_code)]
435                let g: &Self = unsafe { &*graph_ptr.get() };
436
437                if g.stop_flag.load(Ordering::Acquire) {
438                    g.finalise_skipped(i);
439                    return;
440                }
441                let mut ctx = crate::context::Context::new(&task_id, &stop, observer.as_ref());
442                let ptr = g.vertex_ptrs[i].0;
443                // SAFETY: vertex_ptrs hold stable raw pointers into the
444                // graph's `items` Boxes (see VertexPtr). In-degree
445                // counters sequence pool dispatches so at most one
446                // thread executes vertex `i` at a time.
447                #[allow(unsafe_code)]
448                let app_id = unsafe { (*ptr).app_id() };
449                #[allow(unsafe_code)]
450                let app_inst = unsafe { (*ptr).app_instance_id() };
451                if let Some(aid) = app_id {
452                    observer.on_app_start(task_id.clone(), aid, app_inst);
453                }
454                let started = std::time::Instant::now();
455                monitor.pre_execute(task_id.clone(), started);
456                #[allow(unsafe_code)]
457                let res =
458                    crate::executor::run_item_catch_unwind_external(unsafe { &mut *ptr }, &mut ctx);
459                let took = started.elapsed();
460                monitor.post_execute(task_id.clone(), started, took, res.is_ok());
461                if let Err(ref e) = res {
462                    observer.on_app_error(task_id.clone(), e.as_ref());
463                }
464                if app_id.is_some() {
465                    observer.on_app_stop(task_id.clone());
466                }
467                match &res {
468                    Ok(crate::ControlFlow::Continue) => {}
469                    Ok(crate::ControlFlow::StopChain) => {
470                        g.stop_chain_seen.store(true, Ordering::Release);
471                        g.stop_flag.store(true, Ordering::Release);
472                    }
473                    Err(_) => g.stop_flag.store(true, Ordering::Release),
474                }
475                if let Err(e) = res {
476                    // fail-fast: poison is unreachable — a holder panic aborts
477                    // the process before any other thread observes the lock
478                    // (ADR_0065)
479                    #[allow(clippy::unwrap_used)]
480                    let mut fe = g.first_err.lock().unwrap();
481                    if fe.is_none() {
482                        *fe = Some(e);
483                    }
484                }
485                if g.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
486                    g.notify_done();
487                } else if g.stop_flag.load(Ordering::Acquire) {
488                    for &j in &g.successors[i] {
489                        g.cancel_subtree(j);
490                    }
491                } else {
492                    for &j in &g.successors[i] {
493                        if g.counters[j].fetch_sub(1, Ordering::AcqRel) == 1 {
494                            // fail-fast: ring is sized to next_power_of_two(n_vertices);
495                            // each vertex becomes ready at most once per run, so overflow
496                            // means broken in-degree accounting
497                            #[allow(clippy::expect_used)]
498                            g.ready_ring
499                                .push(j)
500                                .expect("ready_ring sized to n_vertices");
501                        }
502                    }
503                }
504                let _ = &err_slot; // currently unused on the vertex path
505                // (errors are bubbled via first_err / GraphRunOutcome)
506            });
507            jobs.push(job);
508        }
509        self.vertex_jobs = jobs;
510    }
511
512    /// Dispatch this graph once and block until completion. Allocation-free
513    /// in the steady state — runtime state was pre-allocated by
514    /// `Graph::finish` and per-vertex closures by `prepare_dispatch`.
515    /// Required by `REQ_0060`.
516    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
517    #[allow(unsafe_code)]
518    pub(crate) fn run_once_borrowed(&mut self, pool: &Pool) -> GraphRunOutcome {
519        let n = self.items.len();
520
521        // Reset per-iteration state in place.
522        for (c, d) in self.counters.iter().zip(self.in_degree.iter()) {
523            c.store(*d, Ordering::Relaxed);
524        }
525        self.pending.store(n, Ordering::Relaxed);
526        self.stop_flag.store(false, Ordering::Relaxed);
527        self.stop_chain_seen.store(false, Ordering::Relaxed);
528        // fail-fast: poison is unreachable — a holder panic aborts the process
529        // before any other thread observes the lock (ADR_0065)
530        #[allow(clippy::unwrap_used)]
531        {
532            *self.first_err.lock().unwrap() = None;
533        }
534        self.ready_ring.reset();
535
536        // Seed: dispatch every initially-ready vertex (those whose
537        // **initial** in-degree is zero). Race-free — `in_degree` is
538        // built once at finish() and never mutated, so we can't be
539        // tricked by a worker that has already started running root
540        // and decremented `counters[succ]` to zero before the seed
541        // loop reaches `succ`. Reading `counters[i]` here would race
542        // with the worker, redispatching the successor a second time
543        // (the worker's own push to `ready_ring` is the legitimate
544        // dispatch path).
545        for i in 0..n {
546            if self.in_degree[i] == 0 {
547                self.dispatch_vertex(pool, i);
548            }
549        }
550
551        // Drain ready_ring until pending hits 0.
552        loop {
553            while let Some(i) = self.ready_ring.pop() {
554                self.dispatch_vertex(pool, i);
555            }
556            if self.pending.load(Ordering::Acquire) == 0 {
557                break;
558            }
559            // fail-fast: poison is unreachable — a holder panic aborts the
560            // process before any other thread observes the lock (ADR_0065)
561            #[allow(clippy::unwrap_used)]
562            let guard = self.done_cv.0.lock().unwrap();
563            if self.pending.load(Ordering::Acquire) == 0 {
564                drop(guard);
565                break;
566            }
567            // fail-fast: condvar poison is unreachable under the abort
568            // boundary (ADR_0065)
569            #[allow(clippy::unwrap_used)]
570            drop(
571                self.done_cv
572                    .1
573                    .wait_timeout(guard, std::time::Duration::from_millis(5))
574                    .unwrap()
575                    .0,
576            );
577        }
578        // Final drain.
579        while self.ready_ring.pop().is_some() {}
580
581        // fail-fast: poison is unreachable — a holder panic aborts the process
582        // before any other thread observes the lock (ADR_0065)
583        #[allow(clippy::unwrap_used)]
584        let mut first_err = self.first_err.lock().unwrap();
585        GraphRunOutcome {
586            error: first_err.take(),
587            stopped_chain: self.stop_chain_seen.load(Ordering::Acquire),
588        }
589    }
590
591    /// Submit vertex `i`'s pre-built closure to the pool. Allocation-free
592    /// (uses `Pool::submit_borrowed`).
593    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
594    #[allow(unsafe_code)]
595    fn dispatch_vertex(&mut self, pool: &Pool, i: usize) {
596        let job_ptr: *mut (dyn FnMut() + Send) =
597            std::ptr::from_mut::<dyn FnMut() + Send>(self.vertex_jobs[i].as_mut());
598        // SAFETY: closure lives on this Graph, which lives inside
599        // `Box<Graph>` inside `TaskEntry`. `pool.barrier()` (called by
600        // the WaitSet thread at the end of every callback) ensures the
601        // closure has finished executing before the next iteration's
602        // callback can touch the graph again. We hold `&mut self`
603        // throughout `run_once_borrowed`, so the WaitSet thread is the
604        // sole user of the graph state outside of the pool worker's
605        // closure invocation.
606        unsafe {
607            pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
608        }
609    }
610}
611
612#[cfg(test)]
613mod tests {
614    use super::*;
615    use crate::{ControlFlow, item};
616
617    #[test]
618    fn empty_graph_rejected() {
619        let b = GraphBuilder::new();
620        let err = b.finish().expect_err("empty graph");
621        assert!(format!("{err}").contains("no vertices"));
622    }
623
624    #[test]
625    fn missing_root_rejected() {
626        let mut b = GraphBuilder::new();
627        b.vertex(item(|_| Ok(ControlFlow::Continue)));
628        let err = b.finish().expect_err("missing root");
629        assert!(format!("{err}").contains("no root"));
630    }
631
632    #[test]
633    fn cycle_rejected() {
634        let mut b = GraphBuilder::new();
635        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
636        let v = b.vertex(item(|_| Ok(ControlFlow::Continue)));
637        b.edge(a, v).edge(v, a).root(a);
638        let err = b.finish().expect_err("cycle");
639        assert!(format!("{err}").contains("cycle"));
640    }
641
642    #[test]
643    fn unreachable_vertex_rejected() {
644        let mut b = GraphBuilder::new();
645        let a = b.vertex(item(|_| Ok(ControlFlow::Continue)));
646        let _orphan = b.vertex(item(|_| Ok(ControlFlow::Continue)));
647        b.root(a);
648        let err = b.finish().expect_err("unreachable");
649        assert!(format!("{err}").contains("reachable"));
650    }
651
652    #[test]
653    #[allow(clippy::many_single_char_names)]
654    fn diamond_graph_builds() {
655        let mut b = GraphBuilder::new();
656        let r = b.vertex(item(|_| Ok(ControlFlow::Continue)));
657        let l = b.vertex(item(|_| Ok(ControlFlow::Continue)));
658        let rt = b.vertex(item(|_| Ok(ControlFlow::Continue)));
659        let m = b.vertex(item(|_| Ok(ControlFlow::Continue)));
660        b.edge(r, l).edge(r, rt).edge(l, m).edge(rt, m).root(r);
661        let g = b.finish().expect("diamond");
662        assert_eq!(g.successors[r.0], vec![l.0, rt.0]);
663        assert_eq!(g.in_degree[m.0], 2);
664    }
665}