1#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11use alloc::{
13 boxed::Box,
14 string::{String, ToString},
15 sync::Arc,
16};
17use dimas_core::{
18 Result, enums::OperationState, message_types::QueryMsg, traits::Context, utils::selector_from,
19};
20use futures::future::Future;
21#[cfg(feature = "std")]
22use std::{collections::HashMap, sync::RwLock};
23#[cfg(feature = "std")]
24use tokio::sync::Mutex;
25#[cfg(feature = "unstable")]
26use zenoh::sample::Locality;
27
28use crate::error::Error;
29use crate::{
30 traits::Responder,
31 zenoh::queryable::{ArcGetCallback, GetCallback, Queryable},
32};
33use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
34#[derive(Clone)]
39pub struct QueryableBuilder<P, K, C, S>
40where
41 P: Send + Sync + 'static,
42{
43 session_id: String,
44 context: Context<P>,
45 activation_state: OperationState,
46 completeness: bool,
47 #[cfg(feature = "unstable")]
48 allowed_origin: Locality,
49 selector: K,
50 callback: C,
51 storage: S,
52}
53
54impl<P> QueryableBuilder<P, NoSelector, NoCallback, NoStorage>
55where
56 P: Send + Sync + 'static,
57{
58 #[must_use]
60 pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
61 Self {
62 session_id: session_id.into(),
63 context,
64 activation_state: OperationState::Active,
65 completeness: true,
66 #[cfg(feature = "unstable")]
67 allowed_origin: Locality::Any,
68 selector: NoSelector,
69 callback: NoCallback,
70 storage: NoStorage,
71 }
72 }
73}
74
75impl<P, K, C, S> QueryableBuilder<P, K, C, S>
76where
77 P: Send + Sync + 'static,
78{
79 #[must_use]
81 pub const fn activation_state(mut self, state: OperationState) -> Self {
82 self.activation_state = state;
83 self
84 }
85
86 #[must_use]
88 pub const fn completeness(mut self, completeness: bool) -> Self {
89 self.completeness = completeness;
90 self
91 }
92
93 #[cfg(feature = "unstable")]
95 #[must_use]
96 pub const fn allowed_origin(mut self, allowed_origin: Locality) -> Self {
97 self.allowed_origin = allowed_origin;
98 self
99 }
100
101 #[must_use]
103 pub fn session_id(mut self, session_id: &str) -> Self {
104 self.session_id = session_id.into();
105 self
106 }
107}
108
109impl<P, C, S> QueryableBuilder<P, NoSelector, C, S>
110where
111 P: Send + Sync + 'static,
112{
113 #[must_use]
115 pub fn selector(self, selector: &str) -> QueryableBuilder<P, Selector, C, S> {
116 let Self {
117 session_id,
118 context,
119 activation_state,
120 completeness,
121 #[cfg(feature = "unstable")]
122 allowed_origin,
123 storage,
124 callback,
125 ..
126 } = self;
127 QueryableBuilder {
128 session_id,
129 context,
130 activation_state,
131 completeness,
132 #[cfg(feature = "unstable")]
133 allowed_origin,
134 selector: Selector {
135 selector: selector.into(),
136 },
137 callback,
138 storage,
139 }
140 }
141
142 #[must_use]
145 pub fn topic(self, topic: &str) -> QueryableBuilder<P, Selector, C, S> {
146 let selector = selector_from(topic, self.context.prefix());
147 self.selector(&selector)
148 }
149}
150
151impl<P, K, S> QueryableBuilder<P, K, NoCallback, S>
152where
153 P: Send + Sync + 'static,
154{
155 #[must_use]
157 pub fn callback<C, F>(
158 self,
159 mut callback: C,
160 ) -> QueryableBuilder<P, K, Callback<ArcGetCallback<P>>, S>
161 where
162 C: FnMut(Context<P>, QueryMsg) -> F + Send + Sync + 'static,
163 F: Future<Output = Result<()>> + Send + Sync + 'static,
164 {
165 let Self {
166 session_id,
167 context,
168 activation_state,
169 completeness,
170 #[cfg(feature = "unstable")]
171 allowed_origin,
172 selector,
173 storage,
174 ..
175 } = self;
176 let callback: GetCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
177 let callback: ArcGetCallback<P> = Arc::new(Mutex::new(callback));
178 QueryableBuilder {
179 session_id,
180 context,
181 activation_state,
182 completeness,
183 #[cfg(feature = "unstable")]
184 allowed_origin,
185 selector,
186 callback: Callback { callback },
187 storage,
188 }
189 }
190}
191
192impl<P, K, C> QueryableBuilder<P, K, C, NoStorage>
193where
194 P: Send + Sync + 'static,
195{
196 #[must_use]
198 pub fn storage(
199 self,
200 storage: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
201 ) -> QueryableBuilder<P, K, C, Storage<Box<dyn Responder>>> {
202 let Self {
203 session_id,
204 context,
205 activation_state,
206 completeness,
207 #[cfg(feature = "unstable")]
208 allowed_origin,
209 selector,
210 callback,
211 ..
212 } = self;
213 QueryableBuilder {
214 session_id,
215 context,
216 activation_state,
217 completeness,
218 #[cfg(feature = "unstable")]
219 allowed_origin,
220 selector,
221 callback,
222 storage: Storage { storage },
223 }
224 }
225}
226
227impl<P, S> QueryableBuilder<P, Selector, Callback<ArcGetCallback<P>>, S>
228where
229 P: Send + Sync + 'static,
230{
231 pub fn build(self) -> Result<Queryable<P>> {
235 let Self {
236 session_id,
237 context,
238 activation_state,
239 completeness,
240 #[cfg(feature = "unstable")]
241 allowed_origin,
242 selector,
243 callback,
244 ..
245 } = self;
246 let selector = selector.selector;
247 let session = context
248 .session(&session_id)
249 .ok_or_else(|| Error::NoZenohSession)?;
250 Ok(Queryable::new(
251 session,
252 selector,
253 context,
254 activation_state,
255 callback.callback,
256 completeness,
257 #[cfg(feature = "unstable")]
258 allowed_origin,
259 ))
260 }
261}
262
263impl<P> QueryableBuilder<P, Selector, Callback<ArcGetCallback<P>>, Storage<Box<dyn Responder>>>
264where
265 P: Send + Sync + 'static,
266{
267 pub fn add(self) -> Result<Option<Box<dyn Responder>>> {
271 let collection = self.storage.storage.clone();
272 let q = self.build()?;
273
274 let r = collection
275 .write()
276 .map_err(|_| Error::MutexPoison(String::from("QueryableBuilder")))?
277 .insert(q.selector().to_string(), Box::new(q));
278 Ok(r)
279 }
280}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty properties type used to instantiate the builder in tests.
    #[derive(Debug)]
    struct Props {}

    /// Compiles only when `T` is `Sized + Send + Sync`.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Checks at compile time that the initial builder type is `Sized + Send + Sync`.
    #[test]
    const fn normal_types() {
        is_normal::<QueryableBuilder<Props, NoSelector, NoCallback, NoStorage>>();
    }
}