1#![allow(deprecated)]
15
16use std::{
17 collections::HashMap,
18 env,
19 future::Future,
20 mem::ManuallyDrop,
21 sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26
27use async_trait::async_trait;
28use cyclors::*;
29use events::ROS2AnnouncementEvent;
30use flume::{unbounded, Receiver, Sender};
31use futures::select;
32use serde::Serializer;
33use tokio::task::JoinHandle;
34use zenoh::{
35 bytes::{Encoding, ZBytes},
36 internal::{
37 plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
38 runtime::Runtime,
39 zerror, Timed,
40 },
41 key_expr::{
42 format::{kedefine, keformat},
43 keyexpr, OwnedKeyExpr,
44 },
45 liveliness::LivelinessToken,
46 query::Query,
47 sample::SampleKind,
48 Result as ZResult, Session,
49};
50use zenoh_ext::SubscriberBuilderExt;
51use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
52
53pub mod config;
54mod dds_discovery;
55mod dds_types;
56mod dds_utils;
57mod discovered_entities;
58mod discovery_mgr;
59mod events;
60mod gid;
61mod liveliness_mgt;
62mod node_info;
63mod qos_helpers;
64mod ros2_utils;
65mod ros_discovery;
66mod route_action_cli;
67mod route_action_srv;
68mod route_publisher;
69mod route_service_cli;
70mod route_service_srv;
71mod route_subscriber;
72mod routes_mgr;
73use config::{Config, RosAutomaticDiscoveryRange};
74
75use crate::{
76 dds_utils::get_guid,
77 discovery_mgr::DiscoveryMgr,
78 events::ROS2DiscoveryEvent,
79 liveliness_mgt::*,
80 ros2_utils::{key_expr_to_ros2_name, ros_distro_is_less_than},
81 ros_discovery::RosDiscoveryInfoMgr,
82 routes_mgr::RoutesMgr,
83};
84
lazy_static::lazy_static! {
    // Number of worker threads requested for the plugin-owned Tokio runtime.
    // Starts at `config::DEFAULT_WORK_THREAD_NUM`; `ROS2Plugin::start` stores the configured value.
    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
    // Maximum number of blocking threads requested for the plugin-owned Tokio runtime.
    // Starts at `config::DEFAULT_MAX_BLOCK_THREAD_NUM`; `ROS2Plugin::start` stores the configured value.
    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
    // Multi-threaded Tokio runtime, built lazily on first access.
    // The two atomics above are read only at that moment, so values stored after the first
    // access do not change the already-built runtime.
    // Panics on first access if the runtime cannot be created.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
        .enable_all()
        .build()
        .expect("Unable to create runtime");
}
96#[inline(always)]
97pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
98where
99 F: Future + Send + 'static,
100 F::Output: Send + 'static,
101{
102 match tokio::runtime::Handle::try_current() {
104 Ok(rt) => {
105 rt.spawn(task)
107 }
108 Err(_) => {
109 TOKIO_RUNTIME.spawn(task)
111 }
112 }
113}
114
lazy_static::lazy_static!(
    // `true` when the `Z_LOG_PAYLOAD` environment variable is set to a valid Unicode value
    // (its content is not inspected).
    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();

    // Key expression `*`.
    // SAFETY (assumed, not checked at runtime): the literal is a constant expected to be a
    // valid, canonical key expression.
    static ref KE_ANY_1_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("*") };
    // Key expression `**`. Same safety assumption as above.
    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };

    // Key expression `@ros2_pub_cache`. Same safety assumption as above.
    // NOTE(review): presumably the prefix used for publication caches - confirm against its users.
    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@ros2_pub_cache") };
);
125
kedefine!(
    // Key expression format `<plugin_status_key>/__version__`, where `plugin_status_key`
    // follows the `**` pattern.
    pub ke_admin_version: "${plugin_status_key:**}/__version__",

    // Key expression format `@/<zenoh_id>/ros2/`, where `zenoh_id` follows the `*` pattern.
    // The plugin runtime formats it with the session's Zenoh ID to build its admin space prefix.
    pub ke_admin_prefix: "@/${zenoh_id:*}/ros2/",
);
133
// CycloneDDS XML configuration fragment selecting the `127.0.0.1` network interface.
// The trailing comma allows it to be prepended to an existing `CYCLONEDDS_URI` value.
const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General>
 <Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces>
 </General></Domain></CycloneDDS>,"#;

