zenoh_link/
lib.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
20use std::collections::HashMap;
21
22use zenoh_config::Config;
23pub use zenoh_link_commons::*;
24#[cfg(feature = "transport_quic")]
25pub use zenoh_link_quic as quic;
26#[cfg(feature = "transport_quic")]
27use zenoh_link_quic::{
28    LinkManagerUnicastQuic, QuicConfigurator, QuicLocatorInspector, QUIC_LOCATOR_PREFIX,
29};
30#[cfg(feature = "transport_quic_datagram")]
31pub use zenoh_link_quic_datagram as quic_datagram;
32#[cfg(all(feature = "transport_quic_datagram", not(feature = "transport_quic")))]
33use zenoh_link_quic_datagram::QUIC_DATAGRAM_LOCATOR_PREFIX;
34#[cfg(feature = "transport_quic_datagram")]
35use zenoh_link_quic_datagram::{
36    LinkManagerUnicastQuicDatagram, QuicDatagramConfigurator, QuicDatagramLocatorInspector,
37};
38#[cfg(feature = "transport_serial")]
39pub use zenoh_link_serial as serial;
40#[cfg(feature = "transport_serial")]
41use zenoh_link_serial::{LinkManagerUnicastSerial, SerialLocatorInspector, SERIAL_LOCATOR_PREFIX};
42#[cfg(feature = "transport_tcp")]
43pub use zenoh_link_tcp as tcp;
44#[cfg(feature = "transport_tcp")]
45use zenoh_link_tcp::{
46    LinkManagerUnicastTcp, TcpConfigurator, TcpLocatorInspector, TCP_LOCATOR_PREFIX,
47};
48#[cfg(feature = "transport_tls")]
49pub use zenoh_link_tls as tls;
50#[cfg(feature = "transport_tls")]
51use zenoh_link_tls::{
52    LinkManagerUnicastTls, TlsConfigurator, TlsLocatorInspector, TLS_LOCATOR_PREFIX,
53};
54#[cfg(feature = "transport_udp")]
55pub use zenoh_link_udp as udp;
56#[cfg(feature = "transport_udp")]
57use zenoh_link_udp::{
58    LinkManagerMulticastUdp, LinkManagerUnicastUdp, UdpLocatorInspector, UDP_LOCATOR_PREFIX,
59};
60#[cfg(feature = "transport_unixpipe")]
61pub use zenoh_link_unixpipe as unixpipe;
62#[cfg(feature = "transport_unixpipe")]
63use zenoh_link_unixpipe::{
64    LinkManagerUnicastPipe, UnixPipeConfigurator, UnixPipeLocatorInspector, UNIXPIPE_LOCATOR_PREFIX,
65};
66#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
67pub use zenoh_link_unixsock_stream as unixsock_stream;
68#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
69use zenoh_link_unixsock_stream::{
70    LinkManagerUnicastUnixSocketStream, UnixSockStreamLocatorInspector,
71    UNIXSOCKSTREAM_LOCATOR_PREFIX,
72};
73#[cfg(all(feature = "transport_vsock", target_os = "linux"))]
74pub use zenoh_link_vsock as vsock;
75#[cfg(all(feature = "transport_vsock", target_os = "linux"))]
76use zenoh_link_vsock::{LinkManagerUnicastVsock, VsockLocatorInspector, VSOCK_LOCATOR_PREFIX};
77#[cfg(feature = "transport_ws")]
78pub use zenoh_link_ws as ws;
79#[cfg(feature = "transport_ws")]
80use zenoh_link_ws::{LinkManagerUnicastWs, WsLocatorInspector, WS_LOCATOR_PREFIX};
81pub use zenoh_protocol::core::{EndPoint, Locator};
82use zenoh_result::{bail, ZResult};
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
85pub enum LinkKind {
86    Quic,
87    QuicDatagram,
88    Serial,
89    Tcp,
90    Tls,
91    Udp,
92    Unixpipe,
93    UnixsockStream,
94    Vscock,
95    Ws,
96}
97
98impl LinkKind {
99    pub fn new_supported_links<'p>(protocols: impl Iterator<Item = &'p str>) -> Vec<LinkKind> {
100        #[allow(unused_mut)]
101        let mut supported_links = Vec::new();
102        for p in protocols {
103            match p {
104                #[cfg(feature = "transport_tcp")]
105                TCP_LOCATOR_PREFIX => supported_links.push(LinkKind::Tcp),
106                #[cfg(feature = "transport_udp")]
107                UDP_LOCATOR_PREFIX => supported_links.push(LinkKind::Udp),
108                #[cfg(feature = "transport_tls")]
109                TLS_LOCATOR_PREFIX => supported_links.push(LinkKind::Tls),
110                #[cfg(all(feature = "transport_quic_datagram", not(feature = "transport_quic")))]
111                QUIC_DATAGRAM_LOCATOR_PREFIX => supported_links.push(LinkKind::QuicDatagram),
112                #[cfg(all(feature = "transport_quic", not(feature = "transport_quic_datagram")))]
113                QUIC_LOCATOR_PREFIX => supported_links.push(LinkKind::QuicDatagram),
114                #[cfg(all(feature = "transport_quic", feature = "transport_quic_datagram"))]
115                QUIC_LOCATOR_PREFIX => {
116                    supported_links.push(LinkKind::Quic);
117                    supported_links.push(LinkKind::QuicDatagram);
118                }
119                #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
120                UNIXSOCKSTREAM_LOCATOR_PREFIX => supported_links.push(LinkKind::UnixsockStream),
121                #[cfg(feature = "transport_ws")]
122                WS_LOCATOR_PREFIX => supported_links.push(LinkKind::Ws),
123                #[cfg(feature = "transport_serial")]
124                SERIAL_LOCATOR_PREFIX => supported_links.push(LinkKind::Serial),
125                #[cfg(feature = "transport_unixpipe")]
126                UNIXPIPE_LOCATOR_PREFIX => supported_links.push(LinkKind::Unixpipe),
127                #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
128                VSOCK_LOCATOR_PREFIX => supported_links.push(LinkKind::Vscock),
129                _ => {}
130            }
131        }
132        supported_links
133    }
134}
135
136impl TryFrom<&Locator> for LinkKind {
137    type Error = zenoh_result::Error;
138
139    fn try_from(locator: &Locator) -> Result<Self, Self::Error> {
140        #[allow(unused_imports)]
141        use zenoh_link_commons::LocatorInspector;
142        match locator.protocol().as_str() {
143            #[cfg(feature = "transport_tcp")]
144            TCP_LOCATOR_PREFIX => Ok(LinkKind::Tcp),
145            #[cfg(feature = "transport_udp")]
146            UDP_LOCATOR_PREFIX => Ok(LinkKind::Udp),
147            #[cfg(feature = "transport_tls")]
148            TLS_LOCATOR_PREFIX => Ok(LinkKind::Tls),
149            #[cfg(all(feature = "transport_quic_datagram", not(feature = "transport_quic")))]
150            QUIC_DATAGRAM_LOCATOR_PREFIX => {
151                if !QuicDatagramLocatorInspector.is_reliable(locator)? {
152                    Ok(LinkKind::QuicDatagram)
153                } else {
154                    Err(zenoh_result::zerror!("Attempted to use a reliable QUIC link without enabling the transport_quic feature").into())
155                }
156            }
157            #[cfg(all(feature = "transport_quic", not(feature = "transport_quic_datagram")))]
158            QUIC_LOCATOR_PREFIX => {
159                if QuicLocatorInspector.is_reliable(locator)? {
160                    Ok(LinkKind::Quic)
161                } else {
162                    Err(zenoh_result::zerror!("Cannot use unreliable QUIC without enabling the transport_quic_datagram feature").into())
163                }
164            }
165            #[cfg(all(feature = "transport_quic", feature = "transport_quic_datagram"))]
166            QUIC_LOCATOR_PREFIX => {
167                if QuicLocatorInspector.is_reliable(locator)? {
168                    Ok(LinkKind::Quic)
169                } else {
170                    Ok(LinkKind::QuicDatagram)
171                }
172            }
173            #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
174            UNIXSOCKSTREAM_LOCATOR_PREFIX => Ok(LinkKind::UnixsockStream),
175            #[cfg(feature = "transport_ws")]
176            WS_LOCATOR_PREFIX => Ok(LinkKind::Ws),
177            #[cfg(feature = "transport_serial")]
178            SERIAL_LOCATOR_PREFIX => Ok(LinkKind::Serial),
179            #[cfg(feature = "transport_unixpipe")]
180            UNIXPIPE_LOCATOR_PREFIX => Ok(LinkKind::Unixpipe),
181            #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
182            VSOCK_LOCATOR_PREFIX => Ok(LinkKind::Vscock),
183            _ => bail!(
184                "Unicast not supported for {} protocol",
185                locator.protocol().as_str()
186            ),
187        }
188    }
189}
190
191impl TryFrom<&EndPoint> for LinkKind {
192    type Error = zenoh_result::Error;
193
194    fn try_from(endpoint: &EndPoint) -> Result<Self, Self::Error> {
195        LinkKind::try_from(&endpoint.to_locator())
196    }
197}
198
199pub const ALL_SUPPORTED_LINKS: &[LinkKind] = &[
200    #[cfg(feature = "transport_quic")]
201    LinkKind::Quic,
202    #[cfg(feature = "transport_quic_datagram")]
203    LinkKind::QuicDatagram,
204    #[cfg(feature = "transport_tcp")]
205    LinkKind::Tcp,
206    #[cfg(feature = "transport_tls")]
207    LinkKind::Tls,
208    #[cfg(feature = "transport_udp")]
209    LinkKind::Udp,
210    #[cfg(feature = "transport_ws")]
211    LinkKind::Ws,
212    #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
213    LinkKind::UnixsockStream,
214    #[cfg(feature = "transport_serial")]
215    LinkKind::Serial,
216    #[cfg(feature = "transport_unixpipe")]
217    LinkKind::Unixpipe,
218    #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
219    LinkKind::Vscock,
220];
221
222#[derive(Default, Clone)]
223pub struct LocatorInspector {
224    #[cfg(feature = "transport_quic")]
225    quic_inspector: QuicLocatorInspector,
226    #[cfg(feature = "transport_quic_datagram")]
227    quic_datagram_inspector: QuicDatagramLocatorInspector,
228    #[cfg(feature = "transport_tcp")]
229    tcp_inspector: TcpLocatorInspector,
230    #[cfg(feature = "transport_tls")]
231    tls_inspector: TlsLocatorInspector,
232    #[cfg(feature = "transport_udp")]
233    udp_inspector: UdpLocatorInspector,
234    #[cfg(feature = "transport_ws")]
235    ws_inspector: WsLocatorInspector,
236    #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
237    unixsock_stream_inspector: UnixSockStreamLocatorInspector,
238    #[cfg(feature = "transport_serial")]
239    serial_inspector: SerialLocatorInspector,
240    #[cfg(feature = "transport_unixpipe")]
241    unixpipe_inspector: UnixPipeLocatorInspector,
242    #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
243    vsock_inspector: VsockLocatorInspector,
244}
245impl LocatorInspector {
246    pub fn is_reliable(&self, locator: &Locator) -> ZResult<bool> {
247        #[allow(unused_imports)]
248        use zenoh_link_commons::LocatorInspector;
249        match LinkKind::try_from(locator)? {
250            #[cfg(feature = "transport_tcp")]
251            LinkKind::Tcp => self.tcp_inspector.is_reliable(locator),
252            #[cfg(feature = "transport_udp")]
253            LinkKind::Udp => self.udp_inspector.is_reliable(locator),
254            #[cfg(feature = "transport_tls")]
255            LinkKind::Tls => self.tls_inspector.is_reliable(locator),
256            #[cfg(feature = "transport_quic")]
257            LinkKind::Quic => self.quic_inspector.is_reliable(locator),
258            #[cfg(feature = "transport_quic_datagram")]
259            LinkKind::QuicDatagram => self.quic_datagram_inspector.is_reliable(locator),
260            #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
261            LinkKind::UnixsockStream => self.unixsock_stream_inspector.is_reliable(locator),
262            #[cfg(feature = "transport_ws")]
263            LinkKind::Ws => self.ws_inspector.is_reliable(locator),
264            #[cfg(feature = "transport_serial")]
265            LinkKind::Serial => self.serial_inspector.is_reliable(locator),
266            #[cfg(feature = "transport_unixpipe")]
267            LinkKind::Unixpipe => self.unixpipe_inspector.is_reliable(locator),
268            #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
269            LinkKind::Vscock => self.vsock_inspector.is_reliable(locator),
270            #[allow(unreachable_patterns)]
271            _ => unreachable!(),
272        }
273    }
274
275    pub async fn is_multicast(&self, locator: &Locator) -> ZResult<bool> {
276        #[allow(unused_imports)]
277        use zenoh_link_commons::LocatorInspector;
278        match LinkKind::try_from(locator)? {
279            #[cfg(feature = "transport_tcp")]
280            LinkKind::Tcp => self.tcp_inspector.is_multicast(locator).await,
281            #[cfg(feature = "transport_udp")]
282            LinkKind::Udp => self.udp_inspector.is_multicast(locator).await,
283            #[cfg(feature = "transport_tls")]
284            LinkKind::Tls => self.tls_inspector.is_multicast(locator).await,
285            #[cfg(feature = "transport_quic")]
286            LinkKind::Quic => self.quic_inspector.is_multicast(locator).await,
287            #[cfg(feature = "transport_quic_datagram")]
288            LinkKind::QuicDatagram => self.quic_datagram_inspector.is_multicast(locator).await,
289            #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
290            LinkKind::UnixsockStream => self.unixsock_stream_inspector.is_multicast(locator).await,
291            #[cfg(feature = "transport_ws")]
292            LinkKind::Ws => self.ws_inspector.is_multicast(locator).await,
293            #[cfg(feature = "transport_serial")]
294            LinkKind::Serial => self.serial_inspector.is_multicast(locator).await,
295            #[cfg(feature = "transport_unixpipe")]
296            LinkKind::Unixpipe => self.unixpipe_inspector.is_multicast(locator).await,
297            #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
298            LinkKind::Vscock => self.vsock_inspector.is_multicast(locator).await,
299            #[allow(unreachable_patterns)]
300            _ => unreachable!(),
301        }
302    }
303}
304#[derive(Default)]
305pub struct LinkConfigurator {
306    #[cfg(feature = "transport_tcp")]
307    tcp_inspector: TcpConfigurator,
308    #[cfg(feature = "transport_quic_datagram")]
309    quic_datagram_inspector: QuicDatagramConfigurator,
310    #[cfg(feature = "transport_quic")]
311    quic_inspector: QuicConfigurator,
312    #[cfg(feature = "transport_tls")]
313    tls_inspector: TlsConfigurator,
314    #[cfg(feature = "transport_unixpipe")]
315    unixpipe_inspector: UnixPipeConfigurator,
316}
317
318impl LinkConfigurator {
319    #[allow(unused_variables, unused_mut)]
320    pub fn configurations(
321        &self,
322        config: &Config,
323    ) -> (
324        HashMap<LinkKind, String>,
325        HashMap<LinkKind, zenoh_result::Error>,
326    ) {
327        let mut configs = HashMap::<LinkKind, String>::new();
328        let mut errors = HashMap::<LinkKind, zenoh_result::Error>::new();
329        let mut insert_config = |kind: LinkKind, cfg: ZResult<String>| match cfg {
330            Ok(v) => {
331                configs.insert(kind, v);
332            }
333            Err(e) => {
334                errors.insert(kind, e);
335            }
336        };
337        #[cfg(feature = "transport_tcp")]
338        {
339            insert_config(LinkKind::Tcp, self.tcp_inspector.inspect_config(config));
340        }
341        #[cfg(feature = "transport_quic_datagram")]
342        {
343            insert_config(
344                LinkKind::QuicDatagram,
345                self.quic_datagram_inspector.inspect_config(config),
346            );
347        }
348        #[cfg(feature = "transport_quic")]
349        {
350            insert_config(LinkKind::Quic, self.quic_inspector.inspect_config(config));
351        }
352        #[cfg(feature = "transport_tls")]
353        {
354            insert_config(LinkKind::Tls, self.tls_inspector.inspect_config(config));
355        }
356        #[cfg(feature = "transport_unixpipe")]
357        {
358            insert_config(
359                LinkKind::Unixpipe,
360                self.unixpipe_inspector.inspect_config(config),
361            );
362        }
363        (configs, errors)
364    }
365}
366
367/*************************************/
368/*             UNICAST               */
369/*************************************/
370
371pub struct LinkManagerBuilderUnicast;
372
373impl LinkManagerBuilderUnicast {
374    pub fn make(
375        _manager: NewLinkChannelSender,
376        endpoint: &EndPoint,
377    ) -> ZResult<LinkManagerUnicast> {
378        #[allow(unused_imports)]
379        use zenoh_link_commons::LocatorInspector;
380        match LinkKind::try_from(endpoint)? {
381            #[cfg(feature = "transport_tcp")]
382            LinkKind::Tcp => Ok(std::sync::Arc::new(LinkManagerUnicastTcp::new(_manager))),
383            #[cfg(feature = "transport_udp")]
384            LinkKind::Udp => Ok(std::sync::Arc::new(LinkManagerUnicastUdp::new(_manager))),
385            #[cfg(feature = "transport_tls")]
386            LinkKind::Tls => Ok(std::sync::Arc::new(LinkManagerUnicastTls::new(_manager))),
387            #[cfg(feature = "transport_quic_datagram")]
388            LinkKind::QuicDatagram => Ok(std::sync::Arc::new(LinkManagerUnicastQuicDatagram::new(
389                _manager,
390            ))),
391            #[cfg(feature = "transport_quic")]
392            LinkKind::Quic => Ok(std::sync::Arc::new(LinkManagerUnicastQuic::new(_manager))),
393            #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
394            LinkKind::UnixsockStream => Ok(std::sync::Arc::new(
395                LinkManagerUnicastUnixSocketStream::new(_manager),
396            )),
397            #[cfg(feature = "transport_ws")]
398            LinkKind::Ws => Ok(std::sync::Arc::new(LinkManagerUnicastWs::new(_manager))),
399            #[cfg(feature = "transport_serial")]
400            LinkKind::Serial => Ok(std::sync::Arc::new(LinkManagerUnicastSerial::new(_manager))),
401            #[cfg(feature = "transport_unixpipe")]
402            LinkKind::Unixpipe => Ok(std::sync::Arc::new(LinkManagerUnicastPipe::new(_manager))),
403            #[cfg(all(feature = "transport_vsock", target_os = "linux"))]
404            LinkKind::Vscock => Ok(std::sync::Arc::new(LinkManagerUnicastVsock::new(_manager))),
405            #[allow(unreachable_patterns)]
406            _ => unreachable!(),
407        }
408    }
409}
410
411/*************************************/
412/*            MULTICAST              */
413/*************************************/
414
415pub struct LinkManagerBuilderMulticast;
416
417impl LinkManagerBuilderMulticast {
418    pub fn make(link_kind: LinkKind) -> ZResult<LinkManagerMulticast> {
419        match link_kind {
420            #[cfg(feature = "transport_udp")]
421            LinkKind::Udp => Ok(std::sync::Arc::new(LinkManagerMulticastUdp)),
422            _ => bail!("Multicast not supported for link {link_kind:?}"),
423        }
424    }
425}