use crate::error::*;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::SystemTime;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use opendal::raw::normalize_root;
use opendal::Operator;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use url::Url;
use super::Storage;
/// Handle used to perform file operations against a [`Storage`] backend.
///
/// The backend is kept behind an [`Arc`], so cloning a `FileIO` shares the
/// same `Storage` value rather than building another one.
#[derive(Clone, Debug)]
pub struct FileIO {
/// Backend that resolves a full path into an operator plus a relative path.
storage: Arc<Storage>,
}
impl FileIO {
pub fn from_url(path: &str) -> crate::Result<FileIOBuilder> {
let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
message: format!("Invalid URL: {path}"),
})?;
Ok(FileIOBuilder::new(url.scheme()))
}
pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
let path = path.as_ref();
let url = if looks_like_windows_drive_path(path) {
Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
message: format!("Input {path} is neither a valid url nor path"),
})?
} else {
Url::parse(path)
.map_err(|_| Error::ConfigInvalid {
message: format!("Invalid URL: {path}"),
})
.or_else(|_| {
Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
message: format!("Input {path} is neither a valid url nor path"),
})
})?
};
Ok(FileIOBuilder::new(url.scheme()))
}
pub fn new_input(&self, path: &str) -> crate::Result<InputFile> {
let (op, relative_path) = self.storage.create(path)?;
let path = path.to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(InputFile {
op,
path,
relative_path_pos,
})
}
pub fn new_output(&self, path: &str) -> Result<OutputFile> {
let (op, relative_path) = self.storage.create(path)?;
let path = path.to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(OutputFile {
op,
path,
relative_path_pos,
})
}
pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
let (op, relative_path) = self.storage.create(path)?;
let meta = op.stat(relative_path).await.context(IoUnexpectedSnafu {
message: format!("Failed to get file status for '{path}'"),
})?;
Ok(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
last_modified: meta
.last_modified()
.map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
path: path.to_string(),
})
}
pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
let (op, relative_path) = self.storage.create(path)?;
let base_path = &path[..path.len() - relative_path.len()];
let list_path = normalize_root(relative_path);
let entries = op.list_with(&list_path).await.context(IoUnexpectedSnafu {
message: format!("Failed to list files in '{path}'"),
})?;
let mut statuses = Vec::new();
let list_path_normalized = list_path.trim_start_matches('/');
for entry in entries {
let entry_path = entry.path();
if entry_path.trim_start_matches('/') == list_path_normalized {
continue;
}
let meta = entry.metadata();
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
path: format!("{base_path}{entry_path}"),
last_modified: meta
.last_modified()
.map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
});
}
Ok(statuses)
}
pub async fn exists(&self, path: &str) -> Result<bool> {
let (op, relative_path) = self.storage.create(path)?;
op.exists(relative_path).await.context(IoUnexpectedSnafu {
message: format!("Failed to check existence of '{path}'"),
})
}
pub async fn delete_file(&self, path: &str) -> Result<()> {
let (op, relative_path) = self.storage.create(path)?;
op.delete(relative_path).await.context(IoUnexpectedSnafu {
message: format!("Failed to delete file '{path}'"),
})?;
Ok(())
}
pub async fn delete_dir(&self, path: &str) -> Result<()> {
let (op, relative_path) = self.storage.create(path)?;
op.remove_all(relative_path)
.await
.context(IoUnexpectedSnafu {
message: format!("Failed to delete directory '{path}'"),
})?;
Ok(())
}
pub async fn mkdirs(&self, path: &str) -> Result<()> {
let (op, relative_path) = self.storage.create(path)?;
let dir_path = normalize_root(relative_path);
op.create_dir(&dir_path).await.context(IoUnexpectedSnafu {
message: format!("Failed to create directory '{path}'"),
})?;
Ok(())
}
pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
let (op_src, relative_path_src) = self.storage.create(src)?;
let (_, relative_path_dst) = self.storage.create(dst)?;
op_src
.rename(relative_path_src, relative_path_dst)
.await
.context(IoUnexpectedSnafu {
message: format!("Failed to rename '{src}' to '{dst}'"),
})?;
Ok(())
}
}
/// Returns `true` when `path` starts with an ASCII letter, a colon and then a
/// backslash or forward slash (for example `C:\dir` or `c:/dir`).
///
/// Only the first three bytes are inspected; shorter inputs are never a match.
fn looks_like_windows_drive_path(path: &str) -> bool {
    match path.as_bytes() {
        [drive, b':', b'\\' | b'/', ..] => drive.is_ascii_alphabetic(),
        _ => false,
    }
}
/// Builder that collects a scheme and string properties before a [`FileIO`]
/// is built from them.
#[derive(Debug)]
pub struct FileIOBuilder {
/// Scheme string the builder was created with; `into_parts` falls back to an
/// empty string when this is `None`.
scheme_str: Option<String>,
/// Key/value properties accumulated through `with_prop` / `with_props`.
props: HashMap<String, String>,
}
impl FileIOBuilder {
    /// Creates a builder for the given scheme with no properties set.
    pub fn new(scheme_str: impl ToString) -> Self {
        let scheme = scheme_str.to_string();
        Self {
            scheme_str: Some(scheme),
            props: HashMap::new(),
        }
    }

