1use alloc::string::String;
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, HasWriter, Writer},
19 ZBuf,
20};
21use zenoh_protocol::{
22 common::{iext, imsg},
23 core::{ExprId, ExprLen, WireExpr},
24 network::{
25 declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody},
26 id, Mapping,
27 },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Condition, Zenoh080Header};
31
32impl<W> WCodec<&DeclareBody, &mut W> for Zenoh080
34where
35 W: Writer,
36{
37 type Output = Result<(), DidntWrite>;
38
39 fn write(self, writer: &mut W, x: &DeclareBody) -> Self::Output {
40 match x {
41 DeclareBody::DeclareKeyExpr(r) => self.write(&mut *writer, r)?,
42 DeclareBody::UndeclareKeyExpr(r) => self.write(&mut *writer, r)?,
43 DeclareBody::DeclareSubscriber(r) => self.write(&mut *writer, r)?,
44 DeclareBody::UndeclareSubscriber(r) => self.write(&mut *writer, r)?,
45 DeclareBody::DeclareQueryable(r) => self.write(&mut *writer, r)?,
46 DeclareBody::UndeclareQueryable(r) => self.write(&mut *writer, r)?,
47 DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?,
48 DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?,
49 DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?,
50 }
51
52 Ok(())
53 }
54}
55
56impl<R> RCodec<DeclareBody, &mut R> for Zenoh080
57where
58 R: Reader,
59{
60 type Error = DidntRead;
61
62 fn read(self, reader: &mut R) -> Result<DeclareBody, Self::Error> {
63 let header: u8 = self.read(&mut *reader)?;
64 let codec = Zenoh080Header::new(header);
65
66 use declare::id::*;
67 let d = match imsg::mid(codec.header) {
68 D_KEYEXPR => DeclareBody::DeclareKeyExpr(codec.read(&mut *reader)?),
69 U_KEYEXPR => DeclareBody::UndeclareKeyExpr(codec.read(&mut *reader)?),
70 D_SUBSCRIBER => DeclareBody::DeclareSubscriber(codec.read(&mut *reader)?),
71 U_SUBSCRIBER => DeclareBody::UndeclareSubscriber(codec.read(&mut *reader)?),
72 D_QUERYABLE => DeclareBody::DeclareQueryable(codec.read(&mut *reader)?),
73 U_QUERYABLE => DeclareBody::UndeclareQueryable(codec.read(&mut *reader)?),
74 D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?),
75 U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?),
76 D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?),
77 _ => return Err(DidntRead),
78 };
79
80 Ok(d)
81 }
82}
83
84impl<W> WCodec<&Declare, &mut W> for Zenoh080
86where
87 W: Writer,
88{
89 type Output = Result<(), DidntWrite>;
90
91 fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
92 let Declare {
93 interest_id,
94 ext_qos,
95 ext_tstamp,
96 ext_nodeid,
97 body,
98 } = x;
99
100 let mut header = id::DECLARE;
102 if x.interest_id.is_some() {
103 header |= declare::flag::I;
104 }
105 let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
106 + (ext_tstamp.is_some() as u8)
107 + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
108 if n_exts != 0 {
109 header |= declare::flag::Z;
110 }
111 self.write(&mut *writer, header)?;
112
113 if let Some(interest_id) = interest_id {
114 self.write(&mut *writer, interest_id)?;
115 }
116
117 if ext_qos != &declare::ext::QoSType::DEFAULT {
119 n_exts -= 1;
120 self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
121 }
122 if let Some(ts) = ext_tstamp.as_ref() {
123 n_exts -= 1;
124 self.write(&mut *writer, (ts, n_exts != 0))?;
125 }
126 if ext_nodeid != &declare::ext::NodeIdType::DEFAULT {
127 n_exts -= 1;
128 self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?;
129 }
130
131 self.write(&mut *writer, body)?;
133
134 Ok(())
135 }
136}
137
138impl<R> RCodec<Declare, &mut R> for Zenoh080
139where
140 R: Reader,
141{
142 type Error = DidntRead;
143
144 fn read(self, reader: &mut R) -> Result<Declare, Self::Error> {
145 let header: u8 = self.read(&mut *reader)?;
146 let codec = Zenoh080Header::new(header);
147
148 codec.read(reader)
149 }
150}
151
152impl<R> RCodec<Declare, &mut R> for Zenoh080Header
153where
154 R: Reader,
155{
156 type Error = DidntRead;
157
158 fn read(self, reader: &mut R) -> Result<Declare, Self::Error> {
159 if imsg::mid(self.header) != id::DECLARE {
160 return Err(DidntRead);
161 }
162
163 let mut interest_id = None;
164 if imsg::has_flag(self.header, declare::flag::I) {
165 interest_id = Some(self.codec.read(&mut *reader)?);
166 }
167
168 let mut ext_qos = declare::ext::QoSType::DEFAULT;
170 let mut ext_tstamp = None;
171 let mut ext_nodeid = declare::ext::NodeIdType::DEFAULT;
172
173 let mut has_ext = imsg::has_flag(self.header, declare::flag::Z);
174 while has_ext {
175 let ext: u8 = self.codec.read(&mut *reader)?;
176 let eodec = Zenoh080Header::new(ext);
177 match iext::eid(ext) {
178 declare::ext::QoS::ID => {
179 let (q, ext): (declare::ext::QoSType, bool) = eodec.read(&mut *reader)?;
180 ext_qos = q;
181 has_ext = ext;
182 }
183 declare::ext::Timestamp::ID => {
184 let (t, ext): (declare::ext::TimestampType, bool) = eodec.read(&mut *reader)?;
185 ext_tstamp = Some(t);
186 has_ext = ext;
187 }
188 declare::ext::NodeId::ID => {
189 let (nid, ext): (declare::ext::NodeIdType, bool) = eodec.read(&mut *reader)?;
190 ext_nodeid = nid;
191 has_ext = ext;
192 }
193 _ => {
194 has_ext = extension::skip(reader, "Declare", ext)?;
195 }
196 }
197 }
198
199 let body: DeclareBody = self.codec.read(&mut *reader)?;
201
202 Ok(Declare {
203 interest_id,
204 ext_qos,
205 ext_tstamp,
206 ext_nodeid,
207 body,
208 })
209 }
210}
211
212impl<W> WCodec<&common::DeclareFinal, &mut W> for Zenoh080
214where
215 W: Writer,
216{
217 type Output = Result<(), DidntWrite>;
218
219 fn write(self, writer: &mut W, x: &common::DeclareFinal) -> Self::Output {
220 let common::DeclareFinal = x;
221
222 let header = declare::id::D_FINAL;
224 self.write(&mut *writer, header)?;
225
226 Ok(())
227 }
228}
229
230impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080
231where
232 R: Reader,
233{
234 type Error = DidntRead;
235
236 fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
237 let header: u8 = self.read(&mut *reader)?;
238 let codec = Zenoh080Header::new(header);
239
240 codec.read(reader)
241 }
242}
243
244impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080Header
245where
246 R: Reader,
247{
248 type Error = DidntRead;
249
250 fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
251 if imsg::mid(self.header) != declare::id::D_FINAL {
252 return Err(DidntRead);
253 }
254
255 let has_ext = imsg::has_flag(self.header, token::flag::Z);
257 if has_ext {
258 extension::skip_all(reader, "Final")?;
259 }
260
261 Ok(common::DeclareFinal)
262 }
263}
264
265impl<W> WCodec<&keyexpr::DeclareKeyExpr, &mut W> for Zenoh080
267where
268 W: Writer,
269{
270 type Output = Result<(), DidntWrite>;
271
272 fn write(self, writer: &mut W, x: &keyexpr::DeclareKeyExpr) -> Self::Output {
273 let keyexpr::DeclareKeyExpr { id, wire_expr } = x;
274
275 let mut header = declare::id::D_KEYEXPR;
277 if wire_expr.has_suffix() {
278 header |= keyexpr::flag::N;
279 }
280 self.write(&mut *writer, header)?;
281
282 self.write(&mut *writer, id)?;
284 self.write(&mut *writer, wire_expr)?;
285
286 Ok(())
287 }
288}
289
290impl<R> RCodec<keyexpr::DeclareKeyExpr, &mut R> for Zenoh080
291where
292 R: Reader,
293{
294 type Error = DidntRead;
295
296 fn read(self, reader: &mut R) -> Result<keyexpr::DeclareKeyExpr, Self::Error> {
297 let header: u8 = self.read(&mut *reader)?;
298 let codec = Zenoh080Header::new(header);
299
300 codec.read(reader)
301 }
302}
303
304impl<R> RCodec<keyexpr::DeclareKeyExpr, &mut R> for Zenoh080Header
305where
306 R: Reader,
307{
308 type Error = DidntRead;
309
310 fn read(self, reader: &mut R) -> Result<keyexpr::DeclareKeyExpr, Self::Error> {
311 if imsg::mid(self.header) != declare::id::D_KEYEXPR {
312 return Err(DidntRead);
313 }
314
315 let id: ExprId = self.codec.read(&mut *reader)?;
316 let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, keyexpr::flag::N));
317 let wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
318
319 let has_ext = imsg::has_flag(self.header, keyexpr::flag::Z);
321 if has_ext {
322 extension::skip_all(reader, "DeclareKeyExpr")?;
323 }
324
325 Ok(keyexpr::DeclareKeyExpr { id, wire_expr })
326 }
327}
328
329impl<W> WCodec<&keyexpr::UndeclareKeyExpr, &mut W> for Zenoh080
331where
332 W: Writer,
333{
334 type Output = Result<(), DidntWrite>;
335
336 fn write(self, writer: &mut W, x: &keyexpr::UndeclareKeyExpr) -> Self::Output {
337 let keyexpr::UndeclareKeyExpr { id } = x;
338
339 let header = declare::id::U_KEYEXPR;
341 self.write(&mut *writer, header)?;
342
343 self.write(&mut *writer, id)?;
345
346 Ok(())
347 }
348}
349
350impl<R> RCodec<keyexpr::UndeclareKeyExpr, &mut R> for Zenoh080
351where
352 R: Reader,
353{
354 type Error = DidntRead;
355
356 fn read(self, reader: &mut R) -> Result<keyexpr::UndeclareKeyExpr, Self::Error> {
357 let header: u8 = self.read(&mut *reader)?;
358 let codec = Zenoh080Header::new(header);
359
360 codec.read(reader)
361 }
362}
363
364impl<R> RCodec<keyexpr::UndeclareKeyExpr, &mut R> for Zenoh080Header
365where
366 R: Reader,
367{
368 type Error = DidntRead;
369
370 fn read(self, reader: &mut R) -> Result<keyexpr::UndeclareKeyExpr, Self::Error> {
371 if imsg::mid(self.header) != declare::id::U_KEYEXPR {
372 return Err(DidntRead);
373 }
374
375 let id: ExprId = self.codec.read(&mut *reader)?;
376
377 let has_ext = imsg::has_flag(self.header, keyexpr::flag::Z);
379 if has_ext {
380 extension::skip_all(reader, "UndeclareKeyExpr")?;
381 }
382
383 Ok(keyexpr::UndeclareKeyExpr { id })
384 }
385}
386
387impl<W> WCodec<&subscriber::DeclareSubscriber, &mut W> for Zenoh080
389where
390 W: Writer,
391{
392 type Output = Result<(), DidntWrite>;
393
394 fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output {
395 let subscriber::DeclareSubscriber { id, wire_expr } = x;
396
397 let mut header = declare::id::D_SUBSCRIBER;
399 if wire_expr.mapping != Mapping::DEFAULT {
400 header |= subscriber::flag::M;
401 }
402 if wire_expr.has_suffix() {
403 header |= subscriber::flag::N;
404 }
405 self.write(&mut *writer, header)?;
406
407 self.write(&mut *writer, id)?;
409 self.write(&mut *writer, wire_expr)?;
410
411 Ok(())
414 }
415}
416
417impl<R> RCodec<subscriber::DeclareSubscriber, &mut R> for Zenoh080
418where
419 R: Reader,
420{
421 type Error = DidntRead;
422
423 fn read(self, reader: &mut R) -> Result<subscriber::DeclareSubscriber, Self::Error> {
424 let header: u8 = self.read(&mut *reader)?;
425 let codec = Zenoh080Header::new(header);
426
427 codec.read(reader)
428 }
429}
430
431impl<R> RCodec<subscriber::DeclareSubscriber, &mut R> for Zenoh080Header
432where
433 R: Reader,
434{
435 type Error = DidntRead;
436
437 fn read(self, reader: &mut R) -> Result<subscriber::DeclareSubscriber, Self::Error> {
438 if imsg::mid(self.header) != declare::id::D_SUBSCRIBER {
439 return Err(DidntRead);
440 }
441
442 let id: subscriber::SubscriberId = self.codec.read(&mut *reader)?;
444 let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, subscriber::flag::N));
445 let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
446 wire_expr.mapping = if imsg::has_flag(self.header, subscriber::flag::M) {
447 Mapping::Sender
448 } else {
449 Mapping::Receiver
450 };
451
452 let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
454 while has_ext {
455 let ext: u8 = self.codec.read(&mut *reader)?;
456 has_ext = extension::skip(reader, "DeclareSubscriber", ext)?;
457 }
458
459 Ok(subscriber::DeclareSubscriber { id, wire_expr })
460 }
461}
462
463impl<W> WCodec<&subscriber::UndeclareSubscriber, &mut W> for Zenoh080
465where
466 W: Writer,
467{
468 type Output = Result<(), DidntWrite>;
469
470 fn write(self, writer: &mut W, x: &subscriber::UndeclareSubscriber) -> Self::Output {
471 let subscriber::UndeclareSubscriber { id, ext_wire_expr } = x;
472
473 let mut header = declare::id::U_SUBSCRIBER;
475 if !ext_wire_expr.is_null() {
476 header |= subscriber::flag::Z;
477 }
478 self.write(&mut *writer, header)?;
479
480 self.write(&mut *writer, id)?;
482
483 if !ext_wire_expr.is_null() {
485 self.write(&mut *writer, (ext_wire_expr, false))?;
486 }
487
488 Ok(())
489 }
490}
491
492impl<R> RCodec<subscriber::UndeclareSubscriber, &mut R> for Zenoh080
493where
494 R: Reader,
495{
496 type Error = DidntRead;
497
498 fn read(self, reader: &mut R) -> Result<subscriber::UndeclareSubscriber, Self::Error> {
499 let header: u8 = self.read(&mut *reader)?;
500 let codec = Zenoh080Header::new(header);
501
502 codec.read(reader)
503 }
504}
505
506impl<R> RCodec<subscriber::UndeclareSubscriber, &mut R> for Zenoh080Header
507where
508 R: Reader,
509{
510 type Error = DidntRead;
511
512 fn read(self, reader: &mut R) -> Result<subscriber::UndeclareSubscriber, Self::Error> {
513 if imsg::mid(self.header) != declare::id::U_SUBSCRIBER {
514 return Err(DidntRead);
515 }
516
517 let id: subscriber::SubscriberId = self.codec.read(&mut *reader)?;
519
520 let mut ext_wire_expr = common::ext::WireExprType::null();
522
523 let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
524 while has_ext {
525 let ext: u8 = self.codec.read(&mut *reader)?;
526 let eodec = Zenoh080Header::new(ext);
527 match iext::eid(ext) {
528 common::ext::WireExprExt::ID => {
529 let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
530 ext_wire_expr = we;
531 has_ext = ext;
532 }
533 _ => {
534 has_ext = extension::skip(reader, "UndeclareSubscriber", ext)?;
535 }
536 }
537 }
538
539 Ok(subscriber::UndeclareSubscriber { id, ext_wire_expr })
540 }
541}
542
543impl<W> WCodec<(&queryable::ext::QueryableInfoType, bool), &mut W> for Zenoh080
545where
546 W: Writer,
547{
548 type Output = Result<(), DidntWrite>;
549 fn write(self, writer: &mut W, x: (&queryable::ext::QueryableInfoType, bool)) -> Self::Output {
550 let (x, more) = x;
551
552 let mut flags: u8 = 0;
553 if x.complete {
554 flags |= queryable::ext::flag::C;
555 }
556 let v: u64 = (flags as u64) | ((x.distance as u64) << 8);
557 let ext = queryable::ext::QueryableInfo::new(v);
558
559 self.write(&mut *writer, (&ext, more))
560 }
561}
562
563impl<R> RCodec<(queryable::ext::QueryableInfoType, bool), &mut R> for Zenoh080Header
564where
565 R: Reader,
566{
567 type Error = DidntRead;
568
569 fn read(
570 self,
571 reader: &mut R,
572 ) -> Result<(queryable::ext::QueryableInfoType, bool), Self::Error> {
573 let (ext, more): (queryable::ext::QueryableInfo, bool) = self.read(&mut *reader)?;
574
575 let complete = imsg::has_flag(ext.value as u8, queryable::ext::flag::C);
576 let distance = (ext.value >> 8) as u16;
577
578 Ok((
579 queryable::ext::QueryableInfoType { complete, distance },
580 more,
581 ))
582 }
583}
584
585impl<W> WCodec<&queryable::DeclareQueryable, &mut W> for Zenoh080
587where
588 W: Writer,
589{
590 type Output = Result<(), DidntWrite>;
591
592 fn write(self, writer: &mut W, x: &queryable::DeclareQueryable) -> Self::Output {
593 let queryable::DeclareQueryable {
594 id,
595 wire_expr,
596 ext_info,
597 } = x;
598
599 let mut header = declare::id::D_QUERYABLE;
601 let mut n_exts = (ext_info != &queryable::ext::QueryableInfoType::DEFAULT) as u8;
602 if n_exts != 0 {
603 header |= subscriber::flag::Z;
604 }
605 if wire_expr.mapping != Mapping::DEFAULT {
606 header |= subscriber::flag::M;
607 }
608 if wire_expr.has_suffix() {
609 header |= subscriber::flag::N;
610 }
611 self.write(&mut *writer, header)?;
612
613 self.write(&mut *writer, id)?;
615 self.write(&mut *writer, wire_expr)?;
616 if ext_info != &queryable::ext::QueryableInfoType::DEFAULT {
617 n_exts -= 1;
618 self.write(&mut *writer, (ext_info, n_exts != 0))?;
619 }
620
621 Ok(())
622 }
623}
624
625impl<R> RCodec<queryable::DeclareQueryable, &mut R> for Zenoh080
626where
627 R: Reader,
628{
629 type Error = DidntRead;
630
631 fn read(self, reader: &mut R) -> Result<queryable::DeclareQueryable, Self::Error> {
632 let header: u8 = self.read(&mut *reader)?;
633 let codec = Zenoh080Header::new(header);
634
635 codec.read(reader)
636 }
637}
638
639impl<R> RCodec<queryable::DeclareQueryable, &mut R> for Zenoh080Header
640where
641 R: Reader,
642{
643 type Error = DidntRead;
644
645 fn read(self, reader: &mut R) -> Result<queryable::DeclareQueryable, Self::Error> {
646 if imsg::mid(self.header) != declare::id::D_QUERYABLE {
647 return Err(DidntRead);
648 }
649
650 let id: queryable::QueryableId = self.codec.read(&mut *reader)?;
652 let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, queryable::flag::N));
653 let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
654 wire_expr.mapping = if imsg::has_flag(self.header, queryable::flag::M) {
655 Mapping::Sender
656 } else {
657 Mapping::Receiver
658 };
659
660 let mut ext_info = queryable::ext::QueryableInfoType::DEFAULT;
662
663 let mut has_ext = imsg::has_flag(self.header, queryable::flag::Z);
664 while has_ext {
665 let ext: u8 = self.codec.read(&mut *reader)?;
666 let eodec = Zenoh080Header::new(ext);
667 match iext::eid(ext) {
668 queryable::ext::QueryableInfo::ID => {
669 let (i, ext): (queryable::ext::QueryableInfoType, bool) =
670 eodec.read(&mut *reader)?;
671 ext_info = i;
672 has_ext = ext;
673 }
674 _ => {
675 has_ext = extension::skip(reader, "DeclareQueryable", ext)?;
676 }
677 }
678 }
679
680 Ok(queryable::DeclareQueryable {
681 id,
682 wire_expr,
683 ext_info,
684 })
685 }
686}
687
688impl<W> WCodec<&queryable::UndeclareQueryable, &mut W> for Zenoh080
690where
691 W: Writer,
692{
693 type Output = Result<(), DidntWrite>;
694
695 fn write(self, writer: &mut W, x: &queryable::UndeclareQueryable) -> Self::Output {
696 let queryable::UndeclareQueryable { id, ext_wire_expr } = x;
697
698 let header = declare::id::U_QUERYABLE | queryable::flag::Z;
700 self.write(&mut *writer, header)?;
701
702 self.write(&mut *writer, id)?;
704
705 self.write(&mut *writer, (ext_wire_expr, false))?;
707
708 Ok(())
709 }
710}
711
712impl<R> RCodec<queryable::UndeclareQueryable, &mut R> for Zenoh080
713where
714 R: Reader,
715{
716 type Error = DidntRead;
717
718 fn read(self, reader: &mut R) -> Result<queryable::UndeclareQueryable, Self::Error> {
719 let header: u8 = self.read(&mut *reader)?;
720 let codec = Zenoh080Header::new(header);
721
722 codec.read(reader)
723 }
724}
725
726impl<R> RCodec<queryable::UndeclareQueryable, &mut R> for Zenoh080Header
727where
728 R: Reader,
729{
730 type Error = DidntRead;
731
732 fn read(self, reader: &mut R) -> Result<queryable::UndeclareQueryable, Self::Error> {
733 if imsg::mid(self.header) != declare::id::U_QUERYABLE {
734 return Err(DidntRead);
735 }
736
737 let id: queryable::QueryableId = self.codec.read(&mut *reader)?;
739
740 let mut ext_wire_expr = common::ext::WireExprType::null();
742
743 let mut has_ext = imsg::has_flag(self.header, queryable::flag::Z);
744 while has_ext {
745 let ext: u8 = self.codec.read(&mut *reader)?;
746 let eodec = Zenoh080Header::new(ext);
747 match iext::eid(ext) {
748 common::ext::WireExprExt::ID => {
749 let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
750 ext_wire_expr = we;
751 has_ext = ext;
752 }
753 _ => {
754 has_ext = extension::skip(reader, "UndeclareQueryable", ext)?;
755 }
756 }
757 }
758
759 Ok(queryable::UndeclareQueryable { id, ext_wire_expr })
760 }
761}
762
763impl<W> WCodec<&token::DeclareToken, &mut W> for Zenoh080
765where
766 W: Writer,
767{
768 type Output = Result<(), DidntWrite>;
769
770 fn write(self, writer: &mut W, x: &token::DeclareToken) -> Self::Output {
771 let token::DeclareToken { id, wire_expr } = x;
772
773 let mut header = declare::id::D_TOKEN;
775 if wire_expr.mapping != Mapping::DEFAULT {
776 header |= subscriber::flag::M;
777 }
778 if wire_expr.has_suffix() {
779 header |= subscriber::flag::N;
780 }
781 self.write(&mut *writer, header)?;
782
783 self.write(&mut *writer, id)?;
785 self.write(&mut *writer, wire_expr)?;
786
787 Ok(())
788 }
789}
790
791impl<R> RCodec<token::DeclareToken, &mut R> for Zenoh080
792where
793 R: Reader,
794{
795 type Error = DidntRead;
796
797 fn read(self, reader: &mut R) -> Result<token::DeclareToken, Self::Error> {
798 let header: u8 = self.read(&mut *reader)?;
799 let codec = Zenoh080Header::new(header);
800 codec.read(reader)
801 }
802}
803
804impl<R> RCodec<token::DeclareToken, &mut R> for Zenoh080Header
805where
806 R: Reader,
807{
808 type Error = DidntRead;
809
810 fn read(self, reader: &mut R) -> Result<token::DeclareToken, Self::Error> {
811 if imsg::mid(self.header) != declare::id::D_TOKEN {
812 return Err(DidntRead);
813 }
814
815 let id: token::TokenId = self.codec.read(&mut *reader)?;
817 let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, token::flag::N));
818 let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
819 wire_expr.mapping = if imsg::has_flag(self.header, token::flag::M) {
820 Mapping::Sender
821 } else {
822 Mapping::Receiver
823 };
824
825 let has_ext = imsg::has_flag(self.header, token::flag::Z);
827 if has_ext {
828 extension::skip_all(reader, "DeclareToken")?;
829 }
830
831 Ok(token::DeclareToken { id, wire_expr })
832 }
833}
834
835impl<W> WCodec<&token::UndeclareToken, &mut W> for Zenoh080
837where
838 W: Writer,
839{
840 type Output = Result<(), DidntWrite>;
841
842 fn write(self, writer: &mut W, x: &token::UndeclareToken) -> Self::Output {
843 let token::UndeclareToken { id, ext_wire_expr } = x;
844
845 let header = declare::id::U_TOKEN | token::flag::Z;
847 self.write(&mut *writer, header)?;
848
849 self.write(&mut *writer, id)?;
851
852 self.write(&mut *writer, (ext_wire_expr, false))?;
854
855 Ok(())
856 }
857}
858
859impl<R> RCodec<token::UndeclareToken, &mut R> for Zenoh080
860where
861 R: Reader,
862{
863 type Error = DidntRead;
864
865 fn read(self, reader: &mut R) -> Result<token::UndeclareToken, Self::Error> {
866 let header: u8 = self.read(&mut *reader)?;
867 let codec = Zenoh080Header::new(header);
868
869 codec.read(reader)
870 }
871}
872
873impl<R> RCodec<token::UndeclareToken, &mut R> for Zenoh080Header
874where
875 R: Reader,
876{
877 type Error = DidntRead;
878
879 fn read(self, reader: &mut R) -> Result<token::UndeclareToken, Self::Error> {
880 if imsg::mid(self.header) != declare::id::U_TOKEN {
881 return Err(DidntRead);
882 }
883
884 let id: token::TokenId = self.codec.read(&mut *reader)?;
886
887 let mut ext_wire_expr = common::ext::WireExprType::null();
889
890 let mut has_ext = imsg::has_flag(self.header, token::flag::Z);
891 while has_ext {
892 let ext: u8 = self.codec.read(&mut *reader)?;
893 let eodec = Zenoh080Header::new(ext);
894 match iext::eid(ext) {
895 common::ext::WireExprExt::ID => {
896 let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
897 ext_wire_expr = we;
898 has_ext = ext;
899 }
900 _ => {
901 has_ext = extension::skip(reader, "UndeclareToken", ext)?;
902 }
903 }
904 }
905
906 Ok(token::UndeclareToken { id, ext_wire_expr })
907 }
908}
909
910impl<W> WCodec<(&common::ext::WireExprType, bool), &mut W> for Zenoh080
912where
913 W: Writer,
914{
915 type Output = Result<(), DidntWrite>;
916
917 fn write(self, writer: &mut W, x: (&common::ext::WireExprType, bool)) -> Self::Output {
918 let (x, more) = x;
919 let common::ext::WireExprType { wire_expr } = x;
920
921 let codec = Zenoh080::new();
922 let mut value = ZBuf::empty();
923 let mut zriter = value.writer();
924
925 let mut flags: u8 = 0;
926 if x.wire_expr.has_suffix() {
927 flags |= 1;
928 }
929 if let Mapping::Sender = wire_expr.mapping {
930 flags |= 1 << 1;
931 }
932 codec.write(&mut zriter, flags)?;
933
934 codec.write(&mut zriter, wire_expr.scope)?;
935 if wire_expr.has_suffix() {
936 zriter.write_exact(wire_expr.suffix.as_bytes())?;
937 }
938
939 let ext = common::ext::WireExprExt { value };
940 codec.write(&mut *writer, (&ext, more))?;
941
942 Ok(())
943 }
944}
945
946impl<R> RCodec<(common::ext::WireExprType, bool), &mut R> for Zenoh080Header
947where
948 R: Reader,
949{
950 type Error = DidntRead;
951
952 fn read(self, reader: &mut R) -> Result<(common::ext::WireExprType, bool), Self::Error> {
953 use zenoh_buffers::reader::HasReader;
954
955 let (ext, more): (common::ext::WireExprExt, bool) = self.read(&mut *reader)?;
956
957 let mut zeader = ext.value.reader();
958 let flags: u8 = self.codec.read(&mut zeader)?;
959
960 let scope: ExprLen = self.codec.read(&mut zeader)?;
961 let suffix = if imsg::has_flag(flags, 1) {
962 let mut buff = zenoh_buffers::vec::uninit(zeader.remaining());
963 zeader.read_exact(&mut buff)?;
964 String::from_utf8(buff).map_err(|_| DidntRead)?
965 } else {
966 String::new()
967 };
968 let mapping = if imsg::has_flag(flags, 1 << 1) {
969 Mapping::Sender
970 } else {
971 Mapping::Receiver
972 };
973
974 Ok((
975 common::ext::WireExprType {
976 wire_expr: WireExpr {
977 scope,
978 suffix: suffix.into(),
979 mapping,
980 },
981 },
982 more,
983 ))
984 }
985}