1#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11use crate::error::Error;
13use crate::traits::Querier as QuerierTrait;
14use crate::zenoh::querier::Querier;
15use alloc::{
16 boxed::Box,
17 string::{String, ToString},
18 sync::Arc,
19};
20use core::time::Duration;
21use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
22use dimas_core::{
23 Result, enums::OperationState, message_types::QueryableMsg, traits::Context,
24 utils::selector_from,
25};
26use futures::Future;
27#[cfg(feature = "std")]
28use std::{collections::HashMap, sync::RwLock};
29#[cfg(feature = "std")]
30use tokio::sync::Mutex;
31#[cfg(feature = "unstable")]
32use zenoh::sample::Locality;
33use zenoh::{
34 bytes::Encoding,
35 query::{ConsolidationMode, QueryTarget},
36};
37
38use crate::zenoh::querier::{ArcGetCallback, GetCallback};
39#[derive(Clone)]
44pub struct QuerierBuilder<P, K, C, S>
45where
46 P: Send + Sync + 'static,
47{
48 session_id: String,
49 context: Context<P>,
50 activation_state: OperationState,
51 #[cfg(feature = "unstable")]
52 allowed_destination: Locality,
53 encoding: String,
54 timeout: Duration,
55 selector: K,
56 callback: C,
57 storage: S,
58 mode: ConsolidationMode,
59 target: QueryTarget,
60}
61
62impl<P> QuerierBuilder<P, NoSelector, NoCallback, NoStorage>
63where
64 P: Send + Sync + 'static,
65{
66 #[must_use]
68 pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
69 Self {
70 session_id: session_id.into(),
71 context,
72 activation_state: OperationState::Active,
73 #[cfg(feature = "unstable")]
74 allowed_destination: Locality::Any,
75 encoding: Encoding::default().to_string(),
76 timeout: Duration::from_millis(100),
77 selector: NoSelector,
78 callback: NoCallback,
79 storage: NoStorage,
80 mode: ConsolidationMode::None,
81 target: QueryTarget::All,
82 }
83 }
84}
85
86impl<P, K, C, S> QuerierBuilder<P, K, C, S>
87where
88 P: Send + Sync + 'static,
89{
90 #[must_use]
92 pub const fn activation_state(mut self, state: OperationState) -> Self {
93 self.activation_state = state;
94 self
95 }
96
97 #[must_use]
99 pub const fn mode(mut self, mode: ConsolidationMode) -> Self {
100 self.mode = mode;
101 self
102 }
103
104 #[must_use]
106 pub fn session_id(mut self, session_id: &str) -> Self {
107 self.session_id = session_id.into();
108 self
109 }
110
111 #[must_use]
113 pub const fn target(mut self, target: QueryTarget) -> Self {
114 self.target = target;
115 self
116 }
117
118 #[cfg(feature = "unstable")]
120 #[must_use]
121 pub const fn allowed_destination(mut self, allowed_destination: Locality) -> Self {
122 self.allowed_destination = allowed_destination;
123 self
124 }
125
126 #[must_use]
128 pub fn encoding(mut self, encoding: String) -> Self {
129 self.encoding = encoding;
130 self
131 }
132
133 #[must_use]
136 pub const fn timeout(mut self, timeout: Duration) -> Self {
137 self.timeout = timeout;
138 self
139 }
140}
141
142impl<P, C, S> QuerierBuilder<P, NoSelector, C, S>
143where
144 P: Send + Sync + 'static,
145{
146 #[must_use]
148 pub fn selector(self, selector: &str) -> QuerierBuilder<P, Selector, C, S> {
149 let Self {
150 session_id,
151 context,
152 activation_state,
153 #[cfg(feature = "unstable")]
154 allowed_destination,
155 encoding,
156 timeout,
157 storage,
158 callback,
159 mode,
160 target,
161 ..
162 } = self;
163 QuerierBuilder {
164 session_id,
165 context,
166 activation_state,
167 #[cfg(feature = "unstable")]
168 allowed_destination,
169 encoding,
170 timeout,
171 selector: Selector {
172 selector: selector.into(),
173 },
174 callback,
175 storage,
176 mode,
177 target,
178 }
179 }
180
181 #[must_use]
184 pub fn topic(self, topic: &str) -> QuerierBuilder<P, Selector, C, S> {
185 let selector = selector_from(topic, self.context.prefix());
186 self.selector(&selector)
187 }
188}
189
190impl<P, K, S> QuerierBuilder<P, K, NoCallback, S>
191where
192 P: Send + Sync + 'static,
193{
194 #[must_use]
196 pub fn callback<C, F>(
197 self,
198 mut callback: C,
199 ) -> QuerierBuilder<P, K, Callback<ArcGetCallback<P>>, S>
200 where
201 C: FnMut(Context<P>, QueryableMsg) -> F + Send + Sync + 'static,
202 F: Future<Output = Result<()>> + Send + Sync + 'static,
203 {
204 let Self {
205 session_id,
206 context,
207 activation_state,
208 #[cfg(feature = "unstable")]
209 allowed_destination,
210 encoding,
211 timeout,
212 selector,
213 storage,
214 mode,
215 target,
216 ..
217 } = self;
218 let callback: GetCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
219 let callback: ArcGetCallback<P> = Arc::new(Mutex::new(callback));
220 QuerierBuilder {
221 session_id,
222 context,
223 activation_state,
224 #[cfg(feature = "unstable")]
225 allowed_destination,
226 encoding,
227 timeout,
228 selector,
229 callback: Callback { callback },
230 storage,
231 mode,
232 target,
233 }
234 }
235}
236
237impl<P, K, C> QuerierBuilder<P, K, C, NoStorage>
238where
239 P: Send + Sync + 'static,
240{
241 #[must_use]
243 pub fn storage(
244 self,
245 storage: Arc<RwLock<HashMap<String, Box<dyn QuerierTrait>>>>,
246 ) -> QuerierBuilder<P, K, C, Storage<Box<dyn QuerierTrait>>> {
247 let Self {
248 session_id,
249 context,
250 activation_state,
251 #[cfg(feature = "unstable")]
252 allowed_destination,
253 encoding,
254 timeout,
255 selector,
256 callback,
257 mode,
258 target,
259 ..
260 } = self;
261 QuerierBuilder {
262 session_id,
263 context,
264 activation_state,
265 #[cfg(feature = "unstable")]
266 allowed_destination,
267 encoding,
268 timeout,
269 selector,
270 callback,
271 storage: Storage { storage },
272 mode,
273 target,
274 }
275 }
276}
277
278impl<P, S> QuerierBuilder<P, Selector, Callback<ArcGetCallback<P>>, S>
279where
280 P: Send + Sync + 'static,
281{
282 pub fn build(self) -> Result<Querier<P>> {
286 let Self {
287 session_id,
288 context,
289 activation_state,
290 #[cfg(feature = "unstable")]
291 allowed_destination,
292 encoding,
293 timeout,
294 selector,
295 callback: response,
296 mode,
297 target,
298 ..
299 } = self;
300 let selector = selector.selector;
301 let session = context
302 .session(&session_id)
303 .ok_or_else(|| Error::NoZenohSession)?;
304 Ok(Querier::new(
305 session,
306 selector,
307 context,
308 activation_state,
309 response.callback,
310 mode,
311 #[cfg(feature = "unstable")]
312 allowed_destination,
313 encoding,
314 target,
315 timeout,
316 ))
317 }
318}
319
320impl<P> QuerierBuilder<P, Selector, Callback<ArcGetCallback<P>>, Storage<Box<dyn QuerierTrait>>>
321where
322 P: Send + Sync + 'static,
323{
324 pub fn add(self) -> Result<Option<Box<dyn QuerierTrait>>> {
327 let collection = self.storage.storage.clone();
328 let q = self.build()?;
329
330 let r = collection
331 .write()
332 .map_err(|_| Error::MutexPoison(String::from("QuerierBuilder")))?
333 .insert(q.selector().to_string(), Box::new(q));
334 Ok(r)
335 }
336}
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder properties type used to instantiate the builder.
    #[derive(Debug)]
    struct Props {}

    /// Compiles only for types that are `Sized`, `Send` and `Sync`.
    const fn assert_auto_traits<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// The initial builder state must be `Sized + Send + Sync`.
    #[test]
    const fn normal_types() {
        assert_auto_traits::<QuerierBuilder<Props, NoSelector, NoCallback, NoStorage>>();
    }
}