taktora_executor/executor.rs
1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::clock::{MonotonicClock, SystemClock};
10use crate::context::Stoppable;
11use crate::error::ExecutorError;
12use crate::fatal::{FatalDispatch, FatalHandler, FatalSite, guard_or_fatal, panic_payload_message};
13use crate::fault::{
14 ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
15 FaultState, duration_to_ms_sat, instant_to_since_ms,
16};
17use crate::item::ExecutableItem;
18use crate::monitor::{ExecutionMonitor, NoopMonitor};
19use crate::observer::{NoopObserver, Observer};
20use crate::payload::Payload;
21use crate::pool::Pool;
22use crate::stats::{CycleObservation, StatsSnapshot, TaskStatsEntry};
23use crate::task_id::TaskId;
24use crate::task_kind::TaskKind;
25use crate::thread_attrs::ThreadAttributes;
26use crate::trigger::{TriggerDecl, TriggerDeclarer};
27use core::sync::atomic::AtomicU32;
28use iceoryx2::node::Node;
29use iceoryx2::port::listener::Listener as IxListener;
30use iceoryx2::prelude::ipc;
31use iceoryx2::prelude::*;
32use iceoryx2::waitset::WaitSetRunResult;
33use std::sync::Arc;
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
36use std::time::{Duration, Instant};
37use taktora_stats::ExecutorCycleStats;
38
/// Process-wide sequence number, bumped once per executor so that every
/// executor built in this process derives a distinct stop-event service name.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0_u64);
42
/// Per-task cycle-stats aggregator type used throughout the executor.
///
/// The two const parameters are the histogram segment count (`S = 8`) and the
/// exact-window length (`W = 256`). Both are fixed at compile time per
/// `ADR_0060`, so changing them is a source change, not a runtime option.
pub(crate) type TaskCycleStats = ExecutorCycleStats<8, 256>;
46
/// A single wakeup's pending cycle record, stashed on the [`TaskEntry`] between
/// the pre-dispatch capture and the post-barrier fold.
///
/// Bundling the pre-dispatch timestamp with its `faulted` flag in one `Option`
/// makes them impossible to desync: a cycle is pending iff this is `Some`, and
/// the `faulted` bit is then always the one captured at the same wakeup
/// (`REQ_0107`).
///
/// Derives `Debug`, `PartialEq` and `Eq` (in addition to `Copy`) so pending
/// records can be logged and compared directly in diagnostics and tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct CyclePending {
    /// Pre-dispatch timestamp for this wakeup (the cycle's `pre`), in
    /// telemetry-clock nanoseconds (see [`MonotonicClock`]).
    pub(crate) pre: u64,
    /// `true` when this wakeup's scan was fault-routed/skipped, so the
    /// post-barrier fold records it with `faulted=true`.
    pub(crate) faulted: bool,
}
61
/// One registered task entry.
///
/// Holds the task's work (`kind`), the triggers that wake it, its pre-built
/// dispatch closures, its fault bookkeeping, and the per-task telemetry state
/// the dispatch (`WaitSet`) thread carries between cycles.
///
/// Fields drop in declaration order. For single items, `job` is built from a
/// raw pointer into the item owned by `kind` (see `Executor::add_with_id`),
/// so review drop order before reordering fields.
pub(crate) struct TaskEntry {
    /// Task identifier (after any `ExecutableItem::task_id` override).
    pub(crate) id: TaskId,
    /// The kind of work this entry holds (single item, chain, or graph).
    pub(crate) kind: TaskKind,
    /// Trigger declarations recorded at `add` time. For a chain these are
    /// the head item's declarations only.
    pub(crate) decls: Vec<TriggerDecl>,
    /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
    /// time and re-invoked on every dispatch iteration via
    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
    /// that `Pool::submit<F>` requires in threaded mode. Required for
    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
    /// `TaskKind::Graph`, which dispatches its vertices via a separate
    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work. Also
    /// briefly `None` while a chain is being registered, between the push
    /// into the task table and the closure build.
    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,

    /// Per-task budget declared via `TriggerDeclarer::budget`. `None`
    /// means no per-task check; the executor-wide iteration budget
    /// still applies. `REQ_0070`. Chains currently always carry `None`.
    pub(crate) budget: Option<Duration>,

    /// Per-task fault state. Wait-free read on the dispatch hot path.
    /// Wrapped in `Arc` so dispatch closures built at `add` time can
    /// capture an owning handle into the same atomic the `TaskEntry`
    /// holds. `Arc::clone` is refcount-only, so this stays compatible
    /// with `REQ_0060` (no per-iteration allocation). `REQ_0070`.
    pub(crate) fault: Arc<FaultAtomic>,

    /// Monotonic per-task overrun counter. Increments on EVERY budget
    /// breach, including breaches while already `Faulted`. Never reset
    /// by clearing the fault. Shared with the dispatch closure via
    /// `Arc::clone`. `REQ_0102`.
    pub(crate) overrun_count: Arc<AtomicU64>,

    /// Pre-built dispatch closure for the fault-handler item. Mirrors
    /// `job`. `None` means no handler, and the task is simply skipped
    /// during fault. Set by `Executor::add_with_fault_handler`. `REQ_0072`.
    pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,

    /// Declared scan period for cyclic tasks (the `TriggerDecl::Interval`
    /// duration), or `None` for event-driven tasks. Cached at add time so the
    /// dispatch loop reads it without scanning `decls` per cycle. Gates cycle
    /// telemetry: only cyclic tasks participate (`REQ_0106`).
    pub(crate) scan_period: Option<Duration>,
    /// Last-cycle execute duration in ns, written by the dispatch closure on
    /// the pool worker and read by the `WaitSet` thread after `barrier()`.
    /// Shared via `Arc` exactly like `overrun_count`. Sentinel `u64::MAX` =
    /// "no sample this cycle" (the closure never ran, e.g. a faulted scan).
    /// It is also the initial value at registration.
    pub(crate) last_took_ns: Arc<AtomicU64>,

    /// WaitSet-thread-only timestamp of this task's previous dispatch, for
    /// computing `actual_period` (`REQ_0101`). Not shared (no atomic); only the
    /// single dispatch thread touches it. `None` before the first dispatch.
    /// Telemetry-clock nanoseconds (see [`MonotonicClock`]).
    pub(crate) last_dispatch: Option<u64>,

    /// WaitSet-thread-only running grid-slot index for deadline lateness
    /// (`REQ_0106`). Counts nominal periods elapsed since the grid epoch.
    /// It advances one slot per cycle under steady drift, and several at
    /// once across a coalesced/missed wakeup. It is decoupled from
    /// `cycle_index` so a transient hiccup re-anchors the grid instead of
    /// biasing it forever. Starts at `0` (the first cycle is on its own grid
    /// point by definition).
    pub(crate) grid_slot: u64,

