Skip to main content

taktora_executor/
executor.rs

1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::context::Stoppable;
10use crate::error::ExecutorError;
11use crate::fault::{
12    ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
13    FaultState, duration_to_ms_sat, instant_to_since_ms,
14};
15use crate::item::ExecutableItem;
16use crate::monitor::{ExecutionMonitor, NoopMonitor};
17use crate::observer::{NoopObserver, Observer};
18use crate::payload::Payload;
19use crate::pool::Pool;
20use crate::task_id::TaskId;
21use crate::task_kind::TaskKind;
22use crate::thread_attrs::ThreadAttributes;
23use crate::trigger::{TriggerDecl, TriggerDeclarer};
24use core::sync::atomic::AtomicU32;
25use iceoryx2::node::Node;
26use iceoryx2::port::listener::Listener as IxListener;
27use iceoryx2::prelude::ipc;
28use iceoryx2::prelude::*;
29use iceoryx2::waitset::WaitSetRunResult;
30use std::sync::Arc;
31use std::sync::OnceLock;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33use std::time::{Duration, Instant};
34
/// Process-wide sequence number handed out to each executor at `build()`
/// time. It is combined with the process id to form the stop-event service
/// name, so multiple executors in the same process each get a unique name.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
38
/// One registered task entry.
///
/// Created by the `add*` methods on [`Executor`] and stored in
/// `Executor::tasks`. The `Arc`-wrapped members are shared with the
/// pre-built dispatch closures, so the entry and its closures observe the
/// same fault state and counters.
pub(crate) struct TaskEntry {
    /// Task identifier.
    pub(crate) id: TaskId,
    /// The kind of work this entry holds (single item or chain).
    pub(crate) kind: TaskKind,
    /// Trigger declarations recorded at `add` time.
    pub(crate) decls: Vec<TriggerDecl>,
    /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
    /// time and re-invoked on every dispatch iteration via
    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
    /// that `Pool::submit<F>` requires in threaded mode. Required for
    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
    /// `TaskKind::Graph`, which dispatches its vertices via a separate
    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,

    /// Per-task budget declared via `TriggerDeclarer::budget`. `None`
    /// means no per-task check; the executor-wide iteration budget
    /// still applies. `REQ_0070`.
    pub(crate) budget: Option<Duration>,

    /// Per-task fault state. Wait-free read on the dispatch hot path.
    /// Wrapped in `Arc` so dispatch closures built at `add` time can
    /// capture an owning handle into the same atomic the `TaskEntry`
    /// holds — `Arc::clone` is refcount-only, so this stays compatible
    /// with `REQ_0060` (no per-iteration allocation). `REQ_0070`.
    pub(crate) fault: Arc<FaultAtomic>,

    /// Monotonic per-task overrun counter. Increments on EVERY budget
    /// breach, including breaches while already `Faulted`. Never reset
    /// by clearing the fault. Shared with the dispatch closure via
    /// `Arc::clone`. `REQ_0102`.
    pub(crate) overrun_count: Arc<AtomicU64>,

    /// Pre-built dispatch closure for the fault-handler item. Mirrors
    /// `job`. `None` means no handler — the task is simply skipped
    /// during fault. Only `add_with_fault_handler` sets this to `Some`.
    /// `REQ_0072`.
    pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,
}
79
/// Top-level executor. One per process is the typical case.
///
/// Construct it through [`Executor::builder`]; register work with the
/// `add*` methods and drive it with `run()`.
pub struct Executor {
    /// iceoryx2 node created at `build()` time. Channels and services
    /// opened through this executor are bound to it.
    pub(crate) node: Node<ipc::Service>,
    /// Worker pool, sized from `ExecutorBuilder::worker_threads`.
    pub(crate) pool: Arc<Pool>,
    /// Registered tasks in registration order. The index of an entry is
    /// captured (as `u32`) by that entry's dispatch closures.
    pub(crate) tasks: Vec<TaskEntry>,
    /// Initialised to `false` at build time.
    /// NOTE(review): presumably the guard behind
    /// `ExecutorError::AlreadyRunning` — confirm against the run loop.
    pub(crate) running: Arc<AtomicBool>,
    /// Stop handle wired to the internal stop-event notifier at build
    /// time; handed out (cloned) by `Executor::stoppable`.
    pub(crate) stoppable: Stoppable,
    /// Counter behind the auto-generated `task-N` / `chain-N` ids.
    pub(crate) next_id: AtomicU64,
    /// Listener for the internal stop event service. Held here so it outlives
    /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
    /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
    pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
    /// Lifecycle observer. Defaults to a no-op.
    pub(crate) observer: Arc<dyn Observer>,
    /// Execution monitor. Defaults to a no-op.
    pub(crate) monitor: Arc<dyn ExecutionMonitor>,
    /// Per-iteration error capture slot — allocated once at build time and
    /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
    /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
    /// the per-iteration heap allocation that the previous design incurred.
    /// Required for `REQ_0060`.
    pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
    /// Executor-wide iteration budget from `ExecutorBuilder::iteration_budget`.
    /// `None` means no executor-wide check.
    pub(crate) iteration_budget: Option<Duration>,
    /// Executor-wide fault state. Wrapped in `Arc` so each dispatch
    /// closure can hold an owning handle without re-borrowing through
    /// `self`. `REQ_0071`.
    pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,

    /// Index of the task whose `execute()` overran when the executor
    /// transitioned to `Faulted`. Read alongside `exec_fault`.
    pub(crate) exec_fault_task_idx: Arc<AtomicU32>,

    /// Budget that was breached when the executor transitioned to
    /// `Faulted`, in ms (saturated). Read alongside `exec_fault`.
    pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,

    /// Executor start time, set on first dispatch. Used to compute
    /// `since_ms` for faults relative to `Executor::run` entry. Wrapped
    /// in `Arc` so dispatch closures share the same `OnceLock` with the
    /// executor — `get_or_init` is idempotent and wait-free.
    pub(crate) start_time: Arc<OnceLock<Instant>>,
}
124
// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
// `SingleThreaded` reason as `IxNotifier`. After construction, the only
// per-iteration call is `listener.try_wait_one()`, which does not mutate the
// Rc. `Executor` is never shared across threads (it requires `&mut self` for
// `run()`), so there is no aliased concurrent mutation.
//
// NOTE(review): `Send` is about *moving* the `Executor` to another thread,
// not about sharing it. The argument above presumably also relies on no
// other handle to the listener's inner Rc remaining on the constructing
// thread after such a move — confirm against the iceoryx2 port internals.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
132
133impl Executor {
134    /// Start a new builder.
135    #[must_use]
136    pub fn builder() -> ExecutorBuilder {
137        ExecutorBuilder::default()
138    }
139
140    /// Open or create a pub/sub channel bound to this executor's node.
141    pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
142        Channel::open_or_create(&self.node, name)
143    }
144
145    /// Open or create a request/response service bound to this executor's node.
146    pub fn service<Req, Resp>(
147        &mut self,
148        name: &str,
149    ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
150    where
151        Req: Payload,
152        Resp: Payload,
153    {
154        crate::Service::open_or_create(&self.node, name)
155    }
156
157    /// Add an item to the executor with an auto-generated id.
158    pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
159        let id = TaskId::new(format!(
160            "task-{}",
161            self.next_id.fetch_add(1, Ordering::SeqCst)
162        ));
163        self.add_with_id(id, item)
164    }
165
    /// Add an item with a user-supplied id.
    ///
    /// The item's [`ExecutableItem::task_id`] override takes precedence over
    /// the caller-supplied `id`, which itself takes precedence over the
    /// auto-generated id assigned by [`Executor::add`].
    ///
    /// The item's triggers and budget are collected once here, the item is
    /// boxed, and its dispatch closure is pre-built so that later dispatch
    /// iterations do not need to allocate. Returns the id the task was
    /// actually registered under.
    ///
    /// NOTE(review): no uniqueness check on the resulting id is visible in
    /// this function — confirm whether duplicate ids are rejected elsewhere.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the item's `declare_triggers`.
    pub fn add_with_id(
        &mut self,
        id: impl Into<TaskId>,
        mut item: impl ExecutableItem,
    ) -> Result<TaskId, ExecutorError> {
        let id_arg: TaskId = id.into();
        // The item's `task_id()` override wins over the user-supplied id.
        let id = item.task_id().map_or(id_arg, TaskId::new);
        // Let the item declare its triggers and (optional) budget exactly
        // once, before it is boxed.
        let mut declarer = TriggerDeclarer::new_internal();
        item.declare_triggers(&mut declarer)?;
        let budget = declarer.budget;
        let decls = declarer.into_decls();

        let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
        let app_id = item_box.app_id();
        let app_inst = item_box.app_instance_id();
        // SAFETY: the raw pointer points into the heap allocation of
        // `item_box`. `Box` keeps that allocation at a stable address even
        // when the `Box` itself is moved (e.g. when `self.tasks` grows),
        // so the pointer remains valid for the lifetime of the
        // `TaskEntry`. See SendItemPtr safety doc for the rest of the
        // discipline (barrier() pairs with worker access).
        #[allow(unsafe_code)]
        let item_ptr =
            SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));

        // Allocate the per-task atomics now so the dispatch closure
        // and the `TaskEntry` share the same `Arc` storage. The task
        // will occupy `self.tasks.len()` after the push below — capture
        // that index up front for `task_idx_u32`. Bounded workspace, so
        // the `as u32` cast is sound; explicit allow keeps clippy quiet.
        let task_fault = Arc::new(FaultAtomic::new());
        let overrun_count = Arc::new(AtomicU64::new(0));
        #[allow(clippy::cast_possible_truncation)]
        let task_idx_u32 = self.tasks.len() as u32;
        // Everything the dispatch closure needs for budget / fault
        // bookkeeping, bundled so it can be moved into the closure.
        let fault_ctx = FaultDispatchCtx {
            task_budget: budget,
            task_fault: Arc::clone(&task_fault),
            overrun_count: Arc::clone(&overrun_count),
            iteration_budget: self.iteration_budget,
            exec_fault: Arc::clone(&self.exec_fault),
            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
            task_idx_u32,
            exec_start: Arc::clone(&self.start_time),
            observer: Arc::clone(&self.observer),
        };

        // Build the dispatch closure once; it is stored on the entry and
        // re-used on every dispatch.
        let job = build_single_job(
            id.clone(),
            self.stoppable.clone(),
            Arc::clone(&self.observer),
            Arc::clone(&self.monitor),
            Arc::clone(&self.iter_err),
            app_id,
            app_inst,
            item_ptr,
            fault_ctx,
        );

        // `item_box` moves into the entry here; `item_ptr` (captured by
        // `job`) keeps addressing the same heap allocation.
        self.tasks.push(TaskEntry {
            id: id.clone(),
            kind: TaskKind::Single(item_box),
            decls,
            job: Some(job),
            budget,
            fault: task_fault,
            overrun_count,
            handler_job: None,
        });
        Ok(id)
    }
