transferred_files/
source.rs1use std::path::PathBuf;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream};
6use tokio::fs::File;
7use transferred_core::{BatchStream, Source, TransferredError};
8
9use crate::formats::FormatRead;
10
11#[derive(Clone)]
13pub struct FilesSource {
14 paths: GlobOrPaths,
15 format: Arc<dyn FormatRead>,
16}
17
18impl FilesSource {
19 #[must_use]
21 pub fn new(paths: GlobOrPaths, format: Arc<dyn FormatRead>) -> Self {
22 Self { paths, format }
23 }
24}
25
26#[async_trait]
27impl Source for FilesSource {
28 async fn stream_partitions(self: Box<Self>) -> Result<Vec<BatchStream>, TransferredError> {
30 let paths = self.paths.resolve()?;
31 let format = self.format;
32 Ok(paths
33 .into_iter()
34 .map(|path| lazy_open_file(path, format.clone()))
35 .collect())
36 }
37}
38
/// Input locations for a [`FilesSource`]: a glob pattern or an explicit list of paths.
#[derive(Debug, Clone)]
pub enum GlobOrPaths {
    /// A glob pattern that is expanded into matching paths when resolved.
    Glob(String),
    /// An explicit list of paths, taken as given.
    Paths(Vec<PathBuf>),
}
47
48impl GlobOrPaths {
49 fn resolve(self) -> Result<Vec<PathBuf>, TransferredError> {
51 let paths = match self {
52 GlobOrPaths::Glob(pattern) => expand_glob(&pattern)?,
53 GlobOrPaths::Paths(paths) if paths.is_empty() => {
54 return Err(TransferredError::source("no input paths provided"));
55 }
56 GlobOrPaths::Paths(paths) => paths,
57 };
58 if let Some(dir) = paths.iter().find(|path| path.is_dir()) {
59 return Err(TransferredError::source(format!(
60 concat!(
61 "{} is a directory, not a file. ",
62 r#"Pass a list of filenames or glob pattern ("directory/*.parquet")"#
63 ),
64 dir.display()
65 )));
66 }
67 Ok(paths)
68 }
69}
70
71fn expand_glob(pattern: &str) -> Result<Vec<PathBuf>, TransferredError> {
73 let paths: Vec<PathBuf> = glob::glob(pattern)
74 .map_err(|err| {
75 TransferredError::source(format!("invalid glob pattern '{pattern}': {err}"))
76 })?
77 .collect::<Result<Vec<_>, _>>()
78 .map_err(|err| TransferredError::source(format!("glob walk error: {err}")))?;
79
80 if paths.is_empty() {
81 return Err(TransferredError::source(format!(
82 "glob '{pattern}' matched no files"
83 )));
84 }
85
86 Ok(paths)
87}
88
89fn lazy_open_file(path: PathBuf, format: Arc<dyn FormatRead>) -> BatchStream {
91 Box::pin(stream::once(open_file_stream(path, format)).try_flatten())
92}
93
94async fn open_file_stream(
95 path: PathBuf,
96 format: Arc<dyn FormatRead>,
97) -> Result<BatchStream, TransferredError> {
98 let file = File::open(&path).await?;
99 format.read(Box::new(file)).await
100}
101
#[cfg(test)]
// Tests are allowed to unwrap: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use tempfile::tempdir;

    use super::*;
    use crate::Parquet;

    /// A directory passed either as a glob or as an explicit path must be
    /// rejected with the "is a directory" message.
    #[tokio::test]
    async fn directory_path_errors_with_clear_message() {
        let dir = tempdir().unwrap();
        let as_glob = GlobOrPaths::Glob(dir.path().to_string_lossy().into_owned());
        let as_paths = GlobOrPaths::Paths(vec![dir.path().to_path_buf()]);

        for input in [as_glob, as_paths] {
            let source = Box::new(FilesSource::new(input, Arc::new(Parquet::default())));
            let outcome = source.stream_partitions().await;
            let message = outcome.err().unwrap().to_string();
            assert!(message.contains("is a directory, not a file"));
        }
    }
}