Skip to main content

cu_zenoh_bridge/
lib.rs

1use cu29::cubridge::{
2    BridgeChannel, BridgeChannelConfig, BridgeChannelInfo, BridgeChannelSet, CuBridge,
3};
4use cu29::prelude::*;
5use serde::{Deserialize, Serialize};
6use zenoh::bytes::Encoding;
7use zenoh::key_expr::KeyExpr;
8use zenoh::{Config, Error as ZenohError};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12pub enum WireFormat {
13    Bincode,
14    Json,
15    Cbor,
16}
17
18impl WireFormat {
19    fn parse(value: &str) -> Option<Self> {
20        match value.trim().to_ascii_lowercase().as_str() {
21            "bincode" | "bin" | "binary" => Some(Self::Bincode),
22            "json" => Some(Self::Json),
23            "cbor" => Some(Self::Cbor),
24            _ => None,
25        }
26    }
27
28    fn encoding(self) -> Encoding {
29        match self {
30            Self::Bincode => Encoding::APPLICATION_OCTET_STREAM,
31            Self::Json => Encoding::APPLICATION_JSON,
32            Self::Cbor => Encoding::APPLICATION_CBOR,
33        }
34    }
35}
36
37#[derive(Debug, Clone)]
38struct ZenohChannelConfig<Id: Copy> {
39    id: Id,
40    route: String,
41    wire_format: WireFormat,
42}
43
44type ZenohSubscriber =
45    zenoh::pubsub::Subscriber<zenoh::handlers::FifoChannelHandler<zenoh::sample::Sample>>;
46
47struct ZenohTxChannel<Id: Copy> {
48    id: Id,
49    publisher: zenoh::pubsub::Publisher<'static>,
50    wire_format: WireFormat,
51}
52
53struct ZenohRxChannel<Id: Copy> {
54    id: Id,
55    subscriber: ZenohSubscriber,
56    wire_format: WireFormat,
57}
58
59struct ZenohContext<TxId: Copy, RxId: Copy> {
60    session: zenoh::Session,
61    tx_channels: Vec<ZenohTxChannel<TxId>>,
62    rx_channels: Vec<ZenohRxChannel<RxId>>,
63}
64
65#[derive(Reflect)]
66#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
67pub struct ZenohBridge<Tx, Rx>
68where
69    Tx: BridgeChannelSet + 'static,
70    Rx: BridgeChannelSet + 'static,
71    Tx::Id: Send + Sync + 'static,
72    Rx::Id: Send + Sync + 'static,
73{
74    #[reflect(ignore)]
75    session_config: Config,
76    #[reflect(ignore)]
77    tx_channels: Vec<ZenohChannelConfig<Tx::Id>>,
78    #[reflect(ignore)]
79    rx_channels: Vec<ZenohChannelConfig<Rx::Id>>,
80    #[reflect(ignore)]
81    ctx: Option<ZenohContext<Tx::Id, Rx::Id>>,
82}
83
84impl<Tx, Rx> Freezable for ZenohBridge<Tx, Rx>
85where
86    Tx: BridgeChannelSet + 'static,
87    Rx: BridgeChannelSet + 'static,
88    Tx::Id: Send + Sync + 'static,
89    Rx::Id: Send + Sync + 'static,
90{
91}
92
93impl<Tx, Rx> cu29::reflect::TypePath for ZenohBridge<Tx, Rx>
94where
95    Tx: BridgeChannelSet + 'static,
96    Rx: BridgeChannelSet + 'static,
97    Tx::Id: Send + Sync + 'static,
98    Rx::Id: Send + Sync + 'static,
99{
100    fn type_path() -> &'static str {
101        "cu_zenoh_bridge::ZenohBridge"
102    }
103
104    fn short_type_path() -> &'static str {
105        "ZenohBridge"
106    }
107
108    fn type_ident() -> Option<&'static str> {
109        Some("ZenohBridge")
110    }
111
112    fn crate_name() -> Option<&'static str> {
113        Some("cu_zenoh_bridge")
114    }
115
116    fn module_path() -> Option<&'static str> {
117        Some("cu_zenoh_bridge")
118    }
119}
120
121impl<Tx, Rx> ZenohBridge<Tx, Rx>
122where
123    Tx: BridgeChannelSet + 'static,
124    Rx: BridgeChannelSet + 'static,
125    Tx::Id: Send + Sync + 'static,
126    Rx::Id: Send + Sync + 'static,
127{
128    fn parse_session_config(config: Option<&ComponentConfig>) -> CuResult<Config> {
129        if let Some(config) = config {
130            if let Some(path) = config.get::<String>("zenoh_config_file")? {
131                return Config::from_file(&path).map_err(|e| {
132                    CuError::from(format!("ZenohBridge: Failed to read config file: {e}"))
133                });
134            }
135            if let Some(json) = config.get::<String>("zenoh_config_json")? {
136                return Config::from_json5(&json).map_err(|e| {
137                    CuError::from(format!("ZenohBridge: Failed to parse config json: {e}"))
138                });
139            }
140        }
141        Ok(Config::default())
142    }
143
144    fn parse_default_wire_format(config: Option<&ComponentConfig>) -> CuResult<WireFormat> {
145        if let Some(config) = config
146            && let Some(raw) = config.get::<String>("wire_format")?
147        {
148            return WireFormat::parse(&raw).ok_or_else(|| {
149                CuError::from(format!(
150                    "ZenohBridge: Unsupported wire_format '{raw}', expected bincode/json/cbor"
151                ))
152            });
153        }
154        Ok(WireFormat::Bincode)
155    }
156
157    fn channel_route<Id: Copy + core::fmt::Debug>(
158        channel: &BridgeChannelConfig<Id>,
159    ) -> CuResult<String> {
160        channel
161            .effective_route()
162            .map(|route| route.into_owned())
163            .ok_or_else(|| {
164                let id = channel.channel.id;
165                CuError::from(format!("ZenohBridge: Missing route for channel {:?}", id))
166            })
167    }
168
169    fn channel_wire_format<Id: Copy>(
170        channel: &BridgeChannelConfig<Id>,
171        default: WireFormat,
172    ) -> CuResult<WireFormat> {
173        if let Some(config) = channel.config.as_ref()
174            && let Some(raw) = config.get::<String>("wire_format")?
175        {
176            return WireFormat::parse(&raw).ok_or_else(|| {
177                CuError::from(format!(
178                    "ZenohBridge: Unsupported wire_format '{raw}', expected bincode/json/cbor"
179                ))
180            });
181        }
182        Ok(default)
183    }
184
185    fn encode_message<Payload: CuMsgPayload>(
186        wire_format: WireFormat,
187        msg: &CuMsg<Payload>,
188    ) -> CuResult<Vec<u8>> {
189        match wire_format {
190            WireFormat::Bincode => bincode::encode_to_vec(msg, bincode::config::standard())
191                .map_err(|e| CuError::new_with_cause("ZenohBridge: bincode encode failed", e)),
192            WireFormat::Json => serde_json::to_vec(msg)
193                .map_err(|e| CuError::new_with_cause("ZenohBridge: json encode failed", e)),
194            WireFormat::Cbor => minicbor_serde::to_vec(msg)
195                .map_err(|e| CuError::new_with_cause("ZenohBridge: cbor encode failed", e)),
196        }
197    }
198
199    fn decode_message<Payload: CuMsgPayload>(
200        wire_format: WireFormat,
201        bytes: &[u8],
202    ) -> CuResult<CuMsg<Payload>> {
203        match wire_format {
204            WireFormat::Bincode => {
205                let (decoded, _): (CuMsg<Payload>, usize) =
206                    bincode::decode_from_slice(bytes, bincode::config::standard()).map_err(
207                        |e| CuError::new_with_cause("ZenohBridge: bincode decode failed", e),
208                    )?;
209                Ok(decoded)
210            }
211            WireFormat::Json => serde_json::from_slice(bytes)
212                .map_err(|e| CuError::new_with_cause("ZenohBridge: json decode failed", e)),
213            WireFormat::Cbor => minicbor_serde::from_slice(bytes)
214                .map_err(|e| CuError::new_with_cause("ZenohBridge: cbor decode failed", e)),
215        }
216    }
217
218    fn find_tx_channel_mut(
219        channels: &mut [ZenohTxChannel<Tx::Id>],
220        id: Tx::Id,
221    ) -> Option<&mut ZenohTxChannel<Tx::Id>> {
222        channels.iter_mut().find(|channel| channel.id == id)
223    }
224
225    fn find_rx_channel_mut(
226        channels: &mut [ZenohRxChannel<Rx::Id>],
227        id: Rx::Id,
228    ) -> Option<&mut ZenohRxChannel<Rx::Id>> {
229        channels.iter_mut().find(|channel| channel.id == id)
230    }
231}
232
233impl<Tx, Rx> CuBridge for ZenohBridge<Tx, Rx>
234where
235    Tx: BridgeChannelSet + 'static,
236    Rx: BridgeChannelSet + 'static,
237    Tx::Id: core::fmt::Debug + Send + Sync + 'static,
238    Rx::Id: core::fmt::Debug + Send + Sync + 'static,
239{
240    type Tx = Tx;
241    type Rx = Rx;
242    type Resources<'r> = ();
243
244    fn new(
245        config: Option<&ComponentConfig>,
246        tx_channels: &[BridgeChannelConfig<<Self::Tx as BridgeChannelSet>::Id>],
247        rx_channels: &[BridgeChannelConfig<<Self::Rx as BridgeChannelSet>::Id>],
248        _resources: Self::Resources<'_>,
249    ) -> CuResult<Self>
250    where
251        Self: Sized,
252    {
253        let session_config = Self::parse_session_config(config)?;
254        let default_wire_format = Self::parse_default_wire_format(config)?;
255
256        let mut tx_cfgs = Vec::with_capacity(tx_channels.len());
257        for channel in tx_channels {
258            let route = Self::channel_route(channel)?;
259            let wire_format = Self::channel_wire_format(channel, default_wire_format)?;
260            tx_cfgs.push(ZenohChannelConfig {
261                id: channel.channel.id,
262                route,
263                wire_format,
264            });
265        }
266
267        let mut rx_cfgs = Vec::with_capacity(rx_channels.len());
268        for channel in rx_channels {
269            let route = Self::channel_route(channel)?;
270            let wire_format = Self::channel_wire_format(channel, default_wire_format)?;
271            rx_cfgs.push(ZenohChannelConfig {
272                id: channel.channel.id,
273                route,
274                wire_format,
275            });
276        }
277
278        Ok(Self {
279            session_config,
280            tx_channels: tx_cfgs,
281            rx_channels: rx_cfgs,
282            ctx: None,
283        })
284    }
285
286    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
287        let session = zenoh::Wait::wait(zenoh::open(self.session_config.clone()))
288            .map_err(cu_error_map("ZenohBridge: Failed to open session"))?;
289
290        let mut tx_channels = Vec::with_capacity(self.tx_channels.len());
291        for channel in &self.tx_channels {
292            let key_expr = KeyExpr::<'static>::new(channel.route.clone())
293                .map_err(cu_error_map("ZenohBridge: Invalid Tx key expression"))?;
294            let publisher = zenoh::Wait::wait(session.declare_publisher(key_expr))
295                .map_err(cu_error_map("ZenohBridge: Failed to declare publisher"))?;
296            tx_channels.push(ZenohTxChannel {
297                id: channel.id,
298                publisher,
299                wire_format: channel.wire_format,
300            });
301        }
302
303        let mut rx_channels = Vec::with_capacity(self.rx_channels.len());
304        for channel in &self.rx_channels {
305            let key_expr = KeyExpr::<'static>::new(channel.route.clone())
306                .map_err(cu_error_map("ZenohBridge: Invalid Rx key expression"))?;
307            let subscriber = zenoh::Wait::wait(session.declare_subscriber(key_expr))
308                .map_err(cu_error_map("ZenohBridge: Failed to declare subscriber"))?;
309            rx_channels.push(ZenohRxChannel {
310                id: channel.id,
311                subscriber,
312                wire_format: channel.wire_format,
313            });
314        }
315
316        self.ctx = Some(ZenohContext {
317            session,
318            tx_channels,
319            rx_channels,
320        });
321        Ok(())
322    }
323
324    fn send<'a, Payload>(
325        &mut self,
326        _clock: &RobotClock,
327        channel: &'static BridgeChannel<<Self::Tx as BridgeChannelSet>::Id, Payload>,
328        msg: &CuMsg<Payload>,
329    ) -> CuResult<()>
330    where
331        Payload: CuMsgPayload + 'a,
332    {
333        let ctx = self
334            .ctx
335            .as_mut()
336            .ok_or_else(|| CuError::from("ZenohBridge: Context not initialized"))?;
337        let tx_channel =
338            Self::find_tx_channel_mut(&mut ctx.tx_channels, channel.id()).ok_or_else(|| {
339                CuError::from(format!(
340                    "ZenohBridge: Unknown Tx channel {:?}",
341                    channel.id()
342                ))
343            })?;
344
345        let encoded = Self::encode_message(tx_channel.wire_format, msg)?;
346        zenoh::Wait::wait(
347            tx_channel
348                .publisher
349                .put(encoded)
350                .encoding(tx_channel.wire_format.encoding()),
351        )
352        .map_err(cu_error_map("ZenohBridge: Failed to publish"))?;
353        Ok(())
354    }
355
356    fn receive<'a, Payload>(
357        &mut self,
358        clock: &RobotClock,
359        channel: &'static BridgeChannel<<Self::Rx as BridgeChannelSet>::Id, Payload>,
360        msg: &mut CuMsg<Payload>,
361    ) -> CuResult<()>
362    where
363        Payload: CuMsgPayload + 'a,
364    {
365        let ctx = self
366            .ctx
367            .as_mut()
368            .ok_or_else(|| CuError::from("ZenohBridge: Context not initialized"))?;
369        let rx_channel =
370            Self::find_rx_channel_mut(&mut ctx.rx_channels, channel.id()).ok_or_else(|| {
371                CuError::from(format!(
372                    "ZenohBridge: Unknown Rx channel {:?}",
373                    channel.id()
374                ))
375            })?;
376
377        msg.tov = Tov::Time(clock.now());
378
379        let sample = rx_channel
380            .subscriber
381            .try_recv()
382            .map_err(|e| CuError::from(format!("ZenohBridge: receive failed: {e}")))?;
383        if let Some(sample) = sample {
384            let payload = sample.payload().to_bytes();
385            let decoded = Self::decode_message(rx_channel.wire_format, payload.as_ref())?;
386            *msg = decoded;
387        } else {
388            msg.clear_payload();
389        }
390        Ok(())
391    }
392
393    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
394        if let Some(ZenohContext {
395            session,
396            tx_channels,
397            rx_channels,
398        }) = self.ctx.take()
399        {
400            for channel in tx_channels {
401                zenoh::Wait::wait(channel.publisher.undeclare())
402                    .map_err(cu_error_map("ZenohBridge: Failed to undeclare publisher"))?;
403            }
404            for channel in rx_channels {
405                zenoh::Wait::wait(channel.subscriber.undeclare())
406                    .map_err(cu_error_map("ZenohBridge: Failed to undeclare subscriber"))?;
407            }
408            zenoh::Wait::wait(session.close())
409                .map_err(cu_error_map("ZenohBridge: Failed to close session"))?;
410        }
411        Ok(())
412    }
413}
414
415fn cu_error(msg: &str, error: ZenohError) -> CuError {
416    CuError::from(format!("{msg}: {error}"))
417}
418
419fn cu_error_map(msg: &str) -> impl FnOnce(ZenohError) -> CuError + '_ {
420    move |e| cu_error(msg, e)
421}