#![allow(dead_code)]
#![allow(clippy::redundant_pub_crate)]
use crate::Channel;
use crate::context::Stoppable;
use crate::error::ExecutorError;
use crate::fault::{
ExecutorFaultAtomic, ExecutorFaultReason, ExecutorFaultState, FaultAtomic, FaultReason,
FaultState, duration_to_ms_sat, instant_to_since_ms,
};
use crate::item::ExecutableItem;
use crate::monitor::{ExecutionMonitor, NoopMonitor};
use crate::observer::{NoopObserver, Observer};
use crate::payload::Payload;
use crate::pool::Pool;
use crate::task_id::TaskId;
use crate::task_kind::TaskKind;
use crate::thread_attrs::ThreadAttributes;
use crate::trigger::{TriggerDecl, TriggerDeclarer};
use core::sync::atomic::AtomicU32;
use iceoryx2::node::Node;
use iceoryx2::port::listener::Listener as IxListener;
use iceoryx2::prelude::ipc;
use iceoryx2::prelude::*;
use iceoryx2::waitset::WaitSetRunResult;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
/// Process-wide sequence number. `ExecutorBuilder::build` takes one value per
/// executor and embeds it (together with the process id) in the name of that
/// executor's stop-event service.
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
/// One registered task: the item(s) it owns, its triggers, and its fault bookkeeping.
pub(crate) struct TaskEntry {
/// Identifier returned to the caller when the task was added.
pub(crate) id: TaskId,
/// Owns the executable item(s): a single item, a chain, or a graph.
pub(crate) kind: TaskKind,
/// Trigger declarations; `dispatch_loop` attaches each one to the waitset.
pub(crate) decls: Vec<TriggerDecl>,
/// Pre-built closure submitted to the pool for single and chain tasks; `None` for graphs.
pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
/// Per-task execution budget taken from the trigger declarer; chains and graphs store `None`.
pub(crate) budget: Option<Duration>,
/// Per-task fault state, shared with the job closures.
pub(crate) fault: Arc<FaultAtomic>,
/// Incremented each time an execution exceeds `budget`.
pub(crate) overrun_count: Arc<AtomicU64>,
/// Optional closure dispatched instead of `job` while the task or the executor is faulted.
pub(crate) handler_job: Option<Box<dyn FnMut() + Send + 'static>>,
}
/// Trigger-driven task executor: owns the registered tasks, the worker pool and
/// the iceoryx2 node used for channels, services and the stop event.
pub struct Executor {
/// iceoryx2 node used to open channels, services and the stop event.
pub(crate) node: Node<ipc::Service>,
/// Worker pool that runs the task jobs.
pub(crate) pool: Arc<Pool>,
/// Registered tasks, in insertion order; indices into this vector identify tasks internally.
pub(crate) tasks: Vec<TaskEntry>,
/// Set while one of the `run*` methods is active; guards against re-entrant runs.
pub(crate) running: Arc<AtomicBool>,
/// Stop handle; cloned into every job and handed out by `stoppable()`.
pub(crate) stoppable: Stoppable,
/// Source of sequence numbers for auto-generated task ids.
pub(crate) next_id: AtomicU64,
/// Listener on the stop event; attached to the waitset in `dispatch_loop`.
pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
/// Receives lifecycle, error and fault notifications.
pub(crate) observer: Arc<dyn Observer>,
/// Called immediately before and after every item execution.
pub(crate) monitor: Arc<dyn ExecutionMonitor>,
/// First item error recorded during the current iteration, if any.
pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
/// Optional executor-wide budget; an execution exceeding it faults the executor.
pub(crate) iteration_budget: Option<Duration>,
/// Executor-level fault state.
pub(crate) exec_fault: Arc<ExecutorFaultAtomic>,
/// Index of the task recorded by the most recent executor fault.
pub(crate) exec_fault_task_idx: Arc<AtomicU32>,
/// Budget (in milliseconds) recorded by the most recent executor fault.
pub(crate) exec_fault_budget_ms: Arc<AtomicU32>,
/// Reference instant for `since_ms` fault timestamps; initialised lazily on first use.
pub(crate) start_time: Arc<OnceLock<Instant>>,
}
// NOTE(review): this manually asserts that `Executor` may be moved to another
// thread. The silenced `non_send_fields_in_send_ty` lint suggests at least one
// field is not `Send` by itself; no safety argument is visible in this file.
// TODO: document the invariant that makes this impl sound.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
impl Executor {
#[must_use]
pub fn builder() -> ExecutorBuilder {
ExecutorBuilder::default()
}
/// Opens the channel called `name` on this executor's node, creating it if needed.
///
/// # Errors
/// Propagates whatever `Channel::open_or_create` reports.
pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
    let node = &self.node;
    Channel::open_or_create(node, name)
}
/// Opens the request/response service called `name` on this executor's node,
/// creating it if needed.
///
/// # Errors
/// Propagates whatever `Service::open_or_create` reports.
pub fn service<Req, Resp>(
    &mut self,
    name: &str,
) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
where
    Req: Payload,
    Resp: Payload,
{
    let node = &self.node;
    crate::Service::open_or_create(node, name)
}
/// Registers `item` under an auto-generated id of the form `task-<n>`.
///
/// The id is only a fallback: `add_with_id` prefers the item's own `task_id()`.
///
/// # Errors
/// Propagates errors from `add_with_id`.
pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
    let seq = self.next_id.fetch_add(1, Ordering::SeqCst);
    let generated = TaskId::new(format!("task-{seq}"));
    self.add_with_id(generated, item)
}
/// Registers a single item as a task and returns the id it was stored under.
///
/// The item's own `task_id()`, when present, takes precedence over `id`.
/// The item's triggers and budget are collected once, here, via
/// `declare_triggers`.
///
/// # Errors
/// Returns the error produced by the item's `declare_triggers`.
pub fn add_with_id(
    &mut self,
    id: impl Into<TaskId>,
    mut item: impl ExecutableItem,
) -> Result<TaskId, ExecutorError> {
    let requested: TaskId = id.into();
    let id = match item.task_id() {
        Some(own) => TaskId::new(own),
        None => requested,
    };

    // Collect triggers and the optional budget before the item is boxed.
    let mut declarer = TriggerDeclarer::new_internal();
    item.declare_triggers(&mut declarer)?;
    let budget = declarer.budget;
    let decls = declarer.into_decls();

    let mut boxed: Box<dyn ExecutableItem> = Box::new(item);
    let app_id = boxed.app_id();
    let app_inst = boxed.app_instance_id();
    // The job reaches the item through this raw pointer; the box itself is
    // stored in the task entry below, next to the job.
    #[allow(unsafe_code)]
    let item_ptr = SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(boxed.as_mut()));

    let fault = Arc::new(FaultAtomic::new());
    let overruns = Arc::new(AtomicU64::new(0));
    // The new task will live at the current end of `tasks`.
    #[allow(clippy::cast_possible_truncation)]
    let task_idx_u32 = self.tasks.len() as u32;

    let job = build_single_job(
        id.clone(),
        self.stoppable.clone(),
        Arc::clone(&self.observer),
        Arc::clone(&self.monitor),
        Arc::clone(&self.iter_err),
        app_id,
        app_inst,
        item_ptr,
        FaultDispatchCtx {
            task_budget: budget,
            task_fault: Arc::clone(&fault),
            overrun_count: Arc::clone(&overruns),
            iteration_budget: self.iteration_budget,
            exec_fault: Arc::clone(&self.exec_fault),
            exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
            exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
            task_idx_u32,
            exec_start: Arc::clone(&self.start_time),
            observer: Arc::clone(&self.observer),
        },
    );

    let entry = TaskEntry {
        id: id.clone(),
        kind: TaskKind::Single(boxed),
        decls,
        job: Some(job),
        budget,
        fault,
        overrun_count: overruns,
        handler_job: None,
    };
    self.tasks.push(entry);
    Ok(id)
}
/// Registers `main` as a task and attaches `handler` as its fault handler.
///
/// While the task (or the whole executor) is faulted, the dispatch loop runs
/// the handler instead of the main item. The handler shares the main task's
/// id, budget, fault state and overrun counter. Triggers declared by the
/// handler are discarded; only the main item's triggers are used.
///
/// The handler is validated *before* the main item is registered, so a
/// failing handler does not leave a task behind without its handler.
///
/// # Errors
/// Returns the error from the handler's `declare_triggers`, or any error
/// from [`Executor::add`].
pub fn add_with_fault_handler<I, H>(
    &mut self,
    main: I,
    handler: H,
) -> Result<TaskId, ExecutorError>
where
    I: ExecutableItem,
    H: ExecutableItem,
{
    // Run the handler's trigger declaration first (result discarded) so that
    // an error here leaves the executor untouched.
    let mut handler_box: Box<dyn ExecutableItem> = Box::new(handler);
    let mut throwaway = TriggerDeclarer::new_internal();
    handler_box.declare_triggers(&mut throwaway)?;
    drop(throwaway);
    let app_id = handler_box.app_id();
    let app_inst = handler_box.app_instance_id();

    let task_id = self.add(main)?;
    // `add` pushes exactly one entry at the end of `tasks` on success. Taking
    // the last index (rather than searching by id) stays correct even when
    // the item's own `task_id()` duplicates the id of an earlier task.
    let task_idx = self
        .tasks
        .len()
        .checked_sub(1)
        .expect("`add` succeeded, so `tasks` is non-empty");
    let task = &self.tasks[task_idx];
    debug_assert!(task.id == task_id, "last entry must be the task just added");

    #[allow(clippy::cast_possible_truncation)]
    let task_idx_u32 = task_idx as u32;
    let handler_fault_ctx = FaultDispatchCtx {
        task_budget: task.budget,
        task_fault: Arc::clone(&task.fault),
        overrun_count: Arc::clone(&task.overrun_count),
        iteration_budget: self.iteration_budget,
        exec_fault: Arc::clone(&self.exec_fault),
        exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
        exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
        task_idx_u32,
        exec_start: Arc::clone(&self.start_time),
        observer: Arc::clone(&self.observer),
    };
    let handler_closure = build_handler_job(
        task_id.clone(),
        self.stoppable.clone(),
        Arc::clone(&self.observer),
        Arc::clone(&self.monitor),
        Arc::clone(&self.iter_err),
        app_id,
        app_inst,
        handler_box,
        handler_fault_ctx,
    );
    self.tasks[task_idx].handler_job = Some(handler_closure);
    Ok(task_id)
}
pub fn clear_task_fault(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
let entry = self
.tasks
.iter()
.find(|t| t.id == task)
.ok_or_else(|| ExecutorError::TaskNotFound(task.clone()))?;
let budget_ms = entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
let prev = entry.fault.swap(FaultState::Running, budget_ms);
match prev {
FaultState::Running => Err(ExecutorError::TaskNotFaulted(task)),
FaultState::Faulted { .. } => {
self.observer.on_task_clear(task);
Ok(prev)
}
}
}
pub fn clear_executor_fault(&self) -> Result<ExecutorFaultState, ExecutorError> {
let task_idx = self.exec_fault_task_idx.load(Ordering::Acquire);
let budget_ms = self.exec_fault_budget_ms.load(Ordering::Acquire);
let prev = self
.exec_fault
.swap(ExecutorFaultState::Running, task_idx, budget_ms);
match prev {
ExecutorFaultState::Running => Err(ExecutorError::ExecutorNotFaulted),
ExecutorFaultState::Faulted { .. } => {
for entry in &self.tasks {
let task_budget_ms =
entry.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
if let FaultState::Faulted {
reason: FaultReason::ExecutorFaulted,
..
} = entry.fault.load(task_budget_ms)
{
let _ = entry.fault.swap(FaultState::Running, task_budget_ms);
self.observer.on_task_clear(entry.id.clone());
}
}
self.observer.on_executor_clear();
Ok(prev)
}
}
}
/// Returns how many times `task` has exceeded its budget so far.
///
/// # Errors
/// `ExecutorError::TaskNotFound` when no task has this id.
pub fn overrun_count(&self, task: TaskId) -> Result<u64, ExecutorError> {
    match self.tasks.iter().find(|t| t.id == task) {
        Some(entry) => Ok(entry.overrun_count.load(Ordering::Acquire)),
        None => Err(ExecutorError::TaskNotFound(task)),
    }
}
pub fn task_fault_state(&self, task: TaskId) -> Result<FaultState, ExecutorError> {
self.tasks
.iter()
.find(|t| t.id == task)
.map(|t| {
let budget_ms = t.budget.map_or(0_u32, crate::fault::duration_to_ms_sat);
t.fault.load(budget_ms)
})
.ok_or_else(|| ExecutorError::TaskNotFound(task))
}
/// Returns the current executor-level fault state, decoded with the task index
/// and budget recorded by the most recent executor fault.
#[must_use]
pub fn executor_fault_state(&self) -> ExecutorFaultState {
    self.exec_fault.load(
        self.exec_fault_task_idx.load(Ordering::Acquire),
        self.exec_fault_budget_ms.load(Ordering::Acquire),
    )
}
/// Registers `items` as one chain task under an auto-generated id of the form
/// `chain-<n>` (the head item's own `task_id()` still takes precedence).
///
/// # Errors
/// Propagates errors from chain registration, e.g. for an empty chain.
pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
where
    I: ExecutableItem,
    C: IntoIterator<Item = I>,
{
    let seq = self.next_id.fetch_add(1, Ordering::SeqCst);
    let id = TaskId::new(format!("chain-{seq}"));
    let mut boxed: Vec<Box<dyn ExecutableItem>> = Vec::new();
    for item in items {
        boxed.push(Box::new(item));
    }
    self.add_chain_with_id_boxed(id, boxed)
}
/// Registers `items` as one chain task under `id` (the head item's own
/// `task_id()` still takes precedence).
///
/// # Errors
/// Propagates errors from chain registration, e.g. for an empty chain.
pub fn add_chain_with_id<I, C>(
    &mut self,
    id: impl Into<TaskId>,
    items: C,
) -> Result<TaskId, ExecutorError>
where
    I: ExecutableItem,
    C: IntoIterator<Item = I>,
{
    let mut boxed: Vec<Box<dyn ExecutableItem>> = Vec::new();
    for item in items {
        boxed.push(Box::new(item));
    }
    let id: TaskId = id.into();
    self.add_chain_with_id_boxed(id, boxed)
}
fn add_chain_with_id_boxed(
&mut self,
id: TaskId,
mut items: Vec<Box<dyn ExecutableItem>>,
) -> Result<TaskId, ExecutorError> {
if items.is_empty() {
return Err(ExecutorError::Builder(
"chain must contain at least one item".into(),
));
}
let id = items[0].task_id().map_or(id, TaskId::new);
let mut head_declarer = TriggerDeclarer::new_internal();
items[0].declare_triggers(&mut head_declarer)?;
let decls = head_declarer.into_decls();
for (i, body) in items.iter_mut().enumerate().skip(1) {
let mut spurious = TriggerDeclarer::new_internal();
let _ = body.declare_triggers(&mut spurious);
if !spurious.is_empty() {
#[cfg(feature = "tracing")]
tracing::warn!(
target: "taktora-executor",
task = %id,
position = i,
"non-head chain item declared triggers; they will be ignored"
);
#[cfg(not(feature = "tracing"))]
{
let _ = i;
}
}
}
let mut items = items;
#[allow(unsafe_code)]
let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
&mut items,
));
let _ = chain_ptr;
let task_fault = Arc::new(FaultAtomic::new());
let overrun_count = Arc::new(AtomicU64::new(0));
#[allow(clippy::cast_possible_truncation)]
let task_idx_u32 = self.tasks.len() as u32;
self.tasks.push(TaskEntry {
id: id.clone(),
kind: TaskKind::Chain(items),
decls,
job: None, budget: None,
fault: Arc::clone(&task_fault),
overrun_count: Arc::clone(&overrun_count),
handler_job: None,
});
let task_idx = self.tasks.len() - 1;
let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
{
TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
_ => unreachable!("just-pushed task is TaskKind::Chain"),
};
#[allow(unsafe_code)]
let chain_ptr = SendChainPtr::new(chain_vec_ptr);
let fault_ctx = FaultDispatchCtx {
task_budget: None, task_fault,
overrun_count,
iteration_budget: self.iteration_budget,
exec_fault: Arc::clone(&self.exec_fault),
exec_fault_task_idx: Arc::clone(&self.exec_fault_task_idx),
exec_fault_budget_ms: Arc::clone(&self.exec_fault_budget_ms),
task_idx_u32,
exec_start: Arc::clone(&self.start_time),
observer: Arc::clone(&self.observer),
};
let job = build_chain_job(
id.clone(),
self.stoppable.clone(),
Arc::clone(&self.observer),
Arc::clone(&self.monitor),
Arc::clone(&self.iter_err),
chain_ptr,
fault_ctx,
);
self.tasks[task_idx].job = Some(job);
Ok(id)
}
#[must_use]
pub fn stoppable(&self) -> Stoppable {
self.stoppable.clone()
}
pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
&self.node
}
/// Starts building a graph task; nothing is registered until
/// `ExecutorGraphBuilder::build` is called.
pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
    let builder = crate::graph::GraphBuilder::new();
    ExecutorGraphBuilder {
        custom_id: None,
        builder,
        executor: self,
    }
}
}
/// Collects the settings for a new [`Executor`]; consumed by `build`.
pub struct ExecutorBuilder {
/// Worker thread count; when `None`, `build` uses the number of physical CPUs.
worker_threads: Option<usize>,
/// Custom observer; when `None`, `build` installs `NoopObserver`.
observer: Option<Arc<dyn Observer>>,
/// Custom execution monitor; when `None`, `build` installs `NoopMonitor`.
monitor: Option<Arc<dyn ExecutionMonitor>>,
/// Thread attributes handed to the worker pool.
worker_attrs: ThreadAttributes,
/// Optional executor-wide execution budget.
iteration_budget: Option<Duration>,
}
impl Default for ExecutorBuilder {
fn default() -> Self {
Self {
worker_threads: None,
observer: None,
monitor: None,
worker_attrs: ThreadAttributes::new(),
iteration_budget: None,
}
}
}
impl ExecutorBuilder {
    /// Sets the number of worker threads in the pool.
    #[must_use]
    pub const fn worker_threads(mut self, n: usize) -> Self {
        self.worker_threads = Some(n);
        self
    }

