1mod attachment;
2mod error;
3mod keyexpr;
4mod liveliness;
5mod node;
6mod topic;
7
8use attachment::encode_attachment;
9use cdr::{CdrBe, Infinite};
10use cu_ros_payloads::RosMsgAdapter;
11use cu29::clock::RobotClock;
12use cu29::prelude::*;
13use error::cu_error_map;
14use liveliness::{node_liveliness, publisher_liveliness};
15use node::Node;
16use topic::Topic;
17use zenoh::Config;
18use zenoh::bytes::Encoding;
19
20use std::marker::PhantomData;
21
22const NODE_ID: u32 = 0;
24
25const PUBLISHER_ID: u32 = NODE_ID + 1;
27
28#[derive(Reflect)]
32#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
33pub struct ZenohRosSink<P>
34where
35 P: CuMsgPayload + RosMsgAdapter<'static>,
36{
37 #[reflect(ignore)]
38 _marker: PhantomData<fn() -> P>,
39 #[reflect(ignore)]
40 config: ZenohRosConfig,
41 #[reflect(ignore)]
42 ctx: Option<ZenohRosContext>,
43}
44
45pub struct ZenohRosConfig {
46 session: Config,
47 domain_id: u32,
48 namespace: String,
49 node: String,
50 topic: String,
51}
52
53pub struct ZenohRosContext {
54 session: zenoh::Session,
55 publisher: zenoh::pubsub::Publisher<'static>,
56 #[allow(dead_code)]
58 node_token: zenoh::liveliness::LivelinessToken,
59 #[allow(dead_code)]
60 publisher_token: zenoh::liveliness::LivelinessToken,
61 sequence_number: u64,
62}
63
64impl<P> Freezable for ZenohRosSink<P> where P: CuMsgPayload + RosMsgAdapter<'static> {}
65
66impl<P> cu29::reflect::TypePath for ZenohRosSink<P>
67where
68 P: CuMsgPayload + RosMsgAdapter<'static> + 'static,
69{
70 fn type_path() -> &'static str {
71 "cu_zenoh_ros_sink::ZenohRosSink"
72 }
73
74 fn short_type_path() -> &'static str {
75 "ZenohRosSink"
76 }
77
78 fn type_ident() -> Option<&'static str> {
79 Some("ZenohRosSink")
80 }
81
82 fn crate_name() -> Option<&'static str> {
83 Some("cu_zenoh_ros_sink")
84 }
85
86 fn module_path() -> Option<&'static str> {
87 Some("cu_zenoh_ros_sink")
88 }
89}
90
91impl<P> CuSinkTask for ZenohRosSink<P>
92where
93 P: CuMsgPayload + RosMsgAdapter<'static> + 'static,
94{
95 type Resources<'r> = ();
96 type Input<'m> = input_msg!(P);
97
98 fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
99 where
100 Self: Sized,
101 {
102 let config = config.ok_or(CuError::from("ZenohRosSink: Missing configuration"))?;
103
104 let session_config = match config.get::<String>("zenoh_config_json")? {
106 Some(json) => Config::from_json5(&json)
107 .map_err(cu_error_map("ZenohRosSink: Failed to create zenoh config"))?,
108 None => Config::default(),
109 };
110
111 Ok(Self {
112 _marker: PhantomData,
113 config: ZenohRosConfig {
114 session: session_config,
115 domain_id: config.get::<u32>("domain_id")?.unwrap_or(0),
116 namespace: config.get::<String>("namespace")?.unwrap_or("node".into()),
117 node: config.get::<String>("node")?.unwrap_or("node".into()),
118 topic: config.get::<String>("topic")?.unwrap_or("copper".into()),
119 },
120 ctx: None,
121 })
122 }
123
124 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
125 let session = zenoh::Wait::wait(zenoh::open(self.config.session.clone()))
126 .map_err(cu_error_map("ZenohRosSink: Failed to open session"))?;
127
128 debug!("Zenoh session open");
129
130 let zid = session.zid();
131
132 let node = Node {
133 domain_id: self.config.domain_id,
134 zid,
135 id: NODE_ID,
136 namespace: self.config.namespace.as_ref(),
137 name: self.config.node.as_ref(),
138 };
139
140 let topic = Topic::new::<P>(self.config.topic.as_ref());
141
142 let node_token =
143 zenoh::Wait::wait(session.liveliness().declare_token(node_liveliness(&node)?))
144 .map_err(cu_error_map(
145 "ZenohRosSink: Failed to declare node liveliness token",
146 ))?;
147
148 let publisher_token = zenoh::Wait::wait(
149 session
150 .liveliness()
151 .declare_token(publisher_liveliness(&node, &topic, PUBLISHER_ID)?),
152 )
153 .map_err(cu_error_map(
154 "ZenohRosSink: Failed to declare topic liveliness token",
155 ))?;
156
157 let publisher = zenoh::Wait::wait(session.declare_publisher(topic.pubsub_keyexpr(&node)?))
159 .map_err(cu_error_map("ZenohRosSink: Failed to create publisher"))?;
160
161 self.ctx = Some(ZenohRosContext {
162 session,
163 publisher,
164 node_token,
165 publisher_token,
166 sequence_number: 0,
167 });
168 Ok(())
169 }
170
171 fn process(&mut self, clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
172 let ctx = self
173 .ctx
174 .as_mut()
175 .ok_or(CuError::from("ZenohRosSink: Context not found"))?;
176
177 let payload = input.payload().ok_or(CuError::from(
179 "ZenohRosSink: Cannot send empty payload through ROS bridge",
180 ))?;
181
182 let ros_payload = payload.convert();
183 let serial_ros_payload = cdr::serialize::<_, _, CdrBe>(&ros_payload, Infinite)
184 .map_err(|e| CuError::new_with_cause("ZenohRosSink: Failed to serialize payload", e))?;
185
186 let serial_metadata = encode_attachment(ctx.sequence_number, clock, &ctx.session.zid());
187
188 ctx.sequence_number += 1;
189
190 zenoh::Wait::wait(
191 ctx.publisher
192 .put(serial_ros_payload)
193 .encoding(Encoding::APPLICATION_CDR)
194 .attachment(serial_metadata),
195 )
196 .map_err(cu_error_map("ZenohRosSink: Failed to put value"))?;
197
198 Ok(())
199 }
200
201 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
202 if let Some(ZenohRosContext {
203 session,
204 publisher,
205 node_token: _,
206 publisher_token: _,
207 sequence_number: _,
208 }) = self.ctx.take()
209 {
210 zenoh::Wait::wait(publisher.undeclare())
211 .map_err(cu_error_map("ZenohRosSink: Failed to undeclare publisher"))?;
212 zenoh::Wait::wait(session.close())
213 .map_err(cu_error_map("ZenohRosSink: Failed to close session"))?;
214 }
215 debug!("ZenohRosSink: Stopped");
216 Ok(())
217 }
218}