1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright © 2023 Stephan Kunz

//! The `Communicator` wraps a zenoh session and provides the ad hoc
//! communication primitives (`put`, `delete` and `get`) on top of it.

// region:		--- modules
use dimas_core::{
	error::{DimasError, Result},
	message_types::{Message, QueryableMsg},
};
use std::fmt::Debug;
use std::sync::Arc;
use zenoh::{
	config::WhatAmI,
	query::{ConsolidationMode, QueryTarget},
	sample::{Locality, SampleKind},
	Session, Wait,
};
// endregion:	--- modules

// region:		--- Communicator
/// [`Communicator`] handles all communication aspects.
///
/// It owns the zenoh [`Session`] and remembers the mode that session
/// was opened with.
#[derive(Debug)]
pub struct Communicator {
	/// The zenoh session, held in an [`Arc`] so that `session()` can hand out shared handles
	session: Arc<Session>,
	/// Mode of the session (router|peer|client), stored in its string form
	mode: String,
}

impl Communicator {
	/// Creates a [`Communicator`] by opening a zenoh session with the zenoh
	/// part of the given `config`.
	///
	/// The session mode is read from the zenoh configuration; if none is set,
	/// [`WhatAmI::Peer`] is used for the stored mode string.
	///
	/// # Errors
	/// Returns [`DimasError::CreateSession`] wrapping the zenoh error if the
	/// session cannot be opened.
	pub fn new(config: &dimas_config::Config) -> Result<Self> {
		let zenoh_cfg = config.zenoh_config();
		// read the mode before the configuration is moved into `zenoh::open`
		let mode = zenoh_cfg.mode().unwrap_or(WhatAmI::Peer).to_string();
		let session = zenoh::open(zenoh_cfg)
			.wait()
			.map(Arc::new)
			.map_err(DimasError::CreateSession)?;
		Ok(Self { session, mode })
	}

	/// Returns the zenoh id of the session as a `String`.
	#[must_use]
	pub fn uuid(&self) -> String {
		let zid = self.session.zid();
		zid.to_string()
	}

	/// Returns another shared handle to the zenoh session.
	#[must_use]
	pub fn session(&self) -> Arc<Session> {
		Arc::clone(&self.session)
	}

	/// Returns the mode string the session was created with.
	#[must_use]
	pub const fn mode(&self) -> &String {
		&self.mode
	}

	/// Sends an ad hoc put of `message` to the given `selector`.
	///
	/// # Errors
	/// Returns [`DimasError::Put`] if the zenoh put operation fails.
	// `message` is only read via `value()`; the by-value parameter is part of the public signature.
	#[allow(clippy::needless_pass_by_value)]
	pub fn put(&self, selector: &str, message: Message) -> Result<()> {
		self.session
			.put(selector, message.value())
			.wait()
			.map_err(|_| DimasError::Put)?;
		Ok(())
	}

	/// Sends an ad hoc delete to the given `selector`.
	///
	/// # Errors
	/// Returns [`DimasError::Delete`] if the zenoh delete operation fails.
	pub fn delete(&self, selector: &str) -> Result<()> {
		self.session
			.delete(selector)
			.wait()
			.map_err(|_| DimasError::Delete)?;
		Ok(())
	}

	/// Sends an ad hoc query to the given `selector`, optionally carrying a
	/// [`Message`] as payload.
	///
	/// Every `Put` reply is converted into a [`QueryableMsg`] and handed to
	/// `callback`. `Delete` replies and error replies are only printed to stdout.
	/// Replies are received until the reply channel reports an error, which
	/// ends the loop.
	///
	/// # Errors
	/// - [`DimasError::ShouldNotHappen`] if the query cannot be issued.
	/// - Any error returned by `callback`; it aborts the processing of further replies.
	pub fn get<F>(&self, selector: &str, message: Option<Message>, mut callback: F) -> Result<()>
	where
		F: FnMut(QueryableMsg) -> Result<()> + Sized,
	{
		// start with a plain query and attach the payload only if a message was given
		let mut query = self.session.get(selector);
		if let Some(msg) = message {
			query = query.payload(msg.value());
		}

		// NOTE: no explicit `.timeout(..)` is configured on the query
		let replies = query
			.consolidation(ConsolidationMode::None)
			.target(QueryTarget::All)
			.allowed_destination(Locality::Any)
			.wait()
			.map_err(|_| DimasError::ShouldNotHappen)?;

		while let Ok(reply) = replies.recv() {
			// error replies are reported and skipped
			let sample = match reply.result() {
				Ok(sample) => sample,
				Err(err) => {
					let content: Vec<u8> = err.payload().into();
					println!(">> Received (ERROR: '{:?}' for {})", &content, &selector);
					continue;
				}
			};

			match sample.kind() {
				SampleKind::Put => {
					let content: Vec<u8> = sample.payload().into();
					callback(QueryableMsg(content))?;
				}
				SampleKind::Delete => {
					println!("Delete in Query");
				}
			}
		}
		Ok(())
	}
}
// endregion:	--- Communicator

#[cfg(test)]
mod tests {
	use super::*;

	/// Compiles only for types that provide the usual auto traits.
	const fn is_normal<T>()
	where
		T: Sized + Send + Sync + Unpin,
	{
	}

	/// [`Communicator`] must be `Send`, `Sync` and `Unpin`.
	#[test]
	const fn normal_types() {
		is_normal::<Communicator>();
	}

	/// A [`Communicator`] can be created from the default configuration.
	#[tokio::test(flavor = "multi_thread")]
	async fn communicator_create() -> Result<()> {
		let config = dimas_config::Config::default();
		let _communicator = Communicator::new(&config)?;
		Ok(())
	}
}