#[doc(hidden)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use crate::error::Error;
use crate::traits::LivelinessSubscriber;
use crate::{
enums::CommunicatorImplementation,
traits::{
Communicator, CommunicatorImplementationMethods, CommunicatorMethods, Observer, Publisher,
Querier, Responder,
},
};
use alloc::{
boxed::Box,
string::{String, ToString},
sync::Arc,
vec::Vec,
};
use dimas_config::Config;
use dimas_core::message_types::{Message, QueryableMsg};
use dimas_core::{Result, enums::OperationState, traits::Capability};
use std::{collections::HashMap, sync::RwLock};
use zenoh::{Session, config::ZenohId};
const INITIAL_SIZE: usize = 9;
const DEFAULT: &str = "default";
#[derive(Debug)]
pub struct MultiCommunicator {
uuid: ZenohId,
mode: String,
state: OperationState,
communicators: Arc<RwLock<HashMap<String, Arc<CommunicatorImplementation>>>>,
liveliness_subscribers: Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>>,
observers: Arc<RwLock<HashMap<String, Box<dyn Observer>>>>,
publishers: Arc<RwLock<HashMap<String, Box<dyn Publisher>>>>,
queriers: Arc<RwLock<HashMap<String, Box<dyn Querier>>>>,
responders: Arc<RwLock<HashMap<String, Box<dyn Responder>>>>,
}
impl Capability for MultiCommunicator {
fn manage_operation_state(&self, new_state: &OperationState) -> Result<()> {
if new_state >= &self.state {
self.upgrade_capabilities(new_state)?;
} else if new_state < &self.state {
self.downgrade_capabilities(new_state)?;
}
Ok(())
}
}
impl CommunicatorMethods for MultiCommunicator {
fn put(&self, selector: &str, message: Message) -> Result<()> {
let publishers = self
.publishers
.read()
.map_err(|_| Error::ReadAccess("publishers".into()))?;
#[allow(clippy::single_match_else)]
match publishers.get(selector) {
Some(publisher) => publisher.put(message),
None => {
let comm = self
.communicators
.read()
.map_err(|_| Error::ReadAccess("publishers".into()))?
.get(DEFAULT)
.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
.cloned()?;
comm.put(selector, message)
}
}
}
fn delete(&self, selector: &str) -> Result<()> {
let publishers = self
.publishers
.read()
.map_err(|_| Error::ReadAccess("publishers".into()))?;
#[allow(clippy::single_match_else)]
match publishers.get(selector) {
Some(publisher) => publisher.delete(),
None => {
let comm = self
.communicators
.read()
.map_err(|_| Error::ReadAccess("publishers".into()))?
.get(DEFAULT)
.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
.cloned()?;
#[allow(clippy::match_wildcard_for_single_variants)]
match comm.as_ref() {
CommunicatorImplementation::Zenoh(zenoh) => zenoh.delete(selector),
}
}
}
}
fn get(
&self,
selector: &str,
message: Option<Message>,
callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
) -> Result<()> {
let queriers = self
.queriers
.read()
.map_err(|_| Error::ReadAccess("queriers".into()))?;
#[allow(clippy::single_match_else)]
match queriers.get(selector) {
Some(querier) => querier.get(message, callback),
None => {
let comm = self
.communicators
.read()
.map_err(|_| Error::ReadAccess("queriers".into()))?
.get(DEFAULT)
.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
.cloned()?;
match comm.as_ref() {
CommunicatorImplementation::Zenoh(zenoh) => {
zenoh.get(selector, message, callback)
}
}
}
}
}
fn observe(&self, selector: &str, message: Option<Message>) -> Result<()> {
let observers = self
.observers
.read()
.map_err(|_| Error::ReadAccess("observers".into()))?;
#[allow(clippy::single_match_else)]
match observers.get(selector) {
Some(observer) => observer.request(message),
None => {
let comm = self
.communicators
.read()
.map_err(|_| Error::ReadAccess("observers".into()))?
.get(DEFAULT)
.ok_or_else(|| Error::NoCommunicator(DEFAULT.into()))
.cloned()?;
#[allow(clippy::match_wildcard_for_single_variants)]
match comm.as_ref() {
CommunicatorImplementation::Zenoh(_zenoh) => Err(Error::NotImplemented.into()),
}
}
}
}
fn watch(&self, _selector: &str, _message: Message) -> Result<()> {
Err(Error::NotImplemented.into())
}
}
impl Communicator for MultiCommunicator {
fn uuid(&self) -> String {
self.uuid.to_string()
}
fn mode(&self) -> &String {
&self.mode
}
fn liveliness_subscribers(
&self,
) -> Arc<RwLock<HashMap<String, Box<dyn LivelinessSubscriber>>>> {
self.liveliness_subscribers.clone()
}
fn observers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Observer>>>> {
self.observers.clone()
}
fn publishers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Publisher>>>> {
self.publishers.clone()
}
fn queriers(&self) -> Arc<RwLock<HashMap<String, Box<dyn Querier>>>> {
self.queriers.clone()
}
fn responders(&self) -> Arc<RwLock<HashMap<String, Box<dyn Responder>>>> {
self.responders.clone()
}
fn default_session(&self) -> Arc<Session> {
let com = self
.communicators
.read()
.expect("snh")
.get(DEFAULT)
.cloned()
.expect("snh");
match com.as_ref() {
CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
}
}
fn session(&self, id: &str) -> Option<Arc<zenoh::Session>> {
let com = self
.communicators
.read()
.expect("snh")
.get(id)
.cloned()
.expect("snh");
match com.as_ref() {
CommunicatorImplementation::Zenoh(communicator) => {
let com = communicator.session();
Some(com)
}
}
}
fn sessions(&self) -> Vec<Arc<Session>> {
let com: Vec<Arc<Session>> = self
.communicators
.read()
.expect("snh")
.values()
.map(|com| match com.as_ref() {
CommunicatorImplementation::Zenoh(communicator) => communicator.session(),
})
.collect();
com
}
}
impl MultiCommunicator {
pub fn new(config: &Config) -> Result<Self> {
let zenoh = crate::zenoh::Communicator::new(config.zenoh_config())?;
let uuid = zenoh.session().zid();
let mode = zenoh.mode().clone();
let com = Self {
uuid,
mode,
state: OperationState::Created,
communicators: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
liveliness_subscribers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
observers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
publishers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
queriers: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
responders: Arc::new(RwLock::new(HashMap::with_capacity(INITIAL_SIZE))),
};
com.communicators
.write()
.map_err(|_| Error::ModifyStruct("commmunicators".into()))?
.insert(
"default".to_string(),
Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
);
if let Some(sessions) = config.sessions() {
for session in sessions {
match session.protocol.as_str() {
"zenoh" => {
let zenoh = crate::zenoh::Communicator::new(&session.config)?;
com.communicators
.write()
.map_err(|_| Error::ModifyStruct("commmunicators".into()))?
.insert(
session.name.clone(),
Arc::new(CommunicatorImplementation::Zenoh(zenoh)),
);
}
_ => {
return Err(Error::UnknownProtocol {
protocol: session.protocol.clone(),
}
.into());
}
}
}
}
Ok(com)
}
}
#[cfg(test)]
mod tests {
use super::*;
const fn is_normal<T: Sized + Send + Sync>() {}
#[test]
const fn normal_types() {
is_normal::<MultiCommunicator>();
}
}