1use zenoh_buffers::{
15 reader::{BacktrackableReader, DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19 common::{iext, imsg},
20 core::Reliability,
21 transport::{
22 fragment::{ext, flag, Fragment, FragmentHeader, TransportSn},
23 id,
24 },
25};
26
27use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
28
29impl<W> WCodec<&FragmentHeader, &mut W> for Zenoh080
31where
32 W: Writer,
33{
34 type Output = Result<(), DidntWrite>;
35
36 fn write(self, writer: &mut W, x: &FragmentHeader) -> Self::Output {
37 let FragmentHeader {
38 reliability,
39 more,
40 sn,
41 ext_qos,
42 ext_first,
43 ext_drop,
44 } = x;
45
46 let mut header = id::FRAGMENT;
48 if let Reliability::Reliable = reliability {
49 header |= flag::R;
50 }
51 if *more {
52 header |= flag::M;
53 }
54 let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
55 + ext_first.is_some() as u8
56 + ext_drop.is_some() as u8;
57 if n_exts != 0 {
58 header |= flag::Z;
59 }
60 self.write(&mut *writer, header)?;
61
62 self.write(&mut *writer, sn)?;
64
65 if ext_qos != &ext::QoSType::DEFAULT {
67 n_exts -= 1;
68 self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
69 }
70 if let Some(first) = ext_first {
71 n_exts -= 1;
72 self.write(&mut *writer, (first, n_exts != 0))?
73 }
74 if let Some(drop) = ext_drop {
75 n_exts -= 1;
76 self.write(&mut *writer, (drop, n_exts != 0))?
77 }
78
79 Ok(())
80 }
81}
82
83impl<R> RCodec<FragmentHeader, &mut R> for Zenoh080
84where
85 R: Reader,
86{
87 type Error = DidntRead;
88
89 fn read(self, reader: &mut R) -> Result<FragmentHeader, Self::Error> {
90 let header: u8 = self.read(&mut *reader)?;
91 let codec = Zenoh080Header::new(header);
92 codec.read(reader)
93 }
94}
95
96impl<R> RCodec<FragmentHeader, &mut R> for Zenoh080Header
97where
98 R: Reader,
99{
100 type Error = DidntRead;
101
102 fn read(self, reader: &mut R) -> Result<FragmentHeader, Self::Error> {
103 if imsg::mid(self.header) != id::FRAGMENT {
104 return Err(DidntRead);
105 }
106
107 let reliability = match imsg::has_flag(self.header, flag::R) {
108 true => Reliability::Reliable,
109 false => Reliability::BestEffort,
110 };
111 let more = imsg::has_flag(self.header, flag::M);
112 let sn: TransportSn = self.codec.read(&mut *reader)?;
113
114 let mut ext_qos = ext::QoSType::DEFAULT;
116 let mut ext_first = None;
117 let mut ext_drop = None;
118
119 let mut has_ext = imsg::has_flag(self.header, flag::Z);
120 while has_ext {
121 let ext: u8 = self.codec.read(&mut *reader)?;
122 let eodec = Zenoh080Header::new(ext);
123 match iext::eid(ext) {
124 ext::QoS::ID => {
125 let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
126 ext_qos = q;
127 has_ext = ext;
128 }
129 ext::First::ID => {
130 let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?;
131 ext_first = Some(first);
132 has_ext = ext;
133 }
134 ext::Drop::ID => {
135 let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?;
136 ext_drop = Some(drop);
137 has_ext = ext;
138 }
139 _ => {
140 has_ext = extension::skip(reader, "Fragment", ext)?;
141 }
142 }
143 }
144
145 Ok(FragmentHeader {
146 reliability,
147 more,
148 sn,
149 ext_qos,
150 ext_first,
151 ext_drop,
152 })
153 }
154}
155
156impl<W> WCodec<&Fragment, &mut W> for Zenoh080
158where
159 W: Writer,
160{
161 type Output = Result<(), DidntWrite>;
162
163 fn write(self, writer: &mut W, x: &Fragment) -> Self::Output {
164 let Fragment {
165 reliability,
166 more,
167 sn,
168 payload,
169 ext_qos,
170 ext_first,
171 ext_drop,
172 } = x;
173
174 let header = FragmentHeader {
176 reliability: *reliability,
177 more: *more,
178 sn: *sn,
179 ext_qos: *ext_qos,
180 ext_first: *ext_first,
181 ext_drop: *ext_drop,
182 };
183 self.write(&mut *writer, &header)?;
184
185 writer.write_zslice(payload)?;
187
188 Ok(())
189 }
190}
191
192impl<R> RCodec<Fragment, &mut R> for Zenoh080
193where
194 R: Reader + BacktrackableReader,
195{
196 type Error = DidntRead;
197
198 fn read(self, reader: &mut R) -> Result<Fragment, Self::Error> {
199 let header: u8 = self.read(&mut *reader)?;
200 let codec = Zenoh080Header::new(header);
201 codec.read(reader)
202 }
203}
204
205impl<R> RCodec<Fragment, &mut R> for Zenoh080Header
206where
207 R: Reader + BacktrackableReader,
208{
209 type Error = DidntRead;
210
211 fn read(self, reader: &mut R) -> Result<Fragment, Self::Error> {
212 let header: FragmentHeader = self.read(&mut *reader)?;
213 let payload = reader.read_zslice(reader.remaining())?;
214
215 Ok(Fragment {
216 reliability: header.reliability,
217 more: header.more,
218 sn: header.sn,
219 ext_qos: header.ext_qos,
220 ext_first: header.ext_first,
221 ext_drop: header.ext_drop,
222 payload,
223 })
224 }
225}