    /// Installs the observer that receives lifecycle, error and fault notifications.
    #[must_use]
    pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
        self.observer = Some(obs);
        self
    }

    /// Installs the monitor called immediately before and after every item execution.
    #[must_use]
    pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
        self.monitor = Some(mon);
        self
    }

    /// Sets the executor-wide budget; an execution exceeding it faults the executor.
    #[must_use]
    pub const fn iteration_budget(mut self, dur: Duration) -> Self {
        self.iteration_budget = Some(dur);
        self
    }

    /// Sets the thread attributes used for the pool's workers.
    #[must_use]
    #[allow(clippy::missing_const_for_fn)]
    pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
        self.worker_attrs = attrs;
        self
    }

    /// Creates the iceoryx2 node, the worker pool and the stop event, and
    /// assembles the [`Executor`].
    ///
    /// The stop event's service name embeds the process id and a process-wide
    /// sequence number, so each executor gets its own name.
    ///
    /// # Errors
    /// Returns an error when the node, the pool, or any part of the stop
    /// event (service, notifier, listener) cannot be created.
    ///
    /// # Panics
    /// Panics if the generated stop-event name is rejected as a service name.
    /// The name is built from a fixed pattern plus two integers, so this would
    /// indicate a bug rather than bad input.
    #[allow(clippy::arc_with_non_send_sync)]
    #[track_caller]
    pub fn build(self) -> Result<Executor, ExecutorError> {
        let node = NodeBuilder::new()
            .create::<ipc::Service>()
            .map_err(ExecutorError::iceoryx2)?;
        let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
        let pool = Arc::new(Pool::new(n_workers, self.worker_attrs)?);

        let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
        let stop_topic = format!(
            "taktora.exec.stop.{}.{exec_seq}.__taktora_event",
            std::process::id()
        );
        let stop_service_name = stop_topic
            .as_str()
            .try_into()
            .expect("generated stop-event name must be a valid service name");
        let stop_event = node
            .service_builder(&stop_service_name)
            .event()
            .open_or_create()
            .map_err(ExecutorError::iceoryx2)?;
        let stop_notifier = Arc::new(
            stop_event
                .notifier_builder()
                .create()
                .map_err(ExecutorError::iceoryx2)?,
        );
        let stop_listener = Arc::new(
            stop_event
                .listener_builder()
                .create()
                .map_err(ExecutorError::iceoryx2)?,
        );
        // The notifier lets `Stoppable` wake the dispatch loop when a stop is requested.
        let stoppable = Stoppable::with_waker(stop_notifier);

        let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
        let monitor: Arc<dyn ExecutionMonitor> =
            self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));

        let exec = Executor {
            node,
            pool,
            tasks: Vec::new(),
            running: Arc::new(AtomicBool::new(false)),
            stoppable,
            next_id: AtomicU64::new(0),
            stop_listener,
            observer,
            monitor,
            iter_err: Arc::new(std::sync::Mutex::new(None)),
            iteration_budget: self.iteration_budget,
            exec_fault: Arc::new(ExecutorFaultAtomic::new()),
            exec_fault_task_idx: Arc::new(AtomicU32::new(0)),
            exec_fault_budget_ms: Arc::new(AtomicU32::new(0)),
            start_time: Arc::new(OnceLock::new()),
        };
        Ok(exec)
    }
}
impl Executor {
    /// Runs the dispatch loop until it is stopped, interrupted, or an item fails.
    ///
    /// # Errors
    /// See [`Executor::run_n`]; all `run*` methods share the same error cases.
    pub fn run(&mut self) -> Result<(), ExecutorError> {
        self.run_inner(RunMode::Forever)
    }

