zenoh_link_commons/
multicast.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::{borrow::Cow, boxed::Box, sync::Arc, vec::Vec};
15use core::{
16    fmt,
17    hash::{Hash, Hasher},
18    ops::Deref,
19};
20
21use async_trait::async_trait;
22use zenoh_buffers::{reader::HasReader, writer::HasWriter};
23use zenoh_codec::{RCodec, WCodec, Zenoh080};
24use zenoh_protocol::{
25    core::{EndPoint, Locator},
26    transport::{BatchSize, TransportMessage},
27};
28use zenoh_result::{zerror, ZResult};
29
30use crate::LinkAuthId;
31
32/*************************************/
33/*             MANAGER               */
34/*************************************/
35#[async_trait]
36pub trait LinkManagerMulticastTrait: Send + Sync {
37    async fn new_link(&self, endpoint: &EndPoint) -> ZResult<LinkMulticast>;
38}
39
40pub type LinkManagerMulticast = Arc<dyn LinkManagerMulticastTrait>;
41
42/*************************************/
43/*              LINK                 */
44/*************************************/
45#[derive(Clone)]
46pub struct LinkMulticast(pub Arc<dyn LinkMulticastTrait>);
47
48#[async_trait]
49pub trait LinkMulticastTrait: Send + Sync {
50    fn get_mtu(&self) -> BatchSize;
51    fn get_src(&self) -> &Locator;
52    fn get_dst(&self) -> &Locator;
53    fn get_auth_id(&self) -> &LinkAuthId;
54    fn is_reliable(&self) -> bool;
55    async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
56    async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
57    async fn read<'a>(&'a self, buffer: &mut [u8]) -> ZResult<(usize, Cow<'a, Locator>)>;
58    async fn close(&self) -> ZResult<()>;
59}
60
61impl LinkMulticast {
62    pub async fn send(&self, msg: &TransportMessage) -> ZResult<usize> {
63        // Create the buffer for serializing the message
64        let mut buff = Vec::new();
65
66        let codec = Zenoh080::new();
67        let mut writer = buff.writer();
68
69        codec
70            .write(&mut writer, msg)
71            .map_err(|_| zerror!("Encoding error on link: {}", self))?;
72
73        // Send the message on the link
74        self.0.write_all(buff.as_slice()).await?;
75
76        Ok(buff.len())
77    }
78
79    pub async fn recv(&self) -> ZResult<(TransportMessage, Locator)> {
80        // Read the message
81        let mut buffer = zenoh_buffers::vec::uninit(self.get_mtu() as usize);
82        let (n, locator) = self.read(&mut buffer).await?;
83        buffer.truncate(n);
84
85        let codec = Zenoh080::new();
86        let mut reader = buffer.reader();
87
88        let msg: TransportMessage = codec
89            .read(&mut reader)
90            .map_err(|_| zerror!("Invalid Message: Decoding error on link: {}", self))?;
91
92        Ok((msg, locator.into_owned()))
93    }
94}
95
96impl Deref for LinkMulticast {
97    type Target = Arc<dyn LinkMulticastTrait>;
98    #[inline]
99    fn deref(&self) -> &Self::Target {
100        &self.0
101    }
102}
103
104impl Eq for LinkMulticast {}
105
106impl PartialEq for LinkMulticast {
107    fn eq(&self, other: &Self) -> bool {
108        self.get_src() == other.get_src() && self.get_dst() == other.get_dst()
109    }
110}
111
112impl Hash for LinkMulticast {
113    fn hash<H: Hasher>(&self, state: &mut H) {
114        self.get_src().hash(state);
115        self.get_dst().hash(state);
116    }
117}
118
119impl fmt::Display for LinkMulticast {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        write!(f, "{} => {}", self.get_src(), self.get_dst())
122    }
123}
124
125impl fmt::Debug for LinkMulticast {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("Link")
128            .field("src", &self.get_src())
129            .field("dst", &self.get_dst())
130            .field("mtu", &self.get_mtu())
131            .field("is_reliable", &self.is_reliable())
132            .finish()
133    }
134}
135
136impl From<Arc<dyn LinkMulticastTrait>> for LinkMulticast {
137    fn from(link: Arc<dyn LinkMulticastTrait>) -> LinkMulticast {
138        LinkMulticast(link)
139    }
140}