Skip to main content

hiroz_protocol/format/
ros2dds.rs

//! zenoh-plugin-ros2dds compatible backend.
//!
//! This backend generates key expressions compatible with zenoh-plugin-ros2dds.
//!
//! Key expression formats:
//! - Topic: `<topic_name>` (the bare topic name; no domain, type, or hash)
//! - Liveliness: `@/<zenoh_id>/@ros2_lv/<kind>/<escaped_ke>/<type>[/<qos>]`
//!   (`/` inside `<escaped_ke>` and `<type>` is replaced by `§`; `<qos>` is
//!   only present for publishers and subscriptions)
//!
//! Reference: zenoh-plugin-ros2dds/src/liveliness_mgt.rs
10
11use zenoh::{key_expr::KeyExpr, session::ZenohId, Result};
12
13use crate::{
14    entity::{
15        EndpointEntity, EndpointKind, Entity, EntityConversionError, LivelinessKE, NodeEntity,
16        TopicKE, TypeHash, TypeInfo,
17    },
18    qos::{QosDurability, QosHistory, QosProfile, QosReliability},
19};
20
21use super::KeyExprFormatter;
22
/// Character that replaces `/` inside a single key-expression chunk in the
/// ros2dds format (U+00A7 SECTION SIGN).
pub const SLASH_REPLACEMENT_CHAR: char = '§';

