1use alloc::vec::Vec;
15
16use zenoh_buffers::{
17 reader::{BacktrackableReader, DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19};
20use zenoh_protocol::{
21 common::{iext, imsg},
22 core::Reliability,
23 network::NetworkMessage,
24 transport::{
25 frame::{ext, flag, Frame, FrameHeader},
26 id, TransportSn,
27 },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Reliability};
31
32impl<W> WCodec<&FrameHeader, &mut W> for Zenoh080
34where
35 W: Writer,
36{
37 type Output = Result<(), DidntWrite>;
38
39 fn write(self, writer: &mut W, x: &FrameHeader) -> Self::Output {
40 let FrameHeader {
41 reliability,
42 sn,
43 ext_qos,
44 } = x;
45
46 let mut header = id::FRAME;
48 if let Reliability::Reliable = reliability {
49 header |= flag::R;
50 }
51 if ext_qos != &ext::QoSType::DEFAULT {
52 header |= flag::Z;
53 }
54 self.write(&mut *writer, header)?;
55
56 self.write(&mut *writer, sn)?;
58
59 if ext_qos != &ext::QoSType::DEFAULT {
61 self.write(&mut *writer, (x.ext_qos, false))?;
62 }
63
64 Ok(())
65 }
66}
67
68impl<R> RCodec<FrameHeader, &mut R> for Zenoh080
69where
70 R: Reader,
71{
72 type Error = DidntRead;
73
74 fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
75 let header: u8 = self.read(&mut *reader)?;
76 let codec = Zenoh080Header::new(header);
77 codec.read(reader)
78 }
79}
80
81impl<R> RCodec<FrameHeader, &mut R> for Zenoh080Header
82where
83 R: Reader,
84{
85 type Error = DidntRead;
86
87 fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
88 if imsg::mid(self.header) != id::FRAME {
89 return Err(DidntRead);
90 }
91
92 let reliability = match imsg::has_flag(self.header, flag::R) {
93 true => Reliability::Reliable,
94 false => Reliability::BestEffort,
95 };
96 let sn: TransportSn = self.codec.read(&mut *reader)?;
97
98 let mut ext_qos = ext::QoSType::DEFAULT;
100
101 let mut has_ext = imsg::has_flag(self.header, flag::Z);
102 while has_ext {
103 let ext: u8 = self.codec.read(&mut *reader)?;
104 let eodec = Zenoh080Header::new(ext);
105 match iext::eid(ext) {
106 ext::QoS::ID => {
107 let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
108 ext_qos = q;
109 has_ext = ext;
110 }
111 _ => {
112 has_ext = extension::skip(reader, "Frame", ext)?;
113 }
114 }
115 }
116
117 Ok(FrameHeader {
118 reliability,
119 sn,
120 ext_qos,
121 })
122 }
123}
124
125impl<W> WCodec<&Frame, &mut W> for Zenoh080
127where
128 W: Writer,
129{
130 type Output = Result<(), DidntWrite>;
131
132 fn write(self, writer: &mut W, x: &Frame) -> Self::Output {
133 let Frame {
134 reliability,
135 sn,
136 payload,
137 ext_qos,
138 } = x;
139
140 let header = FrameHeader {
142 reliability: *reliability,
143 sn: *sn,
144 ext_qos: *ext_qos,
145 };
146 self.write(&mut *writer, &header)?;
147
148 for m in payload.iter() {
150 self.write(&mut *writer, m)?;
151 }
152
153 Ok(())
154 }
155}
156
157impl<R> RCodec<Frame, &mut R> for Zenoh080
158where
159 R: Reader + BacktrackableReader,
160{
161 type Error = DidntRead;
162
163 fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
164 let header: u8 = self.read(&mut *reader)?;
165 let codec = Zenoh080Header::new(header);
166 codec.read(reader)
167 }
168}
169
170impl<R> RCodec<Frame, &mut R> for Zenoh080Header
171where
172 R: Reader + BacktrackableReader,
173{
174 type Error = DidntRead;
175
176 fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
177 let header: FrameHeader = self.read(&mut *reader)?;
178
179 let rcode = Zenoh080Reliability::new(header.reliability);
180 let mut payload = Vec::new();
181 while reader.can_read() {
182 let mark = reader.mark();
183 let res: Result<NetworkMessage, DidntRead> = rcode.read(&mut *reader);
184 match res {
185 Ok(m) => payload.push(m),
186 Err(_) => {
187 reader.rewind(mark);
188 break;
189 }
190 }
191 }
192
193 Ok(Frame {
194 reliability: header.reliability,
195 sn: header.sn,
196 ext_qos: header.ext_qos,
197 payload,
198 })
199 }
200}
201
202#[derive(Debug)]
203pub struct FrameReader<'a, R: BacktrackableReader> {
204 pub reliability: Reliability,
205 pub sn: TransportSn,
206 pub ext_qos: ext::QoSType,
207 reader: &'a mut R,
208}
209
210impl<'a, R: BacktrackableReader> RCodec<FrameReader<'a, R>, &'a mut R> for Zenoh080 {
211 type Error = DidntRead;
212
213 fn read(self, reader: &'a mut R) -> Result<FrameReader<'a, R>, Self::Error> {
214 let mark = reader.mark();
215 let Ok(header): Result<FrameHeader, _> = self.read(&mut *reader) else {
216 reader.rewind(mark);
217 return Err(DidntRead);
218 };
219 Ok(FrameReader {
220 reliability: header.reliability,
221 sn: header.sn,
222 ext_qos: header.ext_qos,
223 reader,
224 })
225 }
226}
227
228impl<R: BacktrackableReader> Iterator for FrameReader<'_, R> {
229 type Item = NetworkMessage;
230
231 fn next(&mut self) -> Option<Self::Item> {
232 let mark = self.reader.mark();
233 let msg = Zenoh080Reliability::new(self.reliability).read(self.reader);
234 if msg.is_err() {
235 self.reader.rewind(mark);
236 }
237 msg.ok()
238 }
239}
240
241impl<R: BacktrackableReader> Drop for FrameReader<'_, R> {
242 fn drop(&mut self) {
243 for _ in self {}
244 }
245}