1#![allow(deprecated)]
15
16use std::{
17 collections::HashMap,
18 env,
19 future::Future,
20 mem::ManuallyDrop,
21 sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26
27use async_trait::async_trait;
28use cyclors::*;
29use events::ROS2AnnouncementEvent;
30use flume::{unbounded, Receiver, Sender};
31use futures::select;
32use serde::Serializer;
33use tokio::task::JoinHandle;
34use zenoh::{
35 bytes::{Encoding, ZBytes},
36 internal::{
37 plugins::{RunningPlugin, RunningPluginTrait, ZenohPlugin},
38 runtime::DynamicRuntime,
39 zerror, Timed,
40 },
41 key_expr::{
42 format::{kedefine, keformat},
43 keyexpr, OwnedKeyExpr,
44 },
45 liveliness::LivelinessToken,
46 query::Query,
47 sample::SampleKind,
48 Result as ZResult, Session,
49};
50use zenoh_ext::SubscriberBuilderExt;
51use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
52
53pub mod config;
54mod dds_discovery;
55mod dds_types;
56mod dds_utils;
57mod discovered_entities;
58mod discovery_mgr;
59mod events;
60mod gid;
61mod liveliness_mgt;
62mod node_info;
63mod qos_helpers;
64mod ros2_utils;
65mod ros_discovery;
66mod route_action_cli;
67mod route_action_srv;
68mod route_publisher;
69mod route_service_cli;
70mod route_service_srv;
71mod route_subscriber;
72mod routes_mgr;
73use config::{Config, RosAutomaticDiscoveryRange};
74
75use crate::{
76 dds_utils::get_guid,
77 discovery_mgr::DiscoveryMgr,
78 events::ROS2DiscoveryEvent,
79 liveliness_mgt::*,
80 ros2_utils::{key_expr_to_ros2_name, ros_distro_is_less_than},
81 ros_discovery::RosDiscoveryInfoMgr,
82 routes_mgr::RoutesMgr,
83};
84
lazy_static::lazy_static! {
    // Number of worker threads given to `TOKIO_RUNTIME`. Initialised with the default from
    // the `config` module and overwritten by `ROS2Plugin::start` with the configured value.
    static ref WORK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_WORK_THREAD_NUM);
    // Maximum number of blocking threads given to `TOKIO_RUNTIME`. Same life cycle as
    // `WORK_THREAD_NUM`.
    static ref MAX_BLOCK_THREAD_NUM: AtomicUsize = AtomicUsize::new(config::DEFAULT_MAX_BLOCK_THREAD_NUM);
    // Multi-threaded Tokio runtime used by `spawn_runtime` when the caller is not already
    // inside a Tokio runtime. As a `lazy_static` it is built on first access, so the two
    // counters above are read at that moment; later changes to them have no effect on it.
    // Panics on first access if the runtime cannot be built.
    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(WORK_THREAD_NUM.load(Ordering::SeqCst))
        .max_blocking_threads(MAX_BLOCK_THREAD_NUM.load(Ordering::SeqCst))
        .enable_all()
        .build()
        .expect("Unable to create runtime");
}
96#[inline(always)]
97pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
98where
99 F: Future + Send + 'static,
100 F::Output: Send + 'static,
101{
102 match tokio::runtime::Handle::try_current() {
104 Ok(rt) => {
105 rt.spawn(task)
107 }
108 Err(_) => {
109 TOKIO_RUNTIME.spawn(task)
111 }
112 }
113}
114
lazy_static::lazy_static!(
    // `true` when the `Z_LOG_PAYLOAD` environment variable can be read (i.e. it is set to a
    // valid Unicode value, whatever that value is).
    static ref LOG_PAYLOAD: bool = std::env::var("Z_LOG_PAYLOAD").is_ok();

    // SAFETY (for the three statics below): `from_str_unchecked` skips key expression
    // validation; each argument is a constant literal.
    // NOTE(review): the literals are assumed to be valid canonical key expressions - confirm
    // against the zenoh key expression rules.

    // Key expression made of the single-chunk wildcard `*`.
    static ref KE_ANY_1_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("*") };
    // Key expression made of the multi-chunk wildcard `**`.
    static ref KE_ANY_N_SEGMENT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };

    // Key expression `@ros2_pub_cache` (not referenced in the visible part of this file;
    // presumably a prefix used by the publication caches of the routes - TODO confirm).
    static ref KE_PREFIX_PUB_CACHE: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@ros2_pub_cache") };
);
125
kedefine!(
    // Format `<plugin_status_key>/__version__`, where `plugin_status_key` may span several
    // chunks (`**`).
    pub ke_admin_version: "${plugin_status_key:**}/__version__",

    // Format `@/<zenoh_id>/ros2/`: prefix of the admin space served by this plugin
    // (see `ROS2PluginRuntime::run`, which formats it with the session's Zenoh id).
    pub ke_admin_prefix: "@/${zenoh_id:*}/ros2/",
);
133
// CycloneDDS XML configuration fragment selecting the 127.0.0.1 network interface only.
// It ends with a ',' because `run` prepends it to the existing `CYCLONEDDS_URI` value.
const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#"<CycloneDDS><Domain><General>
 <Interfaces><NetworkInterface address="127.0.0.1"/></Interfaces>
 </General></Domain></CycloneDDS>,"#;

