dimas_com/builder/
observer_builder.rs

1// Copyright © 2024 Stephan Kunz
2
3//! Module
4
5#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11// region:		--- modules
12use crate::error::Error;
13use crate::traits::Observer as ObserverTrait;
14use crate::zenoh::observer::{
15	ArcControlCallback, ArcResponseCallback, ControlCallback, Observer, ResponseCallback,
16};
17use alloc::{
18	boxed::Box,
19	string::{String, ToString},
20	sync::Arc,
21};
22use core::time::Duration;
23use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
24use dimas_core::{
25	Result,
26	enums::OperationState,
27	message_types::{ControlResponse, ObservableResponse},
28	traits::Context,
29	utils::selector_from,
30};
31use futures::future::Future;
32#[cfg(feature = "std")]
33use std::{collections::HashMap, sync::RwLock};
34#[cfg(feature = "std")]
35use tokio::sync::Mutex;
36// endregion:	--- modules
37
38// region:		--- ObserverBuilder
39/// The builder for an [`Observer`]
40pub struct ObserverBuilder<P, K, CC, RC, S>
41where
42	P: Send + Sync + 'static,
43{
44	session_id: String,
45	/// Context for the `ObserverBuilder`
46	context: Context<P>,
47	activation_state: OperationState,
48	timeout: Duration,
49	selector: K,
50	/// callback for observer request and cancelation
51	control_callback: CC,
52	/// callback for observer result
53	response_callback: RC,
54	storage: S,
55}
56
57impl<P> ObserverBuilder<P, NoSelector, NoCallback, NoCallback, NoStorage>
58where
59	P: Send + Sync + 'static,
60{
61	/// Construct an `ObserverBuilder` in initial state
62	#[must_use]
63	pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
64		Self {
65			session_id: session_id.into(),
66			context,
67			activation_state: OperationState::Active,
68			timeout: Duration::from_millis(100),
69			selector: NoSelector,
70			control_callback: NoCallback,
71			response_callback: NoCallback,
72			storage: NoStorage,
73		}
74	}
75}
76
77impl<P, K, CC, RC, S> ObserverBuilder<P, K, CC, RC, S>
78where
79	P: Send + Sync + 'static,
80{
81	/// Set the activation state.
82	#[must_use]
83	pub const fn activation_state(mut self, state: OperationState) -> Self {
84		self.activation_state = state;
85		self
86	}
87
88	/// Set the session id.
89	#[must_use]
90	pub fn session_id(mut self, session_id: &str) -> Self {
91		self.session_id = session_id.into();
92		self
93	}
94}
95
96impl<P, CC, RC, S> ObserverBuilder<P, NoSelector, CC, RC, S>
97where
98	P: Send + Sync + 'static,
99{
100	/// Set the full key expression for the [`Observer`].
101	#[must_use]
102	pub fn selector(self, selector: &str) -> ObserverBuilder<P, Selector, CC, RC, S> {
103		let Self {
104			session_id,
105			context,
106			activation_state,
107			timeout,
108			control_callback,
109			response_callback,
110			storage,
111			..
112		} = self;
113		ObserverBuilder {
114			session_id,
115			context,
116			activation_state,
117			timeout,
118			selector: Selector {
119				selector: selector.into(),
120			},
121			control_callback,
122			response_callback,
123			storage,
124		}
125	}
126
127	/// Set a timeout for the [`Observer`].
128	/// Default is 100ms
129	#[must_use]
130	pub const fn timeout(mut self, timeout: Duration) -> Self {
131		self.timeout = timeout;
132		self
133	}
134
135	/// Set only the message qualifing part of the [`Observer`].
136	/// Will be prefixed with `Agent`s prefix.
137	#[must_use]
138	pub fn topic(self, topic: &str) -> ObserverBuilder<P, Selector, CC, RC, S> {
139		let selector = selector_from(topic, self.context.prefix());
140		self.selector(&selector)
141	}
142}
143
144impl<P, K, RC, S> ObserverBuilder<P, K, NoCallback, RC, S>
145where
146	P: Send + Sync + 'static,
147{
148	/// Set callback for messages
149	#[must_use]
150	pub fn control_callback<C, F>(
151		self,
152		mut callback: C,
153	) -> ObserverBuilder<P, K, Callback<ArcControlCallback<P>>, RC, S>
154	where
155		C: FnMut(Context<P>, ControlResponse) -> F + Send + Sync + 'static,
156		F: Future<Output = Result<()>> + Send + Sync + 'static,
157	{
158		let Self {
159			session_id,
160			context,
161			activation_state,
162			timeout,
163			selector,
164			response_callback,
165			storage,
166			..
167		} = self;
168		let callback: ControlCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
169		let callback: ArcControlCallback<P> = Arc::new(Mutex::new(callback));
170		ObserverBuilder {
171			session_id,
172			context,
173			activation_state,
174			timeout,
175			selector,
176			control_callback: Callback { callback },
177			response_callback,
178			storage,
179		}
180	}
181}
182
183impl<P, K, CC, S> ObserverBuilder<P, K, CC, NoCallback, S>
184where
185	P: Send + Sync + 'static,
186{
187	/// Set callback for response messages
188	#[must_use]
189	pub fn result_callback<C, F>(
190		self,
191		mut callback: C,
192	) -> ObserverBuilder<P, K, CC, Callback<ArcResponseCallback<P>>, S>
193	where
194		C: FnMut(Context<P>, ObservableResponse) -> F + Send + Sync + 'static,
195		F: Future<Output = Result<()>> + Send + Sync + 'static,
196	{
197		let Self {
198			session_id,
199			context,
200			activation_state,
201			timeout,
202			selector,
203			control_callback,
204			storage,
205			..
206		} = self;
207		let callback: ResponseCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
208		let callback: ArcResponseCallback<P> = Arc::new(Mutex::new(callback));
209		ObserverBuilder {
210			session_id,
211			context,
212			activation_state,
213			timeout,
214			selector,
215			control_callback,
216			response_callback: Callback { callback },
217			storage,
218		}
219	}
220}
221
222impl<P, K, CC, RC> ObserverBuilder<P, K, CC, RC, NoStorage>
223where
224	P: Send + Sync + 'static,
225{
226	/// Provide agents storage for the subscriber
227	#[must_use]
228	pub fn storage(
229		self,
230		storage: Arc<RwLock<HashMap<String, Box<dyn ObserverTrait>>>>,
231	) -> ObserverBuilder<P, K, CC, RC, Storage<Box<dyn ObserverTrait>>> {
232		let Self {
233			session_id,
234			context,
235			activation_state,
236			timeout,
237			selector,
238			control_callback,
239			response_callback,
240			..
241		} = self;
242		ObserverBuilder {
243			session_id,
244			context,
245			activation_state,
246			timeout,
247			selector,
248			control_callback,
249			response_callback,
250			storage: Storage { storage },
251		}
252	}
253}
254
255impl<P, S>
256	ObserverBuilder<P, Selector, Callback<ArcControlCallback<P>>, Callback<ArcResponseCallback<P>>, S>
257where
258	P: Send + Sync + 'static,
259{
260	/// Build the [`Observer`].
261	///
262	/// # Errors
263	/// Currently none
264	pub fn build(self) -> Result<Observer<P>> {
265		let Self {
266			session_id,
267			context,
268			timeout,
269			selector,
270			activation_state,
271			control_callback,
272			response_callback,
273			..
274		} = self;
275		let selector = selector.selector;
276		let session = context
277			.session(&session_id)
278			.ok_or_else(|| Error::NoZenohSession)?;
279		Ok(Observer::new(
280			session,
281			selector,
282			context,
283			activation_state,
284			control_callback.callback,
285			response_callback.callback,
286			timeout,
287		))
288	}
289}
290
291impl<P>
292	ObserverBuilder<
293		P,
294		Selector,
295		Callback<ArcControlCallback<P>>,
296		Callback<ArcResponseCallback<P>>,
297		Storage<Box<dyn ObserverTrait>>,
298	>
299where
300	P: Send + Sync + 'static,
301{
302	/// Build and add the [`Observer`] to the `Agent`.
303	///
304	/// # Errors
305	/// Currently none
306	pub fn add(self) -> Result<Option<Box<dyn ObserverTrait>>> {
307		let c = self.storage.storage.clone();
308		let s = self.build()?;
309
310		let r = c
311			.write()
312			.map_err(|_| Error::MutexPoison(String::from("ObserverBuilder")))?
313			.insert(s.selector().to_string(), Box::new(s));
314		Ok(r)
315	}
316}
317// endregion:	--- ObserverBuilder
318
#[cfg(test)]
mod tests {
	use super::*;

	/// Minimal properties type used to instantiate the builder.
	#[derive(Debug)]
	struct Props {}

	/// Compiles only for types that are `Sized`, `Send` and `Sync`.
	const fn assert_auto_traits<T>()
	where
		T: Sized + Send + Sync,
	{
	}

	/// The builder in its initial state must provide the auto traits.
	#[test]
	const fn normal_types() {
		assert_auto_traits::<ObserverBuilder<Props, NoSelector, NoCallback, NoCallback, NoStorage>>();
	}
}