use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_common::{DataFusionError, Result};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
use itertools::Itertools;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use percent_encoding;
use url::Url;
/// A parsed location identifying the file(s) backing a listing table.
///
/// Values are built through [`ListingTableUrl::parse`], which accepts either
/// a URL or a filesystem path (optionally containing a glob expression).
#[derive(Debug, Clone)]
pub struct ListingTableUrl {
/// The URL of the file or directory; a trailing `/` is what
/// `list_all_files` uses to decide between listing and a single `head`.
url: Url,
/// Object-store path built from the percent-decoded path component of `url`.
prefix: Path,
/// Optional glob pattern applied to locations relative to `prefix`;
/// only populated when the input was parsed as a filesystem path.
glob: Option<Pattern>,
}
impl ListingTableUrl {
/// Parses `s` into a [`ListingTableUrl`].
///
/// Resolution order:
/// 1. If `s` is an absolute filesystem path it is handed to `parse_path`
///    (which also understands glob expressions).
/// 2. Otherwise `s` is parsed as a URL; URLs never carry a glob.
/// 3. If URL parsing fails with `RelativeUrlWithoutBase`, `s` is treated as
///    a relative filesystem path and handed to `parse_path`.
///
/// # Errors
/// Any other URL parse error is returned as `DataFusionError::External`;
/// errors from `parse_path` are propagated unchanged.
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
    let input = s.as_ref();

    // Absolute paths are checked first so they are never mis-read as URLs.
    if std::path::Path::new(input).is_absolute() {
        return Self::parse_path(input);
    }

