Skip to main content

vortex_file/multi/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Builder for constructing a [`MultiLayoutDataSource`] from multiple Vortex files.
5
6mod session;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::TryStreamExt;
12use session::MultiFileSessionExt;
13use tracing::debug;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_err;
17use vortex_io::filesystem::FileListing;
18use vortex_io::filesystem::FileSystemRef;
19use vortex_layout::LayoutReaderRef;
20use vortex_scan::api::DataSource;
21use vortex_scan::multi::LayoutReaderFactory;
22use vortex_scan::multi::MultiLayoutDataSource;
23use vortex_session::VortexSession;
24
25use crate::OpenOptionsSessionExt;
26use crate::VortexOpenOptions;
27use crate::v2::FileStatsLayoutReader;
28
29/// A builder that discovers multiple Vortex files from a glob pattern and constructs a
30/// [`MultiLayoutDataSource`] to scan them as a single data source.
31///
32/// The primary interface is [`Self::with_glob`], which accepts a glob
33/// pattern (optionally prefixed with `file://`). For non-local filesystems (S3, GCS, etc.),
34/// callers must also provide a [`FileSystemRef`] via [`Self::with_filesystem`]).
35///
36/// # Examples
37///
38/// ```ignore
39/// // Local files — filesystem is auto-created:
40/// let ds = MultiFileDataSource::new(session)
41///     .with_glob("/data/warehouse/*.vortex")
42///     .build()
43///     .await?;
44///
45/// // S3 — caller provides the filesystem:
46/// let ds = MultiFileDataSource::new(session)
47///     .with_filesystem(s3_fs)
48///     .with_glob("prefix/*.vortex")
49///     .build()
50///     .await?;
51/// ```
52pub struct MultiFileDataSource {
53    session: VortexSession,
54    fs: Option<FileSystemRef>,
55    glob: Option<String>,
56    open_options_fn: Arc<dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync>,
57}
58
59impl MultiFileDataSource {
60    /// Create a new [`MultiFileDataSource`] builder.
61    pub fn new(session: VortexSession) -> Self {
62        Self {
63            session,
64            fs: None,
65            glob: None,
66            open_options_fn: Arc::new(|opts| opts),
67        }
68    }
69
70    /// Set the path glob for file discovery.
71    ///
72    /// This path should be relative to the filesystem's base URL.
73    pub fn with_glob(mut self, glob: impl Into<String>) -> Self {
74        self.glob = Some(glob.into().trim_start_matches("/").to_string());
75        self
76    }
77
78    /// Set the filesystem to use for file discovery and reading.
79    ///
80    /// Required for non-local URLs (S3, GCS, etc.). For `file://` or bare path URLs,
81    /// a local filesystem is created automatically if none is provided.
82    pub fn with_filesystem(mut self, fs: FileSystemRef) -> Self {
83        self.fs = Some(fs);
84        self
85    }
86
87    /// Customize [`VortexOpenOptions`] applied to each file.
88    ///
89    /// Use this to configure segment caches, metrics registries, or other per-file options.
90    pub fn with_open_options(
91        mut self,
92        f: impl Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync + 'static,
93    ) -> Self {
94        self.open_options_fn = Arc::new(f);
95        self
96    }
97
98    /// Build the [`DataSource`].
99    ///
100    /// Discovers files via glob, opens the first file eagerly to determine the schema,
101    /// and creates lazy factories for the remaining files.
102    pub async fn build(mut self) -> VortexResult<impl DataSource> {
103        let glob = self
104            .glob
105            .take()
106            .ok_or_else(|| vortex_err!("MultiFileDataSource requires a glob URL"))?;
107
108        let fs = match self.fs {
109            Some(fs) => fs,
110            None => create_local_filesystem(&self.session)?,
111        };
112        let files: Vec<FileListing> = fs.glob(&glob)?.try_collect().await?;
113
114        if files.is_empty() {
115            vortex_bail!("No files matched the glob pattern '{}'", glob);
116        }
117
118        let file_count = files.len();
119        debug!(file_count, glob = %glob, "discovered files");
120
121        // Open first file eagerly for dtype.
122        let first_file =
123            open_file(&fs, &files[0], &self.session, self.open_options_fn.as_ref()).await?;
124        let first_reader = layout_reader_with_stats(&first_file)?;
125
126        let factories: Vec<Arc<dyn LayoutReaderFactory>> = files[1..]
127            .iter()
128            .map(|f| {
129                Arc::new(VortexFileReaderFactory {
130                    fs: fs.clone(),
131                    file: f.clone(),
132                    session: self.session.clone(),
133                    open_options_fn: self.open_options_fn.clone(),
134                }) as Arc<dyn LayoutReaderFactory>
135            })
136            .collect();
137
138        let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session);
139
140        debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource");
141
142        Ok(inner)
143    }
144}
145
/// Builds the fallback local filesystem, an [`ObjectStoreFileSystem`] wrapping
/// `object_store::local::LocalFileSystem` and driven by the session's runtime handle.
// TODO(ngates): create a native file system without an object_store dependency.
//  Turns out it's not a trivial change because we have always used object_store with its own
//  coalescing and concurrency configs, so we need to re-tune for local disk.
#[cfg(feature = "object_store")]
fn create_local_filesystem(session: &VortexSession) -> VortexResult<FileSystemRef> {
    use object_store::local::LocalFileSystem;
    use vortex_io::object_store::ObjectStoreFileSystem;
    use vortex_io::session::RuntimeSessionExt;

    let local_store = Arc::new(LocalFileSystem::default());
    Ok(Arc::new(ObjectStoreFileSystem::new(
        local_store,
        session.handle(),
    )))
}
159
160#[cfg(not(feature = "object_store"))]
161fn create_local_filesystem(_session: &VortexSession) -> VortexResult<FileSystemRef> {
162    vortex_bail!(
163        "The 'object_store' feature is required for automatic local filesystem creation. \
164             Either enable the feature or provide a filesystem via .with_filesystem()."
165    );
166}
167
168/// Open a single Vortex file, checking the session's footer cache.
169async fn open_file(
170    fs: &FileSystemRef,
171    file: &FileListing,
172    session: &VortexSession,
173    open_options_fn: &(dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync),
174) -> VortexResult<crate::VortexFile> {
175    debug!(path = %file.path, "opening vortex file");
176
177    // Open the reader first so we can use its URI as the cache key.
178    // The URI includes the full path (with any filesystem prefix), making it unique
179    // even when different PrefixFileSystem instances strip paths to the same relative name.
180    let source = fs.open_read(&file.path).await?;
181    let cache_key = source
182        .uri()
183        .map(|u| u.to_string())
184        .unwrap_or_else(|| file.path.clone());
185
186    // Build open options. The DashMap Ref from multi_file() must not live across an await,
187    // so we scope the cache lookup in a block.
188    let options = {
189        let mut options = open_options_fn(session.open_options());
190        if let Some(size) = file.size {
191            options = options.with_file_size(size);
192        }
193        if let Some(footer) = session.multi_file().get_footer(&cache_key) {
194            options = options.with_footer(footer);
195        }
196        options
197    };
198
199    let vortex_file = options.open(source).await?;
200
201    // Store footer in cache (scoped to avoid holding the Ref across subsequent code).
202    session
203        .multi_file()
204        .put_footer(&cache_key, vortex_file.footer().clone());
205    Ok(vortex_file)
206}
207
208/// Creates a layout reader from a VortexFile, wrapping with `FileStatsLayoutReader` when
209/// file-level statistics are available.
210fn layout_reader_with_stats(file: &crate::VortexFile) -> VortexResult<LayoutReaderRef> {
211    let mut reader = file.layout_reader()?;
212    if let Some(stats) = file.file_stats().cloned() {
213        reader = Arc::new(FileStatsLayoutReader::new(
214            reader,
215            stats,
216            file.session.clone(),
217        ));
218    }
219    Ok(reader)
220}
221
222/// A [`LayoutReaderFactory`] that lazily opens a single Vortex file and returns its layout reader.
223struct VortexFileReaderFactory {
224    fs: FileSystemRef,
225    file: FileListing,
226    session: VortexSession,
227    open_options_fn: Arc<dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync>,
228}
229
230#[async_trait]
231impl LayoutReaderFactory for VortexFileReaderFactory {
232    async fn open(&self) -> VortexResult<Option<LayoutReaderRef>> {
233        let file = open_file(
234            &self.fs,
235            &self.file,
236            &self.session,
237            self.open_options_fn.as_ref(),
238        )
239        .await?;
240        Ok(Some(layout_reader_with_stats(&file)?))
241    }
242}