    /// WaitSet-thread-only stash of the *current* wakeup's pending cycle:
    /// the pre-dispatch timestamp plus its `faulted` flag. It is carried
    /// across `pool.barrier()` so the post-barrier record pass can fold this
    /// cycle's telemetry without re-reading the clock or allocating a
    /// fired-index list. `Some` between the pre-dispatch capture and the
    /// post-barrier `record_cycle_for` `take`; `None` otherwise. Bundling
    /// the timestamp and the fault flag in one `Option` keeps them from ever
    /// desyncing (`REQ_0107`). Only the single dispatch thread touches it
    /// (no atomic).
    pub(crate) pending_cycle: Option<CyclePending>,
}
137
138/// Top-level executor. One per process is the typical case.
139pub struct Executor {
140 pub(crate) node: Node<ipc::Service>,
141 pub(crate) pool: Arc<Pool>,
142 pub(crate) tasks: Vec<TaskEntry>,
143 /// One cycle-stats aggregator per registered task, index-aligned with
144 /// `tasks`. Pushed at task-add time (before `run`), so no steady-state
145 /// allocation (`REQ_0060`, `REQ_0104`). Updated single-writer on the
146 /// `WaitSet` thread (Task 6).
147 pub(crate) cycle_stats: Vec<TaskCycleStats>,
148 /// Histogram sliding-window size in samples (`REQ_0100`).
149 pub(crate) stats_window: u32,
150 pub(crate) running: Arc<AtomicBool>,
151 pub(crate) stoppable: Stoppable,
152 pub(crate) next_id: AtomicU64,
153 /// Listener for the internal stop event service. Held here so it outlives
154 /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
155 /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
156 pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
157 /// Lifecycle observer. Defaults to a no-op.
158 pub(crate) observer: Arc<dyn Observer>,
159 /// Execution monitor. Defaults to a no-op.
160 pub(crate) monitor: Arc<dyn ExecutionMonitor>,
161 /// Per-iteration error capture slot — allocated once at build time and
162 /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
163 /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
164 /// the per-iteration heap allocation that the previous design incurred.
165 /// Required for `REQ_0060`.
166 pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
167 /// Executor-wide iteration budget from `ExecutorBuilder::iteration_budget`.
168 /// `None` means no executor-wide check.
169 pub(crate) iteration_budget: Option<Duration>,
170 /// Executor-wide fault state. Wrapped in `Arc` so each dispatch
171 /// closure can hold an owning handle without re-borrowing through
172 /// `self`. `REQ_0071`.
173 pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,
174
175 /// Index of the task whose `execute()` overran when the executor
176 /// transitioned to `Faulted`. Read alongside `exec_fault`.
177 pub(crate) exec_fault_task_idx: Arc<AtomicU32>,
178
179 /// Budget that was breached when the executor transitioned to
180 /// `Faulted`, in ms (saturated). Read alongside `exec_fault`.
181 pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,
182
183 /// Executor start time, set on first dispatch. Used to compute
184 /// `since_ms` for faults relative to `Executor::run` entry. Wrapped
185 /// in `Arc` so dispatch closures share the same `OnceLock` with the
186 /// executor — `get_or_init` is idempotent and wait-free.
187 pub(crate) start_time: Arc<OnceLock<Instant>>,
188
189 /// Fatal-dispatch handle. Called once on the fail-fast path from the
190 /// executor-thread run-loop boundary; the pool holds a separate
191 /// `Arc::clone` for its own worker / inline-submit boundaries.
192 pub(crate) fatal_dispatch: Arc<FatalDispatch>,
193
194 /// Telemetry time source (`REQ_0101`/`REQ_0105`/`REQ_0106`). Read on the
195 /// worker (for `took`) and the `WaitSet` thread (for `pre`); defaults to
196 /// [`SystemClock`]. A test can substitute a [`MockClock`] via
197 /// [`ExecutorBuilder::clock`] for deterministic timing assertions. Affects
198 /// only telemetry — never scheduling or fault behaviour.
199 pub(crate) clock: Arc<dyn MonotonicClock>,
200
201 /// Lateness grid epoch in telemetry-clock nanoseconds (`REQ_0106`): the
202 /// `pre` of this executor's first recorded cyclic dispatch. Grid point `n`
203 /// is `grid_epoch + n * period`. Set once (lazily) on the `WaitSet` thread;
204 /// shared as an `Arc` so the dispatch loop and `record_cycle_for` see the
205 /// same `OnceLock`.
206 pub(crate) grid_epoch: Arc<OnceLock<u64>>,
207
208 /// Cyclic dispatch timing strategy (`REQ_0268` / `ADR_0100`). Read once at
209 /// `dispatch_loop` entry and hoisted to a local, so steady-state cost is a
210 /// single `Copy`-enum compare per cycle. Defaults to
211 /// [`DispatchMode::Grid`](crate::DispatchMode).
212 pub(crate) dispatch_mode: crate::DispatchMode,
213
214 /// Scheduling time source for the absolute grid (`REQ_0268`). Distinct from
215 /// [`Executor::clock`] (telemetry): a telemetry mock can never alter
216 /// dispatch timing. Defaults to
217 /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock).
218 pub(crate) cyclic_clock: std::sync::Arc<dyn crate::CyclicClock>,
219}
220
// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
// `SingleThreaded` reason as `IxNotifier`. After construction, the only
// per-iteration call is `listener.try_wait_one()`, which does not mutate the
// Rc. `Executor` is never shared across threads (it requires `&mut self` for
// `run()`), so there is no aliased concurrent mutation.
//
// NOTE(review): `Send` is about *moving* the executor to another thread, not
// about sharing it. The `&mut self` argument above really addresses `Sync`.
// For `Send`, the argument holds only if no other handle to the listener's
// internal `Rc` state remains usable on the original thread. `stop_listener`
// is an `Arc`, so confirm no clone of it escapes the executor. This impl also
// vouches for every other field (e.g. `node`, the boxed items inside `tasks`).
// Confirm each is `Send` or covered by the same reasoning.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
228
229impl Executor {
230 /// Start a new builder.
231 #[must_use]
232 pub fn builder() -> ExecutorBuilder {
233 ExecutorBuilder::default()
234 }
235
236 /// Open or create a pub/sub channel bound to this executor's node.
237 pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
238 Channel::open_or_create(&self.node, name)
239 }
240
241 /// Open or create a request/response service bound to this executor's node.
242 pub fn service<Req, Resp>(
243 &mut self,
244 name: &str,
245 ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
246 where
247 Req: Payload,
248 Resp: Payload,
249 {
250 crate::Service::open_or_create(&self.node, name)
251 }
252
253 /// Borrowed snapshot of every task's cycle aggregates (`REQ_0103` pull
254 /// path). Relaxed reads; never blocks the dispatch writer.
255 #[must_use]
256 pub fn stats_snapshot(&self) -> StatsSnapshot {
257 let per_task = self
258 .tasks
259 .iter()
260 .zip(self.cycle_stats.iter())
261 .map(|(t, s)| {
262 let snap = s.snapshot();
263 TaskStatsEntry {
264 task_id: t.id.clone(),
265 p50_ns: snap.p50_ns,
266 p95_ns: snap.p95_ns,
267 p99_ns: snap.p99_ns,
268 min_ns: snap.min_ns,
269 max_ns: snap.max_ns,
270 max_jitter_ns: snap.max_jitter_ns,
271 max_lateness_ns: snap.max_lateness_ns,
272 overrun_count: t.overrun_count.load(Ordering::Acquire),
273 }
274 })
275 .collect();
276 StatsSnapshot { per_task }
277 }
278
279 /// Add an item to the executor with an auto-generated id.
280 pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
281 let id = TaskId::new(format!(
282 "task-{}",
283 self.next_id.fetch_add(1, Ordering::SeqCst)
284 ));
285 self.add_with_id(id, item)
286 }
287
288 /// Add an item with a user-supplied id.
289 ///
290 /// The item's [`ExecutableItem::task_id`] override takes precedence over
291 /// the caller-supplied `id`, which itself takes precedence over the
292 /// auto-generated id assigned by [`Executor::add`].
293 pub fn add_with_id(
294 &mut self,
295 id: impl Into<TaskId>,
296 mut item: impl ExecutableItem,
297 ) -> Result<TaskId, ExecutorError> {
298 let id_arg: TaskId = id.into();
299 // The item's `task_id()` override wins over the user-supplied id.
300 let id = item.task_id().map_or(id_arg, TaskId::new);
301 let mut declarer = TriggerDeclarer::new_internal();
302 item.declare_triggers(&mut declarer)?;
303 let budget = declarer.budget;
304 let decls = declarer.into_decls();
305
306 // REQ_0268: reject ill-defined trigger shapes (cyclic+event, zero
307 // period) before the task joins the table — the natural validation
308 // point, where the decls are first available, for every DispatchMode.
309 validate_decls(&id, &decls)?;
310
311 let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
312 let app_id = item_box.app_id();
313 let app_inst = item_box.app_instance_id();
314 // SAFETY: the raw pointer points into the heap allocation of
315 // `item_box`. `Box` keeps that allocation at a stable address even
316 // when the `Box` itself is moved (e.g. when `self.tasks` grows),
317 // so the pointer remains valid for the lifetime of the
318 // `TaskEntry`. See SendItemPtr safety doc for the rest of the
319 // discipline (barrier() pairs with worker access).
320 #[allow(unsafe_code)]
321 let item_ptr =
322 SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
323
324 // Allocate the per-task atomics now so the dispatch closure
325 // and the `TaskEntry` share the same `Arc` storage. The task
326 // will occupy `self.tasks.len()` after the push below — capture
327 // that index up front for `task_idx_u32`. Bounded workspace, so
328 // the `as u32` cast is sound; explicit allow keeps clippy quiet.
329 let task_fault = Arc::new(FaultAtomic::new());
330 let overrun_count = Arc::new(AtomicU64::new(0));
331 let scan_period = scan_period_from_decls(&decls);
332 let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
333 #[allow(clippy::cast_possible_truncation)]
334 let task_idx_u32 = self.tasks.len() as u32;
335 let fault_ctx = FaultDispatchCtx {
336 task_budget: budget,
337 task_fault: Arc::clone(&task_fault),
338 overrun_count: Arc::clone(&overrun_count),
339 iteration_budget: self.iteration_budget,
340 exec_fault: Arc::clone(&self.exec_fault),
341 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
342 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
343 task_idx_u32,
344 exec_start: Arc::clone(&self.start_time),
345 observer: Arc::clone(&self.observer),
346 };
347
348 let job = build_single_job(
349 id.clone(),
350 self.stoppable.clone(),
351 Arc::clone(&self.observer),
352 Arc::clone(&self.monitor),
353 Arc::clone(&self.iter_err),
354 app_id,
355 app_inst,
356 item_ptr,
357 fault_ctx,
358 Arc::clone(&last_took_ns),
359 Arc::clone(&self.clock),
360 );
361
362 self.tasks.push(TaskEntry {
363 id: id.clone(),
364 kind: TaskKind::Single(item_box),
365 decls,
366 job: Some(job),
367 budget,
368 fault: task_fault,
369 overrun_count,
370 handler_job: None,
371 scan_period,
372 last_took_ns: Arc::clone(&last_took_ns),
373 last_dispatch: None,
374 grid_slot: 0,
375 pending_cycle: None,
376 });
377 self.cycle_stats
378 .push(TaskCycleStats::new(self.stats_window));
379 Ok(id)
380 }
381
382 /// Register an item plus a fault-handler item.
383 ///
384 /// The main item is registered through the canonical [`add`](Self::add)
385 /// path. The handler's [`declare_triggers`](ExecutableItem::declare_triggers)
386 /// is called (so handlers that internally rely on the declarer being
387 /// invoked observe the call) but its returned trigger list is
388 /// **ignored** — the handler dispatches on the main item's triggers
389 /// while the task is in `Faulted` state and runs in place of the main
390 /// item's `execute()`. The pre-built handler dispatch closure is
391 /// stashed on the same task entry as the main item's `job`,
392 /// satisfying `REQ_0072`.
393 ///
394 /// # Errors
395 ///
396 /// Propagates any error from registering the main item via `add`, or
397 /// from the handler's `declare_triggers` call.
398 ///
399 /// # Panics
400 ///
401 /// Panics if the task entry just inserted by [`add`](Self::add) cannot
402 /// be located in `self.tasks` — this is unreachable by construction
403 /// and indicates a logic bug.
404 pub fn add_with_fault_handler<I, H>(
405 &mut self,
406 main: I,
407 handler: H,
408 ) -> Result<TaskId, ExecutorError>
409 where
410 I: ExecutableItem,
411 H: ExecutableItem,
412 {
413 let task_id = self.add(main)?;
414
415 // Drain the handler's trigger declarations — they are ignored by
416 // design (the handler runs on the main item's triggers).
417 let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
418 let mut throwaway = TriggerDeclarer::new_internal();
419 handler_box.declare_triggers(&mut throwaway)?;
420 drop(throwaway);
421
422 let app_id = handler_box.app_id();
423 let app_inst = handler_box.app_instance_id();
424
425 // Locate the task we just added so we can share its per-task
426 // atomics with the handler's `FaultDispatchCtx`. The handler
427 // runs on the same `TaskEntry`; per §4.6 invariant 5, a handler
428 // breach increments `overrun_count` and keeps state `Faulted`
429 // without re-firing the observer.
430 let task_idx = self
431 .tasks
432 .iter()
433 .position(|t| t.id == task_id)
434 .expect("just added; must exist");
435 let task = &self.tasks[task_idx];
436 #[allow(clippy::cast_possible_truncation)]
437 let task_idx_u32 = task_idx as u32;
438 let handler_fault_ctx = FaultDispatchCtx {
439 task_budget: task.budget,
440 task_fault: Arc::clone(&task.fault),
441 overrun_count: Arc::clone(&task.overrun_count),
442 iteration_budget: self.iteration_budget,
443 exec_fault: Arc::clone(&self.exec_fault),
444 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
445 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
446 task_idx_u32,
447 exec_start: Arc::clone(&self.start_time),
448 observer: Arc::clone(&self.observer),
449 };
450
451 let handler_closure = build_handler_job(
452 task_id.clone(),
453 self.stoppable.clone(),
454 Arc::clone(&self.observer),
455 Arc::clone(&self.monitor),
456 Arc::clone(&self.iter_err),
457 app_id,
458 app_inst,
459 handler_box,
460 handler_fault_ctx,
461 );
462
463 self.tasks[task_idx].handler_job = Some(handler_closure);
464
465 Ok(task_id)
466 }
467
468 /// Clear a per-task fault. Returns the previous `FaultState`.
469 /// Fires `Observer::on_task_clear` if the state changed from
470 /// `Faulted` to `Running`. `REQ_0070`.
471 ///
472 /// # Errors
473 ///
474 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
475 /// * [`ExecutorError::TaskNotFaulted`] if `task` is already `Running`.
476 pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
477 let entry = self
478 .tasks
479 .iter()
480 .find(|t| t.id == task)
481 .ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
482 let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
483 let prev = entry.fault.swap(FaultState::Running, budget_ms);
484 match prev {
485 FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
486 FaultState::Faulted { .. } => {
487 self.observer.on_task_clear(task);
488 Ok(prev)
489 }
490 }
491 }
492
493 /// Clear the executor-wide fault and cascade-clear every task whose
494 /// state is `Faulted{ExecutorFaulted}`. Tasks whose state is
495 /// `Faulted{BudgetExceeded}` are NOT cleared (their own contract
496 /// breach is independent). Fires `Observer::on_executor_clear` and
497 /// one `Observer::on_task_clear` per cascade-cleared task.
498 /// `REQ_0071`.
499 ///
500 /// # Errors
501 ///
502 /// * [`ExecutorError::ExecutorNotFaulted`] if the executor is `Running`.
503 pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
504 let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
505 let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
506 let prev = self
507 .exec_fault
508 .swap(ExecutorFaultState::Running, task_idx, budget_ms);
509 match prev {
510 ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
511 ExecutorFaultState::Faulted { .. } => {
512 // Cascade-clear tasks whose reason is ExecutorFaulted.
513 for entry in &self.tasks {
514 let task_budget_ms =
515 entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
516 if let FaultState::Faulted {
517 reason: FaultReason::ExecutorFaulted,
518 ..
519 } = entry.fault.load(task_budget_ms)
520 {
521 let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
522 self.observer.on_task_clear(entry.id.clone());
523 }
524 }
525 self.observer.on_executor_clear();
526 Ok(prev)
527 }
528 }
529 }
530
531 /// Return the per-task overrun counter — number of times the task's
532 /// `execute()` exceeded its budget over the executor's lifetime.
533 /// Monotonic; not reset by `clear_task_fault`. `REQ_0102`.
534 ///
535 /// # Errors
536 ///
537 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
538 pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
539 self.tasks
540 .iter()
541 .find(|t| t.id == task)
542 .map(|t| t.overrun_count.load(Ordering::Acquire))
543 .ok_or_else(|| ExecutorError::TaskNotFound(task))
544 }
545
546 /// Return a snapshot of the per-task `FaultState`. `REQ_0073` (pull path).
547 ///
548 /// # Errors
549 ///
550 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
551 pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
552 self.tasks
553 .iter()
554 .find(|t| t.id == task)
555 .map(|t| {
556 let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
557 t.fault.load(budget_ms)
558 })
559 .ok_or_else(|| ExecutorError::TaskNotFound(task))
560 }
561
562 /// Return a snapshot of the executor-wide `ExecutorFaultState`.
563 /// `REQ_0073` (pull path).
564 #[must_use]
565 pub fn executor_fault_state(&self) -> ExecutorFaultState {
566 let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
567 let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
568 self.exec_fault.load(task_idx, budget_ms)
569 }
570
571 /// Add a sequential chain of items. Only the head item's
572 /// `declare_triggers` is consulted; non-head triggers are ignored with a
573 /// tracing warn.
574 pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
575 where
576 I: ExecutableItem,
577 C: IntoIterator<Item = I>,
578 {
579 let id = TaskId::new(format!(
580 "chain-{}",
581 self.next_id.fetch_add(1, Ordering::SeqCst)
582 ));
583 let boxed: Vec<Box<dyn ExecutableItem>> = items
584 .into_iter()
585 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
586 .collect();
587 self.add_chain_with_id_boxed(id, boxed)
588 }
589
590 /// Like [`Executor::add_chain`] but with a user-supplied id.
591 pub fn add_chain_with_id<I, C>(
592 &mut self,
593 id: impl Into<TaskId>,
594 items: C,
595 ) -> Result<TaskId, ExecutorError>
596 where
597 I: ExecutableItem,
598 C: IntoIterator<Item = I>,
599 {
600 let boxed: Vec<Box<dyn ExecutableItem>> = items
601 .into_iter()
602 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
603 .collect();
604 self.add_chain_with_id_boxed(id.into(), boxed)
605 }
606
607 fn add_chain_with_id_boxed(
608 &mut self,
609 id: TaskId,
610 mut items: Vec<Box<dyn ExecutableItem>>,
611 ) -> Result<TaskId, ExecutorError> {
612 if items.is_empty() {
613 return Err(ExecutorError::Builder(
614 "chain must contain at least one item".into(),
615 ));
616 }
617
618 // Head item's `task_id()` override wins over the user-supplied id.
619 let id = items[0].task_id().map_or(id, TaskId::new);
620
621 // Head's triggers gate the chain.
622 let mut head_declarer = TriggerDeclarer::new_internal();
623 items[0].declare_triggers(&mut head_declarer)?;
624 let decls = head_declarer.into_decls();
625
626 // REQ_0268: same trigger-shape validation as the single-item path,
627 // applied to the head item's decls (which gate the whole chain).
628 validate_decls(&id, &decls)?;
629
630 // Warn if non-head items declared triggers (those will be ignored).
631 for (i, body) in items.iter_mut().enumerate().skip(1) {
632 let mut spurious = TriggerDeclarer::new_internal();
633 let _ = body.declare_triggers(&mut spurious);
634 if !spurious.is_empty() {
635 #[cfg(feature = "tracing")]
636 tracing::warn!(
637 target: "taktora-executor",
638 task = %id,
639 position = i,
640 "non-head chain item declared triggers; they will be ignored"
641 );
642 #[cfg(not(feature = "tracing"))]
643 {
644 let _ = i;
645 }
646 }
647 }
648
649 let mut items = items;
650 // SAFETY: pointer into the chain's `items` Vec. The Vec lives
651 // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
652 // is stable once `add_chain` returns — `self.tasks` may grow
653 // (moving the `Vec<Box<...>>` header itself), but the Vec's
654 // heap buffer is referenced via the header's data pointer and
655 // is unaffected by header moves. We never resize the chain Vec
656 // after this point. See SendChainPtr safety doc for the rest.
657 #[allow(unsafe_code)]
658 let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
659 &mut items,
660 ));
661 // NB: the pointer above is to the local `items` Vec on the
662 // stack — it's invalid after the `push` below moves items into
663 // the TaskEntry. We rederive a stable pointer after the push.
664 // (See the rebuild step below.)
665 let _ = chain_ptr;
666
667 // Pre-allocate the per-task atomics so the chain's dispatch
668 // closure can capture clones of the same `Arc`s the `TaskEntry`
669 // holds. The chain occupies `self.tasks.len()` after the push.
670 let task_fault = Arc::new(FaultAtomic::new());
671 let overrun_count = Arc::new(AtomicU64::new(0));
672 let scan_period = scan_period_from_decls(&decls);
673 let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
674 #[allow(clippy::cast_possible_truncation)]
675 let task_idx_u32 = self.tasks.len() as u32;
676
677 self.tasks.push(TaskEntry {
678 id: id.clone(),
679 kind: TaskKind::Chain(items),
680 decls,
681 job: None, // populated in the rebuild step below
682 // TODO(post-Task-10): chain budgets carried separately; for now None.
683 budget: None,
684 fault: Arc::clone(&task_fault),
685 overrun_count: Arc::clone(&overrun_count),
686 handler_job: None,
687 scan_period,
688 last_took_ns: Arc::clone(&last_took_ns),
689 last_dispatch: None,
690 grid_slot: 0,
691 pending_cycle: None,
692 });
693 self.cycle_stats
694 .push(TaskCycleStats::new(self.stats_window));
695
696 // After the push, the TaskEntry lives at a stable position in
697 // `self.tasks` for the duration of this `add_chain_with_id_boxed`
698 // call. Take a stable pointer to its chain Vec and build the
699 // dispatch closure. If `self.tasks` later grows, the Vec header
700 // inside the TaskEntry moves but the header's data pointer
701 // (which addresses the chain's heap buffer) does not — and the
702 // closure derefs that pointer per dispatch, so it re-reads the
703 // current heap address each time. Sound under the same
704 // discipline as `tasks_ptr` in dispatch_loop.
705 let task_idx = self.tasks.len() - 1;
706 let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
707 {
708 TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
709 // The push above used TaskKind::Chain, so this arm is
710 // unreachable. Mark it explicitly to satisfy `match`.
711 _ => unreachable!("just-pushed task is TaskKind::Chain"),
712 };
713 #[allow(unsafe_code)]
714 let chain_ptr = SendChainPtr::new(chain_vec_ptr);
715 let fault_ctx = FaultDispatchCtx {
716 task_budget: None, // chain budgets are intentionally None for now
717 task_fault,
718 overrun_count,
719 iteration_budget: self.iteration_budget,
720 exec_fault: Arc::clone(&self.exec_fault),
721 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
722 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
723 task_idx_u32,
724 exec_start: Arc::clone(&self.start_time),
725 observer: Arc::clone(&self.observer),
726 };
727 let job = build_chain_job(
728 id.clone(),
729 self.stoppable.clone(),
730 Arc::clone(&self.observer),
731 Arc::clone(&self.monitor),
732 Arc::clone(&self.iter_err),
733 chain_ptr,
734 fault_ctx,
735 Arc::clone(&last_took_ns),
736 Arc::clone(&self.clock),
737 );
738 self.tasks[task_idx].job = Some(job);
739 Ok(id)
740 }
741
742 /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
743 /// executor is built. Clone before calling `run()` — any clone taken at any
744 /// time will wake the `WaitSet` when `stop()` is called.
745 #[must_use]
746 pub fn stoppable(&self) -> Stoppable {
747 self.stoppable.clone()
748 }
749
750 /// Borrow the underlying iceoryx2 node (escape hatch for power users).
751 pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
752 &self.node
753 }
754
755 /// Begin building a graph. Call `.build()` on the returned builder to
756 /// register the graph as a task.
757 pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
758 ExecutorGraphBuilder {
759 executor: self,
760 builder: crate::graph::GraphBuilder::new(),
761 custom_id: None,
762 }
763 }
764}
765
766/// Builder for [`Executor`].
767pub struct ExecutorBuilder {
768 worker_threads: Option<usize>,
769 observer: Option<Arc<dyn Observer>>,
770 monitor: Option<Arc<dyn ExecutionMonitor>>,
771 worker_attrs: ThreadAttributes,
772 /// Executor-wide iteration budget (`REQ_0071`). `None` means no
773 /// executor-wide check.
774 iteration_budget: Option<Duration>,
775 /// User-supplied fatal handler. `None` → resolved to a no-op `Arc` in
776 /// `build()`.
777 fatal_handler: Option<FatalHandler>,
778 /// Sliding-window size (samples) for cycle-stats aggregation
779 /// (`REQ_0100`). `None` → resolved to `1024` in `build()`.
780 stats_window: Option<u32>,
781 /// Telemetry time source. `None` → resolved to [`SystemClock`] in
782 /// `build()`. Override with a [`MockClock`](crate::MockClock) for
783 /// deterministic timing tests.
784 clock: Option<Arc<dyn MonotonicClock>>,
785 /// Cyclic dispatch timing strategy (`REQ_0268`). Default
786 /// [`DispatchMode::Grid`](crate::DispatchMode).
787 dispatch_mode: crate::DispatchMode,
788 /// Scheduling clock for the absolute grid. `None` → resolved to
789 /// [`MonotonicCyclicClock`](crate::MonotonicCyclicClock) in `build()`.
790 cyclic_clock: Option<std::sync::Arc<dyn crate::CyclicClock>>,
791}
792
793impl Default for ExecutorBuilder {
794 fn default() -> Self {
795 Self {
796 worker_threads: None,
797 observer: None,
798 monitor: None,
799 worker_attrs: ThreadAttributes::new(),
800 iteration_budget: None,
801 fatal_handler: None,
802 stats_window: None,
803 clock: None,
804 dispatch_mode: crate::DispatchMode::default(),
805 cyclic_clock: None,
806 }
807 }
808}
809
810impl ExecutorBuilder {
811 /// Number of worker threads. `0` → inline (no pool). Default → physical
812 /// cores.
813 #[must_use]
814 pub const fn worker_threads(mut self, n: usize) -> Self {
815 self.worker_threads = Some(n);
816 self
817 }
818
819 /// Attach a lifecycle observer. If not called, a no-op observer is used.
820 #[must_use]
821 pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
822 self.observer = Some(obs);
823 self
824 }
825
826 /// Attach an execution monitor. If not called, a no-op monitor is used.
827 #[must_use]
828 pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
829 self.monitor = Some(mon);
830 self
831 }
832
833 /// Configure the executor-wide iteration budget. Any task whose
834 /// `execute()` exceeds `dur` transitions the executor to `Faulted`
835 /// (`REQ_0071`). Default: unset (no executor-wide check).
836 #[must_use]
837 pub const fn iteration_budget(mut self, dur: Duration) -> Self {
838 self.iteration_budget = Some(dur);
839 self
840 }
841
842 /// Sliding-window size (samples) for percentile / min-max / jitter /
843 /// lateness aggregation (`REQ_0100`). Default `1024`.
844 #[must_use]
845 pub const fn stats_window(mut self, samples: u32) -> Self {
846 self.stats_window = Some(samples);
847 self
848 }
849
850 /// Substitute the telemetry time source. Defaults to [`SystemClock`].
851 ///
852 /// Pass a [`MockClock`](crate::MockClock) clone to drive `took` / jitter /
853 /// lateness from scripted instants, making timing assertions exact and
854 /// independent of the host scheduler. The clock affects telemetry only —
855 /// scheduling, run-mode deadlines and fault detection always use the real
856 /// monotonic clock.
857 #[must_use]
858 pub fn clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
859 self.clock = Some(clock);
860 self
861 }
862
863 /// Select cyclic dispatch timing (default `DispatchMode::Grid`). `Legacy` is
864 /// the pre-REQ_0268 `attach_interval` path, retained only until the Pi A/B.
865 #[must_use]
866 pub const fn dispatch_mode(mut self, mode: crate::DispatchMode) -> Self {
867 self.dispatch_mode = mode;
868 self
869 }
870
871 /// Override the scheduling clock (default `MonotonicCyclicClock`). Distinct
872 /// from `clock` (telemetry) — see `CyclicClock`.
873 #[must_use]
874 pub fn cyclic_clock(mut self, clock: std::sync::Arc<dyn crate::CyclicClock>) -> Self {
875 self.cyclic_clock = Some(clock);
876 self
877 }
878
879 /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
880 /// for worker threads. Has no effect when `worker_threads` is `0` (inline
881 /// mode). Requires the `thread_attrs` feature for non-default settings.
882 #[must_use]
883 #[allow(clippy::missing_const_for_fn)]
884 pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
885 self.worker_attrs = attrs;
886 self
887 }
888
889 /// Register a best-effort last-gasp handler invoked once on the fail-fast
890 /// path immediately before `std::process::abort()`.
891 ///
892 /// **Contract**: runs over known-unsound executor state — MUST NOT touch
893 /// executor internals; a panic inside the handler routes straight to
894 /// `abort()`.
895 ///
896 /// The handler is expected to be time-bounded (the caller's responsibility);
897 /// no runtime deadline is imposed.
898 ///
899 /// **Observer / monitor containment carve-out**: the panic containment
900 /// described in the executor documentation covers only a user item's
901 /// `execute()` call. Panics that originate in framework-invoked user
902 /// callbacks that run *outside* that inner catch — such as
903 /// [`Observer`](crate::Observer) methods (e.g. `on_app_error`,
904 /// `on_task_fault`) and [`ExecutionMonitor`](crate::ExecutionMonitor)
905 /// methods (e.g. `post_execute`) — escape to this fail-fast boundary and
906 /// cause `abort()`. Those callbacks must therefore be treated as
907 /// non-panicking by the implementor. See `REQ_0123`.
908 ///
909 /// If not called, a no-op handler is used and `abort()` is still reached
910 /// after any unrecoverable fault.
911 #[must_use]
912 pub fn on_fatal(
913 mut self,
914 handler: impl Fn(&crate::FatalContext) + Send + Sync + 'static,
915 ) -> Self {
916 self.fatal_handler = Some(Arc::new(handler));
917 self
918 }
919
920 /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
921 /// internal stop-event service so that any `Stoppable` clone (taken before
922 /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
923 ///
924 /// # Panics
925 ///
926 /// Panics if the internally-generated stop-event service name exceeds the
927 /// iceoryx2 service name length limit (this cannot happen under normal use
928 /// because the name is derived from the process id and a monotonic counter).
929 #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
930 #[track_caller]
931 pub fn build(self) -> Result<Executor, ExecutorError> {
932 let node = NodeBuilder::new()
933 .create::<ipc::Service>()
934 .map_err(ExecutorError::iceoryx2)?;
935
936 let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
937
938 // Resolve the fatal handler: use the user-supplied one or fall back to a no-op.
939 let fatal_handler: FatalHandler = self
940 .fatal_handler
941 .unwrap_or_else(|| Arc::new(|_ctx: &crate::FatalContext| {}));
942 let fatal_dispatch = Arc::new(FatalDispatch::new(fatal_handler));
943
944 let pool = Arc::new(Pool::new(
945 n_workers,
946 self.worker_attrs,
947 Arc::clone(&fatal_dispatch),
948 )?);
949
950 // Build the internal stop event service with a unique-per-process name
951 // so multiple executors in the same process don't collide.
952 let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
953 let stop_topic = format!(
954 "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
955 std::process::id()
956 );
957 let stop_event = node
958 .service_builder(&stop_topic.as_str().try_into().unwrap())
959 .event()
960 .open_or_create()
961 .map_err(ExecutorError::iceoryx2)?;
962
963 let stop_notifier = Arc::new(
964 stop_event
965 .notifier_builder()
966 .create()
967 .map_err(ExecutorError::iceoryx2)?,
968 );
969
970 // SAFETY: see module-level note; Arc<IxListener> is held here and only
971 // accessed on the executor thread.
972 let stop_listener = Arc::new(
973 stop_event
974 .listener_builder()
975 .create()
976 .map_err(ExecutorError::iceoryx2)?,
977 );
978
979 // Wire the notifier into the Stoppable so every clone is waker-aware
980 // from the moment the executor is built.
981 let stoppable = Stoppable::with_waker(stop_notifier);
982
983 let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
984
985 let monitor: Arc<dyn ExecutionMonitor> =
986 self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
987
988 let clock: Arc<dyn MonotonicClock> =
989 self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
990
991 let cyclic_clock: std::sync::Arc<dyn crate::CyclicClock> = self
992 .cyclic_clock
993 .unwrap_or_else(|| std::sync::Arc::new(crate::MonotonicCyclicClock::new()));
994
995 let exec = Executor {
996 node,
997 pool,
998 tasks: Vec::new(),
999 cycle_stats: Vec::new(),
1000 stats_window: self.stats_window.unwrap_or(1024),
1001 running: Arc::new(AtomicBool::new(false)),
1002 stoppable,
1003 next_id: AtomicU64::new(0),
1004 stop_listener,
1005 observer,
1006 monitor,
1007 iter_err: Arc::new(std::sync::Mutex::new(None)),
1008 iteration_budget: self.iteration_budget,
1009 exec_fault: Arc::new(ExecutorFaultAtomic::new()),
1010 exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
1011 exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
1012 start_time: Arc::new(OnceLock::new()),
1013 fatal_dispatch,
1014 clock,
1015 grid_epoch: Arc::new(OnceLock::new()),
1016 dispatch_mode: self.dispatch_mode,
1017 cyclic_clock,
1018 };
1019
1020 Ok(exec)
1021 }
1022}
1023
1024// ── Run loop ──────────────────────────────────────────────────────────────────
1025
1026impl Executor {
1027 /// Run the executor until [`Stoppable::stop`] is called or a task signals
1028 /// stop via [`crate::Context::stop_executor`].
1029 ///
1030 /// # Errors
1031 ///
1032 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1033 ///
1034 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1035 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1036 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1037 ///
1038 /// If multiple items error in the same dispatch iteration, only the first
1039 /// is preserved; subsequent errors are discarded silently. To observe
1040 /// every error, attach an [`Observer`](crate::Observer) and read errors
1041 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1042 pub fn run(&mut self) -> Result<(), ExecutorError> {
1043 self.run_inner(RunMode::Forever)
1044 }
1045
1046 /// Run for at most `max` wall-clock duration, then return.
1047 ///
1048 /// # Errors
1049 ///
1050 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1051 ///
1052 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1053 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1054 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1055 ///
1056 /// If multiple items error in the same dispatch iteration, only the first
1057 /// is preserved; subsequent errors are discarded silently. To observe
1058 /// every error, attach an [`Observer`](crate::Observer) and read errors
1059 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1060 pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
1061 self.run_inner(RunMode::Until(Instant::now() + max))
1062 }
1063
1064 /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
1065 ///
1066 /// # Errors
1067 ///
1068 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1069 ///
1070 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1071 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1072 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1073 ///
1074 /// If multiple items error in the same dispatch iteration, only the first
1075 /// is preserved; subsequent errors are discarded silently. To observe
1076 /// every error, attach an [`Observer`](crate::Observer) and read errors
1077 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1078 pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
1079 self.run_inner(RunMode::Iterations(n))
1080 }
1081
1082 /// Run until `predicate()` returns true. Checked after each `WaitSet`
1083 /// wakeup.
1084 ///
1085 /// # Errors
1086 ///
1087 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
1088 ///
1089 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
1090 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
1091 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
1092 ///
1093 /// If multiple items error in the same dispatch iteration, only the first
1094 /// is preserved; subsequent errors are discarded silently. To observe
1095 /// every error, attach an [`Observer`](crate::Observer) and read errors
1096 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
1097 pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
1098 self.run_inner(RunMode::Predicate(&mut predicate))
1099 }
1100}
1101
1102enum RunMode<'a> {
1103 Forever,
1104 Until(Instant),
1105 Iterations(usize),
1106 Predicate(&'a mut dyn FnMut() -> bool),
1107}
1108
1109impl Executor {
1110 fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
1111 // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
1112 // remains true permanently. Calling `run()` again after a stop will return
1113 // promptly without doing any meaningful work (it blocks until the first
1114 // trigger fires, then immediately exits the dispatch loop). Task 10's
1115 // Runner accommodates this by treating an Executor as one-shot: each
1116 // Runner owns the Executor and consumes it.
1117 if self.running.swap(true, Ordering::SeqCst) {
1118 return Err(ExecutorError::AlreadyRunning);
1119 }
1120
1121 self.observer.on_executor_up();
1122 let result = self.dispatch_loop(&mut mode);
1123 match &result {
1124 Ok(()) => self.observer.on_executor_down(),
1125 Err(e) => self.observer.on_executor_error(e),
1126 }
1127
1128 self.running.store(false, Ordering::SeqCst);
1129 result
1130 }
1131
1132 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1133 #[allow(
1134 unsafe_code,
1135 clippy::too_many_lines,
1136 clippy::ref_as_ptr,
1137 clippy::borrow_as_ptr
1138 )]
1139 fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
1140 let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
1141 .create()
1142 .map_err(ExecutorError::iceoryx2)?;
1143
1144 // Keep Arc<RawListener> alive for at least as long as the WaitSet
1145 // guards — the guard borrows the listener via 'attachment lifetime.
1146 let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
1147 // Guards must outlive the run loop.
1148 let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
1149 // Maps guard index → task index.
1150 let mut attachment_to_task: Vec<usize> = Vec::new();
1151
1152 // Hoist to a local for the hot loop — one Copy-enum compare per cycle,
1153 // never a field re-read (REQ_0268).
1154 let dispatch_mode = self.dispatch_mode;
1155
1156 // Cyclic tasks are dispatched by the master timer + GridTimer (REQ_0268),
1157 // not attached as individual WaitSet triggers. Cross-platform: only the
1158 // wake source differs (Task 3).
1159 let mut cyclic_task_indices: Vec<usize> = Vec::new();
1160 let mut cyclic_periods: Vec<u64> = Vec::new();
1161 build_attachments(
1162 &waitset,
1163 &self.tasks,
1164 dispatch_mode,
1165 &mut listener_storage,
1166 &mut guards,
1167 &mut attachment_to_task,
1168 &mut cyclic_task_indices,
1169 &mut cyclic_periods,
1170 )?;
1171 // `cyclic_periods` is cloned, not moved, because Task 3 reads it again to
1172 // arm the single master timerfd; on non-Linux it is unused after this.
1173 let mut grid =
1174 crate::grid::GridTimer::new(self.cyclic_clock.now_nanos(), cyclic_periods.clone());
1175 let mut due_cyclic: Vec<usize> = Vec::new();
1176
1177 // Master cyclic timer (REQ_0268, Linux). ONE timerfd armed at the base
1178 // period (gcd of cyclic periods) drives the absolute grid; GridTimer
1179 // decides which tasks are due each tick. Declared above its own guard so
1180 // it drops AFTER the guard (detach before close → no EBADF). Must be
1181 // declared here, after `build_attachments` has filled `cyclic_periods`.
1182 #[cfg(target_os = "linux")]
1183 let master_timer: Option<crate::timerfd::TimerFd> = {
1184 let base = crate::grid::base_period(&cyclic_periods);
1185 if base == 0 {
1186 None
1187 } else {
1188 Some(
1189 crate::timerfd::TimerFd::new(std::time::Duration::from_nanos(base)).map_err(
1190 |e| {
1191 ExecutorError::DeclareTriggers(format!(
1192 "failed to arm master timerfd: {e}"
1193 ))
1194 },
1195 )?,
1196 )
1197 }
1198 };
1199
1200 // Attach the master timer as a wake-only notification, held separately
1201 // (like the stop listener) so `process_attachment` never maps it to a
1202 // task. `_master_timer_guard` is declared immediately after `master_timer`
1203 // so on scope exit it drops FIRST — detaching the fd from the WaitSet's
1204 // epoll set — and `master_timer` drops SECOND, closing the fd. That
1205 // ordering is what prevents iceoryx2's `EPOLL_CTL_DEL` from hitting a
1206 // closed fd (EBADF). `master_timer`'s fd is referenced ONLY by this guard
1207 // (independent of `guards`/`listener_storage`, which own other fds), so
1208 // its drop position relative to those Vecs is immaterial.
1209 #[cfg(target_os = "linux")]
1210 #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1211 let _master_timer_guard = match &master_timer {
1212 // SAFETY: `master_timer` is a stack local that outlives this guard
1213 // (declared above it); the cast erases the borrow lifetime to the
1214 // attachment lifetime, sound by the same discipline as the stop
1215 // listener. Dropped before `master_timer` closes the fd.
1216 Some(tf) => Some(
1217 waitset
1218 .attach_notification(unsafe { &*(tf as *const crate::timerfd::TimerFd) })
1219 .map_err(ExecutorError::iceoryx2)?,
1220 ),
1221 None => None,
1222 };
1223
1224 // Attach the internal stop listener so the WaitSet wakes when
1225 // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
1226 // struct which is valid for the lifetime of dispatch_loop. We use the
1227 // same raw-pointer-cast pattern as user listeners above.
1228 //
1229 // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
1230 // exclusively borrowed for the duration of `run_inner` (which calls
1231 // `dispatch_loop`). The listener is not freed while the guard is alive
1232 // because the Arc keeps it alive and `self` outlives this function.
1233 let stop_listener_ref: &IxListener<ipc::Service> =
1234 unsafe { &*(self.stop_listener.as_ref() as *const _) };
1235 let _stop_guard = waitset
1236 .attach_notification(stop_listener_ref)
1237 .map_err(ExecutorError::iceoryx2)?;
1238
1239 let iterations_done = AtomicUsize::new(0);
1240 let stop_flag = self.stoppable.clone();
1241
1242 loop {
1243 // Reset the pre-allocated per-iteration error slot (REQ_0060):
1244 // the slot is owned by `self.iter_err`, allocated once at build
1245 // time. Pool worker closures obtain a refcount-only clone of
1246 // the `Arc`; the slot itself is reused across iterations.
1247 #[allow(clippy::unwrap_used)]
1248 // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1249 let mut iter_err_guard = self.iter_err.lock().unwrap();
1250 *iter_err_guard = None;
1251 drop(iter_err_guard);
1252
1253 // SAFETY: we capture &mut self.tasks via a raw pointer because
1254 // wait_and_process expects FnMut and Rust can't see the closure
1255 // outlives `self`. The discipline that makes this sound:
1256 // 1. The closure body on the executor thread is the *only* code that
1257 // reads `tasks_ptr`. The pool jobs it submits hold borrowed
1258 // `*mut dyn ExecutableItem` slices into individual TaskEntries,
1259 // not into the Vec itself, so they don't race with the Vec.
1260 // 2. `pool.barrier()` at the end of this callback ensures every
1261 // submitted pool job has completed (and dropped its raw pointer)
1262 // before the callback returns. The next iteration of the WaitSet
1263 // loop is therefore the sole user of `tasks_ptr` again.
1264 // 3. The Vec is never resized inside this loop (no `push` / `remove`
1265 // after dispatch starts), so the underlying buffer addresses are
1266 // stable for the lifetime of `dispatch_loop`.
1267 let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
1268 // Take the cycle_stats raw pointer before borrowing `observer`, so
1269 // the &mut borrow is released first — same discipline as tasks_ptr.
1270 let cycle_stats_ptr = &mut self.cycle_stats as *mut Vec<TaskCycleStats>;
1271 let observer = &self.observer;
1272 let pool = &self.pool;
1273 // Refcount-only clone of the pre-allocated error slot. Pool jobs
1274 // need a `'static` handle, and an `Arc::clone` does not allocate.
1275 // The Single/Chain paths use the closure baked into `task.job`,
1276 // which already captured stable Arc clones at `add`-time; the
1277 // Graph path uses closures pre-built by `prepare_dispatch`. Only
1278 // the error-aggregation logic on the WaitSet thread still needs
1279 // the slot here.
1280 let iter_err_inner = Arc::clone(&self.iter_err);
1281 // Raw pointer to the stop listener for draining inside the callback.
1282 // SAFETY: same as stop_listener_ref above — the Arc is alive for
1283 // the lifetime of dispatch_loop.
1284 let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
1285 // Raw pointer to the executor-wide fault state. Same safety
1286 // discipline as `tasks_ptr`: `Executor` is alive for the
1287 // duration of `dispatch_loop`; the WaitSet callback is the
1288 // only reader. REQ_0071. `self.exec_fault` is
1289 // `Arc<ExecutorFaultAtomic>` — we deref once to obtain a
1290 // pointer to the inner `ExecutorFaultAtomic`.
1291 let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
1292 // Raw pointer to the executor start time. Used by the lazy
1293 // cascade below to compute `since_ms` on task transitions
1294 // triggered by an executor-wide fault.
1295 let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;
1296 // Telemetry clock + lateness grid epoch. Same lifetime/aliasing
1297 // discipline as the pointers above: the Executor outlives the
1298 // dispatch loop and the WaitSet callback is the sole reader.
1299 let clock = &self.clock;
1300 let grid_epoch_ptr = &*self.grid_epoch as *const OnceLock<u64>;
1301
1302 // Wrap the per-iteration dispatch body in the framework panic
1303 // boundary. A panic escaping here is *infrastructure* (the WaitSet
1304 // drive, pool submission/barrier, or dispatch wiring) — not a user
1305 // item panic, which is already caught and faulted inside
1306 // `run_item_catch_unwind`. On such a panic `guard_or_fatal` runs the
1307 // user fatal handler then aborts in production. Under a test
1308 // terminal it returns `None`, in which case we must NOT keep
1309 // iterating over possibly-corrupt executor state, so we break out.
1310 let Some(cb_result) =
1311 guard_or_fatal(&self.fatal_dispatch, FatalSite::ExecutorRunLoop, || {
1312 // Bundle the per-iteration captures into a single context the
1313 // WaitSet callback delegates to. Keeping the closure a thin
1314 // adapter over `DispatchPass::process_attachment` keeps the
1315 // dispatch logic in named, individually-measurable functions.
1316 let mut pass = DispatchPass {
1317 guards: &guards,
1318 attachment_to_task: &attachment_to_task,
1319 tasks_ptr,
1320 cycle_stats_ptr,
1321 observer,
1322 exec_fault_ptr,
1323 exec_start_ptr,
1324 clock,
1325 grid_epoch_ptr,
1326 stop_listener_ptr,
1327 pool,
1328 iter_err: &iter_err_inner,
1329 };
1330
1331 // Linux: block on fds — the master timerfd wakes us on the
1332 // absolute grid. Non-Linux dev: bound the wait by the earliest
1333 // pending grid target so the post-wait pass can dispatch.
1334 #[cfg(target_os = "linux")]
1335 let timeout = std::time::Duration::MAX;
1336 #[cfg(not(target_os = "linux"))]
1337 let timeout = match dispatch_mode {
1338 crate::DispatchMode::Grid => {
1339 grid.next_timeout(self.cyclic_clock.now_nanos())
1340 }
1341 crate::DispatchMode::Legacy => std::time::Duration::MAX,
1342 };
1343 waitset.wait_and_process_once_with_timeout(
1344 |attachment_id: WaitSetAttachmentId<ipc::Service>| {
1345 pass.process_attachment(&attachment_id)
1346 },
1347 timeout,
1348 )
1349 })
1350 else {
1351 // Only reachable under a test terminal (production aborts in
1352 // `fire`). Bail out of the run loop rather than continuing over
1353 // possibly-corrupt executor state.
1354 //
1355 // Unreachable in production: the production terminal aborts
1356 // before returning, so this branch exists solely so a
1357 // `#[cfg(test)]` recording terminal can unwind the loop.
1358 // Consequently, silently discarding any pending `iter_err`
1359 // here is immaterial to production behavior.
1360 break Ok(());
1361 };
1362
1363 // Did the master timer tick this wake? Linux: drain it (clears epoll
1364 // readiness; >0 overruns means the absolute grid advanced). Non-Linux:
1365 // the self-computed timeout drove the wake, so always consult the grid
1366 // (take_due self-gates per task on `now >= next`). REQ_0268.
1367 #[cfg(target_os = "linux")]
1368 let ticked = master_timer.as_ref().is_some_and(|tf| tf.drain() > 0);
1369 #[cfg(not(target_os = "linux"))]
1370 let ticked = true;
1371
1372 // Post-wait master-grid pass (Grid mode). `run_grid_cyclic_pass`
1373 // self-gates on `ticked` / stop-wake / mode / non-empty, then
1374 // dispatches EVERY due cyclic task atomically this tick (PLC
1375 // semantics). `cpass` is a side-effect-free bundle of borrows, so
1376 // building it unconditionally is free; the gate lives in the helper
1377 // to keep `dispatch_loop` within the complexity budget. REQ_0268.
1378 let cpass = DispatchPass {
1379 guards: &guards,
1380 attachment_to_task: &attachment_to_task,
1381 tasks_ptr,
1382 cycle_stats_ptr,
1383 observer,
1384 exec_fault_ptr,
1385 exec_start_ptr,
1386 clock,
1387 grid_epoch_ptr,
1388 stop_listener_ptr,
1389 pool,
1390 iter_err: &iter_err_inner,
1391 };
1392 run_grid_cyclic_pass(
1393 cpass,
1394 ticked,
1395 dispatch_mode,
1396 &stop_flag,
1397 cb_result,
1398 &mut grid,
1399 self.cyclic_clock.now_nanos(),
1400 &cyclic_task_indices,
1401 &mut due_cyclic,
1402 );
1403
1404 // Funnel the post-callback decision (interrupt / item error /
1405 // stop request / run-mode termination) through one helper that
1406 // yields a single control value, so the loop has exactly one exit.
1407 match self.after_callback(cb_result, mode, &iterations_done, &stop_flag) {
1408 IterOutcome::Continue => {}
1409 IterOutcome::Done => break Ok(()),
1410 IterOutcome::Failed(err) => break Err(err),
1411 }
1412 }
1413 }
1414
1415 /// Evaluates the post-callback termination conditions for one dispatch
1416 /// iteration and reports whether the loop should continue, stop, or fail.
1417 ///
1418 /// Order of precedence matches the original inline checks: `WaitSet`
1419 /// errors, then SIGINT/SIGTERM, then a captured item error, then a stop
1420 /// request, then the active [`RunMode`] limit.
1421 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1422 fn after_callback(
1423 &self,
1424 cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1425 mode: &mut RunMode<'_>,
1426 iterations_done: &AtomicUsize,
1427 stop_flag: &Stoppable,
1428 ) -> IterOutcome {
1429 let cb_result = match cb_result.map_err(ExecutorError::iceoryx2) {
1430 Ok(r) => r,
1431 Err(e) => return IterOutcome::Failed(e),
1432 };
1433
1434 // iceoryx2's WaitSet catches SIGINT/SIGTERM internally; honor that
1435 // here for a clean exit.
1436 if matches!(
1437 cb_result,
1438 WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
1439 ) {
1440 return IterOutcome::Done;
1441 }
1442
1443 // Extract the error before dropping the MutexGuard — avoids holding the
1444 // lock across the return (clippy::significant_drop_in_scrutinee).
1445 #[allow(clippy::unwrap_used)]
1446 // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1447 let maybe_err = self.iter_err.lock().unwrap().take();
1448 if let Some(err) = maybe_err {
1449 return IterOutcome::Failed(err);
1450 }
1451 if stop_flag.is_stopped() {
1452 return IterOutcome::Done;
1453 }
1454
1455 iterations_done.fetch_add(1, Ordering::SeqCst);
1456 let reached_limit = match mode {
1457 RunMode::Forever => false,
1458 RunMode::Iterations(n) => iterations_done.load(Ordering::SeqCst) >= *n,
1459 RunMode::Until(deadline) => Instant::now() >= *deadline,
1460 RunMode::Predicate(p) => (p)(),
1461 };
1462 if reached_limit {
1463 IterOutcome::Done
1464 } else {
1465 IterOutcome::Continue
1466 }
1467 }
1468}
1469
1470/// Outcome of one `dispatch_loop` iteration's post-callback evaluation.
1471enum IterOutcome {
1472 /// Run another iteration.
1473 Continue,
1474 /// Terminate the loop successfully.
1475 Done,
1476 /// Terminate the loop with the given error.
1477 Failed(ExecutorError),
1478}
1479
1480/// Post-wait absolute-grid pass (Grid mode only, `REQ_0268` / `ADR_0100`).
1481///
1482/// The `WaitSet` callback handles event/fd tasks; cyclic tasks are timed here
1483/// off the scheduling clock. `pass` mirrors the callback's `DispatchPass`
1484/// exactly — same borrows and raw pointers, same single-writer WaitSet-thread
1485/// discipline — and the callback is already dropped (its borrows freed) by the
1486/// time this runs. We poll `grid` for due cyclic slots, dispatch each due task,
1487/// and fold their telemetry through the SHARED [`DispatchPass::barrier_and_record`]
1488/// helper. This is a SEPARATE barrier phase from the callback's: each phase
1489/// barriers and folds only its own `pending_cycle` stashes, so cyclic tasks
1490/// record exactly once, identically to event tasks. We do NOT call
1491/// `record_cycle_for` directly here.
1492///
1493/// Self-gates and returns early (no dispatch, no record) unless this wake should
1494/// run the grid: the master timer ticked (`ticked`), we are in `Grid` mode, it is
1495/// not a stop wake, and there is at least one cyclic task with something due.
1496///
1497/// **Stop-wake suppression (`REQ_0268`)**: a `stop()` (or a SIGINT/SIGTERM
1498/// `cb_result`) must emit no spurious cyclic cycle — Legacy dispatches none on a
1499/// stop wake, so the grid path matches, or a `stop()` would emit one extra cycle
1500/// observation and desync the `FEAT_0038` `cycle_index` join key. Termination
1501/// itself is still decided by `after_callback`; this only suppresses the side
1502/// effects.
1503#[allow(clippy::too_many_arguments)]
1504fn run_grid_cyclic_pass(
1505 mut pass: DispatchPass<'_, '_, '_>,
1506 ticked: bool,
1507 dispatch_mode: crate::DispatchMode,
1508 stop_flag: &Stoppable,
1509 cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
1510 grid: &mut crate::grid::GridTimer,
1511 now_nanos: u64,
1512 cyclic_task_indices: &[usize],
1513 due_cyclic: &mut Vec<usize>,
1514) {
1515 let stopping = stop_flag.is_stopped()
1516 || matches!(
1517 cb_result,
1518 Ok(WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest)
1519 );
1520 if !ticked
1521 || stopping
1522 || dispatch_mode != crate::DispatchMode::Grid
1523 || cyclic_task_indices.is_empty()
1524 {
1525 return;
1526 }
1527 grid.take_due(now_nanos, due_cyclic);
1528 if due_cyclic.is_empty() {
1529 return;
1530 }
1531 for slot in due_cyclic.iter() {
1532 pass.dispatch_task(cyclic_task_indices[*slot]);
1533 }
1534 pass.barrier_and_record();
1535}
1536
1537/// Build every `WaitSet` attachment for the task table (`REQ_0268`). In `Grid`
1538/// mode, `TriggerDecl::Interval` cyclic tasks are only *collected* into
1539/// `cyclic_task_indices` / `cyclic_periods` — they are NOT attached as
1540/// individual `WaitSet` triggers. The master timer + `GridTimer` owns their
1541/// wakeups (cross-platform; wake-source wiring is done in the caller).
1542/// Every other decl (and every decl in `Legacy` mode, including `Interval`
1543/// via `attach_interval`) is attached normally. Extracted from `dispatch_loop`
1544/// to keep that function within the cyclomatic-complexity budget.
1545#[allow(clippy::too_many_arguments)]
1546fn build_attachments<'w>(
1547 waitset: &'w WaitSet<ipc::Service>,
1548 tasks: &[TaskEntry],
1549 dispatch_mode: crate::DispatchMode,
1550 listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1551 guards: &mut Vec<WaitSetGuard<'w, 'w, ipc::Service>>,
1552 attachment_to_task: &mut Vec<usize>,
1553 cyclic_task_indices: &mut Vec<usize>,
1554 cyclic_periods: &mut Vec<u64>,
1555) -> Result<(), ExecutorError> {
1556 for (task_idx, task) in tasks.iter().enumerate() {
1557 for decl in &task.decls {
1558 if dispatch_mode == crate::DispatchMode::Grid {
1559 if let TriggerDecl::Interval(d) = decl {
1560 // Grid mode owns cyclic timing via the master timer + GridTimer;
1561 // these decls are NOT attached as individual WaitSet triggers.
1562 cyclic_task_indices.push(task_idx);
1563 cyclic_periods.push(u64::try_from(d.as_nanos()).unwrap_or(u64::MAX));
1564 continue;
1565 }
1566 }
1567 let guard = attach_trigger_decl(waitset, listener_storage, decl)?;
1568 guards.push(guard);
1569 attachment_to_task.push(task_idx);
1570 }
1571 }
1572 Ok(())
1573}
1574
1575/// Attaches a single [`TriggerDecl`] to `waitset`, returning the resulting
1576/// guard.
1577///
1578/// Listener-backed declarations (`Subscriber`, `Deadline`, `RawListener`)
1579/// clone the listener `Arc` into `listener_storage` to extend its lifetime to
1580/// the surrounding `dispatch_loop` scope; `Interval` attaches a bare timer.
1581///
1582/// # Safety
1583///
1584/// The returned guard borrows the listener via a raw-pointer cast that erases
1585/// its lifetime. Soundness relies on the caller keeping `listener_storage` (and
1586/// `waitset`) alive for at least as long as the guard, and dropping the guards
1587/// before `listener_storage` — exactly the discipline `dispatch_loop` follows.
1588#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1589fn attach_trigger_decl<'w>(
1590 waitset: &'w WaitSet<ipc::Service>,
1591 listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
1592 decl: &TriggerDecl,
1593) -> Result<WaitSetGuard<'w, 'w, ipc::Service>, ExecutorError> {
1594 // Clone the listener Arc and obtain a lifetime-erased reference. SAFETY:
1595 // both `listener_storage` and `waitset` are stack-local in `dispatch_loop`
1596 // and dropped together at its end; guards are dropped before
1597 // `listener_storage`.
1598 let mut listener_ref = |listener: &Arc<crate::trigger::RawListener>| {
1599 listener_storage.push(Arc::clone(listener));
1600 let l_ref = listener_storage.last().unwrap().as_ref();
1601 let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
1602 l_ref
1603 };
1604
1605 let guard = match decl {
1606 TriggerDecl::Subscriber { listener } | TriggerDecl::RawListener(listener) => {
1607 waitset.attach_notification(listener_ref(listener))
1608 }
1609 TriggerDecl::Interval(d) => waitset.attach_interval(*d),
1610 TriggerDecl::Deadline { listener, deadline } => {
1611 waitset.attach_deadline(listener_ref(listener), *deadline)
1612 }
1613 };
1614 guard.map_err(ExecutorError::iceoryx2)
1615}
1616
1617/// Per-iteration dispatch context handed to the `WaitSet` callback.
1618///
1619/// `dispatch_loop` rebuilds one of these every iteration and the `WaitSet`
1620/// callback is a thin adapter over [`DispatchPass::process_attachment`]. All
1621/// fields are short-lived borrows / raw pointers into the `Executor` that owns
1622/// the surrounding `dispatch_loop`; their soundness is documented at each use
1623/// site in `dispatch_loop` (same single-threaded, barrier-bounded discipline).
1624struct DispatchPass<'a, 'g, 'w> {
1625 /// `WaitSet` guards, indexed in parallel with `attachment_to_task`.
1626 guards: &'a [WaitSetGuard<'g, 'w, ipc::Service>],
1627 /// Maps guard index to task index in `tasks_ptr`.
1628 attachment_to_task: &'a [usize],
1629 /// Raw pointer to `Executor::tasks`.
1630 tasks_ptr: *mut Vec<TaskEntry>,
1631 /// Raw pointer to `Executor::cycle_stats` (index-aligned with `tasks`).
1632 cycle_stats_ptr: *mut Vec<TaskCycleStats>,
1633 /// Borrow of the executor's observer for the `on_cycle_stats` push.
1634 observer: &'a Arc<dyn Observer>,
1635 /// Raw pointer to `Executor::exec_fault` inner state.
1636 exec_fault_ptr: *const ExecutorFaultAtomic,
1637 /// Raw pointer to `Executor::start_time`.
1638 exec_start_ptr: *const OnceLock<Instant>,
1639 /// Borrow of the executor's telemetry clock, read for each cycle's `pre`.
1640 clock: &'a Arc<dyn MonotonicClock>,
1641 /// Raw pointer to `Executor::grid_epoch` (lateness grid anchor, `REQ_0106`).
1642 grid_epoch_ptr: *const OnceLock<u64>,
1643 /// Raw pointer to the internal stop listener.
1644 stop_listener_ptr: *const IxListener<ipc::Service>,
1645 /// Borrow of the executor thread pool.
1646 pool: &'a Pool,
1647 /// Refcount-only handle to the per-iteration error slot.
1648 iter_err: &'a Arc<std::sync::Mutex<Option<ExecutorError>>>,
1649}
1650
1651impl DispatchPass<'_, '_, '_> {
1652 /// Dispatches a single task by index for one wakeup: takes the `&mut`
1653 /// borrow into the task table, applies the pre-dispatch fault gate, stashes
1654 /// this cycle's `pending_cycle` timestamp for the post-barrier telemetry
1655 /// fold, and submits the task's work to the pool.
1656 ///
1657 /// Shared by the `WaitSet` callback (`process_attachment`) and — per
1658 /// `REQ_0268` / `ADR_0100` — the forthcoming post-wait absolute-grid
1659 /// dispatch pass, so the per-task barrier/telemetry contract is identical
1660 /// across both call paths.
1661 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1662 #[allow(unsafe_code)]
1663 fn dispatch_task(&mut self, task_idx: usize) {
1664 // SAFETY: we are the only thread that may touch the task table
1665 // during the callback. wait_and_process_once is single-threaded
1666 // and dispatch_loop holds &mut self. The pointer is valid for the
1667 // duration of this call.
1668 let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1669
1670 // Pre-dispatch fault check (REQ_0070, REQ_0071, REQ_0072). When it
1671 // routes to a (possible) handler, normal dispatch is skipped.
1672 if self.handle_fault_routing(task) {
1673 // REQ_0107: a faulted/fault-routed scan STILL advances
1674 // cycle_index and emits on_cycle_stats, or the executor's count
1675 // desyncs from the connector's join key (FEAT_0038). took/jitter
1676 // are None (poison-safe); the index always moves. Allocation-free:
1677 // a CyclePending { Instant, bool } written onto the TaskEntry,
1678 // no heap.
1679 if task.scan_period.is_some() {
1680 task.pending_cycle = Some(CyclePending {
1681 pre: self.clock.now_nanos(),
1682 faulted: true,
1683 });
1684 }
1685 return;
1686 }
1687
1688 // Stash the pre-dispatch instant so the post-barrier record pass
1689 // can fold this cycle's telemetry. Allocation-free: the timestamp
1690 // lives on the TaskEntry, not in a per-wakeup Vec. `take`n in the
1691 // post-barrier loop below — guarantees exactly-once even if two
1692 // guards map to the same task. `faulted: false`: a task that faulted
1693 // last wakeup and recovered this one records the normal path (the
1694 // whole CyclePending is overwritten, so the flag can't be stale).
1695 task.pending_cycle = Some(CyclePending {
1696 pre: self.clock.now_nanos(),
1697 faulted: false,
1698 });
1699
1700 self.submit_task_job(task);
1701 }
1702
1703 /// Handles a single `WaitSet` wakeup: drains stop notifications, then
1704 /// dispatches every task whose attachment fired. Always returns
1705 /// [`CallbackProgression::Continue`]; termination is decided by the
1706 /// `stop_flag` check in `dispatch_loop` after the callback returns.
1707 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1708 #[allow(unsafe_code)]
1709 fn process_attachment(
1710 &mut self,
1711 attachment_id: &WaitSetAttachmentId<ipc::Service>,
1712 ) -> CallbackProgression {
1713 // Drain stop notifications first (no dispatch — the stop_flag check
1714 // after the callback returns handles termination).
1715 // SAFETY: stop_listener_ptr is valid for the duration of the call;
1716 // the Arc in self.stop_listener keeps it alive.
1717 let stop_l = unsafe { &*self.stop_listener_ptr };
1718 while let Ok(Some(_)) = stop_l.try_wait_one() {}
1719
1720 for i in 0..self.guards.len() {
1721 let guard = &self.guards[i];
1722 let fired =
1723 attachment_id.has_event_from(guard) || attachment_id.has_missed_deadline(guard);
1724 if !fired {
1725 continue;
1726 }
1727 let task_idx = self.attachment_to_task[i];
1728 self.dispatch_task(task_idx);
1729 }
1730
1731 self.barrier_and_record();
1732
1733 CallbackProgression::Continue
1734 }
1735
1736 /// Barrier all submitted pool jobs for this dispatch phase, then fold each
1737 /// task's stashed `pending_cycle` into recorded cycle telemetry. Shared by
1738 /// the `WaitSet` callback (event/fd tasks) and the post-wait grid pass
1739 /// (cyclic tasks, `REQ_0268`). Keyed on `pending_cycle` so it records
1740 /// exactly the tasks dispatched this phase, exactly once.
1741 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1742 #[allow(unsafe_code)]
1743 fn barrier_and_record(&mut self) {
1744 // Wait for all submitted jobs to finish before leaving the callback
1745 // scope (validates item_ptr safety contract). The barrier also makes
1746 // every worker's `last_took_ns` Release-store visible to the record
1747 // pass below.
1748 self.pool.barrier();
1749
1750 // Post-barrier telemetry fold. The source of truth for "this task was
1751 // dispatched this wakeup and owes a record" is `pending_cycle`,
1752 // set in the dispatch loop above — not the guard fired-status. Keying
1753 // solely on the stash (rather than re-querying `has_event_from`)
1754 // removes any dependency on the fired-status query being stable across
1755 // a second scan, so a dispatched cycle can never be silently
1756 // under-recorded (which would lag `cycle_index` — the desync FEAT_0038
1757 // must avoid). `take` clears the stash, guaranteeing exactly-once.
1758 // Allocation-free: iterate task indices in place.
1759 // SAFETY: same single-writer WaitSet-thread discipline as the dispatch
1760 // loop above; barrier-bounded, no in-flight pool job aliases `tasks`.
1761 let task_count = unsafe { (*self.tasks_ptr).len() };
1762 for task_idx in 0..task_count {
1763 // SAFETY: single-writer WaitSet thread; borrow released before
1764 // the record_cycle_for call (which re-derefs tasks_ptr).
1765 let pending = unsafe { (&mut *self.tasks_ptr)[task_idx].pending_cycle.take() };
1766 if let Some(CyclePending { pre, faulted }) = pending {
1767 self.record_cycle_for(task_idx, faulted, pre);
1768 }
1769 }
1770 }
1771
1772 /// Fold one scan cycle's telemetry and push it to the observer. Called
1773 /// once per fired CYCLIC attachment per wakeup. `faulted = true` (Task 10)
1774 /// means the scan was skipped/errored: `took`/`jitter`/`lateness` are
1775 /// unmeasured. Event-driven tasks (no `scan_period`) are skipped entirely
1776 /// (`REQ_0106`).
1777 #[allow(unsafe_code)]
1778 fn record_cycle_for(&mut self, task_idx: usize, faulted: bool, pre_ns: u64) {
1779 // SAFETY: single-writer WaitSet thread; same discipline as tasks_ptr.
1780 let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
1781 let Some(period) = task.scan_period else {
1782 return; // event-driven: no cycle telemetry
1783 };
1784 let period_ns = u64::try_from(period.as_nanos()).unwrap_or(u64::MAX);
1785
1786 // Release/Acquire pairing with the worker store (M2): `swap` acquires
1787 // the worker's Release-store and resets the sentinel atomically.
1788 let took_raw = task.last_took_ns.swap(u64::MAX, Ordering::AcqRel);
1789 let took = if faulted || took_raw == u64::MAX {
1790 None
1791 } else {
1792 Some(took_raw)
1793 };
1794
1795 // actual_period + jitter vs the previous dispatch (REQ_0101). Always
1796 // advance `last_dispatch` (even on a faulted attempt) so the next
1797 // cycle's period is measured from this wakeup. `actual_period` is
1798 // `None` on the very first cycle (no previous timestamp); jitter is
1799 // additionally suppressed on a faulted scan (poison-safe: REQ_0107).
1800 let actual_period = task
1801 .last_dispatch
1802 .replace(pre_ns)
1803 .map(|prev| pre_ns.saturating_sub(prev));
1804 let jitter = if faulted {
1805 None
1806 } else {
1807 actual_period.map(|ap| ap.abs_diff(period_ns))
1808 };
1809
1810 // Advance the lateness grid slot (REQ_0106). The slot counts nominal
1811 // periods elapsed and is decoupled from `cycle_index`: a steady
1812 // sub-period slip rounds to exactly one slot per cycle, so drift
1813 // accumulates; a coalesced/missed wakeup (the WaitSet was starved past
1814 // one or more whole periods) advances several slots at once,
1815 // re-anchoring the grid so a transient hiccup does not permanently bias
1816 // every later cycle's lateness. First cycle (`actual_period == None`):
1817 // the slot stays at its initial 0.
1818 if let Some(ap) = actual_period {
1819 // round(ap / period) = (ap + period/2) / period, via checked_div so
1820 // a degenerate period_ns == 0 simply contributes no slot advance.
1821 if let Some(slots) = ap.saturating_add(period_ns / 2).checked_div(period_ns) {
1822 task.grid_slot = task.grid_slot.saturating_add(slots.max(1));
1823 }
1824 }
1825 let grid_slot = task.grid_slot;
1826
1827 // SAFETY: cycle_stats is index-aligned with tasks; single-writer.
1828 let stats = unsafe { &mut (&mut *self.cycle_stats_ptr)[task_idx] };
1829
1830 // Deadline lateness (REQ_0106): signed offset of the actual start
1831 // (`pre_ns`) from its nominal grid point `grid_epoch + grid_slot*period`,
1832 // where `grid_epoch` is this task set's first recorded `pre`. Positive
1833 // => started late; negative => early. Captures steady drift (jitter is
1834 // blind to a constant offset; lateness is not) while self-healing across
1835 // discrete missed wakeups via the grid-slot re-anchoring above.
1836 let lateness = if period_ns > 0 && !faulted {
1837 // SAFETY: grid_epoch_ptr derefs the Executor owning this dispatch_loop.
1838 let grid_epoch = *unsafe { &*self.grid_epoch_ptr }.get_or_init(|| pre_ns);
1839 let elapsed_ns = i64::try_from(pre_ns.saturating_sub(grid_epoch)).unwrap_or(i64::MAX);
1840 let expected_ns =
1841 i64::try_from(u128::from(grid_slot) * u128::from(period_ns)).unwrap_or(i64::MAX);
1842 Some(elapsed_ns.saturating_sub(expected_ns))
1843 } else {
1844 None
1845 };
1846
1847 let cycle_index = stats.record_cycle(took, jitter, lateness);
1848
1849 let obs = CycleObservation {
1850 cycle_index,
1851 task_id: task.id.clone(),
1852 task_index: u32::try_from(task_idx).unwrap_or(u32::MAX),
1853 faulted,
1854 period_ns,
1855 pre_ns,
1856 actual_period_ns: actual_period,
1857 jitter_ns: jitter,
1858 lateness_ns: lateness,
1859 took_ns: took,
1860 };
1861 self.observer.on_cycle_stats(&obs);
1862 }
1863
1864 /// Applies the pre-dispatch fault gate for `Single`/`Chain` tasks.
1865 ///
1866 /// Returns `true` when the task is routed to its fault handler (or
1867 /// silently skipped because no handler is registered) and normal dispatch
1868 /// must therefore be skipped. Returns `false` when normal dispatch should
1869 /// proceed. `Graph` tasks always return `false` — they use their own
1870 /// per-vertex scheduling and are out of scope for `FEAT_0018`.
1871 #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1872 fn handle_fault_routing(&self, task: &mut TaskEntry) -> bool {
1873 if !matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
1874 return false;
1875 }
1876
1877 // SAFETY: exec_fault_ptr derefs into the Executor that owns the
1878 // surrounding dispatch_loop — alive for this call's lifetime.
1879 let exec_faulted = matches!(
1880 unsafe { &*self.exec_fault_ptr }.load(0, 0),
1881 ExecutorFaultState::Faulted { .. }
1882 );
1883 let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
1884 let task_state = task.fault.load(task_budget_ms);
1885
1886 // Lazy cascade: if executor is `Faulted` and task is still `Running`,
1887 // silently transition the task to `Faulted{ExecutorFaulted}`. No
1888 // `on_task_fault` — the Observer already heard about the executor-wide
1889 // fault via `on_executor_fault` (cascade-noise invariant, FEAT_0018
1890 // §4.6).
1891 let task_faulted = if exec_faulted && matches!(task_state, FaultState::Running) {
1892 // SAFETY: exec_start_ptr derefs into the same Executor owning the
1893 // dispatch_loop. The OnceLock is wait-free.
1894 let exec_start = *unsafe { &*self.exec_start_ptr }.get_or_init(std::time::Instant::now);
1895 let since_ms = instant_to_since_ms(std::time::Instant::now(), exec_start);
1896 let _ = task.fault.swap(
1897 FaultState::Faulted {
1898 reason: FaultReason::ExecutorFaulted,
1899 since_ms,
1900 },
1901 task_budget_ms,
1902 );
1903 true
1904 } else {
1905 matches!(task_state, FaultState::Faulted { .. })
1906 };
1907
1908 if !(exec_faulted || task_faulted) {
1909 return false;
1910 }
1911
1912 // If a handler is registered, dispatch it. Otherwise, skip dispatch
1913 // entirely this wakeup.
1914 if let Some(handler_box) = task.handler_job.as_deref_mut() {
1915 let job_ptr: *mut (dyn FnMut() + Send) = handler_box as *mut (dyn FnMut() + Send);
1916 // SAFETY: same as the main-job dispatch below — handler_job is
1917 // owned by the TaskEntry; pool.barrier() awaits its completion
1918 // before the next callback.
1919 unsafe {
1920 self.pool
1921 .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
1922 }
1923 }
1924 true
1925 }
1926
1927 /// Dispatches `task`'s normal (non-fault) work for one wakeup.
1928 ///
1929 /// `Single`/`Chain` tasks submit their pre-built job to the pool;
1930 /// `Graph` tasks drive one pass and capture the first item error into the
1931 /// per-iteration error slot.
1932 #[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1933 #[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
1934 fn submit_task_job(&self, task: &mut TaskEntry) {
1935 match &mut task.kind {
1936 TaskKind::Single(_) | TaskKind::Chain(_) => {
1937 // The dispatch closure was pre-allocated at task-add time and
1938 // stashed on `task.job`. Submit it via `submit_borrowed` — no
1939 // per-iteration Box allocation. Required by REQ_0060.
1940 #[allow(clippy::expect_used)]
1941 // fail-fast: Single/Chain task.job is always Some — set at add time in build_single_job/build_chain_job and never cleared
1942 let job_box = task
1943 .job
1944 .as_deref_mut()
1945 .expect("Single/Chain tasks carry a pre-built job");
1946 let job_ptr: *mut (dyn FnMut() + Send) = job_box as *mut (dyn FnMut() + Send);
1947 // SAFETY: the closure lives in `task.job`, owned by
1948 // `self.tasks[task_idx]`; `tasks_ptr` is sound for the
1949 // duration of this callback. `pool.barrier()` in
1950 // `process_attachment` finishes the closure invocation before
1951 // the next iteration's callback. The WaitSet thread does not
1952 // touch the closure between this submit and that barrier.
1953 unsafe {
1954 self.pool
1955 .submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
1956 }
1957 }
1958 TaskKind::Graph(graph) => {
1959 // Outer driver runs on the WaitSet thread; vertices run on the
1960 // pool. The graph holds its own pre-built per-vertex closures
1961 // and SPSC ready ring (REQ_0060), so dispatch is
1962 // allocation-free in steady state.
1963 let outcome = graph.run_once_borrowed(self.pool);
1964 if let Some(source) = outcome.error {
1965 #[allow(clippy::unwrap_used)]
1966 // fail-fast: poison unreachable — the lock is held only over an infallible Option insert/take, and any holder panic aborts the process before another thread observes it (ADR_0065)
1967 let mut g = self.iter_err.lock().unwrap();
1968 if g.is_none() {
1969 *g = Some(ExecutorError::Item {
1970 task_id: task.id.clone(),
1971 source,
1972 });
1973 }
1974 }
1975 let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
1976 }
1977 }
1978 }
1979}
1980
1981/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
1982/// `Pool::submit`. The send is safe because:
1983/// 1. The executor guarantees at most one invocation of a given item at a
1984/// time (via `pool.barrier()` before the pointer is reused).
1985/// 2. `ExecutableItem: Send`, so moving the pointee across threads is sound
1986/// when no aliasing exists.
1987#[allow(unsafe_code)]
1988struct SendItemPtr {
1989 ptr: *mut dyn ExecutableItem,
1990}
1991
1992impl SendItemPtr {
1993 fn new(ptr: *mut dyn ExecutableItem) -> Self {
1994 Self { ptr }
1995 }
1996
1997 /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
1998 /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
1999 /// dispatch closure to be reusable across iterations without allocation).
2000 fn get(&self) -> *mut dyn ExecutableItem {
2001 self.ptr
2002 }
2003}
2004
2005// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
2006// closure can borrow `&SendItemPtr` per invocation without making the
2007// closure itself `!Send`.
2008#[allow(unsafe_code)]
2009unsafe impl Send for SendItemPtr {}
2010#[allow(unsafe_code)]
2011unsafe impl Sync for SendItemPtr {}
2012
2013/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
2014/// closure can iterate the chain's items in place without first
2015/// collecting them into a freshly-allocated `Vec`. The send is safe
2016/// for the same reason as [`SendItemPtr`] (see above): the executor
2017/// holds `&mut self` for the duration of `dispatch_loop`, and the
2018/// `pool.barrier()` at the end of each callback ensures the closure
2019/// has finished using this pointer before the Vec could be touched
2020/// from the `WaitSet` thread again. The Vec is never resized after
2021/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
2022/// allocate per iteration.
2023#[allow(unsafe_code)]
2024struct SendChainPtr {
2025 ptr: *mut Vec<Box<dyn ExecutableItem>>,
2026}
2027
2028impl SendChainPtr {
2029 fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
2030 Self { ptr }
2031 }
2032
2033 fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
2034 self.ptr
2035 }
2036}
2037
2038// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
2039// borrow `&SendChainPtr` per invocation while staying `Send`.
2040#[allow(unsafe_code)]
2041unsafe impl Send for SendChainPtr {}
2042#[allow(unsafe_code)]
2043unsafe impl Sync for SendChainPtr {}
2044
2045/// Captured state needed by a dispatch closure to perform post-execute
2046/// fault detection. All fields are `Arc`-shared with the owning
2047/// `Executor` and `TaskEntry` so the closure can read/write them
2048/// wait-free from any pool worker thread. `REQ_0070`, `REQ_0071`,
2049/// `REQ_0102`.
2050struct FaultDispatchCtx {
2051 /// Per-task budget. `None` for chain / graph tasks (no per-task
2052 /// check) — the executor-wide iteration budget still applies.
2053 task_budget: Option<Duration>,
2054 /// Per-task fault state (shared with `TaskEntry::fault`).
2055 task_fault: Arc<FaultAtomic>,
2056 /// Per-task monotonic overrun counter (shared with
2057 /// `TaskEntry::overrun_count`). Increments on EVERY budget breach.
2058 overrun_count: Arc<AtomicU64>,
2059 /// Executor-wide iteration budget. `None` means no executor-wide
2060 /// check.
2061 iteration_budget: Option<Duration>,
2062 /// Executor-wide fault state (shared with `Executor::exec_fault`).
2063 exec_fault: Arc<ExecutorFaultAtomic>,
2064 /// Executor-wide offending-task index storage (shared with
2065 /// `Executor::exec_fault_task_idx`).
2066 exec_fault_task_idx: Arc<AtomicU32>,
2067 /// Executor-wide breached-budget storage (shared with
2068 /// `Executor::exec_fault_budget_ms`).
2069 exec_fault_budget_ms: Arc<AtomicU32>,
2070 /// Index of this task in the executor's task table.
2071 task_idx_u32: u32,
2072 /// Executor start time (shared with `Executor::start_time`).
2073 exec_start: Arc<OnceLock<Instant>>,
2074 /// Observer for `on_task_fault` / `on_executor_fault` notifications.
2075 observer: Arc<dyn Observer>,
2076}
2077
2078/// Validate a task's collected trigger declarations before it joins the task
2079/// table (`REQ_0268`). Applied at every add path — single, chain head, and
2080/// fault-handler main — at the point the `TriggerDecl`s are first available,
2081/// regardless of [`DispatchMode`] (the rejected shapes are ill-defined in any
2082/// mode; Legacy is temporary).
2083///
2084/// Rejects two shapes:
2085///
2086/// 1. **Cyclic AND event-driven** — a task carrying both an `Interval` decl and
2087/// any listener-backed decl (`Subscriber` / `Deadline` / `RawListener`). Per
2088/// `REQ_0106` a task is cyclic XOR event-driven: cyclic tasks have a
2089/// period/lateness, event-driven tasks do not. Allowing both would dispatch
2090/// and record the task twice in one wake (phase-a event + phase-b grid),
2091/// desyncing the `FEAT_0038` `cycle_index` join key (`REQ_0107`).
2092/// 2. **Zero-period interval** — an `Interval(Duration::ZERO)` busy-spins the
2093/// grid (`GridTimer::next_timeout` returns `0` every wake and `take_due`
2094/// re-fires without advancing). A zero scan period is nonsensical.
2095fn validate_decls(id: &TaskId, decls: &[crate::trigger::TriggerDecl]) -> Result<(), ExecutorError> {
2096 use crate::trigger::TriggerDecl;
2097
2098 let has_interval = decls.iter().any(|d| matches!(d, TriggerDecl::Interval(_)));
2099 let has_listener = decls.iter().any(|d| {
2100 matches!(
2101 d,
2102 TriggerDecl::Subscriber { .. }
2103 | TriggerDecl::Deadline { .. }
2104 | TriggerDecl::RawListener(_)
2105 )
2106 });
2107
2108 if has_interval && has_listener {
2109 return Err(ExecutorError::DeclareTriggers(format!(
2110 "task `{id}` declares both an interval (cyclic) and a listener \
2111 (event-driven) trigger; a task may be cyclic (interval) or \
2112 event-driven (listener) but not both — split it into two tasks"
2113 )));
2114 }
2115
2116 if decls
2117 .iter()
2118 .any(|d| matches!(d, TriggerDecl::Interval(dur) if dur.is_zero()))
2119 {
2120 return Err(ExecutorError::DeclareTriggers(format!(
2121 "task `{id}` declares a zero-duration interval; a cyclic scan \
2122 period must be strictly positive"
2123 )));
2124 }
2125
2126 Ok(())
2127}
2128
2129/// Extract the declared scan period (first `Interval` trigger) from a task's
2130/// trigger declarations, or `None` for event-driven tasks.
2131fn scan_period_from_decls(decls: &[crate::trigger::TriggerDecl]) -> Option<Duration> {
2132 decls.iter().find_map(|d| match d {
2133 crate::trigger::TriggerDecl::Interval(dur) => Some(*dur),
2134 _ => None,
2135 })
2136}
2137
2138/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
2139///
2140/// The returned closure is stored on `TaskEntry::job` and invoked once
2141/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
2142/// performs no allocation. The closure captures Arc clones of the
2143/// executor's shared state — those clones are refcount-only at build
2144/// time and are reused on every dispatch. Required for `REQ_0060`.
2145#[allow(clippy::too_many_arguments)]
2146fn build_single_job(
2147 id: TaskId,
2148 stop: Stoppable,
2149 obs: Arc<dyn Observer>,
2150 mon: Arc<dyn ExecutionMonitor>,
2151 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2152 app_id: Option<u32>,
2153 app_inst: Option<u32>,
2154 item_ptr: SendItemPtr,
2155 fault_ctx: FaultDispatchCtx,
2156 last_took_ns: Arc<AtomicU64>,
2157 clock: Arc<dyn MonotonicClock>,
2158) -> Box<dyn FnMut() + Send + 'static> {
2159 Box::new(move || {
2160 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2161 if let Some(aid) = app_id {
2162 obs.on_app_start(id.clone(), aid, app_inst);
2163 }
2164 let raw = item_ptr.get();
2165 let started = std::time::Instant::now();
2166 // Telemetry `took` is measured on the injected clock (REQ_0105) so a
2167 // MockClock can make it exact; the real `started`/`took` below stay on
2168 // the system clock for the monitor and fault-budget paths.
2169 let tele_t0 = clock.now_nanos();
2170 mon.pre_execute(id.clone(), started);
2171 // SAFETY: barrier() pairs with this invocation; the WaitSet
2172 // thread does not touch the item between `submit_borrowed` and
2173 // the matching `barrier()`. See SendItemPtr safety doc.
2174 #[allow(unsafe_code)]
2175 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2176 let took = started.elapsed();
2177 // Release pairs with the WaitSet-thread Acquire (swap) in
2178 // `record_cycle_for` (M2). `pool.barrier()` also fences, but the
2179 // explicit pairing documents intent and is robust on weak-memory archs.
2180 last_took_ns.store(clock.now_nanos().saturating_sub(tele_t0), Ordering::Release);
2181 mon.post_execute(id.clone(), started, took, res.is_ok());
2182 if let Err(ref e) = res {
2183 obs.on_app_error(id.clone(), e.as_ref());
2184 }
2185 if app_id.is_some() {
2186 obs.on_app_stop(id.clone());
2187 }
2188 post_execute_detect_fault(&id, started, took, &fault_ctx);
2189 record_first_err(&err_slot, &id, res);
2190 })
2191}
2192
2193/// Build the per-iteration dispatch closure for a fault-handler item.
2194///
2195/// Mirrors [`build_single_job`] in every detail (same monitor /
2196/// observer / first-error capture wiring) but owns the
2197/// `Box<dyn ExecutableItem>` directly inside the closure instead of
2198/// dereferencing a raw [`SendItemPtr`]. The handler has no parallel
2199/// owner inside [`TaskEntry`] — the handler closure stored in
2200/// `handler_job` is the sole owner — so the simpler owning form is
2201/// both sound and avoids the aliasing dance the main item needs.
2202/// (Unlike [`build_single_job`], this closure does NOT update
2203/// `last_took_ns` — the handler runs in place of the main item, so the
2204/// main item's `last_took_ns` keeps its sentinel `u64::MAX` = "no
2205/// sample this cycle".)
2206/// `REQ_0072`.
2207#[allow(clippy::too_many_arguments)]
2208fn build_handler_job(
2209 id: TaskId,
2210 stop: Stoppable,
2211 obs: Arc<dyn Observer>,
2212 mon: Arc<dyn ExecutionMonitor>,
2213 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2214 app_id: Option<u32>,
2215 app_inst: Option<u32>,
2216 mut handler: Box<dyn ExecutableItem>,
2217 fault_ctx: FaultDispatchCtx,
2218) -> Box<dyn FnMut() + Send + 'static> {
2219 Box::new(move || {
2220 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2221 if let Some(aid) = app_id {
2222 obs.on_app_start(id.clone(), aid, app_inst);
2223 }
2224 let started = std::time::Instant::now();
2225 mon.pre_execute(id.clone(), started);
2226 let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
2227 let took = started.elapsed();
2228 mon.post_execute(id.clone(), started, took, res.is_ok());
2229 if let Err(ref e) = res {
2230 obs.on_app_error(id.clone(), e.as_ref());
2231 }
2232 if app_id.is_some() {
2233 obs.on_app_stop(id.clone());
2234 }
2235 // Per §4.6 invariant 5 of FEAT_0018: a handler that ALSO breaches
2236 // budget keeps the task in `Faulted` (state already `Faulted`),
2237 // `overrun_count` increments, NO new `on_task_fault` fires —
2238 // the `matches!(prev, FaultState::Running)` gate inside
2239 // `post_execute_detect_fault` enforces that.
2240 post_execute_detect_fault(&id, started, took, &fault_ctx);
2241 record_first_err(&err_slot, &id, res);
2242 })
2243}
2244
2245/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
2246#[allow(clippy::too_many_arguments)]
2247fn build_chain_job(
2248 id: TaskId,
2249 stop: Stoppable,
2250 obs: Arc<dyn Observer>,
2251 mon: Arc<dyn ExecutionMonitor>,
2252 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
2253 chain_ptr: SendChainPtr,
2254 fault_ctx: FaultDispatchCtx,
2255 last_took_ns: Arc<AtomicU64>,
2256 clock: Arc<dyn MonotonicClock>,
2257) -> Box<dyn FnMut() + Send + 'static> {
2258 Box::new(move || {
2259 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
2260 // Overall chain scan timer — the chain's `took` is the elapsed
2261 // telemetry-clock time from the first item's pre-execute to the last
2262 // item's completion (or early break), mirroring the single-item `took`
2263 // notion (REQ_0105). Per-item monitor timing uses each item's own
2264 // real-clock `started` below.
2265 let chain_tele_t0 = clock.now_nanos();
2266 // SAFETY: barrier() pairs with this invocation; the chain Vec
2267 // and the items it owns are not touched by the WaitSet thread
2268 // until barrier() returns. See SendChainPtr safety doc.
2269 #[allow(unsafe_code)]
2270 let chain_items = unsafe { &mut *chain_ptr.get() };
2271 for item_box in chain_items.iter_mut() {
2272 let app_id = item_box.app_id();
2273 let app_inst = item_box.app_instance_id();
2274 if let Some(aid) = app_id {
2275 obs.on_app_start(id.clone(), aid, app_inst);
2276 }
2277 let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
2278 let started = std::time::Instant::now();
2279 mon.pre_execute(id.clone(), started);
2280 #[allow(unsafe_code)]
2281 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
2282 let took = started.elapsed();
2283 mon.post_execute(id.clone(), started, took, res.is_ok());
2284 if let Err(ref e) = res {
2285 obs.on_app_error(id.clone(), e.as_ref());
2286 }
2287 if app_id.is_some() {
2288 obs.on_app_stop(id.clone());
2289 }
2290 // Per-item post-execute fault detection. `task_budget` is
2291 // `None` for chains (see `add_chain_with_id_boxed`), so the
2292 // per-task check no-ops; the executor-wide iteration-budget
2293 // check still fires per item. `REQ_0071`.
2294 post_execute_detect_fault(&id, started, took, &fault_ctx);
2295 match res {
2296 Ok(crate::ControlFlow::Continue) => {}
2297 Ok(crate::ControlFlow::StopChain) => break,
2298 Err(_) => {
2299 record_first_err(&err_slot, &id, res);
2300 break;
2301 }
2302 }
2303 }
2304 // Release pairs with the WaitSet-thread Acquire (swap) in
2305 // `record_cycle_for` (M2). See the Single-job store for the rationale.
2306 last_took_ns.store(
2307 clock.now_nanos().saturating_sub(chain_tele_t0),
2308 Ordering::Release,
2309 );
2310 })
2311}
2312
2313#[derive(Debug)]
2314struct PanickedTask(String);
2315
2316impl core::fmt::Display for PanickedTask {
2317 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
2318 write!(f, "task panicked: {}", self.0)
2319 }
2320}
2321
2322impl std::error::Error for PanickedTask {}
2323
2324/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
2325fn run_item_catch_unwind(
2326 item: &mut dyn ExecutableItem,
2327 ctx: &mut crate::context::Context<'_>,
2328) -> crate::ExecuteResult {
2329 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
2330 |payload| {
2331 let msg =
2332 panic_payload_message(&*payload).unwrap_or_else(|| "panicked task".to_string());
2333 Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
2334 },
2335 )
2336}
2337
2338/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
2339/// without depending on its private name.
2340pub(crate) fn run_item_catch_unwind_external(
2341 item: &mut dyn ExecutableItem,
2342 ctx: &mut crate::context::Context<'_>,
2343) -> crate::ExecuteResult {
2344 run_item_catch_unwind(item, ctx)
2345}
2346
2347/// Record the first error into `slot`. Subsequent errors are silently dropped.
2348fn record_first_err(
2349 slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
2350 id: &TaskId,
2351 res: crate::ExecuteResult,
2352) {
2353 if let Err(source) = res {
2354 let mut g = slot.lock().unwrap();
2355 if g.is_none() {
2356 *g = Some(ExecutorError::Item {
2357 task_id: id.clone(),
2358 source,
2359 });
2360 }
2361 }
2362}
2363
2364/// Post-execute fault detection — runs on a pool worker AFTER
2365/// `mon.post_execute` so the full `took` is available. Implements:
2366///
2367/// * `REQ_0070` / `REQ_0102` — per-task budget overrun: increments
2368/// `overrun_count` on every breach, transitions
2369/// `Running -> Faulted{BudgetExceeded}` exactly once (subsequent
2370/// breaches keep the state `Faulted` and do NOT re-fire the
2371/// observer).
2372/// * `REQ_0071` — executor-wide iteration overrun: transitions
2373/// `Running -> Faulted{IterationBudgetExceeded}` exactly once;
2374/// cascade to per-task state is LAZY (see the pre-dispatch block
2375/// in `dispatch_loop`), so the per-task `on_task_fault` does NOT
2376/// fire during cascade — only `on_executor_fault` does.
2377fn post_execute_detect_fault(
2378 id: &TaskId,
2379 started: Instant,
2380 took: Duration,
2381 fault_ctx: &FaultDispatchCtx,
2382) {
2383 // REQ_0070 / REQ_0102 — per-task budget overrun.
2384 if let Some(budget) = fault_ctx.task_budget {
2385 if took > budget {
2386 fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
2387 let took_ms = duration_to_ms_sat(took);
2388 let budget_ms = duration_to_ms_sat(budget);
2389 let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2390 let since_ms = instant_to_since_ms(started, exec_start);
2391 let new_state = FaultState::Faulted {
2392 reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
2393 since_ms,
2394 };
2395 let prev = fault_ctx.task_fault.swap(new_state, budget_ms);
2396 if matches!(prev, FaultState::Running) {
2397 fault_ctx.observer.on_task_fault(
2398 id.clone(),
2399 FaultReason::BudgetExceeded { took_ms, budget_ms },
2400 );
2401 }
2402 }
2403 }
2404
2405 // REQ_0071 — executor-wide iteration overrun.
2406 if let Some(iter_budget) = fault_ctx.iteration_budget {
2407 if took > iter_budget {
2408 let took_ms = duration_to_ms_sat(took);
2409 let budget_ms = duration_to_ms_sat(iter_budget);
2410 let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
2411 let since_ms = instant_to_since_ms(started, exec_start);
2412 fault_ctx
2413 .exec_fault_task_idx
2414 .store(fault_ctx.task_idx_u32, Ordering::Release);
2415 fault_ctx
2416 .exec_fault_budget_ms
2417 .store(budget_ms, Ordering::Release);
2418 let new_state = ExecutorFaultState::Faulted {
2419 reason: ExecutorFaultReason::IterationBudgetExceeded {
2420 task_idx: fault_ctx.task_idx_u32,
2421 took_ms,
2422 budget_ms,
2423 },
2424 since_ms,
2425 };
2426 let prev = fault_ctx
2427 .exec_fault
2428 .swap(new_state, fault_ctx.task_idx_u32, budget_ms);
2429 if matches!(prev, ExecutorFaultState::Running) {
2430 fault_ctx.observer.on_executor_fault(
2431 ExecutorFaultReason::IterationBudgetExceeded {
2432 task_idx: fault_ctx.task_idx_u32,
2433 took_ms,
2434 budget_ms,
2435 },
2436 );
2437 // NO eager cascade here. Cascade is lazy: the
2438 // pre-dispatch block in `dispatch_loop` transitions
2439 // each `Running` task to `Faulted{ExecutorFaulted}` on
2440 // the next wakeup — silently, so per-task observers
2441 // do not fire (see §4.6 invariant on cascade-noise).
2442 }
2443 }
2444 }
2445}
2446
2447// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
2448
2449/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
2450/// into a registered task.
2451pub struct ExecutorGraphBuilder<'e> {
2452 executor: &'e mut Executor,
2453 builder: crate::graph::GraphBuilder,
2454 custom_id: Option<TaskId>,
2455}
2456
2457impl ExecutorGraphBuilder<'_> {
2458 /// Add a vertex to the graph; returns its handle.
2459 pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
2460 self.builder.vertex(item)
2461 }
2462
2463 /// Add a directed edge from one vertex to another.
2464 pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
2465 self.builder.edge(from, to);
2466 self
2467 }
2468
2469 /// Designate the root vertex (its triggers gate the graph).
2470 pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
2471 self.builder.root(v);
2472 self
2473 }
2474
2475 /// Override the auto-generated id with a custom one.
2476 pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
2477 self.custom_id = Some(id.into());
2478 self
2479 }
2480
2481 /// Validate and register the graph. Returns the task id.
2482 ///
2483 /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
2484 /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
2485 /// precedence over the auto-generated id.
2486 pub fn build(self) -> Result<TaskId, ExecutorError> {
2487 let g = self.builder.finish()?;
2488 // Root vertex's task_id() override wins over the custom id, which wins
2489 // over the auto-generated fallback.
2490 let auto_id = || {
2491 TaskId::new(format!(
2492 "graph-{}",
2493 self.executor.next_id.fetch_add(1, Ordering::SeqCst)
2494 ))
2495 };
2496 let id = g
2497 .root_task_id()
2498 .map(TaskId::new)
2499 .or(self.custom_id)
2500 .unwrap_or_else(auto_id);
2501 let decls = g.decls.clone();
2502 // The graph root's decls become a grid-registered TaskEntry, so the same
2503 // cyclic-XOR-event-driven / non-zero-period validation that guards the
2504 // single-item, fault-handler, and chain add paths must guard this one too
2505 // (REQ_0268). Non-root vertex triggers never reach a TaskEntry — they are
2506 // discarded in `GraphBuilder::collect_root_decls` — so validating the root
2507 // decls is sufficient.
2508 validate_decls(&id, &decls)?;
2509 let scan_period = scan_period_from_decls(&decls);
2510
2511 // Box the graph for address stability — per-vertex dispatch
2512 // closures capture `*const Graph` and must not see it move.
2513 let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
2514 // Pre-build the per-vertex closures now that we know the
2515 // task_id and have access to the executor's shared state.
2516 graph_box.prepare_dispatch(
2517 id.clone(),
2518 self.executor.stoppable.clone(),
2519 Arc::clone(&self.executor.observer),
2520 Arc::clone(&self.executor.monitor),
2521 Arc::clone(&self.executor.iter_err),
2522 );
2523
2524 self.executor.tasks.push(TaskEntry {
2525 id: id.clone(),
2526 kind: TaskKind::Graph(graph_box),
2527 decls,
2528 // Graph tasks dispatch their vertices via `vertex_jobs`
2529 // stored inside the `Graph`; the per-task `job` slot
2530 // is unused for graphs.
2531 job: None,
2532 // TODO(post-Task-10): graph budgets carried separately; for now None.
2533 budget: None,
2534 fault: Arc::new(FaultAtomic::new()),
2535 overrun_count: Arc::new(AtomicU64::new(0)),
2536 handler_job: None,
2537 scan_period,
2538 // Graphs dispatch vertices via their own path and do not ferry a
2539 // per-task `took`; sentinel = "no sample". Wired for struct
2540 // completeness; nothing reads it yet (Task 6).
2541 last_took_ns: Arc::new(AtomicU64::new(u64::MAX)),
2542 last_dispatch: None,
2543 grid_slot: 0,
2544 pending_cycle: None,
2545 });
2546 self.executor
2547 .cycle_stats
2548 .push(TaskCycleStats::new(self.executor.stats_window));
2549 Ok(id)
2550 }
2551}
2552
2553// ── Unit tests ────────────────────────────────────────────────────────────────
2554
2555#[cfg(test)]
2556mod tests {
2557 use super::*;
2558 use crate::{ControlFlow, item};
2559 use iceoryx2::prelude::ZeroCopySend;
2560
2561 /// Minimal zero-copy payload for tests that need a real subscriber to
2562 /// produce a listener-backed trigger decl.
2563 #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
2564 #[repr(C)]
2565 struct Msg(u32);
2566
2567 #[test]
2568 fn add_returns_unique_ids() {
2569 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2570 let a = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2571 let b = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2572 assert_ne!(a, b);
2573 }
2574
2575 #[test]
2576 fn grid_mode_dispatches_cyclic_task_each_cycle() {
2577 use std::sync::Arc;
2578 use std::sync::atomic::{AtomicU64, Ordering};
2579 let hits = Arc::new(AtomicU64::new(0));
2580 let h = Arc::clone(&hits);
2581 let mut exec = Executor::builder()
2582 .worker_threads(0)
2583 .dispatch_mode(crate::DispatchMode::Grid)
2584 .build()
2585 .expect("build");
2586 exec.add(crate::item::item_with_triggers(
2587 move |d| {
2588 d.interval(std::time::Duration::from_millis(1));
2589 Ok(())
2590 },
2591 move |_ctx| {
2592 h.fetch_add(1, Ordering::Relaxed);
2593 Ok(ControlFlow::Continue)
2594 },
2595 ))
2596 .expect("add");
2597 exec.run_n(10).expect("run");
2598 assert!(
2599 hits.load(Ordering::Relaxed) >= 8,
2600 "grid mode under-dispatched: {}",
2601 hits.load(Ordering::Relaxed)
2602 );
2603 }
2604
2605 #[test]
2606 fn legacy_mode_dispatches_cyclic_task_each_cycle() {
2607 use std::sync::Arc;
2608 use std::sync::atomic::{AtomicU64, Ordering};
2609 let hits = Arc::new(AtomicU64::new(0));
2610 let h = Arc::clone(&hits);
2611 let mut exec = Executor::builder()
2612 .worker_threads(0)
2613 .dispatch_mode(crate::DispatchMode::Legacy)
2614 .build()
2615 .expect("build");
2616 exec.add(crate::item::item_with_triggers(
2617 move |d| {
2618 d.interval(std::time::Duration::from_millis(1));
2619 Ok(())
2620 },
2621 move |_ctx| {
2622 h.fetch_add(1, Ordering::Relaxed);
2623 Ok(ControlFlow::Continue)
2624 },
2625 ))
2626 .expect("add");
2627 exec.run_n(10).expect("run");
2628 assert!(
2629 hits.load(Ordering::Relaxed) >= 8,
2630 "legacy mode under-dispatched: {}",
2631 hits.load(Ordering::Relaxed)
2632 );
2633 }
2634
2635 // --- REQ_0268 trigger-combination validation (Fix 1 / Fix 3) ---
2636
2637 #[test]
2638 fn add_rejects_cyclic_plus_subscriber_combination() {
2639 use core::time::Duration;
2640 // A task declaring BOTH an Interval and a listener-backed trigger is
2641 // ill-defined (cyclic XOR event-driven, REQ_0106) and must be rejected
2642 // at add time. We use a real subscriber so the listener decl is genuine.
2643 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2644 let ch = exec.channel::<Msg>("taktora.test.req0268.combo").unwrap();
2645 let sub = ch.subscriber().unwrap();
2646 let err = exec
2647 .add(crate::item::item_with_triggers(
2648 move |d| {
2649 d.interval(Duration::from_millis(1));
2650 d.subscriber(&sub);
2651 Ok(())
2652 },
2653 |_| Ok(crate::ControlFlow::Continue),
2654 ))
2655 .expect_err("interval + subscriber must be rejected");
2656 match err {
2657 ExecutorError::DeclareTriggers(msg) => {
2658 assert!(
2659 msg.contains("cyclic") && msg.contains("event-driven"),
2660 "message must explain cyclic vs event-driven: {msg}"
2661 );
2662 assert!(
2663 msg.contains("split"),
2664 "message must suggest splitting into two tasks: {msg}"
2665 );
2666 }
2667 other => panic!("expected DeclareTriggers, got {other:?}"),
2668 }
2669 }
2670
2671 #[test]
2672 fn add_rejects_cyclic_plus_listener_regardless_of_mode() {
2673 use core::time::Duration;
2674 // The combination is ill-defined irrespective of DispatchMode (Legacy
2675 // is temporary), so Legacy must reject it too.
2676 let mut exec = Executor::builder()
2677 .worker_threads(0)
2678 .dispatch_mode(crate::DispatchMode::Legacy)
2679 .build()
2680 .unwrap();
2681 let ch = exec
2682 .channel::<Msg>("taktora.test.req0268.combo.legacy")
2683 .unwrap();
2684 let sub = ch.subscriber().unwrap();
2685 let err = exec
2686 .add(crate::item::item_with_triggers(
2687 move |d| {
2688 d.interval(Duration::from_millis(1));
2689 d.subscriber(&sub);
2690 Ok(())
2691 },
2692 |_| Ok(crate::ControlFlow::Continue),
2693 ))
2694 .expect_err("interval + subscriber must be rejected in Legacy too");
2695 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2696 }
2697
2698 #[test]
2699 fn add_accepts_multiple_intervals_and_single_kinds() {
2700 use core::time::Duration;
2701 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2702 // Multiple Interval decls: still cyclic-only, accepted.
2703 exec.add(crate::item::item_with_triggers(
2704 |d| {
2705 d.interval(Duration::from_millis(1));
2706 d.interval(Duration::from_millis(2));
2707 Ok(())
2708 },
2709 |_| Ok(crate::ControlFlow::Continue),
2710 ))
2711 .expect("multiple intervals accepted");
2712 // Single interval: accepted.
2713 exec.add(crate::item::item_with_triggers(
2714 |d| {
2715 d.interval(Duration::from_millis(1));
2716 Ok(())
2717 },
2718 |_| Ok(crate::ControlFlow::Continue),
2719 ))
2720 .expect("single interval accepted");
2721 // Multiple listeners (no interval): accepted.
2722 let ch = exec
2723 .channel::<Msg>("taktora.test.req0268.multi.listener")
2724 .unwrap();
2725 let sub_a = ch.subscriber().unwrap();
2726 let sub_b = ch.subscriber().unwrap();
2727 exec.add(crate::item::item_with_triggers(
2728 move |d| {
2729 d.subscriber(&sub_a);
2730 d.subscriber(&sub_b);
2731 Ok(())
2732 },
2733 |_| Ok(crate::ControlFlow::Continue),
2734 ))
2735 .expect("multiple listeners accepted");
2736 }
2737
2738 #[test]
2739 fn add_rejects_zero_period_interval() {
2740 use core::time::Duration;
2741 // A zero-period interval busy-spins the grid (next_timeout == 0 every
2742 // wake), so it must be rejected at add time.
2743 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2744 let err = exec
2745 .add(crate::item::item_with_triggers(
2746 |d| {
2747 d.interval(Duration::ZERO);
2748 Ok(())
2749 },
2750 |_| Ok(crate::ControlFlow::Continue),
2751 ))
2752 .expect_err("zero-period interval must be rejected");
2753 match err {
2754 ExecutorError::DeclareTriggers(msg) => {
2755 assert!(
2756 msg.contains("zero"),
2757 "message must mention the zero period: {msg}"
2758 );
2759 }
2760 other => panic!("expected DeclareTriggers, got {other:?}"),
2761 }
2762 }
2763
2764 #[test]
2765 fn add_chain_rejects_cyclic_plus_listener() {
2766 use core::time::Duration;
2767 // The chain path collects the head item's decls; the same validation
2768 // must apply there.
2769 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2770 let ch = exec
2771 .channel::<Msg>("taktora.test.req0268.chain.combo")
2772 .unwrap();
2773 let sub = ch.subscriber().unwrap();
2774 let err = exec
2775 .add_chain(vec![crate::item::item_with_triggers(
2776 move |d| {
2777 d.interval(Duration::from_millis(1));
2778 d.subscriber(&sub);
2779 Ok(())
2780 },
2781 |_| Ok(crate::ControlFlow::Continue),
2782 )])
2783 .expect_err("chain head interval + subscriber must be rejected");
2784 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2785 }
2786
2787 #[test]
2788 fn add_chain_rejects_zero_period_interval() {
2789 use core::time::Duration;
2790 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2791 let err = exec
2792 .add_chain(vec![crate::item::item_with_triggers(
2793 |d| {
2794 d.interval(Duration::ZERO);
2795 Ok(())
2796 },
2797 |_| Ok(crate::ControlFlow::Continue),
2798 )])
2799 .expect_err("chain head zero-period interval must be rejected");
2800 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2801 }
2802
2803 #[test]
2804 fn add_graph_rejects_cyclic_plus_listener() {
2805 use core::time::Duration;
2806 // The graph path collects the root vertex's decls into a grid-registered
2807 // TaskEntry; the same cyclic-XOR-event-driven validation must apply there
2808 // (REQ_0268). We use a real subscriber so the listener decl is genuine.
2809 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2810 let ch = exec
2811 .channel::<Msg>("taktora.test.req0268.graph.combo")
2812 .unwrap();
2813 let sub = ch.subscriber().unwrap();
2814 let mut g = exec.add_graph();
2815 let r = g.vertex(crate::item::item_with_triggers(
2816 move |d| {
2817 d.interval(Duration::from_millis(1));
2818 d.subscriber(&sub);
2819 Ok(())
2820 },
2821 |_| Ok(crate::ControlFlow::Continue),
2822 ));
2823 g.root(r);
2824 let err = g
2825 .build()
2826 .expect_err("graph root interval + subscriber must be rejected");
2827 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2828 }
2829
2830 #[test]
2831 fn add_graph_rejects_zero_period_interval() {
2832 use core::time::Duration;
2833 // A zero-period interval on the graph root busy-spins the grid, so the
2834 // graph path must reject it just like the single-item/chain paths.
2835 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2836 let mut g = exec.add_graph();
2837 let r = g.vertex(crate::item::item_with_triggers(
2838 |d| {
2839 d.interval(Duration::ZERO);
2840 Ok(())
2841 },
2842 |_| Ok(crate::ControlFlow::Continue),
2843 ));
2844 g.root(r);
2845 let err = g
2846 .build()
2847 .expect_err("graph root zero-period interval must be rejected");
2848 assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
2849 }
2850
2851 #[test]
2852 fn stopped_iteration_emits_no_cyclic_cycle_observation() {
2853 use core::time::Duration;
2854 use std::sync::atomic::AtomicU64;
2855
2856 // A CyclicClock that starts at 0 (epoch) then jumps far past the first
2857 // grid target, so the post-wait `take_due` finds the cyclic task due on
2858 // the very first (stopping) wake. Distinct from the telemetry clock
2859 // (scheduling role).
2860 struct JumpClock {
2861 calls: AtomicU64,
2862 }
2863 impl crate::CyclicClock for JumpClock {
2864 fn now_nanos(&self) -> u64 {
2865 // First read (grid epoch at loop entry) = 0; every later read
2866 // is well past the 1ms target.
2867 if self.calls.fetch_add(1, Ordering::SeqCst) == 0 {
2868 0
2869 } else {
2870 1_000_000_000
2871 }
2872 }
2873 }
2874
2875 // Observer that counts on_cycle_stats calls.
2876 struct Counter {
2877 cycles: AtomicU64,
2878 }
2879 impl Observer for Counter {
2880 fn on_cycle_stats(&self, _obs: &CycleObservation) {
2881 self.cycles.fetch_add(1, Ordering::SeqCst);
2882 }
2883 }
2884
2885 let counter = Arc::new(Counter {
2886 cycles: AtomicU64::new(0),
2887 });
2888 let mut exec = Executor::builder()
2889 .worker_threads(0)
2890 .dispatch_mode(crate::DispatchMode::Grid)
2891 .cyclic_clock(Arc::new(JumpClock {
2892 calls: AtomicU64::new(0),
2893 }))
2894 .observer(Arc::clone(&counter) as Arc<dyn Observer>)
2895 .build()
2896 .unwrap();
2897 exec.add(crate::item::item_with_triggers(
2898 |d| {
2899 d.interval(Duration::from_millis(1));
2900 Ok(())
2901 },
2902 |_| Ok(crate::ControlFlow::Continue),
2903 ))
2904 .unwrap();
2905
2906 // Stop BEFORE running: the WaitSet wakes immediately on the stop
2907 // listener; the grid target is already due (JumpClock). Without the
2908 // stop guard the post-wait cyclic pass would dispatch + record one
2909 // spurious cycle on this stopping iteration; with it, zero.
2910 exec.stoppable().stop();
2911 exec.run().expect("run returns cleanly after stop");
2912
2913 assert_eq!(
2914 counter.cycles.load(Ordering::SeqCst),
2915 0,
2916 "no cyclic cycle observation may be emitted on a stop wake"
2917 );
2918 }
2919
2920 #[test]
2921 fn custom_id_is_preserved() {
2922 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2923 let id = exec
2924 .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
2925 .unwrap();
2926 assert_eq!(id.as_str(), "my-task");
2927 }
2928
2929 #[test]
2930 fn add_persists_declared_budget() {
2931 use core::time::Duration;
2932 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2933 let task_id = exec
2934 .add(crate::item::item_with_triggers(
2935 |d| {
2936 d.interval(Duration::from_millis(10));
2937 d.budget(Duration::from_millis(5));
2938 Ok(())
2939 },
2940 |_| Ok(crate::ControlFlow::Continue),
2941 ))
2942 .unwrap();
2943 let entry = exec
2944 .tasks
2945 .iter()
2946 .find(|t| t.id == task_id)
2947 .expect("task present");
2948 assert_eq!(entry.budget, Some(Duration::from_millis(5)));
2949 }
2950
2951 #[test]
2952 fn scan_period_cached_for_cyclic_only() {
2953 use core::time::Duration;
2954 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
2955 let cyclic = exec
2956 .add(crate::item::item_with_triggers(
2957 |d| {
2958 d.interval(Duration::from_millis(5));
2959 Ok(())
2960 },
2961 |_| Ok(crate::ControlFlow::Continue),
2962 ))
2963 .unwrap();
2964 let event_driven = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
2965
2966 let cyclic_entry = exec
2967 .tasks
2968 .iter()
2969 .find(|t| t.id == cyclic)
2970 .expect("cyclic task present");
2971 assert_eq!(cyclic_entry.scan_period, Some(Duration::from_millis(5)));
2972 // Sentinel: no sample has been taken yet.
2973 assert_eq!(cyclic_entry.last_took_ns.load(Ordering::Relaxed), u64::MAX);
2974
2975 let event_entry = exec
2976 .tasks
2977 .iter()
2978 .find(|t| t.id == event_driven)
2979 .expect("event-driven task present");
2980 assert_eq!(event_entry.scan_period, None);
2981 }
2982
2983 #[test]
2984 fn cycle_stats_index_aligned_with_tasks() {
2985 use core::time::Duration;
2986 let mut exec = Executor::builder()
2987 .worker_threads(0)
2988 .stats_window(512)
2989 .build()
2990 .unwrap();
2991 // Builder option flows through to the executor.
2992 assert_eq!(exec.stats_window, 512);
2993 // No tasks yet → both Vecs empty and aligned.
2994 assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
2995
2996 // Cyclic single-item add path.
2997 exec.add(crate::item::item_with_triggers(
2998 |d| {
2999 d.interval(Duration::from_millis(5));
3000 Ok(())
3001 },
3002 |_| Ok(crate::ControlFlow::Continue),
3003 ))
3004 .unwrap();
3005 // Event-driven single-item add path.
3006 exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
3007
3008 assert_eq!(exec.tasks.len(), 2);
3009 assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
3010 }
3011
3012 #[test]
3013 fn add_with_fault_handler_stores_handler_job() {
3014 use core::time::Duration;
3015 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3016 let task_id = exec
3017 .add_with_fault_handler(
3018 crate::item::item_with_triggers(
3019 |d| {
3020 d.interval(Duration::from_millis(10));
3021 d.budget(Duration::from_millis(5));
3022 Ok(())
3023 },
3024 |_| Ok(crate::ControlFlow::Continue),
3025 ),
3026 crate::item::item_with_triggers(|_d| Ok(()), |_| Ok(crate::ControlFlow::Continue)),
3027 )
3028 .unwrap();
3029 let entry = exec
3030 .tasks
3031 .iter()
3032 .find(|t| t.id == task_id)
3033 .expect("task present");
3034 assert!(
3035 entry.handler_job.is_some(),
3036 "handler_job should be Some after add_with_fault_handler"
3037 );
3038 // Main job should still be present.
3039 assert!(entry.job.is_some(), "main job should still be present");
3040 }
3041
3042 #[test]
3043 fn declare_triggers_called_at_add_time() {
3044 let called = Arc::new(AtomicBool::new(false));
3045 let called_d = Arc::clone(&called);
3046
3047 let it = crate::item::item_with_triggers(
3048 move |_d| {
3049 called_d.store(true, Ordering::SeqCst);
3050 Ok(())
3051 },
3052 |_| Ok(ControlFlow::Continue),
3053 );
3054
3055 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3056 exec.add(it).unwrap();
3057 assert!(called.load(Ordering::SeqCst));
3058 }
3059
3060 #[test]
3061 fn clear_task_fault_errors_on_running_task() {
3062 use core::time::Duration;
3063 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3064 let task_id = exec
3065 .add(crate::item::item_with_triggers(
3066 |d| {
3067 d.interval(Duration::from_millis(10));
3068 Ok(())
3069 },
3070 |_| Ok(crate::ControlFlow::Continue),
3071 ))
3072 .unwrap();
3073 // Task starts in Running state — clearing should error.
3074 let err = exec.clear_task_fault(task_id).expect_err("not faulted");
3075 assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
3076 }
3077
3078 #[test]
3079 fn clear_executor_fault_errors_on_running_executor() {
3080 let exec = Executor::builder().worker_threads(0).build().unwrap();
3081 let err = exec.clear_executor_fault().expect_err("not faulted");
3082 assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
3083 }
3084
3085 #[test]
3086 fn overrun_count_returns_zero_for_new_task() {
3087 use core::time::Duration;
3088 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3089 let task_id = exec
3090 .add(crate::item::item_with_triggers(
3091 |d| {
3092 d.interval(Duration::from_millis(10));
3093 d.budget(Duration::from_millis(5));
3094 Ok(())
3095 },
3096 |_| Ok(crate::ControlFlow::Continue),
3097 ))
3098 .unwrap();
3099 assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
3100 }
3101
3102 #[test]
3103 fn overrun_count_errors_for_unknown_task() {
3104 let exec = Executor::builder().worker_threads(0).build().unwrap();
3105 let err = exec
3106 .overrun_count(crate::TaskId::new("nope"))
3107 .expect_err("unknown task");
3108 assert!(matches!(err, ExecutorError::TaskNotFound(_)));
3109 }
3110
3111 #[test]
3112 fn task_fault_state_starts_running() {
3113 use core::time::Duration;
3114 let mut exec = Executor::builder().worker_threads(0).build().unwrap();
3115 let task_id = exec
3116 .add(crate::item::item_with_triggers(
3117 |d| {
3118 d.interval(Duration::from_millis(10));
3119 Ok(())
3120 },
3121 |_| Ok(crate::ControlFlow::Continue),
3122 ))
3123 .unwrap();
3124 assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
3125 }
3126
3127 #[test]
3128 fn executor_fault_state_starts_running() {
3129 let exec = Executor::builder().worker_threads(0).build().unwrap();
3130 assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
3131 }
3132
3133 // --- on_fatal / FatalDispatch integration tests ---
3134
3135 #[test]
3136 fn build_without_on_fatal_succeeds() {
3137 use crate::fatal::{FatalContext, FatalSite};
3138 use std::sync::{Arc, Mutex};
3139 // Default builder (no on_fatal) must build successfully.
3140 let exec = Executor::builder().worker_threads(0).build().unwrap();
3141 // The fatal_dispatch field is present; fire via a test terminal to
3142 // confirm the no-op handler doesn't blow up.
3143 let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3144 let reached2 = Arc::clone(&reached);
3145 let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
3146 exec.fatal_dispatch.handler().clone(),
3147 move |_| {
3148 *reached2.lock().unwrap() = true;
3149 },
3150 );
3151 test_dispatch.fire(&FatalContext {
3152 cause: "test".to_string(),
3153 site: FatalSite::PoolWorker,
3154 });
3155 assert!(*reached.lock().unwrap(), "terminal not reached");
3156 }
3157
3158 #[test]
3159 fn on_fatal_handler_is_stored_and_invoked() {
3160 use crate::fatal::{FatalContext, FatalSite};
3161 use std::sync::{Arc, Mutex};
3162 let called: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
3163 let called2 = Arc::clone(&called);
3164 let exec = Executor::builder()
3165 .worker_threads(0)
3166 .on_fatal(move |ctx| {
3167 called2.lock().unwrap().push(ctx.cause.clone());
3168 })
3169 .build()
3170 .unwrap();
3171 // Verify the handler fires via a test terminal.
3172 let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3173 let reached2 = Arc::clone(&reached);
3174 let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
3175 exec.fatal_dispatch.handler().clone(),
3176 move |_| {
3177 *reached2.lock().unwrap() = true;
3178 },
3179 );
3180 test_dispatch.fire(&FatalContext {
3181 cause: "my-cause".to_string(),
3182 site: FatalSite::ExecutorRunLoop,
3183 });
3184 assert!(*reached.lock().unwrap(), "terminal not reached");
3185 let log = called.lock().unwrap().clone();
3186 assert_eq!(
3187 log,
3188 vec!["my-cause"],
3189 "handler should have been called with cause"
3190 );
3191 }
3192}