Skip to main content

zerodds_dcps/
same_host.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Same-host tracker for Wave 4 of the zero-copy roadmap
4//! (Spec `docs/specs/zerodds-zero-copy-1.0.md` §6 Wave 3).
5//!
6//! # Purpose
7//!
8//! Discovery detects via [`GuidPrefix::is_same_host`] (Wave 4a) whether
9//! a peer runs on the same machine. On a match, the DCPS runtime
10//! registers a `(WriterGuid, ReaderGuid)` pair in this tracker. The
11//! tracker keeps a deterministic SHM-segment identification so that both
12//! sides can derive the same transport path without an extra discovery
13//! round-trip.
14//!
15//! # Path convention
16//!
17//! An SHM segment for a writer-reader pair is identified by a
18//! deterministic 16-byte hash of the two GUIDs (see
//! [`shm_segment_id_for_pair`]). Both sides arrive at the same id because
//! the hash is deterministic and order-sensitive (writer first, reader second).
21//!
22//! The `base_dir` default is `${TMPDIR}/zerodds-shm` (Linux) or
23//! `${TEMP}\zerodds-shm` (Windows). The `host_id_hex` sub-folder prevents
24//! cross-host collisions on NFS mounts; should `gethostname` fail
25//! (fallback in `participant::host_id_bytes`), the path remains unique
26//! per process.
27//!
28//! # Tracker state
29//!
30//! Each pair has a [`SameHostState`]:
31//!
32//! - `Pending` — match detected, SHM setup not yet performed (e.g.
33//!   because the Wave-4b.2 hook only maps the segment on the first send).
34//! - `Bound { transport, role }` — SHM transport is initialized; `role`
35//!   says which side is Owner (writer producer) or Consumer (reader
36//!   receiver) (see [`Role`]).
37//! - `Failed { reason }` — setup failed; the sender should fall back to
38//!   the classic UDP path.
39//!
40//! The concrete transport instantiation does **not** live here (that is
41//! the job of the Wave-4b.2 hook in the `runtime` module), so this module
42//! stays `no_std + alloc` portable and does not depend directly on
43//! `transport-shm`.
44//!
45//! # Spec anchors
46//!
47//! - `docs/specs/zerodds-zero-copy-1.0.md` §6 Wave 3 (Iceoryx backend
48//!   hot-path wiring; ZeroDDS reuses it for the SHM-bytes path).
49//! - RTPS 2.5 §9.4 — `LOCATOR_KIND_SHM`.
50
51#![cfg_attr(not(feature = "std"), no_std)]
52
53extern crate alloc;
54
55use alloc::sync::Arc;
56use alloc::vec::Vec;
57use core::fmt;
58
59use zerodds_rtps::wire_types::Guid;
60
/// Directory name used for the default SHM segment base directory.
///
/// Only compiled in `std` builds: without `std` the tracker is purely a
/// state table and never needs to resolve a filesystem path.
#[cfg(feature = "std")]
pub const DEFAULT_BASE_DIR_NAME: &str = "zerodds-shm";
66
/// Which end of an SHM pair the local endpoint plays.
///
/// Mirrors the `zerodds-transport-shm` convention (Owner = producer):
/// the DCPS **writer** fills the ring buffer and is therefore the
/// `Owner`; the DCPS **reader** drains it and is the `Consumer`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Role {
    /// Writer side: creates the SHM segment via `open_owner` and writes
    /// sample datagrams into it.
    Owner,
    /// Reader side: attaches via `open_consumer` and reads sample
    /// datagrams out of the segment.
    Consumer,
}
82
83/// State of a same-host pair in the tracker.
84#[derive(Clone)]
85pub enum SameHostState {
86    /// Match detected, SHM transport not yet set up.
87    Pending,
88    /// SHM transport active. `transport` is an opaque `Arc<dyn Any>` so
89    /// that this module does not depend directly on `transport-shm`. The
90    /// `runtime` hook downcasts to the concrete transport instance.
91    Bound {
92        /// Opaque transport pointer; `Arc<dyn core::any::Any + Send + Sync>`.
93        transport: Arc<dyn core::any::Any + Send + Sync>,
94        /// Role of this endpoint.
95        role: Role,
96    },
97    /// SHM setup failed; UDP fallback active.
98    Failed {
99        /// Diagnostic text for logs.
100        reason: &'static str,
101    },
102}
103
104impl fmt::Debug for SameHostState {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        match self {
107            Self::Pending => f.write_str("Pending"),
108            Self::Bound { role, .. } => f.debug_struct("Bound").field("role", role).finish(),
109            Self::Failed { reason } => f.debug_struct("Failed").field("reason", reason).finish(),
110        }
111    }
112}
113
114/// Deterministic 16-byte segment identification for a writer-reader
115/// pair.
116///
117/// Both sides compute the same value: the hash mixes the complete GUIDs
118/// in a fixed order (writer first, then reader). FNV1a-128 is
119/// deterministic and has no external deps.
120///
121/// The value is typically converted into a hex string for the file name
122/// (see [`shm_segment_filename`]).
123#[must_use]
124pub fn shm_segment_id_for_pair(writer: Guid, reader: Guid) -> [u8; 16] {
125    let mut buf = [0u8; 32];
126    buf[..16].copy_from_slice(&writer.to_bytes());
127    buf[16..].copy_from_slice(&reader.to_bytes());
128    fnv1a_128(&buf)
129}
130
/// Renders a [`shm_segment_id_for_pair`] result as 32 lowercase hex
/// characters (two per byte, high nibble first).
#[must_use]
pub fn shm_segment_filename(id: [u8; 16]) -> alloc::string::String {
    // Table lookup instead of `format!` keeps this usable without `std`.
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut name = alloc::string::String::with_capacity(32);
    for byte in id.iter().copied() {
        let high = usize::from(byte >> 4);
        let low = usize::from(byte & 0x0F);
        name.push(char::from(HEX_DIGITS[high]));
        name.push(char::from(HEX_DIGITS[low]));
    }
    name
}
144
/// 128-bit FNV-1a over `data`; deterministic and dependency-free.
///
/// Starts from the offset basis and, for every input byte, XORs the byte
/// into the state and then multiplies (wrapping) by the FNV prime. The
/// final state is returned in little-endian byte order; an empty input
/// yields the offset basis itself.
fn fnv1a_128(data: &[u8]) -> [u8; 16] {
    const FNV_PRIME: u128 = 0x0000_0000_0100_0000_0000_0000_0000_013B;
    const FNV_OFFSET_BASIS: u128 = 0x6c62_272e_07bb_0142_62b8_2175_6295_c58d;

    data.iter()
        .fold(FNV_OFFSET_BASIS, |state, &byte| {
            (state ^ u128::from(byte)).wrapping_mul(FNV_PRIME)
        })
        .to_le_bytes()
}
156
157// ============================================================================
158// Tracker
159// ============================================================================
160
/// Tracker for all `(WriterGuid, ReaderGuid)` same-host pairs of the
/// local participant.
///
/// Thread-safe: the pair map sits behind an inner [`std::sync::RwLock`],
/// so every method takes `&self`.
///
/// The API is deliberately minimal: `register_pending`, `mark_bound`,
/// `mark_failed`, `lookup` and `remove`, plus `len` / `is_empty` /
/// `snapshot` for diagnostics. Live iteration and cleanup-on-drop are
/// left to the caller.
///
/// # Lock poisoning
///
/// A poisoned lock is recovered instead of being ignored. Every critical
/// section is a single map operation, so a panic in another thread cannot
/// leave the map half-updated. Silently skipping the operation would
/// instead drop state transitions, make `remove` a no-op and let `len`
/// report `0` for a non-empty map.
#[cfg(feature = "std")]
pub struct SameHostTracker {
    /// State per `(writer, reader)` GUID pair.
    pairs: std::sync::RwLock<alloc::collections::BTreeMap<(Guid, Guid), SameHostState>>,
}

