datafusion_datasource/
url.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_catalog::Session;
19use datafusion_common::{DataFusionError, Result};
20use datafusion_execution::object_store::ObjectStoreUrl;
21use futures::stream::BoxStream;
22use futures::{StreamExt, TryStreamExt};
23use glob::Pattern;
24use itertools::Itertools;
25use log::debug;
26use object_store::path::Path;
27use object_store::path::DELIMITER;
28use object_store::{ObjectMeta, ObjectStore};
29use std::sync::Arc;
30use url::Url;
31
32/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
33/// for more information on the supported expressions
34#[derive(Debug, Clone, Eq, PartialEq, Hash)]
35pub struct ListingTableUrl {
36    /// A URL that identifies a file or directory to list files from
37    url: Url,
38    /// The path prefix
39    prefix: Path,
40    /// An optional glob expression used to filter files
41    glob: Option<Pattern>,
42}
43
44impl ListingTableUrl {
45    /// Parse a provided string as a `ListingTableUrl`
46    ///
47    /// A URL can either refer to a single object, or a collection of objects with a
48    /// common prefix, with the presence of a trailing `/` indicating a collection.
49    ///
50    /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
51    /// `file:///foo/` refers to all the files under the directory `/foo` and its
52    /// subdirectories.
53    ///
54    /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
55    /// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
56    /// S3 bucket `BUCKET`
57    ///
58    /// # URL Encoding
59    ///
60    /// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
61    /// would be `file:///bar%252Efoo`, as per the [URL] specification.
62    ///
63    /// It should be noted that some tools, such as the AWS CLI, take a different approach and
64    /// instead interpret the URL path verbatim. For example the object `bar%2Efoo` would be
65    /// addressed as `s3://BUCKET/bar%252Efoo` using [`ListingTableUrl`] but `s3://BUCKET/bar%2Efoo`
66    /// when using the aws-cli.
67    ///
68    /// # Paths without a Scheme
69    ///
70    /// If no scheme is provided, or the string is an absolute filesystem path
71    /// as determined by [`std::path::Path::is_absolute`], the string will be
72    /// interpreted as a path on the local filesystem using the operating
73    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
74    ///
75    /// If the path contains any of `'?', '*', '['`, it will be considered
76    /// a glob expression and resolved as described in the section below.
77    ///
78    /// Otherwise, the path will be resolved to an absolute path based on the current
79    /// working directory, and converted to a [file URI].
80    ///
81    /// If the path already exists in the local filesystem this will be used to determine if this
82    /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
83    /// of a trailing path delimiter will be used to indicate a directory. For the avoidance
84    /// of ambiguity it is recommended users always include trailing `/` when intending to
85    /// refer to a directory.
86    ///
87    /// ## Glob File Paths
88    ///
89    /// If no scheme is provided, and the path contains a glob expression, it will
90    /// be resolved as follows.
91    ///
92    /// The string up to the first path segment containing a glob expression will be extracted,
93    /// and resolved in the same manner as a normal scheme-less path above.
94    ///
95    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
96    /// filter when listing files from object storage
97    ///
98    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
99    /// [URL]: https://url.spec.whatwg.org/
100    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
101        let s = s.as_ref();
102
103        // This is necessary to handle the case of a path starting with a drive letter
104        #[cfg(not(target_arch = "wasm32"))]
105        if std::path::Path::new(s).is_absolute() {
106            return Self::parse_path(s);
107        }
108
109        match Url::parse(s) {
110            Ok(url) => Self::try_new(url, None),
111            #[cfg(not(target_arch = "wasm32"))]
112            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
113            Err(e) => Err(DataFusionError::External(Box::new(e))),
114        }
115    }
116
117    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
118    #[cfg(not(target_arch = "wasm32"))]
119    fn parse_path(s: &str) -> Result<Self> {
120        let (path, glob) = match split_glob_expression(s) {
121            Some((prefix, glob)) => {
122                let glob = Pattern::new(glob)
123                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
124                (prefix, Some(glob))
125            }
126            None => (s, None),
127        };
128
129        let url = url_from_filesystem_path(path).ok_or_else(|| {
130            DataFusionError::External(
131                format!("Failed to convert path to URL: {path}").into(),
132            )
133        })?;
134
135        Self::try_new(url, glob)
136    }
137
138    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
139    fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
140        let prefix = Path::from_url_path(url.path())?;
141        Ok(Self { url, prefix, glob })
142    }
143
144    /// Returns the URL scheme
145    pub fn scheme(&self) -> &str {
146        self.url.scheme()
147    }
148
149    /// Return the URL path not excluding any glob expression
150    ///
151    /// If [`Self::is_collection`], this is the listing prefix
152    /// Otherwise, this is the path to the object
153    pub fn prefix(&self) -> &Path {
154        &self.prefix
155    }
156
157    /// Returns `true` if `path` matches this [`ListingTableUrl`]
158    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
159        let Some(all_segments) = self.strip_prefix(path) else {
160            return false;
161        };
162
163        // remove any segments that contain `=` as they are allowed even
164        // when ignore subdirectories is `true`.
165        let mut segments = all_segments.filter(|s| !s.contains('='));
166
167        match &self.glob {
168            Some(glob) => {
169                if ignore_subdirectory {
170                    segments
171                        .next()
172                        .is_some_and(|file_name| glob.matches(file_name))
173                } else {
174                    let stripped = segments.join(DELIMITER);
175                    glob.matches(&stripped)
176                }
177            }
178            // where we are ignoring subdirectories, we require
179            // the path to be either empty, or contain just the
180            // final file name segment.
181            None if ignore_subdirectory => segments.count() <= 1,
182            // in this case, any valid path at or below the url is allowed
183            None => true,
184        }
185    }
186
187    /// Returns `true` if `path` refers to a collection of objects
188    pub fn is_collection(&self) -> bool {
189        self.url.path().ends_with(DELIMITER)
190    }
191
192    /// Returns the file extension of the last path segment if it exists
193    ///
194    /// Examples:
195    /// ```rust
196    /// use datafusion_datasource::ListingTableUrl;
197    /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
198    /// assert_eq!(url.file_extension(), Some("csv"));
199    /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
200    /// assert_eq!(url.file_extension(), None);
201    /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
202    /// assert_eq!(url.file_extension(), None);
203    /// ```
204    pub fn file_extension(&self) -> Option<&str> {
205        if let Some(segments) = self.url.path_segments() {
206            if let Some(last_segment) = segments.last() {
207                if last_segment.contains(".") && !last_segment.ends_with(".") {
208                    return last_segment.split('.').last();
209                }
210            }
211        }
212
213        None
214    }
215
216    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
217    /// an iterator of the remaining path segments
218    pub fn strip_prefix<'a, 'b: 'a>(
219        &'a self,
220        path: &'b Path,
221    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
222        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
223        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
224            stripped = stripped.strip_prefix(DELIMITER)?;
225        }
226        Some(stripped.split_terminator(DELIMITER))
227    }
228
229    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
230    pub async fn list_all_files<'a>(
231        &'a self,
232        ctx: &'a dyn Session,
233        store: &'a dyn ObjectStore,
234        file_extension: &'a str,
235    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
236        let exec_options = &ctx.config_options().execution;
237        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
238        // If the prefix is a file, use a head request, otherwise list
239        let list = match self.is_collection() {
240            true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
241                None => store.list(Some(&self.prefix)),
242                Some(cache) => {
243                    if let Some(res) = cache.get(&self.prefix) {
244                        debug!("Hit list all files cache");
245                        futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
246                            .boxed()
247                    } else {
248                        let list_res = store.list(Some(&self.prefix));
249                        let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
250                        cache.put(&self.prefix, Arc::new(vec.clone()));
251                        futures::stream::iter(vec.into_iter().map(Ok)).boxed()
252                    }
253                }
254            },
255            false => futures::stream::once(store.head(&self.prefix)).boxed(),
256        };
257        Ok(list
258            .try_filter(move |meta| {
259                let path = &meta.location;
260                let extension_match = path.as_ref().ends_with(file_extension);
261                let glob_match = self.contains(path, ignore_subdirectory);
262                futures::future::ready(extension_match && glob_match)
263            })
264            .map_err(DataFusionError::ObjectStore)
265            .boxed())
266    }
267
268    /// Returns this [`ListingTableUrl`] as a string
269    pub fn as_str(&self) -> &str {
270        self.as_ref()
271    }
272
273    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
274    pub fn object_store(&self) -> ObjectStoreUrl {
275        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
276        ObjectStoreUrl::parse(url).unwrap()
277    }
278}
279
280/// Creates a file URL from a potentially relative filesystem path
281#[cfg(not(target_arch = "wasm32"))]
282fn url_from_filesystem_path(s: &str) -> Option<Url> {
283    let path = std::path::Path::new(s);
284    let is_dir = match path.exists() {
285        true => path.is_dir(),
286        // Fallback to inferring from trailing separator
287        false => std::path::is_separator(s.chars().last()?),
288    };
289
290    let from_absolute_path = |p| {
291        let first = match is_dir {
292            true => Url::from_directory_path(p).ok(),
293            false => Url::from_file_path(p).ok(),
294        }?;
295
296        // By default from_*_path preserve relative path segments
297        // We therefore parse the URL again to resolve these
298        Url::parse(first.as_str()).ok()
299    };
300
301    if path.is_absolute() {
302        return from_absolute_path(path);
303    }
304
305    let absolute = std::env::current_dir().ok()?.join(path);
306    from_absolute_path(&absolute)
307}
308
309impl AsRef<str> for ListingTableUrl {
310    fn as_ref(&self) -> &str {
311        self.url.as_ref()
312    }
313}
314
315impl AsRef<Url> for ListingTableUrl {
316    fn as_ref(&self) -> &Url {
317        &self.url
318    }
319}
320
321impl std::fmt::Display for ListingTableUrl {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        self.as_str().fmt(f)
324    }
325}
326
/// Characters whose presence marks a path segment as a glob expression
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the first path segment containing a glob expression, returning
/// `None` if no glob expression found.
///
/// On a match, the first element is everything up to and including the last
/// path separator before the glob character, and the second element is the
/// rest of `path`. When no separator precedes the glob character the result
/// is `(".", path)`.
///
/// Path delimiters are determined using [`std::path::is_separator`] which
/// permits `/` as a path delimiter even on Windows platforms.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    // Byte offset of the first glob character; no glob means nothing to split
    let glob_start = path.find(&GLOB_START_CHARS[..])?;

    // Byte offset just past the last separator that precedes the glob character
    let segment_start = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map(|(idx, separator)| idx + separator.len_utf8());

    match segment_start {
        Some(idx) => Some(path.split_at(idx)),
        None => Some((".", path)),
    }
}
354
355#[cfg(test)]
356mod tests {
357    use super::*;
358    use tempfile::tempdir;
359
360    #[test]
361    fn test_prefix_path() {
362        let root = std::env::current_dir().unwrap();
363        let root = root.to_string_lossy();
364
365        let url = ListingTableUrl::parse(root).unwrap();
366        let child = url.prefix.child("partition").child("file");
367
368        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
369        assert_eq!(prefix, vec!["partition", "file"]);
370
371        let url = ListingTableUrl::parse("file:///").unwrap();
372        let child = Path::parse("/foo/bar").unwrap();
373        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
374        assert_eq!(prefix, vec!["foo", "bar"]);
375
376        let url = ListingTableUrl::parse("file:///foo").unwrap();
377        let child = Path::parse("/foob/bar").unwrap();
378        assert!(url.strip_prefix(&child).is_none());
379
380        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
381        let child = Path::parse("/foo/file").unwrap();
382        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
383
384        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
385        assert_eq!(url.prefix.as_ref(), "foo/ bar");
386
387        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
388        assert_eq!(url.prefix.as_ref(), "foo/bar");
389
390        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
391        assert_eq!(url.prefix.as_ref(), "foo/😺");
392
393        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
394        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
395
396        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
397        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
398
399        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
400        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
401
402        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
403        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
404
405        let dir = tempdir().unwrap();
406        let path = dir.path().join("bar%2Ffoo");
407        std::fs::File::create(&path).unwrap();
408
409        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
410        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
411
412        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
413        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
414
415        let url =
416            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
417        assert_eq!(url.prefix.as_ref(), "baz/test.txt");
418
419        let workdir = std::env::current_dir().unwrap();
420        let t = workdir.join("non-existent");
421        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
422        let b = ListingTableUrl::parse("non-existent").unwrap();
423        assert_eq!(a, b);
424        assert!(a.prefix.as_ref().ends_with("non-existent"));
425
426        let t = workdir.parent().unwrap();
427        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
428        let b = ListingTableUrl::parse("..").unwrap();
429        assert_eq!(a, b);
430
431        let t = t.join("bar");
432        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
433        let b = ListingTableUrl::parse("../bar").unwrap();
434        assert_eq!(a, b);
435        assert!(a.prefix.as_ref().ends_with("bar"));
436
437        let t = t.join(".").join("foo").join("..").join("baz");
438        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
439        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
440        assert_eq!(a, b);
441        assert!(a.prefix.as_ref().ends_with("bar/baz"));
442    }
443
444    #[test]
445    fn test_prefix_s3() {
446        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
447        assert_eq!(url.prefix.as_ref(), "foo/bar");
448
449        let path = Path::from("foo/bar/partition/foo.parquet");
450        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
451        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
452
453        let path = Path::from("other/bar/partition/foo.parquet");
454        assert!(url.strip_prefix(&path).is_none());
455    }
456
457    #[test]
458    fn test_split_glob() {
459        fn test(input: &str, expected: Option<(&str, &str)>) {
460            assert_eq!(
461                split_glob_expression(input),
462                expected,
463                "testing split_glob_expression with {input}"
464            );
465        }
466
467        // no glob patterns
468        test("/", None);
469        test("/a.txt", None);
470        test("/a", None);
471        test("/a/", None);
472        test("/a/b", None);
473        test("/a/b/", None);
474        test("/a/b.txt", None);
475        test("/a/b/c.txt", None);
476        // glob patterns, thus we build the longest path (os-specific)
477        test("*.txt", Some((".", "*.txt")));
478        test("/*.txt", Some(("/", "*.txt")));
479        test("/a/*b.txt", Some(("/a/", "*b.txt")));
480        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
481        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
482        test("/a/b*.txt", Some(("/a/", "b*.txt")));
483        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
484
485        // https://github.com/apache/datafusion/issues/2465
486        test(
487            "/a/b/c//alltypes_plain*.parquet",
488            Some(("/a/b/c//", "alltypes_plain*.parquet")),
489        );
490    }
491
492    #[test]
493    fn test_is_collection() {
494        fn test(input: &str, expected: bool, message: &str) {
495            let url = ListingTableUrl::parse(input).unwrap();
496            assert_eq!(url.is_collection(), expected, "{message}");
497        }
498
499        test("https://a.b.c/path/", true, "path ends with / - collection");
500        test(
501            "https://a.b.c/path/?a=b",
502            true,
503            "path ends with / - with query args - collection",
504        );
505        test(
506            "https://a.b.c/path?a=b/",
507            false,
508            "path not ends with / - query ends with / - not collection",
509        );
510        test(
511            "https://a.b.c/path/#a=b",
512            true,
513            "path ends with / - with fragment - collection",
514        );
515        test(
516            "https://a.b.c/path#a=b/",
517            false,
518            "path not ends with / - fragment ends with / - not collection",
519        );
520    }
521
522    #[test]
523    fn test_file_extension() {
524        fn test(input: &str, expected: Option<&str>, message: &str) {
525            let url = ListingTableUrl::parse(input).unwrap();
526            assert_eq!(url.file_extension(), expected, "{message}");
527        }
528
529        test("https://a.b.c/path/", None, "path ends with / - not a file");
530        test(
531            "https://a.b.c/path/?a=b",
532            None,
533            "path ends with / - with query args - not a file",
534        );
535        test(
536            "https://a.b.c/path?a=b/",
537            None,
538            "path not ends with / - query ends with / but no file extension",
539        );
540        test(
541            "https://a.b.c/path/#a=b",
542            None,
543            "path ends with / - with fragment - not a file",
544        );
545        test(
546            "https://a.b.c/path#a=b/",
547            None,
548            "path not ends with / - fragment ends with / but no file extension",
549        );
550        test(
551            "file///some/path/",
552            None,
553            "file path ends with / - not a file",
554        );
555        test(
556            "file///some/path/file",
557            None,
558            "file path does not end with - no extension",
559        );
560        test(
561            "file///some/path/file.",
562            None,
563            "file path ends with . - no value after .",
564        );
565        test(
566            "file///some/path/file.ext",
567            Some("ext"),
568            "file path ends with .ext - extension is ext",
569        );
570    }
571}