datafusion_datasource/
url.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
34/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
35/// for more information on the supported expressions
36#[derive(Debug, Clone, Eq, PartialEq, Hash)]
37pub struct ListingTableUrl {
38    /// A URL that identifies a file or directory to list files from
39    url: Url,
40    /// The path prefix
41    prefix: Path,
42    /// An optional glob expression used to filter files
43    glob: Option<Pattern>,
44}
45
46impl ListingTableUrl {
47    /// Parse a provided string as a `ListingTableUrl`
48    ///
49    /// A URL can either refer to a single object, or a collection of objects with a
50    /// common prefix, with the presence of a trailing `/` indicating a collection.
51    ///
52    /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
53    /// `file:///foo/` refers to all the files under the directory `/foo` and its
54    /// subdirectories.
55    ///
56    /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
57    /// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
58    /// S3 bucket `BUCKET`
59    ///
60    /// # URL Encoding
61    ///
62    /// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
63    /// would be `file:///bar%252Efoo`, as per the [URL] specification.
64    ///
65    /// It should be noted that some tools, such as the AWS CLI, take a different approach and
66    /// instead interpret the URL path verbatim. For example the object `bar%2Efoo` would be
67    /// addressed as `s3://BUCKET/bar%252Efoo` using [`ListingTableUrl`] but `s3://BUCKET/bar%2Efoo`
68    /// when using the aws-cli.
69    ///
70    /// # Paths without a Scheme
71    ///
72    /// If no scheme is provided, or the string is an absolute filesystem path
73    /// as determined by [`std::path::Path::is_absolute`], the string will be
74    /// interpreted as a path on the local filesystem using the operating
75    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
76    ///
77    /// If the path contains any of `'?', '*', '['`, it will be considered
78    /// a glob expression and resolved as described in the section below.
79    ///
80    /// Otherwise, the path will be resolved to an absolute path based on the current
81    /// working directory, and converted to a [file URI].
82    ///
83    /// If the path already exists in the local filesystem this will be used to determine if this
84    /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
85    /// of a trailing path delimiter will be used to indicate a directory. For the avoidance
86    /// of ambiguity it is recommended users always include trailing `/` when intending to
87    /// refer to a directory.
88    ///
89    /// ## Glob File Paths
90    ///
91    /// If no scheme is provided, and the path contains a glob expression, it will
92    /// be resolved as follows.
93    ///
94    /// The string up to the first path segment containing a glob expression will be extracted,
95    /// and resolved in the same manner as a normal scheme-less path above.
96    ///
97    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
98    /// filter when listing files from object storage
99    ///
100    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
101    /// [URL]: https://url.spec.whatwg.org/
102    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103        let s = s.as_ref();
104
105        // This is necessary to handle the case of a path starting with a drive letter
106        #[cfg(not(target_arch = "wasm32"))]
107        if std::path::Path::new(s).is_absolute() {
108            return Self::parse_path(s);
109        }
110
111        match Url::parse(s) {
112            Ok(url) => Self::try_new(url, None),
113            #[cfg(not(target_arch = "wasm32"))]
114            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115            Err(e) => Err(DataFusionError::External(Box::new(e))),
116        }
117    }
118
119    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
120    #[cfg(not(target_arch = "wasm32"))]
121    fn parse_path(s: &str) -> Result<Self> {
122        let (path, glob) = match split_glob_expression(s) {
123            Some((prefix, glob)) => {
124                let glob = Pattern::new(glob)
125                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
126                (prefix, Some(glob))
127            }
128            None => (s, None),
129        };
130
131        let url = url_from_filesystem_path(path).ok_or_else(|| {
132            DataFusionError::External(
133                format!("Failed to convert path to URL: {path}").into(),
134            )
135        })?;
136
137        Self::try_new(url, glob)
138    }
139
140    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
141    ///
142    /// [`Self::parse`] supports glob expression only for file system paths.
143    /// However, some applications may want to support glob expression for URLs with a scheme.
144    /// The application can split the URL into a base URL and a glob expression and use this method
145    /// to create a [`ListingTableUrl`].
146    pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147        let prefix = Path::from_url_path(url.path())?;
148        Ok(Self { url, prefix, glob })
149    }
150
151    /// Returns the URL scheme
152    pub fn scheme(&self) -> &str {
153        self.url.scheme()
154    }
155
156    /// Return the URL path not excluding any glob expression
157    ///
158    /// If [`Self::is_collection`], this is the listing prefix
159    /// Otherwise, this is the path to the object
160    pub fn prefix(&self) -> &Path {
161        &self.prefix
162    }
163
164    /// Returns `true` if `path` matches this [`ListingTableUrl`]
165    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166        let Some(all_segments) = self.strip_prefix(path) else {
167            return false;
168        };
169
170        // remove any segments that contain `=` as they are allowed even
171        // when ignore subdirectories is `true`.
172        let mut segments = all_segments.filter(|s| !s.contains('='));
173
174        match &self.glob {
175            Some(glob) => {
176                if ignore_subdirectory {
177                    segments
178                        .next()
179                        .is_some_and(|file_name| glob.matches(file_name))
180                } else {
181                    let stripped = segments.join(DELIMITER);
182                    glob.matches(&stripped)
183                }
184            }
185            // where we are ignoring subdirectories, we require
186            // the path to be either empty, or contain just the
187            // final file name segment.
188            None if ignore_subdirectory => segments.count() <= 1,
189            // in this case, any valid path at or below the url is allowed
190            None => true,
191        }
192    }
193
194    /// Returns `true` if `path` refers to a collection of objects
195    pub fn is_collection(&self) -> bool {
196        self.url.path().ends_with(DELIMITER)
197    }
198
199    /// Returns the file extension of the last path segment if it exists
200    ///
201    /// Examples:
202    /// ```rust
203    /// use datafusion_datasource::ListingTableUrl;
204    /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
205    /// assert_eq!(url.file_extension(), Some("csv"));
206    /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
207    /// assert_eq!(url.file_extension(), None);
208    /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
209    /// assert_eq!(url.file_extension(), None);
210    /// ```
211    pub fn file_extension(&self) -> Option<&str> {
212        if let Some(mut segments) = self.url.path_segments() {
213            if let Some(last_segment) = segments.next_back() {
214                if last_segment.contains(".") && !last_segment.ends_with(".") {
215                    return last_segment.split('.').next_back();
216                }
217            }
218        }
219
220        None
221    }
222
223    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
224    /// an iterator of the remaining path segments
225    pub fn strip_prefix<'a, 'b: 'a>(
226        &'a self,
227        path: &'b Path,
228    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231            stripped = stripped.strip_prefix(DELIMITER)?;
232        }
233        Some(stripped.split_terminator(DELIMITER))
234    }
235
236    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
237    pub async fn list_all_files<'a>(
238        &'a self,
239        ctx: &'a dyn Session,
240        store: &'a dyn ObjectStore,
241        file_extension: &'a str,
242    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243        let exec_options = &ctx.config_options().execution;
244        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245
246        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
247            list_with_cache(ctx, store, &self.prefix).await?
248        } else {
249            match store.head(&self.prefix).await {
250                Ok(meta) => futures::stream::once(async { Ok(meta) })
251                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
252                    .boxed(),
253                // If the head command fails, it is likely that object doesn't exist.
254                // Retry as though it were a prefix (aka a collection)
255                Err(object_store::Error::NotFound { .. }) => {
256                    list_with_cache(ctx, store, &self.prefix).await?
257                }
258                Err(e) => return Err(e.into()),
259            }
260        };
261
262        Ok(list
263            .try_filter(move |meta| {
264                let path = &meta.location;
265                let extension_match = path.as_ref().ends_with(file_extension);
266                let glob_match = self.contains(path, ignore_subdirectory);
267                futures::future::ready(extension_match && glob_match)
268            })
269            .boxed())
270    }
271
272    /// Returns this [`ListingTableUrl`] as a string
273    pub fn as_str(&self) -> &str {
274        self.as_ref()
275    }
276
277    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
278    pub fn object_store(&self) -> ObjectStoreUrl {
279        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
280        ObjectStoreUrl::parse(url).unwrap()
281    }
282
283    /// Returns true if the [`ListingTableUrl`] points to the folder
284    pub fn is_folder(&self) -> bool {
285        self.url.scheme() == "file" && self.is_collection()
286    }
287
288    /// Return the `url` for [`ListingTableUrl`]
289    pub fn get_url(&self) -> &Url {
290        &self.url
291    }
292
293    /// Return the `glob` for [`ListingTableUrl`]
294    pub fn get_glob(&self) -> &Option<Pattern> {
295        &self.glob
296    }
297
298    /// Returns a copy of current [`ListingTableUrl`] with a specified `glob`
299    pub fn with_glob(self, glob: &str) -> Result<Self> {
300        let glob =
301            Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
302        Self::try_new(self.url, Some(glob))
303    }
304}
305
306async fn list_with_cache<'b>(
307    ctx: &'b dyn Session,
308    store: &'b dyn ObjectStore,
309    prefix: &'b Path,
310) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
311    match ctx.runtime_env().cache_manager.get_list_files_cache() {
312        None => Ok(store
313            .list(Some(prefix))
314            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
315            .boxed()),
316        Some(cache) => {
317            let vec = if let Some(res) = cache.get(prefix) {
318                debug!("Hit list all files cache");
319                res.as_ref().clone()
320            } else {
321                let vec = store
322                    .list(Some(prefix))
323                    .try_collect::<Vec<ObjectMeta>>()
324                    .await?;
325                cache.put(prefix, Arc::new(vec.clone()));
326                vec
327            };
328            Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
329        }
330    }
331}
332
333/// Creates a file URL from a potentially relative filesystem path
334#[cfg(not(target_arch = "wasm32"))]
335fn url_from_filesystem_path(s: &str) -> Option<Url> {
336    let path = std::path::Path::new(s);
337    let is_dir = match path.exists() {
338        true => path.is_dir(),
339        // Fallback to inferring from trailing separator
340        false => std::path::is_separator(s.chars().last()?),
341    };
342
343    let from_absolute_path = |p| {
344        let first = match is_dir {
345            true => Url::from_directory_path(p).ok(),
346            false => Url::from_file_path(p).ok(),
347        }?;
348
349        // By default from_*_path preserve relative path segments
350        // We therefore parse the URL again to resolve these
351        Url::parse(first.as_str()).ok()
352    };
353
354    if path.is_absolute() {
355        return from_absolute_path(path);
356    }
357
358    let absolute = std::env::current_dir().ok()?.join(path);
359    from_absolute_path(&absolute)
360}
361
362impl AsRef<str> for ListingTableUrl {
363    fn as_ref(&self) -> &str {
364        self.url.as_ref()
365    }
366}
367
368impl AsRef<Url> for ListingTableUrl {
369    fn as_ref(&self) -> &Url {
370        &self.url
371    }
372}
373
374impl std::fmt::Display for ListingTableUrl {
375    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376        self.as_str().fmt(f)
377    }
378}
379
/// Characters whose presence marks the start of a glob expression in a path.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the first path segment containing a glob expression, returning
/// `None` if no glob expression found.
///
/// The first element is everything up to and including the last separator
/// before the first glob character. It is `"."` when the glob occurs in the
/// first segment. The second element is the rest of `path`.
///
/// Path delimiters are determined using [`std::path::is_separator`] which
/// permits `/` as a path delimiter even on Windows platforms.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;

    // Byte offset just past the last separator preceding the glob character.
    let prefix_len = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map_or(0, |(idx, sep)| idx + sep.len_utf8());

    match prefix_len {
        0 => Some((".", path)),
        n => Some(path.split_at(n)),
    }
}
406
// Unit tests for `ListingTableUrl` parsing, prefix handling and file listing.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion_common::config::TableOptions;
    use datafusion_common::DFSchema;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_execution::TaskContext;
    use datafusion_expr::execution_props::ExecutionProps;
    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;
    use object_store::{
        GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
        PutPayload,
    };
    use std::any::Any;
    use std::collections::HashMap;
    use std::ops::Range;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        // no glob patterns
        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        // glob patterns, thus we build the longest path (os-specific)
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));

        // https://github.com/apache/datafusion/issues/2465
        test(
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        // These use a real `file://` scheme. Without the `:`, the inputs would
        // be treated as relative filesystem paths rather than parsed as URLs.
        test(
            "file:///some/path/",
            None,
            "file path ends with / - not a file",
        );
        test(
            "file:///some/path/file",
            None,
            "file path has no extension - no extension",
        );
        test(
            "file:///some/path/file.",
            None,
            "file path ends with . - no value after .",
        );
        test(
            "file:///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        );
    }

    #[tokio::test]
    async fn test_list_files() -> Result<()> {
        let store = MockObjectStore {
            in_mem: object_store::memory::InMemory::new(),
            forbidden_paths: vec!["forbidden/e.parquet".into()],
        };

        // Create some files:
        create_file(&store, "a.parquet").await;
        create_file(&store, "/t/b.parquet").await;
        create_file(&store, "/t/c.csv").await;
        create_file(&store, "/t/d.csv").await;

        // This file returns a permission error.
        create_file(&store, "/forbidden/e.parquet").await;

        assert_eq!(
            list_all_files("/", &store, "parquet").await?,
            vec!["a.parquet"],
        );

        // test with and without trailing slash
        assert_eq!(
            list_all_files("/t/", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );
        assert_eq!(
            list_all_files("/t", &store, "parquet").await?,
            vec!["t/b.parquet"],
        );

        // test with and without trailing slash
        assert_eq!(
            list_all_files("/t", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );
        assert_eq!(
            list_all_files("/t/", &store, "csv").await?,
            vec!["t/c.csv", "t/d.csv"],
        );

        // Test a non existing prefix
        assert_eq!(
            list_all_files("/NonExisting", &store, "csv").await?,
            vec![] as Vec<String>
        );
        assert_eq!(
            list_all_files("/NonExisting/", &store, "csv").await?,
            vec![] as Vec<String>
        );

        // Listing forbidden/e.parquet directly issues a HEAD on a forbidden
        // path, whose PermissionDenied error must be surfaced.
        let Err(DataFusionError::ObjectStore(err)) =
            list_all_files("/forbidden/e.parquet", &store, "parquet").await
        else {
            panic!("Expected ObjectStore error");
        };

        let object_store::Error::PermissionDenied { .. } = &*err else {
            panic!("Expected PermissionDenied error");
        };

        Ok(())
    }

    /// Creates a file with "hello world" content at the specified path
    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
        object_store
            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
            .await
            .expect("failed to create test file");
    }

    /// Runs "list_all_files" and returns their paths
    ///
    /// Thin wrapper around [`try_list_all_files`]; errors are returned to the
    /// caller rather than causing a panic.
    async fn list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        try_list_all_files(url, store, file_extension).await
    }

    /// Runs "list_all_files" and returns their paths
    async fn try_list_all_files(
        url: &str,
        store: &dyn ObjectStore,
        file_extension: &str,
    ) -> Result<Vec<String>> {
        let session = MockSession::new();
        let url = ListingTableUrl::parse(url)?;
        let files = url
            .list_all_files(&session, store, file_extension)
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|meta| meta.location.as_ref().to_string())
            .collect();
        Ok(files)
    }

    /// Object store that delegates to an in-memory store, except that `head`
    /// on any path in `forbidden_paths` fails with `PermissionDenied`.
    #[derive(Debug)]
    struct MockObjectStore {
        in_mem: object_store::memory::InMemory,
        forbidden_paths: Vec<Path>,
    }

    impl std::fmt::Display for MockObjectStore {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.in_mem.fmt(f)
        }
    }

    #[async_trait]
    impl ObjectStore for MockObjectStore {
        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: object_store::PutOptions,
        ) -> object_store::Result<object_store::PutResult> {
            self.in_mem.put_opts(location, payload, opts).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.in_mem.put_multipart_opts(location, opts).await
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.in_mem.get_opts(location, options).await
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.in_mem.get_ranges(location, ranges).await
        }

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            if self.forbidden_paths.contains(location) {
                Err(object_store::Error::PermissionDenied {
                    path: location.to_string(),
                    source: "forbidden".into(),
                })
            } else {
                self.in_mem.head(location).await
            }
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.in_mem.delete(location).await
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.in_mem.list(prefix)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.in_mem.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.in_mem.copy(from, to).await
        }

        async fn copy_if_not_exists(
            &self,
            from: &Path,
            to: &Path,
        ) -> object_store::Result<()> {
            self.in_mem.copy_if_not_exists(from, to).await
        }
    }

    /// Minimal [`Session`]: only `config` and `runtime_env` are functional;
    /// every other method is `unimplemented!()`.
    struct MockSession {
        config: SessionConfig,
        runtime_env: Arc<RuntimeEnv>,
    }

    impl MockSession {
        fn new() -> Self {
            Self {
                config: SessionConfig::new(),
                runtime_env: Arc::new(RuntimeEnv::default()),
            }
        }
    }

    #[async_trait::async_trait]
    impl Session for MockSession {
        fn session_id(&self) -> &str {
            unimplemented!()
        }

        fn config(&self) -> &SessionConfig {
            &self.config
        }

        async fn create_physical_plan(
            &self,
            _logical_plan: &LogicalPlan,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn create_physical_expr(
            &self,
            _expr: Expr,
            _df_schema: &DFSchema,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            unimplemented!()
        }

        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
            unimplemented!()
        }

        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
            unimplemented!()
        }

        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
            unimplemented!()
        }

        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
            &self.runtime_env
        }

        fn execution_props(&self) -> &ExecutionProps {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }

        fn table_options(&self) -> &TableOptions {
            unimplemented!()
        }

        fn table_options_mut(&mut self) -> &mut TableOptions {
            unimplemented!()
        }

        fn task_ctx(&self) -> Arc<TaskContext> {
            unimplemented!()
        }
    }
}