zenoh_codec/zenoh/
reply.rs1use alloc::vec::Vec;
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19};
20use zenoh_protocol::{
21 common::imsg,
22 zenoh::{
23 id,
24 query::ConsolidationMode,
25 reply::{flag, Reply, ReplyBody},
26 },
27};
28
29use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
30
31impl<W> WCodec<&Reply, &mut W> for Zenoh080
32where
33 W: Writer,
34{
35 type Output = Result<(), DidntWrite>;
36
37 fn write(self, writer: &mut W, x: &Reply) -> Self::Output {
38 let Reply {
39 consolidation,
40 ext_unknown,
41 payload,
42 } = x;
43
44 let mut header = id::REPLY;
46 if consolidation != &ConsolidationMode::DEFAULT {
47 header |= flag::C;
48 }
49 let mut n_exts = ext_unknown.len() as u8;
50 if n_exts != 0 {
51 header |= flag::Z;
52 }
53 self.write(&mut *writer, header)?;
54
55 if consolidation != &ConsolidationMode::DEFAULT {
57 self.write(&mut *writer, *consolidation)?;
58 }
59
60 for u in ext_unknown.iter() {
62 n_exts -= 1;
63 self.write(&mut *writer, (u, n_exts != 0))?;
64 }
65
66 self.write(&mut *writer, payload)?;
68
69 Ok(())
70 }
71}
72
73impl<R> RCodec<Reply, &mut R> for Zenoh080
74where
75 R: Reader,
76{
77 type Error = DidntRead;
78
79 fn read(self, reader: &mut R) -> Result<Reply, Self::Error> {
80 let header: u8 = self.read(&mut *reader)?;
81 let codec = Zenoh080Header::new(header);
82 codec.read(reader)
83 }
84}
85
86impl<R> RCodec<Reply, &mut R> for Zenoh080Header
87where
88 R: Reader,
89{
90 type Error = DidntRead;
91
92 fn read(self, reader: &mut R) -> Result<Reply, Self::Error> {
93 if imsg::mid(self.header) != id::REPLY {
94 return Err(DidntRead);
95 }
96
97 let mut consolidation = ConsolidationMode::DEFAULT;
99 if imsg::has_flag(self.header, flag::C) {
100 consolidation = self.codec.read(&mut *reader)?;
101 }
102
103 let mut ext_unknown = Vec::new();
105
106 let mut has_ext = imsg::has_flag(self.header, flag::Z);
107 while has_ext {
108 let ext: u8 = self.codec.read(&mut *reader)?;
109 let (u, ext) = extension::read(reader, "Reply", ext)?;
110 ext_unknown.push(u);
111 has_ext = ext;
112 }
113
114 let payload: ReplyBody = self.codec.read(&mut *reader)?;
116
117 Ok(Reply {
118 consolidation,
119 ext_unknown,
120 payload,
121 })
122 }
123}