#![allow(dead_code)]
#![allow(clippy::redundant_pub_crate)]
use crate::Channel;
use crate::context::Stoppable;
use crate::error::ExecutorError;
use crate::item::ExecutableItem;
use crate::monitor::{ExecutionMonitor, NoopMonitor};
use crate::observer::{NoopObserver, Observer};
use crate::payload::Payload;
use crate::pool::Pool;
use crate::task_id::TaskId;
use crate::task_kind::TaskKind;
use crate::thread_attrs::ThreadAttributes;
use crate::trigger::{TriggerDecl, TriggerDeclarer};
use iceoryx2::node::Node;
use iceoryx2::port::listener::Listener as IxListener;
use iceoryx2::prelude::ipc;
use iceoryx2::prelude::*;
use iceoryx2::waitset::WaitSetRunResult;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
static EXEC_COUNTER: AtomicU64 = AtomicU64::new(0);
pub(crate) struct TaskEntry {
pub(crate) id: TaskId,
pub(crate) kind: TaskKind,
pub(crate) decls: Vec<TriggerDecl>,
pub(crate) job: Option<Box<dyn FnMut() + Send + 'static>>,
}
pub struct Executor {
pub(crate) node: Node<ipc::Service>,
pub(crate) pool: Arc<Pool>,
pub(crate) tasks: Vec<TaskEntry>,
pub(crate) running: Arc<AtomicBool>,
pub(crate) stoppable: Stoppable,
pub(crate) next_id: AtomicU64,
pub(crate) stop_listener: Arc<IxListener<ipc::Service>>,
pub(crate) observer: Arc<dyn Observer>,
pub(crate) monitor: Arc<dyn ExecutionMonitor>,
pub(crate) iter_err: Arc<std::sync::Mutex<Option<ExecutorError>>>,
}
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Executor {}
impl Executor {
#[must_use]
pub fn builder() -> ExecutorBuilder {
ExecutorBuilder::default()
}
pub fn channel<T: Payload>(&mut self, name: &str) -> Result<Arc<Channel<T>>, ExecutorError> {
Channel::open_or_create(&self.node, name)
}
pub fn service<Req, Resp>(
&mut self,
name: &str,
) -> Result<Arc<crate::Service<Req, Resp>>, ExecutorError>
where
Req: Payload,
Resp: Payload,
{
crate::Service::open_or_create(&self.node, name)
}
pub fn add(&mut self, item: impl ExecutableItem) -> Result<TaskId, ExecutorError> {
let id = TaskId::new(format!(
"task-{}",
self.next_id.fetch_add(1, Ordering::SeqCst)
));
self.add_with_id(id, item)
}
pub fn add_with_id(
&mut self,
id: impl Into<TaskId>,
mut item: impl ExecutableItem,
) -> Result<TaskId, ExecutorError> {
let id_arg: TaskId = id.into();
let id = item.task_id().map_or(id_arg, TaskId::new);
let mut declarer = TriggerDeclarer::new_internal();
item.declare_triggers(&mut declarer)?;
let decls = declarer.into_decls();
let mut item_box: Box<dyn ExecutableItem> = Box::new(item);
let app_id = item_box.app_id();
let app_inst = item_box.app_instance_id();
#[allow(unsafe_code)]
let item_ptr =
SendItemPtr::new(std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut()));
let job = build_single_job(
id.clone(),
self.stoppable.clone(),
Arc::clone(&self.observer),
Arc::clone(&self.monitor),
Arc::clone(&self.iter_err),
app_id,
app_inst,
item_ptr,
);
self.tasks.push(TaskEntry {
id: id.clone(),
kind: TaskKind::Single(item_box),
decls,
job: Some(job),
});
Ok(id)
}
pub fn add_chain<I, C>(&mut self, items: C) -> Result<TaskId, ExecutorError>
where
I: ExecutableItem,
C: IntoIterator<Item = I>,
{
let id = TaskId::new(format!(
"chain-{}",
self.next_id.fetch_add(1, Ordering::SeqCst)
));
let boxed: Vec<Box<dyn ExecutableItem>> = items
.into_iter()
.map(|i| Box::new(i) as Box<dyn ExecutableItem>)
.collect();
self.add_chain_with_id_boxed(id, boxed)
}
pub fn add_chain_with_id<I, C>(
&mut self,
id: impl Into<TaskId>,
items: C,
) -> Result<TaskId, ExecutorError>
where
I: ExecutableItem,
C: IntoIterator<Item = I>,
{
let boxed: Vec<Box<dyn ExecutableItem>> = items
.into_iter()
.map(|i| Box::new(i) as Box<dyn ExecutableItem>)
.collect();
self.add_chain_with_id_boxed(id.into(), boxed)
}
fn add_chain_with_id_boxed(
&mut self,
id: TaskId,
mut items: Vec<Box<dyn ExecutableItem>>,
) -> Result<TaskId, ExecutorError> {
if items.is_empty() {
return Err(ExecutorError::Builder(
"chain must contain at least one item".into(),
));
}
let id = items[0].task_id().map_or(id, TaskId::new);
let mut head_declarer = TriggerDeclarer::new_internal();
items[0].declare_triggers(&mut head_declarer)?;
let decls = head_declarer.into_decls();
for (i, body) in items.iter_mut().enumerate().skip(1) {
let mut spurious = TriggerDeclarer::new_internal();
let _ = body.declare_triggers(&mut spurious);
if !spurious.is_empty() {
#[cfg(feature = "tracing")]
tracing::warn!(
target: "taktora-executor",
task = %id,
position = i,
"non-head chain item declared triggers; they will be ignored"
);
#[cfg(not(feature = "tracing"))]
{
let _ = i;
}
}
}
let mut items = items;
#[allow(unsafe_code)]
let chain_ptr = SendChainPtr::new(std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(
&mut items,
));
let _ = chain_ptr;
self.tasks.push(TaskEntry {
id: id.clone(),
kind: TaskKind::Chain(items),
decls,
job: None, });
let task_idx = self.tasks.len() - 1;
let chain_vec_ptr: *mut Vec<Box<dyn ExecutableItem>> = match &mut self.tasks[task_idx].kind
{
TaskKind::Chain(v) => std::ptr::from_mut::<Vec<Box<dyn ExecutableItem>>>(v),
_ => unreachable!("just-pushed task is TaskKind::Chain"),
};
#[allow(unsafe_code)]
let chain_ptr = SendChainPtr::new(chain_vec_ptr);
let job = build_chain_job(
id.clone(),
self.stoppable.clone(),
Arc::clone(&self.observer),
Arc::clone(&self.monitor),
Arc::clone(&self.iter_err),
chain_ptr,
);
self.tasks[task_idx].job = Some(job);
Ok(id)
}
#[must_use]
pub fn stoppable(&self) -> Stoppable {
self.stoppable.clone()
}
pub const fn iceoryx_node(&self) -> &Node<ipc::Service> {
&self.node
}
pub fn add_graph(&mut self) -> ExecutorGraphBuilder<'_> {
ExecutorGraphBuilder {
executor: self,
builder: crate::graph::GraphBuilder::new(),
custom_id: None,
}
}
}
pub struct ExecutorBuilder {
worker_threads: Option<usize>,
observer: Option<Arc<dyn Observer>>,
monitor: Option<Arc<dyn ExecutionMonitor>>,
worker_attrs: ThreadAttributes,
}
impl Default for ExecutorBuilder {
fn default() -> Self {
Self {
worker_threads: None,
observer: None,
monitor: None,
worker_attrs: ThreadAttributes::new(),
}
}
}
impl ExecutorBuilder {
#[must_use]
pub const fn worker_threads(mut self, n: usize) -> Self {
self.worker_threads = Some(n);
self
}
#[must_use]
pub fn observer(mut self, obs: Arc<dyn Observer>) -> Self {
self.observer = Some(obs);
self
}
#[must_use]
pub fn monitor(mut self, mon: Arc<dyn ExecutionMonitor>) -> Self {
self.monitor = Some(mon);
self
}
#[must_use]
#[allow(clippy::missing_const_for_fn)]
pub fn worker_attrs(mut self, attrs: ThreadAttributes) -> Self {
self.worker_attrs = attrs;
self
}
#[allow(clippy::arc_with_non_send_sync)] #[track_caller]
pub fn build(self) -> Result<Executor, ExecutorError> {
let node = NodeBuilder::new()
.create::<ipc::Service>()
.map_err(ExecutorError::iceoryx2)?;
let n_workers = self.worker_threads.unwrap_or_else(num_cpus::get_physical);
let pool = Arc::new(Pool::new(n_workers, self.worker_attrs)?);
let exec_seq = EXEC_COUNTER.fetch_add(1, Ordering::Relaxed);
let stop_topic = format!(
"taktora.exec.stop.{}.{exec_seq}.__taktora_event",
std::process::id()
);
let stop_event = node
.service_builder(&stop_topic.as_str().try_into().unwrap())
.event()
.open_or_create()
.map_err(ExecutorError::iceoryx2)?;
let stop_notifier = Arc::new(
stop_event
.notifier_builder()
.create()
.map_err(ExecutorError::iceoryx2)?,
);
let stop_listener = Arc::new(
stop_event
.listener_builder()
.create()
.map_err(ExecutorError::iceoryx2)?,
);
let stoppable = Stoppable::with_waker(stop_notifier);
let observer: Arc<dyn Observer> = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
let monitor: Arc<dyn ExecutionMonitor> =
self.monitor.unwrap_or_else(|| Arc::new(NoopMonitor));
let exec = Executor {
node,
pool,
tasks: Vec::new(),
running: Arc::new(AtomicBool::new(false)),
stoppable,
next_id: AtomicU64::new(0),
stop_listener,
observer,
monitor,
iter_err: Arc::new(std::sync::Mutex::new(None)),
};
Ok(exec)
}
}
impl Executor {
pub fn run(&mut self) -> Result<(), ExecutorError> {
self.run_inner(RunMode::Forever)
}
pub fn run_for(&mut self, max: Duration) -> Result<(), ExecutorError> {
self.run_inner(RunMode::Until(Instant::now() + max))
}
pub fn run_n(&mut self, n: usize) -> Result<(), ExecutorError> {
self.run_inner(RunMode::Iterations(n))
}
pub fn run_until<F: FnMut() -> bool>(&mut self, mut predicate: F) -> Result<(), ExecutorError> {
self.run_inner(RunMode::Predicate(&mut predicate))
}
}
enum RunMode<'a> {
Forever,
Until(Instant),
Iterations(usize),
Predicate(&'a mut dyn FnMut() -> bool),
}
impl Executor {
fn run_inner(&mut self, mut mode: RunMode<'_>) -> Result<(), ExecutorError> {
if self.running.swap(true, Ordering::SeqCst) {
return Err(ExecutorError::AlreadyRunning);
}
self.observer.on_executor_up();
let result = self.dispatch_loop(&mut mode);
match &result {
Ok(()) => self.observer.on_executor_down(),
Err(e) => self.observer.on_executor_error(e),
}
self.running.store(false, Ordering::SeqCst);
result
}
#[allow(
unsafe_code,
clippy::too_many_lines,
clippy::ref_as_ptr,
clippy::borrow_as_ptr
)]
fn dispatch_loop(&mut self, mode: &mut RunMode<'_>) -> Result<(), ExecutorError> {
let waitset: WaitSet<ipc::Service> = WaitSetBuilder::new()
.create()
.map_err(ExecutorError::iceoryx2)?;
let mut listener_storage: Vec<Arc<crate::trigger::RawListener>> = Vec::new();
let mut guards: Vec<WaitSetGuard<'_, '_, ipc::Service>> = Vec::new();
let mut attachment_to_task: Vec<usize> = Vec::new();
for (task_idx, task) in self.tasks.iter().enumerate() {
for decl in &task.decls {
match decl {
TriggerDecl::Subscriber { listener } => {
let l = Arc::clone(listener);
listener_storage.push(l);
let l_ref = listener_storage.last().unwrap().as_ref();
let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
let guard = waitset
.attach_notification(l_ref)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
TriggerDecl::Interval(d) => {
let guard = waitset
.attach_interval(*d)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
TriggerDecl::Deadline { listener, deadline } => {
let l = Arc::clone(listener);
listener_storage.push(l);
let l_ref = listener_storage.last().unwrap().as_ref();
let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
let guard = waitset
.attach_deadline(l_ref, *deadline)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
TriggerDecl::RawListener(listener) => {
let l = Arc::clone(listener);
listener_storage.push(l);
let l_ref = listener_storage.last().unwrap().as_ref();
let l_ref: &crate::trigger::RawListener = unsafe { &*(l_ref as *const _) };
let guard = waitset
.attach_notification(l_ref)
.map_err(ExecutorError::iceoryx2)?;
guards.push(guard);
attachment_to_task.push(task_idx);
}
}
}
}
let stop_listener_ref: &IxListener<ipc::Service> =
unsafe { &*(self.stop_listener.as_ref() as *const _) };
let _stop_guard = waitset
.attach_notification(stop_listener_ref)
.map_err(ExecutorError::iceoryx2)?;
let iterations_done = AtomicUsize::new(0);
let stop_flag = self.stoppable.clone();
loop {
*self.iter_err.lock().unwrap() = None;
let tasks_ptr = &mut self.tasks as *mut Vec<TaskEntry>;
let pool = &self.pool;
let iter_err_inner = Arc::clone(&self.iter_err);
let stop_listener_ptr = self.stop_listener.as_ref() as *const IxListener<ipc::Service>;
let cb_result = waitset.wait_and_process_once(
|attachment_id: WaitSetAttachmentId<ipc::Service>| {
let stop_l = unsafe { &*stop_listener_ptr };
while let Ok(Some(_)) = stop_l.try_wait_one() {}
for (i, guard) in guards.iter().enumerate() {
let fired = attachment_id.has_event_from(guard)
|| attachment_id.has_missed_deadline(guard);
if !fired {
continue;
}
let task_idx = attachment_to_task[i];
let task = unsafe { &mut (&mut *tasks_ptr)[task_idx] };
match &mut task.kind {
TaskKind::Single(_) | TaskKind::Chain(_) => {
let job_box = task
.job
.as_deref_mut()
.expect("Single/Chain tasks carry a pre-built job");
let job_ptr: *mut (dyn FnMut() + Send) =
job_box as *mut (dyn FnMut() + Send);
#[allow(unsafe_code)]
unsafe {
pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
}
}
TaskKind::Graph(graph) => {
let outcome = graph.run_once_borrowed(pool);
if let Some(source) = outcome.error {
let mut g = iter_err_inner.lock().unwrap();
if g.is_none() {
*g = Some(ExecutorError::Item {
task_id: task.id.clone(),
source,
});
}
}
let _ = outcome.stopped_chain; }
}
}
pool.barrier();
CallbackProgression::Continue
},
);
let cb_result = cb_result.map_err(ExecutorError::iceoryx2)?;
if matches!(
cb_result,
WaitSetRunResult::Interrupt | WaitSetRunResult::TerminationRequest
) {
return Ok(());
}
let maybe_err = self.iter_err.lock().unwrap().take();
if let Some(err) = maybe_err {
return Err(err);
}
if stop_flag.is_stopped() {
return Ok(());
}
iterations_done.fetch_add(1, Ordering::SeqCst);
match mode {
RunMode::Forever => {}
RunMode::Iterations(n) => {
if iterations_done.load(Ordering::SeqCst) >= *n {
return Ok(());
}
}
RunMode::Until(deadline) => {
if Instant::now() >= *deadline {
return Ok(());
}
}
RunMode::Predicate(p) => {
if (p)() {
return Ok(());
}
}
}
}
}
}
#[allow(unsafe_code)]
struct SendItemPtr {
ptr: *mut dyn ExecutableItem,
}
impl SendItemPtr {
fn new(ptr: *mut dyn ExecutableItem) -> Self {
Self { ptr }
}
fn get(&self) -> *mut dyn ExecutableItem {
self.ptr
}
}
#[allow(unsafe_code)]
unsafe impl Send for SendItemPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendItemPtr {}
#[allow(unsafe_code)]
struct SendChainPtr {
ptr: *mut Vec<Box<dyn ExecutableItem>>,
}
impl SendChainPtr {
fn new(ptr: *mut Vec<Box<dyn ExecutableItem>>) -> Self {
Self { ptr }
}
fn get(&self) -> *mut Vec<Box<dyn ExecutableItem>> {
self.ptr
}
}
#[allow(unsafe_code)]
unsafe impl Send for SendChainPtr {}
#[allow(unsafe_code)]
unsafe impl Sync for SendChainPtr {}
#[allow(clippy::too_many_arguments)]
fn build_single_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
app_id: Option<u32>,
app_inst: Option<u32>,
item_ptr: SendItemPtr,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let raw = item_ptr.get();
let started = std::time::Instant::now();
mon.pre_execute(id.clone(), started);
#[allow(unsafe_code)]
let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
let took = started.elapsed();
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
record_first_err(&err_slot, &id, res);
})
}
fn build_chain_job(
id: TaskId,
stop: Stoppable,
obs: Arc<dyn Observer>,
mon: Arc<dyn ExecutionMonitor>,
err_slot: Arc<std::sync::Mutex<Option<ExecutorError>>>,
chain_ptr: SendChainPtr,
) -> Box<dyn FnMut() + Send + 'static> {
Box::new(move || {
let mut ctx = crate::context::Context::new(&id, &stop, obs.as_ref());
#[allow(unsafe_code)]
let chain_items = unsafe { &mut *chain_ptr.get() };
for item_box in chain_items.iter_mut() {
let app_id = item_box.app_id();
let app_inst = item_box.app_instance_id();
if let Some(aid) = app_id {
obs.on_app_start(id.clone(), aid, app_inst);
}
let raw = std::ptr::from_mut::<dyn ExecutableItem>(item_box.as_mut());
let started = std::time::Instant::now();
mon.pre_execute(id.clone(), started);
#[allow(unsafe_code)]
let res = run_item_catch_unwind(unsafe { &mut *raw }, &mut ctx);
let took = started.elapsed();
mon.post_execute(id.clone(), started, took, res.is_ok());
if let Err(ref e) = res {
obs.on_app_error(id.clone(), e.as_ref());
}
if app_id.is_some() {
obs.on_app_stop(id.clone());
}
match res {
Ok(crate::ControlFlow::Continue) => {}
Ok(crate::ControlFlow::StopChain) => break,
Err(_) => {
record_first_err(&err_slot, &id, res);
break;
}
}
}
})
}
#[derive(Debug)]
struct PanickedTask(String);
impl core::fmt::Display for PanickedTask {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "task panicked: {}", self.0)
}
}
impl std::error::Error for PanickedTask {}
#[allow(clippy::option_if_let_else)]
fn run_item_catch_unwind(
item: &mut dyn ExecutableItem,
ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| item.execute(ctx))).unwrap_or_else(
|payload| {
let msg = if let Some(s) = payload.downcast_ref::<&str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"panicked task".to_string()
};
Err::<crate::ControlFlow, crate::ItemError>(Box::new(PanickedTask(msg)))
},
)
}
pub(crate) fn run_item_catch_unwind_external(
item: &mut dyn ExecutableItem,
ctx: &mut crate::context::Context<'_>,
) -> crate::ExecuteResult {
run_item_catch_unwind(item, ctx)
}
fn record_first_err(
slot: &Arc<std::sync::Mutex<Option<ExecutorError>>>,
id: &TaskId,
res: crate::ExecuteResult,
) {
if let Err(source) = res {
let mut g = slot.lock().unwrap();
if g.is_none() {
*g = Some(ExecutorError::Item {
task_id: id.clone(),
source,
});
}
}
}
pub struct ExecutorGraphBuilder<'e> {
executor: &'e mut Executor,
builder: crate::graph::GraphBuilder,
custom_id: Option<TaskId>,
}
impl ExecutorGraphBuilder<'_> {
pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> crate::graph::Vertex {
self.builder.vertex(item)
}
pub fn edge(&mut self, from: crate::graph::Vertex, to: crate::graph::Vertex) -> &mut Self {
self.builder.edge(from, to);
self
}
pub const fn root(&mut self, v: crate::graph::Vertex) -> &mut Self {
self.builder.root(v);
self
}
pub fn id(&mut self, id: impl Into<TaskId>) -> &mut Self {
self.custom_id = Some(id.into());
self
}
pub fn build(self) -> Result<TaskId, ExecutorError> {
let g = self.builder.finish()?;
let auto_id = || {
TaskId::new(format!(
"graph-{}",
self.executor.next_id.fetch_add(1, Ordering::SeqCst)
))
};
let id = g
.root_task_id()
.map(TaskId::new)
.or(self.custom_id)
.unwrap_or_else(auto_id);
let decls = g.decls.clone();
let mut graph_box: Box<crate::graph::Graph> = Box::new(g);
graph_box.prepare_dispatch(
id.clone(),
self.executor.stoppable.clone(),
Arc::clone(&self.executor.observer),
Arc::clone(&self.executor.monitor),
Arc::clone(&self.executor.iter_err),
);
self.executor.tasks.push(TaskEntry {
id: id.clone(),
kind: TaskKind::Graph(graph_box),
decls,
job: None,
});
Ok(id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{ControlFlow, item};
#[test]
fn add_returns_unique_ids() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let a = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
let b = exec.add(item(|_| Ok(ControlFlow::Continue))).unwrap();
assert_ne!(a, b);
}
#[test]
fn custom_id_is_preserved() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let id = exec
.add_with_id("my-task", item(|_| Ok(ControlFlow::Continue)))
.unwrap();
assert_eq!(id.as_str(), "my-task");
}
#[test]
fn declare_triggers_called_at_add_time() {
let called = Arc::new(AtomicBool::new(false));
let called_d = Arc::clone(&called);
let it = crate::item::item_with_triggers(
move |_d| {
called_d.store(true, Ordering::SeqCst);
Ok(())
},
|_| Ok(ControlFlow::Continue),
);
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
exec.add(it).unwrap();
assert!(called.load(Ordering::SeqCst));
}
}