dimas_com/communicator/
single_communicator.rs

1// Copyright © 2024 Stephan Kunz
2
//! Implementation of a single session/protocol communicator
4//!
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13use crate::traits::LivelinessSubscriber;
14use crate::{
15	enums::CommunicatorImplementation,
16	error::Error,
17	traits::{
18		Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
19		Querier, Responder,
20	},
21};
22use alloc::{
23	boxed::Box,
24	string::{String, ToString},
25	sync::Arc,
26	vec::Vec,
27};
28use dimas_config::Config;
29use dimas_core::{Result, enums::OperationState, message_types::Message, traits::Capability};
30use std::{collections::HashMap, sync::RwLock};
31use zenoh::{Session, config::ZenohId};
32// endregion:	--- modules
33
34// region:		--- types
/// Initial capacity each registry `HashMap` is created with in `SingleCommunicator::new`.
const INITIAL_SIZE: usize = 9;
37// endregion:	--- types
38
39// region:		--- SingleCommunicator
/// A communicator that wraps exactly one [`CommunicatorImplementation`] and therefore
/// exposes a single session (addressed as `"default"`).
#[derive(Debug)]
pub struct SingleCommunicator {
	/// The uuid taken from the zenoh session when the communicator is constructed.
	uuid: ZenohId,
	/// The mode reported by the zenoh communicator when this struct is constructed.
	mode: String,
	/// The [`Communicator`]s current operational state.
	/// NOTE(review): set to `OperationState::Created` on construction; no code in this
	/// file changes it afterwards — confirm whether it is meant to be updated.
	state: OperationState,
	/// The wrapped communicator implementation that provides the session.
	communicator: Arc<CommunicatorImplementation>,
	/// Registered [`LivelinessSubscriber`]s, keyed by a `String`.
	liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
	/// Registered [`Observer`]s, keyed by a `String`.
	observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
	/// Registered [`Publisher`]s, keyed by a `String` (looked up by selector in `put`/`delete`).
	publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
	/// Registered [`Querier`]s, keyed by a `String` (looked up by selector in `get`).
	queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
	/// Registered [`Responder`]s, keyed by a `String`
	/// (per the original note: observables, queryables and subscribers).
	responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
}
62
63impl Capability for SingleCommunicator {
64	fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
65		if new_state >= &self.state {
66			self.upgrade_capabilities(new_state)?;
67		} else if new_state < &self.state {
68			self.downgrade_capabilities(new_state)?;
69		}
70		Ok(())
71	}
72}
73
74impl Communicator for SingleCommunicator {
75	/// Get the liveliness subscribers
76	fn liveliness_subscribers(
77		&self,
78	) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
79		self.liveliness_subscribers.clone()
80	}
81
82	/// Get the observers
83	fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
84		self.observers.clone()
85	}
86
87	/// Get the publishers
88	fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
89		self.publishers.clone()
90	}
91
92	/// Get the queries
93	fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
94		self.queriers.clone()
95	}
96
97	/// Get the responders
98	fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
99		self.responders.clone()
100	}
101
102	fn uuid(&self) -> std::string::String {
103		self.uuid.to_string()
104	}
105
106	fn mode(&self) -> &std::string::String {
107		&self.mode
108	}
109
110	fn default_session(&self) -> Arc<Session> {
111		self.communicator.session()
112	}
113
114	fn session(&self, id: &str) -> Option<Arc<Session>> {
115		if id == "default" {
116			Some(self.communicator.session())
117		} else {
118			None
119		}
120	}
121
122	#[allow(clippy::vec_init_then_push)]
123	fn sessions(&self) -> Vec<Arc<Session>> {
124		let mut res = Vec::with_capacity(1);
125		res.push(self.communicator.session());
126		res
127	}
128}
129
130impl CommunicatorMethods for SingleCommunicator {
131	fn put(&self, selector: &str, message: Message) -> Result<()> {
132		let publishers = self
133			.publishers
134			.read()
135			.map_err(|_| Error::ReadAccess("publishers".into()))?;
136
137		#[allow(clippy::single_match_else)]
138		match publishers.get(selector) {
139			Some(publisher) => publisher.put(message),
140			None => match self.communicator.as_ref() {
141				CommunicatorImplementation::Zenoh(zenoh) => zenoh.put(selector, message),
142			},
143		}
144	}
145
146	fn delete(&self, selector: &str) -> Result<()> {
147		let publishers = self
148			.publishers
149			.read()
150			.map_err(|_| Error::ReadAccess("publishers".into()))?;
151
152		#[allow(clippy::option_if_let_else)]
153		match publishers.get(selector) {
154			Some(publisher) => publisher.delete(),
155			None => match self.communicator.as_ref() {
156				CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
157			},
158		}
159	}
160
161	fn get(
162		&self,
163		selector: &str,
164		message: Option<dimas_core::message_types::Message>,
165		callback: Option<&mut dyn FnMut(dimas_core::message_types::QueryableMsg) -> Result<()>>,
166	) -> Result<()> {
167		let queriers = self
168			.queriers
169			.read()
170			.map_err(|_| Error::ReadAccess("queriers".into()))?;
171
172		#[allow(clippy::single_match_else)]
173		match queriers.get(selector) {
174			Some(querier) => querier.get(message, callback),
175			None =>
176			{
177				#[allow(clippy::match_wildcard_for_single_variants)]
178				match self.communicator.as_ref() {
179					CommunicatorImplementation::Zenoh(zenoh) => {
180						zenoh.get(selector, message, callback)
181					}
182				}
183			}
184		}
185	}
186
187	fn observe(
188		&self,
189		selector: &str,
190		message: Option<dimas_core::message_types::Message>,
191	) -> Result<()> {
192		let observers = self
193			.observers
194			.read()
195			.map_err(|_| Error::ReadAccess("observers".into()))?;
196
197		#[allow(clippy::option_if_let_else)]
198		match observers.get(selector) {
199			Some(observer) => observer.request(message),
200			None => Err(crate::error::Error::NotImplemented.into()),
201		}
202	}
203
204	fn watch(&self, _selector: &str, _message: dimas_core::message_types::Message) -> Result<()> {
205		Err(crate::error::Error::NotImplemented.into())
206	}
207}
208
209impl SingleCommunicator {
210	/// Constructor
211	/// # Errors
212	pub fn new(config: &Config) -> Result<Self> {
213		let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
214		let uuid = zenoh.session().zid();
215		let mode = zenoh.mode().clone();
216		let com = Self {
217			uuid,
218			mode,
219			communicator: Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
220			state: OperationState::Created,
221			liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
222			observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
223			publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
224			queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
225			responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
226		};
227		Ok(com)
228	}
229}
230// endregion:	--- SingleCommunicator
231
#[cfg(test)]
mod tests {
	use super::*;

	// Compiles only when `T` is `Sized + Send + Sync`; the body is intentionally empty
	// because the check happens entirely at compile time through the trait bounds.
	const fn is_normal<T: Sized + Send + Sync>() {}

	// Ensures `SingleCommunicator` provides the auto traits required by `is_normal`.
	#[test]
	const fn normal_types() {
		is_normal::<SingleCommunicator>();
	}
}
243}