243
244    /// Register an item plus a fault-handler item.
245    ///
246    /// The main item is registered through the canonical [`add`](Self::add)
247    /// path. The handler's [`declare_triggers`](ExecutableItem::declare_triggers)
248    /// is called (so handlers that internally rely on the declarer being
249    /// invoked observe the call) but its returned trigger list is
250    /// **ignored** — the handler dispatches on the main item's triggers
251    /// while the task is in `Faulted` state and runs in place of the main
252    /// item's `execute()`. The pre-built handler dispatch closure is
253    /// stashed on the same task entry as the main item's `job`,
254    /// satisfying `REQ_0072`.
255    ///
256    /// # Errors
257    ///
258    /// Propagates any error from registering the main item via `add`, or
259    /// from the handler's `declare_triggers` call.
260    ///
261    /// # Panics
262    ///
263    /// Panics if the task entry just inserted by [`add`](Self::add) cannot
264    /// be located in `self.tasks` — this is unreachable by construction
265    /// and indicates a logic bug.
266    pub fn add_with_fault_handler<I, H>(
267        &mut self,
268        main: I,
269        handler: H,
270    ) -> Result<TaskId, ExecutorError>
271    where
272        I: ExecutableItem,
273        H: ExecutableItem,
274    {
275        let task_id = self.add(main)?;
276
277        // Drain the handler's trigger declarations — they are ignored by
278        // design (the handler runs on the main item's triggers).
279        let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
280        let mut throwaway = TriggerDeclarer::new_internal();
281        handler_box.declare_triggers(&mut throwaway)?;
282        drop(throwaway);
283
284        let app_id = handler_box.app_id();
285        let app_inst = handler_box.app_instance_id();
286
287        // Locate the task we just added so we can share its per-task
288        // atomics with the handler's `FaultDispatchCtx`. The handler
289        // runs on the same `TaskEntry`; per §4.6 invariant 5, a handler
290        // breach increments `overrun_count` and keeps state `Faulted`
291        // without re-firing the observer.
292        let task_idx = self
293            .tasks
294            .iter()
295            .position(|t| t.id == task_id)
296            .expect("just added; must exist");
297        let task = &self.tasks[task_idx];
298        #[allow(clippy::cast_possible_truncation)]
299        let task_idx_u32 = task_idx as u32;
300        let handler_fault_ctx = FaultDispatchCtx {
301            task_budget: task.budget,
302            task_fault: Arc::clone(&task.fault),
303            overrun_count: Arc::clone(&task.overrun_count),
304            iteration_budget: self.iteration_budget,
305            exec_fault: Arc::clone(&self.exec_fault),
306            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
307            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
308            task_idx_u32,
309            exec_start: Arc::clone(&self.start_time),
310            observer: Arc::clone(&self.observer),
311        };
312
313        let handler_closure = build_handler_job(
314            task_id.clone(),
315            self.stoppable.clone(),
316            Arc::clone(&self.observer),
317            Arc::clone(&self.monitor),
318            Arc::clone(&self.iter_err),
319            app_id,
320            app_inst,
321            handler_box,
322            handler_fault_ctx,
323        );
324
325        self.tasks[task_idx].handler_job = Some(handler_closure);
326
327        Ok(task_id)
328    }
329
330    /// Clear a per-task fault. Returns the previous `FaultState`.
331    /// Fires `Observer::on_task_clear` if the state changed from
332    /// `Faulted` to `Running`. `REQ_0070`.
333    ///
334    /// # Errors
335    ///
336    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
337    /// * [`ExecutorError::TaskNotFaulted`] if `task` is already `Running`.
338    pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
339        let entry = self
340            .tasks
341            .iter()
342            .find(|t| t.id == task)
343            .ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
344        let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
345        let prev = entry.fault.swap(FaultState::Running, budget_ms);
346        match prev {
347            FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
348            FaultState::Faulted { .. } => {
349                self.observer.on_task_clear(task);
350                Ok(prev)
351            }
352        }
353    }
354
355    /// Clear the executor-wide fault and cascade-clear every task whose
356    /// state is `Faulted{ExecutorFaulted}`. Tasks whose state is
357    /// `Faulted{BudgetExceeded}` are NOT cleared (their own contract
358    /// breach is independent). Fires `Observer::on_executor_clear` and
359    /// one `Observer::on_task_clear` per cascade-cleared task.
360    /// `REQ_0071`.
361    ///
362    /// # Errors
363    ///
364    /// * [`ExecutorError::ExecutorNotFaulted`] if the executor is `Running`.
365    pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
366        let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
367        let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
368        let prev = self
369            .exec_fault
370            .swap(ExecutorFaultState::Running, task_idx, budget_ms);
371        match prev {
372            ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
373            ExecutorFaultState::Faulted { .. } => {
374                // Cascade-clear tasks whose reason is ExecutorFaulted.
375                for entry in &self.tasks {
376                    let task_budget_ms =
377                        entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
378                    if let FaultState::Faulted {
379                        reason: FaultReason::ExecutorFaulted,
380                        ..
381                    } = entry.fault.load(task_budget_ms)
382                    {
383                        let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
384                        self.observer.on_task_clear(entry.id.clone());
385                    }
386                }
387                self.observer.on_executor_clear();
388                Ok(prev)
389            }
390        }
391    }
392
393    /// Return the per-task overrun counter — number of times the task's
394    /// `execute()` exceeded its budget over the executor's lifetime.
395    /// Monotonic; not reset by `clear_task_fault`. `REQ_0102`.
396    ///
397    /// # Errors
398    ///
399    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
400    pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
401        self.tasks
402            .iter()
403            .find(|t| t.id == task)
404            .map(|t| t.overrun_count.load(Ordering::Acquire))
405            .ok_or_else(|| ExecutorError::TaskNotFound(task))
406    }
407
408    /// Return a snapshot of the per-task `FaultState`. `REQ_0073` (pull path).
409    ///
410    /// # Errors
411    ///
412    /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
413    pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
414        self.tasks
415            .iter()
416            .find(|t| t.id == task)
417            .map(|t| {
418                let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
419                t.fault.load(budget_ms)
420            })
421            .ok_or_else(|| ExecutorError::TaskNotFound(task))
422    }
423
424    /// Return a snapshot of the executor-wide `ExecutorFaultState`.
425    /// `REQ_0073` (pull path).
426    #[must_use]
427    pub fn executor_fault_state(&self) -> ExecutorFaultState {
428        let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
429        let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
430        self.exec_fault.load(task_idx, budget_ms)
431    }
432
433    /// Add a sequential chain of items. Only the head item's
434    /// `declare_triggers` is consulted; non-head triggers are ignored with a
435    /// tracing warn.
436    pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
437    where
438        I: ExecutableItem,
439        C: IntoIterator<Item = I>,
440    {
441        let id = TaskId::new(format!(
442            "chain-{}",
443            self.next_id.fetch_add(1, Ordering::SeqCst)
444        ));
445        let boxed: Vec<Box<dyn ExecutableItem>> = items
446            .into_iter()
447            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
448            .collect();
449        self.add_chain_with_id_boxed(id, boxed)
450    }
451
452    /// Like [`Executor::add_chain`] but with a user-supplied id.
453    pub fn add_chain_with_id<I, C>(
454        &mut self,
455        id: impl Into<TaskId>,
456        items: C,
457    ) -> Result<TaskId, ExecutorError>
458    where
459        I: ExecutableItem,
460        C: IntoIterator<Item = I>,
461    {
462        let boxed: Vec<Box<dyn ExecutableItem>> = items
463            .into_iter()
464            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
465            .collect();
466        self.add_chain_with_id_boxed(id.into(), boxed)
467    }
468
469    fn add_chain_with_id_boxed(
470        &mut self,
471        id: TaskId,
472        mut items: Vec<Box<dyn ExecutableItem>>,
473    ) -> Result<TaskId, ExecutorError> {
474        if items.is_empty() {
475            return Err(ExecutorError::Builder(
476                "chain must contain at least one item".into(),
477            ));
478        }
479
480        // Head item's `task_id()` override wins over the user-supplied id.
481        let id = items[0].task_id().map_or(id, TaskId::new);
482
483        // Head's triggers gate the chain.
484        let mut head_declarer = TriggerDeclarer::new_internal();
485        items[0].declare_triggers(&mut head_declarer)?;
486        let decls = head_declarer.into_decls();
487
488        // Warn if non-head items declared triggers (those will be ignored).
489        for (i, body) in items.iter_mut().enumerate().skip(1) {
490            let mut spurious = TriggerDeclarer::new_internal();
491            let _ = body.declare_triggers(&mut spurious);
492            if !spurious.is_empty() {
493                #[cfg(feature = "tracing")]
494                tracing::warn!(
495                    target: "taktora-executor",
496                    task = %id,
497                    position = i,
498                    "non-head chain item declared triggers; they will be ignored"
499                );
500                #[cfg(not(feature = "tracing"))]
501                {
502                    let _ = i;
503                }
504            }
505        }
506
507        let mut items = items;
508        // SAFETY: pointer into the chain's `items` Vec. The Vec lives
509        // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
510        // is stable once `add_chain` returns — `self.tasks` may grow
511        // (moving the `Vec<Box<...>>` header itself), but the Vec's
512        // heap buffer is referenced via the header's data pointer and
513        // is unaffected by header moves. We never resize the chain Vec
514        // after this point. See SendChainPtr safety doc for the rest.
515        #[allow(unsafe_code)]
516        let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
517            &mut items,
518        ));
519        // NB: the pointer above is to the local `items` Vec on the
520        // stack — it's invalid after the `push` below moves items into
521        // the TaskEntry. We rederive a stable pointer after the push.
522        // (See the rebuild step below.)
523        let _ = chain_ptr;
524
525        // Pre-allocate the per-task atomics so the chain's dispatch
526        // closure can capture clones of the same `Arc`s the `TaskEntry`
527        // holds. The chain occupies `self.tasks.len()` after the push.
528        let task_fault = Arc::new(FaultAtomic::new());
529        let overrun_count = Arc::new(AtomicU64::new(0));
530        #[allow(clippy::cast_possible_truncation)]
531        let task_idx_u32 = self.tasks.len() as u32;
532
533        self.tasks.push(TaskEntry {
534            id: id.clone(),
535            kind: TaskKind::Chain(items),
536            decls,
537            job: None, // populated in the rebuild step below
538            // TODO(post-Task-10): chain budgets carried separately; for now None.
539            budget: None,
540            fault: Arc::clone(&task_fault),
541            overrun_count: Arc::clone(&overrun_count),
542            handler_job: None,
543        });
544
545        // After the push, the TaskEntry lives at a stable position in
546        // `self.tasks` for the duration of this `add_chain_with_id_boxed`
547        // call. Take a stable pointer to its chain Vec and build the
548        // dispatch closure. If `self.tasks` later grows, the Vec header
549        // inside the TaskEntry moves but the header's data pointer
550        // (which addresses the chain's heap buffer) does not — and the
551        // closure derefs that pointer per dispatch, so it re-reads the
552        // current heap address each time. Sound under the same
553        // discipline as `tasks_ptr` in dispatch_loop.
554        let task_idx = self.tasks.len() - 1;
555        let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
556        {
557            TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
558            // The push above used TaskKind::Chain, so this arm is
559            // unreachable. Mark it explicitly to satisfy `match`.
560            _ => unreachable!("just-pushed task is TaskKind::Chain"),
561        };
562        #[allow(unsafe_code)]
563        let chain_ptr = SendChainPtr::new(chain_vec_ptr);
564        let fault_ctx = FaultDispatchCtx {
565            task_budget: None, // chain budgets are intentionally None for now
566            task_fault,
567            overrun_count,
568            iteration_budget: self.iteration_budget,
569            exec_fault: Arc::clone(&self.exec_fault),
570            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
571            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
572            task_idx_u32,
573            exec_start: Arc::clone(&self.start_time),
574            observer: Arc::clone(&self.observer),
575        };
576        let job = build_chain_job(
577            id.clone(),
578            self.stoppable.clone(),
579            Arc::clone(&self.observer),
580            Arc::clone(&self.monitor),
581            Arc::clone(&self.iter_err),
582            chain_ptr,
583            fault_ctx,
584        );
585        self.tasks[task_idx].job = Some(job);
586        Ok(id)
587    }
588
589    /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
590    /// executor is built. Clone before calling `run()` — any clone taken at any
591    /// time will wake the `WaitSet` when `stop()` is called.
592    #[must_use]
593    pub fn stoppable(&self) -> Stoppable {
594        self.stoppable.clone()
595    }
596
597    /// Borrow the underlying iceoryx2 node (escape hatch for power users).
598    pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
599        &self.node
600    }
601
602    /// Begin building a graph. Call `.build()` on the returned builder to
603    /// register the graph as a task.
604    pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
605        ExecutorGraphBuilder {
606            executor: self,
607            builder: crate::graph::GraphBuilder::new(),
608            custom_id: None,
609        }
610    }
611}
612
/// Builder for [`Executor`].
///
/// Obtain one via [`Executor::builder`], chain the setters, and finish
/// with [`ExecutorBuilder::build`].
pub struct ExecutorBuilder {
    /// Requested worker-thread count. `None` means "use the number of
    /// physical cores" at build time; `Some(0)` selects inline mode.
    worker_threads: Option<usize>,
    /// Lifecycle observer; `None` falls back to a no-op observer.
    observer: Option<Arc<dyn Observer>>,
    /// Execution monitor; `None` falls back to a no-op monitor.
    monitor: Option<Arc<dyn ExecutionMonitor>>,
    /// Thread attributes handed to the worker pool at build time.
    worker_attrs: ThreadAttributes,
    /// Executor-wide iteration budget (`REQ_0071`). `None` means no
    /// executor-wide check.
    iteration_budget: Option<Duration>,
}
623
624impl Default for ExecutorBuilder {
625    fn default() -> Self {
626        Self {
627            worker_threads: None,
628            observer: None,
629            monitor: None,
630            worker_attrs: ThreadAttributes::new(),
631            iteration_budget: None,
632        }
633    }
634}
635
636impl ExecutorBuilder {
    /// Set the number of worker threads. `0` → inline (no pool). When this
    /// setter is never called, `build()` falls back to the number of
    /// physical cores.
    #[must_use]
    pub const fn worker_threads(mut self, n: usize) -> Self {
        self.worker_threads = Some(n);
        self
    }
644
645    /// Attach a lifecycle observer. If not called, a no-op observer is used.
646    #[must_use]
647    pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
648        self.observer = Some(obs);
649        self
650    }
651
652    /// Attach an execution monitor. If not called, a no-op monitor is used.
653    #[must_use]
654    pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
655        self.monitor = Some(mon);
656        self
657    }
658
    /// Configure the executor-wide iteration budget. Any task whose
    /// `execute()` exceeds `dur` transitions the executor to `Faulted`
    /// (`REQ_0071`). Default: unset (no executor-wide check). Calling this
    /// again overwrites the previously configured budget.
    #[must_use]
    pub const fn iteration_budget(mut self, dur: Duration) -> Self {
        self.iteration_budget = Some(dur);
        self
    }
