Skip to main content

taktora_executor/
executor.rs

1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::clock::{MonotonicClock, SystemClock};
10use crate::context::Stoppable;
11use crate::error::ExecutorError;
12use crate::fatal::{FatalDispatch, FatalHandler, FatalSite, guard_or_fatal, panic_payload_message};
13use crate::fault::{
14    ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
15    FaultState, duration_to_ms_sat, instant_to_since_ms,
16};
17use crate::item::ExecutableItem;
18use crate::monitor::{ExecutionMonitor, NoopMonitor};
19use crate::observer::{NoopObserver, Observer};
20use crate::payload::Payload;
21use crate::pool::Pool;
22use crate::stats::{CycleObservation, StatsSnapshot, TaskStatsEntry};
23use crate::task_id::TaskId;
24use crate::task_kind::TaskKind;
25use crate::thread_attrs::ThreadAttributes;
26use crate::trigger::{TriggerDecl, TriggerDeclarer};
27use core::sync::atomic::AtomicU32;
28use iceoryx2::node::Node;
29use iceoryx2::port::listener::Listener as IxListener;
30use iceoryx2::prelude::ipc;
31use iceoryx2::prelude::*;
32use iceoryx2::waitset::WaitSetRunResult;
33use std::sync::Arc;
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
36use std::time::{Duration, Instant};
37use taktora_stats::ExecutorCycleStats;
38
/// Process-wide, monotonically increasing counter. Each executor draws a
/// value from it so that several executors in the same process end up with
/// distinct stop-event service names.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Concrete per-task cycle-stats aggregator used by the executor. The two
/// const arguments are the histogram segment count (`S`) and the
/// exact-window length (`W`), fixed at compile time per `ADR_0060`.
// NOTE(review): presumably `8` is `S` and `256` is `W` — confirm against the
// `ExecutorCycleStats` declaration in `taktora_stats`.
pub(crate) type TaskCycleStats = ExecutorCycleStats<8, 256>;
46
/// The pending cycle record of one wakeup, parked on the [`TaskEntry`] from
/// the pre-dispatch capture until the post-barrier fold.
///
/// The pre-dispatch timestamp and its `faulted` flag travel together inside a
/// single `Option`, so they cannot drift apart: a cycle is pending exactly
/// when that `Option` is `Some`, and the `faulted` bit it carries is the one
/// captured at the same wakeup (`REQ_0107`).
#[derive(Clone, Copy)]
pub(crate) struct CyclePending {
    /// Pre-dispatch timestamp of this wakeup (the cycle's `pre`), expressed in
    /// telemetry-clock nanoseconds (see [`MonotonicClock`]).
    pub(crate) pre: u64,
    /// `true` if this wakeup's scan was fault-routed/skipped; the
    /// post-barrier fold then records the cycle with `faulted=true`.
    pub(crate) faulted: bool,
}
61
62/// One registered task entry.
63pub(crate) struct TaskEntry {
64    /// Task identifier.
65    pub(crate) id: TaskId,
66    /// The kind of work this entry holds (single item or chain).
67    pub(crate) kind: TaskKind,
68    /// Trigger declarations recorded at `add` time.
69    pub(crate) decls: Vec<TriggerDecl>,
70    /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
71    /// time and re-invoked on every dispatch iteration via
72    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
73    /// that `Pool::submit<F>` requires in threaded mode. Required for
74    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
75    /// `TaskKind::Graph`, which dispatches its vertices via a separate
76    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
77    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
78
79    /// Per-task budget declared via `TriggerDeclarer::budget`. `None`
80    /// means no per-task check; the executor-wide iteration budget
81    /// still applies. `REQ_0070`.
82    pub(crate) budget: Option<Duration>,
83
84    /// Per-task fault state. Wait-free read on the dispatch hot path.
85    /// Wrapped in `Arc` so dispatch closures built at `add` time can
86    /// capture an owning handle into the same atomic the `TaskEntry`
87    /// holds — `Arc::clone` is refcount-only, so this stays compatible
88    /// with `REQ_0060` (no per-iteration allocation). `REQ_0070`.
89    pub(crate) fault: Arc<FaultAtomic>,
90
91    /// Monotonic per-task overrun counter. Increments on EVERY budget
92    /// breach, including breaches while already `Faulted`. Never reset
93    /// by clearing the fault. Shared with the dispatch closure via
94    /// `Arc::clone`. `REQ_0102`.
95    pub(crate) overrun_count: Arc<AtomicU64>,
96
97    /// Pre-built dispatch closure for the fault-handler item. Mirrors
98    /// `job`. `None` means no handler — the task is simply skipped
99    /// during fault. `REQ_0072`.
100    pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,
101
102    /// Declared scan period for cyclic tasks (the `TriggerDecl::Interval`
103    /// duration), or `None` for event-driven tasks. Cached at add time so the
104    /// dispatch loop reads it without scanning `decls` per cycle. Gates cycle
105    /// telemetry: only cyclic tasks participate (`REQ_0106`).
106    pub(crate) scan_period: Option<Duration>,
107    /// Last-cycle execute duration in ns, written by the dispatch closure on
108    /// the pool worker and read by the `WaitSet` thread after `barrier()`.
109    /// Shared via `Arc` exactly like `overrun_count`. Sentinel `u64::MAX` =
110    /// "no sample this cycle" (the closure never ran — e.g. a faulted scan).
111    pub(crate) last_took_ns: Arc<AtomicU64>,
112
113    /// WaitSet-thread-only timestamp of this task's previous dispatch, for
114    /// computing `actual_period` (`REQ_0101`). Not shared (no atomic) — only the
115    /// single dispatch thread touches it. `None` before the first dispatch.
116    /// Telemetry-clock nanoseconds (see [`MonotonicClock`]).
117    pub(crate) last_dispatch: Option<u64>,
118
119    /// WaitSet-thread-only lateness grid slot for this task (`REQ_0106`).
120    /// Advances by exactly **one per scan attempt plus the dispatcher's
121    /// skipped-slot signal** (`REQ_0840`) — never reconstructed from the
122    /// noisy measured period, which over-counts on coalesced catch-up wakes
123    /// (issue #46 / `ADR_0101`). Starts at `0` (the first recorded cycle is
124    /// the task's own grid origin).
125    pub(crate) grid_slot: u64,
126
127    /// WaitSet-thread-only per-task lateness grid epoch (`REQ_0106`): the
128    /// task's first recorded `pre` — including a faulted first scan, whose
129    /// dispatch instant is real — back-dated by the dispatcher's `late_by`
130    /// signal when one is present (`Grid` mode), so the grid anchors at the
131    /// first dispatch's NOMINAL slot and a late process start is reported as
132    /// real first-cycle lateness instead of becoming a permanent negative
133    /// floor on every later on-grid cycle. Per-task (not executor-shared) so
134    /// a task's start phase never reads as permanent lateness.
135    pub(crate) grid_epoch: Option<u64>,
136
137    /// WaitSet-thread-only dispatcher skip signal for the *current* wakeup
138    /// (`REQ_0840`): written by the Grid dispatch pass before `dispatch_task`,
139    /// consumed (`take`n) by `record_cycle_for`. Never written in `Legacy`
140    /// mode or for event-driven tasks — stays `0`.
141    pub(crate) pending_skipped: u32,
142
143    /// WaitSet-thread-only dispatcher lateness signal for the *current*
144    /// wakeup (`REQ_0106`): how far past its nominal grid slot this dispatch
145    /// is, in scheduling-clock nanoseconds. Written by the Grid dispatch
146    /// pass, consumed (`take`n) by `record_cycle_for`, which uses the FIRST
147    /// one to anchor [`Self::grid_epoch`] on the nominal grid. `None` in
148    /// `Legacy` mode and for event-driven tasks (no dispatcher grid exists:
149    /// the epoch falls back to the first observed `pre`).
150    pub(crate) pending_late: Option<u64>,
151
152    /// WaitSet-thread-only stash of the *current* wakeup's pending cycle —
153    /// the pre-dispatch timestamp plus its `faulted` flag — carried across
154    /// `pool.barrier()` so the post-barrier record pass can fold this cycle's
155    /// telemetry without re-reading the clock or allocating a fired-index list.
156    /// `Some` between the pre-dispatch capture and the post-barrier
157    /// `record_cycle_for` `take`; `None` otherwise. Bundling the timestamp and
158    /// the fault flag in one `Option` keeps them from ever desyncing
159    /// (`REQ_0107`). Only the single dispatch thread touches it (no atomic).
160    pub(crate) pending_cycle: Option<CyclePending>,
161}
162
163/// Top-level executor. One per process is the typical case.
164pub struct Executor {
165    pub(crate) node: Node<ipc::Service>,
166    pub(crate) pool: Arc<Pool>,
167    pub(crate) tasks: Vec<TaskEntry>,
168    /// One cycle-stats aggregator per registered task, index-aligned with
169    /// `tasks`. Pushed at task-add time (before `run`), so no steady-state
170    /// allocation (`REQ_0060`, `REQ_0104`). Updated single-writer on the
171    /// `WaitSet` thread (Task 6).
172    pub(crate) cycle_stats: Vec<TaskCycleStats>,
173    /// Histogram sliding-window size in samples (`REQ_0100`).
174    pub(crate) stats_window: u32,
175    pub(crate) running: Arc<AtomicBool>,
176    pub(crate) stoppable: Stoppable,
177    pub(crate) next_id: AtomicU64,
178    /// Listener for the internal stop event service. Held here so it outlives
179    /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
180    /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
181    pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
182    /// Lifecycle observer. Defaults to a no-op.
183    pub(crate) observer: Arc<dyn Observer>,
184    /// Execution monitor. Defaults to a no-op.
185    pub(crate) monitor: Arc<dyn ExecutionMonitor>,
186    /// Per-iteration error capture slot — allocated once at build time and
187    /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
188    /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
189    /// the per-iteration heap allocation that the previous design incurred.
190    /// Required for `REQ_0060`.
191    pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
192    /// Executor-wide iteration budget from `ExecutorBuilder::iteration_budget`.
193    /// `None` means no executor-wide check.
194    pub(crate) iteration_budget: Option<Duration>,
195    /// Executor-wide fault state. Wrapped in `Arc` so each dispatch
196    /// closure can hold an owning handle without re-borrowing through
197    /// `self`. `REQ_0071`.
198    pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,
199
200    /// Index of the task whose `execute()` overran when the executor
201    /// transitioned to `Faulted`. Read alongside `exec_fault`.
202    pub(crate) exec_fault_task_idx: Arc<AtomicU32>,
203
204    /// Budget that was breached when the executor transitioned to
205    /// `Faulted`, in ms (saturated). Read alongside `exec_fault`.
206    pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,
207
208    /// Executor start time, set on first dispatch. Used to compute
209    /// `since_ms` for faults relative to `Executor::run` entry. Wrapped
210    /// in `Arc` so dispatch closures share the same `OnceLock` with the
211    /// executor — `get_or_init` is idempotent and wait-free.
212    pub(crate) start_time: Arc<OnceLock<Instant>>,
213
214    /// Fatal-dispatch handle. Called once on the fail-fast path from the
215    /// executor-thread run-loop boundary; the pool holds a separate
216    /// `Arc::clone` for its own worker / inline-submit boundaries.
217    pub(crate) fatal_dispatch: Arc<FatalDispatch>,
218
219    /// Telemetry time source (`REQ_0101`/`REQ_0105`/`REQ_0106`). Read on the
220    /// worker (for `took`) and the `WaitSet` thread (for `pre`); defaults to
221    /// [`SystemClock`]. A test can substitute a [`MockClock`] via
222    /// [`ExecutorBuilder::clock`] for deterministic timing assertions. Affects
223    /// only telemetry — never scheduling or fault behaviour.
224    pub(crate) clock: Arc<dyn MonotonicClock>,
225
226    /// Cyclic dispatch timing strategy (`REQ_0268` / `ADR_0100`). Read once at
227    /// `dispatch_loop` entry and hoisted to a local, so steady-state cost is a
228    /// single `Copy`-enum compare per cycle. Defaults to
229    /// [`DispatchMode::Grid`](crate::DispatchMode).
230    pub(crate) dispatch_mode: crate::DispatchMode,
231
232    /// Scheduling time source for the absolute grid (`REQ_0268`). Distinct from
233    /// [`Executor::clock`] (telemetry): a telemetry mock can never alter
234    /// dispatch timing. Defaults to
235    /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock).
236    pub(crate) cyclic_clock: std::sync::Arc<dyn crate::CyclicClock>,
237}
238
// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
// `SingleThreaded` reason as `IxNotifier`. Once construction is finished, the
// only per-iteration call made on it is `listener.try_wait_one()`, which does
// not mutate the Rc. `Executor` is never shared across threads (`run()`
// requires `&mut self`), so there is no aliased concurrent mutation.
//
// NOTE(review): this argument only names the stop listener. If any other
// field is `!Send`, confirm that it is covered by the same reasoning.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
246
247impl Executor {
    /// Start a new builder.
    ///
    /// Returns `ExecutorBuilder::default()`; configure it and finish with the
    /// builder's own methods.
    #[must_use]
    pub fn builder() -> ExecutorBuilder {
        ExecutorBuilder::default()
    }
253
254    /// Open or create a pub/sub channel bound to this executor's node.
255    pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
256        Channel::open_or_create(&self.node, name)
257    }
258
259    /// Open or create a request/response service bound to this executor's node.
260    pub fn service<Req, Resp>(
261        &mut self,
262        name: &str,
263    ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
264    where
265        Req: Payload,
266        Resp: Payload,
267    {
268        crate::Service::open_or_create(&self.node, name)
269    }
270
271    /// Borrowed snapshot of every task's cycle aggregates (`REQ_0103` pull
272    /// path). Relaxed reads; never blocks the dispatch writer.
273    #[must_use]
274    pub fn stats_snapshot(&self) -> StatsSnapshot {
275        let per_task = self
276            .tasks
277            .iter()
278            .zip(self.cycle_stats.iter())
279            .map(|(t, s)| {
280                let snap = s.snapshot();
281                TaskStatsEntry {
282                    task_id: t.id.clone(),
283                    p50_ns: snap.p50_ns,
284                    p95_ns: snap.p95_ns,
285                    p99_ns: snap.p99_ns,
286                    min_ns: snap.min_ns,
287                    max_ns: snap.max_ns,
288                    max_jitter_ns: snap.max_jitter_ns,
289                    max_lateness_ns: snap.max_lateness_ns,
290                    overrun_count: t.overrun_count.load(Ordering::Acquire),
291                }
292            })
293            .collect();
294        StatsSnapshot { per_task }
295    }
296
297    /// Add an item to the executor with an auto-generated id.
298    pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
299        let id = TaskId::new(format!(
300            "task-{}",
301            self.next_id.fetch_add(1, Ordering::SeqCst)
302        ));
303        self.add_with_id(id, item)
304    }
305
306    /// Add an item with a user-supplied id.
307    ///
308    /// The item's [`ExecutableItem::task_id`] override takes precedence over
309    /// the caller-supplied `id`, which itself takes precedence over the
310    /// auto-generated id assigned by [`Executor::add`].
311    pub fn add_with_id(
312        &mut self,
313        id: impl Into<TaskId>,
314        mut item: impl ExecutableItem,
315    ) -> Result<TaskId, ExecutorError> {
316        let id_arg: TaskId = id.into();
317        // The item's `task_id()` override wins over the user-supplied id.
318        let id = item.task_id().map_or(id_arg, TaskId::new);
319        let mut declarer = TriggerDeclarer::new_internal();
320        item.declare_triggers(&mut declarer)?;
321        let budget = declarer.budget;
322        let decls = declarer.into_decls();
323
324        // REQ_0268: reject ill-defined trigger shapes (cyclic+event, zero
325        // period) before the task joins the table — the natural validation
326        // point, where the decls are first available, for every DispatchMode.
327        validate_decls(&id, &decls)?;
328
329        let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
330        let app_id = item_box.app_id();
331        let app_inst = item_box.app_instance_id();
332        // SAFETY: the raw pointer points into the heap allocation of
333        // `item_box`. `Box` keeps that allocation at a stable address even
334        // when the `Box` itself is moved (e.g. when `self.tasks` grows),
335        // so the pointer remains valid for the lifetime of the
336        // `TaskEntry`. See SendItemPtr safety doc for the rest of the
337        // discipline (barrier() pairs with worker access).
338        #[allow(unsafe_code)]
339        let item_ptr =
340            SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
341
342        // Allocate the per-task atomics now so the dispatch closure
343        // and the `TaskEntry` share the same `Arc` storage. The task
344        // will occupy `self.tasks.len()` after the push below — capture
345        // that index up front for `task_idx_u32`. Bounded workspace, so
346        // the `as u32` cast is sound; explicit allow keeps clippy quiet.
347        let task_fault = Arc::new(FaultAtomic::new());
348        let overrun_count = Arc::new(AtomicU64::new(0));
349        let scan_period = scan_period_from_decls(&decls);
350        let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
351        #[allow(clippy::cast_possible_truncation)]
352        let task_idx_u32 = self.tasks.len() as u32;
353        let fault_ctx = FaultDispatchCtx {
354            task_budget: budget,
355            task_fault: Arc::clone(&task_fault),
356            overrun_count: Arc::clone(&overrun_count),
357            iteration_budget: self.iteration_budget,
358            exec_fault: Arc::clone(&self.exec_fault),
359            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
360            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
361            task_idx_u32,
362            exec_start: Arc::clone(&self.start_time),
363            observer: Arc::clone(&self.observer),
364        };
365
366        let job = build_single_job(
367            id.clone(),
368            self.stoppable.clone(),
369            Arc::clone(&self.observer),
370            Arc::clone(&self.monitor),
371            Arc::clone(&self.iter_err),
372            app_id,
373            app_inst,
374            item_ptr,
375            fault_ctx,
376            Arc::clone(&last_took_ns),
377            Arc::clone(&self.clock),
378        );
379
380        self.tasks.push(TaskEntry {
381            id: id.clone(),
382            kind: TaskKind::Single(item_box),
383            decls,
384            job: Some(job),
385            budget,
386            fault: task_fault,
387            overrun_count,
388            handler_job: None,
389            scan_period,
390            last_took_ns: Arc::clone(&last_took_ns),
391            last_dispatch: None,
392            grid_slot: 0,
393            grid_epoch: None,
394            pending_skipped: 0,
395            pending_late: None,
396            pending_cycle: None,
397        });
398        self.cycle_stats
399            .push(TaskCycleStats::new(self.stats_window));
400        Ok(id)
401    }
402
403    /// Register an item plus a fault-handler item.
404    ///
405    /// The main item is registered through the canonical [`add`](Self::add)
406    /// path. The handler's [`declare_triggers`](ExecutableItem::declare_triggers)
407    /// is called (so handlers that internally rely on the declarer being
408    /// invoked observe the call) but its returned trigger list is
409    /// **ignored** — the handler dispatches on the main item's triggers
410    /// while the task is in `Faulted` state and runs in place of the main
411    /// item's `execute()`. The pre-built handler dispatch closure is
412    /// stashed on the same task entry as the main item's `job`,
413    /// satisfying `REQ_0072`.
414    ///
415    /// # Errors
416    ///
417    /// Propagates any error from registering the main item via `add`, or
418    /// from the handler's `declare_triggers` call.
419    ///
420    /// # Panics
421    ///
422    /// Panics if the task entry just inserted by [`add`](Self::add) cannot
423    /// be located in `self.tasks` — this is unreachable by construction
424    /// and indicates a logic bug.
425    pub fn add_with_fault_handler<I, H>(
426        &mut self,
427        main: I,
428        handler: H,
429    ) -> Result<TaskId, ExecutorError>
430    where
431        I: ExecutableItem,
432        H: ExecutableItem,
433    {
434        let task_id = self.add(main)?;
435
436        // Drain the handler's trigger declarations — they are ignored by
437        // design (the handler runs on the main item's triggers).
438        let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
439        let mut throwaway = TriggerDeclarer::new_internal();
440        handler_box.declare_triggers(&mut throwaway)?;
441        drop(throwaway);
442
443        let app_id = handler_box.app_id();
444        let app_inst = handler_box.app_instance_id();
445
446        // Locate the task we just added so we can share its per-task
447        // atomics with the handler's `FaultDispatchCtx`. The handler
448        // runs on the same `TaskEntry`; per §4.6 invariant 5, a handler
449        // breach increments `overrun_count` and keeps state `Faulted`
450        // without re-firing the observer.
451        let task_idx = self
452            .tasks
453            .iter()
454            .position(|t| t.id == task_id)
455            .expect("just added; must exist");
456        let task = &self.tasks[task_idx];
457        #[allow(clippy::cast_possible_truncation)]
458        let task_idx_u32 = task_idx as u32;
459        let handler_fault_ctx = FaultDispatchCtx {
460            task_budget: task.budget,
461            task_fault: Arc::clone(&task.fault),
462            overrun_count: Arc::clone(&task.overrun_count),
463            iteration_budget: self.iteration_budget,
464            exec_fault: Arc::clone(&self.exec_fault),
465            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
466            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
467            task_idx_u32,
468            exec_start: Arc::clone(&self.start_time),
469            observer: Arc::clone(&self.observer),
470        };
471
472        let handler_closure = build_handler_job(
473            task_id.clone(),
474            self.stoppable.clone(),
475            Arc::clone(&self.observer),
476            Arc::clone(&self.monitor),
477            Arc::clone(&self.iter_err),
478            app_id,
479            app_inst,
480            handler_box,
481            handler_fault_ctx,
482        );
483
484        self.tasks[task_idx].handler_job = Some(handler_closure);
485
486        Ok(task_id)
487    }
488
489    /// Clear a per-task fault. Returns the previous `FaultState`.
490    /// Fires `Observer::on_task_clear` if the state changed from
491    /// `Faulted` to `Running`. `REQ_0070`.
492    ///
493    /// # Errors
494    ///
495    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
496    /// * [`ExecutorError::TaskNotFaulted`] if `task` is already `Running`.
497    pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
498        let entry = self
499            .tasks
500            .iter()
501            .find(|t| t.id == task)
502            .ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
503        let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
504        let prev = entry.fault.swap(FaultState::Running, budget_ms);
505        match prev {
506            FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
507            FaultState::Faulted { .. } => {
508                self.observer.on_task_clear(task);
509                Ok(prev)
510            }
511        }
512    }
513
514    /// Clear the executor-wide fault and cascade-clear every task whose
515    /// state is `Faulted{ExecutorFaulted}`. Tasks whose state is
516    /// `Faulted{BudgetExceeded}` are NOT cleared (their own contract
517    /// breach is independent). Fires `Observer::on_executor_clear` and
518    /// one `Observer::on_task_clear` per cascade-cleared task.
519    /// `REQ_0071`.
520    ///
521    /// # Errors
522    ///
523    /// * [`ExecutorError::ExecutorNotFaulted`] if the executor is `Running`.
524    pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
525        let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
526        let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
527        let prev = self
528            .exec_fault
529            .swap(ExecutorFaultState::Running, task_idx, budget_ms);
530        match prev {
531            ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
532            ExecutorFaultState::Faulted { .. } => {
533                // Cascade-clear tasks whose reason is ExecutorFaulted.
534                for entry in &self.tasks {
535                    let task_budget_ms =
536                        entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
537                    if let FaultState::Faulted {
538                        reason: FaultReason::ExecutorFaulted,
539                        ..
540                    } = entry.fault.load(task_budget_ms)
541                    {
542                        let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
543                        self.observer.on_task_clear(entry.id.clone());
544                    }
545                }
546                self.observer.on_executor_clear();
547                Ok(prev)
548            }
549        }
550    }
551
552    /// Return the per-task overrun counter — number of times the task's
553    /// `execute()` exceeded its budget over the executor's lifetime.
554    /// Monotonic; not reset by `clear_task_fault`. `REQ_0102`.
555    ///
556    /// # Errors
557    ///
558    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
559    pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
560        self.tasks
561            .iter()
562            .find(|t| t.id == task)
563            .map(|t| t.overrun_count.load(Ordering::Acquire))
564            .ok_or_else(|| ExecutorError::TaskNotFound(task))
565    }
566
567    /// Return a snapshot of the per-task `FaultState`. `REQ_0073` (pull path).
568    ///
569    /// # Errors
570    ///
571    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
572    pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
573        self.tasks
574            .iter()
575            .find(|t| t.id == task)
576            .map(|t| {
577                let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
578                t.fault.load(budget_ms)
579            })
580            .ok_or_else(|| ExecutorError::TaskNotFound(task))
581    }
582
583    /// Return a snapshot of the executor-wide `ExecutorFaultState`.
584    /// `REQ_0073` (pull path).
585    #[must_use]
586    pub fn executor_fault_state(&self) -> ExecutorFaultState {
587        let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
588        let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
589        self.exec_fault.load(task_idx, budget_ms)
590    }
591
592    /// Add a sequential chain of items. Only the head item's
593    /// `declare_triggers` is consulted; non-head triggers are ignored with a
594    /// tracing warn.
595    pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
596    where
597        I: ExecutableItem,
598        C: IntoIterator<Item = I>,
599    {
600        let id = TaskId::new(format!(
601            "chain-{}",
602            self.next_id.fetch_add(1, Ordering::SeqCst)
603        ));
604        let boxed: Vec<Box<dyn ExecutableItem>> = items
605            .into_iter()
606            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
607            .collect();
608        self.add_chain_with_id_boxed(id, boxed)
609    }
610
611    /// Like [`Executor::add_chain`] but with a user-supplied id.
612    pub fn add_chain_with_id<I, C>(
613        &mut self,
614        id: impl Into<TaskId>,
615        items: C,
616    ) -> Result<TaskId, ExecutorError>
617    where
618        I: ExecutableItem,
619        C: IntoIterator<Item = I>,
620    {
621        let boxed: Vec<Box<dyn ExecutableItem>> = items
622            .into_iter()
623            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
624            .collect();
625        self.add_chain_with_id_boxed(id.into(), boxed)
626    }
627
628    fn add_chain_with_id_boxed(
629        &mut self,
630        id: TaskId,
631        mut items: Vec<Box<dyn ExecutableItem>>,
632    ) -> Result<TaskId, ExecutorError> {
633        if items.is_empty() {
634            return Err(ExecutorError::Builder(
635                "chain must contain at least one item".into(),
636            ));
637        }
638
639        // Head item's `task_id()` override wins over the user-supplied id.
640        let id = items[0].task_id().map_or(id, TaskId::new);
641
642        // Head's triggers gate the chain.
643        let mut head_declarer = TriggerDeclarer::new_internal();
644        items[0].declare_triggers(&mut head_declarer)?;
645        let decls = head_declarer.into_decls();
646
647        // REQ_0268: same trigger-shape validation as the single-item path,
648        // applied to the head item's decls (which gate the whole chain).
649        validate_decls(&id, &decls)?;
650
651        // Warn if non-head items declared triggers (those will be ignored).
652        for (i, body) in items.iter_mut().enumerate().skip(1) {
653            let mut spurious = TriggerDeclarer::new_internal();
654            let _ = body.declare_triggers(&mut spurious);
655            if !spurious.is_empty() {
656                #[cfg(feature = "tracing")]
657                tracing::warn!(
658                    target: "taktora-executor",
659                    task = %id,
660                    position = i,
661                    "non-head chain item declared triggers; they will be ignored"
662                );
663                #[cfg(not(feature = "tracing"))]
664                {
665                    let _ = i;
666                }
667            }
668        }
669
670        let mut items = items;
671        // SAFETY: pointer into the chain's `items` Vec. The Vec lives
672        // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
673        // is stable once `add_chain` returns — `self.tasks` may grow
674        // (moving the `Vec<Box<...>>` header itself), but the Vec's
675        // heap buffer is referenced via the header's data pointer and
676        // is unaffected by header moves. We never resize the chain Vec
677        // after this point. See SendChainPtr safety doc for the rest.
678        #[allow(unsafe_code)]
679        let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
680            &mut items,
681        ));
682        // NB: the pointer above is to the local `items` Vec on the
683        // stack — it's invalid after the `push` below moves items into
684        // the TaskEntry. We rederive a stable pointer after the push.
685        // (See the rebuild step below.)
686        let _ = chain_ptr;
687
688        // Pre-allocate the per-task atomics so the chain's dispatch
689        // closure can capture clones of the same `Arc`s the `TaskEntry`
690        // holds. The chain occupies `self.tasks.len()` after the push.
691        let task_fault = Arc::new(FaultAtomic::new());
692        let overrun_count = Arc::new(AtomicU64::new(0));
693        let scan_period = scan_period_from_decls(&decls);
694        let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
695        #[allow(clippy::cast_possible_truncation)]
696        let task_idx_u32 = self.tasks.len() as u32;
697
698        self.tasks.push(TaskEntry {
699            id: id.clone(),
700            kind: TaskKind::Chain(items),
701            decls,
702            job: None, // populated in the rebuild step below
703            // TODO(post-Task-10): chain budgets carried separately; for now None.
704            budget: None,
705            fault: Arc::clone(&task_fault),
706            overrun_count: Arc::clone(&overrun_count),
707            handler_job: None,
708            scan_period,
709            last_took_ns: Arc::clone(&last_took_ns),
710            last_dispatch: None,
711            grid_slot: 0,
712            grid_epoch: None,
713            pending_skipped: 0,
714            pending_late: None,
715            pending_cycle: None,
716        });
717        self.cycle_stats
718            .push(TaskCycleStats::new(self.stats_window));
719
720        // After the push, the TaskEntry lives at a stable position in
721        // `self.tasks` for the duration of this `add_chain_with_id_boxed`
722        // call. Take a stable pointer to its chain Vec and build the
723        // dispatch closure. If `self.tasks` later grows, the Vec header
724        // inside the TaskEntry moves but the header's data pointer
725        // (which addresses the chain's heap buffer) does not — and the
726        // closure derefs that pointer per dispatch, so it re-reads the
727        // current heap address each time. Sound under the same
728        // discipline as `tasks_ptr` in dispatch_loop.
729        let task_idx = self.tasks.len() - 1;
730        let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
731        {
732            TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
733            // The push above used TaskKind::Chain, so this arm is
734            // unreachable. Mark it explicitly to satisfy `match`.
735            _ => unreachable!("just-pushed task is TaskKind::Chain"),
736        };
737        #[allow(unsafe_code)]
738        let chain_ptr = SendChainPtr::new(chain_vec_ptr);
739        let fault_ctx = FaultDispatchCtx {
740            task_budget: None, // chain budgets are intentionally None for now
741            task_fault,
742            overrun_count,
743            iteration_budget: self.iteration_budget,
744            exec_fault: Arc::clone(&self.exec_fault),
745            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
746            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
747            task_idx_u32,
748            exec_start: Arc::clone(&self.start_time),
749            observer: Arc::clone(&self.observer),
750        };
751        let job = build_chain_job(
752            id.clone(),
753            self.stoppable.clone(),
754            Arc::clone(&self.observer),
755            Arc::clone(&self.monitor),
756            Arc::clone(&self.iter_err),
757            chain_ptr,
758            fault_ctx,
759            Arc::clone(&last_took_ns),
760            Arc::clone(&self.clock),
761        );
762        self.tasks[task_idx].job = Some(job);
763        Ok(id)
764    }
765
766    /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
767    /// executor is built. Clone before calling `run()` — any clone taken at any
768    /// time will wake the `WaitSet` when `stop()` is called.
769    #[must_use]
770    pub fn stoppable(&self) -> Stoppable {
771        self.stoppable.clone()
772    }
773
774    /// Borrow the underlying iceoryx2 node (escape hatch for power users).
775    pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
776        &self.node
777    }
778
779    /// Begin building a graph. Call `.build()` on the returned builder to
780    /// register the graph as a task.
781    pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
782        ExecutorGraphBuilder {
783            executor: self,
784            builder: crate::graph::GraphBuilder::new(),
785            custom_id: None,
786        }
787    }
788}
789
790/// Builder for [`Executor`].
791pub struct ExecutorBuilder {
792    worker_threads: Option<usize>,
793    observer: Option<Arc<dyn Observer>>,
794    monitor: Option<Arc<dyn ExecutionMonitor>>,
795    worker_attrs: ThreadAttributes,
796    /// Executor-wide iteration budget (`REQ_0071`). `None` means no
797    /// executor-wide check.
798    iteration_budget: Option<Duration>,
799    /// User-supplied fatal handler. `None` → resolved to a no-op `Arc` in
800    /// `build()`.
801    fatal_handler: Option<FatalHandler>,
802    /// Sliding-window size (samples) for cycle-stats aggregation
803    /// (`REQ_0100`). `None` → resolved to `1024` in `build()`.
804    stats_window: Option<u32>,
805    /// Telemetry time source. `None` → resolved to [`SystemClock`] in
806    /// `build()`. Override with a [`MockClock`](crate::MockClock) for
807    /// deterministic timing tests.
808    clock: Option<Arc<dyn MonotonicClock>>,
809    /// Cyclic dispatch timing strategy (`REQ_0268`). Default
810    /// [`DispatchMode::Grid`](crate::DispatchMode).
811    dispatch_mode: crate::DispatchMode,
812    /// Scheduling clock for the absolute grid. `None` → resolved to
813    /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock) in `build()`.
814    cyclic_clock: Option<std::sync::Arc<dyn crate::CyclicClock>>,
815}
816
817impl Default for ExecutorBuilder {
818    fn default() -> Self {
819        Self {
820            worker_threads: None,
821            observer: None,
822            monitor: None,
823            worker_attrs: ThreadAttributes::new(),
824            iteration_budget: None,
825            fatal_handler: None,
826            stats_window: None,
827            clock: None,
828            dispatch_mode: crate::DispatchMode::default(),
829            cyclic_clock: None,
830        }
831    }
832}
833
834impl ExecutorBuilder {
835    /// Number of worker threads. `0` → inline (no pool). Default → physical
836    /// cores.
837    #[must_use]
838    pub const fn worker_threads(mut self, n: usize) -> Self {
839        self.worker_threads = Some(n);
840        self
841    }
842
843    /// Attach a lifecycle observer. If not called, a no-op observer is used.
844    #[must_use]
845    pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
846        self.observer = Some(obs);
847        self
848    }
849
850    /// Attach an execution monitor. If not called, a no-op monitor is used.
851    #[must_use]
852    pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
853        self.monitor = Some(mon);
854        self
855    }
856
857    /// Configure the executor-wide iteration budget. Any task whose
858    /// `execute()` exceeds `dur` transitions the executor to `Faulted`
859    /// (`REQ_0071`). Default: unset (no executor-wide check).
860    #[must_use]
861    pub const fn iteration_budget(mut self, dur: Duration) -> Self {
862        self.iteration_budget = Some(dur);
863        self
864    }
865
866    /// Sliding-window size (samples) for percentile / min-max / jitter /
867    /// lateness aggregation (`REQ_0100`). Default `1024`.
868    #[must_use]
869    pub const fn stats_window(mut self, samples: u32) -> Self {
870        self.stats_window = Some(samples);
871        self
872    }
873
874    /// Substitute the telemetry time source. Defaults to [`SystemClock`].
875    ///
876    /// Pass a [`MockClock`](crate::MockClock) clone to drive `took` / jitter /
877    /// lateness from scripted instants, making timing assertions exact and
878    /// independent of the host scheduler. The clock affects telemetry only —
879    /// scheduling, run-mode deadlines and fault detection always use the real
880    /// monotonic clock.
881    #[must_use]
882    pub fn clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
883        self.clock = Some(clock);
884        self
885    }
886
887    /// Select cyclic dispatch timing (default `DispatchMode::Grid`). `Legacy` is
888    /// the pre-REQ_0268 `attach_interval` path, retained only until the Pi A/B.
889    #[must_use]
890    pub const fn dispatch_mode(mut self, mode: crate::DispatchMode) -> Self {
891        self.dispatch_mode = mode;
892        self
893    }
894
895    /// Override the scheduling clock (default `MonotonicCyclicClock`). Distinct
896    /// from `clock` (telemetry) — see `CyclicClock`.
897    #[must_use]
898    pub fn cyclic_clock(mut self, clock: std::sync::Arc<dyn crate::CyclicClock>) -> Self {
899        self.cyclic_clock = Some(clock);
900        self
901    }
902
903    /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
904    /// for worker threads. Has no effect when `worker_threads` is `0` (inline
905    /// mode). Requires the `thread_attrs` feature for non-default settings.
906    #[must_use]
907    #[allow(clippy::missing_const_for_fn)]
908    pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
909        self.worker_attrs = attrs;
910        self
911    }
912
913    /// Register a best-effort last-gasp handler invoked once on the fail-fast
914    /// path immediately before `std::process::abort()`.
915    ///
916    /// **Contract**: runs over known-unsound executor state — MUST NOT touch
917    /// executor internals; a panic inside the handler routes straight to
918    /// `abort()`.
919    ///
920    /// The handler is expected to be time-bounded (the caller's responsibility);
921    /// no runtime deadline is imposed.
922    ///
923    /// **Observer / monitor containment carve-out**: the panic containment
924    /// described in the executor documentation covers only a user item's
925    /// `execute()` call. Panics that originate in framework-invoked user
926    /// callbacks that run *outside* that inner catch — such as
927    /// [`Observer`](crate::Observer) methods (e.g. `on_app_error`,
928    /// `on_task_fault`) and [`ExecutionMonitor`](crate::ExecutionMonitor)
929    /// methods (e.g. `post_execute`) — escape to this fail-fast boundary and
930    /// cause `abort()`. Those callbacks must therefore be treated as
931    /// non-panicking by the implementor. See `REQ_0123`.
932    ///
933    /// If not called, a no-op handler is used and `abort()` is still reached
934    /// after any unrecoverable fault.
935    #[must_use]
936    pub fn on_fatal(
937        mut self,
938        handler: impl Fn(&crate::FatalContext) + Send + Sync + 'static,
939    ) -> Self {
940        self.fatal_handler = Some(Arc::new(handler));
941        self
942    }
943
944    /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
945    /// internal stop-event service so that any `Stoppable` clone (taken before
946    /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
947    ///
948    /// # Panics
949    ///
950    /// Panics if the internally-generated stop-event service name exceeds the
951    /// iceoryx2 service name length limit (this cannot happen under normal use
952    /// because the name is derived from the process id and a monotonic counter).
953    #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
954    #[track_caller]
955    pub fn build(self) -> Result<Executor, ExecutorError> {
956        let node = NodeBuilder::new()
957            .create::<ipc::Service>()
958            .map_err(ExecutorError::iceoryx2)?;
959
960        let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
961
962        // Resolve the fatal handler: use the user-supplied one or fall back to a no-op.
963        let fatal_handler: FatalHandler = self
964            .fatal_handler
965            .unwrap_or_else(|| Arc::new(|_ctx: &crate::FatalContext| {}));
966        let fatal_dispatch = Arc::new(FatalDispatch::new(fatal_handler));
967
968        let pool = Arc::new(Pool::new(
969            n_workers,
970            self.worker_attrs,
971            Arc::clone(&fatal_dispatch),
972        )?);
973
974        // Build the internal stop event service with a unique-per-process name
975        // so multiple executors in the same process don't collide.
976        let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
977        let stop_topic = format!(
978            "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
979            std::process::id()
980        );
981        let stop_event = node
982            .service_builder(&stop_topic.as_str().try_into().unwrap())
983            .event()
984            .open_or_create()
985            .map_err(ExecutorError::iceoryx2)?;
986
987        let stop_notifier = Arc::new(
988            stop_event
989                .notifier_builder()
990                .create()
991                .map_err(ExecutorError::iceoryx2)?,
992        );
993
994        // SAFETY: see module-level note; Arc<IxListener> is held here and only
995        // accessed on the executor thread.
996        let stop_listener = Arc::new(
997            stop_event
998                .listener_builder()
999                .create()
1000                .map_err(ExecutorError::iceoryx2)?,
1001        );
1002
1003        // Wire the notifier into the Stoppable so every clone is waker-aware
1004        // from the moment the executor is built.
1005        let stoppable = Stoppable::with_waker(stop_notifier);
1006
1007        let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
1008
1009        let monitor: Arc<dyn ExecutionMonitor> =
1010            self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
1011
1012        let clock: Arc<dyn MonotonicClock> =
1013            self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
1014
1015        let cyclic_clock: std::sync::Arc<dyn crate::CyclicClock> = self
1016            .cyclic_clock
1017            .unwrap_or_else(|| std::sync::Arc::new(crate::MonotonicCyclicClock::new()));
1018
1019        let exec = Executor {
1020            node,
1021            pool,
1022            tasks: Vec::new(),
1023            cycle_stats: Vec::new(),
1024            stats_window: self.stats_window.unwrap_or(1024),
1025            running: Arc::new(AtomicBool::new(false)),
1026            stoppable,
1027            next_id: AtomicU64::new(0),
1028            stop_listener,
1029            observer,
1030            monitor,
1031            iter_err: Arc::new(std::sync::Mutex::new(None)),
1032            iteration_budget: self.iteration_budget,
1033            exec_fault: Arc::new(ExecutorFaultAtomic::new()),
1034            exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
1035            exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
1036            start_time: Arc::new(OnceLock::new()),
1037            fatal_dispatch,
1038            clock,
1039            dispatch_mode: self.dispatch_mode,
1040            cyclic_clock,
1041        };
1042
1043        Ok(exec)
1044    }
1045}
1046
1047// ── Run loop ──────────────────────────────────────────────────────────────────
1048
1049impl Executor {
1050    /// Run the executor until [`Stoppable::stop`] is called or a task signals
1051    /// stop via [`crate::Context::stop_executor`].
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1056    ///
1057    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1058    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1059    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1060    ///
1061    /// If multiple items error in the same dispatch iteration, only the first
1062    /// is preserved; subsequent errors are discarded silently. To observe
1063    /// every error, attach an [`Observer`](crate::Observer) and read errors
1064    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1065    pub fn run(&mut self) -> Result<(), ExecutorError> {
1066        self.run_inner(RunMode::Forever)
1067    }
1068
1069    /// Run for at most `max` wall-clock duration, then return.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1074    ///
1075    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1076    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1077    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1078    ///
1079    /// If multiple items error in the same dispatch iteration, only the first
1080    /// is preserved; subsequent errors are discarded silently. To observe
1081    /// every error, attach an [`Observer`](crate::Observer) and read errors
1082    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1083    pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
1084        self.run_inner(RunMode::Until(Instant::now() + max))
1085    }
1086
1087    /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
1088    ///
1089    /// # Errors
1090    ///
1091    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1092    ///
1093    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1094    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1095    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1096    ///
1097    /// If multiple items error in the same dispatch iteration, only the first
1098    /// is preserved; subsequent errors are discarded silently. To observe
1099    /// every error, attach an [`Observer`](crate::Observer) and read errors
1100    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1101    pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
1102        self.run_inner(RunMode::Iterations(n))
1103    }
1104
1105    /// Run until `predicate()` returns true. Checked after each `WaitSet`
1106    /// wakeup.
1107    ///
1108    /// # Errors
1109    ///
1110    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1111    ///
1112    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1113    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1114    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1115    ///
1116    /// If multiple items error in the same dispatch iteration, only the first
1117    /// is preserved; subsequent errors are discarded silently. To observe
1118    /// every error, attach an [`Observer`](crate::Observer) and read errors
1119    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1120    pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
1121        self.run_inner(RunMode::Predicate(&mut predicate))
1122    }
1123}
1124
/// Termination policy handed to `run_inner` by the public `run*` entry
/// points.
enum RunMode<'a> {
    /// No self-imposed limit; used by `run`.
    Forever,
    /// Carries the deadline instant computed by `run_for`.
    Until(Instant),
    /// Carries the iteration count passed to `run_n`.
    Iterations(usize),
    /// Carries a borrow of the caller's predicate from `run_until`; the
    /// borrow is what ties the mode to the lifetime `'a`.
    Predicate(&'a mut dyn FnMut() -> bool),
}
1131
1132impl Executor {
1133    fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
1134        // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
1135        // remains true permanently. Calling `run()` again after a stop will return
1136        // promptly without doing any meaningful work (it blocks until the first
1137        // trigger fires, then immediately exits the dispatch loop). Task 10's
1138        // Runner accommodates this by treating an Executor as one-shot: each
1139        // Runner owns the Executor and consumes it.
1140        if self.running.swap(true, Ordering::SeqCst) {
1141            return Err(ExecutorError::AlreadyRunning);
1142        }
1143
1144        self.observer.on_executor_up();
1145        let result = self.dispatch_loop(&mut mode);
1146        match &result {
1147            Ok(()) => self.observer.on_executor_down(),
1148            Err(e) => self.observer.on_executor_error(e),
1149        }
1150
1151        self.running.store(false, Ordering::SeqCst);
1152        result
1153    }
1154
1155    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1156    #[allow(
1157        unsafe_code,
1158        clippy::too_many_lines,
1159        clippy::ref_as_ptr,
1160        clippy::borrow_as_ptr
1161    )]
1162    fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
1163        // Tight timer slack for the dispatch thread (REQ_0274). SCHED_OTHER
1164        // threads inherit the kernel's 50 µs default, and `epoll_wait`
1165        // sleeps through `schedule_hrtimeout_range` WITH that slack: on the
1166        // Pi5 rig that cost 56 µs/cycle accumulated Legacy-dispatch drift
1167        // (5.5 µs/cycle at 1 µs slack — the residual is iceoryx2's
1168        // ms-rounded epoll timeout + wake latency). Real-time classes force
1169        // slack to 0, so this is a no-op under SCHED_FIFO and for the
1170        // Grid/timerfd path (fd readiness, not timeout, drives the wake);
1171        // it removes a 10× cyclic-precision regression for non-RT Legacy
1172        // deployments. Thread-local, never fails for valid args; an error
1173        // (exotic kernel) would only restore today's behavior, so the
1174        // return value is deliberately not checked.
1175        #[cfg(target_os = "linux")]
1176        // SAFETY: prctl(PR_SET_TIMERSLACK) only adjusts the calling
1177        // thread's timer slack; no memory is involved.
1178        unsafe {
1179            libc::prctl(libc::PR_SET_TIMERSLACK, 1_000u64, 0, 0, 0);
1180        }
1181
1182        let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
1183            .create()
1184            .map_err(ExecutorError::iceoryx2)?;
1185
1186        // Keep Arc<RawListener> alive for at least as long as the WaitSet
1187        // guards — the guard borrows the listener via 'attachment lifetime.
1188        let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
1189        // Guards must outlive the run loop.
1190        let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
1191        // Maps guard index → task index.
1192        let mut attachment_to_task: Vec<usize> = Vec::new();
1193
1194        // Hoist to a local for the hot loop — one Copy-enum compare per cycle,
1195        // never a field re-read (REQ_0268).
1196        let dispatch_mode = self.dispatch_mode;
1197
1198        // Cyclic tasks are dispatched by the master timer + GridTimer (REQ_0268),
1199        // not attached as individual WaitSet triggers. Cross-platform: only the
1200        // wake source differs (Task 3).
1201        let mut cyclic_task_indices: Vec<usize> = Vec::new();
1202        let mut cyclic_periods: Vec<u64> = Vec::new();
1203        build_attachments(
1204            &waitset,
1205            &self.tasks,
1206            dispatch_mode,
1207            &mut listener_storage,
1208            &mut guards,
1209            &mut attachment_to_task,
1210            &mut cyclic_task_indices,
1211            &mut cyclic_periods,
1212        )?;
1213        // `cyclic_periods` is cloned, not moved, because Task 3 reads it again to
1214        // arm the single master timerfd; on non-Linux it is unused after this.
1215        let mut grid =
1216            crate::grid::GridTimer::new(self.cyclic_clock.now_nanos(), cyclic_periods.clone());
1217        let mut due_cyclic: Vec<(usize, u64, u64)> = Vec::new();
1218
1219        // Master cyclic timer (REQ_0268, Linux). ONE timerfd armed at the base
1220        // period (gcd of cyclic periods) drives the absolute grid; GridTimer
1221        // decides which tasks are due each tick. Declared above its own guard so
1222        // it drops AFTER the guard (detach before close → no EBADF). Must be
1223        // declared here, after `build_attachments` has filled `cyclic_periods`.
1224        #[cfg(target_os = "linux")]
1225        let master_timer: Option<crate::timerfd::TimerFd> = {
1226            let base = crate::grid::base_period(&cyclic_periods);
1227            if base == 0 {
1228                None
1229            } else {
1230                Some(
1231                    crate::timerfd::TimerFd::new(std::time::Duration::from_nanos(base)).map_err(
1232                        |e| {
1233                            ExecutorError::DeclareTriggers(format!(
1234                                "failed to arm master timerfd: {e}"
1235                            ))
1236                        },
1237                    )?,
1238                )
1239            }
1240        };
1241
1242        // Attach the master timer as a wake-only notification, held separately
1243        // (like the stop listener) so `process_attachment` never maps it to a
1244        // task. `_master_timer_guard` is declared immediately after `master_timer`
1245        // so on scope exit it drops FIRST — detaching the fd from the WaitSet's
1246        // epoll set — and `master_timer` drops SECOND, closing the fd. That
1247        // ordering is what prevents iceoryx2's `EPOLL_CTL_DEL` from hitting a
1248        // closed fd (EBADF). `master_timer`'s fd is referenced ONLY by this guard
1249        // (independent of `guards`/`listener_storage`, which own other fds), so
1250        // its drop position relative to those Vecs is immaterial.
1251        #[cfg(target_os = "linux")]
1252        #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1253        let _master_timer_guard = match &master_timer {
1254            // SAFETY: `master_timer` is a stack local that outlives this guard
1255            // (declared above it); the cast erases the borrow lifetime to the
1256            // attachment lifetime, sound by the same discipline as the stop
1257            // listener. Dropped before `master_timer` closes the fd.
1258            Some(tf) => Some(
1259                waitset
1260                    .attach_notification(unsafe { &*(tf as *const crate::timerfd::TimerFd) })
1261                    .map_err(ExecutorError::iceoryx2)?,
1262            ),
1263            None => None,
1264        };
1265
1266        // Attach the internal stop listener so the WaitSet wakes when
1267        // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
1268        // struct which is valid for the lifetime of dispatch_loop. We use the
1269        // same raw-pointer-cast pattern as user listeners above.
1270        //
1271        // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
1272        // exclusively borrowed for the duration of `run_inner` (which calls
1273        // `dispatch_loop`). The listener is not freed while the guard is alive
1274        // because the Arc keeps it alive and `self` outlives this function.
1275        let stop_listener_ref: &IxListener<ipc::Service> =
1276            unsafe { &*(self.stop_listener.as_ref() as *const _) };
1277        let _stop_guard = waitset
1278            .attach_notification(stop_listener_ref)
1279            .map_err(ExecutorError::iceoryx2)?;
1280
1281        let iterations_done = AtomicUsize::new(0);
1282        let stop_flag = self.stoppable.clone();
1283
1284        loop {
1285            // Reset the pre-allocated per-iteration error slot (REQ_0060):
1286            // the slot is owned by `self.iter_err`, allocated once at build
1287            // time. Pool worker closures obtain a refcount-only clone of
1288            // the `Arc`; the slot itself is reused across iterations.
1289            #[allow(clippy::unwrap_used)]
1290            // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1291            let mut iter_err_guard = self.iter_err.lock().unwrap();
1292            *iter_err_guard = None;
1293            drop(iter_err_guard);
1294
1295            // SAFETY: we capture &mut self.tasks via a raw pointer because
1296            // wait_and_process expects FnMut and Rust can't see the closure
1297            // outlives `self`. The discipline that makes this sound:
1298            //   1. The closure body on the executor thread is the *only* code that
1299            //      reads `tasks_ptr`. The pool jobs it submits hold borrowed
1300            //      `*mut dyn ExecutableItem` slices into individual TaskEntries,
1301            //      not into the Vec itself, so they don't race with the Vec.
1302            //   2. `pool.barrier()` at the end of this callback ensures every
1303            //      submitted pool job has completed (and dropped its raw pointer)
1304            //      before the callback returns. The next iteration of the WaitSet
1305            //      loop is therefore the sole user of `tasks_ptr` again.
1306            //   3. The Vec is never resized inside this loop (no `push` / `remove`
1307            //      after dispatch starts), so the underlying buffer addresses are
1308            //      stable for the lifetime of `dispatch_loop`.
1309            let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
1310            // Take the cycle_stats raw pointer before borrowing `observer`, so
1311            // the &mut borrow is released first — same discipline as tasks_ptr.
1312            let cycle_stats_ptr = &mut self.cycle_stats as *mut Vec<TaskCycleStats>;
1313            let observer = &self.observer;
1314            let pool = &self.pool;
1315            // Refcount-only clone of the pre-allocated error slot. Pool jobs
1316            // need a `'static` handle, and an `Arc::clone` does not allocate.
1317            // The Single/Chain paths use the closure baked into `task.job`,
1318            // which already captured stable Arc clones at `add`-time; the
1319            // Graph path uses closures pre-built by `prepare_dispatch`. Only
1320            // the error-aggregation logic on the WaitSet thread still needs
1321            // the slot here.
1322            let iter_err_inner = Arc::clone(&self.iter_err);
1323            // Raw pointer to the stop listener for draining inside the callback.
1324            // SAFETY: same as stop_listener_ref above — the Arc is alive for
1325            // the lifetime of dispatch_loop.
1326            let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
1327            // Raw pointer to the executor-wide fault state. Same safety
1328            // discipline as `tasks_ptr`: `Executor` is alive for the
1329            // duration of `dispatch_loop`; the WaitSet callback is the
1330            // only reader. REQ_0071. `self.exec_fault` is
1331            // `Arc<ExecutorFaultAtomic>` — we deref once to obtain a
1332            // pointer to the inner `ExecutorFaultAtomic`.
1333            let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
1334            // Raw pointer to the executor start time. Used by the lazy
1335            // cascade below to compute `since_ms` on task transitions
1336            // triggered by an executor-wide fault.
1337            let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;
1338            // Telemetry clock. Same lifetime/aliasing discipline as the
1339            // pointers above: the Executor outlives the dispatch loop and the
1340            // WaitSet callback is the sole reader.
1341            let clock = &self.clock;
1342
1343            // Wrap the per-iteration dispatch body in the framework panic
1344            // boundary. A panic escaping here is *infrastructure* (the WaitSet
1345            // drive, pool submission/barrier, or dispatch wiring) — not a user
1346            // item panic, which is already caught and faulted inside
1347            // `run_item_catch_unwind`. On such a panic `guard_or_fatal` runs the
1348            // user fatal handler then aborts in production. Under a test
1349            // terminal it returns `None`, in which case we must NOT keep
1350            // iterating over possibly-corrupt executor state, so we break out.
1351            let Some(cb_result) =
1352                guard_or_fatal(&self.fatal_dispatch, FatalSite::ExecutorRunLoop, || {
1353                    // Bundle the per-iteration captures into a single context the
1354                    // WaitSet callback delegates to. Keeping the closure a thin
1355                    // adapter over `DispatchPass::process_attachment` keeps the
1356                    // dispatch logic in named, individually-measurable functions.
1357                    let mut pass = DispatchPass {
1358                        guards: &guards,
1359                        attachment_to_task: &attachment_to_task,
1360                        tasks_ptr,
1361                        cycle_stats_ptr,
1362                        observer,
1363                        exec_fault_ptr,
1364                        exec_start_ptr,
1365                        clock,
1366                        stop_listener_ptr,
1367                        pool,
1368                        iter_err: &iter_err_inner,
1369                    };
1370
1371                    // Linux: block on fds — the master timerfd wakes us on the
1372                    // absolute grid. Non-Linux dev: bound the wait by the earliest
1373                    // pending grid target so the post-wait pass can dispatch.
1374                    #[cfg(target_os = "linux")]
1375                    let timeout = std::time::Duration::MAX;
1376                    #[cfg(not(target_os = "linux"))]
1377                    let timeout = match dispatch_mode {
1378                        crate::DispatchMode::Grid => {
1379                            grid.next_timeout(self.cyclic_clock.now_nanos())
1380                        }
1381                        crate::DispatchMode::Legacy => std::time::Duration::MAX,
1382                    };
1383                    waitset.wait_and_process_once_with_timeout(
1384                        |attachment_id: WaitSetAttachmentId<ipc::Service>| {
1385                            pass.process_attachment(&attachment_id)
1386                        },
1387                        timeout,
1388                    )
1389                })
1390            else {
1391                // Only reachable under a test terminal (production aborts in
1392                // `fire`). Bail out of the run loop rather than continuing over
1393                // possibly-corrupt executor state.
1394                //
1395                // Unreachable in production: the production terminal aborts
1396                // before returning, so this branch exists solely so a
1397                // `#[cfg(test)]` recording terminal can unwind the loop.
1398                // Consequently, silently discarding any pending `iter_err`
1399                // here is immaterial to production behavior.
1400                break Ok(());
1401            };
1402
1403            // Did the master timer tick this wake? Linux: drain it (clears epoll
1404            // readiness; >0 overruns means the absolute grid advanced). Non-Linux:
1405            // the self-computed timeout drove the wake, so always consult the grid
1406            // (take_due self-gates per task on `now >= next`). REQ_0268.
1407            #[cfg(target_os = "linux")]
1408            let ticked = master_timer.as_ref().is_some_and(|tf| tf.drain() > 0);
1409            #[cfg(not(target_os = "linux"))]
1410            let ticked = true;
1411
1412            // Post-wait master-grid pass (Grid mode). `run_grid_cyclic_pass`
1413            // self-gates on `ticked` / stop-wake / mode / non-empty, then
1414            // dispatches EVERY due cyclic task atomically this tick (PLC
1415            // semantics). `cpass` is a side-effect-free bundle of borrows, so
1416            // building it unconditionally is free; the gate lives in the helper
1417            // to keep `dispatch_loop` within the complexity budget. REQ_0268.
1418            let cpass = DispatchPass {
1419                guards: &guards,
1420                attachment_to_task: &attachment_to_task,
1421                tasks_ptr,
1422                cycle_stats_ptr,
1423                observer,
1424                exec_fault_ptr,
1425                exec_start_ptr,
1426                clock,
1427                stop_listener_ptr,
1428                pool,
1429                iter_err: &iter_err_inner,
1430            };
1431            run_grid_cyclic_pass(
1432                cpass,
1433                ticked,
1434                dispatch_mode,
1435                &stop_flag,
1436                cb_result,
1437                &mut grid,
1438                self.cyclic_clock.now_nanos(),
1439                &cyclic_task_indices,
1440                &mut due_cyclic,
1441            );
1442
1443            // Funnel the post-callback decision (interrupt / item error /
1444            // stop request / run-mode termination) through one helper that
1445            // yields a single control value, so the loop has exactly one exit.
1446            match self.after_callback(cb_result, mode, &iterations_done, &stop_flag) {
1447                IterOutcome::Continue => {}
1448                IterOutcome::Done => break Ok(()),
1449                IterOutcome::Failed(err) => break Err(err),
1450            }
1451        }
1452    }
1453
1454    /// Evaluates the post-callback termination conditions for one dispatch
1455    /// iteration and reports whether the loop should continue, stop, or fail.
1456    ///
1457    /// Order of precedence matches the original inline checks: `WaitSet`
1458    /// errors, then SIGINT/SIGTERM, then a captured item error, then a stop
1459    /// request, then a bare-EINTR continue (`REQ_0269` — an interrupted wait
1460    /// is a spurious wake, not a termination), then the active [`RunMode`]
1461    /// limit.
1462    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1463    fn after_callback(
1464        &self,
1465        cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1466        mode: &mut RunMode<'_>,
1467        iterations_done: &AtomicUsize,
1468        stop_flag: &Stoppable,
1469    ) -> IterOutcome {
1470        let cb_result = match cb_result.map_err(ExecutorError::iceoryx2) {
1471            Ok(r) => r,
1472            Err(e) => return IterOutcome::Failed(e),
1473        };
1474
1475        // iceoryx2's WaitSet catches SIGINT/SIGTERM internally and reports
1476        // them as `TerminationRequest`; honor that here for a clean exit.
1477        if matches!(cb_result, WaitSetRunResult::TerminationRequest) {
1478            return IterOutcome::Done;
1479        }
1480
1481        // `Interrupt` is a bare EINTR: ANY handled signal — job control
1482        // (SIGSTOP/SIGCONT), a debugger attach, a user-installed handler —
1483        // interrupts the reactor wait. That is a spurious wake, not a
1484        // termination request: a cyclic control runtime must keep running
1485        // (REQ_0269; found on the Pi5 rig, where `kill -STOP`/`-CONT` ended
1486        // `run_n(60000)` cleanly at ~5k cycles). Item errors and `stop()`
1487        // are still honored below; the wake is NOT counted as an iteration
1488        // (nothing was dispatched). A real SIGINT/SIGTERM still terminates:
1489        // iceoryx2 latches it and the NEXT wait call returns
1490        // `TerminationRequest` before blocking.
1491        let interrupted = matches!(cb_result, WaitSetRunResult::Interrupt);
1492
1493        // Extract the error before dropping the MutexGuard — avoids holding the
1494        // lock across the return (clippy::significant_drop_in_scrutinee).
1495        #[allow(clippy::unwrap_used)]
1496        // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1497        let maybe_err = self.iter_err.lock().unwrap().take();
1498        if let Some(err) = maybe_err {
1499            return IterOutcome::Failed(err);
1500        }
1501        if stop_flag.is_stopped() {
1502            return IterOutcome::Done;
1503        }
1504
1505        // An interrupted wake is spurious (REQ_0269): it dispatched nothing,
1506        // so it neither counts as an iteration nor evaluates the run-mode
1507        // limit — it short-circuits to `Continue` and re-enters the wait.
1508        let reached_limit = !interrupted && {
1509            iterations_done.fetch_add(1, Ordering::SeqCst);
1510            match mode {
1511                RunMode::Forever => false,
1512                RunMode::Iterations(n) => iterations_done.load(Ordering::SeqCst) >= *n,
1513                RunMode::Until(deadline) => Instant::now() >= *deadline,
1514                RunMode::Predicate(p) => (p)(),
1515            }
1516        };
1517        if reached_limit {
1518            IterOutcome::Done
1519        } else {
1520            IterOutcome::Continue
1521        }
1522    }
1523}
1524
1525/// Outcome of one `dispatch_loop` iteration's post-callback evaluation.
1526enum IterOutcome {
1527    /// Run another iteration.
1528    Continue,
1529    /// Terminate the loop successfully.
1530    Done,
1531    /// Terminate the loop with the given error.
1532    Failed(ExecutorError),
1533}
1534
1535/// Post-wait absolute-grid pass (Grid mode only, `REQ_0268` / `ADR_0100`).
1536///
1537/// The `WaitSet` callback handles event/fd tasks; cyclic tasks are timed here
1538/// off the scheduling clock. `pass` mirrors the callback's `DispatchPass`
1539/// exactly — same borrows and raw pointers, same single-writer WaitSet-thread
1540/// discipline — and the callback is already dropped (its borrows freed) by the
1541/// time this runs. We poll `grid` for due cyclic slots, dispatch each due task,
1542/// and fold their telemetry through the SHARED [`DispatchPass::barrier_and_record`]
1543/// helper. This is a SEPARATE barrier phase from the callback's: each phase
1544/// barriers and folds only its own `pending_cycle` stashes, so cyclic tasks
1545/// record exactly once, identically to event tasks. We do NOT call
1546/// `record_cycle_for` directly here.
1547///
1548/// Self-gates and returns early (no dispatch, no record) unless this wake should
1549/// run the grid: the master timer ticked (`ticked`), we are in `Grid` mode, it is
1550/// not a stop wake, and there is at least one cyclic task with something due.
1551///
1552/// **Stop-wake suppression (`REQ_0268`)**: a `stop()` (or a SIGINT/SIGTERM
1553/// `cb_result`) must emit no spurious cyclic cycle — Legacy dispatches none on a
1554/// stop wake, so the grid path matches, or a `stop()` would emit one extra cycle
1555/// observation and desync the `FEAT_0038` `cycle_index` join key. Termination
1556/// itself is still decided by `after_callback`; this only suppresses the side
1557/// effects.
1558#[allow(clippy::too_many_arguments)]
1559fn run_grid_cyclic_pass(
1560    mut pass: DispatchPass<'_, '_, '_>,
1561    ticked: bool,
1562    dispatch_mode: crate::DispatchMode,
1563    stop_flag: &Stoppable,
1564    cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1565    grid: &mut crate::grid::GridTimer,
1566    now_nanos: u64,
1567    cyclic_task_indices: &[usize],
1568    due_cyclic: &mut Vec<(usize, u64, u64)>,
1569) {
1570    let stopping = stop_flag.is_stopped()
1571        || matches!(
1572            cb_result,
1573            Ok(WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest)
1574        );
1575    if !ticked
1576        || stopping
1577        || dispatch_mode != crate::DispatchMode::Grid
1578        || cyclic_task_indices.is_empty()
1579    {
1580        return;
1581    }
1582    grid.take_due(now_nanos, due_cyclic);
1583    if due_cyclic.is_empty() {
1584        return;
1585    }
1586    for (slot, skipped, late_by) in due_cyclic.iter() {
1587        pass.dispatch_cyclic(cyclic_task_indices[*slot], *skipped, *late_by);
1588    }
1589    pass.barrier_and_record();
1590}
1591
1592/// Build every `WaitSet` attachment for the task table (`REQ_0268`). In `Grid`
1593/// mode, `TriggerDecl::Interval` cyclic tasks are only *collected* into
1594/// `cyclic_task_indices` / `cyclic_periods` — they are NOT attached as
1595/// individual `WaitSet` triggers. The master timer + `GridTimer` owns their
1596/// wakeups (cross-platform; wake-source wiring is done in the caller).
1597/// Every other decl (and every decl in `Legacy` mode, including `Interval`
1598/// via `attach_interval`) is attached normally. Extracted from `dispatch_loop`
1599/// to keep that function within the cyclomatic-complexity budget.
1600#[allow(clippy::too_many_arguments)]
1601fn build_attachments<'w>(
1602    waitset: &'w WaitSet<ipc::Service>,
1603    tasks: &[TaskEntry],
1604    dispatch_mode: crate::DispatchMode,
1605    listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1606    guards: &mut Vec<WaitSetGuard<'w, 'w, ipc::Service>>,
1607    attachment_to_task: &mut Vec<usize>,
1608    cyclic_task_indices: &mut Vec<usize>,
1609    cyclic_periods: &mut Vec<u64>,
1610) -> Result<(), ExecutorError> {
1611    for (task_idx, task) in tasks.iter().enumerate() {
1612        for decl in &task.decls {
1613            if dispatch_mode == crate::DispatchMode::Grid {
1614                if let TriggerDecl::Interval(d) = decl {
1615                    // Grid mode owns cyclic timing via the master timer + GridTimer;
1616                    // these decls are NOT attached as individual WaitSet triggers.
1617                    cyclic_task_indices.push(task_idx);
1618                    cyclic_periods.push(u64::try_from(d.as_nanos()).unwrap_or(u64::MAX));
1619                    continue;
1620                }
1621            }
1622            let guard = attach_trigger_decl(waitset, listener_storage, decl)?;
1623            guards.push(guard);
1624            attachment_to_task.push(task_idx);
1625        }
1626    }
1627    Ok(())
1628}
1629
1630/// Attaches a single [`TriggerDecl`] to `waitset`, returning the resulting
1631/// guard.
1632///
1633/// Listener-backed declarations (`Subscriber`, `Deadline`, `RawListener`)
1634/// clone the listener `Arc` into `listener_storage` to extend its lifetime to
1635/// the surrounding `dispatch_loop` scope; `Interval` attaches a bare timer.
1636///
1637/// # Safety
1638///
1639/// The returned guard borrows the listener via a raw-pointer cast that erases
1640/// its lifetime. Soundness relies on the caller keeping `listener_storage` (and
1641/// `waitset`) alive for at least as long as the guard, and dropping the guards
1642/// before `listener_storage` — exactly the discipline `dispatch_loop` follows.
1643#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1644fn attach_trigger_decl<'w>(
1645    waitset: &'w WaitSet<ipc::Service>,
1646    listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1647    decl: &TriggerDecl,
1648) -> Result<WaitSetGuard<'w, 'w, ipc::Service>, ExecutorError> {
1649    // Clone the listener Arc and obtain a lifetime-erased reference. SAFETY:
1650    // both `listener_storage` and `waitset` are stack-local in `dispatch_loop`
1651    // and dropped together at its end; guards are dropped before
1652    // `listener_storage`.
1653    let mut listener_ref = |listener: &Arc<crate::trigger::RawListener>| {
1654        listener_storage.push(Arc::clone(listener));
1655        let l_ref = listener_storage.last().unwrap().as_ref();
1656        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
1657        l_ref
1658    };
1659
1660    let guard = match decl {
1661        TriggerDecl::Subscriber { listener } | TriggerDecl::RawListener(listener) => {
1662            waitset.attach_notification(listener_ref(listener))
1663        }
1664        TriggerDecl::Interval(d) => waitset.attach_interval(*d),
1665        TriggerDecl::Deadline { listener, deadline } => {
1666            waitset.attach_deadline(listener_ref(listener), *deadline)
1667        }
1668    };
1669    guard.map_err(ExecutorError::iceoryx2)
1670}
1671
1672/// Per-iteration dispatch context handed to the `WaitSet` callback.
1673///
1674/// `dispatch_loop` rebuilds one of these every iteration and the `WaitSet`
1675/// callback is a thin adapter over [`DispatchPass::process_attachment`]. All
1676/// fields are short-lived borrows / raw pointers into the `Executor` that owns
1677/// the surrounding `dispatch_loop`; their soundness is documented at each use
1678/// site in `dispatch_loop` (same single-threaded, barrier-bounded discipline).
1679struct DispatchPass<'a, 'g, 'w> {
1680    /// `WaitSet` guards, indexed in parallel with `attachment_to_task`.
1681    guards: &'a [WaitSetGuard<'g, 'w, ipc::Service>],
1682    /// Maps guard index to task index in `tasks_ptr`.
1683    attachment_to_task: &'a [usize],
1684    /// Raw pointer to `Executor::tasks`.
1685    tasks_ptr: *mut Vec<TaskEntry>,
1686    /// Raw pointer to `Executor::cycle_stats` (index-aligned with `tasks`).
1687    cycle_stats_ptr: *mut Vec<TaskCycleStats>,
1688    /// Borrow of the executor's observer for the `on_cycle_stats` push.
1689    observer: &'a Arc<dyn Observer>,
1690    /// Raw pointer to `Executor::exec_fault` inner state.
1691    exec_fault_ptr: *const ExecutorFaultAtomic,
1692    /// Raw pointer to `Executor::start_time`.
1693    exec_start_ptr: *const OnceLock<Instant>,
1694    /// Borrow of the executor's telemetry clock, read for each cycle's `pre`.
1695    clock: &'a Arc<dyn MonotonicClock>,
1696    /// Raw pointer to the internal stop listener.
1697    stop_listener_ptr: *const IxListener<ipc::Service>,
1698    /// Borrow of the executor thread pool.
1699    pool: &'a Pool,
1700    /// Refcount-only handle to the per-iteration error slot.
1701    iter_err: &'a Arc<std::sync::Mutex<Option<ExecutorError>>>,
1702}
1703
1704impl DispatchPass<'_, '_, '_> {
1705    /// Dispatches a single task by index for one wakeup: takes the `&mut`
1706    /// borrow into the task table, applies the pre-dispatch fault gate, stashes
1707    /// this cycle's `pending_cycle` timestamp for the post-barrier telemetry
1708    /// fold, and submits the task's work to the pool.
1709    ///
1710    /// Shared by the `WaitSet` callback (`process_attachment`) and — per
1711    /// `REQ_0268` / `ADR_0100` — the forthcoming post-wait absolute-grid
1712    /// dispatch pass, so the per-task barrier/telemetry contract is identical
1713    /// across both call paths.
1714    /// Grid-mode cyclic dispatch: stashes the dispatcher's skipped-slot
1715    /// signal (`REQ_0840`) on the task, then runs the shared dispatch path —
1716    /// the post-barrier `record_cycle_for` `take`s it and folds
1717    /// `grid_slot += 1 + skipped` (`REQ_0106` / `ADR_0101`). Only this
1718    /// discrete count crosses the scheduler→telemetry boundary; timestamps
1719    /// stay on the telemetry clock (`REQ_0268`).
1720    #[allow(unsafe_code)]
1721    fn dispatch_cyclic(&mut self, task_idx: usize, skipped: u64, late_by: u64) {
1722        // SAFETY: same single-writer WaitSet-thread discipline as
1723        // dispatch_task; the pointer is valid for the duration of this call.
1724        let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1725        task.pending_skipped = u32::try_from(skipped).unwrap_or(u32::MAX);
1726        task.pending_late = Some(late_by);
1727        self.dispatch_task(task_idx);
1728    }
1729
1730    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1731    #[allow(unsafe_code)]
1732    fn dispatch_task(&mut self, task_idx: usize) {
1733        // SAFETY: we are the only thread that may touch the task table
1734        // during the callback. wait_and_process_once is single-threaded
1735        // and dispatch_loop holds &mut self. The pointer is valid for the
1736        // duration of this call.
1737        let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1738
1739        // Pre-dispatch fault check (REQ_0070, REQ_0071, REQ_0072). When it
1740        // routes to a (possible) handler, normal dispatch is skipped.
1741        if self.handle_fault_routing(task) {
1742            // REQ_0107: a faulted/fault-routed scan STILL advances
1743            // cycle_index and emits on_cycle_stats, or the executor's count
1744            // desyncs from the connector's join key (FEAT_0038). took/jitter
1745            // are None (poison-safe); the index always moves. Allocation-free:
1746            // a CyclePending { Instant, bool } written onto the TaskEntry,
1747            // no heap.
1748            if task.scan_period.is_some() {
1749                task.pending_cycle = Some(CyclePending {
1750                    pre: self.clock.now_nanos(),
1751                    faulted: true,
1752                });
1753            }
1754            return;
1755        }
1756
1757        // Stash the pre-dispatch instant so the post-barrier record pass
1758        // can fold this cycle's telemetry. Allocation-free: the timestamp
1759        // lives on the TaskEntry, not in a per-wakeup Vec. `take`n in the
1760        // post-barrier loop below — guarantees exactly-once even if two
1761        // guards map to the same task. `faulted: false`: a task that faulted
1762        // last wakeup and recovered this one records the normal path (the
1763        // whole CyclePending is overwritten, so the flag can't be stale).
1764        task.pending_cycle = Some(CyclePending {
1765            pre: self.clock.now_nanos(),
1766            faulted: false,
1767        });
1768
1769        self.submit_task_job(task);
1770    }
1771
1772    /// Handles a single `WaitSet` wakeup: drains stop notifications, then
1773    /// dispatches every task whose attachment fired. Always returns
1774    /// [`CallbackProgression::Continue`]; termination is decided by the
1775    /// `stop_flag` check in `dispatch_loop` after the callback returns.
1776    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1777    #[allow(unsafe_code)]
1778    fn process_attachment(
1779        &mut self,
1780        attachment_id: &WaitSetAttachmentId<ipc::Service>,
1781    ) -> CallbackProgression {
1782        // Drain stop notifications first (no dispatch — the stop_flag check
1783        // after the callback returns handles termination).
1784        // SAFETY: stop_listener_ptr is valid for the duration of the call;
1785        // the Arc in self.stop_listener keeps it alive.
1786        let stop_l = unsafe { &*self.stop_listener_ptr };
1787        while let Ok(Some(_)) = stop_l.try_wait_one() {}
1788
1789        for i in 0..self.guards.len() {
1790            let guard = &self.guards[i];
1791            let fired =
1792                attachment_id.has_event_from(guard) || attachment_id.has_missed_deadline(guard);
1793            if !fired {
1794                continue;
1795            }
1796            let task_idx = self.attachment_to_task[i];
1797            self.dispatch_task(task_idx);
1798        }
1799
1800        self.barrier_and_record();
1801
1802        CallbackProgression::Continue
1803    }
1804
1805    /// Barrier all submitted pool jobs for this dispatch phase, then fold each
1806    /// task's stashed `pending_cycle` into recorded cycle telemetry. Shared by
1807    /// the `WaitSet` callback (event/fd tasks) and the post-wait grid pass
1808    /// (cyclic tasks, `REQ_0268`). Keyed on `pending_cycle` so it records
1809    /// exactly the tasks dispatched this phase, exactly once.
1810    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1811    #[allow(unsafe_code)]
1812    fn barrier_and_record(&mut self) {
1813        // Wait for all submitted jobs to finish before leaving the callback
1814        // scope (validates item_ptr safety contract). The barrier also makes
1815        // every worker's `last_took_ns` Release-store visible to the record
1816        // pass below.
1817        self.pool.barrier();
1818
1819        // Post-barrier telemetry fold. The source of truth for "this task was
1820        // dispatched this wakeup and owes a record" is `pending_cycle`,
1821        // set in the dispatch loop above — not the guard fired-status. Keying
1822        // solely on the stash (rather than re-querying `has_event_from`)
1823        // removes any dependency on the fired-status query being stable across
1824        // a second scan, so a dispatched cycle can never be silently
1825        // under-recorded (which would lag `cycle_index` — the desync FEAT_0038
1826        // must avoid). `take` clears the stash, guaranteeing exactly-once.
1827        // Allocation-free: iterate task indices in place.
1828        // SAFETY: same single-writer WaitSet-thread discipline as the dispatch
1829        // loop above; barrier-bounded, no in-flight pool job aliases `tasks`.
1830        let task_count = unsafe { (*self.tasks_ptr).len() };
1831        for task_idx in 0..task_count {
1832            // SAFETY: single-writer WaitSet thread; borrow released before
1833            // the record_cycle_for call (which re-derefs tasks_ptr).
1834            let pending = unsafe { (&mut *self.tasks_ptr)[task_idx].pending_cycle.take() };
1835            if let Some(CyclePending { pre, faulted }) = pending {
1836                self.record_cycle_for(task_idx, faulted, pre);
1837            }
1838        }
1839    }
1840
1841    /// Fold one scan cycle's telemetry and push it to the observer. Called
1842    /// once per fired CYCLIC attachment per wakeup. `faulted = true` (Task 10)
1843    /// means the scan was skipped/errored: `took`/`jitter`/`lateness` are
1844    /// unmeasured. Event-driven tasks (no `scan_period`) are skipped entirely
1845    /// (`REQ_0106`).
1846    #[allow(unsafe_code)]
1847    fn record_cycle_for(&mut self, task_idx: usize, faulted: bool, pre_ns: u64) {
1848        // SAFETY: single-writer WaitSet thread; same discipline as tasks_ptr.
1849        let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1850        let Some(period) = task.scan_period else {
1851            return; // event-driven: no cycle telemetry
1852        };
1853        let period_ns = u64::try_from(period.as_nanos()).unwrap_or(u64::MAX);
1854
1855        // Release/Acquire pairing with the worker store (M2): `swap` acquires
1856        // the worker's Release-store and resets the sentinel atomically.
1857        let took_raw = task.last_took_ns.swap(u64::MAX, Ordering::AcqRel);
1858        let took = if faulted || took_raw == u64::MAX {
1859            None
1860        } else {
1861            Some(took_raw)
1862        };
1863
1864        // actual_period + jitter vs the previous dispatch (REQ_0101). Always
1865        // advance `last_dispatch` (even on a faulted attempt) so the next
1866        // cycle's period is measured from this wakeup. `actual_period` is
1867        // `None` on the very first cycle (no previous timestamp); jitter is
1868        // additionally suppressed on a faulted scan (poison-safe: REQ_0107).
1869        let actual_period = task
1870            .last_dispatch
1871            .replace(pre_ns)
1872            .map(|prev| pre_ns.saturating_sub(prev));
1873        let jitter = if faulted {
1874            None
1875        } else {
1876            actual_period.map(|ap| ap.abs_diff(period_ns))
1877        };
1878
1879        // Per-task lateness grid (REQ_0106 / ADR_0101): the task's first
1880        // record anchors the grid at slot 0 (a faulted first scan anchors
1881        // too, its dispatch instant is real); every later record advances by
1882        // exactly one slot plus the dispatcher's skipped-slot signal
1883        // (REQ_0840). Never reconstructed from the measured period, which
1884        // over-counts on coalesced catch-up wakes and fabricates negative
1885        // lateness (issue #46). The anchor is the first dispatch's NOMINAL
1886        // slot — `pre` back-dated by the dispatcher's `late_by` signal — so
1887        // a late process start is reported as real first-cycle lateness
1888        // instead of becoming a permanent negative floor on later on-grid
1889        // cycles. `late_by` is a scheduling-clock difference applied to a
1890        // telemetry-clock instant: domain-safe, both are monotonic ns.
1891        // Without a dispatcher signal (Legacy mode, event-driven tasks) the
1892        // anchor stays the first observed `pre`.
1893        let skipped = core::mem::take(&mut task.pending_skipped);
1894        let late_by = task.pending_late.take();
1895        let first = task.grid_epoch.is_none();
1896        let grid_epoch = *task
1897            .grid_epoch
1898            .get_or_insert_with(|| pre_ns.saturating_sub(late_by.unwrap_or(0)));
1899        if !first {
1900            task.grid_slot = task
1901                .grid_slot
1902                .saturating_add(1)
1903                .saturating_add(u64::from(skipped));
1904        }
1905        let grid_slot = task.grid_slot;
1906
1907        // SAFETY: cycle_stats is index-aligned with tasks; single-writer.
1908        let stats = unsafe { &mut (&mut *self.cycle_stats_ptr)[task_idx] };
1909
1910        // Deadline lateness (REQ_0106): signed offset of the actual start
1911        // (`pre_ns`) from its nominal grid point `grid_epoch + grid_slot *
1912        // period`. Positive => started late; negative => early. Captures
1913        // steady drift (jitter is blind to a constant offset; lateness is
1914        // not). A dispatcher skip re-anchors via `skipped` above; absent a
1915        // signal (e.g. `Legacy` mode, which never signals), a whole missed
1916        // period honestly remains visible as a persistent offset rather than
1917        // being absorbed.
1918        let lateness = if period_ns > 0 && !faulted {
1919            let elapsed_ns = i64::try_from(pre_ns.saturating_sub(grid_epoch)).unwrap_or(i64::MAX);
1920            let expected_ns =
1921                i64::try_from(u128::from(grid_slot) * u128::from(period_ns)).unwrap_or(i64::MAX);
1922            Some(elapsed_ns.saturating_sub(expected_ns))
1923        } else {
1924            None
1925        };
1926
1927        let cycle_index = stats.record_cycle(took, jitter, lateness);
1928
1929        let obs = CycleObservation {
1930            cycle_index,
1931            task_id: task.id.clone(),
1932            task_index: u32::try_from(task_idx).unwrap_or(u32::MAX),
1933            faulted,
1934            period_ns,
1935            pre_ns,
1936            actual_period_ns: actual_period,
1937            jitter_ns: jitter,
1938            lateness_ns: lateness,
1939            skipped_slots: skipped,
1940            took_ns: took,
1941        };
1942        self.observer.on_cycle_stats(&obs);
1943    }
1944
1945    /// Applies the pre-dispatch fault gate for `Single`/`Chain` tasks.
1946    ///
1947    /// Returns `true` when the task is routed to its fault handler (or
1948    /// silently skipped because no handler is registered) and normal dispatch
1949    /// must therefore be skipped. Returns `false` when normal dispatch should
1950    /// proceed. `Graph` tasks always return `false` — they use their own
1951    /// per-vertex scheduling and are out of scope for `FEAT_0018`.
1952    #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1953    fn handle_fault_routing(&self, task: &mut TaskEntry) -> bool {
1954        if !matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
1955            return false;
1956        }
1957
1958        // SAFETY: exec_fault_ptr derefs into the Executor that owns the
1959        // surrounding dispatch_loop — alive for this call's lifetime.
1960        let exec_faulted = matches!(
1961            unsafe { &*self.exec_fault_ptr }.load(0, 0),
1962            ExecutorFaultState::Faulted { .. }
1963        );
1964        let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
1965        let task_state = task.fault.load(task_budget_ms);
1966
1967        // Lazy cascade: if executor is `Faulted` and task is still `Running`,
1968        // silently transition the task to `Faulted{ExecutorFaulted}`. No
1969        // `on_task_fault` — the Observer already heard about the executor-wide
1970        // fault via `on_executor_fault` (cascade-noise invariant, FEAT_0018
1971        // §4.6).
1972        let task_faulted = if exec_faulted && matches!(task_state, FaultState::Running) {
1973            // SAFETY: exec_start_ptr derefs into the same Executor owning the
1974            // dispatch_loop. The OnceLock is wait-free.
1975            let exec_start = *unsafe { &*self.exec_start_ptr }.get_or_init(std::time::Instant::now);
1976            let since_ms = instant_to_since_ms(std::time::Instant::now(), exec_start);
1977            let _ = task.fault.swap(
1978                FaultState::Faulted {
1979                    reason: FaultReason::ExecutorFaulted,
1980                    since_ms,
1981                },
1982                task_budget_ms,
1983            );
1984            true
1985        } else {
1986            matches!(task_state, FaultState::Faulted { .. })
1987        };
1988
1989        if !(exec_faulted || task_faulted) {
1990            return false;
1991        }
1992
1993        // If a handler is registered, dispatch it. Otherwise, skip dispatch
1994        // entirely this wakeup.
1995        if let Some(handler_box) = task.handler_job.as_deref_mut() {
1996            let job_ptr: *mut (dyn FnMut() + Send) = handler_box as *mut (dyn FnMut() + Send);
1997            // SAFETY: same as the main-job dispatch below — handler_job is
1998            // owned by the TaskEntry; pool.barrier() awaits its completion
1999            // before the next callback.
2000            unsafe {
2001                self.pool
2002                    .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
2003            }
2004        }
2005        true
2006    }
2007
2008    /// Dispatches `task`'s normal (non-fault) work for one wakeup.
2009    ///
2010    /// `Single`/`Chain` tasks submit their pre-built job to the pool;
2011    /// `Graph` tasks drive one pass and capture the first item error into the
2012    /// per-iteration error slot.
2013    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2014    #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
2015    fn submit_task_job(&self, task: &mut TaskEntry) {
2016        match &mut task.kind {
2017            TaskKind::Single(_) | TaskKind::Chain(_) => {
2018                // The dispatch closure was pre-allocated at task-add time and
2019                // stashed on `task.job`. Submit it via `submit_borrowed` — no
2020                // per-iteration Box allocation. Required by REQ_0060.
2021                #[allow(clippy::expect_used)]
2022                // fail-fast: Single/Chain task.job is always Some — set at add time in build_single_job/build_chain_job and never cleared
2023                let job_box = task
2024                    .job
2025                    .as_deref_mut()
2026                    .expect("Single/Chain tasks carry a pre-built job");
2027                let job_ptr: *mut (dyn FnMut() + Send) = job_box as *mut (dyn FnMut() + Send);
2028                // SAFETY: the closure lives in `task.job`, owned by
2029                // `self.tasks[task_idx]`; `tasks_ptr` is sound for the
2030                // duration of this callback. `pool.barrier()` in
2031                // `process_attachment` finishes the closure invocation before
2032                // the next iteration's callback. The WaitSet thread does not
2033                // touch the closure between this submit and that barrier.
2034                unsafe {
2035                    self.pool
2036                        .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
2037                }
2038            }
2039            TaskKind::Graph(graph) => {
2040                // Outer driver runs on the WaitSet thread; vertices run on the
2041                // pool. The graph holds its own pre-built per-vertex closures
2042                // and SPSC ready ring (REQ_0060), so dispatch is
2043                // allocation-free in steady state.
2044                let outcome = graph.run_once_borrowed(self.pool);
2045                if let Some(source) = outcome.error {
2046                    #[allow(clippy::unwrap_used)]
2047                    // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
2048                    let mut g = self.iter_err.lock().unwrap();
2049                    if g.is_none() {
2050                        *g = Some(ExecutorError::Item {
2051                            task_id: task.id.clone(),
2052                            source,
2053                        });
2054                    }
2055                }
2056                let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
2057            }
2058        }
2059    }
2060}
2061
2062/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
2063/// `Pool::submit`. The send is safe because:
2064///   1. The executor guarantees at most one invocation of a given item at a
2065///      time (via `pool.barrier()` before the pointer is reused).
2066///   2. `ExecutableItem: Send`, so moving the pointee across threads is sound
2067///      when no aliasing exists.
2068#[allow(unsafe_code)]
2069struct SendItemPtr {
2070    ptr: *mut dyn ExecutableItem,
2071}
2072
2073impl SendItemPtr {
2074    fn new(ptr: *mut dyn ExecutableItem) -> Self {
2075        Self { ptr }
2076    }
2077
2078    /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
2079    /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
2080    /// dispatch closure to be reusable across iterations without allocation).
2081    fn get(&self) -> *mut dyn ExecutableItem {
2082        self.ptr
2083    }
2084}
2085
2086// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
2087// closure can borrow `&SendItemPtr` per invocation without making the
2088// closure itself `!Send`.
2089#[allow(unsafe_code)]
2090unsafe impl Send for SendItemPtr {}
2091#[allow(unsafe_code)]
2092unsafe impl Sync for SendItemPtr {}
2093
2094/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
2095/// closure can iterate the chain's items in place without first
2096/// collecting them into a freshly-allocated `Vec`. The send is safe
2097/// for the same reason as [`SendItemPtr`] (see above): the executor
2098/// holds `&mut self` for the duration of `dispatch_loop`, and the
2099/// `pool.barrier()` at the end of each callback ensures the closure
2100/// has finished using this pointer before the Vec could be touched
2101/// from the `WaitSet` thread again. The Vec is never resized after
2102/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
2103/// allocate per iteration.
2104#[allow(unsafe_code)]
2105struct SendChainPtr {
2106    ptr: *mut Vec<Box<dyn ExecutableItem>>,
2107}
2108
2109impl SendChainPtr {
2110    fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
2111        Self { ptr }
2112    }
2113
2114    fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
2115        self.ptr
2116    }
2117}
2118
2119// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
2120// borrow `&SendChainPtr` per invocation while staying `Send`.
2121#[allow(unsafe_code)]
2122unsafe impl Send for SendChainPtr {}
2123#[allow(unsafe_code)]
2124unsafe impl Sync for SendChainPtr {}
2125
2126/// Captured state needed by a dispatch closure to perform post-execute
2127/// fault detection. All fields are `Arc`-shared with the owning
2128/// `Executor` and `TaskEntry` so the closure can read/write them
2129/// wait-free from any pool worker thread. `REQ_0070`, `REQ_0071`,
2130/// `REQ_0102`.
2131struct FaultDispatchCtx {
2132    /// Per-task budget. `None` for chain / graph tasks (no per-task
2133    /// check) — the executor-wide iteration budget still applies.
2134    task_budget: Option<Duration>,
2135    /// Per-task fault state (shared with `TaskEntry::fault`).
2136    task_fault: Arc<FaultAtomic>,
2137    /// Per-task monotonic overrun counter (shared with
2138    /// `TaskEntry::overrun_count`). Increments on EVERY budget breach.
2139    overrun_count: Arc<AtomicU64>,
2140    /// Executor-wide iteration budget. `None` means no executor-wide
2141    /// check.
2142    iteration_budget: Option<Duration>,
2143    /// Executor-wide fault state (shared with `Executor::exec_fault`).
2144    exec_fault: Arc<ExecutorFaultAtomic>,
2145    /// Executor-wide offending-task index storage (shared with
2146    /// `Executor::exec_fault_task_idx`).
2147    exec_fault_task_idx: Arc<AtomicU32>,
2148    /// Executor-wide breached-budget storage (shared with
2149    /// `Executor::exec_fault_budget_ms`).
2150    exec_fault_budget_ms: Arc<AtomicU32>,
2151    /// Index of this task in the executor's task table.
2152    task_idx_u32: u32,
2153    /// Executor start time (shared with `Executor::start_time`).
2154    exec_start: Arc<OnceLock<Instant>>,
2155    /// Observer for `on_task_fault` / `on_executor_fault` notifications.
2156    observer: Arc<dyn Observer>,
2157}
2158
2159/// Validate a task's collected trigger declarations before it joins the task
2160/// table (`REQ_0268`). Applied at every add path — single, chain head, and
2161/// fault-handler main — at the point the `TriggerDecl`s are first available,
2162/// regardless of [`DispatchMode`] (the rejected shapes are ill-defined in any
2163/// mode; Legacy is temporary).
2164///
2165/// Rejects two shapes:
2166///
2167/// 1. **Cyclic AND event-driven** — a task carrying both an `Interval` decl and
2168///    any listener-backed decl (`Subscriber` / `Deadline` / `RawListener`). Per
2169///    `REQ_0106` a task is cyclic XOR event-driven: cyclic tasks have a
2170///    period/lateness, event-driven tasks do not. Allowing both would dispatch
2171///    and record the task twice in one wake (phase-a event + phase-b grid),
2172///    desyncing the `FEAT_0038` `cycle_index` join key (`REQ_0107`).
2173/// 2. **Zero-period interval** — an `Interval(Duration::ZERO)` busy-spins the
2174///    grid (`GridTimer::next_timeout` returns `0` every wake and `take_due`
2175///    re-fires without advancing). A zero scan period is nonsensical.
2176fn validate_decls(id: &TaskId, decls: &[crate::trigger::TriggerDecl]) -> Result<(), ExecutorError> {
2177    use crate::trigger::TriggerDecl;
2178
2179    let has_interval = decls.iter().any(|d| matches!(d, TriggerDecl::Interval(_)));
2180    let has_listener = decls.iter().any(|d| {
2181        matches!(
2182            d,
2183            TriggerDecl::Subscriber { .. }
2184                | TriggerDecl::Deadline { .. }
2185                | TriggerDecl::RawListener(_)
2186        )
2187    });
2188
2189    if has_interval && has_listener {
2190        return Err(ExecutorError::DeclareTriggers(format!(
2191            "task `{id}` declares both an interval (cyclic) and a listener \
2192             (event-driven) trigger; a task may be cyclic (interval) or \
2193             event-driven (listener) but not both — split it into two tasks"
2194        )));
2195    }
2196
2197    if decls
2198        .iter()
2199        .any(|d| matches!(d, TriggerDecl::Interval(dur) if dur.is_zero()))
2200    {
2201        return Err(ExecutorError::DeclareTriggers(format!(
2202            "task `{id}` declares a zero-duration interval; a cyclic scan \
2203             period must be strictly positive"
2204        )));
2205    }
2206
2207    Ok(())
2208}
2209
2210/// Extract the declared scan period (first `Interval` trigger) from a task's
2211/// trigger declarations, or `None` for event-driven tasks.
2212fn scan_period_from_decls(decls: &[crate::trigger::TriggerDecl]) -> Option<Duration> {
2213    decls.iter().find_map(|d| match d {
2214        crate::trigger::TriggerDecl::Interval(dur) => Some(*dur),
2215        _ => None,
2216    })
2217}
2218
2219/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
2220///
2221/// The returned closure is stored on `TaskEntry::job` and invoked once
2222/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
2223/// performs no allocation. The closure captures Arc clones of the
2224/// executor's shared state — those clones are refcount-only at build
2225/// time and are reused on every dispatch. Required for `REQ_0060`.
2226#[allow(clippy::too_many_arguments)]
2227fn build_single_job(
2228    id: TaskId,
2229    stop: Stoppable,
2230    obs: Arc<dyn Observer>,
2231    mon: Arc<dyn ExecutionMonitor>,
2232    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2233    app_id: Option<u32>,
2234    app_inst: Option<u32>,
2235    item_ptr: SendItemPtr,
2236    fault_ctx: FaultDispatchCtx,
2237    last_took_ns: Arc<AtomicU64>,
2238    clock: Arc<dyn MonotonicClock>,
2239) -> Box<dyn FnMut() + Send + 'static> {
2240    Box::new(move || {
2241        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2242        if let Some(aid) = app_id {
2243            obs.on_app_start(id.clone(), aid, app_inst);
2244        }
2245        let raw = item_ptr.get();
2246        let started = std::time::Instant::now();
2247        // Telemetry `took` is measured on the injected clock (REQ_0105) so a
2248        // MockClock can make it exact; the real `started`/`took` below stay on
2249        // the system clock for the monitor and fault-budget paths.
2250        let tele_t0 = clock.now_nanos();
2251        mon.pre_execute(id.clone(), started);
2252        // SAFETY: barrier() pairs with this invocation; the WaitSet
2253        // thread does not touch the item between `submit_borrowed` and
2254        // the matching `barrier()`. See SendItemPtr safety doc.
2255        #[allow(unsafe_code)]
2256        let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2257        let took = started.elapsed();
2258        // Release pairs with the WaitSet-thread Acquire (swap) in
2259        // `record_cycle_for` (M2). `pool.barrier()` also fences, but the
2260        // explicit pairing documents intent and is robust on weak-memory archs.
2261        last_took_ns.store(clock.now_nanos().saturating_sub(tele_t0), Ordering::Release);
2262        mon.post_execute(id.clone(), started, took, res.is_ok());
2263        if let Err(ref e) = res {
2264            obs.on_app_error(id.clone(), e.as_ref());
2265        }
2266        if app_id.is_some() {
2267            obs.on_app_stop(id.clone());
2268        }
2269        post_execute_detect_fault(&id, started, took, &fault_ctx);
2270        record_first_err(&err_slot, &id, res);
2271    })
2272}
2273
2274/// Build the per-iteration dispatch closure for a fault-handler item.
2275///
2276/// Mirrors [`build_single_job`] in every detail (same monitor /
2277/// observer / first-error capture wiring) but owns the
2278/// `Box<dyn ExecutableItem>` directly inside the closure instead of
2279/// dereferencing a raw [`SendItemPtr`]. The handler has no parallel
2280/// owner inside [`TaskEntry`] — the handler closure stored in
2281/// `handler_job` is the sole owner — so the simpler owning form is
2282/// both sound and avoids the aliasing dance the main item needs.
2283/// (Unlike [`build_single_job`], this closure does NOT update
2284/// `last_took_ns` — the handler runs in place of the main item, so the
2285/// main item's `last_took_ns` keeps its sentinel `u64::MAX` = "no
2286/// sample this cycle".)
2287/// `REQ_0072`.
2288#[allow(clippy::too_many_arguments)]
2289fn build_handler_job(
2290    id: TaskId,
2291    stop: Stoppable,
2292    obs: Arc<dyn Observer>,
2293    mon: Arc<dyn ExecutionMonitor>,
2294    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2295    app_id: Option<u32>,
2296    app_inst: Option<u32>,
2297    mut handler: Box<dyn ExecutableItem>,
2298    fault_ctx: FaultDispatchCtx,
2299) -> Box<dyn FnMut() + Send + 'static> {
2300    Box::new(move || {
2301        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2302        if let Some(aid) = app_id {
2303            obs.on_app_start(id.clone(), aid, app_inst);
2304        }
2305        let started = std::time::Instant::now();
2306        mon.pre_execute(id.clone(), started);
2307        let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
2308        let took = started.elapsed();
2309        mon.post_execute(id.clone(), started, took, res.is_ok());
2310        if let Err(ref e) = res {
2311            obs.on_app_error(id.clone(), e.as_ref());
2312        }
2313        if app_id.is_some() {
2314            obs.on_app_stop(id.clone());
2315        }
2316        // Per §4.6 invariant 5 of FEAT_0018: a handler that ALSO breaches
2317        // budget keeps the task in `Faulted` (state already `Faulted`),
2318        // `overrun_count` increments, NO new `on_task_fault` fires —
2319        // the `matches!(prev, FaultState::Running)` gate inside
2320        // `post_execute_detect_fault` enforces that.
2321        post_execute_detect_fault(&id, started, took, &fault_ctx);
2322        record_first_err(&err_slot, &id, res);
2323    })
2324}
2325
2326/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
2327#[allow(clippy::too_many_arguments)]
2328fn build_chain_job(
2329    id: TaskId,
2330    stop: Stoppable,
2331    obs: Arc<dyn Observer>,
2332    mon: Arc<dyn ExecutionMonitor>,
2333    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2334    chain_ptr: SendChainPtr,
2335    fault_ctx: FaultDispatchCtx,
2336    last_took_ns: Arc<AtomicU64>,
2337    clock: Arc<dyn MonotonicClock>,
2338) -> Box<dyn FnMut() + Send + 'static> {
2339    Box::new(move || {
2340        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2341        // Overall chain scan timer — the chain's `took` is the elapsed
2342        // telemetry-clock time from the first item's pre-execute to the last
2343        // item's completion (or early break), mirroring the single-item `took`
2344        // notion (REQ_0105). Per-item monitor timing uses each item's own
2345        // real-clock `started` below.
2346        let chain_tele_t0 = clock.now_nanos();
2347        // SAFETY: barrier() pairs with this invocation; the chain Vec
2348        // and the items it owns are not touched by the WaitSet thread
2349        // until barrier() returns. See SendChainPtr safety doc.
2350        #[allow(unsafe_code)]
2351        let chain_items = unsafe { &mut *chain_ptr.get() };
2352        for item_box in chain_items.iter_mut() {
2353            let app_id = item_box.app_id();
2354            let app_inst = item_box.app_instance_id();
2355            if let Some(aid) = app_id {
2356                obs.on_app_start(id.clone(), aid, app_inst);
2357            }
2358            let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
2359            let started = std::time::Instant::now();
2360            mon.pre_execute(id.clone(), started);
2361            #[allow(unsafe_code)]
2362            let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2363            let took = started.elapsed();
2364            mon.post_execute(id.clone(), started, took, res.is_ok());
2365            if let Err(ref e) = res {
2366                obs.on_app_error(id.clone(), e.as_ref());
2367            }
2368            if app_id.is_some() {
2369                obs.on_app_stop(id.clone());
2370            }
2371            // Per-item post-execute fault detection. `task_budget` is
2372            // `None` for chains (see `add_chain_with_id_boxed`), so the
2373            // per-task check no-ops; the executor-wide iteration-budget
2374            // check still fires per item. `REQ_0071`.
2375            post_execute_detect_fault(&id, started, took, &fault_ctx);
2376            match res {
2377                Ok(crate::ControlFlow::Continue) => {}
2378                Ok(crate::ControlFlow::StopChain) => break,
2379                Err(_) => {
2380                    record_first_err(&err_slot, &id, res);
2381                    break;
2382                }
2383            }
2384        }
2385        // Release pairs with the WaitSet-thread Acquire (swap) in
2386        // `record_cycle_for` (M2). See the Single-job store for the rationale.
2387        last_took_ns.store(
2388            clock.now_nanos().saturating_sub(chain_tele_t0),
2389            Ordering::Release,
2390        );
2391    })
2392}
2393
/// Error value standing in for a task that panicked; the wrapped string is
/// the panic message.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    /// Renders as `task panicked: <message>`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        write!(f, "task panicked: {message}")
    }
}

