Struct zenoh_buffers::ZBuf
source · pub struct ZBuf { /* private fields */ }
Expand description
A zenoh buffer.
ZBuf
is a buffer that contains one or more ZSlice
s. It is used
to efficiently send and receive data in zenoh. It provides transparent usage for
both network and shared memory operations through a simple API.
By storing a set of ZSlice
, it is possible to compose the target payload
starting from a set of non-contiguous memory regions. This provides a twofold benefit:
(1) the user can compose the payload in an incremental manner without requiring reallocations
and (2) the payload is received and recomposed as it arrives from the network without reallocating
any receiving buffer.
Example for creating a data buffer:
use zenoh_buffers::{ZBuf, ZSlice, traits::SplitBuffer, traits::buffer::InsertBuffer};
// Create a ZBuf containing a newly allocated vector of bytes.
let zbuf: ZBuf = vec![0_u8; 16].into();
assert_eq!(&vec![0_u8; 16], zbuf.contiguous().as_ref());
// Create a ZBuf containing twice a newly allocated vector of bytes.
// Allocate first a vectore of bytes and convert it into a ZSlice.
let zslice: ZSlice = vec![0_u8; 16].into();
let mut zbuf = ZBuf::default();
zbuf.append(zslice.clone()).unwrap(); // Cloning a ZSlice does not allocate
zbuf.append(zslice).unwrap();
assert_eq!(&vec![0_u8; 32], zbuf.contiguous().as_ref());
Calling contiguous()
allows to acces to the whole payload as a contiguous &[u8]
via the
ZSlice
type. However, this operation has a drawback when the original message was large
enough to cause network fragmentation. Because of that, the actual message payload may have
been received in multiple fragments (i.e. ZSlice
) which are non-contiguous in memory.
use zenoh_buffers::{ZBuf, ZSlice, traits::SplitBuffer, traits::buffer::InsertBuffer};
// Create a ZBuf containing twice a newly allocated vector of bytes.
let zslice: ZSlice = vec![0_u8; 16].into();
let mut zbuf = ZBuf::default();
zbuf.append(zslice.clone());
// contiguous() does not allocate since zbuf contains only one slice
assert_eq!(&vec![0_u8; 16], zbuf.contiguous().as_ref());
// Add a second slice to zbuf
zbuf.append(zslice.clone());
// contiguous() allocates since zbuf contains two slices
assert_eq!(&vec![0_u8; 32], zbuf.contiguous().as_ref());
zslices_num()
returns the number of ZSlice
s the ZBuf
is composed of. If
the returned value is greater than 1, then contiguous()
will allocate. In order to retrieve the
content of the ZBuf
without allocating, it is possible to loop over its ZSlice
s.
Implementations§
source§impl ZBuf
impl ZBuf
sourcepub fn get_zslice(&self, index: usize) -> Option<&ZSlice>
pub fn get_zslice(&self, index: usize) -> Option<&ZSlice>
Examples found in repository?
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
fn copy_bytes(&self, bs: &mut [u8], mut pos: (usize, usize)) -> usize {
let len = bs.len();
let mut written = 0;
while written < len {
if let Some(slice) = self.get_zslice(pos.0) {
let remaining = slice.len() - pos.1;
let to_read = remaining.min(bs.len() - written);
bs[written..written + to_read]
.copy_from_slice(&slice.as_slice()[pos.1..pos.1 + to_read]);
written += to_read;
pos = (pos.0 + 1, 0);
} else {
return written;
}
}
written
}
#[cfg(feature = "shared-memory")]
#[inline(always)]
pub fn has_shminfo(&self) -> bool {
self.has_shminfo
}
#[cfg(feature = "shared-memory")]
#[inline(always)]
pub fn has_shmbuf(&self) -> bool {
self.has_shmbuf
}
#[cfg(feature = "shared-memory")]
#[inline(never)]
pub fn map_to_shmbuf(&mut self, shmr: Arc<RwLock<SharedMemoryReader>>) -> ZResult<bool> {
if !self.has_shminfo() {
return Ok(false);
}
let mut new_len = 0;
let mut res = false;
match &mut self.slices {
ZBufInner::Single(s) => {
res = s.map_to_shmbuf(shmr)?;
new_len += s.len();
}
ZBufInner::Multiple(m) => {
for s in m.iter_mut() {
res = res || s.map_to_shmbuf(shmr.clone())?;
new_len += s.len();
}
}
ZBufInner::Empty => {}
}
self.len = new_len;
self.has_shminfo = false;
self.has_shmbuf = true;
Ok(res)
}
#[cfg(feature = "shared-memory")]
#[inline(never)]
pub fn map_to_shminfo(&mut self) -> ZResult<bool> {
if !self.has_shmbuf() {
return Ok(false);
}
let mut new_len = 0;
let mut res = false;
match &mut self.slices {
ZBufInner::Single(s) => {
res = s.map_to_shminfo()?;
new_len = s.len();
}
ZBufInner::Multiple(m) => {
for s in m.iter_mut() {
res = res || s.map_to_shminfo()?;
new_len += s.len();
}
}
ZBufInner::Empty => {}
}
self.has_shminfo = true;
self.has_shmbuf = false;
self.len = new_len;
Ok(res)
}
}
impl fmt::Display for ZBuf {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ZBuf{{ content: ",)?;
match &self.slices {
ZBufInner::Single(s) => write!(f, "{}", hex::encode_upper(s.as_slice()))?,
ZBufInner::Multiple(m) => {
for s in m.iter() {
write!(f, "{}", hex::encode_upper(s.as_slice()))?;
}
}
ZBufInner::Empty => {}
}
write!(f, " }}")
}
}
impl fmt::Debug for ZBuf {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
macro_rules! zsliceprint {
($slice:expr) => {
#[cfg(feature = "shared-memory")]
{
match $slice.buf {
ZSliceBuffer::NetSharedBuffer(_) => write!(f, " BUF:")?,
ZSliceBuffer::NetOwnedBuffer(_) => write!(f, " BUF:")?,
ZSliceBuffer::ShmBuffer(_) => write!(f, " SHM_BUF:")?,
ZSliceBuffer::ShmInfo(_) => write!(f, " SHM_INFO:")?,
}
}
#[cfg(not(feature = "shared-memory"))]
{
write!(f, " BUF:")?;
}
};
}
write!(f, "ZBuf{{ ")?;
write!(f, "slices: [")?;
match &self.slices {
ZBufInner::Single(s) => {
zsliceprint!(s);
write!(f, "{}", hex::encode_upper(s.as_slice()))?;
}
ZBufInner::Multiple(m) => {
for s in m.iter() {
zsliceprint!(s);
write!(f, " {},", hex::encode_upper(s.as_slice()))?;
}
}
ZBufInner::Empty => {
write!(f, " None")?;
}
}
write!(f, " ] }}")
}
}
impl<'a> ZBufReader<'a> {
#[inline(always)]
pub fn reset(&mut self) {
self.read = 0;
self.slice = 0;
self.byte = 0;
}
// Read 'len' bytes from 'self' and add those to 'dest'
// This is 0-copy, only ZSlices from 'self' are added to 'dest', without cloning the original buffer.
pub fn read_into_zbuf(&mut self, dest: &mut ZBuf, len: usize) -> bool {
if self.remaining() < len {
return false;
}
let mut n = len;
while n > 0 {
let pos_1 = self.byte;
let current = self.curr_slice().unwrap();
let slice_len = current.len();
let remain_in_slice = slice_len - pos_1;
let l = n.min(remain_in_slice);
let zs = match current.new_sub_slice(pos_1, pos_1 + l) {
Some(zs) => zs,
None => return false,
};
dest.add_zslice(zs);
self.skip_bytes_no_check(l);
n -= l;
}
true
}
// // Read all the bytes from 'self' and add those to 'dest'
// #[inline(always)]
// pub(crate) fn drain_into_zbuf(&mut self, dest: &mut ZBuf) -> bool {
// self.read_into_zbuf(dest, self.readable())
// }
// Read a subslice of current slice
pub fn read_zslice(&mut self, len: usize) -> Option<ZSlice> {
let slice = self.curr_slice()?;
if len <= slice.len() {
let slice = slice.new_sub_slice(self.byte, self.byte + len)?;
self.skip_bytes_no_check(len);
Some(slice)
} else {
None
}
}
#[inline(always)]
fn curr_slice(&self) -> Option<&ZSlice> {
self.inner.get_zslice(self.slice)
}
pub fn zslices_num(&self) -> usize
pub fn clear(&mut self)
Trait Implementations§
source§impl ConstructibleBuffer for ZBuf
impl ConstructibleBuffer for ZBuf
source§fn with_capacities(slice_capacity: usize, _cache_capacity: usize) -> Self
fn with_capacities(slice_capacity: usize, _cache_capacity: usize) -> Self
slice_capacity
segments without allocating.
It may also accept receiving cached writes for cache_capacity
bytes before needing to reallocate its cache.