Skip to main content

zerodds_corba_ccm_lib/
dds_bridge.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
//! DdsBridgeComponent — bidirectional CCM↔DDS bridge.
//!
//! Spec refs:
//! * CCM 4.0 §6.4 (EventSource/EventSink ports)
//! * DDS 1.4 §2.2.2 (DCPS Topic/Publisher/Subscriber)
//!
//! This component maps:
//!
//! * CCM EventSink → DDS DataReader (subscribes to the DDS topic)
//! * CCM EventSource → DDS DataWriter (publishes to the DDS topic)
//!
//! The concrete DCPS calls are made by the selected DDS runtime
//! (`zerodds-dcps`); this component only provides the mapping model and
//! the lifecycle hooks.
18
19use alloc::boxed::Box;
20use alloc::collections::BTreeMap;
21use alloc::string::String;
22use alloc::vec::Vec;
23
24use zerodds_corba_ccm::cif::{CifError, ComponentExecutor};
25use zerodds_corba_ccm::context::ComponentContext;
26
/// Direction of a bridge mapping: component port towards DDS, or DDS towards
/// a component port.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MappingDirection {
    /// A CCM event sink paired with a DDS DataReader (subscriber side).
    SinkSubscribesTopic,
    /// A CCM event source paired with a DDS DataWriter (publisher side).
    SourcePublishesTopic,
}
35
36/// Eine konkrete Bridge-Mapping-Definition.
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct TopicMapping {
39    /// CCM-Port-Name (Source oder Sink).
40    pub port_name: String,
41    /// DDS-Topic-Name.
42    pub topic_name: String,
43    /// Type-Name (vollqualifiziert).
44    pub type_name: String,
45    /// Mapping-Direction.
46    pub direction: MappingDirection,
47}
48
49/// Bridge-Component-Fehler.
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum BridgeError {
52    /// Mapping mit gleichem Port-Name existiert bereits.
53    DuplicatePort(String),
54    /// Component nicht aktiviert.
55    NotActive,
56    /// Generic CCM-Error.
57    Cif(CifError),
58}
59
60impl core::fmt::Display for BridgeError {
61    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62        match self {
63            Self::DuplicatePort(s) => write!(f, "duplicate port `{s}`"),
64            Self::NotActive => f.write_str("component not active"),
65            Self::Cif(e) => write!(f, "cif: {e:?}"),
66        }
67    }
68}
69
// The `std::error::Error` impl is compiled only when the `std` feature is
// enabled; it relies entirely on the trait's default methods.
#[cfg(feature = "std")]
impl std::error::Error for BridgeError {}
72
73/// DDS-Bridge-Component — production-ready CCM-Component.
74#[derive(Default)]
75pub struct DdsBridgeComponent {
76    mappings: BTreeMap<String, TopicMapping>,
77    activated: bool,
78    ctx: Option<Box<dyn ComponentContext>>,
79}
80
81impl core::fmt::Debug for DdsBridgeComponent {
82    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
83        f.debug_struct("DdsBridgeComponent")
84            .field("mappings", &self.mappings)
85            .field("activated", &self.activated)
86            .field("has_context", &self.ctx.is_some())
87            .finish()
88    }
89}
90
91impl DdsBridgeComponent {
92    /// Konstruktor.
93    #[must_use]
94    pub fn new() -> Self {
95        Self::default()
96    }
97
98    /// Fuegt ein Topic-Mapping hinzu.
99    ///
100    /// # Errors
101    /// `DuplicatePort` wenn ein Mapping mit demselben Port-Name
102    /// bereits registriert ist.
103    pub fn add_mapping(&mut self, m: TopicMapping) -> Result<(), BridgeError> {
104        if self.mappings.contains_key(&m.port_name) {
105            return Err(BridgeError::DuplicatePort(m.port_name));
106        }
107        self.mappings.insert(m.port_name.clone(), m);
108        Ok(())
109    }
110
111    /// Liste aller Mappings.
112    #[must_use]
113    pub fn mappings(&self) -> Vec<&TopicMapping> {
114        self.mappings.values().collect()
115    }
116
117    /// Anzahl Subscriber-Mappings.
118    #[must_use]
119    pub fn subscriber_count(&self) -> usize {
120        self.mappings
121            .values()
122            .filter(|m| matches!(m.direction, MappingDirection::SinkSubscribesTopic))
123            .count()
124    }
125
126    /// Anzahl Publisher-Mappings.
127    #[must_use]
128    pub fn publisher_count(&self) -> usize {
129        self.mappings
130            .values()
131            .filter(|m| matches!(m.direction, MappingDirection::SourcePublishesTopic))
132            .count()
133    }
134
135    /// Liefert `true` wenn der Container `ccm_activate` aufgerufen
136    /// hat.
137    #[must_use]
138    pub fn is_active(&self) -> bool {
139        self.activated
140    }
141}
142
143impl ComponentExecutor for DdsBridgeComponent {
144    fn set_context(&mut self, context: Box<dyn ComponentContext>) {
145        self.ctx = Some(context);
146    }
147
148    fn ccm_activate(&mut self) -> Result<(), CifError> {
149        self.activated = true;
150        Ok(())
151    }
152
153    fn ccm_passivate(&mut self) -> Result<(), CifError> {
154        self.activated = false;
155        Ok(())
156    }
157
158    fn ccm_remove(&mut self) -> Result<(), CifError> {
159        self.activated = false;
160        self.mappings.clear();
161        Ok(())
162    }
163}
164
#[cfg(test)]
// Tests may unwrap/panic freely: a panic is the failure signal.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a mapping for port `name` on topic `topic_<name>`.
    fn mapping(name: &str, type_name: &str, direction: MappingDirection) -> TopicMapping {
        TopicMapping {
            port_name: String::from(name),
            topic_name: alloc::format!("topic_{name}"),
            type_name: String::from(type_name),
            direction,
        }
    }

    /// Subscriber-side fixture (`demo::Trade`).
    fn sub_mapping(name: &str) -> TopicMapping {
        mapping(name, "demo::Trade", MappingDirection::SinkSubscribesTopic)
    }

    /// Publisher-side fixture (`demo::Quote`).
    fn pub_mapping(name: &str) -> TopicMapping {
        mapping(name, "demo::Quote", MappingDirection::SourcePublishesTopic)
    }

    /// Minimal context that reports no caller principal.
    struct AnonContext;

    impl ComponentContext for AnonContext {
        fn get_caller_principal(&self) -> Option<Vec<u8>> {
            None
        }
    }

    #[test]
    fn fresh_component_is_inactive() {
        let bridge = DdsBridgeComponent::new();
        assert!(!bridge.is_active());
        assert!(bridge.mappings().is_empty());
    }

    #[test]
    fn activate_makes_active() {
        let mut bridge = DdsBridgeComponent::new();
        bridge.set_context(Box::new(AnonContext));
        bridge.ccm_activate().unwrap();
        assert!(bridge.is_active());
    }

    #[test]
    fn passivate_clears_active_flag() {
        let mut bridge = DdsBridgeComponent::new();
        bridge.ccm_activate().unwrap();
        bridge.ccm_passivate().unwrap();
        assert!(!bridge.is_active());
    }

    #[test]
    fn add_mapping_round_trip() {
        let mut bridge = DdsBridgeComponent::new();
        bridge.add_mapping(sub_mapping("a")).unwrap();
        bridge.add_mapping(pub_mapping("b")).unwrap();
        assert_eq!(bridge.mappings().len(), 2);
        assert_eq!(bridge.subscriber_count(), 1);
        assert_eq!(bridge.publisher_count(), 1);
    }

    #[test]
    fn duplicate_port_rejected() {
        let mut bridge = DdsBridgeComponent::new();
        bridge.add_mapping(sub_mapping("p")).unwrap();
        let rejected = bridge.add_mapping(pub_mapping("p")).unwrap_err();
        assert!(matches!(rejected, BridgeError::DuplicatePort(_)));
    }

    #[test]
    fn remove_clears_mappings_and_state() {
        let mut bridge = DdsBridgeComponent::new();
        bridge.add_mapping(sub_mapping("a")).unwrap();
        bridge.ccm_activate().unwrap();
        bridge.ccm_remove().unwrap();
        assert!(!bridge.is_active());
        assert!(bridge.mappings().is_empty());
    }

    #[test]
    fn mapping_direction_round_trip() {
        let subscriber = sub_mapping("s");
        let publisher = pub_mapping("p");
        assert_eq!(subscriber.direction, MappingDirection::SinkSubscribesTopic);
        assert_eq!(publisher.direction, MappingDirection::SourcePublishesTopic);
        assert_ne!(subscriber.direction, publisher.direction);
    }
}