Skip to main content

zenoh_codec/zenoh/
put.rs

//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
14use alloc::vec::Vec;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19    ZBuf,
20};
21use zenoh_protocol::{
22    common::{iext, imsg, ZExtUnknown},
23    core::Encoding,
24    zenoh::{
25        id,
26        put::{ext, flag, Put},
27    },
28};
29
30#[cfg(not(feature = "shared-memory"))]
31use crate::Zenoh080Bounded;
32#[cfg(feature = "shared-memory")]
33use crate::Zenoh080Sliced;
34use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
35
36impl<W> WCodec<&Put, &mut W> for Zenoh080
37where
38    W: Writer,
39{
40    type Output = Result<(), DidntWrite>;
41
42    #[inline(always)]
43    fn write(self, writer: &mut W, x: &Put) -> Self::Output {
44        let Put {
45            timestamp,
46            encoding,
47            ext_sinfo,
48            ext_attachment,
49            #[cfg(feature = "shared-memory")]
50            ext_shm,
51            ext_unknown,
52            payload,
53        }: &Put = x;
54
55        // Header
56        let mut header = id::PUT;
57        if timestamp.is_some() {
58            header |= flag::T;
59        }
60        if encoding != &Encoding::empty() {
61            header |= flag::E;
62        }
63        let mut n_exts = (ext_sinfo.is_some()) as u8
64            + (ext_attachment.is_some()) as u8
65            + (ext_unknown.len() as u8);
66        #[cfg(feature = "shared-memory")]
67        {
68            n_exts += ext_shm.is_some() as u8;
69        }
70        if n_exts != 0 {
71            header |= flag::Z;
72        }
73        self.write(&mut *writer, header)?;
74
75        // Body
76        if let Some(ts) = timestamp.as_ref() {
77            self.write(&mut *writer, ts)?;
78        }
79        if encoding != &Encoding::empty() {
80            self.write(&mut *writer, encoding)?;
81        }
82
83        // Extensions
84        if let Some(sinfo) = ext_sinfo.as_ref() {
85            n_exts -= 1;
86            self.write(&mut *writer, (sinfo, n_exts != 0))?;
87        }
88        #[cfg(feature = "shared-memory")]
89        if let Some(eshm) = ext_shm.as_ref() {
90            n_exts -= 1;
91            self.write(&mut *writer, (eshm, n_exts != 0))?;
92        }
93        if let Some(att) = ext_attachment.as_ref() {
94            n_exts -= 1;
95            self.write(&mut *writer, (att, n_exts != 0))?;
96        }
97        for u in ext_unknown.iter() {
98            n_exts -= 1;
99            self.write(&mut *writer, (u, n_exts != 0))?;
100        }
101
102        // Payload
103        #[cfg(feature = "shared-memory")]
104        {
105            let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
106            codec.write(&mut *writer, payload)?;
107        }
108
109        #[cfg(not(feature = "shared-memory"))]
110        {
111            let bodec = Zenoh080Bounded::<u32>::new();
112            bodec.write(&mut *writer, payload)?;
113        }
114
115        Ok(())
116    }
117}
118
119impl<R> RCodec<Put, &mut R> for Zenoh080
120where
121    R: Reader,
122{
123    type Error = DidntRead;
124
125    fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
126        let header: u8 = self.read(&mut *reader)?;
127        let codec = Zenoh080Header::new(header);
128        codec.read(reader)
129    }
130}
131
132impl<R> RCodec<Put, &mut R> for Zenoh080Header
133where
134    R: Reader,
135{
136    type Error = DidntRead;
137
138    #[inline(always)]
139    fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
140        if imsg::mid(self.header) != id::PUT {
141            return Err(DidntRead);
142        }
143
144        // Body
145        let mut timestamp: Option<uhlc::Timestamp> = None;
146        if imsg::has_flag(self.header, flag::T) {
147            #[cold]
148            fn read_timestampt<R: Reader>(reader: &mut R) -> Result<uhlc::Timestamp, DidntRead> {
149                let codec = Zenoh080::new();
150                codec.read(&mut *reader)
151            }
152            timestamp = Some(read_timestampt(reader)?);
153        }
154
155        let mut encoding = Encoding::empty();
156        if imsg::has_flag(self.header, flag::E) {
157            #[cold]
158            fn read_encoding<R: Reader>(reader: &mut R) -> Result<Encoding, DidntRead> {
159                let codec = Zenoh080::new();
160                codec.read(&mut *reader)
161            }
162            encoding = read_encoding(reader)?;
163        }
164
165        // Extensions
166        let mut ext_sinfo: Option<ext::SourceInfoType> = None;
167        #[cfg(feature = "shared-memory")]
168        let mut ext_shm: Option<ext::ShmType> = None;
169        let mut ext_attachment: Option<ext::AttachmentType> = None;
170        let mut ext_unknown = Vec::new();
171
172        let mut has_ext = imsg::has_flag(self.header, flag::Z);
173        while has_ext {
174            #[cold]
175            fn read_exts<R: Reader>(
176                reader: &mut R,
177                ext_sinfo: &mut Option<ext::SourceInfoType>,
178                #[cfg(feature = "shared-memory")] ext_shm: &mut Option<ext::ShmType>,
179                ext_attachment: &mut Option<ext::AttachmentType>,
180                ext_unknown: &mut Vec<ZExtUnknown>,
181            ) -> Result<bool, DidntRead> {
182                let codec = Zenoh080::new();
183                let ext: u8 = codec.read(&mut *reader)?;
184                let eodec = Zenoh080Header::new(ext);
185                Ok(match iext::eid(ext) {
186                    ext::SourceInfo::ID => {
187                        let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
188                        *ext_sinfo = Some(s);
189                        ext
190                    }
191                    #[cfg(feature = "shared-memory")]
192                    ext::Shm::ID => {
193                        let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
194                        *ext_shm = Some(s);
195                        ext
196                    }
197                    ext::Attachment::ID => {
198                        let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
199                        *ext_attachment = Some(a);
200                        ext
201                    }
202                    _ => {
203                        let (u, ext) = extension::read(reader, "Put", ext)?;
204                        ext_unknown.push(u);
205                        ext
206                    }
207                })
208            }
209            has_ext = read_exts(
210                reader,
211                &mut ext_sinfo,
212                #[cfg(feature = "shared-memory")]
213                &mut ext_shm,
214                &mut ext_attachment,
215                &mut ext_unknown,
216            )?;
217        }
218
219        // Payload
220        let payload: ZBuf = {
221            #[cfg(feature = "shared-memory")]
222            {
223                let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
224                codec.read(&mut *reader)?
225            }
226
227            #[cfg(not(feature = "shared-memory"))]
228            {
229                let bodec = Zenoh080Bounded::<u32>::new();
230                bodec.read(&mut *reader)?
231            }
232        };
233
234        Ok(Put {
235            timestamp,
236            encoding,
237            ext_sinfo,
238            #[cfg(feature = "shared-memory")]
239            ext_shm,
240            ext_attachment,
241            ext_unknown,
242            payload,
243        })
244    }
245}