use crate::error::ExecutorError;
use crate::payload::Payload;
use iceoryx2::port::listener::Listener as IxListener;
use iceoryx2::port::notifier::Notifier as IxNotifier;
use iceoryx2::port::publisher::Publisher as IxPublisher;
use iceoryx2::port::subscriber::Subscriber as IxSubscriber;
use iceoryx2::prelude::*;
use iceoryx2::sample::Sample as IxSample;
use std::sync::Arc;
/// Summary of a single publish attempt: whether a sample was handed to the
/// transport and how many listeners the follow-up notification reported.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct NotifyOutcome {
    /// `true` when a sample was actually sent, `false` when the attempt was
    /// abandoned before sending.
    pub sent: bool,
    /// Count reported by the event notification that follows a send; `0`
    /// when nothing was sent.
    pub listeners_notified: usize,
}

impl NotifyOutcome {
    /// Returns `true` only when a sample was sent **and** the notification
    /// reported at least one listener.
    #[must_use]
    pub const fn delivered_to_any_listener(self) -> bool {
        // Nothing was sent, so nobody can have been reached.
        if !self.sent {
            return false;
        }
        self.listeners_notified != 0
    }
}
/// Suffix appended to a topic name to form the name of the event service
/// that accompanies the topic's publish-subscribe service.
pub const EVENT_SUFFIX: &str = ".__taktora_event";
/// iceoryx2 service type used by the channel, publisher and subscriber
/// wrappers in this file.
type IpcService = ipc::Service;
/// A topic backed by two iceoryx2 services: a publish-subscribe service that
/// carries payloads of type `T`, and an event service used to notify
/// listeners.
pub struct Channel<T>
where
    T: core::fmt::Debug + ZeroCopySend + 'static,
{
    /// Port factory of the publish-subscribe service for `T`.
    pubsub: iceoryx2::service::port_factory::publish_subscribe::PortFactory<IpcService, T, ()>,
    /// Port factory of the companion event service.
    event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
}
impl<T: Payload> Channel<T> {
    /// Opens the services behind `topic`, creating them if they do not exist.
    ///
    /// Two services are set up, in this order: the publish-subscribe service
    /// named `topic`, then the event service named `topic` followed by
    /// [`EVENT_SUFFIX`].
    ///
    /// # Errors
    ///
    /// * [`ExecutorError::Builder`] when `topic` (or the derived event name)
    ///   cannot be converted into a service name.
    /// * The error produced by `ExecutorError::iceoryx2` when either service
    ///   cannot be opened or created.
    pub fn open_or_create(
        node: &iceoryx2::node::Node<IpcService>,
        topic: &str,
    ) -> Result<Arc<Self>, ExecutorError> {
        // Payload service: validate the name, then open or create it.
        let data_service_name = topic
            .try_into()
            .map_err(|e| ExecutorError::Builder(format!("invalid topic name: {e:?}")))?;
        let data_builder = node
            .service_builder(&data_service_name)
            .publish_subscribe::<T>();
        let pubsub = data_builder
            .open_or_create()
            .map_err(ExecutorError::iceoryx2)?;

        // Event service: same topic with the reserved suffix appended.
        let signal_topic = [topic, EVENT_SUFFIX].concat();
        let signal_service_name = signal_topic
            .as_str()
            .try_into()
            .map_err(|e| ExecutorError::Builder(format!("invalid event-topic name: {e:?}")))?;
        let signal_builder = node.service_builder(&signal_service_name).event();
        let event = signal_builder
            .open_or_create()
            .map_err(ExecutorError::iceoryx2)?;

        Ok(Arc::new(Self { pubsub, event }))
    }

