dimas_com/builder/
observable_builder.rs

1// Copyright © 2024 Stephan Kunz
2
3//! Module
4
5#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11// region:		--- modules
12use crate::error::Error;
13use crate::{
14	traits::Responder,
15	zenoh::observable::{
16		ArcControlCallback, ArcExecutionCallback, ArcFeedbackCallback, ControlCallback,
17		ExecutionCallback, FeedbackCallback, Observable,
18	},
19};
20use alloc::{
21	boxed::Box,
22	string::{String, ToString},
23	sync::Arc,
24};
25use core::time::Duration;
26use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
27use dimas_core::{
28	Result,
29	enums::OperationState,
30	message_types::{ControlResponse, Message},
31	traits::Context,
32	utils::selector_from,
33};
34use futures::future::{BoxFuture, Future};
35#[cfg(feature = "std")]
36use std::{collections::HashMap, sync::RwLock};
37#[cfg(feature = "std")]
38use tokio::sync::Mutex;
39// endregion:	--- modules
40
41// region:		--- ObservableBuilder
42/// The builder for an [`Observable`]
43pub struct ObservableBuilder<P, K, CC, FC, EF, S>
44where
45	P: Send + Sync + 'static,
46{
47	session_id: String,
48	/// Context for the `ObservableBuilder`
49	context: Context<P>,
50	activation_state: OperationState,
51	feedback_interval: Duration,
52	selector: K,
53	control_callback: CC,
54	feedback_callback: FC,
55	execution_callback: EF,
56	storage: S,
57}
58
59impl<P> ObservableBuilder<P, NoSelector, NoCallback, NoCallback, NoCallback, NoStorage>
60where
61	P: Send + Sync + 'static,
62{
63	/// Construct a `ObservableBuilder` in initial state
64	#[must_use]
65	pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
66		Self {
67			session_id: session_id.into(),
68			context,
69			activation_state: OperationState::Active,
70			feedback_interval: Duration::from_millis(100),
71			selector: NoSelector,
72			control_callback: NoCallback,
73			feedback_callback: NoCallback,
74			execution_callback: NoCallback,
75			storage: NoStorage,
76		}
77	}
78}
79
80impl<P, K, CC, FC, EC, S> ObservableBuilder<P, K, CC, FC, EC, S>
81where
82	P: Send + Sync + 'static,
83{
84	/// Set the activation state.
85	#[must_use]
86	pub const fn activation_state(mut self, state: OperationState) -> Self {
87		self.activation_state = state;
88		self
89	}
90
91	/// Set the feedback interval.
92	#[must_use]
93	pub const fn feedback_interval(mut self, interval: Duration) -> Self {
94		self.feedback_interval = interval;
95		self
96	}
97
98	/// Set the session id.
99	#[must_use]
100	pub fn session_id(mut self, session_id: &str) -> Self {
101		self.session_id = session_id.into();
102		self
103	}
104}
105
106impl<P, CC, FC, EF, S> ObservableBuilder<P, NoSelector, CC, FC, EF, S>
107where
108	P: Send + Sync + 'static,
109{
110	/// Set the full expression for the [`Observable`].
111	#[must_use]
112	pub fn selector(self, selector: &str) -> ObservableBuilder<P, Selector, CC, FC, EF, S> {
113		let Self {
114			session_id,
115			context,
116			activation_state,
117			feedback_interval,
118			storage,
119			control_callback,
120			feedback_callback,
121			execution_callback,
122			..
123		} = self;
124		ObservableBuilder {
125			session_id,
126			context,
127			activation_state,
128			feedback_interval,
129			selector: Selector {
130				selector: selector.into(),
131			},
132			control_callback,
133			feedback_callback,
134			execution_callback,
135			storage,
136		}
137	}
138
139	/// Set only the topic of the [`Observable`].
140	/// Will be prefixed with agents prefix.
141	#[must_use]
142	pub fn topic(self, topic: &str) -> ObservableBuilder<P, Selector, CC, FC, EF, S> {
143		let selector = selector_from(topic, self.context.prefix());
144		self.selector(&selector)
145	}
146}
147
148impl<P, K, FC, EF, S> ObservableBuilder<P, K, NoCallback, FC, EF, S>
149where
150	P: Send + Sync + 'static,
151{
152	/// Set callback for control messages
153	#[must_use]
154	pub fn control_callback<C, F>(
155		self,
156		mut callback: C,
157	) -> ObservableBuilder<P, K, Callback<ArcControlCallback<P>>, FC, EF, S>
158	where
159		C: FnMut(Context<P>, Message) -> F + Send + Sync + 'static,
160		F: Future<Output = Result<ControlResponse>> + Send + Sync + 'static,
161	{
162		let Self {
163			session_id,
164			context,
165			activation_state,
166			feedback_interval,
167			selector,
168			storage,
169			feedback_callback,
170			execution_callback,
171			..
172		} = self;
173		let callback: ControlCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
174		let callback: ArcControlCallback<P> = Arc::new(Mutex::new(callback));
175		ObservableBuilder {
176			session_id,
177			context,
178			activation_state,
179			feedback_interval,
180			selector,
181			control_callback: Callback { callback },
182			feedback_callback,
183			execution_callback,
184			storage,
185		}
186	}
187}
188
189impl<P, K, CC, EF, S> ObservableBuilder<P, K, CC, NoCallback, EF, S>
190where
191	P: Send + Sync + 'static,
192{
193	/// Set callback for feedback messages
194	#[must_use]
195	pub fn feedback_callback<C, F>(
196		self,
197		mut callback: C,
198	) -> ObservableBuilder<P, K, CC, Callback<ArcFeedbackCallback<P>>, EF, S>
199	where
200		C: FnMut(Context<P>) -> F + Send + Sync + 'static,
201		F: Future<Output = Result<Message>> + Send + Sync + 'static,
202	{
203		let Self {
204			session_id,
205			context,
206			activation_state,
207			feedback_interval,
208			selector,
209			storage,
210			control_callback,
211			execution_callback,
212			..
213		} = self;
214		let callback: FeedbackCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
215		let callback: ArcFeedbackCallback<P> = Arc::new(Mutex::new(callback));
216		ObservableBuilder {
217			session_id,
218			context,
219			activation_state,
220			feedback_interval,
221			selector,
222			control_callback,
223			feedback_callback: Callback { callback },
224			execution_callback,
225			storage,
226		}
227	}
228}
229
230impl<P, K, CC, FC, S> ObservableBuilder<P, K, CC, FC, NoCallback, S>
231where
232	P: Send + Sync + 'static,
233{
234	/// Set execution function
235	#[must_use]
236	pub fn execution_callback<C, F>(
237		self,
238		mut callback: C,
239	) -> ObservableBuilder<P, K, CC, FC, Callback<ArcExecutionCallback<P>>, S>
240	where
241		C: FnMut(Context<P>) -> F + Send + Sync + 'static,
242		F: Future<Output = Result<Message>> + Send + Sync + 'static,
243	{
244		let Self {
245			session_id,
246			context,
247			activation_state,
248			feedback_interval,
249			selector,
250			storage,
251			control_callback,
252			feedback_callback,
253			..
254		} = self;
255		let callback: ExecutionCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
256		let callback = Arc::new(Mutex::new(callback));
257		ObservableBuilder {
258			session_id,
259			context,
260			activation_state,
261			feedback_interval,
262			selector,
263			control_callback,
264			feedback_callback,
265			execution_callback: Callback { callback },
266			storage,
267		}
268	}
269}
270
271impl<P, K, CC, FC, EF> ObservableBuilder<P, K, CC, FC, EF, NoStorage>
272where
273	P: Send + Sync + 'static,
274{
275	/// Provide agents storage for the observable
276	#[must_use]
277	pub fn storage(
278		self,
279		storage: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
280	) -> ObservableBuilder<P, K, CC, FC, EF, Storage<Box<dyn Responder>>> {
281		let Self {
282			session_id,
283			context,
284			activation_state,
285			feedback_interval,
286			selector,
287			control_callback,
288			feedback_callback,
289			execution_callback,
290			..
291		} = self;
292		ObservableBuilder {
293			session_id,
294			context,
295			activation_state,
296			feedback_interval,
297			selector,
298			control_callback,
299			feedback_callback,
300			execution_callback,
301			storage: Storage { storage },
302		}
303	}
304}
305
306impl<P, S>
307	ObservableBuilder<
308		P,
309		Selector,
310		Callback<ArcControlCallback<P>>,
311		Callback<ArcFeedbackCallback<P>>,
312		Callback<
313			Arc<
314				Mutex<
315					Box<dyn FnMut(Context<P>) -> BoxFuture<'static, Result<Message>> + Send + Sync>,
316				>,
317			>,
318		>,
319		S,
320	>
321where
322	P: Send + Sync + 'static,
323{
324	/// Build the [`Observable`]
325	/// # Errors
326	///
327	pub fn build(self) -> Result<Observable<P>> {
328		let Self {
329			session_id,
330			context,
331			activation_state,
332			feedback_interval,
333			selector,
334			control_callback,
335			feedback_callback,
336			execution_callback,
337			..
338		} = self;
339		let session = context
340			.session(&session_id)
341			.ok_or_else(|| Error::NoZenohSession)?;
342		Ok(Observable::new(
343			session,
344			selector.selector,
345			context,
346			activation_state,
347			feedback_interval,
348			control_callback.callback,
349			feedback_callback.callback,
350			execution_callback.callback,
351		))
352	}
353}
354
355impl<P>
356	ObservableBuilder<
357		P,
358		Selector,
359		Callback<ArcControlCallback<P>>,
360		Callback<ArcFeedbackCallback<P>>,
361		Callback<ArcExecutionCallback<P>>,
362		Storage<Box<dyn Responder>>,
363	>
364where
365	P: Send + Sync + 'static,
366{
367	/// Build and add the observable to the agents context
368	/// # Errors
369	///
370	pub fn add(self) -> Result<Option<Box<dyn Responder>>> {
371		let collection = self.storage.storage.clone();
372		let q = self.build()?;
373
374		let r = collection
375			.write()
376			.map_err(|_| Error::MutexPoison(String::from("ObservableBuilder")))?
377			.insert(q.selector().to_string(), Box::new(q));
378		Ok(r)
379	}
380}
381// endregion:	--- ObservableBuilder
382
383#[cfg(test)]
384mod tests {
385	use super::*;
386
387	#[derive(Debug)]
388	struct Props {}
389
390	// check, that the auto traits are available
391	const fn is_normal<T: Sized + Send + Sync>() {}
392
393	#[test]
394	const fn normal_types() {
395		is_normal::<
396			ObservableBuilder<Props, NoSelector, NoCallback, NoCallback, NoCallback, NoStorage>,
397		>();
398	}
399}