Skip to main content

zerodds_dcps/
topic.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Topic — der typed rendezvous-point zwischen DataWriter und DataReader.
4//!
5//! Spec-Referenz:
6//! - OMG DDS 1.4 §2.2.2.3.1 `TopicDescription` (Base-Klasse fuer
7//!   `Topic`, `ContentFilteredTopic`, `MultiTopic`),
8//! - §2.2.2.3.2 `Topic` (concrete TopicDescription mit Type +
9//!   Topic-QoS),
10//! - §2.2.2.2.1.12 `lookup_topicdescription` (untypisiertes Lookup),
11//! - §2.2.2.2.1.13 `create_contentfilteredtopic`.
12//!
13//! In v1.2 sind wir schlicht: der `Topic<T>` ist ein Handle, das
14//! name + type_name traegt und einen generischen `PhantomData<T>`
15//! fuer statische Typ-Sicherheit. Der Topic referenziert seinen
16//! `DomainParticipant`, damit `TopicDescription::get_participant`
17//! Spec-treu funktioniert. Der Cycle-Bruch sitzt in der
18//! Topic-Registry: die haelt nur `Arc<TopicInner>`, **nicht** den
19//! geklonten `DomainParticipant`-Handle.
20
21extern crate alloc;
22use alloc::string::{String, ToString};
23use alloc::sync::Arc;
24use alloc::vec::Vec;
25use core::marker::PhantomData;
26
27#[cfg(feature = "std")]
28use std::sync::RwLock;
29
30use zerodds_sql_filter::{Expr, RowAccess, Value};
31
32use crate::dds_type::DdsType;
33use crate::entity::StatusMask;
34use crate::error::{DdsError, Result};
35use crate::listener::ArcTopicListener;
36use crate::participant::DomainParticipant;
37use crate::qos::TopicQos;
38
39/// `TopicDescription`-Trait — Base-Interface fuer alles, woraus
40/// ein `DataReader` (typed via `T`) Samples beziehen kann.
41///
42/// Spec-Referenz: OMG DDS 1.4 §2.2.2.3.1 "TopicDescription is the
43/// most abstract description of a topic. It encapsulates the
44/// information that is common to all the kinds of topic that can be
45/// used: `Topic`, `ContentFilteredTopic`, `MultiTopic`."
46///
47/// Der Trait ist **object-safe** — wir wollen ihn als
48/// `&dyn TopicDescription` aus `lookup_topicdescription` und
49/// `find_topic` zurueckgeben koennen.
50pub trait TopicDescription {
51    /// Type-Name (z.B. `"std_msgs::msg::String"`).
52    /// Spec §2.2.2.3.1 `get_type_name`.
53    fn get_type_name(&self) -> &str;
54    /// Topic-Name (z.B. `"ChatterTopic"`).
55    /// Spec §2.2.2.3.1 `get_name`.
56    fn get_name(&self) -> &str;
57    /// Owning Participant.
58    /// Spec §2.2.2.3.1 `get_participant`.
59    fn get_participant(&self) -> &DomainParticipant;
60}
61
62/// Typed Topic-Handle.
63#[derive(Debug)]
64pub struct Topic<T: DdsType> {
65    inner: Arc<TopicInner>,
66    /// Owning Participant (kein Cycle: der Participant haelt im
67    /// Topic-Registry nur den Inner, nicht den Handle). `None` bei
68    /// Builtin-Topics, die ohne Participant konstruiert werden
69    /// (siehe [`Self::new_orphan`]).
70    participant: Option<DomainParticipant>,
71    _t: PhantomData<T>,
72}
73
74/// Inner-State eines Topics (shared via Arc, weil Handles geklont werden).
75pub(crate) struct TopicInner {
76    /// Topic-Name (z.B. "ChatterTopic").
77    pub name: String,
78    /// Type-Name aus `T::TYPE_NAME` (eager copied, damit der Inner
79    /// kein Generic-Typ hat — das vereinfacht die Participant-
80    /// Registry-Datenstruktur in .2a).
81    pub type_name: &'static str,
82    /// Topic-QoS — mutable via `Entity::set_qos`.
83    #[cfg(feature = "std")]
84    pub qos: std::sync::Mutex<TopicQos>,
85    #[cfg(not(feature = "std"))]
86    pub qos: TopicQos,
87    /// Entity-Lifecycle (DCPS §2.2.2.1).
88    pub entity_state: Arc<crate::entity::EntityState>,
89    /// optionaler `TopicListener` + StatusMask. Spec
90    /// §2.2.2.3.2.x set_listener / Bubble-Up §2.2.4.2.3.
91    #[cfg(feature = "std")]
92    pub listener: std::sync::Mutex<Option<(ArcTopicListener, StatusMask)>>,
93    /// Counter fuer InconsistentTopic-Detections (Spec §2.2.4.2.5).
94    /// Inkrementiert in `record_inconsistent_topic`, ausgelesen in
95    /// `inconsistent_topic_status` mit Delta-Detection.
96    #[cfg(feature = "std")]
97    pub inconsistent_topic_count: std::sync::atomic::AtomicI64,
98    /// zuletzt gesehener Wert fuer Delta-Trigger.
99    #[cfg(feature = "std")]
100    pub last_inconsistent_topic: std::sync::atomic::AtomicI64,
101}
102
103impl core::fmt::Debug for TopicInner {
104    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
105        #[cfg(feature = "std")]
106        let listener_present = self.listener.lock().map(|s| s.is_some()).unwrap_or(false);
107        #[cfg(not(feature = "std"))]
108        let listener_present = false;
109        f.debug_struct("TopicInner")
110            .field("name", &self.name)
111            .field("type_name", &self.type_name)
112            .field("listener_present", &listener_present)
113            .finish_non_exhaustive()
114    }
115}
116
117impl<T: DdsType> Topic<T> {
118    /// Erzeugt einen neuen Topic-Handle. Wird normalerweise vom
119    /// `DomainParticipant::create_topic<T>(name, qos)` aufgerufen.
120    #[must_use]
121    pub fn new(name: String, qos: TopicQos, participant: DomainParticipant) -> Self {
122        Self {
123            inner: Arc::new(TopicInner {
124                name,
125                type_name: T::TYPE_NAME,
126                #[cfg(feature = "std")]
127                qos: std::sync::Mutex::new(qos),
128                #[cfg(not(feature = "std"))]
129                qos,
130                entity_state: crate::entity::EntityState::new(),
131                #[cfg(feature = "std")]
132                listener: std::sync::Mutex::new(None),
133                #[cfg(feature = "std")]
134                inconsistent_topic_count: std::sync::atomic::AtomicI64::new(0),
135                #[cfg(feature = "std")]
136                last_inconsistent_topic: std::sync::atomic::AtomicI64::new(-1),
137            }),
138            participant: Some(participant),
139            _t: PhantomData,
140        }
141    }
142
143    /// Erzeugt einen Topic-Handle **ohne** Participant — fuer Builtin-
144    /// Topics (DCPSParticipant/Topic/Publication/Subscription), die im
145    /// BuiltinSubscriber-Konstruktor entstehen, bevor der DomainParticipant
146    /// fertig ist (Henne-Ei-Problem). `get_participant()` panickt
147    /// auf einem orphan-Topic — Builtin-Reader umgehen das.
148    #[must_use]
149    pub fn new_orphan(name: String, qos: TopicQos) -> Self {
150        Self {
151            inner: Arc::new(TopicInner {
152                name,
153                type_name: T::TYPE_NAME,
154                #[cfg(feature = "std")]
155                qos: std::sync::Mutex::new(qos),
156                #[cfg(not(feature = "std"))]
157                qos,
158                entity_state: crate::entity::EntityState::new(),
159                #[cfg(feature = "std")]
160                listener: std::sync::Mutex::new(None),
161                #[cfg(feature = "std")]
162                inconsistent_topic_count: std::sync::atomic::AtomicI64::new(0),
163                #[cfg(feature = "std")]
164                last_inconsistent_topic: std::sync::atomic::AtomicI64::new(-1),
165            }),
166            participant: None,
167            _t: PhantomData,
168        }
169    }
170
171    /// Topic-Name.
172    #[must_use]
173    pub fn name(&self) -> &str {
174        &self.inner.name
175    }
176
177    /// Type-Name (aus `T::TYPE_NAME`).
178    #[must_use]
179    pub fn type_name(&self) -> &'static str {
180        self.inner.type_name
181    }
182
183    /// setzt den `TopicListener` + StatusMask. `None` loescht
184    /// den Slot. Spec §2.2.2.3.2.x set_listener.
185    #[cfg(feature = "std")]
186    pub fn set_listener(&self, listener: Option<ArcTopicListener>, mask: StatusMask) {
187        if let Ok(mut slot) = self.inner.listener.lock() {
188            *slot = listener.map(|l| (l, mask));
189        }
190        self.inner.entity_state.set_listener_mask(mask);
191    }
192
193    /// aktueller Listener-Klon, sofern vorhanden.
194    #[cfg(feature = "std")]
195    #[must_use]
196    pub fn get_listener(&self) -> Option<ArcTopicListener> {
197        self.inner
198            .listener
199            .lock()
200            .ok()
201            .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
202    }
203
204    /// Snapshot der Bubble-Up-Kette (Topic → Participant) — fuer
205    /// Hot-Path-Listener-Dispatch (z.B. on_inconsistent_topic).
206    ///
207    /// Aktuell hat der Hot-Path keinen Inconsistent-Topic-Trigger
208    /// (dazu braucht es Cross-Vendor Type-Mismatch-Detection in WP
209    /// 2.x); die Snapshot-API existiert bereits, damit sie spaeter
210    /// ohne API-Change verdrahtet werden kann.
211    #[cfg(feature = "std")]
212    #[must_use]
213    pub(crate) fn listener_chain(&self) -> crate::listener_dispatch::TopicListenerChain {
214        let topic = self
215            .inner
216            .listener
217            .lock()
218            .ok()
219            .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
220        let participant = self
221            .participant
222            .as_ref()
223            .and_then(|p| p.snapshot_listener());
224        crate::listener_dispatch::TopicListenerChain { topic, participant }
225    }
226
227    /// Recordet eine InconsistentTopic-Detection (z.B. wenn
228    /// ein remote Topic mit demselben Namen, aber anderem Type-Name
229    /// entdeckt wurde, oder wenn `create_topic` mit gleichem Namen aber
230    /// unterschiedlichem Type aufgerufen wird). Spec §2.2.4.2.5.
231    #[cfg(feature = "std")]
232    pub fn record_inconsistent_topic(&self) {
233        self.inner
234            .inconsistent_topic_count
235            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
236    }
237
238    /// aktueller `InconsistentTopicStatus` + Trigger via
239    /// Bubble-Up bei Delta gegenueber dem letzten Aufruf.
240    #[cfg(feature = "std")]
241    #[must_use]
242    pub fn inconsistent_topic_status(&self) -> crate::status::InconsistentTopicStatus {
243        let curr = self
244            .inner
245            .inconsistent_topic_count
246            .load(std::sync::atomic::Ordering::Acquire);
247        let prev = self
248            .inner
249            .last_inconsistent_topic
250            .swap(curr, std::sync::atomic::Ordering::AcqRel);
251        let delta = if prev < 0 { curr } else { curr - prev };
252        let status = crate::status::InconsistentTopicStatus {
253            total_count: curr as i32,
254            total_count_change: delta as i32,
255        };
256        // Trigger Listener nur bei tatsaechlichem Delta — sonst wird der
257        // Listener bei jedem Status-Read mehrfach gefeuert (idempotent).
258        let actually_changed = if prev < 0 { curr != 0 } else { prev != curr };
259        if actually_changed {
260            let chain = self.listener_chain();
261            crate::listener_dispatch::dispatch_inconsistent_topic(
262                &chain,
263                self.inner.entity_state.instance_handle(),
264                status,
265            );
266        }
267        status
268    }
269
270    /// Topic-QoS (cloned).  Inner-Mutex erlaubt set_qos.
271    #[must_use]
272    pub fn qos(&self) -> TopicQos {
273        #[cfg(feature = "std")]
274        {
275            self.inner.qos.lock().map(|q| q.clone()).unwrap_or_default()
276        }
277        #[cfg(not(feature = "std"))]
278        {
279            self.inner.qos.clone()
280        }
281    }
282
283    /// Interner Zugriff auf den shared state (fuer Publisher/Subscriber).
284    #[allow(dead_code)]
285    pub(crate) fn inner(&self) -> Arc<TopicInner> {
286        Arc::clone(&self.inner)
287    }
288
289    /// Interner Konstruktor fuer shared-handle-Rehydrierung in der
290    /// Participant-Topic-Registry (`create_topic` zweite Runde
291    /// mit gleichem Namen + Typ).
292    pub(crate) fn _from_inner_impl(inner: Arc<TopicInner>, participant: DomainParticipant) -> Self {
293        Self {
294            inner,
295            participant: Some(participant),
296            _t: PhantomData,
297        }
298    }
299}
300
301impl<T: DdsType> Clone for Topic<T> {
302    fn clone(&self) -> Self {
303        Self {
304            inner: Arc::clone(&self.inner),
305            participant: self.participant.clone(),
306            _t: PhantomData,
307        }
308    }
309}
310
311impl<T: DdsType> TopicDescription for Topic<T> {
312    fn get_type_name(&self) -> &str {
313        self.inner.type_name
314    }
315    fn get_name(&self) -> &str {
316        &self.inner.name
317    }
318    /// Liefert den Owning-Participant. Panickt bei Builtin-Topics
319    /// (orphan), die ueber [`Topic::new_orphan`] konstruiert wurden —
320    /// Builtin-Reader rufen `get_participant()` nicht.
321    #[allow(clippy::expect_used, clippy::panic)]
322    fn get_participant(&self) -> &DomainParticipant {
323        match &self.participant {
324            Some(p) => p,
325            None => panic!(
326                "get_participant on orphan (builtin) topic — builtin readers must not call this"
327            ),
328        }
329    }
330}
331
332// ============================================================================
333// Entity-Trait (DCPS §2.2.2.1) —
334// ============================================================================
335
336#[cfg(feature = "std")]
337impl<T: DdsType> crate::entity::Entity for Topic<T> {
338    type Qos = TopicQos;
339
340    fn get_qos(&self) -> Self::Qos {
341        self.inner.qos.lock().map(|q| q.clone()).unwrap_or_default()
342    }
343
344    /// TopicQos. Spec §2.2.3: alle Topic-Policies (TOPIC_DATA, DURABILITY,
345    /// RELIABILITY, HISTORY, RESOURCE_LIMITS, ...) sind nach `enable()`
346    /// **immutable** — nur TOPIC_DATA ist Changeable=YES.
347    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
348        let enabled = self.inner.entity_state.is_enabled();
349        if let Ok(mut current) = self.inner.qos.lock() {
350            if enabled {
351                // Spec §2.2.3: DURABILITY und RELIABILITY sind
352                // Changeable=NO post-enable.
353                if current.durability != qos.durability {
354                    return Err(crate::entity::immutable_if_enabled("DURABILITY"));
355                }
356                if current.reliability != qos.reliability {
357                    return Err(crate::entity::immutable_if_enabled("RELIABILITY"));
358                }
359            }
360            *current = qos;
361        }
362        Ok(())
363    }
364
365    fn enable(&self) -> Result<()> {
366        self.inner.entity_state.enable();
367        Ok(())
368    }
369
370    fn entity_state(&self) -> Arc<crate::entity::EntityState> {
371        Arc::clone(&self.inner.entity_state)
372    }
373}
374
375/// Untypisiertes `TopicDescription`-Handle, das aus
376/// `DomainParticipant::lookup_topicdescription` /
377/// `DomainParticipant::find_topic` zurueckgegeben wird.
378///
379/// Die Spec-API liefert die abstrakte Base-Klasse, weil der Caller
380/// im Generalfall den Typ noch nicht kennt (z.B. nach Discovery).
381/// Wir packen Name + Type-Name + Participant in ein clone-bares
382/// Handle; der Caller kann `get_type_name()` lesen und sich dann
383/// per `create_topic::<T>` einen typed Handle holen.
384#[derive(Debug, Clone)]
385pub struct TopicDescriptionHandle {
386    name: String,
387    type_name: String,
388    participant: DomainParticipant,
389}
390
391impl TopicDescriptionHandle {
392    /// Konstruiere ein Handle (intern; v.a. fuer Tests + Lookup-API).
393    pub(crate) fn new(name: String, type_name: String, participant: DomainParticipant) -> Self {
394        Self {
395            name,
396            type_name,
397            participant,
398        }
399    }
400}
401
402impl TopicDescription for TopicDescriptionHandle {
403    fn get_type_name(&self) -> &str {
404        &self.type_name
405    }
406    fn get_name(&self) -> &str {
407        &self.name
408    }
409    fn get_participant(&self) -> &DomainParticipant {
410        &self.participant
411    }
412}
413
414/// `ContentFilteredTopic<T>` — Sub-Topic eines `Topic<T>` mit
415/// Filter-Expression. Spec-Referenz: OMG DDS 1.4 §2.2.2.3.3.
416///
417/// "ContentFilteredTopic is a specialization of TopicDescription that
418/// allows for content-based subscriptions. The selection of the
419/// content is done using a filter_expression with parameters
420/// (filter_parameters)."
421///
422/// Die Filter-Expression ist ein SQL-Subset (DDS-DCPS Annex B, BNF):
423/// `field op literal-or-param`, `AND`/`OR`/`NOT`-Komposition, parens,
424/// `LIKE`. Der konkrete Parser sitzt in `zerodds-sql-filter`.
425///
426/// **Lifecycle:** das CFT haelt einen Klon des Related-Topics (shared
427/// `Arc<TopicInner>`) und ist clone-bar selbst. Filter-Parameter sind
428/// veraenderbar zur Laufzeit (Spec §2.2.2.3.3.7).
429#[derive(Debug)]
430pub struct ContentFilteredTopic<T: DdsType> {
431    name: String,
432    related_topic: Topic<T>,
433    /// Roh-Form der Expression (read-only — Spec ist explizit, dass
434    /// nur die `filter_parameters` veraenderbar sind, nicht die
435    /// Expression selbst).
436    filter_expression: String,
437    /// Vorab-geparste AST. Wir parsen einmal beim Konstruktor, damit
438    /// `evaluate` heiss-pfad ist.
439    parsed: Arc<Expr>,
440    /// Filter-Parameter (`%0`, `%1`, ...). Werden als Strings
441    /// uebergeben (Spec §2.2.2.3.3 — die Parameter sind Strings, die
442    /// in die Expression substituiert werden). Wir tragen sie als
443    /// Strings + parsed `Value` parallel, damit `set_filter_parameters`
444    /// nur einmal die String→Value-Konversion macht.
445    #[cfg(feature = "std")]
446    params: Arc<RwLock<FilterParams>>,
447    #[cfg(not(feature = "std"))]
448    params: FilterParams,
449    participant: DomainParticipant,
450    _t: PhantomData<T>,
451}
452
453#[derive(Debug, Clone)]
454struct FilterParams {
455    raw: Vec<String>,
456    values: Vec<Value>,
457}
458
459impl<T: DdsType> ContentFilteredTopic<T> {
460    /// Konstruktor (intern aufgerufen von
461    /// `DomainParticipant::create_contentfilteredtopic`).
462    ///
463    /// # Errors
464    /// `BadParameter` wenn die Expression nicht parst oder ein
465    /// referenzierter `%N`-Index nicht im `filter_parameters`-Vec
466    /// existiert.
467    pub(crate) fn new(
468        name: String,
469        related_topic: Topic<T>,
470        filter_expression: String,
471        filter_parameters: Vec<String>,
472        participant: DomainParticipant,
473    ) -> Result<Self> {
474        let parsed =
475            zerodds_sql_filter::parse(&filter_expression).map_err(|_| DdsError::BadParameter {
476                what: "filter expression syntax",
477            })?;
478        // Pruefe Parameter-Indices.
479        let used = parsed.collect_param_indices();
480        if let Some(max) = used.iter().max() {
481            if (*max as usize) >= filter_parameters.len() {
482                return Err(DdsError::BadParameter {
483                    what: "filter parameter %N out of range",
484                });
485            }
486        }
487        let values: Vec<Value> = filter_parameters
488            .iter()
489            .map(|s| param_string_to_value(s))
490            .collect();
491        let fp = FilterParams {
492            raw: filter_parameters,
493            values,
494        };
495        Ok(Self {
496            name,
497            related_topic,
498            filter_expression,
499            parsed: Arc::new(parsed),
500            #[cfg(feature = "std")]
501            params: Arc::new(RwLock::new(fp)),
502            #[cfg(not(feature = "std"))]
503            params: fp,
504            participant,
505            _t: PhantomData,
506        })
507    }
508
509    /// Spec §2.2.2.3.3.4 `get_filter_expression`.
510    #[must_use]
511    pub fn get_filter_expression(&self) -> &str {
512        &self.filter_expression
513    }
514
515    /// Spec §2.2.2.3.3.5 `get_filter_parameters`.
516    #[must_use]
517    pub fn get_filter_parameters(&self) -> Vec<String> {
518        #[cfg(feature = "std")]
519        {
520            self.params
521                .read()
522                .map(|p| p.raw.clone())
523                .unwrap_or_default()
524        }
525        #[cfg(not(feature = "std"))]
526        {
527            self.params.raw.clone()
528        }
529    }
530
531    /// Spec §2.2.2.3.3.6 `set_filter_parameters`. Tauscht die
532    /// Parameter (gleiche Anzahl bleibt nicht erzwungen — die Spec
533    /// sagt "should match the number of `%n` tokens" als Empfehlung,
534    /// wir verifizieren das hart).
535    ///
536    /// # Errors
537    /// `BadParameter` wenn ein `%N`-Index ausserhalb des neuen Vecs
538    /// liegt.
539    pub fn set_filter_parameters(&self, params: Vec<String>) -> Result<()> {
540        let used = self.parsed.collect_param_indices();
541        if let Some(max) = used.iter().max() {
542            if (*max as usize) >= params.len() {
543                return Err(DdsError::BadParameter {
544                    what: "filter parameter %N out of range",
545                });
546            }
547        }
548        let values: Vec<Value> = params.iter().map(|s| param_string_to_value(s)).collect();
549        let fp = FilterParams {
550            raw: params,
551            values,
552        };
553        #[cfg(feature = "std")]
554        {
555            let mut w = self
556                .params
557                .write()
558                .map_err(|_| DdsError::PreconditionNotMet {
559                    reason: "filter params poisoned",
560                })?;
561            *w = fp;
562        }
563        #[cfg(not(feature = "std"))]
564        {
565            // im no_std-Pfad nicht mutable gehalten — Parameter werden
566            // beim Konstruktor gesetzt; dieser Pfad ist im v1.2-MVP
567            // ohnehin nicht aktiv (dcps verlangt std).
568            let _ = fp;
569            return Err(DdsError::PreconditionNotMet {
570                reason: "set_filter_parameters needs std feature",
571            });
572        }
573        Ok(())
574    }
575
576    /// Spec §2.2.2.3.3.3 `get_related_topic`.
577    #[must_use]
578    pub fn get_related_topic(&self) -> &Topic<T> {
579        &self.related_topic
580    }
581
582    /// Wertet den Filter gegen ein decodiertes Sample aus. Liefert
583    /// `Ok(true)` wenn das Sample passieren soll, `Ok(false)` wenn es
584    /// gefiltert wird, `Err` wenn die Expression auf das Row-Schema
585    /// nicht passt (Caller-Entscheidung: filter denies oder error).
586    ///
587    /// # Errors
588    /// - `PreconditionNotMet` wenn die Filter-Parameter-Lock vergiftet ist.
589    /// - `BadParameter` wenn ein Feld in der Expression im Row nicht
590    ///   existiert oder ein Type-Mismatch auftritt.
591    pub fn evaluate<R: RowAccess>(&self, row: &R) -> Result<bool> {
592        #[cfg(feature = "std")]
593        let params = {
594            let r = self
595                .params
596                .read()
597                .map_err(|_| DdsError::PreconditionNotMet {
598                    reason: "filter params poisoned",
599                })?;
600            r.values.clone()
601        };
602        #[cfg(not(feature = "std"))]
603        let params = self.params.values.clone();
604        self.parsed
605            .evaluate(row, &params)
606            .map_err(|e| DdsError::BadParameter {
607                what: match e {
608                    zerodds_sql_filter::EvalError::UnknownField(_) => "filter unknown field",
609                    zerodds_sql_filter::EvalError::MissingParam(_) => "filter missing param",
610                    zerodds_sql_filter::EvalError::TypeMismatch(_) => "filter type mismatch",
611                },
612            })
613    }
614}
615
616impl<T: DdsType> Clone for ContentFilteredTopic<T> {
617    fn clone(&self) -> Self {
618        Self {
619            name: self.name.clone(),
620            related_topic: self.related_topic.clone(),
621            filter_expression: self.filter_expression.clone(),
622            parsed: Arc::clone(&self.parsed),
623            #[cfg(feature = "std")]
624            params: Arc::clone(&self.params),
625            #[cfg(not(feature = "std"))]
626            params: self.params.clone(),
627            participant: self.participant.clone(),
628            _t: PhantomData,
629        }
630    }
631}
632
633impl<T: DdsType> TopicDescription for ContentFilteredTopic<T> {
634    /// Type-Name kommt vom Related-Topic — ein CFT teilt sich das
635    /// Schema mit dem unterliegenden Topic.
636    fn get_type_name(&self) -> &str {
637        self.related_topic.type_name()
638    }
639    fn get_name(&self) -> &str {
640        &self.name
641    }
642    fn get_participant(&self) -> &DomainParticipant {
643        &self.participant
644    }
645}
646
647/// `MultiTopic<T>` — Spec §2.2.2.3.4 (DDS 1.4 optionales Feature).
648///
649/// Eine MultiTopic kombiniert mehrere Underlying-Topics in einer
650/// einzigen TopicDescription via `subscription_expression` (SQL-
651/// Subset). Der Resulting-Type `T` ist User-definiert; die
652/// Underlying-Topics koennen unterschiedliche Types haben.
653///
654/// **Cross-Topic-Sample-Routing:** Der Join-Operator ist ueber
655/// [`hash_join_two`] und [`Self::evaluate_joined`] live. Die
656/// `subscription_expression` referenziert Felder als
657/// `<topic_name>.<field_path>` (dotted), und [`JoinedRow`]
658/// dispatched die Lookups an die matchenden Topic-Quellen.
659pub struct MultiTopic<T: DdsType> {
660    name: String,
661    type_name: String,
662    /// Liste der Related-Topics als untypisierte Handles.
663    related_topic_names: Vec<String>,
664    subscription_expression: String,
665    /// Vorab-geparste AST der Subscription-Expression.
666    parsed: Arc<Expr>,
667    /// Mutable Filter-Parameter (Spec §2.2.2.3.4.7
668    /// `set_expression_parameters`).
669    #[cfg(feature = "std")]
670    params: Arc<RwLock<FilterParams>>,
671    #[cfg(not(feature = "std"))]
672    params: FilterParams,
673    participant: DomainParticipant,
674    _t: PhantomData<T>,
675}
676
677impl<T: DdsType> core::fmt::Debug for MultiTopic<T> {
678    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
679        f.debug_struct("MultiTopic")
680            .field("name", &self.name)
681            .field("type_name", &self.type_name)
682            .field("related_topic_names", &self.related_topic_names)
683            .field("subscription_expression", &self.subscription_expression)
684            .finish_non_exhaustive()
685    }
686}
687
688impl<T: DdsType> MultiTopic<T> {
689    /// Konstruktor (intern aufgerufen von
690    /// `DomainParticipant::create_multitopic`).
691    ///
692    /// # Errors
693    /// `BadParameter` bei leerem Namen oder leerer Expression, oder
694    /// wenn die Expression nicht parst, oder wenn ein referenzierter
695    /// `%N`-Parameter ausserhalb des `expression_parameters`-Vec liegt,
696    /// oder wenn `related_topic_names` leer ist.
697    pub(crate) fn new(
698        name: String,
699        type_name: String,
700        related_topic_names: Vec<String>,
701        subscription_expression: String,
702        expression_parameters: Vec<String>,
703        participant: DomainParticipant,
704    ) -> Result<Self> {
705        if related_topic_names.is_empty() {
706            return Err(DdsError::BadParameter {
707                what: "multitopic needs at least one related topic",
708            });
709        }
710        let parsed = zerodds_sql_filter::parse(&subscription_expression).map_err(|_| {
711            DdsError::BadParameter {
712                what: "multitopic subscription expression syntax",
713            }
714        })?;
715        let used = parsed.collect_param_indices();
716        if let Some(max) = used.iter().max() {
717            if (*max as usize) >= expression_parameters.len() {
718                return Err(DdsError::BadParameter {
719                    what: "multitopic expression parameter %N out of range",
720                });
721            }
722        }
723        let values: Vec<Value> = expression_parameters
724            .iter()
725            .map(|s| param_string_to_value(s))
726            .collect();
727        let fp = FilterParams {
728            raw: expression_parameters,
729            values,
730        };
731        Ok(Self {
732            name,
733            type_name,
734            related_topic_names,
735            subscription_expression,
736            parsed: Arc::new(parsed),
737            #[cfg(feature = "std")]
738            params: Arc::new(RwLock::new(fp)),
739            #[cfg(not(feature = "std"))]
740            params: fp,
741            participant,
742            _t: PhantomData,
743        })
744    }
745
746    /// Spec §2.2.2.3.4.4 `get_subscription_expression`.
747    #[must_use]
748    pub fn get_subscription_expression(&self) -> &str {
749        &self.subscription_expression
750    }
751
752    /// Spec §2.2.2.3.4.5 `get_expression_parameters`.
753    #[must_use]
754    pub fn get_expression_parameters(&self) -> Vec<String> {
755        #[cfg(feature = "std")]
756        {
757            self.params
758                .read()
759                .map(|p| p.raw.clone())
760                .unwrap_or_default()
761        }
762        #[cfg(not(feature = "std"))]
763        {
764            self.params.raw.clone()
765        }
766    }
767
768    /// Spec §2.2.2.3.4.6 `set_expression_parameters`.
769    ///
770    /// # Errors
771    /// `BadParameter` wenn ein referenzierter `%N`-Parameter
772    /// ausserhalb des neuen Vec liegt; `PreconditionNotMet` bei
773    /// Lock-Poisoning.
774    pub fn set_expression_parameters(&self, params: Vec<String>) -> Result<()> {
775        let used = self.parsed.collect_param_indices();
776        if let Some(max) = used.iter().max() {
777            if (*max as usize) >= params.len() {
778                return Err(DdsError::BadParameter {
779                    what: "multitopic expression parameter %N out of range",
780                });
781            }
782        }
783        let values: Vec<Value> = params.iter().map(|s| param_string_to_value(s)).collect();
784        let fp = FilterParams {
785            raw: params,
786            values,
787        };
788        #[cfg(feature = "std")]
789        {
790            let mut w = self
791                .params
792                .write()
793                .map_err(|_| DdsError::PreconditionNotMet {
794                    reason: "multitopic params poisoned",
795                })?;
796            *w = fp;
797        }
798        #[cfg(not(feature = "std"))]
799        {
800            let _ = fp;
801            return Err(DdsError::PreconditionNotMet {
802                reason: "set_expression_parameters needs std feature",
803            });
804        }
805        Ok(())
806    }
807
808    /// Spec §2.2.2.3.4.3 `get_related_topic` (PSM gibt eine Sequence
809    /// zurueck — wir liefern die Namen).
810    #[must_use]
811    pub fn get_related_topic_names(&self) -> &[String] {
812        &self.related_topic_names
813    }
814
815    /// Wertet die `subscription_expression` gegen ein [`JoinedRow`]
816    /// aus (Cross-Topic-Predicate). Spec §2.2.2.3.4.
817    ///
818    /// # Errors
819    /// `PreconditionNotMet` bei Lock-Poisoning oder SQL-Eval-Fehler.
820    pub fn evaluate_joined(&self, row: &JoinedRow<'_>) -> Result<bool> {
821        #[cfg(feature = "std")]
822        let values = {
823            let p = self
824                .params
825                .read()
826                .map_err(|_| DdsError::PreconditionNotMet {
827                    reason: "multitopic params poisoned",
828                })?;
829            p.values.clone()
830        };
831        #[cfg(not(feature = "std"))]
832        let values = self.params.values.clone();
833        self.parsed
834            .evaluate(row, &values)
835            .map_err(|_| DdsError::PreconditionNotMet {
836                reason: "multitopic SQL evaluation failed",
837            })
838    }
839}
840
841/// `JoinedRow` — `RowAccess`-Adapter ueber mehrere benannte
842/// Topic-Quellen. Dotted-Pfade `topic.field.sub` werden am ersten
843/// `.` geteilt: der Praefix matcht den Topic-Namen, der Rest wird
844/// an dessen [`RowAccess::get`] weitergereicht.
845///
846/// Wenn keine Topic-Quelle den Praefix kennt, wird der gesamte Pfad
847/// pauschal an alle Quellen weitergereicht (Fallback fuer
848/// undotted-Feld-Referenzen).
849pub struct JoinedRow<'a> {
850    sources: Vec<(String, &'a dyn RowAccess)>,
851}
852
853impl<'a> JoinedRow<'a> {
854    /// Konstruktor.
855    #[must_use]
856    pub fn new(sources: Vec<(String, &'a dyn RowAccess)>) -> Self {
857        Self { sources }
858    }
859}
860
861impl RowAccess for JoinedRow<'_> {
862    fn get(&self, path: &str) -> Option<Value> {
863        if let Some((prefix, rest)) = path.split_once('.') {
864            for (name, src) in &self.sources {
865                if name == prefix {
866                    return src.get(rest);
867                }
868            }
869        }
870        // Kein Praefix-Match -> alle Quellen abfragen, erste Antwort
871        // gewinnt.
872        for (_, src) in &self.sources {
873            if let Some(v) = src.get(path) {
874                return Some(v);
875            }
876        }
877        None
878    }
879}
880
881/// Hash-Join-Helper fuer zwei typed Streams. Iteriert die linke
882/// Liste, baut eine HashMap `key -> [&L]` (Build-Phase), iteriert
883/// dann die rechte Liste (Probe-Phase) und erzeugt fuer jedes
884/// matchende `(L, R)`-Paar via `combine` ein Resultat. Optional
885/// wird ein zusaetzliches Predicate via `predicate` (z.B.
886/// `MultiTopic::evaluate_joined`) auf jedem Paar geprueft.
887///
888/// Spec: §2.2.2.3.4 (MultiTopic) — der Hash-Join ist die idiomatische
889/// O(n+m)-Implementation der `subscription_expression`.
890#[cfg(feature = "std")]
891#[allow(clippy::too_many_arguments)]
892pub fn hash_join_two<L, R, T, KL, KR, C, P>(
893    left: &[L],
894    left_topic: &str,
895    key_left: KL,
896    right: &[R],
897    right_topic: &str,
898    key_right: KR,
899    combine: C,
900    predicate: P,
901) -> Vec<T>
902where
903    L: RowAccess,
904    R: RowAccess,
905    KL: Fn(&L) -> String,
906    KR: Fn(&R) -> String,
907    C: Fn(&L, &R) -> T,
908    P: Fn(&JoinedRow<'_>) -> Result<bool>,
909{
910    use std::collections::HashMap;
911    let mut idx: HashMap<String, Vec<&L>> = HashMap::with_capacity(left.len());
912    for l in left {
913        idx.entry(key_left(l)).or_default().push(l);
914    }
915    let mut out = Vec::new();
916    for r in right {
917        let k = key_right(r);
918        let Some(matches) = idx.get(&k) else { continue };
919        for l in matches {
920            let row = JoinedRow::new(alloc::vec![
921                (left_topic.to_string(), *l as &dyn RowAccess),
922                (right_topic.to_string(), r as &dyn RowAccess),
923            ]);
924            if predicate(&row).unwrap_or(false) {
925                out.push(combine(l, r));
926            }
927        }
928    }
929    out
930}
931
932impl<T: DdsType> Clone for MultiTopic<T> {
933    fn clone(&self) -> Self {
934        Self {
935            name: self.name.clone(),
936            type_name: self.type_name.clone(),
937            related_topic_names: self.related_topic_names.clone(),
938            subscription_expression: self.subscription_expression.clone(),
939            parsed: Arc::clone(&self.parsed),
940            #[cfg(feature = "std")]
941            params: Arc::clone(&self.params),
942            #[cfg(not(feature = "std"))]
943            params: self.params.clone(),
944            participant: self.participant.clone(),
945            _t: PhantomData,
946        }
947    }
948}
949
950impl<T: DdsType> TopicDescription for MultiTopic<T> {
951    fn get_type_name(&self) -> &str {
952        &self.type_name
953    }
954    fn get_name(&self) -> &str {
955        &self.name
956    }
957    fn get_participant(&self) -> &DomainParticipant {
958        &self.participant
959    }
960}
961
962/// Heuristik-Konversion eines `filter_parameter`-Strings in einen
963/// `Value`: erst Bool, dann Int, dann Float, sonst String. Wir
964/// strippen flankierende `'...'`-Anfuehrungszeichen, weil die Spec-
965/// Beispiele die Strings oft so liefern.
966fn param_string_to_value(s: &str) -> Value {
967    let trimmed = s.trim();
968    // Bool.
969    if trimmed.eq_ignore_ascii_case("TRUE") {
970        return Value::Bool(true);
971    }
972    if trimmed.eq_ignore_ascii_case("FALSE") {
973        return Value::Bool(false);
974    }
975    // Int.
976    if let Ok(i) = trimmed.parse::<i64>() {
977        return Value::Int(i);
978    }
979    // Float.
980    if let Ok(f) = trimmed.parse::<f64>() {
981        return Value::Float(f);
982    }
983    // String (mit optionalem ''-strip).
984    if trimmed.len() >= 2 && trimmed.starts_with('\'') && trimmed.ends_with('\'') {
985        return Value::String(trimmed[1..trimmed.len() - 1].to_string());
986    }
987    Value::String(trimmed.to_string())
988}
989
990#[cfg(test)]
991#[allow(clippy::expect_used, clippy::unwrap_used)]
992mod tests {
993    use super::*;
994    use crate::dds_type::RawBytes;
995    use crate::factory::DomainParticipantFactory;
996    use crate::qos::DomainParticipantQos;
997
998    #[test]
999    fn topic_implements_topic_description() {
1000        let p = DomainParticipantFactory::instance()
1001            .create_participant_offline(0, DomainParticipantQos::default());
1002        let t = p
1003            .create_topic::<RawBytes>("Chatter", TopicQos::default())
1004            .unwrap();
1005        // Trait-Methoden funktionieren.
1006        let td: &dyn TopicDescription = &t;
1007        assert_eq!(td.get_name(), "Chatter");
1008        assert_eq!(td.get_type_name(), RawBytes::TYPE_NAME);
1009        assert_eq!(td.get_participant().domain_id(), 0);
1010    }
1011
1012    #[test]
1013    fn topic_description_handle_is_cloneable() {
1014        let p = DomainParticipantFactory::instance()
1015            .create_participant_offline(7, DomainParticipantQos::default());
1016        let h = TopicDescriptionHandle::new("X".into(), "T".into(), p.clone());
1017        let h2 = h.clone();
1018        assert_eq!(h2.get_name(), "X");
1019        assert_eq!(h2.get_type_name(), "T");
1020        assert_eq!(h2.get_participant().domain_id(), 7);
1021    }
1022
1023    #[test]
1024    fn topic_description_trait_is_object_safe() {
1025        // Object-safety verifizieren: wir koennen `&dyn TopicDescription`
1026        // halten und nicht-statisch dispatchen. Aus mehreren konkreten
1027        // Implementierungen sammeln.
1028        let p = DomainParticipantFactory::instance()
1029            .create_participant_offline(8, DomainParticipantQos::default());
1030        let t = p
1031            .create_topic::<RawBytes>("DynA", TopicQos::default())
1032            .unwrap();
1033        let h = TopicDescriptionHandle::new("DynB".into(), "T".into(), p.clone());
1034        let descs: Vec<&dyn TopicDescription> = vec![&t, &h];
1035        assert_eq!(descs.len(), 2);
1036        assert_eq!(descs[0].get_name(), "DynA");
1037        assert_eq!(descs[1].get_name(), "DynB");
1038    }
1039
1040    #[test]
1041    fn topic_description_create_topic_rejects_empty_name() {
1042        // §2.2.2.2.1.4 create_topic: leerer Topic-Name → BadParameter.
1043        // Das ist die einzige Stelle, wo TopicDescription-Konsumenten
1044        // einen ungueltigen TopicDescription-Bauversuch erleben.
1045        let p = DomainParticipantFactory::instance()
1046            .create_participant_offline(9, DomainParticipantQos::default());
1047        let res = p.create_topic::<RawBytes>("", TopicQos::default());
1048        assert!(matches!(
1049            res,
1050            Err(crate::error::DdsError::BadParameter { .. })
1051        ));
1052    }
1053
1054    // -------- MultiTopic (§2.2.2.3.4) --------
1055
1056    #[test]
1057    fn multitopic_compiles_and_implements_topic_description() {
1058        let p = DomainParticipantFactory::instance()
1059            .create_participant_offline(13, DomainParticipantQos::default());
1060        let mt = p
1061            .create_multitopic::<RawBytes>(
1062                "Combined",
1063                "MyResultType",
1064                alloc::vec!["TopicA".into(), "TopicB".into()],
1065                "x > %0",
1066                alloc::vec!["10".into()],
1067            )
1068            .unwrap();
1069        let td: &dyn TopicDescription = &mt;
1070        assert_eq!(td.get_name(), "Combined");
1071        assert_eq!(td.get_type_name(), "MyResultType");
1072        assert_eq!(td.get_participant().domain_id(), 13);
1073        assert_eq!(mt.get_subscription_expression(), "x > %0");
1074        assert_eq!(mt.get_related_topic_names().len(), 2);
1075        assert_eq!(mt.get_expression_parameters().len(), 1);
1076    }
1077
1078    #[test]
1079    fn multitopic_set_expression_parameters_roundtrip() {
1080        let p = DomainParticipantFactory::instance()
1081            .create_participant_offline(0, DomainParticipantQos::default());
1082        let mt = p
1083            .create_multitopic::<RawBytes>(
1084                "MT",
1085                "T",
1086                alloc::vec!["A".into()],
1087                "v = %0",
1088                alloc::vec!["100".into()],
1089            )
1090            .unwrap();
1091        assert_eq!(
1092            mt.get_expression_parameters(),
1093            alloc::vec!["100".to_string()]
1094        );
1095        mt.set_expression_parameters(alloc::vec!["200".into()])
1096            .unwrap();
1097        assert_eq!(
1098            mt.get_expression_parameters(),
1099            alloc::vec!["200".to_string()]
1100        );
1101    }
1102
1103    #[test]
1104    fn multitopic_rejects_empty_name() {
1105        let p = DomainParticipantFactory::instance()
1106            .create_participant_offline(0, DomainParticipantQos::default());
1107        let res = p.create_multitopic::<RawBytes>(
1108            "",
1109            "T",
1110            alloc::vec!["A".into()],
1111            "x > 0",
1112            alloc::vec::Vec::new(),
1113        );
1114        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1115    }
1116
1117    #[test]
1118    fn multitopic_rejects_empty_type_name() {
1119        let p = DomainParticipantFactory::instance()
1120            .create_participant_offline(0, DomainParticipantQos::default());
1121        let res = p.create_multitopic::<RawBytes>(
1122            "MT",
1123            "",
1124            alloc::vec!["A".into()],
1125            "x > 0",
1126            alloc::vec::Vec::new(),
1127        );
1128        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1129    }
1130
1131    #[test]
1132    fn multitopic_rejects_empty_related_topics() {
1133        let p = DomainParticipantFactory::instance()
1134            .create_participant_offline(0, DomainParticipantQos::default());
1135        let res = p.create_multitopic::<RawBytes>(
1136            "MT",
1137            "T",
1138            alloc::vec::Vec::new(),
1139            "x > 0",
1140            alloc::vec::Vec::new(),
1141        );
1142        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1143    }
1144
1145    #[test]
1146    fn multitopic_rejects_invalid_expression() {
1147        let p = DomainParticipantFactory::instance()
1148            .create_participant_offline(0, DomainParticipantQos::default());
1149        let res = p.create_multitopic::<RawBytes>(
1150            "MT",
1151            "T",
1152            alloc::vec!["A".into()],
1153            "x === bogus",
1154            alloc::vec::Vec::new(),
1155        );
1156        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1157    }
1158
1159    #[test]
1160    fn multitopic_rejects_param_index_out_of_range() {
1161        let p = DomainParticipantFactory::instance()
1162            .create_participant_offline(0, DomainParticipantQos::default());
1163        // Expression referenziert %1, params hat aber nur 1 Eintrag.
1164        let res = p.create_multitopic::<RawBytes>(
1165            "MT",
1166            "T",
1167            alloc::vec!["A".into()],
1168            "x = %1",
1169            alloc::vec!["only_zero".into()],
1170        );
1171        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1172    }
1173
1174    #[test]
1175    fn multitopic_set_params_validates_index_range() {
1176        let p = DomainParticipantFactory::instance()
1177            .create_participant_offline(0, DomainParticipantQos::default());
1178        let mt = p
1179            .create_multitopic::<RawBytes>(
1180                "MT",
1181                "T",
1182                alloc::vec!["A".into()],
1183                "x = %0 OR y = %1",
1184                alloc::vec!["a".into(), "b".into()],
1185            )
1186            .unwrap();
1187        // Set mit zu wenig Params → BadParameter.
1188        let res = mt.set_expression_parameters(alloc::vec!["only_zero".into()]);
1189        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1190    }
1191
1192    #[test]
1193    fn multitopic_clone_shares_params() {
1194        let p = DomainParticipantFactory::instance()
1195            .create_participant_offline(0, DomainParticipantQos::default());
1196        let mt = p
1197            .create_multitopic::<RawBytes>(
1198                "MT",
1199                "T",
1200                alloc::vec!["A".into()],
1201                "v = %0",
1202                alloc::vec!["init".into()],
1203            )
1204            .unwrap();
1205        let mt2 = mt.clone();
1206        // Update auf einer Klone-Instanz reflektiert sich auf der anderen
1207        // (shared Arc<RwLock<FilterParams>>).
1208        mt.set_expression_parameters(alloc::vec!["updated".into()])
1209            .unwrap();
1210        assert_eq!(
1211            mt2.get_expression_parameters(),
1212            alloc::vec!["updated".to_string()]
1213        );
1214    }
1215
1216    // -------- MultiTopic Hash-Join (§2.2.2.3.4-r-cross-topic-join) --------
1217
1218    struct OrderRow {
1219        id: i64,
1220        amount: i64,
1221    }
1222    impl RowAccess for OrderRow {
1223        fn get(&self, p: &str) -> Option<Value> {
1224            match p {
1225                "id" => Some(Value::Int(self.id)),
1226                "amount" => Some(Value::Int(self.amount)),
1227                _ => None,
1228            }
1229        }
1230    }
1231
1232    struct CustomerRow {
1233        id: i64,
1234        country: String,
1235    }
1236    impl RowAccess for CustomerRow {
1237        fn get(&self, p: &str) -> Option<Value> {
1238            match p {
1239                "id" => Some(Value::Int(self.id)),
1240                "country" => Some(Value::String(self.country.clone())),
1241                _ => None,
1242            }
1243        }
1244    }
1245
1246    #[test]
1247    fn joined_row_dispatches_dotted_paths_by_topic_prefix() {
1248        let o = OrderRow { id: 7, amount: 100 };
1249        let c = CustomerRow {
1250            id: 7,
1251            country: "DE".into(),
1252        };
1253        let row = JoinedRow::new(alloc::vec![
1254            ("Order".into(), &o as &dyn RowAccess),
1255            ("Customer".into(), &c as &dyn RowAccess),
1256        ]);
1257        assert_eq!(row.get("Order.amount"), Some(Value::Int(100)));
1258        assert_eq!(
1259            row.get("Customer.country"),
1260            Some(Value::String("DE".into()))
1261        );
1262        assert_eq!(row.get("Order.country"), None); // praefix matched -> kein Field
1263    }
1264
1265    #[test]
1266    fn joined_row_undotted_falls_back_to_first_match() {
1267        let o = OrderRow { id: 7, amount: 100 };
1268        let c = CustomerRow {
1269            id: 9,
1270            country: "DE".into(),
1271        };
1272        let row = JoinedRow::new(alloc::vec![
1273            ("Order".into(), &o as &dyn RowAccess),
1274            ("Customer".into(), &c as &dyn RowAccess),
1275        ]);
1276        // "country" gibts nur in CustomerRow → wird gefunden via Fallback.
1277        assert_eq!(row.get("country"), Some(Value::String("DE".into())));
1278        // "amount" gibt es nur in OrderRow.
1279        assert_eq!(row.get("amount"), Some(Value::Int(100)));
1280    }
1281
1282    #[test]
1283    fn multitopic_evaluate_joined_uses_dotted_paths() {
1284        let p = DomainParticipantFactory::instance()
1285            .create_participant_offline(50, DomainParticipantQos::default());
1286        let mt = p
1287            .create_multitopic::<RawBytes>(
1288                "Sales",
1289                "Sale",
1290                alloc::vec!["Order".into(), "Customer".into()],
1291                "Order.id = Customer.id AND Customer.country = %0",
1292                alloc::vec!["DE".into()],
1293            )
1294            .unwrap();
1295        let o = OrderRow { id: 1, amount: 50 };
1296        let c = CustomerRow {
1297            id: 1,
1298            country: "DE".into(),
1299        };
1300        let row = JoinedRow::new(alloc::vec![
1301            ("Order".into(), &o as &dyn RowAccess),
1302            ("Customer".into(), &c as &dyn RowAccess),
1303        ]);
1304        assert!(mt.evaluate_joined(&row).unwrap());
1305
1306        let c_us = CustomerRow {
1307            id: 1,
1308            country: "US".into(),
1309        };
1310        let row2 = JoinedRow::new(alloc::vec![
1311            ("Order".into(), &o as &dyn RowAccess),
1312            ("Customer".into(), &c_us as &dyn RowAccess),
1313        ]);
1314        assert!(!mt.evaluate_joined(&row2).unwrap());
1315    }
1316
1317    #[test]
1318    fn hash_join_two_combines_matching_rows() {
1319        let p = DomainParticipantFactory::instance()
1320            .create_participant_offline(51, DomainParticipantQos::default());
1321        let mt = p
1322            .create_multitopic::<RawBytes>(
1323                "Sales",
1324                "Sale",
1325                alloc::vec!["Order".into(), "Customer".into()],
1326                "Customer.country = %0",
1327                alloc::vec!["DE".into()],
1328            )
1329            .unwrap();
1330        let orders = alloc::vec![
1331            OrderRow { id: 1, amount: 50 },
1332            OrderRow { id: 2, amount: 70 },
1333            OrderRow { id: 3, amount: 90 },
1334        ];
1335        let customers = alloc::vec![
1336            CustomerRow {
1337                id: 1,
1338                country: "DE".into(),
1339            },
1340            CustomerRow {
1341                id: 2,
1342                country: "US".into(),
1343            },
1344            CustomerRow {
1345                id: 3,
1346                country: "DE".into(),
1347            },
1348        ];
1349        let out: alloc::vec::Vec<(i64, i64, String)> = hash_join_two(
1350            &orders,
1351            "Order",
1352            |o| o.id.to_string(),
1353            &customers,
1354            "Customer",
1355            |c| c.id.to_string(),
1356            |o, c| (o.id, o.amount, c.country.clone()),
1357            |row| mt.evaluate_joined(row),
1358        );
1359        assert_eq!(out.len(), 2);
1360        // Erwarte ids 1 und 3 (DE), nicht 2 (US).
1361        assert!(out.iter().any(|(i, _, _)| *i == 1));
1362        assert!(out.iter().any(|(i, _, _)| *i == 3));
1363        assert!(out.iter().all(|(_, _, c)| c == "DE"));
1364    }
1365
1366    #[test]
1367    fn hash_join_two_returns_empty_when_no_keys_match() {
1368        let p = DomainParticipantFactory::instance()
1369            .create_participant_offline(52, DomainParticipantQos::default());
1370        let mt = p
1371            .create_multitopic::<RawBytes>(
1372                "Sales",
1373                "Sale",
1374                alloc::vec!["Order".into(), "Customer".into()],
1375                "Order.id = Customer.id",
1376                alloc::vec::Vec::new(),
1377            )
1378            .unwrap();
1379        let orders = alloc::vec![OrderRow { id: 1, amount: 50 }];
1380        let customers = alloc::vec![CustomerRow {
1381            id: 99,
1382            country: "DE".into(),
1383        }];
1384        let out: alloc::vec::Vec<i64> = hash_join_two(
1385            &orders,
1386            "Order",
1387            |o| o.id.to_string(),
1388            &customers,
1389            "Customer",
1390            |c| c.id.to_string(),
1391            |o, _| o.id,
1392            |row| mt.evaluate_joined(row),
1393        );
1394        assert!(out.is_empty());
1395    }
1396
1397    #[test]
1398    fn hash_join_two_emits_cartesian_for_duplicate_keys() {
1399        let p = DomainParticipantFactory::instance()
1400            .create_participant_offline(53, DomainParticipantQos::default());
1401        let mt = p
1402            .create_multitopic::<RawBytes>(
1403                "Sales",
1404                "Sale",
1405                alloc::vec!["Order".into(), "Customer".into()],
1406                "Order.id = Customer.id",
1407                alloc::vec::Vec::new(),
1408            )
1409            .unwrap();
1410        // Zwei Orders mit id=1, ein Customer mit id=1
1411        let orders = alloc::vec![
1412            OrderRow { id: 1, amount: 10 },
1413            OrderRow { id: 1, amount: 20 },
1414        ];
1415        let customers = alloc::vec![CustomerRow {
1416            id: 1,
1417            country: "DE".into(),
1418        }];
1419        let out: alloc::vec::Vec<i64> = hash_join_two(
1420            &orders,
1421            "Order",
1422            |o| o.id.to_string(),
1423            &customers,
1424            "Customer",
1425            |c| c.id.to_string(),
1426            |o, _| o.amount,
1427            |row| mt.evaluate_joined(row),
1428        );
1429        assert_eq!(out.len(), 2);
1430        assert!(out.contains(&10));
1431        assert!(out.contains(&20));
1432    }
1433
1434    #[test]
1435    fn hash_join_two_predicate_can_filter_pairs() {
1436        let p = DomainParticipantFactory::instance()
1437            .create_participant_offline(54, DomainParticipantQos::default());
1438        // Predicate verlangt amount > 60.
1439        let mt = p
1440            .create_multitopic::<RawBytes>(
1441                "Sales",
1442                "Sale",
1443                alloc::vec!["Order".into(), "Customer".into()],
1444                "Order.amount > 60",
1445                alloc::vec::Vec::new(),
1446            )
1447            .unwrap();
1448        let orders = alloc::vec![
1449            OrderRow { id: 1, amount: 50 },
1450            OrderRow { id: 2, amount: 70 },
1451        ];
1452        let customers = alloc::vec![
1453            CustomerRow {
1454                id: 1,
1455                country: "DE".into(),
1456            },
1457            CustomerRow {
1458                id: 2,
1459                country: "DE".into(),
1460            },
1461        ];
1462        let out: alloc::vec::Vec<i64> = hash_join_two(
1463            &orders,
1464            "Order",
1465            |o| o.id.to_string(),
1466            &customers,
1467            "Customer",
1468            |c| c.id.to_string(),
1469            |o, _| o.amount,
1470            |row| mt.evaluate_joined(row),
1471        );
1472        // Nur amount=70 darf raus.
1473        assert_eq!(out, alloc::vec![70]);
1474    }
1475
1476    #[test]
1477    fn delete_multitopic_rejects_foreign_participant() {
1478        let p1 = DomainParticipantFactory::instance()
1479            .create_participant_offline(0, DomainParticipantQos::default());
1480        let p2 = DomainParticipantFactory::instance()
1481            .create_participant_offline(1, DomainParticipantQos::default());
1482        let mt = p1
1483            .create_multitopic::<RawBytes>(
1484                "MT",
1485                "T",
1486                alloc::vec!["A".into()],
1487                "x > 0",
1488                alloc::vec::Vec::new(),
1489            )
1490            .unwrap();
1491        let res = p2.delete_multitopic(&mt);
1492        assert!(matches!(res, Err(DdsError::BadParameter { .. })));
1493    }
1494
1495    #[test]
1496    fn topic_description_get_participant_returns_owning_participant() {
1497        // get_participant() liefert genau den Participant, an dem
1498        // create_topic aufgerufen wurde — kein anderer.
1499        let p1 = DomainParticipantFactory::instance()
1500            .create_participant_offline(11, DomainParticipantQos::default());
1501        let p2 = DomainParticipantFactory::instance()
1502            .create_participant_offline(12, DomainParticipantQos::default());
1503        let t = p1
1504            .create_topic::<RawBytes>("Owned", TopicQos::default())
1505            .unwrap();
1506        let td: &dyn TopicDescription = &t;
1507        assert_eq!(td.get_participant().domain_id(), 11);
1508        assert_ne!(td.get_participant().domain_id(), p2.domain_id());
1509    }
1510
1511    // -------- ContentFilteredTopic --------
1512
1513    use alloc::collections::BTreeMap;
1514    use zerodds_sql_filter::{RowAccess, Value};
1515
1516    struct MapRow(BTreeMap<String, Value>);
1517    impl RowAccess for MapRow {
1518        fn get(&self, path: &str) -> Option<Value> {
1519            self.0.get(path).cloned()
1520        }
1521    }
1522
1523    fn row(pairs: &[(&str, Value)]) -> MapRow {
1524        let mut m = BTreeMap::new();
1525        for (k, v) in pairs {
1526            m.insert((*k).into(), v.clone());
1527        }
1528        MapRow(m)
1529    }
1530
1531    fn mk_p(domain: i32) -> DomainParticipant {
1532        DomainParticipantFactory::instance()
1533            .create_participant_offline(domain, DomainParticipantQos::default())
1534    }
1535
1536    #[test]
1537    fn cft_compiles_and_evaluates_filter() {
1538        let p = mk_p(0);
1539        let topic = p
1540            .create_topic::<RawBytes>("Chatter", TopicQos::default())
1541            .unwrap();
1542        let cft = p
1543            .create_contentfilteredtopic("ChatterFilt", &topic, "x > 10", alloc::vec::Vec::new())
1544            .unwrap();
1545        // CFT trait works.
1546        let td: &dyn TopicDescription = &cft;
1547        assert_eq!(td.get_name(), "ChatterFilt");
1548        assert_eq!(td.get_type_name(), RawBytes::TYPE_NAME);
1549
1550        // Filter: x > 10
1551        let r_yes = row(&[("x", Value::Int(20))]);
1552        let r_no = row(&[("x", Value::Int(5))]);
1553        assert_eq!(cft.evaluate(&r_yes), Ok(true));
1554        assert_eq!(cft.evaluate(&r_no), Ok(false));
1555    }
1556
1557    #[test]
1558    fn cft_with_params_can_be_updated() {
1559        let p = mk_p(0);
1560        let topic = p
1561            .create_topic::<RawBytes>("T", TopicQos::default())
1562            .unwrap();
1563        let cft = p
1564            .create_contentfilteredtopic("Filt", &topic, "color = %0", alloc::vec!["RED".into()])
1565            .unwrap();
1566        assert_eq!(cft.get_filter_expression(), "color = %0");
1567        assert_eq!(cft.get_filter_parameters(), alloc::vec!["RED".to_string()]);
1568
1569        let r = row(&[("color", Value::String("RED".into()))]);
1570        assert_eq!(cft.evaluate(&r), Ok(true));
1571
1572        // Parameter updaten.
1573        cft.set_filter_parameters(alloc::vec!["BLUE".into()])
1574            .unwrap();
1575        assert_eq!(cft.evaluate(&r), Ok(false));
1576    }
1577
1578    #[test]
1579    fn cft_get_related_topic() {
1580        let p = mk_p(0);
1581        let topic = p
1582            .create_topic::<RawBytes>("Base", TopicQos::default())
1583            .unwrap();
1584        let cft = p
1585            .create_contentfilteredtopic("CF", &topic, "x = 1", alloc::vec::Vec::new())
1586            .unwrap();
1587        assert_eq!(cft.get_related_topic().name(), "Base");
1588    }
1589
1590    #[test]
1591    fn cft_invalid_expression_rejected() {
1592        let p = mk_p(0);
1593        let topic = p
1594            .create_topic::<RawBytes>("T", TopicQos::default())
1595            .unwrap();
1596        let err = p
1597            .create_contentfilteredtopic("CF", &topic, "x === bogus", alloc::vec::Vec::new())
1598            .unwrap_err();
1599        assert!(matches!(err, DdsError::BadParameter { .. }));
1600    }
1601
1602    #[test]
1603    fn cft_param_index_out_of_range_rejected() {
1604        let p = mk_p(0);
1605        let topic = p
1606            .create_topic::<RawBytes>("T", TopicQos::default())
1607            .unwrap();
1608        // Expression nutzt %0 + %1, aber wir liefern nur einen.
1609        let err = p
1610            .create_contentfilteredtopic("CF", &topic, "x = %0 AND y = %1", alloc::vec!["1".into()])
1611            .unwrap_err();
1612        assert!(matches!(err, DdsError::BadParameter { .. }));
1613    }
1614
1615    #[test]
1616    fn cft_set_filter_parameters_validates_count() {
1617        let p = mk_p(0);
1618        let topic = p
1619            .create_topic::<RawBytes>("T", TopicQos::default())
1620            .unwrap();
1621        let cft = p
1622            .create_contentfilteredtopic(
1623                "CF",
1624                &topic,
1625                "x = %0 AND y = %1",
1626                alloc::vec!["1".into(), "2".into()],
1627            )
1628            .unwrap();
1629        let err = cft
1630            .set_filter_parameters(alloc::vec!["1".into()])
1631            .unwrap_err();
1632        assert!(matches!(err, DdsError::BadParameter { .. }));
1633    }
1634
1635    #[test]
1636    fn cft_filter_with_string_param() {
1637        let p = mk_p(0);
1638        let topic = p
1639            .create_topic::<RawBytes>("T", TopicQos::default())
1640            .unwrap();
1641        // Strings: Caller liefert sie ohne ''-Anfuehrungszeichen
1642        // (Spec-Beispiele); die Wert-Konvertierung interpretiert sie
1643        // als String-Default.
1644        let cft = p
1645            .create_contentfilteredtopic("CF", &topic, "name LIKE %0", alloc::vec!["foo%".into()])
1646            .unwrap();
1647        let r = row(&[("name", Value::String("foobar".into()))]);
1648        assert_eq!(cft.evaluate(&r), Ok(true));
1649    }
1650
1651    #[test]
1652    fn cft_filter_with_or_and_combination() {
1653        let p = mk_p(0);
1654        let topic = p
1655            .create_topic::<RawBytes>("T", TopicQos::default())
1656            .unwrap();
1657        let cft = p
1658            .create_contentfilteredtopic(
1659                "CF",
1660                &topic,
1661                "(x > 10 AND x < 100) OR color = 'RED'",
1662                alloc::vec::Vec::new(),
1663            )
1664            .unwrap();
1665        // x in range, color irrelevant.
1666        let r1 = row(&[
1667            ("x", Value::Int(50)),
1668            ("color", Value::String("BLUE".into())),
1669        ]);
1670        assert_eq!(cft.evaluate(&r1), Ok(true));
1671        // x out, color RED.
1672        let r2 = row(&[("x", Value::Int(5)), ("color", Value::String("RED".into()))]);
1673        assert_eq!(cft.evaluate(&r2), Ok(true));
1674        // both fail.
1675        let r3 = row(&[
1676            ("x", Value::Int(5)),
1677            ("color", Value::String("BLUE".into())),
1678        ]);
1679        assert_eq!(cft.evaluate(&r3), Ok(false));
1680    }
1681
1682    #[test]
1683    fn cft_unknown_field_returns_bad_parameter() {
1684        let p = mk_p(0);
1685        let topic = p
1686            .create_topic::<RawBytes>("T", TopicQos::default())
1687            .unwrap();
1688        let cft = p
1689            .create_contentfilteredtopic("CF", &topic, "missing = 1", alloc::vec::Vec::new())
1690            .unwrap();
1691        let r = row(&[("x", Value::Int(1))]);
1692        let err = cft.evaluate(&r).unwrap_err();
1693        assert!(matches!(err, DdsError::BadParameter { .. }));
1694    }
1695
1696    #[test]
1697    fn cft_clone_shares_params() {
1698        let p = mk_p(0);
1699        let topic = p
1700            .create_topic::<RawBytes>("T", TopicQos::default())
1701            .unwrap();
1702        let cft = p
1703            .create_contentfilteredtopic("CF", &topic, "color = %0", alloc::vec!["RED".into()])
1704            .unwrap();
1705        let cft2 = cft.clone();
1706        // Update via cft → sichtbar in cft2 (Arc<RwLock>-shared).
1707        cft.set_filter_parameters(alloc::vec!["BLUE".into()])
1708            .unwrap();
1709        assert_eq!(
1710            cft2.get_filter_parameters(),
1711            alloc::vec!["BLUE".to_string()]
1712        );
1713    }
1714
1715    #[test]
1716    fn param_string_to_value_heuristics() {
1717        assert_eq!(super::param_string_to_value("42"), Value::Int(42));
1718        assert_eq!(super::param_string_to_value("2.5"), Value::Float(2.5));
1719        assert_eq!(super::param_string_to_value("TRUE"), Value::Bool(true));
1720        assert_eq!(super::param_string_to_value("False"), Value::Bool(false));
1721        assert_eq!(
1722            super::param_string_to_value("'hello'"),
1723            Value::String("hello".into())
1724        );
1725        assert_eq!(
1726            super::param_string_to_value("plain"),
1727            Value::String("plain".into())
1728        );
1729    }
1730}