Skip to main content

taktora_executor/
executor.rs

1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::context::Stoppable;
10use crate::error::ExecutorError;
11use crate::item::ExecutableItem;
12use crate::monitor::{ExecutionMonitor, NoopMonitor};
13use crate::observer::{NoopObserver, Observer};
14use crate::payload::Payload;
15use crate::pool::Pool;
16use crate::task_id::TaskId;
17use crate::task_kind::TaskKind;
18use crate::thread_attrs::ThreadAttributes;
19use crate::trigger::{TriggerDecl, TriggerDeclarer};
20use iceoryx2::node::Node;
21use iceoryx2::port::listener::Listener as IxListener;
22use iceoryx2::prelude::ipc;
23use iceoryx2::prelude::*;
24use iceoryx2::waitset::WaitSetRunResult;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
27use std::time::{Duration, Instant};
28
/// Monotonically increasing, process-wide counter. `ExecutorBuilder::build`
/// takes one value per executor and embeds it in the stop-event service name,
/// so multiple executors in the same process each get a unique name.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
32
/// One registered task entry.
///
/// Entries are appended to `Executor::tasks` by the `add*` methods and are
/// addressed by their index in that vector from the dispatch loop.
pub(crate) struct TaskEntry {
    /// Task identifier.
    pub(crate) id: TaskId,
    /// The kind of work this entry holds (single item, chain, or graph).
    pub(crate) kind: TaskKind,
    /// Trigger declarations recorded at `add` time.
    pub(crate) decls: Vec<TriggerDecl>,
    /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
    /// time and re-invoked on every dispatch iteration via
    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
    /// that `Pool::submit<F>` requires in threaded mode. Required for
    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
    /// `TaskKind::Graph`, which dispatches its vertices via a separate
    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
}
50
/// Top-level executor. One per process is the typical case.
///
/// Construct one through [`Executor::builder`], register work with the
/// `add*` methods, then drive it with one of the `run*` methods.
pub struct Executor {
    /// iceoryx2 node created at `build()` time. Channels, services and the
    /// internal stop-event service are all opened on this node.
    pub(crate) node: Node<ipc::Service>,
    /// Worker pool that dispatch jobs are submitted to.
    pub(crate) pool: Arc<Pool>,
    /// Registered tasks, in registration order.
    pub(crate) tasks: Vec<TaskEntry>,
    /// Set while a `run*` call is in progress; used to reject a second,
    /// overlapping run with `ExecutorError::AlreadyRunning`.
    pub(crate) running: Arc<AtomicBool>,
    /// Stop handle wired to the internal stop-event notifier at build time.
    pub(crate) stoppable: Stoppable,
    /// Source of the numeric suffix in auto-generated `task-<n>` /
    /// `chain-<n>` ids.
    pub(crate) next_id: AtomicU64,
    /// Listener for the internal stop event service. Held here so it outlives
    /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
    /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
    pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
    /// Lifecycle observer. Defaults to a no-op.
    pub(crate) observer: Arc<dyn Observer>,
    /// Execution monitor. Defaults to a no-op.
    pub(crate) monitor: Arc<dyn ExecutionMonitor>,
    /// Per-iteration error capture slot — allocated once at build time and
    /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
    /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
    /// the per-iteration heap allocation that the previous design incurred.
    /// Required for `REQ_0060`.
    pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
}
74
// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
// `SingleThreaded` reason as `IxNotifier`. After construction, the only
// per-iteration call is `listener.try_wait_one()`, which does not mutate the
// Rc. `Executor` is never shared across threads (it requires `&mut self` for
// `run()`), so there is no aliased concurrent mutation.
//
// NOTE(review): `Send` also permits *moving* the executor to another thread.
// The argument above presumably relies on no other handle to the listener's
// internal `Rc` staying behind on the constructing thread — confirm against
// the iceoryx2 version in use.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
82
83impl Executor {
    /// Start a new builder.
    ///
    /// Equivalent to [`ExecutorBuilder::default`]; finish the configuration
    /// with [`ExecutorBuilder::build`] to obtain an [`Executor`].
    #[must_use]
    pub fn builder() -> ExecutorBuilder {
        ExecutorBuilder::default()
    }
89
    /// Open or create a pub/sub channel bound to this executor's node.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Channel::open_or_create` for `name`.
    pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
        Channel::open_or_create(&self.node, name)
    }
