use std::ops::Range;
use std::sync::{Arc, OnceLock};
use bytes::Bytes;
use super::storage::{
LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
};
use crate::Result;
/// Entry point for file operations, backed by a lazily built [`Storage`].
///
/// The storage backend is created from `factory` and `config` the first time an
/// operation needs it, and is then cached. The cache sits behind an `Arc`, so every
/// clone of a `FileIO` shares the same backend instance.
#[derive(Clone, Debug)]
pub struct FileIO {
    /// Properties handed to `factory` when the backend is built.
    config: StorageConfig,
    /// Builds the backend on first use.
    factory: Arc<dyn StorageFactory>,
    /// Lazily initialized backend, shared between clones.
    storage: Arc<OnceLock<Arc<dyn Storage>>>,
}

impl FileIO {
    /// Creates a `FileIO` backed by [`MemoryStorageFactory`] with an empty config.
    pub fn new_with_memory() -> Self {
        Self::from_factory(Arc::new(MemoryStorageFactory))
    }

    /// Creates a `FileIO` backed by [`LocalFsStorageFactory`] with an empty config.
    pub fn new_with_fs() -> Self {
        Self::from_factory(Arc::new(LocalFsStorageFactory))
    }

    /// Creates a `FileIO` for `factory` with an empty config and an unbuilt backend.
    fn from_factory(factory: Arc<dyn StorageFactory>) -> Self {
        Self {
            config: StorageConfig::new(),
            factory,
            storage: Arc::new(OnceLock::new()),
        }
    }

    /// Returns the properties that are passed to the storage factory.
    pub fn config(&self) -> &StorageConfig {
        &self.config
    }

    /// Returns the cached storage backend, building it on first use.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`StorageFactory::build`]. A failed build is not
    /// cached, so the next call retries.
    fn get_storage(&self) -> Result<Arc<dyn Storage>> {
        if let Some(storage) = self.storage.get() {
            return Ok(Arc::clone(storage));
        }
        // Build outside the cell so factory errors can be propagated (a fallible
        // `get_or_try_init` is not stable). If a concurrent caller filled the cell
        // first, `get_or_init` keeps that instance and drops ours, so all users of
        // this `FileIO` (and its clones) see a single backend.
        let built = self.factory.build(&self.config)?;
        Ok(Arc::clone(self.storage.get_or_init(|| built)))
    }

    /// Deletes the file at `path` via [`Storage::delete`].
    ///
    /// # Errors
    ///
    /// Fails if the backend cannot be built or the backend call fails.
    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
        self.get_storage()?.delete(path.as_ref()).await
    }

    /// Deletes everything under `path` via [`Storage::delete_prefix`]; exact
    /// prefix semantics are defined by the backend.
    ///
    /// # Errors
    ///
    /// Fails if the backend cannot be built or the backend call fails.
    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
        self.get_storage()?.delete_prefix(path.as_ref()).await
    }

    /// Reports whether `path` exists, via [`Storage::exists`].
    ///
    /// # Errors
    ///
    /// Fails if the backend cannot be built or the backend call fails.
    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
        self.get_storage()?.exists(path.as_ref()).await
    }

    /// Creates an [`InputFile`] for `path` on the (lazily built) backend.
    ///
    /// # Errors
    ///
    /// Fails if the backend cannot be built or rejects the path.
    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
        self.get_storage()?.new_input(path.as_ref())
    }

    /// Creates an [`OutputFile`] for `path` on the (lazily built) backend.
    ///
    /// # Errors
    ///
    /// Fails if the backend cannot be built or rejects the path.
    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
        self.get_storage()?.new_output(path.as_ref())
    }
}
/// Builder for a [`FileIO`] with a caller-supplied [`StorageFactory`] and properties.
///
/// Properties are accumulated into a [`StorageConfig`]. The resulting `FileIO` hands
/// that config to the factory when it first needs its storage backend.
#[derive(Clone, Debug)]
pub struct FileIOBuilder {
    factory: Arc<dyn StorageFactory>,
    config: StorageConfig,
}

