1extern crate alloc;
17use alloc::string::String;
18use alloc::vec::Vec;
19use core::time::Duration;
20
21use zerodds_rtps::error::WireError;
22use zerodds_rtps::fragment_assembler::AssemblerCaps;
23use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
24use zerodds_rtps::reliable_reader::{
25 DEFAULT_HEARTBEAT_RESPONSE_DELAY, ReliableReader, ReliableReaderConfig,
26};
27use zerodds_rtps::submessages::{
28 DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage,
29};
30use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
31use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, VendorId};
32use zerodds_rtps::writer_proxy::WriterProxy;
33
34pub const SEDP_READER_MAX_SAMPLES: usize = 256;
36
37#[derive(Debug, Clone, PartialEq, Eq)]
39#[non_exhaustive]
40pub enum SedpReaderError {
41 InvalidPayload {
44 reason: &'static str,
46 },
47 Wire(WireError),
49}
50
51impl From<WireError> for SedpReaderError {
52 fn from(e: WireError) -> Self {
53 match e {
54 WireError::ValueOutOfRange { message } => Self::InvalidPayload { reason: message },
55 other => Self::Wire(other),
56 }
57 }
58}
59
60#[derive(Debug)]
63pub struct SedpPublicationsReader {
64 inner: ReliableReader,
65}
66
67#[derive(Debug)]
70pub struct SedpSubscriptionsReader {
71 inner: ReliableReader,
72}
73
74impl SedpPublicationsReader {
75 #[must_use]
80 pub fn new(
81 participant_prefix: GuidPrefix,
82 vendor_id: VendorId,
83 remote_writer_prefix: GuidPrefix,
84 remote_metatraffic_unicast: Vec<zerodds_rtps::wire_types::Locator>,
85 ) -> Self {
86 let reader_guid = Guid::new(
87 participant_prefix,
88 EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
89 );
90 let remote_writer_guid = Guid::new(
91 remote_writer_prefix,
92 EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
93 );
94 Self {
95 inner: make_sedp_reader(
96 reader_guid,
97 vendor_id,
98 remote_writer_guid,
99 remote_metatraffic_unicast,
100 ),
101 }
102 }
103
104 #[must_use]
106 pub fn guid(&self) -> Guid {
107 self.inner.guid()
108 }
109
110 pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
113 self.inner.add_writer_proxy(proxy);
114 }
115
116 pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
118 self.inner.remove_writer_proxy(guid)
119 }
120
121 pub fn handle_data(
129 &mut self,
130 data: &DataSubmessage,
131 ) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError> {
132 let samples = self.inner.handle_data(data);
133 decode_publication_samples(samples.into_iter().map(|s| s.payload))
134 }
135
136 pub fn handle_data_frag(
141 &mut self,
142 df: &DataFragSubmessage,
143 now: Duration,
144 ) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError> {
145 let samples = self.inner.handle_data_frag(df, now);
146 decode_publication_samples(samples.into_iter().map(|s| s.payload))
147 }
148
149 pub fn handle_gap(
155 &mut self,
156 gap: &GapSubmessage,
157 ) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError> {
158 let samples = self.inner.handle_gap(gap);
159 decode_publication_samples(samples.into_iter().map(|s| s.payload))
160 }
161
162 pub fn handle_heartbeat(&mut self, hb: &HeartbeatSubmessage, now: Duration) {
164 self.inner.handle_heartbeat(hb, now);
165 }
166
167 pub fn tick(&mut self, now: Duration) -> Result<Vec<Vec<u8>>, WireError> {
172 self.inner.tick(now)
173 }
174
175 pub fn tick_outbound(
180 &mut self,
181 now: Duration,
182 ) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>, WireError> {
183 self.inner.tick_outbound(now)
184 }
185
186 #[must_use]
188 pub fn inner(&self) -> &ReliableReader {
189 &self.inner
190 }
191}
192
193impl SedpSubscriptionsReader {
194 #[must_use]
196 pub fn new(
197 participant_prefix: GuidPrefix,
198 vendor_id: VendorId,
199 remote_writer_prefix: GuidPrefix,
200 remote_metatraffic_unicast: Vec<zerodds_rtps::wire_types::Locator>,
201 ) -> Self {
202 let reader_guid = Guid::new(
203 participant_prefix,
204 EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER,
205 );
206 let remote_writer_guid = Guid::new(
207 remote_writer_prefix,
208 EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER,
209 );
210 Self {
211 inner: make_sedp_reader(
212 reader_guid,
213 vendor_id,
214 remote_writer_guid,
215 remote_metatraffic_unicast,
216 ),
217 }
218 }
219
220 #[must_use]
222 pub fn guid(&self) -> Guid {
223 self.inner.guid()
224 }
225
226 pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
228 self.inner.add_writer_proxy(proxy);
229 }
230
231 pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
233 self.inner.remove_writer_proxy(guid)
234 }
235
236 pub fn handle_data(
241 &mut self,
242 data: &DataSubmessage,
243 ) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError> {
244 let samples = self.inner.handle_data(data);
245 decode_subscription_samples(samples.into_iter().map(|s| s.payload))
246 }
247
248 pub fn handle_data_frag(
253 &mut self,
254 df: &DataFragSubmessage,
255 now: Duration,
256 ) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError> {
257 let samples = self.inner.handle_data_frag(df, now);
258 decode_subscription_samples(samples.into_iter().map(|s| s.payload))
259 }
260
261 pub fn handle_gap(
266 &mut self,
267 gap: &GapSubmessage,
268 ) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError> {
269 let samples = self.inner.handle_gap(gap);
270 decode_subscription_samples(samples.into_iter().map(|s| s.payload))
271 }
272
273 pub fn handle_heartbeat(&mut self, hb: &HeartbeatSubmessage, now: Duration) {
275 self.inner.handle_heartbeat(hb, now);
276 }
277
278 pub fn tick(&mut self, now: Duration) -> Result<Vec<Vec<u8>>, WireError> {
283 self.inner.tick(now)
284 }
285
286 pub fn tick_outbound(
291 &mut self,
292 now: Duration,
293 ) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>, WireError> {
294 self.inner.tick_outbound(now)
295 }
296
297 #[must_use]
299 pub fn inner(&self) -> &ReliableReader {
300 &self.inner
301 }
302}
303
304fn make_sedp_reader(
309 reader_guid: Guid,
310 vendor_id: VendorId,
311 remote_writer_guid: Guid,
312 remote_metatraffic_unicast: Vec<zerodds_rtps::wire_types::Locator>,
313) -> ReliableReader {
314 let writer_proxy = WriterProxy::new(
315 remote_writer_guid,
316 remote_metatraffic_unicast,
317 Vec::new(),
318 true,
319 );
320 ReliableReader::new(ReliableReaderConfig {
321 guid: reader_guid,
322 vendor_id,
323 writer_proxies: alloc::vec![writer_proxy],
324 max_samples_per_proxy: SEDP_READER_MAX_SAMPLES,
325 heartbeat_response_delay: DEFAULT_HEARTBEAT_RESPONSE_DELAY,
326 assembler_caps: AssemblerCaps::default(),
327 })
328}
329
330fn decode_publication_samples<B, I>(
331 payloads: I,
332) -> Result<Vec<PublicationBuiltinTopicData>, SedpReaderError>
333where
334 B: AsRef<[u8]>,
335 I: IntoIterator<Item = B>,
336{
337 let mut out = Vec::new();
338 for p in payloads {
339 out.push(PublicationBuiltinTopicData::from_pl_cdr_le(p.as_ref())?);
340 }
341 Ok(out)
342}
343
344fn decode_subscription_samples<B, I>(
345 payloads: I,
346) -> Result<Vec<SubscriptionBuiltinTopicData>, SedpReaderError>
347where
348 B: AsRef<[u8]>,
349 I: IntoIterator<Item = B>,
350{
351 let mut out = Vec::new();
352 for p in payloads {
353 out.push(SubscriptionBuiltinTopicData::from_pl_cdr_le(p.as_ref())?);
354 }
355 Ok(out)
356}
357
358#[allow(dead_code)]
361const _: Option<String> = None;
362
363#[cfg(test)]
364#[allow(clippy::expect_used, clippy::unwrap_used)]
365mod tests {
366 use super::*;
367 use zerodds_rtps::participant_data::Duration as DdsDuration;
368 use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
369 use zerodds_rtps::submessages::DataSubmessage;
370 use zerodds_rtps::wire_types::{Locator, SequenceNumber};
371
372 fn sample_pub() -> PublicationBuiltinTopicData {
373 PublicationBuiltinTopicData {
374 key: Guid::new(
375 GuidPrefix::from_bytes([2; 12]),
376 EntityId::user_writer_with_key([0xAA, 0xBB, 0xCC]),
377 ),
378 participant_key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
379 topic_name: "ChatterTopic".into(),
380 type_name: "std_msgs::String".into(),
381 durability: DurabilityKind::Volatile,
382 reliability: ReliabilityQos {
383 kind: ReliabilityKind::Reliable,
384 max_blocking_time: DdsDuration::from_secs(10),
385 },
386 ownership: zerodds_qos::OwnershipKind::Shared,
387 ownership_strength: 0,
388 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
389 deadline: zerodds_qos::DeadlineQosPolicy::default(),
390 lifespan: zerodds_qos::LifespanQosPolicy::default(),
391 partition: alloc::vec::Vec::new(),
392 user_data: alloc::vec::Vec::new(),
393 topic_data: alloc::vec::Vec::new(),
394 group_data: alloc::vec::Vec::new(),
395 type_information: None,
396 data_representation: alloc::vec::Vec::new(),
397 security_info: None,
398 service_instance_name: None,
399 related_entity_guid: None,
400 topic_aliases: None,
401 type_identifier: zerodds_types::TypeIdentifier::None,
402 }
403 }
404
405 fn make_reader() -> SedpPublicationsReader {
406 SedpPublicationsReader::new(
407 GuidPrefix::from_bytes([1; 12]),
408 VendorId::ZERODDS,
409 GuidPrefix::from_bytes([2; 12]),
410 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
411 )
412 }
413
414 #[test]
415 fn reader_has_expected_guid() {
416 let r = make_reader();
417 assert_eq!(
418 r.guid().entity_id,
419 EntityId::SEDP_BUILTIN_PUBLICATIONS_READER
420 );
421 }
422
423 #[test]
424 fn handle_data_decodes_publication_payload() {
425 let mut r = make_reader();
426 let p = sample_pub();
427 let payload = p.to_pl_cdr_le().unwrap();
428 let data = DataSubmessage {
429 extra_flags: 0,
430 reader_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
431 writer_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
432 writer_sn: SequenceNumber(1),
433 inline_qos: None,
434 key_flag: false,
435 non_standard_flag: false,
436 serialized_payload: payload.into(),
437 };
438 let out = r.handle_data(&data).unwrap();
439 assert_eq!(out.len(), 1);
440 assert_eq!(out[0].topic_name, "ChatterTopic");
441 assert_eq!(out[0].type_name, "std_msgs::String");
442 }
443
444 #[test]
445 fn handle_data_rejects_invalid_payload() {
446 let mut r = make_reader();
447 let data = DataSubmessage {
448 extra_flags: 0,
449 reader_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
450 writer_id: EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
451 writer_sn: SequenceNumber(1),
452 inline_qos: None,
454 key_flag: false,
455 non_standard_flag: false,
456 serialized_payload: alloc::vec![0x00, 0x03, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF].into(),
457 };
458 let res = r.handle_data(&data);
459 assert!(matches!(res, Err(SedpReaderError::InvalidPayload { .. })));
460 }
461
462 #[test]
463 fn subscriptions_reader_has_expected_guid() {
464 let r = SedpSubscriptionsReader::new(
465 GuidPrefix::from_bytes([1; 12]),
466 VendorId::ZERODDS,
467 GuidPrefix::from_bytes([2; 12]),
468 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
469 );
470 assert_eq!(
471 r.guid().entity_id,
472 EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER
473 );
474 }
475}