use std::sync::Arc;
use arrow::{
array::{Array, ArrayRef, AsArray, StringBuilder},
compute::{and, cast, prep_null_mask_filter},
datatypes::{Field, Schema},
record_batch::RecordBatch,
};
use datafusion::{
common::DFSchema,
datasource::listing::{ListingTableUrl, PartitionedFile},
error::{DataFusionError, Result},
physical_expr::{create_physical_expr, execution_props::ExecutionProps},
prelude::Expr,
scalar::ScalarValue,
};
use futures::{
stream::{BoxStream, FuturesUnordered},
StreamExt, TryStreamExt,
};
use object_store::{path::Path, ObjectMeta, ObjectStore};
const CONCURRENCY_LIMIT: usize = 100;
pub(crate) async fn list_all_files<'a>(
path: &'a ListingTableUrl,
store: &'a dyn ObjectStore,
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let is_dir = path.as_str().ends_with('/');
let list = match is_dir {
true => store.list(Some(path.prefix())),
false => futures::stream::once(store.head(path.prefix())).boxed(),
};
Ok(list
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path
.as_ref()
.to_lowercase()
.ends_with(file_extension.to_lowercase().as_str());
futures::future::ready(extension_match)
})
.map_err(DataFusionError::ObjectStore)
.boxed())
}
pub async fn pruned_partition_list<'a>(
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
file_extension: &'a str,
partition_cols: &'a [Field],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
tracing::info!(
"pruned_partition_list: table_path: {:?}, partition_cols: {:?}, file_extension: {:?}",
table_path,
partition_cols,
file_extension
);
if partition_cols.is_empty() {
let files = list_all_files(table_path, store, file_extension)
.await?
.map_ok(|o| o.into());
tracing::trace!("pruned_partition_list: no partition columns, returning all files");
return Ok(Box::pin(files));
}
let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
let pruned = prune_partitions(table_path, partitions, filters, partition_cols).await?;
tracing::info!(
"pruned_partition_list: got n partitions {:?} for table_path {:?}",
pruned.len(),
table_path
);
let stream = futures::stream::iter(pruned)
.map(move |partition: Partition| async move {
let cols = partition_cols.iter().map(|x| x.name().as_str());
let parsed = parse_partitions_for_path(table_path, &partition.path, cols);
let partition_values = parsed
.into_iter()
.flatten()
.zip(partition_cols)
.map(|(parsed, field)| {
ScalarValue::try_from_string(parsed.to_string(), field.data_type())
})
.collect::<Result<Vec<_>>>()?;
let files = match partition.files {
Some(files) => files,
None => {
let s = store.list(Some(&partition.path));
s.try_collect().await?
}
};
let files = files.into_iter().filter(move |o| {
let extension_match = o.location.as_ref().to_lowercase().ends_with(file_extension);
let glob_match = table_path.contains(&o.location, false);
tracing::info!(
"pruned_partition_list: extension_match: {:?}, glob_match: {:?} for {:?}",
extension_match,
glob_match,
o.location
);
extension_match && glob_match
});
let stream = futures::stream::iter(files.map(move |object_meta| {
Ok(PartitionedFile {
object_meta,
partition_values: partition_values.clone(),
range: None,
extensions: None,
statistics: None,
})
}));
Ok::<_, DataFusionError>(stream)
})
.buffer_unordered(CONCURRENCY_LIMIT)
.try_flatten()
.boxed();
Ok(stream)
}
#[derive(Debug)]
struct Partition {
path: Path,
depth: usize,
files: Option<Vec<ObjectMeta>>,
}
impl Partition {
async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
let result = store.list_with_delimiter(prefix).await?;
self.files = Some(result.objects);
Ok((self, result.common_prefixes))
}
}
async fn list_partitions(
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
) -> Result<Vec<Partition>> {
let partition = Partition {
path: table_path.prefix().clone(),
depth: 0,
files: None,
};
let mut out = Vec::with_capacity(64);
let mut pending = vec![];
let mut futures = FuturesUnordered::new();
futures.push(partition.list(store));
while let Some((partition, paths)) = futures.next().await.transpose()? {
if let Some(next) = pending.pop() {
futures.push(next)
}
let depth = partition.depth;
out.push(partition);
for path in paths {
let child = Partition {
path,
depth: depth + 1,
files: None,
};
match depth < max_depth {
true => match futures.len() < CONCURRENCY_LIMIT {
true => futures.push(child.list(store)),
false => pending.push(child.list(store)),
},
false => out.push(child),
}
}
}
Ok(out)
}
async fn prune_partitions(
table_path: &ListingTableUrl,
partitions: Vec<Partition>,
filters: &[Expr],
partition_cols: &[Field],
) -> Result<Vec<Partition>> {
if filters.is_empty() {
return Ok(partitions);
}
let mut builders: Vec<_> = (0..partition_cols.len())
.map(|_| StringBuilder::with_capacity(partitions.len(), partitions.len() * 10))
.collect();
for partition in &partitions {
let cols = partition_cols.iter().map(|x| x.name().as_str());
let parsed =
parse_partitions_for_path(table_path, &partition.path, cols).unwrap_or_default();
let mut builders = builders.iter_mut();
for (p, b) in parsed.iter().zip(&mut builders) {
b.append_value(p);
}
builders.for_each(|b| b.append_null());
}
let arrays = partition_cols
.iter()
.zip(builders)
.map(|(field, mut builder)| {
let array = builder.finish();
cast(&array, field.data_type()).map_err(|e| DataFusionError::ArrowError(e, None))
})
.collect::<Result<_, _>>()?;
let schema = Arc::new(Schema::new(partition_cols.to_vec()));
let df_schema = DFSchema::from_unqualified_fields(
partition_cols
.iter()
.map(|f| Field::new(f.name(), f.data_type().clone(), f.is_nullable())) .collect(),
Default::default(),
)?;
let batch = RecordBatch::try_new(Arc::clone(&schema), arrays)?;
let props = ExecutionProps::new();
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
let eval = expr.evaluate(&batch).ok()?;
eval.into_array(partitions.len()).ok()
};
let mask = filters
.iter()
.fold(None, |acc, filter| match (acc, do_filter(filter)) {
(Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
(None, Some(r)) => Some(r.as_boolean().clone()),
(r, None) => r,
});
let mask = match mask {
Some(mask) => mask,
None => return Ok(partitions),
};
let prepared = match mask.null_count() {
0 => mask,
_ => prep_null_mask_filter(&mask),
};
assert_eq!(prepared.len(), partitions.len());
let filtered = partitions
.into_iter()
.zip(prepared.values())
.filter_map(|(p, f)| f.then_some(p))
.collect();
Ok(filtered)
}
fn parse_partitions_for_path<'a, I>(
table_path: &ListingTableUrl,
file_path: &'a Path,
table_partition_cols: I,
) -> Option<Vec<&'a str>>
where
I: IntoIterator<Item = &'a str>,
{
let subpath = strip_prefix(table_path, file_path)?;
let mut part_values = vec![];
for (part, pn) in subpath.zip(table_partition_cols) {
match part.split_once('=') {
Some((name, val)) if name == pn => part_values.push(val),
_ => {
return None;
}
}
}
Some(part_values)
}
fn strip_prefix<'a, 'b: 'a>(
table_path: &'a ListingTableUrl,
path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
use object_store::path::DELIMITER;
let mut stripped = path.as_ref().strip_prefix(table_path.prefix().as_ref())?;
if !stripped.is_empty() && !table_path.prefix().as_ref().is_empty() {
stripped = stripped.strip_prefix(DELIMITER)?;
}
Some(stripped.split_terminator(DELIMITER))
}