use dimas_core::{
error::{DimasError, Result},
message_types::{Message, QueryableMsg},
};
use std::fmt::Debug;
use std::sync::Arc;
use zenoh::{
config::WhatAmI,
query::{ConsolidationMode, QueryTarget},
sample::{Locality, SampleKind},
Session, Wait,
};
/// Communication endpoint built on top of a zenoh [`Session`].
///
/// Holds the shared session handle together with the mode string that was
/// determined from the configuration when the communicator was created.
#[derive(Debug)]
pub struct Communicator {
/// Shared handle to the opened zenoh session.
session: Arc<Session>,
/// String form of the zenoh mode (`WhatAmI`) used when opening the session.
mode: String,
}
impl Communicator {
    /// Creates a communicator by opening a zenoh session with the zenoh
    /// configuration obtained from `config`.
    ///
    /// The configured mode is remembered as a string; if the configuration
    /// carries no mode, `WhatAmI::Peer` is used for that string.
    ///
    /// # Errors
    /// Returns [`DimasError::CreateSession`] wrapping the underlying error
    /// when the session cannot be opened.
    pub fn new(config: &dimas_config::Config) -> Result<Self> {
        let zenoh_cfg = config.zenoh_config();
        // The mode has to be read before the configuration is moved into `zenoh::open`.
        let mode = zenoh_cfg.mode().unwrap_or(WhatAmI::Peer).to_string();
        let session = zenoh::open(zenoh_cfg)
            .wait()
            .map(Arc::new)
            .map_err(DimasError::CreateSession)?;
        Ok(Self { session, mode })
    }

    /// Returns the zenoh id (`zid`) of the session rendered as a string.
    #[must_use]
    pub fn uuid(&self) -> String {
        let zid = self.session.zid();
        zid.to_string()
    }

    /// Returns another shared handle to the underlying zenoh session.
    #[must_use]
    pub fn session(&self) -> Arc<Session> {
        Arc::clone(&self.session)
    }

    /// Returns the mode string captured when the communicator was created.
    #[must_use]
    pub const fn mode(&self) -> &String {
        &self.mode
    }

    /// Sends a put for `selector` carrying the value of `message` and waits
    /// for the operation to finish.
    ///
    /// # Errors
    /// Returns [`DimasError::Put`] when the operation fails; the underlying
    /// error is discarded.
    // `message` is taken by value although only its `value()` is read here.
    #[allow(clippy::needless_pass_by_value)]
    pub fn put(&self, selector: &str, message: Message) -> Result<()> {
        let outcome = self.session.put(selector, message.value()).wait();
        match outcome {
            Ok(()) => Ok(()),
            Err(_) => Err(DimasError::Put.into()),
        }
    }

    /// Sends a delete for `selector` and waits for the operation to finish.
    ///
    /// # Errors
    /// Returns [`DimasError::Delete`] when the operation fails; the underlying
    /// error is discarded.
    pub fn delete(&self, selector: &str) -> Result<()> {
        let outcome = self.session.delete(selector).wait();
        match outcome {
            Ok(()) => Ok(()),
            Err(_) => Err(DimasError::Delete.into()),
        }
    }

    /// Issues a query for `selector`, optionally attaching the value of
    /// `message` as payload, and feeds every `Put` reply to `callback`.
    ///
    /// The query is sent with `ConsolidationMode::None`, `QueryTarget::All`
    /// and `Locality::Any`. Replies are received in a blocking loop until the
    /// reply channel reports an error on `recv`. `Delete` samples and error
    /// replies are only printed to stdout.
    ///
    /// # Errors
    /// Returns [`DimasError::ShouldNotHappen`] when the query cannot be
    /// issued, or the first error returned by `callback`.
    pub fn get<F>(&self, selector: &str, message: Option<Message>, mut callback: F) -> Result<()>
    where
        F: FnMut(QueryableMsg) -> Result<()> + Sized,
    {
        // Build the query with or without a payload, then apply the common options.
        let query = match message {
            Some(msg) => self.session.get(selector).payload(msg.value()),
            None => self.session.get(selector),
        };
        let replies = query
            .consolidation(ConsolidationMode::None)
            .target(QueryTarget::All)
            .allowed_destination(Locality::Any)
            .wait()
            .map_err(|_| DimasError::ShouldNotHappen)?;

        while let Ok(reply) = replies.recv() {
            match reply.result() {
                Err(failure) => {
                    let bytes: Vec<u8> = failure.payload().into();
                    println!(">> Received (ERROR: '{:?}' for {})", &bytes, &selector);
                }
                Ok(sample) => match sample.kind() {
                    SampleKind::Delete => {
                        println!("Delete in Query");
                    }
                    SampleKind::Put => {
                        let bytes: Vec<u8> = sample.payload().into();
                        // A failing callback aborts the whole query.
                        callback(QueryableMsg(bytes))?;
                    }
                },
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only for types that are `Sized`, `Send`, `Sync` and `Unpin`.
    const fn is_normal<T>()
    where
        T: Sized + Send + Sync + Unpin,
    {
    }

    /// `Communicator` has to satisfy the bounds checked by `is_normal`.
    #[test]
    const fn normal_types() {
        is_normal::<Communicator>();
    }

    /// Checks that creating a communicator from the default configuration succeeds.
    #[tokio::test(flavor = "multi_thread")]
    async fn communicator_create() -> Result<()> {
        let config = dimas_config::Config::default();
        let _communicator = Communicator::new(&config)?;
        Ok(())
    }
}