1use std::sync::Arc;
19
20use datafusion_common::{DataFusionError, Result};
21use datafusion_execution::object_store::ObjectStoreUrl;
22use datafusion_session::Session;
23
24use futures::stream::BoxStream;
25use futures::{StreamExt, TryStreamExt};
26use glob::Pattern;
27use itertools::Itertools;
28use log::debug;
29use object_store::path::Path;
30use object_store::path::DELIMITER;
31use object_store::{ObjectMeta, ObjectStore};
32use url::Url;
33
/// A parsed URL identifying where a listing table's data lives: either a
/// single object or a collection of objects (URL path ending in `/`),
/// optionally restricted by a glob pattern.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ListingTableUrl {
    /// The parsed URL; local filesystem paths are converted to `file://` URLs.
    url: Url,
    /// The object store path derived from `url`'s (percent-decoded) path.
    prefix: Path,
    /// Optional glob that paths below `prefix` must match. Set by `parse` from
    /// a glob expression in a filesystem path, or supplied directly to `try_new`.
    glob: Option<Pattern>,
}
45
46impl ListingTableUrl {
47 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
103 let s = s.as_ref();
104
105 #[cfg(not(target_arch = "wasm32"))]
107 if std::path::Path::new(s).is_absolute() {
108 return Self::parse_path(s);
109 }
110
111 match Url::parse(s) {
112 Ok(url) => Self::try_new(url, None),
113 #[cfg(not(target_arch = "wasm32"))]
114 Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
115 Err(e) => Err(DataFusionError::External(Box::new(e))),
116 }
117 }
118
119 #[cfg(not(target_arch = "wasm32"))]
121 fn parse_path(s: &str) -> Result<Self> {
122 let (path, glob) = match split_glob_expression(s) {
123 Some((prefix, glob)) => {
124 let glob = Pattern::new(glob)
125 .map_err(|e| DataFusionError::External(Box::new(e)))?;
126 (prefix, Some(glob))
127 }
128 None => (s, None),
129 };
130
131 let url = url_from_filesystem_path(path).ok_or_else(|| {
132 DataFusionError::External(
133 format!("Failed to convert path to URL: {path}").into(),
134 )
135 })?;
136
137 Self::try_new(url, glob)
138 }
139
140 pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
147 let prefix = Path::from_url_path(url.path())?;
148 Ok(Self { url, prefix, glob })
149 }
150
151 pub fn scheme(&self) -> &str {
153 self.url.scheme()
154 }
155
156 pub fn prefix(&self) -> &Path {
161 &self.prefix
162 }
163
164 pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
166 let Some(all_segments) = self.strip_prefix(path) else {
167 return false;
168 };
169
170 let mut segments = all_segments.filter(|s| !s.contains('='));
173
174 match &self.glob {
175 Some(glob) => {
176 if ignore_subdirectory {
177 segments
178 .next()
179 .is_some_and(|file_name| glob.matches(file_name))
180 } else {
181 let stripped = segments.join(DELIMITER);
182 glob.matches(&stripped)
183 }
184 }
185 None if ignore_subdirectory => segments.count() <= 1,
189 None => true,
191 }
192 }
193
194 pub fn is_collection(&self) -> bool {
196 self.url.path().ends_with(DELIMITER)
197 }
198
199 pub fn file_extension(&self) -> Option<&str> {
212 if let Some(mut segments) = self.url.path_segments() {
213 if let Some(last_segment) = segments.next_back() {
214 if last_segment.contains(".") && !last_segment.ends_with(".") {
215 return last_segment.split('.').next_back();
216 }
217 }
218 }
219
220 None
221 }
222
223 pub fn strip_prefix<'a, 'b: 'a>(
226 &'a self,
227 path: &'b Path,
228 ) -> Option<impl Iterator<Item = &'b str> + 'a> {
229 let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
230 if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
231 stripped = stripped.strip_prefix(DELIMITER)?;
232 }
233 Some(stripped.split_terminator(DELIMITER))
234 }
235
236 pub async fn list_all_files<'a>(
238 &'a self,
239 ctx: &'a dyn Session,
240 store: &'a dyn ObjectStore,
241 file_extension: &'a str,
242 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243 let exec_options = &ctx.config_options().execution;
244 let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245 let list = match self.is_collection() {
247 true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
248 None => store.list(Some(&self.prefix)),
249 Some(cache) => {
250 if let Some(res) = cache.get(&self.prefix) {
251 debug!("Hit list all files cache");
252 futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
253 .boxed()
254 } else {
255 let list_res = store.list(Some(&self.prefix));
256 let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
257 cache.put(&self.prefix, Arc::new(vec.clone()));
258 futures::stream::iter(vec.into_iter().map(Ok)).boxed()
259 }
260 }
261 },
262 false => futures::stream::once(store.head(&self.prefix)).boxed(),
263 };
264 Ok(list
265 .try_filter(move |meta| {
266 let path = &meta.location;
267 let extension_match = path.as_ref().ends_with(file_extension);
268 let glob_match = self.contains(path, ignore_subdirectory);
269 futures::future::ready(extension_match && glob_match)
270 })
271 .map_err(DataFusionError::ObjectStore)
272 .boxed())
273 }
274
275 pub fn as_str(&self) -> &str {
277 self.as_ref()
278 }
279
280 pub fn object_store(&self) -> ObjectStoreUrl {
282 let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
283 ObjectStoreUrl::parse(url).unwrap()
284 }
285}
286
287#[cfg(not(target_arch = "wasm32"))]
289fn url_from_filesystem_path(s: &str) -> Option<Url> {
290 let path = std::path::Path::new(s);
291 let is_dir = match path.exists() {
292 true => path.is_dir(),
293 false => std::path::is_separator(s.chars().last()?),
295 };
296
297 let from_absolute_path = |p| {
298 let first = match is_dir {
299 true => Url::from_directory_path(p).ok(),
300 false => Url::from_file_path(p).ok(),
301 }?;
302
303 Url::parse(first.as_str()).ok()
306 };
307
308 if path.is_absolute() {
309 return from_absolute_path(path);
310 }
311
312 let absolute = std::env::current_dir().ok()?.join(path);
313 from_absolute_path(&absolute)
314}
315
316impl AsRef<str> for ListingTableUrl {
317 fn as_ref(&self) -> &str {
318 self.url.as_ref()
319 }
320}
321
322impl AsRef<Url> for ListingTableUrl {
323 fn as_ref(&self) -> &Url {
324 &self.url
325 }
326}
327
328impl std::fmt::Display for ListingTableUrl {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 self.as_str().fmt(f)
331 }
332}
333
/// Characters that mark the start of a glob expression in a filesystem path.
/// `split_glob_expression` splits a path at the component containing the
/// first of these characters.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
336
337#[cfg(not(target_arch = "wasm32"))]
344fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
345 let mut last_separator = 0;
346
347 for (byte_idx, char) in path.char_indices() {
348 if GLOB_START_CHARS.contains(&char) {
349 if last_separator == 0 {
350 return Some((".", path));
351 }
352 return Some(path.split_at(last_separator));
353 }
354
355 if std::path::is_separator(char) {
356 last_separator = byte_idx + char.len_utf8();
357 }
358 }
359 None
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_prefix_path() {
        let root = std::env::current_dir().unwrap();
        let root = root.to_string_lossy();

        let url = ListingTableUrl::parse(root).unwrap();
        let child = url.prefix.child("partition").child("file");

        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "file"]);

        // An empty prefix strips nothing
        let url = ListingTableUrl::parse("file:///").unwrap();
        let child = Path::parse("/foo/bar").unwrap();
        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
        assert_eq!(prefix, vec!["foo", "bar"]);

        // Prefix matching is per segment, not per character
        let url = ListingTableUrl::parse("file:///foo").unwrap();
        let child = Path::parse("/foob/bar").unwrap();
        assert!(url.strip_prefix(&child).is_none());

        // A path equal to the prefix leaves no segments
        let url = ListingTableUrl::parse("file:///foo/file").unwrap();
        let child = Path::parse("/foo/file").unwrap();
        assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

        let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/ bar");

        // A trailing `?` starts an empty query, which is not part of the path
        let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/😺");

        // Percent-encoded URL paths are decoded exactly once
        let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

        let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");

        let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

        // Filesystem paths are taken literally: `%2F` in a file name is kept
        let dir = tempdir().unwrap();
        let path = dir.path().join("bar%2Ffoo");
        std::fs::File::create(&path).unwrap();

        let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
        assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

        // `.` and `..` segments in URLs are resolved
        let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");

        let url =
            ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
        assert_eq!(url.prefix.as_ref(), "baz/test.txt");

        // Relative filesystem paths resolve against the current directory
        let workdir = std::env::current_dir().unwrap();
        let t = workdir.join("non-existent");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("non-existent").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("non-existent"));

        let t = workdir.parent().unwrap();
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("..").unwrap();
        assert_eq!(a, b);

        let t = t.join("bar");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar"));

        let t = t.join(".").join("foo").join("..").join("baz");
        let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
        let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
        assert_eq!(a, b);
        assert!(a.prefix.as_ref().ends_with("bar/baz"));
    }

    #[test]
    fn test_prefix_s3() {
        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
        assert_eq!(url.prefix.as_ref(), "foo/bar");

        let path = Path::from("foo/bar/partition/foo.parquet");
        let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

        let path = Path::from("other/bar/partition/foo.parquet");
        assert!(url.strip_prefix(&path).is_none());
    }

    #[test]
    fn test_split_glob() {
        fn test(input: &str, expected: Option<(&str, &str)>) {
            assert_eq!(
                split_glob_expression(input),
                expected,
                "testing split_glob_expression with {input}"
            );
        }

        // no glob patterns
        test("/", None);
        test("/a.txt", None);
        test("/a", None);
        test("/a/", None);
        test("/a/b", None);
        test("/a/b/", None);
        test("/a/b.txt", None);
        test("/a/b/c.txt", None);
        // glob patterns, thus we build the longest path (os-specific)
        test("*.txt", Some((".", "*.txt")));
        test("/*.txt", Some(("/", "*.txt")));
        test("/a/*b.txt", Some(("/a/", "*b.txt")));
        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
        test("/a/b*.txt", Some(("/a/", "b*.txt")));
        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));

        test(
            // consecutive separators are kept in the prefix
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        );
    }

    #[test]
    fn test_is_collection() {
        fn test(input: &str, expected: bool, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.is_collection(), expected, "{message}");
        }

        test("https://a.b.c/path/", true, "path ends with / - collection");
        test(
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        );
        test(
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        );
        test(
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        );
        test(
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        );
    }

    #[test]
    fn test_file_extension() {
        fn test(input: &str, expected: Option<&str>, message: &str) {
            let url = ListingTableUrl::parse(input).unwrap();
            assert_eq!(url.file_extension(), expected, "{message}");
        }

        test("https://a.b.c/path/", None, "path ends with / - not a file");
        test(
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        );
        test(
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        );
        test(
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        );
        test(
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        );
        // `file:///` (with the colon) so these are parsed as URLs rather than
        // as relative filesystem paths
        test(
            "file:///some/path/",
            None,
            "file path ends with / - not a file",
        );
        test(
            "file:///some/path/file",
            None,
            "file path has no . in its last segment - no extension",
        );
        test(
            "file:///some/path/file.",
            None,
            "file path ends with . - no value after .",
        );
        test(
            "file:///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        );
    }
}