use std::fs::{self, File, Metadata};
use std::io;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
use crate::{FileMeta, ListEntry, Result, SizedFile};
use super::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
};
pub static LOCAL_SCHEME: &str = "file";
#[derive(Debug)]
pub struct LocalFileSystem;
#[async_trait]
impl ObjectStore for LocalFileSystem {
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
let prefix = if let Some((_scheme, path)) = prefix.split_once("://") {
path
} else {
prefix
};
list_all(prefix.to_owned()).await
}
async fn list_dir(
&self,
prefix: &str,
delimiter: Option<String>,
) -> Result<ListEntryStream> {
if let Some(d) = delimiter {
if d != "/" && d != "\\" {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("delimiter not supported on local filesystem: {}", d),
));
}
let mut entry_stream = tokio::fs::read_dir(prefix).await?;
let list_entries = stream::poll_fn(move |cx| {
entry_stream.poll_next_entry(cx).map(|res| match res {
Ok(Some(x)) => Some(Ok(x)),
Ok(None) => None,
Err(err) => Some(Err(err)),
})
})
.then(|entry| async {
let entry = entry?;
let entry = if entry.file_type().await?.is_dir() {
ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
} else {
ListEntry::FileMeta(get_meta(
path_as_str(&entry.path())?.to_string(),
entry.metadata().await?,
))
};
Ok(entry)
});
Ok(Box::pin(list_entries))
} else {
Ok(Box::pin(
self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
))
}
}
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
Ok(Arc::new(LocalFileReader::new(file)?))
}
}
pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
path.to_str().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("Invalid path '{}'", path.display()),
)
})
}
struct LocalFileReader {
file: SizedFile,
}
impl LocalFileReader {
fn new(file: SizedFile) -> Result<Self> {
Ok(Self { file })
}
}
#[async_trait]
impl ObjectReader for LocalFileReader {
async fn chunk_reader(
&self,
_start: u64,
_length: usize,
) -> Result<Box<dyn AsyncRead>> {
todo!(
"implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
)
}
fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>> {
let mut file = File::open(&self.file.path)?;
file.seek(SeekFrom::Start(start))?;
let file = BufReader::new(file.take(length as u64));
Ok(Box::new(file))
}
fn length(&self) -> u64 {
self.file.size
}
}
fn get_meta(path: String, metadata: Metadata) -> FileMeta {
FileMeta {
sized_file: SizedFile {
path,
size: metadata.len(),
},
last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
}
}
async fn list_all(prefix: String) -> Result<FileMetaStream> {
async fn find_files_in_dir(
path: String,
to_visit: &mut Vec<String>,
) -> Result<Vec<FileMeta>> {
let mut dir = tokio::fs::read_dir(path).await?;
let mut files = Vec::new();
while let Some(child) = dir.next_entry().await? {
let child_path = path_as_str(&child.path())?.to_string();
let metadata = child.metadata().await?;
if metadata.is_dir() {
to_visit.push(child_path.to_string());
} else {
files.push(get_meta(child_path.to_owned(), metadata))
}
}
files.sort_by(|a, b| a.path().cmp(b.path()));
Ok(files)
}
let prefix_meta = tokio::fs::metadata(&prefix).await?;
let prefix = prefix.to_owned();
if prefix_meta.is_file() {
Ok(Box::pin(stream::once(async move {
Ok(get_meta(prefix, prefix_meta))
})))
} else {
let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
match to_visit.pop() {
None => None,
Some(path) => {
let file_stream = match find_files_in_dir(path, &mut to_visit).await {
Ok(files) => stream::iter(files).map(Ok).left_stream(),
Err(e) => stream::once(async { Err(e) }).right_stream(),
};
Some((file_stream, to_visit))
}
}
})
.flatten();
Ok(Box::pin(result))
}
}
pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f))))
}
pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
LocalFileSystem
.file_reader(local_unpartitioned_file(file).sized_file)
.expect("File not found")
}
pub fn local_unpartitioned_file(file: String) -> FileMeta {
let metadata = fs::metadata(&file).expect("Local file metadata");
FileMeta {
sized_file: SizedFile {
size: metadata.len(),
path: file,
},
last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
}
}
#[cfg(test)]
mod tests {
use crate::ListEntry;
use super::*;
use futures::StreamExt;
use std::collections::HashSet;
use std::fs::create_dir;
use std::fs::File;
use tempfile::tempdir;
#[tokio::test]
async fn test_recursive_listing() -> Result<()> {
let tmp = tempdir()?;
let x_path = tmp.path().join("x");
let y_path = tmp.path().join("y");
let a_path = tmp.path().join("a.txt");
let b_path = x_path.join("b.txt");
let c_path = y_path.join("c.txt");
create_dir(&x_path)?;
create_dir(&y_path)?;
File::create(&a_path)?;
File::create(&b_path)?;
File::create(&c_path)?;
let mut all_files = HashSet::new();
let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
while let Some(file) = files.next().await {
let file = file?;
assert_eq!(file.size(), 0);
all_files.insert(file.path().to_owned());
}
assert_eq!(all_files.len(), 3);
assert!(all_files.contains(a_path.to_str().unwrap()));
assert!(all_files.contains(b_path.to_str().unwrap()));
assert!(all_files.contains(c_path.to_str().unwrap()));
Ok(())
}
#[tokio::test]
async fn test_list_dir() -> Result<()> {
let tmp = tempdir()?;
let x_path = tmp.path().join("x");
let a_path = tmp.path().join("a.txt");
let b_path = x_path.join("b.txt");
create_dir(&x_path)?;
File::create(&a_path)?;
File::create(&b_path)?;
fn get_path(entry: ListEntry) -> String {
match entry {
ListEntry::FileMeta(f) => f.sized_file.path,
ListEntry::Prefix(path) => path,
}
}
async fn assert_equal_paths(
expected: Vec<&std::path::PathBuf>,
actual: ListEntryStream,
) -> Result<()> {
let expected: HashSet<String> = expected
.iter()
.map(|x| x.to_str().unwrap().to_string())
.collect();
let actual: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
assert_eq!(expected, actual);
Ok(())
}
let files = LocalFileSystem
.list_dir(tmp.path().to_str().unwrap(), None)
.await?;
assert_equal_paths(vec![&a_path, &b_path], files).await?;
let files = LocalFileSystem
.list_dir(tmp.path().to_str().unwrap(), Some("/".to_string()))
.await?;
assert_equal_paths(vec![&a_path, &x_path], files).await?;
Ok(())
}
#[tokio::test]
async fn test_list_all_sort_by_filename() -> Result<()> {
let tmp = tempdir()?;
let a_path = tmp.path().join("file_23590.parquet");
let b_path = tmp.path().join("file_13690.parquet");
let c_path = tmp.path().join("file_12590.parquet");
let d_path = tmp.path().join("file_03590.parquet");
File::create(&a_path)?;
File::create(&b_path)?;
File::create(&c_path)?;
File::create(&d_path)?;
let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
let mut list_files_name = Vec::new();
while let Some(file) = files.next().await {
let file = file?;
list_files_name.push(file.path().to_owned());
}
let sort_files_name = [
tmp.path()
.join("file_03590.parquet")
.to_str()
.unwrap()
.to_string(),
tmp.path()
.join("file_12590.parquet")
.to_str()
.unwrap()
.to_string(),
tmp.path()
.join("file_13690.parquet")
.to_str()
.unwrap()
.to_string(),
tmp.path()
.join("file_23590.parquet")
.to_str()
.unwrap()
.to_string(),
];
assert_eq!(list_files_name, sort_files_name);
Ok(())
}
}