zenoh_codec/transport/
join.rs

//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
14use alloc::boxed::Box;
15use core::time::Duration;
16
17use zenoh_buffers::{
18    reader::{DidntRead, Reader},
19    writer::{DidntWrite, Writer},
20};
21use zenoh_protocol::{
22    common::{iext, imsg, ZExtZBufHeader},
23    core::{Priority, Resolution, WhatAmI, ZenohIdProto},
24    transport::{
25        batch_size, id,
26        join::{ext, flag, Join},
27        BatchSize, PrioritySn, TransportSn,
28    },
29};
30
31use crate::{common::extension, LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length};
32
33impl LCodec<&PrioritySn> for Zenoh080 {
34    fn w_len(self, p: &PrioritySn) -> usize {
35        let PrioritySn {
36            reliable,
37            best_effort,
38        } = p;
39        self.w_len(*reliable) + self.w_len(*best_effort)
40    }
41}
42
43impl<W> WCodec<&PrioritySn, &mut W> for Zenoh080
44where
45    W: Writer,
46{
47    type Output = Result<(), DidntWrite>;
48
49    fn write(self, writer: &mut W, x: &PrioritySn) -> Self::Output {
50        let PrioritySn {
51            reliable,
52            best_effort,
53        } = x;
54
55        self.write(&mut *writer, reliable)?;
56        self.write(&mut *writer, best_effort)?;
57        Ok(())
58    }
59}
60
61impl<R> RCodec<PrioritySn, &mut R> for Zenoh080
62where
63    R: Reader,
64{
65    type Error = DidntRead;
66
67    fn read(self, reader: &mut R) -> Result<PrioritySn, Self::Error> {
68        let reliable: TransportSn = self.read(&mut *reader)?;
69        let best_effort: TransportSn = self.read(&mut *reader)?;
70
71        Ok(PrioritySn {
72            reliable,
73            best_effort,
74        })
75    }
76}
77
// Extension
79impl<W> WCodec<(&ext::QoSType, bool), &mut W> for Zenoh080
80where
81    W: Writer,
82{
83    type Output = Result<(), DidntWrite>;
84
85    fn write(self, writer: &mut W, x: (&ext::QoSType, bool)) -> Self::Output {
86        let (x, more) = x;
87
88        // Header
89        let len = x.iter().fold(0, |acc, p| acc + self.w_len(p));
90        let header = ZExtZBufHeader::<{ ext::QoS::ID }>::new(len);
91        self.write(&mut *writer, (&header, more))?;
92
93        // Body
94        for p in x.iter() {
95            self.write(&mut *writer, p)?;
96        }
97
98        Ok(())
99    }
100}
101
102impl<R> RCodec<(ext::QoSType, bool), &mut R> for Zenoh080
103where
104    R: Reader,
105{
106    type Error = DidntRead;
107
108    fn read(self, reader: &mut R) -> Result<(ext::QoSType, bool), Self::Error> {
109        let header: u8 = self.read(&mut *reader)?;
110        let codec = Zenoh080Header::new(header);
111        codec.read(reader)
112    }
113}
114
115impl<R> RCodec<(ext::QoSType, bool), &mut R> for Zenoh080Header
116where
117    R: Reader,
118{
119    type Error = DidntRead;
120
121    fn read(self, reader: &mut R) -> Result<(ext::QoSType, bool), Self::Error> {
122        // Header
123        let (_, more): (ZExtZBufHeader<{ ext::QoS::ID }>, bool) = self.read(&mut *reader)?;
124
125        // Body
126        let mut ext_qos = Box::new([PrioritySn::DEFAULT; Priority::NUM]);
127        for p in ext_qos.iter_mut() {
128            *p = self.codec.read(&mut *reader)?;
129        }
130
131        Ok((ext_qos, more))
132    }
133}
134
// Join
136impl<W> WCodec<&Join, &mut W> for Zenoh080
137where
138    W: Writer,
139{
140    type Output = Result<(), DidntWrite>;
141
142    fn write(self, writer: &mut W, x: &Join) -> Self::Output {
143        let Join {
144            version,
145            whatami,
146            zid,
147            resolution,
148            batch_size,
149            lease,
150            next_sn,
151            ext_qos,
152            ext_shm,
153            ext_patch,
154        } = x;
155
156        // Header
157        let mut header = id::JOIN;
158        if lease.as_millis() % 1_000 == 0 {
159            header |= flag::T;
160        }
161        if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST {
162            header |= flag::S;
163        }
164        let mut n_exts = (ext_qos.is_some() as u8)
165            + (ext_shm.is_some() as u8)
166            + (*ext_patch != ext::PatchType::NONE) as u8;
167        if n_exts != 0 {
168            header |= flag::Z;
169        }
170        self.write(&mut *writer, header)?;
171
172        // Body
173        self.write(&mut *writer, version)?;
174
175        let whatami: u8 = match whatami {
176            WhatAmI::Router => 0b00,
177            WhatAmI::Peer => 0b01,
178            WhatAmI::Client => 0b10,
179        };
180        let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
181        self.write(&mut *writer, flags)?;
182
183        let lodec = Zenoh080Length::new(zid.size());
184        lodec.write(&mut *writer, zid)?;
185
186        if imsg::has_flag(header, flag::S) {
187            self.write(&mut *writer, resolution.as_u8())?;
188            self.write(&mut *writer, batch_size.to_le_bytes())?;
189        }
190
191        if imsg::has_flag(header, flag::T) {
192            self.write(&mut *writer, lease.as_secs())?;
193        } else {
194            self.write(&mut *writer, lease.as_millis() as u64)?;
195        }
196        self.write(&mut *writer, next_sn)?;
197
198        // Extensions
199        if let Some(qos) = ext_qos.as_ref() {
200            n_exts -= 1;
201            self.write(&mut *writer, (qos, n_exts != 0))?;
202        }
203        if let Some(shm) = ext_shm.as_ref() {
204            n_exts -= 1;
205            self.write(&mut *writer, (shm, n_exts != 0))?;
206        }
207        if *ext_patch != ext::PatchType::NONE {
208            n_exts -= 1;
209            self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
210        }
211
212        Ok(())
213    }
214}
215
216impl<R> RCodec<Join, &mut R> for Zenoh080
217where
218    R: Reader,
219{
220    type Error = DidntRead;
221
222    fn read(self, reader: &mut R) -> Result<Join, Self::Error> {
223        let header: u8 = self.read(&mut *reader)?;
224        let codec = Zenoh080Header::new(header);
225        codec.read(reader)
226    }
227}
228
229impl<R> RCodec<Join, &mut R> for Zenoh080Header
230where
231    R: Reader,
232{
233    type Error = DidntRead;
234
235    fn read(self, reader: &mut R) -> Result<Join, Self::Error> {
236        if imsg::mid(self.header) != id::JOIN {
237            return Err(DidntRead);
238        }
239
240        // Body
241        let version: u8 = self.codec.read(&mut *reader)?;
242
243        let flags: u8 = self.codec.read(&mut *reader)?;
244        let whatami = match flags & 0b11 {
245            0b00 => WhatAmI::Router,
246            0b01 => WhatAmI::Peer,
247            0b10 => WhatAmI::Client,
248            _ => return Err(DidntRead),
249        };
250        let length = 1 + ((flags >> 4) as usize);
251        let lodec = Zenoh080Length::new(length);
252        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
253
254        let mut resolution = Resolution::default();
255        let mut batch_size = batch_size::MULTICAST.to_le_bytes();
256        if imsg::has_flag(self.header, flag::S) {
257            let flags: u8 = self.codec.read(&mut *reader)?;
258            resolution = Resolution::from(flags & 0b00111111);
259            batch_size = self.codec.read(&mut *reader)?;
260        }
261        let batch_size = BatchSize::from_le_bytes(batch_size);
262
263        let lease: u64 = self.codec.read(&mut *reader)?;
264        let lease = if imsg::has_flag(self.header, flag::T) {
265            Duration::from_secs(lease)
266        } else {
267            Duration::from_millis(lease)
268        };
269        let next_sn: PrioritySn = self.codec.read(&mut *reader)?;
270
271        // Extensions
272        let mut ext_qos = None;
273        let mut ext_shm = None;
274        let mut ext_patch = ext::PatchType::NONE;
275
276        let mut has_ext = imsg::has_flag(self.header, flag::Z);
277        while has_ext {
278            let ext: u8 = self.codec.read(&mut *reader)?;
279            let eodec = Zenoh080Header::new(ext);
280            match iext::eid(ext) {
281                ext::QoS::ID => {
282                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
283                    ext_qos = Some(q);
284                    has_ext = ext;
285                }
286                ext::Shm::ID => {
287                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
288                    ext_shm = Some(s);
289                    has_ext = ext;
290                }
291                ext::Patch::ID => {
292                    let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
293                    ext_patch = p;
294                    has_ext = ext;
295                }
296                _ => {
297                    has_ext = extension::skip(reader, "Join", ext)?;
298                }
299            }
300        }
301
302        Ok(Join {
303            version,
304            whatami,
305            zid,
306            resolution,
307            batch_size,
308            lease,
309            next_sn,
310            ext_qos,
311            ext_shm,
312            ext_patch,
313        })
314    }
315}