use std::io;
use std::path::Path;
use shm_primitives::{
BipBuf, FileCleanup, MmapRegion, PeerId, PeerState, SEGMENT_HEADER_SIZE, SegmentHeader,
SegmentHeaderInit,
};
use crate::framing::{self, OwnedFrame};
use crate::peer_table::{PeerTable, bipbuf_pair_size};
use crate::varslot::{SizeClassConfig, VarSlotPool};
/// Rounds `n` up to the nearest multiple of `align`.
///
/// The result is correct for any non-zero `align`, not only powers of two;
/// for power-of-two alignments it matches the classic
/// `(n + align - 1) & !(align - 1)` mask.
///
/// # Panics
///
/// Panics if `align` is zero, and on arithmetic overflow when overflow checks
/// are enabled.
const fn align_up(n: usize, align: usize) -> usize {
    n.next_multiple_of(align)
}
/// Offsets of the regions that make up a segment, plus its total size.
///
/// All values are measured from the start of the segment; `total_size` is the
/// size the backing mapping is created with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentLayout {
    /// Start of the peer table (placed directly after the segment header).
    pub peer_table_offset: usize,
    /// Start of the per-guest ring buffers (placed after the peer table entries).
    pub ring_base_offset: usize,
    /// Start of the variable-size slot pool (64-aligned, after the rings).
    pub var_pool_offset: usize,
    /// Total size of the segment, i.e. the end of the slot pool.
    pub total_size: usize,
}
impl SegmentLayout {
    /// Computes where each region of a segment lives for the given parameters.
    ///
    /// Regions are laid out back to back: segment header, `max_guests` peer
    /// table entries, `max_guests` ring-buffer pairs, then (rounded up to a
    /// multiple of 64) the variable-size slot pool described by `size_classes`.
    pub fn compute(max_guests: u8, bipbuf_capacity: u32, size_classes: &[SizeClassConfig]) -> Self {
        // Space reserved per peer table entry.
        const PEER_ENTRY_SIZE: usize = 64;

        let guests = usize::from(max_guests);

        let peer_table_offset = SEGMENT_HEADER_SIZE;
        let ring_base_offset = peer_table_offset + guests * PEER_ENTRY_SIZE;

        let rings_end = ring_base_offset + guests * bipbuf_pair_size(bipbuf_capacity);
        let var_pool_offset = align_up(rings_end, 64);

        let total_size = var_pool_offset + VarSlotPool::required_size(size_classes);

        Self {
            peer_table_offset,
            ring_base_offset,
            var_pool_offset,
            total_size,
        }
    }
}
/// Parameters used by [`Segment::create`] to size and initialise a new segment.
pub struct SegmentConfig<'a> {
/// Number of guest slots; sizes both the peer table and the set of rings.
pub max_guests: u8,
/// Capacity of each ring buffer (presumably in bytes — confirm against `bipbuf_pair_size`).
pub bipbuf_capacity: u32,
/// Stored verbatim in the segment header; not interpreted in this file.
pub max_payload_size: u32,
/// Stored verbatim in the segment header; not interpreted in this file.
pub inline_threshold: u32,
/// Stored in the segment header and later passed to the peer entry's
/// heartbeat staleness check (presumably nanoseconds, matching `current_ns` — confirm).
pub heartbeat_interval: u64,
/// Size classes used to size and initialise the variable-size slot pool.
pub size_classes: &'a [SizeClassConfig],
}
/// Reasons [`Segment::attach`] can fail.
#[derive(Debug)]
pub enum AttachError {
    /// An I/O operation failed (converted from `io::Error` via `?`).
    Io(io::Error),
    /// The mapped segment's header or slot-pool metadata was rejected; the
    /// payload is the static reason string.
    BadHeader(&'static str),
}

impl std::error::Error for AttachError {}

impl std::fmt::Display for AttachError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => f.write_fmt(format_args!("I/O error: {err}")),
            Self::BadHeader(reason) => {
                f.write_fmt(format_args!("bad segment header: {reason}"))
            }
        }
    }
}

