taktora_executor/executor.rs
1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::context::Stoppable;
10use crate::error::ExecutorError;
11use crate::item::ExecutableItem;
12use crate::monitor::{ExecutionMonitor, NoopMonitor};
13use crate::observer::{NoopObserver, Observer};
14use crate::payload::Payload;
15use crate::pool::Pool;
16use crate::task_id::TaskId;
17use crate::task_kind::TaskKind;
18use crate::thread_attrs::ThreadAttributes;
19use crate::trigger::{TriggerDecl, TriggerDeclarer};
20use iceoryx2::node::Node;
21use iceoryx2::port::listener::Listener as IxListener;
22use iceoryx2::prelude::ipc;
23use iceoryx2::prelude::*;
24use iceoryx2::waitset::WaitSetRunResult;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
27use std::time::{Duration, Instant};
28
/// Monotonically increasing counter so multiple executors in the same process
/// each get a unique stop-event service name.
///
/// `ExecutorBuilder::build` takes one value per built executor and embeds it,
/// together with the process id, in the name of the internal stop-event
/// service.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
32
/// One registered task entry.
///
/// Entries are appended to `Executor::tasks` by the `add*` methods and are
/// addressed by index from the dispatch loop.
pub(crate) struct TaskEntry {
    /// Task identifier.
    pub(crate) id: TaskId,
    /// The kind of work this entry holds (single item or chain).
    pub(crate) kind: TaskKind,
    /// Trigger declarations recorded at `add` time. The dispatch loop turns
    /// each declaration into one `WaitSet` attachment.
    pub(crate) decls: Vec<TriggerDecl>,
    /// Pre-allocated dispatch closure. Built ahead of dispatch (at `add` /
    /// `add_chain` time) and re-invoked on every dispatch iteration via
    /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
    /// that `Pool::submit<F>` requires in threaded mode. Required for
    /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
    /// `TaskKind::Graph`, which dispatches its vertices via a separate
    /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
    pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
}
50
/// Top-level executor. One per process is the typical case.
pub struct Executor {
    /// iceoryx2 node on which channels, services and the internal stop-event
    /// service are created.
    pub(crate) node: Node<ipc::Service>,
    /// Worker pool that dispatch jobs are submitted to.
    pub(crate) pool: Arc<Pool>,
    /// Registered tasks, appended by the `add*` methods. The dispatch loop
    /// addresses entries by their index in this vector.
    pub(crate) tasks: Vec<TaskEntry>,
    /// `true` while a `run*` call is in progress; a second run attempted in
    /// that window is rejected with `ExecutorError::AlreadyRunning`.
    pub(crate) running: Arc<AtomicBool>,
    /// Stop handle; `Executor::stoppable` hands out clones of it.
    pub(crate) stoppable: Stoppable,
    /// Sequence number used to generate the `task-N` / `chain-N` ids.
    pub(crate) next_id: AtomicU64,
    /// Listener for the internal stop event service. Held here so it outlives
    /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
    /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
    pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
    /// Lifecycle observer. Defaults to a no-op.
    pub(crate) observer: Arc<dyn Observer>,
    /// Execution monitor. Defaults to a no-op.
    pub(crate) monitor: Arc<dyn ExecutionMonitor>,
    /// Per-iteration error capture slot — allocated once at build time and
    /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
    /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
    /// the per-iteration heap allocation that the previous design incurred.
    /// Required for `REQ_0060`.
    pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
}
74
// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
// `SingleThreaded` reason as `IxNotifier`. After construction, the only
// per-iteration call is `listener.try_wait_one()`, which does not mutate the
// Rc. Every `run*` entry point takes `&mut self`, so the run loop is never
// executed concurrently with any other access to the executor and there is
// no aliased concurrent mutation.
//
// NOTE(review): `Send` is about *moving* the executor to another thread, not
// about sharing it. The argument above therefore also relies on no clone of
// the Rc-backed listener state staying behind on the constructing thread —
// confirm against the iceoryx2 listener implementation.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
82
83impl Executor {
84 /// Start a new builder.
85 #[must_use]
86 pub fn builder() -> ExecutorBuilder {
87 ExecutorBuilder::default()
88 }
89
90 /// Open or create a pub/sub channel bound to this executor's node.
91 pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
92 Channel::open_or_create(&self.node, name)
93 }
94
95 /// Open or create a request/response service bound to this executor's node.
96 pub fn service<Req, Resp>(
97 &mut self,
98 name: &str,
99 ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
100 where
101 Req: Payload,
102 Resp: Payload,
103 {
104 crate::Service::open_or_create(&self.node, name)
105 }
106
107 /// Add an item to the executor with an auto-generated id.
108 pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
109 let id = TaskId::new(format!(
110 "task-{}",
111 self.next_id.fetch_add(1, Ordering::SeqCst)
112 ));
113 self.add_with_id(id, item)
114 }
115
    /// Add an item with a user-supplied id.
    ///
    /// The item's [`ExecutableItem::task_id`] override takes precedence over
    /// the caller-supplied `id`, which itself takes precedence over the
    /// auto-generated id assigned by [`Executor::add`].
    ///
    /// Returns the id the task was actually registered under.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the item's `declare_triggers`, in which
    /// case nothing is registered.
    pub fn add_with_id(
        &mut self,
        id: impl Into<TaskId>,
        mut item: impl ExecutableItem,
    ) -> Result<TaskId, ExecutorError> {
        let id_arg: TaskId = id.into();
        // The item's `task_id()` override wins over the user-supplied id.
        let id = item.task_id().map_or(id_arg, TaskId::new);
        // Collect the item's triggers before it is boxed; a failure here
        // aborts the registration before any state is touched.
        let mut declarer = TriggerDeclarer::new_internal();
        item.declare_triggers(&mut declarer)?;
        let decls = declarer.into_decls();

        let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
        // Read the application ids up front so the dispatch closure does not
        // have to query the item for them on every iteration.
        let app_id = item_box.app_id();
        let app_inst = item_box.app_instance_id();
        // SAFETY: the raw pointer points into the heap allocation of
        // `item_box`. `Box` keeps that allocation at a stable address even
        // when the `Box` itself is moved (e.g. when `self.tasks` grows),
        // so the pointer remains valid for the lifetime of the
        // `TaskEntry`. See SendItemPtr safety doc for the rest of the
        // discipline (barrier() pairs with worker access).
        #[allow(unsafe_code)]
        let item_ptr =
            SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));

        // Build the reusable dispatch closure once; it is stored on the entry
        // and re-invoked by the dispatch loop.
        let job = build_single_job(
            id.clone(),
            self.stoppable.clone(),
            Arc::clone(&self.observer),
            Arc::clone(&self.monitor),
            Arc::clone(&self.iter_err),
            app_id,
            app_inst,
            item_ptr,
        );

        // The entry takes ownership of the box that `item_ptr` points into.
        self.tasks.push(TaskEntry {
            id: id.clone(),
            kind: TaskKind::Single(item_box),
            decls,
            job: Some(job),
        });
        Ok(id)
    }
