use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;
use delta_kernel_derive::internal_api;
use itertools::Itertools;
use tracing::{debug, info};
use url::Url;
use crate::metrics::MetricId;
use crate::scan::metrics::ScanMetrics;
use crate::utils::IteratorExt;
use self::data_skipping::as_checkpoint_skipping_predicate;
use self::log_replay::get_scan_metadata_transform_expr;
use crate::actions::deletion_vector::{
deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor,
};
use crate::actions::{get_commit_schema, Add, ADD_NAME, REMOVE_NAME};
use crate::engine_data::FilteredEngineData;
use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Scalar};
use crate::kernel_predicates::{
DefaultKernelPredicateEvaluator, EmptyColumnResolver, KernelPredicateEvaluator as _,
};
use crate::log_replay::{ActionsBatch, HasSelectionVector};
use crate::log_segment::{ActionsWithCheckpointInfo, CheckpointReadInfo, LogSegment};
use crate::log_segment_files::LogSegmentFiles;
use crate::metrics::ScanType;
use crate::parallel::sequential_phase::SequentialPhase;
use crate::scan::log_replay::ScanLogReplayProcessor;
use crate::scan::log_replay::{
BASE_ROW_ID_NAME, CLUSTERING_PROVIDER_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME,
};
use crate::scan::state_info::StateInfo;
use crate::schema::{
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField, StructType,
ToSchema as _,
};
use crate::table_features::{ColumnMappingMode, Operation};
use crate::transforms::{ExpressionTransform, SchemaTransform};
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, SnapshotRef, Version};
use self::log_replay::scan_action_iter;
pub(crate) mod data_skipping;
pub(crate) mod field_classifiers;
pub mod log_replay;
pub(crate) mod metrics;
pub mod state;
pub(crate) mod state_info;
pub(crate) mod transform_spec;
#[cfg(test)]
pub(crate) mod test_utils;
#[cfg(test)]
mod tests;
/// Projection of the commit schema used when replaying commit (JSON) files for a
/// scan: only the `add` and `remove` actions are read.
#[allow(clippy::unwrap_used)]
pub(crate) static COMMIT_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    // Projecting statically-known top-level action fields cannot fail, hence
    // the allowed `unwrap`.
    get_commit_schema()
        .project(&[ADD_NAME, REMOVE_NAME])
        .unwrap()
});
/// Projection of the commit schema used when reading checkpoint files: only the
/// `add` action is read.
#[allow(clippy::unwrap_used)]
pub(crate) static CHECKPOINT_READ_SCHEMA: LazyLock<SchemaRef> =
    LazyLock::new(|| get_commit_schema().project(&[ADD_NAME]).unwrap());
/// Like [`CHECKPOINT_READ_SCHEMA`], but with the `stats` field removed from the
/// `add` action, for scans that skip statistics entirely.
pub(crate) static CHECKPOINT_READ_SCHEMA_NO_STATS: LazyLock<SchemaRef> = LazyLock::new(|| {
    // Rebuild the Add schema, retaining every field except `stats`.
    let full_add = Add::to_schema();
    let retained: Vec<_> = full_add
        .fields()
        .filter(|field| field.name() != "stats")
        .cloned()
        .collect();
    Arc::new(StructType::new_unchecked([StructField::nullable(
        ADD_NAME,
        StructType::new_unchecked(retained),
    )]))
});
#[allow(unused)]
pub use crate::parallel::parallel_scan_metadata::{
AfterSequentialScanMetadata, ParallelScanMetadata, ParallelState, SequentialScanMetadata,
};
/// Controls which per-file statistics a scan surfaces.
#[derive(Debug, Clone)]
pub enum StatsOutputMode {
    /// Surface statistics for all columns.
    AllColumns,
    /// Surface statistics only for the listed columns.
    Columns(Vec<ColumnName>),
    /// Do not read or surface statistics at all (see `Scan::skip_stats`).
    Skip,
}
impl Default for StatsOutputMode {
fn default() -> Self {
StatsOutputMode::Columns(Vec::new())
}
}
/// Builder for a [`Scan`]: configure an optional projected schema, an optional
/// predicate, and the stats output mode, then call [`ScanBuilder::build`].
pub struct ScanBuilder {
    snapshot: SnapshotRef,
    // None means "use the full table schema" (resolved in `build`).
    schema: Option<SchemaRef>,
    predicate: Option<PredicateRef>,
    stats_output_mode: StatsOutputMode,
}
impl std::fmt::Debug for ScanBuilder {
    // Manual impl: the snapshot field is not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ScanBuilder");
        out.field("schema", &self.schema);
        out.field("predicate", &self.predicate);
        out.field("stats_output_mode", &self.stats_output_mode);
        out.finish()
    }
}
impl ScanBuilder {
    /// Start building a scan over `snapshot` with no projection, no predicate,
    /// and the default stats output mode.
    pub fn new(snapshot: impl Into<SnapshotRef>) -> Self {
        Self {
            snapshot: snapshot.into(),
            schema: None,
            predicate: None,
            stats_output_mode: StatsOutputMode::default(),
        }
    }

