Skip to main content

taktora_executor/
context.rs

1//! Per-invocation context handed to [`ExecutableItem::execute`].
2
3use crate::observer::{Observer, UserEvent};
4use crate::task_id::TaskId;
5use iceoryx2::port::notifier::Notifier as IxNotifier;
6use iceoryx2::prelude::ipc;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10/// Shared stop flag passed via [`Context::stoppable`].
11///
12/// Cloneable, thread-safe. Setting it asks the executor to terminate the
13/// run loop after the current iteration completes — and, if the handle
14/// was bound to a running executor, also wakes the `WaitSet` immediately.
15#[derive(Clone)]
16pub struct Stoppable {
17    flag: Arc<AtomicBool>,
18    waker: Option<Arc<IxNotifier<ipc::Service>>>,
19}
20
// SAFETY: `IxNotifier<ipc::Service>` is `!Send` only because `ipc::Service`
// uses `SingleThreaded` (an `Rc`-backed arc policy) whose reference count is
// mutated only at port-construction time.  Once the notifier has been created
// and wrapped in `Arc`, the only operation this module performs on it is
// `notifier.notify()` (see `Stoppable::stop`), which does not touch the `Rc`
// refcount — it writes into a lock-free shared-memory ring.  No
// `&mut Notifier` is ever handed out through a `Stoppable`.
//
// NOTE(review): `Stoppable` is `Clone`, so clones that share the same
// `Arc<IxNotifier>` can be moved to different threads and call `notify()`
// concurrently; not implementing `Sync` for `Stoppable` does not prevent
// that.  The last clone may also be dropped on a thread other than the one
// that built the notifier.  Soundness therefore rests on `notify()` and the
// notifier's `Drop` being safe to run from any thread — TODO confirm against
// the iceoryx2 version in use (a thread-safe service variant, if one is
// available, would remove the need for this impl).
//
// The `allow` silences the `unsafe_code` lint and clippy's
// `non_send_fields_in_send_ty`, which would otherwise flag this deliberate impl.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Stoppable {}
32
33impl Default for Stoppable {
34    fn default() -> Self {
35        Self {
36            flag: Arc::new(AtomicBool::new(false)),
37            waker: None,
38        }
39    }
40}
41
42impl Stoppable {
43    /// Create a fresh, un-stopped handle with no wakeup wired.
44    /// Useful for tests; the executor uses `with_waker` to bind a notifier.
45    #[must_use]
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// Internal constructor — the executor injects a notifier so `stop()`
51    /// wakes the WaitSet thread.
52    #[doc(hidden)]
53    pub(crate) fn with_waker(waker: Arc<IxNotifier<ipc::Service>>) -> Self {
54        Self {
55            flag: Arc::new(AtomicBool::new(false)),
56            waker: Some(waker),
57        }
58    }
59
60    /// Request stop. Flips the flag (Release) and, if a waker was bound,
61    /// notifies the `WaitSet` so it returns from `wait_and_process` promptly.
62    #[track_caller]
63    pub fn stop(&self) {
64        self.flag.store(true, Ordering::Release);
65        if let Some(w) = &self.waker {
66            let _ = w.notify();
67        }
68    }
69
70    /// Check whether stop has been requested.
71    #[must_use]
72    pub fn is_stopped(&self) -> bool {
73        self.flag.load(Ordering::Acquire)
74    }
75}
76
77impl core::fmt::Debug for Stoppable {
78    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
79        f.debug_struct("Stoppable")
80            .field("flag", &self.is_stopped())
81            .field("waker", &self.waker.is_some())
82            .finish()
83    }
84}
85
86/// Per-invocation context. Borrowed view; not stored across calls.
87#[non_exhaustive]
88pub struct Context<'a> {
89    task_id: &'a TaskId,
90    stop: &'a Stoppable,
91    observer: &'a dyn Observer,
92}
93
94impl<'a> Context<'a> {
95    /// Internal constructor used by the executor and the test harness.
96    #[doc(hidden)]
97    pub fn new(task_id: &'a TaskId, stop: &'a Stoppable, observer: &'a dyn Observer) -> Self {
98        Self {
99            task_id,
100            stop,
101            observer,
102        }
103    }
104
105    /// Identifier of the task currently executing.
106    pub const fn task_id(&self) -> &TaskId {
107        self.task_id
108    }
109
110    /// Request the enclosing executor to stop.
111    pub fn stop_executor(&self) {
112        self.stop.stop();
113    }
114
115    /// Get a clonable [`Stoppable`] handle that other threads may hold.
116    pub fn stoppable(&self) -> Stoppable {
117        self.stop.clone()
118    }
119
120    /// Forward a user event to the observer (no-op if no observer is configured).
121    pub fn send_event(&self, ev: UserEvent) {
122        self.observer.on_send_event(self.task_id.clone(), ev);
123    }
124}
125
/// Owns the pieces a [`Context`] borrows so tests can build one without an
/// executor. Compiled for test builds only.
#[cfg(test)]
pub struct ContextHarness {
    /// Task id lent to the contexts this harness produces.
    task_id: TaskId,
    /// Stop handle lent to the contexts this harness produces.
    stop: Stoppable,
}
132
#[cfg(test)]
impl ContextHarness {
    /// Creates a harness for the given task id with a fresh, un-stopped handle.
    pub(crate) fn new(id: impl Into<TaskId>) -> Self {
        let task_id = id.into();
        let stop = Stoppable::new();
        Self { task_id, stop }
    }

    /// Borrows a [`Context`] backed by this harness and a shared `NoopObserver`.
    pub(crate) fn context(&self) -> Context<'_> {
        static NOOP: crate::observer::NoopObserver = crate::observer::NoopObserver;
        let Self { task_id, stop } = self;
        Context::new(task_id, stop, &NOOP)
    }
}