dimas_com/communicator/
single_communicator.rs1#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12#[cfg(feature = "unstable")]
14use crate::traits::LivelinessSubscriber;
15use crate::{
16 enums::CommunicatorImplementation,
17 error::Error,
18 traits::{
19 Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
20 Querier, Responder,
21 },
22};
23use alloc::{
24 boxed::Box,
25 string::{String, ToString},
26 sync::Arc,
27 vec::Vec,
28};
29use dimas_config::Config;
30use dimas_core::{enums::OperationState, message_types::Message, traits::Capability, Result};
31use std::{collections::HashMap, sync::RwLock};
32use zenoh::{config::ZenohId, Session};
33const INITIAL_SIZE: usize = 9;
38#[derive(Debug)]
43pub struct SingleCommunicator {
44 uuid: ZenohId,
46 mode: String,
48 state: OperationState,
50 communicator: Arc<CommunicatorImplementation>,
52 #[cfg(feature = "unstable")]
54 liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
55 observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
57 publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
59 queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
61 responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
63}
64
65impl Capability for SingleCommunicator {
66 fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
67 if new_state >= &self.state {
68 self.upgrade_capabilities(new_state)?;
69 } else if new_state < &self.state {
70 self.downgrade_capabilities(new_state)?;
71 }
72 Ok(())
73 }
74}
75
76impl Communicator for SingleCommunicator {
77 #[cfg(feature = "unstable")]
79 fn liveliness_subscribers(
80 &self,
81 ) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
82 self.liveliness_subscribers.clone()
83 }
84
85 fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
87 self.observers.clone()
88 }
89
90 fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
92 self.publishers.clone()
93 }
94
95 fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
97 self.queriers.clone()
98 }
99
100 fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
102 self.responders.clone()
103 }
104
105 fn uuid(&self) -> std::string::String {
106 self.uuid.to_string()
107 }
108
109 fn mode(&self) -> &std::string::String {
110 &self.mode
111 }
112
113 fn default_session(&self) -> Arc<Session> {
114 self.communicator.session()
115 }
116
117 fn session(&self, id: &str) -> Option<Arc<Session>> {
118 if id == "default" {
119 Some(self.communicator.session())
120 } else {
121 None
122 }
123 }
124
125 #[allow(clippy::vec_init_then_push)]
126 fn sessions(&self) -> Vec<Arc<Session>> {
127 let mut res = Vec::with_capacity(1);
128 res.push(self.communicator.session());
129 res
130 }
131}
132
133impl CommunicatorMethods for SingleCommunicator {
134 fn put(&self, selector: &str, message: Message) -> Result<()> {
135 let publishers = self
136 .publishers
137 .read()
138 .map_err(|_| Error::ReadAccess("publishers".into()))?;
139
140 #[allow(clippy::single_match_else)]
141 match publishers.get(selector) {
142 Some(publisher) => publisher.put(message),
143 None => match self.communicator.as_ref() {
144 CommunicatorImplementation::Zenoh(zenoh) => zenoh.put(selector, message),
145 },
146 }
147 }
148
149 fn delete(&self, selector: &str) -> Result<()> {
150 let publishers = self
151 .publishers
152 .read()
153 .map_err(|_| Error::ReadAccess("publishers".into()))?;
154
155 #[allow(clippy::option_if_let_else)]
156 match publishers.get(selector) {
157 Some(publisher) => publisher.delete(),
158 None => match self.communicator.as_ref() {
159 CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
160 },
161 }
162 }
163
164 fn get(
165 &self,
166 selector: &str,
167 message: Option<dimas_core::message_types::Message>,
168 callback: Option<&mut dyn FnMut(dimas_core::message_types::QueryableMsg) -> Result<()>>,
169 ) -> Result<()> {
170 let queriers = self
171 .queriers
172 .read()
173 .map_err(|_| Error::ReadAccess("queriers".into()))?;
174
175 #[allow(clippy::single_match_else)]
176 match queriers.get(selector) {
177 Some(querier) => querier.get(message, callback),
178 None =>
179 {
180 #[allow(clippy::match_wildcard_for_single_variants)]
181 match self.communicator.as_ref() {
182 CommunicatorImplementation::Zenoh(zenoh) => {
183 zenoh.get(selector, message, callback)
184 }
185 }
186 }
187 }
188 }
189
190 fn observe(
191 &self,
192 selector: &str,
193 message: Option<dimas_core::message_types::Message>,
194 ) -> Result<()> {
195 let observers = self
196 .observers
197 .read()
198 .map_err(|_| Error::ReadAccess("observers".into()))?;
199
200 #[allow(clippy::option_if_let_else)]
201 match observers.get(selector) {
202 Some(observer) => observer.request(message),
203 None => Err(crate::error::Error::NotImplemented.into()),
204 }
205 }
206
207 fn watch(&self, _selector: &str, _message: dimas_core::message_types::Message) -> Result<()> {
208 Err(crate::error::Error::NotImplemented.into())
209 }
210}
211
212impl SingleCommunicator {
213 pub fn new(config: &Config) -> Result<Self> {
216 let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
217 let uuid = zenoh.session().zid();
218 let mode = zenoh.mode().to_string();
219 let com = Self {
220 uuid,
221 mode,
222 communicator: Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
223 state: OperationState::Created,
224 #[cfg(feature = "unstable")]
225 liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
226 observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
227 publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
228 queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
229 responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
230 };
231 Ok(com)
232 }
233}
234#[cfg(test)]
237mod tests {
238 use super::*;
239
240 const fn is_normal<T: Sized + Send + Sync>() {}
242
243 #[test]
244 const fn normal_types() {
245 is_normal::<SingleCommunicator>();
246 }
247}