Struct zenoh_buffers::ZBuf

source ·
pub struct ZBuf { /* private fields */ }
Expand description

A zenoh buffer.

ZBuf is a buffer that contains one or more ZSlices. It is used to efficiently send and receive data in zenoh. It provides transparent usage for both network and shared memory operations through a simple API.

By storing a set of ZSlice, it is possible to compose the target payload starting from a set of non-contiguous memory regions. This provides a twofold benefit: (1) the user can compose the payload in an incremental manner without requiring reallocations and (2) the payload is received and recomposed as it arrives from the network without reallocating any receiving buffer.

Example for creating a data buffer:

use zenoh_buffers::{ZBuf, ZSlice, traits::SplitBuffer, traits::buffer::InsertBuffer};

// Create a ZBuf containing a newly allocated vector of bytes.
let zbuf: ZBuf = vec![0_u8; 16].into();
assert_eq!(&vec![0_u8; 16], zbuf.contiguous().as_ref());

// Create a ZBuf containing twice a newly allocated vector of bytes.
// Allocate first a vectore of bytes and convert it into a ZSlice.
let zslice: ZSlice = vec![0_u8; 16].into();

let mut zbuf = ZBuf::default();
zbuf.append(zslice.clone()).unwrap(); // Cloning a ZSlice does not allocate
zbuf.append(zslice).unwrap();

assert_eq!(&vec![0_u8; 32], zbuf.contiguous().as_ref());

Calling contiguous() allows to acces to the whole payload as a contiguous &[u8] via the ZSlice type. However, this operation has a drawback when the original message was large enough to cause network fragmentation. Because of that, the actual message payload may have been received in multiple fragments (i.e. ZSlice) which are non-contiguous in memory.

use zenoh_buffers::{ZBuf, ZSlice, traits::SplitBuffer, traits::buffer::InsertBuffer};

// Create a ZBuf containing twice a newly allocated vector of bytes.
let zslice: ZSlice = vec![0_u8; 16].into();
let mut zbuf = ZBuf::default();
zbuf.append(zslice.clone());

// contiguous() does not allocate since zbuf contains only one slice
assert_eq!(&vec![0_u8; 16], zbuf.contiguous().as_ref());

// Add a second slice to zbuf
zbuf.append(zslice.clone());

// contiguous() allocates since zbuf contains two slices
assert_eq!(&vec![0_u8; 32], zbuf.contiguous().as_ref());

zslices_num() returns the number of ZSlices the ZBuf is composed of. If the returned value is greater than 1, then contiguous() will allocate. In order to retrieve the content of the ZBuf without allocating, it is possible to loop over its ZSlices.

Implementations§

Examples found in repository?
src/zbuf.rs (line 213)
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
    fn copy_bytes(&self, bs: &mut [u8], mut pos: (usize, usize)) -> usize {
        let len = bs.len();

        let mut written = 0;
        while written < len {
            if let Some(slice) = self.get_zslice(pos.0) {
                let remaining = slice.len() - pos.1;
                let to_read = remaining.min(bs.len() - written);
                bs[written..written + to_read]
                    .copy_from_slice(&slice.as_slice()[pos.1..pos.1 + to_read]);
                written += to_read;
                pos = (pos.0 + 1, 0);
            } else {
                return written;
            }
        }
        written
    }

    #[cfg(feature = "shared-memory")]
    #[inline(always)]
    pub fn has_shminfo(&self) -> bool {
        self.has_shminfo
    }

    #[cfg(feature = "shared-memory")]
    #[inline(always)]
    pub fn has_shmbuf(&self) -> bool {
        self.has_shmbuf
    }

    #[cfg(feature = "shared-memory")]
    #[inline(never)]
    pub fn map_to_shmbuf(&mut self, shmr: Arc<RwLock<SharedMemoryReader>>) -> ZResult<bool> {
        if !self.has_shminfo() {
            return Ok(false);
        }

        let mut new_len = 0;

        let mut res = false;
        match &mut self.slices {
            ZBufInner::Single(s) => {
                res = s.map_to_shmbuf(shmr)?;
                new_len += s.len();
            }
            ZBufInner::Multiple(m) => {
                for s in m.iter_mut() {
                    res = res || s.map_to_shmbuf(shmr.clone())?;
                    new_len += s.len();
                }
            }
            ZBufInner::Empty => {}
        }
        self.len = new_len;
        self.has_shminfo = false;
        self.has_shmbuf = true;

        Ok(res)
    }

    #[cfg(feature = "shared-memory")]
    #[inline(never)]
    pub fn map_to_shminfo(&mut self) -> ZResult<bool> {
        if !self.has_shmbuf() {
            return Ok(false);
        }

        let mut new_len = 0;

        let mut res = false;
        match &mut self.slices {
            ZBufInner::Single(s) => {
                res = s.map_to_shminfo()?;
                new_len = s.len();
            }
            ZBufInner::Multiple(m) => {
                for s in m.iter_mut() {
                    res = res || s.map_to_shminfo()?;
                    new_len += s.len();
                }
            }
            ZBufInner::Empty => {}
        }
        self.has_shminfo = true;
        self.has_shmbuf = false;
        self.len = new_len;

        Ok(res)
    }
}

