1pub mod del;
15pub mod err;
16pub mod put;
17pub mod query;
18pub mod reply;
19
20use zenoh_buffers::{
21 reader::{DidntRead, Reader},
22 writer::{DidntWrite, Writer},
23 ZBuf,
24};
25#[cfg(feature = "shared-memory")]
26use zenoh_protocol::common::{iext, ZExtUnit};
27use zenoh_protocol::{
28 common::{imsg, ZExtZBufHeader},
29 core::{Encoding, EntityGlobalIdProto, EntityId, ZenohIdProto},
30 zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
31};
32
33#[cfg(not(feature = "shared-memory"))]
34use crate::Zenoh080Bounded;
35#[cfg(feature = "shared-memory")]
36use crate::Zenoh080Sliced;
37use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length};
38
39impl<W> WCodec<&PushBody, &mut W> for Zenoh080
41where
42 W: Writer,
43{
44 type Output = Result<(), DidntWrite>;
45
46 #[inline(always)]
47 fn write(self, writer: &mut W, x: &PushBody) -> Self::Output {
48 if let PushBody::Put(b) = x {
49 return self.write(&mut *writer, b);
50 }
51 #[cold]
52 fn write_del<W: Writer>(
53 codec: Zenoh080,
54 writer: &mut W,
55 x: &PushBody,
56 ) -> Result<(), DidntWrite> {
57 match x {
58 PushBody::Del(b) => codec.write(&mut *writer, b),
59 _ => Err(DidntWrite),
60 }
61 }
62 write_del(self, writer, x)
63 }
64}
65
66impl<R> RCodec<PushBody, &mut R> for Zenoh080
67where
68 R: Reader,
69{
70 type Error = DidntRead;
71
72 #[inline(always)]
73 fn read(self, reader: &mut R) -> Result<PushBody, Self::Error> {
74 let header: u8 = self.read(&mut *reader)?;
75
76 let codec = Zenoh080Header::new(header);
77 if imsg::mid(codec.header) == id::PUT {
78 return Ok(PushBody::Put(codec.read(&mut *reader)?));
79 }
80 #[cold]
81 fn read_del<R: Reader>(
82 codec: Zenoh080Header,
83 reader: &mut R,
84 ) -> Result<PushBody, DidntRead> {
85 match imsg::mid(codec.header) {
86 id::DEL => Ok(PushBody::Del(codec.read(&mut *reader)?)),
87 _ => Err(DidntRead),
88 }
89 }
90 read_del(codec, reader)
91 }
92}
93
94impl<W> WCodec<&RequestBody, &mut W> for Zenoh080
96where
97 W: Writer,
98{
99 type Output = Result<(), DidntWrite>;
100
101 fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output {
102 match x {
103 RequestBody::Query(b) => self.write(&mut *writer, b),
104 }
105 }
106}
107
108impl<R> RCodec<RequestBody, &mut R> for Zenoh080
109where
110 R: Reader,
111{
112 type Error = DidntRead;
113
114 fn read(self, reader: &mut R) -> Result<RequestBody, Self::Error> {
115 let header: u8 = self.read(&mut *reader)?;
116
117 let codec = Zenoh080Header::new(header);
118 let body = match imsg::mid(codec.header) {
119 id::QUERY => RequestBody::Query(codec.read(&mut *reader)?),
120 _ => return Err(DidntRead),
121 };
122
123 Ok(body)
124 }
125}
126
127impl<W> WCodec<&ResponseBody, &mut W> for Zenoh080
129where
130 W: Writer,
131{
132 type Output = Result<(), DidntWrite>;
133
134 fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output {
135 match x {
136 ResponseBody::Reply(b) => self.write(&mut *writer, b),
137 ResponseBody::Err(b) => self.write(&mut *writer, b),
138 }
139 }
140}
141
142impl<R> RCodec<ResponseBody, &mut R> for Zenoh080
143where
144 R: Reader,
145{
146 type Error = DidntRead;
147
148 fn read(self, reader: &mut R) -> Result<ResponseBody, Self::Error> {
149 let header: u8 = self.read(&mut *reader)?;
150
151 let codec = Zenoh080Header::new(header);
152 let body = match imsg::mid(codec.header) {
153 id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
154 id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
155 _ => return Err(DidntRead),
156 };
157
158 Ok(body)
159 }
160}
161
162impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
164 fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
165 let ext::SourceInfoType { id, sn } = x;
166
167 1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
168 }
169}
170
171impl<W, const ID: u8> WCodec<(&ext::SourceInfoType<{ ID }>, bool), &mut W> for Zenoh080
172where
173 W: Writer,
174{
175 type Output = Result<(), DidntWrite>;
176
177 fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
178 let (x, more) = x;
179 let ext::SourceInfoType { id, sn } = x;
180
181 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
182 self.write(&mut *writer, (&header, more))?;
183
184 let flags: u8 = (id.zid.size() as u8 - 1) << 4;
185 self.write(&mut *writer, flags)?;
186
187 let lodec = Zenoh080Length::new(id.zid.size());
188 lodec.write(&mut *writer, &id.zid)?;
189
190 self.write(&mut *writer, id.eid)?;
191 self.write(&mut *writer, sn)?;
192 Ok(())
193 }
194}
195
196impl<R, const ID: u8> RCodec<(ext::SourceInfoType<{ ID }>, bool), &mut R> for Zenoh080Header
197where
198 R: Reader,
199{
200 type Error = DidntRead;
201
202 fn read(self, reader: &mut R) -> Result<(ext::SourceInfoType<{ ID }>, bool), Self::Error> {
203 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
204
205 let flags: u8 = self.codec.read(&mut *reader)?;
206 let length = 1 + ((flags >> 4) as usize);
207
208 let lodec = Zenoh080Length::new(length);
209 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
210
211 let eid: EntityId = self.codec.read(&mut *reader)?;
212 let sn: u32 = self.codec.read(&mut *reader)?;
213
214 Ok((
215 ext::SourceInfoType {
216 id: EntityGlobalIdProto { zid, eid },
217 sn,
218 },
219 more,
220 ))
221 }
222}
223
224#[cfg(feature = "shared-memory")]
226impl<W, const ID: u8> WCodec<(&ext::ShmType<{ ID }>, bool), &mut W> for Zenoh080
227where
228 W: Writer,
229{
230 type Output = Result<(), DidntWrite>;
231
232 fn write(self, writer: &mut W, x: (&ext::ShmType<{ ID }>, bool)) -> Self::Output {
233 let (x, more) = x;
234 let ext::ShmType = x;
235
236 let header: ZExtUnit<{ ID }> = ZExtUnit::new();
237 self.write(&mut *writer, (&header, more))?;
238 Ok(())
239 }
240}
241
242#[cfg(feature = "shared-memory")]
243impl<R, const ID: u8> RCodec<(ext::ShmType<{ ID }>, bool), &mut R> for Zenoh080Header
244where
245 R: Reader,
246{
247 type Error = DidntRead;
248
249 fn read(self, reader: &mut R) -> Result<(ext::ShmType<{ ID }>, bool), Self::Error> {
250 let (_, more): (ZExtUnit<{ ID }>, bool) = self.read(&mut *reader)?;
251 Ok((ext::ShmType, more))
252 }
253}
254
255impl<W, const VID: u8, const SID: u8> WCodec<(&ext::ValueType<{ VID }, { SID }>, bool), &mut W>
257 for Zenoh080
258where
259 W: Writer,
260{
261 type Output = Result<(), DidntWrite>;
262
263 fn write(self, writer: &mut W, x: (&ext::ValueType<{ VID }, { SID }>, bool)) -> Self::Output {
264 let (x, more) = x;
265 let ext::ValueType {
266 encoding,
267 payload,
268 #[cfg(feature = "shared-memory")]
269 ext_shm,
270 } = x;
271
272 #[cfg(feature = "shared-memory")] if let Some(eshm) = ext_shm.as_ref() {
274 self.write(&mut *writer, (eshm, true))?;
275 }
276
277 let mut len = self.w_len(encoding);
279
280 #[cfg(feature = "shared-memory")]
281 {
282 let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
283 len += codec.w_len(payload);
284 }
285
286 #[cfg(not(feature = "shared-memory"))]
287 {
288 let codec = Zenoh080Bounded::<u32>::new();
289 len += codec.w_len(payload);
290 }
291
292 let header: ZExtZBufHeader<{ VID }> = ZExtZBufHeader::new(len);
294 self.write(&mut *writer, (&header, more))?;
295
296 self.write(&mut *writer, encoding)?;
298
299 fn write<W>(writer: &mut W, payload: &ZBuf) -> Result<(), DidntWrite>
301 where
302 W: Writer,
303 {
304 for s in payload.zslices() {
306 writer.write_zslice(s)?;
307 }
308 Ok(())
309 }
310
311 #[cfg(feature = "shared-memory")]
312 {
313 if ext_shm.is_some() {
314 let codec = Zenoh080Sliced::<u32>::new(true);
315 codec.write(&mut *writer, payload)?;
316 } else {
317 write(&mut *writer, payload)?;
318 }
319 }
320
321 #[cfg(not(feature = "shared-memory"))]
322 {
323 write(&mut *writer, payload)?;
324 }
325
326 Ok(())
327 }
328}
329
330impl<R, const VID: u8, const SID: u8> RCodec<(ext::ValueType<{ VID }, { SID }>, bool), &mut R>
331 for Zenoh080Header
332where
333 R: Reader,
334{
335 type Error = DidntRead;
336
337 fn read(
338 #[allow(unused_mut)] mut self,
339 reader: &mut R,
340 ) -> Result<(ext::ValueType<{ VID }, { SID }>, bool), Self::Error> {
341 #[cfg(feature = "shared-memory")]
342 let ext_shm = if iext::eid(self.header) == SID {
343 self.header = self.codec.read(&mut *reader)?;
344 Some(ext::ShmType)
345 } else {
346 None
347 };
348 let (header, more): (ZExtZBufHeader<{ VID }>, bool) = self.read(&mut *reader)?;
349
350 let start = reader.remaining();
352 let encoding: Encoding = self.codec.read(&mut *reader)?;
353 let end = reader.remaining();
354
355 fn read<R>(reader: &mut R, len: usize) -> Result<ZBuf, DidntRead>
357 where
358 R: Reader,
359 {
360 reader.read_zbuf(len)
361 }
362
363 let len = header.len - (start - end);
365
366 let payload: ZBuf = {
367 #[cfg(feature = "shared-memory")]
368 {
369 if ext_shm.is_some() {
370 let codec = Zenoh080Sliced::<u32>::new(true);
371 let payload: ZBuf = codec.read(&mut *reader)?;
372 payload
373 } else {
374 read(&mut *reader, len)?
375 }
376 }
377
378 #[cfg(not(feature = "shared-memory"))]
379 {
380 read(&mut *reader, len)?
381 }
382 };
383
384 Ok((
385 ext::ValueType {
386 #[cfg(feature = "shared-memory")]
387 ext_shm,
388 encoding,
389 payload,
390 },
391 more,
392 ))
393 }
394}
395
396impl<W, const ID: u8> WCodec<(&ext::AttachmentType<{ ID }>, bool), &mut W> for Zenoh080
398where
399 W: Writer,
400{
401 type Output = Result<(), DidntWrite>;
402
403 fn write(self, writer: &mut W, x: (&ext::AttachmentType<{ ID }>, bool)) -> Self::Output {
404 let (x, more) = x;
405 let ext::AttachmentType { buffer } = x;
406
407 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(buffer));
408 self.write(&mut *writer, (&header, more))?;
409 for s in buffer.zslices() {
410 writer.write_zslice(s)?;
411 }
412
413 Ok(())
414 }
415}
416
417impl<R, const ID: u8> RCodec<(ext::AttachmentType<{ ID }>, bool), &mut R> for Zenoh080Header
418where
419 R: Reader,
420{
421 type Error = DidntRead;
422
423 fn read(self, reader: &mut R) -> Result<(ext::AttachmentType<{ ID }>, bool), Self::Error> {
424 let (h, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
425 let buffer = reader.read_zbuf(h.len)?;
426
427 Ok((ext::AttachmentType { buffer }, more))
428 }
429}