1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
34#[derive(Debug, Clone, Eq, PartialEq, Hash)]
37pub struct ListingTableUrl {
38 url: Url,
40 prefix: Path,
42 glob: Option<Pattern>,
44}
45
46impl ListingTableUrl {
47 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103 let s = s.as_ref();
104
105 #[cfg(not(target_arch = "wasm32"))]
107 if std::path::Path::new(s).is_absolute() {
108 return Self::parse_path(s);
109 }
110
111 match Url::parse(s) {
112 Ok(url) => Self::try_new(url, None),
113 #[cfg(not(target_arch = "wasm32"))]
114 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115 Err(e) => Err(DataFusionError::External(Box::new(e))),
116 }
117 }
118
119 #[cfg(not(target_arch = "wasm32"))]
121 fn parse_path(s: &str) -> Result<Self> {
122 let (path, glob) = match split_glob_expression(s) {
123 Some((prefix, glob)) => {
124 let glob = Pattern::new(glob)
125 .map_err(|e| DataFusionError::External(Box::new(e)))?;
126 (prefix, Some(glob))
127 }
128 None => (s, None),
129 };
130
131 let url = url_from_filesystem_path(path).ok_or_else(|| {
132 DataFusionError::External(
133 format!("Failed to convert path to URL: {path}").into(),
134 )
135 })?;
136
137 Self::try_new(url, glob)
138 }
139
140 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147 let prefix = Path::from_url_path(url.path())?;
148 Ok(Self { url, prefix, glob })
149 }
150
151 pub fn scheme(&self) -> &str {
153 self.url.scheme()
154 }
155
156 pub fn prefix(&self) -> &Path {
161 &self.prefix
162 }
163
164 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166 let Some(all_segments) = self.strip_prefix(path) else {
167 return false;
168 };
169
170 let mut segments = all_segments.filter(|s| !s.contains('='));
173
174 match &self.glob {
175 Some(glob) => {
176 if ignore_subdirectory {
177 segments
178 .next()
179 .is_some_and(|file_name| glob.matches(file_name))
180 } else {
181 let stripped = segments.join(DELIMITER);
182 glob.matches(&stripped)
183 }
184 }
185 None if ignore_subdirectory => segments.count() <= 1,
189 None => true,
191 }
192 }
193
194 pub fn is_collection(&self) -> bool {
196 self.url.path().ends_with(DELIMITER)
197 }
198
199 pub fn file_extension(&self) -> Option<&str> {
212 if let Some(mut segments) = self.url.path_segments() {
213 if let Some(last_segment) = segments.next_back() {
214 if last_segment.contains(".") && !last_segment.ends_with(".") {
215 return last_segment.split('.').next_back();
216 }
217 }
218 }
219
220 None
221 }
222
223 pub fn strip_prefix<'a, 'b: 'a>(
226 &'a self,
227 path: &'b Path,
228 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231 stripped = stripped.strip_prefix(DELIMITER)?;
232 }
233 Some(stripped.split_terminator(DELIMITER))
234 }
235
236 pub async fn list_all_files<'a>(
238 &'a self,
239 ctx: &'a dyn Session,
240 store: &'a dyn ObjectStore,
241 file_extension: &'a str,
242 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243 let exec_options = &ctx.config_options().execution;
244 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245
246 let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
247 list_with_cache(ctx, store, &self.prefix).await?
248 } else {
249 match store.head(&self.prefix).await {
250 Ok(meta) => futures::stream::once(async { Ok(meta) })
251 .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
252 .boxed(),
253 Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
256 }
257 };
258
259 Ok(list
260 .try_filter(move |meta| {
261 let path = &meta.location;
262 let extension_match = path.as_ref().ends_with(file_extension);
263 let glob_match = self.contains(path, ignore_subdirectory);
264 futures::future::ready(extension_match && glob_match)
265 })
266 .boxed())
267 }
268
269 pub fn as_str(&self) -> &str {
271 self.as_ref()
272 }
273
274 pub fn object_store(&self) -> ObjectStoreUrl {
276 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
277 ObjectStoreUrl::parse(url).unwrap()
278 }
279
280 pub fn is_folder(&self) -> bool {
282 self.url.scheme() == "file" && self.is_collection()
283 }
284
285 pub fn get_url(&self) -> &Url {
287 &self.url
288 }
289
290 pub fn get_glob(&self) -> &Option<Pattern> {
292 &self.glob
293 }
294
295 pub fn with_glob(self, glob: &str) -> Result<Self> {
297 let glob =
298 Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
299 Self::try_new(self.url, Some(glob))
300 }
301}
302
303async fn list_with_cache<'b>(
304 ctx: &'b dyn Session,
305 store: &'b dyn ObjectStore,
306 prefix: &'b Path,
307) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
308 match ctx.runtime_env().cache_manager.get_list_files_cache() {
309 None => Ok(store
310 .list(Some(prefix))
311 .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
312 .boxed()),
313 Some(cache) => {
314 let vec = if let Some(res) = cache.get(prefix) {
315 debug!("Hit list all files cache");
316 res.as_ref().clone()
317 } else {
318 let vec = store
319 .list(Some(prefix))
320 .try_collect::<Vec<ObjectMeta>>()
321 .await?;
322 cache.put(prefix, Arc::new(vec.clone()));
323 vec
324 };
325 Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
326 }
327 }
328}
329
330#[cfg(not(target_arch = "wasm32"))]
332fn url_from_filesystem_path(s: &str) -> Option<Url> {
333 let path = std::path::Path::new(s);
334 let is_dir = match path.exists() {
335 true => path.is_dir(),
336 false => std::path::is_separator(s.chars().last()?),
338 };
339
340 let from_absolute_path = |p| {
341 let first = match is_dir {
342 true => Url::from_directory_path(p).ok(),
343 false => Url::from_file_path(p).ok(),
344 }?;
345
346 Url::parse(first.as_str()).ok()
349 };
350
351 if path.is_absolute() {
352 return from_absolute_path(path);
353 }
354
355 let absolute = std::env::current_dir().ok()?.join(path);
356 from_absolute_path(&absolute)
357}
358
359impl AsRef<str> for ListingTableUrl {
360 fn as_ref(&self) -> &str {
361 self.url.as_ref()
362 }
363}
364
365impl AsRef<Url> for ListingTableUrl {
366 fn as_ref(&self) -> &Url {
367 &self.url
368 }
369}
370
371impl std::fmt::Display for ListingTableUrl {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 self.as_str().fmt(f)
374 }
375}
376
/// Characters that start a glob expression.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` into `(prefix, glob)` at the last path separator that
/// precedes the first glob character (`?`, `*` or `[`).
///
/// Returns `None` when `path` contains no glob character. The prefix keeps its
/// trailing separator. When no separator precedes the first glob character,
/// the prefix is `"."` and the whole of `path` is returned as the glob.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;

    // Byte offset just past the final separator before the glob, or 0 if none.
    let split_at = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map_or(0, |(idx, sep)| idx + sep.len_utf8());

    if split_at == 0 {
        Some((".", path))
    } else {
        Some(path.split_at(split_at))
    }
}
404
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::config::TableOptions;
    use datafusion_common::DFSchema;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_execution::TaskContext;
    use datafusion_expr::execution_props::ExecutionProps;
    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;
    use object_store::PutPayload;
    use std::any::Any;
    use std::collections::HashMap;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        // A prefix must match whole segments: `foo` is not a prefix of `foob`.
        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        // Percent-encoded characters in URLs are decoded into the prefix.
        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        // A literal `%2F` in a filesystem path is preserved, not decoded.
        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        // `.` and `..` segments are resolved.
        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        // Relative paths resolve against the current working directory.
        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        // no glob characters
        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        // glob characters: split at the last separator before the glob
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));

        // a doubled separator is kept as part of the prefix
        test(
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        test(
            "file:///some/path/",
            None,
            "file path ends with / - not a file",
        );
        test(
            "file:///some/path/file",
            None,
            "file path does not end with / - no extension",
        );
        test(
            "file:///some/path/file.",
            None,
            "file path ends with . - no value after .",
        );
        test(
            "file:///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        );
    }

    #[tokio::test]
    async fn test_list_files() {
        let store = object_store::memory::InMemory::new();
        // One file at the root and three files under `t/`.
        create_file(&store, "a.parquet").await;
        create_file(&store, "/t/b.parquet").await;
        create_file(&store, "/t/c.csv").await;
        create_file(&store, "/t/d.csv").await;

        // With the default configuration, files in subdirectories are not
        // returned when listing the root.
        assert_eq!(
            list_all_files("/", &store, "parquet").await,
            vec!["a.parquet"],
        );

        // A directory lists the same with or without a trailing slash.
        assert_eq!(
            list_all_files("/t/", &store, "parquet").await,
            vec!["t/b.parquet"],
        );
        assert_eq!(
            list_all_files("/t", &store, "parquet").await,
            vec!["t/b.parquet"],
        );

        // Only files with the requested extension are returned.
        assert_eq!(
            list_all_files("/t", &store, "csv").await,
            vec!["t/c.csv", "t/d.csv"],
        );
        assert_eq!(
            list_all_files("/t/", &store, "csv").await,
            vec!["t/c.csv", "t/d.csv"],
        );

        // A non-existent location yields an empty list rather than an error.
        assert_eq!(
            list_all_files("/NonExisting", &store, "csv").await,
            vec![] as Vec<String>
        );
        assert_eq!(
            list_all_files("/NonExisting/", &store, "csv").await,
            vec![] as Vec<String>
        );
    }

    /// Creates a file containing `hello world` at `path` in `object_store`.
    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
        object_store
            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
            .await
            .expect("failed to create test file");
    }

    /// Runs [`ListingTableUrl::list_all_files`] for `url` and returns the
    /// matching locations as strings, panicking on error.
    async fn list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Vec<String> {
        try_list_all_files(url, store, file_extension)
            .await
            .unwrap()
    }

    /// Fallible variant of [`list_all_files`].
    async fn try_list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        let session = MockSession::new();
        let url = ListingTableUrl::parse(url)?;
        let files = url
            .list_all_files(&session, store, file_extension)
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|meta| meta.location.as_ref().to_string())
            .collect();
        Ok(files)
    }

    /// Minimal [`Session`] providing only a default config and runtime
    /// environment; every other method is unimplemented.
    struct MockSession {
        config: SessionConfig,
        runtime_env: Arc<RuntimeEnv>,
    }

    impl MockSession {
        fn new() -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env: Arc::new(RuntimeEnv::default()),
            }
        }
    }

    #[async_trait::async_trait]
    impl Session for MockSession {
        fn session_id(&self) -> &str {
            unimplemented!()
        }

        fn config(&self) -> &SessionConfig {
            &self.config
        }

        async fn create_physical_plan(
            &self,
            _logical_plan: &LogicalPlan,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn create_physical_expr(
            &self,
            _expr: Expr,
            _df_schema: &DFSchema,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            unimplemented!()
        }

        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
            unimplemented!()
        }

        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
            unimplemented!()
        }

        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
            unimplemented!()
        }

        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
            &self.runtime_env
        }

        fn execution_props(&self) -> &ExecutionProps {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }

        fn table_options(&self) -> &TableOptions {
            unimplemented!()
        }

        fn table_options_mut(&mut self) -> &mut TableOptions {
            unimplemented!()
        }

        fn task_ctx(&self) -> Arc<TaskContext> {
            unimplemented!()
        }
    }
}