Skip to main content

zerodds_discovery/sedp/
stack.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `SedpStack` — zusammenhaengender SEDP-Endpoint-Komplex.
4//!
5//! Haelt die vier SEDP-Builtin-Endpoints fuer einen Participant
6//! (Publications- + Subscriptions- jeweils Writer + Reader) plus einen
7//! gemeinsamen [`DiscoveredEndpointsCache`], und verdrahtet sich
8//! automatisch mit neu-entdeckten Remote-Participants aus SPDP.
9//!
10//! # Lifecycle
11//!
12//! ```text
13//!   let mut stack = SedpStack::new(local_prefix, VendorId::ZERODDS);
14//!   // Lokale Endpoints ankuendigen
15//!   stack.announce_publication(&my_pub)?;
16//!   // Neuer Remote-Participant per SPDP entdeckt
17//!   stack.on_participant_discovered(&remote);
18//!   // ... Traffic fliesst zwischen den Proxies ...
19//!   // SPDP-Lease-Timeout
20//!   stack.on_participant_lost(remote_prefix);
21//! ```
22//!
23//! Der Stack exponiert keinen Transport — er liefert Datagramme als
24//! [`OutboundDatagram`] zurueck, Transport-Layer ist in
25//! `crates/transport`.
26
27extern crate alloc;
28use alloc::vec::Vec;
29use core::time::Duration;
30
31use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};
32use zerodds_rtps::error::WireError;
33use zerodds_rtps::message_builder::OutboundDatagram;
34use zerodds_rtps::participant_data::endpoint_flag;
35use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
36use zerodds_rtps::reader_proxy::ReaderProxy;
37use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
38use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, VendorId};
39use zerodds_rtps::writer_proxy::WriterProxy;
40
41use crate::sedp::cache::DiscoveredEndpointsCache;
42use crate::sedp::reader::{SedpPublicationsReader, SedpReaderError, SedpSubscriptionsReader};
43use crate::sedp::writer::{SedpPublicationsWriter, SedpSubscriptionsWriter};
44use crate::spdp::DiscoveredParticipant;
45
/// Events collected while dispatching a single datagram. Intended for
/// callers that want to react explicitly to new discoveries (GUI,
/// logging).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SedpEvents {
    /// Newly received publications (already stored in the cache).
    pub new_publications: Vec<PublicationBuiltinTopicData>,
    /// Newly received subscriptions (already stored in the cache).
    pub new_subscriptions: Vec<SubscriptionBuiltinTopicData>,
}
55
56impl SedpEvents {
57    /// True wenn beides leer ist.
58    #[must_use]
59    pub fn is_empty(&self) -> bool {
60        self.new_publications.is_empty() && self.new_subscriptions.is_empty()
61    }
62}
63
/// SEDP stack for a single participant: the four builtin SEDP
/// endpoints plus the cache of discovered remote endpoints.
#[derive(Debug)]
pub struct SedpStack {
    /// GUID prefix of the local participant; used to ignore
    /// self-discovery.
    local_prefix: GuidPrefix,
    /// Builtin writer that announces local publications.
    pub_writer: SedpPublicationsWriter,
    /// Builtin reader that receives remote publications.
    pub_reader: SedpPublicationsReader,
    /// Builtin writer that announces local subscriptions.
    sub_writer: SedpSubscriptionsWriter,
    /// Builtin reader that receives remote subscriptions.
    sub_reader: SedpSubscriptionsReader,
    /// Remote publications/subscriptions received so far.
    cache: DiscoveredEndpointsCache,
}
74
75impl SedpStack {
76    /// Erzeugt einen neuen Stack fuer den lokalen Participant. Alle
77    /// Endpoints sind initial ohne Remote-Proxies — die werden ueber
78    /// [`on_participant_discovered`](Self::on_participant_discovered)
79    /// verdrahtet.
80    #[must_use]
81    pub fn new(local_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
82        // Die Reader brauchen bei `new()` einen Writer-Proxy — wir
83        // setzen einen Platzhalter mit UNKNOWN-prefix, der unmittelbar
84        // wieder entfernt wird (siehe `on_participant_discovered`).
85        // Eine Reader-API ohne initialen Proxy wäre kosmetisch sauberer;
86        // funktional ist die Sequenz "Platzhalter rein → sofort raus"
87        // äquivalent.
88        let placeholder = GuidPrefix::UNKNOWN;
89        let mut pub_reader =
90            SedpPublicationsReader::new(local_prefix, vendor_id, placeholder, Vec::new());
91        let mut sub_reader =
92            SedpSubscriptionsReader::new(local_prefix, vendor_id, placeholder, Vec::new());
93        // Platzhalter-Proxies sofort entfernen — sie haben keine
94        // Locators, wuerden aber jeden handle_data-Call mit passender
95        // EntityId abfangen. Wir entfernen sie, um clean zu starten.
96        let pub_placeholder = Guid::new(placeholder, EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER);
97        let sub_placeholder = Guid::new(placeholder, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
98        pub_reader.remove_writer_proxy(pub_placeholder);
99        sub_reader.remove_writer_proxy(sub_placeholder);
100        Self {
101            local_prefix,
102            pub_writer: SedpPublicationsWriter::new(local_prefix, vendor_id),
103            pub_reader,
104            sub_writer: SedpSubscriptionsWriter::new(local_prefix, vendor_id),
105            sub_reader,
106            cache: DiscoveredEndpointsCache::default(),
107        }
108    }
109
110    /// Lokaler GuidPrefix dieses Stacks.
111    #[must_use]
112    pub fn local_prefix(&self) -> GuidPrefix {
113        self.local_prefix
114    }
115
116    /// Read-only-Zugriff auf den DiscoveredEndpointsCache.
117    #[must_use]
118    pub fn cache(&self) -> &DiscoveredEndpointsCache {
119        &self.cache
120    }
121
122    /// Mutable-Zugriff auf den Cache. Wird primaer fuer Tests
123    /// (Discovery-Match injizieren ohne Live-UDP) und WP 2.5
124    /// `find_topic`-Verifikation verwendet.
125    pub fn cache_mut(&mut self) -> &mut DiscoveredEndpointsCache {
126        &mut self.cache
127    }
128
129    /// Kuendigt eine lokale Publication an alle bereits entdeckten
130    /// Remote-SEDP-Reader an.
131    ///
132    /// # Errors
133    /// Encoder-Fehler.
134    pub fn announce_publication(
135        &mut self,
136        p: &PublicationBuiltinTopicData,
137    ) -> Result<Vec<OutboundDatagram>, WireError> {
138        self.pub_writer.announce(p)
139    }
140
141    /// ADR-0006: Publication mit PID_SHM_LOCATOR (Vendor-PID 0x8001).
142    ///
143    /// # Errors
144    /// Encoder-Fehler oder Inject-Fehler.
145    pub fn announce_publication_with_shm_locator(
146        &mut self,
147        p: &PublicationBuiltinTopicData,
148        locator_bytes: &[u8],
149    ) -> Result<Vec<OutboundDatagram>, WireError> {
150        self.pub_writer.announce_with_shm_locator(p, locator_bytes)
151    }
152
153    /// Kuendigt eine lokale Subscription an.
154    ///
155    /// # Errors
156    /// Encoder-Fehler.
157    pub fn announce_subscription(
158        &mut self,
159        s: &SubscriptionBuiltinTopicData,
160    ) -> Result<Vec<OutboundDatagram>, WireError> {
161        self.sub_writer.announce(s)
162    }
163
164    /// Verdrahtet die SEDP-Endpoints fuer einen neu entdeckten
165    /// Participant. Liest `p.data.builtin_endpoint_set` + Unicast-
166    /// Locators und registriert die passenden Proxies.
167    pub fn on_participant_discovered(&mut self, p: &DiscoveredParticipant) {
168        let remote_prefix = p.sender_prefix;
169        if remote_prefix == self.local_prefix {
170            // Selbstentdeckung: kein Self-Match (Spec-gemaess).
171            return;
172        }
173        // SEDP routet an metatraffic_unicast_locator (PID 0x0032) —
174        // erst wenn der fehlt, fallback auf default_unicast_locator.
175        // Cyclone/FastDDS annoncieren beide, aber strikt getrennte Rollen.
176        let unicast_locators: Vec<_> = p
177            .data
178            .metatraffic_unicast_locator
179            .or(p.data.default_unicast_locator)
180            .into_iter()
181            .collect();
182        let flags = p.data.builtin_endpoint_set;
183
184        // Remote hat Publications-Announcer (Writer)?
185        //  → unser SedpPublicationsReader bekommt einen WriterProxy
186        if flags & endpoint_flag::PUBLICATIONS_ANNOUNCER != 0 {
187            self.pub_reader.add_writer_proxy(WriterProxy::new(
188                Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER),
189                unicast_locators.clone(),
190                Vec::new(),
191                true,
192            ));
193        }
194
195        // Remote hat Publications-Detector (Reader)?
196        //  → unser SedpPublicationsWriter bekommt einen ReaderProxy
197        if flags & endpoint_flag::PUBLICATIONS_DETECTOR != 0 {
198            self.pub_writer.add_reader_proxy(ReaderProxy::new(
199                Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_PUBLICATIONS_READER),
200                unicast_locators.clone(),
201                Vec::new(),
202                true,
203            ));
204        }
205
206        // Subscriptions-Announcer → unser Subscriptions-Reader
207        if flags & endpoint_flag::SUBSCRIPTIONS_ANNOUNCER != 0 {
208            self.sub_reader.add_writer_proxy(WriterProxy::new(
209                Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER),
210                unicast_locators.clone(),
211                Vec::new(),
212                true,
213            ));
214        }
215
216        // Subscriptions-Detector → unser Subscriptions-Writer
217        if flags & endpoint_flag::SUBSCRIPTIONS_DETECTOR != 0 {
218            self.sub_writer.add_reader_proxy(ReaderProxy::new(
219                Guid::new(remote_prefix, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER),
220                unicast_locators,
221                Vec::new(),
222                true,
223            ));
224        }
225    }
226
227    /// Ein Participant ist verloren (SPDP-Lease-Timeout). Cascade-
228    /// Cleanup: alle Proxies mit diesem Prefix raus, Cache-Eintraege
229    /// loeschen. Liefert (removed_pub_proxies, removed_sub_proxies).
230    pub fn on_participant_lost(&mut self, prefix: GuidPrefix) -> (usize, usize) {
231        let mut removed = 0usize;
232        // Writer-seitig: remote Reader entfernen
233        if self
234            .pub_writer
235            .remove_reader_proxy(Guid::new(
236                prefix,
237                EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
238            ))
239            .is_some()
240        {
241            removed += 1;
242        }
243        if self
244            .sub_writer
245            .remove_reader_proxy(Guid::new(
246                prefix,
247                EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER,
248            ))
249            .is_some()
250        {
251            removed += 1;
252        }
253        // Reader-seitig: remote Writer entfernen
254        self.pub_reader.remove_writer_proxy(Guid::new(
255            prefix,
256            EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
257        ));
258        self.sub_reader.remove_writer_proxy(Guid::new(
259            prefix,
260            EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER,
261        ));
262        // Cache-Cleanup
263        let (pubs, subs) = self.cache.on_participant_lost(prefix);
264        let _ = removed; // Reader-removal-Count derzeit nicht ausgewertet
265        (pubs, subs)
266    }
267
268    /// Verarbeitet ein eingehendes RTPS-Datagramm an den SEDP-Stack.
269    /// Routed die Submessages nach Ziel-EntityId an den passenden
270    /// Endpoint und aktualisiert den Cache. Liefert die deltaartig
271    /// neu-entdeckten Samples zurueck.
272    ///
273    /// # Errors
274    /// Wire-Decode-Fehler oder fehlerhafte PL_CDR-Payloads.
275    pub fn handle_datagram(
276        &mut self,
277        datagram: &[u8],
278        now: Duration,
279    ) -> Result<SedpEvents, SedpReaderError> {
280        let parsed = decode_datagram(datagram).map_err(SedpReaderError::from)?;
281        let mut events = SedpEvents::default();
282        for sub in parsed.submessages {
283            match sub {
284                ParsedSubmessage::Data(d) => {
285                    if d.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER {
286                        for p in self.pub_reader.handle_data(&d)? {
287                            self.cache.insert_publication(p.clone(), now);
288                            events.new_publications.push(p);
289                        }
290                    } else if d.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER {
291                        for s in self.sub_reader.handle_data(&d)? {
292                            self.cache.insert_subscription(s.clone(), now);
293                            events.new_subscriptions.push(s);
294                        }
295                    }
296                }
297                ParsedSubmessage::DataFrag(df) => {
298                    if df.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER {
299                        for p in self.pub_reader.handle_data_frag(&df, now)? {
300                            self.cache.insert_publication(p.clone(), now);
301                            events.new_publications.push(p);
302                        }
303                    } else if df.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER {
304                        for s in self.sub_reader.handle_data_frag(&df, now)? {
305                            self.cache.insert_subscription(s.clone(), now);
306                            events.new_subscriptions.push(s);
307                        }
308                    }
309                }
310                ParsedSubmessage::Heartbeat(h) => {
311                    // Cyclone schickt HEARTBEATs oft mit reader_id=UNKNOWN
312                    // und nutzt INFO_DST zur Adressierung — Dispatch dann
313                    // nach writer_id an den passenden SEDP-Reader.
314                    let to_pub = h.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER
315                        || (h.reader_id == EntityId::UNKNOWN
316                            && h.writer_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER);
317                    let to_sub = h.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER
318                        || (h.reader_id == EntityId::UNKNOWN
319                            && h.writer_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
320                    if to_pub {
321                        self.pub_reader.handle_heartbeat(&h, now);
322                    }
323                    if to_sub {
324                        self.sub_reader.handle_heartbeat(&h, now);
325                    }
326                }
327                ParsedSubmessage::Gap(g) => {
328                    if g.reader_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_READER {
329                        for p in self.pub_reader.handle_gap(&g)? {
330                            self.cache.insert_publication(p.clone(), now);
331                            events.new_publications.push(p);
332                        }
333                    } else if g.reader_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER {
334                        for s in self.sub_reader.handle_gap(&g)? {
335                            self.cache.insert_subscription(s.clone(), now);
336                            events.new_subscriptions.push(s);
337                        }
338                    }
339                }
340                ParsedSubmessage::AckNack(ack) => {
341                    // Dispatch auf Writer anhand writer_id
342                    let base = ack.reader_sn_state.bitmap_base;
343                    let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
344                    let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
345                    if ack.writer_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER {
346                        self.pub_writer.handle_acknack(src, base, requested);
347                    } else if ack.writer_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER {
348                        self.sub_writer.handle_acknack(src, base, requested);
349                    }
350                }
351                ParsedSubmessage::NackFrag(nf) => {
352                    let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
353                    if nf.writer_id == EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER {
354                        self.pub_writer.handle_nackfrag(src, &nf);
355                    } else if nf.writer_id == EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER {
356                        self.sub_writer.handle_nackfrag(src, &nf);
357                    }
358                }
359                ParsedSubmessage::HeartbeatFrag(_)
360                | ParsedSubmessage::HeaderExtension(_)
361                | ParsedSubmessage::InfoSource(_)
362                | ParsedSubmessage::InfoReply(_)
363                | ParsedSubmessage::InfoTimestamp(_)
364                | ParsedSubmessage::Unknown { .. } => {}
365            }
366        }
367        Ok(events)
368    }
369
370    /// Tick ueber alle Endpoints. Liefert HEARTBEATs + Resends von den
371    /// Writern plus ACKNACK/NACK_FRAG von den Readern.
372    ///
373    /// # Errors
374    /// Wire-Encode-Fehler.
375    pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
376        let mut out = Vec::new();
377        out.extend(self.pub_writer.tick(now)?);
378        out.extend(self.sub_writer.tick(now)?);
379        // Reader-Ticks liefern ACKNACK/NACK_FRAG-Datagramme mit
380        // WriterProxy-Unicast-Locators als Target (RTPS §8.4.15).
381        // Erforderlich fuer Live-Interop: Cyclone wartet auf initiales
382        // ACKNACK, bevor der Reliable-Writer DATA schickt.
383        out.extend(self.pub_reader.tick_outbound(now)?);
384        out.extend(self.sub_reader.tick_outbound(now)?);
385        Ok(out)
386    }
387
388    /// Read-only-Zugriff auf die internen Endpoints (Diagnose/Tests).
389    #[must_use]
390    pub fn pub_writer(&self) -> &SedpPublicationsWriter {
391        &self.pub_writer
392    }
393
394    /// dito.
395    #[must_use]
396    pub fn pub_reader(&self) -> &SedpPublicationsReader {
397        &self.pub_reader
398    }
399
400    /// dito.
401    #[must_use]
402    pub fn sub_writer(&self) -> &SedpSubscriptionsWriter {
403        &self.sub_writer
404    }
405
406    /// dito.
407    #[must_use]
408    pub fn sub_reader(&self) -> &SedpSubscriptionsReader {
409        &self.sub_reader
410    }
411}
412
413#[cfg(test)]
414#[allow(clippy::expect_used, clippy::unwrap_used)]
415mod tests {
416    use super::*;
417    use zerodds_rtps::participant_data::{
418        Duration as DdsDuration, ParticipantBuiltinTopicData, endpoint_flag,
419    };
420    use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
421    use zerodds_rtps::wire_types::{Locator, ProtocolVersion};
422
423    fn remote_participant(prefix: GuidPrefix, endpoint_set: u32) -> DiscoveredParticipant {
424        DiscoveredParticipant {
425            sender_prefix: prefix,
426            sender_vendor: VendorId::ZERODDS,
427            data: ParticipantBuiltinTopicData {
428                guid: Guid::new(prefix, EntityId::PARTICIPANT),
429                protocol_version: ProtocolVersion::V2_5,
430                vendor_id: VendorId::ZERODDS,
431                default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7411)),
432                default_multicast_locator: None,
433                metatraffic_unicast_locator: None,
434                metatraffic_multicast_locator: None,
435                domain_id: None,
436                builtin_endpoint_set: endpoint_set,
437                lease_duration: DdsDuration::from_secs(30),
438                user_data: alloc::vec::Vec::new(),
439                properties: Default::default(),
440                identity_token: None,
441                permissions_token: None,
442                identity_status_token: None,
443                sig_algo_info: None,
444                kx_algo_info: None,
445                sym_cipher_algo_info: None,
446            },
447        }
448    }
449
450    fn sample_pub() -> PublicationBuiltinTopicData {
451        PublicationBuiltinTopicData {
452            key: Guid::new(
453                GuidPrefix::from_bytes([1; 12]),
454                EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
455            ),
456            participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
457            topic_name: "ChatterTopic".into(),
458            type_name: "std_msgs::String".into(),
459            durability: DurabilityKind::Volatile,
460            reliability: ReliabilityQos {
461                kind: ReliabilityKind::Reliable,
462                max_blocking_time: DdsDuration::from_secs(10),
463            },
464            ownership: zerodds_qos::OwnershipKind::Shared,
465            ownership_strength: 0,
466            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
467            deadline: zerodds_qos::DeadlineQosPolicy::default(),
468            lifespan: zerodds_qos::LifespanQosPolicy::default(),
469            partition: alloc::vec::Vec::new(),
470            user_data: alloc::vec::Vec::new(),
471            topic_data: alloc::vec::Vec::new(),
472            group_data: alloc::vec::Vec::new(),
473            type_information: None,
474            data_representation: alloc::vec::Vec::new(),
475            security_info: None,
476            service_instance_name: None,
477            related_entity_guid: None,
478            topic_aliases: None,
479            type_identifier: zerodds_types::TypeIdentifier::None,
480        }
481    }
482
483    #[test]
484    fn new_stack_has_no_proxies() {
485        let s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
486        assert_eq!(s.pub_writer().inner().reader_proxy_count(), 0);
487        assert_eq!(s.pub_reader().inner().writer_proxy_count(), 0);
488        assert_eq!(s.sub_writer().inner().reader_proxy_count(), 0);
489        assert_eq!(s.sub_reader().inner().writer_proxy_count(), 0);
490    }
491
492    #[test]
493    fn discovered_participant_wires_all_four_endpoints_when_present() {
494        let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
495        let remote_prefix = GuidPrefix::from_bytes([2; 12]);
496        let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER
497            | endpoint_flag::PUBLICATIONS_DETECTOR
498            | endpoint_flag::SUBSCRIPTIONS_ANNOUNCER
499            | endpoint_flag::SUBSCRIPTIONS_DETECTOR;
500        s.on_participant_discovered(&remote_participant(remote_prefix, flags));
501        assert_eq!(s.pub_writer().inner().reader_proxy_count(), 1);
502        assert_eq!(s.pub_reader().inner().writer_proxy_count(), 1);
503        assert_eq!(s.sub_writer().inner().reader_proxy_count(), 1);
504        assert_eq!(s.sub_reader().inner().writer_proxy_count(), 1);
505    }
506
507    #[test]
508    fn partial_endpoint_set_wires_only_matching_sides() {
509        let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
510        // Remote hat nur Publications-Seite (Announcer + Detector)
511        let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER | endpoint_flag::PUBLICATIONS_DETECTOR;
512        s.on_participant_discovered(&remote_participant(GuidPrefix::from_bytes([2; 12]), flags));
513        assert_eq!(s.pub_writer().inner().reader_proxy_count(), 1);
514        assert_eq!(s.pub_reader().inner().writer_proxy_count(), 1);
515        assert_eq!(s.sub_writer().inner().reader_proxy_count(), 0);
516        assert_eq!(s.sub_reader().inner().writer_proxy_count(), 0);
517    }
518
519    #[test]
520    fn self_discovery_is_ignored() {
521        let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
522        let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER;
523        s.on_participant_discovered(&remote_participant(GuidPrefix::from_bytes([1; 12]), flags));
524        assert_eq!(s.pub_reader().inner().writer_proxy_count(), 0);
525    }
526
527    #[test]
528    fn on_participant_lost_clears_proxies_and_cache() {
529        let mut s = SedpStack::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
530        let remote_prefix = GuidPrefix::from_bytes([2; 12]);
531        let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER
532            | endpoint_flag::PUBLICATIONS_DETECTOR
533            | endpoint_flag::SUBSCRIPTIONS_ANNOUNCER
534            | endpoint_flag::SUBSCRIPTIONS_DETECTOR;
535        s.on_participant_discovered(&remote_participant(remote_prefix, flags));
536        // Vollen Zustand bestaetigen
537        assert_eq!(s.pub_writer().inner().reader_proxy_count(), 1);
538
539        s.on_participant_lost(remote_prefix);
540
541        assert_eq!(s.pub_writer().inner().reader_proxy_count(), 0);
542        assert_eq!(s.pub_reader().inner().writer_proxy_count(), 0);
543        assert_eq!(s.sub_writer().inner().reader_proxy_count(), 0);
544        assert_eq!(s.sub_reader().inner().writer_proxy_count(), 0);
545    }
546
547    #[test]
548    fn end_to_end_discovery_between_two_stacks() {
549        // Zwei Stacks auf zwei verschiedenen Prefixes, beide announcen
550        // Publications. Nach on_participant_discovered + handle_datagram
551        // sehen beide die jeweils fremden Publications im Cache.
552        let prefix_a = GuidPrefix::from_bytes([1; 12]);
553        let prefix_b = GuidPrefix::from_bytes([2; 12]);
554        let flags = endpoint_flag::PUBLICATIONS_ANNOUNCER
555            | endpoint_flag::PUBLICATIONS_DETECTOR
556            | endpoint_flag::SUBSCRIPTIONS_ANNOUNCER
557            | endpoint_flag::SUBSCRIPTIONS_DETECTOR;
558        let mut a = SedpStack::new(prefix_a, VendorId::ZERODDS);
559        let mut b = SedpStack::new(prefix_b, VendorId::ZERODDS);
560        a.on_participant_discovered(&remote_participant(prefix_b, flags));
561        b.on_participant_discovered(&remote_participant(prefix_a, flags));
562
563        let now = Duration::from_secs(1);
564
565        // A announces
566        let mut pub_a = sample_pub();
567        pub_a.key = Guid::new(prefix_a, EntityId::user_writer_with_key([1, 0, 0]));
568        pub_a.participant_key = Guid::new(prefix_a, EntityId::PARTICIPANT);
569        pub_a.topic_name = "TopicA".into();
570        for dg in a.announce_publication(&pub_a).unwrap() {
571            let events = b.handle_datagram(&dg.bytes, now).unwrap();
572            assert!(!events.is_empty());
573            assert_eq!(events.new_publications[0].topic_name, "TopicA");
574        }
575
576        // B announces
577        let mut pub_b = sample_pub();
578        pub_b.key = Guid::new(prefix_b, EntityId::user_writer_with_key([2, 0, 0]));
579        pub_b.participant_key = Guid::new(prefix_b, EntityId::PARTICIPANT);
580        pub_b.topic_name = "TopicB".into();
581        for dg in b.announce_publication(&pub_b).unwrap() {
582            let events = a.handle_datagram(&dg.bytes, now).unwrap();
583            assert!(!events.is_empty());
584            assert_eq!(events.new_publications[0].topic_name, "TopicB");
585        }
586
587        assert_eq!(a.cache().publications_len(), 1);
588        assert_eq!(b.cache().publications_len(), 1);
589        assert_eq!(
590            a.cache().publications().next().unwrap().data.topic_name,
591            "TopicB"
592        );
593        assert_eq!(
594            b.cache().publications().next().unwrap().data.topic_name,
595            "TopicA"
596        );
597    }
598}