1#![allow(deprecated)]
15use std::{
16 collections::HashMap,
17 env,
18 future::Future,
19 mem::ManuallyDrop,
20 sync::{
21 atomic::{AtomicBool, AtomicUsize, Ordering},
22 Arc,
23 },
24 time::Duration,
25};
26
27use async_trait::async_trait;
28use cyclors::{
29 qos::{
30 DurabilityService, History, IgnoreLocal, IgnoreLocalKind, Qos, Reliability,
31 ReliabilityKind, DDS_100MS_DURATION, DDS_1S_DURATION,
32 },
33 *,
34};
35use flume::{unbounded, Receiver, Sender};
36use futures::select;
37use route_dds_zenoh::RouteDDSZenoh;
38use serde::{ser::SerializeStruct, Serialize, Serializer};
39use serde_json::Value;
40use tokio::task::JoinHandle;
41use tracing::{debug, error, info, trace, warn};
42use zenoh::{
43 bytes::{Encoding, ZBytes},
44 handlers::FifoChannelHandler,
45 internal::{
46 plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
47 runtime::Runtime,
48 zerror, Timed, TimedEvent, Timer,
49 },
50 key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
51 liveliness::LivelinessToken,
52 qos::CongestionControl,
53 query::{ConsolidationMode, Query, QueryTarget, Queryable, Selector},
54 sample::{Locality, Sample, SampleKind},
55 Result as ZResult, Session, Wait,
56};
57use zenoh_ext::{SessionExt, SubscriberBuilderExt};
58use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
59
60pub mod config;
61mod dds_mgt;
62mod qos_helpers;
63mod ros_discovery;
64mod route_dds_zenoh;
65mod route_zenoh_dds;
66use config::Config;
67use dds_mgt::*;
68
69use crate::{
70 qos_helpers::*,
71 ros_discovery::{
72 NodeEntitiesInfo, ParticipantEntitiesInfo, RosDiscoveryInfoMgr,
73 ROS_DISCOVERY_INFO_TOPIC_NAME,
74 },
75 route_zenoh_dds::RouteZenohDDS,
76};
77
78macro_rules! zenoh_id {
79 ($val:expr) => {
80 $val.key_expr().as_str().split('/').last().unwrap()
81 };
82}
83
84lazy_static::lazy_static! {
85 static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
86 static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
87 static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
89 .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
90 .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
91 .enable_all()
92 .build()
93 .expect("Unable to create runtime");
94}
95#[inline(always)]
96pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
97where
98 F: Future + Send + 'static,
99 F::Output: Send + 'static,
100{
101 match tokio::runtime::Handle::try_current() {
103 Ok(rt) => {
104 rt.spawn(task)
106 }
107 Err(_) => {
108 TOKIO_RUNTIME.spawn(task)
110 }
111 }
112}
113
114lazy_static::lazy_static!(
115 static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();
116
117 static ref KE_PREFIX_ADMIN_SPACE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") };
118 static ref KE_PREFIX_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("dds") };
119 static ref KE_PREFIX_ROUTE_TO_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/to_dds") };
120 static ref KE_PREFIX_ROUTE_FROM_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/from_dds") };
121 static ref KE_PREFIX_PUB_CACHE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_pub_cache") };
122 static ref KE_PREFIX_FWD_DISCO: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_fwd_disco") };
123 static ref KE_PREFIX_LIVELINESS_GROUP: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("zenoh-plugin-dds") };
124
125 static ref KE_ANY_1_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("*") };
126 static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
127
128 static ref LOG_ROS2_DEPRECATION_WARNING_FLAG: AtomicBool = AtomicBool::new(false);
129);
130
131const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General><Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces></General></Domain></CycloneDDS>,"#;
135
136#[cfg(feature = "dds_shm")]
138const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><General><Interfaces><PubSubMessageExchange name="iox" library="psmx_iox" priority="1000000"/></Interfaces></General></Domain></CycloneDDS>,"#;
139
140const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 500;
141
142#[cfg(feature = "dynamic_plugin")]
143zenoh_plugin_trait::declare_plugin!(DDSPlugin);
144
145fn log_ros2_deprecation_warning() {
146 if !LOG_ROS2_DEPRECATION_WARNING_FLAG.swap(true, std::sync::atomic::Ordering::Relaxed) {
147 tracing::warn!("------------------------------------------------------------------------------------------");
148 tracing::warn!(
149 "ROS 2 system detected. Did you know a new Zenoh bridge dedicated to ROS 2 exists ?"
150 );
151 tracing::warn!("Check it out on https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds");
152 tracing::warn!("This DDS bridge will eventually be deprecated for ROS 2 usage in favor of this new bridge.");
153 tracing::warn!("------------------------------------------------------------------------------------------");
154 }
155}
156
157#[allow(clippy::upper_case_acronyms)]
158pub struct DDSPlugin;
159
160impl PluginControl for DDSPlugin {}
161impl ZenohPlugin for DDSPlugin {}
162impl Plugin for DDSPlugin {
163 type StartArgs = Runtime;
164 type Instance = RunningPlugin;
165
166 const DEFAULT_NAME: &'static str = "dds";
167 const PLUGIN_VERSION: &'static str = plugin_version!();
168 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
169
170 fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
171 zenoh::try_init_log_from_env();
175
176 let runtime_conf = runtime.config().lock();
177 let plugin_conf = runtime_conf
178 .plugin(name)
179 .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
180 let config: Config = serde_json::from_value(plugin_conf.clone())
181 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
182 WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
183 MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
184
185 spawn_runtime(run(runtime.clone(), config));
186
187 Ok(Box::new(DDSPlugin))
188 }
189}
190impl RunningPluginTrait for DDSPlugin {}
191
192pub async fn run(runtime: Runtime, config: Config) {
193 zenoh::try_init_log_from_env();
197 debug!("DDS plugin {}", DDSPlugin::PLUGIN_LONG_VERSION);
198 debug!("DDS plugin {:?}", config);
199
200 let zsession = match zenoh::session::init(runtime)
202 .aggregated_subscribers(config.generalise_subs.clone())
203 .aggregated_publishers(config.generalise_pubs.clone())
204 .await
205 {
206 Ok(session) => Arc::new(session),
207 Err(e) => {
208 tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
209 return;
210 }
211 };
212
213 let member = match zsession
214 .liveliness()
215 .declare_token(*KE_PREFIX_LIVELINESS_GROUP / &zsession.zid().into_keyexpr())
216 .await
217 {
218 Ok(member) => member,
219 Err(e) => {
220 tracing::error!(
221 "Unable to declare liveliness token for DDS plugin : {:?}",
222 e
223 );
224 return;
225 }
226 };
227
228 if config.localhost_only {
230 env::set_var(
231 "CYCLONEDDS_URI",
232 format!(
233 "{}{}",
234 CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
235 env::var("CYCLONEDDS_URI").unwrap_or_default()
236 ),
237 );
238 }
239
240 #[cfg(feature = "dds_shm")]
242 {
243 if config.shm_enabled {
244 env::set_var(
245 "CYCLONEDDS_URI",
246 format!(
247 "{}{}",
248 CYCLONEDDS_CONFIG_ENABLE_SHM,
249 env::var("CYCLONEDDS_URI").unwrap_or_default()
250 ),
251 );
252 if config.forward_discovery {
253 warn!("DDS shared memory support enabled but will not be used as forward discovery mode is active.");
254 }
255 }
256 }
257
258 debug!(
260 "Create DDS Participant with CYCLONEDDS_URI='{}'",
261 env::var("CYCLONEDDS_URI").unwrap_or_default()
262 );
263 let dp = unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
264 debug!(
265 "DDS plugin {} using DDS Participant {}",
266 zsession.zid(),
267 get_guid(&dp).unwrap()
268 );
269
270 let mut dds_plugin = DdsPluginRuntime {
271 config,
272 zsession: &zsession,
273 _member: member,
274 dp,
275 discovered_participants: HashMap::<String, DdsParticipant>::new(),
276 discovered_writers: HashMap::<String, DdsEntity>::new(),
277 discovered_readers: HashMap::<String, DdsEntity>::new(),
278 routes_from_dds: HashMap::<OwnedKeyExpr, RouteDDSZenoh>::new(),
279 routes_to_dds: HashMap::<OwnedKeyExpr, RouteZenohDDS>::new(),
280 admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
281 };
282
283 dds_plugin.run().await;
284}
285
286#[derive(Debug)]
288enum AdminRef {
289 DdsParticipant(String),
290 DdsWriterEntity(String),
291 DdsReaderEntity(String),
292 FromDdsRoute(OwnedKeyExpr),
293 ToDdsRoute(OwnedKeyExpr),
294 Config,
295 Version,
296}
297
298pub(crate) struct DdsPluginRuntime<'a> {
299 config: Config,
300 zsession: &'a Arc<Session>,
303 _member: LivelinessToken,
304 dp: dds_entity_t,
305 discovered_participants: HashMap<String, DdsParticipant>,
307 discovered_writers: HashMap<String, DdsEntity>,
308 discovered_readers: HashMap<String, DdsEntity>,
309 routes_from_dds: HashMap<OwnedKeyExpr, RouteDDSZenoh<'a>>,
311 routes_to_dds: HashMap<OwnedKeyExpr, RouteZenohDDS<'a>>,
312 admin_space: HashMap<OwnedKeyExpr, AdminRef>,
315}
316
317impl Serialize for DdsPluginRuntime<'_> {
318 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
319 where
320 S: Serializer,
321 {
322 let mut s = serializer.serialize_struct("dds", 3)?;
324 s.serialize_field("domain", &self.config.domain)?;
325 s.serialize_field("scope", &self.config.scope)?;
326 s.serialize_field(
327 "allow",
328 &self
329 .config
330 .allow
331 .as_ref()
332 .map_or_else(|| ".*".to_string(), |re| re.to_string()),
333 )?;
334 s.serialize_field(
335 "deny",
336 &self
337 .config
338 .deny
339 .as_ref()
340 .map_or_else(|| "".to_string(), |re| re.to_string()),
341 )?;
342 s.serialize_field(
343 "max-frequencies",
344 &self
345 .config
346 .max_frequencies
347 .iter()
348 .map(|(re, freq)| format!("{re}={freq}"))
349 .collect::<Vec<String>>(),
350 )?;
351 s.serialize_field("forward_discovery", &self.config.forward_discovery)?;
352 s.serialize_field(
353 "reliable_routes_blocking",
354 &self.config.reliable_routes_blocking,
355 )?;
356 s.end()
357 }
358}
359
360lazy_static::lazy_static! {
361 static ref JSON_NULL_VALUE: Value = serde_json::json!(null);
362}
363
364impl<'a> DdsPluginRuntime<'a> {
365 fn is_allowed(&self, ke: &keyexpr) -> bool {
366 if ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
367 log_ros2_deprecation_warning();
368 }
369
370 if self.config.forward_discovery && ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
371 return false;
373 }
374 match (&self.config.allow, &self.config.deny) {
375 (Some(allow), None) => allow.is_match(ke),
376 (None, Some(deny)) => !deny.is_match(ke),
377 (Some(allow), Some(deny)) => allow.is_match(ke) && !deny.is_match(ke),
378 (None, None) => true,
379 }
380 }
381
382 fn get_read_period(&self, ke: &keyexpr) -> Option<Duration> {
384 for (re, freq) in &self.config.max_frequencies {
385 if re.is_match(ke) {
386 return Some(Duration::from_secs_f32(1f32 / freq));
387 }
388 }
389 None
390 }
391
392 fn get_participant_admin_keyexpr(e: &DdsParticipant) -> OwnedKeyExpr {
393 format!("participant/{}", e.key,).try_into().unwrap()
394 }
395
396 fn get_entity_admin_keyexpr(e: &DdsEntity, is_writer: bool) -> OwnedKeyExpr {
397 format!(
398 "participant/{}/{}/{}/{}",
399 e.participant_key,
400 if is_writer { "writer" } else { "reader" },
401 e.key,
402 e.topic_name
403 )
404 .try_into()
405 .unwrap()
406 }
407
408 fn insert_dds_participant(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsParticipant) {
409 self.admin_space
411 .insert(admin_keyexpr, AdminRef::DdsParticipant(e.key.clone()));
412
413 self.discovered_participants.insert(e.key.clone(), e);
415 }
416
417 fn remove_dds_participant(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsParticipant)> {
418 if let Some(e) = self.discovered_participants.remove(dds_key) {
420 let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&e);
422 self.admin_space.remove(&admin_keyexpr);
423 Some((admin_keyexpr, e))
424 } else {
425 None
426 }
427 }
428
429 fn insert_dds_writer(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
430 self.admin_space
432 .insert(admin_keyexpr, AdminRef::DdsWriterEntity(e.key.clone()));
433
434 self.discovered_writers.insert(e.key.clone(), e);
436 }
437
438 fn remove_dds_writer(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
439 if let Some(e) = self.discovered_writers.remove(dds_key) {
441 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, true);
443 self.admin_space.remove(&admin_keyexpr);
444 Some((admin_keyexpr, e))
445 } else {
446 None
447 }
448 }
449
450 fn insert_dds_reader(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
451 self.admin_space
453 .insert(admin_keyexpr, AdminRef::DdsReaderEntity(e.key.clone()));
454
455 self.discovered_readers.insert(e.key.clone(), e);
457 }
458
459 fn remove_dds_reader(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
460 if let Some(e) = self.discovered_readers.remove(dds_key) {
462 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, false);
464 self.admin_space.remove(&admin_keyexpr);
465 Some((admin_keyexpr, e))
466 } else {
467 None
468 }
469 }
470
471 fn insert_route_from_dds(&mut self, ke: OwnedKeyExpr, r: RouteDDSZenoh<'a>) {
472 let admin_ke = *KE_PREFIX_ROUTE_FROM_DDS / &ke;
474 self.admin_space
475 .insert(admin_ke, AdminRef::FromDdsRoute(ke.clone()));
476
477 self.routes_from_dds.insert(ke, r);
479 }
480
481 fn insert_route_to_dds(&mut self, ke: OwnedKeyExpr, r: RouteZenohDDS<'a>) {
482 let admin_ke: OwnedKeyExpr = *KE_PREFIX_ROUTE_TO_DDS / &ke;
484 self.admin_space
485 .insert(admin_ke, AdminRef::ToDdsRoute(ke.clone()));
486
487 self.routes_to_dds.insert(ke, r);
489 }
490
491 #[allow(clippy::too_many_arguments)]
492 async fn try_add_route_from_dds(
493 &mut self,
494 ke: OwnedKeyExpr,
495 topic_name: &str,
496 topic_type: &str,
497 type_info: &Option<TypeInfo>,
498 keyless: bool,
499 reader_qos: Qos,
500 congestion_ctrl: CongestionControl,
501 ) -> RouteStatus {
502 if !self.is_allowed(&ke) {
503 info!(
504 "Ignoring Publication for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
505 ke
506 );
507 return RouteStatus::NotAllowed;
508 }
509
510 if self.routes_from_dds.contains_key(&ke) {
511 debug!(
513 "Route from DDS to resource {} already exists -- ignoring",
514 ke
515 );
516 return RouteStatus::Routed(ke);
517 }
518
519 match RouteDDSZenoh::new(
521 self,
522 topic_name.into(),
523 topic_type.into(),
524 type_info,
525 keyless,
526 reader_qos,
527 ke.clone(),
528 congestion_ctrl,
529 )
530 .await
531 {
532 Ok(route) => {
533 info!("{}: created with topic_type={}", route, topic_type);
534 self.insert_route_from_dds(ke.clone(), route);
535 RouteStatus::Routed(ke)
536 }
537 Err(e) => {
538 error!(
539 "Route DDS->Zenoh ({} -> {}): creation failed: {}",
540 topic_name, ke, e
541 );
542 RouteStatus::CreationFailure(e)
543 }
544 }
545 }
546
547 async fn try_add_route_to_dds(
548 &mut self,
549 ke: OwnedKeyExpr,
550 topic_name: &str,
551 topic_type: &str,
552 keyless: bool,
553 is_transient: bool,
554 writer_qos: Option<Qos>,
555 ) -> RouteStatus {
556 if !self.is_allowed(&ke) {
557 info!(
558 "Ignoring Subscription for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
559 ke
560 );
561 return RouteStatus::NotAllowed;
562 }
563
564 if let Some(route) = self.routes_to_dds.get(&ke) {
565 debug!(
567 "Route from resource {} to DDS already exists -- ignoring",
568 ke
569 );
570 if let Some(qos) = writer_qos {
574 if let Err(e) = route.set_dds_writer(self.dp, qos) {
575 error!(
576 "{}: failed to set a DDS Writer after creation: {}",
577 route, e
578 );
579 return RouteStatus::CreationFailure(e);
580 }
581 }
582 return RouteStatus::Routed(ke);
583 }
584
585 match RouteZenohDDS::new(
587 self,
588 ke.clone(),
589 is_transient,
590 topic_name.into(),
591 topic_type.into(),
592 keyless,
593 )
594 .await
595 {
596 Ok(route) => {
597 if let Some(qos) = writer_qos {
599 if let Err(e) = route.set_dds_writer(self.dp, qos) {
600 error!(
601 "Route Zenoh->DDS ({} -> {}): creation failed: {}",
602 ke, topic_name, e
603 );
604 return RouteStatus::CreationFailure(e);
605 }
606 }
607
608 info!("{}: created with topic_type={}", route, topic_type);
609 self.insert_route_to_dds(ke.clone(), route);
610 RouteStatus::Routed(ke)
611 }
612 Err(e) => {
613 error!(
614 "Route Zenoh->DDS ({} -> {}): creation failed: {}",
615 ke, topic_name, e
616 );
617 RouteStatus::CreationFailure(e)
618 }
619 }
620 }
621
622 fn get_admin_value(&self, admin_ref: &AdminRef) -> Result<Option<Value>, serde_json::Error> {
623 match admin_ref {
624 AdminRef::DdsParticipant(key) => self
625 .discovered_participants
626 .get(key)
627 .map(serde_json::to_value)
628 .map(remove_null_qos_values)
629 .transpose(),
630 AdminRef::DdsReaderEntity(key) => self
631 .discovered_readers
632 .get(key)
633 .map(serde_json::to_value)
634 .map(remove_null_qos_values)
635 .transpose(),
636 AdminRef::DdsWriterEntity(key) => self
637 .discovered_writers
638 .get(key)
639 .map(serde_json::to_value)
640 .map(remove_null_qos_values)
641 .transpose(),
642 AdminRef::FromDdsRoute(zkey) => self
643 .routes_from_dds
644 .get(zkey)
645 .map(serde_json::to_value)
646 .transpose(),
647 AdminRef::ToDdsRoute(zkey) => self
648 .routes_to_dds
649 .get(zkey)
650 .map(serde_json::to_value)
651 .transpose(),
652 AdminRef::Config => Some(serde_json::to_value(self)).transpose(),
653 AdminRef::Version => Ok(Some(DDSPlugin::PLUGIN_LONG_VERSION.into())),
654 }
655 }
656
657 async fn treat_admin_query(&self, query: Query, admin_keyexpr_prefix: &keyexpr) {
658 let selector = query.selector();
659 debug!("Query on admin space: {:?}", selector);
660
661 let sub_kes = selector.key_expr().strip_prefix(admin_keyexpr_prefix);
664 if sub_kes.is_empty() {
665 error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
666 return;
667 }
668
669 let mut kvs: Vec<(KeyExpr, Value)> = Vec::with_capacity(sub_kes.len());
671 for sub_ke in sub_kes {
672 if sub_ke.contains('*') {
673 for (ke, admin_ref) in self.admin_space.iter() {
675 if sub_ke.intersects(ke) {
676 match self.get_admin_value(admin_ref) {
677 Ok(Some(v)) => kvs.push((ke.into(), v)),
678 Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
679 Err(e) => {
680 error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
681 }
682 }
683 }
684 }
685 } else {
686 if let Some(admin_ref) = self.admin_space.get(sub_ke) {
688 match self.get_admin_value(admin_ref) {
689 Ok(Some(v)) => kvs.push((sub_ke.into(), v)),
690 Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
691 Err(e) => {
692 error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
693 }
694 }
695 }
696 }
697 }
698
699 for (ke, v) in kvs.drain(..) {
701 let admin_keyexpr = admin_keyexpr_prefix / &ke;
702 match serde_json::to_vec(&v) {
703 Ok(vec_u8) => {
704 let payload = ZBytes::from(vec_u8);
705 if let Err(e) = query
706 .reply(admin_keyexpr, payload)
707 .encoding(Encoding::APPLICATION_JSON)
708 .await
709 {
710 warn!("Error replying to admin query {:?}: {}", query, e);
711 }
712 }
713 Err(e) => warn!("Error transforming JSON to admin query {:?}: {}", query, e),
714 }
715 }
716 }
717
718 async fn run(&mut self) {
719 let group_subscriber = self
720 .zsession
721 .liveliness()
722 .declare_subscriber(*KE_PREFIX_LIVELINESS_GROUP / *KE_ANY_N_SEGMENT)
723 .querying()
724 .with(flume::unbounded())
725 .await
726 .expect("Failed to create Liveliness Subscriber");
727
728 let (tx, dds_disco_rcv): (Sender<DiscoveryEvent>, Receiver<DiscoveryEvent>) = unbounded();
730 run_discovery(self.dp, tx);
731
732 let admin_keyexpr_prefix =
734 *KE_PREFIX_ADMIN_SPACE / &self.zsession.zid().into_keyexpr() / *KE_PREFIX_DDS;
735 let admin_keyexpr_expr = (&admin_keyexpr_prefix) / *KE_ANY_N_SEGMENT;
736 debug!("Declare admin space on {}", admin_keyexpr_expr);
737 let admin_queryable = self
738 .zsession
739 .declare_queryable(admin_keyexpr_expr)
740 .await
741 .expect("Failed to create AdminSpace queryable");
742
743 self.admin_space
745 .insert("config".try_into().unwrap(), AdminRef::Config);
746 self.admin_space
747 .insert("version".try_into().unwrap(), AdminRef::Version);
748
749 if self.config.forward_discovery {
750 self.run_fwd_discovery_mode(
751 &group_subscriber,
752 &dds_disco_rcv,
753 admin_keyexpr_prefix,
754 &admin_queryable,
755 )
756 .await;
757 } else {
758 self.run_local_discovery_mode(
759 &group_subscriber,
760 &dds_disco_rcv,
761 admin_keyexpr_prefix,
762 &admin_queryable,
763 )
764 .await;
765 }
766 }
767
768 fn topic_to_keyexpr(
769 &self,
770 topic_name: &str,
771 scope: &Option<OwnedKeyExpr>,
772 partition: Option<&str>,
773 ) -> ZResult<OwnedKeyExpr> {
774 match (scope, partition) {
776 (Some(scope), Some(part)) => scope.join(&format!("{part}/{topic_name}")),
777 (Some(scope), None) => scope.join(topic_name),
778 (None, Some(part)) => format!("{part}/{topic_name}").try_into(),
779 (None, None) => topic_name.try_into(),
780 }
781 }
782
783 async fn run_local_discovery_mode(
784 &mut self,
785 group_subscriber: &Receiver<Sample>,
786 dds_disco_rcv: &Receiver<DiscoveryEvent>,
787 admin_keyexpr_prefix: OwnedKeyExpr,
788 admin_queryable: &Queryable<FifoChannelHandler<Query>>,
789 ) {
790 debug!(r#"Run in "local discovery" mode"#);
791
792 loop {
793 select!(
794 evt = dds_disco_rcv.recv_async() => {
795 match evt.unwrap() {
796 DiscoveryEvent::DiscoveredPublication {
797 mut entity
798 } => {
799 debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
800 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
802
803 let qos = adapt_writer_qos_for_reader(&entity.qos);
804 let congestion_ctrl = match (self.config.reliable_routes_blocking, is_writer_reliable(&entity.qos.reliability)) {
806 (true, true) => CongestionControl::Block,
807 _ => CongestionControl::Drop,
808 };
809
810 if partition_is_empty(&entity.qos.partition) {
812 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
813 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
814 if let RouteStatus::Routed(ref route_key) = route_status {
815 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
816 r.add_local_routed_writer(entity.key.clone());
818 }
819 }
820 entity.routes.insert("*".to_string(), route_status);
821 } else {
822 for p in entity.qos.partition.as_deref().unwrap() {
823 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
824 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos.clone(), congestion_ctrl).await;
825 if let RouteStatus::Routed(ref route_key) = route_status {
826 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
827 r.add_local_routed_writer(entity.key.clone());
829 }
830 }
831 entity.routes.insert(p.clone(), route_status);
832 }
833 }
834
835 self.insert_dds_writer(admin_keyexpr, entity);
837 }
838
839 DiscoveryEvent::UndiscoveredPublication {
840 key,
841 } => {
842 if let Some((_, e)) = self.remove_dds_writer(&key) {
843 debug!("Undiscovered DDS Writer {} on topic {}", key, e.topic_name);
844 let admin_space = &mut self.admin_space;
846 self.routes_from_dds.retain(|zkey, route| {
847 route.remove_local_routed_writer(&key);
848 if !route.has_local_routed_writer() {
849 info!(
850 "{}: remove it as no longer unused (no local DDS Writer left)",
851 route
852 );
853 let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
854 admin_space.remove(&ke);
855 false
856 } else {
857 true
858 }
859 }
860 );
861 }
862 }
863
864 DiscoveryEvent::DiscoveredSubscription {
865 mut entity
866 } => {
867 debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
868 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
869
870 let qos = adapt_reader_qos_for_writer(&entity.qos);
871
872 if partition_is_empty(&entity.qos.partition) {
874 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
875 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos)).await;
876 if let RouteStatus::Routed(ref route_key) = route_status {
877 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
878 r.add_local_routed_reader(entity.key.clone());
880 }
881 }
882 entity.routes.insert("*".to_string(), route_status);
883 } else {
884 for p in entity.qos.partition.as_deref().unwrap() {
885 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
886 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos.clone())).await;
887 if let RouteStatus::Routed(ref route_key) = route_status {
888 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
889 r.add_local_routed_reader(entity.key.clone());
891 }
892 }
893 entity.routes.insert(p.clone(), route_status);
894 }
895 }
896
897 self.insert_dds_reader(admin_keyexpr, entity);
899 }
900
901 DiscoveryEvent::UndiscoveredSubscription {
902 key,
903 } => {
904 if let Some((_, e)) = self.remove_dds_reader(&key) {
905 debug!("Undiscovered DDS Reader {} on topic {}", key, e.topic_name);
906 let admin_space = &mut self.admin_space;
908 self.routes_to_dds.retain(|zkey, route| {
909 route.remove_local_routed_reader(&key);
910 if !route.has_local_routed_reader() {
911 info!(
912 "{}: remove it as no longer unused (no local DDS Reader left)",
913 route
914 );
915 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
916 admin_space.remove(&ke);
917 false
918 } else {
919 true
920 }
921 }
922 );
923 }
924 }
925
926 DiscoveryEvent::DiscoveredParticipant {
927 entity,
928 } => {
929 debug!("Discovered DDS Participant {}", entity.key);
930 let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
931
932 self.insert_dds_participant(admin_keyexpr, entity);
934 }
935
936 DiscoveryEvent::UndiscoveredParticipant {
937 key,
938 } => {
939 if let Some((_, _)) = self.remove_dds_participant(&key) {
940 debug!("Undiscovered DDS Participant {}", key);
941 }
942 }
943 }
944 },
945
946 group_event = group_subscriber.recv_async() => {
947 match group_event.as_ref().map(|s|s.kind()) {
948 Ok(SampleKind::Put) => {
949 let zid = zenoh_id!(group_event.as_ref().unwrap());
950 debug!("New zenoh_dds_plugin detected: {}", zid);
951 if let Ok(zenoh_id) = keyexpr::new(zid) {
952 for (zkey, route) in &mut self.routes_to_dds {
954 route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE / zenoh_id).into(), self.config.queries_timeout).await;
955 }
956 } else {
957 error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
958 }
959 }
960 Ok(_) => {} Err(e) => warn!("Error receiving GroupEvent: {}", e)
962 }
963 }
964
965 get_request = admin_queryable.recv_async() => {
966 if let Ok(query) = get_request {
967 self.treat_admin_query(query, &admin_keyexpr_prefix).await;
968 } else {
969 warn!("AdminSpace queryable was closed!");
970 }
971 }
972 )
973 }
974 }
975
976 async fn run_fwd_discovery_mode(
977 &mut self,
978 group_subscriber: &Receiver<Sample>,
979 dds_disco_rcv: &Receiver<DiscoveryEvent>,
980 admin_keyexpr_prefix: OwnedKeyExpr,
981 admin_queryable: &Queryable<FifoChannelHandler<Query>>,
982 ) {
983 debug!(r#"Run in "forward discovery" mode"#);
984
985 let uuid: OwnedKeyExpr = self.zsession.zid().into();
992 let fwd_key_prefix = if let Some(scope) = &self.config.scope {
993 *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO / scope
994 } else {
995 *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO
996 };
997 let fwd_writers_key_prefix =
998 &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("writer") };
999 let fwd_readers_key_prefix =
1000 &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("reader") };
1001 let fwd_ros_discovery_key =
1002 &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("ros_disco") };
1003 let fwd_declare_publication_cache_key = &fwd_key_prefix / *KE_ANY_N_SEGMENT;
1004 let fwd_discovery_subscription_key = if let Some(scope) = &self.config.scope {
1005 *KE_PREFIX_ADMIN_SPACE
1006 / *KE_ANY_1_SEGMENT
1007 / *KE_PREFIX_FWD_DISCO
1008 / scope
1009 / *KE_ANY_N_SEGMENT
1010 } else {
1011 *KE_PREFIX_ADMIN_SPACE / *KE_ANY_1_SEGMENT / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1012 };
1013
1014 let fwd_writers_key_prefix_key = self
1016 .zsession
1017 .declare_keyexpr(fwd_writers_key_prefix)
1018 .await
1019 .expect("Failed to declare key expression for Fwd Discovery of writers");
1020 let fwd_readers_key_prefix_key = self
1021 .zsession
1022 .declare_keyexpr(fwd_readers_key_prefix)
1023 .await
1024 .expect("Failed to declare key expression for Fwd Discovery of readers");
1025 let fwd_ros_discovery_key_declared = self
1026 .zsession
1027 .declare_keyexpr(&fwd_ros_discovery_key)
1028 .await
1029 .expect("Failed to declare key expression for Fwd Discovery of ros_discovery");
1030
1031 let _fwd_disco_pub_cache = self
1033 .zsession
1034 .declare_publication_cache(fwd_declare_publication_cache_key)
1035 .queryable_allowed_origin(Locality::Remote) .await
1037 .expect("Failed to declare PublicationCache for Fwd Discovery");
1038
1039 let fwd_disco_sub = self
1041 .zsession
1042 .declare_subscriber(fwd_discovery_subscription_key)
1043 .querying()
1044 .allowed_origin(Locality::Remote) .query_timeout(self.config.queries_timeout)
1046 .await
1047 .expect("Failed to declare QueryingSubscriber for Fwd Discovery");
1048
1049 let ros_disco_mgr =
1051 RosDiscoveryInfoMgr::create(self.dp).expect("Failed to create RosDiscoveryInfoMgr");
1052 let timer = Timer::default();
1053 let (tx, ros_disco_timer_rcv): (Sender<()>, Receiver<()>) = unbounded();
1054 let ros_disco_timer_event = TimedEvent::periodic(
1055 Duration::from_millis(ROS_DISCOVERY_INFO_POLL_INTERVAL_MS),
1056 ChannelEvent { tx },
1057 );
1058 timer.add_async(ros_disco_timer_event).await;
1059
1060 let mut participant_info = ParticipantEntitiesInfo::new(
1062 get_guid(&self.dp).expect("Failed to get my Participant's guid"),
1063 );
1064
1065 let scope = self.config.scope.clone();
1066 loop {
1067 select!(
1068 evt = dds_disco_rcv.recv_async() => {
1069 match evt.unwrap() {
1070 DiscoveryEvent::DiscoveredPublication {
1071 entity
1072 } => {
1073 debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1074 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
1076 let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1077 let msg = (&entity, &scope);
1078 let ser_msg = match bincode::serialize(&msg) {
1079 Ok(s) => s,
1080 Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1081 };
1082 if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1083 error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1084 }
1085
1086 self.insert_dds_writer(admin_keyexpr, entity);
1088 }
1089
1090 DiscoveryEvent::UndiscoveredPublication {
1091 key,
1092 } => {
1093 debug!("Undiscovered DDS Writer {} => advertise it", key);
1094 if let Some((admin_keyexpr, _)) = self.remove_dds_writer(&key) {
1095 let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1096 if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1098 error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1099 }
1100 }
1101 }
1102
1103 DiscoveryEvent::DiscoveredSubscription {
1104 mut entity
1105 } => {
1106 debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1107
1108 if partition_is_empty(&entity.qos.partition) {
1111 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
1112 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&entity.qos), None).await;
1113 if let RouteStatus::Routed(ref route_key) = route_status {
1114 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1115 r.add_local_routed_reader(entity.key.clone());
1117 }
1118 }
1119 entity.routes.insert("*".to_string(), route_status);
1120 } else {
1121 for p in entity.qos.partition.as_deref().unwrap() {
1122 let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
1123 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&entity.qos), None).await;
1124 if let RouteStatus::Routed(ref route_key) = route_status {
1125 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1126 r.add_local_routed_reader(entity.key.clone());
1128 }
1129 }
1130 entity.routes.insert(p.clone(), route_status);
1131 }
1132 }
1133
1134 let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
1136 let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1137 let msg = (&entity, &scope);
1138 let ser_msg = match bincode::serialize(&msg) {
1139 Ok(s) => s,
1140 Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1141 };
1142 if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1143 error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1144 }
1145
1146 self.insert_dds_reader(admin_keyexpr, entity);
1148 }
1149
1150 DiscoveryEvent::UndiscoveredSubscription {
1151 key,
1152 } => {
1153 debug!("Undiscovered DDS Reader {} => advertise it", key);
1154 if let Some((admin_keyexpr, _)) = self.remove_dds_reader(&key) {
1155 let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1156 if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1158 error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1159 }
1160 }
1161 let admin_space = &mut self.admin_space;
1164 self.routes_to_dds.retain(|zkey, route| {
1165 route.remove_local_routed_reader(&key);
1166 if !route.has_local_routed_reader() && !route.has_remote_routed_writer(){
1167 info!(
1168 "{}: remove it as no longer unused (no local DDS Reader nor remote DDS Writer left)",
1169 route
1170 );
1171 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1172 admin_space.remove(&ke);
1173 false
1174 } else {
1175 true
1176 }
1177 }
1178 );
1179 }
1180
1181 DiscoveryEvent::DiscoveredParticipant {
1182 entity,
1183 } => {
1184 debug!("Discovered DDS Participant {}", entity.key);
1185 let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
1186
1187 self.insert_dds_participant(admin_keyexpr, entity);
1189 }
1190
1191 DiscoveryEvent::UndiscoveredParticipant {
1192 key,
1193 } => {
1194 if let Some((_, _)) = self.remove_dds_participant(&key) {
1195 debug!("Undiscovered DDS Participant {}", key);
1196 }
1197 }
1198 }
1199 },
1200
1201 sample = fwd_disco_sub.recv_async() => {
1202 let sample = sample.expect("Fwd Discovery subscriber was closed!");
1203 let fwd_ke = &sample.key_expr();
1204 debug!("Received forwarded discovery message on {}", fwd_ke);
1205
1206 if let Some((remote_uuid, disco_kind, remaining_ke)) = Self::parse_fwd_discovery_keyexpr(fwd_ke) {
1208 match disco_kind {
1209 "writer" => {
1211 let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1213 if sample.kind() != SampleKind::Delete {
1214 let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1216 Ok(x) => x,
1217 Err(e) => {
1218 warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1219 continue;
1220 }
1221 };
1222 let qos = adapt_writer_qos_for_proxy_writer(&entity.qos);
1223
1224 if partition_is_empty(&entity.qos.partition) {
1226 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1227 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos)).await;
1228 if let RouteStatus::Routed(ref route_key) = route_status {
1229 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1230 r.add_remote_routed_writer(full_admin_keyexpr);
1232 for reader in self.discovered_readers.values_mut() {
1234 if reader.topic_name == entity.topic_name && partition_is_empty(&reader.qos.partition) {
1235 r.add_local_routed_reader(reader.key.clone());
1236 reader.routes.insert("*".to_string(), route_status.clone());
1237 }
1238 }
1239 }
1240 }
1241 } else {
1242 for p in entity.qos.partition.as_deref().unwrap() {
1243 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1244 let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos.clone())).await;
1245 if let RouteStatus::Routed(ref route_key) = route_status {
1246 if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1247 r.add_remote_routed_writer(full_admin_keyexpr.clone());
1249 for reader in self.discovered_readers.values_mut() {
1251 if reader.topic_name == entity.topic_name && partition_contains(&reader.qos.partition, p) {
1252 r.add_local_routed_reader(reader.key.clone());
1253 reader.routes.insert(p.clone(), route_status.clone());
1254 }
1255 }
1256 }
1257 }
1258 }
1259 }
1260 } else {
1261 let admin_space = &mut self.admin_space;
1263 self.routes_to_dds.retain(|zkey, route| {
1264 route.remove_remote_routed_writer(&full_admin_keyexpr);
1265 if route.has_remote_routed_writer() {
1266 true
1268 } else {
1269 route.delete_dds_writer();
1272 if !route.has_local_routed_reader() {
1273 info!(
1274 "{}: remove it as no longer unused (no remote DDS Writer nor local DDS Reader left)",
1275 route
1276 );
1277 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1278 admin_space.remove(&ke);
1279 false
1280 } else {
1281 true
1282 }
1283 }
1284 }
1285 );
1286 }
1287 }
1288
1289 "reader" => {
1291 let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1293 if sample.kind() != SampleKind::Delete {
1294 let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1296 Ok(x) => x,
1297 Err(e) => {
1298 warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1299 continue;
1300 }
1301 };
1302 let qos = adapt_reader_qos_for_proxy_reader(&entity.qos);
1303
1304 let congestion_ctrl = match (self.config.reliable_routes_blocking, is_reader_reliable(&entity.qos.reliability)) {
1306 (true, true) => CongestionControl::Block,
1307 _ => CongestionControl::Drop,
1308 };
1309
1310 if partition_is_empty(&entity.qos.partition) {
1312 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1313 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
1314 if let RouteStatus::Routed(ref route_key) = route_status {
1315 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1316 r.add_remote_routed_reader(full_admin_keyexpr);
1318 for writer in self.discovered_writers.values_mut() {
1320 if writer.topic_name == entity.topic_name && partition_is_empty(&writer.qos.partition) {
1321 r.add_local_routed_writer(writer.key.clone());
1322 writer.routes.insert("*".to_string(), route_status.clone());
1323 }
1324 }
1325 }
1326 }
1327 } else {
1328 for p in &entity.qos.partition.unwrap() {
1329 let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1330 let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos.clone(), congestion_ctrl).await;
1331 if let RouteStatus::Routed(ref route_key) = route_status {
1332 if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1333 r.add_remote_routed_reader(full_admin_keyexpr.clone());
1335 for writer in self.discovered_writers.values_mut() {
1337 if writer.topic_name == entity.topic_name && partition_contains(&writer.qos.partition, p) {
1338 r.add_local_routed_writer(writer.key.clone());
1339 writer.routes.insert(p.clone(), route_status.clone());
1340 }
1341 }
1342 }
1343 }
1344 }
1345 }
1346 } else {
1347 let admin_space = &mut self.admin_space;
1349 self.routes_from_dds.retain(|zkey, route| {
1350 route.remove_remote_routed_reader(&full_admin_keyexpr);
1351 if !route.has_remote_routed_reader() {
1352 info!(
1353 "{}: remove it as no longer unused (no remote DDS Reader left)",
1354 route
1355 );
1356 let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1357 admin_space.remove(&ke);
1358 false
1359 } else {
1360 true
1361 }
1362 }
1363 );
1364 }
1365 }
1366
1367 "ros_disco" => {
1369 match cdr::deserialize_from::<_, ParticipantEntitiesInfo, _>(
1370 sample.payload().reader(),
1371 cdr::size::Infinite,
1372 ) {
1373 Ok(mut info) => {
1374 self.remap_entities_info(&mut info.node_entities_info_seq);
1376 participant_info.update_with(info.node_entities_info_seq);
1378 debug!("Publish updated ros_discovery_info: {:?}", participant_info);
1379 if let Err(e) = ros_disco_mgr.write(&participant_info) {
1380 error!("Error forwarding ros_discovery_info: {}", e);
1381 }
1382 }
1383 Err(e) => error!(
1384 "Error receiving ParticipantEntitiesInfo on {}: {}",
1385 fwd_ke, e
1386 ),
1387 }
1388 }
1389
1390 x => {
1391 error!("Unexpected forwarded discovery message received on invalid key {} (unknown kind: {}) ", fwd_ke, x);
1392 }
1393 }
1394 }
1395 },
1396
1397 group_event = group_subscriber.recv_async() => {
1398 match group_event.as_ref().map(|s|s.kind()) {
1399 Ok(SampleKind::Put) => {
1400 let zid = zenoh_id!(group_event.as_ref().unwrap());
1401 debug!("New zenoh_dds_plugin detected: {}", zid);
1402
1403 if let Ok(zenoh_id) = keyexpr::new(zid) {
1404 let key = if let Some(scope) = &self.config.scope {
1406 *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / scope / *KE_ANY_N_SEGMENT
1407 } else {
1408 *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1409 };
1410 debug!("Query past discovery messages from {} on {}", zid, key);
1411 if let Err(e) = fwd_disco_sub.fetch( |cb| {
1412 self.zsession.get(Selector::from(&key))
1413 .callback(cb)
1414 .target(QueryTarget::All)
1415 .consolidation(ConsolidationMode::None)
1416 .timeout(self.config.queries_timeout)
1417 .wait()
1418 }).await
1419 {
1420 warn!("Query on {} for discovery messages failed: {}", key, e);
1421 }
1422 for (zkey, route) in &mut self.routes_to_dds {
1424 route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE / zenoh_id).into(), self.config.queries_timeout).await;
1425 }
1426 } else {
1427 error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
1428 }
1429 }
1430 Ok(SampleKind::Delete) => {
1431 let zid = zenoh_id!(group_event.as_ref().unwrap());
1432 debug!("Remote zenoh_dds_plugin left: {}", zid);
1433 let admin_space = &mut self.admin_space;
1436 let admin_subke = format!("@/{zid}/dds/");
1437 let mut participant_info_changed = false;
1438 self.routes_to_dds.retain(|zkey, route| {
1439 route.remove_remote_routed_writers_containing(&admin_subke);
1440 if !route.has_remote_routed_writer() {
1441 info!(
1442 "{}: remove it as no longer unused (no remote DDS Writer left)",
1443 route
1444 );
1445 let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1446 admin_space.remove(&ke);
1447 if let Ok(guid) = route.dds_writer_guid() {
1448 participant_info.remove_writer_gid(&guid);
1449 participant_info_changed = true;
1450 } else {
1451 warn!("Failed to get guid for Writer serving the route zenoh '{}' => DDS '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1452 }
1453 false
1454 } else {
1455 true
1456 }
1457 });
1458 self.routes_from_dds.retain(|zkey, route| {
1459 route.remove_remote_routed_readers_containing(&admin_subke);
1460 if !route.has_remote_routed_reader() {
1461 info!(
1462 "{}: remove it as no longer unused (no remote DDS Reader left)",
1463 route
1464 );
1465 let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1466 admin_space.remove(&ke);
1467 if let Ok(guid) = route.dds_reader_guid() {
1468 participant_info.remove_reader_gid(&guid);
1469 participant_info_changed = true;
1470 } else {
1471 warn!("Failed to get guid for Reader serving the route DDS '{}' => zenoh '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1472 }
1473 false
1474 } else {
1475 true
1476 }
1477 });
1478 if participant_info_changed {
1479 debug!("Publishing up-to-date ros_discovery_info after leaving of plugin {}", zid);
1480 participant_info.cleanup();
1481 if let Err(e) = ros_disco_mgr.write(&participant_info) {
1482 error!("Error forwarding ros_discovery_info: {}", e);
1483 }
1484 }
1485 }
1486 Err(e) => warn!("Error receiving GroupEvent: {}", e)
1487 }
1488 }
1489
1490 get_request = admin_queryable.recv_async() => {
1491 if let Ok(query) = get_request {
1492 self.treat_admin_query(query, &admin_keyexpr_prefix).await;
1493 } else {
1494 warn!("AdminSpace queryable was closed!");
1495 }
1496 }
1497
1498 _ = ros_disco_timer_rcv.recv_async() => {
1499 let infos = ros_disco_mgr.read();
1500 for (gid, buf) in infos {
1501 trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, buf.hex_encode());
1502 let ke = &fwd_ros_discovery_key_declared / unsafe { keyexpr::from_str_unchecked(&gid) };
1504 if let Err(e) = self.zsession.put(ke, buf).wait() {
1505 error!("Forward ROS discovery info failed: {}", e);
1506 }
1507 }
1508 }
1509 )
1510 }
1511 }
1512
1513 fn parse_fwd_discovery_keyexpr(fwd_ke: &keyexpr) -> Option<(&keyexpr, &str, &keyexpr)> {
1514 if !fwd_ke.starts_with(KE_PREFIX_ADMIN_SPACE.as_str()) {
1516 return None;
1518 }
1519 let mut remaining = &fwd_ke[KE_PREFIX_ADMIN_SPACE.len() + 1..];
1520 let uuid = if let Some(i) = remaining.find('/') {
1521 let uuid = unsafe { keyexpr::from_str_unchecked(&remaining[..i]) };
1522 remaining = &remaining[i + 1..];
1523 uuid
1524 } else {
1525 error!(
1526 "Unexpected forwarded discovery message received on invalid key: {}",
1527 fwd_ke
1528 );
1529 return None;
1530 };
1531 if !remaining.starts_with(KE_PREFIX_FWD_DISCO.as_str()) {
1532 return None;
1534 }
1535 let kind = if let Some(i) = remaining.find("/reader/") {
1536 remaining = &remaining[i + 8..];
1537 "reader"
1538 } else if let Some(i) = remaining.find("/writer/") {
1539 remaining = &remaining[i + 8..];
1540 "writer"
1541 } else if let Some(i) = remaining.find("/ros_disco/") {
1542 remaining = &remaining[i + 11..];
1543 "ros_disco"
1544 } else {
1545 error!("Unexpected forwarded discovery message received on invalid key: {} (no expected kind '/reader/', '/writer/' or '/ros_disco/')", fwd_ke);
1546 return None;
1547 };
1548 Some((uuid, kind, unsafe {
1549 keyexpr::from_str_unchecked(remaining)
1550 }))
1551 }
1552
1553 fn remap_entities_info(&self, entities_info: &mut HashMap<String, NodeEntitiesInfo>) {
1554 for node in entities_info.values_mut() {
1555 let mut i = 0;
1557 while i < node.reader_gid_seq.len() {
1558 match self
1560 .routes_from_dds
1561 .values()
1562 .find(|route| route.is_routing_remote_reader(&node.reader_gid_seq[i]))
1563 {
1564 Some(route) => {
1565 if let Ok(gid) = route.dds_reader_guid() {
1567 trace!(
1568 "ros_discovery_info remap reader {} -> {}",
1569 node.reader_gid_seq[i],
1570 gid
1571 );
1572 node.reader_gid_seq[i] = gid;
1573 i += 1;
1574 } else {
1575 error!("Failed to get guid for Reader serving the a route. Can't remap in ros_discovery_info");
1576 }
1577 }
1578 None => {
1579 trace!(
1582 "ros_discovery_info remap reader {} -> NONE",
1583 node.reader_gid_seq[i]
1584 );
1585 node.reader_gid_seq.remove(i);
1586 }
1587 }
1588 }
1589 let mut i = 0;
1590 while i < node.writer_gid_seq.len() {
1591 match self
1593 .routes_to_dds
1594 .values()
1595 .find(|route| route.is_routing_remote_writer(&node.writer_gid_seq[i]))
1596 {
1597 Some(route) => {
1598 if let Ok(gid) = route.dds_writer_guid() {
1600 trace!(
1601 "ros_discovery_info remap writer {} -> {}",
1602 node.writer_gid_seq[i],
1603 gid
1604 );
1605 node.writer_gid_seq[i] = gid;
1606 i += 1;
1607 } else {
1608 error!("Failed to get guid for Writer serving the a route. Can't remap in ros_discovery_info");
1609 }
1610 }
1611 None => {
1612 trace!(
1615 "ros_discovery_info remap writer {} -> NONE",
1616 node.writer_gid_seq[i]
1617 );
1618 node.writer_gid_seq.remove(i);
1619 }
1620 }
1621 }
1622 }
1623 }
1624}
1625
1626fn remove_null_qos_values(
1628 value: Result<Value, serde_json::Error>,
1629) -> Result<Value, serde_json::Error> {
1630 match value {
1631 Ok(value) => match value {
1632 Value::Object(mut obj) => {
1633 let qos = obj.get_mut("qos");
1634 if let Some(qos) = qos {
1635 if qos.is_object() {
1636 qos.as_object_mut().unwrap().retain(|_, v| !v.is_null());
1637 }
1638 }
1639 Ok(Value::Object(obj))
1640 }
1641 _ => Ok(value),
1642 },
1643 Err(error) => Err(error),
1644 }
1645}
1646
1647fn adapt_writer_qos_for_reader(qos: &Qos) -> Qos {
1649 let mut reader_qos = qos.clone();
1650
1651 reader_qos.durability_service = None;
1653 reader_qos.ownership_strength = None;
1654 reader_qos.transport_priority = None;
1655 reader_qos.lifespan = None;
1656 reader_qos.writer_data_lifecycle = None;
1657 reader_qos.writer_batching = None;
1658
1659 reader_qos.properties = None;
1661 reader_qos.entity_name = None;
1662 reader_qos.ignore_local = None;
1663
1664 if reader_qos.reliability.is_none() {
1666 reader_qos.reliability = Some({
1667 Reliability {
1668 kind: ReliabilityKind::BEST_EFFORT,
1669 max_blocking_time: DDS_100MS_DURATION,
1670 }
1671 });
1672 }
1673
1674 reader_qos
1675}
1676
1677fn adapt_writer_qos_for_proxy_writer(qos: &Qos) -> Qos {
1679 let mut writer_qos = qos.clone();
1680
1681 writer_qos.properties = None;
1683 writer_qos.entity_name = None;
1684
1685 writer_qos.ignore_local = Some(IgnoreLocal {
1687 kind: IgnoreLocalKind::PARTICIPANT,
1688 });
1689
1690 writer_qos
1691}
1692
1693fn adapt_reader_qos_for_writer(qos: &Qos) -> Qos {
1695 let mut writer_qos = qos.clone();
1696
1697 writer_qos.time_based_filter = None;
1699 writer_qos.reader_data_lifecycle = None;
1700 writer_qos.properties = None;
1701 writer_qos.entity_name = None;
1702
1703 writer_qos.ignore_local = Some(IgnoreLocal {
1705 kind: IgnoreLocalKind::PARTICIPANT,
1706 });
1707
1708 if is_transient_local(qos) {
1711 let history = qos
1712 .history
1713 .as_ref()
1714 .map_or(History::default(), |history| history.clone());
1715
1716 writer_qos.durability_service = Some(DurabilityService {
1717 service_cleanup_delay: 60 * DDS_1S_DURATION,
1718 history_kind: history.kind,
1719 history_depth: history.depth,
1720 max_samples: DDS_LENGTH_UNLIMITED,
1721 max_instances: DDS_LENGTH_UNLIMITED,
1722 max_samples_per_instance: DDS_LENGTH_UNLIMITED,
1723 });
1724 }
1725 writer_qos.reliability = match writer_qos.reliability {
1727 Some(mut reliability) => {
1728 reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1729 Some(reliability)
1730 }
1731 _ => {
1732 let mut reliability = Reliability {
1733 kind: ReliabilityKind::RELIABLE,
1734 max_blocking_time: DDS_100MS_DURATION,
1735 };
1736 reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1737 Some(reliability)
1738 }
1739 };
1740
1741 writer_qos
1742}
1743
1744fn adapt_reader_qos_for_proxy_reader(qos: &Qos) -> Qos {
1746 let mut reader_qos = qos.clone();
1747
1748 reader_qos.properties = None;
1750 reader_qos.entity_name = None;
1751 reader_qos.ignore_local = None;
1752
1753 reader_qos
1754}
1755
1756#[inline]
1758pub(crate) fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
1759 let mut me = ManuallyDrop::new(v);
1760 (me.as_mut_ptr(), me.len(), me.capacity())
1761}
1762
1763struct ChannelEvent {
1764 tx: Sender<()>,
1765}
1766
1767#[async_trait]
1768impl Timed for ChannelEvent {
1769 async fn run(&mut self) {
1770 if self.tx.send(()).is_err() {
1771 warn!("Error sending periodic timer notification on channel");
1772 };
1773 }
1774}