use std::fs::File;
use std::io::ErrorKind;
use std::ops::Range;
use std::sync::Arc;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use object_store::path::Path;
use super::object_reader::ObjectReader;
use crate::{Error, Result};
pub(crate) fn to_local_path(path: &Path) -> String {
if cfg!(windows) {
path.to_string()
} else {
format!("/{path}")
}
}
pub(super) fn remove_dir_all(path: &Path) -> Result<()> {
let local_path = to_local_path(path);
std::fs::remove_dir_all(local_path)?;
Ok(())
}
pub struct LocalObjectReader {
file: Arc<File>,
path: Path,
block_size: usize,
}
impl LocalObjectReader {
pub fn open(path: &Path, block_size: usize) -> Result<Box<dyn ObjectReader>> {
let local_path = to_local_path(path);
let file = File::open(local_path).map_err(|e| match e.kind() {
ErrorKind::NotFound => Error::NotFound {
uri: path.to_string(),
},
_ => Error::IO {
message: e.to_string(),
},
})?;
Ok(Box::new(Self {
file: Arc::new(file),
block_size,
path: path.clone(),
}))
}
}
#[async_trait]
impl ObjectReader for LocalObjectReader {
fn path(&self) -> &Path {
&self.path
}
fn block_size(&self) -> usize {
self.block_size
}
async fn size(&self) -> Result<usize> {
Ok(self.file.metadata()?.len() as usize)
}
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
unsafe { buf.set_len(range.len()) };
#[cfg(unix)]
let bytes_read = file.read_at(buf.as_mut(), range.start as u64)?;
#[cfg(windows)]
let bytes_read = file.seek_read(buf.as_mut(), range.start as u64)?;
if bytes_read != range.len() {
return Err(Error::IO {
message: format!(
"failed to read all bytes from file: expected {}, got {}",
range.len(),
bytes_read
),
});
}
Ok(buf.freeze())
})
.await?
}
}