datafusion_datasource/
url.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result, TableReference};
21use datafusion_execution::cache::TableScopedPath;
22use datafusion_execution::object_store::ObjectStoreUrl;
23use datafusion_session::Session;
24
25use futures::stream::BoxStream;
26use futures::{StreamExt, TryStreamExt};
27use glob::Pattern;
28use itertools::Itertools;
29use log::debug;
30use object_store::path::DELIMITER;
31use object_store::path::Path;
32use object_store::{ObjectMeta, ObjectStore};
33use url::Url;
34
35/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
36/// for more information on the supported expressions
37#[derive(Debug, Clone, Eq, PartialEq, Hash)]
38pub struct ListingTableUrl {
39    /// A URL that identifies a file or directory to list files from
40    url: Url,
41    /// The path prefix
42    prefix: Path,
43    /// An optional glob expression used to filter files
44    glob: Option<Pattern>,
45
46    table_ref: Option<TableReference>,
47}
48
49impl ListingTableUrl {
50    /// Parse a provided string as a `ListingTableUrl`
51    ///
52    /// A URL can either refer to a single object, or a collection of objects with a
53    /// common prefix, with the presence of a trailing `/` indicating a collection.
54    ///
55    /// For example, `file:///foo.txt` refers to the file at `/foo.txt`, whereas
56    /// `file:///foo/` refers to all the files under the directory `/foo` and its
57    /// subdirectories.
58    ///
59    /// Similarly `s3://BUCKET/blob.csv` refers to `blob.csv` in the S3 bucket `BUCKET`,
60    /// whereas `s3://BUCKET/foo/` refers to all objects with the prefix `foo/` in the
61    /// S3 bucket `BUCKET`
62    ///
63    /// # URL Encoding
64    ///
65    /// URL paths are expected to be URL-encoded. That is, the URL for a file named `bar%2Efoo`
66    /// would be `file:///bar%252Efoo`, as per the [URL] specification.
67    ///
68    /// It should be noted that some tools, such as the AWS CLI, take a different approach and
69    /// instead interpret the URL path verbatim. For example the object `bar%2Efoo` would be
70    /// addressed as `s3://BUCKET/bar%252Efoo` using [`ListingTableUrl`] but `s3://BUCKET/bar%2Efoo`
71    /// when using the aws-cli.
72    ///
73    /// # Paths without a Scheme
74    ///
75    /// If no scheme is provided, or the string is an absolute filesystem path
76    /// as determined by [`std::path::Path::is_absolute`], the string will be
77    /// interpreted as a path on the local filesystem using the operating
78    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
79    ///
80    /// If the path contains any of `'?', '*', '['`, it will be considered
81    /// a glob expression and resolved as described in the section below.
82    ///
83    /// Otherwise, the path will be resolved to an absolute path based on the current
84    /// working directory, and converted to a [file URI].
85    ///
86    /// If the path already exists in the local filesystem this will be used to determine if this
87    /// [`ListingTableUrl`] refers to a collection or a single object, otherwise the presence
88    /// of a trailing path delimiter will be used to indicate a directory. For the avoidance
89    /// of ambiguity it is recommended users always include trailing `/` when intending to
90    /// refer to a directory.
91    ///
92    /// ## Glob File Paths
93    ///
94    /// If no scheme is provided, and the path contains a glob expression, it will
95    /// be resolved as follows.
96    ///
97    /// The string up to the first path segment containing a glob expression will be extracted,
98    /// and resolved in the same manner as a normal scheme-less path above.
99    ///
100    /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
101    /// filter when listing files from object storage
102    ///
103    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
104    /// [URL]: https://url.spec.whatwg.org/
105    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
106        let s = s.as_ref();
107
108        // This is necessary to handle the case of a path starting with a drive letter
109        #[cfg(not(target_arch = "wasm32"))]
110        if std::path::Path::new(s).is_absolute() {
111            return Self::parse_path(s);
112        }
113
114        match Url::parse(s) {
115            Ok(url) => Self::try_new(url, None),
116            #[cfg(not(target_arch = "wasm32"))]
117            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
118            Err(e) => Err(DataFusionError::External(Box::new(e))),
119        }
120    }
121
122    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
123    #[cfg(not(target_arch = "wasm32"))]
124    fn parse_path(s: &str) -> Result<Self> {
125        let (path, glob) = match split_glob_expression(s) {
126            Some((prefix, glob)) => {
127                let glob = Pattern::new(glob)
128                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
129                (prefix, Some(glob))
130            }
131            None => (s, None),
132        };
133
134        let url = url_from_filesystem_path(path).ok_or_else(|| {
135            DataFusionError::External(
136                format!("Failed to convert path to URL: {path}").into(),
137            )
138        })?;
139
140        Self::try_new(url, glob)
141    }
142
143    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
144    ///
145    /// [`Self::parse`] supports glob expression only for file system paths.
146    /// However, some applications may want to support glob expression for URLs with a scheme.
147    /// The application can split the URL into a base URL and a glob expression and use this method
148    /// to create a [`ListingTableUrl`].
149    pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
150        let prefix = Path::from_url_path(url.path())?;
151        Ok(Self {
152            url,
153            prefix,
154            glob,
155            table_ref: None,
156        })
157    }
158
159    /// Returns the URL scheme
160    pub fn scheme(&self) -> &str {
161        self.url.scheme()
162    }
163
164    /// Return the URL path not excluding any glob expression
165    ///
166    /// If [`Self::is_collection`], this is the listing prefix
167    /// Otherwise, this is the path to the object
168    pub fn prefix(&self) -> &Path {
169        &self.prefix
170    }
171
172    /// Returns `true` if `path` matches this [`ListingTableUrl`]
173    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
174        let Some(all_segments) = self.strip_prefix(path) else {
175            return false;
176        };
177
178        // remove any segments that contain `=` as they are allowed even
179        // when ignore subdirectories is `true`.
180        let mut segments = all_segments.filter(|s| !s.contains('='));
181
182        match &self.glob {
183            Some(glob) => {
184                if ignore_subdirectory {
185                    segments
186                        .next()
187                        .is_some_and(|file_name| glob.matches(file_name))
188                } else {
189                    let stripped = segments.join(DELIMITER);
190                    glob.matches(&stripped)
191                }
192            }
193            // where we are ignoring subdirectories, we require
194            // the path to be either empty, or contain just the
195            // final file name segment.
196            None if ignore_subdirectory => segments.count() <= 1,
197            // in this case, any valid path at or below the url is allowed
198            None => true,
199        }
200    }
201
202    /// Returns `true` if `path` refers to a collection of objects
203    pub fn is_collection(&self) -> bool {
204        self.url.path().ends_with(DELIMITER)
205    }
206
207    /// Returns the file extension of the last path segment if it exists
208    ///
209    /// Examples:
210    /// ```rust
211    /// use datafusion_datasource::ListingTableUrl;
212    /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
213    /// assert_eq!(url.file_extension(), Some("csv"));
214    /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
215    /// assert_eq!(url.file_extension(), None);
216    /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
217    /// assert_eq!(url.file_extension(), None);
218    /// ```
219    pub fn file_extension(&self) -> Option<&str> {
220        if let Some(mut segments) = self.url.path_segments()
221            && let Some(last_segment) = segments.next_back()
222            && last_segment.contains(".")
223            && !last_segment.ends_with(".")
224        {
225            return last_segment.split('.').next_back();
226        }
227
228        None
229    }
230
231    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
232    /// an iterator of the remaining path segments
233    pub fn strip_prefix<'a, 'b: 'a>(
234        &'a self,
235        path: &'b Path,
236    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
237        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
238        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
239            stripped = stripped.strip_prefix(DELIMITER)?;
240        }
241        Some(stripped.split_terminator(DELIMITER))
242    }
243
244    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`,
245    /// optionally filtering by a path prefix
246    pub async fn list_prefixed_files<'a>(
247        &'a self,
248        ctx: &'a dyn Session,
249        store: &'a dyn ObjectStore,
250        prefix: Option<Path>,
251        file_extension: &'a str,
252    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
253        let exec_options = &ctx.config_options().execution;
254        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
255
256        // Build full_prefix for non-cached path and head() calls
257        let full_prefix = if let Some(ref p) = prefix {
258            let mut parts = self.prefix.parts().collect::<Vec<_>>();
259            parts.extend(p.parts());
260            Path::from_iter(parts.into_iter())
261        } else {
262            self.prefix.clone()
263        };
264
265        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
266            list_with_cache(
267                ctx,
268                store,
269                self.table_ref.as_ref(),
270                &self.prefix,
271                prefix.as_ref(),
272            )
273            .await?
274        } else {
275            match store.head(&full_prefix).await {
276                Ok(meta) => futures::stream::once(async { Ok(meta) })
277                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
278                    .boxed(),
279                // If the head command fails, it is likely that object doesn't exist.
280                // Retry as though it were a prefix (aka a collection)
281                Err(object_store::Error::NotFound { .. }) => {
282                    list_with_cache(
283                        ctx,
284                        store,
285                        self.table_ref.as_ref(),
286                        &self.prefix,
287                        prefix.as_ref(),
288                    )
289                    .await?
290                }
291                Err(e) => return Err(e.into()),
292            }
293        };
294
295        Ok(list
296            .try_filter(move |meta| {
297                let path = &meta.location;
298                let extension_match = path.as_ref().ends_with(file_extension);
299                let glob_match = self.contains(path, ignore_subdirectory);
300                futures::future::ready(extension_match && glob_match)
301            })
302            .boxed())
303    }
304
305    /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
306    pub async fn list_all_files<'a>(
307        &'a self,
308        ctx: &'a dyn Session,
309        store: &'a dyn ObjectStore,
310        file_extension: &'a str,
311    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
312        self.list_prefixed_files(ctx, store, None, file_extension)
313            .await
314    }
315
316    /// Returns this [`ListingTableUrl`] as a string
317    pub fn as_str(&self) -> &str {
318        self.as_ref()
319    }
320
321    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
322    pub fn object_store(&self) -> ObjectStoreUrl {
323        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
324        ObjectStoreUrl::parse(url).unwrap()
325    }
326
327    /// Returns true if the [`ListingTableUrl`] points to the folder
328    pub fn is_folder(&self) -> bool {
329        self.url.scheme() == "file" && self.is_collection()
330    }
331
332    /// Return the `url` for [`ListingTableUrl`]
333    pub fn get_url(&self) -> &Url {
334        &self.url
335    }
336
337    /// Return the `glob` for [`ListingTableUrl`]
338    pub fn get_glob(&self) -> &Option<Pattern> {
339        &self.glob
340    }
341
342    /// Returns a copy of current [`ListingTableUrl`] with a specified `glob`
343    pub fn with_glob(self, glob: &str) -> Result<Self> {
344        let glob =
345            Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
346        Self::try_new(self.url, Some(glob))
347    }
348
349    pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
350        self.table_ref = Some(table_ref);
351        self
352    }
353
354    pub fn get_table_ref(&self) -> &Option<TableReference> {
355        &self.table_ref
356    }
357}
358
359/// Lists files with cache support, using prefix-aware lookups.
360///
361/// # Arguments
362/// * `ctx` - The session context
363/// * `store` - The object store to list from
364/// * `table_base_path` - The table's base path (the stable cache key)
365/// * `prefix` - Optional prefix relative to table base for filtering results
366///
367/// # Cache Behavior:
368/// The cache key is always `table_base_path`. When a prefix-filtered listing
369/// is requested via `prefix`, the cache:
370/// - Looks up `table_base_path` in the cache
371/// - Filters results to match `table_base_path/prefix`
372/// - Returns filtered results without a storage call
373///
374/// On cache miss, the full table is always listed and cached, ensuring
375/// subsequent prefix queries can be served from cache.
376async fn list_with_cache<'b>(
377    ctx: &'b dyn Session,
378    store: &'b dyn ObjectStore,
379    table_ref: Option<&TableReference>,
380    table_base_path: &Path,
381    prefix: Option<&Path>,
382) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
383    // Build the full listing path (table_base + prefix)
384    let full_prefix = match prefix {
385        Some(p) => {
386            let mut parts: Vec<_> = table_base_path.parts().collect();
387            parts.extend(p.parts());
388            Path::from_iter(parts)
389        }
390        None => table_base_path.clone(),
391    };
392
393    match ctx.runtime_env().cache_manager.get_list_files_cache() {
394        None => Ok(store
395            .list(Some(&full_prefix))
396            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
397            .boxed()),
398        Some(cache) => {
399            // Convert prefix to Option<Path> for cache lookup
400            let prefix_filter = prefix.cloned();
401
402            let table_scoped_base_path = TableScopedPath {
403                table: table_ref.cloned(),
404                path: table_base_path.clone(),
405            };
406
407            // Try cache lookup with optional prefix filter
408            let vec = if let Some(res) =
409                cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
410            {
411                debug!("Hit list files cache");
412                res.as_ref().clone()
413            } else {
414                // Cache miss - always list and cache the full table
415                // This ensures we have complete data for future prefix queries
416                let vec = store
417                    .list(Some(table_base_path))
418                    .try_collect::<Vec<ObjectMeta>>()
419                    .await?;
420                cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
421
422                // If a prefix filter was requested, apply it to the results
423                if prefix.is_some() {
424                    let full_prefix_str = full_prefix.as_ref();
425                    vec.into_iter()
426                        .filter(|meta| {
427                            meta.location.as_ref().starts_with(full_prefix_str)
428                        })
429                        .collect()
430                } else {
431                    vec
432                }
433            };
434            Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
435        }
436    }
437}
438
439/// Creates a file URL from a potentially relative filesystem path
440#[cfg(not(target_arch = "wasm32"))]
441fn url_from_filesystem_path(s: &str) -> Option<Url> {
442    let path = std::path::Path::new(s);
443    let is_dir = match path.exists() {
444        true => path.is_dir(),
445        // Fallback to inferring from trailing separator
446        false => std::path::is_separator(s.chars().last()?),
447    };
448
449    let from_absolute_path = |p| {
450        let first = match is_dir {
451            true => Url::from_directory_path(p).ok(),
452            false => Url::from_file_path(p).ok(),
453        }?;
454
455        // By default from_*_path preserve relative path segments
456        // We therefore parse the URL again to resolve these
457        Url::parse(first.as_str()).ok()
458    };
459
460    if path.is_absolute() {
461        return from_absolute_path(path);
462    }
463
464    let absolute = std::env::current_dir().ok()?.join(path);
465    from_absolute_path(&absolute)
466}
467
468impl AsRef<str> for ListingTableUrl {
469    fn as_ref(&self) -> &str {
470        self.url.as_ref()
471    }
472}
473
474impl AsRef<Url> for ListingTableUrl {
475    fn as_ref(&self) -> &Url {
476        &self.url
477    }
478}
479
480impl std::fmt::Display for ListingTableUrl {
481    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482        self.as_str().fmt(f)
483    }
484}
485
/// Characters whose presence marks the start of a glob expression in a path
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the first path segment containing a glob expression, returning
/// `None` if no glob expression found.
///
/// The first element of the pair is everything up to and including the last path
/// delimiter before the first glob character, or `"."` when no delimiter precedes
/// it. The second element is the rest of `path`, starting at that segment.
///
/// Path delimiters are determined using [`std::path::is_separator`] which
/// permits `/` as a path delimiter even on Windows platforms.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;

    // Byte offset just past the last separator that precedes the glob, if any
    let segment_start = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map(|(idx, c)| idx + c.len_utf8());

    match segment_start {
        Some(idx) => Some(path.split_at(idx)),
        None => Some((".", path)),
    }
}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516    use async_trait::async_trait;
517    use bytes::Bytes;
518    use datafusion_common::DFSchema;
519    use datafusion_common::config::TableOptions;
520    use datafusion_execution::TaskContext;
521    use datafusion_execution::config::SessionConfig;
522    use datafusion_execution::runtime_env::RuntimeEnv;
523    use datafusion_expr::execution_props::ExecutionProps;
524    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
525    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
526    use datafusion_physical_plan::ExecutionPlan;
527    use object_store::{
528        GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
529        PutPayload,
530    };
531    use std::any::Any;
532    use std::collections::HashMap;
533    use std::ops::Range;
534    use tempfile::tempdir;
535
536    #[test]
537    fn test_prefix_path() {
538        let root = std::env::current_dir().unwrap();
539        let root = root.to_string_lossy();
540
541        let url = ListingTableUrl::parse(root).unwrap();
542        let child = url.prefix.child("partition").child("file");
543
544        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
545        assert_eq!(prefix, vec!["partition", "file"]);
546
547        let url = ListingTableUrl::parse("file:///").unwrap();
548        let child = Path::parse("/foo/bar").unwrap();
549        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
550        assert_eq!(prefix, vec!["foo", "bar"]);
551
552        let url = ListingTableUrl::parse("file:///foo").unwrap();
553        let child = Path::parse("/foob/bar").unwrap();
554        assert!(url.strip_prefix(&child).is_none());
555
556        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
557        let child = Path::parse("/foo/file").unwrap();
558        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
559
560        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
561        assert_eq!(url.prefix.as_ref(), "foo/ bar");
562
563        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
564        assert_eq!(url.prefix.as_ref(), "foo/bar");
565
566        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
567        assert_eq!(url.prefix.as_ref(), "foo/😺");
568
569        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
570        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
571
572        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
573        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
574
575        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
576        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
577
578        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
579        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
580
581        let dir = tempdir().unwrap();
582        let path = dir.path().join("bar%2Ffoo");
583        std::fs::File::create(&path).unwrap();
584
585        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
586        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
587
588        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
589        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
590
591        let url =
592            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
593        assert_eq!(url.prefix.as_ref(), "baz/test.txt");
594
595        let workdir = std::env::current_dir().unwrap();
596        let t = workdir.join("non-existent");
597        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
598        let b = ListingTableUrl::parse("non-existent").unwrap();
599        assert_eq!(a, b);
600        assert!(a.prefix.as_ref().ends_with("non-existent"));
601
602        let t = workdir.parent().unwrap();
603        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
604        let b = ListingTableUrl::parse("..").unwrap();
605        assert_eq!(a, b);
606
607        let t = t.join("bar");
608        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
609        let b = ListingTableUrl::parse("../bar").unwrap();
610        assert_eq!(a, b);
611        assert!(a.prefix.as_ref().ends_with("bar"));
612
613        let t = t.join(".").join("foo").join("..").join("baz");
614        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
615        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
616        assert_eq!(a, b);
617        assert!(a.prefix.as_ref().ends_with("bar/baz"));
618    }
619
620    #[test]
621    fn test_prefix_s3() {
622        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
623        assert_eq!(url.prefix.as_ref(), "foo/bar");
624
625        let path = Path::from("foo/bar/partition/foo.parquet");
626        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
627        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
628
629        let path = Path::from("other/bar/partition/foo.parquet");
630        assert!(url.strip_prefix(&path).is_none());
631    }
632
633    #[test]
634    fn test_split_glob() {
635        fn test(input: &str, expected: Option<(&str, &str)>) {
636            assert_eq!(
637                split_glob_expression(input),
638                expected,
639                "testing split_glob_expression with {input}"
640            );
641        }
642
643        // no glob patterns
644        test("/", None);
645        test("/a.txt", None);
646        test("/a", None);
647        test("/a/", None);
648        test("/a/b", None);
649        test("/a/b/", None);
650        test("/a/b.txt", None);
651        test("/a/b/c.txt", None);
652        // glob patterns, thus we build the longest path (os-specific)
653        test("*.txt", Some((".", "*.txt")));
654        test("/*.txt", Some(("/", "*.txt")));
655        test("/a/*b.txt", Some(("/a/", "*b.txt")));
656        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
657        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
658        test("/a/b*.txt", Some(("/a/", "b*.txt")));
659        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
660
661        // https://github.com/apache/datafusion/issues/2465
662        test(
663            "/a/b/c//alltypes_plain*.parquet",
664            Some(("/a/b/c//", "alltypes_plain*.parquet")),
665        );
666    }
667
668    #[test]
669    fn test_is_collection() {
670        fn test(input: &str, expected: bool, message: &str) {
671            let url = ListingTableUrl::parse(input).unwrap();
672            assert_eq!(url.is_collection(), expected, "{message}");
673        }
674
675        test("https://a.b.c/path/", true, "path ends with / - collection");
676        test(
677            "https://a.b.c/path/?a=b",
678            true,
679            "path ends with / - with query args - collection",
680        );
681        test(
682            "https://a.b.c/path?a=b/",
683            false,
684            "path not ends with / - query ends with / - not collection",
685        );
686        test(
687            "https://a.b.c/path/#a=b",
688            true,
689            "path ends with / - with fragment - collection",
690        );
691        test(
692            "https://a.b.c/path#a=b/",
693            false,
694            "path not ends with / - fragment ends with / - not collection",
695        );
696    }
697
698    #[test]
699    fn test_file_extension() {
700        fn test(input: &str, expected: Option<&str>, message: &str) {
701            let url = ListingTableUrl::parse(input).unwrap();
702            assert_eq!(url.file_extension(), expected, "{message}");
703        }
704
705        test("https://a.b.c/path/", None, "path ends with / - not a file");
706        test(
707            "https://a.b.c/path/?a=b",
708            None,
709            "path ends with / - with query args - not a file",
710        );
711        test(
712            "https://a.b.c/path?a=b/",
713            None,
714            "path not ends with / - query ends with / but no file extension",
715        );
716        test(
717            "https://a.b.c/path/#a=b",
718            None,
719            "path ends with / - with fragment - not a file",
720        );
721        test(
722            "https://a.b.c/path#a=b/",
723            None,
724            "path not ends with / - fragment ends with / but no file extension",
725        );
726        test(
727            "file///some/path/",
728            None,
729            "file path ends with / - not a file",
730        );
731        test(
732            "file///some/path/file",
733            None,
734            "file path does not end with - no extension",
735        );
736        test(
737            "file///some/path/file.",
738            None,
739            "file path ends with . - no value after .",
740        );
741        test(
742            "file///some/path/file.ext",
743            Some("ext"),
744            "file path ends with .ext - extension is ext",
745        );
746    }
747
748    #[tokio::test]
749    async fn test_list_files() -> Result<()> {
750        let store = MockObjectStore {
751            in_mem: object_store::memory::InMemory::new(),
752            forbidden_paths: vec!["forbidden/e.parquet".into()],
753        };
754
755        // Create some files:
756        create_file(&store, "a.parquet").await;
757        create_file(&store, "/t/b.parquet").await;
758        create_file(&store, "/t/c.csv").await;
759        create_file(&store, "/t/d.csv").await;
760
761        // This file returns a permission error.
762        create_file(&store, "/forbidden/e.parquet").await;
763
764        assert_eq!(
765            list_all_files("/", &store, "parquet").await?,
766            vec!["a.parquet"],
767        );
768
769        // test with and without trailing slash
770        assert_eq!(
771            list_all_files("/t/", &store, "parquet").await?,
772            vec!["t/b.parquet"],
773        );
774        assert_eq!(
775            list_all_files("/t", &store, "parquet").await?,
776            vec!["t/b.parquet"],
777        );
778
779        // test with and without trailing slash
780        assert_eq!(
781            list_all_files("/t", &store, "csv").await?,
782            vec!["t/c.csv", "t/d.csv"],
783        );
784        assert_eq!(
785            list_all_files("/t/", &store, "csv").await?,
786            vec!["t/c.csv", "t/d.csv"],
787        );
788
789        // Test a non existing prefix
790        assert_eq!(
791            list_all_files("/NonExisting", &store, "csv").await?,
792            vec![] as Vec<String>
793        );
794        assert_eq!(
795            list_all_files("/NonExisting/", &store, "csv").await?,
796            vec![] as Vec<String>
797        );
798
799        // Including forbidden.parquet generates an error.
800        let Err(DataFusionError::ObjectStore(err)) =
801            list_all_files("/forbidden/e.parquet", &store, "parquet").await
802        else {
803            panic!("Expected ObjectStore error");
804        };
805
806        let object_store::Error::PermissionDenied { .. } = &*err else {
807            panic!("Expected PermissionDenied error");
808        };
809
810        // Test prefix filtering with partition-style paths
811        create_file(&store, "/data/a=1/file1.parquet").await;
812        create_file(&store, "/data/a=1/b=100/file2.parquet").await;
813        create_file(&store, "/data/a=2/b=200/file3.parquet").await;
814        create_file(&store, "/data/a=2/b=200/file4.csv").await;
815
816        assert_eq!(
817            list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
818                .await?,
819            vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
820        );
821
822        assert_eq!(
823            list_prefixed_files(
824                "/data/",
825                &store,
826                Some(Path::from("a=1/b=100")),
827                "parquet"
828            )
829            .await?,
830            vec!["data/a=1/b=100/file2.parquet"],
831        );
832
833        assert_eq!(
834            list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
835                .await?,
836            vec!["data/a=2/b=200/file3.parquet"],
837        );
838
839        Ok(())
840    }
841
842    /// Tests that the cached code path produces identical results to the non-cached path.
843    ///
844    /// This is critical: the cache is a transparent optimization, so both paths
845    /// MUST return the same files. Note: order is not guaranteed by ObjectStore::list,
846    /// so we sort results before comparison.
847    #[tokio::test]
848    async fn test_cache_path_equivalence() -> Result<()> {
849        use datafusion_execution::runtime_env::RuntimeEnvBuilder;
850
851        let store = MockObjectStore {
852            in_mem: object_store::memory::InMemory::new(),
853            forbidden_paths: vec![],
854        };
855
856        // Create test files with partition-style paths
857        create_file(&store, "/table/year=2023/data1.parquet").await;
858        create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
859        create_file(&store, "/table/year=2024/data3.parquet").await;
860        create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
861        create_file(&store, "/table/year=2024/month=12/data5.parquet").await;
862
863        // Session WITHOUT cache
864        let session_no_cache = MockSession::new();
865
866        // Session WITH cache - use RuntimeEnvBuilder with cache limit (no TTL needed for this test)
867        let runtime_with_cache = RuntimeEnvBuilder::new()
868            .with_object_list_cache_limit(1024 * 1024) // 1MB limit
869            .build_arc()?;
870        let session_with_cache = MockSession::with_runtime_env(runtime_with_cache);
871
872        // Test cases: (url, prefix, description)
873        let test_cases = vec![
874            ("/table/", None, "full table listing"),
875            (
876                "/table/",
877                Some(Path::from("year=2023")),
878                "single partition filter",
879            ),
880            (
881                "/table/",
882                Some(Path::from("year=2024")),
883                "different partition filter",
884            ),
885            (
886                "/table/",
887                Some(Path::from("year=2024/month=06")),
888                "nested partition filter",
889            ),
890            (
891                "/table/",
892                Some(Path::from("year=2025")),
893                "non-existent partition",
894            ),
895        ];
896
897        for (url_str, prefix, description) in test_cases {
898            let url = ListingTableUrl::parse(url_str)?;
899
900            // Get results WITHOUT cache (sorted for comparison)
901            let mut results_no_cache: Vec<String> = url
902                .list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
903                .await?
904                .try_collect::<Vec<_>>()
905                .await?
906                .into_iter()
907                .map(|m| m.location.to_string())
908                .collect();
909            results_no_cache.sort();
910
911            // Get results WITH cache (first call - cache miss, sorted for comparison)
912            let mut results_with_cache_miss: Vec<String> = url
913                .list_prefixed_files(
914                    &session_with_cache,
915                    &store,
916                    prefix.clone(),
917                    "parquet",
918                )
919                .await?
920                .try_collect::<Vec<_>>()
921                .await?
922                .into_iter()
923                .map(|m| m.location.to_string())
924                .collect();
925            results_with_cache_miss.sort();
926
927            // Get results WITH cache (second call - cache hit, sorted for comparison)
928            let mut results_with_cache_hit: Vec<String> = url
929                .list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
930                .await?
931                .try_collect::<Vec<_>>()
932                .await?
933                .into_iter()
934                .map(|m| m.location.to_string())
935                .collect();
936            results_with_cache_hit.sort();
937
938            // All three should contain the same files
939            assert_eq!(
940                results_no_cache, results_with_cache_miss,
941                "Cache miss path should match non-cached path for: {description}"
942            );
943            assert_eq!(
944                results_no_cache, results_with_cache_hit,
945                "Cache hit path should match non-cached path for: {description}"
946            );
947        }
948
949        Ok(())
950    }
951
952    /// Tests that prefix queries can be served from a cached full-table listing
953    #[tokio::test]
954    async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
955        use datafusion_execution::runtime_env::RuntimeEnvBuilder;
956
957        let store = MockObjectStore {
958            in_mem: object_store::memory::InMemory::new(),
959            forbidden_paths: vec![],
960        };
961
962        // Create test files
963        create_file(&store, "/sales/region=US/q1.parquet").await;
964        create_file(&store, "/sales/region=US/q2.parquet").await;
965        create_file(&store, "/sales/region=EU/q1.parquet").await;
966
967        // Create session with cache (no TTL needed for this test)
968        let runtime = RuntimeEnvBuilder::new()
969            .with_object_list_cache_limit(1024 * 1024) // 1MB limit
970            .build_arc()?;
971        let session = MockSession::with_runtime_env(runtime);
972
973        let url = ListingTableUrl::parse("/sales/")?;
974
975        // First: query full table (populates cache)
976        let full_results: Vec<String> = url
977            .list_prefixed_files(&session, &store, None, "parquet")
978            .await?
979            .try_collect::<Vec<_>>()
980            .await?
981            .into_iter()
982            .map(|m| m.location.to_string())
983            .collect();
984        assert_eq!(full_results.len(), 3);
985
986        // Second: query with prefix (should be served from cache)
987        let mut us_results: Vec<String> = url
988            .list_prefixed_files(
989                &session,
990                &store,
991                Some(Path::from("region=US")),
992                "parquet",
993            )
994            .await?
995            .try_collect::<Vec<_>>()
996            .await?
997            .into_iter()
998            .map(|m| m.location.to_string())
999            .collect();
1000        us_results.sort();
1001
1002        assert_eq!(
1003            us_results,
1004            vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
1005        );
1006
1007        // Third: different prefix (also from cache)
1008        let eu_results: Vec<String> = url
1009            .list_prefixed_files(
1010                &session,
1011                &store,
1012                Some(Path::from("region=EU")),
1013                "parquet",
1014            )
1015            .await?
1016            .try_collect::<Vec<_>>()
1017            .await?
1018            .into_iter()
1019            .map(|m| m.location.to_string())
1020            .collect();
1021
1022        assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);
1023
1024        Ok(())
1025    }
1026
1027    /// Creates a file with "hello world" content at the specified path
1028    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
1029        object_store
1030            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
1031            .await
1032            .expect("failed to create test file");
1033    }
1034
1035    /// Runs "list_prefixed_files"  with no prefix to list all files and returns their paths
1036    ///
1037    /// Panic's on error
1038    async fn list_all_files(
1039        url: &str,
1040        store: &dyn ObjectStore,
1041        file_extension: &str,
1042    ) -> Result<Vec<String>> {
1043        try_list_prefixed_files(url, store, None, file_extension).await
1044    }
1045
1046    /// Runs "list_prefixed_files" and returns their paths
1047    ///
1048    /// Panic's on error
1049    async fn list_prefixed_files(
1050        url: &str,
1051        store: &dyn ObjectStore,
1052        prefix: Option<Path>,
1053        file_extension: &str,
1054    ) -> Result<Vec<String>> {
1055        try_list_prefixed_files(url, store, prefix, file_extension).await
1056    }
1057
1058    /// Runs "list_prefixed_files" and returns their paths
1059    async fn try_list_prefixed_files(
1060        url: &str,
1061        store: &dyn ObjectStore,
1062        prefix: Option<Path>,
1063        file_extension: &str,
1064    ) -> Result<Vec<String>> {
1065        let session = MockSession::new();
1066        let url = ListingTableUrl::parse(url)?;
1067        let files = url
1068            .list_prefixed_files(&session, store, prefix, file_extension)
1069            .await?
1070            .try_collect::<Vec<_>>()
1071            .await?
1072            .into_iter()
1073            .map(|meta| meta.location.as_ref().to_string())
1074            .collect();
1075        Ok(files)
1076    }
1077
    /// Test [`ObjectStore`] that forwards every operation to an in-memory store.
    ///
    /// The one exception is `head`: for any location listed in `forbidden_paths` it
    /// returns `object_store::Error::PermissionDenied` instead of delegating.
    #[derive(Debug)]
    struct MockObjectStore {
        /// Backing store that all operations are delegated to.
        in_mem: object_store::memory::InMemory,
        /// Locations for which `head` fails with `PermissionDenied`.
        forbidden_paths: Vec<Path>,
    }
1083
    // Formats the mock by forwarding to the wrapped in-memory store's `fmt`.
    impl std::fmt::Display for MockObjectStore {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.in_mem.fmt(f)
        }
    }
1089
1090    #[async_trait]
1091    impl ObjectStore for MockObjectStore {
1092        async fn put_opts(
1093            &self,
1094            location: &Path,
1095            payload: PutPayload,
1096            opts: object_store::PutOptions,
1097        ) -> object_store::Result<object_store::PutResult> {
1098            self.in_mem.put_opts(location, payload, opts).await
1099        }
1100
1101        async fn put_multipart_opts(
1102            &self,
1103            location: &Path,
1104            opts: PutMultipartOptions,
1105        ) -> object_store::Result<Box<dyn MultipartUpload>> {
1106            self.in_mem.put_multipart_opts(location, opts).await
1107        }
1108
1109        async fn get_opts(
1110            &self,
1111            location: &Path,
1112            options: GetOptions,
1113        ) -> object_store::Result<GetResult> {
1114            self.in_mem.get_opts(location, options).await
1115        }
1116
1117        async fn get_ranges(
1118            &self,
1119            location: &Path,
1120            ranges: &[Range<u64>],
1121        ) -> object_store::Result<Vec<Bytes>> {
1122            self.in_mem.get_ranges(location, ranges).await
1123        }
1124
1125        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
1126            if self.forbidden_paths.contains(location) {
1127                Err(object_store::Error::PermissionDenied {
1128                    path: location.to_string(),
1129                    source: "forbidden".into(),
1130                })
1131            } else {
1132                self.in_mem.head(location).await
1133            }
1134        }
1135
1136        async fn delete(&self, location: &Path) -> object_store::Result<()> {
1137            self.in_mem.delete(location).await
1138        }
1139
1140        fn list(
1141            &self,
1142            prefix: Option<&Path>,
1143        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
1144            self.in_mem.list(prefix)
1145        }
1146
1147        async fn list_with_delimiter(
1148            &self,
1149            prefix: Option<&Path>,
1150        ) -> object_store::Result<ListResult> {
1151            self.in_mem.list_with_delimiter(prefix).await
1152        }
1153
1154        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
1155            self.in_mem.copy(from, to).await
1156        }
1157
1158        async fn copy_if_not_exists(
1159            &self,
1160            from: &Path,
1161            to: &Path,
1162        ) -> object_store::Result<()> {
1163            self.in_mem.copy_if_not_exists(from, to).await
1164        }
1165    }
1166
    /// Minimal [`Session`] used by the listing tests.
    ///
    /// It carries only a session config and a runtime environment.
    struct MockSession {
        /// Configuration returned from `Session::config`.
        config: SessionConfig,
        /// Runtime environment returned from `Session::runtime_env`. Tests swap in a
        /// runtime built with an object list cache limit to exercise the cached path.
        runtime_env: Arc<RuntimeEnv>,
    }
1171
1172    impl MockSession {
1173        fn new() -> Self {
1174            Self {
1175                config: SessionConfig::new(),
1176                runtime_env: Arc::new(RuntimeEnv::default()),
1177            }
1178        }
1179
1180        /// Create a MockSession with a custom RuntimeEnv (for cache testing)
1181        fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
1182            Self {
1183                config: SessionConfig::new(),
1184                runtime_env,
1185            }
1186        }
1187    }
1188
1189    #[async_trait::async_trait]
1190    impl Session for MockSession {
1191        fn session_id(&self) -> &str {
1192            unimplemented!()
1193        }
1194
1195        fn config(&self) -> &SessionConfig {
1196            &self.config
1197        }
1198
1199        async fn create_physical_plan(
1200            &self,
1201            _logical_plan: &LogicalPlan,
1202        ) -> Result<Arc<dyn ExecutionPlan>> {
1203            unimplemented!()
1204        }
1205
1206        fn create_physical_expr(
1207            &self,
1208            _expr: Expr,
1209            _df_schema: &DFSchema,
1210        ) -> Result<Arc<dyn PhysicalExpr>> {
1211            unimplemented!()
1212        }
1213
1214        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
1215            unimplemented!()
1216        }
1217
1218        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
1219            unimplemented!()
1220        }
1221
1222        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
1223            unimplemented!()
1224        }
1225
1226        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
1227            &self.runtime_env
1228        }
1229
1230        fn execution_props(&self) -> &ExecutionProps {
1231            unimplemented!()
1232        }
1233
1234        fn as_any(&self) -> &dyn Any {
1235            unimplemented!()
1236        }
1237
1238        fn table_options(&self) -> &TableOptions {
1239            unimplemented!()
1240        }
1241
1242        fn table_options_mut(&mut self) -> &mut TableOptions {
1243            unimplemented!()
1244        }
1245
1246        fn task_ctx(&self) -> Arc<TaskContext> {
1247            unimplemented!()
1248        }
1249    }
1250}