Skip to main content

zerodds_rtps/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Best-Effort Stateless RTPS-Reader (W4).
4//!
5//! parst eingehende RTPS-Datagrams, filtert DATA-
6//! Submessages, die an unsere `EntityId` adressiert sind (oder
7//! ID_UNKNOWN als Wildcard), und liefert die Payload-Bytes an einen
8//! Listener-Callback.
9
10extern crate alloc;
11use alloc::vec::Vec;
12
13use crate::datagram::{ParsedSubmessage, decode_datagram};
14use crate::error::WireError;
15use crate::wire_types::{EntityId, EntityKind, Guid, GuidPrefix, SequenceNumber};
16
17/// Best-Effort Reader.
18///
19/// Stateless ausser der eigenen GUID + dem akzeptierten Writer-GUID.
20/// Phase 0: 1:1-Modell. Forward-Kompatibel mit Mehr-Writer-Setup, weil
21/// `recv_datagram` jeden DATA-Submessage einzeln liefert und die
22/// Filterlogik im Caller liegen kann.
23#[derive(Debug, Clone)]
24pub struct BestEffortReader {
25    guid: Guid,
26}
27
28impl BestEffortReader {
29    /// Konstruiert einen Reader.
30    #[must_use]
31    pub fn new(participant_prefix: GuidPrefix, reader_id: EntityId) -> Self {
32        Self {
33            guid: Guid::new(participant_prefix, reader_id),
34        }
35    }
36
37    /// Eigene GUID.
38    #[must_use]
39    pub fn guid(&self) -> Guid {
40        self.guid
41    }
42
43    /// Verarbeitet ein eingehendes Datagram.
44    ///
45    /// Liefert Vec der DATA-Submessages, die an diesen Reader gerichtet
46    /// sind (matched auf `entity_key` ODER `entity_kind == Unknown` als
47    /// Wildcard).
48    ///
49    /// # Errors
50    /// `WireError`, wenn das Datagram nicht parst.
51    pub fn recv_datagram(&self, datagram: &[u8]) -> Result<Vec<DeliveredSample>, WireError> {
52        let parsed = decode_datagram(datagram)?;
53        let mut out = Vec::new();
54        for sub in parsed.submessages {
55            if let ParsedSubmessage::Data(d) = sub {
56                if matches_reader(&d.reader_id, &self.guid.entity_id) {
57                    out.push(DeliveredSample {
58                        writer_id: d.writer_id,
59                        writer_sn: d.writer_sn,
60                        // WP 2.0a: d.serialized_payload ist bereits Arc<[u8]>
61                        // — kein zweiter Arc::from-Alloc, nur move.
62                        payload: d.serialized_payload,
63                    });
64                }
65            }
66            // Andere Submessages (HEARTBEAT/ACKNACK/GAP/Unknown) werden
67            // in Phase 0 ignoriert. Reliable-Pfad kommt mit Phase 1.
68        }
69        Ok(out)
70    }
71}
72
73/// Eine DATA-Submessage, die der Reader an den Listener weiterreicht.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct DeliveredSample {
76    /// Writer-GUID (nur EntityId; der Caller kennt den Participant
77    /// ueber den Datagram-Header).
78    pub writer_id: EntityId,
79    /// Sequence-Number, mit der der Writer das Sample sendet.
80    pub writer_sn: SequenceNumber,
81    /// Payload-Bytes (XCDR2-codiert oder vendor-spezifisch),
82    /// referenzgezaehlt via `Arc::clone` aus dem Cache.
83    pub payload: alloc::sync::Arc<[u8]>,
84}
85
86fn matches_reader(target: &EntityId, our: &EntityId) -> bool {
87    // Wildcard: ID_UNKNOWN matched alles.
88    if target.entity_kind == EntityKind::Unknown && target.entity_key == [0; 3] {
89        return true;
90    }
91    target == our
92}
93
94#[cfg(test)]
95mod tests {
96    #![allow(clippy::expect_used, clippy::unwrap_used)]
97    use super::*;
98    use crate::writer::BestEffortWriter;
99
100    fn reader_id() -> EntityId {
101        EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0])
102    }
103    fn writer_id() -> EntityId {
104        EntityId::user_writer_with_key([0x10, 0x20, 0x30])
105    }
106    fn prefix() -> GuidPrefix {
107        GuidPrefix::from_bytes([1; 12])
108    }
109
110    #[test]
111    fn reader_delivers_data_addressed_to_us() {
112        let mut w = BestEffortWriter::new(prefix(), writer_id(), reader_id());
113        let bytes = w.write(b"hello").unwrap();
114        let r = BestEffortReader::new(prefix(), reader_id());
115        let samples = r.recv_datagram(&bytes).unwrap();
116        assert_eq!(samples.len(), 1);
117        assert_eq!(samples[0].payload.as_ref(), &b"hello"[..]);
118        assert_eq!(samples[0].writer_id, writer_id());
119        assert_eq!(samples[0].writer_sn, SequenceNumber(1));
120    }
121
122    #[test]
123    fn reader_drops_data_addressed_to_other_reader() {
124        let other_reader = EntityId::user_reader_with_key([0x99, 0x99, 0x99]);
125        let mut w = BestEffortWriter::new(prefix(), writer_id(), other_reader);
126        let bytes = w.write(b"not for you").unwrap();
127        let r = BestEffortReader::new(prefix(), reader_id());
128        let samples = r.recv_datagram(&bytes).unwrap();
129        assert!(samples.is_empty());
130    }
131
132    #[test]
133    fn reader_accepts_unknown_wildcard_target() {
134        let mut w = BestEffortWriter::new(prefix(), writer_id(), EntityId::UNKNOWN);
135        let bytes = w.write(b"broadcast").unwrap();
136        let r = BestEffortReader::new(prefix(), reader_id());
137        let samples = r.recv_datagram(&bytes).unwrap();
138        assert_eq!(samples.len(), 1);
139        assert_eq!(samples[0].payload.as_ref(), &b"broadcast"[..]);
140    }
141
142    #[test]
143    fn reader_propagates_invalid_magic_error() {
144        let r = BestEffortReader::new(prefix(), reader_id());
145        let bytes = [0u8; 32];
146        let res = r.recv_datagram(&bytes);
147        assert!(matches!(res, Err(WireError::InvalidMagic { .. })));
148    }
149
150    #[test]
151    fn reader_handles_multiple_data_in_one_datagram() {
152        // Bauen manuell ein Datagram mit zwei DATA-Submessages an uns.
153        let mut w = BestEffortWriter::new(prefix(), writer_id(), reader_id());
154        let bytes_a = w.write(b"first").unwrap();
155        let bytes_b = w.write(b"second").unwrap();
156        // Concat-Datagrams nicht direkt erlaubt — aber zwei aufeinander-
157        // folgende recv-Aufrufe simulieren das.
158        let r = BestEffortReader::new(prefix(), reader_id());
159        let s1 = r.recv_datagram(&bytes_a).unwrap();
160        let s2 = r.recv_datagram(&bytes_b).unwrap();
161        assert_eq!(s1[0].writer_sn, SequenceNumber(1));
162        assert_eq!(s2[0].writer_sn, SequenceNumber(2));
163    }
164}