1use datafusion_catalog::Session;
19use datafusion_common::{DataFusionError, Result};
20use datafusion_execution::object_store::ObjectStoreUrl;
21use futures::stream::BoxStream;
22use futures::{StreamExt, TryStreamExt};
23use glob::Pattern;
24use itertools::Itertools;
25use log::debug;
26use object_store::path::Path;
27use object_store::path::DELIMITER;
28use object_store::{ObjectMeta, ObjectStore};
29use std::sync::Arc;
30use url::Url;
31
32#[derive(Debug, Clone, Eq, PartialEq, Hash)]
35pub struct ListingTableUrl {
36 url: Url,
38 prefix: Path,
40 glob: Option<Pattern>,
42}
43
44impl ListingTableUrl {
45 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
101 let s = s.as_ref();
102
103 #[cfg(not(target_arch = "wasm32"))]
105 if std::path::Path::new(s).is_absolute() {
106 return Self::parse_path(s);
107 }
108
109 match Url::parse(s) {
110 Ok(url) => Self::try_new(url, None),
111 #[cfg(not(target_arch = "wasm32"))]
112 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
113 Err(e) => Err(DataFusionError::External(Box::new(e))),
114 }
115 }
116
117 #[cfg(not(target_arch = "wasm32"))]
119 fn parse_path(s: &str) -> Result<Self> {
120 let (path, glob) = match split_glob_expression(s) {
121 Some((prefix, glob)) => {
122 let glob = Pattern::new(glob)
123 .map_err(|e| DataFusionError::External(Box::new(e)))?;
124 (prefix, Some(glob))
125 }
126 None => (s, None),
127 };
128
129 let url = url_from_filesystem_path(path).ok_or_else(|| {
130 DataFusionError::External(
131 format!("Failed to convert path to URL: {path}").into(),
132 )
133 })?;
134
135 Self::try_new(url, glob)
136 }
137
138 fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
140 let prefix = Path::from_url_path(url.path())?;
141 Ok(Self { url, prefix, glob })
142 }
143
144 pub fn scheme(&self) -> &str {
146 self.url.scheme()
147 }
148
149 pub fn prefix(&self) -> &Path {
154 &self.prefix
155 }
156
157 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
159 let Some(all_segments) = self.strip_prefix(path) else {
160 return false;
161 };
162
163 let mut segments = all_segments.filter(|s| !s.contains('='));
166
167 match &self.glob {
168 Some(glob) => {
169 if ignore_subdirectory {
170 segments
171 .next()
172 .is_some_and(|file_name| glob.matches(file_name))
173 } else {
174 let stripped = segments.join(DELIMITER);
175 glob.matches(&stripped)
176 }
177 }
178 None if ignore_subdirectory => segments.count() <= 1,
182 None => true,
184 }
185 }
186
187 pub fn is_collection(&self) -> bool {
189 self.url.path().ends_with(DELIMITER)
190 }
191
192 pub fn file_extension(&self) -> Option<&str> {
205 if let Some(segments) = self.url.path_segments() {
206 if let Some(last_segment) = segments.last() {
207 if last_segment.contains(".") && !last_segment.ends_with(".") {
208 return last_segment.split('.').last();
209 }
210 }
211 }
212
213 None
214 }
215
216 pub fn strip_prefix<'a, 'b: 'a>(
219 &'a self,
220 path: &'b Path,
221 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
222 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
223 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
224 stripped = stripped.strip_prefix(DELIMITER)?;
225 }
226 Some(stripped.split_terminator(DELIMITER))
227 }
228
229 pub async fn list_all_files<'a>(
231 &'a self,
232 ctx: &'a dyn Session,
233 store: &'a dyn ObjectStore,
234 file_extension: &'a str,
235 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
236 let exec_options = &ctx.config_options().execution;
237 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
238 let list = match self.is_collection() {
240 true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
241 None => store.list(Some(&self.prefix)),
242 Some(cache) => {
243 if let Some(res) = cache.get(&self.prefix) {
244 debug!("Hit list all files cache");
245 futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
246 .boxed()
247 } else {
248 let list_res = store.list(Some(&self.prefix));
249 let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
250 cache.put(&self.prefix, Arc::new(vec.clone()));
251 futures::stream::iter(vec.into_iter().map(Ok)).boxed()
252 }
253 }
254 },
255 false => futures::stream::once(store.head(&self.prefix)).boxed(),
256 };
257 Ok(list
258 .try_filter(move |meta| {
259 let path = &meta.location;
260 let extension_match = path.as_ref().ends_with(file_extension);
261 let glob_match = self.contains(path, ignore_subdirectory);
262 futures::future::ready(extension_match && glob_match)
263 })
264 .map_err(DataFusionError::ObjectStore)
265 .boxed())
266 }
267
268 pub fn as_str(&self) -> &str {
270 self.as_ref()
271 }
272
273 pub fn object_store(&self) -> ObjectStoreUrl {
275 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
276 ObjectStoreUrl::parse(url).unwrap()
277 }
278}
279
280#[cfg(not(target_arch = "wasm32"))]
282fn url_from_filesystem_path(s: &str) -> Option<Url> {
283 let path = std::path::Path::new(s);
284 let is_dir = match path.exists() {
285 true => path.is_dir(),
286 false => std::path::is_separator(s.chars().last()?),
288 };
289
290 let from_absolute_path = |p| {
291 let first = match is_dir {
292 true => Url::from_directory_path(p).ok(),
293 false => Url::from_file_path(p).ok(),
294 }?;
295
296 Url::parse(first.as_str()).ok()
299 };
300
301 if path.is_absolute() {
302 return from_absolute_path(path);
303 }
304
305 let absolute = std::env::current_dir().ok()?.join(path);
306 from_absolute_path(&absolute)
307}
308
309impl AsRef<str> for ListingTableUrl {
310 fn as_ref(&self) -> &str {
311 self.url.as_ref()
312 }
313}
314
315impl AsRef<Url> for ListingTableUrl {
316 fn as_ref(&self) -> &Url {
317 &self.url
318 }
319}
320
321impl std::fmt::Display for ListingTableUrl {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 self.as_str().fmt(f)
324 }
325}
326
/// Characters that start a glob expression inside a filesystem path.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['*', '?', '['];

