1use core::time::Duration;
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19 ZSlice,
20};
21use zenoh_protocol::{
22 common::{iext, imsg},
23 transport::{
24 id,
25 open::{ext, flag, OpenAck, OpenSyn},
26 TransportSn,
27 },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
31
32impl<W> WCodec<&OpenSyn, &mut W> for Zenoh080
34where
35 W: Writer,
36{
37 type Output = Result<(), DidntWrite>;
38
39 fn write(self, writer: &mut W, x: &OpenSyn) -> Self::Output {
40 let OpenSyn {
41 lease,
42 initial_sn,
43 cookie,
44 ext_qos,
45 #[cfg(feature = "shared-memory")]
46 ext_shm,
47 ext_auth,
48 ext_mlink,
49 ext_lowlatency,
50 ext_compression,
51 } = x;
52
53 let mut header = id::OPEN;
55 if lease.as_millis() % 1_000 == 0 {
56 header |= flag::T;
57 }
58 let mut n_exts = (ext_qos.is_some() as u8)
59 + (ext_auth.is_some() as u8)
60 + (ext_mlink.is_some() as u8)
61 + (ext_lowlatency.is_some() as u8)
62 + (ext_compression.is_some() as u8);
63
64 #[cfg(feature = "shared-memory")]
65 {
66 n_exts += ext_shm.is_some() as u8;
67 }
68
69 if n_exts != 0 {
70 header |= flag::Z;
71 }
72 self.write(&mut *writer, header)?;
73
74 if imsg::has_flag(header, flag::T) {
76 self.write(&mut *writer, lease.as_secs())?;
77 } else {
78 self.write(&mut *writer, lease.as_millis() as u64)?;
79 }
80 self.write(&mut *writer, initial_sn)?;
81 self.write(&mut *writer, cookie)?;
82
83 if let Some(qos) = ext_qos.as_ref() {
85 n_exts -= 1;
86 self.write(&mut *writer, (qos, n_exts != 0))?;
87 }
88 #[cfg(feature = "shared-memory")]
89 if let Some(shm) = ext_shm.as_ref() {
90 n_exts -= 1;
91 self.write(&mut *writer, (shm, n_exts != 0))?;
92 }
93 if let Some(auth) = ext_auth.as_ref() {
94 n_exts -= 1;
95 self.write(&mut *writer, (auth, n_exts != 0))?;
96 }
97 if let Some(mlink) = ext_mlink.as_ref() {
98 n_exts -= 1;
99 self.write(&mut *writer, (mlink, n_exts != 0))?;
100 }
101 if let Some(lowlatency) = ext_lowlatency.as_ref() {
102 n_exts -= 1;
103 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
104 }
105 if let Some(compression) = ext_compression.as_ref() {
106 n_exts -= 1;
107 self.write(&mut *writer, (compression, n_exts != 0))?;
108 }
109
110 Ok(())
111 }
112}
113
114impl<R> RCodec<OpenSyn, &mut R> for Zenoh080
115where
116 R: Reader,
117{
118 type Error = DidntRead;
119
120 fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
121 let header: u8 = self.read(&mut *reader)?;
122 let codec = Zenoh080Header::new(header);
123 codec.read(reader)
124 }
125}
126
127impl<R> RCodec<OpenSyn, &mut R> for Zenoh080Header
128where
129 R: Reader,
130{
131 type Error = DidntRead;
132
133 fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
134 if imsg::mid(self.header) != id::OPEN || imsg::has_flag(self.header, flag::A) {
135 return Err(DidntRead);
136 }
137
138 let lease: u64 = self.codec.read(&mut *reader)?;
140 let lease = if imsg::has_flag(self.header, flag::T) {
141 Duration::from_secs(lease)
142 } else {
143 Duration::from_millis(lease)
144 };
145 let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
146 let cookie: ZSlice = self.codec.read(&mut *reader)?;
147
148 let mut ext_qos = None;
150 #[cfg(feature = "shared-memory")]
151 let mut ext_shm = None;
152 let mut ext_auth = None;
153 let mut ext_mlink = None;
154 let mut ext_lowlatency = None;
155 let mut ext_compression = None;
156
157 let mut has_ext = imsg::has_flag(self.header, flag::Z);
158 while has_ext {
159 let ext: u8 = self.codec.read(&mut *reader)?;
160 let eodec = Zenoh080Header::new(ext);
161 match iext::eid(ext) {
162 ext::QoS::ID => {
163 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
164 ext_qos = Some(q);
165 has_ext = ext;
166 }
167 #[cfg(feature = "shared-memory")]
168 ext::Shm::ID => {
169 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
170 ext_shm = Some(s);
171 has_ext = ext;
172 }
173 ext::Auth::ID => {
174 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
175 ext_auth = Some(a);
176 has_ext = ext;
177 }
178 ext::MultiLinkSyn::ID => {
179 let (a, ext): (ext::MultiLinkSyn, bool) = eodec.read(&mut *reader)?;
180 ext_mlink = Some(a);
181 has_ext = ext;
182 }
183 ext::LowLatency::ID => {
184 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
185 ext_lowlatency = Some(q);
186 has_ext = ext;
187 }
188 ext::Compression::ID => {
189 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
190 ext_compression = Some(q);
191 has_ext = ext;
192 }
193 _ => {
194 has_ext = extension::skip(reader, "OpenSyn", ext)?;
195 }
196 }
197 }
198
199 Ok(OpenSyn {
200 lease,
201 initial_sn,
202 cookie,
203 ext_qos,
204 #[cfg(feature = "shared-memory")]
205 ext_shm,
206 ext_auth,
207 ext_mlink,
208 ext_lowlatency,
209 ext_compression,
210 })
211 }
212}
213
214impl<W> WCodec<&OpenAck, &mut W> for Zenoh080
216where
217 W: Writer,
218{
219 type Output = Result<(), DidntWrite>;
220
221 fn write(self, writer: &mut W, x: &OpenAck) -> Self::Output {
222 let OpenAck {
223 lease,
224 initial_sn,
225 ext_qos,
226 #[cfg(feature = "shared-memory")]
227 ext_shm,
228 ext_auth,
229 ext_mlink,
230 ext_lowlatency,
231 ext_compression,
232 } = x;
233
234 let mut header = id::OPEN;
236 header |= flag::A;
237 if lease.subsec_nanos() == 0 {
239 header |= flag::T;
240 }
241 let mut n_exts = (ext_qos.is_some() as u8)
242 + (ext_auth.is_some() as u8)
243 + (ext_mlink.is_some() as u8)
244 + (ext_lowlatency.is_some() as u8)
245 + (ext_compression.is_some() as u8);
246
247 #[cfg(feature = "shared-memory")]
248 {
249 n_exts += ext_shm.is_some() as u8;
250 }
251
252 if n_exts != 0 {
253 header |= flag::Z;
254 }
255 self.write(&mut *writer, header)?;
256
257 if imsg::has_flag(header, flag::T) {
259 self.write(&mut *writer, lease.as_secs())?;
260 } else {
261 self.write(&mut *writer, lease.as_millis() as u64)?;
262 }
263 self.write(&mut *writer, initial_sn)?;
264
265 if let Some(qos) = ext_qos.as_ref() {
267 n_exts -= 1;
268 self.write(&mut *writer, (qos, n_exts != 0))?;
269 }
270 #[cfg(feature = "shared-memory")]
271 if let Some(shm) = ext_shm.as_ref() {
272 n_exts -= 1;
273 self.write(&mut *writer, (shm, n_exts != 0))?;
274 }
275 if let Some(auth) = ext_auth.as_ref() {
276 n_exts -= 1;
277 self.write(&mut *writer, (auth, n_exts != 0))?;
278 }
279 if let Some(mlink) = ext_mlink.as_ref() {
280 n_exts -= 1;
281 self.write(&mut *writer, (mlink, n_exts != 0))?;
282 }
283 if let Some(lowlatency) = ext_lowlatency.as_ref() {
284 n_exts -= 1;
285 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
286 }
287 if let Some(compression) = ext_compression.as_ref() {
288 n_exts -= 1;
289 self.write(&mut *writer, (compression, n_exts != 0))?;
290 }
291
292 Ok(())
293 }
294}
295
296impl<R> RCodec<OpenAck, &mut R> for Zenoh080
297where
298 R: Reader,
299{
300 type Error = DidntRead;
301
302 fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
303 let header: u8 = self.read(&mut *reader)?;
304 let codec = Zenoh080Header::new(header);
305 codec.read(reader)
306 }
307}
308
309impl<R> RCodec<OpenAck, &mut R> for Zenoh080Header
310where
311 R: Reader,
312{
313 type Error = DidntRead;
314
315 fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
316 if imsg::mid(self.header) != id::OPEN || !imsg::has_flag(self.header, flag::A) {
317 return Err(DidntRead);
318 }
319
320 let lease: u64 = self.codec.read(&mut *reader)?;
322 let lease = if imsg::has_flag(self.header, flag::T) {
323 Duration::from_secs(lease)
324 } else {
325 Duration::from_millis(lease)
326 };
327 let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
328
329 let mut ext_qos = None;
331 #[cfg(feature = "shared-memory")]
332 let mut ext_shm = None;
333 let mut ext_auth = None;
334 let mut ext_mlink = None;
335 let mut ext_lowlatency = None;
336 let mut ext_compression = None;
337
338 let mut has_ext = imsg::has_flag(self.header, flag::Z);
339 while has_ext {
340 let ext: u8 = self.codec.read(&mut *reader)?;
341 let eodec = Zenoh080Header::new(ext);
342 match iext::eid(ext) {
343 ext::QoS::ID => {
344 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
345 ext_qos = Some(q);
346 has_ext = ext;
347 }
348 #[cfg(feature = "shared-memory")]
349 ext::Shm::ID => {
350 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
351 ext_shm = Some(s);
352 has_ext = ext;
353 }
354 ext::Auth::ID => {
355 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
356 ext_auth = Some(a);
357 has_ext = ext;
358 }
359 ext::MultiLinkAck::ID => {
360 let (a, ext): (ext::MultiLinkAck, bool) = eodec.read(&mut *reader)?;
361 ext_mlink = Some(a);
362 has_ext = ext;
363 }
364 ext::LowLatency::ID => {
365 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
366 ext_lowlatency = Some(q);
367 has_ext = ext;
368 }
369 ext::Compression::ID => {
370 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
371 ext_compression = Some(q);
372 has_ext = ext;
373 }
374 _ => {
375 has_ext = extension::skip(reader, "OpenAck", ext)?;
376 }
377 }
378 }
379
380 Ok(OpenAck {
381 lease,
382 initial_sn,
383 ext_qos,
384 #[cfg(feature = "shared-memory")]
385 ext_shm,
386 ext_auth,
387 ext_mlink,
388 ext_lowlatency,
389 ext_compression,
390 })
391 }
392}