#![allow(dead_code)]
#![allow(clippy::redundant_pub_crate)]
use crate::Channel;
use crate::clock::{MonotonicClock, SystemClock};
use crate::context::Stoppable;
use crate::error::ExecutorError;
use crate::fatal::{FatalDispatch, FatalHandler, FatalSite, guard_or_fatal, panic_payload_message};
use crate::fault::{
ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
FaultState, duration_to_ms_sat, instant_to_since_ms,
};
use crate::item::ExecutableItem;
use crate::monitor::{ExecutionMonitor, NoopMonitor};
use crate::observer::{NoopObserver, Observer};
use crate::payload::Payload;
use crate::pool::Pool;
use crate::stats::{CycleObservation, StatsSnapshot, TaskStatsEntry};
use crate::task_id::TaskId;
use crate::task_kind::TaskKind;
use crate::thread_attrs::ThreadAttributes;
use crate::trigger::{TriggerDecl, TriggerDeclarer};
use core::sync::atomic::AtomicU32;
use iceoryx2::node::Node;
use iceoryx2::port::listener::Listener as IxListener;
use iceoryx2::prelude::ipc;
use iceoryx2::prelude::*;
use iceoryx2::waitset::WaitSetRunResult;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use taktora_stats::ExecutorCycleStats;
/// Process-wide sequence counter. `ExecutorBuilder::build` increments it once
/// per executor and embeds the value in that executor's stop-event topic name.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Per-task cycle statistics accumulator; one instance is pushed into
/// `Executor::cycle_stats` for every registered task.
// NOTE(review): the meaning of the const parameters `8` and `256` is defined by
// `taktora_stats::ExecutorCycleStats` — confirm there before changing them.
pub(crate) type TaskCycleStats = ExecutorCycleStats<8, 256>;
/// Marker left on a task when it is dispatched; `DispatchPass::barrier_and_record`
/// takes it after the pool barrier and turns it into a cycle-statistics record.
#[derive(Clone, Copy)]
pub(crate) struct CyclePending {
/// Reading of the executor's `MonotonicClock` taken at dispatch time.
pub(crate) pre: u64,
/// `true` when fault routing diverted the dispatch away from the task's normal job.
pub(crate) faulted: bool,
}
/// Everything the executor keeps per registered task.
pub(crate) struct TaskEntry {
/// Identifier returned to the caller when the task was added.
pub(crate) id: TaskId,
/// The owned item(s) behind the task (`Single`, `Chain` or `Graph`).
pub(crate) kind: TaskKind,
/// Trigger declarations collected from the item at registration time.
pub(crate) decls: Vec<TriggerDecl>,
/// Pre-built closure submitted to the pool for `Single`/`Chain` tasks.
pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
/// Optional budget taken from the trigger declarer; chains are registered with `None`.
pub(crate) budget: Option<Duration>,
/// Per-task fault state, shared with the task's job.
pub(crate) fault: Arc<FaultAtomic>,
/// Overrun counter, shared with the task's job and exposed through
/// `Executor::overrun_count` and `Executor::stats_snapshot`.
pub(crate) overrun_count: Arc<AtomicU64>,
/// Closure submitted instead of `job` while the task or the executor is faulted.
pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,
/// Result of `scan_period_from_decls`; tasks without one produce no cycle statistics.
pub(crate) scan_period: Option<Duration>,
/// Last execution duration reported by the job; `u64::MAX` means "no sample".
pub(crate) last_took_ns: Arc<AtomicU64>,
/// `pre` timestamp of the previously recorded cycle, if any.
pub(crate) last_dispatch: Option<u64>,
/// Running count of period slots, used as the expected position when computing lateness.
pub(crate) grid_slot: u64,
/// Set when the task is dispatched, consumed by `DispatchPass::barrier_and_record`.
pub(crate) pending_cycle: Option<CyclePending>,
}
/// Owns the registered tasks, the worker pool and the iceoryx2 node, and drives
/// the dispatch loop (`run`, `run_for`, `run_n`, `run_until`).
pub struct Executor {
/// iceoryx2 node used for channels, services and the stop event.
pub(crate) node: Node<ipc::Service>,
/// Worker pool that task jobs are submitted to.
pub(crate) pool: Arc<Pool>,
/// Registered tasks, in registration order.
pub(crate) tasks: Vec<TaskEntry>,
/// Cycle statistics, one entry per task, pushed in lock-step with `tasks`.
pub(crate) cycle_stats: Vec<TaskCycleStats>,
/// Value passed to `TaskCycleStats::new` for every task (builder default: 1024).
pub(crate) stats_window: u32,
/// Set while a run loop is active; a second concurrent run is rejected with `AlreadyRunning`.
pub(crate) running: Arc<AtomicBool>,
/// Stop handle handed out by `stoppable()` and checked by the run loop.
pub(crate) stoppable: Stoppable,
/// Counter behind the auto-generated `task-N` / `chain-N` ids.
pub(crate) next_id: AtomicU64,
/// Listener on the stop event; attached to the wait set by the dispatch loop.
pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
/// Observer notified of executor, task and cycle events.
pub(crate) observer: Arc<dyn Observer>,
/// Execution monitor handed to every job builder.
pub(crate) monitor: Arc<dyn ExecutionMonitor>,
/// Error slot for the current loop iteration; cleared at the start of each iteration.
pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
/// Optional executor-wide iteration budget, forwarded to each job's fault context.
pub(crate) iteration_budget: Option<Duration>,
/// Executor-level fault state.
pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,
/// Task index supplied alongside `exec_fault` when it is loaded or swapped.
pub(crate) exec_fault_task_idx: Arc<AtomicU32>,
/// Budget (ms) supplied alongside `exec_fault` when it is loaded or swapped.
pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,
/// Reference instant for `since_ms` fault timestamps; initialised lazily on first use.
pub(crate) start_time: Arc<OnceLock<Instant>>,
/// Dispatcher for the fatal handler configured on the builder.
pub(crate) fatal_dispatch: Arc<FatalDispatch>,
/// Clock used for dispatch timestamps (`pre`) in cycle statistics.
pub(crate) clock: Arc<dyn MonotonicClock>,
/// Origin of the lateness grid; set to the `pre` timestamp of the first recorded cycle.
pub(crate) grid_epoch: Arc<OnceLock<u64>>,
/// Selects how interval triggers are driven (grid timer vs. wait-set intervals).
pub(crate) dispatch_mode: crate::DispatchMode,
/// Time source for the grid timer.
pub(crate) cyclic_clock: std::sync::Arc<dyn crate::CyclicClock>,
}
// SAFETY: NOTE(review): this chunk does not state the invariant that makes it
// sound to move an `Executor` to another thread. The suppressed
// `non_send_fields_in_send_ty` lint suggests at least one field is not `Send`
// on its own — confirm and document the reasoning here.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
impl Executor {
#[must_use]
pub fn builder() -> ExecutorBuilder {
ExecutorBuilder::default()
}
/// Opens or creates the typed channel `name` on this executor's iceoryx2 node.
///
/// # Errors
/// Returns whatever error `Channel::open_or_create` reports.
pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
    let node = &self.node;
    Channel::open_or_create(node, name)
}
/// Opens or creates the request/response service `name` on this executor's
/// iceoryx2 node.
///
/// # Errors
/// Returns whatever error `crate::Service::open_or_create` reports.
pub fn service<Req, Resp>(
    &mut self,
    name: &str,
) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
where
    Req: Payload,
    Resp: Payload,
{
    let node = &self.node;
    crate::Service::open_or_create(node, name)
}
#[must_use]
pub fn stats_snapshot(&self) -> StatsSnapshot {
let per_task = self
.tasks
.iter()
.zip(self.cycle_stats.iter())
.map(|(t, s)| {
let snap = s.snapshot();
TaskStatsEntry {
task_id: t.id.clone(),
p50_ns: snap.p50_ns,
p95_ns: snap.p95_ns,
p99_ns: snap.p99_ns,
min_ns: snap.min_ns,
max_ns: snap.max_ns,
max_jitter_ns: snap.max_jitter_ns,
max_lateness_ns: snap.max_lateness_ns,
overrun_count: t.overrun_count.load(Ordering::Acquire),
}
})
.collect();
StatsSnapshot { per_task }
}
/// Registers `item` under an auto-generated id of the form `task-N`.
///
/// # Errors
/// Returns whatever `add_with_id` reports.
pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
    let seq = self.next_id.fetch_add(1, Ordering::SeqCst);
    let auto_id = TaskId::new(format!("task-{seq}"));
    self.add_with_id(auto_id, item)
}
/// Registers a single item as a task.
///
/// The task id is the one reported by `item.task_id()` when present, otherwise
/// the `id` argument. The item's triggers are declared and validated, the item
/// is boxed, and a pool job referring to the boxed item is built up front.
///
/// # Errors
/// Returns the error from `declare_triggers` or from `validate_decls`; nothing
/// is registered in either case.
pub fn add_with_id(
    &mut self,
    id: impl Into<TaskId>,
    mut item: impl ExecutableItem,
) -> Result<TaskId, ExecutorError> {
    let caller_id: TaskId = id.into();
    // An id supplied by the item itself takes precedence over the caller's.
    let id = match item.task_id() {
        Some(own) => TaskId::new(own),
        None => caller_id,
    };

    let mut triggers = TriggerDeclarer::new_internal();
    item.declare_triggers(&mut triggers)?;
    let budget = triggers.budget;
    let decls = triggers.into_decls();
    validate_decls(&id, &decls)?;

    // Box the item first so the pointer handed to the job targets the heap
    // allocation that the `TaskEntry` will own.
    let mut boxed: Box<dyn ExecutableItem> = Box::new(item);
    let (app_id, app_inst) = (boxed.app_id(), boxed.app_instance_id());
    #[allow(unsafe_code)]
    let item_ptr = SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(boxed.as_mut()));

    let fault = Arc::new(FaultAtomic::new());
    let overruns = Arc::new(AtomicU64::new(0));
    let scan_period = scan_period_from_decls(&decls);
    // `u64::MAX` marks "no duration sample yet".
    let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
    #[allow(clippy::cast_possible_truncation)]
    let task_idx_u32 = self.tasks.len() as u32;

    let job = build_single_job(
        id.clone(),
        self.stoppable.clone(),
        Arc::clone(&self.observer),
        Arc::clone(&self.monitor),
        Arc::clone(&self.iter_err),
        app_id,
        app_inst,
        item_ptr,
        FaultDispatchCtx {
            task_budget: budget,
            task_fault: Arc::clone(&fault),
            overrun_count: Arc::clone(&overruns),
            iteration_budget: self.iteration_budget,
            exec_fault: Arc::clone(&self.exec_fault),
            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
            task_idx_u32,
            exec_start: Arc::clone(&self.start_time),
            observer: Arc::clone(&self.observer),
        },
        Arc::clone(&last_took_ns),
        Arc::clone(&self.clock),
    );

    // `tasks` and `cycle_stats` grow together so their indices stay aligned.
    self.tasks.push(TaskEntry {
        id: id.clone(),
        kind: TaskKind::Single(boxed),
        decls,
        job: Some(job),
        budget,
        fault,
        overrun_count: overruns,
        handler_job: None,
        scan_period,
        last_took_ns,
        last_dispatch: None,
        grid_slot: 0,
        pending_cycle: None,
    });
    self.cycle_stats.push(TaskCycleStats::new(self.stats_window));
    Ok(id)
}
/// Registers `main` as a task and attaches `handler` as its fault handler.
///
/// The handler job is submitted instead of the main job while the task or the
/// executor is faulted. Triggers declared by `handler` are collected into a
/// throwaway declarer and discarded; only `main`'s triggers drive the task.
///
/// # Errors
/// Returns the error from adding `main`, or from `handler.declare_triggers`.
/// In the latter case `main` has already been registered and stays registered
/// without a handler.
pub fn add_with_fault_handler<I, H>(
    &mut self,
    main: I,
    handler: H,
) -> Result<TaskId, ExecutorError>
where
    I: ExecutableItem,
    H: ExecutableItem,
{
    let task_id = self.add(main)?;
    // A successful `add` always appends, so the new task is the last entry.
    // Looking it up by id instead would pick an *earlier* task whenever
    // `main.task_id()` repeats an id that is already registered, attaching the
    // handler (and its fault context) to the wrong task.
    let task_idx = self.tasks.len() - 1;

    let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
    let mut throwaway = TriggerDeclarer::new_internal();
    handler_box.declare_triggers(&mut throwaway)?;
    drop(throwaway);
    let app_id = handler_box.app_id();
    let app_inst = handler_box.app_instance_id();

    let task = &self.tasks[task_idx];
    #[allow(clippy::cast_possible_truncation)]
    let task_idx_u32 = task_idx as u32;
    // The handler shares the main task's fault state and overrun counter.
    let handler_fault_ctx = FaultDispatchCtx {
        task_budget: task.budget,
        task_fault: Arc::clone(&task.fault),
        overrun_count: Arc::clone(&task.overrun_count),
        iteration_budget: self.iteration_budget,
        exec_fault: Arc::clone(&self.exec_fault),
        exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
        exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
        task_idx_u32,
        exec_start: Arc::clone(&self.start_time),
        observer: Arc::clone(&self.observer),
    };
    let handler_closure = build_handler_job(
        task_id.clone(),
        self.stoppable.clone(),
        Arc::clone(&self.observer),
        Arc::clone(&self.monitor),
        Arc::clone(&self.iter_err),
        app_id,
        app_inst,
        handler_box,
        handler_fault_ctx,
    );
    self.tasks[task_idx].handler_job = Some(handler_closure);
    Ok(task_id)
}
/// Resets the fault state of `task` to `Running` and returns the state it had.
///
/// # Errors
/// * `TaskNotFound` when no task has this id.
/// * `TaskNotFaulted` when the task was already `Running`.
pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
    let Some(entry) = self.tasks.iter().find(|t| t.id == task) else {
        return Err(ExecutorError::TaskNotFound(task));
    };
    let budget_ms = match entry.budget {
        Some(budget) => crate::fault::duration_to_ms_sat(budget),
        None => 0_u32,
    };
    // The swap happens unconditionally; a task that was not faulted simply
    // stays `Running` and the call is reported as an error.
    let previous = entry.fault.swap(FaultState::Running, budget_ms);
    if matches!(previous, FaultState::Running) {
        return Err(ExecutorError::TaskNotFaulted(task));
    }
    self.observer.on_task_clear(task);
    Ok(previous)
}
/// Resets the executor-level fault to `Running` and returns the previous state.
///
/// Tasks whose own fault reason is `ExecutorFaulted` are reset as well, with
/// `on_task_clear` fired for each, before `on_executor_clear` is fired.
///
/// # Errors
/// `ExecutorNotFaulted` when the executor was already `Running`.
pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
    let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
    let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
    let previous = self
        .exec_fault
        .swap(ExecutorFaultState::Running, task_idx, budget_ms);
    if matches!(previous, ExecutorFaultState::Running) {
        return Err(ExecutorError::ExecutorNotFaulted);
    }

    for entry in &self.tasks {
        let task_budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
        // Only tasks that were faulted *because of* the executor are cleared;
        // tasks with their own fault reason keep it.
        let inherited = matches!(
            entry.fault.load(task_budget_ms),
            FaultState::Faulted {
                reason: FaultReason::ExecutorFaulted,
                ..
            }
        );
        if !inherited {
            continue;
        }
        let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
        self.observer.on_task_clear(entry.id.clone());
    }
    self.observer.on_executor_clear();
    Ok(previous)
}
/// Returns the current overrun counter of `task`.
///
/// # Errors
/// `TaskNotFound` when no task has this id.
pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
    let found = self.tasks.iter().find(|t| t.id == task);
    match found {
        Some(entry) => Ok(entry.overrun_count.load(Ordering::Acquire)),
        None => Err(ExecutorError::TaskNotFound(task)),
    }
}
/// Returns the current fault state of `task`.
///
/// # Errors
/// `TaskNotFound` when no task has this id.
pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
    let found = self.tasks.iter().find(|t| t.id == task);
    let Some(entry) = found else {
        return Err(ExecutorError::TaskNotFound(task));
    };
    let budget_ms = match entry.budget {
        Some(budget) => crate::fault::duration_to_ms_sat(budget),
        None => 0_u32,
    };
    Ok(entry.fault.load(budget_ms))
}
/// Returns the current executor-level fault state.
#[must_use]
pub fn executor_fault_state(&self) -> ExecutorFaultState {
    // Arguments are evaluated left to right: task index first, then budget.
    self.exec_fault.load(
        self.exec_fault_task_idx.load(Ordering::Acquire),
        self.exec_fault_budget_ms.load(Ordering::Acquire),
    )
}
/// Registers `items` as one chain task under an auto-generated id of the form
/// `chain-N`.
///
/// # Errors
/// Returns whatever `add_chain_with_id_boxed` reports (for example an empty chain).
pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
where
    I: ExecutableItem,
    C: IntoIterator<Item = I>,
{
    // The id is allocated before the items are consumed.
    let seq = self.next_id.fetch_add(1, Ordering::SeqCst);
    let auto_id = TaskId::new(format!("chain-{seq}"));
    let mut boxed: Vec<Box<dyn ExecutableItem>> = Vec::new();
    boxed.extend(
        items
            .into_iter()
            .map(|item| -> Box<dyn ExecutableItem> { Box::new(item) }),
    );
    self.add_chain_with_id_boxed(auto_id, boxed)
}
/// Registers `items` as one chain task under the given `id`.
///
/// # Errors
/// Returns whatever `add_chain_with_id_boxed` reports (for example an empty chain).
pub fn add_chain_with_id<I, C>(
    &mut self,
    id: impl Into<TaskId>,
    items: C,
) -> Result<TaskId, ExecutorError>
where
    I: ExecutableItem,
    C: IntoIterator<Item = I>,
{
    // Items are boxed before the id is converted.
    let mut boxed: Vec<Box<dyn ExecutableItem>> = Vec::new();
    boxed.extend(
        items
            .into_iter()
            .map(|item| -> Box<dyn ExecutableItem> { Box::new(item) }),
    );
    let chain_id: TaskId = id.into();
    self.add_chain_with_id_boxed(chain_id, boxed)
}
/// Registers an already-boxed chain of items as one task.
///
/// Only the head item (`items[0]`) contributes the task id override and the
/// triggers. Triggers declared by later items are ignored (with a warning when
/// the `tracing` feature is enabled), and errors from their `declare_triggers`
/// are deliberately discarded. Chain tasks are registered without a budget.
///
/// # Errors
/// * `ExecutorError::Builder` when `items` is empty.
/// * The error from the head's `declare_triggers` or from `validate_decls`.
fn add_chain_with_id_boxed(
    &mut self,
    id: TaskId,
    mut items: Vec<Box<dyn ExecutableItem>>,
) -> Result<TaskId, ExecutorError> {
    if items.is_empty() {
        return Err(ExecutorError::Builder(
            "chain must contain at least one item".into(),
        ));
    }
    // An id supplied by the head item takes precedence over the caller's.
    let id = items[0].task_id().map_or(id, TaskId::new);
    let mut head_declarer = TriggerDeclarer::new_internal();
    items[0].declare_triggers(&mut head_declarer)?;
    let decls = head_declarer.into_decls();
    validate_decls(&id, &decls)?;
    for (i, body) in items.iter_mut().enumerate().skip(1) {
        let mut spurious = TriggerDeclarer::new_internal();
        let _ = body.declare_triggers(&mut spurious);
        if !spurious.is_empty() {
            #[cfg(feature = "tracing")]
            tracing::warn!(
                target: "taktora-executor",
                task = %id,
                position = i,
                "non-head chain item declared triggers; they will be ignored"
            );
            #[cfg(not(feature = "tracing"))]
            {
                let _ = i;
            }
        }
    }

    let task_fault = Arc::new(FaultAtomic::new());
    let overrun_count = Arc::new(AtomicU64::new(0));
    let scan_period = scan_period_from_decls(&decls);
    // `u64::MAX` marks "no duration sample yet".
    let last_took_ns = Arc::new(AtomicU64::new(u64::MAX));
    #[allow(clippy::cast_possible_truncation)]
    let task_idx_u32 = self.tasks.len() as u32;

    // Push the entry without a job first: the job needs a pointer to the
    // chain `Vec` at its final location inside `self.tasks`. (No pointer is
    // taken to the local `items` — it is moved by this push.)
    self.tasks.push(TaskEntry {
        id: id.clone(),
        kind: TaskKind::Chain(items),
        decls,
        job: None,
        budget: None,
        fault: Arc::clone(&task_fault),
        overrun_count: Arc::clone(&overrun_count),
        handler_job: None,
        scan_period,
        last_took_ns: Arc::clone(&last_took_ns),
        last_dispatch: None,
        grid_slot: 0,
        pending_cycle: None,
    });
    self.cycle_stats
        .push(TaskCycleStats::new(self.stats_window));
    let task_idx = self.tasks.len() - 1;

    // NOTE(review): this pointer targets the `Vec` stored inline in
    // `self.tasks[task_idx]`. A later `self.tasks.push` that reallocates the
    // task list would move that `Vec` and leave the pointer dangling — confirm
    // how `build_chain_job` uses it, or that tasks are never added afterwards.
    let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
    {
        TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
        _ => unreachable!("just-pushed task is TaskKind::Chain"),
    };
    #[allow(unsafe_code)]
    let chain_ptr = SendChainPtr::new(chain_vec_ptr);
    let fault_ctx = FaultDispatchCtx {
        task_budget: None,
        task_fault,
        overrun_count,
        iteration_budget: self.iteration_budget,
        exec_fault: Arc::clone(&self.exec_fault),
        exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
        exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
        task_idx_u32,
        exec_start: Arc::clone(&self.start_time),
        observer: Arc::clone(&self.observer),
    };
    let job = build_chain_job(
        id.clone(),
        self.stoppable.clone(),
        Arc::clone(&self.observer),
        Arc::clone(&self.monitor),
        Arc::clone(&self.iter_err),
        chain_ptr,
        fault_ctx,
        Arc::clone(&last_took_ns),
        Arc::clone(&self.clock),
    );
    self.tasks[task_idx].job = Some(job);
    Ok(id)
}
#[must_use]
pub fn stoppable(&self) -> Stoppable {
self.stoppable.clone()
}
/// Borrows the iceoryx2 node owned by this executor.
pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
    let node: &Node<ipc::Service> = &self.node;
    node
}
/// Starts building a graph task; the returned builder borrows the executor
/// mutably and starts with an empty graph and no custom id.
pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
    let builder = crate::graph::GraphBuilder::new();
    ExecutorGraphBuilder {
        executor: self,
        builder,
        custom_id: None,
    }
}
}
/// Collects optional configuration for an [`Executor`]; unset options fall back
/// to defaults in `build`.
pub struct ExecutorBuilder {
/// Worker thread count; `build` falls back to `num_cpus::get_physical()`.
worker_threads: Option<usize>,
/// Observer; `build` falls back to `NoopObserver`.
observer: Option<Arc<dyn Observer>>,
/// Execution monitor; `build` falls back to `NoopMonitor`.
monitor: Option<Arc<dyn ExecutionMonitor>>,
/// Thread attributes handed to the worker pool.
worker_attrs: ThreadAttributes,
/// Optional executor-wide iteration budget.
iteration_budget: Option<Duration>,
/// Fatal handler; `build` falls back to a handler that does nothing.
fatal_handler: Option<FatalHandler>,
/// Statistics window passed to `TaskCycleStats::new`; `build` falls back to 1024.
stats_window: Option<u32>,
/// Monotonic clock; `build` falls back to `SystemClock::new()`.
clock: Option<Arc<dyn MonotonicClock>>,
/// Dispatch mode for interval triggers.
dispatch_mode: crate::DispatchMode,
/// Cyclic clock; `build` falls back to `MonotonicCyclicClock::new()`.
cyclic_clock: Option<std::sync::Arc<dyn crate::CyclicClock>>,
}
impl Default for ExecutorBuilder {
    /// Every optional setting starts unset; thread attributes and dispatch mode
    /// start at their own defaults.
    fn default() -> Self {
        let worker_attrs = ThreadAttributes::new();
        let dispatch_mode = crate::DispatchMode::default();
        Self {
            worker_threads: None,
            observer: None,
            monitor: None,
            worker_attrs,
            iteration_budget: None,
            fatal_handler: None,
            stats_window: None,
            clock: None,
            dispatch_mode,
            cyclic_clock: None,
        }
    }
}
impl ExecutorBuilder {
/// Sets the number of worker threads for the pool.
#[must_use]
pub const fn worker_threads(mut self, n: usize) -> Self {
    let requested = Some(n);
    self.worker_threads = requested;
    self
}
#[must_use]
pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
self.observer = Some(obs);
self
}
#[must_use]
pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
self.monitor = Some(mon);
self
}
/// Sets the executor-wide iteration budget.
#[must_use]
pub const fn iteration_budget(mut self, dur: Duration) -> Self {
    let budget = Some(dur);
    self.iteration_budget = budget;
    self
}
/// Sets the statistics window passed to each task's cycle statistics.
#[must_use]
pub const fn stats_window(mut self, samples: u32) -> Self {
    let window = Some(samples);
    self.stats_window = window;
    self
}
#[must_use]
pub fn clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
self.clock = Some(clock);
self
}
/// Selects how interval triggers are dispatched.
#[must_use]
pub const fn dispatch_mode(mut self, mode: crate::DispatchMode) -> Self {
    let selected = mode;
    self.dispatch_mode = selected;
    self
}
#[must_use]
pub fn cyclic_clock(mut self, clock: std::sync::Arc<dyn crate::CyclicClock>) -> Self {
self.cyclic_clock = Some(clock);
self
}
#[must_use]
#[allow(clippy::missing_const_for_fn)]
pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
self.worker_attrs = attrs;
self
}
#[must_use]
pub fn on_fatal(
mut self,
handler: impl Fn(&crate::FatalContext) + Send + Sync + 'static,
) -> Self {
self.fatal_handler = Some(Arc::new(handler));
self
}
/// Creates the iceoryx2 node, the worker pool and the per-executor stop event,
/// then assembles the [`Executor`] with defaults filled in for unset options.
///
/// The stop event's topic embeds the process id and a per-process sequence
/// number taken from `EXEC_COUNTER`.
///
/// # Errors
/// Returns an error when the node, the pool, the stop event, or its
/// notifier/listener cannot be created.
///
/// # Panics
/// Panics if the generated stop-topic string is rejected by the service-name
/// conversion.
#[allow(clippy::arc_with_non_send_sync)]
#[track_caller]
pub fn build(self) -> Result<Executor, ExecutorError> {
    let node = NodeBuilder::new()
        .create::<ipc::Service>()
        .map_err(ExecutorError::iceoryx2)?;

    let n_workers = match self.worker_threads {
        Some(count) => count,
        None => num_cpus::get_physical(),
    };
    let fatal_handler: FatalHandler = match self.fatal_handler {
        Some(handler) => handler,
        None => Arc::new(|_ctx: &crate::FatalContext| {}),
    };
    let fatal_dispatch = Arc::new(FatalDispatch::new(fatal_handler));
    let raw_pool = Pool::new(n_workers, self.worker_attrs, Arc::clone(&fatal_dispatch))?;
    let pool = Arc::new(raw_pool);

    // One stop event per executor.
    let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
    let stop_topic = format!(
        "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
        std::process::id()
    );
    let stop_event = node
        .service_builder(&stop_topic.as_str().try_into().unwrap())
        .event()
        .open_or_create()
        .map_err(ExecutorError::iceoryx2)?;
    let notifier = stop_event
        .notifier_builder()
        .create()
        .map_err(ExecutorError::iceoryx2)?;
    let stop_notifier = Arc::new(notifier);
    let listener = stop_event
        .listener_builder()
        .create()
        .map_err(ExecutorError::iceoryx2)?;
    let stop_listener = Arc::new(listener);
    let stoppable = Stoppable::with_waker(stop_notifier);

    // Fill in defaults for everything the caller left unset.
    let observer: Arc<dyn Observer> = match self.observer {
        Some(custom) => custom,
        None => Arc::new(NoopObserver),
    };
    let monitor: Arc<dyn ExecutionMonitor> = match self.monitor {
        Some(custom) => custom,
        None => Arc::new(NoopMonitor),
    };
    let clock: Arc<dyn MonotonicClock> = match self.clock {
        Some(custom) => custom,
        None => Arc::new(SystemClock::new()),
    };
    let cyclic_clock: std::sync::Arc<dyn crate::CyclicClock> = match self.cyclic_clock {
        Some(custom) => custom,
        None => std::sync::Arc::new(crate::MonotonicCyclicClock::new()),
    };

    Ok(Executor {
        node,
        pool,
        tasks: Vec::new(),
        cycle_stats: Vec::new(),
        stats_window: self.stats_window.unwrap_or(1024),
        running: Arc::new(AtomicBool::new(false)),
        stoppable,
        next_id: AtomicU64::new(0),
        stop_listener,
        observer,
        monitor,
        iter_err: Arc::new(std::sync::Mutex::new(None)),
        iteration_budget: self.iteration_budget,
        exec_fault: Arc::new(ExecutorFaultAtomic::new()),
        exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
        exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
        start_time: Arc::new(OnceLock::new()),
        fatal_dispatch,
        clock,
        grid_epoch: Arc::new(OnceLock::new()),
        dispatch_mode: self.dispatch_mode,
        cyclic_clock,
    })
}
}
impl Executor {
/// Runs the dispatch loop with no iteration or time limit.
///
/// # Errors
/// Returns whatever `run_inner` reports.
pub fn run(&mut self) -> Result<(), ExecutorError> {
    let mode = RunMode::Forever;
    self.run_inner(mode)
}
/// Runs the dispatch loop until roughly `max` has elapsed.
///
/// The deadline is checked after each loop iteration, so the loop can overrun
/// `max` by up to one wait. A `max` so large that `now + max` cannot be
/// represented (for example `Duration::MAX`) is treated as "no deadline"
/// instead of panicking on the addition.
///
/// # Errors
/// Returns whatever `run_inner` reports.
pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
    let mode = Instant::now()
        .checked_add(max)
        .map_or(RunMode::Forever, RunMode::Until);
    self.run_inner(mode)
}
/// Runs the dispatch loop until `n` iterations have been counted.
///
/// # Errors
/// Returns whatever `run_inner` reports.
pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
    let mode = RunMode::Iterations(n);
    self.run_inner(mode)
}
/// Runs the dispatch loop until `predicate` returns `true`; the predicate is
/// evaluated once after each counted iteration.
///
/// # Errors
/// Returns whatever `run_inner` reports.
pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
    let stop_when: &mut dyn FnMut() -> bool = &mut predicate;
    self.run_inner(RunMode::Predicate(stop_when))
}
}
/// Termination condition evaluated by `after_callback` at the end of each loop iteration.
enum RunMode<'a> {
/// Never stops on its own.
Forever,
/// Stops once `Instant::now()` has reached the deadline.
Until(Instant),
/// Stops once this many iterations have been counted.
Iterations(usize),
/// Stops once the predicate returns `true`.
Predicate(&'a mut dyn FnMut() -> bool),
}
impl Executor {
/// Shared entry point of the `run*` methods: guards against re-entry, notifies
/// the observer around the dispatch loop and clears the running flag afterwards.
///
/// # Errors
/// `AlreadyRunning` when the running flag was already set; otherwise the result
/// of `dispatch_loop`.
fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
    let was_running = self.running.swap(true, Ordering::SeqCst);
    if was_running {
        return Err(ExecutorError::AlreadyRunning);
    }
    self.observer.on_executor_up();
    let outcome = self.dispatch_loop(&mut mode);
    if let Err(err) = &outcome {
        self.observer.on_executor_error(err);
    } else {
        self.observer.on_executor_down();
    }
    self.running.store(false, Ordering::SeqCst);
    outcome
}
/// Core run loop.
///
/// Builds a wait set, attaches every task trigger plus the stop listener, then
/// repeatedly waits once, dispatches the triggered tasks, runs the grid-driven
/// cyclic pass and lets `after_callback` decide whether to continue.
///
/// Returns `Ok(())` when `after_callback` reports `Done` or when
/// `guard_or_fatal` yields `None`; returns `Err` when the wait set or an
/// attachment cannot be created, or when `after_callback` reports `Failed`.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#[allow(
unsafe_code,
clippy::too_many_lines,
clippy::ref_as_ptr,
clippy::borrow_as_ptr
)]
fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
.create()
.map_err(ExecutorError::iceoryx2)?;
// `listener_storage` keeps the attached listeners alive; `guards[i]` belongs
// to task `attachment_to_task[i]`.
let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
let mut attachment_to_task: Vec<usize> = Vec::new();
let dispatch_mode = self.dispatch_mode;
// In grid mode, interval triggers are collected here instead of being
// attached to the wait set.
let mut cyclic_task_indices: Vec<usize> = Vec::new();
let mut cyclic_periods: Vec<u64> = Vec::new();
build_attachments(
&waitset,
&self.tasks,
dispatch_mode,
&mut listener_storage,
&mut guards,
&mut attachment_to_task,
&mut cyclic_task_indices,
&mut cyclic_periods,
)?;
let mut grid =
crate::grid::GridTimer::new(self.cyclic_clock.now_nanos(), cyclic_periods.clone());
// Scratch buffer reused by every cyclic pass.
let mut due_cyclic: Vec<usize> = Vec::new();
// Linux only: a single timerfd armed at the base period of all cyclic tasks;
// a base period of 0 means no timer is created.
#[cfg(target_os = "linux")]
let master_timer: Option<crate::timerfd::TimerFd> = {
let base = crate::grid::base_period(&cyclic_periods);
if base == 0 {
None
} else {
Some(
crate::timerfd::TimerFd::new(std::time::Duration::from_nanos(base)).map_err(
|e| {
ExecutorError::DeclareTriggers(format!(
"failed to arm master timerfd: {e}"
))
},
)?,
)
}
};
// The raw-pointer round trip detaches the reference from `master_timer`'s
// borrow. The guard is declared after the timer and locals drop in reverse
// declaration order, so the guard is released first.
#[cfg(target_os = "linux")]
#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
let _master_timer_guard = match &master_timer {
Some(tf) => Some(
waitset
.attach_notification(unsafe { &*(tf as *const crate::timerfd::TimerFd) })
.map_err(ExecutorError::iceoryx2)?,
),
None => None,
};
// Same lifetime-detaching trick for the stop listener, which is owned by
// `self.stop_listener` and therefore outlives this function's locals.
let stop_listener_ref: &IxListener<ipc::Service> =
unsafe { &*(self.stop_listener.as_ref() as *const _) };
let _stop_guard = waitset
.attach_notification(stop_listener_ref)
.map_err(ExecutorError::iceoryx2)?;
let iterations_done = AtomicUsize::new(0);
let stop_flag = self.stoppable.clone();
loop {
// Clear any error left over from the previous iteration.
#[allow(clippy::unwrap_used)]
let mut iter_err_guard = self.iter_err.lock().unwrap();
*iter_err_guard = None;
drop(iter_err_guard);
// Executor state is handed to the `DispatchPass` values below as raw
// pointers. NOTE(review): presumably to avoid holding borrows of `self`
// across the closure and the later `self.after_callback` call — confirm.
let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
let cycle_stats_ptr = &mut self.cycle_stats as *mut Vec<TaskCycleStats>;
let observer = &self.observer;
let pool = &self.pool;
let iter_err_inner = Arc::clone(&self.iter_err);
let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;
let clock = &self.clock;
let grid_epoch_ptr = &*self.grid_epoch as *const OnceLock<u64>;
// A `None` from `guard_or_fatal` ends the loop with `Ok(())`.
// NOTE(review): presumably `None` is the caught-panic path — confirm in `crate::fatal`.
let Some(cb_result) =
guard_or_fatal(&self.fatal_dispatch, FatalSite::ExecutorRunLoop, || {
let mut pass = DispatchPass {
guards: &guards,
attachment_to_task: &attachment_to_task,
tasks_ptr,
cycle_stats_ptr,
observer,
exec_fault_ptr,
exec_start_ptr,
clock,
grid_epoch_ptr,
stop_listener_ptr,
pool,
iter_err: &iter_err_inner,
};
// On Linux the wait uses `Duration::MAX`; elsewhere grid mode asks the
// grid timer for the next timeout.
#[cfg(target_os = "linux")]
let timeout = std::time::Duration::MAX;
#[cfg(not(target_os = "linux"))]
let timeout = match dispatch_mode {
crate::DispatchMode::Grid => {
grid.next_timeout(self.cyclic_clock.now_nanos())
}
crate::DispatchMode::Legacy => std::time::Duration::MAX,
};
waitset.wait_and_process_once_with_timeout(
|attachment_id: WaitSetAttachmentId<ipc::Service>| {
pass.process_attachment(&attachment_id)
},
timeout,
)
})
else {
break Ok(());
};
// Linux: a tick happened only if draining the timerfd returned a value
// above 0. Other platforms always treat the wake-up as a tick.
#[cfg(target_os = "linux")]
let ticked = master_timer.as_ref().is_some_and(|tf| tf.drain() > 0);
#[cfg(not(target_os = "linux"))]
let ticked = true;
// Fresh pass over the same state for the grid-driven cyclic tasks.
let cpass = DispatchPass {
guards: &guards,
attachment_to_task: &attachment_to_task,
tasks_ptr,
cycle_stats_ptr,
observer,
exec_fault_ptr,
exec_start_ptr,
clock,
grid_epoch_ptr,
stop_listener_ptr,
pool,
iter_err: &iter_err_inner,
};
run_grid_cyclic_pass(
cpass,
ticked,
dispatch_mode,
&stop_flag,
cb_result,
&mut grid,
self.cyclic_clock.now_nanos(),
&cyclic_task_indices,
&mut due_cyclic,
);
// Decide whether to loop again, finish, or fail.
match self.after_callback(cb_result, mode, &iterations_done, &stop_flag) {
IterOutcome::Continue => {}
IterOutcome::Done => break Ok(()),
IterOutcome::Failed(err) => break Err(err),
}
}
}
/// Decides what the run loop does after one wait-and-process round.
///
/// Checks, in order: a wait-set error (`Failed`), an interrupt or termination
/// request (`Done`), an error recorded in `iter_err` (`Failed`, and the slot is
/// emptied), a stop request (`Done`). Only then is the iteration counted and
/// the run mode's limit evaluated.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
fn after_callback(
    &self,
    cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
    mode: &mut RunMode<'_>,
    iterations_done: &AtomicUsize,
    stop_flag: &Stoppable,
) -> IterOutcome {
    let run_result = match cb_result {
        Ok(value) => value,
        Err(source) => return IterOutcome::Failed(ExecutorError::iceoryx2(source)),
    };
    if let WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest = run_result {
        return IterOutcome::Done;
    }

    #[allow(clippy::unwrap_used)]
    let recorded = self.iter_err.lock().unwrap().take();
    if let Some(err) = recorded {
        return IterOutcome::Failed(err);
    }
    if stop_flag.is_stopped() {
        return IterOutcome::Done;
    }

    iterations_done.fetch_add(1, Ordering::SeqCst);
    let limit_hit = match mode {
        RunMode::Iterations(target) => iterations_done.load(Ordering::SeqCst) >= *target,
        RunMode::Until(deadline) => Instant::now() >= *deadline,
        RunMode::Predicate(stop_when) => stop_when(),
        RunMode::Forever => false,
    };
    if limit_hit {
        return IterOutcome::Done;
    }
    IterOutcome::Continue
}
}
/// Verdict of `Executor::after_callback` for one loop iteration.
enum IterOutcome {
/// Keep looping.
Continue,
/// Leave the loop with `Ok(())`.
Done,
/// Leave the loop with this error.
Failed(ExecutorError),
}
/// Dispatches the cyclic tasks that the grid timer reports as due at `now_nanos`.
///
/// Does nothing when a stop was requested, the wait set reported an interrupt
/// or termination request, no tick occurred, the dispatch mode is not `Grid`,
/// or there are no cyclic tasks. When at least one task is due, all due tasks
/// are dispatched and a single barrier-and-record step follows.
#[allow(clippy::too_many_arguments)]
fn run_grid_cyclic_pass(
    mut pass: DispatchPass<'_, '_, '_>,
    ticked: bool,
    dispatch_mode: crate::DispatchMode,
    stop_flag: &Stoppable,
    cb_result: Result<WaitSetRunResult, iceoryx2::waitset::WaitSetRunError>,
    grid: &mut crate::grid::GridTimer,
    now_nanos: u64,
    cyclic_task_indices: &[usize],
    due_cyclic: &mut Vec<usize>,
) {
    let stop_requested = stop_flag.is_stopped();
    let waitset_ending = matches!(
        cb_result,
        Ok(WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest)
    );
    if stop_requested || waitset_ending {
        return;
    }
    if !ticked {
        return;
    }
    if dispatch_mode != crate::DispatchMode::Grid {
        return;
    }
    if cyclic_task_indices.is_empty() {
        return;
    }

    grid.take_due(now_nanos, due_cyclic);
    if due_cyclic.is_empty() {
        return;
    }
    // `due_cyclic` holds positions into `cyclic_task_indices`, not task indices.
    for &slot in due_cyclic.iter() {
        pass.dispatch_task(cyclic_task_indices[slot]);
    }
    pass.barrier_and_record();
}
/// Attaches every task trigger to `waitset`, or — for interval triggers in
/// grid mode — records the task index and period (in nanoseconds, saturating
/// at `u64::MAX`) for the grid timer instead.
///
/// `guards` and `attachment_to_task` grow in lock-step: entry `i` of the latter
/// is the task index owning guard `i`.
///
/// # Errors
/// Returns the first attachment error; output vectors keep what was added
/// before the failure.
#[allow(clippy::too_many_arguments)]
fn build_attachments<'w>(
    waitset: &'w WaitSet<ipc::Service>,
    tasks: &[TaskEntry],
    dispatch_mode: crate::DispatchMode,
    listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
    guards: &mut Vec<WaitSetGuard<'w, 'w, ipc::Service>>,
    attachment_to_task: &mut Vec<usize>,
    cyclic_task_indices: &mut Vec<usize>,
    cyclic_periods: &mut Vec<u64>,
) -> Result<(), ExecutorError> {
    for (task_idx, task) in tasks.iter().enumerate() {
        for decl in &task.decls {
            match decl {
                TriggerDecl::Interval(period) if dispatch_mode == crate::DispatchMode::Grid => {
                    cyclic_task_indices.push(task_idx);
                    let nanos = u64::try_from(period.as_nanos()).unwrap_or(u64::MAX);
                    cyclic_periods.push(nanos);
                }
                _ => {
                    let guard = attach_trigger_decl(waitset, listener_storage, decl)?;
                    guards.push(guard);
                    attachment_to_task.push(task_idx);
                }
            }
        }
    }
    Ok(())
}
/// Attaches one trigger declaration to `waitset` and returns its guard.
///
/// Listener-based triggers first store a clone of the listener `Arc` in
/// `listener_storage`, so the listener stays allocated while that vector lives.
///
/// # Errors
/// Returns the wait-set attachment error wrapped by `ExecutorError::iceoryx2`.
#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
fn attach_trigger_decl<'w>(
    waitset: &'w WaitSet<ipc::Service>,
    listener_storage: &mut Vec<Arc<crate::trigger::RawListener>>,
    decl: &TriggerDecl,
) -> Result<WaitSetGuard<'w, 'w, ipc::Service>, ExecutorError> {
    // Stores a clone of the `Arc`, then returns a reference whose lifetime is
    // detached from the borrow of `listener`. The clone shares the allocation,
    // so the pointer is the same one the stored clone derefs to.
    let mut keep_alive = |listener: &Arc<crate::trigger::RawListener>| {
        listener_storage.push(Arc::clone(listener));
        let raw: *const crate::trigger::RawListener = Arc::as_ptr(listener);
        unsafe { &*raw }
    };
    let attached = match decl {
        TriggerDecl::Interval(period) => waitset.attach_interval(*period),
        TriggerDecl::Deadline { listener, deadline } => {
            waitset.attach_deadline(keep_alive(listener), *deadline)
        }
        TriggerDecl::Subscriber { listener } | TriggerDecl::RawListener(listener) => {
            waitset.attach_notification(keep_alive(listener))
        }
    };
    attached.map_err(ExecutorError::iceoryx2)
}
/// View over executor state used while dispatching tasks within one loop
/// iteration. Several fields are raw pointers created in `dispatch_loop` from
/// fields of the `Executor`.
struct DispatchPass<'a, 'g, 'w> {
/// Wait-set guards; index `i` corresponds to `attachment_to_task[i]`.
guards: &'a [WaitSetGuard<'g, 'w, ipc::Service>],
/// Task index owning each guard.
attachment_to_task: &'a [usize],
/// Points at `Executor::tasks`.
tasks_ptr: *mut Vec<TaskEntry>,
/// Points at `Executor::cycle_stats`.
cycle_stats_ptr: *mut Vec<TaskCycleStats>,
/// Observer notified of cycle statistics.
observer: &'a Arc<dyn Observer>,
/// Points at the executor-level fault state.
exec_fault_ptr: *const ExecutorFaultAtomic,
/// Points at the executor's lazily initialised start instant.
exec_start_ptr: *const OnceLock<Instant>,
/// Clock used for dispatch timestamps.
clock: &'a Arc<dyn MonotonicClock>,
/// Points at the lazily initialised lateness-grid origin.
grid_epoch_ptr: *const OnceLock<u64>,
/// Points at the stop listener, drained at the start of `process_attachment`.
stop_listener_ptr: *const IxListener<ipc::Service>,
/// Pool that jobs are submitted to.
pool: &'a Pool,
/// Error slot for the current iteration.
iter_err: &'a Arc<std::sync::Mutex<Option<ExecutorError>>>,
}
impl DispatchPass<'_, '_, '_> {
/// Dispatches task `task_idx`: either fault routing takes over, or the task's
/// normal job is submitted.
///
/// A pending-cycle marker stamped with the current clock reading is left on
/// the task, except for a fault-routed task without a scan period.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#[allow(unsafe_code)]
fn dispatch_task(&mut self, task_idx: usize) {
    let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
    let diverted = self.handle_fault_routing(task);
    if diverted && task.scan_period.is_none() {
        return;
    }
    task.pending_cycle = Some(CyclePending {
        pre: self.clock.now_nanos(),
        faulted: diverted,
    });
    if !diverted {
        self.submit_task_job(task);
    }
}
/// Wait-set callback: drains the stop listener, dispatches every task whose
/// guard reports an event or a missed deadline for `attachment_id`, then runs
/// the barrier-and-record step. Always asks the wait set to continue.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#[allow(unsafe_code)]
fn process_attachment(
    &mut self,
    attachment_id: &WaitSetAttachmentId<ipc::Service>,
) -> CallbackProgression {
    let stop_listener = unsafe { &*self.stop_listener_ptr };
    // Consume queued stop notifications; the loop ends on `Ok(None)` or an error.
    while matches!(stop_listener.try_wait_one(), Ok(Some(_))) {}

    // Copy the slice references out so iterating does not borrow `self`.
    let guards = self.guards;
    let owners = self.attachment_to_task;
    for (i, guard) in guards.iter().enumerate() {
        let triggered =
            attachment_id.has_event_from(guard) || attachment_id.has_missed_deadline(guard);
        if triggered {
            let task_idx = owners[i];
            self.dispatch_task(task_idx);
        }
    }
    self.barrier_and_record();
    CallbackProgression::Continue
}
/// Calls the pool barrier, then records one cycle for every task that carries
/// a pending-cycle marker, clearing the marker as it goes.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#[allow(unsafe_code)]
fn barrier_and_record(&mut self) {
    self.pool.barrier();
    let total = unsafe { (*self.tasks_ptr).len() };
    for idx in 0..total {
        let marker = unsafe { (&mut *self.tasks_ptr)[idx].pending_cycle.take() };
        let Some(CyclePending { pre, faulted }) = marker else {
            continue;
        };
        self.record_cycle_for(idx, faulted, pre);
    }
}
/// Folds one dispatched cycle of task `task_idx` into its statistics and
/// reports it to the observer. Tasks without a scan period are skipped.
///
/// `pre_ns` is the clock reading taken at dispatch; `faulted` says whether the
/// dispatch was diverted by fault routing.
#[allow(unsafe_code)]
fn record_cycle_for(&mut self, task_idx: usize, faulted: bool, pre_ns: u64) {
let task = unsafe { &mut (&mut *self.tasks_ptr)[task_idx] };
let Some(period) = task.scan_period else {
return; };
let period_ns = u64::try_from(period.as_nanos()).unwrap_or(u64::MAX);
// `u64::MAX` is the "no sample" sentinel; the swap also resets the slot for
// the next cycle. Faulted cycles never report a duration.
let took_raw = task.last_took_ns.swap(u64::MAX, Ordering::AcqRel);
let took = if faulted || took_raw == u64::MAX {
None
} else {
Some(took_raw)
};
// Time since the previously recorded dispatch; `None` for the first cycle.
let actual_period = task
.last_dispatch
.replace(pre_ns)
.map(|prev| pre_ns.saturating_sub(prev));
// Jitter is the absolute deviation from the nominal period, not reported
// for faulted cycles.
let jitter = if faulted {
None
} else {
actual_period.map(|ap| ap.abs_diff(period_ns))
};
// Advance the grid slot by the number of elapsed periods, rounded to the
// nearest whole period and at least 1. `checked_div` skips the update when
// `period_ns` is 0.
if let Some(ap) = actual_period {
if let Some(slots) = ap.saturating_add(period_ns / 2).checked_div(period_ns) {
task.grid_slot = task.grid_slot.saturating_add(slots.max(1));
}
}
let grid_slot = task.grid_slot;
let stats = unsafe { &mut (&mut *self.cycle_stats_ptr)[task_idx] };
// Lateness = time elapsed since the grid epoch minus the expected position
// `grid_slot * period`. The epoch is set to `pre_ns` by the first non-faulted
// cycle to reach this point.
let lateness = if period_ns > 0 && !faulted {
let grid_epoch = *unsafe { &*self.grid_epoch_ptr }.get_or_init(|| pre_ns);
let elapsed_ns = i64::try_from(pre_ns.saturating_sub(grid_epoch)).unwrap_or(i64::MAX);
let expected_ns =
i64::try_from(u128::from(grid_slot) * u128::from(period_ns)).unwrap_or(i64::MAX);
Some(elapsed_ns.saturating_sub(expected_ns))
} else {
None
};
let cycle_index = stats.record_cycle(took, jitter, lateness);
let obs = CycleObservation {
cycle_index,
task_id: task.id.clone(),
task_index: u32::try_from(task_idx).unwrap_or(u32::MAX),
faulted,
period_ns,
pre_ns,
actual_period_ns: actual_period,
jitter_ns: jitter,
lateness_ns: lateness,
took_ns: took,
};
self.observer.on_cycle_stats(&obs);
}
/// Decides whether `task` must be diverted because it or the executor is
/// faulted.
///
/// Returns `true` when the normal job must not run; in that case the task's
/// handler job, if any, has been submitted to the pool. Returns `false` for
/// task kinds other than `Single`/`Chain` and for healthy tasks.
#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
fn handle_fault_routing(&self, task: &mut TaskEntry) -> bool {
if !matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
return false;
}
// Only the `Faulted`/`Running` discriminant is inspected here.
// NOTE(review): the two `0` arguments are placeholders as far as this code
// shows — confirm their meaning in `ExecutorFaultAtomic::load`.
let exec_faulted = matches!(
unsafe { &*self.exec_fault_ptr }.load(0, 0),
ExecutorFaultState::Faulted { .. }
);
let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
let task_state = task.fault.load(task_budget_ms);
// A running task under a faulted executor is marked as faulted with reason
// `ExecutorFaulted`, timestamped relative to the executor start instant.
let task_faulted = if exec_faulted && matches!(task_state, FaultState::Running) {
let exec_start = *unsafe { &*self.exec_start_ptr }.get_or_init(std::time::Instant::now);
let since_ms = instant_to_since_ms(std::time::Instant::now(), exec_start);
let _ = task.fault.swap(
FaultState::Faulted {
reason: FaultReason::ExecutorFaulted,
since_ms,
},
task_budget_ms,
);
true
} else {
matches!(task_state, FaultState::Faulted { .. })
};
if !(exec_faulted || task_faulted) {
return false;
}
// Run the fault handler in place of the normal job, when one is registered.
if let Some(handler_box) = task.handler_job.as_deref_mut() {
let job_ptr: *mut (dyn FnMut() + Send) = handler_box as *mut (dyn FnMut() + Send);
unsafe {
self.pool
.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
}
}
true
}
/// Runs the normal work of `task`.
///
/// `Single`/`Chain` tasks submit their pre-built job to the pool as a borrowed
/// job. `Graph` tasks call `run_once_borrowed` directly and store an error in
/// `iter_err` if that slot is still empty.
///
/// # Panics
/// Panics if a `Single`/`Chain` task has no job, or if the `iter_err` mutex is
/// poisoned.
#[deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
#[allow(unsafe_code, clippy::ref_as_ptr, clippy::borrow_as_ptr)]
fn submit_task_job(&self, task: &mut TaskEntry) {
match &mut task.kind {
TaskKind::Single(_) | TaskKind::Chain(_) => {
#[allow(clippy::expect_used)]
let job_box = task
.job
.as_deref_mut()
.expect("Single/Chain tasks carry a pre-built job");
// The pool receives a raw pointer to the job closure owned by the task entry.
// NOTE(review): soundness presumably relies on the pool barrier completing
// before the entry is touched again — confirm in `crate::pool`.
let job_ptr: *mut (dyn FnMut() + Send) = job_box as *mut (dyn FnMut() + Send);
unsafe {
self.pool
.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
}
}
TaskKind::Graph(graph) => {
let outcome = graph.run_once_borrowed(self.pool);
if let Some(source) = outcome.error {
#[allow(clippy::unwrap_used)]
let mut g = self.iter_err.lock().unwrap();
// Keep the first error of the iteration; later ones are dropped.
if g.is_none() {
*g = Some(ExecutorError::Item {
task_id: task.id.clone(),
source,
});
}
}
// `stopped_chain` is intentionally unused here.
let _ = outcome.stopped_chain; }
}
}
}
/// Wrapper around a raw `*mut dyn ExecutableItem` that is declared `Send` and `Sync` below,
/// so the pointer can be moved into a `Send` job closure.
///
/// The wrapper does nothing to keep the pointee alive or to prevent aliasing.
#[allow(unsafe_code)]
struct SendItemPtr {
ptr: *mut dyn ExecutableItem,
}
impl SendItemPtr {
/// Wraps `ptr` without checking it.
fn new(ptr: *mut dyn ExecutableItem) -> Self {
Self { ptr }
}
/// Returns the wrapped raw pointer.
fn get(&self) -> *mut dyn ExecutableItem {
self.ptr
}
}
// SAFETY: NOTE(review): nothing in this block establishes soundness; it rests on how the
// pointer is created and dereferenced elsewhere. Confirm at the construction sites.
#[allow(unsafe_code)]
unsafe impl Send for SendItemPtr {}
// SAFETY: NOTE(review): same caveat as the `Send` impl above.
#[allow(unsafe_code)]
unsafe impl Sync for SendItemPtr {}
/// Wrapper around a raw `*mut Vec<Box<dyn ExecutableItem>>` that is declared `Send` and
/// `Sync` below, so the pointer can be moved into a `Send` job closure.
///
/// The wrapper does nothing to keep the vector alive or to prevent aliasing.
#[allow(unsafe_code)]
struct SendChainPtr {
ptr: *mut Vec<Box<dyn ExecutableItem>>,
}
impl SendChainPtr {
/// Wraps `ptr` without checking it.
fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
Self { ptr }
}
/// Returns the wrapped raw pointer.
fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
self.ptr
}
}
// SAFETY: NOTE(review): nothing in this block establishes soundness; it rests on how the
// pointer is created and dereferenced elsewhere. Confirm at the construction sites.
#[allow(unsafe_code)]
unsafe impl Send for SendChainPtr {}
// SAFETY: NOTE(review): same caveat as the `Send` impl above.
#[allow(unsafe_code)]
unsafe impl Sync for SendChainPtr {}
/// Everything `post_execute_detect_fault` needs to detect and publish budget overruns for
/// one task after an execution.
struct FaultDispatchCtx {
/// Per-task budget; an execution that takes longer marks the task as faulted.
task_budget: Option<Duration>,
/// Task-level fault state, swapped to `Faulted` on a task-budget overrun.
task_fault: Arc<FaultAtomic>,
/// Incremented once for every task-budget overrun.
overrun_count: Arc<AtomicU64>,
/// Executor-level budget; an execution that takes longer marks the executor as faulted.
iteration_budget: Option<Duration>,
/// Executor-level fault state, swapped to `Faulted` on an iteration-budget overrun.
exec_fault: Arc<ExecutorFaultAtomic>,
/// Receives `task_idx_u32` just before the executor fault state is swapped.
exec_fault_task_idx: Arc<AtomicU32>,
/// Receives the exceeded iteration budget (in ms) just before the executor fault state is
/// swapped.
exec_fault_budget_ms: Arc<AtomicU32>,
/// Index of this task as reported in executor-level fault reasons.
task_idx_u32: u32,
/// Reference instant for `since_ms`; initialised to the execution start time if unset.
exec_start: Arc<OnceLock<Instant>>,
/// Notified when a task or the executor transitions from `Running` to `Faulted`.
observer: Arc<dyn Observer>,
}
/// Checks the trigger declarations of task `id` for combinations the executor rejects.
///
/// # Errors
///
/// Returns `ExecutorError::DeclareTriggers` when
/// - the task declares an interval trigger together with a subscriber, deadline or raw
///   listener trigger, or
/// - any declared interval has a zero duration.
///
/// The mixed-trigger check runs first, so it wins when both problems are present.
fn validate_decls(id: &TaskId, decls: &[crate::trigger::TriggerDecl]) -> Result<(), ExecutorError> {
    use crate::trigger::TriggerDecl;

    let is_cyclic = |decl: &TriggerDecl| matches!(decl, TriggerDecl::Interval(_));
    let is_event_driven = |decl: &TriggerDecl| {
        matches!(
            decl,
            TriggerDecl::Subscriber { .. }
                | TriggerDecl::Deadline { .. }
                | TriggerDecl::RawListener(_)
        )
    };
    let is_zero_period =
        |decl: &TriggerDecl| matches!(decl, TriggerDecl::Interval(period) if period.is_zero());

    if decls.iter().any(is_cyclic) && decls.iter().any(is_event_driven) {
        let message = format!(
            "task `{id}` declares both an interval (cyclic) and a listener \
             (event-driven) trigger; a task may be cyclic (interval) or \
             event-driven (listener) but not both — split it into two tasks"
        );
        return Err(ExecutorError::DeclareTriggers(message));
    }

    if decls.iter().any(is_zero_period) {
        let message = format!(
            "task `{id}` declares a zero-duration interval; a cyclic scan \
             period must be strictly positive"
        );
        return Err(ExecutorError::DeclareTriggers(message));
    }

    Ok(())
}
/// Returns the duration of the first `Interval` trigger in `decls`, or `None` when the
/// slice contains no interval trigger.
fn scan_period_from_decls(decls: &[crate::trigger::TriggerDecl]) -> Option<Duration> {
    for decl in decls {
        if let crate::trigger::TriggerDecl::Interval(period) = decl {
            return Some(*period);
        }
    }
    None
}
/// Builds the reusable pool job that executes a single item.
///
/// Each invocation of the returned closure:
/// 1. creates a fresh `Context` and, when `app_id` is set, reports `on_app_start`;
/// 2. runs the item behind `item_ptr` under `run_item_catch_unwind`, bracketed by the
///    monitor's `pre_execute`/`post_execute`;
/// 3. stores the elapsed time measured on `clock` into `last_took_ns`;
/// 4. reports `on_app_error` on failure and `on_app_stop` when `app_id` is set;
/// 5. runs budget/fault detection and records the first error into `err_slot`.
#[allow(clippy::too_many_arguments)]
fn build_single_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
app_id: Option<u32>,
app_inst: Option<u32>,
item_ptr: SendItemPtr,
fault_ctx: FaultDispatchCtx,
last_took_ns: Arc<AtomicU64>,
clock: Arc<dyn MonotonicClock>,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let raw = item_ptr.get();
// Two time sources are sampled: `Instant` feeds the monitor and budget checks, while
// `clock` feeds the `last_took_ns` telemetry value.
let started = std::time::Instant::now();
let tele_t0 = clock.now_nanos();
mon.pre_execute(id.clone(), started);
// NOTE(review): this deref assumes the item behind `item_ptr` is alive and not aliased
// while the job runs; that invariant is upheld outside this function. Confirm at the
// call site.
#[allow(unsafe_code)]
let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
let took = started.elapsed();
// Saturating subtraction guards against `clock` returning a value below `tele_t0`.
last_took_ns.store(clock.now_nanos().saturating_sub(tele_t0), Ordering::Release);
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
post_execute_detect_fault(&id, started, took, &fault_ctx);
// Consumes `res`; only an `Err` is stored, and only if the slot is still empty.
record_first_err(&err_slot, &id, res);
})
}
#[allow(clippy::too_many_arguments)]
fn build_handler_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
app_id: Option<u32>,
app_inst: Option<u32>,
mut handler: Box<dyn ExecutableItem>,
fault_ctx: FaultDispatchCtx,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let started = std::time::Instant::now();
mon.pre_execute(id.clone(), started);
let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
let took = started.elapsed();
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
post_execute_detect_fault(&id, started, took, &fault_ctx);
record_first_err(&err_slot, &id, res);
})
}
/// Builds the reusable pool job that executes every item of a chain task in order.
///
/// Each invocation of the returned closure creates one `Context` shared by all items, then
/// for each item: reports `on_app_start` when the item has an app id, runs the item under
/// `run_item_catch_unwind` bracketed by the monitor's `pre_execute`/`post_execute`, reports
/// `on_app_error` on failure and `on_app_stop` when the item has an app id, and runs
/// budget/fault detection. The chain stops early when an item returns `StopChain` or fails;
/// a failure is recorded into `err_slot` if it is the first one. Finally the total elapsed
/// time measured on `clock` is stored into `last_took_ns`.
#[allow(clippy::too_many_arguments)]
fn build_chain_job(
    id: TaskId,
    stop: Stoppable,
    obs: Arc<dyn Observer>,
    mon: Arc<dyn ExecutionMonitor>,
    err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
    chain_ptr: SendChainPtr,
    fault_ctx: FaultDispatchCtx,
    last_took_ns: Arc<AtomicU64>,
    clock: Arc<dyn MonotonicClock>,
) -> Box<dyn FnMut() + Send + 'static> {
    Box::new(move || {
        let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
        let chain_tele_t0 = clock.now_nanos();
        // NOTE(review): this deref assumes the vector behind `chain_ptr` is alive and not
        // aliased while the job runs; that invariant is upheld outside this function.
        // Confirm at the call site.
        #[allow(unsafe_code)]
        let chain_items = unsafe { &mut *chain_ptr.get() };
        for item_box in chain_items.iter_mut() {
            // Read the ids up front so the item can be mutably borrowed for the run.
            let app_id = item_box.app_id();
            let app_inst = item_box.app_instance_id();
            if let Some(aid) = app_id {
                obs.on_app_start(id.clone(), aid, app_inst);
            }
            let started = std::time::Instant::now();
            mon.pre_execute(id.clone(), started);
            // `item_box` is already an exclusive borrow, so the item can be passed on
            // directly; no raw-pointer round trip (and no `unsafe`) is needed here.
            let res = run_item_catch_unwind(item_box.as_mut(), &mut ctx);
            let took = started.elapsed();
            mon.post_execute(id.clone(), started, took, res.is_ok());
            if let Err(ref e) = res {
                obs.on_app_error(id.clone(), e.as_ref());
            }
            if app_id.is_some() {
                obs.on_app_stop(id.clone());
            }
            // Budget detection runs per item, using that item's own duration.
            post_execute_detect_fault(&id, started, took, &fault_ctx);
            match res {
                Ok(crate::ControlFlow::Continue) => {}
                Ok(crate::ControlFlow::StopChain) => break,
                Err(_) => {
                    record_first_err(&err_slot, &id, res);
                    break;
                }
            }
        }
        // Saturating subtraction guards against `clock` returning a value below the start.
        last_took_ns.store(
            clock.now_nanos().saturating_sub(chain_tele_t0),
            Ordering::Release,
        );
    })
}
/// Error standing in for a task whose `execute` call panicked; carries the panic message.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    /// Writes `task panicked: ` followed by the stored message.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_str("task panicked: ")?;
        f.write_str(message)
    }
}

