#[doc(hidden)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use super::LivelinessSubscriber;
use super::{CommunicatorMethods, Observer, Publisher, Querier, Responder};
use crate::error::Error;
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use dimas_core::{enums::OperationState, error::Result, traits::Capability};
#[cfg(feature = "std")]
use std::{collections::HashMap, sync::RwLock};
use tracing::error;
use zenoh::Session;
pub trait Communicator: Capability + CommunicatorMethods + Send + Sync {
#[must_use]
fn liveliness_subscribers(&self)
-> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>;
#[must_use]
fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>>;
#[must_use]
fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>;
#[must_use]
fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>>;
#[must_use]
fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>>;
fn upgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
self.liveliness_subscribers()
.write()
.map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
.iter_mut()
.for_each(|subscriber| {
let _ = subscriber.1.manage_operation_state(new_state);
});
self.responders()
.write()
.map_err(|_| Error::ModifyStruct("subscribers".into()))?
.iter_mut()
.for_each(|subscriber| {
let _ = subscriber.1.manage_operation_state(new_state);
});
self.publishers()
.write()
.map_err(|_| Error::ModifyStruct("publishers".into()))?
.iter_mut()
.for_each(|publisher| {
if let Err(reason) = publisher.1.manage_operation_state(new_state) {
error!(
"could not initialize publisher for {}, reason: {}",
publisher.1.selector(),
reason
);
}
});
self.observers()
.write()
.map_err(|_| Error::ModifyStruct("observers".into()))?
.iter_mut()
.for_each(|observer| {
if let Err(reason) = observer.1.manage_operation_state(new_state) {
error!(
"could not initialize observer for {}, reason: {}",
observer.1.selector(),
reason
);
}
});
self.queriers()
.write()
.map_err(|_| Error::ModifyStruct("queries".into()))?
.iter_mut()
.for_each(|query| {
if let Err(reason) = query.1.manage_operation_state(new_state) {
error!(
"could not initialize query for {}, reason: {}",
query.1.selector(),
reason
);
}
});
Ok(())
}
fn downgrade_capabilities(&self, new_state: &OperationState) -> Result<()> {
self.queriers()
.write()
.map_err(|_| Error::ModifyStruct("queries".into()))?
.iter_mut()
.for_each(|query| {
if let Err(reason) = query.1.manage_operation_state(new_state) {
error!(
"could not de-initialize query for {}, reason: {}",
query.1.selector(),
reason
);
}
});
self.observers()
.write()
.map_err(|_| Error::ModifyStruct("observers".into()))?
.iter_mut()
.for_each(|observer| {
if let Err(reason) = observer.1.manage_operation_state(new_state) {
error!(
"could not de-initialize observer for {}, reason: {}",
observer.1.selector(),
reason
);
}
});
self.publishers()
.write()
.map_err(|_| Error::ModifyStruct("publishers".into()))?
.iter_mut()
.for_each(|publisher| {
let _ = publisher.1.manage_operation_state(new_state);
});
self.responders()
.write()
.map_err(|_| Error::ModifyStruct("subscribers".into()))?
.iter_mut()
.for_each(|subscriber| {
let _ = subscriber.1.manage_operation_state(new_state);
});
self.liveliness_subscribers()
.write()
.map_err(|_| Error::ModifyStruct("liveliness subscribers".into()))?
.iter_mut()
.for_each(|subscriber| {
let _ = subscriber.1.manage_operation_state(new_state);
});
Ok(())
}
#[must_use]
fn uuid(&self) -> String;
#[must_use]
fn mode(&self) -> &String;
fn default_session(&self) -> Arc<Session>;
fn session(&self, id: &str) -> Option<Arc<Session>>;
fn sessions(&self) -> Vec<Arc<Session>>;
}