#[cfg(feature = "std")]
impl SameHostTracker {
    /// New empty tracker.
    #[must_use]
    pub fn new() -> Self {
        Self {
            pairs: std::sync::RwLock::new(alloc::collections::BTreeMap::new()),
        }
    }

    /// Shared (read) access to the pair map; recovers the guard if the
    /// lock is poisoned.
    fn read_pairs(
        &self,
    ) -> std::sync::RwLockReadGuard<'_, alloc::collections::BTreeMap<(Guid, Guid), SameHostState>>
    {
        self.pairs
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Exclusive (write) access to the pair map; recovers the guard if
    /// the lock is poisoned.
    fn write_pairs(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, alloc::collections::BTreeMap<(Guid, Guid), SameHostState>>
    {
        self.pairs
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Registers a pair in the `Pending` state. Idempotent — an existing
    /// entry (whatever its state) is **not** overwritten, so that a
    /// `Bound` state is not accidentally reset.
    pub fn register_pending(&self, writer: Guid, reader: Guid) {
        self.write_pairs()
            .entry((writer, reader))
            .or_insert(SameHostState::Pending);
    }

    /// Marks a pair as `Bound` with a transport handle and role.
    /// Overwrites any previous state; the pair does not need to have been
    /// registered beforehand.
    pub fn mark_bound(
        &self,
        writer: Guid,
        reader: Guid,
        transport: Arc<dyn core::any::Any + Send + Sync>,
        role: Role,
    ) {
        self.write_pairs()
            .insert((writer, reader), SameHostState::Bound { transport, role });
    }

    /// Marks a pair as `Failed`, overwriting any previous state. The
    /// caller (hot path) should then choose the UDP fallback.
    pub fn mark_failed(&self, writer: Guid, reader: Guid, reason: &'static str) {
        self.write_pairs()
            .insert((writer, reader), SameHostState::Failed { reason });
    }

    /// Returns a copy of the state for a pair, or `None` if the pair is
    /// not tracked.
    #[must_use]
    pub fn lookup(&self, writer: Guid, reader: Guid) -> Option<SameHostState> {
        self.read_pairs().get(&(writer, reader)).cloned()
    }

    /// Removes an entry (e.g. when the match is dissolved). Removing an
    /// unknown pair is a no-op.
    pub fn remove(&self, writer: Guid, reader: Guid) {
        self.write_pairs().remove(&(writer, reader));
    }

    /// Number of currently registered pairs.
    #[must_use]
    pub fn len(&self) -> usize {
        self.read_pairs().len()
    }

    /// `true` if the tracker is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Snapshot of all pairs, in the map's key order. Mainly for
    /// debug/diagnostics; the result is a copy and does not track later
    /// changes.
    #[must_use]
    pub fn snapshot(&self) -> Vec<(Guid, Guid, SameHostState)> {
        self.read_pairs()
            .iter()
            .map(|(&(w, r), s)| (w, r, s.clone()))
            .collect()
    }
}
251
#[cfg(feature = "std")]
impl Default for SameHostTracker {
    /// Same as [`SameHostTracker::new`]: a tracker with no pairs.
    fn default() -> Self {
        SameHostTracker::new()
    }
}
258
/// Compact `Debug`: prints only the number of tracked pairs, not the
/// entries themselves.
#[cfg(feature = "std")]
impl fmt::Debug for SameHostTracker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pair_count = self.len();
        let mut out = f.debug_struct("SameHostTracker");
        out.field("len", &pair_count);
        out.finish()
    }
}
267
268// ============================================================================
269// Tests
270// ============================================================================
271
#[cfg(test)]
mod tests {
    // Tests may unwrap/expect/panic freely; the production lints do not
    // apply here.
    #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
    use super::*;
    use zerodds_rtps::wire_types::{EntityId, GuidPrefix};

