Skip to main content

cu_zenoh_ros_sink/
lib.rs

1mod attachment;
2mod error;
3mod keyexpr;
4mod liveliness;
5mod node;
6mod topic;
7
8use attachment::encode_attachment;
9use cdr::{CdrBe, Infinite};
10use cu_ros_payloads::RosMsgAdapter;
11use cu29::clock::RobotClock;
12use cu29::prelude::*;
13use error::cu_error_map;
14use liveliness::{node_liveliness, publisher_liveliness};
15use node::Node;
16use topic::Topic;
17use zenoh::Config;
18use zenoh::bytes::Encoding;
19
20use std::marker::PhantomData;
21
22// Only one node per session
23const NODE_ID: u32 = 0;
24
25// Only one publisher per session
26const PUBLISHER_ID: u32 = NODE_ID + 1;
27
28/// This is a sink task that sends ROS-compatible messages to a Zenoh topic.
29/// P is the payload type of the messages, which must be convertible to ROS format.
30/// Hence, the payload type must implement the `RosMsgAdapter` trait.
31#[derive(Reflect)]
32#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
33pub struct ZenohRosSink<P>
34where
35    P: CuMsgPayload + RosMsgAdapter<'static>,
36{
37    #[reflect(ignore)]
38    _marker: PhantomData<fn() -> P>,
39    #[reflect(ignore)]
40    config: ZenohRosConfig,
41    #[reflect(ignore)]
42    ctx: Option<ZenohRosContext>,
43}
44
45pub struct ZenohRosConfig {
46    session: Config,
47    domain_id: u32,
48    namespace: String,
49    node: String,
50    topic: String,
51}
52
53pub struct ZenohRosContext {
54    session: zenoh::Session,
55    publisher: zenoh::pubsub::Publisher<'static>,
56    // Token have to be kept alive (undeclared on drop)
57    #[allow(dead_code)]
58    node_token: zenoh::liveliness::LivelinessToken,
59    #[allow(dead_code)]
60    publisher_token: zenoh::liveliness::LivelinessToken,
61    sequence_number: u64,
62}
63
64impl<P> Freezable for ZenohRosSink<P> where P: CuMsgPayload + RosMsgAdapter<'static> {}
65
66impl<P> cu29::reflect::TypePath for ZenohRosSink<P>
67where
68    P: CuMsgPayload + RosMsgAdapter<'static> + 'static,
69{
70    fn type_path() -> &'static str {
71        "cu_zenoh_ros_sink::ZenohRosSink"
72    }
73
74    fn short_type_path() -> &'static str {
75        "ZenohRosSink"
76    }
77
78    fn type_ident() -> Option<&'static str> {
79        Some("ZenohRosSink")
80    }
81
82    fn crate_name() -> Option<&'static str> {
83        Some("cu_zenoh_ros_sink")
84    }
85
86    fn module_path() -> Option<&'static str> {
87        Some("cu_zenoh_ros_sink")
88    }
89}
90
91impl<P> CuSinkTask for ZenohRosSink<P>
92where
93    P: CuMsgPayload + RosMsgAdapter<'static> + 'static,
94{
95    type Resources<'r> = ();
96    type Input<'m> = input_msg!(P);
97
98    fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
99    where
100        Self: Sized,
101    {
102        let config = config.ok_or(CuError::from("ZenohRosSink: Missing configuration"))?;
103
104        // Get json zenoh config
105        let session_config = match config.get::<String>("zenoh_config_json")? {
106            Some(json) => Config::from_json5(&json)
107                .map_err(cu_error_map("ZenohRosSink: Failed to create zenoh config"))?,
108            None => Config::default(),
109        };
110
111        Ok(Self {
112            _marker: PhantomData,
113            config: ZenohRosConfig {
114                session: session_config,
115                domain_id: config.get::<u32>("domain_id")?.unwrap_or(0),
116                namespace: config.get::<String>("namespace")?.unwrap_or("node".into()),
117                node: config.get::<String>("node")?.unwrap_or("node".into()),
118                topic: config.get::<String>("topic")?.unwrap_or("copper".into()),
119            },
120            ctx: None,
121        })
122    }
123
124    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
125        let session = zenoh::Wait::wait(zenoh::open(self.config.session.clone()))
126            .map_err(cu_error_map("ZenohRosSink: Failed to open session"))?;
127
128        debug!("Zenoh session open");
129
130        let zid = session.zid();
131
132        let node = Node {
133            domain_id: self.config.domain_id,
134            zid,
135            id: NODE_ID,
136            namespace: self.config.namespace.as_ref(),
137            name: self.config.node.as_ref(),
138        };
139
140        let topic = Topic::new::<P>(self.config.topic.as_ref());
141
142        let node_token =
143            zenoh::Wait::wait(session.liveliness().declare_token(node_liveliness(&node)?))
144                .map_err(cu_error_map(
145                    "ZenohRosSink: Failed to declare node liveliness token",
146                ))?;
147
148        let publisher_token = zenoh::Wait::wait(
149            session
150                .liveliness()
151                .declare_token(publisher_liveliness(&node, &topic, PUBLISHER_ID)?),
152        )
153        .map_err(cu_error_map(
154            "ZenohRosSink: Failed to declare topic liveliness token",
155        ))?;
156
157        // Format the key expression according to ROS 2 conventions
158        let publisher = zenoh::Wait::wait(session.declare_publisher(topic.pubsub_keyexpr(&node)?))
159            .map_err(cu_error_map("ZenohRosSink: Failed to create publisher"))?;
160
161        self.ctx = Some(ZenohRosContext {
162            session,
163            publisher,
164            node_token,
165            publisher_token,
166            sequence_number: 0,
167        });
168        Ok(())
169    }
170
171    fn process(&mut self, clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
172        let ctx = self
173            .ctx
174            .as_mut()
175            .ok_or(CuError::from("ZenohRosSink: Context not found"))?;
176
177        // Convert the payload to ROS-compatible CDR format
178        let payload = input.payload().ok_or(CuError::from(
179            "ZenohRosSink: Cannot send empty payload through ROS bridge",
180        ))?;
181
182        let ros_payload = payload.convert();
183        let serial_ros_payload = cdr::serialize::<_, _, CdrBe>(&ros_payload, Infinite)
184            .map_err(|e| CuError::new_with_cause("ZenohRosSink: Failed to serialize payload", e))?;
185
186        let serial_metadata = encode_attachment(ctx.sequence_number, clock, &ctx.session.zid());
187
188        ctx.sequence_number += 1;
189
190        zenoh::Wait::wait(
191            ctx.publisher
192                .put(serial_ros_payload)
193                .encoding(Encoding::APPLICATION_CDR)
194                .attachment(serial_metadata),
195        )
196        .map_err(cu_error_map("ZenohRosSink: Failed to put value"))?;
197
198        Ok(())
199    }
200
201    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
202        if let Some(ZenohRosContext {
203            session,
204            publisher,
205            node_token: _,
206            publisher_token: _,
207            sequence_number: _,
208        }) = self.ctx.take()
209        {
210            zenoh::Wait::wait(publisher.undeclare())
211                .map_err(cu_error_map("ZenohRosSink: Failed to undeclare publisher"))?;
212            zenoh::Wait::wait(session.close())
213                .map_err(cu_error_map("ZenohRosSink: Failed to close session"))?;
214        }
215        debug!("ZenohRosSink: Stopped");
216        Ok(())
217    }
218}