impl FileIOBuilder {
    /// Starts a builder for `factory` with an empty [`StorageConfig`].
    #[must_use]
    pub fn new(factory: Arc<dyn StorageFactory>) -> Self {
        Self {
            factory,
            config: StorageConfig::new(),
        }
    }

    /// Adds one property, converting key and value with [`ToString`].
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
        self.config = self.config.with_prop(key.to_string(), value.to_string());
        self
    }

    /// Adds every `(key, value)` pair from `args`, converting both with [`ToString`].
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn with_props(
        mut self,
        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
    ) -> Self {
        let props = args
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()));
        self.config = self.config.with_props(props);
        self
    }

    /// Returns the properties collected so far.
    pub fn config(&self) -> &StorageConfig {
        &self.config
    }

    /// Finishes the builder.
    ///
    /// The storage backend is not created here. The returned `FileIO` builds it on
    /// the first operation that needs it.
    #[must_use]
    pub fn build(self) -> FileIO {
        FileIO {
            config: self.config,
            factory: self.factory,
            storage: Arc::new(OnceLock::new()),
        }
    }
}
/// Metadata for a single file, as returned by a storage backend's `metadata` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// Size of the file as reported by the backend.
    // NOTE(review): presumably a byte count — confirm against the Storage implementations.
    pub size: u64,
}
#[async_trait::async_trait]
pub trait FileRead: Send + Sync + Unpin + 'static {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
/// Readable handle to the file at `path` in a [`Storage`] backend.
///
/// Every async method forwards to the wrapped backend together with the stored path.
#[derive(Debug)]
pub struct InputFile {
    /// Backend that serves every call made through this handle.
    storage: Arc<dyn Storage>,
    /// Location passed to the backend on each call.
    path: String,
}

impl InputFile {
    /// Wraps `storage` and `path` into a handle; no backend call is made.
    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
        InputFile { path, storage }
    }

    /// The location this handle refers to.
    pub fn location(&self) -> &str {
        self.path.as_str()
    }

    /// Asks the backend whether the file exists.
    pub async fn exists(&self) -> crate::Result<bool> {
        self.storage.exists(self.location()).await
    }

    /// Fetches the file's [`FileMetadata`] from the backend.
    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
        self.storage.metadata(self.location()).await
    }

    /// Reads the file's contents as a single [`Bytes`] buffer.
    pub async fn read(&self) -> crate::Result<Bytes> {
        self.storage.read(self.location()).await
    }

    /// Opens a [`FileRead`] for ranged reads of this file.
    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
        self.storage.reader(self.location()).await
    }
}
/// Sequential, write-only sink for a file's contents.
///
/// Methods take `&mut self`, so a writer is driven by one owner at a time. It must be
/// `Send + Unpin + 'static` to be boxed as `Box<dyn FileWrite>`.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Appends `data` to the file being written.
    // NOTE(review): "appends" assumes successive calls concatenate — confirm with implementors.
    async fn write(&mut self, data: Bytes) -> crate::Result<()>;

    /// Closes the writer; expected to be called once after the final `write`.
    async fn close(&mut self) -> crate::Result<()>;
}
/// Writable handle to the file at `path` in a [`Storage`] backend.
///
/// Every async method forwards to the wrapped backend together with the stored path.
#[derive(Debug)]
pub struct OutputFile {
    /// Backend that serves every call made through this handle.
    storage: Arc<dyn Storage>,
    /// Location passed to the backend on each call.
    path: String,
}

