zenoh_codec/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14mod declare;
15mod interest;
16mod oam;
17mod push;
18mod request;
19mod response;
20
21use zenoh_buffers::{
22    reader::{DidntRead, Reader},
23    writer::{DidntWrite, Writer},
24};
25use zenoh_protocol::{
26    common::{imsg, ZExtZ64, ZExtZBufHeader},
27    core::{EntityId, Reliability, ZenohIdProto},
28    network::{ext::EntityGlobalIdType, *},
29};
30
31use crate::{
32    LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length, Zenoh080Reliability,
33};
34
35// NetworkMessage
36impl<W> WCodec<&NetworkMessage, &mut W> for Zenoh080
37where
38    W: Writer,
39{
40    type Output = Result<(), DidntWrite>;
41
42    fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
43        let NetworkMessage { body, .. } = x;
44
45        match body {
46            NetworkBody::Push(b) => self.write(&mut *writer, b),
47            NetworkBody::Request(b) => self.write(&mut *writer, b),
48            NetworkBody::Response(b) => self.write(&mut *writer, b),
49            NetworkBody::ResponseFinal(b) => self.write(&mut *writer, b),
50            NetworkBody::Interest(b) => self.write(&mut *writer, b),
51            NetworkBody::Declare(b) => self.write(&mut *writer, b),
52            NetworkBody::OAM(b) => self.write(&mut *writer, b),
53        }
54    }
55}
56
57impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080
58where
59    R: Reader,
60{
61    type Error = DidntRead;
62
63    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
64        let codec = Zenoh080Reliability::new(Reliability::DEFAULT);
65        codec.read(reader)
66    }
67}
68
69impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Reliability
70where
71    R: Reader,
72{
73    type Error = DidntRead;
74
75    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
76        let header: u8 = self.codec.read(&mut *reader)?;
77
78        let codec = Zenoh080Header::new(header);
79        let mut msg: NetworkMessage = codec.read(&mut *reader)?;
80        msg.reliability = self.reliability;
81        Ok(msg)
82    }
83}
84
85impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Header
86where
87    R: Reader,
88{
89    type Error = DidntRead;
90
91    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
92        let body = match imsg::mid(self.header) {
93            id::PUSH => NetworkBody::Push(self.read(&mut *reader)?),
94            id::REQUEST => NetworkBody::Request(self.read(&mut *reader)?),
95            id::RESPONSE => NetworkBody::Response(self.read(&mut *reader)?),
96            id::RESPONSE_FINAL => NetworkBody::ResponseFinal(self.read(&mut *reader)?),
97            id::INTEREST => NetworkBody::Interest(self.read(&mut *reader)?),
98            id::DECLARE => NetworkBody::Declare(self.read(&mut *reader)?),
99            id::OAM => NetworkBody::OAM(self.read(&mut *reader)?),
100            _ => return Err(DidntRead),
101        };
102
103        Ok(body.into())
104    }
105}
106
107// Extensions: QoS
108impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
109where
110    W: Writer,
111{
112    type Output = Result<(), DidntWrite>;
113
114    fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
115        let (x, more) = x;
116        let ext: ZExtZ64<{ ID }> = x.into();
117        self.write(&mut *writer, (&ext, more))
118    }
119}
120
121impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
122where
123    R: Reader,
124{
125    type Error = DidntRead;
126
127    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
128        let header: u8 = self.read(&mut *reader)?;
129        let codec = Zenoh080Header::new(header);
130        codec.read(reader)
131    }
132}
133
134impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
135where
136    R: Reader,
137{
138    type Error = DidntRead;
139
140    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
141        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
142        Ok((ext.into(), more))
143    }
144}
145
146// Extensions: Timestamp
147impl<W, const ID: u8> WCodec<(&ext::TimestampType<{ ID }>, bool), &mut W> for Zenoh080
148where
149    W: Writer,
150{
151    type Output = Result<(), DidntWrite>;
152
153    fn write(self, writer: &mut W, x: (&ext::TimestampType<{ ID }>, bool)) -> Self::Output {
154        let (tstamp, more) = x;
155        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(&tstamp.timestamp));
156        self.write(&mut *writer, (&header, more))?;
157        self.write(&mut *writer, &tstamp.timestamp)
158    }
159}
160
161impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080
162where
163    R: Reader,
164{
165    type Error = DidntRead;
166
167    fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
168        let header: u8 = self.read(&mut *reader)?;
169        let codec = Zenoh080Header::new(header);
170        codec.read(reader)
171    }
172}
173
174impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080Header
175where
176    R: Reader,
177{
178    type Error = DidntRead;
179
180    fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
181        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
182        let timestamp: uhlc::Timestamp = self.codec.read(&mut *reader)?;
183        Ok((ext::TimestampType { timestamp }, more))
184    }
185}
186
187// Extensions: NodeId
188impl<W, const ID: u8> WCodec<(ext::NodeIdType<{ ID }>, bool), &mut W> for Zenoh080
189where
190    W: Writer,
191{
192    type Output = Result<(), DidntWrite>;
193
194    fn write(self, writer: &mut W, x: (ext::NodeIdType<{ ID }>, bool)) -> Self::Output {
195        let (x, more) = x;
196        let ext: ZExtZ64<{ ID }> = x.into();
197        self.write(&mut *writer, (&ext, more))
198    }
199}
200
201impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080
202where
203    R: Reader,
204{
205    type Error = DidntRead;
206
207    fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
208        let header: u8 = self.read(&mut *reader)?;
209        let codec = Zenoh080Header::new(header);
210        codec.read(reader)
211    }
212}
213
214impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080Header
215where
216    R: Reader,
217{
218    type Error = DidntRead;
219
220    fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
221        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
222        Ok((ext.into(), more))
223    }
224}
225
226// Extension: EntityId
227impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
228    fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
229        let EntityGlobalIdType { zid, eid } = x;
230
231        1 + self.w_len(zid) + self.w_len(*eid)
232    }
233}
234
235impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
236where
237    W: Writer,
238{
239    type Output = Result<(), DidntWrite>;
240
241    fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
242        let (x, more) = x;
243        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
244        self.write(&mut *writer, (&header, more))?;
245
246        let flags: u8 = (x.zid.size() as u8 - 1) << 4;
247        self.write(&mut *writer, flags)?;
248
249        let lodec = Zenoh080Length::new(x.zid.size());
250        lodec.write(&mut *writer, &x.zid)?;
251
252        self.write(&mut *writer, x.eid)?;
253        Ok(())
254    }
255}
256
257impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
258where
259    R: Reader,
260{
261    type Error = DidntRead;
262
263    fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
264        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
265
266        let flags: u8 = self.codec.read(&mut *reader)?;
267        let length = 1 + ((flags >> 4) as usize);
268
269        let lodec = Zenoh080Length::new(length);
270        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
271
272        let eid: EntityId = self.codec.read(&mut *reader)?;
273
274        Ok((ext::EntityGlobalIdType { zid, eid }, more))
275    }
276}