1use std::io;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
10use std::sync::{Arc, Mutex};
11
12use shm_primitives::{BipBuf, PeerId};
13use shm_primitives_async::{Doorbell, SignalResult};
14use tracing::{debug, trace, warn};
15use vox_types::{
16 Backing, Link, LinkRx, LinkTx, LinkTxPermit, SharedBacking, TransportMode, WriteSlot,
17};
18
19use crate::framing::{DEFAULT_INLINE_THRESHOLD, MmapRef, OwnedFrame};
20use crate::mmap_registry::{
21 MmapAllocation, MmapAttachments, MmapChannelRx, MmapChannelTx, MmapRegistry,
22};
23
24pub mod bootstrap;
25pub mod framing;
26pub mod host;
27pub mod mmap_registry;
28pub mod peer_table;
29pub mod segment;
30pub mod varslot;
31
32pub use segment::{AttachError, Segment, SegmentConfig, SegmentLayout};
33pub use varslot::{SizeClassConfig, SlotRef, VarSlotPool};
34
35pub use host::create_test_link_pair;
36pub use host::{
37 AddPeerOptions, GuestSpawnTicket, HostHub, HostPeer, MultiPeerHostDriver, PreparedPeer, ShmHost,
38};
39#[cfg(windows)]
40pub use host::{guest_link_from_names, guest_link_from_ticket_windows};
41#[cfg(unix)]
42pub use host::{guest_link_from_raw, guest_link_from_ticket};
43
/// Host-side driver types, re-exported under a dedicated `driver` path.
///
/// These are the same items the crate root re-exports from `host`.
pub mod driver {
    pub use crate::host::{AddPeerOptions, MultiPeerHostDriver, ShmHost};
}
47
/// Number of bytes at the start of every varslot reserved for the payload
/// length. The sender writes it as a little-endian `u32` before publishing a
/// slot-ref frame, and the receiver reads it back to bound the payload.
const SLOT_LEN_PREFIX_SIZE: usize = 4;
49
50#[derive(Clone)]
51struct Backend(Arc<Segment>);
52
53impl Backend {
54 fn allocate_slot(&self, size: u32, owner_peer: u8) -> Option<SlotRef> {
55 self.0.var_pool().allocate(size, owner_peer)
56 }
57
58 fn free_slot(&self, slot_ref: SlotRef) {
59 let _ = self.0.var_pool().free(slot_ref);
60 }
61
62 unsafe fn slot_data<'a>(&self, slot_ref: &SlotRef) -> &'a [u8] {
63 unsafe { self.0.var_pool().slot_data(slot_ref) }
64 }
65
66 unsafe fn slot_data_mut<'a>(&self, slot_ref: &SlotRef) -> &'a mut [u8] {
67 unsafe { self.0.var_pool().slot_data_mut(slot_ref) }
68 }
69
70 fn max_slot_size(&self) -> Option<u32> {
71 let pool = self.0.var_pool();
72 let class_count = pool.class_count();
73 if class_count == 0 {
74 return None;
75 }
76 let mut max_size = 0u32;
77 for class_idx in 0..class_count {
78 max_size = max_size.max(pool.slot_size(class_idx));
79 }
80 Some(max_size)
81 }
82}
83
/// Send-side state shared by [`ShmLinkTx`], its permits, and its write slots.
struct TxShared {
    /// Ring this endpoint writes outgoing frames into.
    tx_bipbuf: Arc<BipBuf>,
    /// Varslot pool used for slot-ref payloads.
    backend: Backend,
    /// Owner id passed to the pool when allocating varslots (0 on the host side).
    owner_peer: u8,
    /// Largest payload `alloc` accepts.
    max_payload_size: u32,
    /// Payloads whose length is `<=` this value are sent as inline frames.
    inline_threshold: u32,
    /// Largest payload a varslot can carry (largest slot minus the length
    /// prefix); `None` when no slot class is configured. Larger payloads go
    /// through mmap regions.
    max_varslot_payload_size: Option<u32>,
    /// Contiguous ring space `reserve` waits for before handing out a permit.
    reserve_ring_bytes: u32,
    /// Serializes writers of `tx_bipbuf`.
    tx_lock: Mutex<()>,
    /// Doorbell signalled after publishing and awaited while the ring is full.
    doorbell: Arc<Doorbell>,
    /// Latched when a doorbell signal reports the peer dead; `reserve` fails afterwards.
    doorbell_dead: AtomicBool,
    /// Transport counters, shared with the rx half.
    stats: Arc<ShmTransportStats>,
    /// Allocator for mmap regions backing payloads too large for any varslot.
    mmap_registry: Mutex<MmapRegistry>,
}
98
/// One endpoint of a shared-memory link, before it is split into
/// [`ShmLinkTx`] / [`ShmLinkRx`] halves via [`Link::split`].
pub struct ShmLink {
    /// Send-side state; moved into the tx half on split.
    tx_shared: Arc<TxShared>,
    /// Ring this endpoint reads incoming frames from.
    rx_bipbuf: Arc<BipBuf>,
    /// Varslot backend used to view and release incoming slot-ref payloads.
    rx_backend: Backend,
    /// Set by `close`; checked by `reserve` and `alloc`.
    tx_closed: Arc<AtomicBool>,
    /// Latched when the peer is observed dead; `recv` reports EOF once it is
    /// set and the rx ring is empty.
    peer_closed: Arc<AtomicBool>,
    /// Receiver-side mmap attachments used to resolve mmap-ref frames.
    mmap_attachments: MmapAttachments,
}
110
/// Live transport counters, shared by both halves of a link and updated
/// with relaxed atomics.
#[derive(Default)]
struct ShmTransportStats {
    /// Inline frames successfully written to the tx ring.
    inline_sends: AtomicU64,
    /// Slot-ref frames successfully written to the tx ring.
    slot_ref_sends: AtomicU64,
    /// Mmap-ref frames successfully written to the tx ring.
    mmap_ref_sends: AtomicU64,
    /// Inline frames read from the rx ring.
    inline_recvs: AtomicU64,
    /// Slot-ref frames read from the rx ring.
    slot_ref_recvs: AtomicU64,
    /// Mmap-ref frames read from the rx ring.
    mmap_ref_recvs: AtomicU64,
    /// `alloc` calls that failed because no varslot was available.
    varslot_exhausted: AtomicU64,
    /// `reserve` iterations that found the ring unable to grant `reserve_ring_bytes`.
    ring_exhausted: AtomicU64,
    /// `reserve` iterations that parked on the doorbell (incremented together
    /// with `ring_exhausted`).
    reserve_waits: AtomicU64,
    /// `commit` write attempts that failed and were retried.
    commit_retries: AtomicU64,
    /// Doorbell signals that reported the peer as dead.
    doorbell_peer_dead: AtomicU64,
    /// Doorbell waits that returned an error.
    doorbell_wait_errors: AtomicU64,
}
126
/// Plain-value copy of the transport counters, returned by
/// `ShmLinkTx::stats` and `ShmLinkRx::stats`. Each field mirrors the live
/// counter of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ShmTransportStatsSnapshot {
    /// Inline frames sent.
    pub inline_sends: u64,
    /// Slot-ref frames sent.
    pub slot_ref_sends: u64,
    /// Mmap-ref frames sent.
    pub mmap_ref_sends: u64,
    /// Inline frames received.
    pub inline_recvs: u64,
    /// Slot-ref frames received.
    pub slot_ref_recvs: u64,
    /// Mmap-ref frames received.
    pub mmap_ref_recvs: u64,
    /// Varslot allocation failures in `alloc`.
    pub varslot_exhausted: u64,
    /// Times `reserve` found the ring too full.
    pub ring_exhausted: u64,
    /// Times `reserve` waited on the doorbell.
    pub reserve_waits: u64,
    /// Retried ring writes in `commit`.
    pub commit_retries: u64,
    /// Doorbell signals that reported the peer dead.
    pub doorbell_peer_dead: u64,
    /// Failed doorbell waits.
    pub doorbell_wait_errors: u64,
}
142
143impl ShmTransportStats {
144 fn snapshot(&self) -> ShmTransportStatsSnapshot {
145 ShmTransportStatsSnapshot {
146 inline_sends: self.inline_sends.load(AtomicOrdering::Relaxed),
147 slot_ref_sends: self.slot_ref_sends.load(AtomicOrdering::Relaxed),
148 mmap_ref_sends: self.mmap_ref_sends.load(AtomicOrdering::Relaxed),
149 inline_recvs: self.inline_recvs.load(AtomicOrdering::Relaxed),
150 slot_ref_recvs: self.slot_ref_recvs.load(AtomicOrdering::Relaxed),
151 mmap_ref_recvs: self.mmap_ref_recvs.load(AtomicOrdering::Relaxed),
152 varslot_exhausted: self.varslot_exhausted.load(AtomicOrdering::Relaxed),
153 ring_exhausted: self.ring_exhausted.load(AtomicOrdering::Relaxed),
154 reserve_waits: self.reserve_waits.load(AtomicOrdering::Relaxed),
155 commit_retries: self.commit_retries.load(AtomicOrdering::Relaxed),
156 doorbell_peer_dead: self.doorbell_peer_dead.load(AtomicOrdering::Relaxed),
157 doorbell_wait_errors: self.doorbell_wait_errors.load(AtomicOrdering::Relaxed),
158 }
159 }
160}
161
162impl ShmLink {
163 fn normalize_threshold(threshold: u32) -> u32 {
164 if threshold == 0 {
165 DEFAULT_INLINE_THRESHOLD
166 } else {
167 threshold
168 }
169 }
170
171 #[allow(clippy::too_many_arguments)]
172 fn from_parts(
173 tx_bipbuf: Arc<BipBuf>,
174 rx_bipbuf: Arc<BipBuf>,
175 backend: Backend,
176 doorbell: Arc<Doorbell>,
177 owner_peer: u8,
178 max_payload_size: u32,
179 inline_threshold: u32,
180 tx_closed: Arc<AtomicBool>,
181 peer_closed: Arc<AtomicBool>,
182 mmap_channel_tx: MmapChannelTx,
183 mmap_channel_rx: MmapChannelRx,
184 ) -> Self {
185 let ring_capacity = tx_bipbuf.capacity();
186 let ring_contiguous_ceiling = ring_capacity.saturating_sub(1);
187 let inline_ceiling = max_payload_size
188 .min(Self::normalize_threshold(inline_threshold))
189 .min(ring_contiguous_ceiling.saturating_sub(framing::FRAME_HEADER_SIZE as u32));
190 let reserve_inline_bytes = ((framing::FRAME_HEADER_SIZE as u32 + inline_ceiling) + 3) & !3;
191 let reserve_ring_bytes = reserve_inline_bytes
192 .max(framing::SLOT_REF_ENTRY_SIZE)
193 .max(framing::MMAP_REF_ENTRY_SIZE)
194 .min(ring_contiguous_ceiling);
195 let max_varslot_payload_size = backend
196 .max_slot_size()
197 .and_then(|slot_size| slot_size.checked_sub(SLOT_LEN_PREFIX_SIZE as u32));
198
199 let default_mmap_region_size = 1024 * 1024; let mmap_registry = MmapRegistry::new(mmap_channel_tx, default_mmap_region_size);
201
202 let stats = Arc::new(ShmTransportStats::default());
203 let tx_shared = Arc::new(TxShared {
204 tx_bipbuf,
205 backend: backend.clone(),
206 owner_peer,
207 max_payload_size,
208 inline_threshold: Self::normalize_threshold(inline_threshold),
209 max_varslot_payload_size,
210 reserve_ring_bytes,
211 tx_lock: Mutex::new(()),
212 doorbell,
213 doorbell_dead: AtomicBool::new(false),
214 stats: stats.clone(),
215 mmap_registry: Mutex::new(mmap_registry),
216 });
217
218 let mmap_attachments = MmapAttachments::new(mmap_channel_rx);
219
220 Self {
221 tx_shared,
222 rx_bipbuf,
223 rx_backend: backend,
224 tx_closed,
225 peer_closed,
226 mmap_attachments,
227 }
228 }
229
230 pub fn for_guest(
232 segment: Arc<Segment>,
233 peer_id: PeerId,
234 doorbell: Doorbell,
235 mmap_channel_tx: MmapChannelTx,
236 mmap_channel_rx: MmapChannelRx,
237 ) -> Self {
238 let tx_bipbuf = Arc::new(segment.g2h_bipbuf(peer_id));
239 let rx_bipbuf = Arc::new(segment.h2g_bipbuf(peer_id));
240 let backend = Backend(segment.clone());
241
242 Self::from_parts(
243 tx_bipbuf,
244 rx_bipbuf,
245 backend,
246 Arc::new(doorbell),
247 peer_id.get(),
248 segment.header().max_payload_size,
249 segment.header().inline_threshold,
250 Arc::new(AtomicBool::new(false)),
251 Arc::new(AtomicBool::new(false)),
252 mmap_channel_tx,
253 mmap_channel_rx,
254 )
255 }
256
257 pub fn for_host(
259 segment: Arc<Segment>,
260 peer_id: PeerId,
261 doorbell: Doorbell,
262 mmap_channel_tx: MmapChannelTx,
263 mmap_channel_rx: MmapChannelRx,
264 ) -> Self {
265 let tx_bipbuf = Arc::new(segment.h2g_bipbuf(peer_id));
266 let rx_bipbuf = Arc::new(segment.g2h_bipbuf(peer_id));
267 let backend = Backend(segment.clone());
268
269 Self::from_parts(
270 tx_bipbuf,
271 rx_bipbuf,
272 backend,
273 Arc::new(doorbell),
274 0,
275 segment.header().max_payload_size,
276 segment.header().inline_threshold,
277 Arc::new(AtomicBool::new(false)),
278 Arc::new(AtomicBool::new(false)),
279 mmap_channel_tx,
280 mmap_channel_rx,
281 )
282 }
283
284 #[cfg(windows)]
292 pub async fn accept_doorbell(&self) -> std::io::Result<()> {
293 self.tx_shared.doorbell.accept().await
294 }
295}
296
/// Sending half of a [`ShmLink`], produced by [`Link::split`].
pub struct ShmLinkTx {
    /// Send-side state shared with permits and write slots.
    shared: Arc<TxShared>,
    /// Set by `close`; later `reserve`/`alloc` calls fail with `BrokenPipe`.
    tx_closed: Arc<AtomicBool>,
}
302
/// Receiving half of a [`ShmLink`], produced by [`Link::split`].
pub struct ShmLinkRx {
    /// Ring incoming frames are read from.
    rx_bipbuf: Arc<BipBuf>,
    /// Varslot backend used to view and release slot-ref payloads.
    backend: Backend,
    /// Latched when the peer is observed dead; `recv` returns EOF once it is
    /// set and the ring is empty.
    peer_closed: Arc<AtomicBool>,
    /// Doorbell shared with the tx half; signalled after consuming frames and
    /// awaited when the ring is empty.
    doorbell: Arc<Doorbell>,
    /// Counters shared with the tx half.
    stats: Arc<ShmTransportStats>,
    /// Attached mmap regions used to resolve mmap-ref frames.
    mmap_attachments: MmapAttachments,
}
312
/// Returned by `ShmLinkTx::reserve` once the tx ring reported it could grant
/// `reserve_ring_bytes`. The permit holds no ring space itself; it is
/// consumed by `alloc` to obtain a write slot.
pub struct ShmTxPermit {
    /// Send-side state of the originating link.
    shared: Arc<TxShared>,
    /// Close flag of the originating link, re-checked by `alloc`.
    tx_closed: Arc<AtomicBool>,
}
317
/// Storage backing a [`ShmWriteSlot`], chosen by payload size in `alloc`.
enum ShmWriteSlotInner {
    /// Small payload staged in a heap buffer and copied into the ring as an
    /// inline frame on commit.
    Inline {
        bytes: Vec<u8>,
    },
    /// Payload written in place into a varslot. `slot_ref` becomes `None` once
    /// a slot-ref frame has been published, so `Drop` does not free it.
    VarSlot {
        slot_ref: Option<SlotRef>,
        payload_len: usize,
    },
    /// Payload written into an mmap region. `alloc` is taken by `commit`; if
    /// still present on drop, its lease counter is decremented.
    MmapRef {
        alloc: Option<MmapAllocation>,
        payload_len: usize,
    },
}
331
/// Writable payload buffer handed out by `ShmTxPermit::alloc`. Fill it via
/// `as_mut_slice` and publish it with `commit`; dropping it uncommitted
/// releases any varslot or mmap lease it holds.
pub struct ShmWriteSlot {
    /// Send-side state of the originating link.
    shared: Arc<TxShared>,
    /// Where the payload is being written.
    inner: ShmWriteSlotInner,
}
336
337impl Drop for ShmWriteSlot {
338 fn drop(&mut self) {
339 match &mut self.inner {
340 ShmWriteSlotInner::VarSlot { slot_ref, .. } => {
341 if let Some(slot_ref) = slot_ref.take() {
342 self.shared.backend.free_slot(slot_ref);
343 if matches!(self.shared.doorbell.signal_now(), SignalResult::PeerDead) {
344 self.shared.doorbell_dead.store(true, Ordering::Release);
345 self.shared
346 .stats
347 .doorbell_peer_dead
348 .fetch_add(1, AtomicOrdering::Relaxed);
349 }
350 }
351 }
352 ShmWriteSlotInner::MmapRef { alloc, .. } => {
353 if let Some(alloc) = alloc.take() {
355 alloc.lease_counter.fetch_sub(1, Ordering::Release);
356 }
357 }
358 ShmWriteSlotInner::Inline { .. } => {}
359 }
360 }
361}
362
363impl Link for ShmLink {
364 type Tx = ShmLinkTx;
365 type Rx = ShmLinkRx;
366
367 fn split(self) -> (Self::Tx, Self::Rx) {
368 let tx_shared = self.tx_shared;
369 let doorbell = tx_shared.doorbell.clone();
370 let stats = tx_shared.stats.clone();
371 (
372 ShmLinkTx {
373 shared: tx_shared,
374 tx_closed: self.tx_closed,
375 },
376 ShmLinkRx {
377 rx_bipbuf: self.rx_bipbuf,
378 backend: self.rx_backend,
379 peer_closed: self.peer_closed,
380 doorbell,
381 stats,
382 mmap_attachments: self.mmap_attachments,
383 },
384 )
385 }
386
387 fn supports_transport_mode(mode: TransportMode) -> bool
388 where
389 Self: Sized,
390 {
391 matches!(mode, TransportMode::Bare)
392 }
393}
394
395impl LinkTx for ShmLinkTx {
396 type Permit = ShmTxPermit;
397
398 async fn reserve(&self) -> io::Result<Self::Permit> {
399 loop {
400 if self.tx_closed.load(Ordering::Acquire) {
401 return Err(io::Error::new(
402 io::ErrorKind::BrokenPipe,
403 "shm tx is closed",
404 ));
405 }
406 if self.shared.doorbell_dead.load(Ordering::Acquire) {
407 return Err(io::Error::new(
408 io::ErrorKind::BrokenPipe,
409 "shm doorbell peer is closed",
410 ));
411 }
412 if self
413 .shared
414 .tx_bipbuf
415 .inner()
416 .can_grant(self.shared.reserve_ring_bytes)
417 {
418 return Ok(ShmTxPermit {
419 shared: self.shared.clone(),
420 tx_closed: self.tx_closed.clone(),
421 });
422 }
423
424 self.shared
425 .stats
426 .reserve_waits
427 .fetch_add(1, AtomicOrdering::Relaxed);
428 self.shared
429 .stats
430 .ring_exhausted
431 .fetch_add(1, AtomicOrdering::Relaxed);
432 if let Err(err) = self.shared.doorbell.wait().await {
433 self.shared
434 .stats
435 .doorbell_wait_errors
436 .fetch_add(1, AtomicOrdering::Relaxed);
437 warn!(
438 error = %err,
439 raw_os_error = ?err.raw_os_error(),
440 "shm tx doorbell wait failed"
441 );
442 return Err(io::Error::new(
443 io::ErrorKind::BrokenPipe,
444 format!("shm doorbell wait failed: {err}"),
445 ));
446 }
447 }
448 }
449
450 async fn close(self) -> io::Result<()> {
451 self.tx_closed.store(true, Ordering::Release);
452 match self.shared.doorbell.signal_now() {
453 SignalResult::PeerDead => {
454 self.shared.doorbell_dead.store(true, Ordering::Release);
455 self.shared
456 .stats
457 .doorbell_peer_dead
458 .fetch_add(1, AtomicOrdering::Relaxed);
459 Err(io::Error::new(
460 io::ErrorKind::BrokenPipe,
461 "shm doorbell peer is closed",
462 ))
463 }
464 _ => Ok(()),
465 }
466 }
467}
468
469impl LinkTxPermit for ShmTxPermit {
471 type Slot = ShmWriteSlot;
472
473 fn alloc(self, len: usize) -> io::Result<Self::Slot> {
474 if self.tx_closed.load(Ordering::Acquire) {
475 return Err(io::Error::new(
476 io::ErrorKind::BrokenPipe,
477 "shm tx is closed",
478 ));
479 }
480 if len > self.shared.max_payload_size as usize {
481 return Err(io::Error::new(
482 io::ErrorKind::InvalidInput,
483 "payload exceeds max_payload_size",
484 ));
485 }
486
487 if len as u32 <= self.shared.inline_threshold {
488 return Ok(ShmWriteSlot {
489 shared: self.shared.clone(),
490 inner: ShmWriteSlotInner::Inline {
491 bytes: vec![0; len],
492 },
493 });
494 }
495 let needed = len.checked_add(SLOT_LEN_PREFIX_SIZE).ok_or_else(|| {
496 io::Error::new(
497 io::ErrorKind::InvalidInput,
498 "payload length overflow while allocating slot-ref",
499 )
500 })?;
501 let needed_u32 = u32::try_from(needed).map_err(|_| {
502 io::Error::new(
503 io::ErrorKind::InvalidInput,
504 "payload length exceeds varslot addressing range",
505 )
506 })?;
507 let max_varslot_payload = self.shared.max_varslot_payload_size.ok_or_else(|| {
508 io::Error::new(
509 io::ErrorKind::Unsupported,
510 "payload exceeds inline threshold but no varslot class is configured",
511 )
512 })?;
513 if len as u32 > max_varslot_payload {
514 let mut registry = self
518 .shared
519 .mmap_registry
520 .lock()
521 .expect("mmap registry poisoned");
522 let alloc = registry
523 .alloc(len)
524 .map_err(|e| io::Error::other(format!("mmap alloc failed: {e}")))?;
525 return Ok(ShmWriteSlot {
526 shared: self.shared.clone(),
527 inner: ShmWriteSlotInner::MmapRef {
528 alloc: Some(alloc),
529 payload_len: len,
530 },
531 });
532 }
533 let slot_ref = self
534 .shared
535 .backend
536 .allocate_slot(needed_u32, self.shared.owner_peer)
537 .ok_or_else(|| {
538 self.shared
539 .stats
540 .varslot_exhausted
541 .fetch_add(1, AtomicOrdering::Relaxed);
542 if matches!(self.shared.doorbell.signal_now(), SignalResult::PeerDead) {
543 self.shared.doorbell_dead.store(true, Ordering::Release);
544 self.shared
545 .stats
546 .doorbell_peer_dead
547 .fetch_add(1, AtomicOrdering::Relaxed);
548 }
549 io::Error::new(
550 io::ErrorKind::WouldBlock,
551 "varslot exhausted; retry on next reserve/send cycle",
552 )
553 })?;
554
555 Ok(ShmWriteSlot {
556 shared: self.shared.clone(),
557 inner: ShmWriteSlotInner::VarSlot {
558 slot_ref: Some(slot_ref),
559 payload_len: len,
560 },
561 })
562 }
563}
564
impl WriteSlot for ShmWriteSlot {
    /// Returns the writable payload region of this slot.
    ///
    /// - Inline: the staging `Vec`.
    /// - VarSlot: the slot bytes after the `SLOT_LEN_PREFIX_SIZE`-byte length
    ///   prefix, limited to `payload_len`.
    /// - MmapRef: `payload_len` bytes of the mmap allocation.
    ///
    /// # Panics
    /// If the varslot or mmap allocation is missing (it is only taken by
    /// `commit`/`Drop`, so this indicates a bug).
    fn as_mut_slice(&mut self) -> &mut [u8] {
        match &mut self.inner {
            ShmWriteSlotInner::Inline { bytes } => bytes.as_mut_slice(),
            ShmWriteSlotInner::VarSlot {
                slot_ref,
                payload_len,
            } => {
                let slot_ref = slot_ref
                    .as_ref()
                    .expect("slot must be present while write slot is alive");
                let end = SLOT_LEN_PREFIX_SIZE + *payload_len;
                // NOTE(review): assumes the slot from `alloc` (requested with
                // `payload_len + SLOT_LEN_PREFIX_SIZE` bytes) is at least `end`
                // bytes long — otherwise the slice below panics.
                let data = unsafe { self.shared.backend.slot_data_mut(slot_ref) };
                &mut data[SLOT_LEN_PREFIX_SIZE..end]
            }
            ShmWriteSlotInner::MmapRef { alloc, payload_len } => {
                let alloc = alloc
                    .as_mut()
                    .expect("mmap alloc must be present while write slot is alive");
                unsafe { alloc.payload_mut(*payload_len) }
            }
        }
    }

