use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec::Vec;
use zerodds_dcps::DomainParticipant;
use crate::mapping::TopicMap;
/// Errors reported by the Zenoh/DDS bridge.
///
/// Every variant carries a free-form `String` that is interpolated into the
/// `Display` message declared on that variant.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum BridgeError {
/// A Zenoh session operation failed; the payload is the rendered error text.
#[error("zenoh session failed: {0}")]
ZenohSession(String),
/// A DDS topic was missing.
/// NOTE(review): presumably the payload is the topic name; no construction
/// site is visible in this file, so confirm against the caller.
#[error("dds topic missing: {0}")]
DdsTopicMissing(String),
/// A Tokio runtime failure, described by the payload.
/// NOTE(review): not constructed anywhere in this file.
#[error("tokio runtime: {0}")]
Tokio(String),
}
/// Builder that collects the settings used to create a [`ZenohBridge`].
pub struct ZenohBridgeBuilder {
/// Topic entries registered through `add_topic`.
map: TopicMap,
/// Zenoh endpoints collected through `endpoint`, in insertion order.
zenoh_endpoints: Vec<String>,
/// Prefix handed to `key_expr_for_topic` whenever a topic is added.
prefix: String,
/// Optional DCPS participant that is passed on to the built bridge.
participant: Option<Arc<DomainParticipant>>,
}
impl ZenohBridgeBuilder {
    /// Creates a builder with an empty topic map, no Zenoh endpoints, the
    /// default key-expression prefix and no DCPS participant.
    #[must_use]
    pub fn new() -> Self {
        Self {
            map: TopicMap::new(),
            zenoh_endpoints: Vec::new(),
            prefix: crate::mapping::DEFAULT_PREFIX.into(),
            participant: None,
        }
    }

    /// Stores the DCPS participant that the built bridge will hold.
    ///
    /// A later call replaces the participant set by an earlier one.
    #[must_use]
    pub fn with_dcps_participant(mut self, participant: Arc<DomainParticipant>) -> Self {
        self.participant = Some(participant);
        self
    }

    /// Replaces the key-expression prefix.
    ///
    /// The prefix is read at the moment `add_topic` is called, so topics that
    /// were added earlier keep the key expression computed from the previous
    /// prefix.
    #[must_use]
    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Appends one Zenoh endpoint to connect to; may be called repeatedly.
    #[must_use]
    pub fn endpoint(mut self, ep: impl Into<String>) -> Self {
        self.zenoh_endpoints.push(ep.into());
        self
    }

    /// Registers a DDS topic in the topic map.
    ///
    /// The Zenoh key expression is derived from the current prefix, the given
    /// `partition` and the topic name via `key_expr_for_topic`; the type name
    /// and QoS kinds are stored unchanged alongside it.
    pub fn add_topic(
        &mut self,
        topic: impl Into<String>,
        type_name: impl Into<String>,
        partition: &str,
        reliability: zerodds_qos::ReliabilityKind,
        durability: zerodds_qos::DurabilityKind,
    ) {
        let topic = topic.into();
        let key_expr = crate::mapping::key_expr_for_topic(&self.prefix, partition, &topic);
        self.map.add(crate::mapping::TopicMapEntry {
            topic,
            type_name: type_name.into(),
            key_expr,
            reliability,
            durability,
        });
    }

    /// Opens the Zenoh session and assembles the bridge.
    ///
    /// All collected endpoints are written to `connect/endpoints` as a single
    /// array. Inserting them one at a time would overwrite the key on every
    /// call and leave only the last endpoint configured.
    ///
    /// # Errors
    ///
    /// Returns [`BridgeError::ZenohSession`] when the endpoint list is
    /// rejected by the Zenoh configuration or when the session cannot be
    /// opened.
    pub async fn build(self) -> Result<ZenohBridge, BridgeError> {
        let mut config = zenoh::Config::default();
        // With no explicit endpoints the default configuration is left untouched.
        if !self.zenoh_endpoints.is_empty() {
            let endpoints = Self::endpoints_json5(&self.zenoh_endpoints);
            config
                .insert_json5("connect/endpoints", &endpoints)
                .map_err(|e| {
                    BridgeError::ZenohSession(alloc::format!(
                        "invalid connect endpoints {endpoints}: {e:?}"
                    ))
                })?;
        }
        let session = zenoh::open(config)
            .await
            .map_err(|e| BridgeError::ZenohSession(alloc::format!("{e}")))?;
        Ok(ZenohBridge {
            map: Arc::new(self.map),
            session: Arc::new(session),
            participant: self.participant,
            tasks: Vec::new(),
        })
    }

