Skip to main content

vox_shm/
lib.rs

1//! Shared-memory transport for vox.
2//!
3//! Implements [`Link`] over lock-free ring buffers in shared memory.
4//! Inline bipbuf payloads are copied into boxed backing; slot-ref payloads are exposed
5//! as shared zero-copy backing and freed when the backing is dropped.
6
7use std::io;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
10use std::sync::{Arc, Mutex};
11
12use shm_primitives::{BipBuf, PeerId};
13use shm_primitives_async::{Doorbell, SignalResult};
14use tracing::{debug, trace, warn};
15use vox_types::{
16    Backing, Link, LinkRx, LinkTx, LinkTxPermit, SharedBacking, TransportMode, WriteSlot,
17};
18
19use crate::framing::{DEFAULT_INLINE_THRESHOLD, MmapRef, OwnedFrame};
20use crate::mmap_registry::{
21    MmapAllocation, MmapAttachments, MmapChannelRx, MmapChannelTx, MmapRegistry,
22};
23
24pub mod bootstrap;
25pub mod framing;
26pub mod host;
27pub mod mmap_registry;
28pub mod peer_table;
29pub mod segment;
30pub mod varslot;
31
32pub use segment::{AttachError, Segment, SegmentConfig, SegmentLayout};
33pub use varslot::{SizeClassConfig, SlotRef, VarSlotPool};
34
35pub use host::create_test_link_pair;
36pub use host::{
37    AddPeerOptions, GuestSpawnTicket, HostHub, HostPeer, MultiPeerHostDriver, PreparedPeer, ShmHost,
38};
39#[cfg(windows)]
40pub use host::{guest_link_from_names, guest_link_from_ticket_windows};
41#[cfg(unix)]
42pub use host::{guest_link_from_raw, guest_link_from_ticket};
43
44pub mod driver {
45    pub use crate::host::{AddPeerOptions, MultiPeerHostDriver, ShmHost};
46}
47
/// Size in bytes of the length prefix stored at the start of every var slot.
///
/// The prefix holds the payload length as a little-endian `u32`.
const SLOT_LEN_PREFIX_SIZE: usize = std::mem::size_of::<u32>();
49
50#[derive(Clone)]
51struct Backend(Arc<Segment>);
52
53impl Backend {
54    fn allocate_slot(&self, size: u32, owner_peer: u8) -> Option<SlotRef> {
55        self.0.var_pool().allocate(size, owner_peer)
56    }
57
58    fn free_slot(&self, slot_ref: SlotRef) {
59        let _ = self.0.var_pool().free(slot_ref);
60    }
61
62    unsafe fn slot_data<'a>(&self, slot_ref: &SlotRef) -> &'a [u8] {
63        unsafe { self.0.var_pool().slot_data(slot_ref) }
64    }
65
66    unsafe fn slot_data_mut<'a>(&self, slot_ref: &SlotRef) -> &'a mut [u8] {
67        unsafe { self.0.var_pool().slot_data_mut(slot_ref) }
68    }
69
70    fn max_slot_size(&self) -> Option<u32> {
71        let pool = self.0.var_pool();
72        let class_count = pool.class_count();
73        if class_count == 0 {
74            return None;
75        }
76        let mut max_size = 0u32;
77        for class_idx in 0..class_count {
78            max_size = max_size.max(pool.slot_size(class_idx));
79        }
80        Some(max_size)
81    }
82}
83
84struct TxShared {
85    tx_bipbuf: Arc<BipBuf>,
86    backend: Backend,
87    owner_peer: u8,
88    max_payload_size: u32,
89    inline_threshold: u32,
90    max_varslot_payload_size: Option<u32>,
91    reserve_ring_bytes: u32,
92    tx_lock: Mutex<()>,
93    doorbell: Arc<Doorbell>,
94    doorbell_dead: AtomicBool,
95    stats: Arc<ShmTransportStats>,
96    mmap_registry: Mutex<MmapRegistry>,
97}
98
99/// A [`Link`] over shared memory ring buffers.
100// r[impl transport.shm]
101// r[impl zerocopy.framing.link.shm]
102pub struct ShmLink {
103    tx_shared: Arc<TxShared>,
104    rx_bipbuf: Arc<BipBuf>,
105    rx_backend: Backend,
106    tx_closed: Arc<AtomicBool>,
107    peer_closed: Arc<AtomicBool>,
108    mmap_attachments: MmapAttachments,
109}
110
/// Live transport counters, updated with relaxed atomic increments.
#[derive(Default)]
struct ShmTransportStats {
    /// Frames committed inline through the ring.
    inline_sends: AtomicU64,
    /// Frames committed as var-slot references.
    slot_ref_sends: AtomicU64,
    /// Frames committed as mmap references.
    mmap_ref_sends: AtomicU64,
    /// Inline frames received.
    inline_recvs: AtomicU64,
    /// Var-slot reference frames received.
    slot_ref_recvs: AtomicU64,
    /// Mmap reference frames received.
    mmap_ref_recvs: AtomicU64,
    /// Allocations that failed because the pool returned no var slot.
    varslot_exhausted: AtomicU64,
    /// Reserve attempts that found the ring unable to grant enough bytes.
    ring_exhausted: AtomicU64,
    /// Times `reserve` had to wait on the doorbell.
    reserve_waits: AtomicU64,
    /// Failed ring writes during commit that were retried.
    commit_retries: AtomicU64,
    /// Doorbell signals that reported the peer as dead.
    doorbell_peer_dead: AtomicU64,
    /// Doorbell waits that returned an error.
    doorbell_wait_errors: AtomicU64,
}
126
/// Plain-data copy of the transport counters at one point in time.
///
/// Field names match the live counters one-to-one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ShmTransportStatsSnapshot {
    /// Frames committed inline through the ring.
    pub inline_sends: u64,
    /// Frames committed as var-slot references.
    pub slot_ref_sends: u64,
    /// Frames committed as mmap references.
    pub mmap_ref_sends: u64,
    /// Inline frames received.
    pub inline_recvs: u64,
    /// Var-slot reference frames received.
    pub slot_ref_recvs: u64,
    /// Mmap reference frames received.
    pub mmap_ref_recvs: u64,
    /// Allocations that failed because no var slot was available.
    pub varslot_exhausted: u64,
    /// Reserve attempts that found the ring unable to grant enough bytes.
    pub ring_exhausted: u64,
    /// Times a reserve had to wait on the doorbell.
    pub reserve_waits: u64,
    /// Failed ring writes during commit that were retried.
    pub commit_retries: u64,
    /// Doorbell signals that reported the peer as dead.
    pub doorbell_peer_dead: u64,
    /// Doorbell waits that returned an error.
    pub doorbell_wait_errors: u64,
}
142
143impl ShmTransportStats {
144    fn snapshot(&self) -> ShmTransportStatsSnapshot {
145        ShmTransportStatsSnapshot {
146            inline_sends: self.inline_sends.load(AtomicOrdering::Relaxed),
147            slot_ref_sends: self.slot_ref_sends.load(AtomicOrdering::Relaxed),
148            mmap_ref_sends: self.mmap_ref_sends.load(AtomicOrdering::Relaxed),
149            inline_recvs: self.inline_recvs.load(AtomicOrdering::Relaxed),
150            slot_ref_recvs: self.slot_ref_recvs.load(AtomicOrdering::Relaxed),
151            mmap_ref_recvs: self.mmap_ref_recvs.load(AtomicOrdering::Relaxed),
152            varslot_exhausted: self.varslot_exhausted.load(AtomicOrdering::Relaxed),
153            ring_exhausted: self.ring_exhausted.load(AtomicOrdering::Relaxed),
154            reserve_waits: self.reserve_waits.load(AtomicOrdering::Relaxed),
155            commit_retries: self.commit_retries.load(AtomicOrdering::Relaxed),
156            doorbell_peer_dead: self.doorbell_peer_dead.load(AtomicOrdering::Relaxed),
157            doorbell_wait_errors: self.doorbell_wait_errors.load(AtomicOrdering::Relaxed),
158        }
159    }
160}
161
162impl ShmLink {
163    fn normalize_threshold(threshold: u32) -> u32 {
164        if threshold == 0 {
165            DEFAULT_INLINE_THRESHOLD
166        } else {
167            threshold
168        }
169    }
170
171    #[allow(clippy::too_many_arguments)]
172    fn from_parts(
173        tx_bipbuf: Arc<BipBuf>,
174        rx_bipbuf: Arc<BipBuf>,
175        backend: Backend,
176        doorbell: Arc<Doorbell>,
177        owner_peer: u8,
178        max_payload_size: u32,
179        inline_threshold: u32,
180        tx_closed: Arc<AtomicBool>,
181        peer_closed: Arc<AtomicBool>,
182        mmap_channel_tx: MmapChannelTx,
183        mmap_channel_rx: MmapChannelRx,
184    ) -> Self {
185        let ring_capacity = tx_bipbuf.capacity();
186        let ring_contiguous_ceiling = ring_capacity.saturating_sub(1);
187        let inline_ceiling = max_payload_size
188            .min(Self::normalize_threshold(inline_threshold))
189            .min(ring_contiguous_ceiling.saturating_sub(framing::FRAME_HEADER_SIZE as u32));
190        let reserve_inline_bytes = ((framing::FRAME_HEADER_SIZE as u32 + inline_ceiling) + 3) & !3;
191        let reserve_ring_bytes = reserve_inline_bytes
192            .max(framing::SLOT_REF_ENTRY_SIZE)
193            .max(framing::MMAP_REF_ENTRY_SIZE)
194            .min(ring_contiguous_ceiling);
195        let max_varslot_payload_size = backend
196            .max_slot_size()
197            .and_then(|slot_size| slot_size.checked_sub(SLOT_LEN_PREFIX_SIZE as u32));
198
199        let default_mmap_region_size = 1024 * 1024; // 1 MiB default
200        let mmap_registry = MmapRegistry::new(mmap_channel_tx, default_mmap_region_size);
201
202        let stats = Arc::new(ShmTransportStats::default());
203        let tx_shared = Arc::new(TxShared {
204            tx_bipbuf,
205            backend: backend.clone(),
206            owner_peer,
207            max_payload_size,
208            inline_threshold: Self::normalize_threshold(inline_threshold),
209            max_varslot_payload_size,
210            reserve_ring_bytes,
211            tx_lock: Mutex::new(()),
212            doorbell,
213            doorbell_dead: AtomicBool::new(false),
214            stats: stats.clone(),
215            mmap_registry: Mutex::new(mmap_registry),
216        });
217
218        let mmap_attachments = MmapAttachments::new(mmap_channel_rx);
219
220        Self {
221            tx_shared,
222            rx_bipbuf,
223            rx_backend: backend,
224            tx_closed,
225            peer_closed,
226            mmap_attachments,
227        }
228    }
229
230    /// Build a guest-side SHM link from a shared segment.
231    pub fn for_guest(
232        segment: Arc<Segment>,
233        peer_id: PeerId,
234        doorbell: Doorbell,
235        mmap_channel_tx: MmapChannelTx,
236        mmap_channel_rx: MmapChannelRx,
237    ) -> Self {
238        let tx_bipbuf = Arc::new(segment.g2h_bipbuf(peer_id));
239        let rx_bipbuf = Arc::new(segment.h2g_bipbuf(peer_id));
240        let backend = Backend(segment.clone());
241
242        Self::from_parts(
243            tx_bipbuf,
244            rx_bipbuf,
245            backend,
246            Arc::new(doorbell),
247            peer_id.get(),
248            segment.header().max_payload_size,
249            segment.header().inline_threshold,
250            Arc::new(AtomicBool::new(false)),
251            Arc::new(AtomicBool::new(false)),
252            mmap_channel_tx,
253            mmap_channel_rx,
254        )
255    }
256
257    /// Build a host-side SHM link for one peer from a shared segment.
258    pub fn for_host(
259        segment: Arc<Segment>,
260        peer_id: PeerId,
261        doorbell: Doorbell,
262        mmap_channel_tx: MmapChannelTx,
263        mmap_channel_rx: MmapChannelRx,
264    ) -> Self {
265        let tx_bipbuf = Arc::new(segment.h2g_bipbuf(peer_id));
266        let rx_bipbuf = Arc::new(segment.g2h_bipbuf(peer_id));
267        let backend = Backend(segment.clone());
268
269        Self::from_parts(
270            tx_bipbuf,
271            rx_bipbuf,
272            backend,
273            Arc::new(doorbell),
274            0,
275            segment.header().max_payload_size,
276            segment.header().inline_threshold,
277            Arc::new(AtomicBool::new(false)),
278            Arc::new(AtomicBool::new(false)),
279            mmap_channel_tx,
280            mmap_channel_rx,
281        )
282    }
283
284    /// Accept the doorbell connection (Windows only).
285    ///
286    /// On Windows, the named pipe server must call `ConnectNamedPipe` before
287    /// data can flow.  Call this on the **host** link after the guest has
288    /// connected (i.e., after `Doorbell::from_handle` on the guest side).
289    ///
290    /// No-op on Unix.
291    #[cfg(windows)]
292    pub async fn accept_doorbell(&self) -> std::io::Result<()> {
293        self.tx_shared.doorbell.accept().await
294    }
295}
296
297/// Sending half of a [`ShmLink`].
298pub struct ShmLinkTx {
299    shared: Arc<TxShared>,
300    tx_closed: Arc<AtomicBool>,
301}
302
303/// Receiving half of a [`ShmLink`].
304pub struct ShmLinkRx {
305    rx_bipbuf: Arc<BipBuf>,
306    backend: Backend,
307    peer_closed: Arc<AtomicBool>,
308    doorbell: Arc<Doorbell>,
309    stats: Arc<ShmTransportStats>,
310    mmap_attachments: MmapAttachments,
311}
312
313pub struct ShmTxPermit {
314    shared: Arc<TxShared>,
315    tx_closed: Arc<AtomicBool>,
316}
317
318enum ShmWriteSlotInner {
319    Inline {
320        bytes: Vec<u8>,
321    },
322    VarSlot {
323        slot_ref: Option<SlotRef>,
324        payload_len: usize,
325    },
326    MmapRef {
327        alloc: Option<MmapAllocation>,
328        payload_len: usize,
329    },
330}
331
332pub struct ShmWriteSlot {
333    shared: Arc<TxShared>,
334    inner: ShmWriteSlotInner,
335}
336
337impl Drop for ShmWriteSlot {
338    fn drop(&mut self) {
339        match &mut self.inner {
340            ShmWriteSlotInner::VarSlot { slot_ref, .. } => {
341                if let Some(slot_ref) = slot_ref.take() {
342                    self.shared.backend.free_slot(slot_ref);
343                    if matches!(self.shared.doorbell.signal_now(), SignalResult::PeerDead) {
344                        self.shared.doorbell_dead.store(true, Ordering::Release);
345                        self.shared
346                            .stats
347                            .doorbell_peer_dead
348                            .fetch_add(1, AtomicOrdering::Relaxed);
349                    }
350                }
351            }
352            ShmWriteSlotInner::MmapRef { alloc, .. } => {
353                // r[impl shm.mmap.release]
354                if let Some(alloc) = alloc.take() {
355                    alloc.lease_counter.fetch_sub(1, Ordering::Release);
356                }
357            }
358            ShmWriteSlotInner::Inline { .. } => {}
359        }
360    }
361}
362
363impl Link for ShmLink {
364    type Tx = ShmLinkTx;
365    type Rx = ShmLinkRx;
366
367    fn split(self) -> (Self::Tx, Self::Rx) {
368        let tx_shared = self.tx_shared;
369        let doorbell = tx_shared.doorbell.clone();
370        let stats = tx_shared.stats.clone();
371        (
372            ShmLinkTx {
373                shared: tx_shared,
374                tx_closed: self.tx_closed,
375            },
376            ShmLinkRx {
377                rx_bipbuf: self.rx_bipbuf,
378                backend: self.rx_backend,
379                peer_closed: self.peer_closed,
380                doorbell,
381                stats,
382                mmap_attachments: self.mmap_attachments,
383            },
384        )
385    }
386
387    fn supports_transport_mode(mode: TransportMode) -> bool
388    where
389        Self: Sized,
390    {
391        matches!(mode, TransportMode::Bare)
392    }
393}
394
395impl LinkTx for ShmLinkTx {
396    type Permit = ShmTxPermit;
397
398    async fn reserve(&self) -> io::Result<Self::Permit> {
399        loop {
400            if self.tx_closed.load(Ordering::Acquire) {
401                return Err(io::Error::new(
402                    io::ErrorKind::BrokenPipe,
403                    "shm tx is closed",
404                ));
405            }
406            if self.shared.doorbell_dead.load(Ordering::Acquire) {
407                return Err(io::Error::new(
408                    io::ErrorKind::BrokenPipe,
409                    "shm doorbell peer is closed",
410                ));
411            }
412            if self
413                .shared
414                .tx_bipbuf
415                .inner()
416                .can_grant(self.shared.reserve_ring_bytes)
417            {
418                return Ok(ShmTxPermit {
419                    shared: self.shared.clone(),
420                    tx_closed: self.tx_closed.clone(),
421                });
422            }
423
424            self.shared
425                .stats
426                .reserve_waits
427                .fetch_add(1, AtomicOrdering::Relaxed);
428            self.shared
429                .stats
430                .ring_exhausted
431                .fetch_add(1, AtomicOrdering::Relaxed);
432            if let Err(err) = self.shared.doorbell.wait().await {
433                self.shared
434                    .stats
435                    .doorbell_wait_errors
436                    .fetch_add(1, AtomicOrdering::Relaxed);
437                warn!(
438                    error = %err,
439                    raw_os_error = ?err.raw_os_error(),
440                    "shm tx doorbell wait failed"
441                );
442                return Err(io::Error::new(
443                    io::ErrorKind::BrokenPipe,
444                    format!("shm doorbell wait failed: {err}"),
445                ));
446            }
447        }
448    }
449
450    async fn close(self) -> io::Result<()> {
451        self.tx_closed.store(true, Ordering::Release);
452        match self.shared.doorbell.signal_now() {
453            SignalResult::PeerDead => {
454                self.shared.doorbell_dead.store(true, Ordering::Release);
455                self.shared
456                    .stats
457                    .doorbell_peer_dead
458                    .fetch_add(1, AtomicOrdering::Relaxed);
459                Err(io::Error::new(
460                    io::ErrorKind::BrokenPipe,
461                    "shm doorbell peer is closed",
462                ))
463            }
464            _ => Ok(()),
465        }
466    }
467}
468
469// r[impl zerocopy.send.shm]
470impl LinkTxPermit for ShmTxPermit {
471    type Slot = ShmWriteSlot;
472
473    fn alloc(self, len: usize) -> io::Result<Self::Slot> {
474        if self.tx_closed.load(Ordering::Acquire) {
475            return Err(io::Error::new(
476                io::ErrorKind::BrokenPipe,
477                "shm tx is closed",
478            ));
479        }
480        if len > self.shared.max_payload_size as usize {
481            return Err(io::Error::new(
482                io::ErrorKind::InvalidInput,
483                "payload exceeds max_payload_size",
484            ));
485        }
486
487        if len as u32 <= self.shared.inline_threshold {
488            return Ok(ShmWriteSlot {
489                shared: self.shared.clone(),
490                inner: ShmWriteSlotInner::Inline {
491                    bytes: vec![0; len],
492                },
493            });
494        }
495        let needed = len.checked_add(SLOT_LEN_PREFIX_SIZE).ok_or_else(|| {
496            io::Error::new(
497                io::ErrorKind::InvalidInput,
498                "payload length overflow while allocating slot-ref",
499            )
500        })?;
501        let needed_u32 = u32::try_from(needed).map_err(|_| {
502            io::Error::new(
503                io::ErrorKind::InvalidInput,
504                "payload length exceeds varslot addressing range",
505            )
506        })?;
507        let max_varslot_payload = self.shared.max_varslot_payload_size.ok_or_else(|| {
508            io::Error::new(
509                io::ErrorKind::Unsupported,
510                "payload exceeds inline threshold but no varslot class is configured",
511            )
512        })?;
513        if len as u32 > max_varslot_payload {
514            // r[impl shm.mmap.ordering]
515            // Payload exceeds varslot — use mmap-ref path.
516            // Step 1: alloc delivers fd to peer (ordering: registry visible)
517            let mut registry = self
518                .shared
519                .mmap_registry
520                .lock()
521                .expect("mmap registry poisoned");
522            let alloc = registry
523                .alloc(len)
524                .map_err(|e| io::Error::other(format!("mmap alloc failed: {e}")))?;
525            return Ok(ShmWriteSlot {
526                shared: self.shared.clone(),
527                inner: ShmWriteSlotInner::MmapRef {
528                    alloc: Some(alloc),
529                    payload_len: len,
530                },
531            });
532        }
533        let slot_ref = self
534            .shared
535            .backend
536            .allocate_slot(needed_u32, self.shared.owner_peer)
537            .ok_or_else(|| {
538                self.shared
539                    .stats
540                    .varslot_exhausted
541                    .fetch_add(1, AtomicOrdering::Relaxed);
542                if matches!(self.shared.doorbell.signal_now(), SignalResult::PeerDead) {
543                    self.shared.doorbell_dead.store(true, Ordering::Release);
544                    self.shared
545                        .stats
546                        .doorbell_peer_dead
547                        .fetch_add(1, AtomicOrdering::Relaxed);
548                }
549                io::Error::new(
550                    io::ErrorKind::WouldBlock,
551                    "varslot exhausted; retry on next reserve/send cycle",
552                )
553            })?;
554
555        Ok(ShmWriteSlot {
556            shared: self.shared.clone(),
557            inner: ShmWriteSlotInner::VarSlot {
558                slot_ref: Some(slot_ref),
559                payload_len: len,
560            },
561        })
562    }
563}
564
565impl WriteSlot for ShmWriteSlot {
566    fn as_mut_slice(&mut self) -> &mut [u8] {
567        match &mut self.inner {
568            ShmWriteSlotInner::Inline { bytes } => bytes.as_mut_slice(),
569            ShmWriteSlotInner::VarSlot {
570                slot_ref,
571                payload_len,
572            } => {
573                let slot_ref = slot_ref
574                    .as_ref()
575                    .expect("slot must be present while write slot is alive");
576                let end = SLOT_LEN_PREFIX_SIZE + *payload_len;
577                let data = unsafe { self.shared.backend.slot_data_mut(slot_ref) };
578                &mut data[SLOT_LEN_PREFIX_SIZE..end]
579            }
580            ShmWriteSlotInner::MmapRef { alloc, payload_len } => {
581                let alloc = alloc
582                    .as_mut()
583                    .expect("mmap alloc must be present while write slot is alive");
584                // SAFETY: We just allocated this range and no one else is reading it.
585                unsafe { alloc.payload_mut(*payload_len) }
586            }
587        }
588    }
589
590    fn commit(mut self) {
591        fn ring_doorbell(shared: &TxShared) {
592            if matches!(shared.doorbell.signal_now(), SignalResult::PeerDead) {
593                shared.doorbell_dead.store(true, Ordering::Release);
594                shared
595                    .stats
596                    .doorbell_peer_dead
597                    .fetch_add(1, AtomicOrdering::Relaxed);
598            }
599        }
600
601        match &mut self.inner {
602            ShmWriteSlotInner::Inline { bytes } => loop {
603                let lock = self.shared.tx_lock.lock().expect("tx lock poisoned");
604                let (mut producer, _) = self.shared.tx_bipbuf.split();
605                let result = framing::write_inline(&mut producer, bytes);
606                drop(lock);
607                match result {
608                    Ok(()) => {
609                        self.shared
610                            .stats
611                            .inline_sends
612                            .fetch_add(1, AtomicOrdering::Relaxed);
613                        ring_doorbell(&self.shared);
614                        return;
615                    }
616                    Err(_) => {
617                        self.shared
618                            .stats
619                            .commit_retries
620                            .fetch_add(1, AtomicOrdering::Relaxed);
621                        std::thread::yield_now();
622                    }
623                }
624            },
625            ShmWriteSlotInner::VarSlot {
626                slot_ref,
627                payload_len,
628            } => {
629                let Some(slot_ref_value) = *slot_ref else {
630                    return;
631                };
632
633                {
634                    let payload_len_bytes = (*payload_len as u32).to_le_bytes();
635                    let data = unsafe { self.shared.backend.slot_data_mut(&slot_ref_value) };
636                    data[..SLOT_LEN_PREFIX_SIZE].copy_from_slice(&payload_len_bytes);
637                }
638
639                loop {
640                    let lock = self.shared.tx_lock.lock().expect("tx lock poisoned");
641                    let (mut producer, _) = self.shared.tx_bipbuf.split();
642                    let result = framing::write_slot_ref(&mut producer, &slot_ref_value);
643                    drop(lock);
644                    match result {
645                        Ok(()) => {
646                            *slot_ref = None;
647                            self.shared
648                                .stats
649                                .slot_ref_sends
650                                .fetch_add(1, AtomicOrdering::Relaxed);
651                            ring_doorbell(&self.shared);
652                            return;
653                        }
654                        Err(_) => {
655                            self.shared
656                                .stats
657                                .commit_retries
658                                .fetch_add(1, AtomicOrdering::Relaxed);
659                            std::thread::yield_now();
660                        }
661                    }
662                }
663            }
664            // r[impl shm.mmap.ordering]
665            ShmWriteSlotInner::MmapRef { alloc, payload_len } => {
666                let Some(alloc_value) = alloc.take() else {
667                    return;
668                };
669
670                // Step 2→3: bytes are initialized (caller wrote them), issue release fence
671                std::sync::atomic::fence(Ordering::Release);
672
673                let mmap_ref = MmapRef {
674                    map_id: alloc_value.map_id,
675                    map_generation: alloc_value.map_generation,
676                    map_offset: alloc_value.map_offset,
677                    payload_len: *payload_len as u32,
678                };
679
680                loop {
681                    let lock = self.shared.tx_lock.lock().expect("tx lock poisoned");
682                    let (mut producer, _) = self.shared.tx_bipbuf.split();
683                    let result = framing::write_mmap_ref(&mut producer, &mmap_ref);
684                    drop(lock);
685                    match result {
686                        Ok(()) => {
687                            // The lease counter stays at 1 — the receiver will hold
688                            // it via ShmMmapBacking and decrement on drop.
689                            // alloc_value (no Drop impl) goes out of scope naturally.
690                            self.shared
691                                .stats
692                                .mmap_ref_sends
693                                .fetch_add(1, AtomicOrdering::Relaxed);
694                            ring_doorbell(&self.shared);
695                            return;
696                        }
697                        Err(_) => {
698                            self.shared
699                                .stats
700                                .commit_retries
701                                .fetch_add(1, AtomicOrdering::Relaxed);
702                            std::thread::yield_now();
703                        }
704                    }
705                }
706            }
707        }
708    }
709}
710
711#[derive(Debug)]
712pub enum ShmLinkRxError {
713    MmapResolve(crate::mmap_registry::MmapResolveError),
714    DoorbellWait(io::Error),
715    MalformedSlotRefLength {
716        slot_bytes: usize,
717        payload_len: usize,
718    },
719}
720
721impl std::fmt::Display for ShmLinkRxError {
722    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
723        match self {
724            ShmLinkRxError::MmapResolve(err) => write!(f, "mmap resolve failed: {err}"),
725            ShmLinkRxError::DoorbellWait(err) => {
726                write!(
727                    f,
728                    "doorbell wait failed: {} (raw_os_error={:?})",
729                    err,
730                    err.raw_os_error()
731                )
732            }
733            ShmLinkRxError::MalformedSlotRefLength {
734                slot_bytes,
735                payload_len,
736            } => write!(
737                f,
738                "malformed slot-ref payload length: payload_len={payload_len}, slot_bytes={slot_bytes}"
739            ),
740        }
741    }
742}
743
744impl std::error::Error for ShmLinkRxError {}
745
746impl LinkRx for ShmLinkRx {
747    type Error = ShmLinkRxError;
748
749    async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
750        loop {
751            let (_, mut consumer) = self.rx_bipbuf.split();
752            if let Some(frame) = framing::read_frame(&mut consumer) {
753                return match frame {
754                    // r[impl zerocopy.recv.shm.inline]
755                    // r[impl zerocopy.backing.bipbuf]
756                    OwnedFrame::Inline(bytes) => {
757                        trace!(len = bytes.len(), "shm rx received inline frame");
758                        self.stats
759                            .inline_recvs
760                            .fetch_add(1, AtomicOrdering::Relaxed);
761                        if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
762                            let was_closed = self.peer_closed.swap(true, Ordering::AcqRel);
763                            self.stats
764                                .doorbell_peer_dead
765                                .fetch_add(1, AtomicOrdering::Relaxed);
766                            if !was_closed {
767                                debug!("shm rx observed peer dead while draining inline frame");
768                            }
769                        }
770                        Ok(Some(Backing::Boxed(bytes.into_boxed_slice())))
771                    }
772                    // r[impl zerocopy.recv.shm.slotref]
773                    OwnedFrame::SlotRef(slot_ref) => {
774                        trace!(slot_ref = ?slot_ref, "shm rx received slot-ref frame");
775                        self.stats
776                            .slot_ref_recvs
777                            .fetch_add(1, AtomicOrdering::Relaxed);
778                        if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
779                            let was_closed = self.peer_closed.swap(true, Ordering::AcqRel);
780                            self.stats
781                                .doorbell_peer_dead
782                                .fetch_add(1, AtomicOrdering::Relaxed);
783                            if !was_closed {
784                                debug!("shm rx observed peer dead while draining slot-ref frame");
785                            }
786                        }
787                        let slot = unsafe { self.backend.slot_data(&slot_ref) };
788                        if slot.len() < SLOT_LEN_PREFIX_SIZE {
789                            self.backend.free_slot(slot_ref);
790                            warn!(
791                                slot_ref = ?slot_ref,
792                                slot_bytes = slot.len(),
793                                "shm rx malformed slot-ref: missing payload length prefix"
794                            );
795                            return Err(ShmLinkRxError::MalformedSlotRefLength {
796                                slot_bytes: slot.len(),
797                                payload_len: 0,
798                            });
799                        }
800
801                        let payload_len =
802                            u32::from_le_bytes([slot[0], slot[1], slot[2], slot[3]]) as usize;
803                        if payload_len > slot.len().saturating_sub(SLOT_LEN_PREFIX_SIZE) {
804                            self.backend.free_slot(slot_ref);
805                            warn!(
806                                slot_ref = ?slot_ref,
807                                slot_bytes = slot.len(),
808                                payload_len,
809                                "shm rx malformed slot-ref: payload length exceeds slot size"
810                            );
811                            return Err(ShmLinkRxError::MalformedSlotRefLength {
812                                slot_bytes: slot.len(),
813                                payload_len,
814                            });
815                        }
816                        trace!(
817                            slot_ref = ?slot_ref,
818                            slot_bytes = slot.len(),
819                            payload_len,
820                            "shm rx slot-ref frame decoded"
821                        );
822
823                        Ok(Some(Backing::shared(Arc::new(ShmVarSlotBacking {
824                            backend: self.backend.clone(),
825                            slot_ref,
826                            payload_len,
827                            doorbell: self.doorbell.clone(),
828                            peer_closed: self.peer_closed.clone(),
829                            stats: self.stats.clone(),
830                        }))))
831                    }
832                    // r[impl zerocopy.recv.shm.mmap]
833                    OwnedFrame::MmapRef(mmap_ref) => {
834                        trace!(
835                            map_id = mmap_ref.map_id,
836                            map_generation = mmap_ref.map_generation,
837                            map_offset = mmap_ref.map_offset,
838                            payload_len = mmap_ref.payload_len,
839                            "shm rx received mmap-ref frame"
840                        );
841                        self.stats
842                            .mmap_ref_recvs
843                            .fetch_add(1, AtomicOrdering::Relaxed);
844                        if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
845                            let was_closed = self.peer_closed.swap(true, Ordering::AcqRel);
846                            self.stats
847                                .doorbell_peer_dead
848                                .fetch_add(1, AtomicOrdering::Relaxed);
849                            if !was_closed {
850                                debug!("shm rx observed peer dead while draining mmap-ref frame");
851                            }
852                        }
853
854                        let mapping = match self.mmap_attachments.resolve_with_grace(
855                            mmap_ref.map_id,
856                            mmap_ref.map_generation,
857                            mmap_ref.map_offset,
858                            mmap_ref.payload_len,
859                        ) {
860                            Ok(mapping) => mapping,
861                            Err(error) => {
862                                warn!(
863                                    map_id = mmap_ref.map_id,
864                                    map_generation = mmap_ref.map_generation,
865                                    map_offset = mmap_ref.map_offset,
866                                    payload_len = mmap_ref.payload_len,
867                                    error = %error,
868                                    "shm rx mmap-ref resolve failed"
869                                );
870                                // r[impl shm.mmap.attach.protocol-error]
871                                return Err(ShmLinkRxError::MmapResolve(error));
872                            }
873                        };
874
875                        // r[impl zerocopy.backing.mmap]
876                        trace!(
877                            map_id = mmap_ref.map_id,
878                            map_generation = mmap_ref.map_generation,
879                            map_offset = mmap_ref.map_offset,
880                            payload_len = mmap_ref.payload_len,
881                            "shm rx mmap-ref resolved"
882                        );
883                        Ok(Some(Backing::shared(Arc::new(ShmMmapBacking {
884                            mapping,
885                            offset: mmap_ref.map_offset as usize,
886                            len: mmap_ref.payload_len as usize,
887                        }))))
888                    }
889                };
890            }
891
892            if self.peer_closed.load(Ordering::Acquire) && self.rx_bipbuf.inner().is_empty() {
893                debug!("shm rx returning EOF: peer closed and rx bipbuf is empty");
894                return Ok(None);
895            }
896
897            trace!("shm rx waiting on doorbell");
898            if let Err(err) = self.doorbell.wait().await {
899                self.stats
900                    .doorbell_wait_errors
901                    .fetch_add(1, AtomicOrdering::Relaxed);
902                warn!(
903                    error = %err,
904                    raw_os_error = ?err.raw_os_error(),
905                    "shm rx doorbell wait failed"
906                );
907                return Err(ShmLinkRxError::DoorbellWait(err));
908            }
909            trace!("shm rx woke from doorbell wait");
910        }
911    }
912}
913
914impl ShmLinkTx {
915    pub fn stats(&self) -> ShmTransportStatsSnapshot {
916        self.shared.stats.snapshot()
917    }
918}
919
920impl ShmLinkRx {
921    pub fn stats(&self) -> ShmTransportStatsSnapshot {
922        self.stats.snapshot()
923    }
924}
925
926// r[impl zerocopy.backing.varslot]
927struct ShmVarSlotBacking {
928    backend: Backend,
929    slot_ref: SlotRef,
930    payload_len: usize,
931    doorbell: Arc<Doorbell>,
932    peer_closed: Arc<AtomicBool>,
933    stats: Arc<ShmTransportStats>,
934}
935
936impl SharedBacking for ShmVarSlotBacking {
937    fn as_bytes(&self) -> &[u8] {
938        let slot = unsafe { self.backend.slot_data(&self.slot_ref) };
939        let end = SLOT_LEN_PREFIX_SIZE + self.payload_len;
940        &slot[SLOT_LEN_PREFIX_SIZE..end]
941    }
942}
943
944impl Drop for ShmVarSlotBacking {
945    fn drop(&mut self) {
946        self.backend.free_slot(self.slot_ref);
947        if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
948            self.peer_closed.store(true, Ordering::Release);
949            self.stats
950                .doorbell_peer_dead
951                .fetch_add(1, AtomicOrdering::Relaxed);
952        }
953    }
954}
955
956// r[impl zerocopy.backing.mmap]
957struct ShmMmapBacking {
958    mapping: Arc<mmap_registry::AttachedMapping>,
959    offset: usize,
960    len: usize,
961}
962
963impl SharedBacking for ShmMmapBacking {
964    fn as_bytes(&self) -> &[u8] {
965        let region = self.mapping.region.region();
966        let ptr = region.as_ptr();
967        // SAFETY: offset+len was bounds-checked during resolve
968        unsafe { std::slice::from_raw_parts(ptr.add(self.offset), self.len) }
969    }
970}
971
972#[cfg(test)]
973mod tests {
974    use std::sync::Arc;
975    use std::time::Duration;
976
977    use shm_primitives::FileCleanup;
978    use tokio::time::timeout;
979    use vox_types::{LinkRx as _, LinkTx as _, LinkTxPermit as _};
980
981    use super::*;
982    use crate::host::create_test_link_pair;
983    use crate::segment::SegmentConfig;
984
985    /// Create a real segment-backed test link pair.
986    async fn make_test_pair(
987        bipbuf_capacity: u32,
988        max_payload_size: u32,
989        inline_threshold: u32,
990        size_classes: &[SizeClassConfig],
991    ) -> (ShmLink, ShmLink, tempfile::TempDir) {
992        let dir = tempfile::tempdir().expect("tempdir");
993        let path = dir.path().join("test.shm");
994        let segment = Arc::new(
995            Segment::create(
996                &path,
997                SegmentConfig {
998                    max_guests: 1,
999                    bipbuf_capacity,
1000                    max_payload_size,
1001                    inline_threshold,
1002                    heartbeat_interval: 0,
1003                    size_classes,
1004                },
1005                FileCleanup::Manual,
1006            )
1007            .expect("create segment"),
1008        );
1009        let (a, b) = create_test_link_pair(segment)
1010            .await
1011            .expect("create_test_link_pair");
1012        (a, b, dir)
1013    }
1014
1015    const CLASSES: &[SizeClassConfig] = &[SizeClassConfig {
1016        slot_size: 256,
1017        slot_count: 1,
1018    }];
1019
1020    #[tokio::test]
1021    async fn inline_payload_roundtrip_is_boxed() {
1022        let (a, b, _dir) = make_test_pair(4096, 1024, 128, CLASSES).await;
1023        let (a_tx, _a_rx) = a.split();
1024        let (_b_tx, mut b_rx) = b.split();
1025
1026        let payload = b"inline hello";
1027        let permit = a_tx.reserve().await.unwrap();
1028        let mut slot = permit.alloc(payload.len()).unwrap();
1029        slot.as_mut_slice().copy_from_slice(payload);
1030        slot.commit();
1031
1032        let backing = b_rx.recv().await.unwrap().unwrap();
1033        match backing {
1034            Backing::Boxed(bytes) => assert_eq!(&*bytes, payload),
1035            Backing::Shared(_) => panic!("inline path must be boxed"),
1036        }
1037    }
1038
1039    #[tokio::test]
1040    async fn slot_ref_payload_is_zero_copy_shared_backing() {
1041        let (a, b, _dir) = make_test_pair(4096, 1024, 64, CLASSES).await;
1042        let (a_tx, _a_rx) = a.split();
1043        let (_b_tx, mut b_rx) = b.split();
1044
1045        let payload = vec![7_u8; 200];
1046        let permit = a_tx.reserve().await.unwrap();
1047        let mut slot = permit.alloc(payload.len()).unwrap();
1048        slot.as_mut_slice().copy_from_slice(&payload);
1049        slot.commit();
1050
1051        let backing = b_rx.recv().await.unwrap().unwrap();
1052        match backing {
1053            Backing::Shared(shared) => assert_eq!(shared.as_bytes(), payload.as_slice()),
1054            Backing::Boxed(_) => panic!("slot-ref path must be shared"),
1055        }
1056    }
1057
1058    #[tokio::test]
1059    async fn shared_backing_drop_releases_slot() {
1060        let (a, b, _dir) = make_test_pair(4096, 1024, 64, CLASSES).await;
1061        let (a_tx, _a_rx) = a.split();
1062        let (_b_tx, mut b_rx) = b.split();
1063
1064        let payload = vec![1_u8; 200];
1065        let permit = a_tx.reserve().await.unwrap();
1066        let mut slot = permit.alloc(payload.len()).unwrap();
1067        slot.as_mut_slice().copy_from_slice(&payload);
1068        slot.commit();
1069
1070        let backing = b_rx.recv().await.unwrap().unwrap();
1071
1072        // single-slot pool: reserve stays size-agnostic and alloc fails immediately.
1073        let permit2 = a_tx.reserve().await.unwrap();
1074        match permit2.alloc(payload.len()) {
1075            Ok(_) => panic!("alloc should fail while slot is still held by shared backing"),
1076            Err(err) => assert_eq!(err.kind(), io::ErrorKind::WouldBlock),
1077        }
1078
1079        drop(backing);
1080
1081        let permit3 = a_tx.reserve().await.unwrap();
1082        let _slot3 = permit3
1083            .alloc(payload.len())
1084            .expect("slot must be released after drop");
1085    }
1086
1087    #[tokio::test]
1088    async fn mixed_payload_stress_roundtrip() {
1089        let classes = [SizeClassConfig {
1090            slot_size: 4096,
1091            slot_count: 32,
1092        }];
1093        let (a, b, _dir) = make_test_pair(1 << 16, 1 << 20, 256, &classes).await;
1094        let (a_tx, _a_rx) = a.split();
1095        let (_b_tx, mut b_rx) = b.split();
1096
1097        for i in 0..400 {
1098            let len = if i % 3 == 0 { 48 } else { 1500 };
1099            let payload = vec![(i % 239) as u8; len];
1100            let permit = a_tx.reserve().await.unwrap();
1101            let mut slot = permit.alloc(payload.len()).unwrap();
1102            slot.as_mut_slice().copy_from_slice(&payload);
1103            slot.commit();
1104
1105            let backing = b_rx.recv().await.unwrap().unwrap();
1106            assert_eq!(backing.as_bytes(), payload.as_slice());
1107        }
1108    }
1109
1110    #[tokio::test]
1111    async fn reserve_waits_until_rx_frees_ring_space() {
1112        let (a, b, _dir) = make_test_pair(32, 1024, 256, CLASSES).await;
1113        let (a_tx, _a_rx) = a.split();
1114        let (_b_tx, mut b_rx) = b.split();
1115
1116        let payload = vec![9_u8; 24]; // align4(8 + 24) = 32 (fills ring)
1117        let permit = a_tx.reserve().await.unwrap();
1118        let mut slot = permit.alloc(payload.len()).unwrap();
1119        slot.as_mut_slice().copy_from_slice(&payload);
1120        slot.commit();
1121
1122        let reserve_fut = a_tx.reserve();
1123        tokio::pin!(reserve_fut);
1124        assert!(
1125            timeout(Duration::from_millis(20), &mut reserve_fut)
1126                .await
1127                .is_err(),
1128            "reserve should wait while ring is full"
1129        );
1130
1131        let _ = b_rx.recv().await.unwrap().unwrap();
1132        let permit2 = timeout(Duration::from_secs(1), &mut reserve_fut)
1133            .await
1134            .expect("reserve should wake after recv")
1135            .expect("reserve should succeed");
1136        drop(permit2);
1137    }
1138
1139    #[tokio::test]
1140    async fn transport_stats_track_send_recv_and_exhaustion() {
1141        let (a, b, _dir) = make_test_pair(4096, 1024, 64, CLASSES).await;
1142        let (a_tx, _a_rx) = a.split();
1143        let (_b_tx, mut b_rx) = b.split();
1144
1145        let inline = b"hello";
1146        let permit = a_tx.reserve().await.unwrap();
1147        let mut slot = permit.alloc(inline.len()).unwrap();
1148        slot.as_mut_slice().copy_from_slice(inline);
1149        slot.commit();
1150
1151        let large = vec![1_u8; 200];
1152        let permit = a_tx.reserve().await.unwrap();
1153        let mut slot = permit.alloc(large.len()).unwrap();
1154        slot.as_mut_slice().copy_from_slice(&large);
1155        slot.commit();
1156
1157        let backing1 = b_rx.recv().await.unwrap().unwrap();
1158        assert!(backing1.as_bytes().starts_with(inline));
1159        let backing2 = b_rx.recv().await.unwrap().unwrap();
1160        assert_eq!(backing2.as_bytes(), large.as_slice());
1161
1162        let permit = a_tx.reserve().await.unwrap();
1163        match permit.alloc(large.len()) {
1164            Ok(_) => panic!("alloc should fail while varslot is exhausted"),
1165            Err(err) => assert_eq!(err.kind(), io::ErrorKind::WouldBlock),
1166        }
1167
1168        drop(backing2); // free slot for future sends
1169        let permit = a_tx.reserve().await.unwrap();
1170        let _slot = permit.alloc(large.len()).expect("slot should be available");
1171
1172        let tx_stats = a_tx.stats();
1173        let rx_stats = b_rx.stats();
1174
1175        assert_eq!(tx_stats.inline_sends, 1);
1176        assert_eq!(tx_stats.slot_ref_sends, 1);
1177        assert!(tx_stats.varslot_exhausted >= 1);
1178        assert_eq!(rx_stats.inline_recvs, 1);
1179        assert_eq!(rx_stats.slot_ref_recvs, 1);
1180    }
1181
1182    // Small varslot class to force mmap path for payloads > 60 bytes
1183    const SMALL_CLASSES: &[SizeClassConfig] = &[SizeClassConfig {
1184        slot_size: 64,
1185        slot_count: 2,
1186    }];
1187
1188    #[tokio::test]
1189    async fn mmap_large_payload_roundtrip() {
1190        let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1191        let (a_tx, _a_rx) = a.split();
1192        let (_b_tx, mut b_rx) = b.split();
1193
1194        // 200 bytes exceeds max varslot payload (64 - 4 = 60 bytes) → mmap path
1195        let payload = vec![0xAB_u8; 200];
1196        let permit = a_tx.reserve().await.unwrap();
1197        let mut slot = permit.alloc(payload.len()).unwrap();
1198        slot.as_mut_slice().copy_from_slice(&payload);
1199        slot.commit();
1200
1201        let backing = b_rx.recv().await.unwrap().unwrap();
1202        match &backing {
1203            Backing::Shared(shared) => assert_eq!(shared.as_bytes(), payload.as_slice()),
1204            Backing::Boxed(_) => panic!("mmap path must be shared"),
1205        }
1206
1207        let tx_stats = a_tx.stats();
1208        let rx_stats = b_rx.stats();
1209        assert_eq!(tx_stats.mmap_ref_sends, 1);
1210        assert_eq!(rx_stats.mmap_ref_recvs, 1);
1211    }
1212
1213    #[tokio::test]
1214    async fn mmap_multiple_payloads_share_region() {
1215        let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1216        let (a_tx, _a_rx) = a.split();
1217        let (_b_tx, mut b_rx) = b.split();
1218
1219        // Send two mmap payloads — they should share the same region
1220        for i in 0u8..3 {
1221            let payload = vec![i; 200];
1222            let permit = a_tx.reserve().await.unwrap();
1223            let mut slot = permit.alloc(payload.len()).unwrap();
1224            slot.as_mut_slice().copy_from_slice(&payload);
1225            slot.commit();
1226        }
1227
1228        for i in 0u8..3 {
1229            let backing = b_rx.recv().await.unwrap().unwrap();
1230            assert_eq!(backing.as_bytes(), &vec![i; 200]);
1231        }
1232
1233        assert_eq!(a_tx.stats().mmap_ref_sends, 3);
1234        assert_eq!(b_rx.stats().mmap_ref_recvs, 3);
1235    }
1236
1237    #[tokio::test]
1238    async fn mmap_mixed_with_inline_and_varslot() {
1239        let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1240        let (a_tx, _a_rx) = a.split();
1241        let (_b_tx, mut b_rx) = b.split();
1242
1243        // Inline (≤32 bytes)
1244        let inline_payload = b"hello";
1245        let permit = a_tx.reserve().await.unwrap();
1246        let mut slot = permit.alloc(inline_payload.len()).unwrap();
1247        slot.as_mut_slice().copy_from_slice(inline_payload);
1248        slot.commit();
1249
1250        // Varslot (33..=60 bytes)
1251        let varslot_payload = vec![0x42_u8; 50];
1252        let permit = a_tx.reserve().await.unwrap();
1253        let mut slot = permit.alloc(varslot_payload.len()).unwrap();
1254        slot.as_mut_slice().copy_from_slice(&varslot_payload);
1255        slot.commit();
1256
1257        // Mmap (>60 bytes)
1258        let mmap_payload = vec![0xFF_u8; 500];
1259        let permit = a_tx.reserve().await.unwrap();
1260        let mut slot = permit.alloc(mmap_payload.len()).unwrap();
1261        slot.as_mut_slice().copy_from_slice(&mmap_payload);
1262        slot.commit();
1263
1264        let b1 = b_rx.recv().await.unwrap().unwrap();
1265        assert!(b1.as_bytes().starts_with(inline_payload));
1266
1267        let b2 = b_rx.recv().await.unwrap().unwrap();
1268        assert_eq!(b2.as_bytes(), varslot_payload.as_slice());
1269
1270        let b3 = b_rx.recv().await.unwrap().unwrap();
1271        assert_eq!(b3.as_bytes(), mmap_payload.as_slice());
1272
1273        let stats = a_tx.stats();
1274        assert_eq!(stats.inline_sends, 1);
1275        assert_eq!(stats.slot_ref_sends, 1);
1276        assert_eq!(stats.mmap_ref_sends, 1);
1277    }
1278
1279    #[tokio::test]
1280    async fn mmap_backing_survives_rx_drop_and_peer_teardown() {
1281        let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1282        let (a_tx, _a_rx) = a.split();
1283        let (_b_tx, mut b_rx) = b.split();
1284
1285        let payload = vec![0x5A_u8; 500];
1286        let permit = a_tx.reserve().await.unwrap();
1287        let mut slot = permit.alloc(payload.len()).unwrap();
1288        slot.as_mut_slice().copy_from_slice(&payload);
1289        slot.commit();
1290
1291        let backing = b_rx.recv().await.unwrap().unwrap();
1292        let shared = match backing {
1293            Backing::Shared(shared) => shared,
1294            Backing::Boxed(_) => panic!("expected mmap-backed shared payload"),
1295        };
1296
1297        // Tear down receiver state (drops MmapAttachments) and peer tx.
1298        drop(b_rx);
1299        drop(a_tx);
1300
1301        // Backing must remain valid independently of attachment table/peer lifetime.
1302        assert_eq!(shared.as_bytes(), payload.as_slice());
1303    }
1304
1305    #[cfg(unix)]
1306    #[tokio::test]
1307    async fn mmap_ref_can_arrive_before_attach_control_message() {
1308        let (_a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1309        let (_b_tx, mut b_rx) = b.split();
1310
1311        // Replace the default control channel so this test can
1312        // intentionally delay the attach message relative to the data frame.
1313        let (control_tx, control_rx) =
1314            shm_primitives_async::create_mmap_control_pair_connected().unwrap();
1315        b_rx.mmap_attachments = MmapAttachments::new(MmapChannelRx::Real(control_rx));
1316
1317        let payload = b"late mmap attach payload".to_vec();
1318        let map_id = 72_u32;
1319        let map_generation = 1_u32;
1320        let mapping_length = 4096_u64;
1321        let mmap_ref = MmapRef {
1322            map_id,
1323            map_generation,
1324            map_offset: 0,
1325            payload_len: payload.len() as u32,
1326        };
1327
1328        // Enqueue an mmap-ref frame first (before the attach control message).
1329        let (mut producer, _) = b_rx.rx_bipbuf.split();
1330        framing::write_mmap_ref(&mut producer, &mmap_ref).unwrap();
1331
1332        // Build the mapping and schedule attach delivery shortly after recv starts.
1333        let dir = tempfile::tempdir().unwrap();
1334        let path = dir.path().join("late-attach.shm");
1335        let region =
1336            shm_primitives::MmapRegion::create(&path, mapping_length as usize, FileCleanup::Manual)
1337                .unwrap();
1338        unsafe {
1339            std::ptr::copy_nonoverlapping(
1340                payload.as_ptr(),
1341                region.region().as_ptr(),
1342                payload.len(),
1343            );
1344        }
1345        let attach = shm_primitives_async::MmapAttachMessage {
1346            map_id,
1347            map_generation,
1348            mapping_length,
1349        };
1350
1351        let control_tx = Arc::new(control_tx);
1352        let delayed_tx = Arc::clone(&control_tx);
1353        std::thread::spawn(move || {
1354            std::thread::sleep(Duration::from_millis(150));
1355            delayed_tx.send(region.as_raw_fd(), &attach).unwrap();
1356        });
1357
1358        let backing = timeout(Duration::from_secs(1), b_rx.recv())
1359            .await
1360            .expect("recv timed out")
1361            .expect("recv should succeed despite delayed attach")
1362            .expect("expected payload");
1363        assert_eq!(backing.as_bytes(), payload.as_slice());
1364    }
1365}