#![allow(unused_imports)]
#[doc(hidden)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use crate::{error::Error, traits::CommunicatorImplementationMethods};
use alloc::{
borrow::ToOwned,
boxed::Box,
string::{String, ToString},
sync::Arc,
vec::Vec,
};
use core::{fmt::Debug, time::Duration};
use dimas_core::{
Result,
enums::OperationState,
message_types::{Message, QueryableMsg},
traits::Capability,
};
use zenoh::config::WhatAmI;
#[cfg(feature = "unstable")]
use zenoh::sample::Locality;
use zenoh::{
Session, Wait,
query::{ConsolidationMode, QueryTarget},
sample::SampleKind,
};
/// Communicator backed by a zenoh [`Session`].
///
/// Holds the shared session handle together with the string form of the
/// zenoh mode that was determined when the communicator was created.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct Communicator {
/// Shared handle to the zenoh session used for all put/delete/get operations.
session: Arc<Session>,
/// String representation of the zenoh `WhatAmI` mode chosen in `Communicator::new`.
mode: String,
}
impl Capability for Communicator {
/// Handles a change of the operation state.
///
/// This implementation ignores the requested state and always returns `Ok(())`.
fn manage_operation_state(&self, _state: &OperationState) -> Result<()> {
Ok(())
}
}
impl CommunicatorImplementationMethods for Communicator {
#[allow(clippy::needless_pass_by_value)]
fn put(&self, selector: &str, message: Message) -> Result<()> {
self.session
.put(selector, message.value())
.wait()
.map_err(|source| Error::PublishingPut { source }.into())
}
fn delete(&self, selector: &str) -> Result<()> {
self.session
.delete(selector)
.wait()
.map_err(|source| Error::PublishingDelete { source }.into())
}
fn get(
&self,
selector: &str,
message: Option<Message>,
mut callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
) -> Result<()> {
let builder = message
.map_or_else(
|| self.session.get(selector),
|msg| self.session.get(selector).payload(msg.value()),
)
.consolidation(ConsolidationMode::None)
.target(QueryTarget::All);
#[cfg(feature = "unstable")]
let builder = builder.allowed_destination(Locality::Any);
let query = builder
.timeout(Duration::from_millis(5000))
.wait()
.map_err(|source| Error::QueryCreation { source })?;
let mut unreached = true;
let mut retry_count = 0u8;
while unreached && retry_count <= 5 {
retry_count += 1;
while let Ok(reply) = query.recv() {
match reply.result() {
Ok(sample) => match sample.kind() {
SampleKind::Put => {
let content: Vec<u8> = sample.payload().to_bytes().into_owned();
callback.as_deref_mut().map_or_else(
|| Err(Error::NotImplemented),
|callback| {
callback(QueryableMsg(content))
.map_err(|source| Error::QueryCallback { source })
},
)?;
}
SampleKind::Delete => {
todo!("Delete in Query");
}
},
Err(err) => {
let content= err.payload().try_to_string()?;
std::println!(">> Zenoh Communicator received (ERROR: '{:?}' for {})", &content, &selector);
}
}
unreached = false;
}
if unreached {
if retry_count < 5 {
std::thread::sleep(Duration::from_millis(1000));
} else {
return Err(Error::AccessingQueryable {
selector: selector.to_string(),
}
.into());
}
}
}
Ok(())
}
}
impl Communicator {
pub fn new(config: &zenoh::Config) -> Result<Self> {
#[cfg(feature = "unstable")]
let kind = config.mode().unwrap_or(WhatAmI::Peer).to_string();
#[cfg(not(feature = "unstable"))]
let kind = WhatAmI::Peer.to_string();
let session = Arc::new(
zenoh::open(config.to_owned())
.wait()
.map_err(|source| Error::CreateCommunicator { source })?,
);
Ok(Self {
session,
mode: kind,
})
}
#[must_use]
pub fn uuid(&self) -> String {
self.session.zid().to_string()
}
#[must_use]
pub fn session(&self) -> Arc<Session> {
self.session.clone()
}
#[must_use]
pub const fn mode(&self) -> &String {
&self.mode
}
}
#[cfg(test)]
mod tests {
use super::*;
// Compile-time helper: only instantiable for types that are `Sized + Send + Sync`.
const fn is_normal<T: Sized + Send + Sync>() {}
// Verifies at compile time that `Communicator` is `Sized + Send + Sync`.
#[test]
const fn normal_types() {
is_normal::<Communicator>();
}
// Creating a communicator from the default dimas configuration must succeed.
// NOTE(review): the multi-threaded runtime is presumably required by zenoh — confirm.
#[tokio::test(flavor = "multi_thread")]
async fn communicator_create() -> Result<()> {
let cfg = dimas_config::Config::default();
let _peer = Communicator::new(cfg.zenoh_config())?;
Ok(())
}
}