Skip to main content

zerodds_flatdata/
allocator.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors
//! In-memory slot allocator: the reference implementation of the
//! [`SlotBackend`](crate::SlotBackend) trait. It serves single-process,
//! same-host pub/sub without mmap overhead — in tests, and as the
//! default backend when `posix-mmap` is disabled. The
//! cross-process mmap backend lives in `posix.rs` and implements
//! the same trait API.
//!
//! Spec: zerodds-flatdata-1.0 §4.1, §5.1.
11
12use alloc::sync::Arc;
13use alloc::vec::Vec;
14
15#[cfg(feature = "std")]
16use std::sync::{Condvar, Mutex};
17
18use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
19
/// Event-driven notification primitive for the in-memory backend (Spec §4.2).
///
/// A wrapping generation counter guarded by a `Mutex` and paired with a
/// `Condvar`. The counter is bumped on every state change a waiter could care
/// about (sample committed → wakes blocked readers; slot freed → wakes blocked
/// writers). Waiters re-check their own condition after waking, so a single
/// counter serves both directions without busy-polling.
#[cfg(feature = "std")]
#[derive(Debug, Default)]
struct ChangeNotify {
    /// Generation counter; only read or modified while the mutex is held.
    generation: Mutex<u64>,
    /// Signalled with `notify_all` on every bump.
    cvar: Condvar,
}

#[cfg(feature = "std")]
impl ChangeNotify {
    /// Locks the generation counter, recovering the guard when the mutex is
    /// poisoned.
    ///
    /// The protected value is a plain `u64` that cannot be left half-updated,
    /// so continuing after a poisoning panic is sound. Swallowing the poison
    /// instead would make `current` report 0, `bump` lose wake-ups and
    /// `wait_change` return immediately.
    fn lock_generation(&self) -> std::sync::MutexGuard<'_, u64> {
        self.generation
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Returns the current generation.
    fn current(&self) -> u64 {
        *self.lock_generation()
    }

    /// Advances the generation by one (wrapping) and wakes every waiter.
    fn bump(&self) {
        let mut generation = self.lock_generation();
        *generation = generation.wrapping_add(1);
        // Notify while the guard is still held, exactly like the counter
        // update itself, so a waiter never observes the signal before the
        // new generation.
        self.cvar.notify_all();
    }

    /// Blocks until the generation differs from `last` or `timeout` elapses.
    fn wait_change(&self, last: u64, timeout: core::time::Duration) {
        let guard = self.lock_generation();
        // `wait_timeout_while` re-checks the predicate after every wake-up,
        // absorbing spurious ones; it returns once the generation differs
        // from `last` or the deadline passes. The result is irrelevant here:
        // callers re-check their own condition afterwards.
        let _ = self
            .cvar
            .wait_timeout_while(guard, timeout, |current| *current == last);
    }
}
55
/// Address of a single slot: the pair `(segment_id, slot_index)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SlotHandle {
    /// Identifier of the owning segment (FNV hash of the segment path).
    pub segment_id: u64,
    /// Zero-based position of the slot; valid values lie in `[0, slot_count)`.
    pub slot_index: u32,
}
64
/// Failure modes of slot management.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlotError {
    /// Every slot is occupied; nothing can be reserved.
    NoFreeSlot,
    /// The slot index lies outside `[0, slot_count)`.
    OutOfBounds,
    /// The sample is larger than a slot's data area.
    SampleTooLarge {
        /// Size of the rejected sample in bytes.
        sample: usize,
        /// Configured size of the slot data area in bytes.
        slot_capacity: usize,
    },
    /// The slot lock is poisoned and could not be acquired.
    LockPoisoned,
    /// The backend offers no in-place loan
    /// (`slot_data_ptr`/`commit_in_place`).
    InPlaceUnsupported,
}

impl core::fmt::Display for SlotError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Only `SampleTooLarge` carries data; every other variant maps to a
        // fixed string that is written verbatim.
        let message = match self {
            Self::SampleTooLarge {
                sample,
                slot_capacity,
            } => {
                return write!(
                    f,
                    "sample {sample} byte does not fit in slot capacity {slot_capacity}"
                );
            }
            Self::NoFreeSlot => "no free slot in segment",
            Self::OutOfBounds => "slot index out of bounds",
            Self::LockPoisoned => "slot lock poisoned",
            Self::InPlaceUnsupported => "backend does not support in-place loan",
        };
        f.write_str(message)
    }
}

