dimas_com/communicator/
multi_communicator.rs1#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12use crate::error::Error;
14use crate::traits::LivelinessSubscriber;
15use crate::{
16 enums::CommunicatorImplementation,
17 traits::{
18 Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
19 Querier, Responder,
20 },
21};
22use alloc::{
23 boxed::Box,
24 string::{String, ToString},
25 sync::Arc,
26 vec::Vec,
27};
28use dimas_config::Config;
29use dimas_core::message_types::{Message, QueryableMsg};
30use dimas_core::{Result, enums::OperationState, traits::Capability};
31use std::{collections::HashMap, sync::RwLock};
32use zenoh::{Session, config::ZenohId};
33const INITIAL_SIZE: usize = 9;
38const DEFAULT: &str = "default";
40#[derive(Debug)]
45pub struct MultiCommunicator {
46 uuid: ZenohId,
48 mode: String,
50 state: OperationState,
52 communicators: Arc<RwLock<HashMap<String, Arc<CommunicatorImplementation>>>>,
54 liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
56 observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
58 publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
60 queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
62 responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
64}
65
66impl Capability for MultiCommunicator {
67 fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
68 if new_state >= &self.state {
69 self.upgrade_capabilities(new_state)?;
70 } else if new_state < &self.state {
71 self.downgrade_capabilities(new_state)?;
72 }
73 Ok(())
74 }
75}
76
77impl CommunicatorMethods for MultiCommunicator {
78 fn put(&self, selector: &str, message: Message) -> Result<()> {
82 let publishers = self
83 .publishers
84 .read()
85 .map_err(|_| Error::ReadAccess("publishers".into()))?;
86
87 #[allow(clippy::single_match_else)]
88 match publishers.get(selector) {
89 Some(publisher) => publisher.put(message),
90 None => {
91 let comm = self
92 .communicators
93 .read()
94 .map_err(|_| Error::ReadAccess("publishers".into()))?
95 .get(DEFAULT)
96 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
97 .cloned()?;
98
99 comm.put(selector, message)
100 }
101 }
102 }
103
104 fn delete(&self, selector: &str) -> Result<()> {
108 let publishers = self
109 .publishers
110 .read()
111 .map_err(|_| Error::ReadAccess("publishers".into()))?;
112
113 #[allow(clippy::single_match_else)]
114 match publishers.get(selector) {
115 Some(publisher) => publisher.delete(),
116 None => {
117 let comm = self
118 .communicators
119 .read()
120 .map_err(|_| Error::ReadAccess("publishers".into()))?
121 .get(DEFAULT)
122 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
123 .cloned()?;
124
125 #[allow(clippy::match_wildcard_for_single_variants)]
126 match comm.as_ref() {
127 CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
128 }
129 }
130 }
131 }
132
133 fn get(
137 &self,
138 selector: &str,
139 message: Option<Message>,
140 callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
141 ) -> Result<()> {
142 let queriers = self
143 .queriers
144 .read()
145 .map_err(|_| Error::ReadAccess("queriers".into()))?;
146
147 #[allow(clippy::single_match_else)]
148 match queriers.get(selector) {
149 Some(querier) => querier.get(message, callback),
150 None => {
151 let comm = self
152 .communicators
153 .read()
154 .map_err(|_| Error::ReadAccess("queriers".into()))?
155 .get(DEFAULT)
156 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
157 .cloned()?;
158
159 match comm.as_ref() {
160 CommunicatorImplementation::Zenoh(zenoh) => {
161 zenoh.get(selector, message, callback)
162 }
163 }
164 }
165 }
166 }
167
168 fn observe(&self, selector: &str, message: Option<Message>) -> Result<()> {
172 let observers = self
173 .observers
174 .read()
175 .map_err(|_| Error::ReadAccess("observers".into()))?;
176
177 #[allow(clippy::single_match_else)]
178 match observers.get(selector) {
179 Some(observer) => observer.request(message),
180 None => {
181 let comm = self
182 .communicators
183 .read()
184 .map_err(|_| Error::ReadAccess("observers".into()))?
185 .get(DEFAULT)
186 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
187 .cloned()?;
188
189 #[allow(clippy::match_wildcard_for_single_variants)]
190 match comm.as_ref() {
191 CommunicatorImplementation::Zenoh(_zenoh) => Err(Error::NotImplemented.into()),
192 }
193 }
194 }
195 }
196
197 fn watch(&self, _selector: &str, _message: Message) -> Result<()> {
201 Err(Error::NotImplemented.into())
202 }
203}
204
205impl Communicator for MultiCommunicator {
206 fn uuid(&self) -> String {
208 self.uuid.to_string()
209 }
210
211 fn mode(&self) -> &String {
213 &self.mode
214 }
215
216 fn liveliness_subscribers(
218 &self,
219 ) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
220 self.liveliness_subscribers.clone()
221 }
222
223 fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
225 self.observers.clone()
226 }
227
228 fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
230 self.publishers.clone()
231 }
232
233 fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
235 self.queriers.clone()
236 }
237
238 fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
240 self.responders.clone()
241 }
242
243 fn default_session(&self) -> Arc<Session> {
244 let com = self
245 .communicators
246 .read()
247 .expect("snh")
248 .get(DEFAULT)
249 .cloned()
250 .expect("snh");
251 match com.as_ref() {
252 CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
253 }
254 }
255
256 fn session(&self, id: &str) -> Option<Arc<zenoh::Session>> {
257 let com = self
258 .communicators
259 .read()
260 .expect("snh")
261 .get(id)
262 .cloned()
263 .expect("snh");
264 match com.as_ref() {
265 CommunicatorImplementation::Zenoh(communicator) => {
266 let com = communicator.session();
267 Some(com)
268 }
269 }
270 }
271
272 fn sessions(&self) -> Vec<Arc<Session>> {
273 let com: Vec<Arc<Session>> = self
274 .communicators
275 .read()
276 .expect("snh")
277 .values()
278 .map(|com| match com.as_ref() {
279 CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
280 })
281 .collect();
282 com
283 }
284}
285
286impl MultiCommunicator {
287 pub fn new(config: &Config) -> Result<Self> {
290 let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
291 let uuid = zenoh.session().zid();
292 let mode = zenoh.mode().clone();
293 let com = Self {
294 uuid,
295 mode,
296 state: OperationState::Created,
297 communicators: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
298 liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
299 observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
300 publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
301 queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
302 responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
303 };
304 com.communicators
306 .write()
307 .map_err(|_| Error::ModifyStruct("commmunicators".into()))?
308 .insert(
309 "default".to_string(),
310 Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
311 );
312 if let Some(sessions) = config.sessions() {
314 for session in sessions {
315 match session.protocol.as_str() {
316 "zenoh" => {
317 let zenoh = crate::zenoh::Communicator::new(&session.config)?;
318 com.communicators
319 .write()
320 .map_err(|_| Error::ModifyStruct("commmunicators".into()))?
321 .insert(
322 session.name.clone(),
323 Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
324 );
325 }
326 _ => {
327 return Err(Error::UnknownProtocol {
328 protocol: session.protocol.clone(),
329 }
330 .into());
331 }
332 }
333 }
334 }
335
336 Ok(com)
337 }
338}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time probe: can only be instantiated for types that are
    /// `Sized`, `Send` and `Sync`.
    const fn assert_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Fails to compile if `MultiCommunicator` stops being `Send + Sync`.
    #[test]
    const fn normal_types() {
        assert_normal::<MultiCommunicator>();
    }
}