Skip to main content

zenoh_codec/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14mod declare;
15mod interest;
16mod oam;
17mod push;
18mod request;
19mod response;
20
21use zenoh_buffers::{
22    reader::{BacktrackableReader, DidntRead, Reader},
23    writer::{DidntWrite, Writer},
24};
25use zenoh_protocol::{
26    common::{imsg, ZExtZ64, ZExtZBufHeader},
27    core::{EntityId, Reliability, ZenohIdProto},
28    network::{
29        ext::{self, EntityGlobalIdType},
30        id, NetworkBody, NetworkBodyRef, NetworkMessage, NetworkMessageExt, NetworkMessageRef,
31    },
32};
33
34use crate::{
35    LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length, Zenoh080Reliability,
36};
37
38// NetworkMessage
39impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for Zenoh080
40where
41    W: Writer,
42{
43    type Output = Result<(), DidntWrite>;
44
45    #[inline(always)]
46    fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
47        let NetworkMessageRef { body, .. } = x;
48        if let NetworkBodyRef::Push(b) = body {
49            return self.write(&mut *writer, b);
50        }
51        #[cold]
52        fn write_not_push<W: Writer>(
53            codec: Zenoh080,
54            writer: &mut W,
55            body: NetworkBodyRef,
56        ) -> Result<(), DidntWrite> {
57            match body {
58                NetworkBodyRef::Push(_) => unreachable!(),
59                NetworkBodyRef::Request(b) => codec.write(&mut *writer, b),
60                NetworkBodyRef::Response(b) => codec.write(&mut *writer, b),
61                NetworkBodyRef::ResponseFinal(b) => codec.write(&mut *writer, b),
62                NetworkBodyRef::Interest(b) => codec.write(&mut *writer, b),
63                NetworkBodyRef::Declare(b) => codec.write(&mut *writer, b),
64                NetworkBodyRef::OAM(b) => codec.write(&mut *writer, b),
65            }
66        }
67        write_not_push(self, writer, body)
68    }
69}
70
71impl<W> WCodec<&NetworkMessage, &mut W> for Zenoh080
72where
73    W: Writer,
74{
75    type Output = Result<(), DidntWrite>;
76
77    fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
78        self.write(writer, x.as_ref())
79    }
80}
81
82impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080
83where
84    R: Reader,
85{
86    type Error = DidntRead;
87
88    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
89        let codec = Zenoh080Reliability::new(Reliability::DEFAULT);
90        codec.read(reader)
91    }
92}
93
94impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Reliability
95where
96    R: Reader,
97{
98    type Error = DidntRead;
99
100    #[inline(always)]
101    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
102        let header: u8 = self.codec.read(&mut *reader)?;
103
104        let codec = Zenoh080Header::new(header);
105        let mut msg: NetworkMessage = codec.read(&mut *reader)?;
106        msg.reliability = self.reliability;
107        Ok(msg)
108    }
109}
110
111impl<R> RCodec<NetworkMessage, &mut R> for Zenoh080Header
112where
113    R: Reader,
114{
115    type Error = DidntRead;
116
117    #[inline(always)]
118    fn read(self, reader: &mut R) -> Result<NetworkMessage, Self::Error> {
119        if imsg::mid(self.header) == id::PUSH {
120            return Ok(NetworkBody::Push(self.read(&mut *reader)?).into());
121        }
122        #[cold]
123        fn read_not_push<R: Reader>(
124            header: Zenoh080Header,
125            reader: &mut R,
126        ) -> Result<NetworkMessage, DidntRead> {
127            let body = match imsg::mid(header.header) {
128                id::REQUEST => NetworkBody::Request(header.read(&mut *reader)?),
129                id::RESPONSE => NetworkBody::Response(header.read(&mut *reader)?),
130                id::RESPONSE_FINAL => NetworkBody::ResponseFinal(header.read(&mut *reader)?),
131                id::INTEREST => NetworkBody::Interest(header.read(&mut *reader)?),
132                id::DECLARE => NetworkBody::Declare(header.read(&mut *reader)?),
133                id::OAM => NetworkBody::OAM(header.read(&mut *reader)?),
134                _ => return Err(DidntRead),
135            };
136
137            Ok(body.into())
138        }
139        read_not_push(self, reader)
140    }
141}
142
143pub struct NetworkMessageIter<R> {
144    codec: Zenoh080Reliability,
145    reader: R,
146}
147
148impl<R> NetworkMessageIter<R> {
149    pub fn new(reliability: Reliability, reader: R) -> Self {
150        let codec = Zenoh080Reliability::new(reliability);
151        Self { codec, reader }
152    }
153}
154
155impl<R: BacktrackableReader> Iterator for NetworkMessageIter<R> {
156    type Item = NetworkMessage;
157
158    fn next(&mut self) -> Option<Self::Item> {
159        let mark = self.reader.mark();
160        let msg = self.codec.read(&mut self.reader).ok();
161        if msg.is_none() {
162            self.reader.rewind(mark);
163        }
164        msg
165    }
166}
167
168// Extensions: QoS
169impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
170where
171    W: Writer,
172{
173    type Output = Result<(), DidntWrite>;
174
175    fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
176        let (x, more) = x;
177        let ext: ZExtZ64<{ ID }> = x.into();
178        self.write(&mut *writer, (&ext, more))
179    }
180}
181
182impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
183where
184    R: Reader,
185{
186    type Error = DidntRead;
187
188    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
189        let header: u8 = self.read(&mut *reader)?;
190        let codec = Zenoh080Header::new(header);
191        codec.read(reader)
192    }
193}
194
195impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
196where
197    R: Reader,
198{
199    type Error = DidntRead;
200
201    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
202        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
203        Ok((ext.into(), more))
204    }
205}
206
207// Extensions: Timestamp
208impl<W, const ID: u8> WCodec<(&ext::TimestampType<{ ID }>, bool), &mut W> for Zenoh080
209where
210    W: Writer,
211{
212    type Output = Result<(), DidntWrite>;
213
214    fn write(self, writer: &mut W, x: (&ext::TimestampType<{ ID }>, bool)) -> Self::Output {
215        let (tstamp, more) = x;
216        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(&tstamp.timestamp));
217        self.write(&mut *writer, (&header, more))?;
218        self.write(&mut *writer, &tstamp.timestamp)
219    }
220}
221
222impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080
223where
224    R: Reader,
225{
226    type Error = DidntRead;
227
228    fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
229        let header: u8 = self.read(&mut *reader)?;
230        let codec = Zenoh080Header::new(header);
231        codec.read(reader)
232    }
233}
234
235impl<R, const ID: u8> RCodec<(ext::TimestampType<{ ID }>, bool), &mut R> for Zenoh080Header
236where
237    R: Reader,
238{
239    type Error = DidntRead;
240
241    fn read(self, reader: &mut R) -> Result<(ext::TimestampType<{ ID }>, bool), Self::Error> {
242        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
243        let timestamp: uhlc::Timestamp = self.codec.read(&mut *reader)?;
244        Ok((ext::TimestampType { timestamp }, more))
245    }
246}
247
248// Extensions: NodeId
249impl<W, const ID: u8> WCodec<(ext::NodeIdType<{ ID }>, bool), &mut W> for Zenoh080
250where
251    W: Writer,
252{
253    type Output = Result<(), DidntWrite>;
254
255    fn write(self, writer: &mut W, x: (ext::NodeIdType<{ ID }>, bool)) -> Self::Output {
256        let (x, more) = x;
257        let ext: ZExtZ64<{ ID }> = x.into();
258        self.write(&mut *writer, (&ext, more))
259    }
260}
261
262impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080
263where
264    R: Reader,
265{
266    type Error = DidntRead;
267
268    fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
269        let header: u8 = self.read(&mut *reader)?;
270        let codec = Zenoh080Header::new(header);
271        codec.read(reader)
272    }
273}
274
275impl<R, const ID: u8> RCodec<(ext::NodeIdType<{ ID }>, bool), &mut R> for Zenoh080Header
276where
277    R: Reader,
278{
279    type Error = DidntRead;
280
281    fn read(self, reader: &mut R) -> Result<(ext::NodeIdType<{ ID }>, bool), Self::Error> {
282        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
283        Ok((ext.into(), more))
284    }
285}
286
287// Extension: EntityId
288impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
289    fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
290        let EntityGlobalIdType { zid, eid } = x;
291
292        1 + self.w_len(zid) + self.w_len(*eid)
293    }
294}
295
296impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
297where
298    W: Writer,
299{
300    type Output = Result<(), DidntWrite>;
301
302    fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
303        let (x, more) = x;
304        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
305        self.write(&mut *writer, (&header, more))?;
306
307        let flags: u8 = (x.zid.size() as u8 - 1) << 4;
308        self.write(&mut *writer, flags)?;
309
310        let lodec = Zenoh080Length::new(x.zid.size());
311        lodec.write(&mut *writer, &x.zid)?;
312
313        self.write(&mut *writer, x.eid)?;
314        Ok(())
315    }
316}
317
318impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
319where
320    R: Reader,
321{
322    type Error = DidntRead;
323
324    fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
325        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
326
327        let flags: u8 = self.codec.read(&mut *reader)?;
328        let length = 1 + ((flags >> 4) as usize);
329
330        let lodec = Zenoh080Length::new(length);
331        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
332
333        let eid: EntityId = self.codec.read(&mut *reader)?;
334
335        Ok((ext::EntityGlobalIdType { zid, eid }, more))
336    }
337}