impl fmt::Display for ZBuf {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ZBuf{{ content: ",)?;
        match &self.slices {
            ZBufInner::Single(s) => write!(f, "{}", hex::encode_upper(s.as_slice()))?,
            ZBufInner::Multiple(m) => {
                for s in m.iter() {
                    write!(f, "{}", hex::encode_upper(s.as_slice()))?;
                }
            }
            ZBufInner::Empty => {}
        }
        write!(f, " }}")
    }
}

impl fmt::Debug for ZBuf {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        macro_rules! zsliceprint {
            ($slice:expr) => {
                #[cfg(feature = "shared-memory")]
                {
                    match $slice.buf {
                        ZSliceBuffer::NetSharedBuffer(_) => write!(f, " BUF:")?,
                        ZSliceBuffer::NetOwnedBuffer(_) => write!(f, " BUF:")?,
                        ZSliceBuffer::ShmBuffer(_) => write!(f, " SHM_BUF:")?,
                        ZSliceBuffer::ShmInfo(_) => write!(f, " SHM_INFO:")?,
                    }
                }
                #[cfg(not(feature = "shared-memory"))]
                {
                    write!(f, " BUF:")?;
                }
            };
        }

        write!(f, "ZBuf{{ ")?;
        write!(f, "slices: [")?;
        match &self.slices {
            ZBufInner::Single(s) => {
                zsliceprint!(s);
                write!(f, "{}", hex::encode_upper(s.as_slice()))?;
            }
            ZBufInner::Multiple(m) => {
                for s in m.iter() {
                    zsliceprint!(s);
                    write!(f, " {},", hex::encode_upper(s.as_slice()))?;
                }
            }
            ZBufInner::Empty => {
                write!(f, " None")?;
            }
        }
        write!(f, " ] }}")
    }
}

impl<'a> ZBufReader<'a> {
    #[inline(always)]
    pub fn reset(&mut self) {
        self.read = 0;
        self.slice = 0;
        self.byte = 0;
    }
    // Read 'len' bytes from 'self' and add those to 'dest'
    // This is 0-copy, only ZSlices from 'self' are added to 'dest', without cloning the original buffer.
    pub fn read_into_zbuf(&mut self, dest: &mut ZBuf, len: usize) -> bool {
        if self.remaining() < len {
            return false;
        }
        let mut n = len;
        while n > 0 {
            let pos_1 = self.byte;
            let current = self.curr_slice().unwrap();
            let slice_len = current.len();
            let remain_in_slice = slice_len - pos_1;
            let l = n.min(remain_in_slice);
            let zs = match current.new_sub_slice(pos_1, pos_1 + l) {
                Some(zs) => zs,
                None => return false,
            };
            dest.add_zslice(zs);
            self.skip_bytes_no_check(l);
            n -= l;
        }
        true
    }
    // // Read all the bytes from 'self' and add those to 'dest'
    // #[inline(always)]
    // pub(crate) fn drain_into_zbuf(&mut self, dest: &mut ZBuf) -> bool {
    //     self.read_into_zbuf(dest, self.readable())
    // }
    // Read a subslice of current slice
    pub fn read_zslice(&mut self, len: usize) -> Option<ZSlice> {
        let slice = self.curr_slice()?;
        if len <= slice.len() {
            let slice = slice.new_sub_slice(self.byte, self.byte + len)?;
            self.skip_bytes_no_check(len);
            Some(slice)
        } else {
            None
        }
    }
    #[inline(always)]
    fn curr_slice(&self) -> Option<&ZSlice> {
        self.inner.get_zslice(self.slice)
    }

Trait Implementations§

Returns a copy of the value. Read more
Performs copy-assignment from source. Read more
Constructs a split buffer that may accept slice_capacity segments without allocating. It may also accept receiving cached writes for cache_capacity bytes before needing to reallocate its cache.
Formats the value using the given formatter. Read more
Returns the “default value” for a type. Read more
Formats the value using the given formatter. Read more
Converts to this type from the input type.
Converts to this type from the input type.
Converts to this type from the input type.
Converts to this type from the input type.
Converts to this type from the input type.
Returns the most appropriate reader for self
Appends a slice to the buffer without copying its data.
This method tests for self and other values to be equal, and is used by ==.
This method tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Gets all the slices of this buffer.
Returns true if the buffer has a length of 0.
Returns the number of bytes in the buffer.
Returns all the bytes of this buffer in a conitguous slice. This may require allocation and copy if the original buffer is not contiguous.

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The resulting type after obtaining ownership.
Creates owned data from borrowed data, usually by cloning. Read more
Uses borrowed data to replace owned data, usually by cloning. Read more
Converts the given value to a String. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.