1mod session;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::TryStreamExt;
12use session::MultiFileSessionExt;
13use tracing::debug;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_err;
17use vortex_io::filesystem::FileListing;
18use vortex_io::filesystem::FileSystemRef;
19use vortex_layout::LayoutReaderRef;
20use vortex_scan::api::DataSource;
21use vortex_scan::multi::LayoutReaderFactory;
22use vortex_scan::multi::MultiLayoutDataSource;
23use vortex_session::VortexSession;
24
25use crate::OpenOptionsSessionExt;
26use crate::VortexOpenOptions;
27use crate::v2::FileStatsLayoutReader;
28
29pub struct MultiFileDataSource {
53 session: VortexSession,
54 fs: Option<FileSystemRef>,
55 glob: Option<String>,
56 open_options_fn: Arc<dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync>,
57}
58
59impl MultiFileDataSource {
60 pub fn new(session: VortexSession) -> Self {
62 Self {
63 session,
64 fs: None,
65 glob: None,
66 open_options_fn: Arc::new(|opts| opts),
67 }
68 }
69
70 pub fn with_glob(mut self, glob: impl Into<String>) -> Self {
74 self.glob = Some(glob.into().trim_start_matches("/").to_string());
75 self
76 }
77
78 pub fn with_filesystem(mut self, fs: FileSystemRef) -> Self {
83 self.fs = Some(fs);
84 self
85 }
86
87 pub fn with_open_options(
91 mut self,
92 f: impl Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync + 'static,
93 ) -> Self {
94 self.open_options_fn = Arc::new(f);
95 self
96 }
97
98 pub async fn build(mut self) -> VortexResult<impl DataSource> {
103 let glob = self
104 .glob
105 .take()
106 .ok_or_else(|| vortex_err!("MultiFileDataSource requires a glob URL"))?;
107
108 let fs = match self.fs {
109 Some(fs) => fs,
110 None => create_local_filesystem(&self.session)?,
111 };
112 let files: Vec<FileListing> = fs.glob(&glob)?.try_collect().await?;
113
114 if files.is_empty() {
115 vortex_bail!("No files matched the glob pattern '{}'", glob);
116 }
117
118 let file_count = files.len();
119 debug!(file_count, glob = %glob, "discovered files");
120
121 let first_file =
123 open_file(&fs, &files[0], &self.session, self.open_options_fn.as_ref()).await?;
124 let first_reader = layout_reader_with_stats(&first_file)?;
125
126 let factories: Vec<Arc<dyn LayoutReaderFactory>> = files[1..]
127 .iter()
128 .map(|f| {
129 Arc::new(VortexFileReaderFactory {
130 fs: fs.clone(),
131 file: f.clone(),
132 session: self.session.clone(),
133 open_options_fn: self.open_options_fn.clone(),
134 }) as Arc<dyn LayoutReaderFactory>
135 })
136 .collect();
137
138 let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session);
139
140 debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource");
141
142 Ok(inner)
143 }
144}
145
/// Creates a filesystem backed by `object_store`'s `LocalFileSystem`, using the runtime
/// handle taken from `session`.
///
/// Only compiled when the `object_store` feature is enabled.
#[cfg(feature = "object_store")]
fn create_local_filesystem(session: &VortexSession) -> VortexResult<FileSystemRef> {
    use vortex_io::object_store::ObjectStoreFileSystem;
    use vortex_io::session::RuntimeSessionExt;

    let local_store = Arc::new(object_store::local::LocalFileSystem::default());
    let wrapped = ObjectStoreFileSystem::new(local_store, session.handle());
    let fs: FileSystemRef = Arc::new(wrapped);
    Ok(fs)
}
159
160#[cfg(not(feature = "object_store"))]
161fn create_local_filesystem(_session: &VortexSession) -> VortexResult<FileSystemRef> {
162 vortex_bail!(
163 "The 'object_store' feature is required for automatic local filesystem creation. \
164 Either enable the feature or provide a filesystem via .with_filesystem()."
165 );
166}
167
168async fn open_file(
170 fs: &FileSystemRef,
171 file: &FileListing,
172 session: &VortexSession,
173 open_options_fn: &(dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync),
174) -> VortexResult<crate::VortexFile> {
175 debug!(path = %file.path, "opening vortex file");
176
177 let source = fs.open_read(&file.path).await?;
181 let cache_key = source
182 .uri()
183 .map(|u| u.to_string())
184 .unwrap_or_else(|| file.path.clone());
185
186 let options = {
189 let mut options = open_options_fn(session.open_options());
190 if let Some(size) = file.size {
191 options = options.with_file_size(size);
192 }
193 if let Some(footer) = session.multi_file().get_footer(&cache_key) {
194 options = options.with_footer(footer);
195 }
196 options
197 };
198
199 let vortex_file = options.open(source).await?;
200
201 session
203 .multi_file()
204 .put_footer(&cache_key, vortex_file.footer().clone());
205 Ok(vortex_file)
206}
207
208fn layout_reader_with_stats(file: &crate::VortexFile) -> VortexResult<LayoutReaderRef> {
211 let mut reader = file.layout_reader()?;
212 if let Some(stats) = file.file_stats().cloned() {
213 reader = Arc::new(FileStatsLayoutReader::new(
214 reader,
215 stats,
216 file.session.clone(),
217 ));
218 }
219 Ok(reader)
220}
221
222struct VortexFileReaderFactory {
224 fs: FileSystemRef,
225 file: FileListing,
226 session: VortexSession,
227 open_options_fn: Arc<dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync>,
228}
229
230#[async_trait]
231impl LayoutReaderFactory for VortexFileReaderFactory {
232 async fn open(&self) -> VortexResult<Option<LayoutReaderRef>> {
233 let file = open_file(
234 &self.fs,
235 &self.file,
236 &self.session,
237 self.open_options_fn.as_ref(),
238 )
239 .await?;
240 Ok(Some(layout_reader_with_stats(&file)?))
241 }
242}