Skip to main content

zenoh_link_commons/
unicast.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
15use core::{
16    fmt,
17    hash::{Hash, Hasher},
18    ops::Deref,
19};
20use std::net::SocketAddr;
21
22use async_trait::async_trait;
23use serde::Serialize;
24use zenoh_protocol::{
25    core::{EndPoint, Locator, Priority},
26    transport::BatchSize,
27};
28use zenoh_result::ZResult;
29
30pub type LinkManagerUnicast = Arc<dyn LinkManagerUnicastTrait>;
31#[async_trait]
32pub trait LinkManagerUnicastTrait: Send + Sync {
33    async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast>;
34    async fn new_listener(&self, endpoint: EndPoint) -> ZResult<Locator>;
35    async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()>;
36    async fn get_listeners(&self) -> Vec<EndPoint>;
37    async fn get_locators(&self) -> Vec<Locator>;
38}
39pub type NewLinkChannelSender = flume::Sender<LinkUnicast>;
40
41pub trait ConstructibleLinkManagerUnicast<T>: Sized {
42    fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
43}
44
45#[derive(Clone)]
46pub struct LinkUnicast(pub NewLink);
47
48#[derive(Clone)]
49pub enum NewLink {
50    Single(Arc<dyn LinkUnicastTrait>),
51    MixedReliability {
52        reliable: Arc<dyn LinkUnicastTrait>,
53        best_effort: Arc<dyn LinkUnicastTrait>,
54    },
55}
56
57#[async_trait]
58pub trait LinkUnicastTrait: Send + Sync {
59    fn get_mtu(&self) -> BatchSize;
60    fn get_src(&self) -> &Locator;
61    fn get_dst(&self) -> &Locator;
62    fn is_reliable(&self) -> bool;
63    fn is_streamed(&self) -> bool;
64    fn get_interface_names(&self) -> Vec<String>;
65    fn get_auth_id(&self) -> &LinkAuthId;
66    fn supports_priorities(&self) -> bool {
67        false
68    }
69    async fn write(&self, buffer: &[u8], priority: Option<Priority>) -> ZResult<usize>;
70    async fn write_all(&self, buffer: &[u8], priority: Option<Priority>) -> ZResult<()>;
71    async fn read(&self, buffer: &mut [u8], priority: Option<Priority>) -> ZResult<usize>;
72    async fn read_exact(&self, buffer: &mut [u8], priority: Option<Priority>) -> ZResult<()>;
73    async fn close(&self) -> ZResult<()>;
74}
75
76impl Deref for LinkUnicast {
77    type Target = Arc<dyn LinkUnicastTrait>;
78
79    #[inline]
80    fn deref(&self) -> &Self::Target {
81        match &self.0 {
82            NewLink::Single(link) => link,
83            NewLink::MixedReliability { reliable, .. } => reliable,
84        }
85    }
86}
87
88impl Eq for LinkUnicast {}
89
90impl PartialEq for LinkUnicast {
91    fn eq(&self, other: &Self) -> bool {
92        self.get_src() == other.get_src() && self.get_dst() == other.get_dst()
93    }
94}
95
96impl Hash for LinkUnicast {
97    fn hash<H: Hasher>(&self, state: &mut H) {
98        self.get_src().hash(state);
99        self.get_dst().hash(state);
100    }
101}
102
103impl fmt::Display for LinkUnicast {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        write!(f, "{} => {}", self.get_src(), self.get_dst())
106    }
107}
108
109impl fmt::Debug for LinkUnicast {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        f.debug_struct("Link")
112            .field("src", &self.get_src())
113            .field("dst", &self.get_dst())
114            .field("mtu", &self.get_mtu())
115            .field("is_reliable", &self.is_reliable())
116            .field("is_streamed", &self.is_streamed())
117            .finish()
118    }
119}
120
121impl From<Arc<dyn LinkUnicastTrait>> for LinkUnicast {
122    fn from(link: Arc<dyn LinkUnicastTrait>) -> LinkUnicast {
123        LinkUnicast(NewLink::Single(link))
124    }
125}
126
127pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec<String> {
128    match zenoh_util::net::get_interface_names_by_addr(addr.ip()) {
129        Ok(interfaces) => {
130            tracing::trace!("get_interface_names for {:?}: {:?}", addr.ip(), interfaces);
131            interfaces
132        }
133        Err(e) => {
134            tracing::debug!("get_interface_names for {:?} failed: {:?}", addr.ip(), e);
135            vec![]
136        }
137    }
138}
139
140#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
141pub enum LinkAuthId {
142    Tls(Option<String>),
143    Quic(Option<String>),
144    Tcp,
145    Udp,
146    Serial,
147    Unixpipe,
148    UnixsockStream,
149    Vsock,
150    Ws,
151}
152
153impl LinkAuthId {
154    pub fn get_cert_common_name(&self) -> Option<&str> {
155        match &self {
156            LinkAuthId::Tls(n) => n.as_ref().map(|s| s.as_ref()),
157            LinkAuthId::Quic(n) => n.as_ref().map(|s| s.as_ref()),
158            LinkAuthId::Tcp => None,
159            LinkAuthId::Udp => None,
160            LinkAuthId::Serial => None,
161            LinkAuthId::Unixpipe => None,
162            LinkAuthId::UnixsockStream => None,
163            LinkAuthId::Vsock => None,
164            LinkAuthId::Ws => None,
165        }
166    }
167}