use crate::{
buffer::Buffer,
pool::{
error::Error,
kitty_pool::{KittyPool, Token},
},
};
use async_trait::async_trait;
use futures::pending;
use getset::{Getters, MutGetters};
use std::{
cmp::min,
collections::{HashMap, LinkedList},
io::{Read, Seek, SeekFrom, Write},
ops::Range,
};
pub type Ranges = LinkedList<Range<u64>>;
#[derive(Debug, Getters, MutGetters)]
pub struct ContiguousKittyPool {
#[get = "pub"]
#[get_mut = "pub"]
ranges: Ranges,
#[get = "pub"]
#[get_mut = "pub"]
buffer: Box<Buffer>,
#[get = "pub"]
capacity: usize,
#[get = "pub"]
capacity_remaining: usize,
#[get = "pub"]
block_size: usize,
#[get = "pub"]
#[get_mut = "pub"]
range_map: HashMap<Token, Ranges>,
}
impl ContiguousKittyPool {
pub fn new(buffer_size: usize, block_size: usize) -> Self {
let buffer = Buffer::new(buffer_size);
let size = buffer_size / block_size;
let ranges: Ranges = (0..size)
.collect::<Vec<_>>()
.into_iter()
.map(|i| {
let begin = i as u64 * block_size as u64;
Range {
start: begin,
end: begin + block_size as u64,
}
})
.collect();
Self {
ranges,
buffer: Box::new(buffer),
capacity: size * block_size,
capacity_remaining: size * block_size,
block_size,
range_map: HashMap::new(),
}
}
}
#[async_trait]
impl KittyPool for ContiguousKittyPool {
async fn borrow(&mut self, mut requested_size: usize) -> Result<Token, Error> {
if requested_size == 0 {
return Err(Error::new(
"0 is an invalid request size to borrow from pool!".to_string(),
));
}
let ranges = &mut self.ranges;
if requested_size > self.capacity {
return Err(Error::new(format!(
"{} is too large to borrow from buffer with capacity {}!",
requested_size, self.capacity
)));
}
if requested_size > self.capacity_remaining {
pending!();
}
let mut result = Ranges::new();
let token = Token::new_v4();
for _ in 0..ranges.len() {
if requested_size > 0 {
let range = self.ranges.pop_front().unwrap();
result.push_back(range);
self.capacity_remaining -= self.block_size;
requested_size -= min(self.block_size, requested_size);
}
}
self.range_map.insert(token, result);
Ok(token)
}
fn release(&mut self, token: &Token) -> Result<(), Error> {
if !self.range_map.contains_key(token) {
return Err(Error::new(format!(
"Unrecognized token {} used for release operation on pool!",
token
)));
}
let mut ranges = self.range_map.remove(token).unwrap();
self.capacity_remaining += self.block_size * ranges.len();
self.ranges.append(&mut ranges);
Ok(())
}
async fn read(&mut self, token: &Token, data_to_read: &mut [u8]) -> Result<Token, Error> {
if !self.range_map.contains_key(token) {
return Err(Error::new(format!(
"Unrecognized token {} used for read operation on pool!",
token
)));
}
let ranges = self.range_map.get(token).unwrap();
let mut begin = 0;
let mut end = begin + self.block_size;
let mut result = Ok(*token);
if (ranges.len() * self.block_size) > data_to_read.len() {
return Err(Error::new(format!(
"Unable to read data into data_to_read of size {}, data_to_read should be at least {}",
data_to_read.len(),
(ranges.len() * self.block_size)
)));
}
for range in ranges {
let seek_result = self.buffer.seek(std::io::SeekFrom::Start(range.start));
if let Err(seek_err) = seek_result {
result = Err(Error::from(seek_err));
}
let write_result = self.buffer.read(&mut data_to_read[begin..end]);
if let Err(write_err) = write_result {
result = Err(Error::from(write_err));
}
begin = end;
end += self.block_size;
}
result
}
async fn write(&mut self, token: &Token, data_to_write: &[u8]) -> Result<Token, Error> {
if !self.range_map.contains_key(token) {
return Err(Error::new(format!(
"Unrecognized token {} used for write operation on pool!",
token
)));
}
let ranges = self.range_map.get(token).unwrap();
let mut begin = 0;
let mut end = begin + self.block_size;
let mut result = Ok(*token);
if (ranges.len() * self.block_size) < data_to_write.len() {
return Err(Error::new(format!(
"Unable to write data into inner buffer with data_to_write of size {}, data_to_write should be at most {}",
data_to_write.len(),
(ranges.len() * self.block_size)
)));
}
for range in ranges {
end = min(end, data_to_write.len());
let seek_result = self.buffer.seek(SeekFrom::Start(range.start));
if let Err(seek_err) = seek_result {
result = Err(Error::from(seek_err));
}
let write_result = self.buffer.write(&data_to_write[begin..end]);
if let Err(write_err) = write_result {
result = Err(Error::from(write_err));
}
begin = end;
end += self.block_size;
}
result
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::block_on, future::maybe_done, pin_mut};
const POOL_SIZE: usize = 1024;
const BLOCK_SIZE: usize = 64;
const BLOCKS: usize = 16;
const ALIGNED_BORROW_SIZE: usize = 512;
const UNALIGNED_BORROW_SIZE: usize = 513;
const TOO_LARGE_BORROW_SIZE: usize = 2048;
const ZERO_BORROW_SIZE: usize = 0;
#[test]
fn test_new() {
{
let pool = ContiguousKittyPool::new(BLOCK_SIZE, POOL_SIZE);
assert_eq!(*pool.capacity(), 0);
assert_eq!(*pool.capacity_remaining(), 0);
assert_eq!(pool.ranges().len(), 0);
assert!(pool.range_map().is_empty());
}
{
let pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
assert_eq!(*pool.capacity(), POOL_SIZE);
assert_eq!(*pool.capacity_remaining(), POOL_SIZE);
assert_eq!(pool.ranges().len(), BLOCKS);
assert!(pool.range_map().is_empty());
}
}
#[test]
fn test_debug() {
let pool = ContiguousKittyPool::new(2, 1);
assert_eq!(format!("{:?}", pool), "ContiguousKittyPool { ranges: [0..1, 1..2], buffer: Buffer { inner: [0, 0], sentinel: 0, capacity: 2 }, capacity: 2, capacity_remaining: 2, block_size: 1, range_map: {} }");
}
#[test]
fn test_borrow() {
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ZERO_BORROW_SIZE));
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(
format!("{}", err),
"Error: 0 is an invalid request size to borrow from pool!"
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(TOO_LARGE_BORROW_SIZE));
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(
format!("{}", err),
"Error: 2048 is too large to borrow from buffer with capacity 1024!"
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 8);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(UNALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(
pool.capacity_remaining,
POOL_SIZE - ALIGNED_BORROW_SIZE - BLOCK_SIZE
);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 9);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let is_done = maybe_done(pool.borrow(UNALIGNED_BORROW_SIZE));
pin_mut!(is_done);
assert!(is_done.as_mut().take_output().is_none());
}
}
#[test]
fn test_release() {
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let token = Token::new_v4();
let result = pool.release(&token);
assert!(result.is_err());
let err = result.err();
assert!(err.is_some());
assert_eq!(
format!("{}", err.unwrap()),
format!(
"Error: Unrecognized token {} used for release operation on pool!",
token
)
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 8);
let result = pool.release(&token);
assert!(result.is_ok());
assert_eq!(pool.capacity_remaining, POOL_SIZE);
assert!(pool.range_map.get(&token).is_none());
}
}
#[test]
fn test_read() {
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let bad_token = Token::new_v4();
let mut dummy_data: [u8; 128] = [0; 128];
let result = block_on(pool.read(&bad_token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unrecognized token {} used for read operation on pool!",
bad_token
)
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 8);
let mut dummy_data: [u8; 128] = [0; 128];
let result = block_on(pool.read(&token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unable to read data into data_to_read of size {}, data_to_read should be at least {}",
dummy_data.len(),
ALIGNED_BORROW_SIZE
)
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 8);
let mut data: [u8; ALIGNED_BORROW_SIZE] = [0; ALIGNED_BORROW_SIZE];
let result = block_on(pool.read(&token, &mut data));
assert!(result.is_ok());
}
}
#[test]
fn test_write() {
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let bad_token = Token::new_v4();
let mut dummy_data: [u8; 128] = [0; 128];
let result = block_on(pool.write(&bad_token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unrecognized token {} used for write operation on pool!",
bad_token
)
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 8);
let mut dummy_data: [u8; UNALIGNED_BORROW_SIZE] = [1; UNALIGNED_BORROW_SIZE];
let result = block_on(pool.write(&token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unable to write data into inner buffer with data_to_write of size {}, data_to_write should be at most {}",
dummy_data.len(),
ALIGNED_BORROW_SIZE
)
);
}
{
let mut pool = ContiguousKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.range_map.get(&token).is_some());
assert_eq!(pool.range_map.get(&token).unwrap().len(), 8);
let mut data: [u8; ALIGNED_BORROW_SIZE] = [0; ALIGNED_BORROW_SIZE];
let result = block_on(pool.write(&token, &mut data));
assert!(result.is_ok());
}
}
}