667
668    /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
669    /// for worker threads. Has no effect when `worker_threads` is `0` (inline
670    /// mode). Requires the `thread_attrs` feature for non-default settings.
671    #[must_use]
672    #[allow(clippy::missing_const_for_fn)]
673    pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
674        self.worker_attrs = attrs;
675        self
676    }
677
678    /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
679    /// internal stop-event service so that any `Stoppable` clone (taken before
680    /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
681    ///
682    /// # Panics
683    ///
684    /// Panics if the internally-generated stop-event service name exceeds the
685    /// iceoryx2 service name length limit (this cannot happen under normal use
686    /// because the name is derived from the process id and a monotonic counter).
687    #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
688    #[track_caller]
689    pub fn build(self) -> Result<Executor, ExecutorError> {
690        let node = NodeBuilder::new()
691            .create::<ipc::Service>()
692            .map_err(ExecutorError::iceoryx2)?;
693
694        let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
695        let pool = Arc::new(Pool::new(n_workers, self.worker_attrs)?);
696
697        // Build the internal stop event service with a unique-per-process name
698        // so multiple executors in the same process don't collide.
699        let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
700        let stop_topic = format!(
701            "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
702            std::process::id()
703        );
704        let stop_event = node
705            .service_builder(&stop_topic.as_str().try_into().unwrap())
706            .event()
707            .open_or_create()
708            .map_err(ExecutorError::iceoryx2)?;
709
710        let stop_notifier = Arc::new(
711            stop_event
712                .notifier_builder()
713                .create()
714                .map_err(ExecutorError::iceoryx2)?,
715        );
716
717        // SAFETY: see module-level note; Arc<IxListener> is held here and only
718        // accessed on the executor thread.
719        let stop_listener = Arc::new(
720            stop_event
721                .listener_builder()
722                .create()
723                .map_err(ExecutorError::iceoryx2)?,
724        );
725
726        // Wire the notifier into the Stoppable so every clone is waker-aware
727        // from the moment the executor is built.
728        let stoppable = Stoppable::with_waker(stop_notifier);
729
730        let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
731
732        let monitor: Arc<dyn ExecutionMonitor> =
733            self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
734
735        let exec = Executor {
736            node,
737            pool,
738            tasks: Vec::new(),
739            running: Arc::new(AtomicBool::new(false)),
740            stoppable,
741            next_id: AtomicU64::new(0),
742            stop_listener,
743            observer,
744            monitor,
745            iter_err: Arc::new(std::sync::Mutex::new(None)),
746            iteration_budget: self.iteration_budget,
747            exec_fault: Arc::new(ExecutorFaultAtomic::new()),
748            exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
749            exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
750            start_time: Arc::new(OnceLock::new()),
751        };
752
753        Ok(exec)
754    }
755}
756
757// ── Run loop ──────────────────────────────────────────────────────────────────
758
759impl Executor {
760    /// Run the executor until [`Stoppable::stop`] is called or a task signals
761    /// stop via [`crate::Context::stop_executor`].
762    ///
763    /// # Errors
764    ///
765    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
766    ///
767    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
768    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
769    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
770    ///
771    /// If multiple items error in the same dispatch iteration, only the first
772    /// is preserved; subsequent errors are discarded silently. To observe
773    /// every error, attach an [`Observer`](crate::Observer) and read errors
774    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
775    pub fn run(&mut self) -> Result<(), ExecutorError> {
776        self.run_inner(RunMode::Forever)
777    }
778
779    /// Run for at most `max` wall-clock duration, then return.
780    ///
781    /// # Errors
782    ///
783    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
784    ///
785    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
786    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
787    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
788    ///
789    /// If multiple items error in the same dispatch iteration, only the first
790    /// is preserved; subsequent errors are discarded silently. To observe
791    /// every error, attach an [`Observer`](crate::Observer) and read errors
792    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
793    pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
794        self.run_inner(RunMode::Until(Instant::now() + max))
795    }
796
797    /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
798    ///
799    /// # Errors
800    ///
801    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
802    ///
803    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
804    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
805    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
806    ///
807    /// If multiple items error in the same dispatch iteration, only the first
808    /// is preserved; subsequent errors are discarded silently. To observe
809    /// every error, attach an [`Observer`](crate::Observer) and read errors
810    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
811    pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
812        self.run_inner(RunMode::Iterations(n))
813    }
814
815    /// Run until `predicate()` returns true. Checked after each `WaitSet`
816    /// wakeup.
817    ///
818    /// # Errors
819    ///
820    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
821    ///
822    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
823    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
824    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
825    ///
826    /// If multiple items error in the same dispatch iteration, only the first
827    /// is preserved; subsequent errors are discarded silently. To observe
828    /// every error, attach an [`Observer`](crate::Observer) and read errors
829    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
830    pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
831        self.run_inner(RunMode::Predicate(&mut predicate))
832    }
833}
834
/// Exit condition handed to the dispatch loop by the public `run*` entry
/// points. The loop evaluates it after each processed `WaitSet` wakeup.
enum RunMode<'a> {
    /// No mode-specific exit condition.
    Forever,
    /// Return once `Instant::now()` has reached this deadline.
    Until(Instant),
    /// Return once this many wakeups have been processed.
    Iterations(usize),
    /// Return as soon as the borrowed predicate yields `true`.
    Predicate(&'a mut dyn FnMut() -> bool),
}
841
842impl Executor {
843    fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
844        // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
845        // remains true permanently. Calling `run()` again after a stop will return
846        // promptly without doing any meaningful work (it blocks until the first
847        // trigger fires, then immediately exits the dispatch loop). Task 10's
848        // Runner accommodates this by treating an Executor as one-shot: each
849        // Runner owns the Executor and consumes it.
850        if self.running.swap(true, Ordering::SeqCst) {
851            return Err(ExecutorError::AlreadyRunning);
852        }
853
854        self.observer.on_executor_up();
855        let result = self.dispatch_loop(&mut mode);
856        match &result {
857            Ok(()) => self.observer.on_executor_down(),
858            Err(e) => self.observer.on_executor_error(e),
859        }
860
861        self.running.store(false, Ordering::SeqCst);
862        result
863    }
864
    /// Core run loop: attaches every trigger to a fresh `WaitSet`, then
    /// processes wakeups until an exit condition is met.
    ///
    /// Setup: one `WaitSet` attachment is created per `TriggerDecl` of every
    /// registered task (`attachment_to_task` maps a guard index back to its
    /// task index), plus one attachment for the internal stop listener.
    ///
    /// Per wakeup, the callback drains pending stop notifications and then,
    /// for every guard that fired, dispatches the owning task:
    ///
    /// * Single / Chain: after the pre-dispatch fault check, either the
    ///   task's `handler_job` (executor or task faulted; skipped when no
    ///   handler is registered) or its regular `job` is submitted to the pool.
    /// * Graph: `run_once_borrowed` is invoked directly from the callback.
    ///
    /// The callback always ends with `pool.barrier()`.
    ///
    /// After each wakeup the loop checks, in this order: `WaitSet`
    /// interrupt / termination request (`Ok`), recorded item error (`Err`),
    /// stop flag (`Ok`), and finally `mode`. `mode` is therefore only
    /// evaluated once a wakeup has been processed.
    /// NOTE(review): presumably `wait_and_process_once` blocks until an
    /// attachment fires, in which case a `RunMode::Until` deadline is not
    /// noticed while no trigger fires — confirm against the iceoryx2 docs.
    ///
    /// # Errors
    ///
    /// * Any failure creating the `WaitSet`, attaching to it or waiting on
    ///   it, converted via `ExecutorError::iceoryx2`.
    /// * The first error recorded in `self.iter_err` during a wakeup.
    #[allow(
        unsafe_code,
        clippy::too_many_lines,
        clippy::ref_as_ptr,
        clippy::borrow_as_ptr
    )]
    fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
        let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
            .create()
            .map_err(ExecutorError::iceoryx2)?;

        // Keep Arc<RawListener> alive for at least as long as the WaitSet
        // guards — the guard borrows the listener via 'attachment lifetime.
        let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
        // Guards must outlive the run loop. Declared after `listener_storage`
        // so that they are dropped before it (locals drop in reverse
        // declaration order).
        let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
        // Maps guard index → task index.
        let mut attachment_to_task: Vec<usize> = Vec::new();

        for (task_idx, task) in self.tasks.iter().enumerate() {
            for decl in &task.decls {
                match decl {
                    TriggerDecl::Subscriber { listener } => {
                        // Clone Arc to extend listener lifetime to this scope.
                        let l = Arc::clone(listener);
                        listener_storage.push(l);
                        let l_ref = listener_storage.last().unwrap().as_ref();
                        // SAFETY: we cast the reference lifetime to match
                        // 'waitset / 'attachment; both listener_storage and
                        // waitset are stack-local and dropped together at the
                        // end of dispatch_loop.  Guards are dropped before
                        // listener_storage below.
                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
                        let guard = waitset
                            .attach_notification(l_ref)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                    TriggerDecl::Interval(d) => {
                        // Timer-only attachment: no listener to keep alive.
                        let guard = waitset
                            .attach_interval(*d)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                    TriggerDecl::Deadline { listener, deadline } => {
                        let l = Arc::clone(listener);
                        listener_storage.push(l);
                        let l_ref = listener_storage.last().unwrap().as_ref();
                        // SAFETY: same lifetime extension as the Subscriber
                        // arm — the Arc just pushed into `listener_storage`
                        // keeps the listener alive, and `guards` is dropped
                        // before `listener_storage`.
                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
                        let guard = waitset
                            .attach_deadline(l_ref, *deadline)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                    TriggerDecl::RawListener(listener) => {
                        let l = Arc::clone(listener);
                        listener_storage.push(l);
                        let l_ref = listener_storage.last().unwrap().as_ref();
                        // SAFETY: same lifetime extension as the Subscriber
                        // arm — the Arc just pushed into `listener_storage`
                        // keeps the listener alive, and `guards` is dropped
                        // before `listener_storage`.
                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
                        let guard = waitset
                            .attach_notification(l_ref)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                }
            }
        }

        // Attach the internal stop listener so the WaitSet wakes when
        // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
        // struct which is valid for the lifetime of dispatch_loop. We use the
        // same raw-pointer-cast pattern as user listeners above.
        //
        // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
        // exclusively borrowed for the duration of `run_inner` (which calls
        // `dispatch_loop`). The listener is not freed while the guard is alive
        // because the Arc keeps it alive and `self` outlives this function.
        let stop_listener_ref: &IxListener<ipc::Service> =
            unsafe { &*(self.stop_listener.as_ref() as *const _) };
        let _stop_guard = waitset
            .attach_notification(stop_listener_ref)
            .map_err(ExecutorError::iceoryx2)?;

        // Number of wakeups that reached the mode check below; only consulted
        // for `RunMode::Iterations`.
        let iterations_done = AtomicUsize::new(0);
        let stop_flag = self.stoppable.clone();

        loop {
            // Reset the pre-allocated per-iteration error slot (REQ_0060):
            // the slot is owned by `self.iter_err`, allocated once at build
            // time. Pool worker closures obtain a refcount-only clone of
            // the `Arc`; the slot itself is reused across iterations.
            *self.iter_err.lock().unwrap() = None;

            // SAFETY: we capture &mut self.tasks via a raw pointer because
            // wait_and_process expects FnMut and Rust can't see the closure
            // outlives `self`. The discipline that makes this sound:
            //   1. The closure body on the executor thread is the *only* code that
            //      reads `tasks_ptr`. The pool jobs it submits hold borrowed
            //      `*mut dyn ExecutableItem` slices into individual TaskEntries,
            //      not into the Vec itself, so they don't race with the Vec.
            //   2. `pool.barrier()` at the end of this callback ensures every
            //      submitted pool job has completed (and dropped its raw pointer)
            //      before the callback returns. The next iteration of the WaitSet
            //      loop is therefore the sole user of `tasks_ptr` again.
            //   3. The Vec is never resized inside this loop (no `push` / `remove`
            //      after dispatch starts), so the underlying buffer addresses are
            //      stable for the lifetime of `dispatch_loop`.
            let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
            let pool = &self.pool;
            // Refcount-only clone of the pre-allocated error slot. Pool jobs
            // need a `'static` handle, and an `Arc::clone` does not allocate.
            // The Single/Chain paths use the closure baked into `task.job`,
            // which already captured stable Arc clones at `add`-time; the
            // Graph path uses closures pre-built by `prepare_dispatch`. Only
            // the error-aggregation logic on the WaitSet thread still needs
            // the slot here.
            let iter_err_inner = Arc::clone(&self.iter_err);
            // Raw pointer to the stop listener for draining inside the callback.
            // SAFETY: same as stop_listener_ref above — the Arc is alive for
            // the lifetime of dispatch_loop.
            let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
            // Raw pointer to the executor-wide fault state. Same safety
            // discipline as `tasks_ptr`: `Executor` is alive for the
            // duration of `dispatch_loop`; the WaitSet callback is the
            // only reader. REQ_0071. `self.exec_fault` is
            // `Arc<ExecutorFaultAtomic>` — we deref once to obtain a
            // pointer to the inner `ExecutorFaultAtomic`.
            let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
            // Raw pointer to the executor start time. Used by the lazy
            // cascade below to compute `since_ms` on task transitions
            // triggered by an executor-wide fault.
            let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;

            let cb_result = waitset.wait_and_process_once(
                |attachment_id: WaitSetAttachmentId<ipc::Service>| {
                    // Drain stop notifications first (no dispatch — the stop_flag
                    // check after the callback returns handles termination).
                    // SAFETY: stop_listener_ptr is valid for the duration of the
                    // closure; the Arc in self.stop_listener keeps it alive.
                    let stop_l = unsafe { &*stop_listener_ptr };
                    while let Ok(Some(_)) = stop_l.try_wait_one() {}

                    // Find the guard(s) this attachment id belongs to; a
                    // missed deadline counts as "fired" as well.
                    for (i, guard) in guards.iter().enumerate() {
                        let fired = attachment_id.has_event_from(guard)
                            || attachment_id.has_missed_deadline(guard);
                        if !fired {
                            continue;
                        }
                        let task_idx = attachment_to_task[i];

                        // SAFETY: we are the only thread that may touch
                        // `self` during the callback. wait_and_process_once
                        // is single-threaded; we hold &mut self in
                        // dispatch_loop. The pointer is valid for the
                        // duration of this closure.
                        let task = unsafe { &mut (&mut *tasks_ptr)[task_idx] };

                        // Pre-dispatch fault check (REQ_0070, REQ_0071, REQ_0072).
                        // Only applies to Single/Chain — Graph tasks use their
                        // own per-vertex scheduling and are out of scope for
                        // FEAT_0018.
                        if matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
                            // SAFETY: exec_fault_ptr derefs into the Executor that
                            // owns this dispatch_loop — alive for the callback's
                            // lifetime.
                            let exec_faulted = matches!(
                                unsafe { &*exec_fault_ptr }.load(0, 0),
                                ExecutorFaultState::Faulted { .. }
                            );
                            // A task without a budget is loaded with a budget of 0 ms.
                            let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
                            let task_state = task.fault.load(task_budget_ms);

                            // Lazy cascade: if executor is `Faulted` and task
                            // is still `Running`, silently transition the task
                            // to `Faulted{ExecutorFaulted}`. No `on_task_fault`
                            // — Observer already heard about the executor-wide
                            // fault via `on_executor_fault` (cascade-noise
                            // invariant from FEAT_0018 §4.6).
                            let task_faulted =
                                if exec_faulted && matches!(task_state, FaultState::Running) {
                                    // SAFETY: exec_start_ptr derefs into the same
                                    // `Executor` owning this dispatch_loop. The
                                    // `OnceLock` is wait-free.
                                    let exec_start = *unsafe { &*exec_start_ptr }
                                        .get_or_init(std::time::Instant::now);
                                    let since_ms =
                                        instant_to_since_ms(std::time::Instant::now(), exec_start);
                                    let _ = task.fault.swap(
                                        FaultState::Faulted {
                                            reason: FaultReason::ExecutorFaulted,
                                            since_ms,
                                        },
                                        task_budget_ms,
                                    );
                                    true
                                } else {
                                    matches!(task_state, FaultState::Faulted { .. })
                                };
                            let route_to_handler = exec_faulted || task_faulted;

                            if route_to_handler {
                                // If a handler is registered, dispatch it.
                                // Otherwise, skip dispatch entirely this wakeup.
                                if let Some(handler_box) = task.handler_job.as_deref_mut() {
                                    let job_ptr: *mut (dyn FnMut() + Send) =
                                        handler_box as *mut (dyn FnMut() + Send);
                                    // SAFETY: same as the main-job dispatch
                                    // below — handler_job is owned by the
                                    // TaskEntry; pool.barrier() awaits its
                                    // completion before the next callback.
                                    #[allow(unsafe_code)]
                                    unsafe {
                                        pool.submit_borrowed(crate::pool::BorrowedJob::new(
                                            job_ptr,
                                        ));
                                    }
                                }
                                // No handler and not Running — skip silently.
                                // Either way the regular job is not dispatched.
                                continue;
                            }
                        }

                        match &mut task.kind {
                            TaskKind::Single(_) | TaskKind::Chain(_) => {
                                // The dispatch closure was pre-allocated at
                                // task-add time and stashed on `task.job`.
                                // Submit it via `submit_borrowed` — no
                                // per-iteration Box allocation. Required by
                                // REQ_0060.
                                let job_box = task
                                    .job
                                    .as_deref_mut()
                                    .expect("Single/Chain tasks carry a pre-built job");
                                let job_ptr: *mut (dyn FnMut() + Send) =
                                    job_box as *mut (dyn FnMut() + Send);
                                // SAFETY: the closure lives in
                                // `task.job` which is owned by
                                // `self.tasks[task_idx]`; `tasks_ptr` is
                                // sound for the duration of this
                                // callback. `pool.barrier()` below
                                // finishes the closure invocation before
                                // we re-enter the next iteration's
                                // callback. The WaitSet thread does not
                                // touch the closure between this submit
                                // and that barrier.
                                #[allow(unsafe_code)]
                                unsafe {
                                    pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
                                }
                            }
                            TaskKind::Graph(graph) => {
                                // Outer driver runs on the WaitSet thread; vertices run on the
                                // pool. The graph holds its own pre-built per-vertex closures
                                // and SPSC ready ring (REQ_0060), so dispatch is allocation-free
                                // in steady state.
                                let outcome = graph.run_once_borrowed(pool);
                                // Record the graph's error only if the slot is
                                // still empty: first error wins.
                                if let Some(source) = outcome.error {
                                    let mut g = iter_err_inner.lock().unwrap();
                                    if g.is_none() {
                                        *g = Some(ExecutorError::Item {
                                            task_id: task.id.clone(),
                                            source,
                                        });
                                    }
                                }
                                let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
                            }
                        }
                    }

                    // Wait for all submitted jobs to finish before leaving
                    // the callback scope (validates item_ptr safety contract).
                    pool.barrier();
                    CallbackProgression::Continue
                },
            );

            let cb_result = cb_result.map_err(ExecutorError::iceoryx2)?;

            // iceoryx2's WaitSet catches SIGINT/SIGTERM internally; honor that
            // here for a clean exit. This check runs before the error slot is
            // inspected, so an item error recorded during the same wakeup is
            // not returned on this path.
            if matches!(
                cb_result,
                WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
            ) {
                return Ok(());
            }

            // Extract the error before dropping the MutexGuard — avoids
            // holding the lock across the return (clippy::significant_drop_in_scrutinee).
            let maybe_err = self.iter_err.lock().unwrap().take();
            if let Some(err) = maybe_err {
                return Err(err);
            }
            if stop_flag.is_stopped() {
                return Ok(());
            }

            // Mode-specific exit conditions; evaluated only here, i.e. after
            // a wakeup has been fully processed.
            iterations_done.fetch_add(1, Ordering::SeqCst);
            match mode {
                RunMode::Forever => {}
                RunMode::Iterations(n) => {
                    if iterations_done.load(Ordering::SeqCst) >= *n {
                        return Ok(());
                    }
                }
                RunMode::Until(deadline) => {
                    if Instant::now() >= *deadline {
                        return Ok(());
                    }
                }
                RunMode::Predicate(p) => {
                    if (p)() {
                        return Ok(());
                    }
                }
            }
        }
    }
1188}
1189
1190/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
1191/// `Pool::submit`. The send is safe because:
1192///   1. The executor guarantees at most one invocation of a given item at a
1193///      time (via `pool.barrier()` before the pointer is reused).
1194///   2. `ExecutableItem: Send`, so moving the pointee across threads is sound
1195///      when no aliasing exists.
1196#[allow(unsafe_code)]
1197struct SendItemPtr {
1198    ptr: *mut dyn ExecutableItem,
1199}
1200
1201impl SendItemPtr {
1202    fn new(ptr: *mut dyn ExecutableItem) -> Self {
1203        Self { ptr }
1204    }
1205
1206    /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
1207    /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
1208    /// dispatch closure to be reusable across iterations without allocation).
1209    fn get(&self) -> *mut dyn ExecutableItem {
1210        self.ptr
1211    }
1212}
1213
1214// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
1215// closure can borrow `&SendItemPtr` per invocation without making the
1216// closure itself `!Send`.
1217#[allow(unsafe_code)]
1218unsafe impl Send for SendItemPtr {}
1219#[allow(unsafe_code)]
1220unsafe impl Sync for SendItemPtr {}
1221
1222/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
1223/// closure can iterate the chain's items in place without first
1224/// collecting them into a freshly-allocated `Vec`. The send is safe
1225/// for the same reason as [`SendItemPtr`] (see above): the executor
1226/// holds `&mut self` for the duration of `dispatch_loop`, and the
1227/// `pool.barrier()` at the end of each callback ensures the closure
1228/// has finished using this pointer before the Vec could be touched
1229/// from the `WaitSet` thread again. The Vec is never resized after
1230/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
1231/// allocate per iteration.
1232#[allow(unsafe_code)]
1233struct SendChainPtr {
1234    ptr: *mut Vec<Box<dyn ExecutableItem>>,
1235}
1236
1237impl SendChainPtr {
1238    fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
1239        Self { ptr }
1240    }
1241
1242    fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
1243        self.ptr
1244    }
1245}
1246
1247// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
1248// borrow `&SendChainPtr` per invocation while staying `Send`.
1249#[allow(unsafe_code)]
1250unsafe impl Send for SendChainPtr {}
1251#[allow(unsafe_code)]
1252unsafe impl Sync for SendChainPtr {}
1253
1254/// Captured state needed by a dispatch closure to perform post-execute
1255/// fault detection. All fields are `Arc`-shared with the owning
1256/// `Executor` and `TaskEntry` so the closure can read/write them
1257/// wait-free from any pool worker thread. `REQ_0070`, `REQ_0071`,
1258/// `REQ_0102`.
1259struct FaultDispatchCtx {
1260    /// Per-task budget. `None` for chain / graph tasks (no per-task
1261    /// check) — the executor-wide iteration budget still applies.
1262    task_budget: Option<Duration>,
1263    /// Per-task fault state (shared with `TaskEntry::fault`).
1264    task_fault: Arc<FaultAtomic>,
1265    /// Per-task monotonic overrun counter (shared with
1266    /// `TaskEntry::overrun_count`). Increments on EVERY budget breach.
1267    overrun_count: Arc<AtomicU64>,
1268    /// Executor-wide iteration budget. `None` means no executor-wide
1269    /// check.
1270    iteration_budget: Option<Duration>,
1271    /// Executor-wide fault state (shared with `Executor::exec_fault`).
1272    exec_fault: Arc<ExecutorFaultAtomic>,
1273    /// Executor-wide offending-task index storage (shared with
1274    /// `Executor::exec_fault_task_idx`).
1275    exec_fault_task_idx: Arc<AtomicU32>,
1276    /// Executor-wide breached-budget storage (shared with
1277    /// `Executor::exec_fault_budget_ms`).
1278    exec_fault_budget_ms: Arc<AtomicU32>,
1279    /// Index of this task in the executor's task table.
1280    task_idx_u32: u32,
1281    /// Executor start time (shared with `Executor::start_time`).
1282    exec_start: Arc<OnceLock<Instant>>,
1283    /// Observer for `on_task_fault` / `on_executor_fault` notifications.
1284    observer: Arc<dyn Observer>,
1285}
1286
1287/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
1288///
1289/// The returned closure is stored on `TaskEntry::job` and invoked once
1290/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
1291/// performs no allocation. The closure captures Arc clones of the
1292/// executor's shared state — those clones are refcount-only at build
1293/// time and are reused on every dispatch. Required for `REQ_0060`.
1294#[allow(clippy::too_many_arguments)]
1295fn build_single_job(
1296    id: TaskId,
1297    stop: Stoppable,
1298    obs: Arc<dyn Observer>,
1299    mon: Arc<dyn ExecutionMonitor>,
1300    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
1301    app_id: Option<u32>,
1302    app_inst: Option<u32>,
1303    item_ptr: SendItemPtr,
1304    fault_ctx: FaultDispatchCtx,
1305) -> Box<dyn FnMut() + Send + 'static> {
1306    Box::new(move || {
1307        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
1308        if let Some(aid) = app_id {
1309            obs.on_app_start(id.clone(), aid, app_inst);
1310        }
1311        let raw = item_ptr.get();
1312        let started = std::time::Instant::now();
1313        mon.pre_execute(id.clone(), started);
1314        // SAFETY: barrier() pairs with this invocation; the WaitSet
1315        // thread does not touch the item between `submit_borrowed` and
1316        // the matching `barrier()`. See SendItemPtr safety doc.
1317        #[allow(unsafe_code)]
1318        let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
1319        let took = started.elapsed();
1320        mon.post_execute(id.clone(), started, took, res.is_ok());
1321        if let Err(ref e) = res {
1322            obs.on_app_error(id.clone(), e.as_ref());
1323        }
1324        if app_id.is_some() {
1325            obs.on_app_stop(id.clone());
1326        }
1327        post_execute_detect_fault(&id, started, took, &fault_ctx);
1328        record_first_err(&err_slot, &id, res);
1329    })
1330}
1331
1332/// Build the per-iteration dispatch closure for a fault-handler item.
1333///
1334/// Mirrors [`build_single_job`] in every detail (same monitor /
1335/// observer / first-error capture wiring) but owns the
1336/// `Box<dyn ExecutableItem>` directly inside the closure instead of
1337/// dereferencing a raw [`SendItemPtr`]. The handler has no parallel
1338/// owner inside [`TaskEntry`] — the handler closure stored in
1339/// `handler_job` is the sole owner — so the simpler owning form is
1340/// both sound and avoids the aliasing dance the main item needs.
1341/// `REQ_0072`.
1342#[allow(clippy::too_many_arguments)]
1343fn build_handler_job(
1344    id: TaskId,
1345    stop: Stoppable,
1346    obs: Arc<dyn Observer>,
1347    mon: Arc<dyn ExecutionMonitor>,
1348    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
1349    app_id: Option<u32>,
1350    app_inst: Option<u32>,
1351    mut handler: Box<dyn ExecutableItem>,
1352    fault_ctx: FaultDispatchCtx,
1353) -> Box<dyn FnMut() + Send + 'static> {
1354    Box::new(move || {
1355        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
1356        if let Some(aid) = app_id {
1357            obs.on_app_start(id.clone(), aid, app_inst);
1358        }
1359        let started = std::time::Instant::now();
1360        mon.pre_execute(id.clone(), started);
1361        let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
1362        let took = started.elapsed();
1363        mon.post_execute(id.clone(), started, took, res.is_ok());
1364        if let Err(ref e) = res {
1365            obs.on_app_error(id.clone(), e.as_ref());
1366        }
1367        if app_id.is_some() {
1368            obs.on_app_stop(id.clone());
1369        }
1370        // Per §4.6 invariant 5 of FEAT_0018: a handler that ALSO breaches
1371        // budget keeps the task in `Faulted` (state already `Faulted`),
1372        // `overrun_count` increments, NO new `on_task_fault` fires —
1373        // the `matches!(prev, FaultState::Running)` gate inside
1374        // `post_execute_detect_fault` enforces that.
1375        post_execute_detect_fault(&id, started, took, &fault_ctx);
1376        record_first_err(&err_slot, &id, res);
1377    })
1378}
1379
1380/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
1381#[allow(clippy::too_many_arguments)]
1382fn build_chain_job(
1383    id: TaskId,
1384    stop: Stoppable,
1385    obs: Arc<dyn Observer>,
1386    mon: Arc<dyn ExecutionMonitor>,
1387    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
1388    chain_ptr: SendChainPtr,
1389    fault_ctx: FaultDispatchCtx,
1390) -> Box<dyn FnMut() + Send + 'static> {
1391    Box::new(move || {
1392        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
1393        // SAFETY: barrier() pairs with this invocation; the chain Vec
1394        // and the items it owns are not touched by the WaitSet thread
1395        // until barrier() returns. See SendChainPtr safety doc.
1396        #[allow(unsafe_code)]
1397        let chain_items = unsafe { &mut *chain_ptr.get() };
1398        for item_box in chain_items.iter_mut() {
1399            let app_id = item_box.app_id();
1400            let app_inst = item_box.app_instance_id();
1401            if let Some(aid) = app_id {
1402                obs.on_app_start(id.clone(), aid, app_inst);
1403            }
1404            let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
1405            let started = std::time::Instant::now();
1406            mon.pre_execute(id.clone(), started);
1407            #[allow(unsafe_code)]
1408            let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
1409            let took = started.elapsed();
1410            mon.post_execute(id.clone(), started, took, res.is_ok());
1411            if let Err(ref e) = res {
1412                obs.on_app_error(id.clone(), e.as_ref());
1413            }
1414            if app_id.is_some() {
1415                obs.on_app_stop(id.clone());
1416            }
1417            // Per-item post-execute fault detection. `task_budget` is
1418            // `None` for chains (see `add_chain_with_id_boxed`), so the
1419            // per-task check no-ops; the executor-wide iteration-budget
1420            // check still fires per item. `REQ_0071`.
1421            post_execute_detect_fault(&id, started, took, &fault_ctx);
1422            match res {
1423                Ok(crate::ControlFlow::Continue) => {}
1424                Ok(crate::ControlFlow::StopChain) => break,
1425                Err(_) => {
1426                    record_first_err(&err_slot, &id, res);
1427                    break;
1428                }
1429            }
1430        }
1431    })
1432}
1433
/// Error value that stands in for a task whose execution panicked.
///
/// The wrapped string is the message rendered after the
/// `"task panicked: "` prefix by the `Display` impl.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    /// Renders as `task panicked: <message>`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        write!(f, "task panicked: {message}")
    }
}

