1use std::io;
10use std::path::Path;
11
12use shm_primitives::{
13 BipBuf, FileCleanup, MmapRegion, PeerId, PeerState, SEGMENT_HEADER_SIZE, SegmentHeader,
14 SegmentHeaderInit,
15};
16
17use crate::framing::{self, OwnedFrame};
18use crate::peer_table::{PeerTable, bipbuf_pair_size};
19use crate::varslot::{SizeClassConfig, VarSlotPool};
20
/// Rounds `n` up to the nearest multiple of `align`.
///
/// The rounding is done with a bit mask, so the result is only a real
/// multiple of `align` when `align` is a power of two.
const fn align_up(n: usize, align: usize) -> usize {
    let bumped = n + align - 1;
    let low_bits = align - 1;
    bumped & !low_bits
}
26
/// Byte offsets of the regions inside a segment, plus the segment's total size.
///
/// Values are produced by `SegmentLayout::compute`.
pub struct SegmentLayout {
    /// Offset of the peer table; `compute` places it right after the segment header.
    pub peer_table_offset: usize,
    /// Offset of the first per-guest ring pair; follows the peer table entries.
    pub ring_base_offset: usize,
    /// Offset of the var-slot pool; `compute` rounds it up to a 64-byte boundary.
    pub var_pool_offset: usize,
    /// Total number of bytes the segment needs (pool offset plus pool size).
    pub total_size: usize,
}
38
39impl SegmentLayout {
40 pub fn compute(max_guests: u8, bipbuf_capacity: u32, size_classes: &[SizeClassConfig]) -> Self {
41 let peer_table_offset = SEGMENT_HEADER_SIZE; let peer_entries_size = max_guests as usize * 64; let ring_base_offset = peer_table_offset + peer_entries_size;
44 let rings_size = max_guests as usize * bipbuf_pair_size(bipbuf_capacity);
46 let var_pool_offset = align_up(ring_base_offset + rings_size, 64);
47 let pool_size = VarSlotPool::required_size(size_classes);
48 let total_size = var_pool_offset + pool_size;
49
50 Self {
51 peer_table_offset,
52 ring_base_offset,
53 var_pool_offset,
54 total_size,
55 }
56 }
57}
58
/// Parameters supplied by the creator of a segment (see `Segment::create`).
pub struct SegmentConfig<'a> {
    /// Number of guest slots; sizes the peer table and the per-guest rings.
    pub max_guests: u8,
    /// Capacity used to size each guest's bipbuf pair; also recorded in the header.
    pub bipbuf_capacity: u32,
    /// Recorded in the header as `max_payload_size`; not otherwise interpreted in this file.
    pub max_payload_size: u32,
    /// Recorded in the header as `inline_threshold`; not otherwise interpreted in this file.
    pub inline_threshold: u32,
    /// Recorded in the header and later passed to the peer entry's staleness check.
    /// NOTE(review): presumably nanoseconds, since it is compared against a `current_ns` value — confirm.
    pub heartbeat_interval: u64,
    /// Size classes used to size and initialize the var-slot pool.
    pub size_classes: &'a [SizeClassConfig],
}
76
/// Reasons `Segment::attach` can fail.
#[derive(Debug)]
pub enum AttachError {
    /// The segment file could not be mapped.
    Io(io::Error),
    /// The header (or the size-class data it points at) was rejected; the
    /// string is the static message describing why.
    BadHeader(&'static str),
}
85
86impl From<io::Error> for AttachError {
87 fn from(e: io::Error) -> Self {
88 AttachError::Io(e)
89 }
90}
91
92impl std::fmt::Display for AttachError {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 match self {
95 AttachError::Io(e) => write!(f, "I/O error: {e}"),
96 AttachError::BadHeader(msg) => write!(f, "bad segment header: {msg}"),
97 }
98 }
99}
100
// `source()` is intentionally left at its default (`None`): the wrapped
// `io::Error` is already rendered by the `Display` impl for `AttachError::Io`.
impl std::error::Error for AttachError {}
102
/// A mapped segment together with cached views of its regions.
///
/// The views (`header`, `peer_table`, `var_pool`) are derived from `mmap` and
/// are rebuilt by `refresh_views_after_remap` whenever the mapping is resized.
pub struct Segment {
    /// The mapping of the segment file; every other view is derived from its region.
    mmap: MmapRegion,
    /// Raw pointer to the `SegmentHeader` at offset 0 of the mapping.
    header: *mut SegmentHeader,
    // NOTE(review): this field is read by `refresh_views_after_remap`, so the
    // `dead_code` allowance looks stale — confirm before removing.
    #[allow(dead_code)]
    max_guests: u8,
    /// Ring capacity taken from the config (on create) or the header (on attach/remap).
    bipbuf_capacity: u32,
    /// View of the peer table region.
    peer_table: PeerTable,
    /// View of the var-slot pool region.
    var_pool: VarSlotPool,
    // NOTE(review): this field is read by `refresh_views_after_remap`, so the
    // `dead_code` allowance looks stale — confirm before removing.
    #[allow(dead_code)]
    size_classes: Vec<SizeClassConfig>,
}
120
// NOTE(review): `Segment` stores a raw `*mut SegmentHeader`, so `Send`/`Sync`
// are not derived automatically and are asserted by hand here. Presumably this
// is sound because the pointer targets the mapping owned by `mmap` and shared
// state is reached through atomics — confirm, and record the invariant as a
// `// SAFETY:` comment.
unsafe impl Send for Segment {}
unsafe impl Sync for Segment {}
123
124impl Segment {
125 fn refresh_views_after_remap(&mut self) {
126 let region = self.mmap.region();
127 self.header = unsafe { region.get_mut::<SegmentHeader>(0) };
128
129 let peer_table_offset = self.header().peer_table_offset as usize;
130 let var_pool_offset = self.header().var_pool_offset as usize;
131 self.bipbuf_capacity = self.header().bipbuf_capacity;
132
133 self.peer_table = unsafe { PeerTable::attach(region, peer_table_offset, self.max_guests) };
134 self.var_pool = unsafe { VarSlotPool::attach(region, var_pool_offset, &self.size_classes) };
135 }
136
137 pub fn create(
141 path: &Path,
142 config: SegmentConfig<'_>,
143 cleanup: FileCleanup,
144 ) -> io::Result<Self> {
145 let layout = SegmentLayout::compute(
146 config.max_guests,
147 config.bipbuf_capacity,
148 config.size_classes,
149 );
150
151 let mut mmap = MmapRegion::create(path, layout.total_size, cleanup)?;
152 let region = mmap.region();
153
154 let header: *mut SegmentHeader = unsafe { region.get_mut::<SegmentHeader>(0) };
156 unsafe {
157 (*header).init(SegmentHeaderInit {
158 total_size: layout.total_size as u64,
159 max_payload_size: config.max_payload_size,
160 inline_threshold: config.inline_threshold,
161 max_guests: config.max_guests as u32,
162 bipbuf_capacity: config.bipbuf_capacity,
163 peer_table_offset: layout.peer_table_offset as u64,
164 var_pool_offset: layout.var_pool_offset as u64,
165 heartbeat_interval: config.heartbeat_interval,
166 num_var_slot_classes: config.size_classes.len() as u32,
167 });
168 }
169
170 let peer_table = unsafe {
172 PeerTable::init(
173 region,
174 layout.peer_table_offset,
175 config.max_guests,
176 layout.ring_base_offset,
177 config.bipbuf_capacity,
178 )
179 };
180
181 let var_pool =
183 unsafe { VarSlotPool::init(region, layout.var_pool_offset, config.size_classes) };
184
185 if cleanup == FileCleanup::Manual {
188 mmap.take_ownership();
189 }
190
191 Ok(Self {
192 mmap,
193 header,
194 max_guests: config.max_guests,
195 bipbuf_capacity: config.bipbuf_capacity,
196 peer_table,
197 var_pool,
198 size_classes: config.size_classes.to_vec(),
199 })
200 }
201
202 pub fn attach(path: &Path) -> Result<Self, AttachError> {
206 let mmap = MmapRegion::attach(path)?;
207 let region = mmap.region();
208
209 let header: *mut SegmentHeader = unsafe { region.get_mut::<SegmentHeader>(0) };
211 unsafe { &*header }
212 .validate()
213 .map_err(AttachError::BadHeader)?;
214
215 let max_guests = unsafe { (*header).max_guests as u8 };
216 let bipbuf_capacity = unsafe { (*header).bipbuf_capacity };
217 let peer_table_offset = unsafe { (*header).peer_table_offset as usize };
218 let var_pool_offset = unsafe { (*header).var_pool_offset as usize };
219 let num_var_slot_classes = unsafe { (*header).num_var_slot_classes };
220
221 let size_classes =
222 unsafe { VarSlotPool::discover_configs(region, var_pool_offset, num_var_slot_classes) }
223 .map_err(AttachError::BadHeader)?;
224
225 let peer_table = unsafe { PeerTable::attach(region, peer_table_offset, max_guests) };
226 let var_pool = unsafe { VarSlotPool::attach(region, var_pool_offset, &size_classes) };
227
228 Ok(Self {
229 mmap,
230 header,
231 max_guests,
232 bipbuf_capacity,
233 peer_table,
234 var_pool,
235 size_classes,
236 })
237 }
238
/// Returns a shared reference to the segment header.
#[inline]
pub fn header(&self) -> &SegmentHeader {
    // NOTE(review): relies on `self.header` pointing at the header inside the
    // current mapping; it is set in `create`/`attach` and refreshed after a remap.
    unsafe { &*self.header }
}
244
/// Returns the raw file descriptor reported by the underlying mapping (Unix only).
#[cfg(unix)]
pub fn as_raw_fd(&self) -> std::os::fd::RawFd {
    self.mmap.as_raw_fd()
}
249
/// Returns the path reported by the underlying mapping.
pub fn path(&self) -> &Path {
    self.mmap.path()
}
253
/// Returns the cached view of the peer table.
#[inline]
pub fn peer_table(&self) -> &PeerTable {
    &self.peer_table
}
259
/// Returns the cached view of the var-slot pool.
#[inline]
pub fn var_pool(&self) -> &VarSlotPool {
    &self.var_pool
}
265
266 pub fn reserve_peer(&self) -> Option<PeerId> {
274 let peer_id = self.peer_table.find_empty()?;
275 self.peer_table.entry(peer_id).try_reserve().ok()?;
276 Some(peer_id)
277 }
278
/// Releases a slot previously obtained from `reserve_peer` by calling
/// `release_reserved` on its peer-table entry.
pub fn release_reserved_peer(&self, peer_id: PeerId) {
    self.peer_table.entry(peer_id).release_reserved();
}
283
/// Claims a reserved slot by calling `try_claim_reserved` on its peer-table entry.
///
/// # Errors
///
/// Returns the `PeerState` produced by the failed claim.
/// NOTE(review): presumably the state that was observed instead of `Reserved` — confirm.
pub fn claim_peer(&self, peer_id: PeerId) -> Result<(), PeerState> {
    self.peer_table.entry(peer_id).try_claim_reserved()
}
292
293 pub fn attach_peer(&self) -> Option<PeerId> {
300 let peer_id = self.peer_table.find_empty()?;
301 self.peer_table.entry(peer_id).try_attach().ok()?;
302 Some(peer_id)
303 }
304
/// Marks the peer's table entry as goodbye by calling `set_goodbye` on it.
pub fn detach_peer(&self, peer_id: PeerId) {
    self.peer_table.entry(peer_id).set_goodbye();
}
311
312 pub fn set_host_goodbye(&self) {
316 self.header()
317 .host_goodbye
318 .store(1, shm_primitives::sync::Ordering::Release);
319 }
320
321 pub fn is_peer_heartbeat_stale(&self, peer_id: PeerId, current_ns: u64) -> bool {
327 let interval = self.header().heartbeat_interval;
328 self.peer_table
329 .entry(peer_id)
330 .is_heartbeat_stale(current_ns, interval)
331 }
332
/// Cleans up after a peer that is being treated as crashed.
///
/// Steps, in order:
/// 1. mark the peer's entry as goodbye;
/// 2. drain the host-to-guest ring, freeing every var-pool slot referenced by
///    a pending frame, then reset that ring;
/// 3. reset the guest-to-host ring;
/// 4. ask the var pool to reclaim slots associated with the peer id;
/// 5. reset the peer's entry.
pub fn recover_crashed_peer(&self, peer_id: PeerId) {
    let entry = self.peer_table.entry(peer_id);

    // Flag the entry as goodbye before touching its rings.
    entry.set_goodbye();

    {
        // Take the consumer side of the host-to-guest ring and read out
        // whatever frames are still queued in it.
        let h2g = self.h2g_bipbuf(peer_id);
        let (_, mut consumer) = h2g.split();
        while let Some(frame) = framing::read_frame(&mut consumer) {
            match frame {
                OwnedFrame::SlotRef(slot_ref) => {
                    // Best effort: the result of freeing the slot is ignored.
                    let _ = self.var_pool.free(slot_ref);
                }
                OwnedFrame::MmapRef(_) => {
                    // Nothing is released here for mmap-backed frames.
                }
                // Inline frames carry no external resource.
                OwnedFrame::Inline(_) => {}
            }
        }
        h2g.reset();
    }

    // The guest-to-host ring is reset without being drained.
    self.g2h_bipbuf(peer_id).reset();

    // Let the pool reclaim slots tied to this peer id.
    self.var_pool.reclaim_peer_slots(peer_id.get());

    // Finally reset the entry itself.
    entry.reset();
}
378
/// Resizes the mapping to `new_size` bytes and publishes the new size in the header.
///
/// Steps, in order: resize the mapping, rebuild the cached views, then store
/// `new_size` into the header's `current_size` with `Release` ordering.
/// `check_and_remap` loads that field with `Acquire` to notice the growth.
///
/// NOTE(review): `new_size` is not compared with the current mapped length
/// here, so nothing in this function rejects a smaller value — confirm that
/// `MmapRegion::resize` handles that case.
///
/// # Errors
///
/// Returns the error from `MmapRegion::resize`; in that case the views are
/// not refreshed and no size is published.
pub fn grow_segment(&mut self, new_size: usize) -> io::Result<()> {
    self.mmap.resize(new_size)?;
    // The cached header pointer and region views must be rebuilt after the resize.
    self.refresh_views_after_remap();

    // Publish only after the local views are valid again.
    self.header()
        .current_size
        .store(new_size as u64, shm_primitives::sync::Ordering::Release);

    Ok(())
}
399
400 pub fn check_and_remap(&mut self) -> io::Result<bool> {
406 let published = self
407 .header()
408 .current_size
409 .load(shm_primitives::sync::Ordering::Acquire);
410 let mapped = self.mmap.len();
411 if published as usize > mapped {
412 self.mmap.resize(published as usize)?;
413 self.refresh_views_after_remap();
414 Ok(true)
415 } else {
416 Ok(false)
417 }
418 }
419
420 pub fn g2h_bipbuf(&self, peer_id: PeerId) -> BipBuf {
426 let entry = self.peer_table.entry(peer_id);
427 let g2h_offset = entry.ring_offset as usize;
428 let region = self.mmap.region();
429 unsafe { BipBuf::attach(region, g2h_offset) }
430 }
431
432 pub fn h2g_bipbuf(&self, peer_id: PeerId) -> BipBuf {
436 let entry = self.peer_table.entry(peer_id);
437 let g2h_offset = entry.ring_offset as usize;
438 let h2g_offset = g2h_offset + crate::peer_table::bipbuf_single_stride(self.bipbuf_capacity);
439 let region = self.mmap.region();
440 unsafe { BipBuf::attach(region, h2g_offset) }
441 }
442}
443
444#[cfg(all(test, not(loom)))]
445mod tests {
446 use std::path::PathBuf;
447
448 use shm_primitives::{FileCleanup, MAGIC, MmapRegion, PeerState};
449
450 use super::{AttachError, Segment, SegmentConfig, SegmentLayout};
451 use crate::varslot::SizeClassConfig;
452
453 fn test_size_classes() -> [SizeClassConfig; 2] {
454 [
455 SizeClassConfig {
456 slot_size: 1024,
457 slot_count: 8,
458 },
459 SizeClassConfig {
460 slot_size: 16384,
461 slot_count: 4,
462 },
463 ]
464 }
465
466 fn test_path(name: &str) -> (tempfile::TempDir, PathBuf) {
467 let dir = tempfile::tempdir().expect("create tempdir");
468 let path = dir.path().join(name);
469 (dir, path)
470 }
471
/// Builds the segment configuration shared by the tests: 4 guests, 4 KiB
/// rings, 64 KiB max payload, and the given size classes.
fn make_config<'a>(size_classes: &'a [SizeClassConfig]) -> SegmentConfig<'a> {
    SegmentConfig {
        max_guests: 4,
        bipbuf_capacity: 4096,
        max_payload_size: 64 * 1024,
        inline_threshold: 0,
        heartbeat_interval: 1_000_000,
        size_classes,
    }
}
482
/// The computed offsets start at the header size, never go backwards, and the
/// pool offset is 64-byte aligned.
#[test]
fn layout_compute_produces_aligned_monotonic_offsets() {
    let classes = test_size_classes();
    let SegmentLayout {
        peer_table_offset,
        ring_base_offset,
        var_pool_offset,
        total_size,
    } = SegmentLayout::compute(4, 4096, &classes);

    assert_eq!(peer_table_offset, shm_primitives::SEGMENT_HEADER_SIZE);
    assert!(ring_base_offset >= peer_table_offset);
    assert!(var_pool_offset >= ring_base_offset);
    assert!(var_pool_offset % 64 == 0);
    assert!(total_size > var_pool_offset);
}
497
/// A segment attached from disk sees the same header values as its creator.
#[test]
fn create_then_attach_roundtrips_header_and_offsets() {
    let size_classes = test_size_classes();
    let (_tmp, path) = test_path("roundtrip.segment");

    let host = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
        .expect("create segment");
    let guest = Segment::attach(&path).expect("attach segment");

    let host_hdr = host.header();
    let guest_hdr = guest.header();

    assert_eq!(host_hdr.magic, MAGIC);
    assert_eq!(guest_hdr.magic, MAGIC);
    assert_eq!(host_hdr.max_guests, 4);
    assert_eq!(guest_hdr.max_guests, 4);
    assert_eq!(host_hdr.bipbuf_capacity, 4096);
    assert_eq!(guest_hdr.bipbuf_capacity, 4096);
    assert_eq!(host_hdr.peer_table_offset, guest_hdr.peer_table_offset);
    assert_eq!(host_hdr.var_pool_offset, guest_hdr.var_pool_offset);
}
522
/// Flipping a byte of the header magic makes `Segment::attach` fail with `BadHeader`.
#[test]
fn attach_rejects_corrupted_header_magic() {
    let size_classes = test_size_classes();
    let (_tmp, path) = test_path("corrupt.segment");
    let _segment = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
        .expect("create segment");

    // Corrupt the magic through a second, raw mapping of the same file.
    let raw = MmapRegion::attach(&path).expect("attach raw mmap");
    let raw_region = raw.region();
    let raw_header = unsafe { raw_region.get_mut::<shm_primitives::SegmentHeader>(0) };
    raw_header.magic[0] ^= 0xFF;
    drop(raw);

    let Err(err) = Segment::attach(&path) else {
        panic!("corrupted header must fail attach")
    };
    assert!(
        matches!(err, AttachError::BadHeader(_)),
        "unexpected err: {err:?}"
    );
}
545
/// Walks one peer slot through reserve -> release -> reserve -> claim -> detach,
/// then checks that a walk-in attach still succeeds.
#[test]
fn peer_lifecycle_reserve_release_claim_detach_and_attach() {
    let size_classes = test_size_classes();
    let (_tmp, path) = test_path("peer-lifecycle.segment");
    let segment = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
        .expect("create segment");

    // Reserve, then release: the slot goes Reserved -> Empty.
    let reserved = segment.reserve_peer().expect("reserve peer");
    assert_eq!(
        segment.peer_table().entry(reserved).state(),
        PeerState::Reserved
    );
    segment.release_reserved_peer(reserved);
    assert_eq!(
        segment.peer_table().entry(reserved).state(),
        PeerState::Empty
    );

    // Reserve again and claim: the slot becomes Attached.
    let reserved_again = segment.reserve_peer().expect("reserve peer again");
    segment
        .claim_peer(reserved_again)
        .expect("claim reserved peer");
    assert_eq!(
        segment.peer_table().entry(reserved_again).state(),
        PeerState::Attached
    );
    // Detaching leaves the slot in Goodbye.
    segment.detach_peer(reserved_again);
    assert_eq!(
        segment.peer_table().entry(reserved_again).state(),
        PeerState::Goodbye
    );

    // A direct attach (no prior reservation) lands in Attached.
    let attached = segment.attach_peer().expect("attach walk-in peer");
    assert_eq!(
        segment.peer_table().entry(attached).state(),
        PeerState::Attached
    );
}
584
/// Growing on the host publishes the new size; the guest remaps once and then
/// reports no further change.
#[test]
fn grow_segment_publishes_size_and_guest_remaps() {
    let size_classes = test_size_classes();
    let (_tmp, path) = test_path("grow.segment");
    let mut host = Segment::create(&path, make_config(&size_classes), FileCleanup::Manual)
        .expect("create segment");
    let mut guest = Segment::attach(&path).expect("attach guest");

    // Grow by 64 KiB beyond whatever size the header currently reports.
    let old_size = host.header().current_size() as usize;
    let new_size = old_size + 64 * 1024;

    host.grow_segment(new_size).expect("grow segment");
    assert_eq!(host.header().current_size() as usize, new_size);

    // First check sees the larger published size and remaps.
    let remapped = guest.check_and_remap().expect("guest remap check");
    assert!(remapped, "guest should remap after host growth");

    // Second check finds nothing new.
    let remapped_again = guest.check_and_remap().expect("guest remap check again");
    assert!(!remapped_again, "no additional growth should produce false");
}
605}