use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use bytes::Bytes;
use opool::Pool;
use opool::PoolAllocator;
use opool::RcGuard;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use crate::Result;
/// Allocator that supplies an object pool with byte buffers.
pub struct BufferAllocator;

impl PoolAllocator<Vec<u8>> for BufferAllocator {
    /// Produces a new, empty buffer.
    ///
    /// The buffer starts with no contents; `BufferPool::alloc` resizes it to
    /// the length a caller asks for.
    fn allocate(&self) -> Vec<u8> {
        Vec::new()
    }
}
/// A byte buffer handed out by [`BufferPool`].
///
/// NOTE(review): per opool's documentation an `RcGuard` is expected to hand
/// its object back to the originating pool when dropped — confirm.
pub type BufferGuard = RcGuard<BufferAllocator, Vec<u8>>;
#[derive(Clone)]
pub struct BufferPool(Arc<Pool<BufferAllocator, Vec<u8>>>);
impl BufferPool {
pub fn new(size: usize) -> Self {
Self(Pool::new(size, BufferAllocator).into())
}
pub fn alloc(&self, size: usize) -> BufferGuard {
let mut buffer = self.0.clone().get_rc();
buffer.resize(size, 0);
buffer
}
pub async fn read_block(
&self,
path: &Path,
block_size: u64,
source_size: u64,
offset: &AtomicU64,
) -> Result<Block> {
let mut file = File::open(path).await?;
let mut buffer = self.alloc(block_size.try_into().expect("block size too large"));
let offset = match offset.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |offset| {
if offset >= source_size {
None
} else {
Some(offset + block_size)
}
}) {
Ok(offset) => offset,
Err(_) => panic!("more reads than there were blocks"),
};
assert_eq!(
offset % block_size,
0,
"expected offset to be a multiple of the block size"
);
let block_num = offset / block_size;
file.seek(std::io::SeekFrom::Start(offset)).await?;
let mut len = 0;
while len < buffer.len() {
match file.read(&mut buffer[len..]).await? {
0 => break,
n => len += n,
}
}
buffer.truncate(len);
assert!(!buffer.is_empty(), "an empty block was read");
Ok(Block::new(block_num, buffer))
}
}
/// A numbered run of bytes backed by a pooled buffer.
pub struct Block {
    /// The block's number; `read_block` sets it to the block's starting
    /// offset divided by the block size.
    num: u64,
    /// The pooled buffer holding the block's bytes.
    buffer: BufferGuard,
}

impl Block {
    /// Pairs a block number with the buffer that holds its contents.
    fn new(num: u64, buffer: BufferGuard) -> Self {
        Block { num, buffer }
    }

    /// Returns the number of this block.
    pub fn num(&self) -> u64 {
        self.num
    }

    /// Consumes the block and wraps it in a [`Bytes`] via
    /// [`Bytes::from_owner`].
    ///
    /// NOTE(review): per the `bytes` documentation this does not copy the
    /// data, and the block is dropped once the last `Bytes` handle referring
    /// to it is dropped — confirm.
    pub fn into_bytes(self) -> Bytes {
        let owner = self;
        Bytes::from_owner(owner)
    }
}
impl AsRef<[u8]> for Block {
    /// Borrows the block's bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        &self.buffer[..]
    }
}
impl Deref for Block {
    type Target = [u8];

    /// Lets a block be used wherever a byte slice is expected.
    fn deref(&self) -> &[u8] {
        let bytes: &Vec<u8> = &self.buffer;
        bytes
    }
}