1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result, TableReference};
21use datafusion_execution::cache::TableScopedPath;
22use datafusion_execution::cache::cache_manager::CachedFileList;
23use datafusion_execution::object_store::ObjectStoreUrl;
24use datafusion_session::Session;
25
26use futures::stream::BoxStream;
27use futures::{StreamExt, TryStreamExt};
28use glob::Pattern;
29use itertools::Itertools;
30use log::debug;
31use object_store::path::DELIMITER;
32use object_store::path::Path;
33use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
34use url::Url;
35
36#[derive(Debug, Clone, Eq, PartialEq, Hash)]
39pub struct ListingTableUrl {
40 url: Url,
42 prefix: Path,
44 glob: Option<Pattern>,
46 table_ref: Option<TableReference>,
48}
49
50impl ListingTableUrl {
51 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
107 let s = s.as_ref();
108
109 #[cfg(not(target_arch = "wasm32"))]
111 if std::path::Path::new(s).is_absolute() {
112 return Self::parse_path(s);
113 }
114
115 match Url::parse(s) {
116 Ok(url) => Self::try_new(url, None),
117 #[cfg(not(target_arch = "wasm32"))]
118 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
119 Err(e) => Err(DataFusionError::External(Box::new(e))),
120 }
121 }
122
123 #[cfg(not(target_arch = "wasm32"))]
125 fn parse_path(s: &str) -> Result<Self> {
126 let (path, glob) = match split_glob_expression(s) {
127 Some((prefix, glob)) => {
128 let glob = Pattern::new(glob)
129 .map_err(|e| DataFusionError::External(Box::new(e)))?;
130 (prefix, Some(glob))
131 }
132 None => (s, None),
133 };
134
135 let url = url_from_filesystem_path(path).ok_or_else(|| {
136 DataFusionError::External(
137 format!("Failed to convert path to URL: {path}").into(),
138 )
139 })?;
140
141 Self::try_new(url, glob)
142 }
143
144 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
151 let prefix = Path::from_url_path(url.path())?;
152 Ok(Self {
153 url,
154 prefix,
155 glob,
156 table_ref: None,
157 })
158 }
159
160 pub fn scheme(&self) -> &str {
162 self.url.scheme()
163 }
164
165 pub fn prefix(&self) -> &Path {
170 &self.prefix
171 }
172
173 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
175 let Some(all_segments) = self.strip_prefix(path) else {
176 return false;
177 };
178
179 let mut segments = all_segments.filter(|s| !s.contains('='));
182
183 match &self.glob {
184 Some(glob) => {
185 if ignore_subdirectory {
186 segments
187 .next()
188 .is_some_and(|file_name| glob.matches(file_name))
189 } else {
190 let stripped = segments.join(DELIMITER);
191 glob.matches(&stripped)
192 }
193 }
194 None if ignore_subdirectory => segments.count() <= 1,
198 None => true,
200 }
201 }
202
203 pub fn is_collection(&self) -> bool {
205 self.url.path().ends_with(DELIMITER)
206 }
207
208 pub fn file_extension(&self) -> Option<&str> {
221 if let Some(mut segments) = self.url.path_segments()
222 && let Some(last_segment) = segments.next_back()
223 && last_segment.contains(".")
224 && !last_segment.ends_with(".")
225 {
226 return last_segment.split('.').next_back();
227 }
228
229 None
230 }
231
232 pub fn strip_prefix<'a, 'b: 'a>(
235 &'a self,
236 path: &'b Path,
237 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
238 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
239 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
240 stripped = stripped.strip_prefix(DELIMITER)?;
241 }
242 Some(stripped.split_terminator(DELIMITER))
243 }
244
245 pub async fn list_prefixed_files<'a>(
248 &'a self,
249 ctx: &'a dyn Session,
250 store: &'a dyn ObjectStore,
251 prefix: Option<Path>,
252 file_extension: &'a str,
253 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
254 let exec_options = &ctx.config_options().execution;
255 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
256
257 let full_prefix = if let Some(ref p) = prefix {
259 let mut parts = self.prefix.parts().collect::<Vec<_>>();
260 parts.extend(p.parts());
261 Path::from_iter(parts.into_iter())
262 } else {
263 self.prefix.clone()
264 };
265
266 let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
267 list_with_cache(
268 ctx,
269 store,
270 self.table_ref.as_ref(),
271 &self.prefix,
272 prefix.as_ref(),
273 )
274 .await?
275 } else {
276 match store.head(&full_prefix).await {
277 Ok(meta) => futures::stream::once(async { Ok(meta) })
278 .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
279 .boxed(),
280 Err(object_store::Error::NotFound { .. }) => {
283 list_with_cache(
284 ctx,
285 store,
286 self.table_ref.as_ref(),
287 &self.prefix,
288 prefix.as_ref(),
289 )
290 .await?
291 }
292 Err(e) => return Err(e.into()),
293 }
294 };
295
296 Ok(list
297 .try_filter(move |meta| {
298 let path = &meta.location;
299 let extension_match = path.as_ref().ends_with(file_extension);
300 let glob_match = self.contains(path, ignore_subdirectory);
301 futures::future::ready(extension_match && glob_match)
302 })
303 .boxed())
304 }
305
306 pub async fn list_all_files<'a>(
308 &'a self,
309 ctx: &'a dyn Session,
310 store: &'a dyn ObjectStore,
311 file_extension: &'a str,
312 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
313 self.list_prefixed_files(ctx, store, None, file_extension)
314 .await
315 }
316
317 pub fn as_str(&self) -> &str {
319 self.as_ref()
320 }
321
322 pub fn object_store(&self) -> ObjectStoreUrl {
324 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
325 ObjectStoreUrl::parse(url).unwrap()
326 }
327
328 pub fn is_folder(&self) -> bool {
330 self.url.scheme() == "file" && self.is_collection()
331 }
332
333 pub fn get_url(&self) -> &Url {
335 &self.url
336 }
337
338 pub fn get_glob(&self) -> &Option<Pattern> {
340 &self.glob
341 }
342
343 pub fn with_glob(mut self, glob: &str) -> Result<Self> {
345 self.glob =
346 Some(Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?);
347 Ok(self)
348 }
349
350 pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
352 self.table_ref = Some(table_ref);
353 self
354 }
355
356 pub fn get_table_ref(&self) -> &Option<TableReference> {
358 &self.table_ref
359 }
360}
361
362async fn list_with_cache<'b>(
380 ctx: &'b dyn Session,
381 store: &'b dyn ObjectStore,
382 table_ref: Option<&TableReference>,
383 table_base_path: &Path,
384 prefix: Option<&Path>,
385) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
386 let full_prefix = match prefix {
388 Some(p) => {
389 let mut parts: Vec<_> = table_base_path.parts().collect();
390 parts.extend(p.parts());
391 Path::from_iter(parts)
392 }
393 None => table_base_path.clone(),
394 };
395
396 match ctx.runtime_env().cache_manager.get_list_files_cache() {
397 None => Ok(store
398 .list(Some(&full_prefix))
399 .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
400 .boxed()),
401 Some(cache) => {
402 let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
404
405 let table_scoped_base_path = TableScopedPath {
406 table: table_ref.cloned(),
407 path: table_base_path.clone(),
408 };
409
410 let vec = if let Some(cached) = cache.get(&table_scoped_base_path) {
412 debug!("Hit list files cache");
413 cached.files_matching_prefix(&filter_prefix)
414 } else {
415 let mut vec = store
418 .list(Some(table_base_path))
419 .try_collect::<Vec<ObjectMeta>>()
420 .await?;
421 vec.shrink_to_fit(); let cached: CachedFileList = vec.into();
423 let result = cached.files_matching_prefix(&filter_prefix);
424 cache.put(&table_scoped_base_path, cached);
425 result
426 };
427 Ok(
428 futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
429 .boxed(),
430 )
431 }
432 }
433}
434
435#[cfg(not(target_arch = "wasm32"))]
437fn url_from_filesystem_path(s: &str) -> Option<Url> {
438 let path = std::path::Path::new(s);
439 let is_dir = match path.exists() {
440 true => path.is_dir(),
441 false => std::path::is_separator(s.chars().last()?),
443 };
444
445 let from_absolute_path = |p| {
446 let first = match is_dir {
447 true => Url::from_directory_path(p).ok(),
448 false => Url::from_file_path(p).ok(),
449 }?;
450
451 Url::parse(first.as_str()).ok()
454 };
455
456 if path.is_absolute() {
457 return from_absolute_path(path);
458 }
459
460 let absolute = std::env::current_dir().ok()?.join(path);
461 from_absolute_path(&absolute)
462}
463
464impl AsRef<str> for ListingTableUrl {
465 fn as_ref(&self) -> &str {
466 self.url.as_ref()
467 }
468}
469
470impl AsRef<Url> for ListingTableUrl {
471 fn as_ref(&self) -> &Url {
472 &self.url
473 }
474}
475
476impl std::fmt::Display for ListingTableUrl {
477 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478 self.as_str().fmt(f)
479 }
480}
481
/// Characters whose first occurrence marks the start of a glob expression.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` into a base path and a glob expression.
///
/// The split happens right after the last path separator that precedes the
/// first glob character, so the base keeps its trailing separator. When no
/// separator precedes the first glob character the base is `"."` and the whole
/// input is the glob. Returns `None` when `path` contains no glob character.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(GLOB_START_CHARS)?;

    // Byte offset just past the last separator located before the glob.
    let split_at = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map(|(idx, c)| idx + c.len_utf8());

    Some(match split_at {
        Some(idx) => path.split_at(idx),
        None => (".", path),
    })
}
508
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::DFSchema;
    use datafusion_common::config::TableOptions;
    use datafusion_execution::TaskContext;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
    use datafusion_expr::execution_props::ExecutionProps;
    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;
    use object_store::{
        CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
        PutMultipartOptions, PutPayload,
    };
    use std::any::Any;
    use std::collections::HashMap;
    use std::ops::Range;
    use std::sync::Arc;
    use tempfile::tempdir;

    /// Checks prefix derivation and `strip_prefix` for filesystem paths and
    /// `file://` URLs, including percent-encoding and `.` / `..` handling.
    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let segments: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(segments, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let segments: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(segments, vec!["foo", "bar"]);

        // A prefix must not match only part of a path segment.
        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        // A path equal to the prefix leaves no segments.
        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        // URL text -> expected object-store prefix.
        let prefix_cases = [
            ("file:///foo/ bar", "foo/ bar"),
            ("file:///foo/bar?", "foo/bar"),
            ("file:///foo/😺", "foo/😺"),
            ("file:///foo/bar%2Efoo", "foo/bar.foo"),
            ("file:///foo/bar%252Ffoo", "foo/bar%2Ffoo"),
            ("file:///foo/a%252Fb.txt", "foo/a%2Fb.txt"),
            ("file:///foo/../a%252Fb.txt", "a%2Fb.txt"),
            ("file:///foo/./bar/../../baz/./test.txt", "baz/test.txt"),
        ];
        for (input, expected) in prefix_cases {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.prefix.as_ref(), expected, "prefix of {input}");
        }

        // A literal `%2F` in an on-disk file name must survive parsing.
        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        // An absolute path and its spelling relative to the working directory
        // must parse to equal URLs.
        let assert_equivalent = |absolute: &std::path::Path, relative: &str| {
            let from_absolute =
                ListingTableUrl::parse(absolute.to_str().unwrap()).unwrap();
            let from_relative = ListingTableUrl::parse(relative).unwrap();
            assert_eq!(from_absolute, from_relative);
            from_absolute
        };

        let workdir = std::env::current_dir().unwrap();

        let missing = workdir.join("non-existent");
        let url = assert_equivalent(&missing, "non-existent");
        assert!(url.prefix.as_ref().ends_with("non-existent"));

        let parent = workdir.parent().unwrap();
        assert_equivalent(parent, "..");

        let sibling = parent.join("bar");
        let url = assert_equivalent(&sibling, "../bar");
        assert!(url.prefix.as_ref().ends_with("bar"));

        let dotted = sibling.join(".").join("foo").join("..").join("baz");
        let url = assert_equivalent(&dotted, "../bar/./foo/../baz");
        assert!(url.prefix.as_ref().ends_with("bar/baz"));
    }

    /// Checks prefix derivation and `strip_prefix` for an `s3://` URL.
    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let inside = Path::from("foo/bar/partition/foo.parquet");
        let segments: Vec<_> = url.strip_prefix(&inside).unwrap().collect();
        assert_eq!(segments, vec!["partition", "foo.parquet"]);

        let outside = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&outside).is_none());
    }

    /// Checks how `split_glob_expression` separates base path and glob.
    #[test]
    fn test_split_glob() {
        let cases: [(&str, Option<(&str, &str)>); 16] = [
            // No glob characters.
            ("/", None),
            ("/a.txt", None),
            ("/a", None),
            ("/a/", None),
            ("/a/b", None),
            ("/a/b/", None),
            ("/a/b.txt", None),
            ("/a/b/c.txt", None),
            // Glob without a preceding separator.
            ("*.txt", Some((".", "*.txt"))),
            // Glob after one or more separators.
            ("/*.txt", Some(("/", "*.txt"))),
            ("/a/*b.txt", Some(("/a/", "*b.txt"))),
            ("/a/*/b.txt", Some(("/a/", "*/b.txt"))),
            ("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt"))),
            ("/a/b*.txt", Some(("/a/", "b*.txt"))),
            ("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt"))),
            // Repeated separators stay in the base path.
            (
                "/a/b/c//alltypes_plain*.parquet",
                Some(("/a/b/c//", "alltypes_plain*.parquet")),
            ),
        ];

        for (input, expected) in cases {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }
    }

    /// Checks that only the URL path (not query or fragment) decides whether a
    /// URL is a collection.
    #[test]
    fn test_is_collection() {
        let cases = [
            ("https://a.b.c/path/", true, "path ends with / - collection"),
            (
                "https://a.b.c/path/?a=b",
                true,
                "path ends with / - with query args - collection",
            ),
            (
                "https://a.b.c/path?a=b/",
                false,
                "path not ends with / - query ends with / - not collection",
            ),
            (
                "https://a.b.c/path/#a=b",
                true,
                "path ends with / - with fragment - collection",
            ),
            (
                "https://a.b.c/path#a=b/",
                false,
                "path not ends with / - fragment ends with / - not collection",
            ),
        ];

        for (input, expected, message) in cases {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }
    }

    /// Checks `file_extension` for URLs and paths with and without an
    /// extension.
    #[test]
    fn test_file_extension() {
        let cases: [(&str, Option<&str>, &str); 9] = [
            ("https://a.b.c/path/", None, "path ends with / - not a file"),
            (
                "https://a.b.c/path/?a=b",
                None,
                "path ends with / - with query args - not a file",
            ),
            (
                "https://a.b.c/path?a=b/",
                None,
                "path not ends with / - query ends with / but no file extension",
            ),
            (
                "https://a.b.c/path/#a=b",
                None,
                "path ends with / - with fragment - not a file",
            ),
            (
                "https://a.b.c/path#a=b/",
                None,
                "path not ends with / - fragment ends with / but no file extension",
            ),
            (
                "file///some/path/",
                None,
                "file path ends with / - not a file",
            ),
            (
                "file///some/path/file",
                None,
                "file path does not end with - no extension",
            ),
            (
                "file///some/path/file.",
                None,
                "file path ends with . - no value after .",
            ),
            (
                "file///some/path/file.ext",
                Some("ext"),
                "file path ends with .ext - extension is ext",
            ),
        ];

        for (input, expected, message) in cases {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }
    }

    /// Exercises listing against an in-memory store: root and directory
    /// listings, missing paths, a `head` failure other than `NotFound`, and
    /// listings restricted to a partition sub-path.
    #[tokio::test]
    async fn test_list_files() -> Result<()> {
        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec!["forbidden/e.parquet".into()],
        };

        // One file at the root and three below `t/`.
        create_file(&store, "a.parquet").await;
        create_file(&store, "/t/b.parquet").await;
        create_file(&store, "/t/c.csv").await;
        create_file(&store, "/t/d.csv").await;

        // `head` on this file is rejected by the mock store.
        create_file(&store, "/forbidden/e.parquet").await;

        assert_eq!(
            list_all_files("/", &store, "parquet").await?,
            vec!["a.parquet"],
        );

        // The same directory, spelled with and without a trailing slash.
        assert_eq!(
            list_all_files("/t/", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );
        assert_eq!(
            list_all_files("/t", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );

        assert_eq!(
            list_all_files("/t", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );
        assert_eq!(
            list_all_files("/t/", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );

        // Missing locations produce an empty listing rather than an error.
        assert_eq!(
            list_all_files("/NonExisting", &store, "csv").await?,
            vec![] as Vec<String>
        );
        assert_eq!(
            list_all_files("/NonExisting/", &store, "csv").await?,
            vec![] as Vec<String>
        );

        // A `head` error other than `NotFound` must be surfaced.
        let Err(DataFusionError::ObjectStore(err)) =
            list_all_files("/forbidden/e.parquet", &store, "parquet").await
        else {
            panic!("Expected ObjectStore error");
        };

        let object_store::Error::PermissionDenied { .. } = &*err else {
            panic!("Expected PermissionDenied error");
        };

        // Partitioned layout used for the prefixed listings below.
        create_file(&store, "/data/a=1/file1.parquet").await;
        create_file(&store, "/data/a=1/b=100/file2.parquet").await;
        create_file(&store, "/data/a=2/b=200/file3.parquet").await;
        create_file(&store, "/data/a=2/b=200/file4.csv").await;

        assert_eq!(
            list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
                .await?,
            vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
        );

        assert_eq!(
            list_prefixed_files(
                "/data/",
                &store,
                Some(Path::from("a=1/b=100")),
                "parquet"
            )
            .await?,
            vec!["data/a=1/b=100/file2.parquet"],
        );

        assert_eq!(
            list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
                .await?,
            vec!["data/a=2/b=200/file3.parquet"],
        );

        Ok(())
    }

    /// Verifies that listing without a cache, with a cold cache and with a
    /// warm cache all return the same files for a range of partition prefixes.
    #[tokio::test]
    async fn test_cache_path_equivalence() -> Result<()> {
        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec![],
        };

        create_file(&store, "/table/year=2023/data1.parquet").await;
        create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
        create_file(&store, "/table/year=2024/data3.parquet").await;
        create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
        create_file(&store, "/table/year=2024/month=12/data5.parquet").await;

        let session_no_cache = MockSession::new();
        let session_with_cache = session_with_list_cache()?;

        let test_cases = vec![
            ("/table/", None, "full table listing"),
            (
                "/table/",
                Some(Path::from("year=2023")),
                "single partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2024")),
                "different partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2024/month=06")),
                "nested partition filter",
            ),
            (
                "/table/",
                Some(Path::from("year=2025")),
                "non-existent partition",
            ),
        ];

        for (url_str, prefix, description) in test_cases {
            let url = ListingTableUrl::parse(url_str)?;

            let mut results_no_cache = listed_locations(
                &url,
                &session_no_cache,
                &store,
                prefix.clone(),
                "parquet",
            )
            .await?;
            results_no_cache.sort();

            // First listing through the cached session for this prefix.
            let mut results_with_cache_miss = listed_locations(
                &url,
                &session_with_cache,
                &store,
                prefix.clone(),
                "parquet",
            )
            .await?;
            results_with_cache_miss.sort();

            // Same request again; the cache has been populated by now.
            let mut results_with_cache_hit =
                listed_locations(&url, &session_with_cache, &store, prefix, "parquet")
                    .await?;
            results_with_cache_hit.sort();

            assert_eq!(
                results_no_cache, results_with_cache_miss,
                "Cache miss path should match non-cached path for: {description}"
            );
            assert_eq!(
                results_no_cache, results_with_cache_hit,
                "Cache hit path should match non-cached path for: {description}"
            );
        }

        Ok(())
    }

    /// Verifies that, once the full table has been listed through the cache,
    /// partition-scoped listings return exactly the files of that partition.
    #[tokio::test]
    async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec![],
        };

        create_file(&store, "/sales/region=US/q1.parquet").await;
        create_file(&store, "/sales/region=US/q2.parquet").await;
        create_file(&store, "/sales/region=EU/q1.parquet").await;

        let session = session_with_list_cache()?;
        let url = ListingTableUrl::parse("/sales/")?;

        // Full listing first, which populates the cache.
        let full_results =
            listed_locations(&url, &session, &store, None, "parquet").await?;
        assert_eq!(full_results.len(), 3);

        let mut us_results = listed_locations(
            &url,
            &session,
            &store,
            Some(Path::from("region=US")),
            "parquet",
        )
        .await?;
        us_results.sort();

        assert_eq!(
            us_results,
            vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
        );

        let eu_results = listed_locations(
            &url,
            &session,
            &store,
            Some(Path::from("region=EU")),
            "parquet",
        )
        .await?;

        assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);

        Ok(())
    }

    /// Writes a small fixed payload to `path` in `object_store`, panicking on
    /// failure.
    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
        object_store
            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
            .await
            .expect("failed to create test file");
    }

    /// Builds a session whose runtime has the object list cache enabled with a
    /// 1 MiB limit.
    fn session_with_list_cache() -> Result<MockSession> {
        let runtime = RuntimeEnvBuilder::new()
            .with_object_list_cache_limit(1024 * 1024)
            .build_arc()?;
        Ok(MockSession::with_runtime_env(runtime))
    }

    /// Runs `ListingTableUrl::list_prefixed_files` and returns the locations
    /// of the listed objects as strings, in the order the stream yields them.
    async fn listed_locations(
        url: &ListingTableUrl,
        session: &dyn Session,
        store: &dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        let locations = url
            .list_prefixed_files(session, store, prefix, file_extension)
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|meta| meta.location.to_string())
            .collect();
        Ok(locations)
    }

    /// Lists every file with `file_extension` under `url`, using a default
    /// session (no list-files cache configured by this test).
    async fn list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        list_prefixed_files(url, store, None, file_extension).await
    }

    /// Lists the files with `file_extension` under `url`, restricted to the
    /// optional sub-path `prefix`, using a default session.
    async fn list_prefixed_files(
        url: &str,
        store: &dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        let session = MockSession::new();
        let url = ListingTableUrl::parse(url)?;
        listed_locations(&url, &session, store, prefix, file_extension).await
    }

    /// In-memory object store that rejects `head` requests for a configurable
    /// set of paths with `PermissionDenied`.
    #[derive(Debug)]
    struct MockObjectStore {
        /// Backing store that serves every request that is not rejected.
        in_mem: object_store::memory::InMemory,
        /// Paths for which `head` requests fail.
        forbidden_paths: Vec<Path>,
    }

    impl std::fmt::Display for MockObjectStore {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.in_mem.fmt(f)
        }
    }

    #[async_trait]
    impl ObjectStore for MockObjectStore {
        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: object_store::PutOptions,
        ) -> object_store::Result<object_store::PutResult> {
            self.in_mem.put_opts(location, payload, opts).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.in_mem.put_multipart_opts(location, opts).await
        }

        /// Delegates to the in-memory store, except that `head` requests for a
        /// forbidden path fail with `PermissionDenied`.
        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            let rejected = options.head && self.forbidden_paths.contains(location);
            if rejected {
                return Err(object_store::Error::PermissionDenied {
                    path: location.to_string(),
                    source: "forbidden".into(),
                });
            }
            self.in_mem.get_opts(location, options).await
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.in_mem.get_ranges(location, ranges).await
        }

        fn delete_stream(
            &self,
            locations: BoxStream<'static, object_store::Result<Path>>,
        ) -> BoxStream<'static, object_store::Result<Path>> {
            self.in_mem.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.in_mem.list(prefix)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.in_mem.list_with_delimiter(prefix).await
        }

        async fn copy_opts(
            &self,
            from: &Path,
            to: &Path,
            options: CopyOptions,
        ) -> object_store::Result<()> {
            self.in_mem.copy_opts(from, to, options).await
        }
    }

    /// Minimal `Session` that only provides a configuration and a runtime
    /// environment; every other method is unimplemented.
    struct MockSession {
        config: SessionConfig,
        runtime_env: Arc<RuntimeEnv>,
    }

    impl MockSession {
        /// Creates a session with a default configuration and a default
        /// runtime environment.
        fn new() -> Self {
            Self::with_runtime_env(Arc::new(RuntimeEnv::default()))
        }

        /// Creates a session with a default configuration and the given
        /// runtime environment.
        fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env,
            }
        }
    }

    #[async_trait]
    impl Session for MockSession {
        fn session_id(&self) -> &str {
            unimplemented!()
        }

        fn config(&self) -> &SessionConfig {
            &self.config
        }

        async fn create_physical_plan(
            &self,
            _logical_plan: &LogicalPlan,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn create_physical_expr(
            &self,
            _expr: Expr,
            _df_schema: &DFSchema,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            unimplemented!()
        }

        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
            unimplemented!()
        }

        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
            unimplemented!()
        }

        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
            unimplemented!()
        }

        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
            &self.runtime_env
        }

        fn execution_props(&self) -> &ExecutionProps {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }

        fn table_options(&self) -> &TableOptions {
            unimplemented!()
        }

        fn table_options_mut(&mut self) -> &mut TableOptions {
            unimplemented!()
        }

        fn task_ctx(&self) -> Arc<TaskContext> {
            unimplemented!()
        }
    }
}