dimas_com/communicator/
multi_communicator.rs1#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12use crate::error::Error;
14#[cfg(feature = "unstable")]
15use crate::traits::LivelinessSubscriber;
16use crate::{
17 enums::CommunicatorImplementation,
18 traits::{
19 Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
20 Querier, Responder,
21 },
22};
23use alloc::{
24 boxed::Box,
25 string::{String, ToString},
26 sync::Arc,
27 vec::Vec,
28};
29use dimas_config::Config;
30use dimas_core::message_types::{Message, QueryableMsg};
31use dimas_core::{enums::OperationState, traits::Capability, Result};
32use std::{collections::HashMap, sync::RwLock};
33use zenoh::{config::ZenohId, Session};
/// Capacity every internal `HashMap` of a `MultiCommunicator` is pre-allocated with.
const INITIAL_SIZE: usize = 9;
/// Key under which the primary communicator is looked up in the `communicators` map.
const DEFAULT: &str = "default";
41#[derive(Debug)]
46pub struct MultiCommunicator {
47 uuid: ZenohId,
49 mode: String,
51 state: OperationState,
53 communicators: Arc<RwLock<HashMap<String, Arc<CommunicatorImplementation>>>>,
55 #[cfg(feature = "unstable")]
57 liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
58 observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
60 publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
62 queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
64 responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
66}
67
68impl Capability for MultiCommunicator {
69 fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
70 if new_state >= &self.state {
71 self.upgrade_capabilities(new_state)?;
72 } else if new_state < &self.state {
73 self.downgrade_capabilities(new_state)?;
74 }
75 Ok(())
76 }
77}
78
79impl CommunicatorMethods for MultiCommunicator {
80 fn put(&self, selector: &str, message: Message) -> Result<()> {
84 let publishers = self
85 .publishers
86 .read()
87 .map_err(|_| Error::ReadAccess("publishers".into()))?;
88
89 #[allow(clippy::single_match_else)]
90 match publishers.get(selector) {
91 Some(publisher) => publisher.put(message),
92 None => {
93 let comm = self
94 .communicators
95 .read()
96 .map_err(|_| Error::ReadAccess("publishers".into()))?
97 .get(DEFAULT)
98 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
99 .cloned()?;
100
101 comm.put(selector, message)
102 }
103 }
104 }
105
106 fn delete(&self, selector: &str) -> Result<()> {
110 let publishers = self
111 .publishers
112 .read()
113 .map_err(|_| Error::ReadAccess("publishers".into()))?;
114
115 #[allow(clippy::single_match_else)]
116 match publishers.get(selector) {
117 Some(publisher) => publisher.delete(),
118 None => {
119 let comm = self
120 .communicators
121 .read()
122 .map_err(|_| Error::ReadAccess("publishers".into()))?
123 .get(DEFAULT)
124 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
125 .cloned()?;
126
127 #[allow(clippy::match_wildcard_for_single_variants)]
128 match comm.as_ref() {
129 CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
130 }
131 }
132 }
133 }
134
135 fn get(
139 &self,
140 selector: &str,
141 message: Option<Message>,
142 callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
143 ) -> Result<()> {
144 let queriers = self
145 .queriers
146 .read()
147 .map_err(|_| Error::ReadAccess("queriers".into()))?;
148
149 #[allow(clippy::single_match_else)]
150 match queriers.get(selector) {
151 Some(querier) => querier.get(message, callback),
152 None => {
153 let comm = self
154 .communicators
155 .read()
156 .map_err(|_| Error::ReadAccess("queriers".into()))?
157 .get(DEFAULT)
158 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
159 .cloned()?;
160
161 match comm.as_ref() {
162 CommunicatorImplementation::Zenoh(zenoh) => {
163 zenoh.get(selector, message, callback)
164 }
165 }
166 }
167 }
168 }
169
170 fn observe(&self, selector: &str, message: Option<Message>) -> Result<()> {
174 let observers = self
175 .observers
176 .read()
177 .map_err(|_| Error::ReadAccess("observers".into()))?;
178
179 #[allow(clippy::single_match_else)]
180 match observers.get(selector) {
181 Some(observer) => observer.request(message),
182 None => {
183 let comm = self
184 .communicators
185 .read()
186 .map_err(|_| Error::ReadAccess("observers".into()))?
187 .get(DEFAULT)
188 .ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
189 .cloned()?;
190
191 #[allow(clippy::match_wildcard_for_single_variants)]
192 match comm.as_ref() {
193 CommunicatorImplementation::Zenoh(_zenoh) => Err(Error::NotImplemented.into()),
194 }
195 }
196 }
197 }
198
199 fn watch(&self, _selector: &str, _message: Message) -> Result<()> {
203 Err(Error::NotImplemented.into())
204 }
205}
206
207impl Communicator for MultiCommunicator {
208 fn uuid(&self) -> String {
210 self.uuid.to_string()
211 }
212
213 fn mode(&self) -> &String {
215 &self.mode
216 }
217
218 #[cfg(feature = "unstable")]
220 fn liveliness_subscribers(
221 &self,
222 ) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
223 self.liveliness_subscribers.clone()
224 }
225
226 fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
228 self.observers.clone()
229 }
230
231 fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
233 self.publishers.clone()
234 }
235
236 fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
238 self.queriers.clone()
239 }
240
241 fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
243 self.responders.clone()
244 }
245
246 fn default_session(&self) -> Arc<Session> {
247 let com = self
248 .communicators
249 .read()
250 .expect("snh")
251 .get(DEFAULT)
252 .cloned()
253 .expect("snh");
254 match com.as_ref() {
255 CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
256 }
257 }
258
259 fn session(&self, id: &str) -> Option<Arc<zenoh::Session>> {
260 let com = self
261 .communicators
262 .read()
263 .expect("snh")
264 .get(id)
265 .cloned()
266 .expect("snh");
267 match com.as_ref() {
268 CommunicatorImplementation::Zenoh(communicator) => {
269 let com = communicator.session();
270 Some(com)
271 }
272 }
273 }
274
275 fn sessions(&self) -> Vec<Arc<Session>> {
276 let com: Vec<Arc<Session>> = self
277 .communicators
278 .read()
279 .expect("snh")
280 .iter()
281 .map(|(_id, com)| match com.as_ref() {
282 CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
283 })
284 .collect();
285 com
286 }
287}
288
289impl MultiCommunicator {
290 pub fn new(config: &Config) -> Result<Self> {
293 let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
294 let uuid = zenoh.session().zid();
295 let mode = zenoh.mode().to_string();
296 let com = Self {
297 uuid,
298 mode,
299 state: OperationState::Created,
300 communicators: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
301 #[cfg(feature = "unstable")]
302 liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
303 observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
304 publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
305 queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
306 responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
307 };
308 com.communicators
310 .write()
311 .map_err(|_| Error::ModifyStruct("commmunicators".into()))?
312 .insert(
313 "default".to_string(),
314 Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
315 );
316 if let Some(sessions) = config.sessions() {
318 for session in sessions {
319 match session.protocol.as_str() {
320 "zenoh" => {
321 let zenoh = crate::zenoh::Communicator::new(&session.config)?;
322 com.communicators
323 .write()
324 .map_err(|_| Error::ModifyStruct("commmunicators".into()))?
325 .insert(
326 session.name.clone(),
327 Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
328 );
329 }
330 _ => {
331 return Err(Error::UnknownProtocol {
332 protocol: session.protocol.clone(),
333 }
334 .into());
335 }
336 }
337 }
338 }
339
340 Ok(com)
341 }
342}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` is `Sized`, `Send` and `Sync`; does nothing at
    /// runtime.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Compile-time check that `MultiCommunicator` can be shared between
    /// threads.
    #[test]
    const fn normal_types() {
        is_normal::<MultiCommunicator>();
    }
}