    /// Runs the dispatch loop until roughly `max` has elapsed.
    ///
    /// The deadline is checked after each wait/dispatch cycle. If
    /// `now + max` is not representable as an `Instant` (e.g. `Duration::MAX`),
    /// the loop runs without a deadline instead of panicking.
    ///
    /// # Errors
    /// See [`Executor::run_n`].
    pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
        // `Instant + Duration` panics on overflow; an unreachable deadline is
        // equivalent to having none.
        let mode = Instant::now()
            .checked_add(max)
            .map_or(RunMode::Forever, RunMode::Until);
        self.run_inner(mode)
    }

    /// Runs the dispatch loop for `n` wait/dispatch cycles.
    ///
    /// # Errors
    /// * `ExecutorError::AlreadyRunning` when a run is already in progress.
    /// * The first item error recorded during a cycle.
    /// * Errors from creating or waiting on the waitset.
    pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
        self.run_inner(RunMode::Iterations(n))
    }

    /// Runs the dispatch loop until `predicate` returns `true`; the predicate
    /// is evaluated after each wait/dispatch cycle.
    ///
    /// # Errors
    /// See [`Executor::run_n`].
    pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
        self.run_inner(RunMode::Predicate(&mut predicate))
    }
}
/// Termination condition checked by the dispatch loop after each wait/dispatch cycle.
enum RunMode<'a> {
/// No condition of its own; the loop ends only on stop, interrupt or error.
Forever,
/// End once the current time has reached this instant.
Until(Instant),
/// End once this many cycles have completed.
Iterations(usize),
/// End once the callback returns `true`.
Predicate(&'a mut dyn FnMut() -> bool),
}
impl Executor {
/// Shared entry point of the `run*` methods: claims the `running` flag, runs
/// the dispatch loop, reports the outcome to the observer, and releases the flag.
///
/// # Errors
/// `ExecutorError::AlreadyRunning` when the flag was already set; otherwise
/// whatever `dispatch_loop` returns.
fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
    let claimed = self
        .running
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok();
    if !claimed {
        return Err(ExecutorError::AlreadyRunning);
    }