impl std::error::Error for PanickedTask {}
2404
2405/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
2406fn run_item_catch_unwind(
2407    item: &mut dyn ExecutableItem,
2408    ctx: &mut crate::context::Context<'_>,
2409) -> crate::ExecuteResult {
2410    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
2411        |payload| {
2412            let msg =
2413                panic_payload_message(&*payload).unwrap_or_else(|| "panicked task".to_string());
2414            Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
2415        },
2416    )
2417}
2418
2419/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
2420/// without depending on its private name.
2421pub(crate) fn run_item_catch_unwind_external(
2422    item: &mut dyn ExecutableItem,
2423    ctx: &mut crate::context::Context<'_>,
2424) -> crate::ExecuteResult {
2425    run_item_catch_unwind(item, ctx)
2426}
2427
2428/// Record the first error into `slot`. Subsequent errors are silently dropped.
2429fn record_first_err(
2430    slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
2431    id: &TaskId,
2432    res: crate::ExecuteResult,
2433) {
2434    if let Err(source) = res {
2435        let mut g = slot.lock().unwrap();
2436        if g.is_none() {
2437            *g = Some(ExecutorError::Item {
2438                task_id: id.clone(),
2439                source,
2440            });
2441        }
2442    }
2443}
2444
2445/// Post-execute fault detection — runs on a pool worker AFTER
2446/// `mon.post_execute` so the full `took` is available. Implements:
2447///
2448///   * `REQ_0070` / `REQ_0102` — per-task budget overrun: increments
2449///     `overrun_count` on every breach, transitions
2450///     `Running -> Faulted{BudgetExceeded}` exactly once (subsequent
2451///     breaches keep the state `Faulted` and do NOT re-fire the
2452///     observer).
2453///   * `REQ_0071` — executor-wide iteration overrun: transitions
2454///     `Running -> Faulted{IterationBudgetExceeded}` exactly once;
2455///     cascade to per-task state is LAZY (see the pre-dispatch block
2456///     in `dispatch_loop`), so the per-task `on_task_fault` does NOT
2457///     fire during cascade — only `on_executor_fault` does.
2458fn post_execute_detect_fault(
2459    id: &TaskId,
2460    started: Instant,
2461    took: Duration,
2462    fault_ctx: &FaultDispatchCtx,
2463) {
2464    // REQ_0070 / REQ_0102 — per-task budget overrun.
2465    if let Some(budget) = fault_ctx.task_budget {
2466        if took > budget {
2467            fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
2468            let took_ms = duration_to_ms_sat(took);
2469            let budget_ms = duration_to_ms_sat(budget);
2470            let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2471            let since_ms = instant_to_since_ms(started, exec_start);
2472            let new_state = FaultState::Faulted {
2473                reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
2474                since_ms,
2475            };
2476            let prev = fault_ctx.task_fault.swap(new_state, budget_ms);
2477            if matches!(prev, FaultState::Running) {
2478                fault_ctx.observer.on_task_fault(
2479                    id.clone(),
2480                    FaultReason::BudgetExceeded { took_ms, budget_ms },
2481                );
2482            }
2483        }
2484    }
2485
2486    // REQ_0071 — executor-wide iteration overrun.
2487    if let Some(iter_budget) = fault_ctx.iteration_budget {
2488        if took > iter_budget {
2489            let took_ms = duration_to_ms_sat(took);
2490            let budget_ms = duration_to_ms_sat(iter_budget);
2491            let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2492            let since_ms = instant_to_since_ms(started, exec_start);
2493            fault_ctx
2494                .exec_fault_task_idx
2495                .store(fault_ctx.task_idx_u32, Ordering::Release);
2496            fault_ctx
2497                .exec_fault_budget_ms
2498                .store(budget_ms, Ordering::Release);
2499            let new_state = ExecutorFaultState::Faulted {
2500                reason: ExecutorFaultReason::IterationBudgetExceeded {
2501                    task_idx: fault_ctx.task_idx_u32,
2502                    took_ms,
2503                    budget_ms,
2504                },
2505                since_ms,
2506            };
2507            let prev = fault_ctx
2508                .exec_fault
2509                .swap(new_state, fault_ctx.task_idx_u32, budget_ms);
2510            if matches!(prev, ExecutorFaultState::Running) {
2511                fault_ctx.observer.on_executor_fault(
2512                    ExecutorFaultReason::IterationBudgetExceeded {
2513                        task_idx: fault_ctx.task_idx_u32,
2514                        took_ms,
2515                        budget_ms,
2516                    },
2517                );
2518                // NO eager cascade here. Cascade is lazy: the
2519                // pre-dispatch block in `dispatch_loop` transitions
2520                // each `Running` task to `Faulted{ExecutorFaulted}` on
2521                // the next wakeup — silently, so per-task observers
2522                // do not fire (see §4.6 invariant on cascade-noise).
2523            }
2524        }
2525    }
2526}
2527
2528// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
2529
2530/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
2531/// into a registered task.
2532pub struct ExecutorGraphBuilder<'e> {
2533    executor: &'e mut Executor,
2534    builder: crate::graph::GraphBuilder,
2535    custom_id: Option<TaskId>,
2536}
2537
2538impl ExecutorGraphBuilder<'_> {
2539    /// Add a vertex to the graph; returns its handle.
2540    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
2541        self.builder.vertex(item)
2542    }
2543
2544    /// Add a directed edge from one vertex to another.
2545    pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
2546        self.builder.edge(from, to);
2547        self
2548    }
2549
2550    /// Designate the root vertex (its triggers gate the graph).
2551    pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
2552        self.builder.root(v);
2553        self
2554    }
2555
2556    /// Override the auto-generated id with a custom one.
2557    pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
2558        self.custom_id = Some(id.into());
2559        self
2560    }
2561
2562    /// Validate and register the graph. Returns the task id.
2563    ///
2564    /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
2565    /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
2566    /// precedence over the auto-generated id.
2567    pub fn build(self) -> Result<TaskId, ExecutorError> {
2568        let g = self.builder.finish()?;
2569        // Root vertex's task_id() override wins over the custom id, which wins
2570        // over the auto-generated fallback.
2571        let auto_id = || {
2572            TaskId::new(format!(
2573                "graph-{}",
2574                self.executor.next_id.fetch_add(1, Ordering::SeqCst)
2575            ))
2576        };
2577        let id = g
2578            .root_task_id()
2579            .map(TaskId::new)
2580            .or(self.custom_id)
2581            .unwrap_or_else(auto_id);
2582        let decls = g.decls.clone();
2583        // The graph root's decls become a grid-registered TaskEntry, so the same
2584        // cyclic-XOR-event-driven / non-zero-period validation that guards the
2585        // single-item, fault-handler, and chain add paths must guard this one too
2586        // (REQ_0268). Non-root vertex triggers never reach a TaskEntry — they are
2587        // discarded in `GraphBuilder::collect_root_decls` — so validating the root
2588        // decls is sufficient.
2589        validate_decls(&id, &decls)?;
2590        let scan_period = scan_period_from_decls(&decls);
2591
2592        // Box the graph for address stability — per-vertex dispatch
2593        // closures capture `*const Graph` and must not see it move.
2594        let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
2595        // Pre-build the per-vertex closures now that we know the
2596        // task_id and have access to the executor's shared state.
2597        graph_box.prepare_dispatch(
2598            id.clone(),
2599            self.executor.stoppable.clone(),
2600            Arc::clone(&self.executor.observer),
2601            Arc::clone(&self.executor.monitor),
2602            Arc::clone(&self.executor.iter_err),
2603        );
2604
2605        self.executor.tasks.push(TaskEntry {
2606            id: id.clone(),
2607            kind: TaskKind::Graph(graph_box),
2608            decls,
2609            // Graph tasks dispatch their vertices via `vertex_jobs`
2610            // stored inside the `Graph`; the per-task `job` slot
2611            // is unused for graphs.
2612            job: None,
2613            // TODO(post-Task-10): graph budgets carried separately; for now None.
2614            budget: None,
2615            fault: Arc::new(FaultAtomic::new()),
2616            overrun_count: Arc::new(AtomicU64::new(0)),
2617            handler_job: None,
2618            scan_period,
2619            // Graphs dispatch vertices via their own path and do not ferry a
2620            // per-task `took`; sentinel = "no sample". Wired for struct
2621            // completeness; nothing reads it yet (Task 6).
2622            last_took_ns: Arc::new(AtomicU64::new(u64::MAX)),
2623            last_dispatch: None,
2624            grid_slot: 0,
2625            grid_epoch: None,
2626            pending_skipped: 0,
2627            pending_late: None,
2628            pending_cycle: None,
2629        });
2630        self.executor
2631            .cycle_stats
2632            .push(TaskCycleStats::new(self.executor.stats_window));
2633        Ok(id)
2634    }
2635}
2636
2637// ── Unit tests ────────────────────────────────────────────────────────────────
2638
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fatal::FatalContext;
    use crate::item::item_with_triggers;
    use crate::{ControlFlow, item};
    use core::time::Duration;
    use iceoryx2::prelude::ZeroCopySend;

    /// Small `#[repr(C)]` zero-copy payload. Tests that need a genuine
    /// listener-backed trigger decl open a channel of this type and declare
    /// one of its subscribers.
    #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
    #[repr(C)]
    struct Msg(u32);

    // ── Shared fixtures ──────────────────────────────────────────────────────

    /// Builds an executor configured with zero worker threads and otherwise
    /// default builder settings; panics if the build fails.
    fn inline_executor() -> Executor {
        Executor::builder().worker_threads(0).build().unwrap()
    }

    /// Registers a single task with a 1 ms interval trigger on an executor
    /// built with `mode`, calls `run_n(10)`, and returns how many times the
    /// task body was invoked.
    fn cyclic_hits_after_run_n(mode: crate::DispatchMode) -> u64 {
        let hits = Arc::new(AtomicU64::new(0));
        let task_hits = Arc::clone(&hits);
        let mut exec = Executor::builder()
            .worker_threads(0)
            .dispatch_mode(mode)
            .build()
            .expect("build");
        let ticking = item_with_triggers(
            move |d| {
                d.interval(Duration::from_millis(1));
                Ok(())
            },
            move |_ctx| {
                task_hits.fetch_add(1, Ordering::Relaxed);
                Ok(ControlFlow::Continue)
            },
        );
        exec.add(ticking).expect("add");
        exec.run_n(10).expect("run");
        hits.load(Ordering::Relaxed)
    }

    /// Fires a `FatalContext` made from `cause` and `site` through a
    /// `FatalDispatch` that pairs the executor's stored handler with a
    /// flag-setting test terminal. Returns whether that terminal ran.
    fn terminal_reached_after_fire(exec: &Executor, cause: &str, site: FatalSite) -> bool {
        let reached = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&reached);
        let dispatch = FatalDispatch::with_terminal(
            exec.fatal_dispatch.handler().clone(),
            move |_| flag.store(true, Ordering::SeqCst),
        );
        dispatch.fire(&FatalContext {
            cause: cause.to_string(),
            site,
        });
        reached.load(Ordering::SeqCst)
    }

    // ── Registration basics ──────────────────────────────────────────────────

    #[test]
    fn add_returns_unique_ids() {
        let mut exec = inline_executor();
        let first = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        let second = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        assert_ne!(first, second);
    }

    #[test]
    fn grid_mode_dispatches_cyclic_task_each_cycle() {
        let count = cyclic_hits_after_run_n(crate::DispatchMode::Grid);
        assert!(count >= 8, "grid mode under-dispatched: {}", count);
    }

    #[test]
    fn legacy_mode_dispatches_cyclic_task_each_cycle() {
        let count = cyclic_hits_after_run_n(crate::DispatchMode::Legacy);
        assert!(count >= 8, "legacy mode under-dispatched: {}", count);
    }

    // --- REQ_0268 trigger-combination validation (Fix 1 / Fix 3) ---

    #[test]
    fn add_rejects_cyclic_plus_subscriber_combination() {
        // Declaring an Interval together with a listener-backed trigger is
        // ill-defined (cyclic XOR event-driven, REQ_0106), so `add` has to
        // refuse it. A real subscriber keeps the listener decl genuine.
        let mut exec = inline_executor();
        let channel = exec.channel::<Msg>("taktora.test.req0268.combo").unwrap();
        let sub = channel.subscriber().unwrap();
        let mixed = item_with_triggers(
            move |d| {
                d.interval(Duration::from_millis(1));
                d.subscriber(&sub);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let rejection = exec
            .add(mixed)
            .expect_err("interval + subscriber must be rejected");
        let msg = match rejection {
            ExecutorError::DeclareTriggers(msg) => msg,
            other => panic!("expected DeclareTriggers, got {other:?}"),
        };
        assert!(
            msg.contains("cyclic") && msg.contains("event-driven"),
            "message must explain cyclic vs event-driven: {msg}"
        );
        assert!(
            msg.contains("split"),
            "message must suggest splitting into two tasks: {msg}"
        );
    }

    #[test]
    fn add_rejects_cyclic_plus_listener_regardless_of_mode() {
        // The combination is ill-defined whatever the DispatchMode (Legacy is
        // temporary), so a Legacy executor must refuse it as well.
        let mut exec = Executor::builder()
            .worker_threads(0)
            .dispatch_mode(crate::DispatchMode::Legacy)
            .build()
            .unwrap();
        let channel = exec
            .channel::<Msg>("taktora.test.req0268.combo.legacy")
            .unwrap();
        let sub = channel.subscriber().unwrap();
        let mixed = item_with_triggers(
            move |d| {
                d.interval(Duration::from_millis(1));
                d.subscriber(&sub);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let rejection = exec
            .add(mixed)
            .expect_err("interval + subscriber must be rejected in Legacy too");
        assert!(matches!(rejection, ExecutorError::DeclareTriggers(_)));
    }

    #[test]
    fn add_accepts_multiple_intervals_and_single_kinds() {
        let mut exec = inline_executor();

        // Two Interval decls on one task: still cyclic-only, so accepted.
        let two_intervals = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(1));
                d.interval(Duration::from_millis(2));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        exec.add(two_intervals)
            .expect("multiple intervals accepted");

        // One Interval decl: accepted.
        let one_interval = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(1));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        exec.add(one_interval).expect("single interval accepted");

        // Several listeners and no interval: accepted.
        let channel = exec
            .channel::<Msg>("taktora.test.req0268.multi.listener")
            .unwrap();
        let first_sub = channel.subscriber().unwrap();
        let second_sub = channel.subscriber().unwrap();
        let two_listeners = item_with_triggers(
            move |d| {
                d.subscriber(&first_sub);
                d.subscriber(&second_sub);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        exec.add(two_listeners)
            .expect("multiple listeners accepted");
    }

    #[test]
    fn add_rejects_zero_period_interval() {
        // A zero-period interval would busy-spin the grid (next_timeout == 0
        // on every wake), hence the rejection at add time.
        let mut exec = inline_executor();
        let zero_period = item_with_triggers(
            |d| {
                d.interval(Duration::ZERO);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let rejection = exec
            .add(zero_period)
            .expect_err("zero-period interval must be rejected");
        let msg = match rejection {
            ExecutorError::DeclareTriggers(msg) => msg,
            other => panic!("expected DeclareTriggers, got {other:?}"),
        };
        assert!(
            msg.contains("zero"),
            "message must mention the zero period: {msg}"
        );
    }

    #[test]
    fn add_chain_rejects_cyclic_plus_listener() {
        // The chain path collects the head item's decls, so the same
        // validation has to run there.
        let mut exec = inline_executor();
        let channel = exec
            .channel::<Msg>("taktora.test.req0268.chain.combo")
            .unwrap();
        let sub = channel.subscriber().unwrap();
        let head = item_with_triggers(
            move |d| {
                d.interval(Duration::from_millis(1));
                d.subscriber(&sub);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let rejection = exec
            .add_chain(vec![head])
            .expect_err("chain head interval + subscriber must be rejected");
        assert!(matches!(rejection, ExecutorError::DeclareTriggers(_)));
    }

    #[test]
    fn add_chain_rejects_zero_period_interval() {
        let mut exec = inline_executor();
        let head = item_with_triggers(
            |d| {
                d.interval(Duration::ZERO);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let rejection = exec
            .add_chain(vec![head])
            .expect_err("chain head zero-period interval must be rejected");
        assert!(matches!(rejection, ExecutorError::DeclareTriggers(_)));
    }

    #[test]
    fn add_graph_rejects_cyclic_plus_listener() {
        // The graph path turns the root vertex's decls into a grid-registered
        // TaskEntry, so the cyclic-XOR-event-driven validation (REQ_0268) must
        // run there too. A real subscriber keeps the listener decl genuine.
        let mut exec = inline_executor();
        let channel = exec
            .channel::<Msg>("taktora.test.req0268.graph.combo")
            .unwrap();
        let sub = channel.subscriber().unwrap();
        let mut graph = exec.add_graph();
        let root = graph.vertex(item_with_triggers(
            move |d| {
                d.interval(Duration::from_millis(1));
                d.subscriber(&sub);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        ));
        graph.root(root);
        let rejection = graph
            .build()
            .expect_err("graph root interval + subscriber must be rejected");
        assert!(matches!(rejection, ExecutorError::DeclareTriggers(_)));
    }

    #[test]
    fn add_graph_rejects_zero_period_interval() {
        // A zero-period interval on the graph root busy-spins the grid, so the
        // graph path must refuse it exactly like the single-item/chain paths.
        let mut exec = inline_executor();
        let mut graph = exec.add_graph();
        let root = graph.vertex(item_with_triggers(
            |d| {
                d.interval(Duration::ZERO);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        ));
        graph.root(root);
        let rejection = graph
            .build()
            .expect_err("graph root zero-period interval must be rejected");
        assert!(matches!(rejection, ExecutorError::DeclareTriggers(_)));
    }

    #[test]
    fn stopped_iteration_emits_no_cyclic_cycle_observation() {
        /// Scheduling-role clock (distinct from the telemetry clock): the
        /// first read — the grid epoch at loop entry — reports 0, and every
        /// later read reports a time far beyond the 1 ms target, so the
        /// post-wait `take_due` sees the cyclic task as due on the very first
        /// (stopping) wake.
        struct JumpClock {
            calls: AtomicU64,
        }
        impl crate::CyclicClock for JumpClock {
            fn now_nanos(&self) -> u64 {
                match self.calls.fetch_add(1, Ordering::SeqCst) {
                    0 => 0,
                    _ => 1_000_000_000,
                }
            }
        }

        /// Observer that only tallies `on_cycle_stats` invocations.
        struct Counter {
            cycles: AtomicU64,
        }
        impl Observer for Counter {
            fn on_cycle_stats(&self, _obs: &CycleObservation) {
                self.cycles.fetch_add(1, Ordering::SeqCst);
            }
        }

        let counter = Arc::new(Counter {
            cycles: AtomicU64::new(0),
        });
        let clock = Arc::new(JumpClock {
            calls: AtomicU64::new(0),
        });
        let mut exec = Executor::builder()
            .worker_threads(0)
            .dispatch_mode(crate::DispatchMode::Grid)
            .cyclic_clock(clock)
            .observer(Arc::clone(&counter) as Arc<dyn Observer>)
            .build()
            .unwrap();
        let cyclic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(1));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        exec.add(cyclic).unwrap();

        // Request the stop BEFORE running: the WaitSet wakes at once on the
        // stop listener while the grid target is already due (JumpClock).
        // Absent the stop guard, the post-wait cyclic pass would dispatch and
        // record one spurious cycle on this stopping iteration; with the
        // guard the count stays at zero.
        exec.stoppable().stop();
        exec.run().expect("run returns cleanly after stop");

        let observed = counter.cycles.load(Ordering::SeqCst);
        assert_eq!(
            observed, 0,
            "no cyclic cycle observation may be emitted on a stop wake"
        );
    }

    #[test]
    fn custom_id_is_preserved() {
        let mut exec = inline_executor();
        let assigned = exec
            .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
            .unwrap();
        assert_eq!(assigned.as_str(), "my-task");
    }

    #[test]
    fn add_persists_declared_budget() {
        let mut exec = inline_executor();
        let budgeted = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(budgeted).unwrap();
        let stored = exec
            .tasks
            .iter()
            .find(|entry| entry.id == task_id)
            .expect("task present");
        assert_eq!(stored.budget, Some(Duration::from_millis(5)));
    }

    #[test]
    fn scan_period_cached_for_cyclic_only() {
        let mut exec = inline_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let cyclic_id = exec.add(periodic).unwrap();
        let event_id = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();

        let cyclic_entry = exec
            .tasks
            .iter()
            .find(|entry| entry.id == cyclic_id)
            .expect("cyclic task present");
        assert_eq!(cyclic_entry.scan_period, Some(Duration::from_millis(5)));
        // Still the sentinel: no sample has been recorded so far.
        assert_eq!(cyclic_entry.last_took_ns.load(Ordering::Relaxed), u64::MAX);

        let event_entry = exec
            .tasks
            .iter()
            .find(|entry| entry.id == event_id)
            .expect("event-driven task present");
        assert_eq!(event_entry.scan_period, None);
    }

    #[test]
    fn cycle_stats_index_aligned_with_tasks() {
        let mut exec = Executor::builder()
            .worker_threads(0)
            .stats_window(512)
            .build()
            .unwrap();
        // The builder option reaches the executor.
        assert_eq!(exec.stats_window, 512);
        // Nothing registered yet → both Vecs are empty and therefore aligned.
        assert_eq!(exec.cycle_stats.len(), exec.tasks.len());

        // Single-item add path, cyclic task.
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        exec.add(periodic).unwrap();
        // Single-item add path, event-driven task.
        exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();

        assert_eq!(exec.tasks.len(), 2);
        assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
    }

    #[test]
    fn add_with_fault_handler_stores_handler_job() {
        let mut exec = inline_executor();
        let guarded = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let handler = item_with_triggers(|_d| Ok(()), |_| Ok(ControlFlow::Continue));
        let task_id = exec.add_with_fault_handler(guarded, handler).unwrap();
        let stored = exec
            .tasks
            .iter()
            .find(|entry| entry.id == task_id)
            .expect("task present");
        assert!(
            stored.handler_job.is_some(),
            "handler_job should be Some after add_with_fault_handler"
        );
        // Registering a handler must not displace the main job.
        assert!(stored.job.is_some(), "main job should still be present");
    }

    #[test]
    fn declare_triggers_called_at_add_time() {
        let declared = Arc::new(AtomicBool::new(false));
        let declared_in_item = Arc::clone(&declared);

        let probe = item_with_triggers(
            move |_d| {
                declared_in_item.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );

        let mut exec = inline_executor();
        exec.add(probe).unwrap();
        assert!(declared.load(Ordering::SeqCst));
    }

    // --- fault-state / overrun accessors ---

    #[test]
    fn clear_task_fault_errors_on_running_task() {
        let mut exec = inline_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        // A freshly added task is in the Running state, so clearing must fail.
        let failure = exec.clear_task_fault(task_id).expect_err("not faulted");
        assert!(matches!(failure, ExecutorError::TaskNotFaulted(_)));
    }

    #[test]
    fn clear_executor_fault_errors_on_running_executor() {
        let exec = inline_executor();
        let failure = exec.clear_executor_fault().expect_err("not faulted");
        assert!(matches!(failure, ExecutorError::ExecutorNotFaulted));
    }

    #[test]
    fn overrun_count_returns_zero_for_new_task() {
        let mut exec = inline_executor();
        let budgeted = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(budgeted).unwrap();
        assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
    }

    #[test]
    fn overrun_count_errors_for_unknown_task() {
        let exec = inline_executor();
        let failure = exec
            .overrun_count(crate::TaskId::new("nope"))
            .expect_err("unknown task");
        assert!(matches!(failure, ExecutorError::TaskNotFound(_)));
    }

    #[test]
    fn task_fault_state_starts_running() {
        let mut exec = inline_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
    }

    #[test]
    fn executor_fault_state_starts_running() {
        let exec = inline_executor();
        assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
    }

    // --- on_fatal / FatalDispatch integration tests ---

    #[test]
    fn build_without_on_fatal_succeeds() {
        // A builder with no `on_fatal` call must still build.
        let exec = inline_executor();
        // The `fatal_dispatch` field exists; firing through a test terminal
        // confirms that the default handler does not blow up on the way.
        assert!(
            terminal_reached_after_fire(&exec, "test", FatalSite::PoolWorker),
            "terminal not reached"
        );
    }

    #[test]
    fn on_fatal_handler_is_stored_and_invoked() {
        use std::sync::Mutex;

        let causes: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&causes);
        let exec = Executor::builder()
            .worker_threads(0)
            .on_fatal(move |ctx| {
                sink.lock().unwrap().push(ctx.cause.clone());
            })
            .build()
            .unwrap();
        // Fire through a test terminal to prove the stored handler runs.
        assert!(
            terminal_reached_after_fire(&exec, "my-cause", FatalSite::ExecutorRunLoop),
            "terminal not reached"
        );
        let seen = causes.lock().unwrap().clone();
        assert_eq!(
            seen,
            vec!["my-cause"],
            "handler should have been called with cause"
        );
    }
}