dimas_com/communicator/
multi_communicator.rs

1// Copyright © 2024 Stephan Kunz
2
3//! Implementation of a multi session/protocol communicator
4//!
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13use crate::error::Error;
14use crate::traits::LivelinessSubscriber;
15use crate::{
16	enums::CommunicatorImplementation,
17	traits::{
18		Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
19		Querier, Responder,
20	},
21};
22use alloc::{
23	boxed::Box,
24	string::{String, ToString},
25	sync::Arc,
26	vec::Vec,
27};
28use dimas_config::Config;
29use dimas_core::message_types::{Message, QueryableMsg};
30use dimas_core::{Result, enums::OperationState, traits::Capability};
31use std::{collections::HashMap, sync::RwLock};
32use zenoh::{Session, config::ZenohId};
33// endregion:   --- modules
34
35// region:		--- types
36/// the initial size of the `HashMaps`
37const INITIAL_SIZE: usize = 9;
38/// id for default communication session
39const DEFAULT: &str = "default";
40// endregion:	--- types
41
42// region:      --- MultiCommunicator
43/// a multi session communicator
44#[derive(Debug)]
45pub struct MultiCommunicator {
46	/// a uuid generated by default zenoh session
47	uuid: ZenohId,
48	/// the mode of default zenoh session
49	mode: String,
50	/// The [`Communicator`]s current operational state.
51	state: OperationState,
52	/// Registered Communicators
53	communicators: Arc<RwLock<HashMap<String, Arc<CommunicatorImplementation>>>>,
54	/// Registered [`LivelinessSubscriber`]
55	liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
56	/// Registered [`Observer`]
57	observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
58	/// Registered [`Publisher`]
59	publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
60	/// Registered [`Query`]s
61	queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
62	/// Registered [`Observable`]s, [`Queryable`]s and [`Subscriber`]s
63	responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
64}
65
66impl Capability for MultiCommunicator {
67	fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
68		if new_state >= &self.state {
69			self.upgrade_capabilities(new_state)?;
70		} else if new_state < &self.state {
71			self.downgrade_capabilities(new_state)?;
72		}
73		Ok(())
74	}
75}
76
77impl CommunicatorMethods for MultiCommunicator {
78	/// Send a put message [`Message`] to the given `selector`.
79	/// # Errors
80	/// - `NotImplemented`: there is no implementation within this communicator
81	fn put(&self, selector: &str, message: Message) -> Result<()> {
82		let publishers = self
83			.publishers
84			.read()
85			.map_err(|_| Error::ReadAccess("publishers".into()))?;
86
87		#[allow(clippy::single_match_else)]
88		match publishers.get(selector) {
89			Some(publisher) => publisher.put(message),
90			None => {
91				let comm = self
92					.communicators
93					.read()
94					.map_err(|_| Error::ReadAccess("publishers".into()))?
95					.get(DEFAULT)
96					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
97					.cloned()?;
98
99				comm.put(selector, message)
100			}
101		}
102	}
103
104	/// Send a delete message to the given `selector`.
105	/// # Errors
106	/// - `NotImplemented`: there is no implementation within this communicator
107	fn delete(&self, selector: &str) -> Result<()> {
108		let publishers = self
109			.publishers
110			.read()
111			.map_err(|_| Error::ReadAccess("publishers".into()))?;
112
113		#[allow(clippy::single_match_else)]
114		match publishers.get(selector) {
115			Some(publisher) => publisher.delete(),
116			None => {
117				let comm = self
118					.communicators
119					.read()
120					.map_err(|_| Error::ReadAccess("publishers".into()))?
121					.get(DEFAULT)
122					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
123					.cloned()?;
124
125				#[allow(clippy::match_wildcard_for_single_variants)]
126				match comm.as_ref() {
127					CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
128				}
129			}
130		}
131	}
132
133	/// Send a query with an optional specification [`Message`] to the given `selector`.
134	/// # Errors
135	/// - `NotImplemented`: there is no implementation within this communicator
136	fn get(
137		&self,
138		selector: &str,
139		message: Option<Message>,
140		callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
141	) -> Result<()> {
142		let queriers = self
143			.queriers
144			.read()
145			.map_err(|_| Error::ReadAccess("queriers".into()))?;
146
147		#[allow(clippy::single_match_else)]
148		match queriers.get(selector) {
149			Some(querier) => querier.get(message, callback),
150			None => {
151				let comm = self
152					.communicators
153					.read()
154					.map_err(|_| Error::ReadAccess("queriers".into()))?
155					.get(DEFAULT)
156					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
157					.cloned()?;
158
159				match comm.as_ref() {
160					CommunicatorImplementation::Zenoh(zenoh) => {
161						zenoh.get(selector, message, callback)
162					}
163				}
164			}
165		}
166	}
167
168	/// Request an observation for [`Message`] from the given `selector`
169	/// # Errors
170	/// - `NotImplemented`: there is no implementation within this communicator
171	fn observe(&self, selector: &str, message: Option<Message>) -> Result<()> {
172		let observers = self
173			.observers
174			.read()
175			.map_err(|_| Error::ReadAccess("observers".into()))?;
176
177		#[allow(clippy::single_match_else)]
178		match observers.get(selector) {
179			Some(observer) => observer.request(message),
180			None => {
181				let comm = self
182					.communicators
183					.read()
184					.map_err(|_| Error::ReadAccess("observers".into()))?
185					.get(DEFAULT)
186					.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
187					.cloned()?;
188
189				#[allow(clippy::match_wildcard_for_single_variants)]
190				match comm.as_ref() {
191					CommunicatorImplementation::Zenoh(_zenoh) => Err(Error::NotImplemented.into()),
192				}
193			}
194		}
195	}
196
197	/// Request a stream configured by [`Message`] from the given `selector`
198	/// # Errors
199	/// - `NotImplemented`: there is no implementation within this communicator
200	fn watch(&self, _selector: &str, _message: Message) -> Result<()> {
201		Err(Error::NotImplemented.into())
202	}
203}
204
205impl Communicator for MultiCommunicator {
206	/// the uuid of the default zenoh session
207	fn uuid(&self) -> String {
208		self.uuid.to_string()
209	}
210
211	/// the mode of the default zenoh session
212	fn mode(&self) -> &String {
213		&self.mode
214	}
215
216	/// Get the liveliness subscribers
217	fn liveliness_subscribers(
218		&self,
219	) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
220		self.liveliness_subscribers.clone()
221	}
222
223	/// Get the observers
224	fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
225		self.observers.clone()
226	}
227
228	/// Get the publishers
229	fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
230		self.publishers.clone()
231	}
232
233	/// Get the queries
234	fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
235		self.queriers.clone()
236	}
237
238	/// Get the responders
239	fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
240		self.responders.clone()
241	}
242
243	fn default_session(&self) -> Arc<Session> {
244		let com = self
245			.communicators
246			.read()
247			.expect("snh")
248			.get(DEFAULT)
249			.cloned()
250			.expect("snh");
251		match com.as_ref() {
252			CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
253		}
254	}
255
256	fn session(&self, id: &str) -> Option<Arc<zenoh::Session>> {
257		let com = self
258			.communicators
259			.read()
260			.expect("snh")
261			.get(id)
262			.cloned()
263			.expect("snh");
264		match com.as_ref() {
265			CommunicatorImplementation::Zenoh(communicator) => {
266				let com = communicator.session();
267				Some(com)
268			}
269		}
270	}
271
272	fn sessions(&self) -> Vec<Arc<Session>> {
273		let com: Vec<Arc<Session>> = self
274			.communicators
275			.read()
276			.expect("snh")
277			.values()
278			.map(|com| match com.as_ref() {
279				CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
280			})
281			.collect();
282		com
283	}
284}
285
286impl MultiCommunicator {
287	/// Constructor
288	/// # Errors
289	pub fn new(config: &Config) -> Result<Self> {
290		let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
291		let uuid = zenoh.session().zid();
292		let mode = zenoh.mode().clone();
293		let com = Self {
294			uuid,
295			mode,
296			state: OperationState::Created,
297			communicators: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
298			liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
299			observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
300			publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
301			queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
302			responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
303		};
304		// add the default communicator
305		com.communicators
306			.write()
307			.map_err(|_| Error::ModifyStruct("commmunicators".into()))?
308			.insert(
309				"default".to_string(),
310				Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
311			);
312		// create the additional sessions
313		if let Some(sessions) = config.sessions() {
314			for session in sessions {
315				match session.protocol.as_str() {
316					"zenoh" => {
317						let zenoh = crate::zenoh::Communicator::new(&session.config)?;
318						com.communicators
319							.write()
320							.map_err(|_| Error::ModifyStruct("commmunicators".into()))?
321							.insert(
322								session.name.clone(),
323								Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
324							);
325					}
326					_ => {
327						return Err(Error::UnknownProtocol {
328							protocol: session.protocol.clone(),
329						}
330						.into());
331					}
332				}
333			}
334		}
335
336		Ok(com)
337	}
338}
339// endregion:   --- MultiCommunicator
340
341#[cfg(test)]
342mod tests {
343	use super::*;
344
345	// check, that the auto traits are available
346	const fn is_normal<T: Sized + Send + Sync>() {}
347
348	#[test]
349	const fn normal_types() {
350		is_normal::<MultiCommunicator>();
351	}
352}