zenoh_codec/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14mod declare;
15mod interest;
16mod oam;
17mod push;
18mod request;
19mod response;
20
21use zenoh_buffers::{
22    reader::{BacktrackableReader, DidntRead, Reader},
23    writer::{DidntWrite, Writer},
24};
25use zenoh_protocol::{
26    common::{imsg, ZExtZ64, ZExtZBufHeader},
27    core::{EntityId, Reliability, ZenohIdProto},
28    network::{
29        ext::{self, EntityGlobalIdType},
30        id, NetworkBody, NetworkBodyRef, NetworkMessage, NetworkMessageExt, NetworkMessageRef,
31    },
32};
33
34use crate::{
35    LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length, Zenoh080Reliability,
36};
37
38// NetworkMessage
39impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for Zenoh080
40where
41    W: Writer,
42{
43    type Output = Result<(), DidntWrite>;
44
45    fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
46        let NetworkMessageRef { body, .. } = x;
47
48        match body {
49            NetworkBodyRef::Push(b) => self.write(&mut *writer, b),
50            NetworkBodyRef::Request(b) => self.write(&mut *writer, b),
51            NetworkBodyRef::Response(b) => self.write(&mut *writer, b),
52            NetworkBodyRef::ResponseFinal(b) => self.write(&mut *writer, b),
53            NetworkBodyRef::Interest(b) => self.write(&mut *writer, b),
54            NetworkBodyRef::Declare(b) => self.write(&mut *writer, b),
55            NetworkBodyRef::OAM(b) => self.write(&mut *writer, b),
56        }
57    }
58}
59
60impl<W> WCodec<&NetworkMessage, &mut W> for Zenoh080
61where
62    W: Writer,
63{
64    type Output = Result<(), DidntWrite>;
65
66    fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
67        self.write(writer, x.as_ref())
68    }
69}
70
71impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080
72where
73    R: Reader,
74{
75    type Error = DidntRead;
76
77    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
78        let codec = Zenoh080Reliability::new(Reliability::DEFAULT);
79        codec.read(reader)
80    }
81}
82
83impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Reliability
84where
85    R: Reader,
86{
87    type Error = DidntRead;
88
89    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
90        let header: u8 = self.codec.read(&mut *reader)?;
91
92        let codec = Zenoh080Header::new(header);
93        let mut msg: NetworkMessage = codec.read(&mut *reader)?;
94        msg.reliability = self.reliability;
95        Ok(msg)
96    }
97}
98
99impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Header
100where
101    R: Reader,
102{
103    type Error = DidntRead;
104
105    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
106        let body = match imsg::mid(self.header) {
107            id::PUSH => NetworkBody::Push(self.read(&mut *reader)?),
108            id::REQUEST => NetworkBody::Request(self.read(&mut *reader)?),
109            id::RESPONSE => NetworkBody::Response(self.read(&mut *reader)?),
110            id::RESPONSE_FINAL => NetworkBody::ResponseFinal(self.read(&mut *reader)?),
111            id::INTEREST => NetworkBody::Interest(self.read(&mut *reader)?),
112            id::DECLARE => NetworkBody::Declare(self.read(&mut *reader)?),
113            id::OAM => NetworkBody::OAM(self.read(&mut *reader)?),
114            _ => return Err(DidntRead),
115        };
116
117        Ok(body.into())
118    }
119}
120
121pub struct NetworkMessageIter<R> {
122    codec: Zenoh080Reliability,
123    reader: R,
124}
125
126impl<R> NetworkMessageIter<R> {
127    pub fn new(reliability: Reliability, reader: R) -> Self {
128        let codec = Zenoh080Reliability::new(reliability);
129        Self { codec, reader }
130    }
131}
132
133impl<R: BacktrackableReader> Iterator for NetworkMessageIter<R> {
134    type Item = NetworkMessage;
135
136    fn next(&mut self) -> Option<Self::Item> {
137        let mark = self.reader.mark();
138        let msg = self.codec.read(&mut self.reader).ok();
139        if msg.is_none() {
140            self.reader.rewind(mark);
141        }
142        msg
143    }
144}
145
146// Extensions: QoS
147impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
148where
149    W: Writer,
150{
151    type Output = Result<(), DidntWrite>;
152
153    fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
154        let (x, more) = x;
155        let ext: ZExtZ64<{ ID }> = x.into();
156        self.write(&mut *writer, (&ext, more))
157    }
158}
159
160impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
161where
162    R: Reader,
163{
164    type Error = DidntRead;
165
166    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
167        let header: u8 = self.read(&mut *reader)?;
168        let codec = Zenoh080Header::new(header);
169        codec.read(reader)
170    }
171}
172
173impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
174where
175    R: Reader,
176{
177    type Error = DidntRead;
178
179    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
180        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
181        Ok((ext.into(), more))
182    }
183}
184
185// Extensions: Timestamp
186impl<W, const ID: u8> WCodec<(&ext::TimestampType<{ ID }>, bool), &mut W> for Zenoh080
187where
188    W: Writer,
189{
190    type Output = Result<(), DidntWrite>;
191
192    fn write(self, writer: &mut W, x: (&ext::TimestampType<{ ID }>, bool)) -> Self::Output {
193        let (tstamp, more) = x;
194        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(&tstamp.timestamp));
195        self.write(&mut *writer, (&header, more))?;
196        self.write(&mut *writer, &tstamp.timestamp)
197    }
198}
199
200impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080
201where
202    R: Reader,
203{
204    type Error = DidntRead;
205
206    fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
207        let header: u8 = self.read(&mut *reader)?;
208        let codec = Zenoh080Header::new(header);
209        codec.read(reader)
210    }
211}
212
213impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080Header
214where
215    R: Reader,
216{
217    type Error = DidntRead;
218
219    fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
220        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
221        let timestamp: uhlc::Timestamp = self.codec.read(&mut *reader)?;
222        Ok((ext::TimestampType { timestamp }, more))
223    }
224}
225
226// Extensions: NodeId
227impl<W, const ID: u8> WCodec<(ext::NodeIdType<{ ID }>, bool), &mut W> for Zenoh080
228where
229    W: Writer,
230{
231    type Output = Result<(), DidntWrite>;
232
233    fn write(self, writer: &mut W, x: (ext::NodeIdType<{ ID }>, bool)) -> Self::Output {
234        let (x, more) = x;
235        let ext: ZExtZ64<{ ID }> = x.into();
236        self.write(&mut *writer, (&ext, more))
237    }
238}
239
240impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080
241where
242    R: Reader,
243{
244    type Error = DidntRead;
245
246    fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
247        let header: u8 = self.read(&mut *reader)?;
248        let codec = Zenoh080Header::new(header);
249        codec.read(reader)
250    }
251}
252
253impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080Header
254where
255    R: Reader,
256{
257    type Error = DidntRead;
258
259    fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
260        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
261        Ok((ext.into(), more))
262    }
263}
264
// Extensions: EntityGlobalId
266impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
267    fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
268        let EntityGlobalIdType { zid, eid } = x;
269
270        1 + self.w_len(zid) + self.w_len(*eid)
271    }
272}
273
274impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
275where
276    W: Writer,
277{
278    type Output = Result<(), DidntWrite>;
279
280    fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
281        let (x, more) = x;
282        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
283        self.write(&mut *writer, (&header, more))?;
284
285        let flags: u8 = (x.zid.size() as u8 - 1) << 4;
286        self.write(&mut *writer, flags)?;
287
288        let lodec = Zenoh080Length::new(x.zid.size());
289        lodec.write(&mut *writer, &x.zid)?;
290
291        self.write(&mut *writer, x.eid)?;
292        Ok(())
293    }
294}
295
296impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
297where
298    R: Reader,
299{
300    type Error = DidntRead;
301
302    fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
303        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
304
305        let flags: u8 = self.codec.read(&mut *reader)?;
306        let length = 1 + ((flags >> 4) as usize);
307
308        let lodec = Zenoh080Length::new(length);
309        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
310
311        let eid: EntityId = self.codec.read(&mut *reader)?;
312
313        Ok((ext::EntityGlobalIdType { zid, eid }, more))
314    }
315}