1use zenoh_buffers::{
15 reader::{DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17 ZSlice,
18};
19use zenoh_protocol::{
20 common::{iext, imsg},
21 core::{Resolution, WhatAmI, ZenohIdProto},
22 transport::{
23 batch_size, id,
24 init::{ext, flag, InitAck, InitSyn},
25 BatchSize,
26 },
27};
28
29use crate::{
30 common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header, Zenoh080Length,
31};
32
33impl<W> WCodec<&InitSyn, &mut W> for Zenoh080
35where
36 W: Writer,
37{
38 type Output = Result<(), DidntWrite>;
39
40 fn write(self, writer: &mut W, x: &InitSyn) -> Self::Output {
41 let InitSyn {
42 version,
43 whatami,
44 zid,
45 resolution,
46 batch_size,
47 ext_qos,
48 ext_qos_link,
49 #[cfg(feature = "shared-memory")]
50 ext_shm,
51 ext_auth,
52 ext_mlink,
53 ext_lowlatency,
54 ext_compression,
55 ext_patch,
56 ext_region_name,
57 } = x;
58
59 let mut header = id::INIT;
61 if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
62 header |= flag::S;
63 }
64 let mut n_exts = (ext_qos.is_some() as u8)
65 + (ext_qos_link.is_some() as u8)
66 + (ext_auth.is_some() as u8)
67 + (ext_mlink.is_some() as u8)
68 + (ext_lowlatency.is_some() as u8)
69 + (ext_compression.is_some() as u8)
70 + (*ext_patch != ext::PatchType::NONE) as u8
71 + (ext_region_name.is_some() as u8);
72
73 #[cfg(feature = "shared-memory")]
74 {
75 n_exts += ext_shm.is_some() as u8;
76 }
77
78 if n_exts != 0 {
79 header |= flag::Z;
80 }
81 self.write(&mut *writer, header)?;
82
83 self.write(&mut *writer, version)?;
85
86 let whatami: u8 = match whatami {
87 WhatAmI::Router => 0b00,
88 WhatAmI::Peer => 0b01,
89 WhatAmI::Client => 0b10,
90 };
91 let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
92 self.write(&mut *writer, flags)?;
93
94 let lodec = Zenoh080Length::new(zid.size());
95 lodec.write(&mut *writer, zid)?;
96
97 if imsg::has_flag(header, flag::S) {
98 self.write(&mut *writer, resolution.as_u8())?;
99 self.write(&mut *writer, batch_size.to_le_bytes())?;
100 }
101
102 if let Some(qos) = ext_qos.as_ref() {
104 n_exts -= 1;
105 self.write(&mut *writer, (qos, n_exts != 0))?;
106 }
107 if let Some(qos_link) = ext_qos_link.as_ref() {
108 n_exts -= 1;
109 self.write(&mut *writer, (qos_link, n_exts != 0))?;
110 }
111 #[cfg(feature = "shared-memory")]
112 if let Some(shm) = ext_shm.as_ref() {
113 n_exts -= 1;
114 self.write(&mut *writer, (shm, n_exts != 0))?;
115 }
116 if let Some(auth) = ext_auth.as_ref() {
117 n_exts -= 1;
118 self.write(&mut *writer, (auth, n_exts != 0))?;
119 }
120 if let Some(mlink) = ext_mlink.as_ref() {
121 n_exts -= 1;
122 self.write(&mut *writer, (mlink, n_exts != 0))?;
123 }
124 if let Some(lowlatency) = ext_lowlatency.as_ref() {
125 n_exts -= 1;
126 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
127 }
128 if let Some(compression) = ext_compression.as_ref() {
129 n_exts -= 1;
130 self.write(&mut *writer, (compression, n_exts != 0))?;
131 }
132 if *ext_patch != ext::PatchType::NONE {
133 n_exts -= 1;
134 self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
135 }
136 if let Some(region_name) = ext_region_name.as_ref() {
137 n_exts -= 1;
138 self.write(&mut *writer, (region_name, n_exts != 0))?;
139 }
140
141 Ok(())
142 }
143}
144
145impl<R> RCodec<InitSyn, &mut R> for Zenoh080
146where
147 R: Reader,
148{
149 type Error = DidntRead;
150
151 fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
152 let header: u8 = self.read(&mut *reader)?;
153 let codec = Zenoh080Header::new(header);
154 codec.read(reader)
155 }
156}
157
158impl<R> RCodec<InitSyn, &mut R> for Zenoh080Header
159where
160 R: Reader,
161{
162 type Error = DidntRead;
163
164 fn read(self, reader: &mut R) -> Result<InitSyn, Self::Error> {
165 if imsg::mid(self.header) != id::INIT || imsg::has_flag(self.header, flag::A) {
166 return Err(DidntRead);
167 }
168
169 let version: u8 = self.codec.read(&mut *reader)?;
171
172 let flags: u8 = self.codec.read(&mut *reader)?;
173 let whatami = match flags & 0b11 {
174 0b00 => WhatAmI::Router,
175 0b01 => WhatAmI::Peer,
176 0b10 => WhatAmI::Client,
177 _ => return Err(DidntRead),
178 };
179 let length = 1 + ((flags >> 4) as usize);
180 let lodec = Zenoh080Length::new(length);
181 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
182
183 let mut resolution = Resolution::default();
184 let mut batch_size = batch_size::UNICAST.to_le_bytes();
185 if imsg::has_flag(self.header, flag::S) {
186 let flags: u8 = self.codec.read(&mut *reader)?;
187 resolution = Resolution::from(flags & 0b00111111);
188 batch_size = self.codec.read(&mut *reader)?;
189 }
190 let batch_size = BatchSize::from_le_bytes(batch_size);
191
192 let mut ext_qos = None;
194 let mut ext_qos_link = None;
195 #[cfg(feature = "shared-memory")]
196 let mut ext_shm = None;
197 let mut ext_auth = None;
198 let mut ext_mlink = None;
199 let mut ext_lowlatency = None;
200 let mut ext_compression = None;
201 let mut ext_patch = ext::PatchType::NONE;
202 let mut ext_northtag = None;
203
204 let mut has_ext = imsg::has_flag(self.header, flag::Z);
205 while has_ext {
206 let ext: u8 = self.codec.read(&mut *reader)?;
207 let eodec = Zenoh080Header::new(ext);
208 match iext::eid(ext) {
209 ext::QoS::ID => {
210 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
211 ext_qos = Some(q);
212 has_ext = ext;
213 }
214 ext::QoSLink::ID => {
215 let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
216 ext_qos_link = Some(q);
217 has_ext = ext;
218 }
219 #[cfg(feature = "shared-memory")]
220 ext::Shm::ID => {
221 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
222 ext_shm = Some(s);
223 has_ext = ext;
224 }
225 ext::Auth::ID => {
226 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
227 ext_auth = Some(a);
228 has_ext = ext;
229 }
230 ext::MultiLink::ID => {
231 let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
232 ext_mlink = Some(a);
233 has_ext = ext;
234 }
235 ext::LowLatency::ID => {
236 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
237 ext_lowlatency = Some(q);
238 has_ext = ext;
239 }
240 ext::Compression::ID => {
241 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
242 ext_compression = Some(q);
243 has_ext = ext;
244 }
245 ext::Patch::ID => {
246 let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
247 ext_patch = p;
248 has_ext = ext;
249 }
250 ext::RegionName::ID => {
251 let (p, ext): (ext::RegionName, bool) = eodec.read(&mut *reader)?;
252 ext_northtag = Some(p);
253 has_ext = ext;
254 }
255 _ => {
256 has_ext = extension::skip(reader, "InitSyn", ext)?;
257 }
258 }
259 }
260
261 Ok(InitSyn {
262 version,
263 whatami,
264 zid,
265 resolution,
266 batch_size,
267 ext_qos,
268 ext_qos_link,
269 #[cfg(feature = "shared-memory")]
270 ext_shm,
271 ext_auth,
272 ext_mlink,
273 ext_lowlatency,
274 ext_compression,
275 ext_patch,
276 ext_region_name: ext_northtag,
277 })
278 }
279}
280
281impl<W> WCodec<&InitAck, &mut W> for Zenoh080
283where
284 W: Writer,
285{
286 type Output = Result<(), DidntWrite>;
287
288 fn write(self, writer: &mut W, x: &InitAck) -> Self::Output {
289 let InitAck {
290 version,
291 whatami,
292 zid,
293 resolution,
294 batch_size,
295 cookie,
296 ext_qos,
297 ext_qos_link,
298 #[cfg(feature = "shared-memory")]
299 ext_shm,
300 ext_auth,
301 ext_mlink,
302 ext_lowlatency,
303 ext_compression,
304 ext_patch,
305 ext_region_name,
306 } = x;
307
308 let mut header = id::INIT | flag::A;
310 if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST {
311 header |= flag::S;
312 }
313 let mut n_exts = (ext_qos.is_some() as u8)
314 + (ext_qos_link.is_some() as u8)
315 + (ext_auth.is_some() as u8)
316 + (ext_mlink.is_some() as u8)
317 + (ext_lowlatency.is_some() as u8)
318 + (ext_compression.is_some() as u8)
319 + (*ext_patch != ext::PatchType::NONE) as u8
320 + (ext_region_name.is_some() as u8);
321
322 #[cfg(feature = "shared-memory")]
323 {
324 n_exts += ext_shm.is_some() as u8;
325 }
326
327 if n_exts != 0 {
328 header |= flag::Z;
329 }
330 self.write(&mut *writer, header)?;
331
332 self.write(&mut *writer, version)?;
334
335 let whatami: u8 = match whatami {
336 WhatAmI::Router => 0b00,
337 WhatAmI::Peer => 0b01,
338 WhatAmI::Client => 0b10,
339 };
340 let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami;
341 self.write(&mut *writer, flags)?;
342
343 let lodec = Zenoh080Length::new(zid.size());
344 lodec.write(&mut *writer, zid)?;
345
346 if imsg::has_flag(header, flag::S) {
347 self.write(&mut *writer, resolution.as_u8())?;
348 self.write(&mut *writer, batch_size.to_le_bytes())?;
349 }
350
351 let zodec = Zenoh080Bounded::<BatchSize>::new();
352 zodec.write(&mut *writer, cookie)?;
353
354 if let Some(qos) = ext_qos.as_ref() {
356 n_exts -= 1;
357 self.write(&mut *writer, (qos, n_exts != 0))?;
358 }
359 if let Some(qos_link) = ext_qos_link.as_ref() {
360 n_exts -= 1;
361 self.write(&mut *writer, (qos_link, n_exts != 0))?;
362 }
363 #[cfg(feature = "shared-memory")]
364 if let Some(shm) = ext_shm.as_ref() {
365 n_exts -= 1;
366 self.write(&mut *writer, (shm, n_exts != 0))?;
367 }
368 if let Some(auth) = ext_auth.as_ref() {
369 n_exts -= 1;
370 self.write(&mut *writer, (auth, n_exts != 0))?;
371 }
372 if let Some(mlink) = ext_mlink.as_ref() {
373 n_exts -= 1;
374 self.write(&mut *writer, (mlink, n_exts != 0))?;
375 }
376 if let Some(lowlatency) = ext_lowlatency.as_ref() {
377 n_exts -= 1;
378 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
379 }
380 if let Some(compression) = ext_compression.as_ref() {
381 n_exts -= 1;
382 self.write(&mut *writer, (compression, n_exts != 0))?;
383 }
384 if *ext_patch != ext::PatchType::NONE {
385 n_exts -= 1;
386 self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
387 }
388 if let Some(region_name) = ext_region_name.as_ref() {
389 n_exts -= 1;
390 self.write(&mut *writer, (region_name, n_exts != 0))?;
391 }
392
393 Ok(())
394 }
395}
396
397impl<R> RCodec<InitAck, &mut R> for Zenoh080
398where
399 R: Reader,
400{
401 type Error = DidntRead;
402
403 fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
404 let header: u8 = self.read(&mut *reader)?;
405 let codec = Zenoh080Header::new(header);
406 codec.read(reader)
407 }
408}
409
410impl<R> RCodec<InitAck, &mut R> for Zenoh080Header
411where
412 R: Reader,
413{
414 type Error = DidntRead;
415
416 fn read(self, reader: &mut R) -> Result<InitAck, Self::Error> {
417 if imsg::mid(self.header) != id::INIT || !imsg::has_flag(self.header, flag::A) {
418 return Err(DidntRead);
419 }
420
421 let version: u8 = self.codec.read(&mut *reader)?;
423
424 let flags: u8 = self.codec.read(&mut *reader)?;
425 let whatami = match flags & 0b11 {
426 0b00 => WhatAmI::Router,
427 0b01 => WhatAmI::Peer,
428 0b10 => WhatAmI::Client,
429 _ => return Err(DidntRead),
430 };
431 let length = 1 + ((flags >> 4) as usize);
432 let lodec = Zenoh080Length::new(length);
433 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
434
435 let mut resolution = Resolution::default();
436 let mut batch_size = batch_size::UNICAST.to_le_bytes();
437 if imsg::has_flag(self.header, flag::S) {
438 let flags: u8 = self.codec.read(&mut *reader)?;
439 resolution = Resolution::from(flags & 0b00111111);
440 batch_size = self.codec.read(&mut *reader)?;
441 }
442 let batch_size = BatchSize::from_le_bytes(batch_size);
443
444 let zodec = Zenoh080Bounded::<BatchSize>::new();
445 let cookie: ZSlice = zodec.read(&mut *reader)?;
446
447 let mut ext_qos = None;
449 let mut ext_qos_link = None;
450 #[cfg(feature = "shared-memory")]
451 let mut ext_shm = None;
452 let mut ext_auth = None;
453 let mut ext_mlink = None;
454 let mut ext_lowlatency = None;
455 let mut ext_compression = None;
456 let mut ext_patch = ext::PatchType::NONE;
457 let mut ext_region_name = None;
458
459 let mut has_ext = imsg::has_flag(self.header, flag::Z);
460 while has_ext {
461 let ext: u8 = self.codec.read(&mut *reader)?;
462 let eodec = Zenoh080Header::new(ext);
463 match iext::eid(ext) {
464 ext::QoS::ID => {
465 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
466 ext_qos = Some(q);
467 has_ext = ext;
468 }
469 ext::QoSLink::ID => {
470 let (q, ext): (ext::QoSLink, bool) = eodec.read(&mut *reader)?;
471 ext_qos_link = Some(q);
472 has_ext = ext;
473 }
474 #[cfg(feature = "shared-memory")]
475 ext::Shm::ID => {
476 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
477 ext_shm = Some(s);
478 has_ext = ext;
479 }
480 ext::Auth::ID => {
481 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
482 ext_auth = Some(a);
483 has_ext = ext;
484 }
485 ext::MultiLink::ID => {
486 let (a, ext): (ext::MultiLink, bool) = eodec.read(&mut *reader)?;
487 ext_mlink = Some(a);
488 has_ext = ext;
489 }
490 ext::LowLatency::ID => {
491 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
492 ext_lowlatency = Some(q);
493 has_ext = ext;
494 }
495 ext::Compression::ID => {
496 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
497 ext_compression = Some(q);
498 has_ext = ext;
499 }
500 ext::Patch::ID => {
501 let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
502 ext_patch = p;
503 has_ext = ext;
504 }
505 ext::RegionName::ID => {
506 let (q, ext): (ext::RegionName, bool) = eodec.read(&mut *reader)?;
507 ext_region_name = Some(q);
508 has_ext = ext;
509 }
510 _ => {
511 has_ext = extension::skip(reader, "InitAck", ext)?;
512 }
513 }
514 }
515
516 Ok(InitAck {
517 version,
518 whatami,
519 zid,
520 resolution,
521 batch_size,
522 cookie,
523 ext_qos,
524 ext_qos_link,
525 #[cfg(feature = "shared-memory")]
526 ext_shm,
527 ext_auth,
528 ext_mlink,
529 ext_lowlatency,
530 ext_compression,
531 ext_patch,
532 ext_region_name,
533 })
534 }
535}