Skip to main content

taktora_executor/
executor.rs

1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::clock::{MonotonicClock, SystemClock};
10use crate::context::Stoppable;
11use crate::error::ExecutorError;
12use crate::fatal::{FatalDispatch, FatalHandler, FatalSite, guard_or_fatal, panic_payload_message};
13use crate::fault::{
14    ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
15    FaultState, duration_to_ms_sat, instant_to_since_ms,
16};
17use crate::item::ExecutableItem;
18use crate::monitor::{ExecutionMonitor, NoopMonitor};
19use crate::observer::{NoopObserver, Observer};
20use crate::payload::Payload;
21use crate::pool::Pool;
22use crate::stats::{CycleObservation, StatsSnapshot, TaskStatsEntry};
23use crate::task_id::TaskId;
24use crate::task_kind::TaskKind;
25use crate::thread_attrs::ThreadAttributes;
26use crate::trigger::{TriggerDecl, TriggerDeclarer};
27use core::sync::atomic::AtomicU32;
28use iceoryx2::node::Node;
29use iceoryx2::port::listener::Listener as IxListener;
30use iceoryx2::prelude::ipc;
31use iceoryx2::prelude::*;
32use iceoryx2::waitset::WaitSetRunResult;
33use std::sync::Arc;
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
36use std::time::{Duration, Instant};
37use taktora_stats::ExecutorCycleStats;
38
/// Process-wide, monotonically increasing sequence for executors. Every
/// executor draws its own value so that the stop-event service name it uses
/// is unique even when several executors live in the same process.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
42
43/// Executor histogram segment count (`S`) and exact-window length (`W`) for
44/// per-task cycle stats. Fixed at compile time per `ADR_0060`.
45pub(crate) type TaskCycleStats = ExecutorCycleStats<8, 256>;
46
/// Pending cycle record of one wakeup. It is parked on the [`TaskEntry`]
/// from the pre-dispatch capture until the post-barrier fold consumes it.
///
/// The pre-dispatch timestamp and the `faulted` flag travel together inside a
/// single `Option`, so they cannot drift apart: a cycle is pending exactly
/// when that `Option` is `Some`, and the flag it carries is the one captured
/// at the very same wakeup (`REQ_0107`).
#[derive(Clone, Copy)]
pub(crate) struct CyclePending {
    /// The cycle's `pre`: timestamp taken just before dispatch for this
    /// wakeup, expressed in telemetry-clock nanoseconds (see
    /// [`MonotonicClock`]).
    pub(crate) pre: u64,
    /// Set when the scan for this wakeup was fault-routed or skipped; the
    /// post-barrier fold then records the cycle with `faulted=true`.
    pub(crate) faulted: bool,
}
61
62/// One registered task entry.
63pub(crate) struct TaskEntry {
64    /// Task identifier.
65    pub(crate) id: TaskId,
66    /// The kind of work this entry holds (single item or chain).
67    pub(crate) kind: TaskKind,
68    /// Trigger declarations recorded at `add` time.
69    pub(crate) decls: Vec<TriggerDecl>,
70    /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
71    /// time and re-invoked on every dispatch iteration via
72    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
73    /// that `Pool::submit<F>` requires in threaded mode. Required for
74    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
75    /// `TaskKind::Graph`, which dispatches its vertices via a separate
76    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
77    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
78
79    /// Per-task budget declared via `TriggerDeclarer::budget`. `None`
80    /// means no per-task check; the executor-wide iteration budget
81    /// still applies. `REQ_0070`.
82    pub(crate) budget: Option<Duration>,
83
84    /// Per-task fault state. Wait-free read on the dispatch hot path.
85    /// Wrapped in `Arc` so dispatch closures built at `add` time can
86    /// capture an owning handle into the same atomic the `TaskEntry`
87    /// holds — `Arc::clone` is refcount-only, so this stays compatible
88    /// with `REQ_0060` (no per-iteration allocation). `REQ_0070`.
89    pub(crate) fault: Arc<FaultAtomic>,
90
91    /// Monotonic per-task overrun counter. Increments on EVERY budget
92    /// breach, including breaches while already `Faulted`. Never reset
93    /// by clearing the fault. Shared with the dispatch closure via
94    /// `Arc::clone`. `REQ_0102`.
95    pub(crate) overrun_count: Arc<AtomicU64>,
96
97    /// Pre-built dispatch closure for the fault-handler item. Mirrors
98    /// `job`. `None` means no handler — the task is simply skipped
99    /// during fault. `REQ_0072`.
100    pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,
101
102    /// Declared scan period for cyclic tasks (the `TriggerDecl::Interval`
103    /// duration), or `None` for event-driven tasks. Cached at add time so the
104    /// dispatch loop reads it without scanning `decls` per cycle. Gates cycle
105    /// telemetry: only cyclic tasks participate (`REQ_0106`).
106    pub(crate) scan_period: Option<Duration>,
107    /// Last-cycle execute duration in ns, written by the dispatch closure on
108    /// the pool worker and read by the `WaitSet` thread after `barrier()`.
109    /// Shared via `Arc` exactly like `overrun_count`. Sentinel `u64::MAX` =
110    /// "no sample this cycle" (the closure never ran — e.g. a faulted scan).
111    pub(crate) last_took_ns: Arc<AtomicU64>,
112
113    /// WaitSet-thread-only timestamp of this task's previous dispatch, for
114    /// computing `actual_period` (`REQ_0101`). Not shared (no atomic) — only the
115    /// single dispatch thread touches it. `None` before the first dispatch.
116    /// Telemetry-clock nanoseconds (see [`MonotonicClock`]).
117    pub(crate) last_dispatch: Option<u64>,
118
119    /// WaitSet-thread-only running grid-slot index for deadline lateness
120    /// (`REQ_0106`). Counts nominal periods elapsed since the grid epoch,
121    /// advancing one slot per cycle under steady drift and several at once
122    /// across a coalesced/missed wakeup — decoupled from `cycle_index` so a
123    /// transient hiccup re-anchors the grid instead of biasing it forever.
124    /// Starts at `0` (the first cycle is on its own grid point by definition).
125    pub(crate) grid_slot: u64,
126
127    /// WaitSet-thread-only stash of the *current* wakeup's pending cycle —
128    /// the pre-dispatch timestamp plus its `faulted` flag — carried across
129    /// `pool.barrier()` so the post-barrier record pass can fold this cycle's
130    /// telemetry without re-reading the clock or allocating a fired-index list.
131    /// `Some` between the pre-dispatch capture and the post-barrier
132    /// `record_cycle_for` `take`; `None` otherwise. Bundling the timestamp and
133    /// the fault flag in one `Option` keeps them from ever desyncing
134    /// (`REQ_0107`). Only the single dispatch thread touches it (no atomic).
135    pub(crate) pending_cycle: Option<CyclePending>,
136}
137
138/// Top-level executor. One per process is the typical case.
139pub struct Executor {
140    pub(crate) node: Node<ipc::Service>,
141    pub(crate) pool: Arc<Pool>,
142    pub(crate) tasks: Vec<TaskEntry>,
143    /// One cycle-stats aggregator per registered task, index-aligned with
144    /// `tasks`. Pushed at task-add time (before `run`), so no steady-state
145    /// allocation (`REQ_0060`, `REQ_0104`). Updated single-writer on the
146    /// `WaitSet` thread (Task 6).
147    pub(crate) cycle_stats: Vec<TaskCycleStats>,
148    /// Histogram sliding-window size in samples (`REQ_0100`).
149    pub(crate) stats_window: u32,
150    pub(crate) running: Arc<AtomicBool>,
151    pub(crate) stoppable: Stoppable,
152    pub(crate) next_id: AtomicU64,
153    /// Listener for the internal stop event service. Held here so it outlives
154    /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
155    /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
156    pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
157    /// Lifecycle observer. Defaults to a no-op.
158    pub(crate) observer: Arc<dyn Observer>,
159    /// Execution monitor. Defaults to a no-op.
160    pub(crate) monitor: Arc<dyn ExecutionMonitor>,
161    /// Per-iteration error capture slot — allocated once at build time and
162    /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
163    /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
164    /// the per-iteration heap allocation that the previous design incurred.
165    /// Required for `REQ_0060`.
166    pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
167    /// Executor-wide iteration budget from `ExecutorBuilder::iteration_budget`.
168    /// `None` means no executor-wide check.
169    pub(crate) iteration_budget: Option<Duration>,
170    /// Executor-wide fault state. Wrapped in `Arc` so each dispatch
171    /// closure can hold an owning handle without re-borrowing through
172    /// `self`. `REQ_0071`.
173    pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,
174
175    /// Index of the task whose `execute()` overran when the executor
176    /// transitioned to `Faulted`. Read alongside `exec_fault`.
177    pub(crate) exec_fault_task_idx: Arc<AtomicU32>,
178
179    /// Budget that was breached when the executor transitioned to
180    /// `Faulted`, in ms (saturated). Read alongside `exec_fault`.
181    pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,
182
183    /// Executor start time, set on first dispatch. Used to compute
184    /// `since_ms` for faults relative to `Executor::run` entry. Wrapped
185    /// in `Arc` so dispatch closures share the same `OnceLock` with the
186    /// executor — `get_or_init` is idempotent and wait-free.
187    pub(crate) start_time: Arc<OnceLock<Instant>>,
188
189    /// Fatal-dispatch handle. Called once on the fail-fast path from the
190    /// executor-thread run-loop boundary; the pool holds a separate
191    /// `Arc::clone` for its own worker / inline-submit boundaries.
192    pub(crate) fatal_dispatch: Arc<FatalDispatch>,
193
194    /// Telemetry time source (`REQ_0101`/`REQ_0105`/`REQ_0106`). Read on the
195    /// worker (for `took`) and the `WaitSet` thread (for `pre`); defaults to
196    /// [`SystemClock`]. A test can substitute a [`MockClock`] via
197    /// [`ExecutorBuilder::clock`] for deterministic timing assertions. Affects
198    /// only telemetry — never scheduling or fault behaviour.
199    pub(crate) clock: Arc<dyn MonotonicClock>,
200
201    /// Lateness grid epoch in telemetry-clock nanoseconds (`REQ_0106`): the
202    /// `pre` of this executor's first recorded cyclic dispatch. Grid point `n`
203    /// is `grid_epoch + n * period`. Set once (lazily) on the `WaitSet` thread;
204    /// shared as an `Arc` so the dispatch loop and `record_cycle_for` see the
205    /// same `OnceLock`.
206    pub(crate) grid_epoch: Arc<OnceLock<u64>>,
207
208    /// Cyclic dispatch timing strategy (`REQ_0268` / `ADR_0100`). Read once at
209    /// `dispatch_loop` entry and hoisted to a local, so steady-state cost is a
210    /// single `Copy`-enum compare per cycle. Defaults to
211    /// [`DispatchMode::Grid`](crate::DispatchMode).
212    pub(crate) dispatch_mode: crate::DispatchMode,
213
214    /// Scheduling time source for the absolute grid (`REQ_0268`). Distinct from
215    /// [`Executor::clock`] (telemetry): a telemetry mock can never alter
216    /// dispatch timing. Defaults to
217    /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock).
218    pub(crate) cyclic_clock: std::sync::Arc<dyn crate::CyclicClock>,
219}
220
221// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
222// `SingleThreaded` reason as `IxNotifier`. After construction, the only
223// per-iteration call is `listener.try_wait_one()`, which does not mutate the
224// Rc. `Executor` is never shared across threads (it requires `&mut self` for
225// `run()`), so there is no aliased concurrent mutation.
226#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
227unsafe impl Send for Executor {}
228
229impl Executor {
230    /// Start a new builder.
231    #[must_use]
232    pub fn builder() -> ExecutorBuilder {
233        ExecutorBuilder::default()
234    }
235
236    /// Open or create a pub/sub channel bound to this executor's node.
237    pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
238        Channel::open_or_create(&self.node, name)
239    }
240
241    /// Open or create a request/response service bound to this executor's node.
242    pub fn service<Req, Resp>(
243        &mut self,
244        name: &str,
245    ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
246    where
247        Req: Payload,
248        Resp: Payload,
249    {
250        crate::Service::open_or_create(&self.node, name)
251    }
252
253    /// Borrowed snapshot of every task's cycle aggregates (`REQ_0103` pull
254    /// path). Relaxed reads; never blocks the dispatch writer.
255    #[must_use]
256    pub fn stats_snapshot(&self) -> StatsSnapshot {
257        let per_task = self
258            .tasks
259            .iter()
260            .zip(self.cycle_stats.iter())
261            .map(|(t, s)| {
262                let snap = s.snapshot();
263                TaskStatsEntry {
264                    task_id: t.id.clone(),
265                    p50_ns: snap.p50_ns,
266                    p95_ns: snap.p95_ns,
267                    p99_ns: snap.p99_ns,
268                    min_ns: snap.min_ns,
269                    max_ns: snap.max_ns,
270                    max_jitter_ns: snap.max_jitter_ns,
271                    max_lateness_ns: snap.max_lateness_ns,
272                    overrun_count: t.overrun_count.load(Ordering::Acquire),
273                }
274            })
275            .collect();
276        StatsSnapshot { per_task }
277    }
278
279    /// Add an item to the executor with an auto-generated id.
280    pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
281        let id = TaskId::new(format!(
282            "task-{}",
283            self.next_id.fetch_add(1, Ordering::SeqCst)
284        ));
285        self.add_with_id(id, item)
286    }
287
288    /// Add an item with a user-supplied id.
289    ///
290    /// The item's [`ExecutableItem::task_id`] override takes precedence over
291    /// the caller-supplied `id`, which itself takes precedence over the
292    /// auto-generated id assigned by [`Executor::add`].
293    pub fn add_with_id(
294        &mut self,
295        id: impl Into<TaskId>,
296        mut item: impl ExecutableItem,
297    ) -> Result<TaskId, ExecutorError> {
298        let id_arg: TaskId = id.into();
299        // The item's `task_id()` override wins over the user-supplied id.
300        let id = item.task_id().map_or(id_arg, TaskId::new);
301        let mut declarer = TriggerDeclarer::new_internal();
302        item.declare_triggers(&mut declarer)?;
303        let budget = declarer.budget;
304        let decls = declarer.into_decls();
305
306        // REQ_0268: reject ill-defined trigger shapes (cyclic+event, zero
307        // period) before the task joins the table — the natural validation
308        // point, where the decls are first available, for every DispatchMode.
309        validate_decls(&id, &decls)?;
310
311        let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
312        let app_id = item_box.app_id();
313        let app_inst = item_box.app_instance_id();
314        // SAFETY: the raw pointer points into the heap allocation of
315        // `item_box`. `Box` keeps that allocation at a stable address even
316        // when the `Box` itself is moved (e.g. when `self.tasks` grows),
317        // so the pointer remains valid for the lifetime of the
318        // `TaskEntry`. See SendItemPtr safety doc for the rest of the
319        // discipline (barrier() pairs with worker access).
320        #[allow(unsafe_code)]
321        let item_ptr =
322            SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
323
324        // Allocate the per-task atomics now so the dispatch closure
325        // and the `TaskEntry` share the same `Arc` storage. The task
326        // will occupy `self.tasks.len()` after the push below — capture
327        // that index up front for `task_idx_u32`. Bounded workspace, so
328        // the `as u32` cast is sound; explicit allow keeps clippy quiet.
329        let task_fault = Arc::new(FaultAtomic::new());
330        let overrun_count = Arc::new(AtomicU64::new(0));
331        let scan_period = scan_period_from_decls(&decls);
332        let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
333        #[allow(clippy::cast_possible_truncation)]
334        let task_idx_u32 = self.tasks.len() as u32;
335        let fault_ctx = FaultDispatchCtx {
336            task_budget: budget,
337            task_fault: Arc::clone(&task_fault),
338            overrun_count: Arc::clone(&overrun_count),
339            iteration_budget: self.iteration_budget,
340            exec_fault: Arc::clone(&self.exec_fault),
341            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
342            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
343            task_idx_u32,
344            exec_start: Arc::clone(&self.start_time),
345            observer: Arc::clone(&self.observer),
346        };
347
348        let job = build_single_job(
349            id.clone(),
350            self.stoppable.clone(),
351            Arc::clone(&self.observer),
352            Arc::clone(&self.monitor),
353            Arc::clone(&self.iter_err),
354            app_id,
355            app_inst,
356            item_ptr,
357            fault_ctx,
358            Arc::clone(&last_took_ns),
359            Arc::clone(&self.clock),
360        );
361
362        self.tasks.push(TaskEntry {
363            id: id.clone(),
364            kind: TaskKind::Single(item_box),
365            decls,
366            job: Some(job),
367            budget,
368            fault: task_fault,
369            overrun_count,
370            handler_job: None,
371            scan_period,
372            last_took_ns: Arc::clone(&last_took_ns),
373            last_dispatch: None,
374            grid_slot: 0,
375            pending_cycle: None,
376        });
377        self.cycle_stats
378            .push(TaskCycleStats::new(self.stats_window));
379        Ok(id)
380    }
381
382    /// Register an item plus a fault-handler item.
383    ///
384    /// The main item is registered through the canonical [`add`](Self::add)
385    /// path. The handler's [`declare_triggers`](ExecutableItem::declare_triggers)
386    /// is called (so handlers that internally rely on the declarer being
387    /// invoked observe the call) but its returned trigger list is
388    /// **ignored** — the handler dispatches on the main item's triggers
389    /// while the task is in `Faulted` state and runs in place of the main
390    /// item's `execute()`. The pre-built handler dispatch closure is
391    /// stashed on the same task entry as the main item's `job`,
392    /// satisfying `REQ_0072`.
393    ///
394    /// # Errors
395    ///
396    /// Propagates any error from registering the main item via `add`, or
397    /// from the handler's `declare_triggers` call.
398    ///
399    /// # Panics
400    ///
401    /// Panics if the task entry just inserted by [`add`](Self::add) cannot
402    /// be located in `self.tasks` — this is unreachable by construction
403    /// and indicates a logic bug.
404    pub fn add_with_fault_handler<I, H>(
405        &mut self,
406        main: I,
407        handler: H,
408    ) -> Result<TaskId, ExecutorError>
409    where
410        I: ExecutableItem,
411        H: ExecutableItem,
412    {
413        let task_id = self.add(main)?;
414
415        // Drain the handler's trigger declarations — they are ignored by
416        // design (the handler runs on the main item's triggers).
417        let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
418        let mut throwaway = TriggerDeclarer::new_internal();
419        handler_box.declare_triggers(&mut throwaway)?;
420        drop(throwaway);
421
422        let app_id = handler_box.app_id();
423        let app_inst = handler_box.app_instance_id();
424
425        // Locate the task we just added so we can share its per-task
426        // atomics with the handler's `FaultDispatchCtx`. The handler
427        // runs on the same `TaskEntry`; per §4.6 invariant 5, a handler
428        // breach increments `overrun_count` and keeps state `Faulted`
429        // without re-firing the observer.
430        let task_idx = self
431            .tasks
432            .iter()
433            .position(|t| t.id == task_id)
434            .expect("just added; must exist");
435        let task = &self.tasks[task_idx];
436        #[allow(clippy::cast_possible_truncation)]
437        let task_idx_u32 = task_idx as u32;
438        let handler_fault_ctx = FaultDispatchCtx {
439            task_budget: task.budget,
440            task_fault: Arc::clone(&task.fault),
441            overrun_count: Arc::clone(&task.overrun_count),
442            iteration_budget: self.iteration_budget,
443            exec_fault: Arc::clone(&self.exec_fault),
444            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
445            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
446            task_idx_u32,
447            exec_start: Arc::clone(&self.start_time),
448            observer: Arc::clone(&self.observer),
449        };
450
451        let handler_closure = build_handler_job(
452            task_id.clone(),
453            self.stoppable.clone(),
454            Arc::clone(&self.observer),
455            Arc::clone(&self.monitor),
456            Arc::clone(&self.iter_err),
457            app_id,
458            app_inst,
459            handler_box,
460            handler_fault_ctx,
461        );
462
463        self.tasks[task_idx].handler_job = Some(handler_closure);
464
465        Ok(task_id)
466    }
467
468    /// Clear a per-task fault. Returns the previous `FaultState`.
469    /// Fires `Observer::on_task_clear` if the state changed from
470    /// `Faulted` to `Running`. `REQ_0070`.
471    ///
472    /// # Errors
473    ///
474    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
475    /// * [`ExecutorError::TaskNotFaulted`] if `task` is already `Running`.
476    pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
477        let entry = self
478            .tasks
479            .iter()
480            .find(|t| t.id == task)
481            .ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
482        let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
483        let prev = entry.fault.swap(FaultState::Running, budget_ms);
484        match prev {
485            FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
486            FaultState::Faulted { .. } => {
487                self.observer.on_task_clear(task);
488                Ok(prev)
489            }
490        }
491    }
492
493    /// Clear the executor-wide fault and cascade-clear every task whose
494    /// state is `Faulted{ExecutorFaulted}`. Tasks whose state is
495    /// `Faulted{BudgetExceeded}` are NOT cleared (their own contract
496    /// breach is independent). Fires `Observer::on_executor_clear` and
497    /// one `Observer::on_task_clear` per cascade-cleared task.
498    /// `REQ_0071`.
499    ///
500    /// # Errors
501    ///
502    /// * [`ExecutorError::ExecutorNotFaulted`] if the executor is `Running`.
503    pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
504        let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
505        let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
506        let prev = self
507            .exec_fault
508            .swap(ExecutorFaultState::Running, task_idx, budget_ms);
509        match prev {
510            ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
511            ExecutorFaultState::Faulted { .. } => {
512                // Cascade-clear tasks whose reason is ExecutorFaulted.
513                for entry in &self.tasks {
514                    let task_budget_ms =
515                        entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
516                    if let FaultState::Faulted {
517                        reason: FaultReason::ExecutorFaulted,
518                        ..
519                    } = entry.fault.load(task_budget_ms)
520                    {
521                        let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
522                        self.observer.on_task_clear(entry.id.clone());
523                    }
524                }
525                self.observer.on_executor_clear();
526                Ok(prev)
527            }
528        }
529    }
530
531    /// Return the per-task overrun counter — number of times the task's
532    /// `execute()` exceeded its budget over the executor's lifetime.
533    /// Monotonic; not reset by `clear_task_fault`. `REQ_0102`.
534    ///
535    /// # Errors
536    ///
537    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
538    pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
539        self.tasks
540            .iter()
541            .find(|t| t.id == task)
542            .map(|t| t.overrun_count.load(Ordering::Acquire))
543            .ok_or_else(|| ExecutorError::TaskNotFound(task))
544    }
545
546    /// Return a snapshot of the per-task `FaultState`. `REQ_0073` (pull path).
547    ///
548    /// # Errors
549    ///
550    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
551    pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
552        self.tasks
553            .iter()
554            .find(|t| t.id == task)
555            .map(|t| {
556                let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
557                t.fault.load(budget_ms)
558            })
559            .ok_or_else(|| ExecutorError::TaskNotFound(task))
560    }
561
562    /// Return a snapshot of the executor-wide `ExecutorFaultState`.
563    /// `REQ_0073` (pull path).
564    #[must_use]
565    pub fn executor_fault_state(&self) -> ExecutorFaultState {
566        let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
567        let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
568        self.exec_fault.load(task_idx, budget_ms)
569    }
570
571    /// Add a sequential chain of items. Only the head item's
572    /// `declare_triggers` is consulted; non-head triggers are ignored with a
573    /// tracing warn.
574    pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
575    where
576        I: ExecutableItem,
577        C: IntoIterator<Item = I>,
578    {
579        let id = TaskId::new(format!(
580            "chain-{}",
581            self.next_id.fetch_add(1, Ordering::SeqCst)
582        ));
583        let boxed: Vec<Box<dyn ExecutableItem>> = items
584            .into_iter()
585            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
586            .collect();
587        self.add_chain_with_id_boxed(id, boxed)
588    }
589
590    /// Like [`Executor::add_chain`] but with a user-supplied id.
591    pub fn add_chain_with_id<I, C>(
592        &mut self,
593        id: impl Into<TaskId>,
594        items: C,
595    ) -> Result<TaskId, ExecutorError>
596    where
597        I: ExecutableItem,
598        C: IntoIterator<Item = I>,
599    {
600        let boxed: Vec<Box<dyn ExecutableItem>> = items
601            .into_iter()
602            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
603            .collect();
604        self.add_chain_with_id_boxed(id.into(), boxed)
605    }
606
607    fn add_chain_with_id_boxed(
608        &mut self,
609        id: TaskId,
610        mut items: Vec<Box<dyn ExecutableItem>>,
611    ) -> Result<TaskId, ExecutorError> {
612        if items.is_empty() {
613            return Err(ExecutorError::Builder(
614                "chain must contain at least one item".into(),
615            ));
616        }
617
618        // Head item's `task_id()` override wins over the user-supplied id.
619        let id = items[0].task_id().map_or(id, TaskId::new);
620
621        // Head's triggers gate the chain.
622        let mut head_declarer = TriggerDeclarer::new_internal();
623        items[0].declare_triggers(&mut head_declarer)?;
624        let decls = head_declarer.into_decls();
625
626        // REQ_0268: same trigger-shape validation as the single-item path,
627        // applied to the head item's decls (which gate the whole chain).
628        validate_decls(&id, &decls)?;
629
630        // Warn if non-head items declared triggers (those will be ignored).
631        for (i, body) in items.iter_mut().enumerate().skip(1) {
632            let mut spurious = TriggerDeclarer::new_internal();
633            let _ = body.declare_triggers(&mut spurious);
634            if !spurious.is_empty() {
635                #[cfg(feature = "tracing")]
636                tracing::warn!(
637                    target: "taktora-executor",
638                    task = %id,
639                    position = i,
640                    "non-head chain item declared triggers; they will be ignored"
641                );
642                #[cfg(not(feature = "tracing"))]
643                {
644                    let _ = i;
645                }
646            }
647        }
648
649        let mut items = items;
650        // SAFETY: pointer into the chain's `items` Vec. The Vec lives
651        // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
652        // is stable once `add_chain` returns — `self.tasks` may grow
653        // (moving the `Vec<Box<...>>` header itself), but the Vec's
654        // heap buffer is referenced via the header's data pointer and
655        // is unaffected by header moves. We never resize the chain Vec
656        // after this point. See SendChainPtr safety doc for the rest.
657        #[allow(unsafe_code)]
658        let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
659            &mut items,
660        ));
661        // NB: the pointer above is to the local `items` Vec on the
662        // stack — it's invalid after the `push` below moves items into
663        // the TaskEntry. We rederive a stable pointer after the push.
664        // (See the rebuild step below.)
665        let _ = chain_ptr;
666
667        // Pre-allocate the per-task atomics so the chain's dispatch
668        // closure can capture clones of the same `Arc`s the `TaskEntry`
669        // holds. The chain occupies `self.tasks.len()` after the push.
670        let task_fault = Arc::new(FaultAtomic::new());
671        let overrun_count = Arc::new(AtomicU64::new(0));
672        let scan_period = scan_period_from_decls(&decls);
673        let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
674        #[allow(clippy::cast_possible_truncation)]
675        let task_idx_u32 = self.tasks.len() as u32;
676
677        self.tasks.push(TaskEntry {
678            id: id.clone(),
679            kind: TaskKind::Chain(items),
680            decls,
681            job: None, // populated in the rebuild step below
682            // TODO(post-Task-10): chain budgets carried separately; for now None.
683            budget: None,
684            fault: Arc::clone(&task_fault),
685            overrun_count: Arc::clone(&overrun_count),
686            handler_job: None,
687            scan_period,
688            last_took_ns: Arc::clone(&last_took_ns),
689            last_dispatch: None,
690            grid_slot: 0,
691            pending_cycle: None,
692        });
693        self.cycle_stats
694            .push(TaskCycleStats::new(self.stats_window));
695
696        // After the push, the TaskEntry lives at a stable position in
697        // `self.tasks` for the duration of this `add_chain_with_id_boxed`
698        // call. Take a stable pointer to its chain Vec and build the
699        // dispatch closure. If `self.tasks` later grows, the Vec header
700        // inside the TaskEntry moves but the header's data pointer
701        // (which addresses the chain's heap buffer) does not — and the
702        // closure derefs that pointer per dispatch, so it re-reads the
703        // current heap address each time. Sound under the same
704        // discipline as `tasks_ptr` in dispatch_loop.
705        let task_idx = self.tasks.len() - 1;
706        let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
707        {
708            TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
709            // The push above used TaskKind::Chain, so this arm is
710            // unreachable. Mark it explicitly to satisfy `match`.
711            _ => unreachable!("just-pushed task is TaskKind::Chain"),
712        };
713        #[allow(unsafe_code)]
714        let chain_ptr = SendChainPtr::new(chain_vec_ptr);
715        let fault_ctx = FaultDispatchCtx {
716            task_budget: None, // chain budgets are intentionally None for now
717            task_fault,
718            overrun_count,
719            iteration_budget: self.iteration_budget,
720            exec_fault: Arc::clone(&self.exec_fault),
721            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
722            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
723            task_idx_u32,
724            exec_start: Arc::clone(&self.start_time),
725            observer: Arc::clone(&self.observer),
726        };
727        let job = build_chain_job(
728            id.clone(),
729            self.stoppable.clone(),
730            Arc::clone(&self.observer),
731            Arc::clone(&self.monitor),
732            Arc::clone(&self.iter_err),
733            chain_ptr,
734            fault_ctx,
735            Arc::clone(&last_took_ns),
736            Arc::clone(&self.clock),
737        );
738        self.tasks[task_idx].job = Some(job);
739        Ok(id)
740    }
741
742    /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
743    /// executor is built. Clone before calling `run()` — any clone taken at any
744    /// time will wake the `WaitSet` when `stop()` is called.
745    #[must_use]
746    pub fn stoppable(&self) -> Stoppable {
747        self.stoppable.clone()
748    }
749
750    /// Borrow the underlying iceoryx2 node (escape hatch for power users).
751    pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
752        &self.node
753    }
754
755    /// Begin building a graph. Call `.build()` on the returned builder to
756    /// register the graph as a task.
757    pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
758        ExecutorGraphBuilder {
759            executor: self,
760            builder: crate::graph::GraphBuilder::new(),
761            custom_id: None,
762        }
763    }
764}
765
766/// Builder for [`Executor`].
767pub struct ExecutorBuilder {
768    worker_threads: Option<usize>,
769    observer: Option<Arc<dyn Observer>>,
770    monitor: Option<Arc<dyn ExecutionMonitor>>,
771    worker_attrs: ThreadAttributes,
772    /// Executor-wide iteration budget (`REQ_0071`). `None` means no
773    /// executor-wide check.
774    iteration_budget: Option<Duration>,
775    /// User-supplied fatal handler. `None` → resolved to a no-op `Arc` in
776    /// `build()`.
777    fatal_handler: Option<FatalHandler>,
778    /// Sliding-window size (samples) for cycle-stats aggregation
779    /// (`REQ_0100`). `None` → resolved to `1024` in `build()`.
780    stats_window: Option<u32>,
781    /// Telemetry time source. `None` → resolved to [`SystemClock`] in
782    /// `build()`. Override with a [`MockClock`](crate::MockClock) for
783    /// deterministic timing tests.
784    clock: Option<Arc<dyn MonotonicClock>>,
785    /// Cyclic dispatch timing strategy (`REQ_0268`). Default
786    /// [`DispatchMode::Grid`](crate::DispatchMode).
787    dispatch_mode: crate::DispatchMode,
788    /// Scheduling clock for the absolute grid. `None` → resolved to
789    /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock) in `build()`.
790    cyclic_clock: Option<std::sync::Arc<dyn crate::CyclicClock>>,
791}
792
793impl Default for ExecutorBuilder {
794    fn default() -> Self {
795        Self {
796            worker_threads: None,
797            observer: None,
798            monitor: None,
799            worker_attrs: ThreadAttributes::new(),
800            iteration_budget: None,
801            fatal_handler: None,
802            stats_window: None,
803            clock: None,
804            dispatch_mode: crate::DispatchMode::default(),
805            cyclic_clock: None,
806        }
807    }
808}
809
810impl ExecutorBuilder {
811    /// Number of worker threads. `0` → inline (no pool). Default → physical
812    /// cores.
813    #[must_use]
814    pub const fn worker_threads(mut self, n: usize) -> Self {
815        self.worker_threads = Some(n);
816        self
817    }
818
819    /// Attach a lifecycle observer. If not called, a no-op observer is used.
820    #[must_use]
821    pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
822        self.observer = Some(obs);
823        self
824    }
825
826    /// Attach an execution monitor. If not called, a no-op monitor is used.
827    #[must_use]
828    pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
829        self.monitor = Some(mon);
830        self
831    }
832
833    /// Configure the executor-wide iteration budget. Any task whose
834    /// `execute()` exceeds `dur` transitions the executor to `Faulted`
835    /// (`REQ_0071`). Default: unset (no executor-wide check).
836    #[must_use]
837    pub const fn iteration_budget(mut self, dur: Duration) -> Self {
838        self.iteration_budget = Some(dur);
839        self
840    }
841
842    /// Sliding-window size (samples) for percentile / min-max / jitter /
843    /// lateness aggregation (`REQ_0100`). Default `1024`.
844    #[must_use]
845    pub const fn stats_window(mut self, samples: u32) -> Self {
846        self.stats_window = Some(samples);
847        self
848    }
849
850    /// Substitute the telemetry time source. Defaults to [`SystemClock`].
851    ///
852    /// Pass a [`MockClock`](crate::MockClock) clone to drive `took` / jitter /
853    /// lateness from scripted instants, making timing assertions exact and
854    /// independent of the host scheduler. The clock affects telemetry only —
855    /// scheduling, run-mode deadlines and fault detection always use the real
856    /// monotonic clock.
857    #[must_use]
858    pub fn clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
859        self.clock = Some(clock);
860        self
861    }
862
863    /// Select cyclic dispatch timing (default `DispatchMode::Grid`). `Legacy` is
864    /// the pre-REQ_0268 `attach_interval` path, retained only until the Pi A/B.
865    #[must_use]
866    pub const fn dispatch_mode(mut self, mode: crate::DispatchMode) -> Self {
867        self.dispatch_mode = mode;
868        self
869    }
870
871    /// Override the scheduling clock (default `MonotonicCyclicClock`). Distinct
872    /// from `clock` (telemetry) — see `CyclicClock`.
873    #[must_use]
874    pub fn cyclic_clock(mut self, clock: std::sync::Arc<dyn crate::CyclicClock>) -> Self {
875        self.cyclic_clock = Some(clock);
876        self
877    }
878
879    /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
880    /// for worker threads. Has no effect when `worker_threads` is `0` (inline
881    /// mode). Requires the `thread_attrs` feature for non-default settings.
882    #[must_use]
883    #[allow(clippy::missing_const_for_fn)]
884    pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
885        self.worker_attrs = attrs;
886        self
887    }
888
889    /// Register a best-effort last-gasp handler invoked once on the fail-fast
890    /// path immediately before `std::process::abort()`.
891    ///
892    /// **Contract**: runs over known-unsound executor state — MUST NOT touch
893    /// executor internals; a panic inside the handler routes straight to
894    /// `abort()`.
895    ///
896    /// The handler is expected to be time-bounded (the caller's responsibility);
897    /// no runtime deadline is imposed.
898    ///
899    /// **Observer / monitor containment carve-out**: the panic containment
900    /// described in the executor documentation covers only a user item's
901    /// `execute()` call. Panics that originate in framework-invoked user
902    /// callbacks that run *outside* that inner catch — such as
903    /// [`Observer`](crate::Observer) methods (e.g. `on_app_error`,
904    /// `on_task_fault`) and [`ExecutionMonitor`](crate::ExecutionMonitor)
905    /// methods (e.g. `post_execute`) — escape to this fail-fast boundary and
906    /// cause `abort()`. Those callbacks must therefore be treated as
907    /// non-panicking by the implementor. See `REQ_0123`.
908    ///
909    /// If not called, a no-op handler is used and `abort()` is still reached
910    /// after any unrecoverable fault.
911    #[must_use]
912    pub fn on_fatal(
913        mut self,
914        handler: impl Fn(&crate::FatalContext) + Send + Sync + 'static,
915    ) -> Self {
916        self.fatal_handler = Some(Arc::new(handler));
917        self
918    }
919
920    /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
921    /// internal stop-event service so that any `Stoppable` clone (taken before
922    /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
923    ///
924    /// # Panics
925    ///
926    /// Panics if the internally-generated stop-event service name exceeds the
927    /// iceoryx2 service name length limit (this cannot happen under normal use
928    /// because the name is derived from the process id and a monotonic counter).
929    #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
930    #[track_caller]
931    pub fn build(self) -> Result<Executor, ExecutorError> {
932        let node = NodeBuilder::new()
933            .create::<ipc::Service>()
934            .map_err(ExecutorError::iceoryx2)?;
935
936        let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
937
938        // Resolve the fatal handler: use the user-supplied one or fall back to a no-op.
939        let fatal_handler: FatalHandler = self
940            .fatal_handler
941            .unwrap_or_else(|| Arc::new(|_ctx: &crate::FatalContext| {}));
942        let fatal_dispatch = Arc::new(FatalDispatch::new(fatal_handler));
943
944        let pool = Arc::new(Pool::new(
945            n_workers,
946            self.worker_attrs,
947            Arc::clone(&fatal_dispatch),
948        )?);
949
950        // Build the internal stop event service with a unique-per-process name
951        // so multiple executors in the same process don't collide.
952        let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
953        let stop_topic = format!(
954            "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
955            std::process::id()
956        );
957        let stop_event = node
958            .service_builder(&stop_topic.as_str().try_into().unwrap())
959            .event()
960            .open_or_create()
961            .map_err(ExecutorError::iceoryx2)?;
962
963        let stop_notifier = Arc::new(
964            stop_event
965                .notifier_builder()
966                .create()
967                .map_err(ExecutorError::iceoryx2)?,
968        );
969
970        // SAFETY: see module-level note; Arc<IxListener> is held here and only
971        // accessed on the executor thread.
972        let stop_listener = Arc::new(
973            stop_event
974                .listener_builder()
975                .create()
976                .map_err(ExecutorError::iceoryx2)?,
977        );
978
979        // Wire the notifier into the Stoppable so every clone is waker-aware
980        // from the moment the executor is built.
981        let stoppable = Stoppable::with_waker(stop_notifier);
982
983        let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
984
985        let monitor: Arc<dyn ExecutionMonitor> =
986            self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
987
988        let clock: Arc<dyn MonotonicClock> =
989            self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
990
991        let cyclic_clock: std::sync::Arc<dyn crate::CyclicClock> = self
992            .cyclic_clock
993            .unwrap_or_else(|| std::sync::Arc::new(crate::MonotonicCyclicClock::new()));
994
995        let exec = Executor {
996            node,
997            pool,
998            tasks: Vec::new(),
999            cycle_stats: Vec::new(),
1000            stats_window: self.stats_window.unwrap_or(1024),
1001            running: Arc::new(AtomicBool::new(false)),
1002            stoppable,
1003            next_id: AtomicU64::new(0),
1004            stop_listener,
1005            observer,
1006            monitor,
1007            iter_err: Arc::new(std::sync::Mutex::new(None)),
1008            iteration_budget: self.iteration_budget,
1009            exec_fault: Arc::new(ExecutorFaultAtomic::new()),
1010            exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
1011            exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
1012            start_time: Arc::new(OnceLock::new()),
1013            fatal_dispatch,
1014            clock,
1015            grid_epoch: Arc::new(OnceLock::new()),
1016            dispatch_mode: self.dispatch_mode,
1017            cyclic_clock,
1018        };
1019
1020        Ok(exec)
1021    }
1022}
1023
1024// ── Run loop ──────────────────────────────────────────────────────────────────
1025
1026impl Executor {
1027    /// Run the executor until [`Stoppable::stop`] is called or a task signals
1028    /// stop via [`crate::Context::stop_executor`].
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1033    ///
1034    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1035    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1036    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1037    ///
1038    /// If multiple items error in the same dispatch iteration, only the first
1039    /// is preserved; subsequent errors are discarded silently. To observe
1040    /// every error, attach an [`Observer`](crate::Observer) and read errors
1041    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1042    pub fn run(&mut self) -> Result<(), ExecutorError> {
1043        self.run_inner(RunMode::Forever)
1044    }
1045
1046    /// Run for at most `max` wall-clock duration, then return.
1047    ///
1048    /// # Errors
1049    ///
1050    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1051    ///
1052    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1053    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1054    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1055    ///
1056    /// If multiple items error in the same dispatch iteration, only the first
1057    /// is preserved; subsequent errors are discarded silently. To observe
1058    /// every error, attach an [`Observer`](crate::Observer) and read errors
1059    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1060    pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
1061        self.run_inner(RunMode::Until(Instant::now() + max))
1062    }
1063
1064    /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
1065    ///
1066    /// # Errors
1067    ///
1068    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1069    ///
1070    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1071    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1072    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1073    ///
1074    /// If multiple items error in the same dispatch iteration, only the first
1075    /// is preserved; subsequent errors are discarded silently. To observe
1076    /// every error, attach an [`Observer`](crate::Observer) and read errors
1077    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1078    pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
1079        self.run_inner(RunMode::Iterations(n))
1080    }
1081
1082    /// Run until `predicate()` returns true. Checked after each `WaitSet`
1083    /// wakeup.
1084    ///
1085    /// # Errors
1086    ///
1087    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1088    ///
1089    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1090    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1091    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1092    ///
1093    /// If multiple items error in the same dispatch iteration, only the first
1094    /// is preserved; subsequent errors are discarded silently. To observe
1095    /// every error, attach an [`Observer`](crate::Observer) and read errors
1096    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1097    pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
1098        self.run_inner(RunMode::Predicate(&mut predicate))
1099    }
1100}
1101
/// Termination condition handed to `run_inner`, one variant per public
/// `run*` entry point. `run_inner` forwards it to `dispatch_loop`, which is
/// where each variant is evaluated.
enum RunMode<'a> {
    /// No built-in limit; selected by `run()`.
    Forever,
    /// Deadline computed by `run_for` from its `max` argument.
    Until(Instant),
    /// Iteration count supplied to `run_n`.
    Iterations(usize),
    /// Caller-supplied predicate borrowed from `run_until`.
    Predicate(&'a mut dyn FnMut() -> bool),
}
1108
1109impl Executor {
1110    fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
1111        // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
1112        // remains true permanently. Calling `run()` again after a stop will return
1113        // promptly without doing any meaningful work (it blocks until the first
1114        // trigger fires, then immediately exits the dispatch loop). Task 10's
1115        // Runner accommodates this by treating an Executor as one-shot: each
1116        // Runner owns the Executor and consumes it.
1117        if self.running.swap(true, Ordering::SeqCst) {
1118            return Err(ExecutorError::AlreadyRunning);
1119        }
1120
1121        self.observer.on_executor_up();
1122        let result = self.dispatch_loop(&mut mode);
1123        match &result {
1124            Ok(()) => self.observer.on_executor_down(),
1125            Err(e) => self.observer.on_executor_error(e),
1126        }
1127
1128        self.running.store(false, Ordering::SeqCst);
1129        result
1130    }
1131
1132    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1133    #[allow(
1134        unsafe_code,
1135        clippy::too_many_lines,
1136        clippy::ref_as_ptr,
1137        clippy::borrow_as_ptr
1138    )]
1139    fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
1140        let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
1141            .create()
1142            .map_err(ExecutorError::iceoryx2)?;
1143
1144        // Keep Arc<RawListener> alive for at least as long as the WaitSet
1145        // guards — the guard borrows the listener via 'attachment lifetime.
1146        let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
1147        // Guards must outlive the run loop.
1148        let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
1149        // Maps guard index → task index.
1150        let mut attachment_to_task: Vec<usize> = Vec::new();
1151
1152        // Hoist to a local for the hot loop — one Copy-enum compare per cycle,
1153        // never a field re-read (REQ_0268).
1154        let dispatch_mode = self.dispatch_mode;
1155
1156        // Cyclic tasks are dispatched by the master timer + GridTimer (REQ_0268),
1157        // not attached as individual WaitSet triggers. Cross-platform: only the
1158        // wake source differs (Task 3).
1159        let mut cyclic_task_indices: Vec<usize> = Vec::new();
1160        let mut cyclic_periods: Vec<u64> = Vec::new();
1161        build_attachments(
1162            &waitset,
1163            &self.tasks,
1164            dispatch_mode,
1165            &mut listener_storage,
1166            &mut guards,
1167            &mut attachment_to_task,
1168            &mut cyclic_task_indices,
1169            &mut cyclic_periods,
1170        )?;
1171        // `cyclic_periods` is cloned, not moved, because Task 3 reads it again to
1172        // arm the single master timerfd; on non-Linux it is unused after this.
1173        let mut grid =
1174            crate::grid::GridTimer::new(self.cyclic_clock.now_nanos(), cyclic_periods.clone());
1175        let mut due_cyclic: Vec<usize> = Vec::new();
1176
1177        // Master cyclic timer (REQ_0268, Linux). ONE timerfd armed at the base
1178        // period (gcd of cyclic periods) drives the absolute grid; GridTimer
1179        // decides which tasks are due each tick. Declared above its own guard so
1180        // it drops AFTER the guard (detach before close → no EBADF). Must be
1181        // declared here, after `build_attachments` has filled `cyclic_periods`.
1182        #[cfg(target_os = "linux")]
1183        let master_timer: Option<crate::timerfd::TimerFd> = {
1184            let base = crate::grid::base_period(&cyclic_periods);
1185            if base == 0 {
1186                None
1187            } else {
1188                Some(
1189                    crate::timerfd::TimerFd::new(std::time::Duration::from_nanos(base)).map_err(
1190                        |e| {
1191                            ExecutorError::DeclareTriggers(format!(
1192                                "failed to arm master timerfd: {e}"
1193                            ))
1194                        },
1195                    )?,
1196                )
1197            }
1198        };
1199
1200        // Attach the master timer as a wake-only notification, held separately
1201        // (like the stop listener) so `process_attachment` never maps it to a
1202        // task. `_master_timer_guard` is declared immediately after `master_timer`
1203        // so on scope exit it drops FIRST — detaching the fd from the WaitSet's
1204        // epoll set — and `master_timer` drops SECOND, closing the fd. That
1205        // ordering is what prevents iceoryx2's `EPOLL_CTL_DEL` from hitting a
1206        // closed fd (EBADF). `master_timer`'s fd is referenced ONLY by this guard
1207        // (independent of `guards`/`listener_storage`, which own other fds), so
1208        // its drop position relative to those Vecs is immaterial.
1209        #[cfg(target_os = "linux")]
1210        #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1211        let _master_timer_guard = match &master_timer {
1212            // SAFETY: `master_timer` is a stack local that outlives this guard
1213            // (declared above it); the cast erases the borrow lifetime to the
1214            // attachment lifetime, sound by the same discipline as the stop
1215            // listener. Dropped before `master_timer` closes the fd.
1216            Some(tf) => Some(
1217                waitset
1218                    .attach_notification(unsafe { &*(tf as *const crate::timerfd::TimerFd) })
1219                    .map_err(ExecutorError::iceoryx2)?,
1220            ),
1221            None => None,
1222        };
1223
1224        // Attach the internal stop listener so the WaitSet wakes when
1225        // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
1226        // struct which is valid for the lifetime of dispatch_loop. We use the
1227        // same raw-pointer-cast pattern as user listeners above.
1228        //
1229        // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
1230        // exclusively borrowed for the duration of `run_inner` (which calls
1231        // `dispatch_loop`). The listener is not freed while the guard is alive
1232        // because the Arc keeps it alive and `self` outlives this function.
1233        let stop_listener_ref: &IxListener<ipc::Service> =
1234            unsafe { &*(self.stop_listener.as_ref() as *const _) };
1235        let _stop_guard = waitset
1236            .attach_notification(stop_listener_ref)
1237            .map_err(ExecutorError::iceoryx2)?;
1238
1239        let iterations_done = AtomicUsize::new(0);
1240        let stop_flag = self.stoppable.clone();
1241
1242        loop {
1243            // Reset the pre-allocated per-iteration error slot (REQ_0060):
1244            // the slot is owned by `self.iter_err`, allocated once at build
1245            // time. Pool worker closures obtain a refcount-only clone of
1246            // the `Arc`; the slot itself is reused across iterations.
1247            #[allow(clippy::unwrap_used)]
1248            // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1249            let mut iter_err_guard = self.iter_err.lock().unwrap();
1250            *iter_err_guard = None;
1251            drop(iter_err_guard);
1252
1253            // SAFETY: we capture &mut self.tasks via a raw pointer because
1254            // wait_and_process expects FnMut and Rust can't see the closure
1255            // outlives `self`. The discipline that makes this sound:
1256            //   1. The closure body on the executor thread is the *only* code that
1257            //      reads `tasks_ptr`. The pool jobs it submits hold borrowed
1258            //      `*mut dyn ExecutableItem` slices into individual TaskEntries,
1259            //      not into the Vec itself, so they don't race with the Vec.
1260            //   2. `pool.barrier()` at the end of this callback ensures every
1261            //      submitted pool job has completed (and dropped its raw pointer)
1262            //      before the callback returns. The next iteration of the WaitSet
1263            //      loop is therefore the sole user of `tasks_ptr` again.
1264            //   3. The Vec is never resized inside this loop (no `push` / `remove`
1265            //      after dispatch starts), so the underlying buffer addresses are
1266            //      stable for the lifetime of `dispatch_loop`.
1267            let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
1268            // Take the cycle_stats raw pointer before borrowing `observer`, so
1269            // the &mut borrow is released first — same discipline as tasks_ptr.
1270            let cycle_stats_ptr = &mut self.cycle_stats as *mut Vec<TaskCycleStats>;
1271            let observer = &self.observer;
1272            let pool = &self.pool;
1273            // Refcount-only clone of the pre-allocated error slot. Pool jobs
1274            // need a `'static` handle, and an `Arc::clone` does not allocate.
1275            // The Single/Chain paths use the closure baked into `task.job`,
1276            // which already captured stable Arc clones at `add`-time; the
1277            // Graph path uses closures pre-built by `prepare_dispatch`. Only
1278            // the error-aggregation logic on the WaitSet thread still needs
1279            // the slot here.
1280            let iter_err_inner = Arc::clone(&self.iter_err);
1281            // Raw pointer to the stop listener for draining inside the callback.
1282            // SAFETY: same as stop_listener_ref above — the Arc is alive for
1283            // the lifetime of dispatch_loop.
1284            let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
1285            // Raw pointer to the executor-wide fault state. Same safety
1286            // discipline as `tasks_ptr`: `Executor` is alive for the
1287            // duration of `dispatch_loop`; the WaitSet callback is the
1288            // only reader. REQ_0071. `self.exec_fault` is
1289            // `Arc<ExecutorFaultAtomic>` — we deref once to obtain a
1290            // pointer to the inner `ExecutorFaultAtomic`.
1291            let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
1292            // Raw pointer to the executor start time. Used by the lazy
1293            // cascade below to compute `since_ms` on task transitions
1294            // triggered by an executor-wide fault.
1295            let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;
1296            // Telemetry clock + lateness grid epoch. Same lifetime/aliasing
1297            // discipline as the pointers above: the Executor outlives the
1298            // dispatch loop and the WaitSet callback is the sole reader.
1299            let clock = &self.clock;
1300            let grid_epoch_ptr = &*self.grid_epoch as *const OnceLock<u64>;
1301
1302            // Wrap the per-iteration dispatch body in the framework panic
1303            // boundary. A panic escaping here is *infrastructure* (the WaitSet
1304            // drive, pool submission/barrier, or dispatch wiring) — not a user
1305            // item panic, which is already caught and faulted inside
1306            // `run_item_catch_unwind`. On such a panic `guard_or_fatal` runs the
1307            // user fatal handler then aborts in production. Under a test
1308            // terminal it returns `None`, in which case we must NOT keep
1309            // iterating over possibly-corrupt executor state, so we break out.
1310            let Some(cb_result) =
1311                guard_or_fatal(&self.fatal_dispatch, FatalSite::ExecutorRunLoop, || {
1312                    // Bundle the per-iteration captures into a single context the
1313                    // WaitSet callback delegates to. Keeping the closure a thin
1314                    // adapter over `DispatchPass::process_attachment` keeps the
1315                    // dispatch logic in named, individually-measurable functions.
1316                    let mut pass = DispatchPass {
1317                        guards: &guards,
1318                        attachment_to_task: &attachment_to_task,
1319                        tasks_ptr,
1320                        cycle_stats_ptr,
1321                        observer,
1322                        exec_fault_ptr,
1323                        exec_start_ptr,
1324                        clock,
1325                        grid_epoch_ptr,
1326                        stop_listener_ptr,
1327                        pool,
1328                        iter_err: &iter_err_inner,
1329                    };
1330
1331                    // Linux: block on fds — the master timerfd wakes us on the
1332                    // absolute grid. Non-Linux dev: bound the wait by the earliest
1333                    // pending grid target so the post-wait pass can dispatch.
1334                    #[cfg(target_os = "linux")]
1335                    let timeout = std::time::Duration::MAX;
1336                    #[cfg(not(target_os = "linux"))]
1337                    let timeout = match dispatch_mode {
1338                        crate::DispatchMode::Grid => {
1339                            grid.next_timeout(self.cyclic_clock.now_nanos())
1340                        }
1341                        crate::DispatchMode::Legacy => std::time::Duration::MAX,
1342                    };
1343                    waitset.wait_and_process_once_with_timeout(
1344                        |attachment_id: WaitSetAttachmentId<ipc::Service>| {
1345                            pass.process_attachment(&attachment_id)
1346                        },
1347                        timeout,
1348                    )
1349                })
1350            else {
1351                // Only reachable under a test terminal (production aborts in
1352                // `fire`). Bail out of the run loop rather than continuing over
1353                // possibly-corrupt executor state.
1354                //
1355                // Unreachable in production: the production terminal aborts
1356                // before returning, so this branch exists solely so a
1357                // `#[cfg(test)]` recording terminal can unwind the loop.
1358                // Consequently, silently discarding any pending `iter_err`
1359                // here is immaterial to production behavior.
1360                break Ok(());
1361            };
1362
1363            // Did the master timer tick this wake? Linux: drain it (clears epoll
1364            // readiness; >0 overruns means the absolute grid advanced). Non-Linux:
1365            // the self-computed timeout drove the wake, so always consult the grid
1366            // (take_due self-gates per task on `now >= next`). REQ_0268.
1367            #[cfg(target_os = "linux")]
1368            let ticked = master_timer.as_ref().is_some_and(|tf| tf.drain() > 0);
1369            #[cfg(not(target_os = "linux"))]
1370            let ticked = true;
1371
1372            // Post-wait master-grid pass (Grid mode). `run_grid_cyclic_pass`
1373            // self-gates on `ticked` / stop-wake / mode / non-empty, then
1374            // dispatches EVERY due cyclic task atomically this tick (PLC
1375            // semantics). `cpass` is a side-effect-free bundle of borrows, so
1376            // building it unconditionally is free; the gate lives in the helper
1377            // to keep `dispatch_loop` within the complexity budget. REQ_0268.
1378            let cpass = DispatchPass {
1379                guards: &guards,
1380                attachment_to_task: &attachment_to_task,
1381                tasks_ptr,
1382                cycle_stats_ptr,
1383                observer,
1384                exec_fault_ptr,
1385                exec_start_ptr,
1386                clock,
1387                grid_epoch_ptr,
1388                stop_listener_ptr,
1389                pool,
1390                iter_err: &iter_err_inner,
1391            };
1392            run_grid_cyclic_pass(
1393                cpass,
1394                ticked,
1395                dispatch_mode,
1396                &stop_flag,
1397                cb_result,
1398                &mut grid,
1399                self.cyclic_clock.now_nanos(),
1400                &cyclic_task_indices,
1401                &mut due_cyclic,
1402            );
1403
1404            // Funnel the post-callback decision (interrupt / item error /
1405            // stop request / run-mode termination) through one helper that
1406            // yields a single control value, so the loop has exactly one exit.
1407            match self.after_callback(cb_result, mode, &iterations_done, &stop_flag) {
1408                IterOutcome::Continue => {}
1409                IterOutcome::Done => break Ok(()),
1410                IterOutcome::Failed(err) => break Err(err),
1411            }
1412        }
1413    }
1414
1415    /// Evaluates the post-callback termination conditions for one dispatch
1416    /// iteration and reports whether the loop should continue, stop, or fail.
1417    ///
1418    /// Order of precedence matches the original inline checks: `WaitSet`
1419    /// errors, then SIGINT/SIGTERM, then a captured item error, then a stop
1420    /// request, then the active [`RunMode`] limit.
1421    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1422    fn after_callback(
1423        &self,
1424        cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1425        mode: &mut RunMode<'_>,
1426        iterations_done: &AtomicUsize,
1427        stop_flag: &Stoppable,
1428    ) -> IterOutcome {
1429        let cb_result = match cb_result.map_err(ExecutorError::iceoryx2) {
1430            Ok(r) => r,
1431            Err(e) => return IterOutcome::Failed(e),
1432        };
1433
1434        // iceoryx2's WaitSet catches SIGINT/SIGTERM internally; honor that
1435        // here for a clean exit.
1436        if matches!(
1437            cb_result,
1438            WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
1439        ) {
1440            return IterOutcome::Done;
1441        }
1442
1443        // Extract the error before dropping the MutexGuard — avoids holding the
1444        // lock across the return (clippy::significant_drop_in_scrutinee).
1445        #[allow(clippy::unwrap_used)]
1446        // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1447        let maybe_err = self.iter_err.lock().unwrap().take();
1448        if let Some(err) = maybe_err {
1449            return IterOutcome::Failed(err);
1450        }
1451        if stop_flag.is_stopped() {
1452            return IterOutcome::Done;
1453        }
1454
1455        iterations_done.fetch_add(1, Ordering::SeqCst);
1456        let reached_limit = match mode {
1457            RunMode::Forever => false,
1458            RunMode::Iterations(n) => iterations_done.load(Ordering::SeqCst) >= *n,
1459            RunMode::Until(deadline) => Instant::now() >= *deadline,
1460            RunMode::Predicate(p) => (p)(),
1461        };
1462        if reached_limit {
1463            IterOutcome::Done
1464        } else {
1465            IterOutcome::Continue
1466        }
1467    }
1468}
1469
1470/// Outcome of one `dispatch_loop` iteration's post-callback evaluation.
1471enum IterOutcome {
1472    /// Run another iteration.
1473    Continue,
1474    /// Terminate the loop successfully.
1475    Done,
1476    /// Terminate the loop with the given error.
1477    Failed(ExecutorError),
1478}
1479
1480/// Post-wait absolute-grid pass (Grid mode only, `REQ_0268` / `ADR_0100`).
1481///
1482/// The `WaitSet` callback handles event/fd tasks; cyclic tasks are timed here
1483/// off the scheduling clock. `pass` mirrors the callback's `DispatchPass`
1484/// exactly — same borrows and raw pointers, same single-writer WaitSet-thread
1485/// discipline — and the callback is already dropped (its borrows freed) by the
1486/// time this runs. We poll `grid` for due cyclic slots, dispatch each due task,
1487/// and fold their telemetry through the SHARED [`DispatchPass::barrier_and_record`]
1488/// helper. This is a SEPARATE barrier phase from the callback's: each phase
1489/// barriers and folds only its own `pending_cycle` stashes, so cyclic tasks
1490/// record exactly once, identically to event tasks. We do NOT call
1491/// `record_cycle_for` directly here.
1492///
1493/// Self-gates and returns early (no dispatch, no record) unless this wake should
1494/// run the grid: the master timer ticked (`ticked`), we are in `Grid` mode, it is
1495/// not a stop wake, and there is at least one cyclic task with something due.
1496///
1497/// **Stop-wake suppression (`REQ_0268`)**: a `stop()` (or a SIGINT/SIGTERM
1498/// `cb_result`) must emit no spurious cyclic cycle — Legacy dispatches none on a
1499/// stop wake, so the grid path matches, or a `stop()` would emit one extra cycle
1500/// observation and desync the `FEAT_0038` `cycle_index` join key. Termination
1501/// itself is still decided by `after_callback`; this only suppresses the side
1502/// effects.
1503#[allow(clippy::too_many_arguments)]
1504fn run_grid_cyclic_pass(
1505    mut pass: DispatchPass<'_, '_, '_>,
1506    ticked: bool,
1507    dispatch_mode: crate::DispatchMode,
1508    stop_flag: &Stoppable,
1509    cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1510    grid: &mut crate::grid::GridTimer,
1511    now_nanos: u64,
1512    cyclic_task_indices: &[usize],
1513    due_cyclic: &mut Vec<usize>,
1514) {
1515    let stopping = stop_flag.is_stopped()
1516        || matches!(
1517            cb_result,
1518            Ok(WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest)
1519        );
1520    if !ticked
1521        || stopping
1522        || dispatch_mode != crate::DispatchMode::Grid
1523        || cyclic_task_indices.is_empty()
1524    {
1525        return;
1526    }
1527    grid.take_due(now_nanos, due_cyclic);
1528    if due_cyclic.is_empty() {
1529        return;
1530    }
1531    for slot in due_cyclic.iter() {
1532        pass.dispatch_task(cyclic_task_indices[*slot]);
1533    }
1534    pass.barrier_and_record();
1535}
1536
1537/// Build every `WaitSet` attachment for the task table (`REQ_0268`). In `Grid`
1538/// mode, `TriggerDecl::Interval` cyclic tasks are only *collected* into
1539/// `cyclic_task_indices` / `cyclic_periods` — they are NOT attached as
1540/// individual `WaitSet` triggers. The master timer + `GridTimer` owns their
1541/// wakeups (cross-platform; wake-source wiring is done in the caller).
1542/// Every other decl (and every decl in `Legacy` mode, including `Interval`
1543/// via `attach_interval`) is attached normally. Extracted from `dispatch_loop`
1544/// to keep that function within the cyclomatic-complexity budget.
1545#[allow(clippy::too_many_arguments)]
1546fn build_attachments<'w>(
1547    waitset: &'w WaitSet<ipc::Service>,
1548    tasks: &[TaskEntry],
1549    dispatch_mode: crate::DispatchMode,
1550    listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1551    guards: &mut Vec<WaitSetGuard<'w, 'w, ipc::Service>>,
1552    attachment_to_task: &mut Vec<usize>,
1553    cyclic_task_indices: &mut Vec<usize>,
1554    cyclic_periods: &mut Vec<u64>,
1555) -> Result<(), ExecutorError> {
1556    for (task_idx, task) in tasks.iter().enumerate() {
1557        for decl in &task.decls {
1558            if dispatch_mode == crate::DispatchMode::Grid {
1559                if let TriggerDecl::Interval(d) = decl {
1560                    // Grid mode owns cyclic timing via the master timer + GridTimer;
1561                    // these decls are NOT attached as individual WaitSet triggers.
1562                    cyclic_task_indices.push(task_idx);
1563                    cyclic_periods.push(u64::try_from(d.as_nanos()).unwrap_or(u64::MAX));
1564                    continue;
1565                }
1566            }
1567            let guard = attach_trigger_decl(waitset, listener_storage, decl)?;
1568            guards.push(guard);
1569            attachment_to_task.push(task_idx);
1570        }
1571    }
1572    Ok(())
1573}
1574
1575/// Attaches a single [`TriggerDecl`] to `waitset`, returning the resulting
1576/// guard.
1577///
1578/// Listener-backed declarations (`Subscriber`, `Deadline`, `RawListener`)
1579/// clone the listener `Arc` into `listener_storage` to extend its lifetime to
1580/// the surrounding `dispatch_loop` scope; `Interval` attaches a bare timer.
1581///
1582/// # Safety
1583///
1584/// The returned guard borrows the listener via a raw-pointer cast that erases
1585/// its lifetime. Soundness relies on the caller keeping `listener_storage` (and
1586/// `waitset`) alive for at least as long as the guard, and dropping the guards
1587/// before `listener_storage` — exactly the discipline `dispatch_loop` follows.
1588#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1589fn attach_trigger_decl<'w>(
1590    waitset: &'w WaitSet<ipc::Service>,
1591    listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1592    decl: &TriggerDecl,
1593) -> Result<WaitSetGuard<'w, 'w, ipc::Service>, ExecutorError> {
1594    // Clone the listener Arc and obtain a lifetime-erased reference. SAFETY:
1595    // both `listener_storage` and `waitset` are stack-local in `dispatch_loop`
1596    // and dropped together at its end; guards are dropped before
1597    // `listener_storage`.
1598    let mut listener_ref = |listener: &Arc<crate::trigger::RawListener>| {
1599        listener_storage.push(Arc::clone(listener));
1600        let l_ref = listener_storage.last().unwrap().as_ref();
1601        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
1602        l_ref
1603    };
1604
1605    let guard = match decl {
1606        TriggerDecl::Subscriber { listener } | TriggerDecl::RawListener(listener) => {
1607            waitset.attach_notification(listener_ref(listener))
1608        }
1609        TriggerDecl::Interval(d) => waitset.attach_interval(*d),
1610        TriggerDecl::Deadline { listener, deadline } => {
1611            waitset.attach_deadline(listener_ref(listener), *deadline)
1612        }
1613    };
1614    guard.map_err(ExecutorError::iceoryx2)
1615}
1616
1617/// Per-iteration dispatch context handed to the `WaitSet` callback.
1618///
1619/// `dispatch_loop` rebuilds one of these every iteration and the `WaitSet`
1620/// callback is a thin adapter over [`DispatchPass::process_attachment`]. All
1621/// fields are short-lived borrows / raw pointers into the `Executor` that owns
1622/// the surrounding `dispatch_loop`; their soundness is documented at each use
1623/// site in `dispatch_loop` (same single-threaded, barrier-bounded discipline).
1624struct DispatchPass<'a, 'g, 'w> {
1625    /// `WaitSet` guards, indexed in parallel with `attachment_to_task`.
1626    guards: &'a [WaitSetGuard<'g, 'w, ipc::Service>],
1627    /// Maps guard index to task index in `tasks_ptr`.
1628    attachment_to_task: &'a [usize],
1629    /// Raw pointer to `Executor::tasks`.
1630    tasks_ptr: *mut Vec<TaskEntry>,
1631    /// Raw pointer to `Executor::cycle_stats` (index-aligned with `tasks`).
1632    cycle_stats_ptr: *mut Vec<TaskCycleStats>,
1633    /// Borrow of the executor's observer for the `on_cycle_stats` push.
1634    observer: &'a Arc<dyn Observer>,
1635    /// Raw pointer to `Executor::exec_fault` inner state.
1636    exec_fault_ptr: *const ExecutorFaultAtomic,
1637    /// Raw pointer to `Executor::start_time`.
1638    exec_start_ptr: *const OnceLock<Instant>,
1639    /// Borrow of the executor's telemetry clock, read for each cycle's `pre`.
1640    clock: &'a Arc<dyn MonotonicClock>,
1641    /// Raw pointer to `Executor::grid_epoch` (lateness grid anchor, `REQ_0106`).
1642    grid_epoch_ptr: *const OnceLock<u64>,
1643    /// Raw pointer to the internal stop listener.
1644    stop_listener_ptr: *const IxListener<ipc::Service>,
1645    /// Borrow of the executor thread pool.
1646    pool: &'a Pool,
1647    /// Refcount-only handle to the per-iteration error slot.
1648    iter_err: &'a Arc<std::sync::Mutex<Option<ExecutorError>>>,
1649}
1650
1651impl DispatchPass<'_, '_, '_> {
1652    /// Dispatches a single task by index for one wakeup: takes the `&mut`
1653    /// borrow into the task table, applies the pre-dispatch fault gate, stashes
1654    /// this cycle's `pending_cycle` timestamp for the post-barrier telemetry
1655    /// fold, and submits the task's work to the pool.
1656    ///
1657    /// Shared by the `WaitSet` callback (`process_attachment`) and — per
1658    /// `REQ_0268` / `ADR_0100` — the forthcoming post-wait absolute-grid
1659    /// dispatch pass, so the per-task barrier/telemetry contract is identical
1660    /// across both call paths.
1661    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1662    #[allow(unsafe_code)]
1663    fn dispatch_task(&mut self, task_idx: usize) {
1664        // SAFETY: we are the only thread that may touch the task table
1665        // during the callback. wait_and_process_once is single-threaded
1666        // and dispatch_loop holds &mut self. The pointer is valid for the
1667        // duration of this call.
1668        let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1669
1670        // Pre-dispatch fault check (REQ_0070, REQ_0071, REQ_0072). When it
1671        // routes to a (possible) handler, normal dispatch is skipped.
1672        if self.handle_fault_routing(task) {
1673            // REQ_0107: a faulted/fault-routed scan STILL advances
1674            // cycle_index and emits on_cycle_stats, or the executor's count
1675            // desyncs from the connector's join key (FEAT_0038). took/jitter
1676            // are None (poison-safe); the index always moves. Allocation-free:
1677            // a CyclePending { Instant, bool } written onto the TaskEntry,
1678            // no heap.
1679            if task.scan_period.is_some() {
1680                task.pending_cycle = Some(CyclePending {
1681                    pre: self.clock.now_nanos(),
1682                    faulted: true,
1683                });
1684            }
1685            return;
1686        }
1687
1688        // Stash the pre-dispatch instant so the post-barrier record pass
1689        // can fold this cycle's telemetry. Allocation-free: the timestamp
1690        // lives on the TaskEntry, not in a per-wakeup Vec. `take`n in the
1691        // post-barrier loop below — guarantees exactly-once even if two
1692        // guards map to the same task. `faulted: false`: a task that faulted
1693        // last wakeup and recovered this one records the normal path (the
1694        // whole CyclePending is overwritten, so the flag can't be stale).
1695        task.pending_cycle = Some(CyclePending {
1696            pre: self.clock.now_nanos(),
1697            faulted: false,
1698        });
1699
1700        self.submit_task_job(task);
1701    }
1702
1703    /// Handles a single `WaitSet` wakeup: drains stop notifications, then
1704    /// dispatches every task whose attachment fired. Always returns
1705    /// [`CallbackProgression::Continue`]; termination is decided by the
1706    /// `stop_flag` check in `dispatch_loop` after the callback returns.
1707    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1708    #[allow(unsafe_code)]
1709    fn process_attachment(
1710        &mut self,
1711        attachment_id: &WaitSetAttachmentId<ipc::Service>,
1712    ) -> CallbackProgression {
1713        // Drain stop notifications first (no dispatch — the stop_flag check
1714        // after the callback returns handles termination).
1715        // SAFETY: stop_listener_ptr is valid for the duration of the call;
1716        // the Arc in self.stop_listener keeps it alive.
1717        let stop_l = unsafe { &*self.stop_listener_ptr };
1718        while let Ok(Some(_)) = stop_l.try_wait_one() {}
1719
1720        for i in 0..self.guards.len() {
1721            let guard = &self.guards[i];
1722            let fired =
1723                attachment_id.has_event_from(guard) || attachment_id.has_missed_deadline(guard);
1724            if !fired {
1725                continue;
1726            }
1727            let task_idx = self.attachment_to_task[i];
1728            self.dispatch_task(task_idx);
1729        }
1730
1731        self.barrier_and_record();
1732
1733        CallbackProgression::Continue
1734    }
1735
1736    /// Barrier all submitted pool jobs for this dispatch phase, then fold each
1737    /// task's stashed `pending_cycle` into recorded cycle telemetry. Shared by
1738    /// the `WaitSet` callback (event/fd tasks) and the post-wait grid pass
1739    /// (cyclic tasks, `REQ_0268`). Keyed on `pending_cycle` so it records
1740    /// exactly the tasks dispatched this phase, exactly once.
1741    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1742    #[allow(unsafe_code)]
1743    fn barrier_and_record(&mut self) {
1744        // Wait for all submitted jobs to finish before leaving the callback
1745        // scope (validates item_ptr safety contract). The barrier also makes
1746        // every worker's `last_took_ns` Release-store visible to the record
1747        // pass below.
1748        self.pool.barrier();
1749
1750        // Post-barrier telemetry fold. The source of truth for "this task was
1751        // dispatched this wakeup and owes a record" is `pending_cycle`,
1752        // set in the dispatch loop above — not the guard fired-status. Keying
1753        // solely on the stash (rather than re-querying `has_event_from`)
1754        // removes any dependency on the fired-status query being stable across
1755        // a second scan, so a dispatched cycle can never be silently
1756        // under-recorded (which would lag `cycle_index` — the desync FEAT_0038
1757        // must avoid). `take` clears the stash, guaranteeing exactly-once.
1758        // Allocation-free: iterate task indices in place.
1759        // SAFETY: same single-writer WaitSet-thread discipline as the dispatch
1760        // loop above; barrier-bounded, no in-flight pool job aliases `tasks`.
1761        let task_count = unsafe { (*self.tasks_ptr).len() };
1762        for task_idx in 0..task_count {
1763            // SAFETY: single-writer WaitSet thread; borrow released before
1764            // the record_cycle_for call (which re-derefs tasks_ptr).
1765            let pending = unsafe { (&mut *self.tasks_ptr)[task_idx].pending_cycle.take() };
1766            if let Some(CyclePending { pre, faulted }) = pending {
1767                self.record_cycle_for(task_idx, faulted, pre);
1768            }
1769        }
1770    }
1771
1772    /// Fold one scan cycle's telemetry and push it to the observer. Called
1773    /// once per fired CYCLIC attachment per wakeup. `faulted = true` (Task 10)
1774    /// means the scan was skipped/errored: `took`/`jitter`/`lateness` are
1775    /// unmeasured. Event-driven tasks (no `scan_period`) are skipped entirely
1776    /// (`REQ_0106`).
1777    #[allow(unsafe_code)]
1778    fn record_cycle_for(&mut self, task_idx: usize, faulted: bool, pre_ns: u64) {
1779        // SAFETY: single-writer WaitSet thread; same discipline as tasks_ptr.
1780        let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1781        let Some(period) = task.scan_period else {
1782            return; // event-driven: no cycle telemetry
1783        };
1784        let period_ns = u64::try_from(period.as_nanos()).unwrap_or(u64::MAX);
1785
1786        // Release/Acquire pairing with the worker store (M2): `swap` acquires
1787        // the worker's Release-store and resets the sentinel atomically.
1788        let took_raw = task.last_took_ns.swap(u64::MAX, Ordering::AcqRel);
1789        let took = if faulted || took_raw == u64::MAX {
1790            None
1791        } else {
1792            Some(took_raw)
1793        };
1794
1795        // actual_period + jitter vs the previous dispatch (REQ_0101). Always
1796        // advance `last_dispatch` (even on a faulted attempt) so the next
1797        // cycle's period is measured from this wakeup. `actual_period` is
1798        // `None` on the very first cycle (no previous timestamp); jitter is
1799        // additionally suppressed on a faulted scan (poison-safe: REQ_0107).
1800        let actual_period = task
1801            .last_dispatch
1802            .replace(pre_ns)
1803            .map(|prev| pre_ns.saturating_sub(prev));
1804        let jitter = if faulted {
1805            None
1806        } else {
1807            actual_period.map(|ap| ap.abs_diff(period_ns))
1808        };
1809
1810        // Advance the lateness grid slot (REQ_0106). The slot counts nominal
1811        // periods elapsed and is decoupled from `cycle_index`: a steady
1812        // sub-period slip rounds to exactly one slot per cycle, so drift
1813        // accumulates; a coalesced/missed wakeup (the WaitSet was starved past
1814        // one or more whole periods) advances several slots at once,
1815        // re-anchoring the grid so a transient hiccup does not permanently bias
1816        // every later cycle's lateness. First cycle (`actual_period == None`):
1817        // the slot stays at its initial 0.
1818        if let Some(ap) = actual_period {
1819            // round(ap / period) = (ap + period/2) / period, via checked_div so
1820            // a degenerate period_ns == 0 simply contributes no slot advance.
1821            if let Some(slots) = ap.saturating_add(period_ns / 2).checked_div(period_ns) {
1822                task.grid_slot = task.grid_slot.saturating_add(slots.max(1));
1823            }
1824        }
1825        let grid_slot = task.grid_slot;
1826
1827        // SAFETY: cycle_stats is index-aligned with tasks; single-writer.
1828        let stats = unsafe { &mut (&mut *self.cycle_stats_ptr)[task_idx] };
1829
1830        // Deadline lateness (REQ_0106): signed offset of the actual start
1831        // (`pre_ns`) from its nominal grid point `grid_epoch + grid_slot*period`,
1832        // where `grid_epoch` is this task set's first recorded `pre`. Positive
1833        // => started late; negative => early. Captures steady drift (jitter is
1834        // blind to a constant offset; lateness is not) while self-healing across
1835        // discrete missed wakeups via the grid-slot re-anchoring above.
1836        let lateness = if period_ns > 0 && !faulted {
1837            // SAFETY: grid_epoch_ptr derefs the Executor owning this dispatch_loop.
1838            let grid_epoch = *unsafe { &*self.grid_epoch_ptr }.get_or_init(|| pre_ns);
1839            let elapsed_ns = i64::try_from(pre_ns.saturating_sub(grid_epoch)).unwrap_or(i64::MAX);
1840            let expected_ns =
1841                i64::try_from(u128::from(grid_slot) * u128::from(period_ns)).unwrap_or(i64::MAX);
1842            Some(elapsed_ns.saturating_sub(expected_ns))
1843        } else {
1844            None
1845        };
1846
1847        let cycle_index = stats.record_cycle(took, jitter, lateness);
1848
1849        let obs = CycleObservation {
1850            cycle_index,
1851            task_id: task.id.clone(),
1852            task_index: u32::try_from(task_idx).unwrap_or(u32::MAX),
1853            faulted,
1854            period_ns,
1855            pre_ns,
1856            actual_period_ns: actual_period,
1857            jitter_ns: jitter,
1858            lateness_ns: lateness,
1859            took_ns: took,
1860        };
1861        self.observer.on_cycle_stats(&obs);
1862    }
1863
1864    /// Applies the pre-dispatch fault gate for `Single`/`Chain` tasks.
1865    ///
1866    /// Returns `true` when the task is routed to its fault handler (or
1867    /// silently skipped because no handler is registered) and normal dispatch
1868    /// must therefore be skipped. Returns `false` when normal dispatch should
1869    /// proceed. `Graph` tasks always return `false` — they use their own
1870    /// per-vertex scheduling and are out of scope for `FEAT_0018`.
1871    #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1872    fn handle_fault_routing(&self, task: &mut TaskEntry) -> bool {
1873        if !matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
1874            return false;
1875        }
1876
1877        // SAFETY: exec_fault_ptr derefs into the Executor that owns the
1878        // surrounding dispatch_loop — alive for this call's lifetime.
1879        let exec_faulted = matches!(
1880            unsafe { &*self.exec_fault_ptr }.load(0, 0),
1881            ExecutorFaultState::Faulted { .. }
1882        );
1883        let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
1884        let task_state = task.fault.load(task_budget_ms);
1885
1886        // Lazy cascade: if executor is `Faulted` and task is still `Running`,
1887        // silently transition the task to `Faulted{ExecutorFaulted}`. No
1888        // `on_task_fault` — the Observer already heard about the executor-wide
1889        // fault via `on_executor_fault` (cascade-noise invariant, FEAT_0018
1890        // §4.6).
1891        let task_faulted = if exec_faulted && matches!(task_state, FaultState::Running) {
1892            // SAFETY: exec_start_ptr derefs into the same Executor owning the
1893            // dispatch_loop. The OnceLock is wait-free.
1894            let exec_start = *unsafe { &*self.exec_start_ptr }.get_or_init(std::time::Instant::now);
1895            let since_ms = instant_to_since_ms(std::time::Instant::now(), exec_start);
1896            let _ = task.fault.swap(
1897                FaultState::Faulted {
1898                    reason: FaultReason::ExecutorFaulted,
1899                    since_ms,
1900                },
1901                task_budget_ms,
1902            );
1903            true
1904        } else {
1905            matches!(task_state, FaultState::Faulted { .. })
1906        };
1907
1908        if !(exec_faulted || task_faulted) {
1909            return false;
1910        }
1911
1912        // If a handler is registered, dispatch it. Otherwise, skip dispatch
1913        // entirely this wakeup.
1914        if let Some(handler_box) = task.handler_job.as_deref_mut() {
1915            let job_ptr: *mut (dyn FnMut() + Send) = handler_box as *mut (dyn FnMut() + Send);
1916            // SAFETY: same as the main-job dispatch below — handler_job is
1917            // owned by the TaskEntry; pool.barrier() awaits its completion
1918            // before the next callback.
1919            unsafe {
1920                self.pool
1921                    .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
1922            }
1923        }
1924        true
1925    }
1926
1927    /// Dispatches `task`'s normal (non-fault) work for one wakeup.
1928    ///
1929    /// `Single`/`Chain` tasks submit their pre-built job to the pool;
1930    /// `Graph` tasks drive one pass and capture the first item error into the
1931    /// per-iteration error slot.
    #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
    #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
    fn submit_task_job(&self, task: &mut TaskEntry) {
        // Dispatch strategy depends on the task kind: Single/Chain hand a
        // pre-built closure to the pool; Graph drives its own pass in place.
        match &mut task.kind {
            TaskKind::Single(_) | TaskKind::Chain(_) => {
                // The dispatch closure was pre-allocated at task-add time and
                // stashed on `task.job`. Submit it via `submit_borrowed` — no
                // per-iteration Box allocation. Required by REQ_0060.
                #[allow(clippy::expect_used)]
                // fail-fast: Single/Chain task.job is always Some — set at add time in build_single_job/build_chain_job and never cleared
                let job_box = task
                    .job
                    .as_deref_mut()
                    .expect("Single/Chain tasks carry a pre-built job");
                // Turn the `&mut` borrow into a raw pointer so the job can be
                // handed to the pool without tying it to the lifetime of `task`.
                let job_ptr: *mut (dyn FnMut() + Send) = job_box as *mut (dyn FnMut() + Send);
                // SAFETY: the closure lives in `task.job`, owned by
                // `self.tasks[task_idx]`; `tasks_ptr` is sound for the
                // duration of this callback. `pool.barrier()` in
                // `process_attachment` finishes the closure invocation before
                // the next iteration's callback. The WaitSet thread does not
                // touch the closure between this submit and that barrier.
                unsafe {
                    self.pool
                        .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
                }
            }
            TaskKind::Graph(graph) => {
                // Outer driver runs on the WaitSet thread; vertices run on the
                // pool. The graph holds its own pre-built per-vertex closures
                // and SPSC ready ring (REQ_0060), so dispatch is
                // allocation-free in steady state.
                let outcome = graph.run_once_borrowed(self.pool);
                // First-error-wins: only fill the per-iteration slot when it
                // is still empty, so an earlier error is never overwritten.
                if let Some(source) = outcome.error {
                    #[allow(clippy::unwrap_used)]
                    // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
                    let mut g = self.iter_err.lock().unwrap();
                    if g.is_none() {
                        *g = Some(ExecutorError::Item {
                            task_id: task.id.clone(),
                            source,
                        });
                    }
                }
                // Deliberately read and discarded.
                let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
            }
        }
    }
