1use alloc::boxed::Box;
15use core::time::Duration;
16
17use zenoh_buffers::{
18 reader::{DidntRead, Reader},
19 writer::{DidntWrite, Writer},
20};
21use zenoh_protocol::{
22 common::{iext, imsg, ZExtZBufHeader},
23 core::{Priority, Resolution, WhatAmI, ZenohIdProto},
24 transport::{
25 batch_size, id,
26 join::{ext, flag, Join},
27 BatchSize, PrioritySn, TransportSn,
28 },
29};
30
31use crate::{common::extension, LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length};
32
33impl LCodec<&PrioritySn> for Zenoh080 {
34 fn w_len(self, p: &PrioritySn) -> usize {
35 let PrioritySn {
36 reliable,
37 best_effort,
38 } = p;
39 self.w_len(*reliable) + self.w_len(*best_effort)
40 }
41}
42
43impl<W> WCodec<&PrioritySn, &mut W> for Zenoh080
44where
45 W: Writer,
46{
47 type Output = Result<(), DidntWrite>;
48
49 fn write(self, writer: &mut W, x: &PrioritySn) -> Self::Output {
50 let PrioritySn {
51 reliable,
52 best_effort,
53 } = x;
54
55 self.write(&mut *writer, reliable)?;
56 self.write(&mut *writer, best_effort)?;
57 Ok(())
58 }
59}
60
61impl<R> RCodec<PrioritySn, &mut R> for Zenoh080
62where
63 R: Reader,
64{
65 type Error = DidntRead;
66
67 fn read(self, reader: &mut R) -> Result<PrioritySn, Self::Error> {
68 let reliable: TransportSn = self.read(&mut *reader)?;
69 let best_effort: TransportSn = self.read(&mut *reader)?;
70
71 Ok(PrioritySn {
72 reliable,
73 best_effort,
74 })
75 }
76}
77
78impl<W> WCodec<(&ext::QoSType, bool), &mut W> for Zenoh080
80where
81 W: Writer,
82{
83 type Output = Result<(), DidntWrite>;
84
85 fn write(self, writer: &mut W, x: (&ext::QoSType, bool)) -> Self::Output {
86 let (x, more) = x;
87
88 let len = x.iter().fold(0, |acc, p| acc + self.w_len(p));
90 let header = ZExtZBufHeader::<{ ext::QoS::ID }>::new(len);
91 self.write(&mut *writer, (&header, more))?;
92
93 for p in x.iter() {
95 self.write(&mut *writer, p)?;
96 }
97
98 Ok(())
99 }
100}
101
102impl<R> RCodec<(ext::QoSType, bool), &mut R> for Zenoh080
103where
104 R: Reader,
105{
106 type Error = DidntRead;
107
108 fn read(self, reader: &mut R) -> Result<(ext::QoSType, bool), Self::Error> {
109 let header: u8 = self.read(&mut *reader)?;
110 let codec = Zenoh080Header::new(header);
111 codec.read(reader)
112 }
113}
114
115impl<R> RCodec<(ext::QoSType, bool), &mut R> for Zenoh080Header
116where
117 R: Reader,
118{
119 type Error = DidntRead;
120
121 fn read(self, reader: &mut R) -> Result<(ext::QoSType, bool), Self::Error> {
122 let (_, more): (ZExtZBufHeader<{ ext::QoS::ID }>, bool) = self.read(&mut *reader)?;
124
125 let mut ext_qos = Box::new([PrioritySn::DEFAULT; Priority::NUM]);
127 for p in ext_qos.iter_mut() {
128 *p = self.codec.read(&mut *reader)?;
129 }
130
131 Ok((ext_qos, more))
132 }
133}
134
// Join
impl<W> WCodec<&Join, &mut W> for Zenoh080
where
    W: Writer,
{
    type Output = Result<(), DidntWrite>;

    /// Serializes a [`Join`] message into `writer`.
    ///
    /// Written in order: the header byte (message id plus `T`/`S`/`Z` flags), the
    /// version, a flags byte packing the ZenohId size and `WhatAmI`, the ZenohId,
    /// optionally the resolution and batch size (when `S` is set), the lease, the
    /// next sequence numbers, and finally any extensions that are present.
    ///
    /// Returns `Err(DidntWrite)` as soon as any individual write fails.
    fn write(self, writer: &mut W, x: &Join) -> Self::Output {
        let Join {
            version,
            whatami,
            zid,
            resolution,
            batch_size,
            lease,
            next_sn,
            ext_qos,
            ext_shm,
            ext_patch,
        } = x;

        // Header
        let mut header = id::JOIN;
        // T: the lease is a whole number of seconds, so it is written in seconds below.
        if lease.as_millis() % 1_000 == 0 {
            header |= flag::T;
        }
        // S: resolution and/or batch size differ from their defaults and must be sent.
        if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST {
            header |= flag::S;
        }
        // Number of extensions that will be written; decremented as each one is
        // emitted so that the "more" flag is true for all but the last one.
        let mut n_exts = (ext_qos.is_some() as u8)
            + (ext_shm.is_some() as u8)
            + (*ext_patch != ext::PatchType::NONE) as u8;
        // Z: at least one extension follows the body.
        if n_exts != 0 {
            header |= flag::Z;
        }
        self.write(&mut *writer, header)?;

        // Body
        self.write(&mut *writer, version)?;

        let whatami: u8 = match whatami {
            WhatAmI::Router => 0b00,
            WhatAmI::Peer => 0b01,
            WhatAmI::Client => 0b10,
        };
        // Flags byte: (ZenohId size - 1) shifted into bits 4 and up, WhatAmI in the low bits.
        let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
        self.write(&mut *writer, flags)?;

        // The ZenohId is written with a length-aware codec configured with its size.
        let lodec = Zenoh080Length::new(zid.size());
        lodec.write(&mut *writer, zid)?;

        if imsg::has_flag(header, flag::S) {
            self.write(&mut *writer, resolution.as_u8())?;
            self.write(&mut *writer, batch_size.to_le_bytes())?;
        }

        // Lease unit follows the T flag: seconds when set, milliseconds otherwise.
        if imsg::has_flag(header, flag::T) {
            self.write(&mut *writer, lease.as_secs())?;
        } else {
            self.write(&mut *writer, lease.as_millis() as u64)?;
        }
        self.write(&mut *writer, next_sn)?;

        // Extensions: QoS, then Shm, then Patch; each is passed `n_exts != 0` as its
        // "more" flag after the counter has been decremented for it.
        if let Some(qos) = ext_qos.as_ref() {
            n_exts -= 1;
            self.write(&mut *writer, (qos, n_exts != 0))?;
        }
        if let Some(shm) = ext_shm.as_ref() {
            n_exts -= 1;
            self.write(&mut *writer, (shm, n_exts != 0))?;
        }
        if *ext_patch != ext::PatchType::NONE {
            n_exts -= 1;
            self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
        }

        Ok(())
    }
}
215
216impl<R> RCodec<Join, &mut R> for Zenoh080
217where
218 R: Reader,
219{
220 type Error = DidntRead;
221
222 fn read(self, reader: &mut R) -> Result<Join, Self::Error> {
223 let header: u8 = self.read(&mut *reader)?;
224 let codec = Zenoh080Header::new(header);
225 codec.read(reader)
226 }
227}
228
impl<R> RCodec<Join, &mut R> for Zenoh080Header
where
    R: Reader,
{
    type Error = DidntRead;

    /// Decodes a [`Join`] message whose header byte is already held by `self`.
    ///
    /// Returns `Err(DidntRead)` if the header's message id is not `id::JOIN`, if the
    /// `WhatAmI` bits hold the unassigned value `0b11`, or if any underlying read fails.
    fn read(self, reader: &mut R) -> Result<Join, Self::Error> {
        if imsg::mid(self.header) != id::JOIN {
            return Err(DidntRead);
        }

        // Body
        let version: u8 = self.codec.read(&mut *reader)?;

        // Flags byte: WhatAmI in the two low bits, (ZenohId length - 1) in bits 4 and up.
        let flags: u8 = self.codec.read(&mut *reader)?;
        let whatami = match flags & 0b11 {
            0b00 => WhatAmI::Router,
            0b01 => WhatAmI::Peer,
            0b10 => WhatAmI::Client,
            _ => return Err(DidntRead),
        };
        let length = 1 + ((flags >> 4) as usize);
        let lodec = Zenoh080Length::new(length);
        let zid: ZenohIdProto = lodec.read(&mut *reader)?;

        // Resolution and batch size keep their defaults unless the S flag is set.
        let mut resolution = Resolution::default();
        let mut batch_size = batch_size::MULTICAST.to_le_bytes();
        if imsg::has_flag(self.header, flag::S) {
            let flags: u8 = self.codec.read(&mut *reader)?;
            // Only the low six bits of the byte are passed to `Resolution::from`.
            resolution = Resolution::from(flags & 0b00111111);
            batch_size = self.codec.read(&mut *reader)?;
        }
        let batch_size = BatchSize::from_le_bytes(batch_size);

        // Lease unit follows the T flag: seconds when set, milliseconds otherwise.
        let lease: u64 = self.codec.read(&mut *reader)?;
        let lease = if imsg::has_flag(self.header, flag::T) {
            Duration::from_secs(lease)
        } else {
            Duration::from_millis(lease)
        };
        let next_sn: PrioritySn = self.codec.read(&mut *reader)?;

        // Extensions
        let mut ext_qos = None;
        let mut ext_shm = None;
        let mut ext_patch = ext::PatchType::NONE;

        // The Z flag says whether a first extension follows; after that, each
        // decoded (or skipped) extension reports whether another one follows.
        let mut has_ext = imsg::has_flag(self.header, flag::Z);
        while has_ext {
            let ext: u8 = self.codec.read(&mut *reader)?;
            let eodec = Zenoh080Header::new(ext);
            match iext::eid(ext) {
                ext::QoS::ID => {
                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
                    ext_qos = Some(q);
                    has_ext = ext;
                }
                ext::Shm::ID => {
                    let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
                    ext_shm = Some(s);
                    has_ext = ext;
                }
                ext::Patch::ID => {
                    let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
                    ext_patch = p;
                    has_ext = ext;
                }
                _ => {
                    // Unrecognized extension id: handed to `extension::skip`, whose
                    // result becomes the new "more extensions" flag.
                    has_ext = extension::skip(reader, "Join", ext)?;
                }
            }
        }

        Ok(Join {
            version,
            whatami,
            zid,
            resolution,
            batch_size,
            lease,
            next_sn,
            ext_qos,
            ext_shm,
            ext_patch,
        })
    }
}
315}