taktora_executor/channel.rs
1//! `Channel<T>` — iceoryx2 publish/subscribe paired with an event service so
2//! that subscribers can be attached as triggers on the executor's `WaitSet`.
3
4use crate::error::ExecutorError;
5use crate::payload::Payload;
6use iceoryx2::port::listener::Listener as IxListener;
7use iceoryx2::port::notifier::Notifier as IxNotifier;
8use iceoryx2::port::publisher::Publisher as IxPublisher;
9use iceoryx2::port::subscriber::Subscriber as IxSubscriber;
10use iceoryx2::prelude::*;
11use iceoryx2::sample::Sample as IxSample;
12use std::sync::Arc;
13
/// Result of a single publish attempt made through a [`Publisher`].
///
/// Every send method — [`Publisher::send_copy`], [`Publisher::loan_send`] and
/// [`Publisher::loan`] — hands one of these back on success.
///
/// `listeners_notified` is the programmatic way to spot dropped
/// notifications. If it is smaller than the number of subscribers known to be
/// attached, at least one listener's kernel socket buffer was full when the
/// publisher tried to wake it. iceoryx2 *also* logs a `FailedToDeliverSignal`
/// warning for each dropped delivery; this struct exposes the same condition
/// without having to parse logs.
///
/// `listeners_notified == 0` is *not* necessarily an error: it only says that
/// no listener was attached at the moment of notification (for example, no
/// subscriber exists yet, which is normal during startup).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct NotifyOutcome {
    /// Whether the message was actually published. For `loan_send` and
    /// `loan` this mirrors the closure's return value: `false` means the
    /// closure asked to skip the send, so no payload went out and no
    /// notification fired.
    pub sent: bool,

    /// How many listeners the notification was successfully delivered to.
    /// Always `0` when `sent == false`. Under back-pressure this can be lower
    /// than the expected listener count — see the type-level docs.
    pub listeners_notified: usize,
}