/// zenoh-plugin-ros2dds compatible formatter.
///
/// This is a stateless marker type: it carries no data and only selects the
/// ros2dds key-expression format. The common traits are derived so the marker
/// can be debug-printed, copied, and default-constructed like any other
/// value.
#[derive(Debug, Clone, Copy, Default)]
pub struct Ros2DdsFormatter;
28
29impl KeyExprFormatter for Ros2DdsFormatter {
30    /// ros2dds uses '§' (U+00A7) to escape slashes
31    const ESCAPE_CHAR: char = SLASH_REPLACEMENT_CHAR;
32
33    /// Admin space prefix for ros2dds liveliness tokens
34    const ADMIN_SPACE: &'static str = "@ros2_lv";
35
36    fn topic_key_expr(entity: &EndpointEntity) -> Result<TopicKE> {
37        // ros2dds format: just the topic name (no domain, type, or hash)
38        let topic = {
39            let s = &entity.topic;
40            let s = s.strip_prefix('/').unwrap_or(s);
41            let s = s.strip_suffix('/').unwrap_or(s);
42            s.to_string()
43        };
44
45        Ok(TopicKE::new(topic.try_into()?))
46    }
47
48    fn liveliness_key_expr(entity: &EndpointEntity, zid: &ZenohId) -> Result<LivelinessKE> {
49        // ros2dds format: @/<zenoh_id>/@ros2_lv/<kind>/<escaped_ke>/<type>[/<qos>]
50        let kind = match entity.kind {
51            EndpointKind::Publisher => "MP",
52            EndpointKind::Subscription => "MS",
53            EndpointKind::Service => "SS",
54            EndpointKind::Client => "SC",
55        };
56
57        // Escape slashes in topic name
58        let topic = {
59            let s = &entity.topic;
60            let s = s.strip_prefix('/').unwrap_or(s);
61            let s = s.strip_suffix('/').unwrap_or(s);
62            Self::mangle_name(s)
63        };
64
65        // Escape slashes in type name
66        let type_name = entity
67            .type_info
68            .as_ref()
69            .map(|ti| Self::mangle_name(&ti.name))
70            .unwrap_or_else(|| "unknown".to_string());
71
72        // QoS encoding for pub/sub only
73        let qos_str = match entity.kind {
74            EndpointKind::Publisher | EndpointKind::Subscription => {
75                format!("/{}", Self::encode_qos(&entity.qos, false))
76            }
77            _ => String::new(),
78        };
79
80        let ke = format!(
81            "@/{zid}/{}/{kind}/{topic}/{type_name}{qos_str}",
82            Self::ADMIN_SPACE
83        );
84
85        Ok(LivelinessKE::new(ke.try_into()?))
86    }
87
88    fn node_liveliness_key_expr(_entity: &NodeEntity) -> Result<LivelinessKE> {
89        // ros2dds does not expose node entities as liveliness tokens
90        // Return an empty/placeholder token that won't match anything
91        Err(zenoh::Error::from(
92            "ros2dds backend does not support node liveliness tokens",
93        ))
94    }
95
96    fn parse_liveliness(ke: &KeyExpr) -> Result<Entity> {
97        // ros2dds format: @/<zenoh_id>/@ros2_lv/<kind>/<escaped_ke>/<type>[/<qos>]
98        use EntityConversionError::*;
99
100        let mut iter = ke.split('/');
101
102        // First element should be '@'
103        let first = iter.next().ok_or(MissingAdminSpace)?;
104        if first != "@" {
105            return Err(zenoh::Error::from(MissingAdminSpace));
106        }
107
108        // Zenoh ID: present in the key expression but not used in Ros2Dds format
109        // (endpoints have no node identity, so the z_id cannot be stored)
110        let _z_id = iter.next().ok_or(MissingZId)?;
111
112        // @ros2_lv
113        let admin = iter.next().ok_or(MissingAdminSpace)?;
114        if admin != Self::ADMIN_SPACE {
115            return Err(zenoh::Error::from(MissingAdminSpace));
116        }
117
118        // Entity kind
119        let kind_str = iter.next().ok_or(MissingEntityKind)?;
120        let kind = match kind_str {
121            "MP" => EndpointKind::Publisher,
122            "MS" => EndpointKind::Subscription,
123            "SS" => EndpointKind::Service,
124            "SC" => EndpointKind::Client,
125            "AS" | "AC" => {
126                // Action server/client - map to Service for now
127                EndpointKind::Service
128            }
129            _ => return Err(zenoh::Error::from(ParsingError)),
130        };
131
132        // Topic key expression (escaped)
133        let topic_escaped = iter.next().ok_or(MissingTopicName)?;
134        let topic = match Self::demangle_name(topic_escaped) {
135            topic if topic.is_empty() => "/".to_string(),
136            topic if topic.starts_with('/') => topic,
137            topic => format!("/{}", topic),
138        };
139
140        // Type name (escaped)
141        let type_escaped = iter.next().ok_or(MissingTopicType)?;
142        let type_name = Self::demangle_name(type_escaped);
143
144        // Optional QoS
145        let qos = if let Some(qos_str) = iter.next() {
146            let (_, qos) = Self::decode_qos(qos_str)?;
147            qos
148        } else {
149            QosProfile::default()
150        };
151
152        let type_info = if type_name.is_empty() || type_name == "unknown" {
153            None
154        } else {
155            Some(TypeInfo {
156                name: type_name,
157                hash: TypeHash::zero(),
158            })
159        };
160
161        Ok(Entity::Endpoint(EndpointEntity {
162            id: 0,
163            node: None,
164            kind,
165            topic,
166            type_info,
167            qos,
168        }))
169    }
170
171    /// Encode QoS in ros2dds format.
172    ///
173    /// Format: `<keyless>:<reliability>:<durability>:<history_kind>,<depth>[:<user_data>]`
174    /// - keyless: 'K' if not keyless, empty if keyless
175    /// - reliability: 0=BEST_EFFORT, 1=RELIABLE, empty=default
176    /// - durability: 0=VOLATILE, 1=TRANSIENT_LOCAL, empty=default
177    /// - history: `<kind>,<depth>` where kind is 0=KEEP_LAST, 1=KEEP_ALL
178    fn encode_qos(qos: &QosProfile, keyless: bool) -> String {
179        let mut result = String::new();
180
181        // Keyless flag
182        if !keyless {
183            result.push('K');
184        }
185        result.push(':');
186
187        // Reliability
188        match qos.reliability {
189            QosReliability::BestEffort => result.push('0'),
190            QosReliability::Reliable => result.push('1'),
191        }
192        result.push(':');
193
194        // Durability
195        match qos.durability {
196            QosDurability::Volatile => result.push('0'),
197            QosDurability::TransientLocal => result.push('1'),
198        }
199        result.push(':');
200
201        // History
202        match qos.history {
203            QosHistory::KeepLast(depth) => {
204                result.push_str(&format!("0,{}", depth));
205            }
206            QosHistory::KeepAll => {
207                result.push_str("1,0");
208            }
209        }
210
211        result
212    }
213
214    /// Decode QoS from ros2dds format.
215    fn decode_qos(s: &str) -> Result<(bool, QosProfile)> {
216        let parts: Vec<&str> = s.split(':').collect();
217        if parts.len() < 4 {
218            return Err(zenoh::Error::from(format!(
219                "Invalid QoS format: expected at least 4 colon-separated parts, got {}",
220                parts.len()
221            )));
222        }
223
224        let keyless = parts[0].is_empty();
225
226        let reliability = match parts[1] {
227            "" => QosReliability::default(),
228            "0" => QosReliability::BestEffort,
229            "1" => QosReliability::Reliable,
230            _ => QosReliability::default(),
231        };
232
233        let durability = match parts[2] {
234            "" => QosDurability::default(),
235            "0" => QosDurability::Volatile,
236            "1" => QosDurability::TransientLocal,
237            _ => QosDurability::default(),
238        };
239
240        let history = if parts[3].is_empty() {
241            QosHistory::default()
242        } else {
243            let history_parts: Vec<&str> = parts[3].split(',').collect();
244            if history_parts.len() >= 2 {
245                let depth: usize = history_parts[1].parse().unwrap_or(10);
246                match history_parts[0] {
247                    "0" | "" => QosHistory::from_depth(depth),
248                    "1" => QosHistory::KeepAll,
249                    _ => QosHistory::from_depth(depth),
250                }
251            } else {
252                QosHistory::default()
253            }
254        };
255
256        Ok((
257            keyless,
258            QosProfile {
259                reliability,
260                durability,
261                history,
262            },
263        ))
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// ROS 2 type name used by the pub/sub fixtures.
    const STRING_MSG: &str = "std_msgs/msg/String";

    /// ROS 2 type name used by the service/client fixtures.
    const ADD_TWO_INTS_SRV: &str = "example_interfaces/srv/AddTwoInts";

    /// Fixed Zenoh ID shared by every fixture.
    fn test_zid() -> zenoh::session::ZenohId {
        "1234567890abcdef1234567890abcdef".parse().unwrap()
    }

    /// Builds an endpoint with default QoS, owned by a throw-away test node.
    fn test_endpoint(kind: EndpointKind, topic: &str, type_name: &str) -> EndpointEntity {
        let node = NodeEntity::new(
            0,
            test_zid(),
            1,
            "test_node".to_string(),
            "/".to_string(),
            String::new(),
        );

        EndpointEntity {
            id: 1,
            node: Some(node),
            kind,
            topic: topic.to_string(),
            type_info: Some(TypeInfo::new(type_name, TypeHash::zero())),
            qos: QosProfile::default(),
        }
    }

    /// Returns the liveliness key expression of `entity` as an owned string.
    fn liveliness_string(entity: &EndpointEntity) -> String {
        Ros2DdsFormatter::liveliness_key_expr(entity, &test_zid())
            .unwrap()
            .as_str()
            .to_string()
    }

    #[test]
    fn test_mangle_demangle() {
        assert_eq!(Ros2DdsFormatter::mangle_name("/chatter"), "§chatter");
        assert_eq!(
            Ros2DdsFormatter::mangle_name("std_msgs/msg/String"),
            "std_msgs§msg§String"
        );
        assert_eq!(
            Ros2DdsFormatter::demangle_name("std_msgs§msg§String"),
            "std_msgs/msg/String"
        );
    }

    #[test]
    fn test_qos_encode_decode() {
        let qos = QosProfile::default();
        let encoded = Ros2DdsFormatter::encode_qos(&qos, false);
        assert!(encoded.starts_with("K:"));

        let (keyless, decoded) = Ros2DdsFormatter::decode_qos(&encoded).unwrap();
        assert!(!keyless);
        assert_eq!(decoded.reliability, qos.reliability);
        assert_eq!(decoded.durability, qos.durability);
    }

    #[test]
    fn test_qos_reliable_transient() {
        let qos = QosProfile {
            reliability: QosReliability::Reliable,
            durability: QosDurability::TransientLocal,
            history: QosHistory::from_depth(10),
        };
        let encoded = Ros2DdsFormatter::encode_qos(&qos, false);
        assert_eq!(encoded, "K:1:1:0,10");

        let (keyless, decoded) = Ros2DdsFormatter::decode_qos(&encoded).unwrap();
        assert!(!keyless);
        assert_eq!(decoded.reliability, QosReliability::Reliable);
        assert_eq!(decoded.durability, QosDurability::TransientLocal);

        // Re-encoding covers the history field too, so the whole profile is
        // checked for a lossless round trip.
        assert_eq!(Ros2DdsFormatter::encode_qos(&decoded, false), encoded);
    }

    /// Test that QoS encoding matches zenoh-plugin-ros2dds format.
    ///
    /// Format: `<keyless>:<ReliabilityKind>:<DurabilityKind>:<HistoryKind>,<HistoryDepth>`
    /// where:
    /// - keyless: 'K' if not keyless, empty if keyless
    /// - ReliabilityKind: 0=BEST_EFFORT, 1=RELIABLE
    /// - DurabilityKind: 0=VOLATILE, 1=TRANSIENT_LOCAL
    /// - HistoryKind: 0=KEEP_LAST, 1=KEEP_ALL
    #[test]
    fn test_qos_format_compatibility_with_zenoh_plugin() {
        // Returns the `index`-th colon-separated field of the non-keyless encoding.
        let field = |qos: &QosProfile, index: usize| -> String {
            Ros2DdsFormatter::encode_qos(qos, false)
                .split(':')
                .nth(index)
                .expect("encoded QoS should have at least four fields")
                .to_string()
        };

        // keyless=true leaves the first field empty, so the encoding starts with ':'.
        let qos = QosProfile::default();
        let encoded = Ros2DdsFormatter::encode_qos(&qos, true);
        assert!(
            encoded.starts_with(':'),
            "Expected ':' prefix for keyless, got: {}",
            encoded
        );

        // keyless=false puts 'K' in the first field.
        let encoded = Ros2DdsFormatter::encode_qos(&qos, false);
        assert!(
            encoded.starts_with("K:"),
            "Expected 'K:' prefix for non-keyless, got: {}",
            encoded
        );

        // RELIABLE -> kind=1
        let qos = QosProfile {
            reliability: QosReliability::Reliable,
            ..Default::default()
        };
        assert_eq!(field(&qos, 1), "1", "RELIABLE should be encoded as '1'");

        // BEST_EFFORT -> kind=0
        let qos = QosProfile {
            reliability: QosReliability::BestEffort,
            ..Default::default()
        };
        assert_eq!(field(&qos, 1), "0", "BEST_EFFORT should be encoded as '0'");

        // TRANSIENT_LOCAL -> kind=1
        let qos = QosProfile {
            durability: QosDurability::TransientLocal,
            ..Default::default()
        };
        assert_eq!(
            field(&qos, 2),
            "1",
            "TRANSIENT_LOCAL should be encoded as '1'"
        );

        // VOLATILE -> kind=0
        let qos = QosProfile {
            durability: QosDurability::Volatile,
            ..Default::default()
        };
        assert_eq!(field(&qos, 2), "0", "VOLATILE should be encoded as '0'");

        // KEEP_LAST with depth -> "0,depth"
        let qos = QosProfile {
            history: QosHistory::from_depth(5),
            ..Default::default()
        };
        assert_eq!(
            field(&qos, 3),
            "0,5",
            "KEEP_LAST(5) should be encoded as '0,5'"
        );

        // KEEP_ALL -> "1,0"
        let qos = QosProfile {
            history: QosHistory::KeepAll,
            ..Default::default()
        };
        assert_eq!(field(&qos, 3), "1,0", "KEEP_ALL should be encoded as '1,0'");
    }

    /// Test topic key expression format matches zenoh-plugin-ros2dds.
    ///
    /// zenoh-plugin-ros2dds uses simple topic names without type/hash:
    /// Topic: `/chatter` -> key expression: `chatter`
    #[test]
    fn test_topic_key_expr_format() {
        let entity = test_endpoint(EndpointKind::Publisher, "chatter", STRING_MSG);

        let topic_ke = Ros2DdsFormatter::topic_key_expr(&entity).unwrap();
        // ros2dds uses the simple topic name.
        assert_eq!(topic_ke.as_str(), "chatter");
    }

    /// Test liveliness key expression format matches zenoh-plugin-ros2dds.
    ///
    /// Format: `@/<zenoh_id>/@ros2_lv/<kind>/<escaped_ke>/<type>/<qos>`
    #[test]
    fn test_liveliness_key_expr_format() {
        let entity = test_endpoint(EndpointKind::Publisher, "chatter", STRING_MSG);
        let ke_str = liveliness_string(&entity);

        // Should start with @/<zenoh_id>/@ros2_lv
        assert!(
            ke_str.starts_with("@/"),
            "Should start with '@/', got: {}",
            ke_str
        );
        assert!(
            ke_str.contains("/@ros2_lv/"),
            "Should contain '/@ros2_lv/', got: {}",
            ke_str
        );

        // Should contain MP for Publisher
        assert!(
            ke_str.contains("/MP/"),
            "Should contain '/MP/' for Publisher, got: {}",
            ke_str
        );

        // Should contain escaped topic name (chatter has no slashes, so unchanged)
        assert!(
            ke_str.contains("/chatter/"),
            "Should contain '/chatter/', got: {}",
            ke_str
        );

        // Should contain escaped type name with § instead of /
        assert!(
            ke_str.contains("std_msgs§msg§String"),
            "Should contain 'std_msgs§msg§String', got: {}",
            ke_str
        );
    }

    /// Test subscriber liveliness key expression
    #[test]
    fn test_subscriber_liveliness_key_expr() {
        let entity = test_endpoint(EndpointKind::Subscription, "chatter", STRING_MSG);
        let ke_str = liveliness_string(&entity);

        // Should contain MS for Subscription
        assert!(
            ke_str.contains("/MS/"),
            "Should contain '/MS/' for Subscription, got: {}",
            ke_str
        );
    }

    /// Parsing a generated token restores the leading slash of the topic and
    /// yields an endpoint without node information.
    #[test]
    fn test_parse_liveliness_restores_absolute_topic_name() {
        let entity = test_endpoint(EndpointKind::Publisher, "/chatter", STRING_MSG);

        let liveliness_ke = Ros2DdsFormatter::liveliness_key_expr(&entity, &test_zid()).unwrap();
        let parsed = Ros2DdsFormatter::parse_liveliness(&liveliness_ke).unwrap();

        match parsed {
            Entity::Endpoint(endpoint) => {
                assert_eq!(endpoint.topic, "/chatter");
                assert!(endpoint.node.is_none());
            }
            other => panic!("expected endpoint entity, got {:?}", other),
        }
    }

    /// Test service server liveliness key expression
    #[test]
    fn test_service_liveliness_key_expr() {
        let entity = test_endpoint(EndpointKind::Service, "add_two_ints", ADD_TWO_INTS_SRV);
        let ke_str = liveliness_string(&entity);

        // Should contain SS for Service
        assert!(
            ke_str.contains("/SS/"),
            "Should contain '/SS/' for Service, got: {}",
            ke_str
        );

        // Service liveliness should NOT have a QoS suffix (per ros2dds format):
        // the last chunk must be the escaped type, which contains no ':'.
        let last_part = ke_str.split('/').last().unwrap();
        assert!(
            !last_part.contains(':'),
            "Service liveliness should not have QoS suffix, got: {}",
            ke_str
        );
    }

    /// Test client liveliness key expression
    #[test]
    fn test_client_liveliness_key_expr() {
        let entity = test_endpoint(EndpointKind::Client, "add_two_ints", ADD_TWO_INTS_SRV);
        let ke_str = liveliness_string(&entity);

        // Should contain SC for Client
        assert!(
            ke_str.contains("/SC/"),
            "Should contain '/SC/' for Client, got: {}",
            ke_str
        );
    }
}