1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result, TableReference};
21use datafusion_execution::cache::TableScopedPath;
22use datafusion_execution::cache::cache_manager::CachedFileList;
23use datafusion_execution::object_store::ObjectStoreUrl;
24use datafusion_session::Session;
25
26use futures::stream::BoxStream;
27use futures::{StreamExt, TryStreamExt};
28use glob::Pattern;
29use itertools::Itertools;
30use log::debug;
31use object_store::path::DELIMITER;
32use object_store::path::Path;
33use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
34use url::Url;
35
36#[derive(Debug, Clone, Eq, PartialEq, Hash)]
39pub struct ListingTableUrl {
40 url: Url,
42 prefix: Path,
44 glob: Option<Pattern>,
46 table_ref: Option<TableReference>,
48}
49
50impl ListingTableUrl {
51 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
107 let s = s.as_ref();
108
109 #[cfg(not(target_arch = "wasm32"))]
111 if std::path::Path::new(s).is_absolute() {
112 return Self::parse_path(s);
113 }
114
115 match Url::parse(s) {
116 Ok(url) => Self::try_new(url, None),
117 #[cfg(not(target_arch = "wasm32"))]
118 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
119 Err(e) => Err(DataFusionError::External(Box::new(e))),
120 }
121 }
122
123 #[cfg(not(target_arch = "wasm32"))]
125 fn parse_path(s: &str) -> Result<Self> {
126 let (path, glob) = match split_glob_expression(s) {
127 Some((prefix, glob)) => {
128 let glob = Pattern::new(glob)
129 .map_err(|e| DataFusionError::External(Box::new(e)))?;
130 (prefix, Some(glob))
131 }
132 None => (s, None),
133 };
134
135 let url = url_from_filesystem_path(path).ok_or_else(|| {
136 DataFusionError::External(
137 format!("Failed to convert path to URL: {path}").into(),
138 )
139 })?;
140
141 Self::try_new(url, glob)
142 }
143
144 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
151 let prefix = Path::from_url_path(url.path())?;
152 Ok(Self {
153 url,
154 prefix,
155 glob,
156 table_ref: None,
157 })
158 }
159
160 pub fn scheme(&self) -> &str {
162 self.url.scheme()
163 }
164
165 pub fn prefix(&self) -> &Path {
170 &self.prefix
171 }
172
173 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
175 let Some(all_segments) = self.strip_prefix(path) else {
176 return false;
177 };
178
179 let mut segments = all_segments.filter(|s| !s.contains('='));
182
183 match &self.glob {
184 Some(glob) => {
185 if ignore_subdirectory {
186 segments
187 .next()
188 .is_some_and(|file_name| glob.matches(file_name))
189 } else {
190 let stripped = segments.join(DELIMITER);
191 glob.matches(&stripped)
192 }
193 }
194 None if ignore_subdirectory => segments.count() <= 1,
198 None => true,
200 }
201 }
202
203 pub fn is_collection(&self) -> bool {
205 self.url.path().ends_with(DELIMITER)
206 }
207
208 pub fn file_extension(&self) -> Option<&str> {
221 if let Some(mut segments) = self.url.path_segments()
222 && let Some(last_segment) = segments.next_back()
223 && last_segment.contains(".")
224 && !last_segment.ends_with(".")
225 {
226 return last_segment.split('.').next_back();
227 }
228
229 None
230 }
231
232 pub fn strip_prefix<'a, 'b: 'a>(
235 &'a self,
236 path: &'b Path,
237 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
238 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
239 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
240 stripped = stripped.strip_prefix(DELIMITER)?;
241 }
242 Some(stripped.split_terminator(DELIMITER))
243 }
244
245 pub async fn list_prefixed_files<'a>(
248 &'a self,
249 ctx: &'a dyn Session,
250 store: &'a dyn ObjectStore,
251 prefix: Option<Path>,
252 file_extension: &'a str,
253 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
254 let exec_options = &ctx.config_options().execution;
255 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
256
257 let full_prefix = if let Some(ref p) = prefix {
259 let mut parts = self.prefix.parts().collect::<Vec<_>>();
260 parts.extend(p.parts());
261 Path::from_iter(parts)
262 } else {
263 self.prefix.clone()
264 };
265
266 let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
267 list_with_cache(
268 ctx,
269 store,
270 self.table_ref.as_ref(),
271 &self.prefix,
272 prefix.as_ref(),
273 )
274 .await?
275 } else {
276 match store.head(&full_prefix).await {
277 Ok(meta) => futures::stream::once(async { Ok(meta) })
278 .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
279 .boxed(),
280 Err(object_store::Error::NotFound { .. }) => {
283 list_with_cache(
284 ctx,
285 store,
286 self.table_ref.as_ref(),
287 &self.prefix,
288 prefix.as_ref(),
289 )
290 .await?
291 }
292 Err(e) => return Err(e.into()),
293 }
294 };
295
296 Ok(list
297 .try_filter(move |meta| {
298 let path = &meta.location;
299 let extension_match = path.as_ref().ends_with(file_extension);
300 let glob_match = self.contains(path, ignore_subdirectory);
301 futures::future::ready(extension_match && glob_match)
302 })
303 .boxed())
304 }
305
306 pub async fn list_all_files<'a>(
308 &'a self,
309 ctx: &'a dyn Session,
310 store: &'a dyn ObjectStore,
311 file_extension: &'a str,
312 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
313 self.list_prefixed_files(ctx, store, None, file_extension)
314 .await
315 }
316
317 pub fn as_str(&self) -> &str {
319 self.as_ref()
320 }
321
322 pub fn object_store(&self) -> ObjectStoreUrl {
324 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
325 ObjectStoreUrl::parse(url).unwrap()
326 }
327
328 pub fn is_folder(&self) -> bool {
330 self.url.scheme() == "file" && self.is_collection()
331 }
332
333 pub fn get_url(&self) -> &Url {
335 &self.url
336 }
337
338 pub fn get_glob(&self) -> &Option<Pattern> {
340 &self.glob
341 }
342
343 pub fn with_glob(mut self, glob: &str) -> Result<Self> {
345 self.glob =
346 Some(Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?);
347 Ok(self)
348 }
349
350 pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
352 self.table_ref = Some(table_ref);
353 self
354 }
355
356 pub fn get_table_ref(&self) -> &Option<TableReference> {
358 &self.table_ref
359 }
360}
361
362async fn list_with_cache<'b>(
380 ctx: &'b dyn Session,
381 store: &'b dyn ObjectStore,
382 table_ref: Option<&TableReference>,
383 table_base_path: &Path,
384 prefix: Option<&Path>,
385) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
386 let full_prefix = match prefix {
388 Some(p) => {
389 let mut parts: Vec<_> = table_base_path.parts().collect();
390 parts.extend(p.parts());
391 Path::from_iter(parts)
392 }
393 None => table_base_path.clone(),
394 };
395
396 match ctx.runtime_env().cache_manager.get_list_files_cache() {
397 None => Ok(store
398 .list(Some(&full_prefix))
399 .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
400 .boxed()),
401 Some(cache) => {
402 let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
404
405 let table_scoped_base_path = TableScopedPath {
406 table: table_ref.cloned(),
407 path: table_base_path.clone(),
408 };
409
410 let vec = if let Some(cached) = cache.get(&table_scoped_base_path) {
412 debug!("Hit list files cache");
413 cached.files_matching_prefix(&filter_prefix)
414 } else {
415 let mut vec = store
418 .list(Some(table_base_path))
419 .try_collect::<Vec<ObjectMeta>>()
420 .await?;
421 vec.shrink_to_fit(); let cached: CachedFileList = vec.into();
423 let result = cached.files_matching_prefix(&filter_prefix);
424 cache.put(&table_scoped_base_path, cached);
425 result
426 };
427 Ok(
428 futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
429 .boxed(),
430 )
431 }
432 }
433}
434
435#[cfg(not(target_arch = "wasm32"))]
437fn url_from_filesystem_path(s: &str) -> Option<Url> {
438 let path = std::path::Path::new(s);
439 let is_dir = match path.exists() {
440 true => path.is_dir(),
441 false => std::path::is_separator(s.chars().last()?),
443 };
444
445 let from_absolute_path = |p| {
446 let first = match is_dir {
447 true => Url::from_directory_path(p).ok(),
448 false => Url::from_file_path(p).ok(),
449 }?;
450
451 Url::parse(first.as_str()).ok()
454 };
455
456 if path.is_absolute() {
457 return from_absolute_path(path);
458 }
459
460 let absolute = std::env::current_dir().ok()?.join(path);
461 from_absolute_path(&absolute)
462}
463
464impl AsRef<str> for ListingTableUrl {
465 fn as_ref(&self) -> &str {
466 self.url.as_ref()
467 }
468}
469
470impl AsRef<Url> for ListingTableUrl {
471 fn as_ref(&self) -> &Url {
472 &self.url
473 }
474}
475
476impl std::fmt::Display for ListingTableUrl {
477 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478 self.as_str().fmt(f)
479 }
480}
481
/// Characters that mark the start of a glob expression in a filesystem path.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits a filesystem path into a literal directory prefix and a glob.
///
/// Finds the first glob character (see `GLOB_START_CHARS`). The prefix is
/// everything up to and including the last path separator before it, and the
/// glob is the remainder. If no separator precedes the glob character, the
/// prefix is `"."` and the whole input is the glob. Returns `None` if the path
/// contains no glob character.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let (glob_start, _) = path
        .char_indices()
        .find(|&(_, c)| GLOB_START_CHARS.contains(&c))?;

    let literal_head = &path[..glob_start];
    let last_separator = literal_head
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c));

    match last_separator {
        Some((idx, sep)) => Some(path.split_at(idx + sep.len_utf8())),
        None => Some((".", path)),
    }
}
508
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::DFSchema;
    use datafusion_common::config::TableOptions;
    use datafusion_execution::TaskContext;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_expr::execution_props::ExecutionProps;
    use datafusion_expr::registry::ExtensionTypeRegistryRef;
    use datafusion_expr::{
        AggregateUDF, Expr, HigherOrderUDF, LogicalPlan, ScalarUDF, WindowUDF,
    };
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;
    use object_store::{
        CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
        PutMultipartOptions, PutPayload,
    };
    use std::any::Any;
    use std::collections::HashMap;
    use std::ops::Range;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.clone().join("partition").join("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        // A shared string prefix that is not a whole segment does not match.
        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        // Percent-encoded characters in URLs are decoded.
        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        // Filesystem paths are taken literally, not percent-decoded.
        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        // Dot segments are resolved.
        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        // Relative paths are resolved against the current working directory.
        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        // Paths without glob characters are not split.
        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        // Globs are split at the last separator before the first glob char.
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));

        // Repeated separators are kept in the prefix.
        test(
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        // `file:` URLs (previously written as `file///...`, which was parsed as
        // a relative filesystem path instead of a URL).
        test(
            "file:///some/path/",
            None,
            "file path ends with / - not a file",
        );
        test(
            "file:///some/path/file",
            None,
            "file path does not end with - no extension",
        );
        test(
            "file:///some/path/file.",
            None,
            "file path ends with . - no value after .",
        );
        test(
            "file:///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        );
    }

    #[test]
    fn test_url_contains() -> Result<()> {
        let path = |s: &str| Path::parse(s).unwrap();
        let url = ListingTableUrl::parse("file:///var/data/mytable/")?;

        // Direct children are always accepted.
        assert!(url.contains(&path("/var/data/mytable/data.parquet"), true));
        assert!(url.contains(&path("/var/data/mytable/data.parquet"), false));

        // Files in a plain subdirectory are accepted only when subdirectories
        // are not ignored.
        assert!(!url.contains(&path("/var/data/mytable/sub/data.parquet"), true));
        assert!(url.contains(&path("/var/data/mytable/sub/data.parquet"), false));

        // Hive-style partition directories never count as subdirectories.
        assert!(url.contains(&path("/var/data/mytable/year=2024/data.parquet"), true));
        assert!(url.contains(&path("/var/data/mytable/year=2024/data.parquet"), false));

        // The prefix itself is accepted.
        assert!(url.contains(&path("/var/data/mytable/"), true));
        assert!(url.contains(&path("/var/data/mytable/"), false));

        // Paths outside the prefix, including ones sharing only a string
        // prefix, are rejected.
        assert!(!url.contains(&path("/var/data/other/data.parquet"), false));
        assert!(!url.contains(&path("/var/data/mytable2/data.parquet"), false));

        // With a glob, the file name has to match.
        let url = ListingTableUrl::parse("file:///data/")?.with_glob("*.parquet")?;
        assert!(url.contains(&path("/data/x.parquet"), true));
        assert!(!url.contains(&path("/data/x.csv"), true));

        Ok(())
    }

    #[tokio::test]
    async fn test_list_files() -> Result<()> {
        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec!["forbidden/e.parquet".into()],
        };

        // One top-level file and a "t" directory with mixed extensions.
        create_file(&store, "a.parquet").await;
        create_file(&store, "/t/b.parquet").await;
        create_file(&store, "/t/c.csv").await;
        create_file(&store, "/t/d.csv").await;

        // HEAD requests for this file are rejected by the mock store.
        create_file(&store, "/forbidden/e.parquet").await;

        // Listing the root only returns the top-level file; subdirectories are
        // ignored with the default configuration.
        assert_eq!(
            list_all_files("/", &store, "parquet").await?,
            vec!["a.parquet"],
        );

        // With and without a trailing slash: the latter first issues a HEAD
        // request, gets NotFound, and falls back to listing.
        assert_eq!(
            list_all_files("/t/", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );
        assert_eq!(
            list_all_files("/t", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );

        // Multiple matching files.
        assert_eq!(
            list_all_files("/t", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );
        assert_eq!(
            list_all_files("/t/", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );

        // Missing locations yield an empty listing rather than an error.
        assert_eq!(
            list_all_files("/NonExisting", &store, "csv").await?,
            vec![] as Vec<String>
        );
        assert_eq!(
            list_all_files("/NonExisting/", &store, "csv").await?,
            vec![] as Vec<String>
        );

        // HEAD errors other than NotFound are propagated.
        let Err(DataFusionError::ObjectStore(err)) =
            list_all_files("/forbidden/e.parquet", &store, "parquet").await
        else {
            panic!("Expected ObjectStore error");
        };

        let object_store::Error::PermissionDenied { .. } = &*err else {
            panic!("Expected PermissionDenied error");
        };

        // Partitioned layout for sub-prefix listings.
        create_file(&store, "/data/a=1/file1.parquet").await;
        create_file(&store, "/data/a=1/b=100/file2.parquet").await;
        create_file(&store, "/data/a=2/b=200/file3.parquet").await;
        create_file(&store, "/data/a=2/b=200/file4.csv").await;

        assert_eq!(
            list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
                .await?,
            vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
        );

        assert_eq!(
            list_prefixed_files(
                "/data/",
                &store,
                Some(Path::from("a=1/b=100")),
                "parquet"
            )
            .await?,
            vec!["data/a=1/b=100/file2.parquet"],
        );

        assert_eq!(
            list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
                .await?,
            vec!["data/a=2/b=200/file3.parquet"],
        );

        Ok(())
    }

    /// Cached listings (both the miss and the hit path) must return the same
    /// files as an uncached listing, for full and partition-prefixed queries.
    #[tokio::test]
    async fn test_cache_path_equivalence() -> Result<()> {
        use datafusion_execution::runtime_env::RuntimeEnvBuilder;

        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec![],
        };

        create_file(&store, "/table/year=2023/data1.parquet").await;
        create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
        create_file(&store, "/table/year=2024/data3.parquet").await;
        create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
        create_file(&store, "/table/year=2024/month=12/data5.parquet").await;

        let session_no_cache = MockSession::new();

        let runtime_with_cache = RuntimeEnvBuilder::new()
            .with_object_list_cache_limit(1024 * 1024)
            .build_arc()?;
        let session_with_cache = MockSession::with_runtime_env(runtime_with_cache);

        let test_cases = vec![
            ("/table/", None, "full table listing"),
            (
                "/table/",
                Some(Path::from("year=2023")),
                "single partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2024")),
                "different partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2024/month=06")),
                "nested partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2025")),
                "non-existent partition",
            ),
        ];

        for (url_str, prefix, description) in test_cases {
            let url = ListingTableUrl::parse(url_str)?;

            let mut results_no_cache: Vec<String> = url
                .list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
                .await?
                .try_collect::<Vec<_>>()
                .await?
                .into_iter()
                .map(|m| m.location.to_string())
                .collect();
            results_no_cache.sort();

            // First cached call populates the cache.
            let mut results_with_cache_miss: Vec<String> = url
                .list_prefixed_files(
                    &session_with_cache,
                    &store,
                    prefix.clone(),
                    "parquet",
                )
                .await?
                .try_collect::<Vec<_>>()
                .await?
                .into_iter()
                .map(|m| m.location.to_string())
                .collect();
            results_with_cache_miss.sort();

            // Second cached call is served from the cache.
            let mut results_with_cache_hit: Vec<String> = url
                .list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
                .await?
                .try_collect::<Vec<_>>()
                .await?
                .into_iter()
                .map(|m| m.location.to_string())
                .collect();
            results_with_cache_hit.sort();

            assert_eq!(
                results_no_cache, results_with_cache_miss,
                "Cache miss path should match non-cached path for: {description}"
            );
            assert_eq!(
                results_no_cache, results_with_cache_hit,
                "Cache hit path should match non-cached path for: {description}"
            );
        }

        Ok(())
    }

    /// A full listing cached once can answer later partition-prefixed queries.
    #[tokio::test]
    async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
        use datafusion_execution::runtime_env::RuntimeEnvBuilder;

        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec![],
        };

        create_file(&store, "/sales/region=US/q1.parquet").await;
        create_file(&store, "/sales/region=US/q2.parquet").await;
        create_file(&store, "/sales/region=EU/q1.parquet").await;

        let runtime = RuntimeEnvBuilder::new()
            .with_object_list_cache_limit(1024 * 1024)
            .build_arc()?;
        let session = MockSession::with_runtime_env(runtime);

        let url = ListingTableUrl::parse("/sales/")?;

        let full_results: Vec<String> = url
            .list_prefixed_files(&session, &store, None, "parquet")
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        assert_eq!(full_results.len(), 3);

        let mut us_results: Vec<String> = url
            .list_prefixed_files(
                &session,
                &store,
                Some(Path::from("region=US")),
                "parquet",
            )
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        us_results.sort();

        assert_eq!(
            us_results,
            vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
        );

        let eu_results: Vec<String> = url
            .list_prefixed_files(
                &session,
                &store,
                Some(Path::from("region=EU")),
                "parquet",
            )
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();

        assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);

        Ok(())
    }

    /// Writes a small fixed payload to `path` in `object_store`.
    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
        object_store
            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
            .await
            .expect("failed to create test file");
    }

    /// Lists all files under `url` with the given extension, as path strings.
    async fn list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        try_list_prefixed_files(url, store, None, file_extension).await
    }

    /// Lists files under `url` restricted to `prefix`, as path strings.
    async fn list_prefixed_files(
        url: &str,
        store: &dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        try_list_prefixed_files(url, store, prefix, file_extension).await
    }

    /// Parses `url` and lists matching files using a `MockSession::new()`
    /// session, returning their locations as strings.
    async fn try_list_prefixed_files(
        url: &str,
        store: &dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        let session = MockSession::new();
        let url = ListingTableUrl::parse(url)?;
        let files = url
            .list_prefixed_files(&session, store, prefix, file_extension)
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|meta| meta.location.as_ref().to_string())
            .collect();
        Ok(files)
    }

    /// In-memory object store that rejects HEAD requests for `forbidden_paths`
    /// with `PermissionDenied`.
    #[derive(Debug)]
    struct MockObjectStore {
        in_mem: object_store::memory::InMemory,
        forbidden_paths: Vec<Path>,
    }

    impl std::fmt::Display for MockObjectStore {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.in_mem.fmt(f)
        }
    }

    #[async_trait]
    impl ObjectStore for MockObjectStore {
        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: object_store::PutOptions,
        ) -> object_store::Result<object_store::PutResult> {
            self.in_mem.put_opts(location, payload, opts).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.in_mem.put_multipart_opts(location, opts).await
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            if options.head && self.forbidden_paths.contains(location) {
                Err(object_store::Error::PermissionDenied {
                    path: location.to_string(),
                    source: "forbidden".into(),
                })
            } else {
                self.in_mem.get_opts(location, options).await
            }
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.in_mem.get_ranges(location, ranges).await
        }

        fn delete_stream(
            &self,
            locations: BoxStream<'static, object_store::Result<Path>>,
        ) -> BoxStream<'static, object_store::Result<Path>> {
            self.in_mem.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.in_mem.list(prefix)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.in_mem.list_with_delimiter(prefix).await
        }

        async fn copy_opts(
            &self,
            from: &Path,
            to: &Path,
            options: CopyOptions,
        ) -> object_store::Result<()> {
            self.in_mem.copy_opts(from, to, options).await
        }
    }

    /// Minimal `Session` that only provides a config and a runtime env.
    struct MockSession {
        config: SessionConfig,
        runtime_env: Arc<RuntimeEnv>,
    }

    impl MockSession {
        fn new() -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env: Arc::new(RuntimeEnv::default()),
            }
        }

        /// Creates a session with a caller-supplied runtime (e.g. one with a
        /// list-files cache configured).
        fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env,
            }
        }
    }

    #[async_trait::async_trait]
    impl Session for MockSession {
        fn session_id(&self) -> &str {
            unimplemented!()
        }

        fn config(&self) -> &SessionConfig {
            &self.config
        }

        async fn create_physical_plan(
            &self,
            _logical_plan: &LogicalPlan,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn create_physical_expr(
            &self,
            _expr: Expr,
            _df_schema: &DFSchema,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            unimplemented!()
        }

        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
            unimplemented!()
        }

        fn higher_order_functions(&self) -> &HashMap<String, Arc<HigherOrderUDF>> {
            unimplemented!()
        }

        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
            unimplemented!()
        }

        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
            unimplemented!()
        }

        fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef {
            unimplemented!()
        }

        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
            &self.runtime_env
        }

        fn execution_props(&self) -> &ExecutionProps {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }

        fn table_options(&self) -> &TableOptions {
            unimplemented!()
        }

        fn table_options_mut(&mut self) -> &mut TableOptions {
            unimplemented!()
        }

        fn task_ctx(&self) -> Arc<TaskContext> {
            unimplemented!()
        }
    }
}