Skip to main content

zenoh_codec/core/
zbuf.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    buffer::Buffer,
16    reader::{DidntRead, Reader},
17    writer::{DidntWrite, Writer},
18    ZBuf,
19};
20
21use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Bounded};
22
23// ZBuf bounded
24macro_rules! zbuf_impl {
25    ($bound:ty) => {
26        impl LCodec<&ZBuf> for Zenoh080Bounded<$bound> {
27            fn w_len(self, message: &ZBuf) -> usize {
28                message.len()
29            }
30        }
31
32        impl<W> WCodec<&ZBuf, &mut W> for Zenoh080Bounded<$bound>
33        where
34            W: Writer,
35        {
36            type Output = Result<(), DidntWrite>;
37
38            #[inline(always)]
39            fn write(self, writer: &mut W, x: &ZBuf) -> Self::Output {
40                self.write(&mut *writer, x.len())?;
41                for s in x.zslices() {
42                    writer.write_zslice(s)?;
43                }
44                Ok(())
45            }
46        }
47
48        impl<R> RCodec<ZBuf, &mut R> for Zenoh080Bounded<$bound>
49        where
50            R: Reader,
51        {
52            type Error = DidntRead;
53
54            #[inline(always)]
55            fn read(self, reader: &mut R) -> Result<ZBuf, Self::Error> {
56                let len: usize = self.read(&mut *reader)?;
57                reader.read_zbuf(len)
58            }
59        }
60    };
61}
62
63zbuf_impl!(u8);
64zbuf_impl!(u16);
65zbuf_impl!(u32);
66zbuf_impl!(u64);
67zbuf_impl!(usize);
68
69// ZBuf flat
70impl<W> WCodec<&ZBuf, &mut W> for Zenoh080
71where
72    W: Writer,
73{
74    type Output = Result<(), DidntWrite>;
75
76    fn write(self, writer: &mut W, x: &ZBuf) -> Self::Output {
77        let zodec = Zenoh080Bounded::<usize>::new();
78        zodec.write(&mut *writer, x)
79    }
80}
81
82impl<R> RCodec<ZBuf, &mut R> for Zenoh080
83where
84    R: Reader,
85{
86    type Error = DidntRead;
87
88    fn read(self, reader: &mut R) -> Result<ZBuf, Self::Error> {
89        let zodec = Zenoh080Bounded::<usize>::new();
90        zodec.read(&mut *reader)
91    }
92}
93
94impl LCodec<&ZBuf> for Zenoh080 {
95    fn w_len(self, message: &ZBuf) -> usize {
96        let zodec = Zenoh080Bounded::<usize>::new();
97        zodec.w_len(message)
98    }
99}
100
101// ZBuf sliced
102#[cfg(feature = "shared-memory")]
103mod shm {
104    use zenoh_buffers::{ZSlice, ZSliceKind};
105    use zenoh_shm::ShmBufInner;
106
107    use super::*;
108    use crate::Zenoh080Sliced;
109
110    const RAW: u8 = 0;
111    const SHM_PTR: u8 = 1;
112
113    macro_rules! zbuf_sliced_impl {
114        ($bound:ty) => {
115            impl LCodec<&ZBuf> for Zenoh080Sliced<$bound> {
116                fn w_len(self, message: &ZBuf) -> usize {
117                    if self.is_sliced {
118                        message.zslices().fold(0, |acc, x| acc + 1 + x.len())
119                    } else {
120                        self.codec.w_len(message)
121                    }
122                }
123            }
124
125            impl<W> WCodec<&ZBuf, &mut W> for Zenoh080Sliced<$bound>
126            where
127                W: Writer,
128            {
129                type Output = Result<(), DidntWrite>;
130
131                #[inline(always)]
132                fn write(self, writer: &mut W, x: &ZBuf) -> Self::Output {
133                    if self.is_sliced {
134                        self.codec.write(&mut *writer, x.zslices().count())?;
135
136                        for zs in x.zslices() {
137                            match zs.kind {
138                                ZSliceKind::Raw => {
139                                    self.codec.write(&mut *writer, RAW)?;
140                                    self.codec.write(&mut *writer, zs)?;
141                                }
142                                ZSliceKind::ShmPtr => {
143                                    self.codec.write(&mut *writer, SHM_PTR)?;
144                                    let shmb = zs.downcast_ref::<ShmBufInner>().unwrap();
145                                    let mut info = vec![];
146                                    Zenoh080::new().write(&mut &mut info, &shmb.info)?;
147                                    self.codec.write(&mut *writer, &*info)?;
148                                    // Increase the reference count so to keep the ShmBufInner
149                                    // valid until it is received.
150                                    unsafe { shmb.inc_ref_count() };
151                                }
152                            }
153                        }
154                    } else {
155                        self.codec.write(&mut *writer, x)?;
156                    }
157
158                    Ok(())
159                }
160            }
161
162            impl<R> RCodec<ZBuf, &mut R> for Zenoh080Sliced<$bound>
163            where
164                R: Reader,
165            {
166                type Error = DidntRead;
167
168                #[inline(always)]
169                fn read(self, reader: &mut R) -> Result<ZBuf, Self::Error> {
170                    if self.is_sliced {
171                        let num: usize = self.codec.read(&mut *reader)?;
172                        let mut zbuf = ZBuf::empty();
173                        for _ in 0..num {
174                            let kind: u8 = self.codec.read(&mut *reader)?;
175                            match kind {
176                                RAW => {
177                                    let len: usize = self.codec.read(&mut *reader)?;
178                                    reader.read_zslices(len, |s| zbuf.push_zslice(s))?;
179                                }
180                                SHM_PTR => {
181                                    let mut zslice: ZSlice = self.codec.read(&mut *reader)?;
182                                    zslice.kind = ZSliceKind::ShmPtr;
183                                    zbuf.push_zslice(zslice);
184                                }
185                                _ => return Err(DidntRead),
186                            }
187                        }
188                        Ok(zbuf)
189                    } else {
190                        self.codec.read(&mut *reader)
191                    }
192                }
193            }
194        };
195    }
196
197    zbuf_sliced_impl!(u8);
198    zbuf_sliced_impl!(u16);
199    zbuf_sliced_impl!(u32);
200    zbuf_sliced_impl!(u64);
201    zbuf_sliced_impl!(usize);
202}