use std::any::Any;
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::fmt;
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::catalog::TableProvider;
use datafusion::catalog::memory::DataSourceExec;
use datafusion::common::pruning::PruningStatistics;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::common::{Column, DFSchemaRef, Result, Statistics, ToDFSchema};
use datafusion::config::{ConfigOptions, TableParquetOptions};
use datafusion::datasource::TableType;
use datafusion::datasource::physical_plan::FileGroup;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::table_schema::TableSchema;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::simplify::SimplifyContext;
use datafusion::logical_expr::utils::split_conjunction;
use datafusion::logical_expr::{BinaryExpr, LogicalPlan, Operator};
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PhysicalExpr, PlanProperties,
};
use datafusion::{
catalog::Session,
common::{HashMap, HashSet},
datasource::listing::PartitionedFile,
logical_expr::{TableProviderFilterPushDown, utils::conjunction},
prelude::Expr,
scalar::ScalarValue,
};
use futures::TryStreamExt as _;
use futures::future::BoxFuture;
use object_store::ObjectMeta;
use serde::{Deserialize, Serialize};
use url::Url;
use uuid::Uuid;
use crate::delta_datafusion::file_id::{file_id_data_type, wrap_file_id_value};
use crate::delta_datafusion::table_provider::next::SnapshotWrapper;
use crate::delta_datafusion::{
DataFusionMixins as _, DeltaSessionExt, FindFilesExprProperties, get_null_of_arrow_type,
to_correct_scalar_value,
};
use crate::kernel::transaction::PROTOCOL;
use crate::kernel::{Add, EagerSnapshot, Snapshot, Version};
use crate::logstore::LogStore;
use crate::logstore::LogStoreExt as _;
use crate::protocol::SaveMode;
use crate::table::normalize_table_url;
use crate::{DeltaResult, DeltaTable, DeltaTableError, logstore::LogStoreRef};
mod data_sink;
pub(crate) mod next;
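/// Default (base) name for the synthetic column that exposes each row's source file path.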
const PATH_COLUMN: &str = "__delta_rs_path";
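/// Resolve the name of the file path column for `input_schema`.
///
/// An explicitly requested name must not collide with an existing column; the
/// default name is suffixed with `_1`, `_2`, ... until it is unique.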
pub(crate) fn resolve_file_column_name(
input_schema: &Schema,
file_column_name: Option<&str>,
) -> DeltaResult<String> {
let column_names: HashSet<&str> = input_schema
.fields()
.iter()
.map(|field| field.name().as_str())
.collect();
match file_column_name {
Some(name) => {
if column_names.contains(name) {
return Err(DeltaTableError::Generic(format!(
"Unable to add file path column since column with name {name} exists"
)));
}
Ok(name.to_owned())
}
None => {
let prefix = PATH_COLUMN;
let mut idx = 0;
let mut name = prefix.to_owned();
while column_names.contains(name.as_str()) {
idx += 1;
name = format!("{prefix}_{idx}");
}
Ok(name)
}
}
}
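/// Builder for [`DeltaScanConfig`], controlling how a Delta snapshot is exposed
/// to DataFusion scans: the optional file path column, partition value wrapping,
/// parquet filter pushdown, and an optional schema override.
///
/// A minimal sketch mirroring the tests in this module, assuming an already
/// loaded [`DeltaTable`] named `table` (not compiled as a doc test):
///
/// ```ignore
/// let config = DeltaScanConfigBuilder::new()
///     .with_file_column(true)
///     .build(table.snapshot()?.snapshot())?;
/// let provider = DeltaTableProvider::try_new(
///     table.snapshot()?.snapshot().clone(),
///     table.log_store(),
///     config,
/// )?;
/// ```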
#[derive(Debug, Clone)]
pub struct DeltaScanConfigBuilder {
pub(super) include_file_column: bool,
pub(super) file_column_name: Option<String>,
pub(super) wrap_partition_values: Option<bool>,
pub(super) enable_parquet_pushdown: bool,
pub(super) schema: Option<SchemaRef>,
}
impl Default for DeltaScanConfigBuilder {
fn default() -> Self {
DeltaScanConfigBuilder {
include_file_column: false,
file_column_name: None,
wrap_partition_values: None,
enable_parquet_pushdown: true,
schema: None,
}
}
}
impl DeltaScanConfigBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_file_column(mut self, include: bool) -> Self {
self.include_file_column = include;
self.file_column_name = None;
self
}
pub fn with_file_column_name<S: ToString>(mut self, name: &S) -> Self {
self.file_column_name = Some(name.to_string());
self.include_file_column = true;
self
}
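    /// Control whether the file path partition value is stored using the internal
    /// file-id representation (see `wrap_file_id_value`) instead of a plain `Utf8`
    /// scalar. Defaults to `true` when unset.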
pub fn wrap_partition_values(mut self, wrap: bool) -> Self {
self.wrap_partition_values = Some(wrap);
self
}
pub fn with_parquet_pushdown(mut self, pushdown: bool) -> Self {
self.enable_parquet_pushdown = pushdown;
self
}
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}
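    /// Build a [`DeltaScanConfig`] for `snapshot`, resolving the file path column
    /// name against the snapshot's input schema.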
pub fn build(&self, snapshot: &EagerSnapshot) -> DeltaResult<DeltaScanConfig> {
let file_column_name = if self.include_file_column {
Some(resolve_file_column_name(
snapshot.input_schema().as_ref(),
self.file_column_name.as_deref(),
)?)
} else {
None
};
Ok(DeltaScanConfig {
file_column_name,
wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
enable_parquet_pushdown: self.enable_parquet_pushdown,
schema: self.schema.clone(),
schema_force_view_types: true,
})
}
}
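/// Scan-time configuration for Delta tables exposed to DataFusion.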
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaScanConfig {
pub file_column_name: Option<String>,
pub wrap_partition_values: bool,
pub enable_parquet_pushdown: bool,
pub schema_force_view_types: bool,
pub schema: Option<SchemaRef>,
}
impl Default for DeltaScanConfig {
fn default() -> Self {
Self::new()
}
}
impl DeltaScanConfig {
pub fn new() -> Self {
Self {
file_column_name: None,
wrap_partition_values: true,
enable_parquet_pushdown: true,
schema_force_view_types: true,
schema: None,
}
}
pub fn new_from_session(session: &dyn Session) -> Self {
let config_options = session.config().options();
Self {
file_column_name: None,
wrap_partition_values: true,
enable_parquet_pushdown: config_options.execution.parquet.pushdown_filters,
schema_force_view_types: config_options.execution.parquet.schema_force_view_types,
schema: None,
}
}
pub fn with_file_column_name<S: ToString>(mut self, name: S) -> Self {
self.file_column_name = Some(name.to_string());
self
}
pub fn with_wrap_partition_values(mut self, wrap: bool) -> Self {
self.wrap_partition_values = wrap;
self
}
pub fn with_parquet_pushdown(mut self, pushdown: bool) -> Self {
self.enable_parquet_pushdown = pushdown;
self
}
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}
}
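/// Builder turning an [`EagerSnapshot`] plus session, filters, projection, and
/// limit into a [`DeltaScan`] physical plan.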
pub(crate) struct DeltaScanBuilder<'a> {
snapshot: &'a EagerSnapshot,
log_store: LogStoreRef,
filter: Option<Expr>,
session: &'a dyn Session,
projection: Option<&'a Vec<usize>>,
limit: Option<usize>,
files: Option<&'a [Add]>,
config: Option<DeltaScanConfig>,
}
impl<'a> DeltaScanBuilder<'a> {
pub fn new(
snapshot: &'a EagerSnapshot,
log_store: LogStoreRef,
session: &'a dyn Session,
) -> Self {
DeltaScanBuilder {
snapshot,
log_store,
filter: None,
session,
projection: None,
limit: None,
files: None,
config: None,
}
}
pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
self.filter = filter;
self
}
pub fn with_files(mut self, files: &'a [Add]) -> Self {
self.files = Some(files);
self
}
pub fn with_projection(mut self, projection: Option<&'a Vec<usize>>) -> Self {
self.projection = projection;
self
}
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
pub fn with_scan_config(mut self, config: DeltaScanConfig) -> Self {
self.config = Some(config);
self
}
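    /// Build the [`DeltaScan`]: verify the table is readable, resolve the logical
    /// schema, prune and group files, and wrap the resulting parquet scan.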
pub async fn build(self) -> DeltaResult<DeltaScan> {
PROTOCOL.can_read_from(self.snapshot)?;
let config = match self.config {
Some(config) => config,
None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
};
let schema = match config.schema.clone() {
Some(value) => value,
None => self.snapshot.read_schema(),
};
let logical_schema = df_logical_schema(
self.snapshot,
&config.file_column_name,
Some(schema.clone()),
)?;
let logical_schema = if let Some(used_columns) = self.projection {
let mut fields = Vec::with_capacity(used_columns.len());
for idx in used_columns {
fields.push(logical_schema.field(*idx).to_owned());
}
if let Some(expr) = &self.filter {
let column_refs = expr.column_refs().into_iter().collect::<BTreeSet<_>>();
for c in column_refs {
let idx = logical_schema.index_of(c.name.as_str())?;
if !used_columns.contains(&idx) {
fields.push(logical_schema.field(idx).to_owned());
}
}
}
Arc::new(Schema::new(fields))
} else {
logical_schema
};
let df_schema = Arc::new(logical_schema.clone().to_dfschema()?);
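        // Split the filter: the full simplified expression drives file pruning,
        // while only the inexact conjuncts are forwarded to the parquet reader.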
let logical_filter = self
.filter
.clone()
.map(|expr| simplify_expr(self.session, df_schema.clone(), expr))
.transpose()?;
let pushdown_filter = self
.filter
.and_then(|expr| {
let predicates = split_conjunction(&expr);
let pushdown_filters =
get_pushdown_filters(&predicates, self.snapshot.metadata().partition_columns());
let filtered_predicates = predicates
.into_iter()
.zip(pushdown_filters.into_iter())
.filter_map(|(filter, pushdown)| {
if pushdown == TableProviderFilterPushDown::Inexact {
Some(filter.clone())
} else {
None
}
});
conjunction(filtered_predicates)
})
.map(|expr| simplify_expr(self.session, df_schema.clone(), expr))
.transpose()?;
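        // Select the files to scan: use the explicit file list when provided,
        // otherwise list all files, pruning with the logical filter and honoring
        // the row-count limit when file statistics are available.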
let (files, files_scanned, files_pruned, _) = match self.files {
Some(files) => {
let files = files.to_owned();
let files_scanned = files.len();
(files, files_scanned, 0, None)
}
None => {
if logical_filter.is_none() && self.limit.is_none() {
let files = self
.snapshot
.file_views(&self.log_store, None)
.map_ok(|f| f.to_add())
.try_collect::<Vec<_>>()
.await?;
let files_scanned = files.len();
(files, files_scanned, 0, None)
} else {
let num_containers = self.snapshot.num_containers();
let files_to_prune = if let Some(predicate) = &logical_filter {
let pruning_predicate =
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
pruning_predicate.prune(self.snapshot)?
} else {
vec![true; num_containers]
};
let mut pruned_without_stats = Vec::new();
let mut rows_collected = 0;
let mut files = Vec::with_capacity(num_containers);
let file_actions: Vec<_> = self
.snapshot
.file_views(&self.log_store, None)
.map_ok(|f| f.to_add())
.try_collect::<Vec<_>>()
.await?;
for (action, keep) in
file_actions.into_iter().zip(files_to_prune.iter().cloned())
{
if keep {
if let Some(limit) = self.limit {
if let Some(stats) = action.get_stats()? {
if rows_collected <= limit as i64 {
rows_collected += stats.num_records;
files.push(action.to_owned());
} else {
break;
}
} else {
pruned_without_stats.push(action.to_owned());
}
} else {
files.push(action.to_owned());
}
}
}
if let Some(limit) = self.limit
&& rows_collected < limit as i64
{
files.extend(pruned_without_stats);
}
let files_scanned = files.len();
let files_pruned = num_containers - files_scanned;
(files, files_scanned, files_pruned, Some(files_to_prune))
}
}
};
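        // Group the selected files by partition values, appending the file path as
        // an extra partition value when a file path column was requested.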
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
let table_partition_cols = &self.snapshot.metadata().partition_columns();
for action in files.iter() {
let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
if config.file_column_name.is_some() {
let partition_value = if config.wrap_partition_values {
wrap_file_id_value(action.path.clone())
} else {
ScalarValue::Utf8(Some(action.path.clone()))
};
part.partition_values.push(partition_value);
}
file_groups
.entry(part.partition_values.clone())
.or_default()
.push(part);
}
let file_schema = Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<arrow::datatypes::FieldRef>>(),
));
let mut table_partition_cols = table_partition_cols
.iter()
.map(|name| schema.field_with_name(name).map(|f| f.to_owned()))
.collect::<Result<Vec<_>, ArrowError>>()?;
if let Some(file_column_name) = &config.file_column_name {
let field_name_datatype = if config.wrap_partition_values {
file_id_data_type()
} else {
DataType::Utf8
};
table_partition_cols.push(Field::new(
file_column_name.clone(),
field_name_datatype,
false,
));
}
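        // Assemble the parquet source and file scan config, attaching the pushdown
        // predicate only when parquet pushdown is enabled.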
let parquet_options = TableParquetOptions {
global: self.session.config().options().execution.parquet.clone(),
..Default::default()
};
let partition_fields: Vec<Arc<Field>> =
table_partition_cols.into_iter().map(Arc::new).collect();
let table_schema = TableSchema::new(file_schema, partition_fields);
let mut file_source =
ParquetSource::new(table_schema).with_table_parquet_options(parquet_options);
if let Some(predicate) = pushdown_filter
&& config.enable_parquet_pushdown
{
file_source = file_source.with_predicate(predicate);
};
let file_scan_config =
FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(file_source))
.with_file_groups(
if file_groups.is_empty() {
vec![FileGroup::from(vec![])]
} else {
file_groups.into_values().map(FileGroup::from).collect()
},
)
.with_projection_indices(self.projection.cloned())?
.with_limit(self.limit)
.build();
let metrics = ExecutionPlanMetricsSet::new();
MetricBuilder::new(&metrics)
.global_counter("files_scanned")
.add(files_scanned);
MetricBuilder::new(&metrics)
.global_counter("files_pruned")
.add(files_pruned);
Ok(DeltaScan {
table_url: self.log_store.root_url().clone(),
parquet_scan: DataSourceExec::from_data_source(file_scan_config),
config,
logical_schema,
metrics,
})
}
}
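/// Builder producing a [`next::DeltaScan`] table provider.
///
/// At least one of a log store or a snapshot must be supplied; when both are
/// given, their table roots must refer to the same location. Typically obtained
/// via [`DeltaTable::table_provider`].
///
/// A minimal sketch mirroring the tests in this module (not compiled as a doc
/// test); `snapshot` and `log_store` are assumed to describe the same table:
///
/// ```ignore
/// let provider = next::DeltaScan::builder()
///     .with_snapshot(snapshot)
///     .with_log_store(log_store)
///     .build()
///     .await?;
/// ```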
pub struct TableProviderBuilder {
log_store: Option<Arc<dyn LogStore>>,
snapshot: Option<SnapshotWrapper>,
session: Option<Arc<dyn Session>>,
file_column: Option<String>,
table_version: Option<Version>,
file_skipping_predicates: Option<Vec<Expr>>,
file_selection: Option<next::FileSelection>,
}
impl fmt::Debug for TableProviderBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TableProviderBuilder")
.field("log_store", &self.log_store)
.field("snapshot", &self.snapshot)
.field("has_session", &self.session.is_some())
.field("file_column", &self.file_column)
.field("table_version", &self.table_version)
.field("file_skipping_predicates", &self.file_skipping_predicates)
.field("file_selection", &self.file_selection)
.finish()
}
}
impl Default for TableProviderBuilder {
fn default() -> Self {
Self::new()
}
}
impl TableProviderBuilder {
fn new() -> Self {
Self {
log_store: None,
snapshot: None,
session: None,
file_column: None,
table_version: None,
file_skipping_predicates: None,
file_selection: None,
}
}
pub fn with_log_store(mut self, log_store: impl Into<Arc<dyn LogStore>>) -> Self {
self.log_store = Some(log_store.into());
self
}
pub fn with_eager_snapshot(mut self, snapshot: impl Into<Arc<EagerSnapshot>>) -> Self {
self.snapshot = Some(SnapshotWrapper::EagerSnapshot(snapshot.into()));
self
}
pub fn with_snapshot(mut self, snapshot: impl Into<Arc<Snapshot>>) -> Self {
self.snapshot = Some(SnapshotWrapper::Snapshot(snapshot.into()));
self
}
pub fn with_session<S>(mut self, session: Arc<S>) -> Self
where
S: Session + 'static,
{
self.session = Some(session);
self
}
pub fn with_table_version(mut self, version: impl Into<Option<Version>>) -> Self {
self.table_version = version.into();
self
}
pub fn with_file_column(mut self, file_column: impl ToString) -> Self {
self.file_column = Some(file_column.to_string());
self
}
pub(crate) fn with_file_skipping_predicates(
mut self,
file_skipping_predicates: impl IntoIterator<Item = Expr>,
) -> Self {
self.file_skipping_predicates = Some(file_skipping_predicates.into_iter().collect());
self
}
pub(crate) fn with_file_selection(mut self, file_selection: next::FileSelection) -> Self {
self.file_selection = Some(file_selection);
self
}
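    /// Build the provider: load a snapshot from the log store if none was given,
    /// verify that the snapshot and log store table roots match (credentials,
    /// query, and fragment are ignored), and validate file-skipping predicates.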
pub async fn build(self) -> Result<next::DeltaScan> {
let TableProviderBuilder {
log_store,
snapshot,
session,
file_column,
table_version,
file_skipping_predicates,
file_selection,
} = self;
let mut config = session
.as_ref()
.map_or_else(DeltaScanConfig::new, |session| {
DeltaScanConfig::new_from_session(session.as_ref())
});
if let Some(file_column) = file_column {
config = config.with_file_column_name(file_column);
}
let snapshot = match snapshot {
Some(wrapper) => wrapper,
None => {
if let Some(log_store) = log_store.as_ref() {
SnapshotWrapper::Snapshot(
Snapshot::try_new(log_store, Default::default(), table_version)
.await?
.into(),
)
} else {
return Err(DataFusionError::Plan(
"Either a log store or a snapshot must be provided to build a Delta TableProvider".to_string(),
));
}
}
};
if let Some(log_store) = log_store.as_ref() {
let snapshot_root_identity = canonical_table_root_identity(
snapshot.snapshot().scan_builder().build()?.table_root(),
);
let log_store_root = log_store.table_root_url();
let log_store_root_identity = canonical_table_root_identity(&log_store_root);
let snapshot_root_redacted = next::redact_url_for_error(&snapshot_root_identity);
let log_store_root_redacted = next::redact_url_for_error(&log_store_root_identity);
if snapshot_root_identity != log_store_root_identity {
return Err(DataFusionError::Plan(format!(
"Provided snapshot root ({snapshot_root_redacted}) does not match provided log store root ({log_store_root_redacted})"
)));
}
}
let mut provider = next::DeltaScan::new(snapshot, config)?;
if let Some(log_store) = log_store {
provider = provider.with_log_store(log_store);
}
if let Some(skipping) = file_skipping_predicates {
for term in &skipping {
let mut visitor = FindFilesExprProperties::default();
term.visit(&mut visitor)?;
visitor.result?;
}
provider = provider.with_file_skipping_predicate(skipping);
}
if let Some(file_selection) = file_selection {
provider = provider.with_file_selection(file_selection);
}
Ok(provider)
}
}
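/// Normalize a table root URL for comparison: credentials, query string, and
/// fragment are stripped so only the storage location identity remains.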
fn canonical_table_root_identity(root: &url::Url) -> url::Url {
let mut root = next::ensure_table_root_url(&normalize_table_url(root));
let _ = root.set_username("");
let _ = root.set_password(None);
root.set_query(None);
root.set_fragment(None);
root
}
impl std::future::IntoFuture for TableProviderBuilder {
type Output = Result<Arc<dyn TableProvider>>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move { Ok(Arc::new(this.build().await?) as _) })
}
}
impl DeltaTable {
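    /// Create a [`TableProviderBuilder`] seeded with this table's log store and,
    /// when the table has been loaded, its current snapshot.
    ///
    /// A minimal sketch, assuming a loaded table (not compiled as a doc test):
    ///
    /// ```ignore
    /// let provider = table.table_provider().await?;
    /// ```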
pub fn table_provider(&self) -> TableProviderBuilder {
let mut builder = TableProviderBuilder::new().with_log_store(self.log_store());
if let Ok(state) = self.snapshot() {
builder = builder.with_snapshot(state.snapshot().snapshot().clone());
}
builder
}
pub fn update_datafusion_session(&self, session: &dyn Session) -> DeltaResult<()> {
crate::delta_datafusion::DeltaSessionExt::ensure_object_store_registered(
session,
self.log_store().as_ref(),
None,
)
}
}
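/// Register the object store backing `log_store` with the given DataFusion
/// session, passing through an optional operation id.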
pub(crate) fn update_datafusion_session(
session: &dyn Session,
log_store: &dyn LogStore,
operation_id: Option<Uuid>,
) -> DeltaResult<()> {
crate::delta_datafusion::DeltaSessionExt::ensure_object_store_registered(
session,
log_store,
operation_id,
)
}
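/// DataFusion [`TableProvider`] over an [`EagerSnapshot`] of a Delta table,
/// supporting filtered scans and `INSERT INTO` via [`DataSinkExec`].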
#[derive(Debug)]
pub struct DeltaTableProvider {
snapshot: EagerSnapshot,
log_store: LogStoreRef,
config: DeltaScanConfig,
schema: Arc<Schema>,
files: Option<Vec<Add>>,
}
impl DeltaTableProvider {
pub fn try_new(
snapshot: EagerSnapshot,
log_store: LogStoreRef,
config: DeltaScanConfig,
) -> DeltaResult<Self> {
Ok(DeltaTableProvider {
schema: df_logical_schema(&snapshot, &config.file_column_name, config.schema.clone())?,
snapshot,
log_store,
config,
files: None,
})
}
pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
self.files = Some(files);
self
}
}
#[async_trait::async_trait]
impl TableProvider for DeltaTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> Arc<Schema> {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
fn get_table_definition(&self) -> Option<&str> {
None
}
fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
None
}
async fn scan(
&self,
session: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
session.ensure_log_store_registered(self.log_store.as_ref())?;
let filter_expr = conjunction(filters.iter().cloned());
let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
.with_scan_config(self.config.clone());
if let Some(files) = &self.files {
scan = scan.with_files(files);
}
Ok(Arc::new(scan.build().await?))
}
fn supports_filters_pushdown(
&self,
filter: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(get_pushdown_filters(
filter,
self.snapshot.metadata().partition_columns(),
))
}
async fn insert_into(
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
state.ensure_log_store_registered(self.log_store.as_ref())?;
let save_mode = match insert_op {
InsertOp::Append => SaveMode::Append,
InsertOp::Overwrite => SaveMode::Overwrite,
InsertOp::Replace => {
return Err(DataFusionError::Plan(
"Replace operation is not supported for DeltaTableProvider".to_string(),
));
}
};
let data_sink =
data_sink::DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode);
Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(data_sink),
None,
)))
}
}
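/// Physical execution plan for a Delta table scan.
///
/// Wraps the underlying parquet [`DataSourceExec`] and carries the scan
/// configuration, logical schema, and file pruning metrics.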
#[derive(Debug)]
pub struct DeltaScan {
table_url: Url,
pub(crate) config: DeltaScanConfig,
pub(crate) parquet_scan: Arc<dyn ExecutionPlan>,
pub(crate) logical_schema: Arc<Schema>,
metrics: ExecutionPlanMetricsSet,
}
impl DeltaScan {
pub(crate) fn new(
table_url: &Url,
config: DeltaScanConfig,
parquet_scan: Arc<dyn ExecutionPlan>,
logical_schema: Arc<Schema>,
) -> Self {
Self {
table_url: normalize_table_url(table_url),
metrics: ExecutionPlanMetricsSet::new(),
config,
parquet_scan,
logical_schema,
}
}
}
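/// Serializable subset of [`DeltaScan`]: everything except the child plan and
/// metrics.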
#[non_exhaustive]
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct DeltaScanWire {
pub(crate) table_url: Url,
pub(crate) config: DeltaScanConfig,
pub(crate) logical_schema: Arc<Schema>,
}
impl From<&DeltaScan> for DeltaScanWire {
fn from(scan: &DeltaScan) -> Self {
Self {
table_url: scan.table_url.clone(),
config: scan.config.clone(),
logical_schema: scan.logical_schema.clone(),
}
}
}
impl DisplayAs for DeltaScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "DeltaScan")
}
}
impl ExecutionPlan for DeltaScan {
fn name(&self) -> &str {
Self::static_name()
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.parquet_scan.schema()
}
fn properties(&self) -> &Arc<PlanProperties> {
self.parquet_scan.properties()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.parquet_scan]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if children.len() != 1 {
return Err(DataFusionError::Plan(format!(
"DeltaScan wrong number of children {}",
children.len()
)));
}
Ok(Arc::new(DeltaScan {
table_url: self.table_url.clone(),
config: self.config.clone(),
parquet_scan: children[0].clone(),
logical_schema: self.logical_schema.clone(),
metrics: self.metrics.clone(),
}))
}
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if let Some(parquet_scan) = self.parquet_scan.repartitioned(target_partitions, config)? {
Ok(Some(Arc::new(DeltaScan {
table_url: self.table_url.clone(),
config: self.config.clone(),
parquet_scan,
logical_schema: self.logical_schema.clone(),
metrics: self.metrics.clone(),
})))
} else {
Ok(None)
}
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.parquet_scan.execute(partition, context)
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.parquet_scan.partition_statistics(partition)
}
fn gather_filters_for_pushdown(
&self,
_phase: FilterPushdownPhase,
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}
}
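/// Build the logical schema for a snapshot: non-partition columns first, then
/// partition columns, then the optional file path column.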
fn df_logical_schema(
snapshot: &EagerSnapshot,
file_column_name: &Option<String>,
schema: Option<SchemaRef>,
) -> DeltaResult<SchemaRef> {
let input_schema = match schema {
Some(schema) => schema,
None => snapshot.input_schema(),
};
let table_partition_cols = snapshot.metadata().partition_columns();
let mut fields: Vec<Arc<Field>> = input_schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect();
for partition_col in table_partition_cols.iter() {
fields.push(Arc::new(
input_schema
.field_with_name(partition_col)
.unwrap()
.to_owned(),
));
}
if let Some(file_column_name) = file_column_name {
fields.push(Arc::new(Field::new(file_column_name, DataType::Utf8, true)));
}
Ok(Arc::new(Schema::new(fields)))
}
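/// Simplify `expr` against `df_schema` and convert it into a physical expression
/// for the given session.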
pub(crate) fn simplify_expr(
session: &dyn Session,
df_schema: DFSchemaRef,
expr: Expr,
) -> Result<Arc<dyn PhysicalExpr>> {
let execution_props = session.execution_props();
let context = SimplifyContext::default()
.with_schema(df_schema.clone())
.with_query_execution_start_time(execution_props.query_execution_start_time.clone())
.with_config_options(
execution_props
.config_options()
.cloned()
.unwrap_or_else(|| session.config().options().clone()),
);
let simplifier = ExprSimplifier::new(context).with_max_cycles(10);
session.create_physical_expr(simplifier.simplify(expr)?, df_schema.as_ref())
}
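/// Classify filters for pushdown: predicates referencing only partition columns
/// with supported operators are `Exact`; everything else is `Inexact`.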
fn get_pushdown_filters(
filter: &[&Expr],
partition_cols: &[String],
) -> Vec<TableProviderFilterPushDown> {
filter
.iter()
.cloned()
.map(|expr| {
let applicable = expr_is_exact_predicate_for_cols(partition_cols, expr);
if !expr.column_refs().is_empty() && applicable {
TableProviderFilterPushDown::Exact
} else {
TableProviderFilterPushDown::Inexact
}
})
.collect()
}
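/// Returns `true` when `expr` references only `partition_cols` and uses operators
/// that partition filtering can evaluate exactly.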
fn expr_is_exact_predicate_for_cols(partition_cols: &[String], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| match expr {
Expr::Column(Column { name, .. }) => {
is_applicable &= partition_cols.contains(name);
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
Ok(TreeNodeRecursion::Stop)
}
}
Expr::BinaryExpr(BinaryExpr { op, .. }) => {
is_applicable &= matches!(
op,
Operator::And
| Operator::Or
| Operator::NotEq
| Operator::Eq
| Operator::Gt
| Operator::GtEq
| Operator::Lt
| Operator::LtEq
);
if is_applicable {
Ok(TreeNodeRecursion::Continue)
} else {
Ok(TreeNodeRecursion::Stop)
}
}
Expr::Literal(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::Between(_)
| Expr::InList(_) => Ok(TreeNodeRecursion::Continue),
_ => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
})
.unwrap();
is_applicable
}
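/// Convert an [`Add`] action into a [`PartitionedFile`], parsing partition values
/// against `schema` and falling back to null scalars when parsing fails.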
fn partitioned_file_from_action(
action: &Add,
partition_columns: &[String],
schema: &Schema,
) -> PartitionedFile {
let partition_values = partition_columns
.iter()
.map(|part| {
action
.partition_values
.get(part)
.map(|val| {
schema
.field_with_name(part)
.map(|field| match val {
Some(value) => to_correct_scalar_value(
&serde_json::Value::String(value.to_string()),
field.data_type(),
)
.unwrap_or(Some(ScalarValue::Null))
.unwrap_or(ScalarValue::Null),
None => get_null_of_arrow_type(field.data_type())
.unwrap_or(ScalarValue::Null),
})
.unwrap_or(ScalarValue::Null)
})
.unwrap_or(ScalarValue::Null)
})
.collect::<Vec<_>>();
let ts_secs = action.modification_time / 1000;
let ts_ns = (action.modification_time % 1000) * 1_000_000;
let last_modified = Utc.from_utc_datetime(
&DateTime::from_timestamp(ts_secs, ts_ns as u32)
.unwrap()
.naive_utc(),
);
PartitionedFile {
object_meta: ObjectMeta {
last_modified,
..action.try_into().unwrap()
},
partition_values,
range: None,
statistics: None,
ordering: None,
extensions: None,
metadata_size_hint: None,
}
}
#[cfg(test)]
mod tests {
use crate::kernel::{DataType, PrimitiveType, StructField, StructType};
use crate::operations::create::CreateBuilder;
use crate::test_utils::object_store::{
drain_recorded_object_store_operations as drain_recorded_ops, recording_log_store,
};
use crate::{DeltaTable, DeltaTableConfig, DeltaTableError};
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use chrono::{TimeZone, Utc};
use datafusion::common::ScalarValue;
use datafusion::datasource::MemTable;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::physical_plan::{collect_partitioned, displayable};
use datafusion::prelude::{and, col, lit};
use object_store::path::Path;
use std::sync::Arc;
use url::Url;
use super::*;
async fn create_in_memory_id_table() -> Result<DeltaTable, DeltaTableError> {
let schema = StructType::try_new(vec![StructField::new(
"id".to_string(),
DataType::Primitive(PrimitiveType::Long),
true,
)])?;
DeltaTable::new_in_memory()
.create()
.with_columns(schema.fields().cloned())
.await
}
async fn create_test_table() -> Result<DeltaTable, DeltaTableError> {
use tempfile::TempDir;
let tmp_dir = TempDir::new().unwrap();
let table_path = tmp_dir.path().to_str().unwrap();
let schema = StructType::try_new(vec![
StructField::new(
"id".to_string(),
DataType::Primitive(PrimitiveType::Long),
false,
),
StructField::new(
"value".to_string(),
DataType::Primitive(PrimitiveType::String),
false,
),
])?;
CreateBuilder::new()
.with_location(table_path)
.with_columns(schema.fields().cloned())
.await
}
fn create_test_session_state() -> Arc<SessionState> {
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionConfig;
let config = SessionConfig::new();
Arc::new(SessionStateBuilder::new().with_config(config).build())
}
#[test]
fn test_resolve_file_column_name_avoids_collisions() {
let schema = Schema::new(vec![
Field::new("id", ArrowDataType::Int64, false),
Field::new(PATH_COLUMN, ArrowDataType::Utf8, true),
Field::new(format!("{PATH_COLUMN}_1"), ArrowDataType::Utf8, true),
]);
let resolved = resolve_file_column_name(&schema, None).unwrap();
assert_eq!(resolved, format!("{PATH_COLUMN}_2"));
}
#[test]
fn test_resolve_file_column_name_rejects_explicit_collision() {
let schema = Schema::new(vec![
Field::new("id", ArrowDataType::Int64, false),
Field::new("file_col", ArrowDataType::Utf8, true),
]);
let err = resolve_file_column_name(&schema, Some("file_col")).unwrap_err();
assert!(
err.to_string()
.contains("Unable to add file path column since column with name file_col exists")
);
}
#[tokio::test]
async fn test_insert_into_simple() {
let table = create_test_table().await.unwrap();
let session_state = create_test_session_state();
let scan_config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap().snapshot())
.unwrap();
let table_provider = DeltaTableProvider::try_new(
table.snapshot().unwrap().snapshot().clone(),
table.log_store(),
scan_config,
)
.unwrap();
let schema = table_provider.schema();
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["test1", "test2"])),
],
)
.unwrap();
let mem_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap());
let logical_plan = mem_table
.scan(&*session_state, None, &[], None)
.await
.unwrap();
let result_plan = table_provider
.insert_into(&*session_state, logical_plan, InsertOp::Append)
.await
.unwrap();
assert!(!result_plan.schema().fields().is_empty());
let result_schema = result_plan.schema();
assert_eq!(result_schema.fields().len(), 1);
assert_eq!(result_schema.field(0).name(), "count");
assert_eq!(
result_schema.field(0).data_type(),
&arrow::datatypes::DataType::UInt64
);
assert_eq!(result_plan.children().len(), 1);
assert!(result_plan.metrics().is_some());
}
#[tokio::test]
async fn test_table_provider_from_loaded_table_supports_insert_into() {
let table = create_in_memory_id_table().await.unwrap();
let log_store = table.log_store();
let provider = table.table_provider().await.unwrap();
let session = Arc::new(crate::delta_datafusion::create_session().into_inner());
let state = session.state_ref().read().clone();
let schema = provider.schema();
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int64Array::from(vec![11, 13]))],
)
.unwrap();
let mem_table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
let input = mem_table.scan(&state, None, &[], None).await.unwrap();
let write_plan = provider
.insert_into(&state, input, InsertOp::Append)
.await
.unwrap();
let _write_batches: Vec<_> = collect_partitioned(write_plan, session.task_ctx())
.await
.unwrap()
.into_iter()
.flatten()
.collect();
let read_provider = next::DeltaScan::builder()
.with_log_store(log_store)
.await
.unwrap();
session
.register_table("delta_table", read_provider)
.unwrap();
let batches = session
.sql("SELECT id FROM delta_table ORDER BY id")
.await
.unwrap()
.collect()
.await
.unwrap();
let expected = vec!["+----+", "| id |", "+----+", "| 11 |", "| 13 |", "+----+"];
datafusion::assert_batches_sorted_eq!(&expected, &batches);
}
#[tokio::test]
async fn test_table_provider_from_loaded_table_reuses_seeded_snapshot_state() {
let base = crate::test_utils::TestTables::Simple
.table_builder()
.unwrap()
.build_storage()
.unwrap();
let (log_store, mut operations) = recording_log_store(base);
let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default());
table.load().await.unwrap();
drain_recorded_ops(&mut operations).await;
let provider = table.table_provider().await.unwrap();
let session = Arc::new(crate::delta_datafusion::create_session().into_inner());
let state = session.state_ref().read().clone();
let read_plan = provider.scan(&state, None, &[], None).await.unwrap();
let _read_batches: Vec<_> = collect_partitioned(read_plan, session.task_ctx())
.await
.unwrap()
.into_iter()
.flatten()
.collect();
let replay_ops = drain_recorded_ops(&mut operations).await;
assert!(
replay_ops.iter().all(|op| !op.is_log_replay_read()),
"expected loaded table provider scan to reuse seeded snapshot state, got {replay_ops:?}",
);
}
#[tokio::test]
async fn test_builder_allows_matching_snapshot_and_log_store_root() {
let table = create_in_memory_id_table().await.unwrap();
let snapshot = table.snapshot().unwrap().snapshot().snapshot().clone();
let log_store = table.log_store();
let provider = next::DeltaScan::builder()
.with_snapshot(snapshot)
.with_log_store(log_store)
.build()
.await;
assert!(
provider.is_ok(),
"unexpected error: {}",
provider.unwrap_err()
);
}
#[tokio::test]
async fn test_builder_allows_same_root_with_different_query_on_log_store() {
let one_url = Url::parse("memory:///same-root?snap-token").unwrap();
let one_store =
crate::logstore::logstore_for(&one_url, crate::logstore::StorageConfig::default())
.unwrap();
let schema = StructType::try_new(vec![StructField::new(
"id".to_string(),
DataType::Primitive(PrimitiveType::Long),
true,
)])
.unwrap();
let table_one = CreateBuilder::new()
.with_log_store(one_store)
.with_columns(schema.fields().cloned())
.await
.unwrap();
let snapshot = table_one.snapshot().unwrap().snapshot().snapshot().clone();
let two_url = Url::parse("memory:///same-root?log-token").unwrap();
let two_store =
crate::logstore::logstore_for(&two_url, crate::logstore::StorageConfig::default())
.unwrap();
let provider = next::DeltaScan::builder()
.with_snapshot(snapshot)
.with_log_store(two_store)
.build()
.await;
assert!(
provider.is_ok(),
"unexpected error: {}",
provider.unwrap_err()
);
}
#[tokio::test]
async fn test_builder_rejects_mismatched_snapshot_and_log_store() {
let one_url = Url::parse("memory:///same-root?snap-token").unwrap();
let one_store =
crate::logstore::logstore_for(&one_url, crate::logstore::StorageConfig::default())
.unwrap();
let schema = StructType::try_new(vec![StructField::new(
"id".to_string(),
DataType::Primitive(PrimitiveType::Long),
true,
)])
.unwrap();
let table_one = CreateBuilder::new()
.with_log_store(one_store)
.with_columns(schema.fields().cloned())
.await
.unwrap();
let snapshot = table_one.snapshot().unwrap().snapshot().snapshot().clone();
let two_url = Url::parse("memory:///different-root?log-token").unwrap();
let two_store =
crate::logstore::logstore_for(&two_url, crate::logstore::StorageConfig::default())
.unwrap();
let err = next::DeltaScan::builder()
.with_snapshot(snapshot)
.with_log_store(two_store)
.build()
.await
.unwrap_err();
let err_str = err.to_string();
assert!(
err_str.contains("snapshot") || err_str.contains("Snapshot"),
"unexpected error: {err_str}"
);
assert!(
err_str.contains("log store") || err_str.contains("log_store"),
"unexpected error: {err_str}"
);
assert!(
!err_str.contains("snap-token"),
"unexpected error: {err_str}"
);
assert!(
!err_str.contains("log-token"),
"unexpected error: {err_str}"
);
}
#[tokio::test]
async fn test_builder_with_file_selection_propagates_to_scan() {
let log_store = crate::test_utils::TestTables::Simple
.table_builder()
.unwrap()
.build_storage()
.unwrap();
let snapshot = Arc::new(
crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
.await
.unwrap(),
);
let table_root = snapshot
.scan_builder()
.build()
.unwrap()
.table_root()
.clone();
let missing_file_id = table_root
.join("__does_not_exist__.parquet")
.unwrap()
.to_string();
let provider = next::DeltaScan::builder()
.with_snapshot(snapshot)
.with_file_selection(next::FileSelection::new([missing_file_id]))
.build()
.await
.unwrap();
let session = Arc::new(crate::delta_datafusion::create_session().into_inner());
let state = session.state_ref().read().clone();
let err = provider.scan(&state, None, &[], None).await.unwrap_err();
let err_str = err.to_string();
assert!(
err_str.contains("File selection contains"),
"unexpected error: {err_str}"
);
}
#[test]
fn test_canonical_table_root_identity_strips_username_query_and_fragment() {
let url =
Url::parse("https://urluser:urlpassword@example.com/path?token=abc#frag").unwrap();
let canonical = canonical_table_root_identity(&url);
assert_eq!(canonical.username(), "");
assert!(canonical.password().is_none());
assert!(canonical.query().is_none());
assert!(canonical.fragment().is_none());
}
#[test]
fn test_partitioned_file_from_action() {
let mut partition_values = std::collections::HashMap::new();
partition_values.insert("month".to_string(), Some("1".to_string()));
partition_values.insert("year".to_string(), Some("2015".to_string()));
let action = Add {
path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
size: 10644,
partition_values,
modification_time: 1660497727833,
data_change: true,
stats: None,
deletion_vector: None,
tags: None,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};
let schema = Schema::new(vec![
Field::new("year", ArrowDataType::Int64, true),
Field::new("month", ArrowDataType::Int64, true),
]);
let part_columns = vec!["year".to_string(), "month".to_string()];
let file = partitioned_file_from_action(&action, &part_columns, &schema);
let ref_file = PartitionedFile {
object_meta: object_store::ObjectMeta {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
size: 10644,
e_tag: None,
version: None,
},
partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
range: None,
statistics: None,
ordering: None,
extensions: None,
metadata_size_hint: None,
};
assert_eq!(file.partition_values, ref_file.partition_values)
}
#[tokio::test]
async fn test_scan_with_projection_has_stable_schema_for_filters() {
let columns = [
StructField::new(
"v1".to_string(),
DataType::Primitive(PrimitiveType::Long),
true,
),
StructField::new(
"v2".to_string(),
DataType::Primitive(PrimitiveType::Long),
true,
),
StructField::new(
"v3".to_string(),
DataType::Primitive(PrimitiveType::Long),
true,
),
];
let table = DeltaTable::new_in_memory()
.create()
.with_columns(columns)
.await
.unwrap();
let session_state = create_test_session_state();
let scan_config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap().snapshot())
.unwrap();
let table_provider = DeltaTableProvider::try_new(
table.snapshot().unwrap().snapshot().clone(),
table.log_store(),
scan_config,
)
.unwrap();
let filter = and(col("v2").eq(lit(2)), col("v3").eq(lit(3)));
let scan = table_provider
.scan(session_state.as_ref(), Some(&vec![0]), &[filter], None)
.await
.unwrap();
assert_eq!(
displayable(scan.as_ref()).indent(false).to_string(),
"\
DeltaScan
DataSourceExec: file_groups={1 group: [[]]}, projection=[v1], file_type=parquet, predicate=v2@1 = CAST(2 AS Int64) AND v3@2 = CAST(3 AS Int64), pruning_predicate=v2_null_count@2 != row_count@3 AND v2_min@0 <= 2 AND 2 <= v2_max@1 AND v3_null_count@6 != row_count@3 AND v3_min@4 <= 3 AND 3 <= v3_max@5, required_guarantees=[]\n"
)
}
}