zenoh_link_commons/
multicast.rs1use alloc::{borrow::Cow, boxed::Box, sync::Arc, vec::Vec};
15use core::{
16 fmt,
17 hash::{Hash, Hasher},
18 ops::Deref,
19};
20
21use async_trait::async_trait;
22use zenoh_buffers::{reader::HasReader, writer::HasWriter};
23use zenoh_codec::{RCodec, WCodec, Zenoh080};
24use zenoh_protocol::{
25 core::{EndPoint, Locator},
26 transport::{BatchSize, TransportMessage},
27};
28use zenoh_result::{zerror, ZResult};
29
30use crate::LinkAuthId;
31
/// Asynchronous factory interface for multicast links.
///
/// Implementors must be `Send + Sync`; see [`LinkManagerMulticast`] for the
/// shared, dynamically dispatched form of this trait.
#[async_trait]
pub trait LinkManagerMulticastTrait: Send + Sync {
    /// Creates a [`LinkMulticast`] for the given `endpoint`.
    ///
    /// # Errors
    ///
    /// Returns the implementation's error when the link cannot be created.
    async fn new_link(&self, endpoint: &EndPoint) -> ZResult<LinkMulticast>;
}
39
/// Reference-counted, dynamically dispatched [`LinkManagerMulticastTrait`] implementation.
pub type LinkManagerMulticast = Arc<dyn LinkManagerMulticastTrait>;
41
/// Handle to a multicast link, wrapping an `Arc<dyn LinkMulticastTrait>`.
///
/// Cloning only clones the inner `Arc`, so every clone refers to the same
/// underlying link implementation.
#[derive(Clone)]
pub struct LinkMulticast(pub Arc<dyn LinkMulticastTrait>);
47
/// Operations every multicast link implementation has to provide.
///
/// Implementors must be `Send + Sync` because they are shared behind an `Arc`
/// inside [`LinkMulticast`].
#[async_trait]
pub trait LinkMulticastTrait: Send + Sync {
    /// Returns the link MTU; `LinkMulticast::recv` uses it as the size of its
    /// receive buffer.
    fn get_mtu(&self) -> BatchSize;
    /// Returns the source locator of the link.
    fn get_src(&self) -> &Locator;
    /// Returns the destination locator of the link.
    fn get_dst(&self) -> &Locator;
    /// Returns the authentication identifier associated with the link.
    fn get_auth_id(&self) -> &LinkAuthId;
    /// Reports whether the link is reliable.
    // NOTE(review): the exact reliability guarantees are implementation-defined — confirm.
    fn is_reliable(&self) -> bool;
    /// Writes from `buffer` to the link.
    // NOTE(review): the returned `usize` is presumably the number of bytes
    // actually written, which may be less than `buffer.len()` — confirm.
    async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
    /// Writes the whole `buffer` to the link.
    async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
    /// Reads from the link into `buffer`.
    ///
    /// On success returns the number of valid bytes placed in `buffer`
    /// together with a locator, either borrowed from `self` or owned.
    // NOTE(review): the locator presumably identifies the sender of the
    // received data — confirm against implementations.
    async fn read<'a>(&'a self, buffer: &mut [u8]) -> ZResult<(usize, Cow<'a, Locator>)>;
    /// Closes the link.
    async fn close(&self) -> ZResult<()>;
}
60
61impl LinkMulticast {
62 pub async fn send(&self, msg: &TransportMessage) -> ZResult<usize> {
63 let mut buff = Vec::new();
65
66 let codec = Zenoh080::new();
67 let mut writer = buff.writer();
68
69 codec
70 .write(&mut writer, msg)
71 .map_err(|_| zerror!("Encoding error on link: {}", self))?;
72
73 self.0.write_all(buff.as_slice()).await?;
75
76 Ok(buff.len())
77 }
78
79 pub async fn recv(&self) -> ZResult<(TransportMessage, Locator)> {
80 let mut buffer = zenoh_buffers::vec::uninit(self.get_mtu() as usize);
82 let (n, locator) = self.read(&mut buffer).await?;
83 buffer.truncate(n);
84
85 let codec = Zenoh080::new();
86 let mut reader = buffer.reader();
87
88 let msg: TransportMessage = codec
89 .read(&mut reader)
90 .map_err(|_| zerror!("Invalid Message: Decoding error on link: {}", self))?;
91
92 Ok((msg, locator.into_owned()))
93 }
94}
95
96impl Deref for LinkMulticast {
97 type Target = Arc<dyn LinkMulticastTrait>;
98 #[inline]
99 fn deref(&self) -> &Self::Target {
100 &self.0
101 }
102}
103
104impl Eq for LinkMulticast {}
105
106impl PartialEq for LinkMulticast {
107 fn eq(&self, other: &Self) -> bool {
108 self.get_src() == other.get_src() && self.get_dst() == other.get_dst()
109 }
110}
111
112impl Hash for LinkMulticast {
113 fn hash<H: Hasher>(&self, state: &mut H) {
114 self.get_src().hash(state);
115 self.get_dst().hash(state);
116 }
117}
118
119impl fmt::Display for LinkMulticast {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(f, "{} => {}", self.get_src(), self.get_dst())
122 }
123}
124
125impl fmt::Debug for LinkMulticast {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_struct("Link")
128 .field("src", &self.get_src())
129 .field("dst", &self.get_dst())
130 .field("mtu", &self.get_mtu())
131 .field("is_reliable", &self.is_reliable())
132 .finish()
133 }
134}
135
136impl From<Arc<dyn LinkMulticastTrait>> for LinkMulticast {
137 fn from(link: Arc<dyn LinkMulticastTrait>) -> LinkMulticast {
138 LinkMulticast(link)
139 }
140}