// zerodds_flatdata — posix.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors
//! `PosixSlotAllocator` — true cross-process zero-copy via POSIX
//! `shm_open` + `mmap` (Spec §4.1, ADR-0003).
//!
//! Segment layout:
//!
//! ```text
//!   0x00 | u32 segment_magic (0x5A445353 = "ZDSS")
//!   0x04 | u32 slot_count
//!   0x08 | u32 slot_total_size
//!   0x0c | u32 next_sn (atomic counter)
//!   0x10 | [slot_total_size; slot_count]   ← slot array
//! ```
//!
//! Per slot:
//!
//! ```text
//!   0x00 | SlotHeader (16 byte)
//!   0x10 | [u8; capacity] payload
//!   0x?? | padding up to the next 64-byte boundary
//! ```
//!
//! Atomic operations: `next_sn` is an `AtomicU32`. The `SlotHeader`
//! `reader_mask` is updated atomically via `fetch_or` (see the
//! `mark_read` implementation). The per-slot `loaned` status lives in
//! the owner process's RAM (behind a `Mutex`), not in shared memory —
//! cross-process loaning would require a lock-free allocator with an
//! atomic per-slot flag that works across process boundaries; that is
//! explicitly out of scope for this owner-centric allocator (the loan
//! API is therefore restricted to owner-process callers — reader
//! processes only read committed samples).
33
34extern crate alloc;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use core::sync::atomic::{AtomicU32, Ordering};
38use std::path::PathBuf;
39use std::sync::Mutex;
40
41use shared_memory::{Shmem, ShmemConf, ShmemError};
42
43use crate::allocator::{SlotError, SlotHandle};
44use crate::backend::SlotBackend;
45use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
46
/// Magic value at segment offset 0x00 identifying a ZeroDDS flatdata
/// segment; the bytes spell ASCII "ZDSS".
const SEGMENT_MAGIC: u32 = 0x5A44_5353; // "ZDSS"
48
/// Errors raised while setting up or attaching to the POSIX segment.
#[derive(Debug)]
#[non_exhaustive]
pub enum PosixSlotError {
    /// Error from the shared-memory backend (`shm_open`/`mmap`).
    Shm(ShmemError),
    /// Slot capacity too large for `u32` (or segment size overflow).
    CapacityOverflow,
    /// Segment header does not match (different owner / wrong magic).
    InvalidHeader,
    /// Internal slot error (passed through).
    Slot(SlotError),
}
62
63impl core::fmt::Display for PosixSlotError {
64    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
65        match self {
66            Self::Shm(e) => write!(f, "shm error: {e}"),
67            Self::CapacityOverflow => f.write_str("slot capacity overflows u32"),
68            Self::InvalidHeader => f.write_str("segment magic/version mismatch"),
69            Self::Slot(e) => write!(f, "{e}"),
70        }
71    }
72}
73
// Marker impl: the `Debug` + `Display` impls above satisfy the trait's
// bounds; the default `source()` (None) is kept since wrapped errors are
// rendered inline by `Display`.
impl std::error::Error for PosixSlotError {}
75
76impl From<ShmemError> for PosixSlotError {
77    fn from(e: ShmemError) -> Self {
78        Self::Shm(e)
79    }
80}
81
82impl From<SlotError> for PosixSlotError {
83    fn from(e: SlotError) -> Self {
84        Self::Slot(e)
85    }
86}
87
/// POSIX-mmap slot allocator. An owner process creates the segment;
/// consumer processes attach via `attach`.
pub struct PosixSlotAllocator {
    /// Shared-memory segment. Drop unmaps the segment.
    /// `None` only while dropping.
    shmem: Option<Shmem>,
    /// Path of the flink file (for reattachment discovery).
    flink: PathBuf,
    /// Per-slot loan tracking — local to the owner process. The loan API
    /// is owner-centric (see module docs); reader processes only read
    /// committed samples.
    loaned: Mutex<Vec<bool>>,
    /// Slot count (for bounds checks; redundant with the segment header).
    slot_count: u32,
    /// Total per-slot size (header + payload + padding).
    slot_total_size: u32,
    /// Per-slot data capacity (without header, without padding).
    slot_capacity: u32,
}
107
// SAFETY: `Shmem` is not `Sync` by default; we mediate access via
// `Mutex<loaned>`. The header is modified through *mut pointers, for
// which the atomic discipline (module docs) is responsible.
// NOTE(review): this soundness argument depends on every raw-pointer
// path below actually following that discipline — verify when changing
// any read/write path.
unsafe impl Send for PosixSlotAllocator {}
// SAFETY: read paths use `ptr::read(SlotHeader)`, write paths use
// `AtomicU32` via raw pointer cast (`mark_read`). `loaned` sits behind
// a `Mutex`.
unsafe impl Sync for PosixSlotAllocator {}
115
116impl PosixSlotAllocator {
117    /// Erzeugt ein neues POSIX-SHM-Segment als Owner.
118    ///
119    /// `flink_path` ist eine Datei im Filesystem (typisch
120    /// `/tmp/zerodds/<segment_id>.flink`), die dem Consumer den
121    /// realen OS-Segment-Namen verraet.
122    ///
123    /// # Errors
124    /// `Shm` bei `shm_open`-Fehler; `CapacityOverflow` wenn
125    /// `slot_capacity > u32::MAX`.
126    pub fn create<P: Into<PathBuf>>(
127        flink_path: P,
128        slot_count: usize,
129        slot_capacity: usize,
130    ) -> Result<Self, PosixSlotError> {
131        let flink_path = flink_path.into();
132        if let Some(parent) = flink_path.parent() {
133            let _ = std::fs::create_dir_all(parent);
134        }
135        let slot_capacity_u32 =
136            u32::try_from(slot_capacity).map_err(|_| PosixSlotError::CapacityOverflow)?;
137        let slot_count_u32 =
138            u32::try_from(slot_count).map_err(|_| PosixSlotError::CapacityOverflow)?;
139        let slot_total_size = align_up(SLOT_HEADER_SIZE + slot_capacity, 64);
140        let slot_total_size_u32 =
141            u32::try_from(slot_total_size).map_err(|_| PosixSlotError::CapacityOverflow)?;
142        let header_size = 0x10usize;
143        let total_size = header_size + slot_count * slot_total_size;
144
145        let shmem = ShmemConf::new()
146            .size(total_size)
147            .flink(&flink_path)
148            .create()?;
149
150        // Header initialisieren.
151        // SAFETY: as_ptr_mut zeigt auf einen mmap'd Region der Groesse
152        // total_size; wir schreiben in die ersten 16 byte den Header.
153        unsafe {
154            let base = shmem.as_ptr();
155            let p = base as *mut u32;
156            p.add(0).write(SEGMENT_MAGIC);
157            p.add(1).write(slot_count_u32);
158            p.add(2).write(slot_total_size_u32);
159            p.add(3).write(0); // next_sn = 0
160            // Slots zeroen.
161            core::ptr::write_bytes(base.add(header_size), 0u8, slot_count * slot_total_size);
162        }
163
164        Ok(Self {
165            shmem: Some(shmem),
166            flink: flink_path,
167            loaned: Mutex::new(alloc::vec![false; slot_count]),
168            slot_count: slot_count_u32,
169            slot_total_size: slot_total_size_u32,
170            slot_capacity: slot_capacity_u32,
171        })
172    }
173
174    /// Attached an ein bestehendes POSIX-SHM-Segment via flink-Pfad.
175    /// Der Caller wird Consumer (kein Owner — Drop unmappt nur, nicht
176    /// `shm_unlink`).
177    ///
178    /// # Errors
179    /// `Shm` bei attach-Fehler; `InvalidHeader` wenn Magic/Layout
180    /// nicht stimmt.
181    pub fn attach<P: Into<PathBuf>>(flink_path: P) -> Result<Self, PosixSlotError> {
182        let flink_path = flink_path.into();
183        let shmem = ShmemConf::new().flink(&flink_path).open()?;
184
185        // Header validieren.
186        // SAFETY: shmem.as_ptr ist valide fuer mindestens 16 byte
187        // (sonst waere create gescheitert). Wir lesen 4 u32.
188        let (magic, slot_count, slot_total_size, _next_sn) = unsafe {
189            let p = shmem.as_ptr() as *const u32;
190            (
191                p.add(0).read(),
192                p.add(1).read(),
193                p.add(2).read(),
194                p.add(3).read(),
195            )
196        };
197        if magic != SEGMENT_MAGIC {
198            return Err(PosixSlotError::InvalidHeader);
199        }
200
201        let slot_capacity = slot_total_size.saturating_sub(SLOT_HEADER_SIZE as u32);
202
203        Ok(Self {
204            shmem: Some(shmem),
205            flink: flink_path,
206            loaned: Mutex::new(alloc::vec![false; slot_count as usize]),
207            slot_count,
208            slot_total_size,
209            slot_capacity,
210        })
211    }
212
213    /// Pfad der flink-Datei (fuer Discovery).
214    #[must_use]
215    pub fn flink_path(&self) -> &str {
216        self.flink.to_str().unwrap_or("")
217    }
218
219    /// Liefert den Segment-Pfad als String fuer den ShmLocator.
220    /// Dies ist das, was im PID_SHM_LOCATOR steht.
221    pub fn segment_path(&self) -> String {
222        self.flink_path().to_string()
223    }
224
225    fn slot_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
226        if idx >= self.slot_count {
227            return Err(SlotError::OutOfBounds);
228        }
229        let header_size = 0x10usize;
230        // SAFETY: caller-Bound geprueft (idx < slot_count); offset bleibt
231        // im total_size, der bei create gesichert wurde.
232        let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
233        let base = shmem.as_ptr();
234        // SAFETY: idx < slot_count (oben gepruft); offset bleibt im
235        // total_size, der bei create gesichert wurde (header_size +
236        // slot_count * slot_total_size).
237        unsafe { Ok(base.add(header_size + (idx as usize) * (self.slot_total_size as usize))) }
238    }
239
240    fn read_header(&self, idx: u32) -> Result<SlotHeader, SlotError> {
241        let p = self.slot_ptr(idx)?;
242        // SAFETY: p zeigt auf SLOT_HEADER_SIZE-Bytes (durch slot_ptr-
243        // Bounds garantiert); SlotHeader ist repr(C, align(4)), 16 byte.
244        let header = unsafe { core::ptr::read(p as *const SlotHeader) };
245        Ok(header)
246    }
247
248    fn write_header(&self, idx: u32, header: SlotHeader) -> Result<(), SlotError> {
249        let p = self.slot_ptr(idx)?;
250        // SAFETY: p ist 4-byte-aligned (Layout-Garantie); 16 byte Schreib-
251        // Region; SlotHeader ist repr(C, align(4)).
252        unsafe {
253            core::ptr::write(p as *mut SlotHeader, header);
254        }
255        Ok(())
256    }
257
258    fn next_sn_inc(&self) -> Result<u32, SlotError> {
259        let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
260        // SAFETY: next_sn liegt bei Offset 12 im Header (4. u32). Der
261        // Shmem ist mindestens 16 byte. AtomicU32 + ptr::read:
262        // wir nutzen direkt den AtomicU32 ueber raw pointer.
263        let sn_ptr = unsafe { shmem.as_ptr().add(12) as *const AtomicU32 };
264        // SAFETY: sn_ptr zeigt auf 4-byte-aligned u32 im SHM.
265        let atomic = unsafe { &*sn_ptr };
266        Ok(atomic.fetch_add(1, Ordering::Relaxed))
267    }
268
269    fn data_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
270        let p = self.slot_ptr(idx)?;
271        // SAFETY: data folgt direkt nach Header (Offset 16).
272        Ok(unsafe { p.add(SLOT_HEADER_SIZE) })
273    }
274}
275
276impl SlotBackend for PosixSlotAllocator {
277    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
278        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
279        for idx in 0..self.slot_count {
280            if loaned[idx as usize] {
281                continue;
282            }
283            let header = self.read_header(idx)?;
284            if header.sample_size == 0 || header.all_read(active_readers_mask) {
285                loaned[idx as usize] = true;
286                return Ok(SlotHandle {
287                    segment_id: 0,
288                    slot_index: idx,
289                });
290            }
291        }
292        Err(SlotError::NoFreeSlot)
293    }
294
295    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
296        if bytes.len() > self.slot_capacity as usize {
297            return Err(SlotError::SampleTooLarge {
298                sample: bytes.len(),
299                slot_capacity: self.slot_capacity as usize,
300            });
301        }
302        let sn = self.next_sn_inc()?;
303        let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
304        let header = SlotHeader::new(sn, sample_size);
305        // Daten zuerst, Header zuletzt (release-Reihenfolge).
306        let dp = self.data_ptr(handle.slot_index)?;
307        // SAFETY: dp ist Slot-Daten-Bereich, mindestens slot_capacity Bytes.
308        unsafe {
309            core::ptr::copy_nonoverlapping(bytes.as_ptr(), dp, bytes.len());
310        }
311        self.write_header(handle.slot_index, header)?;
312        // Loan freigeben.
313        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
314        loaned[handle.slot_index as usize] = false;
315        Ok(sn)
316    }
317
318    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
319        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
320        if (handle.slot_index as usize) >= loaned.len() {
321            return Err(SlotError::OutOfBounds);
322        }
323        loaned[handle.slot_index as usize] = false;
324        Ok(())
325    }
326
327    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
328        let header = self.read_header(handle.slot_index)?;
329        let n = (header.sample_size as usize).min(self.slot_capacity as usize);
330        let dp = self.data_ptr(handle.slot_index)?;
331        let mut buf = alloc::vec![0u8; n];
332        // SAFETY: dp ist slot_capacity Bytes; n <= slot_capacity.
333        unsafe {
334            core::ptr::copy_nonoverlapping(dp, buf.as_mut_ptr(), n);
335        }
336        Ok((header, buf))
337    }
338
339    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
340        debug_assert!(reader_index < 32);
341        // SAFETY: slot_ptr returnt einen Pointer in den Slot (bounds-
342        // checked); Header startet dort. reader_mask ist u32 bei
343        // Offset 8 im Header.
344        let p = self.slot_ptr(handle.slot_index)?;
345        // SAFETY: p ist Slot-Beginn; +8 zeigt auf reader_mask u32.
346        let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
347        // SAFETY: mask_ptr zeigt auf u32 im SHM, gültig bis Drop.
348        let atomic = unsafe { &*mask_ptr };
349        atomic.fetch_or(1u32 << reader_index, Ordering::Relaxed);
350        Ok(())
351    }
352
353    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
354        debug_assert!(reader_index < 32);
355        let bit = 1u32 << reader_index;
356        for idx in 0..self.slot_count {
357            let p = self.slot_ptr(idx)?;
358            // SAFETY: reader_mask liegt bei Offset 8 im Header
359            // (nach sn:u32 + sample_size:u32). 4-byte aligned per
360            // SlotHeader-Layout-Garantie.
361            let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
362            // SAFETY: mask_ptr zeigt auf u32 im SHM, gültig bis Drop.
363            let atomic = unsafe { &*mask_ptr };
364            atomic.fetch_or(bit, Ordering::Relaxed);
365        }
366        Ok(())
367    }
368
369    fn slot_count(&self) -> Result<usize, SlotError> {
370        Ok(self.slot_count as usize)
371    }
372
373    fn slot_total_size(&self) -> usize {
374        self.slot_total_size as usize
375    }
376
377    fn slot_capacity(&self) -> usize {
378        self.slot_capacity as usize
379    }
380}
381
/// Rounds `x` up to the next multiple of `n` (`n` must be a power of
/// two).
///
/// # Panics
/// Panics when `x + n - 1` overflows `usize` (debug or release). The
/// previous `x + n - 1` wrapped silently in release builds, which would
/// have produced a tiny bogus size and an under-allocated segment.
fn align_up(x: usize, n: usize) -> usize {
    debug_assert!(n.is_power_of_two());
    x.checked_add(n - 1).expect("align_up overflow") & !(n - 1)
}
386
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    /// Builds a unique flink path in the temp dir (pid + counter) so
    /// concurrently running tests never collide on the same shm segment.
    fn unique_flink() -> PathBuf {
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(alloc::format!("zerodds-flatdata-test-{pid}-{n}"));
        p
    }

    /// Owner creates a segment; a consumer attach sees the same geometry.
    #[test]
    fn create_attach_roundtrip() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        assert_eq!(SlotBackend::slot_count(&owner).unwrap(), 4);
        assert_eq!(SlotBackend::slot_count(&consumer).unwrap(), 4);
        // Slot total size: 16 (header) + 64 (payload) = 80 → padded to 128.
        assert_eq!(SlotBackend::slot_total_size(&owner), 128);
    }

    /// Data committed by the owner is readable from an attached consumer.
    #[test]
    fn write_read_through_shm() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b1).expect("reserve");
        let _sn = SlotBackend::commit_slot(&owner, h, &[1, 2, 3, 4]).expect("commit");

        let (header, bytes) = SlotBackend::read_slot(&consumer, h).expect("read");
        assert_eq!(header.sample_size, 4);
        assert_eq!(bytes, vec![1, 2, 3, 4]);
    }

    /// A reader_mask update made by the consumer is visible to the owner
    /// and frees the slot for reuse.
    #[test]
    fn mark_read_visible_to_owner() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 1, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b011).expect("reserve");
        SlotBackend::commit_slot(&owner, h, &[0xFF]).expect("commit");

        // Consumer marks reader 0 + reader 1 as read.
        SlotBackend::mark_read(&consumer, h, 0).expect("mark0");
        SlotBackend::mark_read(&consumer, h, 1).expect("mark1");

        // Owner sees reader_mask = 0b11 → the slot is free for reuse.
        let (header, _) = SlotBackend::read_slot(&owner, h).unwrap();
        assert_eq!(header.reader_mask, 0b011);

        // Owner can reserve the slot again.
        let _ = SlotBackend::reserve_slot(&owner, 0b011).expect("reuse");
    }

    /// Sequence numbers strictly increase across successive commits.
    #[test]
    fn next_sn_increments_atomically() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");

        let h0 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn0 = SlotBackend::commit_slot(&owner, h0, &[0]).unwrap();
        let h1 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn1 = SlotBackend::commit_slot(&owner, h1, &[1]).unwrap();
        assert!(sn1 > sn0);
    }
}
459}