timely_communication/allocator/zero_copy/
bytes_slab.rs

//! A large binary allocation for writing and sharing.
2
3use bytes::arc::Bytes;
4
5/// A large binary allocation for writing and sharing.
6///
7/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after
8/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
9/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
10pub struct BytesSlab {
11    buffer:         Bytes,                      // current working buffer.
12    in_progress:    Vec<Option<Bytes>>,         // buffers shared with workers.
13    stash:          Vec<Bytes>,                 // reclaimed and resuable buffers.
14    shift:          usize,                      // current buffer allocation size.
15    valid:          usize,                      // buffer[..valid] are valid bytes.
16}
17
18impl BytesSlab {
19    /// Allocates a new `BytesSlab` with an initial size determined by a shift.
20    pub fn new(shift: usize) -> Self {
21        BytesSlab {
22            buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
23            in_progress: Vec::new(),
24            stash: Vec::new(),
25            shift,
26            valid: 0,
27        }
28    }
29    /// The empty region of the slab.
30    pub fn empty(&mut self) -> &mut [u8] {
31        &mut self.buffer[self.valid..]
32    }
33    /// The valid region of the slab.
34    pub fn valid(&mut self) -> &mut [u8] {
35        &mut self.buffer[..self.valid]
36    }
37    /// Marks the next `bytes` bytes as valid.
38    pub fn make_valid(&mut self, bytes: usize) {
39        self.valid += bytes;
40    }
41    /// Extracts the first `bytes` valid bytes.
42    pub fn extract(&mut self, bytes: usize) -> Bytes {
43        debug_assert!(bytes <= self.valid);
44        self.valid -= bytes;
45        self.buffer.extract_to(bytes)
46    }
47
48    /// Ensures that `self.empty().len()` is at least `capacity`.
49    ///
50    /// This method may retire the current buffer if it does not have enough space, in which case
51    /// it will copy any remaining contents into a new buffer. If this would not create enough free
52    /// space, the shift is increased until it is sufficient.
53    pub fn ensure_capacity(&mut self, capacity: usize) {
54
55        if self.empty().len() < capacity {
56
57            let mut increased_shift = false;
58
59            // Increase allocation if copy would be insufficient.
60            while self.valid + capacity > (1 << self.shift) {
61                self.shift += 1;
62                self.stash.clear();         // clear wrongly sized buffers.
63                self.in_progress.clear();   // clear wrongly sized buffers.
64                increased_shift = true;
65            }
66
67            // Attempt to reclaim shared slices.
68            if self.stash.is_empty() {
69                for shared in self.in_progress.iter_mut() {
70                    if let Some(mut bytes) = shared.take() {
71                        if bytes.try_regenerate::<Box<[u8]>>() {
72                            // NOTE: Test should be redundant, but better safe...
73                            if bytes.len() == (1 << self.shift) {
74                                self.stash.push(bytes);
75                            }
76                        }
77                        else {
78                            *shared = Some(bytes);
79                        }
80                    }
81                }
82                self.in_progress.retain(|x| x.is_some());
83            }
84
85            let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
86            let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);
87
88            self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
89            if !increased_shift {
90                self.in_progress.push(Some(old_buffer));
91            }
92        }
93    }
94}