zenoh_plugin_ros2dds/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14#![allow(deprecated)]
15
16use std::{
17    collections::HashMap,
18    env,
19    future::Future,
20    mem::ManuallyDrop,
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26
27use async_trait::async_trait;
28use cyclors::*;
29use events::ROS2AnnouncementEvent;
30use flume::{unbounded, Receiver, Sender};
31use futures::select;
32use serde::Serializer;
33use tokio::task::JoinHandle;
34use zenoh::{
35    bytes::{Encoding, ZBytes},
36    internal::{
37        plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
38        runtime::Runtime,
39        zerror, Timed,
40    },
41    key_expr::{
42        format::{kedefine, keformat},
43        keyexpr, OwnedKeyExpr,
44    },
45    liveliness::LivelinessToken,
46    query::Query,
47    sample::SampleKind,
48    Result as ZResult, Session,
49};
50use zenoh_ext::SubscriberBuilderExt;
51use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
52
53pub mod config;
54mod dds_discovery;
55mod dds_types;
56mod dds_utils;
57mod discovered_entities;
58mod discovery_mgr;
59mod events;
60mod gid;
61mod liveliness_mgt;
62mod node_info;
63mod qos_helpers;
64mod ros2_utils;
65mod ros_discovery;
66mod route_action_cli;
67mod route_action_srv;
68mod route_publisher;
69mod route_service_cli;
70mod route_service_srv;
71mod route_subscriber;
72mod routes_mgr;
73use config::{Config, RosAutomaticDiscoveryRange};
74
75use crate::{
76    dds_utils::get_guid,
77    discovery_mgr::DiscoveryMgr,
78    events::ROS2DiscoveryEvent,
79    liveliness_mgt::*,
80    ros2_utils::{key_expr_to_ros2_name, ros_distro_is_less_than},
81    ros_discovery::RosDiscoveryInfoMgr,
82    routes_mgr::RoutesMgr,
83};
84
85lazy_static::lazy_static! {
86    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
87    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
88    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
89    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
90               .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
91               .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
92               .enable_all()
93               .build()
94               .expect("Unable to create runtime");
95}
96#[inline(always)]
97pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
98where
99    F: Future + Send + 'static,
100    F::Output: Send + 'static,
101{
102    // Check whether able to get the current runtime
103    match tokio::runtime::Handle::try_current() {
104        Ok(rt) => {
105            // Able to get the current runtime (standalone binary), use the current runtime
106            rt.spawn(task)
107        }
108        Err(_) => {
109            // Unable to get the current runtime (dynamic plugins), reuse the global runtime
110            TOKIO_RUNTIME.spawn(task)
111        }
112    }
113}
114
115lazy_static::lazy_static!(
116
117
118    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();
119
120    static ref KE_ANY_1_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("*") };
121    static ref KE_ANY_N_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("**") };
122
123    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("@ros2_pub_cache") };
124);
125
126kedefine!(
127    // Admin space key expressions of plugin's version
128    pub ke_admin_version: "${plugin_status_key:**}/__version__",
129
130    // Admin prefix of this bridge
131    pub ke_admin_prefix: "@/${zenoh_id:*}/ros2/",
132);
133
134// CycloneDDS' localhost-only: set network interface address (shortened form of config would be
135// possible, too, but I think it is clearer to spell it out completely).
136// Empty configuration fragments are ignored, so it is safe to unconditionally append a comma.
137const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General>
138                                                      <Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces>
139                                                  </General></Domain></CycloneDDS>,"#;
140
141// CycloneDDS' enable-shm: enable usage of Iceoryx shared memory
142#[cfg(feature = "dds_shm")]
143const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><SharedMemory><Enable>true</Enable></SharedMemory></Domain></CycloneDDS>,"#;
144
145// interval between each read/write on "ros_discovery_info" topic
146const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 100;
147const ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS: u64 = 100;
148
149#[cfg(feature = "dynamic_plugin")]
150zenoh_plugin_trait::declare_plugin!(ROS2Plugin);
151
152#[allow(clippy::upper_case_acronyms)]
153pub struct ROS2Plugin;
154
155impl ZenohPlugin for ROS2Plugin {}
156impl Plugin for ROS2Plugin {
157    type StartArgs = Runtime;
158    type Instance = RunningPlugin;
159
160    const PLUGIN_VERSION: &'static str = plugin_version!();
161    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
162    const DEFAULT_NAME: &'static str = "ros2dds";
163
164    fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
165        // Try to initiate login.
166        // Required in case of dynamic lib, otherwise no logs.
167        // But cannot be done twice in case of static link.
168        zenoh::try_init_log_from_env();
169
170        let runtime_conf = runtime.config().lock();
171        let plugin_conf = runtime_conf
172            .plugin(name)
173            .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
174        let config: Config = serde_json::from_value(plugin_conf.clone())
175            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
176        WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
177        MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
178
179        spawn_runtime(run(runtime.clone(), config));
180
181        Ok(Box::new(ROS2Plugin))
182    }
183}
184impl PluginControl for ROS2Plugin {}
185impl RunningPluginTrait for ROS2Plugin {}
186
187fn create_cyclonedds_config(
188    ros_automatic_discovery_range: RosAutomaticDiscoveryRange,
189    ros_static_peers: Vec<String>,
190) -> String {
191    let mut config = String::new();
192    // Refer to https://github.com/ros2/rmw_cyclonedds/blob/c9e7001e6bf5373bdf1931535354b52eeddb2053/rmw_cyclonedds_cpp/src/rmw_node.cpp#L1134
193    let add_localhost_as_static_peer: bool;
194    let add_static_peers: bool;
195    let disable_multicast: bool;
196    match ros_automatic_discovery_range {
197        RosAutomaticDiscoveryRange::Subnet => {
198            add_localhost_as_static_peer = false;
199            add_static_peers = true;
200            disable_multicast = false;
201        }
202        RosAutomaticDiscoveryRange::SystemDefault => {
203            add_localhost_as_static_peer = false;
204            add_static_peers = false;
205            disable_multicast = false;
206        }
207        RosAutomaticDiscoveryRange::Localhost => {
208            add_localhost_as_static_peer = true;
209            add_static_peers = true;
210            disable_multicast = true;
211        }
212        RosAutomaticDiscoveryRange::Off => {
213            add_localhost_as_static_peer = false;
214            add_static_peers = false;
215            disable_multicast = true;
216        }
217    };
218    if add_localhost_as_static_peer || add_static_peers || disable_multicast {
219        config += "<CycloneDDS><Domain>";
220
221        if disable_multicast {
222            config += "<General><AllowMulticast>false</AllowMulticast></General>";
223        }
224
225        let discovery_off = disable_multicast && !add_localhost_as_static_peer && !add_static_peers;
226        if discovery_off {
227            config += "<Discovery><ParticipantIndex>none</ParticipantIndex>";
228            config += &format!("<Tag>ros_discovery_off_{}</Tag>", std::process::id());
229        } else {
230            config += "<Discovery><ParticipantIndex>auto</ParticipantIndex>";
231            config += "<MaxAutoParticipantIndex>32</MaxAutoParticipantIndex>";
232        }
233
234        if (add_static_peers && !ros_static_peers.is_empty()) || add_localhost_as_static_peer {
235            config += "<Peers>";
236            if add_localhost_as_static_peer {
237                config += "<Peer address=\"localhost\"/>";
238            }
239            for peer in ros_static_peers {
240                config += "<Peer address=\"";
241                config += &peer;
242                config += "\"/>";
243            }
244            config += "</Peers>";
245        }
246
247        config += "</Discovery></Domain></CycloneDDS>,";
248    }
249    config
250}
251
252pub async fn run(runtime: Runtime, config: Config) {
253    // Try to initiate login.
254    // Required in case of dynamic lib, otherwise no logs.
255    // But cannot be done twice in case of static link.
256    zenoh::try_init_log_from_env();
257    tracing::debug!("ROS2 plugin {}", ROS2Plugin::PLUGIN_VERSION);
258    tracing::info!("ROS2 plugin {config:?}");
259
260    // Check config validity
261    if !regex::Regex::new("/[A-Za-z0-9_/]*")
262        .unwrap()
263        .is_match(&config.namespace)
264    {
265        tracing::error!(
266            r#"Configuration error: invalid namespace "{}" must contain only alphanumeric, '_' or '/' characters and start with '/'"#,
267            config.namespace
268        );
269        return;
270    }
271    if !regex::Regex::new("[A-Za-z0-9_]+")
272        .unwrap()
273        .is_match(&config.nodename)
274    {
275        tracing::error!(
276            r#"Configuration error: invalid nodename "{}" must contain only alphanumeric or '_' characters"#,
277            config.nodename
278        );
279        return;
280    }
281
282    // open zenoh-net Session
283    let zsession = match zenoh::session::init(runtime).await {
284        Ok(session) => Arc::new(session),
285        Err(e) => {
286            tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
287            return;
288        }
289    };
290
291    // Declare plugin's liveliness token
292    let ke_liveliness = keformat!(
293        ke_liveliness_plugin::formatter(),
294        zenoh_id = zsession.zid().into_keyexpr()
295    )
296    .unwrap();
297    let member = match zsession.liveliness().declare_token(ke_liveliness).await {
298        Ok(member) => member,
299        Err(e) => {
300            tracing::error!(
301                "Unable to declare liveliness token for DDS plugin : {:?}",
302                e
303            );
304            return;
305        }
306    };
307
308    // Dynamic Discovery is changed after iron. Need to check the ROS 2 version.
309    // https://docs.ros.org/en/rolling/Tutorials/Advanced/Improved-Dynamic-Discovery.html
310    if ros_distro_is_less_than("iron") {
311        if config.ros_automatic_discovery_range.is_some() {
312            tracing::warn!("ROS_AUTOMATIC_DISCOVERY_RANGE will be ignored since it's not supported before ROS 2 Iron");
313        }
314        if config.ros_static_peers.is_some() {
315            tracing::warn!(
316                "ROS_STATIC_PEERS will be ignored since it's not supported before ROS 2 Iron"
317            );
318        }
319        // if "ros_localhost_only" is set, configure CycloneDDS to use only localhost interface
320        if config.ros_localhost_only {
321            env::set_var(
322                "CYCLONEDDS_URI",
323                format!(
324                    "{}{}",
325                    CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
326                    env::var("CYCLONEDDS_URI").unwrap_or_default()
327                ),
328            );
329        }
330    } else {
331        let (ros_automatic_discovery_range, ros_static_peers) = if config.ros_localhost_only {
332            // If ROS_LOCALHOST_ONLY is set, need to transform into new environmental variables
333            // Refer to https://github.com/ros2/ros2_documentation/pull/3519#discussion_r1186541935
334            tracing::warn!("ROS_LOCALHOST_ONLY is deprecated but still honored if it is enabled. Use ROS_AUTOMATIC_DISCOVERY_RANGE and ROS_STATIC_PEERS instead.");
335            tracing::warn!("'localhost_only' is enabled, 'automatic_discovery_range' and 'static_peers' will be ignored.");
336            (Some(RosAutomaticDiscoveryRange::Localhost), None)
337        } else {
338            (
339                config.ros_automatic_discovery_range,
340                config.ros_static_peers.clone(),
341            )
342        };
343        env::set_var(
344            "CYCLONEDDS_URI",
345            format!(
346                "{}{}",
347                create_cyclonedds_config(
348                    ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet),
349                    ros_static_peers.unwrap_or(Vec::new())
350                ),
351                env::var("CYCLONEDDS_URI").unwrap_or_default()
352            ),
353        );
354    }
355
356    // if "enable_shm" is set, configure CycloneDDS to use Iceoryx shared memory
357    #[cfg(feature = "dds_shm")]
358    {
359        if config.shm_enabled {
360            env::set_var(
361                "CYCLONEDDS_URI",
362                format!(
363                    "{}{}",
364                    CYCLONEDDS_CONFIG_ENABLE_SHM,
365                    env::var("CYCLONEDDS_URI").unwrap_or_default()
366                ),
367            );
368        }
369    }
370
371    // create DDS Participant
372    tracing::debug!(
373        "Create DDS Participant on domain {} with CYCLONEDDS_URI='{}'",
374        config.domain,
375        env::var("CYCLONEDDS_URI").unwrap_or_default()
376    );
377    let participant =
378        unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
379    tracing::debug!(
380        "ROS2 plugin {} using DDS Participant {} created",
381        zsession.zid(),
382        get_guid(&participant).unwrap()
383    );
384
385    let mut ros2_plugin = ROS2PluginRuntime {
386        config: Arc::new(config),
387        zsession,
388        participant,
389        _member: member,
390        admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
391    };
392
393    ros2_plugin.run().await;
394}
395
/// State of the running bridge: everything the event loop in `ROS2PluginRuntime::run` needs.
pub struct ROS2PluginRuntime {
    // The plugin's configuration (shared: a clone of the Arc is given to the RoutesMgr).
    config: Arc<Config>,
    // The zenoh session used to declare the liveliness subscriber and the admin space
    // queryable (shared: a clone of the Arc is given to the RoutesMgr).
    zsession: Arc<Session>,
    // Handle of the DDS Participant created for this bridge.
    participant: dds_entity_t,
    // The plugin's liveliness token. Never read: stored only to keep it alive as long
    // as this struct (presumably the token is undeclared when dropped - TODO confirm).
    _member: LivelinessToken,
    // admin space: index is the admin_keyexpr
    // value is an AdminRef telling which data to return to queries on this key.
    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
}
407
// A reference used in the admin space to designate the data to send in reply to a query
// on the associated key expression.
#[derive(Debug)]
enum AdminRef {
    // the plugin's configuration, serialized as JSON
    Config,
    // the plugin's long version string, serialized as JSON
    Version,
}
414
415impl ROS2PluginRuntime {
416    async fn run(&mut self) {
417        // Subscribe to all liveliness info from other ROS2 plugins
418        let ke_liveliness_all = keformat!(
419            ke_liveliness_all::formatter(),
420            zenoh_id = "*",
421            remaining = "**"
422        )
423        .unwrap();
424        let liveliness_subscriber = self
425            .zsession
426            .liveliness()
427            .declare_subscriber(ke_liveliness_all)
428            .querying()
429            .with(flume::unbounded())
430            .await
431            .expect("Failed to create Liveliness Subscriber");
432
433        // declare admin space queryable
434        let admin_prefix = keformat!(
435            ke_admin_prefix::formatter(),
436            zenoh_id = &self.zsession.zid().into_keyexpr()
437        )
438        .unwrap();
439        let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
440        tracing::debug!("Declare admin space on {}", admin_keyexpr_expr);
441        let admin_queryable = self
442            .zsession
443            .declare_queryable(admin_keyexpr_expr)
444            .await
445            .expect("Failed to create AdminSpace queryable");
446
447        // add plugin's config and version in admin space
448        self.admin_space.insert(
449            &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
450            AdminRef::Config,
451        );
452        self.admin_space.insert(
453            &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
454            AdminRef::Version,
455        );
456
457        // Create and start the RosDiscoveryInfoMgr (managing ros_discovery_info topic)
458        let ros_discovery_mgr = Arc::new(
459            RosDiscoveryInfoMgr::new(
460                self.participant,
461                &self.config.namespace,
462                &self.config.nodename,
463            )
464            .expect("Failed to create RosDiscoveryInfoMgr"),
465        );
466        ros_discovery_mgr.run().await;
467
468        // Create and start DiscoveryManager
469        let (tx, discovery_rcv): (Sender<ROS2DiscoveryEvent>, Receiver<ROS2DiscoveryEvent>) =
470            unbounded();
471        let mut discovery_mgr = DiscoveryMgr::create(self.participant, ros_discovery_mgr.clone());
472        discovery_mgr.run(tx).await;
473
474        // Create RoutesManager
475        let mut routes_mgr = RoutesMgr::new(
476            self.config.clone(),
477            self.zsession.clone(),
478            self.participant,
479            discovery_mgr.discovered_entities.clone(),
480            ros_discovery_mgr,
481            admin_prefix.clone(),
482        );
483
484        loop {
485            select!(
486                evt = discovery_rcv.recv_async() => {
487                    match evt {
488                        Ok(evt) => {
489                            if self.is_allowed(&evt) {
490                                tracing::info!("{evt} - Allowed");
491                                // pass ROS2DiscoveryEvent to RoutesMgr
492                                if let Err(e) = routes_mgr.on_ros_discovery_event(evt).await {
493                                    tracing::warn!("Error updating route: {e}");
494                                }
495                            } else {
496                                tracing::debug!("{evt} - Denied per config");
497                            }
498                        }
499                        Err(e) => tracing::error!("Internal Error: received from DiscoveryMgr: {e}")
500                    }
501                },
502
503                liveliness_event = liveliness_subscriber.recv_async() => {
504                    match liveliness_event
505                    {
506                        Ok(evt) => {
507                            let ke = evt.key_expr().as_keyexpr();
508                            if let Ok(parsed) = ke_liveliness_all::parse(ke) {
509                                let zenoh_id = parsed.zenoh_id();
510                                if zenoh_id == &*self.zsession.zid().into_keyexpr() {
511                                    // ignore own announcements
512                                    continue;
513                                }
514                                match (parsed.remaining(), evt.kind())  {
515                                    // New remote bridge detected
516                                    (None, SampleKind::Put) => {
517                                        tracing::info!("New ROS 2 bridge detected: {}", zenoh_id);
518                                        // make each routes for a TRANSIENT_LOCAL Subscriber to query historical publications from this new plugin
519                                        routes_mgr.query_all_historical_publications(zenoh_id).await;
520                                    }
521                                    // New remote bridge left
522                                    (None, SampleKind::Delete) => tracing::info!("Remote ROS 2 bridge left: {}", zenoh_id),
523                                    // the liveliness token corresponds to a ROS2 announcement
524                                    (Some(remaining), _) => {
525                                        // parse it and pass ROS2AnnouncementEvent to RoutesMgr
526                                        match self.parse_announcement_event(ke, &remaining.as_str()[..3], evt.kind()) {
527                                            Ok(evt) => {
528                                                if self.is_announcement_allowed(&evt) {
529                                                    tracing::info!("Remote bridge {zenoh_id} {evt} - Allowed");
530                                                    routes_mgr.on_ros_announcement_event(evt).await
531                                                        .unwrap_or_else(|e| tracing::warn!("Error treating announcement event: {e}"));
532                                                } else {
533                                                    tracing::debug!("Remote bridge {zenoh_id} {evt} - Matching entity denied per config");
534                                                }
535                                            },
536                                            Err(e) =>
537                                                tracing::warn!("Received unexpected liveliness key expression '{ke}': {e}")
538                                        }
539                                    }
540                                }
541                            } else {
542                                tracing::warn!("Received unexpected liveliness key expression '{ke}'");
543                            }
544                        },
545                        Err(e) => tracing::warn!("Error receiving liveliness event: {e}")
546                    }
547                },
548
549                get_request = admin_queryable.recv_async() => {
550                    if let Ok(query) = get_request {
551                        self.treat_admin_query(&query).await;
552                        // pass query to discovery_mgr
553                        discovery_mgr.treat_admin_query(&query, &admin_prefix);
554                        // pass query to discovery_mgr
555                        routes_mgr.treat_admin_query(&query).await;
556                    } else {
557                        tracing::warn!("AdminSpace queryable was closed!");
558                    }
559                }
560            )
561        }
562    }
563
564    fn parse_announcement_event(
565        &self,
566        liveliness_ke: &keyexpr,
567        iface_kind: &str,
568        sample_kind: SampleKind,
569    ) -> Result<ROS2AnnouncementEvent, String> {
570        use ROS2AnnouncementEvent::*;
571        tracing::debug!("Received liveliness event: {sample_kind} on {liveliness_ke}");
572        match (iface_kind, sample_kind) {
573            ("MP/", SampleKind::Put) => parse_ke_liveliness_pub(liveliness_ke)
574                .map_err(|e| format!("Received invalid liveliness token: {e}"))
575                .map(
576                    |(zenoh_id, zenoh_key_expr, ros2_type, keyless, writer_qos)| AnnouncedMsgPub {
577                        zenoh_id,
578                        zenoh_key_expr,
579                        ros2_type,
580                        keyless,
581                        writer_qos,
582                    },
583                ),
584            ("MP/", SampleKind::Delete) => parse_ke_liveliness_pub(liveliness_ke)
585                .map_err(|e| format!("Received invalid liveliness token: {e}"))
586                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgPub {
587                    zenoh_id,
588                    zenoh_key_expr,
589                }),
590            ("MS/", SampleKind::Put) => parse_ke_liveliness_sub(liveliness_ke)
591                .map_err(|e| format!("Received invalid liveliness token: {e}"))
592                .map(
593                    |(zenoh_id, zenoh_key_expr, ros2_type, keyless, reader_qos)| AnnouncedMsgSub {
594                        zenoh_id,
595                        zenoh_key_expr,
596                        ros2_type,
597                        keyless,
598                        reader_qos,
599                    },
600                ),
601            ("MS/", SampleKind::Delete) => parse_ke_liveliness_sub(liveliness_ke)
602                .map_err(|e| format!("Received invalid liveliness token: {e}"))
603                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgSub {
604                    zenoh_id,
605                    zenoh_key_expr,
606                }),
607            ("SS/", SampleKind::Put) => parse_ke_liveliness_service_srv(liveliness_ke)
608                .map_err(|e| format!("Received invalid liveliness token: {e}"))
609                .map(
610                    |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceSrv {
611                        zenoh_id,
612                        zenoh_key_expr,
613                        ros2_type,
614                    },
615                ),
616            ("SS/", SampleKind::Delete) => parse_ke_liveliness_service_srv(liveliness_ke)
617                .map_err(|e| format!("Received invalid liveliness token: {e}"))
618                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceSrv {
619                    zenoh_id,
620                    zenoh_key_expr,
621                }),
622            ("SC/", SampleKind::Put) => parse_ke_liveliness_service_cli(liveliness_ke)
623                .map_err(|e| format!("Received invalid liveliness token: {e}"))
624                .map(
625                    |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceCli {
626                        zenoh_id,
627                        zenoh_key_expr,
628                        ros2_type,
629                    },
630                ),
631            ("SC/", SampleKind::Delete) => parse_ke_liveliness_service_cli(liveliness_ke)
632                .map_err(|e| format!("Received invalid liveliness token: {e}"))
633                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceCli {
634                    zenoh_id,
635                    zenoh_key_expr,
636                }),
637            ("AS/", SampleKind::Put) => parse_ke_liveliness_action_srv(liveliness_ke)
638                .map_err(|e| format!("Received invalid liveliness token: {e}"))
639                .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionSrv {
640                    zenoh_id,
641                    zenoh_key_expr,
642                    ros2_type,
643                }),
644            ("AS/", SampleKind::Delete) => parse_ke_liveliness_action_srv(liveliness_ke)
645                .map_err(|e| format!("Received invalid liveliness token: {e}"))
646                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionSrv {
647                    zenoh_id,
648                    zenoh_key_expr,
649                }),
650            ("AC/", SampleKind::Put) => parse_ke_liveliness_action_cli(liveliness_ke)
651                .map_err(|e| format!("Received invalid liveliness token: {e}"))
652                .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionCli {
653                    zenoh_id,
654                    zenoh_key_expr,
655                    ros2_type,
656                }),
657            ("AC/", SampleKind::Delete) => parse_ke_liveliness_action_cli(liveliness_ke)
658                .map_err(|e| format!("Received invalid liveliness token: {e}"))
659                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionCli {
660                    zenoh_id,
661                    zenoh_key_expr,
662                }),
663            _ => Err(format!("invalid ROS2 interface kind: {iface_kind}")),
664        }
665    }
666
667    fn is_allowed(&self, evt: &ROS2DiscoveryEvent) -> bool {
668        if let Some(allowance) = &self.config.allowance {
669            use ROS2DiscoveryEvent::*;
670            match evt {
671                DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => {
672                    allowance.is_publisher_allowed(&iface.name)
673                }
674                DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => {
675                    allowance.is_subscriber_allowed(&iface.name)
676                }
677                DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => {
678                    allowance.is_service_srv_allowed(&iface.name)
679                }
680                DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => {
681                    allowance.is_service_cli_allowed(&iface.name)
682                }
683                DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => {
684                    allowance.is_action_srv_allowed(&iface.name)
685                }
686                DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => {
687                    allowance.is_action_cli_allowed(&iface.name)
688                }
689            }
690        } else {
691            // no allow/deny configured => allow all
692            true
693        }
694    }
695
696    // Check if a remote announcement by another bridge is allowed, depending on the matching entity allowance in config.
697    // E.g. a remote announcement of a Publisher on /abc is allowed only if a Subscriber on /abc is allowed in the local config.
698    fn is_announcement_allowed(&self, evt: &ROS2AnnouncementEvent) -> bool {
699        if let Some(allowance) = &self.config.allowance {
700            use ROS2AnnouncementEvent::*;
701            match evt {
702                AnnouncedMsgPub { zenoh_key_expr, .. } | RetiredMsgPub { zenoh_key_expr, .. } => {
703                    allowance
704                        .is_subscriber_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
705                }
706                AnnouncedMsgSub { zenoh_key_expr, .. } | RetiredMsgSub { zenoh_key_expr, .. } => {
707                    allowance
708                        .is_publisher_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
709                }
710                AnnouncedServiceSrv { zenoh_key_expr, .. }
711                | RetiredServiceSrv { zenoh_key_expr, .. } => allowance
712                    .is_service_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
713                AnnouncedServiceCli { zenoh_key_expr, .. }
714                | RetiredServiceCli { zenoh_key_expr, .. } => allowance
715                    .is_service_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
716                AnnouncedActionSrv { zenoh_key_expr, .. }
717                | RetiredActionSrv { zenoh_key_expr, .. } => allowance
718                    .is_action_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
719                AnnouncedActionCli { zenoh_key_expr, .. }
720                | RetiredActionCli { zenoh_key_expr, .. } => allowance
721                    .is_action_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
722            }
723        } else {
724            // no allow/deny configured => allow all
725            true
726        }
727    }
728
729    async fn treat_admin_query(&self, query: &Query) {
730        let query_ke = query.key_expr();
731        if query_ke.is_wild() {
732            // iterate over all admin space to find matching keys and reply for each
733            for (ke, admin_ref) in self.admin_space.iter() {
734                if query_ke.intersects(ke) {
735                    self.send_admin_reply(query, ke, admin_ref).await;
736                }
737            }
738        } else {
739            // sub_ke correspond to 1 key - just get it and reply
740            let own_ke: OwnedKeyExpr = query_ke.to_owned().into();
741            if let Some(admin_ref) = self.admin_space.get(&own_ke) {
742                self.send_admin_reply(query, &own_ke, admin_ref).await;
743            }
744        }
745    }
746
747    async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, admin_ref: &AdminRef) {
748        let z_bytes: ZBytes = match admin_ref {
749            AdminRef::Version => match serde_json::to_value(ROS2Plugin::PLUGIN_LONG_VERSION) {
750                Ok(v) => match serde_json::to_vec(&v) {
751                    Ok(bytes) => ZBytes::from(bytes),
752                    Err(e) => {
753                        tracing::warn!("Error transforming JSON to ZBytes: {}", e);
754                        return;
755                    }
756                },
757                Err(e) => {
758                    tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
759                    return;
760                }
761            },
762            AdminRef::Config => match serde_json::to_value(&*self.config) {
763                Ok(v) => match serde_json::to_vec(&v) {
764                    Ok(bytes) => ZBytes::from(bytes),
765                    Err(e) => {
766                        tracing::warn!("Error transforming JSON to ZBytes: {}", e);
767                        return;
768                    }
769                },
770                Err(e) => {
771                    tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
772                    return;
773                }
774            },
775        };
776        if let Err(e) = query
777            .reply(key_expr.to_owned(), z_bytes)
778            .encoding(Encoding::APPLICATION_JSON)
779            .await
780        {
781            tracing::warn!("Error replying to admin query {:?}: {}", query, e);
782        }
783    }
784}
785
//TODO replace when stable https://github.com/rust-lang/rust/issues/65816
/// Decomposes a `Vec<T>` into its raw components: `(pointer, length, capacity)`.
///
/// The vector is not dropped: the caller becomes responsible for the allocation,
/// typically by rebuilding a `Vec` with `Vec::from_raw_parts` to release it.
#[inline]
pub fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
    // ManuallyDrop prevents the buffer from being freed when `leaked` goes out of scope.
    let mut leaked = ManuallyDrop::new(v);
    let len = leaked.len();
    let capacity = leaked.capacity();
    let ptr = leaked.as_mut_ptr();
    (ptr, len, capacity)
}
792
793struct ChannelEvent {
794    tx: Sender<()>,
795}
796
797#[async_trait]
798impl Timed for ChannelEvent {
799    async fn run(&mut self) {
800        if self.tx.send(()).is_err() {
801            tracing::warn!("Error sending periodic timer notification on channel");
802        };
803    }
804}
805
806pub(crate) fn serialize_option_as_bool<S, T>(opt: &Option<T>, s: S) -> Result<S::Ok, S::Error>
807where
808    S: Serializer,
809{
810    s.serialize_bool(opt.is_some())
811}