Skip to main content

zerodds_discovery/sedp/
cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `DiscoveredEndpointsCache` — in-memory cache entdeckter SEDP-
4//! Publications und Subscriptions.
5//!
6//! Pro in-flight Participant halten wir separat Pub/Sub-Maps mit
7//! LRU-Eviction bei Ueberlauf eines konfigurierbaren Caps (analog
8//! Fast-DDS `ReaderProxy.resource_limits.max_samples`, Recherche aus
9//! WP 1.2 Review). Eviction **pro Remote-Participant**, nicht global,
10//! damit ein chatty Remote nicht die Discoveries aus anderen
11//! Participants verdraengt.
12//!
13//! `on_participant_lost(prefix)` wirft alle Endpoints eines
14//! Remote-Participants auf einmal raus — notwendig, wenn SPDP einen
15//! Lease-Timeout erkennt.
16
17extern crate alloc;
18use alloc::collections::{BTreeMap, BTreeSet};
19use alloc::vec::Vec;
20use core::time::Duration;
21
22use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
23use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
24use zerodds_rtps::wire_types::{Guid, GuidPrefix};
25
26/// Default-Cap fuer Publications pro Remote-Participant.
27pub const DEFAULT_MAX_PUBLICATIONS_PER_PARTICIPANT: usize = 256;
28/// Default-Cap fuer Subscriptions pro Remote-Participant.
29pub const DEFAULT_MAX_SUBSCRIPTIONS_PER_PARTICIPANT: usize = 256;
30
31/// Konfiguration (DoS-Caps).
32#[derive(Debug, Clone, Copy)]
33pub struct CacheCaps {
34    /// Max. Publications pro Remote-Participant. Ueberlauf → aelteste
35    /// Publication dieses Participants verworfen (LRU).
36    pub max_publications_per_participant: usize,
37    /// Max. Subscriptions pro Remote-Participant. Gleiche Semantik.
38    pub max_subscriptions_per_participant: usize,
39}
40
41impl Default for CacheCaps {
42    fn default() -> Self {
43        Self {
44            max_publications_per_participant: DEFAULT_MAX_PUBLICATIONS_PER_PARTICIPANT,
45            max_subscriptions_per_participant: DEFAULT_MAX_SUBSCRIPTIONS_PER_PARTICIPANT,
46        }
47    }
48}
49
50/// Eine entdeckte Publication mit Zeitstempel.
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct DiscoveredPublication {
53    /// Wire-Daten aus der SEDP-DATA-Submessage.
54    pub data: PublicationBuiltinTopicData,
55    /// Zeitpunkt der (letzten) Discovery — Uptime-basiert, fuer LRU.
56    pub discovered_at: Duration,
57}
58
59/// Eine entdeckte Subscription mit Zeitstempel.
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct DiscoveredSubscription {
62    /// Wire-Daten aus der SEDP-DATA-Submessage.
63    pub data: SubscriptionBuiltinTopicData,
64    /// Zeitpunkt der (letzten) Discovery.
65    pub discovered_at: Duration,
66}
67
68/// Sekundär-Index pro Remote-Participant, sortiert nach
69/// `discovered_at`. Erlaubt O(log n) Count (`set.len()`) und O(log n)
70/// LRU-Eviction (`set.iter().next()` = aeltester Eintrag). Der Index
71/// muss bei jedem Update auf `publications`/`subscriptions`
72/// synchronisiert werden.
73type PerPrefixIndex = BTreeMap<GuidPrefix, BTreeSet<(Duration, Guid)>>;
74
75/// In-memory Cache fuer SEDP-entdeckte Endpoints.
76#[derive(Debug, Clone, Default)]
77pub struct DiscoveredEndpointsCache {
78    publications: BTreeMap<Guid, DiscoveredPublication>,
79    subscriptions: BTreeMap<Guid, DiscoveredSubscription>,
80    // F3-Fix: Sekundaer-Indizes fuer O(log n) Count + LRU-Eviction.
81    // Gueltigkeit: fuer jeden Schluessel k in `publications` existiert
82    // genau ein Eintrag `(k-discovered_at, k)` in `pub_index[k.prefix]`.
83    // Selbiges fuer subscriptions/sub_index. Invariante wird in jedem
84    // Insert/Update/Remove-Pfad erhalten.
85    pub_index: PerPrefixIndex,
86    sub_index: PerPrefixIndex,
87    caps: CacheCaps,
88    evicted: u64,
89}
90
91impl DiscoveredEndpointsCache {
92    /// Neu mit den gegebenen Caps.
93    #[must_use]
94    pub fn new(caps: CacheCaps) -> Self {
95        Self {
96            publications: BTreeMap::new(),
97            subscriptions: BTreeMap::new(),
98            pub_index: BTreeMap::new(),
99            sub_index: BTreeMap::new(),
100            caps,
101            evicted: 0,
102        }
103    }
104
105    /// Anzahl Publications insgesamt.
106    #[must_use]
107    pub fn publications_len(&self) -> usize {
108        self.publications.len()
109    }
110
111    /// Anzahl Subscriptions insgesamt.
112    #[must_use]
113    pub fn subscriptions_len(&self) -> usize {
114        self.subscriptions.len()
115    }
116
117    /// Anzahl LRU-Evictions seit Start (DoS-Diagnose).
118    #[must_use]
119    pub fn evicted_count(&self) -> u64 {
120        self.evicted
121    }
122
123    /// Lookup einer Publication per Endpoint-GUID.
124    #[must_use]
125    pub fn publication(&self, key: Guid) -> Option<&DiscoveredPublication> {
126        self.publications.get(&key)
127    }
128
129    /// Lookup einer Subscription per Endpoint-GUID.
130    #[must_use]
131    pub fn subscription(&self, key: Guid) -> Option<&DiscoveredSubscription> {
132        self.subscriptions.get(&key)
133    }
134
135    /// Iteriert alle Publications eines Remote-Participants.
136    pub fn publications_for(
137        &self,
138        prefix: GuidPrefix,
139    ) -> impl Iterator<Item = &DiscoveredPublication> + '_ {
140        self.publications
141            .iter()
142            .filter(move |(guid, _)| guid.prefix == prefix)
143            .map(|(_, p)| p)
144    }
145
146    /// Iteriert alle Subscriptions eines Remote-Participants.
147    pub fn subscriptions_for(
148        &self,
149        prefix: GuidPrefix,
150    ) -> impl Iterator<Item = &DiscoveredSubscription> + '_ {
151        self.subscriptions
152            .iter()
153            .filter(move |(guid, _)| guid.prefix == prefix)
154            .map(|(_, p)| p)
155    }
156
157    /// Iteriert alle Publications (fuer Matching-Scan).
158    pub fn publications(&self) -> impl Iterator<Item = &DiscoveredPublication> + '_ {
159        self.publications.values()
160    }
161
162    /// Iteriert alle Subscriptions.
163    pub fn subscriptions(&self) -> impl Iterator<Item = &DiscoveredSubscription> + '_ {
164        self.subscriptions.values()
165    }
166
167    /// Insert oder Update einer Publication. Idempotent:
168    /// bei bereits vorhandener GUID wird `data` ueberschrieben und
169    /// `discovered_at` aktualisiert — **keine** Eviction-Runde.
170    ///
171    /// Bei neuem Insert: wenn die Publications-Zahl dieses Participants
172    /// den Cap erreicht, wird die aelteste Publication desselben
173    /// Participants verworfen (LRU), `evicted_count` inkrementiert.
174    ///
175    /// Rueckgabe: `true` = neu eingefuegt, `false` = Update.
176    pub fn insert_publication(&mut self, data: PublicationBuiltinTopicData, now: Duration) -> bool {
177        let key = data.key;
178        let prefix = key.prefix;
179        if let Some(existing) = self.publications.get_mut(&key) {
180            // Update-Pfad: Index auf neuen `discovered_at` nachziehen.
181            let old_entry = (existing.discovered_at, key);
182            if let Some(set) = self.pub_index.get_mut(&prefix) {
183                set.remove(&old_entry);
184                set.insert((now, key));
185            }
186            existing.data = data;
187            existing.discovered_at = now;
188            return false;
189        }
190        // Cap-Check + LRU-Eviction via Sekundaer-Index — O(log n) statt
191        // frueheres O(N) (F3-Fix).
192        let set = self.pub_index.entry(prefix).or_default();
193        if set.len() >= self.caps.max_publications_per_participant {
194            if let Some(&oldest) = set.iter().next() {
195                set.remove(&oldest);
196                self.publications.remove(&oldest.1);
197                self.evicted = self.evicted.saturating_add(1);
198            }
199        }
200        set.insert((now, key));
201        self.publications.insert(
202            key,
203            DiscoveredPublication {
204                data,
205                discovered_at: now,
206            },
207        );
208        true
209    }
210
211    /// Insert oder Update einer Subscription. Semantik analog
212    /// [`Self::insert_publication`].
213    pub fn insert_subscription(
214        &mut self,
215        data: SubscriptionBuiltinTopicData,
216        now: Duration,
217    ) -> bool {
218        let key = data.key;
219        let prefix = key.prefix;
220        if let Some(existing) = self.subscriptions.get_mut(&key) {
221            let old_entry = (existing.discovered_at, key);
222            if let Some(set) = self.sub_index.get_mut(&prefix) {
223                set.remove(&old_entry);
224                set.insert((now, key));
225            }
226            existing.data = data;
227            existing.discovered_at = now;
228            return false;
229        }
230        let set = self.sub_index.entry(prefix).or_default();
231        if set.len() >= self.caps.max_subscriptions_per_participant {
232            if let Some(&oldest) = set.iter().next() {
233                set.remove(&oldest);
234                self.subscriptions.remove(&oldest.1);
235                self.evicted = self.evicted.saturating_add(1);
236            }
237        }
238        set.insert((now, key));
239        self.subscriptions.insert(
240            key,
241            DiscoveredSubscription {
242                data,
243                discovered_at: now,
244            },
245        );
246        true
247    }
248
249    /// Entfernt eine Publication per GUID (z.B. nach expliziter
250    /// Withdrawal-Notification vom Remote).
251    pub fn remove_publication(&mut self, key: Guid) -> Option<DiscoveredPublication> {
252        let removed = self.publications.remove(&key)?;
253        if let Some(set) = self.pub_index.get_mut(&key.prefix) {
254            set.remove(&(removed.discovered_at, key));
255            if set.is_empty() {
256                self.pub_index.remove(&key.prefix);
257            }
258        }
259        Some(removed)
260    }
261
262    /// Entfernt eine Subscription per GUID.
263    pub fn remove_subscription(&mut self, key: Guid) -> Option<DiscoveredSubscription> {
264        let removed = self.subscriptions.remove(&key)?;
265        if let Some(set) = self.sub_index.get_mut(&key.prefix) {
266            set.remove(&(removed.discovered_at, key));
267            if set.is_empty() {
268                self.sub_index.remove(&key.prefix);
269            }
270        }
271        Some(removed)
272    }
273
274    /// Entfernt alle Publications und Subscriptions eines Remote-
275    /// Participants. Aufruf z.B. bei SPDP-Lease-Timeout.
276    ///
277    /// Rueckgabe: (removed_pubs, removed_subs).
278    pub fn on_participant_lost(&mut self, prefix: GuidPrefix) -> (usize, usize) {
279        // Sekundaer-Index hat die exakte Key-Liste pro prefix — O(k)
280        // statt O(N)-retain.
281        let removed_pubs = if let Some(set) = self.pub_index.remove(&prefix) {
282            for (_, g) in &set {
283                self.publications.remove(g);
284            }
285            set.len()
286        } else {
287            0
288        };
289        let removed_subs = if let Some(set) = self.sub_index.remove(&prefix) {
290            for (_, g) in &set {
291                self.subscriptions.remove(g);
292            }
293            set.len()
294        } else {
295            0
296        };
297        (removed_pubs, removed_subs)
298    }
299
300    /// Findet alle Publications, die zu einem `(topic_name, type_name)`
301    /// passen — fuer Reader-seitiges Matching (T4/T5).
302    pub fn match_publications<'a>(
303        &'a self,
304        topic: &'a str,
305        type_name: &'a str,
306    ) -> impl Iterator<Item = &'a DiscoveredPublication> + 'a {
307        self.publications
308            .values()
309            .filter(move |p| p.data.topic_name == topic && p.data.type_name == type_name)
310    }
311
312    /// Findet alle Subscriptions, die zu einem `(topic_name, type_name)`
313    /// passen — fuer Writer-seitiges Matching.
314    pub fn match_subscriptions<'a>(
315        &'a self,
316        topic: &'a str,
317        type_name: &'a str,
318    ) -> impl Iterator<Item = &'a DiscoveredSubscription> + 'a {
319        self.subscriptions
320            .values()
321            .filter(move |s| s.data.topic_name == topic && s.data.type_name == type_name)
322    }
323
324    /// Sammelt die GUIDs aller Publications (fuer Snapshot-Tests /
325    /// Diagnose).
326    #[must_use]
327    pub fn publication_keys(&self) -> Vec<Guid> {
328        self.publications.keys().copied().collect()
329    }
330}
331
332#[cfg(test)]
333#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
334mod tests {
335    use super::*;
336    use zerodds_rtps::participant_data::Duration as DdsDuration;
337    use zerodds_rtps::publication_data::{
338        DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
339    };
340    use zerodds_rtps::wire_types::EntityId;
341
342    fn guid(prefix: [u8; 12], key: [u8; 3]) -> Guid {
343        Guid::new(
344            GuidPrefix::from_bytes(prefix),
345            EntityId::user_writer_with_key(key),
346        )
347    }
348
349    fn pub_data(prefix: [u8; 12], key: [u8; 3], topic: &str) -> PublicationBuiltinTopicData {
350        PublicationBuiltinTopicData {
351            key: guid(prefix, key),
352            participant_key: Guid::new(GuidPrefix::from_bytes(prefix), EntityId::PARTICIPANT),
353            topic_name: topic.into(),
354            type_name: "T".into(),
355            durability: DurabilityKind::Volatile,
356            reliability: ReliabilityQos {
357                kind: ReliabilityKind::Reliable,
358                max_blocking_time: DdsDuration::from_secs(1),
359            },
360            ownership: zerodds_qos::OwnershipKind::Shared,
361            ownership_strength: 0,
362            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
363            deadline: zerodds_qos::DeadlineQosPolicy::default(),
364            lifespan: zerodds_qos::LifespanQosPolicy::default(),
365            partition: alloc::vec::Vec::new(),
366            user_data: alloc::vec::Vec::new(),
367            topic_data: alloc::vec::Vec::new(),
368            group_data: alloc::vec::Vec::new(),
369            type_information: None,
370            data_representation: alloc::vec::Vec::new(),
371            security_info: None,
372            service_instance_name: None,
373            related_entity_guid: None,
374            topic_aliases: None,
375            type_identifier: zerodds_types::TypeIdentifier::None,
376        }
377    }
378
379    #[test]
380    fn fresh_cache_is_empty() {
381        let c = DiscoveredEndpointsCache::default();
382        assert_eq!(c.publications_len(), 0);
383        assert_eq!(c.subscriptions_len(), 0);
384        assert_eq!(c.evicted_count(), 0);
385    }
386
387    #[test]
388    fn insert_publication_returns_true_first_time() {
389        let mut c = DiscoveredEndpointsCache::default();
390        let inserted = c.insert_publication(pub_data([1; 12], [1, 0, 0], "T"), Duration::ZERO);
391        assert!(inserted);
392        assert_eq!(c.publications_len(), 1);
393    }
394
395    #[test]
396    fn reinsert_same_guid_updates_in_place() {
397        let mut c = DiscoveredEndpointsCache::default();
398        let first = c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
399        let second =
400            c.insert_publication(pub_data([1; 12], [1, 0, 0], "B"), Duration::from_secs(1));
401        assert!(first);
402        assert!(!second, "reinsert should return false (update, not insert)");
403        assert_eq!(c.publications_len(), 1);
404        let p = c.publication(guid([1; 12], [1, 0, 0])).unwrap();
405        assert_eq!(p.data.topic_name, "B");
406        assert_eq!(p.discovered_at, Duration::from_secs(1));
407    }
408
409    #[test]
410    fn publications_for_filters_by_prefix() {
411        let mut c = DiscoveredEndpointsCache::default();
412        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
413        c.insert_publication(pub_data([2; 12], [2, 0, 0], "B"), Duration::ZERO);
414        c.insert_publication(pub_data([1; 12], [3, 0, 0], "C"), Duration::ZERO);
415        let p1: Vec<_> = c
416            .publications_for(GuidPrefix::from_bytes([1; 12]))
417            .collect();
418        assert_eq!(p1.len(), 2);
419        let p2: Vec<_> = c
420            .publications_for(GuidPrefix::from_bytes([2; 12]))
421            .collect();
422        assert_eq!(p2.len(), 1);
423    }
424
425    #[test]
426    fn cap_evicts_oldest_of_same_participant() {
427        let caps = CacheCaps {
428            max_publications_per_participant: 2,
429            max_subscriptions_per_participant: 2,
430        };
431        let mut c = DiscoveredEndpointsCache::new(caps);
432        // 3 Publications vom gleichen Participant, je 1 Sekunde Abstand.
433        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::from_secs(1));
434        c.insert_publication(pub_data([1; 12], [2, 0, 0], "B"), Duration::from_secs(2));
435        c.insert_publication(pub_data([1; 12], [3, 0, 0], "C"), Duration::from_secs(3));
436        // Aelteste (A) raus, B+C drin
437        assert_eq!(c.publications_len(), 2);
438        assert!(c.publication(guid([1; 12], [1, 0, 0])).is_none());
439        assert!(c.publication(guid([1; 12], [2, 0, 0])).is_some());
440        assert!(c.publication(guid([1; 12], [3, 0, 0])).is_some());
441        assert_eq!(c.evicted_count(), 1);
442    }
443
444    #[test]
445    fn cap_is_per_participant_not_global() {
446        let caps = CacheCaps {
447            max_publications_per_participant: 1,
448            ..CacheCaps::default()
449        };
450        let mut c = DiscoveredEndpointsCache::new(caps);
451        // Pro Participant 1 Publication — insgesamt 3 Participants, also
452        // 3 Publications insgesamt, keine Eviction.
453        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
454        c.insert_publication(pub_data([2; 12], [2, 0, 0], "B"), Duration::ZERO);
455        c.insert_publication(pub_data([3; 12], [3, 0, 0], "C"), Duration::ZERO);
456        assert_eq!(c.publications_len(), 3);
457        assert_eq!(c.evicted_count(), 0);
458    }
459
460    #[test]
461    fn on_participant_lost_removes_all_of_that_prefix() {
462        let mut c = DiscoveredEndpointsCache::default();
463        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
464        c.insert_publication(pub_data([1; 12], [2, 0, 0], "B"), Duration::ZERO);
465        c.insert_publication(pub_data([2; 12], [3, 0, 0], "C"), Duration::ZERO);
466        let (pubs, subs) = c.on_participant_lost(GuidPrefix::from_bytes([1; 12]));
467        assert_eq!(pubs, 2);
468        assert_eq!(subs, 0);
469        assert_eq!(c.publications_len(), 1);
470        assert!(c.publication(guid([2; 12], [3, 0, 0])).is_some());
471    }
472
473    #[test]
474    fn remove_publication_returns_removed() {
475        let mut c = DiscoveredEndpointsCache::default();
476        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
477        let removed = c.remove_publication(guid([1; 12], [1, 0, 0]));
478        assert!(removed.is_some());
479        assert_eq!(removed.unwrap().data.topic_name, "A");
480        assert_eq!(c.publications_len(), 0);
481        assert!(
482            c.remove_publication(guid([1; 12], [1, 0, 0])).is_none(),
483            "second remove is None"
484        );
485    }
486
487    #[test]
488    fn match_publications_filters_topic_and_type() {
489        let mut c = DiscoveredEndpointsCache::default();
490        c.insert_publication(pub_data([1; 12], [1, 0, 0], "Chatter"), Duration::ZERO);
491        let mut p = pub_data([1; 12], [2, 0, 0], "Chatter");
492        p.type_name = "OtherType".into();
493        c.insert_publication(p, Duration::ZERO);
494        c.insert_publication(pub_data([1; 12], [3, 0, 0], "Weather"), Duration::ZERO);
495
496        let matches: Vec<_> = c.match_publications("Chatter", "T").collect();
497        assert_eq!(matches.len(), 1);
498        assert_eq!(matches[0].data.topic_name, "Chatter");
499        assert_eq!(matches[0].data.type_name, "T");
500    }
501
502    #[test]
503    fn subscriptions_match_topic_type() {
504        use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
505        let mut c = DiscoveredEndpointsCache::default();
506        let sub = SubscriptionBuiltinTopicData {
507            key: Guid::new(
508                GuidPrefix::from_bytes([2; 12]),
509                EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
510            ),
511            participant_key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
512            topic_name: "Chatter".into(),
513            type_name: "T".into(),
514            durability: DurabilityKind::Volatile,
515            reliability: ReliabilityQos::default(),
516            ownership: zerodds_qos::OwnershipKind::Shared,
517            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
518            deadline: zerodds_qos::DeadlineQosPolicy::default(),
519            partition: alloc::vec::Vec::new(),
520            user_data: alloc::vec::Vec::new(),
521            topic_data: alloc::vec::Vec::new(),
522            group_data: alloc::vec::Vec::new(),
523            type_information: None,
524            data_representation: alloc::vec::Vec::new(),
525            content_filter: None,
526            security_info: None,
527            service_instance_name: None,
528            related_entity_guid: None,
529            topic_aliases: None,
530            type_identifier: zerodds_types::TypeIdentifier::None,
531        };
532        c.insert_subscription(sub, Duration::ZERO);
533        assert_eq!(c.subscriptions_len(), 1);
534        assert_eq!(c.match_subscriptions("Chatter", "T").count(), 1);
535        assert_eq!(c.match_subscriptions("Chatter", "Other").count(), 0);
536    }
537
538    #[test]
539    fn update_does_not_evict_even_at_cap() {
540        let caps = CacheCaps {
541            max_publications_per_participant: 1,
542            ..CacheCaps::default()
543        };
544        let mut c = DiscoveredEndpointsCache::new(caps);
545        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::from_secs(1));
546        // Reinsert gleicher GUID darf keine Eviction triggern
547        c.insert_publication(pub_data([1; 12], [1, 0, 0], "B"), Duration::from_secs(2));
548        assert_eq!(c.publications_len(), 1);
549        assert_eq!(c.evicted_count(), 0, "update must not count as eviction");
550    }
551
552    #[test]
553    fn publication_keys_enumerates_all_guids() {
554        let mut c = DiscoveredEndpointsCache::default();
555        c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
556        c.insert_publication(pub_data([2; 12], [2, 0, 0], "B"), Duration::ZERO);
557        let keys = c.publication_keys();
558        assert_eq!(keys.len(), 2);
559    }
560}