taktora_executor/executor.rs
1//! `Executor` and `ExecutorBuilder`. Run loop lives in Task 8.
2
3// Fields consumed by the run loop (Task 8) and graph scheduler (Task 14).
4#![allow(dead_code)]
5// pub(crate) inside a private module — intentional, Task 8+ will use them.
6#![allow(clippy::redundant_pub_crate)]
7
8use crate::Channel;
9use crate::context::Stoppable;
10use crate::error::ExecutorError;
11use crate::fault::{
12 ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
13 FaultState, duration_to_ms_sat, instant_to_since_ms,
14};
15use crate::item::ExecutableItem;
16use crate::monitor::{ExecutionMonitor, NoopMonitor};
17use crate::observer::{NoopObserver, Observer};
18use crate::payload::Payload;
19use crate::pool::Pool;
20use crate::task_id::TaskId;
21use crate::task_kind::TaskKind;
22use crate::thread_attrs::ThreadAttributes;
23use crate::trigger::{TriggerDecl, TriggerDeclarer};
24use core::sync::atomic::AtomicU32;
25use iceoryx2::node::Node;
26use iceoryx2::port::listener::Listener as IxListener;
27use iceoryx2::prelude::ipc;
28use iceoryx2::prelude::*;
29use iceoryx2::waitset::WaitSetRunResult;
30use std::sync::Arc;
31use std::sync::OnceLock;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33use std::time::{Duration, Instant};
34
/// Process-wide sequence number consumed by `ExecutorBuilder::build`.
///
/// Each build takes the next value and embeds it in the internal stop-event
/// service name, so several executors in one process never share a name.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0_u64);
38
39/// One registered task entry.
40pub(crate) struct TaskEntry {
41 /// Task identifier.
42 pub(crate) id: TaskId,
43 /// The kind of work this entry holds (single item or chain).
44 pub(crate) kind: TaskKind,
45 /// Trigger declarations recorded at `add` time.
46 pub(crate) decls: Vec<TriggerDecl>,
47 /// Pre-allocated dispatch closure. Built once at `add` / `add_chain`
48 /// time and re-invoked on every dispatch iteration via
49 /// `Pool::submit_borrowed`, avoiding the per-iteration `Box::new(closure)`
50 /// that `Pool::submit<F>` requires in threaded mode. Required for
51 /// `REQ_0060` (zero-alloc steady-state dispatch). `None` for
52 /// `TaskKind::Graph`, which dispatches its vertices via a separate
53 /// path and is handled by `REQ_0062` / `REQ_0063` follow-on work.
54 pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
55
56 /// Per-task budget declared via `TriggerDeclarer::budget`. `None`
57 /// means no per-task check; the executor-wide iteration budget
58 /// still applies. `REQ_0070`.
59 pub(crate) budget: Option<Duration>,
60
61 /// Per-task fault state. Wait-free read on the dispatch hot path.
62 /// Wrapped in `Arc` so dispatch closures built at `add` time can
63 /// capture an owning handle into the same atomic the `TaskEntry`
64 /// holds — `Arc::clone` is refcount-only, so this stays compatible
65 /// with `REQ_0060` (no per-iteration allocation). `REQ_0070`.
66 pub(crate) fault: Arc<FaultAtomic>,
67
68 /// Monotonic per-task overrun counter. Increments on EVERY budget
69 /// breach, including breaches while already `Faulted`. Never reset
70 /// by clearing the fault. Shared with the dispatch closure via
71 /// `Arc::clone`. `REQ_0102`.
72 pub(crate) overrun_count: Arc<AtomicU64>,
73
74 /// Pre-built dispatch closure for the fault-handler item. Mirrors
75 /// `job`. `None` means no handler — the task is simply skipped
76 /// during fault. `REQ_0072`.
77 pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,
78}
79
80/// Top-level executor. One per process is the typical case.
81pub struct Executor {
82 pub(crate) node: Node<ipc::Service>,
83 pub(crate) pool: Arc<Pool>,
84 pub(crate) tasks: Vec<TaskEntry>,
85 pub(crate) running: Arc<AtomicBool>,
86 pub(crate) stoppable: Stoppable,
87 pub(crate) next_id: AtomicU64,
88 /// Listener for the internal stop event service. Held here so it outlives
89 /// the `WaitSet` guard inside `dispatch_loop`. Created at `build()` time so
90 /// any `Stoppable` clone (taken before or after `run()`) carries the waker.
91 pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
92 /// Lifecycle observer. Defaults to a no-op.
93 pub(crate) observer: Arc<dyn Observer>,
94 /// Execution monitor. Defaults to a no-op.
95 pub(crate) monitor: Arc<dyn ExecutionMonitor>,
96 /// Per-iteration error capture slot — allocated once at build time and
97 /// reset to `None` at the top of each `dispatch_loop` iteration. Pool
98 /// workers obtain a refcount-only `Arc::clone` of this slot, avoiding
99 /// the per-iteration heap allocation that the previous design incurred.
100 /// Required for `REQ_0060`.
101 pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
102 /// Executor-wide iteration budget from `ExecutorBuilder::iteration_budget`.
103 /// `None` means no executor-wide check.
104 pub(crate) iteration_budget: Option<Duration>,
105 /// Executor-wide fault state. Wrapped in `Arc` so each dispatch
106 /// closure can hold an owning handle without re-borrowing through
107 /// `self`. `REQ_0071`.
108 pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,
109
110 /// Index of the task whose `execute()` overran when the executor
111 /// transitioned to `Faulted`. Read alongside `exec_fault`.
112 pub(crate) exec_fault_task_idx: Arc<AtomicU32>,
113
114 /// Budget that was breached when the executor transitioned to
115 /// `Faulted`, in ms (saturated). Read alongside `exec_fault`.
116 pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,
117
118 /// Executor start time, set on first dispatch. Used to compute
119 /// `since_ms` for faults relative to `Executor::run` entry. Wrapped
120 /// in `Arc` so dispatch closures share the same `OnceLock` with the
121 /// executor — `get_or_init` is idempotent and wait-free.
122 pub(crate) start_time: Arc<OnceLock<Instant>>,
123}
124
125// SAFETY: `IxListener<ipc::Service>` is `!Send` for the same Rc-based
126// `SingleThreaded` reason as `IxNotifier`. After construction, the only
127// per-iteration call is `listener.try_wait_one()`, which does not mutate the
128// Rc. `Executor` is never shared across threads (it requires `&mut self` for
129// `run()`), so there is no aliased concurrent mutation.
130#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
131unsafe impl Send for Executor {}
132
133impl Executor {
134 /// Start a new builder.
135 #[must_use]
136 pub fn builder() -> ExecutorBuilder {
137 ExecutorBuilder::default()
138 }
139
140 /// Open or create a pub/sub channel bound to this executor's node.
141 pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
142 Channel::open_or_create(&self.node, name)
143 }
144
145 /// Open or create a request/response service bound to this executor's node.
146 pub fn service<Req, Resp>(
147 &mut self,
148 name: &str,
149 ) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
150 where
151 Req: Payload,
152 Resp: Payload,
153 {
154 crate::Service::open_or_create(&self.node, name)
155 }
156
157 /// Add an item to the executor with an auto-generated id.
158 pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
159 let id = TaskId::new(format!(
160 "task-{}",
161 self.next_id.fetch_add(1, Ordering::SeqCst)
162 ));
163 self.add_with_id(id, item)
164 }
165
166 /// Add an item with a user-supplied id.
167 ///
168 /// The item's [`ExecutableItem::task_id`] override takes precedence over
169 /// the caller-supplied `id`, which itself takes precedence over the
170 /// auto-generated id assigned by [`Executor::add`].
171 pub fn add_with_id(
172 &mut self,
173 id: impl Into<TaskId>,
174 mut item: impl ExecutableItem,
175 ) -> Result<TaskId, ExecutorError> {
176 let id_arg: TaskId = id.into();
177 // The item's `task_id()` override wins over the user-supplied id.
178 let id = item.task_id().map_or(id_arg, TaskId::new);
179 let mut declarer = TriggerDeclarer::new_internal();
180 item.declare_triggers(&mut declarer)?;
181 let budget = declarer.budget;
182 let decls = declarer.into_decls();
183
184 let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
185 let app_id = item_box.app_id();
186 let app_inst = item_box.app_instance_id();
187 // SAFETY: the raw pointer points into the heap allocation of
188 // `item_box`. `Box` keeps that allocation at a stable address even
189 // when the `Box` itself is moved (e.g. when `self.tasks` grows),
190 // so the pointer remains valid for the lifetime of the
191 // `TaskEntry`. See SendItemPtr safety doc for the rest of the
192 // discipline (barrier() pairs with worker access).
193 #[allow(unsafe_code)]
194 let item_ptr =
195 SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
196
197 // Allocate the per-task atomics now so the dispatch closure
198 // and the `TaskEntry` share the same `Arc` storage. The task
199 // will occupy `self.tasks.len()` after the push below — capture
200 // that index up front for `task_idx_u32`. Bounded workspace, so
201 // the `as u32` cast is sound; explicit allow keeps clippy quiet.
202 let task_fault = Arc::new(FaultAtomic::new());
203 let overrun_count = Arc::new(AtomicU64::new(0));
204 #[allow(clippy::cast_possible_truncation)]
205 let task_idx_u32 = self.tasks.len() as u32;
206 let fault_ctx = FaultDispatchCtx {
207 task_budget: budget,
208 task_fault: Arc::clone(&task_fault),
209 overrun_count: Arc::clone(&overrun_count),
210 iteration_budget: self.iteration_budget,
211 exec_fault: Arc::clone(&self.exec_fault),
212 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
213 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
214 task_idx_u32,
215 exec_start: Arc::clone(&self.start_time),
216 observer: Arc::clone(&self.observer),
217 };
218
219 let job = build_single_job(
220 id.clone(),
221 self.stoppable.clone(),
222 Arc::clone(&self.observer),
223 Arc::clone(&self.monitor),
224 Arc::clone(&self.iter_err),
225 app_id,
226 app_inst,
227 item_ptr,
228 fault_ctx,
229 );
230
231 self.tasks.push(TaskEntry {
232 id: id.clone(),
233 kind: TaskKind::Single(item_box),
234 decls,
235 job: Some(job),
236 budget,
237 fault: task_fault,
238 overrun_count,
239 handler_job: None,
240 });
241 Ok(id)
242 }
243
244 /// Register an item plus a fault-handler item.
245 ///
246 /// The main item is registered through the canonical [`add`](Self::add)
247 /// path. The handler's [`declare_triggers`](ExecutableItem::declare_triggers)
248 /// is called (so handlers that internally rely on the declarer being
249 /// invoked observe the call) but its returned trigger list is
250 /// **ignored** — the handler dispatches on the main item's triggers
251 /// while the task is in `Faulted` state and runs in place of the main
252 /// item's `execute()`. The pre-built handler dispatch closure is
253 /// stashed on the same task entry as the main item's `job`,
254 /// satisfying `REQ_0072`.
255 ///
256 /// # Errors
257 ///
258 /// Propagates any error from registering the main item via `add`, or
259 /// from the handler's `declare_triggers` call.
260 ///
261 /// # Panics
262 ///
263 /// Panics if the task entry just inserted by [`add`](Self::add) cannot
264 /// be located in `self.tasks` — this is unreachable by construction
265 /// and indicates a logic bug.
266 pub fn add_with_fault_handler<I, H>(
267 &mut self,
268 main: I,
269 handler: H,
270 ) -> Result<TaskId, ExecutorError>
271 where
272 I: ExecutableItem,
273 H: ExecutableItem,
274 {
275 let task_id = self.add(main)?;
276
277 // Drain the handler's trigger declarations — they are ignored by
278 // design (the handler runs on the main item's triggers).
279 let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
280 let mut throwaway = TriggerDeclarer::new_internal();
281 handler_box.declare_triggers(&mut throwaway)?;
282 drop(throwaway);
283
284 let app_id = handler_box.app_id();
285 let app_inst = handler_box.app_instance_id();
286
287 // Locate the task we just added so we can share its per-task
288 // atomics with the handler's `FaultDispatchCtx`. The handler
289 // runs on the same `TaskEntry`; per §4.6 invariant 5, a handler
290 // breach increments `overrun_count` and keeps state `Faulted`
291 // without re-firing the observer.
292 let task_idx = self
293 .tasks
294 .iter()
295 .position(|t| t.id == task_id)
296 .expect("just added; must exist");
297 let task = &self.tasks[task_idx];
298 #[allow(clippy::cast_possible_truncation)]
299 let task_idx_u32 = task_idx as u32;
300 let handler_fault_ctx = FaultDispatchCtx {
301 task_budget: task.budget,
302 task_fault: Arc::clone(&task.fault),
303 overrun_count: Arc::clone(&task.overrun_count),
304 iteration_budget: self.iteration_budget,
305 exec_fault: Arc::clone(&self.exec_fault),
306 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
307 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
308 task_idx_u32,
309 exec_start: Arc::clone(&self.start_time),
310 observer: Arc::clone(&self.observer),
311 };
312
313 let handler_closure = build_handler_job(
314 task_id.clone(),
315 self.stoppable.clone(),
316 Arc::clone(&self.observer),
317 Arc::clone(&self.monitor),
318 Arc::clone(&self.iter_err),
319 app_id,
320 app_inst,
321 handler_box,
322 handler_fault_ctx,
323 );
324
325 self.tasks[task_idx].handler_job = Some(handler_closure);
326
327 Ok(task_id)
328 }
329
330 /// Clear a per-task fault. Returns the previous `FaultState`.
331 /// Fires `Observer::on_task_clear` if the state changed from
332 /// `Faulted` to `Running`. `REQ_0070`.
333 ///
334 /// # Errors
335 ///
336 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
337 /// * [`ExecutorError::TaskNotFaulted`] if `task` is already `Running`.
338 pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
339 let entry = self
340 .tasks
341 .iter()
342 .find(|t| t.id == task)
343 .ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
344 let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
345 let prev = entry.fault.swap(FaultState::Running, budget_ms);
346 match prev {
347 FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
348 FaultState::Faulted { .. } => {
349 self.observer.on_task_clear(task);
350 Ok(prev)
351 }
352 }
353 }
354
355 /// Clear the executor-wide fault and cascade-clear every task whose
356 /// state is `Faulted{ExecutorFaulted}`. Tasks whose state is
357 /// `Faulted{BudgetExceeded}` are NOT cleared (their own contract
358 /// breach is independent). Fires `Observer::on_executor_clear` and
359 /// one `Observer::on_task_clear` per cascade-cleared task.
360 /// `REQ_0071`.
361 ///
362 /// # Errors
363 ///
364 /// * [`ExecutorError::ExecutorNotFaulted`] if the executor is `Running`.
365 pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
366 let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
367 let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
368 let prev = self
369 .exec_fault
370 .swap(ExecutorFaultState::Running, task_idx, budget_ms);
371 match prev {
372 ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
373 ExecutorFaultState::Faulted { .. } => {
374 // Cascade-clear tasks whose reason is ExecutorFaulted.
375 for entry in &self.tasks {
376 let task_budget_ms =
377 entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
378 if let FaultState::Faulted {
379 reason: FaultReason::ExecutorFaulted,
380 ..
381 } = entry.fault.load(task_budget_ms)
382 {
383 let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
384 self.observer.on_task_clear(entry.id.clone());
385 }
386 }
387 self.observer.on_executor_clear();
388 Ok(prev)
389 }
390 }
391 }
392
393 /// Return the per-task overrun counter — number of times the task's
394 /// `execute()` exceeded its budget over the executor's lifetime.
395 /// Monotonic; not reset by `clear_task_fault`. `REQ_0102`.
396 ///
397 /// # Errors
398 ///
399 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
400 pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
401 self.tasks
402 .iter()
403 .find(|t| t.id == task)
404 .map(|t| t.overrun_count.load(Ordering::Acquire))
405 .ok_or_else(|| ExecutorError::TaskNotFound(task))
406 }
407
408 /// Return a snapshot of the per-task `FaultState`. `REQ_0073` (pull path).
409 ///
410 /// # Errors
411 ///
412 /// * [`ExecutorError::TaskNotFound`] if `task` is unknown.
413 pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
414 self.tasks
415 .iter()
416 .find(|t| t.id == task)
417 .map(|t| {
418 let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
419 t.fault.load(budget_ms)
420 })
421 .ok_or_else(|| ExecutorError::TaskNotFound(task))
422 }
423
424 /// Return a snapshot of the executor-wide `ExecutorFaultState`.
425 /// `REQ_0073` (pull path).
426 #[must_use]
427 pub fn executor_fault_state(&self) -> ExecutorFaultState {
428 let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
429 let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
430 self.exec_fault.load(task_idx, budget_ms)
431 }
432
433 /// Add a sequential chain of items. Only the head item's
434 /// `declare_triggers` is consulted; non-head triggers are ignored with a
435 /// tracing warn.
436 pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
437 where
438 I: ExecutableItem,
439 C: IntoIterator<Item = I>,
440 {
441 let id = TaskId::new(format!(
442 "chain-{}",
443 self.next_id.fetch_add(1, Ordering::SeqCst)
444 ));
445 let boxed: Vec<Box<dyn ExecutableItem>> = items
446 .into_iter()
447 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
448 .collect();
449 self.add_chain_with_id_boxed(id, boxed)
450 }
451
452 /// Like [`Executor::add_chain`] but with a user-supplied id.
453 pub fn add_chain_with_id<I, C>(
454 &mut self,
455 id: impl Into<TaskId>,
456 items: C,
457 ) -> Result<TaskId, ExecutorError>
458 where
459 I: ExecutableItem,
460 C: IntoIterator<Item = I>,
461 {
462 let boxed: Vec<Box<dyn ExecutableItem>> = items
463 .into_iter()
464 .map(|i| Box::new(i) as Box<dyn ExecutableItem>)
465 .collect();
466 self.add_chain_with_id_boxed(id.into(), boxed)
467 }
468
469 fn add_chain_with_id_boxed(
470 &mut self,
471 id: TaskId,
472 mut items: Vec<Box<dyn ExecutableItem>>,
473 ) -> Result<TaskId, ExecutorError> {
474 if items.is_empty() {
475 return Err(ExecutorError::Builder(
476 "chain must contain at least one item".into(),
477 ));
478 }
479
480 // Head item's `task_id()` override wins over the user-supplied id.
481 let id = items[0].task_id().map_or(id, TaskId::new);
482
483 // Head's triggers gate the chain.
484 let mut head_declarer = TriggerDeclarer::new_internal();
485 items[0].declare_triggers(&mut head_declarer)?;
486 let decls = head_declarer.into_decls();
487
488 // Warn if non-head items declared triggers (those will be ignored).
489 for (i, body) in items.iter_mut().enumerate().skip(1) {
490 let mut spurious = TriggerDeclarer::new_internal();
491 let _ = body.declare_triggers(&mut spurious);
492 if !spurious.is_empty() {
493 #[cfg(feature = "tracing")]
494 tracing::warn!(
495 target: "taktora-executor",
496 task = %id,
497 position = i,
498 "non-head chain item declared triggers; they will be ignored"
499 );
500 #[cfg(not(feature = "tracing"))]
501 {
502 let _ = i;
503 }
504 }
505 }
506
507 let mut items = items;
508 // SAFETY: pointer into the chain's `items` Vec. The Vec lives
509 // inside `TaskKind::Chain` inside `TaskEntry`. The Vec's buffer
510 // is stable once `add_chain` returns — `self.tasks` may grow
511 // (moving the `Vec<Box<...>>` header itself), but the Vec's
512 // heap buffer is referenced via the header's data pointer and
513 // is unaffected by header moves. We never resize the chain Vec
514 // after this point. See SendChainPtr safety doc for the rest.
515 #[allow(unsafe_code)]
516 let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
517 &mut items,
518 ));
519 // NB: the pointer above is to the local `items` Vec on the
520 // stack — it's invalid after the `push` below moves items into
521 // the TaskEntry. We rederive a stable pointer after the push.
522 // (See the rebuild step below.)
523 let _ = chain_ptr;
524
525 // Pre-allocate the per-task atomics so the chain's dispatch
526 // closure can capture clones of the same `Arc`s the `TaskEntry`
527 // holds. The chain occupies `self.tasks.len()` after the push.
528 let task_fault = Arc::new(FaultAtomic::new());
529 let overrun_count = Arc::new(AtomicU64::new(0));
530 #[allow(clippy::cast_possible_truncation)]
531 let task_idx_u32 = self.tasks.len() as u32;
532
533 self.tasks.push(TaskEntry {
534 id: id.clone(),
535 kind: TaskKind::Chain(items),
536 decls,
537 job: None, // populated in the rebuild step below
538 // TODO(post-Task-10): chain budgets carried separately; for now None.
539 budget: None,
540 fault: Arc::clone(&task_fault),
541 overrun_count: Arc::clone(&overrun_count),
542 handler_job: None,
543 });
544
545 // After the push, the TaskEntry lives at a stable position in
546 // `self.tasks` for the duration of this `add_chain_with_id_boxed`
547 // call. Take a stable pointer to its chain Vec and build the
548 // dispatch closure. If `self.tasks` later grows, the Vec header
549 // inside the TaskEntry moves but the header's data pointer
550 // (which addresses the chain's heap buffer) does not — and the
551 // closure derefs that pointer per dispatch, so it re-reads the
552 // current heap address each time. Sound under the same
553 // discipline as `tasks_ptr` in dispatch_loop.
554 let task_idx = self.tasks.len() - 1;
555 let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
556 {
557 TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
558 // The push above used TaskKind::Chain, so this arm is
559 // unreachable. Mark it explicitly to satisfy `match`.
560 _ => unreachable!("just-pushed task is TaskKind::Chain"),
561 };
562 #[allow(unsafe_code)]
563 let chain_ptr = SendChainPtr::new(chain_vec_ptr);
564 let fault_ctx = FaultDispatchCtx {
565 task_budget: None, // chain budgets are intentionally None for now
566 task_fault,
567 overrun_count,
568 iteration_budget: self.iteration_budget,
569 exec_fault: Arc::clone(&self.exec_fault),
570 exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
571 exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
572 task_idx_u32,
573 exec_start: Arc::clone(&self.start_time),
574 observer: Arc::clone(&self.observer),
575 };
576 let job = build_chain_job(
577 id.clone(),
578 self.stoppable.clone(),
579 Arc::clone(&self.observer),
580 Arc::clone(&self.monitor),
581 Arc::clone(&self.iter_err),
582 chain_ptr,
583 fault_ctx,
584 );
585 self.tasks[task_idx].job = Some(job);
586 Ok(id)
587 }
588
589 /// Returns a [`Stoppable`] handle that is waker-aware from the moment the
590 /// executor is built. Clone before calling `run()` — any clone taken at any
591 /// time will wake the `WaitSet` when `stop()` is called.
592 #[must_use]
593 pub fn stoppable(&self) -> Stoppable {
594 self.stoppable.clone()
595 }
596
597 /// Borrow the underlying iceoryx2 node (escape hatch for power users).
598 pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
599 &self.node
600 }
601
602 /// Begin building a graph. Call `.build()` on the returned builder to
603 /// register the graph as a task.
604 pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
605 ExecutorGraphBuilder {
606 executor: self,
607 builder: crate::graph::GraphBuilder::new(),
608 custom_id: None,
609 }
610 }
611}
612
613/// Builder for [`Executor`].
614pub struct ExecutorBuilder {
615 worker_threads: Option<usize>,
616 observer: Option<Arc<dyn Observer>>,
617 monitor: Option<Arc<dyn ExecutionMonitor>>,
618 worker_attrs: ThreadAttributes,
619 /// Executor-wide iteration budget (`REQ_0071`). `None` means no
620 /// executor-wide check.
621 iteration_budget: Option<Duration>,
622}
623
624impl Default for ExecutorBuilder {
625 fn default() -> Self {
626 Self {
627 worker_threads: None,
628 observer: None,
629 monitor: None,
630 worker_attrs: ThreadAttributes::new(),
631 iteration_budget: None,
632 }
633 }
634}
635
636impl ExecutorBuilder {
637 /// Number of worker threads. `0` → inline (no pool). Default → physical
638 /// cores.
639 #[must_use]
640 pub const fn worker_threads(mut self, n: usize) -> Self {
641 self.worker_threads = Some(n);
642 self
643 }
644
645 /// Attach a lifecycle observer. If not called, a no-op observer is used.
646 #[must_use]
647 pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
648 self.observer = Some(obs);
649 self
650 }
651
652 /// Attach an execution monitor. If not called, a no-op monitor is used.
653 #[must_use]
654 pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
655 self.monitor = Some(mon);
656 self
657 }
658
659 /// Configure the executor-wide iteration budget. Any task whose
660 /// `execute()` exceeds `dur` transitions the executor to `Faulted`
661 /// (`REQ_0071`). Default: unset (no executor-wide check).
662 #[must_use]
663 pub const fn iteration_budget(mut self, dur: Duration) -> Self {
664 self.iteration_budget = Some(dur);
665 self
666 }
667
668 /// Set thread attributes (name prefix, CPU affinity, scheduling priority)
669 /// for worker threads. Has no effect when `worker_threads` is `0` (inline
670 /// mode). Requires the `thread_attrs` feature for non-default settings.
671 #[must_use]
672 #[allow(clippy::missing_const_for_fn)]
673 pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
674 self.worker_attrs = attrs;
675 self
676 }
677
678 /// Build the [`Executor`]. Creates a fresh iceoryx2 node and wires up the
679 /// internal stop-event service so that any `Stoppable` clone (taken before
680 /// or after `run()`) will wake the `WaitSet` when `stop()` is called.
681 ///
682 /// # Panics
683 ///
684 /// Panics if the internally-generated stop-event service name exceeds the
685 /// iceoryx2 service name length limit (this cannot happen under normal use
686 /// because the name is derived from the process id and a monotonic counter).
687 #[allow(clippy::arc_with_non_send_sync)] // see SAFETY on `impl Send for Executor`
688 #[track_caller]
689 pub fn build(self) -> Result<Executor, ExecutorError> {
690 let node = NodeBuilder::new()
691 .create::<ipc::Service>()
692 .map_err(ExecutorError::iceoryx2)?;
693
694 let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
695 let pool = Arc::new(Pool::new(n_workers, self.worker_attrs)?);
696
697 // Build the internal stop event service with a unique-per-process name
698 // so multiple executors in the same process don't collide.
699 let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
700 let stop_topic = format!(
701 "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
702 std::process::id()
703 );
704 let stop_event = node
705 .service_builder(&stop_topic.as_str().try_into().unwrap())
706 .event()
707 .open_or_create()
708 .map_err(ExecutorError::iceoryx2)?;
709
710 let stop_notifier = Arc::new(
711 stop_event
712 .notifier_builder()
713 .create()
714 .map_err(ExecutorError::iceoryx2)?,
715 );
716
717 // SAFETY: see module-level note; Arc<IxListener> is held here and only
718 // accessed on the executor thread.
719 let stop_listener = Arc::new(
720 stop_event
721 .listener_builder()
722 .create()
723 .map_err(ExecutorError::iceoryx2)?,
724 );
725
726 // Wire the notifier into the Stoppable so every clone is waker-aware
727 // from the moment the executor is built.
728 let stoppable = Stoppable::with_waker(stop_notifier);
729
730 let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
731
732 let monitor: Arc<dyn ExecutionMonitor> =
733 self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
734
735 let exec = Executor {
736 node,
737 pool,
738 tasks: Vec::new(),
739 running: Arc::new(AtomicBool::new(false)),
740 stoppable,
741 next_id: AtomicU64::new(0),
742 stop_listener,
743 observer,
744 monitor,
745 iter_err: Arc::new(std::sync::Mutex::new(None)),
746 iteration_budget: self.iteration_budget,
747 exec_fault: Arc::new(ExecutorFaultAtomic::new()),
748 exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
749 exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
750 start_time: Arc::new(OnceLock::new()),
751 };
752
753 Ok(exec)
754 }
755}
756
757// ── Run loop ──────────────────────────────────────────────────────────────────
758
759impl Executor {
760 /// Run the executor until [`Stoppable::stop`] is called or a task signals
761 /// stop via [`crate::Context::stop_executor`].
762 ///
763 /// # Errors
764 ///
765 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
766 ///
767 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
768 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
769 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
770 ///
771 /// If multiple items error in the same dispatch iteration, only the first
772 /// is preserved; subsequent errors are discarded silently. To observe
773 /// every error, attach an [`Observer`](crate::Observer) and read errors
774 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
775 pub fn run(&mut self) -> Result<(), ExecutorError> {
776 self.run_inner(RunMode::Forever)
777 }
778
779 /// Run for at most `max` wall-clock duration, then return.
780 ///
781 /// # Errors
782 ///
783 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
784 ///
785 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
786 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
787 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
788 ///
789 /// If multiple items error in the same dispatch iteration, only the first
790 /// is preserved; subsequent errors are discarded silently. To observe
791 /// every error, attach an [`Observer`](crate::Observer) and read errors
792 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
793 pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
794 self.run_inner(RunMode::Until(Instant::now() + max))
795 }
796
797 /// Run until `n` full barrier-cycles (`WaitSet` wakeups) have completed.
798 ///
799 /// # Errors
800 ///
801 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
802 ///
803 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
804 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
805 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
806 ///
807 /// If multiple items error in the same dispatch iteration, only the first
808 /// is preserved; subsequent errors are discarded silently. To observe
809 /// every error, attach an [`Observer`](crate::Observer) and read errors
810 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
811 pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
812 self.run_inner(RunMode::Iterations(n))
813 }
814
815 /// Run until `predicate()` returns true. Checked after each `WaitSet`
816 /// wakeup.
817 ///
818 /// # Errors
819 ///
820 /// Returns the **first** [`ExecutorError`] surfaced during dispatch:
821 ///
822 /// * [`ExecutorError::Item`] if any item returns `Err` or panics.
823 /// * [`ExecutorError::Iceoryx2`] if a `WaitSet` operation fails.
824 /// * [`ExecutorError::AlreadyRunning`] if the executor is already running.
825 ///
826 /// If multiple items error in the same dispatch iteration, only the first
827 /// is preserved; subsequent errors are discarded silently. To observe
828 /// every error, attach an [`Observer`](crate::Observer) and read errors
829 /// via [`Observer::on_app_error`](crate::Observer::on_app_error).
830 pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
831 self.run_inner(RunMode::Predicate(&mut predicate))
832 }
833}
834
/// Termination policy for a single `run*` call.
///
/// The dispatch loop evaluates the policy only after a completed `WaitSet`
/// wakeup. Stop requests, iceoryx2 interrupt/termination results, and item
/// errors end the loop regardless of the variant.
enum RunMode<'p> {
    /// No self-imposed limit.
    Forever,
    /// Finish at the first wakeup observed at or after this instant.
    Until(Instant),
    /// Finish once this many wakeups have been fully dispatched.
    Iterations(usize),
    /// Finish as soon as the closure returns `true`.
    Predicate(&'p mut dyn FnMut() -> bool),
}
841
impl Executor {
    /// Shared driver behind every `run*` entry point.
    ///
    /// Steps:
    /// 1. Claim the `running` flag. If it was already set, return
    ///    [`ExecutorError::AlreadyRunning`] without doing anything else.
    /// 2. Fire `on_executor_up` and run `dispatch_loop` with `mode`.
    /// 3. Report the outcome through `on_executor_down` (on `Ok`) or
    ///    `on_executor_error` (on `Err`).
    /// 4. Clear `running` and return the loop's result.
    ///
    /// NOTE(review): `running` is reset only on this normal return path. If
    /// `dispatch_loop` or an observer callback panics, the flag stays `true`.
    fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
        // NOTE: Once `Stoppable::stop()` has been called, `self.stoppable.is_stopped()`
        // remains true permanently. Calling `run()` again after a stop will return
        // promptly without doing any meaningful work (it blocks until the first
        // trigger fires, then immediately exits the dispatch loop). Task 10's
        // Runner accommodates this by treating an Executor as one-shot: each
        // Runner owns the Executor and consumes it.
        if self.running.swap(true, Ordering::SeqCst) {
            return Err(ExecutorError::AlreadyRunning);
        }

