datafusion_datasource/
url.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
34/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
35/// for more information on the supported expressions
36#[derive(Debug, Clone, Eq, PartialEq, Hash)]
37pub struct ListingTableUrl {
38    /// A URL that identifies a file or directory to list files from
39    url: Url,
40    /// The path prefix
41    prefix: Path,
42    /// An optional glob expression used to filter files
43    glob: Option<Pattern>,
44}
45
46impl ListingTableUrl {
47    /// Parse a provided string as a `ListingTableUrl`
48    ///
49    /// A URL can either refer to a single object, or a collection of objects with a
50    /// common prefix, with the presence of a trailing `/` indicating a collection.
51    ///
52    /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
53    /// `file:///foo/` refers to all the files under the directory `/foo` and its
54    /// subdirectories.
55    ///
56    /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
57    /// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
58    /// S3 bucket `BUCKET`
59    ///
60    /// # URL Encoding
61    ///
62    /// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
63    /// would be `file:///bar%252Efoo`, as per the [URL] specification.
64    ///
65    /// It should be noted that some tools, such as the AWS CLI, take a different approach and
66    /// instead interpret the URL path verbatim. For example the object `bar%2Efoo` would be
67    /// addressed as `s3://BUCKET/bar%252Efoo` using [`ListingTableUrl`] but `s3://BUCKET/bar%2Efoo`
68    /// when using the aws-cli.
69    ///
70    /// # Paths without a Scheme
71    ///
72    /// If no scheme is provided, or the string is an absolute filesystem path
73    /// as determined by [`std::path::Path::is_absolute`], the string will be
74    /// interpreted as a path on the local filesystem using the operating
75    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
76    ///
77    /// If the path contains any of `'?', '*', '['`, it will be considered
78    /// a glob expression and resolved as described in the section below.
79    ///
80    /// Otherwise, the path will be resolved to an absolute path based on the current
81    /// working directory, and converted to a [file URI].
82    ///
83    /// If the path already exists in the local filesystem this will be used to determine if this
84    /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
85    /// of a trailing path delimiter will be used to indicate a directory. For the avoidance
86    /// of ambiguity it is recommended users always include trailing `/` when intending to
87    /// refer to a directory.
88    ///
89    /// ## Glob File Paths
90    ///
91    /// If no scheme is provided, and the path contains a glob expression, it will
92    /// be resolved as follows.
93    ///
94    /// The string up to the first path segment containing a glob expression will be extracted,
95    /// and resolved in the same manner as a normal scheme-less path above.
96    ///
97    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
98    /// filter when listing files from object storage
99    ///
100    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
101    /// [URL]: https://url.spec.whatwg.org/
102    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103        let s = s.as_ref();
104
105        // This is necessary to handle the case of a path starting with a drive letter
106        #[cfg(not(target_arch = "wasm32"))]
107        if std::path::Path::new(s).is_absolute() {
108            return Self::parse_path(s);
109        }
110
111        match Url::parse(s) {
112            Ok(url) => Self::try_new(url, None),
113            #[cfg(not(target_arch = "wasm32"))]
114            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115            Err(e) => Err(DataFusionError::External(Box::new(e))),
116        }
117    }
118
119    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
120    #[cfg(not(target_arch = "wasm32"))]
121    fn parse_path(s: &str) -> Result<Self> {
122        let (path, glob) = match split_glob_expression(s) {
123            Some((prefix, glob)) => {
124                let glob = Pattern::new(glob)
125                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
126                (prefix, Some(glob))
127            }
128            None => (s, None),
129        };
130
131        let url = url_from_filesystem_path(path).ok_or_else(|| {
132            DataFusionError::External(
133                format!("Failed to convert path to URL: {path}").into(),
134            )
135        })?;
136
137        Self::try_new(url, glob)
138    }
139
140    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
141    ///
142    /// [`Self::parse`] supports glob expression only for file system paths.
143    /// However, some applications may want to support glob expression for URLs with a scheme.
144    /// The application can split the URL into a base URL and a glob expression and use this method
145    /// to create a [`ListingTableUrl`].
146    pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147        let prefix = Path::from_url_path(url.path())?;
148        Ok(Self { url, prefix, glob })
149    }
150
151    /// Returns the URL scheme
152    pub fn scheme(&self) -> &str {
153        self.url.scheme()
154    }
155
156    /// Return the URL path not excluding any glob expression
157    ///
158    /// If [`Self::is_collection`], this is the listing prefix
159    /// Otherwise, this is the path to the object
160    pub fn prefix(&self) -> &Path {
161        &self.prefix
162    }
163
164    /// Returns `true` if `path` matches this [`ListingTableUrl`]
165    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166        let Some(all_segments) = self.strip_prefix(path) else {
167            return false;
168        };
169
170        // remove any segments that contain `=` as they are allowed even
171        // when ignore subdirectories is `true`.
172        let mut segments = all_segments.filter(|s| !s.contains('='));
173
174        match &self.glob {
175            Some(glob) => {
176                if ignore_subdirectory {
177                    segments
178                        .next()
179                        .is_some_and(|file_name| glob.matches(file_name))
180                } else {
181                    let stripped = segments.join(DELIMITER);
182                    glob.matches(&stripped)
183                }
184            }
185            // where we are ignoring subdirectories, we require
186            // the path to be either empty, or contain just the
187            // final file name segment.
188            None if ignore_subdirectory => segments.count() <= 1,
189            // in this case, any valid path at or below the url is allowed
190            None => true,
191        }
192    }
193
194    /// Returns `true` if `path` refers to a collection of objects
195    pub fn is_collection(&self) -> bool {
196        self.url.path().ends_with(DELIMITER)
197    }
198
199    /// Returns the file extension of the last path segment if it exists
200    ///
201    /// Examples:
202    /// ```rust
203    /// use datafusion_datasource::ListingTableUrl;
204    /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
205    /// assert_eq!(url.file_extension(), Some("csv"));
206    /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
207    /// assert_eq!(url.file_extension(), None);
208    /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
209    /// assert_eq!(url.file_extension(), None);
210    /// ```
211    pub fn file_extension(&self) -> Option<&str> {
212        if let Some(mut segments) = self.url.path_segments() {
213            if let Some(last_segment) = segments.next_back() {
214                if last_segment.contains(".") && !last_segment.ends_with(".") {
215                    return last_segment.split('.').next_back();
216                }
217            }
218        }
219
220        None
221    }
222
223    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
224    /// an iterator of the remaining path segments
225    pub fn strip_prefix<'a, 'b: 'a>(
226        &'a self,
227        path: &'b Path,
228    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231            stripped = stripped.strip_prefix(DELIMITER)?;
232        }
233        Some(stripped.split_terminator(DELIMITER))
234    }
235
236    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
237    pub async fn list_all_files<'a>(
238        &'a self,
239        ctx: &'a dyn Session,
240        store: &'a dyn ObjectStore,
241        file_extension: &'a str,
242    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243        let exec_options = &ctx.config_options().execution;
244        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245        // If the prefix is a file, use a head request, otherwise list
246        let list = match self.is_collection() {
247            true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
248                None => store.list(Some(&self.prefix)),
249                Some(cache) => {
250                    if let Some(res) = cache.get(&self.prefix) {
251                        debug!("Hit list all files cache");
252                        futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
253                            .boxed()
254                    } else {
255                        let list_res = store.list(Some(&self.prefix));
256                        let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
257                        cache.put(&self.prefix, Arc::new(vec.clone()));
258                        futures::stream::iter(vec.into_iter().map(Ok)).boxed()
259                    }
260                }
261            },
262            false => futures::stream::once(store.head(&self.prefix)).boxed(),
263        };
264        Ok(list
265            .try_filter(move |meta| {
266                let path = &meta.location;
267                let extension_match = path.as_ref().ends_with(file_extension);
268                let glob_match = self.contains(path, ignore_subdirectory);
269                futures::future::ready(extension_match && glob_match)
270            })
271            .map_err(DataFusionError::ObjectStore)
272            .boxed())
273    }
274
275    /// Returns this [`ListingTableUrl`] as a string
276    pub fn as_str(&self) -> &str {
277        self.as_ref()
278    }
279
280    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
281    pub fn object_store(&self) -> ObjectStoreUrl {
282        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
283        ObjectStoreUrl::parse(url).unwrap()
284    }
285}
286
287/// Creates a file URL from a potentially relative filesystem path
288#[cfg(not(target_arch = "wasm32"))]
289fn url_from_filesystem_path(s: &str) -> Option<Url> {
290    let path = std::path::Path::new(s);
291    let is_dir = match path.exists() {
292        true => path.is_dir(),
293        // Fallback to inferring from trailing separator
294        false => std::path::is_separator(s.chars().last()?),
295    };
296
297    let from_absolute_path = |p| {
298        let first = match is_dir {
299            true => Url::from_directory_path(p).ok(),
300            false => Url::from_file_path(p).ok(),
301        }?;
302
303        // By default from_*_path preserve relative path segments
304        // We therefore parse the URL again to resolve these
305        Url::parse(first.as_str()).ok()
306    };
307
308    if path.is_absolute() {
309        return from_absolute_path(path);
310    }
311
312    let absolute = std::env::current_dir().ok()?.join(path);
313    from_absolute_path(&absolute)
314}
315
316impl AsRef<str> for ListingTableUrl {
317    fn as_ref(&self) -> &str {
318        self.url.as_ref()
319    }
320}
321
322impl AsRef<Url> for ListingTableUrl {
323    fn as_ref(&self) -> &Url {
324        &self.url
325    }
326}
327
328impl std::fmt::Display for ListingTableUrl {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        self.as_str().fmt(f)
331    }
332}
333
334#[cfg(not(target_arch = "wasm32"))]
335const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
336
337/// Splits `path` at the first path segment containing a glob expression, returning
338/// `None` if no glob expression found.
339///
340/// Path delimiters are determined using [`std::path::is_separator`] which
341/// permits `/` as a path delimiter even on Windows platforms.
342///
343#[cfg(not(target_arch = "wasm32"))]
344fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
345    let mut last_separator = 0;
346
347    for (byte_idx, char) in path.char_indices() {
348        if GLOB_START_CHARS.contains(&char) {
349            if last_separator == 0 {
350                return Some((".", path));
351            }
352            return Some(path.split_at(last_separator));
353        }
354
355        if std::path::is_separator(char) {
356            last_separator = byte_idx + char.len_utf8();
357        }
358    }
359    None
360}
361
362#[cfg(test)]
363mod tests {
364    use super::*;
365    use tempfile::tempdir;
366
367    #[test]
368    fn test_prefix_path() {
369        let root = std::env::current_dir().unwrap();
370        let root = root.to_string_lossy();
371
372        let url = ListingTableUrl::parse(root).unwrap();
373        let child = url.prefix.child("partition").child("file");
374
375        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
376        assert_eq!(prefix, vec!["partition", "file"]);
377
378        let url = ListingTableUrl::parse("file:///").unwrap();
379        let child = Path::parse("/foo/bar").unwrap();
380        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
381        assert_eq!(prefix, vec!["foo", "bar"]);
382
383        let url = ListingTableUrl::parse("file:///foo").unwrap();
384        let child = Path::parse("/foob/bar").unwrap();
385        assert!(url.strip_prefix(&child).is_none());
386
387        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
388        let child = Path::parse("/foo/file").unwrap();
389        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
390
391        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
392        assert_eq!(url.prefix.as_ref(), "foo/ bar");
393
394        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
395        assert_eq!(url.prefix.as_ref(), "foo/bar");
396
397        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
398        assert_eq!(url.prefix.as_ref(), "foo/😺");
399
400        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
401        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
402
403        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
404        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
405
406        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
407        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
408
409        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
410        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
411
412        let dir = tempdir().unwrap();
413        let path = dir.path().join("bar%2Ffoo");
414        std::fs::File::create(&path).unwrap();
415
416        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
417        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
418
419        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
420        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
421
422        let url =
423            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
424        assert_eq!(url.prefix.as_ref(), "baz/test.txt");
425
426        let workdir = std::env::current_dir().unwrap();
427        let t = workdir.join("non-existent");
428        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
429        let b = ListingTableUrl::parse("non-existent").unwrap();
430        assert_eq!(a, b);
431        assert!(a.prefix.as_ref().ends_with("non-existent"));
432
433        let t = workdir.parent().unwrap();
434        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
435        let b = ListingTableUrl::parse("..").unwrap();
436        assert_eq!(a, b);
437
438        let t = t.join("bar");
439        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
440        let b = ListingTableUrl::parse("../bar").unwrap();
441        assert_eq!(a, b);
442        assert!(a.prefix.as_ref().ends_with("bar"));
443
444        let t = t.join(".").join("foo").join("..").join("baz");
445        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
446        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
447        assert_eq!(a, b);
448        assert!(a.prefix.as_ref().ends_with("bar/baz"));
449    }
450
451    #[test]
452    fn test_prefix_s3() {
453        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
454        assert_eq!(url.prefix.as_ref(), "foo/bar");
455
456        let path = Path::from("foo/bar/partition/foo.parquet");
457        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
458        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
459
460        let path = Path::from("other/bar/partition/foo.parquet");
461        assert!(url.strip_prefix(&path).is_none());
462    }
463
464    #[test]
465    fn test_split_glob() {
466        fn test(input: &str, expected: Option<(&str, &str)>) {
467            assert_eq!(
468                split_glob_expression(input),
469                expected,
470                "testing split_glob_expression with {input}"
471            );
472        }
473
474        // no glob patterns
475        test("/", None);
476        test("/a.txt", None);
477        test("/a", None);
478        test("/a/", None);
479        test("/a/b", None);
480        test("/a/b/", None);
481        test("/a/b.txt", None);
482        test("/a/b/c.txt", None);
483        // glob patterns, thus we build the longest path (os-specific)
484        test("*.txt", Some((".", "*.txt")));
485        test("/*.txt", Some(("/", "*.txt")));
486        test("/a/*b.txt", Some(("/a/", "*b.txt")));
487        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
488        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
489        test("/a/b*.txt", Some(("/a/", "b*.txt")));
490        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
491
492        // https://github.com/apache/datafusion/issues/2465
493        test(
494            "/a/b/c//alltypes_plain*.parquet",
495            Some(("/a/b/c//", "alltypes_plain*.parquet")),
496        );
497    }
498
499    #[test]
500    fn test_is_collection() {
501        fn test(input: &str, expected: bool, message: &str) {
502            let url = ListingTableUrl::parse(input).unwrap();
503            assert_eq!(url.is_collection(), expected, "{message}");
504        }
505
506        test("https://a.b.c/path/", true, "path ends with / - collection");
507        test(
508            "https://a.b.c/path/?a=b",
509            true,
510            "path ends with / - with query args - collection",
511        );
512        test(
513            "https://a.b.c/path?a=b/",
514            false,
515            "path not ends with / - query ends with / - not collection",
516        );
517        test(
518            "https://a.b.c/path/#a=b",
519            true,
520            "path ends with / - with fragment - collection",
521        );
522        test(
523            "https://a.b.c/path#a=b/",
524            false,
525            "path not ends with / - fragment ends with / - not collection",
526        );
527    }
528
529    #[test]
530    fn test_file_extension() {
531        fn test(input: &str, expected: Option<&str>, message: &str) {
532            let url = ListingTableUrl::parse(input).unwrap();
533            assert_eq!(url.file_extension(), expected, "{message}");
534        }
535
536        test("https://a.b.c/path/", None, "path ends with / - not a file");
537        test(
538            "https://a.b.c/path/?a=b",
539            None,
540            "path ends with / - with query args - not a file",
541        );
542        test(
543            "https://a.b.c/path?a=b/",
544            None,
545            "path not ends with / - query ends with / but no file extension",
546        );
547        test(
548            "https://a.b.c/path/#a=b",
549            None,
550            "path ends with / - with fragment - not a file",
551        );
552        test(
553            "https://a.b.c/path#a=b/",
554            None,
555            "path not ends with / - fragment ends with / but no file extension",
556        );
557        test(
558            "file///some/path/",
559            None,
560            "file path ends with / - not a file",
561        );
562        test(
563            "file///some/path/file",
564            None,
565            "file path does not end with - no extension",
566        );
567        test(
568            "file///some/path/file.",
569            None,
570            "file path ends with . - no value after .",
571        );
572        test(
573            "file///some/path/file.ext",
574            Some("ext"),
575            "file path ends with .ext - extension is ext",
576        );
577    }
578}