zenoh_codec/transport/
frame.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    reader::{BacktrackableReader, DidntRead, Reader},
16    writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19    common::{iext, imsg},
20    core::Reliability,
21    transport::{
22        frame::{ext, flag, Frame, FrameHeader},
23        id, TransportSn,
24    },
25};
26
27use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
28
29// FrameHeader
30impl<W> WCodec<&FrameHeader, &mut W> for Zenoh080
31where
32    W: Writer,
33{
34    type Output = Result<(), DidntWrite>;
35
36    fn write(self, writer: &mut W, x: &FrameHeader) -> Self::Output {
37        let FrameHeader {
38            reliability,
39            sn,
40            ext_qos,
41        } = x;
42
43        // Header
44        let mut header = id::FRAME;
45        if let Reliability::Reliable = reliability {
46            header |= flag::R;
47        }
48        if ext_qos != &ext::QoSType::DEFAULT {
49            header |= flag::Z;
50        }
51        self.write(&mut *writer, header)?;
52
53        // Body
54        self.write(&mut *writer, sn)?;
55
56        // Extensions
57        if ext_qos != &ext::QoSType::DEFAULT {
58            self.write(&mut *writer, (x.ext_qos, false))?;
59        }
60
61        Ok(())
62    }
63}
64
65impl<R> RCodec<FrameHeader, &mut R> for Zenoh080
66where
67    R: Reader,
68{
69    type Error = DidntRead;
70
71    fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
72        let header: u8 = self.read(&mut *reader)?;
73        let codec = Zenoh080Header::new(header);
74        codec.read(reader)
75    }
76}
77
78impl<R> RCodec<FrameHeader, &mut R> for Zenoh080Header
79where
80    R: Reader,
81{
82    type Error = DidntRead;
83
84    fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
85        if imsg::mid(self.header) != id::FRAME {
86            return Err(DidntRead);
87        }
88
89        let reliability = match imsg::has_flag(self.header, flag::R) {
90            true => Reliability::Reliable,
91            false => Reliability::BestEffort,
92        };
93        let sn: TransportSn = self.codec.read(&mut *reader)?;
94
95        // Extensions
96        let mut ext_qos = ext::QoSType::DEFAULT;
97
98        let mut has_ext = imsg::has_flag(self.header, flag::Z);
99        while has_ext {
100            let ext: u8 = self.codec.read(&mut *reader)?;
101            let eodec = Zenoh080Header::new(ext);
102            match iext::eid(ext) {
103                ext::QoS::ID => {
104                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
105                    ext_qos = q;
106                    has_ext = ext;
107                }
108                _ => {
109                    has_ext = extension::skip(reader, "Frame", ext)?;
110                }
111            }
112        }
113
114        Ok(FrameHeader {
115            reliability,
116            sn,
117            ext_qos,
118        })
119    }
120}
121
122// Frame
123impl<W> WCodec<&Frame, &mut W> for Zenoh080
124where
125    W: Writer,
126{
127    type Output = Result<(), DidntWrite>;
128
129    fn write(self, writer: &mut W, x: &Frame) -> Self::Output {
130        let Frame {
131            reliability,
132            sn,
133            payload,
134            ext_qos,
135        } = x;
136
137        // Header
138        let header = FrameHeader {
139            reliability: *reliability,
140            sn: *sn,
141            ext_qos: *ext_qos,
142        };
143        self.write(&mut *writer, &header)?;
144
145        // Body
146        writer.write_zslice(payload)?;
147
148        Ok(())
149    }
150}
151
152impl<R> RCodec<Frame, &mut R> for Zenoh080
153where
154    R: Reader + BacktrackableReader,
155{
156    type Error = DidntRead;
157
158    fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
159        let header: u8 = self.read(&mut *reader)?;
160        let codec = Zenoh080Header::new(header);
161        codec.read(reader)
162    }
163}
164
165impl<R> RCodec<Frame, &mut R> for Zenoh080Header
166where
167    R: Reader + BacktrackableReader,
168{
169    type Error = DidntRead;
170
171    fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
172        let header: FrameHeader = self.read(&mut *reader)?;
173        let payload = reader.read_zslice(reader.remaining())?;
174
175        Ok(Frame {
176            reliability: header.reliability,
177            sn: header.sn,
178            ext_qos: header.ext_qos,
179            payload,
180        })
181    }
182}