use std::{
collections::HashSet,
ffi::CStr,
fmt,
sync::{
atomic::{AtomicI32, Ordering},
Arc,
},
time::Duration,
};
use cyclors::{
dds_entity_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov,
ddsi_serdata_kind_SDK_DATA, ddsi_sertype, ddsrt_iovec_t,
};
use serde::{Serialize, Serializer};
use zenoh::{
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
pubsub::Subscriber,
query::{ConsolidationMode, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample},
Session, Wait,
};
use zenoh_ext::{FetchingSubscriber, SubscriberBuilderExt};
use crate::{
dds_mgt::*, qos::Qos, vec_into_raw_parts, DdsPluginRuntime, KE_ANY_1_SEGMENT,
KE_PREFIX_PUB_CACHE, LOG_PAYLOAD,
};
type AtomicDDSEntity = AtomicI32;
const DDS_ENTITY_NULL: dds_entity_t = 0;
enum ZSubscriber {
Subscriber(Subscriber<()>),
FetchingSubscriber(FetchingSubscriber<()>),
}
impl ZSubscriber {
fn key_expr(&self) -> &KeyExpr<'static> {
match self {
ZSubscriber::Subscriber(s) => s.key_expr(),
ZSubscriber::FetchingSubscriber(s) => s.key_expr(),
}
}
}
#[allow(clippy::upper_case_acronyms)]
#[derive(Serialize)]
pub(crate) struct RouteZenohDDS<'a> {
#[serde(skip)]
zenoh_session: &'a Arc<Session>,
#[serde(skip)]
zenoh_subscriber: ZSubscriber,
topic_name: String,
topic_type: String,
keyless: bool,
#[serde(serialize_with = "serialize_atomic_entity_guid")]
dds_writer: Arc<AtomicDDSEntity>,
remote_routed_writers: HashSet<OwnedKeyExpr>,
local_routed_readers: HashSet<String>,
}
impl Drop for RouteZenohDDS<'_> {
fn drop(&mut self) {
self.delete_dds_writer();
}
}
impl fmt::Display for RouteZenohDDS<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Route Zenoh->DDS ({} -> {})",
self.zenoh_subscriber.key_expr(),
self.topic_name
)
}
}
fn serialize_atomic_entity_guid<S>(entity: &Arc<AtomicDDSEntity>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match entity.load(std::sync::atomic::Ordering::Relaxed) {
DDS_ENTITY_NULL => s.serialize_str(""),
entity => serialize_entity_guid(&entity, s),
}
}
impl RouteZenohDDS<'_> {
pub(crate) async fn new<'a>(
plugin: &DdsPluginRuntime<'a>,
ke: OwnedKeyExpr,
querying_subscriber: bool,
topic_name: String,
topic_type: String,
keyless: bool,
) -> Result<RouteZenohDDS<'a>, String> {
tracing::debug!(
"Route Zenoh->DDS ({} -> {}): creation with topic_type={} querying_subscriber={}",
ke,
topic_name,
topic_type,
querying_subscriber
);
let dds_writer = Arc::new(AtomicDDSEntity::from(DDS_ENTITY_NULL));
let arc_dw = dds_writer.clone();
let ton = topic_name.clone();
let subscriber_callback = move |s: Sample| {
let dw = arc_dw.load(Ordering::Relaxed);
if dw != DDS_ENTITY_NULL {
do_route_data(s, &ton, dw);
} else {
tracing::debug!(
"Route Zenoh->DDS ({} -> {}): data arrived but no DDS Writer yet to route it... wait 3s for discovery forwarding msg",
s.key_expr(),
&ton
);
let arc_dw2 = arc_dw.clone();
let ton2 = ton.clone();
let ke = s.key_expr().clone();
tokio::task::spawn(async move {
for _ in 1..30 {
tokio::time::sleep(Duration::from_millis(100)).await;
let dw = arc_dw2.load(Ordering::Relaxed);
if dw != DDS_ENTITY_NULL {
do_route_data(s, &ton2, dw);
break;
} else {
tracing::warn!(
"Route Zenoh->DDS ({} -> {}): still no DDS Writer after 3s - drop incoming data!",
ke,
&ton2
);
}
}
});
}
};
let zenoh_subscriber = if querying_subscriber {
let query_selector: Selector = (&ke / *KE_PREFIX_PUB_CACHE / *KE_ANY_1_SEGMENT).into();
tracing::debug!(
"Route Zenoh->DDS ({} -> {}): query historical data from everybody for TRANSIENT_LOCAL Reader on {}",
ke,
topic_name,
query_selector
);
let sub = plugin
.zsession
.declare_subscriber(ke.clone())
.callback(subscriber_callback)
.allowed_origin(Locality::Remote) .querying()
.query_timeout(plugin.config.queries_timeout)
.query_selector(query_selector)
.query_accept_replies(ReplyKeyExpr::Any)
.await
.map_err(|e| {
format!(
"Route Zenoh->DDS ({ke} -> {topic_name}): failed to create FetchingSubscriber: {e}"
)
})?;
ZSubscriber::FetchingSubscriber(sub)
} else {
let sub = plugin
.zsession
.declare_subscriber(ke.clone())
.callback(subscriber_callback)
.allowed_origin(Locality::Remote) .await
.map_err(|e| {
format!(
"Route Zenoh->DDS ({ke} -> {topic_name}): failed to create Subscriber: {e}"
)
})?;
ZSubscriber::Subscriber(sub)
};
Ok(RouteZenohDDS {
zenoh_session: plugin.zsession,
zenoh_subscriber,
topic_name,
topic_type,
keyless,
dds_writer,
remote_routed_writers: HashSet::new(),
local_routed_readers: HashSet::new(),
})
}
pub(crate) fn set_dds_writer(
&self,
data_participant: dds_entity_t,
type_info: &Option<TypeInfo>,
writer_qos: Qos,
) -> Result<(), String> {
let old = self.dds_writer.load(Ordering::SeqCst);
if old == DDS_ENTITY_NULL {
tracing::debug!("{}: create DDS Writer", self);
let dw = create_forwarding_dds_writer(
data_participant,
self.topic_name.clone(),
self.topic_type.clone(),
type_info,
self.keyless,
writer_qos,
)?;
if self
.dds_writer
.compare_exchange(DDS_ENTITY_NULL, dw, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
tracing::debug!(
"{}: delete DDS Writer since another task created one concurrently",
self
);
if let Err(e) = delete_dds_entity(dw) {
tracing::warn!(
"{}: failed to delete DDS Writer created in concurrence of another task: {}",
self, e
)
}
}
}
Ok(())
}
pub(crate) fn delete_dds_writer(&self) {
let dds_entity = self
.dds_writer
.swap(DDS_ENTITY_NULL, std::sync::atomic::Ordering::Relaxed);
if dds_entity != DDS_ENTITY_NULL {
if let Err(e) = delete_dds_entity(dds_entity) {
tracing::warn!("{}: error deleting DDS Writer: {}", self, e);
}
}
}
pub(crate) async fn query_historical_publications<'a, F>(
&mut self,
selector: F,
query_timeout: Duration,
) where
F: Fn() -> Selector<'a>,
{
if let ZSubscriber::FetchingSubscriber(sub) = &mut self.zenoh_subscriber {
let s = selector();
tracing::debug!(
"Route Zenoh->DDS ({} -> {}): query historical publications from {}",
sub.key_expr(),
self.topic_name,
s
);
if let Err(e) = sub
.fetch({
let session = &self.zenoh_session;
let s = s.clone();
move |cb| {
session
.get(&s)
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.accept_replies(ReplyKeyExpr::Any)
.timeout(query_timeout)
.callback(cb)
.wait()
}
})
.await
{
tracing::warn!(
"{}: query for historical publications on {} failed: {}",
self,
s,
e
);
}
}
}
pub(crate) fn dds_writer_guid(&self) -> Result<String, String> {
get_guid(&self.dds_writer.load(Ordering::Relaxed))
}
pub(crate) fn add_remote_routed_writer(&mut self, admin_ke: OwnedKeyExpr) {
self.remote_routed_writers.insert(admin_ke);
}
pub(crate) fn remove_remote_routed_writer(&mut self, admin_ke: &keyexpr) {
self.remote_routed_writers.remove(admin_ke);
}
pub(crate) fn remove_remote_routed_writers_containing(&mut self, sub_ke: &str) {
self.remote_routed_writers.retain(|s| !s.contains(sub_ke));
}
pub(crate) fn has_remote_routed_writer(&self) -> bool {
!self.remote_routed_writers.is_empty()
}
pub(crate) fn is_routing_remote_writer(&self, entity_key: &str) -> bool {
self.remote_routed_writers
.iter()
.any(|s| s.contains(entity_key))
}
pub(crate) fn add_local_routed_reader(&mut self, entity_key: String) {
self.local_routed_readers.insert(entity_key);
}
pub(crate) fn remove_local_routed_reader(&mut self, entity_key: &str) {
self.local_routed_readers.remove(entity_key);
}
pub(crate) fn has_local_routed_reader(&self) -> bool {
!self.local_routed_readers.is_empty()
}
}
fn do_route_data(s: Sample, topic_name: &str, data_writer: dds_entity_t) {
if *LOG_PAYLOAD {
tracing::trace!(
"Route Zenoh->DDS ({} -> {}): routing data - payload: {:?}",
s.key_expr(),
&topic_name,
s.payload()
);
} else {
tracing::trace!(
"Route Zenoh->DDS ({} -> {}): routing data",
s.key_expr(),
&topic_name
);
}
unsafe {
let bs = s.payload().to_bytes().to_vec();
let (ptr, len, capacity) = vec_into_raw_parts(bs);
let data_out: ddsrt_iovec_t;
#[cfg(not(target_os = "windows"))]
{
data_out = ddsrt_iovec_t {
iov_base: ptr as *mut std::ffi::c_void,
iov_len: len,
};
}
#[cfg(target_os = "windows")]
{
data_out = ddsrt_iovec_t {
iov_base: ptr as *mut std::ffi::c_void,
iov_len: len as u32,
};
}
let mut sertype_ptr: *const ddsi_sertype = std::ptr::null_mut();
let ret = dds_get_entity_sertype(data_writer, &mut sertype_ptr);
if ret < 0 {
tracing::warn!(
"Route Zenoh->DDS ({} -> {}): can't route data; sertype lookup failed ({})",
s.key_expr(),
topic_name,
CStr::from_ptr(dds_strretcode(ret))
.to_str()
.unwrap_or("unrecoverable DDS retcode")
);
return;
}
let fwdp =
ddsi_serdata_from_ser_iov(sertype_ptr, ddsi_serdata_kind_SDK_DATA, 1, &data_out, len);
dds_writecdr(data_writer, fwdp);
drop(Vec::from_raw_parts(ptr, len, capacity));
}
}