taktora_executor/executor.rs
1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::clock::{MonotonicClock, SystemClock};
10use crate::context::Stoppable;
11use crate::error::ExecutorError;
12use crate::fatal::{FatalDispatch, FatalHandler, FatalSite, guard_or_fatal, panic_payload_message};
13use crate::fault::{
14 ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
15 FaultState, duration_to_ms_sat, instant_to_since_ms,
16};
17use crate::item::ExecutableItem;
18use crate::monitor::{ExecutionMonitor, NoopMonitor};
19use crate::observer::{NoopObserver, Observer};
20use crate::payload::Payload;
21use crate::pool::Pool;
22use crate::stats::{CycleObservation, StatsSnapshot, TaskStatsEntry};
23use crate::task_id::TaskId;
24use crate::task_kind::TaskKind;
25use crate::thread_attrs::ThreadAttributes;
26use crate::trigger::{TriggerDecl, TriggerDeclarer};
27use core::sync::atomic::AtomicU32;
28use iceoryx2::node::Node;
29use iceoryx2::port::listener::Listener as IxListener;
30use iceoryx2::prelude::ipc;
31use iceoryx2::prelude::*;
32use iceoryx2::waitset::WaitSetRunResult;
33use std::sync::Arc;
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
36use std::time::{Duration, Instant};
37use taktora_stats::ExecutorCycleStats;
38
/// Monotonically increasing counter so multiple executors in the same process
/// each get a unique stop-event service name.
///
/// Starts at `0`. NOTE(review): the code that consumes this counter is not
/// visible in this part of the file (presumably `ExecutorBuilder::build`) —
/// confirm there how the value is folded into the service name.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
42
/// Executor histogram segment count (`S`) and exact-window length (`W`) for
/// per-task cycle stats. Fixed at compile time per `ADR_0060`.
///
/// One aggregator of this type is created per registered task via
/// `TaskCycleStats::new(stats_window)` and stored in `Executor::cycle_stats`.
/// NOTE(review): presumably `S = 8` and `W = 256` in that generic-parameter
/// order — confirm against `taktora_stats::ExecutorCycleStats`.
pub(crate) type TaskCycleStats = ExecutorCycleStats<8, 256>;
46
/// A single wakeup's pending cycle record, stashed on the [`TaskEntry`] between
/// the pre-dispatch capture and the post-barrier fold. Bundling the pre-dispatch
/// timestamp with its `faulted` flag in one `Option` makes them impossible to
/// desync: a cycle is pending iff this is `Some`, and the `faulted` bit is then
/// always the one captured at the same wakeup (`REQ_0107`).
///
/// Both fields are plain values, so the record is `Copy` and can be moved in
/// and out of `TaskEntry::pending_cycle` by value.
#[derive(Clone, Copy)]
pub(crate) struct CyclePending {
    /// Pre-dispatch timestamp for this wakeup (the cycle's `pre`), in
    /// telemetry-clock nanoseconds (see [`MonotonicClock`]).
    pub(crate) pre: u64,
    /// `true` when this wakeup's scan was fault-routed/skipped, so the
    /// post-barrier fold records it with `faulted=true`.
    pub(crate) faulted: bool,
}
61
/// One registered task entry.
///
/// Entries are created by `Executor::add_with_id` (single items) and
/// `Executor::add_chain_with_id_boxed` (chains) and appended to
/// `Executor::tasks`; a matching cycle-stats aggregator is pushed to
/// `Executor::cycle_stats` at the same index.
pub(crate) struct TaskEntry {
    /// Task identifier.
    pub(crate) id: TaskId,
    /// The kind of work this entry holds (single item or chain).
    pub(crate) kind: TaskKind,
    /// Trigger declarations recorded at `add` time.
    pub(crate) decls: Vec<TriggerDecl>,
    /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
    /// time and re-invoked on every dispatch iteration via
    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
    /// that `Pool::submit<F>` requires in threaded mode. Required for
    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
    /// `TaskKind::Graph`, which dispatches its vertices via a separate
    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,

    /// Per-task budget declared via `TriggerDeclarer::budget`. `None`
    /// means no per-task check; the executor-wide iteration budget
    /// still applies. `REQ_0070`.
    pub(crate) budget: Option<Duration>,

    /// Per-task fault state. Wait-free read on the dispatch hot path.
    /// Wrapped in `Arc` so dispatch closures built at `add` time can
    /// capture an owning handle into the same atomic the `TaskEntry`
    /// holds — `Arc::clone` is refcount-only, so this stays compatible
    /// with `REQ_0060` (no per-iteration allocation). `REQ_0070`.
    pub(crate) fault: Arc<FaultAtomic>,

    /// Monotonic per-task overrun counter. Increments on EVERY budget
    /// breach, including breaches while already `Faulted`. Never reset
    /// by clearing the fault. Shared with the dispatch closure via
    /// `Arc::clone`. `REQ_0102`.
    pub(crate) overrun_count: Arc<AtomicU64>,

    /// Pre-built dispatch closure for the fault-handler item. Mirrors
    /// `job`. `None` means no handler — the task is simply skipped
    /// during fault. `REQ_0072`.
    pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,

    /// Declared scan period for cyclic tasks (the `TriggerDecl::Interval`
    /// duration), or `None` for event-driven tasks. Cached at add time so the
    /// dispatch loop reads it without scanning `decls` per cycle. Gates cycle
    /// telemetry: only cyclic tasks participate (`REQ_0106`).
    pub(crate) scan_period: Option<Duration>,
    /// Last-cycle execute duration in ns, written by the dispatch closure on
    /// the pool worker and read by the `WaitSet` thread after `barrier()`.
    /// Shared via `Arc` exactly like `overrun_count`. Sentinel `u64::MAX` =
    /// "no sample this cycle" (the closure never ran — e.g. a faulted scan).
    /// Initialised to that sentinel when the task is added.
    pub(crate) last_took_ns: Arc<AtomicU64>,

    /// WaitSet-thread-only timestamp of this task's previous dispatch, for
    /// computing `actual_period` (`REQ_0101`). Not shared (no atomic) — only the
    /// single dispatch thread touches it. `None` before the first dispatch.
    /// Telemetry-clock nanoseconds (see [`MonotonicClock`]).
    pub(crate) last_dispatch: Option<u64>,

    /// WaitSet-thread-only lateness grid slot for this task (`REQ_0106`).
    /// Advances by exactly **one per scan attempt plus the dispatcher's
    /// skipped-slot signal** (`REQ_0840`) — never reconstructed from the
    /// noisy measured period, which over-counts on coalesced catch-up wakes
    /// (issue #46 / `ADR_0101`). Starts at `0` (the first recorded cycle is
    /// the task's own grid origin).
    pub(crate) grid_slot: u64,

    /// WaitSet-thread-only per-task lateness grid epoch (`REQ_0106`): the
    /// task's first recorded `pre` — including a faulted first scan, whose
    /// dispatch instant is real — back-dated by the dispatcher's `late_by`
    /// signal when one is present (`Grid` mode), so the grid anchors at the
    /// first dispatch's NOMINAL slot and a late process start is reported as
    /// real first-cycle lateness instead of becoming a permanent negative
    /// floor on every later on-grid cycle. Per-task (not executor-shared) so
    /// a task's start phase never reads as permanent lateness.
    pub(crate) grid_epoch: Option<u64>,

    /// WaitSet-thread-only dispatcher skip signal for the *current* wakeup
    /// (`REQ_0840`): written by the Grid dispatch pass before `dispatch_task`,
    /// consumed (`take`n) by `record_cycle_for`. Never written in `Legacy`
    /// mode or for event-driven tasks — stays `0`.
    pub(crate) pending_skipped: u32,

    /// WaitSet-thread-only dispatcher lateness signal for the *current*
    /// wakeup (`REQ_0106`): how far past its nominal grid slot this dispatch
    /// is, in scheduling-clock nanoseconds. Written by the Grid dispatch
    /// pass, consumed (`take`n) by `record_cycle_for`, which uses the FIRST
    /// one to anchor [`Self::grid_epoch`] on the nominal grid. `None` in
    /// `Legacy` mode and for event-driven tasks (no dispatcher grid exists:
    /// the epoch falls back to the first observed `pre`).
    pub(crate) pending_late: Option<u64>,

    /// WaitSet-thread-only stash of the *current* wakeup's pending cycle —
    /// the pre-dispatch timestamp plus its `faulted` flag — carried across
    /// `pool.barrier()` so the post-barrier record pass can fold this cycle's
    /// telemetry without re-reading the clock or allocating a fired-index list.
    /// `Some` between the pre-dispatch capture and the post-barrier
    /// `record_cycle_for` `take`; `None` otherwise. Bundling the timestamp and
    /// the fault flag in one `Option` keeps them from ever desyncing
    /// (`REQ_0107`). Only the single dispatch thread touches it (no atomic).
    pub(crate) pending_cycle: Option<CyclePending>,
}
162
/// Top-level executor. One per process is the typical case.
///
/// Construct one through [`Executor::builder`]; register work with the
/// `add*` methods before running.
pub struct Executor {
    /// iceoryx2 node that channels and services opened through
    /// [`Executor::channel`] / [`Executor::service`] are bound to.
    pub(crate) node: Node<ipc::Service>,
    /// Worker pool handle. NOTE(review): how jobs are submitted is defined in
    /// the run loop, which is not visible in this part of the file.
    pub(crate) pool: Arc<Pool>,
    /// Registered tasks, in registration order. Every `add*` method appends
    /// to the end of this table.
    pub(crate) tasks: Vec<TaskEntry>,
    /// One cycle-stats aggregator per registered task, index-aligned with
    /// `tasks`. Pushed at task-add time (before `run`), so no steady-state
    /// allocation (`REQ_0060`, `REQ_0104`). Updated single-writer on the
    /// `WaitSet` thread (Task 6).
    pub(crate) cycle_stats: Vec<TaskCycleStats>,
    /// Histogram sliding-window size in samples (`REQ_0100`).
    pub(crate) stats_window: u32,
    /// Shared run flag. NOTE(review): its readers/writers live in the run
    /// loop, outside this view — confirm semantics there.
    pub(crate) running: Arc<AtomicBool>,
    /// Stop handle; cloned out by [`Executor::stoppable`] and into every
    /// pre-built dispatch closure.
    pub(crate) stoppable: Stoppable,
    /// Sequence counter behind the auto-generated `task-N` / `chain-N` ids.
    pub(crate) next_id: AtomicU64,
    /// Listener for the internal stop event service. Held here so it outlives
    /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
    /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
    pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
    /// Lifecycle observer. Defaults to a no-op.
    pub(crate) observer: Arc<dyn Observer>,
    /// Execution monitor. Defaults to a no-op.
    pub(crate) monitor: Arc<dyn ExecutionMonitor>,
    /// Per-iteration error capture slot — allocated once at build time and
    /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
    /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
    /// the per-iteration heap allocation that the previous design incurred.
    /// Required for `REQ_0060`.
    pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
    /// Executor-wide iteration budget from `ExecutorBuilder::iteration_budget`.
    /// `None` means no executor-wide check.
    pub(crate) iteration_budget: Option<Duration>,
    /// Executor-wide fault state. Wrapped in `Arc` so each dispatch
    /// closure can hold an owning handle without re-borrowing through
    /// `self`. `REQ_0071`.
    pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,

    /// Index of the task whose `execute()` overran when the executor
    /// transitioned to `Faulted`. Read alongside `exec_fault`.
    pub(crate) exec_fault_task_idx: Arc<AtomicU32>,

    /// Budget that was breached when the executor transitioned to
    /// `Faulted`, in ms (saturated). Read alongside `exec_fault`.
    pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,

    /// Executor start time, set on first dispatch. Used to compute
    /// `since_ms` for faults relative to `Executor::run` entry. Wrapped
    /// in `Arc` so dispatch closures share the same `OnceLock` with the
    /// executor — `get_or_init` is idempotent and wait-free.
    pub(crate) start_time: Arc<OnceLock<Instant>>,

    /// Fatal-dispatch handle. Called once on the fail-fast path from the
    /// executor-thread run-loop boundary; the pool holds a separate
    /// `Arc::clone` for its own worker / inline-submit boundaries.
    pub(crate) fatal_dispatch: Arc<FatalDispatch>,

    /// Telemetry time source (`REQ_0101`/`REQ_0105`/`REQ_0106`). Read on the
    /// worker (for `took`) and the `WaitSet` thread (for `pre`); defaults to
    /// [`SystemClock`]. A test can substitute a
    /// [`MockClock`](crate::MockClock) via [`ExecutorBuilder::clock`] for
    /// deterministic timing assertions. Affects only telemetry — never
    /// scheduling or fault behaviour.
    pub(crate) clock: Arc<dyn MonotonicClock>,

    /// Cyclic dispatch timing strategy (`REQ_0268` / `ADR_0100`). Read once at
    /// `dispatch_loop` entry and hoisted to a local, so steady-state cost is a
    /// single `Copy`-enum compare per cycle. Defaults to
    /// [`DispatchMode::Grid`](crate::DispatchMode).
    pub(crate) dispatch_mode: crate::DispatchMode,

