zenoh_codec/network/
declare.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::string::String;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, HasWriter, Writer},
19    ZBuf,
20};
21use zenoh_protocol::{
22    common::{iext, imsg},
23    core::{ExprId, ExprLen, WireExpr},
24    network::{
25        declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody},
26        id, Mapping,
27    },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Condition, Zenoh080Header};
31
32// Declaration
33impl<W> WCodec<&DeclareBody, &mut W> for Zenoh080
34where
35    W: Writer,
36{
37    type Output = Result<(), DidntWrite>;
38
39    fn write(self, writer: &mut W, x: &DeclareBody) -> Self::Output {
40        match x {
41            DeclareBody::DeclareKeyExpr(r) => self.write(&mut *writer, r)?,
42            DeclareBody::UndeclareKeyExpr(r) => self.write(&mut *writer, r)?,
43            DeclareBody::DeclareSubscriber(r) => self.write(&mut *writer, r)?,
44            DeclareBody::UndeclareSubscriber(r) => self.write(&mut *writer, r)?,
45            DeclareBody::DeclareQueryable(r) => self.write(&mut *writer, r)?,
46            DeclareBody::UndeclareQueryable(r) => self.write(&mut *writer, r)?,
47            DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?,
48            DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?,
49            DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?,
50        }
51
52        Ok(())
53    }
54}
55
56impl<R> RCodec<DeclareBody, &mut R> for Zenoh080
57where
58    R: Reader,
59{
60    type Error = DidntRead;
61
62    fn read(self, reader: &mut R) -> Result<DeclareBody, Self::Error> {
63        let header: u8 = self.read(&mut *reader)?;
64        let codec = Zenoh080Header::new(header);
65
66        use declare::id::*;
67        let d = match imsg::mid(codec.header) {
68            D_KEYEXPR => DeclareBody::DeclareKeyExpr(codec.read(&mut *reader)?),
69            U_KEYEXPR => DeclareBody::UndeclareKeyExpr(codec.read(&mut *reader)?),
70            D_SUBSCRIBER => DeclareBody::DeclareSubscriber(codec.read(&mut *reader)?),
71            U_SUBSCRIBER => DeclareBody::UndeclareSubscriber(codec.read(&mut *reader)?),
72            D_QUERYABLE => DeclareBody::DeclareQueryable(codec.read(&mut *reader)?),
73            U_QUERYABLE => DeclareBody::UndeclareQueryable(codec.read(&mut *reader)?),
74            D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?),
75            U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?),
76            D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?),
77            _ => return Err(DidntRead),
78        };
79
80        Ok(d)
81    }
82}
83
84// Declare
85impl<W> WCodec<&Declare, &mut W> for Zenoh080
86where
87    W: Writer,
88{
89    type Output = Result<(), DidntWrite>;
90
91    fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
92        let Declare {
93            interest_id,
94            ext_qos,
95            ext_tstamp,
96            ext_nodeid,
97            body,
98        } = x;
99
100        // Header
101        let mut header = id::DECLARE;
102        if x.interest_id.is_some() {
103            header |= declare::flag::I;
104        }
105        let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
106            + (ext_tstamp.is_some() as u8)
107            + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
108        if n_exts != 0 {
109            header |= declare::flag::Z;
110        }
111        self.write(&mut *writer, header)?;
112
113        if let Some(interest_id) = interest_id {
114            self.write(&mut *writer, interest_id)?;
115        }
116
117        // Extensions
118        if ext_qos != &declare::ext::QoSType::DEFAULT {
119            n_exts -= 1;
120            self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
121        }
122        if let Some(ts) = ext_tstamp.as_ref() {
123            n_exts -= 1;
124            self.write(&mut *writer, (ts, n_exts != 0))?;
125        }
126        if ext_nodeid != &declare::ext::NodeIdType::DEFAULT {
127            n_exts -= 1;
128            self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?;
129        }
130
131        // Body
132        self.write(&mut *writer, body)?;
133
134        Ok(())
135    }
136}
137
138impl<R> RCodec<Declare, &mut R> for Zenoh080
139where
140    R: Reader,
141{
142    type Error = DidntRead;
143
144    fn read(self, reader: &mut R) -> Result<Declare, Self::Error> {
145        let header: u8 = self.read(&mut *reader)?;
146        let codec = Zenoh080Header::new(header);
147
148        codec.read(reader)
149    }
150}
151
152impl<R> RCodec<Declare, &mut R> for Zenoh080Header
153where
154    R: Reader,
155{
156    type Error = DidntRead;
157
158    fn read(self, reader: &mut R) -> Result<Declare, Self::Error> {
159        if imsg::mid(self.header) != id::DECLARE {
160            return Err(DidntRead);
161        }
162
163        let mut interest_id = None;
164        if imsg::has_flag(self.header, declare::flag::I) {
165            interest_id = Some(self.codec.read(&mut *reader)?);
166        }
167
168        // Extensions
169        let mut ext_qos = declare::ext::QoSType::DEFAULT;
170        let mut ext_tstamp = None;
171        let mut ext_nodeid = declare::ext::NodeIdType::DEFAULT;
172
173        let mut has_ext = imsg::has_flag(self.header, declare::flag::Z);
174        while has_ext {
175            let ext: u8 = self.codec.read(&mut *reader)?;
176            let eodec = Zenoh080Header::new(ext);
177            match iext::eid(ext) {
178                declare::ext::QoS::ID => {
179                    let (q, ext): (declare::ext::QoSType, bool) = eodec.read(&mut *reader)?;
180                    ext_qos = q;
181                    has_ext = ext;
182                }
183                declare::ext::Timestamp::ID => {
184                    let (t, ext): (declare::ext::TimestampType, bool) = eodec.read(&mut *reader)?;
185                    ext_tstamp = Some(t);
186                    has_ext = ext;
187                }
188                declare::ext::NodeId::ID => {
189                    let (nid, ext): (declare::ext::NodeIdType, bool) = eodec.read(&mut *reader)?;
190                    ext_nodeid = nid;
191                    has_ext = ext;
192                }
193                _ => {
194                    has_ext = extension::skip(reader, "Declare", ext)?;
195                }
196            }
197        }
198
199        // Body
200        let body: DeclareBody = self.codec.read(&mut *reader)?;
201
202        Ok(Declare {
203            interest_id,
204            ext_qos,
205            ext_tstamp,
206            ext_nodeid,
207            body,
208        })
209    }
210}
211
212// Final
213impl<W> WCodec<&common::DeclareFinal, &mut W> for Zenoh080
214where
215    W: Writer,
216{
217    type Output = Result<(), DidntWrite>;
218
219    fn write(self, writer: &mut W, x: &common::DeclareFinal) -> Self::Output {
220        let common::DeclareFinal = x;
221
222        // Header
223        let header = declare::id::D_FINAL;
224        self.write(&mut *writer, header)?;
225
226        Ok(())
227    }
228}
229
230impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080
231where
232    R: Reader,
233{
234    type Error = DidntRead;
235
236    fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
237        let header: u8 = self.read(&mut *reader)?;
238        let codec = Zenoh080Header::new(header);
239
240        codec.read(reader)
241    }
242}
243
244impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080Header
245where
246    R: Reader,
247{
248    type Error = DidntRead;
249
250    fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
251        if imsg::mid(self.header) != declare::id::D_FINAL {
252            return Err(DidntRead);
253        }
254
255        // Extensions
256        let has_ext = imsg::has_flag(self.header, token::flag::Z);
257        if has_ext {
258            extension::skip_all(reader, "Final")?;
259        }
260
261        Ok(common::DeclareFinal)
262    }
263}
264
265// DeclareKeyExpr
266impl<W> WCodec<&keyexpr::DeclareKeyExpr, &mut W> for Zenoh080
267where
268    W: Writer,
269{
270    type Output = Result<(), DidntWrite>;
271
272    fn write(self, writer: &mut W, x: &keyexpr::DeclareKeyExpr) -> Self::Output {
273        let keyexpr::DeclareKeyExpr { id, wire_expr } = x;
274
275        // Header
276        let mut header = declare::id::D_KEYEXPR;
277        if wire_expr.has_suffix() {
278            header |= keyexpr::flag::N;
279        }
280        self.write(&mut *writer, header)?;
281
282        // Body
283        self.write(&mut *writer, id)?;
284        self.write(&mut *writer, wire_expr)?;
285
286        Ok(())
287    }
288}
289
290impl<R> RCodec<keyexpr::DeclareKeyExpr, &mut R> for Zenoh080
291where
292    R: Reader,
293{
294    type Error = DidntRead;
295
296    fn read(self, reader: &mut R) -> Result<keyexpr::DeclareKeyExpr, Self::Error> {
297        let header: u8 = self.read(&mut *reader)?;
298        let codec = Zenoh080Header::new(header);
299
300        codec.read(reader)
301    }
302}
303
304impl<R> RCodec<keyexpr::DeclareKeyExpr, &mut R> for Zenoh080Header
305where
306    R: Reader,
307{
308    type Error = DidntRead;
309
310    fn read(self, reader: &mut R) -> Result<keyexpr::DeclareKeyExpr, Self::Error> {
311        if imsg::mid(self.header) != declare::id::D_KEYEXPR {
312            return Err(DidntRead);
313        }
314
315        let id: ExprId = self.codec.read(&mut *reader)?;
316        let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, keyexpr::flag::N));
317        let wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
318
319        // Extensions
320        let has_ext = imsg::has_flag(self.header, keyexpr::flag::Z);
321        if has_ext {
322            extension::skip_all(reader, "DeclareKeyExpr")?;
323        }
324
325        Ok(keyexpr::DeclareKeyExpr { id, wire_expr })
326    }
327}
328
329// UndeclareKeyExpr
330impl<W> WCodec<&keyexpr::UndeclareKeyExpr, &mut W> for Zenoh080
331where
332    W: Writer,
333{
334    type Output = Result<(), DidntWrite>;
335
336    fn write(self, writer: &mut W, x: &keyexpr::UndeclareKeyExpr) -> Self::Output {
337        let keyexpr::UndeclareKeyExpr { id } = x;
338
339        // Header
340        let header = declare::id::U_KEYEXPR;
341        self.write(&mut *writer, header)?;
342
343        // Body
344        self.write(&mut *writer, id)?;
345
346        Ok(())
347    }
348}
349
350impl<R> RCodec<keyexpr::UndeclareKeyExpr, &mut R> for Zenoh080
351where
352    R: Reader,
353{
354    type Error = DidntRead;
355
356    fn read(self, reader: &mut R) -> Result<keyexpr::UndeclareKeyExpr, Self::Error> {
357        let header: u8 = self.read(&mut *reader)?;
358        let codec = Zenoh080Header::new(header);
359
360        codec.read(reader)
361    }
362}
363
364impl<R> RCodec<keyexpr::UndeclareKeyExpr, &mut R> for Zenoh080Header
365where
366    R: Reader,
367{
368    type Error = DidntRead;
369
370    fn read(self, reader: &mut R) -> Result<keyexpr::UndeclareKeyExpr, Self::Error> {
371        if imsg::mid(self.header) != declare::id::U_KEYEXPR {
372            return Err(DidntRead);
373        }
374
375        let id: ExprId = self.codec.read(&mut *reader)?;
376
377        // Extensions
378        let has_ext = imsg::has_flag(self.header, keyexpr::flag::Z);
379        if has_ext {
380            extension::skip_all(reader, "UndeclareKeyExpr")?;
381        }
382
383        Ok(keyexpr::UndeclareKeyExpr { id })
384    }
385}
386
387// DeclareSubscriber
388impl<W> WCodec<&subscriber::DeclareSubscriber, &mut W> for Zenoh080
389where
390    W: Writer,
391{
392    type Output = Result<(), DidntWrite>;
393
394    fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output {
395        let subscriber::DeclareSubscriber { id, wire_expr } = x;
396
397        // Header
398        let mut header = declare::id::D_SUBSCRIBER;
399        if wire_expr.mapping != Mapping::DEFAULT {
400            header |= subscriber::flag::M;
401        }
402        if wire_expr.has_suffix() {
403            header |= subscriber::flag::N;
404        }
405        self.write(&mut *writer, header)?;
406
407        // Body
408        self.write(&mut *writer, id)?;
409        self.write(&mut *writer, wire_expr)?;
410
411        // Extensions
412
413        Ok(())
414    }
415}
416
417impl<R> RCodec<subscriber::DeclareSubscriber, &mut R> for Zenoh080
418where
419    R: Reader,
420{
421    type Error = DidntRead;
422
423    fn read(self, reader: &mut R) -> Result<subscriber::DeclareSubscriber, Self::Error> {
424        let header: u8 = self.read(&mut *reader)?;
425        let codec = Zenoh080Header::new(header);
426
427        codec.read(reader)
428    }
429}
430
431impl<R> RCodec<subscriber::DeclareSubscriber, &mut R> for Zenoh080Header
432where
433    R: Reader,
434{
435    type Error = DidntRead;
436
437    fn read(self, reader: &mut R) -> Result<subscriber::DeclareSubscriber, Self::Error> {
438        if imsg::mid(self.header) != declare::id::D_SUBSCRIBER {
439            return Err(DidntRead);
440        }
441
442        // Body
443        let id: subscriber::SubscriberId = self.codec.read(&mut *reader)?;
444        let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, subscriber::flag::N));
445        let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
446        wire_expr.mapping = if imsg::has_flag(self.header, subscriber::flag::M) {
447            Mapping::Sender
448        } else {
449            Mapping::Receiver
450        };
451
452        // Extensions
453        let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
454        while has_ext {
455            let ext: u8 = self.codec.read(&mut *reader)?;
456            has_ext = extension::skip(reader, "DeclareSubscriber", ext)?;
457        }
458
459        Ok(subscriber::DeclareSubscriber { id, wire_expr })
460    }
461}
462
463// UndeclareSubscriber
464impl<W> WCodec<&subscriber::UndeclareSubscriber, &mut W> for Zenoh080
465where
466    W: Writer,
467{
468    type Output = Result<(), DidntWrite>;
469
470    fn write(self, writer: &mut W, x: &subscriber::UndeclareSubscriber) -> Self::Output {
471        let subscriber::UndeclareSubscriber { id, ext_wire_expr } = x;
472
473        // Header
474        let mut header = declare::id::U_SUBSCRIBER;
475        if !ext_wire_expr.is_null() {
476            header |= subscriber::flag::Z;
477        }
478        self.write(&mut *writer, header)?;
479
480        // Body
481        self.write(&mut *writer, id)?;
482
483        // Extension
484        if !ext_wire_expr.is_null() {
485            self.write(&mut *writer, (ext_wire_expr, false))?;
486        }
487
488        Ok(())
489    }
490}
491
492impl<R> RCodec<subscriber::UndeclareSubscriber, &mut R> for Zenoh080
493where
494    R: Reader,
495{
496    type Error = DidntRead;
497
498    fn read(self, reader: &mut R) -> Result<subscriber::UndeclareSubscriber, Self::Error> {
499        let header: u8 = self.read(&mut *reader)?;
500        let codec = Zenoh080Header::new(header);
501
502        codec.read(reader)
503    }
504}
505
506impl<R> RCodec<subscriber::UndeclareSubscriber, &mut R> for Zenoh080Header
507where
508    R: Reader,
509{
510    type Error = DidntRead;
511
512    fn read(self, reader: &mut R) -> Result<subscriber::UndeclareSubscriber, Self::Error> {
513        if imsg::mid(self.header) != declare::id::U_SUBSCRIBER {
514            return Err(DidntRead);
515        }
516
517        // Body
518        let id: subscriber::SubscriberId = self.codec.read(&mut *reader)?;
519
520        // Extensions
521        let mut ext_wire_expr = common::ext::WireExprType::null();
522
523        let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
524        while has_ext {
525            let ext: u8 = self.codec.read(&mut *reader)?;
526            let eodec = Zenoh080Header::new(ext);
527            match iext::eid(ext) {
528                common::ext::WireExprExt::ID => {
529                    let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
530                    ext_wire_expr = we;
531                    has_ext = ext;
532                }
533                _ => {
534                    has_ext = extension::skip(reader, "UndeclareSubscriber", ext)?;
535                }
536            }
537        }
538
539        Ok(subscriber::UndeclareSubscriber { id, ext_wire_expr })
540    }
541}
542
543// QueryableInfo
544impl<W> WCodec<(&queryable::ext::QueryableInfoType, bool), &mut W> for Zenoh080
545where
546    W: Writer,
547{
548    type Output = Result<(), DidntWrite>;
549    fn write(self, writer: &mut W, x: (&queryable::ext::QueryableInfoType, bool)) -> Self::Output {
550        let (x, more) = x;
551
552        let mut flags: u8 = 0;
553        if x.complete {
554            flags |= queryable::ext::flag::C;
555        }
556        let v: u64 = (flags as u64) | ((x.distance as u64) << 8);
557        let ext = queryable::ext::QueryableInfo::new(v);
558
559        self.write(&mut *writer, (&ext, more))
560    }
561}
562
563impl<R> RCodec<(queryable::ext::QueryableInfoType, bool), &mut R> for Zenoh080Header
564where
565    R: Reader,
566{
567    type Error = DidntRead;
568
569    fn read(
570        self,
571        reader: &mut R,
572    ) -> Result<(queryable::ext::QueryableInfoType, bool), Self::Error> {
573        let (ext, more): (queryable::ext::QueryableInfo, bool) = self.read(&mut *reader)?;
574
575        let complete = imsg::has_flag(ext.value as u8, queryable::ext::flag::C);
576        let distance = (ext.value >> 8) as u16;
577
578        Ok((
579            queryable::ext::QueryableInfoType { complete, distance },
580            more,
581        ))
582    }
583}
584
585// DeclareQueryable
586impl<W> WCodec<&queryable::DeclareQueryable, &mut W> for Zenoh080
587where
588    W: Writer,
589{
590    type Output = Result<(), DidntWrite>;
591
592    fn write(self, writer: &mut W, x: &queryable::DeclareQueryable) -> Self::Output {
593        let queryable::DeclareQueryable {
594            id,
595            wire_expr,
596            ext_info,
597        } = x;
598
599        // Header
600        let mut header = declare::id::D_QUERYABLE;
601        let mut n_exts = (ext_info != &queryable::ext::QueryableInfoType::DEFAULT) as u8;
602        if n_exts != 0 {
603            header |= subscriber::flag::Z;
604        }
605        if wire_expr.mapping != Mapping::DEFAULT {
606            header |= subscriber::flag::M;
607        }
608        if wire_expr.has_suffix() {
609            header |= subscriber::flag::N;
610        }
611        self.write(&mut *writer, header)?;
612
613        // Body
614        self.write(&mut *writer, id)?;
615        self.write(&mut *writer, wire_expr)?;
616        if ext_info != &queryable::ext::QueryableInfoType::DEFAULT {
617            n_exts -= 1;
618            self.write(&mut *writer, (ext_info, n_exts != 0))?;
619        }
620
621        Ok(())
622    }
623}
624
625impl<R> RCodec<queryable::DeclareQueryable, &mut R> for Zenoh080
626where
627    R: Reader,
628{
629    type Error = DidntRead;
630
631    fn read(self, reader: &mut R) -> Result<queryable::DeclareQueryable, Self::Error> {
632        let header: u8 = self.read(&mut *reader)?;
633        let codec = Zenoh080Header::new(header);
634
635        codec.read(reader)
636    }
637}
638
639impl<R> RCodec<queryable::DeclareQueryable, &mut R> for Zenoh080Header
640where
641    R: Reader,
642{
643    type Error = DidntRead;
644
645    fn read(self, reader: &mut R) -> Result<queryable::DeclareQueryable, Self::Error> {
646        if imsg::mid(self.header) != declare::id::D_QUERYABLE {
647            return Err(DidntRead);
648        }
649
650        // Body
651        let id: queryable::QueryableId = self.codec.read(&mut *reader)?;
652        let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, queryable::flag::N));
653        let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
654        wire_expr.mapping = if imsg::has_flag(self.header, queryable::flag::M) {
655            Mapping::Sender
656        } else {
657            Mapping::Receiver
658        };
659
660        // Extensions
661        let mut ext_info = queryable::ext::QueryableInfoType::DEFAULT;
662
663        let mut has_ext = imsg::has_flag(self.header, queryable::flag::Z);
664        while has_ext {
665            let ext: u8 = self.codec.read(&mut *reader)?;
666            let eodec = Zenoh080Header::new(ext);
667            match iext::eid(ext) {
668                queryable::ext::QueryableInfo::ID => {
669                    let (i, ext): (queryable::ext::QueryableInfoType, bool) =
670                        eodec.read(&mut *reader)?;
671                    ext_info = i;
672                    has_ext = ext;
673                }
674                _ => {
675                    has_ext = extension::skip(reader, "DeclareQueryable", ext)?;
676                }
677            }
678        }
679
680        Ok(queryable::DeclareQueryable {
681            id,
682            wire_expr,
683            ext_info,
684        })
685    }
686}
687
688// UndeclareQueryable
689impl<W> WCodec<&queryable::UndeclareQueryable, &mut W> for Zenoh080
690where
691    W: Writer,
692{
693    type Output = Result<(), DidntWrite>;
694
695    fn write(self, writer: &mut W, x: &queryable::UndeclareQueryable) -> Self::Output {
696        let queryable::UndeclareQueryable { id, ext_wire_expr } = x;
697
698        // Header
699        let header = declare::id::U_QUERYABLE | queryable::flag::Z;
700        self.write(&mut *writer, header)?;
701
702        // Body
703        self.write(&mut *writer, id)?;
704
705        // Extension
706        self.write(&mut *writer, (ext_wire_expr, false))?;
707
708        Ok(())
709    }
710}
711
712impl<R> RCodec<queryable::UndeclareQueryable, &mut R> for Zenoh080
713where
714    R: Reader,
715{
716    type Error = DidntRead;
717
718    fn read(self, reader: &mut R) -> Result<queryable::UndeclareQueryable, Self::Error> {
719        let header: u8 = self.read(&mut *reader)?;
720        let codec = Zenoh080Header::new(header);
721
722        codec.read(reader)
723    }
724}
725
726impl<R> RCodec<queryable::UndeclareQueryable, &mut R> for Zenoh080Header
727where
728    R: Reader,
729{
730    type Error = DidntRead;
731
732    fn read(self, reader: &mut R) -> Result<queryable::UndeclareQueryable, Self::Error> {
733        if imsg::mid(self.header) != declare::id::U_QUERYABLE {
734            return Err(DidntRead);
735        }
736
737        // Body
738        let id: queryable::QueryableId = self.codec.read(&mut *reader)?;
739
740        // Extensions
741        let mut ext_wire_expr = common::ext::WireExprType::null();
742
743        let mut has_ext = imsg::has_flag(self.header, queryable::flag::Z);
744        while has_ext {
745            let ext: u8 = self.codec.read(&mut *reader)?;
746            let eodec = Zenoh080Header::new(ext);
747            match iext::eid(ext) {
748                common::ext::WireExprExt::ID => {
749                    let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
750                    ext_wire_expr = we;
751                    has_ext = ext;
752                }
753                _ => {
754                    has_ext = extension::skip(reader, "UndeclareQueryable", ext)?;
755                }
756            }
757        }
758
759        Ok(queryable::UndeclareQueryable { id, ext_wire_expr })
760    }
761}
762
763// DeclareToken
764impl<W> WCodec<&token::DeclareToken, &mut W> for Zenoh080
765where
766    W: Writer,
767{
768    type Output = Result<(), DidntWrite>;
769
770    fn write(self, writer: &mut W, x: &token::DeclareToken) -> Self::Output {
771        let token::DeclareToken { id, wire_expr } = x;
772
773        // Header
774        let mut header = declare::id::D_TOKEN;
775        if wire_expr.mapping != Mapping::DEFAULT {
776            header |= subscriber::flag::M;
777        }
778        if wire_expr.has_suffix() {
779            header |= subscriber::flag::N;
780        }
781        self.write(&mut *writer, header)?;
782
783        // Body
784        self.write(&mut *writer, id)?;
785        self.write(&mut *writer, wire_expr)?;
786
787        Ok(())
788    }
789}
790
791impl<R> RCodec<token::DeclareToken, &mut R> for Zenoh080
792where
793    R: Reader,
794{
795    type Error = DidntRead;
796
797    fn read(self, reader: &mut R) -> Result<token::DeclareToken, Self::Error> {
798        let header: u8 = self.read(&mut *reader)?;
799        let codec = Zenoh080Header::new(header);
800        codec.read(reader)
801    }
802}
803
804impl<R> RCodec<token::DeclareToken, &mut R> for Zenoh080Header
805where
806    R: Reader,
807{
808    type Error = DidntRead;
809
810    fn read(self, reader: &mut R) -> Result<token::DeclareToken, Self::Error> {
811        if imsg::mid(self.header) != declare::id::D_TOKEN {
812            return Err(DidntRead);
813        }
814
815        // Body
816        let id: token::TokenId = self.codec.read(&mut *reader)?;
817        let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, token::flag::N));
818        let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
819        wire_expr.mapping = if imsg::has_flag(self.header, token::flag::M) {
820            Mapping::Sender
821        } else {
822            Mapping::Receiver
823        };
824
825        // Extensions
826        let has_ext = imsg::has_flag(self.header, token::flag::Z);
827        if has_ext {
828            extension::skip_all(reader, "DeclareToken")?;
829        }
830
831        Ok(token::DeclareToken { id, wire_expr })
832    }
833}
834
835// UndeclareToken
836impl<W> WCodec<&token::UndeclareToken, &mut W> for Zenoh080
837where
838    W: Writer,
839{
840    type Output = Result<(), DidntWrite>;
841
842    fn write(self, writer: &mut W, x: &token::UndeclareToken) -> Self::Output {
843        let token::UndeclareToken { id, ext_wire_expr } = x;
844
845        // Header
846        let header = declare::id::U_TOKEN | token::flag::Z;
847        self.write(&mut *writer, header)?;
848
849        // Body
850        self.write(&mut *writer, id)?;
851
852        // Extension
853        self.write(&mut *writer, (ext_wire_expr, false))?;
854
855        Ok(())
856    }
857}
858
859impl<R> RCodec<token::UndeclareToken, &mut R> for Zenoh080
860where
861    R: Reader,
862{
863    type Error = DidntRead;
864
865    fn read(self, reader: &mut R) -> Result<token::UndeclareToken, Self::Error> {
866        let header: u8 = self.read(&mut *reader)?;
867        let codec = Zenoh080Header::new(header);
868
869        codec.read(reader)
870    }
871}
872
873impl<R> RCodec<token::UndeclareToken, &mut R> for Zenoh080Header
874where
875    R: Reader,
876{
877    type Error = DidntRead;
878
879    fn read(self, reader: &mut R) -> Result<token::UndeclareToken, Self::Error> {
880        if imsg::mid(self.header) != declare::id::U_TOKEN {
881            return Err(DidntRead);
882        }
883
884        // Body
885        let id: token::TokenId = self.codec.read(&mut *reader)?;
886
887        // Extensions
888        let mut ext_wire_expr = common::ext::WireExprType::null();
889
890        let mut has_ext = imsg::has_flag(self.header, token::flag::Z);
891        while has_ext {
892            let ext: u8 = self.codec.read(&mut *reader)?;
893            let eodec = Zenoh080Header::new(ext);
894            match iext::eid(ext) {
895                common::ext::WireExprExt::ID => {
896                    let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?;
897                    ext_wire_expr = we;
898                    has_ext = ext;
899                }
900                _ => {
901                    has_ext = extension::skip(reader, "UndeclareToken", ext)?;
902                }
903            }
904        }
905
906        Ok(token::UndeclareToken { id, ext_wire_expr })
907    }
908}
909
910// WARNING: this is a temporary extension used for undeclarations
911impl<W> WCodec<(&common::ext::WireExprType, bool), &mut W> for Zenoh080
912where
913    W: Writer,
914{
915    type Output = Result<(), DidntWrite>;
916
917    fn write(self, writer: &mut W, x: (&common::ext::WireExprType, bool)) -> Self::Output {
918        let (x, more) = x;
919        let common::ext::WireExprType { wire_expr } = x;
920
921        let codec = Zenoh080::new();
922        let mut value = ZBuf::empty();
923        let mut zriter = value.writer();
924
925        let mut flags: u8 = 0;
926        if x.wire_expr.has_suffix() {
927            flags |= 1;
928        }
929        if let Mapping::Sender = wire_expr.mapping {
930            flags |= 1 << 1;
931        }
932        codec.write(&mut zriter, flags)?;
933
934        codec.write(&mut zriter, wire_expr.scope)?;
935        if wire_expr.has_suffix() {
936            zriter.write_exact(wire_expr.suffix.as_bytes())?;
937        }
938
939        let ext = common::ext::WireExprExt { value };
940        codec.write(&mut *writer, (&ext, more))?;
941
942        Ok(())
943    }
944}
945
946impl<R> RCodec<(common::ext::WireExprType, bool), &mut R> for Zenoh080Header
947where
948    R: Reader,
949{
950    type Error = DidntRead;
951
952    fn read(self, reader: &mut R) -> Result<(common::ext::WireExprType, bool), Self::Error> {
953        use zenoh_buffers::reader::HasReader;
954
955        let (ext, more): (common::ext::WireExprExt, bool) = self.read(&mut *reader)?;
956
957        let mut zeader = ext.value.reader();
958        let flags: u8 = self.codec.read(&mut zeader)?;
959
960        let scope: ExprLen = self.codec.read(&mut zeader)?;
961        let suffix = if imsg::has_flag(flags, 1) {
962            let mut buff = zenoh_buffers::vec::uninit(zeader.remaining());
963            zeader.read_exact(&mut buff)?;
964            String::from_utf8(buff).map_err(|_| DidntRead)?
965        } else {
966            String::new()
967        };
968        let mapping = if imsg::has_flag(flags, 1 << 1) {
969            Mapping::Sender
970        } else {
971            Mapping::Receiver
972        };
973
974        Ok((
975            common::ext::WireExprType {
976                wire_expr: WireExpr {
977                    scope,
978                    suffix: suffix.into(),
979                    mapping,
980                },
981            },
982            more,
983        ))
984    }
985}