dimas_com/zenoh/
communicator.rs

1// Copyright © 2023 Stephan Kunz
2#![allow(unused_imports)]
3
4//! Implements the zenoh communication capabilities.
5//!
6
7#[doc(hidden)]
8extern crate alloc;
9
10#[cfg(feature = "std")]
11extern crate std;
12
13// region:		--- modules
14use crate::{error::Error, traits::CommunicatorImplementationMethods};
15use alloc::{
16	borrow::ToOwned,
17	boxed::Box,
18	string::{String, ToString},
19	sync::Arc,
20	vec::Vec,
21};
22use core::{fmt::Debug, time::Duration};
23use dimas_core::{
24	Result,
25	enums::OperationState,
26	message_types::{Message, QueryableMsg},
27	traits::Capability,
28};
29use zenoh::config::WhatAmI;
30#[cfg(feature = "unstable")]
31use zenoh::sample::Locality;
32use zenoh::{
33	Session, Wait,
34	query::{ConsolidationMode, QueryTarget},
35	sample::SampleKind,
36};
37// endregion:	--- modules
38
39// region:		--- Communicator
40/// [`Communicator`] handles all communication aspects
41#[allow(clippy::module_name_repetitions)]
42#[derive(Debug)]
43pub struct Communicator {
44	/// The zenoh session
45	session: Arc<Session>,
46	/// Mode of the session (router|peer|client)
47	mode: String,
48}
49
50impl Capability for Communicator {
51	fn manage_operation_state(&self, _state: &OperationState) -> Result<()> {
52		Ok(())
53	}
54}
55
56impl CommunicatorImplementationMethods for Communicator {
57	/// Send a put message [`Message`] using the given `selector`
58	/// # Errors
59	#[allow(clippy::needless_pass_by_value)]
60	fn put(&self, selector: &str, message: Message) -> Result<()> {
61		self.session
62			.put(selector, message.value())
63			.wait()
64			.map_err(|source| Error::PublishingPut { source }.into())
65	}
66
67	/// Send a delete message using the given `selector`.
68	/// # Errors
69	fn delete(&self, selector: &str) -> Result<()> {
70		self.session
71			.delete(selector)
72			.wait()
73			.map_err(|source| Error::PublishingDelete { source }.into())
74	}
75
76	/// Send a query with an optional [`Message`] using the given `selector`.
77	/// Answers are collected via callback
78	/// # Errors
79	/// # Panics
80	fn get(
81		&self,
82		selector: &str,
83		message: Option<Message>,
84		mut callback: Option<&mut dyn FnMut(QueryableMsg) -> Result<()>>,
85	) -> Result<()> {
86		let builder = message
87			.map_or_else(
88				|| self.session.get(selector),
89				|msg| self.session.get(selector).payload(msg.value()),
90			)
91			.consolidation(ConsolidationMode::None)
92			.target(QueryTarget::All);
93
94		#[cfg(feature = "unstable")]
95		let builder = builder.allowed_destination(Locality::Any);
96
97		let query = builder
98			.timeout(Duration::from_millis(5000))
99			.wait()
100			.map_err(|source| Error::QueryCreation { source })?;
101
102		let mut unreached = true;
103		let mut retry_count = 0u8;
104
105		while unreached && retry_count <= 5 {
106			retry_count += 1;
107			while let Ok(reply) = query.recv() {
108				match reply.result() {
109					Ok(sample) => match sample.kind() {
110						SampleKind::Put => {
111							let content: Vec<u8> = sample.payload().to_bytes().into_owned();
112							// CommunicatorImplementation::Zenoh(zenoh) =>
113							callback.as_deref_mut().map_or_else(
114								|| Err(Error::NotImplemented),
115								|callback| {
116									callback(QueryableMsg(content))
117										.map_err(|source| Error::QueryCallback { source })
118								},
119							)?;
120						}
121						SampleKind::Delete => {
122							todo!("Delete in Query");
123						}
124					},
125					Err(err) => {
126						let content= err.payload().try_to_string()?;
127						std::println!(">> Zenoh Communicator received (ERROR: '{:?}' for {})", &content, &selector);
128					}
129				}
130				unreached = false;
131			}
132			if unreached {
133				if retry_count < 5 {
134					std::thread::sleep(Duration::from_millis(1000));
135				} else {
136					return Err(Error::AccessingQueryable {
137						selector: selector.to_string(),
138					}
139					.into());
140				}
141			}
142		}
143		Ok(())
144	}
145}
146
147impl Communicator {
148	/// Constructor
149	/// # Errors
150	pub fn new(config: &zenoh::Config) -> Result<Self> {
151		#[cfg(feature = "unstable")]
152		let kind = config.mode().unwrap_or(WhatAmI::Peer).to_string();
153		#[cfg(not(feature = "unstable"))]
154		let kind = WhatAmI::Peer.to_string();
155		let session = Arc::new(
156			zenoh::open(config.to_owned())
157				.wait()
158				.map_err(|source| Error::CreateCommunicator { source })?,
159		);
160		Ok(Self {
161			session,
162			mode: kind,
163		})
164	}
165
166	/// Get globally unique ID
167	#[must_use]
168	pub fn uuid(&self) -> String {
169		self.session.zid().to_string()
170	}
171
172	/// Get session reference
173	#[must_use]
174	pub fn session(&self) -> Arc<Session> {
175		self.session.clone()
176	}
177
178	/// Get session mode
179	#[must_use]
180	pub const fn mode(&self) -> &String {
181		&self.mode
182	}
183}
184// endregion:	--- Communicator
185
186#[cfg(test)]
187mod tests {
188	use super::*;
189	//use serial_test::serial;
190
191	// check, that the auto traits are available
192	const fn is_normal<T: Sized + Send + Sync>() {}
193
194	#[test]
195	const fn normal_types() {
196		is_normal::<Communicator>();
197	}
198
199	#[tokio::test(flavor = "multi_thread")]
200	//#[serial]
201	async fn communicator_create() -> Result<()> {
202		let cfg = dimas_config::Config::default();
203		let _peer = Communicator::new(cfg.zenoh_config())?;
204		Ok(())
205	}
206}