Skip to main content

zenoh_codec/transport/
init.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    reader::{DidntRead, Reader},
16    writer::{DidntWrite, Writer},
17    ZSlice,
18};
19use zenoh_protocol::{
20    common::{iext, imsg},
21    core::{Resolution, WhatAmI, ZenohIdProto},
22    transport::{
23        batch_size, id,
24        init::{ext, flag, InitAck, InitSyn},
25        BatchSize,
26    },
27};
28
29use crate::{
30    common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header, Zenoh080Length,
31};
32
33// InitSyn
34impl<W> WCodec<&InitSyn, &mut W> for Zenoh080
35where
36    W: Writer,
37{
38    type Output = Result<(), DidntWrite>;
39
40    fn write(self, writer: &mut W, x: &InitSyn) -> Self::Output {
41        let InitSyn {
42            version,
43            whatami,
44            zid,
45            resolution,
46            batch_size,
47            ext_qos,
48            ext_qos_link,
49            #[cfg(feature = "shared-memory")]
50            ext_shm,
51            ext_auth,
52            ext_mlink,
53            ext_lowlatency,
54            ext_compression,
55            ext_patch,
56            ext_region_name,
57        } = x;
58
59        // Header
60        let mut header = id::INIT;
61        if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
62            header |= flag::S;
63        }
64        let mut n_exts = (ext_qos.is_some() as u8)
65            + (ext_qos_link.is_some() as u8)
66            + (ext_auth.is_some() as u8)
67            + (ext_mlink.is_some() as u8)
68            + (ext_lowlatency.is_some() as u8)
69            + (ext_compression.is_some() as u8)
70            + (*ext_patch != ext::PatchType::NONE) as u8
71            + (ext_region_name.is_some() as u8);
72
73        #[cfg(feature = "shared-memory")]
74        {
75            n_exts += ext_shm.is_some() as u8;
76        }
77
78        if n_exts != 0 {
79            header |= flag::Z;
80        }
81        self.write(&mut *writer, header)?;
82
83        // Body
84        self.write(&mut *writer, version)?;
85
86        let whatami: u8 = match whatami {
87            WhatAmI::Router => 0b00,
88            WhatAmI::Peer => 0b01,
89            WhatAmI::Client => 0b10,
90        };
91        let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
92        self.write(&mut *writer, flags)?;
93
94        let lodec = Zenoh080Length::new(zid.size());
95        lodec.write(&mut *writer, zid)?;
96
97        if imsg::has_flag(header, flag::S) {
98            self.write(&mut *writer, resolution.as_u8())?;
99            self.write(&mut *writer, batch_size.to_le_bytes())?;
100        }
101
102        // Extensions
103        if let Some(qos) = ext_qos.as_ref() {
104            n_exts -= 1;
105            self.write(&mut *writer, (qos, n_exts != 0))?;
106        }
107        if let Some(qos_link) = ext_qos_link.as_ref() {
108            n_exts -= 1;
109            self.write(&mut *writer, (qos_link, n_exts != 0))?;
110        }
111        #[cfg(feature = "shared-memory")]
112        if let Some(shm) = ext_shm.as_ref() {
113            n_exts -= 1;
114            self.write(&mut *writer, (shm, n_exts != 0))?;
115        }
116        if let Some(auth) = ext_auth.as_ref() {
117            n_exts -= 1;
118            self.write(&mut *writer, (auth, n_exts != 0))?;
119        }
120        if let Some(mlink) = ext_mlink.as_ref() {
121            n_exts -= 1;
122            self.write(&mut *writer, (mlink, n_exts != 0))?;
123        }
124        if let Some(lowlatency) = ext_lowlatency.as_ref() {
125            n_exts -= 1;
126            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
127        }
128        if let Some(compression) = ext_compression.as_ref() {
129            n_exts -= 1;
130            self.write(&mut *writer, (compression, n_exts != 0))?;
131        }
132        if *ext_patch != ext::PatchType::NONE {
133            n_exts -= 1;
134            self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
135        }
136        if let Some(region_name) = ext_region_name.as_ref() {
137            n_exts -= 1;
138            self.write(&mut *writer, (region_name, n_exts != 0))?;
139        }
140
141        Ok(())
142    }
143}
144
145impl<R> RCodec<InitSyn, &mut R> for Zenoh080
146where
147    R: Reader,
148{
149    type Error = DidntRead;
150
151    fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
152        let header: u8 = self.read(&mut *reader)?;
153        let codec = Zenoh080Header::new(header);
154        codec.read(reader)
155    }
156}
157
158impl<R> RCodec<InitSyn, &mut R> for Zenoh080Header
159where
160    R: Reader,
161{
162    type Error = DidntRead;
163
164    fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
165        if imsg::mid(self.header) != id::INIT || imsg::has_flag(self.header, flag::A) {
166            return Err(DidntRead);
167        }
168
169        // Body
170        let version: u8 = self.codec.read(&mut *reader)?;
171
172        let flags: u8 = self.codec.read(&mut *reader)?;
173        let whatami = match flags & 0b11 {
174            0b00 => WhatAmI::Router,
175            0b01 => WhatAmI::Peer,
176            0b10 => WhatAmI::Client,
177            _ => return Err(DidntRead),
178        };
179        let length = 1 + ((flags >> 4) as usize);
180        let lodec = Zenoh080Length::new(length);
181        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
182
183        let mut resolution = Resolution::default();
184        let mut batch_size = batch_size::UNICAST.to_le_bytes();
185        if imsg::has_flag(self.header, flag::S) {
186            let flags: u8 = self.codec.read(&mut *reader)?;
187            resolution = Resolution::from(flags & 0b00111111);
188            batch_size = self.codec.read(&mut *reader)?;
189        }
190        let batch_size = BatchSize::from_le_bytes(batch_size);
191
192        // Extensions
193        let mut ext_qos = None;
194        let mut ext_qos_link = None;
195        #[cfg(feature = "shared-memory")]
196        let mut ext_shm = None;
197        let mut ext_auth = None;
198        let mut ext_mlink = None;
199        let mut ext_lowlatency = None;
200        let mut ext_compression = None;
201        let mut ext_patch = ext::PatchType::NONE;
202        let mut ext_northtag = None;
203
204        let mut has_ext = imsg::has_flag(self.header, flag::Z);
205        while has_ext {
206            let ext: u8 = self.codec.read(&mut *reader)?;
207            let eodec = Zenoh080Header::new(ext);
208            match iext::eid(ext) {
209                ext::QoS::ID => {
210                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
211                    ext_qos = Some(q);
212                    has_ext = ext;
213                }
214                ext::QoSLink::ID => {
215                    let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
216                    ext_qos_link = Some(q);
217                    has_ext = ext;
218                }
219                #[cfg(feature = "shared-memory")]
220                ext::Shm::ID => {
221                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
222                    ext_shm = Some(s);
223                    has_ext = ext;
224                }
225                ext::Auth::ID => {
226                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
227                    ext_auth = Some(a);
228                    has_ext = ext;
229                }
230                ext::MultiLink::ID => {
231                    let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
232                    ext_mlink = Some(a);
233                    has_ext = ext;
234                }
235                ext::LowLatency::ID => {
236                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
237                    ext_lowlatency = Some(q);
238                    has_ext = ext;
239                }
240                ext::Compression::ID => {
241                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
242                    ext_compression = Some(q);
243                    has_ext = ext;
244                }
245                ext::Patch::ID => {
246                    let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
247                    ext_patch = p;
248                    has_ext = ext;
249                }
250                ext::RegionName::ID => {
251                    let (p, ext): (ext::RegionName, bool) = eodec.read(&mut *reader)?;
252                    ext_northtag = Some(p);
253                    has_ext = ext;
254                }
255                _ => {
256                    has_ext = extension::skip(reader, "InitSyn", ext)?;
257                }
258            }
259        }
260
261        Ok(InitSyn {
262            version,
263            whatami,
264            zid,
265            resolution,
266            batch_size,
267            ext_qos,
268            ext_qos_link,
269            #[cfg(feature = "shared-memory")]
270            ext_shm,
271            ext_auth,
272            ext_mlink,
273            ext_lowlatency,
274            ext_compression,
275            ext_patch,
276            ext_region_name: ext_northtag,
277        })
278    }
279}
280
281// InitAck
282impl<W> WCodec<&InitAck, &mut W> for Zenoh080
283where
284    W: Writer,
285{
286    type Output = Result<(), DidntWrite>;
287
288    fn write(self, writer: &mut W, x: &InitAck) -> Self::Output {
289        let InitAck {
290            version,
291            whatami,
292            zid,
293            resolution,
294            batch_size,
295            cookie,
296            ext_qos,
297            ext_qos_link,
298            #[cfg(feature = "shared-memory")]
299            ext_shm,
300            ext_auth,
301            ext_mlink,
302            ext_lowlatency,
303            ext_compression,
304            ext_patch,
305            ext_region_name,
306        } = x;
307
308        // Header
309        let mut header = id::INIT | flag::A;
310        if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
311            header |= flag::S;
312        }
313        let mut n_exts = (ext_qos.is_some() as u8)
314            + (ext_qos_link.is_some() as u8)
315            + (ext_auth.is_some() as u8)
316            + (ext_mlink.is_some() as u8)
317            + (ext_lowlatency.is_some() as u8)
318            + (ext_compression.is_some() as u8)
319            + (*ext_patch != ext::PatchType::NONE) as u8
320            + (ext_region_name.is_some() as u8);
321
322        #[cfg(feature = "shared-memory")]
323        {
324            n_exts += ext_shm.is_some() as u8;
325        }
326
327        if n_exts != 0 {
328            header |= flag::Z;
329        }
330        self.write(&mut *writer, header)?;
331
332        // Body
333        self.write(&mut *writer, version)?;
334
335        let whatami: u8 = match whatami {
336            WhatAmI::Router => 0b00,
337            WhatAmI::Peer => 0b01,
338            WhatAmI::Client => 0b10,
339        };
340        let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
341        self.write(&mut *writer, flags)?;
342
343        let lodec = Zenoh080Length::new(zid.size());
344        lodec.write(&mut *writer, zid)?;
345
346        if imsg::has_flag(header, flag::S) {
347            self.write(&mut *writer, resolution.as_u8())?;
348            self.write(&mut *writer, batch_size.to_le_bytes())?;
349        }
350
351        let zodec = Zenoh080Bounded::<BatchSize>::new();
352        zodec.write(&mut *writer, cookie)?;
353
354        // Extensions
355        if let Some(qos) = ext_qos.as_ref() {
356            n_exts -= 1;
357            self.write(&mut *writer, (qos, n_exts != 0))?;
358        }
359        if let Some(qos_link) = ext_qos_link.as_ref() {
360            n_exts -= 1;
361            self.write(&mut *writer, (qos_link, n_exts != 0))?;
362        }
363        #[cfg(feature = "shared-memory")]
364        if let Some(shm) = ext_shm.as_ref() {
365            n_exts -= 1;
366            self.write(&mut *writer, (shm, n_exts != 0))?;
367        }
368        if let Some(auth) = ext_auth.as_ref() {
369            n_exts -= 1;
370            self.write(&mut *writer, (auth, n_exts != 0))?;
371        }
372        if let Some(mlink) = ext_mlink.as_ref() {
373            n_exts -= 1;
374            self.write(&mut *writer, (mlink, n_exts != 0))?;
375        }
376        if let Some(lowlatency) = ext_lowlatency.as_ref() {
377            n_exts -= 1;
378            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
379        }
380        if let Some(compression) = ext_compression.as_ref() {
381            n_exts -= 1;
382            self.write(&mut *writer, (compression, n_exts != 0))?;
383        }
384        if *ext_patch != ext::PatchType::NONE {
385            n_exts -= 1;
386            self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
387        }
388        if let Some(region_name) = ext_region_name.as_ref() {
389            n_exts -= 1;
390            self.write(&mut *writer, (region_name, n_exts != 0))?;
391        }
392
393        Ok(())
394    }
395}
396
397impl<R> RCodec<InitAck, &mut R> for Zenoh080
398where
399    R: Reader,
400{
401    type Error = DidntRead;
402
403    fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
404        let header: u8 = self.read(&mut *reader)?;
405        let codec = Zenoh080Header::new(header);
406        codec.read(reader)
407    }
408}
409
410impl<R> RCodec<InitAck, &mut R> for Zenoh080Header
411where
412    R: Reader,
413{
414    type Error = DidntRead;
415
416    fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
417        if imsg::mid(self.header) != id::INIT || !imsg::has_flag(self.header, flag::A) {
418            return Err(DidntRead);
419        }
420
421        // Body
422        let version: u8 = self.codec.read(&mut *reader)?;
423
424        let flags: u8 = self.codec.read(&mut *reader)?;
425        let whatami = match flags & 0b11 {
426            0b00 => WhatAmI::Router,
427            0b01 => WhatAmI::Peer,
428            0b10 => WhatAmI::Client,
429            _ => return Err(DidntRead),
430        };
431        let length = 1 + ((flags >> 4) as usize);
432        let lodec = Zenoh080Length::new(length);
433        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
434
435        let mut resolution = Resolution::default();
436        let mut batch_size = batch_size::UNICAST.to_le_bytes();
437        if imsg::has_flag(self.header, flag::S) {
438            let flags: u8 = self.codec.read(&mut *reader)?;
439            resolution = Resolution::from(flags & 0b00111111);
440            batch_size = self.codec.read(&mut *reader)?;
441        }
442        let batch_size = BatchSize::from_le_bytes(batch_size);
443
444        let zodec = Zenoh080Bounded::<BatchSize>::new();
445        let cookie: ZSlice = zodec.read(&mut *reader)?;
446
447        // Extensions
448        let mut ext_qos = None;
449        let mut ext_qos_link = None;
450        #[cfg(feature = "shared-memory")]
451        let mut ext_shm = None;
452        let mut ext_auth = None;
453        let mut ext_mlink = None;
454        let mut ext_lowlatency = None;
455        let mut ext_compression = None;
456        let mut ext_patch = ext::PatchType::NONE;
457        let mut ext_region_name = None;
458
459        let mut has_ext = imsg::has_flag(self.header, flag::Z);
460        while has_ext {
461            let ext: u8 = self.codec.read(&mut *reader)?;
462            let eodec = Zenoh080Header::new(ext);
463            match iext::eid(ext) {
464                ext::QoS::ID => {
465                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
466                    ext_qos = Some(q);
467                    has_ext = ext;
468                }
469                ext::QoSLink::ID => {
470                    let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
471                    ext_qos_link = Some(q);
472                    has_ext = ext;
473                }
474                #[cfg(feature = "shared-memory")]
475                ext::Shm::ID => {
476                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
477                    ext_shm = Some(s);
478                    has_ext = ext;
479                }
480                ext::Auth::ID => {
481                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
482                    ext_auth = Some(a);
483                    has_ext = ext;
484                }
485                ext::MultiLink::ID => {
486                    let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
487                    ext_mlink = Some(a);
488                    has_ext = ext;
489                }
490                ext::LowLatency::ID => {
491                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
492                    ext_lowlatency = Some(q);
493                    has_ext = ext;
494                }
495                ext::Compression::ID => {
496                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
497                    ext_compression = Some(q);
498                    has_ext = ext;
499                }
500                ext::Patch::ID => {
501                    let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
502                    ext_patch = p;
503                    has_ext = ext;
504                }
505                ext::RegionName::ID => {
506                    let (q, ext): (ext::RegionName, bool) = eodec.read(&mut *reader)?;
507                    ext_region_name = Some(q);
508                    has_ext = ext;
509                }
510                _ => {
511                    has_ext = extension::skip(reader, "InitAck", ext)?;
512                }
513            }
514        }
515
516        Ok(InitAck {
517            version,
518            whatami,
519            zid,
520            resolution,
521            batch_size,
522            cookie,
523            ext_qos,
524            ext_qos_link,
525            #[cfg(feature = "shared-memory")]
526            ext_shm,
527            ext_auth,
528            ext_mlink,
529            ext_lowlatency,
530            ext_compression,
531            ext_patch,
532            ext_region_name,
533        })
534    }
535}