zenoh/api/bytes.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ZBytes primitives.
16use std::{borrow::Cow, fmt::Debug, mem, str::Utf8Error};
17
18use zenoh_buffers::{
19 buffer::{Buffer, SplitBuffer},
20 reader::{HasReader, Reader},
21 ZBuf, ZBufReader, ZSlice, ZSliceBuffer,
22};
23use zenoh_protocol::zenoh::ext::AttachmentType;
24
25/// Technical wrapper type for API ergonomicity.
26///
27/// It allows any type `T` implementing `Into<ZBytes>` to be converted into `Option<ZBytes>`.
28///
29/// This type is unlikely to be used explicitly by the user. Its purpose is to allow passing types
30/// like `&str`, `String`, `&[u8]`, `Vec<u8>`, etc. to API methods
31/// that accept an optional payload, like the [`attachment`](crate::pubsub::PublicationBuilder::attachment).
32///
33/// # Examples
34/// ```no_run
35/// # #[tokio::main]
36/// # async fn main() {
37/// # let session = zenoh::open(zenoh::Config::default()).await.unwrap();
38/// let publisher = session.declare_publisher("key/expression").await.unwrap();
39/// // Set an attachment value directly
40/// publisher.put("value").attachment("metadata").await.unwrap();
41/// // Set an attachment value from the variable, which can contain the attachment data or not.
42/// let maybe_attachment: Option<String> = Some("metadata".to_string());
43/// publisher.put("value").attachment(maybe_attachment).await.unwrap();
44/// # }
45/// ```
46#[repr(transparent)]
47#[derive(Clone, Debug, Default, PartialEq, Eq)]
48pub struct OptionZBytes(Option<ZBytes>);
49
50impl<T> From<T> for OptionZBytes
51where
52 T: Into<ZBytes>,
53{
54 fn from(value: T) -> Self {
55 Self(Some(value.into()))
56 }
57}
58
59impl<T> From<Option<T>> for OptionZBytes
60where
61 T: Into<ZBytes>,
62{
63 fn from(mut value: Option<T>) -> Self {
64 match value.take() {
65 Some(v) => Self(Some(v.into())),
66 None => Self(None),
67 }
68 }
69}
70
71impl<T> From<&Option<T>> for OptionZBytes
72where
73 for<'a> &'a T: Into<ZBytes>,
74{
75 fn from(value: &Option<T>) -> Self {
76 match value.as_ref() {
77 Some(v) => Self(Some(v.into())),
78 None => Self(None),
79 }
80 }
81}
82
83impl From<OptionZBytes> for Option<ZBytes> {
84 fn from(value: OptionZBytes) -> Self {
85 value.0
86 }
87}
88
89/// ZBytes contains the raw bytes data.
90///
91/// This type is intended to represent the data payload with minimized copying.
92/// Zenoh may construct a single `ZBytes` instance from pointers to multiple buffers
93/// in cases where data is received fragmented from the network.
94///
95/// To directly access raw data as a contiguous slice, it is preferable to convert `ZBytes` into a [`std::borrow::Cow<[u8]>`] using [`to_bytes`](Self::to_bytes).
96/// If `ZBytes` contains all the data in a single memory location, this is guaranteed to be zero-copy. This is the common case for small messages.
97/// If `ZBytes` contains data scattered in different memory regions, this operation will do an allocation and a copy. This is the common case for large messages.
98///
99/// It is also possible to iterate over the raw data that may be scattered across different memory regions using [`slices`](Self::slices).
100///
101/// Another way to access raw data is to use a [`ZBytesReader`] obtained from [`reader`](Self::reader)
102/// that implements the standard [`std::io::Read`] trait. This is useful when deserializing data using
103/// libraries that operate on `std::io::Read`.
104///
105/// The creation of a `ZBytes` instance using the [`std::io::Write`] trait is also possible using the static
106/// [`writer`](Self::writer) method that creates a [`ZBytesWriter`].
107///
108/// # Examples
109///
110/// `ZBytes` can be converted from/to raw bytes:
111/// ```rust
112/// use std::borrow::Cow;
113/// use zenoh::bytes::ZBytes;
114///
115/// let buf = b"some raw bytes";
116/// let payload = ZBytes::from(buf);
117/// assert_eq!(payload.to_bytes(), buf.as_slice());
118/// ```
119///
120/// Create a `ZBytes` with a writer and read it back with a reader:
121/// ```rust
122/// use std::io::{Read, Write};
123/// use zenoh::bytes::ZBytes;
124///
125/// let mut writer = ZBytes::writer();
126/// writer.write_all(b"some raw bytes").unwrap();
127/// let payload = writer.finish();
128/// let mut reader = payload.reader();
129/// let mut buf = [0; 14];
130/// reader.read_exact(&mut buf).unwrap();
131/// assert_eq!(&buf, b"some raw bytes");
132/// ```
133#[repr(transparent)]
134#[derive(Clone, Debug, Default, PartialEq, Eq)]
135pub struct ZBytes(ZBuf);
136
137impl ZBytes {
138 /// Create an empty ZBytes.
139 pub const fn new() -> Self {
140 Self(ZBuf::empty())
141 }
142
143 /// Returns whether the [`ZBytes`] is empty or not.
144 pub fn is_empty(&self) -> bool {
145 self.0.is_empty()
146 }
147
148 /// Returns the total number of bytes in the [`ZBytes`].
149 pub fn len(&self) -> usize {
150 self.0.len()
151 }
152
153 /// Access raw bytes contained in the [`ZBytes`].
154 ///
155 /// In the case `ZBytes` contains non-contiguous regions of memory, an allocation and a copy
156 /// will be done; that's why the method returns a [`Cow`].
157 /// It's also possible to use [`ZBytes::slices`] instead to avoid this copy.
158 pub fn to_bytes(&self) -> Cow<'_, [u8]> {
159 self.0.contiguous()
160 }
161
162 /// Tries to access a string contained in the [`ZBytes`], and fails if it contains non-UTF-8 bytes.
163 ///
164 /// In the case `ZBytes` contains non-contiguous regions of memory, an allocation and a copy
165 /// will be done; that's why the method returns a [`Cow`].
166 /// It's also possible to use [`ZBytes::slices`] instead to avoid this copy, but then the UTF-8
167 /// check has to be done manually.
168 pub fn try_to_string(&self) -> Result<Cow<'_, str>, Utf8Error> {
169 Ok(match self.to_bytes() {
170 Cow::Borrowed(s) => std::str::from_utf8(s)?.into(),
171 Cow::Owned(v) => String::from_utf8(v).map_err(|err| err.utf8_error())?.into(),
172 })
173 }
174
175 /// Get a [`ZBytesReader`] implementing the [`std::io::Read`] trait.
176 ///
177 /// See [`ZBytesWriter`] on how to chain the deserialization of different types from a single [`ZBytes`].
178 pub fn reader(&self) -> ZBytesReader<'_> {
179 ZBytesReader(self.0.reader())
180 }
181
182 /// Build a [`ZBytes`] from a generic reader implementing [`std::io::Read`]. This operation copies data from the reader.
183 pub fn from_reader<R>(mut reader: R) -> Result<Self, std::io::Error>
184 where
185 R: std::io::Read,
186 {
187 let mut buf: Vec<u8> = vec![];
188 reader.read_to_end(&mut buf)?;
189 Ok(buf.into())
190 }
191
192 /// Get a [`ZBytesWriter`] implementing [`std::io::Write`] trait.
193 ///
194 /// See [`ZBytesWriter`] on how to chain the serialization of different types into a single [`ZBytes`].
195 pub fn writer() -> ZBytesWriter {
196 ZBytesWriter {
197 zbuf: ZBuf::empty(),
198 vec: Vec::new(),
199 }
200 }
201
202 /// Return an iterator over raw byte slices contained in the [`ZBytes`].
203 ///
204 /// [`ZBytes`] may store data in non-contiguous regions of memory; this iterator
205 /// then allows accessing raw data directly without any attempt at deserializing it.
206 /// Please note that no guarantee is provided on the internal memory layout of [`ZBytes`].
207 /// The only provided guarantee is that the byte order is preserved.
208 ///
209 /// ```rust
210 /// use std::io::Write;
211 /// use zenoh::bytes::ZBytes;
212 ///
213 /// let buf1: Vec<u8> = vec![1, 2, 3];
214 /// let buf2: Vec<u8> = vec![4, 5, 6, 7, 8];
215 /// let mut writer = ZBytes::writer();
216 /// writer.write(&buf1);
217 /// writer.write(&buf2);
218 /// let zbytes = writer.finish();
219 ///
220 /// // Access the raw content
221 /// for slice in zbytes.slices() {
222 /// println!("{:02x?}", slice);
223 /// }
224 ///
225 /// // Concatenate input in a single vector
226 /// let buf: Vec<u8> = buf1.into_iter().chain(buf2.into_iter()).collect();
227 /// // Concatenate raw bytes in a single vector
228 /// let out: Vec<u8> = zbytes.slices().fold(Vec::new(), |mut b, x| { b.extend_from_slice(x); b });
229 /// // The previous line is the equivalent of
230 /// // let out: Vec<u8> = zbs.into();
231 /// assert_eq!(buf, out);
232 /// ```
233 ///
234 /// The example below shows how the [`ZBytesWriter::append`] simply appends the slices of one [`ZBytes`]
235 /// to another and how those slices can be iterated over to access the raw data.
236 /// ```rust
237 /// use std::io::Write;
238 /// use zenoh::bytes::ZBytes;
239 ///
240 /// let buf1: Vec<u8> = vec![1, 2, 3];
241 /// let buf2: Vec<u8> = vec![4, 5, 6, 7, 8];
242 ///
243 /// let mut writer = ZBytes::writer();
244 /// writer.append(ZBytes::from(buf1.clone()));
245 /// writer.append(ZBytes::from(buf2.clone()));
246 /// let zbytes = writer.finish();
247 ///
248 /// let mut iter = zbytes.slices();
249 /// assert_eq!(buf1.as_slice(), iter.next().unwrap());
250 /// assert_eq!(buf2.as_slice(), iter.next().unwrap());
251 /// ```
252 pub fn slices(&self) -> ZBytesSliceIterator<'_> {
253 ZBytesSliceIterator(self.0.slices())
254 }
255}
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
const _: () = {
    use zenoh_shm::{api::buffer::zshm::zshm, ShmBufInner};
    impl ZBytes {
        /// Borrows the content as a shared-memory buffer.
        ///
        /// Returns `Some` only when the first slice of this [`ZBytes`] downcasts to
        /// `ShmBufInner` and no other slice follows it; returns `None` otherwise.
        pub fn as_shm(&self) -> Option<&zshm> {
            let mut remaining = self.0.zslices();
            let head = remaining.next()?;
            let shm: &zshm = head.downcast_ref::<ShmBufInner>()?.into();
            // Only a `ZBytes` made of that single slice qualifies.
            if remaining.next().is_none() {
                Some(shm)
            } else {
                None
            }
        }

        /// Mutably borrows the content as a shared-memory buffer.
        ///
        /// Returns `Some` only when the first slice of this [`ZBytes`] downcasts to
        /// `ShmBufInner` and no other slice follows it; returns `None` otherwise.
        pub fn as_shm_mut(&mut self) -> Option<&mut zshm> {
            let mut remaining = self.0.zslices_mut();
            let head = remaining.next()?;
            // SAFETY: ShmBufInner cannot change the size of the slice
            let inner = unsafe { head.downcast_mut::<ShmBufInner>() }?;
            let shm: &mut zshm = inner.into();
            // Only a `ZBytes` made of that single slice qualifies.
            if remaining.next().is_none() {
                Some(shm)
            } else {
                None
            }
        }
    }
};
274
275/// A reader that implements [`std::io::Read`] trait to deserialize from a [`ZBytes`].
276///
277/// The instance of this struct is obtained from the [`ZBytes::reader`] method.
278/// It implements the standard [`std::io::Read`] and [`std::io::Seek`] traits.
279/// This allows using it with libraries that deserialize data from a `std::io::Read`.
280/// Example:
281/// ```rust
282/// use std::io::{Read, Seek, SeekFrom, Write};
283/// use zenoh::bytes::ZBytes;
284/// let mut writer = ZBytes::writer();
285/// writer.write_all(b"some raw bytes").unwrap();
286/// let payload = writer.finish();
287/// let mut reader = payload.reader();
288/// let mut buf = [0; 14];
289/// reader.read_exact(&mut buf).unwrap();
290/// assert_eq!(&buf, b"some raw bytes");
291/// reader.seek(SeekFrom::Start(5)).unwrap();
292/// let mut buf2 = [0; 4];
293/// reader.read_exact(&mut buf2).unwrap();
294/// assert_eq!(&buf2, b"raw ");
295/// ```
296#[repr(transparent)]
297#[derive(Debug)]
298pub struct ZBytesReader<'a>(ZBufReader<'a>);
299
300impl ZBytesReader<'_> {
301 /// Returns the number of bytes that can still be read
302 pub fn remaining(&self) -> usize {
303 self.0.remaining()
304 }
305
306 /// Returns true if no additional bytes can be read
307 pub fn is_empty(&self) -> bool {
308 self.remaining() == 0
309 }
310}
311
312impl std::io::Read for ZBytesReader<'_> {
313 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
314 std::io::Read::read(&mut self.0, buf)
315 }
316}
317
318impl std::io::Seek for ZBytesReader<'_> {
319 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
320 std::io::Seek::seek(&mut self.0, pos)
321 }
322}
323
324/// A writer that implements [`std::io::Write`] trait to serialize into a [`ZBytes`].
325///
326/// The instance of this struct is obtained from the [`ZBytes::writer`] method.
327/// It implements the standard [`std::io::Write`] trait.
328/// This allows using it with libraries that serialize data into a `std::io::Write`.
329/// Example:
330/// ```rust
331/// use std::io::{Read, Write};
332/// use zenoh::bytes::ZBytes;
333/// let mut writer = ZBytes::writer();
334/// writer.write_all(b"some raw bytes").unwrap();
335/// let payload = writer.finish();
336/// let mut reader = payload.reader();
337/// let mut buf = [0; 14];
338/// reader.read_exact(&mut buf).unwrap();
339/// assert_eq!(&buf, b"some raw bytes");
340/// ```
341/// It is also possible to append existing [`ZBytes`] instances by taking ownership of them
342/// using the [`append`](Self::append) method.
343/// This allows composing a [`ZBytes`] out of multiple [`ZBytes`] that may point to different memory regions.
344/// In other words, it allows creating a linear view on different memory regions without copying.
345#[derive(Debug)]
346pub struct ZBytesWriter {
347 zbuf: ZBuf,
348 vec: Vec<u8>,
349}
350
351impl ZBytesWriter {
352 /// Append a [`ZBytes`] to this [`ZBytes`] by taking ownership.
353 /// This allows composing a [`ZBytes`] out of multiple [`ZBytes`] that may point to different memory regions.
354 /// In other words, it allows creating a linear view on different memory regions without copying.
355 ///
356 /// Example:
357 /// ```
358 /// use zenoh::bytes::ZBytes;
359 ///
360 /// let one = ZBytes::from(vec![0, 1]);
361 /// let two = ZBytes::from(vec![2, 3, 4, 5]);
362 /// let three = ZBytes::from(vec![6, 7]);
363 ///
364 /// let mut writer = ZBytes::writer();
365 /// // Append data without copying by passing ownership
366 /// writer.append(one);
367 /// writer.append(two);
368 /// writer.append(three);
369 /// let zbytes = writer.finish();
370 ///
371 /// assert_eq!(zbytes.to_bytes(), vec![0u8, 1, 2, 3, 4, 5, 6, 7]);
372 /// ```
373 pub fn append(&mut self, zbytes: ZBytes) {
374 if !self.vec.is_empty() {
375 self.zbuf.push_zslice(mem::take(&mut self.vec).into());
376 }
377 for zslice in zbytes.0.into_zslices() {
378 self.zbuf.push_zslice(zslice);
379 }
380 }
381
382 pub fn finish(mut self) -> ZBytes {
383 if !self.vec.is_empty() {
384 self.zbuf.push_zslice(self.vec.into());
385 }
386 ZBytes(self.zbuf)
387 }
388}
389
390impl From<ZBytesWriter> for ZBytes {
391 fn from(value: ZBytesWriter) -> Self {
392 value.finish()
393 }
394}
395
396impl std::io::Write for ZBytesWriter {
397 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
398 std::io::Write::write(&mut self.vec, buf)
399 }
400
401 fn flush(&mut self) -> std::io::Result<()> {
402 Ok(())
403 }
404}
405
406/// An iterator to iterate on raw bytes slices contained in a [`ZBytes`].
407///
408/// Example:
409/// ```rust
410/// use std::io::Write;
411/// use zenoh::bytes::ZBytes;
412///
413/// let buf1: Vec<u8> = vec![1, 2, 3];
414/// let buf2: Vec<u8> = vec![4, 5, 6, 7, 8];
415/// let mut writer = ZBytes::writer();
416/// writer.write(&buf1);
417/// writer.write(&buf2);
418/// let mut zbytes = writer.finish();
419///
420/// // Access the raw content
421/// for slice in zbytes.slices() {
422/// println!("{:02x?}", slice);
423/// }
424///
425/// // Concatenate input in a single vector
426/// let buf: Vec<u8> = buf1.into_iter().chain(buf2.into_iter()).collect();
427/// // Concatenate raw bytes in a single vector
428/// let out: Vec<u8> = zbytes.slices().fold(Vec::new(), |mut b, x| { b.extend_from_slice(x); b });
429/// // The previous line is the equivalent of
430/// // let out: Vec<u8> = zbs.into();
431/// assert_eq!(buf, out);
432/// ```
433#[derive(Debug)]
434pub struct ZBytesSliceIterator<'a>(ZBytesSliceIteratorInner<'a>);
435
436// Typedef to make clippy happy about complex type. Encapsulate inner `ZBufSliceOperator`.
437type ZBytesSliceIteratorInner<'a> =
438 std::iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
439
440impl<'a> Iterator for ZBytesSliceIterator<'a> {
441 type Item = &'a [u8];
442
443 fn next(&mut self) -> Option<Self::Item> {
444 self.0.next()
445 }
446}
447
448impl From<ZBuf> for ZBytes {
449 fn from(value: ZBuf) -> Self {
450 Self(value)
451 }
452}
453impl From<ZBytes> for ZBuf {
454 fn from(value: ZBytes) -> Self {
455 value.0
456 }
457}
458impl<const N: usize> From<[u8; N]> for ZBytes {
459 fn from(value: [u8; N]) -> Self {
460 Self(value.into())
461 }
462}
463impl<const N: usize> From<&[u8; N]> for ZBytes {
464 fn from(value: &[u8; N]) -> Self {
465 value.to_vec().into()
466 }
467}
468impl From<Vec<u8>> for ZBytes {
469 fn from(value: Vec<u8>) -> Self {
470 Self(value.into())
471 }
472}
473impl From<&Vec<u8>> for ZBytes {
474 fn from(value: &Vec<u8>) -> Self {
475 value.clone().into()
476 }
477}
478impl From<&[u8]> for ZBytes {
479 fn from(value: &[u8]) -> Self {
480 value.to_vec().into()
481 }
482}
483impl From<Cow<'_, [u8]>> for ZBytes {
484 fn from(value: Cow<'_, [u8]>) -> Self {
485 value.into_owned().into()
486 }
487}
488impl From<&Cow<'_, [u8]>> for ZBytes {
489 fn from(value: &Cow<'_, [u8]>) -> Self {
490 value.clone().into()
491 }
492}
493impl From<String> for ZBytes {
494 fn from(value: String) -> Self {
495 value.into_bytes().into()
496 }
497}
498impl From<&String> for ZBytes {
499 fn from(value: &String) -> Self {
500 value.clone().into()
501 }
502}
503impl From<&str> for ZBytes {
504 fn from(value: &str) -> Self {
505 value.as_bytes().into()
506 }
507}
508impl From<Cow<'_, str>> for ZBytes {
509 fn from(value: Cow<'_, str>) -> Self {
510 value.into_owned().into()
511 }
512}
513impl From<&Cow<'_, str>> for ZBytes {
514 fn from(value: &Cow<'_, str>) -> Self {
515 value.clone().into()
516 }
517}
518
519// Define a transparent wrapper type to get around Rust's orphan rule.
520// This allows using bytes::Bytes directly as the supporting buffer of a
521// ZSlice resulting in zero-copy and zero-alloc bytes::Bytes serialization.
522#[repr(transparent)]
523#[derive(Debug)]
524struct BytesWrap(bytes::Bytes);
525impl ZSliceBuffer for BytesWrap {
526 fn as_slice(&self) -> &[u8] {
527 &self.0
528 }
529
530 fn as_any(&self) -> &dyn std::any::Any {
531 self
532 }
533
534 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
535 self
536 }
537}
538impl From<bytes::Bytes> for ZBytes {
539 fn from(value: bytes::Bytes) -> Self {
540 Self(BytesWrap(value).into())
541 }
542}
543
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
const _: () = {
    use zenoh_shm::api::buffer::{typed::Typed, zshm::ZShm, zshmmut::ZShmMut};

    // Converts the shared-memory buffer into a `ZSlice`, then wraps that slice.
    impl From<ZShm> for ZBytes {
        fn from(shm: ZShm) -> Self {
            let zslice = ZSlice::from(shm);
            let zbuf: ZBuf = zslice.into();
            ZBytes(zbuf)
        }
    }
    // Converts the mutable shared-memory buffer into a `ZSlice`, then wraps that slice.
    impl From<ZShmMut> for ZBytes {
        fn from(shm: ZShmMut) -> Self {
            let zslice = ZSlice::from(shm);
            let zbuf: ZBuf = zslice.into();
            ZBytes(zbuf)
        }
    }
    // Unwraps the typed view and converts the buffer it holds.
    impl<T, Buf: Into<ZBytes>> From<Typed<T, Buf>> for ZBytes {
        fn from(typed: Typed<T, Buf>) -> Self {
            let inner = Typed::into_inner(typed);
            inner.into()
        }
    }
};
564
565// Protocol attachment extension
566impl<const ID: u8> From<ZBytes> for AttachmentType<ID> {
567 fn from(this: ZBytes) -> Self {
568 AttachmentType {
569 buffer: this.into(),
570 }
571 }
572}
573
574impl<const ID: u8> From<AttachmentType<ID>> for ZBytes {
575 fn from(this: AttachmentType<ID>) -> Self {
576 this.buffer.into()
577 }
578}