1#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12use crate::error::Error;
14use crate::{
15 traits::LivelinessSubscriber as LivelinessSubscriberTrait,
16 zenoh::liveliness::{ArcLivelinessCallback, LivelinessCallback, LivelinessSubscriber},
17};
18use alloc::{boxed::Box, format, string::String, sync::Arc};
19use dimas_core::builder_states::{Callback, NoCallback, NoStorage, Storage};
20use dimas_core::{Result, enums::OperationState, traits::Context, utils::selector_from};
21use futures::future::Future;
22#[cfg(feature = "std")]
23use std::{collections::HashMap, sync::RwLock};
24#[cfg(feature = "std")]
25use tokio::sync::Mutex;
26#[allow(clippy::module_name_repetitions)]
31pub struct LivelinessSubscriberBuilder<P, C, S>
32where
33 P: Send + Sync + 'static,
34{
35 session_id: String,
36 token: String,
37 context: Context<P>,
38 activation_state: OperationState,
39 put_callback: C,
40 storage: S,
41 delete_callback: Option<ArcLivelinessCallback<P>>,
42}
43
44impl<P> LivelinessSubscriberBuilder<P, NoCallback, NoStorage>
45where
46 P: Send + Sync + 'static,
47{
48 #[must_use]
50 pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
51 let token = selector_from("*", context.prefix());
55 Self {
56 session_id: session_id.into(),
57 token,
58 context,
59 activation_state: OperationState::Created,
60 put_callback: NoCallback,
61 storage: NoStorage,
62 delete_callback: None,
63 }
64 }
65}
66
67impl<P, C, S> LivelinessSubscriberBuilder<P, C, S>
68where
69 P: Send + Sync + 'static,
70{
71 #[must_use]
73 pub const fn activation_state(mut self, state: OperationState) -> Self {
74 self.activation_state = state;
75 self
76 }
77
78 #[must_use]
80 pub fn prefix(self, prefix: &str) -> Self {
81 let token = format!("{prefix}/*");
82 let Self {
83 session_id,
84 context,
85 activation_state,
86 put_callback,
87 storage,
88 delete_callback,
89 ..
90 } = self;
91 Self {
92 session_id,
93 token,
94 context,
95 activation_state,
96 put_callback,
97 storage,
98 delete_callback,
99 }
100 }
101
102 #[must_use]
104 pub fn session_id(mut self, session_id: &str) -> Self {
105 self.session_id = session_id.into();
106 self
107 }
108
109 #[must_use]
111 pub fn token(self, token: impl Into<String>) -> Self {
112 let Self {
113 session_id,
114 context,
115 activation_state,
116 put_callback,
117 storage,
118 delete_callback,
119 ..
120 } = self;
121 Self {
122 session_id,
123 token: token.into(),
124 context,
125 activation_state,
126 put_callback,
127 storage,
128 delete_callback,
129 }
130 }
131
132 #[must_use]
134 pub fn delete_callback<CB, F>(self, mut callback: CB) -> Self
135 where
136 CB: FnMut(Context<P>, String) -> F + Send + Sync + 'static,
137 F: Future<Output = Result<()>> + Send + Sync + 'static,
138 {
139 let Self {
140 session_id,
141 token,
142 context,
143 activation_state,
144 put_callback,
145 storage,
146 ..
147 } = self;
148
149 let callback: LivelinessCallback<P> =
150 Box::new(move |ctx, txt| Box::pin(callback(ctx, txt)));
151 let delete_callback: Option<ArcLivelinessCallback<P>> =
152 Some(Arc::new(Mutex::new(callback)));
153 Self {
154 session_id,
155 token,
156 context,
157 activation_state,
158 put_callback,
159 storage,
160 delete_callback,
161 }
162 }
163}
164
165impl<P, S> LivelinessSubscriberBuilder<P, NoCallback, S>
166where
167 P: Send + Sync + 'static,
168{
169 #[must_use]
171 pub fn put_callback<CB, F>(
172 self,
173 mut callback: CB,
174 ) -> LivelinessSubscriberBuilder<P, Callback<ArcLivelinessCallback<P>>, S>
175 where
176 CB: FnMut(Context<P>, String) -> F + Send + Sync + 'static,
177 F: Future<Output = Result<()>> + Send + Sync + 'static,
178 {
179 let Self {
180 session_id,
181 token,
182 context,
183 activation_state,
184 storage,
185 delete_callback,
186 ..
187 } = self;
188 let callback: LivelinessCallback<P> =
189 Box::new(move |ctx, txt| Box::pin(callback(ctx, txt)));
190 let put_callback: ArcLivelinessCallback<P> = Arc::new(Mutex::new(callback));
191 LivelinessSubscriberBuilder {
192 session_id,
193 token,
194 context,
195 activation_state,
196 put_callback: Callback {
197 callback: put_callback,
198 },
199 storage,
200 delete_callback,
201 }
202 }
203}
204
205impl<P, C> LivelinessSubscriberBuilder<P, C, NoStorage>
206where
207 P: Send + Sync + 'static,
208{
209 #[must_use]
211 pub fn storage(
212 self,
213 storage: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriberTrait>>>>,
214 ) -> LivelinessSubscriberBuilder<P, C, Storage<Box<dyn LivelinessSubscriberTrait>>> {
215 let Self {
216 session_id,
217 token,
218 context,
219 activation_state,
220 put_callback,
221 delete_callback,
222 ..
223 } = self;
224 LivelinessSubscriberBuilder {
225 session_id,
226 token,
227 context,
228 activation_state,
229 put_callback,
230 storage: Storage { storage },
231 delete_callback,
232 }
233 }
234}
235
236impl<P, S> LivelinessSubscriberBuilder<P, Callback<ArcLivelinessCallback<P>>, S>
237where
238 P: Send + Sync + 'static,
239{
240 pub fn build(self) -> Result<LivelinessSubscriber<P>> {
244 let Self {
245 session_id,
246 token,
247 context,
248 activation_state,
249 put_callback,
250 delete_callback,
251 ..
252 } = self;
253 let session = context
254 .session(&session_id)
255 .ok_or_else(|| Error::NoZenohSession)?;
256 Ok(LivelinessSubscriber::new(
257 session,
258 token,
259 context,
260 activation_state,
261 put_callback.callback,
262 delete_callback,
263 ))
264 }
265}
266
267impl<P>
268 LivelinessSubscriberBuilder<
269 P,
270 Callback<ArcLivelinessCallback<P>>,
271 Storage<Box<dyn LivelinessSubscriberTrait>>,
272 >
273where
274 P: Send + Sync + 'static,
275{
276 pub fn add(self) -> Result<Option<Box<dyn LivelinessSubscriberTrait>>> {
280 let c = self.storage.storage.clone();
281 let s = self.build()?;
282
283 let r = c
284 .write()
285 .map_err(|_| Error::MutexPoison(String::from("LivelinessSubscriberBuilder")))?
286 .insert(s.token().into(), Box::new(s));
287 Ok(r)
288 }
289}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty properties type used to instantiate the builder.
    #[derive(Debug)]
    struct Props {}

    /// Compiles only for types that are `Sized + Send + Sync`.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Compile-time check that the freshly created builder state is "normal".
    #[test]
    const fn normal_types() {
        is_normal::<LivelinessSubscriberBuilder<Props, NoCallback, NoStorage>>();
    }
}