impl OutputFile {
    /// Wraps `storage` and `path` into a handle; no backend call is made.
    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
        OutputFile { path, storage }
    }

    /// The location this handle refers to.
    pub fn location(&self) -> &str {
        self.path.as_str()
    }

    /// Asks the backend whether the file exists.
    pub async fn exists(&self) -> crate::Result<bool> {
        self.storage.exists(self.location()).await
    }

    /// Asks the backend to delete the file.
    pub async fn delete(&self) -> crate::Result<()> {
        self.storage.delete(self.location()).await
    }

    /// Converts this handle into an [`InputFile`] for the same backend and path.
    pub fn to_input_file(self) -> InputFile {
        let OutputFile { storage, path } = self;
        InputFile { storage, path }
    }

    /// Writes `data` as the file's contents in a single backend call.
    pub async fn write(&self, data: Bytes) -> crate::Result<()> {
        self.storage.write(self.location(), data).await
    }

    /// Opens a [`FileWrite`] for streaming writes to this file.
    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
        self.storage.writer(self.location()).await
    }
}
#[cfg(test)]
mod tests {
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::Path;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::AsyncReadExt;
    use futures::io::AllowStdIo;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};
    use crate::io::{LocalFsStorageFactory, MemoryStorageFactory};

    /// `FileIO` over the local filesystem, shared by the local-fs tests.
    fn create_local_file_io() -> FileIO {
        FileIO::new_with_fs()
    }

    /// Writes `s` to `path` with plain `std::fs`, creating parent directories first,
    /// so fixtures do not depend on `FileIO`.
    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    /// Reads `path` back through `std::fs` (wrapped in an async adapter), independent of `FileIO`.
    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();
        let file_name = "a.txt";
        let content = "Iceberg loves rust.";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();
        assert!(input_file.exists().await.unwrap());
        assert_eq!(&full_path, input_file.location());

        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();
        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        // Passing a plain file as the prefix leaves that file in place.
        file_io.delete_prefix(&a_path).await.unwrap();
        assert!(file_io.exists(&a_path).await.unwrap());

        // A prefix that matches nothing is not an error.
        file_io.delete_prefix("not_exists/").await.unwrap();

        // Deleting a directory prefix removes its contents but not siblings.
        file_io.delete_prefix(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();
        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.delete_prefix(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();
        let file_name = "a.txt";
        let content = "Iceberg loves rust.";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();
        assert!(!output_file.exists().await.unwrap());

        output_file.write(content.into()).await.unwrap();
        assert_eq!(&full_path, output_file.location());

        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIO::new_with_memory();
        // The temp dir only supplies a unique path string for the in-memory backend;
        // keep it bound so the path stays reserved for the whole test.
        let tmp_dir = TempDir::new().unwrap();
        let path = format!("{}/1.txt", tmp_dir.path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();
        assert!(io.exists(&path).await.unwrap());

        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }

    #[tokio::test]
    async fn test_cloned_file_io_shares_storage() {
        // Clones share the lazily built backend, so data written through one
        // clone is visible through the other.
        let io = FileIO::new_with_memory();
        let clone = io.clone();
        let tmp_dir = TempDir::new().unwrap();
        let path = format!("{}/shared.txt", tmp_dir.path().to_str().unwrap());

        io.new_output(&path)
            .unwrap()
            .write("shared".into())
            .await
            .unwrap();
        assert!(clone.exists(&path).await.unwrap());

        let content = clone.new_input(&path).unwrap().read().await.unwrap();
        assert_eq!(content, Bytes::from("shared"));
    }

    #[test]
    fn test_file_io_builder_with_props() {
        let factory = Arc::new(MemoryStorageFactory);
        let file_io = FileIOBuilder::new(factory)
            .with_prop("key1", "value1")
            .with_prop("key2", "value2")
            .build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }

    #[test]
    fn test_file_io_builder_with_multiple_props() {
        let factory = Arc::new(LocalFsStorageFactory);
        let props = vec![("key1", "value1"), ("key2", "value2")];
        let file_io = FileIOBuilder::new(factory).with_props(props).build();

        assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string()));
        assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string()));
    }
}