1979}
1980
/// Wraps a `*mut dyn ExecutableItem` so it can be moved into the `Send`
/// dispatch closure built by `build_single_job`, which is dispatched via
/// `Pool::submit_borrowed` (see that function's docs). The send is safe
/// because:
///   1. The executor guarantees at most one invocation of a given item at a
///      time (via `pool.barrier()` before the pointer is reused).
///   2. `ExecutableItem: Send`, so moving the pointee across threads is sound
///      when no aliasing exists.
#[allow(unsafe_code)]
struct SendItemPtr {
    /// Raw pointer to the wrapped item. This type never dereferences it;
    /// callers of [`SendItemPtr::get`] do, under their own SAFETY argument.
    ptr: *mut dyn ExecutableItem,
}

impl SendItemPtr {
    /// Wraps `ptr` without touching the pointee.
    fn new(ptr: *mut dyn ExecutableItem) -> Self {
        Self { ptr }
    }

    /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
    /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
    /// dispatch closure to be reusable across iterations without allocation).
    fn get(&self) -> *mut dyn ExecutableItem {
        self.ptr
    }
}

// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
// closure can borrow `&SendItemPtr` per invocation without making the
// closure itself `!Send`.
#[allow(unsafe_code)]
unsafe impl Send for SendItemPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendItemPtr {}
2012
/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
/// closure can iterate the chain's items in place without first
/// collecting them into a freshly-allocated `Vec`. The send is safe
/// for the same reason as [`SendItemPtr`] (see above): the executor
/// holds `&mut self` for the duration of `dispatch_loop`, and the
/// `pool.barrier()` at the end of each callback ensures the closure
/// has finished using this pointer before the Vec could be touched
/// from the `WaitSet` thread again. The Vec is never resized after
/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
/// allocate per iteration.
#[allow(unsafe_code)]
struct SendChainPtr {
    /// Raw pointer to the chain's item vector. This type never dereferences
    /// it; the chain dispatch closure does, under its own SAFETY argument.
    ptr: *mut Vec<Box<dyn ExecutableItem>>,
}

