use alloc::sync::Arc;
use core::marker::PhantomData;
use core::ops::Deref;
use crate::FlatStruct;
use crate::allocator::{InMemorySlotAllocator, SlotError, SlotHandle};
use crate::backend::SlotBackend;
use crate::slot::ReaderMask;
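/// Writer half of the flat-sample channel: publishes `T` samples into slots
/// obtained from the shared `InMemorySlotAllocator`.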
pub struct FlatWriter<T: FlatStruct> {
alloc: Arc<InMemorySlotAllocator>,
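/// Reader mask forwarded to the allocator whenever a slot is reserved.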
active_readers_mask: ReaderMask,
_t: PhantomData<fn() -> T>,
}
impl<T: FlatStruct> FlatWriter<T> {
pub fn new(alloc: Arc<InMemorySlotAllocator>, active_readers_mask: ReaderMask) -> Self {
Self {
alloc,
active_readers_mask,
_t: PhantomData,
}
}
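/// Reserves a slot, copies the serialized `sample` into it, and returns the
/// committed sequence number. If the commit fails, the reservation is discarded.
///
/// # Errors
///
/// Propagates any `SlotError` from slot reservation or commit.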
pub fn write(&self, sample: &T) -> Result<u32, SlotError> {
let bytes = sample.as_bytes();
let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
match self.alloc.commit_slot(handle, bytes) {
Ok(sn) => Ok(sn),
Err(e) => {
let _ = self.alloc.discard_slot(handle);
Err(e)
}
}
}
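/// Reserves a slot and returns an RAII guard. Call `FlatSlot::commit` to
/// publish a sample; dropping the guard uncommitted releases the reservation.
///
/// # Errors
///
/// Propagates any `SlotError` from slot reservation.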
pub fn loan_slot(&self) -> Result<FlatSlot<'_, T>, SlotError> {
let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
Ok(FlatSlot {
handle,
writer: self,
committed: false,
})
}
}
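/// RAII guard over a reserved slot. The reservation is discarded on drop
/// unless `commit` was called.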
pub struct FlatSlot<'a, T: FlatStruct> {
handle: SlotHandle,
writer: &'a FlatWriter<T>,
committed: bool,
}
impl<T: FlatStruct> FlatSlot<'_, T> {
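/// Serializes `sample` into the reserved slot and returns its sequence number.
///
/// # Errors
///
/// Propagates any `SlotError` from the commit; the guard is then dropped
/// uncommitted, which discards the reservation.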
pub fn commit(mut self, sample: T) -> Result<u32, SlotError> {
let bytes = sample.as_bytes();
let sn = self.writer.alloc.commit_slot(self.handle, bytes)?;
self.committed = true;
Ok(sn)
}
}
impl<T: FlatStruct> Drop for FlatSlot<'_, T> {
fn drop(&mut self) {
if !self.committed {
let _ = self.writer.alloc.discard_slot(self.handle);
}
}
}
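/// Reader half of the flat-sample channel: scans slots for the newest sample
/// this reader has not yet consumed.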
pub struct FlatReader<T: FlatStruct> {
alloc: Arc<InMemorySlotAllocator>,
reader_index: u8,
last_sn: core::sync::atomic::AtomicU32,
expected_type_hash: [u8; 16],
_t: PhantomData<fn() -> T>,
}
impl<T: FlatStruct> FlatReader<T> {
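/// Creates a reader identified by `reader_index`. `last_sn` starts at the
/// `u32::MAX` sentinel, meaning no sample has been returned yet.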
pub fn new(alloc: Arc<InMemorySlotAllocator>, reader_index: u8) -> Self {
Self {
alloc,
reader_index,
last_sn: core::sync::atomic::AtomicU32::new(u32::MAX),
expected_type_hash: T::TYPE_HASH,
_t: PhantomData,
}
}
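/// Returns the compile-time type hash (`T::TYPE_HASH`) this reader expects
/// the backend to carry.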
#[must_use]
pub fn type_hash(&self) -> [u8; 16] {
self.expected_type_hash
}
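/// Scans every slot and returns the newest sample this reader has not yet
/// seen, marking all readable candidates as consumed. Returns `Ok(None)` when
/// nothing new is available.
///
/// # Errors
///
/// Fails if the backend reports a type hash that does not match
/// `T::TYPE_HASH`, or if any slot operation returns a `SlotError`.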
pub fn read(&self) -> Result<Option<T>, SlotError> {
if let Some(backend_hash) = SlotBackend::type_hash(&*self.alloc) {
if backend_hash != self.expected_type_hash {
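// A zero-sized `SampleTooLarge` is used here as the schema-mismatch
// signal (see `reader_rejects_type_hash_mismatch`).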
return Err(SlotError::SampleTooLarge {
sample: 0,
slot_capacity: 0,
});
}
}
let count = self.alloc.slot_count()?;
let mut best: Option<(u32, T)> = None;
let last_seen = self.last_sn.load(core::sync::atomic::Ordering::Relaxed);
for idx in 0..count {
let handle = SlotHandle {
segment_id: 0,
slot_index: idx as u32,
};
let (header, bytes) = self.alloc.read_slot(handle)?;
if header.sample_size == 0 {
continue;
}
if (header.reader_mask & (1u32 << self.reader_index)) != 0 {
continue;
}
if (bytes.len() as u32) < T::WIRE_SIZE as u32 {
continue;
}
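// SAFETY: `bytes` was checked above to hold at least `T::WIRE_SIZE` bytes.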
let sample = unsafe { T::from_bytes_unchecked(&bytes) };
let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
let beats_current = best
.as_ref()
.is_none_or(|(b_sn, _)| header.sequence_number > *b_sn);
if unseen && beats_current {
best = Some((header.sequence_number, sample));
}
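// Every candidate slot is marked as read for this reader, even when a
// newer sample supersedes it in `best`.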
self.alloc.mark_read(handle, self.reader_index)?;
}
if let Some((sn, _)) = best.as_ref() {
self.last_sn
.store(*sn, core::sync::atomic::Ordering::Relaxed);
}
Ok(best.map(|(_, t)| t))
}
}
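/// Owned wrapper around a decoded sample; the value is reachable through
/// `Deref` or `into_inner`.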
pub struct FlatSampleRef<T: FlatStruct> {
sample: T,
}
impl<T: FlatStruct> FlatSampleRef<T> {
#[must_use]
pub fn new(sample: T) -> Self {
Self { sample }
}
#[must_use]
pub fn into_inner(self) -> T {
self.sample
}
}
impl<T: FlatStruct> Deref for FlatSampleRef<T> {
type Target = T;
fn deref(&self) -> &T {
&self.sample
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[repr(C)]
struct Pose {
x: i64,
y: i64,
z: i64,
}
unsafe impl FlatStruct for Pose {
const TYPE_HASH: [u8; 16] = [0x42; 16];
}
fn fresh_alloc(slot_count: usize) -> Arc<InMemorySlotAllocator> {
Arc::new(InMemorySlotAllocator::new(0, slot_count, 64))
}
#[test]
fn writer_write_then_reader_read() {
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let p = Pose { x: 1, y: 2, z: 3 };
let _sn = writer.write(&p).expect("write");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, p);
}
#[test]
fn reader_does_not_re_read_same_slot() {
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
let _ = reader.read().expect("first read").expect("some");
let second = reader.read().expect("second read");
assert!(second.is_none());
}
#[test]
fn writer_loan_commit_pattern() {
let alloc = fresh_alloc(2);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let slot = writer.loan_slot().expect("loan");
let _sn = slot.commit(Pose { x: 7, y: 8, z: 9 }).expect("commit");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
}
#[test]
fn loan_drop_without_commit_releases_slot() {
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
{
let _slot = writer.loan_slot().expect("loan");
}
let _ = writer.loan_slot().expect("re-loan after drop");
}
#[test]
fn reader_recycles_slot_after_read() {
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("w1");
let _ = reader.read().expect("r1").expect("some");
writer.write(&Pose { x: 2, y: 2, z: 2 }).expect("w2");
let got = reader.read().expect("r2").expect("some");
assert_eq!(got, Pose { x: 2, y: 2, z: 2 });
}
#[test]
fn flat_sample_ref_deref() {
let p = Pose { x: 1, y: 2, z: 3 };
let r = FlatSampleRef::new(p);
assert_eq!(r.x, 1);
assert_eq!(r.into_inner(), p);
}
#[test]
fn reader_rejects_type_hash_mismatch() {
let wrong_hash = [0xBB; 16];
let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(wrong_hash));
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let res = reader.read();
assert!(matches!(res, Err(SlotError::SampleTooLarge { .. })));
}
#[test]
fn reader_accepts_matching_type_hash() {
let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(Pose::TYPE_HASH));
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let res = reader.read().expect("no schema drift");
assert!(res.is_none());
}
#[test]
fn reader_without_backend_hash_does_not_reject() {
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, Pose { x: 1, y: 2, z: 3 });
}
}