    Url::parse(input)
        .map(|url| Self::new(url, None))
        .or_else(|err| match err {
            // No scheme present: fall back to a relative filesystem path.
            url::ParseError::RelativeUrlWithoutBase => Self::parse_path(input),
            other => Err(DataFusionError::External(Box::new(other))),
        })
}
/// Builds a [`ListingTableUrl`] by interpreting `s` as a filesystem path.
///
/// If `s` contains a glob expression it is split into a directory prefix and
/// a pattern; the pattern is compiled first, then the prefix (or the whole of
/// `s` when there is no glob) is canonicalised and converted to a `file` URL.
/// Directories are converted with `Url::from_directory_path`, everything else
/// with `Url::from_file_path`.
///
/// # Errors
/// * `DataFusionError::External` if the glob pattern is invalid.
/// * The I/O error from `canonicalize` (e.g. the path does not exist).
/// * `DataFusionError::Internal` if the path cannot be turned into a URL.
fn parse_path(s: &str) -> Result<Self> {
    let split = split_glob_expression(s);

    // Compile the pattern before touching the filesystem so an invalid glob
    // is reported ahead of any path-resolution error.
    let glob = split
        .map(|(_, expression)| Pattern::new(expression))
        .transpose()
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    let prefix = split.map_or(s, |(directory, _)| directory);

    let resolved = std::path::Path::new(prefix).canonicalize()?;
    let converted = match resolved.is_dir() {
        true => Url::from_directory_path(resolved),
        false => Url::from_file_path(resolved),
    };
    let url = converted
        .map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?;

    Ok(Self::new(url, glob))
}
/// Assembles a [`ListingTableUrl`] from an already-parsed `url` and an
/// optional `glob`.
///
/// The prefix is the URL's path component, percent-decoded (lossily: invalid
/// UTF-8 sequences are replaced rather than rejected) and converted into an
/// object-store `Path`.
fn new(url: Url, glob: Option<Pattern>) -> Self {
    let prefix = {
        let decoded = percent_encoding::percent_decode_str(url.path()).decode_utf8_lossy();
        Path::from(&*decoded)
    };
    Self { url, prefix, glob }
}
/// Returns the scheme of the underlying URL (the part before `:`).
pub fn scheme(&self) -> &str {
    let location = &self.url;
    location.scheme()
}
/// Strips this table's prefix from `path`, returning the remaining path
/// segments, or `None` if `path` is not located under the prefix.
///
/// * An empty prefix matches every path; all of its segments are returned.
/// * A path that merely shares leading characters with the prefix
///   (prefix `foo`, path `foob/bar`) is rejected: the prefix must be followed
///   by a delimiter.
/// * A path exactly equal to the prefix (the table URL names a single file)
///   matches and yields no segments. Previously this case returned `None`
///   because a delimiter was demanded even when nothing followed the prefix.
pub(crate) fn strip_prefix<'a, 'b: 'a>(
    &'a self,
    path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
    use object_store::path::DELIMITER;

    let prefix: &str = self.prefix.as_ref();
    let path: &'b str = path.as_ref();

    let mut remainder = path.strip_prefix(prefix)?;
    // Only require the separating delimiter when there is something after a
    // non-empty prefix; an exact match has no delimiter to strip.
    if !prefix.is_empty() && !remainder.is_empty() {
        remainder = remainder.strip_prefix(DELIMITER)?;
    }

    // `split_terminator` yields nothing for an empty remainder, and is
    // otherwise identical to `split` for paths without a trailing delimiter.
    Some(remainder.split_terminator(DELIMITER))
}
/// Streams the metadata of every object under this URL that matches both
/// `file_extension` and the optional glob.
///
/// When the URL ends with `/` the prefix is listed via `store.list`;
/// otherwise it is treated as a single object and fetched with `store.head`.
/// An entry is kept when its location ends with `file_extension` and, if a
/// glob is present, the location relative to the prefix matches it. Entries
/// that are not under the prefix never match a glob. Store errors are
/// converted with `Into` and passed through the stream.
pub(crate) fn list_all_files<'a>(
    &'a self,
    store: &'a dyn ObjectStore,
    file_extension: &'a str,
) -> BoxStream<'a, Result<ObjectMeta>> {
    // A trailing slash marks a directory: list it. Anything else is a
    // single object, for which one `head` request is enough.
    let candidates = if self.url.as_str().ends_with('/') {
        futures::stream::once(store.list(Some(&self.prefix)))
            .try_flatten()
            .boxed()
    } else {
        futures::stream::once(store.head(&self.prefix)).boxed()
    };

    candidates
        .map_err(Into::into)
        .try_filter(move |meta| {
            let location = &meta.location;
            let keep = location.as_ref().ends_with(file_extension)
                && self.glob.as_ref().map_or(true, |pattern| {
                    // The glob is evaluated against the path relative to the
                    // prefix, re-joined with `/`.
                    self.strip_prefix(location)
                        .map_or(false, |mut segments| {
                            pattern.matches(&segments.join("/"))
                        })
                });
            futures::future::ready(keep)
        })
        .boxed()
}
/// Returns the underlying URL as a string slice.
pub fn as_str(&self) -> &str {
    self.url.as_str()
}
/// Returns the [`ObjectStoreUrl`] for this location: everything in the URL
/// that precedes the path (scheme and authority).
///
/// # Panics
/// Panics if `ObjectStoreUrl::parse` rejects that slice.
/// NOTE(review): presumably unreachable because the slice comes from an
/// already-parsed URL — confirm against `ObjectStoreUrl::parse`.
pub fn object_store(&self) -> ObjectStoreUrl {
    let before_path = url::Position::BeforeScheme..url::Position::BeforePath;
    let store_part = &self.url[before_path];
    ObjectStoreUrl::parse(store_part).unwrap()
}
}
/// Exposes the underlying URL as a string slice.
impl AsRef<str> for ListingTableUrl {
    fn as_ref(&self) -> &str {
        self.url.as_str()
    }
}
/// Exposes the underlying parsed [`Url`].
impl AsRef<Url> for ListingTableUrl {
    fn as_ref(&self) -> &Url {
        let Self { url, .. } = self;
        url
    }
}
/// Formats the value as its URL string, honouring the formatter's options
/// exactly as formatting a `str` would.
impl std::fmt::Display for ListingTableUrl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered: &str = self.as_str();
        std::fmt::Display::fmt(rendered, f)
    }
}
/// Characters that mark the start of a glob expression.
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the last path separator that precedes the first glob
/// character, returning `(base_path, glob_expression)`.
///
/// * Returns `None` when `path` contains no glob character.
/// * When no separator precedes the first glob character, the base is `"."`
///   and the whole of `path` is the glob expression.
/// * Otherwise the base keeps its trailing separator and the glob expression
///   is everything after it.
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    // Byte offset of the first glob metacharacter; no glob means no split.
    let glob_at = path.find(&GLOB_START_CHARS[..])?;

    // Byte offset just past the last separator before the glob, if any.
    let boundary = path[..glob_at]
        .char_indices()
        .rev()
        .find(|&(_, c)| std::path::is_separator(c))
        .map(|(idx, c)| idx + c.len_utf8());

    match boundary {
        Some(at) => Some(path.split_at(at)),
        None => Some((".", path)),
    }
}
// Unit tests for URL/path parsing, prefix stripping and glob splitting.
#[cfg(test)]
mod tests {
use super::*;
// Exercises prefix handling for local filesystem paths and `file://` URLs.
#[test]
fn test_prefix_path() {
// An absolute local directory: children of the prefix strip back to their
// relative segments.
let root = std::env::current_dir().unwrap();
let root = root.to_string_lossy();
let url = ListingTableUrl::parse(root).unwrap();
let child = url.prefix.child("partition").child("file");
let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
assert_eq!(prefix, vec!["partition", "file"]);
// An empty prefix (filesystem root) matches any path.
let url = ListingTableUrl::parse("file:///").unwrap();
let child = Path::parse("/foo/bar").unwrap();
let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
assert_eq!(prefix, vec!["foo", "bar"]);
// A shared leading string is not enough: `foob` is not under `foo`.
let url = ListingTableUrl::parse("file:///foo").unwrap();
let child = Path::parse("/foob/bar").unwrap();
assert!(url.strip_prefix(&child).is_none());
// Spaces in the URL path survive into the prefix as literal spaces.
let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/ bar");
// A trailing `?` does not become part of the prefix.
let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/bar");
// Non-ASCII characters appear percent-encoded in the prefix.
let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/%F0%9F%98%BA");
}
// Exercises prefix handling for an `s3://` URL: the bucket is not part of
// the prefix, and paths outside the prefix are rejected.
#[test]
fn test_prefix_s3() {
let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
assert_eq!(url.prefix.as_ref(), "foo/bar");
let path = Path::from("foo/bar/partition/foo.parquet");
let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
assert_eq!(prefix, vec!["partition", "foo.parquet"]);
let path = Path::from("other/bar/partition/foo.parquet");
assert!(url.strip_prefix(&path).is_none());
}
// Table-driven checks of `split_glob_expression`.
#[test]
fn test_split_glob() {
// Asserts that splitting `input` yields `expected`.
fn test(input: &str, expected: Option<(&str, &str)>) {
assert_eq!(
split_glob_expression(input),
expected,
"testing split_glob_expression with {input}"
);
}
// no glob patterns
test("/", None);
test("/a.txt", None);
test("/a", None);
test("/a/", None);
test("/a/b", None);
test("/a/b/", None);
test("/a/b.txt", None);
test("/a/b/c.txt", None);
// glob patterns: the base is everything up to and including the last
// separator before the first glob character (or "." if there is none)
test("*.txt", Some((".", "*.txt")));
test("/*.txt", Some(("/", "*.txt")));
test("/a/*b.txt", Some(("/a/", "*b.txt")));
test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
test("/a/b*.txt", Some(("/a/", "b*.txt")));
test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
// a doubled separator is kept in the base path as-is
test(
"/a/b/c//alltypes_plain*.parquet",
Some(("/a/b/c//", "alltypes_plain*.parquet")),
);
}
}