1use alloc::sync::Arc;
8use core::marker::PhantomData;
9use core::ops::Deref;
10
11use crate::FlatStruct;
12use crate::allocator::{InMemorySlotAllocator, SlotError, SlotHandle};
13use crate::backend::SlotBackend;
14use crate::slot::ReaderMask;
15
/// Producer handle that publishes `T` samples into slots managed by a
/// shared [`InMemorySlotAllocator`].
pub struct FlatWriter<T: FlatStruct> {
    // Shared slot allocator backing this writer.
    alloc: Arc<InMemorySlotAllocator>,
    // Reader mask passed to every reservation — presumably the set of reader
    // indices that must consume each published sample (see `reserve_slot`).
    active_readers_mask: ReaderMask,
    // `fn() -> T` ties the writer to `T` without owning one, and avoids
    // imposing `T: Send`/`Sync` requirements on the writer itself.
    _t: PhantomData<fn() -> T>,
}
22
23impl<T: FlatStruct> FlatWriter<T> {
24 pub fn new(alloc: Arc<InMemorySlotAllocator>, active_readers_mask: ReaderMask) -> Self {
28 Self {
29 alloc,
30 active_readers_mask,
31 _t: PhantomData,
32 }
33 }
34
35 pub fn write(&self, sample: &T) -> Result<u32, SlotError> {
41 let bytes = sample.as_bytes();
42 let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
43 match self.alloc.commit_slot(handle, bytes) {
44 Ok(sn) => Ok(sn),
45 Err(e) => {
46 let _ = self.alloc.discard_slot(handle);
47 Err(e)
48 }
49 }
50 }
51
52 pub fn loan_slot(&self) -> Result<FlatSlot<'_, T>, SlotError> {
59 let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
60 Ok(FlatSlot {
61 handle,
62 writer: self,
63 committed: false,
64 })
65 }
66}
67
/// RAII guard for a reserved-but-not-yet-committed slot obtained via
/// [`FlatWriter::loan_slot`]; discards the reservation on drop unless
/// committed.
pub struct FlatSlot<'a, T: FlatStruct> {
    // Allocator handle identifying the reserved slot.
    handle: SlotHandle,
    // Borrow of the owning writer, used to reach the allocator.
    writer: &'a FlatWriter<T>,
    // Set by `commit`; suppresses the discard in `Drop`.
    committed: bool,
}
75
76impl<T: FlatStruct> FlatSlot<'_, T> {
77 pub fn commit(mut self, sample: T) -> Result<u32, SlotError> {
82 let bytes = sample.as_bytes();
83 let sn = self.writer.alloc.commit_slot(self.handle, bytes)?;
84 self.committed = true;
85 Ok(sn)
86 }
87}
88
89impl<T: FlatStruct> Drop for FlatSlot<'_, T> {
90 fn drop(&mut self) {
91 if !self.committed {
92 let _ = self.writer.alloc.discard_slot(self.handle);
93 }
94 }
95}
96
/// Consumer handle that scans the allocator's slots for the newest sample
/// of type `T` not yet delivered to this reader.
pub struct FlatReader<T: FlatStruct> {
    // Shared slot allocator backing this reader.
    alloc: Arc<InMemorySlotAllocator>,
    // Bit position of this reader in each slot's reader mask.
    reader_index: u8,
    // Highest sequence number returned so far; u32::MAX is the
    // "nothing seen yet" sentinel (see `new` and `read`).
    last_sn: core::sync::atomic::AtomicU32,
    // Copy of `T::TYPE_HASH`, compared against the backend's hash on every
    // read to detect schema drift.
    expected_type_hash: [u8; 16],
    // Ties the reader to `T` without owning one.
    _t: PhantomData<fn() -> T>,
}
108
impl<T: FlatStruct> FlatReader<T> {
    /// Creates a reader identified by `reader_index` — the bit position this
    /// reader occupies in each slot's reader mask.
    pub fn new(alloc: Arc<InMemorySlotAllocator>, reader_index: u8) -> Self {
        Self {
            alloc,
            reader_index,
            // u32::MAX is the "no sample seen yet" sentinel checked in read().
            last_sn: core::sync::atomic::AtomicU32::new(u32::MAX),
            expected_type_hash: T::TYPE_HASH,
            _t: PhantomData,
        }
    }