// No underlying source error: the default trait methods are sufficient.
impl std::error::Error for PanickedTask {}
1444
1445/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
1446#[allow(clippy::option_if_let_else)]
1447fn run_item_catch_unwind(
1448    item: &mut dyn ExecutableItem,
1449    ctx: &mut crate::context::Context<'_>,
1450) -> crate::ExecuteResult {
1451    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
1452        |payload| {
1453            let msg = if let Some(s) = payload.downcast_ref::<&str>() {
1454                (*s).to_string()
1455            } else if let Some(s) = payload.downcast_ref::<String>() {
1456                s.clone()
1457            } else {
1458                "panicked task".to_string()
1459            };
1460            Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
1461        },
1462    )
1463}
1464
1465/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
1466/// without depending on its private name.
1467pub(crate) fn run_item_catch_unwind_external(
1468    item: &mut dyn ExecutableItem,
1469    ctx: &mut crate::context::Context<'_>,
1470) -> crate::ExecuteResult {
1471    run_item_catch_unwind(item, ctx)
1472}
1473
1474/// Record the first error into `slot`. Subsequent errors are silently dropped.
1475fn record_first_err(
1476    slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
1477    id: &TaskId,
1478    res: crate::ExecuteResult,
1479) {
1480    if let Err(source) = res {
1481        let mut g = slot.lock().unwrap();
1482        if g.is_none() {
1483            *g = Some(ExecutorError::Item {
1484                task_id: id.clone(),
1485                source,
1486            });
1487        }
1488    }
1489}
1490
1491/// Post-execute fault detection — runs on a pool worker AFTER
1492/// `mon.post_execute` so the full `took` is available. Implements:
1493///
1494///   * `REQ_0070` / `REQ_0102` — per-task budget overrun: increments
1495///     `overrun_count` on every breach, transitions
1496///     `Running -> Faulted{BudgetExceeded}` exactly once (subsequent
1497///     breaches keep the state `Faulted` and do NOT re-fire the
1498///     observer).
1499///   * `REQ_0071` — executor-wide iteration overrun: transitions
1500///     `Running -> Faulted{IterationBudgetExceeded}` exactly once;
1501///     cascade to per-task state is LAZY (see the pre-dispatch block
1502///     in `dispatch_loop`), so the per-task `on_task_fault` does NOT
1503///     fire during cascade — only `on_executor_fault` does.
1504fn post_execute_detect_fault(
1505    id: &TaskId,
1506    started: Instant,
1507    took: Duration,
1508    fault_ctx: &FaultDispatchCtx,
1509) {
1510    // REQ_0070 / REQ_0102 — per-task budget overrun.
1511    if let Some(budget) = fault_ctx.task_budget {
1512        if took > budget {
1513            fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
1514            let took_ms = duration_to_ms_sat(took);
1515            let budget_ms = duration_to_ms_sat(budget);
1516            let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
1517            let since_ms = instant_to_since_ms(started, exec_start);
1518            let new_state = FaultState::Faulted {
1519                reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
1520                since_ms,
1521            };
1522            let prev = fault_ctx.task_fault.swap(new_state, budget_ms);
1523            if matches!(prev, FaultState::Running) {
1524                fault_ctx.observer.on_task_fault(
1525                    id.clone(),
1526                    FaultReason::BudgetExceeded { took_ms, budget_ms },
1527                );
1528            }
1529        }
1530    }
1531
1532    // REQ_0071 — executor-wide iteration overrun.
1533    if let Some(iter_budget) = fault_ctx.iteration_budget {
1534        if took > iter_budget {
1535            let took_ms = duration_to_ms_sat(took);
1536            let budget_ms = duration_to_ms_sat(iter_budget);
1537            let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
1538            let since_ms = instant_to_since_ms(started, exec_start);
1539            fault_ctx
1540                .exec_fault_task_idx
1541                .store(fault_ctx.task_idx_u32, Ordering::Release);
1542            fault_ctx
1543                .exec_fault_budget_ms
1544                .store(budget_ms, Ordering::Release);
1545            let new_state = ExecutorFaultState::Faulted {
1546                reason: ExecutorFaultReason::IterationBudgetExceeded {
1547                    task_idx: fault_ctx.task_idx_u32,
1548                    took_ms,
1549                    budget_ms,
1550                },
1551                since_ms,
1552            };
1553            let prev = fault_ctx
1554                .exec_fault
1555                .swap(new_state, fault_ctx.task_idx_u32, budget_ms);
1556            if matches!(prev, ExecutorFaultState::Running) {
1557                fault_ctx.observer.on_executor_fault(
1558                    ExecutorFaultReason::IterationBudgetExceeded {
1559                        task_idx: fault_ctx.task_idx_u32,
1560                        took_ms,
1561                        budget_ms,
1562                    },
1563                );
1564                // NO eager cascade here. Cascade is lazy: the
1565                // pre-dispatch block in `dispatch_loop` transitions
1566                // each `Running` task to `Faulted{ExecutorFaulted}` on
1567                // the next wakeup — silently, so per-task observers
1568                // do not fire (see §4.6 invariant on cascade-noise).
1569            }
1570        }
1571    }
1572}
1573
1574// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
1575
1576/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
1577/// into a registered task.
1578pub struct ExecutorGraphBuilder<'e> {
1579    executor: &'e mut Executor,
1580    builder: crate::graph::GraphBuilder,
1581    custom_id: Option<TaskId>,
1582}
1583
1584impl ExecutorGraphBuilder<'_> {
1585    /// Add a vertex to the graph; returns its handle.
1586    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
1587        self.builder.vertex(item)
1588    }
1589
1590    /// Add a directed edge from one vertex to another.
1591    pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
1592        self.builder.edge(from, to);
1593        self
1594    }
1595
1596    /// Designate the root vertex (its triggers gate the graph).
1597    pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
1598        self.builder.root(v);
1599        self
1600    }
1601
1602    /// Override the auto-generated id with a custom one.
1603    pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
1604        self.custom_id = Some(id.into());
1605        self
1606    }
1607
1608    /// Validate and register the graph. Returns the task id.
1609    ///
1610    /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
1611    /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
1612    /// precedence over the auto-generated id.
1613    pub fn build(self) -> Result<TaskId, ExecutorError> {
1614        let g = self.builder.finish()?;
1615        // Root vertex's task_id() override wins over the custom id, which wins
1616        // over the auto-generated fallback.
1617        let auto_id = || {
1618            TaskId::new(format!(
1619                "graph-{}",
1620                self.executor.next_id.fetch_add(1, Ordering::SeqCst)
1621            ))
1622        };
1623        let id = g
1624            .root_task_id()
1625            .map(TaskId::new)
1626            .or(self.custom_id)
1627            .unwrap_or_else(auto_id);
1628        let decls = g.decls.clone();
1629
1630        // Box the graph for address stability — per-vertex dispatch
1631        // closures capture `*const Graph` and must not see it move.
1632        let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
1633        // Pre-build the per-vertex closures now that we know the
1634        // task_id and have access to the executor's shared state.
1635        graph_box.prepare_dispatch(
1636            id.clone(),
1637            self.executor.stoppable.clone(),
1638            Arc::clone(&self.executor.observer),
1639            Arc::clone(&self.executor.monitor),
1640            Arc::clone(&self.executor.iter_err),
1641        );
1642
1643        self.executor.tasks.push(TaskEntry {
1644            id: id.clone(),
1645            kind: TaskKind::Graph(graph_box),
1646            decls,
1647            // Graph tasks dispatch their vertices via `vertex_jobs`
1648            // stored inside the `Graph`; the per-task `job` slot
1649            // is unused for graphs.
1650            job: None,
1651            // TODO(post-Task-10): graph budgets carried separately; for now None.
1652            budget: None,
1653            fault: Arc::new(FaultAtomic::new()),
1654            overrun_count: Arc::new(AtomicU64::new(0)),
1655            handler_job: None,
1656        });
1657        Ok(id)
1658    }
1659}
1660
1661// ── Unit tests ────────────────────────────────────────────────────────────────
1662
#[cfg(test)]
mod tests {
    use super::*;
    use crate::item::item_with_triggers;
    use crate::{ControlFlow, item};
    use core::time::Duration;

