zenoh_codec/zenoh/
mod.rs

//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
14pub mod del;
15pub mod err;
16pub mod put;
17pub mod query;
18pub mod reply;
19
20use zenoh_buffers::{
21    reader::{DidntRead, Reader},
22    writer::{DidntWrite, Writer},
23    ZBuf,
24};
25#[cfg(feature = "shared-memory")]
26use zenoh_protocol::common::{iext, ZExtUnit};
27use zenoh_protocol::{
28    common::{imsg, ZExtZBufHeader},
29    core::{Encoding, EntityGlobalIdProto, EntityId, ZenohIdProto},
30    zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
31};
32
33#[cfg(not(feature = "shared-memory"))]
34use crate::Zenoh080Bounded;
35#[cfg(feature = "shared-memory")]
36use crate::Zenoh080Sliced;
37use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length};
38
// Push
40impl<W> WCodec<&PushBody, &mut W> for Zenoh080
41where
42    W: Writer,
43{
44    type Output = Result<(), DidntWrite>;
45
46    fn write(self, writer: &mut W, x: &PushBody) -> Self::Output {
47        match x {
48            PushBody::Put(b) => self.write(&mut *writer, b),
49            PushBody::Del(b) => self.write(&mut *writer, b),
50        }
51    }
52}
53
54impl<R> RCodec<PushBody, &mut R> for Zenoh080
55where
56    R: Reader,
57{
58    type Error = DidntRead;
59
60    fn read(self, reader: &mut R) -> Result<PushBody, Self::Error> {
61        let header: u8 = self.read(&mut *reader)?;
62
63        let codec = Zenoh080Header::new(header);
64        let body = match imsg::mid(codec.header) {
65            id::PUT => PushBody::Put(codec.read(&mut *reader)?),
66            id::DEL => PushBody::Del(codec.read(&mut *reader)?),
67            _ => return Err(DidntRead),
68        };
69
70        Ok(body)
71    }
72}
73
// Request
75impl<W> WCodec<&RequestBody, &mut W> for Zenoh080
76where
77    W: Writer,
78{
79    type Output = Result<(), DidntWrite>;
80
81    fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output {
82        match x {
83            RequestBody::Query(b) => self.write(&mut *writer, b),
84        }
85    }
86}
87
88impl<R> RCodec<RequestBody, &mut R> for Zenoh080
89where
90    R: Reader,
91{
92    type Error = DidntRead;
93
94    fn read(self, reader: &mut R) -> Result<RequestBody, Self::Error> {
95        let header: u8 = self.read(&mut *reader)?;
96
97        let codec = Zenoh080Header::new(header);
98        let body = match imsg::mid(codec.header) {
99            id::QUERY => RequestBody::Query(codec.read(&mut *reader)?),
100            _ => return Err(DidntRead),
101        };
102
103        Ok(body)
104    }
105}
106
// Response
108impl<W> WCodec<&ResponseBody, &mut W> for Zenoh080
109where
110    W: Writer,
111{
112    type Output = Result<(), DidntWrite>;
113
114    fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output {
115        match x {
116            ResponseBody::Reply(b) => self.write(&mut *writer, b),
117            ResponseBody::Err(b) => self.write(&mut *writer, b),
118        }
119    }
120}
121
122impl<R> RCodec<ResponseBody, &mut R> for Zenoh080
123where
124    R: Reader,
125{
126    type Error = DidntRead;
127
128    fn read(self, reader: &mut R) -> Result<ResponseBody, Self::Error> {
129        let header: u8 = self.read(&mut *reader)?;
130
131        let codec = Zenoh080Header::new(header);
132        let body = match imsg::mid(codec.header) {
133            id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
134            id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
135            _ => return Err(DidntRead),
136        };
137
138        Ok(body)
139    }
140}
141
// Extension: SourceInfo
143impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
144    fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
145        let ext::SourceInfoType { id, sn } = x;
146
147        1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
148    }
149}
150
151impl<W, const ID: u8> WCodec<(&ext::SourceInfoType<{ ID }>, bool), &mut W> for Zenoh080
152where
153    W: Writer,
154{
155    type Output = Result<(), DidntWrite>;
156
157    fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
158        let (x, more) = x;
159        let ext::SourceInfoType { id, sn } = x;
160
161        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
162        self.write(&mut *writer, (&header, more))?;
163
164        let flags: u8 = (id.zid.size() as u8 - 1) << 4;
165        self.write(&mut *writer, flags)?;
166
167        let lodec = Zenoh080Length::new(id.zid.size());
168        lodec.write(&mut *writer, &id.zid)?;
169
170        self.write(&mut *writer, id.eid)?;
171        self.write(&mut *writer, sn)?;
172        Ok(())
173    }
174}
175
176impl<R, const ID: u8> RCodec<(ext::SourceInfoType<{ ID }>, bool), &mut R> for Zenoh080Header
177where
178    R: Reader,
179{
180    type Error = DidntRead;
181
182    fn read(self, reader: &mut R) -> Result<(ext::SourceInfoType<{ ID }>, bool), Self::Error> {
183        let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
184
185        let flags: u8 = self.codec.read(&mut *reader)?;
186        let length = 1 + ((flags >> 4) as usize);
187
188        let lodec = Zenoh080Length::new(length);
189        let zid: ZenohIdProto = lodec.read(&mut *reader)?;
190
191        let eid: EntityId = self.codec.read(&mut *reader)?;
192        let sn: u32 = self.codec.read(&mut *reader)?;
193
194        Ok((
195            ext::SourceInfoType {
196                id: EntityGlobalIdProto { zid, eid },
197                sn,
198            },
199            more,
200        ))
201    }
202}
203
// Extension: Shm
// Encoder for the Shm extension (only built with the `shared-memory` feature).
// The extension has no body: only a `ZExtUnit<ID>` header is written.
#[cfg(feature = "shared-memory")]
impl<W: Writer, const ID: u8> WCodec<(&ext::ShmType<{ ID }>, bool), &mut W> for Zenoh080 {
    type Output = Result<(), DidntWrite>;

