dimas_com/traits/
communicator.rs1#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12#[cfg(any(feature = "unstable", doc))]
14use super::LivelinessSubscriber;
15use super::{CommunicatorMethods, Observer, Publisher, Querier, Responder};
16use crate::error::Error;
17use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
18use dimas_core::{enums::OperationState, error::Result, traits::Capability};
19#[cfg(feature = "std")]
20use std::{collections::HashMap, sync::RwLock};
21use tracing::error;
22use zenoh::Session;
23pub trait Communicator: Capability + CommunicatorMethods + Send + Sync {
28 #[cfg(feature = "unstable")]
30 #[must_use]
31 fn liveliness_subscribers(&self)
32 -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>;
33
34 #[must_use]
36 fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>>;
37
38 #[must_use]
40 fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>;
41
42 #[must_use]
44 fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>>;
45
46 #[must_use]
48 fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>>;
49
50 fn upgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
62 #[cfg(feature = "unstable")]
64 self.liveliness_subscribers()
65 .write()
66 .map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
67 .iter_mut()
68 .for_each(|subscriber| {
69 let _ = subscriber.1.manage_operation_state(new_state);
70 });
71
72 self.responders()
74 .write()
75 .map_err(|_| Error::ModifyStruct("subscribers".into()))?
76 .iter_mut()
77 .for_each(|subscriber| {
78 let _ = subscriber.1.manage_operation_state(new_state);
79 });
80
81 self.publishers()
83 .write()
84 .map_err(|_| Error::ModifyStruct("publishers".into()))?
85 .iter_mut()
86 .for_each(|publisher| {
87 if let Err(reason) = publisher.1.manage_operation_state(new_state) {
88 error!(
89 "could not initialize publisher for {}, reason: {}",
90 publisher.1.selector(),
91 reason
92 );
93 };
94 });
95
96 self.observers()
98 .write()
99 .map_err(|_| Error::ModifyStruct("observers".into()))?
100 .iter_mut()
101 .for_each(|observer| {
102 if let Err(reason) = observer.1.manage_operation_state(new_state) {
103 error!(
104 "could not initialize observer for {}, reason: {}",
105 observer.1.selector(),
106 reason
107 );
108 };
109 });
110
111 self.queriers()
113 .write()
114 .map_err(|_| Error::ModifyStruct("queries".into()))?
115 .iter_mut()
116 .for_each(|query| {
117 if let Err(reason) = query.1.manage_operation_state(new_state) {
118 error!(
119 "could not initialize query for {}, reason: {}",
120 query.1.selector(),
121 reason
122 );
123 };
124 });
125
126 Ok(())
127 }
128
129 fn downgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
136 self.queriers()
139 .write()
140 .map_err(|_| Error::ModifyStruct("queries".into()))?
141 .iter_mut()
142 .for_each(|query| {
143 if let Err(reason) = query.1.manage_operation_state(new_state) {
144 error!(
145 "could not de-initialize query for {}, reason: {}",
146 query.1.selector(),
147 reason
148 );
149 };
150 });
151
152 self.observers()
154 .write()
155 .map_err(|_| Error::ModifyStruct("observers".into()))?
156 .iter_mut()
157 .for_each(|observer| {
158 if let Err(reason) = observer.1.manage_operation_state(new_state) {
159 error!(
160 "could not de-initialize observer for {}, reason: {}",
161 observer.1.selector(),
162 reason
163 );
164 };
165 });
166
167 self.publishers()
169 .write()
170 .map_err(|_| Error::ModifyStruct("publishers".into()))?
171 .iter_mut()
172 .for_each(|publisher| {
173 let _ = publisher.1.manage_operation_state(new_state);
174 });
175
176 self.responders()
178 .write()
179 .map_err(|_| Error::ModifyStruct("subscribers".into()))?
180 .iter_mut()
181 .for_each(|subscriber| {
182 let _ = subscriber.1.manage_operation_state(new_state);
183 });
184
185 #[cfg(feature = "unstable")]
187 self.liveliness_subscribers()
188 .write()
189 .map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
190 .iter_mut()
191 .for_each(|subscriber| {
192 let _ = subscriber.1.manage_operation_state(new_state);
193 });
194
195 Ok(())
196 }
197
198 #[must_use]
200 fn uuid(&self) -> String;
201
202 #[must_use]
204 fn mode(&self) -> &String;
205
206 fn default_session(&self) -> Arc<Session>;
208
209 fn session(&self, id: &str) -> Option<Arc<Session>>;
211
212 fn sessions(&self) -> Vec<Arc<Session>>;
214}