zenoh_codec/transport/
frame.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::vec::Vec;
15
16use zenoh_buffers::{
17    reader::{BacktrackableReader, DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19};
20use zenoh_protocol::{
21    common::{iext, imsg},
22    core::Reliability,
23    network::NetworkMessage,
24    transport::{
25        frame::{ext, flag, Frame, FrameHeader},
26        id, TransportSn,
27    },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Reliability};
31
32// FrameHeader
33impl<W> WCodec<&FrameHeader, &mut W> for Zenoh080
34where
35    W: Writer,
36{
37    type Output = Result<(), DidntWrite>;
38
39    fn write(self, writer: &mut W, x: &FrameHeader) -> Self::Output {
40        let FrameHeader {
41            reliability,
42            sn,
43            ext_qos,
44        } = x;
45
46        // Header
47        let mut header = id::FRAME;
48        if let Reliability::Reliable = reliability {
49            header |= flag::R;
50        }
51        if ext_qos != &ext::QoSType::DEFAULT {
52            header |= flag::Z;
53        }
54        self.write(&mut *writer, header)?;
55
56        // Body
57        self.write(&mut *writer, sn)?;
58
59        // Extensions
60        if ext_qos != &ext::QoSType::DEFAULT {
61            self.write(&mut *writer, (x.ext_qos, false))?;
62        }
63
64        Ok(())
65    }
66}
67
68impl<R> RCodec<FrameHeader, &mut R> for Zenoh080
69where
70    R: Reader,
71{
72    type Error = DidntRead;
73
74    fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
75        let header: u8 = self.read(&mut *reader)?;
76        let codec = Zenoh080Header::new(header);
77        codec.read(reader)
78    }
79}
80
81impl<R> RCodec<FrameHeader, &mut R> for Zenoh080Header
82where
83    R: Reader,
84{
85    type Error = DidntRead;
86
87    fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
88        if imsg::mid(self.header) != id::FRAME {
89            return Err(DidntRead);
90        }
91
92        let reliability = match imsg::has_flag(self.header, flag::R) {
93            true => Reliability::Reliable,
94            false => Reliability::BestEffort,
95        };
96        let sn: TransportSn = self.codec.read(&mut *reader)?;
97
98        // Extensions
99        let mut ext_qos = ext::QoSType::DEFAULT;
100
101        let mut has_ext = imsg::has_flag(self.header, flag::Z);
102        while has_ext {
103            let ext: u8 = self.codec.read(&mut *reader)?;
104            let eodec = Zenoh080Header::new(ext);
105            match iext::eid(ext) {
106                ext::QoS::ID => {
107                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
108                    ext_qos = q;
109                    has_ext = ext;
110                }
111                _ => {
112                    has_ext = extension::skip(reader, "Frame", ext)?;
113                }
114            }
115        }
116
117        Ok(FrameHeader {
118            reliability,
119            sn,
120            ext_qos,
121        })
122    }
123}
124
125// Frame
126impl<W> WCodec<&Frame, &mut W> for Zenoh080
127where
128    W: Writer,
129{
130    type Output = Result<(), DidntWrite>;
131
132    fn write(self, writer: &mut W, x: &Frame) -> Self::Output {
133        let Frame {
134            reliability,
135            sn,
136            payload,
137            ext_qos,
138        } = x;
139
140        // Header
141        let header = FrameHeader {
142            reliability: *reliability,
143            sn: *sn,
144            ext_qos: *ext_qos,
145        };
146        self.write(&mut *writer, &header)?;
147
148        // Body
149        for m in payload.iter() {
150            self.write(&mut *writer, m)?;
151        }
152
153        Ok(())
154    }
155}
156
157impl<R> RCodec<Frame, &mut R> for Zenoh080
158where
159    R: Reader + BacktrackableReader,
160{
161    type Error = DidntRead;
162
163    fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
164        let header: u8 = self.read(&mut *reader)?;
165        let codec = Zenoh080Header::new(header);
166        codec.read(reader)
167    }
168}
169
170impl<R> RCodec<Frame, &mut R> for Zenoh080Header
171where
172    R: Reader + BacktrackableReader,
173{
174    type Error = DidntRead;
175
176    fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
177        let header: FrameHeader = self.read(&mut *reader)?;
178
179        let rcode = Zenoh080Reliability::new(header.reliability);
180        let mut payload = Vec::new();
181        while reader.can_read() {
182            let mark = reader.mark();
183            let res: Result<NetworkMessage, DidntRead> = rcode.read(&mut *reader);
184            match res {
185                Ok(m) => payload.push(m),
186                Err(_) => {
187                    reader.rewind(mark);
188                    break;
189                }
190            }
191        }
192
193        Ok(Frame {
194            reliability: header.reliability,
195            sn: header.sn,
196            ext_qos: header.ext_qos,
197            payload,
198        })
199    }
200}
201
202#[derive(Debug)]
203pub struct FrameReader<'a, R: BacktrackableReader> {
204    pub reliability: Reliability,
205    pub sn: TransportSn,
206    pub ext_qos: ext::QoSType,
207    reader: &'a mut R,
208}
209
210impl<'a, R: BacktrackableReader> RCodec<FrameReader<'a, R>, &'a mut R> for Zenoh080 {
211    type Error = DidntRead;
212
213    fn read(self, reader: &'a mut R) -> Result<FrameReader<'a, R>, Self::Error> {
214        let mark = reader.mark();
215        let Ok(header): Result<FrameHeader, _> = self.read(&mut *reader) else {
216            reader.rewind(mark);
217            return Err(DidntRead);
218        };
219        Ok(FrameReader {
220            reliability: header.reliability,
221            sn: header.sn,
222            ext_qos: header.ext_qos,
223            reader,
224        })
225    }
226}
227
228impl<R: BacktrackableReader> Iterator for FrameReader<'_, R> {
229    type Item = NetworkMessage;
230
231    fn next(&mut self) -> Option<Self::Item> {
232        let mark = self.reader.mark();
233        let msg = Zenoh080Reliability::new(self.reliability).read(self.reader);
234        if msg.is_err() {
235            self.reader.rewind(mark);
236        }
237        msg.ok()
238    }
239}
240
241impl<R: BacktrackableReader> Drop for FrameReader<'_, R> {
242    fn drop(&mut self) {
243        for _ in self {}
244    }
245}