zenoh_codec/transport/
init.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    reader::{DidntRead, Reader},
16    writer::{DidntWrite, Writer},
17    ZSlice,
18};
19use zenoh_protocol::{
20    common::{iext, imsg},
21    core::{Resolution, WhatAmI, ZenohIdProto},
22    transport::{
23        batch_size, id,
24        init::{ext, flag, InitAck, InitSyn},
25        BatchSize,
26    },
27};
28
29use crate::{
30    common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header, Zenoh080Length,
31};
32
33// InitSyn
34impl<W> WCodec<&InitSyn, &mut W> for Zenoh080
35where
36    W: Writer,
37{
38    type Output = Result<(), DidntWrite>;
39
40    fn write(self, writer: &mut W, x: &InitSyn) -> Self::Output {
41        let InitSyn {
42            version,
43            whatami,
44            zid,
45            resolution,
46            batch_size,
47            ext_qos,
48            ext_qos_link,
49            #[cfg(feature = "shared-memory")]
50            ext_shm,
51            ext_auth,
52            ext_mlink,
53            ext_lowlatency,
54            ext_compression,
55            ext_patch,
56        } = x;
57
58        // Header
59        let mut header = id::INIT;
60        if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
61            header |= flag::S;
62        }
63        let mut n_exts = (ext_qos.is_some() as u8)
64            + (ext_qos_link.is_some() as u8)
65            + (ext_auth.is_some() as u8)
66            + (ext_mlink.is_some() as u8)
67            + (ext_lowlatency.is_some() as u8)
68            + (ext_compression.is_some() as u8)
69            + (*ext_patch != ext::PatchType::NONE) as u8;
70
71        #[cfg(feature = "shared-memory")]
72        {
73            n_exts += ext_shm.is_some() as u8;
74        }
75
76        if n_exts != 0 {
77            header |= flag::Z;
78        }
79        self.write(&mut *writer, header)?;
80
81        // Body
82        self.write(&mut *writer, version)?;
83
84        let whatami: u8 = match whatami {
85            WhatAmI::Router => 0b00,
86            WhatAmI::Peer => 0b01,
87            WhatAmI::Client => 0b10,
88        };
89        let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
90        self.write(&mut *writer, flags)?;
91
92        let lodec = Zenoh080Length::new(zid.size());
93        lodec.write(&mut *writer, zid)?;
94
95        if imsg::has_flag(header, flag::S) {
96            self.write(&mut *writer, resolution.as_u8())?;
97            self.write(&mut *writer, batch_size.to_le_bytes())?;
98        }
99
100        // Extensions
101        if let Some(qos) = ext_qos.as_ref() {
102            n_exts -= 1;
103            self.write(&mut *writer, (qos, n_exts != 0))?;
104        }
105        if let Some(qos_link) = ext_qos_link.as_ref() {
106            n_exts -= 1;
107            self.write(&mut *writer, (qos_link, n_exts != 0))?;
108        }
109        #[cfg(feature = "shared-memory")]
110        if let Some(shm) = ext_shm.as_ref() {
111            n_exts -= 1;
112            self.write(&mut *writer, (shm, n_exts != 0))?;
113        }
114        if let Some(auth) = ext_auth.as_ref() {
115            n_exts -= 1;
116            self.write(&mut *writer, (auth, n_exts != 0))?;
117        }
118        if let Some(mlink) = ext_mlink.as_ref() {
119            n_exts -= 1;
120            self.write(&mut *writer, (mlink, n_exts != 0))?;
121        }
122        if let Some(lowlatency) = ext_lowlatency.as_ref() {
123            n_exts -= 1;
124            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
125        }
126        if let Some(compression) = ext_compression.as_ref() {
127            n_exts -= 1;
128            self.write(&mut *writer, (compression, n_exts != 0))?;
129        }
130        if *ext_patch != ext::PatchType::NONE {
131            n_exts -= 1;
132            self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
133        }
134
135        Ok(())
136    }
137}
138
139impl<R> RCodec<InitSyn, &mut R> for Zenoh080
140where
141    R: Reader,
142{
143    type Error = DidntRead;
144
145    fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
146        let header: u8 = self.read(&mut *reader)?;
147        let codec = Zenoh080Header::new(header);
148        codec.read(reader)
149    }
150}
151
152impl<R> RCodec<InitSyn, &mut R> for Zenoh080Header
153where
154    R: Reader,
155{
156    type Error = DidntRead;
157
158    fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
159        if imsg::mid(self.header) != id::INIT || imsg::has_flag(self.header, flag::A) {
160            return Err(DidntRead);
161        }
162
163        // Body
164        let version: u8 = self.codec.read(&mut *reader)?;
165
166        let flags: u8 = self.codec.read(&mut *reader)?;
167        let whatami = match flags & 0b11 {
168            0b00 => WhatAmI::Router,
169            0b01 => WhatAmI::Peer,
170            0b10 => WhatAmI::Client,
171            _ => return Err(DidntRead),
172        };
173        let length = 1 + ((flags >> 4) as usize);
174        let lodec = Zenoh080Length::new(length);
175        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
176
177        let mut resolution = Resolution::default();
178        let mut batch_size = batch_size::UNICAST.to_le_bytes();
179        if imsg::has_flag(self.header, flag::S) {
180            let flags: u8 = self.codec.read(&mut *reader)?;
181            resolution = Resolution::from(flags & 0b00111111);
182            batch_size = self.codec.read(&mut *reader)?;
183        }
184        let batch_size = BatchSize::from_le_bytes(batch_size);
185
186        // Extensions
187        let mut ext_qos = None;
188        let mut ext_qos_link = None;
189        #[cfg(feature = "shared-memory")]
190        let mut ext_shm = None;
191        let mut ext_auth = None;
192        let mut ext_mlink = None;
193        let mut ext_lowlatency = None;
194        let mut ext_compression = None;
195        let mut ext_patch = ext::PatchType::NONE;
196
197        let mut has_ext = imsg::has_flag(self.header, flag::Z);
198        while has_ext {
199            let ext: u8 = self.codec.read(&mut *reader)?;
200            let eodec = Zenoh080Header::new(ext);
201            match iext::eid(ext) {
202                ext::QoS::ID => {
203                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
204                    ext_qos = Some(q);
205                    has_ext = ext;
206                }
207                ext::QoSLink::ID => {
208                    let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
209                    ext_qos_link = Some(q);
210                    has_ext = ext;
211                }
212                #[cfg(feature = "shared-memory")]
213                ext::Shm::ID => {
214                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
215                    ext_shm = Some(s);
216                    has_ext = ext;
217                }
218                ext::Auth::ID => {
219                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
220                    ext_auth = Some(a);
221                    has_ext = ext;
222                }
223                ext::MultiLink::ID => {
224                    let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
225                    ext_mlink = Some(a);
226                    has_ext = ext;
227                }
228                ext::LowLatency::ID => {
229                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
230                    ext_lowlatency = Some(q);
231                    has_ext = ext;
232                }
233                ext::Compression::ID => {
234                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
235                    ext_compression = Some(q);
236                    has_ext = ext;
237                }
238                ext::Patch::ID => {
239                    let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
240                    ext_patch = p;
241                    has_ext = ext;
242                }
243                _ => {
244                    has_ext = extension::skip(reader, "InitSyn", ext)?;
245                }
246            }
247        }
248
249        Ok(InitSyn {
250            version,
251            whatami,
252            zid,
253            resolution,
254            batch_size,
255            ext_qos,
256            ext_qos_link,
257            #[cfg(feature = "shared-memory")]
258            ext_shm,
259            ext_auth,
260            ext_mlink,
261            ext_lowlatency,
262            ext_compression,
263            ext_patch,
264        })
265    }
266}
267
268// InitAck
269impl<W> WCodec<&InitAck, &mut W> for Zenoh080
270where
271    W: Writer,
272{
273    type Output = Result<(), DidntWrite>;
274
275    fn write(self, writer: &mut W, x: &InitAck) -> Self::Output {
276        let InitAck {
277            version,
278            whatami,
279            zid,
280            resolution,
281            batch_size,
282            cookie,
283            ext_qos,
284            ext_qos_link,
285            #[cfg(feature = "shared-memory")]
286            ext_shm,
287            ext_auth,
288            ext_mlink,
289            ext_lowlatency,
290            ext_compression,
291            ext_patch,
292        } = x;
293
294        // Header
295        let mut header = id::INIT | flag::A;
296        if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
297            header |= flag::S;
298        }
299        let mut n_exts = (ext_qos.is_some() as u8)
300            + (ext_qos_link.is_some() as u8)
301            + (ext_auth.is_some() as u8)
302            + (ext_mlink.is_some() as u8)
303            + (ext_lowlatency.is_some() as u8)
304            + (ext_compression.is_some() as u8)
305            + (*ext_patch != ext::PatchType::NONE) as u8;
306
307        #[cfg(feature = "shared-memory")]
308        {
309            n_exts += ext_shm.is_some() as u8;
310        }
311
312        if n_exts != 0 {
313            header |= flag::Z;
314        }
315        self.write(&mut *writer, header)?;
316
317        // Body
318        self.write(&mut *writer, version)?;
319
320        let whatami: u8 = match whatami {
321            WhatAmI::Router => 0b00,
322            WhatAmI::Peer => 0b01,
323            WhatAmI::Client => 0b10,
324        };
325        let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
326        self.write(&mut *writer, flags)?;
327
328        let lodec = Zenoh080Length::new(zid.size());
329        lodec.write(&mut *writer, zid)?;
330
331        if imsg::has_flag(header, flag::S) {
332            self.write(&mut *writer, resolution.as_u8())?;
333            self.write(&mut *writer, batch_size.to_le_bytes())?;
334        }
335
336        let zodec = Zenoh080Bounded::<BatchSize>::new();
337        zodec.write(&mut *writer, cookie)?;
338
339        // Extensions
340        if let Some(qos) = ext_qos.as_ref() {
341            n_exts -= 1;
342            self.write(&mut *writer, (qos, n_exts != 0))?;
343        }
344        if let Some(qos_link) = ext_qos_link.as_ref() {
345            n_exts -= 1;
346            self.write(&mut *writer, (qos_link, n_exts != 0))?;
347        }
348        #[cfg(feature = "shared-memory")]
349        if let Some(shm) = ext_shm.as_ref() {
350            n_exts -= 1;
351            self.write(&mut *writer, (shm, n_exts != 0))?;
352        }
353        if let Some(auth) = ext_auth.as_ref() {
354            n_exts -= 1;
355            self.write(&mut *writer, (auth, n_exts != 0))?;
356        }
357        if let Some(mlink) = ext_mlink.as_ref() {
358            n_exts -= 1;
359            self.write(&mut *writer, (mlink, n_exts != 0))?;
360        }
361        if let Some(lowlatency) = ext_lowlatency.as_ref() {
362            n_exts -= 1;
363            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
364        }
365        if let Some(compression) = ext_compression.as_ref() {
366            n_exts -= 1;
367            self.write(&mut *writer, (compression, n_exts != 0))?;
368        }
369        if *ext_patch != ext::PatchType::NONE {
370            n_exts -= 1;
371            self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
372        }
373
374        Ok(())
375    }
376}
377
378impl<R> RCodec<InitAck, &mut R> for Zenoh080
379where
380    R: Reader,
381{
382    type Error = DidntRead;
383
384    fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
385        let header: u8 = self.read(&mut *reader)?;
386        let codec = Zenoh080Header::new(header);
387        codec.read(reader)
388    }
389}
390
391impl<R> RCodec<InitAck, &mut R> for Zenoh080Header
392where
393    R: Reader,
394{
395    type Error = DidntRead;
396
397    fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
398        if imsg::mid(self.header) != id::INIT || !imsg::has_flag(self.header, flag::A) {
399            return Err(DidntRead);
400        }
401
402        // Body
403        let version: u8 = self.codec.read(&mut *reader)?;
404
405        let flags: u8 = self.codec.read(&mut *reader)?;
406        let whatami = match flags & 0b11 {
407            0b00 => WhatAmI::Router,
408            0b01 => WhatAmI::Peer,
409            0b10 => WhatAmI::Client,
410            _ => return Err(DidntRead),
411        };
412        let length = 1 + ((flags >> 4) as usize);
413        let lodec = Zenoh080Length::new(length);
414        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
415
416        let mut resolution = Resolution::default();
417        let mut batch_size = batch_size::UNICAST.to_le_bytes();
418        if imsg::has_flag(self.header, flag::S) {
419            let flags: u8 = self.codec.read(&mut *reader)?;
420            resolution = Resolution::from(flags & 0b00111111);
421            batch_size = self.codec.read(&mut *reader)?;
422        }
423        let batch_size = BatchSize::from_le_bytes(batch_size);
424
425        let zodec = Zenoh080Bounded::<BatchSize>::new();
426        let cookie: ZSlice = zodec.read(&mut *reader)?;
427
428        // Extensions
429        let mut ext_qos = None;
430        let mut ext_qos_link = None;
431        #[cfg(feature = "shared-memory")]
432        let mut ext_shm = None;
433        let mut ext_auth = None;
434        let mut ext_mlink = None;
435        let mut ext_lowlatency = None;
436        let mut ext_compression = None;
437        let mut ext_patch = ext::PatchType::NONE;
438
439        let mut has_ext = imsg::has_flag(self.header, flag::Z);
440        while has_ext {
441            let ext: u8 = self.codec.read(&mut *reader)?;
442            let eodec = Zenoh080Header::new(ext);
443            match iext::eid(ext) {
444                ext::QoS::ID => {
445                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
446                    ext_qos = Some(q);
447                    has_ext = ext;
448                }
449                ext::QoSLink::ID => {
450                    let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
451                    ext_qos_link = Some(q);
452                    has_ext = ext;
453                }
454                #[cfg(feature = "shared-memory")]
455                ext::Shm::ID => {
456                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
457                    ext_shm = Some(s);
458                    has_ext = ext;
459                }
460                ext::Auth::ID => {
461                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
462                    ext_auth = Some(a);
463                    has_ext = ext;
464                }
465                ext::MultiLink::ID => {
466                    let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
467                    ext_mlink = Some(a);
468                    has_ext = ext;
469                }
470                ext::LowLatency::ID => {
471                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
472                    ext_lowlatency = Some(q);
473                    has_ext = ext;
474                }
475                ext::Compression::ID => {
476                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
477                    ext_compression = Some(q);
478                    has_ext = ext;
479                }
480                ext::Patch::ID => {
481                    let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
482                    ext_patch = p;
483                    has_ext = ext;
484                }
485                _ => {
486                    has_ext = extension::skip(reader, "InitAck", ext)?;
487                }
488            }
489        }
490
491        Ok(InitAck {
492            version,
493            whatami,
494            zid,
495            resolution,
496            batch_size,
497            cookie,
498            ext_qos,
499            ext_qos_link,
500            #[cfg(feature = "shared-memory")]
501            ext_shm,
502            ext_auth,
503            ext_mlink,
504            ext_lowlatency,
505            ext_compression,
506            ext_patch,
507        })
508    }
509}