use std::{
cmp,
io::{self, Read, Write},
ops, ptr,
sync::atomic::{AtomicUsize, Ordering},
};
static BUFFER_COUNT: AtomicUsize = AtomicUsize::new(0);
pub struct Pool {
pub inner: poule::Pool<BufferMetadata>,
pub buffer_size: usize,
}
impl Pool {
pub fn with_capacity(minimum: usize, maximum: usize, buffer_size: usize) -> Pool {
let mut inner = poule::Pool::with_extra(maximum, buffer_size);
inner.grow_to(minimum);
Pool { inner, buffer_size }
}
pub fn checkout(&mut self) -> Option<Checkout> {
if self.inner.used() == self.inner.capacity()
&& self.inner.capacity() < self.inner.maximum_capacity()
{
self.inner.grow_to(std::cmp::min(
self.inner.capacity() * 2,
self.inner.maximum_capacity(),
));
debug!(
"growing pool capacity from {} to {}",
self.inner.capacity(),
std::cmp::min(self.inner.capacity() * 2, self.inner.maximum_capacity())
);
}
let capacity = self.buffer_size;
self.inner
.checkout(|| {
trace!("initializing a buffer with capacity {}", capacity);
BufferMetadata::new()
})
.map(|c| {
let old_buffer_count = BUFFER_COUNT.fetch_add(1, Ordering::SeqCst);
gauge!("buffer.number", old_buffer_count + 1);
Checkout { inner: c }
})
}
}
impl ops::Deref for Pool {
type Target = poule::Pool<BufferMetadata>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ops::DerefMut for Pool {
fn deref_mut(&mut self) -> &mut poule::Pool<BufferMetadata> {
&mut self.inner
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BufferMetadata {
position: usize,
end: usize,
}
impl Default for BufferMetadata {
fn default() -> Self {
Self::new()
}
}
impl BufferMetadata {
pub fn new() -> BufferMetadata {
BufferMetadata {
position: 0,
end: 0,
}
}
}
pub struct Checkout {
pub inner: poule::Checkout<BufferMetadata>,
}
impl Drop for Checkout {
fn drop(&mut self) {
let old_buffer_count = BUFFER_COUNT.fetch_sub(1, Ordering::SeqCst);
gauge!("buffer.number", old_buffer_count - 1);
}
}
impl Checkout {
pub fn available_data(&self) -> usize {
self.inner.end - self.inner.position
}
pub fn available_space(&self) -> usize {
self.capacity() - self.inner.end
}
pub fn capacity(&self) -> usize {
self.inner.extra().len()
}
pub fn empty(&self) -> bool {
self.inner.position == self.inner.end
}
pub fn consume(&mut self, count: usize) -> usize {
let cnt = cmp::min(count, self.available_data());
self.inner.position += cnt;
if self.inner.position > self.capacity() / 2 {
self.shift();
}
cnt
}
pub fn fill(&mut self, count: usize) -> usize {
let cnt = cmp::min(count, self.available_space());
self.inner.end += cnt;
if self.available_space() < self.available_data() + cnt {
self.shift();
}
cnt
}
pub fn reset(&mut self) {
self.inner.position = 0;
self.inner.end = 0;
}
pub fn sync(&mut self, end: usize, position: usize) {
self.inner.position = position;
self.inner.end = end;
}
pub fn data(&self) -> &[u8] {
&self.inner.extra()[self.inner.position..self.inner.end]
}
pub fn space(&mut self) -> &mut [u8] {
let range = self.inner.end..self.capacity();
&mut self.inner.extra_mut()[range]
}
pub fn shift(&mut self) {
let pos = self.inner.position;
let end = self.inner.end;
if pos > 0 {
unsafe {
let length = end - pos;
ptr::copy(
self.inner.extra()[pos..end].as_ptr(),
self.inner.extra_mut()[..length].as_mut_ptr(),
length,
);
self.inner.position = 0;
self.inner.end = length;
}
}
}
pub fn delete_slice(&mut self, start: usize, length: usize) -> Option<usize> {
if start + length >= self.available_data() {
return None;
}
unsafe {
let begin = self.inner.position + start;
let next_end = self.inner.end - length;
ptr::copy(
self.inner.extra()[begin + length..self.inner.end].as_ptr(),
self.inner.extra_mut()[begin..next_end].as_mut_ptr(),
self.inner.end - (begin + length),
);
self.inner.end = next_end;
}
Some(self.available_data())
}
pub fn replace_slice(&mut self, data: &[u8], start: usize, length: usize) -> Option<usize> {
let data_len = data.len();
if start + length > self.available_data()
|| self.inner.position + start + data_len > self.capacity()
{
return None;
}
unsafe {
let begin = self.inner.position + start;
let slice_end = begin + data_len;
if data_len < length {
ptr::copy(
data.as_ptr(),
self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
data_len,
);
ptr::copy(
self.inner.extra()[start + length..self.inner.end].as_ptr(),
self.inner.extra_mut()[slice_end..].as_mut_ptr(),
self.inner.end - (start + length),
);
self.inner.end -= length - data_len;
} else {
ptr::copy(
self.inner.extra()[start + length..self.inner.end].as_ptr(),
self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
self.inner.end - (start + length),
);
ptr::copy(
data.as_ptr(),
self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
data_len,
);
self.inner.end += data_len - length;
}
}
Some(self.available_data())
}
pub fn insert_slice(&mut self, data: &[u8], start: usize) -> Option<usize> {
let data_len = data.len();
if start > self.available_data()
|| self.inner.position + self.inner.end + data_len > self.capacity()
{
return None;
}
unsafe {
let begin = self.inner.position + start;
let slice_end = begin + data_len;
ptr::copy(
self.inner.extra()[start..self.inner.end].as_ptr(),
self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
self.inner.end - start,
);
ptr::copy(
data.as_ptr(),
self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
data_len,
);
self.inner.end += data_len;
}
Some(self.available_data())
}
}
impl Write for Checkout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.space().write(buf) {
Ok(size) => {
self.fill(size);
Ok(size)
}
err => err,
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Read for Checkout {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = cmp::min(self.available_data(), buf.len());
unsafe {
ptr::copy(
self.inner.extra()[self.inner.position..self.inner.position + len].as_ptr(),
buf.as_mut_ptr(),
len,
);
self.inner.position += len;
}
Ok(len)
}
}