use std::clone::Clone;
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};
use delta_kernel_derive::internal_api;
use serde::{Deserialize, Serialize};
use super::data_skipping::DataSkippingFilter;
use super::metrics::ScanMetrics;
use super::state_info::StateInfo;
use super::{PhysicalPredicate, ScanMetadata};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::expressions::{
column_expr, column_expr_ref, column_name, ColumnName, Expression, ExpressionRef, PredicateRef,
};
use crate::log_replay::deduplicator::{CheckpointDeduplicator, Deduplicator};
use crate::log_replay::{
ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor,
ParallelLogReplayProcessor,
};
use crate::log_segment::CheckpointReadInfo;
use crate::scan::transform_spec::{get_transform_expr, parse_partition_values, TransformSpec};
use crate::scan::Scalar;
use crate::schema::ToSchema as _;
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
use crate::table_features::ColumnMappingMode;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, ExpressionEvaluator};
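/// Schema and transform state that is serialized as an opaque JSON blob inside
/// `SerializableScanState`. `deny_unknown_fields` makes deserialization fail if the blob was
/// produced by a writer that added fields this version does not understand.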
#[derive(Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
struct InternalScanState {
logical_schema: Arc<StructType>,
physical_schema: Arc<StructType>,
predicate_schema: Option<Arc<StructType>>,
transform_spec: Option<Arc<TransformSpec>>,
column_mapping_mode: ColumnMappingMode,
physical_stats_schema: Option<SchemaRef>,
#[serde(default)]
skip_stats: bool,
physical_partition_schema: Option<SchemaRef>,
}
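/// Externally visible snapshot of a scan log-replay processor: the optional physical
/// predicate, the internal schema state as a JSON blob, the set of file action keys already
/// seen during replay, and the checkpoint read configuration. Produced by
/// `into_serializable_state` and consumed by `from_serializable_state`.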
#[derive(Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct SerializableScanState {
pub predicate: Option<PredicateRef>,
pub internal_state_blob: Vec<u8>,
pub seen_file_keys: HashSet<FileActionKey>,
pub(crate) checkpoint_info: CheckpointReadInfo,
}
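/// Log replay processor for scans: deduplicates add/remove file actions, applies stats-based
/// data skipping, and transforms raw commit/checkpoint batches into [`ScanMetadata`] rows
/// shaped by [`SCAN_ROW_SCHEMA`] (plus optional parsed stats/partition columns).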
#[allow(rustdoc::broken_intra_doc_links, rustdoc::private_intra_doc_links)]
pub struct ScanLogReplayProcessor {
data_skipping_filter: Option<DataSkippingFilter>,
log_transform: Arc<dyn ExpressionEvaluator>,
checkpoint_transform: Arc<dyn ExpressionEvaluator>,
state_info: Arc<StateInfo>,
seen_file_keys: HashSet<FileActionKey>,
skip_stats: bool,
checkpoint_info: CheckpointReadInfo,
metrics: Arc<ScanMetrics>,
}
impl ScanLogReplayProcessor {
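    // Indices into the getters produced by `AddRemoveDedupVisitor::selected_column_names_and_types`
    // (add columns come first; the remove columns are only read for commit batches).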
    const ADD_PATH_INDEX: usize = 0;
    const ADD_PARTITION_VALUES_INDEX: usize = 1;
    const ADD_DV_START_INDEX: usize = 2;
    const BASE_ROW_ID_INDEX: usize = 5;
    const REMOVE_PATH_INDEX: usize = 6;
    const REMOVE_DV_START_INDEX: usize = 7;
pub(crate) fn new(
engine: &dyn Engine,
state_info: Arc<StateInfo>,
checkpoint_info: CheckpointReadInfo,
skip_stats: bool,
) -> DeltaResult<Self> {
let dedup_capacity = state_info.dedup_capacity_hint();
Self::new_with_seen_files(
engine,
state_info,
checkpoint_info,
HashSet::with_capacity(dedup_capacity),
skip_stats,
)
}
pub(crate) fn new_with_seen_files(
engine: &dyn Engine,
state_info: Arc<StateInfo>,
checkpoint_info: CheckpointReadInfo,
seen_file_keys: HashSet<FileActionKey>,
skip_stats: bool,
) -> DeltaResult<Self> {
let CheckpointReadInfo {
has_stats_parsed,
has_partition_values_parsed,
checkpoint_read_schema,
} = checkpoint_info.clone();
let metrics = Arc::new(ScanMetrics::default());
let physical_predicate = match &state_info.physical_predicate {
PhysicalPredicate::Some(predicate, schema) => Some((predicate.clone(), schema.clone())),
_ => None,
};
let (stats_schema_for_transform, partition_schema_for_transform) = if skip_stats {
(None, None)
} else {
(
state_info.physical_stats_schema.clone(),
state_info.physical_partition_schema.clone(),
)
};
let output_schema = scan_row_schema_with_parsed_columns(
stats_schema_for_transform.clone(),
partition_schema_for_transform.clone(),
);
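        // Stats-based data skipping is only possible when stats are read at all; with
        // `skip_stats` set there is nothing to filter on.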
let data_skipping_filter = if skip_stats {
None
} else {
DataSkippingFilter::new(
engine,
physical_predicate.as_ref().map(|(p, _)| p.clone()),
stats_schema_for_transform.as_ref(),
column_expr_ref!("stats_parsed"),
partition_schema_for_transform.as_ref(),
column_expr_ref!("partitionValues_parsed"),
output_schema.clone(),
Some(metrics.clone()),
)
};
Ok(Self {
data_skipping_filter,
log_transform: engine.evaluation_handler().new_expression_evaluator(
checkpoint_read_schema.clone(),
get_add_transform_expr(
stats_schema_for_transform.clone(),
false,
skip_stats,
partition_schema_for_transform.clone(),
false,
),
output_schema.clone().into(),
)?,
checkpoint_transform: engine.evaluation_handler().new_expression_evaluator(
checkpoint_read_schema,
get_add_transform_expr(
stats_schema_for_transform,
has_stats_parsed,
skip_stats,
partition_schema_for_transform,
has_partition_values_parsed,
),
output_schema.into(),
)?,
seen_file_keys,
state_info,
skip_stats,
checkpoint_info,
metrics,
})
}
pub(crate) fn checkpoint_info(&self) -> &CheckpointReadInfo {
&self.checkpoint_info
}
pub(crate) fn get_metrics(&self) -> &ScanMetrics {
self.metrics.as_ref()
}
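    /// Converts this processor into a `SerializableScanState` so replay can be suspended and
    /// later resumed via `from_serializable_state`. Schema-related state is serialized into an
    /// opaque JSON blob; the seen-file-key set and checkpoint read info are carried alongside.
    ///
    /// Round-trip sketch (mirrors the serialization tests below; assumes an `engine`,
    /// `state_info`, and `checkpoint_info` are already constructed):
    ///
    /// ```ignore
    /// let processor = ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info, false)?;
    /// // ... replay some batches, accumulating seen file keys ...
    /// let state = processor.into_serializable_state()?;
    /// let resumed = ScanLogReplayProcessor::from_serializable_state(&engine, state)?;
    /// ```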
#[internal_api]
#[allow(unused)]
pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
let StateInfo {
logical_schema,
physical_schema,
physical_predicate,
transform_spec,
column_mapping_mode,
physical_stats_schema,
physical_partition_schema,
} = self.state_info.as_ref().clone();
let (predicate, predicate_schema) = match physical_predicate {
PhysicalPredicate::Some(pred, schema) => (Some(pred), Some(schema)),
_ => (None, None),
};
let internal_state = InternalScanState {
logical_schema,
physical_schema,
transform_spec,
predicate_schema,
column_mapping_mode,
physical_stats_schema,
skip_stats: self.skip_stats,
physical_partition_schema,
};
let internal_state_blob = serde_json::to_vec(&internal_state)
.map_err(|e| Error::generic(format!("Failed to serialize internal state: {e}")))?;
Ok(SerializableScanState {
predicate,
internal_state_blob,
seen_file_keys: self.seen_file_keys,
checkpoint_info: self.checkpoint_info,
})
}
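    /// Reconstructs a processor from a previously produced `SerializableScanState`. Fails if
    /// the internal state blob is not valid JSON, or if a predicate is present without its
    /// physical predicate schema.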
#[internal_api]
#[allow(unused)]
pub(crate) fn from_serializable_state(
engine: &dyn Engine,
state: SerializableScanState,
) -> DeltaResult<Self> {
let internal_state: InternalScanState =
serde_json::from_slice(&state.internal_state_blob).map_err(Error::MalformedJson)?;
let physical_predicate = match state.predicate {
Some(predicate) => {
let Some(predicate_schema) = internal_state.predicate_schema else {
return Err(Error::generic(
"Invalid serialized internal state. Expected predicate schema.",
));
};
PhysicalPredicate::Some(predicate, predicate_schema)
}
None => PhysicalPredicate::None,
};
let state_info = Arc::new(StateInfo {
logical_schema: internal_state.logical_schema,
physical_schema: internal_state.physical_schema,
physical_predicate,
transform_spec: internal_state.transform_spec,
column_mapping_mode: internal_state.column_mapping_mode,
physical_stats_schema: internal_state.physical_stats_schema,
physical_partition_schema: internal_state.physical_partition_schema,
});
Self::new_with_seen_files(
engine,
state_info,
state.checkpoint_info,
state.seen_file_keys,
internal_state.skip_stats,
)
}
}
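/// Row visitor that walks add/remove file actions, drops actions the deduplicator has already
/// seen, records per-row transform expressions for surviving adds, and updates the selection
/// vector and scan metrics accordingly.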
struct AddRemoveDedupVisitor<'a, D: Deduplicator> {
deduplicator: D,
selection_vector: Vec<bool>,
state_info: Arc<StateInfo>,
row_transform_exprs: Vec<Option<ExpressionRef>>,
metrics: &'a ScanMetrics,
}
impl<'a, D: Deduplicator> AddRemoveDedupVisitor<'a, D> {
fn new(
deduplicator: D,
selection_vector: Vec<bool>,
state_info: Arc<StateInfo>,
metrics: &'a ScanMetrics,
) -> AddRemoveDedupVisitor<'a, D> {
AddRemoveDedupVisitor {
deduplicator,
selection_vector,
state_info,
row_transform_exprs: Vec::new(),
metrics,
}
}
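    /// Returns `true` iff row `i` is an add action for a file that has not been seen before
    /// (i.e. it still describes a live file). For surviving adds with a transform spec, the
    /// partition values are parsed and the per-row transform expression is recorded.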
fn is_valid_add<'b>(&mut self, i: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<bool> {
let Some((file_key, is_add)) = self.deduplicator.extract_file_action(
i,
getters,
            !self.deduplicator.is_log_batch(),
        )?
else {
self.metrics.incr_non_file_actions();
return Ok(false);
};
if is_add {
self.metrics.incr_add_files_seen()
} else {
self.metrics.incr_remove_files_seen()
};
let partition_values = match &self.state_info.transform_spec {
Some(transform) if is_add => {
let partition_values = getters[ScanLogReplayProcessor::ADD_PARTITION_VALUES_INDEX]
.get(i, "add.partitionValues")?;
parse_partition_values(
&self.state_info.logical_schema,
transform,
&partition_values,
self.state_info.column_mapping_mode,
)?
}
_ => Default::default(),
};
if self.deduplicator.check_and_record_seen(file_key) || !is_add {
return Ok(false);
}
let base_row_id: Option<i64> =
getters[ScanLogReplayProcessor::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
let transform = self
.state_info
.transform_spec
.as_ref()
.map(|transform| {
get_transform_expr(
transform,
partition_values,
&self.state_info.physical_schema,
base_row_id,
)
})
.transpose()?;
if transform.is_some() {
self.row_transform_exprs.resize_with(i, Default::default);
self.row_transform_exprs.push(transform);
}
self.metrics.incr_active_add_files();
Ok(true)
}
}
impl<D: Deduplicator> RowVisitor for AddRemoveDedupVisitor<'_, D> {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
const STRING: DataType = DataType::STRING;
const INTEGER: DataType = DataType::INTEGER;
const LONG: DataType = DataType::LONG;
let ss_map: DataType = MapType::new(STRING, STRING, true).into();
let types_and_names = vec![
(STRING, column_name!("add.path")),
(ss_map, column_name!("add.partitionValues")),
(STRING, column_name!("add.deletionVector.storageType")),
(STRING, column_name!("add.deletionVector.pathOrInlineDv")),
(INTEGER, column_name!("add.deletionVector.offset")),
(LONG, column_name!("add.baseRowId")),
(STRING, column_name!("remove.path")),
(STRING, column_name!("remove.deletionVector.storageType")),
(STRING, column_name!("remove.deletionVector.pathOrInlineDv")),
(INTEGER, column_name!("remove.deletionVector.offset")),
];
let (types, names) = types_and_names.into_iter().unzip();
(names, types).into()
});
let (names, types) = NAMES_AND_TYPES.as_ref();
if self.deduplicator.is_log_batch() {
(names, types)
} else {
(&names[..6], &types[..6])
}
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
let start = std::time::Instant::now();
let is_log_batch = self.deduplicator.is_log_batch();
let expected_getters = if is_log_batch { 10 } else { 6 };
require!(
getters.len() == expected_getters,
Error::InternalError(format!(
"Wrong number of AddRemoveDedupVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
if self.selection_vector[i] {
self.selection_vector[i] = self.is_valid_add(i, getters)?;
}
}
self.metrics
.add_dedup_visitor_time_ns(start.elapsed().as_nanos() as u64);
Ok(())
}
}
pub(crate) static FILE_CONSTANT_VALUES_NAME: &str = "fileConstantValues";
pub(crate) static BASE_ROW_ID_NAME: &str = "baseRowId";
pub(crate) static DEFAULT_ROW_COMMIT_VERSION_NAME: &str = "defaultRowCommitVersion";
pub(crate) static CLUSTERING_PROVIDER_NAME: &str = "clusteringProvider";
pub(crate) static TAGS_NAME: &str = "tags";
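/// Schema of the rows produced by the scan log-replay transform: one row per add file with the
/// file-level columns (`path`, `size`, `modificationTime`, `stats`, `deletionVector`) plus a
/// `fileConstantValues` struct carrying partition values, row tracking fields, tags, and the
/// clustering provider.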
pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
let file_constant_values = StructType::new_unchecked([
StructField::nullable("partitionValues", partition_values),
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
        StructField::nullable(
            TAGS_NAME,
            MapType::new(DataType::STRING, DataType::STRING, true),
        ),
StructField::nullable(CLUSTERING_PROVIDER_NAME, DataType::STRING),
]);
Arc::new(StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("size", DataType::LONG),
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
StructField::nullable(FILE_CONSTANT_VALUES_NAME, file_constant_values),
]))
});
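/// Returns `SCAN_ROW_SCHEMA`, optionally extended with trailing `stats_parsed` and/or
/// `partitionValues_parsed` struct columns when parsed stats or partition values are requested.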
fn scan_row_schema_with_parsed_columns(
stats_schema: Option<SchemaRef>,
partition_schema: Option<SchemaRef>,
) -> SchemaRef {
let needs_extra = stats_schema.is_some() || partition_schema.is_some();
if !needs_extra {
return SCAN_ROW_SCHEMA.clone();
}
let mut fields: Vec<StructField> = SCAN_ROW_SCHEMA.fields().cloned().collect();
if let Some(schema) = stats_schema {
fields.push(StructField::nullable(
"stats_parsed",
schema.as_ref().clone(),
));
}
if let Some(schema) = partition_schema {
fields.push(StructField::nullable(
"partitionValues_parsed",
schema.as_ref().clone(),
));
}
Arc::new(StructType::new_unchecked(fields))
}
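/// Builds the expression that projects raw `add` actions into the scan row schema. Stats are
/// nulled out when `skip_stats` is set; the `stats_parsed`/`partitionValues_parsed` columns are
/// either passed through (when the checkpoint already provides them) or derived by parsing
/// `add.stats` / `add.partitionValues`.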
fn get_add_transform_expr(
physical_stats_schema: Option<SchemaRef>,
has_stats_parsed: bool,
skip_stats: bool,
partition_schema: Option<SchemaRef>,
has_partition_values_parsed: bool,
) -> ExpressionRef {
let stats_expr = if skip_stats {
Arc::new(Expression::Literal(Scalar::Null(DataType::STRING)))
} else {
column_expr_ref!("add.stats")
};
let mut fields = vec![
column_expr_ref!("add.path"),
column_expr_ref!("add.size"),
column_expr_ref!("add.modificationTime"),
stats_expr,
column_expr_ref!("add.deletionVector"),
Arc::new(Expression::struct_from([
column_expr_ref!("add.partitionValues"),
column_expr_ref!("add.baseRowId"),
column_expr_ref!("add.defaultRowCommitVersion"),
column_expr_ref!("add.tags"),
column_expr_ref!("add.clusteringProvider"),
])),
];
if let Some(stats_schema) = physical_stats_schema {
let stats_parsed_expr = if has_stats_parsed {
column_expr!("add.stats_parsed")
} else {
Expression::parse_json(column_expr!("add.stats"), stats_schema)
};
fields.push(Arc::new(stats_parsed_expr));
}
if partition_schema.is_some() {
let pv_parsed_expr = if has_partition_values_parsed {
column_expr!("add.partitionValues_parsed")
} else {
Expression::map_to_struct(column_expr!("add.partitionValues"))
};
fields.push(Arc::new(pv_parsed_expr));
}
Arc::new(Expression::struct_from(fields))
}
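/// Static expression that regroups scan-row columns (`path`, partition values, `size`,
/// `modificationTime`, `stats`, tags, deletion vector, row tracking fields, clustering
/// provider) into a single nested struct.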
#[allow(unused)]
pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
static EXPR: LazyLock<ExpressionRef> = LazyLock::new(|| {
Arc::new(Expression::struct_from([Arc::new(
Expression::struct_from([
column_expr_ref!("path"),
column_expr_ref!("fileConstantValues.partitionValues"),
column_expr_ref!("size"),
column_expr_ref!("modificationTime"),
column_expr_ref!("stats"),
column_expr_ref!("fileConstantValues.tags"),
column_expr_ref!("deletionVector"),
column_expr_ref!("fileConstantValues.baseRowId"),
column_expr_ref!("fileConstantValues.defaultRowCommitVersion"),
column_expr_ref!("fileConstantValues.clusteringProvider"),
]),
)]))
});
EXPR.clone()
}
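// The parallel path handles checkpoint batches only (commit batches are rejected below) and
// reads `seen_file_keys` through a `CheckpointDeduplicator` without mutating it.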
impl ParallelLogReplayProcessor for ScanLogReplayProcessor {
type Output = <ScanLogReplayProcessor as LogReplayProcessor>::Output;
fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
let ActionsBatch {
actions,
is_log_batch,
} = actions_batch;
require!(
!is_log_batch,
Error::generic("Parallel checkpoint processor may only be applied to checkpoint files")
);
let transformed = self.checkpoint_transform.evaluate(actions.as_ref())?;
debug_assert_eq!(transformed.len(), actions.len());
require!(
transformed.len() == actions.len(),
Error::internal_error(format!(
"checkpoint transform output length {} != actions length {}",
transformed.len(),
actions.len()
))
);
let selection_vector = self.build_selection_vector(transformed.as_ref())?;
debug_assert_eq!(selection_vector.len(), actions.len());
require!(
selection_vector.len() == actions.len(),
Error::internal_error(format!(
"selection vector length {} != actions length {}",
selection_vector.len(),
actions.len()
))
);
let deduplicator = CheckpointDeduplicator::try_new(
&self.seen_file_keys,
Self::ADD_PATH_INDEX,
Self::ADD_DV_START_INDEX,
)?;
let mut visitor = AddRemoveDedupVisitor::new(
deduplicator,
selection_vector,
self.state_info.clone(),
&self.metrics,
);
visitor.visit_rows_of(actions.as_ref())?;
let scan_metadata = ScanMetadata::try_new(
transformed,
visitor.selection_vector,
visitor.row_transform_exprs,
)?;
self.metrics
.update_peak_hash_set_size(self.seen_file_keys.len());
Ok(scan_metadata)
}
}
impl LogReplayProcessor for ScanLogReplayProcessor {
type Output = ScanMetadata;
fn process_actions_batch(&mut self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
let ActionsBatch {
actions,
is_log_batch,
} = actions_batch;
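        // Commit batches never carry pre-parsed stats/partition columns, so they use the log
        // transform; checkpoint batches may reuse parsed columns via the checkpoint transform.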
let transform = if is_log_batch {
&self.log_transform
} else {
&self.checkpoint_transform
};
let transformed = transform.evaluate(actions.as_ref())?;
require!(
transformed.len() == actions.len(),
Error::internal_error(format!(
"transform output length {} != actions length {}",
transformed.len(),
actions.len()
))
);
let selection_vector = self.build_selection_vector(transformed.as_ref())?;
debug_assert_eq!(selection_vector.len(), actions.len());
require!(
selection_vector.len() == actions.len(),
Error::internal_error(format!(
"selection vector length {} != actions length {}",
selection_vector.len(),
actions.len()
))
);
let deduplicator = FileActionDeduplicator::new(
&mut self.seen_file_keys,
is_log_batch,
Self::ADD_PATH_INDEX,
Self::REMOVE_PATH_INDEX,
Self::ADD_DV_START_INDEX,
Self::REMOVE_DV_START_INDEX,
);
let mut visitor = AddRemoveDedupVisitor::new(
deduplicator,
selection_vector,
self.state_info.clone(),
&self.metrics,
);
visitor.visit_rows_of(actions.as_ref())?;
let scan_metadata = ScanMetadata::try_new(
transformed,
visitor.selection_vector,
visitor.row_transform_exprs,
)?;
self.metrics
.update_peak_hash_set_size(self.seen_file_keys.len());
Ok(scan_metadata)
}
fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> {
self.data_skipping_filter.as_ref()
}
}
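/// Drives a `ScanLogReplayProcessor` over an iterator of action batches, returning the
/// resulting `ScanMetadata` iterator together with a handle to the shared scan metrics.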
pub(crate) fn scan_action_iter(
engine: &dyn Engine,
action_iter: impl Iterator<Item = DeltaResult<ActionsBatch>>,
state_info: Arc<StateInfo>,
checkpoint_info: CheckpointReadInfo,
skip_stats: bool,
) -> DeltaResult<(
impl Iterator<Item = DeltaResult<ScanMetadata>>,
Arc<ScanMetrics>,
)> {
let processor = ScanLogReplayProcessor::new(engine, state_info, checkpoint_info, skip_stats)?;
let metrics = processor.metrics.clone();
Ok((processor.process_actions_iter(action_iter), metrics))
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use rstest::rstest;
use crate::actions::get_commit_schema;
use crate::engine::sync::SyncEngine;
use crate::expressions::{
BinaryExpressionOp, Expression, OpaquePredicateOp, Predicate, Scalar,
ScalarExpressionEvaluator,
};
use crate::kernel_predicates::{
DirectDataSkippingPredicateEvaluator, DirectPredicateEvaluator,
IndirectDataSkippingPredicateEvaluator,
};
use crate::log_replay::ActionsBatch;
use crate::log_segment::CheckpointReadInfo;
use crate::scan::state::ScanFile;
use crate::scan::state_info::tests::{
assert_transform_spec, get_simple_state_info, get_state_info, ROW_TRACKING_FEATURES,
};
use crate::scan::state_info::StateInfo;
use crate::scan::test_utils::{
add_batch_for_row_id, add_batch_simple, add_batch_with_partition_col,
add_batch_with_remove, add_batch_with_remove_and_partition, run_with_validate_callback,
};
use crate::scan::PhysicalPredicate;
use crate::schema::MetadataColumnSpec;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::table_features::ColumnMappingMode;
use crate::utils::test_utils::assert_result_error_with_message;
use crate::DeltaResult;
use crate::Expression as Expr;
use crate::ExpressionRef;
use super::{
scan_action_iter, InternalScanState, ScanLogReplayProcessor, SerializableScanState,
};
fn test_checkpoint_info() -> CheckpointReadInfo {
CheckpointReadInfo::without_stats_parsed()
}
#[derive(Debug, PartialEq)]
struct OpaqueTestOp(String);
impl OpaquePredicateOp for OpaqueTestOp {
fn name(&self) -> &str {
&self.0
}
fn eval_pred_scalar(
&self,
_eval_expr: &ScalarExpressionEvaluator<'_>,
_evaluator: &DirectPredicateEvaluator<'_>,
_exprs: &[Expr],
_inverted: bool,
) -> DeltaResult<Option<bool>> {
unimplemented!()
}
fn eval_as_data_skipping_predicate(
&self,
_predicate_evaluator: &DirectDataSkippingPredicateEvaluator<'_>,
_exprs: &[Expr],
_inverted: bool,
) -> Option<bool> {
unimplemented!()
}
fn as_data_skipping_predicate(
&self,
_predicate_evaluator: &IndirectDataSkippingPredicateEvaluator<'_>,
_exprs: &[Expr],
_inverted: bool,
) -> Option<Predicate> {
unimplemented!()
}
}
fn validate_simple(_: &mut (), scan_file: ScanFile) {
assert_eq!(
scan_file.path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(scan_file.size, 635);
assert!(scan_file.stats.is_some());
assert_eq!(scan_file.stats.as_ref().unwrap().num_records, 10);
assert_eq!(
scan_file.partition_values.get("date"),
Some(&"2017-12-10".to_string())
);
assert_eq!(scan_file.partition_values.get("non-existent"), None);
}
#[test]
fn test_scan_action_iter() {
run_with_validate_callback(
vec![add_batch_simple(get_commit_schema().clone())],
            None,
            None,
            &[true, false],
(),
validate_simple,
);
}
#[test]
fn test_scan_action_iter_with_remove() {
run_with_validate_callback(
vec![add_batch_with_remove(get_commit_schema().clone())],
            None,
            None,
            &[false, false, true, false],
(),
validate_simple,
);
}
#[test]
fn test_no_transforms() {
let batch = vec![add_batch_simple(get_commit_schema().clone())];
let logical_schema = Arc::new(StructType::new_unchecked(vec![]));
let state_info = Arc::new(StateInfo {
logical_schema: logical_schema.clone(),
physical_schema: logical_schema.clone(),
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
physical_stats_schema: None,
physical_partition_schema: None,
});
let (iter, _metrics) = scan_action_iter(
&SyncEngine::new(),
batch
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
state_info,
test_checkpoint_info(),
false,
)
.unwrap();
for res in iter {
let scan_metadata = res.unwrap();
assert!(
scan_metadata.scan_file_transforms.is_empty(),
"Should have no transforms"
);
}
}
#[test]
fn test_simple_transform() {
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
]));
let partition_cols = vec!["date".to_string()];
let state_info = get_simple_state_info(schema, partition_cols).unwrap();
let batch = vec![add_batch_with_partition_col()];
let (iter, _metrics) = scan_action_iter(
&SyncEngine::new(),
batch
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
test_checkpoint_info(),
false,
)
.unwrap();
fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
assert!(transform.is_some());
let Expr::Transform(transform) = transform.unwrap().as_ref() else {
panic!("Transform should always be a Transform expr");
};
assert!(transform.prepended_fields.is_empty());
let mut field_transforms = transform.field_transforms.iter();
let (field_name, field_transform) = field_transforms.next().unwrap();
assert_eq!(field_name, "value");
assert!(!field_transform.is_replace);
let [expr] = &field_transform.exprs[..] else {
panic!("Expected a single insertion");
};
let Expr::Literal(Scalar::Date(date_offset)) = expr.as_ref() else {
panic!("Expected a literal date");
};
assert_eq!(*date_offset, expected_date_offset);
assert!(field_transforms.next().is_none());
}
for res in iter {
let scan_metadata = res.unwrap();
let transforms = scan_metadata.scan_file_transforms;
assert_eq!(transforms.len(), 4, "Should have 4 transforms");
assert!(transforms[0].is_none(), "transform at [0] should be None");
assert!(transforms[2].is_none(), "transform at [2] should be None");
validate_transform(transforms[1].as_ref(), 17511);
validate_transform(transforms[3].as_ref(), 17510);
}
}
#[test]
fn test_row_id_transform() {
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
"value",
DataType::INTEGER,
true,
)]));
let state_info = get_state_info(
schema.clone(),
vec![],
None,
ROW_TRACKING_FEATURES,
[
("delta.enableRowTracking", "true"),
(
"delta.rowTracking.materializedRowIdColumnName",
"row_id_col",
),
(
"delta.rowTracking.materializedRowCommitVersionColumnName",
"row_commit_version_col",
),
]
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
vec![("row_id", MetadataColumnSpec::RowId)],
)
.unwrap();
let transform_spec = state_info.transform_spec.as_ref().unwrap();
assert_transform_spec(
transform_spec,
false,
"row_id_col",
"row_indexes_for_row_id_0",
);
let batch = vec![add_batch_for_row_id(get_commit_schema().clone())];
let (iter, _metrics) = scan_action_iter(
&SyncEngine::new(),
batch
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
test_checkpoint_info(),
false,
)
.unwrap();
for res in iter {
let scan_metadata = res.unwrap();
let transforms = scan_metadata.scan_file_transforms;
assert_eq!(transforms.len(), 1, "Should have 1 transform");
if let Some(Expr::Transform(transform_expr)) = transforms[0].as_ref().map(Arc::as_ref) {
assert!(transform_expr.input_path.is_none());
let row_id_transform = transform_expr
.field_transforms
.get("row_id_col")
.expect("Should have row_id_col transform");
assert!(row_id_transform.is_replace);
assert_eq!(row_id_transform.exprs.len(), 1);
let expr = &row_id_transform.exprs[0];
            let expected_expr = Arc::new(Expr::coalesce([
Expr::column(["row_id_col"]),
Expr::binary(
BinaryExpressionOp::Plus,
Expr::literal(42i64),
Expr::column(["row_indexes_for_row_id_0"]),
),
]));
            assert_eq!(expr, &expected_expr);
} else {
panic!("Should have been a transform expression");
}
}
}
#[test]
fn test_serialization_basic_state_and_dv_dropping() {
let engine = SyncEngine::new();
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::new("id", DataType::INTEGER, true),
StructField::new("value", DataType::STRING, true),
]));
let checkpoint_info = test_checkpoint_info();
let mut processor = ScanLogReplayProcessor::new(
&engine,
Arc::new(get_simple_state_info(schema.clone(), vec![]).unwrap()),
checkpoint_info.clone(),
false,
)
.unwrap();
let key1 = crate::log_replay::FileActionKey::new("file1.parquet", None);
let key2 = crate::log_replay::FileActionKey::new("file2.parquet", Some("dv-1".to_string()));
let key3 = crate::log_replay::FileActionKey::new("file3.parquet", Some("dv-2".to_string()));
processor.seen_file_keys.insert(key1.clone());
processor.seen_file_keys.insert(key2.clone());
processor.seen_file_keys.insert(key3.clone());
let state_info = processor.state_info.clone();
let deserialized = ScanLogReplayProcessor::from_serializable_state(
&engine,
processor.into_serializable_state().unwrap(),
)
.unwrap();
assert_eq!(
deserialized.state_info.logical_schema,
state_info.logical_schema
);
assert_eq!(
deserialized.state_info.physical_schema,
state_info.physical_schema
);
assert_eq!(
deserialized.state_info.column_mapping_mode,
state_info.column_mapping_mode
);
assert_eq!(deserialized.seen_file_keys.len(), 3);
assert!(deserialized.seen_file_keys.contains(&key1));
assert!(deserialized.seen_file_keys.contains(&key2));
assert!(deserialized.seen_file_keys.contains(&key3));
}
#[test]
fn test_serialization_with_predicate() {
let engine = SyncEngine::new();
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::new("id", DataType::INTEGER, true),
StructField::new("value", DataType::STRING, true),
]));
let predicate = Arc::new(crate::expressions::Predicate::eq(
Expr::column(["id"]),
Expr::literal(10i32),
));
let state_info = Arc::new(
get_state_info(
schema.clone(),
vec![],
Some(predicate.clone()),
                &[],
                HashMap::new(),
vec![],
)
.unwrap(),
);
let original_pred_schema = match &state_info.physical_predicate {
PhysicalPredicate::Some(_, s) => s.clone(),
_ => panic!("Expected predicate"),
};
let checkpoint_info = test_checkpoint_info();
let processor = ScanLogReplayProcessor::new(
&engine,
state_info.clone(),
checkpoint_info.clone(),
false,
)
.unwrap();
let deserialized = ScanLogReplayProcessor::from_serializable_state(
&engine,
processor.into_serializable_state().unwrap(),
)
.unwrap();
match &deserialized.state_info.physical_predicate {
PhysicalPredicate::Some(pred, pred_schema) => {
assert_eq!(pred.as_ref(), predicate.as_ref());
assert_eq!(pred_schema.as_ref(), original_pred_schema.as_ref());
}
_ => panic!("Expected PhysicalPredicate::Some"),
}
}
#[test]
fn test_serialization_with_transforms() {
let engine = SyncEngine::new();
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
]));
let state_info = Arc::new(
get_state_info(
schema,
vec!["date".to_string()],
None,
ROW_TRACKING_FEATURES,
[
("delta.enableRowTracking", "true"),
(
"delta.rowTracking.materializedRowIdColumnName",
"row_id_col",
),
(
"delta.rowTracking.materializedRowCommitVersionColumnName",
"row_commit_version_col",
),
]
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
vec![("row_id", MetadataColumnSpec::RowId)],
)
.unwrap(),
);
let original_transform = state_info.transform_spec.clone();
assert!(original_transform.is_some());
let checkpoint_info = test_checkpoint_info();
let processor = ScanLogReplayProcessor::new(
&engine,
state_info.clone(),
checkpoint_info.clone(),
false,
)
.unwrap();
let deserialized = ScanLogReplayProcessor::from_serializable_state(
&engine,
processor.into_serializable_state().unwrap(),
)
.unwrap();
assert_eq!(deserialized.state_info.transform_spec, original_transform);
}
#[test]
fn test_serialization_column_mapping_modes() {
let engine = SyncEngine::new();
for mode in [
ColumnMappingMode::None,
ColumnMappingMode::Id,
ColumnMappingMode::Name,
] {
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
"id",
DataType::INTEGER,
true,
)]));
let state_info = Arc::new(StateInfo {
logical_schema: schema.clone(),
physical_schema: schema,
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: mode,
physical_stats_schema: None,
physical_partition_schema: None,
});
let checkpoint_info = test_checkpoint_info();
let processor =
ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone(), false)
.unwrap();
let deserialized = ScanLogReplayProcessor::from_serializable_state(
&engine,
processor.into_serializable_state().unwrap(),
)
.unwrap();
assert_eq!(deserialized.state_info.column_mapping_mode, mode);
}
}
#[test]
fn test_serialization_edge_cases() {
let engine = SyncEngine::new();
let checkpoint_info = test_checkpoint_info();
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
"id",
DataType::INTEGER,
true,
)]));
let state_info = Arc::new(StateInfo {
logical_schema: schema.clone(),
physical_schema: schema,
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
physical_stats_schema: None,
physical_partition_schema: None,
});
let processor =
ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone(), false)
.unwrap();
let serialized = processor.into_serializable_state().unwrap();
assert!(serialized.predicate.is_none());
let deserialized =
ScanLogReplayProcessor::from_serializable_state(&engine, serialized).unwrap();
assert_eq!(deserialized.seen_file_keys.len(), 0);
assert!(deserialized.state_info.transform_spec.is_none());
}
#[test]
fn test_serialization_invalid_json() {
let engine = SyncEngine::new();
let checkpoint_info = test_checkpoint_info();
let invalid_state = SerializableScanState {
predicate: None,
            internal_state_blob: vec![0, 1, 2, 3, 255], // not valid JSON
            seen_file_keys: HashSet::new(),
checkpoint_info,
};
assert!(ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state).is_err());
}
#[test]
fn test_serialization_missing_predicate_schema() {
let engine = SyncEngine::new();
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
"id",
DataType::INTEGER,
true,
)]));
let checkpoint_info = test_checkpoint_info();
let invalid_internal_state = InternalScanState {
logical_schema: schema.clone(),
physical_schema: schema,
            predicate_schema: None,
            transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
physical_stats_schema: None,
skip_stats: false,
physical_partition_schema: None,
};
let predicate = Arc::new(crate::expressions::Predicate::column(["id"]));
let invalid_blob = serde_json::to_vec(&invalid_internal_state).unwrap();
let invalid_state = SerializableScanState {
            predicate: Some(predicate),
            internal_state_blob: invalid_blob,
seen_file_keys: HashSet::new(),
checkpoint_info,
};
let result = ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state);
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("predicate schema"));
}
}
#[test]
    fn deserialize_internal_state_with_extra_fields_fails() {
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
"id",
DataType::INTEGER,
true,
)]));
let invalid_internal_state = InternalScanState {
logical_schema: schema.clone(),
physical_schema: schema,
predicate_schema: None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
physical_stats_schema: None,
skip_stats: false,
physical_partition_schema: None,
};
let blob = serde_json::to_string(&invalid_internal_state).unwrap();
let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap();
obj["new_field"] = serde_json::json!("my_new_value");
let invalid_blob = obj.to_string();
let res: Result<InternalScanState, _> = serde_json::from_str(&invalid_blob);
assert_result_error_with_message(res, "unknown field");
}
#[test]
fn deserialize_serializable_scan_state_with_extra_fields_fails() {
let state = SerializableScanState {
predicate: None,
internal_state_blob: vec![],
seen_file_keys: HashSet::new(),
checkpoint_info: test_checkpoint_info(),
};
let blob = serde_json::to_string(&state).unwrap();
let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap();
obj["new_field"] = serde_json::json!("my_new_value");
let invalid_blob = obj.to_string();
let res: Result<SerializableScanState, _> = serde_json::from_str(&invalid_blob);
assert_result_error_with_message(res, "unknown field");
}
#[test]
    fn serializing_scan_state_with_opaque_predicate_fails() {
let opaque_predicate = Arc::new(Predicate::opaque(OpaqueTestOp("test_op".to_string()), []));
let state = SerializableScanState {
predicate: Some(opaque_predicate),
internal_state_blob: vec![],
seen_file_keys: HashSet::new(),
checkpoint_info: test_checkpoint_info(),
};
let result = serde_json::to_string(&state);
assert_result_error_with_message(result, "Cannot serialize an Opaque Predicate");
}
#[test]
fn test_scan_action_iter_with_skip_stats() {
let batch = vec![add_batch_simple(get_commit_schema().clone())];
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
]));
let state_info = get_simple_state_info(schema, vec!["date".to_string()]).unwrap();
let (iter, _metrics) = scan_action_iter(
&SyncEngine::new(),
batch
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
test_checkpoint_info(),
true,
)
.unwrap();
let mut found_add = false;
for res in iter {
let scan_metadata = res.unwrap();
scan_metadata
.visit_scan_files((), |_: &mut (), scan_file: ScanFile| {
assert!(scan_file.stats.is_none());
})
.unwrap();
found_add = true;
}
assert!(found_add);
}
#[rstest]
#[case::stats_only(
Arc::new(StructType::new_unchecked([
StructField::new("value", DataType::INTEGER, true),
])),
vec![],
Arc::new(Expression::column(["value"]).gt(Expression::literal(5i32))),
false, // use batch without partition column
)]
#[case::partition_predicate(
Arc::new(StructType::new_unchecked([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
])),
vec!["date".to_string()],
Arc::new(Expression::column(["date"]).eq(Expression::literal(Scalar::Date(17_510)))),
true, // use batch with partition column
)]
#[case::mixed_stats_and_partition(
Arc::new(StructType::new_unchecked([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
])),
vec!["date".to_string()],
Arc::new(Predicate::and(
Expression::column(["value"]).gt(Expression::literal(5i32)),
Expression::column(["date"]).eq(Expression::literal(Scalar::Date(17_510))),
)),
true, // use batch with partition column
)]
fn data_skipping_does_not_prune_remove_actions(
#[case] schema: SchemaRef,
#[case] partition_columns: Vec<String>,
#[case] predicate: Arc<Predicate>,
#[case] with_partition: bool,
) {
let state_info = get_state_info(
schema,
partition_columns,
Some(predicate),
&[],
HashMap::new(),
vec![],
)
.unwrap();
let batch = if with_partition {
vec![add_batch_with_remove_and_partition(
get_commit_schema().clone(),
)]
} else {
vec![add_batch_with_remove(get_commit_schema().clone())]
};
let (iter, _metrics) = scan_action_iter(
&SyncEngine::new(),
batch
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
test_checkpoint_info(),
false,
)
.unwrap();
let mut add_paths: Vec<String> = Vec::new();
for res in iter {
let scan_metadata = res.unwrap();
let paths = scan_metadata
.visit_scan_files(
Vec::new(),
|paths: &mut Vec<String>, scan_file: ScanFile| {
paths.push(scan_file.path.to_string());
},
)
.unwrap();
add_paths.extend(paths);
}
assert_eq!(add_paths.len(), 1, "Expected exactly one add to survive");
assert!(
add_paths[0].contains("c000"),
"Expected c000 add to survive, got: {}",
add_paths[0]
);
}
}