#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt;
use zerodds_rtps::wire_types::Guid;
#[cfg(feature = "std")]
pub const DEFAULT_BASE_DIR_NAME: &str = "zerodds-shm";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
Owner,
Consumer,
}
#[derive(Clone)]
pub enum SameHostState {
Pending,
Bound {
transport: Arc<dyn core::any::Any + Send + Sync>,
role: Role,
},
Failed {
reason: &'static str,
},
}
impl fmt::Debug for SameHostState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Pending => f.write_str("Pending"),
Self::Bound { role, .. } => f.debug_struct("Bound").field("role", role).finish(),
Self::Failed { reason } => f.debug_struct("Failed").field("reason", reason).finish(),
}
}
}
#[must_use]
pub fn shm_segment_id_for_pair(writer: Guid, reader: Guid) -> [u8; 16] {
let mut buf = [0u8; 32];
buf[..16].copy_from_slice(&writer.to_bytes());
buf[16..].copy_from_slice(&reader.to_bytes());
fnv1a_128(&buf)
}
#[must_use]
pub fn shm_segment_filename(id: [u8; 16]) -> alloc::string::String {
let mut s = alloc::string::String::with_capacity(32);
for b in id {
const HEX: &[u8; 16] = b"0123456789abcdef";
s.push(HEX[(b >> 4) as usize] as char);
s.push(HEX[(b & 0x0F) as usize] as char);
}
s
}
fn fnv1a_128(data: &[u8]) -> [u8; 16] {
let prime: u128 = 0x0000_0000_0100_0000_0000_0000_0000_013B;
let offset: u128 = 0x6c62_272e_07bb_0142_62b8_2175_6295_c58d;
let mut h = offset;
for &b in data {
h ^= u128::from(b);
h = h.wrapping_mul(prime);
}
h.to_le_bytes()
}
#[cfg(feature = "std")]
pub struct SameHostTracker {
pairs: std::sync::RwLock<alloc::collections::BTreeMap<(Guid, Guid), SameHostState>>,
}
#[cfg(feature = "std")]
impl SameHostTracker {
#[must_use]
pub fn new() -> Self {
Self {
pairs: std::sync::RwLock::new(alloc::collections::BTreeMap::new()),
}
}
pub fn register_pending(&self, writer: Guid, reader: Guid) {
if let Ok(mut g) = self.pairs.write() {
g.entry((writer, reader)).or_insert(SameHostState::Pending);
}
}
pub fn mark_bound(
&self,
writer: Guid,
reader: Guid,
transport: Arc<dyn core::any::Any + Send + Sync>,
role: Role,
) {
if let Ok(mut g) = self.pairs.write() {
g.insert((writer, reader), SameHostState::Bound { transport, role });
}
}
pub fn mark_failed(&self, writer: Guid, reader: Guid, reason: &'static str) {
if let Ok(mut g) = self.pairs.write() {
g.insert((writer, reader), SameHostState::Failed { reason });
}
}
#[must_use]
pub fn lookup(&self, writer: Guid, reader: Guid) -> Option<SameHostState> {
self.pairs.read().ok()?.get(&(writer, reader)).cloned()
}
pub fn remove(&self, writer: Guid, reader: Guid) {
if let Ok(mut g) = self.pairs.write() {
g.remove(&(writer, reader));
}
}
#[must_use]
pub fn len(&self) -> usize {
self.pairs.read().map(|g| g.len()).unwrap_or(0)
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[must_use]
pub fn snapshot(&self) -> Vec<(Guid, Guid, SameHostState)> {
self.pairs
.read()
.map(|g| {
g.iter()
.map(|(&(w, r), s)| (w, r, s.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
}
#[cfg(feature = "std")]
impl Default for SameHostTracker {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "std")]
impl fmt::Debug for SameHostTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SameHostTracker")
.field("len", &self.len())
.finish()
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use super::*;
use zerodds_rtps::wire_types::{EntityId, GuidPrefix};
fn writer_guid(seed: u8) -> Guid {
Guid::new(
GuidPrefix::from_bytes([seed; 12]),
EntityId::user_writer_with_key([seed, seed, seed]),
)
}
fn reader_guid(seed: u8) -> Guid {
Guid::new(
GuidPrefix::from_bytes([seed; 12]),
EntityId::user_reader_with_key([seed, seed, seed]),
)
}
#[test]
fn segment_id_is_deterministic_for_same_input() {
let w = writer_guid(0xAA);
let r = reader_guid(0xBB);
let a = shm_segment_id_for_pair(w, r);
let b = shm_segment_id_for_pair(w, r);
assert_eq!(a, b, "same input → same ID");
}
#[test]
fn segment_id_differs_for_swapped_pair() {
let w = writer_guid(0xAA);
let r = reader_guid(0xBB);
let a = shm_segment_id_for_pair(w, r);
let b = shm_segment_id_for_pair(r, w);
assert_ne!(a, b, "ordered hash: pair (w,r) ≠ (r,w)");
}
#[test]
fn segment_id_differs_for_different_pairs() {
let a = shm_segment_id_for_pair(writer_guid(1), reader_guid(2));
let b = shm_segment_id_for_pair(writer_guid(1), reader_guid(3));
assert_ne!(a, b);
}
#[test]
fn segment_filename_is_32_lowercase_hex_chars() {
let id = shm_segment_id_for_pair(writer_guid(7), reader_guid(8));
let name = shm_segment_filename(id);
assert_eq!(name.len(), 32);
assert!(
name.chars()
.all(|c| c.is_ascii_hexdigit() && !c.is_uppercase())
);
}
#[test]
fn tracker_register_then_lookup_pending() {
let t = SameHostTracker::new();
let w = writer_guid(1);
let r = reader_guid(2);
t.register_pending(w, r);
assert!(matches!(t.lookup(w, r), Some(SameHostState::Pending)));
assert_eq!(t.len(), 1);
}
#[test]
fn tracker_mark_bound_overwrites_pending() {
let t = SameHostTracker::new();
let w = writer_guid(1);
let r = reader_guid(2);
t.register_pending(w, r);
let dummy: Arc<dyn core::any::Any + Send + Sync> = Arc::new(42u32);
t.mark_bound(w, r, dummy, Role::Owner);
let st = t.lookup(w, r).expect("entry");
match st {
SameHostState::Bound { role, .. } => assert_eq!(role, Role::Owner),
other => panic!("expected Bound, got {other:?}"),
}
}
#[test]
fn tracker_register_pending_is_idempotent() {
let t = SameHostTracker::new();
let w = writer_guid(1);
let r = reader_guid(2);
let dummy: Arc<dyn core::any::Any + Send + Sync> = Arc::new(42u32);
t.mark_bound(w, r, dummy, Role::Consumer);
t.register_pending(w, r);
assert!(matches!(t.lookup(w, r), Some(SameHostState::Bound { .. })));
}
#[test]
fn tracker_mark_failed_signals_udp_fallback() {
let t = SameHostTracker::new();
let w = writer_guid(1);
let r = reader_guid(2);
t.mark_failed(w, r, "shm_open ENOENT");
match t.lookup(w, r) {
Some(SameHostState::Failed { reason }) => assert!(reason.contains("ENOENT")),
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn tracker_remove_drops_entry() {
let t = SameHostTracker::new();
let w = writer_guid(1);
let r = reader_guid(2);
t.register_pending(w, r);
t.remove(w, r);
assert!(t.lookup(w, r).is_none());
assert!(t.is_empty());
}
}