dimas_com/communicator/
multi_communicator.rs

1// Copyright © 2024 Stephan Kunz
2
3//! Implementation of a multi session/protocol communicator
4//!
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13use crate::error::Error;
14#[cfg(feature = "unstable")]
15use crate::traits::LivelinessSubscriber;
16use crate::{
17	enums::CommunicatorImplementation,
18	traits::{
19		Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
20		Querier, Responder,
21	},
22};
23use alloc::{
24	boxed::Box,
25	string::{String, ToString},
26	sync::Arc,
27	vec::Vec,
28};
29use dimas_config::Config;
30use dimas_core::message_types::{Message, QueryableMsg};
31use dimas_core::{enums::OperationState, traits::Capability, Result};
32use std::{collections::HashMap, sync::RwLock};
33use zenoh::{config::ZenohId, Session};
34// endregion:   --- modules
35
36// region:		--- types
37/// the initial size of the `HashMaps`
38const INITIAL_SIZE: usize = 9;
39/// id for default communication session
40const DEFAULT: &str = "default";
41// endregion:	--- types
42
43// region:      --- MultiCommunicator
44/// a multi session communicator
45#[derive(Debug)]
46pub struct MultiCommunicator {
47	/// a uuid generated by default zenoh session
48	uuid: ZenohId,
49	/// the mode of default zenoh session
50	mode: String,
51	/// The [`Communicator`]s current operational state.
52	state: OperationState,
53	/// Registered Communicators
54	communicators: Arc<RwLock<HashMap<String, Arc<CommunicatorImplementation>>>>,
55	/// Registered [`LivelinessSubscriber`]
56	#[cfg(feature = "unstable")]
57	liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
58	/// Registered [`Observer`]
59	observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
60	/// Registered [`Publisher`]
61	publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
62	/// Registered [`Query`]s
63	queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
64	/// Registered [`Observable`]s, [`Queryable`]s and [`Subscriber`]s
65	responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
66}
67
68impl Capability for MultiCommunicator {
69	fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
70		if new_state >= &self.state {
71			self.upgrade_capabilities(new_state)?;
72		} else if new_state < &self.state {
73			self.downgrade_capabilities(new_state)?;
74		}
75		Ok(())
76	}
77}
78
79impl CommunicatorMethods for MultiCommunicator {
80	/// Send a put message [`Message`] to the given `selector`.
81	/// # Errors
82	/// - `NotImplemented`: there is no implementation within this communicator
83	fn put(&self, selector: &str, message: Message) -> Result<()> {
84		let publishers = self
85			.publishers
86			.read()
87			.map_err(|_| Error::ReadAccess("publishers".into()))?;
88
89		#[allow(clippy::single_match_else)]
90		match publishers.get(selector) {
91			Some(publisher) => publisher.put(message),
92			None => {
93				let comm = self
94					.communicators
95					.read()
96					.map_err(|_| Error::ReadAccess("publishers".into()))?
97					.get(DEFAULT)
98					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
99					.cloned()?;
100
101				comm.put(selector, message)
102			}
103		}
104	}
105
106	/// Send a delete message to the given `selector`.
107	/// # Errors
108	/// - `NotImplemented`: there is no implementation within this communicator
109	fn delete(&self, selector: &str) -> Result<()> {
110		let publishers = self
111			.publishers
112			.read()
113			.map_err(|_| Error::ReadAccess("publishers".into()))?;
114
115		#[allow(clippy::single_match_else)]
116		match publishers.get(selector) {
117			Some(publisher) => publisher.delete(),
118			None => {
119				let comm = self
120					.communicators
121					.read()
122					.map_err(|_| Error::ReadAccess("publishers".into()))?
123					.get(DEFAULT)
124					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
125					.cloned()?;
126
127				#[allow(clippy::match_wildcard_for_single_variants)]
128				match comm.as_ref() {
129					CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
130				}
131			}
132		}
133	}
134
135	/// Send a query with an optional specification [`Message`] to the given `selector`.
136	/// # Errors
137	/// - `NotImplemented`: there is no implementation within this communicator
138	fn get(
139		&self,
140		selector: &str,
141		message: Option<Message>,
142		callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
143	) -> Result<()> {
144		let queriers = self
145			.queriers
146			.read()
147			.map_err(|_| Error::ReadAccess("queriers".into()))?;
148
149		#[allow(clippy::single_match_else)]
150		match queriers.get(selector) {
151			Some(querier) => querier.get(message, callback),
152			None => {
153				let comm = self
154					.communicators
155					.read()
156					.map_err(|_| Error::ReadAccess("queriers".into()))?
157					.get(DEFAULT)
158					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
159					.cloned()?;
160
161				match comm.as_ref() {
162					CommunicatorImplementation::Zenoh(zenoh) => {
163						zenoh.get(selector, message, callback)
164					}
165				}
166			}
167		}
168	}
169
170	/// Request an observation for [`Message`] from the given `selector`
171	/// # Errors
172	/// - `NotImplemented`: there is no implementation within this communicator
173	fn observe(&self, selector: &str, message: Option<Message>) -> Result<()> {
174		let observers = self
175			.observers
176			.read()
177			.map_err(|_| Error::ReadAccess("observers".into()))?;
178
179		#[allow(clippy::single_match_else)]
180		match observers.get(selector) {
181			Some(observer) => observer.request(message),
182			None => {
183				let comm = self
184					.communicators
185					.read()
186					.map_err(|_| Error::ReadAccess("observers".into()))?
187					.get(DEFAULT)
188					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
189					.cloned()?;
190
191				#[allow(clippy::match_wildcard_for_single_variants)]
192				match comm.as_ref() {
193					CommunicatorImplementation::Zenoh(_zenoh) => Err(Error::NotImplemented.into()),
194				}
195			}
196		}
197	}
198
199	/// Request a stream configured by [`Message`] from the given `selector`
200	/// # Errors
201	/// - `NotImplemented`: there is no implementation within this communicator
202	fn watch(&self, _selector: &str, _message: Message) -> Result<()> {
203		Err(Error::NotImplemented.into())
204	}
205}
206
207impl Communicator for MultiCommunicator {
208	/// the uuid of the default zenoh session
209	fn uuid(&self) -> String {
210		self.uuid.to_string()
211	}
212
213	/// the mode of the default zenoh session
214	fn mode(&self) -> &String {
215		&self.mode
216	}
217
218	/// Get the liveliness subscribers
219	#[cfg(feature = "unstable")]
220	fn liveliness_subscribers(
221		&self,
222	) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
223		self.liveliness_subscribers.clone()
224	}
225
226	/// Get the observers
227	fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
228		self.observers.clone()
229	}
230
231	/// Get the publishers
232	fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
233		self.publishers.clone()
234	}
235
236	/// Get the queries
237	fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
238		self.queriers.clone()
239	}
240
241	/// Get the responders
242	fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
243		self.responders.clone()
244	}
245
246	fn default_session(&self) -> Arc<Session> {
247		let com = self
248			.communicators
249			.read()
250			.expect("snh")
251			.get(DEFAULT)
252			.cloned()
253			.expect("snh");
254		match com.as_ref() {
255			CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
256		}
257	}
258
259	fn session(&self, id: &str) -> Option<Arc<zenoh::Session>> {
260		let com = self
261			.communicators
262			.read()
263			.expect("snh")
264			.get(id)
265			.cloned()
266			.expect("snh");
267		match com.as_ref() {
268			CommunicatorImplementation::Zenoh(communicator) => {
269				let com = communicator.session();
270				Some(com)
271			}
272		}
273	}
274
275	fn sessions(&self) -> Vec<Arc<Session>> {
276		let com: Vec<Arc<Session>> = self
277			.communicators
278			.read()
279			.expect("snh")
280			.iter()
281			.map(|(_id, com)| match com.as_ref() {
282				CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
283			})
284			.collect();
285		com
286	}
287}
288
289impl MultiCommunicator {
290	/// Constructor
291	/// # Errors
292	pub fn new(config: &Config) -> Result<Self> {
293		let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
294		let uuid = zenoh.session().zid();
295		let mode = zenoh.mode().to_string();
296		let com = Self {
297			uuid,
298			mode,
299			state: OperationState::Created,
300			communicators: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
301			#[cfg(feature = "unstable")]
302			liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
303			observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
304			publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
305			queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
306			responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
307		};
308		// add the default communicator
309		com.communicators
310			.write()
311			.map_err(|_| Error::ModifyStruct("commmunicators".into()))?
312			.insert(
313				"default".to_string(),
314				Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
315			);
316		// create the additional sessions
317		if let Some(sessions) = config.sessions() {
318			for session in sessions {
319				match session.protocol.as_str() {
320					"zenoh" => {
321						let zenoh = crate::zenoh::Communicator::new(&session.config)?;
322						com.communicators
323							.write()
324							.map_err(|_| Error::ModifyStruct("commmunicators".into()))?
325							.insert(
326								session.name.clone(),
327								Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
328							);
329					}
330					_ => {
331						return Err(Error::UnknownProtocol {
332							protocol: session.protocol.clone(),
333						}
334						.into());
335					}
336				}
337			}
338		}
339
340		Ok(com)
341	}
342}
343// endregion:   --- MultiCommunicator
344
345#[cfg(test)]
346mod tests {
347	use super::*;
348
349	// check, that the auto traits are available
350	const fn is_normal<T: Sized + Send + Sync>() {}
351
352	#[test]
353	const fn normal_types() {
354		is_normal::<MultiCommunicator>();
355	}
356}