use crate::execution::context::SessionState;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_optimizer::OptimizerConfig;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
use itertools::Itertools;
use log::debug;
use object_store::path::Path;
use object_store::path::DELIMITER;
use object_store::{ObjectMeta, ObjectStore};
use std::sync::Arc;
use url::Url;
/// A parsed location for the files of a listing table: a URL, the object
/// store path derived from it, and an optional glob restricting which paths
/// below that prefix belong to the table.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ListingTableUrl {
/// The URL this location was parsed from or resolved to.
url: Url,
/// Object store path built from the path component of `url`.
prefix: Path,
/// Optional glob pattern applied to paths relative to `prefix`.
glob: Option<Pattern>,
}
impl ListingTableUrl {
/// Parses `s` into a [`ListingTableUrl`].
///
/// Resolution order:
/// 1. (non-wasm32 targets only) an absolute filesystem path is handed to
///    `parse_path`;
/// 2. otherwise `s` is parsed as a URL and used without a glob;
/// 3. (non-wasm32 targets only) if URL parsing fails because `s` is a
///    relative URL without a base, `s` is treated as a filesystem path.
///
/// # Errors
///
/// Any other URL parse failure is returned wrapped in
/// `DataFusionError::External`; errors from the delegated constructors are
/// propagated unchanged.
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
    let input = s.as_ref();

    #[cfg(not(target_arch = "wasm32"))]
    if std::path::Path::new(input).is_absolute() {
        return Self::parse_path(input);
    }

    let parse_error = match Url::parse(input) {
        Ok(url) => return Self::try_new(url, None),
        Err(e) => e,
    };

    // A relative URL without a base is interpreted as a filesystem path.
    #[cfg(not(target_arch = "wasm32"))]
    if matches!(parse_error, url::ParseError::RelativeUrlWithoutBase) {
        return Self::parse_path(input);
    }

