// Source file: datafusion_datasource/url.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result, TableReference};
21use datafusion_execution::cache::TableScopedPath;
22use datafusion_execution::cache::cache_manager::CachedFileList;
23use datafusion_execution::object_store::ObjectStoreUrl;
24use datafusion_session::Session;
25
26use futures::stream::BoxStream;
27use futures::{StreamExt, TryStreamExt};
28use glob::Pattern;
29use itertools::Itertools;
30use log::debug;
31use object_store::path::DELIMITER;
32use object_store::path::Path;
33use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
34use url::Url;
35
/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
/// for more information on the supported expressions
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ListingTableUrl {
    /// A URL that identifies a file or directory to list files from
    url: Url,
    /// The object store path derived from the path component of `url`;
    /// exposed through [`ListingTableUrl::prefix`]
    prefix: Path,
    /// An optional glob expression used to filter files
    glob: Option<Pattern>,
    /// Optional table reference for the table this url belongs to
    table_ref: Option<TableReference>,
}
49
50impl ListingTableUrl {
51    /// Parse a provided string as a `ListingTableUrl`
52    ///
53    /// A URL can either refer to a single object, or a collection of objects with a
54    /// common prefix, with the presence of a trailing `/` indicating a collection.
55    ///
56    /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
57    /// `file:///foo/` refers to all the files under the directory `/foo` and its
58    /// subdirectories.
59    ///
60    /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
61    /// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
62    /// S3 bucket `BUCKET`
63    ///
64    /// # URL Encoding
65    ///
66    /// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
67    /// would be `file:///bar%252Efoo`, as per the [URL] specification.
68    ///
69    /// It should be noted that some tools, such as the AWS CLI, take a different approach and
70    /// instead interpret the URL path verbatim. For example the object `bar%2Efoo` would be
71    /// addressed as `s3://BUCKET/bar%252Efoo` using [`ListingTableUrl`] but `s3://BUCKET/bar%2Efoo`
72    /// when using the aws-cli.
73    ///
74    /// # Paths without a Scheme
75    ///
76    /// If no scheme is provided, or the string is an absolute filesystem path
77    /// as determined by [`std::path::Path::is_absolute`], the string will be
78    /// interpreted as a path on the local filesystem using the operating
79    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
80    ///
81    /// If the path contains any of `'?', '*', '['`, it will be considered
82    /// a glob expression and resolved as described in the section below.
83    ///
84    /// Otherwise, the path will be resolved to an absolute path based on the current
85    /// working directory, and converted to a [file URI].
86    ///
87    /// If the path already exists in the local filesystem this will be used to determine if this
88    /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
89    /// of a trailing path delimiter will be used to indicate a directory. For the avoidance
90    /// of ambiguity it is recommended users always include trailing `/` when intending to
91    /// refer to a directory.
92    ///
93    /// ## Glob File Paths
94    ///
95    /// If no scheme is provided, and the path contains a glob expression, it will
96    /// be resolved as follows.
97    ///
98    /// The string up to the first path segment containing a glob expression will be extracted,
99    /// and resolved in the same manner as a normal scheme-less path above.
100    ///
101    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
102    /// filter when listing files from object storage
103    ///
104    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
105    /// [URL]: https://url.spec.whatwg.org/
106    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
107        let s = s.as_ref();
108
109        // This is necessary to handle the case of a path starting with a drive letter
110        #[cfg(not(target_arch = "wasm32"))]
111        if std::path::Path::new(s).is_absolute() {
112            return Self::parse_path(s);
113        }
114
115        match Url::parse(s) {
116            Ok(url) => Self::try_new(url, None),
117            #[cfg(not(target_arch = "wasm32"))]
118            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
119            Err(e) => Err(DataFusionError::External(Box::new(e))),
120        }
121    }
122
123    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
124    #[cfg(not(target_arch = "wasm32"))]
125    fn parse_path(s: &str) -> Result<Self> {
126        let (path, glob) = match split_glob_expression(s) {
127            Some((prefix, glob)) => {
128                let glob = Pattern::new(glob)
129                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
130                (prefix, Some(glob))
131            }
132            None => (s, None),
133        };
134
135        let url = url_from_filesystem_path(path).ok_or_else(|| {
136            DataFusionError::External(
137                format!("Failed to convert path to URL: {path}").into(),
138            )
139        })?;
140
141        Self::try_new(url, glob)
142    }
143
144    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
145    ///
146    /// [`Self::parse`] supports glob expression only for file system paths.
147    /// However, some applications may want to support glob expression for URLs with a scheme.
148    /// The application can split the URL into a base URL and a glob expression and use this method
149    /// to create a [`ListingTableUrl`].
150    pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
151        let prefix = Path::from_url_path(url.path())?;
152        Ok(Self {
153            url,
154            prefix,
155            glob,
156            table_ref: None,
157        })
158    }
159
160    /// Returns the URL scheme
161    pub fn scheme(&self) -> &str {
162        self.url.scheme()
163    }
164
165    /// Return the URL path not excluding any glob expression
166    ///
167    /// If [`Self::is_collection`], this is the listing prefix
168    /// Otherwise, this is the path to the object
169    pub fn prefix(&self) -> &Path {
170        &self.prefix
171    }
172
173    /// Returns `true` if `path` matches this [`ListingTableUrl`]
174    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
175        let Some(all_segments) = self.strip_prefix(path) else {
176            return false;
177        };
178
179        // remove any segments that contain `=` as they are allowed even
180        // when ignore subdirectories is `true`.
181        let mut segments = all_segments.filter(|s| !s.contains('='));
182
183        match &self.glob {
184            Some(glob) => {
185                if ignore_subdirectory {
186                    segments
187                        .next()
188                        .is_some_and(|file_name| glob.matches(file_name))
189                } else {
190                    let stripped = segments.join(DELIMITER);
191                    glob.matches(&stripped)
192                }
193            }
194            // where we are ignoring subdirectories, we require
195            // the path to be either empty, or contain just the
196            // final file name segment.
197            None if ignore_subdirectory => segments.count() <= 1,
198            // in this case, any valid path at or below the url is allowed
199            None => true,
200        }
201    }
202
203    /// Returns `true` if `path` refers to a collection of objects
204    pub fn is_collection(&self) -> bool {
205        self.url.path().ends_with(DELIMITER)
206    }
207
208    /// Returns the file extension of the last path segment if it exists
209    ///
210    /// Examples:
211    /// ```rust
212    /// use datafusion_datasource::ListingTableUrl;
213    /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
214    /// assert_eq!(url.file_extension(), Some("csv"));
215    /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
216    /// assert_eq!(url.file_extension(), None);
217    /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
218    /// assert_eq!(url.file_extension(), None);
219    /// ```
220    pub fn file_extension(&self) -> Option<&str> {
221        if let Some(mut segments) = self.url.path_segments()
222            && let Some(last_segment) = segments.next_back()
223            && last_segment.contains(".")
224            && !last_segment.ends_with(".")
225        {
226            return last_segment.split('.').next_back();
227        }
228
229        None
230    }
231
232    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
233    /// an iterator of the remaining path segments
234    pub fn strip_prefix<'a, 'b: 'a>(
235        &'a self,
236        path: &'b Path,
237    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
238        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
239        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
240            stripped = stripped.strip_prefix(DELIMITER)?;
241        }
242        Some(stripped.split_terminator(DELIMITER))
243    }
244
245    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`,
246    /// optionally filtering by a path prefix
247    pub async fn list_prefixed_files<'a>(
248        &'a self,
249        ctx: &'a dyn Session,
250        store: &'a dyn ObjectStore,
251        prefix: Option<Path>,
252        file_extension: &'a str,
253    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
254        let exec_options = &ctx.config_options().execution;
255        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
256
257        // Build full_prefix for non-cached path and head() calls
258        let full_prefix = if let Some(ref p) = prefix {
259            let mut parts = self.prefix.parts().collect::<Vec<_>>();
260            parts.extend(p.parts());
261            Path::from_iter(parts.into_iter())
262        } else {
263            self.prefix.clone()
264        };
265
266        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
267            list_with_cache(
268                ctx,
269                store,
270                self.table_ref.as_ref(),
271                &self.prefix,
272                prefix.as_ref(),
273            )
274            .await?
275        } else {
276            match store.head(&full_prefix).await {
277                Ok(meta) => futures::stream::once(async { Ok(meta) })
278                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
279                    .boxed(),
280                // If the head command fails, it is likely that object doesn't exist.
281                // Retry as though it were a prefix (aka a collection)
282                Err(object_store::Error::NotFound { .. }) => {
283                    list_with_cache(
284                        ctx,
285                        store,
286                        self.table_ref.as_ref(),
287                        &self.prefix,
288                        prefix.as_ref(),
289                    )
290                    .await?
291                }
292                Err(e) => return Err(e.into()),
293            }
294        };
295
296        Ok(list
297            .try_filter(move |meta| {
298                let path = &meta.location;
299                let extension_match = path.as_ref().ends_with(file_extension);
300                let glob_match = self.contains(path, ignore_subdirectory);
301                futures::future::ready(extension_match && glob_match)
302            })
303            .boxed())
304    }
305
306    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
307    pub async fn list_all_files<'a>(
308        &'a self,
309        ctx: &'a dyn Session,
310        store: &'a dyn ObjectStore,
311        file_extension: &'a str,
312    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
313        self.list_prefixed_files(ctx, store, None, file_extension)
314            .await
315    }
316
317    /// Returns this [`ListingTableUrl`] as a string
318    pub fn as_str(&self) -> &str {
319        self.as_ref()
320    }
321
322    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
323    pub fn object_store(&self) -> ObjectStoreUrl {
324        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
325        ObjectStoreUrl::parse(url).unwrap()
326    }
327
328    /// Returns true if the [`ListingTableUrl`] points to the folder
329    pub fn is_folder(&self) -> bool {
330        self.url.scheme() == "file" && self.is_collection()
331    }
332
    /// Return a reference to the `url` of this [`ListingTableUrl`]
    pub fn get_url(&self) -> &Url {
        &self.url
    }
337
    /// Return the `glob` of this [`ListingTableUrl`], which is `None` when no
    /// glob expression has been set
    pub fn get_glob(&self) -> &Option<Pattern> {
        &self.glob
    }
342
343    /// Returns a copy of current [`ListingTableUrl`] with a specified `glob`
344    pub fn with_glob(mut self, glob: &str) -> Result<Self> {
345        self.glob =
346            Some(Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?);
347        Ok(self)
348    }
349
350    /// Set the table reference for this [`ListingTableUrl`]
351    pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
352        self.table_ref = Some(table_ref);
353        self
354    }
355
    /// Return the table reference of this [`ListingTableUrl`], which is `None`
    /// unless one was set with [`Self::with_table_ref`]
    pub fn get_table_ref(&self) -> &Option<TableReference> {
        &self.table_ref
    }
360}
361
362/// Lists files with cache support, using prefix-aware lookups.
363///
364/// # Arguments
365/// * `ctx` - The session context
366/// * `store` - The object store to list from
367/// * `table_base_path` - The table's base path (the stable cache key)
368/// * `prefix` - Optional prefix relative to table base for filtering results
369///
370/// # Cache Behavior:
371/// The cache key is always `table_base_path`. When a prefix-filtered listing
372/// is requested via `prefix`, the cache:
373/// - Looks up `table_base_path` in the cache
374/// - Filters results to match `table_base_path/prefix`
375/// - Returns filtered results without a storage call
376///
377/// On cache miss, the full table is always listed and cached, ensuring
378/// subsequent prefix queries can be served from cache.
379async fn list_with_cache<'b>(
380    ctx: &'b dyn Session,
381    store: &'b dyn ObjectStore,
382    table_ref: Option<&TableReference>,
383    table_base_path: &Path,
384    prefix: Option<&Path>,
385) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
386    // Build the full listing path (table_base + prefix)
387    let full_prefix = match prefix {
388        Some(p) => {
389            let mut parts: Vec<_> = table_base_path.parts().collect();
390            parts.extend(p.parts());
391            Path::from_iter(parts)
392        }
393        None => table_base_path.clone(),
394    };
395
396    match ctx.runtime_env().cache_manager.get_list_files_cache() {
397        None => Ok(store
398            .list(Some(&full_prefix))
399            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
400            .boxed()),
401        Some(cache) => {
402            // Build the filter prefix (only Some if prefix was requested)
403            let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
404
405            let table_scoped_base_path = TableScopedPath {
406                table: table_ref.cloned(),
407                path: table_base_path.clone(),
408            };
409
410            // Try cache lookup - get returns CachedFileList
411            let vec = if let Some(cached) = cache.get(&table_scoped_base_path) {
412                debug!("Hit list files cache");
413                cached.files_matching_prefix(&filter_prefix)
414            } else {
415                // Cache miss - always list and cache the full table
416                // This ensures we have complete data for future prefix queries
417                let mut vec = store
418                    .list(Some(table_base_path))
419                    .try_collect::<Vec<ObjectMeta>>()
420                    .await?;
421                vec.shrink_to_fit(); // Right-size before caching
422                let cached: CachedFileList = vec.into();
423                let result = cached.files_matching_prefix(&filter_prefix);
424                cache.put(&table_scoped_base_path, cached);
425                result
426            };
427            Ok(
428                futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
429                    .boxed(),
430            )
431        }
432    }
433}
434
435/// Creates a file URL from a potentially relative filesystem path
436#[cfg(not(target_arch = "wasm32"))]
437fn url_from_filesystem_path(s: &str) -> Option<Url> {
438    let path = std::path::Path::new(s);
439    let is_dir = match path.exists() {
440        true => path.is_dir(),
441        // Fallback to inferring from trailing separator
442        false => std::path::is_separator(s.chars().last()?),
443    };
444
445    let from_absolute_path = |p| {
446        let first = match is_dir {
447            true => Url::from_directory_path(p).ok(),
448            false => Url::from_file_path(p).ok(),
449        }?;
450
451        // By default from_*_path preserve relative path segments
452        // We therefore parse the URL again to resolve these
453        Url::parse(first.as_str()).ok()
454    };
455
456    if path.is_absolute() {
457        return from_absolute_path(path);
458    }
459
460    let absolute = std::env::current_dir().ok()?.join(path);
461    from_absolute_path(&absolute)
462}
463
464impl AsRef<str> for ListingTableUrl {
465    fn as_ref(&self) -> &str {
466        self.url.as_ref()
467    }
468}
469
/// Exposes the underlying [`Url`] by reference
impl AsRef<Url> for ListingTableUrl {
    fn as_ref(&self) -> &Url {
        &self.url
    }
}
475
476impl std::fmt::Display for ListingTableUrl {
477    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478        self.as_str().fmt(f)
479    }
480}
481
/// Characters that mark the start of a glob expression
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` in two at the start of the first path segment that contains
/// a glob expression, returning `None` if `path` has no glob expression.
///
/// When the first glob character is not preceded by any path separator, the
/// returned prefix is `"."` and the glob is the whole of `path`.
///
/// Path delimiters are determined using [`std::path::is_separator`] which
/// permits `/` as a path delimiter even on Windows platforms.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(GLOB_START_CHARS)?;

    // Byte offset just past the last separator that precedes the glob character
    let segment_start = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map(|(idx, c)| idx + c.len_utf8());

    match segment_start {
        Some(at) => Some(path.split_at(at)),
        None => Some((".", path)),
    }
}
508
509#[cfg(test)]
510mod tests {
511    use super::*;
512    use async_trait::async_trait;
513    use bytes::Bytes;
514    use datafusion_common::DFSchema;
515    use datafusion_common::config::TableOptions;
516    use datafusion_execution::TaskContext;
517    use datafusion_execution::config::SessionConfig;
518    use datafusion_execution::runtime_env::RuntimeEnv;
519    use datafusion_expr::execution_props::ExecutionProps;
520    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
521    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
522    use datafusion_physical_plan::ExecutionPlan;
523    use object_store::{
524        CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
525        PutMultipartOptions, PutPayload,
526    };
527    use std::any::Any;
528    use std::collections::HashMap;
529    use std::ops::Range;
530    use std::sync::Arc;
531    use tempfile::tempdir;
532
533    #[test]
534    fn test_prefix_path() {
535        let root = std::env::current_dir().unwrap();
536        let root = root.to_string_lossy();
537
538        let url = ListingTableUrl::parse(root).unwrap();
539        let child = url.prefix.child("partition").child("file");
540
541        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
542        assert_eq!(prefix, vec!["partition", "file"]);
543
544        let url = ListingTableUrl::parse("file:///").unwrap();
545        let child = Path::parse("/foo/bar").unwrap();
546        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
547        assert_eq!(prefix, vec!["foo", "bar"]);
548
549        let url = ListingTableUrl::parse("file:///foo").unwrap();
550        let child = Path::parse("/foob/bar").unwrap();
551        assert!(url.strip_prefix(&child).is_none());
552
553        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
554        let child = Path::parse("/foo/file").unwrap();
555        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
556
557        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
558        assert_eq!(url.prefix.as_ref(), "foo/ bar");
559
560        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
561        assert_eq!(url.prefix.as_ref(), "foo/bar");
562
563        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
564        assert_eq!(url.prefix.as_ref(), "foo/😺");
565
566        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
567        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
568
569        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
570        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
571
572        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
573        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
574
575        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
576        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
577
578        let dir = tempdir().unwrap();
579        let path = dir.path().join("bar%2Ffoo");
580        std::fs::File::create(&path).unwrap();
581
582        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
583        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
584
585        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
586        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
587
588        let url =
589            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
590        assert_eq!(url.prefix.as_ref(), "baz/test.txt");
591
592        let workdir = std::env::current_dir().unwrap();
593        let t = workdir.join("non-existent");
594        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
595        let b = ListingTableUrl::parse("non-existent").unwrap();
596        assert_eq!(a, b);
597        assert!(a.prefix.as_ref().ends_with("non-existent"));
598
599        let t = workdir.parent().unwrap();
600        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
601        let b = ListingTableUrl::parse("..").unwrap();
602        assert_eq!(a, b);
603
604        let t = t.join("bar");
605        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
606        let b = ListingTableUrl::parse("../bar").unwrap();
607        assert_eq!(a, b);
608        assert!(a.prefix.as_ref().ends_with("bar"));
609
610        let t = t.join(".").join("foo").join("..").join("baz");
611        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
612        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
613        assert_eq!(a, b);
614        assert!(a.prefix.as_ref().ends_with("bar/baz"));
615    }
616
617    #[test]
618    fn test_prefix_s3() {
619        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
620        assert_eq!(url.prefix.as_ref(), "foo/bar");
621
622        let path = Path::from("foo/bar/partition/foo.parquet");
623        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
624        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
625
626        let path = Path::from("other/bar/partition/foo.parquet");
627        assert!(url.strip_prefix(&path).is_none());
628    }
629
630    #[test]
631    fn test_split_glob() {
632        fn test(input: &str, expected: Option<(&str, &str)>) {
633            assert_eq!(
634                split_glob_expression(input),
635                expected,
636                "testing split_glob_expression with {input}"
637            );
638        }
639
640        // no glob patterns
641        test("/", None);
642        test("/a.txt", None);
643        test("/a", None);
644        test("/a/", None);
645        test("/a/b", None);
646        test("/a/b/", None);
647        test("/a/b.txt", None);
648        test("/a/b/c.txt", None);
649        // glob patterns, thus we build the longest path (os-specific)
650        test("*.txt", Some((".", "*.txt")));
651        test("/*.txt", Some(("/", "*.txt")));
652        test("/a/*b.txt", Some(("/a/", "*b.txt")));
653        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
654        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
655        test("/a/b*.txt", Some(("/a/", "b*.txt")));
656        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
657
658        // https://github.com/apache/datafusion/issues/2465
659        test(
660            "/a/b/c//alltypes_plain*.parquet",
661            Some(("/a/b/c//", "alltypes_plain*.parquet")),
662        );
663    }
664
665    #[test]
666    fn test_is_collection() {
667        fn test(input: &str, expected: bool, message: &str) {
668            let url = ListingTableUrl::parse(input).unwrap();
669            assert_eq!(url.is_collection(), expected, "{message}");
670        }
671
672        test("https://a.b.c/path/", true, "path ends with / - collection");
673        test(
674            "https://a.b.c/path/?a=b",
675            true,
676            "path ends with / - with query args - collection",
677        );
678        test(
679            "https://a.b.c/path?a=b/",
680            false,
681            "path not ends with / - query ends with / - not collection",
682        );
683        test(
684            "https://a.b.c/path/#a=b",
685            true,
686            "path ends with / - with fragment - collection",
687        );
688        test(
689            "https://a.b.c/path#a=b/",
690            false,
691            "path not ends with / - fragment ends with / - not collection",
692        );
693    }
694
695    #[test]
696    fn test_file_extension() {
697        fn test(input: &str, expected: Option<&str>, message: &str) {
698            let url = ListingTableUrl::parse(input).unwrap();
699            assert_eq!(url.file_extension(), expected, "{message}");
700        }
701
702        test("https://a.b.c/path/", None, "path ends with / - not a file");
703        test(
704            "https://a.b.c/path/?a=b",
705            None,
706            "path ends with / - with query args - not a file",
707        );
708        test(
709            "https://a.b.c/path?a=b/",
710            None,
711            "path not ends with / - query ends with / but no file extension",
712        );
713        test(
714            "https://a.b.c/path/#a=b",
715            None,
716            "path ends with / - with fragment - not a file",
717        );
718        test(
719            "https://a.b.c/path#a=b/",
720            None,
721            "path not ends with / - fragment ends with / but no file extension",
722        );
723        test(
724            "file///some/path/",
725            None,
726            "file path ends with / - not a file",
727        );
728        test(
729            "file///some/path/file",
730            None,
731            "file path does not end with - no extension",
732        );
733        test(
734            "file///some/path/file.",
735            None,
736            "file path ends with . - no value after .",
737        );
738        test(
739            "file///some/path/file.ext",
740            Some("ext"),
741            "file path ends with .ext - extension is ext",
742        );
743    }
744
745    #[tokio::test]
746    async fn test_list_files() -> Result<()> {
747        let store = MockObjectStore {
748            in_mem: object_store::memory::InMemory::new(),
749            forbidden_paths: vec!["forbidden/e.parquet".into()],
750        };
751
752        // Create some files:
753        create_file(&store, "a.parquet").await;
754        create_file(&store, "/t/b.parquet").await;
755        create_file(&store, "/t/c.csv").await;
756        create_file(&store, "/t/d.csv").await;
757
758        // This file returns a permission error.
759        create_file(&store, "/forbidden/e.parquet").await;
760
761        assert_eq!(
762            list_all_files("/", &store, "parquet").await?,
763            vec!["a.parquet"],
764        );
765
766        // test with and without trailing slash
767        assert_eq!(
768            list_all_files("/t/", &store, "parquet").await?,
769            vec!["t/b.parquet"],
770        );
771        assert_eq!(
772            list_all_files("/t", &store, "parquet").await?,
773            vec!["t/b.parquet"],
774        );
775
776        // test with and without trailing slash
777        assert_eq!(
778            list_all_files("/t", &store, "csv").await?,
779            vec!["t/c.csv", "t/d.csv"],
780        );
781        assert_eq!(
782            list_all_files("/t/", &store, "csv").await?,
783            vec!["t/c.csv", "t/d.csv"],
784        );
785
786        // Test a non existing prefix
787        assert_eq!(
788            list_all_files("/NonExisting", &store, "csv").await?,
789            vec![] as Vec<String>
790        );
791        assert_eq!(
792            list_all_files("/NonExisting/", &store, "csv").await?,
793            vec![] as Vec<String>
794        );
795
796        // Including forbidden.parquet generates an error.
797        let Err(DataFusionError::ObjectStore(err)) =
798            list_all_files("/forbidden/e.parquet", &store, "parquet").await
799        else {
800            panic!("Expected ObjectStore error");
801        };
802
803        let object_store::Error::PermissionDenied { .. } = &*err else {
804            panic!("Expected PermissionDenied error");
805        };
806
807        // Test prefix filtering with partition-style paths
808        create_file(&store, "/data/a=1/file1.parquet").await;
809        create_file(&store, "/data/a=1/b=100/file2.parquet").await;
810        create_file(&store, "/data/a=2/b=200/file3.parquet").await;
811        create_file(&store, "/data/a=2/b=200/file4.csv").await;
812
813        assert_eq!(
814            list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
815                .await?,
816            vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
817        );
818
819        assert_eq!(
820            list_prefixed_files(
821                "/data/",
822                &store,
823                Some(Path::from("a=1/b=100")),
824                "parquet"
825            )
826            .await?,
827            vec!["data/a=1/b=100/file2.parquet"],
828        );
829
830        assert_eq!(
831            list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
832                .await?,
833            vec!["data/a=2/b=200/file3.parquet"],
834        );
835
836        Ok(())
837    }
838
839    /// Tests that the cached code path produces identical results to the non-cached path.
840    ///
841    /// This is critical: the cache is a transparent optimization, so both paths
842    /// MUST return the same files. Note: order is not guaranteed by ObjectStore::list,
843    /// so we sort results before comparison.
844    #[tokio::test]
845    async fn test_cache_path_equivalence() -> Result<()> {
846        use datafusion_execution::runtime_env::RuntimeEnvBuilder;
847
848        let store = MockObjectStore {
849            in_mem: object_store::memory::InMemory::new(),
850            forbidden_paths: vec![],
851        };
852
853        // Create test files with partition-style paths
854        create_file(&store, "/table/year=2023/data1.parquet").await;
855        create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
856        create_file(&store, "/table/year=2024/data3.parquet").await;
857        create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
858        create_file(&store, "/table/year=2024/month=12/data5.parquet").await;
859
860        // Session WITHOUT cache
861        let session_no_cache = MockSession::new();
862
863        // Session WITH cache - use RuntimeEnvBuilder with cache limit (no TTL needed for this test)
864        let runtime_with_cache = RuntimeEnvBuilder::new()
865            .with_object_list_cache_limit(1024 * 1024) // 1MB limit
866            .build_arc()?;
867        let session_with_cache = MockSession::with_runtime_env(runtime_with_cache);
868
869        // Test cases: (url, prefix, description)
870        let test_cases = vec![
871            ("/table/", None, "full table listing"),
872            (
873                "/table/",
874                Some(Path::from("year=2023")),
875                "single partition filter",
876            ),
877            (
878                "/table/",
879                Some(Path::from("year=2024")),
880                "different partition filter",
881            ),
882            (
883                "/table/",
884                Some(Path::from("year=2024/month=06")),
885                "nested partition filter",
886            ),
887            (
888                "/table/",
889                Some(Path::from("year=2025")),
890                "non-existent partition",
891            ),
892        ];
893
894        for (url_str, prefix, description) in test_cases {
895            let url = ListingTableUrl::parse(url_str)?;
896
897            // Get results WITHOUT cache (sorted for comparison)
898            let mut results_no_cache: Vec<String> = url
899                .list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
900                .await?
901                .try_collect::<Vec<_>>()
902                .await?
903                .into_iter()
904                .map(|m| m.location.to_string())
905                .collect();
906            results_no_cache.sort();
907
908            // Get results WITH cache (first call - cache miss, sorted for comparison)
909            let mut results_with_cache_miss: Vec<String> = url
910                .list_prefixed_files(
911                    &session_with_cache,
912                    &store,
913                    prefix.clone(),
914                    "parquet",
915                )
916                .await?
917                .try_collect::<Vec<_>>()
918                .await?
919                .into_iter()
920                .map(|m| m.location.to_string())
921                .collect();
922            results_with_cache_miss.sort();
923
924            // Get results WITH cache (second call - cache hit, sorted for comparison)
925            let mut results_with_cache_hit: Vec<String> = url
926                .list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
927                .await?
928                .try_collect::<Vec<_>>()
929                .await?
930                .into_iter()
931                .map(|m| m.location.to_string())
932                .collect();
933            results_with_cache_hit.sort();
934
935            // All three should contain the same files
936            assert_eq!(
937                results_no_cache, results_with_cache_miss,
938                "Cache miss path should match non-cached path for: {description}"
939            );
940            assert_eq!(
941                results_no_cache, results_with_cache_hit,
942                "Cache hit path should match non-cached path for: {description}"
943            );
944        }
945
946        Ok(())
947    }
948
949    /// Tests that prefix queries can be served from a cached full-table listing
950    #[tokio::test]
951    async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
952        use datafusion_execution::runtime_env::RuntimeEnvBuilder;
953
954        let store = MockObjectStore {
955            in_mem: object_store::memory::InMemory::new(),
956            forbidden_paths: vec![],
957        };
958
959        // Create test files
960        create_file(&store, "/sales/region=US/q1.parquet").await;
961        create_file(&store, "/sales/region=US/q2.parquet").await;
962        create_file(&store, "/sales/region=EU/q1.parquet").await;
963
964        // Create session with cache (no TTL needed for this test)
965        let runtime = RuntimeEnvBuilder::new()
966            .with_object_list_cache_limit(1024 * 1024) // 1MB limit
967            .build_arc()?;
968        let session = MockSession::with_runtime_env(runtime);
969
970        let url = ListingTableUrl::parse("/sales/")?;
971
972        // First: query full table (populates cache)
973        let full_results: Vec<String> = url
974            .list_prefixed_files(&session, &store, None, "parquet")
975            .await?
976            .try_collect::<Vec<_>>()
977            .await?
978            .into_iter()
979            .map(|m| m.location.to_string())
980            .collect();
981        assert_eq!(full_results.len(), 3);
982
983        // Second: query with prefix (should be served from cache)
984        let mut us_results: Vec<String> = url
985            .list_prefixed_files(
986                &session,
987                &store,
988                Some(Path::from("region=US")),
989                "parquet",
990            )
991            .await?
992            .try_collect::<Vec<_>>()
993            .await?
994            .into_iter()
995            .map(|m| m.location.to_string())
996            .collect();
997        us_results.sort();
998
999        assert_eq!(
1000            us_results,
1001            vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
1002        );
1003
1004        // Third: different prefix (also from cache)
1005        let eu_results: Vec<String> = url
1006            .list_prefixed_files(
1007                &session,
1008                &store,
1009                Some(Path::from("region=EU")),
1010                "parquet",
1011            )
1012            .await?
1013            .try_collect::<Vec<_>>()
1014            .await?
1015            .into_iter()
1016            .map(|m| m.location.to_string())
1017            .collect();
1018
1019        assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);
1020
1021        Ok(())
1022    }
1023
1024    /// Creates a file with "hello world" content at the specified path
1025    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
1026        object_store
1027            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
1028            .await
1029            .expect("failed to create test file");
1030    }
1031
1032    /// Runs "list_prefixed_files"  with no prefix to list all files and returns their paths
1033    ///
1034    /// Panic's on error
1035    async fn list_all_files(
1036        url: &str,
1037        store: &dyn ObjectStore,
1038        file_extension: &str,
1039    ) -> Result<Vec<String>> {
1040        try_list_prefixed_files(url, store, None, file_extension).await
1041    }
1042
1043    /// Runs "list_prefixed_files" and returns their paths
1044    ///
1045    /// Panic's on error
1046    async fn list_prefixed_files(
1047        url: &str,
1048        store: &dyn ObjectStore,
1049        prefix: Option<Path>,
1050        file_extension: &str,
1051    ) -> Result<Vec<String>> {
1052        try_list_prefixed_files(url, store, prefix, file_extension).await
1053    }
1054
1055    /// Runs "list_prefixed_files" and returns their paths
1056    async fn try_list_prefixed_files(
1057        url: &str,
1058        store: &dyn ObjectStore,
1059        prefix: Option<Path>,
1060        file_extension: &str,
1061    ) -> Result<Vec<String>> {
1062        let session = MockSession::new();
1063        let url = ListingTableUrl::parse(url)?;
1064        let files = url
1065            .list_prefixed_files(&session, store, prefix, file_extension)
1066            .await?
1067            .try_collect::<Vec<_>>()
1068            .await?
1069            .into_iter()
1070            .map(|meta| meta.location.as_ref().to_string())
1071            .collect();
1072        Ok(files)
1073    }
1074
1075    #[derive(Debug)]
1076    struct MockObjectStore {
1077        in_mem: object_store::memory::InMemory,
1078        forbidden_paths: Vec<Path>,
1079    }
1080
1081    impl std::fmt::Display for MockObjectStore {
1082        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1083            self.in_mem.fmt(f)
1084        }
1085    }
1086
1087    #[async_trait]
1088    impl ObjectStore for MockObjectStore {
1089        async fn put_opts(
1090            &self,
1091            location: &Path,
1092            payload: PutPayload,
1093            opts: object_store::PutOptions,
1094        ) -> object_store::Result<object_store::PutResult> {
1095            self.in_mem.put_opts(location, payload, opts).await
1096        }
1097
1098        async fn put_multipart_opts(
1099            &self,
1100            location: &Path,
1101            opts: PutMultipartOptions,
1102        ) -> object_store::Result<Box<dyn MultipartUpload>> {
1103            self.in_mem.put_multipart_opts(location, opts).await
1104        }
1105
1106        async fn get_opts(
1107            &self,
1108            location: &Path,
1109            options: GetOptions,
1110        ) -> object_store::Result<GetResult> {
1111            if options.head && self.forbidden_paths.contains(location) {
1112                Err(object_store::Error::PermissionDenied {
1113                    path: location.to_string(),
1114                    source: "forbidden".into(),
1115                })
1116            } else {
1117                self.in_mem.get_opts(location, options).await
1118            }
1119        }
1120
1121        async fn get_ranges(
1122            &self,
1123            location: &Path,
1124            ranges: &[Range<u64>],
1125        ) -> object_store::Result<Vec<Bytes>> {
1126            self.in_mem.get_ranges(location, ranges).await
1127        }
1128
1129        fn delete_stream(
1130            &self,
1131            locations: BoxStream<'static, object_store::Result<Path>>,
1132        ) -> BoxStream<'static, object_store::Result<Path>> {
1133            self.in_mem.delete_stream(locations)
1134        }
1135
1136        fn list(
1137            &self,
1138            prefix: Option<&Path>,
1139        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
1140            self.in_mem.list(prefix)
1141        }
1142
1143        async fn list_with_delimiter(
1144            &self,
1145            prefix: Option<&Path>,
1146        ) -> object_store::Result<ListResult> {
1147            self.in_mem.list_with_delimiter(prefix).await
1148        }
1149
1150        async fn copy_opts(
1151            &self,
1152            from: &Path,
1153            to: &Path,
1154            options: CopyOptions,
1155        ) -> object_store::Result<()> {
1156            self.in_mem.copy_opts(from, to, options).await
1157        }
1158    }
1159
1160    struct MockSession {
1161        config: SessionConfig,
1162        runtime_env: Arc<RuntimeEnv>,
1163    }
1164
1165    impl MockSession {
1166        fn new() -> Self {
1167            Self {
1168                config: SessionConfig::new(),
1169                runtime_env: Arc::new(RuntimeEnv::default()),
1170            }
1171        }
1172
1173        /// Create a MockSession with a custom RuntimeEnv (for cache testing)
1174        fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
1175            Self {
1176                config: SessionConfig::new(),
1177                runtime_env,
1178            }
1179        }
1180    }
1181
1182    #[async_trait::async_trait]
1183    impl Session for MockSession {
1184        fn session_id(&self) -> &str {
1185            unimplemented!()
1186        }
1187
1188        fn config(&self) -> &SessionConfig {
1189            &self.config
1190        }
1191
1192        async fn create_physical_plan(
1193            &self,
1194            _logical_plan: &LogicalPlan,
1195        ) -> Result<Arc<dyn ExecutionPlan>> {
1196            unimplemented!()
1197        }
1198
1199        fn create_physical_expr(
1200            &self,
1201            _expr: Expr,
1202            _df_schema: &DFSchema,
1203        ) -> Result<Arc<dyn PhysicalExpr>> {
1204            unimplemented!()
1205        }
1206
1207        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
1208            unimplemented!()
1209        }
1210
1211        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
1212            unimplemented!()
1213        }
1214
1215        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
1216            unimplemented!()
1217        }
1218
1219        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
1220            &self.runtime_env
1221        }
1222
1223        fn execution_props(&self) -> &ExecutionProps {
1224            unimplemented!()
1225        }
1226
1227        fn as_any(&self) -> &dyn Any {
1228            unimplemented!()
1229        }
1230
1231        fn table_options(&self) -> &TableOptions {
1232            unimplemented!()
1233        }
1234
1235        fn table_options_mut(&mut self) -> &mut TableOptions {
1236            unimplemented!()
1237        }
1238
1239        fn task_ctx(&self) -> Arc<TaskContext> {
1240            unimplemented!()
1241        }
1242    }
1243}