Skip to main content

zerodds_flatdata/
pubsub.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
//! FlatWriter + FlatReader — high-level API on top of the SlotAllocator.
//!
//! Spec: zerodds-flatdata-1.0 §8 + §9.
6
7use alloc::sync::Arc;
8use core::marker::PhantomData;
9use core::ops::Deref;
10
11use crate::FlatStruct;
12use crate::allocator::{InMemorySlotAllocator, SlotError, SlotHandle};
13use crate::backend::SlotBackend;
14use crate::slot::ReaderMask;
15
/// Writes FlatStruct samples directly into SHM slots — no CDR encode step.
pub struct FlatWriter<T: FlatStruct> {
    // Backing slot allocator, shared with readers via Arc.
    alloc: Arc<InMemorySlotAllocator>,
    // Reader bits that must all be set before a slot may be reused
    // (passed to `reserve_slot` on every write/loan).
    active_readers_mask: ReaderMask,
    // `fn() -> T` ties the writer to `T` without owning one and without
    // inheriting `T`'s auto-trait restrictions.
    _t: PhantomData<fn() -> T>,
}
22
23impl<T: FlatStruct> FlatWriter<T> {
24    /// Erzeugt einen Writer ueber einem Allocator. `active_readers_mask`
25    /// listet die Reader-Bits, die alle gelesen haben muessen, bevor
26    /// ein Slot wiederverwendet werden kann.
27    pub fn new(alloc: Arc<InMemorySlotAllocator>, active_readers_mask: ReaderMask) -> Self {
28        Self {
29            alloc,
30            active_readers_mask,
31            _t: PhantomData,
32        }
33    }
34
35    /// Spec §8.1 write_flat — reserve + memcpy + commit in einem Call.
36    ///
37    /// # Errors
38    /// `SlotError::NoFreeSlot` bei Resource-Pressure;
39    /// `SampleTooLarge` wenn der Slot kleiner als T::WIRE_SIZE ist.
40    pub fn write(&self, sample: &T) -> Result<u32, SlotError> {
41        let bytes = sample.as_bytes();
42        let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
43        match self.alloc.commit_slot(handle, bytes) {
44            Ok(sn) => Ok(sn),
45            Err(e) => {
46                let _ = self.alloc.discard_slot(handle);
47                Err(e)
48            }
49        }
50    }
51
52    /// Spec §8.2 loan_slot — niedrigere Ebene: explizite Slot-Borrow.
53    /// Caller schreibt direkt in das geliehene `&mut T`-Buffer und
54    /// committed.
55    ///
56    /// # Errors
57    /// `NoFreeSlot`.
58    pub fn loan_slot(&self) -> Result<FlatSlot<'_, T>, SlotError> {
59        let handle = self.alloc.reserve_slot(self.active_readers_mask)?;
60        Ok(FlatSlot {
61            handle,
62            writer: self,
63            committed: false,
64        })
65    }
66}
67
/// A loaned slot. The caller publishes a sample via `commit()`;
/// dropping without a commit discards the loan.
/// NOTE(review): the original doc mentioned a `write()` step on the
/// loan, but no such method exists here — `commit()` takes the sample.
pub struct FlatSlot<'a, T: FlatStruct> {
    // Handle of the reserved slot inside the allocator.
    handle: SlotHandle,
    // Borrowed writer — provides the allocator for commit/discard.
    writer: &'a FlatWriter<T>,
    // Set by `commit()` so `Drop` skips the discard.
    committed: bool,
}
75
76impl<T: FlatStruct> FlatSlot<'_, T> {
77    /// Schreibt das Sample in den Slot.
78    ///
79    /// # Errors
80    /// Wie `commit_slot`.
81    pub fn commit(mut self, sample: T) -> Result<u32, SlotError> {
82        let bytes = sample.as_bytes();
83        let sn = self.writer.alloc.commit_slot(self.handle, bytes)?;
84        self.committed = true;
85        Ok(sn)
86    }
87}
88
impl<T: FlatStruct> Drop for FlatSlot<'_, T> {
    fn drop(&mut self) {
        // Loan was never committed: return the slot to the allocator.
        // The discard result is ignored — drop must not panic.
        if !self.committed {
            let _ = self.writer.alloc.discard_slot(self.handle);
        }
    }
}
96
/// Reads FlatStruct samples directly out of SHM slots.
pub struct FlatReader<T: FlatStruct> {
    // Backing slot allocator, shared with the writer via Arc.
    alloc: Arc<InMemorySlotAllocator>,
    /// Which bit in the reader_mask belongs to this reader.
    reader_index: u8,
    /// Last sequence number read (prevents duplicates).
    // u32::MAX is the "nothing read yet" sentinel (see `new`/`read`).
    last_sn: core::sync::atomic::AtomicU32,
    /// Type hash of the topic — used for the version check.
    expected_type_hash: [u8; 16],
    // Ties the reader to `T` without owning one.
    _t: PhantomData<fn() -> T>,
}
108
109impl<T: FlatStruct> FlatReader<T> {
110    /// Erzeugt einen Reader ueber einem Allocator.
111    pub fn new(alloc: Arc<InMemorySlotAllocator>, reader_index: u8) -> Self {
112        Self {
113            alloc,
114            reader_index,
115            last_sn: core::sync::atomic::AtomicU32::new(u32::MAX),
116            expected_type_hash: T::TYPE_HASH,
117            _t: PhantomData,
118        }
119    }
120
121    /// Liefert Type-Hash — fuer Discovery-Match.
122    #[must_use]
123    pub fn type_hash(&self) -> [u8; 16] {
124        self.expected_type_hash
125    }
126
127    /// Spec §9.1 read_flat. Liefert das **neueste** ungelesene Sample.
128    /// Setzt automatisch das reader-Bit im reader_mask.
129    ///
130    /// Spec §6.1 Type-Hash-Cross-Validation: vor dem Slot-Read wird
131    /// `T::TYPE_HASH` gegen den am SlotBackend hinterlegten Hash
132    /// geprueft. Bei Mismatch: keine Slots werden dereferenziert,
133    /// `Err(SlotError::SampleTooLarge)` mit Schema-Drift-Indikation.
134    ///
135    /// # Errors
136    /// - `SampleTooLarge` bei TYPE_HASH-Mismatch (Spec §6.1).
137    /// - Wire-/Layout-Fehler oder Slot-Lock-Poison wie sonst.
138    pub fn read(&self) -> Result<Option<T>, SlotError> {
139        // Spec §6.1: Type-Hash Cross-Validation. Wenn der Backend
140        // einen Hash liefert (per `with_type_hash` gesetzt), muss er
141        // mit `T::TYPE_HASH` uebereinstimmen — sonst Schema-Drift,
142        // wir lehnen den Read ohne Slot-Dereferenzierung ab.
143        if let Some(backend_hash) = SlotBackend::type_hash(&*self.alloc) {
144            if backend_hash != self.expected_type_hash {
145                return Err(SlotError::SampleTooLarge {
146                    sample: 0,
147                    slot_capacity: 0,
148                });
149            }
150        }
151        let count = self.alloc.slot_count()?;
152        let mut best: Option<(u32, T)> = None;
153        let last_seen = self.last_sn.load(core::sync::atomic::Ordering::Relaxed);
154        for idx in 0..count {
155            let handle = SlotHandle {
156                segment_id: 0,
157                slot_index: idx as u32,
158            };
159            let (header, bytes) = self.alloc.read_slot(handle)?;
160            if header.sample_size == 0 {
161                continue; // unbenutzt
162            }
163            // Schon gelesen? Bit gesetzt.
164            if (header.reader_mask & (1u32 << self.reader_index)) != 0 {
165                continue;
166            }
167            // Zu kurz?
168            if (bytes.len() as u32) < T::WIRE_SIZE as u32 {
169                continue;
170            }
171            // SAFETY: WIRE_SIZE oben gepruft + TYPE_HASH-Match oben
172            // gegen Schema-Drift gesichert (Spec §6.1).
173            let sample = unsafe { T::from_bytes_unchecked(&bytes) };
174            // Wir liefern das **neueste**: hoechste sn, die noch nicht
175            // in last_seen ist.
176            let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
177            let beats_current = best
178                .as_ref()
179                .is_none_or(|(b_sn, _)| header.sequence_number > *b_sn);
180            if unseen && beats_current {
181                best = Some((header.sequence_number, sample));
182            }
183            // Reader-Bit setzen — auch wenn wir das Sample nicht
184            // ausliefern (Slot kann recyclet werden).
185            self.alloc.mark_read(handle, self.reader_index)?;
186        }
187        if let Some((sn, _)) = best.as_ref() {
188            self.last_sn
189                .store(*sn, core::sync::atomic::Ordering::Relaxed);
190        }
191        Ok(best.map(|(_, t)| t))
192    }
193}
194
/// Wrapper around a sample read from a slot. Spec §9.2/§9.3.
/// NOTE(review): the original doc claimed the reader bit is set
/// automatically on `Drop`, but no `Drop` impl is visible here — the
/// bit is set by `FlatReader::read` instead; confirm against the spec.
pub struct FlatSampleRef<T: FlatStruct> {
    // The owned, already-copied sample.
    sample: T,
}
200
201impl<T: FlatStruct> FlatSampleRef<T> {
202    /// Wrapt ein gelesenes Sample.
203    #[must_use]
204    pub fn new(sample: T) -> Self {
205        Self { sample }
206    }
207
208    /// Konsumiert das Wrapper und liefert das Sample.
209    #[must_use]
210    pub fn into_inner(self) -> T {
211        self.sample
212    }
213}
214
215impl<T: FlatStruct> Deref for FlatSampleRef<T> {
216    type Target = T;
217    fn deref(&self) -> &T {
218        &self.sample
219    }
220}
221
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    // Minimal POD fixture: three i64 fields, repr(C), fits in a 64-byte slot.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    #[repr(C)]
    struct Pose {
        x: i64,
        y: i64,
        z: i64,
    }

    // SAFETY: repr(C) + Copy + 'static, all fields are primitives.
    unsafe impl FlatStruct for Pose {
        const TYPE_HASH: [u8; 16] = [0x42; 16];
    }

    // Builds a fresh allocator: segment 0, `slot_count` slots, 64 bytes each.
    fn fresh_alloc(slot_count: usize) -> Arc<InMemorySlotAllocator> {
        Arc::new(InMemorySlotAllocator::new(0, slot_count, 64))
    }

    #[test]
    fn writer_write_then_reader_read() {
        let alloc = fresh_alloc(4);
        // One active reader (bit 0).
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        let p = Pose { x: 1, y: 2, z: 3 };
        let _sn = writer.write(&p).expect("write");

        let got = reader.read().expect("read").expect("some");
        assert_eq!(got, p);
    }

    #[test]
    fn reader_does_not_re_read_same_slot() {
        let alloc = fresh_alloc(4);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
        let _ = reader.read().expect("first read").expect("some");
        // Second read without another write → None.
        let second = reader.read().expect("second read");
        assert!(second.is_none());
    }

    #[test]
    fn writer_loan_commit_pattern() {
        let alloc = fresh_alloc(2);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        let slot = writer.loan_slot().expect("loan");
        let _sn = slot.commit(Pose { x: 7, y: 8, z: 9 }).expect("commit");

        let got = reader.read().expect("read").expect("some");
        assert_eq!(got, Pose { x: 7, y: 8, z: 9 });
    }

    #[test]
    fn loan_drop_without_commit_releases_slot() {
        let alloc = fresh_alloc(1);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);

        {
            let _slot = writer.loan_slot().expect("loan");
            // Drop without commit.
        }

        // The slot is free again.
        let _ = writer.loan_slot().expect("re-loan after drop");
    }

    #[test]
    fn reader_recycles_slot_after_read() {
        let alloc = fresh_alloc(1);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);

        // First sample.
        writer.write(&Pose { x: 1, y: 1, z: 1 }).expect("w1");
        let _ = reader.read().expect("r1").expect("some");

        // Second sample — the slot must be reusable
        // (the reader bit was set by the first read).
        writer.write(&Pose { x: 2, y: 2, z: 2 }).expect("w2");
        let got = reader.read().expect("r2").expect("some");
        assert_eq!(got, Pose { x: 2, y: 2, z: 2 });
    }

    #[test]
    fn flat_sample_ref_deref() {
        let p = Pose { x: 1, y: 2, z: 3 };
        let r = FlatSampleRef::new(p);
        assert_eq!(r.x, 1);
        assert_eq!(r.into_inner(), p);
    }

    #[test]
    fn reader_rejects_type_hash_mismatch() {
        // Spec §6.1: the reader checks `T::TYPE_HASH` against the hash
        // registered on the backend; a mismatch → schema-drift reject.
        let wrong_hash = [0xBB; 16];
        let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(wrong_hash));
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
        let res = reader.read();
        assert!(matches!(res, Err(SlotError::SampleTooLarge { .. })));
    }

    #[test]
    fn reader_accepts_matching_type_hash() {
        // Spec §6.1: with the correct hash on the backend → no reject;
        // with no sample present → Ok(None).
        let alloc = Arc::new(InMemorySlotAllocator::new(0, 4, 64).with_type_hash(Pose::TYPE_HASH));
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
        let res = reader.read().expect("no schema drift");
        assert!(res.is_none());
    }

    #[test]
    fn reader_without_backend_hash_does_not_reject() {
        // Default allocator without `with_type_hash` → no validation,
        // the reader reads normally.
        let alloc = fresh_alloc(4);
        let writer = FlatWriter::<Pose>::new(Arc::clone(&alloc), 0b1);
        let reader = FlatReader::<Pose>::new(Arc::clone(&alloc), 0);
        writer.write(&Pose { x: 1, y: 2, z: 3 }).expect("write");
        let got = reader.read().expect("read").expect("some");
        assert_eq!(got, Pose { x: 1, y: 2, z: 3 });
    }
}
355}