Skip to main content

transferred_files/
source.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures::{TryStreamExt, stream};
6use tokio::fs::File;
7use transferred_core::{BatchStream, Source, TransferredError};
8
9use crate::formats::FormatRead;
10
11/// Local file source. One or many files, decoded by the supplied `FormatRead`.
12#[derive(Clone)]
13pub struct FilesSource {
14    paths: GlobOrPaths,
15    format: Arc<dyn FormatRead>,
16}
17
18impl FilesSource {
19    /// Build a source. No I/O performed.
20    #[must_use]
21    pub fn new(paths: GlobOrPaths, format: Arc<dyn FormatRead>) -> Self {
22        Self { paths, format }
23    }
24}
25
26#[async_trait]
27impl Source for FilesSource {
28    /// One stream per file. Globs expanded here; empty results error.
29    async fn stream_partitions(self: Box<Self>) -> Result<Vec<BatchStream>, TransferredError> {
30        let paths = self.paths.resolve()?;
31        let format = self.format;
32        Ok(paths
33            .into_iter()
34            .map(|path| lazy_open_file(path, format.clone()))
35            .collect())
36    }
37}
38
/// Describes which files a source reads: either a glob pattern (which may
/// also be a plain single path) or an explicit list of paths.
#[derive(Debug, Clone)]
pub enum GlobOrPaths {
    /// A glob pattern such as `data/*.parquet`. It is not expanded when the
    /// value is built, only when the source's partitions are requested.
    Glob(String),
    /// A fixed list of paths, used verbatim; entries are never treated as
    /// glob patterns.
    Paths(Vec<PathBuf>),
}
47
48impl GlobOrPaths {
49    /// Resolve to concrete paths. Glob walks the filesystem; empty results error.
50    fn resolve(self) -> Result<Vec<PathBuf>, TransferredError> {
51        let paths = match self {
52            GlobOrPaths::Glob(pattern) => expand_glob(&pattern)?,
53            GlobOrPaths::Paths(paths) if paths.is_empty() => {
54                return Err(TransferredError::source("no input paths provided"));
55            }
56            GlobOrPaths::Paths(paths) => paths,
57        };
58        if let Some(dir) = paths.iter().find(|path| path.is_dir()) {
59            return Err(TransferredError::source(format!(
60                concat!(
61                    "{} is a directory, not a file. ",
62                    r#"Pass a list of filenames or glob pattern ("directory/*.parquet")"#
63                ),
64                dir.display()
65            )));
66        }
67        Ok(paths)
68    }
69}
70
71/// Expand a glob pattern to matching paths. Empty matches error.
72fn expand_glob(pattern: &str) -> Result<Vec<PathBuf>, TransferredError> {
73    let paths: Vec<PathBuf> = glob::glob(pattern)
74        .map_err(|err| {
75            TransferredError::source(format!("invalid glob pattern '{pattern}': {err}"))
76        })?
77        .collect::<Result<Vec<_>, _>>()
78        .map_err(|err| TransferredError::source(format!("glob walk error: {err}")))?;
79
80    if paths.is_empty() {
81        return Err(TransferredError::source(format!(
82            "glob '{pattern}' matched no files"
83        )));
84    }
85
86    Ok(paths)
87}
88
89/// Keep files opening lazy so that only opened files has file descriptors.
90fn lazy_open_file(path: PathBuf, format: Arc<dyn FormatRead>) -> BatchStream {
91    Box::pin(stream::once(open_file_stream(path, format)).try_flatten())
92}
93
94async fn open_file_stream(
95    path: PathBuf,
96    format: Arc<dyn FormatRead>,
97) -> Result<BatchStream, TransferredError> {
98    let file = File::open(&path).await?;
99    format.read(Box::new(file)).await
100}
101
#[cfg(test)]
// Unwrapping is fine in tests: a panic is exactly how a failure should surface.
#[allow(clippy::unwrap_used)]
mod tests {
    use tempfile::tempdir;

    use super::*;
    use crate::Parquet;

    /// Pointing the source at a directory, whether as a glob string or as an
    /// explicit path, must fail with the "is a directory" message.
    #[tokio::test]
    async fn directory_path_errors_with_clear_message() {
        let scratch = tempdir().unwrap();
        let dir_path = scratch.path();
        let inputs = [
            GlobOrPaths::Glob(dir_path.to_string_lossy().into_owned()),
            GlobOrPaths::Paths(vec![dir_path.to_path_buf()]),
        ];

        for input in inputs {
            let source = Box::new(FilesSource::new(input, Arc::new(Parquet::default())));
            let message = source
                .stream_partitions()
                .await
                .err()
                .unwrap()
                .to_string();
            assert!(message.contains("is a directory, not a file"));
        }
    }
}