1mod declare;
15mod interest;
16mod oam;
17mod push;
18mod request;
19mod response;
20
21use zenoh_buffers::{
22 reader::{BacktrackableReader, DidntRead, Reader},
23 writer::{DidntWrite, Writer},
24};
25use zenoh_protocol::{
26 common::{imsg, ZExtZ64, ZExtZBufHeader},
27 core::{EntityId, Reliability, ZenohIdProto},
28 network::{
29 ext::{self, EntityGlobalIdType},
30 id, NetworkBody, NetworkBodyRef, NetworkMessage, NetworkMessageExt, NetworkMessageRef,
31 },
32};
33
34use crate::{
35 LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length, Zenoh080Reliability,
36};
37
38impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for Zenoh080
40where
41 W: Writer,
42{
43 type Output = Result<(), DidntWrite>;
44
45 #[inline(always)]
46 fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
47 let NetworkMessageRef { body, .. } = x;
48 if let NetworkBodyRef::Push(b) = body {
49 return self.write(&mut *writer, b);
50 }
51 #[cold]
52 fn write_not_push<W: Writer>(
53 codec: Zenoh080,
54 writer: &mut W,
55 body: NetworkBodyRef,
56 ) -> Result<(), DidntWrite> {
57 match body {
58 NetworkBodyRef::Push(_) => unreachable!(),
59 NetworkBodyRef::Request(b) => codec.write(&mut *writer, b),
60 NetworkBodyRef::Response(b) => codec.write(&mut *writer, b),
61 NetworkBodyRef::ResponseFinal(b) => codec.write(&mut *writer, b),
62 NetworkBodyRef::Interest(b) => codec.write(&mut *writer, b),
63 NetworkBodyRef::Declare(b) => codec.write(&mut *writer, b),
64 NetworkBodyRef::OAM(b) => codec.write(&mut *writer, b),
65 }
66 }
67 write_not_push(self, writer, body)
68 }
69}
70
71impl<W> WCodec<&NetworkMessage, &mut W> for Zenoh080
72where
73 W: Writer,
74{
75 type Output = Result<(), DidntWrite>;
76
77 fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
78 self.write(writer, x.as_ref())
79 }
80}
81
82impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080
83where
84 R: Reader,
85{
86 type Error = DidntRead;
87
88 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
89 let codec = Zenoh080Reliability::new(Reliability::DEFAULT);
90 codec.read(reader)
91 }
92}
93
94impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Reliability
95where
96 R: Reader,
97{
98 type Error = DidntRead;
99
100 #[inline(always)]
101 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
102 let header: u8 = self.codec.read(&mut *reader)?;
103
104 let codec = Zenoh080Header::new(header);
105 let mut msg: NetworkMessage = codec.read(&mut *reader)?;
106 msg.reliability = self.reliability;
107 Ok(msg)
108 }
109}
110
111impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Header
112where
113 R: Reader,
114{
115 type Error = DidntRead;
116
117 #[inline(always)]
118 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
119 if imsg::mid(self.header) == id::PUSH {
120 return Ok(NetworkBody::Push(self.read(&mut *reader)?).into());
121 }
122 #[cold]
123 fn read_not_push<R: Reader>(
124 header: Zenoh080Header,
125 reader: &mut R,
126 ) -> Result<NetworkMessage, DidntRead> {
127 let body = match imsg::mid(header.header) {
128 id::REQUEST => NetworkBody::Request(header.read(&mut *reader)?),
129 id::RESPONSE => NetworkBody::Response(header.read(&mut *reader)?),
130 id::RESPONSE_FINAL => NetworkBody::ResponseFinal(header.read(&mut *reader)?),
131 id::INTEREST => NetworkBody::Interest(header.read(&mut *reader)?),
132 id::DECLARE => NetworkBody::Declare(header.read(&mut *reader)?),
133 id::OAM => NetworkBody::OAM(header.read(&mut *reader)?),
134 _ => return Err(DidntRead),
135 };
136
137 Ok(body.into())
138 }
139 read_not_push(self, reader)
140 }
141}
142
143pub struct NetworkMessageIter<R> {
144 codec: Zenoh080Reliability,
145 reader: R,
146}
147
148impl<R> NetworkMessageIter<R> {
149 pub fn new(reliability: Reliability, reader: R) -> Self {
150 let codec = Zenoh080Reliability::new(reliability);
151 Self { codec, reader }
152 }
153}
154
155impl<R: BacktrackableReader> Iterator for NetworkMessageIter<R> {
156 type Item = NetworkMessage;
157
158 fn next(&mut self) -> Option<Self::Item> {
159 let mark = self.reader.mark();
160 let msg = self.codec.read(&mut self.reader).ok();
161 if msg.is_none() {
162 self.reader.rewind(mark);
163 }
164 msg
165 }
166}
167
168impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
170where
171 W: Writer,
172{
173 type Output = Result<(), DidntWrite>;
174
175 fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
176 let (x, more) = x;
177 let ext: ZExtZ64<{ ID }> = x.into();
178 self.write(&mut *writer, (&ext, more))
179 }
180}
181
182impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
183where
184 R: Reader,
185{
186 type Error = DidntRead;
187
188 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
189 let header: u8 = self.read(&mut *reader)?;
190 let codec = Zenoh080Header::new(header);
191 codec.read(reader)
192 }
193}
194
195impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
196where
197 R: Reader,
198{
199 type Error = DidntRead;
200
201 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
202 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
203 Ok((ext.into(), more))
204 }
205}
206
207impl<W, const ID: u8> WCodec<(&ext::TimestampType<{ ID }>, bool), &mut W> for Zenoh080
209where
210 W: Writer,
211{
212 type Output = Result<(), DidntWrite>;
213
214 fn write(self, writer: &mut W, x: (&ext::TimestampType<{ ID }>, bool)) -> Self::Output {
215 let (tstamp, more) = x;
216 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(&tstamp.timestamp));
217 self.write(&mut *writer, (&header, more))?;
218 self.write(&mut *writer, &tstamp.timestamp)
219 }
220}
221
222impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080
223where
224 R: Reader,
225{
226 type Error = DidntRead;
227
228 fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
229 let header: u8 = self.read(&mut *reader)?;
230 let codec = Zenoh080Header::new(header);
231 codec.read(reader)
232 }
233}
234
235impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080Header
236where
237 R: Reader,
238{
239 type Error = DidntRead;
240
241 fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
242 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
243 let timestamp: uhlc::Timestamp = self.codec.read(&mut *reader)?;
244 Ok((ext::TimestampType { timestamp }, more))
245 }
246}
247
248impl<W, const ID: u8> WCodec<(ext::NodeIdType<{ ID }>, bool), &mut W> for Zenoh080
250where
251 W: Writer,
252{
253 type Output = Result<(), DidntWrite>;
254
255 fn write(self, writer: &mut W, x: (ext::NodeIdType<{ ID }>, bool)) -> Self::Output {
256 let (x, more) = x;
257 let ext: ZExtZ64<{ ID }> = x.into();
258 self.write(&mut *writer, (&ext, more))
259 }
260}
261
262impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080
263where
264 R: Reader,
265{
266 type Error = DidntRead;
267
268 fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
269 let header: u8 = self.read(&mut *reader)?;
270 let codec = Zenoh080Header::new(header);
271 codec.read(reader)
272 }
273}
274
275impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080Header
276where
277 R: Reader,
278{
279 type Error = DidntRead;
280
281 fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
282 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
283 Ok((ext.into(), more))
284 }
285}
286
287impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
289 fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
290 let EntityGlobalIdType { zid, eid } = x;
291
292 1 + self.w_len(zid) + self.w_len(*eid)
293 }
294}
295
296impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
297where
298 W: Writer,
299{
300 type Output = Result<(), DidntWrite>;
301
302 fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
303 let (x, more) = x;
304 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
305 self.write(&mut *writer, (&header, more))?;
306
307 let flags: u8 = (x.zid.size() as u8 - 1) << 4;
308 self.write(&mut *writer, flags)?;
309
310 let lodec = Zenoh080Length::new(x.zid.size());
311 lodec.write(&mut *writer, &x.zid)?;
312
313 self.write(&mut *writer, x.eid)?;
314 Ok(())
315 }
316}
317
318impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
319where
320 R: Reader,
321{
322 type Error = DidntRead;
323
324 fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
325 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
326
327 let flags: u8 = self.codec.read(&mut *reader)?;
328 let length = 1 + ((flags >> 4) as usize);
329
330 let lodec = Zenoh080Length::new(length);
331 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
332
333 let eid: EntityId = self.codec.read(&mut *reader)?;
334
335 Ok((ext::EntityGlobalIdType { zid, eid }, more))
336 }
337}