1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
/// A parsed URL identifying the files that make up a listing table.
///
/// Holds the original [`Url`], the object store [`Path`] prefix derived from
/// the URL's path, and an optional glob [`Pattern`] that listed paths must
/// additionally match.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ListingTableUrl {
    /// The parsed URL, as given (or as converted from a filesystem path).
    url: Url,
    /// The object store path prefix, decoded from the URL path.
    prefix: Path,
    /// Optional glob that paths (relative to `prefix`) must match to be listed.
    glob: Option<Pattern>,
}
45
46impl ListingTableUrl {
47 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103 let s = s.as_ref();
104
105 #[cfg(not(target_arch = "wasm32"))]
107 if std::path::Path::new(s).is_absolute() {
108 return Self::parse_path(s);
109 }
110
111 match Url::parse(s) {
112 Ok(url) => Self::try_new(url, None),
113 #[cfg(not(target_arch = "wasm32"))]
114 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115 Err(e) => Err(DataFusionError::External(Box::new(e))),
116 }
117 }
118
119 #[cfg(not(target_arch = "wasm32"))]
121 fn parse_path(s: &str) -> Result<Self> {
122 let (path, glob) = match split_glob_expression(s) {
123 Some((prefix, glob)) => {
124 let glob = Pattern::new(glob)
125 .map_err(|e| DataFusionError::External(Box::new(e)))?;
126 (prefix, Some(glob))
127 }
128 None => (s, None),
129 };
130
131 let url = url_from_filesystem_path(path).ok_or_else(|| {
132 DataFusionError::External(
133 format!("Failed to convert path to URL: {path}").into(),
134 )
135 })?;
136
137 Self::try_new(url, glob)
138 }
139
140 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147 let prefix = Path::from_url_path(url.path())?;
148 Ok(Self { url, prefix, glob })
149 }
150
151 pub fn scheme(&self) -> &str {
153 self.url.scheme()
154 }
155
156 pub fn prefix(&self) -> &Path {
161 &self.prefix
162 }
163
164 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166 let Some(all_segments) = self.strip_prefix(path) else {
167 return false;
168 };
169
170 let mut segments = all_segments.filter(|s| !s.contains('='));
173
174 match &self.glob {
175 Some(glob) => {
176 if ignore_subdirectory {
177 segments
178 .next()
179 .is_some_and(|file_name| glob.matches(file_name))
180 } else {
181 let stripped = segments.join(DELIMITER);
182 glob.matches(&stripped)
183 }
184 }
185 None if ignore_subdirectory => segments.count() <= 1,
189 None => true,
191 }
192 }
193
194 pub fn is_collection(&self) -> bool {
196 self.url.path().ends_with(DELIMITER)
197 }
198
199 pub fn file_extension(&self) -> Option<&str> {
212 if let Some(mut segments) = self.url.path_segments() {
213 if let Some(last_segment) = segments.next_back() {
214 if last_segment.contains(".") && !last_segment.ends_with(".") {
215 return last_segment.split('.').next_back();
216 }
217 }
218 }
219
220 None
221 }
222
223 pub fn strip_prefix<'a, 'b: 'a>(
226 &'a self,
227 path: &'b Path,
228 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231 stripped = stripped.strip_prefix(DELIMITER)?;
232 }
233 Some(stripped.split_terminator(DELIMITER))
234 }
235
236 pub async fn list_all_files<'a>(
238 &'a self,
239 ctx: &'a dyn Session,
240 store: &'a dyn ObjectStore,
241 file_extension: &'a str,
242 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243 let exec_options = &ctx.config_options().execution;
244 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245 let list = match self.is_collection() {
247 true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
248 None => store.list(Some(&self.prefix)),
249 Some(cache) => {
250 if let Some(res) = cache.get(&self.prefix) {
251 debug!("Hit list all files cache");
252 futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
253 .boxed()
254 } else {
255 let list_res = store.list(Some(&self.prefix));
256 let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
257 cache.put(&self.prefix, Arc::new(vec.clone()));
258 futures::stream::iter(vec.into_iter().map(Ok)).boxed()
259 }
260 }
261 },
262 false => futures::stream::once(store.head(&self.prefix)).boxed(),
263 };
264 Ok(list
265 .try_filter(move |meta| {
266 let path = &meta.location;
267 let extension_match = path.as_ref().ends_with(file_extension);
268 let glob_match = self.contains(path, ignore_subdirectory);
269 futures::future::ready(extension_match && glob_match)
270 })
271 .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
272 .boxed())
273 }
274
275 pub fn as_str(&self) -> &str {
277 self.as_ref()
278 }
279
280 pub fn object_store(&self) -> ObjectStoreUrl {
282 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
283 ObjectStoreUrl::parse(url).unwrap()
284 }
285
286 pub fn is_folder(&self) -> bool {
288 self.url.scheme() == "file" && self.is_collection()
289 }
290
291 pub fn get_url(&self) -> &Url {
293 &self.url
294 }
295
296 pub fn get_glob(&self) -> &Option<Pattern> {
298 &self.glob
299 }
300
301 pub fn with_glob(self, glob: &str) -> Result<Self> {
303 let glob =
304 Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
305 Self::try_new(self.url, Some(glob))
306 }
307}
308
309#[cfg(not(target_arch = "wasm32"))]
311fn url_from_filesystem_path(s: &str) -> Option<Url> {
312 let path = std::path::Path::new(s);
313 let is_dir = match path.exists() {
314 true => path.is_dir(),
315 false => std::path::is_separator(s.chars().last()?),
317 };
318
319 let from_absolute_path = |p| {
320 let first = match is_dir {
321 true => Url::from_directory_path(p).ok(),
322 false => Url::from_file_path(p).ok(),
323 }?;
324
325 Url::parse(first.as_str()).ok()
328 };
329
330 if path.is_absolute() {
331 return from_absolute_path(path);
332 }
333
334 let absolute = std::env::current_dir().ok()?.join(path);
335 from_absolute_path(&absolute)
336}
337
338impl AsRef<str> for ListingTableUrl {
339 fn as_ref(&self) -> &str {
340 self.url.as_ref()
341 }
342}
343
344impl AsRef<Url> for ListingTableUrl {
345 fn as_ref(&self) -> &Url {
346 &self.url
347 }
348}
349
350impl std::fmt::Display for ListingTableUrl {
351 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352 self.as_str().fmt(f)
353 }
354}
355
/// Characters that mark the start of a glob expression.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` just before the path segment containing the first glob
/// character, returning `(prefix, glob)`.
///
/// Returns `None` if `path` contains no glob character.
///
/// The prefix keeps its trailing separator, so `/a/*.txt` gives
/// `("/a/", "*.txt")`. If no separator precedes the glob character, the
/// prefix is `"."` and the glob is all of `path`.
///
/// Separators are recognised with [`std::path::is_separator`], so `/` is
/// accepted on every platform.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(&GLOB_START_CHARS[..])?;

    // Byte offset just past the last separator that precedes the glob character.
    let boundary = path[..glob_start]
        .rmatch_indices(std::path::is_separator)
        .next()
        .map(|(idx, separator)| idx + separator.len());

    match boundary {
        Some(idx) => Some(path.split_at(idx)),
        None => Some((".", path)),
    }
}
383
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    #[test]
    fn test_contains() {
        let url = ListingTableUrl::parse("s3://bucket/foo/").unwrap();
        let direct = Path::from("foo/file.parquet");
        let nested = Path::from("foo/sub/file.parquet");
        let partitioned = Path::from("foo/year=2024/file.parquet");
        let outside = Path::from("bar/file.parquet");

        assert!(url.contains(&direct, true));
        assert!(url.contains(&direct, false));
        assert!(url.contains(&nested, false));
        assert!(!url.contains(&nested, true));
        // `key=value` segments are still allowed when ignoring subdirectories
        assert!(url.contains(&partitioned, true));
        assert!(!url.contains(&outside, false));

        let url = url.with_glob("*.parquet").unwrap();
        assert!(url.contains(&direct, true));
        assert!(!url.contains(&Path::from("foo/file.csv"), true));
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));

        test(
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        test(
            "file:///some/path/",
            None,
            "file path ends with / - not a file",
        );
        test(
            "file:///some/path/file",
            None,
            "file path does not end with - no extension",
        );
        test(
            "file:///some/path/file.",
            None,
            "file path ends with . - no value after .",
        );
        test(
            "file:///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        );
    }
}