Skip to main content

taktora_executor/
channel.rs

1//! `Channel<T>` — iceoryx2 publish/subscribe paired with an event service so
2//! that subscribers can be attached as triggers on the executor's `WaitSet`.
3
4use crate::error::ExecutorError;
5use crate::payload::Payload;
6use iceoryx2::port::listener::Listener as IxListener;
7use iceoryx2::port::notifier::Notifier as IxNotifier;
8use iceoryx2::port::publisher::Publisher as IxPublisher;
9use iceoryx2::port::subscriber::Subscriber as IxSubscriber;
10use iceoryx2::prelude::*;
11use iceoryx2::sample::Sample as IxSample;
12use std::sync::Arc;
13
14/// Outcome of a [`Publisher`] send operation.
15///
16/// Returned by [`Publisher::send_copy`], [`Publisher::loan_send`], and
17/// [`Publisher::loan`]. Inspect `listeners_notified` to detect dropped
18/// notifications: a value smaller than the number of subscribers known to be
19/// attached indicates that at least one listener's kernel socket buffer was
20/// full when the publisher tried to wake it. iceoryx2 will *also* log a
21/// `FailedToDeliverSignal` warning per dropped delivery; this struct lets
22/// callers detect the same condition programmatically without parsing logs.
23///
24/// Note that `listeners_notified == 0` is *not* always a problem — it just
25/// means no listeners were attached at the moment of notification (e.g.
26/// no subscribers exist yet, which is normal during startup).
27#[non_exhaustive]
28#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
29pub struct NotifyOutcome {
30    /// `true` if the message was published. For `loan_send` and `loan`, this
31    /// reflects the closure's return value: `false` means the closure asked
32    /// to skip the send (no payload was sent and no notification fired).
33    pub sent: bool,
34
35    /// Number of listeners the notification was successfully delivered to.
36    /// Always `0` when `sent == false`. May be less than the expected listener
37    /// count under back-pressure — see the type-level docs.
38    pub listeners_notified: usize,
39}
40
41impl NotifyOutcome {
42    /// Convenience: `true` iff the message was sent AND at least one listener
43    /// was woken. Useful for asserting end-to-end pickup in tests.
44    #[must_use]
45    pub const fn delivered_to_any_listener(self) -> bool {
46        self.sent && self.listeners_notified > 0
47    }
48}
49
50/// Suffix appended to a topic name to form the paired event-service name.
51///
52/// `Channel<T>` reserves this suffix; users must not open an event service
53/// at `<topic><EVENT_SUFFIX>` themselves through iceoryx2 directly.
54pub const EVENT_SUFFIX: &str = ".__taktora_event";
55
56type IpcService = ipc::Service;
57
58/// Pub/sub channel with a paired event service.
59pub struct Channel<T: core::fmt::Debug + ZeroCopySend + 'static> {
60    pubsub: iceoryx2::service::port_factory::publish_subscribe::PortFactory<IpcService, T, ()>,
61    event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
62}
63
64impl<T: Payload> Channel<T> {
65    /// Open or create the channel by topic name.
66    pub fn open_or_create(
67        node: &iceoryx2::node::Node<IpcService>,
68        topic: &str,
69    ) -> Result<Arc<Self>, ExecutorError> {
70        let pubsub_name = topic
71            .try_into()
72            .map_err(|e| ExecutorError::Builder(format!("invalid topic name: {e:?}")))?;
73        let pubsub = node
74            .service_builder(&pubsub_name)
75            .publish_subscribe::<T>()
76            .open_or_create()
77            .map_err(ExecutorError::iceoryx2)?;
78
79        let event_topic = format!("{topic}{EVENT_SUFFIX}");
80        let event_name = event_topic
81            .as_str()
82            .try_into()
83            .map_err(|e| ExecutorError::Builder(format!("invalid event-topic name: {e:?}")))?;
84        let event = node
85            .service_builder(&event_name)
86            .event()
87            .open_or_create()
88            .map_err(ExecutorError::iceoryx2)?;
89
90        Ok(Arc::new(Self { pubsub, event }))
91    }
92
93    /// Create a new publisher attached to this channel.
94    pub fn publisher(self: &Arc<Self>) -> Result<Publisher<T>, ExecutorError> {
95        let inner = self
96            .pubsub
97            .publisher_builder()
98            .create()
99            .map_err(ExecutorError::iceoryx2)?;
100        let notifier = self
101            .event
102            .notifier_builder()
103            .create()
104            .map_err(ExecutorError::iceoryx2)?;
105        Ok(Publisher { inner, notifier })
106    }
107
108    /// Create a new subscriber attached to this channel.
109    pub fn subscriber(self: &Arc<Self>) -> Result<Subscriber<T>, ExecutorError> {
110        let inner = self
111            .pubsub
112            .subscriber_builder()
113            .create()
114            .map_err(ExecutorError::iceoryx2)?;
115        let listener = self
116            .event
117            .listener_builder()
118            .create()
119            .map_err(ExecutorError::iceoryx2)?;
120        // SAFETY: iceoryx2's `Listener<ipc::Service>` is conditionally
121        // `Send + Sync` (the impl exists but clippy cannot verify the concrete
122        // service type satisfies the bounds at this generic call site).
123        #[allow(clippy::arc_with_non_send_sync)]
124        let listener = Arc::new(listener);
125        Ok(Subscriber { inner, listener })
126    }
127}
128
129/// Pub/sub publisher that auto-notifies the paired event service on every send.
130pub struct Publisher<T: core::fmt::Debug + ZeroCopySend + 'static> {
131    inner: IxPublisher<IpcService, T, ()>,
132    notifier: IxNotifier<IpcService>,
133}
134
135// SAFETY: same rationale as `Subscriber<T>` above. `IxPublisher` is
136// `!Send` only because of the same `SingleThreaded` Rc; after port
137// creation, `publisher.send_copy(...)` and `publisher.loan_send(...)`
138// don't touch the Rc concurrently. Move-only, no Sync.
139#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
140unsafe impl<T: core::fmt::Debug + ZeroCopySend + 'static> Send for Publisher<T> {}
141
142impl<T: Payload + Copy> Publisher<T> {
143    /// Send by value (copies). Notifies the paired event service on success.
144    ///
145    /// Returns a [`NotifyOutcome`] whose `listeners_notified` field reports how
146    /// many listeners actually received the wakeup. A value less than the
147    /// number of attached subscribers means at least one listener's kernel
148    /// socket buffer was full at notification time (the *data* is still
149    /// delivered — only the wakeup signal was dropped). See [`NotifyOutcome`]
150    /// for guidance on interpreting this value.
151    pub fn send_copy(&self, value: T) -> Result<NotifyOutcome, ExecutorError> {
152        self.inner
153            .send_copy(value)
154            .map_err(ExecutorError::iceoryx2)?;
155        let listeners_notified = self.notifier.notify().map_err(ExecutorError::iceoryx2)?;
156        Ok(NotifyOutcome {
157            sent: true,
158            listeners_notified,
159        })
160    }
161}
162
163impl<T: Payload> Publisher<T> {
164    /// # Example
165    ///
166    /// ```no_run
167    /// use iceoryx2::prelude::*;
168    /// use taktora_executor::Channel;
169    /// use std::sync::Arc;
170    ///
171    /// #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
172    /// #[repr(C)]
173    /// struct Tick(u64);
174    ///
175    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
176    /// let node = NodeBuilder::new().create::<ipc::Service>()?;
177    /// let ch: Arc<Channel<Tick>> = Channel::open_or_create(&node, "demo")?;
178    /// let publisher = ch.publisher()?;
179    ///
180    /// let _ = publisher.loan_send(|t: &mut Tick| { t.0 = 1; true })?;
181    /// # Ok(()) }
182    /// ```
183    ///
184    /// Loan a sample initialised to `T::default()`, run `f` to fill it, then
185    /// send + notify. Returns a [`NotifyOutcome`] with `sent == false` if `f`
186    /// returns `false` — caller signalled "skip send".
187    ///
188    /// When `sent == true`, `listeners_notified` reports how many listeners
189    /// received the wakeup. A value less than the number of attached subscribers
190    /// means at least one listener's kernel socket buffer was full (dropped
191    /// wakeup — data is still delivered). See [`NotifyOutcome`] for details.
192    ///
193    /// `T: Default` is required here because the shared-memory slot is
194    /// pre-initialised via `T::default()` before the closure runs. For types
195    /// that do not implement `Default`, use [`loan`](Self::loan) instead.
196    pub fn loan_send<F>(&self, f: F) -> Result<NotifyOutcome, ExecutorError>
197    where
198        T: Default,
199        F: FnOnce(&mut T) -> bool,
200    {
201        let sample = self.inner.loan_uninit().map_err(ExecutorError::iceoryx2)?;
202        let mut sample = sample.write_payload(T::default());
203        let cont = f(sample.payload_mut());
204        if !cont {
205            return Ok(NotifyOutcome {
206                sent: false,
207                listeners_notified: 0,
208            });
209        }
210        sample.send().map_err(ExecutorError::iceoryx2)?;
211        let listeners_notified = self.notifier.notify().map_err(ExecutorError::iceoryx2)?;
212        Ok(NotifyOutcome {
213            sent: true,
214            listeners_notified,
215        })
216    }
217
218    /// True zero-copy send. The closure receives `&mut MaybeUninit<T>`; it
219    /// must fully initialize the payload (e.g., via `MaybeUninit::write(v)`
220    /// or in-place construction such as iceoryx2's `placement_default!`)
221    /// before returning `true`. Returning `false` skips the send.
222    ///
223    /// On success, sends and notifies. Returns a [`NotifyOutcome`] with
224    /// `sent == true` if the payload was sent, `sent == false` if the closure
225    /// returned `false`. When `sent == true`, `listeners_notified` reports how
226    /// many listeners received the wakeup — see [`NotifyOutcome`] for details.
227    ///
228    /// # Contract
229    ///
230    /// **Returning `true` from the closure asserts that the payload is
231    /// fully initialized.** Returning `true` without writing a valid `T`
232    /// causes undefined behaviour at the subsequent `assume_init` step.
233    ///
234    /// `T: Default` is **not** required — that's the point of this method
235    /// versus [`loan_send`](Self::loan_send). For types that have a sensible
236    /// `Default` and are cheap to default-construct, prefer `loan_send`.
237    ///
238    /// # Example
239    ///
240    /// ```no_run
241    /// use core::mem::MaybeUninit;
242    /// use iceoryx2::prelude::*;
243    /// use taktora_executor::Channel;
244    /// use std::sync::Arc;
245    ///
246    /// #[derive(Debug, ZeroCopySend)]
247    /// #[repr(C)]
248    /// struct LargeMsg { payload: [u8; 64] }
249    ///
250    /// // Manual Default impl — e.g. initialised to a sentinel value rather
251    /// // than zero, so `loan_send` would use it but it is expensive.
252    /// impl Default for LargeMsg {
253    ///     fn default() -> Self { LargeMsg { payload: [0xFF; 64] } }
254    /// }
255    ///
256    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
257    /// let node = NodeBuilder::new().create::<ipc::Service>()?;
258    /// let ch: Arc<Channel<LargeMsg>> = Channel::open_or_create(&node, "demo")?;
259    /// let publisher = ch.publisher()?;
260    ///
261    /// let _ = publisher.loan(|slot: &mut MaybeUninit<LargeMsg>| {
262    ///     // Initialise directly in shared memory — no Default construction,
263    ///     // no stack temporary for the payload array.
264    ///     slot.write(LargeMsg { payload: [0u8; 64] });
265    ///     true
266    /// })?;
267    /// # Ok(()) }
268    /// ```
269    #[allow(unsafe_code)]
270    pub fn loan<F>(&self, f: F) -> Result<NotifyOutcome, ExecutorError>
271    where
272        F: FnOnce(&mut core::mem::MaybeUninit<T>) -> bool,
273    {
274        let mut sample = self.inner.loan_uninit().map_err(ExecutorError::iceoryx2)?;
275        let cont = f(sample.payload_mut());
276        if !cont {
277            return Ok(NotifyOutcome {
278                sent: false,
279                listeners_notified: 0,
280            });
281        }
282        // SAFETY: the closure returned `true`, asserting that the payload was
283        // fully initialised before this point. Per the documented contract,
284        // a closure that returns `true` without writing a valid `T` is a
285        // contract violation and the resulting behaviour is undefined.
286        let sample = unsafe { sample.assume_init() };
287        sample.send().map_err(ExecutorError::iceoryx2)?;
288        let listeners_notified = self.notifier.notify().map_err(ExecutorError::iceoryx2)?;
289        Ok(NotifyOutcome {
290            sent: true,
291            listeners_notified,
292        })
293    }
294}
295
296/// Pub/sub subscriber. Carries the paired event listener as `Arc<Listener>`
297/// so the executor can attach it to its `WaitSet`.
298pub struct Subscriber<T: core::fmt::Debug + ZeroCopySend + 'static> {
299    inner: IxSubscriber<IpcService, T, ()>,
300    listener: Arc<IxListener<IpcService>>,
301}
302
303// SAFETY:
304// `IxSubscriber<ipc::Service, T, ()>` is `!Send` because the `ipc::Service`
305// `ArcThreadSafetyPolicy` is `SingleThreaded`, which holds an `Rc<...>`.
306// The Rc is mutated only when methods that call `lock()` on the policy
307// run — primarily during port creation. After construction, the executor
308// only invokes:
309//   * `subscriber.take()` → `IxSubscriber::receive()` (does not touch the
310//     listener's Rc; pure shared-memory read path)
311//   * `subscriber.listener_handle()` → cheap `Arc::clone` (own Arc, not iceoryx2's Rc)
312// No two threads concurrently mutate the same Rc refcount, so moving a
313// `Subscriber` to a pool worker is sound. We do not implement `Sync`;
314// `Subscriber` is move-only across threads, never shared.
315#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
316unsafe impl<T: core::fmt::Debug + ZeroCopySend + 'static> Send for Subscriber<T> {}
317
318impl<T: Payload> Subscriber<T> {
319    /// Take the next sample, if any.
320    pub fn take(&self) -> Result<Option<IxSample<IpcService, T, ()>>, ExecutorError> {
321        self.inner.receive().map_err(ExecutorError::iceoryx2)
322    }
323
324    /// Borrow the listener handle (executor uses this for trigger attachment).
325    #[doc(hidden)]
326    pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
327        Arc::clone(&self.listener)
328    }
329}