    /// User-writer GUID whose prefix bytes and entity key bytes all equal
    /// `seed`.
    fn writer_guid(seed: u8) -> Guid {
        Guid::new(
            GuidPrefix::from_bytes([seed; 12]),
            EntityId::user_writer_with_key([seed, seed, seed]),
        )
    }

    /// User-reader GUID whose prefix bytes and entity key bytes all equal
    /// `seed`.
    fn reader_guid(seed: u8) -> Guid {
        Guid::new(
            GuidPrefix::from_bytes([seed; 12]),
            EntityId::user_reader_with_key([seed, seed, seed]),
        )
    }

    // ---- Segment id / file name (available without `std`) --------------

    #[test]
    fn segment_id_is_deterministic_for_same_input() {
        let w = writer_guid(0xAA);
        let r = reader_guid(0xBB);
        let a = shm_segment_id_for_pair(w, r);
        let b = shm_segment_id_for_pair(w, r);
        assert_eq!(a, b, "same input → same ID");
    }

    #[test]
    fn segment_id_differs_for_swapped_pair() {
        // (writer, reader) must differ from (reader, writer); otherwise
        // the Owner/Consumer roles are ambiguous.
        let w = writer_guid(0xAA);
        let r = reader_guid(0xBB);
        let a = shm_segment_id_for_pair(w, r);
        let b = shm_segment_id_for_pair(r, w);
        assert_ne!(a, b, "ordered hash: pair (w,r) ≠ (r,w)");
    }

