use std::{collections::HashSet, fmt};
use cyclors::{
dds_entity_t,
qos::{HistoryKind, Qos},
DDS_LENGTH_UNLIMITED,
};
use serde::Serialize;
use zenoh::{
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
qos::CongestionControl,
sample::Locality,
};
use zenoh_ext::{PublicationCache, SessionExt};
use crate::{dds_mgt::*, qos_helpers::*, DdsPluginRuntime, KE_PREFIX_PUB_CACHE};
// The Zenoh-side endpoint of a DDS->Zenoh route:
// - `Publisher`: a declared key expression on which samples are published directly
//   (used for volatile DDS Writers);
// - `PublicationCache`: a zenoh-ext publication cache (used for TRANSIENT_LOCAL
//   DDS Writers, so past publications can be queried by late joiners — see
//   `RouteDDSZenoh::new`).
enum ZPublisher<'a> {
    Publisher(KeyExpr<'a>),
    PublicationCache(PublicationCache),
}
impl ZPublisher<'_> {
    /// Returns the Zenoh key expression this publication endpoint is bound to.
    fn key_expr(&self) -> &KeyExpr<'_> {
        match self {
            ZPublisher::PublicationCache(cache) => cache.key_expr(),
            ZPublisher::Publisher(ke) => ke,
        }
    }
}
#[allow(clippy::upper_case_acronyms)]
#[derive(Serialize)]
// A route forwarding data from a local DDS Writer to Zenoh:
// a forwarding DDS Reader subscribed to the DDS topic, plus the Zenoh-side
// publication endpoint. Serialized (for the admin space) with the entity GUID
// in place of the raw reader handle; the Zenoh publisher itself is skipped.
pub(crate) struct RouteDDSZenoh<'a> {
    // The underlying DDS Reader forwarding samples to Zenoh (deleted on Drop).
    #[serde(serialize_with = "serialize_entity_guid")]
    dds_reader: dds_entity_t,
    topic_name: String,
    topic_type: String,
    // true if the DDS topic has no key fields (single-instance topic).
    keyless: bool,
    #[serde(skip)]
    zenoh_publisher: ZPublisher<'a>,
    // Admin key expressions of remote Zenoh readers served by this route.
    remote_routed_readers: HashSet<OwnedKeyExpr>,
    // Entity keys of local DDS Writers whose data this route forwards.
    local_routed_writers: HashSet<String>,
}
impl Drop for RouteDDSZenoh<'_> {
fn drop(&mut self) {
if let Err(e) = delete_dds_entity(self.dds_reader) {
tracing::warn!("{}: error deleting DDS Reader: {}", self, e);
}
}
}
impl fmt::Display for RouteDDSZenoh<'_> {
    /// Formats the route as `Route DDS->Zenoh (<topic> -> <key expr>)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let zenoh_ke = self.zenoh_publisher.key_expr();
        write!(f, "Route DDS->Zenoh ({} -> {})", self.topic_name, zenoh_ke)
    }
}
impl RouteDDSZenoh<'_> {
    /// Creates a new DDS->Zenoh route for `topic_name`.
    ///
    /// Declares `ke` on the Zenoh session, then creates the Zenoh-side
    /// publication endpoint:
    /// - if the DDS Reader QoS is TRANSIENT_LOCAL, a `PublicationCache`
    ///   (sized from the HISTORY and DURABILITY_SERVICE QoS) so that
    ///   late-joining remote subscribers can query past publications;
    /// - otherwise, publications go directly on the declared key expression.
    ///
    /// Finally creates the forwarding DDS Reader that routes each received
    /// DDS sample to Zenoh.
    ///
    /// # Errors
    /// Returns an error `String` if the key-expression declaration, the
    /// `PublicationCache` creation, or the DDS Reader creation fails.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new<'a>(
        plugin: &DdsPluginRuntime<'a>,
        topic_name: String,
        topic_type: String,
        type_info: &Option<TypeInfo>,
        keyless: bool,
        reader_qos: Qos,
        ke: OwnedKeyExpr,
        congestion_ctrl: CongestionControl,
    ) -> Result<RouteDDSZenoh<'a>, String> {
        tracing::debug!(
            "Route DDS->Zenoh ({} -> {}): creation with topic_type={}",
            topic_name,
            ke,
            topic_type
        );
        // Declare the key expression once; all subsequent publications and the
        // publication cache use the declared (optimized) form.
        let declared_ke = plugin
            .zsession
            .declare_keyexpr(ke.clone())
            .await
            .map_err(|e| {
                // This is the DDS->Zenoh route (the message previously said
                // "Zenoh->DDS", which mislabeled the failing route direction).
                format!("Route DDS->Zenoh ({topic_name} -> {ke}): failed to declare KeyExpr: {e}")
            })?;
        let zenoh_publisher: ZPublisher<'a> = if is_transient_local(&reader_qos) {
            #[allow(non_upper_case_globals)]
            let history_qos = get_history_or_default(&reader_qos);
            let durability_service_qos = get_durability_service_or_default(&reader_qos);
            // Size the publication cache from the Writer's QoS:
            // - KEEP_LAST on a keyless topic: one instance, so the depth itself;
            // - KEEP_LAST, keyed, unlimited instances: cache must be unbounded;
            // - KEEP_LAST, keyed, bounded instances: depth * max_instances,
            //   saturating to usize::MAX on overflow;
            // - KEEP_ALL: unbounded.
            let history = match (history_qos.kind, history_qos.depth) {
                (HistoryKind::KEEP_LAST, n) => {
                    if keyless {
                        n as usize
                    } else if durability_service_qos.max_instances == DDS_LENGTH_UNLIMITED {
                        usize::MAX
                    } else if durability_service_qos.max_instances > 0 {
                        if let Some(m) = n.checked_mul(durability_service_qos.max_instances) {
                            m as usize
                        } else {
                            usize::MAX
                        }
                    } else {
                        // Non-positive (but not UNLIMITED) max_instances:
                        // fall back to the history depth alone.
                        n as usize
                    }
                }
                (HistoryKind::KEEP_ALL, _) => usize::MAX,
            };
            tracing::debug!(
                "Caching publications for TRANSIENT_LOCAL Writer on resource {} with history {} (Writer uses {:?} and DurabilityService.max_instances={})",
                ke, history, reader_qos.history, durability_service_qos.max_instances
            );
            let pub_cache = plugin
                .zsession
                .declare_publication_cache(&declared_ke)
                .history(history)
                .queryable_suffix(*KE_PREFIX_PUB_CACHE / &plugin.zsession.zid().into_keyexpr())
                // Serve only remote queriers: local subscribers get the data
                // from DDS directly.
                .queryable_allowed_origin(Locality::Remote)
                .await
                .map_err(|e| {
                    format!("Failed create PublicationCache for key {ke} (rid={declared_ke}): {e}")
                })?;
            ZPublisher::PublicationCache(pub_cache)
        } else {
            // NOTE(review): the declared publisher is dropped immediately after
            // this call; only the key expression is retained below. Presumably
            // the declaration is wanted for its session-level side effects
            // (e.g. advertising a reliable publisher) — confirm this is
            // intentional and still effective with the zenoh version in use.
            if let Err(e) = plugin
                .zsession
                .declare_publisher(declared_ke.clone())
                .reliability(zenoh::qos::Reliability::Reliable)
                .await
            {
                // Best-effort: a declaration failure is logged, not fatal.
                tracing::warn!(
                    "Failed to declare publisher for key {} (rid={}): {}",
                    ke,
                    declared_ke,
                    e
                );
            }
            ZPublisher::Publisher(declared_ke.clone())
        };
        // Optional periodic-read configuration for this key expression.
        let read_period = plugin.get_read_period(&ke);
        let dds_reader = create_forwarding_dds_reader(
            plugin.dp,
            topic_name.clone(),
            topic_type.clone(),
            type_info,
            keyless,
            reader_qos,
            declared_ke,
            plugin.zsession.clone(),
            read_period,
            congestion_ctrl,
        )?;
        Ok(RouteDDSZenoh {
            dds_reader,
            topic_name,
            topic_type,
            keyless,
            zenoh_publisher,
            remote_routed_readers: HashSet::new(),
            local_routed_writers: HashSet::new(),
        })
    }

    /// Returns the GUID of the underlying forwarding DDS Reader.
    pub(crate) fn dds_reader_guid(&self) -> Result<String, String> {
        get_guid(&self.dds_reader)
    }

    /// Records a remote Zenoh reader (by its admin key expression) as served
    /// by this route.
    pub(crate) fn add_remote_routed_reader(&mut self, admin_ke: OwnedKeyExpr) {
        self.remote_routed_readers.insert(admin_ke);
    }

    /// Removes a remote Zenoh reader previously added with
    /// [`Self::add_remote_routed_reader`].
    pub(crate) fn remove_remote_routed_reader(&mut self, admin_ke: &keyexpr) {
        self.remote_routed_readers.remove(admin_ke);
    }

    /// Removes all remote routed readers whose admin key expression contains
    /// `sub_ke` as a substring.
    pub(crate) fn remove_remote_routed_readers_containing(&mut self, sub_ke: &str) {
        self.remote_routed_readers.retain(|s| !s.contains(sub_ke));
    }

    /// Returns `true` if at least one remote Zenoh reader is routed.
    pub(crate) fn has_remote_routed_reader(&self) -> bool {
        !self.remote_routed_readers.is_empty()
    }

    /// Returns `true` if a routed remote reader's admin key expression
    /// contains `entity_key` as a substring.
    pub(crate) fn is_routing_remote_reader(&self, entity_key: &str) -> bool {
        self.remote_routed_readers
            .iter()
            .any(|s| s.contains(entity_key))
    }

    /// Records a local DDS Writer (by entity key) as a source for this route.
    pub(crate) fn add_local_routed_writer(&mut self, entity_key: String) {
        self.local_routed_writers.insert(entity_key);
    }

    /// Removes a local DDS Writer previously added with
    /// [`Self::add_local_routed_writer`].
    pub(crate) fn remove_local_routed_writer(&mut self, entity_key: &str) {
        self.local_routed_writers.remove(entity_key);
    }

    /// Returns `true` if at least one local DDS Writer feeds this route.
    pub(crate) fn has_local_routed_writer(&self) -> bool {
        !self.local_routed_writers.is_empty()
    }
}