    /// Set the logical schema the scan should produce.
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Set the logical schema only if one is provided; `None` leaves any
    /// previously configured schema untouched.
    pub fn with_schema_opt(mut self, schema_opt: Option<SchemaRef>) -> Self {
        if let Some(schema) = schema_opt {
            self.schema = Some(schema);
        }
        self
    }

    /// Set (or clear, by passing `None`) the scan predicate.
    pub fn with_predicate(mut self, predicate: impl Into<Option<PredicateRef>>) -> Self {
        self.predicate = predicate.into();
        self
    }

    /// Request statistics for every column.
    pub fn include_all_stats_columns(mut self) -> Self {
        self.stats_output_mode = StatsOutputMode::AllColumns;
        self
    }

    /// Request statistics only for the given columns.
    pub fn with_stats_columns(mut self, columns: Vec<ColumnName>) -> Self {
        self.stats_output_mode = StatsOutputMode::Columns(columns);
        self
    }

    /// When `skip_stats` is true, disable statistics entirely; `false` is a
    /// no-op and leaves the current mode in place.
    pub fn with_skip_stats(mut self, skip_stats: bool) -> Self {
        if skip_stats {
            self.stats_output_mode = StatsOutputMode::Skip;
        }
        self
    }

    /// Validate the configuration and produce the [`Scan`].
    ///
    /// Errors if the table does not support the scan operation or if the scan
    /// state cannot be derived from the schema/predicate combination.
    pub fn build(self) -> DeltaResult<Scan> {
        // No projection supplied -> scan the full table schema.
        let logical_schema = self.schema.unwrap_or_else(|| self.snapshot.schema());
        let table_configuration = self.snapshot.table_configuration();
        table_configuration.ensure_operation_supported(Operation::Scan)?;
        let state_info = StateInfo::try_new(
            logical_schema,
            table_configuration,
            self.predicate,
            self.stats_output_mode.clone(),
            (),
        )?;
        Ok(Scan {
            snapshot: self.snapshot,
            state_info: Arc::new(state_info),
            stats_output_mode: self.stats_output_mode,
        })
    }
}
/// Outcome of resolving a logical scan predicate against the physical schema.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum PhysicalPredicate {
    /// A usable physical predicate plus the schema of the columns it references.
    Some(PredicateRef, SchemaRef),
    /// The predicate is statically false; every file can be skipped.
    StaticSkipAll,
    /// No predicate remains to evaluate after resolution.
    None,
}
impl PhysicalPredicate {
    /// Resolve a logical `predicate` over `logical_schema` into a physical one,
    /// using `column_mapping_mode` to translate logical column names to their
    /// physical counterparts.
    ///
    /// Returns `StaticSkipAll` when the predicate is provably false, `Some` with
    /// the rewritten predicate and referenced-column schema, or `None` when
    /// nothing remains to evaluate. Errors with `missing_column` if the
    /// predicate references a column not present in `logical_schema`.
    pub(crate) fn try_new(
        predicate: &Predicate,
        logical_schema: &Schema,
        column_mapping_mode: ColumnMappingMode,
    ) -> DeltaResult<PhysicalPredicate> {
        if can_statically_skip_all_files(predicate) {
            return Ok(PhysicalPredicate::StaticSkipAll);
        }
        let unresolved_references = predicate.references();
        // Group references by case-folded path so schema resolution is
        // case-insensitive.
        let mut folded_references: HashMap<Vec<String>, Vec<&ColumnName>> = HashMap::new();
        for r in &unresolved_references {
            let folded: Vec<String> = r.iter().map(|s| s.to_lowercase()).collect();
            folded_references.entry(folded).or_default().push(r);
        }
        let mut get_referenced_fields = GetReferencedFields {
            unresolved_references,
            folded_references,
            column_mappings: HashMap::new(),
            logical_path: vec![],
            folded_logical_path: vec![],
            physical_path: vec![],
            column_mapping_mode,
        };
        // Walk the schema: referenced fields are kept (renamed to physical
        // names) and logical->physical mappings are recorded along the way.
        let schema_opt = get_referenced_fields.transform_struct(logical_schema);
        // Anything left unresolved names a column the schema doesn't have.
        let mut unresolved = get_referenced_fields.unresolved_references.into_iter();
        if let Some(unresolved) = unresolved.next() {
            return Err(Error::missing_column(format!(
                "Predicate references unknown column: {unresolved}"
            )));
        }
        let Some(schema) = schema_opt else {
            return Ok(PhysicalPredicate::None);
        };
        // Rewrite the predicate's column references via the recorded mappings.
        let mut apply_mappings = ApplyColumnMappings {
            column_mappings: get_referenced_fields.column_mappings,
        };
        if let Some(predicate) = apply_mappings.transform_pred(predicate) {
            Ok(PhysicalPredicate::Some(
                Arc::new(predicate.into_owned()),
                Arc::new(schema.into_owned()),
            ))
        } else {
            Ok(PhysicalPredicate::None)
        }
    }
}
/// True when `predicate`, evaluated with SQL WHERE semantics over an empty
/// column resolver, is definitely false — meaning no file can ever match.
fn can_statically_skip_all_files(predicate: &Predicate) -> bool {
    matches!(
        DefaultKernelPredicateEvaluator::from(EmptyColumnResolver).eval_sql_where(predicate),
        Some(false)
    )
}
/// Schema-walk state used by [`PhysicalPredicate::try_new`] to resolve the
/// predicate's column references against the logical schema.
struct GetReferencedFields<'a> {
    /// References not yet matched to a schema field.
    unresolved_references: HashSet<&'a ColumnName>,
    /// References grouped by lowercased path, for case-insensitive lookup.
    folded_references: HashMap<Vec<String>, Vec<&'a ColumnName>>,
    /// Accumulated logical -> physical column-name mappings.
    column_mappings: HashMap<ColumnName, ColumnName>,
    /// Logical field-name path from the root to the field being visited.
    logical_path: Vec<String>,
    /// Same path, lowercased.
    folded_logical_path: Vec<String>,
    /// Corresponding physical field-name path.
    physical_path: Vec<String>,
    column_mapping_mode: ColumnMappingMode,
}
impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> {
    /// Keep a primitive leaf only if the predicate references its path; matched
    /// references are marked resolved and their physical mapping recorded.
    fn transform_primitive(&mut self, ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
        // `?` drops this field when the predicate doesn't reference it.
        let pred_cols = self
            .folded_references
            .remove(self.folded_logical_path.as_slice())?;
        let physical = ColumnName::new(&self.physical_path);
        for pred_col in pred_cols {
            self.unresolved_references.remove(pred_col);
            self.column_mappings
                .insert(pred_col.clone(), physical.clone());
        }
        Some(Cow::Borrowed(ptype))
    }
    // Array and map fields are always dropped: references into them are never
    // resolved by this walk.
    fn transform_array(&mut self, _: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
        None
    }
    fn transform_map(&mut self, _: &'a MapType) -> Option<Cow<'a, MapType>> {
        None
    }
    /// Maintain the logical/folded/physical path stacks around recursion, and
    /// rename any surviving field to its physical name.
    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        let physical_name = field.physical_name(self.column_mapping_mode);
        self.logical_path.push(field.name.clone());
        self.folded_logical_path.push(field.name.to_lowercase());
        self.physical_path.push(physical_name.to_string());
        let field = self.recurse_into_struct_field(field);
        // Pop the stacks before propagating the (possibly dropped) field.
        self.logical_path.pop();
        self.folded_logical_path.pop();
        self.physical_path.pop();
        Some(Cow::Owned(field?.with_name(physical_name)))
    }
}
/// Expression transform that prepends a fixed path prefix to every column
/// reference (used to point predicates at `add.stats_parsed.*`).
struct PrefixColumns {
    prefix: ColumnName,
}
impl<'a> ExpressionTransform<'a> for PrefixColumns {
    fn transform_expr_column(&mut self, name: &'a ColumnName) -> Option<Cow<'a, ColumnName>> {
        Some(Cow::Owned(self.prefix.join(name)))
    }
}
/// Expression transform that rewrites logical column references to their
/// physical names using a precomputed mapping.
struct ApplyColumnMappings {
    column_mappings: HashMap<ColumnName, ColumnName>,
}
impl<'a> ExpressionTransform<'a> for ApplyColumnMappings {
    // Returning None for an unmapped column drops that reference from the
    // transformed expression.
    fn transform_expr_column(&mut self, name: &'a ColumnName) -> Option<Cow<'a, ColumnName>> {
        self.column_mappings
            .get(name)
            .map(|physical_name| Cow::Owned(physical_name.clone()))
    }
}
/// Schema of `add` actions reconstructed from previously-produced scan-metadata
/// rows; used by `Scan::scan_metadata_from` when resuming a scan.
static RESTORED_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
    StructType::new_unchecked(vec![StructField::nullable(
        "add",
        StructType::new_unchecked(vec![
            StructField::not_null("path", DataType::STRING),
            StructField::not_null("partitionValues", partition_values),
            StructField::not_null("size", DataType::LONG),
            StructField::nullable("modificationTime", DataType::LONG),
            StructField::nullable("stats", DataType::STRING),
            StructField::nullable(
                "tags",
                MapType::new(DataType::STRING, DataType::STRING, true),
            ),
            StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
            StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
            StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
            StructField::nullable(CLUSTERING_PROVIDER_NAME, DataType::STRING),
        ]),
    )])
    .into()
});
/// Accessor for [`RESTORED_ADD_SCHEMA`].
pub(crate) fn restored_add_schema() -> &'static SchemaRef {
    &RESTORED_ADD_SCHEMA
}
/// Fetch the transform expression computed for row index `row`, if any.
///
/// Returns `None` both when `row` is out of bounds and when the row has no
/// transform.
pub fn get_transform_for_row(
    row: usize,
    transforms: &[Option<ExpressionRef>],
) -> Option<ExpressionRef> {
    match transforms.get(row) {
        Some(Some(transform)) => Some(transform.clone()),
        _ => None,
    }
}
/// One batch of scan metadata: the selected file actions together with the
/// per-row transform expressions that accompany them.
pub struct ScanMetadata {
    /// File action data plus its row selection vector.
    pub scan_files: FilteredEngineData,
    /// One optional transform expression per row of `scan_files`.
    pub scan_file_transforms: Vec<Option<ExpressionRef>>,
}
impl ScanMetadata {
    /// Bundle raw engine `data` with its `selection_vector` and per-row
    /// `scan_file_transforms`. Propagates any error from
    /// `FilteredEngineData::try_new`.
    fn try_new(
        data: Box<dyn EngineData>,
        selection_vector: Vec<bool>,
        scan_file_transforms: Vec<Option<ExpressionRef>>,
    ) -> DeltaResult<Self> {
        Ok(Self {
            scan_files: FilteredEngineData::try_new(data, selection_vector)?,
            scan_file_transforms,
        })
    }
}
impl HasSelectionVector for ScanMetadata {
    /// True when at least one row of the batch is selected.
    fn has_selected_rows(&self) -> bool {
        self.scan_files
            .selection_vector()
            .iter()
            .any(|&selected| selected)
    }
}
/// A fully configured scan over a table snapshot; construct via [`ScanBuilder`].
pub struct Scan {
    snapshot: SnapshotRef,
    // Resolved schemas/predicate derived from the builder inputs.
    state_info: Arc<StateInfo>,
    stats_output_mode: StatsOutputMode,
}
impl std::fmt::Debug for Scan {
    // Manual impl: the snapshot field is not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Scan");
        out.field("schema", &self.state_info.logical_schema);
        out.field("predicate", &self.state_info.physical_predicate);
        out.field("stats_output_mode", &self.stats_output_mode);
        out.finish()
    }
}
impl Scan {
/// True when the caller requested that file statistics be skipped entirely.
fn skip_stats(&self) -> bool {
    matches!(self.stats_output_mode, StatsOutputMode::Skip)
}
/// Root URL of the table being scanned.
pub fn table_root(&self) -> &Url {
    self.snapshot.table_root()
}
/// The snapshot this scan reads from.
pub fn snapshot(&self) -> &SnapshotRef {
    &self.snapshot
}
/// Schema of the rows this scan produces.
pub fn logical_schema(&self) -> &SchemaRef {
    &self.state_info.logical_schema
}
/// Schema actually read from the underlying data files.
pub fn physical_schema(&self) -> &SchemaRef {
    &self.state_info.physical_schema
}
/// The resolved physical predicate, if one exists for this scan.
pub fn physical_predicate(&self) -> Option<PredicateRef> {
    match &self.state_info.physical_predicate {
        PhysicalPredicate::Some(predicate, _) => Some(predicate.clone()),
        _ => None,
    }
}
/// Replay the log and return an iterator of [`ScanMetadata`] batches
/// describing the files this scan must read.
pub fn scan_metadata(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadata>>> {
    let actions_with_checkpoint_info = self.replay_for_scan_metadata(engine)?;
    self.scan_metadata_inner(engine, actions_with_checkpoint_info)
}
/// Like [`Scan::scan_metadata`], but resumes from scan metadata previously
/// produced at `existing_version`: the existing rows are converted back into
/// add-action batches and only commits newer than `existing_version` are
/// replayed on top of them.
///
/// Errors if `existing_version` is newer than the snapshot version. Falls back
/// to a full replay when a checkpoint newer than `existing_version` exists.
#[allow(unused)]
#[internal_api]
pub(crate) fn scan_metadata_from(
    &self,
    engine: &dyn Engine,
    existing_version: Version,
    existing_data: impl IntoIterator<Item = Box<dyn EngineData>> + 'static,
    _existing_predicate: Option<PredicateRef>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>>>> {
    if existing_version > self.snapshot.version() {
        return Err(Error::Generic(format!(
            "existing_version {} is greater than current version {}",
            existing_version,
            self.snapshot.version()
        )));
    }
    // Expression that turns scan-metadata rows back into add-action rows
    // shaped like `RESTORED_ADD_SCHEMA`.
    let transform = engine.evaluation_handler().new_expression_evaluator(
        scan_row_schema(),
        get_scan_metadata_transform_expr(),
        restored_add_schema().clone().into(),
    )?;
    // NOTE(review): the `false` flag presumably marks these batches as
    // non-commit data — confirm against `ActionsBatch::new`.
    let apply_transform = move |data: Box<dyn EngineData>| {
        Ok(ActionsBatch::new(transform.evaluate(data.as_ref())?, false))
    };
    let log_segment = self.snapshot.log_segment();
    // Already at the target version: all metadata comes from `existing_data`.
    if existing_version == self.snapshot.version() {
        let actions_with_checkpoint_info = ActionsWithCheckpointInfo {
            actions: existing_data.into_iter().map(apply_transform),
            checkpoint_info: CheckpointReadInfo {
                has_stats_parsed: false,
                has_partition_values_parsed: false,
                checkpoint_read_schema: restored_add_schema().clone(),
            },
        };
        return Ok(Box::new(
            self.scan_metadata_inner(engine, actions_with_checkpoint_info)?,
        ));
    }
    // A checkpoint newer than `existing_version` supersedes the existing data;
    // do a full replay instead.
    if matches!(log_segment.checkpoint_version, Some(v) if v > existing_version) {
        return Ok(Box::new(self.scan_metadata(engine)?));
    }
    // Build a log segment containing only the commits after `existing_version`.
    let mut ascending_commit_files = log_segment.listed.ascending_commit_files.clone();
    ascending_commit_files.retain(|f| f.version > existing_version);
    let log_segment_files = LogSegmentFiles {
        ascending_commit_files,
        latest_commit_file: log_segment.listed.latest_commit_file.clone(),
        ..Default::default()
    };
    let new_log_segment = LogSegment::try_new(
        log_segment_files,
        log_segment.log_root.clone(),
        Some(log_segment.end_version),
        None, )?;
    // When stats are skipped, read the stats-free checkpoint schema and drop
    // the stats-based metadata predicate.
    let (checkpoint_schema, meta_predicate) = if self.skip_stats() {
        (CHECKPOINT_READ_SCHEMA_NO_STATS.clone(), None)
    } else {
        (
            CHECKPOINT_READ_SCHEMA.clone(),
            self.build_actions_meta_predicate(),
        )
    };
    let result = new_log_segment.read_actions_with_projected_checkpoint_actions(
        engine,
        COMMIT_READ_SCHEMA.clone(),
        checkpoint_schema,
        meta_predicate,
        self.state_info
            .physical_stats_schema
            .as_ref()
            .map(|s| s.as_ref()),
        None,
    )?;
    // New commit actions first, with the restored existing data chained after.
    let actions_with_checkpoint_info = ActionsWithCheckpointInfo {
        actions: result
            .actions
            .chain(existing_data.into_iter().map(apply_transform)),
        checkpoint_info: result.checkpoint_info,
    };
    Ok(Box::new(self.scan_metadata_inner(
        engine,
        actions_with_checkpoint_info,
    )?))
}
/// Shared tail of scan-metadata production: run the action batches through log
/// replay (unless the predicate statically skips everything) and emit a scan
/// metrics event once the returned iterator is fully consumed.
fn scan_metadata_inner(
    &self,
    engine: &dyn Engine,
    actions_with_checkpoint_info: ActionsWithCheckpointInfo<
        impl Iterator<Item = DeltaResult<ActionsBatch>>,
    >,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadata>>> {
    let start = Instant::now();
    let reporter = engine.get_metrics_reporter();
    let operation_id = MetricId::new();
    let (iter, metrics) = match self.state_info.physical_predicate {
        PhysicalPredicate::StaticSkipAll => {
            info!("Predicate statically evaluated to false; skipping all files");
            // No replay at all: yield nothing, but still report (zero) metrics.
            (None, Arc::new(ScanMetrics::default()))
        }
        _ => {
            let (it, m) = scan_action_iter(
                engine,
                actions_with_checkpoint_info.actions,
                self.state_info.clone(),
                actions_with_checkpoint_info.checkpoint_info,
                self.skip_stats(),
            )?;
            (Some(it), m)
        }
    };
    // Runs when iteration completes: log the metrics event and forward it to
    // the engine's reporter, if one is configured.
    let on_complete = move || {
        let event = metrics.to_event(operation_id, ScanType::Full, start.elapsed());
        info!(%event);
        if let Some(r) = reporter {
            r.report(event);
        }
    };
    Ok(iter.into_iter().flatten().on_complete(on_complete))
}
/// Read commit and checkpoint actions from the snapshot's log segment for this
/// scan, projecting out stats (and dropping the stats-based metadata
/// predicate) when the scan skips statistics.
fn replay_for_scan_metadata(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<
    ActionsWithCheckpointInfo<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
> {
    let (checkpoint_schema, meta_predicate) = if self.skip_stats() {
        (CHECKPOINT_READ_SCHEMA_NO_STATS.clone(), None)
    } else {
        (
            CHECKPOINT_READ_SCHEMA.clone(),
            self.build_actions_meta_predicate(),
        )
    };
    self.snapshot
        .log_segment()
        .read_actions_with_projected_checkpoint_actions(
            engine,
            COMMIT_READ_SCHEMA.clone(),
            checkpoint_schema,
            meta_predicate,
            self.state_info
                .physical_stats_schema
                .as_ref()
                .map(|s| s.as_ref()),
            self.state_info
                .physical_partition_schema
                .as_ref()
                .map(|s| s.as_ref()),
        )
}
/// Build a predicate over parsed checkpoint stats (rooted at
/// `add.stats_parsed`) usable for pruning checkpoint action batches.
///
/// Returns `None` when there is no physical predicate, no physical stats
/// schema, or the predicate cannot be expressed as a checkpoint-skipping
/// predicate.
fn build_actions_meta_predicate(&self) -> Option<PredicateRef> {
    let PhysicalPredicate::Some(ref predicate, _) = self.state_info.physical_predicate else {
        return None;
    };
    // No stats schema means there are no parsed stats to predicate on.
    self.state_info.physical_stats_schema.as_ref()?;
    let partition_columns = self
        .snapshot
        .table_configuration()
        .metadata()
        .partition_columns();
    let skipping_pred = as_checkpoint_skipping_predicate(predicate, partition_columns)?;
    // Re-root every column reference under `add.stats_parsed`.
    let mut prefixer = PrefixColumns {
        prefix: ColumnName::new(["add", "stats_parsed"]),
    };
    let prefixed = prefixer.transform_pred(&skipping_pred)?;
    Some(Arc::new(prefixed.into_owned()))
}
/// Start a parallel scan-metadata pipeline: build the log-replay processor and
/// wrap it in the sequential phase that precedes parallel processing.
pub fn parallel_scan_metadata(
    &self,
    engine: Arc<dyn Engine>,
) -> DeltaResult<SequentialScanMetadata> {
    let checkpoint_read_schema = if self.skip_stats() {
        CHECKPOINT_READ_SCHEMA_NO_STATS.clone()
    } else {
        CHECKPOINT_READ_SCHEMA.clone()
    };
    // NOTE(review): unlike the single-threaded path (which takes
    // checkpoint_info from the log-segment read), the parsed-stats /
    // parsed-partition-values flags are hard-coded false here — confirm
    // this is intended for the parallel pipeline.
    let checkpoint_info = CheckpointReadInfo {
        has_stats_parsed: false,
        has_partition_values_parsed: false,
        checkpoint_read_schema,
    };
    let processor = ScanLogReplayProcessor::new(
        engine.as_ref(),
        self.state_info.clone(),
        checkpoint_info,
        self.skip_stats(),
    )?;
    let sequential =
        SequentialPhase::try_new(processor, self.snapshot.log_segment(), engine.clone())?;
    Ok(SequentialScanMetadata::new(sequential))
}
/// Fully execute the scan: read each selected file via the engine's parquet
/// handler, apply deletion-vector row filtering, and transform every batch
/// from the physical schema to the logical schema.
pub fn execute(
    &self,
    engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>> {
    // Visitor callback: collect every selected scan file into a Vec.
    fn scan_metadata_callback(batches: &mut Vec<state::ScanFile>, file: state::ScanFile) {
        batches.push(file);
    }
    debug!(
        "Executing scan with logical schema {:#?} and physical schema {:#?}",
        self.state_info.logical_schema, self.state_info.physical_schema
    );
    let table_root = self.snapshot.table_root().clone();
    let scan_metadata_iter = self.scan_metadata(engine.as_ref())?;
    // Flatten scan-metadata batches into individual scan files.
    let scan_files_iter = scan_metadata_iter
        .map(|res| {
            let scan_metadata = res?;
            let scan_files = vec![];
            scan_metadata.visit_scan_files(scan_files, scan_metadata_callback)
        })
        .flatten_ok();
    let physical_schema = self.physical_schema().clone();
    let logical_schema = self.logical_schema().clone();
    let result = scan_files_iter
        .map(move |scan_file| -> DeltaResult<_> {
            let scan_file = scan_file?;
            let file_path = table_root.join(&scan_file.path)?;
            // File-wide row selection derived from the deletion vector, if any.
            let mut selection_vector = scan_file
                .dv_info
                .get_selection_vector(engine.as_ref(), &table_root)?;
            let meta = FileMeta {
                // NOTE(review): hard-coded 0 — presumably last_modified is
                // unused by read_parquet_files; confirm.
                last_modified: 0,
                size: scan_file.size.try_into().map_err(|_| {
                    Error::generic("Unable to convert scan file size into FileSize")
                })?,
                location: file_path,
            };
            let read_result_iter = engine.parquet_handler().read_parquet_files(
                &[meta],
                physical_schema.clone(),
                None,
            )?;
            let mut read_result_iter = read_result_iter.peekable();
            // Guard against handlers that return no batches for a file whose
            // stats say it contains rows.
            let expect_data = scan_file.stats.as_ref().is_some_and(|s| s.num_records > 0);
            if expect_data && read_result_iter.peek().is_none() {
                return Err(Error::internal_error(format!(
                    "ParquetHandler returned no data for file '{}'. This is likely a connector \
                    bug -- the handler's read_parquet_files must return at least one batch for \
                    each requested file that contains rows.",
                    scan_file.path
                )));
            }
            // Clones moved into the per-batch closure below.
            let engine = engine.clone(); let physical_schema_inner = physical_schema.clone();
            let logical_schema_inner = logical_schema.clone();
            Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
                let read_result = read_result?;
                let logical = state::transform_to_logical(
                    engine.as_ref(),
                    read_result,
                    &physical_schema_inner,
                    &logical_schema_inner,
                    scan_file.transform.clone(), );
                let len = logical.as_ref().map_or(0, |res| res.len());
                // The selection vector covers the whole file: split off the
                // prefix for this batch and keep the remainder for later
                // batches.
                let mut sv = selection_vector.take();
                let rest = split_vector(sv.as_mut(), len, None);
                let result = match sv {
                    Some(sv) => logical.and_then(|data| data.apply_selection_vector(sv)),
                    None => logical,
                };
                selection_vector = rest;
                result
            }))
        })
        .flatten_ok()
        .map(|x| x?);
    Ok(result)
}
}
/// Schema of the scan-row representation produced by log replay (see
/// `log_replay::SCAN_ROW_SCHEMA`).
pub fn scan_row_schema() -> SchemaRef {
    log_replay::SCAN_ROW_SCHEMA.clone()
}
/// Read the deletion vector named by `descriptor` from storage under
/// `table_root` and materialize it as a boolean vector (per-row flags; see
/// `deletion_treemap_to_bools` for the polarity).
pub fn selection_vector(
    engine: &dyn Engine,
    descriptor: &DeletionVectorDescriptor,
    table_root: &Url,
) -> DeltaResult<Vec<bool>> {
    let treemap = descriptor.read(engine.storage_handler(), table_root)?;
    Ok(deletion_treemap_to_bools(treemap))
}