1#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11use crate::error::Error;
13use crate::traits::Observer as ObserverTrait;
14use crate::zenoh::observer::{
15 ArcControlCallback, ArcResponseCallback, ControlCallback, Observer, ResponseCallback,
16};
17use alloc::{
18 boxed::Box,
19 string::{String, ToString},
20 sync::Arc,
21};
22use core::time::Duration;
23use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
24use dimas_core::{
25 Result,
26 enums::OperationState,
27 message_types::{ControlResponse, ObservableResponse},
28 traits::Context,
29 utils::selector_from,
30};
31use futures::future::Future;
32#[cfg(feature = "std")]
33use std::{collections::HashMap, sync::RwLock};
34#[cfg(feature = "std")]
35use tokio::sync::Mutex;
36pub struct ObserverBuilder<P, K, CC, RC, S>
41where
42 P: Send + Sync + 'static,
43{
44 session_id: String,
45 context: Context<P>,
47 activation_state: OperationState,
48 timeout: Duration,
49 selector: K,
50 control_callback: CC,
52 response_callback: RC,
54 storage: S,
55}
56
57impl<P> ObserverBuilder<P, NoSelector, NoCallback, NoCallback, NoStorage>
58where
59 P: Send + Sync + 'static,
60{
61 #[must_use]
63 pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
64 Self {
65 session_id: session_id.into(),
66 context,
67 activation_state: OperationState::Active,
68 timeout: Duration::from_millis(100),
69 selector: NoSelector,
70 control_callback: NoCallback,
71 response_callback: NoCallback,
72 storage: NoStorage,
73 }
74 }
75}
76
77impl<P, K, CC, RC, S> ObserverBuilder<P, K, CC, RC, S>
78where
79 P: Send + Sync + 'static,
80{
81 #[must_use]
83 pub const fn activation_state(mut self, state: OperationState) -> Self {
84 self.activation_state = state;
85 self
86 }
87
88 #[must_use]
90 pub fn session_id(mut self, session_id: &str) -> Self {
91 self.session_id = session_id.into();
92 self
93 }
94}
95
96impl<P, CC, RC, S> ObserverBuilder<P, NoSelector, CC, RC, S>
97where
98 P: Send + Sync + 'static,
99{
100 #[must_use]
102 pub fn selector(self, selector: &str) -> ObserverBuilder<P, Selector, CC, RC, S> {
103 let Self {
104 session_id,
105 context,
106 activation_state,
107 timeout,
108 control_callback,
109 response_callback,
110 storage,
111 ..
112 } = self;
113 ObserverBuilder {
114 session_id,
115 context,
116 activation_state,
117 timeout,
118 selector: Selector {
119 selector: selector.into(),
120 },
121 control_callback,
122 response_callback,
123 storage,
124 }
125 }
126
127 #[must_use]
130 pub const fn timeout(mut self, timeout: Duration) -> Self {
131 self.timeout = timeout;
132 self
133 }
134
135 #[must_use]
138 pub fn topic(self, topic: &str) -> ObserverBuilder<P, Selector, CC, RC, S> {
139 let selector = selector_from(topic, self.context.prefix());
140 self.selector(&selector)
141 }
142}
143
144impl<P, K, RC, S> ObserverBuilder<P, K, NoCallback, RC, S>
145where
146 P: Send + Sync + 'static,
147{
148 #[must_use]
150 pub fn control_callback<C, F>(
151 self,
152 mut callback: C,
153 ) -> ObserverBuilder<P, K, Callback<ArcControlCallback<P>>, RC, S>
154 where
155 C: FnMut(Context<P>, ControlResponse) -> F + Send + Sync + 'static,
156 F: Future<Output = Result<()>> + Send + Sync + 'static,
157 {
158 let Self {
159 session_id,
160 context,
161 activation_state,
162 timeout,
163 selector,
164 response_callback,
165 storage,
166 ..
167 } = self;
168 let callback: ControlCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
169 let callback: ArcControlCallback<P> = Arc::new(Mutex::new(callback));
170 ObserverBuilder {
171 session_id,
172 context,
173 activation_state,
174 timeout,
175 selector,
176 control_callback: Callback { callback },
177 response_callback,
178 storage,
179 }
180 }
181}
182
183impl<P, K, CC, S> ObserverBuilder<P, K, CC, NoCallback, S>
184where
185 P: Send + Sync + 'static,
186{
187 #[must_use]
189 pub fn result_callback<C, F>(
190 self,
191 mut callback: C,
192 ) -> ObserverBuilder<P, K, CC, Callback<ArcResponseCallback<P>>, S>
193 where
194 C: FnMut(Context<P>, ObservableResponse) -> F + Send + Sync + 'static,
195 F: Future<Output = Result<()>> + Send + Sync + 'static,
196 {
197 let Self {
198 session_id,
199 context,
200 activation_state,
201 timeout,
202 selector,
203 control_callback,
204 storage,
205 ..
206 } = self;
207 let callback: ResponseCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
208 let callback: ArcResponseCallback<P> = Arc::new(Mutex::new(callback));
209 ObserverBuilder {
210 session_id,
211 context,
212 activation_state,
213 timeout,
214 selector,
215 control_callback,
216 response_callback: Callback { callback },
217 storage,
218 }
219 }
220}
221
222impl<P, K, CC, RC> ObserverBuilder<P, K, CC, RC, NoStorage>
223where
224 P: Send + Sync + 'static,
225{
226 #[must_use]
228 pub fn storage(
229 self,
230 storage: Arc<RwLock<HashMap<String, Box<dyn ObserverTrait>>>>,
231 ) -> ObserverBuilder<P, K, CC, RC, Storage<Box<dyn ObserverTrait>>> {
232 let Self {
233 session_id,
234 context,
235 activation_state,
236 timeout,
237 selector,
238 control_callback,
239 response_callback,
240 ..
241 } = self;
242 ObserverBuilder {
243 session_id,
244 context,
245 activation_state,
246 timeout,
247 selector,
248 control_callback,
249 response_callback,
250 storage: Storage { storage },
251 }
252 }
253}
254
255impl<P, S>
256 ObserverBuilder<P, Selector, Callback<ArcControlCallback<P>>, Callback<ArcResponseCallback<P>>, S>
257where
258 P: Send + Sync + 'static,
259{
260 pub fn build(self) -> Result<Observer<P>> {
265 let Self {
266 session_id,
267 context,
268 timeout,
269 selector,
270 activation_state,
271 control_callback,
272 response_callback,
273 ..
274 } = self;
275 let selector = selector.selector;
276 let session = context
277 .session(&session_id)
278 .ok_or_else(|| Error::NoZenohSession)?;
279 Ok(Observer::new(
280 session,
281 selector,
282 context,
283 activation_state,
284 control_callback.callback,
285 response_callback.callback,
286 timeout,
287 ))
288 }
289}
290
291impl<P>
292 ObserverBuilder<
293 P,
294 Selector,
295 Callback<ArcControlCallback<P>>,
296 Callback<ArcResponseCallback<P>>,
297 Storage<Box<dyn ObserverTrait>>,
298 >
299where
300 P: Send + Sync + 'static,
301{
302 pub fn add(self) -> Result<Option<Box<dyn ObserverTrait>>> {
307 let c = self.storage.storage.clone();
308 let s = self.build()?;
309
310 let r = c
311 .write()
312 .map_err(|_| Error::MutexPoison(String::from("ObserverBuilder")))?
313 .insert(s.selector().to_string(), Box::new(s));
314 Ok(r)
315 }
316}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty properties type used only to instantiate the builder.
    #[derive(Debug)]
    struct Props {}

    /// A builder on which nothing has been configured yet.
    type FreshBuilder = ObserverBuilder<Props, NoSelector, NoCallback, NoCallback, NoStorage>;

    /// Compiles only if `T` is `Sized`, `Send` and `Sync`.
    const fn assert_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    #[test]
    const fn normal_types() {
        assert_normal::<FreshBuilder>();
    }
}