1use zenoh::{key_expr::KeyExpr, session::ZenohId, Result};
10
11use crate::{
12 entity::{
13 EndpointEntity, Entity, EntityConversionError, EntityKind, LivelinessKE, NodeEntity,
14 TopicKE, TypeHash, TypeInfo,
15 },
16 qos::QosProfile,
17};
18
19use super::KeyExprFormatter;
20
/// Segment written into a liveliness key in place of an empty component
/// (for example an empty node namespace); the parser maps it back to an
/// empty string.
pub const EMPTY_PLACEHOLDER: &str = "%";
/// Type-name segment used when an endpoint carries no type information.
pub const EMPTY_TOPIC_TYPE: &str = "EMPTY_TOPIC_TYPE";
/// Type-hash segment used when an endpoint carries no type information.
pub const EMPTY_TOPIC_HASH: &str = "EMPTY_TOPIC_HASH";

/// Zero-sized marker type carrying the [`KeyExprFormatter`] implementation
/// that produces and parses keys under the `@ros2_lv` admin space.
pub struct RmwZenohFormatter;
30
31impl KeyExprFormatter for RmwZenohFormatter {
32 const ESCAPE_CHAR: char = '%';
33 const ADMIN_SPACE: &'static str = "@ros2_lv";
34
35 fn topic_key_expr(entity: &EndpointEntity) -> Result<TopicKE> {
36 let EndpointEntity {
37 node: Some(node),
38 topic,
39 type_info,
40 ..
41 } = entity
42 else {
43 return Err(zenoh::Error::from(
44 "rmw-zenoh endpoint keys require node identity",
45 ));
46 };
47 let domain_id = node.domain_id;
48 let topic = {
49 let s = topic.as_str();
50 let s = s.strip_prefix('/').unwrap_or(s);
51 let s = s.strip_suffix('/').unwrap_or(s);
52
53 s.to_string()
62 };
63
64 let type_info =
65 type_info
66 .as_ref()
67 .map_or(format!("{EMPTY_TOPIC_TYPE}/{EMPTY_TOPIC_HASH}"), |x| {
68 let type_name = Self::demangle_name(&x.name);
69 let type_hash = Self::demangle_name(&x.hash.to_string());
70 format!("{type_name}/{type_hash}")
71 });
72
73 Ok(TopicKE::new(
74 format!("{domain_id}/{topic}/{type_info}").try_into()?,
75 ))
76 }
77
78 fn liveliness_key_expr(entity: &EndpointEntity, _zid: &ZenohId) -> Result<LivelinessKE> {
79 let EndpointEntity {
80 id,
81 node:
82 Some(NodeEntity {
83 domain_id,
84 z_id,
85 id: node_id,
86 name: node_name,
87 namespace: node_namespace,
88 enclave: _,
89 }),
90 kind,
91 topic: topic_name,
92 type_info,
93 qos,
94 } = entity
95 else {
96 return Err(zenoh::Error::from(
97 "rmw-zenoh liveliness requires node identity",
98 ));
99 };
100
101 let node_namespace = if node_namespace.is_empty() {
102 EMPTY_PLACEHOLDER.to_string()
103 } else {
104 Self::mangle_name(node_namespace)
105 };
106 let node_name = Self::mangle_name(node_name);
107
108 let topic_name = {
110 let s = topic_name.strip_suffix('/').unwrap_or(topic_name);
111 Self::mangle_name(s)
112 };
113
114 let type_info_str = type_info
115 .as_ref()
116 .map_or(format!("{EMPTY_TOPIC_TYPE}/{EMPTY_TOPIC_HASH}"), |x| {
117 format!("{}/{}", Self::mangle_name(&x.name), x.hash.to_rihs_string())
118 });
119
120 let qos_str = qos.encode();
121
122 let ke = format!(
123 "{}/{domain_id}/{z_id}/{node_id}/{id}/{kind}/{EMPTY_PLACEHOLDER}/{node_namespace}/{node_name}/{topic_name}/{type_info_str}/{qos_str}",
124 Self::ADMIN_SPACE
125 );
126
127 Ok(LivelinessKE::new(ke.try_into()?))
128 }
129
130 fn node_liveliness_key_expr(entity: &NodeEntity) -> Result<LivelinessKE> {
131 let NodeEntity {
132 domain_id,
133 z_id,
134 id,
135 name,
136 namespace,
137 enclave: _,
138 } = entity;
139
140 let namespace = if namespace.is_empty() {
141 EMPTY_PLACEHOLDER
142 } else {
143 &Self::mangle_name(namespace)
144 };
145 let name = Self::mangle_name(name);
146
147 Ok(LivelinessKE::new(
148 format!(
149 "{}/{domain_id}/{z_id}/{id}/{id}/NN/{EMPTY_PLACEHOLDER}/{namespace}/{name}",
150 Self::ADMIN_SPACE
151 )
152 .try_into()?,
153 ))
154 }
155
156 fn parse_liveliness(ke: &KeyExpr) -> Result<Entity> {
157 use EntityConversionError::*;
158
159 let mut iter = ke.split('/');
160
161 let admin = iter.next().ok_or(MissingAdminSpace)?;
163 if admin != Self::ADMIN_SPACE {
164 return Err(zenoh::Error::from(MissingAdminSpace));
165 }
166
167 let domain_id = iter
168 .next()
169 .ok_or(MissingDomainId)?
170 .parse()
171 .map_err(|_| ParsingError)?;
172 let z_id = iter
173 .next()
174 .ok_or(MissingZId)?
175 .parse()
176 .map_err(|_| ParsingError)?;
177 let node_id = iter
178 .next()
179 .ok_or(MissingNodeId)?
180 .parse()
181 .map_err(|_| ParsingError)?;
182 let entity_id = iter
183 .next()
184 .ok_or(MissingEntityId)?
185 .parse()
186 .map_err(|_| ParsingError)?;
187 let entity_kind: EntityKind = iter
188 .next()
189 .ok_or(MissingEntityKind)?
190 .parse()
191 .map_err(|_| ParsingError)?;
192
193 let enclave = match iter.next().ok_or(MissingEnclave)? {
195 EMPTY_PLACEHOLDER => String::new(),
196 x => Self::demangle_name(x),
197 };
198
199 let namespace = match iter.next().ok_or(MissingNamespace)? {
200 EMPTY_PLACEHOLDER => String::new(),
201 x => Self::demangle_name(x),
202 };
203 let node_name = Self::demangle_name(iter.next().ok_or(MissingNodeName)?);
204
205 let node = NodeEntity {
206 id: node_id,
207 domain_id,
208 z_id,
209 name: node_name,
210 namespace,
211 enclave,
212 };
213
214 Ok(match entity_kind {
215 EntityKind::Node => Entity::Node(node),
216 EntityKind::Endpoint(kind) => {
217 let topic_name = Self::demangle_name(iter.next().ok_or(MissingTopicName)?);
218 let topic_type = iter.next().ok_or(MissingTopicType)?;
219 let topic_hash = iter.next().ok_or(MissingTopicHash)?;
220
221 let type_info = match (topic_type, topic_hash) {
222 (EMPTY_TOPIC_TYPE, EMPTY_TOPIC_HASH) => None,
223 (EMPTY_TOPIC_TYPE, _) | (_, EMPTY_TOPIC_HASH) => None,
224 (topic_type, topic_hash) => {
225 let type_hash = TypeHash::from_rihs_string(topic_hash)
226 .unwrap_or(TypeHash::new(0, [0u8; 32]));
227 Some(TypeInfo {
228 name: Self::demangle_name(topic_type),
229 hash: type_hash,
230 })
231 }
232 };
233
234 let qos = QosProfile::decode(iter.next().ok_or(MissingTopicQoS)?)
235 .map_err(QosDecodeError)?;
236
237 Entity::Endpoint(EndpointEntity {
238 id: entity_id,
239 node: Some(node),
240 kind,
241 topic: topic_name,
242 type_info,
243 qos,
244 })
245 }
246 })
247 }
248
249 fn encode_qos(qos: &QosProfile, _keyless: bool) -> String {
250 qos.encode()
251 }
252
253 fn decode_qos(s: &str) -> Result<(bool, QosProfile)> {
254 let qos = QosProfile::decode(s)
255 .map_err(|e| zenoh::Error::from(format!("QoS decode error: {:?}", e)))?;
256 Ok((false, qos))
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263 use crate::entity::{EndpointEntity, EndpointKind, NodeEntity, TypeInfo};
264 use crate::qos::{QosDurability, QosHistory, QosProfile, QosReliability};
265
266 #[test]
267 fn test_mangle_demangle() {
268 assert_eq!(RmwZenohFormatter::mangle_name("/chatter"), "%chatter");
269 assert_eq!(
270 RmwZenohFormatter::mangle_name("std_msgs/msg/String"),
271 "std_msgs%msg%String"
272 );
273 assert_eq!(
274 RmwZenohFormatter::demangle_name("std_msgs%msg%String"),
275 "std_msgs/msg/String"
276 );
277 }
278
279 #[test]
280 fn test_qos_encode_decode() {
281 let qos = QosProfile::default();
282 let encoded = RmwZenohFormatter::encode_qos(&qos, false);
283
284 let (keyless, decoded) = RmwZenohFormatter::decode_qos(&encoded).unwrap();
285 assert!(!keyless);
286 assert_eq!(decoded.reliability, qos.reliability);
287 assert_eq!(decoded.durability, qos.durability);
288 }
289
290 #[test]
291 fn test_qos_reliable_transient() {
292 let qos = QosProfile {
293 reliability: QosReliability::Reliable,
294 durability: QosDurability::TransientLocal,
295 history: QosHistory::from_depth(10),
296 };
297 let encoded = RmwZenohFormatter::encode_qos(&qos, false);
298
299 let (keyless, decoded) = RmwZenohFormatter::decode_qos(&encoded).unwrap();
300 assert!(!keyless);
301 assert_eq!(decoded.reliability, QosReliability::Reliable);
302 assert_eq!(decoded.durability, QosDurability::TransientLocal);
303 }
304
305 #[test]
309 fn test_topic_key_expr_format() {
310 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
311 let node = NodeEntity::new(
312 0,
313 zid,
314 0,
315 "test_node".to_string(),
316 "/".to_string(),
317 String::new(),
318 );
319
320 let entity = EndpointEntity {
321 id: 1,
322 node: Some(node),
323 kind: EndpointKind::Publisher,
324 topic: "chatter".to_string(),
325 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
326 qos: QosProfile::default(),
327 };
328
329 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
330 let ke_str = topic_ke.as_str();
331
332 assert!(
334 ke_str.starts_with("0/"),
335 "Should start with domain ID '0/', got: {}",
336 ke_str
337 );
338 assert!(
339 ke_str.contains("/chatter/"),
340 "Should contain '/chatter/', got: {}",
341 ke_str
342 );
343 assert!(
345 ke_str.contains("std_msgs/msg/String"),
346 "Should contain type name, got: {}",
347 ke_str
348 );
349 }
350
351 #[test]
355 fn test_liveliness_key_expr_format() {
356 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
357 let node = NodeEntity::new(
358 0,
359 zid,
360 0,
361 "test_node".to_string(),
362 "/".to_string(),
363 String::new(),
364 );
365
366 let entity = EndpointEntity {
367 id: 1,
368 node: Some(node),
369 kind: EndpointKind::Publisher,
370 topic: "chatter".to_string(),
371 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
372 qos: QosProfile::default(),
373 };
374
375 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
376 let ke_str = liveliness_ke.as_str();
377
378 assert!(
380 ke_str.starts_with("@ros2_lv/"),
381 "Should start with '@ros2_lv/', got: {}",
382 ke_str
383 );
384
385 assert!(
387 ke_str.contains("/0/"),
388 "Should contain domain '/0/', got: {}",
389 ke_str
390 );
391
392 assert!(
394 ke_str.contains("/MP/"),
395 "Should contain '/MP/' for Publisher, got: {}",
396 ke_str
397 );
398
399 assert!(
401 ke_str.contains("/test_node/"),
402 "Should contain '/test_node/', got: {}",
403 ke_str
404 );
405
406 assert!(
408 ke_str.contains("/chatter/"),
409 "Should contain '/chatter/', got: {}",
410 ke_str
411 );
412
413 assert!(
415 ke_str.contains("std_msgs%msg%String"),
416 "Should contain 'std_msgs%msg%String', got: {}",
417 ke_str
418 );
419 }
420
421 #[test]
423 fn test_subscriber_liveliness_key_expr() {
424 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
425 let node = NodeEntity::new(
426 0,
427 zid,
428 0,
429 "test_node".to_string(),
430 "/".to_string(),
431 String::new(),
432 );
433
434 let entity = EndpointEntity {
435 id: 1,
436 node: Some(node),
437 kind: EndpointKind::Subscription,
438 topic: "chatter".to_string(),
439 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
440 qos: QosProfile::default(),
441 };
442
443 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
444 let ke_str = liveliness_ke.as_str();
445
446 assert!(
448 ke_str.contains("/MS/"),
449 "Should contain '/MS/' for Subscription, got: {}",
450 ke_str
451 );
452 }
453
454 #[test]
456 fn test_service_liveliness_key_expr() {
457 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
458 let node = NodeEntity::new(
459 0,
460 zid,
461 0,
462 "test_node".to_string(),
463 "/".to_string(),
464 String::new(),
465 );
466
467 let entity = EndpointEntity {
468 id: 1,
469 node: Some(node),
470 kind: EndpointKind::Service,
471 topic: "add_two_ints".to_string(),
472 type_info: Some(TypeInfo::new(
473 "example_interfaces/srv/AddTwoInts",
474 TypeHash::zero(),
475 )),
476 qos: QosProfile::default(),
477 };
478
479 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
480 let ke_str = liveliness_ke.as_str();
481
482 assert!(
484 ke_str.contains("/SS/"),
485 "Should contain '/SS/' for Service, got: {}",
486 ke_str
487 );
488 }
489
490 #[test]
492 fn test_client_liveliness_key_expr() {
493 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
494 let node = NodeEntity::new(
495 0,
496 zid,
497 0,
498 "test_node".to_string(),
499 "/".to_string(),
500 String::new(),
501 );
502
503 let entity = EndpointEntity {
504 id: 1,
505 node: Some(node),
506 kind: EndpointKind::Client,
507 topic: "add_two_ints".to_string(),
508 type_info: Some(TypeInfo::new(
509 "example_interfaces/srv/AddTwoInts",
510 TypeHash::zero(),
511 )),
512 qos: QosProfile::default(),
513 };
514
515 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
516 let ke_str = liveliness_ke.as_str();
517
518 assert!(
520 ke_str.contains("/SC/"),
521 "Should contain '/SC/' for Client, got: {}",
522 ke_str
523 );
524 }
525
526 #[test]
528 fn test_node_liveliness_key_expr() {
529 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
530 let node = NodeEntity::new(
531 0,
532 zid,
533 0,
534 "test_node".to_string(),
535 "/my_namespace".to_string(),
536 String::new(),
537 );
538
539 let liveliness_ke = RmwZenohFormatter::node_liveliness_key_expr(&node).unwrap();
540 let ke_str = liveliness_ke.as_str();
541
542 assert!(
544 ke_str.starts_with("@ros2_lv/"),
545 "Should start with '@ros2_lv/', got: {}",
546 ke_str
547 );
548
549 assert!(
551 ke_str.contains("/NN/"),
552 "Should contain '/NN/' for Node, got: {}",
553 ke_str
554 );
555
556 assert!(
558 ke_str.contains("/test_node"),
559 "Should contain '/test_node', got: {}",
560 ke_str
561 );
562 }
563
564 #[test]
575 fn test_service_topic_key_expr_preserves_slashes() {
576 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
577 let node = NodeEntity::new(
578 0,
579 zid,
580 0,
581 "talker".to_string(),
582 "/".to_string(),
583 String::new(),
584 );
585
586 let entity = EndpointEntity {
588 id: 10,
589 node: Some(node),
590 kind: EndpointKind::Service,
591 topic: "/talker/get_type_description".to_string(),
592 type_info: Some(TypeInfo::new(
593 "type_description_interfaces::srv::dds_::GetTypeDescription_",
594 TypeHash::zero(),
595 )),
596 qos: QosProfile::default(),
597 };
598
599 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
600 let ke_str = topic_ke.as_str();
601
602 assert!(
605 ke_str.starts_with("0/talker/get_type_description/"),
606 "Service topic key expr should preserve internal slashes (strip_slashes behavior), got: {}",
607 ke_str
608 );
609 assert!(
610 !ke_str.contains("%"),
611 "Service topic key expr should NOT contain mangled slashes, got: {}",
612 ke_str
613 );
614 }
615
616 #[test]
618 fn test_client_topic_key_expr_preserves_slashes() {
619 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
620 let node = NodeEntity::new(
621 0,
622 zid,
623 0,
624 "test_client".to_string(),
625 "/".to_string(),
626 String::new(),
627 );
628
629 let entity = EndpointEntity {
630 id: 11,
631 node: Some(node),
632 kind: EndpointKind::Client,
633 topic: "/my_service/sub_service/action".to_string(),
634 type_info: Some(TypeInfo::new(
635 "example_interfaces/srv/AddTwoInts",
636 TypeHash::zero(),
637 )),
638 qos: QosProfile::default(),
639 };
640
641 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
642 let ke_str = topic_ke.as_str();
643
644 assert!(
645 ke_str.starts_with("0/my_service/sub_service/action/"),
646 "Client topic key expr should preserve internal slashes, got: {}",
647 ke_str
648 );
649 }
650
651 #[test]
656 fn test_publisher_topic_key_expr_preserves_slashes() {
657 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
658 let node = NodeEntity::new(
659 0,
660 zid,
661 0,
662 "test_pub".to_string(),
663 "/".to_string(),
664 String::new(),
665 );
666
667 let entity = EndpointEntity {
668 id: 1,
669 node: Some(node),
670 kind: EndpointKind::Publisher,
671 topic: "/ns/topic".to_string(),
672 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
673 qos: QosProfile::default(),
674 };
675
676 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
677 let ke_str = topic_ke.as_str();
678
679 assert!(
682 ke_str.contains("ns/topic"),
683 "Publisher topic key expr should preserve internal slashes (strip_slashes behavior), got: {}",
684 ke_str
685 );
686 assert!(
687 !ke_str.contains("%"),
688 "Publisher topic key expr should NOT mangle slashes, got: {}",
689 ke_str
690 );
691 }
692
693 #[test]
697 fn test_subscription_topic_key_expr_preserves_slashes() {
698 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
699 let node = NodeEntity::new(
700 0,
701 zid,
702 0,
703 "test_sub".to_string(),
704 "/".to_string(),
705 String::new(),
706 );
707
708 let entity = EndpointEntity {
709 id: 2,
710 node: Some(node),
711 kind: EndpointKind::Subscription,
712 topic: "/robot/sensor/data".to_string(),
713 type_info: Some(TypeInfo::new("sensor_msgs/msg/Image", TypeHash::zero())),
714 qos: QosProfile::default(),
715 };
716
717 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
718 let ke_str = topic_ke.as_str();
719
720 assert!(
723 ke_str.contains("robot/sensor/data"),
724 "Subscription topic key expr should preserve internal slashes (strip_slashes behavior), got: {}",
725 ke_str
726 );
727 assert!(
728 !ke_str.contains("%"),
729 "Subscription topic key expr should NOT mangle slashes, got: {}",
730 ke_str
731 );
732 }
733
734 #[test]
738 fn test_action_topic_key_expr_preserves_slashes() {
739 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
740 let node = NodeEntity::new(
741 0,
742 zid,
743 0,
744 "action_server".to_string(),
745 "/".to_string(),
746 String::new(),
747 );
748
749 let entity = EndpointEntity {
750 id: 3,
751 node: Some(node),
752 kind: EndpointKind::Publisher, topic: "/fibonacci/_action/send_goal".to_string(),
754 type_info: Some(TypeInfo::new(
755 "action_tutorials_interfaces::action::dds_::Fibonacci_SendGoal_",
756 TypeHash::zero(),
757 )),
758 qos: QosProfile::default(),
759 };
760
761 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
762 let ke_str = topic_ke.as_str();
763
764 assert!(
766 ke_str.contains("fibonacci/_action/send_goal"),
767 "Action topic key expr should preserve internal slashes (strip_slashes behavior), got: {}",
768 ke_str
769 );
770 assert!(
771 !ke_str.contains("%"),
772 "Action topic key expr should NOT mangle slashes, got: {}",
773 ke_str
774 );
775 }
776
777 #[test]
779 fn test_topic_key_expr_strips_leading_trailing_slashes() {
780 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
781 let node = NodeEntity::new(
782 0,
783 zid,
784 0,
785 "test".to_string(),
786 "/".to_string(),
787 String::new(),
788 );
789
790 let entity = EndpointEntity {
792 id: 4,
793 node: Some(node),
794 kind: EndpointKind::Service,
795 topic: "/my_service/".to_string(),
796 type_info: Some(TypeInfo::new(
797 "example_interfaces/srv/Trigger",
798 TypeHash::zero(),
799 )),
800 qos: QosProfile::default(),
801 };
802
803 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
804 let ke_str = topic_ke.as_str();
805
806 assert!(
808 ke_str.starts_with("0/my_service/"),
809 "Should strip leading and trailing slashes, got: {}",
810 ke_str
811 );
812 assert!(
813 !ke_str.contains("//"),
814 "Should not have double slashes, got: {}",
815 ke_str
816 );
817 }
818
819 #[test]
821 fn test_topic_key_expr_empty_type_info() {
822 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
823 let node = NodeEntity::new(
824 0,
825 zid,
826 0,
827 "test".to_string(),
828 "/".to_string(),
829 String::new(),
830 );
831
832 let entity = EndpointEntity {
833 id: 5,
834 node: Some(node),
835 kind: EndpointKind::Publisher,
836 topic: "chatter".to_string(),
837 type_info: None, qos: QosProfile::default(),
839 };
840
841 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
842 let ke_str = topic_ke.as_str();
843
844 assert!(
846 ke_str.contains(EMPTY_TOPIC_TYPE),
847 "Should use EMPTY_TOPIC_TYPE placeholder, got: {}",
848 ke_str
849 );
850 assert!(
851 ke_str.contains(EMPTY_TOPIC_HASH),
852 "Should use EMPTY_TOPIC_HASH placeholder, got: {}",
853 ke_str
854 );
855 }
856
857 #[test]
859 fn test_topic_key_expr_type_name_demangling() {
860 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
861 let node = NodeEntity::new(
862 0,
863 zid,
864 0,
865 "test".to_string(),
866 "/".to_string(),
867 String::new(),
868 );
869
870 let entity = EndpointEntity {
871 id: 6,
872 node: Some(node),
873 kind: EndpointKind::Publisher,
874 topic: "chatter".to_string(),
875 type_info: Some(TypeInfo::new("std_msgs%msg%String", TypeHash::zero())),
877 qos: QosProfile::default(),
878 };
879
880 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
881 let ke_str = topic_ke.as_str();
882
883 assert!(
885 ke_str.contains("std_msgs/msg/String"),
886 "Type name should be demangled in topic key expr, got: {}",
887 ke_str
888 );
889 }
890
891 #[test]
898 fn test_service_liveliness_mangles_topic_name() {
899 let zid: zenoh::session::ZenohId = "9aed1ea85b72095f6dbc9ee90dabd56".parse().unwrap();
900 let node = NodeEntity::new(
901 0,
902 zid,
903 0,
904 "talker".to_string(),
905 "/".to_string(),
906 String::new(),
907 );
908
909 let entity = EndpointEntity {
910 id: 10,
911 node: Some(node),
912 kind: EndpointKind::Service,
913 topic: "/talker/get_type_description".to_string(),
914 type_info: Some(TypeInfo::new(
915 "type_description_interfaces::srv::dds_::GetTypeDescription_",
916 TypeHash::zero(),
917 )),
918 qos: QosProfile::default(),
919 };
920
921 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
922 let ke_str = liveliness_ke.as_str();
923
924 assert!(
926 ke_str.contains("%talker%get_type_description"),
927 "Service liveliness should mangle topic name, got: {}",
928 ke_str
929 );
930 }
931
932 #[test]
934 fn test_publisher_liveliness_multi_segment_namespace() {
935 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
936 let node = NodeEntity::new(
937 0,
938 zid,
939 0,
940 "sensor_node".to_string(),
941 "/robot/sensors".to_string(),
942 String::new(),
943 );
944
945 let entity = EndpointEntity {
946 id: 7,
947 node: Some(node),
948 kind: EndpointKind::Publisher,
949 topic: "/data/temperature".to_string(),
950 type_info: Some(TypeInfo::new(
951 "sensor_msgs/msg/Temperature",
952 TypeHash::zero(),
953 )),
954 qos: QosProfile::default(),
955 };
956
957 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
958 let ke_str = liveliness_ke.as_str();
959
960 assert!(
962 ke_str.contains("%robot%sensors"),
963 "Namespace should be mangled in liveliness, got: {}",
964 ke_str
965 );
966 assert!(
968 ke_str.contains("%data%temperature"),
969 "Topic should be mangled in liveliness, got: {}",
970 ke_str
971 );
972 assert!(
974 ke_str.contains("/MP/"),
975 "Should contain '/MP/', got: {}",
976 ke_str
977 );
978 }
979
980 #[test]
982 fn test_liveliness_empty_namespace() {
983 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
984 let node = NodeEntity::new(
985 0,
986 zid,
987 0,
988 "test_node".to_string(),
989 "".to_string(),
990 String::new(),
991 );
992
993 let entity = EndpointEntity {
994 id: 8,
995 node: Some(node),
996 kind: EndpointKind::Subscription,
997 topic: "chatter".to_string(),
998 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
999 qos: QosProfile::default(),
1000 };
1001
1002 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
1003 let ke_str = liveliness_ke.as_str();
1004
1005 let parts: Vec<&str> = ke_str.split('/').collect();
1008 assert_eq!(
1009 parts[6], EMPTY_PLACEHOLDER,
1010 "Enclave should be empty placeholder"
1011 );
1012 assert_eq!(
1013 parts[7], EMPTY_PLACEHOLDER,
1014 "Empty namespace should use placeholder, got: {}",
1015 ke_str
1016 );
1017 }
1018
1019 #[test]
1021 fn test_liveliness_root_namespace() {
1022 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1023 let node = NodeEntity::new(
1024 0,
1025 zid,
1026 0,
1027 "test_node".to_string(),
1028 "/".to_string(),
1029 String::new(),
1030 );
1031
1032 let entity = EndpointEntity {
1033 id: 9,
1034 node: Some(node),
1035 kind: EndpointKind::Publisher,
1036 topic: "chatter".to_string(),
1037 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
1038 qos: QosProfile::default(),
1039 };
1040
1041 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
1042 let ke_str = liveliness_ke.as_str();
1043
1044 let parts: Vec<&str> = ke_str.split('/').collect();
1046 assert_eq!(
1047 parts[7], EMPTY_PLACEHOLDER,
1048 "Root namespace should use placeholder, got: {}",
1049 ke_str
1050 );
1051 }
1052
1053 #[test]
1055 fn test_liveliness_type_info_mangling() {
1056 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1057 let node = NodeEntity::new(
1058 0,
1059 zid,
1060 0,
1061 "test".to_string(),
1062 "/".to_string(),
1063 String::new(),
1064 );
1065
1066 let entity = EndpointEntity {
1067 id: 10,
1068 node: Some(node),
1069 kind: EndpointKind::Publisher,
1070 topic: "image".to_string(),
1071 type_info: Some(TypeInfo::new(
1072 "sensor_msgs/msg/Image",
1073 TypeHash::new(1, [0x12; 32]),
1074 )),
1075 qos: QosProfile::default(),
1076 };
1077
1078 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
1079 let ke_str = liveliness_ke.as_str();
1080
1081 assert!(
1083 ke_str.contains("sensor_msgs%msg%Image"),
1084 "Type name should be mangled in liveliness, got: {}",
1085 ke_str
1086 );
1087 assert!(
1089 ke_str.contains("RIHS01_"),
1090 "Should contain RIHS01 hash, got: {}",
1091 ke_str
1092 );
1093 }
1094
1095 #[test]
1097 fn test_liveliness_qos_encoding() {
1098 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1099 let node = NodeEntity::new(
1100 0,
1101 zid,
1102 0,
1103 "test".to_string(),
1104 "/".to_string(),
1105 String::new(),
1106 );
1107
1108 let qos = QosProfile {
1109 reliability: QosReliability::Reliable,
1110 durability: QosDurability::TransientLocal,
1111 history: QosHistory::from_depth(10),
1112 };
1113
1114 let entity = EndpointEntity {
1115 id: 11,
1116 node: Some(node),
1117 kind: EndpointKind::Publisher,
1118 topic: "chatter".to_string(),
1119 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
1120 qos,
1121 };
1122
1123 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&entity, &zid).unwrap();
1124 let ke_str = liveliness_ke.as_str();
1125
1126 let parts: Vec<&str> = ke_str.split('/').collect();
1129 let qos_part = parts.last().unwrap();
1130 assert!(
1131 qos_part.contains(":"),
1132 "QoS should be encoded with colons, got: {}",
1133 qos_part
1134 );
1135 assert!(
1137 qos_part.contains("10"),
1138 "QoS should contain history depth, got: {}",
1139 qos_part
1140 );
1141 }
1142
1143 #[test]
1147 fn test_parse_liveliness_publisher_roundtrip() {
1148 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1149 let node = NodeEntity::new(
1150 0,
1151 zid,
1152 0,
1153 "test_node".to_string(),
1154 "/my_ns".to_string(),
1155 String::new(),
1156 );
1157
1158 let original = EndpointEntity {
1159 id: 12,
1160 node: Some(node),
1161 kind: EndpointKind::Publisher,
1162 topic: "/topic/name".to_string(),
1163 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
1164 qos: QosProfile::default(),
1165 };
1166
1167 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&original, &zid).unwrap();
1168 let parsed = RmwZenohFormatter::parse_liveliness(&liveliness_ke).unwrap();
1169
1170 if let Entity::Endpoint(parsed_entity) = parsed {
1171 assert_eq!(parsed_entity.id, original.id);
1172 assert_eq!(parsed_entity.kind, original.kind);
1173 assert_eq!(
1174 parsed_entity.node.as_ref().unwrap().name,
1175 original.node.as_ref().unwrap().name
1176 );
1177 assert_eq!(
1178 parsed_entity.node.as_ref().unwrap().namespace,
1179 original.node.as_ref().unwrap().namespace
1180 );
1181 assert_eq!(parsed_entity.topic, "/topic/name");
1183 } else {
1184 panic!("Expected Endpoint entity");
1185 }
1186 }
1187
1188 #[test]
1190 fn test_parse_liveliness_service_roundtrip() {
1191 let zid: zenoh::session::ZenohId = "9aed1ea85b72095f6dbc9ee90dabd56".parse().unwrap();
1192 let node = NodeEntity::new(
1193 0,
1194 zid,
1195 0,
1196 "server".to_string(),
1197 "/".to_string(),
1198 String::new(),
1199 );
1200
1201 let original = EndpointEntity {
1202 id: 13,
1203 node: Some(node),
1204 kind: EndpointKind::Service,
1205 topic: "/my/service".to_string(),
1206 type_info: Some(TypeInfo::new(
1207 "example_interfaces::srv::dds_::AddTwoInts_",
1208 TypeHash::new(1, [0xab; 32]),
1209 )),
1210 qos: QosProfile::default(),
1211 };
1212
1213 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&original, &zid).unwrap();
1214 let parsed = RmwZenohFormatter::parse_liveliness(&liveliness_ke).unwrap();
1215
1216 if let Entity::Endpoint(parsed_entity) = parsed {
1217 assert_eq!(parsed_entity.id, original.id);
1218 assert_eq!(parsed_entity.kind, EndpointKind::Service);
1219 assert_eq!(parsed_entity.topic, "/my/service");
1220 assert_eq!(
1221 parsed_entity.type_info.as_ref().unwrap().name,
1222 "example_interfaces::srv::dds_::AddTwoInts_"
1223 );
1224 } else {
1225 panic!("Expected Endpoint entity");
1226 }
1227 }
1228
1229 #[test]
1231 fn test_parse_liveliness_empty_type_info() {
1232 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1233 let node = NodeEntity::new(
1234 0,
1235 zid,
1236 0,
1237 "test".to_string(),
1238 "/".to_string(),
1239 String::new(),
1240 );
1241
1242 let original = EndpointEntity {
1243 id: 14,
1244 node: Some(node),
1245 kind: EndpointKind::Publisher,
1246 topic: "test".to_string(),
1247 type_info: None,
1248 qos: QosProfile::default(),
1249 };
1250
1251 let liveliness_ke = RmwZenohFormatter::liveliness_key_expr(&original, &zid).unwrap();
1252 let parsed = RmwZenohFormatter::parse_liveliness(&liveliness_ke).unwrap();
1253
1254 if let Entity::Endpoint(parsed_entity) = parsed {
1255 assert!(
1256 parsed_entity.type_info.is_none(),
1257 "Type info should be None for empty placeholders"
1258 );
1259 } else {
1260 panic!("Expected Endpoint entity");
1261 }
1262 }
1263
1264 #[test]
1266 fn test_parse_liveliness_node() {
1267 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1268 let node = NodeEntity::new(
1269 0,
1270 zid,
1271 15,
1272 "my_node".to_string(),
1273 "/robot/sensors".to_string(),
1274 String::new(),
1275 );
1276
1277 let liveliness_ke = RmwZenohFormatter::node_liveliness_key_expr(&node).unwrap();
1278 let parsed = RmwZenohFormatter::parse_liveliness(&liveliness_ke).unwrap();
1279
1280 if let Entity::Node(parsed_node) = parsed {
1281 assert_eq!(parsed_node.id, node.id);
1282 assert_eq!(parsed_node.name, node.name);
1283 assert_eq!(parsed_node.namespace, node.namespace);
1284 assert_eq!(parsed_node.domain_id, node.domain_id);
1285 } else {
1286 panic!("Expected Node entity");
1287 }
1288 }
1289
1290 #[test]
1294 fn test_topic_key_expr_no_slashes() {
1295 let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
1296 let node = NodeEntity::new(
1297 0,
1298 zid,
1299 0,
1300 "test".to_string(),
1301 "/".to_string(),
1302 String::new(),
1303 );
1304
1305 let entity = EndpointEntity {
1306 id: 16,
1307 node: Some(node),
1308 kind: EndpointKind::Publisher,
1309 topic: "simple_topic".to_string(),
1310 type_info: Some(TypeInfo::new("std_msgs/msg/String", TypeHash::zero())),
1311 qos: QosProfile::default(),
1312 };
1313
1314 let topic_ke = RmwZenohFormatter::topic_key_expr(&entity).unwrap();
1315 let ke_str = topic_ke.as_str();
1316
1317 assert!(
1319 ke_str.starts_with("0/simple_topic/"),
1320 "Simple topic should work, got: {}",
1321 ke_str
1322 );
1323 }
1324
/// `topic_key_expr` must return an error for a service endpoint whose topic
/// (`/a//b`) contains two consecutive slashes.
#[test]
fn test_service_topic_consecutive_slashes_rejected() {
    let session_id = "1234567890abcdef1234567890abcdef"
        .parse::<zenoh::session::ZenohId>()
        .unwrap();
    let owner = NodeEntity::new(
        0,
        session_id,
        0,
        "test".to_string(),
        "/".to_string(),
        String::new(),
    );

    let service = EndpointEntity {
        id: 17,
        node: Some(owner),
        kind: EndpointKind::Service,
        // The doubled slash is the input under test.
        topic: "/a//b".to_string(),
        type_info: Some(TypeInfo::new("std_srvs/srv/Trigger", TypeHash::zero())),
        qos: QosProfile::default(),
    };

    let was_rejected = RmwZenohFormatter::topic_key_expr(&service).is_err();
    assert!(
        was_rejected,
        "Consecutive slashes should be rejected by Zenoh KeyExpr"
    );
}
1357
/// A DDS-style type name (`std_msgs::msg::dds_::String_`) must show up
/// unchanged inside the generated topic key expression.
#[test]
fn test_topic_key_expr_dds_type_name() {
    let session_id = "1234567890abcdef1234567890abcdef"
        .parse::<zenoh::session::ZenohId>()
        .unwrap();
    let owner = NodeEntity::new(
        0,
        session_id,
        0,
        "test".to_string(),
        "/".to_string(),
        String::new(),
    );

    let dds_type = TypeInfo::new("std_msgs::msg::dds_::String_", TypeHash::zero());
    let publisher = EndpointEntity {
        id: 18,
        node: Some(owner),
        kind: EndpointKind::Publisher,
        topic: "chatter".to_string(),
        type_info: Some(dds_type),
        qos: QosProfile::default(),
    };

    let key = RmwZenohFormatter::topic_key_expr(&publisher).unwrap();
    let rendered = key.as_str();

    let keeps_type_name = rendered.contains("std_msgs::msg::dds_::String_");
    assert!(
        keeps_type_name,
        "DDS type name should be preserved, got: {}",
        rendered
    );
}
1394
/// A topic made of many path segments must appear in the topic key
/// expression with its segments intact (minus the leading slash).
#[test]
fn test_topic_key_expr_long_name() {
    let session_id = "1234567890abcdef1234567890abcdef"
        .parse::<zenoh::session::ZenohId>()
        .unwrap();
    let owner = NodeEntity::new(
        0,
        session_id,
        0,
        "test".to_string(),
        "/".to_string(),
        String::new(),
    );

    let service = EndpointEntity {
        id: 19,
        node: Some(owner),
        kind: EndpointKind::Service,
        topic: String::from("/very/long/topic/name/with/many/segments/for/testing/purposes"),
        type_info: Some(TypeInfo::new("std_srvs/srv/Trigger", TypeHash::zero())),
        qos: QosProfile::default(),
    };

    let key = RmwZenohFormatter::topic_key_expr(&service).unwrap();
    let rendered = key.as_str();

    let keeps_all_segments =
        rendered.contains("very/long/topic/name/with/many/segments/for/testing/purposes");
    assert!(
        keeps_all_segments,
        "Should handle long topic names, got: {}",
        rendered
    );
}
1428
/// The RIHS rendering of a version-1 hash must start with `RIHS01_`, be
/// 71 characters long, and parse back to the same version and value.
#[test]
fn test_type_hash_rihs_format() {
    let prefix = "RIHS01_";
    let encoded = TypeHash::new(1, [0xab; 32]).to_rihs_string();

    assert!(
        encoded.starts_with(prefix),
        "Should start with RIHS01_, got: {}",
        encoded
    );
    // 7 prefix characters followed by 64 hex digits.
    assert_eq!(
        encoded.len(),
        prefix.len() + 64,
        "RIHS string should be 71 chars (RIHS01_ + 64 hex)"
    );

    // Round-trip: decoding must recover both fields.
    let decoded = TypeHash::from_rihs_string(&encoded).unwrap();
    assert_eq!(decoded.version, 1);
    assert_eq!(decoded.value, [0xab; 32]);
}
1452
/// `TypeHash::zero()` must render as `RIHS01_` followed by 64 zero digits.
#[test]
fn test_type_hash_zero() {
    const EXPECTED: &str =
        "RIHS01_0000000000000000000000000000000000000000000000000000000000000000";

    let zero_hash = TypeHash::zero();
    let rendered = zero_hash.to_rihs_string();

    assert_eq!(rendered, EXPECTED);
}
1464}
1465
/// Kani proof harnesses for the rmw-zenoh key-expression formatter.
///
/// Compiled only when the `kani` cfg is set, so regular builds and test runs
/// never see this module.
#[cfg(kani)]
mod kani_proofs {
    use super::*;
    // `EndpointKind` is imported explicitly because the harness names it
    // directly when building the endpoint and when checking the parsed kind.
    use crate::{
        entity::{EndpointEntity, EndpointKind, NodeEntity, TypeHash, TypeInfo},
        qos::{QosDurability, QosHistory, QosProfile, QosReliability},
    };
    use zenoh::session::ZenohId;