    /// Creates a [`Publisher`] made of a publisher port on the
    /// publish-subscribe service and a notifier port on the event service.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ExecutorError::iceoryx2` when either
    /// port cannot be created. The publisher port is created first.
    pub fn publisher(self: &Arc<Self>) -> Result<Publisher<T>, ExecutorError> {
        // Fields are evaluated in written order: publisher port, then notifier.
        Ok(Publisher {
            inner: self
                .pubsub
                .publisher_builder()
                .create()
                .map_err(ExecutorError::iceoryx2)?,
            notifier: self
                .event
                .notifier_builder()
                .create()
                .map_err(ExecutorError::iceoryx2)?,
        })
    }

    /// Creates a [`Subscriber`] made of a subscriber port on the
    /// publish-subscribe service and a reference-counted listener port on the
    /// event service.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ExecutorError::iceoryx2` when either
    /// port cannot be created. The subscriber port is created first.
    pub fn subscriber(self: &Arc<Self>) -> Result<Subscriber<T>, ExecutorError> {
        let inner = self
            .pubsub
            .subscriber_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;
        // The listener is wrapped in an `Arc` so handles to it can be cloned.
        // NOTE(review): the lint allow implies the listener type is not
        // `Send + Sync` — confirm the sharing pattern is intended.
        #[allow(clippy::arc_with_non_send_sync)]
        let listener = Arc::new(
            self.event
                .listener_builder()
                .create()
                .map_err(ExecutorError::iceoryx2)?,
        );
        Ok(Subscriber { inner, listener })
    }
}
/// Sending side of a channel: a publish-subscribe publisher port for `T`
/// paired with an event notifier port.
pub struct Publisher<T>
where
    T: core::fmt::Debug + ZeroCopySend + 'static,
{
    /// Publisher port used to send samples.
    inner: IxPublisher<IpcService, T, ()>,
    /// Notifier port triggered after a sample has been sent.
    notifier: IxNotifier<IpcService>,
}
// Manually marks `Publisher` as transferable to another thread.
// SAFETY — NOTE(review): the `non_send_fields_in_send_ty` allow indicates at
// least one wrapped port is not `Send` on its own; the justification for this
// impl is not visible here and should be confirmed against the iceoryx2
// threading guarantees for the ports held by `Publisher`.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<T> Send for Publisher<T> where T: core::fmt::Debug + ZeroCopySend + 'static {}
impl<T: Payload + Copy> Publisher<T> {
    /// Sends `value` by copy and then triggers the event notifier.
    ///
    /// On success the returned outcome has `sent == true` and carries the
    /// count reported by the notifier.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ExecutorError::iceoryx2` when sending
    /// or notifying fails. A notification failure is reported as `Err` even
    /// though the sample has already been sent at that point.
    pub fn send_copy(&self, value: T) -> Result<NotifyOutcome, ExecutorError> {
        // The send must succeed before listeners are woken up.
        self.inner
            .send_copy(value)
            .map_err(ExecutorError::iceoryx2)?;
        Ok(NotifyOutcome {
            sent: true,
            listeners_notified: self.notifier.notify().map_err(ExecutorError::iceoryx2)?,
        })
    }
}
impl<T: Payload> Publisher<T> {
    /// Loans a sample, initialises it with `T::default()`, lets `f` edit it
    /// in place, and sends it if `f` returns `true`.
    ///
    /// When `f` returns `false` the loaned sample is dropped unsent, no
    /// notification is issued, and the outcome has `sent == false` with
    /// `listeners_notified == 0`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ExecutorError::iceoryx2` when loaning,
    /// sending or notifying fails. A notification failure is reported as
    /// `Err` even though the sample has already been sent at that point.
    pub fn loan_send<F>(&self, f: F) -> Result<NotifyOutcome, ExecutorError>
    where
        T: Default,
        F: FnOnce(&mut T) -> bool,
    {
        let mut sample = self
            .inner
            .loan_uninit()
            .map_err(ExecutorError::iceoryx2)?
            .write_payload(T::default());
        if f(sample.payload_mut()) {
            sample.send().map_err(ExecutorError::iceoryx2)?;
            Ok(NotifyOutcome {
                sent: true,
                listeners_notified: self.notifier.notify().map_err(ExecutorError::iceoryx2)?,
            })
        } else {
            // The closure declined: the loan is released when `sample` drops.
            Ok(NotifyOutcome {
                sent: false,
                listeners_notified: 0,
            })
        }
    }

