use alloc::sync::Arc;
use alloc::vec::Vec;
use core::marker::PhantomData;
use core::ops::Deref;
use core::time::Duration;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Reliability {
Reliable,
BestEffort,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteOutcome {
Written(u32),
Dropped,
TimedOut,
}
use crate::FlatStruct;
use crate::allocator::{InMemorySlotAllocator, SlotError, SlotHandle};
use crate::backend::SlotBackend;
use crate::slot::ReaderMask;
pub struct FlatWriter<T: FlatStruct> {
alloc: Arc<InMemorySlotAllocator>,
active_readers_mask: ReaderMask,
_t: PhantomData<fn() -> T>,
}
impl<T: FlatStruct> FlatWriter<T> {
pub fn new(alloc: Arc<InMemorySlotAllocator>, active_readers_mask: ReaderMask) -> Self {
Self {
alloc,
active_readers_mask,
_t: PhantomData,
}
}
pub fn write(&self, sample: &T) -> Result<u32, SlotError> {
let bytes = sample.as_bytes();
let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
match self.alloc.commit_slot(handle, bytes) {
Ok(sn) => Ok(sn),
Err(e) => {
let _ = self.alloc.discard_slot(handle);
Err(e)
}
}
}
pub fn write_bp(
&self,
sample: &T,
reliability: Reliability,
timeout: Duration,
) -> Result<WriteOutcome, SlotError> {
let deadline = std::time::Instant::now() + timeout;
loop {
match self.write(sample) {
Ok(sn) => return Ok(WriteOutcome::Written(sn)),
Err(SlotError::NoFreeSlot) => {}
Err(e) => return Err(e),
}
if reliability == Reliability::BestEffort {
return Ok(WriteOutcome::Dropped);
}
let now = std::time::Instant::now();
if now >= deadline {
return Ok(WriteOutcome::TimedOut);
}
let g = self.alloc.notify_gen();
match self.write(sample) {
Ok(sn) => return Ok(WriteOutcome::Written(sn)),
Err(SlotError::NoFreeSlot) => {}
Err(e) => return Err(e),
}
self.alloc.wait_for_change(g, deadline - now);
}
}
pub fn loan_slot(&self) -> Result<FlatSlot<'_, T>, SlotError> {
let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
Ok(FlatSlot {
handle,
writer: self,
committed: false,
})
}
}
pub struct FlatSlot<'a, T: FlatStruct> {
handle: SlotHandle,
writer: &'a FlatWriter<T>,
committed: bool,
}
impl<T: FlatStruct> FlatSlot<'_, T> {
pub fn commit(mut self, sample: T) -> Result<u32, SlotError> {
let bytes = sample.as_bytes();
let sn = self.writer.alloc.commit_slot(self.handle, bytes)?;
self.committed = true;
Ok(sn)
}
pub fn as_mut(&mut self) -> Result<&mut T, SlotError> {
let (ptr, cap) = self.writer.alloc.slot_data_ptr(self.handle)?;
if cap < T::WIRE_SIZE {
return Err(SlotError::SampleTooLarge {
sample: T::WIRE_SIZE,
slot_capacity: cap,
});
}
unsafe {
core::ptr::write_bytes(ptr, 0, T::WIRE_SIZE);
Ok(&mut *ptr.cast::<T>())
}
}
pub fn commit_in_place(mut self) -> Result<u32, SlotError> {
let sn = self
.writer
.alloc
.commit_in_place(self.handle, T::WIRE_SIZE)?;
self.committed = true;
Ok(sn)
}
}
impl<T: FlatStruct> Drop for FlatSlot<'_, T> {
fn drop(&mut self) {
if !self.committed {
let _ = self.writer.alloc.discard_slot(self.handle);
}
}
}
pub struct FlatReader<T: FlatStruct> {
alloc: Arc<InMemorySlotAllocator>,
reader_index: u8,
last_sn: core::sync::atomic::AtomicU32,
expected_type_hash: [u8; 16],
_t: PhantomData<fn() -> T>,
}
impl<T: FlatStruct> FlatReader<T> {
pub fn new(alloc: Arc<InMemorySlotAllocator>, reader_index: u8) -> Self {
Self {
alloc,
reader_index,
last_sn: core::sync::atomic::AtomicU32::new(u32::MAX),
expected_type_hash: T::TYPE_HASH,
_t: PhantomData,
}
}
#[must_use]
pub fn type_hash(&self) -> [u8; 16] {
self.expected_type_hash
}
pub fn read(&self) -> Result<Option<T>, SlotError> {
Ok(self.scan_best(false)?.map(|(_, _, t)| t))
}
pub fn read_ref(&self) -> Result<Option<FlatSampleRef<T>>, SlotError> {
match self.scan_best(true)? {
Some((handle, _, sample)) => {
let concrete = Arc::clone(&self.alloc);
let backend: Arc<dyn SlotBackend> = concrete;
Ok(Some(FlatSampleRef::with_release(
sample,
backend,
handle,
self.reader_index,
)))
}
None => Ok(None),
}
}
pub fn read_blocking(&self, timeout: Duration) -> Result<Option<T>, SlotError> {
let deadline = std::time::Instant::now() + timeout;
loop {
if let Some(sample) = self.read()? {
return Ok(Some(sample));
}
let now = std::time::Instant::now();
if now >= deadline {
return Ok(None);
}
let g = self.alloc.notify_gen();
if let Some(sample) = self.read()? {
return Ok(Some(sample));
}
self.alloc.wait_for_change(g, deadline - now);
}
}
fn scan_best(&self, defer_best: bool) -> Result<Option<(SlotHandle, u32, T)>, SlotError> {
if let Some(backend_hash) = SlotBackend::type_hash(&*self.alloc) {
if backend_hash != self.expected_type_hash {
return Err(SlotError::SampleTooLarge {
sample: 0,
slot_capacity: 0,
});
}
}
let count = self.alloc.slot_count()?;
let last_seen = self.last_sn.load(core::sync::atomic::Ordering::Relaxed);
let mut best: Option<(SlotHandle, u32, T)> = None;
let mut to_mark: Vec<SlotHandle> = Vec::new();
for idx in 0..count {
let handle = SlotHandle {
segment_id: 0,
slot_index: idx as u32,
};
let (header, bytes) = self.alloc.read_slot(handle)?;
if header.sample_size == 0 {
continue; }
if (header.reader_mask & (1u32 << self.reader_index)) != 0 {
continue; }
if (bytes.len() as u32) < T::WIRE_SIZE as u32 {
continue; }
let sample = unsafe { T::from_bytes_unchecked(&bytes) };
to_mark.push(handle);
let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
let beats_current = best
.as_ref()
.is_none_or(|(_, b_sn, _)| header.sequence_number > *b_sn);
if unseen && beats_current {
best = Some((handle, header.sequence_number, sample));
}
}
let best_handle = best.as_ref().map(|(h, _, _)| *h);
for handle in to_mark {
if defer_best && Some(handle) == best_handle {
continue; }
self.alloc.mark_read(handle, self.reader_index)?;
}
if let Some((_, sn, _)) = best.as_ref() {
self.last_sn
.store(*sn, core::sync::atomic::Ordering::Relaxed);
}
Ok(best)
}
}
struct DeferredRelease {
backend: Arc<dyn SlotBackend>,
handle: SlotHandle,
reader_index: u8,
}
pub struct FlatSampleRef<T: FlatStruct> {
sample: T,
release: Option<DeferredRelease>,
}
impl<T: FlatStruct> FlatSampleRef<T> {
#[must_use]
pub fn new(sample: T) -> Self {
Self {
sample,
release: None,
}
}
#[must_use]
pub(crate) fn with_release(
sample: T,
backend: Arc<dyn SlotBackend>,
handle: SlotHandle,
reader_index: u8,
) -> Self {
Self {
sample,
release: Some(DeferredRelease {
backend,
handle,
reader_index,
}),
}
}
#[must_use]
pub fn into_inner(self) -> T {
self.sample
}
}
impl<T: FlatStruct> Deref for FlatSampleRef<T> {
type Target = T;
fn deref(&self) -> &T {
&self.sample
}
}
impl<T: FlatStruct> Drop for FlatSampleRef<T> {
fn drop(&mut self) {
if let Some(r) = &self.release {
let _ = r.backend.mark_read(r.handle, r.reader_index);
}
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[repr(C)]
struct Pose {
x: i64,
y: i64,
z: i64,
}
unsafe impl FlatStruct for Pose {
const TYPE_HASH: [u8; 16] = [0x42; 16];
}
fn fresh_alloc(slot_count: usize) -> Arc<InMemorySlotAllocator> {
Arc::new(InMemorySlotAllocator::new(0, slot_count, 64))
}
#[test]
fn writer_write_then_reader_read() {
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let p = Pose { x: 1, y: 2, z: 3 };
let _sn = writer.write(&p).expect("write");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, p);
}
#[test]
fn reader_does_not_re_read_same_slot() {
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
let _ = reader.read().expect("first read").expect("some");
let second = reader.read().expect("second read");
assert!(second.is_none());
}
#[test]
fn writer_loan_commit_pattern() {
let alloc = fresh_alloc(2);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let slot = writer.loan_slot().expect("loan");
let _sn = slot.commit(Pose { x: 7, y: 8, z: 9 }).expect("commit");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
}
#[test]
fn writer_loan_in_place_zero_copy() {
let alloc = fresh_alloc(2);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let mut slot = writer.loan_slot().expect("loan");
{
let p = slot.as_mut().expect("in-place view");
p.x = 11;
p.y = 22;
p.z = 33;
}
let _sn = slot.commit_in_place().expect("commit_in_place");
let got = reader.read().expect("read").expect("some");
assert_eq!(
got,
Pose {
x: 11,
y: 22,
z: 33
}
);
}
#[test]
fn loan_drop_without_commit_releases_slot() {
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
{
let _slot = writer.loan_slot().expect("loan");
}
let _ = writer.loan_slot().expect("re-loan after drop");
}
#[test]
fn reader_recycles_slot_after_read() {
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("w1");
let _ = reader.read().expect("r1").expect("some");
writer.write(&Pose { x: 2, y: 2, z: 2 }).expect("w2");
let got = reader.read().expect("r2").expect("some");
assert_eq!(got, Pose { x: 2, y: 2, z: 2 });
}
#[test]
fn flat_sample_ref_deref() {
let p = Pose { x: 1, y: 2, z: 3 };
let r = FlatSampleRef::new(p);
assert_eq!(r.x, 1);
assert_eq!(r.into_inner(), p);
}
#[test]
fn read_ref_holds_slot_until_drop() {
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
let sref = reader.read_ref().expect("read_ref").expect("some");
assert_eq!(sref.x, 1);
assert_eq!(sref.z, 3);
assert!(matches!(
writer.write(&Pose { x: 9, y: 9, z: 9 }),
Err(SlotError::NoFreeSlot)
));
drop(sref);
writer
.write(&Pose { x: 4, y: 5, z: 6 })
.expect("write after ref drop");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, Pose { x: 4, y: 5, z: 6 });
}
#[test]
fn read_ref_into_inner_releases_slot() {
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("write");
let sref = reader.read_ref().expect("read_ref").expect("some");
let owned = sref.into_inner();
assert_eq!(owned, Pose { x: 1, y: 1, z: 1 });
writer
.write(&Pose { x: 2, y: 2, z: 2 })
.expect("write after into_inner");
}
#[test]
fn reader_rejects_type_hash_mismatch() {
let wrong_hash = [0xBB; 16];
let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(wrong_hash));
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let res = reader.read();
assert!(matches!(res, Err(SlotError::SampleTooLarge { .. })));
}
#[test]
fn reader_accepts_matching_type_hash() {
let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(Pose::TYPE_HASH));
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let res = reader.read().expect("no schema drift");
assert!(res.is_none());
}
#[test]
fn reader_without_backend_hash_does_not_reject() {
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
let got = reader.read().expect("read").expect("some");
assert_eq!(got, Pose { x: 1, y: 2, z: 3 });
}
#[test]
fn read_blocking_wakes_on_commit() {
use std::time::Duration;
let alloc = fresh_alloc(4);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let w_alloc = Arc::clone(&alloc);
let h = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(50));
FlatWriter::<Pose>::new(w_alloc, 0b1)
.write(&Pose { x: 7, y: 8, z: 9 })
.expect("write");
});
let _ = writer;
let start = std::time::Instant::now();
let got = reader
.read_blocking(Duration::from_secs(5))
.expect("read_blocking")
.expect("woken with a sample");
assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
assert!(
start.elapsed() < Duration::from_secs(2),
"should wake on notify, not spin to timeout"
);
h.join().unwrap();
}
#[test]
fn read_blocking_times_out_without_writer() {
use std::time::Duration;
let alloc = fresh_alloc(2);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
let start = std::time::Instant::now();
let got = reader.read_blocking(Duration::from_millis(60)).expect("rb");
assert!(got.is_none());
assert!(start.elapsed() >= Duration::from_millis(50));
}
#[test]
fn write_bp_best_effort_drops_when_full() {
use std::time::Duration;
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
assert!(matches!(
writer
.write_bp(
&Pose { x: 1, y: 1, z: 1 },
Reliability::BestEffort,
Duration::ZERO
)
.unwrap(),
WriteOutcome::Written(_)
));
assert_eq!(
writer
.write_bp(
&Pose { x: 2, y: 2, z: 2 },
Reliability::BestEffort,
Duration::ZERO
)
.unwrap(),
WriteOutcome::Dropped
);
}
#[test]
fn write_bp_reliable_blocks_until_reader_frees() {
use std::time::Duration;
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("fill");
let r_alloc = Arc::clone(&alloc);
let h = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(50));
let r = FlatReader::<Pose>::new(r_alloc, 0);
r.read().expect("read").expect("some"); });
let _ = reader;
let start = std::time::Instant::now();
let outcome = writer
.write_bp(
&Pose { x: 2, y: 2, z: 2 },
Reliability::Reliable,
Duration::from_secs(5),
)
.expect("write_bp");
assert!(matches!(outcome, WriteOutcome::Written(_)));
assert!(start.elapsed() < Duration::from_secs(2));
h.join().unwrap();
}
#[test]
fn write_bp_reliable_times_out() {
use std::time::Duration;
let alloc = fresh_alloc(1);
let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("fill");
let start = std::time::Instant::now();
let outcome = writer
.write_bp(
&Pose { x: 2, y: 2, z: 2 },
Reliability::Reliable,
Duration::from_millis(60),
)
.expect("write_bp");
assert_eq!(outcome, WriteOutcome::TimedOut);
assert!(start.elapsed() >= Duration::from_millis(50));
}
}