zenoh_plugin_ros2dds/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14#![allow(deprecated)]
15
16use std::{
17    collections::HashMap,
18    env,
19    future::Future,
20    mem::ManuallyDrop,
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26
27use async_trait::async_trait;
28use cyclors::*;
29use events::ROS2AnnouncementEvent;
30use flume::{unbounded, Receiver, Sender};
31use futures::select;
32use serde::Serializer;
33use tokio::task::JoinHandle;
34use zenoh::{
35    bytes::{Encoding, ZBytes},
36    internal::{
37        plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
38        runtime::DynamicRuntime,
39        zerror, Timed,
40    },
41    key_expr::{
42        format::{kedefine, keformat},
43        keyexpr, OwnedKeyExpr,
44    },
45    liveliness::LivelinessToken,
46    query::Query,
47    sample::SampleKind,
48    Result as ZResult, Session,
49};
50use zenoh_ext::SubscriberBuilderExt;
51use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
52
53pub mod config;
54mod dds_discovery;
55mod dds_types;
56mod dds_utils;
57mod discovered_entities;
58mod discovery_mgr;
59mod events;
60mod gid;
61mod liveliness_mgt;
62mod node_info;
63mod qos_helpers;
64mod ros2_utils;
65mod ros_discovery;
66mod route_action_cli;
67mod route_action_srv;
68mod route_publisher;
69mod route_service_cli;
70mod route_service_srv;
71mod route_subscriber;
72mod routes_mgr;
73use config::{Config, RosAutomaticDiscoveryRange};
74
75use crate::{
76    dds_utils::get_guid,
77    discovery_mgr::DiscoveryMgr,
78    events::ROS2DiscoveryEvent,
79    liveliness_mgt::*,
80    ros2_utils::{key_expr_to_ros2_name, ros_distro_is_less_than},
81    ros_discovery::RosDiscoveryInfoMgr,
82    routes_mgr::RoutesMgr,
83};
84
85lazy_static::lazy_static! {
86    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
87    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
88    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
89    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
90               .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
91               .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
92               .enable_all()
93               .build()
94               .expect("Unable to create runtime");
95}
96#[inline(always)]
97pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
98where
99    F: Future + Send + 'static,
100    F::Output: Send + 'static,
101{
102    // Check whether able to get the current runtime
103    match tokio::runtime::Handle::try_current() {
104        Ok(rt) => {
105            // Able to get the current runtime (standalone binary), use the current runtime
106            rt.spawn(task)
107        }
108        Err(_) => {
109            // Unable to get the current runtime (dynamic plugins), reuse the global runtime
110            TOKIO_RUNTIME.spawn(task)
111        }
112    }
113}
114
115lazy_static::lazy_static!(
116
117
118    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();
119
120    static ref KE_ANY_1_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("*") };
121    static ref KE_ANY_N_SEGMENT: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("**") };
122
123    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr =  unsafe { keyexpr::from_str_unchecked("@ros2_pub_cache") };
124);
125
126kedefine!(
127    // Admin space key expressions of plugin's version
128    pub ke_admin_version: "${plugin_status_key:**}/__version__",
129
130    // Admin prefix of this bridge
131    pub ke_admin_prefix: "@/${zenoh_id:*}/ros2/",
132);
133
134// CycloneDDS' localhost-only: set network interface address (shortened form of config would be
135// possible, too, but I think it is clearer to spell it out completely).
136// Empty configuration fragments are ignored, so it is safe to unconditionally append a comma.
137const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General>
138                                                      <Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces>
139                                                  </General></Domain></CycloneDDS>,"#;
140
141// CycloneDDS' enable-shm: enable usage of Iceoryx shared memory
142#[cfg(feature = "dds_shm")]
143const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><SharedMemory><Enable>true</Enable></SharedMemory></Domain></CycloneDDS>,"#;
144
145// interval between each read/write on "ros_discovery_info" topic
146const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 100;
147const ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS: u64 = 100;
148
149#[cfg(feature = "dynamic_plugin")]
150zenoh_plugin_trait::declare_plugin!(ROS2Plugin);
151
152#[allow(clippy::upper_case_acronyms)]
153pub struct ROS2Plugin;
154
155impl ZenohPlugin for ROS2Plugin {}
156impl Plugin for ROS2Plugin {
157    type StartArgs = DynamicRuntime;
158    type Instance = RunningPlugin;
159
160    const PLUGIN_VERSION: &'static str = plugin_version!();
161    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
162    const DEFAULT_NAME: &'static str = "ros2dds";
163
164    fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
165        // Try to initiate login.
166        // Required in case of dynamic lib, otherwise no logs.
167        // But cannot be done twice in case of static link.
168        zenoh::try_init_log_from_env();
169
170        let runtime_conf = runtime.get_config();
171        let plugin_conf = runtime_conf
172            .get_plugin_config(name)
173            .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
174        let config: Config = serde_json::from_value(plugin_conf.clone())
175            .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
176        WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
177        MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
178
179        spawn_runtime(run(runtime.clone(), config));
180
181        Ok(Box::new(ROS2Plugin))
182    }
183}
184impl PluginControl for ROS2Plugin {}
185impl RunningPluginTrait for ROS2Plugin {}
186
187fn create_cyclonedds_config(
188    ros_automatic_discovery_range: RosAutomaticDiscoveryRange,
189    ros_static_peers: Vec<String>,
190) -> String {
191    let mut config = String::new();
192    // Refer to https://github.com/ros2/rmw_cyclonedds/blob/c9e7001e6bf5373bdf1931535354b52eeddb2053/rmw_cyclonedds_cpp/src/rmw_node.cpp#L1134
193    let add_localhost_as_static_peer: bool;
194    let add_static_peers: bool;
195    let disable_multicast: bool;
196    match ros_automatic_discovery_range {
197        RosAutomaticDiscoveryRange::Subnet => {
198            add_localhost_as_static_peer = false;
199            add_static_peers = true;
200            disable_multicast = false;
201        }
202        RosAutomaticDiscoveryRange::SystemDefault => {
203            add_localhost_as_static_peer = false;
204            add_static_peers = false;
205            disable_multicast = false;
206        }
207        RosAutomaticDiscoveryRange::Localhost => {
208            add_localhost_as_static_peer = true;
209            add_static_peers = true;
210            disable_multicast = true;
211        }
212        RosAutomaticDiscoveryRange::Off => {
213            add_localhost_as_static_peer = false;
214            add_static_peers = false;
215            disable_multicast = true;
216        }
217    };
218    if add_localhost_as_static_peer || add_static_peers || disable_multicast {
219        config += "<CycloneDDS><Domain>";
220
221        if disable_multicast {
222            config += "<General><AllowMulticast>false</AllowMulticast></General>";
223        }
224
225        let discovery_off = disable_multicast && !add_localhost_as_static_peer && !add_static_peers;
226        if discovery_off {
227            config += "<Discovery><ParticipantIndex>none</ParticipantIndex>";
228            config += &format!("<Tag>ros_discovery_off_{}</Tag>", std::process::id());
229        } else {
230            config += "<Discovery><ParticipantIndex>auto</ParticipantIndex>";
231            config += "<MaxAutoParticipantIndex>32</MaxAutoParticipantIndex>";
232        }
233
234        if (add_static_peers && !ros_static_peers.is_empty()) || add_localhost_as_static_peer {
235            config += "<Peers>";
236            if add_localhost_as_static_peer {
237                config += "<Peer address=\"localhost\"/>";
238            }
239            for peer in ros_static_peers {
240                config += "<Peer address=\"";
241                config += &peer;
242                config += "\"/>";
243            }
244            config += "</Peers>";
245        }
246
247        config += "</Discovery></Domain></CycloneDDS>,";
248    }
249    config
250}
251
252pub async fn run(runtime: DynamicRuntime, config: Config) {
253    // Try to initiate login.
254    // Required in case of dynamic lib, otherwise no logs.
255    // But cannot be done twice in case of static link.
256    zenoh::try_init_log_from_env();
257    tracing::debug!("ROS2 plugin {}", ROS2Plugin::PLUGIN_VERSION);
258    tracing::info!("ROS2 plugin {config:?}");
259
260    // Check config validity
261    if !regex::Regex::new("/[A-Za-z0-9_/]*")
262        .unwrap()
263        .is_match(&config.namespace)
264    {
265        tracing::error!(
266            r#"Configuration error: invalid namespace "{}" must contain only alphanumeric, '_' or '/' characters and start with '/'"#,
267            config.namespace
268        );
269        return;
270    }
271    if !regex::Regex::new("[A-Za-z0-9_]+")
272        .unwrap()
273        .is_match(&config.nodename)
274    {
275        tracing::error!(
276            r#"Configuration error: invalid nodename "{}" must contain only alphanumeric or '_' characters"#,
277            config.nodename
278        );
279        return;
280    }
281
282    // open zenoh-net Session
283    let zsession = match zenoh::session::init(runtime).await {
284        Ok(session) => Arc::new(session),
285        Err(e) => {
286            tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
287            return;
288        }
289    };
290
291    // Declare plugin's liveliness token
292    let ke_liveliness = keformat!(
293        ke_liveliness_plugin::formatter(),
294        zenoh_id = zsession.zid().into_keyexpr()
295    )
296    .unwrap();
297    let member = match zsession.liveliness().declare_token(ke_liveliness).await {
298        Ok(member) => member,
299        Err(e) => {
300            tracing::error!(
301                "Unable to declare liveliness token for DDS plugin : {:?}",
302                e
303            );
304            return;
305        }
306    };
307
308    // Dynamic Discovery is changed after iron. Need to check the ROS 2 version.
309    // https://docs.ros.org/en/rolling/Tutorials/Advanced/Improved-Dynamic-Discovery.html
310    if ros_distro_is_less_than("iron") {
311        if config.ros_automatic_discovery_range.is_some() {
312            tracing::warn!("ROS_AUTOMATIC_DISCOVERY_RANGE will be ignored since it's not supported before ROS 2 Iron");
313        }
314        if config.ros_static_peers.is_some() {
315            tracing::warn!(
316                "ROS_STATIC_PEERS will be ignored since it's not supported before ROS 2 Iron"
317            );
318        }
319        // if "ros_localhost_only" is set, configure CycloneDDS to use only localhost interface
320        if config.ros_localhost_only {
321            env::set_var(
322                "CYCLONEDDS_URI",
323                format!(
324                    "{}{}",
325                    CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
326                    env::var("CYCLONEDDS_URI").unwrap_or_default()
327                ),
328            );
329        }
330    } else {
331        let (ros_automatic_discovery_range, ros_static_peers) = if config.ros_localhost_only {
332            // If ROS_LOCALHOST_ONLY is set, need to transform into new environmental variables
333            // Refer to https://github.com/ros2/ros2_documentation/pull/3519#discussion_r1186541935
334            tracing::warn!("ROS_LOCALHOST_ONLY is deprecated but still honored if it is enabled. Use ROS_AUTOMATIC_DISCOVERY_RANGE and ROS_STATIC_PEERS instead.");
335            tracing::warn!("'localhost_only' is enabled, 'automatic_discovery_range' and 'static_peers' will be ignored.");
336            (Some(RosAutomaticDiscoveryRange::Localhost), None)
337        } else {
338            (
339                config.ros_automatic_discovery_range,
340                config.ros_static_peers.clone(),
341            )
342        };
343        env::set_var(
344            "CYCLONEDDS_URI",
345            format!(
346                "{}{}",
347                create_cyclonedds_config(
348                    ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet),
349                    ros_static_peers.unwrap_or(Vec::new())
350                ),
351                env::var("CYCLONEDDS_URI").unwrap_or_default()
352            ),
353        );
354    }
355
356    // if "enable_shm" is set, configure CycloneDDS to use Iceoryx shared memory
357    #[cfg(feature = "dds_shm")]
358    {
359        if config.shm_enabled {
360            env::set_var(
361                "CYCLONEDDS_URI",
362                format!(
363                    "{}{}",
364                    CYCLONEDDS_CONFIG_ENABLE_SHM,
365                    env::var("CYCLONEDDS_URI").unwrap_or_default()
366                ),
367            );
368        }
369    }
370
371    // create DDS Participant
372    tracing::debug!(
373        "Create DDS Participant on domain {} with CYCLONEDDS_URI='{}'",
374        config.domain,
375        env::var("CYCLONEDDS_URI").unwrap_or_default()
376    );
377    let participant =
378        unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
379    tracing::debug!(
380        "ROS2 plugin {} using DDS Participant {} created",
381        zsession.zid(),
382        get_guid(&participant).unwrap()
383    );
384
385    let mut ros2_plugin = ROS2PluginRuntime {
386        config: Arc::new(config),
387        zsession,
388        participant,
389        _member: member,
390        admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
391    };
392
393    ros2_plugin.run().await;
394}
395
396pub struct ROS2PluginRuntime {
397    config: Arc<Config>,
398    // Note: &'a Arc<Session> here to keep the ownership of Session outside this struct
399    // and be able to store the publishers/subscribers it creates in this same struct.
400    zsession: Arc<Session>,
401    participant: dds_entity_t,
402    _member: LivelinessToken,
403    // admin space: index is the admin_keyexpr
404    // value is the JSon string to return to queries.
405    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
406}
407
408// An reference used in admin space to point to a struct (DdsEntity or Route) stored in another map
409#[derive(Debug)]
410enum AdminRef {
411    Config,
412    Version,
413}
414
415impl ROS2PluginRuntime {
416    async fn run(&mut self) {
417        // Subscribe to all liveliness info from other ROS2 plugins
418        let ke_liveliness_all = keformat!(
419            ke_liveliness_all::formatter(),
420            zenoh_id = "*",
421            remaining = "**"
422        )
423        .unwrap();
424        let liveliness_subscriber = self
425            .zsession
426            .liveliness()
427            .declare_subscriber(ke_liveliness_all)
428            .querying()
429            .with(flume::unbounded())
430            .await
431            .expect("Failed to create Liveliness Subscriber");
432
433        // declare admin space queryable
434        let admin_prefix = keformat!(
435            ke_admin_prefix::formatter(),
436            zenoh_id = &self.zsession.zid().into_keyexpr()
437        )
438        .unwrap();
439        let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
440        tracing::debug!("Declare admin space on {}", admin_keyexpr_expr);
441        let admin_queryable = self
442            .zsession
443            .declare_queryable(admin_keyexpr_expr)
444            .await
445            .expect("Failed to create AdminSpace queryable");
446
447        // add plugin's config and version in admin space
448        self.admin_space.insert(
449            &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
450            AdminRef::Config,
451        );
452        self.admin_space.insert(
453            &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
454            AdminRef::Version,
455        );
456
457        // Create and start the RosDiscoveryInfoMgr (managing ros_discovery_info topic)
458        let ros_discovery_mgr = Arc::new(
459            RosDiscoveryInfoMgr::new(
460                self.participant,
461                &self.config.namespace,
462                &self.config.nodename,
463            )
464            .expect("Failed to create RosDiscoveryInfoMgr"),
465        );
466        ros_discovery_mgr.run().await;
467
468        // Create and start DiscoveryManager
469        let (tx, discovery_rcv): (Sender<ROS2DiscoveryEvent>, Receiver<ROS2DiscoveryEvent>) =
470            unbounded();
471        let mut discovery_mgr = DiscoveryMgr::create(self.participant, ros_discovery_mgr.clone());
472        discovery_mgr.run(tx).await;
473
474        // Create RoutesManager
475        let mut routes_mgr = RoutesMgr::new(
476            self.config.clone(),
477            self.zsession.clone(),
478            self.participant,
479            discovery_mgr.discovered_entities.clone(),
480            ros_discovery_mgr,
481            admin_prefix.clone(),
482        );
483
484        loop {
485            select!(
486                evt = discovery_rcv.recv_async() => {
487                    match evt {
488                        Ok(evt) => {
489                            if self.is_allowed(&evt) {
490                                tracing::info!("{evt} - Allowed");
491                                // pass ROS2DiscoveryEvent to RoutesMgr
492                                if let Err(e) = routes_mgr.on_ros_discovery_event(evt).await {
493                                    tracing::warn!("Error updating route: {e}");
494                                }
495                            } else {
496                                tracing::debug!("{evt} - Denied per config");
497                            }
498                        }
499                        Err(e) => tracing::error!("Internal Error: received from DiscoveryMgr: {e}")
500                    }
501                },
502
503                liveliness_event = liveliness_subscriber.recv_async() => {
504                    match liveliness_event
505                    {
506                        Ok(evt) => {
507                            let ke = evt.key_expr().as_keyexpr();
508                            if let Ok(parsed) = ke_liveliness_all::parse(ke) {
509                                let zenoh_id = parsed.zenoh_id();
510                                if zenoh_id == &*self.zsession.zid().into_keyexpr() {
511                                    // ignore own announcements
512                                    continue;
513                                }
514                                match (parsed.remaining(), evt.kind())  {
515                                    // New remote bridge detected
516                                    (None, SampleKind::Put) => {
517                                        tracing::info!("New ROS 2 bridge detected: {}", zenoh_id);
518                                    }
519                                    // New remote bridge left
520                                    (None, SampleKind::Delete) => tracing::info!("Remote ROS 2 bridge left: {}", zenoh_id),
521                                    // the liveliness token corresponds to a ROS2 announcement
522                                    (Some(remaining), _) => {
523                                        // parse it and pass ROS2AnnouncementEvent to RoutesMgr
524                                        match self.parse_announcement_event(ke, &remaining.as_str()[..3], evt.kind()) {
525                                            Ok(evt) => {
526                                                if self.is_announcement_allowed(&evt) {
527                                                    tracing::info!("Remote bridge {zenoh_id} {evt} - Allowed");
528                                                    routes_mgr.on_ros_announcement_event(evt).await
529                                                        .unwrap_or_else(|e| tracing::warn!("Error treating announcement event: {e}"));
530                                                } else {
531                                                    tracing::debug!("Remote bridge {zenoh_id} {evt} - Matching entity denied per config");
532                                                }
533                                            },
534                                            Err(e) =>
535                                                tracing::warn!("Received unexpected liveliness key expression '{ke}': {e}")
536                                        }
537                                    }
538                                }
539                            } else {
540                                tracing::warn!("Received unexpected liveliness key expression '{ke}'");
541                            }
542                        },
543                        Err(e) => tracing::warn!("Error receiving liveliness event: {e}")
544                    }
545                },
546
547                get_request = admin_queryable.recv_async() => {
548                    if let Ok(query) = get_request {
549                        self.treat_admin_query(&query).await;
550                        // pass query to discovery_mgr
551                        discovery_mgr.treat_admin_query(&query, &admin_prefix);
552                        // pass query to discovery_mgr
553                        routes_mgr.treat_admin_query(&query).await;
554                    } else {
555                        tracing::warn!("AdminSpace queryable was closed!");
556                    }
557                }
558            )
559        }
560    }
561
562    fn parse_announcement_event(
563        &self,
564        liveliness_ke: &keyexpr,
565        iface_kind: &str,
566        sample_kind: SampleKind,
567    ) -> Result<ROS2AnnouncementEvent, String> {
568        use ROS2AnnouncementEvent::*;
569        tracing::debug!("Received liveliness event: {sample_kind} on {liveliness_ke}");
570        match (iface_kind, sample_kind) {
571            ("MP/", SampleKind::Put) => parse_ke_liveliness_pub(liveliness_ke)
572                .map_err(|e| format!("Received invalid liveliness token: {e}"))
573                .map(
574                    |(zenoh_id, zenoh_key_expr, ros2_type, keyless, writer_qos)| AnnouncedMsgPub {
575                        zenoh_id,
576                        zenoh_key_expr,
577                        ros2_type,
578                        keyless,
579                        writer_qos,
580                    },
581                ),
582            ("MP/", SampleKind::Delete) => parse_ke_liveliness_pub(liveliness_ke)
583                .map_err(|e| format!("Received invalid liveliness token: {e}"))
584                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgPub {
585                    zenoh_id,
586                    zenoh_key_expr,
587                }),
588            ("MS/", SampleKind::Put) => parse_ke_liveliness_sub(liveliness_ke)
589                .map_err(|e| format!("Received invalid liveliness token: {e}"))
590                .map(
591                    |(zenoh_id, zenoh_key_expr, ros2_type, keyless, reader_qos)| AnnouncedMsgSub {
592                        zenoh_id,
593                        zenoh_key_expr,
594                        ros2_type,
595                        keyless,
596                        reader_qos,
597                    },
598                ),
599            ("MS/", SampleKind::Delete) => parse_ke_liveliness_sub(liveliness_ke)
600                .map_err(|e| format!("Received invalid liveliness token: {e}"))
601                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgSub {
602                    zenoh_id,
603                    zenoh_key_expr,
604                }),
605            ("SS/", SampleKind::Put) => parse_ke_liveliness_service_srv(liveliness_ke)
606                .map_err(|e| format!("Received invalid liveliness token: {e}"))
607                .map(
608                    |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceSrv {
609                        zenoh_id,
610                        zenoh_key_expr,
611                        ros2_type,
612                    },
613                ),
614            ("SS/", SampleKind::Delete) => parse_ke_liveliness_service_srv(liveliness_ke)
615                .map_err(|e| format!("Received invalid liveliness token: {e}"))
616                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceSrv {
617                    zenoh_id,
618                    zenoh_key_expr,
619                }),
620            ("SC/", SampleKind::Put) => parse_ke_liveliness_service_cli(liveliness_ke)
621                .map_err(|e| format!("Received invalid liveliness token: {e}"))
622                .map(
623                    |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceCli {
624                        zenoh_id,
625                        zenoh_key_expr,
626                        ros2_type,
627                    },
628                ),
629            ("SC/", SampleKind::Delete) => parse_ke_liveliness_service_cli(liveliness_ke)
630                .map_err(|e| format!("Received invalid liveliness token: {e}"))
631                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceCli {
632                    zenoh_id,
633                    zenoh_key_expr,
634                }),
635            ("AS/", SampleKind::Put) => parse_ke_liveliness_action_srv(liveliness_ke)
636                .map_err(|e| format!("Received invalid liveliness token: {e}"))
637                .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionSrv {
638                    zenoh_id,
639                    zenoh_key_expr,
640                    ros2_type,
641                }),
642            ("AS/", SampleKind::Delete) => parse_ke_liveliness_action_srv(liveliness_ke)
643                .map_err(|e| format!("Received invalid liveliness token: {e}"))
644                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionSrv {
645                    zenoh_id,
646                    zenoh_key_expr,
647                }),
648            ("AC/", SampleKind::Put) => parse_ke_liveliness_action_cli(liveliness_ke)
649                .map_err(|e| format!("Received invalid liveliness token: {e}"))
650                .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionCli {
651                    zenoh_id,
652                    zenoh_key_expr,
653                    ros2_type,
654                }),
655            ("AC/", SampleKind::Delete) => parse_ke_liveliness_action_cli(liveliness_ke)
656                .map_err(|e| format!("Received invalid liveliness token: {e}"))
657                .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionCli {
658                    zenoh_id,
659                    zenoh_key_expr,
660                }),
661            _ => Err(format!("invalid ROS2 interface kind: {iface_kind}")),
662        }
663    }
664
665    fn is_allowed(&self, evt: &ROS2DiscoveryEvent) -> bool {
666        if let Some(allowance) = &self.config.allowance {
667            use ROS2DiscoveryEvent::*;
668            match evt {
669                DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => {
670                    allowance.is_publisher_allowed(&iface.name)
671                }
672                DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => {
673                    allowance.is_subscriber_allowed(&iface.name)
674                }
675                DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => {
676                    allowance.is_service_srv_allowed(&iface.name)
677                }
678                DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => {
679                    allowance.is_service_cli_allowed(&iface.name)
680                }
681                DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => {
682                    allowance.is_action_srv_allowed(&iface.name)
683                }
684                DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => {
685                    allowance.is_action_cli_allowed(&iface.name)
686                }
687            }
688        } else {
689            // no allow/deny configured => allow all
690            true
691        }
692    }
693
694    // Check if a remote announcement by another bridge is allowed, depending on the matching entity allowance in config.
695    // E.g. a remote announcement of a Publisher on /abc is allowed only if a Subscriber on /abc is allowed in the local config.
696    fn is_announcement_allowed(&self, evt: &ROS2AnnouncementEvent) -> bool {
697        if let Some(allowance) = &self.config.allowance {
698            use ROS2AnnouncementEvent::*;
699            match evt {
700                AnnouncedMsgPub { zenoh_key_expr, .. } | RetiredMsgPub { zenoh_key_expr, .. } => {
701                    allowance
702                        .is_subscriber_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
703                }
704                AnnouncedMsgSub { zenoh_key_expr, .. } | RetiredMsgSub { zenoh_key_expr, .. } => {
705                    allowance
706                        .is_publisher_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
707                }
708                AnnouncedServiceSrv { zenoh_key_expr, .. }
709                | RetiredServiceSrv { zenoh_key_expr, .. } => allowance
710                    .is_service_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
711                AnnouncedServiceCli { zenoh_key_expr, .. }
712                | RetiredServiceCli { zenoh_key_expr, .. } => allowance
713                    .is_service_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
714                AnnouncedActionSrv { zenoh_key_expr, .. }
715                | RetiredActionSrv { zenoh_key_expr, .. } => allowance
716                    .is_action_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
717                AnnouncedActionCli { zenoh_key_expr, .. }
718                | RetiredActionCli { zenoh_key_expr, .. } => allowance
719                    .is_action_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
720            }
721        } else {
722            // no allow/deny configured => allow all
723            true
724        }
725    }
726
727    async fn treat_admin_query(&self, query: &Query) {
728        let query_ke = query.key_expr();
729        if query_ke.is_wild() {
730            // iterate over all admin space to find matching keys and reply for each
731            for (ke, admin_ref) in self.admin_space.iter() {
732                if query_ke.intersects(ke) {
733                    self.send_admin_reply(query, ke, admin_ref).await;
734                }
735            }
736        } else {
737            // sub_ke correspond to 1 key - just get it and reply
738            let own_ke: OwnedKeyExpr = query_ke.to_owned().into();
739            if let Some(admin_ref) = self.admin_space.get(&own_ke) {
740                self.send_admin_reply(query, &own_ke, admin_ref).await;
741            }
742        }
743    }
744
745    async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, admin_ref: &AdminRef) {
746        let z_bytes: ZBytes = match admin_ref {
747            AdminRef::Version => match serde_json::to_value(ROS2Plugin::PLUGIN_LONG_VERSION) {
748                Ok(v) => match serde_json::to_vec(&v) {
749                    Ok(bytes) => ZBytes::from(bytes),
750                    Err(e) => {
751                        tracing::warn!("Error transforming JSON to ZBytes: {}", e);
752                        return;
753                    }
754                },
755                Err(e) => {
756                    tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
757                    return;
758                }
759            },
760            AdminRef::Config => match serde_json::to_value(&*self.config) {
761                Ok(v) => match serde_json::to_vec(&v) {
762                    Ok(bytes) => ZBytes::from(bytes),
763                    Err(e) => {
764                        tracing::warn!("Error transforming JSON to ZBytes: {}", e);
765                        return;
766                    }
767                },
768                Err(e) => {
769                    tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
770                    return;
771                }
772            },
773        };
774        if let Err(e) = query
775            .reply(key_expr.to_owned(), z_bytes)
776            .encoding(Encoding::APPLICATION_JSON)
777            .await
778        {
779            tracing::warn!("Error replying to admin query {:?}: {}", query, e);
780        }
781    }
782}
783
784//TODO replace when stable https://github.com/rust-lang/rust/issues/65816
785#[inline]
786pub fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
787    let mut me = ManuallyDrop::new(v);
788    (me.as_mut_ptr(), me.len(), me.capacity())
789}
790
791struct ChannelEvent {
792    tx: Sender<()>,
793}
794
795#[async_trait]
796impl Timed for ChannelEvent {
797    async fn run(&mut self) {
798        if self.tx.send(()).is_err() {
799            tracing::warn!("Error sending periodic timer notification on channel");
800        };
801    }
802}
803
804pub(crate) fn serialize_option_as_bool<S, T>(opt: &Option<T>, s: S) -> Result<S::Ok, S::Error>
805where
806    S: Serializer,
807{
808    s.serialize_bool(opt.is_some())
809}