Skip to main content

zenoh/api/
bytes.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ZBytes primitives.
16use std::{borrow::Cow, fmt::Debug, mem, str::Utf8Error};
17
18use zenoh_buffers::{
19    buffer::{Buffer, SplitBuffer},
20    reader::{HasReader, Reader},
21    ZBuf, ZBufReader, ZSlice, ZSliceBuffer,
22};
23use zenoh_protocol::zenoh::ext::AttachmentType;
24
25/// Technical wrapper type for API ergonomicity.
26///
27/// It allows any type `T` implementing `Into<ZBytes>` to be converted into `Option<ZBytes>`.
28///
29/// This type is unlikely to be used explicitly by the user. Its purpose is to allow passing types
30/// like `&str`, `String`, `&[u8]`, `Vec<u8>`, etc. to API methods
31/// that accept an optional payload, like the [`attachment`](crate::pubsub::PublicationBuilder::attachment).
32///
33/// # Examples
34/// ```no_run
35/// # #[tokio::main]
36/// # async fn main() {
37/// # let session = zenoh::open(zenoh::Config::default()).await.unwrap();
38/// let publisher = session.declare_publisher("key/expression").await.unwrap();
39/// // Set an attachment value directly
40/// publisher.put("value").attachment("metadata").await.unwrap();
41/// // Set an attachment value from the variable, which can contain the attachment data or not.
42/// let maybe_attachment: Option<String> = Some("metadata".to_string());
43/// publisher.put("value").attachment(maybe_attachment).await.unwrap();
44/// # }
45/// ```
46#[repr(transparent)]
47#[derive(Clone, Debug, Default, PartialEq, Eq)]
48pub struct OptionZBytes(Option<ZBytes>);
49
50impl<T> From<T> for OptionZBytes
51where
52    T: Into<ZBytes>,
53{
54    fn from(value: T) -> Self {
55        Self(Some(value.into()))
56    }
57}
58
59impl<T> From<Option<T>> for OptionZBytes
60where
61    T: Into<ZBytes>,
62{
63    fn from(mut value: Option<T>) -> Self {
64        match value.take() {
65            Some(v) => Self(Some(v.into())),
66            None => Self(None),
67        }
68    }
69}
70
71impl<T> From<&Option<T>> for OptionZBytes
72where
73    for<'a> &'a T: Into<ZBytes>,
74{
75    fn from(value: &Option<T>) -> Self {
76        match value.as_ref() {
77            Some(v) => Self(Some(v.into())),
78            None => Self(None),
79        }
80    }
81}
82
83impl From<OptionZBytes> for Option<ZBytes> {
84    fn from(value: OptionZBytes) -> Self {
85        value.0
86    }
87}
88
89/// ZBytes contains the raw bytes data.
90///
91/// This type is intended to represent the data payload with minimized copying.
92/// Zenoh may construct a single `ZBytes` instance from pointers to multiple buffers
93/// in cases where data is received fragmented from the network.
94///
95/// To directly access raw data as a contiguous slice, it is preferable to convert `ZBytes` into a [`std::borrow::Cow<[u8]>`] using [`to_bytes`](Self::to_bytes).
96/// If `ZBytes` contains all the data in a single memory location, this is guaranteed to be zero-copy. This is the common case for small messages.
97/// If `ZBytes` contains data scattered in different memory regions, this operation will do an allocation and a copy. This is the common case for large messages.
98///
99/// It is also possible to iterate over the raw data that may be scattered across different memory regions using [`slices`](Self::slices).
100///
101/// Another way to access raw data is to use a [`ZBytesReader`] obtained from [`reader`](Self::reader)
102/// that implements the standard [`std::io::Read`] trait. This is useful when deserializing data using
103/// libraries that operate on `std::io::Read`.
104///
105/// The creation of a `ZBytes` instance using the [`std::io::Write`] trait is also possible using the static
106/// [`writer`](Self::writer) method that creates a [`ZBytesWriter`].
107///
108/// # Examples
109///
110/// `ZBytes` can be converted from/to raw bytes:
111/// ```rust
112/// use std::borrow::Cow;
113/// use zenoh::bytes::ZBytes;
114///
115/// let buf = b"some raw bytes";
116/// let payload = ZBytes::from(buf);
117/// assert_eq!(payload.to_bytes(), buf.as_slice());
118/// ```
119///
120/// Create a `ZBytes` with a writer and read it back with a reader:
121/// ```rust
122/// use std::io::{Read, Write};
123/// use zenoh::bytes::ZBytes;
124///
125/// let mut writer = ZBytes::writer();
126/// writer.write_all(b"some raw bytes").unwrap();
127/// let payload = writer.finish();
128/// let mut reader = payload.reader();
129/// let mut buf = [0; 14];
130/// reader.read_exact(&mut buf).unwrap();
131/// assert_eq!(&buf, b"some raw bytes");
132/// ```
133#[repr(transparent)]
134#[derive(Clone, Debug, Default, PartialEq, Eq)]
135pub struct ZBytes(ZBuf);
136
137impl ZBytes {
138    /// Create an empty ZBytes.
139    pub const fn new() -> Self {
140        Self(ZBuf::empty())
141    }
142
143    /// Returns whether the [`ZBytes`] is empty or not.
144    pub fn is_empty(&self) -> bool {
145        self.0.is_empty()
146    }
147
148    /// Returns the total number of bytes in the [`ZBytes`].
149    pub fn len(&self) -> usize {
150        self.0.len()
151    }
152
153    /// Access raw bytes contained in the [`ZBytes`].
154    ///
155    /// In the case `ZBytes` contains non-contiguous regions of memory, an allocation and a copy
156    /// will be done; that's why the method returns a [`Cow`].
157    /// It's also possible to use [`ZBytes::slices`] instead to avoid this copy.
158    pub fn to_bytes(&self) -> Cow<'_, [u8]> {
159        self.0.contiguous()
160    }
161
162    /// Tries to access a string contained in the [`ZBytes`], and fails if it contains non-UTF-8 bytes.
163    ///
164    /// In the case `ZBytes` contains non-contiguous regions of memory, an allocation and a copy
165    /// will be done; that's why the method returns a [`Cow`].
166    /// It's also possible to use [`ZBytes::slices`] instead to avoid this copy, but then the UTF-8
167    /// check has to be done manually.
168    pub fn try_to_string(&self) -> Result<Cow<'_, str>, Utf8Error> {
169        Ok(match self.to_bytes() {
170            Cow::Borrowed(s) => std::str::from_utf8(s)?.into(),
171            Cow::Owned(v) => String::from_utf8(v).map_err(|err| err.utf8_error())?.into(),
172        })
173    }
174
175    /// Get a [`ZBytesReader`] implementing the [`std::io::Read`] trait.
176    ///
177    /// See [`ZBytesWriter`] on how to chain the deserialization of different types from a single [`ZBytes`].
178    pub fn reader(&self) -> ZBytesReader<'_> {
179        ZBytesReader(self.0.reader())
180    }
181
182    /// Build a [`ZBytes`] from a generic reader implementing [`std::io::Read`]. This operation copies data from the reader.
183    pub fn from_reader<R>(mut reader: R) -> Result<Self, std::io::Error>
184    where
185        R: std::io::Read,
186    {
187        let mut buf: Vec<u8> = vec![];
188        reader.read_to_end(&mut buf)?;
189        Ok(buf.into())
190    }
191
192    /// Get a [`ZBytesWriter`] implementing [`std::io::Write`] trait.
193    ///
194    /// See [`ZBytesWriter`] on how to chain the serialization of different types into a single [`ZBytes`].
195    pub fn writer() -> ZBytesWriter {
196        ZBytesWriter {
197            zbuf: ZBuf::empty(),
198            vec: Vec::new(),
199        }
200    }
201
202    /// Return an iterator over raw byte slices contained in the [`ZBytes`].
203    ///
204    /// [`ZBytes`] may store data in non-contiguous regions of memory; this iterator
205    /// then allows accessing raw data directly without any attempt at deserializing it.
206    /// Please note that no guarantee is provided on the internal memory layout of [`ZBytes`].
207    /// The only provided guarantee is that the byte order is preserved.
208    ///
209    /// ```rust
210    /// use std::io::Write;
211    /// use zenoh::bytes::ZBytes;
212    ///
213    /// let buf1: Vec<u8> = vec![1, 2, 3];
214    /// let buf2: Vec<u8> = vec![4, 5, 6, 7, 8];
215    /// let mut writer = ZBytes::writer();
216    /// writer.write(&buf1);
217    /// writer.write(&buf2);
218    /// let zbytes = writer.finish();
219    ///
220    /// // Access the raw content
221    /// for slice in zbytes.slices() {
222    ///     println!("{:02x?}", slice);
223    /// }
224    ///
225    /// // Concatenate input in a single vector
226    /// let buf: Vec<u8> = buf1.into_iter().chain(buf2.into_iter()).collect();
227    /// // Concatenate raw bytes in a single vector
228    /// let out: Vec<u8> = zbytes.slices().fold(Vec::new(), |mut b, x| { b.extend_from_slice(x); b });
229    /// // The previous line is the equivalent of
230    /// // let out: Vec<u8> = zbs.into();
231    /// assert_eq!(buf, out);
232    /// ```
233    ///
234    /// The example below shows how the [`ZBytesWriter::append`] simply appends the slices of one [`ZBytes`]
235    /// to another and how those slices can be iterated over to access the raw data.
236    /// ```rust
237    /// use std::io::Write;
238    /// use zenoh::bytes::ZBytes;
239    ///
240    /// let buf1: Vec<u8> = vec![1, 2, 3];
241    /// let buf2: Vec<u8> = vec![4, 5, 6, 7, 8];
242    ///
243    /// let mut writer = ZBytes::writer();
244    /// writer.append(ZBytes::from(buf1.clone()));
245    /// writer.append(ZBytes::from(buf2.clone()));
246    /// let zbytes = writer.finish();
247    ///
248    /// let mut iter = zbytes.slices();
249    /// assert_eq!(buf1.as_slice(), iter.next().unwrap());
250    /// assert_eq!(buf2.as_slice(), iter.next().unwrap());
251    /// ```
252    pub fn slices(&self) -> ZBytesSliceIterator<'_> {
253        ZBytesSliceIterator(self.0.slices())
254    }
255}
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
const _: () = {
    use zenoh_shm::{api::buffer::zshm::zshm, ShmBufInner};
    impl ZBytes {
        /// Borrows the content as a [`zshm`] buffer.
        ///
        /// Returns `Some` only when this [`ZBytes`] is made of exactly one slice and
        /// that slice downcasts to a `ShmBufInner`; otherwise returns `None`.
        pub fn as_shm(&self) -> Option<&zshm> {
            let mut zslices = self.0.zslices();
            let first = zslices.next()?;
            let shm = first.downcast_ref::<ShmBufInner>()?;
            // More than one slice: the data is not a single shm buffer.
            if zslices.next().is_some() {
                return None;
            }
            Some(shm.into())
        }

        /// Mutably borrows the content as a [`zshm`] buffer.
        ///
        /// Returns `Some` only when this [`ZBytes`] is made of exactly one slice and
        /// that slice downcasts to a `ShmBufInner`; otherwise returns `None`.
        pub fn as_shm_mut(&mut self) -> Option<&mut zshm> {
            let mut zslices = self.0.zslices_mut();
            let first = zslices.next()?;
            // SAFETY: ShmBufInner cannot change the size of the slice
            let shm = unsafe { first.downcast_mut::<ShmBufInner>() }?;
            // More than one slice: the data is not a single shm buffer.
            if zslices.next().is_some() {
                return None;
            }
            Some(shm.into())
        }
    }
};
274
275/// A reader that implements [`std::io::Read`] trait to deserialize from a [`ZBytes`].
276///
277/// The instance of this struct is obtained from the [`ZBytes::reader`] method.
278/// It implements the standard [`std::io::Read`] and [`std::io::Seek`] traits.
279/// This allows using it with libraries that deserialize data from a `std::io::Read`.
280/// Example:
281/// ```rust
282/// use std::io::{Read, Seek, SeekFrom, Write};
283/// use zenoh::bytes::ZBytes;
284/// let mut writer = ZBytes::writer();
285/// writer.write_all(b"some raw bytes").unwrap();
286/// let payload = writer.finish();
287/// let mut reader = payload.reader();
288/// let mut buf = [0; 14];
289/// reader.read_exact(&mut buf).unwrap();
290/// assert_eq!(&buf, b"some raw bytes");
291/// reader.seek(SeekFrom::Start(5)).unwrap();
292/// let mut buf2 = [0; 4];
293/// reader.read_exact(&mut buf2).unwrap();
294/// assert_eq!(&buf2, b"raw ");
295/// ```
296#[repr(transparent)]
297#[derive(Debug)]
298pub struct ZBytesReader<'a>(ZBufReader<'a>);
299
300impl ZBytesReader<'_> {
301    /// Returns the number of bytes that can still be read
302    pub fn remaining(&self) -> usize {
303        self.0.remaining()
304    }
305
306    /// Returns true if no additional bytes can be read
307    pub fn is_empty(&self) -> bool {
308        self.remaining() == 0
309    }
310}
311
312impl std::io::Read for ZBytesReader<'_> {
313    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
314        std::io::Read::read(&mut self.0, buf)
315    }
316}
317
318impl std::io::Seek for ZBytesReader<'_> {
319    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
320        std::io::Seek::seek(&mut self.0, pos)
321    }
322}
323
324/// A writer that implements [`std::io::Write`] trait to serialize into a [`ZBytes`].
325///
326/// The instance of this struct is obtained from the [`ZBytes::writer`] method.
327/// It implements the standard [`std::io::Write`] trait.
328/// This allows using it with libraries that serialize data into a `std::io::Write`.
329/// Example:
330/// ```rust
331/// use std::io::{Read, Write};
332/// use zenoh::bytes::ZBytes;
333/// let mut writer = ZBytes::writer();
334/// writer.write_all(b"some raw bytes").unwrap();
335/// let payload = writer.finish();
336/// let mut reader = payload.reader();
337/// let mut buf = [0; 14];
338/// reader.read_exact(&mut buf).unwrap();
339/// assert_eq!(&buf, b"some raw bytes");
340/// ```
341/// It is also possible to append existing [`ZBytes`] instances by taking ownership of them
342/// using the [`append`](Self::append) method.
343/// This allows composing a [`ZBytes`] out of multiple [`ZBytes`] that may point to different memory regions.
344/// In other words, it allows creating a linear view on different memory regions without copying.
345#[derive(Debug)]
346pub struct ZBytesWriter {
347    zbuf: ZBuf,
348    vec: Vec<u8>,
349}
350
351impl ZBytesWriter {
352    /// Append a [`ZBytes`] to this [`ZBytes`] by taking ownership.
353    /// This allows composing a [`ZBytes`] out of multiple [`ZBytes`] that may point to different memory regions.
354    /// In other words, it allows creating a linear view on different memory regions without copying.
355    ///
356    /// Example:
357    /// ```
358    /// use zenoh::bytes::ZBytes;
359    ///
360    /// let one = ZBytes::from(vec![0, 1]);
361    /// let two = ZBytes::from(vec![2, 3, 4, 5]);
362    /// let three = ZBytes::from(vec![6, 7]);
363    ///
364    /// let mut writer = ZBytes::writer();
365    /// // Append data without copying by passing ownership
366    /// writer.append(one);
367    /// writer.append(two);
368    /// writer.append(three);
369    /// let zbytes = writer.finish();
370    ///
371    /// assert_eq!(zbytes.to_bytes(), vec![0u8, 1, 2, 3, 4, 5, 6, 7]);
372    /// ```
373    pub fn append(&mut self, zbytes: ZBytes) {
374        if !self.vec.is_empty() {
375            self.zbuf.push_zslice(mem::take(&mut self.vec).into());
376        }
377        for zslice in zbytes.0.into_zslices() {
378            self.zbuf.push_zslice(zslice);
379        }
380    }
381
382    pub fn finish(mut self) -> ZBytes {
383        if !self.vec.is_empty() {
384            self.zbuf.push_zslice(self.vec.into());
385        }
386        ZBytes(self.zbuf)
387    }
388}
389
390impl From<ZBytesWriter> for ZBytes {
391    fn from(value: ZBytesWriter) -> Self {
392        value.finish()
393    }
394}
395
396impl std::io::Write for ZBytesWriter {
397    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
398        std::io::Write::write(&mut self.vec, buf)
399    }
400
401    fn flush(&mut self) -> std::io::Result<()> {
402        Ok(())
403    }
404}
405
406/// An iterator to iterate on raw bytes slices contained in a [`ZBytes`].
407///
408/// Example:
409/// ```rust
410/// use std::io::Write;
411/// use zenoh::bytes::ZBytes;
412///
413/// let buf1: Vec<u8> = vec![1, 2, 3];
414/// let buf2: Vec<u8> = vec![4, 5, 6, 7, 8];
415/// let mut writer = ZBytes::writer();
416/// writer.write(&buf1);
417/// writer.write(&buf2);
418/// let mut zbytes = writer.finish();
419///
420/// // Access the raw content
421/// for slice in zbytes.slices() {
422///     println!("{:02x?}", slice);
423/// }
424///
425/// // Concatenate input in a single vector
426/// let buf: Vec<u8> = buf1.into_iter().chain(buf2.into_iter()).collect();
427/// // Concatenate raw bytes in a single vector
428/// let out: Vec<u8> = zbytes.slices().fold(Vec::new(), |mut b, x| { b.extend_from_slice(x); b });
429/// // The previous line is the equivalent of
430/// // let out: Vec<u8> = zbs.into();
431/// assert_eq!(buf, out);
432/// ```
433#[derive(Debug)]
434pub struct ZBytesSliceIterator<'a>(ZBytesSliceIteratorInner<'a>);
435
436// Typedef to make clippy happy about complex type. Encapsulate inner `ZBufSliceOperator`.
437type ZBytesSliceIteratorInner<'a> =
438    std::iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
439
440impl<'a> Iterator for ZBytesSliceIterator<'a> {
441    type Item = &'a [u8];
442
443    fn next(&mut self) -> Option<Self::Item> {
444        self.0.next()
445    }
446}
447
448impl From<ZBuf> for ZBytes {
449    fn from(value: ZBuf) -> Self {
450        Self(value)
451    }
452}
453impl From<ZBytes> for ZBuf {
454    fn from(value: ZBytes) -> Self {
455        value.0
456    }
457}
458impl<const N: usize> From<[u8; N]> for ZBytes {
459    fn from(value: [u8; N]) -> Self {
460        Self(value.into())
461    }
462}
463impl<const N: usize> From<&[u8; N]> for ZBytes {
464    fn from(value: &[u8; N]) -> Self {
465        value.to_vec().into()
466    }
467}
468impl From<Vec<u8>> for ZBytes {
469    fn from(value: Vec<u8>) -> Self {
470        Self(value.into())
471    }
472}
473impl From<&Vec<u8>> for ZBytes {
474    fn from(value: &Vec<u8>) -> Self {
475        value.clone().into()
476    }
477}
478impl From<&[u8]> for ZBytes {
479    fn from(value: &[u8]) -> Self {
480        value.to_vec().into()
481    }
482}
483impl From<Cow<'_, [u8]>> for ZBytes {
484    fn from(value: Cow<'_, [u8]>) -> Self {
485        value.into_owned().into()
486    }
487}
488impl From<&Cow<'_, [u8]>> for ZBytes {
489    fn from(value: &Cow<'_, [u8]>) -> Self {
490        value.clone().into()
491    }
492}
493impl From<String> for ZBytes {
494    fn from(value: String) -> Self {
495        value.into_bytes().into()
496    }
497}
498impl From<&String> for ZBytes {
499    fn from(value: &String) -> Self {
500        value.clone().into()
501    }
502}
503impl From<&str> for ZBytes {
504    fn from(value: &str) -> Self {
505        value.as_bytes().into()
506    }
507}
508impl From<Cow<'_, str>> for ZBytes {
509    fn from(value: Cow<'_, str>) -> Self {
510        value.into_owned().into()
511    }
512}
513impl From<&Cow<'_, str>> for ZBytes {
514    fn from(value: &Cow<'_, str>) -> Self {
515        value.clone().into()
516    }
517}
518
519// Define a transparent wrapper type to get around Rust's orphan rule.
520// This allows using bytes::Bytes directly as the supporting buffer of a
521// ZSlice resulting in zero-copy and zero-alloc bytes::Bytes serialization.
522#[repr(transparent)]
523#[derive(Debug)]
524struct BytesWrap(bytes::Bytes);
525impl ZSliceBuffer for BytesWrap {
526    fn as_slice(&self) -> &[u8] {
527        &self.0
528    }
529
530    fn as_any(&self) -> &dyn std::any::Any {
531        self
532    }
533
534    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
535        self
536    }
537}
538impl From<bytes::Bytes> for ZBytes {
539    fn from(value: bytes::Bytes) -> Self {
540        Self(BytesWrap(value).into())
541    }
542}
543
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
const _: () = {
    use zenoh_shm::api::buffer::{typed::Typed, zshm::ZShm, zshmmut::ZShmMut};

    /// Builds a [`ZBytes`] from a `ZShm` buffer, going through a `ZSlice`.
    impl From<ZShm> for ZBytes {
        fn from(value: ZShm) -> Self {
            let zslice = ZSlice::from(value);
            ZBytes(zslice.into())
        }
    }
    /// Builds a [`ZBytes`] from a `ZShmMut` buffer, going through a `ZSlice`.
    impl From<ZShmMut> for ZBytes {
        fn from(value: ZShmMut) -> Self {
            let zslice = ZSlice::from(value);
            ZBytes(zslice.into())
        }
    }
    /// Builds a [`ZBytes`] from the buffer wrapped by a `Typed` value.
    impl<T, Buf: Into<ZBytes>> From<Typed<T, Buf>> for ZBytes {
        fn from(value: Typed<T, Buf>) -> Self {
            let inner: Buf = Typed::into_inner(value);
            inner.into()
        }
    }
};
564
565// Protocol attachment extension
566impl<const ID: u8> From<ZBytes> for AttachmentType<ID> {
567    fn from(this: ZBytes) -> Self {
568        AttachmentType {
569            buffer: this.into(),
570        }
571    }
572}
573
574impl<const ID: u8> From<AttachmentType<ID>> for ZBytes {
575    fn from(this: AttachmentType<ID>) -> Self {
576        this.buffer.into()
577    }
578}