1pub mod batch;
15mod close;
16mod fragment;
17pub mod frame;
18mod init;
19mod join;
20mod keepalive;
21mod oam;
22mod open;
23
24use zenoh_buffers::{
25 reader::{BacktrackableReader, DidntRead, Reader},
26 writer::{DidntWrite, Writer},
27};
28use zenoh_protocol::{
29 common::{imsg, ZExtZ64},
30 network::NetworkMessage,
31 transport::*,
32};
33
34use crate::{RCodec, WCodec, Zenoh080, Zenoh080Header};
35
36impl<W> WCodec<TransportMessageLowLatencyRef<'_>, &mut W> for Zenoh080
38where
39 W: Writer,
40{
41 type Output = Result<(), DidntWrite>;
42
43 fn write(self, writer: &mut W, x: TransportMessageLowLatencyRef) -> Self::Output {
44 let TransportMessageLowLatencyRef { body } = x;
45
46 match body {
47 TransportBodyLowLatencyRef::Network(b) => self.write(&mut *writer, b),
48 TransportBodyLowLatencyRef::KeepAlive(b) => self.write(&mut *writer, &b),
49 TransportBodyLowLatencyRef::Close(b) => self.write(&mut *writer, &b),
50 }
51 }
52}
53
54impl<R> RCodec<TransportMessageLowLatency, &mut R> for Zenoh080
55where
56 R: Reader + BacktrackableReader,
57{
58 type Error = DidntRead;
59
60 fn read(self, reader: &mut R) -> Result<TransportMessageLowLatency, Self::Error> {
61 let header: u8 = self.read(&mut *reader)?;
62
63 let codec = Zenoh080Header::new(header);
64 let body = match imsg::mid(codec.header) {
65 id::KEEP_ALIVE => TransportBodyLowLatency::KeepAlive(codec.read(&mut *reader)?),
66 id::CLOSE => TransportBodyLowLatency::Close(codec.read(&mut *reader)?),
67 _ => {
68 let nw: NetworkMessage = codec.read(&mut *reader)?;
69 TransportBodyLowLatency::Network(nw)
70 }
71 };
72
73 Ok(TransportMessageLowLatency { body })
74 }
75}
76
77impl<W> WCodec<&TransportMessage, &mut W> for Zenoh080
79where
80 W: Writer,
81{
82 type Output = Result<(), DidntWrite>;
83
84 fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
85 let TransportMessage { body, .. } = x;
86
87 match body {
88 TransportBody::Frame(b) => self.write(&mut *writer, b),
89 TransportBody::Fragment(b) => self.write(&mut *writer, b),
90 TransportBody::KeepAlive(b) => self.write(&mut *writer, b),
91 TransportBody::InitSyn(b) => self.write(&mut *writer, b),
92 TransportBody::InitAck(b) => self.write(&mut *writer, b),
93 TransportBody::OpenSyn(b) => self.write(&mut *writer, b),
94 TransportBody::OpenAck(b) => self.write(&mut *writer, b),
95 TransportBody::Close(b) => self.write(&mut *writer, b),
96 TransportBody::OAM(b) => self.write(&mut *writer, b),
97 TransportBody::Join(b) => self.write(&mut *writer, b),
98 }
99 }
100}
101
102impl<R> RCodec<TransportMessage, &mut R> for Zenoh080
103where
104 R: Reader + BacktrackableReader,
105{
106 type Error = DidntRead;
107
108 fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
109 let header: u8 = self.read(&mut *reader)?;
110
111 let codec = Zenoh080Header::new(header);
112 let body = match imsg::mid(codec.header) {
113 id::FRAME => TransportBody::Frame(codec.read(&mut *reader)?),
114 id::FRAGMENT => TransportBody::Fragment(codec.read(&mut *reader)?),
115 id::KEEP_ALIVE => TransportBody::KeepAlive(codec.read(&mut *reader)?),
116 id::INIT => {
117 if !imsg::has_flag(codec.header, zenoh_protocol::transport::init::flag::A) {
118 TransportBody::InitSyn(codec.read(&mut *reader)?)
119 } else {
120 TransportBody::InitAck(codec.read(&mut *reader)?)
121 }
122 }
123 id::OPEN => {
124 if !imsg::has_flag(codec.header, zenoh_protocol::transport::open::flag::A) {
125 TransportBody::OpenSyn(codec.read(&mut *reader)?)
126 } else {
127 TransportBody::OpenAck(codec.read(&mut *reader)?)
128 }
129 }
130 id::CLOSE => TransportBody::Close(codec.read(&mut *reader)?),
131 id::OAM => TransportBody::OAM(codec.read(&mut *reader)?),
132 id::JOIN => TransportBody::Join(codec.read(&mut *reader)?),
133 _ => return Err(DidntRead),
134 };
135
136 Ok(body.into())
137 }
138}
139
140impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
142where
143 W: Writer,
144{
145 type Output = Result<(), DidntWrite>;
146
147 fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
148 let (x, more) = x;
149 let ext: ZExtZ64<{ ID }> = x.into();
150
151 self.write(&mut *writer, (&ext, more))
152 }
153}
154
155impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
156where
157 R: Reader,
158{
159 type Error = DidntRead;
160
161 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
162 let header: u8 = self.read(&mut *reader)?;
163 let codec = Zenoh080Header::new(header);
164 codec.read(reader)
165 }
166}
167
168impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
169where
170 R: Reader,
171{
172 type Error = DidntRead;
173
174 fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
175 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
176 Ok((ext.into(), more))
177 }
178}
179
180impl<W, const ID: u8> WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080
182where
183 W: Writer,
184{
185 type Output = Result<(), DidntWrite>;
186
187 fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output {
188 let (x, more) = x;
189 let ext: ZExtZ64<{ ID }> = x.into();
190
191 self.write(&mut *writer, (&ext, more))
192 }
193}
194
195impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080
196where
197 R: Reader,
198{
199 type Error = DidntRead;
200
201 fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
202 let header: u8 = self.read(&mut *reader)?;
203 let codec = Zenoh080Header::new(header);
204 codec.read(reader)
205 }
206}
207
208impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header
209where
210 R: Reader,
211{
212 type Error = DidntRead;
213
214 fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
215 let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
216 Ok((ext.into(), more))
217 }
218}