165
166 /// Add a sequential chain of items. Only the head item's
167 /// `declare_triggers` is consulted; non-head triggers are ignored with a
168 /// tracing warn.
169 pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
170 where
171 I: ExecutableItem,
172 C: IntoIterator<Item = I>,
173 {
174 let id = TaskId::new(format!(
175 "chain-{}",
176 self.next_id.fetch_add(1, Ordering::SeqCst)
177 ));
178 let boxed: Vec<Box<dyn ExecutableItem>> = items
179 .into_iter()
180 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
181 .collect();
182 self.add_chain_with_id_boxed(id, boxed)
183 }
184
185 /// Like [`Executor::add_chain`] but with a user-supplied id.
186 pub fn add_chain_with_id<I, C>(
187 &mut self,
188 id: impl Into<TaskId>,
189 items: C,
190 ) -> Result<TaskId, ExecutorError>
191 where
192 I: ExecutableItem,
193 C: IntoIterator<Item = I>,
194 {
195 let boxed: Vec<Box<dyn ExecutableItem>> = items
196 .into_iter()
197 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
198 .collect();
199 self.add_chain_with_id_boxed(id.into(), boxed)
200 }
201
202 fn add_chain_with_id_boxed(
203 &mut self,
204 id: TaskId,
205 mut items: Vec<Box<dyn ExecutableItem>>,
206 ) -> Result<TaskId, ExecutorError> {
207 if items.is_empty() {
208 return Err(ExecutorError::Builder(
209 "chain must contain at least one item".into(),
210 ));
211 }
212
213 // Head item's `task_id()` override wins over the user-supplied id.
214 let id = items[0].task_id().map_or(id, TaskId::new);
215
216 // Head's triggers gate the chain.
217 let mut head_declarer = TriggerDeclarer::new_internal();
218 items[0].declare_triggers(&mut head_declarer)?;
219 let decls = head_declarer.into_decls();
220
221 // Warn if non-head items declared triggers (those will be ignored).
222 for (i, body) in items.iter_mut().enumerate().skip(1) {
223 let mut spurious = TriggerDeclarer::new_internal();
224 let _ = body.declare_triggers(&mut spurious);
225 if !spurious.is_empty() {
226 #[cfg(feature = "tracing")]
227 tracing::warn!(
228 target: "taktora-executor",
229 task = %id,
230 position = i,
231 "non-head chain item declared triggers; they will be ignored"
232 );
233 #[cfg(not(feature = "tracing"))]
234 {
235 let _ = i;
236 }
237 }
238 }
239
240 let mut items = items;
241 // SAFETY: pointer into the chain's `items` Vec. The Vec lives
242 // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
243 // is stable once `add_chain` returns — `self.tasks` may grow
244 // (moving the `Vec<Box<...>>` header itself), but the Vec's
245 // heap buffer is referenced via the header's data pointer and
246 // is unaffected by header moves. We never resize the chain Vec
247 // after this point. See SendChainPtr safety doc for the rest.
248 #[allow(unsafe_code)]
249 let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
250 &mut items,
251 ));
252 // NB: the pointer above is to the local `items` Vec on the
253 // stack — it's invalid after the `push` below moves items into
254 // the TaskEntry. We rederive a stable pointer after the push.
255 // (See the rebuild step below.)
256 let _ = chain_ptr;
257
258 self.tasks.push(TaskEntry {
259 id: id.clone(),
260 kind: TaskKind::Chain(items),
261 decls,
262 job: None, // populated in the rebuild step below
263 });
264
265 // After the push, the TaskEntry lives at a stable position in
266 // `self.tasks` for the duration of this `add_chain_with_id_boxed`
267 // call. Take a stable pointer to its chain Vec and build the
268 // dispatch closure. If `self.tasks` later grows, the Vec header
269 // inside the TaskEntry moves but the header's data pointer
270 // (which addresses the chain's heap buffer) does not — and the
271 // closure derefs that pointer per dispatch, so it re-reads the
272 // current heap address each time. Sound under the same
273 // discipline as `tasks_ptr` in dispatch_loop.
274 let task_idx = self.tasks.len() - 1;
275 let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
276 {
277 TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
278 // The push above used TaskKind::Chain, so this arm is
279 // unreachable. Mark it explicitly to satisfy `match`.
280 _ => unreachable!("just-pushed task is TaskKind::Chain"),
281 };
282 #[allow(unsafe_code)]
283 let chain_ptr = SendChainPtr::new(chain_vec_ptr);
284 let job = build_chain_job(
285 id.clone(),
286 self.stoppable.clone(),
287 Arc::clone(&self.observer),
288 Arc::clone(&self.monitor),
289 Arc::clone(&self.iter_err),
290 chain_ptr,
291 );
292 self.tasks[task_idx].job = Some(job);
293 Ok(id)
294 }
295
296 /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
297 /// executor is built. Clone before calling `run()` — any clone taken at any
298 /// time will wake the `WaitSet` when `stop()` is called.
299 #[must_use]
300 pub fn stoppable(&self) -> Stoppable {
301 self.stoppable.clone()
302 }
303
    /// Borrow the underlying iceoryx2 node (escape hatch for power users).
    ///
    /// This is the same node that [`Executor::channel`] and
    /// [`Executor::service`] create their ports on.
    pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
        &self.node
    }
