Skip to main content

taktora_executor/
runner.rs

1//! Hosts an [`Executor`] on a dedicated OS thread.
2
3use crate::context::Stoppable;
4use crate::error::ExecutorError;
5use crate::executor::Executor;
6use bitflags::bitflags;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::thread::{self, JoinHandle};
10
bitflags! {
    /// Behaviour flags for [`Runner`].
    ///
    /// Each flag occupies its own bit, so flags can be combined.
    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
    pub struct RunnerFlags: u32 {
        /// Don't auto-start; require an explicit `start()` call.
        ///
        /// The runner thread is still spawned by `Runner::new`, but it waits
        /// for the start signal before entering the executor's run loop.
        const DEFERRED        = 1 << 0;
        /// Reserved — actual signal-on-error wiring lands in Task 22.
        ///
        /// NOTE(review): nothing in the runner code visible here consults
        /// this bit yet; setting it currently appears to have no effect.
        const SIGNAL_ON_ERROR = 1 << 1;
    }
}
21
/// Hosts an [`Executor`] on a dedicated thread.
///
/// The thread is spawned by [`Runner::new`] and joined by [`Runner::stop`].
/// Dropping a `Runner` stops and joins the thread as well.
pub struct Runner {
    /// Join handle of the runner thread. Becomes `None` once `stop()` has
    /// taken it and joined the thread.
    handle: Option<JoinHandle<Result<(), ExecutorError>>>,
    /// Stop handle obtained from the executor before it was moved onto the
    /// runner thread.
    stop: Stoppable,
    /// Sender for the start signal. `Some` only for a deferred runner that
    /// has not been released yet.
    deferred_start: Option<crossbeam_channel::Sender<()>>,
    /// Receives a copy of the error returned by the run loop; the runner
    /// thread writes it through its own clone of the `Arc`.
    ///
    /// NOTE(review): this slot was meant to let `stop()` re-throw the error,
    /// but `stop()` currently returns the thread's join result and never
    /// reads it — hence the `dead_code` allowance below.
    #[allow(dead_code)]
    captured_error: Arc<Mutex<Option<ExecutorError>>>,
    /// Flags this runner was created with.
    flags: RunnerFlags,
}
34
35impl Runner {
36    /// Spawn a runner thread; returns immediately. If `flags.DEFERRED` is set
37    /// the run loop blocks until [`Runner::start`] is invoked.
38    ///
39    /// # Panics
40    ///
41    /// Panics if the internal error-capture mutex is poisoned, which can only
42    /// happen if a previous holder panicked while holding it — an event that
43    /// is not expected under normal use.
44    #[allow(clippy::missing_panics_doc)] // unwrap on Mutex::lock; poisoning is unreachable in practice
45    #[track_caller]
46    pub fn new(exec: Executor, flags: RunnerFlags) -> Result<Self, ExecutorError> {
47        let stop = exec.stoppable();
48        let captured_error = Arc::new(Mutex::new(None::<ExecutorError>));
49        let captured_clone = Arc::clone(&captured_error);
50        let mut exec = exec;
51
52        let (start_tx, start_rx) = crossbeam_channel::bounded::<()>(1);
53        let deferred = flags.contains(RunnerFlags::DEFERRED);
54
55        let handle = thread::Builder::new()
56            .name("taktora-runner".to_owned())
57            .spawn(move || -> Result<(), ExecutorError> {
58                if deferred {
59                    let _ = start_rx.recv();
60                }
61                let res = exec.run();
62                if let Err(e) = &res {
63                    *captured_clone.lock().unwrap() = Some(clone_executor_error(e));
64                }
65                res
66            })
67            .map_err(|e| ExecutorError::Builder(format!("spawn runner: {e}")))?;
68
69        Ok(Self {
70            handle: Some(handle),
71            stop,
72            deferred_start: if deferred { Some(start_tx) } else { None },
73            captured_error,
74            flags,
75        })
76    }
77
78    /// Resume a deferred runner. No-op if the runner was not deferred.
79    pub fn start(&mut self) -> Result<(), ExecutorError> {
80        if let Some(tx) = self.deferred_start.take() {
81            tx.send(()).map_err(|_| ExecutorError::RunnerJoin)?;
82        }
83        Ok(())
84    }
85
86    /// Stop the runner and re-throw any captured item error.
87    pub fn stop(&mut self) -> Result<(), ExecutorError> {
88        if self.deferred_start.is_some() {
89            // Releasing the deferred wait so the thread can run + observe stop.
90            let _ = self.start();
91        }
92        self.stop.stop();
93        self.handle.take().map_or_else(
94            || Ok(()),
95            |handle| match handle.join() {
96                Ok(Ok(())) => Ok(()),
97                Ok(Err(e)) => Err(e),
98                Err(_) => Err(ExecutorError::RunnerJoin),
99            },
100        )
101    }
102
103    /// Get a [`Stoppable`] for sharing into other threads.
104    pub fn stoppable(&self) -> Stoppable {
105        self.stop.clone()
106    }
107}
108
109impl Drop for Runner {
110    fn drop(&mut self) {
111        // Best-effort: stop and surface error via tracing or stderr; never panic.
112        match self.stop() {
113            Ok(()) => {}
114            Err(e) => {
115                #[cfg(feature = "tracing")]
116                tracing::error!(target: "taktora-executor", error = %e, "Runner dropped with error");
117                #[cfg(not(feature = "tracing"))]
118                eprintln!("[taktora-executor] runner dropped with error: {e}");
119                let _ = self.flags;
120            }
121        }
122    }
123}
124
125fn clone_executor_error(e: &ExecutorError) -> ExecutorError {
126    // Errors don't impl Clone (ItemError is dyn Error); we synthesise an
127    // equivalent description.
128    match e {
129        ExecutorError::Iceoryx2(s) => ExecutorError::Iceoryx2(s.clone()),
130        ExecutorError::InvalidGraph(s) => ExecutorError::InvalidGraph(s.clone()),
131        ExecutorError::DeclareTriggers(s) => ExecutorError::DeclareTriggers(s.clone()),
132        ExecutorError::Item { task_id, source } => ExecutorError::Item {
133            task_id: task_id.clone(),
134            source: Box::new(StringError(source.to_string())),
135        },
136        ExecutorError::AlreadyRunning => ExecutorError::AlreadyRunning,
137        ExecutorError::RunnerJoin => ExecutorError::RunnerJoin,
138        ExecutorError::Builder(s) => ExecutorError::Builder(s.clone()),
139    }
140}
141
/// Minimal error type that carries only a message.
///
/// Stands in for an error whose original value cannot be cloned.
#[derive(Debug)]
struct StringError(String);

impl core::fmt::Display for StringError {
    /// Writes the stored message verbatim, ignoring width and fill options.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

// The default `source()` applies: no underlying cause is retained.
impl std::error::Error for StringError {}