1use alloc::sync::Arc;
8use alloc::vec::Vec;
9use core::marker::PhantomData;
10use core::ops::Deref;
11use core::time::Duration;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum Reliability {
18 Reliable,
21 BestEffort,
23}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum WriteOutcome {
28 Written(u32),
30 Dropped,
32 TimedOut,
34}
35
36use crate::FlatStruct;
37use crate::allocator::{InMemorySlotAllocator, SlotError, SlotHandle};
38use crate::backend::SlotBackend;
39use crate::slot::ReaderMask;
40
41pub struct FlatWriter<T: FlatStruct> {
43 alloc: Arc<InMemorySlotAllocator>,
44 active_readers_mask: ReaderMask,
45 _t: PhantomData<fn() -> T>,
46}
47
48impl<T: FlatStruct> FlatWriter<T> {
49 pub fn new(alloc: Arc<InMemorySlotAllocator>, active_readers_mask: ReaderMask) -> Self {
53 Self {
54 alloc,
55 active_readers_mask,
56 _t: PhantomData,
57 }
58 }
59
60 pub fn write(&self, sample: &T) -> Result<u32, SlotError> {
66 let bytes = sample.as_bytes();
67 let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
68 match self.alloc.commit_slot(handle, bytes) {
69 Ok(sn) => Ok(sn),
70 Err(e) => {
71 let _ = self.alloc.discard_slot(handle);
72 Err(e)
73 }
74 }
75 }
76
77 pub fn write_bp(
85 &self,
86 sample: &T,
87 reliability: Reliability,
88 timeout: Duration,
89 ) -> Result<WriteOutcome, SlotError> {
90 let deadline = std::time::Instant::now() + timeout;
91 loop {
92 match self.write(sample) {
93 Ok(sn) => return Ok(WriteOutcome::Written(sn)),
94 Err(SlotError::NoFreeSlot) => {}
95 Err(e) => return Err(e),
96 }
97 if reliability == Reliability::BestEffort {
98 return Ok(WriteOutcome::Dropped);
99 }
100 let now = std::time::Instant::now();
101 if now >= deadline {
102 return Ok(WriteOutcome::TimedOut);
103 }
104 let g = self.alloc.notify_gen();
107 match self.write(sample) {
108 Ok(sn) => return Ok(WriteOutcome::Written(sn)),
109 Err(SlotError::NoFreeSlot) => {}
110 Err(e) => return Err(e),
111 }
112 self.alloc.wait_for_change(g, deadline - now);
113 }
114 }
115
116 pub fn loan_slot(&self) -> Result<FlatSlot<'_, T>, SlotError> {
123 let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
124 Ok(FlatSlot {
125 handle,
126 writer: self,
127 committed: false,
128 })
129 }
130}
131
132pub struct FlatSlot<'a, T: FlatStruct> {
135 handle: SlotHandle,
136 writer: &'a FlatWriter<T>,
137 committed: bool,
138}
139
140impl<T: FlatStruct> FlatSlot<'_, T> {
141 pub fn commit(mut self, sample: T) -> Result<u32, SlotError> {
146 let bytes = sample.as_bytes();
147 let sn = self.writer.alloc.commit_slot(self.handle, bytes)?;
148 self.committed = true;
149 Ok(sn)
150 }
151
152 pub fn as_mut(&mut self) -> Result<&mut T, SlotError> {
161 let (ptr, cap) = self.writer.alloc.slot_data_ptr(self.handle)?;
162 if cap < T::WIRE_SIZE {
163 return Err(SlotError::SampleTooLarge {
164 sample: T::WIRE_SIZE,
165 slot_capacity: cap,
166 });
167 }
168 unsafe {
173 core::ptr::write_bytes(ptr, 0, T::WIRE_SIZE);
174 Ok(&mut *ptr.cast::<T>())
175 }
176 }
177
178 pub fn commit_in_place(mut self) -> Result<u32, SlotError> {
184 let sn = self
185 .writer
186 .alloc
187 .commit_in_place(self.handle, T::WIRE_SIZE)?;
188 self.committed = true;
189 Ok(sn)
190 }
191}
192
193impl<T: FlatStruct> Drop for FlatSlot<'_, T> {
194 fn drop(&mut self) {
195 if !self.committed {
196 let _ = self.writer.alloc.discard_slot(self.handle);
197 }
198 }
199}
200
201pub struct FlatReader<T: FlatStruct> {
203 alloc: Arc<InMemorySlotAllocator>,
204 reader_index: u8,
206 last_sn: core::sync::atomic::AtomicU32,
208 expected_type_hash: [u8; 16],
210 _t: PhantomData<fn() -> T>,
211}
212
213impl<T: FlatStruct> FlatReader<T> {
214 pub fn new(alloc: Arc<InMemorySlotAllocator>, reader_index: u8) -> Self {
216 Self {
217 alloc,
218 reader_index,
219 last_sn: core::sync::atomic::AtomicU32::new(u32::MAX),
220 expected_type_hash: T::TYPE_HASH,
221 _t: PhantomData,
222 }
223 }
224
225 #[must_use]
227 pub fn type_hash(&self) -> [u8; 16] {
228 self.expected_type_hash
229 }
230
231 pub fn read(&self) -> Result<Option<T>, SlotError> {
243 Ok(self.scan_best(false)?.map(|(_, _, t)| t))
246 }
247
248 pub fn read_ref(&self) -> Result<Option<FlatSampleRef<T>>, SlotError> {
258 match self.scan_best(true)? {
259 Some((handle, _, sample)) => {
260 let concrete = Arc::clone(&self.alloc);
261 let backend: Arc<dyn SlotBackend> = concrete;
262 Ok(Some(FlatSampleRef::with_release(
263 sample,
264 backend,
265 handle,
266 self.reader_index,
267 )))
268 }
269 None => Ok(None),
270 }
271 }
272
273 pub fn read_blocking(&self, timeout: Duration) -> Result<Option<T>, SlotError> {
281 let deadline = std::time::Instant::now() + timeout;
282 loop {
283 if let Some(sample) = self.read()? {
284 return Ok(Some(sample));
285 }
286 let now = std::time::Instant::now();
287 if now >= deadline {
288 return Ok(None);
289 }
290 let g = self.alloc.notify_gen();
293 if let Some(sample) = self.read()? {
294 return Ok(Some(sample));
295 }
296 self.alloc.wait_for_change(g, deadline - now);
297 }
298 }
299
300 fn scan_best(&self, defer_best: bool) -> Result<Option<(SlotHandle, u32, T)>, SlotError> {
305 if let Some(backend_hash) = SlotBackend::type_hash(&*self.alloc) {
309 if backend_hash != self.expected_type_hash {
310 return Err(SlotError::SampleTooLarge {
311 sample: 0,
312 slot_capacity: 0,
313 });
314 }
315 }
316 let count = self.alloc.slot_count()?;
317 let last_seen = self.last_sn.load(core::sync::atomic::Ordering::Relaxed);
318 let mut best: Option<(SlotHandle, u32, T)> = None;
319 let mut to_mark: Vec<SlotHandle> = Vec::new();
322 for idx in 0..count {
323 let handle = SlotHandle {
324 segment_id: 0,
325 slot_index: idx as u32,
326 };
327 let (header, bytes) = self.alloc.read_slot(handle)?;
328 if header.sample_size == 0 {
329 continue; }
331 if (header.reader_mask & (1u32 << self.reader_index)) != 0 {
332 continue; }
334 if (bytes.len() as u32) < T::WIRE_SIZE as u32 {
335 continue; }
337 let sample = unsafe { T::from_bytes_unchecked(&bytes) };
340 to_mark.push(handle);
341 let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
342 let beats_current = best
343 .as_ref()
344 .is_none_or(|(_, b_sn, _)| header.sequence_number > *b_sn);
345 if unseen && beats_current {
346 best = Some((handle, header.sequence_number, sample));
347 }
348 }
349 let best_handle = best.as_ref().map(|(h, _, _)| *h);
350 for handle in to_mark {
351 if defer_best && Some(handle) == best_handle {
352 continue; }
354 self.alloc.mark_read(handle, self.reader_index)?;
355 }
356 if let Some((_, sn, _)) = best.as_ref() {
357 self.last_sn
358 .store(*sn, core::sync::atomic::Ordering::Relaxed);
359 }
360 Ok(best)
361 }
362}
363
364struct DeferredRelease {
370 backend: Arc<dyn SlotBackend>,
371 handle: SlotHandle,
372 reader_index: u8,
373}
374
375pub struct FlatSampleRef<T: FlatStruct> {
383 sample: T,
384 release: Option<DeferredRelease>,
385}
386
387impl<T: FlatStruct> FlatSampleRef<T> {
388 #[must_use]
390 pub fn new(sample: T) -> Self {
391 Self {
392 sample,
393 release: None,
394 }
395 }
396
397 #[must_use]
401 pub(crate) fn with_release(
402 sample: T,
403 backend: Arc<dyn SlotBackend>,
404 handle: SlotHandle,
405 reader_index: u8,
406 ) -> Self {
407 Self {
408 sample,
409 release: Some(DeferredRelease {
410 backend,
411 handle,
412 reader_index,
413 }),
414 }
415 }
416
417 #[must_use]
420 pub fn into_inner(self) -> T {
421 self.sample
424 }
425}
426
427impl<T: FlatStruct> Deref for FlatSampleRef<T> {
428 type Target = T;
429 fn deref(&self) -> &T {
430 &self.sample
431 }
432}
433
434impl<T: FlatStruct> Drop for FlatSampleRef<T> {
435 fn drop(&mut self) {
436 if let Some(r) = &self.release {
437 let _ = r.backend.mark_read(r.handle, r.reader_index);
439 }
440 }
441}
442
443#[cfg(test)]
444#[allow(clippy::expect_used, clippy::unwrap_used)]
445mod tests {
446 use super::*;
447
448 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
449 #[repr(C)]
450 struct Pose {
451 x: i64,
452 y: i64,
453 z: i64,
454 }
455
456 unsafe impl FlatStruct for Pose {
458 const TYPE_HASH: [u8; 16] = [0x42; 16];
459 }
460
461 fn fresh_alloc(slot_count: usize) -> Arc<InMemorySlotAllocator> {
462 Arc::new(InMemorySlotAllocator::new(0, slot_count, 64))
463 }
464
465 #[test]
466 fn writer_write_then_reader_read() {
467 let alloc = fresh_alloc(4);
468 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
470 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
471
472 let p = Pose { x: 1, y: 2, z: 3 };
473 let _sn = writer.write(&p).expect("write");
474
475 let got = reader.read().expect("read").expect("some");
476 assert_eq!(got, p);
477 }
478
479 #[test]
480 fn reader_does_not_re_read_same_slot() {
481 let alloc = fresh_alloc(4);
482 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
483 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
484
485 writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
486 let _ = reader.read().expect("first read").expect("some");
487 let second = reader.read().expect("second read");
489 assert!(second.is_none());
490 }
491
492 #[test]
493 fn writer_loan_commit_pattern() {
494 let alloc = fresh_alloc(2);
495 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
496 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
497
498 let slot = writer.loan_slot().expect("loan");
499 let _sn = slot.commit(Pose { x: 7, y: 8, z: 9 }).expect("commit");
500
501 let got = reader.read().expect("read").expect("some");
502 assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
503 }
504
505 #[test]
506 fn writer_loan_in_place_zero_copy() {
507 let alloc = fresh_alloc(2);
510 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
511 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
512
513 let mut slot = writer.loan_slot().expect("loan");
514 {
515 let p = slot.as_mut().expect("in-place view");
516 p.x = 11;
517 p.y = 22;
518 p.z = 33;
519 }
520 let _sn = slot.commit_in_place().expect("commit_in_place");
521
522 let got = reader.read().expect("read").expect("some");
523 assert_eq!(
524 got,
525 Pose {
526 x: 11,
527 y: 22,
528 z: 33
529 }
530 );
531 }
532
533 #[test]
534 fn loan_drop_without_commit_releases_slot() {
535 let alloc = fresh_alloc(1);
536 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
537
538 {
539 let _slot = writer.loan_slot().expect("loan");
540 }
542
543 let _ = writer.loan_slot().expect("re-loan after drop");
545 }
546
547 #[test]
548 fn reader_recycles_slot_after_read() {
549 let alloc = fresh_alloc(1);
550 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
551 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
552
553 writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("w1");
555 let _ = reader.read().expect("r1").expect("some");
556
557 writer.write(&Pose { x: 2, y: 2, z: 2 }).expect("w2");
560 let got = reader.read().expect("r2").expect("some");
561 assert_eq!(got, Pose { x: 2, y: 2, z: 2 });
562 }
563
564 #[test]
565 fn flat_sample_ref_deref() {
566 let p = Pose { x: 1, y: 2, z: 3 };
567 let r = FlatSampleRef::new(p);
568 assert_eq!(r.x, 1);
569 assert_eq!(r.into_inner(), p);
570 }
571
572 #[test]
573 fn read_ref_holds_slot_until_drop() {
574 let alloc = fresh_alloc(1);
577 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
578 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
579
580 writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
581 let sref = reader.read_ref().expect("read_ref").expect("some");
582 assert_eq!(sref.x, 1);
583 assert_eq!(sref.z, 3);
584
585 assert!(matches!(
587 writer.write(&Pose { x: 9, y: 9, z: 9 }),
588 Err(SlotError::NoFreeSlot)
589 ));
590
591 drop(sref);
593 writer
594 .write(&Pose { x: 4, y: 5, z: 6 })
595 .expect("write after ref drop");
596 let got = reader.read().expect("read").expect("some");
597 assert_eq!(got, Pose { x: 4, y: 5, z: 6 });
598 }
599
600 #[test]
601 fn read_ref_into_inner_releases_slot() {
602 let alloc = fresh_alloc(1);
604 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
605 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
606
607 writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("write");
608 let sref = reader.read_ref().expect("read_ref").expect("some");
609 let owned = sref.into_inner();
610 assert_eq!(owned, Pose { x: 1, y: 1, z: 1 });
611 writer
613 .write(&Pose { x: 2, y: 2, z: 2 })
614 .expect("write after into_inner");
615 }
616
617 #[test]
618 fn reader_rejects_type_hash_mismatch() {
619 let wrong_hash = [0xBB; 16];
622 let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(wrong_hash));
623 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
624 let res = reader.read();
625 assert!(matches!(res, Err(SlotError::SampleTooLarge { .. })));
626 }
627
628 #[test]
629 fn reader_accepts_matching_type_hash() {
630 let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(Pose::TYPE_HASH));
633 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
634 let res = reader.read().expect("no schema drift");
635 assert!(res.is_none());
636 }
637
638 #[test]
639 fn reader_without_backend_hash_does_not_reject() {
640 let alloc = fresh_alloc(4);
643 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
644 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
645 writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
646 let got = reader.read().expect("read").expect("some");
647 assert_eq!(got, Pose { x: 1, y: 2, z: 3 });
648 }
649
650 #[test]
651 fn read_blocking_wakes_on_commit() {
652 use std::time::Duration;
655 let alloc = fresh_alloc(4);
656 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
657 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
658
659 let w_alloc = Arc::clone(&alloc);
660 let h = std::thread::spawn(move || {
661 std::thread::sleep(Duration::from_millis(50));
662 FlatWriter::<Pose>::new(w_alloc, 0b1)
663 .write(&Pose { x: 7, y: 8, z: 9 })
664 .expect("write");
665 });
666 let _ = writer; let start = std::time::Instant::now();
669 let got = reader
670 .read_blocking(Duration::from_secs(5))
671 .expect("read_blocking")
672 .expect("woken with a sample");
673 assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
674 assert!(
675 start.elapsed() < Duration::from_secs(2),
676 "should wake on notify, not spin to timeout"
677 );
678 h.join().unwrap();
679 }
680
681 #[test]
682 fn read_blocking_times_out_without_writer() {
683 use std::time::Duration;
684 let alloc = fresh_alloc(2);
685 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
686 let start = std::time::Instant::now();
687 let got = reader.read_blocking(Duration::from_millis(60)).expect("rb");
688 assert!(got.is_none());
689 assert!(start.elapsed() >= Duration::from_millis(50));
690 }
691
692 #[test]
693 fn write_bp_best_effort_drops_when_full() {
694 use std::time::Duration;
695 let alloc = fresh_alloc(1);
696 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
697 assert!(matches!(
699 writer
700 .write_bp(
701 &Pose { x: 1, y: 1, z: 1 },
702 Reliability::BestEffort,
703 Duration::ZERO
704 )
705 .unwrap(),
706 WriteOutcome::Written(_)
707 ));
708 assert_eq!(
710 writer
711 .write_bp(
712 &Pose { x: 2, y: 2, z: 2 },
713 Reliability::BestEffort,
714 Duration::ZERO
715 )
716 .unwrap(),
717 WriteOutcome::Dropped
718 );
719 }
720
721 #[test]
722 fn write_bp_reliable_blocks_until_reader_frees() {
723 use std::time::Duration;
725 let alloc = fresh_alloc(1);
726 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
727 let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
728 writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("fill");
729
730 let r_alloc = Arc::clone(&alloc);
731 let h = std::thread::spawn(move || {
732 std::thread::sleep(Duration::from_millis(50));
733 let r = FlatReader::<Pose>::new(r_alloc, 0);
734 r.read().expect("read").expect("some"); });
736 let _ = reader;
737
738 let start = std::time::Instant::now();
739 let outcome = writer
740 .write_bp(
741 &Pose { x: 2, y: 2, z: 2 },
742 Reliability::Reliable,
743 Duration::from_secs(5),
744 )
745 .expect("write_bp");
746 assert!(matches!(outcome, WriteOutcome::Written(_)));
747 assert!(start.elapsed() < Duration::from_secs(2));
748 h.join().unwrap();
749 }
750
751 #[test]
752 fn write_bp_reliable_times_out() {
753 use std::time::Duration;
754 let alloc = fresh_alloc(1);
755 let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
756 writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("fill");
757 let start = std::time::Instant::now();
758 let outcome = writer
759 .write_bp(
760 &Pose { x: 2, y: 2, z: 2 },
761 Reliability::Reliable,
762 Duration::from_millis(60),
763 )
764 .expect("write_bp");
765 assert_eq!(outcome, WriteOutcome::TimedOut);
766 assert!(start.elapsed() >= Duration::from_millis(50));
767 }
768}