communication_layer_pub_sub/
zenoh.rs1use super::{CommunicationLayer, Publisher, Subscriber};
4use crate::{BoxError, ReceivedSample};
5use std::{borrow::Cow, sync::Arc, time::Duration};
6use zenoh::{
7 prelude::{sync::SyncResolve, Config, Priority, SessionDeclarations, SplitBuffer},
8 publication::CongestionControl,
9};
10
11pub struct ZenohCommunicationLayer {
13 zenoh: Arc<zenoh::Session>,
14 topic_prefix: String,
15}
16
17impl ZenohCommunicationLayer {
18 pub fn init(config: Config, prefix: String) -> Result<Self, BoxError> {
24 let zenoh = ::zenoh::open(config)
25 .res_sync()
26 .map_err(BoxError::from)?
27 .into_arc();
28 Ok(Self {
29 zenoh,
30 topic_prefix: prefix,
31 })
32 }
33
34 fn prefixed(&self, topic: &str) -> String {
35 format!("{}/{topic}", self.topic_prefix)
36 }
37}
38
39impl CommunicationLayer for ZenohCommunicationLayer {
40 fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError> {
41 let publisher = self
42 .zenoh
43 .declare_publisher(self.prefixed(topic))
44 .congestion_control(CongestionControl::Block)
45 .priority(Priority::RealTime)
46 .res_sync()
47 .map_err(BoxError::from)?;
48
49 Ok(Box::new(ZenohPublisher { publisher }))
50 }
51
52 fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError> {
53 let subscriber = self
54 .zenoh
55 .declare_subscriber(self.prefixed(topic))
56 .reliable()
57 .res_sync()
58 .map_err(BoxError::from)?;
59
60 Ok(Box::new(ZenohReceiver(subscriber)))
61 }
62}
63
64impl Drop for ZenohCommunicationLayer {
65 fn drop(&mut self) {
66 std::thread::sleep(Duration::from_secs_f32(2.0));
72 }
73}
74
75#[derive(Clone)]
76struct ZenohPublisher {
77 publisher: zenoh::publication::Publisher<'static>,
78}
79
80impl Publisher for ZenohPublisher {
81 fn prepare(&self, len: usize) -> Result<Box<dyn crate::PublishSample>, BoxError> {
82 Ok(Box::new(ZenohPublishSample {
83 sample: vec![0; len],
84 publisher: self.publisher.clone(),
85 }))
86 }
87
88 fn dyn_clone(&self) -> Box<dyn Publisher> {
89 Box::new(self.clone())
90 }
91}
92
93#[derive(Clone)]
94struct ZenohPublishSample {
95 sample: Vec<u8>,
96 publisher: zenoh::publication::Publisher<'static>,
97}
98
99impl<'a> crate::PublishSample<'a> for ZenohPublishSample {
100 fn as_mut_slice(&mut self) -> &mut [u8] {
101 &mut self.sample
102 }
103
104 fn publish(self: Box<Self>) -> Result<(), BoxError> {
105 self.publisher
106 .put(self.sample)
107 .res_sync()
108 .map_err(BoxError::from)
109 }
110}
111
112struct ZenohReceiver(
113 zenoh::subscriber::Subscriber<'static, flume::Receiver<zenoh::sample::Sample>>,
114);
115
116impl Subscriber for ZenohReceiver {
117 fn recv(&mut self) -> Result<Option<Box<dyn ReceivedSample>>, BoxError> {
118 match self.0.recv() {
119 Ok(sample) => Ok(Some(Box::new(ZenohReceivedSample {
120 sample: sample.value.payload,
121 }))),
122 Err(_) => Ok(None),
123 }
124 }
125}
126
127struct ZenohReceivedSample {
128 sample: zenoh::buffers::ZBuf,
129}
130
131impl ReceivedSample for ZenohReceivedSample {
132 fn get(&self) -> Cow<[u8]> {
133 self.sample.contiguous()
134 }
135}