1use core::time::Duration;
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19 ZSlice,
20};
21use zenoh_protocol::{
22 common::{iext, imsg},
23 transport::{
24 id,
25 open::{ext, flag, OpenAck, OpenSyn},
26 TransportSn,
27 },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
31
32impl<W> WCodec<&OpenSyn, &mut W> for Zenoh080
34where
35 W: Writer,
36{
37 type Output = Result<(), DidntWrite>;
38
39 fn write(self, writer: &mut W, x: &OpenSyn) -> Self::Output {
40 let OpenSyn {
41 lease,
42 initial_sn,
43 cookie,
44 ext_qos,
45 #[cfg(feature = "shared-memory")]
46 ext_shm,
47 ext_auth,
48 ext_mlink,
49 ext_lowlatency,
50 ext_compression,
51 ext_south,
52 } = x;
53
54 let mut header = id::OPEN;
56 if lease.as_millis() % 1_000 == 0 {
57 header |= flag::T;
58 }
59 let mut n_exts = (ext_qos.is_some() as u8)
60 + (ext_auth.is_some() as u8)
61 + (ext_mlink.is_some() as u8)
62 + (ext_lowlatency.is_some() as u8)
63 + (ext_compression.is_some() as u8)
64 + (ext_south.is_some() as u8);
65
66 #[cfg(feature = "shared-memory")]
67 {
68 n_exts += ext_shm.is_some() as u8;
69 }
70
71 if n_exts != 0 {
72 header |= flag::Z;
73 }
74 self.write(&mut *writer, header)?;
75
76 if imsg::has_flag(header, flag::T) {
78 self.write(&mut *writer, lease.as_secs())?;
79 } else {
80 self.write(&mut *writer, lease.as_millis() as u64)?;
81 }
82 self.write(&mut *writer, initial_sn)?;
83 self.write(&mut *writer, cookie)?;
84
85 if let Some(qos) = ext_qos.as_ref() {
87 n_exts -= 1;
88 self.write(&mut *writer, (qos, n_exts != 0))?;
89 }
90 #[cfg(feature = "shared-memory")]
91 if let Some(shm) = ext_shm.as_ref() {
92 n_exts -= 1;
93 self.write(&mut *writer, (shm, n_exts != 0))?;
94 }
95 if let Some(auth) = ext_auth.as_ref() {
96 n_exts -= 1;
97 self.write(&mut *writer, (auth, n_exts != 0))?;
98 }
99 if let Some(mlink) = ext_mlink.as_ref() {
100 n_exts -= 1;
101 self.write(&mut *writer, (mlink, n_exts != 0))?;
102 }
103 if let Some(lowlatency) = ext_lowlatency.as_ref() {
104 n_exts -= 1;
105 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
106 }
107 if let Some(compression) = ext_compression.as_ref() {
108 n_exts -= 1;
109 self.write(&mut *writer, (compression, n_exts != 0))?;
110 }
111 if let Some(south) = ext_south.as_ref() {
112 n_exts -= 1;
113 self.write(&mut *writer, (south, n_exts != 0))?;
114 }
115
116 Ok(())
117 }
118}
119
120impl<R> RCodec<OpenSyn, &mut R> for Zenoh080
121where
122 R: Reader,
123{
124 type Error = DidntRead;
125
126 fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
127 let header: u8 = self.read(&mut *reader)?;
128 let codec = Zenoh080Header::new(header);
129 codec.read(reader)
130 }
131}
132
133impl<R> RCodec<OpenSyn, &mut R> for Zenoh080Header
134where
135 R: Reader,
136{
137 type Error = DidntRead;
138
139 fn read(self, reader: &mut R) -> Result<OpenSyn, Self::Error> {
140 if imsg::mid(self.header) != id::OPEN || imsg::has_flag(self.header, flag::A) {
141 return Err(DidntRead);
142 }
143
144 let lease: u64 = self.codec.read(&mut *reader)?;
146 let lease = if imsg::has_flag(self.header, flag::T) {
147 Duration::from_secs(lease)
148 } else {
149 Duration::from_millis(lease)
150 };
151 let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
152 let cookie: ZSlice = self.codec.read(&mut *reader)?;
153
154 let mut ext_qos = None;
156 #[cfg(feature = "shared-memory")]
157 let mut ext_shm = None;
158 let mut ext_auth = None;
159 let mut ext_mlink = None;
160 let mut ext_lowlatency = None;
161 let mut ext_compression = None;
162 let mut ext_south = None;
163
164 let mut has_ext = imsg::has_flag(self.header, flag::Z);
165 while has_ext {
166 let ext: u8 = self.codec.read(&mut *reader)?;
167 let eodec = Zenoh080Header::new(ext);
168 match iext::eid(ext) {
169 ext::QoS::ID => {
170 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
171 ext_qos = Some(q);
172 has_ext = ext;
173 }
174 #[cfg(feature = "shared-memory")]
175 ext::Shm::ID => {
176 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
177 ext_shm = Some(s);
178 has_ext = ext;
179 }
180 ext::Auth::ID => {
181 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
182 ext_auth = Some(a);
183 has_ext = ext;
184 }
185 ext::MultiLinkSyn::ID => {
186 let (a, ext): (ext::MultiLinkSyn, bool) = eodec.read(&mut *reader)?;
187 ext_mlink = Some(a);
188 has_ext = ext;
189 }
190 ext::LowLatency::ID => {
191 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
192 ext_lowlatency = Some(q);
193 has_ext = ext;
194 }
195 ext::Compression::ID => {
196 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
197 ext_compression = Some(q);
198 has_ext = ext;
199 }
200 ext::RemoteBound::ID => {
201 let (q, ext): (ext::RemoteBound, bool) = eodec.read(&mut *reader)?;
202 ext_south = Some(q);
203 has_ext = ext;
204 }
205 _ => {
206 has_ext = extension::skip(reader, "OpenSyn", ext)?;
207 }
208 }
209 }
210
211 Ok(OpenSyn {
212 lease,
213 initial_sn,
214 cookie,
215 ext_qos,
216 #[cfg(feature = "shared-memory")]
217 ext_shm,
218 ext_auth,
219 ext_mlink,
220 ext_lowlatency,
221 ext_compression,
222 ext_south,
223 })
224 }
225}
226
227impl<W> WCodec<&OpenAck, &mut W> for Zenoh080
229where
230 W: Writer,
231{
232 type Output = Result<(), DidntWrite>;
233
234 fn write(self, writer: &mut W, x: &OpenAck) -> Self::Output {
235 let OpenAck {
236 lease,
237 initial_sn,
238 ext_qos,
239 #[cfg(feature = "shared-memory")]
240 ext_shm,
241 ext_auth,
242 ext_mlink,
243 ext_lowlatency,
244 ext_compression,
245 ext_south,
246 } = x;
247
248 let mut header = id::OPEN;
250 header |= flag::A;
251 if lease.subsec_nanos() == 0 {
253 header |= flag::T;
254 }
255 let mut n_exts = (ext_qos.is_some() as u8)
256 + (ext_auth.is_some() as u8)
257 + (ext_mlink.is_some() as u8)
258 + (ext_lowlatency.is_some() as u8)
259 + (ext_compression.is_some() as u8)
260 + (ext_south.is_some() as u8);
261
262 #[cfg(feature = "shared-memory")]
263 {
264 n_exts += ext_shm.is_some() as u8;
265 }
266
267 if n_exts != 0 {
268 header |= flag::Z;
269 }
270 self.write(&mut *writer, header)?;
271
272 if imsg::has_flag(header, flag::T) {
274 self.write(&mut *writer, lease.as_secs())?;
275 } else {
276 self.write(&mut *writer, lease.as_millis() as u64)?;
277 }
278 self.write(&mut *writer, initial_sn)?;
279
280 if let Some(qos) = ext_qos.as_ref() {
282 n_exts -= 1;
283 self.write(&mut *writer, (qos, n_exts != 0))?;
284 }
285 #[cfg(feature = "shared-memory")]
286 if let Some(shm) = ext_shm.as_ref() {
287 n_exts -= 1;
288 self.write(&mut *writer, (shm, n_exts != 0))?;
289 }
290 if let Some(auth) = ext_auth.as_ref() {
291 n_exts -= 1;
292 self.write(&mut *writer, (auth, n_exts != 0))?;
293 }
294 if let Some(mlink) = ext_mlink.as_ref() {
295 n_exts -= 1;
296 self.write(&mut *writer, (mlink, n_exts != 0))?;
297 }
298 if let Some(lowlatency) = ext_lowlatency.as_ref() {
299 n_exts -= 1;
300 self.write(&mut *writer, (lowlatency, n_exts != 0))?;
301 }
302 if let Some(compression) = ext_compression.as_ref() {
303 n_exts -= 1;
304 self.write(&mut *writer, (compression, n_exts != 0))?;
305 }
306 if let Some(south) = ext_south.as_ref() {
307 n_exts -= 1;
308 self.write(&mut *writer, (south, n_exts != 0))?;
309 }
310
311 Ok(())
312 }
313}
314
315impl<R> RCodec<OpenAck, &mut R> for Zenoh080
316where
317 R: Reader,
318{
319 type Error = DidntRead;
320
321 fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
322 let header: u8 = self.read(&mut *reader)?;
323 let codec = Zenoh080Header::new(header);
324 codec.read(reader)
325 }
326}
327
328impl<R> RCodec<OpenAck, &mut R> for Zenoh080Header
329where
330 R: Reader,
331{
332 type Error = DidntRead;
333
334 fn read(self, reader: &mut R) -> Result<OpenAck, Self::Error> {
335 if imsg::mid(self.header) != id::OPEN || !imsg::has_flag(self.header, flag::A) {
336 return Err(DidntRead);
337 }
338
339 let lease: u64 = self.codec.read(&mut *reader)?;
341 let lease = if imsg::has_flag(self.header, flag::T) {
342 Duration::from_secs(lease)
343 } else {
344 Duration::from_millis(lease)
345 };
346 let initial_sn: TransportSn = self.codec.read(&mut *reader)?;
347
348 let mut ext_qos = None;
350 #[cfg(feature = "shared-memory")]
351 let mut ext_shm = None;
352 let mut ext_auth = None;
353 let mut ext_mlink = None;
354 let mut ext_lowlatency = None;
355 let mut ext_compression = None;
356 let mut ext_south = None;
357
358 let mut has_ext = imsg::has_flag(self.header, flag::Z);
359 while has_ext {
360 let ext: u8 = self.codec.read(&mut *reader)?;
361 let eodec = Zenoh080Header::new(ext);
362 match iext::eid(ext) {
363 ext::QoS::ID => {
364 let (q, ext): (ext::QoS, bool) = eodec.read(&mut *reader)?;
365 ext_qos = Some(q);
366 has_ext = ext;
367 }
368 #[cfg(feature = "shared-memory")]
369 ext::Shm::ID => {
370 let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
371 ext_shm = Some(s);
372 has_ext = ext;
373 }
374 ext::Auth::ID => {
375 let (a, ext): (ext::Auth, bool) = eodec.read(&mut *reader)?;
376 ext_auth = Some(a);
377 has_ext = ext;
378 }
379 ext::MultiLinkAck::ID => {
380 let (a, ext): (ext::MultiLinkAck, bool) = eodec.read(&mut *reader)?;
381 ext_mlink = Some(a);
382 has_ext = ext;
383 }
384 ext::LowLatency::ID => {
385 let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?;
386 ext_lowlatency = Some(q);
387 has_ext = ext;
388 }
389 ext::Compression::ID => {
390 let (q, ext): (ext::Compression, bool) = eodec.read(&mut *reader)?;
391 ext_compression = Some(q);
392 has_ext = ext;
393 }
394 ext::RemoteBound::ID => {
395 let (q, ext): (ext::RemoteBound, bool) = eodec.read(&mut *reader)?;
396 ext_south = Some(q);
397 has_ext = ext;
398 }
399 _ => {
400 has_ext = extension::skip(reader, "OpenAck", ext)?;
401 }
402 }
403 }
404
405 Ok(OpenAck {
406 lease,
407 initial_sn,
408 ext_qos,
409 #[cfg(feature = "shared-memory")]
410 ext_shm,
411 ext_auth,
412 ext_mlink,
413 ext_lowlatency,
414 ext_compression,
415 ext_south,
416 })
417 }
418}