dimas_com/builder/
querier_builder.rs

1// Copyright © 2023 Stephan Kunz
2
//! Module `querier_builder` provides the `QuerierBuilder`, which is used to create an information/compute requestor `Querier`.
4
5#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11// region:		--- modules
12use crate::error::Error;
13use crate::traits::Querier as QuerierTrait;
14use crate::zenoh::querier::Querier;
15use alloc::{
16	boxed::Box,
17	string::{String, ToString},
18	sync::Arc,
19};
20use core::time::Duration;
21use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
22use dimas_core::{
23	Result, enums::OperationState, message_types::QueryableMsg, traits::Context,
24	utils::selector_from,
25};
26use futures::Future;
27#[cfg(feature = "std")]
28use std::{collections::HashMap, sync::RwLock};
29#[cfg(feature = "std")]
30use tokio::sync::Mutex;
31#[cfg(feature = "unstable")]
32use zenoh::sample::Locality;
33use zenoh::{
34	bytes::Encoding,
35	query::{ConsolidationMode, QueryTarget},
36};
37
38use crate::zenoh::querier::{ArcGetCallback, GetCallback};
39// endregion:	--- modules
40
41// region:		--- QuerierBuilder
42/// The builder for a query
43#[derive(Clone)]
44pub struct QuerierBuilder<P, K, C, S>
45where
46	P: Send + Sync + 'static,
47{
48	session_id: String,
49	context: Context<P>,
50	activation_state: OperationState,
51	#[cfg(feature = "unstable")]
52	allowed_destination: Locality,
53	encoding: String,
54	timeout: Duration,
55	selector: K,
56	callback: C,
57	storage: S,
58	mode: ConsolidationMode,
59	target: QueryTarget,
60}
61
62impl<P> QuerierBuilder<P, NoSelector, NoCallback, NoStorage>
63where
64	P: Send + Sync + 'static,
65{
66	/// Construct a `QuerierBuilder` in initial state
67	#[must_use]
68	pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
69		Self {
70			session_id: session_id.into(),
71			context,
72			activation_state: OperationState::Active,
73			#[cfg(feature = "unstable")]
74			allowed_destination: Locality::Any,
75			encoding: Encoding::default().to_string(),
76			timeout: Duration::from_millis(100),
77			selector: NoSelector,
78			callback: NoCallback,
79			storage: NoStorage,
80			mode: ConsolidationMode::None,
81			target: QueryTarget::All,
82		}
83	}
84}
85
86impl<P, K, C, S> QuerierBuilder<P, K, C, S>
87where
88	P: Send + Sync + 'static,
89{
90	/// Set the activation state.
91	#[must_use]
92	pub const fn activation_state(mut self, state: OperationState) -> Self {
93		self.activation_state = state;
94		self
95	}
96
97	/// Set the [`ConsolidationMode`] of the [`Querier`].
98	#[must_use]
99	pub const fn mode(mut self, mode: ConsolidationMode) -> Self {
100		self.mode = mode;
101		self
102	}
103
104	/// Set the session id.
105	#[must_use]
106	pub fn session_id(mut self, session_id: &str) -> Self {
107		self.session_id = session_id.into();
108		self
109	}
110
111	/// Set the [`QueryTarget`] of the [`Querier`].
112	#[must_use]
113	pub const fn target(mut self, target: QueryTarget) -> Self {
114		self.target = target;
115		self
116	}
117
118	/// Set the allowed destination of the [`Querier`].
119	#[cfg(feature = "unstable")]
120	#[must_use]
121	pub const fn allowed_destination(mut self, allowed_destination: Locality) -> Self {
122		self.allowed_destination = allowed_destination;
123		self
124	}
125
126	/// Set the [`Querier`]s encoding
127	#[must_use]
128	pub fn encoding(mut self, encoding: String) -> Self {
129		self.encoding = encoding;
130		self
131	}
132
133	/// Set a timeout for the [`Querier`].
134	/// Default is 100ms
135	#[must_use]
136	pub const fn timeout(mut self, timeout: Duration) -> Self {
137		self.timeout = timeout;
138		self
139	}
140}
141
142impl<P, C, S> QuerierBuilder<P, NoSelector, C, S>
143where
144	P: Send + Sync + 'static,
145{
146	/// Set the full expression for the query
147	#[must_use]
148	pub fn selector(self, selector: &str) -> QuerierBuilder<P, Selector, C, S> {
149		let Self {
150			session_id,
151			context,
152			activation_state,
153			#[cfg(feature = "unstable")]
154			allowed_destination,
155			encoding,
156			timeout,
157			storage,
158			callback,
159			mode,
160			target,
161			..
162		} = self;
163		QuerierBuilder {
164			session_id,
165			context,
166			activation_state,
167			#[cfg(feature = "unstable")]
168			allowed_destination,
169			encoding,
170			timeout,
171			selector: Selector {
172				selector: selector.into(),
173			},
174			callback,
175			storage,
176			mode,
177			target,
178		}
179	}
180
181	/// Set only the message qualifing part of the query.
182	/// Will be prefixed with agents prefix.
183	#[must_use]
184	pub fn topic(self, topic: &str) -> QuerierBuilder<P, Selector, C, S> {
185		let selector = selector_from(topic, self.context.prefix());
186		self.selector(&selector)
187	}
188}
189
190impl<P, K, S> QuerierBuilder<P, K, NoCallback, S>
191where
192	P: Send + Sync + 'static,
193{
194	/// Set query callback for response messages
195	#[must_use]
196	pub fn callback<C, F>(
197		self,
198		mut callback: C,
199	) -> QuerierBuilder<P, K, Callback<ArcGetCallback<P>>, S>
200	where
201		C: FnMut(Context<P>, QueryableMsg) -> F + Send + Sync + 'static,
202		F: Future<Output = Result<()>> + Send + Sync + 'static,
203	{
204		let Self {
205			session_id,
206			context,
207			activation_state,
208			#[cfg(feature = "unstable")]
209			allowed_destination,
210			encoding,
211			timeout,
212			selector,
213			storage,
214			mode,
215			target,
216			..
217		} = self;
218		let callback: GetCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
219		let callback: ArcGetCallback<P> = Arc::new(Mutex::new(callback));
220		QuerierBuilder {
221			session_id,
222			context,
223			activation_state,
224			#[cfg(feature = "unstable")]
225			allowed_destination,
226			encoding,
227			timeout,
228			selector,
229			callback: Callback { callback },
230			storage,
231			mode,
232			target,
233		}
234	}
235}
236
237impl<P, K, C> QuerierBuilder<P, K, C, NoStorage>
238where
239	P: Send + Sync + 'static,
240{
241	/// Provide agents storage for the query
242	#[must_use]
243	pub fn storage(
244		self,
245		storage: Arc<RwLock<HashMap<String, Box<dyn QuerierTrait>>>>,
246	) -> QuerierBuilder<P, K, C, Storage<Box<dyn QuerierTrait>>> {
247		let Self {
248			session_id,
249			context,
250			activation_state,
251			#[cfg(feature = "unstable")]
252			allowed_destination,
253			encoding,
254			timeout,
255			selector,
256			callback,
257			mode,
258			target,
259			..
260		} = self;
261		QuerierBuilder {
262			session_id,
263			context,
264			activation_state,
265			#[cfg(feature = "unstable")]
266			allowed_destination,
267			encoding,
268			timeout,
269			selector,
270			callback,
271			storage: Storage { storage },
272			mode,
273			target,
274		}
275	}
276}
277
278impl<P, S> QuerierBuilder<P, Selector, Callback<ArcGetCallback<P>>, S>
279where
280	P: Send + Sync + 'static,
281{
282	/// Build the [`Querier`]
283	/// # Errors
284	///
285	pub fn build(self) -> Result<Querier<P>> {
286		let Self {
287			session_id,
288			context,
289			activation_state,
290			#[cfg(feature = "unstable")]
291			allowed_destination,
292			encoding,
293			timeout,
294			selector,
295			callback: response,
296			mode,
297			target,
298			..
299		} = self;
300		let selector = selector.selector;
301		let session = context
302			.session(&session_id)
303			.ok_or_else(|| Error::NoZenohSession)?;
304		Ok(Querier::new(
305			session,
306			selector,
307			context,
308			activation_state,
309			response.callback,
310			mode,
311			#[cfg(feature = "unstable")]
312			allowed_destination,
313			encoding,
314			target,
315			timeout,
316		))
317	}
318}
319
320impl<P> QuerierBuilder<P, Selector, Callback<ArcGetCallback<P>>, Storage<Box<dyn QuerierTrait>>>
321where
322	P: Send + Sync + 'static,
323{
324	/// Build and add the query to the agents context
325	/// # Errors
326	pub fn add(self) -> Result<Option<Box<dyn QuerierTrait>>> {
327		let collection = self.storage.storage.clone();
328		let q = self.build()?;
329
330		let r = collection
331			.write()
332			.map_err(|_| Error::MutexPoison(String::from("QuerierBuilder")))?
333			.insert(q.selector().to_string(), Box::new(q));
334		Ok(r)
335	}
336}
337// endregion:	--- QuerierBuilder
338
#[cfg(test)]
mod tests {
	use super::*;

	/// Minimal properties type used to instantiate the builder.
	#[derive(Debug)]
	struct Props;

	/// Compiles only for types that are `Sized`, `Send` and `Sync`.
	const fn is_normal<T>()
	where
		T: Sized + Send + Sync,
	{
	}

	/// The builder in its initial state must provide the auto traits.
	#[test]
	const fn normal_types() {
		is_normal::<QuerierBuilder<Props, NoSelector, NoCallback, NoStorage>>();
	}
}