dimas_com/communicator/
single_communicator.rs1#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12use crate::traits::LivelinessSubscriber;
14use crate::{
15 enums::CommunicatorImplementation,
16 error::Error,
17 traits::{
18 Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
19 Querier, Responder,
20 },
21};
22use alloc::{
23 boxed::Box,
24 string::{String, ToString},
25 sync::Arc,
26 vec::Vec,
27};
28use dimas_config::Config;
29use dimas_core::{Result, enums::OperationState, message_types::Message, traits::Capability};
30use std::{collections::HashMap, sync::RwLock};
31use zenoh::{Session, config::ZenohId};
/// Initial capacity reserved for each of the lookup tables
/// (liveliness subscribers, observers, publishers, queriers, responders)
/// that `SingleCommunicator::new` creates.
const INITIAL_SIZE: usize = 9;
/// A communicator that wraps exactly one [`CommunicatorImplementation`]
/// together with the string-keyed lookup tables of the communication
/// capabilities registered on it.
#[derive(Debug)]
pub struct SingleCommunicator {
    /// Identifier taken from the zenoh session (`session().zid()`) at construction.
    uuid: ZenohId,
    /// Mode string copied from the zenoh communicator at construction.
    mode: String,
    /// Operation state this communicator compares requested states against;
    /// initialised to `OperationState::Created`.
    state: OperationState,
    /// Shared handle to the wrapped communicator implementation.
    communicator: Arc<CommunicatorImplementation>,
    /// Registered liveliness subscribers, keyed by string.
    liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
    /// Registered observers; looked up by selector in `observe`.
    observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
    /// Registered publishers; looked up by selector in `put` and `delete`.
    publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
    /// Registered queriers; looked up by selector in `get`.
    queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
    /// Registered responders, keyed by string.
    responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
}
62
63impl Capability for SingleCommunicator {
64 fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
65 if new_state >= &self.state {
66 self.upgrade_capabilities(new_state)?;
67 } else if new_state < &self.state {
68 self.downgrade_capabilities(new_state)?;
69 }
70 Ok(())
71 }
72}
73
74impl Communicator for SingleCommunicator {
75 fn liveliness_subscribers(
77 &self,
78 ) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
79 self.liveliness_subscribers.clone()
80 }
81
82 fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
84 self.observers.clone()
85 }
86
87 fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
89 self.publishers.clone()
90 }
91
92 fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
94 self.queriers.clone()
95 }
96
97 fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
99 self.responders.clone()
100 }
101
102 fn uuid(&self) -> std::string::String {
103 self.uuid.to_string()
104 }
105
106 fn mode(&self) -> &std::string::String {
107 &self.mode
108 }
109
110 fn default_session(&self) -> Arc<Session> {
111 self.communicator.session()
112 }
113
114 fn session(&self, id: &str) -> Option<Arc<Session>> {
115 if id == "default" {
116 Some(self.communicator.session())
117 } else {
118 None
119 }
120 }
121
122 #[allow(clippy::vec_init_then_push)]
123 fn sessions(&self) -> Vec<Arc<Session>> {
124 let mut res = Vec::with_capacity(1);
125 res.push(self.communicator.session());
126 res
127 }
128}
129
130impl CommunicatorMethods for SingleCommunicator {
131 fn put(&self, selector: &str, message: Message) -> Result<()> {
132 let publishers = self
133 .publishers
134 .read()
135 .map_err(|_| Error::ReadAccess("publishers".into()))?;
136
137 #[allow(clippy::single_match_else)]
138 match publishers.get(selector) {
139 Some(publisher) => publisher.put(message),
140 None => match self.communicator.as_ref() {
141 CommunicatorImplementation::Zenoh(zenoh) => zenoh.put(selector, message),
142 },
143 }
144 }
145
146 fn delete(&self, selector: &str) -> Result<()> {
147 let publishers = self
148 .publishers
149 .read()
150 .map_err(|_| Error::ReadAccess("publishers".into()))?;
151
152 #[allow(clippy::option_if_let_else)]
153 match publishers.get(selector) {
154 Some(publisher) => publisher.delete(),
155 None => match self.communicator.as_ref() {
156 CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
157 },
158 }
159 }
160
161 fn get(
162 &self,
163 selector: &str,
164 message: Option<dimas_core::message_types::Message>,
165 callback: Option<&mut dyn FnMut(dimas_core::message_types::QueryableMsg) -> Result<()>>,
166 ) -> Result<()> {
167 let queriers = self
168 .queriers
169 .read()
170 .map_err(|_| Error::ReadAccess("queriers".into()))?;
171
172 #[allow(clippy::single_match_else)]
173 match queriers.get(selector) {
174 Some(querier) => querier.get(message, callback),
175 None =>
176 {
177 #[allow(clippy::match_wildcard_for_single_variants)]
178 match self.communicator.as_ref() {
179 CommunicatorImplementation::Zenoh(zenoh) => {
180 zenoh.get(selector, message, callback)
181 }
182 }
183 }
184 }
185 }
186
187 fn observe(
188 &self,
189 selector: &str,
190 message: Option<dimas_core::message_types::Message>,
191 ) -> Result<()> {
192 let observers = self
193 .observers
194 .read()
195 .map_err(|_| Error::ReadAccess("observers".into()))?;
196
197 #[allow(clippy::option_if_let_else)]
198 match observers.get(selector) {
199 Some(observer) => observer.request(message),
200 None => Err(crate::error::Error::NotImplemented.into()),
201 }
202 }
203
204 fn watch(&self, _selector: &str, _message: dimas_core::message_types::Message) -> Result<()> {
205 Err(crate::error::Error::NotImplemented.into())
206 }
207}
208
209impl SingleCommunicator {
210 pub fn new(config: &Config) -> Result<Self> {
213 let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
214 let uuid = zenoh.session().zid();
215 let mode = zenoh.mode().clone();
216 let com = Self {
217 uuid,
218 mode,
219 communicator: Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
220 state: OperationState::Created,
221 liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
222 observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
223 publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
224 queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
225 responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
226 };
227 Ok(com)
228 }
229}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` is `Sized`, `Send` and `Sync`.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync,
    {
    }

    /// Compile-time check that `SingleCommunicator` can be shared across threads.
    #[test]
    const fn normal_types() {
        is_normal::<SingleCommunicator>();
    }
}