zenoh_plugin_dds/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14#![allow(deprecated)]
15use std::{
16    collections::HashMap,
17    env,
18    future::Future,
19    mem::ManuallyDrop,
20    sync::{
21        atomic::{AtomicBool, AtomicUsize, Ordering},
22        Arc,
23    },
24    time::Duration,
25};
26
27use async_trait::async_trait;
28use cyclors::{
29    qos::{
30        DurabilityService, History, IgnoreLocal, IgnoreLocalKind, Qos, Reliability,
31        ReliabilityKind, DDS_100MS_DURATION, DDS_1S_DURATION,
32    },
33    *,
34};
35use flume::{unbounded, Receiver, Sender};
36use futures::select;
37use route_dds_zenoh::RouteDDSZenoh;
38use serde::{ser::SerializeStruct, Serialize, Serializer};
39use serde_json::Value;
40use tokio::task::JoinHandle;
41use tracing::{debug, error, info, trace, warn};
42use zenoh::{
43    bytes::{Encoding, ZBytes},
44    handlers::FifoChannelHandler,
45    internal::{
46        plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
47        runtime::DynamicRuntime,
48        zerror, Timed, TimedEvent, Timer,
49    },
50    key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
51    liveliness::LivelinessToken,
52    qos::CongestionControl,
53    query::{ConsolidationMode, Query, QueryTarget, Queryable, Selector},
54    sample::{Locality, Sample, SampleKind},
55    Result as ZResult, Session, Wait,
56};
57use zenoh_ext::{SessionExt, SubscriberBuilderExt};
58use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
59
60pub mod config;
61mod dds_mgt;
62mod qos_helpers;
63mod ros_discovery;
64mod route_dds_zenoh;
65mod route_zenoh_dds;
66use config::Config;
67use dds_mgt::*;
68
69use crate::{
70    qos_helpers::*,
71    ros_discovery::{
72        NodeEntitiesInfo, ParticipantEntitiesInfo, RosDiscoveryInfoMgr,
73        ROS_DISCOVERY_INFO_TOPIC_NAME,
74    },
75    route_zenoh_dds::RouteZenohDDS,
76};
77
// Yields the last '/'-separated chunk of the key expression of `$val`
// (`$val` must expose `key_expr().as_str()`).
macro_rules! zenoh_id {
    ($val:expr) => {
        // Splitting a string always yields at least one chunk, hence `unwrap()` cannot fail.
        $val.key_expr().as_str().rsplit('/').next().unwrap()
    };
}
83
lazy_static::lazy_static! {
    // Number of worker threads of the global Tokio runtime declared below.
    // Starts with the default from `config` and is overwritten in `DDSPlugin::start`.
    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
    // Maximum number of blocking threads of the global Tokio runtime declared below.
    // Starts with the default from `config` and is overwritten in `DDSPlugin::start`.
    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
    // The global runtime is used by dynamic plugins, where the current runtime cannot be obtained.
    // It is built on first access and reads the two counters above at that moment, so they
    // must be stored before the first task is spawned on it.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
               .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
               .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
               .enable_all()
               .build()
               .expect("Unable to create runtime");
}
95#[inline(always)]
96pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
97where
98    F: Future + Send + 'static,
99    F::Output: Send + 'static,
100{
101    // Check whether able to get the current runtime
102    match tokio::runtime::Handle::try_current() {
103        Ok(rt) => {
104            // Able to get the current runtime (standalone binary), spawn on the current runtime
105            rt.spawn(task)
106        }
107        Err(_) => {
108            // Unable to get the current runtime (dynamic plugins), spawn on the global runtime
109            TOKIO_RUNTIME.spawn(task)
110        }
111    }
112}
113
lazy_static::lazy_static!(
    // True when the `Z_LOG_PAYLOAD` environment variable can be read (whatever its value).
    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();

    // Constant key expression chunks used to build the plugin's key expressions.
    // SAFETY (applies to every `from_str_unchecked` below): the literals are assumed to be
    // valid key expressions, since no validation is performed here — TODO confirm against
    // zenoh's key expression rules whenever one of them is changed.
    static ref KE_PREFIX_ADMIN_SPACE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") };
    static ref KE_PREFIX_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("dds") };
    static ref KE_PREFIX_ROUTE_TO_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/to_dds") };
    static ref KE_PREFIX_ROUTE_FROM_DDS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("route/from_dds") };
    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_pub_cache") };
    static ref KE_PREFIX_FWD_DISCO: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@dds_fwd_disco") };
    static ref KE_PREFIX_LIVELINESS_GROUP: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("zenoh-plugin-dds") };

    // Wildcard chunks: "*" and "**".
    static ref KE_ANY_1_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("*") };
    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };

    // Set once the ROS 2 deprecation warning has been logged (see `log_ros2_deprecation_warning`).
    static ref LOG_ROS2_DEPRECATION_WARNING_FLAG: AtomicBool = AtomicBool::new(false);
);
130
// CycloneDDS' localhost-only: set the network interface address (a shortened form of the
// configuration would be possible too, but spelling it out completely is clearer).
// This fragment is prepended to the CYCLONEDDS_URI environment variable; empty configuration
// fragments are ignored, so it is safe to unconditionally append a comma.
const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General><Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces></General></Domain></CycloneDDS>,"#;

// CycloneDDS' enable-shm: enable usage of the Iceoryx PSMX plugin.
// Only compiled with the "dds_shm" feature; also ends with a comma as it is prepended
// to the CYCLONEDDS_URI environment variable.
#[cfg(feature = "dds_shm")]
const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><General><Interfaces><PubSubMessageExchange type="iox" library="psmx_iox"/></Interfaces></General></Domain></CycloneDDS>,"#;

// Presumably the period (in milliseconds) at which the ROS discovery info is polled;
// its usage is not in this part of the file — TODO confirm.
const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 500;
141
// With the "dynamic_plugin" feature, declare `DDSPlugin` through zenoh_plugin_trait's
// `declare_plugin!` macro (presumably what makes it loadable as a dynamic library — confirm).
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(DDSPlugin);
144
145fn log_ros2_deprecation_warning() {
146    if !LOG_ROS2_DEPRECATION_WARNING_FLAG.swap(true, std::sync::atomic::Ordering::Relaxed) {
147        tracing::warn!("------------------------------------------------------------------------------------------");
148        tracing::warn!(
149            "ROS 2 system detected. Did you know a new Zenoh bridge dedicated to ROS 2 exists ?"
150        );
151        tracing::warn!("Check it out on https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds");
152        tracing::warn!("This DDS bridge will eventually be deprecated for ROS 2 usage in favor of this new bridge.");
153        tracing::warn!("------------------------------------------------------------------------------------------");
154    }
155}
156
/// The zenoh DDS plugin.
///
/// A unit struct: it carries no state of its own; the routing work is done by the task
/// spawned from `Plugin::start`.
#[allow(clippy::upper_case_acronyms)]
pub struct DDSPlugin;

