use core::marker::PhantomData;
use iceoryx2::node::{Node, NodeBuilder};
use iceoryx2::port::listener::Listener;
use iceoryx2::port::notifier::Notifier;
use iceoryx2::port::publisher::Publisher;
use iceoryx2::port::subscriber::Subscriber;
use iceoryx2::service::ipc::Service;
use iceoryx2::service::port_factory::publish_subscribe::PortFactory;
use iceoryx2::service::ipc_threadsafe::Service as TsService;
use crate::FlatStruct;
#[derive(Debug)]
#[non_exhaustive]
pub enum Iceoryx2Error {
Service(alloc::string::String),
Publisher(alloc::string::String),
Subscriber(alloc::string::String),
Send(alloc::string::String),
Receive(alloc::string::String),
TypeHashMismatch,
EmptyServiceName,
}
impl core::fmt::Display for Iceoryx2Error {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Service(e) => write!(f, "iceoryx2 service: {e}"),
Self::Publisher(e) => write!(f, "iceoryx2 publisher: {e}"),
Self::Subscriber(e) => write!(f, "iceoryx2 subscriber: {e}"),
Self::Send(e) => write!(f, "iceoryx2 send: {e}"),
Self::Receive(e) => write!(f, "iceoryx2 receive: {e}"),
Self::TypeHashMismatch => f.write_str("iceoryx2 service type-hash mismatch"),
Self::EmptyServiceName => f.write_str("iceoryx2 service name must be non-empty"),
}
}
}
impl std::error::Error for Iceoryx2Error {}
fn build_service_name<T: FlatStruct>(base: &str) -> alloc::string::String {
use core::fmt::Write;
let mut s = alloc::string::String::with_capacity(base.len() + 1 + 32);
s.push_str(base);
s.push('#');
for b in T::TYPE_HASH.iter() {
let _ = write!(&mut s, "{b:02x}");
}
s
}
struct ServiceCtx<T: FlatStruct> {
_node: Node<Service>,
factory: PortFactory<Service, [u8], ()>,
_t: PhantomData<fn() -> T>,
}
impl<T: FlatStruct> ServiceCtx<T> {
fn open_or_create(base: &str) -> Result<Self, Iceoryx2Error> {
if base.is_empty() {
return Err(Iceoryx2Error::EmptyServiceName);
}
let node = NodeBuilder::new()
.create::<Service>()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
let svc_name = build_service_name::<T>(base);
let svc_id = svc_name
.as_str()
.try_into()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("invalid service name: {e:?}")))?;
let factory = node
.service_builder(&svc_id)
.publish_subscribe::<[u8]>()
.open_or_create()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
Ok(Self {
_node: node,
factory,
_t: PhantomData,
})
}
}
pub struct Iceoryx2Publisher<T: FlatStruct> {
ctx: ServiceCtx<T>,
publisher: Publisher<Service, [u8], ()>,
}
impl<T: FlatStruct> Iceoryx2Publisher<T> {
pub fn create(service_name: &str) -> Result<Self, Iceoryx2Error> {
let ctx = ServiceCtx::<T>::open_or_create(service_name)?;
let publisher = ctx
.factory
.publisher_builder()
.initial_max_slice_len(T::WIRE_SIZE)
.create()
.map_err(|e| Iceoryx2Error::Publisher(alloc::format!("{e:?}")))?;
Ok(Self { ctx, publisher })
}
pub fn send(&self, sample: &T) -> Result<(), Iceoryx2Error> {
let bytes = sample.as_bytes();
let loan = self
.publisher
.loan_slice_uninit(bytes.len())
.map_err(|e| Iceoryx2Error::Send(alloc::format!("loan: {e:?}")))?;
let initialized = loan.write_from_slice(bytes);
initialized
.send()
.map_err(|e| Iceoryx2Error::Send(alloc::format!("send: {e:?}")))?;
Ok(())
}
#[must_use]
pub fn service_name(&self) -> alloc::string::String {
let _ = &self.ctx;
alloc::format!("zerodds-flatdata#TYPE_HASH={:02x?}", T::TYPE_HASH)
}
}
pub struct RawIceoryx2Publisher {
_node: Node<TsService>,
publisher: Publisher<TsService, [u8], ()>,
notifier: Notifier<TsService>,
}
impl RawIceoryx2Publisher {
pub fn create(service_name: &str, max_len: usize) -> Result<Self, Iceoryx2Error> {
if service_name.is_empty() {
return Err(Iceoryx2Error::EmptyServiceName);
}
let node = NodeBuilder::new()
.create::<TsService>()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
let svc_id = service_name
.try_into()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("invalid service name: {e:?}")))?;
let factory = node
.service_builder(&svc_id)
.publish_subscribe::<[u8]>()
.open_or_create()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
let publisher = factory
.publisher_builder()
.initial_max_slice_len(max_len)
.create()
.map_err(|e| Iceoryx2Error::Publisher(alloc::format!("{e:?}")))?;
let event = node
.service_builder(&svc_id)
.event()
.open_or_create()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("event: {e:?}")))?;
let notifier = event
.notifier_builder()
.create()
.map_err(|e| Iceoryx2Error::Publisher(alloc::format!("notifier: {e:?}")))?;
Ok(Self {
_node: node,
publisher,
notifier,
})
}
pub fn send(&self, bytes: &[u8]) -> Result<(), Iceoryx2Error> {
let loan = self
.publisher
.loan_slice_uninit(bytes.len())
.map_err(|e| Iceoryx2Error::Send(alloc::format!("loan: {e:?}")))?;
let initialized = loan.write_from_slice(bytes);
initialized
.send()
.map_err(|e| Iceoryx2Error::Send(alloc::format!("send: {e:?}")))?;
let _ = self.notifier.notify();
Ok(())
}
}
pub struct RawIceoryx2Subscriber {
_node: Node<TsService>,
subscriber: Subscriber<TsService, [u8], ()>,
listener: Listener<TsService>,
}
impl RawIceoryx2Subscriber {
pub fn create(service_name: &str) -> Result<Self, Iceoryx2Error> {
if service_name.is_empty() {
return Err(Iceoryx2Error::EmptyServiceName);
}
let node = NodeBuilder::new()
.create::<TsService>()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
let svc_id = service_name
.try_into()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("invalid service name: {e:?}")))?;
let factory = node
.service_builder(&svc_id)
.publish_subscribe::<[u8]>()
.open_or_create()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
let subscriber = factory
.subscriber_builder()
.create()
.map_err(|e| Iceoryx2Error::Subscriber(alloc::format!("{e:?}")))?;
let event = node
.service_builder(&svc_id)
.event()
.open_or_create()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("event: {e:?}")))?;
let listener = event
.listener_builder()
.create()
.map_err(|e| Iceoryx2Error::Subscriber(alloc::format!("listener: {e:?}")))?;
Ok(Self {
_node: node,
subscriber,
listener,
})
}
#[must_use]
pub fn wait(&self, timeout: core::time::Duration) -> bool {
matches!(self.listener.timed_wait_one(timeout), Ok(Some(_)))
}
pub fn receive(&self) -> Result<Option<alloc::vec::Vec<u8>>, Iceoryx2Error> {
let Some(sample) = self
.subscriber
.receive()
.map_err(|e| Iceoryx2Error::Receive(alloc::format!("{e:?}")))?
else {
return Ok(None);
};
Ok(Some(sample.payload().to_vec()))
}
}
pub struct Iceoryx2Subscriber<T: FlatStruct> {
_ctx: ServiceCtx<T>,
subscriber: Subscriber<Service, [u8], ()>,
}
impl<T: FlatStruct> Iceoryx2Subscriber<T> {
pub fn create(service_name: &str) -> Result<Self, Iceoryx2Error> {
let ctx = ServiceCtx::<T>::open_or_create(service_name)?;
let subscriber = ctx
.factory
.subscriber_builder()
.create()
.map_err(|e| Iceoryx2Error::Subscriber(alloc::format!("{e:?}")))?;
Ok(Self {
_ctx: ctx,
subscriber,
})
}
pub fn receive(&self) -> Result<Option<T>, Iceoryx2Error> {
let Some(sample) = self
.subscriber
.receive()
.map_err(|e| Iceoryx2Error::Receive(alloc::format!("{e:?}")))?
else {
return Ok(None);
};
let bytes: &[u8] = sample.payload();
if bytes.len() < T::WIRE_SIZE {
return Ok(None);
}
let value = unsafe { T::from_bytes_unchecked(bytes) };
Ok(Some(value))
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
#[derive(Copy, Clone, Debug, PartialEq, Default)]
#[repr(C)]
struct Pose {
x: f64,
y: f64,
z: f64,
}
unsafe impl FlatStruct for Pose {
const TYPE_HASH: [u8; 16] = [0xCC; 16];
}
#[derive(Copy, Clone, Debug, PartialEq, Default)]
#[repr(C)]
struct OtherPose {
x: f64,
y: f64,
z: f64,
}
unsafe impl FlatStruct for OtherPose {
const TYPE_HASH: [u8; 16] = [0xDD; 16]; }
fn unique_service_name(suffix: &str) -> alloc::string::String {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let pid = std::process::id();
alloc::format!("zerodds_flatdata_test_{pid}_{n}_{suffix}")
}
#[test]
fn publisher_subscriber_roundtrip_same_process() {
let svc = unique_service_name("rt");
let publisher = Iceoryx2Publisher::<Pose>::create(&svc).expect("publisher");
let subscriber = Iceoryx2Subscriber::<Pose>::create(&svc).expect("subscriber");
let p = Pose {
x: 1.0,
y: 2.0,
z: 3.0,
};
publisher.send(&p).expect("send");
let got = subscriber.receive().expect("receive");
assert_eq!(got, Some(p));
}
#[test]
fn subscriber_returns_none_without_publisher_data() {
let svc = unique_service_name("none");
let subscriber = Iceoryx2Subscriber::<Pose>::create(&svc).expect("subscriber");
let got = subscriber.receive().expect("receive");
assert!(got.is_none());
}
#[test]
fn type_hash_mismatch_isolates_services() {
let svc = unique_service_name("drift");
let publisher = Iceoryx2Publisher::<Pose>::create(&svc).expect("publisher");
let other_subscriber =
Iceoryx2Subscriber::<OtherPose>::create(&svc).expect("other subscriber");
publisher
.send(&Pose {
x: 9.0,
y: 9.0,
z: 9.0,
})
.expect("send");
let got = other_subscriber.receive().expect("receive");
assert!(got.is_none(), "other-typed subscriber must not see samples");
}
#[test]
fn empty_service_name_rejected() {
let res = Iceoryx2Publisher::<Pose>::create("");
assert!(matches!(res, Err(Iceoryx2Error::EmptyServiceName)));
}
#[test]
fn raw_byte_publisher_subscriber_roundtrip() {
let svc = unique_service_name("raw");
let publisher = RawIceoryx2Publisher::create(&svc, 64).expect("raw publisher");
let subscriber = RawIceoryx2Subscriber::create(&svc).expect("raw subscriber");
let payload: [u8; 8] = [0x2a, 0, 0, 0, 0xFF, 0xEE, 0xDD, 0xCC];
publisher.send(&payload).expect("raw send");
let got = subscriber.receive().expect("raw receive");
assert_eq!(got.as_deref(), Some(&payload[..]));
}
#[test]
fn raw_empty_service_name_rejected() {
assert!(matches!(
RawIceoryx2Publisher::create("", 16),
Err(Iceoryx2Error::EmptyServiceName)
));
assert!(matches!(
RawIceoryx2Subscriber::create(""),
Err(Iceoryx2Error::EmptyServiceName)
));
}
}