    /// Splits the builder into its scheme and its properties.
    ///
    /// A missing scheme is returned as an empty string.
    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
        let Self { scheme_str, props } = self;
        (scheme_str.unwrap_or_default(), props)
    }

    /// Sets a single property, replacing any earlier value stored under `key`.
    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
        let (key, value) = (key.to_string(), value.to_string());
        self.props.insert(key, value);
        self
    }

    /// Sets every property yielded by `args`; later pairs overwrite earlier
    /// ones that share a key.
    pub fn with_props(
        mut self,
        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
    ) -> Self {
        for (key, value) in args {
            self.props.insert(key.to_string(), value.to_string());
        }
        self
    }

    /// Builds the [`FileIO`] by handing this builder to `Storage::build`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Storage::build`.
    pub fn build(self) -> crate::Result<FileIO> {
        let storage = Arc::new(Storage::build(self)?);
        Ok(FileIO { storage })
    }
}
/// Asynchronous reader that serves byte ranges.
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
/// Reads the bytes covered by `range` and returns them as [`Bytes`].
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
/// [`FileRead`] implemented by delegating to opendal's own reader.
#[async_trait::async_trait]
impl FileRead for opendal::Reader {
    /// Reads `range` through `opendal::Reader::read` and converts the result
    /// into [`Bytes`].
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
        // The explicit path names opendal's own `read`, not this trait method.
        let buffer = opendal::Reader::read(self, range).await?;
        Ok(buffer.to_bytes())
    }
}
/// Asynchronous writer that accepts chunks of bytes and is closed explicitly.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
/// Writes the given bytes.
async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
/// Closes the writer.
async fn close(&mut self) -> crate::Result<()>;
}
/// [`FileWrite`] implemented by delegating to opendal's own writer.
#[async_trait::async_trait]
impl FileWrite for opendal::Writer {
    /// Forwards `bs` to `opendal::Writer::write`.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
        // The explicit path names opendal's own `write`, not this trait method.
        opendal::Writer::write(self, bs).await?;
        Ok(())
    }

    /// Forwards to `opendal::Writer::close`, discarding whatever it returns on
    /// success.
    async fn close(&mut self) -> crate::Result<()> {
        let outcome = opendal::Writer::close(self).await;
        outcome?;
        Ok(())
    }
}
/// Marker trait for any `tokio::io::AsyncWrite` that is also `Unpin + Send`.
pub trait AsyncFileWrite: tokio::io::AsyncWrite + Unpin + Send {}

/// Blanket implementation: every type meeting the bounds is an [`AsyncFileWrite`].
impl<T> AsyncFileWrite for T where T: tokio::io::AsyncWrite + Unpin + Send {}
/// Metadata describing one file or directory entry.
#[derive(Clone, Debug)]
pub struct FileStatus {
/// Content length taken from the backend metadata.
pub size: u64,
/// Whether the backend metadata marks the entry as a directory.
pub is_dir: bool,
/// Path of the entry, as filled in by the call that produced this status.
pub path: String,
/// Last-modified time converted to UTC; `None` when the backend reports none.
pub last_modified: Option<DateTime<Utc>>,
}
/// Readable file bound to an operator and a path.
#[derive(Debug)]
pub struct InputFile {
/// Operator used for every backend call on this file.
op: Operator,
/// Full path the file was created with; returned by `location`.
path: String,
/// Byte offset into `path` where the part passed to the operator begins.
relative_path_pos: usize,
}
impl InputFile {
    /// Returns the full path this file was created with.
    pub fn location(&self) -> &str {
        self.path.as_str()
    }

    /// Part of the full path that is handed to the operator.
    fn relative_path(&self) -> &str {
        &self.path[self.relative_path_pos..]
    }

    /// Reports whether the file exists according to the operator.
    pub async fn exists(&self) -> crate::Result<bool> {
        let found = self.op.exists(self.relative_path()).await?;
        Ok(found)
    }

