zenoh_codec/zenoh/
err.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::vec::Vec;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19    ZBuf,
20};
21use zenoh_protocol::{
22    common::{iext, imsg},
23    core::Encoding,
24    zenoh::{
25        err::{ext, flag, Err},
26        id,
27    },
28};
29
30use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header};
31
32impl<W> WCodec<&Err, &mut W> for Zenoh080
33where
34    W: Writer,
35{
36    type Output = Result<(), DidntWrite>;
37
38    fn write(self, writer: &mut W, x: &Err) -> Self::Output {
39        let Err {
40            encoding,
41            ext_sinfo,
42            #[cfg(feature = "shared-memory")]
43            ext_shm,
44            ext_unknown,
45            payload,
46        } = x;
47
48        // Header
49        let mut header = id::ERR;
50        if encoding != &Encoding::empty() {
51            header |= flag::E;
52        }
53        let mut n_exts = (ext_sinfo.is_some() as u8) + (ext_unknown.len() as u8);
54        #[cfg(feature = "shared-memory")]
55        {
56            n_exts += ext_shm.is_some() as u8;
57        }
58        if n_exts != 0 {
59            header |= flag::Z;
60        }
61        self.write(&mut *writer, header)?;
62
63        // Body
64        if encoding != &Encoding::empty() {
65            self.write(&mut *writer, encoding)?;
66        }
67
68        // Extensions
69        if let Some(sinfo) = ext_sinfo.as_ref() {
70            n_exts -= 1;
71            self.write(&mut *writer, (sinfo, n_exts != 0))?;
72        }
73        #[cfg(feature = "shared-memory")]
74        if let Some(eshm) = ext_shm.as_ref() {
75            n_exts -= 1;
76            self.write(&mut *writer, (eshm, n_exts != 0))?;
77        }
78        for u in ext_unknown.iter() {
79            n_exts -= 1;
80            self.write(&mut *writer, (u, n_exts != 0))?;
81        }
82
83        // Payload
84        let bodec = Zenoh080Bounded::<u32>::new();
85        bodec.write(&mut *writer, payload)?;
86
87        Ok(())
88    }
89}
90
91impl<R> RCodec<Err, &mut R> for Zenoh080
92where
93    R: Reader,
94{
95    type Error = DidntRead;
96
97    fn read(self, reader: &mut R) -> Result<Err, Self::Error> {
98        let header: u8 = self.read(&mut *reader)?;
99        let codec = Zenoh080Header::new(header);
100        codec.read(reader)
101    }
102}
103
104impl<R> RCodec<Err, &mut R> for Zenoh080Header
105where
106    R: Reader,
107{
108    type Error = DidntRead;
109
110    fn read(self, reader: &mut R) -> Result<Err, Self::Error> {
111        if imsg::mid(self.header) != id::ERR {
112            return Err(DidntRead);
113        }
114
115        // Body
116        let mut encoding = Encoding::empty();
117        if imsg::has_flag(self.header, flag::E) {
118            encoding = self.codec.read(&mut *reader)?;
119        }
120
121        // Extensions
122        let mut ext_sinfo: Option<ext::SourceInfoType> = None;
123        #[cfg(feature = "shared-memory")]
124        let mut ext_shm: Option<ext::ShmType> = None;
125        let mut ext_unknown = Vec::new();
126
127        let mut has_ext = imsg::has_flag(self.header, flag::Z);
128        while has_ext {
129            let ext: u8 = self.codec.read(&mut *reader)?;
130            let eodec = Zenoh080Header::new(ext);
131            match iext::eid(ext) {
132                ext::SourceInfo::ID => {
133                    let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
134                    ext_sinfo = Some(s);
135                    has_ext = ext;
136                }
137                #[cfg(feature = "shared-memory")]
138                ext::Shm::ID => {
139                    let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
140                    ext_shm = Some(s);
141                    has_ext = ext;
142                }
143                _ => {
144                    let (u, ext) = extension::read(reader, "Err", ext)?;
145                    ext_unknown.push(u);
146                    has_ext = ext;
147                }
148            }
149        }
150
151        // Payload
152        let bodec = Zenoh080Bounded::<u32>::new();
153        let payload: ZBuf = bodec.read(&mut *reader)?;
154
155        Ok(Err {
156            encoding,
157            ext_sinfo,
158            #[cfg(feature = "shared-memory")]
159            ext_shm,
160            ext_unknown,
161            payload,
162        })
163    }
164}