        self.observer.on_executor_up();
        let result = self.dispatch_loop(&mut mode);
        // Exactly one of the two "executor finished" callbacks fires.
        match &result {
            Ok(()) => self.observer.on_executor_down(),
            Err(e) => self.observer.on_executor_error(e),
        }

        self.running.store(false, Ordering::SeqCst);
        result
    }

    /// The executor's event loop: one `WaitSet` wakeup per iteration.
    ///
    /// Setup:
    /// * Creates a `WaitSet` and attaches every trigger declared by every
    ///   task: subscriber and raw listeners as notifications, intervals, and
    ///   deadline listeners. It records which task owns each attachment.
    /// * Attaches the internal stop listener so `stop()` wakes the loop.
    ///
    /// Each iteration:
    /// 1. Clears the shared per-iteration error slot (`self.iter_err`).
    /// 2. Runs one `wait_and_process_once` callback. For every attachment
    ///    that fired (event or missed deadline), it dispatches the owning
    ///    task:
    ///    * Single/Chain tasks first go through the fault pre-check. Then
    ///      either their pre-built job or their fault handler is submitted
    ///      to the pool.
    ///    * Graph tasks are driven on this thread via `run_once_borrowed`.
    /// 3. Inside the callback, `pool.barrier()` waits for every submitted job.
    /// 4. Decides whether to exit:
    ///    * `Ok(())` on an iceoryx2 `Interrupt` / `TerminationRequest`.
    ///    * `Err` with the first recorded item error.
    ///    * `Ok(())` once the stop flag is set.
    ///    * Otherwise `mode` decides whether to continue.
    ///
    /// NOTE(review): `mode` is only evaluated after a wakeup, and
    /// `wait_and_process_once` is called without a timeout (presumably it
    /// blocks until an attachment fires). A `run_for` deadline or a
    /// `run_until` predicate can therefore be overshot while no trigger
    /// fires. Confirm this is acceptable.
    ///
    /// # Errors
    ///
    /// * iceoryx2 failures (mapped via `ExecutorError::iceoryx2`) while
    ///   creating the `WaitSet`, attaching a trigger, or waiting.
    /// * `ExecutorError::Item` carrying the first item error of an iteration.
    #[allow(
        unsafe_code,
        clippy::too_many_lines,
        clippy::ref_as_ptr,
        clippy::borrow_as_ptr
    )]
    fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
        let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
            .create()
            .map_err(ExecutorError::iceoryx2)?;

        // Keep Arc<RawListener> alive for at least as long as the WaitSet
        // guards — the guard borrows the listener via 'attachment lifetime.
        let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
        // Guards must outlive the run loop.
        let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
        // Maps guard index → task index (parallel to `guards`).
        let mut attachment_to_task: Vec<usize> = Vec::new();

        for (task_idx, task) in self.tasks.iter().enumerate() {
            for decl in &task.decls {
                match decl {
                    TriggerDecl::Subscriber { listener } => {
                        // Clone Arc to extend listener lifetime to this scope.
                        let l = Arc::clone(listener);
                        listener_storage.push(l);
                        let l_ref = listener_storage.last().unwrap().as_ref();
                        // SAFETY: we cast the reference lifetime to match
                        // 'waitset / 'attachment. The listener lives in the
                        // Arc's heap allocation, so later `push`es that move
                        // the Vec buffer do not move it. Locals drop in
                        // reverse declaration order, so `guards` (declared
                        // after `listener_storage`) is dropped first, and
                        // both are dropped before `waitset`.
                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
                        let guard = waitset
                            .attach_notification(l_ref)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                    TriggerDecl::Interval(d) => {
                        let guard = waitset
                            .attach_interval(*d)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                    TriggerDecl::Deadline { listener, deadline } => {
                        let l = Arc::clone(listener);
                        listener_storage.push(l);
                        let l_ref = listener_storage.last().unwrap().as_ref();
                        // SAFETY: same argument as the Subscriber arm. The Arc
                        // clone held in `listener_storage` keeps the listener
                        // alive and address-stable until after `guards` drops.
                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
                        let guard = waitset
                            .attach_deadline(l_ref, *deadline)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                    TriggerDecl::RawListener(listener) => {
                        let l = Arc::clone(listener);
                        listener_storage.push(l);
                        let l_ref = listener_storage.last().unwrap().as_ref();
                        // SAFETY: same argument as the Subscriber arm. The Arc
                        // clone held in `listener_storage` keeps the listener
                        // alive and address-stable until after `guards` drops.
                        let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
                        let guard = waitset
                            .attach_notification(l_ref)
                            .map_err(ExecutorError::iceoryx2)?;
                        guards.push(guard);
                        attachment_to_task.push(task_idx);
                    }
                }
            }
        }

        // Attach the internal stop listener so the WaitSet wakes when
        // stop() is called. We hold `self.stop_listener` (Arc) in the Executor
        // struct which is valid for the lifetime of dispatch_loop. We use the
        // same raw-pointer-cast pattern as user listeners above.
        //
        // SAFETY: `self.stop_listener` is an Arc stored on `self`, which is
        // exclusively borrowed for the duration of `run_inner` (which calls
        // `dispatch_loop`). The listener is not freed while the guard is alive
        // because the Arc keeps it alive and `self` outlives this function.
        let stop_listener_ref: &IxListener<ipc::Service> =
            unsafe { &*(self.stop_listener.as_ref() as *const _) };
        let _stop_guard = waitset
            .attach_notification(stop_listener_ref)
            .map_err(ExecutorError::iceoryx2)?;

        // Completed-wakeup counter for `RunMode::Iterations`. It is only
        // touched on this thread, so the atomic is not strictly required.
        let iterations_done = AtomicUsize::new(0);
        let stop_flag = self.stoppable.clone();

        loop {
            // Reset the pre-allocated per-iteration error slot (REQ_0060):
            // the slot is owned by `self.iter_err`, allocated once at build
            // time. Pool worker closures obtain a refcount-only clone of
            // the `Arc`; the slot itself is reused across iterations.
            *self.iter_err.lock().unwrap() = None;

            // SAFETY: we capture &mut self.tasks via a raw pointer because
            // wait_and_process expects FnMut and Rust can't see the closure
            // outlives `self`. The discipline that makes this sound:
            // 1. The closure body on the executor thread is the *only* code that
            //    reads `tasks_ptr`. The pool jobs it submits hold borrowed
            //    `*mut dyn ExecutableItem` slices into individual TaskEntries,
            //    not into the Vec itself, so they don't race with the Vec.
            // 2. `pool.barrier()` at the end of this callback ensures every
            //    submitted pool job has completed (and dropped its raw pointer)
            //    before the callback returns. The next iteration of the WaitSet
            //    loop is therefore the sole user of `tasks_ptr` again.
            // 3. The Vec is never resized inside this loop (no `push` / `remove`
            //    after dispatch starts), so the underlying buffer addresses are
            //    stable for the lifetime of `dispatch_loop`.
            let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
            let pool = &self.pool;
            // Refcount-only clone of the pre-allocated error slot. Pool jobs
            // need a `'static` handle, and an `Arc::clone` does not allocate.
            // The Single/Chain paths use the closure baked into `task.job`,
            // which already captured stable Arc clones at `add`-time; the
            // Graph path uses closures pre-built by `prepare_dispatch`. Only
            // the error-aggregation logic on the WaitSet thread still needs
            // the slot here.
            let iter_err_inner = Arc::clone(&self.iter_err);
            // Raw pointer to the stop listener for draining inside the callback.
            // SAFETY: same as stop_listener_ref above — the Arc is alive for
            // the lifetime of dispatch_loop.
            let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
            // Raw pointer to the executor-wide fault state. Same safety
            // discipline as `tasks_ptr`: `Executor` is alive for the
            // duration of `dispatch_loop`; the WaitSet callback is the
            // only reader. REQ_0071. `self.exec_fault` is
            // `Arc<ExecutorFaultAtomic>` — we deref once to obtain a
            // pointer to the inner `ExecutorFaultAtomic`.
            let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
            // Raw pointer to the executor start time. Used by the lazy
            // cascade below to compute `since_ms` on task transitions
            // triggered by an executor-wide fault.
            let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;

            let cb_result = waitset.wait_and_process_once(
                |attachment_id: WaitSetAttachmentId<ipc::Service>| {
                    // Drain stop notifications first (no dispatch — the stop_flag
                    // check after the callback returns handles termination).
                    // SAFETY: stop_listener_ptr is valid for the duration of the
                    // closure; the Arc in self.stop_listener keeps it alive.
                    let stop_l = unsafe { &*stop_listener_ptr };
                    while let Ok(Some(_)) = stop_l.try_wait_one() {}

                    // Scan every attachment; `guards[i]` belongs to task
                    // `attachment_to_task[i]`.
                    for (i, guard) in guards.iter().enumerate() {
                        let fired = attachment_id.has_event_from(guard)
                            || attachment_id.has_missed_deadline(guard);
                        if !fired {
                            continue;
                        }
                        let task_idx = attachment_to_task[i];

                        // SAFETY: we are the only thread that may touch
                        // `self` during the callback. wait_and_process_once
                        // is single-threaded; we hold &mut self in
                        // dispatch_loop. The pointer is valid for the
                        // duration of this closure.
                        let task = unsafe { &mut (&mut *tasks_ptr)[task_idx] };

                        // Pre-dispatch fault check (REQ_0070, REQ_0071, REQ_0072).
                        // Only applies to Single/Chain — Graph tasks use their
                        // own per-vertex scheduling and are out of scope for
                        // FEAT_0018.
                        if matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
                            // SAFETY: exec_fault_ptr derefs into the Executor that
                            // owns this dispatch_loop — alive for the callback's
                            // lifetime.
                            let exec_faulted = matches!(
                                unsafe { &*exec_fault_ptr }.load(0, 0),
                                ExecutorFaultState::Faulted { .. }
                            );
                            let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
                            let task_state = task.fault.load(task_budget_ms);

                            // Lazy cascade: if executor is `Faulted` and task
                            // is still `Running`, silently transition the task
                            // to `Faulted{ExecutorFaulted}`. No `on_task_fault`
                            // — Observer already heard about the executor-wide
                            // fault via `on_executor_fault` (cascade-noise
                            // invariant from FEAT_0018 §4.6).
                            let task_faulted =
                                if exec_faulted && matches!(task_state, FaultState::Running) {
                                    // SAFETY: exec_start_ptr derefs into the same
                                    // `Executor` owning this dispatch_loop. The
                                    // `OnceLock` is wait-free.
                                    let exec_start = *unsafe { &*exec_start_ptr }
                                        .get_or_init(std::time::Instant::now);
                                    let since_ms =
                                        instant_to_since_ms(std::time::Instant::now(), exec_start);
                                    let _ = task.fault.swap(
                                        FaultState::Faulted {
                                            reason: FaultReason::ExecutorFaulted,
                                            since_ms,
                                        },
                                        task_budget_ms,
                                    );
                                    true
                                } else {
                                    matches!(task_state, FaultState::Faulted { .. })
                                };
                            let route_to_handler = exec_faulted || task_faulted;

                            if route_to_handler {
                                // If a handler is registered, dispatch it.
                                // Otherwise, skip dispatch entirely this wakeup.
                                if let Some(handler_box) = task.handler_job.as_deref_mut() {
                                    let job_ptr: *mut (dyn FnMut() + Send) =
                                        handler_box as *mut (dyn FnMut() + Send);
                                    // SAFETY: same as the main-job dispatch
                                    // below — handler_job is owned by the
                                    // TaskEntry; pool.barrier() awaits its
                                    // completion before the next callback.
                                    #[allow(unsafe_code)]
                                    unsafe {
                                        pool.submit_borrowed(crate::pool::BorrowedJob::new(
                                            job_ptr,
                                        ));
                                    }
                                }
                                // Whether or not a handler ran, a faulted
                                // task's main job is skipped this wakeup.
                                continue;
                            }
                        }

                        match &mut task.kind {
                            TaskKind::Single(_) | TaskKind::Chain(_) => {
                                // The dispatch closure was pre-allocated at
                                // task-add time and stashed on `task.job`.
                                // Submit it via `submit_borrowed` — no
                                // per-iteration Box allocation. Required by
                                // REQ_0060.
                                let job_box = task
                                    .job
                                    .as_deref_mut()
                                    .expect("Single/Chain tasks carry a pre-built job");
                                let job_ptr: *mut (dyn FnMut() + Send) =
                                    job_box as *mut (dyn FnMut() + Send);
                                // SAFETY: the closure lives in
                                // `task.job` which is owned by
                                // `self.tasks[task_idx]`; `tasks_ptr` is
                                // sound for the duration of this
                                // callback. `pool.barrier()` below
                                // finishes the closure invocation before
                                // we re-enter the next iteration's
                                // callback. The WaitSet thread does not
                                // touch the closure between this submit
                                // and that barrier.
                                #[allow(unsafe_code)]
                                unsafe {
                                    pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
                                }
                            }
                            TaskKind::Graph(graph) => {
                                // Outer driver runs on the WaitSet thread; vertices run on the
                                // pool. The graph holds its own pre-built per-vertex closures
                                // and SPSC ready ring (REQ_0060), so dispatch is allocation-free
                                // in steady state.
                                let outcome = graph.run_once_borrowed(pool);
                                if let Some(source) = outcome.error {
                                    // Keep only the first error of this iteration.
                                    let mut g = iter_err_inner.lock().unwrap();
                                    if g.is_none() {
                                        *g = Some(ExecutorError::Item {
                                            task_id: task.id.clone(),
                                            source,
                                        });
                                    }
                                }
                                let _ = outcome.stopped_chain; // chain-abort semantics: no extra bookkeeping at task level
                            }
                        }
                    }

                    // Wait for all submitted jobs to finish before leaving
                    // the callback scope (validates item_ptr safety contract).
                    pool.barrier();
                    CallbackProgression::Continue
                },
            );

            let cb_result = cb_result.map_err(ExecutorError::iceoryx2)?;

            // iceoryx2's WaitSet catches SIGINT/SIGTERM internally; honor that
            // here for a clean exit.
            if matches!(
                cb_result,
                WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
            ) {
                return Ok(());
            }

            // Extract the error before dropping the MutexGuard — avoids
            // holding the lock across the return (clippy::significant_drop_in_scrutinee).
            let maybe_err = self.iter_err.lock().unwrap().take();
            if let Some(err) = maybe_err {
                return Err(err);
            }
            if stop_flag.is_stopped() {
                return Ok(());
            }

            // This wakeup counts as completed; consult the run mode.
            iterations_done.fetch_add(1, Ordering::SeqCst);
            match mode {
                RunMode::Forever => {}
                RunMode::Iterations(n) => {
                    if iterations_done.load(Ordering::SeqCst) >= *n {
                        return Ok(());
                    }
                }
                RunMode::Until(deadline) => {
                    if Instant::now() >= *deadline {
                        return Ok(());
                    }
                }
                RunMode::Predicate(p) => {
                    if (p)() {
                        return Ok(());
                    }
                }
            }
        }
    }
}
1189
1190/// Wraps a `*mut dyn ExecutableItem` so it can cross thread boundaries inside
1191/// `Pool::submit`. The send is safe because:
1192/// 1. The executor guarantees at most one invocation of a given item at a
1193/// time (via `pool.barrier()` before the pointer is reused).
1194/// 2. `ExecutableItem: Send`, so moving the pointee across threads is sound
1195/// when no aliasing exists.
1196#[allow(unsafe_code)]
1197struct SendItemPtr {
1198 ptr: *mut dyn ExecutableItem,
1199}
1200
1201impl SendItemPtr {
1202 fn new(ptr: *mut dyn ExecutableItem) -> Self {
1203 Self { ptr }
1204 }
1205
1206 /// Returns the raw pointer. Takes `&self` so the wrapper can be invoked
1207 /// repeatedly from an `FnMut` dispatch closure (`REQ_0060` requires the
1208 /// dispatch closure to be reusable across iterations without allocation).
1209 fn get(&self) -> *mut dyn ExecutableItem {
1210 self.ptr
1211 }
1212}
1213
1214// SAFETY: see doc comment above. `Sync` is required so the FnMut dispatch
1215// closure can borrow `&SendItemPtr` per invocation without making the
1216// closure itself `!Send`.
1217#[allow(unsafe_code)]
1218unsafe impl Send for SendItemPtr {}
1219#[allow(unsafe_code)]
1220unsafe impl Sync for SendItemPtr {}
1221
1222/// Wraps a `*mut Vec<Box<dyn ExecutableItem>>` so a chain dispatch
1223/// closure can iterate the chain's items in place without first
1224/// collecting them into a freshly-allocated `Vec`. The send is safe
1225/// for the same reason as [`SendItemPtr`] (see above): the executor
1226/// holds `&mut self` for the duration of `dispatch_loop`, and the
1227/// `pool.barrier()` at the end of each callback ensures the closure
1228/// has finished using this pointer before the Vec could be touched
1229/// from the `WaitSet` thread again. The Vec is never resized after
1230/// dispatch begins. Required for `REQ_0060` — chain dispatch must not
1231/// allocate per iteration.
1232#[allow(unsafe_code)]
1233struct SendChainPtr {
1234 ptr: *mut Vec<Box<dyn ExecutableItem>>,
1235}
1236
1237impl SendChainPtr {
1238 fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
1239 Self { ptr }
1240 }
1241
1242 fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
1243 self.ptr
1244 }
1245}
1246
1247// SAFETY: see doc comment above. `Sync` lets the FnMut dispatch closure
1248// borrow `&SendChainPtr` per invocation while staying `Send`.
1249#[allow(unsafe_code)]
1250unsafe impl Send for SendChainPtr {}
1251#[allow(unsafe_code)]
1252unsafe impl Sync for SendChainPtr {}
1253
/// State a dispatch closure needs for post-execute fault detection
/// (`REQ_0070`, `REQ_0071`, `REQ_0102`).
///
/// The `Arc` fields are shared with the owning `Executor` / `TaskEntry`, so
/// any pool worker can read and update them without taking a lock. The
/// design intends these accesses to be wait-free.
///
/// The plain fields (`task_budget`, `iteration_budget`, `task_idx_u32`) are
/// owned by value and are only read during detection.
///
/// NOTE(review): `OnceLock::get_or_init` on `exec_start` may briefly block
/// while another thread is initialising it.
struct FaultDispatchCtx {
    /// Per-task budget. `None` for chain / graph tasks (no per-task
    /// check) — the executor-wide iteration budget still applies.
    task_budget: Option<Duration>,
    /// Per-task fault state (shared with `TaskEntry::fault`).
    task_fault: Arc<FaultAtomic>,
    /// Per-task monotonic overrun counter (shared with
    /// `TaskEntry::overrun_count`). Increments on EVERY budget breach,
    /// not only the one that causes the fault transition.
    overrun_count: Arc<AtomicU64>,
    /// Executor-wide iteration budget. `None` means no executor-wide
    /// check.
    iteration_budget: Option<Duration>,
    /// Executor-wide fault state (shared with `Executor::exec_fault`).
    exec_fault: Arc<ExecutorFaultAtomic>,
    /// Executor-wide offending-task index storage (shared with
    /// `Executor::exec_fault_task_idx`); written on every iteration-budget
    /// breach.
    exec_fault_task_idx: Arc<AtomicU32>,
    /// Executor-wide breached-budget storage (shared with
    /// `Executor::exec_fault_budget_ms`); written on every iteration-budget
    /// breach.
    exec_fault_budget_ms: Arc<AtomicU32>,
    /// Index of this task in the executor's task table; reported as
    /// `task_idx` in executor-wide fault reasons.
    task_idx_u32: u32,
    /// Executor start time (shared with `Executor::start_time`). The first
    /// detector to need it initialises it lazily; it is the origin for
    /// `since_ms`.
    exec_start: Arc<OnceLock<Instant>>,
    /// Observer for `on_task_fault` / `on_executor_fault` notifications.
    observer: Arc<dyn Observer>,
}
1286
1287/// Build the per-iteration dispatch closure for a `TaskKind::Single`.
1288///
1289/// The returned closure is stored on `TaskEntry::job` and invoked once
1290/// per dispatch via `Pool::submit_borrowed`, which (unlike `submit`)
1291/// performs no allocation. The closure captures Arc clones of the
1292/// executor's shared state — those clones are refcount-only at build
1293/// time and are reused on every dispatch. Required for `REQ_0060`.
1294#[allow(clippy::too_many_arguments)]
1295fn build_single_job(
1296 id: TaskId,
1297 stop: Stoppable,
1298 obs: Arc<dyn Observer>,
1299 mon: Arc<dyn ExecutionMonitor>,
1300 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
1301 app_id: Option<u32>,
1302 app_inst: Option<u32>,
1303 item_ptr: SendItemPtr,
1304 fault_ctx: FaultDispatchCtx,
1305) -> Box<dyn FnMut() + Send + 'static> {
1306 Box::new(move || {
1307 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
1308 if let Some(aid) = app_id {
1309 obs.on_app_start(id.clone(), aid, app_inst);
1310 }
1311 let raw = item_ptr.get();
1312 let started = std::time::Instant::now();
1313 mon.pre_execute(id.clone(), started);
1314 // SAFETY: barrier() pairs with this invocation; the WaitSet
1315 // thread does not touch the item between `submit_borrowed` and
1316 // the matching `barrier()`. See SendItemPtr safety doc.
1317 #[allow(unsafe_code)]
1318 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
1319 let took = started.elapsed();
1320 mon.post_execute(id.clone(), started, took, res.is_ok());
1321 if let Err(ref e) = res {
1322 obs.on_app_error(id.clone(), e.as_ref());
1323 }
1324 if app_id.is_some() {
1325 obs.on_app_stop(id.clone());
1326 }
1327 post_execute_detect_fault(&id, started, took, &fault_ctx);
1328 record_first_err(&err_slot, &id, res);
1329 })
1330}
1331
1332/// Build the per-iteration dispatch closure for a fault-handler item.
1333///
1334/// Mirrors [`build_single_job`] in every detail (same monitor /
1335/// observer / first-error capture wiring) but owns the
1336/// `Box<dyn ExecutableItem>` directly inside the closure instead of
1337/// dereferencing a raw [`SendItemPtr`]. The handler has no parallel
1338/// owner inside [`TaskEntry`] — the handler closure stored in
1339/// `handler_job` is the sole owner — so the simpler owning form is
1340/// both sound and avoids the aliasing dance the main item needs.
1341/// `REQ_0072`.
1342#[allow(clippy::too_many_arguments)]
1343fn build_handler_job(
1344 id: TaskId,
1345 stop: Stoppable,
1346 obs: Arc<dyn Observer>,
1347 mon: Arc<dyn ExecutionMonitor>,
1348 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
1349 app_id: Option<u32>,
1350 app_inst: Option<u32>,
1351 mut handler: Box<dyn ExecutableItem>,
1352 fault_ctx: FaultDispatchCtx,
1353) -> Box<dyn FnMut() + Send + 'static> {
1354 Box::new(move || {
1355 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
1356 if let Some(aid) = app_id {
1357 obs.on_app_start(id.clone(), aid, app_inst);
1358 }
1359 let started = std::time::Instant::now();
1360 mon.pre_execute(id.clone(), started);
1361 let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
1362 let took = started.elapsed();
1363 mon.post_execute(id.clone(), started, took, res.is_ok());
1364 if let Err(ref e) = res {
1365 obs.on_app_error(id.clone(), e.as_ref());
1366 }
1367 if app_id.is_some() {
1368 obs.on_app_stop(id.clone());
1369 }
1370 // Per §4.6 invariant 5 of FEAT_0018: a handler that ALSO breaches
1371 // budget keeps the task in `Faulted` (state already `Faulted`),
1372 // `overrun_count` increments, NO new `on_task_fault` fires —
1373 // the `matches!(prev, FaultState::Running)` gate inside
1374 // `post_execute_detect_fault` enforces that.
1375 post_execute_detect_fault(&id, started, took, &fault_ctx);
1376 record_first_err(&err_slot, &id, res);
1377 })
1378}
1379
1380/// Build the per-iteration dispatch closure for a `TaskKind::Chain`.
1381#[allow(clippy::too_many_arguments)]
1382fn build_chain_job(
1383 id: TaskId,
1384 stop: Stoppable,
1385 obs: Arc<dyn Observer>,
1386 mon: Arc<dyn ExecutionMonitor>,
1387 err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
1388 chain_ptr: SendChainPtr,
1389 fault_ctx: FaultDispatchCtx,
1390) -> Box<dyn FnMut() + Send + 'static> {
1391 Box::new(move || {
1392 let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
1393 // SAFETY: barrier() pairs with this invocation; the chain Vec
1394 // and the items it owns are not touched by the WaitSet thread
1395 // until barrier() returns. See SendChainPtr safety doc.
1396 #[allow(unsafe_code)]
1397 let chain_items = unsafe { &mut *chain_ptr.get() };
1398 for item_box in chain_items.iter_mut() {
1399 let app_id = item_box.app_id();
1400 let app_inst = item_box.app_instance_id();
1401 if let Some(aid) = app_id {
1402 obs.on_app_start(id.clone(), aid, app_inst);
1403 }
1404 let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
1405 let started = std::time::Instant::now();
1406 mon.pre_execute(id.clone(), started);
1407 #[allow(unsafe_code)]
1408 let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
1409 let took = started.elapsed();
1410 mon.post_execute(id.clone(), started, took, res.is_ok());
1411 if let Err(ref e) = res {
1412 obs.on_app_error(id.clone(), e.as_ref());
1413 }
1414 if app_id.is_some() {
1415 obs.on_app_stop(id.clone());
1416 }
1417 // Per-item post-execute fault detection. `task_budget` is
1418 // `None` for chains (see `add_chain_with_id_boxed`), so the
1419 // per-task check no-ops; the executor-wide iteration-budget
1420 // check still fires per item. `REQ_0071`.
1421 post_execute_detect_fault(&id, started, took, &fault_ctx);
1422 match res {
1423 Ok(crate::ControlFlow::Continue) => {}
1424 Ok(crate::ControlFlow::StopChain) => break,
1425 Err(_) => {
1426 record_first_err(&err_slot, &id, res);
1427 break;
1428 }
1429 }
1430 }
1431 })
1432}
1433
/// Error substituted for an item that panicked inside `execute`. It carries
/// the panic payload rendered as text.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_str("task panicked: ")?;
        f.write_str(message)
    }
}