    /// Fetches the file's metadata and returns it as a [`FileStatus`] whose
    /// `path` is the full location of this file.
    pub async fn metadata(&self) -> crate::Result<FileStatus> {
        let meta = self.op.stat(self.relative_path()).await?;
        let last_modified = meta
            .last_modified()
            .map(SystemTime::from)
            .map(DateTime::<Utc>::from);

        Ok(FileStatus {
            size: meta.content_length(),
            is_dir: meta.is_dir(),
            path: self.path.clone(),
            last_modified,
        })
    }

    /// Reads the whole file and returns its content as [`Bytes`].
    pub async fn read(&self) -> crate::Result<Bytes> {
        let buffer = self.op.read(self.relative_path()).await?;
        Ok(buffer.to_bytes())
    }

    /// Opens a reader for this file that can serve byte ranges.
    pub async fn reader(&self) -> crate::Result<impl FileRead> {
        let reader = self.op.reader(self.relative_path()).await?;
        Ok(reader)
    }
}
/// Writable file bound to an operator and a path.
#[derive(Debug, Clone)]
pub struct OutputFile {
/// Operator used for every backend call on this file.
op: Operator,
/// Full path the file was created with; returned by `location`.
path: String,
/// Byte offset into `path` where the part passed to the operator begins.
relative_path_pos: usize,
}
impl OutputFile {
    /// Returns the full path this file was created with.
    pub fn location(&self) -> &str {
        self.path.as_str()
    }

    /// Part of the full path that is handed to the operator.
    fn relative_path(&self) -> &str {
        &self.path[self.relative_path_pos..]
    }

    /// Reports whether the file exists according to the operator.
    pub async fn exists(&self) -> crate::Result<bool> {
        let found = self.op.exists(self.relative_path()).await?;
        Ok(found)
    }

    /// Converts this output file into an [`InputFile`] for the same operator
    /// and path.
    pub fn to_input_file(self) -> InputFile {
        let Self {
            op,
            path,
            relative_path_pos,
        } = self;
        InputFile {
            op,
            path,
            relative_path_pos,
        }
    }

    /// Writes `bs` with a fresh writer and then closes that writer.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while opening the writer, writing, or
    /// closing.
    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
        let mut sink = self.writer().await?;
        sink.write(bs).await?;
        sink.close().await
    }

    /// Opens a boxed [`FileWrite`] for this file.
    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
        let writer = self.opendal_writer().await?;
        Ok(Box::new(writer))
    }

    /// Opens a writer for this file, adapted to `tokio::io::AsyncWrite`.
    pub(crate) async fn async_writer(&self) -> crate::Result<Box<dyn AsyncFileWrite>> {
        let writer = self.opendal_writer().await?;
        // opendal writer -> futures `AsyncWrite` -> tokio-compatible `AsyncWrite`.
        let adapted = writer.into_futures_async_write().compat_write();
        Ok(Box::new(adapted))
    }

    /// Opens the underlying opendal writer for this file's relative path.
    async fn opendal_writer(&self) -> crate::Result<opendal::Writer> {
        let writer = self.op.writer(self.relative_path()).await?;
        Ok(writer)
    }
}
/// Tests for the path-level operations exposed by `FileIO`.
#[cfg(test)]
mod file_action_test {
    use std::collections::BTreeSet;

    use bytes::Bytes;
    use tempfile::tempdir;

    use super::*;

    /// Builds a `FileIO` for the `memory` scheme.
    fn setup_memory_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// Builds a `FileIO` for the `file` scheme.
    fn setup_fs_file_io() -> FileIO {
        FileIOBuilder::new("file").build().unwrap()
    }

    /// Turns a local path into a `file:/...` location, using forward slashes
    /// and making sure exactly one `/` follows the scheme.
    fn local_file_path(path: &std::path::Path) -> String {
        let normalized = path.to_string_lossy().replace('\\', "/");
        if normalized.starts_with('/') {
            format!("file:{normalized}")
        } else {
            format!("file:/{normalized}")
        }
    }

    /// Writes `"hello world"` to `path` through a writer and closes it.
    async fn write_hello_world(file_io: &FileIO, path: &str) {
        let output = file_io.new_output(path).unwrap();
        let mut writer = output.writer().await.unwrap();
        writer.write(Bytes::from("hello world")).await.unwrap();
        writer.close().await.unwrap();
    }

