Skip to main content

zerodds_flatdata/
posix.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `PosixSlotAllocator` — real cross-process zero-copy via POSIX
4//! `shm_open` + `mmap` (spec §4.1, ADR-0003).
5//!
6//! Segment layout:
7//!
8//! ```text
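//!   0x00 | u32 segment_magic (0x5A445353 = "ZDSS")
//!   0x04 | u32 slot_count
//!   0x08 | u32 slot_total_size
//!   0x0c | u32 next_sn (atomic counter)
//!   0x10 | u32 notify generation (cross-process futex word, spec §4.2)
//!   0x14 | padding up to 0x20 (segment header size)
//!   0x20 | [slot_total_size; slot_count]   ← slot array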
14//! ```
15//!
16//! Per slot:
17//!
18//! ```text
19//!   0x00 | SlotHeader (16 byte)
20//!   0x10 | [u8; capacity] payload
//!   0x?? | padding up to the next 64-byte boundary
22//! ```
23//!
//! Atomic operations: `next_sn` is accessed as an `AtomicU32`. The
//! `SlotHeader` `reader_mask` is updated via an atomic fetch-or (see the
//! `mark_read` implementation). The slot `loaned` status lives in the
27//! owner process's RAM (Mutex), not in the SHM — cross-process loaning
28//! would require a lock-free allocator with an atomic-flag slot that
29//! works across process boundaries; that is explicitly out of scope for
30//! this owner-centric allocator (the loan API is therefore restricted
31//! to owner-process callers — reader processes only read
32//! committed samples).
33
34extern crate alloc;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use core::sync::atomic::{AtomicU32, Ordering};
38use std::path::PathBuf;
39use std::sync::Mutex;
40
41use shared_memory::{Shmem, ShmemConf, ShmemError};
42
43use crate::allocator::{SlotError, SlotHandle};
44use crate::backend::SlotBackend;
45use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
46
/// Magic number stored in the first u32 of the segment header. `attach`
/// rejects any segment whose first word differs from this value.
const SEGMENT_MAGIC: u32 = 0x5A44_5353; // "ZDSS"

/// Segment header size in bytes. Layout: u32 magic, slot_count,
/// slot_total_size, next_sn (offsets 0x00, 0x04, 0x08, 0x0c), then a u32
/// notify generation at offset 0x10 (Spec §4.2 cross-process futex word),
/// padded to 0x20 so the slot array stays 8-byte aligned.
const SEGMENT_HEADER_SIZE: usize = 0x20;

/// Byte offset of the notify-generation u32 within the segment header.
const GEN_OFFSET: usize = 0x10;
57
/// Error while setting up or using the POSIX segment.
///
/// Returned by `PosixSlotAllocator::create` and `PosixSlotAllocator::attach`.
#[derive(Debug)]
#[non_exhaustive]
pub enum PosixSlotError {
    /// Error reported by the shared-memory backend (`shared_memory` crate).
    Shm(ShmemError),
    /// A size parameter (slot count, slot capacity, or padded slot size)
    /// does not fit the u32 fields of the segment layout.
    CapacityOverflow,
    /// Segment header does not match (different owner / wrong magic).
    InvalidHeader,
    /// Internal slot error (passed through unchanged).
    Slot(SlotError),
}
71
72impl core::fmt::Display for PosixSlotError {
73    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
74        match self {
75            Self::Shm(e) => write!(f, "shm error: {e}"),
76            Self::CapacityOverflow => f.write_str("slot capacity overflows u32"),
77            Self::InvalidHeader => f.write_str("segment magic/version mismatch"),
78            Self::Slot(e) => write!(f, "{e}"),
79        }
80    }
81}
82
83impl std::error::Error for PosixSlotError {}
84
85impl From<ShmemError> for PosixSlotError {
86    fn from(e: ShmemError) -> Self {
87        Self::Shm(e)
88    }
89}
90
91impl From<SlotError> for PosixSlotError {
92    fn from(e: SlotError) -> Self {
93        Self::Slot(e)
94    }
95}
96
/// POSIX mmap slot allocator. An owner process creates the segment;
/// consumer processes attach via `attach`.
///
/// The slot geometry (`slot_count`, `slot_total_size`, `slot_capacity`) is
/// cached in the struct at construction time; the slot contents themselves
/// live in the shared mapping.
pub struct PosixSlotAllocator {
    /// Shared-memory segment. Drop unmaps the segment.
    /// `None` only during the drop.
    shmem: Option<Shmem>,
    /// Path to the flink file (for reattachment discovery).
    flink: PathBuf,
    /// Loan tracking per slot — local to the owner process. The loan API
    /// is owner-centric (see module docs); reader processes only read
    /// committed samples.
    loaned: Mutex<Vec<bool>>,
    /// Slot count (for the bounds check, redundant with the header).
    slot_count: u32,
    /// Total slot size (header + payload + padding).
    slot_total_size: u32,
    /// Slot data capacity (without header). For an allocator built by
    /// `create` this excludes the padding; for one built by `attach` it is
    /// `slot_total_size - SLOT_HEADER_SIZE` and therefore includes the padding,
    /// because the segment header does not record the payload capacity.
    slot_capacity: u32,
}

// SAFETY: Shmem is not Sync by default; we control access
// via Mutex<loaned>. The header is modified via a *mut pointer,
// for which the atomic discipline is responsible.
unsafe impl Send for PosixSlotAllocator {}
// SAFETY: read paths use ptr::read(SlotHeader), write paths use
// AtomicU32 via a raw pointer cast (mark_read). loaned is behind a Mutex.
// NOTE(review): `write_header` uses a plain `ptr::write` and `read_header` a
// plain `ptr::read`; these are not atomic, so soundness under concurrent
// commit/read presumably relies on the loan protocol — confirm.
unsafe impl Sync for PosixSlotAllocator {}
124
125impl PosixSlotAllocator {
126    /// Creates a new POSIX SHM segment as the owner.
127    ///
128    /// `flink_path` is a file in the filesystem (typically
129    /// `/tmp/zerodds/<segment_id>.flink`) that reveals the
130    /// real OS segment name to the consumer.
131    ///
132    /// # Errors
133    /// `Shm` on a `shm_open` error; `CapacityOverflow` when
134    /// `slot_capacity > u32::MAX`.
135    pub fn create<P: Into<PathBuf>>(
136        flink_path: P,
137        slot_count: usize,
138        slot_capacity: usize,
139    ) -> Result<Self, PosixSlotError> {
140        let flink_path = flink_path.into();
141        if let Some(parent) = flink_path.parent() {
142            let _ = std::fs::create_dir_all(parent);
143        }
144        let slot_capacity_u32 =
145            u32::try_from(slot_capacity).map_err(|_| PosixSlotError::CapacityOverflow)?;
146        let slot_count_u32 =
147            u32::try_from(slot_count).map_err(|_| PosixSlotError::CapacityOverflow)?;
148        let slot_total_size = align_up(SLOT_HEADER_SIZE + slot_capacity, 64);
149        let slot_total_size_u32 =
150            u32::try_from(slot_total_size).map_err(|_| PosixSlotError::CapacityOverflow)?;
151        let header_size = SEGMENT_HEADER_SIZE;
152        let total_size = header_size + slot_count * slot_total_size;
153
154        let shmem = ShmemConf::new()
155            .size(total_size)
156            .flink(&flink_path)
157            .create()?;
158
159        // Spec §7.1: owner-only 0600 on both the flink file and the backing
160        // shm object (the `shared_memory` crate leaves them at the umask
161        // default, typically 0644 — world-readable). A peer must be the same
162        // uid to attach (cross-host/cross-uid is gated separately); 0600 stops
163        // any other local user reading the zero-copy payload.
164        #[cfg(unix)]
165        {
166            use std::os::unix::fs::PermissionsExt;
167            let mode = std::fs::Permissions::from_mode(0o600);
168            let _ = std::fs::set_permissions(&flink_path, mode.clone());
169            #[cfg(target_os = "linux")]
170            {
171                // shm_open names map to /dev/shm/<name> on Linux.
172                let shm_path = std::path::Path::new("/dev/shm")
173                    .join(shmem.get_os_id().trim_start_matches('/'));
174                let _ = std::fs::set_permissions(&shm_path, mode);
175            }
176        }
177
178        // Initialize the header.
179        // SAFETY: as_ptr_mut points to an mmap'd region of size
180        // total_size; we write the header into the first 16 byte.
181        unsafe {
182            let base = shmem.as_ptr();
183            let p = base as *mut u32;
184            p.add(0).write(SEGMENT_MAGIC);
185            p.add(1).write(slot_count_u32);
186            p.add(2).write(slot_total_size_u32);
187            p.add(3).write(0); // next_sn = 0
188            p.add(GEN_OFFSET / 4).write(0); // notify generation = 0 (§4.2)
189            // Zero the slots.
190            core::ptr::write_bytes(base.add(header_size), 0u8, slot_count * slot_total_size);
191        }
192
193        Ok(Self {
194            shmem: Some(shmem),
195            flink: flink_path,
196            loaned: Mutex::new(alloc::vec![false; slot_count]),
197            slot_count: slot_count_u32,
198            slot_total_size: slot_total_size_u32,
199            slot_capacity: slot_capacity_u32,
200        })
201    }
202
203    /// Attaches to an existing POSIX SHM segment via the flink path.
204    /// The caller becomes a consumer (not an owner — Drop only unmaps,
205    /// it does not `shm_unlink`).
206    ///
207    /// # Errors
208    /// `Shm` on an attach error; `InvalidHeader` when the magic/layout
209    /// does not match.
210    pub fn attach<P: Into<PathBuf>>(flink_path: P) -> Result<Self, PosixSlotError> {
211        let flink_path = flink_path.into();
212        let shmem = ShmemConf::new().flink(&flink_path).open()?;
213
214        // Validate the header.
215        // SAFETY: shmem.as_ptr is valid for at least 16 byte
216        // (otherwise create would have failed). We read 4 u32.
217        let (magic, slot_count, slot_total_size, _next_sn) = unsafe {
218            let p = shmem.as_ptr() as *const u32;
219            (
220                p.add(0).read(),
221                p.add(1).read(),
222                p.add(2).read(),
223                p.add(3).read(),
224            )
225        };
226        if magic != SEGMENT_MAGIC {
227            return Err(PosixSlotError::InvalidHeader);
228        }
229
230        let slot_capacity = slot_total_size.saturating_sub(SLOT_HEADER_SIZE as u32);
231
232        Ok(Self {
233            shmem: Some(shmem),
234            flink: flink_path,
235            loaned: Mutex::new(alloc::vec![false; slot_count as usize]),
236            slot_count,
237            slot_total_size,
238            slot_capacity,
239        })
240    }
241
242    /// Path of the flink file (for discovery).
243    #[must_use]
244    pub fn flink_path(&self) -> &str {
245        self.flink.to_str().unwrap_or("")
246    }
247
248    /// Returns the segment path as a string for the ShmLocator.
249    /// This is what is stored in the PID_SHM_LOCATOR.
250    pub fn segment_path(&self) -> String {
251        self.flink_path().to_string()
252    }
253
254    /// OS shm id of the backing segment (e.g. for the §7.1 permission check).
255    // Currently only exercised by the Linux permission test; kept available for
256    // the production §7.1 check (unused in the non-test lib build).
257    #[cfg(target_os = "linux")]
258    #[cfg_attr(not(test), allow(dead_code))]
259    pub(crate) fn shmem_os_id(&self) -> &str {
260        self.shmem
261            .as_ref()
262            .map_or("", shared_memory::Shmem::get_os_id)
263    }
264
265    fn slot_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
266        if idx >= self.slot_count {
267            return Err(SlotError::OutOfBounds);
268        }
269        let header_size = SEGMENT_HEADER_SIZE;
270        // SAFETY: caller bound checked (idx < slot_count); the offset stays
271        // within total_size, which was guaranteed at create time.
272        let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
273        let base = shmem.as_ptr();
274        // SAFETY: idx < slot_count (checked above); the offset stays within
275        // total_size, which was guaranteed at create time (header_size +
276        // slot_count * slot_total_size).
277        unsafe { Ok(base.add(header_size + (idx as usize) * (self.slot_total_size as usize))) }
278    }
279
280    fn read_header(&self, idx: u32) -> Result<SlotHeader, SlotError> {
281        let p = self.slot_ptr(idx)?;
282        // SAFETY: p points to SLOT_HEADER_SIZE bytes (guaranteed by the
283        // slot_ptr bounds); SlotHeader is repr(C, align(4)), 16 byte.
284        let header = unsafe { core::ptr::read(p as *const SlotHeader) };
285        Ok(header)
286    }
287
288    fn write_header(&self, idx: u32, header: SlotHeader) -> Result<(), SlotError> {
289        let p = self.slot_ptr(idx)?;
290        // SAFETY: p is 4-byte-aligned (layout guarantee); 16 byte write
291        // region; SlotHeader is repr(C, align(4)).
292        unsafe {
293            core::ptr::write(p as *mut SlotHeader, header);
294        }
295        Ok(())
296    }
297
298    fn next_sn_inc(&self) -> Result<u32, SlotError> {
299        let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
300        // SAFETY: next_sn is at offset 12 in the header (4th u32). The
301        // Shmem is at least 16 byte. AtomicU32 + ptr::read:
302        // we use the AtomicU32 directly via a raw pointer.
303        let sn_ptr = unsafe { shmem.as_ptr().add(12) as *const AtomicU32 };
304        // SAFETY: sn_ptr points to a 4-byte-aligned u32 in the SHM.
305        let atomic = unsafe { &*sn_ptr };
306        Ok(atomic.fetch_add(1, Ordering::Relaxed))
307    }
308
309    fn data_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
310        let p = self.slot_ptr(idx)?;
311        // SAFETY: data follows directly after the header (offset 16).
312        Ok(unsafe { p.add(SLOT_HEADER_SIZE) })
313    }
314
315    /// `&AtomicU32` view of the notify-generation word at [`GEN_OFFSET`] in the
316    /// shared segment header. Shared across processes that map the same segment.
317    fn gen_atomic(&self) -> Option<&AtomicU32> {
318        let shmem = self.shmem.as_ref()?;
319        // SAFETY: GEN_OFFSET (0x10) is inside the header (size 0x20), 4-aligned;
320        // the segment outlives this borrow (owned by self).
321        Some(unsafe { &*(shmem.as_ptr().add(GEN_OFFSET) as *const AtomicU32) })
322    }
323
324    /// Spec §4.2: bump the shared generation and wake any cross-process waiters
325    /// (futex on Linux; no-op wake elsewhere — readers there fall back to the
326    /// caller-driven poll).
327    fn bump_notify(&self) {
328        if let Some(g) = self.gen_atomic() {
329            g.fetch_add(1, Ordering::Release);
330            #[cfg(target_os = "linux")]
331            futex::wake_all(g);
332        }
333    }
334}
335
336/// Linux cross-process futex helpers (Spec §4.2). A futex on the shared
337/// generation word lets a reader park in the kernel until the writer wakes it —
338/// no busy-poll, no UDP roundtrip. Cross-process (not `FUTEX_PRIVATE`).
339#[cfg(target_os = "linux")]
340mod futex {
341    use core::sync::atomic::AtomicU32;
342
343    pub(super) fn wake_all(word: &AtomicU32) {
344        let ptr = core::ptr::from_ref(word).cast::<u32>().cast_mut();
345        // SAFETY: ptr is a valid, aligned u32 in shared memory. FUTEX_WAKE with
346        // i32::MAX wakes all waiters; extra args are ignored for WAKE.
347        unsafe {
348            libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, i32::MAX, 0, 0, 0);
349        }
350    }
351
352    /// Parks until `*word != expected` or `timeout` elapses.
353    pub(super) fn wait(word: &AtomicU32, expected: u32, timeout: core::time::Duration) {
354        let ts = libc::timespec {
355            tv_sec: timeout.as_secs() as libc::time_t,
356            tv_nsec: libc::c_long::from(timeout.subsec_nanos().min(999_999_999) as i32),
357        };
358        let ptr = core::ptr::from_ref(word).cast::<u32>().cast_mut();
359        // SAFETY: ptr is a valid, aligned u32 in shared memory; &ts is valid for
360        // the call. FUTEX_WAIT returns immediately if *ptr != expected.
361        unsafe {
362            libc::syscall(
363                libc::SYS_futex,
364                ptr,
365                libc::FUTEX_WAIT,
366                expected as libc::c_int,
367                core::ptr::from_ref(&ts),
368                0,
369                0,
370            );
371        }
372    }
373}
374
375impl SlotBackend for PosixSlotAllocator {
376    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
377        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
378        for idx in 0..self.slot_count {
379            if loaned[idx as usize] {
380                continue;
381            }
382            let header = self.read_header(idx)?;
383            if header.sample_size == 0 || header.all_read(active_readers_mask) {
384                loaned[idx as usize] = true;
385                return Ok(SlotHandle {
386                    segment_id: 0,
387                    slot_index: idx,
388                });
389            }
390        }
391        Err(SlotError::NoFreeSlot)
392    }
393
394    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
395        if bytes.len() > self.slot_capacity as usize {
396            return Err(SlotError::SampleTooLarge {
397                sample: bytes.len(),
398                slot_capacity: self.slot_capacity as usize,
399            });
400        }
401        let sn = self.next_sn_inc()?;
402        let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
403        let header = SlotHeader::new(sn, sample_size);
404        // Data first, header last (release ordering).
405        let dp = self.data_ptr(handle.slot_index)?;
406        // SAFETY: dp is the slot data area, at least slot_capacity bytes.
407        unsafe {
408            core::ptr::copy_nonoverlapping(bytes.as_ptr(), dp, bytes.len());
409        }
410        self.write_header(handle.slot_index, header)?;
411        // Release the loan.
412        {
413            let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
414            loaned[handle.slot_index as usize] = false;
415        }
416        self.bump_notify(); // new sample → wake cross-process readers (§4.2)
417        Ok(sn)
418    }
419
420    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
421        {
422            let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
423            if (handle.slot_index as usize) >= loaned.len() {
424                return Err(SlotError::OutOfBounds);
425            }
426            loaned[handle.slot_index as usize] = false;
427        }
428        self.bump_notify();
429        Ok(())
430    }
431
432    fn slot_data_ptr(&self, handle: SlotHandle) -> Result<(*mut u8, usize), SlotError> {
433        {
434            let loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
435            let idx = handle.slot_index as usize;
436            if idx >= loaned.len() || !loaned[idx] {
437                return Err(SlotError::OutOfBounds);
438            }
439        }
440        // The slot data lives in the mmap segment at a fixed offset; the
441        // pointer is stable for the whole loan.
442        let dp = self.data_ptr(handle.slot_index)?;
443        Ok((dp, self.slot_capacity as usize))
444    }
445
446    fn commit_in_place(&self, handle: SlotHandle, len: usize) -> Result<u32, SlotError> {
447        if len > self.slot_capacity as usize {
448            return Err(SlotError::SampleTooLarge {
449                sample: len,
450                slot_capacity: self.slot_capacity as usize,
451            });
452        }
453        let sn = self.next_sn_inc()?;
454        let sample_size = u32::try_from(len).unwrap_or(u32::MAX);
455        // Data already written in place by the caller — header only (release
456        // ordering, identical to commit_slot minus the copy).
457        self.write_header(handle.slot_index, SlotHeader::new(sn, sample_size))?;
458        {
459            let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
460            loaned[handle.slot_index as usize] = false;
461        }
462        self.bump_notify();
463        Ok(sn)
464    }
465
466    fn slot_read_ptr(&self, handle: SlotHandle) -> Result<(*const u8, usize), SlotError> {
467        let header = self.read_header(handle.slot_index)?;
468        let n = (header.sample_size as usize).min(self.slot_capacity as usize);
469        let dp = self.data_ptr(handle.slot_index)?;
470        Ok((dp.cast_const(), n))
471    }
472
473    fn next_unread_slot(&self, reader_index: u8) -> Result<Option<SlotHandle>, SlotError> {
474        let bit = 1u32 << reader_index;
475        for idx in 0..self.slot_count {
476            let header = self.read_header(idx)?;
477            if header.sample_size > 0 && (header.reader_mask & bit) == 0 {
478                return Ok(Some(SlotHandle {
479                    segment_id: 0,
480                    slot_index: idx,
481                }));
482            }
483        }
484        Ok(None)
485    }
486
487    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
488        let header = self.read_header(handle.slot_index)?;
489        let n = (header.sample_size as usize).min(self.slot_capacity as usize);
490        let dp = self.data_ptr(handle.slot_index)?;
491        let mut buf = alloc::vec![0u8; n];
492        // SAFETY: dp is slot_capacity bytes; n <= slot_capacity.
493        unsafe {
494            core::ptr::copy_nonoverlapping(dp, buf.as_mut_ptr(), n);
495        }
496        Ok((header, buf))
497    }
498
499    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
500        debug_assert!(reader_index < 32);
501        // SAFETY: slot_ptr returns a pointer into the slot (bounds-
502        // checked); the header starts there. reader_mask is a u32 at
503        // offset 8 in the header.
504        let p = self.slot_ptr(handle.slot_index)?;
505        // SAFETY: p is the slot start; +8 points to the reader_mask u32.
506        let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
507        // SAFETY: mask_ptr points to a u32 in SHM, valid until Drop.
508        let atomic = unsafe { &*mask_ptr };
509        atomic.fetch_or(1u32 << reader_index, Ordering::Relaxed);
510        self.bump_notify(); // slot may have freed → wake cross-process writers
511        Ok(())
512    }
513
514    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
515        debug_assert!(reader_index < 32);
516        let bit = 1u32 << reader_index;
517        for idx in 0..self.slot_count {
518            let p = self.slot_ptr(idx)?;
519            // SAFETY: reader_mask is at offset 8 in the header
520            // (after sn:u32 + sample_size:u32). 4-byte aligned per the
521            // SlotHeader layout guarantee.
522            let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
523            // SAFETY: mask_ptr points to a u32 in SHM, valid until Drop.
524            let atomic = unsafe { &*mask_ptr };
525            atomic.fetch_or(bit, Ordering::Relaxed);
526        }
527        self.bump_notify();
528        Ok(())
529    }
530
    /// Number of slots in the segment (cached value, widened to `usize`).
    /// Always returns `Ok` in this backend.
    fn slot_count(&self) -> Result<usize, SlotError> {
        Ok(self.slot_count as usize)
    }

    /// Size in bytes of one slot including header, payload and padding.
    fn slot_total_size(&self) -> usize {
        self.slot_total_size as usize
    }

    /// Payload capacity in bytes of one slot (cached value, widened to `usize`).
    fn slot_capacity(&self) -> usize {
        self.slot_capacity as usize
    }
542
543    fn notify_generation(&self) -> u64 {
544        self.gen_atomic()
545            .map_or(0, |g| u64::from(g.load(Ordering::Acquire)))
546    }
547
548    fn wait_for_change(&self, last: u64, timeout: core::time::Duration) {
549        // Cross-process futex park on the shared generation word (Linux). On
550        // other platforms there is no portable cross-process futex; the reader
551        // there falls back to the caller-driven poll (no event-driven wait).
552        #[cfg(target_os = "linux")]
553        if let Some(g) = self.gen_atomic() {
554            futex::wait(g, last as u32, timeout);
555        }
556        #[cfg(not(target_os = "linux"))]
557        let _ = (last, timeout);
558    }
559}
560
/// Rounds `x` up to the next multiple of `n`; `n` must be a power of two
/// (checked in debug builds only). Values already aligned are returned as is.
fn align_up(x: usize, n: usize) -> usize {
    debug_assert!(n.is_power_of_two());
    let mask = n - 1;
    match x & mask {
        0 => x,
        excess => x + (n - excess),
    }
}
565
// Integration-style tests: each test creates a real shared-memory segment
// and, where needed, attaches a second mapping to it within the same process.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    /// Builds a flink path in the temp dir that is unique per process (pid)
    /// and per call (monotonic counter), so tests do not share segments.
    fn unique_flink() -> PathBuf {
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(alloc::format!("zerodds-flatdata-test-{pid}-{n}"));
        p
    }

    /// An attached consumer sees the same slot count as the owner.
    #[test]
    fn create_attach_roundtrip() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        assert_eq!(SlotBackend::slot_count(&owner).unwrap(), 4);
        assert_eq!(SlotBackend::slot_count(&consumer).unwrap(), 4);
        // Slot total size: 16 + 64 = 80 → padded to 128.
        assert_eq!(SlotBackend::slot_total_size(&owner), 128);
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn futex_notify_wakes_consumer_across_mappings() {
        // Spec §4.2: the consumer parks on a cross-process futex on the shared
        // generation word; the owner's commit wakes it. Owner (create) and
        // consumer (attach) map the SAME segment at different virtual addresses
        // but the same physical page — exactly the cross-process case.
        use alloc::sync::Arc;
        use core::time::Duration;
        let flink = unique_flink();
        let owner = Arc::new(PosixSlotAllocator::create(&flink, 4, 64).expect("create"));
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        // Writer thread: commits after a short delay so the consumer is
        // (very likely) already parked when the wake arrives.
        let w = Arc::clone(&owner);
        let h = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(50));
            let handle = w.reserve_slot(0b1).expect("reserve");
            w.commit_slot(handle, &[1, 2, 3, 4]).expect("commit"); // bumps + futex_wake
        });

        let start = std::time::Instant::now();
        let g0 = consumer.notify_generation();
        consumer.wait_for_change(g0, Duration::from_secs(5)); // futex_wait
        let woke = start.elapsed();
        // Waking well before the 5 s timeout shows the wake-up, not the
        // timeout, ended the wait.
        assert!(
            woke < Duration::from_secs(2),
            "consumer should wake on the owner's futex_wake, not spin to timeout (waited {woke:?})"
        );
        assert!(
            consumer.notify_generation() != g0,
            "generation must have advanced"
        );
        h.join().unwrap();
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn segment_is_owner_only_0600() {
        // Spec §7.1: the flink file and the /dev/shm object must be 0600
        // (owner-only), not world-readable. Linux-only (shm path is /dev/shm).
        use std::os::unix::fs::PermissionsExt;
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        // Mask to the permission bits; `mode()` also carries the file type.
        let flink_mode = std::fs::metadata(&flink)
            .expect("flink stat")
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(
            flink_mode, 0o600,
            "flink file must be 0600, was {flink_mode:o}"
        );
        let shm_path =
            std::path::Path::new("/dev/shm").join(owner.shmem_os_id().trim_start_matches('/'));
        let shm_mode = std::fs::metadata(&shm_path)
            .expect("shm stat")
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(shm_mode, 0o600, "shm object must be 0600, was {shm_mode:o}");
    }

    /// A sample committed by the owner is readable through the consumer's
    /// separate mapping.
    #[test]
    fn write_read_through_shm() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b1).expect("reserve");
        let _sn = SlotBackend::commit_slot(&owner, h, &[1, 2, 3, 4]).expect("commit");

        let (header, bytes) = SlotBackend::read_slot(&consumer, h).expect("read");
        assert_eq!(header.sample_size, 4);
        assert_eq!(bytes, vec![1, 2, 3, 4]);
    }

    /// Reader bits set through the consumer mapping are visible to the owner
    /// and make the single slot reservable again.
    #[test]
    fn mark_read_visible_to_owner() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 1, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b011).expect("reserve");
        SlotBackend::commit_slot(&owner, h, &[0xFF]).expect("commit");

        // Consumer marks reader 0 + reader 1 as read.
        SlotBackend::mark_read(&consumer, h, 0).expect("mark0");
        SlotBackend::mark_read(&consumer, h, 1).expect("mark1");

        // The owner sees reader_mask = 0b11 → the slot is free for reuse.
        let (header, _) = SlotBackend::read_slot(&owner, h).unwrap();
        assert_eq!(header.reader_mask, 0b011);

        // The owner can reserve the slot again.
        let _ = SlotBackend::reserve_slot(&owner, 0b011).expect("reuse");
    }

    /// Two consecutive commits receive strictly increasing sequence numbers.
    #[test]
    fn next_sn_increments_atomically() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");

        let h0 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn0 = SlotBackend::commit_slot(&owner, h0, &[0]).unwrap();
        let h1 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn1 = SlotBackend::commit_slot(&owner, h1, &[1]).unwrap();
        assert!(sn1 > sn0);
    }
}