// zerodds_routing_service/engine.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors

//! The router orchestrator: turns a [`RouterConfig`] into running
//! [`ForwardingSession`]s over a pool of participants (one input and one output
//! participant per domain), with a participant-level loop guard and per-route
//! metrics.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::Arc;

use zerodds_dcps::runtime::{UserReaderConfig, UserWriterConfig};
use zerodds_dcps::{DomainParticipant, DomainParticipantFactory, DomainParticipantQos};
use zerodds_qos::{DeadlineQosPolicy, LifespanQosPolicy, LivelinessKind, LivelinessQosPolicy};

use crate::config::{Endpoint, Ownership, Route, RouterConfig};
use crate::error::{Result, RoutingError};
use crate::forwarding::{ForwardingSession, SampleProcessor, SessionParts};
use crate::metrics::{RouteMetrics, RouteMetricsSnapshot};
use crate::transform::TypeRegistry;

22/// A running router instance.
23pub struct Router {
24    name: String,
25    /// One input participant per domain.
26    inputs: BTreeMap<i32, DomainParticipant>,
27    /// One output participant per domain.
28    outputs: BTreeMap<i32, DomainParticipant>,
29    /// Running forward pumps, keyed by route name.
30    sessions: BTreeMap<String, ForwardingSession>,
31    /// Per-route metrics handles, keyed by route name.
32    metrics: BTreeMap<String, RouteMetrics>,
33    /// Type shapes for content filtering / transformation (empty = byte-only).
34    shapes: TypeRegistry,
35}
36
37impl Router {
38    /// Builds and starts a router from `config` (byte-only — no type shapes).
39    ///
40    /// # Errors
41    /// [`RoutingError`] on validation failure, an unresolved type name, an
42    /// unsupported option, or a DDS runtime error.
43    pub fn start(config: &RouterConfig) -> Result<Self> {
44        Self::start_with_types(config, TypeRegistry::new())
45    }
46
47    /// Like [`start`](Self::start) but with a [`TypeRegistry`] supplying the
48    /// type shapes content filtering and field transformation need. Routes
49    /// without a filter/transform forward bytes verbatim and ignore the
50    /// registry; routes with a filter/transform require their input and output
51    /// types to be present.
52    ///
53    /// # Errors
54    /// [`RoutingError`] as for [`start`](Self::start), plus a missing type
55    /// shape for a filtering/transforming route.
56    pub fn start_with_types(config: &RouterConfig, shapes: TypeRegistry) -> Result<Self> {
57        config.validate()?;
58        // Reject unsupported options before creating any DDS state.
59        for route in &config.routes {
60            if route.preserve_source_timestamp {
61                // Honest constraint, not a silent no-op: the user byte path does
62                // not surface the source timestamp, so the router cannot
63                // reproduce it. See the crate docs §Limitations.
64                return Err(RoutingError::Config(format!(
65                    "route '{}': preserve_source_timestamp is not yet supported (the runtime \
66                     user byte-path does not propagate the source timestamp); remove the flag or \
67                     wire source-timestamp support into UserSample first",
68                    route.name
69                )));
70            }
71        }
72        let factory = DomainParticipantFactory::instance();
73        let mut router = Self {
74            name: config.name.clone(),
75            inputs: BTreeMap::new(),
76            outputs: BTreeMap::new(),
77            sessions: BTreeMap::new(),
78            metrics: BTreeMap::new(),
79            shapes,
80        };
81        // Phase 1 — create every participant first, so their handles exist.
82        for route in &config.routes {
83            Self::participant(&mut router.inputs, factory, route.input.domain)?;
84            Self::participant(&mut router.outputs, factory, route.output.domain)?;
85        }
86        // Phase 2 — install the loop guard BEFORE any endpoint is created, so
87        // the mutual `ignore_participant` is in effect at match time. Adding it
88        // after endpoints already discovered each other in-process would not
89        // tear an existing match down (the ignore filter is consulted only at
90        // discovery/match time).
91        router.wire_loop_guard(config);
92        // Phase 3 — now wire readers/writers and start the pumps.
93        for route in &config.routes {
94            router.add_route(route)?;
95        }
96        Ok(router)
97    }
98
99    /// Router instance name.
100    #[must_use]
101    pub fn name(&self) -> &str {
102        &self.name
103    }
104
105    /// Names of the active routes.
106    #[must_use]
107    pub fn route_names(&self) -> Vec<String> {
108        self.sessions.keys().cloned().collect()
109    }
110
111    /// Metrics snapshot for a route, or `None` if no such route.
112    #[must_use]
113    pub fn route_metrics(&self, route: &str) -> Option<RouteMetricsSnapshot> {
114        self.metrics.get(route).map(RouteMetrics::snapshot)
115    }
116
117    /// Stops all pumps and tears the router down.
118    pub fn shutdown(&mut self) {
119        for (_, mut session) in core::mem::take(&mut self.sessions) {
120            session.stop();
121        }
122    }
123
124    // -- internals ---------------------------------------------------------
125
126    fn participant(
127        map: &mut BTreeMap<i32, DomainParticipant>,
128        factory: &DomainParticipantFactory,
129        domain: i32,
130    ) -> Result<()> {
131        if let std::collections::btree_map::Entry::Vacant(slot) = map.entry(domain) {
132            let p = factory
133                .create_participant(domain, DomainParticipantQos::default())
134                .map_err(|e| RoutingError::Dds(format!("create_participant({domain}): {e:?}")))?;
135            slot.insert(p);
136        }
137        Ok(())
138    }
139
140    fn add_route(&mut self, route: &Route) -> Result<()> {
141        let in_type = resolve_type(&route.input, &route.output, &route.name)?;
142        let out_type = if route.output.type_name == "*" {
143            in_type.clone()
144        } else {
145            route.output.type_name.clone()
146        };
147
148        // Participants were created in phase 1 (so the loop guard could be set
149        // up before any endpoint exists); here we only wire the endpoints.
150        let in_rt = self
151            .inputs
152            .get(&route.input.domain)
153            .and_then(DomainParticipant::runtime)
154            .cloned()
155            .ok_or(RoutingError::Internal("input runtime"))?;
156        let out_rt = self
157            .outputs
158            .get(&route.output.domain)
159            .and_then(DomainParticipant::runtime)
160            .cloned()
161            .ok_or(RoutingError::Internal("output runtime"))?;
162
163        let (_reader_eid, rx) = in_rt
164            .register_user_reader_kind(user_reader_cfg(&route.input, &in_type), route.input.keyed)
165            .map_err(|e| RoutingError::Dds(format!("register input reader: {e:?}")))?;
166
167        let writer_eid = out_rt
168            .register_user_writer_kind(
169                user_writer_cfg(&route.output, &out_type),
170                route.output.keyed,
171            )
172            .map_err(|e| RoutingError::Dds(format!("register output writer: {e:?}")))?;
173
174        let metrics = RouteMetrics::new(&route.name);
175        self.metrics.insert(route.name.clone(), metrics.clone());
176
177        let processor: Option<Box<dyn SampleProcessor>> =
178            crate::transform::build_processor_with(route, &in_type, &out_type, &self.shapes)?;
179
180        let session = ForwardingSession::start(SessionParts {
181            route_name: route.name.clone(),
182            rx,
183            output_rt: out_rt,
184            writer_eid,
185            keyed: route.output.keyed,
186            processor,
187            metrics: Arc::new(metrics),
188        })?;
189        self.sessions.insert(route.name.clone(), session);
190        Ok(())
191    }
192
193    /// Participant-level loop guard: on every domain present as both an input
194    /// and an output domain, make the router's input participant and output
195    /// participant mutually invisible. So no router output (from any route) is
196    /// ever ingested by any router input on the same domain — preventing
197    /// router-internal loops (bidirectional bridges, multi-route cycles) while
198    /// application writers stay visible. Cross-*process* router loops need a
199    /// discovery discriminator and are out of scope (see crate docs).
200    fn wire_loop_guard(&self, config: &RouterConfig) {
201        if !config.routes.iter().any(|r| r.loop_guard) {
202            return;
203        }
204        let shared: Vec<i32> = self
205            .inputs
206            .keys()
207            .filter(|d| self.outputs.contains_key(d))
208            .copied()
209            .collect();
210        for d in shared {
211            if let (Some(i), Some(o)) = (self.inputs.get(&d), self.outputs.get(&d)) {
212                let _ = i.ignore_participant(o.participant_handle());
213                let _ = o.ignore_participant(i.participant_handle());
214            }
215        }
216    }
217}
218
219impl Drop for Router {
220    fn drop(&mut self) {
221        self.shutdown();
222    }
223}
224
225fn resolve_type(input: &Endpoint, output: &Endpoint, route: &str) -> Result<String> {
226    if input.type_name != "*" {
227        return Ok(input.type_name.clone());
228    }
229    if output.type_name != "*" {
230        return Ok(output.type_name.clone());
231    }
232    Err(RoutingError::Config(format!(
233        "route '{route}': a concrete type_name is required on the input (or output) endpoint \
234         — RTPS matches readers/writers by topic + type name, there is no wildcard type match"
235    )))
236}
237
238/// Builds the input reader config (accepts both XCDR1 and XCDR2 so it ingests
239/// whatever representation the source emits).
240fn user_reader_cfg(ep: &Endpoint, type_name: &str) -> UserReaderConfig {
241    UserReaderConfig {
242        topic_name: ep.topic.clone(),
243        type_name: type_name.to_string(),
244        reliable: ep.qos.reliable,
245        durability: ep.qos.durability.into(),
246        deadline: DeadlineQosPolicy::default(),
247        liveliness: LivelinessQosPolicy {
248            kind: LivelinessKind::Automatic,
249            ..Default::default()
250        },
251        ownership: ep.qos.ownership.into(),
252        partition: ep.partition.clone(),
253        user_data: Vec::new(),
254        topic_data: Vec::new(),
255        group_data: Vec::new(),
256        type_identifier: zerodds_types::TypeIdentifier::None,
257        type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
258        data_representation_offer: ep
259            .qos
260            .data_representation
261            .clone()
262            .or_else(|| Some(vec![0, 2])),
263    }
264}
265
266/// Builds the output writer config.
267fn user_writer_cfg(ep: &Endpoint, type_name: &str) -> UserWriterConfig {
268    UserWriterConfig {
269        topic_name: ep.topic.clone(),
270        type_name: type_name.to_string(),
271        reliable: ep.qos.reliable,
272        durability: ep.qos.durability.into(),
273        deadline: DeadlineQosPolicy::default(),
274        lifespan: LifespanQosPolicy::default(),
275        liveliness: LivelinessQosPolicy {
276            kind: LivelinessKind::Automatic,
277            ..Default::default()
278        },
279        ownership: ep.qos.ownership.into(),
280        ownership_strength: if ep.qos.ownership == Ownership::Exclusive {
281            ep.qos.ownership_strength
282        } else {
283            0
284        },
285        partition: ep.partition.clone(),
286        user_data: Vec::new(),
287        topic_data: Vec::new(),
288        group_data: Vec::new(),
289        type_identifier: zerodds_types::TypeIdentifier::None,
290        data_representation_offer: ep.qos.data_representation.clone(),
291    }
292}