datafusion_datasource/
url.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
34/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
35/// for more information on the supported expressions
36#[derive(Debug, Clone, Eq, PartialEq, Hash)]
37pub struct ListingTableUrl {
38    /// A URL that identifies a file or directory to list files from
39    url: Url,
40    /// The path prefix
41    prefix: Path,
42    /// An optional glob expression used to filter files
43    glob: Option<Pattern>,
44}
45
46impl ListingTableUrl {
47    /// Parse a provided string as a `ListingTableUrl`
48    ///
49    /// A URL can either refer to a single object, or a collection of objects with a
50    /// common prefix, with the presence of a trailing `/` indicating a collection.
51    ///
52    /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
53    /// `file:///foo/` refers to all the files under the directory `/foo` and its
54    /// subdirectories.
55    ///
56    /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
57    /// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
58    /// S3 bucket `BUCKET`
59    ///
60    /// # URL Encoding
61    ///
62    /// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
63    /// would be `file:///bar%252Efoo`, as per the [URL] specification.
64    ///
65    /// It should be noted that some tools, such as the AWS CLI, take a different approach and
66    /// instead interpret the URL path verbatim. For example the object `bar%2Efoo` would be
67    /// addressed as `s3://BUCKET/bar%252Efoo` using [`ListingTableUrl`] but `s3://BUCKET/bar%2Efoo`
68    /// when using the aws-cli.
69    ///
70    /// # Paths without a Scheme
71    ///
72    /// If no scheme is provided, or the string is an absolute filesystem path
73    /// as determined by [`std::path::Path::is_absolute`], the string will be
74    /// interpreted as a path on the local filesystem using the operating
75    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
76    ///
77    /// If the path contains any of `'?', '*', '['`, it will be considered
78    /// a glob expression and resolved as described in the section below.
79    ///
80    /// Otherwise, the path will be resolved to an absolute path based on the current
81    /// working directory, and converted to a [file URI].
82    ///
83    /// If the path already exists in the local filesystem this will be used to determine if this
84    /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
85    /// of a trailing path delimiter will be used to indicate a directory. For the avoidance
86    /// of ambiguity it is recommended users always include trailing `/` when intending to
87    /// refer to a directory.
88    ///
89    /// ## Glob File Paths
90    ///
91    /// If no scheme is provided, and the path contains a glob expression, it will
92    /// be resolved as follows.
93    ///
94    /// The string up to the first path segment containing a glob expression will be extracted,
95    /// and resolved in the same manner as a normal scheme-less path above.
96    ///
97    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
98    /// filter when listing files from object storage
99    ///
100    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
101    /// [URL]: https://url.spec.whatwg.org/
102    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103        let s = s.as_ref();
104
105        // This is necessary to handle the case of a path starting with a drive letter
106        #[cfg(not(target_arch = "wasm32"))]
107        if std::path::Path::new(s).is_absolute() {
108            return Self::parse_path(s);
109        }
110
111        match Url::parse(s) {
112            Ok(url) => Self::try_new(url, None),
113            #[cfg(not(target_arch = "wasm32"))]
114            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115            Err(e) => Err(DataFusionError::External(Box::new(e))),
116        }
117    }
118
119    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
120    #[cfg(not(target_arch = "wasm32"))]
121    fn parse_path(s: &str) -> Result<Self> {
122        let (path, glob) = match split_glob_expression(s) {
123            Some((prefix, glob)) => {
124                let glob = Pattern::new(glob)
125                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
126                (prefix, Some(glob))
127            }
128            None => (s, None),
129        };
130
131        let url = url_from_filesystem_path(path).ok_or_else(|| {
132            DataFusionError::External(
133                format!("Failed to convert path to URL: {path}").into(),
134            )
135        })?;
136
137        Self::try_new(url, glob)
138    }
139
140    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
141    ///
142    /// [`Self::parse`] supports glob expression only for file system paths.
143    /// However, some applications may want to support glob expression for URLs with a scheme.
144    /// The application can split the URL into a base URL and a glob expression and use this method
145    /// to create a [`ListingTableUrl`].
146    pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147        let prefix = Path::from_url_path(url.path())?;
148        Ok(Self { url, prefix, glob })
149    }
150
151    /// Returns the URL scheme
152    pub fn scheme(&self) -> &str {
153        self.url.scheme()
154    }
155
156    /// Return the URL path not excluding any glob expression
157    ///
158    /// If [`Self::is_collection`], this is the listing prefix
159    /// Otherwise, this is the path to the object
160    pub fn prefix(&self) -> &Path {
161        &self.prefix
162    }
163
164    /// Returns `true` if `path` matches this [`ListingTableUrl`]
165    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166        let Some(all_segments) = self.strip_prefix(path) else {
167            return false;
168        };
169
170        // remove any segments that contain `=` as they are allowed even
171        // when ignore subdirectories is `true`.
172        let mut segments = all_segments.filter(|s| !s.contains('='));
173
174        match &self.glob {
175            Some(glob) => {
176                if ignore_subdirectory {
177                    segments
178                        .next()
179                        .is_some_and(|file_name| glob.matches(file_name))
180                } else {
181                    let stripped = segments.join(DELIMITER);
182                    glob.matches(&stripped)
183                }
184            }
185            // where we are ignoring subdirectories, we require
186            // the path to be either empty, or contain just the
187            // final file name segment.
188            None if ignore_subdirectory => segments.count() <= 1,
189            // in this case, any valid path at or below the url is allowed
190            None => true,
191        }
192    }
193
194    /// Returns `true` if `path` refers to a collection of objects
195    pub fn is_collection(&self) -> bool {
196        self.url.path().ends_with(DELIMITER)
197    }
198
199    /// Returns the file extension of the last path segment if it exists
200    ///
201    /// Examples:
202    /// ```rust
203    /// use datafusion_datasource::ListingTableUrl;
204    /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
205    /// assert_eq!(url.file_extension(), Some("csv"));
206    /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
207    /// assert_eq!(url.file_extension(), None);
208    /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
209    /// assert_eq!(url.file_extension(), None);
210    /// ```
211    pub fn file_extension(&self) -> Option<&str> {
212        if let Some(mut segments) = self.url.path_segments() {
213            if let Some(last_segment) = segments.next_back() {
214                if last_segment.contains(".") && !last_segment.ends_with(".") {
215                    return last_segment.split('.').next_back();
216                }
217            }
218        }
219
220        None
221    }
222
223    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
224    /// an iterator of the remaining path segments
225    pub fn strip_prefix<'a, 'b: 'a>(
226        &'a self,
227        path: &'b Path,
228    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231            stripped = stripped.strip_prefix(DELIMITER)?;
232        }
233        Some(stripped.split_terminator(DELIMITER))
234    }
235
236    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
237    pub async fn list_all_files<'a>(
238        &'a self,
239        ctx: &'a dyn Session,
240        store: &'a dyn ObjectStore,
241        file_extension: &'a str,
242    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243        let exec_options = &ctx.config_options().execution;
244        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245
246        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
247            list_with_cache(ctx, store, &self.prefix).await?
248        } else {
249            match store.head(&self.prefix).await {
250                Ok(meta) => futures::stream::once(async { Ok(meta) })
251                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
252                    .boxed(),
253                // If the head command fails, it is likely that object doesn't exist.
254                // Retry as though it were a prefix (aka a collection)
255                Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
256            }
257        };
258
259        Ok(list
260            .try_filter(move |meta| {
261                let path = &meta.location;
262                let extension_match = path.as_ref().ends_with(file_extension);
263                let glob_match = self.contains(path, ignore_subdirectory);
264                futures::future::ready(extension_match && glob_match)
265            })
266            .boxed())
267    }
268
269    /// Returns this [`ListingTableUrl`] as a string
270    pub fn as_str(&self) -> &str {
271        self.as_ref()
272    }
273
274    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
275    pub fn object_store(&self) -> ObjectStoreUrl {
276        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
277        ObjectStoreUrl::parse(url).unwrap()
278    }
279
280    /// Returns true if the [`ListingTableUrl`] points to the folder
281    pub fn is_folder(&self) -> bool {
282        self.url.scheme() == "file" && self.is_collection()
283    }
284
285    /// Return the `url` for [`ListingTableUrl`]
286    pub fn get_url(&self) -> &Url {
287        &self.url
288    }
289
290    /// Return the `glob` for [`ListingTableUrl`]
291    pub fn get_glob(&self) -> &Option<Pattern> {
292        &self.glob
293    }
294
295    /// Returns a copy of current [`ListingTableUrl`] with a specified `glob`
296    pub fn with_glob(self, glob: &str) -> Result<Self> {
297        let glob =
298            Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
299        Self::try_new(self.url, Some(glob))
300    }
301}
302
303async fn list_with_cache<'b>(
304    ctx: &'b dyn Session,
305    store: &'b dyn ObjectStore,
306    prefix: &'b Path,
307) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
308    match ctx.runtime_env().cache_manager.get_list_files_cache() {
309        None => Ok(store
310            .list(Some(prefix))
311            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
312            .boxed()),
313        Some(cache) => {
314            let vec = if let Some(res) = cache.get(prefix) {
315                debug!("Hit list all files cache");
316                res.as_ref().clone()
317            } else {
318                let vec = store
319                    .list(Some(prefix))
320                    .try_collect::<Vec<ObjectMeta>>()
321                    .await?;
322                cache.put(prefix, Arc::new(vec.clone()));
323                vec
324            };
325            Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
326        }
327    }
328}
329
330/// Creates a file URL from a potentially relative filesystem path
331#[cfg(not(target_arch = "wasm32"))]
332fn url_from_filesystem_path(s: &str) -> Option<Url> {
333    let path = std::path::Path::new(s);
334    let is_dir = match path.exists() {
335        true => path.is_dir(),
336        // Fallback to inferring from trailing separator
337        false => std::path::is_separator(s.chars().last()?),
338    };
339
340    let from_absolute_path = |p| {
341        let first = match is_dir {
342            true => Url::from_directory_path(p).ok(),
343            false => Url::from_file_path(p).ok(),
344        }?;
345
346        // By default from_*_path preserve relative path segments
347        // We therefore parse the URL again to resolve these
348        Url::parse(first.as_str()).ok()
349    };
350
351    if path.is_absolute() {
352        return from_absolute_path(path);
353    }
354
355    let absolute = std::env::current_dir().ok()?.join(path);
356    from_absolute_path(&absolute)
357}
358
359impl AsRef<str> for ListingTableUrl {
360    fn as_ref(&self) -> &str {
361        self.url.as_ref()
362    }
363}
364
365impl AsRef<Url> for ListingTableUrl {
366    fn as_ref(&self) -> &Url {
367        &self.url
368    }
369}
370
371impl std::fmt::Display for ListingTableUrl {
372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373        self.as_str().fmt(f)
374    }
375}
376
/// Characters whose presence marks a path as containing a glob expression
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` in front of the first path segment that contains a glob
/// character, returning `None` when `path` contains no glob character.
///
/// The first element of the result is everything up to and including the last
/// path separator before the glob character; the second is the rest. When no
/// separator precedes the glob character, the result is `(".", path)`.
///
/// Path delimiters are determined using [`std::path::is_separator`] which
/// permits `/` as a path delimiter even on Windows platforms.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    // Byte offset of the first glob character; nothing to split without one
    let glob_at = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;

    // Byte offset just past the last separator in front of the glob character,
    // or 0 when there is none
    let split_at = path[..glob_at]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map_or(0, |(byte_idx, c)| byte_idx + c.len_utf8());

    if split_at == 0 {
        Some((".", path))
    } else {
        Some(path.split_at(split_at))
    }
}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use datafusion_common::config::TableOptions;
409    use datafusion_common::DFSchema;
410    use datafusion_execution::config::SessionConfig;
411    use datafusion_execution::runtime_env::RuntimeEnv;
412    use datafusion_execution::TaskContext;
413    use datafusion_expr::execution_props::ExecutionProps;
414    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
415    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
416    use datafusion_physical_plan::ExecutionPlan;
417    use object_store::PutPayload;
418    use std::any::Any;
419    use std::collections::HashMap;
420    use tempfile::tempdir;
421
422    #[test]
423    fn test_prefix_path() {
424        let root = std::env::current_dir().unwrap();
425        let root = root.to_string_lossy();
426
427        let url = ListingTableUrl::parse(root).unwrap();
428        let child = url.prefix.child("partition").child("file");
429
430        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
431        assert_eq!(prefix, vec!["partition", "file"]);
432
433        let url = ListingTableUrl::parse("file:///").unwrap();
434        let child = Path::parse("/foo/bar").unwrap();
435        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
436        assert_eq!(prefix, vec!["foo", "bar"]);
437
438        let url = ListingTableUrl::parse("file:///foo").unwrap();
439        let child = Path::parse("/foob/bar").unwrap();
440        assert!(url.strip_prefix(&child).is_none());
441
442        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
443        let child = Path::parse("/foo/file").unwrap();
444        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
445
446        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
447        assert_eq!(url.prefix.as_ref(), "foo/ bar");
448
449        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
450        assert_eq!(url.prefix.as_ref(), "foo/bar");
451
452        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
453        assert_eq!(url.prefix.as_ref(), "foo/😺");
454
455        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
456        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
457
458        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
459        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
460
461        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
462        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
463
464        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
465        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
466
467        let dir = tempdir().unwrap();
468        let path = dir.path().join("bar%2Ffoo");
469        std::fs::File::create(&path).unwrap();
470
471        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
472        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
473
474        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
475        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
476
477        let url =
478            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
479        assert_eq!(url.prefix.as_ref(), "baz/test.txt");
480
481        let workdir = std::env::current_dir().unwrap();
482        let t = workdir.join("non-existent");
483        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
484        let b = ListingTableUrl::parse("non-existent").unwrap();
485        assert_eq!(a, b);
486        assert!(a.prefix.as_ref().ends_with("non-existent"));
487
488        let t = workdir.parent().unwrap();
489        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
490        let b = ListingTableUrl::parse("..").unwrap();
491        assert_eq!(a, b);
492
493        let t = t.join("bar");
494        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
495        let b = ListingTableUrl::parse("../bar").unwrap();
496        assert_eq!(a, b);
497        assert!(a.prefix.as_ref().ends_with("bar"));
498
499        let t = t.join(".").join("foo").join("..").join("baz");
500        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
501        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
502        assert_eq!(a, b);
503        assert!(a.prefix.as_ref().ends_with("bar/baz"));
504    }
505
506    #[test]
507    fn test_prefix_s3() {
508        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
509        assert_eq!(url.prefix.as_ref(), "foo/bar");
510
511        let path = Path::from("foo/bar/partition/foo.parquet");
512        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
513        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
514
515        let path = Path::from("other/bar/partition/foo.parquet");
516        assert!(url.strip_prefix(&path).is_none());
517    }
518
519    #[test]
520    fn test_split_glob() {
521        fn test(input: &str, expected: Option<(&str, &str)>) {
522            assert_eq!(
523                split_glob_expression(input),
524                expected,
525                "testing split_glob_expression with {input}"
526            );
527        }
528
529        // no glob patterns
530        test("/", None);
531        test("/a.txt", None);
532        test("/a", None);
533        test("/a/", None);
534        test("/a/b", None);
535        test("/a/b/", None);
536        test("/a/b.txt", None);
537        test("/a/b/c.txt", None);
538        // glob patterns, thus we build the longest path (os-specific)
539        test("*.txt", Some((".", "*.txt")));
540        test("/*.txt", Some(("/", "*.txt")));
541        test("/a/*b.txt", Some(("/a/", "*b.txt")));
542        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
543        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
544        test("/a/b*.txt", Some(("/a/", "b*.txt")));
545        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
546
547        // https://github.com/apache/datafusion/issues/2465
548        test(
549            "/a/b/c//alltypes_plain*.parquet",
550            Some(("/a/b/c//", "alltypes_plain*.parquet")),
551        );
552    }
553
554    #[test]
555    fn test_is_collection() {
556        fn test(input: &str, expected: bool, message: &str) {
557            let url = ListingTableUrl::parse(input).unwrap();
558            assert_eq!(url.is_collection(), expected, "{message}");
559        }
560
561        test("https://a.b.c/path/", true, "path ends with / - collection");
562        test(
563            "https://a.b.c/path/?a=b",
564            true,
565            "path ends with / - with query args - collection",
566        );
567        test(
568            "https://a.b.c/path?a=b/",
569            false,
570            "path not ends with / - query ends with / - not collection",
571        );
572        test(
573            "https://a.b.c/path/#a=b",
574            true,
575            "path ends with / - with fragment - collection",
576        );
577        test(
578            "https://a.b.c/path#a=b/",
579            false,
580            "path not ends with / - fragment ends with / - not collection",
581        );
582    }
583
584    #[test]
585    fn test_file_extension() {
586        fn test(input: &str, expected: Option<&str>, message: &str) {
587            let url = ListingTableUrl::parse(input).unwrap();
588            assert_eq!(url.file_extension(), expected, "{message}");
589        }
590
591        test("https://a.b.c/path/", None, "path ends with / - not a file");
592        test(
593            "https://a.b.c/path/?a=b",
594            None,
595            "path ends with / - with query args - not a file",
596        );
597        test(
598            "https://a.b.c/path?a=b/",
599            None,
600            "path not ends with / - query ends with / but no file extension",
601        );
602        test(
603            "https://a.b.c/path/#a=b",
604            None,
605            "path ends with / - with fragment - not a file",
606        );
607        test(
608            "https://a.b.c/path#a=b/",
609            None,
610            "path not ends with / - fragment ends with / but no file extension",
611        );
612        test(
613            "file///some/path/",
614            None,
615            "file path ends with / - not a file",
616        );
617        test(
618            "file///some/path/file",
619            None,
620            "file path does not end with - no extension",
621        );
622        test(
623            "file///some/path/file.",
624            None,
625            "file path ends with . - no value after .",
626        );
627        test(
628            "file///some/path/file.ext",
629            Some("ext"),
630            "file path ends with .ext - extension is ext",
631        );
632    }
633
634    #[tokio::test]
635    async fn test_list_files() {
636        let store = object_store::memory::InMemory::new();
637        // Create some files:
638        create_file(&store, "a.parquet").await;
639        create_file(&store, "/t/b.parquet").await;
640        create_file(&store, "/t/c.csv").await;
641        create_file(&store, "/t/d.csv").await;
642
643        assert_eq!(
644            list_all_files("/", &store, "parquet").await,
645            vec!["a.parquet"],
646        );
647
648        // test with and without trailing slash
649        assert_eq!(
650            list_all_files("/t/", &store, "parquet").await,
651            vec!["t/b.parquet"],
652        );
653        assert_eq!(
654            list_all_files("/t", &store, "parquet").await,
655            vec!["t/b.parquet"],
656        );
657
658        // test with and without trailing slash
659        assert_eq!(
660            list_all_files("/t", &store, "csv").await,
661            vec!["t/c.csv", "t/d.csv"],
662        );
663        assert_eq!(
664            list_all_files("/t/", &store, "csv").await,
665            vec!["t/c.csv", "t/d.csv"],
666        );
667
668        // Test a non existing prefix
669        assert_eq!(
670            list_all_files("/NonExisting", &store, "csv").await,
671            vec![] as Vec<String>
672        );
673        assert_eq!(
674            list_all_files("/NonExisting/", &store, "csv").await,
675            vec![] as Vec<String>
676        );
677    }
678
679    /// Creates a file with "hello world" content at the specified path
680    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
681        object_store
682            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
683            .await
684            .expect("failed to create test file");
685    }
686
687    /// Runs "list_all_files" and returns their paths
688    ///
689    /// Panic's on error
690    async fn list_all_files(
691        url: &str,
692        store: &dyn ObjectStore,
693        file_extension: &str,
694    ) -> Vec<String> {
695        try_list_all_files(url, store, file_extension)
696            .await
697            .unwrap()
698    }
699
700    /// Runs "list_all_files" and returns their paths
701    async fn try_list_all_files(
702        url: &str,
703        store: &dyn ObjectStore,
704        file_extension: &str,
705    ) -> Result<Vec<String>> {
706        let session = MockSession::new();
707        let url = ListingTableUrl::parse(url)?;
708        let files = url
709            .list_all_files(&session, store, file_extension)
710            .await?
711            .try_collect::<Vec<_>>()
712            .await?
713            .into_iter()
714            .map(|meta| meta.location.as_ref().to_string())
715            .collect();
716        Ok(files)
717    }
718
719    struct MockSession {
720        config: SessionConfig,
721        runtime_env: Arc<RuntimeEnv>,
722    }
723
724    impl MockSession {
725        fn new() -> Self {
726            Self {
727                config: SessionConfig::new(),
728                runtime_env: Arc::new(RuntimeEnv::default()),
729            }
730        }
731    }
732
733    #[async_trait::async_trait]
734    impl Session for MockSession {
735        fn session_id(&self) -> &str {
736            unimplemented!()
737        }
738
739        fn config(&self) -> &SessionConfig {
740            &self.config
741        }
742
743        async fn create_physical_plan(
744            &self,
745            _logical_plan: &LogicalPlan,
746        ) -> Result<Arc<dyn ExecutionPlan>> {
747            unimplemented!()
748        }
749
750        fn create_physical_expr(
751            &self,
752            _expr: Expr,
753            _df_schema: &DFSchema,
754        ) -> Result<Arc<dyn PhysicalExpr>> {
755            unimplemented!()
756        }
757
758        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
759            unimplemented!()
760        }
761
762        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
763            unimplemented!()
764        }
765
766        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
767            unimplemented!()
768        }
769
770        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
771            &self.runtime_env
772        }
773
774        fn execution_props(&self) -> &ExecutionProps {
775            unimplemented!()
776        }
777
778        fn as_any(&self) -> &dyn Any {
779            unimplemented!()
780        }
781
782        fn table_options(&self) -> &TableOptions {
783            unimplemented!()
784        }
785
786        fn table_options_mut(&mut self) -> &mut TableOptions {
787            unimplemented!()
788        }
789
790        fn task_ctx(&self) -> Arc<TaskContext> {
791            unimplemented!()
792        }
793    }
794}