pub mod local;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::io::Read;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{AsyncRead, Stream, StreamExt};
use local::LocalFileSystem;
use crate::error::{DataFusionError, Result};
#[async_trait]
pub trait ObjectReader: Send + Sync {
async fn chunk_reader(&self, start: u64, length: usize)
-> Result<Box<dyn AsyncRead>>;
fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>>;
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
self.sync_chunk_reader(0, self.length() as usize)
}
fn length(&self) -> u64;
}
/// One item yielded by the stream returned from [`ObjectStore::list_dir`].
///
/// Derives `Clone` and `PartialEq` to match [`FileMeta`] and [`SizedFile`],
/// so listing results can be copied and compared.
#[derive(Debug, Clone, PartialEq)]
pub enum ListEntry {
    /// Metadata for a single file.
    FileMeta(FileMeta),
    /// A prefix string. Presumably a "directory"-like grouping produced when
    /// a delimiter is passed to `list_dir` — confirm against implementations.
    Prefix(String),
}
/// A file identified by its path, together with its size.
///
/// Derives `Eq` and `Hash` (in addition to `PartialEq`), so values can be
/// used as keys in `HashMap`/`HashSet`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SizedFile {
    /// Path of the file, as understood by the owning `ObjectStore`.
    pub path: String,
    /// Size of the file (presumably in bytes).
    pub size: u64,
}
/// Metadata describing a file: its path and size, plus an optional
/// last-modification timestamp.
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    /// Path and size of the file.
    pub sized_file: SizedFile,
    /// Last-modification time in UTC, or `None` when it is not available.
    pub last_modified: Option<DateTime<Utc>>,
}
impl FileMeta {
pub fn path(&self) -> &str {
&self.sized_file.path
}
pub fn size(&self) -> u64 {
self.sized_file.size
}
}
impl std::fmt::Display for FileMeta {
    /// Renders the file as `<path> (size: <size>)`.
    ///
    /// `last_modified` is deliberately not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let (path, size) = (self.path(), self.size());
        write!(f, "{} (size: {})", path, size)
    }
}
/// Boxed, pinned stream of file metadata, as returned by
/// `ObjectStore::list_file`. Each item may individually be an error.
pub type FileMetaStream =
    Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;

/// Boxed, pinned stream of listing entries (files or prefixes), as returned
/// by `ObjectStore::list_dir`. Each item may individually be an error.
pub type ListEntryStream =
    Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;

/// Boxed, pinned stream of shared object readers. Each item may
/// individually be an error.
///
/// The explicit `'static` matches the aliases above; it is already the
/// default object lifetime for `Box<dyn ...>`.
pub type ObjectReaderStream = Pin<
    Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync + 'static>,
>;
/// A backend (local file system or other) that can list objects and
/// open readers for them.
#[async_trait]
pub trait ObjectStore: Sync + Send + Debug {
    /// Lists the files under `prefix` as a stream of [`FileMeta`].
    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;

    /// Lists the files under `prefix` whose path ends with `suffix`.
    ///
    /// Matching is a plain (case-sensitive) `str::ends_with` on the path.
    /// Error items from the underlying listing are passed through unchanged,
    /// so consumers still observe them.
    async fn list_file_with_suffix(
        &self,
        prefix: &str,
        suffix: &str,
    ) -> Result<FileMetaStream> {
        let wanted_suffix = suffix.to_owned();
        let files = self.list_file(prefix).await?;
        let matching = files.filter(move |entry| {
            let keep = if let Ok(meta) = entry {
                meta.path().ends_with(&wanted_suffix)
            } else {
                // Never drop errors silently.
                true
            };
            async move { keep }
        });
        Ok(Box::pin(matching))
    }

    /// Lists entries under `prefix` as a stream of [`ListEntry`].
    ///
    /// NOTE(review): `delimiter` presumably groups entries sharing a common
    /// prefix into `ListEntry::Prefix` items — confirm per implementation.
    async fn list_dir(
        &self,
        prefix: &str,
        delimiter: Option<String>,
    ) -> Result<ListEntryStream>;

    /// Opens a reader for `file`.
    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
}
/// URI scheme under which the local file system store is registered by default.
///
/// A `const` rather than a `static`: a plain string needs neither a fixed
/// address nor interior mutability.
const LOCAL_SCHEME: &str = "file";
/// Registry mapping URI schemes (for example `"file"`) to the
/// [`ObjectStore`] that handles them.
pub struct ObjectStoreRegistry {
    /// Registered stores keyed by scheme. Wrapped in a `RwLock` so stores
    /// can be registered and looked up through a shared `&self`.
    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
}
impl fmt::Debug for ObjectStoreRegistry {
    /// Shows only the registered scheme names; the stores themselves are
    /// omitted. Scheme order follows `HashMap` iteration order.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let registered = self.object_stores.read();
        let schemes: Vec<&String> = registered.keys().collect();
        f.debug_struct("ObjectStoreRegistry")
            .field("schemes", &schemes)
            .finish()
    }
}
impl Default for ObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
impl ObjectStoreRegistry {
    /// Creates a registry with `LocalFileSystem` pre-registered under the
    /// local scheme (`LOCAL_SCHEME`).
    pub fn new() -> Self {
        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
        Self {
            object_stores: RwLock::new(map),
        }
    }

    /// Registers `store` for `scheme`, replacing any store previously
    /// registered for that scheme.
    ///
    /// Returns the previously registered store, if there was one.
    ///
    /// NOTE: `scheme` is stored exactly as given. [`Self::get_by_uri`] looks
    /// schemes up in lowercase, so a scheme containing uppercase characters
    /// is reachable only through [`Self::get`].
    pub fn register_store(
        &self,
        scheme: String,
        store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        let mut stores = self.object_stores.write();
        stores.insert(scheme, store)
    }

    /// Returns the store registered for `scheme` (exact, case-sensitive
    /// match), or `None` if there is none.
    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
        let stores = self.object_stores.read();
        stores.get(scheme).cloned()
    }

    /// Resolves `uri` to the store that serves it and the path within that store.
    ///
    /// * If `uri` contains `"://"`, the text before the first occurrence is
    ///   the scheme, lowercased for the lookup. The text after it is returned
    ///   as the path; e.g. `"s3://bucket/key"` yields the `"s3"` store and
    ///   `"bucket/key"`.
    /// * Otherwise the whole `uri` is returned as the path, served by a new
    ///   `LocalFileSystem`. This bypasses any store registered under the
    ///   local scheme.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Internal` if no store is registered for the
    /// (lowercased) scheme.
    pub fn get_by_uri<'a>(
        &self,
        uri: &'a str,
    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
        if let Some((scheme, path)) = uri.split_once("://") {
            // Reuse `get` so the locking/cloning logic lives in one place.
            let store = self.get(&scheme.to_lowercase()).ok_or_else(|| {
                DataFusionError::Internal(format!(
                    "No suitable object store found for {}",
                    scheme
                ))
            })?;
            Ok((store, path))
        } else {
            Ok((Arc::new(LocalFileSystem), uri))
        }
    }
}