use std::collections::BTreeMap;
use std::sync::Arc;
use zerodds_dcps::runtime::{UserReaderConfig, UserWriterConfig};
use zerodds_dcps::{DomainParticipant, DomainParticipantFactory, DomainParticipantQos};
use zerodds_qos::{DeadlineQosPolicy, LifespanQosPolicy, LivelinessKind, LivelinessQosPolicy};
use crate::config::{Endpoint, Ownership, Route, RouterConfig};
use crate::error::{Result, RoutingError};
use crate::forwarding::{ForwardingSession, SampleProcessor, SessionParts};
use crate::metrics::{RouteMetrics, RouteMetricsSnapshot};
use crate::transform::TypeRegistry;
pub struct Router {
name: String,
inputs: BTreeMap<i32, DomainParticipant>,
outputs: BTreeMap<i32, DomainParticipant>,
sessions: BTreeMap<String, ForwardingSession>,
metrics: BTreeMap<String, RouteMetrics>,
shapes: TypeRegistry,
}
impl Router {
pub fn start(config: &RouterConfig) -> Result<Self> {
Self::start_with_types(config, TypeRegistry::new())
}
pub fn start_with_types(config: &RouterConfig, shapes: TypeRegistry) -> Result<Self> {
config.validate()?;
for route in &config.routes {
if route.preserve_source_timestamp {
return Err(RoutingError::Config(format!(
"route '{}': preserve_source_timestamp is not yet supported (the runtime \
user byte-path does not propagate the source timestamp); remove the flag or \
wire source-timestamp support into UserSample first",
route.name
)));
}
}
let factory = DomainParticipantFactory::instance();
let mut router = Self {
name: config.name.clone(),
inputs: BTreeMap::new(),
outputs: BTreeMap::new(),
sessions: BTreeMap::new(),
metrics: BTreeMap::new(),
shapes,
};
for route in &config.routes {
Self::participant(&mut router.inputs, factory, route.input.domain)?;
Self::participant(&mut router.outputs, factory, route.output.domain)?;
}
router.wire_loop_guard(config);
for route in &config.routes {
router.add_route(route)?;
}
Ok(router)
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn route_names(&self) -> Vec<String> {
self.sessions.keys().cloned().collect()
}
#[must_use]
pub fn route_metrics(&self, route: &str) -> Option<RouteMetricsSnapshot> {
self.metrics.get(route).map(RouteMetrics::snapshot)
}
pub fn shutdown(&mut self) {
for (_, mut session) in core::mem::take(&mut self.sessions) {
session.stop();
}
}
fn participant(
map: &mut BTreeMap<i32, DomainParticipant>,
factory: &DomainParticipantFactory,
domain: i32,
) -> Result<()> {
if let std::collections::btree_map::Entry::Vacant(slot) = map.entry(domain) {
let p = factory
.create_participant(domain, DomainParticipantQos::default())
.map_err(|e| RoutingError::Dds(format!("create_participant({domain}): {e:?}")))?;
slot.insert(p);
}
Ok(())
}
fn add_route(&mut self, route: &Route) -> Result<()> {
let in_type = resolve_type(&route.input, &route.output, &route.name)?;
let out_type = if route.output.type_name == "*" {
in_type.clone()
} else {
route.output.type_name.clone()
};
let in_rt = self
.inputs
.get(&route.input.domain)
.and_then(DomainParticipant::runtime)
.cloned()
.ok_or(RoutingError::Internal("input runtime"))?;
let out_rt = self
.outputs
.get(&route.output.domain)
.and_then(DomainParticipant::runtime)
.cloned()
.ok_or(RoutingError::Internal("output runtime"))?;
let (_reader_eid, rx) = in_rt
.register_user_reader_kind(user_reader_cfg(&route.input, &in_type), route.input.keyed)
.map_err(|e| RoutingError::Dds(format!("register input reader: {e:?}")))?;
let writer_eid = out_rt
.register_user_writer_kind(
user_writer_cfg(&route.output, &out_type),
route.output.keyed,
)
.map_err(|e| RoutingError::Dds(format!("register output writer: {e:?}")))?;
let metrics = RouteMetrics::new(&route.name);
self.metrics.insert(route.name.clone(), metrics.clone());
let processor: Option<Box<dyn SampleProcessor>> =
crate::transform::build_processor_with(route, &in_type, &out_type, &self.shapes)?;
let session = ForwardingSession::start(SessionParts {
route_name: route.name.clone(),
rx,
output_rt: out_rt,
writer_eid,
keyed: route.output.keyed,
processor,
metrics: Arc::new(metrics),
})?;
self.sessions.insert(route.name.clone(), session);
Ok(())
}
fn wire_loop_guard(&self, config: &RouterConfig) {
if !config.routes.iter().any(|r| r.loop_guard) {
return;
}
let shared: Vec<i32> = self
.inputs
.keys()
.filter(|d| self.outputs.contains_key(d))
.copied()
.collect();
for d in shared {
if let (Some(i), Some(o)) = (self.inputs.get(&d), self.outputs.get(&d)) {
let _ = i.ignore_participant(o.participant_handle());
let _ = o.ignore_participant(i.participant_handle());
}
}
}
}
impl Drop for Router {
fn drop(&mut self) {
self.shutdown();
}
}
fn resolve_type(input: &Endpoint, output: &Endpoint, route: &str) -> Result<String> {
if input.type_name != "*" {
return Ok(input.type_name.clone());
}
if output.type_name != "*" {
return Ok(output.type_name.clone());
}
Err(RoutingError::Config(format!(
"route '{route}': a concrete type_name is required on the input (or output) endpoint \
— RTPS matches readers/writers by topic + type name, there is no wildcard type match"
)))
}
fn user_reader_cfg(ep: &Endpoint, type_name: &str) -> UserReaderConfig {
UserReaderConfig {
topic_name: ep.topic.clone(),
type_name: type_name.to_string(),
reliable: ep.qos.reliable,
durability: ep.qos.durability.into(),
deadline: DeadlineQosPolicy::default(),
liveliness: LivelinessQosPolicy {
kind: LivelinessKind::Automatic,
..Default::default()
},
ownership: ep.qos.ownership.into(),
partition: ep.partition.clone(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_identifier: zerodds_types::TypeIdentifier::None,
type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
data_representation_offer: ep
.qos
.data_representation
.clone()
.or_else(|| Some(vec![0, 2])),
}
}
fn user_writer_cfg(ep: &Endpoint, type_name: &str) -> UserWriterConfig {
UserWriterConfig {
topic_name: ep.topic.clone(),
type_name: type_name.to_string(),
reliable: ep.qos.reliable,
durability: ep.qos.durability.into(),
deadline: DeadlineQosPolicy::default(),
lifespan: LifespanQosPolicy::default(),
liveliness: LivelinessQosPolicy {
kind: LivelinessKind::Automatic,
..Default::default()
},
ownership: ep.qos.ownership.into(),
ownership_strength: if ep.qos.ownership == Ownership::Exclusive {
ep.qos.ownership_strength
} else {
0
},
partition: ep.partition.clone(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_identifier: zerodds_types::TypeIdentifier::None,
data_representation_offer: ep.qos.data_representation.clone(),
}
}