Skip to main content

zerodds_discovery/
spdp.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Simple Participant Discovery Protocol (DDSI-RTPS 2.5 §8.5.3).
4//!
5//! Best-Effort-Beacon. Beacon-Sender baut ein DATA-Datagram mit
6//! `ParticipantBuiltinTopicData` als PL_CDR_LE-Payload; der Caller
7//! sendet es periodisch via Multicast. Beacon-Receiver parst
8//! eingehende DATA-Submessages mit der SPDP-Reader-EntityId und liefert
9//! ein `DiscoveredParticipant` an den Cache.
10//!
11//! Lease-Tracking: `DiscoveredParticipantsCache` führt `last_seen` pro
12//! Participant; Caller (DCPS-Runtime) räumt abgelaufene Einträge gemäss
13//! `participant.lease_duration` (PID_PARTICIPANT_LEASE_DURATION) auf.
14
15extern crate alloc;
16use alloc::collections::BTreeMap;
17use alloc::vec::Vec;
18
19use core::fmt;
20
21use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
22use zerodds_rtps::error::WireError;
23use zerodds_rtps::header::RtpsHeader;
24use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
25use zerodds_rtps::submessages::DataSubmessage;
26use zerodds_rtps::wire_types::{EntityId, GuidPrefix, SequenceNumber, VendorId};
27
28/// SPDP-spezifische Fehler.
29#[derive(Debug, Clone, PartialEq, Eq)]
30#[non_exhaustive]
31pub enum SpdpError {
32    /// Wire-Decoding-Fehler (Datagram, Submessage, ParameterList).
33    Wire(WireError),
34    /// Datagram enthaelt keine SPDP-DATA-Submessage (Reader/Writer-IDs
35    /// passen nicht).
36    NotSpdp,
37}
38
39impl fmt::Display for SpdpError {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            Self::Wire(e) => write!(f, "spdp wire error: {e}"),
43            Self::NotSpdp => f.write_str("spdp: datagram is not an SPDP DATA submessage"),
44        }
45    }
46}
47
48impl From<WireError> for SpdpError {
49    fn from(e: WireError) -> Self {
50        Self::Wire(e)
51    }
52}
53
54#[cfg(feature = "std")]
55impl std::error::Error for SpdpError {}
56
57// ============================================================================
58// Beacon-Sender
59// ============================================================================
60
61/// SPDP-Beacon-Sender. Stateless: ruft `serialize()` auf, um ein
62/// fertiges Datagram zu produzieren, das der Caller via Multicast
63/// sendet.
64#[derive(Debug, Clone)]
65pub struct SpdpBeacon {
66    /// Eigene Participant-Daten.
67    pub data: ParticipantBuiltinTopicData,
68    /// VendorId fuer den RTPS-Header (default ZeroDDS).
69    pub vendor_id: VendorId,
70    /// Naechste Sequence-Number fuer DATA-Submessages.
71    pub next_sn: i64,
72}
73
74impl SpdpBeacon {
75    /// Konstruktor.
76    #[must_use]
77    pub fn new(data: ParticipantBuiltinTopicData) -> Self {
78        Self {
79            data,
80            vendor_id: VendorId::ZERODDS,
81            next_sn: 1,
82        }
83    }
84
85    /// Setzt eine bestimmte VendorId (sonst Default `ZERODDS`).
86    pub fn set_vendor_id(&mut self, vendor: VendorId) {
87        self.vendor_id = vendor;
88    }
89
90    /// Encoded ein SPDP-Beacon-Datagram.
91    ///
92    /// # Errors
93    /// `WireError`, wenn DATA-Body groesser als u16::MAX oder Encoding
94    /// scheitert.
95    pub fn serialize(&mut self) -> Result<Vec<u8>, WireError> {
96        #[cfg(feature = "metrics")]
97        crate::metrics::inc_spdp_announcement_sent();
98        let payload = self.data.to_pl_cdr_le();
99        let sn = SequenceNumber(self.next_sn);
100        self.next_sn = self
101            .next_sn
102            .checked_add(1)
103            .ok_or(WireError::ValueOutOfRange {
104                message: "spdp beacon sequence overflow",
105            })?;
106        let data = DataSubmessage {
107            extra_flags: 0,
108            reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
109            writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
110            writer_sn: sn,
111            inline_qos: None,
112            key_flag: false,
113            non_standard_flag: false,
114            serialized_payload: payload.into(),
115        };
116        let header = RtpsHeader::new(self.vendor_id, self.data.guid.prefix);
117        encode_data_datagram(header, &[data])
118    }
119}
120
121// ============================================================================
122// Beacon-Receiver
123// ============================================================================
124
125/// SPDP-Beacon-Receiver. Stateless: nimmt ein Datagram und versucht,
126/// daraus eine DiscoveredParticipant-Info zu extrahieren.
127#[derive(Debug, Clone, Default)]
128pub struct SpdpReader;
129
130impl SpdpReader {
131    /// Konstruktor.
132    #[must_use]
133    pub fn new() -> Self {
134        Self
135    }
136
137    /// Versucht, einen DiscoveredParticipant aus einem Datagram zu
138    /// extrahieren.
139    ///
140    /// # Errors
141    /// - `SpdpError::Wire` bei Decoder-Fehler.
142    /// - `SpdpError::NotSpdp` wenn keine SPDP-DATA-Submessage darin ist.
143    pub fn parse_datagram(&self, datagram: &[u8]) -> Result<DiscoveredParticipant, SpdpError> {
144        let parsed = decode_datagram(datagram)?;
145        for sub in parsed.submessages {
146            if let ParsedSubmessage::Data(d) = sub {
147                if d.writer_id == EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER {
148                    match ParticipantBuiltinTopicData::from_pl_cdr_le(&d.serialized_payload) {
149                        Ok(data) => {
150                            return Ok(DiscoveredParticipant {
151                                sender_prefix: parsed.header.guid_prefix,
152                                sender_vendor: parsed.header.vendor_id,
153                                data,
154                            });
155                        }
156                        // Fremde Encapsulation (z.B. PL_CDR2 von Cyclone/Fast-DDS):
157                        // kein echter Bug, einfach stillschweigend ueberspringen.
158                        // Wird bei XCDR2-Rollout (Phase 1/2) ersetzt.
159                        Err(WireError::UnsupportedEncapsulation { .. }) => continue,
160                        Err(e) => return Err(SpdpError::Wire(e)),
161                    }
162                }
163            }
164        }
165        Err(SpdpError::NotSpdp)
166    }
167}
168
169/// Ergebnis einer SPDP-Beacon-Reception.
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct DiscoveredParticipant {
172    /// GuidPrefix aus dem RTPS-Header (sollte mit `data.guid.prefix`
173    /// uebereinstimmen, kann bei Mis-Configuration abweichen).
174    pub sender_prefix: GuidPrefix,
175    /// VendorId aus dem RTPS-Header.
176    pub sender_vendor: VendorId,
177    /// Geparste Participant-Daten.
178    pub data: ParticipantBuiltinTopicData,
179}
180
181// ============================================================================
182// Cache
183// ============================================================================
184
185/// In-Memory-Cache aller bisher entdeckten Participants. `last_seen`
186/// pro Eintrag wird vom Caller (DCPS-Runtime) gegen
187/// `participant.lease_duration` geprüft, um abgelaufene Participants zu
188/// purgen — der Cache selbst erzwingt kein Timeout.
189#[derive(Debug, Clone, Default)]
190pub struct DiscoveredParticipantsCache {
191    inner: BTreeMap<GuidPrefix, DiscoveredParticipant>,
192}
193
194impl DiscoveredParticipantsCache {
195    /// Leerer Cache.
196    #[must_use]
197    pub fn new() -> Self {
198        Self {
199            inner: BTreeMap::new(),
200        }
201    }
202
203    /// Insert/Update. Liefert `true` wenn ein NEUER Participant
204    /// hinzugekommen ist (Caller kann darauf einen Listener aufrufen).
205    pub fn insert(&mut self, p: DiscoveredParticipant) -> bool {
206        let inserted = self.inner.insert(p.data.guid.prefix, p).is_none();
207        if inserted {
208            #[cfg(feature = "metrics")]
209            crate::metrics::set_participants_known(self.inner.len());
210        }
211        inserted
212    }
213
214    /// Anzahl bekannte Participants.
215    #[must_use]
216    pub fn len(&self) -> usize {
217        self.inner.len()
218    }
219
220    /// `true` wenn leer.
221    #[must_use]
222    pub fn is_empty(&self) -> bool {
223        self.inner.is_empty()
224    }
225
226    /// Lookup nach Prefix.
227    #[must_use]
228    pub fn get(&self, prefix: &GuidPrefix) -> Option<&DiscoveredParticipant> {
229        self.inner.get(prefix)
230    }
231
232    /// Iter ueber alle bekannten Participants.
233    pub fn iter(&self) -> impl Iterator<Item = &DiscoveredParticipant> {
234        self.inner.values()
235    }
236}
237
238#[cfg(test)]
239mod tests {
240    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
241    use super::*;
242    use zerodds_rtps::participant_data::{Duration, endpoint_flag};
243    use zerodds_rtps::wire_types::{Guid, Locator, ProtocolVersion};
244
245    fn sample_participant() -> ParticipantBuiltinTopicData {
246        ParticipantBuiltinTopicData {
247            guid: Guid::new(GuidPrefix::from_bytes([0xA; 12]), EntityId::PARTICIPANT),
248            protocol_version: ProtocolVersion::V2_5,
249            vendor_id: VendorId::ZERODDS,
250            default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 1], 7410)),
251            default_multicast_locator: Some(Locator::udp_v4([239, 255, 0, 1], 7400)),
252            metatraffic_unicast_locator: None,
253            metatraffic_multicast_locator: None,
254            domain_id: None,
255            builtin_endpoint_set: endpoint_flag::PARTICIPANT_ANNOUNCER
256                | endpoint_flag::PARTICIPANT_DETECTOR,
257            lease_duration: Duration::from_secs(100),
258            user_data: alloc::vec::Vec::new(),
259            properties: Default::default(),
260            identity_token: None,
261            permissions_token: None,
262            identity_status_token: None,
263            sig_algo_info: None,
264            kx_algo_info: None,
265            sym_cipher_algo_info: None,
266        }
267    }
268
269    #[test]
270    fn beacon_serializes_to_decodable_datagram() {
271        let mut beacon = SpdpBeacon::new(sample_participant());
272        let datagram = beacon.serialize().unwrap();
273        let reader = SpdpReader::new();
274        let discovered = reader.parse_datagram(&datagram).unwrap();
275        assert_eq!(
276            discovered.data.guid.prefix,
277            GuidPrefix::from_bytes([0xA; 12])
278        );
279        assert_eq!(discovered.sender_vendor, VendorId::ZERODDS);
280    }
281
282    #[test]
283    fn beacon_increments_sequence_number() {
284        let mut beacon = SpdpBeacon::new(sample_participant());
285        beacon.serialize().unwrap();
286        assert_eq!(beacon.next_sn, 2);
287        beacon.serialize().unwrap();
288        assert_eq!(beacon.next_sn, 3);
289    }
290
291    #[test]
292    fn beacon_uses_spdp_builtin_writer_id() {
293        let mut beacon = SpdpBeacon::new(sample_participant());
294        let datagram = beacon.serialize().unwrap();
295        let parsed = decode_datagram(&datagram).unwrap();
296        match &parsed.submessages[0] {
297            ParsedSubmessage::Data(d) => {
298                assert_eq!(d.writer_id, EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER);
299                assert_eq!(d.reader_id, EntityId::SPDP_BUILTIN_PARTICIPANT_READER);
300            }
301            other => panic!("expected DATA, got {other:?}"),
302        }
303    }
304
305    #[test]
306    fn reader_rejects_non_spdp_datagram() {
307        // Ein normales User-DATA, kein SPDP.
308        let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]));
309        let data = DataSubmessage {
310            extra_flags: 0,
311            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
312            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
313            writer_sn: SequenceNumber(1),
314            inline_qos: None,
315            key_flag: false,
316            non_standard_flag: false,
317            serialized_payload: alloc::vec![1, 2, 3, 4].into(),
318        };
319        let datagram = encode_data_datagram(header, &[data]).unwrap();
320        let reader = SpdpReader::new();
321        let res = reader.parse_datagram(&datagram);
322        assert!(matches!(res, Err(SpdpError::NotSpdp)));
323    }
324
325    #[test]
326    fn reader_propagates_invalid_magic_as_wire_error() {
327        let reader = SpdpReader::new();
328        let res = reader.parse_datagram(&[0u8; 32]);
329        assert!(matches!(res, Err(SpdpError::Wire(_))));
330    }
331
332    #[test]
333    fn cache_starts_empty() {
334        let c = DiscoveredParticipantsCache::new();
335        assert!(c.is_empty());
336        assert_eq!(c.len(), 0);
337    }
338
339    #[test]
340    fn cache_insert_returns_true_for_new_participant() {
341        let mut c = DiscoveredParticipantsCache::new();
342        let mut beacon = SpdpBeacon::new(sample_participant());
343        let datagram = beacon.serialize().unwrap();
344        let p = SpdpReader::new().parse_datagram(&datagram).unwrap();
345        assert!(c.insert(p.clone()));
346        assert_eq!(c.len(), 1);
347        // Zweites Insert mit gleichem Prefix → false.
348        assert!(!c.insert(p));
349        assert_eq!(c.len(), 1);
350    }
351
352    #[test]
353    fn cache_get_returns_inserted_participant() {
354        let mut c = DiscoveredParticipantsCache::new();
355        let mut beacon = SpdpBeacon::new(sample_participant());
356        let datagram = beacon.serialize().unwrap();
357        let p = SpdpReader::new().parse_datagram(&datagram).unwrap();
358        let prefix = p.data.guid.prefix;
359        c.insert(p);
360        assert!(c.get(&prefix).is_some());
361    }
362
363    #[test]
364    fn cache_iter_yields_all_known_participants() {
365        let mut c = DiscoveredParticipantsCache::new();
366        let mut p1 = sample_participant();
367        let mut p2 = sample_participant();
368        p1.guid = Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT);
369        p2.guid = Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT);
370        let mut b1 = SpdpBeacon::new(p1);
371        let mut b2 = SpdpBeacon::new(p2);
372        let d1 = b1.serialize().unwrap();
373        let d2 = b2.serialize().unwrap();
374        c.insert(SpdpReader::new().parse_datagram(&d1).unwrap());
375        c.insert(SpdpReader::new().parse_datagram(&d2).unwrap());
376        assert_eq!(c.iter().count(), 2);
377    }
378}