1mod declare;
15mod interest;
16mod oam;
17mod push;
18mod request;
19mod response;
20
21use zenoh_buffers::{
22 reader::{DidntRead, Reader},
23 writer::{DidntWrite, Writer},
24};
25use zenoh_protocol::{
26 common::{imsg, ZExtZ64, ZExtZBufHeader},
27 core::{EntityId, Reliability, ZenohIdProto},
28 network::{
29 ext::{self, EntityGlobalIdType},
30 id, NetworkBody, NetworkBodyRef, NetworkMessage, NetworkMessageExt, NetworkMessageRef,
31 },
32};
33
34use crate::{
35 LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length, Zenoh080Reliability,
36};
37
38impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for Zenoh080
40where
41 W: Writer,
42{
43 type Output = Result<(), DidntWrite>;
44
45 fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
46 let NetworkMessageRef { body, .. } = x;
47
48 match body {
49 NetworkBodyRef::Push(b) => self.write(&mut *writer, b),
50 NetworkBodyRef::Request(b) => self.write(&mut *writer, b),
51 NetworkBodyRef::Response(b) => self.write(&mut *writer, b),
52 NetworkBodyRef::ResponseFinal(b) => self.write(&mut *writer, b),
53 NetworkBodyRef::Interest(b) => self.write(&mut *writer, b),
54 NetworkBodyRef::Declare(b) => self.write(&mut *writer, b),
55 NetworkBodyRef::OAM(b) => self.write(&mut *writer, b),
56 }
57 }
58}
59
60impl<W> WCodec<&NetworkMessage, &mut W> for Zenoh080
61where
62 W: Writer,
63{
64 type Output = Result<(), DidntWrite>;
65
66 fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
67 self.write(writer, x.as_ref())
68 }
69}
70
71impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080
72where
73 R: Reader,
74{
75 type Error = DidntRead;
76
77 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
78 let codec = Zenoh080Reliability::new(Reliability::DEFAULT);
79 codec.read(reader)
80 }
81}
82
83impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Reliability
84where
85 R: Reader,
86{
87 type Error = DidntRead;
88
89 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
90 let header: u8 = self.codec.read(&mut *reader)?;
91
92 let codec = Zenoh080Header::new(header);
93 let mut msg: NetworkMessage = codec.read(&mut *reader)?;
94 msg.reliability = self.reliability;
95 Ok(msg)
96 }
97}
98
99impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Header
100where
101 R: Reader,
102{
103 type Error = DidntRead;
104
105 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
106 let body = match imsg::mid(self.header) {
107 id::PUSH => NetworkBody::Push(self.read(&mut *reader)?),
108 id::REQUEST => NetworkBody::Request(self.read(&mut *reader)?),
109 id::RESPONSE => NetworkBody::Response(self.read(&mut *reader)?),
110 id::RESPONSE_FINAL => NetworkBody::ResponseFinal(self.read(&mut *reader)?),
111 id::INTEREST => NetworkBody::Interest(self.read(&mut *reader)?),
112 id::DECLARE => NetworkBody::Declare(self.read(&mut *reader)?),
113 id::OAM => NetworkBody::OAM(self.read(&mut *reader)?),
114 _ => return Err(DidntRead),
115 };
116
117 Ok(body.into())
118 }
119}
120
121pub struct NetworkMessageIter<R> {
122 codec: Zenoh080Reliability,
123 reader: R,
124}
125
126impl<R> NetworkMessageIter<R> {
127 pub fn new(reliability: Reliability, reader: R) -> Self {
128 let codec = Zenoh080Reliability::new(reliability);
129 Self { codec, reader }
130 }
131}
132
133impl<R: Reader> Iterator for NetworkMessageIter<R> {
134 type Item = NetworkMessage;
135
136 fn next(&mut self) -> Option<Self::Item> {
137 self.codec.read(&mut self.reader).ok()
138 }
139}
140
141impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
143where
144 W: Writer,
145{
146 type Output = Result<(), DidntWrite>;
147
148 fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
149 let (x, more) = x;
150 let ext: ZExtZ64<{ ID }> = x.into();
151 self.write(&mut *writer, (&ext, more))
152 }
153}
154
155impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
156where
157 R: Reader,
158{
159 type Error = DidntRead;
160
161 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
162 let header: u8 = self.read(&mut *reader)?;
163 let codec = Zenoh080Header::new(header);
164 codec.read(reader)
165 }
166}
167
168impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
169where
170 R: Reader,
171{
172 type Error = DidntRead;
173
174 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
175 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
176 Ok((ext.into(), more))
177 }
178}
179
180impl<W, const ID: u8> WCodec<(&ext::TimestampType<{ ID }>, bool), &mut W> for Zenoh080
182where
183 W: Writer,
184{
185 type Output = Result<(), DidntWrite>;
186
187 fn write(self, writer: &mut W, x: (&ext::TimestampType<{ ID }>, bool)) -> Self::Output {
188 let (tstamp, more) = x;
189 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(&tstamp.timestamp));
190 self.write(&mut *writer, (&header, more))?;
191 self.write(&mut *writer, &tstamp.timestamp)
192 }
193}
194
195impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080
196where
197 R: Reader,
198{
199 type Error = DidntRead;
200
201 fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
202 let header: u8 = self.read(&mut *reader)?;
203 let codec = Zenoh080Header::new(header);
204 codec.read(reader)
205 }
206}
207
208impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080Header
209where
210 R: Reader,
211{
212 type Error = DidntRead;
213
214 fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
215 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
216 let timestamp: uhlc::Timestamp = self.codec.read(&mut *reader)?;
217 Ok((ext::TimestampType { timestamp }, more))
218 }
219}
220
221impl<W, const ID: u8> WCodec<(ext::NodeIdType<{ ID }>, bool), &mut W> for Zenoh080
223where
224 W: Writer,
225{
226 type Output = Result<(), DidntWrite>;
227
228 fn write(self, writer: &mut W, x: (ext::NodeIdType<{ ID }>, bool)) -> Self::Output {
229 let (x, more) = x;
230 let ext: ZExtZ64<{ ID }> = x.into();
231 self.write(&mut *writer, (&ext, more))
232 }
233}
234
235impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080
236where
237 R: Reader,
238{
239 type Error = DidntRead;
240
241 fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
242 let header: u8 = self.read(&mut *reader)?;
243 let codec = Zenoh080Header::new(header);
244 codec.read(reader)
245 }
246}
247
248impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080Header
249where
250 R: Reader,
251{
252 type Error = DidntRead;
253
254 fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
255 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
256 Ok((ext.into(), more))
257 }
258}
259
260impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
262 fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
263 let EntityGlobalIdType { zid, eid } = x;
264
265 1 + self.w_len(zid) + self.w_len(*eid)
266 }
267}
268
269impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
270where
271 W: Writer,
272{
273 type Output = Result<(), DidntWrite>;
274
275 fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
276 let (x, more) = x;
277 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
278 self.write(&mut *writer, (&header, more))?;
279
280 let flags: u8 = (x.zid.size() as u8 - 1) << 4;
281 self.write(&mut *writer, flags)?;
282
283 let lodec = Zenoh080Length::new(x.zid.size());
284 lodec.write(&mut *writer, &x.zid)?;
285
286 self.write(&mut *writer, x.eid)?;
287 Ok(())
288 }
289}
290
291impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
292where
293 R: Reader,
294{
295 type Error = DidntRead;
296
297 fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
298 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
299
300 let flags: u8 = self.codec.read(&mut *reader)?;
301 let length = 1 + ((flags >> 4) as usize);
302
303 let lodec = Zenoh080Length::new(length);
304 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
305
306 let eid: EntityId = self.codec.read(&mut *reader)?;
307
308 Ok((ext::EntityGlobalIdType { zid, eid }, more))
309 }
310}