1use zenoh_buffers::{
15 reader::{DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17 ZSlice,
18};
19use zenoh_protocol::{
20 common::{iext, imsg},
21 core::{Resolution, WhatAmI, ZenohIdProto},
22 transport::{
23 batch_size, id,
24 init::{ext, flag, InitAck, InitSyn},
25 BatchSize,
26 },
27};
28
29use crate::{
30 common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header, Zenoh080Length,
31};
32
33impl<W> WCodec<&InitSyn, &mut W> for Zenoh080
35where
36 W: Writer,
37{
38 type Output = Result<(), DidntWrite>;
39
40 fn write(self, writer: &mut W, x: &InitSyn) -> Self::Output {
41 let InitSyn {
42 version,
43 whatami,
44 zid,
45 resolution,
46 batch_size,
47 ext_qos,
48 ext_qos_link,
49 #[cfg(feature = "shared-memory")]
50 ext_shm,
51 ext_auth,
52 ext_mlink,
53 ext_lowlatency,
54 ext_compression,
55 ext_patch,
56 } = x;
57
58 let mut header = id::INIT;
60 if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
61 header |= flag::S;
62 }
63 let mut n_exts = (ext_qos.is_some() as u8)
64 + (ext_qos_link.is_some() as u8)
65 + (ext_auth.is_some() as u8)
66 + (ext_mlink.is_some() as u8)
67 + (ext_lowlatency.is_some() as u8)
68 + (ext_compression.is_some() as u8)
69 + (*ext_patch != ext::PatchType::NONE) as u8;
70
71 #[cfg(feature = "shared-memory")]
72 {
73 n_exts += ext_shm.is_some() as u8;
74 }
75
76 if n_exts != 0 {
77 header |= flag::Z;
78 }
79 self.write(&mut *writer, header)?;
80
81 self.write(&mut *writer, version)?;
83
84 let whatami: u8 = match whatami {
85 WhatAmI::Router => 0b00,
86 WhatAmI::Peer => 0b01,
87 WhatAmI::Client => 0b10,
88 };
89 let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
90 self.write(&mut *writer, flags)?;
91
92 let lodec = Zenoh080Length::new(zid.size());
93 lodec.write(&mut *writer, zid)?;
94
95 if imsg::has_flag(header, flag::S) {
96 self.write(&mut *writer, resolution.as_u8())?;
97 self.write(&mut *writer, batch_size.to_le_bytes())?;
98 }
99
100 if let Some(qos) = ext_qos.as_ref() {
102 n_exts -= 1;
103 self.write(&mut *writer, (qos, n_exts != 0))?;
104 }
105 if let Some(qos_link) = ext_qos_link.as_ref() {
106 n_exts -= 1;
107 self.write(&mut *writer, (qos_link, n_exts != 0))?;
108 }
109 #[cfg(feature = "shared-memory")]
110 if let Some(shm) = ext_shm.as_ref() {
111 n_exts -= 1;
112 self.write(&mut *writer, (shm, n_exts != 0))?;
113 }
114 if let Some(auth) = ext_auth.as_ref() {
115 n_exts -= 1;
116 self.write(&mut *writer, (auth, n_exts != 0))?;
117 }
118 if let Some(mlink) = ext_mlink.as_ref() {
119 n_exts -= 1;
120 self.write(&mut *writer, (mlink, n_exts != 0))?;
121 }
122 if let Some(lowlatency) = ext_lowlatency.as_ref() {
123 n_exts -= 1;
124 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
125 }
126 if let Some(compression) = ext_compression.as_ref() {
127 n_exts -= 1;
128 self.write(&mut *writer, (compression, n_exts != 0))?;
129 }
130 if *ext_patch != ext::PatchType::NONE {
131 n_exts -= 1;
132 self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
133 }
134
135 Ok(())
136 }
137}
138
139impl<R> RCodec<InitSyn, &mut R> for Zenoh080
140where
141 R: Reader,
142{
143 type Error = DidntRead;
144
145 fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
146 let header: u8 = self.read(&mut *reader)?;
147 let codec = Zenoh080Header::new(header);
148 codec.read(reader)
149 }
150}
151
152impl<R> RCodec<InitSyn, &mut R> for Zenoh080Header
153where
154 R: Reader,
155{
156 type Error = DidntRead;
157
158 fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
159 if imsg::mid(self.header) != id::INIT || imsg::has_flag(self.header, flag::A) {
160 return Err(DidntRead);
161 }
162
163 let version: u8 = self.codec.read(&mut *reader)?;
165
166 let flags: u8 = self.codec.read(&mut *reader)?;
167 let whatami = match flags & 0b11 {
168 0b00 => WhatAmI::Router,
169 0b01 => WhatAmI::Peer,
170 0b10 => WhatAmI::Client,
171 _ => return Err(DidntRead),
172 };
173 let length = 1 + ((flags >> 4) as usize);
174 let lodec = Zenoh080Length::new(length);
175 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
176
177 let mut resolution = Resolution::default();
178 let mut batch_size = batch_size::UNICAST.to_le_bytes();
179 if imsg::has_flag(self.header, flag::S) {
180 let flags: u8 = self.codec.read(&mut *reader)?;
181 resolution = Resolution::from(flags & 0b00111111);
182 batch_size = self.codec.read(&mut *reader)?;
183 }
184 let batch_size = BatchSize::from_le_bytes(batch_size);
185
186 let mut ext_qos = None;
188 let mut ext_qos_link = None;
189 #[cfg(feature = "shared-memory")]
190 let mut ext_shm = None;
191 let mut ext_auth = None;
192 let mut ext_mlink = None;
193 let mut ext_lowlatency = None;
194 let mut ext_compression = None;
195 let mut ext_patch = ext::PatchType::NONE;
196
197 let mut has_ext = imsg::has_flag(self.header, flag::Z);
198 while has_ext {
199 let ext: u8 = self.codec.read(&mut *reader)?;
200 let eodec = Zenoh080Header::new(ext);
201 match iext::eid(ext) {
202 ext::QoS::ID => {
203 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
204 ext_qos = Some(q);
205 has_ext = ext;
206 }
207 ext::QoSLink::ID => {
208 let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
209 ext_qos_link = Some(q);
210 has_ext = ext;
211 }
212 #[cfg(feature = "shared-memory")]
213 ext::Shm::ID => {
214 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
215 ext_shm = Some(s);
216 has_ext = ext;
217 }
218 ext::Auth::ID => {
219 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
220 ext_auth = Some(a);
221 has_ext = ext;
222 }
223 ext::MultiLink::ID => {
224 let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
225 ext_mlink = Some(a);
226 has_ext = ext;
227 }
228 ext::LowLatency::ID => {
229 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
230 ext_lowlatency = Some(q);
231 has_ext = ext;
232 }
233 ext::Compression::ID => {
234 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
235 ext_compression = Some(q);
236 has_ext = ext;
237 }
238 ext::Patch::ID => {
239 let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
240 ext_patch = p;
241 has_ext = ext;
242 }
243 _ => {
244 has_ext = extension::skip(reader, "InitSyn", ext)?;
245 }
246 }
247 }
248
249 Ok(InitSyn {
250 version,
251 whatami,
252 zid,
253 resolution,
254 batch_size,
255 ext_qos,
256 ext_qos_link,
257 #[cfg(feature = "shared-memory")]
258 ext_shm,
259 ext_auth,
260 ext_mlink,
261 ext_lowlatency,
262 ext_compression,
263 ext_patch,
264 })
265 }
266}
267
268impl<W> WCodec<&InitAck, &mut W> for Zenoh080
270where
271 W: Writer,
272{
273 type Output = Result<(), DidntWrite>;
274
275 fn write(self, writer: &mut W, x: &InitAck) -> Self::Output {
276 let InitAck {
277 version,
278 whatami,
279 zid,
280 resolution,
281 batch_size,
282 cookie,
283 ext_qos,
284 ext_qos_link,
285 #[cfg(feature = "shared-memory")]
286 ext_shm,
287 ext_auth,
288 ext_mlink,
289 ext_lowlatency,
290 ext_compression,
291 ext_patch,
292 } = x;
293
294 let mut header = id::INIT | flag::A;
296 if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
297 header |= flag::S;
298 }
299 let mut n_exts = (ext_qos.is_some() as u8)
300 + (ext_qos_link.is_some() as u8)
301 + (ext_auth.is_some() as u8)
302 + (ext_mlink.is_some() as u8)
303 + (ext_lowlatency.is_some() as u8)
304 + (ext_compression.is_some() as u8)
305 + (*ext_patch != ext::PatchType::NONE) as u8;
306
307 #[cfg(feature = "shared-memory")]
308 {
309 n_exts += ext_shm.is_some() as u8;
310 }
311
312 if n_exts != 0 {
313 header |= flag::Z;
314 }
315 self.write(&mut *writer, header)?;
316
317 self.write(&mut *writer, version)?;
319
320 let whatami: u8 = match whatami {
321 WhatAmI::Router => 0b00,
322 WhatAmI::Peer => 0b01,
323 WhatAmI::Client => 0b10,
324 };
325 let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
326 self.write(&mut *writer, flags)?;
327
328 let lodec = Zenoh080Length::new(zid.size());
329 lodec.write(&mut *writer, zid)?;
330
331 if imsg::has_flag(header, flag::S) {
332 self.write(&mut *writer, resolution.as_u8())?;
333 self.write(&mut *writer, batch_size.to_le_bytes())?;
334 }
335
336 let zodec = Zenoh080Bounded::<BatchSize>::new();
337 zodec.write(&mut *writer, cookie)?;
338
339 if let Some(qos) = ext_qos.as_ref() {
341 n_exts -= 1;
342 self.write(&mut *writer, (qos, n_exts != 0))?;
343 }
344 if let Some(qos_link) = ext_qos_link.as_ref() {
345 n_exts -= 1;
346 self.write(&mut *writer, (qos_link, n_exts != 0))?;
347 }
348 #[cfg(feature = "shared-memory")]
349 if let Some(shm) = ext_shm.as_ref() {
350 n_exts -= 1;
351 self.write(&mut *writer, (shm, n_exts != 0))?;
352 }
353 if let Some(auth) = ext_auth.as_ref() {
354 n_exts -= 1;
355 self.write(&mut *writer, (auth, n_exts != 0))?;
356 }
357 if let Some(mlink) = ext_mlink.as_ref() {
358 n_exts -= 1;
359 self.write(&mut *writer, (mlink, n_exts != 0))?;
360 }
361 if let Some(lowlatency) = ext_lowlatency.as_ref() {
362 n_exts -= 1;
363 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
364 }
365 if let Some(compression) = ext_compression.as_ref() {
366 n_exts -= 1;
367 self.write(&mut *writer, (compression, n_exts != 0))?;
368 }
369 if *ext_patch != ext::PatchType::NONE {
370 n_exts -= 1;
371 self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
372 }
373
374 Ok(())
375 }
376}
377
378impl<R> RCodec<InitAck, &mut R> for Zenoh080
379where
380 R: Reader,
381{
382 type Error = DidntRead;
383
384 fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
385 let header: u8 = self.read(&mut *reader)?;
386 let codec = Zenoh080Header::new(header);
387 codec.read(reader)
388 }
389}
390
391impl<R> RCodec<InitAck, &mut R> for Zenoh080Header
392where
393 R: Reader,
394{
395 type Error = DidntRead;
396
397 fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
398 if imsg::mid(self.header) != id::INIT || !imsg::has_flag(self.header, flag::A) {
399 return Err(DidntRead);
400 }
401
402 let version: u8 = self.codec.read(&mut *reader)?;
404
405 let flags: u8 = self.codec.read(&mut *reader)?;
406 let whatami = match flags & 0b11 {
407 0b00 => WhatAmI::Router,
408 0b01 => WhatAmI::Peer,
409 0b10 => WhatAmI::Client,
410 _ => return Err(DidntRead),
411 };
412 let length = 1 + ((flags >> 4) as usize);
413 let lodec = Zenoh080Length::new(length);
414 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
415
416 let mut resolution = Resolution::default();
417 let mut batch_size = batch_size::UNICAST.to_le_bytes();
418 if imsg::has_flag(self.header, flag::S) {
419 let flags: u8 = self.codec.read(&mut *reader)?;
420 resolution = Resolution::from(flags & 0b00111111);
421 batch_size = self.codec.read(&mut *reader)?;
422 }
423 let batch_size = BatchSize::from_le_bytes(batch_size);
424
425 let zodec = Zenoh080Bounded::<BatchSize>::new();
426 let cookie: ZSlice = zodec.read(&mut *reader)?;
427
428 let mut ext_qos = None;
430 let mut ext_qos_link = None;
431 #[cfg(feature = "shared-memory")]
432 let mut ext_shm = None;
433 let mut ext_auth = None;
434 let mut ext_mlink = None;
435 let mut ext_lowlatency = None;
436 let mut ext_compression = None;
437 let mut ext_patch = ext::PatchType::NONE;
438
439 let mut has_ext = imsg::has_flag(self.header, flag::Z);
440 while has_ext {
441 let ext: u8 = self.codec.read(&mut *reader)?;
442 let eodec = Zenoh080Header::new(ext);
443 match iext::eid(ext) {
444 ext::QoS::ID => {
445 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
446 ext_qos = Some(q);
447 has_ext = ext;
448 }
449 ext::QoSLink::ID => {
450 let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
451 ext_qos_link = Some(q);
452 has_ext = ext;
453 }
454 #[cfg(feature = "shared-memory")]
455 ext::Shm::ID => {
456 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
457 ext_shm = Some(s);
458 has_ext = ext;
459 }
460 ext::Auth::ID => {
461 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
462 ext_auth = Some(a);
463 has_ext = ext;
464 }
465 ext::MultiLink::ID => {
466 let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
467 ext_mlink = Some(a);
468 has_ext = ext;
469 }
470 ext::LowLatency::ID => {
471 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
472 ext_lowlatency = Some(q);
473 has_ext = ext;
474 }
475 ext::Compression::ID => {
476 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
477 ext_compression = Some(q);
478 has_ext = ext;
479 }
480 ext::Patch::ID => {
481 let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
482 ext_patch = p;
483 has_ext = ext;
484 }
485 _ => {
486 has_ext = extension::skip(reader, "InitAck", ext)?;
487 }
488 }
489 }
490
491 Ok(InitAck {
492 version,
493 whatami,
494 zid,
495 resolution,
496 batch_size,
497 cookie,
498 ext_qos,
499 ext_qos_link,
500 #[cfg(feature = "shared-memory")]
501 ext_shm,
502 ext_auth,
503 ext_mlink,
504 ext_lowlatency,
505 ext_compression,
506 ext_patch,
507 })
508 }
509}