impl SendChainPtr {
    /// Wraps `ptr` without touching the pointee.
    fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
        Self { ptr }
    }

    /// Returns the raw pointer. Takes `&self` so an `FnMut` dispatch closure
    /// can call it on every invocation without consuming the wrapper.
    fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
        self.ptr
    }
}

// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
// borrow `&SendChainPtr` per invocation while staying `Send`.
#[allow(unsafe_code)]
unsafe impl Send for SendChainPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendChainPtr {}
2044
/// Captured state needed by a dispatch closure to perform post-execute
/// fault detection. All fields are `Arc`-shared with the owning
/// `Executor` and `TaskEntry` so the closure can read/write them
/// wait-free from any pool worker thread. `REQ_0070`, `REQ_0071`,
/// `REQ_0102`.
///
/// Consumed by `post_execute_detect_fault`, which receives it by shared
/// reference after every item execution.
struct FaultDispatchCtx {
    /// Per-task budget. `None` for chain / graph tasks (no per-task
    /// check) — the executor-wide iteration budget still applies.
    task_budget: Option<Duration>,
    /// Per-task fault state (shared with `TaskEntry::fault`).
    task_fault: Arc<FaultAtomic>,
    /// Per-task monotonic overrun counter (shared with
    /// `TaskEntry::overrun_count`). Increments on EVERY budget breach.
    overrun_count: Arc<AtomicU64>,
    /// Executor-wide iteration budget. `None` means no executor-wide
    /// check.
    iteration_budget: Option<Duration>,
    /// Executor-wide fault state (shared with `Executor::exec_fault`).
    exec_fault: Arc<ExecutorFaultAtomic>,
    /// Executor-wide offending-task index storage (shared with
    /// `Executor::exec_fault_task_idx`).
    exec_fault_task_idx: Arc<AtomicU32>,
    /// Executor-wide breached-budget storage (shared with
    /// `Executor::exec_fault_budget_ms`).
    exec_fault_budget_ms: Arc<AtomicU32>,
    /// Index of this task in the executor's task table.
    task_idx_u32: u32,
    /// Executor start time (shared with `Executor::start_time`). If still
    /// unset when a breach is detected, fault detection initialises it with
    /// the breaching execution's `started` instant.
    exec_start: Arc<OnceLock<Instant>>,
    /// Observer for `on_task_fault` / `on_executor_fault` notifications.
    observer: Arc<dyn Observer>,
}
2077
2078/// Validate a task's collected trigger declarations before it joins the task
2079/// table (`REQ_0268`). Applied at every add path — single, chain head, and
2080/// fault-handler main — at the point the `TriggerDecl`s are first available,
2081/// regardless of [`DispatchMode`] (the rejected shapes are ill-defined in any
2082/// mode; Legacy is temporary).
2083///
2084/// Rejects two shapes:
2085///
2086/// 1. **Cyclic AND event-driven** — a task carrying both an `Interval` decl and
2087///    any listener-backed decl (`Subscriber` / `Deadline` / `RawListener`). Per
2088///    `REQ_0106` a task is cyclic XOR event-driven: cyclic tasks have a
2089///    period/lateness, event-driven tasks do not. Allowing both would dispatch
2090///    and record the task twice in one wake (phase-a event + phase-b grid),
2091///    desyncing the `FEAT_0038` `cycle_index` join key (`REQ_0107`).
2092/// 2. **Zero-period interval** — an `Interval(Duration::ZERO)` busy-spins the
2093///    grid (`GridTimer::next_timeout` returns `0` every wake and `take_due`
2094///    re-fires without advancing). A zero scan period is nonsensical.
2095fn validate_decls(id: &TaskId, decls: &[crate::trigger::TriggerDecl]) -> Result<(), ExecutorError> {
2096    use crate::trigger::TriggerDecl;
2097
2098    let has_interval = decls.iter().any(|d| matches!(d, TriggerDecl::Interval(_)));
2099    let has_listener = decls.iter().any(|d| {
2100        matches!(
2101            d,
2102            TriggerDecl::Subscriber { .. }
2103                | TriggerDecl::Deadline { .. }
2104                | TriggerDecl::RawListener(_)
2105        )
2106    });
2107
2108    if has_interval && has_listener {
2109        return Err(ExecutorError::DeclareTriggers(format!(
2110            "task `{id}` declares both an interval (cyclic) and a listener \
2111             (event-driven) trigger; a task may be cyclic (interval) or \
2112             event-driven (listener) but not both — split it into two tasks"
2113        )));
2114    }
2115
2116    if decls
2117        .iter()
2118        .any(|d| matches!(d, TriggerDecl::Interval(dur) if dur.is_zero()))
2119    {
2120        return Err(ExecutorError::DeclareTriggers(format!(
2121            "task `{id}` declares a zero-duration interval; a cyclic scan \
2122             period must be strictly positive"
2123        )));
2124    }
2125
2126    Ok(())
2127}
2128
2129/// Extract the declared scan period (first `Interval` trigger) from a task's
2130/// trigger declarations, or `None` for event-driven tasks.
2131fn scan_period_from_decls(decls: &[crate::trigger::TriggerDecl]) -> Option<Duration> {
2132    decls.iter().find_map(|d| match d {
2133        crate::trigger::TriggerDecl::Interval(dur) => Some(*dur),
2134        _ => None,
2135    })
2136}
2137
/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
///
/// The returned closure is stored on `TaskEntry::job` and invoked once
/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
/// performs no allocation. The closure captures Arc clones of the
/// executor's shared state — those clones are refcount-only at build
/// time and are reused on every dispatch. Required for `REQ_0060`.
///
/// Per invocation the closure, in order: announces app start (when
/// `app_id` is set), runs the item under `catch_unwind` bracketed by the
/// monitor's `pre_execute` / `post_execute`, publishes the telemetry
/// duration to `last_took_ns`, reports any item error and app stop to the
/// observer, runs post-execute fault detection, and finally records the
/// first error into `err_slot`.
#[allow(clippy::too_many_arguments)]
fn build_single_job(
    id: TaskId,
    stop: Stoppable,
    obs: Arc<dyn Observer>,
    mon: Arc<dyn ExecutionMonitor>,
    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
    app_id: Option<u32>,
    app_inst: Option<u32>,
    item_ptr: SendItemPtr,
    fault_ctx: FaultDispatchCtx,
    last_took_ns: Arc<AtomicU64>,
    clock: Arc<dyn MonotonicClock>,
) -> Box<dyn FnMut() + Send + 'static> {
    Box::new(move || {
        // A fresh context per invocation; it only borrows captured state.
        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
        if let Some(aid) = app_id {
            obs.on_app_start(id.clone(), aid, app_inst);
        }
        let raw = item_ptr.get();
        // System-clock start: feeds the monitor and the fault-budget check.
        let started = std::time::Instant::now();
        // Telemetry `took` is measured on the injected clock (REQ_0105) so a
        // MockClock can make it exact; the real `started`/`took` below stay on
        // the system clock for the monitor and fault-budget paths.
        let tele_t0 = clock.now_nanos();
        mon.pre_execute(id.clone(), started);
        // SAFETY: barrier() pairs with this invocation; the WaitSet
        // thread does not touch the item between `submit_borrowed` and
        // the matching `barrier()`. See SendItemPtr safety doc.
        #[allow(unsafe_code)]
        let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
        let took = started.elapsed();
        // Release pairs with the WaitSet-thread Acquire (swap) in
        // `record_cycle_for` (M2). `pool.barrier()` also fences, but the
        // explicit pairing documents intent and is robust on weak-memory archs.
        // `saturating_sub` keeps the stored value at 0 rather than wrapping
        // if the injected clock ever reports a time earlier than `tele_t0`.
        last_took_ns.store(clock.now_nanos().saturating_sub(tele_t0), Ordering::Release);
        mon.post_execute(id.clone(), started, took, res.is_ok());
        if let Err(ref e) = res {
            obs.on_app_error(id.clone(), e.as_ref());
        }
        if app_id.is_some() {
            obs.on_app_stop(id.clone());
        }
        // Fault detection runs after `post_execute` and before `res` is
        // consumed by the first-error capture below.
        post_execute_detect_fault(&id, started, took, &fault_ctx);
        record_first_err(&err_slot, &id, res);
    })
}
2192
2193/// Build the per-iteration dispatch closure for a fault-handler item.
2194///
2195/// Mirrors [`build_single_job`] in every detail (same monitor /
2196/// observer / first-error capture wiring) but owns the
2197/// `Box<dyn ExecutableItem>` directly inside the closure instead of
2198/// dereferencing a raw [`SendItemPtr`]. The handler has no parallel
2199/// owner inside [`TaskEntry`] — the handler closure stored in
2200/// `handler_job` is the sole owner — so the simpler owning form is
2201/// both sound and avoids the aliasing dance the main item needs.
2202/// (Unlike [`build_single_job`], this closure does NOT update
2203/// `last_took_ns` — the handler runs in place of the main item, so the
2204/// main item's `last_took_ns` keeps its sentinel `u64::MAX` = "no
2205/// sample this cycle".)
2206/// `REQ_0072`.
2207#[allow(clippy::too_many_arguments)]
2208fn build_handler_job(
2209    id: TaskId,
2210    stop: Stoppable,
2211    obs: Arc<dyn Observer>,
2212    mon: Arc<dyn ExecutionMonitor>,
2213    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2214    app_id: Option<u32>,
2215    app_inst: Option<u32>,
2216    mut handler: Box<dyn ExecutableItem>,
2217    fault_ctx: FaultDispatchCtx,
2218) -> Box<dyn FnMut() + Send + 'static> {
2219    Box::new(move || {
2220        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2221        if let Some(aid) = app_id {
2222            obs.on_app_start(id.clone(), aid, app_inst);
2223        }
2224        let started = std::time::Instant::now();
2225        mon.pre_execute(id.clone(), started);
2226        let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
2227        let took = started.elapsed();
2228        mon.post_execute(id.clone(), started, took, res.is_ok());
2229        if let Err(ref e) = res {
2230            obs.on_app_error(id.clone(), e.as_ref());
2231        }
2232        if app_id.is_some() {
2233            obs.on_app_stop(id.clone());
2234        }
2235        // Per §4.6 invariant 5 of FEAT_0018: a handler that ALSO breaches
2236        // budget keeps the task in `Faulted` (state already `Faulted`),
2237        // `overrun_count` increments, NO new `on_task_fault` fires —
2238        // the `matches!(prev, FaultState::Running)` gate inside
2239        // `post_execute_detect_fault` enforces that.
2240        post_execute_detect_fault(&id, started, took, &fault_ctx);
2241        record_first_err(&err_slot, &id, res);
2242    })
2243}
2244
2245/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
2246#[allow(clippy::too_many_arguments)]
2247fn build_chain_job(
2248    id: TaskId,
2249    stop: Stoppable,
2250    obs: Arc<dyn Observer>,
2251    mon: Arc<dyn ExecutionMonitor>,
2252    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2253    chain_ptr: SendChainPtr,
2254    fault_ctx: FaultDispatchCtx,
2255    last_took_ns: Arc<AtomicU64>,
2256    clock: Arc<dyn MonotonicClock>,
2257) -> Box<dyn FnMut() + Send + 'static> {
2258    Box::new(move || {
2259        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2260        // Overall chain scan timer — the chain's `took` is the elapsed
2261        // telemetry-clock time from the first item's pre-execute to the last
2262        // item's completion (or early break), mirroring the single-item `took`
2263        // notion (REQ_0105). Per-item monitor timing uses each item's own
2264        // real-clock `started` below.
2265        let chain_tele_t0 = clock.now_nanos();
2266        // SAFETY: barrier() pairs with this invocation; the chain Vec
2267        // and the items it owns are not touched by the WaitSet thread
2268        // until barrier() returns. See SendChainPtr safety doc.
2269        #[allow(unsafe_code)]
2270        let chain_items = unsafe { &mut *chain_ptr.get() };
2271        for item_box in chain_items.iter_mut() {
2272            let app_id = item_box.app_id();
2273            let app_inst = item_box.app_instance_id();
2274            if let Some(aid) = app_id {
2275                obs.on_app_start(id.clone(), aid, app_inst);
2276            }
2277            let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
2278            let started = std::time::Instant::now();
2279            mon.pre_execute(id.clone(), started);
2280            #[allow(unsafe_code)]
2281            let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2282            let took = started.elapsed();
2283            mon.post_execute(id.clone(), started, took, res.is_ok());
2284            if let Err(ref e) = res {
2285                obs.on_app_error(id.clone(), e.as_ref());
2286            }
2287            if app_id.is_some() {
2288                obs.on_app_stop(id.clone());
2289            }
2290            // Per-item post-execute fault detection. `task_budget` is
2291            // `None` for chains (see `add_chain_with_id_boxed`), so the
2292            // per-task check no-ops; the executor-wide iteration-budget
2293            // check still fires per item. `REQ_0071`.
2294            post_execute_detect_fault(&id, started, took, &fault_ctx);
2295            match res {
2296                Ok(crate::ControlFlow::Continue) => {}
2297                Ok(crate::ControlFlow::StopChain) => break,
2298                Err(_) => {
2299                    record_first_err(&err_slot, &id, res);
2300                    break;
2301                }
2302            }
2303        }
2304        // Release pairs with the WaitSet-thread Acquire (swap) in
2305        // `record_cycle_for` (M2). See the Single-job store for the rationale.
2306        last_took_ns.store(
2307            clock.now_nanos().saturating_sub(chain_tele_t0),
2308            Ordering::Release,
2309        );
2310    })
2311}
2312
/// Error value standing in for a task that panicked; carries the panic
/// message so it can travel through the normal item-error path.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    /// Renders as `task panicked: <message>`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_fmt(format_args!("task panicked: {message}"))
    }
}