308
309 /// Begin building a graph. Call `.build()` on the returned builder to
310 /// register the graph as a task.
311 pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
312 ExecutorGraphBuilder {
313 executor: self,
314 builder: crate::graph::GraphBuilder::new(),
315 custom_id: None,
316 }
317 }
318}
319
/// Builder for [`Executor`].
///
/// Every setting is optional; unset values are replaced by defaults in
/// [`ExecutorBuilder::build`].
pub struct ExecutorBuilder {
    /// Requested worker-thread count. `None` means "use the number of
    /// physical cores", resolved at build time.
    worker_threads: Option<usize>,
    /// Lifecycle observer; a no-op observer is used when `None`.
    observer: Option<Arc<dyn Observer>>,
    /// Execution monitor; a no-op monitor is used when `None`.
    monitor: Option<Arc<dyn ExecutionMonitor>>,
    /// Attributes passed to the worker pool when it is created.
    worker_attrs: ThreadAttributes,
}
327
328impl Default for ExecutorBuilder {
329 fn default() -> Self {
330 Self {
331 worker_threads: None,
332 observer: None,
333 monitor: None,
334 worker_attrs: ThreadAttributes::new(),
335 }
336 }
337}
338
339impl ExecutorBuilder {
    /// Number of worker threads. `0` → inline (no pool). Default → physical
    /// cores.
    ///
    /// The value is only recorded here; it is handed to the worker pool when
    /// [`ExecutorBuilder::build`] runs.
    #[must_use]
    pub const fn worker_threads(mut self, n: usize) -> Self {
        self.worker_threads = Some(n);
        self
    }
347
348 /// Attach a lifecycle observer. If not called, a no-op observer is used.
349 #[must_use]
350 pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
351 self.observer = Some(obs);
352 self
353 }
354
355 /// Attach an execution monitor. If not called, a no-op monitor is used.
356 #[must_use]
357 pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
358 self.monitor = Some(mon);
359 self
360 }
361
362 /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
363 /// for worker threads. Has no effect when `worker_threads` is `0` (inline
364 /// mode). Requires the `thread_attrs` feature for non-default settings.
365 #[must_use]
366 #[allow(clippy::missing_const_for_fn)]
367 pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
368 self.worker_attrs = attrs;
369 self
370 }
371
372 /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
373 /// internal stop-event service so that any `Stoppable` clone (taken before
374 /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
375 ///
376 /// # Panics
377 ///
378 /// Panics if the internally-generated stop-event service name exceeds the
379 /// iceoryx2 service name length limit (this cannot happen under normal use
380 /// because the name is derived from the process id and a monotonic counter).
381 #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
382 #[track_caller]
383 pub fn build(self) -> Result<Executor, ExecutorError> {
384 let node = NodeBuilder::new()
385 .create::<ipc::Service>()
386 .map_err(ExecutorError::iceoryx2)?;
387
388 let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
389 let pool = Arc::new(Pool::new(n_workers, self.worker_attrs)?);
390
391 // Build the internal stop event service with a unique-per-process name
392 // so multiple executors in the same process don't collide.
393 let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
394 let stop_topic = format!(
395 "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
396 std::process::id()
397 );
398 let stop_event = node
399 .service_builder(&stop_topic.as_str().try_into().unwrap())
400 .event()
401 .open_or_create()
402 .map_err(ExecutorError::iceoryx2)?;
403
404 let stop_notifier = Arc::new(
405 stop_event
406 .notifier_builder()
407 .create()
408 .map_err(ExecutorError::iceoryx2)?,
409 );
410
411 // SAFETY: see module-level note; Arc<IxListener> is held here and only
412 // accessed on the executor thread.
413 let stop_listener = Arc::new(
414 stop_event
415 .listener_builder()
416 .create()
417 .map_err(ExecutorError::iceoryx2)?,
418 );
419
420 // Wire the notifier into the Stoppable so every clone is waker-aware
421 // from the moment the executor is built.
422 let stoppable = Stoppable::with_waker(stop_notifier);
423
424 let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
425
426 let monitor: Arc<dyn ExecutionMonitor> =
427 self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
428
429 let exec = Executor {
430 node,
431 pool,
432 tasks: Vec::new(),
433 running: Arc::new(AtomicBool::new(false)),
434 stoppable,
435 next_id: AtomicU64::new(0),
436 stop_listener,
437 observer,
438 monitor,
439 iter_err: Arc::new(std::sync::Mutex::new(None)),
440 };
441
442 Ok(exec)
443 }
444}
445
446// ── Run loop ──────────────────────────────────────────────────────────────────
447
448impl Executor {
449 /// Run the executor until [`Stoppable::stop`] is called or a task signals
450 /// stop via [`crate::Context::stop_executor`].
451 ///
452 /// # Errors
453 ///
454 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
455 ///
456 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
457 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
458 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
459 ///
460 /// If multiple items error in the same dispatch iteration, only the first
461 /// is preserved; subsequent errors are discarded silently. To observe
462 /// every error, attach an [`Observer`](crate::Observer) and read errors
463 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
464 pub fn run(&mut self) -> Result<(), ExecutorError> {
465 self.run_inner(RunMode::Forever)
466 }
467
468 /// Run for at most `max` wall-clock duration, then return.
469 ///
470 /// # Errors
471 ///
472 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
473 ///
474 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
475 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
476 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
477 ///
478 /// If multiple items error in the same dispatch iteration, only the first
479 /// is preserved; subsequent errors are discarded silently. To observe
480 /// every error, attach an [`Observer`](crate::Observer) and read errors
481 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
482 pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
483 self.run_inner(RunMode::Until(Instant::now() + max))
484 }
485
486 /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
487 ///
488 /// # Errors
489 ///
490 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
491 ///
492 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
493 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
494 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
495 ///
496 /// If multiple items error in the same dispatch iteration, only the first
497 /// is preserved; subsequent errors are discarded silently. To observe
498 /// every error, attach an [`Observer`](crate::Observer) and read errors
499 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
500 pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
501 self.run_inner(RunMode::Iterations(n))
502 }
503
504 /// Run until `predicate()` returns true. Checked after each `WaitSet`
505 /// wakeup.
506 ///
507 /// # Errors
508 ///
509 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
510 ///
511 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
512 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
513 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
514 ///
515 /// If multiple items error in the same dispatch iteration, only the first
516 /// is preserved; subsequent errors are discarded silently. To observe
517 /// every error, attach an [`Observer`](crate::Observer) and read errors
518 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
519 pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
520 self.run_inner(RunMode::Predicate(&mut predicate))
521 }
522}
523
/// Extra termination condition for one `run*` call. `dispatch_loop` checks it
/// after every completed `WaitSet` cycle.
enum RunMode<'a> {
    /// No extra condition: run until stopped, interrupted, or an error occurs.
    Forever,
    /// Return once `Instant::now()` has reached this deadline.
    Until(Instant),
    /// Return once this many cycles have completed.
    Iterations(usize),
    /// Return once the predicate yields `true`.
    Predicate(&'a mut dyn FnMut() -> bool),
}
530
531impl Executor {
532 fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
533 // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
534 // remains true permanently. Calling `run()` again after a stop will return
535 // promptly without doing any meaningful work (it blocks until the first
536 // trigger fires, then immediately exits the dispatch loop). Task 10's
537 // Runner accommodates this by treating an Executor as one-shot: each
538 // Runner owns the Executor and consumes it.
539 if self.running.swap(true, Ordering::SeqCst) {
540 return Err(ExecutorError::AlreadyRunning);
541 }
542
543 self.observer.on_executor_up();
544 let result = self.dispatch_loop(&mut mode);
545 match &result {
546 Ok(()) => self.observer.on_executor_down(),
547 Err(e) => self.observer.on_executor_error(e),
548 }
549
550 self.running.store(false, Ordering::SeqCst);
551 result
552 }
553
554 #[allow(
555 unsafe_code,
556 clippy::too_many_lines,
557 clippy::ref_as_ptr,
558 clippy::borrow_as_ptr
559 )]
560 fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
561 let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
562 .create()
563 .map_err(ExecutorError::iceoryx2)?;
564
565 // Keep Arc<RawListener> alive for at least as long as the WaitSet
566 // guards — the guard borrows the listener via 'attachment lifetime.
567 let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
568 // Guards must outlive the run loop.
569 let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
570 // Maps guard index → task index.
571 let mut attachment_to_task: Vec<usize> = Vec::new();
572
573 for (task_idx, task) in self.tasks.iter().enumerate() {
574 for decl in &task.decls {
575 match decl {
576 TriggerDecl::Subscriber { listener } => {
577 // Clone Arc to extend listener lifetime to this scope.
578 let l = Arc::clone(listener);
579 listener_storage.push(l);
580 let l_ref = listener_storage.last().unwrap().as_ref();
581 // SAFETY: we cast the reference lifetime to match
582 // 'waitset / 'attachment; both listener_storage and
583 // waitset are stack-local and dropped together at the
584 // end of dispatch_loop. Guards are dropped before
585 // listener_storage below.
586 let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
587 let guard = waitset
588 .attach_notification(l_ref)
589 .map_err(ExecutorError::iceoryx2)?;
590 guards.push(guard);
591 attachment_to_task.push(task_idx);
592 }
593 TriggerDecl::Interval(d) => {
594 let guard = waitset
595 .attach_interval(*d)
596 .map_err(ExecutorError::iceoryx2)?;
597 guards.push(guard);
598 attachment_to_task.push(task_idx);
599 }
600 TriggerDecl::Deadline { listener, deadline } => {
601 let l = Arc::clone(listener);
602 listener_storage.push(l);
603 let l_ref = listener_storage.last().unwrap().as_ref();
604 let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
605 let guard = waitset
606 .attach_deadline(l_ref, *deadline)
607 .map_err(ExecutorError::iceoryx2)?;
608 guards.push(guard);
609 attachment_to_task.push(task_idx);
610 }
611 TriggerDecl::RawListener(listener) => {
612 let l = Arc::clone(listener);
613 listener_storage.push(l);
614 let l_ref = listener_storage.last().unwrap().as_ref();
615 let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
616 let guard = waitset
617 .attach_notification(l_ref)
618 .map_err(ExecutorError::iceoryx2)?;
619 guards.push(guard);
620 attachment_to_task.push(task_idx);
621 }
622 }
623 }
624 }
625
626 // Attach the internal stop listener so the WaitSet wakes when
627 // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
628 // struct which is valid for the lifetime of dispatch_loop. We use the
629 // same raw-pointer-cast pattern as user listeners above.
630 //
631 // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
632 // exclusively borrowed for the duration of `run_inner` (which calls
633 // `dispatch_loop`). The listener is not freed while the guard is alive
634 // because the Arc keeps it alive and `self` outlives this function.
635 let stop_listener_ref: &IxListener<ipc::Service> =
636 unsafe { &*(self.stop_listener.as_ref() as *const _) };
637 let _stop_guard = waitset
638 .attach_notification(stop_listener_ref)
639 .map_err(ExecutorError::iceoryx2)?;
640
641 let iterations_done = AtomicUsize::new(0);
642 let stop_flag = self.stoppable.clone();
643
644 loop {
645 // Reset the pre-allocated per-iteration error slot (REQ_0060):
646 // the slot is owned by `self.iter_err`, allocated once at build
647 // time. Pool worker closures obtain a refcount-only clone of
648 // the `Arc`; the slot itself is reused across iterations.
649 *self.iter_err.lock().unwrap() = None;
650
651 // SAFETY: we capture &mut self.tasks via a raw pointer because
652 // wait_and_process expects FnMut and Rust can't see the closure
653 // outlives `self`. The discipline that makes this sound:
654 // 1. The closure body on the executor thread is the *only* code that
655 // reads `tasks_ptr`. The pool jobs it submits hold borrowed
656 // `*mut dyn ExecutableItem` slices into individual TaskEntries,
657 // not into the Vec itself, so they don't race with the Vec.
658 // 2. `pool.barrier()` at the end of this callback ensures every
659 // submitted pool job has completed (and dropped its raw pointer)
660 // before the callback returns. The next iteration of the WaitSet
661 // loop is therefore the sole user of `tasks_ptr` again.
662 // 3. The Vec is never resized inside this loop (no `push` / `remove`
663 // after dispatch starts), so the underlying buffer addresses are
664 // stable for the lifetime of `dispatch_loop`.
665 let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
666 let pool = &self.pool;
667 // Refcount-only clone of the pre-allocated error slot. Pool jobs
668 // need a `'static` handle, and an `Arc::clone` does not allocate.
669 // The Single/Chain paths use the closure baked into `task.job`,
670 // which already captured stable Arc clones at `add`-time; the
671 // Graph path uses closures pre-built by `prepare_dispatch`. Only
672 // the error-aggregation logic on the WaitSet thread still needs
673 // the slot here.
674 let iter_err_inner = Arc::clone(&self.iter_err);
675 // Raw pointer to the stop listener for draining inside the callback.
676 // SAFETY: same as stop_listener_ref above — the Arc is alive for
677 // the lifetime of dispatch_loop.
678 let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
679
680 let cb_result = waitset.wait_and_process_once(
681 |attachment_id: WaitSetAttachmentId<ipc::Service>| {
682 // Drain stop notifications first (no dispatch — the stop_flag
683 // check after the callback returns handles termination).
684 // SAFETY: stop_listener_ptr is valid for the duration of the
685 // closure; the Arc in self.stop_listener keeps it alive.
686 let stop_l = unsafe { &*stop_listener_ptr };
687 while let Ok(Some(_)) = stop_l.try_wait_one() {}
688
689 for (i, guard) in guards.iter().enumerate() {
690 let fired = attachment_id.has_event_from(guard)
691 || attachment_id.has_missed_deadline(guard);
692 if !fired {
693 continue;
694 }
695 let task_idx = attachment_to_task[i];
696
697 // SAFETY: we are the only thread that may touch
698 // `self` during the callback. wait_and_process_once
699 // is single-threaded; we hold &mut self in
700 // dispatch_loop. The pointer is valid for the
701 // duration of this closure.
702 let task = unsafe { &mut (&mut *tasks_ptr)[task_idx] };
703
704 match &mut task.kind {
705 TaskKind::Single(_) | TaskKind::Chain(_) => {
706 // The dispatch closure was pre-allocated at
707 // task-add time and stashed on `task.job`.
708 // Submit it via `submit_borrowed` — no
709 // per-iteration Box allocation. Required by
710 // REQ_0060.
711 let job_box = task
712 .job
713 .as_deref_mut()
714 .expect("Single/Chain tasks carry a pre-built job");
715 let job_ptr: *mut (dyn FnMut() + Send) =
716 job_box as *mut (dyn FnMut() + Send);
717 // SAFETY: the closure lives in
718 // `task.job` which is owned by
719 // `self.tasks[task_idx]`; `tasks_ptr` is
720 // sound for the duration of this
721 // callback. `pool.barrier()` below
722 // finishes the closure invocation before
723 // we re-enter the next iteration's
724 // callback. The WaitSet thread does not
725 // touch the closure between this submit
726 // and that barrier.
727 #[allow(unsafe_code)]
728 unsafe {
729 pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
730 }
731 }
732 TaskKind::Graph(graph) => {
733 // Outer driver runs on the WaitSet thread; vertices run on the
734 // pool. The graph holds its own pre-built per-vertex closures
735 // and SPSC ready ring (REQ_0060), so dispatch is allocation-free
736 // in steady state.
737 let outcome = graph.run_once_borrowed(pool);
738 if let Some(source) = outcome.error {
739 let mut g = iter_err_inner.lock().unwrap();
740 if g.is_none() {
741 *g = Some(ExecutorError::Item {
742 task_id: task.id.clone(),
743 source,
744 });
745 }
746 }
747 let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
748 }
749 }
750 }
751
752 // Wait for all submitted jobs to finish before leaving
753 // the callback scope (validates item_ptr safety contract).
754 pool.barrier();
755 CallbackProgression::Continue
756 },
757 );
758
759 let cb_result = cb_result.map_err(ExecutorError::iceoryx2)?;
760
761 // iceoryx2's WaitSet catches SIGINT/SIGTERM internally; honor that
762 // here for a clean exit.
763 if matches!(
764 cb_result,
765 WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
766 ) {
767 return Ok(());
768 }
769
770 // Extract the error before dropping the MutexGuard — avoids
771 // holding the lock across the return (clippy::significant_drop_in_scrutinee).
772 let maybe_err = self.iter_err.lock().unwrap().take();
773 if let Some(err) = maybe_err {
774 return Err(err);
775 }
776 if stop_flag.is_stopped() {
777 return Ok(());
778 }
779
780 iterations_done.fetch_add(1, Ordering::SeqCst);
781 match mode {
782 RunMode::Forever => {}
783 RunMode::Iterations(n) => {
784 if iterations_done.load(Ordering::SeqCst) >= *n {
785 return Ok(());
786 }
787 }
788 RunMode::Until(deadline) => {
789 if Instant::now() >= *deadline {
790 return Ok(());
791 }
792 }
793 RunMode::Predicate(p) => {
794 if (p)() {
795 return Ok(());
796 }
797 }
798 }
799 }
800 }
801}
802
/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
/// `Pool::submit`. The send is safe because:
/// 1. The executor guarantees at most one invocation of a given item at a
///    time (via `pool.barrier()` before the pointer is reused).
/// 2. `ExecutableItem: Send`, so moving the pointee across threads is sound
///    when no aliasing exists.
///
/// The wrapper never dereferences the pointer itself; whoever calls
/// [`SendItemPtr::get`] is responsible for upholding the rules above.
#[allow(unsafe_code)]
struct SendItemPtr {
    /// Raw pointer to the item; not owned by this wrapper.
    ptr: *mut dyn ExecutableItem,
}