/// Splits `path` into a prefix and a glob expression.
///
/// The glob starts at the path component that contains the first character from
/// [`GLOB_START_CHARS`]. The prefix is everything before that component, including
/// its trailing separator. If no separator precedes the first glob character, the
/// prefix is `"."` and the whole `path` is the glob. Returns `None` when `path`
/// contains no glob character.
///
/// For example:
/// * `"/a/b/*.txt"` becomes `("/a/b/", "*.txt")`;
/// * `"*.txt"` becomes `(".", "*.txt")`.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;

    // Byte offset just past the last separator that precedes the glob, or 0 if none.
    let prefix_len = path[..glob_start]
        .rmatch_indices(std::path::is_separator)
        .next()
        .map_or(0, |(idx, separator)| idx + separator.len());

    match prefix_len {
        0 => Some((".", path)),
        len => Some(path.split_at(len)),
    }
}
354
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    #[test]
    fn test_contains_without_glob() {
        let url = ListingTableUrl::parse("file:///foo/").unwrap();

        // Files directly under the prefix are always contained.
        let direct = Path::parse("foo/bar.txt").unwrap();
        assert!(url.contains(&direct, true));
        assert!(url.contains(&direct, false));

        // Files in plain subdirectories are only contained when not ignoring them...
        let nested = Path::parse("foo/sub/bar.txt").unwrap();
        assert!(!url.contains(&nested, true));
        assert!(url.contains(&nested, false));

        // ...but `key=value` partition directories are always allowed.
        let partitioned = Path::parse("foo/year=2024/bar.txt").unwrap();
        assert!(url.contains(&partitioned, true));
        assert!(url.contains(&partitioned, false));

        // Paths outside the prefix are never contained.
        let outside = Path::parse("other/bar.txt").unwrap();
        assert!(!url.contains(&outside, true));
        assert!(!url.contains(&outside, false));
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
        test("/a/😺/b*.txt", Some(("/a/😺/", "b*.txt")));

        test(
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        test(
            "file:///some/path/",
            None,
            "file URL ends with / - not a file",
        );
        test(
            "file:///some/path/file",
            None,
            "file URL does not end with an extension - no extension",
        );
        test(
            "file:///some/path/file.",
            None,
            "file URL ends with . - no value after .",
        );
        test(
            "file:///some/path/file.ext",
            Some("ext"),
            "file URL ends with .ext - extension is ext",
        );
        test(
            "some/relative/file.ext",
            Some("ext"),
            "relative filesystem path ends with .ext - extension is ext",
        );
    }
}
571}