pub mod metadata;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use bytes::{Buf, Bytes};
#[allow(clippy::len_without_is_empty)]
pub trait ChunkReader {
type T: Read;
fn len(&self) -> u64;
fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;
fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Bytes> {
let mut bytes = vec![0; length as usize];
self.get_read(offset_from_start)?
.take(length)
.read_exact(&mut bytes)?;
Ok(bytes.into())
}
}
impl ChunkReader for File {
type T = BufReader<File>;
fn len(&self) -> u64 {
self.metadata().map(|m| m.len()).unwrap_or(0u64)
}
fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
let mut reader = self.try_clone()?;
reader.seek(SeekFrom::Start(offset_from_start))?;
Ok(BufReader::new(self.try_clone()?))
}
}
impl ChunkReader for Bytes {
type T = bytes::buf::Reader<Bytes>;
fn len(&self) -> u64 {
self.len() as u64
}
fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
Ok(self.slice(offset_from_start as usize..).reader())
}
}
#[cfg(feature = "async")]
mod async_chunk_reader {
use super::*;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
#[allow(clippy::len_without_is_empty)]
pub trait AsyncChunkReader: Send {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>>;
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>>;
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncChunkReader for T {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
async move { self.seek(SeekFrom::End(0)).await }.boxed()
}
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
async move {
self.seek(SeekFrom::Start(offset_from_start)).await?;
let mut buffer = vec![0; length as usize];
self.read_exact(&mut buffer).await?;
Ok(buffer.into())
}
.boxed()
}
}
impl AsyncChunkReader for Box<dyn AsyncChunkReader> {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
self.as_mut().len()
}
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
self.as_mut().get_bytes(offset_from_start, length)
}
}
}
#[cfg(feature = "async")]
pub use async_chunk_reader::AsyncChunkReader;
#[cfg(all(feature = "async", feature = "opendal"))]
mod async_opendal_reader {
use crate::reader::AsyncChunkReader;
use bytes::Bytes;
use futures_util::future::BoxFuture;
use opendal::Operator;
use std::sync::Arc;
pub struct AsyncOpendalReader {
op: Operator,
path: Arc<String>,
}
impl AsyncOpendalReader {
pub fn new(op: Operator, path: &str) -> Self {
Self {
op,
path: Arc::new(path.to_string()),
}
}
}
impl AsyncChunkReader for AsyncOpendalReader {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
let path = self.path.clone();
Box::pin(async move {
let meta = self.op.stat(&path).await?;
Ok(meta.content_length())
})
}
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
let path = self.path.clone();
Box::pin(async move {
let reader = self
.op
.read_with(&path)
.range(offset_from_start..offset_from_start + length)
.await?;
Ok(reader.to_bytes())
})
}
}
}
#[cfg(all(feature = "async", feature = "opendal"))]
pub use async_opendal_reader::AsyncOpendalReader;