zenoh_codec/transport/
fragment.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    reader::{BacktrackableReader, DidntRead, Reader},
16    writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19    common::{iext, imsg},
20    core::Reliability,
21    transport::{
22        fragment::{ext, flag, Fragment, FragmentHeader, TransportSn},
23        id,
24    },
25};
26
27use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
28
29// FragmentHeader
30impl<W> WCodec<&FragmentHeader, &mut W> for Zenoh080
31where
32    W: Writer,
33{
34    type Output = Result<(), DidntWrite>;
35
36    fn write(self, writer: &mut W, x: &FragmentHeader) -> Self::Output {
37        let FragmentHeader {
38            reliability,
39            more,
40            sn,
41            ext_qos,
42            ext_first,
43            ext_drop,
44        } = x;
45
46        // Header
47        let mut header = id::FRAGMENT;
48        if let Reliability::Reliable = reliability {
49            header |= flag::R;
50        }
51        if *more {
52            header |= flag::M;
53        }
54        let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
55            + ext_first.is_some() as u8
56            + ext_drop.is_some() as u8;
57        if n_exts != 0 {
58            header |= flag::Z;
59        }
60        self.write(&mut *writer, header)?;
61
62        // Body
63        self.write(&mut *writer, sn)?;
64
65        // Extensions
66        if ext_qos != &ext::QoSType::DEFAULT {
67            n_exts -= 1;
68            self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
69        }
70        if let Some(first) = ext_first {
71            n_exts -= 1;
72            self.write(&mut *writer, (first, n_exts != 0))?
73        }
74        if let Some(drop) = ext_drop {
75            n_exts -= 1;
76            self.write(&mut *writer, (drop, n_exts != 0))?
77        }
78
79        Ok(())
80    }
81}
82
83impl<R> RCodec<FragmentHeader, &mut R> for Zenoh080
84where
85    R: Reader,
86{
87    type Error = DidntRead;
88
89    fn read(self, reader: &mut R) -> Result<FragmentHeader, Self::Error> {
90        let header: u8 = self.read(&mut *reader)?;
91        let codec = Zenoh080Header::new(header);
92        codec.read(reader)
93    }
94}
95
96impl<R> RCodec<FragmentHeader, &mut R> for Zenoh080Header
97where
98    R: Reader,
99{
100    type Error = DidntRead;
101
102    fn read(self, reader: &mut R) -> Result<FragmentHeader, Self::Error> {
103        if imsg::mid(self.header) != id::FRAGMENT {
104            return Err(DidntRead);
105        }
106
107        let reliability = match imsg::has_flag(self.header, flag::R) {
108            true => Reliability::Reliable,
109            false => Reliability::BestEffort,
110        };
111        let more = imsg::has_flag(self.header, flag::M);
112        let sn: TransportSn = self.codec.read(&mut *reader)?;
113
114        // Extensions
115        let mut ext_qos = ext::QoSType::DEFAULT;
116        let mut ext_first = None;
117        let mut ext_drop = None;
118
119        let mut has_ext = imsg::has_flag(self.header, flag::Z);
120        while has_ext {
121            let ext: u8 = self.codec.read(&mut *reader)?;
122            let eodec = Zenoh080Header::new(ext);
123            match iext::eid(ext) {
124                ext::QoS::ID => {
125                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
126                    ext_qos = q;
127                    has_ext = ext;
128                }
129                ext::First::ID => {
130                    let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?;
131                    ext_first = Some(first);
132                    has_ext = ext;
133                }
134                ext::Drop::ID => {
135                    let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?;
136                    ext_drop = Some(drop);
137                    has_ext = ext;
138                }
139                _ => {
140                    has_ext = extension::skip(reader, "Fragment", ext)?;
141                }
142            }
143        }
144
145        Ok(FragmentHeader {
146            reliability,
147            more,
148            sn,
149            ext_qos,
150            ext_first,
151            ext_drop,
152        })
153    }
154}
155
156// Fragment
157impl<W> WCodec<&Fragment, &mut W> for Zenoh080
158where
159    W: Writer,
160{
161    type Output = Result<(), DidntWrite>;
162
163    fn write(self, writer: &mut W, x: &Fragment) -> Self::Output {
164        let Fragment {
165            reliability,
166            more,
167            sn,
168            payload,
169            ext_qos,
170            ext_first,
171            ext_drop,
172        } = x;
173
174        // Header
175        let header = FragmentHeader {
176            reliability: *reliability,
177            more: *more,
178            sn: *sn,
179            ext_qos: *ext_qos,
180            ext_first: *ext_first,
181            ext_drop: *ext_drop,
182        };
183        self.write(&mut *writer, &header)?;
184
185        // Body
186        writer.write_zslice(payload)?;
187
188        Ok(())
189    }
190}
191
192impl<R> RCodec<Fragment, &mut R> for Zenoh080
193where
194    R: Reader + BacktrackableReader,
195{
196    type Error = DidntRead;
197
198    fn read(self, reader: &mut R) -> Result<Fragment, Self::Error> {
199        let header: u8 = self.read(&mut *reader)?;
200        let codec = Zenoh080Header::new(header);
201        codec.read(reader)
202    }
203}
204
205impl<R> RCodec<Fragment, &mut R> for Zenoh080Header
206where
207    R: Reader + BacktrackableReader,
208{
209    type Error = DidntRead;
210
211    fn read(self, reader: &mut R) -> Result<Fragment, Self::Error> {
212        let header: FragmentHeader = self.read(&mut *reader)?;
213        let payload = reader.read_zslice(reader.remaining())?;
214
215        Ok(Fragment {
216            reliability: header.reliability,
217            more: header.more,
218            sn: header.sn,
219            ext_qos: header.ext_qos,
220            ext_first: header.ext_first,
221            ext_drop: header.ext_drop,
222            payload,
223        })
224    }
225}