Skip to main content

vox_shm/
segment.rs

1//! High-level `Segment` — the top-level handle for a vox SHM segment.
2//!
3//! Combines the mmap region, segment header, peer table, and VarSlotPool
4//! into a single owned type for both host and guest use.
5//!
6//! r[impl shm]
7//! r[impl shm.architecture]
8
9use std::io;
10use std::path::Path;
11
12use shm_primitives::{
13    BipBuf, FileCleanup, MmapRegion, PeerId, PeerState, SEGMENT_HEADER_SIZE, SegmentHeader,
14    SegmentHeaderInit,
15};
16
17use crate::framing::{self, OwnedFrame};
18use crate::peer_table::{PeerTable, bipbuf_pair_size};
19use crate::varslot::{SizeClassConfig, VarSlotPool};
20
21// ── layout ─────────────────────────────────────────────────────────────────
22
23const fn align_up(n: usize, align: usize) -> usize {
24    (n + align - 1) & !(align - 1)
25}
26
27/// Computed byte offsets for a segment's sub-structures.
28pub struct SegmentLayout {
29    /// Byte offset of the peer table (= SEGMENT_HEADER_SIZE = 128).
30    pub peer_table_offset: usize,
31    /// Byte offset of the BipBuffer pairs within the segment.
32    pub ring_base_offset: usize,
33    /// Byte offset of the VarSlotPool.
34    pub var_pool_offset: usize,
35    /// Total segment size in bytes.
36    pub total_size: usize,
37}
38
39impl SegmentLayout {
40    pub fn compute(max_guests: u8, bipbuf_capacity: u32, size_classes: &[SizeClassConfig]) -> Self {
41        let peer_table_offset = SEGMENT_HEADER_SIZE; // 128, already 64-byte aligned
42        let peer_entries_size = max_guests as usize * 64; // PeerEntry is 64 bytes
43        let ring_base_offset = peer_table_offset + peer_entries_size;
44        // ring_base is 64-byte aligned: 128 + N*64 is always a multiple of 64
45        let rings_size = max_guests as usize * bipbuf_pair_size(bipbuf_capacity);
46        let var_pool_offset = align_up(ring_base_offset + rings_size, 64);
47        let pool_size = VarSlotPool::required_size(size_classes);
48        let total_size = var_pool_offset + pool_size;
49
50        Self {
51            peer_table_offset,
52            ring_base_offset,
53            var_pool_offset,
54            total_size,
55        }
56    }
57}
58
59// ── SegmentConfig ──────────────────────────────────────────────────────────
60
61/// Parameters for creating a new segment (host side).
62pub struct SegmentConfig<'a> {
63    /// Maximum number of concurrent guest processes.
64    pub max_guests: u8,
65    /// BipBuffer data capacity per direction, per guest, in bytes.
66    pub bipbuf_capacity: u32,
67    /// Maximum total payload size (written into header for guest reference).
68    pub max_payload_size: u32,
69    /// Inline threshold (0 = default 256).
70    pub inline_threshold: u32,
71    /// Heartbeat interval in nanoseconds; 0 = heartbeats disabled.
72    pub heartbeat_interval: u64,
73    /// VarSlotPool size classes.
74    pub size_classes: &'a [SizeClassConfig],
75}
76
77// ── AttachError ────────────────────────────────────────────────────────────
78
79/// Errors that can occur when attaching to an existing segment.
80#[derive(Debug)]
81pub enum AttachError {
82    Io(io::Error),
83    BadHeader(&'static str),
84}
85
86impl From<io::Error> for AttachError {
87    fn from(e: io::Error) -> Self {
88        AttachError::Io(e)
89    }
90}
91
92impl std::fmt::Display for AttachError {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        match self {
95            AttachError::Io(e) => write!(f, "I/O error: {e}"),
96            AttachError::BadHeader(msg) => write!(f, "bad segment header: {msg}"),
97        }
98    }
99}
100
101impl std::error::Error for AttachError {}
102
103// ── Segment ────────────────────────────────────────────────────────────────
104
105/// A vox SHM segment — the top-level handle tying together all sub-structures.
106///
107/// On the host side, create with [`Segment::create`].
108/// On the guest side, attach with [`Segment::attach`].
109pub struct Segment {
110    mmap: MmapRegion,
111    header: *mut SegmentHeader,
112    #[allow(dead_code)]
113    max_guests: u8,
114    bipbuf_capacity: u32,
115    peer_table: PeerTable,
116    var_pool: VarSlotPool,
117    #[allow(dead_code)]
118    size_classes: Vec<SizeClassConfig>,
119}
120
121unsafe impl Send for Segment {}
122unsafe impl Sync for Segment {}
123
124impl Segment {
125    fn refresh_views_after_remap(&mut self) {
126        let region = self.mmap.region();
127        self.header = unsafe { region.get_mut::<SegmentHeader>(0) };
128
129        let peer_table_offset = self.header().peer_table_offset as usize;
130        let var_pool_offset = self.header().var_pool_offset as usize;
131        self.bipbuf_capacity = self.header().bipbuf_capacity;
132
133        self.peer_table = unsafe { PeerTable::attach(region, peer_table_offset, self.max_guests) };
134        self.var_pool = unsafe { VarSlotPool::attach(region, var_pool_offset, &self.size_classes) };
135    }
136
137    /// Create a new segment file at `path` and initialize all sub-structures.
138    ///
139    /// r[impl shm.segment]
140    pub fn create(
141        path: &Path,
142        config: SegmentConfig<'_>,
143        cleanup: FileCleanup,
144    ) -> io::Result<Self> {
145        let layout = SegmentLayout::compute(
146            config.max_guests,
147            config.bipbuf_capacity,
148            config.size_classes,
149        );
150
151        let mut mmap = MmapRegion::create(path, layout.total_size, cleanup)?;
152        let region = mmap.region();
153
154        // Initialize segment header.
155        let header: *mut SegmentHeader = unsafe { region.get_mut::<SegmentHeader>(0) };
156        unsafe {
157            (*header).init(SegmentHeaderInit {
158                total_size: layout.total_size as u64,
159                max_payload_size: config.max_payload_size,
160                inline_threshold: config.inline_threshold,
161                max_guests: config.max_guests as u32,
162                bipbuf_capacity: config.bipbuf_capacity,
163                peer_table_offset: layout.peer_table_offset as u64,
164                var_pool_offset: layout.var_pool_offset as u64,
165                heartbeat_interval: config.heartbeat_interval,
166                num_var_slot_classes: config.size_classes.len() as u32,
167            });
168        }
169
170        // Initialize peer table and BipBuffer pairs.
171        let peer_table = unsafe {
172            PeerTable::init(
173                region,
174                layout.peer_table_offset,
175                config.max_guests,
176                layout.ring_base_offset,
177                config.bipbuf_capacity,
178            )
179        };
180
181        // Initialize VarSlotPool.
182        let var_pool =
183            unsafe { VarSlotPool::init(region, layout.var_pool_offset, config.size_classes) };
184
185        // If Auto cleanup, ownership transferred to mmap; keep it live.
186        // For Manual, we own the file.
187        if cleanup == FileCleanup::Manual {
188            mmap.take_ownership();
189        }
190
191        Ok(Self {
192            mmap,
193            header,
194            max_guests: config.max_guests,
195            bipbuf_capacity: config.bipbuf_capacity,
196            peer_table,
197            var_pool,
198            size_classes: config.size_classes.to_vec(),
199        })
200    }
201
202    /// Attach to an existing segment file at `path`.
203    ///
204    /// r[impl shm.guest.attach]
205    pub fn attach(path: &Path) -> Result<Self, AttachError> {
206        let mmap = MmapRegion::attach(path)?;
207        let region = mmap.region();
208
209        // Validate the header.
210        let header: *mut SegmentHeader = unsafe { region.get_mut::<SegmentHeader>(0) };
211        unsafe { &*header }
212            .validate()
213            .map_err(AttachError::BadHeader)?;
214
215        let max_guests = unsafe { (*header).max_guests as u8 };
216        let bipbuf_capacity = unsafe { (*header).bipbuf_capacity };
217        let peer_table_offset = unsafe { (*header).peer_table_offset as usize };
218        let var_pool_offset = unsafe { (*header).var_pool_offset as usize };
219        let num_var_slot_classes = unsafe { (*header).num_var_slot_classes };
220
221        let size_classes =
222            unsafe { VarSlotPool::discover_configs(region, var_pool_offset, num_var_slot_classes) }
223                .map_err(AttachError::BadHeader)?;
224
225        let peer_table = unsafe { PeerTable::attach(region, peer_table_offset, max_guests) };
226        let var_pool = unsafe { VarSlotPool::attach(region, var_pool_offset, &size_classes) };
227
228        Ok(Self {
229            mmap,
230            header,
231            max_guests,
232            bipbuf_capacity,
233            peer_table,
234            var_pool,
235            size_classes,
236        })
237    }
238
239    /// Access the segment header.
240    #[inline]
241    pub fn header(&self) -> &SegmentHeader {
242        unsafe { &*self.header }
243    }
244
245    #[cfg(unix)]
246    pub fn as_raw_fd(&self) -> std::os::fd::RawFd {
247        self.mmap.as_raw_fd()
248    }
249
250    pub fn path(&self) -> &Path {
251        self.mmap.path()
252    }
253
254    /// Access the peer table.
255    #[inline]
256    pub fn peer_table(&self) -> &PeerTable {
257        &self.peer_table
258    }
259
260    /// Access the VarSlotPool.
261    #[inline]
262    pub fn var_pool(&self) -> &VarSlotPool {
263        &self.var_pool
264    }
265
266    // ── peer lifecycle ──────────────────────────────────────────────────────
267
268    /// Reserve an empty peer table slot for an about-to-be-spawned guest.
269    ///
270    /// Returns the assigned `PeerId`, or `None` if all slots are occupied.
271    ///
272    /// r[impl shm.spawn]
273    pub fn reserve_peer(&self) -> Option<PeerId> {
274        let peer_id = self.peer_table.find_empty()?;
275        self.peer_table.entry(peer_id).try_reserve().ok()?;
276        Some(peer_id)
277    }
278
279    /// Release a reserved slot if the spawn fails before the guest could claim it.
280    pub fn release_reserved_peer(&self, peer_id: PeerId) {
281        self.peer_table.entry(peer_id).release_reserved();
282    }
283
284    /// Claim a Reserved slot from the guest side (called by the spawned guest).
285    ///
286    /// Returns `Err(actual)` if the slot is not in the Reserved state.
287    ///
288    /// r[impl shm.guest.attach]
289    pub fn claim_peer(&self, peer_id: PeerId) -> Result<(), PeerState> {
290        self.peer_table.entry(peer_id).try_claim_reserved()
291    }
292
293    /// Attach to any empty slot (walk-in guest, no prior reservation).
294    ///
295    /// Returns the assigned `PeerId`, or `None` if no empty slot exists.
296    ///
297    /// r[impl shm.guest.attach]
298    /// r[impl shm.guest.attach-failure]
299    pub fn attach_peer(&self) -> Option<PeerId> {
300        let peer_id = self.peer_table.find_empty()?;
301        self.peer_table.entry(peer_id).try_attach().ok()?;
302        Some(peer_id)
303    }
304
305    /// Mark a peer as detaching (graceful detach — step 1).
306    ///
307    /// r[impl shm.guest.detach]
308    pub fn detach_peer(&self, peer_id: PeerId) {
309        self.peer_table.entry(peer_id).set_goodbye();
310    }
311
312    /// Signal host shutdown to all guests by setting `host_goodbye`.
313    ///
314    /// r[impl shm.host.goodbye]
315    pub fn set_host_goodbye(&self) {
316        self.header()
317            .host_goodbye
318            .store(1, shm_primitives::sync::Ordering::Release);
319    }
320
321    // ── crash detection helpers ─────────────────────────────────────────────
322
323    /// Return true if the peer's heartbeat is stale.
324    ///
325    /// r[impl shm.crash.detection]
326    pub fn is_peer_heartbeat_stale(&self, peer_id: PeerId, current_ns: u64) -> bool {
327        let interval = self.header().heartbeat_interval;
328        self.peer_table
329            .entry(peer_id)
330            .is_heartbeat_stale(current_ns, interval)
331    }
332
333    // ── crash recovery ──────────────────────────────────────────────────────
334
335    /// Perform all crash recovery steps for a dead guest.
336    ///
337    /// Must only be called by the host after confirming the peer has crashed.
338    ///
339    /// r[impl shm.crash.recovery]
340    pub fn recover_crashed_peer(&self, peer_id: PeerId) {
341        let entry = self.peer_table.entry(peer_id);
342
343        // Step 1: mark as Goodbye.
344        entry.set_goodbye();
345
346        // Step 2: scan H2G BipBuffer for SLOT_REF/MMAP_REF frames; free slots.
347        // This MUST happen before step 3 (resetting the buffer destroys content).
348        // r[impl shm.mmap.crash-recovery]
349        {
350            let h2g = self.h2g_bipbuf(peer_id);
351            let (_, mut consumer) = h2g.split();
352            while let Some(frame) = framing::read_frame(&mut consumer) {
353                match frame {
354                    OwnedFrame::SlotRef(slot_ref) => {
355                        let _ = self.var_pool.free(slot_ref);
356                    }
357                    OwnedFrame::MmapRef(_) => {
358                        // MmapRef frames reference regions allocated by the crashed peer.
359                        // Those regions are gone with the process — just drain the frame.
360                        // Per-link mmap leases are cleaned up when the ShmLink is dropped.
361                    }
362                    OwnedFrame::Inline(_) => {}
363                }
364            }
365            // consumer dropped here — borrow of h2g ends.
366            h2g.reset();
367        }
368
369        // Step 3 (continued): reset G2H BipBuffer.
370        self.g2h_bipbuf(peer_id).reset();
371
372        // Step 4: reclaim all VarSlotPool slots owned by this peer.
373        self.var_pool.reclaim_peer_slots(peer_id.get());
374
375        // Step 5: return slot to Empty so a new guest can attach.
376        entry.reset();
377    }
378
379    // ── extent growth ───────────────────────────────────────────────────────
380
381    /// Grow the segment to `new_size` bytes and publish the new size.
382    ///
383    /// After this returns, the caller MUST signal every attached guest's
384    /// doorbell so they remap and see the new extent.
385    ///
386    /// r[impl shm.varslot.extents.notification]
387    pub fn grow_segment(&mut self, new_size: usize) -> io::Result<()> {
388        // Step 1: truncate/grow the backing file and remap.
389        self.mmap.resize(new_size)?;
390        self.refresh_views_after_remap();
391
392        // Step 2: publish new size with Release ordering.
393        self.header()
394            .current_size
395            .store(new_size as u64, shm_primitives::sync::Ordering::Release);
396
397        Ok(())
398    }
399
400    /// Check if the backing file has grown and remap if needed (guest side).
401    ///
402    /// Returns `true` if the mapping was extended.
403    ///
404    /// r[impl shm.varslot.extents.notification]
405    pub fn check_and_remap(&mut self) -> io::Result<bool> {
406        let published = self
407            .header()
408            .current_size
409            .load(shm_primitives::sync::Ordering::Acquire);
410        let mapped = self.mmap.len();
411        if published as usize > mapped {
412            self.mmap.resize(published as usize)?;
413            self.refresh_views_after_remap();
414            Ok(true)
415        } else {
416            Ok(false)
417        }
418    }
419
420    // ── BipBuffer access ────────────────────────────────────────────────────
421
422    /// Guest-to-host BipBuffer for the given peer.
423    ///
424    /// The guest writes into this; the host reads from it.
425    pub fn g2h_bipbuf(&self, peer_id: PeerId) -> BipBuf {
426        let entry = self.peer_table.entry(peer_id);
427        let g2h_offset = entry.ring_offset as usize;
428        let region = self.mmap.region();
429        unsafe { BipBuf::attach(region, g2h_offset) }
430    }
431
432    /// Host-to-guest BipBuffer for the given peer.
433    ///
434    /// The host writes into this; the guest reads from it.
435    pub fn h2g_bipbuf(&self, peer_id: PeerId) -> BipBuf {
436        let entry = self.peer_table.entry(peer_id);
437        let g2h_offset = entry.ring_offset as usize;
438        let h2g_offset = g2h_offset + crate::peer_table::bipbuf_single_stride(self.bipbuf_capacity);
439        let region = self.mmap.region();
440        unsafe { BipBuf::attach(region, h2g_offset) }
441    }
442}
443
444#[cfg(all(test, not(loom)))]
445mod tests {
446    use std::path::PathBuf;
447
448    use shm_primitives::{FileCleanup, MAGIC, MmapRegion, PeerState};
449
450    use super::{AttachError, Segment, SegmentConfig, SegmentLayout};
451    use crate::varslot::SizeClassConfig;
452
453    fn test_size_classes() -> [SizeClassConfig; 2] {
454        [
455            SizeClassConfig {
456                slot_size: 1024,
457                slot_count: 8,
458            },
459            SizeClassConfig {
460                slot_size: 16384,
461                slot_count: 4,
462            },
463        ]
464    }
465
466    fn test_path(name: &str) -> (tempfile::TempDir, PathBuf) {
467        let dir = tempfile::tempdir().expect("create tempdir");
468        let path = dir.path().join(name);
469        (dir, path)
470    }
471
472    fn make_config<'a>(size_classes: &'a [SizeClassConfig]) -> SegmentConfig<'a> {
473        SegmentConfig {
474            max_guests: 4,
475            bipbuf_capacity: 4096,
476            max_payload_size: 64 * 1024,
477            inline_threshold: 0,
478            heartbeat_interval: 1_000_000,
479            size_classes,
480        }
481    }
482
483    #[test]
484    fn layout_compute_produces_aligned_monotonic_offsets() {
485        let size_classes = test_size_classes();
486        let layout = SegmentLayout::compute(4, 4096, &size_classes);
487
488        assert_eq!(
489            layout.peer_table_offset,
490            shm_primitives::SEGMENT_HEADER_SIZE
491        );
492        assert!(layout.ring_base_offset >= layout.peer_table_offset);
493        assert!(layout.var_pool_offset >= layout.ring_base_offset);
494        assert!(layout.var_pool_offset.is_multiple_of(64));
495        assert!(layout.total_size > layout.var_pool_offset);
496    }
497
498    #[test]
499    fn create_then_attach_roundtrips_header_and_offsets() {
500        let size_classes = test_size_classes();
501        let (_tmp, path) = test_path("roundtrip.segment");
502
503        let host = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
504            .expect("create segment");
505        let guest = Segment::attach(&path).expect("attach segment");
506
507        assert_eq!(host.header().magic, MAGIC);
508        assert_eq!(guest.header().magic, MAGIC);
509        assert_eq!(host.header().max_guests, 4);
510        assert_eq!(guest.header().max_guests, 4);
511        assert_eq!(host.header().bipbuf_capacity, 4096);
512        assert_eq!(guest.header().bipbuf_capacity, 4096);
513        assert_eq!(
514            host.header().peer_table_offset,
515            guest.header().peer_table_offset
516        );
517        assert_eq!(
518            host.header().var_pool_offset,
519            guest.header().var_pool_offset
520        );
521    }
522
523    #[test]
524    fn attach_rejects_corrupted_header_magic() {
525        let size_classes = test_size_classes();
526        let (_tmp, path) = test_path("corrupt.segment");
527        let _segment = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
528            .expect("create segment");
529
530        let mmap = MmapRegion::attach(&path).expect("attach raw mmap");
531        let region = mmap.region();
532        let header = unsafe { region.get_mut::<shm_primitives::SegmentHeader>(0) };
533        header.magic[0] ^= 0xFF;
534        drop(mmap);
535
536        let err = match Segment::attach(&path) {
537            Ok(_) => panic!("corrupted header must fail attach"),
538            Err(err) => err,
539        };
540        assert!(
541            matches!(err, AttachError::BadHeader(_)),
542            "unexpected err: {err:?}"
543        );
544    }
545
546    #[test]
547    fn peer_lifecycle_reserve_release_claim_detach_and_attach() {
548        let size_classes = test_size_classes();
549        let (_tmp, path) = test_path("peer-lifecycle.segment");
550        let segment = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
551            .expect("create segment");
552
553        let reserved = segment.reserve_peer().expect("reserve peer");
554        assert_eq!(
555            segment.peer_table().entry(reserved).state(),
556            PeerState::Reserved
557        );
558        segment.release_reserved_peer(reserved);
559        assert_eq!(
560            segment.peer_table().entry(reserved).state(),
561            PeerState::Empty
562        );
563
564        let reserved_again = segment.reserve_peer().expect("reserve peer again");
565        segment
566            .claim_peer(reserved_again)
567            .expect("claim reserved peer");
568        assert_eq!(
569            segment.peer_table().entry(reserved_again).state(),
570            PeerState::Attached
571        );
572        segment.detach_peer(reserved_again);
573        assert_eq!(
574            segment.peer_table().entry(reserved_again).state(),
575            PeerState::Goodbye
576        );
577
578        let attached = segment.attach_peer().expect("attach walk-in peer");
579        assert_eq!(
580            segment.peer_table().entry(attached).state(),
581            PeerState::Attached
582        );
583    }
584
585    #[test]
586    fn grow_segment_publishes_size_and_guest_remaps() {
587        let size_classes = test_size_classes();
588        let (_tmp, path) = test_path("grow.segment");
589        let mut host = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
590            .expect("create segment");
591        let mut guest = Segment::attach(&path).expect("attach guest");
592
593        let old_size = host.header().current_size() as usize;
594        let new_size = old_size + 64 * 1024;
595
596        host.grow_segment(new_size).expect("grow segment");
597        assert_eq!(host.header().current_size() as usize, new_size);
598
599        let remapped = guest.check_and_remap().expect("guest remap check");
600        assert!(remapped, "guest should remap after host growth");
601
602        let remapped_again = guest.check_and_remap().expect("guest remap check again");
603        assert!(!remapped_again, "no additional growth should produce false");
604    }
605}