1mod declare;
15mod interest;
16mod oam;
17mod push;
18mod request;
19mod response;
20
21use zenoh_buffers::{
22 reader::{DidntRead, Reader},
23 writer::{DidntWrite, Writer},
24};
25use zenoh_protocol::{
26 common::{imsg, ZExtZ64, ZExtZBufHeader},
27 core::{EntityId, Reliability, ZenohIdProto},
28 network::{ext::EntityGlobalIdType, *},
29};
30
31use crate::{
32 LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length, Zenoh080Reliability,
33};
34
35impl<W> WCodec<&NetworkMessage, &mut W> for Zenoh080
37where
38 W: Writer,
39{
40 type Output = Result<(), DidntWrite>;
41
42 fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
43 let NetworkMessage { body, .. } = x;
44
45 match body {
46 NetworkBody::Push(b) => self.write(&mut *writer, b),
47 NetworkBody::Request(b) => self.write(&mut *writer, b),
48 NetworkBody::Response(b) => self.write(&mut *writer, b),
49 NetworkBody::ResponseFinal(b) => self.write(&mut *writer, b),
50 NetworkBody::Interest(b) => self.write(&mut *writer, b),
51 NetworkBody::Declare(b) => self.write(&mut *writer, b),
52 NetworkBody::OAM(b) => self.write(&mut *writer, b),
53 }
54 }
55}
56
57impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080
58where
59 R: Reader,
60{
61 type Error = DidntRead;
62
63 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
64 let codec = Zenoh080Reliability::new(Reliability::DEFAULT);
65 codec.read(reader)
66 }
67}
68
69impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Reliability
70where
71 R: Reader,
72{
73 type Error = DidntRead;
74
75 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
76 let header: u8 = self.codec.read(&mut *reader)?;
77
78 let codec = Zenoh080Header::new(header);
79 let mut msg: NetworkMessage = codec.read(&mut *reader)?;
80 msg.reliability = self.reliability;
81 Ok(msg)
82 }
83}
84
85impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Header
86where
87 R: Reader,
88{
89 type Error = DidntRead;
90
91 fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
92 let body = match imsg::mid(self.header) {
93 id::PUSH => NetworkBody::Push(self.read(&mut *reader)?),
94 id::REQUEST => NetworkBody::Request(self.read(&mut *reader)?),
95 id::RESPONSE => NetworkBody::Response(self.read(&mut *reader)?),
96 id::RESPONSE_FINAL => NetworkBody::ResponseFinal(self.read(&mut *reader)?),
97 id::INTEREST => NetworkBody::Interest(self.read(&mut *reader)?),
98 id::DECLARE => NetworkBody::Declare(self.read(&mut *reader)?),
99 id::OAM => NetworkBody::OAM(self.read(&mut *reader)?),
100 _ => return Err(DidntRead),
101 };
102
103 Ok(body.into())
104 }
105}
106
107impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
109where
110 W: Writer,
111{
112 type Output = Result<(), DidntWrite>;
113
114 fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
115 let (x, more) = x;
116 let ext: ZExtZ64<{ ID }> = x.into();
117 self.write(&mut *writer, (&ext, more))
118 }
119}
120
121impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
122where
123 R: Reader,
124{
125 type Error = DidntRead;
126
127 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
128 let header: u8 = self.read(&mut *reader)?;
129 let codec = Zenoh080Header::new(header);
130 codec.read(reader)
131 }
132}
133
134impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
135where
136 R: Reader,
137{
138 type Error = DidntRead;
139
140 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
141 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
142 Ok((ext.into(), more))
143 }
144}
145
146impl<W, const ID: u8> WCodec<(&ext::TimestampType<{ ID }>, bool), &mut W> for Zenoh080
148where
149 W: Writer,
150{
151 type Output = Result<(), DidntWrite>;
152
153 fn write(self, writer: &mut W, x: (&ext::TimestampType<{ ID }>, bool)) -> Self::Output {
154 let (tstamp, more) = x;
155 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(&tstamp.timestamp));
156 self.write(&mut *writer, (&header, more))?;
157 self.write(&mut *writer, &tstamp.timestamp)
158 }
159}
160
161impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080
162where
163 R: Reader,
164{
165 type Error = DidntRead;
166
167 fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
168 let header: u8 = self.read(&mut *reader)?;
169 let codec = Zenoh080Header::new(header);
170 codec.read(reader)
171 }
172}
173
174impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080Header
175where
176 R: Reader,
177{
178 type Error = DidntRead;
179
180 fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
181 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
182 let timestamp: uhlc::Timestamp = self.codec.read(&mut *reader)?;
183 Ok((ext::TimestampType { timestamp }, more))
184 }
185}
186
187impl<W, const ID: u8> WCodec<(ext::NodeIdType<{ ID }>, bool), &mut W> for Zenoh080
189where
190 W: Writer,
191{
192 type Output = Result<(), DidntWrite>;
193
194 fn write(self, writer: &mut W, x: (ext::NodeIdType<{ ID }>, bool)) -> Self::Output {
195 let (x, more) = x;
196 let ext: ZExtZ64<{ ID }> = x.into();
197 self.write(&mut *writer, (&ext, more))
198 }
199}
200
201impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080
202where
203 R: Reader,
204{
205 type Error = DidntRead;
206
207 fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
208 let header: u8 = self.read(&mut *reader)?;
209 let codec = Zenoh080Header::new(header);
210 codec.read(reader)
211 }
212}
213
214impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080Header
215where
216 R: Reader,
217{
218 type Error = DidntRead;
219
220 fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
221 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
222 Ok((ext.into(), more))
223 }
224}
225
226impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
228 fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
229 let EntityGlobalIdType { zid, eid } = x;
230
231 1 + self.w_len(zid) + self.w_len(*eid)
232 }
233}
234
235impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
236where
237 W: Writer,
238{
239 type Output = Result<(), DidntWrite>;
240
241 fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
242 let (x, more) = x;
243 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
244 self.write(&mut *writer, (&header, more))?;
245
246 let flags: u8 = (x.zid.size() as u8 - 1) << 4;
247 self.write(&mut *writer, flags)?;
248
249 let lodec = Zenoh080Length::new(x.zid.size());
250 lodec.write(&mut *writer, &x.zid)?;
251
252 self.write(&mut *writer, x.eid)?;
253 Ok(())
254 }
255}
256
257impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
258where
259 R: Reader,
260{
261 type Error = DidntRead;
262
263 fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
264 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
265
266 let flags: u8 = self.codec.read(&mut *reader)?;
267 let length = 1 + ((flags >> 4) as usize);
268
269 let lodec = Zenoh080Length::new(length);
270 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
271
272 let eid: EntityId = self.codec.read(&mut *reader)?;
273
274 Ok((ext::EntityGlobalIdType { zid, eid }, more))
275 }
276}