zenoh_protocol/transport/
open.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! # Open message
16//!
//! After having successfully completed the [`super::InitSyn`]-[`super::InitAck`] message exchange,
18//! the OPEN message is sent on a link to finalize the initialization of the link and
19//! associated transport with a zenoh node.
//! For convenience, we call [`OpenSyn`] and [`OpenAck`] an OPEN message with the A flag
//! set to 0 and 1, respectively.
22//!
23//! The [`OpenSyn`]/[`OpenAck`] message flow is the following:
24//!
25//! ```text
26//!     A                   B
27//!     |      OPEN SYN     |
28//!     |------------------>|
29//!     |                   |
30//!     |      OPEN ACK     |
31//!     |<------------------|
32//!     |                   |
33//! ```
34//!
35//! ```text
36//! Flags:
37//! - A: Ack            If A==0 then the message is an OpenSyn else it is an OpenAck
38//! - T: Lease period   if T==1 then the lease period is in seconds else in milliseconds
39//! - Z: Extensions     If Z==1 then zenoh extensions will follow.
40//!
41//!  7 6 5 4 3 2 1 0
42//! +-+-+-+-+-+-+-+-+
43//! |Z|T|A|   OPEN  |
44//! +-+-+-+---------+
45//! %     lease     % -- Lease period of the sender of the OPEN message
46//! +---------------+
47//! %  initial_sn   % -- Initial SN proposed by the sender of the OPEN(*)
48//! +---------------+
49//! ~    <u8;z16>   ~ if Flag(A)==0 (**) -- Cookie
50//! +---------------+
51//! ~   [OpenExts]  ~ if Flag(Z)==1
52//! +---------------+
53//!
54//! (*)     The initial sequence number MUST be compatible with the sequence number resolution agreed in the
55//!         [`super::InitSyn`]-[`super::InitAck`] message exchange
//! (**)    The cookie MUST be the same as the one received in the [`super::InitAck`] from the corresponding zenoh node
57//! ```
58//!
59//! NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
60//!       in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
61//!       This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
62//!       the boundary of the serialized messages. The length is encoded as little-endian.
63//!       In any case, the length of a message must not exceed 65535 bytes.
64
65use core::time::Duration;
66
67use zenoh_buffers::ZSlice;
68
69use crate::transport::TransportSn;
70
/// Flag bits carried in the three most significant bits of the OPEN message header.
pub mod flag {
    /// 0x20 Ack: if A==0 then the message is an OpenSyn else it is an OpenAck.
    pub const A: u8 = 1 << 5;
    /// 0x40 Lease period: if T==1 then the lease period is in seconds else in milliseconds.
    pub const T: u8 = 1 << 6;
    /// 0x80 Extensions: if Z==1 then an extension will follow.
    pub const Z: u8 = 1 << 7;
}
76
77/// # OpenSyn message
78///
79/// ```text
80///  7 6 5 4 3 2 1 0
81/// +-+-+-+-+-+-+-+-+
82/// ~   challenge   ~
83/// +---------------+
84/// ```
85#[derive(Debug, Clone, PartialEq, Eq)]
86pub struct OpenSyn {
87    pub lease: Duration,
88    pub initial_sn: TransportSn,
89    pub cookie: ZSlice,
90    pub ext_qos: Option<ext::QoS>,
91    #[cfg(feature = "shared-memory")]
92    pub ext_shm: Option<ext::Shm>,
93    pub ext_auth: Option<ext::Auth>,
94    pub ext_mlink: Option<ext::MultiLinkSyn>,
95    pub ext_lowlatency: Option<ext::LowLatency>,
96    pub ext_compression: Option<ext::Compression>,
97}
98
99// Extensions
100pub mod ext {
101    #[cfg(feature = "shared-memory")]
102    use crate::common::ZExtZ64;
103    #[cfg(feature = "shared-memory")]
104    use crate::zextz64;
105    use crate::{
106        common::{ZExtUnit, ZExtZBuf},
107        zextunit, zextzbuf,
108    };
109
110    /// # QoS extension
111    /// Used to negotiate the use of QoS
112    pub type QoS = zextunit!(0x1, false);
113
114    /// # Shm extension
115    /// Used as challenge for probing shared memory capabilities
116    #[cfg(feature = "shared-memory")]
117    pub type Shm = zextz64!(0x2, false);
118
119    /// # Auth extension
120    /// Used as challenge for probing authentication rights
121    pub type Auth = zextzbuf!(0x3, false);
122
123    /// # Multilink extension
124    /// Used as challenge for probing multilink capabilities
125    pub type MultiLinkSyn = zextzbuf!(0x4, false);
126    pub type MultiLinkAck = zextunit!(0x4, false);
127
128    /// # LowLatency extension
129    /// Used to negotiate the use of lowlatency transport
130    pub type LowLatency = zextunit!(0x5, false);
131
132    /// # Compression extension
133    /// Used to negotiate the use of compression on the link
134    pub type Compression = zextunit!(0x6, false);
135}
136
137impl OpenSyn {
138    #[cfg(feature = "test")]
139    #[doc(hidden)]
140    pub fn rand() -> Self {
141        use rand::Rng;
142
143        #[cfg(feature = "shared-memory")]
144        use crate::common::ZExtZ64;
145        use crate::common::{ZExtUnit, ZExtZBuf};
146
147        const MIN: usize = 32;
148        const MAX: usize = 1_024;
149
150        let mut rng = rand::thread_rng();
151
152        let lease = if rng.gen_bool(0.5) {
153            Duration::from_secs(rng.gen())
154        } else {
155            Duration::from_millis(rng.gen())
156        };
157
158        let initial_sn: TransportSn = rng.gen();
159        let cookie = ZSlice::rand(rng.gen_range(MIN..=MAX));
160        let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
161        #[cfg(feature = "shared-memory")]
162        let ext_shm = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
163        let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
164        let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
165        let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
166        let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
167
168        Self {
169            lease,
170            initial_sn,
171            cookie,
172            ext_qos,
173            #[cfg(feature = "shared-memory")]
174            ext_shm,
175            ext_auth,
176            ext_mlink,
177            ext_lowlatency,
178            ext_compression,
179        }
180    }
181}
182
183/// # OpenAck message
184///
185/// ```text
186///  7 6 5 4 3 2 1 0
187/// +-+-+-+-+-+-+-+-+
188/// ~      ack      ~
189/// +---------------+
190/// ```
191#[derive(Debug, Clone, PartialEq, Eq)]
192pub struct OpenAck {
193    pub lease: Duration,
194    pub initial_sn: TransportSn,
195    pub ext_qos: Option<ext::QoS>,
196    #[cfg(feature = "shared-memory")]
197    pub ext_shm: Option<ext::Shm>,
198    pub ext_auth: Option<ext::Auth>,
199    pub ext_mlink: Option<ext::MultiLinkAck>,
200    pub ext_lowlatency: Option<ext::LowLatency>,
201    pub ext_compression: Option<ext::Compression>,
202}
203
204impl OpenAck {
205    #[cfg(feature = "test")]
206    #[doc(hidden)]
207    pub fn rand() -> Self {
208        use rand::Rng;
209
210        #[cfg(feature = "shared-memory")]
211        use crate::common::ZExtZ64;
212        use crate::common::{ZExtUnit, ZExtZBuf};
213
214        let mut rng = rand::thread_rng();
215
216        let lease = if rng.gen_bool(0.5) {
217            Duration::from_secs(rng.gen())
218        } else {
219            Duration::from_millis(rng.gen())
220        };
221
222        let initial_sn: TransportSn = rng.gen();
223        let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
224        #[cfg(feature = "shared-memory")]
225        let ext_shm = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
226        let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
227        let ext_mlink = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
228        let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
229        let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
230
231        Self {
232            lease,
233            initial_sn,
234            ext_qos,
235            #[cfg(feature = "shared-memory")]
236            ext_shm,
237            ext_auth,
238            ext_mlink,
239            ext_lowlatency,
240            ext_compression,
241        }
242    }
243}