communication_layer_pub_sub/
zenoh.rs

1//! Provides [`ZenohCommunicationLayer`] to communicate over `zenoh`.
2
3use super::{CommunicationLayer, Publisher, Subscriber};
4use crate::{BoxError, ReceivedSample};
5use std::{borrow::Cow, sync::Arc, time::Duration};
6use zenoh::{
7    prelude::{sync::SyncResolve, Config, Priority, SessionDeclarations, SplitBuffer},
8    publication::CongestionControl,
9};
10
11/// Allows communication over `zenoh`.
12pub struct ZenohCommunicationLayer {
13    zenoh: Arc<zenoh::Session>,
14    topic_prefix: String,
15}
16
17impl ZenohCommunicationLayer {
18    /// Initializes a new `zenoh` session with the given configuration.
19    ///
20    /// The `prefix` is added to all topic names when using the [`publisher`][Self::publisher]
21    /// and [`subscriber`][Self::subscribe] methods. Pass an empty string if no prefix is
22    /// desired.
23    pub fn init(config: Config, prefix: String) -> Result<Self, BoxError> {
24        let zenoh = ::zenoh::open(config)
25            .res_sync()
26            .map_err(BoxError::from)?
27            .into_arc();
28        Ok(Self {
29            zenoh,
30            topic_prefix: prefix,
31        })
32    }
33
34    fn prefixed(&self, topic: &str) -> String {
35        format!("{}/{topic}", self.topic_prefix)
36    }
37}
38
39impl CommunicationLayer for ZenohCommunicationLayer {
40    fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError> {
41        let publisher = self
42            .zenoh
43            .declare_publisher(self.prefixed(topic))
44            .congestion_control(CongestionControl::Block)
45            .priority(Priority::RealTime)
46            .res_sync()
47            .map_err(BoxError::from)?;
48
49        Ok(Box::new(ZenohPublisher { publisher }))
50    }
51
52    fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError> {
53        let subscriber = self
54            .zenoh
55            .declare_subscriber(self.prefixed(topic))
56            .reliable()
57            .res_sync()
58            .map_err(BoxError::from)?;
59
60        Ok(Box::new(ZenohReceiver(subscriber)))
61    }
62}
63
64impl Drop for ZenohCommunicationLayer {
65    fn drop(&mut self) {
66        // wait a bit before closing to ensure that remaining published
67        // messages are sent out
68        //
69        // TODO: create a minimal example to reproduce the dropped messages
70        // and report this issue in the zenoh repo
71        std::thread::sleep(Duration::from_secs_f32(2.0));
72    }
73}
74
75#[derive(Clone)]
76struct ZenohPublisher {
77    publisher: zenoh::publication::Publisher<'static>,
78}
79
80impl Publisher for ZenohPublisher {
81    fn prepare(&self, len: usize) -> Result<Box<dyn crate::PublishSample>, BoxError> {
82        Ok(Box::new(ZenohPublishSample {
83            sample: vec![0; len],
84            publisher: self.publisher.clone(),
85        }))
86    }
87
88    fn dyn_clone(&self) -> Box<dyn Publisher> {
89        Box::new(self.clone())
90    }
91}
92
93#[derive(Clone)]
94struct ZenohPublishSample {
95    sample: Vec<u8>,
96    publisher: zenoh::publication::Publisher<'static>,
97}
98
99impl<'a> crate::PublishSample<'a> for ZenohPublishSample {
100    fn as_mut_slice(&mut self) -> &mut [u8] {
101        &mut self.sample
102    }
103
104    fn publish(self: Box<Self>) -> Result<(), BoxError> {
105        self.publisher
106            .put(self.sample)
107            .res_sync()
108            .map_err(BoxError::from)
109    }
110}
111
112struct ZenohReceiver(
113    zenoh::subscriber::Subscriber<'static, flume::Receiver<zenoh::sample::Sample>>,
114);
115
116impl Subscriber for ZenohReceiver {
117    fn recv(&mut self) -> Result<Option<Box<dyn ReceivedSample>>, BoxError> {
118        match self.0.recv() {
119            Ok(sample) => Ok(Some(Box::new(ZenohReceivedSample {
120                sample: sample.value.payload,
121            }))),
122            Err(_) => Ok(None),
123        }
124    }
125}
126
127struct ZenohReceivedSample {
128    sample: zenoh::buffers::ZBuf,
129}
130
131impl ReceivedSample for ZenohReceivedSample {
132    fn get(&self) -> Cow<[u8]> {
133        self.sample.contiguous()
134    }
135}