    #[test]
    fn segment_id_differs_for_different_pairs() {
        let a = shm_segment_id_for_pair(writer_guid(1), reader_guid(2));
        let b = shm_segment_id_for_pair(writer_guid(1), reader_guid(3));
        assert_ne!(a, b);
    }

    #[test]
    fn segment_filename_is_32_lowercase_hex_chars() {
        let id = shm_segment_id_for_pair(writer_guid(7), reader_guid(8));
        let name = shm_segment_filename(id);
        assert_eq!(name.len(), 32);
        assert!(
            name.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_uppercase())
        );
    }

    // ---- Tracker --------------------------------------------------------
    // `SameHostTracker` only exists with the `std` feature, so every test
    // that touches it carries the same gate; otherwise this module would
    // not compile when the feature is disabled.

    #[cfg(feature = "std")]
    #[test]
    fn tracker_register_then_lookup_pending() {
        let t = SameHostTracker::new();
        let w = writer_guid(1);
        let r = reader_guid(2);
        t.register_pending(w, r);
        assert!(matches!(t.lookup(w, r), Some(SameHostState::Pending)));
        assert_eq!(t.len(), 1);
    }

    #[cfg(feature = "std")]
    #[test]
    fn tracker_mark_bound_overwrites_pending() {
        let t = SameHostTracker::new();
        let w = writer_guid(1);
        let r = reader_guid(2);
        t.register_pending(w, r);
        // Any `Any + Send + Sync` value works as a stand-in transport.
        let dummy: Arc<dyn core::any::Any + Send + Sync> = Arc::new(42u32);
        t.mark_bound(w, r, dummy, Role::Owner);
        let st = t.lookup(w, r).expect("entry");
        match st {
            SameHostState::Bound { role, .. } => assert_eq!(role, Role::Owner),
            other => panic!("expected Bound, got {other:?}"),
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn tracker_register_pending_is_idempotent() {
        let t = SameHostTracker::new();
        let w = writer_guid(1);
        let r = reader_guid(2);
        let dummy: Arc<dyn core::any::Any + Send + Sync> = Arc::new(42u32);
        t.mark_bound(w, r, dummy, Role::Consumer);
        // Re-register must NOT reset the Bound state back to Pending.
        t.register_pending(w, r);
        assert!(matches!(t.lookup(w, r), Some(SameHostState::Bound { .. })));
    }

    #[cfg(feature = "std")]
    #[test]
    fn tracker_mark_failed_signals_udp_fallback() {
        let t = SameHostTracker::new();
        let w = writer_guid(1);
        let r = reader_guid(2);
        t.mark_failed(w, r, "shm_open ENOENT");
        match t.lookup(w, r) {
            Some(SameHostState::Failed { reason }) => assert!(reason.contains("ENOENT")),
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn tracker_remove_drops_entry() {
        let t = SameHostTracker::new();
        let w = writer_guid(1);
        let r = reader_guid(2);
        t.register_pending(w, r);
        t.remove(w, r);
        assert!(t.lookup(w, r).is_none());
        assert!(t.is_empty());
    }
}
389}