communication-layer-pub-sub 0.3.12

The goal of `dora` is to be a low-latency, composable, and distributed data flow.
Documentation
//! Provides [`ZenohCommunicationLayer`] to communicate over `zenoh`.

use super::{CommunicationLayer, Publisher, Subscriber};
use crate::{BoxError, ReceivedSample};
use std::{borrow::Cow, sync::Arc, time::Duration};
use zenoh::{
    prelude::{sync::SyncResolve, Config, Priority, SessionDeclarations, SplitBuffer},
    publication::CongestionControl,
};

/// Allows communication over `zenoh`.
///
/// Holds a shared `zenoh` session together with the topic prefix that is
/// applied to every topic passed to the publisher and subscriber methods.
pub struct ZenohCommunicationLayer {
    /// Reference-counted handle to the `zenoh` session opened in `init`.
    zenoh: Arc<zenoh::Session>,
    /// Prefix that `prefixed` puts in front of each topic name.
    topic_prefix: String,
}

impl ZenohCommunicationLayer {
    /// Initializes a new `zenoh` session with the given configuration.
    ///
    /// The `prefix` is added to all topic names when using the [`publisher`][Self::publisher]
    /// and [`subscriber`][Self::subscribe] methods. Pass an empty string if no prefix is
    /// desired.
    ///
    /// # Errors
    ///
    /// Returns an error if the `zenoh` session cannot be opened with `config`.
    pub fn init(config: Config, prefix: String) -> Result<Self, BoxError> {
        let zenoh = ::zenoh::open(config)
            .res_sync()
            .map_err(BoxError::from)?
            .into_arc();
        Ok(Self {
            zenoh,
            topic_prefix: prefix,
        })
    }

    /// Builds the key expression used for `topic`.
    ///
    /// With a non-empty prefix the result is `"<prefix>/<topic>"`. With an
    /// empty prefix the topic is returned unchanged: unconditionally inserting
    /// the separator would produce `"/<topic>"`, and a leading `/` is not
    /// accepted in a `zenoh` key expression.
    fn prefixed(&self, topic: &str) -> String {
        if self.topic_prefix.is_empty() {
            topic.to_owned()
        } else {
            format!("{}/{topic}", self.topic_prefix)
        }
    }
}

impl CommunicationLayer for ZenohCommunicationLayer {
    /// Declares a `zenoh` publisher on the prefixed `topic`.
    ///
    /// The publisher is configured with [`CongestionControl::Block`] and
    /// [`Priority::RealTime`]. Any declaration error is returned as a
    /// [`BoxError`].
    fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError> {
        let key_expr = self.prefixed(topic);
        let declared = self
            .zenoh
            .declare_publisher(key_expr)
            .congestion_control(CongestionControl::Block)
            .priority(Priority::RealTime)
            .res_sync();

        match declared {
            Ok(publisher) => Ok(Box::new(ZenohPublisher { publisher })),
            Err(err) => Err(BoxError::from(err)),
        }
    }

    /// Declares a reliable `zenoh` subscriber on the prefixed `topic`.
    ///
    /// Any declaration error is returned as a [`BoxError`].
    fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError> {
        let key_expr = self.prefixed(topic);
        let declared = self
            .zenoh
            .declare_subscriber(key_expr)
            .reliable()
            .res_sync();

        match declared {
            Ok(subscriber) => Ok(Box::new(ZenohReceiver(subscriber))),
            Err(err) => Err(BoxError::from(err)),
        }
    }
}

impl Drop for ZenohCommunicationLayer {
    /// Blocks the current thread for two seconds when the layer is dropped.
    fn drop(&mut self) {
        // Give already-published messages a chance to leave before the
        // session is closed; without this pause they may be lost.
        //
        // TODO: create a minimal example to reproduce the dropped messages
        // and report this issue in the zenoh repo
        let grace_period = Duration::from_secs(2);
        std::thread::sleep(grace_period);
    }
}

/// [`Publisher`] implementation backed by a `zenoh` publisher.
///
/// Instances are created by `ZenohCommunicationLayer::publisher`.
#[derive(Clone)]
struct ZenohPublisher {
    /// The declared `zenoh` publisher; cloned into every prepared sample.
    publisher: zenoh::publication::Publisher<'static>,
}

impl Publisher for ZenohPublisher {
    /// Allocates a zero-filled buffer of `len` bytes and pairs it with a
    /// clone of the underlying `zenoh` publisher.
    fn prepare(&self, len: usize) -> Result<Box<dyn crate::PublishSample>, BoxError> {
        let sample = vec![0; len];
        let publisher = self.publisher.clone();
        let prepared = ZenohPublishSample { sample, publisher };
        Ok(Box::new(prepared))
    }

    /// Returns a boxed clone of this publisher.
    fn dyn_clone(&self) -> Box<dyn Publisher> {
        let duplicate: Self = self.clone();
        Box::new(duplicate)
    }
}

/// A writable sample buffer together with the publisher it is sent through.
///
/// Created by `ZenohPublisher::prepare`; the buffer is filled through
/// `as_mut_slice` and sent by `publish`.
#[derive(Clone)]
struct ZenohPublishSample {
    /// Payload bytes; initialized to zeros when the sample is prepared.
    sample: Vec<u8>,
    /// Clone of the `zenoh` publisher used to put the payload.
    publisher: zenoh::publication::Publisher<'static>,
}

impl<'a> crate::PublishSample<'a> for ZenohPublishSample {
    /// Gives mutable access to the payload buffer so the caller can fill it.
    fn as_mut_slice(&mut self) -> &mut [u8] {
        self.sample.as_mut_slice()
    }

    /// Consumes the sample and puts its payload on the `zenoh` publisher.
    ///
    /// Any error reported by `zenoh` is returned as a [`BoxError`].
    fn publish(self: Box<Self>) -> Result<(), BoxError> {
        let Self { sample, publisher } = *self;
        let outcome = publisher.put(sample).res_sync();
        match outcome {
            Ok(()) => Ok(()),
            Err(err) => Err(BoxError::from(err)),
        }
    }
}

/// [`Subscriber`] implementation backed by a `zenoh` subscriber.
///
/// Created by `ZenohCommunicationLayer::subscribe`.
struct ZenohReceiver(
    // `zenoh` subscriber whose incoming samples are read through a `flume` receiver.
    zenoh::subscriber::Subscriber<'static, flume::Receiver<zenoh::sample::Sample>>,
);

impl Subscriber for ZenohReceiver {
    /// Waits for the next sample from the wrapped subscriber.
    ///
    /// Returns `Ok(Some(_))` holding the payload of the received sample. If
    /// the underlying `recv` call fails, the error is discarded and
    /// `Ok(None)` is returned instead.
    fn recv(&mut self) -> Result<Option<Box<dyn ReceivedSample>>, BoxError> {
        let incoming = self.0.recv().ok();
        Ok(incoming.map(|received| {
            let wrapped = ZenohReceivedSample {
                sample: received.value.payload,
            };
            Box::new(wrapped) as Box<dyn ReceivedSample>
        }))
    }
}

/// Payload of a sample received through a `ZenohReceiver`.
struct ZenohReceivedSample {
    /// The payload buffer taken from the received `zenoh` sample's value.
    sample: zenoh::buffers::ZBuf,
}

impl ReceivedSample for ZenohReceivedSample {
    /// Returns the payload bytes as the `Cow<[u8]>` produced by the buffer's
    /// `contiguous` method.
    fn get(&self) -> Cow<[u8]> {
        let payload = &self.sample;
        payload.contiguous()
    }
}