1#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11use crate::error::Error;
13use crate::{
14 traits::Responder,
15 zenoh::observable::{
16 ArcControlCallback, ArcExecutionCallback, ArcFeedbackCallback, ControlCallback,
17 ExecutionCallback, FeedbackCallback, Observable,
18 },
19};
20use alloc::{
21 boxed::Box,
22 string::{String, ToString},
23 sync::Arc,
24};
25use core::time::Duration;
26use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
27use dimas_core::{
28 Result,
29 enums::OperationState,
30 message_types::{ControlResponse, Message},
31 traits::Context,
32 utils::selector_from,
33};
34use futures::future::{BoxFuture, Future};
35#[cfg(feature = "std")]
36use std::{collections::HashMap, sync::RwLock};
37#[cfg(feature = "std")]
38use tokio::sync::Mutex;
39pub struct ObservableBuilder<P, K, CC, FC, EF, S>
44where
45 P: Send + Sync + 'static,
46{
47 session_id: String,
48 context: Context<P>,
50 activation_state: OperationState,
51 feedback_interval: Duration,
52 selector: K,
53 control_callback: CC,
54 feedback_callback: FC,
55 execution_callback: EF,
56 storage: S,
57}
58
59impl<P> ObservableBuilder<P, NoSelector, NoCallback, NoCallback, NoCallback, NoStorage>
60where
61 P: Send + Sync + 'static,
62{
63 #[must_use]
65 pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
66 Self {
67 session_id: session_id.into(),
68 context,
69 activation_state: OperationState::Active,
70 feedback_interval: Duration::from_millis(100),
71 selector: NoSelector,
72 control_callback: NoCallback,
73 feedback_callback: NoCallback,
74 execution_callback: NoCallback,
75 storage: NoStorage,
76 }
77 }
78}
79
80impl<P, K, CC, FC, EC, S> ObservableBuilder<P, K, CC, FC, EC, S>
81where
82 P: Send + Sync + 'static,
83{
84 #[must_use]
86 pub const fn activation_state(mut self, state: OperationState) -> Self {
87 self.activation_state = state;
88 self
89 }
90
91 #[must_use]
93 pub const fn feedback_interval(mut self, interval: Duration) -> Self {
94 self.feedback_interval = interval;
95 self
96 }
97
98 #[must_use]
100 pub fn session_id(mut self, session_id: &str) -> Self {
101 self.session_id = session_id.into();
102 self
103 }
104}
105
106impl<P, CC, FC, EF, S> ObservableBuilder<P, NoSelector, CC, FC, EF, S>
107where
108 P: Send + Sync + 'static,
109{
110 #[must_use]
112 pub fn selector(self, selector: &str) -> ObservableBuilder<P, Selector, CC, FC, EF, S> {
113 let Self {
114 session_id,
115 context,
116 activation_state,
117 feedback_interval,
118 storage,
119 control_callback,
120 feedback_callback,
121 execution_callback,
122 ..
123 } = self;
124 ObservableBuilder {
125 session_id,
126 context,
127 activation_state,
128 feedback_interval,
129 selector: Selector {
130 selector: selector.into(),
131 },
132 control_callback,
133 feedback_callback,
134 execution_callback,
135 storage,
136 }
137 }
138
139 #[must_use]
142 pub fn topic(self, topic: &str) -> ObservableBuilder<P, Selector, CC, FC, EF, S> {
143 let selector = selector_from(topic, self.context.prefix());
144 self.selector(&selector)
145 }
146}
147
148impl<P, K, FC, EF, S> ObservableBuilder<P, K, NoCallback, FC, EF, S>
149where
150 P: Send + Sync + 'static,
151{
152 #[must_use]
154 pub fn control_callback<C, F>(
155 self,
156 mut callback: C,
157 ) -> ObservableBuilder<P, K, Callback<ArcControlCallback<P>>, FC, EF, S>
158 where
159 C: FnMut(Context<P>, Message) -> F + Send + Sync + 'static,
160 F: Future<Output = Result<ControlResponse>> + Send + Sync + 'static,
161 {
162 let Self {
163 session_id,
164 context,
165 activation_state,
166 feedback_interval,
167 selector,
168 storage,
169 feedback_callback,
170 execution_callback,
171 ..
172 } = self;
173 let callback: ControlCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
174 let callback: ArcControlCallback<P> = Arc::new(Mutex::new(callback));
175 ObservableBuilder {
176 session_id,
177 context,
178 activation_state,
179 feedback_interval,
180 selector,
181 control_callback: Callback { callback },
182 feedback_callback,
183 execution_callback,
184 storage,
185 }
186 }
187}
188
189impl<P, K, CC, EF, S> ObservableBuilder<P, K, CC, NoCallback, EF, S>
190where
191 P: Send + Sync + 'static,
192{
193 #[must_use]
195 pub fn feedback_callback<C, F>(
196 self,
197 mut callback: C,
198 ) -> ObservableBuilder<P, K, CC, Callback<ArcFeedbackCallback<P>>, EF, S>
199 where
200 C: FnMut(Context<P>) -> F + Send + Sync + 'static,
201 F: Future<Output = Result<Message>> + Send + Sync + 'static,
202 {
203 let Self {
204 session_id,
205 context,
206 activation_state,
207 feedback_interval,
208 selector,
209 storage,
210 control_callback,
211 execution_callback,
212 ..
213 } = self;
214 let callback: FeedbackCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
215 let callback: ArcFeedbackCallback<P> = Arc::new(Mutex::new(callback));
216 ObservableBuilder {
217 session_id,
218 context,
219 activation_state,
220 feedback_interval,
221 selector,
222 control_callback,
223 feedback_callback: Callback { callback },
224 execution_callback,
225 storage,
226 }
227 }
228}
229
230impl<P, K, CC, FC, S> ObservableBuilder<P, K, CC, FC, NoCallback, S>
231where
232 P: Send + Sync + 'static,
233{
234 #[must_use]
236 pub fn execution_callback<C, F>(
237 self,
238 mut callback: C,
239 ) -> ObservableBuilder<P, K, CC, FC, Callback<ArcExecutionCallback<P>>, S>
240 where
241 C: FnMut(Context<P>) -> F + Send + Sync + 'static,
242 F: Future<Output = Result<Message>> + Send + Sync + 'static,
243 {
244 let Self {
245 session_id,
246 context,
247 activation_state,
248 feedback_interval,
249 selector,
250 storage,
251 control_callback,
252 feedback_callback,
253 ..
254 } = self;
255 let callback: ExecutionCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
256 let callback = Arc::new(Mutex::new(callback));
257 ObservableBuilder {
258 session_id,
259 context,
260 activation_state,
261 feedback_interval,
262 selector,
263 control_callback,
264 feedback_callback,
265 execution_callback: Callback { callback },
266 storage,
267 }
268 }
269}
270
271impl<P, K, CC, FC, EF> ObservableBuilder<P, K, CC, FC, EF, NoStorage>
272where
273 P: Send + Sync + 'static,
274{
275 #[must_use]
277 pub fn storage(
278 self,
279 storage: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
280 ) -> ObservableBuilder<P, K, CC, FC, EF, Storage<Box<dyn Responder>>> {
281 let Self {
282 session_id,
283 context,
284 activation_state,
285 feedback_interval,
286 selector,
287 control_callback,
288 feedback_callback,
289 execution_callback,
290 ..
291 } = self;
292 ObservableBuilder {
293 session_id,
294 context,
295 activation_state,
296 feedback_interval,
297 selector,
298 control_callback,
299 feedback_callback,
300 execution_callback,
301 storage: Storage { storage },
302 }
303 }
304}
305
306impl<P, S>
307 ObservableBuilder<
308 P,
309 Selector,
310 Callback<ArcControlCallback<P>>,
311 Callback<ArcFeedbackCallback<P>>,
312 Callback<
313 Arc<
314 Mutex<
315 Box<dyn FnMut(Context<P>) -> BoxFuture<'static, Result<Message>> + Send + Sync>,
316 >,
317 >,
318 >,
319 S,
320 >
321where
322 P: Send + Sync + 'static,
323{
324 pub fn build(self) -> Result<Observable<P>> {
328 let Self {
329 session_id,
330 context,
331 activation_state,
332 feedback_interval,
333 selector,
334 control_callback,
335 feedback_callback,
336 execution_callback,
337 ..
338 } = self;
339 let session = context
340 .session(&session_id)
341 .ok_or_else(|| Error::NoZenohSession)?;
342 Ok(Observable::new(
343 session,
344 selector.selector,
345 context,
346 activation_state,
347 feedback_interval,
348 control_callback.callback,
349 feedback_callback.callback,
350 execution_callback.callback,
351 ))
352 }
353}
354
355impl<P>
356 ObservableBuilder<
357 P,
358 Selector,
359 Callback<ArcControlCallback<P>>,
360 Callback<ArcFeedbackCallback<P>>,
361 Callback<ArcExecutionCallback<P>>,
362 Storage<Box<dyn Responder>>,
363 >
364where
365 P: Send + Sync + 'static,
366{
367 pub fn add(self) -> Result<Option<Box<dyn Responder>>> {
371 let collection = self.storage.storage.clone();
372 let q = self.build()?;
373
374 let r = collection
375 .write()
376 .map_err(|_| Error::MutexPoison(String::from("ObservableBuilder")))?
377 .insert(q.selector().to_string(), Box::new(q));
378 Ok(r)
379 }
380}
381#[cfg(test)]
384mod tests {
385 use super::*;
386
387 #[derive(Debug)]
388 struct Props {}
389
390 const fn is_normal<T: Sized + Send + Sync>() {}
392
393 #[test]
394 const fn normal_types() {
395 is_normal::<
396 ObservableBuilder<Props, NoSelector, NoCallback, NoCallback, NoCallback, NoStorage>,
397 >();
398 }
399}