dimas_com/zenoh/
observer.rs

1// Copyright © 2024 Stephan Kunz
2
3#[doc(hidden)]
4extern crate alloc;
5
6#[cfg(feature = "std")]
7extern crate std;
8
9// region:		--- modules
10use alloc::sync::Arc;
11use alloc::{
12	borrow::ToOwned,
13	boxed::Box,
14	string::{String, ToString},
15	vec::Vec,
16};
17use bitcode::decode;
18use core::time::Duration;
19use dimas_core::{
20	Result,
21	enums::OperationState,
22	message_types::{ControlResponse, Message, ObservableResponse},
23	traits::{Capability, Context},
24	utils::{cancel_selector_from, feedback_selector_from, request_selector_from},
25};
26use futures::future::BoxFuture;
27#[cfg(feature = "std")]
28use tokio::{sync::Mutex, task::JoinHandle};
29use tracing::{Level, error, instrument, warn};
30use zenoh::Session;
31#[cfg(feature = "unstable")]
32use zenoh::sample::Locality;
33use zenoh::{
34	Wait,
35	query::{ConsolidationMode, QueryTarget},
36	sample::SampleKind,
37};
38
39use crate::error::Error;
40// endregion:	--- modules
41
42// region:    	--- types
/// Type definition for an observer's `control` callback.
///
/// A boxed `FnMut` that receives the observer's [`Context`] and the decoded
/// [`ControlResponse`] and returns a boxed future resolving to `Result<()>`.
pub type ControlCallback<P> =
	Box<dyn FnMut(Context<P>, ControlResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
/// Type definition for an observer's atomic reference counted `control` callback.
///
/// The callback is guarded by an async [`Mutex`], which is locked for the duration of a call.
pub type ArcControlCallback<P> = Arc<Mutex<ControlCallback<P>>>;
/// Type definition for an observer's `response` callback.
///
/// A boxed `FnMut` that receives the observer's [`Context`] and the decoded
/// [`ObservableResponse`] and returns a boxed future resolving to `Result<()>`.
pub type ResponseCallback<P> =
	Box<dyn FnMut(Context<P>, ObservableResponse) -> BoxFuture<'static, Result<()>> + Send + Sync>;
/// Type definition for an observer's atomic reference counted `response` callback.
///
/// The callback is guarded by an async [`Mutex`], which is locked for the duration of a call.
pub type ArcResponseCallback<P> = Arc<Mutex<ResponseCallback<P>>>;
53// endregion: 	--- types
54
55// region:		--- Observer
/// Observer
///
/// Client side of an observation: sends `request` and `cancel` queries for its
/// selector through the zenoh session and forwards the decoded answers to the
/// registered control and response callbacks.
pub struct Observer<P>
where
	P: Send + Sync + 'static,
{
	/// the zenoh session this observer belongs to
	session: Arc<Session>,
	/// The observers key expression
	selector: String,
	/// Context for the Observer
	context: Context<P>,
	/// The [`OperationState`] threshold compared against in `manage_operation_state`:
	/// states at or above it initialize the observer, lower states de-initialize it.
	activation_state: OperationState,
	/// callback for control request results
	control_callback: ArcControlCallback<P>,
	/// callback for responses
	response_callback: ArcResponseCallback<P>,
	/// timeout value, used as query timeout and as pause between query retries
	timeout: Duration,
	/// Optional task handle; created empty in `new` and cleared in `de_init`.
	// NOTE(review): nothing in this file ever stores a handle here - confirm whether it is still needed.
	handle: std::sync::Mutex<Option<JoinHandle<()>>>,
}
76
77impl<P> core::fmt::Debug for Observer<P>
78where
79	P: Send + Sync + 'static,
80{
81	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82		f.debug_struct("Observer").finish_non_exhaustive()
83	}
84}
85
86impl<P> crate::traits::Observer for Observer<P>
87where
88	P: Send + Sync + 'static,
89{
90	/// Get `selector`
91	fn selector(&self) -> &str {
92		&self.selector
93	}
94
95	/// Cancel a running observation
96	#[allow(clippy::cognitive_complexity)]
97	#[instrument(level = Level::ERROR, skip_all)]
98	fn cancel(&self) -> Result<()> {
99		// TODO: make a proper "key: value" implementation
100		let selector = cancel_selector_from(&self.selector);
101		let builder = self
102			.session
103			.get(&selector)
104			.target(QueryTarget::All)
105			.consolidation(ConsolidationMode::None)
106			.timeout(self.timeout);
107
108		#[cfg(feature = "unstable")]
109		let builder = builder.allowed_destination(Locality::Any);
110
111		let query = builder
112			.wait()
113			.map_err(|source| Error::QueryCreation { source })?;
114
115		let mut unreached = true;
116		let mut retry_count = 0u8;
117
118		while unreached && retry_count <= 5 {
119			retry_count += 1;
120			while let Ok(reply) = query.recv() {
121				match reply.result() {
122					Ok(sample) => match sample.kind() {
123						SampleKind::Put => {
124							let ccb = self.control_callback.clone();
125							let ctx = self.context.clone();
126							let content: Vec<u8> = sample.payload().to_bytes().into_owned();
127							let response: ControlResponse = decode(&content)?;
128							if matches!(response, ControlResponse::Canceled) {
129								// without spawning possible deadlock when called inside an control response
130								tokio::spawn(async move {
131									let mut lock = ccb.lock().await;
132									if let Err(error) = lock(ctx.clone(), response).await {
133										error!("callback failed with {error}");
134									}
135								});
136							} else {
137								error!("unexpected response on cancelation");
138							}
139						}
140						SampleKind::Delete => {
141							error!("Delete in cancel");
142						}
143					},
144					Err(err) => error!("receive error: {:?})", err),
145				}
146				unreached = false;
147			}
148			if unreached {
149				if retry_count < 5 {
150					std::thread::sleep(self.timeout);
151				} else {
152					return Err(Error::AccessingObservable {
153						selector: self.selector.clone(),
154					}
155					.into());
156				}
157			}
158		}
159		Ok(())
160	}
161
162	/// Request an observation with an optional [`Message`].
163	#[allow(clippy::cognitive_complexity)]
164	#[instrument(level = Level::ERROR, skip_all)]
165	fn request(&self, message: Option<Message>) -> Result<()> {
166		let session = self.session.clone();
167		// TODO: make a proper "key: value" implementation
168		let selector = request_selector_from(&self.selector);
169		let mut query = session
170			.get(&selector)
171			.target(QueryTarget::All)
172			.consolidation(ConsolidationMode::None)
173			.timeout(self.timeout);
174
175		if let Some(message) = message {
176			let value = message.value().to_owned();
177			query = query.payload(value);
178		}
179
180		#[cfg(feature = "unstable")]
181		let query = query.allowed_destination(Locality::Any);
182
183		let query = query
184			.wait()
185			.map_err(|source| Error::QueryCreation { source })?;
186
187		let mut unreached = true;
188		let mut retry_count = 0u8;
189
190		while unreached && retry_count <= 5 {
191			retry_count += 1;
192			while let Ok(reply) = query.recv() {
193				let session = session.clone();
194				match reply.result() {
195					Ok(sample) => match sample.kind() {
196						SampleKind::Put => {
197							let content: Vec<u8> = sample.payload().to_bytes().into_owned();
198							decode::<ControlResponse>(&content).map_or_else(
199								|_| todo!(),
200								|response| {
201									if matches!(response, ControlResponse::Accepted) {
202										let ctx = self.context.clone();
203										// use "<query_selector>/feedback/<source_id/replier_id>" as key
204										// in case there is no source_id/replier_id, listen on all id's
205										#[cfg(not(feature = "unstable"))]
206										let source_id = "*".to_string();
207										#[cfg(feature = "unstable")]
208										let source_id = reply.result().map_or_else(
209											|_| {
210												reply.replier_id().map_or_else(
211													|| "*".to_string(),
212													|id| id.to_string(),
213												)
214											},
215											|sample| {
216												sample.source_info().source_id().map_or_else(
217													|| {
218														reply.replier_id().map_or_else(
219															|| "*".to_string(),
220															|id| id.to_string(),
221														)
222													},
223													|id| id.zid().to_string(),
224												)
225											},
226										);
227										let selector =
228											feedback_selector_from(&self.selector, &source_id);
229
230										let rcb = self.response_callback.clone();
231										tokio::task::spawn(async move {
232											if let Err(error) =
233												run_observation(session, selector, ctx, rcb).await
234											{
235												error!("observation failed with {error}");
236											}
237										});
238									}
239									// call control callback
240									let ctx = self.context.clone();
241									let ccb = self.control_callback.clone();
242									tokio::task::spawn(async move {
243										let mut lock = ccb.lock().await;
244										if let Err(error) = lock(ctx, response).await {
245											error!("control callback failed with {error}");
246										}
247									});
248								},
249							);
250						}
251						SampleKind::Delete => {
252							error!("Delete in request response");
253						}
254					},
255					Err(err) => error!("request response error: {:?})", err),
256				}
257				unreached = false;
258			}
259			if unreached {
260				if retry_count < 5 {
261					std::thread::sleep(self.timeout);
262				} else {
263					return Err(Error::AccessingObservable {
264						selector: self.selector.clone(),
265					}
266					.into());
267				}
268			}
269		}
270		Ok(())
271	}
272}
273
274impl<P> Capability for Observer<P>
275where
276	P: Send + Sync + 'static,
277{
278	fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
279		if state >= &self.activation_state {
280			return self.init();
281		} else if state < &self.activation_state {
282			return self.de_init();
283		}
284		Ok(())
285	}
286}
287
288impl<P> Observer<P>
289where
290	P: Send + Sync + 'static,
291{
292	/// Constructor for an [`Observer`]
293	#[must_use]
294	pub fn new(
295		session: Arc<Session>,
296		selector: String,
297		context: Context<P>,
298		activation_state: OperationState,
299		control_callback: ArcControlCallback<P>,
300		response_callback: ArcResponseCallback<P>,
301		timeout: Duration,
302	) -> Self {
303		Self {
304			session,
305			selector,
306			context,
307			activation_state,
308			control_callback,
309			response_callback,
310			timeout,
311			handle: std::sync::Mutex::new(None),
312		}
313	}
314
315	/// Initialize
316	/// # Errors
317	///
318	#[instrument(level = Level::TRACE, skip_all)]
319	fn init(&self) -> Result<()> {
320		self.de_init()
321	}
322
323	/// De-Initialize
324	/// # Errors
325	///
326	#[allow(clippy::unnecessary_wraps)]
327	fn de_init(&self) -> Result<()> {
328		// cancel current request before stopping
329		let _ = crate::traits::Observer::cancel(self);
330		self.handle.lock().map_or_else(
331			|_| todo!(),
332			|mut handle| {
333				handle.take();
334				Ok(())
335			},
336		)
337	}
338}
339// endregion:	--- Observer
340
341// region:		--- functions
342#[allow(clippy::significant_drop_in_scrutinee)]
343#[instrument(name="observation", level = Level::ERROR, skip_all)]
344async fn run_observation<P>(
345	session: Arc<Session>,
346	selector: String,
347	ctx: Context<P>,
348	rcb: ArcResponseCallback<P>,
349) -> Result<()> {
350	// create the feedback subscriber
351	let subscriber = session.declare_subscriber(&selector).await?;
352
353	loop {
354		match subscriber.recv_async().await {
355			// feedback from observable
356			Ok(sample) => {
357				match sample.kind() {
358					SampleKind::Put => {
359						let content: Vec<u8> = sample.payload().to_bytes().into_owned();
360						match decode::<ObservableResponse>(&content) {
361							Ok(response) => {
362								// remember to stop loop on anything that is not feedback
363								let stop = !matches!(response, ObservableResponse::Feedback(_));
364								let ctx = ctx.clone();
365								if let Err(error) = rcb.lock().await(ctx, response).await {
366									error!("response callback failed with {error}");
367								}
368								if stop {
369									break;
370								}
371							}
372							Err(_) => todo!(),
373						}
374					}
375					SampleKind::Delete => {
376						error!("unexpected delete in observation response");
377					}
378				}
379			}
380			Err(err) => {
381				error!("observation response with {err}");
382			}
383		}
384	}
385	Ok(())
386}
387// endregion:	--- functions
388
#[cfg(test)]
mod tests {
	use super::*;

	/// Minimal properties type used to instantiate the generic [`Observer`].
	#[derive(Debug)]
	struct Props {}

	/// Compiles only for types that are `Sized`, `Send` and `Sync`.
	const fn is_normal<T>()
	where
		T: Sized + Send + Sync,
	{
	}

	/// Compile-time check that the auto traits are available for [`Observer`].
	#[test]
	const fn normal_types() {
		is_normal::<Observer<Props>>();
	}
}
403}