Skip to main content

zerodds_discovery/sedp/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! SEDP Builtin Reliable Readers — Publications + Subscriptions.
4//!
5//! Wrapper um [`zerodds_rtps::reliable_reader::ReliableReader`] mit festen
6//! SEDP-EntityIds. Der Wrapper deserialisiert eingehende DATA-Payloads
7//! als PL_CDR_LE-encoded [`PublicationBuiltinTopicData`] bzw.
8//! [`SubscriptionBuiltinTopicData`] und liefert sie als typisierte
9//! Samples zurueck.
10//!
11//! **Multi-Writer-Support** (ab WP 1.4 T4.5): der interne
12//! `ReliableReader` kann via `add_writer_proxy` mehrere Remote-
13//! Writer parallel tracken — Discovery von N Remote-Participants
14//! laeuft ueber denselben SEDP-Reader.
15
16extern crate alloc;
17use alloc::string::String;
18use alloc::vec::Vec;
19use core::time::Duration;
20
21use zerodds_rtps::error::WireError;
22use zerodds_rtps::fragment_assembler::AssemblerCaps;
23use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
24use zerodds_rtps::reliable_reader::{
25    DEFAULT_HEARTBEAT_RESPONSE_DELAY, ReliableReader, ReliableReaderConfig,
26};
27use zerodds_rtps::submessages::{
28    DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage,
29};
30use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
31use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, VendorId};
32use zerodds_rtps::writer_proxy::WriterProxy;
33
/// Default capacity of the reader's receive cache.
///
/// Passed as `max_samples_per_proxy` when the underlying reliable reader is
/// configured.
pub const SEDP_READER_MAX_SAMPLES: usize = 256;
36
/// Errors reported by the SEDP readers.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum SedpReaderError {
    /// PL_CDR decoding of the payload failed, i.e. the SEDP announcement is
    /// invalid. Carries a static explanation.
    InvalidPayload {
        /// Explanation (e.g. "TOPIC_NAME missing").
        reason: &'static str,
    },
    /// Wire-level error in the `ReliableReader` (e.g. encode overflow).
    Wire(WireError),
}
50
51impl From<WireError> for SedpReaderError {
52    fn from(e: WireError) -> Self {
53        match e {
54            WireError::ValueOutOfRange { message } => Self::InvalidPayload { reason: message },
55            other => Self::Wire(other),
56        }
57    }
58}
59
/// Reader for SEDP publications (fixed entity id
/// [`EntityId::SEDP_BUILTIN_PUBLICATIONS_READER`]).
#[derive(Debug)]
pub struct SedpPublicationsReader {
    // Underlying reliable reader that all protocol handling is delegated to.
    inner: ReliableReader,
}
66
/// Reader for SEDP subscriptions (fixed entity id
/// [`EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER`]).
#[derive(Debug)]
pub struct SedpSubscriptionsReader {
    // Underlying reliable reader that all protocol handling is delegated to.
    inner: ReliableReader,
}
73
74impl SedpPublicationsReader {
75    /// Erzeugt einen SEDP-Publications-Reader fuer den lokalen
76    /// Participant. Der `remote_writer_guid` muss bekannt sein — in
77    /// Phase 1 wird ein Platzhalter-Proxy mit `remote_prefix` +
78    /// [`EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER`] gebaut.
79    #[must_use]
80    pub fn new(
81        participant_prefix: GuidPrefix,
82        vendor_id: VendorId,
83        remote_writer_prefix: GuidPrefix,
84        remote_metatraffic_unicast: Vec<zerodds_rtps::wire_types::Locator>,
85    ) -> Self {
86        let reader_guid = Guid::new(
87            participant_prefix,
88            EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
89        );
90        let remote_writer_guid = Guid::new(
91            remote_writer_prefix,
92            EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
93        );
94        Self {
95            inner: make_sedp_reader(
96                reader_guid,
97                vendor_id,
98                remote_writer_guid,
99                remote_metatraffic_unicast,
100            ),
101        }
102    }
103
104    /// GUID des Readers.
105    #[must_use]
106    pub fn guid(&self) -> Guid {
107        self.inner.guid()
108    }
109
110    /// Fuegt einen weiteren Remote-SEDP-Writer als Proxy hinzu
111    /// (Multi-Participant-Discovery).
112    pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
113        self.inner.add_writer_proxy(proxy);
114    }
115
116    /// Entfernt einen Remote-Writer (z.B. nach SPDP-Lease-Timeout).
117    pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
118        self.inner.remove_writer_proxy(guid)
119    }
120
121    /// Verarbeitet eine eingegangene DATA-Submessage und liefert
122    /// deserialisierte [`PublicationBuiltinTopicData`]-Samples zurueck
123    /// (in SN-Reihenfolge).
124    ///
125    /// # Errors
126    /// [`SedpReaderError::InvalidPayload`] wenn der Payload kein
127    /// valides PL_CDR_LE ist.
128    pub fn handle_data(
129        &mut self,
130        data: &DataSubmessage,
131    ) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError> {
132        let samples = self.inner.handle_data(data);
133        decode_publication_samples(samples.into_iter().map(|s| s.payload))
134    }
135
136    /// Verarbeitet eine eingegangene DATA_FRAG-Submessage.
137    ///
138    /// # Errors
139    /// siehe [`handle_data`](Self::handle_data).
140    pub fn handle_data_frag(
141        &mut self,
142        df: &DataFragSubmessage,
143        now: Duration,
144    ) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError> {
145        let samples = self.inner.handle_data_frag(df, now);
146        decode_publication_samples(samples.into_iter().map(|s| s.payload))
147    }
148
149    /// Verarbeitet eine eingegangene GAP-Submessage (liefert etwaige
150    /// Folge-Samples, die jetzt in-order delivered werden koennen).
151    ///
152    /// # Errors
153    /// siehe [`handle_data`](Self::handle_data).
154    pub fn handle_gap(
155        &mut self,
156        gap: &GapSubmessage,
157    ) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError> {
158        let samples = self.inner.handle_gap(gap);
159        decode_publication_samples(samples.into_iter().map(|s| s.payload))
160    }
161
162    /// Verarbeitet eine eingegangene HEARTBEAT.
163    pub fn handle_heartbeat(&mut self, hb: &HeartbeatSubmessage, now: Duration) {
164        self.inner.handle_heartbeat(hb, now);
165    }
166
167    /// Tick — liefert ACKNACK/NACK_FRAG-Datagramme, wenn faellig.
168    ///
169    /// # Errors
170    /// Wire-Encode-Fehler.
171    pub fn tick(&mut self, now: Duration) -> Result<Vec<Vec<u8>>, WireError> {
172        self.inner.tick(now)
173    }
174
175    /// Tick mit Ziel-Locators pro Datagramm.
176    ///
177    /// # Errors
178    /// Wire-Encode-Fehler.
179    pub fn tick_outbound(
180        &mut self,
181        now: Duration,
182    ) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>, WireError> {
183        self.inner.tick_outbound(now)
184    }
185
186    /// Read-only-Zugriff.
187    #[must_use]
188    pub fn inner(&self) -> &ReliableReader {
189        &self.inner
190    }
191}
192
193impl SedpSubscriptionsReader {
194    /// Erzeugt einen SEDP-Subscriptions-Reader.
195    #[must_use]
196    pub fn new(
197        participant_prefix: GuidPrefix,
198        vendor_id: VendorId,
199        remote_writer_prefix: GuidPrefix,
200        remote_metatraffic_unicast: Vec<zerodds_rtps::wire_types::Locator>,
201    ) -> Self {
202        let reader_guid = Guid::new(
203            participant_prefix,
204            EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER,
205        );
206        let remote_writer_guid = Guid::new(
207            remote_writer_prefix,
208            EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER,
209        );
210        Self {
211            inner: make_sedp_reader(
212                reader_guid,
213                vendor_id,
214                remote_writer_guid,
215                remote_metatraffic_unicast,
216            ),
217        }
218    }
219
220    /// GUID.
221    #[must_use]
222    pub fn guid(&self) -> Guid {
223        self.inner.guid()
224    }
225
226    /// Fuegt einen weiteren Remote-SEDP-Writer als Proxy hinzu.
227    pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
228        self.inner.add_writer_proxy(proxy);
229    }
230
231    /// Entfernt einen Remote-Writer.
232    pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
233        self.inner.remove_writer_proxy(guid)
234    }
235
236    /// Verarbeitet eine DATA-Submessage.
237    ///
238    /// # Errors
239    /// siehe [`SedpPublicationsReader::handle_data`].
240    pub fn handle_data(
241        &mut self,
242        data: &DataSubmessage,
243    ) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError> {
244        let samples = self.inner.handle_data(data);
245        decode_subscription_samples(samples.into_iter().map(|s| s.payload))
246    }
247
248    /// DATA_FRAG.
249    ///
250    /// # Errors
251    /// siehe [`handle_data`](Self::handle_data).
252    pub fn handle_data_frag(
253        &mut self,
254        df: &DataFragSubmessage,
255        now: Duration,
256    ) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError> {
257        let samples = self.inner.handle_data_frag(df, now);
258        decode_subscription_samples(samples.into_iter().map(|s| s.payload))
259    }
260
261    /// GAP.
262    ///
263    /// # Errors
264    /// siehe [`handle_data`](Self::handle_data).
265    pub fn handle_gap(
266        &mut self,
267        gap: &GapSubmessage,
268    ) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError> {
269        let samples = self.inner.handle_gap(gap);
270        decode_subscription_samples(samples.into_iter().map(|s| s.payload))
271    }
272
273    /// HEARTBEAT.
274    pub fn handle_heartbeat(&mut self, hb: &HeartbeatSubmessage, now: Duration) {
275        self.inner.handle_heartbeat(hb, now);
276    }
277
278    /// Tick.
279    ///
280    /// # Errors
281    /// Wire-Encode-Fehler.
282    pub fn tick(&mut self, now: Duration) -> Result<Vec<Vec<u8>>, WireError> {
283        self.inner.tick(now)
284    }
285
286    /// Tick mit Ziel-Locators.
287    ///
288    /// # Errors
289    /// Wire-Encode-Fehler.
290    pub fn tick_outbound(
291        &mut self,
292        now: Duration,
293    ) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>, WireError> {
294        self.inner.tick_outbound(now)
295    }
296
297    /// Read-only-Zugriff.
298    #[must_use]
299    pub fn inner(&self) -> &ReliableReader {
300        &self.inner
301    }
302}
303
304// ============================================================================
305// Shared SEDP-Reader-Helpers
306// ============================================================================
307
308fn make_sedp_reader(
309    reader_guid: Guid,
310    vendor_id: VendorId,
311    remote_writer_guid: Guid,
312    remote_metatraffic_unicast: Vec<zerodds_rtps::wire_types::Locator>,
313) -> ReliableReader {
314    let writer_proxy = WriterProxy::new(
315        remote_writer_guid,
316        remote_metatraffic_unicast,
317        Vec::new(),
318        true,
319    );
320    ReliableReader::new(ReliableReaderConfig {
321        guid: reader_guid,
322        vendor_id,
323        writer_proxies: alloc::vec![writer_proxy],
324        max_samples_per_proxy: SEDP_READER_MAX_SAMPLES,
325        heartbeat_response_delay: DEFAULT_HEARTBEAT_RESPONSE_DELAY,
326        assembler_caps: AssemblerCaps::default(),
327    })
328}
329
330fn decode_publication_samples<B, I>(
331    payloads: I,
332) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError>
333where
334    B: AsRef<[u8]>,
335    I: IntoIterator<Item = B>,
336{
337    let mut out = Vec::new();
338    for p in payloads {
339        out.push(PublicationBuiltinTopicData::from_pl_cdr_le(p.as_ref())?);
340    }
341    Ok(out)
342}
343
344fn decode_subscription_samples<B, I>(
345    payloads: I,
346) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError>
347where
348    B: AsRef<[u8]>,
349    I: IntoIterator<Item = B>,
350{
351    let mut out = Vec::new();
352    for p in payloads {
353        out.push(SubscriptionBuiltinTopicData::from_pl_cdr_le(p.as_ref())?);
354    }
355    Ok(out)
356}
357
// References `String` so that the `alloc::string::String` import above does
// not trigger an unused-import warning.
// NOTE(review): `SedpReaderError::InvalidPayload::reason` is a `&'static str`,
// not a `String`, so nothing visible in this file needs the import — confirm
// whether the import and this placeholder can be removed together.
#[allow(dead_code)]
const _: Option<String> = None;
362
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use zerodds_rtps::participant_data::Duration as DdsDuration;
    use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
    use zerodds_rtps::submessages::DataSubmessage;
    use zerodds_rtps::wire_types::{Locator, SequenceNumber};

    /// GUID prefix of the local participant owning the reader under test.
    fn local_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([1; 12])
    }

    /// GUID prefix of the remote participant whose writer is proxied.
    fn remote_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([2; 12])
    }

    /// Single loopback locator used as the remote metatraffic unicast address.
    fn loopback_locators() -> alloc::vec::Vec<Locator> {
        alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)]
    }

    /// Publication announcement fixture for topic "ChatterTopic".
    fn sample_pub() -> PublicationBuiltinTopicData {
        let writer_key = Guid::new(
            remote_prefix(),
            EntityId::user_writer_with_key([0xAA, 0xBB, 0xCC]),
        );
        let participant_key = Guid::new(remote_prefix(), EntityId::PARTICIPANT);
        let reliability = ReliabilityQos {
            kind: ReliabilityKind::Reliable,
            max_blocking_time: DdsDuration::from_secs(10),
        };
        PublicationBuiltinTopicData {
            key: writer_key,
            participant_key,
            topic_name: "ChatterTopic".into(),
            type_name: "std_msgs::String".into(),
            durability: DurabilityKind::Volatile,
            reliability,
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: alloc::vec::Vec::new(),
            user_data: alloc::vec::Vec::new(),
            topic_data: alloc::vec::Vec::new(),
            group_data: alloc::vec::Vec::new(),
            type_information: None,
            data_representation: alloc::vec::Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        }
    }

    /// Publications reader wired to the remote fixture participant.
    fn make_reader() -> SedpPublicationsReader {
        SedpPublicationsReader::new(
            local_prefix(),
            VendorId::ZERODDS,
            remote_prefix(),
            loopback_locators(),
        )
    }

    #[test]
    fn reader_has_expected_guid() {
        let reader = make_reader();
        let entity_id = reader.guid().entity_id;
        assert_eq!(entity_id, EntityId::SEDP_BUILTIN_PUBLICATIONS_READER);
    }

    #[test]
    fn handle_data_decodes_publication_payload() {
        let mut reader = make_reader();
        let announcement = sample_pub();
        let serialized = announcement.to_pl_cdr_le().unwrap();
        let submessage = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
            writer_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: serialized.into(),
        };

        let decoded = reader.handle_data(&submessage).unwrap();

        assert_eq!(decoded.len(), 1);
        let first = &decoded[0];
        assert_eq!(first.topic_name, "ChatterTopic");
        assert_eq!(first.type_name, "std_msgs::String");
    }

    #[test]
    fn handle_data_rejects_invalid_payload() {
        let mut reader = make_reader();
        let submessage = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
            writer_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            // Encapsulation header followed by a malformed parameter list.
            serialized_payload: alloc::vec![0x00, 0x03, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF].into(),
        };

        let outcome = reader.handle_data(&submessage);

        assert!(matches!(
            outcome,
            Err(SedpReaderError::InvalidPayload { .. })
        ));
    }

    #[test]
    fn subscriptions_reader_has_expected_guid() {
        let reader = SedpSubscriptionsReader::new(
            local_prefix(),
            VendorId::ZERODDS,
            remote_prefix(),
            loopback_locators(),
        );
        let entity_id = reader.guid().entity_id;
        assert_eq!(entity_id, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER);
    }
}