94
    /// Open or create a request/response service bound to this executor's node.
    ///
    /// `Req` is the request payload type and `Resp` the response payload type.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Service::open_or_create` for `name`.
    pub fn service<Req, Resp>(
        &mut self,
        name: &str,
    ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
    where
        Req: Payload,
        Resp: Payload,
    {
        crate::Service::open_or_create(&self.node, name)
    }
106
107    /// Add an item to the executor with an auto-generated id.
108    pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
109        let id = TaskId::new(format!(
110            "task-{}",
111            self.next_id.fetch_add(1, Ordering::SeqCst)
112        ));
113        self.add_with_id(id, item)
114    }
115
116    /// Add an item with a user-supplied id.
117    ///
118    /// The item's [`ExecutableItem::task_id`] override takes precedence over
119    /// the caller-supplied `id`, which itself takes precedence over the
120    /// auto-generated id assigned by [`Executor::add`].
121    pub fn add_with_id(
122        &mut self,
123        id: impl Into<TaskId>,
124        mut item: impl ExecutableItem,
125    ) -> Result<TaskId, ExecutorError> {
126        let id_arg: TaskId = id.into();
127        // The item's `task_id()` override wins over the user-supplied id.
128        let id = item.task_id().map_or(id_arg, TaskId::new);
129        let mut declarer = TriggerDeclarer::new_internal();
130        item.declare_triggers(&mut declarer)?;
131        let decls = declarer.into_decls();
132
133        let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
134        let app_id = item_box.app_id();
135        let app_inst = item_box.app_instance_id();
136        // SAFETY: the raw pointer points into the heap allocation of
137        // `item_box`. `Box` keeps that allocation at a stable address even
138        // when the `Box` itself is moved (e.g. when `self.tasks` grows),
139        // so the pointer remains valid for the lifetime of the
140        // `TaskEntry`. See SendItemPtr safety doc for the rest of the
141        // discipline (barrier() pairs with worker access).
142        #[allow(unsafe_code)]
143        let item_ptr =
144            SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
145
146        let job = build_single_job(
147            id.clone(),
148            self.stoppable.clone(),
149            Arc::clone(&self.observer),
150            Arc::clone(&self.monitor),
151            Arc::clone(&self.iter_err),
152            app_id,
153            app_inst,
154            item_ptr,
155        );
156
157        self.tasks.push(TaskEntry {
158            id: id.clone(),
159            kind: TaskKind::Single(item_box),
160            decls,
161            job: Some(job),
162        });
163        Ok(id)
164    }
165
166    /// Add a sequential chain of items. Only the head item's
167    /// `declare_triggers` is consulted; non-head triggers are ignored with a
168    /// tracing warn.
169    pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
170    where
171        I: ExecutableItem,
172        C: IntoIterator<Item = I>,
173    {
174        let id = TaskId::new(format!(
175            "chain-{}",
176            self.next_id.fetch_add(1, Ordering::SeqCst)
177        ));
178        let boxed: Vec<Box<dyn ExecutableItem>> = items
179            .into_iter()
180            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
181            .collect();
182        self.add_chain_with_id_boxed(id, boxed)
183    }
184
185    /// Like [`Executor::add_chain`] but with a user-supplied id.
186    pub fn add_chain_with_id<I, C>(
187        &mut self,
188        id: impl Into<TaskId>,
189        items: C,
190    ) -> Result<TaskId, ExecutorError>
191    where
192        I: ExecutableItem,
193        C: IntoIterator<Item = I>,
194    {
195        let boxed: Vec<Box<dyn ExecutableItem>> = items
196            .into_iter()
197            .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
198            .collect();
199        self.add_chain_with_id_boxed(id.into(), boxed)
200    }
201
202    fn add_chain_with_id_boxed(
203        &mut self,
204        id: TaskId,
205        mut items: Vec<Box<dyn ExecutableItem>>,
206    ) -> Result<TaskId, ExecutorError> {
207        if items.is_empty() {
208            return Err(ExecutorError::Builder(
209                "chain must contain at least one item".into(),
210            ));
211        }
212
213        // Head item's `task_id()` override wins over the user-supplied id.
214        let id = items[0].task_id().map_or(id, TaskId::new);
215
216        // Head's triggers gate the chain.
217        let mut head_declarer = TriggerDeclarer::new_internal();
218        items[0].declare_triggers(&mut head_declarer)?;
219        let decls = head_declarer.into_decls();
220
221        // Warn if non-head items declared triggers (those will be ignored).
222        for (i, body) in items.iter_mut().enumerate().skip(1) {
223            let mut spurious = TriggerDeclarer::new_internal();
224            let _ = body.declare_triggers(&mut spurious);
225            if !spurious.is_empty() {
226                #[cfg(feature = "tracing")]
227                tracing::warn!(
228                    target: "taktora-executor",
229                    task = %id,
230                    position = i,
231                    "non-head chain item declared triggers; they will be ignored"
232                );
233                #[cfg(not(feature = "tracing"))]
234                {
235                    let _ = i;
236                }
237            }
238        }
239
240        let mut items = items;
241        // SAFETY: pointer into the chain's `items` Vec. The Vec lives
242        // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
243        // is stable once `add_chain` returns — `self.tasks` may grow
244        // (moving the `Vec<Box<...>>` header itself), but the Vec's
245        // heap buffer is referenced via the header's data pointer and
246        // is unaffected by header moves. We never resize the chain Vec
247        // after this point. See SendChainPtr safety doc for the rest.
248        #[allow(unsafe_code)]
249        let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
250            &mut items,
251        ));
252        // NB: the pointer above is to the local `items` Vec on the
253        // stack — it's invalid after the `push` below moves items into
254        // the TaskEntry. We rederive a stable pointer after the push.
255        // (See the rebuild step below.)
256        let _ = chain_ptr;
257
258        self.tasks.push(TaskEntry {
259            id: id.clone(),
260            kind: TaskKind::Chain(items),
261            decls,
262            job: None, // populated in the rebuild step below
263        });
264
265        // After the push, the TaskEntry lives at a stable position in
266        // `self.tasks` for the duration of this `add_chain_with_id_boxed`
267        // call. Take a stable pointer to its chain Vec and build the
268        // dispatch closure. If `self.tasks` later grows, the Vec header
269        // inside the TaskEntry moves but the header's data pointer
270        // (which addresses the chain's heap buffer) does not — and the
271        // closure derefs that pointer per dispatch, so it re-reads the
272        // current heap address each time. Sound under the same
273        // discipline as `tasks_ptr` in dispatch_loop.
274        let task_idx = self.tasks.len() - 1;
275        let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
276        {
277            TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
278            // The push above used TaskKind::Chain, so this arm is
279            // unreachable. Mark it explicitly to satisfy `match`.
280            _ => unreachable!("just-pushed task is TaskKind::Chain"),
281        };
282        #[allow(unsafe_code)]
283        let chain_ptr = SendChainPtr::new(chain_vec_ptr);
284        let job = build_chain_job(
285            id.clone(),
286            self.stoppable.clone(),
287            Arc::clone(&self.observer),
288            Arc::clone(&self.monitor),
289            Arc::clone(&self.iter_err),
290            chain_ptr,
291        );
292        self.tasks[task_idx].job = Some(job);
293        Ok(id)
294    }
295
    /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
    /// executor is built. Clone before calling `run()` — any clone taken at any
    /// time will wake the `WaitSet` when `stop()` is called.
    ///
    /// The returned value is a clone of the executor's own handle.
    #[must_use]
    pub fn stoppable(&self) -> Stoppable {
        self.stoppable.clone()
    }
