use core::marker::PhantomData;
use iceoryx2::node::{Node, NodeBuilder};
use iceoryx2::port::publisher::Publisher;
use iceoryx2::port::subscriber::Subscriber;
use iceoryx2::service::ipc::Service;
use iceoryx2::service::port_factory::publish_subscribe::PortFactory;
use crate::FlatStruct;
#[derive(Debug)]
#[non_exhaustive]
pub enum Iceoryx2Error {
Service(alloc::string::String),
Publisher(alloc::string::String),
Subscriber(alloc::string::String),
Send(alloc::string::String),
Receive(alloc::string::String),
TypeHashMismatch,
EmptyServiceName,
}
impl core::fmt::Display for Iceoryx2Error {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Service(e) => write!(f, "iceoryx2 service: {e}"),
Self::Publisher(e) => write!(f, "iceoryx2 publisher: {e}"),
Self::Subscriber(e) => write!(f, "iceoryx2 subscriber: {e}"),
Self::Send(e) => write!(f, "iceoryx2 send: {e}"),
Self::Receive(e) => write!(f, "iceoryx2 receive: {e}"),
Self::TypeHashMismatch => f.write_str("iceoryx2 service type-hash mismatch"),
Self::EmptyServiceName => f.write_str("iceoryx2 service name must be non-empty"),
}
}
}
impl std::error::Error for Iceoryx2Error {}
fn build_service_name<T: FlatStruct>(base: &str) -> alloc::string::String {
use core::fmt::Write;
let mut s = alloc::string::String::with_capacity(base.len() + 1 + 32);
s.push_str(base);
s.push('#');
for b in T::TYPE_HASH.iter() {
let _ = write!(&mut s, "{b:02x}");
}
s
}
struct ServiceCtx<T: FlatStruct> {
_node: Node<Service>,
factory: PortFactory<Service, [u8], ()>,
_t: PhantomData<fn() -> T>,
}
impl<T: FlatStruct> ServiceCtx<T> {
fn open_or_create(base: &str) -> Result<Self, Iceoryx2Error> {
if base.is_empty() {
return Err(Iceoryx2Error::EmptyServiceName);
}
let node = NodeBuilder::new()
.create::<Service>()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
let svc_name = build_service_name::<T>(base);
let svc_id = svc_name
.as_str()
.try_into()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("invalid service name: {e:?}")))?;
let factory = node
.service_builder(&svc_id)
.publish_subscribe::<[u8]>()
.open_or_create()
.map_err(|e| Iceoryx2Error::Service(alloc::format!("{e:?}")))?;
Ok(Self {
_node: node,
factory,
_t: PhantomData,
})
}
}
pub struct Iceoryx2Publisher<T: FlatStruct> {
ctx: ServiceCtx<T>,
publisher: Publisher<Service, [u8], ()>,
}
impl<T: FlatStruct> Iceoryx2Publisher<T> {
pub fn create(service_name: &str) -> Result<Self, Iceoryx2Error> {
let ctx = ServiceCtx::<T>::open_or_create(service_name)?;
let publisher = ctx
.factory
.publisher_builder()
.initial_max_slice_len(T::WIRE_SIZE)
.create()
.map_err(|e| Iceoryx2Error::Publisher(alloc::format!("{e:?}")))?;
Ok(Self { ctx, publisher })
}
pub fn send(&self, sample: &T) -> Result<(), Iceoryx2Error> {
let bytes = sample.as_bytes();
let loan = self
.publisher
.loan_slice_uninit(bytes.len())
.map_err(|e| Iceoryx2Error::Send(alloc::format!("loan: {e:?}")))?;
let initialized = loan.write_from_slice(bytes);
initialized
.send()
.map_err(|e| Iceoryx2Error::Send(alloc::format!("send: {e:?}")))?;
Ok(())
}
#[must_use]
pub fn service_name(&self) -> alloc::string::String {
let _ = &self.ctx;
alloc::format!("zerodds-flatdata#TYPE_HASH={:02x?}", T::TYPE_HASH)
}
}
pub struct Iceoryx2Subscriber<T: FlatStruct> {
_ctx: ServiceCtx<T>,
subscriber: Subscriber<Service, [u8], ()>,
}
impl<T: FlatStruct> Iceoryx2Subscriber<T> {
pub fn create(service_name: &str) -> Result<Self, Iceoryx2Error> {
let ctx = ServiceCtx::<T>::open_or_create(service_name)?;
let subscriber = ctx
.factory
.subscriber_builder()
.create()
.map_err(|e| Iceoryx2Error::Subscriber(alloc::format!("{e:?}")))?;
Ok(Self {
_ctx: ctx,
subscriber,
})
}
pub fn receive(&self) -> Result<Option<T>, Iceoryx2Error> {
let Some(sample) = self
.subscriber
.receive()
.map_err(|e| Iceoryx2Error::Receive(alloc::format!("{e:?}")))?
else {
return Ok(None);
};
let bytes: &[u8] = sample.payload();
if bytes.len() < T::WIRE_SIZE {
return Ok(None);
}
let value = unsafe { T::from_bytes_unchecked(bytes) };
Ok(Some(value))
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
#[derive(Copy, Clone, Debug, PartialEq, Default)]
#[repr(C)]
struct Pose {
x: f64,
y: f64,
z: f64,
}
unsafe impl FlatStruct for Pose {
const TYPE_HASH: [u8; 16] = [0xCC; 16];
}
#[derive(Copy, Clone, Debug, PartialEq, Default)]
#[repr(C)]
struct OtherPose {
x: f64,
y: f64,
z: f64,
}
unsafe impl FlatStruct for OtherPose {
const TYPE_HASH: [u8; 16] = [0xDD; 16]; }
fn unique_service_name(suffix: &str) -> alloc::string::String {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let pid = std::process::id();
alloc::format!("zerodds_flatdata_test_{pid}_{n}_{suffix}")
}
#[test]
fn publisher_subscriber_roundtrip_same_process() {
let svc = unique_service_name("rt");
let publisher = Iceoryx2Publisher::<Pose>::create(&svc).expect("publisher");
let subscriber = Iceoryx2Subscriber::<Pose>::create(&svc).expect("subscriber");
let p = Pose {
x: 1.0,
y: 2.0,
z: 3.0,
};
publisher.send(&p).expect("send");
let got = subscriber.receive().expect("receive");
assert_eq!(got, Some(p));
}
#[test]
fn subscriber_returns_none_without_publisher_data() {
let svc = unique_service_name("none");
let subscriber = Iceoryx2Subscriber::<Pose>::create(&svc).expect("subscriber");
let got = subscriber.receive().expect("receive");
assert!(got.is_none());
}
#[test]
fn type_hash_mismatch_isolates_services() {
let svc = unique_service_name("drift");
let publisher = Iceoryx2Publisher::<Pose>::create(&svc).expect("publisher");
let other_subscriber =
Iceoryx2Subscriber::<OtherPose>::create(&svc).expect("other subscriber");
publisher
.send(&Pose {
x: 9.0,
y: 9.0,
z: 9.0,
})
.expect("send");
let got = other_subscriber.receive().expect("receive");
assert!(got.is_none(), "other-typed subscriber must not see samples");
}
#[test]
fn empty_service_name_rejected() {
let res = Iceoryx2Publisher::<Pose>::create("");
assert!(matches!(res, Err(Iceoryx2Error::EmptyServiceName)));
}
}