cu_zenoh_sink/
lib.rs

1use cu29::clock::RobotClock;
2use cu29::{bincode, prelude::*};
3
4use zenoh::Config;
5use zenoh::Error as ZenohError;
6use zenoh::key_expr::KeyExpr;
7
8use std::marker::PhantomData;
9
10/// This is a sink task that sends messages to a zenoh topic.
11/// P is the payload type of the messages.
12/// Copper messages and Zenoh payloads are compatible.
13pub struct ZenohSink<P>
14where
15    P: CuMsgPayload,
16{
17    _marker: PhantomData<P>,
18    config: ZenohConfig,
19    ctx: Option<ZenohContext>,
20}
21
22pub struct ZenohConfig {
23    config: Config,
24    topic: String,
25}
26
27pub struct ZenohContext {
28    session: zenoh::Session,
29    publisher: zenoh::pubsub::Publisher<'static>,
30}
31
32fn cu_error(msg: &str, error: ZenohError) -> CuError {
33    CuError::from(msg).add_cause(&error.to_string())
34}
35
36fn cu_error_map(msg: &str) -> impl FnOnce(ZenohError) -> CuError + '_ {
37    |e| cu_error(msg, e)
38}
39
40impl<P> Freezable for ZenohSink<P> where P: CuMsgPayload {}
41
42impl<P> CuSinkTask for ZenohSink<P>
43where
44    P: CuMsgPayload + 'static,
45{
46    type Resources<'r> = ();
47    type Input<'m> = input_msg!(P);
48
49    fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
50    where
51        Self: Sized,
52    {
53        let config = config.ok_or(CuError::from("ZenohSink: Missing configuration"))?;
54
55        // Get json zenoh config
56        let session_config = config.get::<String>("zenoh_config_file").map_or(
57            // Or default zenoh config otherwise
58            Ok(Config::default()),
59            |s| -> CuResult<Config> {
60                Config::from_file(&s)
61                    .map_err(cu_error_map("ZenohSink: Failed to create zenoh config"))
62            },
63        )?;
64
65        let topic = config.get::<String>("topic").unwrap_or("copper".to_owned());
66
67        Ok(Self {
68            _marker: Default::default(),
69            config: ZenohConfig {
70                config: session_config,
71                topic,
72            },
73            ctx: None,
74        })
75    }
76
77    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
78        let session = zenoh::Wait::wait(zenoh::open(self.config.config.clone()))
79            .map_err(cu_error_map("ZenohSink: Failed to open session"))?;
80
81        let key_expr = KeyExpr::<'static>::new(self.config.topic.clone())
82            .map_err(cu_error_map("ZenohSink: Invalid topic string"))?;
83
84        debug!("Zenoh session open");
85        let publisher = zenoh::Wait::wait(session.declare_publisher(key_expr))
86            .map_err(cu_error_map("ZenohSink: Failed to create publisher"))?;
87
88        self.ctx = Some(ZenohContext { session, publisher });
89        Ok(())
90    }
91
92    fn process(&mut self, _clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
93        let ctx = self
94            .ctx
95            .as_mut()
96            .ok_or_else(|| CuError::from("ZenohSink: Context not found"))?;
97
98        let encoded =
99            bincode::encode_to_vec(input, bincode::config::standard()).expect("Encoding failed");
100        zenoh::Wait::wait(ctx.publisher.put(encoded))
101            .map_err(cu_error_map("ZenohSink: Failed to put value"))?;
102        Ok(())
103    }
104
105    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
106        if let Some(ZenohContext { session, publisher }) = self.ctx.take() {
107            zenoh::Wait::wait(publisher.undeclare())
108                .map_err(cu_error_map("ZenohSink: Failed to undeclare publisher"))?;
109            zenoh::Wait::wait(session.close())
110                .map_err(cu_error_map("ZenohSink: Failed to close session"))?;
111        }
112        debug!("ZenohSink: Stopped");
113        Ok(())
114    }
115}