    fn write(self, writer: &mut W, x: (&ext::ShmType<{ ID }>, bool)) -> Self::Output {
        // `ShmType` is destructured as a unit value: there is nothing to serialize.
        let (ext::ShmType, more) = x;

        let unit: ZExtUnit<{ ID }> = ZExtUnit::new();
        self.write(&mut *writer, (&unit, more))?;

        Ok(())
    }
}
221
// Decoder for the Shm extension (only built with the `shared-memory` feature).
// Reads the `ZExtUnit<ID>` header and keeps only its `more` flag.
#[cfg(feature = "shared-memory")]
impl<R: Reader, const ID: u8> RCodec<(ext::ShmType<{ ID }>, bool), &mut R> for Zenoh080Header {
    type Error = DidntRead;

    fn read(self, reader: &mut R) -> Result<(ext::ShmType<{ ID }>, bool), Self::Error> {
        let decoded: (ZExtUnit<{ ID }>, bool) = self.read(&mut *reader)?;
        let (_, more) = decoded;

        Ok((ext::ShmType, more))
    }
}
234
// Extension: ValueType
236impl<W, const VID: u8, const SID: u8> WCodec<(&ext::ValueType<{ VID }, { SID }>, bool), &mut W>
237    for Zenoh080
238where
239    W: Writer,
240{
241    type Output = Result<(), DidntWrite>;
242
243    fn write(self, writer: &mut W, x: (&ext::ValueType<{ VID }, { SID }>, bool)) -> Self::Output {
244        let (x, more) = x;
245        let ext::ValueType {
246            encoding,
247            payload,
248            #[cfg(feature = "shared-memory")]
249            ext_shm,
250        } = x;
251
252        #[cfg(feature = "shared-memory")] // Write Shm extension if present
253        if let Some(eshm) = ext_shm.as_ref() {
254            self.write(&mut *writer, (eshm, true))?;
255        }
256
257        // Compute extension length
258        let mut len = self.w_len(encoding);
259
260        #[cfg(feature = "shared-memory")]
261        {
262            let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
263            len += codec.w_len(payload);
264        }
265
266        #[cfg(not(feature = "shared-memory"))]
267        {
268            let codec = Zenoh080Bounded::<u32>::new();
269            len += codec.w_len(payload);
270        }
271
272        // Write ZExtBuf header
273        let header: ZExtZBufHeader<{ VID }> = ZExtZBufHeader::new(len);
274        self.write(&mut *writer, (&header, more))?;
275
276        // Write encoding
277        self.write(&mut *writer, encoding)?;
278
279        // Write payload
280        fn write<W>(writer: &mut W, payload: &ZBuf) -> Result<(), DidntWrite>
281        where
282            W: Writer,
283        {
284            // Don't write the length since it is already included in the header
285            for s in payload.zslices() {
286                writer.write_zslice(s)?;
287            }
288            Ok(())
289        }
290
291        #[cfg(feature = "shared-memory")]
292        {
293            if ext_shm.is_some() {
294                let codec = Zenoh080Sliced::<u32>::new(true);
295                codec.write(&mut *writer, payload)?;
296            } else {
297                write(&mut *writer, payload)?;
298            }
299        }
300
301        #[cfg(not(feature = "shared-memory"))]
302        {
303            write(&mut *writer, payload)?;
304        }
305
306        Ok(())
307    }
308}
309
310impl<R, const VID: u8, const SID: u8> RCodec<(ext::ValueType<{ VID }, { SID }>, bool), &mut R>
311    for Zenoh080Header
312where
313    R: Reader,
314{
315    type Error = DidntRead;
316
317    fn read(
318        #[allow(unused_mut)] mut self,
319        reader: &mut R,
320    ) -> Result<(ext::ValueType<{ VID }, { SID }>, bool), Self::Error> {
321        #[cfg(feature = "shared-memory")]
322        let ext_shm = if iext::eid(self.header) == SID {
323            self.header = self.codec.read(&mut *reader)?;
324            Some(ext::ShmType)
325        } else {
326            None
327        };
328        let (header, more): (ZExtZBufHeader<{ VID }>, bool) = self.read(&mut *reader)?;
329
330        // Read encoding
331        let start = reader.remaining();
332        let encoding: Encoding = self.codec.read(&mut *reader)?;
333        let end = reader.remaining();
334
335        // Read payload
336        fn read<R>(reader: &mut R, len: usize) -> Result<ZBuf, DidntRead>
337        where
338            R: Reader,
339        {
340            let mut payload = ZBuf::empty();
341            reader.read_zslices(len, |s| payload.push_zslice(s))?;
342            Ok(payload)
343        }
344
345        // Calculate how many bytes are left in the payload
346        let len = header.len - (start - end);
347
348        let payload: ZBuf = {
349            #[cfg(feature = "shared-memory")]
350            {
351                if ext_shm.is_some() {
352                    let codec = Zenoh080Sliced::<u32>::new(true);
353                    let payload: ZBuf = codec.read(&mut *reader)?;
354                    payload
355                } else {
356                    read(&mut *reader, len)?
357                }
358            }
359
360            #[cfg(not(feature = "shared-memory"))]
361            {
362                read(&mut *reader, len)?
363            }
364        };
365
366        Ok((
367            ext::ValueType {
368                #[cfg(feature = "shared-memory")]
369                ext_shm,
370                encoding,
371                payload,
372            },
373            more,
374        ))
375    }
376}
377
// Extension: Attachment
379impl<W, const ID: u8> WCodec<(&ext::AttachmentType<{ ID }>, bool), &mut W> for Zenoh080
380where
381    W: Writer,
382{
383    type Output = Result<(), DidntWrite>;
384
385    fn write(self, writer: &mut W, x: (&ext::AttachmentType<{ ID }>, bool)) -> Self::Output {
386        let (x, more) = x;
387        let ext::AttachmentType { buffer } = x;
388
389        let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(buffer));
390        self.write(&mut *writer, (&header, more))?;
391        for s in buffer.zslices() {
392            writer.write_zslice(s)?;
393        }
394
395        Ok(())
396    }
397}
398
399impl<R, const ID: u8> RCodec<(ext::AttachmentType<{ ID }>, bool), &mut R> for Zenoh080Header
400where
401    R: Reader,
402{
403    type Error = DidntRead;
404
405    fn read(self, reader: &mut R) -> Result<(ext::AttachmentType<{ ID }>, bool), Self::Error> {
406        let (h, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
407        let mut buffer = ZBuf::empty();
408        reader.read_zslices(h.len, |s| buffer.push_zslice(s))?;
409
410        Ok((ext::AttachmentType { buffer }, more))
411    }
412}