#[cfg(feature = "std")]
impl std::error::Error for SlotError {}
106
/// In-memory slot allocator. Provides the same
/// [`SlotBackend`](crate::SlotBackend) interface as the POSIX mmap backend
/// (`posix.rs`), but keeps its slots on the process heap — for
/// single-process pub/sub and test setups without an mmap dependency.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct InMemorySlotAllocator {
    /// All slots, guarded by one mutex.
    slots: Arc<Mutex<Vec<Slot>>>,
    /// Segment identifier stamped into every [`SlotHandle`] handed out.
    segment_id: u64,
    /// Size of each slot's data area in bytes (header excluded).
    slot_capacity: usize,
    /// Source of sequence numbers; each commit takes the current value and
    /// increments it.
    next_sn: Arc<core::sync::atomic::AtomicU32>,
    /// Optional type-hash for cross-validation (spec §6.1).
    type_hash: Option<[u8; 16]>,
    /// Event-driven notify (Spec §4.2): bumped on commit (wake readers) and
    /// whenever a slot may have become free (wake writers in backpressure).
    /// Held behind an `Arc` so that every holder of this shared state
    /// observes the same generation.
    // NOTE(review): no `Clone` impl is visible in this file — confirm how the
    // `Arc`-wrapped fields are meant to be shared.
    notify: Arc<ChangeNotify>,
}
124
/// One slot of the in-memory backend: header, data area and writer-side
/// bookkeeping.
///
/// Gated on `std` like its only user, [`InMemorySlotAllocator`]: the
/// `committed_at` field needs `std::time::Instant`, which does not exist in a
/// build without the `std` feature.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
struct Slot {
    /// Header of the sample currently stored in the slot.
    header: SlotHeader,
    /// Data area; allocated once with the allocator's slot capacity.
    data: Vec<u8>,
    /// `true` when a loan is currently active (writer has reserved,
    /// not yet committed/discarded).
    loaned: bool,
    /// Wall-clock instant the current sample was committed — writer-side,
    /// out-of-band (not in the wire header). Drives the §5.1 timeout-eviction
    /// of slots held hostage by a hung reader. `None` = never committed.
    committed_at: Option<std::time::Instant>,
}
137
// Trait adapter: every method forwards to the inherent method of the same
// name. The calls are written as fully-qualified paths so that they resolve
// to the inherent implementation rather than recursing into this trait impl.
#[cfg(feature = "std")]
impl crate::backend::SlotBackend for InMemorySlotAllocator {
    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        InMemorySlotAllocator::reserve_slot(self, active_readers_mask)
    }

    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        InMemorySlotAllocator::commit_slot(self, handle, bytes)
    }

    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        InMemorySlotAllocator::discard_slot(self, handle)
    }

    fn slot_data_ptr(&self, handle: SlotHandle) -> Result<(*mut u8, usize), SlotError> {
        InMemorySlotAllocator::slot_data_ptr(self, handle)
    }

    fn commit_in_place(&self, handle: SlotHandle, len: usize) -> Result<u32, SlotError> {
        InMemorySlotAllocator::commit_in_place(self, handle, len)
    }

    fn slot_read_ptr(&self, handle: SlotHandle) -> Result<(*const u8, usize), SlotError> {
        InMemorySlotAllocator::slot_read_ptr(self, handle)
    }

    fn next_unread_slot(&self, reader_index: u8) -> Result<Option<SlotHandle>, SlotError> {
        InMemorySlotAllocator::next_unread_slot(self, reader_index)
    }

    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        InMemorySlotAllocator::read_slot(self, handle)
    }

    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        InMemorySlotAllocator::mark_read(self, handle, reader_index)
    }

    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        InMemorySlotAllocator::mark_reader_disconnected(self, reader_index)
    }

    fn slot_count(&self) -> Result<usize, SlotError> {
        InMemorySlotAllocator::slot_count(self)
    }

    fn slot_total_size(&self) -> usize {
        InMemorySlotAllocator::slot_total_size(self)
    }

    fn slot_capacity(&self) -> usize {
        InMemorySlotAllocator::slot_capacity(self)
    }

    // The type-hash is a plain field; there is no inherent getter to call.
    fn type_hash(&self) -> Option<[u8; 16]> {
        self.type_hash
    }

    fn notify_generation(&self) -> u64 {
        InMemorySlotAllocator::notify_gen(self)
    }

    fn wait_for_change(&self, last: u64, timeout: core::time::Duration) {
        InMemorySlotAllocator::wait_for_change(self, last, timeout);
    }
}
189
#[cfg(feature = "std")]
impl InMemorySlotAllocator {
    /// Creates a new allocator with `slot_count` slots, each with a zeroed
    /// `slot_capacity`-byte data area, an empty header and no active loan.
    /// Sequence numbers start at 0 and no type-hash is set.
    #[must_use]
    pub fn new(segment_id: u64, slot_count: usize, slot_capacity: usize) -> Self {
        let slots: Vec<Slot> = (0..slot_count)
            .map(|_| Slot {
                header: SlotHeader::new(0, 0),
                data: alloc::vec![0u8; slot_capacity],
                loaned: false,
                committed_at: None,
            })
            .collect();
        Self {
            slots: Arc::new(Mutex::new(slots)),
            segment_id,
            slot_capacity,
            next_sn: Arc::new(core::sync::atomic::AtomicU32::new(0)),
            type_hash: None,
            notify: Arc::new(ChangeNotify::default()),
        }
    }

