use std::sync::Arc;
use datafusion_common::{DataFusionError, Result, TableReference};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::CachedFileList;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_session::Session;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
use itertools::Itertools;
use log::debug;
use object_store::path::DELIMITER;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use url::Url;
/// A parsed URL identifying the root location of a listing table,
/// together with an optional glob pattern for selecting files beneath it.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ListingTableUrl {
    /// The full original URL (scheme, authority and path).
    url: Url,
    /// The URL's path component decoded into an object-store [`Path`].
    prefix: Path,
    /// Optional glob matched against paths underneath `prefix`.
    glob: Option<Pattern>,
    /// Optional table reference used to scope list-files cache entries.
    table_ref: Option<TableReference>,
}
impl ListingTableUrl {
    /// Parses `s` as either a URL or (on non-wasm targets) a filesystem
    /// path, which may end in a glob expression such as `/data/*.parquet`.
    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
        let s = s.as_ref();

        // Absolute filesystem paths are not valid URLs but are accepted
        // directly on non-wasm targets.
        #[cfg(not(target_arch = "wasm32"))]
        if std::path::Path::new(s).is_absolute() {
            return Self::parse_path(s);
        }

        match Url::parse(s) {
            Ok(url) => Self::try_new(url, None),
            // A bare relative path (no scheme) is treated as a filesystem
            // path relative to the current working directory.
            #[cfg(not(target_arch = "wasm32"))]
            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
            Err(e) => Err(DataFusionError::External(Box::new(e))),
        }
    }

    /// Converts a filesystem path, optionally carrying a trailing glob
    /// expression, into a `ListingTableUrl`.
    #[cfg(not(target_arch = "wasm32"))]
    fn parse_path(s: &str) -> Result<Self> {
        // Split the literal prefix from a trailing glob, if any.
        let (path, glob) = match split_glob_expression(s) {
            Some((prefix, glob)) => {
                let glob = Pattern::new(glob)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                (prefix, Some(glob))
            }
            None => (s, None),
        };
        let url = url_from_filesystem_path(path).ok_or_else(|| {
            DataFusionError::External(
                format!("Failed to convert path to URL: {path}").into(),
            )
        })?;
        Self::try_new(url, glob)
    }

    /// Creates a `ListingTableUrl` from an already-parsed [`Url`] and an
    /// optional glob. Fails if the URL path is not a valid object-store
    /// path.
    pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
        let prefix = Path::from_url_path(url.path())?;
        Ok(Self {
            url,
            prefix,
            glob,
            table_ref: None,
        })
    }

    /// Returns the URL scheme, e.g. `file` or `s3`.
    pub fn scheme(&self) -> &str {
        self.url.scheme()
    }

    /// Returns this URL's path component as an object-store [`Path`].
    pub fn prefix(&self) -> &Path {
        &self.prefix
    }

    /// Returns `true` if `path` belongs to this table, honouring the glob
    /// (if any) and, when `ignore_subdirectory` is set, excluding files
    /// nested in subdirectories.
    pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
        let Some(all_segments) = self.strip_prefix(path) else {
            return false;
        };
        // Segments containing `=` look like hive-style partition
        // directories (`a=1`) and are excluded from matching.
        let mut segments = all_segments.filter(|s| !s.contains('='));
        match &self.glob {
            Some(glob) => {
                if ignore_subdirectory {
                    // Match the glob against the first non-partition
                    // segment only.
                    segments
                        .next()
                        .is_some_and(|file_name| glob.matches(file_name))
                } else {
                    // Match the glob against the remaining path as a whole.
                    let stripped = segments.join(DELIMITER);
                    glob.matches(&stripped)
                }
            }
            None if ignore_subdirectory => segments.count() <= 1,
            None => true,
        }
    }

    /// Returns `true` if the URL path ends with `/`, i.e. refers to a
    /// directory-like location rather than a single object.
    pub fn is_collection(&self) -> bool {
        self.url.path().ends_with(DELIMITER)
    }

    /// Returns the extension of the final path segment, if it has one.
    pub fn file_extension(&self) -> Option<&str> {
        if let Some(mut segments) = self.url.path_segments()
            && let Some(last_segment) = segments.next_back()
            && last_segment.contains(".")
            && !last_segment.ends_with(".")
        {
            // Everything after the last `.` of the final segment.
            return last_segment.split('.').next_back();
        }
        None
    }

    /// Strips this URL's prefix from `path`, returning the remaining
    /// segments, or `None` if `path` is not underneath the prefix.
    pub fn strip_prefix<'a, 'b: 'a>(
        &'a self,
        path: &'b Path,
    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
        let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
        if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
            // Require a segment boundary so `/foo` does not match `/foob`.
            stripped = stripped.strip_prefix(DELIMITER)?;
        }
        Some(stripped.split_terminator(DELIMITER))
    }

    /// Lists files under this URL — optionally narrowed to `prefix`
    /// relative to the table root — whose locations end with
    /// `file_extension` and satisfy [`Self::contains`]. Listing goes
    /// through the runtime's list-files cache when one is configured.
    pub async fn list_prefixed_files<'a>(
        &'a self,
        ctx: &'a dyn Session,
        store: &'a dyn ObjectStore,
        prefix: Option<Path>,
        file_extension: &'a str,
    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
        let exec_options = &ctx.config_options().execution;
        let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
        // Location to list: table prefix plus the optional extra prefix.
        let full_prefix = if let Some(ref p) = prefix {
            let mut parts = self.prefix.parts().collect::<Vec<_>>();
            parts.extend(p.parts());
            Path::from_iter(parts.into_iter())
        } else {
            self.prefix.clone()
        };
        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
            list_with_cache(
                ctx,
                store,
                self.table_ref.as_ref(),
                &self.prefix,
                prefix.as_ref(),
            )
            .await?
        } else {
            // Not obviously a directory: probe for a single object first
            // and fall back to a listing when it does not exist.
            match store.head(&full_prefix).await {
                Ok(meta) => futures::stream::once(async { Ok(meta) })
                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
                    .boxed(),
                Err(object_store::Error::NotFound { .. }) => {
                    list_with_cache(
                        ctx,
                        store,
                        self.table_ref.as_ref(),
                        &self.prefix,
                        prefix.as_ref(),
                    )
                    .await?
                }
                Err(e) => return Err(e.into()),
            }
        };
        // Post-filter the stream by extension and glob/subdirectory rules.
        Ok(list
            .try_filter(move |meta| {
                let path = &meta.location;
                let extension_match = path.as_ref().ends_with(file_extension);
                let glob_match = self.contains(path, ignore_subdirectory);
                futures::future::ready(extension_match && glob_match)
            })
            .boxed())
    }

    /// Lists all files under this URL with the given extension.
    pub async fn list_all_files<'a>(
        &'a self,
        ctx: &'a dyn Session,
        store: &'a dyn ObjectStore,
        file_extension: &'a str,
    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
        self.list_prefixed_files(ctx, store, None, file_extension)
            .await
    }

    /// Returns the full URL as a string slice.
    pub fn as_str(&self) -> &str {
        self.as_ref()
    }

    /// Returns the URL up to (but excluding) the path, identifying the
    /// object store this table lives in.
    pub fn object_store(&self) -> ObjectStoreUrl {
        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
        // A scheme+authority slice of a valid URL is itself a valid URL.
        ObjectStoreUrl::parse(url).unwrap()
    }

    /// Returns `true` for local-filesystem directory URLs.
    pub fn is_folder(&self) -> bool {
        self.url.scheme() == "file" && self.is_collection()
    }

    /// Returns the underlying parsed [`Url`].
    pub fn get_url(&self) -> &Url {
        &self.url
    }

    /// Returns the optional glob pattern.
    pub fn get_glob(&self) -> &Option<Pattern> {
        &self.glob
    }

    /// Replaces the glob pattern, failing if `glob` is not a valid glob.
    pub fn with_glob(mut self, glob: &str) -> Result<Self> {
        self.glob =
            Some(Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?);
        Ok(self)
    }

    /// Associates a table reference, used to scope list-files cache keys.
    pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
        self.table_ref = Some(table_ref);
        self
    }

    /// Returns the optional table reference.
    pub fn get_table_ref(&self) -> &Option<TableReference> {
        &self.table_ref
    }
}
/// Lists `table_base_path` (optionally narrowed to `prefix`) from `store`,
/// consulting the runtime's list-files cache when one is configured.
///
/// With a cache, the *entire* table is listed once and stored under a
/// table-scoped key; prefix filtering is applied to the cached listing, so
/// later requests for any partition of the same table are served from
/// memory.
async fn list_with_cache<'b>(
    ctx: &'b dyn Session,
    store: &'b dyn ObjectStore,
    table_ref: Option<&TableReference>,
    table_base_path: &Path,
    prefix: Option<&Path>,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
    // Table root joined with the optional partition prefix.
    let full_prefix = match prefix {
        Some(p) => {
            let mut parts: Vec<_> = table_base_path.parts().collect();
            parts.extend(p.parts());
            Path::from_iter(parts)
        }
        None => table_base_path.clone(),
    };
    match ctx.runtime_env().cache_manager.get_list_files_cache() {
        // No cache configured: stream straight from the object store.
        None => Ok(store
            .list(Some(&full_prefix))
            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
            .boxed()),
        Some(cache) => {
            // Only filter by prefix when one was actually supplied.
            let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
            let table_scoped_base_path = TableScopedPath {
                table: table_ref.cloned(),
                path: table_base_path.clone(),
            };
            let vec = if let Some(cached) = cache.get(&table_scoped_base_path) {
                debug!("Hit list files cache");
                cached.files_matching_prefix(&filter_prefix)
            } else {
                // Cache miss: list the whole table once, then cache it.
                let mut vec = store
                    .list(Some(table_base_path))
                    .try_collect::<Vec<ObjectMeta>>()
                    .await?;
                // Drop excess capacity before retaining in the cache.
                vec.shrink_to_fit();
                let cached: CachedFileList = vec.into();
                let result = cached.files_matching_prefix(&filter_prefix);
                cache.put(&table_scoped_base_path, cached);
                result
            };
            // NOTE(review): unwrap_or_clone deep-copies the file list when
            // the Arc is still shared (e.g. with the cache entry) — confirm
            // this is acceptable for large tables.
            Ok(
                futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
                    .boxed(),
            )
        }
    }
}
/// Converts a filesystem path string into a `file://` URL, returning
/// `None` when the path cannot be represented as a URL.
#[cfg(not(target_arch = "wasm32"))]
fn url_from_filesystem_path(s: &str) -> Option<Url> {
    let path = std::path::Path::new(s);

    // Decide directory-ness: ask the filesystem for existing paths and
    // fall back to a trailing-separator heuristic for non-existent ones.
    let is_dir = if path.exists() {
        path.is_dir()
    } else {
        std::path::is_separator(s.chars().last()?)
    };

    let to_url = |abs: &std::path::Path| {
        let first = if is_dir {
            Url::from_directory_path(abs).ok()?
        } else {
            Url::from_file_path(abs).ok()?
        };
        // from_{file,directory}_path keep `.`/`..` segments; re-parsing
        // resolves them so equivalent paths produce equal URLs.
        Url::parse(first.as_str()).ok()
    };

    if path.is_absolute() {
        to_url(path)
    } else {
        // Resolve relative paths against the current working directory.
        let absolute = std::env::current_dir().ok()?.join(path);
        to_url(&absolute)
    }
}
impl AsRef<str> for ListingTableUrl {
    /// Borrows the full URL as a string slice.
    fn as_ref(&self) -> &str {
        self.url.as_str()
    }
}
impl AsRef<Url> for ListingTableUrl {
    /// Borrows the underlying parsed [`Url`].
    fn as_ref(&self) -> &Url {
        &self.url
    }
}
impl std::fmt::Display for ListingTableUrl {
    /// Formats as the full URL string, forwarding formatter flags
    /// (width, fill, …) to the underlying `str` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self.as_str(), f)
    }
}
/// Characters that introduce a glob pattern in a path expression.
#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Splits `path` at the boundary between its literal directory prefix and
/// the first glob pattern, returning `None` when no glob character occurs.
/// A glob with no preceding separator yields the prefix `"."`.
#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
    // Byte offset just past the most recent path separator.
    let mut prefix_end = 0;
    for (idx, ch) in path.char_indices() {
        if GLOB_START_CHARS.contains(&ch) {
            return match prefix_end {
                // Glob before any separator: prefix is the cwd.
                0 => Some((".", path)),
                n => Some(path.split_at(n)),
            };
        }
        if std::path::is_separator(ch) {
            prefix_end = idx + ch.len_utf8();
        }
    }
    None
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::DFSchema;
use datafusion_common::config::TableOptions;
use datafusion_execution::TaskContext;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use object_store::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
PutMultipartOptions, PutPayload,
};
use std::any::Any;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use tempfile::tempdir;
#[test]
fn test_prefix_path() {
    // Child paths of the current directory strip cleanly.
    let root = std::env::current_dir().unwrap();
    let root = root.to_string_lossy();
    let url = ListingTableUrl::parse(root).unwrap();
    let child = url.prefix.child("partition").child("file");
    let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
    assert_eq!(prefix, vec!["partition", "file"]);

    let url = ListingTableUrl::parse("file:///").unwrap();
    let child = Path::parse("/foo/bar").unwrap();
    let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
    assert_eq!(prefix, vec!["foo", "bar"]);

    // `/foo` must not be treated as a prefix of `/foob`.
    let url = ListingTableUrl::parse("file:///foo").unwrap();
    let child = Path::parse("/foob/bar").unwrap();
    assert!(url.strip_prefix(&child).is_none());

    // Stripping a URL from itself leaves no segments.
    let url = ListingTableUrl::parse("file:///foo/file").unwrap();
    let child = Path::parse("/foo/file").unwrap();
    assert_eq!(url.strip_prefix(&child).unwrap().count(), 0);

    // Special characters and percent-encoding in file URLs.
    let url = ListingTableUrl::parse("file:///foo/ bar").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/ bar");
    let url = ListingTableUrl::parse("file:///foo/bar?").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/bar");
    let url = ListingTableUrl::parse("file:///foo/😺").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/😺");
    let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/bar.foo");
    let url = ListingTableUrl::parse("file:///foo/bar%2Efoo").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/bar.foo");

    // Double-encoded `%252F` decodes once, to a literal `%2F`.
    let url = ListingTableUrl::parse("file:///foo/bar%252Ffoo").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/bar%2Ffoo");
    let url = ListingTableUrl::parse("file:///foo/a%252Fb.txt").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/a%2Fb.txt");

    // A real file whose name contains `%2F` round-trips unchanged.
    let dir = tempdir().unwrap();
    let path = dir.path().join("bar%2Ffoo");
    std::fs::File::create(&path).unwrap();
    let url = ListingTableUrl::parse(path.to_str().unwrap()).unwrap();
    assert!(url.prefix.as_ref().ends_with("bar%2Ffoo"), "{}", url.prefix);

    // `.` and `..` segments are resolved during parsing.
    let url = ListingTableUrl::parse("file:///foo/../a%252Fb.txt").unwrap();
    assert_eq!(url.prefix.as_ref(), "a%2Fb.txt");
    let url =
        ListingTableUrl::parse("file:///foo/./bar/../../baz/./test.txt").unwrap();
    assert_eq!(url.prefix.as_ref(), "baz/test.txt");

    // Relative filesystem paths parse identically to their absolute
    // equivalents, including for non-existent locations.
    let workdir = std::env::current_dir().unwrap();
    let t = workdir.join("non-existent");
    let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
    let b = ListingTableUrl::parse("non-existent").unwrap();
    assert_eq!(a, b);
    assert!(a.prefix.as_ref().ends_with("non-existent"));

    let t = workdir.parent().unwrap();
    let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
    let b = ListingTableUrl::parse("..").unwrap();
    assert_eq!(a, b);

    let t = t.join("bar");
    let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
    let b = ListingTableUrl::parse("../bar").unwrap();
    assert_eq!(a, b);
    assert!(a.prefix.as_ref().ends_with("bar"));

    let t = t.join(".").join("foo").join("..").join("baz");
    let a = ListingTableUrl::parse(t.to_str().unwrap()).unwrap();
    let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap();
    assert_eq!(a, b);
    assert!(a.prefix.as_ref().ends_with("bar/baz"));
}
#[test]
fn test_prefix_s3() {
    // For non-file schemes the prefix is the URL path (bucket excluded).
    let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
    assert_eq!(url.prefix.as_ref(), "foo/bar");

    let path = Path::from("foo/bar/partition/foo.parquet");
    let prefix: Vec<_> = url.strip_prefix(&path).unwrap().collect();
    assert_eq!(prefix, vec!["partition", "foo.parquet"]);

    // Paths outside the prefix do not strip.
    let path = Path::from("other/bar/partition/foo.parquet");
    assert!(url.strip_prefix(&path).is_none());
}
#[test]
fn test_split_glob() {
    // Helper: assert the (prefix, glob) split for a given input.
    fn test(input: &str, expected: Option<(&str, &str)>) {
        assert_eq!(
            split_glob_expression(input),
            expected,
            "testing split_glob_expression with {input}"
        );
    }

    // No glob characters: nothing to split.
    test("/", None);
    test("/a.txt", None);
    test("/a", None);
    test("/a/", None);
    test("/a/b", None);
    test("/a/b/", None);
    test("/a/b.txt", None);
    test("/a/b/c.txt", None);

    // Split happens at the last separator before the first glob char.
    test("*.txt", Some((".", "*.txt")));
    test("/*.txt", Some(("/", "*.txt")));
    test("/a/*b.txt", Some(("/a/", "*b.txt")));
    test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
    test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
    test("/a/b*.txt", Some(("/a/", "b*.txt")));
    test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
    test(
        "/a/b/c//alltypes_plain*.parquet",
        Some(("/a/b/c//", "alltypes_plain*.parquet")),
    );
}
#[test]
fn test_is_collection() {
    // Only the URL *path* component decides collection-ness; query
    // strings and fragments are ignored.
    fn test(input: &str, expected: bool, message: &str) {
        let url = ListingTableUrl::parse(input).unwrap();
        assert_eq!(url.is_collection(), expected, "{message}");
    }

    test("https://a.b.c/path/", true, "path ends with / - collection");
    test(
        "https://a.b.c/path/?a=b",
        true,
        "path ends with / - with query args - collection",
    );
    test(
        "https://a.b.c/path?a=b/",
        false,
        "path not ends with / - query ends with / - not collection",
    );
    test(
        "https://a.b.c/path/#a=b",
        true,
        "path ends with / - with fragment - collection",
    );
    test(
        "https://a.b.c/path#a=b/",
        false,
        "path not ends with / - fragment ends with / - not collection",
    );
}
#[test]
fn test_file_extension() {
    // Extension comes from the final path segment only; query strings
    // and fragments never contribute one.
    fn test(input: &str, expected: Option<&str>, message: &str) {
        let url = ListingTableUrl::parse(input).unwrap();
        assert_eq!(url.file_extension(), expected, "{message}");
    }

    test("https://a.b.c/path/", None, "path ends with / - not a file");
    test(
        "https://a.b.c/path/?a=b",
        None,
        "path ends with / - with query args - not a file",
    );
    test(
        "https://a.b.c/path?a=b/",
        None,
        "path not ends with / - query ends with / but no file extension",
    );
    test(
        "https://a.b.c/path/#a=b",
        None,
        "path ends with / - with fragment - not a file",
    );
    test(
        "https://a.b.c/path#a=b/",
        None,
        "path not ends with / - fragment ends with / but no file extension",
    );
    test(
        "file///some/path/",
        None,
        "file path ends with / - not a file",
    );
    test(
        "file///some/path/file",
        None,
        "file path does not end with - no extension",
    );
    test(
        "file///some/path/file.",
        None,
        "file path ends with . - no value after .",
    );
    test(
        "file///some/path/file.ext",
        Some("ext"),
        "file path ends with .ext - extension is ext",
    );
}
#[tokio::test]
async fn test_list_files() -> Result<()> {
    // Store that rejects `head` requests for one path, to exercise
    // error propagation from the single-object probe.
    let store = MockObjectStore {
        in_mem: object_store::memory::InMemory::new(),
        forbidden_paths: vec!["forbidden/e.parquet".into()],
    };
    create_file(&store, "a.parquet").await;
    create_file(&store, "/t/b.parquet").await;
    create_file(&store, "/t/c.csv").await;
    create_file(&store, "/t/d.csv").await;
    create_file(&store, "/forbidden/e.parquet").await;

    // Extension filtering, with and without trailing slash on the URL.
    assert_eq!(
        list_all_files("/", &store, "parquet").await?,
        vec!["a.parquet"],
    );
    assert_eq!(
        list_all_files("/t/", &store, "parquet").await?,
        vec!["t/b.parquet"],
    );
    assert_eq!(
        list_all_files("/t", &store, "parquet").await?,
        vec!["t/b.parquet"],
    );
    assert_eq!(
        list_all_files("/t", &store, "csv").await?,
        vec!["t/c.csv", "t/d.csv"],
    );
    assert_eq!(
        list_all_files("/t/", &store, "csv").await?,
        vec!["t/c.csv", "t/d.csv"],
    );

    // Missing locations list as empty rather than erroring.
    assert_eq!(
        list_all_files("/NonExisting", &store, "csv").await?,
        vec![] as Vec<String>
    );
    assert_eq!(
        list_all_files("/NonExisting/", &store, "csv").await?,
        vec![] as Vec<String>
    );

    // A PermissionDenied from `head` surfaces as an ObjectStore error.
    let Err(DataFusionError::ObjectStore(err)) =
        list_all_files("/forbidden/e.parquet", &store, "parquet").await
    else {
        panic!("Expected ObjectStore error");
    };
    let object_store::Error::PermissionDenied { .. } = &*err else {
        panic!("Expected PermissionDenied error");
    };

    // Hive-partitioned layout: prefix narrows the listing.
    create_file(&store, "/data/a=1/file1.parquet").await;
    create_file(&store, "/data/a=1/b=100/file2.parquet").await;
    create_file(&store, "/data/a=2/b=200/file3.parquet").await;
    create_file(&store, "/data/a=2/b=200/file4.csv").await;
    assert_eq!(
        list_prefixed_files("/data/", &store, Some(Path::from("a=1")), "parquet")
            .await?,
        vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
    );
    assert_eq!(
        list_prefixed_files(
            "/data/",
            &store,
            Some(Path::from("a=1/b=100")),
            "parquet"
        )
        .await?,
        vec!["data/a=1/b=100/file2.parquet"],
    );
    assert_eq!(
        list_prefixed_files("/data/", &store, Some(Path::from("a=2")), "parquet")
            .await?,
        vec!["data/a=2/b=200/file3.parquet"],
    );
    Ok(())
}
// Verifies the cached listing path returns the same results as the
// uncached path, for both cache-miss (first call) and cache-hit
// (second call) scenarios.
#[tokio::test]
async fn test_cache_path_equivalence() -> Result<()> {
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    let store = MockObjectStore {
        in_mem: object_store::memory::InMemory::new(),
        forbidden_paths: vec![],
    };
    create_file(&store, "/table/year=2023/data1.parquet").await;
    create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
    create_file(&store, "/table/year=2024/data3.parquet").await;
    create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
    create_file(&store, "/table/year=2024/month=12/data5.parquet").await;

    let session_no_cache = MockSession::new();
    let runtime_with_cache = RuntimeEnvBuilder::new()
        .with_object_list_cache_limit(1024 * 1024)
        .build_arc()?;
    let session_with_cache = MockSession::with_runtime_env(runtime_with_cache);

    let test_cases = vec![
        ("/table/", None, "full table listing"),
        (
            "/table/",
            Some(Path::from("year=2023")),
            "single partition filter",
        ),
        (
            "/table/",
            Some(Path::from("year=2024")),
            "different partition filter",
        ),
        (
            "/table/",
            Some(Path::from("year=2024/month=06")),
            "nested partition filter",
        ),
        (
            "/table/",
            Some(Path::from("year=2025")),
            "non-existent partition",
        ),
    ];
    for (url_str, prefix, description) in test_cases {
        let url = ListingTableUrl::parse(url_str)?;
        // Baseline: no cache configured.
        let mut results_no_cache: Vec<String> = url
            .list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        results_no_cache.sort();

        // First cached call: populates the cache (miss path).
        let mut results_with_cache_miss: Vec<String> = url
            .list_prefixed_files(
                &session_with_cache,
                &store,
                prefix.clone(),
                "parquet",
            )
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        results_with_cache_miss.sort();

        // Second cached call: served from the cache (hit path).
        let mut results_with_cache_hit: Vec<String> = url
            .list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
            .await?
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|m| m.location.to_string())
            .collect();
        results_with_cache_hit.sort();

        assert_eq!(
            results_no_cache, results_with_cache_miss,
            "Cache miss path should match non-cached path for: {description}"
        );
        assert_eq!(
            results_no_cache, results_with_cache_hit,
            "Cache hit path should match non-cached path for: {description}"
        );
    }
    Ok(())
}
// Verifies that a full table listing populates the cache in a way that
// later partition-prefixed listings can be answered from it.
#[tokio::test]
async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    let store = MockObjectStore {
        in_mem: object_store::memory::InMemory::new(),
        forbidden_paths: vec![],
    };
    create_file(&store, "/sales/region=US/q1.parquet").await;
    create_file(&store, "/sales/region=US/q2.parquet").await;
    create_file(&store, "/sales/region=EU/q1.parquet").await;

    let runtime = RuntimeEnvBuilder::new()
        .with_object_list_cache_limit(1024 * 1024)
        .build_arc()?;
    let session = MockSession::with_runtime_env(runtime);
    let url = ListingTableUrl::parse("/sales/")?;

    // Full listing first: caches all three files.
    let full_results: Vec<String> = url
        .list_prefixed_files(&session, &store, None, "parquet")
        .await?
        .try_collect::<Vec<_>>()
        .await?
        .into_iter()
        .map(|m| m.location.to_string())
        .collect();
    assert_eq!(full_results.len(), 3);

    // Partition-filtered listings are now served from that cache entry.
    let mut us_results: Vec<String> = url
        .list_prefixed_files(
            &session,
            &store,
            Some(Path::from("region=US")),
            "parquet",
        )
        .await?
        .try_collect::<Vec<_>>()
        .await?
        .into_iter()
        .map(|m| m.location.to_string())
        .collect();
    us_results.sort();
    assert_eq!(
        us_results,
        vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
    );

    let eu_results: Vec<String> = url
        .list_prefixed_files(
            &session,
            &store,
            Some(Path::from("region=EU")),
            "parquet",
        )
        .await?
        .try_collect::<Vec<_>>()
        .await?
        .into_iter()
        .map(|m| m.location.to_string())
        .collect();
    assert_eq!(eu_results, vec!["sales/region=EU/q1.parquet"]);
    Ok(())
}
// Writes a small fixture object at `path` in the given store.
async fn create_file(object_store: &dyn ObjectStore, path: &str) {
    object_store
        .put(&Path::from(path), PutPayload::from_static(b"hello world"))
        .await
        .expect("failed to create test file");
}
// Convenience wrapper: list everything under `url` with no extra prefix.
async fn list_all_files(
    url: &str,
    store: &dyn ObjectStore,
    file_extension: &str,
) -> Result<Vec<String>> {
    try_list_prefixed_files(url, store, None, file_extension).await
}
// Convenience wrapper: list under `url` narrowed to `prefix`.
async fn list_prefixed_files(
    url: &str,
    store: &dyn ObjectStore,
    prefix: Option<Path>,
    file_extension: &str,
) -> Result<Vec<String>> {
    try_list_prefixed_files(url, store, prefix, file_extension).await
}
// Drives `ListingTableUrl::list_prefixed_files` through a throwaway
// session and flattens the result stream into plain path strings for
// easy assertions.
async fn try_list_prefixed_files(
    url: &str,
    store: &dyn ObjectStore,
    prefix: Option<Path>,
    file_extension: &str,
) -> Result<Vec<String>> {
    let session = MockSession::new();
    let table_url = ListingTableUrl::parse(url)?;
    let stream = table_url
        .list_prefixed_files(&session, store, prefix, file_extension)
        .await?;
    let metas = stream.try_collect::<Vec<_>>().await?;
    Ok(metas
        .into_iter()
        .map(|meta| meta.location.as_ref().to_string())
        .collect())
}
// In-memory object store that can deny `head` access to chosen paths.
#[derive(Debug)]
struct MockObjectStore {
    // Backing store all operations delegate to.
    in_mem: object_store::memory::InMemory,
    // Paths whose `head` requests fail with PermissionDenied.
    forbidden_paths: Vec<Path>,
}
impl std::fmt::Display for MockObjectStore {
    // Delegates to the inner in-memory store's representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.in_mem.fmt(f)
    }
}
// Pure delegation to the in-memory store, except `get_opts`, which
// injects PermissionDenied for configured paths on `head` requests.
#[async_trait]
impl ObjectStore for MockObjectStore {
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: object_store::PutOptions,
    ) -> object_store::Result<object_store::PutResult> {
        self.in_mem.put_opts(location, payload, opts).await
    }

    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.in_mem.put_multipart_opts(location, opts).await
    }

    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        // Simulate an access-control failure for head requests on the
        // forbidden paths; everything else passes through.
        if options.head && self.forbidden_paths.contains(location) {
            Err(object_store::Error::PermissionDenied {
                path: location.to_string(),
                source: "forbidden".into(),
            })
        } else {
            self.in_mem.get_opts(location, options).await
        }
    }

    async fn get_ranges(
        &self,
        location: &Path,
        ranges: &[Range<u64>],
    ) -> object_store::Result<Vec<Bytes>> {
        self.in_mem.get_ranges(location, ranges).await
    }

    fn delete_stream(
        &self,
        locations: BoxStream<'static, object_store::Result<Path>>,
    ) -> BoxStream<'static, object_store::Result<Path>> {
        self.in_mem.delete_stream(locations)
    }

    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.in_mem.list(prefix)
    }

    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> object_store::Result<ListResult> {
        self.in_mem.list_with_delimiter(prefix).await
    }

    async fn copy_opts(
        &self,
        from: &Path,
        to: &Path,
        options: CopyOptions,
    ) -> object_store::Result<()> {
        self.in_mem.copy_opts(from, to, options).await
    }
}
// Minimal `Session` implementation exposing only the pieces the listing
// code reads: config options and the runtime environment.
struct MockSession {
    config: SessionConfig,
    runtime_env: Arc<RuntimeEnv>,
}
impl MockSession {
    // Session backed by the default runtime environment (no caches).
    fn new() -> Self {
        Self::with_runtime_env(Arc::new(RuntimeEnv::default()))
    }

    // Session backed by a caller-supplied runtime environment.
    fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
        Self {
            config: SessionConfig::new(),
            runtime_env,
        }
    }
}
// Only `config` and `runtime_env` are implemented; the listing code
// under test touches nothing else, so every other method panics.
#[async_trait::async_trait]
impl Session for MockSession {
    fn session_id(&self) -> &str {
        unimplemented!()
    }

    fn config(&self) -> &SessionConfig {
        &self.config
    }

    async fn create_physical_plan(
        &self,
        _logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn create_physical_expr(
        &self,
        _expr: Expr,
        _df_schema: &DFSchema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        unimplemented!()
    }

    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
        unimplemented!()
    }

    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
        unimplemented!()
    }

    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
        unimplemented!()
    }

    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
        &self.runtime_env
    }

    fn execution_props(&self) -> &ExecutionProps {
        unimplemented!()
    }

    fn as_any(&self) -> &dyn Any {
        unimplemented!()
    }

    fn table_options(&self) -> &TableOptions {
        unimplemented!()
    }

    fn table_options_mut(&mut self) -> &mut TableOptions {
        unimplemented!()
    }

    fn task_ctx(&self) -> Arc<TaskContext> {
        unimplemented!()
    }
}
}