zenoh_protocol/transport/
init.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! # Init message
16//!
17//! The INIT message is sent on a specific Locator to initiate a transport with the zenoh node
18//! associated with that Locator. The initiator MUST send an INIT message with the A flag set to 0.
19//! If the corresponding zenohd node deems appropriate to accept the INIT message, the corresponding
20//! peer MUST reply with an INIT message with the A flag set to 1. Alternatively, it MAY reply with
21//! a [`super::Close`] message. For convenience, we call [`InitSyn`] and [`InitAck`] an INIT message
22//! when the A flag is set to 0 and 1, respectively.
23//!
24//! The [`InitSyn`]/[`InitAck`] message flow is the following:
25//!
26//! ```text
27//!     A                   B
28//!     |      INIT SYN     |
29//!     |------------------>|
30//!     |                   |
31//!     |      INIT ACK     |
32//!     |<------------------|
33//!     |                   |
34//! ```
35//!
36//! The INIT message structure is defined as follows:
37//!
38//! ```text
39//! Flags:
40//! - A: Ack            If A==0 then the message is an InitSyn else it is an InitAck
41//! - S: Size params    If S==1 then size parameters are exchanged
42//! - Z: Extensions     If Z==1 then zenoh extensions will follow.
43//!
44//!  7 6 5 4 3 2 1 0
45//! +-+-+-+-+-+-+-+-+
46//! |Z|S|A|   INIT  |
47//! +-+-+-+---------+
48//! |    version    |
49//! +---------------+
50//! |zid_len|x|x|wai| (#)(*)
51//! +-------+-+-+---+
52//! ~      [u8]     ~ -- ZenohID of the sender of the INIT message
53//! +---------------+
54//! |x|x|x|x|rid|fsn| \                -- SN/ID resolution (+)
55//! +---------------+  | if Flag(S)==1
56//! |      u16      |  |               -- Batch Size ($)
57//! |               | /
58//! +---------------+
59//! ~    <u8;z16>   ~ -- if Flag(A)==1 -- Cookie
60//! +---------------+
61//! ~   [InitExts]  ~ -- if Flag(Z)==1
62//! +---------------+
63//!
//! If A==1 and S==0 then the size parameters (i.e. those exchanged under the S flag) are accepted.
65//!
66//! (*) WhatAmI. It indicates the role of the zenoh node sending the INIT message.
67//!    The valid WhatAmI values are:
68//!    - 0b00: Router
69//!    - 0b01: Peer
70//!    - 0b10: Client
71//!    - 0b11: Reserved
72//!
73//! (#) ZID length. It indicates how many bytes are used for the ZenohID bytes.
74//!     A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual length is computed as:
75//!         real_zid_len := 1 + zid_len
76//!
77//! (+) Sequence Number/ID resolution. It indicates the resolution and consequently the wire overhead
78//!     of various SN and ID in Zenoh.
79//!     - fsn: frame/fragment sequence number resolution. Used in Frame/Fragment messages.
80//!     - rid: request ID resolution. Used in Request/Response messages.
81//!     The valid SN/ID resolution values are:
82//!     - 0b00: 8 bits
83//!     - 0b01: 16 bits
84//!     - 0b10: 32 bits
85//!     - 0b11: 64 bits
86//!
87//! ($) Batch Size. It indicates the maximum size of a batch the sender of the INIT message is willing
88//!     to accept when reading from the network. Default on unicast: 65535.
89//!
90//! NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
91//!       in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
92//!       This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
93//!       the boundary of the serialized messages. The length is encoded as little-endian.
94//!       In any case, the length of a message must not exceed 65535 bytes.
95//! ```
96
97use zenoh_buffers::ZSlice;
98
99use crate::{
100    core::{Resolution, WhatAmI, ZenohIdProto},
101    transport::BatchSize,
102};
103
/// Flag bits of the INIT message header (bits 5 to 7 of the first byte).
pub mod flag {
    /// `0x20` — Ack: if A==0 then the message is an InitSyn, else it is an InitAck.
    pub const A: u8 = 1 << 5;
    /// `0x40` — Size params: if S==1 then size parameters are exchanged.
    pub const S: u8 = 1 << 6;
    /// `0x80` — Extensions: if Z==1 then an extension will follow.
    pub const Z: u8 = 1 << 7;
}
109
110/// # InitSyn message
111///
112/// ```text
113///  7 6 5 4 3 2 1 0
114/// +-+-+-+-+-+-+-+-+
115/// +---------------+
116///
117/// ZExtUnit
118/// ```
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct InitSyn {
121    pub version: u8,
122    pub whatami: WhatAmI,
123    pub zid: ZenohIdProto,
124    pub resolution: Resolution,
125    pub batch_size: BatchSize,
126    pub ext_qos: Option<ext::QoS>,
127    pub ext_qos_link: Option<ext::QoSLink>,
128    #[cfg(feature = "shared-memory")]
129    pub ext_shm: Option<ext::Shm>,
130    pub ext_auth: Option<ext::Auth>,
131    pub ext_mlink: Option<ext::MultiLink>,
132    pub ext_lowlatency: Option<ext::LowLatency>,
133    pub ext_compression: Option<ext::Compression>,
134    pub ext_patch: ext::PatchType,
135}
136
137// Extensions
138pub mod ext {
139    use crate::{
140        common::{ZExtUnit, ZExtZ64, ZExtZBuf},
141        zextunit, zextz64, zextzbuf,
142    };
143
144    /// # QoS extension
145    /// Used to negotiate the use of QoS
146    pub type QoS = zextunit!(0x1, false);
147    pub type QoSLink = zextz64!(0x1, false);
148
149    /// # Shm extension
150    /// Used as challenge for probing shared memory capabilities
151    #[cfg(feature = "shared-memory")]
152    pub type Shm = zextzbuf!(0x2, false);
153
154    /// # Auth extension
155    /// Used as challenge for probing authentication rights
156    pub type Auth = zextzbuf!(0x3, false);
157
158    /// # Multilink extension
159    /// Used as challenge for probing multilink capabilities
160    pub type MultiLink = zextzbuf!(0x4, false);
161
162    /// # LowLatency extension
163    /// Used to negotiate the use of lowlatency transport
164    pub type LowLatency = zextunit!(0x5, false);
165
166    /// # Compression extension
167    /// Used to negotiate the use of compression on the link
168    pub type Compression = zextunit!(0x6, false);
169
170    /// # Patch extension
171    /// Used to negotiate the patch version of the protocol
172    /// if not present (or 0), then protocol as released with 1.0.0
173    /// if >= 1, then fragmentation first/drop markers
174    pub type Patch = zextz64!(0x7, false);
175    pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>;
176}
177
178impl InitSyn {
179    #[cfg(feature = "test")]
180    #[doc(hidden)]
181    pub fn rand() -> Self {
182        use rand::Rng;
183
184        use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};
185
186        let mut rng = rand::thread_rng();
187
188        let version: u8 = rng.gen();
189        let whatami = WhatAmI::rand();
190        let zid = ZenohIdProto::default();
191        let resolution = Resolution::rand();
192        let batch_size: BatchSize = rng.gen();
193        let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
194        let ext_qos_link = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
195        #[cfg(feature = "shared-memory")]
196        let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
197        let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
198        let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
199        let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
200        let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
201        let ext_patch = ext::PatchType::rand();
202
203        Self {
204            version,
205            whatami,
206            zid,
207            resolution,
208            batch_size,
209            ext_qos,
210            ext_qos_link,
211            #[cfg(feature = "shared-memory")]
212            ext_shm,
213            ext_auth,
214            ext_mlink,
215            ext_lowlatency,
216            ext_compression,
217            ext_patch,
218        }
219    }
220}
221
222/// # InitAck message
223///
224/// ```text
225///  7 6 5 4 3 2 1 0
226/// +-+-+-+-+-+-+-+-+
227/// ~     nonce     ~
228/// +---------------+
229///
230/// ZExtZ64
231/// ```
232#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct InitAck {
234    pub version: u8,
235    pub whatami: WhatAmI,
236    pub zid: ZenohIdProto,
237    pub resolution: Resolution,
238    pub batch_size: BatchSize,
239    pub cookie: ZSlice,
240    pub ext_qos: Option<ext::QoS>,
241    pub ext_qos_link: Option<ext::QoSLink>,
242    #[cfg(feature = "shared-memory")]
243    pub ext_shm: Option<ext::Shm>,
244    pub ext_auth: Option<ext::Auth>,
245    pub ext_mlink: Option<ext::MultiLink>,
246    pub ext_lowlatency: Option<ext::LowLatency>,
247    pub ext_compression: Option<ext::Compression>,
248    pub ext_patch: ext::PatchType,
249}
250
251impl InitAck {
252    #[cfg(feature = "test")]
253    #[doc(hidden)]
254    pub fn rand() -> Self {
255        use rand::Rng;
256
257        use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};
258
259        let mut rng = rand::thread_rng();
260
261        let version: u8 = rng.gen();
262        let whatami = WhatAmI::rand();
263        let zid = ZenohIdProto::default();
264        let resolution = if rng.gen_bool(0.5) {
265            Resolution::default()
266        } else {
267            Resolution::rand()
268        };
269        let batch_size: BatchSize = rng.gen();
270        let cookie = ZSlice::rand(64);
271        let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
272        let ext_qos_link = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
273        #[cfg(feature = "shared-memory")]
274        let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
275        let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
276        let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
277        let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
278        let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
279        let ext_patch = ext::PatchType::rand();
280
281        Self {
282            version,
283            whatami,
284            zid,
285            resolution,
286            batch_size,
287            cookie,
288            ext_qos,
289            ext_qos_link,
290            #[cfg(feature = "shared-memory")]
291            ext_shm,
292            ext_auth,
293            ext_mlink,
294            ext_lowlatency,
295            ext_compression,
296            ext_patch,
297        }
298    }
299}