use crate::context::Stoppable;
use crate::error::ExecutorError;
use crate::executor::Executor;
use bitflags::bitflags;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::{self, JoinHandle};
bitflags! {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RunnerFlags: u32 {
const DEFERRED = 1 << 0;
const SIGNAL_ON_ERROR = 1 << 1;
}
}
pub struct Runner {
handle: Option<JoinHandle<Result<(), ExecutorError>>>,
stop: Stoppable,
deferred_start: Option<crossbeam_channel::Sender<()>>,
#[allow(dead_code)]
captured_error: Arc<Mutex<Option<ExecutorError>>>,
flags: RunnerFlags,
}
impl Runner {
#[allow(clippy::missing_panics_doc)] #[track_caller]
pub fn new(exec: Executor, flags: RunnerFlags) -> Result<Self, ExecutorError> {
let stop = exec.stoppable();
let captured_error = Arc::new(Mutex::new(None::<ExecutorError>));
let captured_clone = Arc::clone(&captured_error);
let mut exec = exec;
let (start_tx, start_rx) = crossbeam_channel::bounded::<()>(1);
let deferred = flags.contains(RunnerFlags::DEFERRED);
let handle = thread::Builder::new()
.name("taktora-runner".to_owned())
.spawn(move || -> Result<(), ExecutorError> {
if deferred {
let _ = start_rx.recv();
}
let res = exec.run();
if let Err(e) = &res {
*captured_clone.lock().unwrap() = Some(clone_executor_error(e));
}
res
})
.map_err(|e| ExecutorError::Builder(format!("spawn runner: {e}")))?;
Ok(Self {
handle: Some(handle),
stop,
deferred_start: if deferred { Some(start_tx) } else { None },
captured_error,
flags,
})
}
pub fn start(&mut self) -> Result<(), ExecutorError> {
if let Some(tx) = self.deferred_start.take() {
tx.send(()).map_err(|_| ExecutorError::RunnerJoin)?;
}
Ok(())
}
pub fn stop(&mut self) -> Result<(), ExecutorError> {
if self.deferred_start.is_some() {
let _ = self.start();
}
self.stop.stop();
self.handle.take().map_or_else(
|| Ok(()),
|handle| match handle.join() {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e),
Err(_) => Err(ExecutorError::RunnerJoin),
},
)
}
pub fn stoppable(&self) -> Stoppable {
self.stop.clone()
}
}
impl Drop for Runner {
fn drop(&mut self) {
match self.stop() {
Ok(()) => {}
Err(e) => {
#[cfg(feature = "tracing")]
tracing::error!(target: "taktora-executor", error = %e, "Runner dropped with error");
#[cfg(not(feature = "tracing"))]
eprintln!("[taktora-executor] runner dropped with error: {e}");
let _ = self.flags;
}
}
}
}
fn clone_executor_error(e: &ExecutorError) -> ExecutorError {
match e {
ExecutorError::Iceoryx2(s) => ExecutorError::Iceoryx2(s.clone()),
ExecutorError::InvalidGraph(s) => ExecutorError::InvalidGraph(s.clone()),
ExecutorError::DeclareTriggers(s) => ExecutorError::DeclareTriggers(s.clone()),
ExecutorError::Item { task_id, source } => ExecutorError::Item {
task_id: task_id.clone(),
source: Box::new(StringError(source.to_string())),
},
ExecutorError::AlreadyRunning => ExecutorError::AlreadyRunning,
ExecutorError::RunnerJoin => ExecutorError::RunnerJoin,
ExecutorError::Builder(s) => ExecutorError::Builder(s.clone()),
}
}
#[derive(Debug)]
struct StringError(String);
impl core::fmt::Display for StringError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_str(&self.0)
}
}
impl std::error::Error for StringError {}