extern crate alloc;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::sync::atomic::{AtomicU32, Ordering};
use std::path::PathBuf;
use std::sync::Mutex;
use shared_memory::{Shmem, ShmemConf, ShmemError};
use crate::allocator::{SlotError, SlotHandle};
use crate::backend::SlotBackend;
use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
/// Magic word stored at offset 0 of every segment (bytes "ZDSS"); checked on attach.
const SEGMENT_MAGIC: u32 = 0x5A44_5353;
/// Errors surfaced while creating, attaching to, or using a
/// [`PosixSlotAllocator`] shared-memory segment.
#[derive(Debug)]
#[non_exhaustive]
pub enum PosixSlotError {
    /// Failure in the underlying `shared_memory` segment (create/open).
    Shm(ShmemError),
    /// A slot dimension did not fit the segment header's `u32` fields.
    CapacityOverflow,
    /// The attached segment's magic word did not match [`SEGMENT_MAGIC`].
    InvalidHeader,
    /// Error bubbled up from the slot layer.
    Slot(SlotError),
}
impl core::fmt::Display for PosixSlotError {
    /// Renders a short human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Wrapped errors are formatted inline; unit variants map to a
        // fixed message written in one place.
        let msg = match self {
            Self::Shm(e) => return write!(f, "shm error: {e}"),
            Self::Slot(e) => return write!(f, "{e}"),
            Self::CapacityOverflow => "slot capacity overflows u32",
            Self::InvalidHeader => "segment magic/version mismatch",
        };
        f.write_str(msg)
    }
}
// Marker impl: `Debug` + `Display` above satisfy the `Error` contract.
impl std::error::Error for PosixSlotError {}
// Lets `?` convert `shared_memory` failures in this module.
impl From<ShmemError> for PosixSlotError {
    fn from(e: ShmemError) -> Self {
        Self::Shm(e)
    }
}
// Lets `?` convert slot-layer failures in this module.
impl From<SlotError> for PosixSlotError {
    fn from(e: SlotError) -> Self {
        Self::Slot(e)
    }
}
/// Slot allocator backed by a POSIX shared-memory segment located via a
/// filesystem link ("flink"). The segment holds a small header followed by
/// fixed-stride slots; see `create`/`attach` for the layout.
pub struct PosixSlotAllocator {
    /// The mapped segment. Held in an `Option`, though the code in this
    /// file never sets it to `None` after construction — presumably so the
    /// mapping can be released early elsewhere (TODO confirm).
    shmem: Option<Shmem>,
    /// Filesystem link used to locate/open the segment.
    flink: PathBuf,
    /// Process-local per-slot "currently loaned to a writer" flags.
    loaned: Mutex<Vec<bool>>,
    /// Cached copy of the segment header's slot count.
    slot_count: u32,
    /// Cached per-slot stride in bytes (header + payload, aligned).
    slot_total_size: u32,
    /// Cached payload capacity per slot in bytes.
    slot_capacity: u32,
}
// SAFETY(review): `Shmem` holds a raw mapping pointer and is not `Send`/`Sync`
// on its own; these impls assert cross-thread use is sound because all shared
// mutation below goes through the `loaned` mutex or atomics — confirm the
// `shared_memory` crate imposes no thread-affinity requirement.
unsafe impl Send for PosixSlotAllocator {}
unsafe impl Sync for PosixSlotAllocator {}
impl PosixSlotAllocator {
pub fn create<P: Into<PathBuf>>(
flink_path: P,
slot_count: usize,
slot_capacity: usize,
) -> Result<Self, PosixSlotError> {
let flink_path = flink_path.into();
if let Some(parent) = flink_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let slot_capacity_u32 =
u32::try_from(slot_capacity).map_err(|_| PosixSlotError::CapacityOverflow)?;
let slot_count_u32 =
u32::try_from(slot_count).map_err(|_| PosixSlotError::CapacityOverflow)?;
let slot_total_size = align_up(SLOT_HEADER_SIZE + slot_capacity, 64);
let slot_total_size_u32 =
u32::try_from(slot_total_size).map_err(|_| PosixSlotError::CapacityOverflow)?;
let header_size = 0x10usize;
let total_size = header_size + slot_count * slot_total_size;
let shmem = ShmemConf::new()
.size(total_size)
.flink(&flink_path)
.create()?;
unsafe {
let base = shmem.as_ptr();
let p = base as *mut u32;
p.add(0).write(SEGMENT_MAGIC);
p.add(1).write(slot_count_u32);
p.add(2).write(slot_total_size_u32);
p.add(3).write(0); core::ptr::write_bytes(base.add(header_size), 0u8, slot_count * slot_total_size);
}
Ok(Self {
shmem: Some(shmem),
flink: flink_path,
loaned: Mutex::new(alloc::vec![false; slot_count]),
slot_count: slot_count_u32,
slot_total_size: slot_total_size_u32,
slot_capacity: slot_capacity_u32,
})
}
pub fn attach<P: Into<PathBuf>>(flink_path: P) -> Result<Self, PosixSlotError> {
let flink_path = flink_path.into();
let shmem = ShmemConf::new().flink(&flink_path).open()?;
let (magic, slot_count, slot_total_size, _next_sn) = unsafe {
let p = shmem.as_ptr() as *const u32;
(
p.add(0).read(),
p.add(1).read(),
p.add(2).read(),
p.add(3).read(),
)
};
if magic != SEGMENT_MAGIC {
return Err(PosixSlotError::InvalidHeader);
}
let slot_capacity = slot_total_size.saturating_sub(SLOT_HEADER_SIZE as u32);
Ok(Self {
shmem: Some(shmem),
flink: flink_path,
loaned: Mutex::new(alloc::vec![false; slot_count as usize]),
slot_count,
slot_total_size,
slot_capacity,
})
}
#[must_use]
pub fn flink_path(&self) -> &str {
self.flink.to_str().unwrap_or("")
}
pub fn segment_path(&self) -> String {
self.flink_path().to_string()
}
fn slot_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
if idx >= self.slot_count {
return Err(SlotError::OutOfBounds);
}
let header_size = 0x10usize;
let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
let base = shmem.as_ptr();
unsafe { Ok(base.add(header_size + (idx as usize) * (self.slot_total_size as usize))) }
}
fn read_header(&self, idx: u32) -> Result<SlotHeader, SlotError> {
let p = self.slot_ptr(idx)?;
let header = unsafe { core::ptr::read(p as *const SlotHeader) };
Ok(header)
}
fn write_header(&self, idx: u32, header: SlotHeader) -> Result<(), SlotError> {
let p = self.slot_ptr(idx)?;
unsafe {
core::ptr::write(p as *mut SlotHeader, header);
}
Ok(())
}
fn next_sn_inc(&self) -> Result<u32, SlotError> {
let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
let sn_ptr = unsafe { shmem.as_ptr().add(12) as *const AtomicU32 };
let atomic = unsafe { &*sn_ptr };
Ok(atomic.fetch_add(1, Ordering::Relaxed))
}
fn data_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
let p = self.slot_ptr(idx)?;
Ok(unsafe { p.add(SLOT_HEADER_SIZE) })
}
}
impl SlotBackend for PosixSlotAllocator {
    /// Loans out a free slot: one not currently loaned that is either unused
    /// (`sample_size == 0`) or already read by every reader in
    /// `active_readers_mask`.
    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
        for idx in 0..self.slot_count {
            if loaned[idx as usize] {
                continue;
            }
            let header = self.read_header(idx)?;
            if header.sample_size == 0 || header.all_read(active_readers_mask) {
                loaned[idx as usize] = true;
                return Ok(SlotHandle {
                    segment_id: 0,
                    slot_index: idx,
                });
            }
        }
        Err(SlotError::NoFreeSlot)
    }

    /// Copies `bytes` into the reserved slot, stamps a fresh sequence number
    /// into its header, then releases the loan. Returns the sequence number.
    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        if bytes.len() > self.slot_capacity as usize {
            return Err(SlotError::SampleTooLarge {
                sample: bytes.len(),
                slot_capacity: self.slot_capacity as usize,
            });
        }
        let sn = self.next_sn_inc()?;
        // The capacity check above bounds the length by a u32
        // (`slot_capacity`), so the u32::MAX fallback is unreachable.
        let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
        let header = SlotHeader::new(sn, sample_size);
        let dp = self.data_ptr(handle.slot_index)?;
        // SAFETY: `dp` points at a payload area of at least `slot_capacity`
        // bytes and `bytes.len() <= slot_capacity`.
        unsafe {
            core::ptr::copy_nonoverlapping(bytes.as_ptr(), dp, bytes.len());
        }
        // Publish the header only after the payload is in place.
        self.write_header(handle.slot_index, header)?;
        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
        loaned[handle.slot_index as usize] = false;
        Ok(sn)
    }

    /// Releases a reserved slot without writing anything into it.
    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
        if (handle.slot_index as usize) >= loaned.len() {
            return Err(SlotError::OutOfBounds);
        }
        loaned[handle.slot_index as usize] = false;
        Ok(())
    }

    /// Snapshots a slot's header and copies out its payload, clamped to the
    /// slot capacity in case the stored size is corrupt.
    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        let header = self.read_header(handle.slot_index)?;
        let n = (header.sample_size as usize).min(self.slot_capacity as usize);
        let dp = self.data_ptr(handle.slot_index)?;
        let mut buf = alloc::vec![0u8; n];
        // SAFETY: `dp` points at >= slot_capacity payload bytes and `n` is
        // clamped to slot_capacity; `buf` holds exactly `n` bytes.
        unsafe {
            core::ptr::copy_nonoverlapping(dp, buf.as_mut_ptr(), n);
        }
        Ok((header, buf))
    }

    /// Sets `reader_index`'s bit in the slot's shared reader mask.
    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        // `1u32 << reader_index` with reader_index >= 32 is a shift overflow:
        // a panic in debug builds and a silently masked shift (aliasing
        // reader 0) in release. Reject invalid indices with a real error
        // instead of relying on a debug_assert.
        if reader_index >= 32 {
            return Err(SlotError::OutOfBounds);
        }
        let p = self.slot_ptr(handle.slot_index)?;
        // SAFETY: offset 8 inside the slot is the header's reader-mask word,
        // in bounds and 4-byte aligned; `AtomicU32` makes the RMW safe for
        // concurrent readers/writers.
        let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
        let atomic = unsafe { &*mask_ptr };
        atomic.fetch_or(1u32 << reader_index, Ordering::Relaxed);
        Ok(())
    }

    /// Marks every slot as read by `reader_index`, so a disconnected reader
    /// can never keep slots from being reclaimed.
    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        // Same shift-overflow guard as `mark_read`.
        if reader_index >= 32 {
            return Err(SlotError::OutOfBounds);
        }
        let bit = 1u32 << reader_index;
        for idx in 0..self.slot_count {
            let p = self.slot_ptr(idx)?;
            // SAFETY: see `mark_read` — same reader-mask word per slot.
            let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
            let atomic = unsafe { &*mask_ptr };
            atomic.fetch_or(bit, Ordering::Relaxed);
        }
        Ok(())
    }

    /// Number of slots in the segment.
    fn slot_count(&self) -> Result<usize, SlotError> {
        Ok(self.slot_count as usize)
    }

    /// Per-slot stride in bytes (header + payload, 64-byte aligned).
    fn slot_total_size(&self) -> usize {
        self.slot_total_size as usize
    }

    /// Maximum payload bytes per slot.
    fn slot_capacity(&self) -> usize {
        self.slot_capacity as usize
    }
}
/// Rounds `x` up to the next multiple of `n`, which must be a power of two.
fn align_up(x: usize, n: usize) -> usize {
    debug_assert!(n.is_power_of_two());
    // Power-of-two rounding: add the mask, then clear the low bits.
    let mask = n - 1;
    (x + mask) & !mask
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    /// Produces a flink path in the temp dir that is unique per process and
    /// per invocation, so parallel tests never share a segment.
    fn unique_flink() -> PathBuf {
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(alloc::format!("zerodds-flatdata-test-{pid}-{n}"));
        path
    }

    #[test]
    fn create_attach_roundtrip() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        // Geometry must agree between creator and attacher; the slot stride
        // is header + 64 payload bytes rounded up to a 64-byte multiple.
        assert_eq!(SlotBackend::slot_count(&consumer).unwrap(), 4);
        assert_eq!(SlotBackend::slot_count(&owner).unwrap(), 4);
        assert_eq!(SlotBackend::slot_total_size(&owner), 128);
    }

    #[test]
    fn write_read_through_shm() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        // Commit from the owner side, observe from the attached side.
        let payload = [1, 2, 3, 4];
        let handle = SlotBackend::reserve_slot(&owner, 0b1).expect("reserve");
        let _sn = SlotBackend::commit_slot(&owner, handle, &payload).expect("commit");
        let (header, bytes) = SlotBackend::read_slot(&consumer, handle).expect("read");
        assert_eq!(bytes, payload.to_vec());
        assert_eq!(header.sample_size, 4);
    }

    #[test]
    fn mark_read_visible_to_owner() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 1, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        let handle = SlotBackend::reserve_slot(&owner, 0b011).expect("reserve");
        SlotBackend::commit_slot(&owner, handle, &[0xFF]).expect("commit");
        // Both readers acknowledge through the attached allocator.
        SlotBackend::mark_read(&consumer, handle, 0).expect("mark0");
        SlotBackend::mark_read(&consumer, handle, 1).expect("mark1");
        let (header, _) = SlotBackend::read_slot(&owner, handle).unwrap();
        assert_eq!(header.reader_mask, 0b011);
        // With every active reader done, the single slot is reusable.
        let _ = SlotBackend::reserve_slot(&owner, 0b011).expect("reuse");
    }

    #[test]
    fn next_sn_increments_atomically() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let first = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn_first = SlotBackend::commit_slot(&owner, first, &[0]).unwrap();
        let second = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn_second = SlotBackend::commit_slot(&owner, second, &[1]).unwrap();
        // Sequence numbers come from the shared counter and must advance.
        assert!(sn_second > sn_first);
    }
}