use std::collections::HashMap;
use std::ffi::{CStr, c_char, c_int};
use std::sync::{Arc, Mutex, OnceLock};
use zerodds_dcps::runtime::DcpsRuntime;
use zerodds_flatdata::{PosixSlotAllocator, ShmLocator, SlotBackend, SlotHandle};
use zerodds_rtps::wire_types::EntityId;
use crate::ZeroDdsStatus;
use crate::entities::{ZeroDdsDataReader, ZeroDdsDataWriter};
type Key = (usize, EntityId);
fn key(rt: &Arc<DcpsRuntime>, eid: EntityId) -> Key {
(Arc::as_ptr(rt) as usize, eid)
}
struct WriterShm {
backend: Arc<PosixSlotAllocator>,
capacity: usize,
loans: HashMap<usize, SlotHandle>,
}
struct ReaderShm {
backend: Arc<PosixSlotAllocator>,
reader_index: u8,
}
fn writers() -> &'static Mutex<HashMap<Key, WriterShm>> {
static W: OnceLock<Mutex<HashMap<Key, WriterShm>>> = OnceLock::new();
W.get_or_init(|| Mutex::new(HashMap::new()))
}
fn readers() -> &'static Mutex<HashMap<Key, ReaderShm>> {
static R: OnceLock<Mutex<HashMap<Key, ReaderShm>>> = OnceLock::new();
R.get_or_init(|| Mutex::new(HashMap::new()))
}
pub(crate) const MODE_PORTABLE: u8 = 0;
pub(crate) const MODE_RAW_SAME_HOST: u8 = 1;
pub(crate) const MODE_ICEORYX: u8 = 2;
fn modes() -> &'static Mutex<HashMap<Key, u8>> {
static M: OnceLock<Mutex<HashMap<Key, u8>>> = OnceLock::new();
M.get_or_init(|| Mutex::new(HashMap::new()))
}
fn env_default_mode() -> u8 {
static D: OnceLock<u8> = OnceLock::new();
*D.get_or_init(
|| match std::env::var("ZERODDS_DELIVERY_MODE").ok().as_deref() {
Some("raw-same-host") => MODE_RAW_SAME_HOST,
_ => MODE_PORTABLE,
},
)
}
pub(crate) fn publishes_to_wire(mode: u8) -> bool {
mode == MODE_PORTABLE
}
fn configured_mode(rt: &Arc<DcpsRuntime>, eid: EntityId) -> u8 {
modes()
.lock()
.ok()
.and_then(|m| m.get(&key(rt, eid)).copied())
.unwrap_or_else(env_default_mode)
}
pub(crate) fn set_delivery_mode(rt: &Arc<DcpsRuntime>, eid: EntityId, mode: u8) -> c_int {
match mode {
MODE_PORTABLE | MODE_RAW_SAME_HOST => {}
MODE_ICEORYX => {
#[cfg(not(feature = "delivery-iceoryx"))]
return ZeroDdsStatus::Unsupported as c_int;
}
_ => return ZeroDdsStatus::BadParameter as c_int,
}
let Ok(mut m) = modes().lock() else {
return ZeroDdsStatus::Error as c_int;
};
m.insert(key(rt, eid), mode);
ZeroDdsStatus::Ok as c_int
}
#[cfg(feature = "delivery-iceoryx")]
use zerodds_flatdata::{RawIceoryx2Publisher, RawIceoryx2Subscriber};
#[cfg(feature = "delivery-iceoryx")]
struct IceoryxWriter {
publisher: RawIceoryx2Publisher,
loans: HashMap<usize, Vec<u8>>,
}
#[cfg(feature = "delivery-iceoryx")]
struct IceoryxReader {
subscriber: Arc<RawIceoryx2Subscriber>,
pending: HashMap<u32, Vec<u8>>,
next_slot: u32,
}
#[cfg(feature = "delivery-iceoryx")]
fn ice_writers() -> &'static Mutex<HashMap<Key, IceoryxWriter>> {
static W: OnceLock<Mutex<HashMap<Key, IceoryxWriter>>> = OnceLock::new();
W.get_or_init(|| Mutex::new(HashMap::new()))
}
#[cfg(feature = "delivery-iceoryx")]
fn ice_readers() -> &'static Mutex<HashMap<Key, IceoryxReader>> {
static R: OnceLock<Mutex<HashMap<Key, IceoryxReader>>> = OnceLock::new();
R.get_or_init(|| Mutex::new(HashMap::new()))
}
#[cfg(feature = "delivery-iceoryx")]
pub(crate) fn enable_iceoryx_writer(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
service_name: String,
max_len: usize,
) -> c_int {
if service_name.is_empty() || max_len == 0 {
return ZeroDdsStatus::BadParameter as c_int;
}
let publisher = match RawIceoryx2Publisher::create(&service_name, max_len) {
Ok(p) => p,
Err(_) => return ZeroDdsStatus::OutOfResources as c_int,
};
let Ok(mut w) = ice_writers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
w.insert(
key(rt, eid),
IceoryxWriter {
publisher,
loans: HashMap::new(),
},
);
ZeroDdsStatus::Ok as c_int
}
#[cfg(feature = "delivery-iceoryx")]
pub(crate) fn enable_iceoryx_reader(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
service_name: String,
) -> c_int {
if service_name.is_empty() {
return ZeroDdsStatus::BadParameter as c_int;
}
let subscriber = match RawIceoryx2Subscriber::create(&service_name) {
Ok(s) => s,
Err(_) => return ZeroDdsStatus::PreconditionNotMet as c_int,
};
let Ok(mut r) = ice_readers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
r.insert(
key(rt, eid),
IceoryxReader {
subscriber: Arc::new(subscriber),
pending: HashMap::new(),
next_slot: 0,
},
);
ZeroDdsStatus::Ok as c_int
}
pub(crate) fn enable_writer(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
path: String,
slot_count: usize,
slot_capacity: usize,
) -> c_int {
if slot_count == 0 || slot_capacity == 0 {
return ZeroDdsStatus::BadParameter as c_int;
}
let backend = match PosixSlotAllocator::create(path.clone(), slot_count, slot_capacity) {
Ok(b) => Arc::new(b),
Err(_) => return ZeroDdsStatus::OutOfResources as c_int,
};
let locator = ShmLocator {
hostname_hash: zerodds_flatdata::fnv1a_32(host_id().as_bytes()),
uid: process_uid(),
slot_count: u32::try_from(slot_count).unwrap_or(u32::MAX),
slot_size: u32::try_from(backend.slot_total_size()).unwrap_or(u32::MAX),
segment_path: path,
};
if let Ok(bytes) = locator.to_bytes_le() {
rt.set_shm_locator(eid, bytes);
}
let Ok(mut w) = writers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
w.insert(
key(rt, eid),
WriterShm {
backend,
capacity: slot_capacity,
loans: HashMap::new(),
},
);
ZeroDdsStatus::Ok as c_int
}
pub(crate) unsafe fn try_loan(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
len: usize,
out_ptr: *mut *mut u8,
out_len: *mut usize,
) -> Option<c_int> {
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut iw) = ice_writers().lock() {
if let Some(entry) = iw.get_mut(&key(rt, eid)) {
let mut buf = vec![0u8; len];
let ptr = buf.as_mut_ptr();
entry.loans.insert(ptr as usize, buf);
unsafe {
*out_ptr = ptr;
*out_len = len;
}
return Some(ZeroDdsStatus::Ok as c_int);
}
}
let mut w = writers().lock().ok()?;
let entry = w.get_mut(&key(rt, eid))?;
if len > entry.capacity {
return Some(ZeroDdsStatus::OutOfResources as c_int);
}
let handle = match entry.backend.reserve_slot(0) {
Ok(h) => h,
Err(_) => return Some(ZeroDdsStatus::OutOfResources as c_int),
};
let (ptr, _cap) = match entry.backend.slot_data_ptr(handle) {
Ok(p) => p,
Err(_) => {
let _ = entry.backend.discard_slot(handle);
return Some(ZeroDdsStatus::Error as c_int);
}
};
entry.loans.insert(ptr as usize, handle);
unsafe {
*out_ptr = ptr;
*out_len = len;
}
Some(ZeroDdsStatus::Ok as c_int)
}
pub(crate) unsafe fn try_commit(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
ptr: *mut u8,
len: usize,
) -> Option<c_int> {
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut iw) = ice_writers().lock() {
if let Some(entry) = iw.get_mut(&key(rt, eid)) {
let Some(buf) = entry.loans.remove(&(ptr as usize)) else {
return Some(ZeroDdsStatus::BadParameter as c_int);
};
let n = len.min(buf.len());
return Some(match entry.publisher.send(&buf[..n]) {
Ok(()) => ZeroDdsStatus::Ok as c_int,
Err(_) => ZeroDdsStatus::Error as c_int,
});
}
}
let (backend, handle) = {
let mut w = writers().lock().ok()?;
let entry = w.get_mut(&key(rt, eid))?;
let handle = entry.loans.remove(&(ptr as usize))?;
(Arc::clone(&entry.backend), handle)
};
if backend.commit_in_place(handle, len).is_err() {
return Some(ZeroDdsStatus::Error as c_int);
}
if publishes_to_wire(configured_mode(rt, eid)) {
if let Ok((rptr, n)) = backend.slot_read_ptr(handle) {
let bytes = unsafe { core::slice::from_raw_parts(rptr, n) };
return Some(match rt.write_user_sample_borrowed(eid, bytes) {
Ok(()) => ZeroDdsStatus::Ok as c_int,
Err(_) => ZeroDdsStatus::Error as c_int,
});
}
}
Some(ZeroDdsStatus::Ok as c_int)
}
pub(crate) fn try_discard(rt: &Arc<DcpsRuntime>, eid: EntityId, ptr: *mut u8) -> Option<c_int> {
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut iw) = ice_writers().lock() {
if let Some(entry) = iw.get_mut(&key(rt, eid)) {
return Some(if entry.loans.remove(&(ptr as usize)).is_some() {
ZeroDdsStatus::Ok as c_int
} else {
ZeroDdsStatus::BadParameter as c_int
});
}
}
let mut w = writers().lock().ok()?;
let entry = w.get_mut(&key(rt, eid))?;
let handle = entry.loans.remove(&(ptr as usize))?;
Some(match entry.backend.discard_slot(handle) {
Ok(()) => ZeroDdsStatus::Ok as c_int,
Err(_) => ZeroDdsStatus::Error as c_int,
})
}
pub(crate) fn forget_writer(rt: &Arc<DcpsRuntime>, eid: EntityId) {
if let Ok(mut w) = writers().lock() {
w.remove(&key(rt, eid));
}
if let Ok(mut m) = modes().lock() {
m.remove(&key(rt, eid));
}
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut iw) = ice_writers().lock() {
iw.remove(&key(rt, eid));
}
}
pub(crate) fn enable_reader(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
path: String,
reader_index: u8,
) -> c_int {
if reader_index >= 32 {
return ZeroDdsStatus::BadParameter as c_int;
}
let backend = match PosixSlotAllocator::attach(path) {
Ok(b) => Arc::new(b),
Err(_) => return ZeroDdsStatus::PreconditionNotMet as c_int,
};
let Ok(mut r) = readers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
r.insert(
key(rt, eid),
ReaderShm {
backend,
reader_index,
},
);
ZeroDdsStatus::Ok as c_int
}
pub(crate) unsafe fn try_take(
rt: &Arc<DcpsRuntime>,
eid: EntityId,
out_ptr: *mut *const u8,
out_len: *mut usize,
out_slot: *mut u32,
) -> c_int {
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut ir) = ice_readers().lock() {
if let Some(entry) = ir.get_mut(&key(rt, eid)) {
return match entry.subscriber.receive() {
Ok(Some(buf)) => {
let slot = entry.next_slot;
entry.next_slot = entry.next_slot.wrapping_add(1);
let n = buf.len();
let p = buf.as_ptr();
entry.pending.insert(slot, buf);
unsafe {
*out_ptr = p;
*out_len = n;
*out_slot = slot;
}
ZeroDdsStatus::Ok as c_int
}
Ok(None) => ZeroDdsStatus::NoData as c_int,
Err(_) => ZeroDdsStatus::Error as c_int,
};
}
}
let Ok(r) = readers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
let Some(entry) = r.get(&key(rt, eid)) else {
return ZeroDdsStatus::PreconditionNotMet as c_int;
};
let handle = match entry.backend.next_unread_slot(entry.reader_index) {
Ok(Some(h)) => h,
Ok(None) => return ZeroDdsStatus::NoData as c_int,
Err(_) => return ZeroDdsStatus::Error as c_int,
};
match entry.backend.slot_read_ptr(handle) {
Ok((ptr, n)) => {
unsafe {
*out_ptr = ptr;
*out_len = n;
*out_slot = handle.slot_index;
}
ZeroDdsStatus::Ok as c_int
}
Err(_) => ZeroDdsStatus::Error as c_int,
}
}
pub(crate) fn try_release(rt: &Arc<DcpsRuntime>, eid: EntityId, slot_index: u32) -> c_int {
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut ir) = ice_readers().lock() {
if let Some(entry) = ir.get_mut(&key(rt, eid)) {
return if entry.pending.remove(&slot_index).is_some() {
ZeroDdsStatus::Ok as c_int
} else {
ZeroDdsStatus::PreconditionNotMet as c_int
};
}
}
let Ok(r) = readers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
let Some(entry) = r.get(&key(rt, eid)) else {
return ZeroDdsStatus::PreconditionNotMet as c_int;
};
let handle = SlotHandle {
segment_id: 0,
slot_index,
};
match entry.backend.mark_read(handle, entry.reader_index) {
Ok(()) => ZeroDdsStatus::Ok as c_int,
Err(_) => ZeroDdsStatus::Error as c_int,
}
}
pub(crate) fn forget_reader(rt: &Arc<DcpsRuntime>, eid: EntityId) {
if let Ok(mut r) = readers().lock() {
r.remove(&key(rt, eid));
}
#[cfg(feature = "delivery-iceoryx")]
if let Ok(mut ir) = ice_readers().lock() {
ir.remove(&key(rt, eid));
}
}
pub(crate) fn try_raw_wait(rt: &Arc<DcpsRuntime>, eid: EntityId, timeout_ms: u64) -> c_int {
let dur = core::time::Duration::from_millis(timeout_ms);
#[cfg(feature = "delivery-iceoryx")]
{
let sub = {
let Ok(r) = ice_readers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
r.get(&key(rt, eid)).map(|e| Arc::clone(&e.subscriber))
};
if let Some(sub) = sub {
return if sub.wait(dur) {
ZeroDdsStatus::Ok as c_int
} else {
ZeroDdsStatus::NoData as c_int
};
}
}
let backend_gen = {
let Ok(r) = readers().lock() else {
return ZeroDdsStatus::Error as c_int;
};
r.get(&key(rt, eid))
.map(|e| (Arc::clone(&e.backend), e.backend.notify_generation()))
};
let Some((backend, generation)) = backend_gen else {
return ZeroDdsStatus::PreconditionNotMet as c_int;
};
backend.wait_for_change(generation, dur);
if backend.notify_generation() != generation {
ZeroDdsStatus::Ok as c_int
} else {
ZeroDdsStatus::NoData as c_int
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dw_enable_shm_loan(
dw: *mut ZeroDdsDataWriter,
name: *const c_char,
slot_count: usize,
slot_capacity: usize,
) -> c_int {
if dw.is_null() || name.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let path = match unsafe { c_str(name) } {
Ok(s) => s,
Err(rc) => return rc,
};
let dwr = unsafe { &*dw };
enable_writer(&dwr.rt, dwr.eid, path, slot_count, slot_capacity)
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dw_set_delivery_mode(
dw: *mut ZeroDdsDataWriter,
mode: c_int,
) -> c_int {
if dw.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let Ok(mode) = u8::try_from(mode) else {
return ZeroDdsStatus::BadParameter as c_int;
};
let dwr = unsafe { &*dw };
set_delivery_mode(&dwr.rt, dwr.eid, mode)
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_enable_shm(
dr: *mut ZeroDdsDataReader,
name: *const c_char,
reader_index: u8,
) -> c_int {
if dr.is_null() || name.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let path = match unsafe { c_str(name) } {
Ok(s) => s,
Err(rc) => return rc,
};
let drr = unsafe { &*dr };
enable_reader(&drr.rt, drr.eid, path, reader_index)
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_take_shm(
dr: *mut ZeroDdsDataReader,
out_ptr: *mut *const u8,
out_len: *mut usize,
out_slot: *mut u32,
) -> c_int {
if dr.is_null() || out_ptr.is_null() || out_len.is_null() || out_slot.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let drr = unsafe { &*dr };
unsafe { try_take(&drr.rt, drr.eid, out_ptr, out_len, out_slot) }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_release_shm(
dr: *mut ZeroDdsDataReader,
slot_index: u32,
) -> c_int {
if dr.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let drr = unsafe { &*dr };
try_release(&drr.rt, drr.eid, slot_index)
}
#[cfg(feature = "delivery-iceoryx")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dw_enable_iceoryx(
dw: *mut ZeroDdsDataWriter,
service_name: *const c_char,
max_len: usize,
) -> c_int {
if dw.is_null() || service_name.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let service = match unsafe { c_str(service_name) } {
Ok(s) => s,
Err(rc) => return rc,
};
let dwr = unsafe { &*dw };
enable_iceoryx_writer(&dwr.rt, dwr.eid, service, max_len)
}
#[cfg(feature = "delivery-iceoryx")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_enable_iceoryx(
dr: *mut ZeroDdsDataReader,
service_name: *const c_char,
) -> c_int {
if dr.is_null() || service_name.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let service = match unsafe { c_str(service_name) } {
Ok(s) => s,
Err(rc) => return rc,
};
let drr = unsafe { &*dr };
enable_iceoryx_reader(&drr.rt, drr.eid, service)
}
unsafe fn c_str(name: *const c_char) -> Result<String, c_int> {
match unsafe { CStr::from_ptr(name) }.to_str() {
Ok(s) => Ok(s.to_string()),
Err(_) => Err(ZeroDdsStatus::InvalidUtf8 as c_int),
}
}
fn host_id() -> String {
std::env::var("HOSTNAME").unwrap_or_else(|_| "localhost".to_string())
}
fn process_uid() -> u32 {
0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn only_portable_reaches_the_wire() {
assert!(publishes_to_wire(MODE_PORTABLE));
assert!(!publishes_to_wire(MODE_RAW_SAME_HOST));
assert!(!publishes_to_wire(MODE_ICEORYX));
}
}