    /// Scheduling time source for the absolute grid (`REQ_0268`). Distinct from
    /// [`Executor::clock`] (telemetry): a telemetry mock can never alter
    /// dispatch timing. Defaults to
    /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock).
    pub(crate) cyclic_clock: std::sync::Arc<dyn crate::CyclicClock>,
}
238
// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
// `SingleThreaded` reason as `IxNotifier`. After construction, the only
// per-iteration call is `listener.try_wait_one()`, which does not mutate the
// Rc. `Executor` is never shared across threads (it requires `&mut self` for
// `run()`), so there is no aliased concurrent mutation.
//
// NOTE(review): `Send` is about *moving* the whole `Executor` to another
// thread. `stop_listener` is stored in an `Arc`, so the argument above also
// assumes no clone of that `Arc` stays behind on the constructing thread —
// confirm against `build()` and `dispatch_loop`, which are not visible here.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
246
247impl Executor {
248 /// Start a new builder.
249 #[must_use]
250 pub fn builder() -> ExecutorBuilder {
251 ExecutorBuilder::default()
252 }
253
254 /// Open or create a pub/sub channel bound to this executor's node.
255 pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
256 Channel::open_or_create(&self.node, name)
257 }
258
259 /// Open or create a request/response service bound to this executor's node.
260 pub fn service<Req, Resp>(
261 &mut self,
262 name: &str,
263 ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
264 where
265 Req: Payload,
266 Resp: Payload,
267 {
268 crate::Service::open_or_create(&self.node, name)
269 }
270
271 /// Borrowed snapshot of every task's cycle aggregates (`REQ_0103` pull
272 /// path). Relaxed reads; never blocks the dispatch writer.
273 #[must_use]
274 pub fn stats_snapshot(&self) -> StatsSnapshot {
275 let per_task = self
276 .tasks
277 .iter()
278 .zip(self.cycle_stats.iter())
279 .map(|(t, s)| {
280 let snap = s.snapshot();
281 TaskStatsEntry {
282 task_id: t.id.clone(),
283 p50_ns: snap.p50_ns,
284 p95_ns: snap.p95_ns,
285 p99_ns: snap.p99_ns,
286 min_ns: snap.min_ns,
287 max_ns: snap.max_ns,
288 max_jitter_ns: snap.max_jitter_ns,
289 max_lateness_ns: snap.max_lateness_ns,
290 overrun_count: t.overrun_count.load(Ordering::Acquire),
291 }
292 })
293 .collect();
294 StatsSnapshot { per_task }
295 }
296
297 /// Add an item to the executor with an auto-generated id.
298 pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
299 let id = TaskId::new(format!(
300 "task-{}",
301 self.next_id.fetch_add(1, Ordering::SeqCst)
302 ));
303 self.add_with_id(id, item)
304 }
305
306 /// Add an item with a user-supplied id.
307 ///
308 /// The item's [`ExecutableItem::task_id`] override takes precedence over
309 /// the caller-supplied `id`, which itself takes precedence over the
310 /// auto-generated id assigned by [`Executor::add`].
311 pub fn add_with_id(
312 &mut self,
313 id: impl Into<TaskId>,
314 mut item: impl ExecutableItem,
315 ) -> Result<TaskId, ExecutorError> {
316 let id_arg: TaskId = id.into();
317 // The item's `task_id()` override wins over the user-supplied id.
318 let id = item.task_id().map_or(id_arg, TaskId::new);
319 let mut declarer = TriggerDeclarer::new_internal();
320 item.declare_triggers(&mut declarer)?;
321 let budget = declarer.budget;
322 let decls = declarer.into_decls();
323
324 // REQ_0268: reject ill-defined trigger shapes (cyclic+event, zero
325 // period) before the task joins the table — the natural validation
326 // point, where the decls are first available, for every DispatchMode.
327 validate_decls(&id, &decls)?;
328
329 let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
330 let app_id = item_box.app_id();
331 let app_inst = item_box.app_instance_id();
332 // SAFETY: the raw pointer points into the heap allocation of
333 // `item_box`. `Box` keeps that allocation at a stable address even
334 // when the `Box` itself is moved (e.g. when `self.tasks` grows),
335 // so the pointer remains valid for the lifetime of the
336 // `TaskEntry`. See SendItemPtr safety doc for the rest of the
337 // discipline (barrier() pairs with worker access).
338 #[allow(unsafe_code)]
339 let item_ptr =
340 SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
341
342 // Allocate the per-task atomics now so the dispatch closure
343 // and the `TaskEntry` share the same `Arc` storage. The task
344 // will occupy `self.tasks.len()` after the push below — capture
345 // that index up front for `task_idx_u32`. Bounded workspace, so
346 // the `as u32` cast is sound; explicit allow keeps clippy quiet.
347 let task_fault = Arc::new(FaultAtomic::new());
348 let overrun_count = Arc::new(AtomicU64::new(0));
349 let scan_period = scan_period_from_decls(&decls);
350 let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
351 #[allow(clippy::cast_possible_truncation)]
352 let task_idx_u32 = self.tasks.len() as u32;
353 let fault_ctx = FaultDispatchCtx {
354 task_budget: budget,
355 task_fault: Arc::clone(&task_fault),
356 overrun_count: Arc::clone(&overrun_count),
357 iteration_budget: self.iteration_budget,
358 exec_fault: Arc::clone(&self.exec_fault),
359 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
360 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
361 task_idx_u32,
362 exec_start: Arc::clone(&self.start_time),
363 observer: Arc::clone(&self.observer),
364 };
365
366 let job = build_single_job(
367 id.clone(),
368 self.stoppable.clone(),
369 Arc::clone(&self.observer),
370 Arc::clone(&self.monitor),
371 Arc::clone(&self.iter_err),
372 app_id,
373 app_inst,
374 item_ptr,
375 fault_ctx,
376 Arc::clone(&last_took_ns),
377 Arc::clone(&self.clock),
378 );
379
380 self.tasks.push(TaskEntry {
381 id: id.clone(),
382 kind: TaskKind::Single(item_box),
383 decls,
384 job: Some(job),
385 budget,
386 fault: task_fault,
387 overrun_count,
388 handler_job: None,
389 scan_period,
390 last_took_ns: Arc::clone(&last_took_ns),
391 last_dispatch: None,
392 grid_slot: 0,
393 grid_epoch: None,
394 pending_skipped: 0,
395 pending_late: None,
396 pending_cycle: None,
397 });
398 self.cycle_stats
399 .push(TaskCycleStats::new(self.stats_window));
400 Ok(id)
401 }
402
403 /// Register an item plus a fault-handler item.
404 ///
405 /// The main item is registered through the canonical [`add`](Self::add)
406 /// path. The handler's [`declare_triggers`](ExecutableItem::declare_triggers)
407 /// is called (so handlers that internally rely on the declarer being
408 /// invoked observe the call) but its returned trigger list is
409 /// **ignored** — the handler dispatches on the main item's triggers
410 /// while the task is in `Faulted` state and runs in place of the main
411 /// item's `execute()`. The pre-built handler dispatch closure is
412 /// stashed on the same task entry as the main item's `job`,
413 /// satisfying `REQ_0072`.
414 ///
415 /// # Errors
416 ///
417 /// Propagates any error from registering the main item via `add`, or
418 /// from the handler's `declare_triggers` call.
419 ///
420 /// # Panics
421 ///
422 /// Panics if the task entry just inserted by [`add`](Self::add) cannot
423 /// be located in `self.tasks` — this is unreachable by construction
424 /// and indicates a logic bug.
425 pub fn add_with_fault_handler<I, H>(
426 &mut self,
427 main: I,
428 handler: H,
429 ) -> Result<TaskId, ExecutorError>
430 where
431 I: ExecutableItem,
432 H: ExecutableItem,
433 {
434 let task_id = self.add(main)?;
435
436 // Drain the handler's trigger declarations — they are ignored by
437 // design (the handler runs on the main item's triggers).
438 let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
439 let mut throwaway = TriggerDeclarer::new_internal();
440 handler_box.declare_triggers(&mut throwaway)?;
441 drop(throwaway);
442
443 let app_id = handler_box.app_id();
444 let app_inst = handler_box.app_instance_id();
445
446 // Locate the task we just added so we can share its per-task
447 // atomics with the handler's `FaultDispatchCtx`. The handler
448 // runs on the same `TaskEntry`; per §4.6 invariant 5, a handler
449 // breach increments `overrun_count` and keeps state `Faulted`
450 // without re-firing the observer.
451 let task_idx = self
452 .tasks
453 .iter()
454 .position(|t| t.id == task_id)
455 .expect("just added; must exist");
456 let task = &self.tasks[task_idx];
457 #[allow(clippy::cast_possible_truncation)]
458 let task_idx_u32 = task_idx as u32;
459 let handler_fault_ctx = FaultDispatchCtx {
460 task_budget: task.budget,
461 task_fault: Arc::clone(&task.fault),
462 overrun_count: Arc::clone(&task.overrun_count),
463 iteration_budget: self.iteration_budget,
464 exec_fault: Arc::clone(&self.exec_fault),
465 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
466 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
467 task_idx_u32,
468 exec_start: Arc::clone(&self.start_time),
469 observer: Arc::clone(&self.observer),
470 };
471
472 let handler_closure = build_handler_job(
473 task_id.clone(),
474 self.stoppable.clone(),
475 Arc::clone(&self.observer),
476 Arc::clone(&self.monitor),
477 Arc::clone(&self.iter_err),
478 app_id,
479 app_inst,
480 handler_box,
481 handler_fault_ctx,
482 );
483
484 self.tasks[task_idx].handler_job = Some(handler_closure);
485
486 Ok(task_id)
487 }
488
489 /// Clear a per-task fault. Returns the previous `FaultState`.
490 /// Fires `Observer::on_task_clear` if the state changed from
491 /// `Faulted` to `Running`. `REQ_0070`.
492 ///
493 /// # Errors
494 ///
495 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
496 /// * [`ExecutorError::TaskNotFaulted`] if `task` is already `Running`.
497 pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
498 let entry = self
499 .tasks
500 .iter()
501 .find(|t| t.id == task)
502 .ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
503 let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
504 let prev = entry.fault.swap(FaultState::Running, budget_ms);
505 match prev {
506 FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
507 FaultState::Faulted { .. } => {
508 self.observer.on_task_clear(task);
509 Ok(prev)
510 }
511 }
512 }
513
514 /// Clear the executor-wide fault and cascade-clear every task whose
515 /// state is `Faulted{ExecutorFaulted}`. Tasks whose state is
516 /// `Faulted{BudgetExceeded}` are NOT cleared (their own contract
517 /// breach is independent). Fires `Observer::on_executor_clear` and
518 /// one `Observer::on_task_clear` per cascade-cleared task.
519 /// `REQ_0071`.
520 ///
521 /// # Errors
522 ///
523 /// * [`ExecutorError::ExecutorNotFaulted`] if the executor is `Running`.
524 pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
525 let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
526 let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
527 let prev = self
528 .exec_fault
529 .swap(ExecutorFaultState::Running, task_idx, budget_ms);
530 match prev {
531 ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
532 ExecutorFaultState::Faulted { .. } => {
533 // Cascade-clear tasks whose reason is ExecutorFaulted.
534 for entry in &self.tasks {
535 let task_budget_ms =
536 entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
537 if let FaultState::Faulted {
538 reason: FaultReason::ExecutorFaulted,
539 ..
540 } = entry.fault.load(task_budget_ms)
541 {
542 let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
543 self.observer.on_task_clear(entry.id.clone());
544 }
545 }
546 self.observer.on_executor_clear();
547 Ok(prev)
548 }
549 }
550 }
551
552 /// Return the per-task overrun counter — number of times the task's
553 /// `execute()` exceeded its budget over the executor's lifetime.
554 /// Monotonic; not reset by `clear_task_fault`. `REQ_0102`.
555 ///
556 /// # Errors
557 ///
558 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
559 pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
560 self.tasks
561 .iter()
562 .find(|t| t.id == task)
563 .map(|t| t.overrun_count.load(Ordering::Acquire))
564 .ok_or_else(|| ExecutorError::TaskNotFound(task))
565 }
566
567 /// Return a snapshot of the per-task `FaultState`. `REQ_0073` (pull path).
568 ///
569 /// # Errors
570 ///
571 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
572 pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
573 self.tasks
574 .iter()
575 .find(|t| t.id == task)
576 .map(|t| {
577 let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
578 t.fault.load(budget_ms)
579 })
580 .ok_or_else(|| ExecutorError::TaskNotFound(task))
581 }
582
583 /// Return a snapshot of the executor-wide `ExecutorFaultState`.
584 /// `REQ_0073` (pull path).
585 #[must_use]
586 pub fn executor_fault_state(&self) -> ExecutorFaultState {
587 let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
588 let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
589 self.exec_fault.load(task_idx, budget_ms)
590 }
591
592 /// Add a sequential chain of items. Only the head item's
593 /// `declare_triggers` is consulted; non-head triggers are ignored with a
594 /// tracing warn.
595 pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
596 where
597 I: ExecutableItem,
598 C: IntoIterator<Item = I>,
599 {
600 let id = TaskId::new(format!(
601 "chain-{}",
602 self.next_id.fetch_add(1, Ordering::SeqCst)
603 ));
604 let boxed: Vec<Box<dyn ExecutableItem>> = items
605 .into_iter()
606 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
607 .collect();
608 self.add_chain_with_id_boxed(id, boxed)
609 }
610
611 /// Like [`Executor::add_chain`] but with a user-supplied id.
612 pub fn add_chain_with_id<I, C>(
613 &mut self,
614 id: impl Into<TaskId>,
615 items: C,
616 ) -> Result<TaskId, ExecutorError>
617 where
618 I: ExecutableItem,
619 C: IntoIterator<Item = I>,
620 {
621 let boxed: Vec<Box<dyn ExecutableItem>> = items
622 .into_iter()
623 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
624 .collect();
625 self.add_chain_with_id_boxed(id.into(), boxed)
626 }
627
628 fn add_chain_with_id_boxed(
629 &mut self,
630 id: TaskId,
631 mut items: Vec<Box<dyn ExecutableItem>>,
632 ) -> Result<TaskId, ExecutorError> {
633 if items.is_empty() {
634 return Err(ExecutorError::Builder(
635 "chain must contain at least one item".into(),
636 ));
637 }
638
639 // Head item's `task_id()` override wins over the user-supplied id.
640 let id = items[0].task_id().map_or(id, TaskId::new);
641
642 // Head's triggers gate the chain.
643 let mut head_declarer = TriggerDeclarer::new_internal();
644 items[0].declare_triggers(&mut head_declarer)?;
645 let decls = head_declarer.into_decls();
646
647 // REQ_0268: same trigger-shape validation as the single-item path,
648 // applied to the head item's decls (which gate the whole chain).
649 validate_decls(&id, &decls)?;
650
651 // Warn if non-head items declared triggers (those will be ignored).
652 for (i, body) in items.iter_mut().enumerate().skip(1) {
653 let mut spurious = TriggerDeclarer::new_internal();
654 let _ = body.declare_triggers(&mut spurious);
655 if !spurious.is_empty() {
656 #[cfg(feature = "tracing")]
657 tracing::warn!(
658 target: "taktora-executor",
659 task = %id,
660 position = i,
661 "non-head chain item declared triggers; they will be ignored"
662 );
663 #[cfg(not(feature = "tracing"))]
664 {
665 let _ = i;
666 }
667 }
668 }
669
670 let mut items = items;
671 // SAFETY: pointer into the chain's `items` Vec. The Vec lives
672 // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
673 // is stable once `add_chain` returns — `self.tasks` may grow
674 // (moving the `Vec<Box<...>>` header itself), but the Vec's
675 // heap buffer is referenced via the header's data pointer and
676 // is unaffected by header moves. We never resize the chain Vec
677 // after this point. See SendChainPtr safety doc for the rest.
678 #[allow(unsafe_code)]
679 let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
680 &mut items,
681 ));
682 // NB: the pointer above is to the local `items` Vec on the
683 // stack — it's invalid after the `push` below moves items into
684 // the TaskEntry. We rederive a stable pointer after the push.
685 // (See the rebuild step below.)
686 let _ = chain_ptr;
687
688 // Pre-allocate the per-task atomics so the chain's dispatch
689 // closure can capture clones of the same `Arc`s the `TaskEntry`
690 // holds. The chain occupies `self.tasks.len()` after the push.
691 let task_fault = Arc::new(FaultAtomic::new());
692 let overrun_count = Arc::new(AtomicU64::new(0));
693 let scan_period = scan_period_from_decls(&decls);
694 let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
695 #[allow(clippy::cast_possible_truncation)]
696 let task_idx_u32 = self.tasks.len() as u32;
697
698 self.tasks.push(TaskEntry {
699 id: id.clone(),
700 kind: TaskKind::Chain(items),
701 decls,
702 job: None, // populated in the rebuild step below
703 // TODO(post-Task-10): chain budgets carried separately; for now None.
704 budget: None,
705 fault: Arc::clone(&task_fault),
706 overrun_count: Arc::clone(&overrun_count),
707 handler_job: None,
708 scan_period,
709 last_took_ns: Arc::clone(&last_took_ns),
710 last_dispatch: None,
711 grid_slot: 0,
712 grid_epoch: None,
713 pending_skipped: 0,
714 pending_late: None,
715 pending_cycle: None,
716 });
717 self.cycle_stats
718 .push(TaskCycleStats::new(self.stats_window));
719
720 // After the push, the TaskEntry lives at a stable position in
721 // `self.tasks` for the duration of this `add_chain_with_id_boxed`
722 // call. Take a stable pointer to its chain Vec and build the
723 // dispatch closure. If `self.tasks` later grows, the Vec header
724 // inside the TaskEntry moves but the header's data pointer
725 // (which addresses the chain's heap buffer) does not — and the
726 // closure derefs that pointer per dispatch, so it re-reads the
727 // current heap address each time. Sound under the same
728 // discipline as `tasks_ptr` in dispatch_loop.
729 let task_idx = self.tasks.len() - 1;
730 let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
731 {
732 TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
733 // The push above used TaskKind::Chain, so this arm is
734 // unreachable. Mark it explicitly to satisfy `match`.
735 _ => unreachable!("just-pushed task is TaskKind::Chain"),
736 };
737 #[allow(unsafe_code)]
738 let chain_ptr = SendChainPtr::new(chain_vec_ptr);
739 let fault_ctx = FaultDispatchCtx {
740 task_budget: None, // chain budgets are intentionally None for now
741 task_fault,
742 overrun_count,
743 iteration_budget: self.iteration_budget,
744 exec_fault: Arc::clone(&self.exec_fault),
745 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
746 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
747 task_idx_u32,
748 exec_start: Arc::clone(&self.start_time),
749 observer: Arc::clone(&self.observer),
750 };
751 let job = build_chain_job(
752 id.clone(),
753 self.stoppable.clone(),
754 Arc::clone(&self.observer),
755 Arc::clone(&self.monitor),
756 Arc::clone(&self.iter_err),
757 chain_ptr,
758 fault_ctx,
759 Arc::clone(&last_took_ns),
760 Arc::clone(&self.clock),
761 );
762 self.tasks[task_idx].job = Some(job);
763 Ok(id)
764 }
765
766 /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
767 /// executor is built. Clone before calling `run()` — any clone taken at any
768 /// time will wake the `WaitSet` when `stop()` is called.
769 #[must_use]
770 pub fn stoppable(&self) -> Stoppable {
771 self.stoppable.clone()
772 }
773
774 /// Borrow the underlying iceoryx2 node (escape hatch for power users).
775 pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
776 &self.node
777 }
778
779 /// Begin building a graph. Call `.build()` on the returned builder to
780 /// register the graph as a task.
781 pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
782 ExecutorGraphBuilder {
783 executor: self,
784 builder: crate::graph::GraphBuilder::new(),
785 custom_id: None,
786 }
787 }
788}
789
790/// Builder for [`Executor`].
791pub struct ExecutorBuilder {
792 worker_threads: Option<usize>,
793 observer: Option<Arc<dyn Observer>>,
794 monitor: Option<Arc<dyn ExecutionMonitor>>,
795 worker_attrs: ThreadAttributes,
796 /// Executor-wide iteration budget (`REQ_0071`). `None` means no
797 /// executor-wide check.
798 iteration_budget: Option<Duration>,
799 /// User-supplied fatal handler. `None` → resolved to a no-op `Arc` in
800 /// `build()`.
801 fatal_handler: Option<FatalHandler>,
802 /// Sliding-window size (samples) for cycle-stats aggregation
803 /// (`REQ_0100`). `None` → resolved to `1024` in `build()`.
804 stats_window: Option<u32>,
805 /// Telemetry time source. `None` → resolved to [`SystemClock`] in
806 /// `build()`. Override with a [`MockClock`](crate::MockClock) for
807 /// deterministic timing tests.
808 clock: Option<Arc<dyn MonotonicClock>>,
809 /// Cyclic dispatch timing strategy (`REQ_0268`). Default
810 /// [`DispatchMode::Grid`](crate::DispatchMode).
811 dispatch_mode: crate::DispatchMode,
812 /// Scheduling clock for the absolute grid. `None` → resolved to
813 /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock) in `build()`.
814 cyclic_clock: Option<std::sync::Arc<dyn crate::CyclicClock>>,
815}
816
817impl Default for ExecutorBuilder {
818 fn default() -> Self {
819 Self {
820 worker_threads: None,
821 observer: None,
822 monitor: None,
823 worker_attrs: ThreadAttributes::new(),
824 iteration_budget: None,
825 fatal_handler: None,
826 stats_window: None,
827 clock: None,
828 dispatch_mode: crate::DispatchMode::default(),
829 cyclic_clock: None,
830 }
831 }
832}
833
834impl ExecutorBuilder {
835 /// Number of worker threads. `0` → inline (no pool). Default → physical
836 /// cores.
837 #[must_use]
838 pub const fn worker_threads(mut self, n: usize) -> Self {
839 self.worker_threads = Some(n);
840 self
841 }
842
843 /// Attach a lifecycle observer. If not called, a no-op observer is used.
844 #[must_use]
845 pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
846 self.observer = Some(obs);
847 self
848 }
849
850 /// Attach an execution monitor. If not called, a no-op monitor is used.
851 #[must_use]
852 pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
853 self.monitor = Some(mon);
854 self
855 }
856
857 /// Configure the executor-wide iteration budget. Any task whose
858 /// `execute()` exceeds `dur` transitions the executor to `Faulted`
859 /// (`REQ_0071`). Default: unset (no executor-wide check).
860 #[must_use]
861 pub const fn iteration_budget(mut self, dur: Duration) -> Self {
862 self.iteration_budget = Some(dur);
863 self
864 }
865
866 /// Sliding-window size (samples) for percentile / min-max / jitter /
867 /// lateness aggregation (`REQ_0100`). Default `1024`.
868 #[must_use]
869 pub const fn stats_window(mut self, samples: u32) -> Self {
870 self.stats_window = Some(samples);
871 self
872 }
873
874 /// Substitute the telemetry time source. Defaults to [`SystemClock`].
875 ///
876 /// Pass a [`MockClock`](crate::MockClock) clone to drive `took` / jitter /
877 /// lateness from scripted instants, making timing assertions exact and
878 /// independent of the host scheduler. The clock affects telemetry only —
879 /// scheduling, run-mode deadlines and fault detection always use the real
880 /// monotonic clock.
881 #[must_use]
882 pub fn clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
883 self.clock = Some(clock);
884 self
885 }
886
887 /// Select cyclic dispatch timing (default `DispatchMode::Grid`). `Legacy` is
888 /// the pre-REQ_0268 `attach_interval` path, retained only until the Pi A/B.
889 #[must_use]
890 pub const fn dispatch_mode(mut self, mode: crate::DispatchMode) -> Self {
891 self.dispatch_mode = mode;
892 self
893 }
894
895 /// Override the scheduling clock (default `MonotonicCyclicClock`). Distinct
896 /// from `clock` (telemetry) — see `CyclicClock`.
897 #[must_use]
898 pub fn cyclic_clock(mut self, clock: std::sync::Arc<dyn crate::CyclicClock>) -> Self {
899 self.cyclic_clock = Some(clock);
900 self
901 }
902
903 /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
904 /// for worker threads. Has no effect when `worker_threads` is `0` (inline
905 /// mode). Requires the `thread_attrs` feature for non-default settings.
906 #[must_use]
907 #[allow(clippy::missing_const_for_fn)]
908 pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
909 self.worker_attrs = attrs;
910 self
911 }
912
913 /// Register a best-effort last-gasp handler invoked once on the fail-fast
914 /// path immediately before `std::process::abort()`.
915 ///
916 /// **Contract**: runs over known-unsound executor state — MUST NOT touch
917 /// executor internals; a panic inside the handler routes straight to
918 /// `abort()`.
919 ///
920 /// The handler is expected to be time-bounded (the caller's responsibility);
921 /// no runtime deadline is imposed.
922 ///
923 /// **Observer / monitor containment carve-out**: the panic containment
924 /// described in the executor documentation covers only a user item's
925 /// `execute()` call. Panics that originate in framework-invoked user
926 /// callbacks that run *outside* that inner catch — such as
927 /// [`Observer`](crate::Observer) methods (e.g. `on_app_error`,
928 /// `on_task_fault`) and [`ExecutionMonitor`](crate::ExecutionMonitor)
929 /// methods (e.g. `post_execute`) — escape to this fail-fast boundary and
930 /// cause `abort()`. Those callbacks must therefore be treated as
931 /// non-panicking by the implementor. See `REQ_0123`.
932 ///
933 /// If not called, a no-op handler is used and `abort()` is still reached
934 /// after any unrecoverable fault.
935 #[must_use]
936 pub fn on_fatal(
937 mut self,
938 handler: impl Fn(&crate::FatalContext) + Send + Sync + 'static,
939 ) -> Self {
940 self.fatal_handler = Some(Arc::new(handler));
941 self
942 }
943
944 /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
945 /// internal stop-event service so that any `Stoppable` clone (taken before
946 /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
947 ///
948 /// # Panics
949 ///
950 /// Panics if the internally-generated stop-event service name exceeds the
951 /// iceoryx2 service name length limit (this cannot happen under normal use
952 /// because the name is derived from the process id and a monotonic counter).
953 #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
954 #[track_caller]
955 pub fn build(self) -> Result<Executor, ExecutorError> {
956 let node = NodeBuilder::new()
957 .create::<ipc::Service>()
958 .map_err(ExecutorError::iceoryx2)?;
959
960 let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
961
962 // Resolve the fatal handler: use the user-supplied one or fall back to a no-op.
963 let fatal_handler: FatalHandler = self
964 .fatal_handler
965 .unwrap_or_else(|| Arc::new(|_ctx: &crate::FatalContext| {}));
966 let fatal_dispatch = Arc::new(FatalDispatch::new(fatal_handler));
967
968 let pool = Arc::new(Pool::new(
969 n_workers,
970 self.worker_attrs,
971 Arc::clone(&fatal_dispatch),
972 )?);
973
974 // Build the internal stop event service with a unique-per-process name
975 // so multiple executors in the same process don't collide.
976 let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
977 let stop_topic = format!(
978 "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
979 std::process::id()
980 );
981 let stop_event = node
982 .service_builder(&stop_topic.as_str().try_into().unwrap())
983 .event()
984 .open_or_create()
985 .map_err(ExecutorError::iceoryx2)?;
986
987 let stop_notifier = Arc::new(
988 stop_event
989 .notifier_builder()
990 .create()
991 .map_err(ExecutorError::iceoryx2)?,
992 );
993
994 // SAFETY: see module-level note; Arc<IxListener> is held here and only
995 // accessed on the executor thread.
996 let stop_listener = Arc::new(
997 stop_event
998 .listener_builder()
999 .create()
1000 .map_err(ExecutorError::iceoryx2)?,
1001 );
1002
1003 // Wire the notifier into the Stoppable so every clone is waker-aware
1004 // from the moment the executor is built.
1005 let stoppable = Stoppable::with_waker(stop_notifier);
1006
1007 let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
1008
1009 let monitor: Arc<dyn ExecutionMonitor> =
1010 self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
1011
1012 let clock: Arc<dyn MonotonicClock> =
1013 self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
1014
1015 let cyclic_clock: std::sync::Arc<dyn crate::CyclicClock> = self
1016 .cyclic_clock
1017 .unwrap_or_else(|| std::sync::Arc::new(crate::MonotonicCyclicClock::new()));
1018
1019 let exec = Executor {
1020 node,
1021 pool,
1022 tasks: Vec::new(),
1023 cycle_stats: Vec::new(),
1024 stats_window: self.stats_window.unwrap_or(1024),
1025 running: Arc::new(AtomicBool::new(false)),
1026 stoppable,
1027 next_id: AtomicU64::new(0),
1028 stop_listener,
1029 observer,
1030 monitor,
1031 iter_err: Arc::new(std::sync::Mutex::new(None)),
1032 iteration_budget: self.iteration_budget,
1033 exec_fault: Arc::new(ExecutorFaultAtomic::new()),
1034 exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
1035 exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
1036 start_time: Arc::new(OnceLock::new()),
1037 fatal_dispatch,
1038 clock,
1039 dispatch_mode: self.dispatch_mode,
1040 cyclic_clock,
1041 };
1042
1043 Ok(exec)
1044 }
1045}
1046
1047// ── Run loop ──────────────────────────────────────────────────────────────────
1048
1049impl Executor {
1050 /// Run the executor until [`Stoppable::stop`] is called or a task signals
1051 /// stop via [`crate::Context::stop_executor`].
1052 ///
1053 /// # Errors
1054 ///
1055 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1056 ///
1057 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1058 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1059 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1060 ///
1061 /// If multiple items error in the same dispatch iteration, only the first
1062 /// is preserved; subsequent errors are discarded silently. To observe
1063 /// every error, attach an [`Observer`](crate::Observer) and read errors
1064 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1065 pub fn run(&mut self) -> Result<(), ExecutorError> {
1066 self.run_inner(RunMode::Forever)
1067 }
1068
1069 /// Run for at most `max` wall-clock duration, then return.
1070 ///
1071 /// # Errors
1072 ///
1073 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1074 ///
1075 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1076 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1077 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1078 ///
1079 /// If multiple items error in the same dispatch iteration, only the first
1080 /// is preserved; subsequent errors are discarded silently. To observe
1081 /// every error, attach an [`Observer`](crate::Observer) and read errors
1082 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1083 pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
1084 self.run_inner(RunMode::Until(Instant::now() + max))
1085 }
1086
1087 /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
1088 ///
1089 /// # Errors
1090 ///
1091 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1092 ///
1093 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1094 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1095 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1096 ///
1097 /// If multiple items error in the same dispatch iteration, only the first
1098 /// is preserved; subsequent errors are discarded silently. To observe
1099 /// every error, attach an [`Observer`](crate::Observer) and read errors
1100 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1101 pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
1102 self.run_inner(RunMode::Iterations(n))
1103 }
1104
1105 /// Run until `predicate()` returns true. Checked after each `WaitSet`
1106 /// wakeup.
1107 ///
1108 /// # Errors
1109 ///
1110 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1111 ///
1112 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1113 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1114 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1115 ///
1116 /// If multiple items error in the same dispatch iteration, only the first
1117 /// is preserved; subsequent errors are discarded silently. To observe
1118 /// every error, attach an [`Observer`](crate::Observer) and read errors
1119 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1120 pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
1121 self.run_inner(RunMode::Predicate(&mut predicate))
1122 }
1123}
1124
/// Termination policy for one `run_inner` call. `after_callback` evaluates it
/// after each counted (non-interrupted) wakeup.
enum RunMode<'a> {
    /// No run-mode limit; the loop ends only on stop, termination or error.
    Forever,
    /// Finish once `Instant::now()` has reached this deadline.
    Until(Instant),
    /// Finish once at least this many wakeups have been counted.
    Iterations(usize),
    /// Finish as soon as the borrowed predicate returns `true`.
    Predicate(&'a mut dyn FnMut() -> bool),
}
1131
1132impl Executor {
1133 fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
1134 // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
1135 // remains true permanently. Calling `run()` again after a stop will return
1136 // promptly without doing any meaningful work (it blocks until the first
1137 // trigger fires, then immediately exits the dispatch loop). Task 10's
1138 // Runner accommodates this by treating an Executor as one-shot: each
1139 // Runner owns the Executor and consumes it.
1140 if self.running.swap(true, Ordering::SeqCst) {
1141 return Err(ExecutorError::AlreadyRunning);
1142 }
1143
1144 self.observer.on_executor_up();
1145 let result = self.dispatch_loop(&mut mode);
1146 match &result {
1147 Ok(()) => self.observer.on_executor_down(),
1148 Err(e) => self.observer.on_executor_error(e),
1149 }
1150
1151 self.running.store(false, Ordering::SeqCst);
1152 result
1153 }
1154
1155 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1156 #[allow(
1157 unsafe_code,
1158 clippy::too_many_lines,
1159 clippy::ref_as_ptr,
1160 clippy::borrow_as_ptr
1161 )]
1162 fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
1163 // Tight timer slack for the dispatch thread (REQ_0274). SCHED_OTHER
1164 // threads inherit the kernel's 50 µs default, and `epoll_wait`
1165 // sleeps through `schedule_hrtimeout_range` WITH that slack: on the
1166 // Pi5 rig that cost 56 µs/cycle accumulated Legacy-dispatch drift
1167 // (5.5 µs/cycle at 1 µs slack — the residual is iceoryx2's
1168 // ms-rounded epoll timeout + wake latency). Real-time classes force
1169 // slack to 0, so this is a no-op under SCHED_FIFO and for the
1170 // Grid/timerfd path (fd readiness, not timeout, drives the wake);
1171 // it removes a 10× cyclic-precision regression for non-RT Legacy
1172 // deployments. Thread-local, never fails for valid args; an error
1173 // (exotic kernel) would only restore today's behavior, so the
1174 // return value is deliberately not checked.
1175 #[cfg(target_os = "linux")]
1176 // SAFETY: prctl(PR_SET_TIMERSLACK) only adjusts the calling
1177 // thread's timer slack; no memory is involved.
1178 unsafe {
1179 libc::prctl(libc::PR_SET_TIMERSLACK, 1_000u64, 0, 0, 0);
1180 }
1181
1182 let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
1183 .create()
1184 .map_err(ExecutorError::iceoryx2)?;
1185
1186 // Keep Arc<RawListener> alive for at least as long as the WaitSet
1187 // guards — the guard borrows the listener via 'attachment lifetime.
1188 let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
1189 // Guards must outlive the run loop.
1190 let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
1191 // Maps guard index → task index.
1192 let mut attachment_to_task: Vec<usize> = Vec::new();
1193
1194 // Hoist to a local for the hot loop — one Copy-enum compare per cycle,
1195 // never a field re-read (REQ_0268).
1196 let dispatch_mode = self.dispatch_mode;
1197
1198 // Cyclic tasks are dispatched by the master timer + GridTimer (REQ_0268),
1199 // not attached as individual WaitSet triggers. Cross-platform: only the
1200 // wake source differs (Task 3).
1201 let mut cyclic_task_indices: Vec<usize> = Vec::new();
1202 let mut cyclic_periods: Vec<u64> = Vec::new();
1203 build_attachments(
1204 &waitset,
1205 &self.tasks,
1206 dispatch_mode,
1207 &mut listener_storage,
1208 &mut guards,
1209 &mut attachment_to_task,
1210 &mut cyclic_task_indices,
1211 &mut cyclic_periods,
1212 )?;
1213 // `cyclic_periods` is cloned, not moved, because Task 3 reads it again to
1214 // arm the single master timerfd; on non-Linux it is unused after this.
1215 let mut grid =
1216 crate::grid::GridTimer::new(self.cyclic_clock.now_nanos(), cyclic_periods.clone());
1217 let mut due_cyclic: Vec<(usize, u64, u64)> = Vec::new();
1218
1219 // Master cyclic timer (REQ_0268, Linux). ONE timerfd armed at the base
1220 // period (gcd of cyclic periods) drives the absolute grid; GridTimer
1221 // decides which tasks are due each tick. Declared above its own guard so
1222 // it drops AFTER the guard (detach before close → no EBADF). Must be
1223 // declared here, after `build_attachments` has filled `cyclic_periods`.
1224 #[cfg(target_os = "linux")]
1225 let master_timer: Option<crate::timerfd::TimerFd> = {
1226 let base = crate::grid::base_period(&cyclic_periods);
1227 if base == 0 {
1228 None
1229 } else {
1230 Some(
1231 crate::timerfd::TimerFd::new(std::time::Duration::from_nanos(base)).map_err(
1232 |e| {
1233 ExecutorError::DeclareTriggers(format!(
1234 "failed to arm master timerfd: {e}"
1235 ))
1236 },
1237 )?,
1238 )
1239 }
1240 };
1241
1242 // Attach the master timer as a wake-only notification, held separately
1243 // (like the stop listener) so `process_attachment` never maps it to a
1244 // task. `_master_timer_guard` is declared immediately after `master_timer`
1245 // so on scope exit it drops FIRST — detaching the fd from the WaitSet's
1246 // epoll set — and `master_timer` drops SECOND, closing the fd. That
1247 // ordering is what prevents iceoryx2's `EPOLL_CTL_DEL` from hitting a
1248 // closed fd (EBADF). `master_timer`'s fd is referenced ONLY by this guard
1249 // (independent of `guards`/`listener_storage`, which own other fds), so
1250 // its drop position relative to those Vecs is immaterial.
1251 #[cfg(target_os = "linux")]
1252 #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1253 let _master_timer_guard = match &master_timer {
1254 // SAFETY: `master_timer` is a stack local that outlives this guard
1255 // (declared above it); the cast erases the borrow lifetime to the
1256 // attachment lifetime, sound by the same discipline as the stop
1257 // listener. Dropped before `master_timer` closes the fd.
1258 Some(tf) => Some(
1259 waitset
1260 .attach_notification(unsafe { &*(tf as *const crate::timerfd::TimerFd) })
1261 .map_err(ExecutorError::iceoryx2)?,
1262 ),
1263 None => None,
1264 };
1265
1266 // Attach the internal stop listener so the WaitSet wakes when
1267 // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
1268 // struct which is valid for the lifetime of dispatch_loop. We use the
1269 // same raw-pointer-cast pattern as user listeners above.
1270 //
1271 // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
1272 // exclusively borrowed for the duration of `run_inner` (which calls
1273 // `dispatch_loop`). The listener is not freed while the guard is alive
1274 // because the Arc keeps it alive and `self` outlives this function.
1275 let stop_listener_ref: &IxListener<ipc::Service> =
1276 unsafe { &*(self.stop_listener.as_ref() as *const _) };
1277 let _stop_guard = waitset
1278 .attach_notification(stop_listener_ref)
1279 .map_err(ExecutorError::iceoryx2)?;
1280
1281 let iterations_done = AtomicUsize::new(0);
1282 let stop_flag = self.stoppable.clone();
1283
1284 loop {
1285 // Reset the pre-allocated per-iteration error slot (REQ_0060):
1286 // the slot is owned by `self.iter_err`, allocated once at build
1287 // time. Pool worker closures obtain a refcount-only clone of
1288 // the `Arc`; the slot itself is reused across iterations.
1289 #[allow(clippy::unwrap_used)]
1290 // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1291 let mut iter_err_guard = self.iter_err.lock().unwrap();
1292 *iter_err_guard = None;
1293 drop(iter_err_guard);
1294
1295 // SAFETY: we capture &mut self.tasks via a raw pointer because
1296 // wait_and_process expects FnMut and Rust can't see the closure
1297 // outlives `self`. The discipline that makes this sound:
1298 // 1. The closure body on the executor thread is the *only* code that
1299 // reads `tasks_ptr`. The pool jobs it submits hold borrowed
1300 // `*mut dyn ExecutableItem` slices into individual TaskEntries,
1301 // not into the Vec itself, so they don't race with the Vec.
1302 // 2. `pool.barrier()` at the end of this callback ensures every
1303 // submitted pool job has completed (and dropped its raw pointer)
1304 // before the callback returns. The next iteration of the WaitSet
1305 // loop is therefore the sole user of `tasks_ptr` again.
1306 // 3. The Vec is never resized inside this loop (no `push` / `remove`
1307 // after dispatch starts), so the underlying buffer addresses are
1308 // stable for the lifetime of `dispatch_loop`.
1309 let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
1310 // Take the cycle_stats raw pointer before borrowing `observer`, so
1311 // the &mut borrow is released first — same discipline as tasks_ptr.
1312 let cycle_stats_ptr = &mut self.cycle_stats as *mut Vec<TaskCycleStats>;
1313 let observer = &self.observer;
1314 let pool = &self.pool;
1315 // Refcount-only clone of the pre-allocated error slot. Pool jobs
1316 // need a `'static` handle, and an `Arc::clone` does not allocate.
1317 // The Single/Chain paths use the closure baked into `task.job`,
1318 // which already captured stable Arc clones at `add`-time; the
1319 // Graph path uses closures pre-built by `prepare_dispatch`. Only
1320 // the error-aggregation logic on the WaitSet thread still needs
1321 // the slot here.
1322 let iter_err_inner = Arc::clone(&self.iter_err);
1323 // Raw pointer to the stop listener for draining inside the callback.
1324 // SAFETY: same as stop_listener_ref above — the Arc is alive for
1325 // the lifetime of dispatch_loop.
1326 let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
1327 // Raw pointer to the executor-wide fault state. Same safety
1328 // discipline as `tasks_ptr`: `Executor` is alive for the
1329 // duration of `dispatch_loop`; the WaitSet callback is the
1330 // only reader. REQ_0071. `self.exec_fault` is
1331 // `Arc<ExecutorFaultAtomic>` — we deref once to obtain a
1332 // pointer to the inner `ExecutorFaultAtomic`.
1333 let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
1334 // Raw pointer to the executor start time. Used by the lazy
1335 // cascade below to compute `since_ms` on task transitions
1336 // triggered by an executor-wide fault.
1337 let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;
1338 // Telemetry clock. Same lifetime/aliasing discipline as the
1339 // pointers above: the Executor outlives the dispatch loop and the
1340 // WaitSet callback is the sole reader.
1341 let clock = &self.clock;
1342
1343 // Wrap the per-iteration dispatch body in the framework panic
1344 // boundary. A panic escaping here is *infrastructure* (the WaitSet
1345 // drive, pool submission/barrier, or dispatch wiring) — not a user
1346 // item panic, which is already caught and faulted inside
1347 // `run_item_catch_unwind`. On such a panic `guard_or_fatal` runs the
1348 // user fatal handler then aborts in production. Under a test
1349 // terminal it returns `None`, in which case we must NOT keep
1350 // iterating over possibly-corrupt executor state, so we break out.
1351 let Some(cb_result) =
1352 guard_or_fatal(&self.fatal_dispatch, FatalSite::ExecutorRunLoop, || {
1353 // Bundle the per-iteration captures into a single context the
1354 // WaitSet callback delegates to. Keeping the closure a thin
1355 // adapter over `DispatchPass::process_attachment` keeps the
1356 // dispatch logic in named, individually-measurable functions.
1357 let mut pass = DispatchPass {
1358 guards: &guards,
1359 attachment_to_task: &attachment_to_task,
1360 tasks_ptr,
1361 cycle_stats_ptr,
1362 observer,
1363 exec_fault_ptr,
1364 exec_start_ptr,
1365 clock,
1366 stop_listener_ptr,
1367 pool,
1368 iter_err: &iter_err_inner,
1369 };
1370
1371 // Linux: block on fds — the master timerfd wakes us on the
1372 // absolute grid. Non-Linux dev: bound the wait by the earliest
1373 // pending grid target so the post-wait pass can dispatch.
1374 #[cfg(target_os = "linux")]
1375 let timeout = std::time::Duration::MAX;
1376 #[cfg(not(target_os = "linux"))]
1377 let timeout = match dispatch_mode {
1378 crate::DispatchMode::Grid => {
1379 grid.next_timeout(self.cyclic_clock.now_nanos())
1380 }
1381 crate::DispatchMode::Legacy => std::time::Duration::MAX,
1382 };
1383 waitset.wait_and_process_once_with_timeout(
1384 |attachment_id: WaitSetAttachmentId<ipc::Service>| {
1385 pass.process_attachment(&attachment_id)
1386 },
1387 timeout,
1388 )
1389 })
1390 else {
1391 // Only reachable under a test terminal (production aborts in
1392 // `fire`). Bail out of the run loop rather than continuing over
1393 // possibly-corrupt executor state.
1394 //
1395 // Unreachable in production: the production terminal aborts
1396 // before returning, so this branch exists solely so a
1397 // `#[cfg(test)]` recording terminal can unwind the loop.
1398 // Consequently, silently discarding any pending `iter_err`
1399 // here is immaterial to production behavior.
1400 break Ok(());
1401 };
1402
1403 // Did the master timer tick this wake? Linux: drain it (clears epoll
1404 // readiness; >0 overruns means the absolute grid advanced). Non-Linux:
1405 // the self-computed timeout drove the wake, so always consult the grid
1406 // (take_due self-gates per task on `now >= next`). REQ_0268.
1407 #[cfg(target_os = "linux")]
1408 let ticked = master_timer.as_ref().is_some_and(|tf| tf.drain() > 0);
1409 #[cfg(not(target_os = "linux"))]
1410 let ticked = true;
1411
1412 // Post-wait master-grid pass (Grid mode). `run_grid_cyclic_pass`
1413 // self-gates on `ticked` / stop-wake / mode / non-empty, then
1414 // dispatches EVERY due cyclic task atomically this tick (PLC
1415 // semantics). `cpass` is a side-effect-free bundle of borrows, so
1416 // building it unconditionally is free; the gate lives in the helper
1417 // to keep `dispatch_loop` within the complexity budget. REQ_0268.
1418 let cpass = DispatchPass {
1419 guards: &guards,
1420 attachment_to_task: &attachment_to_task,
1421 tasks_ptr,
1422 cycle_stats_ptr,
1423 observer,
1424 exec_fault_ptr,
1425 exec_start_ptr,
1426 clock,
1427 stop_listener_ptr,
1428 pool,
1429 iter_err: &iter_err_inner,
1430 };
1431 run_grid_cyclic_pass(
1432 cpass,
1433 ticked,
1434 dispatch_mode,
1435 &stop_flag,
1436 cb_result,
1437 &mut grid,
1438 self.cyclic_clock.now_nanos(),
1439 &cyclic_task_indices,
1440 &mut due_cyclic,
1441 );
1442
1443 // Funnel the post-callback decision (interrupt / item error /
1444 // stop request / run-mode termination) through one helper that
1445 // yields a single control value, so the loop has exactly one exit.
1446 match self.after_callback(cb_result, mode, &iterations_done, &stop_flag) {
1447 IterOutcome::Continue => {}
1448 IterOutcome::Done => break Ok(()),
1449 IterOutcome::Failed(err) => break Err(err),
1450 }
1451 }
1452 }
1453
1454 /// Evaluates the post-callback termination conditions for one dispatch
1455 /// iteration and reports whether the loop should continue, stop, or fail.
1456 ///
1457 /// Order of precedence matches the original inline checks: `WaitSet`
1458 /// errors, then SIGINT/SIGTERM, then a captured item error, then a stop
1459 /// request, then a bare-EINTR continue (`REQ_0269` — an interrupted wait
1460 /// is a spurious wake, not a termination), then the active [`RunMode`]
1461 /// limit.
1462 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1463 fn after_callback(
1464 &self,
1465 cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1466 mode: &mut RunMode<'_>,
1467 iterations_done: &AtomicUsize,
1468 stop_flag: &Stoppable,
1469 ) -> IterOutcome {
1470 let cb_result = match cb_result.map_err(ExecutorError::iceoryx2) {
1471 Ok(r) => r,
1472 Err(e) => return IterOutcome::Failed(e),
1473 };
1474
1475 // iceoryx2's WaitSet catches SIGINT/SIGTERM internally and reports
1476 // them as `TerminationRequest`; honor that here for a clean exit.
1477 if matches!(cb_result, WaitSetRunResult::TerminationRequest) {
1478 return IterOutcome::Done;
1479 }
1480
1481 // `Interrupt` is a bare EINTR: ANY handled signal — job control
1482 // (SIGSTOP/SIGCONT), a debugger attach, a user-installed handler —
1483 // interrupts the reactor wait. That is a spurious wake, not a
1484 // termination request: a cyclic control runtime must keep running
1485 // (REQ_0269; found on the Pi5 rig, where `kill -STOP`/`-CONT` ended
1486 // `run_n(60000)` cleanly at ~5k cycles). Item errors and `stop()`
1487 // are still honored below; the wake is NOT counted as an iteration
1488 // (nothing was dispatched). A real SIGINT/SIGTERM still terminates:
1489 // iceoryx2 latches it and the NEXT wait call returns
1490 // `TerminationRequest` before blocking.
1491 let interrupted = matches!(cb_result, WaitSetRunResult::Interrupt);
1492
1493 // Extract the error before dropping the MutexGuard — avoids holding the
1494 // lock across the return (clippy::significant_drop_in_scrutinee).
1495 #[allow(clippy::unwrap_used)]
1496 // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1497 let maybe_err = self.iter_err.lock().unwrap().take();
1498 if let Some(err) = maybe_err {
1499 return IterOutcome::Failed(err);
1500 }
1501 if stop_flag.is_stopped() {
1502 return IterOutcome::Done;
1503 }
1504
1505 // An interrupted wake is spurious (REQ_0269): it dispatched nothing,
1506 // so it neither counts as an iteration nor evaluates the run-mode
1507 // limit — it short-circuits to `Continue` and re-enters the wait.
1508 let reached_limit = !interrupted && {
1509 iterations_done.fetch_add(1, Ordering::SeqCst);
1510 match mode {
1511 RunMode::Forever => false,
1512 RunMode::Iterations(n) => iterations_done.load(Ordering::SeqCst) >= *n,
1513 RunMode::Until(deadline) => Instant::now() >= *deadline,
1514 RunMode::Predicate(p) => (p)(),
1515 }
1516 };
1517 if reached_limit {
1518 IterOutcome::Done
1519 } else {
1520 IterOutcome::Continue
1521 }
1522 }
1523}
1524
/// Outcome of one `dispatch_loop` iteration's post-callback evaluation.
///
/// Produced by `Executor::after_callback`; `dispatch_loop` matches on it to
/// either keep looping or break with the final result.
enum IterOutcome {
    /// Run another iteration.
    Continue,
    /// Terminate the loop successfully (`dispatch_loop` returns `Ok(())`).
    Done,
    /// Terminate the loop with the given error (`dispatch_loop` returns it as `Err`).
    Failed(ExecutorError),
}
1534
1535/// Post-wait absolute-grid pass (Grid mode only, `REQ_0268` / `ADR_0100`).
1536///
1537/// The `WaitSet` callback handles event/fd tasks; cyclic tasks are timed here
1538/// off the scheduling clock. `pass` mirrors the callback's `DispatchPass`
1539/// exactly — same borrows and raw pointers, same single-writer WaitSet-thread
1540/// discipline — and the callback is already dropped (its borrows freed) by the
1541/// time this runs. We poll `grid` for due cyclic slots, dispatch each due task,
1542/// and fold their telemetry through the SHARED [`DispatchPass::barrier_and_record`]
1543/// helper. This is a SEPARATE barrier phase from the callback's: each phase
1544/// barriers and folds only its own `pending_cycle` stashes, so cyclic tasks
1545/// record exactly once, identically to event tasks. We do NOT call
1546/// `record_cycle_for` directly here.
1547///
1548/// Self-gates and returns early (no dispatch, no record) unless this wake should
1549/// run the grid: the master timer ticked (`ticked`), we are in `Grid` mode, it is
1550/// not a stop wake, and there is at least one cyclic task with something due.
1551///
1552/// **Stop-wake suppression (`REQ_0268`)**: a `stop()` (or a SIGINT/SIGTERM
1553/// `cb_result`) must emit no spurious cyclic cycle — Legacy dispatches none on a
1554/// stop wake, so the grid path matches, or a `stop()` would emit one extra cycle
1555/// observation and desync the `FEAT_0038` `cycle_index` join key. Termination
1556/// itself is still decided by `after_callback`; this only suppresses the side
1557/// effects.
1558#[allow(clippy::too_many_arguments)]
1559fn run_grid_cyclic_pass(
1560 mut pass: DispatchPass<'_, '_, '_>,
1561 ticked: bool,
1562 dispatch_mode: crate::DispatchMode,
1563 stop_flag: &Stoppable,
1564 cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1565 grid: &mut crate::grid::GridTimer,
1566 now_nanos: u64,
1567 cyclic_task_indices: &[usize],
1568 due_cyclic: &mut Vec<(usize, u64, u64)>,
1569) {
1570 let stopping = stop_flag.is_stopped()
1571 || matches!(
1572 cb_result,
1573 Ok(WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest)
1574 );
1575 if !ticked
1576 || stopping
1577 || dispatch_mode != crate::DispatchMode::Grid
1578 || cyclic_task_indices.is_empty()
1579 {
1580 return;
1581 }
1582 grid.take_due(now_nanos, due_cyclic);
1583 if due_cyclic.is_empty() {
1584 return;
1585 }
1586 for (slot, skipped, late_by) in due_cyclic.iter() {
1587 pass.dispatch_cyclic(cyclic_task_indices[*slot], *skipped, *late_by);
1588 }
1589 pass.barrier_and_record();
1590}
1591
1592/// Build every `WaitSet` attachment for the task table (`REQ_0268`). In `Grid`
1593/// mode, `TriggerDecl::Interval` cyclic tasks are only *collected* into
1594/// `cyclic_task_indices` / `cyclic_periods` — they are NOT attached as
1595/// individual `WaitSet` triggers. The master timer + `GridTimer` owns their
1596/// wakeups (cross-platform; wake-source wiring is done in the caller).
1597/// Every other decl (and every decl in `Legacy` mode, including `Interval`
1598/// via `attach_interval`) is attached normally. Extracted from `dispatch_loop`
1599/// to keep that function within the cyclomatic-complexity budget.
1600#[allow(clippy::too_many_arguments)]
1601fn build_attachments<'w>(
1602 waitset: &'w WaitSet<ipc::Service>,
1603 tasks: &[TaskEntry],
1604 dispatch_mode: crate::DispatchMode,
1605 listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1606 guards: &mut Vec<WaitSetGuard<'w, 'w, ipc::Service>>,
1607 attachment_to_task: &mut Vec<usize>,
1608 cyclic_task_indices: &mut Vec<usize>,
1609 cyclic_periods: &mut Vec<u64>,
1610) -> Result<(), ExecutorError> {
1611 for (task_idx, task) in tasks.iter().enumerate() {
1612 for decl in &task.decls {
1613 if dispatch_mode == crate::DispatchMode::Grid {
1614 if let TriggerDecl::Interval(d) = decl {
1615 // Grid mode owns cyclic timing via the master timer + GridTimer;
1616 // these decls are NOT attached as individual WaitSet triggers.
1617 cyclic_task_indices.push(task_idx);
1618 cyclic_periods.push(u64::try_from(d.as_nanos()).unwrap_or(u64::MAX));
1619 continue;
1620 }
1621 }
1622 let guard = attach_trigger_decl(waitset, listener_storage, decl)?;
1623 guards.push(guard);
1624 attachment_to_task.push(task_idx);
1625 }
1626 }
1627 Ok(())
1628}
1629
1630/// Attaches a single [`TriggerDecl`] to `waitset`, returning the resulting
1631/// guard.
1632///
1633/// Listener-backed declarations (`Subscriber`, `Deadline`, `RawListener`)
1634/// clone the listener `Arc` into `listener_storage` to extend its lifetime to
1635/// the surrounding `dispatch_loop` scope; `Interval` attaches a bare timer.
1636///
1637/// # Safety
1638///
1639/// The returned guard borrows the listener via a raw-pointer cast that erases
1640/// its lifetime. Soundness relies on the caller keeping `listener_storage` (and
1641/// `waitset`) alive for at least as long as the guard, and dropping the guards
1642/// before `listener_storage` — exactly the discipline `dispatch_loop` follows.
1643#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1644fn attach_trigger_decl<'w>(
1645 waitset: &'w WaitSet<ipc::Service>,
1646 listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1647 decl: &TriggerDecl,
1648) -> Result<WaitSetGuard<'w, 'w, ipc::Service>, ExecutorError> {
1649 // Clone the listener Arc and obtain a lifetime-erased reference. SAFETY:
1650 // both `listener_storage` and `waitset` are stack-local in `dispatch_loop`
1651 // and dropped together at its end; guards are dropped before
1652 // `listener_storage`.
1653 let mut listener_ref = |listener: &Arc<crate::trigger::RawListener>| {
1654 listener_storage.push(Arc::clone(listener));
1655 let l_ref = listener_storage.last().unwrap().as_ref();
1656 let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
1657 l_ref
1658 };
1659
1660 let guard = match decl {
1661 TriggerDecl::Subscriber { listener } | TriggerDecl::RawListener(listener) => {
1662 waitset.attach_notification(listener_ref(listener))
1663 }
1664 TriggerDecl::Interval(d) => waitset.attach_interval(*d),
1665 TriggerDecl::Deadline { listener, deadline } => {
1666 waitset.attach_deadline(listener_ref(listener), *deadline)
1667 }
1668 };
1669 guard.map_err(ExecutorError::iceoryx2)
1670}
1671
1672/// Per-iteration dispatch context handed to the `WaitSet` callback.
1673///
1674/// `dispatch_loop` rebuilds one of these every iteration and the `WaitSet`
1675/// callback is a thin adapter over [`DispatchPass::process_attachment`]. All
1676/// fields are short-lived borrows / raw pointers into the `Executor` that owns
1677/// the surrounding `dispatch_loop`; their soundness is documented at each use
1678/// site in `dispatch_loop` (same single-threaded, barrier-bounded discipline).
1679struct DispatchPass<'a, 'g, 'w> {
1680 /// `WaitSet` guards, indexed in parallel with `attachment_to_task`.
1681 guards: &'a [WaitSetGuard<'g, 'w, ipc::Service>],
1682 /// Maps guard index to task index in `tasks_ptr`.
1683 attachment_to_task: &'a [usize],
1684 /// Raw pointer to `Executor::tasks`.
1685 tasks_ptr: *mut Vec<TaskEntry>,
1686 /// Raw pointer to `Executor::cycle_stats` (index-aligned with `tasks`).
1687 cycle_stats_ptr: *mut Vec<TaskCycleStats>,
1688 /// Borrow of the executor's observer for the `on_cycle_stats` push.
1689 observer: &'a Arc<dyn Observer>,
1690 /// Raw pointer to `Executor::exec_fault` inner state.
1691 exec_fault_ptr: *const ExecutorFaultAtomic,
1692 /// Raw pointer to `Executor::start_time`.
1693 exec_start_ptr: *const OnceLock<Instant>,
1694 /// Borrow of the executor's telemetry clock, read for each cycle's `pre`.
1695 clock: &'a Arc<dyn MonotonicClock>,
1696 /// Raw pointer to the internal stop listener.
1697 stop_listener_ptr: *const IxListener<ipc::Service>,
1698 /// Borrow of the executor thread pool.
1699 pool: &'a Pool,
1700 /// Refcount-only handle to the per-iteration error slot.
1701 iter_err: &'a Arc<std::sync::Mutex<Option<ExecutorError>>>,
1702}
1703
1704impl DispatchPass<'_, '_, '_> {
1705 /// Dispatches a single task by index for one wakeup: takes the `&mut`
1706 /// borrow into the task table, applies the pre-dispatch fault gate, stashes
1707 /// this cycle's `pending_cycle` timestamp for the post-barrier telemetry
1708 /// fold, and submits the task's work to the pool.
1709 ///
1710 /// Shared by the `WaitSet` callback (`process_attachment`) and — per
1711 /// `REQ_0268` / `ADR_0100` — the forthcoming post-wait absolute-grid
1712 /// dispatch pass, so the per-task barrier/telemetry contract is identical
1713 /// across both call paths.
1714 /// Grid-mode cyclic dispatch: stashes the dispatcher's skipped-slot
1715 /// signal (`REQ_0840`) on the task, then runs the shared dispatch path —
1716 /// the post-barrier `record_cycle_for` `take`s it and folds
1717 /// `grid_slot += 1 + skipped` (`REQ_0106` / `ADR_0101`). Only this
1718 /// discrete count crosses the scheduler→telemetry boundary; timestamps
1719 /// stay on the telemetry clock (`REQ_0268`).
1720 #[allow(unsafe_code)]
1721 fn dispatch_cyclic(&mut self, task_idx: usize, skipped: u64, late_by: u64) {
1722 // SAFETY: same single-writer WaitSet-thread discipline as
1723 // dispatch_task; the pointer is valid for the duration of this call.
1724 let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1725 task.pending_skipped = u32::try_from(skipped).unwrap_or(u32::MAX);
1726 task.pending_late = Some(late_by);
1727 self.dispatch_task(task_idx);
1728 }
1729
1730 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1731 #[allow(unsafe_code)]
1732 fn dispatch_task(&mut self, task_idx: usize) {
1733 // SAFETY: we are the only thread that may touch the task table
1734 // during the callback. wait_and_process_once is single-threaded
1735 // and dispatch_loop holds &mut self. The pointer is valid for the
1736 // duration of this call.
1737 let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1738
1739 // Pre-dispatch fault check (REQ_0070, REQ_0071, REQ_0072). When it
1740 // routes to a (possible) handler, normal dispatch is skipped.
1741 if self.handle_fault_routing(task) {
1742 // REQ_0107: a faulted/fault-routed scan STILL advances
1743 // cycle_index and emits on_cycle_stats, or the executor's count
1744 // desyncs from the connector's join key (FEAT_0038). took/jitter
1745 // are None (poison-safe); the index always moves. Allocation-free:
1746 // a CyclePending { Instant, bool } written onto the TaskEntry,
1747 // no heap.
1748 if task.scan_period.is_some() {
1749 task.pending_cycle = Some(CyclePending {
1750 pre: self.clock.now_nanos(),
1751 faulted: true,
1752 });
1753 }
1754 return;
1755 }
1756
1757 // Stash the pre-dispatch instant so the post-barrier record pass
1758 // can fold this cycle's telemetry. Allocation-free: the timestamp
1759 // lives on the TaskEntry, not in a per-wakeup Vec. `take`n in the
1760 // post-barrier loop below — guarantees exactly-once even if two
1761 // guards map to the same task. `faulted: false`: a task that faulted
1762 // last wakeup and recovered this one records the normal path (the
1763 // whole CyclePending is overwritten, so the flag can't be stale).
1764 task.pending_cycle = Some(CyclePending {
1765 pre: self.clock.now_nanos(),
1766 faulted: false,
1767 });
1768
1769 self.submit_task_job(task);
1770 }
1771
1772 /// Handles a single `WaitSet` wakeup: drains stop notifications, then
1773 /// dispatches every task whose attachment fired. Always returns
1774 /// [`CallbackProgression::Continue`]; termination is decided by the
1775 /// `stop_flag` check in `dispatch_loop` after the callback returns.
1776 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1777 #[allow(unsafe_code)]
1778 fn process_attachment(
1779 &mut self,
1780 attachment_id: &WaitSetAttachmentId<ipc::Service>,
1781 ) -> CallbackProgression {
1782 // Drain stop notifications first (no dispatch — the stop_flag check
1783 // after the callback returns handles termination).
1784 // SAFETY: stop_listener_ptr is valid for the duration of the call;
1785 // the Arc in self.stop_listener keeps it alive.
1786 let stop_l = unsafe { &*self.stop_listener_ptr };
1787 while let Ok(Some(_)) = stop_l.try_wait_one() {}
1788
1789 for i in 0..self.guards.len() {
1790 let guard = &self.guards[i];
1791 let fired =
1792 attachment_id.has_event_from(guard) || attachment_id.has_missed_deadline(guard);
1793 if !fired {
1794 continue;
1795 }
1796 let task_idx = self.attachment_to_task[i];
1797 self.dispatch_task(task_idx);
1798 }
1799
1800 self.barrier_and_record();
1801
1802 CallbackProgression::Continue
1803 }
1804
1805 /// Barrier all submitted pool jobs for this dispatch phase, then fold each
1806 /// task's stashed `pending_cycle` into recorded cycle telemetry. Shared by
1807 /// the `WaitSet` callback (event/fd tasks) and the post-wait grid pass
1808 /// (cyclic tasks, `REQ_0268`). Keyed on `pending_cycle` so it records
1809 /// exactly the tasks dispatched this phase, exactly once.
1810 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1811 #[allow(unsafe_code)]
1812 fn barrier_and_record(&mut self) {
1813 // Wait for all submitted jobs to finish before leaving the callback
1814 // scope (validates item_ptr safety contract). The barrier also makes
1815 // every worker's `last_took_ns` Release-store visible to the record
1816 // pass below.
1817 self.pool.barrier();
1818
1819 // Post-barrier telemetry fold. The source of truth for "this task was
1820 // dispatched this wakeup and owes a record" is `pending_cycle`,
1821 // set in the dispatch loop above — not the guard fired-status. Keying
1822 // solely on the stash (rather than re-querying `has_event_from`)
1823 // removes any dependency on the fired-status query being stable across
1824 // a second scan, so a dispatched cycle can never be silently
1825 // under-recorded (which would lag `cycle_index` — the desync FEAT_0038
1826 // must avoid). `take` clears the stash, guaranteeing exactly-once.
1827 // Allocation-free: iterate task indices in place.
1828 // SAFETY: same single-writer WaitSet-thread discipline as the dispatch
1829 // loop above; barrier-bounded, no in-flight pool job aliases `tasks`.
1830 let task_count = unsafe { (*self.tasks_ptr).len() };
1831 for task_idx in 0..task_count {
1832 // SAFETY: single-writer WaitSet thread; borrow released before
1833 // the record_cycle_for call (which re-derefs tasks_ptr).
1834 let pending = unsafe { (&mut *self.tasks_ptr)[task_idx].pending_cycle.take() };
1835 if let Some(CyclePending { pre, faulted }) = pending {
1836 self.record_cycle_for(task_idx, faulted, pre);
1837 }
1838 }
1839 }
1840
1841 /// Fold one scan cycle's telemetry and push it to the observer. Called
1842 /// once per fired CYCLIC attachment per wakeup. `faulted = true` (Task 10)
1843 /// means the scan was skipped/errored: `took`/`jitter`/`lateness` are
1844 /// unmeasured. Event-driven tasks (no `scan_period`) are skipped entirely
1845 /// (`REQ_0106`).
1846 #[allow(unsafe_code)]
1847 fn record_cycle_for(&mut self, task_idx: usize, faulted: bool, pre_ns: u64) {
1848 // SAFETY: single-writer WaitSet thread; same discipline as tasks_ptr.
1849 let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1850 let Some(period) = task.scan_period else {
1851 return; // event-driven: no cycle telemetry
1852 };
1853 let period_ns = u64::try_from(period.as_nanos()).unwrap_or(u64::MAX);
1854
1855 // Release/Acquire pairing with the worker store (M2): `swap` acquires
1856 // the worker's Release-store and resets the sentinel atomically.
1857 let took_raw = task.last_took_ns.swap(u64::MAX, Ordering::AcqRel);
1858 let took = if faulted || took_raw == u64::MAX {
1859 None
1860 } else {
1861 Some(took_raw)
1862 };
1863
1864 // actual_period + jitter vs the previous dispatch (REQ_0101). Always
1865 // advance `last_dispatch` (even on a faulted attempt) so the next
1866 // cycle's period is measured from this wakeup. `actual_period` is
1867 // `None` on the very first cycle (no previous timestamp); jitter is
1868 // additionally suppressed on a faulted scan (poison-safe: REQ_0107).
1869 let actual_period = task
1870 .last_dispatch
1871 .replace(pre_ns)
1872 .map(|prev| pre_ns.saturating_sub(prev));
1873 let jitter = if faulted {
1874 None
1875 } else {
1876 actual_period.map(|ap| ap.abs_diff(period_ns))
1877 };
1878
1879 // Per-task lateness grid (REQ_0106 / ADR_0101): the task's first
1880 // record anchors the grid at slot 0 (a faulted first scan anchors
1881 // too, its dispatch instant is real); every later record advances by
1882 // exactly one slot plus the dispatcher's skipped-slot signal
1883 // (REQ_0840). Never reconstructed from the measured period, which
1884 // over-counts on coalesced catch-up wakes and fabricates negative
1885 // lateness (issue #46). The anchor is the first dispatch's NOMINAL
1886 // slot — `pre` back-dated by the dispatcher's `late_by` signal — so
1887 // a late process start is reported as real first-cycle lateness
1888 // instead of becoming a permanent negative floor on later on-grid
1889 // cycles. `late_by` is a scheduling-clock difference applied to a
1890 // telemetry-clock instant: domain-safe, both are monotonic ns.
1891 // Without a dispatcher signal (Legacy mode, event-driven tasks) the
1892 // anchor stays the first observed `pre`.
1893 let skipped = core::mem::take(&mut task.pending_skipped);
1894 let late_by = task.pending_late.take();
1895 let first = task.grid_epoch.is_none();
1896 let grid_epoch = *task
1897 .grid_epoch
1898 .get_or_insert_with(|| pre_ns.saturating_sub(late_by.unwrap_or(0)));
1899 if !first {
1900 task.grid_slot = task
1901 .grid_slot
1902 .saturating_add(1)
1903 .saturating_add(u64::from(skipped));
1904 }
1905 let grid_slot = task.grid_slot;
1906
1907 // SAFETY: cycle_stats is index-aligned with tasks; single-writer.
1908 let stats = unsafe { &mut (&mut *self.cycle_stats_ptr)[task_idx] };
1909
1910 // Deadline lateness (REQ_0106): signed offset of the actual start
1911 // (`pre_ns`) from its nominal grid point `grid_epoch + grid_slot *
1912 // period`. Positive => started late; negative => early. Captures
1913 // steady drift (jitter is blind to a constant offset; lateness is
1914 // not). A dispatcher skip re-anchors via `skipped` above; absent a
1915 // signal (e.g. `Legacy` mode, which never signals), a whole missed
1916 // period honestly remains visible as a persistent offset rather than
1917 // being absorbed.
1918 let lateness = if period_ns > 0 && !faulted {
1919 let elapsed_ns = i64::try_from(pre_ns.saturating_sub(grid_epoch)).unwrap_or(i64::MAX);
1920 let expected_ns =
1921 i64::try_from(u128::from(grid_slot) * u128::from(period_ns)).unwrap_or(i64::MAX);
1922 Some(elapsed_ns.saturating_sub(expected_ns))
1923 } else {
1924 None
1925 };
1926
1927 let cycle_index = stats.record_cycle(took, jitter, lateness);
1928
1929 let obs = CycleObservation {
1930 cycle_index,
1931 task_id: task.id.clone(),
1932 task_index: u32::try_from(task_idx).unwrap_or(u32::MAX),
1933 faulted,
1934 period_ns,
1935 pre_ns,
1936 actual_period_ns: actual_period,
1937 jitter_ns: jitter,
1938 lateness_ns: lateness,
1939 skipped_slots: skipped,
1940 took_ns: took,
1941 };
1942 self.observer.on_cycle_stats(&obs);
1943 }
1944
1945 /// Applies the pre-dispatch fault gate for `Single`/`Chain` tasks.
1946 ///
1947 /// Returns `true` when the task is routed to its fault handler (or
1948 /// silently skipped because no handler is registered) and normal dispatch
1949 /// must therefore be skipped. Returns `false` when normal dispatch should
1950 /// proceed. `Graph` tasks always return `false` — they use their own
1951 /// per-vertex scheduling and are out of scope for `FEAT_0018`.
1952 #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1953 fn handle_fault_routing(&self, task: &mut TaskEntry) -> bool {
1954 if !matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
1955 return false;
1956 }
1957
1958 // SAFETY: exec_fault_ptr derefs into the Executor that owns the
1959 // surrounding dispatch_loop — alive for this call's lifetime.
1960 let exec_faulted = matches!(
1961 unsafe { &*self.exec_fault_ptr }.load(0, 0),
1962 ExecutorFaultState::Faulted { .. }
1963 );
1964 let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
1965 let task_state = task.fault.load(task_budget_ms);
1966
1967 // Lazy cascade: if executor is `Faulted` and task is still `Running`,
1968 // silently transition the task to `Faulted{ExecutorFaulted}`. No
1969 // `on_task_fault` — the Observer already heard about the executor-wide
1970 // fault via `on_executor_fault` (cascade-noise invariant, FEAT_0018
1971 // §4.6).
1972 let task_faulted = if exec_faulted && matches!(task_state, FaultState::Running) {
1973 // SAFETY: exec_start_ptr derefs into the same Executor owning the
1974 // dispatch_loop. The OnceLock is wait-free.
1975 let exec_start = *unsafe { &*self.exec_start_ptr }.get_or_init(std::time::Instant::now);
1976 let since_ms = instant_to_since_ms(std::time::Instant::now(), exec_start);
1977 let _ = task.fault.swap(
1978 FaultState::Faulted {
1979 reason: FaultReason::ExecutorFaulted,
1980 since_ms,
1981 },
1982 task_budget_ms,
1983 );
1984 true
1985 } else {
1986 matches!(task_state, FaultState::Faulted { .. })
1987 };
1988
1989 if !(exec_faulted || task_faulted) {
1990 return false;
1991 }
1992
1993 // If a handler is registered, dispatch it. Otherwise, skip dispatch
1994 // entirely this wakeup.
1995 if let Some(handler_box) = task.handler_job.as_deref_mut() {
1996 let job_ptr: *mut (dyn FnMut() + Send) = handler_box as *mut (dyn FnMut() + Send);
1997 // SAFETY: same as the main-job dispatch below — handler_job is
1998 // owned by the TaskEntry; pool.barrier() awaits its completion
1999 // before the next callback.
2000 unsafe {
2001 self.pool
2002 .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
2003 }
2004 }
2005 true
2006 }
2007
2008 /// Dispatches `task`'s normal (non-fault) work for one wakeup.
2009 ///
2010 /// `Single`/`Chain` tasks submit their pre-built job to the pool;
2011 /// `Graph` tasks drive one pass and capture the first item error into the
2012 /// per-iteration error slot.
2013 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2014 #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
2015 fn submit_task_job(&self, task: &mut TaskEntry) {
2016 match &mut task.kind {
2017 TaskKind::Single(_) | TaskKind::Chain(_) => {
2018 // The dispatch closure was pre-allocated at task-add time and
2019 // stashed on `task.job`. Submit it via `submit_borrowed` — no
2020 // per-iteration Box allocation. Required by REQ_0060.
2021 #[allow(clippy::expect_used)]
2022 // fail-fast: Single/Chain task.job is always Some — set at add time in build_single_job/build_chain_job and never cleared
2023 let job_box = task
2024 .job
2025 .as_deref_mut()
2026 .expect("Single/Chain tasks carry a pre-built job");
2027 let job_ptr: *mut (dyn FnMut() + Send) = job_box as *mut (dyn FnMut() + Send);
2028 // SAFETY: the closure lives in `task.job`, owned by
2029 // `self.tasks[task_idx]`; `tasks_ptr` is sound for the
2030 // duration of this callback. `pool.barrier()` in
2031 // `process_attachment` finishes the closure invocation before
2032 // the next iteration's callback. The WaitSet thread does not
2033 // touch the closure between this submit and that barrier.
2034 unsafe {
2035 self.pool
2036 .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
2037 }
2038 }
2039 TaskKind::Graph(graph) => {
2040 // Outer driver runs on the WaitSet thread; vertices run on the
2041 // pool. The graph holds its own pre-built per-vertex closures
2042 // and SPSC ready ring (REQ_0060), so dispatch is
2043 // allocation-free in steady state.
2044 let outcome = graph.run_once_borrowed(self.pool);
2045 if let Some(source) = outcome.error {
2046 #[allow(clippy::unwrap_used)]
2047 // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
2048 let mut g = self.iter_err.lock().unwrap();
2049 if g.is_none() {
2050 *g = Some(ExecutorError::Item {
2051 task_id: task.id.clone(),
2052 source,
2053 });
2054 }
2055 }
2056 let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
2057 }
2058 }
2059 }
2060}
2061
2062/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
2063/// `Pool::submit`. The send is safe because:
2064/// 1. The executor guarantees at most one invocation of a given item at a
2065/// time (via `pool.barrier()` before the pointer is reused).
2066/// 2. `ExecutableItem: Send`, so moving the pointee across threads is sound
2067/// when no aliasing exists.
2068#[allow(unsafe_code)]
2069struct SendItemPtr {
2070 ptr: *mut dyn ExecutableItem,
2071}
2072
2073impl SendItemPtr {
2074 fn new(ptr: *mut dyn ExecutableItem) -> Self {
2075 Self { ptr }
2076 }
2077
2078 /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
2079 /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
2080 /// dispatch closure to be reusable across iterations without allocation).
2081 fn get(&self) -> *mut dyn ExecutableItem {
2082 self.ptr
2083 }
2084}
2085
2086// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
2087// closure can borrow `&SendItemPtr` per invocation without making the
2088// closure itself `!Send`.
2089#[allow(unsafe_code)]
2090unsafe impl Send for SendItemPtr {}
2091#[allow(unsafe_code)]
2092unsafe impl Sync for SendItemPtr {}
2093
2094/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
2095/// closure can iterate the chain's items in place without first
2096/// collecting them into a freshly-allocated `Vec`. The send is safe
2097/// for the same reason as [`SendItemPtr`] (see above): the executor
2098/// holds `&mut self` for the duration of `dispatch_loop`, and the
2099/// `pool.barrier()` at the end of each callback ensures the closure
2100/// has finished using this pointer before the Vec could be touched
2101/// from the `WaitSet` thread again. The Vec is never resized after
2102/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
2103/// allocate per iteration.
2104#[allow(unsafe_code)]
2105struct SendChainPtr {
2106 ptr: *mut Vec<Box<dyn ExecutableItem>>,
2107}
2108
2109impl SendChainPtr {
2110 fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
2111 Self { ptr }
2112 }
2113
2114 fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
2115 self.ptr
2116 }
2117}
2118
2119// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
2120// borrow `&SendChainPtr` per invocation while staying `Send`.
2121#[allow(unsafe_code)]
2122unsafe impl Send for SendChainPtr {}
2123#[allow(unsafe_code)]
2124unsafe impl Sync for SendChainPtr {}
2125
2126/// Captured state needed by a dispatch closure to perform post-execute
2127/// fault detection. All fields are `Arc`-shared with the owning
2128/// `Executor` and `TaskEntry` so the closure can read/write them
2129/// wait-free from any pool worker thread. `REQ_0070`, `REQ_0071`,
2130/// `REQ_0102`.
2131struct FaultDispatchCtx {
2132 /// Per-task budget. `None` for chain / graph tasks (no per-task
2133 /// check) — the executor-wide iteration budget still applies.
2134 task_budget: Option<Duration>,
2135 /// Per-task fault state (shared with `TaskEntry::fault`).
2136 task_fault: Arc<FaultAtomic>,
2137 /// Per-task monotonic overrun counter (shared with
2138 /// `TaskEntry::overrun_count`). Increments on EVERY budget breach.
2139 overrun_count: Arc<AtomicU64>,
2140 /// Executor-wide iteration budget. `None` means no executor-wide
2141 /// check.
2142 iteration_budget: Option<Duration>,
2143 /// Executor-wide fault state (shared with `Executor::exec_fault`).
2144 exec_fault: Arc<ExecutorFaultAtomic>,
2145 /// Executor-wide offending-task index storage (shared with
2146 /// `Executor::exec_fault_task_idx`).
2147 exec_fault_task_idx: Arc<AtomicU32>,
2148 /// Executor-wide breached-budget storage (shared with
2149 /// `Executor::exec_fault_budget_ms`).
2150 exec_fault_budget_ms: Arc<AtomicU32>,
2151 /// Index of this task in the executor's task table.
2152 task_idx_u32: u32,
2153 /// Executor start time (shared with `Executor::start_time`).
2154 exec_start: Arc<OnceLock<Instant>>,
2155 /// Observer for `on_task_fault` / `on_executor_fault` notifications.
2156 observer: Arc<dyn Observer>,
2157}
2158
2159/// Validate a task's collected trigger declarations before it joins the task
2160/// table (`REQ_0268`). Applied at every add path — single, chain head, and
2161/// fault-handler main — at the point the `TriggerDecl`s are first available,
2162/// regardless of [`DispatchMode`] (the rejected shapes are ill-defined in any
2163/// mode; Legacy is temporary).
2164///
2165/// Rejects two shapes:
2166///
2167/// 1. **Cyclic AND event-driven** — a task carrying both an `Interval` decl and
2168/// any listener-backed decl (`Subscriber` / `Deadline` / `RawListener`). Per
2169/// `REQ_0106` a task is cyclic XOR event-driven: cyclic tasks have a
2170/// period/lateness, event-driven tasks do not. Allowing both would dispatch
2171/// and record the task twice in one wake (phase-a event + phase-b grid),
2172/// desyncing the `FEAT_0038` `cycle_index` join key (`REQ_0107`).
2173/// 2. **Zero-period interval** — an `Interval(Duration::ZERO)` busy-spins the
2174/// grid (`GridTimer::next_timeout` returns `0` every wake and `take_due`
2175/// re-fires without advancing). A zero scan period is nonsensical.
2176fn validate_decls(id: &TaskId, decls: &[crate::trigger::TriggerDecl]) -> Result<(), ExecutorError> {
2177 use crate::trigger::TriggerDecl;
2178
2179 let has_interval = decls.iter().any(|d| matches!(d, TriggerDecl::Interval(_)));
2180 let has_listener = decls.iter().any(|d| {
2181 matches!(
2182 d,
2183 TriggerDecl::Subscriber { .. }
2184 | TriggerDecl::Deadline { .. }
2185 | TriggerDecl::RawListener(_)
2186 )
2187 });
2188
2189 if has_interval && has_listener {
2190 return Err(ExecutorError::DeclareTriggers(format!(
2191 "task `{id}` declares both an interval (cyclic) and a listener \
2192 (event-driven) trigger; a task may be cyclic (interval) or \
2193 event-driven (listener) but not both — split it into two tasks"
2194 )));
2195 }
2196
2197 if decls
2198 .iter()
2199 .any(|d| matches!(d, TriggerDecl::Interval(dur) if dur.is_zero()))
2200 {
2201 return Err(ExecutorError::DeclareTriggers(format!(
2202 "task `{id}` declares a zero-duration interval; a cyclic scan \
2203 period must be strictly positive"
2204 )));
2205 }
2206
2207 Ok(())
2208}
2209
2210/// Extract the declared scan period (first `Interval` trigger) from a task's
2211/// trigger declarations, or `None` for event-driven tasks.
2212fn scan_period_from_decls(decls: &[crate::trigger::TriggerDecl]) -> Option<Duration> {
2213 decls.iter().find_map(|d| match d {
2214 crate::trigger::TriggerDecl::Interval(dur) => Some(*dur),
2215 _ => None,
2216 })
2217}
2218
2219/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
2220///
2221/// The returned closure is stored on `TaskEntry::job` and invoked once
2222/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
2223/// performs no allocation. The closure captures Arc clones of the
2224/// executor's shared state — those clones are refcount-only at build
2225/// time and are reused on every dispatch. Required for `REQ_0060`.
2226#[allow(clippy::too_many_arguments)]
2227fn build_single_job(
2228 id: TaskId,
2229 stop: Stoppable,
2230 obs: Arc<dyn Observer>,
2231 mon: Arc<dyn ExecutionMonitor>,
2232 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2233 app_id: Option<u32>,
2234 app_inst: Option<u32>,
2235 item_ptr: SendItemPtr,
2236 fault_ctx: FaultDispatchCtx,
2237 last_took_ns: Arc<AtomicU64>,
2238 clock: Arc<dyn MonotonicClock>,
2239) -> Box<dyn FnMut() + Send + 'static> {
2240 Box::new(move || {
2241 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2242 if let Some(aid) = app_id {
2243 obs.on_app_start(id.clone(), aid, app_inst);
2244 }
2245 let raw = item_ptr.get();
2246 let started = std::time::Instant::now();
2247 // Telemetry `took` is measured on the injected clock (REQ_0105) so a
2248 // MockClock can make it exact; the real `started`/`took` below stay on
2249 // the system clock for the monitor and fault-budget paths.
2250 let tele_t0 = clock.now_nanos();
2251 mon.pre_execute(id.clone(), started);
2252 // SAFETY: barrier() pairs with this invocation; the WaitSet
2253 // thread does not touch the item between `submit_borrowed` and
2254 // the matching `barrier()`. See SendItemPtr safety doc.
2255 #[allow(unsafe_code)]
2256 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2257 let took = started.elapsed();
2258 // Release pairs with the WaitSet-thread Acquire (swap) in
2259 // `record_cycle_for` (M2). `pool.barrier()` also fences, but the
2260 // explicit pairing documents intent and is robust on weak-memory archs.
2261 last_took_ns.store(clock.now_nanos().saturating_sub(tele_t0), Ordering::Release);
2262 mon.post_execute(id.clone(), started, took, res.is_ok());
2263 if let Err(ref e) = res {
2264 obs.on_app_error(id.clone(), e.as_ref());
2265 }
2266 if app_id.is_some() {
2267 obs.on_app_stop(id.clone());
2268 }
2269 post_execute_detect_fault(&id, started, took, &fault_ctx);
2270 record_first_err(&err_slot, &id, res);
2271 })
2272}
2273
2274/// Build the per-iteration dispatch closure for a fault-handler item.
2275///
2276/// Mirrors [`build_single_job`] in every detail (same monitor /
2277/// observer / first-error capture wiring) but owns the
2278/// `Box<dyn ExecutableItem>` directly inside the closure instead of
2279/// dereferencing a raw [`SendItemPtr`]. The handler has no parallel
2280/// owner inside [`TaskEntry`] — the handler closure stored in
2281/// `handler_job` is the sole owner — so the simpler owning form is
2282/// both sound and avoids the aliasing dance the main item needs.
2283/// (Unlike [`build_single_job`], this closure does NOT update
2284/// `last_took_ns` — the handler runs in place of the main item, so the
2285/// main item's `last_took_ns` keeps its sentinel `u64::MAX` = "no
2286/// sample this cycle".)
2287/// `REQ_0072`.
2288#[allow(clippy::too_many_arguments)]
2289fn build_handler_job(
2290 id: TaskId,
2291 stop: Stoppable,
2292 obs: Arc<dyn Observer>,
2293 mon: Arc<dyn ExecutionMonitor>,
2294 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2295 app_id: Option<u32>,
2296 app_inst: Option<u32>,
2297 mut handler: Box<dyn ExecutableItem>,
2298 fault_ctx: FaultDispatchCtx,
2299) -> Box<dyn FnMut() + Send + 'static> {
2300 Box::new(move || {
2301 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2302 if let Some(aid) = app_id {
2303 obs.on_app_start(id.clone(), aid, app_inst);
2304 }
2305 let started = std::time::Instant::now();
2306 mon.pre_execute(id.clone(), started);
2307 let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
2308 let took = started.elapsed();
2309 mon.post_execute(id.clone(), started, took, res.is_ok());
2310 if let Err(ref e) = res {
2311 obs.on_app_error(id.clone(), e.as_ref());
2312 }
2313 if app_id.is_some() {
2314 obs.on_app_stop(id.clone());
2315 }
2316 // Per §4.6 invariant 5 of FEAT_0018: a handler that ALSO breaches
2317 // budget keeps the task in `Faulted` (state already `Faulted`),
2318 // `overrun_count` increments, NO new `on_task_fault` fires —
2319 // the `matches!(prev, FaultState::Running)` gate inside
2320 // `post_execute_detect_fault` enforces that.
2321 post_execute_detect_fault(&id, started, took, &fault_ctx);
2322 record_first_err(&err_slot, &id, res);
2323 })
2324}
2325
2326/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
2327#[allow(clippy::too_many_arguments)]
2328fn build_chain_job(
2329 id: TaskId,
2330 stop: Stoppable,
2331 obs: Arc<dyn Observer>,
2332 mon: Arc<dyn ExecutionMonitor>,
2333 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2334 chain_ptr: SendChainPtr,
2335 fault_ctx: FaultDispatchCtx,
2336 last_took_ns: Arc<AtomicU64>,
2337 clock: Arc<dyn MonotonicClock>,
2338) -> Box<dyn FnMut() + Send + 'static> {
2339 Box::new(move || {
2340 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2341 // Overall chain scan timer — the chain's `took` is the elapsed
2342 // telemetry-clock time from the first item's pre-execute to the last
2343 // item's completion (or early break), mirroring the single-item `took`
2344 // notion (REQ_0105). Per-item monitor timing uses each item's own
2345 // real-clock `started` below.
2346 let chain_tele_t0 = clock.now_nanos();
2347 // SAFETY: barrier() pairs with this invocation; the chain Vec
2348 // and the items it owns are not touched by the WaitSet thread
2349 // until barrier() returns. See SendChainPtr safety doc.
2350 #[allow(unsafe_code)]
2351 let chain_items = unsafe { &mut *chain_ptr.get() };
2352 for item_box in chain_items.iter_mut() {
2353 let app_id = item_box.app_id();
2354 let app_inst = item_box.app_instance_id();
2355 if let Some(aid) = app_id {
2356 obs.on_app_start(id.clone(), aid, app_inst);
2357 }
2358 let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
2359 let started = std::time::Instant::now();
2360 mon.pre_execute(id.clone(), started);
2361 #[allow(unsafe_code)]
2362 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2363 let took = started.elapsed();
2364 mon.post_execute(id.clone(), started, took, res.is_ok());
2365 if let Err(ref e) = res {
2366 obs.on_app_error(id.clone(), e.as_ref());
2367 }
2368 if app_id.is_some() {
2369 obs.on_app_stop(id.clone());
2370 }
2371 // Per-item post-execute fault detection. `task_budget` is
2372 // `None` for chains (see `add_chain_with_id_boxed`), so the
2373 // per-task check no-ops; the executor-wide iteration-budget
2374 // check still fires per item. `REQ_0071`.
2375 post_execute_detect_fault(&id, started, took, &fault_ctx);
2376 match res {
2377 Ok(crate::ControlFlow::Continue) => {}
2378 Ok(crate::ControlFlow::StopChain) => break,
2379 Err(_) => {
2380 record_first_err(&err_slot, &id, res);
2381 break;
2382 }
2383 }
2384 }
2385 // Release pairs with the WaitSet-thread Acquire (swap) in
2386 // `record_cycle_for` (M2). See the Single-job store for the rationale.
2387 last_took_ns.store(
2388 clock.now_nanos().saturating_sub(chain_tele_t0),
2389 Ordering::Release,
2390 );
2391 })
2392}
2393
/// Error value that carries the message of a panic caught while running a
/// task item.
///
/// Its `Display` output is the fixed prefix `task panicked: ` followed by the
/// wrapped message.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_fmt(format_args!("task panicked: {}", message))
    }
}

