1extern crate alloc;
16use alloc::collections::BTreeMap;
17use alloc::vec::Vec;
18
19use core::fmt;
20
21use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
22use zerodds_rtps::error::WireError;
23use zerodds_rtps::header::RtpsHeader;
24use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
25use zerodds_rtps::submessages::DataSubmessage;
26use zerodds_rtps::wire_types::{EntityId, GuidPrefix, SequenceNumber, VendorId};
27
28#[derive(Debug, Clone, PartialEq, Eq)]
30#[non_exhaustive]
31pub enum SpdpError {
32 Wire(WireError),
34 NotSpdp,
37}
38
39impl fmt::Display for SpdpError {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 Self::Wire(e) => write!(f, "spdp wire error: {e}"),
43 Self::NotSpdp => f.write_str("spdp: datagram is not an SPDP DATA submessage"),
44 }
45 }
46}
47
48impl From<WireError> for SpdpError {
49 fn from(e: WireError) -> Self {
50 Self::Wire(e)
51 }
52}
53
// `std::error::Error` is only implemented when the `std` feature is enabled;
// the trait's default methods are used as-is.
#[cfg(feature = "std")]
impl std::error::Error for SpdpError {}
56
57#[derive(Debug, Clone)]
65pub struct SpdpBeacon {
66 pub data: ParticipantBuiltinTopicData,
68 pub vendor_id: VendorId,
70 pub next_sn: i64,
72}
73
74impl SpdpBeacon {
75 #[must_use]
77 pub fn new(data: ParticipantBuiltinTopicData) -> Self {
78 Self {
79 data,
80 vendor_id: VendorId::ZERODDS,
81 next_sn: 1,
82 }
83 }
84
85 pub fn set_vendor_id(&mut self, vendor: VendorId) {
87 self.vendor_id = vendor;
88 }
89
90 pub fn serialize(&mut self) -> Result<Vec<u8>, WireError> {
96 #[cfg(feature = "metrics")]
97 crate::metrics::inc_spdp_announcement_sent();
98 let payload = self.data.to_pl_cdr_le();
99 let sn = SequenceNumber(self.next_sn);
100 self.next_sn = self
101 .next_sn
102 .checked_add(1)
103 .ok_or(WireError::ValueOutOfRange {
104 message: "spdp beacon sequence overflow",
105 })?;
106 let data = DataSubmessage {
107 extra_flags: 0,
108 reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
109 writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
110 writer_sn: sn,
111 inline_qos: None,
112 key_flag: false,
113 non_standard_flag: false,
114 serialized_payload: payload.into(),
115 };
116 let header = RtpsHeader::new(self.vendor_id, self.data.guid.prefix);
117 encode_data_datagram(header, &[data])
118 }
119}
120
/// Stateless parser for incoming SPDP datagrams.
#[derive(Clone, Debug, Default)]
pub struct SpdpReader;
129
130impl SpdpReader {
131 #[must_use]
133 pub fn new() -> Self {
134 Self
135 }
136
137 pub fn parse_datagram(&self, datagram: &[u8]) -> Result<DiscoveredParticipant, SpdpError> {
144 let parsed = decode_datagram(datagram)?;
145 for sub in parsed.submessages {
146 if let ParsedSubmessage::Data(d) = sub {
147 if d.writer_id == EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER {
148 match ParticipantBuiltinTopicData::from_pl_cdr_le(&d.serialized_payload) {
149 Ok(data) => {
150 return Ok(DiscoveredParticipant {
151 sender_prefix: parsed.header.guid_prefix,
152 sender_vendor: parsed.header.vendor_id,
153 data,
154 });
155 }
156 Err(WireError::UnsupportedEncapsulation { .. }) => continue,
160 Err(e) => return Err(SpdpError::Wire(e)),
161 }
162 }
163 }
164 }
165 Err(SpdpError::NotSpdp)
166 }
167}
168
169#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct DiscoveredParticipant {
172 pub sender_prefix: GuidPrefix,
175 pub sender_vendor: VendorId,
177 pub data: ParticipantBuiltinTopicData,
179}
180
181#[derive(Debug, Clone, Default)]
190pub struct DiscoveredParticipantsCache {
191 inner: BTreeMap<GuidPrefix, DiscoveredParticipant>,
192}
193
194impl DiscoveredParticipantsCache {
195 #[must_use]
197 pub fn new() -> Self {
198 Self {
199 inner: BTreeMap::new(),
200 }
201 }
202
203 pub fn insert(&mut self, p: DiscoveredParticipant) -> bool {
206 let inserted = self.inner.insert(p.data.guid.prefix, p).is_none();
207 if inserted {
208 #[cfg(feature = "metrics")]
209 crate::metrics::set_participants_known(self.inner.len());
210 }
211 inserted
212 }
213
214 #[must_use]
216 pub fn len(&self) -> usize {
217 self.inner.len()
218 }
219
220 #[must_use]
222 pub fn is_empty(&self) -> bool {
223 self.inner.is_empty()
224 }
225
226 #[must_use]
228 pub fn get(&self, prefix: &GuidPrefix) -> Option<&DiscoveredParticipant> {
229 self.inner.get(prefix)
230 }
231
232 pub fn iter(&self) -> impl Iterator<Item = &DiscoveredParticipant> {
234 self.inner.values()
235 }
236}
237
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
    use super::*;
    use zerodds_rtps::participant_data::{Duration, endpoint_flag};
    use zerodds_rtps::wire_types::{Guid, Locator, ProtocolVersion};

    /// Participant fixture with GUID prefix `[0xA; 12]` shared by the tests.
    fn sample_participant() -> ParticipantBuiltinTopicData {
        ParticipantBuiltinTopicData {
            guid: Guid::new(GuidPrefix::from_bytes([0xA; 12]), EntityId::PARTICIPANT),
            protocol_version: ProtocolVersion::V2_5,
            vendor_id: VendorId::ZERODDS,
            default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 1], 7410)),
            default_multicast_locator: Some(Locator::udp_v4([239, 255, 0, 1], 7400)),
            metatraffic_unicast_locator: None,
            metatraffic_multicast_locator: None,
            domain_id: None,
            builtin_endpoint_set: endpoint_flag::PARTICIPANT_ANNOUNCER
                | endpoint_flag::PARTICIPANT_DETECTOR,
            lease_duration: Duration::from_secs(100),
            user_data: Vec::new(),
            properties: Default::default(),
            identity_token: None,
            permissions_token: None,
            identity_status_token: None,
            sig_algo_info: None,
            kx_algo_info: None,
            sym_cipher_algo_info: None,
        }
    }

    /// Serializes a fresh beacon for `participant` into a datagram.
    fn announce(participant: ParticipantBuiltinTopicData) -> Vec<u8> {
        SpdpBeacon::new(participant).serialize().unwrap()
    }

    /// Round-trips `participant` through a beacon and a reader.
    fn discover(participant: ParticipantBuiltinTopicData) -> DiscoveredParticipant {
        let wire = announce(participant);
        SpdpReader::new().parse_datagram(&wire).unwrap()
    }

    #[test]
    fn beacon_serializes_to_decodable_datagram() {
        let found = discover(sample_participant());
        assert_eq!(found.data.guid.prefix, GuidPrefix::from_bytes([0xA; 12]));
        assert_eq!(found.sender_vendor, VendorId::ZERODDS);
    }

    #[test]
    fn beacon_increments_sequence_number() {
        let mut beacon = SpdpBeacon::new(sample_participant());
        // Each successful serialize bumps the counter by exactly one.
        for expected_next in 2..=3_i64 {
            beacon.serialize().unwrap();
            assert_eq!(beacon.next_sn, expected_next);
        }
    }

    #[test]
    fn beacon_uses_spdp_builtin_writer_id() {
        let wire = announce(sample_participant());
        let decoded = decode_datagram(&wire).unwrap();
        let first = &decoded.submessages[0];
        if let ParsedSubmessage::Data(d) = first {
            assert_eq!(d.writer_id, EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER);
            assert_eq!(d.reader_id, EntityId::SPDP_BUILTIN_PARTICIPANT_READER);
        } else {
            panic!("expected DATA, got {first:?}");
        }
    }

    #[test]
    fn reader_rejects_non_spdp_datagram() {
        // A user-level DATA submessage must not be mistaken for SPDP.
        let user_data = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: alloc::vec![1, 2, 3, 4].into(),
        };
        let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]));
        let wire = encode_data_datagram(header, &[user_data]).unwrap();
        let outcome = SpdpReader::new().parse_datagram(&wire);
        assert!(matches!(outcome, Err(SpdpError::NotSpdp)));
    }

    #[test]
    fn reader_propagates_invalid_magic_as_wire_error() {
        let zeros = [0u8; 32];
        let outcome = SpdpReader::new().parse_datagram(&zeros);
        assert!(matches!(outcome, Err(SpdpError::Wire(_))));
    }

    #[test]
    fn cache_starts_empty() {
        let cache = DiscoveredParticipantsCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn cache_insert_returns_true_for_new_participant() {
        let mut cache = DiscoveredParticipantsCache::new();
        let participant = discover(sample_participant());
        assert!(cache.insert(participant.clone()));
        assert_eq!(cache.len(), 1);
        // Same prefix again: reported as not new, size unchanged.
        assert!(!cache.insert(participant));
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn cache_get_returns_inserted_participant() {
        let mut cache = DiscoveredParticipantsCache::new();
        let participant = discover(sample_participant());
        let prefix = participant.data.guid.prefix;
        cache.insert(participant);
        assert!(cache.get(&prefix).is_some());
    }

    #[test]
    fn cache_iter_yields_all_known_participants() {
        let mut cache = DiscoveredParticipantsCache::new();
        for seed in 1..=2_u8 {
            let mut participant = sample_participant();
            participant.guid =
                Guid::new(GuidPrefix::from_bytes([seed; 12]), EntityId::PARTICIPANT);
            cache.insert(discover(participant));
        }
        assert_eq!(cache.iter().count(), 2);
    }
}