/// Leaf error: it wraps no underlying `source`.
impl std::error::Error for PanickedTask {}
1444
1445/// Execute `item` inside `catch_unwind`, converting any panic into an `Err`.
1446#[allow(clippy::option_if_let_else)]
1447fn run_item_catch_unwind(
1448 item: &mut dyn ExecutableItem,
1449 ctx: &mut crate::context::Context<'_>,
1450) -> crate::ExecuteResult {
1451 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
1452 |payload| {
1453 let msg = if let Some(s) = payload.downcast_ref::<&str>() {
1454 (*s).to_string()
1455 } else if let Some(s) = payload.downcast_ref::<String>() {
1456 s.clone()
1457 } else {
1458 "panicked task".to_string()
1459 };
1460 Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
1461 },
1462 )
1463}
1464
1465/// Public-within-crate wrapper so `graph.rs` can call `run_item_catch_unwind`
1466/// without depending on its private name.
1467pub(crate) fn run_item_catch_unwind_external(
1468 item: &mut dyn ExecutableItem,
1469 ctx: &mut crate::context::Context<'_>,
1470) -> crate::ExecuteResult {
1471 run_item_catch_unwind(item, ctx)
1472}
1473
1474/// Record the first error into `slot`. Subsequent errors are silently dropped.
1475fn record_first_err(
1476 slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
1477 id: &TaskId,
1478 res: crate::ExecuteResult,
1479) {
1480 if let Err(source) = res {
1481 let mut g = slot.lock().unwrap();
1482 if g.is_none() {
1483 *g = Some(ExecutorError::Item {
1484 task_id: id.clone(),
1485 source,
1486 });
1487 }
1488 }
1489}
1490
1491/// Post-execute fault detection — runs on a pool worker AFTER
1492/// `mon.post_execute` so the full `took` is available. Implements:
1493///
1494/// * `REQ_0070` / `REQ_0102` — per-task budget overrun: increments
1495/// `overrun_count` on every breach, transitions
1496/// `Running -> Faulted{BudgetExceeded}` exactly once (subsequent
1497/// breaches keep the state `Faulted` and do NOT re-fire the
1498/// observer).
1499/// * `REQ_0071` — executor-wide iteration overrun: transitions
1500/// `Running -> Faulted{IterationBudgetExceeded}` exactly once;
1501/// cascade to per-task state is LAZY (see the pre-dispatch block
1502/// in `dispatch_loop`), so the per-task `on_task_fault` does NOT
1503/// fire during cascade — only `on_executor_fault` does.
1504fn post_execute_detect_fault(
1505 id: &TaskId,
1506 started: Instant,
1507 took: Duration,
1508 fault_ctx: &FaultDispatchCtx,
1509) {
1510 // REQ_0070 / REQ_0102 — per-task budget overrun.
1511 if let Some(budget) = fault_ctx.task_budget {
1512 if took > budget {
1513 fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
1514 let took_ms = duration_to_ms_sat(took);
1515 let budget_ms = duration_to_ms_sat(budget);
1516 let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
1517 let since_ms = instant_to_since_ms(started, exec_start);
1518 let new_state = FaultState::Faulted {
1519 reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
1520 since_ms,
1521 };
1522 let prev = fault_ctx.task_fault.swap(new_state, budget_ms);
1523 if matches!(prev, FaultState::Running) {
1524 fault_ctx.observer.on_task_fault(
1525 id.clone(),
1526 FaultReason::BudgetExceeded { took_ms, budget_ms },
1527 );
1528 }
1529 }
1530 }
1531
1532 // REQ_0071 — executor-wide iteration overrun.
1533 if let Some(iter_budget) = fault_ctx.iteration_budget {
1534 if took > iter_budget {
1535 let took_ms = duration_to_ms_sat(took);
1536 let budget_ms = duration_to_ms_sat(iter_budget);
1537 let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
1538 let since_ms = instant_to_since_ms(started, exec_start);
1539 fault_ctx
1540 .exec_fault_task_idx
1541 .store(fault_ctx.task_idx_u32, Ordering::Release);
1542 fault_ctx
1543 .exec_fault_budget_ms
1544 .store(budget_ms, Ordering::Release);
1545 let new_state = ExecutorFaultState::Faulted {
1546 reason: ExecutorFaultReason::IterationBudgetExceeded {
1547 task_idx: fault_ctx.task_idx_u32,
1548 took_ms,
1549 budget_ms,
1550 },
1551 since_ms,
1552 };
1553 let prev = fault_ctx
1554 .exec_fault
1555 .swap(new_state, fault_ctx.task_idx_u32, budget_ms);
1556 if matches!(prev, ExecutorFaultState::Running) {
1557 fault_ctx.observer.on_executor_fault(
1558 ExecutorFaultReason::IterationBudgetExceeded {
1559 task_idx: fault_ctx.task_idx_u32,
1560 took_ms,
1561 budget_ms,
1562 },
1563 );
1564 // NO eager cascade here. Cascade is lazy: the
1565 // pre-dispatch block in `dispatch_loop` transitions
1566 // each `Running` task to `Faulted{ExecutorFaulted}` on
1567 // the next wakeup — silently, so per-task observers
1568 // do not fire (see §4.6 invariant on cascade-noise).
1569 }
1570 }
1571 }
1572}
1573
1574// ── ExecutorGraphBuilder ──────────────────────────────────────────────────────
1575
1576/// Borrowed wrapper that finalises a [`GraphBuilder`](crate::graph::GraphBuilder)
1577/// into a registered task.
1578pub struct ExecutorGraphBuilder<'e> {
1579 executor: &'e mut Executor,
1580 builder: crate::graph::GraphBuilder,
1581 custom_id: Option<TaskId>,
1582}
1583
1584impl ExecutorGraphBuilder<'_> {
1585 /// Add a vertex to the graph; returns its handle.
1586 pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
1587 self.builder.vertex(item)
1588 }
1589
1590 /// Add a directed edge from one vertex to another.
1591 pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
1592 self.builder.edge(from, to);
1593 self
1594 }
1595
1596 /// Designate the root vertex (its triggers gate the graph).
1597 pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
1598 self.builder.root(v);
1599 self
1600 }
1601
1602 /// Override the auto-generated id with a custom one.
1603 pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
1604 self.custom_id = Some(id.into());
1605 self
1606 }
1607
1608 /// Validate and register the graph. Returns the task id.
1609 ///
1610 /// The root vertex's [`ExecutableItem::task_id`] override takes precedence
1611 /// over any id set via [`ExecutorGraphBuilder::id`], which itself takes
1612 /// precedence over the auto-generated id.
1613 pub fn build(self) -> Result<TaskId, ExecutorError> {
1614 let g = self.builder.finish()?;
1615 // Root vertex's task_id() override wins over the custom id, which wins
1616 // over the auto-generated fallback.
1617 let auto_id = || {
1618 TaskId::new(format!(
1619 "graph-{}",
1620 self.executor.next_id.fetch_add(1, Ordering::SeqCst)
1621 ))
1622 };
1623 let id = g
1624 .root_task_id()
1625 .map(TaskId::new)
1626 .or(self.custom_id)
1627 .unwrap_or_else(auto_id);
1628 let decls = g.decls.clone();
1629
1630 // Box the graph for address stability — per-vertex dispatch
1631 // closures capture `*const Graph` and must not see it move.
1632 let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
1633 // Pre-build the per-vertex closures now that we know the
1634 // task_id and have access to the executor's shared state.
1635 graph_box.prepare_dispatch(
1636 id.clone(),
1637 self.executor.stoppable.clone(),
1638 Arc::clone(&self.executor.observer),
1639 Arc::clone(&self.executor.monitor),
1640 Arc::clone(&self.executor.iter_err),
1641 );
1642
1643 self.executor.tasks.push(TaskEntry {
1644 id: id.clone(),
1645 kind: TaskKind::Graph(graph_box),
1646 decls,
1647 // Graph tasks dispatch their vertices via `vertex_jobs`
1648 // stored inside the `Graph`; the per-task `job` slot
1649 // is unused for graphs.
1650 job: None,
1651 // TODO(post-Task-10): graph budgets carried separately; for now None.
1652 budget: None,
1653 fault: Arc::new(FaultAtomic::new()),
1654 overrun_count: Arc::new(AtomicU64::new(0)),
1655 handler_job: None,
1656 });
1657 Ok(id)
1658 }
1659}
1660
1661// ── Unit tests ────────────────────────────────────────────────────────────────
1662
#[cfg(test)]
mod tests {
    use super::*;
    use crate::item::item_with_triggers;
    use crate::{ControlFlow, item};
    // Explicit import; shadows the identical `Duration` brought in by the glob.
    use core::time::Duration;