    /// Current notify generation. Capture it before checking a condition and
    /// pass it to [`Self::wait_for_change`] to avoid a lost wakeup.
    #[must_use]
    pub fn notify_gen(&self) -> u64 {
        self.notify.current()
    }

    /// Blocks until the notify generation differs from `last` or `timeout`
    /// elapses (event-driven, Spec §4.2 — no busy-poll). Pair with
    /// [`Self::notify_gen`] for lost-wakeup-free waiting.
    pub fn wait_for_change(&self, last: u64, timeout: core::time::Duration) {
        self.notify.wait_change(last, timeout);
    }

    /// Spec §6.1: allocator with an associated type-hash. A reader
    /// reading against this backend checks the hash against
    /// `T::TYPE_HASH` and drops on mismatch.
    #[must_use]
    pub fn with_type_hash(mut self, hash: [u8; 16]) -> Self {
        self.type_hash = Some(hash);
        self
    }

    /// Mask bit belonging to `reader_index`, or `None` when the index does
    /// not fit the 32-bit reader mask.
    ///
    /// A plain `1u32 << reader_index` panics in debug builds and wraps in
    /// release builds for indices ≥ 32, which would alias another reader's
    /// bit.
    fn reader_bit(reader_index: u8) -> Option<u32> {
        1u32.checked_shl(u32::from(reader_index))
    }

    /// Shared commit path of [`Self::commit_slot`] and
    /// [`Self::commit_in_place`]: validates `len`, assigns the next sequence
    /// number, rewrites the header, optionally copies `payload` into the data
    /// area, releases the loan, stamps the commit time and wakes waiters.
    fn publish(
        &self,
        handle: SlotHandle,
        len: usize,
        payload: Option<&[u8]>,
    ) -> Result<u32, SlotError> {
        if len > self.slot_capacity {
            return Err(SlotError::SampleTooLarge {
                sample: len,
                slot_capacity: self.slot_capacity,
            });
        }
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        // The sequence number is only consumed once the handle is known to
        // be valid, so failed commits leave no gaps.
        let sn = self
            .next_sn
            .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
        let sample_size = u32::try_from(len).unwrap_or(u32::MAX);
        slot.header = SlotHeader::new(sn, sample_size);
        if let Some(bytes) = payload {
            slot.data[..bytes.len()].copy_from_slice(bytes);
        }
        slot.loaned = false;
        slot.committed_at = Some(std::time::Instant::now());
        drop(slots);
        self.notify.bump(); // new sample → wake blocked readers (§4.2)
        Ok(sn)
    }