    /// Proves that, for every domain id representable as a `u8`, formatting
    /// a publisher's liveliness key expression and parsing it back yields an
    /// endpoint entity with the same domain id and the publisher kind.
    #[kani::proof]
    #[kani::unwind(8)]
    fn liveliness_roundtrip_domain_id() {
        // Symbolic input: a `u8` keeps the explored state space small.
        let raw_domain_id: u8 = kani::any();
        // Lossless widening; `usize::from` cannot truncate.
        let domain_id = usize::from(raw_domain_id);

        let node = NodeEntity {
            domain_id,
            z_id: ZenohId::default(),
            id: 1,
            name: "kani_node".to_string(),
            namespace: "/".to_string(),
            enclave: "/".to_string(),
        };
        let entity = EndpointEntity {
            id: 1,
            node: Some(node),
            kind: EndpointKind::Publisher,
            topic: "/kani_topic".to_string(),
            type_info: Some(TypeInfo {
                name: "std_msgs/msg/String".to_string(),
                hash: TypeHash::zero(),
            }),
            qos: QosProfile {
                reliability: QosReliability::Reliable,
                durability: QosDurability::Volatile,
                history: QosHistory::KeepLast(10),
            },
        };

        // Format, then parse back.
        let ke = RmwZenohFormatter::liveliness_key_expr(&entity, &ZenohId::default())
            .expect("liveliness_key_expr");
        let parsed = RmwZenohFormatter::parse_liveliness(&ke).expect("parse_liveliness");

        // Anything other than an endpoint entity is a proof failure.
        let crate::entity::Entity::Endpoint(ep) = parsed else {
            kani::assert(false, "expected Endpoint entity");
            return;
        };

        kani::assert(
            ep.node.as_ref().unwrap().domain_id == domain_id,
            "domain_id preserved",
        );
        kani::assert(ep.kind == EndpointKind::Publisher, "entity kind preserved");
    }
}