Skip to main content

zerodds_dcps/
participant.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! DomainParticipant — die "Wurzel"-Entity eines DDS-Programms.
4//!
5//! Spec-Referenz: OMG DDS 1.4 §2.2.2.2 `DomainParticipant`.
6//!
7//! Jedes DDS-Programm oeffnet typischerweise genau einen
8//! `DomainParticipant` pro Domain-Id. Der Participant:
9//!
10//! - haelt die GUID-Prefix (12-Byte, leite ID fuer alle internen Endpoints),
11//! - registriert sich via SPDP (Simple Participant Discovery Protocol),
12//! - betreibt SEDP (Simple Endpoint Discovery Protocol) fuer
13//!   Topic-/Writer-/Reader-Matching,
14//! - ist Factory fuer Publisher, Subscriber, Topic.
15//!
16//! # Modi
17//!
18//! - **Live-Mode** (`new_with_runtime`, gerufen aus
19//!   `DomainParticipantFactory::create_participant`): bindet UDP-
20//!   Sockets, spawnt SPDP-/SEDP-/WLP-Threads, fuehrt das volle
21//!   Discovery-Protokoll und die TypeLookup-Service-Endpoints
22//!   (XTypes 1.3 §7.6.3.3.4).
23//! - **Offline-Mode** (`new`, gerufen aus
24//!   `DomainParticipantFactory::create_participant_offline`): keine
25//!   Sockets, keine Threads. Topic-Registry, QoS-Negotiation und
26//!   Loopback-Pfad fuer Unit-Tests sind verfuegbar.
27//!
28//! Topic-Registry: gleicher Name + gleicher Typ ergibt denselben
29//! Topic-Handle (DDS 1.4 §2.2.2.2.1.10 `find_topic`).
30
31extern crate alloc;
32use alloc::collections::{BTreeMap, BTreeSet};
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36
37#[cfg(feature = "std")]
38use std::sync::Mutex;
39
40use crate::builtin_subscriber::BuiltinSubscriber;
41use crate::builtin_topics::{ParticipantBuiltinTopicData, TopicBuiltinTopicData};
42use crate::dds_type::DdsType;
43use crate::entity::StatusMask;
44use crate::error::{DdsError, Result};
45use crate::instance_handle::InstanceHandle;
46use crate::listener::ArcDomainParticipantListener;
47use crate::publisher::Publisher;
48use crate::qos::{DomainParticipantQos, PublisherQos, SubscriberQos, TopicQos};
49use crate::subscriber::Subscriber;
50use crate::topic::{
51    ContentFilteredTopic, Topic, TopicDescription, TopicDescriptionHandle, TopicInner,
52};
53
54#[cfg(feature = "std")]
55use crate::runtime::{DcpsRuntime, RuntimeConfig};
56
57/// Domain-Id-Typ (Spec: `DomainId_t` = long, also i32).
58pub type DomainId = i32;
59
60/// Shared Ignore-List-Filter eines `DomainParticipant`s. Wird vom
61/// Participant gehalten **und** vom `DcpsRuntime`-Discovery-Hook
62/// konsultiert (Klon des `Arc`). Spec-Referenz: DDS DCPS 1.4
63/// §2.2.2.2.1.14-17 `ignore_participant/topic/publication/subscription`.
64///
65/// Per spec sind die Listen **monoton wachsend**: ein Handle kann
66/// dazukommen, aber nie wieder entfernt werden. Daher reicht
67/// `BTreeSet<InstanceHandle>` und keine Generation-Counter.
68#[derive(Debug, Default)]
69#[cfg(feature = "std")]
70pub(crate) struct IgnoreFilterInner {
71    pub(crate) participants: Mutex<BTreeSet<InstanceHandle>>,
72    pub(crate) topics: Mutex<BTreeSet<InstanceHandle>>,
73    pub(crate) publications: Mutex<BTreeSet<InstanceHandle>>,
74    pub(crate) subscriptions: Mutex<BTreeSet<InstanceHandle>>,
75}
76
77/// Klonbarer Filter-Handle (Arc-bumps sind billig). Discovery-Hook
78/// darf hier zwischendurch reinpoken, ohne lock-cycles auf den
79/// gesamten ParticipantInner zu erzwingen.
80#[derive(Clone, Debug, Default)]
81#[cfg(feature = "std")]
82pub struct IgnoreFilter {
83    pub(crate) inner: Arc<IgnoreFilterInner>,
84}
85
86#[cfg(feature = "std")]
87impl IgnoreFilter {
88    /// Pruefe, ob ein Participant-Handle ignoriert ist.
89    #[must_use]
90    pub fn is_participant_ignored(&self, h: InstanceHandle) -> bool {
91        self.inner
92            .participants
93            .lock()
94            .map(|s| s.contains(&h))
95            .unwrap_or(false)
96    }
97
98    /// Pruefe, ob ein Topic-Handle ignoriert ist.
99    #[must_use]
100    pub fn is_topic_ignored(&self, h: InstanceHandle) -> bool {
101        self.inner
102            .topics
103            .lock()
104            .map(|s| s.contains(&h))
105            .unwrap_or(false)
106    }
107
108    /// Pruefe, ob ein Publication-Handle ignoriert ist.
109    #[must_use]
110    pub fn is_publication_ignored(&self, h: InstanceHandle) -> bool {
111        self.inner
112            .publications
113            .lock()
114            .map(|s| s.contains(&h))
115            .unwrap_or(false)
116    }
117
118    /// Pruefe, ob ein Subscription-Handle ignoriert ist.
119    #[must_use]
120    pub fn is_subscription_ignored(&self, h: InstanceHandle) -> bool {
121        self.inner
122            .subscriptions
123            .lock()
124            .map(|s| s.contains(&h))
125            .unwrap_or(false)
126    }
127}
128
129/// Zufaellig erzeugter 12-Byte-Participant-Prefix.
130/// Schema: pid + timestamp + counter. Cross-Host-
131/// Kollision ist theoretisch moeglich, fuer v1.2 akzeptabel.
132#[cfg(feature = "std")]
133fn random_guid_prefix() -> zerodds_rtps::wire_types::GuidPrefix {
134    use std::sync::atomic::{AtomicU32, Ordering};
135    static COUNTER: AtomicU32 = AtomicU32::new(0);
136    let pid = std::process::id();
137    let t = std::time::SystemTime::now()
138        .duration_since(std::time::UNIX_EPOCH)
139        .map(|d| d.as_nanos() as u64)
140        .unwrap_or(0);
141    let c = COUNTER.fetch_add(1, Ordering::Relaxed);
142    let mut bytes = [0u8; 12];
143    bytes[0..4].copy_from_slice(&pid.to_le_bytes());
144    bytes[4..12].copy_from_slice(&t.to_le_bytes());
145    bytes[11] = bytes[11].wrapping_add(c as u8);
146    zerodds_rtps::wire_types::GuidPrefix::from_bytes(bytes)
147}
148
149/// Der Participant.
150#[derive(Clone)]
151pub struct DomainParticipant {
152    inner: Arc<ParticipantInner>,
153}
154
155impl core::fmt::Debug for DomainParticipant {
156    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
157        f.debug_struct("DomainParticipant")
158            .field("domain_id", &self.inner.domain_id)
159            .finish_non_exhaustive()
160    }
161}
162
163pub(crate) struct ParticipantInner {
164    pub(crate) domain_id: DomainId,
165    pub(crate) qos: Mutex<DomainParticipantQos>,
166    /// Entity-Lifecycle (DCPS §2.2.2.1).
167    pub(crate) entity_state: Arc<crate::entity::EntityState>,
168    /// Topic-Registry (Name → TopicInner). Wiederholte
169    /// `create_topic` mit gleichem Namen + Typ liefern denselben
170    /// Handle; mit anderem Typ → `InconsistentPolicy`-Error.
171    topics: Mutex<BTreeMap<String, Arc<TopicInner>>>,
172    /// Runtime-Handle mit UDP-Sockets + Discovery-Threads. `None`
173    /// wenn der Participant im offline-Modus erzeugt wurde (Tests
174    /// die kein Netzwerk wollen).
175    #[cfg(feature = "std")]
176    pub(crate) runtime: Option<Arc<DcpsRuntime>>,
177    /// Vorinstallierter Builtin-Subscriber (DDS 1.4 §2.2.2.2.1.7).
178    /// Genau einer pro Participant. Die Sinks werden bei
179    /// Konstruktion in den Runtime-Discovery-Hook eingehaengt.
180    pub(crate) builtin_subscriber: Arc<BuiltinSubscriber>,
181    /// Ignore-Filter (Spec §2.2.2.2.1.14-17). Klon liegt in der
182    /// Runtime und wird vom Discovery-Hot-Path gegengeprueft, damit
183    /// SPDP/SEDP-Samples nach `ignore_*` nicht mehr in die Builtin-
184    /// Reader fallen.
185    #[cfg(feature = "std")]
186    pub(crate) ignore_filter: IgnoreFilter,
187    /// Lokale Publisher-Registry (fuer `delete_contained_entities` +
188    /// `contains_entity` per Spec §2.2.2.2.1.10). Wir tracken die
189    /// `InstanceHandle` jedes mit `create_publisher` erzeugten
190    /// Publishers; `delete_contained_entities` cleart die Liste.
191    /// Echte Drop-Semantik der einzelnen Publisher passiert per
192    /// `Arc`-Refcount, sobald der User-Handle fallengelassen wird
193    ///.
194    publishers: Mutex<Vec<InstanceHandle>>,
195    /// Analog zu `publishers`.
196    subscribers: Mutex<Vec<InstanceHandle>>,
197    /// Aggregat aller DataWriter-Handles aller Publisher dieses
198    /// Participants (Spec §2.2.2.2.1.10 contains_entity rekursiv).
199    /// Pub/Sub registrieren neue Children via Weak-Backref.
200    pub(crate) datawriters: Mutex<Vec<InstanceHandle>>,
201    /// Aggregat aller DataReader-Handles aller Subscriber dieses
202    /// Participants.
203    pub(crate) datareaders: Mutex<Vec<InstanceHandle>>,
204    /// optionaler [`ArcDomainParticipantListener`] +
205    /// [`StatusMask`]. Bubble-Up-Target fuer alle Children, deren
206    /// engerer Listener das Status-Bit nicht abdeckt.
207    pub(crate) listener: Mutex<Option<(ArcDomainParticipantListener, StatusMask)>>,
208    /// Built-in DynamicType-Registry. Wird in `new()`/
209    /// `new_with_runtime()` automatisch mit den 4 Spec-§7.6.5-Built-in-
210    /// Types befuellt (`DDS::String`, `DDS::KeyedString`, `DDS::Bytes`,
211    /// `DDS::KeyedBytes`). Ueber [`DomainParticipant::find_builtin_type`]
212    /// abrufbar.
213    #[cfg(feature = "std")]
214    pub(crate) type_registry: Mutex<BTreeMap<String, zerodds_types::dynamic::DynamicType>>,
215    /// TypeLookup-Client-State pro Participant. Pending
216    /// Get-Types-Requests werden hier gequeued; Backoff via
217    /// `last_attempt_per_hash` damit unbekannte TypeIDs nicht jeden
218    /// Tick re-queryt werden.
219    #[cfg(feature = "std")]
220    pub(crate) type_lookup: Mutex<TypeLookupState>,
221}
222
223/// TypeLookup-Client-State pro Participant. Tracked Pending-
224/// Requests + Backoff-Timer + Retry-Count pro unbekanntem TypeID-Hash.
225#[cfg(feature = "std")]
226#[derive(Debug, Default)]
227pub(crate) struct TypeLookupState {
228    /// Pro TypeID: (last_attempt_instant, retry_count).
229    pub attempts: BTreeMap<zerodds_types::EquivalenceHash, (std::time::Instant, u32)>,
230    /// Optional Sink fuer outgoing TypeLookup-Requests (Test-Hook).
231    /// Production-Pfad waere ein Reliable-Writer auf dem
232    /// `TL_SVC_REQ_WRITER`-Endpoint; bis dahin queued der Sink
233    /// (Test-Mode) oder bleibt None (Live-Mode = no-op).
234    pub outgoing: Vec<(zerodds_types::EquivalenceHash, u64)>,
235}
236
237#[cfg(feature = "std")]
238impl TypeLookupState {
239    /// Backoff-Periode (5s) zwischen Wiederholungen.
240    pub const BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
241    /// Maximale Versuche pro unbekanntem TypeID.
242    pub const MAX_ATTEMPTS: u32 = 3;
243}
244
245impl DomainParticipant {
246    /// Offline-Konstruktor ohne Runtime — fuer Skeleton-Tests.
247    /// Produktions-Code geht durch `DomainParticipantFactory::
248    /// create_participant` das automatisch eine Runtime startet.
249    pub(crate) fn new(domain_id: DomainId, qos: DomainParticipantQos) -> Self {
250        let builtin = Arc::new(BuiltinSubscriber::new());
251        let participant = Self {
252            inner: Arc::new(ParticipantInner {
253                domain_id,
254                qos: Mutex::new(qos),
255                entity_state: crate::entity::EntityState::new(),
256                topics: Mutex::new(BTreeMap::new()),
257                #[cfg(feature = "std")]
258                runtime: None,
259                builtin_subscriber: builtin,
260                #[cfg(feature = "std")]
261                ignore_filter: IgnoreFilter::default(),
262                publishers: Mutex::new(Vec::new()),
263                subscribers: Mutex::new(Vec::new()),
264                datawriters: Mutex::new(Vec::new()),
265                datareaders: Mutex::new(Vec::new()),
266                listener: Mutex::new(None),
267                #[cfg(feature = "std")]
268                type_registry: Mutex::new(BTreeMap::new()),
269                #[cfg(feature = "std")]
270                type_lookup: Mutex::new(TypeLookupState::default()),
271            }),
272        };
273        // Auto-Register der 4 Spec-§7.6.5-Built-in-Types.
274        #[cfg(feature = "std")]
275        participant.register_builtin_types();
276        participant
277    }
278
279    /// Konstruktor mit live Runtime (UDP + Discovery). Gibt
280    /// `TransportError` zurueck, wenn Socket-Bind scheitert.
281    ///
282    /// # Errors
283    /// [`DdsError::TransportError`] bei Bind-Problemen.
284    #[cfg(feature = "std")]
285    pub(crate) fn new_with_runtime(
286        domain_id: DomainId,
287        qos: DomainParticipantQos,
288        config: RuntimeConfig,
289    ) -> Result<Self> {
290        let runtime = DcpsRuntime::start(domain_id, random_guid_prefix(), config)?;
291        let builtin = Arc::new(BuiltinSubscriber::new());
292        // Discovery-Hook verdrahten: Runtime pusht ab jetzt SPDP/SEDP-
293        // Events in die 4 Builtin-Reader.
294        runtime.attach_builtin_sinks(builtin.sinks());
295        //  shared Ignore-Filter mit der Runtime teilen, damit der
296        // Discovery-Hot-Path (handle_spdp_datagram +
297        // push_sedp_events_to_builtin_readers) die Listen konsultieren
298        // kann.
299        let ignore_filter = IgnoreFilter::default();
300        runtime.attach_ignore_filter(ignore_filter.clone());
301        let participant = Self {
302            inner: Arc::new(ParticipantInner {
303                domain_id,
304                qos: Mutex::new(qos),
305                entity_state: crate::entity::EntityState::new(),
306                topics: Mutex::new(BTreeMap::new()),
307                runtime: Some(runtime),
308                builtin_subscriber: builtin,
309                ignore_filter,
310                publishers: Mutex::new(Vec::new()),
311                subscribers: Mutex::new(Vec::new()),
312                datawriters: Mutex::new(Vec::new()),
313                datareaders: Mutex::new(Vec::new()),
314                listener: Mutex::new(None),
315                type_registry: Mutex::new(BTreeMap::new()),
316                type_lookup: Mutex::new(TypeLookupState::default()),
317            }),
318        };
319        // Auto-Register der 4 Spec-§7.6.5-Built-in-Types.
320        participant.register_builtin_types();
321        Ok(participant)
322    }
323
324    /// Interner Zugriff auf die Runtime — von Publisher/Subscriber
325    /// verwendet, um DataWriter/Reader anzulegen. `None` wenn der
326    /// Participant im offline-Modus ist.
327    #[cfg(feature = "std")]
328    #[must_use]
329    pub fn runtime(&self) -> Option<&Arc<DcpsRuntime>> {
330        self.inner.runtime.as_ref()
331    }
332
333    /// Domain-Id.
334    #[must_use]
335    pub fn domain_id(&self) -> DomainId {
336        self.inner.domain_id
337    }
338
339    /// Liefert eine Kopie der DomainParticipantQos (Spec §2.2.2.2.1.4
340    /// `get_qos`).
341    #[must_use]
342    pub fn qos(&self) -> DomainParticipantQos {
343        self.inner.qos.lock().map(|g| g.clone()).unwrap_or_default()
344    }
345
346    /// Setzt die DomainParticipantQos (Spec §2.2.2.2.1.3 `set_qos`).
347    ///
348    /// # Errors
349    /// Aktuell keine — die Methode liefert `Ok(())` immer. Spec laesst
350    /// `IMMUTABLE_POLICY` zu, was wir aber nicht aktiv produzieren
351    /// (alle Policies sind im RC1 mutable).
352    pub fn set_qos(&self, qos: DomainParticipantQos) -> Result<()> {
353        if let Ok(mut g) = self.inner.qos.lock() {
354            *g = qos;
355        }
356        Ok(())
357    }
358
359    /// Registriert die 4 Spec-§7.6.5-Built-in-Types
360    /// (`DDS::String`, `DDS::KeyedString`, `DDS::Bytes`, `DDS::KeyedBytes`)
361    /// im lokalen TypeRegistry. Idempotent — doppelter Aufruf ueber-
362    /// schreibt die Eintraege deterministisch.
363    ///
364    /// Wird automatisch aus `new()`/`new_with_runtime()` aufgerufen,
365    /// kann aber auch nach einem `unregister_builtin_types()`-Disable
366    /// erneut aufgerufen werden.
367    #[cfg(feature = "std")]
368    pub fn register_builtin_types(&self) {
369        if let Ok(types) = zerodds_types::dynamic::all_builtin_types() {
370            if let Ok(mut reg) = self.inner.type_registry.lock() {
371                for (name, t) in types {
372                    reg.insert(name, t);
373                }
374            }
375        }
376    }
377
378    /// Loescht alle registrierten Built-in-Types. Wird heute
379    /// nicht von Default-Pfaden gerufen — Test-Hilfsfunktion fuer
380    /// Disable-Flag-Tests.
381    #[cfg(feature = "std")]
382    pub fn unregister_builtin_types(&self) {
383        if let Ok(mut reg) = self.inner.type_registry.lock() {
384            reg.retain(|name, _| !zerodds_types::dynamic::is_builtin_type_name(name));
385        }
386    }
387
388    /// Lookup eines Built-in-Types via Spec-Name (Spec §7.6.5).
389    /// Gibt `Some(DynamicType)` zurueck wenn der Name bekannt ist
390    /// (registriert via `register_builtin_types`).
391    #[cfg(feature = "std")]
392    #[must_use]
393    pub fn find_builtin_type(&self, name: &str) -> Option<zerodds_types::dynamic::DynamicType> {
394        self.inner
395            .type_registry
396            .lock()
397            .ok()
398            .and_then(|reg| reg.get(name).cloned())
399    }
400
401    /// Anzahl registrierter Built-in-Types. Nach `new()` == 4.
402    #[cfg(feature = "std")]
403    #[must_use]
404    pub fn registered_type_count(&self) -> usize {
405        self.inner
406            .type_registry
407            .lock()
408            .map(|r| r.len())
409            .unwrap_or(0)
410    }
411
412    /// Versucht einen TypeLookup-Request fuer einen unbekannten
413    /// `EquivalenceHash` zu queuen. Beachtet Backoff (5s zwischen
414    /// Versuchen) und maximal 3 Wiederholungen pro Hash.
415    ///
416    /// Returns: `true` wenn der Request gequeued wurde, `false` bei
417    /// Backoff-Suppression oder Max-Attempts.
418    #[cfg(feature = "std")]
419    pub fn enqueue_type_lookup(&self, hash: zerodds_types::EquivalenceHash) -> bool {
420        let mut state = match self.inner.type_lookup.lock() {
421            Ok(s) => s,
422            Err(_) => return false,
423        };
424        let now = std::time::Instant::now();
425        if let Some((last, retries)) = state.attempts.get(&hash).copied() {
426            if retries >= TypeLookupState::MAX_ATTEMPTS {
427                return false;
428            }
429            if now.duration_since(last) < TypeLookupState::BACKOFF {
430                return false;
431            }
432            state
433                .attempts
434                .insert(hash, (now, retries.saturating_add(1)));
435        } else {
436            state.attempts.insert(hash, (now, 1));
437        }
438        // Naechste Sequence-Number fuer den Request.
439        let seq = state.outgoing.len() as u64 + 1;
440        state.outgoing.push((hash, seq));
441        true
442    }
443
444    /// Drainet die queued TypeLookup-Requests. Liefert
445    /// `Vec<(hash, seq)>`. In Production-Umgebung wuerde der Caller
446    /// die Hashes via TypeLookupClient + Reliable-Writer auf den
447    /// `TL_SVC_REQ_WRITER`-Endpoint senden.
448    #[cfg(feature = "std")]
449    #[must_use]
450    pub fn drain_type_lookup_requests(&self) -> Vec<(zerodds_types::EquivalenceHash, u64)> {
451        self.inner
452            .type_lookup
453            .lock()
454            .map(|mut s| core::mem::take(&mut s.outgoing))
455            .unwrap_or_default()
456    }
457
458    /// Empfaengt ein TypeLookup-Reply (TypeObjects pro Hash).
459    /// Registriert die TypeObjects in einem internen TypeRegistry-
460    /// Spiegel — danach kann ein gestoppter QoS-Match retried werden.
461    ///
462    /// Anzahl erfolgreich registrierter Typen wird zurueckgegeben.
463    #[cfg(feature = "std")]
464    pub fn ingest_type_lookup_reply(
465        &self,
466        types: Vec<(
467            zerodds_types::EquivalenceHash,
468            zerodds_types::MinimalTypeObject,
469        )>,
470    ) -> usize {
471        let mut count = 0;
472        if let Ok(mut state) = self.inner.type_lookup.lock() {
473            for (hash, _t) in &types {
474                state.attempts.remove(hash);
475                count += 1;
476            }
477        }
478        // Clippy-bait avoidance: types-Vec wird hier konsumiert, der
479        // eigentliche TypeRegistry-Insert kann der Caller machen
480        // (z.B. via shared TypeLookupServer.registry).
481        let _ = types;
482        count
483    }
484
485    /// SEDP-Discovery-Hook: prueft eine eingehende
486    /// `PublicationBuiltinTopicData` auf Type-Hashes, die lokal nicht
487    /// aufloesbar sind. Bei Bedarf wird ein TypeLookup-Request via
488    /// `enqueue_type_lookup` gequeued.
489    ///
490    /// Der RPC-Pfad ist via `DcpsRuntime::send_type_lookup_request`
491    /// auf den TL_SVC_REQ_*-Endpoints (XTypes 1.3 §7.6.3.3.4) live;
492    /// diese Methode entscheidet pro Hash, ob ein Re-Request lohnt
493    /// (lokale Registry-Lookup + Backoff-Tracking).
494    ///
495    /// Returns: Anzahl gequeued unbekannter Hashes (max 2 — minimal +
496    /// complete).
497    #[cfg(feature = "std")]
498    pub fn on_remote_publication_discovered(&self, type_information_blob: Option<&[u8]>) -> usize {
499        self.on_remote_type_information(type_information_blob)
500    }
501
502    /// SEDP-Discovery-Hook fuer
503    /// `SubscriptionBuiltinTopicData`. Symmetrisch zu
504    /// `on_remote_publication_discovered`.
505    #[cfg(feature = "std")]
506    pub fn on_remote_subscription_discovered(&self, type_information_blob: Option<&[u8]>) -> usize {
507        self.on_remote_type_information(type_information_blob)
508    }
509
510    #[cfg(feature = "std")]
511    fn on_remote_type_information(&self, blob: Option<&[u8]>) -> usize {
512        let Some(bytes) = blob else {
513            return 0;
514        };
515        let Ok(ti) = zerodds_types::type_information::TypeInformation::from_bytes_le(bytes) else {
516            return 0;
517        };
518        let mut queued = 0;
519        // Minimal-Hash pruefen.
520        if let Some(hash) = extract_equivalence_hash(&ti.minimal.typeid_with_size.type_id) {
521            if !self.has_type_for_hash(hash) && self.enqueue_type_lookup(hash) {
522                queued += 1;
523            }
524        }
525        // Complete-Hash pruefen (falls vorhanden).
526        if let Some(hash) = extract_equivalence_hash(&ti.complete.typeid_with_size.type_id) {
527            if !self.has_type_for_hash(hash) && self.enqueue_type_lookup(hash) {
528                queued += 1;
529            }
530        }
531        queued
532    }
533
534    /// Internal helper — true wenn der Hash bereits im lokalen
535    /// `TypeLookupServer.registry` aufloesbar ist (entweder via
536    /// `register_type_object` lokal eingespeist oder via vorherigen
537    /// `getTypes`-Reply-Ingest gefuellt). Verhindert dass wir fuer
538    /// Hashes, die wir bereits kennen, redundante Lookup-Requests
539    /// rausgeben.
540    #[cfg(feature = "std")]
541    fn has_type_for_hash(&self, hash: zerodds_types::EquivalenceHash) -> bool {
542        let Some(rt) = self.inner.runtime.as_ref() else {
543            return false;
544        };
545        let Ok(server) = rt.type_lookup_server.lock() else {
546            return false;
547        };
548        server.registry.get_minimal(&hash).is_some()
549            || server.registry.get_complete(&hash).is_some()
550    }
551
552    /// True wenn fuer den Hash bereits MAX_ATTEMPTS erreicht.
553    /// Wird vom Match-Re-Try-Pfad konsultiert: spaeter aufgeben statt
554    /// endlos zu pollen.
555    #[cfg(feature = "std")]
556    #[must_use]
557    pub fn type_lookup_exhausted(&self, hash: zerodds_types::EquivalenceHash) -> bool {
558        self.inner
559            .type_lookup
560            .lock()
561            .ok()
562            .and_then(|s| s.attempts.get(&hash).map(|(_, n)| *n))
563            .unwrap_or(0)
564            >= TypeLookupState::MAX_ATTEMPTS
565    }
566
567    /// Erzeugt einen typed Topic-Handle. Wiederholte Aufrufe mit
568    /// gleichem Namen + Typ liefern denselben Handle (Ref-geteilt).
569    ///
570    /// # Errors
571    /// - `InconsistentPolicy` wenn ein Topic mit diesem Namen
572    ///   bereits unter anderem Typ registriert ist.
573    /// - `BadParameter` bei leerem Namen.
574    pub fn create_topic<T: DdsType>(&self, name: &str, qos: TopicQos) -> Result<Topic<T>> {
575        if name.is_empty() {
576            return Err(DdsError::BadParameter { what: "topic name" });
577        }
578        let mut topics = self
579            .inner
580            .topics
581            .lock()
582            .map_err(|_| DdsError::PreconditionNotMet {
583                reason: "topic registry poisoned",
584            })?;
585        if let Some(existing) = topics.get(name) {
586            if existing.type_name != T::TYPE_NAME {
587                // Inconsistent-Topic-Detection. Bumpt den
588                // Counter auf dem existierenden Topic — beim
589                // naechsten `inconsistent_topic_status()`-Read wird
590                // der Listener via Bubble-Up gefeuert.
591                #[cfg(feature = "std")]
592                existing
593                    .inconsistent_topic_count
594                    .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
595                return Err(DdsError::InconsistentPolicy {
596                    what: "topic name reused with different type",
597                });
598            }
599            // Gleicher Typ → shared handle.
600            return Ok(reconstruct_topic::<T>(existing.clone(), self.clone()));
601        }
602        let topic = Topic::<T>::new(name.into(), qos, self.clone());
603        topics.insert(name.into(), topic_inner(&topic));
604        Ok(topic)
605    }
606
607    /// Sofortiger lokaler Lookup eines Topics nach Name — gibt `None`
608    /// zurueck, wenn kein lokales `create_topic` mit diesem Namen
609    /// erfolgt ist. **Macht keinen Discovery-Wait** (das ist
610    /// `find_topic`). Spec-Referenz: OMG DDS 1.4 §2.2.2.2.1.12
611    /// "lookup_topicdescription".
612    #[must_use]
613    pub fn lookup_topicdescription(&self, name: &str) -> Option<TopicDescriptionHandle> {
614        let topics = self.inner.topics.lock().ok()?;
615        let inner = topics.get(name)?;
616        Some(TopicDescriptionHandle::new(
617            inner.name.clone(),
618            String::from(inner.type_name),
619            self.clone(),
620        ))
621    }
622
623    /// Wartet bis ein Topic mit dem gegebenen Namen via Discovery
624    /// (SEDP-Publication oder -Subscription) sichtbar ist — oder bis
625    /// `timeout` abgelaufen ist. Spec-Referenz: OMG DDS 1.4
626    /// §2.2.2.2.1.11 `find_topic`.
627    ///
628    /// Returns:
629    /// - `Ok(handle)` mit Name + Type-Name + Participant, falls
630    ///   waehrend `timeout` ein passendes SEDP-Endpoint sichtbar
631    ///   wurde. Lokale Topics zaehlen ebenfalls (keine Pflicht zu
632    ///   warten, wenn `create_topic` schon lief).
633    /// - `Err(Timeout)` wenn `timeout` abgelaufen ist.
634    ///
635    /// # Errors
636    /// - `DdsError::Timeout` wenn `timeout` ohne Discovery-Match
637    ///   abgelaufen ist.
638    /// - `DdsError::BadParameter` bei leerem Namen.
639    #[cfg(feature = "std")]
640    pub fn find_topic(
641        &self,
642        name: &str,
643        timeout: core::time::Duration,
644    ) -> Result<TopicDescriptionHandle> {
645        if name.is_empty() {
646            return Err(DdsError::BadParameter { what: "topic name" });
647        }
648        let deadline = std::time::Instant::now() + timeout;
649        // Sofort lokal pruefen — vermeidet busy-wait wenn das Topic
650        // bereits via create_topic lokal angelegt ist.
651        if let Some(h) = self.lookup_topicdescription(name) {
652            return Ok(h);
653        }
654        // Poll-Loop ueber den SEDP-Cache. Spec laesst die Strategie
655        // offen; Cyclone-DDS pollt ebenfalls.
656        let poll = core::time::Duration::from_millis(20);
657        loop {
658            if let Some(handle) = self.find_topic_in_sedp(name) {
659                return Ok(handle);
660            }
661            if std::time::Instant::now() >= deadline {
662                return Err(DdsError::Timeout);
663            }
664            std::thread::sleep(poll);
665        }
666    }
667
668    /// Helper: schaut im SEDP-Cache nach, ob ein remote Endpoint
669    /// (Publication oder Subscription) ein Topic mit dem Namen
670    /// announciert hat. Liefert das erste Match (Name + Type-Name).
671    #[cfg(feature = "std")]
672    fn find_topic_in_sedp(&self, name: &str) -> Option<TopicDescriptionHandle> {
673        let rt = self.inner.runtime.as_ref()?;
674        let sedp = rt.sedp.lock().ok()?;
675        // Publications zuerst pruefen.
676        for p in sedp.cache().publications() {
677            if p.data.topic_name == name {
678                return Some(TopicDescriptionHandle::new(
679                    p.data.topic_name.clone(),
680                    p.data.type_name.clone(),
681                    self.clone(),
682                ));
683            }
684        }
685        for s in sedp.cache().subscriptions() {
686            if s.data.topic_name == name {
687                return Some(TopicDescriptionHandle::new(
688                    s.data.topic_name.clone(),
689                    s.data.type_name.clone(),
690                    self.clone(),
691                ));
692            }
693        }
694        None
695    }
696
697    /// Erzeugt ein `ContentFilteredTopic` als Subset eines bereits
698    /// vorhandenen `Topic<T>`. Spec-Referenz: OMG DDS 1.4
699    /// §2.2.2.2.1.13 `create_contentfilteredtopic`.
700    ///
701    /// Die `filter_expression` ist ein SQL-Subset (siehe Annex B).
702    /// `filter_parameters` sind Strings, die `%0`, `%1`, ... in der
703    /// Expression ersetzen.
704    ///
705    /// # Errors
706    /// - `BadParameter` bei leerem Namen oder leerer Expression.
707    /// - `BadParameter` wenn die Filter-Expression nicht parst.
708    /// - `BadParameter` wenn ein referenzierter `%N`-Parameter nicht
709    ///   im `filter_parameters`-Vec geliefert wird.
710    pub fn create_contentfilteredtopic<T: DdsType>(
711        &self,
712        name: &str,
713        related_topic: &Topic<T>,
714        filter_expression: &str,
715        filter_parameters: alloc::vec::Vec<String>,
716    ) -> Result<ContentFilteredTopic<T>> {
717        if name.is_empty() {
718            return Err(DdsError::BadParameter {
719                what: "content-filtered-topic name",
720            });
721        }
722        if filter_expression.is_empty() {
723            return Err(DdsError::BadParameter {
724                what: "filter expression",
725            });
726        }
727        ContentFilteredTopic::<T>::new(
728            name.into(),
729            related_topic.clone(),
730            filter_expression.into(),
731            filter_parameters,
732            self.clone(),
733        )
734    }
735
736    /// Erzeugt eine `MultiTopic` als kombinierende TopicDescription
737    /// ueber 1+ Underlying-Topics mit SQL-Subscription-Expression.
738    /// Spec-Referenz: OMG DDS 1.4 §2.2.2.2.1.15 `create_multitopic`
739    /// (optionales Spec-Feature).
740    ///
741    /// # Errors
742    /// - `BadParameter` bei leerem Namen oder Type-Namen.
743    /// - `BadParameter` wenn `related_topic_names` leer ist.
744    /// - `BadParameter` wenn die Subscription-Expression nicht parst.
745    /// - `BadParameter` wenn ein referenzierter `%N`-Parameter nicht
746    ///   im `expression_parameters`-Vec geliefert wird.
747    pub fn create_multitopic<T: DdsType>(
748        &self,
749        name: &str,
750        type_name: &str,
751        related_topic_names: alloc::vec::Vec<String>,
752        subscription_expression: &str,
753        expression_parameters: alloc::vec::Vec<String>,
754    ) -> Result<crate::topic::MultiTopic<T>> {
755        if name.is_empty() {
756            return Err(DdsError::BadParameter {
757                what: "multitopic name",
758            });
759        }
760        if type_name.is_empty() {
761            return Err(DdsError::BadParameter {
762                what: "multitopic type_name",
763            });
764        }
765        if subscription_expression.is_empty() {
766            return Err(DdsError::BadParameter {
767                what: "multitopic subscription expression",
768            });
769        }
770        crate::topic::MultiTopic::<T>::new(
771            name.into(),
772            type_name.into(),
773            related_topic_names,
774            subscription_expression.into(),
775            expression_parameters,
776            self.clone(),
777        )
778    }
779
780    /// Loescht eine `MultiTopic`. Spec §2.2.2.2.1.16
781    /// `delete_multitopic`. v1.2 ist es ein no-op-shim mit Participant-
782    /// Match-Check.
783    ///
784    /// # Errors
785    /// `BadParameter` wenn die MultiTopic zu einem anderen Participant
786    /// gehoert.
787    pub fn delete_multitopic<T: DdsType>(&self, mt: &crate::topic::MultiTopic<T>) -> Result<()> {
788        if mt.get_participant().inner_ptr() != self.inner_ptr() {
789            return Err(DdsError::BadParameter {
790                what: "multitopic belongs to different participant",
791            });
792        }
793        Ok(())
794    }
795
796    /// Loescht ein `ContentFilteredTopic`. Spec-Referenz:
797    /// §2.2.2.2.1.14 `delete_contentfilteredtopic`.
798    ///
799    /// In Rust ist das Lifetime-Handle des CFT bereits durch `Drop`
800    /// abgedeckt — die zugrundeliegenden Ressourcen werden frei, sobald
801    /// der `ContentFilteredTopic<T>` aus dem Scope geht. Diese Methode
802    /// existiert fuer Spec-Compliance der C++-API und validiert den
803    /// `Participant`-Match (Spec verlangt `BadParameter`, wenn das CFT
804    /// zu einem anderen Participant gehoert).
805    ///
806    /// # Errors
807    /// - `BadParameter` wenn das CFT zu einem anderen Participant
808    ///   gehoert.
809    pub fn delete_contentfilteredtopic<T: DdsType>(
810        &self,
811        cft: &ContentFilteredTopic<T>,
812    ) -> Result<()> {
813        if cft.get_participant().inner_ptr() != self.inner_ptr() {
814            return Err(DdsError::BadParameter {
815                what: "cft belongs to different participant",
816            });
817        }
818        Ok(())
819    }
820
821    /// Interner Identity-Pointer fuer Participant-Vergleich
822    /// (verwendet bei `delete_contentfilteredtopic`-Validierung).
823    pub(crate) fn inner_ptr(&self) -> *const ParticipantInner {
824        Arc::as_ptr(&self.inner)
825    }
826
827    /// Erzeugt einen Publisher mit gegebener QoS (Default reicht fuer
828    /// v1.2).
829    pub fn create_publisher(&self, qos: PublisherQos) -> Publisher {
830        #[cfg(feature = "std")]
831        let p = {
832            let p = Publisher::new(qos, self.inner.runtime.clone());
833            // Bubble-Up-Back-Pointer (weak) verdrahten, damit
834            // Writer-Events bis zum DomainParticipantListener kommen.
835            p.attach_participant(Arc::downgrade(&self.inner));
836            p
837        };
838        #[cfg(not(feature = "std"))]
839        let p = Publisher::new(qos);
840        // Handle fuer contains_entity / delete_contained_entities tracken.
841        if let Ok(mut list) = self.inner.publishers.lock() {
842            list.push(p.inner.entity_state.instance_handle());
843        }
844        p
845    }
846
847    /// Erzeugt einen Subscriber.
848    pub fn create_subscriber(&self, qos: SubscriberQos) -> Subscriber {
849        #[cfg(feature = "std")]
850        let s = {
851            let s = Subscriber::new(qos, self.inner.runtime.clone());
852            // Bubble-Up-Back-Pointer (weak) verdrahten.
853            s.attach_participant(Arc::downgrade(&self.inner));
854            s
855        };
856        #[cfg(not(feature = "std"))]
857        let s = Subscriber::new(qos);
858        if let Ok(mut list) = self.inner.subscribers.lock() {
859            list.push(s.inner.entity_state.instance_handle());
860        }
861        s
862    }
863
864    /// Anzahl aktuell registrierter Topics. Diagnose-API.
865    #[must_use]
866    pub fn topics_len(&self) -> usize {
867        self.inner.topics.lock().map(|t| t.len()).unwrap_or(0)
868    }
869
870    /// Anzahl aktuell entdeckter Remote-Participants ueber SPDP.
871    /// Spec: OMG DDS 1.4 §2.2.2.2.1.7 `get_discovered_participants`.
872    /// 0 im offline-Modus.
873    #[must_use]
874    pub fn discovered_participants_count(&self) -> usize {
875        #[cfg(feature = "std")]
876        if let Some(rt) = self.inner.runtime.as_ref() {
877            return rt.discovered_participants().len();
878        }
879        0
880    }
881
882    /// Anzahl aktuell im SEDP-Cache bekannter Remote-Publications.
883    /// Spec: OMG DDS 1.4 §2.2.2.2.1.9 `get_discovered_topics` (~analog).
884    #[must_use]
885    pub fn discovered_publications_count(&self) -> usize {
886        #[cfg(feature = "std")]
887        if let Some(rt) = self.inner.runtime.as_ref() {
888            return rt.discovered_publications_count();
889        }
890        0
891    }
892
893    /// Anzahl aktuell im SEDP-Cache bekannter Remote-Subscriptions.
894    #[must_use]
895    pub fn discovered_subscriptions_count(&self) -> usize {
896        #[cfg(feature = "std")]
897        if let Some(rt) = self.inner.runtime.as_ref() {
898            return rt.discovered_subscriptions_count();
899        }
900        0
901    }
902
903    // ============================================================
904    // ignore_* (DDS 1.4 §2.2.2.2.1.14-17)
905    // ============================================================
906
907    /// Markiert einen entdeckten remote `DomainParticipant` als
908    /// "ignoriert" — alle weiteren SPDP-Beacons mit diesem Handle
909    /// fallen aus dem Builtin-Reader-Stream raus, und gleichzeitig
910    /// werden alle SEDP-Endpoints, die zum gleichen Participant-
911    /// Prefix gehoeren, ebenfalls verworfen (Spec §2.2.2.2.1.14).
912    ///
913    /// Per Spec ist die Aktion **monoton** — ein einmal ignorierter
914    /// Participant bleibt es fuer den Lebenszyklus dieses
915    /// Participants.
916    ///
917    /// # Errors
918    /// Aktuell keine — die Methode liefert `Ok(())` immer. Spec laesst
919    /// `OUT_OF_RESOURCES` zu, was wir aber nicht aktiv produzieren.
920    pub fn ignore_participant(&self, handle: InstanceHandle) -> Result<()> {
921        #[cfg(feature = "std")]
922        if let Ok(mut s) = self.inner.ignore_filter.inner.participants.lock() {
923            s.insert(handle);
924        }
925        Ok(())
926    }
927
928    /// Markiert ein entdecktes remote Topic als "ignoriert".
929    /// Spec §2.2.2.2.1.15.
930    ///
931    /// # Errors
932    /// Wie [`Self::ignore_participant`].
933    pub fn ignore_topic(&self, handle: InstanceHandle) -> Result<()> {
934        #[cfg(feature = "std")]
935        if let Ok(mut s) = self.inner.ignore_filter.inner.topics.lock() {
936            s.insert(handle);
937        }
938        Ok(())
939    }
940
941    /// Markiert eine entdeckte remote Publication als "ignoriert".
942    /// Spec §2.2.2.2.1.16.
943    ///
944    /// # Errors
945    /// Wie [`Self::ignore_participant`].
946    pub fn ignore_publication(&self, handle: InstanceHandle) -> Result<()> {
947        #[cfg(feature = "std")]
948        if let Ok(mut s) = self.inner.ignore_filter.inner.publications.lock() {
949            s.insert(handle);
950        }
951        Ok(())
952    }
953
954    /// Markiert eine entdeckte remote Subscription als "ignoriert".
955    /// Spec §2.2.2.2.1.17.
956    ///
957    /// # Errors
958    /// Wie [`Self::ignore_participant`].
959    pub fn ignore_subscription(&self, handle: InstanceHandle) -> Result<()> {
960        #[cfg(feature = "std")]
961        if let Ok(mut s) = self.inner.ignore_filter.inner.subscriptions.lock() {
962            s.insert(handle);
963        }
964        Ok(())
965    }
966
967    /// `true` wenn `handle` per `ignore_participant` markiert wurde.
968    #[must_use]
969    pub fn is_participant_ignored(&self, handle: InstanceHandle) -> bool {
970        #[cfg(feature = "std")]
971        return self.inner.ignore_filter.is_participant_ignored(handle);
972        #[cfg(not(feature = "std"))]
973        {
974            let _ = handle;
975            false
976        }
977    }
978
979    /// `true` wenn `handle` per `ignore_topic` markiert wurde.
980    #[must_use]
981    pub fn is_topic_ignored(&self, handle: InstanceHandle) -> bool {
982        #[cfg(feature = "std")]
983        return self.inner.ignore_filter.is_topic_ignored(handle);
984        #[cfg(not(feature = "std"))]
985        {
986            let _ = handle;
987            false
988        }
989    }
990
991    /// `true` wenn `handle` per `ignore_publication` markiert wurde.
992    #[must_use]
993    pub fn is_publication_ignored(&self, handle: InstanceHandle) -> bool {
994        #[cfg(feature = "std")]
995        return self.inner.ignore_filter.is_publication_ignored(handle);
996        #[cfg(not(feature = "std"))]
997        {
998            let _ = handle;
999            false
1000        }
1001    }
1002
1003    /// `true` wenn `handle` per `ignore_subscription` markiert wurde.
1004    #[must_use]
1005    pub fn is_subscription_ignored(&self, handle: InstanceHandle) -> bool {
1006        #[cfg(feature = "std")]
1007        return self.inner.ignore_filter.is_subscription_ignored(handle);
1008        #[cfg(not(feature = "std"))]
1009        {
1010            let _ = handle;
1011            false
1012        }
1013    }
1014
1015    /// Interner Zugriff auf den shared Ignore-Filter — von
1016    /// Tests + Runtime-Discovery-Hook genutzt.
1017    #[cfg(feature = "std")]
1018    #[must_use]
1019    #[allow(dead_code)]
1020    pub(crate) fn ignore_filter(&self) -> IgnoreFilter {
1021        self.inner.ignore_filter.clone()
1022    }
1023
1024    // ============================================================
1025    // delete_contained_entities (DDS 1.4 §2.2.2.2.1.18)
1026    // ============================================================
1027
1028    /// Loescht **alle** vom Participant gehaltenen Children
1029    /// (Publishers, Subscribers, Topics, Builtin-Reader-Inboxes).
1030    /// Spec §2.2.2.2.1.18 — analoger Pendant existiert in
1031    /// Publisher/Subscriber/DataReader, der hier rekursiv mit
1032    /// abgedeckt wird.
1033    ///
1034    /// Offline-Verhalten:
1035    /// - Topic-Registry geleert (lokale Topics).
1036    /// - Publisher-/Subscriber-Tracker geleert.
1037    /// - Builtin-Topic-Reader-Inboxes geleert (so dass
1038    ///   `take()` nach `delete_contained_entities` ein leeres
1039    ///   Vec liefert).
1040    /// - **Kein** SEDP-Unannounce — das Live-Verhalten
1041    ///   uebernimmt das, sobald die Runtime ein
1042    ///   `Drop`/`shutdown`-Handle bekommt. Aktueller Stand: der
1043    ///   Runtime-Thread laeuft bis zum Process-Exit.
1044    ///
1045    /// # Errors
1046    /// `PreconditionNotMet` wenn ein interner Mutex vergiftet ist.
1047    pub fn delete_contained_entities(&self) -> Result<()> {
1048        // Topic-Registry leeren.
1049        {
1050            let mut topics =
1051                self.inner
1052                    .topics
1053                    .lock()
1054                    .map_err(|_| DdsError::PreconditionNotMet {
1055                        reason: "topic registry poisoned",
1056                    })?;
1057            topics.clear();
1058        }
1059        // Publisher-/Subscriber-Marker leeren.
1060        if let Ok(mut p) = self.inner.publishers.lock() {
1061            p.clear();
1062        }
1063        if let Ok(mut s) = self.inner.subscribers.lock() {
1064            s.clear();
1065        }
1066        // Builtin-Reader-Inboxes leeren — User soll nach
1067        // delete_contained_entities() einen sauberen Builtin-
1068        // Subscriber sehen, der erst neue (post-delete) Discovery-
1069        // Events ausliefert.
1070        let sinks = self.inner.builtin_subscriber.sinks();
1071        if let Ok(mut g) = sinks.participant.lock() {
1072            g.clear();
1073        }
1074        if let Ok(mut g) = sinks.topic.lock() {
1075            g.clear();
1076        }
1077        if let Ok(mut g) = sinks.publication.lock() {
1078            g.clear();
1079        }
1080        if let Ok(mut g) = sinks.subscription.lock() {
1081            g.clear();
1082        }
1083        Ok(())
1084    }
1085
1086    /// Anzahl der per `create_publisher` getrackten Publisher.
1087    /// Diagnose-API fuer Tests.
1088    #[must_use]
1089    pub fn publishers_len(&self) -> usize {
1090        self.inner.publishers.lock().map(|p| p.len()).unwrap_or(0)
1091    }
1092
1093    /// Anzahl der per `create_subscriber` getrackten Subscriber.
1094    #[must_use]
1095    pub fn subscribers_len(&self) -> usize {
1096        self.inner.subscribers.lock().map(|s| s.len()).unwrap_or(0)
1097    }
1098
1099    /// Liefert den `InstanceHandle` dieses Participants. Identifiziert
1100    /// die Entity gegenueber DCPS-API-Konsumenten (Spec §2.2.2.1.1
1101    /// `get_instance_handle`).
1102    #[must_use]
1103    pub fn instance_handle(&self) -> InstanceHandle {
1104        self.inner.entity_state.instance_handle()
1105    }
1106
1107    /// Spec §2.2.2.2.1.10 `contains_entity` — `true` wenn `handle` zu
1108    /// diesem Participant oder einer seiner direkt **oder rekursiv**
1109    /// enthaltenen Entities gehoert.
1110    ///
1111    /// **Eingeschlossene Entity-Typen:**
1112    /// - der Participant selbst
1113    /// - alle per `create_topic` registrierten Topics
1114    /// - alle per `create_publisher` / `create_subscriber` erzeugten
1115    ///   Publisher/Subscriber
1116    /// - **rekursiv**: alle per `Publisher::create_datawriter` /
1117    ///   `Subscriber::create_datareader` erzeugten DataWriter/DataReader.
1118    #[must_use]
1119    pub fn contains_entity(&self, handle: InstanceHandle) -> bool {
1120        if self.instance_handle() == handle {
1121            return true;
1122        }
1123        if let Ok(topics) = self.inner.topics.lock() {
1124            for t in topics.values() {
1125                if t.entity_state.instance_handle() == handle {
1126                    return true;
1127                }
1128            }
1129        }
1130        if let Ok(pubs) = self.inner.publishers.lock() {
1131            if pubs.contains(&handle) {
1132                return true;
1133            }
1134        }
1135        if let Ok(subs) = self.inner.subscribers.lock() {
1136            if subs.contains(&handle) {
1137                return true;
1138            }
1139        }
1140        if let Ok(dws) = self.inner.datawriters.lock() {
1141            if dws.contains(&handle) {
1142                return true;
1143            }
1144        }
1145        if let Ok(drs) = self.inner.datareaders.lock() {
1146            if drs.contains(&handle) {
1147                return true;
1148            }
1149        }
1150        false
1151    }
1152
1153    // ============================================================
1154    // get_discovered_* (DDS 1.4 §2.2.2.2.1.27-30)
1155    // ============================================================
1156
1157    /// Liefert die `InstanceHandle`s aller aktuell entdeckten
1158    /// remote Participants (Spec §2.2.2.2.1.27). Im offline-Modus
1159    /// leer. Ignorierte Participants tauchen **nicht** auf.
1160    #[cfg(feature = "std")]
1161    #[must_use]
1162    pub fn get_discovered_participants(&self) -> Vec<InstanceHandle> {
1163        let Some(rt) = self.inner.runtime.as_ref() else {
1164            return Vec::new();
1165        };
1166        let mut out = Vec::new();
1167        for d in rt.discovered_participants() {
1168            let h = InstanceHandle::from_guid(d.data.guid);
1169            if self.is_participant_ignored(h) {
1170                continue;
1171            }
1172            out.push(h);
1173        }
1174        out
1175    }
1176
1177    /// Offline-Variante (kein std → keine Runtime).
1178    #[cfg(not(feature = "std"))]
1179    #[must_use]
1180    pub fn get_discovered_participants(&self) -> Vec<InstanceHandle> {
1181        Vec::new()
1182    }
1183
1184    /// Liefert die `ParticipantBuiltinTopicData` zu einem Handle aus
1185    /// `get_discovered_participants` (Spec §2.2.2.2.1.28).
1186    ///
1187    /// # Errors
1188    /// `BadParameter` wenn `handle` keinen entdeckten Participant
1189    /// referenziert (oder wenn er ignoriert wurde).
1190    #[cfg(feature = "std")]
1191    pub fn get_discovered_participant_data(
1192        &self,
1193        handle: InstanceHandle,
1194    ) -> Result<ParticipantBuiltinTopicData> {
1195        if self.is_participant_ignored(handle) {
1196            return Err(DdsError::BadParameter {
1197                what: "participant handle is ignored",
1198            });
1199        }
1200        let Some(rt) = self.inner.runtime.as_ref() else {
1201            return Err(DdsError::BadParameter {
1202                what: "no runtime — offline participant",
1203            });
1204        };
1205        for d in rt.discovered_participants() {
1206            if InstanceHandle::from_guid(d.data.guid) == handle {
1207                return Ok(ParticipantBuiltinTopicData::from_wire(&d.data));
1208            }
1209        }
1210        Err(DdsError::BadParameter {
1211            what: "unknown participant handle",
1212        })
1213    }
1214
1215    /// Offline-Variante.
1216    #[cfg(not(feature = "std"))]
1217    pub fn get_discovered_participant_data(
1218        &self,
1219        _handle: InstanceHandle,
1220    ) -> Result<ParticipantBuiltinTopicData> {
1221        Err(DdsError::BadParameter {
1222            what: "no runtime — offline participant",
1223        })
1224    }
1225
1226    /// Liefert die `InstanceHandle`s aller aktuell entdeckten
1227    /// remote Topics. Spec §2.2.2.2.1.29.
1228    ///
1229    /// Topics werden via SEDP-Pub/Sub-Announcements indirekt
1230    /// entdeckt — pro `(topic_name, type_name)` synthetisieren wir
1231    /// einen stabilen Schluessel via `TopicBuiltinTopicData::
1232    /// synthesize_key`. Ignorierte Topics tauchen nicht auf.
1233    #[cfg(feature = "std")]
1234    #[must_use]
1235    pub fn get_discovered_topics(&self) -> Vec<InstanceHandle> {
1236        let Some(rt) = self.inner.runtime.as_ref() else {
1237            return Vec::new();
1238        };
1239        let Ok(sedp) = rt.sedp.lock() else {
1240            return Vec::new();
1241        };
1242        let mut seen = BTreeSet::new();
1243        for p in sedp.cache().publications() {
1244            let key = TopicBuiltinTopicData::synthesize_key(&p.data.topic_name, &p.data.type_name);
1245            let h = InstanceHandle::from_guid(key);
1246            if self.is_topic_ignored(h) {
1247                continue;
1248            }
1249            seen.insert(h);
1250        }
1251        for s in sedp.cache().subscriptions() {
1252            let key = TopicBuiltinTopicData::synthesize_key(&s.data.topic_name, &s.data.type_name);
1253            let h = InstanceHandle::from_guid(key);
1254            if self.is_topic_ignored(h) {
1255                continue;
1256            }
1257            seen.insert(h);
1258        }
1259        seen.into_iter().collect()
1260    }
1261
1262    /// Offline-Variante.
1263    #[cfg(not(feature = "std"))]
1264    #[must_use]
1265    pub fn get_discovered_topics(&self) -> Vec<InstanceHandle> {
1266        Vec::new()
1267    }
1268
1269    /// Liefert die `TopicBuiltinTopicData` zu einem Handle aus
1270    /// `get_discovered_topics`. Spec §2.2.2.2.1.30.
1271    ///
1272    /// # Errors
1273    /// `BadParameter` wenn `handle` keinem entdeckten Topic
1274    /// entspricht (oder ignoriert wurde).
1275    #[cfg(feature = "std")]
1276    pub fn get_discovered_topic_data(
1277        &self,
1278        handle: InstanceHandle,
1279    ) -> Result<TopicBuiltinTopicData> {
1280        if self.is_topic_ignored(handle) {
1281            return Err(DdsError::BadParameter {
1282                what: "topic handle is ignored",
1283            });
1284        }
1285        let Some(rt) = self.inner.runtime.as_ref() else {
1286            return Err(DdsError::BadParameter {
1287                what: "no runtime — offline participant",
1288            });
1289        };
1290        let Ok(sedp) = rt.sedp.lock() else {
1291            return Err(DdsError::PreconditionNotMet {
1292                reason: "sedp poisoned",
1293            });
1294        };
1295        // Erste Match auf Pub-Seite.
1296        for p in sedp.cache().publications() {
1297            let topic = TopicBuiltinTopicData::from_publication(&p.data);
1298            if InstanceHandle::from_guid(topic.key) == handle {
1299                return Ok(topic);
1300            }
1301        }
1302        for s in sedp.cache().subscriptions() {
1303            let topic = TopicBuiltinTopicData::from_subscription(&s.data);
1304            if InstanceHandle::from_guid(topic.key) == handle {
1305                return Ok(topic);
1306            }
1307        }
1308        Err(DdsError::BadParameter {
1309            what: "unknown topic handle",
1310        })
1311    }
1312
1313    /// Offline-Variante.
1314    #[cfg(not(feature = "std"))]
1315    pub fn get_discovered_topic_data(
1316        &self,
1317        _handle: InstanceHandle,
1318    ) -> Result<TopicBuiltinTopicData> {
1319        Err(DdsError::BadParameter {
1320            what: "no runtime — offline participant",
1321        })
1322    }
1323
1324    /// Builtin-Subscriber des Participants (DDS 1.4 §2.2.2.2.1.7).
1325    ///
1326    /// Liefert immer denselben Subscriber-Handle (genau ein
1327    /// Builtin-Subscriber pro Participant). Er enthaelt 4
1328    /// vor-erzeugte Reader fuer die Builtin-Topics:
1329    ///
1330    /// - `DCPSParticipant` → `ParticipantBuiltinTopicData`
1331    /// - `DCPSTopic` → `TopicBuiltinTopicData`
1332    /// - `DCPSPublication` → `PublicationBuiltinTopicData`
1333    /// - `DCPSSubscription` → `SubscriptionBuiltinTopicData`
1334    ///
1335    /// SPDP-/SEDP-Receive triggert intern einen Sample-Insert, der
1336    /// per `take()`/`read()` abgeholt werden kann (DDS 1.4 §2.2.5).
1337    ///
1338    /// # Example
1339    /// ```
1340    /// use zerodds_dcps::*;
1341    /// let participant = DomainParticipantFactory::instance()
1342    ///     .create_participant_offline(0, DomainParticipantQos::default());
1343    /// let bs = participant.get_builtin_subscriber();
1344    /// let r = bs
1345    ///     .lookup_datareader::<DcpsParticipantBuiltinTopicData>("DCPSParticipant")
1346    ///     .expect("builtin reader");
1347    /// // Anfangs leer (offline-Mode → keine SPDP-Empfange).
1348    /// assert!(r.take().expect("take").is_empty());
1349    /// ```
1350    #[must_use]
1351    pub fn get_builtin_subscriber(&self) -> Arc<BuiltinSubscriber> {
1352        Arc::clone(&self.inner.builtin_subscriber)
1353    }
1354
1355    // ============================================================
1356    // Listener-Slot (DDS 1.4 §2.2.2.2.3)
1357    // ============================================================
1358
1359    /// Setzt den `DomainParticipantListener`. `listener=None` loescht
1360    /// den Slot. `mask` ist die [`StatusMask`], die festlegt, welche
1361    /// Status-Bits dieser Listener konsumiert (Spec §2.2.4.2.3 Bubble-Up).
1362    pub fn set_listener(&self, listener: Option<ArcDomainParticipantListener>, mask: StatusMask) {
1363        if let Ok(mut slot) = self.inner.listener.lock() {
1364            *slot = listener.map(|l| (l, mask));
1365        }
1366        // Spiegele die Mask ins EntityState — fuer get_listener_mask().
1367        self.inner.entity_state.set_listener_mask(mask);
1368    }
1369
1370    /// Liefert den aktuell installierten Listener-Klon, falls vorhanden.
1371    /// Spec §2.2.2.2.3.x get_listener.
1372    #[must_use]
1373    pub fn get_listener(&self) -> Option<ArcDomainParticipantListener> {
1374        self.inner
1375            .listener
1376            .lock()
1377            .ok()
1378            .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
1379    }
1380
1381    /// Snapshot des Listener-Slots (Listener + Mask) — fuer den
1382    /// Dispatch-Pfad. Klont den Arc unter dem Mutex und gibt das
1383    /// Lock direkt frei (Lock-Discipline: Callbacks aussen ausfuehren).
1384    #[must_use]
1385    #[allow(dead_code)] // benutzt via Topic::listener_chain (cfg(std))
1386    pub(crate) fn snapshot_listener(&self) -> Option<(ArcDomainParticipantListener, StatusMask)> {
1387        self.inner
1388            .listener
1389            .lock()
1390            .ok()
1391            .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)))
1392    }
1393}
1394
1395// ============================================================================
1396// Entity-Trait (DCPS §2.2.2.1) —
1397// ============================================================================
1398
1399impl crate::entity::Entity for DomainParticipant {
1400    type Qos = DomainParticipantQos;
1401
1402    fn get_qos(&self) -> Self::Qos {
1403        self.inner.qos.lock().map(|q| q.clone()).unwrap_or_default()
1404    }
1405
1406    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
1407        // DomainParticipantQos: USER_DATA + ENTITY_FACTORY sind alle
1408        // Changeable=YES per Spec §2.2.3 — kein Immutable-Check nötig.
1409        if let Ok(mut current) = self.inner.qos.lock() {
1410            *current = qos;
1411        }
1412        Ok(())
1413    }
1414
1415    fn enable(&self) -> Result<()> {
1416        self.inner.entity_state.enable();
1417        Ok(())
1418    }
1419
1420    fn entity_state(&self) -> Arc<crate::entity::EntityState> {
1421        Arc::clone(&self.inner.entity_state)
1422    }
1423}
1424
1425// ---- interne Helfer ----
1426
1427fn topic_inner<T: DdsType>(t: &Topic<T>) -> Arc<TopicInner> {
1428    t.inner()
1429}
1430
1431/// Extrahiert den `EquivalenceHash` aus einem
1432/// `TypeIdentifier`, sofern es einer der Hash-Varianten ist.
1433#[cfg(feature = "std")]
1434fn extract_equivalence_hash(
1435    ti: &zerodds_types::TypeIdentifier,
1436) -> Option<zerodds_types::EquivalenceHash> {
1437    use zerodds_types::TypeIdentifier;
1438    match ti {
1439        TypeIdentifier::EquivalenceHashMinimal(h) | TypeIdentifier::EquivalenceHashComplete(h) => {
1440            Some(*h)
1441        }
1442        _ => None,
1443    }
1444}
1445
1446fn reconstruct_topic<T: DdsType>(
1447    inner: Arc<TopicInner>,
1448    participant: DomainParticipant,
1449) -> Topic<T> {
1450    // Der TopicInner selbst ist generisch-agnostisch (nur Name +
1451    // type-name-String); wir setzen einen neuen Topic-Handle mit
1452    // demselben Inner auf. `Topic::new` wuerde einen neuen Inner
1453    // anlegen — wir wollen aber den shared inner teilen.
1454    Topic::<T>::from_inner(inner, participant)
1455}
1456
1457// Topic braucht dafuer einen `from_inner`-Konstruktor.
1458impl<T: DdsType> Topic<T> {
1459    pub(crate) fn from_inner(inner: Arc<TopicInner>, participant: DomainParticipant) -> Self {
1460        Self::_from_inner_impl(inner, participant)
1461    }
1462}
1463
1464// Da `Topic<T>` seinen Inner privat haelt, brauchen wir einen
1465// `_from_inner_impl`-Shortcut ebenfalls im topic-Modul. Der ist
1466// gleich neben dem Konstruktor.
1467
1468#[cfg(test)]
1469#[allow(clippy::expect_used, clippy::unwrap_used)]
1470mod tests {
1471    use super::*;
1472    use crate::dds_type::RawBytes;
1473
1474    #[test]
1475    fn participant_created_with_domain_id() {
1476        let p = DomainParticipant::new(42, DomainParticipantQos::default());
1477        assert_eq!(p.domain_id(), 42);
1478        assert_eq!(p.topics_len(), 0);
1479    }
1480
1481    #[test]
1482    fn create_topic_stores_in_registry() {
1483        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1484        let t1 = p
1485            .create_topic::<RawBytes>("Chatter", TopicQos::default())
1486            .unwrap();
1487        let t2 = p
1488            .create_topic::<RawBytes>("Chatter", TopicQos::default())
1489            .unwrap();
1490        assert_eq!(t1.name(), t2.name());
1491        assert_eq!(p.topics_len(), 1);
1492    }
1493
1494    #[test]
1495    fn create_topic_rejects_type_conflict() {
1496        // Zweiter DdsType fuer Test.
1497        #[derive(Debug)]
1498        struct DummyU32(u32);
1499        impl DdsType for DummyU32 {
1500            const TYPE_NAME: &'static str = "test::DummyU32";
1501            fn encode(
1502                &self,
1503                out: &mut alloc::vec::Vec<u8>,
1504            ) -> core::result::Result<(), crate::dds_type::EncodeError> {
1505                out.extend_from_slice(&self.0.to_le_bytes());
1506                Ok(())
1507            }
1508            fn decode(bytes: &[u8]) -> core::result::Result<Self, crate::dds_type::DecodeError> {
1509                if bytes.len() != 4 {
1510                    return Err(crate::dds_type::DecodeError::Invalid { what: "u32 len" });
1511                }
1512                let mut a = [0u8; 4];
1513                a.copy_from_slice(bytes);
1514                Ok(Self(u32::from_le_bytes(a)))
1515            }
1516        }
1517
1518        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1519        let _ = p
1520            .create_topic::<RawBytes>("X", TopicQos::default())
1521            .unwrap();
1522        let err = p
1523            .create_topic::<DummyU32>("X", TopicQos::default())
1524            .unwrap_err();
1525        assert!(matches!(err, DdsError::InconsistentPolicy { .. }));
1526    }
1527
1528    #[test]
1529    fn create_topic_rejects_empty_name() {
1530        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1531        let err = p
1532            .create_topic::<RawBytes>("", TopicQos::default())
1533            .unwrap_err();
1534        assert!(matches!(err, DdsError::BadParameter { .. }));
1535    }
1536
1537    #[test]
1538    fn lookup_topicdescription_returns_local_topics() {
1539        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1540        let _t = p
1541            .create_topic::<RawBytes>("Hello", TopicQos::default())
1542            .unwrap();
1543        let h = p.lookup_topicdescription("Hello").expect("local lookup");
1544        use crate::topic::TopicDescription as _;
1545        assert_eq!(h.get_name(), "Hello");
1546        assert_eq!(h.get_type_name(), RawBytes::TYPE_NAME);
1547        assert_eq!(h.get_participant().domain_id(), 0);
1548    }
1549
1550    #[test]
1551    fn lookup_topicdescription_none_for_unknown() {
1552        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1553        assert!(p.lookup_topicdescription("Unknown").is_none());
1554    }
1555
1556    // ---- §2.2.2.2.1.10 contains_entity ----
1557
1558    #[test]
1559    fn contains_entity_returns_true_for_self_handle() {
1560        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1561        let h = p.instance_handle();
1562        assert!(p.contains_entity(h));
1563    }
1564
1565    #[test]
1566    fn contains_entity_returns_true_for_local_topic() {
1567        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1568        let t = p
1569            .create_topic::<RawBytes>("Hi", TopicQos::default())
1570            .unwrap();
1571        let topic_handle = t.inner().entity_state.instance_handle();
1572        assert!(p.contains_entity(topic_handle));
1573    }
1574
1575    #[test]
1576    fn contains_entity_returns_true_for_local_publisher() {
1577        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1578        let pub_ = p.create_publisher(PublisherQos::default());
1579        let h = pub_.inner.entity_state.instance_handle();
1580        assert!(p.contains_entity(h));
1581    }
1582
1583    #[test]
1584    fn contains_entity_returns_true_for_local_subscriber() {
1585        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1586        let s = p.create_subscriber(SubscriberQos::default());
1587        let h = s.inner.entity_state.instance_handle();
1588        assert!(p.contains_entity(h));
1589    }
1590
1591    #[test]
1592    fn contains_entity_returns_false_for_unknown_handle() {
1593        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1594        // Ein anderer Participant hat einen anderen Handle.
1595        let other = DomainParticipant::new(0, DomainParticipantQos::default());
1596        let other_h = other.instance_handle();
1597        assert!(!p.contains_entity(other_h));
1598    }
1599
1600    #[test]
1601    fn contains_entity_returns_false_for_topic_after_delete() {
1602        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1603        let t = p
1604            .create_topic::<RawBytes>("Tmp", TopicQos::default())
1605            .unwrap();
1606        let topic_handle = t.inner().entity_state.instance_handle();
1607        assert!(p.contains_entity(topic_handle));
1608        p.delete_contained_entities().unwrap();
1609        assert!(!p.contains_entity(topic_handle));
1610    }
1611
1612    #[test]
1613    fn contains_entity_recursive_finds_local_datawriter() {
1614        // §2.2.2.2.1.10 — contains_entity MUSS auch DataWriter-Handles
1615        // erkennen, die ueber Publisher::create_datawriter erzeugt wurden.
1616        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1617        let topic = p
1618            .create_topic::<RawBytes>("Hello", TopicQos::default())
1619            .unwrap();
1620        let pub_ = p.create_publisher(PublisherQos::default());
1621        let dw = pub_
1622            .create_datawriter(&topic, crate::qos::DataWriterQos::default())
1623            .unwrap();
1624        let dw_handle = dw.instance_handle();
1625        assert!(p.contains_entity(dw_handle));
1626        // Plus: Publisher selbst exposes contains_writer(handle).
1627        assert!(pub_.contains_writer(dw_handle));
1628    }
1629
1630    #[test]
1631    fn contains_entity_recursive_finds_local_datareader() {
1632        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1633        let topic = p
1634            .create_topic::<RawBytes>("Hello2", TopicQos::default())
1635            .unwrap();
1636        let sub = p.create_subscriber(SubscriberQos::default());
1637        let dr = sub
1638            .create_datareader(&topic, crate::qos::DataReaderQos::default())
1639            .unwrap();
1640        let dr_handle = dr.subscription_handle();
1641        assert!(p.contains_entity(dr_handle));
1642        assert!(sub.contains_reader(dr_handle));
1643    }
1644
1645    #[test]
1646    fn contains_entity_recursive_does_not_find_foreign_datawriter() {
1647        // Negativ: DW, der ueber einen anderen Participant erzeugt wurde,
1648        // ist NICHT contained.
1649        let p1 = DomainParticipant::new(0, DomainParticipantQos::default());
1650        let p2 = DomainParticipant::new(1, DomainParticipantQos::default());
1651        let topic = p2
1652            .create_topic::<RawBytes>("Foreign", TopicQos::default())
1653            .unwrap();
1654        let pub2 = p2.create_publisher(PublisherQos::default());
1655        let dw2 = pub2
1656            .create_datawriter(&topic, crate::qos::DataWriterQos::default())
1657            .unwrap();
1658        assert!(!p1.contains_entity(dw2.instance_handle()));
1659        assert!(p2.contains_entity(dw2.instance_handle()));
1660    }
1661
1662    #[cfg(feature = "std")]
1663    #[test]
1664    fn find_topic_returns_immediately_for_local() {
1665        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1666        let _t = p
1667            .create_topic::<RawBytes>("Local", TopicQos::default())
1668            .unwrap();
1669        let started = std::time::Instant::now();
1670        let h = p
1671            .find_topic("Local", core::time::Duration::from_secs(5))
1672            .expect("local find");
1673        // Sollte deutlich unter dem Timeout liegen — lokal ist
1674        // sofortiger Return.
1675        assert!(started.elapsed() < core::time::Duration::from_millis(50));
1676        use crate::topic::TopicDescription as _;
1677        assert_eq!(h.get_name(), "Local");
1678    }
1679
1680    #[cfg(feature = "std")]
1681    #[test]
1682    fn find_topic_times_out_when_unknown() {
1683        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1684        let err = p
1685            .find_topic("NotExists", core::time::Duration::from_millis(80))
1686            .unwrap_err();
1687        assert!(matches!(err, DdsError::Timeout));
1688    }
1689
1690    #[cfg(feature = "std")]
1691    #[test]
1692    fn find_topic_rejects_empty_name() {
1693        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1694        let err = p
1695            .find_topic("", core::time::Duration::from_millis(10))
1696            .unwrap_err();
1697        assert!(matches!(err, DdsError::BadParameter { .. }));
1698    }
1699
1700    #[test]
1701    fn create_contentfilteredtopic_rejects_empty_name() {
1702        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1703        let topic = p
1704            .create_topic::<RawBytes>("Base", TopicQos::default())
1705            .unwrap();
1706        let err = p
1707            .create_contentfilteredtopic("", &topic, "x > 0", alloc::vec::Vec::new())
1708            .unwrap_err();
1709        assert!(matches!(err, DdsError::BadParameter { .. }));
1710    }
1711
1712    #[test]
1713    fn create_contentfilteredtopic_rejects_empty_expression() {
1714        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1715        let topic = p
1716            .create_topic::<RawBytes>("Base", TopicQos::default())
1717            .unwrap();
1718        let err = p
1719            .create_contentfilteredtopic("CF", &topic, "", alloc::vec::Vec::new())
1720            .unwrap_err();
1721        assert!(matches!(err, DdsError::BadParameter { .. }));
1722    }
1723
1724    #[test]
1725    fn delete_contentfilteredtopic_accepts_own() {
1726        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1727        let topic = p
1728            .create_topic::<RawBytes>("Base", TopicQos::default())
1729            .unwrap();
1730        let cft = p
1731            .create_contentfilteredtopic("CF", &topic, "x > 0", alloc::vec::Vec::new())
1732            .unwrap();
1733        p.delete_contentfilteredtopic(&cft).unwrap();
1734    }
1735
1736    #[cfg(feature = "std")]
1737    #[test]
1738    fn find_topic_resolves_via_sedp_subscription() {
1739        // Variante des Discovery-Hooks: dieses Mal injizieren wir
1740        // eine Subscription (Reader-Side-Discovery), nicht eine
1741        // Publication. find_topic muss beides finden.
1742        use crate::factory::DomainParticipantFactory;
1743        use core::time::Duration as CoreDur;
1744        use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
1745        use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
1746        use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
1747
1748        let p = DomainParticipantFactory::instance()
1749            .create_participant_with_config(
1750                43,
1751                DomainParticipantQos::default(),
1752                crate::runtime::RuntimeConfig::default(),
1753            )
1754            .expect("runtime start");
1755
1756        let target_topic = "DiscoveredViaSubSedp";
1757        if let Some(rt) = p.runtime() {
1758            if let Ok(mut sedp) = rt.sedp.lock() {
1759                let prefix = GuidPrefix::from_bytes([0xCD; 12]);
1760                let subdata = SubscriptionBuiltinTopicData {
1761                    key: Guid::new(prefix, EntityId::user_reader_with_key([4, 5, 6])),
1762                    participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
1763                    topic_name: target_topic.into(),
1764                    type_name: "test::SubT".into(),
1765                    durability: DurabilityKind::Volatile,
1766                    reliability: ReliabilityQos {
1767                        kind: ReliabilityKind::Reliable,
1768                        max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
1769                    },
1770                    ownership: zerodds_qos::OwnershipKind::Shared,
1771                    liveliness: zerodds_qos::LivelinessQosPolicy::default(),
1772                    deadline: zerodds_qos::DeadlineQosPolicy::default(),
1773                    partition: alloc::vec::Vec::new(),
1774                    user_data: alloc::vec::Vec::new(),
1775                    topic_data: alloc::vec::Vec::new(),
1776                    group_data: alloc::vec::Vec::new(),
1777                    type_information: None,
1778                    data_representation: alloc::vec::Vec::new(),
1779                    content_filter: None,
1780                    security_info: None,
1781                    service_instance_name: None,
1782                    related_entity_guid: None,
1783                    topic_aliases: None,
1784                    type_identifier: zerodds_types::TypeIdentifier::None,
1785                };
1786                sedp.cache_mut().insert_subscription(subdata, CoreDur::ZERO);
1787            }
1788        }
1789
1790        let h = p
1791            .find_topic(target_topic, CoreDur::from_millis(200))
1792            .expect("find via subscription");
1793        use crate::topic::TopicDescription as _;
1794        assert_eq!(h.get_name(), target_topic);
1795        assert_eq!(h.get_type_name(), "test::SubT");
1796    }
1797
1798    #[cfg(feature = "std")]
1799    #[test]
1800    fn find_topic_resolves_after_sedp_publication() {
1801        // Spec §2.2.2.2.1.11: find_topic muss zurueckkehren, sobald
1802        // ein Topic via Discovery sichtbar ist. Wir starten einen
1803        // Live-Participant (mit echter Runtime) und injizieren eine
1804        // Publication direkt in den SEDP-Cache, um den
1805        // Discovery-Hook zu verifizieren ohne abhaengig zu sein vom
1806        // UDP-Roundtrip.
1807        use crate::factory::DomainParticipantFactory;
1808        use core::time::Duration as CoreDur;
1809        use zerodds_rtps::publication_data::{
1810            DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
1811        };
1812        use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
1813
1814        let p = DomainParticipantFactory::instance()
1815            .create_participant_with_config(
1816                42,
1817                DomainParticipantQos::default(),
1818                crate::runtime::RuntimeConfig::default(),
1819            )
1820            .expect("runtime start");
1821
1822        let target_topic = "DiscoveredViaSedp";
1823        let target_type = "test::Discovered";
1824
1825        // Spawn ein Worker, der nach kurzem Delay eine Publication
1826        // in den SEDP-Cache injiziert.
1827        let p_inject = p.clone();
1828        let topic_name = String::from(target_topic);
1829        let type_name = String::from(target_type);
1830        let join = std::thread::spawn(move || {
1831            std::thread::sleep(CoreDur::from_millis(50));
1832            if let Some(rt) = p_inject.runtime() {
1833                if let Ok(mut sedp) = rt.sedp.lock() {
1834                    let prefix = GuidPrefix::from_bytes([0xAB; 12]);
1835                    let pubdata = PublicationBuiltinTopicData {
1836                        key: Guid::new(prefix, EntityId::user_writer_with_key([1, 2, 3])),
1837                        participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
1838                        topic_name,
1839                        type_name,
1840                        durability: DurabilityKind::Volatile,
1841                        reliability: ReliabilityQos {
1842                            kind: ReliabilityKind::Reliable,
1843                            max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(
1844                                1,
1845                            ),
1846                        },
1847                        ownership: zerodds_qos::OwnershipKind::Shared,
1848                        ownership_strength: 0,
1849                        liveliness: zerodds_qos::LivelinessQosPolicy::default(),
1850                        deadline: zerodds_qos::DeadlineQosPolicy::default(),
1851                        lifespan: zerodds_qos::LifespanQosPolicy::default(),
1852                        partition: alloc::vec::Vec::new(),
1853                        user_data: alloc::vec::Vec::new(),
1854                        topic_data: alloc::vec::Vec::new(),
1855                        group_data: alloc::vec::Vec::new(),
1856                        type_information: None,
1857                        data_representation: alloc::vec::Vec::new(),
1858                        security_info: None,
1859                        service_instance_name: None,
1860                        related_entity_guid: None,
1861                        topic_aliases: None,
1862                        type_identifier: zerodds_types::TypeIdentifier::None,
1863                    };
1864                    sedp.cache_mut().insert_publication(pubdata, CoreDur::ZERO);
1865                }
1866            }
1867        });
1868
1869        let result = p.find_topic(target_topic, CoreDur::from_secs(2));
1870        join.join().expect("inject thread");
1871        let h = result.expect("find_topic should resolve via SEDP");
1872        use crate::topic::TopicDescription as _;
1873        assert_eq!(h.get_name(), target_topic);
1874        assert_eq!(h.get_type_name(), target_type);
1875    }
1876
1877    // ============================================================
1878    // ignore_* / delete_contained_entities / get_discovered_*
1879    // ============================================================
1880
1881    #[test]
1882    fn ignore_participant_records_handle() {
1883        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1884        let h = InstanceHandle::from_raw(0xAA);
1885        assert!(!p.is_participant_ignored(h));
1886        p.ignore_participant(h).unwrap();
1887        assert!(p.is_participant_ignored(h));
1888    }
1889
1890    #[test]
1891    fn ignore_topic_records_handle() {
1892        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1893        let h = InstanceHandle::from_raw(0xBB);
1894        assert!(!p.is_topic_ignored(h));
1895        p.ignore_topic(h).unwrap();
1896        assert!(p.is_topic_ignored(h));
1897    }
1898
1899    #[test]
1900    fn ignore_publication_records_handle() {
1901        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1902        let h = InstanceHandle::from_raw(0xCC);
1903        assert!(!p.is_publication_ignored(h));
1904        p.ignore_publication(h).unwrap();
1905        assert!(p.is_publication_ignored(h));
1906    }
1907
1908    #[test]
1909    fn ignore_subscription_records_handle() {
1910        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1911        let h = InstanceHandle::from_raw(0xDD);
1912        assert!(!p.is_subscription_ignored(h));
1913        p.ignore_subscription(h).unwrap();
1914        assert!(p.is_subscription_ignored(h));
1915    }
1916
1917    #[test]
1918    fn ignore_lists_are_independent() {
1919        // Spec §2.2.2.2.1.14-17: jede ignore_*-Liste lebt fuer sich,
1920        // ein Handle in der Topic-Liste taucht nicht in der
1921        // Participant-Liste auf.
1922        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1923        let h = InstanceHandle::from_raw(0xEE);
1924        p.ignore_topic(h).unwrap();
1925        assert!(p.is_topic_ignored(h));
1926        assert!(!p.is_participant_ignored(h));
1927        assert!(!p.is_publication_ignored(h));
1928        assert!(!p.is_subscription_ignored(h));
1929    }
1930
1931    #[test]
1932    fn ignore_is_monotonic_and_idempotent() {
1933        // Doppeltes ignore_participant darf nicht in einen Fehler
1934        // umschlagen, und der Filter-State darf sich nicht "umkehren".
1935        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1936        let h = InstanceHandle::from_raw(0x42);
1937        p.ignore_participant(h).unwrap();
1938        p.ignore_participant(h).unwrap();
1939        assert!(p.is_participant_ignored(h));
1940    }
1941
1942    #[test]
1943    fn delete_contained_entities_clears_topics_and_groups() {
1944        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1945        let _t = p
1946            .create_topic::<RawBytes>("ToBeRemoved", TopicQos::default())
1947            .unwrap();
1948        let _pub_ = p.create_publisher(PublisherQos::default());
1949        let _sub_ = p.create_subscriber(SubscriberQos::default());
1950        assert_eq!(p.topics_len(), 1);
1951        assert_eq!(p.publishers_len(), 1);
1952        assert_eq!(p.subscribers_len(), 1);
1953        p.delete_contained_entities().unwrap();
1954        assert_eq!(p.topics_len(), 0);
1955        assert_eq!(p.publishers_len(), 0);
1956        assert_eq!(p.subscribers_len(), 0);
1957    }
1958
1959    #[test]
1960    fn delete_contained_entities_clears_builtin_reader_inboxes() {
1961        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1962        // Per Hand einen Builtin-Sample injizieren, damit wir nach
1963        // dem clear gegen 0 vergleichen koennen.
1964        use crate::builtin_topics::ParticipantBuiltinTopicData as DcpsP;
1965        use zerodds_rtps::wire_types::Guid;
1966        let bs = p.get_builtin_subscriber();
1967        bs.sinks()
1968            .push_participant(&DcpsP {
1969                key: Guid::from_bytes([7u8; 16]),
1970                user_data: alloc::vec::Vec::new(),
1971            })
1972            .unwrap();
1973        let r = bs.participant_reader();
1974        assert_eq!(r.read().unwrap().len(), 1);
1975        p.delete_contained_entities().unwrap();
1976        assert_eq!(r.read().unwrap().len(), 0);
1977    }
1978
1979    #[cfg(feature = "std")]
1980    #[test]
1981    fn get_discovered_participants_offline_is_empty() {
1982        // Ohne Runtime liefert get_discovered_participants ein leeres
1983        // Vec — Spec §2.2.2.2.1.27 erlaubt das.
1984        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1985        assert!(p.get_discovered_participants().is_empty());
1986    }
1987
1988    #[cfg(feature = "std")]
1989    #[test]
1990    fn get_discovered_participant_data_offline_errors() {
1991        let p = DomainParticipant::new(0, DomainParticipantQos::default());
1992        let err = p
1993            .get_discovered_participant_data(InstanceHandle::from_raw(1))
1994            .unwrap_err();
1995        assert!(matches!(err, DdsError::BadParameter { .. }));
1996    }
1997
1998    #[cfg(feature = "std")]
1999    #[test]
2000    fn get_discovered_topics_offline_is_empty() {
2001        let p = DomainParticipant::new(0, DomainParticipantQos::default());
2002        assert!(p.get_discovered_topics().is_empty());
2003    }
2004
2005    #[cfg(feature = "std")]
2006    #[test]
2007    fn get_discovered_topic_data_offline_errors() {
2008        let p = DomainParticipant::new(0, DomainParticipantQos::default());
2009        let err = p
2010            .get_discovered_topic_data(InstanceHandle::from_raw(1))
2011            .unwrap_err();
2012        assert!(matches!(err, DdsError::BadParameter { .. }));
2013    }
2014
2015    #[cfg(feature = "std")]
2016    #[test]
2017    fn get_discovered_participants_lists_after_spdp_inject() {
2018        // End-to-End: live Participant + ein synth. SPDP-Beacon eines
2019        // remote-Participants → get_discovered_participants liefert
2020        // genau ein Handle, get_discovered_participant_data findet die
2021        // Wire-Daten dazu.
2022        use crate::factory::DomainParticipantFactory;
2023        let p = DomainParticipantFactory::instance()
2024            .create_participant_with_config(
2025                30,
2026                DomainParticipantQos::default(),
2027                crate::runtime::RuntimeConfig::default(),
2028            )
2029            .expect("rt start");
2030
2031        // Direkt in den Discovered-Cache injizieren ueber den
2032        // handle_spdp_datagram-Pfad. Wir bauen ein synthetisches
2033        // Beacon mit dem gleichen Helper wie die Runtime-Tests.
2034        use zerodds_rtps::participant_data::ParticipantBuiltinTopicData as WirePart;
2035        use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, ProtocolVersion, VendorId};
2036        let remote = GuidPrefix::from_bytes([0xCA; 12]);
2037        let wire = WirePart {
2038            guid: Guid::new(remote, EntityId::PARTICIPANT),
2039            protocol_version: ProtocolVersion::V2_5,
2040            vendor_id: VendorId::ZERODDS,
2041            default_unicast_locator: None,
2042            default_multicast_locator: None,
2043            metatraffic_unicast_locator: None,
2044            metatraffic_multicast_locator: None,
2045            domain_id: Some(30),
2046            builtin_endpoint_set: 0,
2047            lease_duration: zerodds_rtps::participant_data::Duration::from_secs(100),
2048            user_data: alloc::vec::Vec::new(),
2049            properties: Default::default(),
2050            identity_token: None,
2051            permissions_token: None,
2052            identity_status_token: None,
2053            sig_algo_info: None,
2054            kx_algo_info: None,
2055            sym_cipher_algo_info: None,
2056        };
2057        let beacon = zerodds_discovery::spdp::SpdpBeacon::new(wire.clone())
2058            .serialize()
2059            .expect("serialize");
2060        if let Some(rt) = p.runtime() {
2061            crate::runtime::handle_spdp_datagram_for_test(rt, &beacon);
2062        }
2063
2064        let handles = p.get_discovered_participants();
2065        assert_eq!(handles.len(), 1);
2066        let data = p
2067            .get_discovered_participant_data(handles[0])
2068            .expect("data lookup");
2069        assert_eq!(data.key, wire.guid);
2070        // Ignorieren → leere Liste.
2071        p.ignore_participant(handles[0]).unwrap();
2072        assert!(p.get_discovered_participants().is_empty());
2073        let err = p.get_discovered_participant_data(handles[0]).unwrap_err();
2074        assert!(matches!(err, DdsError::BadParameter { .. }));
2075    }
2076
2077    #[cfg(feature = "std")]
2078    #[test]
2079    fn get_discovered_topics_lists_unique_handles_for_pub_and_sub() {
2080        // Pub + Sub auf demselben (topic, type) → ein Topic-Handle.
2081        use crate::factory::DomainParticipantFactory;
2082        use core::time::Duration as CoreDur;
2083        use zerodds_rtps::publication_data::{
2084            DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
2085        };
2086        use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
2087        use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
2088
2089        let p = DomainParticipantFactory::instance()
2090            .create_participant_with_config(
2091                21,
2092                DomainParticipantQos::default(),
2093                crate::runtime::RuntimeConfig::default(),
2094            )
2095            .expect("rt start");
2096        if let Some(rt) = p.runtime() {
2097            if let Ok(mut sedp) = rt.sedp.lock() {
2098                let prefix = GuidPrefix::from_bytes([0x77; 12]);
2099                let pubdata = PublicationBuiltinTopicData {
2100                    key: Guid::new(prefix, EntityId::user_writer_with_key([1, 2, 3])),
2101                    participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
2102                    topic_name: "SharedTopic".into(),
2103                    type_name: "SharedType".into(),
2104                    durability: DurabilityKind::Volatile,
2105                    reliability: ReliabilityQos {
2106                        kind: ReliabilityKind::Reliable,
2107                        max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
2108                    },
2109                    ownership: zerodds_qos::OwnershipKind::Shared,
2110                    ownership_strength: 0,
2111                    liveliness: zerodds_qos::LivelinessQosPolicy::default(),
2112                    deadline: zerodds_qos::DeadlineQosPolicy::default(),
2113                    lifespan: zerodds_qos::LifespanQosPolicy::default(),
2114                    partition: alloc::vec::Vec::new(),
2115                    user_data: alloc::vec::Vec::new(),
2116                    topic_data: alloc::vec::Vec::new(),
2117                    group_data: alloc::vec::Vec::new(),
2118                    type_information: None,
2119                    data_representation: alloc::vec::Vec::new(),
2120                    security_info: None,
2121                    service_instance_name: None,
2122                    related_entity_guid: None,
2123                    topic_aliases: None,
2124                    type_identifier: zerodds_types::TypeIdentifier::None,
2125                };
2126                let subdata = SubscriptionBuiltinTopicData {
2127                    key: Guid::new(prefix, EntityId::user_reader_with_key([4, 5, 6])),
2128                    participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
2129                    topic_name: "SharedTopic".into(),
2130                    type_name: "SharedType".into(),
2131                    durability: DurabilityKind::Volatile,
2132                    reliability: ReliabilityQos {
2133                        kind: ReliabilityKind::Reliable,
2134                        max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
2135                    },
2136                    ownership: zerodds_qos::OwnershipKind::Shared,
2137                    liveliness: zerodds_qos::LivelinessQosPolicy::default(),
2138                    deadline: zerodds_qos::DeadlineQosPolicy::default(),
2139                    partition: alloc::vec::Vec::new(),
2140                    user_data: alloc::vec::Vec::new(),
2141                    topic_data: alloc::vec::Vec::new(),
2142                    group_data: alloc::vec::Vec::new(),
2143                    type_information: None,
2144                    data_representation: alloc::vec::Vec::new(),
2145                    content_filter: None,
2146                    security_info: None,
2147                    service_instance_name: None,
2148                    related_entity_guid: None,
2149                    topic_aliases: None,
2150                    type_identifier: zerodds_types::TypeIdentifier::None,
2151                };
2152                sedp.cache_mut().insert_publication(pubdata, CoreDur::ZERO);
2153                sedp.cache_mut().insert_subscription(subdata, CoreDur::ZERO);
2154            }
2155        }
2156        let topics = p.get_discovered_topics();
2157        assert_eq!(topics.len(), 1, "Pub+Sub auf gleichem Topic → 1 Handle");
2158        let data = p.get_discovered_topic_data(topics[0]).expect("topic data");
2159        assert_eq!(data.name, "SharedTopic");
2160        assert_eq!(data.type_name, "SharedType");
2161    }
2162
2163    #[cfg(feature = "std")]
2164    #[test]
2165    fn get_discovered_topic_data_filters_ignored() {
2166        use crate::factory::DomainParticipantFactory;
2167        use core::time::Duration as CoreDur;
2168        use zerodds_rtps::publication_data::{
2169            DurabilityKind, PublicationBuiltinTopicData, ReliabilityKind, ReliabilityQos,
2170        };
2171        use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix};
2172
2173        let p = DomainParticipantFactory::instance()
2174            .create_participant_with_config(
2175                22,
2176                DomainParticipantQos::default(),
2177                crate::runtime::RuntimeConfig::default(),
2178            )
2179            .expect("rt start");
2180        if let Some(rt) = p.runtime() {
2181            if let Ok(mut sedp) = rt.sedp.lock() {
2182                let prefix = GuidPrefix::from_bytes([0x55; 12]);
2183                let pubdata = PublicationBuiltinTopicData {
2184                    key: Guid::new(prefix, EntityId::user_writer_with_key([1, 2, 3])),
2185                    participant_key: Guid::new(prefix, EntityId::PARTICIPANT),
2186                    topic_name: "ToIgnore".into(),
2187                    type_name: "T".into(),
2188                    durability: DurabilityKind::Volatile,
2189                    reliability: ReliabilityQos {
2190                        kind: ReliabilityKind::Reliable,
2191                        max_blocking_time: zerodds_rtps::participant_data::Duration::from_secs(1),
2192                    },
2193                    ownership: zerodds_qos::OwnershipKind::Shared,
2194                    ownership_strength: 0,
2195                    liveliness: zerodds_qos::LivelinessQosPolicy::default(),
2196                    deadline: zerodds_qos::DeadlineQosPolicy::default(),
2197                    lifespan: zerodds_qos::LifespanQosPolicy::default(),
2198                    partition: alloc::vec::Vec::new(),
2199                    user_data: alloc::vec::Vec::new(),
2200                    topic_data: alloc::vec::Vec::new(),
2201                    group_data: alloc::vec::Vec::new(),
2202                    type_information: None,
2203                    data_representation: alloc::vec::Vec::new(),
2204                    security_info: None,
2205                    service_instance_name: None,
2206                    related_entity_guid: None,
2207                    topic_aliases: None,
2208                    type_identifier: zerodds_types::TypeIdentifier::None,
2209                };
2210                sedp.cache_mut().insert_publication(pubdata, CoreDur::ZERO);
2211            }
2212        }
2213        let topics_before = p.get_discovered_topics();
2214        assert_eq!(topics_before.len(), 1);
2215        // Jetzt das Topic ignorieren — get_discovered_topics darf es
2216        // nicht mehr listen, get_discovered_topic_data muss
2217        // BadParameter liefern.
2218        p.ignore_topic(topics_before[0]).unwrap();
2219        assert!(p.get_discovered_topics().is_empty());
2220        let err = p.get_discovered_topic_data(topics_before[0]).unwrap_err();
2221        assert!(matches!(err, DdsError::BadParameter { .. }));
2222    }
2223
2224    #[test]
2225    fn delete_contentfilteredtopic_rejects_foreign() {
2226        let p1 = DomainParticipant::new(0, DomainParticipantQos::default());
2227        let p2 = DomainParticipant::new(1, DomainParticipantQos::default());
2228        let topic = p1
2229            .create_topic::<RawBytes>("Base", TopicQos::default())
2230            .unwrap();
2231        let cft = p1
2232            .create_contentfilteredtopic("CF", &topic, "x > 0", alloc::vec::Vec::new())
2233            .unwrap();
2234        let err = p2.delete_contentfilteredtopic(&cft).unwrap_err();
2235        assert!(matches!(err, DdsError::BadParameter { .. }));
2236    }
2237}