    async fn common_test_get_status(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        let status = file_io.get_status(path).await.unwrap();
        assert_eq!(status.size, 11);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_exists(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        let exists = file_io.exists(path).await.unwrap();
        assert!(exists);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_delete_file(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        file_io.delete_file(path).await.unwrap();

        let exists = file_io.exists(path).await.unwrap();
        assert!(!exists);
    }

    async fn common_test_mkdirs(file_io: &FileIO, dir_path: &str) {
        file_io.mkdirs(dir_path).await.unwrap();

        let exists = file_io.exists(dir_path).await.unwrap();
        assert!(exists);

        // Best-effort cleanup through `FileIO`, so the directory is resolved
        // the same way `mkdirs` resolved it. Stripping `file:/` by hand would
        // leave a path without its leading `/`.
        let _ = file_io.delete_dir(dir_path).await;
    }

    async fn common_test_rename(file_io: &FileIO, src: &str, dst: &str) {
        write_hello_world(file_io, src).await;

        file_io.rename(src, dst).await.unwrap();

        let exists_old = file_io.exists(src).await.unwrap();
        let exists_new = file_io.exists(dst).await.unwrap();
        assert!(!exists_old);
        assert!(exists_new);

        file_io.delete_file(dst).await.unwrap();
    }

    async fn common_test_list_status_paths(file_io: &FileIO, dir_path: &str) {
        // Best-effort removal of leftovers from an earlier aborted run, done
        // through `FileIO` so it targets the directory that is listed below.
        let _ = file_io.delete_dir(dir_path).await;

        file_io.mkdirs(dir_path).await.unwrap();

        let file_a = format!("{dir_path}a.txt");
        let file_b = format!("{dir_path}b.txt");
        for file in [&file_a, &file_b] {
            file_io
                .new_output(file)
                .unwrap()
                .write(Bytes::from("test data"))
                .await
                .unwrap();
        }

        let statuses = file_io.list_status(dir_path).await.unwrap();
        assert_eq!(statuses.len(), 2);

        let expected_paths = BTreeSet::from([file_a, file_b]);
        let actual_paths: BTreeSet<String> =
            statuses.iter().map(|status| status.path.clone()).collect();
        assert_eq!(
            actual_paths, expected_paths,
            "list_status should return exact entry paths"
        );

        file_io.delete_dir(dir_path).await.unwrap();
    }

    #[tokio::test]
    async fn test_delete_file_memory() {
        let file_io = setup_memory_file_io();
        common_test_delete_file(&file_io, "memory:/test_file_delete_mem").await;
    }

    #[tokio::test]
    async fn test_empty_path_should_return_error_for_exists_fs() {
        let file_io = setup_fs_file_io();
        let result = file_io.exists("").await;
        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
    }

    #[tokio::test]
    async fn test_empty_path_should_return_error_for_exists_memory() {
        let file_io = setup_memory_file_io();
        let result = file_io.exists("").await;
        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
    }

    #[tokio::test]
    async fn test_memory_operator_reuse_across_file_io_calls() {
        let file_io = setup_memory_file_io();
        let path = "memory:/tmp/reuse_case";
        let dir = "memory:/tmp/";

        file_io
            .new_output(path)
            .unwrap()
            .write(Bytes::from("data"))
            .await
            .unwrap();

        // Data written by one call must be visible to later calls on the same
        // `FileIO`.
        assert!(file_io.exists(path).await.unwrap());
        assert_eq!(file_io.get_status(path).await.unwrap().size, 4);
        assert!(file_io
            .list_status(dir)
            .await
            .unwrap()
            .iter()
            .any(|status| status.path == path));

        file_io.delete_dir(dir).await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_operator_not_shared_between_file_io_instances() {
        let file_io_1 = setup_memory_file_io();
        let file_io_2 = setup_memory_file_io();
        let path = "memory:/tmp/reuse_isolation_case";

        file_io_1
            .new_output(path)
            .unwrap()
            .write(Bytes::from("data"))
            .await
            .unwrap();

        assert!(file_io_1.exists(path).await.unwrap());
        assert!(!file_io_2.exists(path).await.unwrap());
    }

    #[tokio::test]
    async fn test_get_status_fs() {
        let file_io = setup_fs_file_io();
        common_test_get_status(&file_io, "file:/tmp/test_file_get_status_fs").await;
    }

    #[tokio::test]
    async fn test_exists_fs() {
        let file_io = setup_fs_file_io();
        common_test_exists(&file_io, "file:/tmp/test_file_exists_fs").await;
    }

    #[tokio::test]
    async fn test_delete_file_fs() {
        let file_io = setup_fs_file_io();
        common_test_delete_file(&file_io, "file:/tmp/test_file_delete_fs").await;
    }

    #[tokio::test]
    async fn test_mkdirs_fs() {
        let file_io = setup_fs_file_io();
        common_test_mkdirs(&file_io, "file:/tmp/test_fs_dir/").await;
    }

    #[tokio::test]
    async fn test_rename_fs() {
        let file_io = setup_fs_file_io();
        common_test_rename(
            &file_io,
            "file:/tmp/test_file_fs_z",
            "file:/tmp/new_test_file_fs_o",
        )
        .await;
    }

    #[tokio::test]
    async fn test_list_status_fs_should_return_entry_paths() {
        let file_io = setup_fs_file_io();
        common_test_list_status_paths(&file_io, "file:/tmp/test_list_status_paths_fs/").await;
    }

    #[test]
    fn test_from_path_detects_local_fs_path() {
        // The temporary directory is removed when `dir` goes out of scope.
        let dir = tempdir().unwrap();
        let file_io = FileIO::from_path(dir.path().to_string_lossy())
            .unwrap()
            .build()
            .unwrap();
        let path = local_file_path(&dir.path().join("from_path_detects_local_fs_path.txt"));

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            file_io
                .new_output(&path)
                .unwrap()
                .write(Bytes::from("data"))
                .await
                .unwrap();
            assert!(file_io.exists(&path).await.unwrap());
        });
    }
}
/// Tests for `InputFile` / `OutputFile` round trips.
#[cfg(test)]
mod input_output_test {
    use bytes::Bytes;