// Empty impl bodies: only the default items of these traits are used.
impl PluginControl for DDSPlugin {}
impl ZenohPlugin for DDSPlugin {}
162impl Plugin for DDSPlugin {
163    type StartArgs = DynamicRuntime;
164    type Instance = RunningPlugin;
165
166    const DEFAULT_NAME: &'static str = "dds";
167    const PLUGIN_VERSION: &'static str = plugin_version!();
168    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
169
170    fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
171        // Try to initiate login.
172        // Required in case of dynamic lib, otherwise no logs.
173        // But cannot be done twice in case of static link.
174        zenoh::try_init_log_from_env();
175
176        let runtime_conf = runtime.get_config();
177        let plugin_conf = runtime_conf
178            .get_plugin_config(name)
179            .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
180        let config: Config = serde_json::from_value(plugin_conf.clone())
181            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
182        WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
183        MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
184
185        spawn_runtime(run(runtime.clone(), config));
186
187        Ok(Box::new(DDSPlugin))
188    }
189}
190impl RunningPluginTrait for DDSPlugin {}
191
192pub async fn run(runtime: DynamicRuntime, config: Config) {
193    // Try to initiate login.
194    // Required in case of dynamic lib, otherwise no logs.
195    // But cannot be done twice in case of static link.
196    zenoh::try_init_log_from_env();
197    debug!("DDS plugin {}", DDSPlugin::PLUGIN_LONG_VERSION);
198    debug!("DDS plugin {:?}", config);
199
200    // open zenoh-net Session
201    let zsession = match zenoh::session::init(runtime)
202        .aggregated_subscribers(config.generalise_subs.clone())
203        .aggregated_publishers(config.generalise_pubs.clone())
204        .await
205    {
206        Ok(session) => Arc::new(session),
207        Err(e) => {
208            tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
209            return;
210        }
211    };
212
213    let member = match zsession
214        .liveliness()
215        .declare_token(*KE_PREFIX_LIVELINESS_GROUP / &zsession.zid().into_keyexpr())
216        .await
217    {
218        Ok(member) => member,
219        Err(e) => {
220            tracing::error!(
221                "Unable to declare liveliness token for DDS plugin : {:?}",
222                e
223            );
224            return;
225        }
226    };
227
228    // if "localhost_only" is set, configure CycloneDDS to use only localhost interface
229    if config.localhost_only {
230        env::set_var(
231            "CYCLONEDDS_URI",
232            format!(
233                "{}{}",
234                CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
235                env::var("CYCLONEDDS_URI").unwrap_or_default()
236            ),
237        );
238    }
239
240    // if "enable_shm" is set, configure CycloneDDS to use Iceoryx PSMX plugin
241    #[cfg(feature = "dds_shm")]
242    {
243        if config.shm_enabled {
244            env::set_var(
245                "CYCLONEDDS_URI",
246                format!(
247                    "{}{}",
248                    CYCLONEDDS_CONFIG_ENABLE_SHM,
249                    env::var("CYCLONEDDS_URI").unwrap_or_default()
250                ),
251            );
252            if config.forward_discovery {
253                warn!("DDS shared memory support enabled but will not be used as forward discovery mode is active.");
254            }
255        }
256    }
257
258    // create DDS Participant
259    debug!(
260        "Create DDS Participant with CYCLONEDDS_URI='{}'",
261        env::var("CYCLONEDDS_URI").unwrap_or_default()
262    );
263    let dp = unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
264    debug!(
265        "DDS plugin {} using DDS Participant {}",
266        zsession.zid(),
267        get_guid(&dp).unwrap()
268    );
269
270    let mut dds_plugin = DdsPluginRuntime {
271        config,
272        zsession: &zsession,
273        _member: member,
274        dp,
275        discovered_participants: HashMap::<String, DdsParticipant>::new(),
276        discovered_writers: HashMap::<String, DdsEntity>::new(),
277        discovered_readers: HashMap::<String, DdsEntity>::new(),
278        routes_from_dds: HashMap::<OwnedKeyExpr, RouteDDSZenoh>::new(),
279        routes_to_dds: HashMap::<OwnedKeyExpr, RouteZenohDDS>::new(),
280        admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
281    };
282
283    dds_plugin.run().await;
284}
285
// A reference used in the admin space to point to a value stored elsewhere:
// a DDS entity or a route kept in one of the `DdsPluginRuntime` maps, or the
// plugin's config/version. It is resolved to JSON by `DdsPluginRuntime::get_admin_value`.
#[derive(Debug)]
enum AdminRef {
    // key of an entry in `discovered_participants`
    DdsParticipant(String),
    // key of an entry in `discovered_writers`
    DdsWriterEntity(String),
    // key of an entry in `discovered_readers`
    DdsReaderEntity(String),
    // key of an entry in `routes_from_dds`
    FromDdsRoute(OwnedKeyExpr),
    // key of an entry in `routes_to_dds`
    ToDdsRoute(OwnedKeyExpr),
    // the plugin's configuration (the `DdsPluginRuntime` serialized as JSON)
    Config,
    // the plugin's long version string
    Version,
}
297
// State of a running DDS plugin: its configuration, zenoh session, DDS Participant,
// the discovered DDS entities, the established routes and the admin space index.
pub(crate) struct DdsPluginRuntime<'a> {
    config: Config,
    // Note: &'a Arc<Session> here to keep the ownership of Session outside this struct
    // and be able to store the publishers/subscribers it creates in this same struct.
    zsession: &'a Arc<Session>,
    // liveliness token of this plugin; not read by the code in view,
    // presumably only held to keep the token declared — confirm
    _member: LivelinessToken,
    // handle of the DDS Participant created for this plugin
    dp: dds_entity_t,
    // maps of all discovered DDS entities (indexed by DDS key)
    discovered_participants: HashMap<String, DdsParticipant>,
    discovered_writers: HashMap<String, DdsEntity>,
    discovered_readers: HashMap<String, DdsEntity>,
    // maps of established routes from/to DDS (indexed by zenoh key expression)
    routes_from_dds: HashMap<OwnedKeyExpr, RouteDDSZenoh<'a>>,
    routes_to_dds: HashMap<OwnedKeyExpr, RouteZenohDDS<'a>>,
    // admin space: index is the admin_keyexpr (relative to admin_prefix)
    // value is an AdminRef pointing to the data that is converted to JSON
    // when replying to queries.
    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
}
316
317impl Serialize for DdsPluginRuntime<'_> {
318    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
319    where
320        S: Serializer,
321    {
322        // return the plugin's config as a JSON struct
323        let mut s = serializer.serialize_struct("dds", 3)?;
324        s.serialize_field("domain", &self.config.domain)?;
325        s.serialize_field("scope", &self.config.scope)?;
326        s.serialize_field(
327            "allow",
328            &self
329                .config
330                .allow
331                .as_ref()
332                .map_or_else(|| ".*".to_string(), |re| re.to_string()),
333        )?;
334        s.serialize_field(
335            "deny",
336            &self
337                .config
338                .deny
339                .as_ref()
340                .map_or_else(|| "".to_string(), |re| re.to_string()),
341        )?;
342        s.serialize_field(
343            "max-frequencies",
344            &self
345                .config
346                .max_frequencies
347                .iter()
348                .map(|(re, freq)| format!("{re}={freq}"))
349                .collect::<Vec<String>>(),
350        )?;
351        s.serialize_field("forward_discovery", &self.config.forward_discovery)?;
352        s.serialize_field(
353            "reliable_routes_blocking",
354            &self.config.reliable_routes_blocking,
355        )?;
356        s.end()
357    }
358}
359
lazy_static::lazy_static! {
    // A JSON `null` value, created once and shared.
    static ref JSON_NULL_VALUE: Value = serde_json::json!(null);
}
363
364impl<'a> DdsPluginRuntime<'a> {
365    fn is_allowed(&self, ke: &keyexpr) -> bool {
366        if ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
367            log_ros2_deprecation_warning();
368        }
369
370        if self.config.forward_discovery && ke.ends_with(ROS_DISCOVERY_INFO_TOPIC_NAME) {
371            // If fwd-discovery mode is enabled, don't route "ros_discovery_info"
372            return false;
373        }
374        match (&self.config.allow, &self.config.deny) {
375            (Some(allow), None) => allow.is_match(ke),
376            (None, Some(deny)) => !deny.is_match(ke),
377            (Some(allow), Some(deny)) => allow.is_match(ke) && !deny.is_match(ke),
378            (None, None) => true,
379        }
380    }
381
382    // Return the read period if keyexpr matches one of the --dds-periodic-topics option
383    fn get_read_period(&self, ke: &keyexpr) -> Option<Duration> {
384        for (re, freq) in &self.config.max_frequencies {
385            if re.is_match(ke) {
386                return Some(Duration::from_secs_f32(1f32 / freq));
387            }
388        }
389        None
390    }
391
392    fn get_participant_admin_keyexpr(e: &DdsParticipant) -> OwnedKeyExpr {
393        format!("participant/{}", e.key,).try_into().unwrap()
394    }
395
396    fn get_entity_admin_keyexpr(e: &DdsEntity, is_writer: bool) -> OwnedKeyExpr {
397        format!(
398            "participant/{}/{}/{}/{}",
399            e.participant_key,
400            if is_writer { "writer" } else { "reader" },
401            e.key,
402            e.topic_name
403        )
404        .try_into()
405        .unwrap()
406    }
407
408    fn insert_dds_participant(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsParticipant) {
409        // insert reference in admin space
410        self.admin_space
411            .insert(admin_keyexpr, AdminRef::DdsParticipant(e.key.clone()));
412
413        // insert DdsParticipant in discovered_participants map
414        self.discovered_participants.insert(e.key.clone(), e);
415    }
416
417    fn remove_dds_participant(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsParticipant)> {
418        // remove from participants map
419        if let Some(e) = self.discovered_participants.remove(dds_key) {
420            // remove from admin_space
421            let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&e);
422            self.admin_space.remove(&admin_keyexpr);
423            Some((admin_keyexpr, e))
424        } else {
425            None
426        }
427    }
428
429    fn insert_dds_writer(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
430        // insert reference in admin_space
431        self.admin_space
432            .insert(admin_keyexpr, AdminRef::DdsWriterEntity(e.key.clone()));
433
434        // insert DdsEntity in dds_writer map
435        self.discovered_writers.insert(e.key.clone(), e);
436    }
437
438    fn remove_dds_writer(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
439        // remove from dds_writer map
440        if let Some(e) = self.discovered_writers.remove(dds_key) {
441            // remove from admin_space
442            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, true);
443            self.admin_space.remove(&admin_keyexpr);
444            Some((admin_keyexpr, e))
445        } else {
446            None
447        }
448    }
449
450    fn insert_dds_reader(&mut self, admin_keyexpr: OwnedKeyExpr, e: DdsEntity) {
451        // insert reference in admin_space
452        self.admin_space
453            .insert(admin_keyexpr, AdminRef::DdsReaderEntity(e.key.clone()));
454
455        // insert DdsEntity in dds_reader map
456        self.discovered_readers.insert(e.key.clone(), e);
457    }
458
459    fn remove_dds_reader(&mut self, dds_key: &str) -> Option<(OwnedKeyExpr, DdsEntity)> {
460        // remove from dds_reader map
461        if let Some(e) = self.discovered_readers.remove(dds_key) {
462            // remove from admin space
463            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&e, false);
464            self.admin_space.remove(&admin_keyexpr);
465            Some((admin_keyexpr, e))
466        } else {
467            None
468        }
469    }
470
471    fn insert_route_from_dds(&mut self, ke: OwnedKeyExpr, r: RouteDDSZenoh<'a>) {
472        // insert reference in admin_space
473        let admin_ke = *KE_PREFIX_ROUTE_FROM_DDS / &ke;
474        self.admin_space
475            .insert(admin_ke, AdminRef::FromDdsRoute(ke.clone()));
476
477        // insert route in routes_from_dds map
478        self.routes_from_dds.insert(ke, r);
479    }
480
481    fn insert_route_to_dds(&mut self, ke: OwnedKeyExpr, r: RouteZenohDDS<'a>) {
482        // insert reference in admin_space
483        let admin_ke: OwnedKeyExpr = *KE_PREFIX_ROUTE_TO_DDS / &ke;
484        self.admin_space
485            .insert(admin_ke, AdminRef::ToDdsRoute(ke.clone()));
486
487        // insert route in routes_from_dds map
488        self.routes_to_dds.insert(ke, r);
489    }
490
491    #[allow(clippy::too_many_arguments)]
492    async fn try_add_route_from_dds(
493        &mut self,
494        ke: OwnedKeyExpr,
495        topic_name: &str,
496        topic_type: &str,
497        type_info: &Option<TypeInfo>,
498        keyless: bool,
499        reader_qos: Qos,
500        congestion_ctrl: CongestionControl,
501    ) -> RouteStatus {
502        if !self.is_allowed(&ke) {
503            info!(
504                "Ignoring Publication for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
505                ke
506            );
507            return RouteStatus::NotAllowed;
508        }
509
510        if self.routes_from_dds.contains_key(&ke) {
511            // TODO: check if there is no QoS conflict with existing route
512            debug!(
513                "Route from DDS to resource {} already exists -- ignoring",
514                ke
515            );
516            return RouteStatus::Routed(ke);
517        }
518
519        // create route DDS->Zenoh
520        match RouteDDSZenoh::new(
521            self,
522            topic_name.into(),
523            topic_type.into(),
524            type_info,
525            keyless,
526            reader_qos,
527            ke.clone(),
528            congestion_ctrl,
529        )
530        .await
531        {
532            Ok(route) => {
533                info!("{}: created with topic_type={}", route, topic_type);
534                self.insert_route_from_dds(ke.clone(), route);
535                RouteStatus::Routed(ke)
536            }
537            Err(e) => {
538                error!(
539                    "Route DDS->Zenoh ({} -> {}): creation failed: {}",
540                    topic_name, ke, e
541                );
542                RouteStatus::CreationFailure(e)
543            }
544        }
545    }
546
547    #[allow(clippy::too_many_arguments)]
548    async fn try_add_route_to_dds(
549        &mut self,
550        ke: OwnedKeyExpr,
551        topic_name: &str,
552        topic_type: &str,
553        type_info: &Option<TypeInfo>,
554        keyless: bool,
555        is_transient: bool,
556        writer_qos: Option<Qos>,
557    ) -> RouteStatus {
558        if !self.is_allowed(&ke) {
559            info!(
560                "Ignoring Subscription for resource {} as it is not allowed (see your 'allow' or 'deny' configuration)",
561                ke
562            );
563            return RouteStatus::NotAllowed;
564        }
565
566        if let Some(route) = self.routes_to_dds.get(&ke) {
567            // TODO: check if there is no type or QoS conflict with existing route
568            debug!(
569                "Route from resource {} to DDS already exists -- ignoring",
570                ke
571            );
572            // #102: in forwarding mode, it might happen that the route have been created but without DDS Writer
573            //       (just to declare the Zenoh Subscriber). Thus, try to set a DDS Writer to the route here.
574            //       If already set, nothing will happen.
575            if let Some(qos) = writer_qos {
576                if let Err(e) = route.set_dds_writer(self.dp, type_info, qos) {
577                    error!(
578                        "{}: failed to set a DDS Writer after creation: {}",
579                        route, e
580                    );
581                    return RouteStatus::CreationFailure(e);
582                }
583            }
584            return RouteStatus::Routed(ke);
585        }
586
587        // create route Zenoh->DDS
588        match RouteZenohDDS::new(
589            self,
590            ke.clone(),
591            is_transient,
592            topic_name.into(),
593            topic_type.into(),
594            keyless,
595        )
596        .await
597        {
598            Ok(route) => {
599                // if writer_qos is set, add a DDS Writer to the route
600                if let Some(qos) = writer_qos {
601                    if let Err(e) = route.set_dds_writer(self.dp, type_info, qos) {
602                        error!(
603                            "Route Zenoh->DDS ({} -> {}): creation failed: {}",
604                            ke, topic_name, e
605                        );
606                        return RouteStatus::CreationFailure(e);
607                    }
608                }
609
610                info!("{}: created with topic_type={}", route, topic_type);
611                self.insert_route_to_dds(ke.clone(), route);
612                RouteStatus::Routed(ke)
613            }
614            Err(e) => {
615                error!(
616                    "Route Zenoh->DDS ({} -> {}): creation failed: {}",
617                    ke, topic_name, e
618                );
619                RouteStatus::CreationFailure(e)
620            }
621        }
622    }
623
624    fn get_admin_value(&self, admin_ref: &AdminRef) -> Result<Option<Value>, serde_json::Error> {
625        match admin_ref {
626            AdminRef::DdsParticipant(key) => self
627                .discovered_participants
628                .get(key)
629                .map(serde_json::to_value)
630                .map(remove_null_qos_values)
631                .transpose(),
632            AdminRef::DdsReaderEntity(key) => self
633                .discovered_readers
634                .get(key)
635                .map(serde_json::to_value)
636                .map(remove_null_qos_values)
637                .transpose(),
638            AdminRef::DdsWriterEntity(key) => self
639                .discovered_writers
640                .get(key)
641                .map(serde_json::to_value)
642                .map(remove_null_qos_values)
643                .transpose(),
644            AdminRef::FromDdsRoute(zkey) => self
645                .routes_from_dds
646                .get(zkey)
647                .map(serde_json::to_value)
648                .transpose(),
649            AdminRef::ToDdsRoute(zkey) => self
650                .routes_to_dds
651                .get(zkey)
652                .map(serde_json::to_value)
653                .transpose(),
654            AdminRef::Config => Some(serde_json::to_value(self)).transpose(),
655            AdminRef::Version => Ok(Some(DDSPlugin::PLUGIN_LONG_VERSION.into())),
656        }
657    }
658
659    async fn treat_admin_query(&self, query: Query, admin_keyexpr_prefix: &keyexpr) {
660        let selector = query.selector();
661        debug!("Query on admin space: {:?}", selector);
662
663        // get the list of sub-key expressions that will match the same stored keys than
664        // the selector, if those keys had the admin_keyexpr_prefix.
665        let sub_kes = selector.key_expr().strip_prefix(admin_keyexpr_prefix);
666        if sub_kes.is_empty() {
667            error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
668            return;
669        }
670
671        // Get all matching keys/values
672        let mut kvs: Vec<(KeyExpr, Value)> = Vec::with_capacity(sub_kes.len());
673        for sub_ke in sub_kes {
674            if sub_ke.contains('*') {
675                // iterate over all admin space to find matching keys
676                for (ke, admin_ref) in self.admin_space.iter() {
677                    if sub_ke.intersects(ke) {
678                        match self.get_admin_value(admin_ref) {
679                            Ok(Some(v)) => kvs.push((ke.into(), v)),
680                            Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
681                            Err(e) => {
682                                error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
683                            }
684                        }
685                    }
686                }
687            } else {
688                // sub_ke correspond to 1 key - just get it.
689                if let Some(admin_ref) = self.admin_space.get(sub_ke) {
690                    match self.get_admin_value(admin_ref) {
691                        Ok(Some(v)) => kvs.push((sub_ke.into(), v)),
692                        Ok(None) => error!("INTERNAL ERROR: Dangling {:?}", admin_ref),
693                        Err(e) => {
694                            error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
695                        }
696                    }
697                }
698            }
699        }
700
701        // send replies
702        for (ke, v) in kvs.drain(..) {
703            let admin_keyexpr = admin_keyexpr_prefix / &ke;
704            match serde_json::to_vec(&v) {
705                Ok(vec_u8) => {
706                    let payload = ZBytes::from(vec_u8);
707                    if let Err(e) = query
708                        .reply(admin_keyexpr, payload)
709                        .encoding(Encoding::APPLICATION_JSON)
710                        .await
711                    {
712                        warn!("Error replying to admin query {:?}: {}", query, e);
713                    }
714                }
715                Err(e) => warn!("Error transforming JSON to admin query {:?}: {}", query, e),
716            }
717        }
718    }
719
720    async fn run(&mut self) {
721        let group_subscriber = self
722            .zsession
723            .liveliness()
724            .declare_subscriber(*KE_PREFIX_LIVELINESS_GROUP / *KE_ANY_N_SEGMENT)
725            .querying()
726            .with(flume::unbounded())
727            .await
728            .expect("Failed to create Liveliness Subscriber");
729
730        // run DDS discovery
731        let (tx, dds_disco_rcv): (Sender<DiscoveryEvent>, Receiver<DiscoveryEvent>) = unbounded();
732        run_discovery(self.dp, tx);
733
734        // declare admin space queryable
735        let admin_keyexpr_prefix =
736            *KE_PREFIX_ADMIN_SPACE / &self.zsession.zid().into_keyexpr() / *KE_PREFIX_DDS;
737        let admin_keyexpr_expr = (&admin_keyexpr_prefix) / *KE_ANY_N_SEGMENT;
738        debug!("Declare admin space on {}", admin_keyexpr_expr);
739        let admin_queryable = self
740            .zsession
741            .declare_queryable(admin_keyexpr_expr)
742            .await
743            .expect("Failed to create AdminSpace queryable");
744
745        // add plugin's config and version in admin space
746        self.admin_space
747            .insert("config".try_into().unwrap(), AdminRef::Config);
748        self.admin_space
749            .insert("version".try_into().unwrap(), AdminRef::Version);
750
751        if self.config.forward_discovery {
752            self.run_fwd_discovery_mode(
753                &group_subscriber,
754                &dds_disco_rcv,
755                admin_keyexpr_prefix,
756                &admin_queryable,
757            )
758            .await;
759        } else {
760            self.run_local_discovery_mode(
761                &group_subscriber,
762                &dds_disco_rcv,
763                admin_keyexpr_prefix,
764                &admin_queryable,
765            )
766            .await;
767        }
768    }
769
770    fn topic_to_keyexpr(
771        &self,
772        topic_name: &str,
773        scope: &Option<OwnedKeyExpr>,
774        partition: Option<&str>,
775    ) -> ZResult<OwnedKeyExpr> {
776        // key_expr for a topic is: "<scope>/<partition>/<topic_name>" with <scope> and <partition> being optional
777        match (scope, partition) {
778            (Some(scope), Some(part)) => scope.join(&format!("{part}/{topic_name}")),
779            (Some(scope), None) => scope.join(topic_name),
780            (None, Some(part)) => format!("{part}/{topic_name}").try_into(),
781            (None, None) => topic_name.try_into(),
782        }
783    }
784
785    async fn run_local_discovery_mode(
786        &mut self,
787        group_subscriber: &Receiver<Sample>,
788        dds_disco_rcv: &Receiver<DiscoveryEvent>,
789        admin_keyexpr_prefix: OwnedKeyExpr,
790        admin_queryable: &Queryable<FifoChannelHandler<Query>>,
791    ) {
792        debug!(r#"Run in "local discovery" mode"#);
793
794        loop {
795            select!(
796                evt = dds_disco_rcv.recv_async() => {
797                    match evt.unwrap() {
798                        DiscoveryEvent::DiscoveredPublication {
799                            mut entity
800                        } => {
801                            debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
802                            // get its admin_keyexpr
803                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
804
805                            let qos = adapt_writer_qos_for_reader(&entity.qos);
806                            // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS)
807                            let congestion_ctrl = match (self.config.reliable_routes_blocking, is_writer_reliable(&entity.qos.reliability)) {
808                                (true, true) => CongestionControl::Block,
809                                _ => CongestionControl::Drop,
810                            };
811
812                            // create 1 route per partition, or just 1 if no partition
813                            if partition_is_empty(&entity.qos.partition) {
814                                let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
815                                let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
816                                if let RouteStatus::Routed(ref route_key) = route_status {
817                                    if let Some(r) = self.routes_from_dds.get_mut(route_key) {
818                                        // add Writer's key to the route
819                                        r.add_local_routed_writer(entity.key.clone());
820                                    }
821                                }
822                                entity.routes.insert("*".to_string(), route_status);
823                            } else {
824                                for p in entity.qos.partition.as_deref().unwrap() {
825                                    let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
826                                    let mut qos = qos.clone();
827                                    qos.partition = Some(vec![p.to_string()]);
828                                    let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
829                                    if let RouteStatus::Routed(ref route_key) = route_status {
830                                        if let Some(r) = self.routes_from_dds.get_mut(route_key) {
831                                            // if route has been created, add this Writer in its routed_writers list
832                                            r.add_local_routed_writer(entity.key.clone());
833                                        }
834                                    }
835                                    entity.routes.insert(p.clone(), route_status);
836                                }
837                            }
838
839                            // store the writer
840                            self.insert_dds_writer(admin_keyexpr, entity);
841                        }
842
843                        DiscoveryEvent::UndiscoveredPublication {
844                            key,
845                        } => {
846                            if let Some((_, e)) = self.remove_dds_writer(&key) {
847                                debug!("Undiscovered DDS Writer {} on topic {}", key, e.topic_name);
848                                // remove it from all the active routes referring it (deleting the route if no longer used)
849                                let admin_space = &mut self.admin_space;
850                                self.routes_from_dds.retain(|zkey, route| {
851                                        route.remove_local_routed_writer(&key);
852                                        if !route.has_local_routed_writer() {
853                                            info!(
854                                                "{}: remove it as no longer unused (no local DDS Writer left)",
855                                                route
856                                            );
857                                            let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
858                                            admin_space.remove(&ke);
859                                            false
860                                        } else {
861                                            true
862                                        }
863                                    }
864                                );
865                            }
866                        }
867
868                        DiscoveryEvent::DiscoveredSubscription {
869                            mut entity
870                        } => {
871                            debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?}", entity.key, entity.topic_name, entity.type_name, entity.qos);
872                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
873
874                            let qos = adapt_reader_qos_for_writer(&entity.qos);
875
876                            // create 1 route per partition, or just 1 if no partition
877                            if partition_is_empty(&entity.qos.partition) {
878                                let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
879                                let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
880                                if let RouteStatus::Routed(ref route_key) = route_status {
881                                    if let Some(r) = self.routes_to_dds.get_mut(route_key) {
882                                        // if route has been created, add this Reader in its routed_readers list
883                                        r.add_local_routed_reader(entity.key.clone());
884                                    }
885                                }
886                                entity.routes.insert("*".to_string(), route_status);
887                            } else {
888                                for p in entity.qos.partition.as_deref().unwrap() {
889                                    let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
890                                    let mut qos = qos.clone();
891                                    qos.partition = Some(vec![p.to_string()]);
892                                    let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
893                                    if let RouteStatus::Routed(ref route_key) = route_status {
894                                        if let Some(r) = self.routes_to_dds.get_mut(route_key) {
895                                            // if route has been created, add this Reader in its routed_readers list
896                                            r.add_local_routed_reader(entity.key.clone());
897                                        }
898                                    }
899                                    entity.routes.insert(p.clone(), route_status);
900                                }
901                            }
902
903                            // store the reader
904                            self.insert_dds_reader(admin_keyexpr, entity);
905                        }
906
907                        DiscoveryEvent::UndiscoveredSubscription {
908                            key,
909                        } => {
910                            if let Some((_, e)) = self.remove_dds_reader(&key) {
911                                debug!("Undiscovered DDS Reader {} on topic {}", key, e.topic_name);
912                                // remove it from all the active routes referring it (deleting the route if no longer used)
913                                let admin_space = &mut self.admin_space;
914                                self.routes_to_dds.retain(|zkey, route| {
915                                        route.remove_local_routed_reader(&key);
916                                        if !route.has_local_routed_reader() {
917                                            info!(
918                                                "{}: remove it as no longer unused (no local DDS Reader left)",
919                                                route
920                                            );
921                                            let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
922                                            admin_space.remove(&ke);
923                                            false
924                                        } else {
925                                            true
926                                        }
927                                    }
928                                );
929                            }
930                        }
931
932                        DiscoveryEvent::DiscoveredParticipant {
933                            entity,
934                        } => {
935                            debug!("Discovered DDS Participant {}", entity.key);
936                            let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
937
938                            // store the participant
939                            self.insert_dds_participant(admin_keyexpr, entity);
940                        }
941
942                        DiscoveryEvent::UndiscoveredParticipant {
943                            key,
944                        } => {
945                            if let Some((_, _)) = self.remove_dds_participant(&key) {
946                                debug!("Undiscovered DDS Participant {}", key);
947                            }
948                        }
949                    }
950                },
951
952                group_event = group_subscriber.recv_async() => {
953                    match group_event.as_ref().map(|s|s.kind()) {
954                        Ok(SampleKind::Put) => {
955                            let zid = zenoh_id!(group_event.as_ref().unwrap());
956                            debug!("New zenoh_dds_plugin detected: {}", zid);
957                            if let Ok(zenoh_id) = keyexpr::new(zid) {
958                                // make all QueryingSubscriber to query this new member
959                                for (zkey, route) in &mut self.routes_to_dds {
960                                    route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE  / zenoh_id).into(), self.config.queries_timeout).await;
961                                }
962                            } else {
963                                error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
964                            }
965                        }
966                        Ok(_) => {} // ignore other GroupEvents
967                        Err(e) => warn!("Error receiving GroupEvent: {}", e)
968                    }
969                }
970
971                get_request = admin_queryable.recv_async() => {
972                    if let Ok(query) = get_request {
973                        self.treat_admin_query(query, &admin_keyexpr_prefix).await;
974                    } else {
975                        warn!("AdminSpace queryable was closed!");
976                    }
977                }
978            )
979        }
980    }
981
982    async fn run_fwd_discovery_mode(
983        &mut self,
984        group_subscriber: &Receiver<Sample>,
985        dds_disco_rcv: &Receiver<DiscoveryEvent>,
986        admin_keyexpr_prefix: OwnedKeyExpr,
987        admin_queryable: &Queryable<FifoChannelHandler<Query>>,
988    ) {
989        debug!(r#"Run in "forward discovery" mode"#);
990
991        // The data space where all discovery info are forwarded:
992        //   - writers discovery on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/writer/<dds_entity_admin_key>
993        //   - readers discovery on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/reader/<dds_entity_admin_key>
994        //   - ros_discovery_info on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/ros_disco/<gid>
995        // The PublicationCache is declared on <KE_PREFIX_ADMIN_SPACE>/<uuid>/<KE_PREFIX_FWD_DISCO>/[<scope>]/**
996        // The QuerySubscriber is declared on  <KE_PREFIX_ADMIN_SPACE>/*/<KE_PREFIX_FWD_DISCO>/[<scope>]/**
997        let uuid: OwnedKeyExpr = self.zsession.zid().into();
998        let fwd_key_prefix = if let Some(scope) = &self.config.scope {
999            *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO / scope
1000        } else {
1001            *KE_PREFIX_ADMIN_SPACE / &uuid / *KE_PREFIX_FWD_DISCO
1002        };
1003        let fwd_writers_key_prefix =
1004            &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("writer") };
1005        let fwd_readers_key_prefix =
1006            &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("reader") };
1007        let fwd_ros_discovery_key =
1008            &fwd_key_prefix / unsafe { keyexpr::from_str_unchecked("ros_disco") };
1009        let fwd_declare_publication_cache_key = &fwd_key_prefix / *KE_ANY_N_SEGMENT;
1010        let fwd_discovery_subscription_key = if let Some(scope) = &self.config.scope {
1011            *KE_PREFIX_ADMIN_SPACE
1012                / *KE_ANY_1_SEGMENT
1013                / *KE_PREFIX_FWD_DISCO
1014                / scope
1015                / *KE_ANY_N_SEGMENT
1016        } else {
1017            *KE_PREFIX_ADMIN_SPACE / *KE_ANY_1_SEGMENT / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1018        };
1019
1020        // Register prefixes for optimization
1021        let fwd_writers_key_prefix_key = self
1022            .zsession
1023            .declare_keyexpr(fwd_writers_key_prefix)
1024            .await
1025            .expect("Failed to declare key expression for Fwd Discovery of writers");
1026        let fwd_readers_key_prefix_key = self
1027            .zsession
1028            .declare_keyexpr(fwd_readers_key_prefix)
1029            .await
1030            .expect("Failed to declare key expression for Fwd Discovery of readers");
1031        let fwd_ros_discovery_key_declared = self
1032            .zsession
1033            .declare_keyexpr(&fwd_ros_discovery_key)
1034            .await
1035            .expect("Failed to declare key expression for Fwd Discovery of ros_discovery");
1036
1037        // Cache the publications on admin space for late joiners DDS plugins
1038        let _fwd_disco_pub_cache = self
1039            .zsession
1040            .declare_publication_cache(fwd_declare_publication_cache_key)
1041            .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers
1042            .await
1043            .expect("Failed to declare PublicationCache for Fwd Discovery");
1044
1045        // Subscribe to remote DDS plugins publications of new Readers/Writers on admin space
1046        let fwd_disco_sub = self
1047            .zsession
1048            .declare_subscriber(fwd_discovery_subscription_key)
1049            .querying()
1050            .allowed_origin(Locality::Remote) // Note: ignore my own publications
1051            .query_timeout(self.config.queries_timeout)
1052            .await
1053            .expect("Failed to declare QueryingSubscriber for Fwd Discovery");
1054
1055        // Manage ros_discovery_info topic, reading it periodically
1056        let ros_disco_mgr =
1057            RosDiscoveryInfoMgr::create(self.dp).expect("Failed to create RosDiscoveryInfoMgr");
1058        let timer = Timer::default();
1059        let (tx, ros_disco_timer_rcv): (Sender<()>, Receiver<()>) = unbounded();
1060        let ros_disco_timer_event = TimedEvent::periodic(
1061            Duration::from_millis(ROS_DISCOVERY_INFO_POLL_INTERVAL_MS),
1062            ChannelEvent { tx },
1063        );
1064        timer.add_async(ros_disco_timer_event).await;
1065
1066        // The ParticipantEntitiesInfo to be re-published on ros_discovery_info (with this bridge's participant gid)
1067        let mut participant_info = ParticipantEntitiesInfo::new(
1068            get_guid(&self.dp).expect("Failed to get my Participant's guid"),
1069        );
1070
1071        let scope = self.config.scope.clone();
1072        loop {
1073            select!(
1074                evt = dds_disco_rcv.recv_async() => {
1075                    match evt.unwrap() {
1076                        DiscoveryEvent::DiscoveredPublication {
1077                            entity
1078                        } => {
1079                            debug!("Discovered DDS Writer {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1080                            // advertise the entity and its scope within admin space (bincode format)
1081                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, true);
1082                            let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1083                            let msg = (&entity, &scope);
1084                            let ser_msg = match bincode::serialize(&msg) {
1085                                Ok(s) => s,
1086                                Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1087                            };
1088                            if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1089                                error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1090                            }
1091
1092                            // store the writer in admin space
1093                            self.insert_dds_writer(admin_keyexpr, entity);
1094                        }
1095
1096                        DiscoveryEvent::UndiscoveredPublication {
1097                            key,
1098                        } => {
1099                            debug!("Undiscovered DDS Writer {} => advertise it", key);
1100                            if let Some((admin_keyexpr, _)) = self.remove_dds_writer(&key) {
1101                                let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
1102                                // publish its deletion from admin space
1103                                if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1104                                    error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1105                                }
1106                            }
1107                        }
1108
1109                        DiscoveryEvent::DiscoveredSubscription {
1110                            mut entity
1111                        } => {
1112                            debug!("Discovered DDS Reader {} on {} with type '{}' and QoS: {:?} => advertise it", entity.key, entity.topic_name, entity.type_name, entity.qos);
1113
1114                            // #102: create a local "to_dds" route, but only with the Zenoh Subscriber (not the DDS Writer)
1115                            // create 1 route per partition, or just 1 if no partition
1116                            if partition_is_empty(&entity.qos.partition) {
1117                                let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, None).unwrap();
1118                                let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&entity.qos), None).await;
1119                                if let RouteStatus::Routed(ref route_key) = route_status {
1120                                    if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1121                                        // if route has been created, add this Reader in its routed_readers list
1122                                        r.add_local_routed_reader(entity.key.clone());
1123                                    }
1124                                }
1125                                entity.routes.insert("*".to_string(), route_status);
1126                            } else {
1127                                for p in entity.qos.partition.as_deref().unwrap() {
1128                                    let ke = self.topic_to_keyexpr(&entity.topic_name, &self.config.scope, Some(p)).unwrap();
1129                                    let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&entity.qos), None).await;
1130                                    if let RouteStatus::Routed(ref route_key) = route_status {
1131                                        if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1132                                            // if route has been created, add this Reader in its routed_readers list
1133                                            r.add_local_routed_reader(entity.key.clone());
1134                                        }
1135                                    }
1136                                    entity.routes.insert(p.clone(), route_status);
1137                                }
1138                            }
1139
1140                            // advertise the entity and its scope within admin space (bincode format)
1141                            let admin_keyexpr = DdsPluginRuntime::get_entity_admin_keyexpr(&entity, false);
1142                            let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1143                            let msg = (&entity, &scope);
1144                            let ser_msg = match bincode::serialize(&msg) {
1145                                Ok(s) => s,
1146                                Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
1147                            };
1148                            if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).await {
1149                                error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
1150                            }
1151
1152                            // store the reader
1153                            self.insert_dds_reader(admin_keyexpr, entity);
1154                        }
1155
1156                        DiscoveryEvent::UndiscoveredSubscription {
1157                            key,
1158                        } => {
1159                            debug!("Undiscovered DDS Reader {} => advertise it", key);
1160                            if let Some((admin_keyexpr, _)) = self.remove_dds_reader(&key) {
1161                                let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
1162                                // publish its deletion from admin space
1163                                if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).await {
1164                                    error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
1165                                }
1166                            }
1167                            // #102: also remove the Reader from all the active routes referring it,
1168                            // deleting the route if it has no longer local Reader nor remote Writer.
1169                            let admin_space = &mut self.admin_space;
1170                            self.routes_to_dds.retain(|zkey, route| {
1171                                    route.remove_local_routed_reader(&key);
1172                                    if !route.has_local_routed_reader() && !route.has_remote_routed_writer(){
1173                                        info!(
1174                                            "{}: remove it as no longer unused (no local DDS Reader nor remote DDS Writer left)",
1175                                            route
1176                                        );
1177                                        let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1178                                        admin_space.remove(&ke);
1179                                        false
1180                                    } else {
1181                                        true
1182                                    }
1183                                }
1184                            );
1185                        }
1186
1187                        DiscoveryEvent::DiscoveredParticipant {
1188                            entity,
1189                        } => {
1190                            debug!("Discovered DDS Participant {}", entity.key);
1191                            let admin_keyexpr = DdsPluginRuntime::get_participant_admin_keyexpr(&entity);
1192
1193                            // store the participant
1194                            self.insert_dds_participant(admin_keyexpr, entity);
1195                        }
1196
1197                        DiscoveryEvent::UndiscoveredParticipant {
1198                            key,
1199                        } => {
1200                            if let Some((_, _)) = self.remove_dds_participant(&key) {
1201                                debug!("Undiscovered DDS Participant {}", key);
1202                            }
1203                        }
1204                    }
1205                },
1206
1207                sample = fwd_disco_sub.recv_async() => {
1208                    let sample = sample.expect("Fwd Discovery subscriber was closed!");
1209                    let fwd_ke = &sample.key_expr();
1210                    debug!("Received forwarded discovery message on {}", fwd_ke);
1211
1212                    // parse fwd_ke and extract the remote uuid, the discovery kind (reader|writer|ros_disco) and the remaining of the keyexpr
1213                    if let Some((remote_uuid, disco_kind, remaining_ke)) = Self::parse_fwd_discovery_keyexpr(fwd_ke) {
1214                        match disco_kind {
1215                            // it's a writer discovery message
1216                            "writer" => {
1217                                // reconstruct full admin keyexpr for this entity (i.e. with it's remote plugin's uuid)
1218                                let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1219                                if sample.kind() != SampleKind::Delete {
1220                                    // deserialize payload
1221                                    let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1222                                        Ok(x) => x,
1223                                        Err(e) => {
1224                                            warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1225                                            continue;
1226                                        }
1227                                    };
1228                                    let qos = adapt_writer_qos_for_proxy_writer(&entity.qos);
1229
1230                                    // create 1 "to_dds" route per partition, or just 1 if no partition
1231                                    if partition_is_empty(&entity.qos.partition) {
1232                                        let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1233                                        let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
1234                                        if let RouteStatus::Routed(ref route_key) = route_status {
1235                                            if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1236                                                // add the writer's admin keyexpr to the list of remote_routed_writers
1237                                                r.add_remote_routed_writer(full_admin_keyexpr);
1238                                                // check amongst local Readers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1239                                                for reader in self.discovered_readers.values_mut() {
1240                                                    if reader.topic_name == entity.topic_name && partition_is_empty(&reader.qos.partition) {
1241                                                        r.add_local_routed_reader(reader.key.clone());
1242                                                        reader.routes.insert("*".to_string(), route_status.clone());
1243                                                    }
1244                                                }
1245                                            }
1246                                        }
1247                                    } else {
1248                                        for p in entity.qos.partition.as_deref().unwrap() {
1249                                            let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1250                                            let mut qos = qos.clone();
1251                                            qos.partition = Some(vec![p.to_string()]);
1252                                            let route_status = self.try_add_route_to_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, is_transient_local(&qos), Some(qos)).await;
1253                                            if let RouteStatus::Routed(ref route_key) = route_status {
1254                                                if let Some(r) = self.routes_to_dds.get_mut(route_key) {
1255                                                    // add the writer's admin keyexpr to the list of remote_routed_writers
1256                                                    r.add_remote_routed_writer(full_admin_keyexpr.clone());
1257                                                    // check amongst local Readers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1258                                                    for reader in self.discovered_readers.values_mut() {
1259                                                        if reader.topic_name == entity.topic_name && partition_contains(&reader.qos.partition, p) {
1260                                                            r.add_local_routed_reader(reader.key.clone());
1261                                                            reader.routes.insert(p.clone(), route_status.clone());
1262                                                        }
1263                                                    }
1264                                                }
1265                                            }
1266                                        }
1267                                    }
1268                                } else {
1269                                    // writer was deleted; remove it from all the active routes referring it (deleting the route if no longer used)
1270                                    let admin_space = &mut self.admin_space;
1271                                    self.routes_to_dds.retain(|zkey, route| {
1272                                            route.remove_remote_routed_writer(&full_admin_keyexpr);
1273                                            if route.has_remote_routed_writer() {
1274                                                // if there are still remote writers for this route, keep it
1275                                                true
1276                                            } else {
1277                                                // #102: Delete the DDS Writer of this route if there are no more remote Writers,
1278                                                // but don't delete the route itself if there is still a local Reader.
1279                                                route.delete_dds_writer();
1280                                                if !route.has_local_routed_reader() {
1281                                                    info!(
1282                                                        "{}: remove it as no longer unused (no remote DDS Writer nor local DDS Reader left)",
1283                                                        route
1284                                                    );
1285                                                    let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1286                                                    admin_space.remove(&ke);
1287                                                    false
1288                                                } else {
1289                                                    true
1290                                                }
1291                                            }
1292                                        }
1293                                    );
1294                                }
1295                            }
1296
1297                            // it's a reader discovery message
1298                            "reader" => {
1299                                // reconstruct full admin keyexpr for this entity (i.e. with it's remote plugin's uuid)
1300                                let full_admin_keyexpr = *KE_PREFIX_ADMIN_SPACE / remote_uuid / *KE_PREFIX_DDS / remaining_ke;
1301                                if sample.kind() != SampleKind::Delete {
1302                                    // deserialize payload
1303                                    let (entity, scope) = match bincode::deserialize::<(DdsEntity, Option<OwnedKeyExpr>)>(&sample.payload().to_bytes()) {
1304                                        Ok(x) => x,
1305                                        Err(e) => {
1306                                            warn!("Failed to deserialize discovery msg for {}: {}", full_admin_keyexpr, e);
1307                                            continue;
1308                                        }
1309                                    };
1310                                    let qos = adapt_reader_qos_for_proxy_reader(&entity.qos);
1311
1312                                    // CongestionControl to be used when re-publishing over zenoh: Blocking if Reader is RELIABLE (since Writer will also be, otherwise no matching)
1313                                    let congestion_ctrl = match (self.config.reliable_routes_blocking, is_reader_reliable(&entity.qos.reliability)) {
1314                                        (true, true) => CongestionControl::Block,
1315                                        _ => CongestionControl::Drop,
1316                                    };
1317
1318                                    // create 1 'from_dds" route per partition, or just 1 if no partition
1319                                    if partition_is_empty(&entity.qos.partition) {
1320                                        let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, None).unwrap();
1321                                        let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
1322                                        if let RouteStatus::Routed(ref route_key) = route_status {
1323                                            if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1324                                                // add the reader's admin keyexpr to the list of remote_routed_writers
1325                                                r.add_remote_routed_reader(full_admin_keyexpr);
1326                                                // check amongst local Writers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1327                                                for writer in self.discovered_writers.values_mut() {
1328                                                    if writer.topic_name == entity.topic_name && partition_is_empty(&writer.qos.partition) {
1329                                                        r.add_local_routed_writer(writer.key.clone());
1330                                                        writer.routes.insert("*".to_string(), route_status.clone());
1331                                                    }
1332                                                }
1333                                            }
1334                                        }
1335                                    } else {
1336                                        for p in &entity.qos.partition.unwrap() {
1337                                            let ke = self.topic_to_keyexpr(&entity.topic_name, &scope, Some(p)).unwrap();
1338                                            let mut qos = qos.clone();
1339                                            qos.partition = Some(vec![p.to_string()]);
1340                                            let route_status = self.try_add_route_from_dds(ke, &entity.topic_name, &entity.type_name, &entity.type_info, entity.keyless, qos, congestion_ctrl).await;
1341                                            if let RouteStatus::Routed(ref route_key) = route_status {
1342                                                if let Some(r) = self.routes_from_dds.get_mut(route_key) {
1343                                                    // add the reader's admin keyexpr to the list of remote_routed_writers
1344                                                    r.add_remote_routed_reader(full_admin_keyexpr.clone());
1345                                                    // check amongst local Writers is some are matching (only wrt. topic_name and partition. TODO: consider qos match also)
1346                                                    for writer in self.discovered_writers.values_mut() {
1347                                                        if writer.topic_name == entity.topic_name && partition_contains(&writer.qos.partition, p) {
1348                                                            r.add_local_routed_writer(writer.key.clone());
1349                                                            writer.routes.insert(p.clone(), route_status.clone());
1350                                                        }
1351                                                    }
1352                                                }
1353                                            }
1354                                        }
1355                                    }
1356                                } else {
1357                                    // reader was deleted; remove it from all the active routes referring it (deleting the route if no longer used)
1358                                    let admin_space = &mut self.admin_space;
1359                                    self.routes_from_dds.retain(|zkey, route| {
1360                                            route.remove_remote_routed_reader(&full_admin_keyexpr);
1361                                            if !route.has_remote_routed_reader() {
1362                                                info!(
1363                                                    "{}: remove it as no longer unused (no remote DDS Reader left)",
1364                                                    route
1365                                                );
1366                                                let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1367                                                admin_space.remove(&ke);
1368                                                false
1369                                            } else {
1370                                                true
1371                                            }
1372                                        }
1373                                    );
1374                                }
1375                            }
1376
1377                            // it's a ros_discovery_info message
1378                            "ros_disco" => {
1379                                match cdr::deserialize_from::<_, ParticipantEntitiesInfo, _>(
1380                                    sample.payload().reader(),
1381                                    cdr::size::Infinite,
1382                                ) {
1383                                    Ok(mut info) => {
1384                                        // remap all original gids with the gids of the routes
1385                                        self.remap_entities_info(&mut info.node_entities_info_seq);
1386                                        // update the ParticipantEntitiesInfo for this bridge and re-publish it on DDS
1387                                        participant_info.update_with(info.node_entities_info_seq);
1388                                        debug!("Publish updated ros_discovery_info: {:?}", participant_info);
1389                                        if let Err(e) = ros_disco_mgr.write(&participant_info) {
1390                                            error!("Error forwarding ros_discovery_info: {}", e);
1391                                        }
1392                                    }
1393                                    Err(e) => error!(
1394                                        "Error receiving ParticipantEntitiesInfo on {}: {}",
1395                                        fwd_ke, e
1396                                    ),
1397                                }
1398                            }
1399
1400                            x => {
1401                                error!("Unexpected forwarded discovery message received on invalid key {} (unknown kind: {}) ", fwd_ke, x);
1402                            }
1403                        }
1404                    }
1405                },
1406
1407                group_event = group_subscriber.recv_async() => {
1408                    match group_event.as_ref().map(|s|s.kind()) {
1409                        Ok(SampleKind::Put) => {
1410                            let zid = zenoh_id!(group_event.as_ref().unwrap());
1411                            debug!("New zenoh_dds_plugin detected: {}", zid);
1412
1413                            if let Ok(zenoh_id) = keyexpr::new(zid) {
1414                                // query for past publications of discocvery messages from this new member
1415                                let key = if let Some(scope) = &self.config.scope {
1416                                    *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / scope / *KE_ANY_N_SEGMENT
1417                                } else {
1418                                    *KE_PREFIX_ADMIN_SPACE / zenoh_id / *KE_PREFIX_FWD_DISCO / *KE_ANY_N_SEGMENT
1419                                };
1420                                debug!("Query past discovery messages from {} on {}", zid, key);
1421                                if let Err(e) = fwd_disco_sub.fetch( |cb| {
1422                                    self.zsession.get(Selector::from(&key))
1423                                        .callback(cb)
1424                                        .target(QueryTarget::All)
1425                                        .consolidation(ConsolidationMode::None)
1426                                        .timeout(self.config.queries_timeout)
1427                                        .wait()
1428                                }).await
1429                                {
1430                                    warn!("Query on {} for discovery messages failed: {}", key, e);
1431                                }
1432                                // make all QueryingSubscriber to query this new member
1433                                for (zkey, route) in &mut self.routes_to_dds {
1434                                    route.query_historical_publications(|| (zkey / *KE_PREFIX_PUB_CACHE / zenoh_id).into(), self.config.queries_timeout).await;
1435                                }
1436                            } else {
1437                                error!("Can't convert zenoh id '{}' into a KeyExpr", zid);
1438                            }
1439                        }
1440                        Ok(SampleKind::Delete) => {
1441                            let zid = zenoh_id!(group_event.as_ref().unwrap());
1442                            debug!("Remote zenoh_dds_plugin left: {}", zid);
1443                            // remove all the references to the plugin's entities, removing no longer used routes
1444                            // and updating/re-publishing ParticipantEntitiesInfo
1445                            let admin_space = &mut self.admin_space;
1446                            let admin_subke = format!("@/{zid}/dds/");
1447                            let mut participant_info_changed = false;
1448                            self.routes_to_dds.retain(|zkey, route| {
1449                                route.remove_remote_routed_writers_containing(&admin_subke);
1450                                if !route.has_remote_routed_writer() {
1451                                    info!(
1452                                        "{}: remove it as no longer unused (no remote DDS Writer left)",
1453                                        route
1454                                    );
1455                                    let ke = *KE_PREFIX_ROUTE_TO_DDS / zkey;
1456                                    admin_space.remove(&ke);
1457                                    if let Ok(guid) = route.dds_writer_guid() {
1458                                        participant_info.remove_writer_gid(&guid);
1459                                        participant_info_changed = true;
1460                                    } else {
1461                                        warn!("Failed to get guid for Writer serving the route zenoh '{}' => DDS '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1462                                    }
1463                                    false
1464                                } else {
1465                                    true
1466                                }
1467                            });
1468                            self.routes_from_dds.retain(|zkey, route| {
1469                                route.remove_remote_routed_readers_containing(&admin_subke);
1470                                if !route.has_remote_routed_reader() {
1471                                    info!(
1472                                        "{}: remove it as no longer unused (no remote DDS Reader left)",
1473                                        route
1474                                    );
1475                                    let ke = *KE_PREFIX_ROUTE_FROM_DDS / zkey;
1476                                    admin_space.remove(&ke);
1477                                    if let Ok(guid) = route.dds_reader_guid() {
1478                                        participant_info.remove_reader_gid(&guid);
1479                                        participant_info_changed = true;
1480                                    } else {
1481                                        warn!("Failed to get guid for Reader serving the route DDS '{}' => zenoh '{}'. Can't update ros_discovery_info accordingly", zkey, zkey);
1482                                    }
1483                                    false
1484                                } else {
1485                                    true
1486                                }
1487                            });
1488                            if participant_info_changed {
1489                                debug!("Publishing up-to-date ros_discovery_info after leaving of plugin {}", zid);
1490                                participant_info.cleanup();
1491                                if let Err(e) = ros_disco_mgr.write(&participant_info) {
1492                                    error!("Error forwarding ros_discovery_info: {}", e);
1493                                }
1494                            }
1495                        }
1496                        Err(e) => warn!("Error receiving GroupEvent: {}", e)
1497                    }
1498                }
1499
1500                get_request = admin_queryable.recv_async() => {
1501                    if let Ok(query) = get_request {
1502                        self.treat_admin_query(query, &admin_keyexpr_prefix).await;
1503                    } else {
1504                        warn!("AdminSpace queryable was closed!");
1505                    }
1506                }
1507
1508                _ = ros_disco_timer_rcv.recv_async() => {
1509                    let infos = ros_disco_mgr.read();
1510                    for (gid, buf) in infos {
1511                        trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, buf.hex_encode());
1512                        // forward the payload on zenoh
1513                        let ke = &fwd_ros_discovery_key_declared / unsafe { keyexpr::from_str_unchecked(&gid) };
1514                        if let Err(e) = self.zsession.put(ke, buf).wait() {
1515                            error!("Forward ROS discovery info failed: {}", e);
1516                        }
1517                    }
1518                }
1519            )
1520        }
1521    }
1522
1523    fn parse_fwd_discovery_keyexpr(fwd_ke: &keyexpr) -> Option<(&keyexpr, &str, &keyexpr)> {
1524        // parse fwd_ke which have format: "KE_PREFIX_ADMIN_SPACE/<uuid>/KE_PREFIX_FWD_DISCO[/scope/possibly/multiple]/<disco_kind>/<remaining_ke...>"
1525        if !fwd_ke.starts_with(KE_PREFIX_ADMIN_SPACE.as_str()) {
1526            // publication on a key expression matching the fwd_ke: ignore it
1527            return None;
1528        }
1529        let mut remaining = &fwd_ke[KE_PREFIX_ADMIN_SPACE.len() + 1..];
1530        let uuid = if let Some(i) = remaining.find('/') {
1531            let uuid = unsafe { keyexpr::from_str_unchecked(&remaining[..i]) };
1532            remaining = &remaining[i + 1..];
1533            uuid
1534        } else {
1535            error!(
1536                "Unexpected forwarded discovery message received on invalid key: {}",
1537                fwd_ke
1538            );
1539            return None;
1540        };
1541        if !remaining.starts_with(KE_PREFIX_FWD_DISCO.as_str()) {
1542            // publication on a key expression matching the fwd_ke: ignore it
1543            return None;
1544        }
1545        let kind = if let Some(i) = remaining.find("/reader/") {
1546            remaining = &remaining[i + 8..];
1547            "reader"
1548        } else if let Some(i) = remaining.find("/writer/") {
1549            remaining = &remaining[i + 8..];
1550            "writer"
1551        } else if let Some(i) = remaining.find("/ros_disco/") {
1552            remaining = &remaining[i + 11..];
1553            "ros_disco"
1554        } else {
1555            error!("Unexpected forwarded discovery message received on invalid key: {} (no expected kind '/reader/', '/writer/' or '/ros_disco/')", fwd_ke);
1556            return None;
1557        };
1558        Some((uuid, kind, unsafe {
1559            keyexpr::from_str_unchecked(remaining)
1560        }))
1561    }
1562
1563    fn remap_entities_info(&self, entities_info: &mut HashMap<String, NodeEntitiesInfo>) {
1564        for node in entities_info.values_mut() {
1565            // TODO: replace with drain_filter when stable (https://github.com/rust-lang/rust/issues/43244)
1566            let mut i = 0;
1567            while i < node.reader_gid_seq.len() {
1568                // find a RouteDDSZenoh routing a remote reader with this gid
1569                match self
1570                    .routes_from_dds
1571                    .values()
1572                    .find(|route| route.is_routing_remote_reader(&node.reader_gid_seq[i]))
1573                {
1574                    Some(route) => {
1575                        // replace the gid with route's reader's gid
1576                        if let Ok(gid) = route.dds_reader_guid() {
1577                            trace!(
1578                                "ros_discovery_info remap reader {} -> {}",
1579                                node.reader_gid_seq[i],
1580                                gid
1581                            );
1582                            node.reader_gid_seq[i] = gid;
1583                            i += 1;
1584                        } else {
1585                            error!("Failed to get guid for Reader serving the a route. Can't remap in ros_discovery_info");
1586                        }
1587                    }
1588                    None => {
1589                        // remove the gid (not route found because either not allowed to be routed,
1590                        // either route already initiated by another reader)
1591                        trace!(
1592                            "ros_discovery_info remap reader {} -> NONE",
1593                            node.reader_gid_seq[i]
1594                        );
1595                        node.reader_gid_seq.remove(i);
1596                    }
1597                }
1598            }
1599            let mut i = 0;
1600            while i < node.writer_gid_seq.len() {
1601                // find a ToDdsRoute initiated by the writer with this gid
1602                match self
1603                    .routes_to_dds
1604                    .values()
1605                    .find(|route| route.is_routing_remote_writer(&node.writer_gid_seq[i]))
1606                {
1607                    Some(route) => {
1608                        // replace the gid with route's writer's gid
1609                        if let Ok(gid) = route.dds_writer_guid() {
1610                            trace!(
1611                                "ros_discovery_info remap writer {} -> {}",
1612                                node.writer_gid_seq[i],
1613                                gid
1614                            );
1615                            node.writer_gid_seq[i] = gid;
1616                            i += 1;
1617                        } else {
1618                            error!("Failed to get guid for Writer serving the a route. Can't remap in ros_discovery_info");
1619                        }
1620                    }
1621                    None => {
1622                        // remove the gid (not route found because either not allowed to be routed,
1623                        // either route already initiated by another writer)
1624                        trace!(
1625                            "ros_discovery_info remap writer {} -> NONE",
1626                            node.writer_gid_seq[i]
1627                        );
1628                        node.writer_gid_seq.remove(i);
1629                    }
1630                }
1631            }
1632        }
1633    }
1634}
1635
1636// Remove any null QoS values from a serde_json::Value
1637fn remove_null_qos_values(
1638    value: Result<Value, serde_json::Error>,
1639) -> Result<Value, serde_json::Error> {
1640    match value {
1641        Ok(value) => match value {
1642            Value::Object(mut obj) => {
1643                let qos = obj.get_mut("qos");
1644                if let Some(qos) = qos {
1645                    if qos.is_object() {
1646                        qos.as_object_mut().unwrap().retain(|_, v| !v.is_null());
1647                    }
1648                }
1649                Ok(Value::Object(obj))
1650            }
1651            _ => Ok(value),
1652        },
1653        Err(error) => Err(error),
1654    }
1655}
1656
1657// Copy and adapt Writer's QoS for creation of a matching Reader
1658fn adapt_writer_qos_for_reader(qos: &Qos) -> Qos {
1659    let mut reader_qos = qos.clone();
1660
1661    // Unset any writer QoS that doesn't apply to data readers
1662    reader_qos.durability_service = None;
1663    reader_qos.ownership_strength = None;
1664    reader_qos.transport_priority = None;
1665    reader_qos.lifespan = None;
1666    reader_qos.writer_data_lifecycle = None;
1667    reader_qos.writer_batching = None;
1668
1669    // Unset proprietary QoS which shouldn't apply
1670    reader_qos.properties = None;
1671    reader_qos.entity_name = None;
1672    reader_qos.ignore_local = None;
1673
1674    // Set default Reliability QoS if not set for writer
1675    if reader_qos.reliability.is_none() {
1676        reader_qos.reliability = Some({
1677            Reliability {
1678                kind: ReliabilityKind::BEST_EFFORT,
1679                max_blocking_time: DDS_100MS_DURATION,
1680            }
1681        });
1682    }
1683
1684    reader_qos
1685}
1686
1687// Copy and adapt Writer's QoS for creation of a proxy Writer
1688fn adapt_writer_qos_for_proxy_writer(qos: &Qos) -> Qos {
1689    let mut writer_qos = qos.clone();
1690
1691    // Unset proprietary QoS which shouldn't apply
1692    writer_qos.properties = None;
1693    writer_qos.entity_name = None;
1694
1695    // Don't match with readers with the same participant
1696    writer_qos.ignore_local = Some(IgnoreLocal {
1697        kind: IgnoreLocalKind::PARTICIPANT,
1698    });
1699
1700    writer_qos
1701}
1702
1703// Copy and adapt Reader's QoS for creation of a matching Writer
1704fn adapt_reader_qos_for_writer(qos: &Qos) -> Qos {
1705    let mut writer_qos = qos.clone();
1706
1707    // Unset any reader QoS that doesn't apply to data writers
1708    writer_qos.time_based_filter = None;
1709    writer_qos.reader_data_lifecycle = None;
1710    writer_qos.properties = None;
1711    writer_qos.entity_name = None;
1712
1713    // Don't match with readers with the same participant
1714    writer_qos.ignore_local = Some(IgnoreLocal {
1715        kind: IgnoreLocalKind::PARTICIPANT,
1716    });
1717
1718    // if Reader is TRANSIENT_LOCAL, configure durability_service QoS with same history as the Reader.
1719    // This is because CycloneDDS is actually using durability_service.history for transient_local historical data.
1720    if is_transient_local(qos) {
1721        let history = qos
1722            .history
1723            .as_ref()
1724            .map_or(History::default(), |history| history.clone());
1725
1726        writer_qos.durability_service = Some(DurabilityService {
1727            service_cleanup_delay: 60 * DDS_1S_DURATION,
1728            history_kind: history.kind,
1729            history_depth: history.depth,
1730            max_samples: DDS_LENGTH_UNLIMITED,
1731            max_instances: DDS_LENGTH_UNLIMITED,
1732            max_samples_per_instance: DDS_LENGTH_UNLIMITED,
1733        });
1734    }
1735    // Workaround for the DDS Writer to correctly match with a FastRTPS Reader
1736    writer_qos.reliability = match writer_qos.reliability {
1737        Some(mut reliability) => {
1738            reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1739            Some(reliability)
1740        }
1741        _ => {
1742            let mut reliability = Reliability {
1743                kind: ReliabilityKind::RELIABLE,
1744                max_blocking_time: DDS_100MS_DURATION,
1745            };
1746            reliability.max_blocking_time = reliability.max_blocking_time.saturating_add(1);
1747            Some(reliability)
1748        }
1749    };
1750
1751    writer_qos
1752}
1753
1754// Copy and adapt Reader's QoS for creation of a proxy Reader
1755fn adapt_reader_qos_for_proxy_reader(qos: &Qos) -> Qos {
1756    let mut reader_qos = qos.clone();
1757
1758    // Unset proprietary QoS which shouldn't apply
1759    reader_qos.properties = None;
1760    reader_qos.entity_name = None;
1761    reader_qos.ignore_local = None;
1762
1763    reader_qos
1764}
1765
//TODO replace when stable https://github.com/rust-lang/rust/issues/65816
// Decompose a Vec into its raw components `(pointer, length, capacity)` without dropping it.
// The Vec is wrapped in a ManuallyDrop, so its buffer is not freed by this function.
#[inline]
pub(crate) fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
    let mut undropped = ManuallyDrop::new(v);
    let ptr = undropped.as_mut_ptr();
    let len = undropped.len();
    let capacity = undropped.capacity();
    (ptr, len, capacity)
}
1772
1773struct ChannelEvent {
1774    tx: Sender<()>,
1775}
1776
1777#[async_trait]
1778impl Timed for ChannelEvent {
1779    async fn run(&mut self) {
1780        if self.tx.send(()).is_err() {
1781            warn!("Error sending periodic timer notification on channel");
1782        };
1783    }
1784}