    /// Returns the 128-bit type hash this reader expects the backend to carry.
    #[must_use]
    pub fn type_hash(&self) -> [u8; 16] {
        self.expected_type_hash
    }

    /// Scans every slot and returns the newest sample this reader has not
    /// yet seen, or `Ok(None)` when nothing new is available.
    ///
    /// Every slot that passes the filters is marked as read for this reader,
    /// even if it is not the sample ultimately returned — older unseen
    /// samples are skipped permanently (latest-only delivery).
    ///
    /// # Errors
    /// Returns a [`SlotError`] on backend type-hash mismatch or on any
    /// allocator failure while counting, reading, or marking slots.
    pub fn read(&self) -> Result<Option<T>, SlotError> {
        // Schema-drift guard: only enforced when the backend exposes a hash.
        if let Some(backend_hash) = SlotBackend::type_hash(&*self.alloc) {
            if backend_hash != self.expected_type_hash {
                // NOTE(review): SampleTooLarge is reused here (with zeroed
                // fields) to signal a type-hash mismatch — presumably because
                // no dedicated variant exists; tests pin this variant.
                return Err(SlotError::SampleTooLarge {
                    sample: 0,
                    slot_capacity: 0,
                });
            }
        }
        let count = self.alloc.slot_count()?;
        let mut best: Option<(u32, T)> = None;
        let last_seen = self.last_sn.load(core::sync::atomic::Ordering::Relaxed);
        for idx in 0..count {
            // NOTE(review): segment_id is hard-coded to 0 — assumes a
            // single-segment allocator; confirm if multi-segment is added.
            let handle = SlotHandle {
                segment_id: 0,
                slot_index: idx as u32,
            };
            let (header, bytes) = self.alloc.read_slot(handle)?;
            // Empty slots have never been committed.
            if header.sample_size == 0 {
                continue; }
            // Our bit set in the mask means this reader already consumed it.
            if (header.reader_mask & (1u32 << self.reader_index)) != 0 {
                continue;
            }
            // Guard the unchecked decode below against short payloads.
            if (bytes.len() as u32) < T::WIRE_SIZE as u32 {
                continue;
            }
            // SAFETY: the length check above guarantees at least
            // T::WIRE_SIZE bytes are available for the raw decode.
            let sample = unsafe { T::from_bytes_unchecked(&bytes) };
            // u32::MAX sentinel: before the first read, everything is unseen.
            let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
            let beats_current = best
                .as_ref()
                .is_none_or(|(b_sn, _)| header.sequence_number > *b_sn);
            if unseen && beats_current {
                best = Some((header.sequence_number, sample));
            }
            // Mark every surviving slot read — including ones older than
            // `best` — so stale samples are not revisited next call.
            self.alloc.mark_read(handle, self.reader_index)?;
        }
        if let Some((sn, _)) = best.as_ref() {
            self.last_sn
                .store(*sn, core::sync::atomic::Ordering::Relaxed);
        }
        Ok(best.map(|(_, t)| t))
    }
}
194
/// Owning wrapper around a decoded sample that exposes it through [`Deref`],
/// giving read access in the style of a loaned reference.
pub struct FlatSampleRef<T: FlatStruct> {
    // The decoded sample; owned, not borrowed from shared memory.
    sample: T,
}
200
201impl<T: FlatStruct> FlatSampleRef<T> {
202 #[must_use]
204 pub fn new(sample: T) -> Self {
205 Self { sample }
206 }
207
208 #[must_use]
210 pub fn into_inner(self) -> T {
211 self.sample
212 }
213}
214
215impl<T: FlatStruct> Deref for FlatSampleRef<T> {
216 type Target = T;
217 fn deref(&self) -> &T {
218 &self.sample
219 }
220}
221
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    // Minimal fixed-layout sample type used across all tests.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    #[repr(C)]
    struct Pose {
        x: i64,
        y: i64,
        z: i64,
    }

    // SAFETY(review): relies on `#[repr(C)]` + all-`i64` fields satisfying
    // whatever layout contract `FlatStruct` demands — confirm against the
    // trait's safety docs (not visible in this file).
    unsafe impl FlatStruct for Pose {
        const TYPE_HASH: [u8; 16] = [0x42; 16];
    }

    // Fresh single-segment allocator with `slot_count` slots of 64 bytes.
    fn fresh_alloc(slot_count: usize) -> Arc<InMemorySlotAllocator> {
        Arc::new(InMemorySlotAllocator::new(0, slot_count, 64))
    }

    // Basic round trip: one write is delivered intact to reader 0.
    #[test]
    fn writer_write_then_reader_read() {
        let alloc = fresh_alloc(4);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        let p = Pose { x: 1, y: 2, z: 3 };
        let _sn = writer.write(&p).expect("write");

        let got = reader.read().expect("read").expect("some");
        assert_eq!(got, p);
    }

    // A delivered sample must not be delivered twice (mark_read semantics).
    #[test]
    fn reader_does_not_re_read_same_slot() {
        let alloc = fresh_alloc(4);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
        let _ = reader.read().expect("first read").expect("some");
        let second = reader.read().expect("second read");
        assert!(second.is_none());
    }

    // Loan-then-commit publishes exactly like a direct write.
    #[test]
    fn writer_loan_commit_pattern() {
        let alloc = fresh_alloc(2);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        let slot = writer.loan_slot().expect("loan");
        let _sn = slot.commit(Pose { x: 7, y: 8, z: 9 }).expect("commit");

        let got = reader.read().expect("read").expect("some");
        assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
    }

    // Dropping an uncommitted loan must free the slot: with only one slot,
    // a second loan can only succeed if the first reservation was discarded.
    #[test]
    fn loan_drop_without_commit_releases_slot() {
        let alloc = fresh_alloc(1);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);

        {
            let _slot = writer.loan_slot().expect("loan");
        }

        let _ = writer.loan_slot().expect("re-loan after drop");
    }

    // With a single slot, a read must recycle it so the next write succeeds.
    #[test]
    fn reader_recycles_slot_after_read() {
        let alloc = fresh_alloc(1);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("w1");
        let _ = reader.read().expect("r1").expect("some");

        writer.write(&Pose { x: 2, y: 2, z: 2 }).expect("w2");
        let got = reader.read().expect("r2").expect("some");
        assert_eq!(got, Pose { x: 2, y: 2, z: 2 });
    }

    // Deref exposes fields directly; into_inner returns the owned sample.
    #[test]
    fn flat_sample_ref_deref() {
        let p = Pose { x: 1, y: 2, z: 3 };
        let r = FlatSampleRef::new(p);
        assert_eq!(r.x, 1);
        assert_eq!(r.into_inner(), p);
    }

    // Schema drift: a backend hash differing from Pose::TYPE_HASH must be
    // rejected — currently surfaced as SampleTooLarge (reused variant).
    #[test]
    fn reader_rejects_type_hash_mismatch() {
        let wrong_hash = [0xBB; 16];
        let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(wrong_hash));
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
        let res = reader.read();
        assert!(matches!(res, Err(SlotError::SampleTooLarge { .. })));
    }

    // A matching backend hash passes the guard; empty allocator yields None.
    #[test]
    fn reader_accepts_matching_type_hash() {
        let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(Pose::TYPE_HASH));
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
        let res = reader.read().expect("no schema drift");
        assert!(res.is_none());
    }

    // Backends that expose no type hash skip the guard entirely.
    #[test]
    fn reader_without_backend_hash_does_not_reject() {
        let alloc = fresh_alloc(4);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
        writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
        let got = reader.read().expect("read").expect("some");
        assert_eq!(got, Pose { x: 1, y: 2, z: 3 });
    }
}