    use super::*;

    /// Content written by every test in this module.
    const PAYLOAD: &str = "hello world";

    /// Builds a `FileIO` for the `memory` scheme.
    fn setup_memory_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// Builds a `FileIO` for the `file` scheme.
    fn setup_fs_file_io() -> FileIO {
        FileIOBuilder::new("file").build().unwrap()
    }

    /// Writes [`PAYLOAD`] to `path` through a writer, closes the writer and
    /// returns the output file for further checks.
    async fn write_payload(file_io: &FileIO, path: &str) -> OutputFile {
        let output = file_io.new_output(path).unwrap();
        let mut writer = output.writer().await.unwrap();
        writer.write(Bytes::from(PAYLOAD)).await.unwrap();
        writer.close().await.unwrap();
        output
    }

    async fn common_test_output_file_write_and_read(file_io: &FileIO, path: &str) {
        let input = write_payload(file_io, path).await.to_input_file();

        let content = input.read().await.unwrap();
        assert_eq!(&content[..], PAYLOAD.as_bytes());

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_output_file_exists(file_io: &FileIO, path: &str) {
        let output = write_payload(file_io, path).await;

        assert!(output.exists().await.unwrap());

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_input_file_metadata(file_io: &FileIO, path: &str) {
        let input = write_payload(file_io, path).await.to_input_file();

        let metadata = input.metadata().await.unwrap();
        assert_eq!(metadata.size, 11);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_input_file_partial_read(file_io: &FileIO, path: &str) {
        let input = write_payload(file_io, path).await.to_input_file();

        let reader = input.reader().await.unwrap();
        let partial_content = reader.read(0..5).await.unwrap();
        assert_eq!(&partial_content[..], b"hello");

        file_io.delete_file(path).await.unwrap();
    }

    #[tokio::test]
    async fn test_output_file_write_and_read_memory() {
        common_test_output_file_write_and_read(&setup_memory_file_io(), "memory:/test_file_rw_mem")
            .await;
    }

    #[tokio::test]
    async fn test_output_file_exists_memory() {
        common_test_output_file_exists(&setup_memory_file_io(), "memory:/test_file_exist_mem")
            .await;
    }

    #[tokio::test]
    async fn test_input_file_metadata_memory() {
        common_test_input_file_metadata(&setup_memory_file_io(), "memory:/test_file_meta_mem")
            .await;
    }

    #[tokio::test]
    async fn test_input_file_partial_read_memory() {
        common_test_input_file_partial_read(
            &setup_memory_file_io(),
            "memory:/test_file_part_read_mem",
        )
        .await;
    }

    #[tokio::test]
    async fn test_output_file_write_and_read_fs() {
        common_test_output_file_write_and_read(&setup_fs_file_io(), "file:/tmp/test_file_fs_rw")
            .await;
    }

    #[tokio::test]
    async fn test_output_file_exists_fs() {
        common_test_output_file_exists(&setup_fs_file_io(), "file:/tmp/test_file_exists").await;
    }

    #[tokio::test]
    async fn test_input_file_metadata_fs() {
        common_test_input_file_metadata(&setup_fs_file_io(), "file:/tmp/test_file_meta").await;
    }

    #[tokio::test]
    async fn test_input_file_partial_read_fs() {
        common_test_input_file_partial_read(&setup_fs_file_io(), "file:/tmp/test_file_read_fs")
            .await;
    }
}