taktora_executor/
context.rs1use crate::observer::{Observer, UserEvent};
4use crate::task_id::TaskId;
5use iceoryx2::port::notifier::Notifier as IxNotifier;
6use iceoryx2::prelude::ipc;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
/// Cloneable stop handle built around a shared atomic flag.
///
/// Cloning copies the `Arc`s, not the flag itself, so every clone observes a stop requested
/// through any other clone.
#[derive(Clone)]
pub struct Stoppable {
    /// Shared stop flag; starts out `false` and is set to `true` by [`Stoppable::stop`].
    flag: Arc<AtomicBool>,
    /// Optional notifier that [`Stoppable::stop`] pokes after raising the flag.
    /// `None` for handles created through [`Stoppable::new`] / `Default`.
    waker: Option<Arc<IxNotifier<ipc::Service>>>,
}
20
// `Send` is asserted by hand here rather than derived by the compiler.
//
// NOTE(review): presumably needed because `IxNotifier<ipc::Service>` is not itself
// `Send + Sync`, which would make the `Arc` in `waker` non-`Send` — TODO confirm against the
// iceoryx2 documentation.
//
// SAFETY (to be confirmed): clones of `Stoppable` share the same `Arc`'d notifier, so moving
// clones to other threads is only sound if `notify()` may be called from any thread,
// potentially concurrently. The `flag` field is an `Arc<AtomicBool>` and is `Send` on its own.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Stoppable {}
32
33impl Default for Stoppable {
34 fn default() -> Self {
35 Self {
36 flag: Arc::new(AtomicBool::new(false)),
37 waker: None,
38 }
39 }
40}
41
42impl Stoppable {
43 #[must_use]
46 pub fn new() -> Self {
47 Self::default()
48 }
49
50 #[doc(hidden)]
53 pub(crate) fn with_waker(waker: Arc<IxNotifier<ipc::Service>>) -> Self {
54 Self {
55 flag: Arc::new(AtomicBool::new(false)),
56 waker: Some(waker),
57 }
58 }
59
60 #[track_caller]
63 pub fn stop(&self) {
64 self.flag.store(true, Ordering::Release);
65 if let Some(w) = &self.waker {
66 let _ = w.notify();
67 }
68 }
69
70 #[must_use]
72 pub fn is_stopped(&self) -> bool {
73 self.flag.load(Ordering::Acquire)
74 }
75}
76
77impl core::fmt::Debug for Stoppable {
78 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
79 f.debug_struct("Stoppable")
80 .field("flag", &self.is_stopped())
81 .field("waker", &self.waker.is_some())
82 .finish()
83 }
84}
85
/// Borrowed bundle of a task id, a stop handle and an observer.
///
/// All three references share the lifetime `'a`; the context owns nothing itself.
// NOTE(review): presumably handed to task code by the executor — confirm against callers.
#[non_exhaustive]
pub struct Context<'a> {
    /// Id returned by [`Context::task_id`] and cloned into every event sent via `send_event`.
    task_id: &'a TaskId,
    /// Stop handle used by [`Context::stop_executor`] and cloned by [`Context::stoppable`].
    stop: &'a Stoppable,
    /// Receiver of events forwarded by [`Context::send_event`].
    observer: &'a dyn Observer,
}
93
94impl<'a> Context<'a> {
95 #[doc(hidden)]
97 pub fn new(task_id: &'a TaskId, stop: &'a Stoppable, observer: &'a dyn Observer) -> Self {
98 Self {
99 task_id,
100 stop,
101 observer,
102 }
103 }
104
105 pub const fn task_id(&self) -> &TaskId {
107 self.task_id
108 }
109
110 pub fn stop_executor(&self) {
112 self.stop.stop();
113 }
114
115 pub fn stoppable(&self) -> Stoppable {
117 self.stop.clone()
118 }
119
120 pub fn send_event(&self, ev: UserEvent) {
122 self.observer.on_send_event(self.task_id.clone(), ev);
123 }
124}
125
/// Test-only owner of the values a [`Context`] borrows.
///
/// Holds a `TaskId` and a `Stoppable` so tests can obtain a `Context` via
/// `ContextHarness::context`. Compiled only under `cfg(test)`.
#[cfg(test)]
pub struct ContextHarness {
    /// Task id lent to contexts created from this harness.
    task_id: TaskId,
    /// Stop handle lent to contexts created from this harness.
    stop: Stoppable,
}
132
#[cfg(test)]
impl ContextHarness {
    /// Creates a harness for the given task id with a fresh, not-stopped `Stoppable`.
    pub(crate) fn new(id: impl Into<TaskId>) -> Self {
        let task_id = id.into();
        let stop = Stoppable::new();
        Self { task_id, stop }
    }

    /// Borrows a [`Context`] from this harness.
    ///
    /// The context uses a function-local static `NoopObserver` as its observer.
    pub(crate) fn context(&self) -> Context<'_> {
        use crate::observer::NoopObserver;

        static NOOP: NoopObserver = NoopObserver;

        let observer: &dyn Observer = &NOOP;
        Context::new(&self.task_id, &self.stop, observer)
    }
}