1extern crate alloc;
32use alloc::collections::{BTreeMap, BTreeSet};
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36
37#[cfg(feature = "std")]
38use std::sync::Mutex;
39
40use crate::builtin_subscriber::BuiltinSubscriber;
41use crate::builtin_topics::{ParticipantBuiltinTopicData, TopicBuiltinTopicData};
42use crate::dds_type::DdsType;
43use crate::entity::StatusMask;
44use crate::error::{DdsError, Result};
45use crate::instance_handle::InstanceHandle;
46use crate::listener::ArcDomainParticipantListener;
47use crate::publisher::Publisher;
48use crate::qos::{DomainParticipantQos, PublisherQos, SubscriberQos, TopicQos};
49use crate::subscriber::Subscriber;
50use crate::topic::{
51 ContentFilteredTopic, Topic, TopicDescription, TopicDescriptionHandle, TopicInner,
52};
53
54#[cfg(feature = "std")]
55use crate::runtime::{DcpsRuntime, RuntimeConfig};
56
57pub type DomainId = i32;
59
60#[derive(Debug, Default)]
69#[cfg(feature = "std")]
70pub(crate) struct IgnoreFilterInner {
71 pub(crate) participants: Mutex<BTreeSet<InstanceHandle>>,
72 pub(crate) topics: Mutex<BTreeSet<InstanceHandle>>,
73 pub(crate) publications: Mutex<BTreeSet<InstanceHandle>>,
74 pub(crate) subscriptions: Mutex<BTreeSet<InstanceHandle>>,
75}
76
77#[derive(Clone, Debug, Default)]
81#[cfg(feature = "std")]
82pub struct IgnoreFilter {
83 pub(crate) inner: Arc<IgnoreFilterInner>,
84}
85
86#[cfg(feature = "std")]
87impl IgnoreFilter {
88 #[must_use]
90 pub fn is_participant_ignored(&self, h: InstanceHandle) -> bool {
91 self.inner
92 .participants
93 .lock()
94 .map(|s| s.contains(&h))
95 .unwrap_or(false)
96 }
97
98 #[must_use]
100 pub fn is_topic_ignored(&self, h: InstanceHandle) -> bool {
101 self.inner
102 .topics
103 .lock()
104 .map(|s| s.contains(&h))
105 .unwrap_or(false)
106 }
107
108 #[must_use]
110 pub fn is_publication_ignored(&self, h: InstanceHandle) -> bool {
111 self.inner
112 .publications
113 .lock()
114 .map(|s| s.contains(&h))
115 .unwrap_or(false)
116 }
117
118 #[must_use]
120 pub fn is_subscription_ignored(&self, h: InstanceHandle) -> bool {
121 self.inner
122 .subscriptions
123 .lock()
124 .map(|s| s.contains(&h))
125 .unwrap_or(false)
126 }
127}
128
129#[cfg(feature = "std")]
133fn random_guid_prefix() -> zerodds_rtps::wire_types::GuidPrefix {
134 use std::sync::atomic::{AtomicU32, Ordering};
135 static COUNTER: AtomicU32 = AtomicU32::new(0);
136 let pid = std::process::id();
137 let t = std::time::SystemTime::now()
138 .duration_since(std::time::UNIX_EPOCH)
139 .map(|d| d.as_nanos() as u64)
140 .unwrap_or(0);
141 let c = COUNTER.fetch_add(1, Ordering::Relaxed);
142 let mut bytes = [0u8; 12];
143 bytes[0..4].copy_from_slice(&pid.to_le_bytes());
144 bytes[4..12].copy_from_slice(&t.to_le_bytes());
145 bytes[11] = bytes[11].wrapping_add(c as u8);
146 zerodds_rtps::wire_types::GuidPrefix::from_bytes(bytes)
147}
148
149#[derive(Clone)]
151pub struct DomainParticipant {
152 inner: Arc<ParticipantInner>,
153}
154
155impl core::fmt::Debug for DomainParticipant {
156 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
157 f.debug_struct("DomainParticipant")
158 .field("domain_id", &self.inner.domain_id)
159 .finish_non_exhaustive()
160 }
161}
162
163pub(crate) struct ParticipantInner {
164 pub(crate) domain_id: DomainId,
165 pub(crate) qos: Mutex<DomainParticipantQos>,
166 pub(crate) entity_state: Arc<crate::entity::EntityState>,
168 topics: Mutex<BTreeMap<String, Arc<TopicInner>>>,
172 #[cfg(feature = "std")]
176 pub(crate) runtime: Option<Arc<DcpsRuntime>>,
177 pub(crate) builtin_subscriber: Arc<BuiltinSubscriber>,
181 #[cfg(feature = "std")]
186 pub(crate) ignore_filter: IgnoreFilter,
187 publishers: Mutex<Vec<InstanceHandle>>,
195 subscribers: Mutex<Vec<InstanceHandle>>,
197 pub(crate) datawriters: Mutex<Vec<InstanceHandle>>,
201 pub(crate) datareaders: Mutex<Vec<InstanceHandle>>,
204 pub(crate) listener: Mutex<Option<(ArcDomainParticipantListener, StatusMask)>>,
208 #[cfg(feature = "std")]
214 pub(crate) type_registry: Mutex<BTreeMap<String, zerodds_types::dynamic::DynamicType>>,
215 #[cfg(feature = "std")]
220 pub(crate) type_lookup: Mutex<TypeLookupState>,
221}
222
223#[cfg(feature = "std")]
226#[derive(Debug, Default)]
227pub(crate) struct TypeLookupState {
228 pub attempts: BTreeMap<zerodds_types::EquivalenceHash, (std::time::Instant, u32)>,
230 pub outgoing: Vec<(zerodds_types::EquivalenceHash, u64)>,
235}
236
237#[cfg(feature = "std")]
238impl TypeLookupState {
239 pub const BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
241 pub const MAX_ATTEMPTS: u32 = 3;
243}
244
245impl DomainParticipant {
246 pub(crate) fn new(domain_id: DomainId, qos: DomainParticipantQos) -> Self {
250 let builtin = Arc::new(BuiltinSubscriber::new());
251 let participant = Self {
252 inner: Arc::new(ParticipantInner {
253 domain_id,
254 qos: Mutex::new(qos),
255 entity_state: crate::entity::EntityState::new(),
256 topics: Mutex::new(BTreeMap::new()),
257 #[cfg(feature = "std")]
258 runtime: None,
259 builtin_subscriber: builtin,
260 #[cfg(feature = "std")]
261 ignore_filter: IgnoreFilter::default(),
262 publishers: Mutex::new(Vec::new()),
263 subscribers: Mutex::new(Vec::new()),
264 datawriters: Mutex::new(Vec::new()),
265 datareaders: Mutex::new(Vec::new()),
266 listener: Mutex::new(None),
267 #[cfg(feature = "std")]
268 type_registry: Mutex::new(BTreeMap::new()),
269 #[cfg(feature = "std")]
270 type_lookup: Mutex::new(TypeLookupState::default()),
271 }),
272 };
273 #[cfg(feature = "std")]
275 participant.register_builtin_types();
276 participant
277 }
278
279 #[cfg(feature = "std")]
285 pub(crate) fn new_with_runtime(
286 domain_id: DomainId,
287 qos: DomainParticipantQos,
288 config: RuntimeConfig,
289 ) -> Result<Self> {
290 let runtime = DcpsRuntime::start(domain_id, random_guid_prefix(), config)?;
291 let builtin = Arc::new(BuiltinSubscriber::new());
292 runtime.attach_builtin_sinks(builtin.sinks());
295 let ignore_filter = IgnoreFilter::default();
300 runtime.attach_ignore_filter(ignore_filter.clone());
301 let participant = Self {
302 inner: Arc::new(ParticipantInner {
303 domain_id,
304 qos: Mutex::new(qos),
305 entity_state: crate::entity::EntityState::new(),
306 topics: Mutex::new(BTreeMap::new()),
307 runtime: Some(runtime),
308 builtin_subscriber: builtin,
309 ignore_filter,
310 publishers: Mutex::new(Vec::new()),
311 subscribers: Mutex::new(Vec::new()),
312 datawriters: Mutex::new(Vec::new()),
313 datareaders: Mutex::new(Vec::new()),
314 listener: Mutex::new(None),
315 type_registry: Mutex::new(BTreeMap::new()),
316 type_lookup: Mutex::new(TypeLookupState::default()),
317 }),
318 };
319 participant.register_builtin_types();
321 Ok(participant)
322 }
323
324 #[cfg(feature = "std")]
328 #[must_use]
329 pub fn runtime(&self) -> Option<&Arc<DcpsRuntime>> {
330 self.inner.runtime.as_ref()
331 }
332
333 #[must_use]
335 pub fn domain_id(&self) -> DomainId {
336 self.inner.domain_id
337 }
338
339 #[must_use]
342 pub fn qos(&self) -> DomainParticipantQos {
343 self.inner.qos.lock().map(|g| g.clone()).unwrap_or_default()
344 }
345
346 pub fn set_qos(&self, qos: DomainParticipantQos) -> Result<()> {
353 if let Ok(mut g) = self.inner.qos.lock() {
354 *g = qos;
355 }
356 Ok(())
357 }
358
359 #[cfg(feature = "std")]
368 pub fn register_builtin_types(&self) {
369 if let Ok(types) = zerodds_types::dynamic::all_builtin_types() {
370 if let Ok(mut reg) = self.inner.type_registry.lock() {
371 for (name, t) in types {
372 reg.insert(name, t);
373 }
374 }
375 }
376 }
377
378 #[cfg(feature = "std")]
382 pub fn unregister_builtin_types(&self) {
383 if let Ok(mut reg) = self.inner.type_registry.lock() {
384 reg.retain(|name, _| !zerodds_types::dynamic::is_builtin_type_name(name));
385 }
386 }
387
388 #[cfg(feature = "std")]
392 #[must_use]
393 pub fn find_builtin_type(&self, name: &str) -> Option<zerodds_types::dynamic::DynamicType> {
394 self.inner
395 .type_registry
396 .lock()
397 .ok()
398 .and_then(|reg| reg.get(name).cloned())
399 }
400
401 #[cfg(feature = "std")]
403 #[must_use]
404 pub fn registered_type_count(&self) -> usize {
405 self.inner
406 .type_registry
407 .lock()
408 .map(|r| r.len())
409 .unwrap_or(0)
410 }
411
412 #[cfg(feature = "std")]
419 pub fn enqueue_type_lookup(&self, hash: zerodds_types::EquivalenceHash) -> bool {
420 let mut state = match self.inner.type_lookup.lock() {
421 Ok(s) => s,
422 Err(_) => return false,
423 };
424 let now = std::time::Instant::now();
425 if let Some((last, retries)) = state.attempts.get(&hash).copied() {
426 if retries >= TypeLookupState::MAX_ATTEMPTS {
427 return false;
428 }
429 if now.duration_since(last) < TypeLookupState::BACKOFF {
430 return false;
431 }
432 state
433 .attempts
434 .insert(hash, (now, retries.saturating_add(1)));
435 } else {
436 state.attempts.insert(hash, (now, 1));
437 }
438 let seq = state.outgoing.len() as u64 + 1;
440 state.outgoing.push((hash, seq));
441 true
442 }
443
444 #[cfg(feature = "std")]
449 #[must_use]
450 pub fn drain_type_lookup_requests(&self) -> Vec<(zerodds_types::EquivalenceHash, u64)> {
451 self.inner
452 .type_lookup
453 .lock()
454 .map(|mut s| core::mem::take(&mut s.outgoing))
455 .unwrap_or_default()
456 }
457
458 #[cfg(feature = "std")]
464 pub fn ingest_type_lookup_reply(
465 &self,
466 types: Vec<(
467 zerodds_types::EquivalenceHash,
468 zerodds_types::MinimalTypeObject,
469 )>,
470 ) -> usize {
471 let mut count = 0;
472 if let Ok(mut state) = self.inner.type_lookup.lock() {
473 for (hash, _t) in &types {
474 state.attempts.remove(hash);
475 count += 1;
476 }
477 }
478 let _ = types;
482 count
483 }
484
485 #[cfg(feature = "std")]
498 pub fn on_remote_publication_discovered(&self, type_information_blob: Option<&[u8]>) -> usize {
499 self.on_remote_type_information(type_information_blob)
500 }
501
502 #[cfg(feature = "std")]
506 pub fn on_remote_subscription_discovered(&self, type_information_blob: Option<&[u8]>) -> usize {
507 self.on_remote_type_information(type_information_blob)
508 }
509
510 #[cfg(feature = "std")]
511 fn on_remote_type_information(&self, blob: Option<&[u8]>) -> usize {
512 let Some(bytes) = blob else {
513 return 0;
514 };
515 let Ok(ti) = zerodds_types::type_information::TypeInformation::from_bytes_le(bytes) else {
516 return 0;
517 };
518 let mut queued = 0;
519 if let Some(hash) = extract_equivalence_hash(&ti.minimal.typeid_with_size.type_id) {
521 if !self.has_type_for_hash(hash) && self.enqueue_type_lookup(hash) {
522 queued += 1;
523 }
524 }
525 if let Some(hash) = extract_equivalence_hash(&ti.complete.typeid_with_size.type_id) {
527 if !self.has_type_for_hash(hash) && self.enqueue_type_lookup(hash) {
528 queued += 1;
529 }
530 }
531 queued
532 }
533
534 #[cfg(feature = "std")]
541 fn has_type_for_hash(&self, hash: zerodds_types::EquivalenceHash) -> bool {
542 let Some(rt) = self.inner.runtime.as_ref() else {
543 return false;
544 };
545 let Ok(server) = rt.type_lookup_server.lock() else {
546 return false;
547 };
548 server.registry.get_minimal(&hash).is_some()
549 || server.registry.get_complete(&hash).is_some()
550 }
551
552 #[cfg(feature = "std")]
556 #[must_use]
557 pub fn type_lookup_exhausted(&self, hash: zerodds_types::EquivalenceHash) -> bool {
558 self.inner
559 .type_lookup
560 .lock()
561 .ok()
562 .and_then(|s| s.attempts.get(&hash).map(|(_, n)| *n))
563 .unwrap_or(0)
564 >= TypeLookupState::MAX_ATTEMPTS
565 }
566
567 pub fn create_topic<T: DdsType>(&self, name: &str, qos: TopicQos) -> Result<Topic<T>> {
575 if name.is_empty() {
576 return Err(DdsError::BadParameter { what: "topic name" });
577 }
578 let mut topics = self
579 .inner
580 .topics
581 .lock()
582 .map_err(|_| DdsError::PreconditionNotMet {
583 reason: "topic registry poisoned",
584 })?;
585 if let Some(existing) = topics.get(name) {
586 if existing.type_name != T::TYPE_NAME {
587 #[cfg(feature = "std")]
592 existing
593 .inconsistent_topic_count
594 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
595 return Err(DdsError::InconsistentPolicy {
596 what: "topic name reused with different type",
597 });
598 }
599 return Ok(reconstruct_topic::<T>(existing.clone(), self.clone()));
601 }
602 let topic = Topic::<T>::new(name.into(), qos, self.clone());
603 topics.insert(name.into(), topic_inner(&topic));
604 Ok(topic)
605 }
606
607 #[must_use]
613 pub fn lookup_topicdescription(&self, name: &str) -> Option<TopicDescriptionHandle> {
614 let topics = self.inner.topics.lock().ok()?;
615 let inner = topics.get(name)?;
616 Some(TopicDescriptionHandle::new(
617 inner.name.clone(),
618 String::from(inner.type_name),
619 self.clone(),
620 ))
621 }
622
623 #[cfg(feature = "std")]
640 pub fn find_topic(
641 &self,
642 name: &str,
643 timeout: core::time::Duration,
644 ) -> Result<TopicDescriptionHandle> {
645 if name.is_empty() {
646 return Err(DdsError::BadParameter { what: "topic name" });
647 }
648 let deadline = std::time::Instant::now() + timeout;
649 if let Some(h) = self.lookup_topicdescription(name) {
652 return Ok(h);
653 }
654 let poll = core::time::Duration::from_millis(20);
657 loop {
658 if let Some(handle) = self.find_topic_in_sedp(name) {
659 return Ok(handle);
660 }
661 if std::time::Instant::now() >= deadline {
662 return Err(DdsError::Timeout);
663 }
664 std::thread::sleep(poll);
665 }
666 }
667
668 #[cfg(feature = "std")]
672 fn find_topic_in_sedp(&self, name: &str) -> Option<TopicDescriptionHandle> {
673 let rt = self.inner.runtime.as_ref()?;
674 let sedp = rt.sedp.lock().ok()?;
675 for p in sedp.cache().publications() {
677 if p.data.topic_name == name {
678 return Some(TopicDescriptionHandle::new(
679 p.data.topic_name.clone(),
680 p.data.type_name.clone(),
681 self.clone(),
682 ));
683 }
684 }
685 for s in sedp.cache().subscriptions() {
686 if s.data.topic_name == name {
687 return Some(TopicDescriptionHandle::new(
688 s.data.topic_name.clone(),
689 s.data.type_name.clone(),
690 self.clone(),
691 ));
692 }
693 }
694 None
695 }
696
697 pub fn create_contentfilteredtopic<T: DdsType>(
711 &self,
712 name: &str,
713 related_topic: &Topic<T>,
714 filter_expression: &str,
715 filter_parameters: alloc::vec::Vec<String>,
716 ) -> Result<ContentFilteredTopic<T>> {
717 if name.is_empty() {
718 return Err(DdsError::BadParameter {
719 what: "content-filtered-topic name",
720 });
721 }
722 if filter_expression.is_empty() {
723 return Err(DdsError::BadParameter {
724 what: "filter expression",
725 });
726 }
727 ContentFilteredTopic::<T>::new(
728 name.into(),
729 related_topic.clone(),
730 filter_expression.into(),
731 filter_parameters,
732 self.clone(),
733 )
734 }
735
736 pub fn create_multitopic<T: DdsType>(
748 &self,
749 name: &str,
750 type_name: &str,
751 related_topic_names: alloc::vec::Vec<String>,
752 subscription_expression: &str,
753 expression_parameters: alloc::vec::Vec<String>,
754 ) -> Result<crate::topic::MultiTopic<T>> {
755 if name.is_empty() {
756 return Err(DdsError::BadParameter {
757 what: "multitopic name",
758 });
759 }
760 if type_name.is_empty() {
761 return Err(DdsError::BadParameter {
762 what: "multitopic type_name",
763 });
764 }
765 if subscription_expression.is_empty() {
766 return Err(DdsError::BadParameter {
767 what: "multitopic subscription expression",
768 });
769 }
770 crate::topic::MultiTopic::<T>::new(
771 name.into(),
772 type_name.into(),
773 related_topic_names,
774 subscription_expression.into(),
775 expression_parameters,
776 self.clone(),
777 )
778 }
779
780 pub fn delete_multitopic<T: DdsType>(&self, mt: &crate::topic::MultiTopic<T>) -> Result<()> {
788 if mt.get_participant().inner_ptr() != self.inner_ptr() {
789 return Err(DdsError::BadParameter {
790 what: "multitopic belongs to different participant",
791 });
792 }
793 Ok(())
794 }
795
796 pub fn delete_contentfilteredtopic<T: DdsType>(
810 &self,
811 cft: &ContentFilteredTopic<T>,
812 ) -> Result<()> {
813 if cft.get_participant().inner_ptr() != self.inner_ptr() {
814 return Err(DdsError::BadParameter {
815 what: "cft belongs to different participant",
816 });
817 }
818 Ok(())
819 }
820
821 pub(crate) fn inner_ptr(&self) -> *const ParticipantInner {
824 Arc::as_ptr(&self.inner)
825 }
826
827 pub fn create_publisher(&self, qos: PublisherQos) -> Publisher {
830 #[cfg(feature = "std")]
831 let p = {
832 let p = Publisher::new(qos, self.inner.runtime.clone());
833 p.attach_participant(Arc::downgrade(&self.inner));
836 p
837 };
838 #[cfg(not(feature = "std"))]
839 let p = Publisher::new(qos);
840 if let Ok(mut list) = self.inner.publishers.lock() {
842 list.push(p.inner.entity_state.instance_handle());
843 }
844 p
845 }
846
847 pub fn create_subscriber(&self, qos: SubscriberQos) -> Subscriber {
849 #[cfg(feature = "std")]
850 let s = {
851 let s = Subscriber::new(qos, self.inner.runtime.clone());
852 s.attach_participant(Arc::downgrade(&self.inner));
854 s
855 };
856 #[cfg(not(feature = "std"))]
857 let s = Subscriber::new(qos);
858 if let Ok(mut list) = self.inner.subscribers.lock() {
859 list.push(s.inner.entity_state.instance_handle());
860 }
861 s
862 }
863
864 #[must_use]
866 pub fn topics_len(&self) -> usize {
867 self.inner.topics.lock().map(|t| t.len()).unwrap_or(0)
868 }
869
870 #[must_use]
874 pub fn discovered_participants_count(&self) -> usize {
875 #[cfg(feature = "std")]
876 if let Some(rt) = self.inner.runtime.as_ref() {
877 return rt.discovered_participants().len();
878 }
879 0
880 }
881
882 #[must_use]
885 pub fn discovered_publications_count(&self) -> usize {
886 #[cfg(feature = "std")]
887 if let Some(rt) = self.inner.runtime.as_ref() {
888 return rt.discovered_publications_count();
889 }
890 0
891 }
892
893 #[must_use]
895 pub fn discovered_subscriptions_count(&self) -> usize {
896 #[cfg(feature = "std")]
897 if let Some(rt) = self.inner.runtime.as_ref() {
898 return rt.discovered_subscriptions_count();
899 }
900 0
901 }
902
903 pub fn ignore_participant(&self, handle: InstanceHandle) -> Result<()> {
921 #[cfg(feature = "std")]
922 if let Ok(mut s) = self.inner.ignore_filter.inner.participants.lock() {
923 s.insert(handle);
924 }
925 Ok(())
926 }
927
928 pub fn ignore_topic(&self, handle: InstanceHandle) -> Result<()> {
934 #[cfg(feature = "std")]
935 if let Ok(mut s) = self.inner.ignore_filter.inner.topics.lock() {
936 s.insert(handle);
937 }
938 Ok(())
939 }
940
941 pub fn ignore_publication(&self, handle: InstanceHandle) -> Result<()> {
947 #[cfg(feature = "std")]
948 if let Ok(mut s) = self.inner.ignore_filter.inner.publications.lock() {
949 s.insert(handle);
950 }
951 Ok(())
952 }
953
954 pub fn ignore_subscription(&self, handle: InstanceHandle) -> Result<()> {
960 #[cfg(feature = "std")]
961 if let Ok(mut s) = self.inner.ignore_filter.inner.subscriptions.lock() {
962 s.insert(handle);
963 }
964 Ok(())
965 }
966
967 #[must_use]
969 pub fn is_participant_ignored(&self, handle: InstanceHandle) -> bool {
970 #[cfg(feature = "std")]
971 return self.inner.ignore_filter.is_participant_ignored(handle);
972 #[cfg(not(feature = "std"))]
973 {
974 let _ = handle;
975 false
976 }
977 }
978
979 #[must_use]
981 pub fn is_topic_ignored(&self, handle: InstanceHandle) -> bool {
982 #[cfg(feature = "std")]
983 return self.inner.ignore_filter.is_topic_ignored(handle);
984 #[cfg(not(feature = "std"))]
985 {
986 let _ = handle;
987 false
988 }
989 }
990
991 #[must_use]
993 pub fn is_publication_ignored(&self, handle: InstanceHandle) -> bool {
994 #[cfg(feature = "std")]
995 return self.inner.ignore_filter.is_publication_ignored(handle);
996 #[cfg(not(feature = "std"))]
997 {
998 let _ = handle;
999 false
1000 }
1001 }
1002
1003 #[must_use]
1005 pub fn is_subscription_ignored(&self, handle: InstanceHandle) -> bool {
1006 #[cfg(feature = "std")]
1007 return self.inner.ignore_filter.is_subscription_ignored(handle);
1008 #[cfg(not(feature = "std"))]
1009 {
1010 let _ = handle;
1011 false
1012 }
1013 }
1014
1015 #[cfg(feature = "std")]
1018 #[must_use]
1019 #[allow(dead_code)]
1020 pub(crate) fn ignore_filter(&self) -> IgnoreFilter {
1021 self.inner.ignore_filter.clone()
1022 }
1023
1024 pub fn delete_contained_entities(&self) -> Result<()> {
1048 {
1050 let mut topics =
1051 self.inner
1052 .topics
1053 .lock()
1054 .map_err(|_| DdsError::PreconditionNotMet {
1055 reason: "topic registry poisoned",
1056 })?;
1057 topics.clear();
1058 }
1059 if let Ok(mut p) = self.inner.publishers.lock() {
1061 p.clear();
1062 }
1063 if let Ok(mut s) = self.inner.subscribers.lock() {
1064 s.clear();
1065 }
1066 let sinks = self.inner.builtin_subscriber.sinks();
1071 if let Ok(mut g) = sinks.participant.lock() {
1072 g.clear();
1073 }
1074 if let Ok(mut g) = sinks.topic.lock() {
1075 g.clear();
1076 }
1077 if let Ok(mut g) = sinks.publication.lock() {
1078 g.clear();
1079 }
1080 if let Ok(mut g) = sinks.subscription.lock() {
1081 g.clear();
1082 }
1083 Ok(())
1084 }
1085
1086 #[must_use]
1089 pub fn publishers_len(&self) -> usize {
1090 self.inner.publishers.lock().map(|p| p.len()).unwrap_or(0)
1091 }
1092
1093 #[must_use]
1095 pub fn subscribers_len(&self) -> usize {
1096 self.inner.subscribers.lock().map(|s| s.len()).unwrap_or(0)
1097 }
1098
1099 #[must_use]
1103 pub fn instance_handle(&self) -> InstanceHandle {
1104 self.inner.entity_state.instance_handle()
1105 }
1106
1107 #[must_use]
1119 pub fn contains_entity(&self, handle: InstanceHandle) -> bool {
1120 if self.instance_handle() == handle {
1121 return true;
1122 }
1123 if let Ok(topics) = self.inner.topics.lock() {
1124 for t in topics.values() {
1125 if t.entity_state.instance_handle() == handle {
1126 return true;
1127 }
1128 }
1129 }
1130 if let Ok(pubs) = self.inner.publishers.lock() {
1131 if pubs.contains(&handle) {
1132 return true;
1133 }
1134 }
1135 if let Ok(subs) = self.inner.subscribers.lock() {
1136 if subs.contains(&handle) {
1137 return true;
1138 }
1139 }
1140 if let Ok(dws) = self.inner.datawriters.lock() {
1141 if dws.contains(&handle) {
1142 return true;
1143 }
1144 }
1145 if let Ok(drs) = self.inner.datareaders.lock() {
1146 if drs.contains(&handle) {
1147 return true;
1148 }
1149 }
1150 false
1151 }
1152
1153 #[cfg(feature = "std")]
1161 #[must_use]
1162 pub fn get_discovered_participants(&self) -> Vec<InstanceHandle> {
1163 let Some(rt) = self.inner.runtime.as_ref() else {
1164 return Vec::new();
1165 };
1166 let mut out = Vec::new();
1167 for d in rt.discovered_participants() {
1168 let h = InstanceHandle::from_guid(d.data.guid);
1169 if self.is_participant_ignored(h) {
1170 continue;
1171 }
1172 out.push(h);
1173 }
1174 out
1175 }
1176
1177 #[cfg(not(feature = "std"))]
1179 #[must_use]
1180 pub fn get_discovered_participants(&self) -> Vec<InstanceHandle> {
1181 Vec::new()
1182 }
1183
1184 #[cfg(feature = "std")]
1191 pub fn get_discovered_participant_data(
1192 &self,
1193 handle: InstanceHandle,
1194 ) -> Result<ParticipantBuiltinTopicData> {
1195 if self.is_participant_ignored(handle) {
1196 return Err(DdsError::BadParameter {
1197 what: "participant handle is ignored",
1198 });
1199 }
1200 let Some(rt) = self.inner.runtime.as_ref() else {
1201 return Err(DdsError::BadParameter {
1202 what: "no runtime — offline participant",
1203 });
1204 };
1205 for d in rt.discovered_participants() {
1206 if InstanceHandle::from_guid(d.data.guid) == handle {
1207 return Ok(ParticipantBuiltinTopicData::from_wire(&d.data));
1208 }
1209 }
1210 Err(DdsError::BadParameter {
1211 what: "unknown participant handle",
1212 })
1213 }
1214
1215 #[cfg(not(feature = "std"))]
1217 pub fn get_discovered_participant_data(
1218 &self,
1219 _handle: InstanceHandle,
1220 ) -> Result<ParticipantBuiltinTopicData> {
1221 Err(DdsError::BadParameter {
1222 what: "no runtime — offline participant",
1223 })
1224 }
1225
1226 #[cfg(feature = "std")]
1234 #[must_use]
1235 pub fn get_discovered_topics(&self) -> Vec<InstanceHandle> {
1236 let Some(rt) = self.inner.runtime.as_ref() else {
1237 return Vec::new();
1238 };
1239 let Ok(sedp) = rt.sedp.lock() else {
1240 return Vec::new();
1241 };
1242 let mut seen = BTreeSet::new();
1243 for p in sedp.cache().publications() {
1244 let key = TopicBuiltinTopicData::synthesize_key(&p.data.topic_name, &p.data.type_name);
1245 let h = InstanceHandle::from_guid(key);
1246 if self.is_topic_ignored(h) {
1247 continue;
1248 }
1249 seen.insert(h);
1250 }
1251 for s in sedp.cache().subscriptions() {
1252 let key = TopicBuiltinTopicData::synthesize_key(&s.data.topic_name, &s.data.type_name);
1253 let h = InstanceHandle::from_guid(key);
1254 if self.is_topic_ignored(h) {
1255 continue;
1256 }
1257 seen.insert(h);
1258 }
1259 seen.into_iter().collect()
1260 }
1261
1262 #[cfg(not(feature = "std"))]
1264 #[must_use]
1265 pub fn get_discovered_topics(&self) -> Vec<InstanceHandle> {
1266 Vec::new()
1267 }
1268
1269 #[cfg(feature = "std")]
1276 pub fn get_discovered_topic_data(
1277 &self,
1278 handle: InstanceHandle,
1279 ) -> Result<TopicBuiltinTopicData> {
1280 if self.is_topic_ignored(handle) {
1281 return Err(DdsError::BadParameter {
1282 what: "topic handle is ignored",
1283 });
1284 }
1285 let Some(rt) = self.inner.runtime.as_ref() else {
1286 return Err(DdsError::BadParameter {
1287 what: "no runtime — offline participant",
1288 });
1289 };
1290 let Ok(sedp) = rt.sedp.lock() else {
1291 return Err(DdsError::PreconditionNotMet {
1292 reason: "sedp poisoned",
1293 });
1294 };
1295 for p in sedp.cache().publications() {
1297 let topic = TopicBuiltinTopicData::from_publication(&p.data);
1298 if InstanceHandle::from_guid(topic.key) == handle {
1299 return Ok(topic);
1300 }
1301 }
1302 for s in sedp.cache().subscriptions() {
1303 let topic = TopicBuiltinTopicData::from_subscription(&s.data);
1304 if InstanceHandle::from_guid(topic.key) == handle {
1305 return Ok(topic);
1306 }
1307 }
1308 Err(DdsError::BadParameter {
1309 what: "unknown topic handle",
1310 })
1311 }
1312
1313 #[cfg(not(feature = "std"))]
1315 pub fn get_discovered_topic_data(
1316 &self,
1317 _handle: InstanceHandle,
1318 ) -> Result<TopicBuiltinTopicData> {
1319 Err(DdsError::BadParameter {
1320 what: "no runtime — offline participant",
1321 })
1322 }
1323
1324 #[must_use]
1351 pub fn get_builtin_subscriber(&self) -> Arc<BuiltinSubscriber> {
1352 Arc::clone(&self.inner.builtin_subscriber)
1353 }
1354
1355 pub fn set_listener(&self, listener: Option<ArcDomainParticipantListener>, mask: StatusMask) {
1363 if let Ok(mut slot) = self.inner.listener.lock() {
1364 *slot = listener.map(|l| (l, mask));
1365 }
1366 self.inner.entity_state.set_listener_mask(mask);
1368 }
1369
1370 #[must_use]
1373 pub fn get_listener(&self) -> Option<ArcDomainParticipantListener> {
1374 self.inner
1375 .listener
1376 .lock()
1377 .ok()
1378 .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
1379 }
1380
1381 #[must_use]
1385 #[allow(dead_code)] pub(crate) fn snapshot_listener(&self) -> Option<(ArcDomainParticipantListener, StatusMask)> {
1387 self.inner
1388 .listener
1389 .lock()
1390 .ok()
1391 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)))
1392 }
1393}
1394
1395impl crate::entity::Entity for DomainParticipant {
1400 type Qos = DomainParticipantQos;
1401
1402 fn get_qos(&self) -> Self::Qos {
1403 self.inner.qos.lock().map(|q| q.clone()).unwrap_or_default()
1404 }
1405
1406 fn set_qos(&self, qos: Self::Qos) -> Result<()> {
1407 if let Ok(mut current) = self.inner.qos.lock() {
1410 *current = qos;
1411 }
1412 Ok(())
1413 }
1414
1415 fn enable(&self) -> Result<()> {
1416 self.inner.entity_state.enable();
1417 Ok(())
1418 }
1419
1420 fn entity_state(&self) -> Arc<crate::entity::EntityState> {
1421 Arc::clone(&self.inner.entity_state)
1422 }
1423}
1424
1425fn topic_inner<T: DdsType>(t: &Topic<T>) -> Arc<TopicInner> {
1428 t.inner()
1429}
1430
1431#[cfg(feature = "std")]
1434fn extract_equivalence_hash(
1435 ti: &zerodds_types::TypeIdentifier,
1436) -> Option<zerodds_types::EquivalenceHash> {
1437 use zerodds_types::TypeIdentifier;
1438 match ti {
1439 TypeIdentifier::EquivalenceHashMinimal(h) | TypeIdentifier::EquivalenceHashComplete(h) => {
1440 Some(*h)
1441 }
1442 _ => None,
1443 }
1444}
1445
1446fn reconstruct_topic<T: DdsType>(
1447 inner: Arc<TopicInner>,
1448 participant: DomainParticipant,
1449) -> Topic<T> {
1450 Topic::<T>::from_inner(inner, participant)
1455}
1456
1457impl<T: DdsType> Topic<T> {
1459 pub(crate) fn from_inner(inner: Arc<TopicInner>, participant: DomainParticipant) -> Self {
1460 Self::_from_inner_impl(inner, participant)
1461 }
1462}
1463
1464#[cfg(test)]
1469#[allow(clippy::expect_used, clippy::unwrap_used)]
1470mod tests {
1471 use super::*;
1472 use crate::dds_type::RawBytes;
1473
1474 #[test]
1475 fn participant_created_with_domain_id() {
1476 let p = DomainParticipant::new(42, DomainParticipantQos::default());
1477 assert_eq!(p.domain_id(), 42);
1478 assert_eq!(p.topics_len(), 0);
1479 }
1480
1481 #[test]
1482 fn create_topic_stores_in_registry() {
1483 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1484 let t1 = p
1485 .create_topic::<RawBytes>("Chatter", TopicQos::default())
1486 .unwrap();
1487 let t2 = p
1488 .create_topic::<RawBytes>("Chatter", TopicQos::default())
1489 .unwrap();
1490 assert_eq!(t1.name(), t2.name());
1491 assert_eq!(p.topics_len(), 1);
1492 }
1493
1494 #[test]
1495 fn create_topic_rejects_type_conflict() {
1496 #[derive(Debug)]
1498 struct DummyU32(u32);
1499 impl DdsType for DummyU32 {
1500 const TYPE_NAME: &'static str = "test::DummyU32";
1501 fn encode(
1502 &self,
1503 out: &mut alloc::vec::Vec<u8>,
1504 ) -> core::result::Result<(), crate::dds_type::EncodeError> {
1505 out.extend_from_slice(&self.0.to_le_bytes());
1506 Ok(())
1507 }
1508 fn decode(bytes: &[u8]) -> core::result::Result<Self, crate::dds_type::DecodeError> {
1509 if bytes.len() != 4 {
1510 return Err(crate::dds_type::DecodeError::Invalid { what: "u32 len" });
1511 }
1512 let mut a = [0u8; 4];
1513 a.copy_from_slice(bytes);
1514 Ok(Self(u32::from_le_bytes(a)))
1515 }
1516 }
1517
1518 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1519 let _ = p
1520 .create_topic::<RawBytes>("X", TopicQos::default())
1521 .unwrap();
1522 let err = p
1523 .create_topic::<DummyU32>("X", TopicQos::default())
1524 .unwrap_err();
1525 assert!(matches!(err, DdsError::InconsistentPolicy { .. }));
1526 }
1527
1528 #[test]
1529 fn create_topic_rejects_empty_name() {
1530 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1531 let err = p
1532 .create_topic::<RawBytes>("", TopicQos::default())
1533 .unwrap_err();
1534 assert!(matches!(err, DdsError::BadParameter { .. }));
1535 }
1536
1537 #[test]
1538 fn lookup_topicdescription_returns_local_topics() {
1539 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1540 let _t = p
1541 .create_topic::<RawBytes>("Hello", TopicQos::default())
1542 .unwrap();
1543 let h = p.lookup_topicdescription("Hello").expect("local lookup");
1544 use crate::topic::TopicDescription as _;
1545 assert_eq!(h.get_name(), "Hello");
1546 assert_eq!(h.get_type_name(), RawBytes::TYPE_NAME);
1547 assert_eq!(h.get_participant().domain_id(), 0);
1548 }
1549
1550 #[test]
1551 fn lookup_topicdescription_none_for_unknown() {
1552 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1553 assert!(p.lookup_topicdescription("Unknown").is_none());
1554 }
1555
1556 #[test]
1559 fn contains_entity_returns_true_for_self_handle() {
1560 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1561 let h = p.instance_handle();
1562 assert!(p.contains_entity(h));
1563 }
1564
1565 #[test]
1566 fn contains_entity_returns_true_for_local_topic() {
1567 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1568 let t = p
1569 .create_topic::<RawBytes>("Hi", TopicQos::default())
1570 .unwrap();
1571 let topic_handle = t.inner().entity_state.instance_handle();
1572 assert!(p.contains_entity(topic_handle));
1573 }
1574
1575 #[test]
1576 fn contains_entity_returns_true_for_local_publisher() {
1577 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1578 let pub_ = p.create_publisher(PublisherQos::default());
1579 let h = pub_.inner.entity_state.instance_handle();
1580 assert!(p.contains_entity(h));
1581 }
1582
1583 #[test]
1584 fn contains_entity_returns_true_for_local_subscriber() {
1585 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1586 let s = p.create_subscriber(SubscriberQos::default());
1587 let h = s.inner.entity_state.instance_handle();
1588 assert!(p.contains_entity(h));
1589 }
1590
1591 #[test]
1592 fn contains_entity_returns_false_for_unknown_handle() {
1593 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1594 let other = DomainParticipant::new(0, DomainParticipantQos::default());
1596 let other_h = other.instance_handle();
1597 assert!(!p.contains_entity(other_h));
1598 }
1599
1600 #[test]
1601 fn contains_entity_returns_false_for_topic_after_delete() {
1602 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1603 let t = p
1604 .create_topic::<RawBytes>("Tmp", TopicQos::default())
1605 .unwrap();
1606 let topic_handle = t.inner().entity_state.instance_handle();
1607 assert!(p.contains_entity(topic_handle));
1608 p.delete_contained_entities().unwrap();
1609 assert!(!p.contains_entity(topic_handle));
1610 }
1611
1612 #[test]
1613 fn contains_entity_recursive_finds_local_datawriter() {
1614 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1617 let topic = p
1618 .create_topic::<RawBytes>("Hello", TopicQos::default())
1619 .unwrap();
1620 let pub_ = p.create_publisher(PublisherQos::default());
1621 let dw = pub_
1622 .create_datawriter(&topic, crate::qos::DataWriterQos::default())
1623 .unwrap();
1624 let dw_handle = dw.instance_handle();
1625 assert!(p.contains_entity(dw_handle));
1626 assert!(pub_.contains_writer(dw_handle));
1628 }
1629
1630 #[test]
1631 fn contains_entity_recursive_finds_local_datareader() {
1632 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1633 let topic = p
1634 .create_topic::<RawBytes>("Hello2", TopicQos::default())
1635 .unwrap();
1636 let sub = p.create_subscriber(SubscriberQos::default());
1637 let dr = sub
1638 .create_datareader(&topic, crate::qos::DataReaderQos::default())
1639 .unwrap();
1640 let dr_handle = dr.subscription_handle();
1641 assert!(p.contains_entity(dr_handle));
1642 assert!(sub.contains_reader(dr_handle));
1643 }
1644
1645 #[test]
1646 fn contains_entity_recursive_does_not_find_foreign_datawriter() {
1647 let p1 = DomainParticipant::new(0, DomainParticipantQos::default());
1650 let p2 = DomainParticipant::new(1, DomainParticipantQos::default());
1651 let topic = p2
1652 .create_topic::<RawBytes>("Foreign", TopicQos::default())
1653 .unwrap();
1654 let pub2 = p2.create_publisher(PublisherQos::default());
1655 let dw2 = pub2
1656 .create_datawriter(&topic, crate::qos::DataWriterQos::default())
1657 .unwrap();
1658 assert!(!p1.contains_entity(dw2.instance_handle()));
1659 assert!(p2.contains_entity(dw2.instance_handle()));
1660 }
1661
1662 #[cfg(feature = "std")]
1663 #[test]
1664 fn find_topic_returns_immediately_for_local() {
1665 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1666 let _t = p
1667 .create_topic::<RawBytes>("Local", TopicQos::default())
1668 .unwrap();
1669 let started = std::time::Instant::now();
1670 let h = p
1671 .find_topic("Local", core::time::Duration::from_secs(5))
1672 .expect("local find");
1673 assert!(started.elapsed() < core::time::Duration::from_millis(50));
1676 use crate::topic::TopicDescription as _;
1677 assert_eq!(h.get_name(), "Local");
1678 }
1679
1680 #[cfg(feature = "std")]
1681 #[test]
1682 fn find_topic_times_out_when_unknown() {
1683 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1684 let err = p
1685 .find_topic("NotExists", core::time::Duration::from_millis(80))
1686 .unwrap_err();
1687 assert!(matches!(err, DdsError::Timeout));
1688 }
1689
1690 #[cfg(feature = "std")]
1691 #[test]
1692 fn find_topic_rejects_empty_name() {
1693 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1694 let err = p
1695 .find_topic("", core::time::Duration::from_millis(10))
1696 .unwrap_err();
1697 assert!(matches!(err, DdsError::BadParameter { .. }));
1698 }
1699
1700 #[test]
1701 fn create_contentfilteredtopic_rejects_empty_name() {
1702 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1703 let topic = p
1704 .create_topic::<RawBytes>("Base", TopicQos::default())
1705 .unwrap();
1706 let err = p
1707 .create_contentfilteredtopic("", &topic, "x > 0", alloc::vec::Vec::new())
1708 .unwrap_err();
1709 assert!(matches!(err, DdsError::BadParameter { .. }));
1710 }
1711
1712 #[test]
1713 fn create_contentfilteredtopic_rejects_empty_expression() {
1714 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1715 let topic = p
1716 .create_topic::<RawBytes>("Base", TopicQos::default())
1717 .unwrap();
1718 let err = p
1719 .create_contentfilteredtopic("CF", &topic, "", alloc::vec::Vec::new())
1720 .unwrap_err();
1721 assert!(matches!(err, DdsError::BadParameter { .. }));
1722 }
1723
1724 #[test]
1725 fn delete_contentfilteredtopic_accepts_own() {
1726 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1727 let topic = p
1728 .create_topic::<RawBytes>("Base", TopicQos::default())
1729 .unwrap();
1730 let cft = p
1731 .create_contentfilteredtopic("CF", &topic, "x > 0", alloc::vec::Vec::new())
1732 .unwrap();
1733 p.delete_contentfilteredtopic(&cft).unwrap();
1734 }
1735
1736 #[cfg(feature = "std")]
1737 #[test]
1738 fn find_topic_resolves_via_sedp_subscription() {
1739 use crate::factory::DomainParticipantFactory;
1743 use core::time::Duration as CoreDur;
1744 use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
1745 use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
1746 use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
1747
1748 let p = DomainParticipantFactory::instance()
1749 .create_participant_with_config(
1750 43,
1751 DomainParticipantQos::default(),
1752 crate::runtime::RuntimeConfig::default(),
1753 )
1754 .expect("runtime start");
1755
1756 let target_topic = "DiscoveredViaSubSedp";
1757 if let Some(rt) = p.runtime() {
1758 if let Ok(mut sedp) = rt.sedp.lock() {
1759 let prefix = GuidPrefix::from_bytes([0xCD; 12]);
1760 let subdata = SubscriptionBuiltinTopicData {
1761 key: Guid::new(prefix, EntityId::user_reader_with_key([4, 5, 6])),
1762 participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
1763 topic_name: target_topic.into(),
1764 type_name: "test::SubT".into(),
1765 durability: DurabilityKind::Volatile,
1766 reliability: ReliabilityQos {
1767 kind: ReliabilityKind::Reliable,
1768 max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
1769 },
1770 ownership: zerodds_qos::OwnershipKind::Shared,
1771 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
1772 deadline: zerodds_qos::DeadlineQosPolicy::default(),
1773 partition: alloc::vec::Vec::new(),
1774 user_data: alloc::vec::Vec::new(),
1775 topic_data: alloc::vec::Vec::new(),
1776 group_data: alloc::vec::Vec::new(),
1777 type_information: None,
1778 data_representation: alloc::vec::Vec::new(),
1779 content_filter: None,
1780 security_info: None,
1781 service_instance_name: None,
1782 related_entity_guid: None,
1783 topic_aliases: None,
1784 type_identifier: zerodds_types::TypeIdentifier::None,
1785 };
1786 sedp.cache_mut().insert_subscription(subdata, CoreDur::ZERO);
1787 }
1788 }
1789
1790 let h = p
1791 .find_topic(target_topic, CoreDur::from_millis(200))
1792 .expect("find via subscription");
1793 use crate::topic::TopicDescription as _;
1794 assert_eq!(h.get_name(), target_topic);
1795 assert_eq!(h.get_type_name(), "test::SubT");
1796 }
1797
1798 #[cfg(feature = "std")]
1799 #[test]
1800 fn find_topic_resolves_after_sedp_publication() {
1801 use crate::factory::DomainParticipantFactory;
1808 use core::time::Duration as CoreDur;
1809 use zerodds_rtps::publication_data::{
1810 DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
1811 };
1812 use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
1813
1814 let p = DomainParticipantFactory::instance()
1815 .create_participant_with_config(
1816 42,
1817 DomainParticipantQos::default(),
1818 crate::runtime::RuntimeConfig::default(),
1819 )
1820 .expect("runtime start");
1821
1822 let target_topic = "DiscoveredViaSedp";
1823 let target_type = "test::Discovered";
1824
1825 let p_inject = p.clone();
1828 let topic_name = String::from(target_topic);
1829 let type_name = String::from(target_type);
1830 let join = std::thread::spawn(move || {
1831 std::thread::sleep(CoreDur::from_millis(50));
1832 if let Some(rt) = p_inject.runtime() {
1833 if let Ok(mut sedp) = rt.sedp.lock() {
1834 let prefix = GuidPrefix::from_bytes([0xAB; 12]);
1835 let pubdata = PublicationBuiltinTopicData {
1836 key: Guid::new(prefix, EntityId::user_writer_with_key([1, 2, 3])),
1837 participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
1838 topic_name,
1839 type_name,
1840 durability: DurabilityKind::Volatile,
1841 reliability: ReliabilityQos {
1842 kind: ReliabilityKind::Reliable,
1843 max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(
1844 1,
1845 ),
1846 },
1847 ownership: zerodds_qos::OwnershipKind::Shared,
1848 ownership_strength: 0,
1849 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
1850 deadline: zerodds_qos::DeadlineQosPolicy::default(),
1851 lifespan: zerodds_qos::LifespanQosPolicy::default(),
1852 partition: alloc::vec::Vec::new(),
1853 user_data: alloc::vec::Vec::new(),
1854 topic_data: alloc::vec::Vec::new(),
1855 group_data: alloc::vec::Vec::new(),
1856 type_information: None,
1857 data_representation: alloc::vec::Vec::new(),
1858 security_info: None,
1859 service_instance_name: None,
1860 related_entity_guid: None,
1861 topic_aliases: None,
1862 type_identifier: zerodds_types::TypeIdentifier::None,
1863 };
1864 sedp.cache_mut().insert_publication(pubdata, CoreDur::ZERO);
1865 }
1866 }
1867 });
1868
1869 let result = p.find_topic(target_topic, CoreDur::from_secs(2));
1870 join.join().expect("inject thread");
1871 let h = result.expect("find_topic should resolve via SEDP");
1872 use crate::topic::TopicDescription as _;
1873 assert_eq!(h.get_name(), target_topic);
1874 assert_eq!(h.get_type_name(), target_type);
1875 }
1876
1877 #[test]
1882 fn ignore_participant_records_handle() {
1883 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1884 let h = InstanceHandle::from_raw(0xAA);
1885 assert!(!p.is_participant_ignored(h));
1886 p.ignore_participant(h).unwrap();
1887 assert!(p.is_participant_ignored(h));
1888 }
1889
1890 #[test]
1891 fn ignore_topic_records_handle() {
1892 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1893 let h = InstanceHandle::from_raw(0xBB);
1894 assert!(!p.is_topic_ignored(h));
1895 p.ignore_topic(h).unwrap();
1896 assert!(p.is_topic_ignored(h));
1897 }
1898
1899 #[test]
1900 fn ignore_publication_records_handle() {
1901 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1902 let h = InstanceHandle::from_raw(0xCC);
1903 assert!(!p.is_publication_ignored(h));
1904 p.ignore_publication(h).unwrap();
1905 assert!(p.is_publication_ignored(h));
1906 }
1907
1908 #[test]
1909 fn ignore_subscription_records_handle() {
1910 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1911 let h = InstanceHandle::from_raw(0xDD);
1912 assert!(!p.is_subscription_ignored(h));
1913 p.ignore_subscription(h).unwrap();
1914 assert!(p.is_subscription_ignored(h));
1915 }
1916
1917 #[test]
1918 fn ignore_lists_are_independent() {
1919 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1923 let h = InstanceHandle::from_raw(0xEE);
1924 p.ignore_topic(h).unwrap();
1925 assert!(p.is_topic_ignored(h));
1926 assert!(!p.is_participant_ignored(h));
1927 assert!(!p.is_publication_ignored(h));
1928 assert!(!p.is_subscription_ignored(h));
1929 }
1930
1931 #[test]
1932 fn ignore_is_monotonic_and_idempotent() {
1933 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1936 let h = InstanceHandle::from_raw(0x42);
1937 p.ignore_participant(h).unwrap();
1938 p.ignore_participant(h).unwrap();
1939 assert!(p.is_participant_ignored(h));
1940 }
1941
1942 #[test]
1943 fn delete_contained_entities_clears_topics_and_groups() {
1944 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1945 let _t = p
1946 .create_topic::<RawBytes>("ToBeRemoved", TopicQos::default())
1947 .unwrap();
1948 let _pub_ = p.create_publisher(PublisherQos::default());
1949 let _sub_ = p.create_subscriber(SubscriberQos::default());
1950 assert_eq!(p.topics_len(), 1);
1951 assert_eq!(p.publishers_len(), 1);
1952 assert_eq!(p.subscribers_len(), 1);
1953 p.delete_contained_entities().unwrap();
1954 assert_eq!(p.topics_len(), 0);
1955 assert_eq!(p.publishers_len(), 0);
1956 assert_eq!(p.subscribers_len(), 0);
1957 }
1958
1959 #[test]
1960 fn delete_contained_entities_clears_builtin_reader_inboxes() {
1961 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1962 use crate::builtin_topics::ParticipantBuiltinTopicData as DcpsP;
1965 use zerodds_rtps::wire_types::Guid;
1966 let bs = p.get_builtin_subscriber();
1967 bs.sinks()
1968 .push_participant(&DcpsP {
1969 key: Guid::from_bytes([7u8; 16]),
1970 user_data: alloc::vec::Vec::new(),
1971 })
1972 .unwrap();
1973 let r = bs.participant_reader();
1974 assert_eq!(r.read().unwrap().len(), 1);
1975 p.delete_contained_entities().unwrap();
1976 assert_eq!(r.read().unwrap().len(), 0);
1977 }
1978
1979 #[cfg(feature = "std")]
1980 #[test]
1981 fn get_discovered_participants_offline_is_empty() {
1982 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1985 assert!(p.get_discovered_participants().is_empty());
1986 }
1987
1988 #[cfg(feature = "std")]
1989 #[test]
1990 fn get_discovered_participant_data_offline_errors() {
1991 let p = DomainParticipant::new(0, DomainParticipantQos::default());
1992 let err = p
1993 .get_discovered_participant_data(InstanceHandle::from_raw(1))
1994 .unwrap_err();
1995 assert!(matches!(err, DdsError::BadParameter { .. }));
1996 }
1997
1998 #[cfg(feature = "std")]
1999 #[test]
2000 fn get_discovered_topics_offline_is_empty() {
2001 let p = DomainParticipant::new(0, DomainParticipantQos::default());
2002 assert!(p.get_discovered_topics().is_empty());
2003 }
2004
2005 #[cfg(feature = "std")]
2006 #[test]
2007 fn get_discovered_topic_data_offline_errors() {
2008 let p = DomainParticipant::new(0, DomainParticipantQos::default());
2009 let err = p
2010 .get_discovered_topic_data(InstanceHandle::from_raw(1))
2011 .unwrap_err();
2012 assert!(matches!(err, DdsError::BadParameter { .. }));
2013 }
2014
2015 #[cfg(feature = "std")]
2016 #[test]
2017 fn get_discovered_participants_lists_after_spdp_inject() {
2018 use crate::factory::DomainParticipantFactory;
2023 let p = DomainParticipantFactory::instance()
2024 .create_participant_with_config(
2025 30,
2026 DomainParticipantQos::default(),
2027 crate::runtime::RuntimeConfig::default(),
2028 )
2029 .expect("rt start");
2030
2031 use zerodds_rtps::participant_data::ParticipantBuiltinTopicData as WirePart;
2035 use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, ProtocolVersion, VendorId};
2036 let remote = GuidPrefix::from_bytes([0xCA; 12]);
2037 let wire = WirePart {
2038 guid: Guid::new(remote, EntityId::PARTICIPANT),
2039 protocol_version: ProtocolVersion::V2_5,
2040 vendor_id: VendorId::ZERODDS,
2041 default_unicast_locator: None,
2042 default_multicast_locator: None,
2043 metatraffic_unicast_locator: None,
2044 metatraffic_multicast_locator: None,
2045 domain_id: Some(30),
2046 builtin_endpoint_set: 0,
2047 lease_duration: zerodds_rtps::participant_data::Duration::from_secs(100),
2048 user_data: alloc::vec::Vec::new(),
2049 properties: Default::default(),
2050 identity_token: None,
2051 permissions_token: None,
2052 identity_status_token: None,
2053 sig_algo_info: None,
2054 kx_algo_info: None,
2055 sym_cipher_algo_info: None,
2056 };
2057 let beacon = zerodds_discovery::spdp::SpdpBeacon::new(wire.clone())
2058 .serialize()
2059 .expect("serialize");
2060 if let Some(rt) = p.runtime() {
2061 crate::runtime::handle_spdp_datagram_for_test(rt, &beacon);
2062 }
2063
2064 let handles = p.get_discovered_participants();
2065 assert_eq!(handles.len(), 1);
2066 let data = p
2067 .get_discovered_participant_data(handles[0])
2068 .expect("data lookup");
2069 assert_eq!(data.key, wire.guid);
2070 p.ignore_participant(handles[0]).unwrap();
2072 assert!(p.get_discovered_participants().is_empty());
2073 let err = p.get_discovered_participant_data(handles[0]).unwrap_err();
2074 assert!(matches!(err, DdsError::BadParameter { .. }));
2075 }
2076
2077 #[cfg(feature = "std")]
2078 #[test]
2079 fn get_discovered_topics_lists_unique_handles_for_pub_and_sub() {
2080 use crate::factory::DomainParticipantFactory;
2082 use core::time::Duration as CoreDur;
2083 use zerodds_rtps::publication_data::{
2084 DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
2085 };
2086 use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
2087 use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
2088
2089 let p = DomainParticipantFactory::instance()
2090 .create_participant_with_config(
2091 21,
2092 DomainParticipantQos::default(),
2093 crate::runtime::RuntimeConfig::default(),
2094 )
2095 .expect("rt start");
2096 if let Some(rt) = p.runtime() {
2097 if let Ok(mut sedp) = rt.sedp.lock() {
2098 let prefix = GuidPrefix::from_bytes([0x77; 12]);
2099 let pubdata = PublicationBuiltinTopicData {
2100 key: Guid::new(prefix, EntityId::user_writer_with_key([1, 2, 3])),
2101 participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
2102 topic_name: "SharedTopic".into(),
2103 type_name: "SharedType".into(),
2104 durability: DurabilityKind::Volatile,
2105 reliability: ReliabilityQos {
2106 kind: ReliabilityKind::Reliable,
2107 max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
2108 },
2109 ownership: zerodds_qos::OwnershipKind::Shared,
2110 ownership_strength: 0,
2111 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
2112 deadline: zerodds_qos::DeadlineQosPolicy::default(),
2113 lifespan: zerodds_qos::LifespanQosPolicy::default(),
2114 partition: alloc::vec::Vec::new(),
2115 user_data: alloc::vec::Vec::new(),
2116 topic_data: alloc::vec::Vec::new(),
2117 group_data: alloc::vec::Vec::new(),
2118 type_information: None,
2119 data_representation: alloc::vec::Vec::new(),
2120 security_info: None,
2121 service_instance_name: None,
2122 related_entity_guid: None,
2123 topic_aliases: None,
2124 type_identifier: zerodds_types::TypeIdentifier::None,
2125 };
2126 let subdata = SubscriptionBuiltinTopicData {
2127 key: Guid::new(prefix, EntityId::user_reader_with_key([4, 5, 6])),
2128 participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
2129 topic_name: "SharedTopic".into(),
2130 type_name: "SharedType".into(),
2131 durability: DurabilityKind::Volatile,
2132 reliability: ReliabilityQos {
2133 kind: ReliabilityKind::Reliable,
2134 max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
2135 },
2136 ownership: zerodds_qos::OwnershipKind::Shared,
2137 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
2138 deadline: zerodds_qos::DeadlineQosPolicy::default(),
2139 partition: alloc::vec::Vec::new(),
2140 user_data: alloc::vec::Vec::new(),
2141 topic_data: alloc::vec::Vec::new(),
2142 group_data: alloc::vec::Vec::new(),
2143 type_information: None,
2144 data_representation: alloc::vec::Vec::new(),
2145 content_filter: None,
2146 security_info: None,
2147 service_instance_name: None,
2148 related_entity_guid: None,
2149 topic_aliases: None,
2150 type_identifier: zerodds_types::TypeIdentifier::None,
2151 };
2152 sedp.cache_mut().insert_publication(pubdata, CoreDur::ZERO);
2153 sedp.cache_mut().insert_subscription(subdata, CoreDur::ZERO);
2154 }
2155 }
2156 let topics = p.get_discovered_topics();
2157 assert_eq!(topics.len(), 1, "Pub+Sub auf gleichem Topic → 1 Handle");
2158 let data = p.get_discovered_topic_data(topics[0]).expect("topic data");
2159 assert_eq!(data.name, "SharedTopic");
2160 assert_eq!(data.type_name, "SharedType");
2161 }
2162
2163 #[cfg(feature = "std")]
2164 #[test]
2165 fn get_discovered_topic_data_filters_ignored() {
2166 use crate::factory::DomainParticipantFactory;
2167 use core::time::Duration as CoreDur;
2168 use zerodds_rtps::publication_data::{
2169 DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
2170 };
2171 use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
2172
2173 let p = DomainParticipantFactory::instance()
2174 .create_participant_with_config(
2175 22,
2176 DomainParticipantQos::default(),
2177 crate::runtime::RuntimeConfig::default(),
2178 )
2179 .expect("rt start");
2180 if let Some(rt) = p.runtime() {
2181 if let Ok(mut sedp) = rt.sedp.lock() {
2182 let prefix = GuidPrefix::from_bytes([0x55; 12]);
2183 let pubdata = PublicationBuiltinTopicData {
2184 key: Guid::new(prefix, EntityId::user_writer_with_key([1, 2, 3])),
2185 participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
2186 topic_name: "ToIgnore".into(),
2187 type_name: "T".into(),
2188 durability: DurabilityKind::Volatile,
2189 reliability: ReliabilityQos {
2190 kind: ReliabilityKind::Reliable,
2191 max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
2192 },
2193 ownership: zerodds_qos::OwnershipKind::Shared,
2194 ownership_strength: 0,
2195 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
2196 deadline: zerodds_qos::DeadlineQosPolicy::default(),
2197 lifespan: zerodds_qos::LifespanQosPolicy::default(),
2198 partition: alloc::vec::Vec::new(),
2199 user_data: alloc::vec::Vec::new(),
2200 topic_data: alloc::vec::Vec::new(),
2201 group_data: alloc::vec::Vec::new(),
2202 type_information: None,
2203 data_representation: alloc::vec::Vec::new(),
2204 security_info: None,
2205 service_instance_name: None,
2206 related_entity_guid: None,
2207 topic_aliases: None,
2208 type_identifier: zerodds_types::TypeIdentifier::None,
2209 };
2210 sedp.cache_mut().insert_publication(pubdata, CoreDur::ZERO);
2211 }
2212 }
2213 let topics_before = p.get_discovered_topics();
2214 assert_eq!(topics_before.len(), 1);
2215 p.ignore_topic(topics_before[0]).unwrap();
2219 assert!(p.get_discovered_topics().is_empty());
2220 let err = p.get_discovered_topic_data(topics_before[0]).unwrap_err();
2221 assert!(matches!(err, DdsError::BadParameter { .. }));
2222 }
2223
2224 #[test]
2225 fn delete_contentfilteredtopic_rejects_foreign() {
2226 let p1 = DomainParticipant::new(0, DomainParticipantQos::default());
2227 let p2 = DomainParticipant::new(1, DomainParticipantQos::default());
2228 let topic = p1
2229 .create_topic::<RawBytes>("Base", TopicQos::default())
2230 .unwrap();
2231 let cft = p1
2232 .create_contentfilteredtopic("CF", &topic, "x > 0", alloc::vec::Vec::new())
2233 .unwrap();
2234 let err = p2.delete_contentfilteredtopic(&cft).unwrap_err();
2235 assert!(matches!(err, DdsError::BadParameter { .. }));
2236 }
2237}