    /// Common setup for every test below: an executor built with
    /// `worker_threads(0)`.
    fn fresh_executor() -> Executor {
        Executor::builder().worker_threads(0).build().unwrap()
    }

    /// Look up the registered entry for `id`, panicking if it is missing.
    fn task_entry<'a>(exec: &'a Executor, id: &TaskId) -> &'a TaskEntry {
        exec.tasks
            .iter()
            .find(|t| t.id == *id)
            .expect("task present")
    }

    #[test]
    fn add_returns_unique_ids() {
        let mut exec = fresh_executor();
        let first = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        let second = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        assert_ne!(first, second);
    }

    #[test]
    fn custom_id_is_preserved() {
        let mut exec = fresh_executor();
        let id = exec
            .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
            .unwrap();
        assert_eq!(id.as_str(), "my-task");
    }

    #[test]
    fn add_persists_declared_budget() {
        let mut exec = fresh_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        assert_eq!(
            task_entry(&exec, &task_id).budget,
            Some(Duration::from_millis(5))
        );
    }

    #[test]
    fn add_with_fault_handler_stores_handler_job() {
        let mut exec = fresh_executor();
        let main_item = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let handler = item_with_triggers(|_d| Ok(()), |_| Ok(ControlFlow::Continue));
        let task_id = exec.add_with_fault_handler(main_item, handler).unwrap();

        let entry = task_entry(&exec, &task_id);
        assert!(
            entry.handler_job.is_some(),
            "handler_job should be Some after add_with_fault_handler"
        );
        // Registering a handler must not displace the primary job.
        assert!(entry.job.is_some(), "main job should still be present");
    }

    #[test]
    fn declare_triggers_called_at_add_time() {
        let flag = Arc::new(AtomicBool::new(false));
        let flag_in_decl = Arc::clone(&flag);

        let it = item_with_triggers(
            move |_d| {
                flag_in_decl.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );

        let mut exec = fresh_executor();
        exec.add(it).unwrap();
        assert!(flag.load(Ordering::SeqCst));
    }

    #[test]
    fn clear_task_fault_errors_on_running_task() {
        let mut exec = fresh_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        // A freshly added task is Running, so there is no fault to clear.
        let err = exec.clear_task_fault(task_id).expect_err("not faulted");
        assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
    }

    #[test]
    fn clear_executor_fault_errors_on_running_executor() {
        let exec = fresh_executor();
        let err = exec.clear_executor_fault().expect_err("not faulted");
        assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
    }

    #[test]
    fn overrun_count_returns_zero_for_new_task() {
        let mut exec = fresh_executor();
        let budgeted = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(budgeted).unwrap();
        assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
    }

    #[test]
    fn overrun_count_errors_for_unknown_task() {
        let exec = fresh_executor();
        let err = exec
            .overrun_count(TaskId::new("nope"))
            .expect_err("unknown task");
        assert!(matches!(err, ExecutorError::TaskNotFound(_)));
    }

    #[test]
    fn task_fault_state_starts_running() {
        let mut exec = fresh_executor();
        let periodic = item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let task_id = exec.add(periodic).unwrap();
        assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
    }

    #[test]
    fn executor_fault_state_starts_running() {
        let exec = fresh_executor();
        assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
    }
}
1827}