Skip to main content

zerodds_flatdata/
allocator.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! In-Memory-Slot-Allocator als Referenz-Implementation des
4//! [`SlotBackend`](crate::SlotBackend)-Traits. Single-Process
5//! Same-Host-Pub/Sub ohne mmap-Overhead — fuer Tests und als
6//! Default-Backend wenn `posix-mmap` deaktiviert ist. Der
7//! Cross-Process-mmap-Backend lebt in `posix.rs` und implementiert
8//! dieselbe Trait-API.
9//!
10//! Spec: zerodds-flatdata-1.0 §4.1, §5.1.
11
12use alloc::sync::Arc;
13use alloc::vec::Vec;
14
15#[cfg(feature = "std")]
16use std::sync::Mutex;
17
18use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
19
/// Identifies a slot as the pair `(segment_id, slot_index)`.
///
/// The handle is a plain `Copy` value; it carries no borrow of the slot it
/// names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SlotHandle {
    /// Segment ID (FNV hash of the segment path).
    pub segment_id: u64,
    /// Slot index in `[0, slot_count)`.
    pub slot_index: u32,
}
28
/// Errors reported by slot management operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlotError {
    /// Every slot is occupied — none is free.
    NoFreeSlot,
    /// Slot index is outside `[0, slot_count)`.
    OutOfBounds,
    /// The sample does not fit into the slot size.
    SampleTooLarge {
        /// Size of the sample in bytes.
        sample: usize,
        /// Configured size of the slot's data area in bytes.
        slot_capacity: usize,
    },
    /// The slot lock could not be acquired because it is poisoned.
    LockPoisoned,
}
46
47impl core::fmt::Display for SlotError {
48    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
49        match self {
50            Self::NoFreeSlot => f.write_str("no free slot in segment"),
51            Self::OutOfBounds => f.write_str("slot index out of bounds"),
52            Self::SampleTooLarge {
53                sample,
54                slot_capacity,
55            } => write!(
56                f,
57                "sample {sample} byte does not fit in slot capacity {slot_capacity}"
58            ),
59            Self::LockPoisoned => f.write_str("slot lock poisoned"),
60        }
61    }
62}
63
// `std::error::Error` is only implemented when the `std` feature is enabled.
// The impl body is empty, so all of the trait's default methods apply.
#[cfg(feature = "std")]
impl std::error::Error for SlotError {}
66
/// In-memory slot allocator. Provides the same
/// [`SlotBackend`](crate::SlotBackend) interface as the POSIX mmap backend
/// (`posix.rs`), but keeps its slots on the process heap — intended for
/// single-process pub/sub and test setups without the mmap dependency.
#[cfg(feature = "std")]
pub struct InMemorySlotAllocator {
    /// All slots; every access goes through this mutex.
    slots: Arc<Mutex<Vec<Slot>>>,
    /// Segment ID copied into every handle returned by `reserve_slot`.
    segment_id: u64,
    /// Size in bytes of each slot's data area (header not included).
    slot_capacity: usize,
    /// Counter that supplies the sequence number for each commit.
    next_sn: Arc<core::sync::atomic::AtomicU32>,
    /// Optional type hash for cross-validation (spec §6.1).
    type_hash: Option<[u8; 16]>,
}
80
/// State of a single slot: its header, its data buffer and the loan flag.
#[derive(Debug, Clone)]
struct Slot {
    /// Header describing the sample currently stored in `data`.
    header: SlotHeader,
    /// Data buffer of the slot; sample bytes are copied to its front.
    data: Vec<u8>,
    /// `true` while a loan is active (the writer has reserved the slot and
    /// has neither committed nor discarded it yet).
    loaned: bool,
}
89
// Trait facade: every `SlotBackend` method forwards to the inherent method of
// the same name on `InMemorySlotAllocator`. A `Self::name(self, ..)` path
// resolves to the inherent associated function before the trait method, so
// these calls delegate rather than recurse.
#[cfg(feature = "std")]
impl crate::backend::SlotBackend for InMemorySlotAllocator {
    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        Self::reserve_slot(self, active_readers_mask)
    }
    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        Self::commit_slot(self, handle, bytes)
    }
    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        Self::discard_slot(self, handle)
    }
    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        Self::read_slot(self, handle)
    }
    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        Self::mark_read(self, handle, reader_index)
    }
    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        Self::mark_reader_disconnected(self, reader_index)
    }
    fn slot_count(&self) -> Result<usize, SlotError> {
        Self::slot_count(self)
    }
    fn slot_total_size(&self) -> usize {
        Self::slot_total_size(self)
    }
    fn slot_capacity(&self) -> usize {
        Self::slot_capacity(self)
    }
    // No inherent counterpart: returns the stored field directly.
    fn type_hash(&self) -> Option<[u8; 16]> {
        self.type_hash
    }
}
123
#[cfg(feature = "std")]
impl InMemorySlotAllocator {
    /// Creates an allocator with `slot_count` slots, each owning a
    /// zero-filled data area of `slot_capacity` bytes.
    ///
    /// Every slot starts unloaned with the header `SlotHeader::new(0, 0)`;
    /// sequence numbers start at 0 and no type hash is attached.
    #[must_use]
    pub fn new(segment_id: u64, slot_count: usize, slot_capacity: usize) -> Self {
        // `vec![value; n]` clones the template, so every slot gets its own
        // independent data buffer.
        let slots = alloc::vec![
            Slot {
                header: SlotHeader::new(0, 0),
                data: alloc::vec![0u8; slot_capacity],
                loaned: false,
            };
            slot_count
        ];
        Self {
            slots: Arc::new(Mutex::new(slots)),
            segment_id,
            slot_capacity,
            next_sn: Arc::new(core::sync::atomic::AtomicU32::new(0)),
            type_hash: None,
        }
    }

