1#![allow(deprecated)]
15use std::{
16 collections::HashMap,
17 env,
18 future::Future,
19 mem::ManuallyDrop,
20 sync::{
21 atomic::{AtomicBool, AtomicUsize, Ordering},
22 Arc,
23 },
24 time::Duration,
25};
26
27use async_trait::async_trait;
28use cyclors::{
29 qos::{
30 DurabilityService, History, IgnoreLocal, IgnoreLocalKind, Qos, Reliability,
31 ReliabilityKind, DDS_100MS_DURATION, DDS_1S_DURATION,
32 },
33 *,
34};
35use flume::{unbounded, Receiver, Sender};
36use futures::select;
37use route_dds_zenoh::RouteDDSZenoh;
38use serde::{ser::SerializeStruct, Serialize, Serializer};
39use serde_json::Value;
40use tokio::task::JoinHandle;
41use tracing::{debug, error, info, trace, warn};
42use zenoh::{
43 bytes::{Encoding, ZBytes},
44 handlers::FifoChannelHandler,
45 internal::{
46 plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
47 runtime::DynamicRuntime,
48 zerror, Timed, TimedEvent, Timer,
49 },
50 key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
51 liveliness::LivelinessToken,
52 qos::CongestionControl,
53 query::{ConsolidationMode, Query, QueryTarget, Queryable, Selector},
54 sample::{Locality, Sample, SampleKind},
55 Result as ZResult, Session, Wait,
56};
57use zenoh_ext::{SessionExt, SubscriberBuilderExt};
58use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
59
60pub mod config;
61mod dds_mgt;
62mod qos_helpers;
63mod ros_discovery;
64mod route_dds_zenoh;
65mod route_zenoh_dds;
66use config::Config;
67use dds_mgt::*;
68
69use crate::{
70 qos_helpers::*,
71 ros_discovery::{
72 NodeEntitiesInfo, ParticipantEntitiesInfo, RosDiscoveryInfoMgr,
73 ROS_DISCOVERY_INFO_TOPIC_NAME,
74 },
75 route_zenoh_dds::RouteZenohDDS,
76};
77
/// Yields the last `/`-separated chunk of `$val`'s key expression as a `&str`.
///
/// `$val` must offer a `key_expr()` whose result has an `as_str()` method.
/// In this file it is applied to liveliness samples published under
/// `zenoh-plugin-dds/<zid>`, so the last chunk is the id of the remote plugin.
macro_rules! zenoh_id {
    ($val:expr) => {
        // Scanning from the right reaches the last chunk without walking the
        // whole string; a split always yields at least one item, so `unwrap`
        // cannot fail.
        $val.key_expr().as_str().rsplit('/').next().unwrap()
    };
}
83
lazy_static::lazy_static! {
    // Thread-pool sizes used when building `TOKIO_RUNTIME`. They start at the
    // defaults from `config` and are overwritten by `DDSPlugin::start()` with
    // the values found in the plugin configuration.
    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
    // Multi-threaded Tokio runtime that `spawn_runtime()` falls back to when
    // the caller is not already inside a Tokio runtime. Being a lazy static,
    // it is built on first access and reads the two counters above at that
    // moment; later changes to them have no effect on an already built runtime.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
        .enable_all()
        .build()
        .expect("Unable to create runtime");
}
95#[inline(always)]
96pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
97where
98 F: Future + Send + 'static,
99 F::Output: Send + 'static,
100{
101 match tokio::runtime::Handle::try_current() {
103 Ok(rt) => {
104 rt.spawn(task)
106 }
107 Err(_) => {
108 TOKIO_RUNTIME.spawn(task)
110 }
111 }
112}
113
lazy_static::lazy_static!(
    // True when the `Z_LOG_PAYLOAD` environment variable is set (to any value).
    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();

    // Key-expression building blocks used throughout this file.
    // NOTE(review): every `from_str_unchecked` below skips validation, so each
    // literal is assumed to already be a valid, canonical key expression.
    static ref KE_PREFIX_ADMIN_SPACE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") };
    static ref KE_PREFIX_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("dds") };
    // Admin-space sub-trees under which the routes are listed.
    static ref KE_PREFIX_ROUTE_TO_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/to_dds") };
    static ref KE_PREFIX_ROUTE_FROM_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/from_dds") };
    // Chunk inserted between a route key and a remote plugin id when querying
    // historical publications (see `run_local_discovery_mode`).
    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_pub_cache") };
    // Chunk used to build the keys on which discovery information is forwarded
    // (see `run_fwd_discovery_mode`).
    static ref KE_PREFIX_FWD_DISCO: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_fwd_disco") };
    // Prefix of the liveliness tokens declared by each running DDS plugin.
    static ref KE_PREFIX_LIVELINESS_GROUP: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("zenoh-plugin-dds") };

    // Wildcards: exactly one chunk, and any number of chunks.
    static ref KE_ANY_1_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("*") };
    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };

    // Set by `log_ros2_deprecation_warning()` once the warning has been printed.
    static ref LOG_ROS2_DEPRECATION_WARNING_FLAG: AtomicBool = AtomicBool::new(false);
);
130
/// CycloneDDS XML configuration fragment selecting the `127.0.0.1` network
/// interface. `run()` prepends it to the `CYCLONEDDS_URI` environment variable
/// when `config.localhost_only` is set; the trailing comma separates it from
/// whatever the variable already contained.
const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General><Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces></General></Domain></CycloneDDS>,"#;

/// CycloneDDS XML configuration fragment declaring an `iox` PubSubMessageExchange
/// backed by the `psmx_iox` library. `run()` prepends it to `CYCLONEDDS_URI`
/// when `config.shm_enabled` is set (only with the `dds_shm` feature).
#[cfg(feature = "dds_shm")]
const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><General><Interfaces><PubSubMessageExchange type="iox" library="psmx_iox"/></Interfaces></General></Domain></CycloneDDS>,"#;

/// Period, in milliseconds, of the timer event created in
/// `run_fwd_discovery_mode()` for ROS discovery information.
const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 500;

// Only emitted when the crate is built with the `dynamic_plugin` feature.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(DDSPlugin);
144
145fn log_ros2_deprecation_warning() {
146 if !LOG_ROS2_DEPRECATION_WARNING_FLAG.swap(true, std::sync::atomic::Ordering::Relaxed) {
147 tracing::warn!("------------------------------------------------------------------------------------------");
148 tracing::warn!(
149 "ROS 2 system detected. Did you know a new Zenoh bridge dedicated to ROS 2 exists ?"
150 );
151 tracing::warn!("Check it out on https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds");
152 tracing::warn!("This DDS bridge will eventually be deprecated for ROS 2 usage in favor of this new bridge.");
153 tracing::warn!("------------------------------------------------------------------------------------------");
154 }
155}
156
/// The DDS plugin type. It carries no state: the actual work is done by the
/// task that `Plugin::start()` spawns (see `run()`).
// The clippy lint is silenced because the name keeps the `DDS` acronym in
// upper case.
#[allow(clippy::upper_case_acronyms)]
pub struct DDSPlugin;

