1extern crate alloc;
28use alloc::vec::Vec;
29use core::time::Duration;
30
31use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};
32use zerodds_rtps::error::WireError;
33use zerodds_rtps::message_builder::OutboundDatagram;
34use zerodds_rtps::participant_data::endpoint_flag;
35use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
36use zerodds_rtps::reader_proxy::ReaderProxy;
37use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
38use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, VendorId};
39use zerodds_rtps::writer_proxy::WriterProxy;
40
41use crate::sedp::cache::DiscoveredEndpointsCache;
42use crate::sedp::reader::{SedpPublicationsReader, SedpReaderError, SedpSubscriptionsReader};
43use crate::sedp::writer::{SedpPublicationsWriter, SedpSubscriptionsWriter};
44use crate::spdp::DiscoveredParticipant;
45
46#[derive(Debug, Default, Clone, PartialEq, Eq)]
49pub struct SedpEvents {
50 pub new_publications: Vec<PublicationBuiltinTopicData>,
52 pub new_subscriptions: Vec<SubscriptionBuiltinTopicData>,
54}
55
56impl SedpEvents {
57 #[must_use]
59 pub fn is_empty(&self) -> bool {
60 self.new_publications.is_empty() && self.new_subscriptions.is_empty()
61 }
62}
63
64#[derive(Debug)]
66pub struct SedpStack {
67 local_prefix: GuidPrefix,
68 pub_writer: SedpPublicationsWriter,
69 pub_reader: SedpPublicationsReader,
70 sub_writer: SedpSubscriptionsWriter,
71 sub_reader: SedpSubscriptionsReader,
72 cache: DiscoveredEndpointsCache,
73}
74
75impl SedpStack {
76 #[must_use]
81 pub fn new(local_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
82 let placeholder = GuidPrefix::UNKNOWN;
89 let mut pub_reader =
90 SedpPublicationsReader::new(local_prefix, vendor_id, placeholder, Vec::new());
91 let mut sub_reader =
92 SedpSubscriptionsReader::new(local_prefix, vendor_id, placeholder, Vec::new());
93 let pub_placeholder = Guid::new(placeholder, EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER);
97 let sub_placeholder = Guid::new(placeholder, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
98 pub_reader.remove_writer_proxy(pub_placeholder);
99 sub_reader.remove_writer_proxy(sub_placeholder);
100 Self {
101 local_prefix,
102 pub_writer: SedpPublicationsWriter::new(local_prefix, vendor_id),
103 pub_reader,
104 sub_writer: SedpSubscriptionsWriter::new(local_prefix, vendor_id),
105 sub_reader,
106 cache: DiscoveredEndpointsCache::default(),
107 }
108 }
109
110 #[must_use]
112 pub fn local_prefix(&self) -> GuidPrefix {
113 self.local_prefix
114 }
115
116 #[must_use]
118 pub fn cache(&self) -> &DiscoveredEndpointsCache {
119 &self.cache
120 }
121
122 pub fn cache_mut(&mut self) -> &mut DiscoveredEndpointsCache {
126 &mut self.cache
127 }
128
129 pub fn announce_publication(
135 &mut self,
136 p: &PublicationBuiltinTopicData,
137 ) -> Result<Vec<OutboundDatagram>, WireError> {
138 self.pub_writer.announce(p)
139 }
140
141 pub fn announce_publication_with_shm_locator(
146 &mut self,
147 p: &PublicationBuiltinTopicData,
148 locator_bytes: &[u8],
149 ) -> Result<Vec<OutboundDatagram>, WireError> {
150 self.pub_writer.announce_with_shm_locator(p, locator_bytes)
151 }
152
153 pub fn announce_subscription(
158 &mut self,
159 s: &SubscriptionBuiltinTopicData,
160 ) -> Result<Vec<OutboundDatagram>, WireError> {
161 self.sub_writer.announce(s)
162 }
163
164 pub fn on_participant_discovered(&mut self, p: &DiscoveredParticipant) {
168 let remote_prefix = p.sender_prefix;
169 if remote_prefix == self.local_prefix {
170 return;
172 }
173 let unicast_locators: Vec<_> = p
177 .data
178 .metatraffic_unicast_locator
179 .or(p.data.default_unicast_locator)
180 .into_iter()
181 .collect();
182 let flags = p.data.builtin_endpoint_set;
183
184 if flags & endpoint_flag::PUBLICATIONS_ANNOUNCER != 0 {
187 self.pub_reader.add_writer_proxy(WriterProxy::new(
188 Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER),
189 unicast_locators.clone(),
190 Vec::new(),
191 true,
192 ));
193 }
194
195 if flags & endpoint_flag::PUBLICATIONS_DETECTOR != 0 {
198 self.pub_writer.add_reader_proxy(ReaderProxy::new(
199 Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_PUBLICATIONS_READER),
200 unicast_locators.clone(),
201 Vec::new(),
202 true,
203 ));
204 }
205
206 if flags & endpoint_flag::SUBSCRIPTIONS_ANNOUNCER != 0 {
208 self.sub_reader.add_writer_proxy(WriterProxy::new(
209 Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER),
210 unicast_locators.clone(),
211 Vec::new(),
212 true,
213 ));
214 }
215
216 if flags & endpoint_flag::SUBSCRIPTIONS_DETECTOR != 0 {
218 self.sub_writer.add_reader_proxy(ReaderProxy::new(
219 Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER),
220 unicast_locators,
221 Vec::new(),
222 true,
223 ));
224 }
225 }
226
227 pub fn on_participant_lost(&mut self, prefix: GuidPrefix) -> (usize, usize) {
231 let mut removed = 0usize;
232 if self
234 .pub_writer
235 .remove_reader_proxy(Guid::new(
236 prefix,
237 EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
238 ))
239 .is_some()
240 {
241 removed += 1;
242 }
243 if self
244 .sub_writer
245 .remove_reader_proxy(Guid::new(
246 prefix,
247 EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER,
248 ))
249 .is_some()
250 {
251 removed += 1;
252 }
253 self.pub_reader.remove_writer_proxy(Guid::new(
255 prefix,
256 EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
257 ));
258 self.sub_reader.remove_writer_proxy(Guid::new(
259 prefix,
260 EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER,
261 ));
262 let (pubs, subs) = self.cache.on_participant_lost(prefix);
264 let _ = removed; (pubs, subs)
266 }
267
268 pub fn handle_datagram(
276 &mut self,
277 datagram: &[u8],
278 now: Duration,
279 ) -> Result<SedpEvents, SedpReaderError> {
280 let parsed = decode_datagram(datagram).map_err(SedpReaderError::from)?;
281 let mut events = SedpEvents::default();
282 for sub in parsed.submessages {
283 match sub {
284 ParsedSubmessage::Data(d) => {
285 if d.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER {
286 for p in self.pub_reader.handle_data(&d)? {
287 self.cache.insert_publication(p.clone(), now);
288 events.new_publications.push(p);
289 }
290 } else if d.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER {
291 for s in self.sub_reader.handle_data(&d)? {
292 self.cache.insert_subscription(s.clone(), now);
293 events.new_subscriptions.push(s);
294 }
295 }
296 }
297 ParsedSubmessage::DataFrag(df) => {
298 if df.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER {
299 for p in self.pub_reader.handle_data_frag(&df, now)? {
300 self.cache.insert_publication(p.clone(), now);
301 events.new_publications.push(p);
302 }
303 } else if df.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER {
304 for s in self.sub_reader.handle_data_frag(&df, now)? {
305 self.cache.insert_subscription(s.clone(), now);
306 events.new_subscriptions.push(s);
307 }
308 }
309 }
310 ParsedSubmessage::Heartbeat(h) => {
311 let to_pub = h.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER
315 || (h.reader_id == EntityId::UNKNOWN
316 && h.writer_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER);
317 let to_sub = h.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER
318 || (h.reader_id == EntityId::UNKNOWN
319 && h.writer_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
320 if to_pub {
321 self.pub_reader.handle_heartbeat(&h, now);
322 }
323 if to_sub {
324 self.sub_reader.handle_heartbeat(&h, now);
325 }
326 }
327 ParsedSubmessage::Gap(g) => {
328 if g.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER {
329 for p in self.pub_reader.handle_gap(&g)? {
330 self.cache.insert_publication(p.clone(), now);
331 events.new_publications.push(p);
332 }
333 } else if g.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER {
334 for s in self.sub_reader.handle_gap(&g)? {
335 self.cache.insert_subscription(s.clone(), now);
336 events.new_subscriptions.push(s);
337 }
338 }
339 }
340 ParsedSubmessage::AckNack(ack) => {
341 let base = ack.reader_sn_state.bitmap_base;
343 let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
344 let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
345 if ack.writer_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER {
346 self.pub_writer.handle_acknack(src, base, requested);
347 } else if ack.writer_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER {
348 self.sub_writer.handle_acknack(src, base, requested);
349 }
350 }
351 ParsedSubmessage::NackFrag(nf) => {
352 let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
353 if nf.writer_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER {
354 self.pub_writer.handle_nackfrag(src, &nf);
355 } else if nf.writer_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER {
356 self.sub_writer.handle_nackfrag(src, &nf);
357 }
358 }
359 ParsedSubmessage::HeartbeatFrag(_)
360 | ParsedSubmessage::HeaderExtension(_)
361 | ParsedSubmessage::InfoSource(_)
362 | ParsedSubmessage::InfoReply(_)
363 | ParsedSubmessage::InfoTimestamp(_)
364 | ParsedSubmessage::Unknown { .. } => {}
365 }
366 }
367 Ok(events)
368 }
369
370 pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
376 let mut out = Vec::new();
377 out.extend(self.pub_writer.tick(now)?);
378 out.extend(self.sub_writer.tick(now)?);
379 out.extend(self.pub_reader.tick_outbound(now)?);
384 out.extend(self.sub_reader.tick_outbound(now)?);
385 Ok(out)
386 }
387
388 #[must_use]
390 pub fn pub_writer(&self) -> &SedpPublicationsWriter {
391 &self.pub_writer
392 }
393
394 #[must_use]
396 pub fn pub_reader(&self) -> &SedpPublicationsReader {
397 &self.pub_reader
398 }
399
400 #[must_use]
402 pub fn sub_writer(&self) -> &SedpSubscriptionsWriter {
403 &self.sub_writer
404 }
405
406 #[must_use]
408 pub fn sub_reader(&self) -> &SedpSubscriptionsReader {
409 &self.sub_reader
410 }
411}
412
413#[cfg(test)]
414#[allow(clippy::expect_used, clippy::unwrap_used)]
415mod tests {
416 use super::*;
417 use zerodds_rtps::participant_data::{
418 Duration as DdsDuration, ParticipantBuiltinTopicData, endpoint_flag,
419 };
420 use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
421 use zerodds_rtps::wire_types::{Locator, ProtocolVersion};
422
423 fn remote_participant(prefix: GuidPrefix, endpoint_set: u32) -> DiscoveredParticipant {
424 DiscoveredParticipant {
425 sender_prefix: prefix,
426 sender_vendor: VendorId::ZERODDS,
427 data: ParticipantBuiltinTopicData {
428 guid: Guid::new(prefix, EntityId::PARTICIPANT),
429 protocol_version: ProtocolVersion::V2_5,
430 vendor_id: VendorId::ZERODDS,
431 default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7411)),
432 default_multicast_locator: None,
433 metatraffic_unicast_locator: None,
434 metatraffic_multicast_locator: None,
435 domain_id: None,
436 builtin_endpoint_set: endpoint_set,
437 lease_duration: DdsDuration::from_secs(30),
438 user_data: alloc::vec::Vec::new(),
439 properties: Default::default(),
440 identity_token: None,
441 permissions_token: None,
442 identity_status_token: None,
443 sig_algo_info: None,
444 kx_algo_info: None,
445 sym_cipher_algo_info: None,
446 },
447 }
448 }
449
450 fn sample_pub() -> PublicationBuiltinTopicData {
451 PublicationBuiltinTopicData {
452 key: Guid::new(
453 GuidPrefix::from_bytes([1; 12]),
454 EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
455 ),
456 participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
457 topic_name: "ChatterTopic".into(),
458 type_name: "std_msgs::String".into(),
459 durability: DurabilityKind::Volatile,
460 reliability: ReliabilityQos {
461 kind: ReliabilityKind::Reliable,
462 max_blocking_time: DdsDuration::from_secs(10),
463 },
464 ownership: zerodds_qos::OwnershipKind::Shared,
465 ownership_strength: 0,
466 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
467 deadline: zerodds_qos::DeadlineQosPolicy::default(),
468 lifespan: zerodds_qos::LifespanQosPolicy::default(),
469 partition: alloc::vec::Vec::new(),
470 user_data: alloc::vec::Vec::new(),
471 topic_data: alloc::vec::Vec::new(),
472 group_data: alloc::vec::Vec::new(),
473 type_information: None,
474 data_representation: alloc::vec::Vec::new(),
475 security_info: None,
476 service_instance_name: None,
477 related_entity_guid: None,
478 topic_aliases: None,
479 type_identifier: zerodds_types::TypeIdentifier::None,
480 }
481 }
482
483 #[test]
484 fn new_stack_has_no_proxies() {
485 let s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
486 assert_eq!(s.pub_writer().inner().reader_proxy_count(), 0);
487 assert_eq!(s.pub_reader().inner().writer_proxy_count(), 0);
488 assert_eq!(s.sub_writer().inner().reader_proxy_count(), 0);
489 assert_eq!(s.sub_reader().inner().writer_proxy_count(), 0);
490 }
491
492 #[test]
493 fn discovered_participant_wires_all_four_endpoints_when_present() {
494 let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
495 let remote_prefix = GuidPrefix::from_bytes([2; 12]);
496 let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER
497 | endpoint_flag::PUBLICATIONS_DETECTOR
498 | endpoint_flag::SUBSCRIPTIONS_ANNOUNCER
499 | endpoint_flag::SUBSCRIPTIONS_DETECTOR;
500 s.on_participant_discovered(&remote_participant(remote_prefix, flags));
501 assert_eq!(s.pub_writer().inner().reader_proxy_count(), 1);
502 assert_eq!(s.pub_reader().inner().writer_proxy_count(), 1);
503 assert_eq!(s.sub_writer().inner().reader_proxy_count(), 1);
504 assert_eq!(s.sub_reader().inner().writer_proxy_count(), 1);
505 }
506
507 #[test]
508 fn partial_endpoint_set_wires_only_matching_sides() {
509 let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
510 let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER | endpoint_flag::PUBLICATIONS_DETECTOR;
512 s.on_participant_discovered(&remote_participant(GuidPrefix::from_bytes([2; 12]), flags));
513 assert_eq!(s.pub_writer().inner().reader_proxy_count(), 1);
514 assert_eq!(s.pub_reader().inner().writer_proxy_count(), 1);
515 assert_eq!(s.sub_writer().inner().reader_proxy_count(), 0);
516 assert_eq!(s.sub_reader().inner().writer_proxy_count(), 0);
517 }
518
519 #[test]
520 fn self_discovery_is_ignored() {
521 let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
522 let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER;
523 s.on_participant_discovered(&remote_participant(GuidPrefix::from_bytes([1; 12]), flags));
524 assert_eq!(s.pub_reader().inner().writer_proxy_count(), 0);
525 }
526
527 #[test]
528 fn on_participant_lost_clears_proxies_and_cache() {
529 let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
530 let remote_prefix = GuidPrefix::from_bytes([2; 12]);
531 let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER
532 | endpoint_flag::PUBLICATIONS_DETECTOR
533 | endpoint_flag::SUBSCRIPTIONS_ANNOUNCER
534 | endpoint_flag::SUBSCRIPTIONS_DETECTOR;
535 s.on_participant_discovered(&remote_participant(remote_prefix, flags));
536 assert_eq!(s.pub_writer().inner().reader_proxy_count(), 1);
538
539 s.on_participant_lost(remote_prefix);
540
541 assert_eq!(s.pub_writer().inner().reader_proxy_count(), 0);
542 assert_eq!(s.pub_reader().inner().writer_proxy_count(), 0);
543 assert_eq!(s.sub_writer().inner().reader_proxy_count(), 0);
544 assert_eq!(s.sub_reader().inner().writer_proxy_count(), 0);
545 }
546
547 #[test]
548 fn end_to_end_discovery_between_two_stacks() {
549 let prefix_a = GuidPrefix::from_bytes([1; 12]);
553 let prefix_b = GuidPrefix::from_bytes([2; 12]);
554 let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER
555 | endpoint_flag::PUBLICATIONS_DETECTOR
556 | endpoint_flag::SUBSCRIPTIONS_ANNOUNCER
557 | endpoint_flag::SUBSCRIPTIONS_DETECTOR;
558 let mut a = SedpStack::new(prefix_a, VendorId::ZERODDS);
559 let mut b = SedpStack::new(prefix_b, VendorId::ZERODDS);
560 a.on_participant_discovered(&remote_participant(prefix_b, flags));
561 b.on_participant_discovered(&remote_participant(prefix_a, flags));
562
563 let now = Duration::from_secs(1);
564
565 let mut pub_a = sample_pub();
567 pub_a.key = Guid::new(prefix_a, EntityId::user_writer_with_key([1, 0, 0]));
568 pub_a.participant_key = Guid::new(prefix_a, EntityId::PARTICIPANT);
569 pub_a.topic_name = "TopicA".into();
570 for dg in a.announce_publication(&pub_a).unwrap() {
571 let events = b.handle_datagram(&dg.bytes, now).unwrap();
572 assert!(!events.is_empty());
573 assert_eq!(events.new_publications[0].topic_name, "TopicA");
574 }
575
576 let mut pub_b = sample_pub();
578 pub_b.key = Guid::new(prefix_b, EntityId::user_writer_with_key([2, 0, 0]));
579 pub_b.participant_key = Guid::new(prefix_b, EntityId::PARTICIPANT);
580 pub_b.topic_name = "TopicB".into();
581 for dg in b.announce_publication(&pub_b).unwrap() {
582 let events = a.handle_datagram(&dg.bytes, now).unwrap();
583 assert!(!events.is_empty());
584 assert_eq!(events.new_publications[0].topic_name, "TopicB");
585 }
586
587 assert_eq!(a.cache().publications_len(), 1);
588 assert_eq!(b.cache().publications_len(), 1);
589 assert_eq!(
590 a.cache().publications().next().unwrap().data.topic_name,
591 "TopicB"
592 );
593 assert_eq!(
594 b.cache().publications().next().unwrap().data.topic_name,
595 "TopicA"
596 );
597 }
598}