Skip to main content

zenoh_codec/zenoh/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod del;
15pub mod err;
16pub mod put;
17pub mod query;
18pub mod reply;
19
20use zenoh_buffers::{
21    reader::{DidntRead, Reader},
22    writer::{DidntWrite, Writer},
23    ZBuf,
24};
25#[cfg(feature = "shared-memory")]
26use zenoh_protocol::common::{iext, ZExtUnit};
27use zenoh_protocol::{
28    common::{imsg, ZExtZBufHeader},
29    core::{Encoding, EntityGlobalIdProto, EntityId, ZenohIdProto},
30    zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
31};
32
33#[cfg(not(feature = "shared-memory"))]
34use crate::Zenoh080Bounded;
35#[cfg(feature = "shared-memory")]
36use crate::Zenoh080Sliced;
37use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length};
38
39// Push
40impl<W> WCodec<&PushBody, &mut W> for Zenoh080
41where
42    W: Writer,
43{
44    type Output = Result<(), DidntWrite>;
45
46    #[inline(always)]
47    fn write(self, writer: &mut W, x: &PushBody) -> Self::Output {
48        if let PushBody::Put(b) = x {
49            return self.write(&mut *writer, b);
50        }
51        #[cold]
52        fn write_del<W: Writer>(
53            codec: Zenoh080,
54            writer: &mut W,
55            x: &PushBody,
56        ) -> Result<(), DidntWrite> {
57            match x {
58                PushBody::Del(b) => codec.write(&mut *writer, b),
59                _ => Err(DidntWrite),
60            }
61        }
62        write_del(self, writer, x)
63    }
64}
65
66impl<R> RCodec<PushBody, &mut R> for Zenoh080
67where
68    R: Reader,
69{
70    type Error = DidntRead;
71
72    #[inline(always)]
73    fn read(self, reader: &mut R) -> Result<PushBody, Self::Error> {
74        let header: u8 = self.read(&mut *reader)?;
75
76        let codec = Zenoh080Header::new(header);
77        if imsg::mid(codec.header) == id::PUT {
78            return Ok(PushBody::Put(codec.read(&mut *reader)?));
79        }
80        #[cold]
81        fn read_del<R: Reader>(
82            codec: Zenoh080Header,
83            reader: &mut R,
84        ) -> Result<PushBody, DidntRead> {
85            match imsg::mid(codec.header) {
86                id::DEL => Ok(PushBody::Del(codec.read(&mut *reader)?)),
87                _ => Err(DidntRead),
88            }
89        }
90        read_del(codec, reader)
91    }
92}
93
94// Request
95impl<W> WCodec<&RequestBody, &mut W> for Zenoh080
96where
97    W: Writer,
98{
99    type Output = Result<(), DidntWrite>;
100
101    fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output {
102        match x {
103            RequestBody::Query(b) => self.write(&mut *writer, b),
104        }
105    }
106}
107
108impl<R> RCodec<RequestBody, &mut R> for Zenoh080
109where
110    R: Reader,
111{
112    type Error = DidntRead;
113
114    fn read(self, reader: &mut R) -> Result<RequestBody, Self::Error> {
115        let header: u8 = self.read(&mut *reader)?;
116
117        let codec = Zenoh080Header::new(header);
118        let body = match imsg::mid(codec.header) {
119            id::QUERY => RequestBody::Query(codec.read(&mut *reader)?),
120            _ => return Err(DidntRead),
121        };
122
123        Ok(body)
124    }
125}
126
127// Response
128impl<W> WCodec<&ResponseBody, &mut W> for Zenoh080
129where
130    W: Writer,
131{
132    type Output = Result<(), DidntWrite>;
133
134    fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output {
135        match x {
136            ResponseBody::Reply(b) => self.write(&mut *writer, b),
137            ResponseBody::Err(b) => self.write(&mut *writer, b),
138        }
139    }
140}
141
142impl<R> RCodec<ResponseBody, &mut R> for Zenoh080
143where
144    R: Reader,
145{
146    type Error = DidntRead;
147
148    fn read(self, reader: &mut R) -> Result<ResponseBody, Self::Error> {
149        let header: u8 = self.read(&mut *reader)?;
150
151        let codec = Zenoh080Header::new(header);
152        let body = match imsg::mid(codec.header) {
153            id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
154            id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
155            _ => return Err(DidntRead),
156        };
157
158        Ok(body)
159    }
160}
161
162// Extension: SourceInfo
163impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
164    fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
165        let ext::SourceInfoType { id, sn } = x;
166
167        1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
168    }
169}
170
171impl<W, const ID: u8> WCodec<(&ext::SourceInfoType<{ ID }>, bool), &mut W> for Zenoh080
172where
173    W: Writer,
174{
175    type Output = Result<(), DidntWrite>;
176
177    fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
178        let (x, more) = x;
179        let ext::SourceInfoType { id, sn } = x;
180
181        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
182        self.write(&mut *writer, (&header, more))?;
183
184        let flags: u8 = (id.zid.size() as u8 - 1) << 4;
185        self.write(&mut *writer, flags)?;
186
187        let lodec = Zenoh080Length::new(id.zid.size());
188        lodec.write(&mut *writer, &id.zid)?;
189
190        self.write(&mut *writer, id.eid)?;
191        self.write(&mut *writer, sn)?;
192        Ok(())
193    }
194}
195
196impl<R, const ID: u8> RCodec<(ext::SourceInfoType<{ ID }>, bool), &mut R> for Zenoh080Header
197where
198    R: Reader,
199{
200    type Error = DidntRead;
201
202    fn read(self, reader: &mut R) -> Result<(ext::SourceInfoType<{ ID }>, bool), Self::Error> {
203        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
204
205        let flags: u8 = self.codec.read(&mut *reader)?;
206        let length = 1 + ((flags >> 4) as usize);
207
208        let lodec = Zenoh080Length::new(length);
209        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
210
211        let eid: EntityId = self.codec.read(&mut *reader)?;
212        let sn: u32 = self.codec.read(&mut *reader)?;
213
214        Ok((
215            ext::SourceInfoType {
216                id: EntityGlobalIdProto { zid, eid },
217                sn,
218            },
219            more,
220        ))
221    }
222}
223
224// Extension: Shm
225#[cfg(feature = "shared-memory")]
226impl<W, const ID: u8> WCodec<(&ext::ShmType<{ ID }>, bool), &mut W> for Zenoh080
227where
228    W: Writer,
229{
230    type Output = Result<(), DidntWrite>;
231
232    fn write(self, writer: &mut W, x: (&ext::ShmType<{ ID }>, bool)) -> Self::Output {
233        let (x, more) = x;
234        let ext::ShmType = x;
235
236        let header: ZExtUnit<{ ID }> = ZExtUnit::new();
237        self.write(&mut *writer, (&header, more))?;
238        Ok(())
239    }
240}
241
242#[cfg(feature = "shared-memory")]
243impl<R, const ID: u8> RCodec<(ext::ShmType<{ ID }>, bool), &mut R> for Zenoh080Header
244where
245    R: Reader,
246{
247    type Error = DidntRead;
248
249    fn read(self, reader: &mut R) -> Result<(ext::ShmType<{ ID }>, bool), Self::Error> {
250        let (_, more): (ZExtUnit<{ ID }>, bool) = self.read(&mut *reader)?;
251        Ok((ext::ShmType, more))
252    }
253}
254
255// Extension ValueType
256impl<W, const VID: u8, const SID: u8> WCodec<(&ext::ValueType<{ VID }, { SID }>, bool), &mut W>
257    for Zenoh080
258where
259    W: Writer,
260{
261    type Output = Result<(), DidntWrite>;
262
263    fn write(self, writer: &mut W, x: (&ext::ValueType<{ VID }, { SID }>, bool)) -> Self::Output {
264        let (x, more) = x;
265        let ext::ValueType {
266            encoding,
267            payload,
268            #[cfg(feature = "shared-memory")]
269            ext_shm,
270        } = x;
271
272        #[cfg(feature = "shared-memory")] // Write Shm extension if present
273        if let Some(eshm) = ext_shm.as_ref() {
274            self.write(&mut *writer, (eshm, true))?;
275        }
276
277        // Compute extension length
278        let mut len = self.w_len(encoding);
279
280        #[cfg(feature = "shared-memory")]
281        {
282            let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
283            len += codec.w_len(payload);
284        }
285
286        #[cfg(not(feature = "shared-memory"))]
287        {
288            let codec = Zenoh080Bounded::<u32>::new();
289            len += codec.w_len(payload);
290        }
291
292        // Write ZExtBuf header
293        let header: ZExtZBufHeader<{ VID }> = ZExtZBufHeader::new(len);
294        self.write(&mut *writer, (&header, more))?;
295
296        // Write encoding
297        self.write(&mut *writer, encoding)?;
298
299        // Write payload
300        fn write<W>(writer: &mut W, payload: &ZBuf) -> Result<(), DidntWrite>
301        where
302            W: Writer,
303        {
304            // Don't write the length since it is already included in the header
305            for s in payload.zslices() {
306                writer.write_zslice(s)?;
307            }
308            Ok(())
309        }
310
311        #[cfg(feature = "shared-memory")]
312        {
313            if ext_shm.is_some() {
314                let codec = Zenoh080Sliced::<u32>::new(true);
315                codec.write(&mut *writer, payload)?;
316            } else {
317                write(&mut *writer, payload)?;
318            }
319        }
320
321        #[cfg(not(feature = "shared-memory"))]
322        {
323            write(&mut *writer, payload)?;
324        }
325
326        Ok(())
327    }
328}
329
330impl<R, const VID: u8, const SID: u8> RCodec<(ext::ValueType<{ VID }, { SID }>, bool), &mut R>
331    for Zenoh080Header
332where
333    R: Reader,
334{
335    type Error = DidntRead;
336
337    fn read(
338        #[allow(unused_mut)] mut self,
339        reader: &mut R,
340    ) -> Result<(ext::ValueType<{ VID }, { SID }>, bool), Self::Error> {
341        #[cfg(feature = "shared-memory")]
342        let ext_shm = if iext::eid(self.header) == SID {
343            self.header = self.codec.read(&mut *reader)?;
344            Some(ext::ShmType)
345        } else {
346            None
347        };
348        let (header, more): (ZExtZBufHeader<{ VID }>, bool) = self.read(&mut *reader)?;
349
350        // Read encoding
351        let start = reader.remaining();
352        let encoding: Encoding = self.codec.read(&mut *reader)?;
353        let end = reader.remaining();
354
355        // Read payload
356        fn read<R>(reader: &mut R, len: usize) -> Result<ZBuf, DidntRead>
357        where
358            R: Reader,
359        {
360            reader.read_zbuf(len)
361        }
362
363        // Calculate how many bytes are left in the payload
364        let len = header.len - (start - end);
365
366        let payload: ZBuf = {
367            #[cfg(feature = "shared-memory")]
368            {
369                if ext_shm.is_some() {
370                    let codec = Zenoh080Sliced::<u32>::new(true);
371                    let payload: ZBuf = codec.read(&mut *reader)?;
372                    payload
373                } else {
374                    read(&mut *reader, len)?
375                }
376            }
377
378            #[cfg(not(feature = "shared-memory"))]
379            {
380                read(&mut *reader, len)?
381            }
382        };
383
384        Ok((
385            ext::ValueType {
386                #[cfg(feature = "shared-memory")]
387                ext_shm,
388                encoding,
389                payload,
390            },
391            more,
392        ))
393    }
394}
395
396// Extension: Attachment
397impl<W, const ID: u8> WCodec<(&ext::AttachmentType<{ ID }>, bool), &mut W> for Zenoh080
398where
399    W: Writer,
400{
401    type Output = Result<(), DidntWrite>;
402
403    fn write(self, writer: &mut W, x: (&ext::AttachmentType<{ ID }>, bool)) -> Self::Output {
404        let (x, more) = x;
405        let ext::AttachmentType { buffer } = x;
406
407        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(buffer));
408        self.write(&mut *writer, (&header, more))?;
409        for s in buffer.zslices() {
410            writer.write_zslice(s)?;
411        }
412
413        Ok(())
414    }
415}
416
417impl<R, const ID: u8> RCodec<(ext::AttachmentType<{ ID }>, bool), &mut R> for Zenoh080Header
418where
419    R: Reader,
420{
421    type Error = DidntRead;
422
423    fn read(self, reader: &mut R) -> Result<(ext::AttachmentType<{ ID }>, bool), Self::Error> {
424        let (h, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
425        let buffer = reader.read_zbuf(h.len)?;
426
427        Ok((ext::AttachmentType { buffer }, more))
428    }
429}