303
    /// Borrow the underlying iceoryx2 node (escape hatch for power users).
    ///
    /// This is the node created by [`ExecutorBuilder::build`] and used by
    /// [`Executor::channel`] and [`Executor::service`].
    pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
        &self.node
    }
308
    /// Begin building a graph. Call `.build()` on the returned builder to
    /// register the graph as a task.
    ///
    /// The builder mutably borrows the executor for as long as it lives and
    /// starts with an empty graph and no custom id.
    pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
        ExecutorGraphBuilder {
            executor: self,
            builder: crate::graph::GraphBuilder::new(),
            custom_id: None,
        }
    }
318}
319
/// Builder for [`Executor`].
///
/// Obtain one via [`Executor::builder`], adjust it with the chained setters,
/// and call [`ExecutorBuilder::build`].
pub struct ExecutorBuilder {
    /// Requested worker-thread count. `None` means "use the number of
    /// physical cores", resolved at `build()` time.
    worker_threads: Option<usize>,
    /// Lifecycle observer; `None` selects the no-op observer at `build()`.
    observer: Option<Arc<dyn Observer>>,
    /// Execution monitor; `None` selects the no-op monitor at `build()`.
    monitor: Option<Arc<dyn ExecutionMonitor>>,
    /// Thread attributes passed to the worker pool.
    worker_attrs: ThreadAttributes,
}
327
impl Default for ExecutorBuilder {
    /// A builder with nothing customised: worker count, observer and monitor
    /// are left unset, and the worker attributes are `ThreadAttributes::new()`.
    fn default() -> Self {
        Self {
            worker_threads: None,
            observer: None,
            monitor: None,
            worker_attrs: ThreadAttributes::new(),
        }
    }
}
338
339impl ExecutorBuilder {
    /// Number of worker threads. `0` → inline (no pool). Default → physical
    /// cores.
    ///
    /// The default is only applied when this setter is never called.
    #[must_use]
    pub const fn worker_threads(mut self, n: usize) -> Self {
        self.worker_threads = Some(n);
        self
    }
347
348    /// Attach a lifecycle observer. If not called, a no-op observer is used.
349    #[must_use]
350    pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
351        self.observer = Some(obs);
352        self
353    }
354
355    /// Attach an execution monitor. If not called, a no-op monitor is used.
356    #[must_use]
357    pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
358        self.monitor = Some(mon);
359        self
360    }
361
362    /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
363    /// for worker threads. Has no effect when `worker_threads` is `0` (inline
364    /// mode). Requires the `thread_attrs` feature for non-default settings.
365    #[must_use]
366    #[allow(clippy::missing_const_for_fn)]
367    pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
368        self.worker_attrs = attrs;
369        self
370    }
371
372    /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
373    /// internal stop-event service so that any `Stoppable` clone (taken before
374    /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
375    ///
376    /// # Panics
377    ///
378    /// Panics if the internally-generated stop-event service name exceeds the
379    /// iceoryx2 service name length limit (this cannot happen under normal use
380    /// because the name is derived from the process id and a monotonic counter).
381    #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
382    #[track_caller]
383    pub fn build(self) -> Result<Executor, ExecutorError> {
384        let node = NodeBuilder::new()
385            .create::<ipc::Service>()
386            .map_err(ExecutorError::iceoryx2)?;
387
388        let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
389        let pool = Arc::new(Pool::new(n_workers, self.worker_attrs)?);
390
391        // Build the internal stop event service with a unique-per-process name
392        // so multiple executors in the same process don't collide.
393        let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
394        let stop_topic = format!(
395            "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
396            std::process::id()
397        );
398        let stop_event = node
399            .service_builder(&stop_topic.as_str().try_into().unwrap())
400            .event()
401            .open_or_create()
402            .map_err(ExecutorError::iceoryx2)?;
403
404        let stop_notifier = Arc::new(
405            stop_event
406                .notifier_builder()
407                .create()
408                .map_err(ExecutorError::iceoryx2)?,
409        );
410
411        // SAFETY: see module-level note; Arc<IxListener> is held here and only
412        // accessed on the executor thread.
413        let stop_listener = Arc::new(
414            stop_event
415                .listener_builder()
416                .create()
417                .map_err(ExecutorError::iceoryx2)?,
418        );
419
420        // Wire the notifier into the Stoppable so every clone is waker-aware
421        // from the moment the executor is built.
422        let stoppable = Stoppable::with_waker(stop_notifier);
423
424        let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
425
426        let monitor: Arc<dyn ExecutionMonitor> =
427            self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
428
429        let exec = Executor {
430            node,
431            pool,
432            tasks: Vec::new(),
433            running: Arc::new(AtomicBool::new(false)),
434            stoppable,
435            next_id: AtomicU64::new(0),
436            stop_listener,
437            observer,
438            monitor,
439            iter_err: Arc::new(std::sync::Mutex::new(None)),
440        };
441
442        Ok(exec)
443    }
444}
445
446// ── Run loop ──────────────────────────────────────────────────────────────────
447
448impl Executor {
    /// Run the executor until [`Stoppable::stop`] is called or a task signals
    /// stop via [`crate::Context::stop_executor`].
    ///
    /// A stop request is permanent: once the executor's `Stoppable` has been
    /// stopped, a later call to `run()` returns promptly without doing any
    /// meaningful work.
    ///
    /// # Errors
    ///
    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
    ///
    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
    ///
    /// If multiple items error in the same dispatch iteration, only the first
    /// is preserved; subsequent errors are discarded silently. To observe
    /// every error, attach an [`Observer`](crate::Observer) and read errors
    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
    pub fn run(&mut self) -> Result<(), ExecutorError> {
        // `Forever`: no deadline, iteration cap or predicate is handed to the loop.
        self.run_inner(RunMode::Forever)
    }
467
468    /// Run for at most `max` wall-clock duration, then return.
469    ///
470    /// # Errors
471    ///
472    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
473    ///
474    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
475    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
476    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
477    ///
478    /// If multiple items error in the same dispatch iteration, only the first
479    /// is preserved; subsequent errors are discarded silently. To observe
480    /// every error, attach an [`Observer`](crate::Observer) and read errors
481    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
482    pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
483        self.run_inner(RunMode::Until(Instant::now() + max))
484    }
485
    /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
    ///
    /// # Errors
    ///
    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
    ///
    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
    ///
    /// If multiple items error in the same dispatch iteration, only the first
    /// is preserved; subsequent errors are discarded silently. To observe
    /// every error, attach an [`Observer`](crate::Observer) and read errors
    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
    pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
        // The iteration budget is passed through unchanged to the run loop.
        self.run_inner(RunMode::Iterations(n))
    }
