taktora_executor/
runner.rs1use crate::context::Stoppable;
4use crate::error::ExecutorError;
5use crate::executor::Executor;
6use bitflags::bitflags;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::thread::{self, JoinHandle};
10
11bitflags! {
12 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
14 pub struct RunnerFlags: u32 {
15 const DEFERRED = 1 << 0;
17 const SIGNAL_ON_ERROR = 1 << 1;
19 }
20}
21
22pub struct Runner {
24 handle: Option<JoinHandle<Result<(), ExecutorError>>>,
25 stop: Stoppable,
26 deferred_start: Option<crossbeam_channel::Sender<()>>,
27 #[allow(dead_code)]
31 captured_error: Arc<Mutex<Option<ExecutorError>>>,
32 flags: RunnerFlags,
33}
34
35impl Runner {
36 #[allow(clippy::missing_panics_doc)] #[track_caller]
46 pub fn new(exec: Executor, flags: RunnerFlags) -> Result<Self, ExecutorError> {
47 let stop = exec.stoppable();
48 let captured_error = Arc::new(Mutex::new(None::<ExecutorError>));
49 let captured_clone = Arc::clone(&captured_error);
50 let mut exec = exec;
51
52 let (start_tx, start_rx) = crossbeam_channel::bounded::<()>(1);
53 let deferred = flags.contains(RunnerFlags::DEFERRED);
54
55 let handle = thread::Builder::new()
56 .name("taktora-runner".to_owned())
57 .spawn(move || -> Result<(), ExecutorError> {
58 if deferred {
59 let _ = start_rx.recv();
60 }
61 let res = exec.run();
62 if let Err(e) = &res {
63 *captured_clone.lock().unwrap() = Some(clone_executor_error(e));
64 }
65 res
66 })
67 .map_err(|e| ExecutorError::Builder(format!("spawn runner: {e}")))?;
68
69 Ok(Self {
70 handle: Some(handle),
71 stop,
72 deferred_start: if deferred { Some(start_tx) } else { None },
73 captured_error,
74 flags,
75 })
76 }
77
78 pub fn start(&mut self) -> Result<(), ExecutorError> {
80 if let Some(tx) = self.deferred_start.take() {
81 tx.send(()).map_err(|_| ExecutorError::RunnerJoin)?;
82 }
83 Ok(())
84 }
85
86 pub fn stop(&mut self) -> Result<(), ExecutorError> {
88 if self.deferred_start.is_some() {
89 let _ = self.start();
91 }
92 self.stop.stop();
93 self.handle.take().map_or_else(
94 || Ok(()),
95 |handle| match handle.join() {
96 Ok(Ok(())) => Ok(()),
97 Ok(Err(e)) => Err(e),
98 Err(_) => Err(ExecutorError::RunnerJoin),
99 },
100 )
101 }
102
103 pub fn stoppable(&self) -> Stoppable {
105 self.stop.clone()
106 }
107}
108
109impl Drop for Runner {
110 fn drop(&mut self) {
111 match self.stop() {
113 Ok(()) => {}
114 Err(e) => {
115 #[cfg(feature = "tracing")]
116 tracing::error!(target: "taktora-executor", error = %e, "Runner dropped with error");
117 #[cfg(not(feature = "tracing"))]
118 eprintln!("[taktora-executor] runner dropped with error: {e}");
119 let _ = self.flags;
120 }
121 }
122 }
123}
124
125fn clone_executor_error(e: &ExecutorError) -> ExecutorError {
126 match e {
129 ExecutorError::Iceoryx2(s) => ExecutorError::Iceoryx2(s.clone()),
130 ExecutorError::InvalidGraph(s) => ExecutorError::InvalidGraph(s.clone()),
131 ExecutorError::DeclareTriggers(s) => ExecutorError::DeclareTriggers(s.clone()),
132 ExecutorError::Item { task_id, source } => ExecutorError::Item {
133 task_id: task_id.clone(),
134 source: Box::new(StringError(source.to_string())),
135 },
136 ExecutorError::AlreadyRunning => ExecutorError::AlreadyRunning,
137 ExecutorError::RunnerJoin => ExecutorError::RunnerJoin,
138 ExecutorError::Builder(s) => ExecutorError::Builder(s.clone()),
139 }
140}
141
142#[derive(Debug)]
143struct StringError(String);
144impl core::fmt::Display for StringError {
145 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
146 f.write_str(&self.0)
147 }
148}
149impl std::error::Error for StringError {}