zenoh_codec/zenoh/
query.rs1use alloc::{string::String, vec::Vec};
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19};
20use zenoh_protocol::{
21 common::{iext, imsg},
22 zenoh::{
23 id,
24 query::{ext, flag, ConsolidationMode, Query},
25 },
26};
27
28use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
29
30impl<W> WCodec<ConsolidationMode, &mut W> for Zenoh080
32where
33 W: Writer,
34{
35 type Output = Result<(), DidntWrite>;
36
37 fn write(self, writer: &mut W, x: ConsolidationMode) -> Self::Output {
38 let v: u64 = match x {
39 ConsolidationMode::Auto => 0,
40 ConsolidationMode::None => 1,
41 ConsolidationMode::Monotonic => 2,
42 ConsolidationMode::Latest => 3,
43 };
44 self.write(&mut *writer, v)
45 }
46}
47
48impl<R> RCodec<ConsolidationMode, &mut R> for Zenoh080
49where
50 R: Reader,
51{
52 type Error = DidntRead;
53
54 fn read(self, reader: &mut R) -> Result<ConsolidationMode, Self::Error> {
55 let v: u64 = self.read(&mut *reader)?;
56 let c = match v {
57 0 => ConsolidationMode::Auto,
58 1 => ConsolidationMode::None,
59 2 => ConsolidationMode::Monotonic,
60 3 => ConsolidationMode::Latest,
61 _ => ConsolidationMode::Auto, };
63 Ok(c)
64 }
65}
66
67impl<W> WCodec<&Query, &mut W> for Zenoh080
68where
69 W: Writer,
70{
71 type Output = Result<(), DidntWrite>;
72
73 fn write(self, writer: &mut W, x: &Query) -> Self::Output {
74 let Query {
75 consolidation,
76 parameters,
77 ext_sinfo,
78 ext_body,
79 ext_attachment,
80 ext_unknown,
81 } = x;
82
83 let mut header = id::QUERY;
85 if consolidation != &ConsolidationMode::DEFAULT {
86 header |= flag::C;
87 }
88 if !parameters.is_empty() {
89 header |= flag::P;
90 }
91 let mut n_exts = (ext_sinfo.is_some() as u8)
92 + (ext_body.is_some() as u8)
93 + (ext_attachment.is_some() as u8)
94 + (ext_unknown.len() as u8);
95 if n_exts != 0 {
96 header |= flag::Z;
97 }
98 self.write(&mut *writer, header)?;
99
100 if consolidation != &ConsolidationMode::DEFAULT {
102 self.write(&mut *writer, *consolidation)?;
103 }
104 if !parameters.is_empty() {
105 self.write(&mut *writer, parameters)?;
106 }
107
108 if let Some(sinfo) = ext_sinfo.as_ref() {
110 n_exts -= 1;
111 self.write(&mut *writer, (sinfo, n_exts != 0))?;
112 }
113 if let Some(body) = ext_body.as_ref() {
114 n_exts -= 1;
115 self.write(&mut *writer, (body, n_exts != 0))?;
116 }
117 if let Some(att) = ext_attachment.as_ref() {
118 n_exts -= 1;
119 self.write(&mut *writer, (att, n_exts != 0))?;
120 }
121 for u in ext_unknown.iter() {
122 n_exts -= 1;
123 self.write(&mut *writer, (u, n_exts != 0))?;
124 }
125
126 Ok(())
127 }
128}
129
130impl<R> RCodec<Query, &mut R> for Zenoh080
131where
132 R: Reader,
133{
134 type Error = DidntRead;
135
136 fn read(self, reader: &mut R) -> Result<Query, Self::Error> {
137 let header: u8 = self.read(&mut *reader)?;
138 let codec = Zenoh080Header::new(header);
139 codec.read(reader)
140 }
141}
142
143impl<R> RCodec<Query, &mut R> for Zenoh080Header
144where
145 R: Reader,
146{
147 type Error = DidntRead;
148
149 fn read(self, reader: &mut R) -> Result<Query, Self::Error> {
150 if imsg::mid(self.header) != id::QUERY {
151 return Err(DidntRead);
152 }
153
154 let mut consolidation = ConsolidationMode::DEFAULT;
156 if imsg::has_flag(self.header, flag::C) {
157 consolidation = self.codec.read(&mut *reader)?;
158 }
159
160 let mut parameters = String::new();
161 if imsg::has_flag(self.header, flag::P) {
162 parameters = self.codec.read(&mut *reader)?;
163 }
164
165 let mut ext_sinfo: Option<ext::SourceInfoType> = None;
167 let mut ext_body: Option<ext::QueryBodyType> = None;
168 let mut ext_attachment: Option<ext::AttachmentType> = None;
169 let mut ext_unknown = Vec::new();
170
171 let mut has_ext = imsg::has_flag(self.header, flag::Z);
172 while has_ext {
173 let ext: u8 = self.codec.read(&mut *reader)?;
174 let eodec = Zenoh080Header::new(ext);
175 match iext::eid(ext) {
176 ext::SourceInfo::ID => {
177 let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
178 ext_sinfo = Some(s);
179 has_ext = ext;
180 }
181 ext::QueryBodyType::SID | ext::QueryBodyType::VID => {
182 let (s, ext): (ext::QueryBodyType, bool) = eodec.read(&mut *reader)?;
183 ext_body = Some(s);
184 has_ext = ext;
185 }
186 ext::Attachment::ID => {
187 let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
188 ext_attachment = Some(a);
189 has_ext = ext;
190 }
191 _ => {
192 let (u, ext) = extension::read(reader, "Query", ext)?;
193 ext_unknown.push(u);
194 has_ext = ext;
195 }
196 }
197 }
198
199 Ok(Query {
200 consolidation,
201 parameters,
202 ext_sinfo,
203 ext_body,
204 ext_attachment,
205 ext_unknown,
206 })
207 }
208}