impl SendItemPtr {
    /// Wrap `ptr` without taking ownership of the pointee.
    fn new(ptr: *mut dyn ExecutableItem) -> Self {
        Self { ptr }
    }

    /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
    /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
    /// dispatch closure to be reusable across iterations without allocation).
    fn get(&self) -> *mut dyn ExecutableItem {
        self.ptr
    }
}

// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
// closure can borrow `&SendItemPtr` per invocation without making the
// closure itself `!Send`.
#[allow(unsafe_code)]
unsafe impl Send for SendItemPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendItemPtr {}
834
/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
/// closure can iterate the chain's items in place without first
/// collecting them into a freshly-allocated `Vec`. The send is safe
/// for the same reason as [`SendItemPtr`] (see above): the executor
/// holds `&mut self` for the duration of `dispatch_loop`, and the
/// `pool.barrier()` at the end of each callback ensures the closure
/// has finished using this pointer before the Vec could be touched
/// from the `WaitSet` thread again. The Vec is never resized after
/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
/// allocate per iteration.
///
/// Note that the pointer addresses the `Vec` *header*, not the element
/// buffer: it is invalidated when the header itself is moved, even though
/// the elements stay where they are.
#[allow(unsafe_code)]
struct SendChainPtr {
    /// Raw pointer to the chain's `Vec` header; not owned by this wrapper.
    ptr: *mut Vec<Box<dyn ExecutableItem>>,
}