    /// Publishes the payload to the peer.
    ///
    /// Each variant writes its frame into the tx ring while holding
    /// `tx_lock`. On failure it bumps `commit_retries`, calls
    /// `std::thread::yield_now()` and tries again. On success it bumps the
    /// matching send counter and rings the doorbell.
    ///
    /// NOTE(review): the retry loop is unbounded and blocks the calling thread;
    /// it relies on `reserve` having observed enough ring space for the frame.
    fn commit(mut self) {
        // Rings the doorbell; a dead-peer report latches `doorbell_dead` so the
        // next `reserve` fails.
        fn ring_doorbell(shared: &TxShared) {
            if matches!(shared.doorbell.signal_now(), SignalResult::PeerDead) {
                shared.doorbell_dead.store(true, Ordering::Release);
                shared
                    .stats
                    .doorbell_peer_dead
                    .fetch_add(1, AtomicOrdering::Relaxed);
            }
        }

        match &mut self.inner {
            ShmWriteSlotInner::Inline { bytes } => loop {
                let lock = self.shared.tx_lock.lock().expect("tx lock poisoned");
                let (mut producer, _) = self.shared.tx_bipbuf.split();
                let result = framing::write_inline(&mut producer, bytes);
                drop(lock);
                match result {
                    Ok(()) => {
                        self.shared
                            .stats
                            .inline_sends
                            .fetch_add(1, AtomicOrdering::Relaxed);
                        ring_doorbell(&self.shared);
                        return;
                    }
                    Err(_) => {
                        self.shared
                            .stats
                            .commit_retries
                            .fetch_add(1, AtomicOrdering::Relaxed);
                        std::thread::yield_now();
                    }
                }
            },
            ShmWriteSlotInner::VarSlot {
                slot_ref,
                payload_len,
            } => {
                let Some(slot_ref_value) = *slot_ref else {
                    return;
                };

                // Store the payload length (little-endian u32) in the slot's
                // prefix before publishing; the receiver reads it back to bound
                // the payload.
                {
                    let payload_len_bytes = (*payload_len as u32).to_le_bytes();
                    let data = unsafe { self.shared.backend.slot_data_mut(&slot_ref_value) };
                    data[..SLOT_LEN_PREFIX_SIZE].copy_from_slice(&payload_len_bytes);
                }

                loop {
                    let lock = self.shared.tx_lock.lock().expect("tx lock poisoned");
                    let (mut producer, _) = self.shared.tx_bipbuf.split();
                    let result = framing::write_slot_ref(&mut producer, &slot_ref_value);
                    drop(lock);
                    match result {
                        Ok(()) => {
                            // The slot now belongs to the receiver (its
                            // `ShmVarSlotBacking` frees it); clear it so our
                            // `Drop` does not free it too.
                            *slot_ref = None;
                            self.shared
                                .stats
                                .slot_ref_sends
                                .fetch_add(1, AtomicOrdering::Relaxed);
                            ring_doorbell(&self.shared);
                            return;
                        }
                        Err(_) => {
                            self.shared
                                .stats
                                .commit_retries
                                .fetch_add(1, AtomicOrdering::Relaxed);
                            std::thread::yield_now();
                        }
                    }
                }
            }
            ShmWriteSlotInner::MmapRef { alloc, payload_len } => {
                // Take the allocation so `Drop` does not decrement its lease
                // counter. NOTE(review): presumably the lease is released by the
                // receiving side once it is done with the payload — confirm.
                let Some(alloc_value) = alloc.take() else {
                    return;
                };

                // Order the payload writes made through `as_mut_slice` before
                // the mmap-ref frame that announces them.
                std::sync::atomic::fence(Ordering::Release);

                let mmap_ref = MmapRef {
                    map_id: alloc_value.map_id,
                    map_generation: alloc_value.map_generation,
                    map_offset: alloc_value.map_offset,
                    payload_len: *payload_len as u32,
                };

                loop {
                    let lock = self.shared.tx_lock.lock().expect("tx lock poisoned");
                    let (mut producer, _) = self.shared.tx_bipbuf.split();
                    let result = framing::write_mmap_ref(&mut producer, &mmap_ref);
                    drop(lock);
                    match result {
                        Ok(()) => {
                            self.shared
                                .stats
                                .mmap_ref_sends
                                .fetch_add(1, AtomicOrdering::Relaxed);
                            ring_doorbell(&self.shared);
                            return;
                        }
                        Err(_) => {
                            self.shared
                                .stats
                                .commit_retries
                                .fetch_add(1, AtomicOrdering::Relaxed);
                            std::thread::yield_now();
                        }
                    }
                }
            }
        }
    }
}
710
/// Errors returned by `ShmLinkRx::recv`.
#[derive(Debug)]
pub enum ShmLinkRxError {
    /// An mmap-ref frame could not be resolved against the attached mappings.
    MmapResolve(crate::mmap_registry::MmapResolveError),
    /// Waiting on the doorbell failed.
    DoorbellWait(io::Error),
    /// A slot-ref frame's slot is too small to hold the length prefix
    /// (reported with `payload_len: 0`), or its declared payload length
    /// exceeds the slot's remaining bytes.
    MalformedSlotRefLength {
        slot_bytes: usize,
        payload_len: usize,
    },
}
720
721impl std::fmt::Display for ShmLinkRxError {
722 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
723 match self {
724 ShmLinkRxError::MmapResolve(err) => write!(f, "mmap resolve failed: {err}"),
725 ShmLinkRxError::DoorbellWait(err) => {
726 write!(
727 f,
728 "doorbell wait failed: {} (raw_os_error={:?})",
729 err,
730 err.raw_os_error()
731 )
732 }
733 ShmLinkRxError::MalformedSlotRefLength {
734 slot_bytes,
735 payload_len,
736 } => write!(
737 f,
738 "malformed slot-ref payload length: payload_len={payload_len}, slot_bytes={slot_bytes}"
739 ),
740 }
741 }
742}
743
/// Lets `ShmLinkRxError` be used as a `std::error::Error`. The wrapped causes
/// are already included in the `Display` output, so `source()` keeps its
/// default (`None`).
impl std::error::Error for ShmLinkRxError {}
745
746impl LinkRx for ShmLinkRx {
747 type Error = ShmLinkRxError;
748
749 async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
750 loop {
751 let (_, mut consumer) = self.rx_bipbuf.split();
752 if let Some(frame) = framing::read_frame(&mut consumer) {
753 return match frame {
754 OwnedFrame::Inline(bytes) => {
757 trace!(len = bytes.len(), "shm rx received inline frame");
758 self.stats
759 .inline_recvs
760 .fetch_add(1, AtomicOrdering::Relaxed);
761 if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
762 let was_closed = self.peer_closed.swap(true, Ordering::AcqRel);
763 self.stats
764 .doorbell_peer_dead
765 .fetch_add(1, AtomicOrdering::Relaxed);
766 if !was_closed {
767 debug!("shm rx observed peer dead while draining inline frame");
768 }
769 }
770 Ok(Some(Backing::Boxed(bytes.into_boxed_slice())))
771 }
772 OwnedFrame::SlotRef(slot_ref) => {
774 trace!(slot_ref = ?slot_ref, "shm rx received slot-ref frame");
775 self.stats
776 .slot_ref_recvs
777 .fetch_add(1, AtomicOrdering::Relaxed);
778 if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
779 let was_closed = self.peer_closed.swap(true, Ordering::AcqRel);
780 self.stats
781 .doorbell_peer_dead
782 .fetch_add(1, AtomicOrdering::Relaxed);
783 if !was_closed {
784 debug!("shm rx observed peer dead while draining slot-ref frame");
785 }
786 }
787 let slot = unsafe { self.backend.slot_data(&slot_ref) };
788 if slot.len() < SLOT_LEN_PREFIX_SIZE {
789 self.backend.free_slot(slot_ref);
790 warn!(
791 slot_ref = ?slot_ref,
792 slot_bytes = slot.len(),
793 "shm rx malformed slot-ref: missing payload length prefix"
794 );
795 return Err(ShmLinkRxError::MalformedSlotRefLength {
796 slot_bytes: slot.len(),
797 payload_len: 0,
798 });
799 }
800
801 let payload_len =
802 u32::from_le_bytes([slot[0], slot[1], slot[2], slot[3]]) as usize;
803 if payload_len > slot.len().saturating_sub(SLOT_LEN_PREFIX_SIZE) {
804 self.backend.free_slot(slot_ref);
805 warn!(
806 slot_ref = ?slot_ref,
807 slot_bytes = slot.len(),
808 payload_len,
809 "shm rx malformed slot-ref: payload length exceeds slot size"
810 );
811 return Err(ShmLinkRxError::MalformedSlotRefLength {
812 slot_bytes: slot.len(),
813 payload_len,
814 });
815 }
816 trace!(
817 slot_ref = ?slot_ref,
818 slot_bytes = slot.len(),
819 payload_len,
820 "shm rx slot-ref frame decoded"
821 );
822
823 Ok(Some(Backing::shared(Arc::new(ShmVarSlotBacking {
824 backend: self.backend.clone(),
825 slot_ref,
826 payload_len,
827 doorbell: self.doorbell.clone(),
828 peer_closed: self.peer_closed.clone(),
829 stats: self.stats.clone(),
830 }))))
831 }
832 OwnedFrame::MmapRef(mmap_ref) => {
834 trace!(
835 map_id = mmap_ref.map_id,
836 map_generation = mmap_ref.map_generation,
837 map_offset = mmap_ref.map_offset,
838 payload_len = mmap_ref.payload_len,
839 "shm rx received mmap-ref frame"
840 );
841 self.stats
842 .mmap_ref_recvs
843 .fetch_add(1, AtomicOrdering::Relaxed);
844 if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
845 let was_closed = self.peer_closed.swap(true, Ordering::AcqRel);
846 self.stats
847 .doorbell_peer_dead
848 .fetch_add(1, AtomicOrdering::Relaxed);
849 if !was_closed {
850 debug!("shm rx observed peer dead while draining mmap-ref frame");
851 }
852 }
853
854 let mapping = match self.mmap_attachments.resolve_with_grace(
855 mmap_ref.map_id,
856 mmap_ref.map_generation,
857 mmap_ref.map_offset,
858 mmap_ref.payload_len,
859 ) {
860 Ok(mapping) => mapping,
861 Err(error) => {
862 warn!(
863 map_id = mmap_ref.map_id,
864 map_generation = mmap_ref.map_generation,
865 map_offset = mmap_ref.map_offset,
866 payload_len = mmap_ref.payload_len,
867 error = %error,
868 "shm rx mmap-ref resolve failed"
869 );
870 return Err(ShmLinkRxError::MmapResolve(error));
872 }
873 };
874
875 trace!(
877 map_id = mmap_ref.map_id,
878 map_generation = mmap_ref.map_generation,
879 map_offset = mmap_ref.map_offset,
880 payload_len = mmap_ref.payload_len,
881 "shm rx mmap-ref resolved"
882 );
883 Ok(Some(Backing::shared(Arc::new(ShmMmapBacking {
884 mapping,
885 offset: mmap_ref.map_offset as usize,
886 len: mmap_ref.payload_len as usize,
887 }))))
888 }
889 };
890 }
891
892 if self.peer_closed.load(Ordering::Acquire) && self.rx_bipbuf.inner().is_empty() {
893 debug!("shm rx returning EOF: peer closed and rx bipbuf is empty");
894 return Ok(None);
895 }
896
897 trace!("shm rx waiting on doorbell");
898 if let Err(err) = self.doorbell.wait().await {
899 self.stats
900 .doorbell_wait_errors
901 .fetch_add(1, AtomicOrdering::Relaxed);
902 warn!(
903 error = %err,
904 raw_os_error = ?err.raw_os_error(),
905 "shm rx doorbell wait failed"
906 );
907 return Err(ShmLinkRxError::DoorbellWait(err));
908 }
909 trace!("shm rx woke from doorbell wait");
910 }
911 }
912}
913
914impl ShmLinkTx {
915 pub fn stats(&self) -> ShmTransportStatsSnapshot {
916 self.shared.stats.snapshot()
917 }
918}
919
920impl ShmLinkRx {
921 pub fn stats(&self) -> ShmTransportStatsSnapshot {
922 self.stats.snapshot()
923 }
924}
925
/// Zero-copy view of a received slot-ref payload. Dropping it returns the
/// varslot to the pool and rings the doorbell.
struct ShmVarSlotBacking {
    /// Varslot pool the slot belongs to.
    backend: Backend,
    /// Slot holding the length prefix followed by the payload.
    slot_ref: SlotRef,
    /// Payload length, validated against the slot size in `recv`.
    payload_len: usize,
    /// Doorbell rung after the slot is freed.
    doorbell: Arc<Doorbell>,
    /// Latched if that doorbell signal reports the peer dead.
    peer_closed: Arc<AtomicBool>,
    /// Counters shared with the link halves.
    stats: Arc<ShmTransportStats>,
}
935
936impl SharedBacking for ShmVarSlotBacking {
937 fn as_bytes(&self) -> &[u8] {
938 let slot = unsafe { self.backend.slot_data(&self.slot_ref) };
939 let end = SLOT_LEN_PREFIX_SIZE + self.payload_len;
940 &slot[SLOT_LEN_PREFIX_SIZE..end]
941 }
942}
943
944impl Drop for ShmVarSlotBacking {
945 fn drop(&mut self) {
946 self.backend.free_slot(self.slot_ref);
947 if matches!(self.doorbell.signal_now(), SignalResult::PeerDead) {
948 self.peer_closed.store(true, Ordering::Release);
949 self.stats
950 .doorbell_peer_dead
951 .fetch_add(1, AtomicOrdering::Relaxed);
952 }
953 }
954}
955
/// Zero-copy view of a received mmap-ref payload.
///
/// Holds an `Arc` to the attached mapping, keeping that `AttachedMapping`
/// alive for as long as the backing exists (presumably keeping the region
/// mapped — confirm against `AttachedMapping`).
struct ShmMmapBacking {
    /// Attached mapping the payload lives in.
    mapping: Arc<mmap_registry::AttachedMapping>,
    /// Byte offset of the payload within the mapped region.
    offset: usize,
    /// Payload length in bytes.
    len: usize,
}
962
impl SharedBacking for ShmMmapBacking {
    /// Returns the `len` payload bytes starting at `offset` in the mapped region.
    fn as_bytes(&self) -> &[u8] {
        let region = self.mapping.region.region();
        let ptr = region.as_ptr();
        // SAFETY (review): no bounds check happens here. Soundness requires
        // `offset + len` to lie within the mapped region, presumably validated by
        // `MmapAttachments::resolve_with_grace`, which `recv` passes both values
        // to — confirm. It also requires the sender not to write the range while
        // this view is alive.
        unsafe { std::slice::from_raw_parts(ptr.add(self.offset), self.len) }
    }
}
971
972#[cfg(test)]
973mod tests {
974 use std::sync::Arc;
975 use std::time::Duration;
976
977 use shm_primitives::FileCleanup;
978 use tokio::time::timeout;
979 use vox_types::{LinkRx as _, LinkTx as _, LinkTxPermit as _};
980
981 use super::*;
982 use crate::host::create_test_link_pair;
983 use crate::segment::SegmentConfig;
984
985 async fn make_test_pair(
987 bipbuf_capacity: u32,
988 max_payload_size: u32,
989 inline_threshold: u32,
990 size_classes: &[SizeClassConfig],
991 ) -> (ShmLink, ShmLink, tempfile::TempDir) {
992 let dir = tempfile::tempdir().expect("tempdir");
993 let path = dir.path().join("test.shm");
994 let segment = Arc::new(
995 Segment::create(
996 &path,
997 SegmentConfig {
998 max_guests: 1,
999 bipbuf_capacity,
1000 max_payload_size,
1001 inline_threshold,
1002 heartbeat_interval: 0,
1003 size_classes,
1004 },
1005 FileCleanup::Manual,
1006 )
1007 .expect("create segment"),
1008 );
1009 let (a, b) = create_test_link_pair(segment)
1010 .await
1011 .expect("create_test_link_pair");
1012 (a, b, dir)
1013 }
1014
1015 const CLASSES: &[SizeClassConfig] = &[SizeClassConfig {
1016 slot_size: 256,
1017 slot_count: 1,
1018 }];
1019
1020 #[tokio::test]
1021 async fn inline_payload_roundtrip_is_boxed() {
1022 let (a, b, _dir) = make_test_pair(4096, 1024, 128, CLASSES).await;
1023 let (a_tx, _a_rx) = a.split();
1024 let (_b_tx, mut b_rx) = b.split();
1025
1026 let payload = b"inline hello";
1027 let permit = a_tx.reserve().await.unwrap();
1028 let mut slot = permit.alloc(payload.len()).unwrap();
1029 slot.as_mut_slice().copy_from_slice(payload);
1030 slot.commit();
1031
1032 let backing = b_rx.recv().await.unwrap().unwrap();
1033 match backing {
1034 Backing::Boxed(bytes) => assert_eq!(&*bytes, payload),
1035 Backing::Shared(_) => panic!("inline path must be boxed"),
1036 }
1037 }
1038
1039 #[tokio::test]
1040 async fn slot_ref_payload_is_zero_copy_shared_backing() {
1041 let (a, b, _dir) = make_test_pair(4096, 1024, 64, CLASSES).await;
1042 let (a_tx, _a_rx) = a.split();
1043 let (_b_tx, mut b_rx) = b.split();
1044
1045 let payload = vec![7_u8; 200];
1046 let permit = a_tx.reserve().await.unwrap();
1047 let mut slot = permit.alloc(payload.len()).unwrap();
1048 slot.as_mut_slice().copy_from_slice(&payload);
1049 slot.commit();
1050
1051 let backing = b_rx.recv().await.unwrap().unwrap();
1052 match backing {
1053 Backing::Shared(shared) => assert_eq!(shared.as_bytes(), payload.as_slice()),
1054 Backing::Boxed(_) => panic!("slot-ref path must be shared"),
1055 }
1056 }
1057
1058 #[tokio::test]
1059 async fn shared_backing_drop_releases_slot() {
1060 let (a, b, _dir) = make_test_pair(4096, 1024, 64, CLASSES).await;
1061 let (a_tx, _a_rx) = a.split();
1062 let (_b_tx, mut b_rx) = b.split();
1063
1064 let payload = vec![1_u8; 200];
1065 let permit = a_tx.reserve().await.unwrap();
1066 let mut slot = permit.alloc(payload.len()).unwrap();
1067 slot.as_mut_slice().copy_from_slice(&payload);
1068 slot.commit();
1069
1070 let backing = b_rx.recv().await.unwrap().unwrap();
1071
1072 let permit2 = a_tx.reserve().await.unwrap();
1074 match permit2.alloc(payload.len()) {
1075 Ok(_) => panic!("alloc should fail while slot is still held by shared backing"),
1076 Err(err) => assert_eq!(err.kind(), io::ErrorKind::WouldBlock),
1077 }
1078
1079 drop(backing);
1080
1081 let permit3 = a_tx.reserve().await.unwrap();
1082 let _slot3 = permit3
1083 .alloc(payload.len())
1084 .expect("slot must be released after drop");
1085 }
1086
1087 #[tokio::test]
1088 async fn mixed_payload_stress_roundtrip() {
1089 let classes = [SizeClassConfig {
1090 slot_size: 4096,
1091 slot_count: 32,
1092 }];
1093 let (a, b, _dir) = make_test_pair(1 << 16, 1 << 20, 256, &classes).await;
1094 let (a_tx, _a_rx) = a.split();
1095 let (_b_tx, mut b_rx) = b.split();
1096
1097 for i in 0..400 {
1098 let len = if i % 3 == 0 { 48 } else { 1500 };
1099 let payload = vec![(i % 239) as u8; len];
1100 let permit = a_tx.reserve().await.unwrap();
1101 let mut slot = permit.alloc(payload.len()).unwrap();
1102 slot.as_mut_slice().copy_from_slice(&payload);
1103 slot.commit();
1104
1105 let backing = b_rx.recv().await.unwrap().unwrap();
1106 assert_eq!(backing.as_bytes(), payload.as_slice());
1107 }
1108 }
1109
1110 #[tokio::test]
1111 async fn reserve_waits_until_rx_frees_ring_space() {
1112 let (a, b, _dir) = make_test_pair(32, 1024, 256, CLASSES).await;
1113 let (a_tx, _a_rx) = a.split();
1114 let (_b_tx, mut b_rx) = b.split();
1115
1116 let payload = vec![9_u8; 24]; let permit = a_tx.reserve().await.unwrap();
1118 let mut slot = permit.alloc(payload.len()).unwrap();
1119 slot.as_mut_slice().copy_from_slice(&payload);
1120 slot.commit();
1121
1122 let reserve_fut = a_tx.reserve();
1123 tokio::pin!(reserve_fut);
1124 assert!(
1125 timeout(Duration::from_millis(20), &mut reserve_fut)
1126 .await
1127 .is_err(),
1128 "reserve should wait while ring is full"
1129 );
1130
1131 let _ = b_rx.recv().await.unwrap().unwrap();
1132 let permit2 = timeout(Duration::from_secs(1), &mut reserve_fut)
1133 .await
1134 .expect("reserve should wake after recv")
1135 .expect("reserve should succeed");
1136 drop(permit2);
1137 }
1138
1139 #[tokio::test]
1140 async fn transport_stats_track_send_recv_and_exhaustion() {
1141 let (a, b, _dir) = make_test_pair(4096, 1024, 64, CLASSES).await;
1142 let (a_tx, _a_rx) = a.split();
1143 let (_b_tx, mut b_rx) = b.split();
1144
1145 let inline = b"hello";
1146 let permit = a_tx.reserve().await.unwrap();
1147 let mut slot = permit.alloc(inline.len()).unwrap();
1148 slot.as_mut_slice().copy_from_slice(inline);
1149 slot.commit();
1150
1151 let large = vec![1_u8; 200];
1152 let permit = a_tx.reserve().await.unwrap();
1153 let mut slot = permit.alloc(large.len()).unwrap();
1154 slot.as_mut_slice().copy_from_slice(&large);
1155 slot.commit();
1156
1157 let backing1 = b_rx.recv().await.unwrap().unwrap();
1158 assert!(backing1.as_bytes().starts_with(inline));
1159 let backing2 = b_rx.recv().await.unwrap().unwrap();
1160 assert_eq!(backing2.as_bytes(), large.as_slice());
1161
1162 let permit = a_tx.reserve().await.unwrap();
1163 match permit.alloc(large.len()) {
1164 Ok(_) => panic!("alloc should fail while varslot is exhausted"),
1165 Err(err) => assert_eq!(err.kind(), io::ErrorKind::WouldBlock),
1166 }
1167
1168 drop(backing2); let permit = a_tx.reserve().await.unwrap();
1170 let _slot = permit.alloc(large.len()).expect("slot should be available");
1171
1172 let tx_stats = a_tx.stats();
1173 let rx_stats = b_rx.stats();
1174
1175 assert_eq!(tx_stats.inline_sends, 1);
1176 assert_eq!(tx_stats.slot_ref_sends, 1);
1177 assert!(tx_stats.varslot_exhausted >= 1);
1178 assert_eq!(rx_stats.inline_recvs, 1);
1179 assert_eq!(rx_stats.slot_ref_recvs, 1);
1180 }
1181
1182 const SMALL_CLASSES: &[SizeClassConfig] = &[SizeClassConfig {
1184 slot_size: 64,
1185 slot_count: 2,
1186 }];
1187
1188 #[tokio::test]
1189 async fn mmap_large_payload_roundtrip() {
1190 let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1191 let (a_tx, _a_rx) = a.split();
1192 let (_b_tx, mut b_rx) = b.split();
1193
1194 let payload = vec![0xAB_u8; 200];
1196 let permit = a_tx.reserve().await.unwrap();
1197 let mut slot = permit.alloc(payload.len()).unwrap();
1198 slot.as_mut_slice().copy_from_slice(&payload);
1199 slot.commit();
1200
1201 let backing = b_rx.recv().await.unwrap().unwrap();
1202 match &backing {
1203 Backing::Shared(shared) => assert_eq!(shared.as_bytes(), payload.as_slice()),
1204 Backing::Boxed(_) => panic!("mmap path must be shared"),
1205 }
1206
1207 let tx_stats = a_tx.stats();
1208 let rx_stats = b_rx.stats();
1209 assert_eq!(tx_stats.mmap_ref_sends, 1);
1210 assert_eq!(rx_stats.mmap_ref_recvs, 1);
1211 }
1212
1213 #[tokio::test]
1214 async fn mmap_multiple_payloads_share_region() {
1215 let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1216 let (a_tx, _a_rx) = a.split();
1217 let (_b_tx, mut b_rx) = b.split();
1218
1219 for i in 0u8..3 {
1221 let payload = vec![i; 200];
1222 let permit = a_tx.reserve().await.unwrap();
1223 let mut slot = permit.alloc(payload.len()).unwrap();
1224 slot.as_mut_slice().copy_from_slice(&payload);
1225 slot.commit();
1226 }
1227
1228 for i in 0u8..3 {
1229 let backing = b_rx.recv().await.unwrap().unwrap();
1230 assert_eq!(backing.as_bytes(), &vec![i; 200]);
1231 }
1232
1233 assert_eq!(a_tx.stats().mmap_ref_sends, 3);
1234 assert_eq!(b_rx.stats().mmap_ref_recvs, 3);
1235 }
1236
1237 #[tokio::test]
1238 async fn mmap_mixed_with_inline_and_varslot() {
1239 let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1240 let (a_tx, _a_rx) = a.split();
1241 let (_b_tx, mut b_rx) = b.split();
1242
1243 let inline_payload = b"hello";
1245 let permit = a_tx.reserve().await.unwrap();
1246 let mut slot = permit.alloc(inline_payload.len()).unwrap();
1247 slot.as_mut_slice().copy_from_slice(inline_payload);
1248 slot.commit();
1249
1250 let varslot_payload = vec![0x42_u8; 50];
1252 let permit = a_tx.reserve().await.unwrap();
1253 let mut slot = permit.alloc(varslot_payload.len()).unwrap();
1254 slot.as_mut_slice().copy_from_slice(&varslot_payload);
1255 slot.commit();
1256
1257 let mmap_payload = vec![0xFF_u8; 500];
1259 let permit = a_tx.reserve().await.unwrap();
1260 let mut slot = permit.alloc(mmap_payload.len()).unwrap();
1261 slot.as_mut_slice().copy_from_slice(&mmap_payload);
1262 slot.commit();
1263
1264 let b1 = b_rx.recv().await.unwrap().unwrap();
1265 assert!(b1.as_bytes().starts_with(inline_payload));
1266
1267 let b2 = b_rx.recv().await.unwrap().unwrap();
1268 assert_eq!(b2.as_bytes(), varslot_payload.as_slice());
1269
1270 let b3 = b_rx.recv().await.unwrap().unwrap();
1271 assert_eq!(b3.as_bytes(), mmap_payload.as_slice());
1272
1273 let stats = a_tx.stats();
1274 assert_eq!(stats.inline_sends, 1);
1275 assert_eq!(stats.slot_ref_sends, 1);
1276 assert_eq!(stats.mmap_ref_sends, 1);
1277 }
1278
1279 #[tokio::test]
1280 async fn mmap_backing_survives_rx_drop_and_peer_teardown() {
1281 let (a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1282 let (a_tx, _a_rx) = a.split();
1283 let (_b_tx, mut b_rx) = b.split();
1284
1285 let payload = vec![0x5A_u8; 500];
1286 let permit = a_tx.reserve().await.unwrap();
1287 let mut slot = permit.alloc(payload.len()).unwrap();
1288 slot.as_mut_slice().copy_from_slice(&payload);
1289 slot.commit();
1290
1291 let backing = b_rx.recv().await.unwrap().unwrap();
1292 let shared = match backing {
1293 Backing::Shared(shared) => shared,
1294 Backing::Boxed(_) => panic!("expected mmap-backed shared payload"),
1295 };
1296
1297 drop(b_rx);
1299 drop(a_tx);
1300
1301 assert_eq!(shared.as_bytes(), payload.as_slice());
1303 }
1304
1305 #[cfg(unix)]
1306 #[tokio::test]
1307 async fn mmap_ref_can_arrive_before_attach_control_message() {
1308 let (_a, b, _dir) = make_test_pair(4096, 1 << 20, 32, SMALL_CLASSES).await;
1309 let (_b_tx, mut b_rx) = b.split();
1310
1311 let (control_tx, control_rx) =
1314 shm_primitives_async::create_mmap_control_pair_connected().unwrap();
1315 b_rx.mmap_attachments = MmapAttachments::new(MmapChannelRx::Real(control_rx));
1316
1317 let payload = b"late mmap attach payload".to_vec();
1318 let map_id = 72_u32;
1319 let map_generation = 1_u32;
1320 let mapping_length = 4096_u64;
1321 let mmap_ref = MmapRef {
1322 map_id,
1323 map_generation,
1324 map_offset: 0,
1325 payload_len: payload.len() as u32,
1326 };
1327
1328 let (mut producer, _) = b_rx.rx_bipbuf.split();
1330 framing::write_mmap_ref(&mut producer, &mmap_ref).unwrap();
1331
1332 let dir = tempfile::tempdir().unwrap();
1334 let path = dir.path().join("late-attach.shm");
1335 let region =
1336 shm_primitives::MmapRegion::create(&path, mapping_length as usize, FileCleanup::Manual)
1337 .unwrap();
1338 unsafe {
1339 std::ptr::copy_nonoverlapping(
1340 payload.as_ptr(),
1341 region.region().as_ptr(),
1342 payload.len(),
1343 );
1344 }
1345 let attach = shm_primitives_async::MmapAttachMessage {
1346 map_id,
1347 map_generation,
1348 mapping_length,
1349 };
1350
1351 let control_tx = Arc::new(control_tx);
1352 let delayed_tx = Arc::clone(&control_tx);
1353 std::thread::spawn(move || {
1354 std::thread::sleep(Duration::from_millis(150));
1355 delayed_tx.send(region.as_raw_fd(), &attach).unwrap();
1356 });
1357
1358 let backing = timeout(Duration::from_secs(1), b_rx.recv())
1359 .await
1360 .expect("recv timed out")
1361 .expect("recv should succeed despite delayed attach")
1362 .expect("expected payload");
1363 assert_eq!(backing.as_bytes(), payload.as_slice());
1364 }
1365}