1use std::collections::BTreeMap;
10use std::sync::Arc;
11
12use zerodds_dcps::runtime::{UserReaderConfig, UserWriterConfig};
13use zerodds_dcps::{DomainParticipant, DomainParticipantFactory, DomainParticipantQos};
14use zerodds_qos::{DeadlineQosPolicy, LifespanQosPolicy, LivelinessKind, LivelinessQosPolicy};
15
16use crate::config::{Endpoint, Ownership, Route, RouterConfig};
17use crate::error::{Result, RoutingError};
18use crate::forwarding::{ForwardingSession, SampleProcessor, SessionParts};
19use crate::metrics::{RouteMetrics, RouteMetricsSnapshot};
20use crate::transform::TypeRegistry;
21
22pub struct Router {
24 name: String,
25 inputs: BTreeMap<i32, DomainParticipant>,
27 outputs: BTreeMap<i32, DomainParticipant>,
29 sessions: BTreeMap<String, ForwardingSession>,
31 metrics: BTreeMap<String, RouteMetrics>,
33 shapes: TypeRegistry,
35}
36
37impl Router {
38 pub fn start(config: &RouterConfig) -> Result<Self> {
44 Self::start_with_types(config, TypeRegistry::new())
45 }
46
47 pub fn start_with_types(config: &RouterConfig, shapes: TypeRegistry) -> Result<Self> {
57 config.validate()?;
58 for route in &config.routes {
60 if route.preserve_source_timestamp {
61 return Err(RoutingError::Config(format!(
65 "route '{}': preserve_source_timestamp is not yet supported (the runtime \
66 user byte-path does not propagate the source timestamp); remove the flag or \
67 wire source-timestamp support into UserSample first",
68 route.name
69 )));
70 }
71 }
72 let factory = DomainParticipantFactory::instance();
73 let mut router = Self {
74 name: config.name.clone(),
75 inputs: BTreeMap::new(),
76 outputs: BTreeMap::new(),
77 sessions: BTreeMap::new(),
78 metrics: BTreeMap::new(),
79 shapes,
80 };
81 for route in &config.routes {
83 Self::participant(&mut router.inputs, factory, route.input.domain)?;
84 Self::participant(&mut router.outputs, factory, route.output.domain)?;
85 }
86 router.wire_loop_guard(config);
92 for route in &config.routes {
94 router.add_route(route)?;
95 }
96 Ok(router)
97 }
98
99 #[must_use]
101 pub fn name(&self) -> &str {
102 &self.name
103 }
104
105 #[must_use]
107 pub fn route_names(&self) -> Vec<String> {
108 self.sessions.keys().cloned().collect()
109 }
110
111 #[must_use]
113 pub fn route_metrics(&self, route: &str) -> Option<RouteMetricsSnapshot> {
114 self.metrics.get(route).map(RouteMetrics::snapshot)
115 }
116
117 pub fn shutdown(&mut self) {
119 for (_, mut session) in core::mem::take(&mut self.sessions) {
120 session.stop();
121 }
122 }
123
124 fn participant(
127 map: &mut BTreeMap<i32, DomainParticipant>,
128 factory: &DomainParticipantFactory,
129 domain: i32,
130 ) -> Result<()> {
131 if let std::collections::btree_map::Entry::Vacant(slot) = map.entry(domain) {
132 let p = factory
133 .create_participant(domain, DomainParticipantQos::default())
134 .map_err(|e| RoutingError::Dds(format!("create_participant({domain}): {e:?}")))?;
135 slot.insert(p);
136 }
137 Ok(())
138 }
139
140 fn add_route(&mut self, route: &Route) -> Result<()> {
141 let in_type = resolve_type(&route.input, &route.output, &route.name)?;
142 let out_type = if route.output.type_name == "*" {
143 in_type.clone()
144 } else {
145 route.output.type_name.clone()
146 };
147
148 let in_rt = self
151 .inputs
152 .get(&route.input.domain)
153 .and_then(DomainParticipant::runtime)
154 .cloned()
155 .ok_or(RoutingError::Internal("input runtime"))?;
156 let out_rt = self
157 .outputs
158 .get(&route.output.domain)
159 .and_then(DomainParticipant::runtime)
160 .cloned()
161 .ok_or(RoutingError::Internal("output runtime"))?;
162
163 let (_reader_eid, rx) = in_rt
164 .register_user_reader_kind(user_reader_cfg(&route.input, &in_type), route.input.keyed)
165 .map_err(|e| RoutingError::Dds(format!("register input reader: {e:?}")))?;
166
167 let writer_eid = out_rt
168 .register_user_writer_kind(
169 user_writer_cfg(&route.output, &out_type),
170 route.output.keyed,
171 )
172 .map_err(|e| RoutingError::Dds(format!("register output writer: {e:?}")))?;
173
174 let metrics = RouteMetrics::new(&route.name);
175 self.metrics.insert(route.name.clone(), metrics.clone());
176
177 let processor: Option<Box<dyn SampleProcessor>> =
178 crate::transform::build_processor_with(route, &in_type, &out_type, &self.shapes)?;
179
180 let session = ForwardingSession::start(SessionParts {
181 route_name: route.name.clone(),
182 rx,
183 output_rt: out_rt,
184 writer_eid,
185 keyed: route.output.keyed,
186 processor,
187 metrics: Arc::new(metrics),
188 })?;
189 self.sessions.insert(route.name.clone(), session);
190 Ok(())
191 }
192
193 fn wire_loop_guard(&self, config: &RouterConfig) {
201 if !config.routes.iter().any(|r| r.loop_guard) {
202 return;
203 }
204 let shared: Vec<i32> = self
205 .inputs
206 .keys()
207 .filter(|d| self.outputs.contains_key(d))
208 .copied()
209 .collect();
210 for d in shared {
211 if let (Some(i), Some(o)) = (self.inputs.get(&d), self.outputs.get(&d)) {
212 let _ = i.ignore_participant(o.participant_handle());
213 let _ = o.ignore_participant(i.participant_handle());
214 }
215 }
216 }
217}
218
219impl Drop for Router {
220 fn drop(&mut self) {
221 self.shutdown();
222 }
223}
224
225fn resolve_type(input: &Endpoint, output: &Endpoint, route: &str) -> Result<String> {
226 if input.type_name != "*" {
227 return Ok(input.type_name.clone());
228 }
229 if output.type_name != "*" {
230 return Ok(output.type_name.clone());
231 }
232 Err(RoutingError::Config(format!(
233 "route '{route}': a concrete type_name is required on the input (or output) endpoint \
234 — RTPS matches readers/writers by topic + type name, there is no wildcard type match"
235 )))
236}
237
238fn user_reader_cfg(ep: &Endpoint, type_name: &str) -> UserReaderConfig {
241 UserReaderConfig {
242 topic_name: ep.topic.clone(),
243 type_name: type_name.to_string(),
244 reliable: ep.qos.reliable,
245 durability: ep.qos.durability.into(),
246 deadline: DeadlineQosPolicy::default(),
247 liveliness: LivelinessQosPolicy {
248 kind: LivelinessKind::Automatic,
249 ..Default::default()
250 },
251 ownership: ep.qos.ownership.into(),
252 partition: ep.partition.clone(),
253 user_data: Vec::new(),
254 topic_data: Vec::new(),
255 group_data: Vec::new(),
256 type_identifier: zerodds_types::TypeIdentifier::None,
257 type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
258 data_representation_offer: ep
259 .qos
260 .data_representation
261 .clone()
262 .or_else(|| Some(vec![0, 2])),
263 }
264}
265
266fn user_writer_cfg(ep: &Endpoint, type_name: &str) -> UserWriterConfig {
268 UserWriterConfig {
269 topic_name: ep.topic.clone(),
270 type_name: type_name.to_string(),
271 reliable: ep.qos.reliable,
272 durability: ep.qos.durability.into(),
273 deadline: DeadlineQosPolicy::default(),
274 lifespan: LifespanQosPolicy::default(),
275 liveliness: LivelinessQosPolicy {
276 kind: LivelinessKind::Automatic,
277 ..Default::default()
278 },
279 ownership: ep.qos.ownership.into(),
280 ownership_strength: if ep.qos.ownership == Ownership::Exclusive {
281 ep.qos.ownership_strength
282 } else {
283 0
284 },
285 partition: ep.partition.clone(),
286 user_data: Vec::new(),
287 topic_data: Vec::new(),
288 group_data: Vec::new(),
289 type_identifier: zerodds_types::TypeIdentifier::None,
290 data_representation_offer: ep.qos.data_representation.clone(),
291 }
292}