    Err(DataFusionError::External(Box::new(parse_error)))
}
/// Builds a [`ListingTableUrl`] from a filesystem path that may contain a
/// glob expression.
///
/// The input is split by `split_glob_expression`; the non-glob part is
/// converted to a URL and the glob part (if any) is compiled into a
/// `Pattern`.
///
/// # Errors
///
/// Returns `DataFusionError::External` when the glob fails to compile or the
/// path cannot be converted to a URL.
#[cfg(not(target_arch = "wasm32"))]
fn parse_path(s: &str) -> Result<Self> {
    let (path, glob) = if let Some((base, expression)) = split_glob_expression(s) {
        let compiled = Pattern::new(expression)
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        (base, Some(compiled))
    } else {
        (s, None)
    };

    let Some(url) = url_from_filesystem_path(path) else {
        return Err(DataFusionError::External(
            format!("Failed to convert path to URL: {path}").into(),
        ));
    };

    Self::try_new(url, glob)
}
/// Assembles a [`ListingTableUrl`] from an already-parsed `url` and an
/// optional `glob`, deriving the object store prefix from the URL path.
///
/// # Errors
///
/// Propagates the error from `Path::from_url_path` when the URL path is not
/// a valid object store path.
fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
    let url_path = url.path();
    let prefix = Path::from_url_path(url_path)?;
    Ok(Self { url, prefix, glob })
}
pub fn scheme(&self) -> &str {
self.url.scheme()
}
pub fn prefix(&self) -> &Path {
&self.prefix
}
/// Returns `true` if `path` belongs to this listing location.
///
/// `path` must lie under the prefix (see `strip_prefix`). Path segments
/// containing `=` are dropped before any further check (presumably
/// hive-style `key=value` partition directories — confirm with callers).
///
/// * With a glob and `ignore_subdirectory`, only the first remaining segment
///   is matched against the glob; no remaining segment means no match.
/// * With a glob and without `ignore_subdirectory`, the remaining segments
///   joined by `DELIMITER` are matched against the glob.
/// * Without a glob and with `ignore_subdirectory`, at most one remaining
///   segment is allowed.
/// * Without a glob and without `ignore_subdirectory`, every path under the
///   prefix is accepted.
pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
    let Some(relative) = self.strip_prefix(path) else {
        return false;
    };
    let mut remaining = relative.filter(|segment| !segment.contains('='));

    let Some(glob) = self.glob.as_ref() else {
        // No glob: anything under the prefix matches, unless subdirectories
        // are ignored, in which case at most one segment may remain.
        return !ignore_subdirectory || remaining.count() <= 1;
    };

    if ignore_subdirectory {
        match remaining.next() {
            Some(file_name) => glob.matches(file_name),
            None => false,
        }
    } else {
        let joined = remaining.join(DELIMITER);
        glob.matches(&joined)
    }
}
/// Returns `true` when the URL's path component ends with `DELIMITER`,
/// i.e. the URL denotes a collection rather than a single object.
pub fn is_collection(&self) -> bool {
    let url_path = self.url.path();
    url_path.ends_with(DELIMITER)
}
/// Returns the extension of the last path segment of the URL, if any.
///
/// The extension is the text after the final `.` of the last segment.
/// `None` is returned when the URL has no path segments, when the last
/// segment contains no `.`, or when it ends with `.` (empty extension).
pub fn file_extension(&self) -> Option<&str> {
    // Both iterators are double-ended, so take the tail directly instead of
    // walking every element with `Iterator::last`.
    let last_segment = self.url.path_segments()?.next_back()?;
    last_segment
        .rsplit_once('.')
        .map(|(_, extension)| extension)
        .filter(|extension| !extension.is_empty())
}
/// Strips this URL's prefix from `path` and returns an iterator over the
/// remaining `DELIMITER`-separated segments.
///
/// Returns `None` when `path` does not start with the prefix, or when the
/// prefix is non-empty and the non-empty remainder does not begin with
/// `DELIMITER` (the match did not end on a segment boundary).
pub(crate) fn strip_prefix<'a, 'b: 'a>(
    &'a self,
    path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
    let prefix: &str = self.prefix.as_ref();
    let full: &'b str = path.as_ref();

    let remainder = full.strip_prefix(prefix)?;
    let remainder = if remainder.is_empty() || prefix.is_empty() {
        remainder
    } else {
        // Require a delimiter right after the prefix so that a prefix of
        // `foo` does not match a path such as `foob/bar`.
        remainder.strip_prefix(DELIMITER)?
    };

    Some(remainder.split_terminator(DELIMITER))
}
/// Lists the objects in `store` that belong to this URL.
///
/// * If the URL is not a collection, a single `head` request on the prefix
///   is issued.
/// * If it is a collection and the runtime environment of `ctx` provides a
///   list-files cache, a cached listing for the prefix is reused; on a miss
///   the full listing is collected, stored in the cache and then streamed.
/// * Otherwise the store is listed directly.
///
/// The resulting stream keeps only entries whose location ends with
/// `file_extension` and for which `contains` holds, using the
/// `listing_table_ignore_subdirectory` execution option from `ctx`.
///
/// # Errors
///
/// Fails early if collecting the listing for the cache fails; errors raised
/// while streaming are mapped to `DataFusionError::ObjectStore`.
pub async fn list_all_files<'a>(
    &'a self,
    ctx: &'a SessionState,
    store: &'a dyn ObjectStore,
    file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
    let ignore_subdirectory = ctx.options().execution.listing_table_ignore_subdirectory;

    let list = if !self.is_collection() {
        futures::stream::once(store.head(&self.prefix)).boxed()
    } else if let Some(cache) = ctx.runtime_env().cache_manager.get_list_files_cache() {
        match cache.get(&self.prefix) {
            Some(cached) => {
                debug!("Hit list all files cache");
                futures::stream::iter(cached.as_ref().clone().into_iter().map(Ok)).boxed()
            }
            None => {
                // Cache miss: materialize the listing so it can be both
                // cached and streamed to the caller.
                let listed: Vec<ObjectMeta> =
                    store.list(Some(&self.prefix)).try_collect().await?;
                cache.put(&self.prefix, Arc::new(listed.clone()));
                futures::stream::iter(listed.into_iter().map(Ok)).boxed()
            }
        }
    } else {
        store.list(Some(&self.prefix))
    };

    let filtered = list.try_filter(move |meta| {
        let location = &meta.location;
        let has_extension = location.as_ref().ends_with(file_extension);
        let in_table = self.contains(location, ignore_subdirectory);
        futures::future::ready(has_extension && in_table)
    });

    Ok(filtered.map_err(DataFusionError::ObjectStore).boxed())
}
pub fn as_str(&self) -> &str {
self.as_ref()
}
/// Returns the [`ObjectStoreUrl`] for this location, built from the part of
/// the URL between the start of the scheme and the start of the path.
///
/// # Panics
///
/// Panics if `ObjectStoreUrl::parse` rejects that slice.
pub fn object_store(&self) -> ObjectStoreUrl {
    use url::Position::{BeforePath, BeforeScheme};

    let store_part = &self.url[BeforeScheme..BeforePath];
    ObjectStoreUrl::parse(store_part).unwrap()
}
}
/// Converts a filesystem path into a `file://` URL, returning `None` when
/// the conversion is not possible.
///
/// Whether the path is a directory is taken from the filesystem when the
/// path exists; otherwise it is inferred from a trailing path separator (an
/// empty, non-existent input yields `None`). Relative paths are resolved
/// against the current working directory.
#[cfg(not(target_arch = "wasm32"))]
fn url_from_filesystem_path(s: &str) -> Option<Url> {
    let fs_path = std::path::Path::new(s);

    let treat_as_dir = if fs_path.exists() {
        fs_path.is_dir()
    } else {
        std::path::is_separator(s.chars().last()?)
    };

    // Resolve relative inputs against the current directory.
    let joined;
    let absolute: &std::path::Path = if fs_path.is_absolute() {
        fs_path
    } else {
        joined = std::env::current_dir().ok()?.join(fs_path);
        joined.as_path()
    };

    let converted = if treat_as_dir {
        Url::from_directory_path(absolute)
    } else {
        Url::from_file_path(absolute)
    };
    let raw = converted.ok()?;

    // Re-parse the serialized URL so that relative segments (`.` / `..`)
    // kept by the conversion above are resolved.
    Url::parse(raw.as_str()).ok()
}
/// Exposes the string form of the underlying URL.
impl AsRef<str> for ListingTableUrl {
    fn as_ref(&self) -> &str {
        self.url.as_str()
    }
}
/// Exposes the underlying [`Url`].
impl AsRef<Url> for ListingTableUrl {
    fn as_ref(&self) -> &Url {
        let Self { url, .. } = self;
        url
    }
}
/// Formats the URL exactly as its string form, honoring formatter options.
impl std::fmt::Display for ListingTableUrl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self.as_str(), f)
    }
}
/// Characters that mark the beginning of a glob expression.
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` into a base path and a glob expression.
///
/// The split happens right after the last path separator that precedes the
/// first glob character, so the base keeps its trailing separator. If a glob
/// character appears before any separator, the base is `"."` and the whole
/// input is the glob. Returns `None` when `path` has no glob character.
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    let glob_start = path.find(|c: char| GLOB_START_CHARS.contains(&c))?;

    let last_separator = path[..glob_start]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c));

    match last_separator {
        Some((idx, separator)) => Some(path.split_at(idx + separator.len_utf8())),
        None => Some((".", path)),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Exercises prefix derivation and `strip_prefix` for `file://` URLs and for
