dimas_com/zenoh/
publisher.rs

1// Copyright © 2023 Stephan Kunz
2
3//! Module `publisher` provides a message sender `Publisher` which can be created using the `PublisherBuilder`.
4
5#[doc(hidden)]
6extern crate alloc;
7
8#[cfg(feature = "std")]
9extern crate std;
10
11// region:		--- modules
12use crate::error::Error;
13use alloc::{string::String, sync::Arc};
14use core::fmt::Debug;
15use dimas_core::{Result, enums::OperationState, message_types::Message, traits::Capability};
16use tracing::{Level, instrument};
17use zenoh::{
18	Session, Wait,
19	qos::{CongestionControl, Priority},
20};
21#[cfg(feature = "unstable")]
22use zenoh::{qos::Reliability, sample::Locality};
23// endregion:	--- modules
24
25// region:		--- Publisher
/// Publisher
///
/// Message sender bound to a zenoh [`Session`]. The configuration is captured at
/// construction time; the zenoh publisher itself lives in `declared_publ`, which starts
/// out empty, is filled by `init` and cleared again by `de_init`.
pub struct Publisher {
	/// the zenoh session this publisher belongs to
	session: Arc<Session>,
	/// key expression handed to `declare_publisher` when the publisher is initialized
	selector: String,
	/// threshold used by `manage_operation_state`: a state at or above it initializes
	/// the publisher, a state below it de-initializes the publisher
	activation_state: OperationState,
	/// allowed destination passed to the zenoh publisher builder (`unstable` feature only)
	#[cfg(feature = "unstable")]
	allowed_destination: Locality,
	/// congestion control passed to the zenoh publisher builder
	congestion_control: CongestionControl,
	/// encoding passed (as `&str`) to the zenoh publisher builder
	encoding: String,
	/// express flag passed to the zenoh publisher builder
	express: bool,
	/// priority passed to the zenoh publisher builder
	priority: Priority,
	/// reliability passed to the zenoh publisher builder (`unstable` feature only)
	#[cfg(feature = "unstable")]
	reliability: Reliability,
	/// the declared zenoh publisher; `None` until `init` succeeded and after `de_init`
	declared_publ: std::sync::Mutex<Option<zenoh::pubsub::Publisher<'static>>>,
}
42
43impl Debug for Publisher {
44	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45		f.debug_struct("Publisher")
46			.field("selector", &self.selector)
47			.field("initialized", &self.declared_publ)
48			.finish_non_exhaustive()
49	}
50}
51
52impl crate::traits::Publisher for Publisher {
53	/// Get `selector`
54	fn selector(&self) -> &str {
55		&self.selector
56	}
57
58	/// Send a "put" message
59	/// # Errors
60	///
61	#[instrument(name="publish", level = Level::ERROR, skip_all)]
62	fn put(&self, message: Message) -> Result<()> {
63		self.declared_publ.lock().map_or_else(
64			|_| todo!(),
65			|publisher| match publisher
66				.as_ref()
67				.ok_or(Error::AccessPublisher)?
68				.put(message.value())
69				.wait()
70			{
71				Ok(()) => Ok(()),
72				Err(source) => Err(Error::PublishingPut { source }.into()),
73			},
74		)
75	}
76
77	/// Send a "delete" message
78	/// # Errors
79	///
80	#[instrument(level = Level::ERROR, skip_all)]
81	fn delete(&self) -> Result<()> {
82		self.declared_publ.lock().map_or_else(
83			|_| todo!(),
84			|publisher| match publisher
85				.as_ref()
86				.ok_or(Error::AccessPublisher)?
87				.delete()
88				.wait()
89			{
90				Ok(()) => Ok(()),
91				Err(source) => Err(Error::PublishingDelete { source }.into()),
92			},
93		)
94	}
95}
96
97impl Capability for Publisher {
98	fn manage_operation_state(&self, state: &OperationState) -> Result<()> {
99		if state >= &self.activation_state {
100			return self.init();
101		} else if state < &self.activation_state {
102			return self.de_init();
103		}
104		Ok(())
105	}
106}
107
108impl Publisher {
109	/// Constructor for a [`Publisher`]
110	#[allow(clippy::too_many_arguments)]
111	#[must_use]
112	pub const fn new(
113		session: Arc<Session>,
114		selector: String,
115		activation_state: OperationState,
116		#[cfg(feature = "unstable")] allowed_destination: Locality,
117		congestion_control: CongestionControl,
118		encoding: String,
119		express: bool,
120		priority: Priority,
121		#[cfg(feature = "unstable")] reliability: Reliability,
122	) -> Self {
123		Self {
124			session,
125			selector,
126			activation_state,
127			#[cfg(feature = "unstable")]
128			allowed_destination,
129			congestion_control,
130			encoding,
131			express,
132			priority,
133			#[cfg(feature = "unstable")]
134			reliability,
135			declared_publ: std::sync::Mutex::new(None),
136		}
137	}
138
139	/// Initialize
140	/// # Errors
141	///
142	fn init(&self) -> Result<()> {
143		self.de_init()?;
144
145		let builder = self
146			.session
147			.declare_publisher(self.selector.clone())
148			.congestion_control(self.congestion_control)
149			.encoding(self.encoding.as_str())
150			.express(self.express)
151			.priority(self.priority);
152
153		#[cfg(feature = "unstable")]
154		let builder = builder
155			.allowed_destination(self.allowed_destination)
156			.reliability(self.reliability);
157
158		let new_publisher = builder.wait()?;
159		//.map_err(|_| DimasError::Put.into())?;
160		self.declared_publ.lock().map_or_else(
161			|_| todo!(),
162			|mut publisher| {
163				publisher.replace(new_publisher);
164				Ok(())
165			},
166		)
167	}
168
169	/// De-Initialize
170	/// # Errors
171	///
172	#[allow(clippy::unnecessary_wraps)]
173	fn de_init(&self) -> Result<()> {
174		self.declared_publ.lock().map_or_else(
175			|_| todo!(),
176			|mut publisher| {
177				publisher.take();
178				Ok(())
179			},
180		)
181	}
182}
183// endregion:	--- Publisher
184
#[cfg(test)]
mod tests {
	use super::*;

	/// Compiles only for types that are `Sized`, `Send` and `Sync`.
	const fn assert_auto_traits<T>()
	where
		T: Sized + Send + Sync,
	{
	}

	/// [`Publisher`] has to provide the auto traits.
	#[test]
	const fn normal_types() {
		assert_auto_traits::<Publisher>();
	}
}
196}