dimas_com/traits/
communicator.rs

1// Copyright © 2024 Stephan Kunz
2
3//! Traits for communication.
4//!
5
6#[doc(hidden)]
7extern crate alloc;
8
9#[cfg(feature = "std")]
10extern crate std;
11
12// region:		--- modules
13use super::LivelinessSubscriber;
14use super::{CommunicatorMethods, Observer, Publisher, Querier, Responder};
15use crate::error::Error;
16use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
17use dimas_core::{enums::OperationState, error::Result, traits::Capability};
18#[cfg(feature = "std")]
19use std::{collections::HashMap, sync::RwLock};
20use tracing::error;
21use zenoh::Session;
22// endregion:	--- modules
23
24// region:		--- Communicator
25/// the methodes to be implemented by any communicator
26pub trait Communicator: Capability + CommunicatorMethods + Send + Sync {
27	/// Get the liveliness subscribers
28	#[must_use]
29	fn liveliness_subscribers(&self)
30	-> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>;
31
32	/// Get the observers
33	#[must_use]
34	fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>>;
35
36	/// Get the publishers
37	#[must_use]
38	fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>;
39
40	/// Get the queriers
41	#[must_use]
42	fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>>;
43
44	/// Get the responders
45	#[must_use]
46	fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>>;
47
48	/// Method for upgrading [`OperationState`] all registered capabilities
49	///
50	/// The capabilities are upgraded in the order
51	/// - [`LivelinessSubscriber`]s
52	/// - [`Responder`]: `Observable`, `Queryable`, `Subscriber`
53	/// - [`Publisher`]s the
54	/// - [`Observer`]s and the
55	/// - [`Querier`]s
56	///
57	/// # Errors
58	/// Currently none
59	fn upgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
60		// start liveliness subscriber
61		self.liveliness_subscribers()
62			.write()
63			.map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
64			.iter_mut()
65			.for_each(|subscriber| {
66				let _ = subscriber.1.manage_operation_state(new_state);
67			});
68
69		// start all registered responders
70		self.responders()
71			.write()
72			.map_err(|_| Error::ModifyStruct("subscribers".into()))?
73			.iter_mut()
74			.for_each(|subscriber| {
75				let _ = subscriber.1.manage_operation_state(new_state);
76			});
77
78		// init all registered publishers
79		self.publishers()
80			.write()
81			.map_err(|_| Error::ModifyStruct("publishers".into()))?
82			.iter_mut()
83			.for_each(|publisher| {
84				if let Err(reason) = publisher.1.manage_operation_state(new_state) {
85					error!(
86						"could not initialize publisher for {}, reason: {}",
87						publisher.1.selector(),
88						reason
89					);
90				}
91			});
92
93		// init all registered observers
94		self.observers()
95			.write()
96			.map_err(|_| Error::ModifyStruct("observers".into()))?
97			.iter_mut()
98			.for_each(|observer| {
99				if let Err(reason) = observer.1.manage_operation_state(new_state) {
100					error!(
101						"could not initialize observer for {}, reason: {}",
102						observer.1.selector(),
103						reason
104					);
105				}
106			});
107
108		// init all registered queries
109		self.queriers()
110			.write()
111			.map_err(|_| Error::ModifyStruct("queries".into()))?
112			.iter_mut()
113			.for_each(|query| {
114				if let Err(reason) = query.1.manage_operation_state(new_state) {
115					error!(
116						"could not initialize query for {}, reason: {}",
117						query.1.selector(),
118						reason
119					);
120				}
121			});
122
123		Ok(())
124	}
125
126	/// Method for downgrading [`OperationState`] all registered capabilities
127	///
128	/// The capabilities are downgraded in reverse order of their start in [`Communicator::upgrade_capabilities`]
129	///
130	/// # Errors
131	/// Currently none
132	fn downgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
133		// reverse order of start!
134		// de-init all registered queries
135		self.queriers()
136			.write()
137			.map_err(|_| Error::ModifyStruct("queries".into()))?
138			.iter_mut()
139			.for_each(|query| {
140				if let Err(reason) = query.1.manage_operation_state(new_state) {
141					error!(
142						"could not de-initialize query for {}, reason: {}",
143						query.1.selector(),
144						reason
145					);
146				}
147			});
148
149		// de-init all registered observers
150		self.observers()
151			.write()
152			.map_err(|_| Error::ModifyStruct("observers".into()))?
153			.iter_mut()
154			.for_each(|observer| {
155				if let Err(reason) = observer.1.manage_operation_state(new_state) {
156					error!(
157						"could not de-initialize observer for {}, reason: {}",
158						observer.1.selector(),
159						reason
160					);
161				}
162			});
163
164		// de-init all registered publishers
165		self.publishers()
166			.write()
167			.map_err(|_| Error::ModifyStruct("publishers".into()))?
168			.iter_mut()
169			.for_each(|publisher| {
170				let _ = publisher.1.manage_operation_state(new_state);
171			});
172
173		// stop all registered responders
174		self.responders()
175			.write()
176			.map_err(|_| Error::ModifyStruct("subscribers".into()))?
177			.iter_mut()
178			.for_each(|subscriber| {
179				let _ = subscriber.1.manage_operation_state(new_state);
180			});
181
182		// stop all registered liveliness subscribers
183		self.liveliness_subscribers()
184			.write()
185			.map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
186			.iter_mut()
187			.for_each(|subscriber| {
188				let _ = subscriber.1.manage_operation_state(new_state);
189			});
190
191		Ok(())
192	}
193
194	/// the uuid of the communicator
195	#[must_use]
196	fn uuid(&self) -> String;
197
198	/// the mode of the communicator
199	#[must_use]
200	fn mode(&self) -> &String;
201
202	/// get the default session
203	fn default_session(&self) -> Arc<Session>;
204
205	/// get the session with `id`
206	fn session(&self, id: &str) -> Option<Arc<Session>>;
207
208	/// get all sessions
209	fn sessions(&self) -> Vec<Arc<Session>>;
210}