zenoh_plugin_dds/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14#![allow(deprecated)]
15use std::{
16    collections::HashMap,
17    env,
18    future::Future,
19    mem::ManuallyDrop,
20    sync::{
21        atomic::{AtomicBool, AtomicUsize, Ordering},
22        Arc,
23    },
24    time::Duration,
25};
26
27use async_trait::async_trait;
28use cyclors::{
29    qos::{
30        DurabilityService, History, IgnoreLocal, IgnoreLocalKind, Qos, Reliability,
31        ReliabilityKind, DDS_100MS_DURATION, DDS_1S_DURATION,
32    },
33    *,
34};
35use flume::{unbounded, Receiver, Sender};
36use futures::select;
37use route_dds_zenoh::RouteDDSZenoh;
38use serde::{ser::SerializeStruct, Serialize, Serializer};
39use serde_json::Value;
40use tokio::task::JoinHandle;
41use tracing::{debug, error, info, trace, warn};
42use zenoh::{
43    bytes::{Encoding, ZBytes},
44    handlers::FifoChannelHandler,
45    internal::{
46        plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
47        runtime::Runtime,
48        zerror, Timed, TimedEvent, Timer,
49    },
50    key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
51    liveliness::LivelinessToken,
52    qos::CongestionControl,
53    query::{ConsolidationMode, Query, QueryTarget, Queryable, Selector},
54    sample::{Locality, Sample, SampleKind},
55    Result as ZResult, Session, Wait,
56};
57use zenoh_ext::{SessionExt, SubscriberBuilderExt};
58use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
59
60pub mod config;
61mod dds_mgt;
62mod qos_helpers;
63mod ros_discovery;
64mod route_dds_zenoh;
65mod route_zenoh_dds;
66use config::Config;
67use dds_mgt::*;
68
69use crate::{
70    qos_helpers::*,
71    ros_discovery::{
72        NodeEntitiesInfo, ParticipantEntitiesInfo, RosDiscoveryInfoMgr,
73        ROS_DISCOVERY_INFO_TOPIC_NAME,
74    },
75    route_zenoh_dds::RouteZenohDDS,
76};
77
78macro_rules! zenoh_id {
79    ($val:expr) => {
80        $val.key_expr().as_str().split('/').last().unwrap()
81    };
82}
83
84lazy_static::lazy_static! {
85    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
86    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
87    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
88    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
89               .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
90               .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
91               .enable_all()
92               .build()
93               .expect("Unable to create runtime");
94}
95#[inline(always)]
96pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
97where
98    F: Future + Send + 'static,
99    F::Output: Send + 'static,
100{
101    // Check whether able to get the current runtime
102    match tokio::runtime::Handle::try_current() {
103        Ok(rt) => {
104            // Able to get the current runtime (standalone binary), spawn on the current runtime
105            rt.spawn(task)
106        }
107        Err(_) => {
108            // Unable to get the current runtime (dynamic plugins), spawn on the global runtime
109            TOKIO_RUNTIME.spawn(task)
110        }
111    }
112}
113
114lazy_static::lazy_static!(
115    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();
116
117    static ref KE_PREFIX_ADMIN_SPACE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") };
118    static ref KE_PREFIX_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("dds") };
119    static ref KE_PREFIX_ROUTE_TO_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/to_dds") };
120    static ref KE_PREFIX_ROUTE_FROM_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/from_dds") };
121    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_pub_cache") };
122    static ref KE_PREFIX_FWD_DISCO: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_fwd_disco") };
123    static ref KE_PREFIX_LIVELINESS_GROUP: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("zenoh-plugin-dds") };
124
125    static ref KE_ANY_1_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("*") };
126    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
127
128    static ref LOG_ROS2_DEPRECATION_WARNING_FLAG: AtomicBool = AtomicBool::new(false);
129);
130
131// CycloneDDS' localhost-only: set network interface address (shortened form of config would be
132// possible, too, but I think it is clearer to spell it out completely).
133// Empty configuration fragments are ignored, so it is safe to unconditionally append a comma.
134const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General><Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces></General></Domain></CycloneDDS>,"#;
135
136// CycloneDDS' enable-shm: enable usage of Iceoryx PSMX plugin
137#[cfg(feature = "dds_shm")]
138const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><General><Interfaces><PubSubMessageExchange name="iox" library="psmx_iox" priority="1000000"/></Interfaces></General></Domain></CycloneDDS>,"#;
139
140const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 500;
141
142#[cfg(feature = "dynamic_plugin")]
143zenoh_plugin_trait::declare_plugin!(DDSPlugin);
144
145fn log_ros2_deprecation_warning() {
146    if !LOG_ROS2_DEPRECATION_WARNING_FLAG.swap(true, std::sync::atomic::Ordering::Relaxed) {
147        tracing::warn!("------------------------------------------------------------------------------------------");
148        tracing::warn!(
149            "ROS 2 system detected. Did you know a new Zenoh bridge dedicated to ROS 2 exists ?"
150        );
151        tracing::warn!("Check it out on https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds");
152        tracing::warn!("This DDS bridge will eventually be deprecated for ROS 2 usage in favor of this new bridge.");
153        tracing::warn!("------------------------------------------------------------------------------------------");
154    }
155}
156
157#[allow(clippy::upper_case_acronyms)]
158pub struct DDSPlugin;
159
160impl PluginControl for DDSPlugin {}
161impl ZenohPlugin for DDSPlugin {}
162impl Plugin for DDSPlugin {
163    type StartArgs = Runtime;
164    type Instance = RunningPlugin;
165
166    const DEFAULT_NAME: &'static str = "dds";
167    const PLUGIN_VERSION: &'static str = plugin_version!();
168    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
169
170    fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
171        // Try to initiate login.
172        // Required in case of dynamic lib, otherwise no logs.
173        // But cannot be done twice in case of static link.
174        zenoh::try_init_log_from_env();
175
176        let runtime_conf = runtime.config().lock();
177        let plugin_conf = runtime_conf
178            .plugin(name)
179            .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
180        let config: Config = serde_json::from_value(plugin_conf.clone())
181            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
182        WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
183        MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
184
185        spawn_runtime(run(runtime.clone(), config));
186
187        Ok(Box::new(DDSPlugin))
188    }
189}
190impl RunningPluginTrait for DDSPlugin {}
191
192pub async fn run(runtime: Runtime, config: Config) {
193    // Try to initiate login.
194    // Required in case of dynamic lib, otherwise no logs.
195    // But cannot be done twice in case of static link.
196    zenoh::try_init_log_from_env();
197    debug!("DDS plugin {}", DDSPlugin::PLUGIN_LONG_VERSION);
198    debug!("DDS plugin {:?}", config);
199
200    // open zenoh-net Session
201    let zsession = match zenoh::session::init(runtime)
202        .aggregated_subscribers(config.generalise_subs.clone())
203        .aggregated_publishers(config.generalise_pubs.clone())
204        .await
205    {
206        Ok(session) => Arc::new(session),
207        Err(e) => {
208            tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
209            return;
210        }
211    };
212
213    let member = match zsession
214        .liveliness()
215        .declare_token(*KE_PREFIX_LIVELINESS_GROUP / &zsession.zid().into_keyexpr())
216        .await
217    {
218        Ok(member) => member,
219        Err(e) => {
220            tracing::error!(
221                "Unable to declare liveliness token for DDS plugin : {:?}",
222                e
223            );
224            return;
225        }
226    };
227
228    // if "localhost_only" is set, configure CycloneDDS to use only localhost interface
229    if config.localhost_only {
230        env::set_var(
231            "CYCLONEDDS_URI",
232            format!(
233                "{}{}",
234                CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
235                env::var("CYCLONEDDS_URI").unwrap_or_default()
236            ),
237        );
238    }
239
240    // if "enable_shm" is set, configure CycloneDDS to use Iceoryx PSMX plugin
241    #[cfg(feature = "dds_shm")]
242    {
243        if config.shm_enabled {
244            env::set_var(
245                "CYCLONEDDS_URI",
246                format!(
247                    "{}{}",
248                    CYCLONEDDS_CONFIG_ENABLE_SHM,
249                    env::var("CYCLONEDDS_URI").unwrap_or_default()
250                ),
251            );
252            if config.forward_discovery {
253                warn!("DDS shared memory support enabled but will not be used as forward discovery mode is active.");
254            }
255        }
256    }
257
258    // create DDS Participant
259    debug!(
260        "Create DDS Participant with CYCLONEDDS_URI='{}'",
261        env::var("CYCLONEDDS_URI").unwrap_or_default()
262    );
263    let dp = unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
264    debug!(
265        "DDS plugin {} using DDS Participant {}",
266        zsession.zid(),
267        get_guid(&dp).unwrap()
268    );
269
270    let mut dds_plugin = DdsPluginRuntime {
271        config,
272        zsession: &zsession,
273        _member: member,
274        dp,
275        discovered_participants: HashMap::<String, DdsParticipant>::new(),
276        discovered_writers: HashMap::<String, DdsEntity>::new(),
277        discovered_readers: HashMap::<String, DdsEntity>::new(),
278        routes_from_dds: HashMap::<OwnedKeyExpr, RouteDDSZenoh>::new(),
279        routes_to_dds: HashMap::<OwnedKeyExpr, RouteZenohDDS>::new(),
280        admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
281    };
282
283    dds_plugin.run().await;
284}
285
286// An reference used in admin space to point to a struct (DdsEntity or Route) stored in another map
287#[derive(Debug)]
288enum AdminRef {
289    DdsParticipant(String),
290    DdsWriterEntity(String),
291    DdsReaderEntity(String),
292    FromDdsRoute(OwnedKeyExpr),
293    ToDdsRoute(OwnedKeyExpr),
294    Config,
295    Version,
296}
297
298pub(crate) struct DdsPluginRuntime<'a> {
299    config: Config,
300    // Note: &'a Arc<Session> here to keep the ownership of Session outside this struct
301    // and be able to store the publishers/subscribers it creates in this same struct.
302    zsession: &'a Arc<Session>,
303    _member: LivelinessToken,
304    dp: dds_entity_t,
305    // maps of all discovered DDS entities (indexed by DDS key)
306    discovered_participants: HashMap<String, DdsParticipant>,
307    discovered_writers: HashMap<String, DdsEntity>,
308    discovered_readers: HashMap<String, DdsEntity>,
309    // maps of established routes from/to DDS (indexed by zenoh key expression)
310    routes_from_dds: HashMap<OwnedKeyExpr, RouteDDSZenoh<'a>>,
311    routes_to_dds: HashMap<OwnedKeyExpr, RouteZenohDDS<'a>>,
312    // admin space: index is the admin_keyexpr (relative to admin_prefix)
313    // value is the JSon string to return to queries.
314    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
315}
316
317impl Serialize for DdsPluginRuntime<'_> {
318    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
319    where
320        S: Serializer,
321    {
322        // return the plugin's config as a JSON struct
323        let mut s = serializer.serialize_struct("dds", 3)?;
324        s.serialize_field("domain", &self.config.domain)?;
325        s.serialize_field("scope", &self.config.scope)?;
326        s.serialize_field(
327            "allow",
328            &self
329                .config
330                .allow
331                .as_ref()
332                .map_or_else(|| ".*".to_string(), |re| re.to_string()),
333        )?;
334        s.serialize_field(
335            "deny",
336            &self
337                .config
338                .deny
339                .as_ref()
340                .map_or_else(|| "".to_string(), |re| re.to_string()),
341        )?;
342        s.serialize_field(
343            "max-frequencies",
344            &self
345                .config
346                .max_frequencies
347                .iter()
348                .map(|(re, freq)| format!("{re}={freq}"))
349                .collect::<Vec<String>>(),
350        )?;
351        s.serialize_field("forward_discovery", &self.config.forward_discovery)?;
352        s.serialize_field(
353            "reliable_routes_blocking",
354            &self.config.reliable_routes_blocking,
355        )?;
356        s.end()
357    }
358}
359
360lazy_static::lazy_static! {
361    static ref JSON_NULL_VALUE: Value = serde_json::json!(null);
362}
363
364impl<'a> DdsPluginRuntime<'a> {
365    fn is_allowed(&self, ke: &keyexpr) -> bool {
366        if ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
367            log_ros2_deprecation_warning();
368        }
369
370        if self.config.forward_discovery && ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
371            // If fwd-discovery mode is enabled, don't route "ros_discovery_info"
372            return false;
373        }
374        match (&self.config.allow, &self.config.deny) {
375            (Some(allow), None) => allow.is_match(ke),
376            (None, Some(deny)) => !deny.is_match(ke),
377            (Some(allow), Some(deny)) => allow.is_match(ke) && !deny.is_match(ke),
378            (None, None) => true,
379        }
380    }
381
382    // Return the read period if keyexpr matches one of the --dds-periodic-topics option
383    fn get_read_period(&self, ke: &keyexpr) -> Option<Duration> {
384        for (re, freq) in &self.config.max_frequencies {
385            if re.is_match(ke) {
386                return Some(Duration::from_secs_f32(1f32 / freq));
387            }
388        }
389        None
390    }
391
392    fn get_participant_admin_keyexpr(e: &DdsParticipant) -> OwnedKeyExpr {
393        format!("participant/{}", e.key,).try_into().unwrap()
394    }
395
396    fn get_entity_admin_keyexpr(e: &DdsEntity, is_writer: bool) -> OwnedKeyExpr {
397        format!(
398            "participant/{}/{}/{}/{}",
399            e.participant_key,
400            if is_writer { "writer" } else { "reader" },
401            e.key,
402            e.topic_name
403        )
404        .try_into()
405        .unwrap()
406    }
407
408    fn insert_dds_participant(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsParticipant) {
409        // insert reference in admin space
410        self.admin_space
411            .insert(admin_keyexpr, AdminRef::DdsParticipant(e.key.clone()));
412
413        // insert DdsParticipant in discovered_participants map
414        self.discovered_participants.insert(e.key.clone(), e);
415    }
416
417    fn remove_dds_participant(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsParticipant)> {
418        // remove from participants map
419        if let Some(e) = self.discovered_participants.remove(dds_key) {
420            // remove from admin_space
421            let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&e);
422            self.admin_space.remove(&admin_keyexpr);
423            Some((admin_keyexpr, e))
424        } else {
425            None
426        }
427    }
428
429    fn insert_dds_writer(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
430        // insert reference in admin_space
431        self.admin_space
432            .insert(admin_keyexpr, AdminRef::DdsWriterEntity(e.key.clone()));
433
434        // insert DdsEntity in dds_writer map
435        self.discovered_writers.insert(e.key.clone(), e);
436    }
437
438    fn remove_dds_writer(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
439        // remove from dds_writer map
440        if let Some(e) = self.discovered_writers.remove(dds_key) {
441            // remove from admin_space
442            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, true);
443            self.admin_space.remove(&admin_keyexpr);
444            Some((admin_keyexpr, e))
445        } else {
446            None
447        }
448    }
449
450    fn insert_dds_reader(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
451        // insert reference in admin_space
452        self.admin_space
453            .insert(admin_keyexpr, AdminRef::DdsReaderEntity(e.key.clone()));
454
455        // insert DdsEntity in dds_reader map
456        self.discovered_readers.insert(e.key.clone(), e);
457    }
458
459    fn remove_dds_reader(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
460        // remove from dds_reader map
461        if let Some(e) = self.discovered_readers.remove(dds_key) {
462            // remove from admin space
463            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, false);
464            self.admin_space.remove(&admin_keyexpr);
465            Some((admin_keyexpr, e))
466        } else {
467            None
468        }
469    }
470
471    fn insert_route_from_dds(&mut self, ke: OwnedKeyExpr, r: RouteDDSZenoh<'a>) {
472        // insert reference in admin_space
473        let admin_ke = *KE_PREFIX_ROUTE_FROM_DDS / &ke;
474        self.admin_space
475            .insert(admin_ke, AdminRef::FromDdsRoute(ke.clone()));
476
477        // insert route in routes_from_dds map
478        self.routes_from_dds.insert(ke, r);
479    }
480
481    fn insert_route_to_dds(&mut self, ke: OwnedKeyExpr, r: RouteZenohDDS<'a>) {
482        // insert reference in admin_space
483        let admin_ke: OwnedKeyExpr = *KE_PREFIX_ROUTE_TO_DDS / &ke;
484        self.admin_space
485            .insert(admin_ke, AdminRef::ToDdsRoute(ke.clone()));
486
487        // insert route in routes_from_dds map
488        self.routes_to_dds.insert(ke, r);
489    }
490
491    #[allow(clippy::too_many_arguments)]
492    async fn try_add_route_from_dds(
493        &mut self,
494        ke: OwnedKeyExpr,
495        topic_name: &str,
496        topic_type: &str,
497        type_info: &Option<TypeInfo>,
498        keyless: bool,
499        reader_qos: Qos,
500        congestion_ctrl: CongestionControl,
501    ) -> RouteStatus {
502        if !self.is_allowed(&ke) {
503            info!(
504                "Ignoring Publication for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
505                ke
506            );
507            return RouteStatus::NotAllowed;
508        }
509
510        if self.routes_from_dds.contains_key(&ke) {
511            // TODO: check if there is no QoS conflict with existing route
512            debug!(
513                "Route from DDS to resource {} already exists -- ignoring",
514                ke
515            );
516            return RouteStatus::Routed(ke);
517        }
518
519        // create route DDS->Zenoh
520        match RouteDDSZenoh::new(
521            self,
522            topic_name.into(),
523            topic_type.into(),
524            type_info,
525            keyless,
526            reader_qos,
527            ke.clone(),
528            congestion_ctrl,
529        )
530        .await
531        {
532            Ok(route) => {
533                info!("{}: created with topic_type={}", route, topic_type);
534                self.insert_route_from_dds(ke.clone(), route);
535                RouteStatus::Routed(ke)
536            }
537            Err(e) => {
538                error!(
539                    "Route DDS->Zenoh ({} -> {}): creation failed: {}",
540                    topic_name, ke, e
541                );
542                RouteStatus::CreationFailure(e)
543            }
544        }
545    }
546
547    async fn try_add_route_to_dds(
548        &mut self,
549        ke: OwnedKeyExpr,
550        topic_name: &str,
551        topic_type: &str,
552        keyless: bool,
553        is_transient: bool,
554        writer_qos: Option<Qos>,
555    ) -> RouteStatus {
556        if !self.is_allowed(&ke) {
557            info!(
558                "Ignoring Subscription for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
559                ke
560            );
561            return RouteStatus::NotAllowed;
562        }
563
564        if let Some(route) = self.routes_to_dds.get(&ke) {
565            // TODO: check if there is no type or QoS conflict with existing route
566            debug!(
567                "Route from resource {} to DDS already exists -- ignoring",
568                ke
569            );
570            // #102: in forwarding mode, it might happen that the route have been created but without DDS Writer
571            //       (just to declare the Zenoh Subscriber). Thus, try to set a DDS Writer to the route here.
572            //       If already set, nothing will happen.
573            if let Some(qos) = writer_qos {
574                if let Err(e) = route.set_dds_writer(self.dp, qos) {
575                    error!(
576                        "{}: failed to set a DDS Writer after creation: {}",
577                        route, e
578                    );
579                    return RouteStatus::CreationFailure(e);
580                }
581            }
582            return RouteStatus::Routed(ke);
583        }
584
585        // create route Zenoh->DDS
586        match RouteZenohDDS::new(
587            self,
588            ke.clone(),
589            is_transient,
590            topic_name.into(),
591            topic_type.into(),
592            keyless,
593        )
594        .await
595        {
596            Ok(route) => {
597                // if writer_qos is set, add a DDS Writer to the route
598                if let Some(qos) = writer_qos {
599                    if let Err(e) = route.set_dds_writer(self.dp, qos) {
600                        error!(
601                            "Route Zenoh->DDS ({} -> {}): creation failed: {}",
602                            ke, topic_name, e
603                        );
604                        return RouteStatus::CreationFailure(e);
605                    }
606                }
607
608                info!("{}: created with topic_type={}", route, topic_type);
609                self.insert_route_to_dds(ke.clone(), route);
610                RouteStatus::Routed(ke)
611            }
612            Err(e) => {
613                error!(
614                    "Route Zenoh->DDS ({} -> {}): creation failed: {}",
615                    ke, topic_name, e
616                );
617                RouteStatus::CreationFailure(e)
618            }
619        }
620    }
621
622    fn get_admin_value(&self, admin_ref: &AdminRef) -> Result<Option<Value>, serde_json::Error> {
623        match admin_ref {
624            AdminRef::DdsParticipant(key) => self
625                .discovered_participants
626                .get(key)
627                .map(serde_json::to_value)
628                .map(remove_null_qos_values)
629                .transpose(),
630            AdminRef::DdsReaderEntity(key) => self
631                .discovered_readers
632                .get(key)
633                .map(serde_json::to_value)
634                .map(remove_null_qos_values)
635                .transpose(),
636            AdminRef::DdsWriterEntity(key) => self
637                .discovered_writers
638                .get(key)
639                .map(serde_json::to_value)
640                .map(remove_null_qos_values)
641                .transpose(),
642            AdminRef::FromDdsRoute(zkey) => self
643                .routes_from_dds
644                .get(zkey)
645                .map(serde_json::to_value)
646                .transpose(),
647            AdminRef::ToDdsRoute(zkey) => self
648                .routes_to_dds
649                .get(zkey)
650                .map(serde_json::to_value)
651                .transpose(),
652            AdminRef::Config => Some(serde_json::to_value(self)).transpose(),
653            AdminRef::Version => Ok(Some(DDSPlugin::PLUGIN_LONG_VERSION.into())),
654        }
655    }
656
657    async fn treat_admin_query(&self, query: Query, admin_keyexpr_prefix: &keyexpr) {
658        let selector = query.selector();
659        debug!("Query on admin space: {:?}", selector);
660
661        // get the list of sub-key expressions that will match the same stored keys than
662        // the selector, if those keys had the admin_keyexpr_prefix.
663        let sub_kes = selector.key_expr().strip_prefix(admin_keyexpr_prefix);
664        if sub_kes.is_empty() {
665            error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
666            return;
667        }
668
669        // Get all matching keys/values
670        let mut kvs: Vec<(KeyExpr, Value)> = Vec::with_capacity(sub_kes.len());
671        for sub_ke in sub_kes {
672            if sub_ke.contains('*') {
673                // iterate over all admin space to find matching keys
674                for (ke, admin_ref) in self.admin_space.iter() {
675                    if sub_ke.intersects(ke) {
676                        match self.get_admin_value(admin_ref) {
677                            Ok(Some(v)) => kvs.push((ke.into(), v)),
678                            Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
679                            Err(e) => {
680                                error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
681                            }
682                        }
683                    }
684                }
685            } else {
686                // sub_ke correspond to 1 key - just get it.
687                if let Some(admin_ref) = self.admin_space.get(sub_ke) {
688                    match self.get_admin_value(admin_ref) {
689                        Ok(Some(v)) => kvs.push((sub_ke.into(), v)),
690                        Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
691                        Err(e) => {
692                            error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
693                        }
694                    }
695                }
696            }
697        }
698
699        // send replies
700        for (ke, v) in kvs.drain(..) {
701            let admin_keyexpr = admin_keyexpr_prefix / &ke;
702            match serde_json::to_vec(&v) {
703                Ok(vec_u8) => {
704                    let payload = ZBytes::from(vec_u8);
705                    if let Err(e) = query
706                        .reply(admin_keyexpr, payload)
707                        .encoding(Encoding::APPLICATION_JSON)
708                        .await
709                    {
710                        warn!("Error replying to admin query {:?}: {}", query, e);
711                    }
712                }
713                Err(e) => warn!("Error transforming JSON to admin query {:?}: {}", query, e),
714            }
715        }
716    }
717
718    async fn run(&mut self) {
719        let group_subscriber = self
720            .zsession
721            .liveliness()
722            .declare_subscriber(*KE_PREFIX_LIVELINESS_GROUP / *KE_ANY_N_SEGMENT)
723            .querying()
724            .with(flume::unbounded())
725            .await
726            .expect("Failed to create Liveliness Subscriber");
727
728        // run DDS discovery
729        let (tx, dds_disco_rcv): (Sender<DiscoveryEvent>, Receiver<DiscoveryEvent>) = unbounded();
730        run_discovery(self.dp, tx);
731
732        // declare admin space queryable
733        let admin_keyexpr_prefix =
734            *KE_PREFIX_ADMIN_SPACE / &self.zsession.zid().into_keyexpr() / *KE_PREFIX_DDS;
735        let admin_keyexpr_expr = (&admin_keyexpr_prefix) / *KE_ANY_N_SEGMENT;
736        debug!("Declare admin space on {}", admin_keyexpr_expr);
737        let admin_queryable = self
738            .zsession
739            .declare_queryable(admin_keyexpr_expr)
740            .await
741            .expect("Failed to create AdminSpace queryable");
742
743        // add plugin's config and version in admin space
744        self.admin_space
745            .insert("config".try_into().unwrap(), AdminRef::Config);
746        self.admin_space
747            .insert("version".try_into().unwrap(), AdminRef::Version);
748
749        if self.config.forward_discovery {
750            self.run_fwd_discovery_mode(
751                &group_subscriber,
752                &dds_disco_rcv,
753                admin_keyexpr_prefix,
754                &admin_queryable,
755            )
756            .await;
757        } else {
758            self.run_local_discovery_mode(
759                &group_subscriber,
760                &dds_disco_rcv,
761                admin_keyexpr_prefix,
762                &admin_queryable,
763            )
764            .await;
765        }
766    }
767
768    fn topic_to_keyexpr(
769        &self,
770        topic_name: &str,
771        scope: &Option<OwnedKeyExpr>,
772        partition: Option<&str>,
773    ) -> ZResult<OwnedKeyExpr> {
774        // key_expr for a topic is: "<scope>/<partition>/<topic_name>" with <scope> and <partition> being optional
775        match (scope, partition) {
776            (Some(scope), Some(part)) => scope.join(&format!("{part}/{topic_name}")),
777            (Some(scope), None) => scope.join(topic_name),
778            (None, Some(part)) => format!("{part}/{topic_name}").try_into(),
779            (None, None) => topic_name.try_into(),
780        }
781    }
782
783    async fn run_local_discovery_mode(
784        &mut self,
785        group_subscriber: &Receiver<Sample>,
786        dds_disco_rcv: &Receiver<DiscoveryEvent>,
787        admin_keyexpr_prefix: OwnedKeyExpr,
788        admin_queryable: &Queryable<FifoChannelHandler<Query>>,
789    ) {
790        debug!(r#"Run in "local discovery" mode"#);
791
792        loop {
793            select!(
794                evt = dds_disco_rcv.recv_async() => {
795                    match evt.unwrap() {
796                        DiscoveryEvent::DiscoveredPublication {
797                            mut entity
798                        } => {
799                            debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
800                            // get its admin_keyexpr
801                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
802
803                            let qos = adapt_writer_qos_for_reader(&entity.qos);
804                            // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS)
805                            let congestion_ctrl = match (self.config.reliable_routes_blocking, is_writer_reliable(&entity.qos.reliability)) {
806                                (true, true) => CongestionControl::Block,
807                                _ => CongestionControl::Drop,
808                            };
809
810                            // create 1 route per partition, or just 1 if no partition
811                            if partition_is_empty(&entity.qos.partition) {
812                                let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
813                                let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
814                                if let RouteStatus::Routed(ref route_key) = route_status {
815                                    if let Some(r) = self.routes_from_dds.get_mut(route_key) {
816                                        // add Writer's key to the route
817                                        r.add_local_routed_writer(entity.key.clone());
818                                    }
819                                }
820                                entity.routes.insert("*".to_string(), route_status);
821                            } else {
822                                for p in entity.qos.partition.as_deref().unwrap() {
823                                    let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
824                                    let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos.clone(), congestion_ctrl).await;
825                                    if let RouteStatus::Routed(ref route_key) = route_status {
826                                        if let Some(r) = self.routes_from_dds.get_mut(route_key) {
827                                            // if route has been created, add this Writer in its routed_writers list
828                                            r.add_local_routed_writer(entity.key.clone());
829                                        }
830                                    }
831                                    entity.routes.insert(p.clone(), route_status);
832                                }
833                            }
834
835                            // store the writer
836                            self.insert_dds_writer(admin_keyexpr, entity);
837                        }
838
839                        DiscoveryEvent::UndiscoveredPublication {
840                            key,
841                        } => {
842                            if let Some((_, e)) = self.remove_dds_writer(&key) {
843                                debug!("Undiscovered DDS Writer {} on topic {}", key, e.topic_name);
844                                // remove it from all the active routes referring it (deleting the route if no longer used)
845                                let admin_space = &mut self.admin_space;
846                                self.routes_from_dds.retain(|zkey, route| {
847                                        route.remove_local_routed_writer(&key);
848                                        if !route.has_local_routed_writer() {
849                                            info!(
850                                                "{}: remove it as no longer unused (no local DDS Writer left)",
851                                                route
852                                            );
853                                            let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
854                                            admin_space.remove(&ke);
855                                            false
856                                        } else {
857                                            true
858                                        }
859                                    }
860                                );
861                            }
862                        }
863
864                        DiscoveryEvent::DiscoveredSubscription {
865                            mut entity
866                        } => {
867                            debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
868                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
869
870                            let qos = adapt_reader_qos_for_writer(&entity.qos);
871
872                            // create 1 route per partition, or just 1 if no partition
873                            if partition_is_empty(&entity.qos.partition) {
874                                let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
875                                let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos)).await;
876                                if let RouteStatus::Routed(ref route_key) = route_status {
877                                    if let Some(r) = self.routes_to_dds.get_mut(route_key) {
878                                        // if route has been created, add this Reader in its routed_readers list
879                                        r.add_local_routed_reader(entity.key.clone());
880                                    }
881                                }
882                                entity.routes.insert("*".to_string(), route_status);
883                            } else {
884                                for p in entity.qos.partition.as_deref().unwrap() {
885                                    let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
886                                    let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos.clone())).await;
887                                    if let RouteStatus::Routed(ref route_key) = route_status {
888                                        if let Some(r) = self.routes_to_dds.get_mut(route_key) {
889                                            // if route has been created, add this Reader in its routed_readers list
890                                            r.add_local_routed_reader(entity.key.clone());
891                                        }
892                                    }
893                                    entity.routes.insert(p.clone(), route_status);
894                                }
895                            }
896
897                            // store the reader
898                            self.insert_dds_reader(admin_keyexpr, entity);
899                        }
900
901                        DiscoveryEvent::UndiscoveredSubscription {
902                            key,
903                        } => {
904                            if let Some((_, e)) = self.remove_dds_reader(&key) {
905                                debug!("Undiscovered DDS Reader {} on topic {}", key, e.topic_name);
906                                // remove it from all the active routes referring it (deleting the route if no longer used)
907                                let admin_space = &mut self.admin_space;
908                                self.routes_to_dds.retain(|zkey, route| {
909                                        route.remove_local_routed_reader(&key);
910                                        if !route.has_local_routed_reader() {
911                                            info!(
912                                                "{}: remove it as no longer unused (no local DDS Reader left)",
913                                                route
914                                            );
915                                            let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
916                                            admin_space.remove(&ke);
917                                            false
918                                        } else {
919                                            true
920                                        }
921                                    }
922                                );
923                            }
924                        }
925
926                        DiscoveryEvent::DiscoveredParticipant {
927                            entity,
928                        } => {
929                            debug!("Discovered DDS Participant {}", entity.key);
930                            let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
931
932                            // store the participant
933                            self.insert_dds_participant(admin_keyexpr, entity);
934                        }
935
936                        DiscoveryEvent::UndiscoveredParticipant {
937                            key,
938                        } => {
939                            if let Some((_, _)) = self.remove_dds_participant(&key) {
940                                debug!("Undiscovered DDS Participant {}", key);
941                            }
942                        }
943                    }
944                },
945
946                group_event = group_subscriber.recv_async() => {
947                    match group_event.as_ref().map(|s|s.kind()) {
948                        Ok(SampleKind::Put) => {
949                            let zid = zenoh_id!(group_event.as_ref().unwrap());
950                            debug!("New zenoh_dds_plugin detected: {}", zid);
951                            if let Ok(zenoh_id) = keyexpr::new(zid) {
952                                // make all QueryingSubscriber to query this new member
953                                for (zkey, route) in &mut self.routes_to_dds {
954                                    route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE  / zenoh_id).into(), self.config.queries_timeout).await;
955                                }
956                            } else {
957                                error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
958                            }
959                        }
960                        Ok(_) => {} // ignore other GroupEvents
961                        Err(e) => warn!("Error receiving GroupEvent: {}", e)
962                    }
963                }
964
965                get_request = admin_queryable.recv_async() => {
966                    if let Ok(query) = get_request {
967                        self.treat_admin_query(query, &admin_keyexpr_prefix).await;
968                    } else {
969                        warn!("AdminSpace queryable was closed!");
970                    }
971                }
972            )
973        }
974    }
975
976    async fn run_fwd_discovery_mode(
977        &mut self,
978        group_subscriber: &Receiver<Sample>,
979        dds_disco_rcv: &Receiver<DiscoveryEvent>,
980        admin_keyexpr_prefix: OwnedKeyExpr,
981        admin_queryable: &Queryable<FifoChannelHandler<Query>>,
982    ) {
983        debug!(r#"Run in "forward discovery" mode"#);
984
985        // The data space where all discovery info are forwarded:
986        //   - writers discovery on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/writer/<dds_entity_admin_key>
987        //   - readers discovery on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/reader/<dds_entity_admin_key>
988        //   - ros_discovery_info on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/ros_disco/<gid>
989        // The PublicationCache is declared on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/**
990        // The QuerySubscriber is declared on  <KE_PREFIX_ADMIN_SPACE>/*/<KE_PREFIX_FWD_DISCO>/[<scope>]/**
991        let uuid: OwnedKeyExpr = self.zsession.zid().into();
992        let fwd_key_prefix = if let Some(scope) = &self.config.scope {
993            *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO / scope
994        } else {
995            *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO
996        };
997        let fwd_writers_key_prefix =
998            &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("writer") };
999        let fwd_readers_key_prefix =
1000            &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("reader") };
1001        let fwd_ros_discovery_key =
1002            &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("ros_disco") };
1003        let fwd_declare_publication_cache_key = &fwd_key_prefix / *KE_ANY_N_SEGMENT;
1004        let fwd_discovery_subscription_key = if let Some(scope) = &self.config.scope {
1005            *KE_PREFIX_ADMIN_SPACE
1006                / *KE_ANY_1_SEGMENT
1007                / *KE_PREFIX_FWD_DISCO
1008                / scope
1009                / *KE_ANY_N_SEGMENT
1010        } else {
1011            *KE_PREFIX_ADMIN_SPACE / *KE_ANY_1_SEGMENT / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1012        };
1013
1014        // Register prefixes for optimization
1015        let fwd_writers_key_prefix_key = self
1016            .zsession
1017            .declare_keyexpr(fwd_writers_key_prefix)
1018            .await
1019            .expect("Failed to declare key expression for Fwd Discovery of writers");
1020        let fwd_readers_key_prefix_key = self
1021            .zsession
1022            .declare_keyexpr(fwd_readers_key_prefix)
1023            .await
1024            .expect("Failed to declare key expression for Fwd Discovery of readers");
1025        let fwd_ros_discovery_key_declared = self
1026            .zsession
1027            .declare_keyexpr(&fwd_ros_discovery_key)
1028            .await
1029            .expect("Failed to declare key expression for Fwd Discovery of ros_discovery");
1030
1031        // Cache the publications on admin space for late joiners DDS plugins
1032        let _fwd_disco_pub_cache = self
1033            .zsession
1034            .declare_publication_cache(fwd_declare_publication_cache_key)
1035            .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers
1036            .await
1037            .expect("Failed to declare PublicationCache for Fwd Discovery");
1038
1039        // Subscribe to remote DDS plugins publications of new Readers/Writers on admin space
1040        let fwd_disco_sub = self
1041            .zsession
1042            .declare_subscriber(fwd_discovery_subscription_key)
1043            .querying()
1044            .allowed_origin(Locality::Remote) // Note: ignore my own publications
1045            .query_timeout(self.config.queries_timeout)
1046            .await
1047            .expect("Failed to declare QueryingSubscriber for Fwd Discovery");
1048
1049        // Manage ros_discovery_info topic, reading it periodically
1050        let ros_disco_mgr =
1051            RosDiscoveryInfoMgr::create(self.dp).expect("Failed to create RosDiscoveryInfoMgr");
1052        let timer = Timer::default();
1053        let (tx, ros_disco_timer_rcv): (Sender<()>, Receiver<()>) = unbounded();
1054        let ros_disco_timer_event = TimedEvent::periodic(
1055            Duration::from_millis(ROS_DISCOVERY_INFO_POLL_INTERVAL_MS),
1056            ChannelEvent { tx },
1057        );
1058        timer.add_async(ros_disco_timer_event).await;
1059
1060        // The ParticipantEntitiesInfo to be re-published on ros_discovery_info (with this bridge's participant gid)
1061        let mut participant_info = ParticipantEntitiesInfo::new(
1062            get_guid(&self.dp).expect("Failed to get my Participant's guid"),
1063        );
1064
1065        let scope = self.config.scope.clone();
1066        loop {
1067            select!(
1068                evt = dds_disco_rcv.recv_async() => {
1069                    match evt.unwrap() {
1070                        DiscoveryEvent::DiscoveredPublication {
1071                            entity
1072                        } => {
1073                            debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1074                            // advertise the entity and its scope within admin space (bincode format)
1075                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
1076                            let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1077                            let msg = (&entity, &scope);
1078                            let ser_msg = match bincode::serialize(&msg) {
1079                                Ok(s) => s,
1080                                Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1081                            };
1082                            if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1083                                error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1084                            }
1085
1086                            // store the writer in admin space
1087                            self.insert_dds_writer(admin_keyexpr, entity);
1088                        }
1089
1090                        DiscoveryEvent::UndiscoveredPublication {
1091                            key,
1092                        } => {
1093                            debug!("Undiscovered DDS Writer {} => advertise it", key);
1094                            if let Some((admin_keyexpr, _)) = self.remove_dds_writer(&key) {
1095                                let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1096                                // publish its deletion from admin space
1097                                if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1098                                    error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1099                                }
1100                            }
1101                        }
1102
1103                        DiscoveryEvent::DiscoveredSubscription {
1104                            mut entity
1105                        } => {
1106                            debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1107
1108                            // #102: create a local "to_dds" route, but only with the Zenoh Subscriber (not the DDS Writer)
1109                            // create 1 route per partition, or just 1 if no partition
1110                            if partition_is_empty(&entity.qos.partition) {
1111                                let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
1112                                let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&entity.qos), None).await;
1113                                if let RouteStatus::Routed(ref route_key) = route_status {
1114                                    if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1115                                        // if route has been created, add this Reader in its routed_readers list
1116                                        r.add_local_routed_reader(entity.key.clone());
1117                                    }
1118                                }
1119                                entity.routes.insert("*".to_string(), route_status);
1120                            } else {
1121                                for p in entity.qos.partition.as_deref().unwrap() {
1122                                    let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
1123                                    let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&entity.qos), None).await;
1124                                    if let RouteStatus::Routed(ref route_key) = route_status {
1125                                        if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1126                                            // if route has been created, add this Reader in its routed_readers list
1127                                            r.add_local_routed_reader(entity.key.clone());
1128                                        }
1129                                    }
1130                                    entity.routes.insert(p.clone(), route_status);
1131                                }
1132                            }
1133
1134                            // advertise the entity and its scope within admin space (bincode format)
1135                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
1136                            let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1137                            let msg = (&entity, &scope);
1138                            let ser_msg = match bincode::serialize(&msg) {
1139                                Ok(s) => s,
1140                                Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1141                            };
1142                            if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1143                                error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1144                            }
1145
1146                            // store the reader
1147                            self.insert_dds_reader(admin_keyexpr, entity);
1148                        }
1149
1150                        DiscoveryEvent::UndiscoveredSubscription {
1151                            key,
1152                        } => {
1153                            debug!("Undiscovered DDS Reader {} => advertise it", key);
1154                            if let Some((admin_keyexpr, _)) = self.remove_dds_reader(&key) {
1155                                let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1156                                // publish its deletion from admin space
1157                                if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1158                                    error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1159                                }
1160                            }
1161                            // #102: also remove the Reader from all the active routes referring it,
1162                            // deleting the route if it has no longer local Reader nor remote Writer.
1163                            let admin_space = &mut self.admin_space;
1164                            self.routes_to_dds.retain(|zkey, route| {
1165                                    route.remove_local_routed_reader(&key);
1166                                    if !route.has_local_routed_reader() && !route.has_remote_routed_writer(){
1167                                        info!(
1168                                            "{}: remove it as no longer unused (no local DDS Reader nor remote DDS Writer left)",
1169                                            route
1170                                        );
1171                                        let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1172                                        admin_space.remove(&ke);
1173                                        false
1174                                    } else {
1175                                        true
1176                                    }
1177                                }
1178                            );
1179                        }
1180
1181                        DiscoveryEvent::DiscoveredParticipant {
1182                            entity,
1183                        } => {
1184                            debug!("Discovered DDS Participant {}", entity.key);
1185                            let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
1186
1187                            // store the participant
1188                            self.insert_dds_participant(admin_keyexpr, entity);
1189                        }
1190
1191                        DiscoveryEvent::UndiscoveredParticipant {
1192                            key,
1193                        } => {
1194                            if let Some((_, _)) = self.remove_dds_participant(&key) {
1195                                debug!("Undiscovered DDS Participant {}", key);
1196                            }
1197                        }
1198                    }
1199                },
1200
1201                sample = fwd_disco_sub.recv_async() => {
1202                    let sample = sample.expect("Fwd Discovery subscriber was closed!");
1203                    let fwd_ke = &sample.key_expr();
1204                    debug!("Received forwarded discovery message on {}", fwd_ke);
1205
1206                    // parse fwd_ke and extract the remote uuid, the discovery kind (reader|writer|ros_disco) and the remaining of the keyexpr
1207                    if let Some((remote_uuid, disco_kind, remaining_ke)) = Self::parse_fwd_discovery_keyexpr(fwd_ke) {
1208                        match disco_kind {
1209                            // it's a writer discovery message
1210                            "writer" => {
1211                                // reconstruct full admin keyexpr for this entity (i.e. with it's remote plugin's uuid)
1212                                let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1213                                if sample.kind() != SampleKind::Delete {
1214                                    // deserialize payload
1215                                    let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1216                                        Ok(x) => x,
1217                                        Err(e) => {
1218                                            warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1219                                            continue;
1220                                        }
1221                                    };
1222                                    let qos = adapt_writer_qos_for_proxy_writer(&entity.qos);
1223
1224                                    // create 1 "to_dds" route per partition, or just 1 if no partition
1225                                    if partition_is_empty(&entity.qos.partition) {
1226                                        let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1227                                        let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos)).await;
1228                                        if let RouteStatus::Routed(ref route_key) = route_status {
1229                                            if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1230                                                // add the writer's admin keyexpr to the list of remote_routed_writers
1231                                                r.add_remote_routed_writer(full_admin_keyexpr);
1232                                                // check amongst local Readers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1233                                                for reader in self.discovered_readers.values_mut() {
1234                                                    if reader.topic_name == entity.topic_name && partition_is_empty(&reader.qos.partition) {
1235                                                        r.add_local_routed_reader(reader.key.clone());
1236                                                        reader.routes.insert("*".to_string(), route_status.clone());
1237                                                    }
1238                                                }
1239                                            }
1240                                        }
1241                                    } else {
1242                                        for p in entity.qos.partition.as_deref().unwrap() {
1243                                            let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1244                                            let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, entity.keyless, is_transient_local(&qos), Some(qos.clone())).await;
1245                                            if let RouteStatus::Routed(ref route_key) = route_status {
1246                                                if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1247                                                    // add the writer's admin keyexpr to the list of remote_routed_writers
1248                                                    r.add_remote_routed_writer(full_admin_keyexpr.clone());
1249                                                    // check amongst local Readers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1250                                                    for reader in self.discovered_readers.values_mut() {
1251                                                        if reader.topic_name == entity.topic_name && partition_contains(&reader.qos.partition, p) {
1252                                                            r.add_local_routed_reader(reader.key.clone());
1253                                                            reader.routes.insert(p.clone(), route_status.clone());
1254                                                        }
1255                                                    }
1256                                                }
1257                                            }
1258                                        }
1259                                    }
1260                                } else {
1261                                    // writer was deleted; remove it from all the active routes referring it (deleting the route if no longer used)
1262                                    let admin_space = &mut self.admin_space;
1263                                    self.routes_to_dds.retain(|zkey, route| {
1264                                            route.remove_remote_routed_writer(&full_admin_keyexpr);
1265                                            if route.has_remote_routed_writer() {
1266                                                // if there are still remote writers for this route, keep it
1267                                                true
1268                                            } else {
1269                                                // #102: Delete the DDS Writer of this route if there are no more remote Writers,
1270                                                // but don't delete the route itself if there is still a local Reader.
1271                                                route.delete_dds_writer();
1272                                                if !route.has_local_routed_reader() {
1273                                                    info!(
1274                                                        "{}: remove it as no longer unused (no remote DDS Writer nor local DDS Reader left)",
1275                                                        route
1276                                                    );
1277                                                    let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1278                                                    admin_space.remove(&ke);
1279                                                    false
1280                                                } else {
1281                                                    true
1282                                                }
1283                                            }
1284                                        }
1285                                    );
1286                                }
1287                            }
1288
1289                            // it's a reader discovery message
1290                            "reader" => {
1291                                // reconstruct full admin keyexpr for this entity (i.e. with it's remote plugin's uuid)
1292                                let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1293                                if sample.kind() != SampleKind::Delete {
1294                                    // deserialize payload
1295                                    let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1296                                        Ok(x) => x,
1297                                        Err(e) => {
1298                                            warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1299                                            continue;
1300                                        }
1301                                    };
1302                                    let qos = adapt_reader_qos_for_proxy_reader(&entity.qos);
1303
1304                                    // CongestionControl to be used when re-publishing over zenoh: Blocking if Reader is RELIABLE (since Writer will also be, otherwise no matching)
1305                                    let congestion_ctrl = match (self.config.reliable_routes_blocking, is_reader_reliable(&entity.qos.reliability)) {
1306                                        (true, true) => CongestionControl::Block,
1307                                        _ => CongestionControl::Drop,
1308                                    };
1309
1310                                    // create 1 'from_dds" route per partition, or just 1 if no partition
1311                                    if partition_is_empty(&entity.qos.partition) {
1312                                        let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1313                                        let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
1314                                        if let RouteStatus::Routed(ref route_key) = route_status {
1315                                            if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1316                                                // add the reader's admin keyexpr to the list of remote_routed_writers
1317                                                r.add_remote_routed_reader(full_admin_keyexpr);
1318                                                // check amongst local Writers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1319                                                for writer in self.discovered_writers.values_mut() {
1320                                                    if writer.topic_name == entity.topic_name && partition_is_empty(&writer.qos.partition) {
1321                                                        r.add_local_routed_writer(writer.key.clone());
1322                                                        writer.routes.insert("*".to_string(), route_status.clone());
1323                                                    }
1324                                                }
1325                                            }
1326                                        }
1327                                    } else {
1328                                        for p in &entity.qos.partition.unwrap() {
1329                                            let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1330                                            let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos.clone(), congestion_ctrl).await;
1331                                            if let RouteStatus::Routed(ref route_key) = route_status {
1332                                                if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1333                                                    // add the reader's admin keyexpr to the list of remote_routed_writers
1334                                                    r.add_remote_routed_reader(full_admin_keyexpr.clone());
1335                                                    // check amongst local Writers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1336                                                    for writer in self.discovered_writers.values_mut() {
1337                                                        if writer.topic_name == entity.topic_name && partition_contains(&writer.qos.partition, p) {
1338                                                            r.add_local_routed_writer(writer.key.clone());
1339                                                            writer.routes.insert(p.clone(), route_status.clone());
1340                                                        }
1341                                                    }
1342                                                }
1343                                            }
1344                                        }
1345                                    }
1346                                } else {
1347                                    // reader was deleted; remove it from all the active routes referring it (deleting the route if no longer used)
1348                                    let admin_space = &mut self.admin_space;
1349                                    self.routes_from_dds.retain(|zkey, route| {
1350                                            route.remove_remote_routed_reader(&full_admin_keyexpr);
1351                                            if !route.has_remote_routed_reader() {
1352                                                info!(
1353                                                    "{}: remove it as no longer unused (no remote DDS Reader left)",
1354                                                    route
1355                                                );
1356                                                let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1357                                                admin_space.remove(&ke);
1358                                                false
1359                                            } else {
1360                                                true
1361                                            }
1362                                        }
1363                                    );
1364                                }
1365                            }
1366
1367                            // it's a ros_discovery_info message
1368                            "ros_disco" => {
1369                                match cdr::deserialize_from::<_, ParticipantEntitiesInfo, _>(
1370                                    sample.payload().reader(),
1371                                    cdr::size::Infinite,
1372                                ) {
1373                                    Ok(mut info) => {
1374                                        // remap all original gids with the gids of the routes
1375                                        self.remap_entities_info(&mut info.node_entities_info_seq);
1376                                        // update the ParticipantEntitiesInfo for this bridge and re-publish it on DDS
1377                                        participant_info.update_with(info.node_entities_info_seq);
1378                                        debug!("Publish updated ros_discovery_info: {:?}", participant_info);
1379                                        if let Err(e) = ros_disco_mgr.write(&participant_info) {
1380                                            error!("Error forwarding ros_discovery_info: {}", e);
1381                                        }
1382                                    }
1383                                    Err(e) => error!(
1384                                        "Error receiving ParticipantEntitiesInfo on {}: {}",
1385                                        fwd_ke, e
1386                                    ),
1387                                }
1388                            }
1389
1390                            x => {
1391                                error!("Unexpected forwarded discovery message received on invalid key {} (unknown kind: {}) ", fwd_ke, x);
1392                            }
1393                        }
1394                    }
1395                },
1396
1397                group_event = group_subscriber.recv_async() => {
1398                    match group_event.as_ref().map(|s|s.kind()) {
1399                        Ok(SampleKind::Put) => {
1400                            let zid = zenoh_id!(group_event.as_ref().unwrap());
1401                            debug!("New zenoh_dds_plugin detected: {}", zid);
1402
1403                            if let Ok(zenoh_id) = keyexpr::new(zid) {
1404                                // query for past publications of discocvery messages from this new member
1405                                let key = if let Some(scope) = &self.config.scope {
1406                                    *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / scope / *KE_ANY_N_SEGMENT
1407                                } else {
1408                                    *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1409                                };
1410                                debug!("Query past discovery messages from {} on {}", zid, key);
1411                                if let Err(e) = fwd_disco_sub.fetch( |cb| {
1412                                    self.zsession.get(Selector::from(&key))
1413                                        .callback(cb)
1414                                        .target(QueryTarget::All)
1415                                        .consolidation(ConsolidationMode::None)
1416                                        .timeout(self.config.queries_timeout)
1417                                        .wait()
1418                                }).await
1419                                {
1420                                    warn!("Query on {} for discovery messages failed: {}", key, e);
1421                                }
1422                                // make all QueryingSubscriber to query this new member
1423                                for (zkey, route) in &mut self.routes_to_dds {
1424                                    route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE / zenoh_id).into(), self.config.queries_timeout).await;
1425                                }
1426                            } else {
1427                                error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
1428                            }
1429                        }
1430                        Ok(SampleKind::Delete) => {
1431                            let zid = zenoh_id!(group_event.as_ref().unwrap());
1432                            debug!("Remote zenoh_dds_plugin left: {}", zid);
1433                            // remove all the references to the plugin's entities, removing no longer used routes
1434                            // and updating/re-publishing ParticipantEntitiesInfo
1435                            let admin_space = &mut self.admin_space;
1436                            let admin_subke = format!("@/{zid}/dds/");
1437                            let mut participant_info_changed = false;
1438                            self.routes_to_dds.retain(|zkey, route| {
1439                                route.remove_remote_routed_writers_containing(&admin_subke);
1440                                if !route.has_remote_routed_writer() {
1441                                    info!(
1442                                        "{}: remove it as no longer unused (no remote DDS Writer left)",
1443                                        route
1444                                    );
1445                                    let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1446                                    admin_space.remove(&ke);
1447                                    if let Ok(guid) = route.dds_writer_guid() {
1448                                        participant_info.remove_writer_gid(&guid);
1449                                        participant_info_changed = true;
1450                                    } else {
1451                                        warn!("Failed to get guid for Writer serving the route zenoh '{}' => DDS '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1452                                    }
1453                                    false
1454                                } else {
1455                                    true
1456                                }
1457                            });
1458                            self.routes_from_dds.retain(|zkey, route| {
1459                                route.remove_remote_routed_readers_containing(&admin_subke);
1460                                if !route.has_remote_routed_reader() {
1461                                    info!(
1462                                        "{}: remove it as no longer unused (no remote DDS Reader left)",
1463                                        route
1464                                    );
1465                                    let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1466                                    admin_space.remove(&ke);
1467                                    if let Ok(guid) = route.dds_reader_guid() {
1468                                        participant_info.remove_reader_gid(&guid);
1469                                        participant_info_changed = true;
1470                                    } else {
1471                                        warn!("Failed to get guid for Reader serving the route DDS '{}' => zenoh '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1472                                    }
1473                                    false
1474                                } else {
1475                                    true
1476                                }
1477                            });
1478                            if participant_info_changed {
1479                                debug!("Publishing up-to-date ros_discovery_info after leaving of plugin {}", zid);
1480                                participant_info.cleanup();
1481                                if let Err(e) = ros_disco_mgr.write(&participant_info) {
1482                                    error!("Error forwarding ros_discovery_info: {}", e);
1483                                }
1484                            }
1485                        }
1486                        Err(e) => warn!("Error receiving GroupEvent: {}", e)
1487                    }
1488                }
1489
1490                get_request = admin_queryable.recv_async() => {
1491                    if let Ok(query) = get_request {
1492                        self.treat_admin_query(query, &admin_keyexpr_prefix).await;
1493                    } else {
1494                        warn!("AdminSpace queryable was closed!");
1495                    }
1496                }
1497
1498                _ = ros_disco_timer_rcv.recv_async() => {
1499                    let infos = ros_disco_mgr.read();
1500                    for (gid, buf) in infos {
1501                        trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, buf.hex_encode());
1502                        // forward the payload on zenoh
1503                        let ke = &fwd_ros_discovery_key_declared / unsafe { keyexpr::from_str_unchecked(&gid) };
1504                        if let Err(e) = self.zsession.put(ke, buf).wait() {
1505                            error!("Forward ROS discovery info failed: {}", e);
1506                        }
1507                    }
1508                }
1509            )
1510        }
1511    }
1512
1513    fn parse_fwd_discovery_keyexpr(fwd_ke: &keyexpr) -> Option<(&keyexpr, &str, &keyexpr)> {
1514        // parse fwd_ke which have format: "KE_PREFIX_ADMIN_SPACE/<uuid>/KE_PREFIX_FWD_DISCO[/scope/possibly/multiple]/<disco_kind>/<remaining_ke...>"
1515        if !fwd_ke.starts_with(KE_PREFIX_ADMIN_SPACE.as_str()) {
1516            // publication on a key expression matching the fwd_ke: ignore it
1517            return None;
1518        }
1519        let mut remaining = &fwd_ke[KE_PREFIX_ADMIN_SPACE.len() + 1..];
1520        let uuid = if let Some(i) = remaining.find('/') {
1521            let uuid = unsafe { keyexpr::from_str_unchecked(&remaining[..i]) };
1522            remaining = &remaining[i + 1..];
1523            uuid
1524        } else {
1525            error!(
1526                "Unexpected forwarded discovery message received on invalid key: {}",
1527                fwd_ke
1528            );
1529            return None;
1530        };
1531        if !remaining.starts_with(KE_PREFIX_FWD_DISCO.as_str()) {
1532            // publication on a key expression matching the fwd_ke: ignore it
1533            return None;
1534        }
1535        let kind = if let Some(i) = remaining.find("/reader/") {
1536            remaining = &remaining[i + 8..];
1537            "reader"
1538        } else if let Some(i) = remaining.find("/writer/") {
1539            remaining = &remaining[i + 8..];
1540            "writer"
1541        } else if let Some(i) = remaining.find("/ros_disco/") {
1542            remaining = &remaining[i + 11..];
1543            "ros_disco"
1544        } else {
1545            error!("Unexpected forwarded discovery message received on invalid key: {} (no expected kind '/reader/', '/writer/' or '/ros_disco/')", fwd_ke);
1546            return None;
1547        };
1548        Some((uuid, kind, unsafe {
1549            keyexpr::from_str_unchecked(remaining)
1550        }))
1551    }
1552
1553    fn remap_entities_info(&self, entities_info: &mut HashMap<String, NodeEntitiesInfo>) {
1554        for node in entities_info.values_mut() {
1555            // TODO: replace with drain_filter when stable (https://github.com/rust-lang/rust/issues/43244)
1556            let mut i = 0;
1557            while i < node.reader_gid_seq.len() {
1558                // find a RouteDDSZenoh routing a remote reader with this gid
1559                match self
1560                    .routes_from_dds
1561                    .values()
1562                    .find(|route| route.is_routing_remote_reader(&node.reader_gid_seq[i]))
1563                {
1564                    Some(route) => {
1565                        // replace the gid with route's reader's gid
1566                        if let Ok(gid) = route.dds_reader_guid() {
1567                            trace!(
1568                                "ros_discovery_info remap reader {} -> {}",
1569                                node.reader_gid_seq[i],
1570                                gid
1571                            );
1572                            node.reader_gid_seq[i] = gid;
1573                            i += 1;
1574                        } else {
1575                            error!("Failed to get guid for Reader serving the a route. Can't remap in ros_discovery_info");
1576                        }
1577                    }
1578                    None => {
1579                        // remove the gid (not route found because either not allowed to be routed,
1580                        // either route already initiated by another reader)
1581                        trace!(
1582                            "ros_discovery_info remap reader {} -> NONE",
1583                            node.reader_gid_seq[i]
1584                        );
1585                        node.reader_gid_seq.remove(i);
1586                    }
1587                }
1588            }
1589            let mut i = 0;
1590            while i < node.writer_gid_seq.len() {
1591                // find a ToDdsRoute initiated by the writer with this gid
1592                match self
1593                    .routes_to_dds
1594                    .values()
1595                    .find(|route| route.is_routing_remote_writer(&node.writer_gid_seq[i]))
1596                {
1597                    Some(route) => {
1598                        // replace the gid with route's writer's gid
1599                        if let Ok(gid) = route.dds_writer_guid() {
1600                            trace!(
1601                                "ros_discovery_info remap writer {} -> {}",
1602                                node.writer_gid_seq[i],
1603                                gid
1604                            );
1605                            node.writer_gid_seq[i] = gid;
1606                            i += 1;
1607                        } else {
1608                            error!("Failed to get guid for Writer serving the a route. Can't remap in ros_discovery_info");
1609                        }
1610                    }
1611                    None => {
1612                        // remove the gid (not route found because either not allowed to be routed,
1613                        // either route already initiated by another writer)
1614                        trace!(
1615                            "ros_discovery_info remap writer {} -> NONE",
1616                            node.writer_gid_seq[i]
1617                        );
1618                        node.writer_gid_seq.remove(i);
1619                    }
1620                }
1621            }
1622        }
1623    }
1624}
1625
1626// Remove any null QoS values from a serde_json::Value
1627fn remove_null_qos_values(
1628    value: Result<Value, serde_json::Error>,
1629) -> Result<Value, serde_json::Error> {
1630    match value {
1631        Ok(value) => match value {
1632            Value::Object(mut obj) => {
1633                let qos = obj.get_mut("qos");
1634                if let Some(qos) = qos {
1635                    if qos.is_object() {
1636                        qos.as_object_mut().unwrap().retain(|_, v| !v.is_null());
1637                    }
1638                }
1639                Ok(Value::Object(obj))
1640            }
1641            _ => Ok(value),
1642        },
1643        Err(error) => Err(error),
1644    }
1645}
1646
1647// Copy and adapt Writer's QoS for creation of a matching Reader
1648fn adapt_writer_qos_for_reader(qos: &Qos) -> Qos {
1649    let mut reader_qos = qos.clone();
1650
1651    // Unset any writer QoS that doesn't apply to data readers
1652    reader_qos.durability_service = None;
1653    reader_qos.ownership_strength = None;
1654    reader_qos.transport_priority = None;
1655    reader_qos.lifespan = None;
1656    reader_qos.writer_data_lifecycle = None;
1657    reader_qos.writer_batching = None;
1658
1659    // Unset proprietary QoS which shouldn't apply
1660    reader_qos.properties = None;
1661    reader_qos.entity_name = None;
1662    reader_qos.ignore_local = None;
1663
1664    // Set default Reliability QoS if not set for writer
1665    if reader_qos.reliability.is_none() {
1666        reader_qos.reliability = Some({
1667            Reliability {
1668                kind: ReliabilityKind::BEST_EFFORT,
1669                max_blocking_time: DDS_100MS_DURATION,
1670            }
1671        });
1672    }
1673
1674    reader_qos
1675}
1676
1677// Copy and adapt Writer's QoS for creation of a proxy Writer
1678fn adapt_writer_qos_for_proxy_writer(qos: &Qos) -> Qos {
1679    let mut writer_qos = qos.clone();
1680
1681    // Unset proprietary QoS which shouldn't apply
1682    writer_qos.properties = None;
1683    writer_qos.entity_name = None;
1684
1685    // Don't match with readers with the same participant
1686    writer_qos.ignore_local = Some(IgnoreLocal {
1687        kind: IgnoreLocalKind::PARTICIPANT,
1688    });
1689
1690    writer_qos
1691}
1692
1693// Copy and adapt Reader's QoS for creation of a matching Writer
1694fn adapt_reader_qos_for_writer(qos: &Qos) -> Qos {
1695    let mut writer_qos = qos.clone();
1696
1697    // Unset any reader QoS that doesn't apply to data writers
1698    writer_qos.time_based_filter = None;
1699    writer_qos.reader_data_lifecycle = None;
1700    writer_qos.properties = None;
1701    writer_qos.entity_name = None;
1702
1703    // Don't match with readers with the same participant
1704    writer_qos.ignore_local = Some(IgnoreLocal {
1705        kind: IgnoreLocalKind::PARTICIPANT,
1706    });
1707
1708    // if Reader is TRANSIENT_LOCAL, configure durability_service QoS with same history as the Reader.
1709    // This is because CycloneDDS is actually using durability_service.history for transient_local historical data.
1710    if is_transient_local(qos) {
1711        let history = qos
1712            .history
1713            .as_ref()
1714            .map_or(History::default(), |history| history.clone());
1715
1716        writer_qos.durability_service = Some(DurabilityService {
1717            service_cleanup_delay: 60 * DDS_1S_DURATION,
1718            history_kind: history.kind,
1719            history_depth: history.depth,
1720            max_samples: DDS_LENGTH_UNLIMITED,
1721            max_instances: DDS_LENGTH_UNLIMITED,
1722            max_samples_per_instance: DDS_LENGTH_UNLIMITED,
1723        });
1724    }
1725    // Workaround for the DDS Writer to correctly match with a FastRTPS Reader
1726    writer_qos.reliability = match writer_qos.reliability {
1727        Some(mut reliability) => {
1728            reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1729            Some(reliability)
1730        }
1731        _ => {
1732            let mut reliability = Reliability::default();
1733            reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1734            Some(reliability)
1735        }
1736    };
1737
1738    writer_qos
1739}
1740
1741// Copy and adapt Reader's QoS for creation of a proxy Reader
1742fn adapt_reader_qos_for_proxy_reader(qos: &Qos) -> Qos {
1743    let mut reader_qos = qos.clone();
1744
1745    // Unset proprietary QoS which shouldn't apply
1746    reader_qos.properties = None;
1747    reader_qos.entity_name = None;
1748    reader_qos.ignore_local = None;
1749
1750    reader_qos
1751}
1752
1753//TODO replace when stable https://github.com/rust-lang/rust/issues/65816
1754#[inline]
1755pub(crate) fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
1756    let mut me = ManuallyDrop::new(v);
1757    (me.as_mut_ptr(), me.len(), me.capacity())
1758}
1759
1760struct ChannelEvent {
1761    tx: Sender<()>,
1762}
1763
1764#[async_trait]
1765impl Timed for ChannelEvent {
1766    async fn run(&mut self) {
1767        if self.tx.send(()).is_err() {
1768            warn!("Error sending periodic timer notification on channel");
1769        };
1770    }
1771}