    /// Executor configured with zero worker threads, as used by every test
    /// in this module.
    fn zero_worker_executor() -> Executor {
        Executor::builder().worker_threads(0).build().unwrap()
    }

    /// Look up the registered entry for `id`, panicking if it is missing.
    fn entry_of<'a>(exec: &'a Executor, id: &TaskId) -> &'a TaskEntry {
        exec.tasks
            .iter()
            .find(|t| t.id == *id)
            .expect("task present")
    }

    // Two plain `add` calls must hand out distinct ids.
    #[test]
    fn add_returns_unique_ids() {
        let mut exec = zero_worker_executor();
        let first = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        let second = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        assert_ne!(first, second);
    }

    // An id passed to `add_with_id` comes back unchanged.
    #[test]
    fn custom_id_is_preserved() {
        let mut exec = zero_worker_executor();
        let registered = exec
            .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
            .unwrap();
        assert_eq!(registered.as_str(), "my-task");
    }

    // A budget declared at trigger-declaration time lands on the entry.
    #[test]
    fn add_persists_declared_budget() {
        let mut exec = zero_worker_executor();
        let budgeted = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(budgeted).unwrap();
        let entry = entry_of(&exec, &task_id);
        assert_eq!(entry.budget, Some(Duration::from_millis(5)));
    }

    // Registering with a fault handler fills `handler_job` and keeps `job`.
    #[test]
    fn add_with_fault_handler_stores_handler_job() {
        let mut exec = zero_worker_executor();
        let main_item = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let handler_item = item_with_triggers(|_d| Ok(()), |_| Ok(ControlFlow::Continue));
        let task_id = exec
            .add_with_fault_handler(main_item, handler_item)
            .unwrap();
        let entry = entry_of(&exec, &task_id);
        assert!(
            entry.handler_job.is_some(),
            "handler_job should be Some after add_with_fault_handler"
        );
        // Main job should still be present.
        assert!(entry.job.is_some(), "main job should still be present");
    }

    // `declare_triggers` must already have run by the time `add` returns.
    #[test]
    fn declare_triggers_called_at_add_time() {
        let called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&called);

        let probe = item_with_triggers(
            move |_d| {
                flag.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );

        let mut exec = zero_worker_executor();
        exec.add(probe).unwrap();
        assert!(called.load(Ordering::SeqCst));
    }

    // A freshly added task is `Running`, so clearing its fault is an error.
    #[test]
    fn clear_task_fault_errors_on_running_task() {
        let mut exec = zero_worker_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        let err = exec.clear_task_fault(task_id).expect_err("not faulted");
        assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
    }

    // Same for the executor-wide fault state.
    #[test]
    fn clear_executor_fault_errors_on_running_executor() {
        let exec = zero_worker_executor();
        let err = exec.clear_executor_fault().expect_err("not faulted");
        assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
    }

    // No overruns have been counted for a task that never ran.
    #[test]
    fn overrun_count_returns_zero_for_new_task() {
        let mut exec = zero_worker_executor();
        let budgeted = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(budgeted).unwrap();
        assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
    }

    // Querying an id that was never registered reports `TaskNotFound`.
    #[test]
    fn overrun_count_errors_for_unknown_task() {
        let exec = zero_worker_executor();
        let err = exec
            .overrun_count(crate::TaskId::new("nope"))
            .expect_err("unknown task");
        assert!(matches!(err, ExecutorError::TaskNotFound(_)));
    }

    // Per-task fault state begins as `Running`.
    #[test]
    fn task_fault_state_starts_running() {
        let mut exec = zero_worker_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
    }

    // Executor-wide fault state begins as `Running`.
    #[test]
    fn executor_fault_state_starts_running() {
        let exec = zero_worker_executor();
        assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
    }
}