// Both traits are implemented with their default behaviour only.
impl PluginControl for DDSPlugin {}
impl ZenohPlugin for DDSPlugin {}
162impl Plugin for DDSPlugin {
163 type StartArgs = DynamicRuntime;
164 type Instance = RunningPlugin;
165
166 const DEFAULT_NAME: &'static str = "dds";
167 const PLUGIN_VERSION: &'static str = plugin_version!();
168 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
169
170 fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
171 zenoh::try_init_log_from_env();
175
176 let runtime_conf = runtime.get_config();
177 let plugin_conf = runtime_conf
178 .get_plugin_config(name)
179 .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
180 let config: Config = serde_json::from_value(plugin_conf.clone())
181 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
182 WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
183 MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
184
185 spawn_runtime(run(runtime.clone(), config));
186
187 Ok(Box::new(DDSPlugin))
188 }
189}
190impl RunningPluginTrait for DDSPlugin {}
191
192pub async fn run(runtime: DynamicRuntime, config: Config) {
193 zenoh::try_init_log_from_env();
197 debug!("DDS plugin {}", DDSPlugin::PLUGIN_LONG_VERSION);
198 debug!("DDS plugin {:?}", config);
199
200 let zsession = match zenoh::session::init(runtime)
202 .aggregated_subscribers(config.generalise_subs.clone())
203 .aggregated_publishers(config.generalise_pubs.clone())
204 .await
205 {
206 Ok(session) => Arc::new(session),
207 Err(e) => {
208 tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
209 return;
210 }
211 };
212
213 let member = match zsession
214 .liveliness()
215 .declare_token(*KE_PREFIX_LIVELINESS_GROUP / &zsession.zid().into_keyexpr())
216 .await
217 {
218 Ok(member) => member,
219 Err(e) => {
220 tracing::error!(
221 "Unable to declare liveliness token for DDS plugin : {:?}",
222 e
223 );
224 return;
225 }
226 };
227
228 if config.localhost_only {
230 env::set_var(
231 "CYCLONEDDS_URI",
232 format!(
233 "{}{}",
234 CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
235 env::var("CYCLONEDDS_URI").unwrap_or_default()
236 ),
237 );
238 }
239
240 #[cfg(feature = "dds_shm")]
242 {
243 if config.shm_enabled {
244 env::set_var(
245 "CYCLONEDDS_URI",
246 format!(
247 "{}{}",
248 CYCLONEDDS_CONFIG_ENABLE_SHM,
249 env::var("CYCLONEDDS_URI").unwrap_or_default()
250 ),
251 );
252 if config.forward_discovery {
253 warn!("DDS shared memory support enabled but will not be used as forward discovery mode is active.");
254 }
255 }
256 }
257
258 debug!(
260 "Create DDS Participant with CYCLONEDDS_URI='{}'",
261 env::var("CYCLONEDDS_URI").unwrap_or_default()
262 );
263 let dp = unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
264 debug!(
265 "DDS plugin {} using DDS Participant {}",
266 zsession.zid(),
267 get_guid(&dp).unwrap()
268 );
269
270 let mut dds_plugin = DdsPluginRuntime {
271 config,
272 zsession: &zsession,
273 _member: member,
274 dp,
275 discovered_participants: HashMap::<String, DdsParticipant>::new(),
276 discovered_writers: HashMap::<String, DdsEntity>::new(),
277 discovered_readers: HashMap::<String, DdsEntity>::new(),
278 routes_from_dds: HashMap::<OwnedKeyExpr, RouteDDSZenoh>::new(),
279 routes_to_dds: HashMap::<OwnedKeyExpr, RouteZenohDDS>::new(),
280 admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
281 };
282
283 dds_plugin.run().await;
284}
285
/// What an entry of the plugin's admin space points to. `get_admin_value()`
/// resolves each variant to the JSON value returned for admin queries.
#[derive(Debug)]
enum AdminRef {
    /// Key into `DdsPluginRuntime::discovered_participants`.
    DdsParticipant(String),
    /// Key into `DdsPluginRuntime::discovered_writers`.
    DdsWriterEntity(String),
    /// Key into `DdsPluginRuntime::discovered_readers`.
    DdsReaderEntity(String),
    /// Key into `DdsPluginRuntime::routes_from_dds`.
    FromDdsRoute(OwnedKeyExpr),
    /// Key into `DdsPluginRuntime::routes_to_dds`.
    ToDdsRoute(OwnedKeyExpr),
    /// The plugin configuration (the serialized `DdsPluginRuntime`).
    Config,
    /// The plugin's long version string.
    Version,
}
297
/// State of a running DDS plugin: its configuration, the zenoh session and DDS
/// participant it bridges, and everything discovered or routed so far.
pub(crate) struct DdsPluginRuntime<'a> {
    /// Plugin configuration.
    config: Config,
    /// Zenoh session used for all declarations and publications.
    zsession: &'a Arc<Session>,
    /// Liveliness token of this plugin. Never read; it is only held here
    /// (presumably so that it stays declared while the plugin runs).
    _member: LivelinessToken,
    /// Handle of the DDS participant created in `run()`.
    dp: dds_entity_t,
    /// Discovered DDS participants, indexed by their `key`.
    discovered_participants: HashMap<String, DdsParticipant>,
    /// Discovered DDS writers, indexed by their `key`.
    discovered_writers: HashMap<String, DdsEntity>,
    /// Discovered DDS readers, indexed by their `key`.
    discovered_readers: HashMap<String, DdsEntity>,
    /// Routes DDS -> Zenoh, indexed by the zenoh key expression they publish on.
    routes_from_dds: HashMap<OwnedKeyExpr, RouteDDSZenoh<'a>>,
    /// Routes Zenoh -> DDS, indexed by the zenoh key expression they subscribe to.
    routes_to_dds: HashMap<OwnedKeyExpr, RouteZenohDDS<'a>>,
    /// Admin space: key expressions (relative to the plugin's admin prefix)
    /// mapped to the object they describe.
    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
}
316
317impl Serialize for DdsPluginRuntime<'_> {
318 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
319 where
320 S: Serializer,
321 {
322 let mut s = serializer.serialize_struct("dds", 3)?;
324 s.serialize_field("domain", &self.config.domain)?;
325 s.serialize_field("scope", &self.config.scope)?;
326 s.serialize_field(
327 "allow",
328 &self
329 .config
330 .allow
331 .as_ref()
332 .map_or_else(|| ".*".to_string(), |re| re.to_string()),
333 )?;
334 s.serialize_field(
335 "deny",
336 &self
337 .config
338 .deny
339 .as_ref()
340 .map_or_else(|| "".to_string(), |re| re.to_string()),
341 )?;
342 s.serialize_field(
343 "max-frequencies",
344 &self
345 .config
346 .max_frequencies
347 .iter()
348 .map(|(re, freq)| format!("{re}={freq}"))
349 .collect::<Vec<String>>(),
350 )?;
351 s.serialize_field("forward_discovery", &self.config.forward_discovery)?;
352 s.serialize_field(
353 "reliable_routes_blocking",
354 &self.config.reliable_routes_blocking,
355 )?;
356 s.end()
357 }
358}
359
lazy_static::lazy_static! {
    // A shared JSON `null` value.
    // NOTE(review): not referenced in the part of the file visible here -
    // confirm it is still used before keeping it.
    static ref JSON_NULL_VALUE: Value = serde_json::json!(null);
}
363
364impl<'a> DdsPluginRuntime<'a> {
365 fn is_allowed(&self, ke: &keyexpr) -> bool {
366 if ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
367 log_ros2_deprecation_warning();
368 }
369
370 if self.config.forward_discovery && ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
371 return false;
373 }
374 match (&self.config.allow, &self.config.deny) {
375 (Some(allow), None) => allow.is_match(ke),
376 (None, Some(deny)) => !deny.is_match(ke),
377 (Some(allow), Some(deny)) => allow.is_match(ke) && !deny.is_match(ke),
378 (None, None) => true,
379 }
380 }
381
382 fn get_read_period(&self, ke: &keyexpr) -> Option<Duration> {
384 for (re, freq) in &self.config.max_frequencies {
385 if re.is_match(ke) {
386 return Some(Duration::from_secs_f32(1f32 / freq));
387 }
388 }
389 None
390 }
391
392 fn get_participant_admin_keyexpr(e: &DdsParticipant) -> OwnedKeyExpr {
393 format!("participant/{}", e.key,).try_into().unwrap()
394 }
395
396 fn get_entity_admin_keyexpr(e: &DdsEntity, is_writer: bool) -> OwnedKeyExpr {
397 format!(
398 "participant/{}/{}/{}/{}",
399 e.participant_key,
400 if is_writer { "writer" } else { "reader" },
401 e.key,
402 e.topic_name
403 )
404 .try_into()
405 .unwrap()
406 }
407
408 fn insert_dds_participant(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsParticipant) {
409 self.admin_space
411 .insert(admin_keyexpr, AdminRef::DdsParticipant(e.key.clone()));
412
413 self.discovered_participants.insert(e.key.clone(), e);
415 }
416
417 fn remove_dds_participant(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsParticipant)> {
418 if let Some(e) = self.discovered_participants.remove(dds_key) {
420 let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&e);
422 self.admin_space.remove(&admin_keyexpr);
423 Some((admin_keyexpr, e))
424 } else {
425 None
426 }
427 }
428
429 fn insert_dds_writer(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
430 self.admin_space
432 .insert(admin_keyexpr, AdminRef::DdsWriterEntity(e.key.clone()));
433
434 self.discovered_writers.insert(e.key.clone(), e);
436 }
437
438 fn remove_dds_writer(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
439 if let Some(e) = self.discovered_writers.remove(dds_key) {
441 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, true);
443 self.admin_space.remove(&admin_keyexpr);
444 Some((admin_keyexpr, e))
445 } else {
446 None
447 }
448 }
449
450 fn insert_dds_reader(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
451 self.admin_space
453 .insert(admin_keyexpr, AdminRef::DdsReaderEntity(e.key.clone()));
454
455 self.discovered_readers.insert(e.key.clone(), e);
457 }
458
459 fn remove_dds_reader(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
460 if let Some(e) = self.discovered_readers.remove(dds_key) {
462 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, false);
464 self.admin_space.remove(&admin_keyexpr);
465 Some((admin_keyexpr, e))
466 } else {
467 None
468 }
469 }
470
471 fn insert_route_from_dds(&mut self, ke: OwnedKeyExpr, r: RouteDDSZenoh<'a>) {
472 let admin_ke = *KE_PREFIX_ROUTE_FROM_DDS / &ke;
474 self.admin_space
475 .insert(admin_ke, AdminRef::FromDdsRoute(ke.clone()));
476
477 self.routes_from_dds.insert(ke, r);
479 }
480
481 fn insert_route_to_dds(&mut self, ke: OwnedKeyExpr, r: RouteZenohDDS<'a>) {
482 let admin_ke: OwnedKeyExpr = *KE_PREFIX_ROUTE_TO_DDS / &ke;
484 self.admin_space
485 .insert(admin_ke, AdminRef::ToDdsRoute(ke.clone()));
486
487 self.routes_to_dds.insert(ke, r);
489 }
490
491 #[allow(clippy::too_many_arguments)]
492 async fn try_add_route_from_dds(
493 &mut self,
494 ke: OwnedKeyExpr,
495 topic_name: &str,
496 topic_type: &str,
497 type_info: &Option<TypeInfo>,
498 keyless: bool,
499 reader_qos: Qos,
500 congestion_ctrl: CongestionControl,
501 ) -> RouteStatus {
502 if !self.is_allowed(&ke) {
503 info!(
504 "Ignoring Publication for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
505 ke
506 );
507 return RouteStatus::NotAllowed;
508 }
509
510 if self.routes_from_dds.contains_key(&ke) {
511 debug!(
513 "Route from DDS to resource {} already exists -- ignoring",
514 ke
515 );
516 return RouteStatus::Routed(ke);
517 }
518
519 match RouteDDSZenoh::new(
521 self,
522 topic_name.into(),
523 topic_type.into(),
524 type_info,
525 keyless,
526 reader_qos,
527 ke.clone(),
528 congestion_ctrl,
529 )
530 .await
531 {
532 Ok(route) => {
533 info!("{}: created with topic_type={}", route, topic_type);
534 self.insert_route_from_dds(ke.clone(), route);
535 RouteStatus::Routed(ke)
536 }
537 Err(e) => {
538 error!(
539 "Route DDS->Zenoh ({} -> {}): creation failed: {}",
540 topic_name, ke, e
541 );
542 RouteStatus::CreationFailure(e)
543 }
544 }
545 }
546
547 #[allow(clippy::too_many_arguments)]
548 async fn try_add_route_to_dds(
549 &mut self,
550 ke: OwnedKeyExpr,
551 topic_name: &str,
552 topic_type: &str,
553 type_info: &Option<TypeInfo>,
554 keyless: bool,
555 is_transient: bool,
556 writer_qos: Option<Qos>,
557 ) -> RouteStatus {
558 if !self.is_allowed(&ke) {
559 info!(
560 "Ignoring Subscription for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
561 ke
562 );
563 return RouteStatus::NotAllowed;
564 }
565
566 if let Some(route) = self.routes_to_dds.get(&ke) {
567 debug!(
569 "Route from resource {} to DDS already exists -- ignoring",
570 ke
571 );
572 if let Some(qos) = writer_qos {
576 if let Err(e) = route.set_dds_writer(self.dp, type_info, qos) {
577 error!(
578 "{}: failed to set a DDS Writer after creation: {}",
579 route, e
580 );
581 return RouteStatus::CreationFailure(e);
582 }
583 }
584 return RouteStatus::Routed(ke);
585 }
586
587 match RouteZenohDDS::new(
589 self,
590 ke.clone(),
591 is_transient,
592 topic_name.into(),
593 topic_type.into(),
594 keyless,
595 )
596 .await
597 {
598 Ok(route) => {
599 if let Some(qos) = writer_qos {
601 if let Err(e) = route.set_dds_writer(self.dp, type_info, qos) {
602 error!(
603 "Route Zenoh->DDS ({} -> {}): creation failed: {}",
604 ke, topic_name, e
605 );
606 return RouteStatus::CreationFailure(e);
607 }
608 }
609
610 info!("{}: created with topic_type={}", route, topic_type);
611 self.insert_route_to_dds(ke.clone(), route);
612 RouteStatus::Routed(ke)
613 }
614 Err(e) => {
615 error!(
616 "Route Zenoh->DDS ({} -> {}): creation failed: {}",
617 ke, topic_name, e
618 );
619 RouteStatus::CreationFailure(e)
620 }
621 }
622 }
623
624 fn get_admin_value(&self, admin_ref: &AdminRef) -> Result<Option<Value>, serde_json::Error> {
625 match admin_ref {
626 AdminRef::DdsParticipant(key) => self
627 .discovered_participants
628 .get(key)
629 .map(serde_json::to_value)
630 .map(remove_null_qos_values)
631 .transpose(),
632 AdminRef::DdsReaderEntity(key) => self
633 .discovered_readers
634 .get(key)
635 .map(serde_json::to_value)
636 .map(remove_null_qos_values)
637 .transpose(),
638 AdminRef::DdsWriterEntity(key) => self
639 .discovered_writers
640 .get(key)
641 .map(serde_json::to_value)
642 .map(remove_null_qos_values)
643 .transpose(),
644 AdminRef::FromDdsRoute(zkey) => self
645 .routes_from_dds
646 .get(zkey)
647 .map(serde_json::to_value)
648 .transpose(),
649 AdminRef::ToDdsRoute(zkey) => self
650 .routes_to_dds
651 .get(zkey)
652 .map(serde_json::to_value)
653 .transpose(),
654 AdminRef::Config => Some(serde_json::to_value(self)).transpose(),
655 AdminRef::Version => Ok(Some(DDSPlugin::PLUGIN_LONG_VERSION.into())),
656 }
657 }
658
659 async fn treat_admin_query(&self, query: Query, admin_keyexpr_prefix: &keyexpr) {
660 let selector = query.selector();
661 debug!("Query on admin space: {:?}", selector);
662
663 let sub_kes = selector.key_expr().strip_prefix(admin_keyexpr_prefix);
666 if sub_kes.is_empty() {
667 error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
668 return;
669 }
670
671 let mut kvs: Vec<(KeyExpr, Value)> = Vec::with_capacity(sub_kes.len());
673 for sub_ke in sub_kes {
674 if sub_ke.contains('*') {
675 for (ke, admin_ref) in self.admin_space.iter() {
677 if sub_ke.intersects(ke) {
678 match self.get_admin_value(admin_ref) {
679 Ok(Some(v)) => kvs.push((ke.into(), v)),
680 Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
681 Err(e) => {
682 error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
683 }
684 }
685 }
686 }
687 } else {
688 if let Some(admin_ref) = self.admin_space.get(sub_ke) {
690 match self.get_admin_value(admin_ref) {
691 Ok(Some(v)) => kvs.push((sub_ke.into(), v)),
692 Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
693 Err(e) => {
694 error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
695 }
696 }
697 }
698 }
699 }
700
701 for (ke, v) in kvs.drain(..) {
703 let admin_keyexpr = admin_keyexpr_prefix / &ke;
704 match serde_json::to_vec(&v) {
705 Ok(vec_u8) => {
706 let payload = ZBytes::from(vec_u8);
707 if let Err(e) = query
708 .reply(admin_keyexpr, payload)
709 .encoding(Encoding::APPLICATION_JSON)
710 .await
711 {
712 warn!("Error replying to admin query {:?}: {}", query, e);
713 }
714 }
715 Err(e) => warn!("Error transforming JSON to admin query {:?}: {}", query, e),
716 }
717 }
718 }
719
720 async fn run(&mut self) {
721 let group_subscriber = self
722 .zsession
723 .liveliness()
724 .declare_subscriber(*KE_PREFIX_LIVELINESS_GROUP / *KE_ANY_N_SEGMENT)
725 .querying()
726 .with(flume::unbounded())
727 .await
728 .expect("Failed to create Liveliness Subscriber");
729
730 let (tx, dds_disco_rcv): (Sender<DiscoveryEvent>, Receiver<DiscoveryEvent>) = unbounded();
732 run_discovery(self.dp, tx);
733
734 let admin_keyexpr_prefix =
736 *KE_PREFIX_ADMIN_SPACE / &self.zsession.zid().into_keyexpr() / *KE_PREFIX_DDS;
737 let admin_keyexpr_expr = (&admin_keyexpr_prefix) / *KE_ANY_N_SEGMENT;
738 debug!("Declare admin space on {}", admin_keyexpr_expr);
739 let admin_queryable = self
740 .zsession
741 .declare_queryable(admin_keyexpr_expr)
742 .await
743 .expect("Failed to create AdminSpace queryable");
744
745 self.admin_space
747 .insert("config".try_into().unwrap(), AdminRef::Config);
748 self.admin_space
749 .insert("version".try_into().unwrap(), AdminRef::Version);
750
751 if self.config.forward_discovery {
752 self.run_fwd_discovery_mode(
753 &group_subscriber,
754 &dds_disco_rcv,
755 admin_keyexpr_prefix,
756 &admin_queryable,
757 )
758 .await;
759 } else {
760 self.run_local_discovery_mode(
761 &group_subscriber,
762 &dds_disco_rcv,
763 admin_keyexpr_prefix,
764 &admin_queryable,
765 )
766 .await;
767 }
768 }
769
770 fn topic_to_keyexpr(
771 &self,
772 topic_name: &str,
773 scope: &Option<OwnedKeyExpr>,
774 partition: Option<&str>,
775 ) -> ZResult<OwnedKeyExpr> {
776 match (scope, partition) {
778 (Some(scope), Some(part)) => scope.join(&format!("{part}/{topic_name}")),
779 (Some(scope), None) => scope.join(topic_name),
780 (None, Some(part)) => format!("{part}/{topic_name}").try_into(),
781 (None, None) => topic_name.try_into(),
782 }
783 }
784
785 async fn run_local_discovery_mode(
786 &mut self,
787 group_subscriber: &Receiver<Sample>,
788 dds_disco_rcv: &Receiver<DiscoveryEvent>,
789 admin_keyexpr_prefix: OwnedKeyExpr,
790 admin_queryable: &Queryable<FifoChannelHandler<Query>>,
791 ) {
792 debug!(r#"Run in "local discovery" mode"#);
793
794 loop {
795 select!(
796 evt = dds_disco_rcv.recv_async() => {
797 match evt.unwrap() {
798 DiscoveryEvent::DiscoveredPublication {
799 mut entity
800 } => {
801 debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
802 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
804
805 let qos = adapt_writer_qos_for_reader(&entity.qos);
806 let congestion_ctrl = match (self.config.reliable_routes_blocking, is_writer_reliable(&entity.qos.reliability)) {
808 (true, true) => CongestionControl::Block,
809 _ => CongestionControl::Drop,
810 };
811
812 if partition_is_empty(&entity.qos.partition) {
814 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
815 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
816 if let RouteStatus::Routed(ref route_key) = route_status {
817 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
818 r.add_local_routed_writer(entity.key.clone());
820 }
821 }
822 entity.routes.insert("*".to_string(), route_status);
823 } else {
824 for p in entity.qos.partition.as_deref().unwrap() {
825 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
826 let mut qos = qos.clone();
827 qos.partition = Some(vec![p.to_string()]);
828 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
829 if let RouteStatus::Routed(ref route_key) = route_status {
830 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
831 r.add_local_routed_writer(entity.key.clone());
833 }
834 }
835 entity.routes.insert(p.clone(), route_status);
836 }
837 }
838
839 self.insert_dds_writer(admin_keyexpr, entity);
841 }
842
843 DiscoveryEvent::UndiscoveredPublication {
844 key,
845 } => {
846 if let Some((_, e)) = self.remove_dds_writer(&key) {
847 debug!("Undiscovered DDS Writer {} on topic {}", key, e.topic_name);
848 let admin_space = &mut self.admin_space;
850 self.routes_from_dds.retain(|zkey, route| {
851 route.remove_local_routed_writer(&key);
852 if !route.has_local_routed_writer() {
853 info!(
854 "{}: remove it as no longer unused (no local DDS Writer left)",
855 route
856 );
857 let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
858 admin_space.remove(&ke);
859 false
860 } else {
861 true
862 }
863 }
864 );
865 }
866 }
867
868 DiscoveryEvent::DiscoveredSubscription {
869 mut entity
870 } => {
871 debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
872 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
873
874 let qos = adapt_reader_qos_for_writer(&entity.qos);
875
876 if partition_is_empty(&entity.qos.partition) {
878 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
879 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
880 if let RouteStatus::Routed(ref route_key) = route_status {
881 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
882 r.add_local_routed_reader(entity.key.clone());
884 }
885 }
886 entity.routes.insert("*".to_string(), route_status);
887 } else {
888 for p in entity.qos.partition.as_deref().unwrap() {
889 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
890 let mut qos = qos.clone();
891 qos.partition = Some(vec![p.to_string()]);
892 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
893 if let RouteStatus::Routed(ref route_key) = route_status {
894 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
895 r.add_local_routed_reader(entity.key.clone());
897 }
898 }
899 entity.routes.insert(p.clone(), route_status);
900 }
901 }
902
903 self.insert_dds_reader(admin_keyexpr, entity);
905 }
906
907 DiscoveryEvent::UndiscoveredSubscription {
908 key,
909 } => {
910 if let Some((_, e)) = self.remove_dds_reader(&key) {
911 debug!("Undiscovered DDS Reader {} on topic {}", key, e.topic_name);
912 let admin_space = &mut self.admin_space;
914 self.routes_to_dds.retain(|zkey, route| {
915 route.remove_local_routed_reader(&key);
916 if !route.has_local_routed_reader() {
917 info!(
918 "{}: remove it as no longer unused (no local DDS Reader left)",
919 route
920 );
921 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
922 admin_space.remove(&ke);
923 false
924 } else {
925 true
926 }
927 }
928 );
929 }
930 }
931
932 DiscoveryEvent::DiscoveredParticipant {
933 entity,
934 } => {
935 debug!("Discovered DDS Participant {}", entity.key);
936 let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
937
938 self.insert_dds_participant(admin_keyexpr, entity);
940 }
941
942 DiscoveryEvent::UndiscoveredParticipant {
943 key,
944 } => {
945 if let Some((_, _)) = self.remove_dds_participant(&key) {
946 debug!("Undiscovered DDS Participant {}", key);
947 }
948 }
949 }
950 },
951
952 group_event = group_subscriber.recv_async() => {
953 match group_event.as_ref().map(|s|s.kind()) {
954 Ok(SampleKind::Put) => {
955 let zid = zenoh_id!(group_event.as_ref().unwrap());
956 debug!("New zenoh_dds_plugin detected: {}", zid);
957 if let Ok(zenoh_id) = keyexpr::new(zid) {
958 for (zkey, route) in &mut self.routes_to_dds {
960 route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE / zenoh_id).into(), self.config.queries_timeout).await;
961 }
962 } else {
963 error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
964 }
965 }
966 Ok(_) => {} Err(e) => warn!("Error receiving GroupEvent: {}", e)
968 }
969 }
970
971 get_request = admin_queryable.recv_async() => {
972 if let Ok(query) = get_request {
973 self.treat_admin_query(query, &admin_keyexpr_prefix).await;
974 } else {
975 warn!("AdminSpace queryable was closed!");
976 }
977 }
978 )
979 }
980 }
981
982 async fn run_fwd_discovery_mode(
983 &mut self,
984 group_subscriber: &Receiver<Sample>,
985 dds_disco_rcv: &Receiver<DiscoveryEvent>,
986 admin_keyexpr_prefix: OwnedKeyExpr,
987 admin_queryable: &Queryable<FifoChannelHandler<Query>>,
988 ) {
989 debug!(r#"Run in "forward discovery" mode"#);
990
991 let uuid: OwnedKeyExpr = self.zsession.zid().into();
998 let fwd_key_prefix = if let Some(scope) = &self.config.scope {
999 *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO / scope
1000 } else {
1001 *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO
1002 };
1003 let fwd_writers_key_prefix =
1004 &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("writer") };
1005 let fwd_readers_key_prefix =
1006 &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("reader") };
1007 let fwd_ros_discovery_key =
1008 &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("ros_disco") };
1009 let fwd_declare_publication_cache_key = &fwd_key_prefix / *KE_ANY_N_SEGMENT;
1010 let fwd_discovery_subscription_key = if let Some(scope) = &self.config.scope {
1011 *KE_PREFIX_ADMIN_SPACE
1012 / *KE_ANY_1_SEGMENT
1013 / *KE_PREFIX_FWD_DISCO
1014 / scope
1015 / *KE_ANY_N_SEGMENT
1016 } else {
1017 *KE_PREFIX_ADMIN_SPACE / *KE_ANY_1_SEGMENT / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1018 };
1019
1020 let fwd_writers_key_prefix_key = self
1022 .zsession
1023 .declare_keyexpr(fwd_writers_key_prefix)
1024 .await
1025 .expect("Failed to declare key expression for Fwd Discovery of writers");
1026 let fwd_readers_key_prefix_key = self
1027 .zsession
1028 .declare_keyexpr(fwd_readers_key_prefix)
1029 .await
1030 .expect("Failed to declare key expression for Fwd Discovery of readers");
1031 let fwd_ros_discovery_key_declared = self
1032 .zsession
1033 .declare_keyexpr(&fwd_ros_discovery_key)
1034 .await
1035 .expect("Failed to declare key expression for Fwd Discovery of ros_discovery");
1036
1037 let _fwd_disco_pub_cache = self
1039 .zsession
1040 .declare_publication_cache(fwd_declare_publication_cache_key)
1041 .queryable_allowed_origin(Locality::Remote) .await
1043 .expect("Failed to declare PublicationCache for Fwd Discovery");
1044
1045 let fwd_disco_sub = self
1047 .zsession
1048 .declare_subscriber(fwd_discovery_subscription_key)
1049 .querying()
1050 .allowed_origin(Locality::Remote) .query_timeout(self.config.queries_timeout)
1052 .await
1053 .expect("Failed to declare QueryingSubscriber for Fwd Discovery");
1054
1055 let ros_disco_mgr =
1057 RosDiscoveryInfoMgr::create(self.dp).expect("Failed to create RosDiscoveryInfoMgr");
1058 let timer = Timer::default();
1059 let (tx, ros_disco_timer_rcv): (Sender<()>, Receiver<()>) = unbounded();
1060 let ros_disco_timer_event = TimedEvent::periodic(
1061 Duration::from_millis(ROS_DISCOVERY_INFO_POLL_INTERVAL_MS),
1062 ChannelEvent { tx },
1063 );
1064 timer.add_async(ros_disco_timer_event).await;
1065
1066 let mut participant_info = ParticipantEntitiesInfo::new(
1068 get_guid(&self.dp).expect("Failed to get my Participant's guid"),
1069 );
1070
1071 let scope = self.config.scope.clone();
1072 loop {
1073 select!(
1074 evt = dds_disco_rcv.recv_async() => {
1075 match evt.unwrap() {
1076 DiscoveryEvent::DiscoveredPublication {
1077 entity
1078 } => {
1079 debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1080 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
1082 let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1083 let msg = (&entity, &scope);
1084 let ser_msg = match bincode::serialize(&msg) {
1085 Ok(s) => s,
1086 Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1087 };
1088 if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1089 error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1090 }
1091
1092 self.insert_dds_writer(admin_keyexpr, entity);
1094 }
1095
1096 DiscoveryEvent::UndiscoveredPublication {
1097 key,
1098 } => {
1099 debug!("Undiscovered DDS Writer {} => advertise it", key);
1100 if let Some((admin_keyexpr, _)) = self.remove_dds_writer(&key) {
1101 let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1102 if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1104 error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1105 }
1106 }
1107 }
1108
1109 DiscoveryEvent::DiscoveredSubscription {
1110 mut entity
1111 } => {
1112 debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1113
1114 if partition_is_empty(&entity.qos.partition) {
1117 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
1118 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&entity.qos), None).await;
1119 if let RouteStatus::Routed(ref route_key) = route_status {
1120 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1121 r.add_local_routed_reader(entity.key.clone());
1123 }
1124 }
1125 entity.routes.insert("*".to_string(), route_status);
1126 } else {
1127 for p in entity.qos.partition.as_deref().unwrap() {
1128 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
1129 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&entity.qos), None).await;
1130 if let RouteStatus::Routed(ref route_key) = route_status {
1131 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1132 r.add_local_routed_reader(entity.key.clone());
1134 }
1135 }
1136 entity.routes.insert(p.clone(), route_status);
1137 }
1138 }
1139
1140 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
1142 let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1143 let msg = (&entity, &scope);
1144 let ser_msg = match bincode::serialize(&msg) {
1145 Ok(s) => s,
1146 Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1147 };
1148 if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1149 error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1150 }
1151
1152 self.insert_dds_reader(admin_keyexpr, entity);
1154 }
1155
1156 DiscoveryEvent::UndiscoveredSubscription {
1157 key,
1158 } => {
1159 debug!("Undiscovered DDS Reader {} => advertise it", key);
1160 if let Some((admin_keyexpr, _)) = self.remove_dds_reader(&key) {
1161 let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1162 if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1164 error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1165 }
1166 }
1167 let admin_space = &mut self.admin_space;
1170 self.routes_to_dds.retain(|zkey, route| {
1171 route.remove_local_routed_reader(&key);
1172 if !route.has_local_routed_reader() && !route.has_remote_routed_writer(){
1173 info!(
1174 "{}: remove it as no longer unused (no local DDS Reader nor remote DDS Writer left)",
1175 route
1176 );
1177 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1178 admin_space.remove(&ke);
1179 false
1180 } else {
1181 true
1182 }
1183 }
1184 );
1185 }
1186
1187 DiscoveryEvent::DiscoveredParticipant {
1188 entity,
1189 } => {
1190 debug!("Discovered DDS Participant {}", entity.key);
1191 let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
1192
1193 self.insert_dds_participant(admin_keyexpr, entity);
1195 }
1196
1197 DiscoveryEvent::UndiscoveredParticipant {
1198 key,
1199 } => {
1200 if let Some((_, _)) = self.remove_dds_participant(&key) {
1201 debug!("Undiscovered DDS Participant {}", key);
1202 }
1203 }
1204 }
1205 },
1206
1207 sample = fwd_disco_sub.recv_async() => {
1208 let sample = sample.expect("Fwd Discovery subscriber was closed!");
1209 let fwd_ke = &sample.key_expr();
1210 debug!("Received forwarded discovery message on {}", fwd_ke);
1211
1212 if let Some((remote_uuid, disco_kind, remaining_ke)) = Self::parse_fwd_discovery_keyexpr(fwd_ke) {
1214 match disco_kind {
1215 "writer" => {
1217 let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1219 if sample.kind() != SampleKind::Delete {
1220 let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1222 Ok(x) => x,
1223 Err(e) => {
1224 warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1225 continue;
1226 }
1227 };
1228 let qos = adapt_writer_qos_for_proxy_writer(&entity.qos);
1229
1230 if partition_is_empty(&entity.qos.partition) {
1232 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1233 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
1234 if let RouteStatus::Routed(ref route_key) = route_status {
1235 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1236 r.add_remote_routed_writer(full_admin_keyexpr);
1238 for reader in self.discovered_readers.values_mut() {
1240 if reader.topic_name == entity.topic_name && partition_is_empty(&reader.qos.partition) {
1241 r.add_local_routed_reader(reader.key.clone());
1242 reader.routes.insert("*".to_string(), route_status.clone());
1243 }
1244 }
1245 }
1246 }
1247 } else {
1248 for p in entity.qos.partition.as_deref().unwrap() {
1249 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1250 let mut qos = qos.clone();
1251 qos.partition = Some(vec![p.to_string()]);
1252 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
1253 if let RouteStatus::Routed(ref route_key) = route_status {
1254 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1255 r.add_remote_routed_writer(full_admin_keyexpr.clone());
1257 for reader in self.discovered_readers.values_mut() {
1259 if reader.topic_name == entity.topic_name && partition_contains(&reader.qos.partition, p) {
1260 r.add_local_routed_reader(reader.key.clone());
1261 reader.routes.insert(p.clone(), route_status.clone());
1262 }
1263 }
1264 }
1265 }
1266 }
1267 }
1268 } else {
1269 let admin_space = &mut self.admin_space;
1271 self.routes_to_dds.retain(|zkey, route| {
1272 route.remove_remote_routed_writer(&full_admin_keyexpr);
1273 if route.has_remote_routed_writer() {
1274 true
1276 } else {
1277 route.delete_dds_writer();
1280 if !route.has_local_routed_reader() {
1281 info!(
1282 "{}: remove it as no longer unused (no remote DDS Writer nor local DDS Reader left)",
1283 route
1284 );
1285 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1286 admin_space.remove(&ke);
1287 false
1288 } else {
1289 true
1290 }
1291 }
1292 }
1293 );
1294 }
1295 }
1296
1297 "reader" => {
1299 let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1301 if sample.kind() != SampleKind::Delete {
1302 let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1304 Ok(x) => x,
1305 Err(e) => {
1306 warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1307 continue;
1308 }
1309 };
1310 let qos = adapt_reader_qos_for_proxy_reader(&entity.qos);
1311
1312 let congestion_ctrl = match (self.config.reliable_routes_blocking, is_reader_reliable(&entity.qos.reliability)) {
1314 (true, true) => CongestionControl::Block,
1315 _ => CongestionControl::Drop,
1316 };
1317
1318 if partition_is_empty(&entity.qos.partition) {
1320 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1321 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
1322 if let RouteStatus::Routed(ref route_key) = route_status {
1323 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1324 r.add_remote_routed_reader(full_admin_keyexpr);
1326 for writer in self.discovered_writers.values_mut() {
1328 if writer.topic_name == entity.topic_name && partition_is_empty(&writer.qos.partition) {
1329 r.add_local_routed_writer(writer.key.clone());
1330 writer.routes.insert("*".to_string(), route_status.clone());
1331 }
1332 }
1333 }
1334 }
1335 } else {
1336 for p in &entity.qos.partition.unwrap() {
1337 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1338 let mut qos = qos.clone();
1339 qos.partition = Some(vec![p.to_string()]);
1340 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
1341 if let RouteStatus::Routed(ref route_key) = route_status {
1342 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1343 r.add_remote_routed_reader(full_admin_keyexpr.clone());
1345 for writer in self.discovered_writers.values_mut() {
1347 if writer.topic_name == entity.topic_name && partition_contains(&writer.qos.partition, p) {
1348 r.add_local_routed_writer(writer.key.clone());
1349 writer.routes.insert(p.clone(), route_status.clone());
1350 }
1351 }
1352 }
1353 }
1354 }
1355 }
1356 } else {
1357 let admin_space = &mut self.admin_space;
1359 self.routes_from_dds.retain(|zkey, route| {
1360 route.remove_remote_routed_reader(&full_admin_keyexpr);
1361 if !route.has_remote_routed_reader() {
1362 info!(
1363 "{}: remove it as no longer unused (no remote DDS Reader left)",
1364 route
1365 );
1366 let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1367 admin_space.remove(&ke);
1368 false
1369 } else {
1370 true
1371 }
1372 }
1373 );
1374 }
1375 }
1376
1377 "ros_disco" => {
1379 match cdr::deserialize_from::<_, ParticipantEntitiesInfo, _>(
1380 sample.payload().reader(),
1381 cdr::size::Infinite,
1382 ) {
1383 Ok(mut info) => {
1384 self.remap_entities_info(&mut info.node_entities_info_seq);
1386 participant_info.update_with(info.node_entities_info_seq);
1388 debug!("Publish updated ros_discovery_info: {:?}", participant_info);
1389 if let Err(e) = ros_disco_mgr.write(&participant_info) {
1390 error!("Error forwarding ros_discovery_info: {}", e);
1391 }
1392 }
1393 Err(e) => error!(
1394 "Error receiving ParticipantEntitiesInfo on {}: {}",
1395 fwd_ke, e
1396 ),
1397 }
1398 }
1399
1400 x => {
1401 error!("Unexpected forwarded discovery message received on invalid key {} (unknown kind: {}) ", fwd_ke, x);
1402 }
1403 }
1404 }
1405 },
1406
1407 group_event = group_subscriber.recv_async() => {
1408 match group_event.as_ref().map(|s|s.kind()) {
1409 Ok(SampleKind::Put) => {
1410 let zid = zenoh_id!(group_event.as_ref().unwrap());
1411 debug!("New zenoh_dds_plugin detected: {}", zid);
1412
1413 if let Ok(zenoh_id) = keyexpr::new(zid) {
1414 let key = if let Some(scope) = &self.config.scope {
1416 *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / scope / *KE_ANY_N_SEGMENT
1417 } else {
1418 *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1419 };
1420 debug!("Query past discovery messages from {} on {}", zid, key);
1421 if let Err(e) = fwd_disco_sub.fetch( |cb| {
1422 self.zsession.get(Selector::from(&key))
1423 .callback(cb)
1424 .target(QueryTarget::All)
1425 .consolidation(ConsolidationMode::None)
1426 .timeout(self.config.queries_timeout)
1427 .wait()
1428 }).await
1429 {
1430 warn!("Query on {} for discovery messages failed: {}", key, e);
1431 }
1432 for (zkey, route) in &mut self.routes_to_dds {
1434 route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE / zenoh_id).into(), self.config.queries_timeout).await;
1435 }
1436 } else {
1437 error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
1438 }
1439 }
1440 Ok(SampleKind::Delete) => {
1441 let zid = zenoh_id!(group_event.as_ref().unwrap());
1442 debug!("Remote zenoh_dds_plugin left: {}", zid);
1443 let admin_space = &mut self.admin_space;
1446 let admin_subke = format!("@/{zid}/dds/");
1447 let mut participant_info_changed = false;
1448 self.routes_to_dds.retain(|zkey, route| {
1449 route.remove_remote_routed_writers_containing(&admin_subke);
1450 if !route.has_remote_routed_writer() {
1451 info!(
1452 "{}: remove it as no longer unused (no remote DDS Writer left)",
1453 route
1454 );
1455 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1456 admin_space.remove(&ke);
1457 if let Ok(guid) = route.dds_writer_guid() {
1458 participant_info.remove_writer_gid(&guid);
1459 participant_info_changed = true;
1460 } else {
1461 warn!("Failed to get guid for Writer serving the route zenoh '{}' => DDS '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1462 }
1463 false
1464 } else {
1465 true
1466 }
1467 });
1468 self.routes_from_dds.retain(|zkey, route| {
1469 route.remove_remote_routed_readers_containing(&admin_subke);
1470 if !route.has_remote_routed_reader() {
1471 info!(
1472 "{}: remove it as no longer unused (no remote DDS Reader left)",
1473 route
1474 );
1475 let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1476 admin_space.remove(&ke);
1477 if let Ok(guid) = route.dds_reader_guid() {
1478 participant_info.remove_reader_gid(&guid);
1479 participant_info_changed = true;
1480 } else {
1481 warn!("Failed to get guid for Reader serving the route DDS '{}' => zenoh '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1482 }
1483 false
1484 } else {
1485 true
1486 }
1487 });
1488 if participant_info_changed {
1489 debug!("Publishing up-to-date ros_discovery_info after leaving of plugin {}", zid);
1490 participant_info.cleanup();
1491 if let Err(e) = ros_disco_mgr.write(&participant_info) {
1492 error!("Error forwarding ros_discovery_info: {}", e);
1493 }
1494 }
1495 }
1496 Err(e) => warn!("Error receiving GroupEvent: {}", e)
1497 }
1498 }
1499
1500 get_request = admin_queryable.recv_async() => {
1501 if let Ok(query) = get_request {
1502 self.treat_admin_query(query, &admin_keyexpr_prefix).await;
1503 } else {
1504 warn!("AdminSpace queryable was closed!");
1505 }
1506 }
1507
1508 _ = ros_disco_timer_rcv.recv_async() => {
1509 let infos = ros_disco_mgr.read();
1510 for (gid, buf) in infos {
1511 trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, buf.hex_encode());
1512 let ke = &fwd_ros_discovery_key_declared / unsafe { keyexpr::from_str_unchecked(&gid) };
1514 if let Err(e) = self.zsession.put(ke, buf).wait() {
1515 error!("Forward ROS discovery info failed: {}", e);
1516 }
1517 }
1518 }
1519 )
1520 }
1521 }
1522
1523 fn parse_fwd_discovery_keyexpr(fwd_ke: &keyexpr) -> Option<(&keyexpr, &str, &keyexpr)> {
1524 if !fwd_ke.starts_with(KE_PREFIX_ADMIN_SPACE.as_str()) {
1526 return None;
1528 }
1529 let mut remaining = &fwd_ke[KE_PREFIX_ADMIN_SPACE.len() + 1..];
1530 let uuid = if let Some(i) = remaining.find('/') {
1531 let uuid = unsafe { keyexpr::from_str_unchecked(&remaining[..i]) };
1532 remaining = &remaining[i + 1..];
1533 uuid
1534 } else {
1535 error!(
1536 "Unexpected forwarded discovery message received on invalid key: {}",
1537 fwd_ke
1538 );
1539 return None;
1540 };
1541 if !remaining.starts_with(KE_PREFIX_FWD_DISCO.as_str()) {
1542 return None;
1544 }
1545 let kind = if let Some(i) = remaining.find("/reader/") {
1546 remaining = &remaining[i + 8..];
1547 "reader"
1548 } else if let Some(i) = remaining.find("/writer/") {
1549 remaining = &remaining[i + 8..];
1550 "writer"
1551 } else if let Some(i) = remaining.find("/ros_disco/") {
1552 remaining = &remaining[i + 11..];
1553 "ros_disco"
1554 } else {
1555 error!("Unexpected forwarded discovery message received on invalid key: {} (no expected kind '/reader/', '/writer/' or '/ros_disco/')", fwd_ke);
1556 return None;
1557 };
1558 Some((uuid, kind, unsafe {
1559 keyexpr::from_str_unchecked(remaining)
1560 }))
1561 }
1562
1563 fn remap_entities_info(&self, entities_info: &mut HashMap<String, NodeEntitiesInfo>) {
1564 for node in entities_info.values_mut() {
1565 let mut i = 0;
1567 while i < node.reader_gid_seq.len() {
1568 match self
1570 .routes_from_dds
1571 .values()
1572 .find(|route| route.is_routing_remote_reader(&node.reader_gid_seq[i]))
1573 {
1574 Some(route) => {
1575 if let Ok(gid) = route.dds_reader_guid() {
1577 trace!(
1578 "ros_discovery_info remap reader {} -> {}",
1579 node.reader_gid_seq[i],
1580 gid
1581 );
1582 node.reader_gid_seq[i] = gid;
1583 i += 1;
1584 } else {
1585 error!("Failed to get guid for Reader serving the a route. Can't remap in ros_discovery_info");
1586 }
1587 }
1588 None => {
1589 trace!(
1592 "ros_discovery_info remap reader {} -> NONE",
1593 node.reader_gid_seq[i]
1594 );
1595 node.reader_gid_seq.remove(i);
1596 }
1597 }
1598 }
1599 let mut i = 0;
1600 while i < node.writer_gid_seq.len() {
1601 match self
1603 .routes_to_dds
1604 .values()
1605 .find(|route| route.is_routing_remote_writer(&node.writer_gid_seq[i]))
1606 {
1607 Some(route) => {
1608 if let Ok(gid) = route.dds_writer_guid() {
1610 trace!(
1611 "ros_discovery_info remap writer {} -> {}",
1612 node.writer_gid_seq[i],
1613 gid
1614 );
1615 node.writer_gid_seq[i] = gid;
1616 i += 1;
1617 } else {
1618 error!("Failed to get guid for Writer serving the a route. Can't remap in ros_discovery_info");
1619 }
1620 }
1621 None => {
1622 trace!(
1625 "ros_discovery_info remap writer {} -> NONE",
1626 node.writer_gid_seq[i]
1627 );
1628 node.writer_gid_seq.remove(i);
1629 }
1630 }
1631 }
1632 }
1633 }
1634}
1635
1636fn remove_null_qos_values(
1638 value: Result<Value, serde_json::Error>,
1639) -> Result<Value, serde_json::Error> {
1640 match value {
1641 Ok(value) => match value {
1642 Value::Object(mut obj) => {
1643 let qos = obj.get_mut("qos");
1644 if let Some(qos) = qos {
1645 if qos.is_object() {
1646 qos.as_object_mut().unwrap().retain(|_, v| !v.is_null());
1647 }
1648 }
1649 Ok(Value::Object(obj))
1650 }
1651 _ => Ok(value),
1652 },
1653 Err(error) => Err(error),
1654 }
1655}
1656
1657fn adapt_writer_qos_for_reader(qos: &Qos) -> Qos {
1659 let mut reader_qos = qos.clone();
1660
1661 reader_qos.durability_service = None;
1663 reader_qos.ownership_strength = None;
1664 reader_qos.transport_priority = None;
1665 reader_qos.lifespan = None;
1666 reader_qos.writer_data_lifecycle = None;
1667 reader_qos.writer_batching = None;
1668
1669 reader_qos.properties = None;
1671 reader_qos.entity_name = None;
1672 reader_qos.ignore_local = None;
1673
1674 if reader_qos.reliability.is_none() {
1676 reader_qos.reliability = Some({
1677 Reliability {
1678 kind: ReliabilityKind::BEST_EFFORT,
1679 max_blocking_time: DDS_100MS_DURATION,
1680 }
1681 });
1682 }
1683
1684 reader_qos
1685}
1686
1687fn adapt_writer_qos_for_proxy_writer(qos: &Qos) -> Qos {
1689 let mut writer_qos = qos.clone();
1690
1691 writer_qos.properties = None;
1693 writer_qos.entity_name = None;
1694
1695 writer_qos.ignore_local = Some(IgnoreLocal {
1697 kind: IgnoreLocalKind::PARTICIPANT,
1698 });
1699
1700 writer_qos
1701}
1702
1703fn adapt_reader_qos_for_writer(qos: &Qos) -> Qos {
1705 let mut writer_qos = qos.clone();
1706
1707 writer_qos.time_based_filter = None;
1709 writer_qos.reader_data_lifecycle = None;
1710 writer_qos.properties = None;
1711 writer_qos.entity_name = None;
1712
1713 writer_qos.ignore_local = Some(IgnoreLocal {
1715 kind: IgnoreLocalKind::PARTICIPANT,
1716 });
1717
1718 if is_transient_local(qos) {
1721 let history = qos
1722 .history
1723 .as_ref()
1724 .map_or(History::default(), |history| history.clone());
1725
1726 writer_qos.durability_service = Some(DurabilityService {
1727 service_cleanup_delay: 60 * DDS_1S_DURATION,
1728 history_kind: history.kind,
1729 history_depth: history.depth,
1730 max_samples: DDS_LENGTH_UNLIMITED,
1731 max_instances: DDS_LENGTH_UNLIMITED,
1732 max_samples_per_instance: DDS_LENGTH_UNLIMITED,
1733 });
1734 }
1735 writer_qos.reliability = match writer_qos.reliability {
1737 Some(mut reliability) => {
1738 reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1739 Some(reliability)
1740 }
1741 _ => {
1742 let mut reliability = Reliability {
1743 kind: ReliabilityKind::RELIABLE,
1744 max_blocking_time: DDS_100MS_DURATION,
1745 };
1746 reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1747 Some(reliability)
1748 }
1749 };
1750
1751 writer_qos
1752}
1753
1754fn adapt_reader_qos_for_proxy_reader(qos: &Qos) -> Qos {
1756 let mut reader_qos = qos.clone();
1757
1758 reader_qos.properties = None;
1760 reader_qos.entity_name = None;
1761 reader_qos.ignore_local = None;
1762
1763 reader_qos
1764}
1765
/// Decomposes a `Vec<T>` into its raw parts `(pointer, length, capacity)`.
///
/// The vector is not dropped: ownership of its buffer passes to the caller, who is
/// responsible for releasing it (e.g. by rebuilding a `Vec` with `Vec::from_raw_parts`).
#[inline]
pub(crate) fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
    // Wrapping in `ManuallyDrop` keeps the buffer alive once this function returns.
    let mut leaked = ManuallyDrop::new(v);
    let length = leaked.len();
    let capacity = leaked.capacity();
    let data = leaked.as_mut_ptr();
    (data, length, capacity)
}
1772
1773struct ChannelEvent {
1774 tx: Sender<()>,
1775}
1776
1777#[async_trait]
1778impl Timed for ChannelEvent {
1779 async fn run(&mut self) {
1780 if self.tx.send(()).is_err() {
1781 warn!("Error sending periodic timer notification on channel");
1782 };
1783 }
1784}