503
    /// Run until `predicate()` returns true. Checked after each `WaitSet`
    /// wakeup.
    ///
    /// # Errors
    ///
    /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
    ///
    /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
    /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
    /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
    ///
    /// If multiple items error in the same dispatch iteration, only the first
    /// is preserved; subsequent errors are discarded silently. To observe
    /// every error, attach an [`Observer`](crate::Observer) and read errors
    /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
    pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
        // The predicate is lent to the run loop as a trait object, so the
        // loop itself does not need to be generic over `F`.
        self.run_inner(RunMode::Predicate(&mut predicate))
    }
522}
523
/// Termination policy handed to `Executor::run_inner`; there is one variant
/// per public `run*` entry point.
enum RunMode<'a> {
    /// No built-in limit. Produced by `Executor::run`.
    Forever,
    /// Wall-clock deadline derived from the `max` argument of
    /// `Executor::run_for`.
    Until(Instant),
    /// Iteration budget. Produced by `Executor::run_n`.
    Iterations(usize),
    /// Caller-supplied stop predicate, borrowed for the duration of the run.
    /// Produced by `Executor::run_until`.
    Predicate(&'a mut dyn FnMut() -> bool),
}
530
531impl Executor {
532    fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
533        // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
534        // remains true permanently. Calling `run()` again after a stop will return
535        // promptly without doing any meaningful work (it blocks until the first
536        // trigger fires, then immediately exits the dispatch loop). Task 10's
537        // Runner accommodates this by treating an Executor as one-shot: each
538        // Runner owns the Executor and consumes it.
539        if self.running.swap(true, Ordering::SeqCst) {
540            return Err(ExecutorError::AlreadyRunning);
541        }
542
543        self.observer.on_executor_up();
544        let result = self.dispatch_loop(&mut mode);
545        match &result {
546            Ok(()) => self.observer.on_executor_down(),
547            Err(e) => self.observer.on_executor_error(e),
548        }
549
550        self.running.store(false, Ordering::SeqCst);
551        result
552    }
553
554    #[allow(
555        unsafe_code,
556        clippy::too_many_lines,
557        clippy::ref_as_ptr,
558        clippy::borrow_as_ptr
559    )]
560    fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
561        let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
562            .create()
563            .map_err(ExecutorError::iceoryx2)?;
564
565        // Keep Arc<RawListener> alive for at least as long as the WaitSet
566        // guards — the guard borrows the listener via 'attachment lifetime.
567        let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
568        // Guards must outlive the run loop.
569        let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
570        // Maps guard index → task index.
571        let mut attachment_to_task: Vec<usize> = Vec::new();
572
573        for (task_idx, task) in self.tasks.iter().enumerate() {
574            for decl in &task.decls {
575                match decl {
576                    TriggerDecl::Subscriber { listener } => {
577                        // Clone Arc to extend listener lifetime to this scope.
578                        let l = Arc::clone(listener);
579                        listener_storage.push(l);
580                        let l_ref = listener_storage.last().unwrap().as_ref();
581                        // SAFETY: we cast the reference lifetime to match
582                        // 'waitset / 'attachment; both listener_storage and
583                        // waitset are stack-local and dropped together at the
584                        // end of dispatch_loop.  Guards are dropped before
585                        // listener_storage below.
586                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
587                        let guard = waitset
588                            .attach_notification(l_ref)
589                            .map_err(ExecutorError::iceoryx2)?;
590                        guards.push(guard);
591                        attachment_to_task.push(task_idx);
592                    }
593                    TriggerDecl::Interval(d) => {
594                        let guard = waitset
595                            .attach_interval(*d)
596                            .map_err(ExecutorError::iceoryx2)?;
597                        guards.push(guard);
598                        attachment_to_task.push(task_idx);
599                    }
600                    TriggerDecl::Deadline { listener, deadline } => {
601                        let l = Arc::clone(listener);
602                        listener_storage.push(l);
603                        let l_ref = listener_storage.last().unwrap().as_ref();
604                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
605                        let guard = waitset
606                            .attach_deadline(l_ref, *deadline)
607                            .map_err(ExecutorError::iceoryx2)?;
608                        guards.push(guard);
609                        attachment_to_task.push(task_idx);
610                    }
611                    TriggerDecl::RawListener(listener) => {
612                        let l = Arc::clone(listener);
613                        listener_storage.push(l);
614                        let l_ref = listener_storage.last().unwrap().as_ref();
615                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
616                        let guard = waitset
617                            .attach_notification(l_ref)
618                            .map_err(ExecutorError::iceoryx2)?;
619                        guards.push(guard);
620                        attachment_to_task.push(task_idx);
621                    }
622                }
623            }
624        }
625
626        // Attach the internal stop listener so the WaitSet wakes when
627        // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
628        // struct which is valid for the lifetime of dispatch_loop. We use the
629        // same raw-pointer-cast pattern as user listeners above.
630        //
631        // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
632        // exclusively borrowed for the duration of `run_inner` (which calls
633        // `dispatch_loop`). The listener is not freed while the guard is alive
634        // because the Arc keeps it alive and `self` outlives this function.
635        let stop_listener_ref: &IxListener<ipc::Service> =
636            unsafe { &*(self.stop_listener.as_ref() as *const _) };
637        let _stop_guard = waitset
638            .attach_notification(stop_listener_ref)
639            .map_err(ExecutorError::iceoryx2)?;
640
641        let iterations_done = AtomicUsize::new(0);
642        let stop_flag = self.stoppable.clone();
643
644        loop {
645            // Reset the pre-allocated per-iteration error slot (REQ_0060):
646            // the slot is owned by `self.iter_err`, allocated once at build
647            // time. Pool worker closures obtain a refcount-only clone of
648            // the `Arc`; the slot itself is reused across iterations.
649            *self.iter_err.lock().unwrap() = None;
650
651            // SAFETY: we capture &mut self.tasks via a raw pointer because
652            // wait_and_process expects FnMut and Rust can't see the closure
653            // outlives `self`. The discipline that makes this sound:
654            //   1. The closure body on the executor thread is the *only* code that
655            //      reads `tasks_ptr`. The pool jobs it submits hold borrowed
656            //      `*mut dyn ExecutableItem` slices into individual TaskEntries,
657            //      not into the Vec itself, so they don't race with the Vec.
658            //   2. `pool.barrier()` at the end of this callback ensures every
659            //      submitted pool job has completed (and dropped its raw pointer)
660            //      before the callback returns. The next iteration of the WaitSet
661            //      loop is therefore the sole user of `tasks_ptr` again.
662            //   3. The Vec is never resized inside this loop (no `push` / `remove`
663            //      after dispatch starts), so the underlying buffer addresses are
664            //      stable for the lifetime of `dispatch_loop`.
665            let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
666            let pool = &self.pool;
667            // Refcount-only clone of the pre-allocated error slot. Pool jobs
668            // need a `'static` handle, and an `Arc::clone` does not allocate.
669            // The Single/Chain paths use the closure baked into `task.job`,
670            // which already captured stable Arc clones at `add`-time; the
671            // Graph path uses closures pre-built by `prepare_dispatch`. Only
672            // the error-aggregation logic on the WaitSet thread still needs
673            // the slot here.
674            let iter_err_inner = Arc::clone(&self.iter_err);
675            // Raw pointer to the stop listener for draining inside the callback.
676            // SAFETY: same as stop_listener_ref above — the Arc is alive for
677            // the lifetime of dispatch_loop.
678            let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
679
680            let cb_result = waitset.wait_and_process_once(
681                |attachment_id: WaitSetAttachmentId<ipc::Service>| {
682                    // Drain stop notifications first (no dispatch — the stop_flag
683                    // check after the callback returns handles termination).
684                    // SAFETY: stop_listener_ptr is valid for the duration of the
685                    // closure; the Arc in self.stop_listener keeps it alive.
686                    let stop_l = unsafe { &*stop_listener_ptr };
687                    while let Ok(Some(_)) = stop_l.try_wait_one() {}
688
689                    for (i, guard) in guards.iter().enumerate() {
690                        let fired = attachment_id.has_event_from(guard)
691                            || attachment_id.has_missed_deadline(guard);
692                        if !fired {
693                            continue;
694                        }
695                        let task_idx = attachment_to_task[i];
696
697                        // SAFETY: we are the only thread that may touch
698                        // `self` during the callback. wait_and_process_once
699                        // is single-threaded; we hold &mut self in
700                        // dispatch_loop. The pointer is valid for the
701                        // duration of this closure.
702                        let task = unsafe { &mut (&mut *tasks_ptr)[task_idx] };
703
704                        match &mut task.kind {
705                            TaskKind::Single(_) | TaskKind::Chain(_) => {
706                                // The dispatch closure was pre-allocated at
707                                // task-add time and stashed on `task.job`.
708                                // Submit it via `submit_borrowed` — no
709                                // per-iteration Box allocation. Required by
710                                // REQ_0060.
711                                let job_box = task
712                                    .job
713                                    .as_deref_mut()
714                                    .expect("Single/Chain tasks carry a pre-built job");
715                                let job_ptr: *mut (dyn FnMut() + Send) =
716                                    job_box as *mut (dyn FnMut() + Send);
717                                // SAFETY: the closure lives in
718                                // `task.job` which is owned by
719                                // `self.tasks[task_idx]`; `tasks_ptr` is
720                                // sound for the duration of this
721                                // callback. `pool.barrier()` below
722                                // finishes the closure invocation before
723                                // we re-enter the next iteration's
724                                // callback. The WaitSet thread does not
725                                // touch the closure between this submit
726                                // and that barrier.
727                                #[allow(unsafe_code)]
728                                unsafe {
729                                    pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
730                                }
731                            }
732                            TaskKind::Graph(graph) => {
733                                // Outer driver runs on the WaitSet thread; vertices run on the
734                                // pool. The graph holds its own pre-built per-vertex closures
735                                // and SPSC ready ring (REQ_0060), so dispatch is allocation-free
736                                // in steady state.
737                                let outcome = graph.run_once_borrowed(pool);
738                                if let Some(source) = outcome.error {
739                                    let mut g = iter_err_inner.lock().unwrap();
740                                    if g.is_none() {
741                                        *g = Some(ExecutorError::Item {
742                                            task_id: task.id.clone(),
743                                            source,
744                                        });
745                                    }
746                                }
747                                let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
748                            }
749                        }
750                    }
751
752                    // Wait for all submitted jobs to finish before leaving
753                    // the callback scope (validates item_ptr safety contract).
754                    pool.barrier();
755                    CallbackProgression::Continue
756                },
757            );
758
759            let cb_result = cb_result.map_err(ExecutorError::iceoryx2)?;
760
761            // iceoryx2's WaitSet catches SIGINT/SIGTERM internally; honor that
762            // here for a clean exit.
763            if matches!(
764                cb_result,
765                WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
766            ) {
767                return Ok(());
768            }
769
770            // Extract the error before dropping the MutexGuard — avoids
771            // holding the lock across the return (clippy::significant_drop_in_scrutinee).
772            let maybe_err = self.iter_err.lock().unwrap().take();
773            if let Some(err) = maybe_err {
774                return Err(err);
775            }
776            if stop_flag.is_stopped() {
777                return Ok(());
778            }
779
780            iterations_done.fetch_add(1, Ordering::SeqCst);
781            match mode {
782                RunMode::Forever => {}
783                RunMode::Iterations(n) => {
784                    if iterations_done.load(Ordering::SeqCst) >= *n {
785                        return Ok(());
786                    }
787                }
788                RunMode::Until(deadline) => {
789                    if Instant::now() >= *deadline {
790                        return Ok(());
791                    }
792                }
793                RunMode::Predicate(p) => {
794                    if (p)() {
795                        return Ok(());
796                    }
797                }
798            }
799        }
800    }
801}
802
803/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
804/// `Pool::submit`. The send is safe because:
805///   1. The executor guarantees at most one invocation of a given item at a
806///      time (via `pool.barrier()` before the pointer is reused).
807///   2. `ExecutableItem: Send`, so moving the pointee across threads is sound
808///      when no aliasing exists.
809#[allow(unsafe_code)]
810struct SendItemPtr {
811    ptr: *mut dyn ExecutableItem,
812}
813
814impl SendItemPtr {
815    fn new(ptr: *mut dyn ExecutableItem) -> Self {
816        Self { ptr }
817    }
818
819    /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
820    /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
821    /// dispatch closure to be reusable across iterations without allocation).
822    fn get(&self) -> *mut dyn ExecutableItem {
823        self.ptr
824    }
825}
826
827// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
828// closure can borrow `&SendItemPtr` per invocation without making the
829// closure itself `!Send`.
830#[allow(unsafe_code)]
831unsafe impl Send for SendItemPtr {}
832#[allow(unsafe_code)]
833unsafe impl Sync for SendItemPtr {}
834
835/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
836/// closure can iterate the chain's items in place without first
837/// collecting them into a freshly-allocated `Vec`. The send is safe
838/// for the same reason as [`SendItemPtr`] (see above): the executor
839/// holds `&mut self` for the duration of `dispatch_loop`, and the
840/// `pool.barrier()` at the end of each callback ensures the closure
841/// has finished using this pointer before the Vec could be touched
842/// from the `WaitSet` thread again. The Vec is never resized after
843/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
844/// allocate per iteration.
845#[allow(unsafe_code)]
846struct SendChainPtr {
847    ptr: *mut Vec<Box<dyn ExecutableItem>>,
848}
849
850impl SendChainPtr {
851    fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
852        Self { ptr }
853    }
854
855    fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
856        self.ptr
857    }
858}
859
860// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
861// borrow `&SendChainPtr` per invocation while staying `Send`.
862#[allow(unsafe_code)]
863unsafe impl Send for SendChainPtr {}
864#[allow(unsafe_code)]
865unsafe impl Sync for SendChainPtr {}
866
867/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
868///
869/// The returned closure is stored on `TaskEntry::job` and invoked once
870/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
871/// performs no allocation. The closure captures Arc clones of the
872/// executor's shared state — those clones are refcount-only at build
873/// time and are reused on every dispatch. Required for `REQ_0060`.
874#[allow(clippy::too_many_arguments)]
875fn build_single_job(
876    id: TaskId,
877    stop: Stoppable,
878    obs: Arc<dyn Observer>,
879    mon: Arc<dyn ExecutionMonitor>,
880    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
881    app_id: Option<u32>,
882    app_inst: Option<u32>,
883    item_ptr: SendItemPtr,
884) -> Box<dyn FnMut() + Send + 'static> {
885    Box::new(move || {
886        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
887        if let Some(aid) = app_id {
888            obs.on_app_start(id.clone(), aid, app_inst);
889        }
890        let raw = item_ptr.get();
891        let started = std::time::Instant::now();
892        mon.pre_execute(id.clone(), started);
893        // SAFETY: barrier() pairs with this invocation; the WaitSet
894        // thread does not touch the item between `submit_borrowed` and
895        // the matching `barrier()`. See SendItemPtr safety doc.
896        #[allow(unsafe_code)]
897        let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
898        let took = started.elapsed();
899        mon.post_execute(id.clone(), started, took, res.is_ok());
900        if let Err(ref e) = res {
901            obs.on_app_error(id.clone(), e.as_ref());
902        }
903        if app_id.is_some() {
904            obs.on_app_stop(id.clone());
905        }
906        record_first_err(&err_slot, &id, res);
907    })
908}
909
910/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
911fn build_chain_job(
912    id: TaskId,
913    stop: Stoppable,
914    obs: Arc<dyn Observer>,
915    mon: Arc<dyn ExecutionMonitor>,
916    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
917    chain_ptr: SendChainPtr,
918) -> Box<dyn FnMut() + Send + 'static> {
919    Box::new(move || {
920        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
921        // SAFETY: barrier() pairs with this invocation; the chain Vec
922        // and the items it owns are not touched by the WaitSet thread
923        // until barrier() returns. See SendChainPtr safety doc.
924        #[allow(unsafe_code)]
925        let chain_items = unsafe { &mut *chain_ptr.get() };
926        for item_box in chain_items.iter_mut() {
927            let app_id = item_box.app_id();
928            let app_inst = item_box.app_instance_id();
929            if let Some(aid) = app_id {
930                obs.on_app_start(id.clone(), aid, app_inst);
931            }
932            let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
933            let started = std::time::Instant::now();
934            mon.pre_execute(id.clone(), started);
935            #[allow(unsafe_code)]
936            let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
937            let took = started.elapsed();
938            mon.post_execute(id.clone(), started, took, res.is_ok());
939            if let Err(ref e) = res {
940                obs.on_app_error(id.clone(), e.as_ref());
941            }
942            if app_id.is_some() {
943                obs.on_app_stop(id.clone());
944            }
945            match res {
946                Ok(crate::ControlFlow::Continue) => {}
947                Ok(crate::ControlFlow::StopChain) => break,
948                Err(_) => {
949                    record_first_err(&err_slot, &id, res);
950                    break;
951                }
952            }
953        }
954    })
955}
956
/// Error value that stands in for a panic caught while running a task; the
/// wrapped `String` is the panic message.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    /// Renders as `task panicked: <message>`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        write!(f, "task panicked: {message}")
    }
}

