zenoh_codec/zenoh/
put.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::vec::Vec;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19    ZBuf,
20};
21use zenoh_protocol::{
22    common::{iext, imsg},
23    core::Encoding,
24    zenoh::{
25        id,
26        put::{ext, flag, Put},
27    },
28};
29
30#[cfg(not(feature = "shared-memory"))]
31use crate::Zenoh080Bounded;
32#[cfg(feature = "shared-memory")]
33use crate::Zenoh080Sliced;
34use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
35
36impl<W> WCodec<&Put, &mut W> for Zenoh080
37where
38    W: Writer,
39{
40    type Output = Result<(), DidntWrite>;
41
42    fn write(self, writer: &mut W, x: &Put) -> Self::Output {
43        let Put {
44            timestamp,
45            encoding,
46            ext_sinfo,
47            ext_attachment,
48            #[cfg(feature = "shared-memory")]
49            ext_shm,
50            ext_unknown,
51            payload,
52        }: &Put = x;
53
54        // Header
55        let mut header = id::PUT;
56        if timestamp.is_some() {
57            header |= flag::T;
58        }
59        if encoding != &Encoding::empty() {
60            header |= flag::E;
61        }
62        let mut n_exts = (ext_sinfo.is_some()) as u8
63            + (ext_attachment.is_some()) as u8
64            + (ext_unknown.len() as u8);
65        #[cfg(feature = "shared-memory")]
66        {
67            n_exts += ext_shm.is_some() as u8;
68        }
69        if n_exts != 0 {
70            header |= flag::Z;
71        }
72        self.write(&mut *writer, header)?;
73
74        // Body
75        if let Some(ts) = timestamp.as_ref() {
76            self.write(&mut *writer, ts)?;
77        }
78        if encoding != &Encoding::empty() {
79            self.write(&mut *writer, encoding)?;
80        }
81
82        // Extensions
83        if let Some(sinfo) = ext_sinfo.as_ref() {
84            n_exts -= 1;
85            self.write(&mut *writer, (sinfo, n_exts != 0))?;
86        }
87        #[cfg(feature = "shared-memory")]
88        if let Some(eshm) = ext_shm.as_ref() {
89            n_exts -= 1;
90            self.write(&mut *writer, (eshm, n_exts != 0))?;
91        }
92        if let Some(att) = ext_attachment.as_ref() {
93            n_exts -= 1;
94            self.write(&mut *writer, (att, n_exts != 0))?;
95        }
96        for u in ext_unknown.iter() {
97            n_exts -= 1;
98            self.write(&mut *writer, (u, n_exts != 0))?;
99        }
100
101        // Payload
102        #[cfg(feature = "shared-memory")]
103        {
104            let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
105            codec.write(&mut *writer, payload)?;
106        }
107
108        #[cfg(not(feature = "shared-memory"))]
109        {
110            let bodec = Zenoh080Bounded::<u32>::new();
111            bodec.write(&mut *writer, payload)?;
112        }
113
114        Ok(())
115    }
116}
117
118impl<R> RCodec<Put, &mut R> for Zenoh080
119where
120    R: Reader,
121{
122    type Error = DidntRead;
123
124    fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
125        let header: u8 = self.read(&mut *reader)?;
126        let codec = Zenoh080Header::new(header);
127        codec.read(reader)
128    }
129}
130
131impl<R> RCodec<Put, &mut R> for Zenoh080Header
132where
133    R: Reader,
134{
135    type Error = DidntRead;
136
137    fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
138        if imsg::mid(self.header) != id::PUT {
139            return Err(DidntRead);
140        }
141
142        // Body
143        let mut timestamp: Option<uhlc::Timestamp> = None;
144        if imsg::has_flag(self.header, flag::T) {
145            timestamp = Some(self.codec.read(&mut *reader)?);
146        }
147
148        let mut encoding = Encoding::empty();
149        if imsg::has_flag(self.header, flag::E) {
150            encoding = self.codec.read(&mut *reader)?;
151        }
152
153        // Extensions
154        let mut ext_sinfo: Option<ext::SourceInfoType> = None;
155        #[cfg(feature = "shared-memory")]
156        let mut ext_shm: Option<ext::ShmType> = None;
157        let mut ext_attachment: Option<ext::AttachmentType> = None;
158        let mut ext_unknown = Vec::new();
159
160        let mut has_ext = imsg::has_flag(self.header, flag::Z);
161        while has_ext {
162            let ext: u8 = self.codec.read(&mut *reader)?;
163            let eodec = Zenoh080Header::new(ext);
164            match iext::eid(ext) {
165                ext::SourceInfo::ID => {
166                    let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
167                    ext_sinfo = Some(s);
168                    has_ext = ext;
169                }
170                #[cfg(feature = "shared-memory")]
171                ext::Shm::ID => {
172                    let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
173                    ext_shm = Some(s);
174                    has_ext = ext;
175                }
176                ext::Attachment::ID => {
177                    let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
178                    ext_attachment = Some(a);
179                    has_ext = ext;
180                }
181                _ => {
182                    let (u, ext) = extension::read(reader, "Put", ext)?;
183                    ext_unknown.push(u);
184                    has_ext = ext;
185                }
186            }
187        }
188
189        // Payload
190        let payload: ZBuf = {
191            #[cfg(feature = "shared-memory")]
192            {
193                let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
194                codec.read(&mut *reader)?
195            }
196
197            #[cfg(not(feature = "shared-memory"))]
198            {
199                let bodec = Zenoh080Bounded::<u32>::new();
200                bodec.read(&mut *reader)?
201            }
202        };
203
204        Ok(Put {
205            timestamp,
206            encoding,
207            ext_sinfo,
208            #[cfg(feature = "shared-memory")]
209            ext_shm,
210            ext_attachment,
211            ext_unknown,
212            payload,
213        })
214    }
215}