1use cu29::cubridge::{
2 BridgeChannel, BridgeChannelConfig, BridgeChannelInfo, BridgeChannelSet, CuBridge,
3};
4use cu29::prelude::*;
5use serde::{Deserialize, Serialize};
6use zenoh::bytes::Encoding;
7use zenoh::key_expr::KeyExpr;
8use zenoh::{Config, Error as ZenohError};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12pub enum WireFormat {
13 Bincode,
14 Json,
15 Cbor,
16}
17
18impl WireFormat {
19 fn parse(value: &str) -> Option<Self> {
20 match value.trim().to_ascii_lowercase().as_str() {
21 "bincode" | "bin" | "binary" => Some(Self::Bincode),
22 "json" => Some(Self::Json),
23 "cbor" => Some(Self::Cbor),
24 _ => None,
25 }
26 }
27
28 fn encoding(self) -> Encoding {
29 match self {
30 Self::Bincode => Encoding::APPLICATION_OCTET_STREAM,
31 Self::Json => Encoding::APPLICATION_JSON,
32 Self::Cbor => Encoding::APPLICATION_CBOR,
33 }
34 }
35}
36
37#[derive(Debug, Clone)]
38struct ZenohChannelConfig<Id: Copy> {
39 id: Id,
40 route: String,
41 wire_format: WireFormat,
42}
43
44type ZenohSubscriber =
45 zenoh::pubsub::Subscriber<zenoh::handlers::FifoChannelHandler<zenoh::sample::Sample>>;
46
47struct ZenohTxChannel<Id: Copy> {
48 id: Id,
49 publisher: zenoh::pubsub::Publisher<'static>,
50 wire_format: WireFormat,
51}
52
53struct ZenohRxChannel<Id: Copy> {
54 id: Id,
55 subscriber: ZenohSubscriber,
56 wire_format: WireFormat,
57}
58
59struct ZenohContext<TxId: Copy, RxId: Copy> {
60 session: zenoh::Session,
61 tx_channels: Vec<ZenohTxChannel<TxId>>,
62 rx_channels: Vec<ZenohRxChannel<RxId>>,
63}
64
65#[derive(Reflect)]
66#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
67pub struct ZenohBridge<Tx, Rx>
68where
69 Tx: BridgeChannelSet + 'static,
70 Rx: BridgeChannelSet + 'static,
71 Tx::Id: Send + Sync + 'static,
72 Rx::Id: Send + Sync + 'static,
73{
74 #[reflect(ignore)]
75 session_config: Config,
76 #[reflect(ignore)]
77 tx_channels: Vec<ZenohChannelConfig<Tx::Id>>,
78 #[reflect(ignore)]
79 rx_channels: Vec<ZenohChannelConfig<Rx::Id>>,
80 #[reflect(ignore)]
81 ctx: Option<ZenohContext<Tx::Id, Rx::Id>>,
82}
83
84impl<Tx, Rx> Freezable for ZenohBridge<Tx, Rx>
85where
86 Tx: BridgeChannelSet + 'static,
87 Rx: BridgeChannelSet + 'static,
88 Tx::Id: Send + Sync + 'static,
89 Rx::Id: Send + Sync + 'static,
90{
91}
92
93impl<Tx, Rx> cu29::reflect::TypePath for ZenohBridge<Tx, Rx>
94where
95 Tx: BridgeChannelSet + 'static,
96 Rx: BridgeChannelSet + 'static,
97 Tx::Id: Send + Sync + 'static,
98 Rx::Id: Send + Sync + 'static,
99{
100 fn type_path() -> &'static str {
101 "cu_zenoh_bridge::ZenohBridge"
102 }
103
104 fn short_type_path() -> &'static str {
105 "ZenohBridge"
106 }
107
108 fn type_ident() -> Option<&'static str> {
109 Some("ZenohBridge")
110 }
111
112 fn crate_name() -> Option<&'static str> {
113 Some("cu_zenoh_bridge")
114 }
115
116 fn module_path() -> Option<&'static str> {
117 Some("cu_zenoh_bridge")
118 }
119}
120
121impl<Tx, Rx> ZenohBridge<Tx, Rx>
122where
123 Tx: BridgeChannelSet + 'static,
124 Rx: BridgeChannelSet + 'static,
125 Tx::Id: Send + Sync + 'static,
126 Rx::Id: Send + Sync + 'static,
127{
128 fn parse_session_config(config: Option<&ComponentConfig>) -> CuResult<Config> {
129 if let Some(config) = config {
130 if let Some(path) = config.get::<String>("zenoh_config_file")? {
131 return Config::from_file(&path).map_err(|e| {
132 CuError::from(format!("ZenohBridge: Failed to read config file: {e}"))
133 });
134 }
135 if let Some(json) = config.get::<String>("zenoh_config_json")? {
136 return Config::from_json5(&json).map_err(|e| {
137 CuError::from(format!("ZenohBridge: Failed to parse config json: {e}"))
138 });
139 }
140 }
141 Ok(Config::default())
142 }
143
144 fn parse_default_wire_format(config: Option<&ComponentConfig>) -> CuResult<WireFormat> {
145 if let Some(config) = config
146 && let Some(raw) = config.get::<String>("wire_format")?
147 {
148 return WireFormat::parse(&raw).ok_or_else(|| {
149 CuError::from(format!(
150 "ZenohBridge: Unsupported wire_format '{raw}', expected bincode/json/cbor"
151 ))
152 });
153 }
154 Ok(WireFormat::Bincode)
155 }
156
157 fn channel_route<Id: Copy + core::fmt::Debug>(
158 channel: &BridgeChannelConfig<Id>,
159 ) -> CuResult<String> {
160 channel
161 .effective_route()
162 .map(|route| route.into_owned())
163 .ok_or_else(|| {
164 let id = channel.channel.id;
165 CuError::from(format!("ZenohBridge: Missing route for channel {:?}", id))
166 })
167 }
168
169 fn channel_wire_format<Id: Copy>(
170 channel: &BridgeChannelConfig<Id>,
171 default: WireFormat,
172 ) -> CuResult<WireFormat> {
173 if let Some(config) = channel.config.as_ref()
174 && let Some(raw) = config.get::<String>("wire_format")?
175 {
176 return WireFormat::parse(&raw).ok_or_else(|| {
177 CuError::from(format!(
178 "ZenohBridge: Unsupported wire_format '{raw}', expected bincode/json/cbor"
179 ))
180 });
181 }
182 Ok(default)
183 }
184
185 fn encode_message<Payload: CuMsgPayload>(
186 wire_format: WireFormat,
187 msg: &CuMsg<Payload>,
188 ) -> CuResult<Vec<u8>> {
189 match wire_format {
190 WireFormat::Bincode => bincode::encode_to_vec(msg, bincode::config::standard())
191 .map_err(|e| CuError::new_with_cause("ZenohBridge: bincode encode failed", e)),
192 WireFormat::Json => serde_json::to_vec(msg)
193 .map_err(|e| CuError::new_with_cause("ZenohBridge: json encode failed", e)),
194 WireFormat::Cbor => minicbor_serde::to_vec(msg)
195 .map_err(|e| CuError::new_with_cause("ZenohBridge: cbor encode failed", e)),
196 }
197 }
198
199 fn decode_message<Payload: CuMsgPayload>(
200 wire_format: WireFormat,
201 bytes: &[u8],
202 ) -> CuResult<CuMsg<Payload>> {
203 match wire_format {
204 WireFormat::Bincode => {
205 let (decoded, _): (CuMsg<Payload>, usize) =
206 bincode::decode_from_slice(bytes, bincode::config::standard()).map_err(
207 |e| CuError::new_with_cause("ZenohBridge: bincode decode failed", e),
208 )?;
209 Ok(decoded)
210 }
211 WireFormat::Json => serde_json::from_slice(bytes)
212 .map_err(|e| CuError::new_with_cause("ZenohBridge: json decode failed", e)),
213 WireFormat::Cbor => minicbor_serde::from_slice(bytes)
214 .map_err(|e| CuError::new_with_cause("ZenohBridge: cbor decode failed", e)),
215 }
216 }
217
218 fn find_tx_channel_mut(
219 channels: &mut [ZenohTxChannel<Tx::Id>],
220 id: Tx::Id,
221 ) -> Option<&mut ZenohTxChannel<Tx::Id>> {
222 channels.iter_mut().find(|channel| channel.id == id)
223 }
224
225 fn find_rx_channel_mut(
226 channels: &mut [ZenohRxChannel<Rx::Id>],
227 id: Rx::Id,
228 ) -> Option<&mut ZenohRxChannel<Rx::Id>> {
229 channels.iter_mut().find(|channel| channel.id == id)
230 }
231}
232
233impl<Tx, Rx> CuBridge for ZenohBridge<Tx, Rx>
234where
235 Tx: BridgeChannelSet + 'static,
236 Rx: BridgeChannelSet + 'static,
237 Tx::Id: core::fmt::Debug + Send + Sync + 'static,
238 Rx::Id: core::fmt::Debug + Send + Sync + 'static,
239{
240 type Tx = Tx;
241 type Rx = Rx;
242 type Resources<'r> = ();
243
244 fn new(
245 config: Option<&ComponentConfig>,
246 tx_channels: &[BridgeChannelConfig<<Self::Tx as BridgeChannelSet>::Id>],
247 rx_channels: &[BridgeChannelConfig<<Self::Rx as BridgeChannelSet>::Id>],
248 _resources: Self::Resources<'_>,
249 ) -> CuResult<Self>
250 where
251 Self: Sized,
252 {
253 let session_config = Self::parse_session_config(config)?;
254 let default_wire_format = Self::parse_default_wire_format(config)?;
255
256 let mut tx_cfgs = Vec::with_capacity(tx_channels.len());
257 for channel in tx_channels {
258 let route = Self::channel_route(channel)?;
259 let wire_format = Self::channel_wire_format(channel, default_wire_format)?;
260 tx_cfgs.push(ZenohChannelConfig {
261 id: channel.channel.id,
262 route,
263 wire_format,
264 });
265 }
266
267 let mut rx_cfgs = Vec::with_capacity(rx_channels.len());
268 for channel in rx_channels {
269 let route = Self::channel_route(channel)?;
270 let wire_format = Self::channel_wire_format(channel, default_wire_format)?;
271 rx_cfgs.push(ZenohChannelConfig {
272 id: channel.channel.id,
273 route,
274 wire_format,
275 });
276 }
277
278 Ok(Self {
279 session_config,
280 tx_channels: tx_cfgs,
281 rx_channels: rx_cfgs,
282 ctx: None,
283 })
284 }
285
286 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
287 let session = zenoh::Wait::wait(zenoh::open(self.session_config.clone()))
288 .map_err(cu_error_map("ZenohBridge: Failed to open session"))?;
289
290 let mut tx_channels = Vec::with_capacity(self.tx_channels.len());
291 for channel in &self.tx_channels {
292 let key_expr = KeyExpr::<'static>::new(channel.route.clone())
293 .map_err(cu_error_map("ZenohBridge: Invalid Tx key expression"))?;
294 let publisher = zenoh::Wait::wait(session.declare_publisher(key_expr))
295 .map_err(cu_error_map("ZenohBridge: Failed to declare publisher"))?;
296 tx_channels.push(ZenohTxChannel {
297 id: channel.id,
298 publisher,
299 wire_format: channel.wire_format,
300 });
301 }
302
303 let mut rx_channels = Vec::with_capacity(self.rx_channels.len());
304 for channel in &self.rx_channels {
305 let key_expr = KeyExpr::<'static>::new(channel.route.clone())
306 .map_err(cu_error_map("ZenohBridge: Invalid Rx key expression"))?;
307 let subscriber = zenoh::Wait::wait(session.declare_subscriber(key_expr))
308 .map_err(cu_error_map("ZenohBridge: Failed to declare subscriber"))?;
309 rx_channels.push(ZenohRxChannel {
310 id: channel.id,
311 subscriber,
312 wire_format: channel.wire_format,
313 });
314 }
315
316 self.ctx = Some(ZenohContext {
317 session,
318 tx_channels,
319 rx_channels,
320 });
321 Ok(())
322 }
323
324 fn send<'a, Payload>(
325 &mut self,
326 _clock: &RobotClock,
327 channel: &'static BridgeChannel<<Self::Tx as BridgeChannelSet>::Id, Payload>,
328 msg: &CuMsg<Payload>,
329 ) -> CuResult<()>
330 where
331 Payload: CuMsgPayload + 'a,
332 {
333 let ctx = self
334 .ctx
335 .as_mut()
336 .ok_or_else(|| CuError::from("ZenohBridge: Context not initialized"))?;
337 let tx_channel =
338 Self::find_tx_channel_mut(&mut ctx.tx_channels, channel.id()).ok_or_else(|| {
339 CuError::from(format!(
340 "ZenohBridge: Unknown Tx channel {:?}",
341 channel.id()
342 ))
343 })?;
344
345 let encoded = Self::encode_message(tx_channel.wire_format, msg)?;
346 zenoh::Wait::wait(
347 tx_channel
348 .publisher
349 .put(encoded)
350 .encoding(tx_channel.wire_format.encoding()),
351 )
352 .map_err(cu_error_map("ZenohBridge: Failed to publish"))?;
353 Ok(())
354 }
355
356 fn receive<'a, Payload>(
357 &mut self,
358 clock: &RobotClock,
359 channel: &'static BridgeChannel<<Self::Rx as BridgeChannelSet>::Id, Payload>,
360 msg: &mut CuMsg<Payload>,
361 ) -> CuResult<()>
362 where
363 Payload: CuMsgPayload + 'a,
364 {
365 let ctx = self
366 .ctx
367 .as_mut()
368 .ok_or_else(|| CuError::from("ZenohBridge: Context not initialized"))?;
369 let rx_channel =
370 Self::find_rx_channel_mut(&mut ctx.rx_channels, channel.id()).ok_or_else(|| {
371 CuError::from(format!(
372 "ZenohBridge: Unknown Rx channel {:?}",
373 channel.id()
374 ))
375 })?;
376
377 msg.tov = Tov::Time(clock.now());
378
379 let sample = rx_channel
380 .subscriber
381 .try_recv()
382 .map_err(|e| CuError::from(format!("ZenohBridge: receive failed: {e}")))?;
383 if let Some(sample) = sample {
384 let payload = sample.payload().to_bytes();
385 let decoded = Self::decode_message(rx_channel.wire_format, payload.as_ref())?;
386 *msg = decoded;
387 } else {
388 msg.clear_payload();
389 }
390 Ok(())
391 }
392
393 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
394 if let Some(ZenohContext {
395 session,
396 tx_channels,
397 rx_channels,
398 }) = self.ctx.take()
399 {
400 for channel in tx_channels {
401 zenoh::Wait::wait(channel.publisher.undeclare())
402 .map_err(cu_error_map("ZenohBridge: Failed to undeclare publisher"))?;
403 }
404 for channel in rx_channels {
405 zenoh::Wait::wait(channel.subscriber.undeclare())
406 .map_err(cu_error_map("ZenohBridge: Failed to undeclare subscriber"))?;
407 }
408 zenoh::Wait::wait(session.close())
409 .map_err(cu_error_map("ZenohBridge: Failed to close session"))?;
410 }
411 Ok(())
412 }
413}
414
415fn cu_error(msg: &str, error: ZenohError) -> CuError {
416 CuError::from(format!("{msg}: {error}"))
417}
418
419fn cu_error_map(msg: &str) -> impl FnOnce(ZenohError) -> CuError + '_ {
420 move |e| cu_error(msg, e)
421}