use crate::{
buffer::Buffer,
pool::{
error::Error,
kitty_pool::{KittyPool, Token},
},
};
use async_trait::async_trait;
use futures::pending;
use getset::Getters;
use std::{
collections::HashMap,
io::{Read, Seek, SeekFrom, Write},
};
#[derive(Debug)]
pub struct SGL {
inner: Vec<Buffer>,
write_index: usize,
read_index: usize,
block_size: usize,
}
impl SGL {
fn new(inner: Vec<Buffer>, block_size: usize) -> Self {
Self {
inner,
write_index: 0,
read_index: 0,
block_size,
}
}
fn len(&self) -> usize {
self.inner.len()
}
fn write(&mut self, data_to_write: &[u8]) -> Result<(), Error> {
if data_to_write.len() > self.block_size * self.inner.len() {
return Err(Error::new(format!(
"Unable to write data into inner buffer with data_to_write of size {}, data_to_write should be at most {}",
data_to_write.len(),
(self.inner.len() * self.block_size)
)));
}
let write_index = self.write_index.clone() / self.block_size;
let mut data_index = 0;
for buffer in &mut self.inner[write_index..] {
let buffer_write_index = self.write_index % buffer.len();
buffer.seek(SeekFrom::Start(buffer_write_index as u64))?;
let res = buffer.write(&data_to_write[data_index..])?;
data_index += res;
self.write_index += buffer.len();
if res == 0 {
break;
}
}
Ok(())
}
fn read(&mut self, data_to_read: &mut [u8]) -> Result<(), Error> {
if data_to_read.len() < self.block_size * self.inner.len() {
return Err(Error::new(format!(
"Unable to read data into data_to_read of size {}, data_to_read should be at least {}",
data_to_read.len(),
(self.inner.len() * self.block_size)
)));
}
let read_index = self.read_index.clone() / self.block_size;
let mut data_index = 0;
for buffer in &mut self.inner[read_index..] {
let buffer_read_index = self.read_index % buffer.len();
buffer.seek(SeekFrom::Start(buffer_read_index as u64))?;
let res = buffer.read(&mut data_to_read[data_index..])?;
data_index += res;
self.read_index += buffer.len();
if res == 0 {
break;
}
}
Ok(())
}
}
#[derive(Debug, Getters)]
pub struct SGLKittyPool {
#[get = "pub"]
block_size: usize,
#[get = "pub"]
capacity: usize,
#[get = "pub"]
capacity_remaining: usize,
#[get = "pub"]
free_buffers: Vec<Buffer>,
#[get = "pub"]
used_buffers: HashMap<Token, SGL>,
}
impl SGLKittyPool {
pub fn new(capacity: usize, block_size: usize) -> Self {
let size = capacity / block_size;
let free_buffers = vec![Buffer::new(block_size); size];
Self {
block_size: block_size,
capacity: size * block_size,
capacity_remaining: size * block_size,
free_buffers: free_buffers,
used_buffers: HashMap::new(),
}
}
}
#[async_trait]
impl KittyPool for SGLKittyPool {
async fn borrow(&mut self, requested_size: usize) -> Result<Token, Error> {
if requested_size == 0 {
return Err(Error::new(
"0 is an invalid request size to borrow from pool!".to_string(),
));
}
if requested_size > self.capacity {
return Err(Error::new(format!(
"{} is too large to borrow from buffer with capacity {}!",
requested_size, self.capacity
)));
}
if requested_size > self.capacity_remaining {
pending!();
}
let token = Token::new_v4();
let blocks_needed = (requested_size / self.block_size) + (requested_size % self.block_size);
let blocks = self
.free_buffers
.split_off(self.free_buffers.len() - blocks_needed);
let sgl = SGL::new(blocks, self.block_size);
self.capacity_remaining -= blocks_needed * self.block_size;
self.used_buffers.insert(token, sgl);
Ok(token)
}
fn release(&mut self, token: &Token) -> Result<(), Error> {
let res = self.used_buffers.remove(token);
match res {
Some(mut sgl) => {
self.capacity_remaining += sgl.len() * self.block_size;
self.free_buffers.append(&mut sgl.inner);
Ok(())
}
None => Err(Error::new(format!(
"Unrecognized token {} used for release operation on pool!",
token
))),
}
}
async fn read(&mut self, token: &Token, data_to_read: &mut [u8]) -> Result<Token, Error> {
if !self.used_buffers.contains_key(token) {
return Err(Error::new(format!(
"Unrecognized token {} used for read operation on pool!",
token
)));
}
let sgl = self.used_buffers.get_mut(token).unwrap();
if (sgl.len() * self.block_size) > data_to_read.len() {
return Err(Error::new(format!(
"Unable to read data into data_to_read of size {}, data_to_read should be at least {}",
data_to_read.len(),
(sgl.len() * self.block_size)
)));
}
sgl.read(data_to_read)?;
Ok(*token)
}
async fn write(&mut self, token: &Token, data_to_write: &[u8]) -> Result<Token, Error> {
if !self.used_buffers.contains_key(token) {
return Err(Error::new(format!(
"Unrecognized token {} used for write operation on pool!",
token
)));
}
let sgl = self.used_buffers.get_mut(token).unwrap();
if (sgl.len() * self.block_size) < data_to_write.len() {
return Err(Error::new(format!(
"Unable to write data into inner buffer with data_to_write of size {}, data_to_write should be at most {}",
data_to_write.len(),
(sgl.len() * self.block_size)
)));
}
sgl.write(data_to_write)?;
Ok(*token)
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::block_on, future::maybe_done, pin_mut};
const POOL_SIZE: usize = 1024;
const BLOCK_SIZE: usize = 64;
const BLOCKS: usize = 16;
const ALIGNED_BORROW_SIZE: usize = 512;
const UNALIGNED_BORROW_SIZE: usize = 513;
const TOO_LARGE_BORROW_SIZE: usize = 2048;
const ZERO_BORROW_SIZE: usize = 0;
#[test]
fn test_sgl_new() {
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let sgl = SGL::new(buffers.clone(), 2);
assert_eq!(sgl.len(), 1);
assert_eq!(sgl.block_size, 2);
assert_eq!(sgl.inner, buffers.clone());
assert_eq!(sgl.read_index, 0);
assert_eq!(sgl.write_index, 0);
}
#[test]
fn test_sgl_debug() {
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let sgl = SGL::new(buffers.clone(), 2);
assert_eq!(format!("{:?}", sgl), "SGL { inner: [Buffer { inner: [0, 0], sentinel: 0, capacity: 2 }], write_index: 0, read_index: 0, block_size: 2 }");
}
#[test]
fn test_sgl_write() {
{
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let mut sgl = SGL::new(buffers.clone(), 2);
let res = sgl.write(&[1, 1, 1]);
assert!(res.is_err());
assert_eq!(format!("{}", res.unwrap_err()), "Error: Unable to write data into inner buffer with data_to_write of size 3, data_to_write should be at most 2");
}
{
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let mut sgl = SGL::new(buffers.clone(), 2);
let res = sgl.write(&[1, 1]);
assert!(res.is_ok());
}
}
#[test]
fn test_sgl_read() {
{
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let mut sgl = SGL::new(buffers.clone(), 2);
let res = sgl.read(&mut [0]);
assert!(res.is_err());
assert_eq!(format!("{}", res.unwrap_err()), "Error: Unable to read data into data_to_read of size 1, data_to_read should be at least 2");
}
{
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let mut sgl = SGL::new(buffers.clone(), 2);
let res = sgl.write(&[1, 1]);
assert!(res.is_ok());
let res = sgl.read(&mut [0]);
assert!(res.is_err());
assert_eq!(format!("{}", res.unwrap_err()), "Error: Unable to read data into data_to_read of size 1, data_to_read should be at least 2");
}
{
let buffer = Buffer::new(2);
let buffers = vec![buffer];
let mut sgl = SGL::new(buffers.clone(), 2);
let res = sgl.write(&[1, 1]);
assert!(res.is_ok());
let mut data = [0, 0];
let res = sgl.read(&mut data);
assert!(res.is_ok());
assert_eq!(data, [1, 1]);
}
}
#[test]
fn test_pool_new() {
{
let pool = SGLKittyPool::new(BLOCK_SIZE, POOL_SIZE);
assert_eq!(*pool.capacity(), 0);
assert_eq!(*pool.capacity_remaining(), 0);
assert_eq!(pool.free_buffers().len(), 0);
assert!(pool.used_buffers().is_empty());
}
{
let pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
assert_eq!(*pool.capacity(), POOL_SIZE);
assert_eq!(*pool.capacity_remaining(), POOL_SIZE);
assert_eq!(pool.free_buffers().len(), BLOCKS);
assert!(pool.used_buffers().is_empty());
}
}
#[test]
fn test_pool_debug() {
let pool = SGLKittyPool::new(2, 1);
assert_eq!(format!("{:?}", pool), "SGLKittyPool { block_size: 1, capacity: 2, capacity_remaining: 2, free_buffers: [Buffer { inner: [0], sentinel: 0, capacity: 1 }, Buffer { inner: [0], sentinel: 0, capacity: 1 }], used_buffers: {} }");
}
#[test]
fn test_pool_borrow() {
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ZERO_BORROW_SIZE));
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(
format!("{}", err),
"Error: 0 is an invalid request size to borrow from pool!"
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(TOO_LARGE_BORROW_SIZE));
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(
format!("{}", err),
"Error: 2048 is too large to borrow from buffer with capacity 1024!"
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
let sgl = pool.used_buffers.get(&token);
assert!(sgl.is_some());
assert_eq!(sgl.unwrap().len(), 8);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(UNALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(
pool.capacity_remaining,
POOL_SIZE - ALIGNED_BORROW_SIZE - BLOCK_SIZE
);
let sgl = pool.used_buffers.get(&token);
assert!(sgl.is_some());
assert_eq!(sgl.unwrap().len(), 9);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let is_done = maybe_done(pool.borrow(UNALIGNED_BORROW_SIZE));
pin_mut!(is_done);
assert!(is_done.as_mut().take_output().is_none());
}
}
#[test]
fn test_pool_release() {
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let token = Token::new_v4();
let result = pool.release(&token);
assert!(result.is_err());
let err = result.err();
assert!(err.is_some());
assert_eq!(
format!("{}", err.unwrap()),
format!(
"Error: Unrecognized token {} used for release operation on pool!",
token
)
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.used_buffers.get(&token).is_some());
assert_eq!(pool.used_buffers.get(&token).unwrap().len(), 8);
let result = pool.release(&token);
assert!(result.is_ok());
assert_eq!(pool.capacity_remaining, POOL_SIZE);
assert!(pool.used_buffers.get(&token).is_none());
}
}
#[test]
fn test_pool_read() {
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let bad_token = Token::new_v4();
let mut dummy_data: [u8; 128] = [0; 128];
let result = block_on(pool.read(&bad_token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unrecognized token {} used for read operation on pool!",
bad_token
)
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.used_buffers.get(&token).is_some());
assert_eq!(pool.used_buffers.get(&token).unwrap().len(), 8);
let mut dummy_data: [u8; 128] = [0; 128];
let result = block_on(pool.read(&token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unable to read data into data_to_read of size {}, data_to_read should be at least {}",
dummy_data.len(),
ALIGNED_BORROW_SIZE
)
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.used_buffers.get(&token).is_some());
assert_eq!(pool.used_buffers.get(&token).unwrap().len(), 8);
let mut data: [u8; ALIGNED_BORROW_SIZE] = [0; ALIGNED_BORROW_SIZE];
let result = block_on(pool.read(&token, &mut data));
assert!(result.is_ok());
}
}
#[test]
fn test_pool_write() {
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let bad_token = Token::new_v4();
let mut dummy_data: [u8; 128] = [0; 128];
let result = block_on(pool.write(&bad_token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unrecognized token {} used for write operation on pool!",
bad_token
)
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.used_buffers.get(&token).is_some());
assert_eq!(pool.used_buffers.get(&token).unwrap().len(), 8);
let mut dummy_data: [u8; UNALIGNED_BORROW_SIZE] = [1; UNALIGNED_BORROW_SIZE];
let result = block_on(pool.write(&token, &mut dummy_data));
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()),
format!(
"Error: Unable to write data into inner buffer with data_to_write of size {}, data_to_write should be at most {}",
dummy_data.len(),
ALIGNED_BORROW_SIZE
)
);
}
{
let mut pool = SGLKittyPool::new(POOL_SIZE, BLOCK_SIZE);
let result = block_on(pool.borrow(ALIGNED_BORROW_SIZE));
assert!(result.is_ok());
let token = result.unwrap();
assert_eq!(pool.capacity_remaining, POOL_SIZE - ALIGNED_BORROW_SIZE);
assert!(pool.used_buffers.get(&token).is_some());
assert_eq!(pool.used_buffers.get(&token).unwrap().len(), 8);
let mut data: [u8; ALIGNED_BORROW_SIZE] = [0; ALIGNED_BORROW_SIZE];
let result = block_on(pool.write(&token, &mut data));
assert!(result.is_ok());
}
}
}