timely_communication/allocator/zero_copy/bytes_slab.rs
//! A large binary allocation for writing and sharing.

use bytes::arc::Bytes;

/// A large binary allocation for writing and sharing.
///
/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after
/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
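///
/// # Example
///
/// A sketch of typical use, writing into the empty region and then extracting the written
/// prefix as `Bytes`. The doctest is marked `ignore` because it assumes this module is
/// publicly reachable at the path below.
///
/// ```ignore
/// use timely_communication::allocator::zero_copy::bytes_slab::BytesSlab;
///
/// // Allocate a slab of 2^10 = 1024 bytes.
/// let mut slab = BytesSlab::new(10);
///
/// // Ensure room, write into the empty region, and mark the written bytes as valid.
/// slab.ensure_capacity(5);
/// slab.empty()[..5].copy_from_slice(b"hello");
/// slab.make_valid(5);
///
/// // Extract the five valid bytes as a shared `Bytes` handle.
/// let bytes = slab.extract(5);
/// assert_eq!(&bytes[..], &b"hello"[..]);
/// ```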
pub struct BytesSlab {
    buffer: Bytes,                      // current working buffer.
    in_progress: Vec<Option<Bytes>>,    // buffers shared with workers.
    stash: Vec<Bytes>,                  // reclaimed and reusable buffers.
    shift: usize,                       // current buffer allocation size.
    valid: usize,                       // buffer[..valid] are valid bytes.
}

impl BytesSlab {
    /// Allocates a new `BytesSlab` with an initial allocation of `1 << shift` bytes.
    pub fn new(shift: usize) -> Self {
        BytesSlab {
            buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
            in_progress: Vec::new(),
            stash: Vec::new(),
            shift,
            valid: 0,
        }
    }
    /// The empty region of the slab.
    pub fn empty(&mut self) -> &mut [u8] {
        &mut self.buffer[self.valid..]
    }
    /// The valid region of the slab.
    pub fn valid(&mut self) -> &mut [u8] {
        &mut self.buffer[..self.valid]
    }
    /// Marks the next `bytes` bytes as valid.
    pub fn make_valid(&mut self, bytes: usize) {
        self.valid += bytes;
    }
    /// Extracts the first `bytes` valid bytes.
    pub fn extract(&mut self, bytes: usize) -> Bytes {
        debug_assert!(bytes <= self.valid);
        self.valid -= bytes;
        self.buffer.extract_to(bytes)
    }

    /// Ensures that `self.empty().len()` is at least `capacity`.
    ///
    /// This method may retire the current buffer if it does not have enough space, in which case
    /// it will copy any remaining contents into a new buffer. If this would not create enough free
    /// space, the shift is increased until it is sufficient.
    pub fn ensure_capacity(&mut self, capacity: usize) {

        if self.empty().len() < capacity {

            let mut increased_shift = false;

            // Increase allocation if copy would be insufficient.
            while self.valid + capacity > (1 << self.shift) {
                self.shift += 1;
                self.stash.clear();         // clear wrongly sized buffers.
                self.in_progress.clear();   // clear wrongly sized buffers.
                increased_shift = true;
            }

            // Attempt to reclaim shared slices.
            if self.stash.is_empty() {
                for shared in self.in_progress.iter_mut() {
                    if let Some(mut bytes) = shared.take() {
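                        // `try_regenerate` attempts to recover the backing `Box<[u8]>`, which
                        // should succeed only once all shared references to the allocation
                        // have been dropped and this is the last remaining handle.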
                        if bytes.try_regenerate::<Box<[u8]>>() {
                            // NOTE: Test should be redundant, but better safe...
                            if bytes.len() == (1 << self.shift) {
                                self.stash.push(bytes);
                            }
                        }
                        else {
                            *shared = Some(bytes);
                        }
                    }
                }
                self.in_progress.retain(|x| x.is_some());
            }

            let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
            let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

            self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
            if !increased_shift {
                self.in_progress.push(Some(old_buffer));
            }
        }
    }
}
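
#[cfg(test)]
mod tests {
    use super::BytesSlab;

    // A minimal sketch of the behavior documented above: write into the empty region,
    // grow the slab with `ensure_capacity`, and extract the valid prefix as `Bytes`.
    #[test]
    fn write_grow_extract() {
        // Start with a 2^4 = 16 byte slab.
        let mut slab = BytesSlab::new(4);

        // Write ten bytes and mark them as valid.
        slab.empty()[..10].copy_from_slice(&[1u8; 10]);
        slab.make_valid(10);

        // Requesting more free space than remains should retire the current buffer and
        // copy the valid bytes into a larger allocation.
        slab.ensure_capacity(10);
        assert!(slab.empty().len() >= 10);
        assert_eq!(&slab.valid()[..], &[1u8; 10][..]);

        // Extract the valid bytes as a shared handle.
        let bytes = slab.extract(10);
        assert_eq!(&bytes[..], &[1u8; 10][..]);
    }
}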