dimas_com/builder/
subscriber_builder.rs

1// Copyright © 2023 Stephan Kunz
2
3//! Module `subscriber` provides a message `Subscriber` which can be created using the `SubscriberBuilder`.
4//! A `Subscriber` can optional subscribe on a delete message.
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13use crate::error::Error;
14use crate::traits::Responder as SubscriberTrait;
15use crate::zenoh::subscriber::{
16	ArcDeleteCallback, ArcPutCallback, DeleteCallback, PutCallback, Subscriber,
17};
18use alloc::{
19	boxed::Box,
20	string::{String, ToString},
21	sync::Arc,
22};
23use dimas_core::builder_states::{Callback, NoCallback, NoSelector, NoStorage, Selector, Storage};
24use dimas_core::{
25	Result, enums::OperationState, message_types::Message, traits::Context, utils::selector_from,
26};
27use futures::future::Future;
28#[cfg(feature = "std")]
29use std::{collections::HashMap, sync::RwLock};
30#[cfg(feature = "std")]
31use tokio::sync::Mutex;
32#[cfg(feature = "unstable")]
33use zenoh::sample::Locality;
34// endregion:	--- modules
35
36// region:		--- SubscriberBuilder
37/// A builder for a subscriber
38#[derive(Clone)]
39pub struct SubscriberBuilder<P, K, C, S>
40where
41	P: Send + Sync + 'static,
42{
43	session_id: String,
44	context: Context<P>,
45	activation_state: OperationState,
46	#[cfg(feature = "unstable")]
47	allowed_origin: Locality,
48	selector: K,
49	put_callback: C,
50	storage: S,
51	delete_callback: Option<ArcDeleteCallback<P>>,
52}
53
54impl<P> SubscriberBuilder<P, NoSelector, NoCallback, NoStorage>
55where
56	P: Send + Sync + 'static,
57{
58	/// Construct a `SubscriberBuilder` in initial state
59	#[must_use]
60	pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
61		Self {
62			session_id: session_id.into(),
63			context,
64			activation_state: OperationState::Active,
65			#[cfg(feature = "unstable")]
66			allowed_origin: Locality::Any,
67			selector: NoSelector,
68			put_callback: NoCallback,
69			storage: NoStorage,
70			delete_callback: None,
71		}
72	}
73}
74
75impl<P, K, C, S> SubscriberBuilder<P, K, C, S>
76where
77	P: Send + Sync + 'static,
78{
79	/// Set the activation state.
80	#[must_use]
81	pub const fn activation_state(mut self, state: OperationState) -> Self {
82		self.activation_state = state;
83		self
84	}
85
86	/// Set the allowed origin.
87	#[cfg(feature = "unstable")]
88	#[must_use]
89	pub const fn allowed_origin(mut self, allowed_origin: Locality) -> Self {
90		self.allowed_origin = allowed_origin;
91		self
92	}
93
94	/// Set subscribers callback for `delete` messages
95	#[must_use]
96	pub fn delete_callback<CB, F>(mut self, mut callback: CB) -> Self
97	where
98		CB: FnMut(Context<P>) -> F + Send + Sync + 'static,
99		F: Future<Output = Result<()>> + Send + Sync + 'static,
100	{
101		let callback: DeleteCallback<P> = Box::new(move |ctx| Box::pin(callback(ctx)));
102		self.delete_callback
103			.replace(Arc::new(Mutex::new(callback)));
104		self
105	}
106
107	/// Set the session id.
108	#[must_use]
109	pub fn session_id(mut self, session_id: &str) -> Self {
110		self.session_id = session_id.into();
111		self
112	}
113}
114
115impl<P, C, S> SubscriberBuilder<P, NoSelector, C, S>
116where
117	P: Send + Sync + 'static,
118{
119	/// Set the full key expression for the [`Subscriber`].
120	#[must_use]
121	pub fn selector(self, selector: &str) -> SubscriberBuilder<P, Selector, C, S> {
122		let Self {
123			session_id,
124			context,
125			activation_state,
126			#[cfg(feature = "unstable")]
127			allowed_origin,
128			storage,
129			put_callback,
130			delete_callback,
131			..
132		} = self;
133		SubscriberBuilder {
134			session_id,
135			context,
136			activation_state,
137			#[cfg(feature = "unstable")]
138			allowed_origin,
139			selector: Selector {
140				selector: selector.into(),
141			},
142			put_callback,
143			storage,
144			delete_callback,
145		}
146	}
147
148	/// Set only the message qualifing part of the [`Subscriber`].
149	/// Will be prefixed with `Agent`s prefix.
150	#[must_use]
151	pub fn topic(self, topic: &str) -> SubscriberBuilder<P, Selector, C, S> {
152		let selector = selector_from(topic, self.context.prefix());
153		self.selector(&selector)
154	}
155}
156
157impl<P, K, S> SubscriberBuilder<P, K, NoCallback, S>
158where
159	P: Send + Sync + 'static,
160{
161	/// Set callback for put messages
162	#[must_use]
163	pub fn put_callback<CB, F>(
164		self,
165		mut callback: CB,
166	) -> SubscriberBuilder<P, K, Callback<ArcPutCallback<P>>, S>
167	where
168		CB: FnMut(Context<P>, Message) -> F + Send + Sync + 'static,
169		F: Future<Output = Result<()>> + Send + Sync + 'static,
170	{
171		let Self {
172			session_id,
173			context,
174			activation_state,
175			#[cfg(feature = "unstable")]
176			allowed_origin,
177			selector,
178			storage,
179			delete_callback,
180			..
181		} = self;
182		let callback: PutCallback<P> = Box::new(move |ctx, msg| Box::pin(callback(ctx, msg)));
183		let callback: ArcPutCallback<P> = Arc::new(Mutex::new(callback));
184		SubscriberBuilder {
185			session_id,
186			context,
187			activation_state,
188			#[cfg(feature = "unstable")]
189			allowed_origin,
190			selector,
191			put_callback: Callback { callback },
192			storage,
193			delete_callback,
194		}
195	}
196}
197
198impl<P, K, C> SubscriberBuilder<P, K, C, NoStorage>
199where
200	P: Send + Sync + 'static,
201{
202	/// Provide agents storage for the subscriber
203	#[must_use]
204	pub fn storage(
205		self,
206		storage: Arc<RwLock<HashMap<String, Box<dyn SubscriberTrait>>>>,
207	) -> SubscriberBuilder<P, K, C, Storage<Box<dyn SubscriberTrait>>> {
208		let Self {
209			session_id,
210			context,
211			activation_state,
212			#[cfg(feature = "unstable")]
213			allowed_origin,
214			selector,
215			put_callback,
216			delete_callback,
217			..
218		} = self;
219		SubscriberBuilder {
220			session_id,
221			context,
222			activation_state,
223			#[cfg(feature = "unstable")]
224			allowed_origin,
225			selector,
226			put_callback,
227			storage: Storage { storage },
228			delete_callback,
229		}
230	}
231}
232
233impl<P, S> SubscriberBuilder<P, Selector, Callback<ArcPutCallback<P>>, S>
234where
235	P: Send + Sync + 'static,
236{
237	/// Build the [`Subscriber`].
238	///
239	/// # Errors
240	/// Currently none
241	pub fn build(self) -> Result<Subscriber<P>> {
242		let Self {
243			session_id,
244			selector,
245			context,
246			activation_state,
247			#[cfg(feature = "unstable")]
248			allowed_origin,
249			put_callback,
250			delete_callback,
251			..
252		} = self;
253		let session = context
254			.session(&session_id)
255			.ok_or_else(|| Error::NoZenohSession)?;
256		Ok(Subscriber::new(
257			session,
258			selector.selector,
259			context,
260			activation_state,
261			#[cfg(feature = "unstable")]
262			allowed_origin,
263			put_callback.callback,
264			delete_callback,
265		))
266	}
267}
268
269impl<P>
270	SubscriberBuilder<P, Selector, Callback<ArcPutCallback<P>>, Storage<Box<dyn SubscriberTrait>>>
271where
272	P: Send + Sync + 'static,
273{
274	/// Build and add the [`Subscriber`] to the `Agent`.
275	///
276	/// # Errors
277	/// Currently none
278	pub fn add(self) -> Result<Option<Box<dyn SubscriberTrait>>> {
279		let c = self.storage.storage.clone();
280		let s = self.build()?;
281
282		let r = c
283			.write()
284			.map_err(|_| Error::MutexPoison(String::from("SubscriberBuilder")))?
285			.insert(s.selector().to_string(), Box::new(s));
286		Ok(r)
287	}
288}
289// endregion:	--- SubscriberBuilder
290
291#[cfg(test)]
292mod tests {
293	use super::*;
294
295	#[derive(Debug)]
296	struct Props {}
297
298	// check, that the auto traits are available
299	const fn is_normal<T: Sized + Send + Sync>() {}
300
301	#[test]
302	const fn normal_types() {
303		is_normal::<SubscriberBuilder<Props, NoSelector, NoCallback, NoStorage>>();
304	}
305}