zenoh_protocol/transport/open.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! # Open message
16//!
17//! After having successfully complete the [`super::InitSyn`]-[`super::InitAck`] message exchange,
18//! the OPEN message is sent on a link to finalize the initialization of the link and
19//! associated transport with a zenoh node.
20//! For convenience, we call [`OpenSyn`] and [`OpenAck`] an OPEN message with the A flag
21//! is set to 0 and 1, respectively.
22//!
23//! The [`OpenSyn`]/[`OpenAck`] message flow is the following:
24//!
25//! ```text
26//! A B
27//! | OPEN SYN |
28//! |------------------>|
29//! | |
30//! | OPEN ACK |
31//! |<------------------|
32//! | |
33//! ```
34//!
35//! ```text
36//! Flags:
37//! - A: Ack If A==0 then the message is an OpenSyn else it is an OpenAck
38//! - T: Lease period if T==1 then the lease period is in seconds else in milliseconds
39//! - Z: Extensions If Z==1 then zenoh extensions will follow.
40//!
41//! 7 6 5 4 3 2 1 0
42//! +-+-+-+-+-+-+-+-+
43//! |Z|T|A| OPEN |
44//! +-+-+-+---------+
45//! % lease % -- Lease period of the sender of the OPEN message
46//! +---------------+
47//! % initial_sn % -- Initial SN proposed by the sender of the OPEN(*)
48//! +---------------+
49//! ~ <u8;z16> ~ if Flag(A)==0 (**) -- Cookie
50//! +---------------+
51//! ~ [OpenExts] ~ if Flag(Z)==1
52//! +---------------+
53//!
54//! (*) The initial sequence number MUST be compatible with the sequence number resolution agreed in the
55//! [`super::InitSyn`]-[`super::InitAck`] message exchange
56//! (**) The cookie MUST be the same received in the [`super::InitAck`]from the corresponding zenoh node
57//! ```
58//!
59//! NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
60//! in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
61//! This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
62//! the boundary of the serialized messages. The length is encoded as little-endian.
63//! In any case, the length of a message must not exceed 65535 bytes.
64
65use core::time::Duration;
66
67use zenoh_buffers::ZSlice;
68
69use crate::transport::TransportSn;
70
71pub mod flag {
72 pub const A: u8 = 1 << 5; // 0x20 Ack if A==0 then the message is an InitSyn else it is an InitAck
73 pub const T: u8 = 1 << 6; // 0x40 Lease period if T==1 then the lease period is in seconds else in milliseconds
74 pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
75}
76
77/// # OpenSyn message
78///
79/// ```text
80/// 7 6 5 4 3 2 1 0
81/// +-+-+-+-+-+-+-+-+
82/// ~ challenge ~
83/// +---------------+
84/// ```
85#[derive(Debug, Clone, PartialEq, Eq)]
86pub struct OpenSyn {
87 pub lease: Duration,
88 pub initial_sn: TransportSn,
89 pub cookie: ZSlice,
90 pub ext_qos: Option<ext::QoS>,
91 #[cfg(feature = "shared-memory")]
92 pub ext_shm: Option<ext::Shm>,
93 pub ext_auth: Option<ext::Auth>,
94 pub ext_mlink: Option<ext::MultiLinkSyn>,
95 pub ext_lowlatency: Option<ext::LowLatency>,
96 pub ext_compression: Option<ext::Compression>,
97}
98
99// Extensions
100pub mod ext {
101 #[cfg(feature = "shared-memory")]
102 use crate::common::ZExtZ64;
103 #[cfg(feature = "shared-memory")]
104 use crate::zextz64;
105 use crate::{
106 common::{ZExtUnit, ZExtZBuf},
107 zextunit, zextzbuf,
108 };
109
110 /// # QoS extension
111 /// Used to negotiate the use of QoS
112 pub type QoS = zextunit!(0x1, false);
113
114 /// # Shm extension
115 /// Used as challenge for probing shared memory capabilities
116 #[cfg(feature = "shared-memory")]
117 pub type Shm = zextz64!(0x2, false);
118
119 /// # Auth extension
120 /// Used as challenge for probing authentication rights
121 pub type Auth = zextzbuf!(0x3, false);
122
123 /// # Multilink extension
124 /// Used as challenge for probing multilink capabilities
125 pub type MultiLinkSyn = zextzbuf!(0x4, false);
126 pub type MultiLinkAck = zextunit!(0x4, false);
127
128 /// # LowLatency extension
129 /// Used to negotiate the use of lowlatency transport
130 pub type LowLatency = zextunit!(0x5, false);
131
132 /// # Compression extension
133 /// Used to negotiate the use of compression on the link
134 pub type Compression = zextunit!(0x6, false);
135}
136
137impl OpenSyn {
138 #[cfg(feature = "test")]
139 pub fn rand() -> Self {
140 use rand::Rng;
141
142 #[cfg(feature = "shared-memory")]
143 use crate::common::ZExtZ64;
144 use crate::common::{ZExtUnit, ZExtZBuf};
145
146 const MIN: usize = 32;
147 const MAX: usize = 1_024;
148
149 let mut rng = rand::thread_rng();
150
151 let lease = if rng.gen_bool(0.5) {
152 Duration::from_secs(rng.gen())
153 } else {
154 Duration::from_millis(rng.gen())
155 };
156
157 let initial_sn: TransportSn = rng.gen();
158 let cookie = ZSlice::rand(rng.gen_range(MIN..=MAX));
159 let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
160 #[cfg(feature = "shared-memory")]
161 let ext_shm = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
162 let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
163 let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
164 let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
165 let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
166
167 Self {
168 lease,
169 initial_sn,
170 cookie,
171 ext_qos,
172 #[cfg(feature = "shared-memory")]
173 ext_shm,
174 ext_auth,
175 ext_mlink,
176 ext_lowlatency,
177 ext_compression,
178 }
179 }
180}
181
182/// # OpenAck message
183///
184/// ```text
185/// 7 6 5 4 3 2 1 0
186/// +-+-+-+-+-+-+-+-+
187/// ~ ack ~
188/// +---------------+
189/// ```
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct OpenAck {
192 pub lease: Duration,
193 pub initial_sn: TransportSn,
194 pub ext_qos: Option<ext::QoS>,
195 #[cfg(feature = "shared-memory")]
196 pub ext_shm: Option<ext::Shm>,
197 pub ext_auth: Option<ext::Auth>,
198 pub ext_mlink: Option<ext::MultiLinkAck>,
199 pub ext_lowlatency: Option<ext::LowLatency>,
200 pub ext_compression: Option<ext::Compression>,
201}
202
203impl OpenAck {
204 #[cfg(feature = "test")]
205 pub fn rand() -> Self {
206 use rand::Rng;
207
208 #[cfg(feature = "shared-memory")]
209 use crate::common::ZExtZ64;
210 use crate::common::{ZExtUnit, ZExtZBuf};
211
212 let mut rng = rand::thread_rng();
213
214 let lease = if rng.gen_bool(0.5) {
215 Duration::from_secs(rng.gen())
216 } else {
217 Duration::from_millis(rng.gen())
218 };
219
220 let initial_sn: TransportSn = rng.gen();
221 let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
222 #[cfg(feature = "shared-memory")]
223 let ext_shm = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
224 let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
225 let ext_mlink = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
226 let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
227 let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
228
229 Self {
230 lease,
231 initial_sn,
232 ext_qos,
233 #[cfg(feature = "shared-memory")]
234 ext_shm,
235 ext_auth,
236 ext_mlink,
237 ext_lowlatency,
238 ext_compression,
239 }
240 }
241}