Skip to main content

zerodds_flatdata/
pubsub.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! FlatWriter + FlatReader — high-level API over the SlotAllocator.
4//!
5//! Spec: zerodds-flatdata-1.0 §8 + §9.
6
7use alloc::sync::Arc;
8use alloc::vec::Vec;
9use core::marker::PhantomData;
10use core::ops::Deref;
11use core::time::Duration;
12
/// Delivery guarantee applied by [`FlatWriter::write_bp`] when every slot is
/// occupied (Spec §10.5).
///
/// * `Reliable` — the writer waits (event-driven) for a slot to free up and
///   gives up only once the caller's deadline has passed.
/// * `BestEffort` — the writer never waits; the sample is dropped on the spot.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Reliability {
    /// Wait for a free slot until the timeout expires; within that window no
    /// sample is lost.
    Reliable,
    /// Give up immediately and drop the sample when no slot is free.
    BestEffort,
}
24
/// Result of a backpressure-aware write ([`FlatWriter::write_bp`]) that did
/// not fail with a hard slot error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WriteOutcome {
    /// The sample made it into a slot; the payload is its sequence number.
    Written(u32),
    /// No slot was free and the policy was `BestEffort`, so the sample was
    /// discarded.
    Dropped,
    /// No slot became free before the `Reliable` deadline expired.
    TimedOut,
}
35
36use crate::FlatStruct;
37use crate::allocator::{InMemorySlotAllocator, SlotError, SlotHandle};
38use crate::backend::SlotBackend;
39use crate::slot::ReaderMask;
40
41/// Writes FlatStruct samples directly into SHM slots — without CDR encoding.
42pub struct FlatWriter<T: FlatStruct> {
43    alloc: Arc<InMemorySlotAllocator>,
44    active_readers_mask: ReaderMask,
45    _t: PhantomData<fn() -> T>,
46}
47
48impl<T: FlatStruct> FlatWriter<T> {
49    /// Creates a writer over an allocator. `active_readers_mask`
50    /// lists the reader bits that must all have read before
51    /// a slot can be reused.
52    pub fn new(alloc: Arc<InMemorySlotAllocator>, active_readers_mask: ReaderMask) -> Self {
53        Self {
54            alloc,
55            active_readers_mask,
56            _t: PhantomData,
57        }
58    }
59
60    /// Spec §8.1 write_flat — reserve + memcpy + commit in a single call.
61    ///
62    /// # Errors
63    /// `SlotError::NoFreeSlot` under resource pressure;
64    /// `SampleTooLarge` when the slot is smaller than T::WIRE_SIZE.
65    pub fn write(&self, sample: &T) -> Result<u32, SlotError> {
66        let bytes = sample.as_bytes();
67        let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
68        match self.alloc.commit_slot(handle, bytes) {
69            Ok(sn) => Ok(sn),
70            Err(e) => {
71                let _ = self.alloc.discard_slot(handle);
72                Err(e)
73            }
74        }
75    }
76
77    /// Spec §10.5 backpressure-aware write. On `NoFreeSlot`: `BestEffort`
78    /// drops the sample (`WriteOutcome::Dropped`); `Reliable` **blocks**
79    /// event-driven on the allocator's notify until a slot frees or `timeout`
80    /// elapses (`WriteOutcome::TimedOut`) — no busy-poll.
81    ///
82    /// # Errors
83    /// Non-`NoFreeSlot` slot errors (e.g. `SampleTooLarge`, lock poison).
84    pub fn write_bp(
85        &self,
86        sample: &T,
87        reliability: Reliability,
88        timeout: Duration,
89    ) -> Result<WriteOutcome, SlotError> {
90        let deadline = std::time::Instant::now() + timeout;
91        loop {
92            match self.write(sample) {
93                Ok(sn) => return Ok(WriteOutcome::Written(sn)),
94                Err(SlotError::NoFreeSlot) => {}
95                Err(e) => return Err(e),
96            }
97            if reliability == Reliability::BestEffort {
98                return Ok(WriteOutcome::Dropped);
99            }
100            let now = std::time::Instant::now();
101            if now >= deadline {
102                return Ok(WriteOutcome::TimedOut);
103            }
104            // Capture the notify generation, re-try once (a slot may have freed
105            // between the failed write and here), then block until a change.
106            let g = self.alloc.notify_gen();
107            match self.write(sample) {
108                Ok(sn) => return Ok(WriteOutcome::Written(sn)),
109                Err(SlotError::NoFreeSlot) => {}
110                Err(e) => return Err(e),
111            }
112            self.alloc.wait_for_change(g, deadline - now);
113        }
114    }
115
116    /// Spec §8.2 loan_slot — lower level: explicit slot borrow.
117    /// The caller writes directly into the loaned `&mut T` buffer and
118    /// commits.
119    ///
120    /// # Errors
121    /// `NoFreeSlot`.
122    pub fn loan_slot(&self) -> Result<FlatSlot<'_, T>, SlotError> {
123        let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
124        Ok(FlatSlot {
125            handle,
126            writer: self,
127            committed: false,
128        })
129    }
130}
131
132/// Loaned slot. The caller sets the sample via `write()` and then
133/// calls `commit()`. Dropping without a commit discards the loan.
134pub struct FlatSlot<'a, T: FlatStruct> {
135    handle: SlotHandle,
136    writer: &'a FlatWriter<T>,
137    committed: bool,
138}
139
140impl<T: FlatStruct> FlatSlot<'_, T> {
141    /// Writes the sample into the slot.
142    ///
143    /// # Errors
144    /// As for `commit_slot`.
145    pub fn commit(mut self, sample: T) -> Result<u32, SlotError> {
146        let bytes = sample.as_bytes();
147        let sn = self.writer.alloc.commit_slot(self.handle, bytes)?;
148        self.committed = true;
149        Ok(sn)
150    }
151
152    /// True zero-copy (Spec §8.2): a `&mut T` view **directly onto the SHM
153    /// slot**. The slot's `WIRE_SIZE` bytes are zeroed first (so the all-zero
154    /// bit pattern is a valid `T` — `bool` fields are `false`, etc.); the
155    /// caller fills the fields in place, then calls [`Self::commit_in_place`].
156    /// No staging buffer and no copy on commit.
157    ///
158    /// # Errors
159    /// `NoFreeSlot`/`OutOfBounds`/`SampleTooLarge` from the backend.
160    pub fn as_mut(&mut self) -> Result<&mut T, SlotError> {
161        let (ptr, cap) = self.writer.alloc.slot_data_ptr(self.handle)?;
162        if cap < T::WIRE_SIZE {
163            return Err(SlotError::SampleTooLarge {
164                sample: T::WIRE_SIZE,
165                slot_capacity: cap,
166            });
167        }
168        // SAFETY: the slot is exclusively reserved (loaned) for this `FlatSlot`,
169        // `ptr` is the slot data area with `cap >= WIRE_SIZE` bytes, and `T` is a
170        // `repr(C)` `Copy` POD (FlatStruct contract). Zeroing makes the all-zero
171        // bit pattern a valid `T` before forming the reference.
172        unsafe {
173            core::ptr::write_bytes(ptr, 0, T::WIRE_SIZE);
174            Ok(&mut *ptr.cast::<T>())
175        }
176    }
177
178    /// Commits a slot whose `T` was written in place via [`Self::as_mut`] —
179    /// no copy (counterpart to [`Self::commit`]). Returns the SN.
180    ///
181    /// # Errors
182    /// As for `commit_in_place`.
183    pub fn commit_in_place(mut self) -> Result<u32, SlotError> {
184        let sn = self
185            .writer
186            .alloc
187            .commit_in_place(self.handle, T::WIRE_SIZE)?;
188        self.committed = true;
189        Ok(sn)
190    }
191}
192
193impl<T: FlatStruct> Drop for FlatSlot<'_, T> {
194    fn drop(&mut self) {
195        if !self.committed {
196            let _ = self.writer.alloc.discard_slot(self.handle);
197        }
198    }
199}
200
201/// Reads FlatStruct samples directly from SHM slots.
202pub struct FlatReader<T: FlatStruct> {
203    alloc: Arc<InMemorySlotAllocator>,
204    /// Which bit in reader_mask belongs to this reader.
205    reader_index: u8,
206    /// Last read sequence number (prevents duplicates).
207    last_sn: core::sync::atomic::AtomicU32,
208    /// Type hash of the topic — for the version check.
209    expected_type_hash: [u8; 16],
210    _t: PhantomData<fn() -> T>,
211}
212
213impl<T: FlatStruct> FlatReader<T> {
214    /// Creates a reader over an allocator.
215    pub fn new(alloc: Arc<InMemorySlotAllocator>, reader_index: u8) -> Self {
216        Self {
217            alloc,
218            reader_index,
219            last_sn: core::sync::atomic::AtomicU32::new(u32::MAX),
220            expected_type_hash: T::TYPE_HASH,
221            _t: PhantomData,
222        }
223    }
224
225    /// Returns the type hash — for discovery matching.
226    #[must_use]
227    pub fn type_hash(&self) -> [u8; 16] {
228        self.expected_type_hash
229    }
230
231    /// Spec §9.1 read_flat. Returns the **newest** unread sample.
232    /// Automatically sets this reader's bit in reader_mask.
233    ///
234    /// Spec §6.1 type-hash cross-validation: before reading a slot,
235    /// `T::TYPE_HASH` is checked against the hash stored on the
236    /// SlotBackend. On mismatch: no slots are dereferenced,
237    /// `Err(SlotError::SampleTooLarge)` with a schema-drift indication.
238    ///
239    /// # Errors
240    /// - `SampleTooLarge` on TYPE_HASH mismatch (Spec §6.1).
241    /// - Wire/layout errors or slot-lock poison as usual.
242    pub fn read(&self) -> Result<Option<T>, SlotError> {
243        // Copy-out variant: the reader bit is set on every scanned slot
244        // (incl. the delivered one), so the slot can be recycled immediately.
245        Ok(self.scan_best(false)?.map(|(_, _, t)| t))
246    }
247
248    /// Spec §9.2/§9.3 read_flat (reference variant). Returns the newest
249    /// unread sample wrapped in a [`FlatSampleRef`] whose `Drop` sets this
250    /// reader's bit — i.e. the delivered slot stays **un-recyclable** for the
251    /// lifetime of the returned reference (zero-copy lifetime binding), and is
252    /// released when the reference drops. All other scanned slots are marked
253    /// read immediately, as in [`Self::read`].
254    ///
255    /// # Errors
256    /// As [`Self::read`].
257    pub fn read_ref(&self) -> Result<Option<FlatSampleRef<T>>, SlotError> {
258        match self.scan_best(true)? {
259            Some((handle, _, sample)) => {
260                let concrete = Arc::clone(&self.alloc);
261                let backend: Arc<dyn SlotBackend> = concrete;
262                Ok(Some(FlatSampleRef::with_release(
263                    sample,
264                    backend,
265                    handle,
266                    self.reader_index,
267                )))
268            }
269            None => Ok(None),
270        }
271    }
272
273    /// Spec §4.2 event-driven read. Returns the newest unread sample, blocking
274    /// on the allocator's notify (NO busy-poll) until one arrives or `timeout`
275    /// elapses (then `Ok(None)`). The writer's `commit_slot` bumps the notify
276    /// generation, waking this reader.
277    ///
278    /// # Errors
279    /// As [`Self::read`].
280    pub fn read_blocking(&self, timeout: Duration) -> Result<Option<T>, SlotError> {
281        let deadline = std::time::Instant::now() + timeout;
282        loop {
283            if let Some(sample) = self.read()? {
284                return Ok(Some(sample));
285            }
286            let now = std::time::Instant::now();
287            if now >= deadline {
288                return Ok(None);
289            }
290            // Capture gen, re-check (a sample may have landed), then block until
291            // a change — lost-wakeup-free.
292            let g = self.alloc.notify_gen();
293            if let Some(sample) = self.read()? {
294                return Ok(Some(sample));
295            }
296            self.alloc.wait_for_change(g, deadline - now);
297        }
298    }
299
300    /// Shared scan for [`Self::read`] / [`Self::read_ref`]: finds the newest
301    /// unread sample and marks this reader's bit on every scanned unread slot.
302    /// When `defer_best` is true the delivered ("best") slot is **not** marked
303    /// here — the caller defers that to [`FlatSampleRef`]'s `Drop`.
304    fn scan_best(&self, defer_best: bool) -> Result<Option<(SlotHandle, u32, T)>, SlotError> {
305        // Spec §6.1: type-hash cross-validation. If the backend provides a
306        // hash, it must match `T::TYPE_HASH`; otherwise it is schema drift and
307        // we reject the read without dereferencing any slot.
308        if let Some(backend_hash) = SlotBackend::type_hash(&*self.alloc) {
309            if backend_hash != self.expected_type_hash {
310                return Err(SlotError::SampleTooLarge {
311                    sample: 0,
312                    slot_capacity: 0,
313                });
314            }
315        }
316        let count = self.alloc.slot_count()?;
317        let last_seen = self.last_sn.load(core::sync::atomic::Ordering::Relaxed);
318        let mut best: Option<(SlotHandle, u32, T)> = None;
319        // Slots scanned-and-unread; all get the reader bit (except the deferred
320        // best one). Collected first so the best is known before marking.
321        let mut to_mark: Vec<SlotHandle> = Vec::new();
322        for idx in 0..count {
323            let handle = SlotHandle {
324                segment_id: 0,
325                slot_index: idx as u32,
326            };
327            let (header, bytes) = self.alloc.read_slot(handle)?;
328            if header.sample_size == 0 {
329                continue; // unused
330            }
331            if (header.reader_mask & (1u32 << self.reader_index)) != 0 {
332                continue; // already read
333            }
334            if (bytes.len() as u32) < T::WIRE_SIZE as u32 {
335                continue; // too short
336            }
337            // SAFETY: WIRE_SIZE checked above + TYPE_HASH match above guards
338            // against schema drift (Spec §6.1).
339            let sample = unsafe { T::from_bytes_unchecked(&bytes) };
340            to_mark.push(handle);
341            let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
342            let beats_current = best
343                .as_ref()
344                .is_none_or(|(_, b_sn, _)| header.sequence_number > *b_sn);
345            if unseen && beats_current {
346                best = Some((handle, header.sequence_number, sample));
347            }
348        }
349        let best_handle = best.as_ref().map(|(h, _, _)| *h);
350        for handle in to_mark {
351            if defer_best && Some(handle) == best_handle {
352                continue; // released later by FlatSampleRef::Drop
353            }
354            self.alloc.mark_read(handle, self.reader_index)?;
355        }
356        if let Some((_, sn, _)) = best.as_ref() {
357            self.last_sn
358                .store(*sn, core::sync::atomic::Ordering::Relaxed);
359        }
360        Ok(best)
361    }
362}
363
364/// Deferred slot release carried by a [`FlatSampleRef`]: setting this reader's
365/// bit on the delivered slot when the reference drops, so the slot stays
366/// un-recyclable for the reference's lifetime. Holds the backend as a trait
367/// object so the same reference works over the in-memory and the POSIX/DCPS
368/// (`Arc<dyn SlotBackend>`) backends alike.
369struct DeferredRelease {
370    backend: Arc<dyn SlotBackend>,
371    handle: SlotHandle,
372    reader_index: u8,
373}
374
375/// Reference sample that holds its source slot for the duration of its
376/// lifetime and sets this reader's bit on `Drop` (releasing the slot for
377/// recycling). Spec §9.2/§9.3. Returned by [`FlatReader::read_ref`].
378///
379/// While a `FlatSampleRef` is alive the writer cannot reuse its slot — that is
380/// the zero-copy lifetime guarantee. Dropping it (or calling
381/// [`Self::into_inner`]) releases the slot.
382pub struct FlatSampleRef<T: FlatStruct> {
383    sample: T,
384    release: Option<DeferredRelease>,
385}
386
387impl<T: FlatStruct> FlatSampleRef<T> {
388    /// Wraps a read sample without a deferred release (plain owned copy).
389    #[must_use]
390    pub fn new(sample: T) -> Self {
391        Self {
392            sample,
393            release: None,
394        }
395    }
396
397    /// Wraps a sample together with the slot to release on `Drop`. The backend
398    /// is taken as `Arc<dyn SlotBackend>` so callers over either backend can
399    /// build the reference.
400    #[must_use]
401    pub(crate) fn with_release(
402        sample: T,
403        backend: Arc<dyn SlotBackend>,
404        handle: SlotHandle,
405        reader_index: u8,
406    ) -> Self {
407        Self {
408            sample,
409            release: Some(DeferredRelease {
410                backend,
411                handle,
412                reader_index,
413            }),
414        }
415    }
416
417    /// Consumes the wrapper and returns the (copied) sample. The slot is
418    /// released here (the wrapper's `Drop` runs as it goes out of scope).
419    #[must_use]
420    pub fn into_inner(self) -> T {
421        // `T: FlatStruct: Copy`, so this copies the value out; `self` is then
422        // dropped, running the deferred slot release.
423        self.sample
424    }
425}
426
427impl<T: FlatStruct> Deref for FlatSampleRef<T> {
428    type Target = T;
429    fn deref(&self) -> &T {
430        &self.sample
431    }
432}
433
434impl<T: FlatStruct> Drop for FlatSampleRef<T> {
435    fn drop(&mut self) {
436        if let Some(r) = &self.release {
437            // Release the slot: set this reader's bit so it can be recycled.
438            let _ = r.backend.mark_read(r.handle, r.reader_index);
439        }
440    }
441}
442
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// Three-field fixture payload used by every test.
    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
    #[repr(C)]
    struct Pose {
        x: i64,
        y: i64,
        z: i64,
    }

    // SAFETY: repr(C) + Copy + 'static, all fields are primitive.
    unsafe impl FlatStruct for Pose {
        const TYPE_HASH: [u8; 16] = [0x42; 16];
    }

    /// Shorthand for building a [`Pose`].
    fn pose(x: i64, y: i64, z: i64) -> Pose {
        Pose { x, y, z }
    }

    /// Allocator with `slot_count` slots of 64 bytes and no type hash.
    fn fresh_alloc(slot_count: usize) -> Arc<InMemorySlotAllocator> {
        let allocator = InMemorySlotAllocator::new(0, slot_count, 64);
        Arc::new(allocator)
    }

    /// Writer with one active reader (bit 0) over `alloc`.
    fn writer_on(alloc: &Arc<InMemorySlotAllocator>) -> FlatWriter<Pose> {
        FlatWriter::<Pose>::new(Arc::clone(alloc), 0b1)
    }

    /// Reader with index 0 over `alloc`.
    fn reader_on(alloc: &Arc<InMemorySlotAllocator>) -> FlatReader<Pose> {
        FlatReader::<Pose>::new(Arc::clone(alloc), 0)
    }

    #[test]
    fn writer_write_then_reader_read() {
        let alloc = fresh_alloc(4);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        let sent = pose(1, 2, 3);
        let _sn = writer.write(&sent).expect("write");

        let received = reader.read().expect("read").expect("some");
        assert_eq!(received, sent);
    }

    #[test]
    fn reader_does_not_re_read_same_slot() {
        let alloc = fresh_alloc(4);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        writer.write(&pose(1, 2, 3)).expect("write");
        let _ = reader.read().expect("first read").expect("some");
        // Nothing new was written, so the second read must come back empty.
        let again = reader.read().expect("second read");
        assert!(again.is_none());
    }

    #[test]
    fn writer_loan_commit_pattern() {
        let alloc = fresh_alloc(2);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        let loan = writer.loan_slot().expect("loan");
        let _sn = loan.commit(pose(7, 8, 9)).expect("commit");

        let received = reader.read().expect("read").expect("some");
        assert_eq!(received, pose(7, 8, 9));
    }

    #[test]
    fn writer_loan_in_place_zero_copy() {
        // Zero-copy path: populate the `&mut Pose` view that lives in the slot,
        // commit without a staging copy, then read the value back.
        let alloc = fresh_alloc(2);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        let mut loan = writer.loan_slot().expect("loan");
        {
            let view = loan.as_mut().expect("in-place view");
            view.x = 11;
            view.y = 22;
            view.z = 33;
        }
        let _sn = loan.commit_in_place().expect("commit_in_place");

        let received = reader.read().expect("read").expect("some");
        assert_eq!(received, pose(11, 22, 33));
    }

    #[test]
    fn loan_drop_without_commit_releases_slot() {
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);

        // Take the only slot and let the loan go without committing.
        drop(writer.loan_slot().expect("loan"));

        // The slot must be available again.
        let _ = writer.loan_slot().expect("re-loan after drop");
    }

    #[test]
    fn reader_recycles_slot_after_read() {
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        // Round one fills and drains the single slot.
        writer.write(&pose(1, 1, 1)).expect("w1");
        let _ = reader.read().expect("r1").expect("some");

        // Round two only works if the read above set the reader bit and
        // thereby made the slot reusable.
        writer.write(&pose(2, 2, 2)).expect("w2");
        let received = reader.read().expect("r2").expect("some");
        assert_eq!(received, pose(2, 2, 2));
    }

    #[test]
    fn flat_sample_ref_deref() {
        let value = pose(1, 2, 3);
        let wrapped = FlatSampleRef::new(value);
        assert_eq!(wrapped.x, 1);
        assert_eq!(wrapped.into_inner(), value);
    }

    #[test]
    fn read_ref_holds_slot_until_drop() {
        // Spec §9.2/§9.3: with read_ref the reader bit is only set when the
        // FlatSampleRef drops, so a one-slot segment stays occupied meanwhile.
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        writer.write(&pose(1, 2, 3)).expect("write");
        let held = reader.read_ref().expect("read_ref").expect("some");
        assert_eq!(held.x, 1);
        assert_eq!(held.z, 3);

        // The reference pins the only slot, so a further write finds none.
        let blocked = writer.write(&pose(9, 9, 9));
        assert!(matches!(blocked, Err(SlotError::NoFreeSlot)));

        // Releasing the reference sets the reader bit and frees the slot.
        drop(held);
        writer.write(&pose(4, 5, 6)).expect("write after ref drop");
        let received = reader.read().expect("read").expect("some");
        assert_eq!(received, pose(4, 5, 6));
    }

    #[test]
    fn read_ref_into_inner_releases_slot() {
        // Consuming the reference runs its Drop, which releases the slot.
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        writer.write(&pose(1, 1, 1)).expect("write");
        let held = reader.read_ref().expect("read_ref").expect("some");
        let owned = held.into_inner();
        assert_eq!(owned, pose(1, 1, 1));
        // The single slot is writable again.
        writer.write(&pose(2, 2, 2)).expect("write after into_inner");
    }

    #[test]
    fn reader_rejects_type_hash_mismatch() {
        // Spec §6.1: a backend hash that differs from `T::TYPE_HASH` is
        // schema drift and must be rejected.
        let foreign_hash = [0xBB; 16];
        let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(foreign_hash));
        let reader = reader_on(&alloc);
        let outcome = reader.read();
        assert!(matches!(outcome, Err(SlotError::SampleTooLarge { .. })));
    }

    #[test]
    fn reader_accepts_matching_type_hash() {
        // Spec §6.1: a matching backend hash passes validation; with nothing
        // written the read is simply empty.
        let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(Pose::TYPE_HASH));
        let reader = reader_on(&alloc);
        let outcome = reader.read().expect("no schema drift");
        assert!(outcome.is_none());
    }

    #[test]
    fn reader_without_backend_hash_does_not_reject() {
        // An allocator built without `with_type_hash` skips validation and
        // the reader works as usual.
        let alloc = fresh_alloc(4);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);
        writer.write(&pose(1, 2, 3)).expect("write");
        let received = reader.read().expect("read").expect("some");
        assert_eq!(received, pose(1, 2, 3));
    }

    #[test]
    fn read_blocking_wakes_on_commit() {
        // Spec §4.2: read_blocking parks on the notify and the writer's commit
        // wakes it, so it returns long before the 5s timeout.
        let alloc = fresh_alloc(4);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);

        let producer_alloc = Arc::clone(&alloc);
        let producer = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            FlatWriter::<Pose>::new(producer_alloc, 0b1)
                .write(&pose(7, 8, 9))
                .expect("write");
        });
        let _ = writer; // writer used only to pin the active-mask in scope

        let started = Instant::now();
        let received = reader
            .read_blocking(Duration::from_secs(5))
            .expect("read_blocking")
            .expect("woken with a sample");
        assert_eq!(received, pose(7, 8, 9));
        assert!(
            started.elapsed() < Duration::from_secs(2),
            "should wake on notify, not spin to timeout"
        );
        producer.join().unwrap();
    }

    #[test]
    fn read_blocking_times_out_without_writer() {
        let alloc = fresh_alloc(2);
        let reader = reader_on(&alloc);
        let started = Instant::now();
        let outcome = reader.read_blocking(Duration::from_millis(60)).expect("rb");
        assert!(outcome.is_none());
        assert!(started.elapsed() >= Duration::from_millis(50));
    }

    #[test]
    fn write_bp_best_effort_drops_when_full() {
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);
        // The first write occupies the single slot.
        let first = writer
            .write_bp(&pose(1, 1, 1), Reliability::BestEffort, Duration::ZERO)
            .unwrap();
        assert!(matches!(first, WriteOutcome::Written(_)));
        // No reader frees it, so the next BestEffort write is dropped.
        let second = writer
            .write_bp(&pose(2, 2, 2), Reliability::BestEffort, Duration::ZERO)
            .unwrap();
        assert_eq!(second, WriteOutcome::Dropped);
    }

    #[test]
    fn write_bp_reliable_blocks_until_reader_frees() {
        // Spec §10.5: a Reliable write parks until a reader consumes the slot.
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);
        let reader = reader_on(&alloc);
        writer.write(&pose(1, 1, 1)).expect("fill");

        let consumer_alloc = Arc::clone(&alloc);
        let consumer = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            let late_reader = FlatReader::<Pose>::new(consumer_alloc, 0);
            late_reader.read().expect("read").expect("some"); // frees the slot
        });
        let _ = reader;

        let started = Instant::now();
        let outcome = writer
            .write_bp(
                &pose(2, 2, 2),
                Reliability::Reliable,
                Duration::from_secs(5),
            )
            .expect("write_bp");
        assert!(matches!(outcome, WriteOutcome::Written(_)));
        assert!(started.elapsed() < Duration::from_secs(2));
        consumer.join().unwrap();
    }

    #[test]
    fn write_bp_reliable_times_out() {
        let alloc = fresh_alloc(1);
        let writer = writer_on(&alloc);
        writer.write(&pose(1, 1, 1)).expect("fill");
        let started = Instant::now();
        let outcome = writer
            .write_bp(
                &pose(2, 2, 2),
                Reliability::Reliable,
                Duration::from_millis(60),
            )
            .expect("write_bp");
        assert_eq!(outcome, WriteOutcome::TimedOut);
        assert!(started.elapsed() >= Duration::from_millis(50));
    }
}