dimas_com/zenoh/
observer.rs

1// Copyright © 2024 Stephan Kunz
2
3#[doc(hidden)]
4extern crate alloc;
5
6#[cfg(feature = "std")]
7extern crate std;
8
9// region:		--- modules
10use alloc::sync::Arc;
11use alloc::{
12	borrow::ToOwned,
13	boxed::Box,
14	string::{String, ToString},
15	vec::Vec,
16};
17use bitcode::decode;
18use core::time::Duration;
19use dimas_core::{
20	Result,
21	enums::OperationState,
22	message_types::{ControlResponse, Message, ObservableResponse},
23	traits::{Capability, Context},
24	utils::{cancel_selector_from, feedback_selector_from, request_selector_from},
25};
26use futures::future::BoxFuture;
27#[cfg(feature = "std")]
28use tokio::{sync::Mutex, task::JoinHandle};
29use tracing::{Level, error, instrument, warn};
30use zenoh::Session;
31#[cfg(feature = "unstable")]
32use zenoh::sample::Locality;
33use zenoh::{
34	Wait,
35	query::{ConsolidationMode, QueryTarget},
36	sample::SampleKind,
37};
38
39use crate::error::Error;
40// endregion:	--- modules
41
42// region:    	--- types
43/// Type definition for an observers `control` callback
44pub type ControlCallback<P> =
45	Box<dyn FnMut(Context<P>, ControlResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
46/// Type definition for an observers atomic reference counted `control` callback
47pub type ArcControlCallback<P> = Arc<Mutex<ControlCallback<P>>>;
48/// Type definition for an observers `response` callback
49pub type ResponseCallback<P> =
50	Box<dyn FnMut(Context<P>, ObservableResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
51/// Type definition for an observers atomic reference counted `response` callback
52pub type ArcResponseCallback<P> = Arc<Mutex<ResponseCallback<P>>>;
53// endregion: 	--- types
54
55// region:		--- Observer
56/// Observer
57pub struct Observer<P>
58where
59	P: Send + Sync + 'static,
60{
61	/// the zenoh session this observer belongs to
62	session: Arc<Session>,
63	/// The observers key expression
64	selector: String,
65	/// Context for the Observer
66	context: Context<P>,
67	activation_state: OperationState,
68	/// callback for control request results
69	control_callback: ArcControlCallback<P>,
70	/// callback for responses
71	response_callback: ArcResponseCallback<P>,
72	/// timeout value
73	timeout: Duration,
74	handle: std::sync::Mutex<Option<JoinHandle<()>>>,
75}
76
77impl<P> core::fmt::Debug for Observer<P>
78where
79	P: Send + Sync + 'static,
80{
81	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82		f.debug_struct("Observer").finish_non_exhaustive()
83	}
84}
85
86impl<P> crate::traits::Observer for Observer<P>
87where
88	P: Send + Sync + 'static,
89{
90	/// Get `selector`
91	fn selector(&self) -> &str {
92		&self.selector
93	}
94
95	/// Cancel a running observation
96	#[instrument(level = Level::ERROR, skip_all)]
97	fn cancel(&self) -> Result<()> {
98		// TODO: make a proper "key: value" implementation
99		let selector = cancel_selector_from(&self.selector);
100		let builder = self
101			.session
102			.get(&selector)
103			.target(QueryTarget::All)
104			.consolidation(ConsolidationMode::None)
105			.timeout(self.timeout);
106
107		#[cfg(feature = "unstable")]
108		let builder = builder.allowed_destination(Locality::Any);
109
110		let query = builder
111			.wait()
112			.map_err(|source| Error::QueryCreation { source })?;
113
114		let mut unreached = true;
115		let mut retry_count = 0u8;
116
117		while unreached && retry_count <= 5 {
118			retry_count += 1;
119			while let Ok(reply) = query.recv() {
120				match reply.result() {
121					Ok(sample) => match sample.kind() {
122						SampleKind::Put => {
123							let ccb = self.control_callback.clone();
124							let ctx = self.context.clone();
125							let content: Vec<u8> = sample.payload().to_bytes().into_owned();
126							let response: ControlResponse = decode(&content)?;
127							if matches!(response, ControlResponse::Canceled) {
128								// without spawning possible deadlock when called inside an control response
129								tokio::spawn(async move {
130									let mut lock = ccb.lock().await;
131									if let Err(error) = lock(ctx.clone(), response).await {
132										error!("callback failed with {error}");
133									}
134								});
135							} else {
136								error!("unexpected response on cancelation");
137							};
138						}
139						SampleKind::Delete => {
140							error!("Delete in cancel");
141						}
142					},
143					Err(err) => error!("receive error: {:?})", err),
144				}
145				unreached = false;
146			}
147			if unreached {
148				if retry_count < 5 {
149					std::thread::sleep(self.timeout);
150				} else {
151					return Err(Error::AccessingObservable {
152						selector: self.selector.to_string(),
153					}
154					.into());
155				}
156			}
157		}
158		Ok(())
159	}
160
161	/// Request an observation with an optional [`Message`].
162	#[instrument(level = Level::ERROR, skip_all)]
163	fn request(&self, message: Option<Message>) -> Result<()> {
164		let session = self.session.clone();
165		// TODO: make a proper "key: value" implementation
166		let selector = request_selector_from(&self.selector);
167		let mut query = session
168			.get(&selector)
169			.target(QueryTarget::All)
170			.consolidation(ConsolidationMode::None)
171			.timeout(self.timeout);
172
173		if let Some(message) = message {
174			let value = message.value().to_owned();
175			query = query.payload(value);
176		};
177
178		#[cfg(feature = "unstable")]
179		let query = query.allowed_destination(Locality::Any);
180
181		let query = query
182			.wait()
183			.map_err(|source| Error::QueryCreation { source })?;
184
185		let mut unreached = true;
186		let mut retry_count = 0u8;
187
188		while unreached && retry_count <= 5 {
189			retry_count += 1;
190			while let Ok(reply) = query.recv() {
191				let session = session.clone();
192				match reply.result() {
193					Ok(sample) => match sample.kind() {
194						SampleKind::Put => {
195							let content: Vec<u8> = sample.payload().to_bytes().into_owned();
196							decode::<ControlResponse>(&content).map_or_else(
197								|_| todo!(),
198								|response| {
199									if matches!(response, ControlResponse::Accepted) {
200										let ctx = self.context.clone();
201										// use "<query_selector>/feedback/<source_id/replier_id>" as key
202										// in case there is no source_id/replier_id, listen on all id's
203										#[cfg(not(feature = "unstable"))]
204										let source_id = "*".to_string();
205										#[cfg(feature = "unstable")]
206										let source_id = reply.result().map_or_else(
207											|_| {
208												reply.replier_id().map_or_else(
209													|| "*".to_string(),
210													|id| id.to_string(),
211												)
212											},
213											|sample| {
214												sample.source_info().source_id().map_or_else(
215													|| {
216														reply.replier_id().map_or_else(
217															|| "*".to_string(),
218															|id| id.to_string(),
219														)
220													},
221													|id| id.zid().to_string(),
222												)
223											},
224										);
225										let selector =
226											feedback_selector_from(&self.selector, &source_id);
227
228										let rcb = self.response_callback.clone();
229										tokio::task::spawn(async move {
230											if let Err(error) =
231												run_observation(session, selector, ctx, rcb).await
232											{
233												error!("observation failed with {error}");
234											};
235										});
236									};
237									// call control callback
238									let ctx = self.context.clone();
239									let ccb = self.control_callback.clone();
240									tokio::task::spawn(async move {
241										let mut lock = ccb.lock().await;
242										if let Err(error) = lock(ctx, response).await {
243											error!("control callback failed with {error}");
244										}
245									});
246								},
247							);
248						}
249						SampleKind::Delete => {
250							error!("Delete in request response");
251						}
252					},
253					Err(err) => error!("request response error: {:?})", err),
254				};
255				unreached = false;
256			}
257			if unreached {
258				if retry_count < 5 {
259					std::thread::sleep(self.timeout);
260				} else {
261					return Err(Error::AccessingObservable {
262						selector: self.selector.to_string(),
263					}
264					.into());
265				}
266			}
267		}
268		Ok(())
269	}
270}
271
272impl<P> Capability for Observer<P>
273where
274	P: Send + Sync + 'static,
275{
276	fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
277		if state >= &self.activation_state {
278			return self.init();
279		} else if state < &self.activation_state {
280			return self.de_init();
281		}
282		Ok(())
283	}
284}
285
286impl<P> Observer<P>
287where
288	P: Send + Sync + 'static,
289{
290	/// Constructor for an [`Observer`]
291	#[must_use]
292	pub fn new(
293		session: Arc<Session>,
294		selector: String,
295		context: Context<P>,
296		activation_state: OperationState,
297		control_callback: ArcControlCallback<P>,
298		response_callback: ArcResponseCallback<P>,
299		timeout: Duration,
300	) -> Self {
301		Self {
302			session,
303			selector,
304			context,
305			activation_state,
306			control_callback,
307			response_callback,
308			timeout,
309			handle: std::sync::Mutex::new(None),
310		}
311	}
312
313	/// Initialize
314	/// # Errors
315	///
316	#[instrument(level = Level::TRACE, skip_all)]
317	fn init(&self) -> Result<()> {
318		self.de_init()
319	}
320
321	/// De-Initialize
322	/// # Errors
323	///
324	#[allow(clippy::unnecessary_wraps)]
325	fn de_init(&self) -> Result<()> {
326		// cancel current request before stopping
327		let _ = crate::traits::Observer::cancel(self);
328		self.handle.lock().map_or_else(
329			|_| todo!(),
330			|mut handle| {
331				handle.take();
332				Ok(())
333			},
334		)
335	}
336}
337// endregion:	--- Observer
338
339// region:		--- functions
340#[allow(clippy::significant_drop_in_scrutinee)]
341#[instrument(name="observation", level = Level::ERROR, skip_all)]
342async fn run_observation<P>(
343	session: Arc<Session>,
344	selector: String,
345	ctx: Context<P>,
346	rcb: ArcResponseCallback<P>,
347) -> Result<()> {
348	// create the feedback subscriber
349	let subscriber = session.declare_subscriber(&selector).await?;
350
351	loop {
352		match subscriber.recv_async().await {
353			// feedback from observable
354			Ok(sample) => {
355				match sample.kind() {
356					SampleKind::Put => {
357						let content: Vec<u8> = sample.payload().to_bytes().into_owned();
358						match decode::<ObservableResponse>(&content) {
359							Ok(response) => {
360								// remember to stop loop on anything that is not feedback
361								let stop = !matches!(response, ObservableResponse::Feedback(_));
362								let ctx = ctx.clone();
363								if let Err(error) = rcb.lock().await(ctx, response).await {
364									error!("response callback failed with {error}");
365								};
366								if stop {
367									break;
368								};
369							}
370							Err(_) => todo!(),
371						};
372					}
373					SampleKind::Delete => {
374						error!("unexpected delete in observation response");
375					}
376				}
377			}
378			Err(err) => {
379				error!("observation response with {err}");
380			}
381		}
382	}
383	Ok(())
384}
385// endregion:	--- functions
386
387#[cfg(test)]
388mod tests {
389	use super::*;
390
391	#[derive(Debug)]
392	struct Props {}
393
394	// check, that the auto traits are available
395	const fn is_normal<T: Sized + Send + Sync>() {}
396
397	#[test]
398	const fn normal_types() {
399		is_normal::<Observer<Props>>();
400	}
401}