dimas_com/communicator/
single_communicator.rs

1// Copyright © 2024 Stephan Kunz
2
//! Implementation of a single session communicator
4//!
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13#[cfg(feature = "unstable")]
14use crate::traits::LivelinessSubscriber;
15use crate::{
16	enums::CommunicatorImplementation,
17	error::Error,
18	traits::{
19		Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
20		Querier, Responder,
21	},
22};
23use alloc::{
24	boxed::Box,
25	string::{String, ToString},
26	sync::Arc,
27	vec::Vec,
28};
29use dimas_config::Config;
30use dimas_core::{enums::OperationState, message_types::Message, traits::Capability, Result};
31use std::{collections::HashMap, sync::RwLock};
32use zenoh::{config::ZenohId, Session};
33// endregion:	--- modules
34
35// region:		--- types
/// Initial capacity every registry `HashMap` of a [`SingleCommunicator`] is created with
const INITIAL_SIZE: usize = 9;
38// endregion:	--- types
39
40// region:		--- SingleCommunicator
/// A communicator that works with exactly one session.
///
/// It wraps a single [`CommunicatorImplementation`] and keeps the registered
/// communication endpoints in shared, lock protected maps with `String` keys.
#[derive(Debug)]
pub struct SingleCommunicator {
	/// The id of the wrapped session, read from the session when the communicator is constructed
	uuid: ZenohId,
	/// The mode reported by the wrapped implementation when the communicator is constructed
	mode: String,
	/// The [`Communicator`]s current operational state.
	/// Starts as [`OperationState::Created`].
	state: OperationState,
	/// The one and only communicator implementation, which also provides the session
	communicator: Arc<CommunicatorImplementation>,
	/// Registered [`LivelinessSubscriber`]s (only with feature `unstable`)
	#[cfg(feature = "unstable")]
	liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
	/// Registered [`Observer`]s, looked up by selector
	observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
	/// Registered [`Publisher`]s, looked up by selector
	publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
	/// Registered [`Querier`]s, looked up by selector
	queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
	/// Registered [`Responder`]s
	responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
}
64
65impl Capability for SingleCommunicator {
66	fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
67		if new_state >= &self.state {
68			self.upgrade_capabilities(new_state)?;
69		} else if new_state < &self.state {
70			self.downgrade_capabilities(new_state)?;
71		}
72		Ok(())
73	}
74}
75
76impl Communicator for SingleCommunicator {
77	/// Get the liveliness subscribers
78	#[cfg(feature = "unstable")]
79	fn liveliness_subscribers(
80		&self,
81	) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
82		self.liveliness_subscribers.clone()
83	}
84
85	/// Get the observers
86	fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
87		self.observers.clone()
88	}
89
90	/// Get the publishers
91	fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
92		self.publishers.clone()
93	}
94
95	/// Get the queries
96	fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
97		self.queriers.clone()
98	}
99
100	/// Get the responders
101	fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
102		self.responders.clone()
103	}
104
105	fn uuid(&self) -> std::string::String {
106		self.uuid.to_string()
107	}
108
109	fn mode(&self) -> &std::string::String {
110		&self.mode
111	}
112
113	fn default_session(&self) -> Arc<Session> {
114		self.communicator.session()
115	}
116
117	fn session(&self, id: &str) -> Option<Arc<Session>> {
118		if id == "default" {
119			Some(self.communicator.session())
120		} else {
121			None
122		}
123	}
124
125	#[allow(clippy::vec_init_then_push)]
126	fn sessions(&self) -> Vec<Arc<Session>> {
127		let mut res = Vec::with_capacity(1);
128		res.push(self.communicator.session());
129		res
130	}
131}
132
133impl CommunicatorMethods for SingleCommunicator {
134	fn put(&self, selector: &str, message: Message) -> Result<()> {
135		let publishers = self
136			.publishers
137			.read()
138			.map_err(|_| Error::ReadAccess("publishers".into()))?;
139
140		#[allow(clippy::single_match_else)]
141		match publishers.get(selector) {
142			Some(publisher) => publisher.put(message),
143			None => match self.communicator.as_ref() {
144				CommunicatorImplementation::Zenoh(zenoh) => zenoh.put(selector, message),
145			},
146		}
147	}
148
149	fn delete(&self, selector: &str) -> Result<()> {
150		let publishers = self
151			.publishers
152			.read()
153			.map_err(|_| Error::ReadAccess("publishers".into()))?;
154
155		#[allow(clippy::option_if_let_else)]
156		match publishers.get(selector) {
157			Some(publisher) => publisher.delete(),
158			None => match self.communicator.as_ref() {
159				CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
160			},
161		}
162	}
163
164	fn get(
165		&self,
166		selector: &str,
167		message: Option<dimas_core::message_types::Message>,
168		callback: Option<&mut dyn FnMut(dimas_core::message_types::QueryableMsg) -> Result<()>>,
169	) -> Result<()> {
170		let queriers = self
171			.queriers
172			.read()
173			.map_err(|_| Error::ReadAccess("queriers".into()))?;
174
175		#[allow(clippy::single_match_else)]
176		match queriers.get(selector) {
177			Some(querier) => querier.get(message, callback),
178			None =>
179			{
180				#[allow(clippy::match_wildcard_for_single_variants)]
181				match self.communicator.as_ref() {
182					CommunicatorImplementation::Zenoh(zenoh) => {
183						zenoh.get(selector, message, callback)
184					}
185				}
186			}
187		}
188	}
189
190	fn observe(
191		&self,
192		selector: &str,
193		message: Option<dimas_core::message_types::Message>,
194	) -> Result<()> {
195		let observers = self
196			.observers
197			.read()
198			.map_err(|_| Error::ReadAccess("observers".into()))?;
199
200		#[allow(clippy::option_if_let_else)]
201		match observers.get(selector) {
202			Some(observer) => observer.request(message),
203			None => Err(crate::error::Error::NotImplemented.into()),
204		}
205	}
206
207	fn watch(&self, _selector: &str, _message: dimas_core::message_types::Message) -> Result<()> {
208		Err(crate::error::Error::NotImplemented.into())
209	}
210}
211
212impl SingleCommunicator {
213	/// Constructor
214	/// # Errors
215	pub fn new(config: &Config) -> Result<Self> {
216		let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
217		let uuid = zenoh.session().zid();
218		let mode = zenoh.mode().to_string();
219		let com = Self {
220			uuid,
221			mode,
222			communicator: Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
223			state: OperationState::Created,
224			#[cfg(feature = "unstable")]
225			liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
226			observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
227			publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
228			queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
229			responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
230		};
231		Ok(com)
232	}
233}
234// endregion:	--- SingleCommunicator
235
#[cfg(test)]
mod tests {
	use super::*;

	/// Compiles only for types providing the auto traits `Send` and `Sync`
	const fn assert_auto_traits<T>()
	where
		T: Sized + Send + Sync,
	{
	}

	/// The communicator must be usable across threads
	#[test]
	const fn normal_types() {
		assert_auto_traits::<SingleCommunicator>();
	}
}