// No `source()`: the panic payload has already been flattened to text.
impl std::error::Error for PanickedTask {}
2404
2405/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
2406fn run_item_catch_unwind(
2407 item: &mut dyn ExecutableItem,
2408 ctx: &mut crate::context::Context<'_>,
2409) -> crate::ExecuteResult {
2410 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
2411 |payload| {
2412 let msg =
2413 panic_payload_message(&*payload).unwrap_or_else(|| "panicked task".to_string());
2414 Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
2415 },
2416 )
2417}
2418
/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
/// without depending on its private name.
///
/// Forwards `item` and `ctx` unchanged and returns the callee's result as-is,
/// so a panic inside `item.execute` surfaces here as `Err` exactly as it does
/// for `run_item_catch_unwind`.
pub(crate) fn run_item_catch_unwind_external(
    item: &mut dyn ExecutableItem,
    ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
    run_item_catch_unwind(item, ctx)
}
2427
2428/// Record the first error into `slot`. Subsequent errors are silently dropped.
2429fn record_first_err(
2430 slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
2431 id: &TaskId,
2432 res: crate::ExecuteResult,
2433) {
2434 if let Err(source) = res {
2435 let mut g = slot.lock().unwrap();
2436 if g.is_none() {
2437 *g = Some(ExecutorError::Item {
2438 task_id: id.clone(),
2439 source,
2440 });
2441 }
2442 }
2443}
2444
2445/// Post-execute fault detection — runs on a pool worker AFTER
2446/// `mon.post_execute` so the full `took` is available. Implements:
2447///
2448/// * `REQ_0070` / `REQ_0102` — per-task budget overrun: increments
2449/// `overrun_count` on every breach, transitions
2450/// `Running -> Faulted{BudgetExceeded}` exactly once (subsequent
2451/// breaches keep the state `Faulted` and do NOT re-fire the
2452/// observer).
2453/// * `REQ_0071` — executor-wide iteration overrun: transitions
2454/// `Running -> Faulted{IterationBudgetExceeded}` exactly once;
2455/// cascade to per-task state is LAZY (see the pre-dispatch block
2456/// in `dispatch_loop`), so the per-task `on_task_fault` does NOT
2457/// fire during cascade — only `on_executor_fault` does.
2458fn post_execute_detect_fault(
2459 id: &TaskId,
2460 started: Instant,
2461 took: Duration,
2462 fault_ctx: &FaultDispatchCtx,
2463) {
2464 // REQ_0070 / REQ_0102 — per-task budget overrun.
2465 if let Some(budget) = fault_ctx.task_budget {
2466 if took > budget {
2467 fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
2468 let took_ms = duration_to_ms_sat(took);
2469 let budget_ms = duration_to_ms_sat(budget);
2470 let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2471 let since_ms = instant_to_since_ms(started, exec_start);
2472 let new_state = FaultState::Faulted {
2473 reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
2474 since_ms,
2475 };
2476 let prev = fault_ctx.task_fault.swap(new_state, budget_ms);
2477 if matches!(prev, FaultState::Running) {
2478 fault_ctx.observer.on_task_fault(
2479 id.clone(),
2480 FaultReason::BudgetExceeded { took_ms, budget_ms },
2481 );
2482 }
2483 }
2484 }
2485
2486 // REQ_0071 — executor-wide iteration overrun.
2487 if let Some(iter_budget) = fault_ctx.iteration_budget {
2488 if took > iter_budget {
2489 let took_ms = duration_to_ms_sat(took);
2490 let budget_ms = duration_to_ms_sat(iter_budget);
2491 let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2492 let since_ms = instant_to_since_ms(started, exec_start);
2493 fault_ctx
2494 .exec_fault_task_idx
2495 .store(fault_ctx.task_idx_u32, Ordering::Release);
2496 fault_ctx
2497 .exec_fault_budget_ms
2498 .store(budget_ms, Ordering::Release);
2499 let new_state = ExecutorFaultState::Faulted {
2500 reason: ExecutorFaultReason::IterationBudgetExceeded {
2501 task_idx: fault_ctx.task_idx_u32,
2502 took_ms,
2503 budget_ms,
2504 },
2505 since_ms,
2506 };
2507 let prev = fault_ctx
2508 .exec_fault
2509 .swap(new_state, fault_ctx.task_idx_u32, budget_ms);
2510 if matches!(prev, ExecutorFaultState::Running) {
2511 fault_ctx.observer.on_executor_fault(
2512 ExecutorFaultReason::IterationBudgetExceeded {
2513 task_idx: fault_ctx.task_idx_u32,
2514 took_ms,
2515 budget_ms,
2516 },
2517 );
2518 // NO eager cascade here. Cascade is lazy: the
2519 // pre-dispatch block in `dispatch_loop` transitions
2520 // each `Running` task to `Faulted{ExecutorFaulted}` on
2521 // the next wakeup — silently, so per-task observers
2522 // do not fire (see §4.6 invariant on cascade-noise).
2523 }
2524 }
2525 }
2526}
2527
2528// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
2529
/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
/// into a registered task.
///
/// Holds a mutable borrow of the [`Executor`] for the lifetime `'e`, so the
/// executor cannot be used through another path until this wrapper is dropped
/// or consumed by `build`.
pub struct ExecutorGraphBuilder<'e> {
    /// Executor that receives the finished graph as a new task entry.
    executor: &'e mut Executor,
    /// Underlying graph builder that the vertex / edge / root calls forward to.
    builder: crate::graph::GraphBuilder,
    /// Id requested through `id`, if any; `build` falls back to it when the
    /// root vertex supplies no id of its own.
    custom_id: Option<TaskId>,
}
2537
impl ExecutorGraphBuilder<'_> {
    /// Add a vertex to the graph; returns its handle.
    ///
    /// Forwards `item` to the wrapped `GraphBuilder`.
    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
        self.builder.vertex(item)
    }

    /// Add a directed edge from one vertex to another.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
        self.builder.edge(from, to);
        self
    }

    /// Designate the root vertex (its triggers gate the graph).
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
        self.builder.root(v);
        self
    }

    /// Override the auto-generated id with a custom one.
    ///
    /// A later call replaces the id stored by an earlier one. Returns
    /// `&mut Self` so calls can be chained.
    pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
        self.custom_id = Some(id.into());
        self
    }

    /// Validate and register the graph. Returns the task id.
    ///
    /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
    /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
    /// precedence over the auto-generated id.
    ///
    /// On success one `TaskEntry` (kind `TaskKind::Graph`) and one matching
    /// `cycle_stats` entry are appended to the executor.
    ///
    /// # Errors
    ///
    /// Propagates the error from `GraphBuilder::finish`, and the error from
    /// `validate_decls` when the root's trigger declarations are rejected —
    /// e.g. `ExecutorError::DeclareTriggers` for an interval combined with a
    /// listener trigger, or for a zero-duration interval. In either case
    /// nothing is registered on the executor.
    pub fn build(self) -> Result<TaskId, ExecutorError> {
        let g = self.builder.finish()?;
        // Root vertex's task_id() override wins over the custom id, which wins
        // over the auto-generated fallback.
        // The closure is only invoked by `unwrap_or_else` below, so `next_id`
        // is consumed only when neither override is present.
        let auto_id = || {
            TaskId::new(format!(
                "graph-{}",
                self.executor.next_id.fetch_add(1, Ordering::SeqCst)
            ))
        };
        let id = g
            .root_task_id()
            .map(TaskId::new)
            .or(self.custom_id)
            .unwrap_or_else(auto_id);
        let decls = g.decls.clone();
        // The graph root's decls become a grid-registered TaskEntry, so the same
        // cyclic-XOR-event-driven / non-zero-period validation that guards the
        // single-item, fault-handler, and chain add paths must guard this one too
        // (REQ_0268). Non-root vertex triggers never reach a TaskEntry — they are
        // discarded in `GraphBuilder::collect_root_decls` — so validating the root
        // decls is sufficient.
        validate_decls(&id, &decls)?;
        let scan_period = scan_period_from_decls(&decls);

        // Box the graph for address stability — per-vertex dispatch
        // closures capture `*const Graph` and must not see it move.
        let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
        // Pre-build the per-vertex closures now that we know the
        // task_id and have access to the executor's shared state.
        graph_box.prepare_dispatch(
            id.clone(),
            self.executor.stoppable.clone(),
            Arc::clone(&self.executor.observer),
            Arc::clone(&self.executor.monitor),
            Arc::clone(&self.executor.iter_err),
        );

        self.executor.tasks.push(TaskEntry {
            id: id.clone(),
            kind: TaskKind::Graph(graph_box),
            decls,
            // Graph tasks dispatch their vertices via `vertex_jobs`
            // stored inside the `Graph`; the per-task `job` slot
            // is unused for graphs.
            job: None,
            // TODO(post-Task-10): graph budgets carried separately; for now None.
            budget: None,
            fault: Arc::new(FaultAtomic::new()),
            overrun_count: Arc::new(AtomicU64::new(0)),
            handler_job: None,
            scan_period,
            // Graphs dispatch vertices via their own path and do not ferry a
            // per-task `took`; sentinel = "no sample". Wired for struct
            // completeness; nothing reads it yet (Task 6).
            last_took_ns: Arc::new(AtomicU64::new(u64::MAX)),
            last_dispatch: None,
            grid_slot: 0,
            grid_epoch: None,
            pending_skipped: 0,
            pending_late: None,
            pending_cycle: None,
        });
        // Keep `cycle_stats` index-aligned with `tasks`: one entry per task.
        self.executor
            .cycle_stats
            .push(TaskCycleStats::new(self.executor.stats_window));
        Ok(id)
    }
}
2636
2637// ── Unit tests ────────────────────────────────────────────────────────────────
2638
2639#[cfg(test)]
2640mod tests {
2641 use super::*;
2642 use crate::{ControlFlow, item};
2643 use iceoryx2::prelude::ZeroCopySend;
2644
    /// Minimal zero-copy payload for tests that need a real subscriber to
    /// produce a listener-backed trigger decl.
    ///
    /// A `#[repr(C)]` newtype over a single `u32`. In the tests visible here
    /// it is used only as the payload type parameter of `channel::<Msg>`.
    #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
    #[repr(C)]
    struct Msg(u32);
2650
2651 #[test]
2652 fn add_returns_unique_ids() {
2653 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2654 let a = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2655 let b = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2656 assert_ne!(a, b);
2657 }
2658
2659 #[test]
2660 fn grid_mode_dispatches_cyclic_task_each_cycle() {
2661 use std::sync::Arc;
2662 use std::sync::atomic::{AtomicU64, Ordering};
2663 let hits = Arc::new(AtomicU64::new(0));
2664 let h = Arc::clone(&hits);
2665 let mut exec = Executor::builder()
2666 .worker_threads(0)
2667 .dispatch_mode(crate::DispatchMode::Grid)
2668 .build()
2669 .expect("build");
2670 exec.add(crate::item::item_with_triggers(
2671 move |d| {
2672 d.interval(std::time::Duration::from_millis(1));
2673 Ok(())
2674 },
2675 move |_ctx| {
2676 h.fetch_add(1, Ordering::Relaxed);
2677 Ok(ControlFlow::Continue)
2678 },
2679 ))
2680 .expect("add");
2681 exec.run_n(10).expect("run");
2682 assert!(
2683 hits.load(Ordering::Relaxed) >= 8,
2684 "grid mode under-dispatched: {}",
2685 hits.load(Ordering::Relaxed)
2686 );
2687 }
2688
2689 #[test]
2690 fn legacy_mode_dispatches_cyclic_task_each_cycle() {
2691 use std::sync::Arc;
2692 use std::sync::atomic::{AtomicU64, Ordering};
2693 let hits = Arc::new(AtomicU64::new(0));
2694 let h = Arc::clone(&hits);
2695 let mut exec = Executor::builder()
2696 .worker_threads(0)
2697 .dispatch_mode(crate::DispatchMode::Legacy)
2698 .build()
2699 .expect("build");
2700 exec.add(crate::item::item_with_triggers(
2701 move |d| {
2702 d.interval(std::time::Duration::from_millis(1));
2703 Ok(())
2704 },
2705 move |_ctx| {
2706 h.fetch_add(1, Ordering::Relaxed);
2707 Ok(ControlFlow::Continue)
2708 },
2709 ))
2710 .expect("add");
2711 exec.run_n(10).expect("run");
2712 assert!(
2713 hits.load(Ordering::Relaxed) >= 8,
2714 "legacy mode under-dispatched: {}",
2715 hits.load(Ordering::Relaxed)
2716 );
2717 }
2718
2719 // --- REQ_0268 trigger-combination validation (Fix 1 / Fix 3) ---
2720
2721 #[test]
2722 fn add_rejects_cyclic_plus_subscriber_combination() {
2723 use core::time::Duration;
2724 // A task declaring BOTH an Interval and a listener-backed trigger is
2725 // ill-defined (cyclic XOR event-driven, REQ_0106) and must be rejected
2726 // at add time. We use a real subscriber so the listener decl is genuine.
2727 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2728 let ch = exec.channel::<Msg>("taktora.test.req0268.combo").unwrap();
2729 let sub = ch.subscriber().unwrap();
2730 let err = exec
2731 .add(crate::item::item_with_triggers(
2732 move |d| {
2733 d.interval(Duration::from_millis(1));
2734 d.subscriber(&sub);
2735 Ok(())
2736 },
2737 |_| Ok(crate::ControlFlow::Continue),
2738 ))
2739 .expect_err("interval + subscriber must be rejected");
2740 match err {
2741 ExecutorError::DeclareTriggers(msg) => {
2742 assert!(
2743 msg.contains("cyclic") && msg.contains("event-driven"),
2744 "message must explain cyclic vs event-driven: {msg}"
2745 );
2746 assert!(
2747 msg.contains("split"),
2748 "message must suggest splitting into two tasks: {msg}"
2749 );
2750 }
2751 other => panic!("expected DeclareTriggers, got {other:?}"),
2752 }
2753 }
2754
2755 #[test]
2756 fn add_rejects_cyclic_plus_listener_regardless_of_mode() {
2757 use core::time::Duration;
2758 // The combination is ill-defined irrespective of DispatchMode (Legacy
2759 // is temporary), so Legacy must reject it too.
2760 let mut exec = Executor::builder()
2761 .worker_threads(0)
2762 .dispatch_mode(crate::DispatchMode::Legacy)
2763 .build()
2764 .unwrap();
2765 let ch = exec
2766 .channel::<Msg>("taktora.test.req0268.combo.legacy")
2767 .unwrap();
2768 let sub = ch.subscriber().unwrap();
2769 let err = exec
2770 .add(crate::item::item_with_triggers(
2771 move |d| {
2772 d.interval(Duration::from_millis(1));
2773 d.subscriber(&sub);
2774 Ok(())
2775 },
2776 |_| Ok(crate::ControlFlow::Continue),
2777 ))
2778 .expect_err("interval + subscriber must be rejected in Legacy too");
2779 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2780 }
2781
2782 #[test]
2783 fn add_accepts_multiple_intervals_and_single_kinds() {
2784 use core::time::Duration;
2785 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2786 // Multiple Interval decls: still cyclic-only, accepted.
2787 exec.add(crate::item::item_with_triggers(
2788 |d| {
2789 d.interval(Duration::from_millis(1));
2790 d.interval(Duration::from_millis(2));
2791 Ok(())
2792 },
2793 |_| Ok(crate::ControlFlow::Continue),
2794 ))
2795 .expect("multiple intervals accepted");
2796 // Single interval: accepted.
2797 exec.add(crate::item::item_with_triggers(
2798 |d| {
2799 d.interval(Duration::from_millis(1));
2800 Ok(())
2801 },
2802 |_| Ok(crate::ControlFlow::Continue),
2803 ))
2804 .expect("single interval accepted");
2805 // Multiple listeners (no interval): accepted.
2806 let ch = exec
2807 .channel::<Msg>("taktora.test.req0268.multi.listener")
2808 .unwrap();
2809 let sub_a = ch.subscriber().unwrap();
2810 let sub_b = ch.subscriber().unwrap();
2811 exec.add(crate::item::item_with_triggers(
2812 move |d| {
2813 d.subscriber(&sub_a);
2814 d.subscriber(&sub_b);
2815 Ok(())
2816 },
2817 |_| Ok(crate::ControlFlow::Continue),
2818 ))
2819 .expect("multiple listeners accepted");
2820 }
2821
2822 #[test]
2823 fn add_rejects_zero_period_interval() {
2824 use core::time::Duration;
2825 // A zero-period interval busy-spins the grid (next_timeout == 0 every
2826 // wake), so it must be rejected at add time.
2827 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2828 let err = exec
2829 .add(crate::item::item_with_triggers(
2830 |d| {
2831 d.interval(Duration::ZERO);
2832 Ok(())
2833 },
2834 |_| Ok(crate::ControlFlow::Continue),
2835 ))
2836 .expect_err("zero-period interval must be rejected");
2837 match err {
2838 ExecutorError::DeclareTriggers(msg) => {
2839 assert!(
2840 msg.contains("zero"),
2841 "message must mention the zero period: {msg}"
2842 );
2843 }
2844 other => panic!("expected DeclareTriggers, got {other:?}"),
2845 }
2846 }
2847
2848 #[test]
2849 fn add_chain_rejects_cyclic_plus_listener() {
2850 use core::time::Duration;
2851 // The chain path collects the head item's decls; the same validation
2852 // must apply there.
2853 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2854 let ch = exec
2855 .channel::<Msg>("taktora.test.req0268.chain.combo")
2856 .unwrap();
2857 let sub = ch.subscriber().unwrap();
2858 let err = exec
2859 .add_chain(vec![crate::item::item_with_triggers(
2860 move |d| {
2861 d.interval(Duration::from_millis(1));
2862 d.subscriber(&sub);
2863 Ok(())
2864 },
2865 |_| Ok(crate::ControlFlow::Continue),
2866 )])
2867 .expect_err("chain head interval + subscriber must be rejected");
2868 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2869 }
2870
2871 #[test]
2872 fn add_chain_rejects_zero_period_interval() {
2873 use core::time::Duration;
2874 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2875 let err = exec
2876 .add_chain(vec![crate::item::item_with_triggers(
2877 |d| {
2878 d.interval(Duration::ZERO);
2879 Ok(())
2880 },
2881 |_| Ok(crate::ControlFlow::Continue),
2882 )])
2883 .expect_err("chain head zero-period interval must be rejected");
2884 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2885 }
2886
2887 #[test]
2888 fn add_graph_rejects_cyclic_plus_listener() {
2889 use core::time::Duration;
2890 // The graph path collects the root vertex's decls into a grid-registered
2891 // TaskEntry; the same cyclic-XOR-event-driven validation must apply there
2892 // (REQ_0268). We use a real subscriber so the listener decl is genuine.
2893 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2894 let ch = exec
2895 .channel::<Msg>("taktora.test.req0268.graph.combo")
2896 .unwrap();
2897 let sub = ch.subscriber().unwrap();
2898 let mut g = exec.add_graph();
2899 let r = g.vertex(crate::item::item_with_triggers(
2900 move |d| {
2901 d.interval(Duration::from_millis(1));
2902 d.subscriber(&sub);
2903 Ok(())
2904 },
2905 |_| Ok(crate::ControlFlow::Continue),
2906 ));
2907 g.root(r);
2908 let err = g
2909 .build()
2910 .expect_err("graph root interval + subscriber must be rejected");
2911 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2912 }
2913
2914 #[test]
2915 fn add_graph_rejects_zero_period_interval() {
2916 use core::time::Duration;
2917 // A zero-period interval on the graph root busy-spins the grid, so the
2918 // graph path must reject it just like the single-item/chain paths.
2919 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2920 let mut g = exec.add_graph();
2921 let r = g.vertex(crate::item::item_with_triggers(
2922 |d| {
2923 d.interval(Duration::ZERO);
2924 Ok(())
2925 },
2926 |_| Ok(crate::ControlFlow::Continue),
2927 ));
2928 g.root(r);
2929 let err = g
2930 .build()
2931 .expect_err("graph root zero-period interval must be rejected");
2932 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2933 }
2934
    /// A run that is stopped before it starts must not report any cycle
    /// statistics, even though the cyclic task's grid target is already due.
    ///
    /// Setup: a Grid-mode executor with a scripted `CyclicClock` (`JumpClock`)
    /// and an observer that counts `on_cycle_stats` calls. One task with a
    /// 1 ms interval is registered, `stop()` is requested, then `run()` is
    /// called. The counter must still read 0 afterwards.
    #[test]
    fn stopped_iteration_emits_no_cyclic_cycle_observation() {
        use core::time::Duration;
        use std::sync::atomic::AtomicU64;

        // A CyclicClock that starts at 0 (epoch) then jumps far past the first
        // grid target, so the post-wait `take_due` finds the cyclic task due on
        // the very first (stopping) wake. Distinct from the telemetry clock
        // (scheduling role).
        struct JumpClock {
            // Number of `now_nanos` reads served so far.
            calls: AtomicU64,
        }
        impl crate::CyclicClock for JumpClock {
            fn now_nanos(&self) -> u64 {
                // First read (grid epoch at loop entry) = 0; every later read
                // is well past the 1ms target.
                if self.calls.fetch_add(1, Ordering::SeqCst) == 0 {
                    0
                } else {
                    1_000_000_000
                }
            }
        }

        // Observer that counts on_cycle_stats calls.
        struct Counter {
            // Number of `on_cycle_stats` callbacks received.
            cycles: AtomicU64,
        }
        impl Observer for Counter {
            fn on_cycle_stats(&self, _obs: &CycleObservation) {
                self.cycles.fetch_add(1, Ordering::SeqCst);
            }
        }

        let counter = Arc::new(Counter {
            cycles: AtomicU64::new(0),
        });
        let mut exec = Executor::builder()
            .worker_threads(0)
            .dispatch_mode(crate::DispatchMode::Grid)
            .cyclic_clock(Arc::new(JumpClock {
                calls: AtomicU64::new(0),
            }))
            .observer(Arc::clone(&counter) as Arc<dyn Observer>)
            .build()
            .unwrap();
        exec.add(crate::item::item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(1));
                Ok(())
            },
            |_| Ok(crate::ControlFlow::Continue),
        ))
        .unwrap();

        // Stop BEFORE running: the WaitSet wakes immediately on the stop
        // listener; the grid target is already due (JumpClock). Without the
        // stop guard the post-wait cyclic pass would dispatch + record one
        // spurious cycle on this stopping iteration; with it, zero.
        exec.stoppable().stop();
        exec.run().expect("run returns cleanly after stop");

        assert_eq!(
            counter.cycles.load(Ordering::SeqCst),
            0,
            "no cyclic cycle observation may be emitted on a stop wake"
        );
    }