impl std::error::Error for PanickedTask {}
/// Executes `item` with `ctx`, converting a panic inside `execute` into an `Err`.
///
/// A normal return is passed through unchanged. When `execute` panics, the result is an
/// error wrapping `PanickedTask` with the panic payload's message, or `"panicked task"`
/// when no message can be extracted from the payload.
fn run_item_catch_unwind(
    item: &mut dyn ExecutableItem,
    ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
    let guarded = std::panic::AssertUnwindSafe(|| item.execute(ctx));
    match std::panic::catch_unwind(guarded) {
        Ok(result) => result,
        Err(payload) => {
            let message = match panic_payload_message(&*payload) {
                Some(text) => text,
                None => "panicked task".to_string(),
            };
            Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(message)))
        }
    }
}
/// Crate-visible entry point to `run_item_catch_unwind`.
///
/// Forwards `item` and `ctx` unchanged and returns the wrapped function's result.
pub(crate) fn run_item_catch_unwind_external(
item: &mut dyn ExecutableItem,
ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
run_item_catch_unwind(item, ctx)
}
/// Stores the error of `res` in `slot` as `ExecutorError::Item` for task `id`, unless the
/// slot already holds an error. An `Ok` result leaves the slot untouched.
///
/// # Panics
///
/// Panics if the `slot` mutex is poisoned.
fn record_first_err(
    slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
    id: &TaskId,
    res: crate::ExecuteResult,
) {
    let Err(source) = res else {
        return;
    };
    let mut first = slot.lock().unwrap();
    // First error wins; later ones are dropped.
    if first.is_some() {
        return;
    }
    *first = Some(ExecutorError::Item {
        task_id: id.clone(),
        source,
    });
}
/// Compares the duration of a finished execution against the task and iteration budgets
/// and publishes any overrun.
///
/// - Task budget exceeded: bumps `overrun_count`, swaps the task fault state to
///   `Faulted { BudgetExceeded }` and notifies the observer if the task was `Running` before.
/// - Iteration budget exceeded: stores the task index and budget, swaps the executor fault
///   state to `Faulted { IterationBudgetExceeded }` and notifies the observer if the executor
///   was `Running` before.
///
/// Both checks are independent; a single execution can trip both.
fn post_execute_detect_fault(
    id: &TaskId,
    started: Instant,
    took: Duration,
    fault_ctx: &FaultDispatchCtx,
) {
    // Offset of `started` from the executor's reference instant, which is initialised to
    // `started` itself when nothing has set it yet.
    let since_exec_start_ms = || {
        let origin = *fault_ctx.exec_start.get_or_init(|| started);
        instant_to_since_ms(started, origin)
    };

    if let Some(limit) = fault_ctx.task_budget.filter(|limit| took > *limit) {
        fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
        let took_ms = duration_to_ms_sat(took);
        let budget_ms = duration_to_ms_sat(limit);
        let faulted = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
            since_ms: since_exec_start_ms(),
        };
        let previous = fault_ctx.task_fault.swap(faulted, budget_ms);
        // Notify only on the Running -> Faulted edge.
        if matches!(previous, FaultState::Running) {
            fault_ctx.observer.on_task_fault(
                id.clone(),
                FaultReason::BudgetExceeded { took_ms, budget_ms },
            );
        }
    }

    if let Some(limit) = fault_ctx.iteration_budget.filter(|limit| took > *limit) {
        let task_idx = fault_ctx.task_idx_u32;
        let took_ms = duration_to_ms_sat(took);
        let budget_ms = duration_to_ms_sat(limit);
        let since_ms = since_exec_start_ms();
        // Side-channel values are stored before the state swap.
        fault_ctx
            .exec_fault_task_idx
            .store(task_idx, Ordering::Release);
        fault_ctx
            .exec_fault_budget_ms
            .store(budget_ms, Ordering::Release);
        let faulted = ExecutorFaultState::Faulted {
            reason: ExecutorFaultReason::IterationBudgetExceeded {
                task_idx,
                took_ms,
                budget_ms,
            },
            since_ms,
        };
        let previous = fault_ctx.exec_fault.swap(faulted, task_idx, budget_ms);
        // Notify only on the Running -> Faulted edge.
        if matches!(previous, ExecutorFaultState::Running) {
            fault_ctx
                .observer
                .on_executor_fault(ExecutorFaultReason::IterationBudgetExceeded {
                    task_idx,
                    took_ms,
                    budget_ms,
                });
        }
    }
}
/// Builder that assembles a graph task and registers it with an [`Executor`] on `build`.
///
/// Holds the executor mutably for its whole lifetime, so no other task can be added while a
/// graph is under construction.
pub struct ExecutorGraphBuilder<'e> {
/// Executor that receives the finished graph task.
executor: &'e mut Executor,
/// Underlying graph builder that collects vertices, edges and the root.
builder: crate::graph::GraphBuilder,
/// Task id set through `id`, if any.
custom_id: Option<TaskId>,
}
impl ExecutorGraphBuilder<'_> {
/// Adds `item` as a vertex and returns its handle for use with `edge` and `root`.
pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
self.builder.vertex(item)
}
/// Adds an edge from `from` to `to`; returns `self` for chaining.
pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
self.builder.edge(from, to);
self
}
/// Marks `v` as the root vertex; returns `self` for chaining.
pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
self.builder.root(v);
self
}
/// Sets a custom task id for the graph; returns `self` for chaining.
///
/// The id is used only when the graph's root does not supply a task id of its own
/// (see `build`).
pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
self.custom_id = Some(id.into());
self
}
/// Finishes the graph, validates its trigger declarations and registers it as a task.
///
/// The task id is chosen in this order: the id reported by the graph's root, then the id
/// set through `id`, then an auto-generated `graph-N` taken from the executor's id counter.
///
/// # Errors
///
/// Returns the error from finishing the underlying graph builder, or
/// `ExecutorError::DeclareTriggers` when the graph's trigger declarations are rejected by
/// `validate_decls`. Nothing is registered with the executor in either case.
pub fn build(self) -> Result<TaskId, ExecutorError> {
let g = self.builder.finish()?;
// Lazily evaluated, so the counter only advances when no other id is available.
let auto_id = || {
TaskId::new(format!(
"graph-{}",
self.executor.next_id.fetch_add(1, Ordering::SeqCst)
))
};
let id = g
.root_task_id()
.map(TaskId::new)
.or(self.custom_id)
.unwrap_or_else(auto_id);
let decls = g.decls.clone();
validate_decls(&id, &decls)?;
let scan_period = scan_period_from_decls(&decls);
let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
graph_box.prepare_dispatch(
id.clone(),
self.executor.stoppable.clone(),
Arc::clone(&self.executor.observer),
Arc::clone(&self.executor.monitor),
Arc::clone(&self.executor.iter_err),
);
// Graph tasks carry no pre-built job, budget or fault handler.
self.executor.tasks.push(TaskEntry {
id: id.clone(),
kind: TaskKind::Graph(graph_box),
decls,
job: None,
budget: None,
fault: Arc::new(FaultAtomic::new()),
overrun_count: Arc::new(AtomicU64::new(0)),
handler_job: None,
scan_period,
last_took_ns: Arc::new(AtomicU64::new(u64::MAX)),
last_dispatch: None,
grid_slot: 0,
pending_cycle: None,
});
// One stats entry is pushed per task, keeping `cycle_stats` aligned with `tasks`.
self.executor
.cycle_stats
.push(TaskCycleStats::new(self.executor.stats_window));
Ok(id)
}
}
// Unit tests for task registration, trigger validation, dispatch modes, fault-state accessors
// and fatal-handler wiring of the executor.
#[cfg(test)]
mod tests {
use super::*;
use crate::{ControlFlow, item};
use iceoryx2::prelude::ZeroCopySend;
// Minimal payload type used to open test channels.
#[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
#[repr(C)]
struct Msg(u32);
// Two tasks added without explicit ids must receive different ids.
#[test]
fn add_returns_unique_ids() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let a = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
let b = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
assert_ne!(a, b);
}
// In Grid mode a 1 ms interval task must run at least 8 times over `run_n(10)`.
#[test]
fn grid_mode_dispatches_cyclic_task_each_cycle() {
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
let hits = Arc::new(AtomicU64::new(0));
let h = Arc::clone(&hits);
let mut exec = Executor::builder()
.worker_threads(0)
.dispatch_mode(crate::DispatchMode::Grid)
.build()
.expect("build");
exec.add(crate::item::item_with_triggers(
move |d| {
d.interval(std::time::Duration::from_millis(1));
Ok(())
},
move |_ctx| {
h.fetch_add(1, Ordering::Relaxed);
Ok(ControlFlow::Continue)
},
))
.expect("add");
exec.run_n(10).expect("run");
assert!(
hits.load(Ordering::Relaxed) >= 8,
"grid mode under-dispatched: {}",
hits.load(Ordering::Relaxed)
);
}
// Same expectation as the Grid test above, with the Legacy dispatch mode.
#[test]
fn legacy_mode_dispatches_cyclic_task_each_cycle() {
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
let hits = Arc::new(AtomicU64::new(0));
let h = Arc::clone(&hits);
let mut exec = Executor::builder()
.worker_threads(0)
.dispatch_mode(crate::DispatchMode::Legacy)
.build()
.expect("build");
exec.add(crate::item::item_with_triggers(
move |d| {
d.interval(std::time::Duration::from_millis(1));
Ok(())
},
move |_ctx| {
h.fetch_add(1, Ordering::Relaxed);
Ok(ControlFlow::Continue)
},
))
.expect("add");
exec.run_n(10).expect("run");
assert!(
hits.load(Ordering::Relaxed) >= 8,
"legacy mode under-dispatched: {}",
hits.load(Ordering::Relaxed)
);
}
// Interval + subscriber on one task is rejected, and the message explains why and suggests
// splitting the task.
#[test]
fn add_rejects_cyclic_plus_subscriber_combination() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let ch = exec.channel::<Msg>("taktora.test.req0268.combo").unwrap();
let sub = ch.subscriber().unwrap();
let err = exec
.add(crate::item::item_with_triggers(
move |d| {
d.interval(Duration::from_millis(1));
d.subscriber(&sub);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.expect_err("interval + subscriber must be rejected");
match err {
ExecutorError::DeclareTriggers(msg) => {
assert!(
msg.contains("cyclic") && msg.contains("event-driven"),
"message must explain cyclic vs event-driven: {msg}"
);
assert!(
msg.contains("split"),
"message must suggest splitting into two tasks: {msg}"
);
}
other => panic!("expected DeclareTriggers, got {other:?}"),
}
}
// The interval + subscriber rejection also applies in Legacy dispatch mode.
#[test]
fn add_rejects_cyclic_plus_listener_regardless_of_mode() {
use core::time::Duration;
let mut exec = Executor::builder()
.worker_threads(0)
.dispatch_mode(crate::DispatchMode::Legacy)
.build()
.unwrap();
let ch = exec
.channel::<Msg>("taktora.test.req0268.combo.legacy")
.unwrap();
let sub = ch.subscriber().unwrap();
let err = exec
.add(crate::item::item_with_triggers(
move |d| {
d.interval(Duration::from_millis(1));
d.subscriber(&sub);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.expect_err("interval + subscriber must be rejected in Legacy too");
assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
}
// Several triggers of the same kind (two intervals, one interval, two subscribers) are
// accepted.
#[test]
fn add_accepts_multiple_intervals_and_single_kinds() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
exec.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
d.interval(Duration::from_millis(2));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.expect("multiple intervals accepted");
exec.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.expect("single interval accepted");
let ch = exec
.channel::<Msg>("taktora.test.req0268.multi.listener")
.unwrap();
let sub_a = ch.subscriber().unwrap();
let sub_b = ch.subscriber().unwrap();
exec.add(crate::item::item_with_triggers(
move |d| {
d.subscriber(&sub_a);
d.subscriber(&sub_b);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.expect("multiple listeners accepted");
}
// A zero-duration interval is rejected with a message mentioning "zero".
#[test]
fn add_rejects_zero_period_interval() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let err = exec
.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::ZERO);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.expect_err("zero-period interval must be rejected");
match err {
ExecutorError::DeclareTriggers(msg) => {
assert!(
msg.contains("zero"),
"message must mention the zero period: {msg}"
);
}
other => panic!("expected DeclareTriggers, got {other:?}"),
}
}
// `add_chain` applies the interval + subscriber rejection to the chain's first item.
#[test]
fn add_chain_rejects_cyclic_plus_listener() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let ch = exec
.channel::<Msg>("taktora.test.req0268.chain.combo")
.unwrap();
let sub = ch.subscriber().unwrap();
let err = exec
.add_chain(vec![crate::item::item_with_triggers(
move |d| {
d.interval(Duration::from_millis(1));
d.subscriber(&sub);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
)])
.expect_err("chain head interval + subscriber must be rejected");
assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
}
// `add_chain` applies the zero-period rejection to the chain's first item.
#[test]
fn add_chain_rejects_zero_period_interval() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let err = exec
.add_chain(vec![crate::item::item_with_triggers(
|d| {
d.interval(Duration::ZERO);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
)])
.expect_err("chain head zero-period interval must be rejected");
assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
}
// Building a graph whose root declares interval + subscriber is rejected.
#[test]
fn add_graph_rejects_cyclic_plus_listener() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let ch = exec
.channel::<Msg>("taktora.test.req0268.graph.combo")
.unwrap();
let sub = ch.subscriber().unwrap();
let mut g = exec.add_graph();
let r = g.vertex(crate::item::item_with_triggers(
move |d| {
d.interval(Duration::from_millis(1));
d.subscriber(&sub);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
));
g.root(r);
let err = g
.build()
.expect_err("graph root interval + subscriber must be rejected");
assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
}
// Building a graph whose root declares a zero-duration interval is rejected.
#[test]
fn add_graph_rejects_zero_period_interval() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let mut g = exec.add_graph();
let r = g.vertex(crate::item::item_with_triggers(
|d| {
d.interval(Duration::ZERO);
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
));
g.root(r);
let err = g
.build()
.expect_err("graph root zero-period interval must be rejected");
assert!(matches!(err, ExecutorError::DeclareTriggers(_)));
}
// When the executor is stopped before `run`, no cycle observation may reach the observer,
// even though the injected clock jumps forward by one second after its first reading.
#[test]
fn stopped_iteration_emits_no_cyclic_cycle_observation() {
use core::time::Duration;
use std::sync::atomic::AtomicU64;
// Clock that reports 0 on the first call and 1_000_000_000 ns on every later call.
struct JumpClock {
calls: AtomicU64,
}
impl crate::CyclicClock for JumpClock {
fn now_nanos(&self) -> u64 {
if self.calls.fetch_add(1, Ordering::SeqCst) == 0 {
0
} else {
1_000_000_000
}
}
}
// Observer that counts `on_cycle_stats` callbacks.
struct Counter {
cycles: AtomicU64,
}
impl Observer for Counter {
fn on_cycle_stats(&self, _obs: &CycleObservation) {
self.cycles.fetch_add(1, Ordering::SeqCst);
}
}
let counter = Arc::new(Counter {
cycles: AtomicU64::new(0),
});
let mut exec = Executor::builder()
.worker_threads(0)
.dispatch_mode(crate::DispatchMode::Grid)
.cyclic_clock(Arc::new(JumpClock {
calls: AtomicU64::new(0),
}))
.observer(Arc::clone(&counter) as Arc<dyn Observer>)
.build()
.unwrap();
exec.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
exec.stoppable().stop();
exec.run().expect("run returns cleanly after stop");
assert_eq!(
counter.cycles.load(Ordering::SeqCst),
0,
"no cyclic cycle observation may be emitted on a stop wake"
);
}
// An id passed to `add_with_id` is returned unchanged.
#[test]
fn custom_id_is_preserved() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let id = exec
.add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
.unwrap();
assert_eq!(id.as_str(), "my-task");
}
// A budget declared by the item is stored on its task entry.
#[test]
fn add_persists_declared_budget() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let task_id = exec
.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(10));
d.budget(Duration::from_millis(5));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
let entry = exec
.tasks
.iter()
.find(|t| t.id == task_id)
.expect("task present");
assert_eq!(entry.budget, Some(Duration::from_millis(5)));
}
// `scan_period` is set for an interval task and `None` for a task without triggers;
// `last_took_ns` starts at `u64::MAX`.
#[test]
fn scan_period_cached_for_cyclic_only() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let cyclic = exec
.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(5));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
let event_driven = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
let cyclic_entry = exec
.tasks
.iter()
.find(|t| t.id == cyclic)
.expect("cyclic task present");
assert_eq!(cyclic_entry.scan_period, Some(Duration::from_millis(5)));
assert_eq!(cyclic_entry.last_took_ns.load(Ordering::Relaxed), u64::MAX);
let event_entry = exec
.tasks
.iter()
.find(|t| t.id == event_driven)
.expect("event-driven task present");
assert_eq!(event_entry.scan_period, None);
}
// `cycle_stats` stays the same length as `tasks`, and the configured stats window is kept.
#[test]
fn cycle_stats_index_aligned_with_tasks() {
use core::time::Duration;
let mut exec = Executor::builder()
.worker_threads(0)
.stats_window(512)
.build()
.unwrap();
assert_eq!(exec.stats_window, 512);
assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
exec.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(5));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
assert_eq!(exec.tasks.len(), 2);
assert_eq!(exec.cycle_stats.len(), exec.tasks.len());
}
// `add_with_fault_handler` stores both the main job and the handler job on the entry.
#[test]
fn add_with_fault_handler_stores_handler_job() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let task_id = exec
.add_with_fault_handler(
crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(10));
d.budget(Duration::from_millis(5));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
),
crate::item::item_with_triggers(|_d| Ok(()), |_| Ok(crate::ControlFlow::Continue)),
)
.unwrap();
let entry = exec
.tasks
.iter()
.find(|t| t.id == task_id)
.expect("task present");
assert!(
entry.handler_job.is_some(),
"handler_job should be Some after add_with_fault_handler"
);
assert!(entry.job.is_some(), "main job should still be present");
}
// The item's trigger-declaration closure runs during `add`.
#[test]
fn declare_triggers_called_at_add_time() {
let called = Arc::new(AtomicBool::new(false));
let called_d = Arc::clone(&called);
let it = crate::item::item_with_triggers(
move |_d| {
called_d.store(true, Ordering::SeqCst);
Ok(())
},
|_| Ok(ControlFlow::Continue),
);
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
exec.add(it).unwrap();
assert!(called.load(Ordering::SeqCst));
}
// Clearing the fault of a task that is not faulted yields `TaskNotFaulted`.
#[test]
fn clear_task_fault_errors_on_running_task() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let task_id = exec
.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(10));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
let err = exec.clear_task_fault(task_id).expect_err("not faulted");
assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
}
// Clearing the executor fault when it is not faulted yields `ExecutorNotFaulted`.
#[test]
fn clear_executor_fault_errors_on_running_executor() {
let exec = Executor::builder().worker_threads(0).build().unwrap();
let err = exec.clear_executor_fault().expect_err("not faulted");
assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
}
// A freshly added task reports an overrun count of 0.
#[test]
fn overrun_count_returns_zero_for_new_task() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let task_id = exec
.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(10));
d.budget(Duration::from_millis(5));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
}
// Asking for the overrun count of an unknown task id yields `TaskNotFound`.
#[test]
fn overrun_count_errors_for_unknown_task() {
let exec = Executor::builder().worker_threads(0).build().unwrap();
let err = exec
.overrun_count(crate::TaskId::new("nope"))
.expect_err("unknown task");
assert!(matches!(err, ExecutorError::TaskNotFound(_)));
}
// A freshly added task starts in `FaultState::Running`.
#[test]
fn task_fault_state_starts_running() {
use core::time::Duration;
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let task_id = exec
.add(crate::item::item_with_triggers(
|d| {
d.interval(Duration::from_millis(10));
Ok(())
},
|_| Ok(crate::ControlFlow::Continue),
))
.unwrap();
assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
}
// A freshly built executor starts in `ExecutorFaultState::Running`.
#[test]
fn executor_fault_state_starts_running() {
let exec = Executor::builder().worker_threads(0).build().unwrap();
assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
}
// An executor built without `on_fatal` still has a usable fatal handler: firing a dispatch
// built from it, with a test terminal, reaches that terminal.
#[test]
fn build_without_on_fatal_succeeds() {
use crate::fatal::{FatalContext, FatalSite};
use std::sync::{Arc, Mutex};
let exec = Executor::builder().worker_threads(0).build().unwrap();
let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let reached2 = Arc::clone(&reached);
let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
exec.fatal_dispatch.handler().clone(),
move |_| {
*reached2.lock().unwrap() = true;
},
);
test_dispatch.fire(&FatalContext {
cause: "test".to_string(),
site: FatalSite::PoolWorker,
});
assert!(*reached.lock().unwrap(), "terminal not reached");
}
// A handler registered through `on_fatal` is stored on the executor and is called with the
// fatal cause before the terminal runs.
#[test]
fn on_fatal_handler_is_stored_and_invoked() {
use crate::fatal::{FatalContext, FatalSite};
use std::sync::{Arc, Mutex};
let called: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let called2 = Arc::clone(&called);
let exec = Executor::builder()
.worker_threads(0)
.on_fatal(move |ctx| {
called2.lock().unwrap().push(ctx.cause.clone());
})
.build()
.unwrap();
let reached: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let reached2 = Arc::clone(&reached);
let test_dispatch = crate::fatal::FatalDispatch::with_terminal(
exec.fatal_dispatch.handler().clone(),
move |_| {
*reached2.lock().unwrap() = true;
},
);
test_dispatch.fire(&FatalContext {
cause: "my-cause".to_string(),
site: FatalSite::ExecutorRunLoop,
});
assert!(*reached.lock().unwrap(), "terminal not reached");
let log = called.lock().unwrap().clone();
assert_eq!(
log,
vec!["my-cause"],
"handler should have been called with cause"
);
}
}