impl NotifyOutcome {
    /// Shorthand for "the message went out AND woke at least one listener".
    /// Handy when a test wants to assert end-to-end pickup.
    #[must_use]
    pub const fn delivered_to_any_listener(self) -> bool {
        let Self {
            sent,
            listeners_notified,
        } = self;
        sent && listeners_notified != 0
    }
}
49
/// Reserved suffix that turns a topic name into the name of its paired event
/// service.
///
/// The event service belonging to `<topic>` is named `<topic><EVENT_SUFFIX>`.
/// That name is owned by `Channel<T>`: users must not open an event service
/// there themselves through iceoryx2 directly.
pub const EVENT_SUFFIX: &str = ".__taktora_event";
55
// Service type shared by every port and port factory in this module; naming
// it once keeps the long iceoryx2 generic signatures below readable.
type IpcService = ipc::Service;
57
58/// Pub/sub channel with a paired event service.
59pub struct Channel<T: core::fmt::Debug + ZeroCopySend + 'static> {
60 pubsub: iceoryx2::service::port_factory::publish_subscribe::PortFactory<IpcService, T, ()>,
61 event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
62}
63
64impl<T: Payload> Channel<T> {
65 /// Open or create the channel by topic name.
66 pub fn open_or_create(
67 node: &iceoryx2::node::Node<IpcService>,
68 topic: &str,
69 ) -> Result<Arc<Self>, ExecutorError> {
70 let pubsub_name = topic
71 .try_into()
72 .map_err(|e| ExecutorError::Builder(format!("invalid topic name: {e:?}")))?;
73 let pubsub = node
74 .service_builder(&pubsub_name)
75 .publish_subscribe::<T>()
76 .open_or_create()
77 .map_err(ExecutorError::iceoryx2)?;
78
79 let event_topic = format!("{topic}{EVENT_SUFFIX}");
80 let event_name = event_topic
81 .as_str()
82 .try_into()
83 .map_err(|e| ExecutorError::Builder(format!("invalid event-topic name: {e:?}")))?;
84 let event = node
85 .service_builder(&event_name)
86 .event()
87 .open_or_create()
88 .map_err(ExecutorError::iceoryx2)?;
89
90 Ok(Arc::new(Self { pubsub, event }))
91 }
92
93 /// Create a new publisher attached to this channel.
94 pub fn publisher(self: &Arc<Self>) -> Result<Publisher<T>, ExecutorError> {
95 let inner = self
96 .pubsub
97 .publisher_builder()
98 .create()
99 .map_err(ExecutorError::iceoryx2)?;
100 let notifier = self
101 .event
102 .notifier_builder()
103 .create()
104 .map_err(ExecutorError::iceoryx2)?;
105 Ok(Publisher { inner, notifier })
106 }
107
108 /// Create a new subscriber attached to this channel.
109 pub fn subscriber(self: &Arc<Self>) -> Result<Subscriber<T>, ExecutorError> {
110 let inner = self
111 .pubsub
112 .subscriber_builder()
113 .create()
114 .map_err(ExecutorError::iceoryx2)?;
115 let listener = self
116 .event
117 .listener_builder()
118 .create()
119 .map_err(ExecutorError::iceoryx2)?;
120 // SAFETY: iceoryx2's `Listener<ipc::Service>` is conditionally
121 // `Send + Sync` (the impl exists but clippy cannot verify the concrete
122 // service type satisfies the bounds at this generic call site).
123 #[allow(clippy::arc_with_non_send_sync)]
124 let listener = Arc::new(listener);
125 Ok(Subscriber { inner, listener })
126 }
127}
128
129/// Pub/sub publisher that auto-notifies the paired event service on every send.
130pub struct Publisher<T: core::fmt::Debug + ZeroCopySend + 'static> {
131 inner: IxPublisher<IpcService, T, ()>,
132 notifier: IxNotifier<IpcService>,
133}
134
135// SAFETY: same rationale as `Subscriber<T>` above. `IxPublisher` is
136// `!Send` only because of the same `SingleThreaded` Rc; after port
137// creation, `publisher.send_copy(...)` and `publisher.loan_send(...)`
138// don't touch the Rc concurrently. Move-only, no Sync.
139#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
140unsafe impl<T: core::fmt::Debug + ZeroCopySend + 'static> Send for Publisher<T> {}
141
142impl<T: Payload + Copy> Publisher<T> {
143 /// Send by value (copies). Notifies the paired event service on success.
144 ///
145 /// Returns a [`NotifyOutcome`] whose `listeners_notified` field reports how
146 /// many listeners actually received the wakeup. A value less than the
147 /// number of attached subscribers means at least one listener's kernel
148 /// socket buffer was full at notification time (the *data* is still
149 /// delivered — only the wakeup signal was dropped). See [`NotifyOutcome`]
150 /// for guidance on interpreting this value.
151 pub fn send_copy(&self, value: T) -> Result<NotifyOutcome, ExecutorError> {
152 self.inner
153 .send_copy(value)
154 .map_err(ExecutorError::iceoryx2)?;
155 let listeners_notified = self.notifier.notify().map_err(ExecutorError::iceoryx2)?;
156 Ok(NotifyOutcome {
157 sent: true,
158 listeners_notified,
159 })
160 }
161}
162
163impl<T: Payload> Publisher<T> {
164 /// # Example
165 ///
166 /// ```no_run
167 /// use iceoryx2::prelude::*;
168 /// use taktora_executor::Channel;
169 /// use std::sync::Arc;
170 ///
171 /// #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
172 /// #[repr(C)]
173 /// struct Tick(u64);
174 ///
175 /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
176 /// let node = NodeBuilder::new().create::<ipc::Service>()?;
177 /// let ch: Arc<Channel<Tick>> = Channel::open_or_create(&node, "demo")?;
178 /// let publisher = ch.publisher()?;
179 ///
180 /// let _ = publisher.loan_send(|t: &mut Tick| { t.0 = 1; true })?;
181 /// # Ok(()) }
182 /// ```
183 ///
184 /// Loan a sample initialised to `T::default()`, run `f` to fill it, then
185 /// send + notify. Returns a [`NotifyOutcome`] with `sent == false` if `f`
186 /// returns `false` — caller signalled "skip send".
187 ///
188 /// When `sent == true`, `listeners_notified` reports how many listeners
189 /// received the wakeup. A value less than the number of attached subscribers
190 /// means at least one listener's kernel socket buffer was full (dropped
191 /// wakeup — data is still delivered). See [`NotifyOutcome`] for details.
192 ///
193 /// `T: Default` is required here because the shared-memory slot is
194 /// pre-initialised via `T::default()` before the closure runs. For types
195 /// that do not implement `Default`, use [`loan`](Self::loan) instead.
196 pub fn loan_send<F>(&self, f: F) -> Result<NotifyOutcome, ExecutorError>
197 where
198 T: Default,
199 F: FnOnce(&mut T) -> bool,
200 {
201 let sample = self.inner.loan_uninit().map_err(ExecutorError::iceoryx2)?;
202 let mut sample = sample.write_payload(T::default());
203 let cont = f(sample.payload_mut());
204 if !cont {
205 return Ok(NotifyOutcome {
206 sent: false,
207 listeners_notified: 0,
208 });
209 }
210 sample.send().map_err(ExecutorError::iceoryx2)?;
211 let listeners_notified = self.notifier.notify().map_err(ExecutorError::iceoryx2)?;
212 Ok(NotifyOutcome {
213 sent: true,
214 listeners_notified,
215 })
216 }
217
218 /// True zero-copy send. The closure receives `&mut MaybeUninit<T>`; it
219 /// must fully initialize the payload (e.g., via `MaybeUninit::write(v)`
220 /// or in-place construction such as iceoryx2's `placement_default!`)
221 /// before returning `true`. Returning `false` skips the send.
222 ///
223 /// On success, sends and notifies. Returns a [`NotifyOutcome`] with
224 /// `sent == true` if the payload was sent, `sent == false` if the closure
225 /// returned `false`. When `sent == true`, `listeners_notified` reports how
226 /// many listeners received the wakeup — see [`NotifyOutcome`] for details.
227 ///
228 /// # Contract
229 ///
230 /// **Returning `true` from the closure asserts that the payload is
231 /// fully initialized.** Returning `true` without writing a valid `T`
232 /// causes undefined behaviour at the subsequent `assume_init` step.
233 ///
234 /// `T: Default` is **not** required — that's the point of this method
235 /// versus [`loan_send`](Self::loan_send). For types that have a sensible
236 /// `Default` and are cheap to default-construct, prefer `loan_send`.
237 ///
238 /// # Example
239 ///
240 /// ```no_run
241 /// use core::mem::MaybeUninit;
242 /// use iceoryx2::prelude::*;
243 /// use taktora_executor::Channel;
244 /// use std::sync::Arc;
245 ///
246 /// #[derive(Debug, ZeroCopySend)]
247 /// #[repr(C)]
248 /// struct LargeMsg { payload: [u8; 64] }
249 ///
250 /// // Manual Default impl — e.g. initialised to a sentinel value rather
251 /// // than zero, so `loan_send` would use it but it is expensive.
252 /// impl Default for LargeMsg {
253 /// fn default() -> Self { LargeMsg { payload: [0xFF; 64] } }
254 /// }
255 ///
256 /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
257 /// let node = NodeBuilder::new().create::<ipc::Service>()?;
258 /// let ch: Arc<Channel<LargeMsg>> = Channel::open_or_create(&node, "demo")?;
259 /// let publisher = ch.publisher()?;
260 ///
261 /// let _ = publisher.loan(|slot: &mut MaybeUninit<LargeMsg>| {
262 /// // Initialise directly in shared memory — no Default construction,
263 /// // no stack temporary for the payload array.
264 /// slot.write(LargeMsg { payload: [0u8; 64] });
265 /// true
266 /// })?;
267 /// # Ok(()) }
268 /// ```
269 #[allow(unsafe_code)]
270 pub fn loan<F>(&self, f: F) -> Result<NotifyOutcome, ExecutorError>
271 where
272 F: FnOnce(&mut core::mem::MaybeUninit<T>) -> bool,
273 {
274 let mut sample = self.inner.loan_uninit().map_err(ExecutorError::iceoryx2)?;
275 let cont = f(sample.payload_mut());
276 if !cont {
277 return Ok(NotifyOutcome {
278 sent: false,
279 listeners_notified: 0,
280 });
281 }
282 // SAFETY: the closure returned `true`, asserting that the payload was
283 // fully initialised before this point. Per the documented contract,
284 // a closure that returns `true` without writing a valid `T` is a
285 // contract violation and the resulting behaviour is undefined.
286 let sample = unsafe { sample.assume_init() };
287 sample.send().map_err(ExecutorError::iceoryx2)?;
288 let listeners_notified = self.notifier.notify().map_err(ExecutorError::iceoryx2)?;
289 Ok(NotifyOutcome {
290 sent: true,
291 listeners_notified,
292 })
293 }
294}
295
296/// Pub/sub subscriber. Carries the paired event listener as `Arc<Listener>`
297/// so the executor can attach it to its `WaitSet`.
298pub struct Subscriber<T: core::fmt::Debug + ZeroCopySend + 'static> {
299 inner: IxSubscriber<IpcService, T, ()>,
300 listener: Arc<IxListener<IpcService>>,
301}
302
303// SAFETY:
304// `IxSubscriber<ipc::Service, T, ()>` is `!Send` because the `ipc::Service`
305// `ArcThreadSafetyPolicy` is `SingleThreaded`, which holds an `Rc<...>`.
306// The Rc is mutated only when methods that call `lock()` on the policy
307// run — primarily during port creation. After construction, the executor
308// only invokes:
309// * `subscriber.take()` → `IxSubscriber::receive()` (does not touch the
310// listener's Rc; pure shared-memory read path)
311// * `subscriber.listener_handle()` → cheap `Arc::clone` (own Arc, not iceoryx2's Rc)
312// No two threads concurrently mutate the same Rc refcount, so moving a
313// `Subscriber` to a pool worker is sound. We do not implement `Sync`;
314// `Subscriber` is move-only across threads, never shared.
315#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
316unsafe impl<T: core::fmt::Debug + ZeroCopySend + 'static> Send for Subscriber<T> {}
317
318impl<T: Payload> Subscriber<T> {
319 /// Take the next sample, if any.
320 pub fn take(&self) -> Result<Option<IxSample<IpcService, T, ()>>, ExecutorError> {
321 self.inner.receive().map_err(ExecutorError::iceoryx2)
322 }
323
324 /// Borrow the listener handle (executor uses this for trigger attachment).
325 #[doc(hidden)]
326 pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
327 Arc::clone(&self.listener)
328 }
329}