3003
3004 #[test]
3005 fn custom_id_is_preserved() {
3006 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3007 let id = exec
3008 .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
3009 .unwrap();
3010 assert_eq!(id.as_str(), "my-task");
3011 }
3012
3013 #[test]
3014 fn add_persists_declared_budget() {
3015 use core::time::Duration;
3016 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3017 let task_id = exec
3018 .add(crate::item::item_with_triggers(
3019 |d| {
3020 d.interval(Duration::from_millis(10));
3021 d.budget(Duration::from_millis(5));
3022 Ok(())
3023 },
3024 |_| Ok(crate::ControlFlow::Continue),
3025 ))
3026 .unwrap();
3027 let entry = exec
3028 .tasks
3029 .iter()
3030 .find(|t| t.id == task_id)
3031 .expect("task present");
3032 assert_eq!(entry.budget, Some(Duration::from_millis(5)));
3033 }
3034
3035 #[test]
3036 fn scan_period_cached_for_cyclic_only() {
3037 use core::time::Duration;
3038 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3039 let cyclic = exec
3040 .add(crate::item::item_with_triggers(
3041 |d| {
3042 d.interval(Duration::from_millis(5));
3043 Ok(())
3044 },
3045 |_| Ok(crate::ControlFlow::Continue),
3046 ))
3047 .unwrap();
3048 let event_driven = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
3049
3050 let cyclic_entry = exec
3051 .tasks
3052 .iter()
3053 .find(|t| t.id == cyclic)
3054 .expect("cyclic task present");
3055 assert_eq!(cyclic_entry.scan_period, Some(Duration::from_millis(5)));
3056 // Sentinel: no sample has been taken yet.
3057 assert_eq!(cyclic_entry.last_took_ns.load(Ordering::Relaxed), u64::MAX);
3058
3059 let event_entry = exec
3060 .tasks
3061 .iter()
3062 .find(|t| t.id == event_driven)
3063 .expect("event-driven task present");
3064 assert_eq!(event_entry.scan_period, None);
3065 }
3066
3067 #[test]
3068 fn cycle_stats_index_aligned_with_tasks() {
3069 use core::time::Duration;
3070 let mut exec = Executor::builder()
3071 .worker_threads(0)
3072 .stats_window(512)
3073 .build()
3074 .unwrap();
3075 // Builder option flows through to the executor.
3076 assert_eq!(exec.stats_window, 512);
3077 // No tasks yet → both Vecs empty and aligned.
3078 assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
3079
3080 // Cyclic single-item add path.
3081 exec.add(crate::item::item_with_triggers(
3082 |d| {
3083 d.interval(Duration::from_millis(5));
3084 Ok(())
3085 },
3086 |_| Ok(crate::ControlFlow::Continue),
3087 ))
3088 .unwrap();
3089 // Event-driven single-item add path.
3090 exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
3091
3092 assert_eq!(exec.tasks.len(), 2);
3093 assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
3094 }
3095
3096 #[test]
3097 fn add_with_fault_handler_stores_handler_job() {
3098 use core::time::Duration;
3099 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3100 let task_id = exec
3101 .add_with_fault_handler(
3102 crate::item::item_with_triggers(
3103 |d| {
3104 d.interval(Duration::from_millis(10));
3105 d.budget(Duration::from_millis(5));
3106 Ok(())
3107 },
3108 |_| Ok(crate::ControlFlow::Continue),
3109 ),
3110 crate::item::item_with_triggers(|_d| Ok(()), |_| Ok(crate::ControlFlow::Continue)),
3111 )
3112 .unwrap();
3113 let entry = exec
3114 .tasks
3115 .iter()
3116 .find(|t| t.id == task_id)
3117 .expect("task present");
3118 assert!(
3119 entry.handler_job.is_some(),
3120 "handler_job should be Some after add_with_fault_handler"
3121 );
3122 // Main job should still be present.
3123 assert!(entry.job.is_some(), "main job should still be present");
3124 }
3125
3126 #[test]
3127 fn declare_triggers_called_at_add_time() {
3128 let called = Arc::new(AtomicBool::new(false));
3129 let called_d = Arc::clone(&called);
3130
3131 let it = crate::item::item_with_triggers(
3132 move |_d| {
3133 called_d.store(true, Ordering::SeqCst);
3134 Ok(())
3135 },
3136 |_| Ok(ControlFlow::Continue),
3137 );
3138
3139 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3140 exec.add(it).unwrap();
3141 assert!(called.load(Ordering::SeqCst));
3142 }
3143
3144 #[test]
3145 fn clear_task_fault_errors_on_running_task() {
3146 use core::time::Duration;
3147 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3148 let task_id = exec
3149 .add(crate::item::item_with_triggers(
3150 |d| {
3151 d.interval(Duration::from_millis(10));
3152 Ok(())
3153 },
3154 |_| Ok(crate::ControlFlow::Continue),
3155 ))
3156 .unwrap();
3157 // Task starts in Running state — clearing should error.
3158 let err = exec.clear_task_fault(task_id).expect_err("not faulted");
3159 assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
3160 }
3161
3162 #[test]
3163 fn clear_executor_fault_errors_on_running_executor() {
3164 let exec = Executor::builder().worker_threads(0).build().unwrap();
3165 let err = exec.clear_executor_fault().expect_err("not faulted");
3166 assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
3167 }
3168
3169 #[test]
3170 fn overrun_count_returns_zero_for_new_task() {
3171 use core::time::Duration;
3172 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3173 let task_id = exec
3174 .add(crate::item::item_with_triggers(
3175 |d| {
3176 d.interval(Duration::from_millis(10));
3177 d.budget(Duration::from_millis(5));
3178 Ok(())
3179 },
3180 |_| Ok(crate::ControlFlow::Continue),
3181 ))
3182 .unwrap();
3183 assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
3184 }
3185
3186 #[test]
3187 fn overrun_count_errors_for_unknown_task() {
3188 let exec = Executor::builder().worker_threads(0).build().unwrap();
3189 let err = exec
3190 .overrun_count(crate::TaskId::new("nope"))
3191 .expect_err("unknown task");
3192 assert!(matches!(err, ExecutorError::TaskNotFound(_)));
3193 }
3194
3195 #[test]
3196 fn task_fault_state_starts_running() {
3197 use core::time::Duration;
3198 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3199 let task_id = exec
3200 .add(crate::item::item_with_triggers(
3201 |d| {
3202 d.interval(Duration::from_millis(10));
3203 Ok(())
3204 },
3205 |_| Ok(crate::ControlFlow::Continue),
3206 ))
3207 .unwrap();
3208 assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
3209 }
3210
3211 #[test]
3212 fn executor_fault_state_starts_running() {
3213 let exec = Executor::builder().worker_threads(0).build().unwrap();
3214 assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
3215 }
3216
3217 // --- on_fatal / FatalDispatch integration tests ---
3218
3219 #[test]
3220 fn build_without_on_fatal_succeeds() {
3221 use crate::fatal::{FatalContext, FatalSite};
3222 use std::sync::{Arc, Mutex};
3223 // Default builder (no on_fatal) must build successfully.
3224 let exec = Executor::builder().worker_threads(0).build().unwrap();
3225 // The fatal_dispatch field is present; fire via a test terminal to
3226 // confirm the no-op handler doesn't blow up.
3227 let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3228 let reached2 = Arc::clone(&reached);
3229 let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
3230 exec.fatal_dispatch.handler().clone(),
3231 move |_| {
3232 *reached2.lock().unwrap() = true;
3233 },
3234 );
3235 test_dispatch.fire(&FatalContext {
3236 cause: "test".to_string(),
3237 site: FatalSite::PoolWorker,
3238 });
3239 assert!(*reached.lock().unwrap(), "terminal not reached");
3240 }
3241
3242 #[test]
3243 fn on_fatal_handler_is_stored_and_invoked() {
3244 use crate::fatal::{FatalContext, FatalSite};
3245 use std::sync::{Arc, Mutex};
3246 let called: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
3247 let called2 = Arc::clone(&called);
3248 let exec = Executor::builder()
3249 .worker_threads(0)
3250 .on_fatal(move |ctx| {
3251 called2.lock().unwrap().push(ctx.cause.clone());
3252 })
3253 .build()
3254 .unwrap();
3255 // Verify the handler fires via a test terminal.
3256 let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3257 let reached2 = Arc::clone(&reached);
3258 let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
3259 exec.fatal_dispatch.handler().clone(),
3260 move |_| {
3261 *reached2.lock().unwrap() = true;
3262 },
3263 );
3264 test_dispatch.fire(&FatalContext {
3265 cause: "my-cause".to_string(),
3266 site: FatalSite::ExecutorRunLoop,
3267 });
3268 assert!(*reached.lock().unwrap(), "terminal not reached");
3269 let log = called.lock().unwrap().clone();
3270 assert_eq!(
3271 log,
3272 vec!["my-cause"],
3273 "handler should have been called with cause"
3274 );
3275 }
3276}