// Marker impl: default `source()` (none) is what callers get.
impl std::error::Error for PanickedTask {}
2323
2324/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
2325fn run_item_catch_unwind(
2326    item: &mut dyn ExecutableItem,
2327    ctx: &mut crate::context::Context<'_>,
2328) -> crate::ExecuteResult {
2329    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
2330        |payload| {
2331            let msg =
2332                panic_payload_message(&*payload).unwrap_or_else(|| "panicked task".to_string());
2333            Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
2334        },
2335    )
2336}
2337
/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
/// without depending on its private name.
///
/// Forwards both arguments unchanged and returns the inner call's result, so
/// a panic inside `item.execute` surfaces as an `Err` here as well.
pub(crate) fn run_item_catch_unwind_external(
    item: &mut dyn ExecutableItem,
    ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
    run_item_catch_unwind(item, ctx)
}
2346
2347/// Record the first error into `slot`. Subsequent errors are silently dropped.
2348fn record_first_err(
2349    slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
2350    id: &TaskId,
2351    res: crate::ExecuteResult,
2352) {
2353    if let Err(source) = res {
2354        let mut g = slot.lock().unwrap();
2355        if g.is_none() {
2356            *g = Some(ExecutorError::Item {
2357                task_id: id.clone(),
2358                source,
2359            });
2360        }
2361    }
2362}
2363
2364/// Post-execute fault detection — runs on a pool worker AFTER
2365/// `mon.post_execute` so the full `took` is available. Implements:
2366///
2367///   * `REQ_0070` / `REQ_0102` — per-task budget overrun: increments
2368///     `overrun_count` on every breach, transitions
2369///     `Running -> Faulted{BudgetExceeded}` exactly once (subsequent
2370///     breaches keep the state `Faulted` and do NOT re-fire the
2371///     observer).
2372///   * `REQ_0071` — executor-wide iteration overrun: transitions
2373///     `Running -> Faulted{IterationBudgetExceeded}` exactly once;
2374///     cascade to per-task state is LAZY (see the pre-dispatch block
2375///     in `dispatch_loop`), so the per-task `on_task_fault` does NOT
2376///     fire during cascade — only `on_executor_fault` does.
2377fn post_execute_detect_fault(
2378    id: &TaskId,
2379    started: Instant,
2380    took: Duration,
2381    fault_ctx: &FaultDispatchCtx,
2382) {
2383    // REQ_0070 / REQ_0102 — per-task budget overrun.
2384    if let Some(budget) = fault_ctx.task_budget {
2385        if took > budget {
2386            fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
2387            let took_ms = duration_to_ms_sat(took);
2388            let budget_ms = duration_to_ms_sat(budget);
2389            let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2390            let since_ms = instant_to_since_ms(started, exec_start);
2391            let new_state = FaultState::Faulted {
2392                reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
2393                since_ms,
2394            };
2395            let prev = fault_ctx.task_fault.swap(new_state, budget_ms);
2396            if matches!(prev, FaultState::Running) {
2397                fault_ctx.observer.on_task_fault(
2398                    id.clone(),
2399                    FaultReason::BudgetExceeded { took_ms, budget_ms },
2400                );
2401            }
2402        }
2403    }
2404
2405    // REQ_0071 — executor-wide iteration overrun.
2406    if let Some(iter_budget) = fault_ctx.iteration_budget {
2407        if took > iter_budget {
2408            let took_ms = duration_to_ms_sat(took);
2409            let budget_ms = duration_to_ms_sat(iter_budget);
2410            let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2411            let since_ms = instant_to_since_ms(started, exec_start);
2412            fault_ctx
2413                .exec_fault_task_idx
2414                .store(fault_ctx.task_idx_u32, Ordering::Release);
2415            fault_ctx
2416                .exec_fault_budget_ms
2417                .store(budget_ms, Ordering::Release);
2418            let new_state = ExecutorFaultState::Faulted {
2419                reason: ExecutorFaultReason::IterationBudgetExceeded {
2420                    task_idx: fault_ctx.task_idx_u32,
2421                    took_ms,
2422                    budget_ms,
2423                },
2424                since_ms,
2425            };
2426            let prev = fault_ctx
2427                .exec_fault
2428                .swap(new_state, fault_ctx.task_idx_u32, budget_ms);
2429            if matches!(prev, ExecutorFaultState::Running) {
2430                fault_ctx.observer.on_executor_fault(
2431                    ExecutorFaultReason::IterationBudgetExceeded {
2432                        task_idx: fault_ctx.task_idx_u32,
2433                        took_ms,
2434                        budget_ms,
2435                    },
2436                );
2437                // NO eager cascade here. Cascade is lazy: the
2438                // pre-dispatch block in `dispatch_loop` transitions
2439                // each `Running` task to `Faulted{ExecutorFaulted}` on
2440                // the next wakeup — silently, so per-task observers
2441                // do not fire (see §4.6 invariant on cascade-noise).
2442            }
2443        }
2444    }
2445}
2446
2447// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
2448
2449/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
2450/// into a registered task.
2451pub struct ExecutorGraphBuilder<'e> {
2452    executor: &'e mut Executor,
2453    builder: crate::graph::GraphBuilder,
2454    custom_id: Option<TaskId>,
2455}
2456
2457impl ExecutorGraphBuilder<'_> {
2458    /// Add a vertex to the graph; returns its handle.
2459    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
2460        self.builder.vertex(item)
2461    }
2462
2463    /// Add a directed edge from one vertex to another.
2464    pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
2465        self.builder.edge(from, to);
2466        self
2467    }
2468
2469    /// Designate the root vertex (its triggers gate the graph).
2470    pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
2471        self.builder.root(v);
2472        self
2473    }
2474
2475    /// Override the auto-generated id with a custom one.
2476    pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
2477        self.custom_id = Some(id.into());
2478        self
2479    }
2480
2481    /// Validate and register the graph. Returns the task id.
2482    ///
2483    /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
2484    /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
2485    /// precedence over the auto-generated id.
2486    pub fn build(self) -> Result<TaskId, ExecutorError> {
2487        let g = self.builder.finish()?;
2488        // Root vertex's task_id() override wins over the custom id, which wins
2489        // over the auto-generated fallback.
2490        let auto_id = || {
2491            TaskId::new(format!(
2492                "graph-{}",
2493                self.executor.next_id.fetch_add(1, Ordering::SeqCst)
2494            ))
2495        };
2496        let id = g
2497            .root_task_id()
2498            .map(TaskId::new)
2499            .or(self.custom_id)
2500            .unwrap_or_else(auto_id);
2501        let decls = g.decls.clone();
2502        // The graph root's decls become a grid-registered TaskEntry, so the same
2503        // cyclic-XOR-event-driven / non-zero-period validation that guards the
2504        // single-item, fault-handler, and chain add paths must guard this one too
2505        // (REQ_0268). Non-root vertex triggers never reach a TaskEntry — they are
2506        // discarded in `GraphBuilder::collect_root_decls` — so validating the root
2507        // decls is sufficient.
2508        validate_decls(&id, &decls)?;
2509        let scan_period = scan_period_from_decls(&decls);
2510
2511        // Box the graph for address stability — per-vertex dispatch
2512        // closures capture `*const Graph` and must not see it move.
2513        let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
2514        // Pre-build the per-vertex closures now that we know the
2515        // task_id and have access to the executor's shared state.
2516        graph_box.prepare_dispatch(
2517            id.clone(),
2518            self.executor.stoppable.clone(),
2519            Arc::clone(&self.executor.observer),
2520            Arc::clone(&self.executor.monitor),
2521            Arc::clone(&self.executor.iter_err),
2522        );
2523
2524        self.executor.tasks.push(TaskEntry {
2525            id: id.clone(),
2526            kind: TaskKind::Graph(graph_box),
2527            decls,
2528            // Graph tasks dispatch their vertices via `vertex_jobs`
2529            // stored inside the `Graph`; the per-task `job` slot
2530            // is unused for graphs.
2531            job: None,
2532            // TODO(post-Task-10): graph budgets carried separately; for now None.
2533            budget: None,
2534            fault: Arc::new(FaultAtomic::new()),
2535            overrun_count: Arc::new(AtomicU64::new(0)),
2536            handler_job: None,
2537            scan_period,
2538            // Graphs dispatch vertices via their own path and do not ferry a
2539            // per-task `took`; sentinel = "no sample". Wired for struct
2540            // completeness; nothing reads it yet (Task 6).
2541            last_took_ns: Arc::new(AtomicU64::new(u64::MAX)),
2542            last_dispatch: None,
2543            grid_slot: 0,
2544            pending_cycle: None,
2545        });
2546        self.executor
2547            .cycle_stats
2548            .push(TaskCycleStats::new(self.executor.stats_window));
2549        Ok(id)
2550    }
2551}
2552
2553// ── Unit tests ────────────────────────────────────────────────────────────────
2554
2555#[cfg(test)]
2556mod tests {
2557    use super::*;
2558    use crate::{ControlFlow, item};
2559    use iceoryx2::prelude::ZeroCopySend;
2560
    /// Minimal zero-copy payload for tests that need a real subscriber to
    /// produce a listener-backed trigger decl. The wrapped `u32` is never
    /// inspected by the tests visible here; only the channel type matters.
    // NOTE(review): `#[repr(C)]` is presumably there for the `ZeroCopySend`
    // derive's layout expectations — confirm against the iceoryx2 docs.
    #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
    #[repr(C)]
    struct Msg(u32);
2566
2567    #[test]
2568    fn add_returns_unique_ids() {
2569        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2570        let a = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2571        let b = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2572        assert_ne!(a, b);
2573    }
2574
2575    #[test]
2576    fn grid_mode_dispatches_cyclic_task_each_cycle() {
2577        use std::sync::Arc;
2578        use std::sync::atomic::{AtomicU64, Ordering};
2579        let hits = Arc::new(AtomicU64::new(0));
2580        let h = Arc::clone(&hits);
2581        let mut exec = Executor::builder()
2582            .worker_threads(0)
2583            .dispatch_mode(crate::DispatchMode::Grid)
2584            .build()
2585            .expect("build");
2586        exec.add(crate::item::item_with_triggers(
2587            move |d| {
2588                d.interval(std::time::Duration::from_millis(1));
2589                Ok(())
2590            },
2591            move |_ctx| {
2592                h.fetch_add(1, Ordering::Relaxed);
2593                Ok(ControlFlow::Continue)
2594            },
2595        ))
2596        .expect("add");
2597        exec.run_n(10).expect("run");
2598        assert!(
2599            hits.load(Ordering::Relaxed) >= 8,
2600            "grid mode under-dispatched: {}",
2601            hits.load(Ordering::Relaxed)
2602        );
2603    }
2604
2605    #[test]
2606    fn legacy_mode_dispatches_cyclic_task_each_cycle() {
2607        use std::sync::Arc;
2608        use std::sync::atomic::{AtomicU64, Ordering};
2609        let hits = Arc::new(AtomicU64::new(0));
2610        let h = Arc::clone(&hits);
2611        let mut exec = Executor::builder()
2612            .worker_threads(0)
2613            .dispatch_mode(crate::DispatchMode::Legacy)
2614            .build()
2615            .expect("build");
2616        exec.add(crate::item::item_with_triggers(
2617            move |d| {
2618                d.interval(std::time::Duration::from_millis(1));
2619                Ok(())
2620            },
2621            move |_ctx| {
2622                h.fetch_add(1, Ordering::Relaxed);
2623                Ok(ControlFlow::Continue)
2624            },
2625        ))
2626        .expect("add");
2627        exec.run_n(10).expect("run");
2628        assert!(
2629            hits.load(Ordering::Relaxed) >= 8,
2630            "legacy mode under-dispatched: {}",
2631            hits.load(Ordering::Relaxed)
2632        );
2633    }
2634
2635    // --- REQ_0268 trigger-combination validation (Fix 1 / Fix 3) ---
2636
2637    #[test]
2638    fn add_rejects_cyclic_plus_subscriber_combination() {
2639        use core::time::Duration;
2640        // A task declaring BOTH an Interval and a listener-backed trigger is
2641        // ill-defined (cyclic XOR event-driven, REQ_0106) and must be rejected
2642        // at add time. We use a real subscriber so the listener decl is genuine.
2643        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2644        let ch = exec.channel::<Msg>("taktora.test.req0268.combo").unwrap();
2645        let sub = ch.subscriber().unwrap();
2646        let err = exec
2647            .add(crate::item::item_with_triggers(
2648                move |d| {
2649                    d.interval(Duration::from_millis(1));
2650                    d.subscriber(&sub);
2651                    Ok(())
2652                },
2653                |_| Ok(crate::ControlFlow::Continue),
2654            ))
2655            .expect_err("interval + subscriber must be rejected");
2656        match err {
2657            ExecutorError::DeclareTriggers(msg) => {
2658                assert!(
2659                    msg.contains("cyclic") && msg.contains("event-driven"),
2660                    "message must explain cyclic vs event-driven: {msg}"
2661                );
2662                assert!(
2663                    msg.contains("split"),
2664                    "message must suggest splitting into two tasks: {msg}"
2665                );
2666            }
2667            other => panic!("expected DeclareTriggers, got {other:?}"),
2668        }
2669    }
2670
2671    #[test]
2672    fn add_rejects_cyclic_plus_listener_regardless_of_mode() {
2673        use core::time::Duration;
2674        // The combination is ill-defined irrespective of DispatchMode (Legacy
2675        // is temporary), so Legacy must reject it too.
2676        let mut exec = Executor::builder()
2677            .worker_threads(0)
2678            .dispatch_mode(crate::DispatchMode::Legacy)
2679            .build()
2680            .unwrap();
2681        let ch = exec
2682            .channel::<Msg>("taktora.test.req0268.combo.legacy")
2683            .unwrap();
2684        let sub = ch.subscriber().unwrap();
2685        let err = exec
2686            .add(crate::item::item_with_triggers(
2687                move |d| {
2688                    d.interval(Duration::from_millis(1));
2689                    d.subscriber(&sub);
2690                    Ok(())
2691                },
2692                |_| Ok(crate::ControlFlow::Continue),
2693            ))
2694            .expect_err("interval + subscriber must be rejected in Legacy too");
2695        assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2696    }
2697
2698    #[test]
2699    fn add_accepts_multiple_intervals_and_single_kinds() {
2700        use core::time::Duration;
2701        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2702        // Multiple Interval decls: still cyclic-only, accepted.
2703        exec.add(crate::item::item_with_triggers(
2704            |d| {
2705                d.interval(Duration::from_millis(1));
2706                d.interval(Duration::from_millis(2));
2707                Ok(())
2708            },
2709            |_| Ok(crate::ControlFlow::Continue),
2710        ))
2711        .expect("multiple intervals accepted");
2712        // Single interval: accepted.
2713        exec.add(crate::item::item_with_triggers(
2714            |d| {
2715                d.interval(Duration::from_millis(1));
2716                Ok(())
2717            },
2718            |_| Ok(crate::ControlFlow::Continue),
2719        ))
2720        .expect("single interval accepted");
2721        // Multiple listeners (no interval): accepted.
2722        let ch = exec
2723            .channel::<Msg>("taktora.test.req0268.multi.listener")
2724            .unwrap();
2725        let sub_a = ch.subscriber().unwrap();
2726        let sub_b = ch.subscriber().unwrap();
2727        exec.add(crate::item::item_with_triggers(
2728            move |d| {
2729                d.subscriber(&sub_a);
2730                d.subscriber(&sub_b);
2731                Ok(())
2732            },
2733            |_| Ok(crate::ControlFlow::Continue),
2734        ))
2735        .expect("multiple listeners accepted");
2736    }
2737
2738    #[test]
2739    fn add_rejects_zero_period_interval() {
2740        use core::time::Duration;
2741        // A zero-period interval busy-spins the grid (next_timeout == 0 every
2742        // wake), so it must be rejected at add time.
2743        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2744        let err = exec
2745            .add(crate::item::item_with_triggers(
2746                |d| {
2747                    d.interval(Duration::ZERO);
2748                    Ok(())
2749                },
2750                |_| Ok(crate::ControlFlow::Continue),
2751            ))
2752            .expect_err("zero-period interval must be rejected");
2753        match err {
2754            ExecutorError::DeclareTriggers(msg) => {
2755                assert!(
2756                    msg.contains("zero"),
2757                    "message must mention the zero period: {msg}"
2758                );
2759            }
2760            other => panic!("expected DeclareTriggers, got {other:?}"),
2761        }
2762    }
2763
2764    #[test]
2765    fn add_chain_rejects_cyclic_plus_listener() {
2766        use core::time::Duration;
2767        // The chain path collects the head item's decls; the same validation
2768        // must apply there.
2769        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2770        let ch = exec
2771            .channel::<Msg>("taktora.test.req0268.chain.combo")
2772            .unwrap();
2773        let sub = ch.subscriber().unwrap();
2774        let err = exec
2775            .add_chain(vec![crate::item::item_with_triggers(
2776                move |d| {
2777                    d.interval(Duration::from_millis(1));
2778                    d.subscriber(&sub);
2779                    Ok(())
2780                },
2781                |_| Ok(crate::ControlFlow::Continue),
2782            )])
2783            .expect_err("chain head interval + subscriber must be rejected");
2784        assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2785    }
2786
2787    #[test]
2788    fn add_chain_rejects_zero_period_interval() {
2789        use core::time::Duration;
2790        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2791        let err = exec
2792            .add_chain(vec![crate::item::item_with_triggers(
2793                |d| {
2794                    d.interval(Duration::ZERO);
2795                    Ok(())
2796                },
2797                |_| Ok(crate::ControlFlow::Continue),
2798            )])
2799            .expect_err("chain head zero-period interval must be rejected");
2800        assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2801    }
2802
2803    #[test]
2804    fn add_graph_rejects_cyclic_plus_listener() {
2805        use core::time::Duration;
2806        // The graph path collects the root vertex's decls into a grid-registered
2807        // TaskEntry; the same cyclic-XOR-event-driven validation must apply there
2808        // (REQ_0268). We use a real subscriber so the listener decl is genuine.
2809        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2810        let ch = exec
2811            .channel::<Msg>("taktora.test.req0268.graph.combo")
2812            .unwrap();
2813        let sub = ch.subscriber().unwrap();
2814        let mut g = exec.add_graph();
2815        let r = g.vertex(crate::item::item_with_triggers(
2816            move |d| {
2817                d.interval(Duration::from_millis(1));
2818                d.subscriber(&sub);
2819                Ok(())
2820            },
2821            |_| Ok(crate::ControlFlow::Continue),
2822        ));
2823        g.root(r);
2824        let err = g
2825            .build()
2826            .expect_err("graph root interval + subscriber must be rejected");
2827        assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2828    }
2829
2830    #[test]
2831    fn add_graph_rejects_zero_period_interval() {
2832        use core::time::Duration;
2833        // A zero-period interval on the graph root busy-spins the grid, so the
2834        // graph path must reject it just like the single-item/chain paths.
2835        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2836        let mut g = exec.add_graph();
2837        let r = g.vertex(crate::item::item_with_triggers(
2838            |d| {
2839                d.interval(Duration::ZERO);
2840                Ok(())
2841            },
2842            |_| Ok(crate::ControlFlow::Continue),
2843        ));
2844        g.root(r);
2845        let err = g
2846            .build()
2847            .expect_err("graph root zero-period interval must be rejected");
2848        assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2849    }
2850
2851    #[test]
2852    fn stopped_iteration_emits_no_cyclic_cycle_observation() {
2853        use core::time::Duration;
2854        use std::sync::atomic::AtomicU64;
2855
2856        // A CyclicClock that starts at 0 (epoch) then jumps far past the first
2857        // grid target, so the post-wait `take_due` finds the cyclic task due on
2858        // the very first (stopping) wake. Distinct from the telemetry clock
2859        // (scheduling role).
2860        struct JumpClock {
2861            calls: AtomicU64,
2862        }
2863        impl crate::CyclicClock for JumpClock {
2864            fn now_nanos(&self) -> u64 {
2865                // First read (grid epoch at loop entry) = 0; every later read
2866                // is well past the 1ms target.
2867                if self.calls.fetch_add(1, Ordering::SeqCst) == 0 {
2868                    0
2869                } else {
2870                    1_000_000_000
2871                }
2872            }
2873        }
2874
2875        // Observer that counts on_cycle_stats calls.
2876        struct Counter {
2877            cycles: AtomicU64,
2878        }
2879        impl Observer for Counter {
2880            fn on_cycle_stats(&self, _obs: &CycleObservation) {
2881                self.cycles.fetch_add(1, Ordering::SeqCst);
2882            }
2883        }
2884
2885        let counter = Arc::new(Counter {
2886            cycles: AtomicU64::new(0),
2887        });
2888        let mut exec = Executor::builder()
2889            .worker_threads(0)
2890            .dispatch_mode(crate::DispatchMode::Grid)
2891            .cyclic_clock(Arc::new(JumpClock {
2892                calls: AtomicU64::new(0),
2893            }))
2894            .observer(Arc::clone(&counter) as Arc<dyn Observer>)
2895            .build()
2896            .unwrap();
2897        exec.add(crate::item::item_with_triggers(
2898            |d| {
2899                d.interval(Duration::from_millis(1));
2900                Ok(())
2901            },
2902            |_| Ok(crate::ControlFlow::Continue),
2903        ))
2904        .unwrap();
2905
2906        // Stop BEFORE running: the WaitSet wakes immediately on the stop
2907        // listener; the grid target is already due (JumpClock). Without the
2908        // stop guard the post-wait cyclic pass would dispatch + record one
2909        // spurious cycle on this stopping iteration; with it, zero.
2910        exec.stoppable().stop();
2911        exec.run().expect("run returns cleanly after stop");
2912
2913        assert_eq!(
2914            counter.cycles.load(Ordering::SeqCst),
2915            0,
2916            "no cyclic cycle observation may be emitted on a stop wake"
2917        );
2918    }
2919
2920    #[test]
2921    fn custom_id_is_preserved() {
2922        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2923        let id = exec
2924            .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
2925            .unwrap();
2926        assert_eq!(id.as_str(), "my-task");
2927    }
2928
2929    #[test]
2930    fn add_persists_declared_budget() {
2931        use core::time::Duration;
2932        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2933        let task_id = exec
2934            .add(crate::item::item_with_triggers(
2935                |d| {
2936                    d.interval(Duration::from_millis(10));
2937                    d.budget(Duration::from_millis(5));
2938                    Ok(())
2939                },
2940                |_| Ok(crate::ControlFlow::Continue),
2941            ))
2942            .unwrap();
2943        let entry = exec
2944            .tasks
2945            .iter()
2946            .find(|t| t.id == task_id)
2947            .expect("task present");
2948        assert_eq!(entry.budget, Some(Duration::from_millis(5)));
2949    }
2950
2951    #[test]
2952    fn scan_period_cached_for_cyclic_only() {
2953        use core::time::Duration;
2954        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2955        let cyclic = exec
2956            .add(crate::item::item_with_triggers(
2957                |d| {
2958                    d.interval(Duration::from_millis(5));
2959                    Ok(())
2960                },
2961                |_| Ok(crate::ControlFlow::Continue),
2962            ))
2963            .unwrap();
2964        let event_driven = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2965
2966        let cyclic_entry = exec
2967            .tasks
2968            .iter()
2969            .find(|t| t.id == cyclic)
2970            .expect("cyclic task present");
2971        assert_eq!(cyclic_entry.scan_period, Some(Duration::from_millis(5)));
2972        // Sentinel: no sample has been taken yet.
2973        assert_eq!(cyclic_entry.last_took_ns.load(Ordering::Relaxed), u64::MAX);
2974
2975        let event_entry = exec
2976            .tasks
2977            .iter()
2978            .find(|t| t.id == event_driven)
2979            .expect("event-driven task present");
2980        assert_eq!(event_entry.scan_period, None);
2981    }
2982
2983    #[test]
2984    fn cycle_stats_index_aligned_with_tasks() {
2985        use core::time::Duration;
2986        let mut exec = Executor::builder()
2987            .worker_threads(0)
2988            .stats_window(512)
2989            .build()
2990            .unwrap();
2991        // Builder option flows through to the executor.
2992        assert_eq!(exec.stats_window, 512);
2993        // No tasks yet → both Vecs empty and aligned.
2994        assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
2995
2996        // Cyclic single-item add path.
2997        exec.add(crate::item::item_with_triggers(
2998            |d| {
2999                d.interval(Duration::from_millis(5));
3000                Ok(())
3001            },
3002            |_| Ok(crate::ControlFlow::Continue),
3003        ))
3004        .unwrap();
3005        // Event-driven single-item add path.
3006        exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
3007
3008        assert_eq!(exec.tasks.len(), 2);
3009        assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
3010    }
3011
3012    #[test]
3013    fn add_with_fault_handler_stores_handler_job() {
3014        use core::time::Duration;
3015        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3016        let task_id = exec
3017            .add_with_fault_handler(
3018                crate::item::item_with_triggers(
3019                    |d| {
3020                        d.interval(Duration::from_millis(10));
3021                        d.budget(Duration::from_millis(5));
3022                        Ok(())
3023                    },
3024                    |_| Ok(crate::ControlFlow::Continue),
3025                ),
3026                crate::item::item_with_triggers(|_d| Ok(()), |_| Ok(crate::ControlFlow::Continue)),
3027            )
3028            .unwrap();
3029        let entry = exec
3030            .tasks
3031            .iter()
3032            .find(|t| t.id == task_id)
3033            .expect("task present");
3034        assert!(
3035            entry.handler_job.is_some(),
3036            "handler_job should be Some after add_with_fault_handler"
3037        );
3038        // Main job should still be present.
3039        assert!(entry.job.is_some(), "main job should still be present");
3040    }
3041
3042    #[test]
3043    fn declare_triggers_called_at_add_time() {
3044        let called = Arc::new(AtomicBool::new(false));
3045        let called_d = Arc::clone(&called);
3046
3047        let it = crate::item::item_with_triggers(
3048            move |_d| {
3049                called_d.store(true, Ordering::SeqCst);
3050                Ok(())
3051            },
3052            |_| Ok(ControlFlow::Continue),
3053        );
3054
3055        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3056        exec.add(it).unwrap();
3057        assert!(called.load(Ordering::SeqCst));
3058    }
3059
3060    #[test]
3061    fn clear_task_fault_errors_on_running_task() {
3062        use core::time::Duration;
3063        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3064        let task_id = exec
3065            .add(crate::item::item_with_triggers(
3066                |d| {
3067                    d.interval(Duration::from_millis(10));
3068                    Ok(())
3069                },
3070                |_| Ok(crate::ControlFlow::Continue),
3071            ))
3072            .unwrap();
3073        // Task starts in Running state — clearing should error.
3074        let err = exec.clear_task_fault(task_id).expect_err("not faulted");
3075        assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
3076    }
3077
3078    #[test]
3079    fn clear_executor_fault_errors_on_running_executor() {
3080        let exec = Executor::builder().worker_threads(0).build().unwrap();
3081        let err = exec.clear_executor_fault().expect_err("not faulted");
3082        assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
3083    }
3084
3085    #[test]
3086    fn overrun_count_returns_zero_for_new_task() {
3087        use core::time::Duration;
3088        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3089        let task_id = exec
3090            .add(crate::item::item_with_triggers(
3091                |d| {
3092                    d.interval(Duration::from_millis(10));
3093                    d.budget(Duration::from_millis(5));
3094                    Ok(())
3095                },
3096                |_| Ok(crate::ControlFlow::Continue),
3097            ))
3098            .unwrap();
3099        assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
3100    }
3101
3102    #[test]
3103    fn overrun_count_errors_for_unknown_task() {
3104        let exec = Executor::builder().worker_threads(0).build().unwrap();
3105        let err = exec
3106            .overrun_count(crate::TaskId::new("nope"))
3107            .expect_err("unknown task");
3108        assert!(matches!(err, ExecutorError::TaskNotFound(_)));
3109    }
3110
3111    #[test]
3112    fn task_fault_state_starts_running() {
3113        use core::time::Duration;
3114        let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3115        let task_id = exec
3116            .add(crate::item::item_with_triggers(
3117                |d| {
3118                    d.interval(Duration::from_millis(10));
3119                    Ok(())
3120                },
3121                |_| Ok(crate::ControlFlow::Continue),
3122            ))
3123            .unwrap();
3124        assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
3125    }
3126
3127    #[test]
3128    fn executor_fault_state_starts_running() {
3129        let exec = Executor::builder().worker_threads(0).build().unwrap();
3130        assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
3131    }
3132
3133    // --- on_fatal / FatalDispatch integration tests ---
3134
3135    #[test]
3136    fn build_without_on_fatal_succeeds() {
3137        use crate::fatal::{FatalContext, FatalSite};
3138        use std::sync::{Arc, Mutex};
3139        // Default builder (no on_fatal) must build successfully.
3140        let exec = Executor::builder().worker_threads(0).build().unwrap();
3141        // The fatal_dispatch field is present; fire via a test terminal to
3142        // confirm the no-op handler doesn't blow up.
3143        let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3144        let reached2 = Arc::clone(&reached);
3145        let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
3146            exec.fatal_dispatch.handler().clone(),
3147            move |_| {
3148                *reached2.lock().unwrap() = true;
3149            },
3150        );
3151        test_dispatch.fire(&FatalContext {
3152            cause: "test".to_string(),
3153            site: FatalSite::PoolWorker,
3154        });
3155        assert!(*reached.lock().unwrap(), "terminal not reached");
3156    }
3157
3158    #[test]
3159    fn on_fatal_handler_is_stored_and_invoked() {
3160        use crate::fatal::{FatalContext, FatalSite};
3161        use std::sync::{Arc, Mutex};
3162        let called: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
3163        let called2 = Arc::clone(&called);
3164        let exec = Executor::builder()
3165            .worker_threads(0)
3166            .on_fatal(move |ctx| {
3167                called2.lock().unwrap().push(ctx.cause.clone());
3168            })
3169            .build()
3170            .unwrap();
3171        // Verify the handler fires via a test terminal.
3172        let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3173        let reached2 = Arc::clone(&reached);
3174        let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
3175            exec.fatal_dispatch.handler().clone(),
3176            move |_| {
3177                *reached2.lock().unwrap() = true;
3178            },
3179        );
3180        test_dispatch.fire(&FatalContext {
3181            cause: "my-cause".to_string(),
3182            site: FatalSite::ExecutorRunLoop,
3183        });
3184        assert!(*reached.lock().unwrap(), "terminal not reached");
3185        let log = called.lock().unwrap().clone();
3186        assert_eq!(
3187            log,
3188            vec!["my-cause"],
3189            "handler should have been called with cause"
3190        );
3191    }
3192}