1mod attachment;
2mod error;
3mod keyexpr;
4mod liveliness;
5mod node;
6mod topic;
7
8use attachment::encode_attachment;
9use cdr::{CdrBe, Infinite};
10use cu29::clock::RobotClock;
11use cu29::prelude::*;
12use cu_ros_payloads::RosMsgAdapter;
13use error::cu_error_map;
14use liveliness::{node_liveliness, publisher_liveliness};
15use node::Node;
16use topic::Topic;
17use zenoh::bytes::Encoding;
18use zenoh::Config;
19
20use std::marker::PhantomData;
21
/// Entity id assigned to the `Node` description built in `start`.
const NODE_ID: u32 = 0;

/// Entity id passed to `publisher_liveliness` for this sink's single publisher.
/// Derived from `NODE_ID` so the two ids always differ.
const PUBLISHER_ID: u32 = NODE_ID + 1;
27
/// Copper sink task that serializes payloads of type `P` as CDR and publishes
/// them through a zenoh publisher.
///
/// `new` only records the configuration; the zenoh session, liveliness tokens
/// and publisher are created in `start` and released in `stop`.
pub struct ZenohRosSink<P>
where
    P: CuMsgPayload + RosMsgAdapter<'static>,
{
    /// Ties the sink to its payload type without storing a value of it.
    _marker: PhantomData<P>,
    /// Settings read from the component configuration in `new`.
    config: ZenohRosConfig,
    /// Live zenoh state: `None` after `new`, `Some` once `start` has succeeded,
    /// and `None` again after `stop`.
    ctx: Option<ZenohRosContext>,
}
39
/// Settings collected once in `new` and consumed by `start`.
pub struct ZenohRosConfig {
    /// zenoh configuration cloned to open the session.
    session: Config,
    /// Domain id copied into the `Node` description.
    domain_id: u32,
    /// Namespace copied into the `Node` description.
    namespace: String,
    /// Node name copied into the `Node` description.
    node: String,
    /// Topic name handed to `Topic::new`.
    topic: String,
}
47
/// Runtime zenoh state that exists between a successful `start` and `stop`.
pub struct ZenohRosContext {
    /// Open zenoh session; its id is also passed to `encode_attachment` for every message.
    session: zenoh::Session,
    /// Publisher used by `process` to send the serialized payloads.
    publisher: zenoh::pubsub::Publisher<'static>,
    /// Liveliness token declared for the node in `start`.
    // Never read afterwards, hence the `dead_code` allowance; it is stored so
    // the context owns it.
    // NOTE(review): presumably the token is undeclared when dropped, which is
    // why it must be kept alive here; confirm against the zenoh docs.
    #[allow(dead_code)]
    node_token: zenoh::liveliness::LivelinessToken,
    /// Liveliness token declared for the publisher in `start`.
    // Never read afterwards; held for the same reason as `node_token`.
    #[allow(dead_code)]
    publisher_token: zenoh::liveliness::LivelinessToken,
    /// Counter passed to `encode_attachment`; starts at 0 and is incremented
    /// by `process` for each message it serializes.
    sequence_number: u64,
}
58
59impl<P> Freezable for ZenohRosSink<P> where P: CuMsgPayload + RosMsgAdapter<'static> {}
60
61impl<P> CuSinkTask for ZenohRosSink<P>
62where
63 P: CuMsgPayload + RosMsgAdapter<'static>,
64{
65 type Input<'m> = input_msg!(P);
66
67 fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
68 where
69 Self: Sized,
70 {
71 let config = config.ok_or(CuError::from("ZenohRosSink: Missing configuration"))?;
72
73 let session_config = config.get::<String>("zenoh_config_json").map_or(
75 Ok(Config::default()),
77 |s| -> CuResult<Config> {
78 Config::from_json5(&s)
79 .map_err(cu_error_map("ZenohRosSink: Failed to create zenoh config"))
80 },
81 )?;
82
83 Ok(Self {
84 _marker: Default::default(),
85 config: ZenohRosConfig {
86 session: session_config,
87 domain_id: config.get::<u32>("domain_id").unwrap_or(0),
88 namespace: config.get::<String>("namespace").unwrap_or("node".into()),
89 node: config.get::<String>("node").unwrap_or("node".into()),
90 topic: config.get::<String>("topic").unwrap_or("copper".into()),
91 },
92 ctx: None,
93 })
94 }
95
96 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
97 let session = zenoh::Wait::wait(zenoh::open(self.config.session.clone()))
98 .map_err(cu_error_map("ZenohRosSink: Failed to open session"))?;
99
100 debug!("Zenoh session open");
101
102 let zid = session.zid();
103
104 let node = Node {
105 domain_id: self.config.domain_id,
106 zid,
107 id: NODE_ID,
108 namespace: self.config.namespace.as_ref(),
109 name: self.config.node.as_ref(),
110 };
111
112 let topic = Topic::new::<P>(self.config.topic.as_ref());
113
114 let node_token =
115 zenoh::Wait::wait(session.liveliness().declare_token(node_liveliness(&node)?))
116 .map_err(cu_error_map(
117 "ZenohRosSink: Failed to declare node liveliness token",
118 ))?;
119
120 let publisher_token = zenoh::Wait::wait(
121 session
122 .liveliness()
123 .declare_token(publisher_liveliness(&node, &topic, PUBLISHER_ID)?),
124 )
125 .map_err(cu_error_map(
126 "ZenohRosSink: Failed to declare topic liveliness token",
127 ))?;
128
129 let publisher = zenoh::Wait::wait(session.declare_publisher(topic.pubsub_keyexpr(&node)?))
131 .map_err(cu_error_map("ZenohRosSink: Failed to create publisher"))?;
132
133 self.ctx = Some(ZenohRosContext {
134 session,
135 publisher,
136 node_token,
137 publisher_token,
138 sequence_number: 0,
139 });
140 Ok(())
141 }
142
143 fn process(&mut self, clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
144 let ctx = self
145 .ctx
146 .as_mut()
147 .ok_or(CuError::from("ZenohRosSink: Context not found"))?;
148
149 let payload = input.payload().ok_or(CuError::from(
151 "ZenohRosSink: Cannot send empty payload through ROS bridge",
152 ))?;
153
154 let ros_payload = payload.convert();
155 let serial_ros_payload = cdr::serialize::<_, _, CdrBe>(&ros_payload, Infinite)
156 .map_err(|e| CuError::new_with_cause("ZenohRosSink: Failed to serialize payload", e))?;
157
158 let serial_metadata = encode_attachment(ctx.sequence_number, clock, &ctx.session.zid());
159
160 ctx.sequence_number += 1;
161
162 zenoh::Wait::wait(
163 ctx.publisher
164 .put(serial_ros_payload)
165 .encoding(Encoding::APPLICATION_CDR)
166 .attachment(serial_metadata),
167 )
168 .map_err(cu_error_map("ZenohRosSink: Failed to put value"))?;
169
170 Ok(())
171 }
172
173 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
174 if let Some(ZenohRosContext {
175 session,
176 publisher,
177 node_token: _,
178 publisher_token: _,
179 sequence_number: _,
180 }) = self.ctx.take()
181 {
182 zenoh::Wait::wait(publisher.undeclare())
183 .map_err(cu_error_map("ZenohRosSink: Failed to undeclare publisher"))?;
184 zenoh::Wait::wait(session.close())
185 .map_err(cu_error_map("ZenohRosSink: Failed to close session"))?;
186 }
187 debug!("ZenohRosSink: Stopped");
188 Ok(())
189 }
190}