    self.observer.on_executor_up();
    let outcome = self.dispatch_loop(&mut mode);
    if let Err(e) = &outcome {
        self.observer.on_executor_error(e);
    } else {
        self.observer.on_executor_down();
    }
    self.running.store(false, Ordering::SeqCst);
    outcome
}
/// Core loop behind the `run*` methods.
///
/// Builds a waitset from every task's trigger declarations plus the stop
/// listener, then repeatedly: waits for attachments to fire, submits the job
/// (or fault handler) of each fired task to the pool, and finally checks the
/// termination conditions. The `mode` condition is evaluated only after a
/// wait/dispatch cycle has completed, so even `Iterations(0)` performs one cycle.
///
/// Returns `Ok(())` on interrupt/termination request, on stop, or when `mode`
/// is satisfied; returns the first item error recorded during a cycle, or a
/// waitset error.
#[allow(
unsafe_code,
clippy::too_many_lines,
clippy::ref_as_ptr,
clippy::borrow_as_ptr
)]
fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
.create()
.map_err(ExecutorError::iceoryx2)?;
// Keeps an `Arc` to every attached listener alive for the whole function.
// Declared before `guards`, so it is dropped after them.
let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
// `attachment_to_task[i]` is the index of the task that owns `guards[i]`.
let mut attachment_to_task: Vec<usize> = Vec::new();
for (task_idx, task) in self.tasks.iter().enumerate() {
for decl in &task.decls {
match decl {
TriggerDecl::Subscriber { listener } => {
let l = Arc::clone(listener);
listener_storage.push(l);
let l_ref = listener_storage.last().unwrap().as_ref();
// Detaches the reference's lifetime from `listener_storage` so the vector
// can keep growing. The referent lives inside the `Arc` allocation, which
// does not move when the vector reallocates.
let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
let guard = waitset
.attach_notification(l_ref)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
TriggerDecl::Interval(d) => {
let guard = waitset
.attach_interval(*d)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
TriggerDecl::Deadline { listener, deadline } => {
let l = Arc::clone(listener);
listener_storage.push(l);
let l_ref = listener_storage.last().unwrap().as_ref();
// Same lifetime detachment as in the `Subscriber` arm.
let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
let guard = waitset
.attach_deadline(l_ref, *deadline)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
TriggerDecl::RawListener(listener) => {
let l = Arc::clone(listener);
listener_storage.push(l);
let l_ref = listener_storage.last().unwrap().as_ref();
// Same lifetime detachment as in the `Subscriber` arm.
let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
let guard = waitset
.attach_notification(l_ref)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
}
}
}
// The stop listener is attached too, so a stop notification wakes the wait.
// The reference is detached from `self` because `self` is borrowed again below.
let stop_listener_ref: &IxListener<ipc::Service> =
unsafe { &*(self.stop_listener.as_ref() as *const _) };
let _stop_guard = waitset
.attach_notification(stop_listener_ref)
.map_err(ExecutorError::iceoryx2)?;
let iterations_done = AtomicUsize::new(0);
let stop_flag = self.stoppable.clone();
loop {
// Each cycle starts with an empty error slot.
*self.iter_err.lock().unwrap() = None;
// Raw pointers let the waitset callback reach executor state without
// holding borrows of `self` across the call.
let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
let pool = &self.pool;
let iter_err_inner = Arc::clone(&self.iter_err);
let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
let exec_fault_ptr = &*self.exec_fault as *const ExecutorFaultAtomic;
let exec_start_ptr = &*self.start_time as *const OnceLock<Instant>;
let cb_result = waitset.wait_and_process_once(
|attachment_id: WaitSetAttachmentId<ipc::Service>| {
// Drain pending stop notifications on every callback invocation.
let stop_l = unsafe { &*stop_listener_ptr };
while let Ok(Some(_)) = stop_l.try_wait_one() {}
for (i, guard) in guards.iter().enumerate() {
let fired = attachment_id.has_event_from(guard)
|| attachment_id.has_missed_deadline(guard);
if !fired {
continue;
}
let task_idx = attachment_to_task[i];
let task = unsafe { &mut (&mut *tasks_ptr)[task_idx] };
// Fault routing applies to single and chain tasks only; graphs skip it.
if matches!(task.kind, TaskKind::Single(_) | TaskKind::Chain(_)) {
// Only the Running/Faulted distinction is used here, so the decode
// arguments are passed as zeros.
let exec_faulted = matches!(
unsafe { &*exec_fault_ptr }.load(0, 0),
ExecutorFaultState::Faulted { .. }
);
let task_budget_ms = task.budget.map_or(0_u32, duration_to_ms_sat);
let task_state = task.fault.load(task_budget_ms);
let task_faulted =
if exec_faulted && matches!(task_state, FaultState::Running) {
// Propagate the executor fault to a task that was still running.
let exec_start = *unsafe { &*exec_start_ptr }
.get_or_init(std::time::Instant::now);
let since_ms =
instant_to_since_ms(std::time::Instant::now(), exec_start);
let _ = task.fault.swap(
FaultState::Faulted {
reason: FaultReason::ExecutorFaulted,
since_ms,
},
task_budget_ms,
);
true
} else {
matches!(task_state, FaultState::Faulted { .. })
};
let route_to_handler = exec_faulted || task_faulted;
if route_to_handler {
// While faulted, the handler (if any) runs instead of the main job;
// without a handler the task is skipped for this event.
if let Some(handler_box) = task.handler_job.as_deref_mut() {
let job_ptr: *mut (dyn FnMut() + Send) =
handler_box as *mut (dyn FnMut() + Send);
#[allow(unsafe_code)]
unsafe {
pool.submit_borrowed(crate::pool::BorrowedJob::new(
job_ptr,
));
}
}
continue;
}
}
match &mut task.kind {
TaskKind::Single(_) | TaskKind::Chain(_) => {
let job_box = task
.job
.as_deref_mut()
.expect("Single/Chain tasks carry a pre-built job");
let job_ptr: *mut (dyn FnMut() + Send) =
job_box as *mut (dyn FnMut() + Send);
#[allow(unsafe_code)]
unsafe {
pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
}
}
TaskKind::Graph(graph) => {
let outcome = graph.run_once_borrowed(pool);
// Keep only the first error of the cycle.
if let Some(source) = outcome.error {
let mut g = iter_err_inner.lock().unwrap();
if g.is_none() {
*g = Some(ExecutorError::Item {
task_id: task.id.clone(),
source,
});
}
}
let _ = outcome.stopped_chain; }
}
}
// NOTE(review): presumably blocks until every job submitted above has
// finished, which the borrowed job pointers depend on — confirm in `Pool`.
pool.barrier();
CallbackProgression::Continue
},
);
let cb_result = cb_result.map_err(ExecutorError::iceoryx2)?;
// Interrupt / termination request ends the run successfully.
if matches!(
cb_result,
WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
) {
return Ok(());
}
// An item error takes precedence over a pending stop request.
let maybe_err = self.iter_err.lock().unwrap().take();
if let Some(err) = maybe_err {
return Err(err);
}
if stop_flag.is_stopped() {
return Ok(());
}
iterations_done.fetch_add(1, Ordering::SeqCst);
match mode {
RunMode::Forever => {}
RunMode::Iterations(n) => {
if iterations_done.load(Ordering::SeqCst) >= *n {
return Ok(());
}
}
RunMode::Until(deadline) => {
if Instant::now() >= *deadline {
return Ok(());
}
}
RunMode::Predicate(p) => {
if (p)() {
return Ok(());
}
}
}
}
}
}
/// Raw pointer to an executable item, wrapped so that it can be captured by a
/// `Send` job closure.
#[allow(unsafe_code)]
struct SendItemPtr {
    ptr: *mut dyn ExecutableItem,
}

impl SendItemPtr {
    /// Wraps `ptr` without dereferencing it.
    fn new(ptr: *mut dyn ExecutableItem) -> Self {
        SendItemPtr { ptr }
    }

    /// Returns the wrapped pointer; dereferencing it is up to the caller.
    fn get(&self) -> *mut dyn ExecutableItem {
        let Self { ptr } = self;
        *ptr
    }
}

// NOTE(review): these impls assert thread-safety for a raw pointer; the
// invariant that justifies them is not stated in this file.
#[allow(unsafe_code)]
unsafe impl Send for SendItemPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendItemPtr {}
/// Raw pointer to a chain's item vector, wrapped so that it can be captured by
/// a `Send` job closure.
#[allow(unsafe_code)]
struct SendChainPtr {
    ptr: *mut Vec<Box<dyn ExecutableItem>>,
}

impl SendChainPtr {
    /// Wraps `ptr` without dereferencing it.
    fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
        SendChainPtr { ptr }
    }

    /// Returns the wrapped pointer; dereferencing it is up to the caller.
    fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
        let Self { ptr } = self;
        *ptr
    }
}

