Skip to main content

zenoh_codec/transport/
open.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::time::Duration;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19    ZSlice,
20};
21use zenoh_protocol::{
22    common::{iext, imsg},
23    transport::{
24        id,
25        open::{ext, flag, OpenAck, OpenSyn},
26        TransportSn,
27    },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
31
32// OpenSyn
33impl<W> WCodec<&OpenSyn, &mut W> for Zenoh080
34where
35    W: Writer,
36{
37    type Output = Result<(), DidntWrite>;
38
39    fn write(self, writer: &mut W, x: &OpenSyn) -> Self::Output {
40        let OpenSyn {
41            lease,
42            initial_sn,
43            cookie,
44            ext_qos,
45            #[cfg(feature = "shared-memory")]
46            ext_shm,
47            ext_auth,
48            ext_mlink,
49            ext_lowlatency,
50            ext_compression,
51            ext_south,
52        } = x;
53
54        // Header
55        let mut header = id::OPEN;
56        if lease.as_millis() % 1_000 == 0 {
57            header |= flag::T;
58        }
59        let mut n_exts = (ext_qos.is_some() as u8)
60            + (ext_auth.is_some() as u8)
61            + (ext_mlink.is_some() as u8)
62            + (ext_lowlatency.is_some() as u8)
63            + (ext_compression.is_some() as u8)
64            + (ext_south.is_some() as u8);
65
66        #[cfg(feature = "shared-memory")]
67        {
68            n_exts += ext_shm.is_some() as u8;
69        }
70
71        if n_exts != 0 {
72            header |= flag::Z;
73        }
74        self.write(&mut *writer, header)?;
75
76        // Body
77        if imsg::has_flag(header, flag::T) {
78            self.write(&mut *writer, lease.as_secs())?;
79        } else {
80            self.write(&mut *writer, lease.as_millis() as u64)?;
81        }
82        self.write(&mut *writer, initial_sn)?;
83        self.write(&mut *writer, cookie)?;
84
85        // Extensions
86        if let Some(qos) = ext_qos.as_ref() {
87            n_exts -= 1;
88            self.write(&mut *writer, (qos, n_exts != 0))?;
89        }
90        #[cfg(feature = "shared-memory")]
91        if let Some(shm) = ext_shm.as_ref() {
92            n_exts -= 1;
93            self.write(&mut *writer, (shm, n_exts != 0))?;
94        }
95        if let Some(auth) = ext_auth.as_ref() {
96            n_exts -= 1;
97            self.write(&mut *writer, (auth, n_exts != 0))?;
98        }
99        if let Some(mlink) = ext_mlink.as_ref() {
100            n_exts -= 1;
101            self.write(&mut *writer, (mlink, n_exts != 0))?;
102        }
103        if let Some(lowlatency) = ext_lowlatency.as_ref() {
104            n_exts -= 1;
105            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
106        }
107        if let Some(compression) = ext_compression.as_ref() {
108            n_exts -= 1;
109            self.write(&mut *writer, (compression, n_exts != 0))?;
110        }
111        if let Some(south) = ext_south.as_ref() {
112            n_exts -= 1;
113            self.write(&mut *writer, (south, n_exts != 0))?;
114        }
115
116        Ok(())
117    }
118}
119
120impl<R> RCodec<OpenSyn, &mut R> for Zenoh080
121where
122    R: Reader,
123{
124    type Error = DidntRead;
125
126    fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
127        let header: u8 = self.read(&mut *reader)?;
128        let codec = Zenoh080Header::new(header);
129        codec.read(reader)
130    }
131}
132
133impl<R> RCodec<OpenSyn, &mut R> for Zenoh080Header
134where
135    R: Reader,
136{
137    type Error = DidntRead;
138
139    fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
140        if imsg::mid(self.header) != id::OPEN || imsg::has_flag(self.header, flag::A) {
141            return Err(DidntRead);
142        }
143
144        // Body
145        let lease: u64 = self.codec.read(&mut *reader)?;
146        let lease = if imsg::has_flag(self.header, flag::T) {
147            Duration::from_secs(lease)
148        } else {
149            Duration::from_millis(lease)
150        };
151        let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
152        let cookie: ZSlice = self.codec.read(&mut *reader)?;
153
154        // Extensions
155        let mut ext_qos = None;
156        #[cfg(feature = "shared-memory")]
157        let mut ext_shm = None;
158        let mut ext_auth = None;
159        let mut ext_mlink = None;
160        let mut ext_lowlatency = None;
161        let mut ext_compression = None;
162        let mut ext_south = None;
163
164        let mut has_ext = imsg::has_flag(self.header, flag::Z);
165        while has_ext {
166            let ext: u8 = self.codec.read(&mut *reader)?;
167            let eodec = Zenoh080Header::new(ext);
168            match iext::eid(ext) {
169                ext::QoS::ID => {
170                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
171                    ext_qos = Some(q);
172                    has_ext = ext;
173                }
174                #[cfg(feature = "shared-memory")]
175                ext::Shm::ID => {
176                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
177                    ext_shm = Some(s);
178                    has_ext = ext;
179                }
180                ext::Auth::ID => {
181                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
182                    ext_auth = Some(a);
183                    has_ext = ext;
184                }
185                ext::MultiLinkSyn::ID => {
186                    let (a, ext): (ext::MultiLinkSyn, bool) = eodec.read(&mut *reader)?;
187                    ext_mlink = Some(a);
188                    has_ext = ext;
189                }
190                ext::LowLatency::ID => {
191                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
192                    ext_lowlatency = Some(q);
193                    has_ext = ext;
194                }
195                ext::Compression::ID => {
196                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
197                    ext_compression = Some(q);
198                    has_ext = ext;
199                }
200                ext::RemoteBound::ID => {
201                    let (q, ext): (ext::RemoteBound, bool) = eodec.read(&mut *reader)?;
202                    ext_south = Some(q);
203                    has_ext = ext;
204                }
205                _ => {
206                    has_ext = extension::skip(reader, "OpenSyn", ext)?;
207                }
208            }
209        }
210
211        Ok(OpenSyn {
212            lease,
213            initial_sn,
214            cookie,
215            ext_qos,
216            #[cfg(feature = "shared-memory")]
217            ext_shm,
218            ext_auth,
219            ext_mlink,
220            ext_lowlatency,
221            ext_compression,
222            ext_south,
223        })
224    }
225}
226
227// OpenAck
228impl<W> WCodec<&OpenAck, &mut W> for Zenoh080
229where
230    W: Writer,
231{
232    type Output = Result<(), DidntWrite>;
233
234    fn write(self, writer: &mut W, x: &OpenAck) -> Self::Output {
235        let OpenAck {
236            lease,
237            initial_sn,
238            ext_qos,
239            #[cfg(feature = "shared-memory")]
240            ext_shm,
241            ext_auth,
242            ext_mlink,
243            ext_lowlatency,
244            ext_compression,
245            ext_south,
246        } = x;
247
248        // Header
249        let mut header = id::OPEN;
250        header |= flag::A;
251        // Verify that the timeout is expressed in seconds, i.e. subsec part is 0.
252        if lease.subsec_nanos() == 0 {
253            header |= flag::T;
254        }
255        let mut n_exts = (ext_qos.is_some() as u8)
256            + (ext_auth.is_some() as u8)
257            + (ext_mlink.is_some() as u8)
258            + (ext_lowlatency.is_some() as u8)
259            + (ext_compression.is_some() as u8)
260            + (ext_south.is_some() as u8);
261
262        #[cfg(feature = "shared-memory")]
263        {
264            n_exts += ext_shm.is_some() as u8;
265        }
266
267        if n_exts != 0 {
268            header |= flag::Z;
269        }
270        self.write(&mut *writer, header)?;
271
272        // Body
273        if imsg::has_flag(header, flag::T) {
274            self.write(&mut *writer, lease.as_secs())?;
275        } else {
276            self.write(&mut *writer, lease.as_millis() as u64)?;
277        }
278        self.write(&mut *writer, initial_sn)?;
279
280        // Extensions
281        if let Some(qos) = ext_qos.as_ref() {
282            n_exts -= 1;
283            self.write(&mut *writer, (qos, n_exts != 0))?;
284        }
285        #[cfg(feature = "shared-memory")]
286        if let Some(shm) = ext_shm.as_ref() {
287            n_exts -= 1;
288            self.write(&mut *writer, (shm, n_exts != 0))?;
289        }
290        if let Some(auth) = ext_auth.as_ref() {
291            n_exts -= 1;
292            self.write(&mut *writer, (auth, n_exts != 0))?;
293        }
294        if let Some(mlink) = ext_mlink.as_ref() {
295            n_exts -= 1;
296            self.write(&mut *writer, (mlink, n_exts != 0))?;
297        }
298        if let Some(lowlatency) = ext_lowlatency.as_ref() {
299            n_exts -= 1;
300            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
301        }
302        if let Some(compression) = ext_compression.as_ref() {
303            n_exts -= 1;
304            self.write(&mut *writer, (compression, n_exts != 0))?;
305        }
306        if let Some(south) = ext_south.as_ref() {
307            n_exts -= 1;
308            self.write(&mut *writer, (south, n_exts != 0))?;
309        }
310
311        Ok(())
312    }
313}
314
315impl<R> RCodec<OpenAck, &mut R> for Zenoh080
316where
317    R: Reader,
318{
319    type Error = DidntRead;
320
321    fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
322        let header: u8 = self.read(&mut *reader)?;
323        let codec = Zenoh080Header::new(header);
324        codec.read(reader)
325    }
326}
327
328impl<R> RCodec<OpenAck, &mut R> for Zenoh080Header
329where
330    R: Reader,
331{
332    type Error = DidntRead;
333
334    fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
335        if imsg::mid(self.header) != id::OPEN || !imsg::has_flag(self.header, flag::A) {
336            return Err(DidntRead);
337        }
338
339        // Body
340        let lease: u64 = self.codec.read(&mut *reader)?;
341        let lease = if imsg::has_flag(self.header, flag::T) {
342            Duration::from_secs(lease)
343        } else {
344            Duration::from_millis(lease)
345        };
346        let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
347
348        // Extensions
349        let mut ext_qos = None;
350        #[cfg(feature = "shared-memory")]
351        let mut ext_shm = None;
352        let mut ext_auth = None;
353        let mut ext_mlink = None;
354        let mut ext_lowlatency = None;
355        let mut ext_compression = None;
356        let mut ext_south = None;
357
358        let mut has_ext = imsg::has_flag(self.header, flag::Z);
359        while has_ext {
360            let ext: u8 = self.codec.read(&mut *reader)?;
361            let eodec = Zenoh080Header::new(ext);
362            match iext::eid(ext) {
363                ext::QoS::ID => {
364                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
365                    ext_qos = Some(q);
366                    has_ext = ext;
367                }
368                #[cfg(feature = "shared-memory")]
369                ext::Shm::ID => {
370                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
371                    ext_shm = Some(s);
372                    has_ext = ext;
373                }
374                ext::Auth::ID => {
375                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
376                    ext_auth = Some(a);
377                    has_ext = ext;
378                }
379                ext::MultiLinkAck::ID => {
380                    let (a, ext): (ext::MultiLinkAck, bool) = eodec.read(&mut *reader)?;
381                    ext_mlink = Some(a);
382                    has_ext = ext;
383                }
384                ext::LowLatency::ID => {
385                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
386                    ext_lowlatency = Some(q);
387                    has_ext = ext;
388                }
389                ext::Compression::ID => {
390                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
391                    ext_compression = Some(q);
392                    has_ext = ext;
393                }
394                ext::RemoteBound::ID => {
395                    let (q, ext): (ext::RemoteBound, bool) = eodec.read(&mut *reader)?;
396                    ext_south = Some(q);
397                    has_ext = ext;
398                }
399                _ => {
400                    has_ext = extension::skip(reader, "OpenAck", ext)?;
401                }
402            }
403        }
404
405        Ok(OpenAck {
406            lease,
407            initial_sn,
408            ext_qos,
409            #[cfg(feature = "shared-memory")]
410            ext_shm,
411            ext_auth,
412            ext_mlink,
413            ext_lowlatency,
414            ext_compression,
415            ext_south,
416        })
417    }
418}