// CycloneDDS XML configuration fragment enabling shared memory.
// Only compiled with the `dds_shm` feature; the trailing comma allows it to be prepended
// to an existing `CYCLONEDDS_URI` value.
#[cfg(feature = "dds_shm")]
const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><SharedMemory><Enable>true</Enable></SharedMemory></Domain></CycloneDDS>,"#;

// Interval in milliseconds.
// NOTE(review): presumably the polling period of ROS discovery info - confirm against its users.
const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 100;
// Interval in milliseconds.
// NOTE(review): presumably the publication period of ROS discovery info - confirm against its users.
const ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS: u64 = 100;
148
// Declares `ROS2Plugin` through `declare_plugin!`, only when the `dynamic_plugin` feature is enabled.
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(ROS2Plugin);

/// The ROS 2 plugin type: a stateless unit struct that implements the Zenoh plugin traits.
#[allow(clippy::upper_case_acronyms)]
pub struct ROS2Plugin;

// Marker implementation: no items are overridden.
impl ZenohPlugin for ROS2Plugin {}
156impl Plugin for ROS2Plugin {
157 type StartArgs = Runtime;
158 type Instance = RunningPlugin;
159
160 const PLUGIN_VERSION: &'static str = plugin_version!();
161 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
162 const DEFAULT_NAME: &'static str = "ros2dds";
163
164 fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
165 zenoh::try_init_log_from_env();
169
170 let runtime_conf = runtime.config().lock();
171 let plugin_conf = runtime_conf
172 .plugin(name)
173 .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
174 let config: Config = serde_json::from_value(plugin_conf.clone())
175 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
176 WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
177 MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
178
179 spawn_runtime(run(runtime.clone(), config));
180
181 Ok(Box::new(ROS2Plugin))
182 }
183}
// Empty implementations: the trait defaults are used for both traits.
impl PluginControl for ROS2Plugin {}
impl RunningPluginTrait for ROS2Plugin {}
186
187fn create_cyclonedds_config(
188 ros_automatic_discovery_range: RosAutomaticDiscoveryRange,
189 ros_static_peers: Vec<String>,
190) -> String {
191 let mut config = String::new();
192 let add_localhost_as_static_peer: bool;
194 let add_static_peers: bool;
195 let disable_multicast: bool;
196 match ros_automatic_discovery_range {
197 RosAutomaticDiscoveryRange::Subnet => {
198 add_localhost_as_static_peer = false;
199 add_static_peers = true;
200 disable_multicast = false;
201 }
202 RosAutomaticDiscoveryRange::SystemDefault => {
203 add_localhost_as_static_peer = false;
204 add_static_peers = false;
205 disable_multicast = false;
206 }
207 RosAutomaticDiscoveryRange::Localhost => {
208 add_localhost_as_static_peer = true;
209 add_static_peers = true;
210 disable_multicast = true;
211 }
212 RosAutomaticDiscoveryRange::Off => {
213 add_localhost_as_static_peer = false;
214 add_static_peers = false;
215 disable_multicast = true;
216 }
217 };
218 if add_localhost_as_static_peer || add_static_peers || disable_multicast {
219 config += "<CycloneDDS><Domain>";
220
221 if disable_multicast {
222 config += "<General><AllowMulticast>false</AllowMulticast></General>";
223 }
224
225 let discovery_off = disable_multicast && !add_localhost_as_static_peer && !add_static_peers;
226 if discovery_off {
227 config += "<Discovery><ParticipantIndex>none</ParticipantIndex>";
228 config += &format!("<Tag>ros_discovery_off_{}</Tag>", std::process::id());
229 } else {
230 config += "<Discovery><ParticipantIndex>auto</ParticipantIndex>";
231 config += "<MaxAutoParticipantIndex>32</MaxAutoParticipantIndex>";
232 }
233
234 if (add_static_peers && !ros_static_peers.is_empty()) || add_localhost_as_static_peer {
235 config += "<Peers>";
236 if add_localhost_as_static_peer {
237 config += "<Peer address=\"localhost\"/>";
238 }
239 for peer in ros_static_peers {
240 config += "<Peer address=\"";
241 config += &peer;
242 config += "\"/>";
243 }
244 config += "</Peers>";
245 }
246
247 config += "</Discovery></Domain></CycloneDDS>,";
248 }
249 config
250}
251
252pub async fn run(runtime: Runtime, config: Config) {
253 zenoh::try_init_log_from_env();
257 tracing::debug!("ROS2 plugin {}", ROS2Plugin::PLUGIN_VERSION);
258 tracing::info!("ROS2 plugin {config:?}");
259
260 if !regex::Regex::new("/[A-Za-z0-9_/]*")
262 .unwrap()
263 .is_match(&config.namespace)
264 {
265 tracing::error!(
266 r#"Configuration error: invalid namespace "{}" must contain only alphanumeric, '_' or '/' characters and start with '/'"#,
267 config.namespace
268 );
269 return;
270 }
271 if !regex::Regex::new("[A-Za-z0-9_]+")
272 .unwrap()
273 .is_match(&config.nodename)
274 {
275 tracing::error!(
276 r#"Configuration error: invalid nodename "{}" must contain only alphanumeric or '_' characters"#,
277 config.nodename
278 );
279 return;
280 }
281
282 let zsession = match zenoh::session::init(runtime).await {
284 Ok(session) => Arc::new(session),
285 Err(e) => {
286 tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
287 return;
288 }
289 };
290
291 let ke_liveliness = keformat!(
293 ke_liveliness_plugin::formatter(),
294 zenoh_id = zsession.zid().into_keyexpr()
295 )
296 .unwrap();
297 let member = match zsession.liveliness().declare_token(ke_liveliness).await {
298 Ok(member) => member,
299 Err(e) => {
300 tracing::error!(
301 "Unable to declare liveliness token for DDS plugin : {:?}",
302 e
303 );
304 return;
305 }
306 };
307
308 if ros_distro_is_less_than("iron") {
311 if config.ros_automatic_discovery_range.is_some() {
312 tracing::warn!("ROS_AUTOMATIC_DISCOVERY_RANGE will be ignored since it's not supported before ROS 2 Iron");
313 }
314 if config.ros_static_peers.is_some() {
315 tracing::warn!(
316 "ROS_STATIC_PEERS will be ignored since it's not supported before ROS 2 Iron"
317 );
318 }
319 if config.ros_localhost_only {
321 env::set_var(
322 "CYCLONEDDS_URI",
323 format!(
324 "{}{}",
325 CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
326 env::var("CYCLONEDDS_URI").unwrap_or_default()
327 ),
328 );
329 }
330 } else {
331 let (ros_automatic_discovery_range, ros_static_peers) = if config.ros_localhost_only {
332 tracing::warn!("ROS_LOCALHOST_ONLY is deprecated but still honored if it is enabled. Use ROS_AUTOMATIC_DISCOVERY_RANGE and ROS_STATIC_PEERS instead.");
335 tracing::warn!("'localhost_only' is enabled, 'automatic_discovery_range' and 'static_peers' will be ignored.");
336 (Some(RosAutomaticDiscoveryRange::Localhost), None)
337 } else {
338 (
339 config.ros_automatic_discovery_range,
340 config.ros_static_peers.clone(),
341 )
342 };
343 env::set_var(
344 "CYCLONEDDS_URI",
345 format!(
346 "{}{}",
347 create_cyclonedds_config(
348 ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet),
349 ros_static_peers.unwrap_or(Vec::new())
350 ),
351 env::var("CYCLONEDDS_URI").unwrap_or_default()
352 ),
353 );
354 }
355
356 #[cfg(feature = "dds_shm")]
358 {
359 if config.shm_enabled {
360 env::set_var(
361 "CYCLONEDDS_URI",
362 format!(
363 "{}{}",
364 CYCLONEDDS_CONFIG_ENABLE_SHM,
365 env::var("CYCLONEDDS_URI").unwrap_or_default()
366 ),
367 );
368 }
369 }
370
371 tracing::debug!(
373 "Create DDS Participant on domain {} with CYCLONEDDS_URI='{}'",
374 config.domain,
375 env::var("CYCLONEDDS_URI").unwrap_or_default()
376 );
377 let participant =
378 unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
379 tracing::debug!(
380 "ROS2 plugin {} using DDS Participant {} created",
381 zsession.zid(),
382 get_guid(&participant).unwrap()
383 );
384
385 let mut ros2_plugin = ROS2PluginRuntime {
386 config: Arc::new(config),
387 zsession,
388 participant,
389 _member: member,
390 admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
391 };
392
393 ros2_plugin.run().await;
394}
395
/// State owned by the running plugin; built by [`run`] and driven by `ROS2PluginRuntime::run`.
pub struct ROS2PluginRuntime {
    // Plugin configuration, shared with the routes manager.
    config: Arc<Config>,
    // Zenoh session used for liveliness, the admin queryable and the routes.
    zsession: Arc<Session>,
    // Handle of the DDS participant returned by `dds_create_participant`.
    participant: dds_entity_t,
    // Liveliness token of this plugin; never read, held so it lives as long as this struct.
    _member: LivelinessToken,
    // Admin space entries served directly by this struct: key expression -> what to reply.
    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
}
407
/// Identifies which piece of information an admin space entry replies with.
#[derive(Debug)]
enum AdminRef {
    // Replies with the plugin configuration serialized as JSON.
    Config,
    // Replies with the plugin long version string serialized as JSON.
    Version,
}
414
415impl ROS2PluginRuntime {
416 async fn run(&mut self) {
417 let ke_liveliness_all = keformat!(
419 ke_liveliness_all::formatter(),
420 zenoh_id = "*",
421 remaining = "**"
422 )
423 .unwrap();
424 let liveliness_subscriber = self
425 .zsession
426 .liveliness()
427 .declare_subscriber(ke_liveliness_all)
428 .querying()
429 .with(flume::unbounded())
430 .await
431 .expect("Failed to create Liveliness Subscriber");
432
433 let admin_prefix = keformat!(
435 ke_admin_prefix::formatter(),
436 zenoh_id = &self.zsession.zid().into_keyexpr()
437 )
438 .unwrap();
439 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
440 tracing::debug!("Declare admin space on {}", admin_keyexpr_expr);
441 let admin_queryable = self
442 .zsession
443 .declare_queryable(admin_keyexpr_expr)
444 .await
445 .expect("Failed to create AdminSpace queryable");
446
447 self.admin_space.insert(
449 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
450 AdminRef::Config,
451 );
452 self.admin_space.insert(
453 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
454 AdminRef::Version,
455 );
456
457 let ros_discovery_mgr = Arc::new(
459 RosDiscoveryInfoMgr::new(
460 self.participant,
461 &self.config.namespace,
462 &self.config.nodename,
463 )
464 .expect("Failed to create RosDiscoveryInfoMgr"),
465 );
466 ros_discovery_mgr.run().await;
467
468 let (tx, discovery_rcv): (Sender<ROS2DiscoveryEvent>, Receiver<ROS2DiscoveryEvent>) =
470 unbounded();
471 let mut discovery_mgr = DiscoveryMgr::create(self.participant, ros_discovery_mgr.clone());
472 discovery_mgr.run(tx).await;
473
474 let mut routes_mgr = RoutesMgr::new(
476 self.config.clone(),
477 self.zsession.clone(),
478 self.participant,
479 discovery_mgr.discovered_entities.clone(),
480 ros_discovery_mgr,
481 admin_prefix.clone(),
482 );
483
484 loop {
485 select!(
486 evt = discovery_rcv.recv_async() => {
487 match evt {
488 Ok(evt) => {
489 if self.is_allowed(&evt) {
490 tracing::info!("{evt} - Allowed");
491 if let Err(e) = routes_mgr.on_ros_discovery_event(evt).await {
493 tracing::warn!("Error updating route: {e}");
494 }
495 } else {
496 tracing::debug!("{evt} - Denied per config");
497 }
498 }
499 Err(e) => tracing::error!("Internal Error: received from DiscoveryMgr: {e}")
500 }
501 },
502
503 liveliness_event = liveliness_subscriber.recv_async() => {
504 match liveliness_event
505 {
506 Ok(evt) => {
507 let ke = evt.key_expr().as_keyexpr();
508 if let Ok(parsed) = ke_liveliness_all::parse(ke) {
509 let zenoh_id = parsed.zenoh_id();
510 if zenoh_id == &*self.zsession.zid().into_keyexpr() {
511 continue;
513 }
514 match (parsed.remaining(), evt.kind()) {
515 (None, SampleKind::Put) => {
517 tracing::info!("New ROS 2 bridge detected: {}", zenoh_id);
518 routes_mgr.query_all_historical_publications(zenoh_id).await;
520 }
521 (None, SampleKind::Delete) => tracing::info!("Remote ROS 2 bridge left: {}", zenoh_id),
523 (Some(remaining), _) => {
525 match self.parse_announcement_event(ke, &remaining.as_str()[..3], evt.kind()) {
527 Ok(evt) => {
528 if self.is_announcement_allowed(&evt) {
529 tracing::info!("Remote bridge {zenoh_id} {evt} - Allowed");
530 routes_mgr.on_ros_announcement_event(evt).await
531 .unwrap_or_else(|e| tracing::warn!("Error treating announcement event: {e}"));
532 } else {
533 tracing::debug!("Remote bridge {zenoh_id} {evt} - Matching entity denied per config");
534 }
535 },
536 Err(e) =>
537 tracing::warn!("Received unexpected liveliness key expression '{ke}': {e}")
538 }
539 }
540 }
541 } else {
542 tracing::warn!("Received unexpected liveliness key expression '{ke}'");
543 }
544 },
545 Err(e) => tracing::warn!("Error receiving liveliness event: {e}")
546 }
547 },
548
549 get_request = admin_queryable.recv_async() => {
550 if let Ok(query) = get_request {
551 self.treat_admin_query(&query).await;
552 discovery_mgr.treat_admin_query(&query, &admin_prefix);
554 routes_mgr.treat_admin_query(&query).await;
556 } else {
557 tracing::warn!("AdminSpace queryable was closed!");
558 }
559 }
560 )
561 }
562 }
563
564 fn parse_announcement_event(
565 &self,
566 liveliness_ke: &keyexpr,
567 iface_kind: &str,
568 sample_kind: SampleKind,
569 ) -> Result<ROS2AnnouncementEvent, String> {
570 use ROS2AnnouncementEvent::*;
571 tracing::debug!("Received liveliness event: {sample_kind} on {liveliness_ke}");
572 match (iface_kind, sample_kind) {
573 ("MP/", SampleKind::Put) => parse_ke_liveliness_pub(liveliness_ke)
574 .map_err(|e| format!("Received invalid liveliness token: {e}"))
575 .map(
576 |(zenoh_id, zenoh_key_expr, ros2_type, keyless, writer_qos)| AnnouncedMsgPub {
577 zenoh_id,
578 zenoh_key_expr,
579 ros2_type,
580 keyless,
581 writer_qos,
582 },
583 ),
584 ("MP/", SampleKind::Delete) => parse_ke_liveliness_pub(liveliness_ke)
585 .map_err(|e| format!("Received invalid liveliness token: {e}"))
586 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgPub {
587 zenoh_id,
588 zenoh_key_expr,
589 }),
590 ("MS/", SampleKind::Put) => parse_ke_liveliness_sub(liveliness_ke)
591 .map_err(|e| format!("Received invalid liveliness token: {e}"))
592 .map(
593 |(zenoh_id, zenoh_key_expr, ros2_type, keyless, reader_qos)| AnnouncedMsgSub {
594 zenoh_id,
595 zenoh_key_expr,
596 ros2_type,
597 keyless,
598 reader_qos,
599 },
600 ),
601 ("MS/", SampleKind::Delete) => parse_ke_liveliness_sub(liveliness_ke)
602 .map_err(|e| format!("Received invalid liveliness token: {e}"))
603 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgSub {
604 zenoh_id,
605 zenoh_key_expr,
606 }),
607 ("SS/", SampleKind::Put) => parse_ke_liveliness_service_srv(liveliness_ke)
608 .map_err(|e| format!("Received invalid liveliness token: {e}"))
609 .map(
610 |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceSrv {
611 zenoh_id,
612 zenoh_key_expr,
613 ros2_type,
614 },
615 ),
616 ("SS/", SampleKind::Delete) => parse_ke_liveliness_service_srv(liveliness_ke)
617 .map_err(|e| format!("Received invalid liveliness token: {e}"))
618 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceSrv {
619 zenoh_id,
620 zenoh_key_expr,
621 }),
622 ("SC/", SampleKind::Put) => parse_ke_liveliness_service_cli(liveliness_ke)
623 .map_err(|e| format!("Received invalid liveliness token: {e}"))
624 .map(
625 |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceCli {
626 zenoh_id,
627 zenoh_key_expr,
628 ros2_type,
629 },
630 ),
631 ("SC/", SampleKind::Delete) => parse_ke_liveliness_service_cli(liveliness_ke)
632 .map_err(|e| format!("Received invalid liveliness token: {e}"))
633 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceCli {
634 zenoh_id,
635 zenoh_key_expr,
636 }),
637 ("AS/", SampleKind::Put) => parse_ke_liveliness_action_srv(liveliness_ke)
638 .map_err(|e| format!("Received invalid liveliness token: {e}"))
639 .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionSrv {
640 zenoh_id,
641 zenoh_key_expr,
642 ros2_type,
643 }),
644 ("AS/", SampleKind::Delete) => parse_ke_liveliness_action_srv(liveliness_ke)
645 .map_err(|e| format!("Received invalid liveliness token: {e}"))
646 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionSrv {
647 zenoh_id,
648 zenoh_key_expr,
649 }),
650 ("AC/", SampleKind::Put) => parse_ke_liveliness_action_cli(liveliness_ke)
651 .map_err(|e| format!("Received invalid liveliness token: {e}"))
652 .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionCli {
653 zenoh_id,
654 zenoh_key_expr,
655 ros2_type,
656 }),
657 ("AC/", SampleKind::Delete) => parse_ke_liveliness_action_cli(liveliness_ke)
658 .map_err(|e| format!("Received invalid liveliness token: {e}"))
659 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionCli {
660 zenoh_id,
661 zenoh_key_expr,
662 }),
663 _ => Err(format!("invalid ROS2 interface kind: {iface_kind}")),
664 }
665 }
666
667 fn is_allowed(&self, evt: &ROS2DiscoveryEvent) -> bool {
668 if let Some(allowance) = &self.config.allowance {
669 use ROS2DiscoveryEvent::*;
670 match evt {
671 DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => {
672 allowance.is_publisher_allowed(&iface.name)
673 }
674 DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => {
675 allowance.is_subscriber_allowed(&iface.name)
676 }
677 DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => {
678 allowance.is_service_srv_allowed(&iface.name)
679 }
680 DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => {
681 allowance.is_service_cli_allowed(&iface.name)
682 }
683 DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => {
684 allowance.is_action_srv_allowed(&iface.name)
685 }
686 DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => {
687 allowance.is_action_cli_allowed(&iface.name)
688 }
689 }
690 } else {
691 true
693 }
694 }
695
696 fn is_announcement_allowed(&self, evt: &ROS2AnnouncementEvent) -> bool {
699 if let Some(allowance) = &self.config.allowance {
700 use ROS2AnnouncementEvent::*;
701 match evt {
702 AnnouncedMsgPub { zenoh_key_expr, .. } | RetiredMsgPub { zenoh_key_expr, .. } => {
703 allowance
704 .is_subscriber_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
705 }
706 AnnouncedMsgSub { zenoh_key_expr, .. } | RetiredMsgSub { zenoh_key_expr, .. } => {
707 allowance
708 .is_publisher_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
709 }
710 AnnouncedServiceSrv { zenoh_key_expr, .. }
711 | RetiredServiceSrv { zenoh_key_expr, .. } => allowance
712 .is_service_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
713 AnnouncedServiceCli { zenoh_key_expr, .. }
714 | RetiredServiceCli { zenoh_key_expr, .. } => allowance
715 .is_service_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
716 AnnouncedActionSrv { zenoh_key_expr, .. }
717 | RetiredActionSrv { zenoh_key_expr, .. } => allowance
718 .is_action_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
719 AnnouncedActionCli { zenoh_key_expr, .. }
720 | RetiredActionCli { zenoh_key_expr, .. } => allowance
721 .is_action_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
722 }
723 } else {
724 true
726 }
727 }
728
729 async fn treat_admin_query(&self, query: &Query) {
730 let query_ke = query.key_expr();
731 if query_ke.is_wild() {
732 for (ke, admin_ref) in self.admin_space.iter() {
734 if query_ke.intersects(ke) {
735 self.send_admin_reply(query, ke, admin_ref).await;
736 }
737 }
738 } else {
739 let own_ke: OwnedKeyExpr = query_ke.to_owned().into();
741 if let Some(admin_ref) = self.admin_space.get(&own_ke) {
742 self.send_admin_reply(query, &own_ke, admin_ref).await;
743 }
744 }
745 }
746
747 async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, admin_ref: &AdminRef) {
748 let z_bytes: ZBytes = match admin_ref {
749 AdminRef::Version => match serde_json::to_value(ROS2Plugin::PLUGIN_LONG_VERSION) {
750 Ok(v) => match serde_json::to_vec(&v) {
751 Ok(bytes) => ZBytes::from(bytes),
752 Err(e) => {
753 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
754 return;
755 }
756 },
757 Err(e) => {
758 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
759 return;
760 }
761 },
762 AdminRef::Config => match serde_json::to_value(&*self.config) {
763 Ok(v) => match serde_json::to_vec(&v) {
764 Ok(bytes) => ZBytes::from(bytes),
765 Err(e) => {
766 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
767 return;
768 }
769 },
770 Err(e) => {
771 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
772 return;
773 }
774 },
775 };
776 if let Err(e) = query
777 .reply(key_expr.to_owned(), z_bytes)
778 .encoding(Encoding::APPLICATION_JSON)
779 .await
780 {
781 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
782 }
783 }
784}
785
/// Decomposes a `Vec<T>` into its raw parts `(pointer, length, capacity)` without freeing it.
///
/// Ownership of the allocation is transferred to the caller: the buffer is leaked unless it is
/// later released, e.g. by rebuilding the vector with `Vec::from_raw_parts` using the exact
/// same three values.
#[inline]
pub fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
    // `ManuallyDrop` prevents the vector's destructor from running at the end of this function.
    let mut leaked = ManuallyDrop::new(v);
    let ptr = leaked.as_mut_ptr();
    let len = leaked.len();
    let cap = leaked.capacity();
    (ptr, len, cap)
}
792
/// Timed event that, each time it runs, sends a unit notification on a channel.
struct ChannelEvent {
    // Sending side of the notification channel.
    tx: Sender<()>,
}
796
797#[async_trait]
798impl Timed for ChannelEvent {
799 async fn run(&mut self) {
800 if self.tx.send(()).is_err() {
801 tracing::warn!("Error sending periodic timer notification on channel");
802 };
803 }
804}
805
806pub(crate) fn serialize_option_as_bool<S, T>(opt: &Option<T>, s: S) -> Result<S::Ok, S::Error>
807where
808 S: Serializer,
809{
810 s.serialize_bool(opt.is_some())
811}