    /// Spec §4.1 reserve_slot. Loans out the first slot that is not already
    /// loaned and is either empty (`sample_size == 0`) or read by every
    /// reader in `active_readers_mask`.
    ///
    /// # Errors
    /// `NoFreeSlot` when no such slot exists; `LockPoisoned` on lock poison.
    pub fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        for (idx, slot) in slots.iter_mut().enumerate() {
            if slot.loaned {
                continue;
            }
            let reusable =
                slot.header.sample_size == 0 || slot.header.all_read(active_readers_mask);
            if !reusable {
                continue;
            }
            // A handle addresses slots with a `u32`; positions beyond that
            // range cannot be handed out, and neither can any later one.
            let Ok(slot_index) = u32::try_from(idx) else {
                break;
            };
            slot.loaned = true;
            return Ok(SlotHandle {
                segment_id: self.segment_id,
                slot_index,
            });
        }
        Err(SlotError::NoFreeSlot)
    }

    /// Spec §4.1 commit_slot. Copies the sample bytes into the slot, sets a
    /// fresh `SlotHeader` (next sequence number, sample size), releases the
    /// loan and wakes waiters. Returns the assigned sequence number.
    ///
    /// The slot stays loaned when this returns an error; the caller decides
    /// whether to retry or [`Self::discard_slot`].
    ///
    /// # Errors
    /// `SampleTooLarge`, `OutOfBounds`, or `LockPoisoned`.
    pub fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        self.publish(handle, bytes.len(), Some(bytes))
    }

    /// In-place loan: writable pointer + capacity into a reserved slot's data
    /// area (see [`SlotBackend::slot_data_ptr`](crate::SlotBackend::slot_data_ptr)).
    ///
    /// The pointer is handed out after the slot lock has been released.
    ///
    /// # Errors
    /// `OutOfBounds` if the index is invalid or the slot is not a live loan;
    /// `LockPoisoned` on lock poison.
    pub fn slot_data_ptr(&self, handle: SlotHandle) -> Result<(*mut u8, usize), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        if !slot.loaned {
            return Err(SlotError::OutOfBounds);
        }
        Ok((slot.data.as_mut_ptr(), self.slot_capacity))
    }

    /// Finalizes an in-place loan (no copy) — sets the header for a sample of
    /// `len` bytes, releases the loan and wakes waiters (see
    /// [`SlotBackend::commit_in_place`](crate::SlotBackend::commit_in_place)).
    /// Returns the assigned sequence number.
    ///
    /// # Errors
    /// `SampleTooLarge`, `OutOfBounds`, or `LockPoisoned`.
    pub fn commit_in_place(&self, handle: SlotHandle, len: usize) -> Result<u32, SlotError> {
        self.publish(handle, len, None)
    }

    /// Zero-copy read pointer + sample length into a slot (see
    /// [`SlotBackend::slot_read_ptr`](crate::SlotBackend::slot_read_ptr)).
    ///
    /// The pointer is handed out after the slot lock has been released.
    ///
    /// # Errors
    /// `OutOfBounds` or `LockPoisoned`.
    pub fn slot_read_ptr(&self, handle: SlotHandle) -> Result<(*const u8, usize), SlotError> {
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        Ok((slot.data.as_ptr(), slot.header.sample_size as usize))
    }

    /// Next committed slot not yet read by `reader_index` (see
    /// [`SlotBackend::next_unread_slot`](crate::SlotBackend::next_unread_slot)).
    ///
    /// Slots under an active loan are skipped: their header still describes
    /// the previous sample while a writer may already be overwriting the data
    /// area. A `reader_index` outside the 32-bit mask owns no bit and
    /// therefore never has anything to read (`Ok(None)`).
    ///
    /// # Errors
    /// `LockPoisoned`.
    pub fn next_unread_slot(&self, reader_index: u8) -> Result<Option<SlotHandle>, SlotError> {
        let Some(bit) = Self::reader_bit(reader_index) else {
            return Ok(None);
        };
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        for (idx, slot) in slots.iter().enumerate() {
            if slot.loaned {
                continue;
            }
            if slot.header.sample_size > 0 && (slot.header.reader_mask & bit) == 0 {
                // Positions beyond `u32` cannot be expressed in a handle.
                let Ok(slot_index) = u32::try_from(idx) else {
                    break;
                };
                return Ok(Some(SlotHandle {
                    segment_id: self.segment_id,
                    slot_index,
                }));
            }
        }
        Ok(None)
    }

    /// Spec §5.1 timeout-eviction. Force-frees every committed, not-yet-fully-
    /// read, not-currently-loaned slot whose sample was committed at least
    /// `max_age` ago — by setting all `active_readers_mask` bits so the slot
    /// counts as read and becomes reservable again. Guards against a single
    /// hung/dead reader holding a slot hostage forever (the SPDP lease path
    /// handles clean disconnects; this is the backstop for a stuck-but-alive
    /// reader). Returns the number of slots evicted. The writer side calls this
    /// periodically (e.g. from the DCPS tick).
    ///
    /// # Errors
    /// `LockPoisoned`.
    pub fn evict_stale(
        &self,
        max_age: core::time::Duration,
        active_readers_mask: ReaderMask,
    ) -> Result<usize, SlotError> {
        let now = std::time::Instant::now();
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let mut evicted = 0usize;
        for slot in slots.iter_mut() {
            if slot.loaned || slot.header.sample_size == 0 {
                continue;
            }
            if slot.header.all_read(active_readers_mask) {
                continue; // already free
            }
            let stale = slot
                .committed_at
                .is_some_and(|t| now.saturating_duration_since(t) >= max_age);
            if stale {
                // Force-free: mark all active readers as read.
                slot.header.reader_mask |= active_readers_mask;
                evicted += 1;
            }
        }
        drop(slots);
        if evicted > 0 {
            self.notify.bump(); // slots freed → wake blocked writers (§4.2/§10.5)
        }
        Ok(evicted)
    }

    /// Spec §4.1 release_slot (no commit; discards the loan). The header and
    /// data area are left as they are.
    ///
    /// # Errors
    /// `OutOfBounds` or `LockPoisoned`.
    pub fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        slot.loaned = false;
        drop(slots);
        self.notify.bump(); // loan released → wake blocked writers
        Ok(())
    }

    /// Reader side: returns a copy of the slot header and of the first
    /// `sample_size` bytes of the data area (clamped to the data area).
    ///
    /// # Errors
    /// `OutOfBounds` or `LockPoisoned`.
    pub fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        let n = slot.header.sample_size as usize;
        Ok((slot.header, slot.data[..n.min(slot.data.len())].to_vec()))
    }

    /// Reader side: sets the `reader_index` bit in reader_mask.
    ///
    /// # Errors
    /// `OutOfBounds` or `LockPoisoned`.
    pub fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        slot.header.mark_read(reader_index);
        drop(slots);
        self.notify.bump(); // reader consumed slot → may free it → wake writers
        Ok(())
    }

    /// Spec §5.2 reader disconnect, applied retroactively.
    ///
    /// Called by the caller (SPDP lease-expiry hook) when a
    /// reader has died. Sets its bit on **all** slots, so that
    /// the slot allocator sees it as "has read" and frees up
    /// occupied slots again.
    ///
    /// A `reader_index` outside the 32-bit mask trips the debug assertion; in
    /// release builds it is ignored instead of aliasing another reader's bit.
    ///
    /// # Errors
    /// `LockPoisoned`.
    pub fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        debug_assert!(reader_index < 32);
        let Some(bit) = Self::reader_bit(reader_index) else {
            return Ok(());
        };
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        for slot in slots.iter_mut() {
            slot.header.reader_mask |= bit;
        }
        drop(slots);
        self.notify.bump(); // dead reader freed slots → wake blocked writers
        Ok(())
    }

    /// Slot capacity (data area; without header).
    #[must_use]
    pub fn slot_capacity(&self) -> usize {
        self.slot_capacity
    }

    /// Number of configured slots.
    ///
    /// # Errors
    /// `LockPoisoned`.
    pub fn slot_count(&self) -> Result<usize, SlotError> {
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        Ok(slots.len())
    }

    /// Computes the total slot size (header + data, padded to a
    /// 64-byte cache line) — for SEDP discovery.
    #[must_use]
    pub fn slot_total_size(&self) -> usize {
        let raw = SLOT_HEADER_SIZE + self.slot_capacity;
        // Round up to the next multiple of 64.
        (raw + 63) & !63
    }
}
502
#[cfg(all(test, feature = "std"))]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// One-byte sample used by the recycle / eviction / disconnect tests.
    const PAYLOAD: [u8; 1] = [0xAA];

    /// Reserves a slot under `mask` and commits [`PAYLOAD`] into it.
    fn publish_payload(backend: &InMemorySlotAllocator, mask: ReaderMask) -> SlotHandle {
        let handle = backend.reserve_slot(mask).unwrap();
        backend.commit_slot(handle, &PAYLOAD).unwrap();
        handle
    }

    #[test]
    fn reserve_returns_first_free_slot() {
        let backend = InMemorySlotAllocator::new(0, 4, 64);
        let first = backend.reserve_slot(0).expect("reserve 0");
        assert_eq!(first.slot_index, 0);
        let second = backend.reserve_slot(0).expect("reserve 1");
        assert_eq!(second.slot_index, 1);
    }

    #[test]
    fn reserve_returns_no_free_slot_when_all_loaned() {
        let backend = InMemorySlotAllocator::new(0, 2, 64);
        // Loan out both slots; the handles are kept only to document intent.
        let _first = backend.reserve_slot(0).unwrap();
        let _second = backend.reserve_slot(0).unwrap();
        assert_eq!(backend.reserve_slot(0), Err(SlotError::NoFreeSlot));
    }

    #[test]
    fn commit_writes_bytes_and_increments_sn() {
        let backend = InMemorySlotAllocator::new(0, 2, 64);

        let handle = backend.reserve_slot(0).unwrap();
        assert_eq!(backend.commit_slot(handle, &[1, 2, 3]).unwrap(), 0);

        let (header, bytes) = backend.read_slot(handle).unwrap();
        assert_eq!(header.sequence_number, 0);
        assert_eq!(header.sample_size, 3);
        assert_eq!(bytes, [1u8, 2, 3]);

        let next = backend.reserve_slot(0).unwrap();
        assert_eq!(backend.commit_slot(next, &[9]).unwrap(), 1);
    }

    #[test]
    fn in_place_loan_writes_without_staging_copy() {
        // Reserve → write directly into the slot via the raw pointer →
        // commit_in_place: the result must be byte-identical to commit_slot.
        let backend = InMemorySlotAllocator::new(0, 2, 64);
        let handle = backend.reserve_slot(0).unwrap();
        let (ptr, cap) = backend.slot_data_ptr(handle).unwrap();
        let sample = [1u8, 2, 3];
        assert!(cap >= sample.len());
        // SAFETY: the slot is reserved (loaned); `ptr` addresses its data area
        // of `cap` bytes, `cap >= sample.len()` was just checked, and `sample`
        // is a distinct stack array, so the ranges cannot overlap.
        unsafe {
            core::ptr::copy_nonoverlapping(sample.as_ptr(), ptr, sample.len());
        }
        assert_eq!(backend.commit_in_place(handle, sample.len()).unwrap(), 0);

        let (header, bytes) = backend.read_slot(handle).unwrap();
        assert_eq!(header.sequence_number, 0);
        assert_eq!(header.sample_size, 3);
        assert_eq!(bytes, sample);

        // The loan was released → the slot is reservable again.
        let again = backend.reserve_slot(0).unwrap();
        assert_eq!(backend.commit_in_place(again, 0).unwrap(), 1);
    }

    #[test]
    fn slot_data_ptr_rejects_unreserved_slot() {
        let backend = InMemorySlotAllocator::new(0, 2, 64);
        // Slot 0 was never reserved → no live loan.
        let never_reserved = SlotHandle {
            segment_id: 0,
            slot_index: 0,
        };
        assert_eq!(
            backend.slot_data_ptr(never_reserved),
            Err(SlotError::OutOfBounds)
        );
    }

    #[test]
    fn commit_in_place_too_large_returns_error() {
        let backend = InMemorySlotAllocator::new(0, 2, 8);
        let handle = backend.reserve_slot(0).unwrap();
        let outcome = backend.commit_in_place(handle, 9);
        assert!(matches!(outcome, Err(SlotError::SampleTooLarge { .. })));
    }

    #[test]
    fn commit_too_large_returns_error() {
        let backend = InMemorySlotAllocator::new(0, 2, 8);
        let handle = backend.reserve_slot(0).unwrap();
        let failure = backend.commit_slot(handle, &[0u8; 16]).unwrap_err();
        assert!(matches!(
            failure,
            SlotError::SampleTooLarge {
                sample: 16,
                slot_capacity: 8
            }
        ));
    }

    #[test]
    fn discard_frees_slot_for_reuse() {
        let backend = InMemorySlotAllocator::new(0, 1, 64);
        let handle = backend.reserve_slot(0).unwrap();
        backend.discard_slot(handle).unwrap();
        // The only slot can be reserved again.
        let _ = backend.reserve_slot(0).unwrap();
    }

    #[test]
    fn slot_recyclable_after_all_readers_marked() {
        let backend = InMemorySlotAllocator::new(0, 1, 64);
        // 2 active readers: bit 0 + bit 1.
        let active = 0b011;
        let handle = publish_payload(&backend, active);

        // Reservation fails — the slot has not been read by everyone yet.
        assert_eq!(backend.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // One of two readers is still not enough.
        backend.mark_read(handle, 0).unwrap();
        assert_eq!(backend.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // Second reader done → free.
        backend.mark_read(handle, 1).unwrap();
        let _ = backend.reserve_slot(active).unwrap();
    }

    #[test]
    fn slot_total_size_is_cache_line_padded() {
        let backend = InMemorySlotAllocator::new(0, 4, 100);
        // Header(16) + Data(100) = 116 → padded to 128.
        assert_eq!(backend.slot_total_size(), 128);
    }

    #[test]
    fn evict_stale_frees_slot_held_by_hung_reader() {
        // Spec §5.1: a committed slot a reader never read stays blocked; after
        // `max_age` the writer's timeout-eviction force-frees it.
        let backend = InMemorySlotAllocator::new(0, 1, 64);
        let active = 0b1; // one active reader, never reads.
        publish_payload(&backend, active);

        // Blocked: the reader hasn't read.
        assert_eq!(backend.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // A future deadline evicts nothing.
        let far_future = core::time::Duration::from_secs(3600);
        assert_eq!(backend.evict_stale(far_future, active).unwrap(), 0);
        assert_eq!(backend.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // A zero deadline (everything is already older) evicts the slot.
        let immediately = core::time::Duration::ZERO;
        assert_eq!(backend.evict_stale(immediately, active).unwrap(), 1);

        // Now reservable again.
        let _ = backend.reserve_slot(active).unwrap();
    }

    #[test]
    fn evict_stale_leaves_read_slots_untouched() {
        let backend = InMemorySlotAllocator::new(0, 1, 64);
        let active = 0b1;
        let handle = publish_payload(&backend, active);
        backend.mark_read(handle, 0).unwrap(); // reader consumed it.

        // Already free → nothing to evict even at zero deadline.
        let evicted = backend
            .evict_stale(core::time::Duration::ZERO, active)
            .unwrap();
        assert_eq!(evicted, 0);
    }

    #[test]
    fn reader_disconnect_frees_blocked_slots() {
        // Spec §5.2: reader lease-expiry triggers mark_reader_disconnected;
        // slots that were waiting on the dead reader become free.
        let backend = InMemorySlotAllocator::new(0, 1, 64);
        // Active readers: 0 + 1.
        let active = 0b011;
        let handle = publish_payload(&backend, active);

        // Only reader 0 has read, reader 1 has not.
        backend.mark_read(handle, 0).unwrap();
        assert_eq!(backend.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // Reader 1 disconnected — the slot becomes reservable again.
        backend.mark_reader_disconnected(1).unwrap();
        let _ = backend
            .reserve_slot(active)
            .expect("free after disconnect");
    }
}