1#[doc(hidden)]
4extern crate alloc;
5
6#[cfg(feature = "std")]
7extern crate std;
8
9use alloc::sync::Arc;
11use alloc::{
12 borrow::ToOwned,
13 boxed::Box,
14 string::{String, ToString},
15 vec::Vec,
16};
17use bitcode::decode;
18use core::time::Duration;
19use dimas_core::{
20 Result,
21 enums::OperationState,
22 message_types::{ControlResponse, Message, ObservableResponse},
23 traits::{Capability, Context},
24 utils::{cancel_selector_from, feedback_selector_from, request_selector_from},
25};
26use futures::future::BoxFuture;
27#[cfg(feature = "std")]
28use tokio::{sync::Mutex, task::JoinHandle};
29use tracing::{Level, error, instrument, warn};
30use zenoh::Session;
31#[cfg(feature = "unstable")]
32use zenoh::sample::Locality;
33use zenoh::{
34 Wait,
35 query::{ConsolidationMode, QueryTarget},
36 sample::SampleKind,
37};
38
39use crate::error::Error;
40pub type ControlCallback<P> =
45 Box<dyn FnMut(Context<P>, ControlResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
46pub type ArcControlCallback<P> = Arc<Mutex<ControlCallback<P>>>;
48pub type ResponseCallback<P> =
50 Box<dyn FnMut(Context<P>, ObservableResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
51pub type ArcResponseCallback<P> = Arc<Mutex<ResponseCallback<P>>>;
53pub struct Observer<P>
58where
59 P: Send + Sync + 'static,
60{
61 session: Arc<Session>,
63 selector: String,
65 context: Context<P>,
67 activation_state: OperationState,
68 control_callback: ArcControlCallback<P>,
70 response_callback: ArcResponseCallback<P>,
72 timeout: Duration,
74 handle: std::sync::Mutex<Option<JoinHandle<()>>>,
75}
76
77impl<P> core::fmt::Debug for Observer<P>
78where
79 P: Send + Sync + 'static,
80{
81 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82 f.debug_struct("Observer").finish_non_exhaustive()
83 }
84}
85
86impl<P> crate::traits::Observer for Observer<P>
87where
88 P: Send + Sync + 'static,
89{
90 fn selector(&self) -> &str {
92 &self.selector
93 }
94
95 #[allow(clippy::cognitive_complexity)]
97 #[instrument(level = Level::ERROR, skip_all)]
98 fn cancel(&self) -> Result<()> {
99 let selector = cancel_selector_from(&self.selector);
101 let builder = self
102 .session
103 .get(&selector)
104 .target(QueryTarget::All)
105 .consolidation(ConsolidationMode::None)
106 .timeout(self.timeout);
107
108 #[cfg(feature = "unstable")]
109 let builder = builder.allowed_destination(Locality::Any);
110
111 let query = builder
112 .wait()
113 .map_err(|source| Error::QueryCreation { source })?;
114
115 let mut unreached = true;
116 let mut retry_count = 0u8;
117
118 while unreached && retry_count <= 5 {
119 retry_count += 1;
120 while let Ok(reply) = query.recv() {
121 match reply.result() {
122 Ok(sample) => match sample.kind() {
123 SampleKind::Put => {
124 let ccb = self.control_callback.clone();
125 let ctx = self.context.clone();
126 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
127 let response: ControlResponse = decode(&content)?;
128 if matches!(response, ControlResponse::Canceled) {
129 tokio::spawn(async move {
131 let mut lock = ccb.lock().await;
132 if let Err(error) = lock(ctx.clone(), response).await {
133 error!("callback failed with {error}");
134 }
135 });
136 } else {
137 error!("unexpected response on cancelation");
138 }
139 }
140 SampleKind::Delete => {
141 error!("Delete in cancel");
142 }
143 },
144 Err(err) => error!("receive error: {:?})", err),
145 }
146 unreached = false;
147 }
148 if unreached {
149 if retry_count < 5 {
150 std::thread::sleep(self.timeout);
151 } else {
152 return Err(Error::AccessingObservable {
153 selector: self.selector.clone(),
154 }
155 .into());
156 }
157 }
158 }
159 Ok(())
160 }
161
162 #[allow(clippy::cognitive_complexity)]
164 #[instrument(level = Level::ERROR, skip_all)]
165 fn request(&self, message: Option<Message>) -> Result<()> {
166 let session = self.session.clone();
167 let selector = request_selector_from(&self.selector);
169 let mut query = session
170 .get(&selector)
171 .target(QueryTarget::All)
172 .consolidation(ConsolidationMode::None)
173 .timeout(self.timeout);
174
175 if let Some(message) = message {
176 let value = message.value().to_owned();
177 query = query.payload(value);
178 }
179
180 #[cfg(feature = "unstable")]
181 let query = query.allowed_destination(Locality::Any);
182
183 let query = query
184 .wait()
185 .map_err(|source| Error::QueryCreation { source })?;
186
187 let mut unreached = true;
188 let mut retry_count = 0u8;
189
190 while unreached && retry_count <= 5 {
191 retry_count += 1;
192 while let Ok(reply) = query.recv() {
193 let session = session.clone();
194 match reply.result() {
195 Ok(sample) => match sample.kind() {
196 SampleKind::Put => {
197 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
198 decode::<ControlResponse>(&content).map_or_else(
199 |_| todo!(),
200 |response| {
201 if matches!(response, ControlResponse::Accepted) {
202 let ctx = self.context.clone();
203 #[cfg(not(feature = "unstable"))]
206 let source_id = "*".to_string();
207 #[cfg(feature = "unstable")]
208 let source_id = reply.result().map_or_else(
209 |_| {
210 reply.replier_id().map_or_else(
211 || "*".to_string(),
212 |id| id.to_string(),
213 )
214 },
215 |sample| {
216 sample.source_info().source_id().map_or_else(
217 || {
218 reply.replier_id().map_or_else(
219 || "*".to_string(),
220 |id| id.to_string(),
221 )
222 },
223 |id| id.zid().to_string(),
224 )
225 },
226 );
227 let selector =
228 feedback_selector_from(&self.selector, &source_id);
229
230 let rcb = self.response_callback.clone();
231 tokio::task::spawn(async move {
232 if let Err(error) =
233 run_observation(session, selector, ctx, rcb).await
234 {
235 error!("observation failed with {error}");
236 }
237 });
238 }
239 let ctx = self.context.clone();
241 let ccb = self.control_callback.clone();
242 tokio::task::spawn(async move {
243 let mut lock = ccb.lock().await;
244 if let Err(error) = lock(ctx, response).await {
245 error!("control callback failed with {error}");
246 }
247 });
248 },
249 );
250 }
251 SampleKind::Delete => {
252 error!("Delete in request response");
253 }
254 },
255 Err(err) => error!("request response error: {:?})", err),
256 }
257 unreached = false;
258 }
259 if unreached {
260 if retry_count < 5 {
261 std::thread::sleep(self.timeout);
262 } else {
263 return Err(Error::AccessingObservable {
264 selector: self.selector.clone(),
265 }
266 .into());
267 }
268 }
269 }
270 Ok(())
271 }
272}
273
274impl<P> Capability for Observer<P>
275where
276 P: Send + Sync + 'static,
277{
278 fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
279 if state >= &self.activation_state {
280 return self.init();
281 } else if state < &self.activation_state {
282 return self.de_init();
283 }
284 Ok(())
285 }
286}
287
288impl<P> Observer<P>
289where
290 P: Send + Sync + 'static,
291{
292 #[must_use]
294 pub fn new(
295 session: Arc<Session>,
296 selector: String,
297 context: Context<P>,
298 activation_state: OperationState,
299 control_callback: ArcControlCallback<P>,
300 response_callback: ArcResponseCallback<P>,
301 timeout: Duration,
302 ) -> Self {
303 Self {
304 session,
305 selector,
306 context,
307 activation_state,
308 control_callback,
309 response_callback,
310 timeout,
311 handle: std::sync::Mutex::new(None),
312 }
313 }
314
315 #[instrument(level = Level::TRACE, skip_all)]
319 fn init(&self) -> Result<()> {
320 self.de_init()
321 }
322
323 #[allow(clippy::unnecessary_wraps)]
327 fn de_init(&self) -> Result<()> {
328 let _ = crate::traits::Observer::cancel(self);
330 self.handle.lock().map_or_else(
331 |_| todo!(),
332 |mut handle| {
333 handle.take();
334 Ok(())
335 },
336 )
337 }
338}
339#[allow(clippy::significant_drop_in_scrutinee)]
343#[instrument(name="observation", level = Level::ERROR, skip_all)]
344async fn run_observation<P>(
345 session: Arc<Session>,
346 selector: String,
347 ctx: Context<P>,
348 rcb: ArcResponseCallback<P>,
349) -> Result<()> {
350 let subscriber = session.declare_subscriber(&selector).await?;
352
353 loop {
354 match subscriber.recv_async().await {
355 Ok(sample) => {
357 match sample.kind() {
358 SampleKind::Put => {
359 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
360 match decode::<ObservableResponse>(&content) {
361 Ok(response) => {
362 let stop = !matches!(response, ObservableResponse::Feedback(_));
364 let ctx = ctx.clone();
365 if let Err(error) = rcb.lock().await(ctx, response).await {
366 error!("response callback failed with {error}");
367 }
368 if stop {
369 break;
370 }
371 }
372 Err(_) => todo!(),
373 }
374 }
375 SampleKind::Delete => {
376 error!("unexpected delete in observation response");
377 }
378 }
379 }
380 Err(err) => {
381 error!("observation response with {err}");
382 }
383 }
384 }
385 Ok(())
386}
387#[cfg(test)]
390mod tests {
391 use super::*;
392
393 #[derive(Debug)]
394 struct Props {}
395
396 const fn is_normal<T: Sized + Send + Sync>() {}
398
399 #[test]
400 const fn normal_types() {
401 is_normal::<Observer<Props>>();
402 }
403}