impl SendChainPtr {
    /// Wrap `ptr` without taking ownership of the pointee.
    fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
        Self { ptr }
    }

    /// Returns the raw pointer. Takes `&self` so the same wrapper can be
    /// read on every invocation of the dispatch closure that holds it.
    fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
        self.ptr
    }
}

// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
// borrow `&SendChainPtr` per invocation while staying `Send`.
#[allow(unsafe_code)]
unsafe impl Send for SendChainPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendChainPtr {}
866
867/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
868///
869/// The returned closure is stored on `TaskEntry::job` and invoked once
870/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
871/// performs no allocation. The closure captures Arc clones of the
872/// executor's shared state — those clones are refcount-only at build
873/// time and are reused on every dispatch. Required for `REQ_0060`.
874#[allow(clippy::too_many_arguments)]
875fn build_single_job(
876 id: TaskId,
877 stop: Stoppable,
878 obs: Arc<dyn Observer>,
879 mon: Arc<dyn ExecutionMonitor>,
880 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
881 app_id: Option<u32>,
882 app_inst: Option<u32>,
883 item_ptr: SendItemPtr,
884) -> Box<dyn FnMut() + Send + 'static> {
885 Box::new(move || {
886 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
887 if let Some(aid) = app_id {
888 obs.on_app_start(id.clone(), aid, app_inst);
889 }
890 let raw = item_ptr.get();
891 let started = std::time::Instant::now();
892 mon.pre_execute(id.clone(), started);
893 // SAFETY: barrier() pairs with this invocation; the WaitSet
894 // thread does not touch the item between `submit_borrowed` and
895 // the matching `barrier()`. See SendItemPtr safety doc.
896 #[allow(unsafe_code)]
897 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
898 let took = started.elapsed();
899 mon.post_execute(id.clone(), started, took, res.is_ok());
900 if let Err(ref e) = res {
901 obs.on_app_error(id.clone(), e.as_ref());
902 }
903 if app_id.is_some() {
904 obs.on_app_stop(id.clone());
905 }
906 record_first_err(&err_slot, &id, res);
907 })
908}
909
910/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
911fn build_chain_job(
912 id: TaskId,
913 stop: Stoppable,
914 obs: Arc<dyn Observer>,
915 mon: Arc<dyn ExecutionMonitor>,
916 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
917 chain_ptr: SendChainPtr,
918) -> Box<dyn FnMut() + Send + 'static> {
919 Box::new(move || {
920 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
921 // SAFETY: barrier() pairs with this invocation; the chain Vec
922 // and the items it owns are not touched by the WaitSet thread
923 // until barrier() returns. See SendChainPtr safety doc.
924 #[allow(unsafe_code)]
925 let chain_items = unsafe { &mut *chain_ptr.get() };
926 for item_box in chain_items.iter_mut() {
927 let app_id = item_box.app_id();
928 let app_inst = item_box.app_instance_id();
929 if let Some(aid) = app_id {
930 obs.on_app_start(id.clone(), aid, app_inst);
931 }
932 let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
933 let started = std::time::Instant::now();
934 mon.pre_execute(id.clone(), started);
935 #[allow(unsafe_code)]
936 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
937 let took = started.elapsed();
938 mon.post_execute(id.clone(), started, took, res.is_ok());
939 if let Err(ref e) = res {
940 obs.on_app_error(id.clone(), e.as_ref());
941 }
942 if app_id.is_some() {
943 obs.on_app_stop(id.clone());
944 }
945 match res {
946 Ok(crate::ControlFlow::Continue) => {}
947 Ok(crate::ControlFlow::StopChain) => break,
948 Err(_) => {
949 record_first_err(&err_slot, &id, res);
950 break;
951 }
952 }
953 }
954 })
955}
956
/// Error used by `run_item_catch_unwind` to represent a task that panicked;
/// the wrapped string is the message attached to the error.
#[derive(Debug)]
struct PanickedTask(String);

