1#[doc(hidden)]
4extern crate alloc;
5
6#[cfg(feature = "std")]
7extern crate std;
8
9use alloc::sync::Arc;
11use alloc::{
12 borrow::ToOwned,
13 boxed::Box,
14 string::{String, ToString},
15 vec::Vec,
16};
17use bitcode::decode;
18use core::time::Duration;
19use dimas_core::{
20 enums::OperationState,
21 message_types::{ControlResponse, Message, ObservableResponse},
22 traits::{Capability, Context},
23 utils::{cancel_selector_from, feedback_selector_from, request_selector_from},
24 Result,
25};
26use futures::future::BoxFuture;
27#[cfg(feature = "std")]
28use tokio::{sync::Mutex, task::JoinHandle};
29use tracing::{error, instrument, warn, Level};
30#[cfg(feature = "unstable")]
31use zenoh::sample::Locality;
32use zenoh::Session;
33use zenoh::{
34 query::{ConsolidationMode, QueryTarget},
35 sample::SampleKind,
36 Wait,
37};
38
39use crate::error::Error;
40pub type ControlCallback<P> =
45 Box<dyn FnMut(Context<P>, ControlResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
46pub type ArcControlCallback<P> = Arc<Mutex<ControlCallback<P>>>;
48pub type ResponseCallback<P> =
50 Box<dyn FnMut(Context<P>, ObservableResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
51pub type ArcResponseCallback<P> = Arc<Mutex<ResponseCallback<P>>>;
53pub struct Observer<P>
58where
59 P: Send + Sync + 'static,
60{
61 session: Arc<Session>,
63 selector: String,
65 context: Context<P>,
67 activation_state: OperationState,
68 control_callback: ArcControlCallback<P>,
70 response_callback: ArcResponseCallback<P>,
72 timeout: Duration,
74 handle: std::sync::Mutex<Option<JoinHandle<()>>>,
75}
76
77impl<P> core::fmt::Debug for Observer<P>
78where
79 P: Send + Sync + 'static,
80{
81 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82 f.debug_struct("Observer").finish_non_exhaustive()
83 }
84}
85
86impl<P> crate::traits::Observer for Observer<P>
87where
88 P: Send + Sync + 'static,
89{
90 fn selector(&self) -> &str {
92 &self.selector
93 }
94
95 #[instrument(level = Level::ERROR, skip_all)]
97 fn cancel(&self) -> Result<()> {
98 let selector = cancel_selector_from(&self.selector);
100 let builder = self
101 .session
102 .get(&selector)
103 .target(QueryTarget::All)
104 .consolidation(ConsolidationMode::None)
105 .timeout(self.timeout);
106
107 #[cfg(feature = "unstable")]
108 let builder = builder.allowed_destination(Locality::Any);
109
110 let query = builder
111 .wait()
112 .map_err(|source| Error::QueryCreation { source })?;
113
114 let mut unreached = true;
115 let mut retry_count = 0u8;
116
117 while unreached && retry_count <= 5 {
118 retry_count += 1;
119 while let Ok(reply) = query.recv() {
120 match reply.result() {
121 Ok(sample) => match sample.kind() {
122 SampleKind::Put => {
123 let ccb = self.control_callback.clone();
124 let ctx = self.context.clone();
125 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
126 let response: ControlResponse = decode(&content)?;
127 if matches!(response, ControlResponse::Canceled) {
128 tokio::spawn(async move {
130 let mut lock = ccb.lock().await;
131 if let Err(error) = lock(ctx.clone(), response).await {
132 error!("callback failed with {error}");
133 }
134 });
135 } else {
136 error!("unexpected response on cancelation");
137 };
138 }
139 SampleKind::Delete => {
140 error!("Delete in cancel");
141 }
142 },
143 Err(err) => error!("receive error: {:?})", err),
144 }
145 unreached = false;
146 }
147 if unreached {
148 if retry_count < 5 {
149 std::thread::sleep(self.timeout);
150 } else {
151 return Err(Error::AccessingObservable {
152 selector: self.selector.to_string(),
153 }
154 .into());
155 }
156 }
157 }
158 Ok(())
159 }
160
161 #[instrument(level = Level::ERROR, skip_all)]
163 fn request(&self, message: Option<Message>) -> Result<()> {
164 let session = self.session.clone();
165 let selector = request_selector_from(&self.selector);
167 let mut query = session
168 .get(&selector)
169 .target(QueryTarget::All)
170 .consolidation(ConsolidationMode::None)
171 .timeout(self.timeout);
172
173 if let Some(message) = message {
174 let value = message.value().to_owned();
175 query = query.payload(value);
176 };
177
178 #[cfg(feature = "unstable")]
179 let query = query.allowed_destination(Locality::Any);
180
181 let query = query
182 .wait()
183 .map_err(|source| Error::QueryCreation { source })?;
184
185 let mut unreached = true;
186 let mut retry_count = 0u8;
187
188 while unreached && retry_count <= 5 {
189 retry_count += 1;
190 while let Ok(reply) = query.recv() {
191 let session = session.clone();
192 match reply.result() {
193 Ok(sample) => match sample.kind() {
194 SampleKind::Put => {
195 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
196 decode::<ControlResponse>(&content).map_or_else(
197 |_| todo!(),
198 |response| {
199 if matches!(response, ControlResponse::Accepted) {
200 let ctx = self.context.clone();
201 #[cfg(not(feature = "unstable"))]
204 let source_id = "*".to_string();
205 #[cfg(feature = "unstable")]
206 let source_id = reply.result().map_or_else(
207 |_| {
208 reply.replier_id().map_or_else(
209 || "*".to_string(),
210 |id| id.to_string(),
211 )
212 },
213 |sample| {
214 sample.source_info().source_id().map_or_else(
215 || {
216 reply.replier_id().map_or_else(
217 || "*".to_string(),
218 |id| id.to_string(),
219 )
220 },
221 |id| id.zid().to_string(),
222 )
223 },
224 );
225 let selector =
226 feedback_selector_from(&self.selector, &source_id);
227
228 let rcb = self.response_callback.clone();
229 tokio::task::spawn(async move {
230 if let Err(error) =
231 run_observation(session, selector, ctx, rcb).await
232 {
233 error!("observation failed with {error}");
234 };
235 });
236 };
237 let ctx = self.context.clone();
239 let ccb = self.control_callback.clone();
240 tokio::task::spawn(async move {
241 let mut lock = ccb.lock().await;
242 if let Err(error) = lock(ctx, response).await {
243 error!("control callback failed with {error}");
244 }
245 });
246 },
247 );
248 }
249 SampleKind::Delete => {
250 error!("Delete in request response");
251 }
252 },
253 Err(err) => error!("request response error: {:?})", err),
254 };
255 unreached = false;
256 }
257 if unreached {
258 if retry_count < 5 {
259 std::thread::sleep(self.timeout);
260 } else {
261 return Err(Error::AccessingObservable {
262 selector: self.selector.to_string(),
263 }
264 .into());
265 }
266 }
267 }
268 Ok(())
269 }
270}
271
272impl<P> Capability for Observer<P>
273where
274 P: Send + Sync + 'static,
275{
276 fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
277 if state >= &self.activation_state {
278 return self.init();
279 } else if state < &self.activation_state {
280 return self.de_init();
281 }
282 Ok(())
283 }
284}
285
286impl<P> Observer<P>
287where
288 P: Send + Sync + 'static,
289{
290 #[must_use]
292 pub fn new(
293 session: Arc<Session>,
294 selector: String,
295 context: Context<P>,
296 activation_state: OperationState,
297 control_callback: ArcControlCallback<P>,
298 response_callback: ArcResponseCallback<P>,
299 timeout: Duration,
300 ) -> Self {
301 Self {
302 session,
303 selector,
304 context,
305 activation_state,
306 control_callback,
307 response_callback,
308 timeout,
309 handle: std::sync::Mutex::new(None),
310 }
311 }
312
313 #[instrument(level = Level::TRACE, skip_all)]
317 fn init(&self) -> Result<()> {
318 self.de_init()
319 }
320
321 #[allow(clippy::unnecessary_wraps)]
325 fn de_init(&self) -> Result<()> {
326 let _ = crate::traits::Observer::cancel(self);
328 self.handle.lock().map_or_else(
329 |_| todo!(),
330 |mut handle| {
331 handle.take();
332 Ok(())
333 },
334 )
335 }
336}
337#[allow(clippy::significant_drop_in_scrutinee)]
341#[instrument(name="observation", level = Level::ERROR, skip_all)]
342async fn run_observation<P>(
343 session: Arc<Session>,
344 selector: String,
345 ctx: Context<P>,
346 rcb: ArcResponseCallback<P>,
347) -> Result<()> {
348 let subscriber = session.declare_subscriber(&selector).await?;
350
351 loop {
352 match subscriber.recv_async().await {
353 Ok(sample) => {
355 match sample.kind() {
356 SampleKind::Put => {
357 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
358 match decode::<ObservableResponse>(&content) {
359 Ok(response) => {
360 let stop = !matches!(response, ObservableResponse::Feedback(_));
362 let ctx = ctx.clone();
363 if let Err(error) = rcb.lock().await(ctx, response).await {
364 error!("response callback failed with {error}");
365 };
366 if stop {
367 break;
368 };
369 }
370 Err(_) => todo!(),
371 };
372 }
373 SampleKind::Delete => {
374 error!("unexpected delete in observation response");
375 }
376 }
377 }
378 Err(err) => {
379 error!("observation response with {err}");
380 }
381 }
382 }
383 Ok(())
384}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal property type used to instantiate the generic observer.
    #[derive(Debug)]
    struct Props {}

    /// Compiles only for types that are `Sized`, `Send` and `Sync`.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Compile-time check that `Observer<Props>` satisfies the bounds above.
    #[test]
    const fn normal_types() {
        is_normal::<Observer<Props>>();
    }
}