1use cu29::clock::RobotClock;
2use cu29::{bincode, prelude::*};
3
4use zenoh::Config;
5use zenoh::Error as ZenohError;
6use zenoh::key_expr::KeyExpr;
7
8use std::marker::PhantomData;
9
10pub struct ZenohSink<P>
14where
15 P: CuMsgPayload,
16{
17 _marker: PhantomData<P>,
18 config: ZenohConfig,
19 ctx: Option<ZenohContext>,
20}
21
22pub struct ZenohConfig {
23 config: Config,
24 topic: String,
25}
26
27pub struct ZenohContext {
28 session: zenoh::Session,
29 publisher: zenoh::pubsub::Publisher<'static>,
30}
31
32fn cu_error(msg: &str, error: ZenohError) -> CuError {
33 CuError::from(msg).add_cause(&error.to_string())
34}
35
36fn cu_error_map(msg: &str) -> impl FnOnce(ZenohError) -> CuError + '_ {
37 |e| cu_error(msg, e)
38}
39
40impl<P> Freezable for ZenohSink<P> where P: CuMsgPayload {}
41
42impl<P> CuSinkTask for ZenohSink<P>
43where
44 P: CuMsgPayload + 'static,
45{
46 type Resources<'r> = ();
47 type Input<'m> = input_msg!(P);
48
49 fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
50 where
51 Self: Sized,
52 {
53 let config = config.ok_or(CuError::from("ZenohSink: Missing configuration"))?;
54
55 let session_config = config.get::<String>("zenoh_config_file").map_or(
57 Ok(Config::default()),
59 |s| -> CuResult<Config> {
60 Config::from_file(&s)
61 .map_err(cu_error_map("ZenohSink: Failed to create zenoh config"))
62 },
63 )?;
64
65 let topic = config.get::<String>("topic").unwrap_or("copper".to_owned());
66
67 Ok(Self {
68 _marker: Default::default(),
69 config: ZenohConfig {
70 config: session_config,
71 topic,
72 },
73 ctx: None,
74 })
75 }
76
77 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
78 let session = zenoh::Wait::wait(zenoh::open(self.config.config.clone()))
79 .map_err(cu_error_map("ZenohSink: Failed to open session"))?;
80
81 let key_expr = KeyExpr::<'static>::new(self.config.topic.clone())
82 .map_err(cu_error_map("ZenohSink: Invalid topic string"))?;
83
84 debug!("Zenoh session open");
85 let publisher = zenoh::Wait::wait(session.declare_publisher(key_expr))
86 .map_err(cu_error_map("ZenohSink: Failed to create publisher"))?;
87
88 self.ctx = Some(ZenohContext { session, publisher });
89 Ok(())
90 }
91
92 fn process(&mut self, _clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
93 let ctx = self
94 .ctx
95 .as_mut()
96 .ok_or_else(|| CuError::from("ZenohSink: Context not found"))?;
97
98 let encoded =
99 bincode::encode_to_vec(input, bincode::config::standard()).expect("Encoding failed");
100 zenoh::Wait::wait(ctx.publisher.put(encoded))
101 .map_err(cu_error_map("ZenohSink: Failed to put value"))?;
102 Ok(())
103 }
104
105 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
106 if let Some(ZenohContext { session, publisher }) = self.ctx.take() {
107 zenoh::Wait::wait(publisher.undeclare())
108 .map_err(cu_error_map("ZenohSink: Failed to undeclare publisher"))?;
109 zenoh::Wait::wait(session.close())
110 .map_err(cu_error_map("ZenohSink: Failed to close session"))?;
111 }
112 debug!("ZenohSink: Stopped");
113 Ok(())
114 }
115}