dimas_com/builder/
liveliness_subscriber_builder.rs

1// Copyright © 2023 Stephan Kunz
2
3//! Module `liveliness` provides a `LivelinessSubscriber` which can be created using the `LivelinessSubscriberBuilder`.
4//! A `LivelinessSubscriber` can optional subscribe on a delete message.
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13use crate::error::Error;
14use crate::{
15	traits::LivelinessSubscriber as LivelinessSubscriberTrait,
16	zenoh::liveliness::{ArcLivelinessCallback, LivelinessCallback, LivelinessSubscriber},
17};
18use alloc::{boxed::Box, format, string::String, sync::Arc};
19use dimas_core::builder_states::{Callback, NoCallback, NoStorage, Storage};
20use dimas_core::{Result, enums::OperationState, traits::Context, utils::selector_from};
21use futures::future::Future;
22#[cfg(feature = "std")]
23use std::{collections::HashMap, sync::RwLock};
24#[cfg(feature = "std")]
25use tokio::sync::Mutex;
26// endregion:	--- modules
27
28// region:		--- LivelinessSubscriberBuilder
29/// The builder for the liveliness subscriber
30#[allow(clippy::module_name_repetitions)]
31pub struct LivelinessSubscriberBuilder<P, C, S>
32where
33	P: Send + Sync + 'static,
34{
35	session_id: String,
36	token: String,
37	context: Context<P>,
38	activation_state: OperationState,
39	put_callback: C,
40	storage: S,
41	delete_callback: Option<ArcLivelinessCallback<P>>,
42}
43
44impl<P> LivelinessSubscriberBuilder<P, NoCallback, NoStorage>
45where
46	P: Send + Sync + 'static,
47{
48	/// Construct a `LivelinessSubscriberBuilder` in initial state
49	#[must_use]
50	pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
51		//let token = context
52		//	.prefix()
53		//	.map_or("*".to_string(), |prefix| format!("{prefix}/*"));
54		let token = selector_from("*", context.prefix());
55		Self {
56			session_id: session_id.into(),
57			token,
58			context,
59			activation_state: OperationState::Created,
60			put_callback: NoCallback,
61			storage: NoStorage,
62			delete_callback: None,
63		}
64	}
65}
66
67impl<P, C, S> LivelinessSubscriberBuilder<P, C, S>
68where
69	P: Send + Sync + 'static,
70{
71	/// Set the activation state.
72	#[must_use]
73	pub const fn activation_state(mut self, state: OperationState) -> Self {
74		self.activation_state = state;
75		self
76	}
77
78	/// Set a different prefix for the liveliness subscriber.
79	#[must_use]
80	pub fn prefix(self, prefix: &str) -> Self {
81		let token = format!("{prefix}/*");
82		let Self {
83			session_id,
84			context,
85			activation_state,
86			put_callback,
87			storage,
88			delete_callback,
89			..
90		} = self;
91		Self {
92			session_id,
93			token,
94			context,
95			activation_state,
96			put_callback,
97			storage,
98			delete_callback,
99		}
100	}
101
102	/// Set the session id.
103	#[must_use]
104	pub fn session_id(mut self, session_id: &str) -> Self {
105		self.session_id = session_id.into();
106		self
107	}
108
109	/// Set an explicite token for the liveliness subscriber.
110	#[must_use]
111	pub fn token(self, token: impl Into<String>) -> Self {
112		let Self {
113			session_id,
114			context,
115			activation_state,
116			put_callback,
117			storage,
118			delete_callback,
119			..
120		} = self;
121		Self {
122			session_id,
123			token: token.into(),
124			context,
125			activation_state,
126			put_callback,
127			storage,
128			delete_callback,
129		}
130	}
131
132	/// Set liveliness subscribers callback for `delete` messages
133	#[must_use]
134	pub fn delete_callback<CB, F>(self, mut callback: CB) -> Self
135	where
136		CB: FnMut(Context<P>, String) -> F + Send + Sync + 'static,
137		F: Future<Output = Result<()>> + Send + Sync + 'static,
138	{
139		let Self {
140			session_id,
141			token,
142			context,
143			activation_state,
144			put_callback,
145			storage,
146			..
147		} = self;
148
149		let callback: LivelinessCallback<P> =
150			Box::new(move |ctx, txt| Box::pin(callback(ctx, txt)));
151		let delete_callback: Option<ArcLivelinessCallback<P>> =
152			Some(Arc::new(Mutex::new(callback)));
153		Self {
154			session_id,
155			token,
156			context,
157			activation_state,
158			put_callback,
159			storage,
160			delete_callback,
161		}
162	}
163}
164
165impl<P, S> LivelinessSubscriberBuilder<P, NoCallback, S>
166where
167	P: Send + Sync + 'static,
168{
169	/// Set liveliness subscribers callback for `put` messages
170	#[must_use]
171	pub fn put_callback<CB, F>(
172		self,
173		mut callback: CB,
174	) -> LivelinessSubscriberBuilder<P, Callback<ArcLivelinessCallback<P>>, S>
175	where
176		CB: FnMut(Context<P>, String) -> F + Send + Sync + 'static,
177		F: Future<Output = Result<()>> + Send + Sync + 'static,
178	{
179		let Self {
180			session_id,
181			token,
182			context,
183			activation_state,
184			storage,
185			delete_callback,
186			..
187		} = self;
188		let callback: LivelinessCallback<P> =
189			Box::new(move |ctx, txt| Box::pin(callback(ctx, txt)));
190		let put_callback: ArcLivelinessCallback<P> = Arc::new(Mutex::new(callback));
191		LivelinessSubscriberBuilder {
192			session_id,
193			token,
194			context,
195			activation_state,
196			put_callback: Callback {
197				callback: put_callback,
198			},
199			storage,
200			delete_callback,
201		}
202	}
203}
204
205impl<P, C> LivelinessSubscriberBuilder<P, C, NoStorage>
206where
207	P: Send + Sync + 'static,
208{
209	/// Provide agents storage for the liveliness subscriber
210	#[must_use]
211	pub fn storage(
212		self,
213		storage: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriberTrait>>>>,
214	) -> LivelinessSubscriberBuilder<P, C, Storage<Box<dyn LivelinessSubscriberTrait>>> {
215		let Self {
216			session_id,
217			token,
218			context,
219			activation_state,
220			put_callback,
221			delete_callback,
222			..
223		} = self;
224		LivelinessSubscriberBuilder {
225			session_id,
226			token,
227			context,
228			activation_state,
229			put_callback,
230			storage: Storage { storage },
231			delete_callback,
232		}
233	}
234}
235
236impl<P, S> LivelinessSubscriberBuilder<P, Callback<ArcLivelinessCallback<P>>, S>
237where
238	P: Send + Sync + 'static,
239{
240	/// Build the [`LivelinessSubscriber`]
241	/// # Errors
242	///
243	pub fn build(self) -> Result<LivelinessSubscriber<P>> {
244		let Self {
245			session_id,
246			token,
247			context,
248			activation_state,
249			put_callback,
250			delete_callback,
251			..
252		} = self;
253		let session = context
254			.session(&session_id)
255			.ok_or_else(|| Error::NoZenohSession)?;
256		Ok(LivelinessSubscriber::new(
257			session,
258			token,
259			context,
260			activation_state,
261			put_callback.callback,
262			delete_callback,
263		))
264	}
265}
266
267impl<P>
268	LivelinessSubscriberBuilder<
269		P,
270		Callback<ArcLivelinessCallback<P>>,
271		Storage<Box<dyn LivelinessSubscriberTrait>>,
272	>
273where
274	P: Send + Sync + 'static,
275{
276	/// Build and add the liveliness subscriber to the agent
277	/// # Errors
278	///
279	pub fn add(self) -> Result<Option<Box<dyn LivelinessSubscriberTrait>>> {
280		let c = self.storage.storage.clone();
281		let s = self.build()?;
282
283		let r = c
284			.write()
285			.map_err(|_| Error::MutexPoison(String::from("LivelinessSubscriberBuilder")))?
286			.insert(s.token().into(), Box::new(s));
287		Ok(r)
288	}
289}
290// endregion:	--- LivelinessSubscriberBuilder
291
292#[cfg(test)]
293mod tests {
294	use super::*;
295
296	#[derive(Debug)]
297	struct Props {}
298
299	// check, that the auto traits are available
300	const fn is_normal<T: Sized + Send + Sync>() {}
301
302	#[test]
303	const fn normal_types() {
304		is_normal::<LivelinessSubscriberBuilder<Props, NoCallback, NoStorage>>();
305	}
306}