    /// Loans an uninitialised sample, lets `f` fill it in, and sends it if
    /// `f` returns `true`.
    ///
    /// `f` receives the raw `MaybeUninit<T>` slot. Returning `true` is taken
    /// as a promise that the slot now holds a fully initialised `T`: the
    /// sample is then treated as initialised and sent. Returning `true`
    /// without having written a valid `T` is undefined behaviour.
    ///
    /// When `f` returns `false` the loaned sample is dropped unsent, no
    /// notification is issued, and the outcome has `sent == false` with
    /// `listeners_notified == 0`.
    ///
    /// NOTE(review): this is a safe function whose soundness depends on the
    /// closure's behaviour — consider whether that contract should be
    /// expressed with `unsafe` in a future API revision.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ExecutorError::iceoryx2` when loaning,
    /// sending or notifying fails. A notification failure is reported as
    /// `Err` even though the sample has already been sent at that point.
    #[allow(unsafe_code)]
    pub fn loan<F>(&self, f: F) -> Result<NotifyOutcome, ExecutorError>
    where
        F: FnOnce(&mut core::mem::MaybeUninit<T>) -> bool,
    {
        let mut sample = self.inner.loan_uninit().map_err(ExecutorError::iceoryx2)?;
        if f(sample.payload_mut()) {
            // SAFETY: relies on the closure contract documented above — `f`
            // returned `true`, which callers must only do after fully
            // initialising the payload.
            let ready = unsafe { sample.assume_init() };
            ready.send().map_err(ExecutorError::iceoryx2)?;
            Ok(NotifyOutcome {
                sent: true,
                listeners_notified: self.notifier.notify().map_err(ExecutorError::iceoryx2)?,
            })
        } else {
            // The closure declined: the loan is released when `sample` drops.
            Ok(NotifyOutcome {
                sent: false,
                listeners_notified: 0,
            })
        }
    }
}
/// Receiving side of a channel: a publish-subscribe subscriber port for `T`
/// paired with a reference-counted event listener port.
pub struct Subscriber<T>
where
    T: core::fmt::Debug + ZeroCopySend + 'static,
{
    /// Subscriber port used to receive samples.
    inner: IxSubscriber<IpcService, T, ()>,
    /// Listener port of the companion event service, kept behind an `Arc`
    /// so handles to it can be cloned.
    listener: Arc<IxListener<IpcService>>,
}
// Manually marks `Subscriber` as transferable to another thread.
// SAFETY — NOTE(review): the `non_send_fields_in_send_ty` allow indicates at
// least one field is not `Send` on its own, and the listener is held in an
// `Arc` whose clones are handed out by `listener_handle`. The justification
// for this impl is not visible here; confirm it against the iceoryx2
// threading guarantees and against how the cloned handles are used.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<T> Send for Subscriber<T> where T: core::fmt::Debug + ZeroCopySend + 'static {}
impl<T: Payload> Subscriber<T> {
    /// Attempts to receive one sample from the subscriber port.
    ///
    /// The `Option` returned by the underlying `receive` call is passed
    /// through unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ExecutorError::iceoryx2` when the
    /// underlying receive call fails.
    pub fn take(&self) -> Result<Option<IxSample<IpcService, T, ()>>, ExecutorError> {
        let received = self.inner.receive();
        received.map_err(ExecutorError::iceoryx2)
    }

    /// Returns another reference-counted handle to the event listener port.
    #[doc(hidden)]
    pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
        let shared = &self.listener;
        Arc::clone(shared)
    }
}