use tokio::sync::mpsc;
use std::mem::take;
use std::ops::{Deref, DerefMut, Range};
use std::sync::Arc;
/// A cheaply cloneable, read-only view into a shared buffer of `T`.
///
/// The elements live in a reference-counted `Vec<T>`; `range` selects the part
/// of that vector which this chunk exposes. When a recycler is attached, the
/// chunk's `Drop` implementation sends the vector back through it once no
/// other chunk shares the buffer.
#[derive(Debug)]
pub struct Chunk<T> {
    /// Backing storage, shared between chunks that were cloned or split off
    /// from one another.
    buffer: Arc<Vec<T>>,
    /// Index range within `buffer` that is visible through this chunk.
    range: Range<usize>,
    /// Channel used to hand the backing `Vec` back for reuse, if any.
    recycler: Option<mpsc::UnboundedSender<Vec<T>>>,
}

// `Clone` is implemented by hand because `#[derive(Clone)]` would require
// `T: Clone`, even though cloning a chunk only clones the `Arc`, the range and
// the sender; no element of type `T` is ever cloned.
impl<T> Clone for Chunk<T> {
    /// Creates another chunk that shares the same buffer and covers the same
    /// range.
    fn clone(&self) -> Self {
        Chunk {
            buffer: Arc::clone(&self.buffer),
            range: self.range.clone(),
            recycler: self.recycler.clone(),
        }
    }
}
impl<T> Chunk<T> {
    /// Wraps `buffer` in a chunk that covers all of its elements.
    ///
    /// `recycler` is stored alongside the buffer unchanged.
    fn new(buffer: Vec<T>, recycler: Option<mpsc::UnboundedSender<Vec<T>>>) -> Self {
        let range = 0..buffer.len();
        let buffer = Arc::new(buffer);
        Chunk {
            buffer,
            range,
            recycler,
        }
    }

    /// Removes the first `len` elements from this chunk's view.
    ///
    /// Only the visible range is adjusted; the shared buffer is not modified.
    ///
    /// # Panics
    ///
    /// Panics with `"length exceeded"` if `len` is greater than the number of
    /// elements currently visible.
    pub fn discard_beginning(&mut self, len: usize) {
        let available = self.range.end - self.range.start;
        assert!(len <= available, "length exceeded");
        self.range.start += len;
    }

    /// Splits off the first `len` elements and returns them as a new chunk.
    ///
    /// The returned chunk shares the buffer (and the recycler, if any) with
    /// `self`; afterwards `self` starts right behind the split-off part.
    ///
    /// # Panics
    ///
    /// Panics with `"length exceeded"` if `len` is greater than the number of
    /// elements currently visible.
    pub fn separate_beginning(&mut self, len: usize) -> Self {
        let available = self.range.end - self.range.start;
        assert!(len <= available, "length exceeded");
        let split_at = self.range.start + len;
        let head = Chunk {
            buffer: Arc::clone(&self.buffer),
            range: self.range.start..split_at,
            recycler: self.recycler.clone(),
        };
        // The remainder kept in `self` begins where the returned head ends.
        self.range.start = split_at;
        head
    }
}
impl<T> Drop for Chunk<T> {
    /// Sends the backing vector to the recycler if this chunk has one and is
    /// the only remaining user of the buffer.
    fn drop(&mut self) {
        if let Some(recycler) = self.recycler.as_ref() {
            // `Arc::get_mut` yields access only when no other `Arc` or `Weak`
            // points at the buffer, i.e. under the same condition in which
            // `Arc::try_unwrap` would succeed. Moving the `Vec` out through
            // that reference leaves an empty, allocation-free `Vec` behind and
            // avoids building a replacement `Arc` on every drop.
            if let Some(buffer) = Arc::get_mut(&mut self.buffer) {
                // A failed send is deliberately ignored; the vector is then
                // simply dropped together with the error value.
                recycler.send(take(buffer)).ok();
            }
        }
    }
}
impl<T> Deref for Chunk<T> {
    type Target = [T];

