1use alloc::vec::Vec;
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19 ZBuf,
20};
21use zenoh_protocol::{
22 common::{iext, imsg, ZExtUnknown},
23 core::Encoding,
24 zenoh::{
25 id,
26 put::{ext, flag, Put},
27 },
28};
29
30#[cfg(not(feature = "shared-memory"))]
31use crate::Zenoh080Bounded;
32#[cfg(feature = "shared-memory")]
33use crate::Zenoh080Sliced;
34use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
35
36impl<W> WCodec<&Put, &mut W> for Zenoh080
37where
38 W: Writer,
39{
40 type Output = Result<(), DidntWrite>;
41
42 #[inline(always)]
43 fn write(self, writer: &mut W, x: &Put) -> Self::Output {
44 let Put {
45 timestamp,
46 encoding,
47 ext_sinfo,
48 ext_attachment,
49 #[cfg(feature = "shared-memory")]
50 ext_shm,
51 ext_unknown,
52 payload,
53 }: &Put = x;
54
55 let mut header = id::PUT;
57 if timestamp.is_some() {
58 header |= flag::T;
59 }
60 if encoding != &Encoding::empty() {
61 header |= flag::E;
62 }
63 let mut n_exts = (ext_sinfo.is_some()) as u8
64 + (ext_attachment.is_some()) as u8
65 + (ext_unknown.len() as u8);
66 #[cfg(feature = "shared-memory")]
67 {
68 n_exts += ext_shm.is_some() as u8;
69 }
70 if n_exts != 0 {
71 header |= flag::Z;
72 }
73 self.write(&mut *writer, header)?;
74
75 if let Some(ts) = timestamp.as_ref() {
77 self.write(&mut *writer, ts)?;
78 }
79 if encoding != &Encoding::empty() {
80 self.write(&mut *writer, encoding)?;
81 }
82
83 if let Some(sinfo) = ext_sinfo.as_ref() {
85 n_exts -= 1;
86 self.write(&mut *writer, (sinfo, n_exts != 0))?;
87 }
88 #[cfg(feature = "shared-memory")]
89 if let Some(eshm) = ext_shm.as_ref() {
90 n_exts -= 1;
91 self.write(&mut *writer, (eshm, n_exts != 0))?;
92 }
93 if let Some(att) = ext_attachment.as_ref() {
94 n_exts -= 1;
95 self.write(&mut *writer, (att, n_exts != 0))?;
96 }
97 for u in ext_unknown.iter() {
98 n_exts -= 1;
99 self.write(&mut *writer, (u, n_exts != 0))?;
100 }
101
102 #[cfg(feature = "shared-memory")]
104 {
105 let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
106 codec.write(&mut *writer, payload)?;
107 }
108
109 #[cfg(not(feature = "shared-memory"))]
110 {
111 let bodec = Zenoh080Bounded::<u32>::new();
112 bodec.write(&mut *writer, payload)?;
113 }
114
115 Ok(())
116 }
117}
118
119impl<R> RCodec<Put, &mut R> for Zenoh080
120where
121 R: Reader,
122{
123 type Error = DidntRead;
124
125 fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
126 let header: u8 = self.read(&mut *reader)?;
127 let codec = Zenoh080Header::new(header);
128 codec.read(reader)
129 }
130}
131
132impl<R> RCodec<Put, &mut R> for Zenoh080Header
133where
134 R: Reader,
135{
136 type Error = DidntRead;
137
138 #[inline(always)]
139 fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
140 if imsg::mid(self.header) != id::PUT {
141 return Err(DidntRead);
142 }
143
144 let mut timestamp: Option<uhlc::Timestamp> = None;
146 if imsg::has_flag(self.header, flag::T) {
147 #[cold]
148 fn read_timestampt<R: Reader>(reader: &mut R) -> Result<uhlc::Timestamp, DidntRead> {
149 let codec = Zenoh080::new();
150 codec.read(&mut *reader)
151 }
152 timestamp = Some(read_timestampt(reader)?);
153 }
154
155 let mut encoding = Encoding::empty();
156 if imsg::has_flag(self.header, flag::E) {
157 #[cold]
158 fn read_encoding<R: Reader>(reader: &mut R) -> Result<Encoding, DidntRead> {
159 let codec = Zenoh080::new();
160 codec.read(&mut *reader)
161 }
162 encoding = read_encoding(reader)?;
163 }
164
165 let mut ext_sinfo: Option<ext::SourceInfoType> = None;
167 #[cfg(feature = "shared-memory")]
168 let mut ext_shm: Option<ext::ShmType> = None;
169 let mut ext_attachment: Option<ext::AttachmentType> = None;
170 let mut ext_unknown = Vec::new();
171
172 let mut has_ext = imsg::has_flag(self.header, flag::Z);
173 while has_ext {
174 #[cold]
175 fn read_exts<R: Reader>(
176 reader: &mut R,
177 ext_sinfo: &mut Option<ext::SourceInfoType>,
178 #[cfg(feature = "shared-memory")] ext_shm: &mut Option<ext::ShmType>,
179 ext_attachment: &mut Option<ext::AttachmentType>,
180 ext_unknown: &mut Vec<ZExtUnknown>,
181 ) -> Result<bool, DidntRead> {
182 let codec = Zenoh080::new();
183 let ext: u8 = codec.read(&mut *reader)?;
184 let eodec = Zenoh080Header::new(ext);
185 Ok(match iext::eid(ext) {
186 ext::SourceInfo::ID => {
187 let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
188 *ext_sinfo = Some(s);
189 ext
190 }
191 #[cfg(feature = "shared-memory")]
192 ext::Shm::ID => {
193 let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
194 *ext_shm = Some(s);
195 ext
196 }
197 ext::Attachment::ID => {
198 let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
199 *ext_attachment = Some(a);
200 ext
201 }
202 _ => {
203 let (u, ext) = extension::read(reader, "Put", ext)?;
204 ext_unknown.push(u);
205 ext
206 }
207 })
208 }
209 has_ext = read_exts(
210 reader,
211 &mut ext_sinfo,
212 #[cfg(feature = "shared-memory")]
213 &mut ext_shm,
214 &mut ext_attachment,
215 &mut ext_unknown,
216 )?;
217 }
218
219 let payload: ZBuf = {
221 #[cfg(feature = "shared-memory")]
222 {
223 let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
224 codec.read(&mut *reader)?
225 }
226
227 #[cfg(not(feature = "shared-memory"))]
228 {
229 let bodec = Zenoh080Bounded::<u32>::new();
230 bodec.read(&mut *reader)?
231 }
232 };
233
234 Ok(Put {
235 timestamp,
236 encoding,
237 ext_sinfo,
238 #[cfg(feature = "shared-memory")]
239 ext_shm,
240 ext_attachment,
241 ext_unknown,
242 payload,
243 })
244 }
245}