// The default `Error` methods are sufficient: there is no underlying source.
impl std::error::Error for PanickedTask {}
967
968/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
969#[allow(clippy::option_if_let_else)]
970fn run_item_catch_unwind(
971    item: &mut dyn ExecutableItem,
972    ctx: &mut crate::context::Context<'_>,
973) -> crate::ExecuteResult {
974    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
975        |payload| {
976            let msg = if let Some(s) = payload.downcast_ref::<&str>() {
977                (*s).to_string()
978            } else if let Some(s) = payload.downcast_ref::<String>() {
979                s.clone()
980            } else {
981                "panicked task".to_string()
982            };
983            Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
984        },
985    )
986}
987
988/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
989/// without depending on its private name.
990pub(crate) fn run_item_catch_unwind_external(
991    item: &mut dyn ExecutableItem,
992    ctx: &mut crate::context::Context<'_>,
993) -> crate::ExecuteResult {
994    run_item_catch_unwind(item, ctx)
995}
996
997/// Record the first error into `slot`. Subsequent errors are silently dropped.
998fn record_first_err(
999    slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
1000    id: &TaskId,
1001    res: crate::ExecuteResult,
1002) {
1003    if let Err(source) = res {
1004        let mut g = slot.lock().unwrap();
1005        if g.is_none() {
1006            *g = Some(ExecutorError::Item {
1007                task_id: id.clone(),
1008                source,
1009            });
1010        }
1011    }
1012}
1013
1014// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
1015
1016/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
1017/// into a registered task.
1018pub struct ExecutorGraphBuilder<'e> {
1019    executor: &'e mut Executor,
1020    builder: crate::graph::GraphBuilder,
1021    custom_id: Option<TaskId>,
1022}
1023
1024impl ExecutorGraphBuilder<'_> {
1025    /// Add a vertex to the graph; returns its handle.
1026    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
1027        self.builder.vertex(item)
1028    }
1029
1030    /// Add a directed edge from one vertex to another.
1031    pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
1032        self.builder.edge(from, to);
1033        self
1034    }
1035
1036    /// Designate the root vertex (its triggers gate the graph).
1037    pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
1038        self.builder.root(v);
1039        self
1040    }
1041
1042    /// Override the auto-generated id with a custom one.
1043    pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
1044        self.custom_id = Some(id.into());
1045        self
1046    }
1047
1048    /// Validate and register the graph. Returns the task id.
1049    ///
1050    /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
1051    /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
1052    /// precedence over the auto-generated id.
1053    pub fn build(self) -> Result<TaskId, ExecutorError> {
1054        let g = self.builder.finish()?;
1055        // Root vertex's task_id() override wins over the custom id, which wins
1056        // over the auto-generated fallback.
1057        let auto_id = || {
1058            TaskId::new(format!(
1059                "graph-{}",
1060                self.executor.next_id.fetch_add(1, Ordering::SeqCst)
1061            ))
1062        };
1063        let id = g
1064            .root_task_id()
1065            .map(TaskId::new)
1066            .or(self.custom_id)
1067            .unwrap_or_else(auto_id);
1068        let decls = g.decls.clone();
1069
1070        // Box the graph for address stability — per-vertex dispatch
1071        // closures capture `*const Graph` and must not see it move.
1072        let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
1073        // Pre-build the per-vertex closures now that we know the
1074        // task_id and have access to the executor's shared state.
1075        graph_box.prepare_dispatch(
1076            id.clone(),
1077            self.executor.stoppable.clone(),
1078            Arc::clone(&self.executor.observer),
1079            Arc::clone(&self.executor.monitor),
1080            Arc::clone(&self.executor.iter_err),
1081        );
1082
1083        self.executor.tasks.push(TaskEntry {
1084            id: id.clone(),
1085            kind: TaskKind::Graph(graph_box),
1086            decls,
1087            // Graph tasks dispatch their vertices via `vertex_jobs`
1088            // stored inside the `Graph`; the per-task `job` slot
1089            // is unused for graphs.
1090            job: None,
1091        });
1092        Ok(id)
1093    }
1094}
1095
1096// ── Unit tests ────────────────────────────────────────────────────────────────
1097
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ControlFlow, item};

    /// Two tasks added without an explicit id must not share one.
    #[test]
    fn add_returns_unique_ids() {
        let mut executor = Executor::builder().worker_threads(0).build().unwrap();
        let first = executor.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        let second = executor.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        assert_ne!(first, second);
    }

    /// An id passed to `add_with_id` comes back unchanged.
    #[test]
    fn custom_id_is_preserved() {
        let mut executor = Executor::builder().worker_threads(0).build().unwrap();
        let noop = item(|_| Ok(ControlFlow::Continue));
        let returned = executor.add_with_id("my-task", noop).unwrap();
        assert_eq!(returned.as_str(), "my-task");
    }

    /// The trigger-declaration callback runs during `add`, not later.
    #[test]
    fn declare_triggers_called_at_add_time() {
        let declared = Arc::new(AtomicBool::new(false));
        let declared_in_cb = Arc::clone(&declared);

        let probe = crate::item::item_with_triggers(
            move |_d| {
                declared_in_cb.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );

        let mut executor = Executor::builder().worker_threads(0).build().unwrap();
        executor.add(probe).unwrap();
        assert!(declared.load(Ordering::SeqCst));
    }
}