cu_zenoh_ros_sink/
lib.rs

1mod attachment;
2mod error;
3mod keyexpr;
4mod liveliness;
5mod node;
6mod topic;
7
8use attachment::encode_attachment;
9use cdr::{CdrBe, Infinite};
10use cu29::clock::RobotClock;
11use cu29::prelude::*;
12use cu_ros_payloads::RosMsgAdapter;
13use error::cu_error_map;
14use liveliness::{node_liveliness, publisher_liveliness};
15use node::Node;
16use topic::Topic;
17use zenoh::bytes::Encoding;
18use zenoh::Config;
19
20use std::marker::PhantomData;
21
/// Identifier given to the single node declared by this sink (used as `Node::id` in `start`).
/// Only one node exists per Zenoh session, so a fixed value is sufficient.
const NODE_ID: u32 = 0;

/// Identifier passed to `publisher_liveliness` for the single publisher of this sink.
/// Only one publisher exists per session; it takes the id right after the node's.
const PUBLISHER_ID: u32 = NODE_ID + 1;
27
/// Sink task that publishes ROS-compatible messages on a Zenoh topic.
///
/// `P` is the payload type of the incoming messages. It must implement
/// `RosMsgAdapter` so that it can be converted to its ROS representation before
/// being serialized and sent.
pub struct ZenohRosSink<P>
where
    P: CuMsgPayload + RosMsgAdapter<'static>,
{
    // Ties the sink to its payload type without storing a value of that type.
    _marker: PhantomData<P>,
    // Settings captured in `new` and consumed by `start`.
    config: ZenohRosConfig,
    // Live Zenoh resources: `None` until `start` succeeds and again after `stop`.
    ctx: Option<ZenohRosContext>,
}
39
/// Static settings of a `ZenohRosSink`, read from the component configuration in `new`.
pub struct ZenohRosConfig {
    // Zenoh session configuration: parsed from the `zenoh_config_json` key when present,
    // otherwise `Config::default()`.
    session: Config,
    // Value of the `domain_id` key (defaults to 0); forwarded to `Node::domain_id`.
    domain_id: u32,
    // Value of the `namespace` key; forwarded to `Node::namespace`.
    // NOTE(review): the default is "node", the same string as the default node name —
    // confirm this is intended.
    namespace: String,
    // Value of the `node` key (defaults to "node"); forwarded to `Node::name`.
    node: String,
    // Value of the `topic` key (defaults to "copper"); used to build the `Topic`.
    topic: String,
}
47
/// Zenoh resources owned by a started `ZenohRosSink`.
///
/// Built in `start` and taken apart in `stop`.
pub struct ZenohRosContext {
    // Open session; its zid is also embedded in every message attachment.
    session: zenoh::Session,
    // Publisher declared on the topic's pub/sub key expression.
    publisher: zenoh::pubsub::Publisher<'static>,
    // Tokens have to be kept alive: they are undeclared when dropped.
    // They are never read, hence the `dead_code` allowances.
    #[allow(dead_code)]
    node_token: zenoh::liveliness::LivelinessToken,
    #[allow(dead_code)]
    publisher_token: zenoh::liveliness::LivelinessToken,
    // Counter passed to `encode_attachment`; starts at 0 and is incremented once per
    // `process` call that reaches the publish step.
    sequence_number: u64,
}
58
// Empty impl: the sink defines no custom freeze/thaw logic and relies on the
// methods provided by `Freezable` itself.
impl<P> Freezable for ZenohRosSink<P> where P: CuMsgPayload + RosMsgAdapter<'static> {}
60
61impl<P> CuSinkTask for ZenohRosSink<P>
62where
63    P: CuMsgPayload + RosMsgAdapter<'static>,
64{
65    type Input<'m> = input_msg!(P);
66
67    fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
68    where
69        Self: Sized,
70    {
71        let config = config.ok_or(CuError::from("ZenohRosSink: Missing configuration"))?;
72
73        // Get json zenoh config
74        let session_config = config.get::<String>("zenoh_config_json").map_or(
75            // Or default zenoh config otherwise
76            Ok(Config::default()),
77            |s| -> CuResult<Config> {
78                Config::from_json5(&s)
79                    .map_err(cu_error_map("ZenohRosSink: Failed to create zenoh config"))
80            },
81        )?;
82
83        Ok(Self {
84            _marker: Default::default(),
85            config: ZenohRosConfig {
86                session: session_config,
87                domain_id: config.get::<u32>("domain_id").unwrap_or(0),
88                namespace: config.get::<String>("namespace").unwrap_or("node".into()),
89                node: config.get::<String>("node").unwrap_or("node".into()),
90                topic: config.get::<String>("topic").unwrap_or("copper".into()),
91            },
92            ctx: None,
93        })
94    }
95
96    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
97        let session = zenoh::Wait::wait(zenoh::open(self.config.session.clone()))
98            .map_err(cu_error_map("ZenohRosSink: Failed to open session"))?;
99
100        debug!("Zenoh session open");
101
102        let zid = session.zid();
103
104        let node = Node {
105            domain_id: self.config.domain_id,
106            zid,
107            id: NODE_ID,
108            namespace: self.config.namespace.as_ref(),
109            name: self.config.node.as_ref(),
110        };
111
112        let topic = Topic::new::<P>(self.config.topic.as_ref());
113
114        let node_token =
115            zenoh::Wait::wait(session.liveliness().declare_token(node_liveliness(&node)?))
116                .map_err(cu_error_map(
117                    "ZenohRosSink: Failed to declare node liveliness token",
118                ))?;
119
120        let publisher_token = zenoh::Wait::wait(
121            session
122                .liveliness()
123                .declare_token(publisher_liveliness(&node, &topic, PUBLISHER_ID)?),
124        )
125        .map_err(cu_error_map(
126            "ZenohRosSink: Failed to declare topic liveliness token",
127        ))?;
128
129        // Format the key expression according to ROS 2 conventions
130        let publisher = zenoh::Wait::wait(session.declare_publisher(topic.pubsub_keyexpr(&node)?))
131            .map_err(cu_error_map("ZenohRosSink: Failed to create publisher"))?;
132
133        self.ctx = Some(ZenohRosContext {
134            session,
135            publisher,
136            node_token,
137            publisher_token,
138            sequence_number: 0,
139        });
140        Ok(())
141    }
142
143    fn process(&mut self, clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
144        let ctx = self
145            .ctx
146            .as_mut()
147            .ok_or(CuError::from("ZenohRosSink: Context not found"))?;
148
149        // Convert the payload to ROS-compatible CDR format
150        let payload = input.payload().ok_or(CuError::from(
151            "ZenohRosSink: Cannot send empty payload through ROS bridge",
152        ))?;
153
154        let ros_payload = payload.convert();
155        let serial_ros_payload = cdr::serialize::<_, _, CdrBe>(&ros_payload, Infinite)
156            .map_err(|e| CuError::new_with_cause("ZenohRosSink: Failed to serialize payload", e))?;
157
158        let serial_metadata = encode_attachment(ctx.sequence_number, clock, &ctx.session.zid());
159
160        ctx.sequence_number += 1;
161
162        zenoh::Wait::wait(
163            ctx.publisher
164                .put(serial_ros_payload)
165                .encoding(Encoding::APPLICATION_CDR)
166                .attachment(serial_metadata),
167        )
168        .map_err(cu_error_map("ZenohRosSink: Failed to put value"))?;
169
170        Ok(())
171    }
172
173    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
174        if let Some(ZenohRosContext {
175            session,
176            publisher,
177            node_token: _,
178            publisher_token: _,
179            sequence_number: _,
180        }) = self.ctx.take()
181        {
182            zenoh::Wait::wait(publisher.undeclare())
183                .map_err(cu_error_map("ZenohRosSink: Failed to undeclare publisher"))?;
184            zenoh::Wait::wait(session.close())
185                .map_err(cu_error_map("ZenohRosSink: Failed to close session"))?;
186        }
187        debug!("ZenohRosSink: Stopped");
188        Ok(())
189    }
190}