use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufs};
use std::num::NonZeroUsize;
pub struct Read<B: Blob> {
blob: B,
buffer: IoBuf,
blob_position: u64,
blob_size: u64,
buffer_position: usize,
buffer_valid_len: usize,
buffer_size: usize,
pool: BufferPool,
}
impl<B: Blob> Read<B> {
pub fn new(blob: B, blob_size: u64, buffer_size: NonZeroUsize, pool: BufferPool) -> Self {
Self {
blob,
buffer: pool.alloc(buffer_size.get()).freeze(),
blob_position: 0,
blob_size,
buffer_position: 0,
buffer_valid_len: 0,
buffer_size: buffer_size.get(),
pool,
}
}
pub fn from_pooler(
pooler: &impl BufferPooler,
blob: B,
blob_size: u64,
buffer_size: NonZeroUsize,
) -> Self {
Self::new(
blob,
blob_size,
buffer_size,
pooler.storage_buffer_pool().clone(),
)
}
pub const fn buffer_remaining(&self) -> usize {
self.buffer_valid_len - self.buffer_position
}
pub const fn blob_remaining(&self) -> u64 {
self.blob_size
.saturating_sub(self.blob_position + self.buffer_position as u64)
}
pub const fn blob_size(&self) -> u64 {
self.blob_size
}
async fn refill(&mut self) -> Result<usize, Error> {
self.blob_position += self.buffer_position as u64;
self.buffer_position = 0;
self.buffer_valid_len = 0;
let blob_remaining = self.blob_size.saturating_sub(self.blob_position);
if blob_remaining == 0 {
return Err(Error::BlobInsufficientLength);
}
let bytes_to_read = std::cmp::min(self.buffer_size as u64, blob_remaining) as usize;
let current = std::mem::take(&mut self.buffer);
let buf = match current.try_into_mut() {
Ok(mut reusable) if reusable.capacity() >= bytes_to_read => {
reusable.clear();
reusable
}
Ok(_) | Err(_) => self.pool.alloc(bytes_to_read),
};
let read_result = self
.blob
.read_at_buf(self.blob_position, bytes_to_read, buf)
.await?;
self.buffer = read_result.coalesce_with_pool(&self.pool).freeze();
self.buffer_valid_len = self.buffer.len();
Ok(self.buffer_valid_len)
}
pub async fn read(&mut self, len: usize) -> Result<IoBufs, Error> {
if len == 0 {
return Ok(IoBufs::default());
}
if self.blob_remaining() < len as u64 {
return Err(Error::BlobInsufficientLength);
}
let mut remaining = len;
let mut out = IoBufs::default();
while remaining > 0 {
if self.buffer_position >= self.buffer_valid_len {
self.refill().await?;
}
let bytes_to_take = std::cmp::min(remaining, self.buffer_remaining());
out.append(
self.buffer
.slice(self.buffer_position..(self.buffer_position + bytes_to_take)),
);
self.buffer_position += bytes_to_take;
remaining -= bytes_to_take;
}
Ok(out)
}
pub const fn position(&self) -> u64 {
self.blob_position + self.buffer_position as u64
}
pub const fn seek_to(&mut self, position: u64) -> Result<(), Error> {
if position > self.blob_size {
return Err(Error::BlobInsufficientLength);
}
let buffer_start = self.blob_position;
let buffer_end = self.blob_position + self.buffer_valid_len as u64;
if position >= buffer_start && position < buffer_end {
self.buffer_position = (position - self.blob_position) as usize;
} else {
self.blob_position = position;
self.buffer_position = 0;
self.buffer_valid_len = 0;
}
Ok(())
}
pub async fn resize(self, len: u64) -> Result<(), Error> {
self.blob.resize(len).await?;
self.blob.sync().await
}
}