impl From<io::Error> for AttachError {
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}
/// A mapped shared-memory segment together with typed views into its regions.
pub struct Segment {
    /// The mapping that backs every pointer and view below.
    mmap: MmapRegion,
    /// Pointer to the `SegmentHeader` at offset 0 of the mapping; re-derived
    /// whenever the mapping is resized.
    header: *mut SegmentHeader,
    /// Number of guest slots; needed to re-attach the peer table after a remap.
    max_guests: u8,
    /// Ring capacity; used to locate the second ring of each peer's pair.
    bipbuf_capacity: u32,
    /// View over the peer table region.
    peer_table: PeerTable,
    /// View over the variable-size slot pool region.
    var_pool: VarSlotPool,
    /// Size classes of the slot pool; needed to re-attach the pool after a remap.
    size_classes: Vec<SizeClassConfig>,
}
// SAFETY: NOTE(review) — `Segment` holds a raw `*mut SegmentHeader`, which is
// what prevents the auto impl. Soundness presumably relies on the pointer
// staying inside `self.mmap` and on shared fields being accessed atomically;
// confirm against `SegmentHeader` / `MmapRegion`.
unsafe impl Send for Segment {}
// SAFETY: NOTE(review) — same assumption as for `Send`: methods taking `&self`
// must only touch shared memory through synchronised accessors. Confirm.
unsafe impl Sync for Segment {}
impl Segment {
/// Re-derives every cached pointer and view from the current mapping.
///
/// Used after the mapping has been resized: the header pointer is taken again
/// from offset 0, and the offsets and ring capacity recorded in that header
/// are then used to re-attach the peer table and the slot pool.
fn refresh_views_after_remap(&mut self) {
    let region = self.mmap.region();

    // The header pointer has to be refreshed before anything is read through it.
    self.header = unsafe { region.get_mut::<SegmentHeader>(0) };

    let (peers_at, pool_at, ring_capacity) = {
        let hdr = self.header();
        (
            hdr.peer_table_offset as usize,
            hdr.var_pool_offset as usize,
            hdr.bipbuf_capacity,
        )
    };

    self.bipbuf_capacity = ring_capacity;
    self.peer_table = unsafe { PeerTable::attach(region, peers_at, self.max_guests) };
    self.var_pool = unsafe { VarSlotPool::attach(region, pool_at, &self.size_classes) };
}
pub fn create(
path: &Path,
config: SegmentConfig<'_>,
cleanup: FileCleanup,
) -> io::Result<Self> {
let layout = SegmentLayout::compute(
config.max_guests,
config.bipbuf_capacity,
config.size_classes,
);
let mut mmap = MmapRegion::create(path, layout.total_size, cleanup)?;
let region = mmap.region();
let header: *mut SegmentHeader = unsafe { region.get_mut::<SegmentHeader>(0) };
unsafe {
(*header).init(SegmentHeaderInit {
total_size: layout.total_size as u64,
max_payload_size: config.max_payload_size,
inline_threshold: config.inline_threshold,
max_guests: config.max_guests as u32,
bipbuf_capacity: config.bipbuf_capacity,
peer_table_offset: layout.peer_table_offset as u64,
var_pool_offset: layout.var_pool_offset as u64,
heartbeat_interval: config.heartbeat_interval,
num_var_slot_classes: config.size_classes.len() as u32,
});
}
let peer_table = unsafe {
PeerTable::init(
region,
layout.peer_table_offset,
config.max_guests,
layout.ring_base_offset,
config.bipbuf_capacity,
)
};
let var_pool =
unsafe { VarSlotPool::init(region, layout.var_pool_offset, config.size_classes) };
if cleanup == FileCleanup::Manual {
mmap.take_ownership();
}
Ok(Self {
mmap,
header,
max_guests: config.max_guests,
bipbuf_capacity: config.bipbuf_capacity,
peer_table,
var_pool,
size_classes: config.size_classes.to_vec(),
})
}
/// Maps an existing segment file at `path` and builds views from its header.
///
/// The header is validated first; the guest count, ring capacity, region
/// offsets and slot-pool size classes are then read from the mapping itself,
/// so no configuration needs to be supplied by the caller.
///
/// # Errors
///
/// * [`AttachError::Io`] if the file cannot be mapped.
/// * [`AttachError::BadHeader`] if header validation fails, if the slot-pool
///   size classes cannot be discovered, or if a header field does not fit the
///   integer type this process uses for it (e.g. `max_guests` above 255).
pub fn attach(path: &Path) -> Result<Self, AttachError> {
    let mmap = MmapRegion::attach(path)?;
    let region = mmap.region();
    let header: *mut SegmentHeader = unsafe { region.get_mut::<SegmentHeader>(0) };

    // SAFETY: NOTE(review) — assumes the mapping is large enough to hold a
    // `SegmentHeader` at offset 0; confirm that `MmapRegion::attach` or
    // `get_mut` guarantees this.
    let hdr = unsafe { &*header };
    hdr.validate().map_err(AttachError::BadHeader)?;

    // The header comes from a file another process wrote: convert with checks
    // rather than `as`, which would silently truncate out-of-range values.
    let max_guests = u8::try_from(hdr.max_guests)
        .map_err(|_| AttachError::BadHeader("max_guests does not fit in u8"))?;
    let bipbuf_capacity = hdr.bipbuf_capacity;
    let peer_table_offset = usize::try_from(hdr.peer_table_offset)
        .map_err(|_| AttachError::BadHeader("peer_table_offset does not fit in usize"))?;
    let var_pool_offset = usize::try_from(hdr.var_pool_offset)
        .map_err(|_| AttachError::BadHeader("var_pool_offset does not fit in usize"))?;
    let num_var_slot_classes = hdr.num_var_slot_classes;

    let size_classes =
        unsafe { VarSlotPool::discover_configs(region, var_pool_offset, num_var_slot_classes) }
            .map_err(AttachError::BadHeader)?;
    let peer_table = unsafe { PeerTable::attach(region, peer_table_offset, max_guests) };
    let var_pool = unsafe { VarSlotPool::attach(region, var_pool_offset, &size_classes) };

    Ok(Self {
        mmap,
        header,
        max_guests,
        bipbuf_capacity,
        peer_table,
        var_pool,
        size_classes,
    })
}
/// Returns a shared reference to the segment header at the start of the mapping.
#[inline]
pub fn header(&self) -> &SegmentHeader {
// SAFETY: NOTE(review) — relies on `self.header` pointing into `self.mmap`;
// it is set in `create`/`attach` and refreshed after every remap.
unsafe { &*self.header }
}
/// Returns the raw file descriptor reported by the underlying mapping (Unix only).
#[cfg(unix)]
pub fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.mmap.as_raw_fd()
}
/// Returns the path reported by the underlying mapping.
pub fn path(&self) -> &Path {
self.mmap.path()
}
/// Returns the view over the segment's peer table.
#[inline]
pub fn peer_table(&self) -> &PeerTable {
&self.peer_table
}
/// Returns the view over the segment's variable-size slot pool.
#[inline]
pub fn var_pool(&self) -> &VarSlotPool {
&self.var_pool
}
/// Picks an empty peer slot and tries to reserve it.
///
/// Returns `None` when the table reports no empty slot, or when reserving the
/// slot that was found does not succeed; no second slot is tried.
pub fn reserve_peer(&self) -> Option<PeerId> {
    let candidate = self.peer_table.find_empty()?;
    match self.peer_table.entry(candidate).try_reserve() {
        Ok(_) => Some(candidate),
        Err(_) => None,
    }
}
/// Releases a reservation on `peer_id` by calling `release_reserved` on its table entry.
pub fn release_reserved_peer(&self, peer_id: PeerId) {
self.peer_table.entry(peer_id).release_reserved();
}
/// Tries to claim the previously reserved slot `peer_id`.
///
/// On failure the `PeerState` returned by the entry is passed through
/// (presumably the state that was observed instead — confirm in `PeerTable`).
pub fn claim_peer(&self, peer_id: PeerId) -> Result<(), PeerState> {
self.peer_table.entry(peer_id).try_claim_reserved()
}
/// Picks an empty peer slot and tries to attach to it directly.
///
/// Returns `None` when the table reports no empty slot, or when attaching to
/// the slot that was found does not succeed; no second slot is tried.
pub fn attach_peer(&self) -> Option<PeerId> {
    let candidate = self.peer_table.find_empty()?;
    match self.peer_table.entry(candidate).try_attach() {
        Ok(_) => Some(candidate),
        Err(_) => None,
    }
}
/// Marks `peer_id` as saying goodbye by calling `set_goodbye` on its table entry.
pub fn detach_peer(&self, peer_id: PeerId) {
self.peer_table.entry(peer_id).set_goodbye();
}
/// Sets the header's `host_goodbye` flag to 1 with release ordering.
pub fn set_host_goodbye(&self) {
    use shm_primitives::sync::Ordering;

    let flag = &self.header().host_goodbye;
    flag.store(1, Ordering::Release);
}
/// Asks the peer's table entry whether its heartbeat is stale at `current_ns`,
/// using the heartbeat interval recorded in the segment header.
pub fn is_peer_heartbeat_stale(&self, peer_id: PeerId, current_ns: u64) -> bool {
    let heartbeat_interval = self.header().heartbeat_interval;
    let entry = self.peer_table.entry(peer_id);
    entry.is_heartbeat_stale(current_ns, heartbeat_interval)
}
/// Reclaims the resources associated with `peer_id` and resets its table entry.
///
/// Steps, in order: the entry is marked goodbye; every frame still queued in
/// the peer's `h2g` ring is read and any slot it references is freed; both of
/// the peer's rings are reset; the pool is asked to reclaim slots attributed
/// to the peer; finally the entry itself is reset.
///
/// NOTE(review): presumably meant to be called by the host once a guest is
/// known to be dead, since it consumes the `h2g` ring on the peer's behalf —
/// confirm against callers.
pub fn recover_crashed_peer(&self, peer_id: PeerId) {
let entry = self.peer_table.entry(peer_id);
// Mark the slot as leaving before touching its rings.
entry.set_goodbye();
{
let h2g = self.h2g_bipbuf(peer_id);
// Only the consumer half is needed to drain unread frames.
let (_, mut consumer) = h2g.split();
while let Some(frame) = framing::read_frame(&mut consumer) {
match frame {
OwnedFrame::SlotRef(slot_ref) => {
// Best effort: the result of freeing the slot is deliberately ignored.
let _ = self.var_pool.free(slot_ref);
}
OwnedFrame::MmapRef(_) => {
// Nothing is released here for mmap-backed frames.
}
OwnedFrame::Inline(_) => {}
}
}
h2g.reset();
}
// The `g2h` ring is reset without being drained.
self.g2h_bipbuf(peer_id).reset();
self.var_pool.reclaim_peer_slots(peer_id.get());
entry.reset();
}
/// Grows the segment to `new_size` and publishes the new size in the header.
///
/// The mapping is resized, all cached views are re-derived from the new
/// mapping, and only then is `current_size` stored with release ordering so
/// that peers calling [`Segment::check_and_remap`] can observe it.
///
/// # Errors
///
/// * `io::ErrorKind::InvalidInput` if `new_size` is smaller than the size
///   currently published in the header. Peers only ever remap to a larger
///   size, so shrinking would leave them with a mapping past the end.
/// * Any error returned by resizing the underlying mapping.
pub fn grow_segment(&mut self, new_size: usize) -> io::Result<()> {
    use shm_primitives::sync::Ordering;

    let published = self.header().current_size.load(Ordering::Acquire);
    if (new_size as u64) < published {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "grow_segment: new size is smaller than the published segment size",
        ));
    }

    self.mmap.resize(new_size)?;
    self.refresh_views_after_remap();
    self.header()
        .current_size
        .store(new_size as u64, Ordering::Release);
    Ok(())
}
/// Remaps the segment if the header advertises a size larger than what is
/// currently mapped.
///
/// Returns `Ok(true)` when the mapping was resized and the cached views were
/// refreshed, and `Ok(false)` when the mapping already covers the published
/// size.
///
/// # Errors
///
/// Propagates any error from resizing the underlying mapping.
pub fn check_and_remap(&mut self) -> io::Result<bool> {
    use shm_primitives::sync::Ordering;

    let published = self.header().current_size.load(Ordering::Acquire) as usize;
    let mapped = self.mmap.len();
    if published <= mapped {
        return Ok(false);
    }

    self.mmap.resize(published)?;
    self.refresh_views_after_remap();
    Ok(true)
}
/// Attaches to the `g2h` ring of `peer_id`, located at the `ring_offset`
/// recorded in the peer's table entry.
pub fn g2h_bipbuf(&self, peer_id: PeerId) -> BipBuf {
    let ring_start = self.peer_table.entry(peer_id).ring_offset as usize;
    unsafe { BipBuf::attach(self.mmap.region(), ring_start) }
}
/// Attaches to the `h2g` ring of `peer_id`, which sits one single-ring stride
/// after the peer's `g2h` ring.
pub fn h2g_bipbuf(&self, peer_id: PeerId) -> BipBuf {
    let stride = crate::peer_table::bipbuf_single_stride(self.bipbuf_capacity);
    let ring_start = self.peer_table.entry(peer_id).ring_offset as usize + stride;
    unsafe { BipBuf::attach(self.mmap.region(), ring_start) }
}
}
#[cfg(all(test, not(loom)))]
mod tests {
use std::path::PathBuf;
use shm_primitives::{FileCleanup, MAGIC, MmapRegion, PeerState};
use super::{AttachError, Segment, SegmentConfig, SegmentLayout};
use crate::varslot::SizeClassConfig;
/// Two size classes shared by the tests: 8 slots of 1024 and 4 slots of 16384.
fn test_size_classes() -> [SizeClassConfig; 2] {
    [(1024, 8), (16384, 4)].map(|(slot_size, slot_count)| SizeClassConfig {
        slot_size,
        slot_count,
    })
}
/// Creates a fresh temporary directory and returns it with the path `name` inside it.
///
/// The directory handle is returned so the caller can keep it alive for as
/// long as the path is in use.
fn test_path(name: &str) -> (tempfile::TempDir, PathBuf) {
let dir = tempfile::tempdir().expect("create tempdir");
let path = dir.path().join(name);
(dir, path)
}
/// Builds the segment configuration used by the tests: 4 guests, 4096-capacity
/// rings, and the given size classes.
fn make_config<'a>(size_classes: &'a [SizeClassConfig]) -> SegmentConfig<'a> {
SegmentConfig {
max_guests: 4,
bipbuf_capacity: 4096,
max_payload_size: 64 * 1024,
inline_threshold: 0,
heartbeat_interval: 1_000_000,
size_classes,
}
}
/// The computed layout starts right after the header, never goes backwards,
/// and places the slot pool on a 64 boundary.
#[test]
fn layout_compute_produces_aligned_monotonic_offsets() {
    let classes = test_size_classes();
    let SegmentLayout {
        peer_table_offset,
        ring_base_offset,
        var_pool_offset,
        total_size,
    } = SegmentLayout::compute(4, 4096, &classes);

    assert_eq!(peer_table_offset, shm_primitives::SEGMENT_HEADER_SIZE);
    assert!(peer_table_offset <= ring_base_offset);
    assert!(ring_base_offset <= var_pool_offset);
    assert!(var_pool_offset.is_multiple_of(64));
    assert!(var_pool_offset < total_size);
}
/// A segment attached from disk sees the same header values the creator wrote.
#[test]
fn create_then_attach_roundtrips_header_and_offsets() {
    let classes = test_size_classes();
    let (_tmp, path) = test_path("roundtrip.segment");
    let host = Segment::create(&path, make_config(&classes), FileCleanup::Manual)
        .expect("create segment");
    let guest = Segment::attach(&path).expect("attach segment");

    // Both sides must report the configured values.
    for segment in [&host, &guest] {
        let header = segment.header();
        assert_eq!(header.magic, MAGIC);
        assert_eq!(header.max_guests, 4);
        assert_eq!(header.bipbuf_capacity, 4096);
    }

    // And they must agree with each other on where the regions live.
    let (host_header, guest_header) = (host.header(), guest.header());
    assert_eq!(
        host_header.peer_table_offset,
        guest_header.peer_table_offset
    );
    assert_eq!(host_header.var_pool_offset, guest_header.var_pool_offset);
}
/// Flipping a byte of the magic through a raw mapping makes `attach` fail with
/// `BadHeader`.
#[test]
fn attach_rejects_corrupted_header_magic() {
    let classes = test_size_classes();
    let (_tmp, path) = test_path("corrupt.segment");
    let _segment = Segment::create(&path, make_config(&classes), FileCleanup::Manual)
        .expect("create segment");

    // Corrupt the header via a separate raw mapping, then release that mapping.
    let raw = MmapRegion::attach(&path).expect("attach raw mmap");
    let region = raw.region();
    let raw_header = unsafe { region.get_mut::<shm_primitives::SegmentHeader>(0) };
    raw_header.magic[0] ^= 0xFF;
    drop(raw);

    let Err(err) = Segment::attach(&path) else {
        panic!("corrupted header must fail attach")
    };
    assert!(
        matches!(err, AttachError::BadHeader(_)),
        "unexpected err: {err:?}"
    );
}
/// Walks one slot through reserve → release → reserve → claim → detach, then
/// attaches a walk-in peer, checking the entry state after each step.
#[test]
fn peer_lifecycle_reserve_release_claim_detach_and_attach() {
    let classes = test_size_classes();
    let (_tmp, path) = test_path("peer-lifecycle.segment");
    let segment = Segment::create(&path, make_config(&classes), FileCleanup::Manual)
        .expect("create segment");
    let state_of = |peer| segment.peer_table().entry(peer).state();

    let reserved = segment.reserve_peer().expect("reserve peer");
    assert_eq!(state_of(reserved), PeerState::Reserved);

    segment.release_reserved_peer(reserved);
    assert_eq!(state_of(reserved), PeerState::Empty);

    let reserved_again = segment.reserve_peer().expect("reserve peer again");
    segment
        .claim_peer(reserved_again)
        .expect("claim reserved peer");
    assert_eq!(state_of(reserved_again), PeerState::Attached);

    segment.detach_peer(reserved_again);
    assert_eq!(state_of(reserved_again), PeerState::Goodbye);

    let attached = segment.attach_peer().expect("attach walk-in peer");
    assert_eq!(state_of(attached), PeerState::Attached);
}
/// After the host grows the segment, the published size changes and a guest
/// remaps exactly once.
#[test]
fn grow_segment_publishes_size_and_guest_remaps() {
    let classes = test_size_classes();
    let (_tmp, path) = test_path("grow.segment");
    let mut host = Segment::create(&path, make_config(&classes), FileCleanup::Manual)
        .expect("create segment");
    let mut guest = Segment::attach(&path).expect("attach guest");

    let size_before = host.header().current_size() as usize;
    let size_after = size_before + 64 * 1024;
    host.grow_segment(size_after).expect("grow segment");
    assert_eq!(host.header().current_size() as usize, size_after);

    let first_check = guest.check_and_remap().expect("guest remap check");
    assert!(first_check, "guest should remap after host growth");

    let second_check = guest.check_and_remap().expect("guest remap check again");
    assert!(!second_check, "no additional growth should produce false");
}
}