// NOTE(review): these impls assert thread-safety for a raw pointer; the
// invariant that justifies them is not stated in this file.
#[allow(unsafe_code)]
unsafe impl Send for SendChainPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendChainPtr {}
/// Everything a job closure needs to detect and record budget overruns after
/// an execution (see `post_execute_detect_fault`).
struct FaultDispatchCtx {
/// Per-task budget; `None` disables the per-task check.
task_budget: Option<Duration>,
/// Fault state of the task this job belongs to.
task_fault: Arc<FaultAtomic>,
/// Overrun counter of the task this job belongs to.
overrun_count: Arc<AtomicU64>,
/// Executor-wide budget; `None` disables the executor-level check.
iteration_budget: Option<Duration>,
/// Executor-level fault state.
exec_fault: Arc<ExecutorFaultAtomic>,
/// Receives the task index when this job faults the executor.
exec_fault_task_idx: Arc<AtomicU32>,
/// Receives the budget (ms) when this job faults the executor.
exec_fault_budget_ms: Arc<AtomicU32>,
/// Index of the owning task in `Executor::tasks`, truncated to `u32`.
task_idx_u32: u32,
/// Reference instant for `since_ms`; initialised on first use.
exec_start: Arc<OnceLock<Instant>>,
/// Notified when a task or the executor transitions from running to faulted.
observer: Arc<dyn Observer>,
}
#[allow(clippy::too_many_arguments)]
fn build_single_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
app_id: Option<u32>,
app_inst: Option<u32>,
item_ptr: SendItemPtr,
fault_ctx: FaultDispatchCtx,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let raw = item_ptr.get();
let started = std::time::Instant::now();
mon.pre_execute(id.clone(), started);
#[allow(unsafe_code)]
let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
let took = started.elapsed();
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
post_execute_detect_fault(&id, started, took, &fault_ctx);
record_first_err(&err_slot, &id, res);
})
}
#[allow(clippy::too_many_arguments)]
fn build_handler_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
app_id: Option<u32>,
app_inst: Option<u32>,
mut handler: Box<dyn ExecutableItem>,
fault_ctx: FaultDispatchCtx,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let started = std::time::Instant::now();
mon.pre_execute(id.clone(), started);
let res = run_item_catch_unwind(handler.as_mut(), &mut ctx);
let took = started.elapsed();
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
post_execute_detect_fault(&id, started, took, &fault_ctx);
record_first_err(&err_slot, &id, res);
})
}
#[allow(clippy::too_many_arguments)]
fn build_chain_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
chain_ptr: SendChainPtr,
fault_ctx: FaultDispatchCtx,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
#[allow(unsafe_code)]
let chain_items = unsafe { &mut *chain_ptr.get() };
for item_box in chain_items.iter_mut() {
let app_id = item_box.app_id();
let app_inst = item_box.app_instance_id();
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
let started = std::time::Instant::now();
mon.pre_execute(id.clone(), started);
#[allow(unsafe_code)]
let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
let took = started.elapsed();
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
post_execute_detect_fault(&id, started, took, &fault_ctx);
match res {
Ok(crate::ControlFlow::Continue) => {}
Ok(crate::ControlFlow::StopChain) => break,
Err(_) => {
record_first_err(&err_slot, &id, res);
break;
}
}
}
})
}
/// Error produced when an item panics during execution; carries the panic message.
#[derive(Debug)]
struct PanickedTask(String);