impl std::fmt::Display for PanickedTask {
    /// Renders as `task panicked: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("task panicked: ")?;
        f.write_str(&self.0)
    }
}

// No underlying cause to expose, so the default `source()` is kept.
impl std::error::Error for PanickedTask {}
967
968/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
969#[allow(clippy::option_if_let_else)]
970fn run_item_catch_unwind(
971 item: &mut dyn ExecutableItem,
972 ctx: &mut crate::context::Context<'_>,
973) -> crate::ExecuteResult {
974 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
975 |payload| {
976 let msg = if let Some(s) = payload.downcast_ref::<&str>() {
977 (*s).to_string()
978 } else if let Some(s) = payload.downcast_ref::<String>() {
979 s.clone()
980 } else {
981 "panicked task".to_string()
982 };
983 Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
984 },
985 )
986}
987
988/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
989/// without depending on its private name.
990pub(crate) fn run_item_catch_unwind_external(
991 item: &mut dyn ExecutableItem,
992 ctx: &mut crate::context::Context<'_>,
993) -> crate::ExecuteResult {
994 run_item_catch_unwind(item, ctx)
995}
996
997/// Record the first error into `slot`. Subsequent errors are silently dropped.
998fn record_first_err(
999 slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
1000 id: &TaskId,
1001 res: crate::ExecuteResult,
1002) {
1003 if let Err(source) = res {
1004 let mut g = slot.lock().unwrap();
1005 if g.is_none() {
1006 *g = Some(ExecutorError::Item {
1007 task_id: id.clone(),
1008 source,
1009 });
1010 }
1011 }
1012}
1013
1014// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
1015
1016/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
1017/// into a registered task.
1018pub struct ExecutorGraphBuilder<'e> {
1019 executor: &'e mut Executor,
1020 builder: crate::graph::GraphBuilder,
1021 custom_id: Option<TaskId>,
1022}
1023
1024impl ExecutorGraphBuilder<'_> {
1025 /// Add a vertex to the graph; returns its handle.
1026 pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
1027 self.builder.vertex(item)
1028 }
1029
1030 /// Add a directed edge from one vertex to another.
1031 pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
1032 self.builder.edge(from, to);
1033 self
1034 }
1035
1036 /// Designate the root vertex (its triggers gate the graph).
1037 pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
1038 self.builder.root(v);
1039 self
1040 }
1041
1042 /// Override the auto-generated id with a custom one.
1043 pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
1044 self.custom_id = Some(id.into());
1045 self
1046 }
1047
1048 /// Validate and register the graph. Returns the task id.
1049 ///
1050 /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
1051 /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
1052 /// precedence over the auto-generated id.
1053 pub fn build(self) -> Result<TaskId, ExecutorError> {
1054 let g = self.builder.finish()?;
1055 // Root vertex's task_id() override wins over the custom id, which wins
1056 // over the auto-generated fallback.
1057 let auto_id = || {
1058 TaskId::new(format!(
1059 "graph-{}",
1060 self.executor.next_id.fetch_add(1, Ordering::SeqCst)
1061 ))
1062 };
1063 let id = g
1064 .root_task_id()
1065 .map(TaskId::new)
1066 .or(self.custom_id)
1067 .unwrap_or_else(auto_id);
1068 let decls = g.decls.clone();
1069
1070 // Box the graph for address stability — per-vertex dispatch
1071 // closures capture `*const Graph` and must not see it move.
1072 let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
1073 // Pre-build the per-vertex closures now that we know the
1074 // task_id and have access to the executor's shared state.
1075 graph_box.prepare_dispatch(
1076 id.clone(),
1077 self.executor.stoppable.clone(),
1078 Arc::clone(&self.executor.observer),
1079 Arc::clone(&self.executor.monitor),
1080 Arc::clone(&self.executor.iter_err),
1081 );
1082
1083 self.executor.tasks.push(TaskEntry {
1084 id: id.clone(),
1085 kind: TaskKind::Graph(graph_box),
1086 decls,
1087 // Graph tasks dispatch their vertices via `vertex_jobs`
1088 // stored inside the `Graph`; the per-task `job` slot
1089 // is unused for graphs.
1090 job: None,
1091 });
1092 Ok(id)
1093 }
1094}
1095
1096// ── Unit tests ────────────────────────────────────────────────────────────────
1097
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ControlFlow, item};

    /// Two items added without an explicit id must not share a task id.
    #[test]
    fn add_returns_unique_ids() {
        let mut executor = Executor::builder().worker_threads(0).build().unwrap();
        let first = executor.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        let second = executor.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        assert_ne!(first, second);
    }

    /// An id passed to `add_with_id` is returned verbatim.
    #[test]
    fn custom_id_is_preserved() {
        let mut executor = Executor::builder().worker_threads(0).build().unwrap();
        let registered = executor
            .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
            .unwrap();
        assert_eq!(registered.as_str(), "my-task");
    }

    /// The trigger-declaration callback has run by the time `add` returns.
    #[test]
    fn declare_triggers_called_at_add_time() {
        let declared = Arc::new(AtomicBool::new(false));
        let declared_in_item = Arc::clone(&declared);

        let probe = crate::item::item_with_triggers(
            move |_d| {
                declared_in_item.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );

        let mut executor = Executor::builder().worker_threads(0).build().unwrap();
        executor.add(probe).unwrap();
        assert!(declared.load(Ordering::SeqCst));
    }
}