use std::ops::{Deref, DerefMut};
use timely_bytes::arc::{Bytes, BytesMut};
/// A byte buffer that is written at its tail and handed out from its head.
///
/// The slab tracks how many leading bytes of its working buffer have been
/// written (`valid`). Written bytes can be extracted as shared `Bytes`; when the
/// working buffer runs out of room it is retired and replaced, and retired
/// buffers are recycled once they can be regenerated.
pub struct BytesSlab {
    /// The working buffer that is currently written to and extracted from.
    buffer: BytesMut,
    /// Retired buffers that have not yet been reclaimed; `None` marks a slot
    /// whose buffer was taken during a reclamation pass.
    in_progress: Vec<Option<BytesMut>>,
    /// Reclaimed buffers that are ready to become the working buffer again.
    stash: Vec<BytesMut>,
    /// Base-two logarithm of the size requested for each buffer.
    shift: usize,
    /// Number of written bytes: `buffer[..valid]` holds valid data.
    valid: usize,
    /// Allocation logic (and stash limit) used to obtain fresh buffers.
    new_bytes: BytesRefill,
}
/// Describes how a `BytesSlab` obtains and retains buffers.
#[derive(Clone)]
pub struct BytesRefill {
    /// Produces a new buffer; the argument is the number of bytes requested.
    pub logic: std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target = [u8]>> + Send + Sync>,
    /// When set, the maximum number of spare buffers kept around for reuse.
    pub limit: Option<usize>,
}
impl BytesSlab {
    /// Creates a slab whose first buffer is requested with `1 << shift` bytes.
    ///
    /// `new_bytes.logic` supplies that first buffer and is kept to allocate
    /// replacement buffers later; no bytes are valid initially.
    pub fn new(shift: usize, new_bytes: BytesRefill) -> Self {
        BytesSlab {
            buffer: BytesMut::from(BoxDerefMut { boxed: (new_bytes.logic)(1 << shift) }),
            in_progress: Vec::new(),
            stash: Vec::new(),
            shift,
            valid: 0,
            new_bytes,
        }
    }

    /// The unwritten tail of the working buffer, i.e. `buffer[valid..]`.
    pub fn empty(&mut self) -> &mut [u8] {
        &mut self.buffer[self.valid..]
    }

    /// The written head of the working buffer, i.e. `buffer[..valid]`.
    pub fn valid(&mut self) -> &mut [u8] {
        &mut self.buffer[..self.valid]
    }

    /// Marks the next `bytes` bytes of the empty region as valid.
    ///
    /// No bounds check is performed here; the count is simply added to the
    /// number of valid bytes.
    pub fn make_valid(&mut self, bytes: usize) {
        self.valid += bytes;
    }

    /// Extracts the first `bytes` valid bytes as a `Bytes` handle.
    ///
    /// The number of valid bytes is reduced by `bytes`. In debug builds this
    /// asserts that `bytes` does not exceed the number of valid bytes.
    pub fn extract(&mut self, bytes: usize) -> Bytes {
        debug_assert!(bytes <= self.valid);
        self.valid -= bytes;
        self.buffer.extract_to(bytes)
    }

    /// Makes room for at least `capacity` more bytes after the valid region.
    ///
    /// If the empty region is already large enough nothing happens. Otherwise
    /// the working buffer is replaced (by a recycled or freshly allocated
    /// buffer) and the valid bytes are copied to its start. If a buffer of the
    /// current size could not hold the valid bytes plus `capacity`, the shift
    /// is increased until it can.
    pub fn ensure_capacity(&mut self, capacity: usize) {
        if self.empty().len() < capacity {
            let mut increased_shift = false;

            // Grow the allocation size until valid data plus the requested
            // capacity fits. Buffers of the old size are useless afterwards,
            // so both the stash and the retired list are discarded.
            while self.valid + capacity > (1 << self.shift) {
                self.shift += 1;
                self.stash.clear();
                self.in_progress.clear();
                increased_shift = true;
            }

            // With nothing stashed, try to reclaim retired buffers.
            if self.stash.is_empty() {
                for shared in self.in_progress.iter_mut() {
                    if let Some(mut bytes) = shared.take() {
                        // `try_regenerate` reports success as a plain `bool`.
                        if bytes.try_regenerate::<BoxDerefMut>() {
                            // Only keep regenerated buffers of the current
                            // size; others are dropped here.
                            if bytes.len() == (1 << self.shift) {
                                self.stash.push(bytes);
                            }
                        } else {
                            // Not reclaimable yet: put it back for a later pass.
                            *shared = Some(bytes);
                        }
                    }
                }
                // Drop the slots whose buffers were taken above.
                self.in_progress.retain(|x| x.is_some());
            }

            // Prefer a recycled buffer; allocate only when none is available.
            let new_buffer = self.stash.pop().unwrap_or_else(|| {
                BytesMut::from(BoxDerefMut { boxed: (self.new_bytes.logic)(1 << self.shift) })
            });
            let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

            // Respect the optional cap on the number of spare buffers.
            if let Some(limit) = self.new_bytes.limit {
                self.stash.truncate(limit);
            }

            // Carry the still-valid bytes over to the new working buffer.
            self.buffer[..self.valid].copy_from_slice(&old_buffer[..self.valid]);

            // An old buffer of a smaller size can never be reused, so it is
            // only retired for later reclamation when the size did not change.
            if !increased_shift {
                self.in_progress.push(Some(old_buffer));
            }
        }
    }
}
/// A named, concrete wrapper around a boxed, type-erased mutable byte buffer.
struct BoxDerefMut {
    /// The owned buffer; it dereferences to a byte slice.
    boxed: Box<dyn DerefMut<Target = [u8]> + 'static>,
}
/// Read access to the wrapped buffer as a byte slice.
impl Deref for BoxDerefMut {
    type Target = [u8];

    /// Borrows the entire wrapped buffer.
    fn deref(&self) -> &[u8] {
        // First `*` goes through the `Box`, the second through the trait object.
        &**self.boxed
    }
}
/// Write access to the wrapped buffer as a byte slice.
impl DerefMut for BoxDerefMut {
    /// Mutably borrows the entire wrapped buffer.
    fn deref_mut(&mut self) -> &mut [u8] {
        // First `*` goes through the `Box`, the second through the trait object.
        &mut **self.boxed
    }
}