use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use futures::{TryStreamExt, stream};
use tokio::fs::File;
use transferred_core::{BatchStream, Source, TransferredError};
use crate::formats::FormatRead;
/// A [`Source`] backed by files on disk.
///
/// Each resolved input path becomes one partition. Every partition is decoded with the
/// same shared `format` reader.
#[derive(Clone)]
pub struct FilesSource {
    /// Input files, given either as an explicit list or as a glob pattern.
    paths: GlobOrPaths,
    /// Decoder applied to every file; shared between partitions through an `Arc`.
    format: Arc<dyn FormatRead>,
}
impl FilesSource {
    /// Creates a source that reads `paths` with the given `format` decoder.
    ///
    /// Nothing is touched on disk here. Paths are resolved when
    /// `stream_partitions` is called.
    #[must_use]
    pub fn new(paths: GlobOrPaths, format: Arc<dyn FormatRead>) -> Self {
        Self { format, paths }
    }
}
#[async_trait]
impl Source for FilesSource {
async fn stream_partitions(self: Box<Self>) -> Result<Vec<BatchStream>, TransferredError> {
let paths = self.paths.resolve()?;
let format = self.format;
Ok(paths
.into_iter()
.map(|path| lazy_open_file(path, format.clone()))
.collect())
}
}
/// How the input files of a [`FilesSource`] are specified.
#[derive(Debug, Clone)]
pub enum GlobOrPaths {
    /// A glob pattern (e.g. `"directory/*.parquet"`), expanded when the source is read.
    Glob(String),
    /// An explicit list of file paths; must be non-empty.
    Paths(Vec<PathBuf>),
}
impl GlobOrPaths {
    /// Turns the configured input into a concrete, non-empty list of paths.
    ///
    /// Explicit paths are not checked for existence here. A missing file only shows up
    /// when its partition is opened.
    ///
    /// # Errors
    /// Returns an error if:
    /// - the glob is invalid or matches nothing,
    /// - an explicit path list is empty, or
    /// - any resolved path is a directory.
    fn resolve(self) -> Result<Vec<PathBuf>, TransferredError> {
        let resolved = match self {
            GlobOrPaths::Glob(pattern) => expand_glob(&pattern)?,
            GlobOrPaths::Paths(list) => {
                if list.is_empty() {
                    return Err(TransferredError::source("no input paths provided"));
                }
                list
            }
        };
        // Reject directories up front. The user gets guidance on what to pass instead of
        // a less helpful error when the path is opened or read later.
        let first_dir = resolved.iter().find(|candidate| candidate.is_dir());
        if let Some(dir) = first_dir {
            return Err(TransferredError::source(format!(
                "{} is a directory, not a file. \
                 Pass a list of filenames or glob pattern (\"directory/*.parquet\")",
                dir.display()
            )));
        }
        Ok(resolved)
    }
}
/// Expands `pattern` with the `glob` crate and returns every matched path.
///
/// # Errors
/// Returns an error if:
/// - the pattern is syntactically invalid,
/// - any entry cannot be read while walking (the first such error aborts the walk), or
/// - the pattern matches nothing.
fn expand_glob(pattern: &str) -> Result<Vec<PathBuf>, TransferredError> {
    let entries = match glob::glob(pattern) {
        Ok(entries) => entries,
        Err(err) => {
            return Err(TransferredError::source(format!(
                "invalid glob pattern '{pattern}': {err}"
            )));
        }
    };

    let mut matched = Vec::new();
    for entry in entries {
        let path = entry
            .map_err(|err| TransferredError::source(format!("glob walk error: {err}")))?;
        matched.push(path);
    }

    if matched.is_empty() {
        Err(TransferredError::source(format!(
            "glob '{pattern}' matched no files"
        )))
    } else {
        Ok(matched)
    }
}
/// Wraps a single file as a [`BatchStream`] that opens the file only when first polled.
///
/// An error from opening or decoding the file is yielded as the stream's first item.
fn lazy_open_file(path: PathBuf, format: Arc<dyn FormatRead>) -> BatchStream {
    // The open future is not polled until the stream is.
    let opener = open_file_stream(path, format);
    // One-item stream holding `Result<BatchStream, _>`, flattened into the batches themselves.
    let batches = stream::once(opener).try_flatten();
    Box::pin(batches)
}
/// Opens `path` and hands the file to `format`, which decodes it into a batch stream.
///
/// # Errors
/// Returns an error if the file cannot be opened (the message includes the path) or if
/// `format.read` fails.
async fn open_file_stream(
    path: PathBuf,
    format: Arc<dyn FormatRead>,
) -> Result<BatchStream, TransferredError> {
    // `io::Error` from `File::open` does not mention the path. With many partitions, a bare
    // "No such file or directory" is hard to trace back to a file. Rewrap the error with the
    // path, keeping the original `ErrorKind`. `?` still performs the same
    // `io::Error -> TransferredError` conversion as before.
    let file = File::open(&path).await.map_err(|err| {
        std::io::Error::new(err.kind(), format!("failed to open {}: {err}", path.display()))
    })?;
    format.read(Box::new(file)).await
}
#[cfg(test)]
#[allow(clippy::unwrap_used)] // unwrapping is acceptable in test code
mod tests {
    use super::*;
    use crate::Parquet;
    use tempfile::tempdir;

    /// Both a glob that resolves to a directory and an explicit directory path must be
    /// rejected with the explanatory "is a directory" message.
    #[tokio::test]
    async fn directory_path_errors_with_clear_message() {
        let tmp = tempdir().unwrap();
        let dir_path = tmp.path();
        let cases = [
            GlobOrPaths::Glob(dir_path.to_string_lossy().into_owned()),
            GlobOrPaths::Paths(vec![dir_path.to_path_buf()]),
        ];
        for input in cases {
            let source = Box::new(FilesSource::new(input, Arc::new(Parquet::default())));
            let message = source.stream_partitions().await.err().unwrap().to_string();
            assert!(message.contains("is a directory, not a file"));
        }
    }
}