zenoh_codec/transport/
open.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::time::Duration;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19    ZSlice,
20};
21use zenoh_protocol::{
22    common::{iext, imsg},
23    transport::{
24        id,
25        open::{ext, flag, OpenAck, OpenSyn},
26        TransportSn,
27    },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
31
32// OpenSyn
33impl<W> WCodec<&OpenSyn, &mut W> for Zenoh080
34where
35    W: Writer,
36{
37    type Output = Result<(), DidntWrite>;
38
39    fn write(self, writer: &mut W, x: &OpenSyn) -> Self::Output {
40        let OpenSyn {
41            lease,
42            initial_sn,
43            cookie,
44            ext_qos,
45            #[cfg(feature = "shared-memory")]
46            ext_shm,
47            ext_auth,
48            ext_mlink,
49            ext_lowlatency,
50            ext_compression,
51        } = x;
52
53        // Header
54        let mut header = id::OPEN;
55        if lease.as_millis() % 1_000 == 0 {
56            header |= flag::T;
57        }
58        let mut n_exts = (ext_qos.is_some() as u8)
59            + (ext_auth.is_some() as u8)
60            + (ext_mlink.is_some() as u8)
61            + (ext_lowlatency.is_some() as u8)
62            + (ext_compression.is_some() as u8);
63
64        #[cfg(feature = "shared-memory")]
65        {
66            n_exts += ext_shm.is_some() as u8;
67        }
68
69        if n_exts != 0 {
70            header |= flag::Z;
71        }
72        self.write(&mut *writer, header)?;
73
74        // Body
75        if imsg::has_flag(header, flag::T) {
76            self.write(&mut *writer, lease.as_secs())?;
77        } else {
78            self.write(&mut *writer, lease.as_millis() as u64)?;
79        }
80        self.write(&mut *writer, initial_sn)?;
81        self.write(&mut *writer, cookie)?;
82
83        // Extensions
84        if let Some(qos) = ext_qos.as_ref() {
85            n_exts -= 1;
86            self.write(&mut *writer, (qos, n_exts != 0))?;
87        }
88        #[cfg(feature = "shared-memory")]
89        if let Some(shm) = ext_shm.as_ref() {
90            n_exts -= 1;
91            self.write(&mut *writer, (shm, n_exts != 0))?;
92        }
93        if let Some(auth) = ext_auth.as_ref() {
94            n_exts -= 1;
95            self.write(&mut *writer, (auth, n_exts != 0))?;
96        }
97        if let Some(mlink) = ext_mlink.as_ref() {
98            n_exts -= 1;
99            self.write(&mut *writer, (mlink, n_exts != 0))?;
100        }
101        if let Some(lowlatency) = ext_lowlatency.as_ref() {
102            n_exts -= 1;
103            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
104        }
105        if let Some(compression) = ext_compression.as_ref() {
106            n_exts -= 1;
107            self.write(&mut *writer, (compression, n_exts != 0))?;
108        }
109
110        Ok(())
111    }
112}
113
114impl<R> RCodec<OpenSyn, &mut R> for Zenoh080
115where
116    R: Reader,
117{
118    type Error = DidntRead;
119
120    fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
121        let header: u8 = self.read(&mut *reader)?;
122        let codec = Zenoh080Header::new(header);
123        codec.read(reader)
124    }
125}
126
127impl<R> RCodec<OpenSyn, &mut R> for Zenoh080Header
128where
129    R: Reader,
130{
131    type Error = DidntRead;
132
133    fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
134        if imsg::mid(self.header) != id::OPEN || imsg::has_flag(self.header, flag::A) {
135            return Err(DidntRead);
136        }
137
138        // Body
139        let lease: u64 = self.codec.read(&mut *reader)?;
140        let lease = if imsg::has_flag(self.header, flag::T) {
141            Duration::from_secs(lease)
142        } else {
143            Duration::from_millis(lease)
144        };
145        let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
146        let cookie: ZSlice = self.codec.read(&mut *reader)?;
147
148        // Extensions
149        let mut ext_qos = None;
150        #[cfg(feature = "shared-memory")]
151        let mut ext_shm = None;
152        let mut ext_auth = None;
153        let mut ext_mlink = None;
154        let mut ext_lowlatency = None;
155        let mut ext_compression = None;
156
157        let mut has_ext = imsg::has_flag(self.header, flag::Z);
158        while has_ext {
159            let ext: u8 = self.codec.read(&mut *reader)?;
160            let eodec = Zenoh080Header::new(ext);
161            match iext::eid(ext) {
162                ext::QoS::ID => {
163                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
164                    ext_qos = Some(q);
165                    has_ext = ext;
166                }
167                #[cfg(feature = "shared-memory")]
168                ext::Shm::ID => {
169                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
170                    ext_shm = Some(s);
171                    has_ext = ext;
172                }
173                ext::Auth::ID => {
174                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
175                    ext_auth = Some(a);
176                    has_ext = ext;
177                }
178                ext::MultiLinkSyn::ID => {
179                    let (a, ext): (ext::MultiLinkSyn, bool) = eodec.read(&mut *reader)?;
180                    ext_mlink = Some(a);
181                    has_ext = ext;
182                }
183                ext::LowLatency::ID => {
184                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
185                    ext_lowlatency = Some(q);
186                    has_ext = ext;
187                }
188                ext::Compression::ID => {
189                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
190                    ext_compression = Some(q);
191                    has_ext = ext;
192                }
193                _ => {
194                    has_ext = extension::skip(reader, "OpenSyn", ext)?;
195                }
196            }
197        }
198
199        Ok(OpenSyn {
200            lease,
201            initial_sn,
202            cookie,
203            ext_qos,
204            #[cfg(feature = "shared-memory")]
205            ext_shm,
206            ext_auth,
207            ext_mlink,
208            ext_lowlatency,
209            ext_compression,
210        })
211    }
212}
213
214// OpenAck
215impl<W> WCodec<&OpenAck, &mut W> for Zenoh080
216where
217    W: Writer,
218{
219    type Output = Result<(), DidntWrite>;
220
221    fn write(self, writer: &mut W, x: &OpenAck) -> Self::Output {
222        let OpenAck {
223            lease,
224            initial_sn,
225            ext_qos,
226            #[cfg(feature = "shared-memory")]
227            ext_shm,
228            ext_auth,
229            ext_mlink,
230            ext_lowlatency,
231            ext_compression,
232        } = x;
233
234        // Header
235        let mut header = id::OPEN;
236        header |= flag::A;
237        // Verify that the timeout is expressed in seconds, i.e. subsec part is 0.
238        if lease.subsec_nanos() == 0 {
239            header |= flag::T;
240        }
241        let mut n_exts = (ext_qos.is_some() as u8)
242            + (ext_auth.is_some() as u8)
243            + (ext_mlink.is_some() as u8)
244            + (ext_lowlatency.is_some() as u8)
245            + (ext_compression.is_some() as u8);
246
247        #[cfg(feature = "shared-memory")]
248        {
249            n_exts += ext_shm.is_some() as u8;
250        }
251
252        if n_exts != 0 {
253            header |= flag::Z;
254        }
255        self.write(&mut *writer, header)?;
256
257        // Body
258        if imsg::has_flag(header, flag::T) {
259            self.write(&mut *writer, lease.as_secs())?;
260        } else {
261            self.write(&mut *writer, lease.as_millis() as u64)?;
262        }
263        self.write(&mut *writer, initial_sn)?;
264
265        // Extensions
266        if let Some(qos) = ext_qos.as_ref() {
267            n_exts -= 1;
268            self.write(&mut *writer, (qos, n_exts != 0))?;
269        }
270        #[cfg(feature = "shared-memory")]
271        if let Some(shm) = ext_shm.as_ref() {
272            n_exts -= 1;
273            self.write(&mut *writer, (shm, n_exts != 0))?;
274        }
275        if let Some(auth) = ext_auth.as_ref() {
276            n_exts -= 1;
277            self.write(&mut *writer, (auth, n_exts != 0))?;
278        }
279        if let Some(mlink) = ext_mlink.as_ref() {
280            n_exts -= 1;
281            self.write(&mut *writer, (mlink, n_exts != 0))?;
282        }
283        if let Some(lowlatency) = ext_lowlatency.as_ref() {
284            n_exts -= 1;
285            self.write(&mut *writer, (lowlatency, n_exts != 0))?;
286        }
287        if let Some(compression) = ext_compression.as_ref() {
288            n_exts -= 1;
289            self.write(&mut *writer, (compression, n_exts != 0))?;
290        }
291
292        Ok(())
293    }
294}
295
296impl<R> RCodec<OpenAck, &mut R> for Zenoh080
297where
298    R: Reader,
299{
300    type Error = DidntRead;
301
302    fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
303        let header: u8 = self.read(&mut *reader)?;
304        let codec = Zenoh080Header::new(header);
305        codec.read(reader)
306    }
307}
308
309impl<R> RCodec<OpenAck, &mut R> for Zenoh080Header
310where
311    R: Reader,
312{
313    type Error = DidntRead;
314
315    fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
316        if imsg::mid(self.header) != id::OPEN || !imsg::has_flag(self.header, flag::A) {
317            return Err(DidntRead);
318        }
319
320        // Body
321        let lease: u64 = self.codec.read(&mut *reader)?;
322        let lease = if imsg::has_flag(self.header, flag::T) {
323            Duration::from_secs(lease)
324        } else {
325            Duration::from_millis(lease)
326        };
327        let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
328
329        // Extensions
330        let mut ext_qos = None;
331        #[cfg(feature = "shared-memory")]
332        let mut ext_shm = None;
333        let mut ext_auth = None;
334        let mut ext_mlink = None;
335        let mut ext_lowlatency = None;
336        let mut ext_compression = None;
337
338        let mut has_ext = imsg::has_flag(self.header, flag::Z);
339        while has_ext {
340            let ext: u8 = self.codec.read(&mut *reader)?;
341            let eodec = Zenoh080Header::new(ext);
342            match iext::eid(ext) {
343                ext::QoS::ID => {
344                    let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
345                    ext_qos = Some(q);
346                    has_ext = ext;
347                }
348                #[cfg(feature = "shared-memory")]
349                ext::Shm::ID => {
350                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
351                    ext_shm = Some(s);
352                    has_ext = ext;
353                }
354                ext::Auth::ID => {
355                    let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
356                    ext_auth = Some(a);
357                    has_ext = ext;
358                }
359                ext::MultiLinkAck::ID => {
360                    let (a, ext): (ext::MultiLinkAck, bool) = eodec.read(&mut *reader)?;
361                    ext_mlink = Some(a);
362                    has_ext = ext;
363                }
364                ext::LowLatency::ID => {
365                    let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
366                    ext_lowlatency = Some(q);
367                    has_ext = ext;
368                }
369                ext::Compression::ID => {
370                    let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
371                    ext_compression = Some(q);
372                    has_ext = ext;
373                }
374                _ => {
375                    has_ext = extension::skip(reader, "OpenAck", ext)?;
376                }
377            }
378        }
379
380        Ok(OpenAck {
381            lease,
382            initial_sn,
383            ext_qos,
384            #[cfg(feature = "shared-memory")]
385            ext_shm,
386            ext_auth,
387            ext_mlink,
388            ext_lowlatency,
389            ext_compression,
390        })
391    }
392}