    /// Exposes the visible part of the shared buffer as a slice.
    fn deref(&self) -> &[T] {
        let (start, end) = (self.range.start, self.range.end);
        &self.buffer[start..end]
    }
}
impl<T> From<Vec<T>> for Chunk<T> {
fn from(vec: Vec<T>) -> Self {
Chunk::new(vec, None)
}
}
impl<T> From<ChunkBuf<T>> for Chunk<T> {
fn from(chunk_buf: ChunkBuf<T>) -> Self {
chunk_buf.finalize()
}
}
/// A writable buffer that dereferences to `Vec<T>`.
///
/// When a `ChunkBuf` is dropped while it still holds a recycler, its vector is
/// sent back through that recycler; `finalize` instead turns it into a
/// [`Chunk`].
#[derive(Debug)]
pub struct ChunkBuf<T> {
    /// The vector being filled.
    buffer: Vec<T>,
    /// Channel used to hand `buffer` back for reuse; `None` once it has been
    /// taken out by `finalize` or by `Drop`.
    recycler: Option<mpsc::UnboundedSender<Vec<T>>>,
}
impl<T> ChunkBuf<T> {
    /// Creates a buffer around `buffer` that reports back to `recycler`.
    fn new(buffer: Vec<T>, recycler: mpsc::UnboundedSender<Vec<T>>) -> Self {
        ChunkBuf {
            buffer,
            recycler: Some(recycler),
        }
    }

    /// Converts the buffer into a [`Chunk`] covering all of its elements.
    ///
    /// The recycler (if any) moves into the chunk.
    pub fn finalize(mut self) -> Chunk<T> {
        // Both fields are moved out, so the `Drop` implementation that runs
        // when `self` goes out of scope finds no recycler and sends nothing.
        let buffer = take(&mut self.buffer);
        // `Chunk::new` accepts an `Option`, so the recycler is forwarded as is
        // rather than being unwrapped and wrapped again.
        let recycler = self.recycler.take();
        Chunk::new(buffer, recycler)
    }
}
impl<T> Drop for ChunkBuf<T> {
    /// Sends the vector to the recycler if one is still attached.
    fn drop(&mut self) {
        if let Some(sender) = self.recycler.take() {
            let returned = take(&mut self.buffer);
            // A failed send is ignored on purpose; the vector is then dropped.
            sender.send(returned).ok();
        }
    }
}
impl<T> Deref for ChunkBuf<T> {
type Target = Vec<T>;
fn deref(&self) -> &Self::Target {
&self.buffer
}
}
impl<T> DerefMut for ChunkBuf<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.buffer
}
}
/// Hands out [`ChunkBuf`]s and collects their vectors again for reuse.
///
/// Both halves of one unbounded channel are kept here: vectors come back
/// through `recycler` clones and are picked up again from `dispenser`.
pub struct ChunkBufPool<T> {
    /// Sending half; a clone is given to every buffer handed out.
    recycler: mpsc::UnboundedSender<Vec<T>>,
    /// Receiving half, from which returned vectors are taken.
    dispenser: mpsc::UnboundedReceiver<Vec<T>>,
}
impl<T> ChunkBufPool<T> {
    /// Creates an empty pool backed by a fresh unbounded channel.
    pub fn new() -> Self {
        let (recycler, dispenser) = mpsc::unbounded_channel::<Vec<T>>();
        Self {
            recycler,
            dispenser,
        }
    }

    /// Returns an empty buffer, reusing a returned vector when one is
    /// available.
    ///
    /// If no vector can be received from the channel, a new empty vector is
    /// created.
    pub fn get(&mut self) -> ChunkBuf<T> {
        // Asking for zero capacity neither grows a reused vector nor
        // allocates for a new one, so this is the plain "any buffer" case.
        self.get_with_capacity(0)
    }

    /// Returns an empty buffer that can hold at least `capacity` elements
    /// without reallocating.
    ///
    /// A returned vector is reused when one is available; otherwise a new
    /// vector with the requested capacity is allocated.
    pub fn get_with_capacity(&mut self, capacity: usize) -> ChunkBuf<T> {
        let buffer = match self.dispenser.try_recv() {
            Ok(mut buffer) => {
                buffer.clear();
                // A reused vector may be smaller than requested; with a
                // length of zero, `reserve` guarantees room for at least
                // `capacity` elements and does nothing if it already fits.
                buffer.reserve(capacity);
                buffer
            }
            Err(_) => Vec::with_capacity(capacity),
        };
        ChunkBuf::new(buffer, self.recycler.clone())
    }
}