1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result, TableReference};
21use datafusion_execution::cache::TableScopedPath;
22use datafusion_execution::object_store::ObjectStoreUrl;
23use datafusion_session::Session;
24
25use futures::stream::BoxStream;
26use futures::{StreamExt, TryStreamExt};
27use glob::Pattern;
28use itertools::Itertools;
29use log::debug;
30use object_store::path::DELIMITER;
31use object_store::path::Path;
32use object_store::{ObjectMeta, ObjectStore};
33use url::Url;
34
/// A URL identifying the location of a listing table: a single file, a
/// directory (collection), or a local path with a trailing glob expression.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ListingTableUrl {
    /// The parsed URL (local paths are converted to `file://` URLs).
    url: Url,
    /// The object store path derived from `url`'s path by `try_new`.
    prefix: Path,
    /// Optional glob applied to paths below `prefix` when filtering listings.
    glob: Option<Pattern>,

    /// Optional table reference, used to scope list-files cache entries.
    table_ref: Option<TableReference>,
}
48
49impl ListingTableUrl {
50 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
106 let s = s.as_ref();
107
108 #[cfg(not(target_arch = "wasm32"))]
110 if std::path::Path::new(s).is_absolute() {
111 return Self::parse_path(s);
112 }
113
114 match Url::parse(s) {
115 Ok(url) => Self::try_new(url, None),
116 #[cfg(not(target_arch = "wasm32"))]
117 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
118 Err(e) => Err(DataFusionError::External(Box::new(e))),
119 }
120 }
121
122 #[cfg(not(target_arch = "wasm32"))]
124 fn parse_path(s: &str) -> Result<Self> {
125 let (path, glob) = match split_glob_expression(s) {
126 Some((prefix, glob)) => {
127 let glob = Pattern::new(glob)
128 .map_err(|e| DataFusionError::External(Box::new(e)))?;
129 (prefix, Some(glob))
130 }
131 None => (s, None),
132 };
133
134 let url = url_from_filesystem_path(path).ok_or_else(|| {
135 DataFusionError::External(
136 format!("Failed to convert path to URL: {path}").into(),
137 )
138 })?;
139
140 Self::try_new(url, glob)
141 }
142
143 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
150 let prefix = Path::from_url_path(url.path())?;
151 Ok(Self {
152 url,
153 prefix,
154 glob,
155 table_ref: None,
156 })
157 }
158
159 pub fn scheme(&self) -> &str {
161 self.url.scheme()
162 }
163
164 pub fn prefix(&self) -> &Path {
169 &self.prefix
170 }
171
172 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
174 let Some(all_segments) = self.strip_prefix(path) else {
175 return false;
176 };
177
178 let mut segments = all_segments.filter(|s| !s.contains('='));
181
182 match &self.glob {
183 Some(glob) => {
184 if ignore_subdirectory {
185 segments
186 .next()
187 .is_some_and(|file_name| glob.matches(file_name))
188 } else {
189 let stripped = segments.join(DELIMITER);
190 glob.matches(&stripped)
191 }
192 }
193 None if ignore_subdirectory => segments.count() <= 1,
197 None => true,
199 }
200 }
201
202 pub fn is_collection(&self) -> bool {
204 self.url.path().ends_with(DELIMITER)
205 }
206
207 pub fn file_extension(&self) -> Option<&str> {
220 if let Some(mut segments) = self.url.path_segments()
221 && let Some(last_segment) = segments.next_back()
222 && last_segment.contains(".")
223 && !last_segment.ends_with(".")
224 {
225 return last_segment.split('.').next_back();
226 }
227
228 None
229 }
230
231 pub fn strip_prefix<'a, 'b: 'a>(
234 &'a self,
235 path: &'b Path,
236 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
237 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
238 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
239 stripped = stripped.strip_prefix(DELIMITER)?;
240 }
241 Some(stripped.split_terminator(DELIMITER))
242 }
243
244 pub async fn list_prefixed_files<'a>(
247 &'a self,
248 ctx: &'a dyn Session,
249 store: &'a dyn ObjectStore,
250 prefix: Option<Path>,
251 file_extension: &'a str,
252 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
253 let exec_options = &ctx.config_options().execution;
254 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
255
256 let full_prefix = if let Some(ref p) = prefix {
258 let mut parts = self.prefix.parts().collect::<Vec<_>>();
259 parts.extend(p.parts());
260 Path::from_iter(parts.into_iter())
261 } else {
262 self.prefix.clone()
263 };
264
265 let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
266 list_with_cache(
267 ctx,
268 store,
269 self.table_ref.as_ref(),
270 &self.prefix,
271 prefix.as_ref(),
272 )
273 .await?
274 } else {
275 match store.head(&full_prefix).await {
276 Ok(meta) => futures::stream::once(async { Ok(meta) })
277 .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
278 .boxed(),
279 Err(object_store::Error::NotFound { .. }) => {
282 list_with_cache(
283 ctx,
284 store,
285 self.table_ref.as_ref(),
286 &self.prefix,
287 prefix.as_ref(),
288 )
289 .await?
290 }
291 Err(e) => return Err(e.into()),
292 }
293 };
294
295 Ok(list
296 .try_filter(move |meta| {
297 let path = &meta.location;
298 let extension_match = path.as_ref().ends_with(file_extension);
299 let glob_match = self.contains(path, ignore_subdirectory);
300 futures::future::ready(extension_match && glob_match)
301 })
302 .boxed())
303 }
304
305 pub async fn list_all_files<'a>(
307 &'a self,
308 ctx: &'a dyn Session,
309 store: &'a dyn ObjectStore,
310 file_extension: &'a str,
311 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
312 self.list_prefixed_files(ctx, store, None, file_extension)
313 .await
314 }
315
316 pub fn as_str(&self) -> &str {
318 self.as_ref()
319 }
320
321 pub fn object_store(&self) -> ObjectStoreUrl {
323 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
324 ObjectStoreUrl::parse(url).unwrap()
325 }
326
327 pub fn is_folder(&self) -> bool {
329 self.url.scheme() == "file" && self.is_collection()
330 }
331
332 pub fn get_url(&self) -> &Url {
334 &self.url
335 }
336
337 pub fn get_glob(&self) -> &Option<Pattern> {
339 &self.glob
340 }
341
342 pub fn with_glob(self, glob: &str) -> Result<Self> {
344 let glob =
345 Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
346 Self::try_new(self.url, Some(glob))
347 }
348
349 pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
350 self.table_ref = Some(table_ref);
351 self
352 }
353
354 pub fn get_table_ref(&self) -> &Option<TableReference> {
355 &self.table_ref
356 }
357}
358
359async fn list_with_cache<'b>(
377 ctx: &'b dyn Session,
378 store: &'b dyn ObjectStore,
379 table_ref: Option<&TableReference>,
380 table_base_path: &Path,
381 prefix: Option<&Path>,
382) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
383 let full_prefix = match prefix {
385 Some(p) => {
386 let mut parts: Vec<_> = table_base_path.parts().collect();
387 parts.extend(p.parts());
388 Path::from_iter(parts)
389 }
390 None => table_base_path.clone(),
391 };
392
393 match ctx.runtime_env().cache_manager.get_list_files_cache() {
394 None => Ok(store
395 .list(Some(&full_prefix))
396 .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
397 .boxed()),
398 Some(cache) => {
399 let prefix_filter = prefix.cloned();
401
402 let table_scoped_base_path = TableScopedPath {
403 table: table_ref.cloned(),
404 path: table_base_path.clone(),
405 };
406
407 let vec = if let Some(res) =
409 cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
410 {
411 debug!("Hit list files cache");
412 res.as_ref().clone()
413 } else {
414 let vec = store
417 .list(Some(table_base_path))
418 .try_collect::<Vec<ObjectMeta>>()
419 .await?;
420 cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
421
422 if prefix.is_some() {
424 let full_prefix_str = full_prefix.as_ref();
425 vec.into_iter()
426 .filter(|meta| {
427 meta.location.as_ref().starts_with(full_prefix_str)
428 })
429 .collect()
430 } else {
431 vec
432 }
433 };
434 Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
435 }
436 }
437}
438
439#[cfg(not(target_arch = "wasm32"))]
441fn url_from_filesystem_path(s: &str) -> Option<Url> {
442 let path = std::path::Path::new(s);
443 let is_dir = match path.exists() {
444 true => path.is_dir(),
445 false => std::path::is_separator(s.chars().last()?),
447 };
448
449 let from_absolute_path = |p| {
450 let first = match is_dir {
451 true => Url::from_directory_path(p).ok(),
452 false => Url::from_file_path(p).ok(),
453 }?;
454
455 Url::parse(first.as_str()).ok()
458 };
459
460 if path.is_absolute() {
461 return from_absolute_path(path);
462 }
463
464 let absolute = std::env::current_dir().ok()?.join(path);
465 from_absolute_path(&absolute)
466}
467
468impl AsRef<str> for ListingTableUrl {
469 fn as_ref(&self) -> &str {
470 self.url.as_ref()
471 }
472}
473
474impl AsRef<Url> for ListingTableUrl {
475 fn as_ref(&self) -> &Url {
476 &self.url
477 }
478}
479
480impl std::fmt::Display for ListingTableUrl {
481 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482 self.as_str().fmt(f)
483 }
484}
485
/// Characters that start a glob expression.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` into `(prefix, glob)` at the last path separator that
/// precedes the first glob character.
///
/// Returns `None` if `path` contains no glob character. If no separator
/// precedes the glob, the prefix is `"."` and the glob is the whole `path`.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let (glob_start, _) = path
        .char_indices()
        .find(|&(_, c)| GLOB_START_CHARS.contains(&c))?;

    // Byte offset just past the last separator located before the glob.
    let split_idx = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map_or(0, |(idx, c)| idx + c.len_utf8());

    match split_idx {
        0 => Some((".", path)),
        idx => Some(path.split_at(idx)),
    }
}
512
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::DFSchema;
    use datafusion_common::config::TableOptions;
    use datafusion_execution::TaskContext;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_expr::execution_props::ExecutionProps;
    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;
    use object_store::{
        GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
        PutPayload,
    };
    use std::any::Any;
    use std::collections::HashMap;
    use std::ops::Range;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    /// `contains` skips `key=value` partition segments and, when
    /// `ignore_subdirectory` is set, only accepts files directly below the prefix.
    #[test]
    fn test_contains() {
        let url = ListingTableUrl::parse("s3://bucket/data/").unwrap();

        assert!(url.contains(&Path::from("data/file.parquet"), true));
        assert!(url.contains(&Path::from("data/a=1/file.parquet"), true));
        assert!(!url.contains(&Path::from("data/sub/file.parquet"), true));
        assert!(url.contains(&Path::from("data/sub/file.parquet"), false));
        assert!(!url.contains(&Path::from("other/file.parquet"), false));
        assert!(!url.contains(&Path::from("database/file.parquet"), false));
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));

        test(
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        test(
            "file///some/path/",
            None,
            "file path ends with / - not a file",
        );
        test(
            "file///some/path/file",
            None,
            "file path does not end with - no extension",
        );
        test(
            "file///some/path/file.",
            None,
            "file path ends with . - no value after .",
        );
        test(
            "file///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        );
    }

    #[tokio::test]
    async fn test_list_files() -> Result<()> {
        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec!["forbidden/e.parquet".into()],
        };

        create_file(&store, "a.parquet").await;
        create_file(&store, "/t/b.parquet").await;
        create_file(&store, "/t/c.csv").await;
        create_file(&store, "/t/d.csv").await;

        create_file(&store, "/forbidden/e.parquet").await;

        assert_eq!(
            list_all_files("/", &store, "parquet").await?,
            vec!["a.parquet"],
        );

        assert_eq!(
            list_all_files("/t/", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );
        assert_eq!(
            list_all_files("/t", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );

        assert_eq!(
            list_all_files("/t", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );
        assert_eq!(
            list_all_files("/t/", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );

        assert_eq!(
            list_all_files("/NonExisting", &store, "csv").await?,
            vec![] as Vec<String>
        );
        assert_eq!(
            list_all_files("/NonExisting/", &store, "csv").await?,
            vec![] as Vec<String>
        );

        let Err(DataFusionError::ObjectStore(err)) =
            list_all_files("/forbidden/e.parquet", &store, "parquet").await
        else {
            panic!("Expected ObjectStore error");
        };

        let object_store::Error::PermissionDenied { .. } = &*err else {
            panic!("Expected PermissionDenied error");
        };

        create_file(&store, "/data/a=1/file1.parquet").await;
        create_file(&store, "/data/a=1/b=100/file2.parquet").await;
        create_file(&store, "/data/a=2/b=200/file3.parquet").await;
        create_file(&store, "/data/a=2/b=200/file4.csv").await;

        assert_eq!(
            list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
                .await?,
            vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
        );

        assert_eq!(
            list_prefixed_files(
                "/data/",
                &store,
                Some(Path::from("a=1/b=100")),
                "parquet"
            )
            .await?,
            vec!["data/a=1/b=100/file2.parquet"],
        );

        assert_eq!(
            list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
                .await?,
            vec!["data/a=2/b=200/file3.parquet"],
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_cache_path_equivalence() -> Result<()> {
        use datafusion_execution::runtime_env::RuntimeEnvBuilder;

        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec![],
        };

        create_file(&store, "/table/year=2023/data1.parquet").await;
        create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
        create_file(&store, "/table/year=2024/data3.parquet").await;
        create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
        create_file(&store, "/table/year=2024/month=12/data5.parquet").await;

        let session_no_cache = MockSession::new();

        let runtime_with_cache = RuntimeEnvBuilder::new()
            .with_object_list_cache_limit(1024 * 1024)
            .build_arc()?;
        let session_with_cache = MockSession::with_runtime_env(runtime_with_cache);

        let test_cases = vec![
            ("/table/", None, "full table listing"),
            (
                "/table/",
                Some(Path::from("year=2023")),
                "single partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2024")),
                "different partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2024/month=06")),
                "nested partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2025")),
                "non-existent partition",
            ),
        ];

        for (url_str, prefix, description) in test_cases {
            let url = ListingTableUrl::parse(url_str)?;

            let mut results_no_cache: Vec<String> = url
                .list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
                .await?
                .try_collect::<Vec<_>>()
                .await?
                .into_iter()
                .map(|m| m.location.to_string())
                .collect();
            results_no_cache.sort();

            let mut results_with_cache_miss: Vec<String> = url
                .list_prefixed_files(
                    &session_with_cache,
                    &store,
                    prefix.clone(),
                    "parquet",
                )
                .await?
                .try_collect::<Vec<_>>()
                .await?
                .into_iter()
                .map(|m| m.location.to_string())
                .collect();
            results_with_cache_miss.sort();

            let mut results_with_cache_hit: Vec<String> = url
                .list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
                .await?
                .try_collect::<Vec<_>>()
                .await?
                .into_iter()
                .map(|m| m.location.to_string())
                .collect();
            results_with_cache_hit.sort();

            assert_eq!(
                results_no_cache, results_with_cache_miss,
                "Cache miss path should match non-cached path for: {description}"
            );
            assert_eq!(
                results_no_cache, results_with_cache_hit,
                "Cache hit path should match non-cached path for: {description}"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
        use datafusion_execution::runtime_env::RuntimeEnvBuilder;

        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec![],
        };

        create_file(&store, "/sales/region=US/q1.parquet").await;
        create_file(&store, "/sales/region=US/q2.parquet").await;
        create_file(&store, "/sales/region=EU/q1.parquet").await;

        let runtime = RuntimeEnvBuilder::new()
            .with_object_list_cache_limit(1024 * 1024)
            .build_arc()?;
        let session = MockSession::with_runtime_env(runtime);

        let url = ListingTableUrl::parse("/sales/")?;

        let full_results: Vec<String> = url
            .list_prefixed_files(&session, &store, None, "parquet")
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        assert_eq!(full_results.len(), 3);

        let mut us_results: Vec<String> = url
            .list_prefixed_files(
                &session,
                &store,
                Some(Path::from("region=US")),
                "parquet",
            )
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        us_results.sort();

        assert_eq!(
            us_results,
            vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
        );

        let eu_results: Vec<String> = url
            .list_prefixed_files(
                &session,
                &store,
                Some(Path::from("region=EU")),
                "parquet",
            )
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();

        assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);

        Ok(())
    }

    /// Writes a small placeholder object at `path`.
    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
        object_store
            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
            .await
            .expect("failed to create test file");
    }

    /// Lists all files under `url` with the given extension (no cache).
    async fn list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        try_list_prefixed_files(url, store, None, file_extension).await
    }

    /// Lists files under `url` + `prefix` with the given extension (no cache).
    async fn list_prefixed_files(
        url: &str,
        store: &dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        try_list_prefixed_files(url, store, prefix, file_extension).await
    }

    /// Shared helper: parses `url`, lists via a cache-less `MockSession` and
    /// returns the object locations as strings.
    async fn try_list_prefixed_files(
        url: &str,
        store: &dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        let session = MockSession::new();
        let url = ListingTableUrl::parse(url)?;
        let files = url
            .list_prefixed_files(&session, store, prefix, file_extension)
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|meta| meta.location.as_ref().to_string())
            .collect();
        Ok(files)
    }

    /// In-memory store whose `head` fails with `PermissionDenied` for
    /// `forbidden_paths`; all other operations delegate to `in_mem`.
    #[derive(Debug)]
    struct MockObjectStore {
        in_mem: object_store::memory::InMemory,
        forbidden_paths: Vec<Path>,
    }

    impl std::fmt::Display for MockObjectStore {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.in_mem.fmt(f)
        }
    }

    #[async_trait]
    impl ObjectStore for MockObjectStore {
        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: object_store::PutOptions,
        ) -> object_store::Result<object_store::PutResult> {
            self.in_mem.put_opts(location, payload, opts).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.in_mem.put_multipart_opts(location, opts).await
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.in_mem.get_opts(location, options).await
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.in_mem.get_ranges(location, ranges).await
        }

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            if self.forbidden_paths.contains(location) {
                Err(object_store::Error::PermissionDenied {
                    path: location.to_string(),
                    source: "forbidden".into(),
                })
            } else {
                self.in_mem.head(location).await
            }
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.in_mem.delete(location).await
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.in_mem.list(prefix)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.in_mem.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.in_mem.copy(from, to).await
        }

        async fn copy_if_not_exists(
            &self,
            from: &Path,
            to: &Path,
        ) -> object_store::Result<()> {
            self.in_mem.copy_if_not_exists(from, to).await
        }
    }

    /// Minimal `Session` providing only a config and a runtime environment.
    struct MockSession {
        config: SessionConfig,
        runtime_env: Arc<RuntimeEnv>,
    }

    impl MockSession {
        fn new() -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env: Arc::new(RuntimeEnv::default()),
            }
        }

        fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env,
            }
        }
    }

    #[async_trait::async_trait]
    impl Session for MockSession {
        fn session_id(&self) -> &str {
            unimplemented!()
        }

        fn config(&self) -> &SessionConfig {
            &self.config
        }

        async fn create_physical_plan(
            &self,
            _logical_plan: &LogicalPlan,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn create_physical_expr(
            &self,
            _expr: Expr,
            _df_schema: &DFSchema,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            unimplemented!()
        }

        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
            unimplemented!()
        }

        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
            unimplemented!()
        }

        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
            unimplemented!()
        }

        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
            &self.runtime_env
        }

        fn execution_props(&self) -> &ExecutionProps {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }

        fn table_options(&self) -> &TableOptions {
            unimplemented!()
        }

        fn table_options_mut(&mut self) -> &mut TableOptions {
            unimplemented!()
        }

        fn task_ctx(&self) -> Arc<TaskContext> {
            unimplemented!()
        }
    }
}