    /// Spec §6.1: attaches a type hash to the allocator (builder style).
    ///
    /// Per the spec note, a reader working against this backend compares the
    /// hash with `T::TYPE_HASH` and drops the sample on mismatch; that check
    /// is not performed in this type.
    #[must_use]
    pub fn with_type_hash(mut self, hash: [u8; 16]) -> Self {
        self.type_hash = Some(hash);
        self
    }

    /// Spec §4.1 `reserve_slot`. Loans out the first slot that is not
    /// currently loaned and is either empty (`sample_size == 0`) or reported
    /// by `all_read(active_readers_mask)` as read by all active readers.
    ///
    /// NOTE(review): a committed zero-length sample also has
    /// `sample_size == 0` and is therefore treated as an empty slot here —
    /// confirm that this is intended.
    ///
    /// # Errors
    /// `NoFreeSlot` if every slot is loaned or still waiting for readers,
    /// `LockPoisoned` if the slot lock is poisoned.
    pub fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        for (idx, slot) in slots.iter_mut().enumerate() {
            // `SlotHandle::slot_index` is a `u32`. A truncating cast would
            // hand out a handle that aliases a lower slot; indices ascend, so
            // once one does not fit, none of the following ones will.
            let slot_index = match u32::try_from(idx) {
                Ok(index) => index,
                Err(_) => break,
            };
            if slot.loaned {
                continue;
            }
            if slot.header.sample_size == 0 || slot.header.all_read(active_readers_mask) {
                slot.loaned = true;
                return Ok(SlotHandle {
                    segment_id: self.segment_id,
                    slot_index,
                });
            }
        }
        Err(SlotError::NoFreeSlot)
    }

    /// Spec §4.1 `commit_slot`. Copies `bytes` to the front of the slot's
    /// data area, replaces the header with `SlotHeader::new(sn, len)`, ends
    /// the loan and returns the sequence number `sn` assigned to the sample.
    ///
    /// A rejected commit leaves the slot (including its loan flag) untouched
    /// and does not consume a sequence number.
    ///
    /// NOTE(review): neither `handle.segment_id` nor the slot's loan flag is
    /// checked, so a commit through a stale handle overwrites the slot —
    /// confirm that callers always pair this with a prior `reserve_slot`.
    ///
    /// # Errors
    /// `SampleTooLarge` if `bytes` exceeds the slot capacity or its length
    /// does not fit the header's `u32` size field, `OutOfBounds` for an
    /// invalid slot index, `LockPoisoned` if the slot lock is poisoned.
    pub fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        // Reject instead of saturating: a clamped length in the header would
        // make readers see a silently truncated sample.
        let sample_size = match u32::try_from(bytes.len()) {
            Ok(len) if bytes.len() <= self.slot_capacity => len,
            _ => {
                return Err(SlotError::SampleTooLarge {
                    sample: bytes.len(),
                    slot_capacity: self.slot_capacity,
                });
            }
        };
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        // Drawn only after all checks passed.
        let sn = self
            .next_sn
            .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
        slot.header = SlotHeader::new(sn, sample_size);
        slot.data[..bytes.len()].copy_from_slice(bytes);
        slot.loaned = false;
        Ok(sn)
    }

    /// Spec §4.1 `release_slot` without commit: ends the loan and leaves
    /// header and data of the slot as they were.
    ///
    /// # Errors
    /// `OutOfBounds` for an invalid slot index, `LockPoisoned` if the slot
    /// lock is poisoned.
    pub fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        slot.loaned = false;
        Ok(())
    }

    /// Reader side: returns a copy of the slot header together with a copy
    /// of the first `sample_size` bytes of the slot's data area.
    ///
    /// # Errors
    /// `OutOfBounds` for an invalid slot index, `LockPoisoned` if the slot
    /// lock is poisoned.
    pub fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        // Clamp to the buffer so an oversized header value cannot panic.
        let len = (slot.header.sample_size as usize).min(slot.data.len());
        Ok((slot.header, slot.data[..len].to_vec()))
    }

    /// Reader side: records on the slot header that reader `reader_index`
    /// has read the slot (delegates to `SlotHeader::mark_read`).
    ///
    /// # Errors
    /// `OutOfBounds` for an invalid slot index, `LockPoisoned` if the slot
    /// lock is poisoned.
    pub fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let slot = slots
            .get_mut(handle.slot_index as usize)
            .ok_or(SlotError::OutOfBounds)?;
        slot.header.mark_read(reader_index);
        Ok(())
    }

    /// Spec §5.2: retroactive reader disconnect.
    ///
    /// Called by the owner of the allocator (SPDP lease-expiry hook) when a
    /// reader has died. Sets that reader's bit on **all** slots so the
    /// allocator sees it as "has read" and occupied slots become free again.
    ///
    /// `reader_index` must be below 32. Debug builds assert this; otherwise
    /// an invalid index changes nothing, because such a reader has no bit in
    /// the mask.
    ///
    /// # Errors
    /// `LockPoisoned` if the slot lock is poisoned.
    pub fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        debug_assert!(reader_index < 32);
        // A plain `1u32 << reader_index` would, without overflow checks, mask
        // the shift amount and flag a different reader as having read every
        // slot. `checked_shl` yields `None` for shift amounts >= 32 instead.
        let reader_bit = 1u32.checked_shl(u32::from(reader_index));
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        if let Some(bit) = reader_bit {
            for slot in slots.iter_mut() {
                slot.header.reader_mask |= bit;
            }
        }
        Ok(())
    }

    /// Slot capacity in bytes (data area only, without the header).
    #[must_use]
    pub fn slot_capacity(&self) -> usize {
        self.slot_capacity
    }

    /// Number of configured slots.
    ///
    /// # Errors
    /// `LockPoisoned` if the slot lock is poisoned.
    pub fn slot_count(&self) -> Result<usize, SlotError> {
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        Ok(slots.len())
    }

    /// Total size of one slot (header plus data area, rounded up to the next
    /// multiple of the 64-byte cache line) — used for SEDP discovery.
    #[must_use]
    pub fn slot_total_size(&self) -> usize {
        let raw = SLOT_HEADER_SIZE + self.slot_capacity;
        // Adding 63 and clearing the low six bits rounds up to a multiple of 64.
        (raw + 63) & !63
    }
}
292
// Unit tests for the in-memory allocator; they need the `std` feature
// because the allocator itself is only compiled with it.
#[cfg(all(test, feature = "std"))]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn reserve_returns_first_free_slot() {
        let alloc = InMemorySlotAllocator::new(0, 4, 64);
        let h0 = alloc.reserve_slot(0).expect("reserve 0");
        assert_eq!(h0.slot_index, 0);
        let h1 = alloc.reserve_slot(0).expect("reserve 1");
        assert_eq!(h1.slot_index, 1);
    }

    #[test]
    fn reserve_returns_no_free_slot_when_all_loaned() {
        let alloc = InMemorySlotAllocator::new(0, 2, 64);
        let _h0 = alloc.reserve_slot(0).unwrap();
        let _h1 = alloc.reserve_slot(0).unwrap();
        assert_eq!(alloc.reserve_slot(0), Err(SlotError::NoFreeSlot));
    }

    #[test]
    fn commit_writes_bytes_and_increments_sn() {
        let alloc = InMemorySlotAllocator::new(0, 2, 64);
        let h = alloc.reserve_slot(0).unwrap();
        let sn = alloc.commit_slot(h, &[1, 2, 3]).unwrap();
        assert_eq!(sn, 0);
        let (header, bytes) = alloc.read_slot(h).unwrap();
        assert_eq!(header.sequence_number, 0);
        assert_eq!(header.sample_size, 3);
        assert_eq!(bytes, vec![1, 2, 3]);

        let h2 = alloc.reserve_slot(0).unwrap();
        let sn2 = alloc.commit_slot(h2, &[9]).unwrap();
        assert_eq!(sn2, 1);
    }

    #[test]
    fn commit_too_large_returns_error() {
        let alloc = InMemorySlotAllocator::new(0, 2, 8);
        let h = alloc.reserve_slot(0).unwrap();
        let err = alloc.commit_slot(h, &[0u8; 16]).unwrap_err();
        assert!(matches!(
            err,
            SlotError::SampleTooLarge {
                sample: 16,
                slot_capacity: 8
            }
        ));
    }

    #[test]
    fn discard_frees_slot_for_reuse() {
        let alloc = InMemorySlotAllocator::new(0, 1, 64);
        let h = alloc.reserve_slot(0).unwrap();
        alloc.discard_slot(h).unwrap();
        // The slot can be reserved again.
        let _ = alloc.reserve_slot(0).unwrap();
    }

    #[test]
    fn slot_recyclable_after_all_readers_marked() {
        let alloc = InMemorySlotAllocator::new(0, 1, 64);
        // Two active readers: bit 0 + bit 1.
        let active = 0b011;
        let h = alloc.reserve_slot(active).unwrap();
        alloc.commit_slot(h, &[0xAA]).unwrap();

        // Reservation fails — the slot has not been read by everyone yet.
        assert_eq!(alloc.reserve_slot(active), Err(SlotError::NoFreeSlot));

        alloc.mark_read(h, 0).unwrap();
        assert_eq!(alloc.reserve_slot(active), Err(SlotError::NoFreeSlot));

        alloc.mark_read(h, 1).unwrap();
        // Free now.
        let _ = alloc.reserve_slot(active).unwrap();
    }

    #[test]
    fn slot_total_size_is_cache_line_padded() {
        let alloc = InMemorySlotAllocator::new(0, 4, 100);
        // Header(16) + data(100) = 116 → padded to 128.
        assert_eq!(alloc.slot_total_size(), 128);
    }

    #[test]
    fn reader_disconnect_frees_blocked_slots() {
        // Spec §5.2: reader lease expiry triggers mark_reader_disconnected;
        // slots that were waiting for the dead reader become free.
        let alloc = InMemorySlotAllocator::new(0, 1, 64);
        // Active readers: 0 + 1.
        let active = 0b011;
        let h = alloc.reserve_slot(active).unwrap();
        alloc.commit_slot(h, &[0xAA]).unwrap();

        // Only reader 0 has read, reader 1 has not.
        alloc.mark_read(h, 0).unwrap();
        assert_eq!(alloc.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // Reader 1 disconnected — the slot becomes reservable again.
        alloc.mark_reader_disconnected(1).unwrap();
        let _ = alloc.reserve_slot(active).expect("free after disconnect");
    }
}