dimas_com/traits/
communicator.rs

1// Copyright © 2024 Stephan Kunz
2
3//! Traits for communication.
4//!
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13#[cfg(any(feature = "unstable", doc))]
14use super::LivelinessSubscriber;
15use super::{CommunicatorMethods, Observer, Publisher, Querier, Responder};
16use crate::error::Error;
17use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
18use dimas_core::{enums::OperationState, error::Result, traits::Capability};
19#[cfg(feature = "std")]
20use std::{collections::HashMap, sync::RwLock};
21use tracing::error;
22use zenoh::Session;
23// endregion:	--- modules
24
25// region:		--- Communicator
26/// the methodes to be implemented by any communicator
27pub trait Communicator: Capability + CommunicatorMethods + Send + Sync {
28	/// Get the liveliness subscribers
29	#[cfg(feature = "unstable")]
30	#[must_use]
31	fn liveliness_subscribers(&self)
32		-> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>;
33
34	/// Get the observers
35	#[must_use]
36	fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>>;
37
38	/// Get the publishers
39	#[must_use]
40	fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>;
41
42	/// Get the queriers
43	#[must_use]
44	fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>>;
45
46	/// Get the responders
47	#[must_use]
48	fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>>;
49
50	/// Method for upgrading [`OperationState`] all registered capabilities
51	///
52	/// The capabilities are upgraded in the order
53	/// - [`LivelinessSubscriber`]s
54	/// - [`Responder`]: `Observable`, `Queryable`, `Subscriber`
55	/// - [`Publisher`]s the
56	/// - [`Observer`]s and the
57	/// - [`Querier`]s
58	///
59	/// # Errors
60	/// Currently none
61	fn upgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
62		// start liveliness subscriber
63		#[cfg(feature = "unstable")]
64		self.liveliness_subscribers()
65			.write()
66			.map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
67			.iter_mut()
68			.for_each(|subscriber| {
69				let _ = subscriber.1.manage_operation_state(new_state);
70			});
71
72		// start all registered responders
73		self.responders()
74			.write()
75			.map_err(|_| Error::ModifyStruct("subscribers".into()))?
76			.iter_mut()
77			.for_each(|subscriber| {
78				let _ = subscriber.1.manage_operation_state(new_state);
79			});
80
81		// init all registered publishers
82		self.publishers()
83			.write()
84			.map_err(|_| Error::ModifyStruct("publishers".into()))?
85			.iter_mut()
86			.for_each(|publisher| {
87				if let Err(reason) = publisher.1.manage_operation_state(new_state) {
88					error!(
89						"could not initialize publisher for {}, reason: {}",
90						publisher.1.selector(),
91						reason
92					);
93				};
94			});
95
96		// init all registered observers
97		self.observers()
98			.write()
99			.map_err(|_| Error::ModifyStruct("observers".into()))?
100			.iter_mut()
101			.for_each(|observer| {
102				if let Err(reason) = observer.1.manage_operation_state(new_state) {
103					error!(
104						"could not initialize observer for {}, reason: {}",
105						observer.1.selector(),
106						reason
107					);
108				};
109			});
110
111		// init all registered queries
112		self.queriers()
113			.write()
114			.map_err(|_| Error::ModifyStruct("queries".into()))?
115			.iter_mut()
116			.for_each(|query| {
117				if let Err(reason) = query.1.manage_operation_state(new_state) {
118					error!(
119						"could not initialize query for {}, reason: {}",
120						query.1.selector(),
121						reason
122					);
123				};
124			});
125
126		Ok(())
127	}
128
129	/// Method for downgrading [`OperationState`] all registered capabilities
130	///
131	/// The capabilities are downgraded in reverse order of their start in [`Communicator::upgrade_capabilities`]
132	///
133	/// # Errors
134	/// Currently none
135	fn downgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
136		// reverse order of start!
137		// de-init all registered queries
138		self.queriers()
139			.write()
140			.map_err(|_| Error::ModifyStruct("queries".into()))?
141			.iter_mut()
142			.for_each(|query| {
143				if let Err(reason) = query.1.manage_operation_state(new_state) {
144					error!(
145						"could not de-initialize query for {}, reason: {}",
146						query.1.selector(),
147						reason
148					);
149				};
150			});
151
152		// de-init all registered observers
153		self.observers()
154			.write()
155			.map_err(|_| Error::ModifyStruct("observers".into()))?
156			.iter_mut()
157			.for_each(|observer| {
158				if let Err(reason) = observer.1.manage_operation_state(new_state) {
159					error!(
160						"could not de-initialize observer for {}, reason: {}",
161						observer.1.selector(),
162						reason
163					);
164				};
165			});
166
167		// de-init all registered publishers
168		self.publishers()
169			.write()
170			.map_err(|_| Error::ModifyStruct("publishers".into()))?
171			.iter_mut()
172			.for_each(|publisher| {
173				let _ = publisher.1.manage_operation_state(new_state);
174			});
175
176		// stop all registered responders
177		self.responders()
178			.write()
179			.map_err(|_| Error::ModifyStruct("subscribers".into()))?
180			.iter_mut()
181			.for_each(|subscriber| {
182				let _ = subscriber.1.manage_operation_state(new_state);
183			});
184
185		// stop all registered liveliness subscribers
186		#[cfg(feature = "unstable")]
187		self.liveliness_subscribers()
188			.write()
189			.map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
190			.iter_mut()
191			.for_each(|subscriber| {
192				let _ = subscriber.1.manage_operation_state(new_state);
193			});
194
195		Ok(())
196	}
197
198	/// the uuid of the communicator
199	#[must_use]
200	fn uuid(&self) -> String;
201
202	/// the mode of the communicator
203	#[must_use]
204	fn mode(&self) -> &String;
205
206	/// get the default session
207	fn default_session(&self) -> Arc<Session>;
208
209	/// get the session with `id`
210	fn session(&self, id: &str) -> Option<Arc<Session>>;
211
212	/// get all sessions
213	fn sessions(&self) -> Vec<Arc<Session>>;
214}