    /// Renders the endpoints as one JSON5 array of double-quoted strings.
    ///
    /// Quotes, backslashes and control characters are escaped so that an
    /// endpoint cannot break out of its string literal.
    fn endpoints_json5(endpoints: &[String]) -> String {
        let mut json = String::from("[");
        for (index, endpoint) in endpoints.iter().enumerate() {
            if index > 0 {
                json.push(',');
            }
            json.push('"');
            for ch in endpoint.chars() {
                if matches!(ch, '"' | '\\') {
                    json.push('\\');
                    json.push(ch);
                } else if ch.is_control() {
                    // Control characters are all below U+00A0, so four hex digits suffice.
                    json.push_str(&alloc::format!("\\u{:04x}", u32::from(ch)));
                } else {
                    json.push(ch);
                }
            }
            json.push('"');
        }
        json.push(']');
        json
    }
}
impl Default for ZenohBridgeBuilder {
fn default() -> Self {
Self::new()
}
}
/// Bridge handle produced by `ZenohBridgeBuilder::build`.
pub struct ZenohBridge {
/// Topic map collected by the builder, held behind an `Arc`.
map: Arc<TopicMap>,
/// Open Zenoh session, held behind an `Arc`.
session: Arc<zenoh::Session>,
/// DCPS participant supplied to the builder, if any.
participant: Option<Arc<DomainParticipant>>,
/// Join handles of Tokio tasks owned by the bridge; `stop` aborts them.
/// The list is empty right after `build`.
tasks: Vec<tokio::task::JoinHandle<()>>,
}
impl ZenohBridge {
    /// Borrows the topic map this bridge was built with.
    #[must_use]
    pub fn map(&self) -> &TopicMap {
        self.map.as_ref()
    }

    /// Borrows the DCPS participant, or `None` when the builder was given none.
    #[must_use]
    pub fn dcps_participant(&self) -> Option<&Arc<DomainParticipant>> {
        self.participant.as_ref()
    }

    /// Number of task handles currently tracked by the bridge.
    #[must_use]
    pub fn task_count(&self) -> usize {
        self.tasks.len()
    }

    /// Consumes the bridge: requests cancellation of every tracked task, then
    /// releases this handle's references to the session and the participant.
    ///
    /// The tasks are aborted but not awaited.
    pub async fn stop(self) {
        let Self {
            tasks,
            session,
            participant,
            ..
        } = self;
        tasks.into_iter().for_each(|task| task.abort());
        drop(session);
        drop(participant);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use zerodds_qos::{DurabilityKind, ReliabilityKind};

    /// Adds two topics to a builder and checks the key expressions that
    /// `key_expr_for_topic` derives for the same prefix/partition/topic inputs.
    // NOTE(review): the assertions call `key_expr_for_topic` directly and never
    // inspect the builder's map, so the test only proves `add_topic` does not panic.
    #[test]
    fn builder_collects_topics() {
        let mut builder = ZenohBridgeBuilder::new().prefix("dds");

        // Topic without a partition.
        builder.add_topic(
            "Chatter",
            "std_msgs::String",
            "",
            ReliabilityKind::Reliable,
            DurabilityKind::Volatile,
        );
        // Topic inside the "robot1" partition.
        builder.add_topic(
            "Sensor",
            "sensor_msgs::Temperature",
            "robot1",
            ReliabilityKind::BestEffort,
            DurabilityKind::TransientLocal,
        );

        let without_partition = crate::mapping::key_expr_for_topic("dds", "", "Chatter");
        assert_eq!(without_partition, "dds/Chatter");

        let with_partition = crate::mapping::key_expr_for_topic("dds", "robot1", "Sensor");
        assert_eq!(with_partition, "dds/robot1/Sensor");
    }
}