zenoh_codec/transport/
frame.rs1use zenoh_buffers::{
15 reader::{BacktrackableReader, DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19 common::{iext, imsg},
20 core::Reliability,
21 transport::{
22 frame::{ext, flag, Frame, FrameHeader},
23 id, TransportSn,
24 },
25};
26
27use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
28
29impl<W> WCodec<&FrameHeader, &mut W> for Zenoh080
31where
32 W: Writer,
33{
34 type Output = Result<(), DidntWrite>;
35
36 fn write(self, writer: &mut W, x: &FrameHeader) -> Self::Output {
37 let FrameHeader {
38 reliability,
39 sn,
40 ext_qos,
41 } = x;
42
43 let mut header = id::FRAME;
45 if let Reliability::Reliable = reliability {
46 header |= flag::R;
47 }
48 if ext_qos != &ext::QoSType::DEFAULT {
49 header |= flag::Z;
50 }
51 self.write(&mut *writer, header)?;
52
53 self.write(&mut *writer, sn)?;
55
56 if ext_qos != &ext::QoSType::DEFAULT {
58 self.write(&mut *writer, (x.ext_qos, false))?;
59 }
60
61 Ok(())
62 }
63}
64
65impl<R> RCodec<FrameHeader, &mut R> for Zenoh080
66where
67 R: Reader,
68{
69 type Error = DidntRead;
70
71 fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
72 let header: u8 = self.read(&mut *reader)?;
73 let codec = Zenoh080Header::new(header);
74 codec.read(reader)
75 }
76}
77
78impl<R> RCodec<FrameHeader, &mut R> for Zenoh080Header
79where
80 R: Reader,
81{
82 type Error = DidntRead;
83
84 fn read(self, reader: &mut R) -> Result<FrameHeader, Self::Error> {
85 if imsg::mid(self.header) != id::FRAME {
86 return Err(DidntRead);
87 }
88
89 let reliability = match imsg::has_flag(self.header, flag::R) {
90 true => Reliability::Reliable,
91 false => Reliability::BestEffort,
92 };
93 let sn: TransportSn = self.codec.read(&mut *reader)?;
94
95 let mut ext_qos = ext::QoSType::DEFAULT;
97
98 let mut has_ext = imsg::has_flag(self.header, flag::Z);
99 while has_ext {
100 let ext: u8 = self.codec.read(&mut *reader)?;
101 let eodec = Zenoh080Header::new(ext);
102 match iext::eid(ext) {
103 ext::QoS::ID => {
104 let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
105 ext_qos = q;
106 has_ext = ext;
107 }
108 _ => {
109 has_ext = extension::skip(reader, "Frame", ext)?;
110 }
111 }
112 }
113
114 Ok(FrameHeader {
115 reliability,
116 sn,
117 ext_qos,
118 })
119 }
120}
121
122impl<W> WCodec<&Frame, &mut W> for Zenoh080
124where
125 W: Writer,
126{
127 type Output = Result<(), DidntWrite>;
128
129 fn write(self, writer: &mut W, x: &Frame) -> Self::Output {
130 let Frame {
131 reliability,
132 sn,
133 payload,
134 ext_qos,
135 } = x;
136
137 let header = FrameHeader {
139 reliability: *reliability,
140 sn: *sn,
141 ext_qos: *ext_qos,
142 };
143 self.write(&mut *writer, &header)?;
144
145 writer.write_zslice(payload)?;
147
148 Ok(())
149 }
150}
151
152impl<R> RCodec<Frame, &mut R> for Zenoh080
153where
154 R: Reader + BacktrackableReader,
155{
156 type Error = DidntRead;
157
158 fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
159 let header: u8 = self.read(&mut *reader)?;
160 let codec = Zenoh080Header::new(header);
161 codec.read(reader)
162 }
163}
164
165impl<R> RCodec<Frame, &mut R> for Zenoh080Header
166where
167 R: Reader + BacktrackableReader,
168{
169 type Error = DidntRead;
170
171 fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
172 let header: FrameHeader = self.read(&mut *reader)?;
173 let payload = reader.read_zslice(reader.remaining())?;
174
175 Ok(Frame {
176 reliability: header.reliability,
177 sn: header.sn,
178 ext_qos: header.ext_qos,
179 payload,
180 })
181 }
182}