// CycloneDDS XML configuration fragment enabling shared memory. Like the fragment above it
// ends with a ',' because `run` prepends it to the existing `CYCLONEDDS_URI` value.
// Only compiled with the `dds_shm` feature.
#[cfg(feature = "dds_shm")]
const CYCLONEDDS_CONFIG_ENABLE_SHM: &str = r#"<CycloneDDS><Domain><SharedMemory><Enable>true</Enable></SharedMemory></Domain></CycloneDDS>,"#;

// Interval, in milliseconds, for polling ROS discovery info.
// NOTE(review): not referenced in the visible part of this file - presumably used by the
// `ros_discovery` module; confirm.
const ROS_DISCOVERY_INFO_POLL_INTERVAL_MS: u64 = 100;
// Interval, in milliseconds, for pushing ROS discovery info.
// NOTE(review): not referenced in the visible part of this file - presumably used by the
// `ros_discovery` module; confirm.
const ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS: u64 = 100;
148
149#[cfg(feature = "dynamic_plugin")]
150zenoh_plugin_trait::declare_plugin!(ROS2Plugin);
151
152#[allow(clippy::upper_case_acronyms)]
153pub struct ROS2Plugin;
154
155impl ZenohPlugin for ROS2Plugin {}
156impl Plugin for ROS2Plugin {
157 type StartArgs = DynamicRuntime;
158 type Instance = RunningPlugin;
159
160 const PLUGIN_VERSION: &'static str = plugin_version!();
161 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
162 const DEFAULT_NAME: &'static str = "ros2dds";
163
164 fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<RunningPlugin> {
165 zenoh::try_init_log_from_env();
169
170 let runtime_conf = runtime.get_config();
171 let plugin_conf = runtime_conf
172 .get_plugin_config(name)
173 .map_err(|_| zerror!("Plugin `{}`: missing config", name))?;
174 let config: Config = serde_json::from_value(plugin_conf.clone())
175 .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
176 WORK_THREAD_NUM.store(config.work_thread_num, Ordering::SeqCst);
177 MAX_BLOCK_THREAD_NUM.store(config.max_block_thread_num, Ordering::SeqCst);
178
179 spawn_runtime(run(runtime.clone(), config));
180
181 Ok(Box::new(ROS2Plugin))
182 }
183}
184impl PluginControl for ROS2Plugin {}
185impl RunningPluginTrait for ROS2Plugin {}
186
187fn create_cyclonedds_config(
188 ros_automatic_discovery_range: RosAutomaticDiscoveryRange,
189 ros_static_peers: Vec<String>,
190) -> String {
191 let mut config = String::new();
192 let add_localhost_as_static_peer: bool;
194 let add_static_peers: bool;
195 let disable_multicast: bool;
196 match ros_automatic_discovery_range {
197 RosAutomaticDiscoveryRange::Subnet => {
198 add_localhost_as_static_peer = false;
199 add_static_peers = true;
200 disable_multicast = false;
201 }
202 RosAutomaticDiscoveryRange::SystemDefault => {
203 add_localhost_as_static_peer = false;
204 add_static_peers = false;
205 disable_multicast = false;
206 }
207 RosAutomaticDiscoveryRange::Localhost => {
208 add_localhost_as_static_peer = true;
209 add_static_peers = true;
210 disable_multicast = true;
211 }
212 RosAutomaticDiscoveryRange::Off => {
213 add_localhost_as_static_peer = false;
214 add_static_peers = false;
215 disable_multicast = true;
216 }
217 };
218 if add_localhost_as_static_peer || add_static_peers || disable_multicast {
219 config += "<CycloneDDS><Domain>";
220
221 if disable_multicast {
222 config += "<General><AllowMulticast>false</AllowMulticast></General>";
223 }
224
225 let discovery_off = disable_multicast && !add_localhost_as_static_peer && !add_static_peers;
226 if discovery_off {
227 config += "<Discovery><ParticipantIndex>none</ParticipantIndex>";
228 config += &format!("<Tag>ros_discovery_off_{}</Tag>", std::process::id());
229 } else {
230 config += "<Discovery><ParticipantIndex>auto</ParticipantIndex>";
231 config += "<MaxAutoParticipantIndex>32</MaxAutoParticipantIndex>";
232 }
233
234 if (add_static_peers && !ros_static_peers.is_empty()) || add_localhost_as_static_peer {
235 config += "<Peers>";
236 if add_localhost_as_static_peer {
237 config += "<Peer address=\"localhost\"/>";
238 }
239 for peer in ros_static_peers {
240 config += "<Peer address=\"";
241 config += &peer;
242 config += "\"/>";
243 }
244 config += "</Peers>";
245 }
246
247 config += "</Discovery></Domain></CycloneDDS>,";
248 }
249 config
250}
251
252pub async fn run(runtime: DynamicRuntime, config: Config) {
253 zenoh::try_init_log_from_env();
257 tracing::debug!("ROS2 plugin {}", ROS2Plugin::PLUGIN_VERSION);
258 tracing::info!("ROS2 plugin {config:?}");
259
260 if !regex::Regex::new("/[A-Za-z0-9_/]*")
262 .unwrap()
263 .is_match(&config.namespace)
264 {
265 tracing::error!(
266 r#"Configuration error: invalid namespace "{}" must contain only alphanumeric, '_' or '/' characters and start with '/'"#,
267 config.namespace
268 );
269 return;
270 }
271 if !regex::Regex::new("[A-Za-z0-9_]+")
272 .unwrap()
273 .is_match(&config.nodename)
274 {
275 tracing::error!(
276 r#"Configuration error: invalid nodename "{}" must contain only alphanumeric or '_' characters"#,
277 config.nodename
278 );
279 return;
280 }
281
282 let zsession = match zenoh::session::init(runtime).await {
284 Ok(session) => Arc::new(session),
285 Err(e) => {
286 tracing::error!("Unable to init zenoh session for DDS plugin : {:?}", e);
287 return;
288 }
289 };
290
291 let ke_liveliness = keformat!(
293 ke_liveliness_plugin::formatter(),
294 zenoh_id = zsession.zid().into_keyexpr()
295 )
296 .unwrap();
297 let member = match zsession.liveliness().declare_token(ke_liveliness).await {
298 Ok(member) => member,
299 Err(e) => {
300 tracing::error!(
301 "Unable to declare liveliness token for DDS plugin : {:?}",
302 e
303 );
304 return;
305 }
306 };
307
308 if ros_distro_is_less_than("iron") {
311 if config.ros_automatic_discovery_range.is_some() {
312 tracing::warn!("ROS_AUTOMATIC_DISCOVERY_RANGE will be ignored since it's not supported before ROS 2 Iron");
313 }
314 if config.ros_static_peers.is_some() {
315 tracing::warn!(
316 "ROS_STATIC_PEERS will be ignored since it's not supported before ROS 2 Iron"
317 );
318 }
319 if config.ros_localhost_only {
321 env::set_var(
322 "CYCLONEDDS_URI",
323 format!(
324 "{}{}",
325 CYCLONEDDS_CONFIG_LOCALHOST_ONLY,
326 env::var("CYCLONEDDS_URI").unwrap_or_default()
327 ),
328 );
329 }
330 } else {
331 let (ros_automatic_discovery_range, ros_static_peers) = if config.ros_localhost_only {
332 tracing::warn!("ROS_LOCALHOST_ONLY is deprecated but still honored if it is enabled. Use ROS_AUTOMATIC_DISCOVERY_RANGE and ROS_STATIC_PEERS instead.");
335 tracing::warn!("'localhost_only' is enabled, 'automatic_discovery_range' and 'static_peers' will be ignored.");
336 (Some(RosAutomaticDiscoveryRange::Localhost), None)
337 } else {
338 (
339 config.ros_automatic_discovery_range,
340 config.ros_static_peers.clone(),
341 )
342 };
343 env::set_var(
344 "CYCLONEDDS_URI",
345 format!(
346 "{}{}",
347 create_cyclonedds_config(
348 ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet),
349 ros_static_peers.unwrap_or(Vec::new())
350 ),
351 env::var("CYCLONEDDS_URI").unwrap_or_default()
352 ),
353 );
354 }
355
356 #[cfg(feature = "dds_shm")]
358 {
359 if config.shm_enabled {
360 env::set_var(
361 "CYCLONEDDS_URI",
362 format!(
363 "{}{}",
364 CYCLONEDDS_CONFIG_ENABLE_SHM,
365 env::var("CYCLONEDDS_URI").unwrap_or_default()
366 ),
367 );
368 }
369 }
370
371 tracing::debug!(
373 "Create DDS Participant on domain {} with CYCLONEDDS_URI='{}'",
374 config.domain,
375 env::var("CYCLONEDDS_URI").unwrap_or_default()
376 );
377 let participant =
378 unsafe { dds_create_participant(config.domain, std::ptr::null(), std::ptr::null()) };
379 tracing::debug!(
380 "ROS2 plugin {} using DDS Participant {} created",
381 zsession.zid(),
382 get_guid(&participant).unwrap()
383 );
384
385 let mut ros2_plugin = ROS2PluginRuntime {
386 config: Arc::new(config),
387 zsession,
388 participant,
389 _member: member,
390 admin_space: HashMap::<OwnedKeyExpr, AdminRef>::new(),
391 };
392
393 ros2_plugin.run().await;
394}
395
/// State of a running ROS 2 plugin; built by the free function `run`, which then drives its
/// event loop.
pub struct ROS2PluginRuntime {
    // The plugin configuration, shared with the routes manager.
    config: Arc<Config>,
    // The Zenoh session used for liveliness, the admin space queryable and the routes.
    zsession: Arc<Session>,
    // Handle of the DDS participant returned by `dds_create_participant`.
    participant: dds_entity_t,
    // Liveliness token of this plugin instance. It is never read; it is only stored so that
    // it lives as long as the runtime (presumably dropping it undeclares the token - confirm).
    _member: LivelinessToken,
    // Admin space entries served by this struct: key expression -> kind of content to reply.
    admin_space: HashMap<OwnedKeyExpr, AdminRef>,
}
407
/// Kind of content served for an entry of `ROS2PluginRuntime::admin_space`.
#[derive(Debug)]
enum AdminRef {
    // Replied with the plugin configuration serialized as JSON.
    Config,
    // Replied with the plugin long version serialized as JSON.
    Version,
}
414
415impl ROS2PluginRuntime {
416 async fn run(&mut self) {
417 let ke_liveliness_all = keformat!(
419 ke_liveliness_all::formatter(),
420 zenoh_id = "*",
421 remaining = "**"
422 )
423 .unwrap();
424 let liveliness_subscriber = self
425 .zsession
426 .liveliness()
427 .declare_subscriber(ke_liveliness_all)
428 .querying()
429 .with(flume::unbounded())
430 .await
431 .expect("Failed to create Liveliness Subscriber");
432
433 let admin_prefix = keformat!(
435 ke_admin_prefix::formatter(),
436 zenoh_id = &self.zsession.zid().into_keyexpr()
437 )
438 .unwrap();
439 let admin_keyexpr_expr = (&admin_prefix) / *KE_ANY_N_SEGMENT;
440 tracing::debug!("Declare admin space on {}", admin_keyexpr_expr);
441 let admin_queryable = self
442 .zsession
443 .declare_queryable(admin_keyexpr_expr)
444 .await
445 .expect("Failed to create AdminSpace queryable");
446
447 self.admin_space.insert(
449 &admin_prefix / unsafe { keyexpr::from_str_unchecked("config") },
450 AdminRef::Config,
451 );
452 self.admin_space.insert(
453 &admin_prefix / unsafe { keyexpr::from_str_unchecked("version") },
454 AdminRef::Version,
455 );
456
457 let ros_discovery_mgr = Arc::new(
459 RosDiscoveryInfoMgr::new(
460 self.participant,
461 &self.config.namespace,
462 &self.config.nodename,
463 )
464 .expect("Failed to create RosDiscoveryInfoMgr"),
465 );
466 ros_discovery_mgr.run().await;
467
468 let (tx, discovery_rcv): (Sender<ROS2DiscoveryEvent>, Receiver<ROS2DiscoveryEvent>) =
470 unbounded();
471 let mut discovery_mgr = DiscoveryMgr::create(self.participant, ros_discovery_mgr.clone());
472 discovery_mgr.run(tx).await;
473
474 let mut routes_mgr = RoutesMgr::new(
476 self.config.clone(),
477 self.zsession.clone(),
478 self.participant,
479 discovery_mgr.discovered_entities.clone(),
480 ros_discovery_mgr,
481 admin_prefix.clone(),
482 );
483
484 loop {
485 select!(
486 evt = discovery_rcv.recv_async() => {
487 match evt {
488 Ok(evt) => {
489 if self.is_allowed(&evt) {
490 tracing::info!("{evt} - Allowed");
491 if let Err(e) = routes_mgr.on_ros_discovery_event(evt).await {
493 tracing::warn!("Error updating route: {e}");
494 }
495 } else {
496 tracing::debug!("{evt} - Denied per config");
497 }
498 }
499 Err(e) => tracing::error!("Internal Error: received from DiscoveryMgr: {e}")
500 }
501 },
502
503 liveliness_event = liveliness_subscriber.recv_async() => {
504 match liveliness_event
505 {
506 Ok(evt) => {
507 let ke = evt.key_expr().as_keyexpr();
508 if let Ok(parsed) = ke_liveliness_all::parse(ke) {
509 let zenoh_id = parsed.zenoh_id();
510 if zenoh_id == &*self.zsession.zid().into_keyexpr() {
511 continue;
513 }
514 match (parsed.remaining(), evt.kind()) {
515 (None, SampleKind::Put) => {
517 tracing::info!("New ROS 2 bridge detected: {}", zenoh_id);
518 }
519 (None, SampleKind::Delete) => tracing::info!("Remote ROS 2 bridge left: {}", zenoh_id),
521 (Some(remaining), _) => {
523 match self.parse_announcement_event(ke, &remaining.as_str()[..3], evt.kind()) {
525 Ok(evt) => {
526 if self.is_announcement_allowed(&evt) {
527 tracing::info!("Remote bridge {zenoh_id} {evt} - Allowed");
528 routes_mgr.on_ros_announcement_event(evt).await
529 .unwrap_or_else(|e| tracing::warn!("Error treating announcement event: {e}"));
530 } else {
531 tracing::debug!("Remote bridge {zenoh_id} {evt} - Matching entity denied per config");
532 }
533 },
534 Err(e) =>
535 tracing::warn!("Received unexpected liveliness key expression '{ke}': {e}")
536 }
537 }
538 }
539 } else {
540 tracing::warn!("Received unexpected liveliness key expression '{ke}'");
541 }
542 },
543 Err(e) => tracing::warn!("Error receiving liveliness event: {e}")
544 }
545 },
546
547 get_request = admin_queryable.recv_async() => {
548 if let Ok(query) = get_request {
549 self.treat_admin_query(&query).await;
550 discovery_mgr.treat_admin_query(&query, &admin_prefix);
552 routes_mgr.treat_admin_query(&query).await;
554 } else {
555 tracing::warn!("AdminSpace queryable was closed!");
556 }
557 }
558 )
559 }
560 }
561
562 fn parse_announcement_event(
563 &self,
564 liveliness_ke: &keyexpr,
565 iface_kind: &str,
566 sample_kind: SampleKind,
567 ) -> Result<ROS2AnnouncementEvent, String> {
568 use ROS2AnnouncementEvent::*;
569 tracing::debug!("Received liveliness event: {sample_kind} on {liveliness_ke}");
570 match (iface_kind, sample_kind) {
571 ("MP/", SampleKind::Put) => parse_ke_liveliness_pub(liveliness_ke)
572 .map_err(|e| format!("Received invalid liveliness token: {e}"))
573 .map(
574 |(zenoh_id, zenoh_key_expr, ros2_type, keyless, writer_qos)| AnnouncedMsgPub {
575 zenoh_id,
576 zenoh_key_expr,
577 ros2_type,
578 keyless,
579 writer_qos,
580 },
581 ),
582 ("MP/", SampleKind::Delete) => parse_ke_liveliness_pub(liveliness_ke)
583 .map_err(|e| format!("Received invalid liveliness token: {e}"))
584 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgPub {
585 zenoh_id,
586 zenoh_key_expr,
587 }),
588 ("MS/", SampleKind::Put) => parse_ke_liveliness_sub(liveliness_ke)
589 .map_err(|e| format!("Received invalid liveliness token: {e}"))
590 .map(
591 |(zenoh_id, zenoh_key_expr, ros2_type, keyless, reader_qos)| AnnouncedMsgSub {
592 zenoh_id,
593 zenoh_key_expr,
594 ros2_type,
595 keyless,
596 reader_qos,
597 },
598 ),
599 ("MS/", SampleKind::Delete) => parse_ke_liveliness_sub(liveliness_ke)
600 .map_err(|e| format!("Received invalid liveliness token: {e}"))
601 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredMsgSub {
602 zenoh_id,
603 zenoh_key_expr,
604 }),
605 ("SS/", SampleKind::Put) => parse_ke_liveliness_service_srv(liveliness_ke)
606 .map_err(|e| format!("Received invalid liveliness token: {e}"))
607 .map(
608 |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceSrv {
609 zenoh_id,
610 zenoh_key_expr,
611 ros2_type,
612 },
613 ),
614 ("SS/", SampleKind::Delete) => parse_ke_liveliness_service_srv(liveliness_ke)
615 .map_err(|e| format!("Received invalid liveliness token: {e}"))
616 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceSrv {
617 zenoh_id,
618 zenoh_key_expr,
619 }),
620 ("SC/", SampleKind::Put) => parse_ke_liveliness_service_cli(liveliness_ke)
621 .map_err(|e| format!("Received invalid liveliness token: {e}"))
622 .map(
623 |(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedServiceCli {
624 zenoh_id,
625 zenoh_key_expr,
626 ros2_type,
627 },
628 ),
629 ("SC/", SampleKind::Delete) => parse_ke_liveliness_service_cli(liveliness_ke)
630 .map_err(|e| format!("Received invalid liveliness token: {e}"))
631 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredServiceCli {
632 zenoh_id,
633 zenoh_key_expr,
634 }),
635 ("AS/", SampleKind::Put) => parse_ke_liveliness_action_srv(liveliness_ke)
636 .map_err(|e| format!("Received invalid liveliness token: {e}"))
637 .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionSrv {
638 zenoh_id,
639 zenoh_key_expr,
640 ros2_type,
641 }),
642 ("AS/", SampleKind::Delete) => parse_ke_liveliness_action_srv(liveliness_ke)
643 .map_err(|e| format!("Received invalid liveliness token: {e}"))
644 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionSrv {
645 zenoh_id,
646 zenoh_key_expr,
647 }),
648 ("AC/", SampleKind::Put) => parse_ke_liveliness_action_cli(liveliness_ke)
649 .map_err(|e| format!("Received invalid liveliness token: {e}"))
650 .map(|(zenoh_id, zenoh_key_expr, ros2_type)| AnnouncedActionCli {
651 zenoh_id,
652 zenoh_key_expr,
653 ros2_type,
654 }),
655 ("AC/", SampleKind::Delete) => parse_ke_liveliness_action_cli(liveliness_ke)
656 .map_err(|e| format!("Received invalid liveliness token: {e}"))
657 .map(|(zenoh_id, zenoh_key_expr, ..)| RetiredActionCli {
658 zenoh_id,
659 zenoh_key_expr,
660 }),
661 _ => Err(format!("invalid ROS2 interface kind: {iface_kind}")),
662 }
663 }
664
665 fn is_allowed(&self, evt: &ROS2DiscoveryEvent) -> bool {
666 if let Some(allowance) = &self.config.allowance {
667 use ROS2DiscoveryEvent::*;
668 match evt {
669 DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => {
670 allowance.is_publisher_allowed(&iface.name)
671 }
672 DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => {
673 allowance.is_subscriber_allowed(&iface.name)
674 }
675 DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => {
676 allowance.is_service_srv_allowed(&iface.name)
677 }
678 DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => {
679 allowance.is_service_cli_allowed(&iface.name)
680 }
681 DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => {
682 allowance.is_action_srv_allowed(&iface.name)
683 }
684 DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => {
685 allowance.is_action_cli_allowed(&iface.name)
686 }
687 }
688 } else {
689 true
691 }
692 }
693
694 fn is_announcement_allowed(&self, evt: &ROS2AnnouncementEvent) -> bool {
697 if let Some(allowance) = &self.config.allowance {
698 use ROS2AnnouncementEvent::*;
699 match evt {
700 AnnouncedMsgPub { zenoh_key_expr, .. } | RetiredMsgPub { zenoh_key_expr, .. } => {
701 allowance
702 .is_subscriber_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
703 }
704 AnnouncedMsgSub { zenoh_key_expr, .. } | RetiredMsgSub { zenoh_key_expr, .. } => {
705 allowance
706 .is_publisher_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config))
707 }
708 AnnouncedServiceSrv { zenoh_key_expr, .. }
709 | RetiredServiceSrv { zenoh_key_expr, .. } => allowance
710 .is_service_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
711 AnnouncedServiceCli { zenoh_key_expr, .. }
712 | RetiredServiceCli { zenoh_key_expr, .. } => allowance
713 .is_service_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
714 AnnouncedActionSrv { zenoh_key_expr, .. }
715 | RetiredActionSrv { zenoh_key_expr, .. } => allowance
716 .is_action_cli_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
717 AnnouncedActionCli { zenoh_key_expr, .. }
718 | RetiredActionCli { zenoh_key_expr, .. } => allowance
719 .is_action_srv_allowed(&key_expr_to_ros2_name(zenoh_key_expr, &self.config)),
720 }
721 } else {
722 true
724 }
725 }
726
727 async fn treat_admin_query(&self, query: &Query) {
728 let query_ke = query.key_expr();
729 if query_ke.is_wild() {
730 for (ke, admin_ref) in self.admin_space.iter() {
732 if query_ke.intersects(ke) {
733 self.send_admin_reply(query, ke, admin_ref).await;
734 }
735 }
736 } else {
737 let own_ke: OwnedKeyExpr = query_ke.to_owned().into();
739 if let Some(admin_ref) = self.admin_space.get(&own_ke) {
740 self.send_admin_reply(query, &own_ke, admin_ref).await;
741 }
742 }
743 }
744
745 async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, admin_ref: &AdminRef) {
746 let z_bytes: ZBytes = match admin_ref {
747 AdminRef::Version => match serde_json::to_value(ROS2Plugin::PLUGIN_LONG_VERSION) {
748 Ok(v) => match serde_json::to_vec(&v) {
749 Ok(bytes) => ZBytes::from(bytes),
750 Err(e) => {
751 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
752 return;
753 }
754 },
755 Err(e) => {
756 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
757 return;
758 }
759 },
760 AdminRef::Config => match serde_json::to_value(&*self.config) {
761 Ok(v) => match serde_json::to_vec(&v) {
762 Ok(bytes) => ZBytes::from(bytes),
763 Err(e) => {
764 tracing::warn!("Error transforming JSON to ZBytes: {}", e);
765 return;
766 }
767 },
768 Err(e) => {
769 tracing::error!("INTERNAL ERROR serializing config as JSON: {}", e);
770 return;
771 }
772 },
773 };
774 if let Err(e) = query
775 .reply(key_expr.to_owned(), z_bytes)
776 .encoding(Encoding::APPLICATION_JSON)
777 .await
778 {
779 tracing::warn!("Error replying to admin query {:?}: {}", query, e);
780 }
781 }
782}
783
/// Decomposes a `Vec<T>` into its raw parts `(pointer, length, capacity)` without running
/// its destructor.
///
/// The buffer is not freed: the caller becomes responsible for it, e.g. by rebuilding the
/// vector with `Vec::from_raw_parts` using the three returned values.
#[inline]
pub fn vec_into_raw_parts<T>(v: Vec<T>) -> (*mut T, usize, usize) {
    // ManuallyDrop prevents the buffer from being released when `released` goes out of scope.
    let mut released = ManuallyDrop::new(v);
    let ptr = released.as_mut_ptr();
    let len = released.len();
    let capacity = released.capacity();
    (ptr, len, capacity)
}
790
791struct ChannelEvent {
792 tx: Sender<()>,
793}
794
795#[async_trait]
796impl Timed for ChannelEvent {
797 async fn run(&mut self) {
798 if self.tx.send(()).is_err() {
799 tracing::warn!("Error sending periodic timer notification on channel");
800 };
801 }
802}
803
804pub(crate) fn serialize_option_as_bool<S, T>(opt: &Option<T>, s: S) -> Result<S::Ok, S::Error>
805where
806 S: Serializer,
807{
808 s.serialize_bool(opt.is_some())
809}