1use zenoh::{key_expr::KeyExpr, session::ZenohId, Result};
12
13use crate::{
14 entity::{
15 EndpointEntity, EndpointKind, Entity, EntityConversionError, LivelinessKE, NodeEntity,
16 TopicKE, TypeHash, TypeInfo,
17 },
18 qos::{QosDurability, QosHistory, QosProfile, QosReliability},
19};
20
21use super::KeyExprFormatter;
22
/// Character this formatter uses as its escape character
/// ([`KeyExprFormatter::ESCAPE_CHAR`]): mangled names carry it in place of `/`
/// (e.g. `std_msgs/msg/String` becomes `std_msgs§msg§String`).
pub const SLASH_REPLACEMENT_CHAR: char = '§';
25
/// Stateless [`KeyExprFormatter`] for the "ros2dds" backend.
///
/// Liveliness tokens are placed under the `@ros2_lv` admin space and names are
/// escaped with [`SLASH_REPLACEMENT_CHAR`].
pub struct Ros2DdsFormatter;
28
29impl KeyExprFormatter for Ros2DdsFormatter {
30 const ESCAPE_CHAR: char = SLASH_REPLACEMENT_CHAR;
32
33 const ADMIN_SPACE: &'static str = "@ros2_lv";
35
36 fn topic_key_expr(entity: &EndpointEntity) -> Result<TopicKE> {
37 let topic = {
39 let s = &entity.topic;
40 let s = s.strip_prefix('/').unwrap_or(s);
41 let s = s.strip_suffix('/').unwrap_or(s);
42 s.to_string()
43 };
44
45 Ok(TopicKE::new(topic.try_into()?))
46 }
47
48 fn liveliness_key_expr(entity: &EndpointEntity, zid: &ZenohId) -> Result<LivelinessKE> {
49 let kind = match entity.kind {
51 EndpointKind::Publisher => "MP",
52 EndpointKind::Subscription => "MS",
53 EndpointKind::Service => "SS",
54 EndpointKind::Client => "SC",
55 };
56
57 let topic = {
59 let s = &entity.topic;
60 let s = s.strip_prefix('/').unwrap_or(s);
61 let s = s.strip_suffix('/').unwrap_or(s);
62 Self::mangle_name(s)
63 };
64
65 let type_name = entity
67 .type_info
68 .as_ref()
69 .map(|ti| Self::mangle_name(&ti.name))
70 .unwrap_or_else(|| "unknown".to_string());
71
72 let qos_str = match entity.kind {
74 EndpointKind::Publisher | EndpointKind::Subscription => {
75 format!("/{}", Self::encode_qos(&entity.qos, false))
76 }
77 _ => String::new(),
78 };
79
80 let ke = format!(
81 "@/{zid}/{}/{kind}/{topic}/{type_name}{qos_str}",
82 Self::ADMIN_SPACE
83 );
84
85 Ok(LivelinessKE::new(ke.try_into()?))
86 }
87
88 fn node_liveliness_key_expr(_entity: &NodeEntity) -> Result<LivelinessKE> {
89 Err(zenoh::Error::from(
92 "ros2dds backend does not support node liveliness tokens",
93 ))
94 }
95
96 fn parse_liveliness(ke: &KeyExpr) -> Result<Entity> {
97 use EntityConversionError::*;
99
100 let mut iter = ke.split('/');
101
102 let first = iter.next().ok_or(MissingAdminSpace)?;
104 if first != "@" {
105 return Err(zenoh::Error::from(MissingAdminSpace));
106 }
107
108 let _z_id = iter.next().ok_or(MissingZId)?;
111
112 let admin = iter.next().ok_or(MissingAdminSpace)?;
114 if admin != Self::ADMIN_SPACE {
115 return Err(zenoh::Error::from(MissingAdminSpace));
116 }
117
118 let kind_str = iter.next().ok_or(MissingEntityKind)?;
120 let kind = match kind_str {
121 "MP" => EndpointKind::Publisher,
122 "MS" => EndpointKind::Subscription,
123 "SS" => EndpointKind::Service,
124 "SC" => EndpointKind::Client,
125 "AS" | "AC" => {
126 EndpointKind::Service
128 }
129 _ => return Err(zenoh::Error::from(ParsingError)),
130 };
131
132 let topic_escaped = iter.next().ok_or(MissingTopicName)?;
134 let topic = match Self::demangle_name(topic_escaped) {
135 topic if topic.is_empty() => "/".to_string(),
136 topic if topic.starts_with('/') => topic,
137 topic => format!("/{}", topic),
138 };
139
140 let type_escaped = iter.next().ok_or(MissingTopicType)?;
142 let type_name = Self::demangle_name(type_escaped);
143
144 let qos = if let Some(qos_str) = iter.next() {
146 let (_, qos) = Self::decode_qos(qos_str)?;
147 qos
148 } else {
149 QosProfile::default()
150 };
151
152 let type_info = if type_name.is_empty() || type_name == "unknown" {
153 None
154 } else {
155 Some(TypeInfo {
156 name: type_name,
157 hash: TypeHash::zero(),
158 })
159 };
160
161 Ok(Entity::Endpoint(EndpointEntity {
162 id: 0,
163 node: None,
164 kind,
165 topic,
166 type_info,
167 qos,
168 }))
169 }
170
171 fn encode_qos(qos: &QosProfile, keyless: bool) -> String {
179 let mut result = String::new();
180
181 if !keyless {
183 result.push('K');
184 }
185 result.push(':');
186
187 match qos.reliability {
189 QosReliability::BestEffort => result.push('0'),
190 QosReliability::Reliable => result.push('1'),
191 }
192 result.push(':');
193
194 match qos.durability {
196 QosDurability::Volatile => result.push('0'),
197 QosDurability::TransientLocal => result.push('1'),
198 }
199 result.push(':');
200
201 match qos.history {
203 QosHistory::KeepLast(depth) => {
204 result.push_str(&format!("0,{}", depth));
205 }
206 QosHistory::KeepAll => {
207 result.push_str("1,0");
208 }
209 }
210
211 result
212 }
213
214 fn decode_qos(s: &str) -> Result<(bool, QosProfile)> {
216 let parts: Vec<&str> = s.split(':').collect();
217 if parts.len() < 4 {
218 return Err(zenoh::Error::from(format!(
219 "Invalid QoS format: expected at least 4 colon-separated parts, got {}",
220 parts.len()
221 )));
222 }
223
224 let keyless = parts[0].is_empty();
225
226 let reliability = match parts[1] {
227 "" => QosReliability::default(),
228 "0" => QosReliability::BestEffort,
229 "1" => QosReliability::Reliable,
230 _ => QosReliability::default(),
231 };
232
233 let durability = match parts[2] {
234 "" => QosDurability::default(),
235 "0" => QosDurability::Volatile,
236 "1" => QosDurability::TransientLocal,
237 _ => QosDurability::default(),
238 };
239
240 let history = if parts[3].is_empty() {
241 QosHistory::default()
242 } else {
243 let history_parts: Vec<&str> = parts[3].split(',').collect();
244 if history_parts.len() >= 2 {
245 let depth: usize = history_parts[1].parse().unwrap_or(10);
246 match history_parts[0] {
247 "0" | "" => QosHistory::from_depth(depth),
248 "1" => QosHistory::KeepAll,
249 _ => QosHistory::from_depth(depth),
250 }
251 } else {
252 QosHistory::default()
253 }
254 };
255
256 Ok((
257 keyless,
258 QosProfile {
259 reliability,
260 durability,
261 history,
262 },
263 ))
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the endpoint fixture shared by the key-expression tests, owned by
    /// a throwaway `test_node`, and returns it with the Zenoh id it was built
    /// from.
    fn endpoint_fixture(
        kind: EndpointKind,
        topic: &'static str,
        type_name: &'static str,
    ) -> (EndpointEntity, zenoh::session::ZenohId) {
        let zid: zenoh::session::ZenohId = "1234567890abcdef1234567890abcdef".parse().unwrap();
        let node = NodeEntity::new(
            0,
            zid,
            1,
            "test_node".to_string(),
            "/".to_string(),
            String::new(),
        );
        let entity = EndpointEntity {
            id: 1,
            node: Some(node),
            kind,
            topic: topic.to_string(),
            type_info: Some(TypeInfo::new(type_name, TypeHash::zero())),
            qos: QosProfile::default(),
        };
        (entity, zid)
    }

    /// Renders the liveliness key expression of a fixture endpoint as a string.
    fn liveliness_string(
        kind: EndpointKind,
        topic: &'static str,
        type_name: &'static str,
    ) -> String {
        let (entity, zid) = endpoint_fixture(kind, topic, type_name);
        Ros2DdsFormatter::liveliness_key_expr(&entity, &zid)
            .unwrap()
            .as_str()
            .to_string()
    }

    /// Encodes `qos` (non-keyless) and returns its `index`-th `:`-separated field.
    fn encoded_field(qos: &QosProfile, index: usize) -> String {
        let encoded = Ros2DdsFormatter::encode_qos(qos, false);
        let fields: Vec<&str> = encoded.split(':').collect();
        fields[index].to_string()
    }

    #[test]
    fn test_mangle_demangle() {
        let mangled_topic = Ros2DdsFormatter::mangle_name("/chatter");
        assert_eq!(mangled_topic, "§chatter");

        let mangled_type = Ros2DdsFormatter::mangle_name("std_msgs/msg/String");
        assert_eq!(mangled_type, "std_msgs§msg§String");

        let demangled_type = Ros2DdsFormatter::demangle_name("std_msgs§msg§String");
        assert_eq!(demangled_type, "std_msgs/msg/String");
    }

    #[test]
    fn test_qos_encode_decode() {
        let original = QosProfile::default();
        let wire = Ros2DdsFormatter::encode_qos(&original, false);
        assert!(wire.starts_with("K:"));

        let (keyless, round_tripped) = Ros2DdsFormatter::decode_qos(&wire).unwrap();
        assert!(!keyless);
        assert_eq!(round_tripped.reliability, original.reliability);
        assert_eq!(round_tripped.durability, original.durability);
    }

    #[test]
    fn test_qos_reliable_transient() {
        let wire = Ros2DdsFormatter::encode_qos(
            &QosProfile {
                reliability: QosReliability::Reliable,
                durability: QosDurability::TransientLocal,
                history: QosHistory::from_depth(10),
            },
            false,
        );
        assert_eq!(wire, "K:1:1:0,10");

        let (keyless, round_tripped) = Ros2DdsFormatter::decode_qos(&wire).unwrap();
        assert!(!keyless);
        assert_eq!(round_tripped.reliability, QosReliability::Reliable);
        assert_eq!(round_tripped.durability, QosDurability::TransientLocal);
    }

    #[test]
    fn test_qos_format_compatibility_with_zenoh_plugin() {
        let defaults = QosProfile::default();

        // Prefix: keyless drops the 'K', non-keyless keeps it.
        let encoded = Ros2DdsFormatter::encode_qos(&defaults, true);
        assert!(
            encoded.starts_with(':'),
            "Expected ':' prefix for keyless, got: {}",
            encoded
        );

        let encoded = Ros2DdsFormatter::encode_qos(&defaults, false);
        assert!(
            encoded.starts_with("K:"),
            "Expected 'K:' prefix for non-keyless, got: {}",
            encoded
        );

        // Field 1: reliability.
        let reliable = QosProfile {
            reliability: QosReliability::Reliable,
            ..Default::default()
        };
        assert_eq!(
            encoded_field(&reliable, 1),
            "1",
            "RELIABLE should be encoded as '1'"
        );

        let best_effort = QosProfile {
            reliability: QosReliability::BestEffort,
            ..Default::default()
        };
        assert_eq!(
            encoded_field(&best_effort, 1),
            "0",
            "BEST_EFFORT should be encoded as '0'"
        );

        // Field 2: durability.
        let transient_local = QosProfile {
            durability: QosDurability::TransientLocal,
            ..Default::default()
        };
        assert_eq!(
            encoded_field(&transient_local, 2),
            "1",
            "TRANSIENT_LOCAL should be encoded as '1'"
        );

        let volatile = QosProfile {
            durability: QosDurability::Volatile,
            ..Default::default()
        };
        assert_eq!(
            encoded_field(&volatile, 2),
            "0",
            "VOLATILE should be encoded as '0'"
        );

        // Field 3: history kind and depth.
        let keep_last = QosProfile {
            history: QosHistory::from_depth(5),
            ..Default::default()
        };
        assert_eq!(
            encoded_field(&keep_last, 3),
            "0,5",
            "KEEP_LAST(5) should be encoded as '0,5'"
        );

        let keep_all = QosProfile {
            history: QosHistory::KeepAll,
            ..Default::default()
        };
        assert_eq!(
            encoded_field(&keep_all, 3),
            "1,0",
            "KEEP_ALL should be encoded as '1,0'"
        );
    }

    #[test]
    fn test_topic_key_expr_format() {
        let (entity, _zid) =
            endpoint_fixture(EndpointKind::Publisher, "chatter", "std_msgs/msg/String");

        let topic_ke = Ros2DdsFormatter::topic_key_expr(&entity).unwrap();
        assert_eq!(topic_ke.as_str(), "chatter");
    }

    #[test]
    fn test_liveliness_key_expr_format() {
        let ke_str = liveliness_string(EndpointKind::Publisher, "chatter", "std_msgs/msg/String");

        assert!(
            ke_str.starts_with("@/"),
            "Should start with '@/', got: {}",
            ke_str
        );
        assert!(
            ke_str.contains("/@ros2_lv/"),
            "Should contain '/@ros2_lv/', got: {}",
            ke_str
        );
        assert!(
            ke_str.contains("/MP/"),
            "Should contain '/MP/' for Publisher, got: {}",
            ke_str
        );
        assert!(
            ke_str.contains("/chatter/"),
            "Should contain '/chatter/', got: {}",
            ke_str
        );
        assert!(
            ke_str.contains("std_msgs§msg§String"),
            "Should contain 'std_msgs§msg§String', got: {}",
            ke_str
        );
    }

    #[test]
    fn test_subscriber_liveliness_key_expr() {
        let ke_str = liveliness_string(
            EndpointKind::Subscription,
            "chatter",
            "std_msgs/msg/String",
        );

        assert!(
            ke_str.contains("/MS/"),
            "Should contain '/MS/' for Subscription, got: {}",
            ke_str
        );
    }

    #[test]
    fn test_parse_liveliness_restores_absolute_topic_name() {
        let (entity, zid) =
            endpoint_fixture(EndpointKind::Publisher, "/chatter", "std_msgs/msg/String");

        let liveliness_ke = Ros2DdsFormatter::liveliness_key_expr(&entity, &zid).unwrap();

        match Ros2DdsFormatter::parse_liveliness(&liveliness_ke).unwrap() {
            Entity::Endpoint(endpoint) => {
                assert_eq!(endpoint.topic, "/chatter");
                assert!(endpoint.node.is_none());
            }
            other => panic!("expected endpoint entity, got {:?}", other),
        }
    }

    #[test]
    fn test_service_liveliness_key_expr() {
        let ke_str = liveliness_string(
            EndpointKind::Service,
            "add_two_ints",
            "example_interfaces/srv/AddTwoInts",
        );

        assert!(
            ke_str.contains("/SS/"),
            "Should contain '/SS/' for Service, got: {}",
            ke_str
        );

        // The final chunk must be the type name, not a ':'-separated QoS string.
        let last_part = ke_str.rsplit('/').next().unwrap();
        assert!(
            !last_part.contains(':'),
            "Service liveliness should not have QoS suffix, got: {}",
            ke_str
        );
    }

    #[test]
    fn test_client_liveliness_key_expr() {
        let ke_str = liveliness_string(
            EndpointKind::Client,
            "add_two_ints",
            "example_interfaces/srv/AddTwoInts",
        );

        assert!(
            ke_str.contains("/SC/"),
            "Should contain '/SC/' for Client, got: {}",
            ke_str
        );
    }
}