// plain filesystem paths (absolute and relative to the current directory).
#[test]
fn test_prefix_path() {
// An absolute filesystem path: children of the prefix strip to their segments.
let root = std::env::current_dir().unwrap();
let root = root.to_string_lossy();
let url = ListingTableUrl::parse(root).unwrap();
let child = url.prefix.child("partition").child("file");
let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
assert_eq!(prefix, vec!["partition", "file"]);
// An empty prefix (filesystem root) matches any path.
let url = ListingTableUrl::parse("file:///").unwrap();
let child = Path::parse("/foo/bar").unwrap();
let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
assert_eq!(prefix, vec!["foo", "bar"]);
// The prefix must match on a segment boundary: `foob` is not under `foo`.
let url = ListingTableUrl::parse("file:///foo").unwrap();
let child = Path::parse("/foob/bar").unwrap();
assert!(url.strip_prefix(&child).is_none());
// A path equal to the prefix strips to zero segments.
let url = ListingTableUrl::parse("file:///foo/file").unwrap();
let child = Path::parse("/foo/file").unwrap();
assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);
// Spaces, an empty query string and non-ASCII characters in the URL path.
let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/ bar");
let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/bar");
let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/😺");
// Percent-encoded characters are decoded in the prefix.
let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
// NOTE(review): this check repeats the previous one verbatim.
let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
// A doubly-encoded `%252F` decodes once, leaving a literal `%2F` in the prefix.
let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");
// A real file whose name contains a literal `%2F` keeps it in the prefix.
let dir = tempdir().unwrap();
let path = dir.path().join("bar%2Ffoo");
std::fs::File::create(&path).unwrap();
let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);
// Relative segments (`.` and `..`) in a URL path are resolved.
let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
let url =
ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
assert_eq!(url.prefix.as_ref(), "baz/test.txt");
// A relative filesystem path resolves against the current directory and
// equals the corresponding absolute path, even when it does not exist.
let workdir = std::env::current_dir().unwrap();
let t = workdir.join("non-existent");
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("non-existent").unwrap();
assert_eq!(a, b);
assert!(a.prefix.as_ref().ends_with("non-existent"));
// `..` resolves to the parent of the current directory.
let t = workdir.parent().unwrap();
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("..").unwrap();
assert_eq!(a, b);
let t = t.join("bar");
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("../bar").unwrap();
assert_eq!(a, b);
assert!(a.prefix.as_ref().ends_with("bar"));
// Mixed `.` / `..` segments resolve identically for absolute and relative input.
let t = t.join(".").join("foo").join("..").join("baz");
let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
assert_eq!(a, b);
assert!(a.prefix.as_ref().ends_with("bar/baz"));
}
// Checks prefix derivation and `strip_prefix` for an `s3://` URL.
#[test]
fn test_prefix_s3() {
    let table_url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
    assert_eq!(table_url.prefix.as_ref(), "foo/bar");

    // A path below the prefix strips to its relative segments.
    let inside = Path::from("foo/bar/partition/foo.parquet");
    let segments = table_url
        .strip_prefix(&inside)
        .unwrap()
        .collect::<Vec<_>>();
    assert_eq!(segments, vec!["partition", "foo.parquet"]);

    // A path outside the prefix is rejected.
    let outside = Path::from("other/bar/partition/foo.parquet");
    assert!(table_url.strip_prefix(&outside).is_none());
}
// Table-driven check of `split_glob_expression`: inputs without glob
// characters yield `None`; otherwise the base path and glob are split after
// the last separator preceding the first glob character.
#[test]
fn test_split_glob() {
    let cases: &[(&str, Option<(&str, &str)>)] = &[
        // no glob characters
        ("/", None),
        ("/a.txt", None),
        ("/a", None),
        ("/a/", None),
        ("/a/b", None),
        ("/a/b/", None),
        ("/a/b.txt", None),
        ("/a/b/c.txt", None),
        // glob characters present
        ("*.txt", Some((".", "*.txt"))),
        ("/*.txt", Some(("/", "*.txt"))),
        ("/a/*b.txt", Some(("/a/", "*b.txt"))),
        ("/a/*/b.txt", Some(("/a/", "*/b.txt"))),
        ("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt"))),
        ("/a/b*.txt", Some(("/a/", "b*.txt"))),
        ("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt"))),
        (
            "/a/b/c//alltypes_plain*.parquet",
            Some(("/a/b/c//", "alltypes_plain*.parquet")),
        ),
    ];

    for &(input, expected) in cases {
        assert_eq!(
            split_glob_expression(input),
            expected,
            "testing split_glob_expression with {input}"
        );
    }
}
// Table-driven check that `is_collection` looks only at the URL path, not at
// the query string or the fragment.
#[test]
fn test_is_collection() {
    let cases: &[(&str, bool, &str)] = &[
        ("https://a.b.c/path/", true, "path ends with / - collection"),
        (
            "https://a.b.c/path/?a=b",
            true,
            "path ends with / - with query args - collection",
        ),
        (
            "https://a.b.c/path?a=b/",
            false,
            "path not ends with / - query ends with / - not collection",
        ),
        (
            "https://a.b.c/path/#a=b",
            true,
            "path ends with / - with fragment - collection",
        ),
        (
            "https://a.b.c/path#a=b/",
            false,
            "path not ends with / - fragment ends with / - not collection",
        ),
    ];

    for &(input, expected, message) in cases {
        let url = ListingTableUrl::parse(input).unwrap();
        assert_eq!(url.is_collection(), expected, "{message}");
    }
}
// Table-driven check of `file_extension` for URLs and filesystem-style inputs.
#[test]
fn test_file_extension() {
    let cases: &[(&str, Option<&str>, &str)] = &[
        ("https://a.b.c/path/", None, "path ends with / - not a file"),
        (
            "https://a.b.c/path/?a=b",
            None,
            "path ends with / - with query args - not a file",
        ),
        (
            "https://a.b.c/path?a=b/",
            None,
            "path not ends with / - query ends with / but no file extension",
        ),
        (
            "https://a.b.c/path/#a=b",
            None,
            "path ends with / - with fragment - not a file",
        ),
        (
            "https://a.b.c/path#a=b/",
            None,
            "path not ends with / - fragment ends with / but no file extension",
        ),
        // NOTE(review): the inputs below read `file///…` (no colon), so they
        // are not `file://` URLs — confirm this is intentional.
        (
            "file///some/path/",
            None,
            "file path ends with / - not a file",
        ),
        (
            "file///some/path/file",
            None,
            "file path does not end with - no extension",
        ),
        (
            "file///some/path/file.",
            None,
            "file path ends with . - no value after .",
        ),
        (
            "file///some/path/file.ext",
            Some("ext"),
            "file path ends with .ext - extension is ext",
        ),
    ];

    for &(input, expected, message) in cases {
        let url = ListingTableUrl::parse(input).unwrap();
        assert_eq!(url.file_extension(), expected, "{message}");
    }
}
}