dimas_com/zenoh/
communicator.rs1#![allow(unused_imports)]
3
4#[doc(hidden)]
8extern crate alloc;
9
10#[cfg(feature = "std")]
11extern crate std;
12
13use crate::{error::Error, traits::CommunicatorImplementationMethods};
15use alloc::{
16 borrow::ToOwned,
17 boxed::Box,
18 string::{String, ToString},
19 sync::Arc,
20 vec::Vec,
21};
22use core::{fmt::Debug, time::Duration};
23use dimas_core::{
24 Result,
25 enums::OperationState,
26 message_types::{Message, QueryableMsg},
27 traits::Capability,
28};
29use zenoh::config::WhatAmI;
30#[cfg(feature = "unstable")]
31use zenoh::sample::Locality;
32use zenoh::{
33 Session, Wait,
34 query::{ConsolidationMode, QueryTarget},
35 sample::SampleKind,
36};
37#[allow(clippy::module_name_repetitions)]
42#[derive(Debug)]
43pub struct Communicator {
44 session: Arc<Session>,
46 mode: String,
48}
49
50impl Capability for Communicator {
51 fn manage_operation_state(&self, _state: &OperationState) -> Result<()> {
52 Ok(())
53 }
54}
55
56impl CommunicatorImplementationMethods for Communicator {
57 #[allow(clippy::needless_pass_by_value)]
60 fn put(&self, selector: &str, message: Message) -> Result<()> {
61 self.session
62 .put(selector, message.value())
63 .wait()
64 .map_err(|source| Error::PublishingPut { source }.into())
65 }
66
67 fn delete(&self, selector: &str) -> Result<()> {
70 self.session
71 .delete(selector)
72 .wait()
73 .map_err(|source| Error::PublishingDelete { source }.into())
74 }
75
76 fn get(
81 &self,
82 selector: &str,
83 message: Option<Message>,
84 mut callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
85 ) -> Result<()> {
86 let builder = message
87 .map_or_else(
88 || self.session.get(selector),
89 |msg| self.session.get(selector).payload(msg.value()),
90 )
91 .consolidation(ConsolidationMode::None)
92 .target(QueryTarget::All);
93
94 #[cfg(feature = "unstable")]
95 let builder = builder.allowed_destination(Locality::Any);
96
97 let query = builder
98 .timeout(Duration::from_millis(5000))
99 .wait()
100 .map_err(|source| Error::QueryCreation { source })?;
101
102 let mut unreached = true;
103 let mut retry_count = 0u8;
104
105 while unreached && retry_count <= 5 {
106 retry_count += 1;
107 while let Ok(reply) = query.recv() {
108 match reply.result() {
109 Ok(sample) => match sample.kind() {
110 SampleKind::Put => {
111 let content: Vec<u8> = sample.payload().to_bytes().into_owned();
112 callback.as_deref_mut().map_or_else(
114 || Err(Error::NotImplemented),
115 |callback| {
116 callback(QueryableMsg(content))
117 .map_err(|source| Error::QueryCallback { source })
118 },
119 )?;
120 }
121 SampleKind::Delete => {
122 todo!("Delete in Query");
123 }
124 },
125 Err(err) => {
126 let content= err.payload().try_to_string()?;
127 std::println!(">> Zenoh Communicator received (ERROR: '{:?}' for {})", &content, &selector);
128 }
129 }
130 unreached = false;
131 }
132 if unreached {
133 if retry_count < 5 {
134 std::thread::sleep(Duration::from_millis(1000));
135 } else {
136 return Err(Error::AccessingQueryable {
137 selector: selector.to_string(),
138 }
139 .into());
140 }
141 }
142 }
143 Ok(())
144 }
145}
146
147impl Communicator {
148 pub fn new(config: &zenoh::Config) -> Result<Self> {
151 #[cfg(feature = "unstable")]
152 let kind = config.mode().unwrap_or(WhatAmI::Peer).to_string();
153 #[cfg(not(feature = "unstable"))]
154 let kind = WhatAmI::Peer.to_string();
155 let session = Arc::new(
156 zenoh::open(config.to_owned())
157 .wait()
158 .map_err(|source| Error::CreateCommunicator { source })?,
159 );
160 Ok(Self {
161 session,
162 mode: kind,
163 })
164 }
165
166 #[must_use]
168 pub fn uuid(&self) -> String {
169 self.session.zid().to_string()
170 }
171
172 #[must_use]
174 pub fn session(&self) -> Arc<Session> {
175 self.session.clone()
176 }
177
178 #[must_use]
180 pub const fn mode(&self) -> &String {
181 &self.mode
182 }
183}
184#[cfg(test)]
187mod tests {
188 use super::*;
189 const fn is_normal<T: Sized + Send + Sync>() {}
193
194 #[test]
195 const fn normal_types() {
196 is_normal::<Communicator>();
197 }
198
199 #[tokio::test(flavor = "multi_thread")]
200 async fn communicator_create() -> Result<()> {
202 let cfg = dimas_config::Config::default();
203 let _peer = Communicator::new(cfg.zenoh_config())?;
204 Ok(())
205 }
206}