1extern crate alloc;
11use alloc::vec::Vec;
12
13use crate::datagram::{ParsedSubmessage, decode_datagram};
14use crate::error::WireError;
15use crate::wire_types::{EntityId, EntityKind, Guid, GuidPrefix, SequenceNumber};
16
17#[derive(Debug, Clone)]
24pub struct BestEffortReader {
25 guid: Guid,
26}
27
28impl BestEffortReader {
29 #[must_use]
31 pub fn new(participant_prefix: GuidPrefix, reader_id: EntityId) -> Self {
32 Self {
33 guid: Guid::new(participant_prefix, reader_id),
34 }
35 }
36
37 #[must_use]
39 pub fn guid(&self) -> Guid {
40 self.guid
41 }
42
43 pub fn recv_datagram(&self, datagram: &[u8]) -> Result<Vec<DeliveredSample>, WireError> {
52 let parsed = decode_datagram(datagram)?;
53 let mut out = Vec::new();
54 for sub in parsed.submessages {
55 if let ParsedSubmessage::Data(d) = sub {
56 if matches_reader(&d.reader_id, &self.guid.entity_id) {
57 out.push(DeliveredSample {
58 writer_id: d.writer_id,
59 writer_sn: d.writer_sn,
60 payload: d.serialized_payload,
63 });
64 }
65 }
66 }
69 Ok(out)
70 }
71}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct DeliveredSample {
76 pub writer_id: EntityId,
79 pub writer_sn: SequenceNumber,
81 pub payload: alloc::sync::Arc<[u8]>,
84}
85
86fn matches_reader(target: &EntityId, our: &EntityId) -> bool {
87 if target.entity_kind == EntityKind::Unknown && target.entity_key == [0; 3] {
89 return true;
90 }
91 target == our
92}
93
#[cfg(test)]
mod tests {
    // Unwrapping is fine in tests: a panic is exactly the failure signal wanted.
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;
    use crate::writer::BestEffortWriter;

    /// Entity id of the reader under test.
    fn reader_id() -> EntityId {
        EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0])
    }

    /// Entity id of the writer that produces the test datagrams.
    fn writer_id() -> EntityId {
        EntityId::user_writer_with_key([0x10, 0x20, 0x30])
    }

    /// GUID prefix shared by the writer and the reader in these tests.
    fn prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([1; 12])
    }

    /// Reader under test, built from `prefix()` and `reader_id()`.
    fn reader() -> BestEffortReader {
        BestEffortReader::new(prefix(), reader_id())
    }

    #[test]
    fn reader_delivers_data_addressed_to_us() {
        let mut w = BestEffortWriter::new(prefix(), writer_id(), reader_id());
        let bytes = w.write(b"hello").unwrap();

        let samples = reader().recv_datagram(&bytes).unwrap();

        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].payload.as_ref(), &b"hello"[..]);
        assert_eq!(samples[0].writer_id, writer_id());
        assert_eq!(samples[0].writer_sn, SequenceNumber(1));
    }

    #[test]
    fn reader_drops_data_addressed_to_other_reader() {
        let other_reader = EntityId::user_reader_with_key([0x99, 0x99, 0x99]);
        let mut w = BestEffortWriter::new(prefix(), writer_id(), other_reader);
        let bytes = w.write(b"not for you").unwrap();

        let samples = reader().recv_datagram(&bytes).unwrap();

        assert!(samples.is_empty());
    }

    #[test]
    fn reader_accepts_unknown_wildcard_target() {
        let mut w = BestEffortWriter::new(prefix(), writer_id(), EntityId::UNKNOWN);
        let bytes = w.write(b"broadcast").unwrap();

        let samples = reader().recv_datagram(&bytes).unwrap();

        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].payload.as_ref(), &b"broadcast"[..]);
    }

    #[test]
    fn reader_propagates_invalid_magic_error() {
        // An all-zero buffer is rejected by the decoder with `InvalidMagic`.
        let bytes = [0u8; 32];

        let res = reader().recv_datagram(&bytes);

        assert!(matches!(res, Err(WireError::InvalidMagic { .. })));
    }

    // NOTE: despite its name, this test pushes two separate datagrams (one
    // per `write` call) through the reader and checks that each is delivered
    // with the next sequence number. It does not build a single datagram
    // containing several DATA submessages.
    #[test]
    fn reader_handles_multiple_data_in_one_datagram() {
        let mut w = BestEffortWriter::new(prefix(), writer_id(), reader_id());
        let bytes_a = w.write(b"first").unwrap();
        let bytes_b = w.write(b"second").unwrap();
        let r = reader();

        let s1 = r.recv_datagram(&bytes_a).unwrap();
        let s2 = r.recv_datagram(&bytes_b).unwrap();

        // Check the lengths first so a regression fails with a clear message
        // rather than an index-out-of-bounds panic below.
        assert_eq!(s1.len(), 1);
        assert_eq!(s2.len(), 1);
        assert_eq!(s1[0].payload.as_ref(), &b"first"[..]);
        assert_eq!(s2[0].payload.as_ref(), &b"second"[..]);
        assert_eq!(s1[0].writer_sn, SequenceNumber(1));
        assert_eq!(s2[0].writer_sn, SequenceNumber(2));
    }
}