1extern crate alloc;
18use alloc::collections::{BTreeMap, BTreeSet};
19use alloc::vec::Vec;
20use core::time::Duration;
21
22use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
23use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
24use zerodds_rtps::wire_types::{Guid, GuidPrefix};
25
26pub const DEFAULT_MAX_PUBLICATIONS_PER_PARTICIPANT: usize = 256;
28pub const DEFAULT_MAX_SUBSCRIPTIONS_PER_PARTICIPANT: usize = 256;
30
31#[derive(Debug, Clone, Copy)]
33pub struct CacheCaps {
34 pub max_publications_per_participant: usize,
37 pub max_subscriptions_per_participant: usize,
39}
40
41impl Default for CacheCaps {
42 fn default() -> Self {
43 Self {
44 max_publications_per_participant: DEFAULT_MAX_PUBLICATIONS_PER_PARTICIPANT,
45 max_subscriptions_per_participant: DEFAULT_MAX_SUBSCRIPTIONS_PER_PARTICIPANT,
46 }
47 }
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct DiscoveredPublication {
53 pub data: PublicationBuiltinTopicData,
55 pub discovered_at: Duration,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct DiscoveredSubscription {
62 pub data: SubscriptionBuiltinTopicData,
64 pub discovered_at: Duration,
66}
67
68type PerPrefixIndex = BTreeMap<GuidPrefix, BTreeSet<(Duration, Guid)>>;
74
75#[derive(Debug, Clone, Default)]
77pub struct DiscoveredEndpointsCache {
78 publications: BTreeMap<Guid, DiscoveredPublication>,
79 subscriptions: BTreeMap<Guid, DiscoveredSubscription>,
80 pub_index: PerPrefixIndex,
86 sub_index: PerPrefixIndex,
87 caps: CacheCaps,
88 evicted: u64,
89}
90
91impl DiscoveredEndpointsCache {
92 #[must_use]
94 pub fn new(caps: CacheCaps) -> Self {
95 Self {
96 publications: BTreeMap::new(),
97 subscriptions: BTreeMap::new(),
98 pub_index: BTreeMap::new(),
99 sub_index: BTreeMap::new(),
100 caps,
101 evicted: 0,
102 }
103 }
104
105 #[must_use]
107 pub fn publications_len(&self) -> usize {
108 self.publications.len()
109 }
110
111 #[must_use]
113 pub fn subscriptions_len(&self) -> usize {
114 self.subscriptions.len()
115 }
116
117 #[must_use]
119 pub fn evicted_count(&self) -> u64 {
120 self.evicted
121 }
122
123 #[must_use]
125 pub fn publication(&self, key: Guid) -> Option<&DiscoveredPublication> {
126 self.publications.get(&key)
127 }
128
129 #[must_use]
131 pub fn subscription(&self, key: Guid) -> Option<&DiscoveredSubscription> {
132 self.subscriptions.get(&key)
133 }
134
135 pub fn publications_for(
137 &self,
138 prefix: GuidPrefix,
139 ) -> impl Iterator<Item = &DiscoveredPublication> + '_ {
140 self.publications
141 .iter()
142 .filter(move |(guid, _)| guid.prefix == prefix)
143 .map(|(_, p)| p)
144 }
145
146 pub fn subscriptions_for(
148 &self,
149 prefix: GuidPrefix,
150 ) -> impl Iterator<Item = &DiscoveredSubscription> + '_ {
151 self.subscriptions
152 .iter()
153 .filter(move |(guid, _)| guid.prefix == prefix)
154 .map(|(_, p)| p)
155 }
156
157 pub fn publications(&self) -> impl Iterator<Item = &DiscoveredPublication> + '_ {
159 self.publications.values()
160 }
161
162 pub fn subscriptions(&self) -> impl Iterator<Item = &DiscoveredSubscription> + '_ {
164 self.subscriptions.values()
165 }
166
167 pub fn insert_publication(&mut self, data: PublicationBuiltinTopicData, now: Duration) -> bool {
177 let key = data.key;
178 let prefix = key.prefix;
179 if let Some(existing) = self.publications.get_mut(&key) {
180 let old_entry = (existing.discovered_at, key);
182 if let Some(set) = self.pub_index.get_mut(&prefix) {
183 set.remove(&old_entry);
184 set.insert((now, key));
185 }
186 existing.data = data;
187 existing.discovered_at = now;
188 return false;
189 }
190 let set = self.pub_index.entry(prefix).or_default();
193 if set.len() >= self.caps.max_publications_per_participant {
194 if let Some(&oldest) = set.iter().next() {
195 set.remove(&oldest);
196 self.publications.remove(&oldest.1);
197 self.evicted = self.evicted.saturating_add(1);
198 }
199 }
200 set.insert((now, key));
201 self.publications.insert(
202 key,
203 DiscoveredPublication {
204 data,
205 discovered_at: now,
206 },
207 );
208 true
209 }
210
211 pub fn insert_subscription(
214 &mut self,
215 data: SubscriptionBuiltinTopicData,
216 now: Duration,
217 ) -> bool {
218 let key = data.key;
219 let prefix = key.prefix;
220 if let Some(existing) = self.subscriptions.get_mut(&key) {
221 let old_entry = (existing.discovered_at, key);
222 if let Some(set) = self.sub_index.get_mut(&prefix) {
223 set.remove(&old_entry);
224 set.insert((now, key));
225 }
226 existing.data = data;
227 existing.discovered_at = now;
228 return false;
229 }
230 let set = self.sub_index.entry(prefix).or_default();
231 if set.len() >= self.caps.max_subscriptions_per_participant {
232 if let Some(&oldest) = set.iter().next() {
233 set.remove(&oldest);
234 self.subscriptions.remove(&oldest.1);
235 self.evicted = self.evicted.saturating_add(1);
236 }
237 }
238 set.insert((now, key));
239 self.subscriptions.insert(
240 key,
241 DiscoveredSubscription {
242 data,
243 discovered_at: now,
244 },
245 );
246 true
247 }
248
249 pub fn remove_publication(&mut self, key: Guid) -> Option<DiscoveredPublication> {
252 let removed = self.publications.remove(&key)?;
253 if let Some(set) = self.pub_index.get_mut(&key.prefix) {
254 set.remove(&(removed.discovered_at, key));
255 if set.is_empty() {
256 self.pub_index.remove(&key.prefix);
257 }
258 }
259 Some(removed)
260 }
261
262 pub fn remove_subscription(&mut self, key: Guid) -> Option<DiscoveredSubscription> {
264 let removed = self.subscriptions.remove(&key)?;
265 if let Some(set) = self.sub_index.get_mut(&key.prefix) {
266 set.remove(&(removed.discovered_at, key));
267 if set.is_empty() {
268 self.sub_index.remove(&key.prefix);
269 }
270 }
271 Some(removed)
272 }
273
274 pub fn on_participant_lost(&mut self, prefix: GuidPrefix) -> (usize, usize) {
279 let removed_pubs = if let Some(set) = self.pub_index.remove(&prefix) {
282 for (_, g) in &set {
283 self.publications.remove(g);
284 }
285 set.len()
286 } else {
287 0
288 };
289 let removed_subs = if let Some(set) = self.sub_index.remove(&prefix) {
290 for (_, g) in &set {
291 self.subscriptions.remove(g);
292 }
293 set.len()
294 } else {
295 0
296 };
297 (removed_pubs, removed_subs)
298 }
299
300 pub fn match_publications<'a>(
303 &'a self,
304 topic: &'a str,
305 type_name: &'a str,
306 ) -> impl Iterator<Item = &'a DiscoveredPublication> + 'a {
307 self.publications
308 .values()
309 .filter(move |p| p.data.topic_name == topic && p.data.type_name == type_name)
310 }
311
312 pub fn match_subscriptions<'a>(
315 &'a self,
316 topic: &'a str,
317 type_name: &'a str,
318 ) -> impl Iterator<Item = &'a DiscoveredSubscription> + 'a {
319 self.subscriptions
320 .values()
321 .filter(move |s| s.data.topic_name == topic && s.data.type_name == type_name)
322 }
323
324 #[must_use]
327 pub fn publication_keys(&self) -> Vec<Guid> {
328 self.publications.keys().copied().collect()
329 }
330}
331
332#[cfg(test)]
333#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
334mod tests {
335 use super::*;
336 use zerodds_rtps::participant_data::Duration as DdsDuration;
337 use zerodds_rtps::publication_data::{
338 DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
339 };
340 use zerodds_rtps::wire_types::EntityId;
341
342 fn guid(prefix: [u8; 12], key: [u8; 3]) -> Guid {
343 Guid::new(
344 GuidPrefix::from_bytes(prefix),
345 EntityId::user_writer_with_key(key),
346 )
347 }
348
349 fn pub_data(prefix: [u8; 12], key: [u8; 3], topic: &str) -> PublicationBuiltinTopicData {
350 PublicationBuiltinTopicData {
351 key: guid(prefix, key),
352 participant_key: Guid::new(GuidPrefix::from_bytes(prefix), EntityId::PARTICIPANT),
353 topic_name: topic.into(),
354 type_name: "T".into(),
355 durability: DurabilityKind::Volatile,
356 reliability: ReliabilityQos {
357 kind: ReliabilityKind::Reliable,
358 max_blocking_time: DdsDuration::from_secs(1),
359 },
360 ownership: zerodds_qos::OwnershipKind::Shared,
361 ownership_strength: 0,
362 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
363 deadline: zerodds_qos::DeadlineQosPolicy::default(),
364 lifespan: zerodds_qos::LifespanQosPolicy::default(),
365 partition: alloc::vec::Vec::new(),
366 user_data: alloc::vec::Vec::new(),
367 topic_data: alloc::vec::Vec::new(),
368 group_data: alloc::vec::Vec::new(),
369 type_information: None,
370 data_representation: alloc::vec::Vec::new(),
371 security_info: None,
372 service_instance_name: None,
373 related_entity_guid: None,
374 topic_aliases: None,
375 type_identifier: zerodds_types::TypeIdentifier::None,
376 }
377 }
378
379 #[test]
380 fn fresh_cache_is_empty() {
381 let c = DiscoveredEndpointsCache::default();
382 assert_eq!(c.publications_len(), 0);
383 assert_eq!(c.subscriptions_len(), 0);
384 assert_eq!(c.evicted_count(), 0);
385 }
386
387 #[test]
388 fn insert_publication_returns_true_first_time() {
389 let mut c = DiscoveredEndpointsCache::default();
390 let inserted = c.insert_publication(pub_data([1; 12], [1, 0, 0], "T"), Duration::ZERO);
391 assert!(inserted);
392 assert_eq!(c.publications_len(), 1);
393 }
394
395 #[test]
396 fn reinsert_same_guid_updates_in_place() {
397 let mut c = DiscoveredEndpointsCache::default();
398 let first = c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
399 let second =
400 c.insert_publication(pub_data([1; 12], [1, 0, 0], "B"), Duration::from_secs(1));
401 assert!(first);
402 assert!(!second, "reinsert should return false (update, not insert)");
403 assert_eq!(c.publications_len(), 1);
404 let p = c.publication(guid([1; 12], [1, 0, 0])).unwrap();
405 assert_eq!(p.data.topic_name, "B");
406 assert_eq!(p.discovered_at, Duration::from_secs(1));
407 }
408
409 #[test]
410 fn publications_for_filters_by_prefix() {
411 let mut c = DiscoveredEndpointsCache::default();
412 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
413 c.insert_publication(pub_data([2; 12], [2, 0, 0], "B"), Duration::ZERO);
414 c.insert_publication(pub_data([1; 12], [3, 0, 0], "C"), Duration::ZERO);
415 let p1: Vec<_> = c
416 .publications_for(GuidPrefix::from_bytes([1; 12]))
417 .collect();
418 assert_eq!(p1.len(), 2);
419 let p2: Vec<_> = c
420 .publications_for(GuidPrefix::from_bytes([2; 12]))
421 .collect();
422 assert_eq!(p2.len(), 1);
423 }
424
425 #[test]
426 fn cap_evicts_oldest_of_same_participant() {
427 let caps = CacheCaps {
428 max_publications_per_participant: 2,
429 max_subscriptions_per_participant: 2,
430 };
431 let mut c = DiscoveredEndpointsCache::new(caps);
432 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::from_secs(1));
434 c.insert_publication(pub_data([1; 12], [2, 0, 0], "B"), Duration::from_secs(2));
435 c.insert_publication(pub_data([1; 12], [3, 0, 0], "C"), Duration::from_secs(3));
436 assert_eq!(c.publications_len(), 2);
438 assert!(c.publication(guid([1; 12], [1, 0, 0])).is_none());
439 assert!(c.publication(guid([1; 12], [2, 0, 0])).is_some());
440 assert!(c.publication(guid([1; 12], [3, 0, 0])).is_some());
441 assert_eq!(c.evicted_count(), 1);
442 }
443
444 #[test]
445 fn cap_is_per_participant_not_global() {
446 let caps = CacheCaps {
447 max_publications_per_participant: 1,
448 ..CacheCaps::default()
449 };
450 let mut c = DiscoveredEndpointsCache::new(caps);
451 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
454 c.insert_publication(pub_data([2; 12], [2, 0, 0], "B"), Duration::ZERO);
455 c.insert_publication(pub_data([3; 12], [3, 0, 0], "C"), Duration::ZERO);
456 assert_eq!(c.publications_len(), 3);
457 assert_eq!(c.evicted_count(), 0);
458 }
459
460 #[test]
461 fn on_participant_lost_removes_all_of_that_prefix() {
462 let mut c = DiscoveredEndpointsCache::default();
463 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
464 c.insert_publication(pub_data([1; 12], [2, 0, 0], "B"), Duration::ZERO);
465 c.insert_publication(pub_data([2; 12], [3, 0, 0], "C"), Duration::ZERO);
466 let (pubs, subs) = c.on_participant_lost(GuidPrefix::from_bytes([1; 12]));
467 assert_eq!(pubs, 2);
468 assert_eq!(subs, 0);
469 assert_eq!(c.publications_len(), 1);
470 assert!(c.publication(guid([2; 12], [3, 0, 0])).is_some());
471 }
472
473 #[test]
474 fn remove_publication_returns_removed() {
475 let mut c = DiscoveredEndpointsCache::default();
476 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
477 let removed = c.remove_publication(guid([1; 12], [1, 0, 0]));
478 assert!(removed.is_some());
479 assert_eq!(removed.unwrap().data.topic_name, "A");
480 assert_eq!(c.publications_len(), 0);
481 assert!(
482 c.remove_publication(guid([1; 12], [1, 0, 0])).is_none(),
483 "second remove is None"
484 );
485 }
486
487 #[test]
488 fn match_publications_filters_topic_and_type() {
489 let mut c = DiscoveredEndpointsCache::default();
490 c.insert_publication(pub_data([1; 12], [1, 0, 0], "Chatter"), Duration::ZERO);
491 let mut p = pub_data([1; 12], [2, 0, 0], "Chatter");
492 p.type_name = "OtherType".into();
493 c.insert_publication(p, Duration::ZERO);
494 c.insert_publication(pub_data([1; 12], [3, 0, 0], "Weather"), Duration::ZERO);
495
496 let matches: Vec<_> = c.match_publications("Chatter", "T").collect();
497 assert_eq!(matches.len(), 1);
498 assert_eq!(matches[0].data.topic_name, "Chatter");
499 assert_eq!(matches[0].data.type_name, "T");
500 }
501
502 #[test]
503 fn subscriptions_match_topic_type() {
504 use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
505 let mut c = DiscoveredEndpointsCache::default();
506 let sub = SubscriptionBuiltinTopicData {
507 key: Guid::new(
508 GuidPrefix::from_bytes([2; 12]),
509 EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
510 ),
511 participant_key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
512 topic_name: "Chatter".into(),
513 type_name: "T".into(),
514 durability: DurabilityKind::Volatile,
515 reliability: ReliabilityQos::default(),
516 ownership: zerodds_qos::OwnershipKind::Shared,
517 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
518 deadline: zerodds_qos::DeadlineQosPolicy::default(),
519 partition: alloc::vec::Vec::new(),
520 user_data: alloc::vec::Vec::new(),
521 topic_data: alloc::vec::Vec::new(),
522 group_data: alloc::vec::Vec::new(),
523 type_information: None,
524 data_representation: alloc::vec::Vec::new(),
525 content_filter: None,
526 security_info: None,
527 service_instance_name: None,
528 related_entity_guid: None,
529 topic_aliases: None,
530 type_identifier: zerodds_types::TypeIdentifier::None,
531 };
532 c.insert_subscription(sub, Duration::ZERO);
533 assert_eq!(c.subscriptions_len(), 1);
534 assert_eq!(c.match_subscriptions("Chatter", "T").count(), 1);
535 assert_eq!(c.match_subscriptions("Chatter", "Other").count(), 0);
536 }
537
538 #[test]
539 fn update_does_not_evict_even_at_cap() {
540 let caps = CacheCaps {
541 max_publications_per_participant: 1,
542 ..CacheCaps::default()
543 };
544 let mut c = DiscoveredEndpointsCache::new(caps);
545 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::from_secs(1));
546 c.insert_publication(pub_data([1; 12], [1, 0, 0], "B"), Duration::from_secs(2));
548 assert_eq!(c.publications_len(), 1);
549 assert_eq!(c.evicted_count(), 0, "update must not count as eviction");
550 }
551
552 #[test]
553 fn publication_keys_enumerates_all_guids() {
554 let mut c = DiscoveredEndpointsCache::default();
555 c.insert_publication(pub_data([1; 12], [1, 0, 0], "A"), Duration::ZERO);
556 c.insert_publication(pub_data([2; 12], [2, 0, 0], "B"), Duration::ZERO);
557 let keys = c.publication_keys();
558 assert_eq!(keys.len(), 2);
559 }
560}