dimas_com/builder/
publisher_builder.rs

1// Copyright © 2023 Stephan Kunz
2
3//! Module `publisher` provides a message sender `Publisher` which can be created using the `PublisherBuilder`.
4
5#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11// region:		--- modules
12use crate::error::Error;
13use crate::traits::Publisher as PublisherTrait;
14use crate::zenoh::publisher::Publisher;
15use alloc::{
16	boxed::Box,
17	string::{String, ToString},
18	sync::Arc,
19};
20use dimas_core::builder_states::{NoSelector, NoStorage, Selector, Storage};
21use dimas_core::{Result, enums::OperationState, traits::Context, utils::selector_from};
22#[cfg(feature = "std")]
23use std::{collections::HashMap, sync::RwLock};
24use zenoh::bytes::Encoding;
25use zenoh::qos::CongestionControl;
26use zenoh::qos::Priority;
27#[cfg(feature = "unstable")]
28use zenoh::{qos::Reliability, sample::Locality};
29// endregion:	--- modules
30
31// region:		--- PublisherBuilder
32/// The builder for a [`Publisher`]
33pub struct PublisherBuilder<P, K, S>
34where
35	P: Send + Sync + 'static,
36{
37	session_id: String,
38	context: Context<P>,
39	activation_state: OperationState,
40	#[cfg(feature = "unstable")]
41	allowed_destination: Locality,
42	congestion_control: CongestionControl,
43	encoding: String,
44	express: bool,
45	priority: Priority,
46	#[cfg(feature = "unstable")]
47	reliability: Reliability,
48	selector: K,
49	storage: S,
50}
51
52impl<P> PublisherBuilder<P, NoSelector, NoStorage>
53where
54	P: Send + Sync + 'static,
55{
56	/// Construct a [`PublisherBuilder`] in initial state
57	#[must_use]
58	pub fn new(session_id: impl Into<String>, context: Context<P>) -> Self {
59		Self {
60			session_id: session_id.into(),
61			context,
62			activation_state: OperationState::Active,
63			#[cfg(feature = "unstable")]
64			allowed_destination: Locality::Any,
65			congestion_control: CongestionControl::Drop,
66			encoding: Encoding::default().to_string(),
67			express: false,
68			priority: Priority::Data,
69			#[cfg(feature = "unstable")]
70			reliability: Reliability::BestEffort,
71			selector: NoSelector,
72			storage: NoStorage,
73		}
74	}
75}
76
77impl<P, K, S> PublisherBuilder<P, K, S>
78where
79	P: Send + Sync + 'static,
80{
81	/// Set the activation state.
82	#[must_use]
83	pub const fn activation_state(mut self, state: OperationState) -> Self {
84		self.activation_state = state;
85		self
86	}
87
88	/// Set the publishers alllowed destinations
89	#[cfg(feature = "unstable")]
90	#[must_use]
91	pub const fn set_allowed_destination(mut self, allowed_destination: Locality) -> Self {
92		self.allowed_destination = allowed_destination;
93		self
94	}
95
96	/// Set the session id.
97	#[must_use]
98	pub fn session_id(mut self, session_id: &str) -> Self {
99		self.session_id = session_id.into();
100		self
101	}
102
103	/// Set the publishers congestion control
104	#[must_use]
105	pub const fn set_congestion_control(mut self, congestion_control: CongestionControl) -> Self {
106		self.congestion_control = congestion_control;
107		self
108	}
109
110	/// Set the publishers encoding
111	#[must_use]
112	pub fn encoding(mut self, encoding: String) -> Self {
113		self.encoding = encoding;
114		self
115	}
116
117	/// Set the publishers enexpress policy
118	#[must_use]
119	pub const fn set_express(mut self, express: bool) -> Self {
120		self.express = express;
121		self
122	}
123
124	/// Set the publishers priority
125	#[must_use]
126	pub const fn set_priority(mut self, priority: Priority) -> Self {
127		self.priority = priority;
128		self
129	}
130
131	/// Set the publishers reliability
132	#[cfg(feature = "unstable")]
133	#[must_use]
134	pub const fn set_reliability(mut self, reliability: Reliability) -> Self {
135		self.reliability = reliability;
136		self
137	}
138}
139
// `RwLock` and `HashMap` are only imported when feature `std` is enabled,
// so the impl naming them in its signature is gated on the same feature.
#[cfg(feature = "std")]
impl<P, K> PublisherBuilder<P, K, NoStorage>
where
	P: Send + Sync + 'static,
{
	/// Provide the storage into which `add` inserts the built publisher.
	///
	/// Moves the builder from state [`NoStorage`] into state [`Storage`];
	/// all other settings are carried over unchanged.
	#[must_use]
	pub fn storage(
		self,
		storage: Arc<RwLock<HashMap<String, Box<dyn PublisherTrait>>>>,
	) -> PublisherBuilder<P, K, Storage<Box<dyn PublisherTrait>>> {
		// The storage type parameter changes, so the builder has to be
		// reassembled field by field; the old `NoStorage` marker is dropped.
		PublisherBuilder {
			session_id: self.session_id,
			context: self.context,
			activation_state: self.activation_state,
			#[cfg(feature = "unstable")]
			allowed_destination: self.allowed_destination,
			congestion_control: self.congestion_control,
			encoding: self.encoding,
			express: self.express,
			priority: self.priority,
			#[cfg(feature = "unstable")]
			reliability: self.reliability,
			selector: self.selector,
			storage: Storage { storage },
		}
	}
}
182
183impl<P, S> PublisherBuilder<P, NoSelector, S>
184where
185	P: Send + Sync + 'static,
186{
187	/// Set the full key expression for the [`Publisher`]
188	#[must_use]
189	pub fn selector(self, selector: &str) -> PublisherBuilder<P, Selector, S> {
190		let Self {
191			session_id,
192			context,
193			activation_state,
194			#[cfg(feature = "unstable")]
195			allowed_destination,
196			congestion_control,
197			encoding,
198			express,
199			priority,
200			#[cfg(feature = "unstable")]
201			reliability,
202			storage,
203			..
204		} = self;
205		PublisherBuilder {
206			session_id,
207			context,
208			activation_state,
209			#[cfg(feature = "unstable")]
210			allowed_destination,
211			congestion_control,
212			encoding,
213			express,
214			priority,
215			#[cfg(feature = "unstable")]
216			reliability,
217			selector: Selector {
218				selector: selector.into(),
219			},
220			storage,
221		}
222	}
223
224	/// Set only the message qualifing part of the [`Publisher`].
225	/// Will be prefixed with `Agent`s prefix.
226	#[must_use]
227	pub fn topic(self, topic: &str) -> PublisherBuilder<P, Selector, S> {
228		let selector = selector_from(topic, self.context.prefix());
229		self.selector(&selector)
230	}
231}
232
233impl<P, S> PublisherBuilder<P, Selector, S>
234where
235	P: Send + Sync + 'static,
236{
237	/// Build the [`Publisher`]
238	///
239	/// # Errors
240	/// Currently none
241	pub fn build(self) -> Result<Publisher> {
242		let session = self
243			.context
244			.session(&self.session_id)
245			.ok_or_else(|| Error::NoZenohSession)?;
246		Ok(Publisher::new(
247			session,
248			self.selector.selector,
249			self.activation_state,
250			#[cfg(feature = "unstable")]
251			self.allowed_destination,
252			self.congestion_control,
253			self.encoding,
254			self.express,
255			self.priority,
256			#[cfg(feature = "unstable")]
257			self.reliability,
258		))
259	}
260}
261
262impl<P> PublisherBuilder<P, Selector, Storage<Box<dyn PublisherTrait>>>
263where
264	P: Send + Sync + 'static,
265{
266	/// Build and add the [Publisher] to the `Agent`s context
267	///
268	/// # Errors
269	/// Currently none
270	pub fn add(self) -> Result<Option<Box<dyn PublisherTrait>>> {
271		let collection = self.storage.storage.clone();
272		let p = self.build()?;
273		let r = collection
274			.write()
275			.map_err(|_| Error::MutexPoison(String::from("PublisherBuilder")))?
276			.insert(p.selector().to_string(), Box::new(p));
277		Ok(r)
278	}
279}
280// endregion:	--- PublisherBuilder
281
#[cfg(test)]
mod tests {
	use super::*;

	/// Minimal properties type to instantiate the generic builder with.
	#[derive(Debug)]
	struct Props {}

	/// Compiles only for types providing the auto traits `Send` and `Sync`.
	const fn is_normal<T>()
	where
		T: Sized + Send + Sync,
	{
	}

	/// The builder in its initial state must be `Send` and `Sync`.
	#[test]
	const fn normal_types() {
		is_normal::<PublisherBuilder<Props, NoSelector, NoStorage>>();
	}
}