impl core::fmt::Display for PanickedTask {
    /// Formats as `task panicked: <message>`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_str("task panicked: ")?;
        f.write_str(message)
    }
}

impl std::error::Error for PanickedTask {}
/// Executes `item`, converting a panic into an `Err` carrying a `PanickedTask`.
///
/// The panic message is taken from a `&str` or `String` payload; any other
/// payload yields the generic text `panicked task`.
fn run_item_catch_unwind(
    item: &mut dyn ExecutableItem,
    ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
    let attempt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx)));
    match attempt {
        Ok(result) => result,
        Err(payload) => {
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| (*s).to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "panicked task".to_string());
            Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
        }
    }
}
/// Crate-visible entry point to `run_item_catch_unwind` for other modules.
pub(crate) fn run_item_catch_unwind_external(
    item: &mut dyn ExecutableItem,
    ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
    let outcome = run_item_catch_unwind(item, ctx);
    outcome
}
/// Stores the error of `res` (tagged with `id`) in `slot`, unless the slot
/// already holds an earlier error. Successful results are ignored.
///
/// # Panics
/// Panics if the mutex is poisoned.
fn record_first_err(
    slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
    id: &TaskId,
    res: crate::ExecuteResult,
) {
    let Err(source) = res else {
        return;
    };
    let mut first = slot.lock().unwrap();
    // Only fills an empty slot; an existing error is kept as-is.
    let _ = first.get_or_insert_with(|| ExecutorError::Item {
        task_id: id.clone(),
        source,
    });
}
/// Compares the duration of one execution against the task budget and the
/// executor-wide budget, recording faults for whichever was exceeded.
///
/// * Task budget exceeded: the overrun counter is incremented and the task is
///   marked faulted; `on_task_fault` fires only on a running → faulted transition.
/// * Iteration budget exceeded: the task index and budget are published, the
///   executor is marked faulted; `on_executor_fault` fires only on a
///   running → faulted transition.
///
/// `since_ms` is measured from `exec_start`, which is initialised to `started`
/// if it has not been set yet.
fn post_execute_detect_fault(
    id: &TaskId,
    started: Instant,
    took: Duration,
    fault_ctx: &FaultDispatchCtx,
) {
    let task_overrun = fault_ctx.task_budget.filter(|limit| took > *limit);
    if let Some(budget) = task_overrun {
        fault_ctx.overrun_count.fetch_add(1, Ordering::Relaxed);
        let took_ms = duration_to_ms_sat(took);
        let budget_ms = duration_to_ms_sat(budget);
        let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
        let since_ms = instant_to_since_ms(started, exec_start);
        let reason = || FaultReason::BudgetExceeded { took_ms, budget_ms };

        let faulted = FaultState::Faulted {
            reason: reason(),
            since_ms,
        };
        let before = fault_ctx.task_fault.swap(faulted, budget_ms);
        if matches!(before, FaultState::Running) {
            fault_ctx.observer.on_task_fault(id.clone(), reason());
        }
    }

    let iteration_overrun = fault_ctx.iteration_budget.filter(|limit| took > *limit);
    if let Some(iter_budget) = iteration_overrun {
        let took_ms = duration_to_ms_sat(took);
        let budget_ms = duration_to_ms_sat(iter_budget);
        let exec_start = *fault_ctx.exec_start.get_or_init(|| started);
        let since_ms = instant_to_since_ms(started, exec_start);
        let task_idx = fault_ctx.task_idx_u32;
        let reason = || ExecutorFaultReason::IterationBudgetExceeded {
            task_idx,
            took_ms,
            budget_ms,
        };

        // Publish the decode parameters before the state itself.
        fault_ctx
            .exec_fault_task_idx
            .store(task_idx, Ordering::Release);
        fault_ctx
            .exec_fault_budget_ms
            .store(budget_ms, Ordering::Release);

        let faulted = ExecutorFaultState::Faulted {
            reason: reason(),
            since_ms,
        };
        let before = fault_ctx.exec_fault.swap(faulted, task_idx, budget_ms);
        if matches!(before, ExecutorFaultState::Running) {
            fault_ctx.observer.on_executor_fault(reason());
        }
    }
}
/// Builder returned by `Executor::add_graph`; registers a graph task on `build`.
pub struct ExecutorGraphBuilder<'e> {
/// Executor the finished graph is added to.
executor: &'e mut Executor,
/// Accumulates vertices, edges and the root.
builder: crate::graph::GraphBuilder,
/// Id set through `id()`; used only when the graph's root does not provide one.
custom_id: Option<TaskId>,
}
impl ExecutorGraphBuilder<'_> {
    /// Adds `item` as a vertex and returns its handle.
    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
        let graph = &mut self.builder;
        graph.vertex(item)
    }

    /// Adds an edge from `from` to `to`.
    pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
        let graph = &mut self.builder;
        graph.edge(from, to);
        self
    }

    /// Marks `v` as the graph's root.
    pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
        self.builder.root(v);
        self
    }

    /// Sets the task id to use when the root item does not supply its own.
    pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
        let chosen: TaskId = id.into();
        self.custom_id = Some(chosen);
        self
    }

    /// Finishes the graph and registers it as a task on the executor.
    ///
    /// Id precedence: the root's own task id, then the id set via `id()`, then
    /// an auto-generated `graph-<n>`.
    ///
    /// # Errors
    /// Propagates the error from finishing the graph builder.
    pub fn build(self) -> Result<TaskId, ExecutorError> {
        let Self {
            executor,
            builder,
            custom_id,
        } = self;
        let g = builder.finish()?;

        let id = match g.root_task_id().map(TaskId::new).or(custom_id) {
            Some(explicit) => explicit,
            // The sequence number is consumed only when no explicit id exists.
            None => TaskId::new(format!(
                "graph-{}",
                executor.next_id.fetch_add(1, Ordering::SeqCst)
            )),
        };
        let decls = g.decls.clone();

        let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
        graph_box.prepare_dispatch(
            id.clone(),
            executor.stoppable.clone(),
            Arc::clone(&executor.observer),
            Arc::clone(&executor.monitor),
            Arc::clone(&executor.iter_err),
        );

        let entry = TaskEntry {
            id: id.clone(),
            kind: TaskKind::Graph(graph_box),
            decls,
            job: None,
            budget: None,
            fault: Arc::new(FaultAtomic::new()),
            overrun_count: Arc::new(AtomicU64::new(0)),
            handler_job: None,
        };
        executor.tasks.push(entry);
        Ok(id)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ControlFlow, item};
    use core::time::Duration;

    /// Executor with an empty worker pool; enough for registration-only tests.
    fn zero_worker_executor() -> Executor {
        Executor::builder().worker_threads(0).build().unwrap()
    }

    /// Item with a 10 ms interval trigger and no budget.
    fn interval_item() -> impl ExecutableItem {
        crate::item::item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                Ok(())
            },
            |_| Ok(crate::ControlFlow::Continue),
        )
    }

    /// Item with a 10 ms interval trigger and a 5 ms budget.
    fn budgeted_item() -> impl ExecutableItem {
        crate::item::item_with_triggers(
            |d| {
                d.interval(Duration::from_millis(10));
                d.budget(Duration::from_millis(5));
                Ok(())
            },
            |_| Ok(crate::ControlFlow::Continue),
        )
    }

    /// Looks up the entry registered under `id`.
    fn entry_of<'a>(exec: &'a Executor, id: &TaskId) -> &'a TaskEntry {
        exec.tasks
            .iter()
            .find(|t| t.id == *id)
            .expect("task present")
    }

    #[test]
    fn add_returns_unique_ids() {
        let mut exec = zero_worker_executor();
        let first = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        let second = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
        assert_ne!(first, second);
    }

    #[test]
    fn custom_id_is_preserved() {
        let mut exec = zero_worker_executor();
        let id = exec
            .add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
            .unwrap();
        assert_eq!(id.as_str(), "my-task");
    }

    #[test]
    fn add_persists_declared_budget() {
        let mut exec = zero_worker_executor();
        let task_id = exec.add(budgeted_item()).unwrap();
        let entry = entry_of(&exec, &task_id);
        assert_eq!(entry.budget, Some(Duration::from_millis(5)));
    }

    #[test]
    fn add_with_fault_handler_stores_handler_job() {
        let mut exec = zero_worker_executor();
        let handler =
            crate::item::item_with_triggers(|_d| Ok(()), |_| Ok(crate::ControlFlow::Continue));
        let task_id = exec
            .add_with_fault_handler(budgeted_item(), handler)
            .unwrap();
        let entry = entry_of(&exec, &task_id);
        assert!(
            entry.handler_job.is_some(),
            "handler_job should be Some after add_with_fault_handler"
        );
        assert!(entry.job.is_some(), "main job should still be present");
    }

    #[test]
    fn declare_triggers_called_at_add_time() {
        let called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&called);
        let it = crate::item::item_with_triggers(
            move |_d| {
                flag.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| Ok(ControlFlow::Continue),
        );
        let mut exec = zero_worker_executor();
        exec.add(it).unwrap();
        assert!(called.load(Ordering::SeqCst));
    }

    #[test]
    fn clear_task_fault_errors_on_running_task() {
        let mut exec = zero_worker_executor();
        let task_id = exec.add(interval_item()).unwrap();
        let err = exec.clear_task_fault(task_id).expect_err("not faulted");
        assert!(matches!(err, ExecutorError::TaskNotFaulted(_)));
    }

    #[test]
    fn clear_executor_fault_errors_on_running_executor() {
        let exec = zero_worker_executor();
        let err = exec.clear_executor_fault().expect_err("not faulted");
        assert!(matches!(err, ExecutorError::ExecutorNotFaulted));
    }

    #[test]
    fn overrun_count_returns_zero_for_new_task() {
        let mut exec = zero_worker_executor();
        let task_id = exec.add(budgeted_item()).unwrap();
        assert_eq!(exec.overrun_count(task_id).unwrap(), 0);
    }

    #[test]
    fn overrun_count_errors_for_unknown_task() {
        let exec = zero_worker_executor();
        let err = exec
            .overrun_count(crate::TaskId::new("nope"))
            .expect_err("unknown task");
        assert!(matches!(err, ExecutorError::TaskNotFound(_)));
    }

    #[test]
    fn task_fault_state_starts_running() {
        let mut exec = zero_worker_executor();
        let task_id = exec.add(interval_item()).unwrap();
        assert_eq!(exec.task_fault_state(task_id).unwrap(), FaultState::Running);
    }

    #[test]
    fn executor_fault_state_starts_running() {
        let exec = zero_worker_executor();
        assert_eq!(exec.executor_fault_state(), ExecutorFaultState::Running);
    }
}