use crate::errors::AvroError;
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use std::io::SeekFrom;
use std::ops::Range;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
/// A source of bytes that can be read asynchronously by byte range.
///
/// Implementors must be [`Send`]; every method returns a boxed, `Send` future
/// that borrows the reader mutably for as long as the future is alive.
pub trait AsyncFileReader: Send {
    /// Returns a future that resolves to the bytes covered by `range`, or to
    /// an [`AvroError`] if they could not be obtained.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>>;

    /// Fetches several byte ranges and returns their contents in the same
    /// order as `ranges`.
    ///
    /// The default implementation issues one [`Self::get_bytes`] call per
    /// range, one after another, and stops at the first error. Implementors
    /// may override it.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
        let fetch_all = async move {
            let mut chunks = Vec::with_capacity(ranges.len());
            for span in ranges {
                // Each request is awaited before the next one is started.
                chunks.push(self.get_bytes(span).await?);
            }
            Ok(chunks)
        };
        fetch_all.boxed()
    }
}
/// Lets a boxed trait object be used wherever an [`AsyncFileReader`] is
/// expected by forwarding every call to the boxed reader.
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
    /// Forwards to the boxed reader's `get_bytes`.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
        // Reborrow the trait object inside the box so the call dispatches to
        // the inner reader rather than back to this impl.
        let inner = &mut **self;
        inner.get_bytes(range)
    }

    /// Forwards to the boxed reader's `get_byte_ranges`, so any override
    /// provided by the inner reader is the one that runs.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>> {
        let inner = &mut **self;
        inner.get_byte_ranges(ranges)
    }
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
async move {
self.seek(SeekFrom::Start(range.start)).await?;
let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read as usize);
let read = self.take(to_read).read_to_end(&mut buffer).await?;
if read as u64 != to_read {
return Err(AvroError::EOF(format!(
"expected to read {} bytes, got {}",
to_read, read
)));
}
Ok(buffer.into())
}
.boxed()
}
}