use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock, OnceLock};
use delta_kernel_derive::internal_api;
use tracing::instrument;
use super::Transaction;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{get_log_add_schema, NUM_RECORDS};
use crate::committer::Committer;
use crate::engine_data::{
FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, TypedGetData,
};
use crate::error::Error;
use crate::expressions::{column_name, ArrayData, ColumnName, Scalar, StructData, Transform};
use crate::scan::data_skipping::stats_schema::schema_with_all_fields_nullable;
use crate::scan::log_replay::get_scan_metadata_transform_expr;
use crate::scan::{restored_add_schema, scan_row_schema};
use crate::schema::{ArrayType, SchemaRef, StructField, StructType, ToSchema};
use crate::snapshot::SnapshotRef;
use crate::table_features::{Operation, TableFeature};
use crate::utils::current_time_ms;
use crate::{DataType, DeltaResult, Engine, Expression};
impl Transaction {
pub(crate) fn try_new_existing_table(
snapshot: impl Into<SnapshotRef>,
committer: Box<dyn Committer>,
engine: &dyn Engine,
) -> DeltaResult<Self> {
let read_snapshot = snapshot.into();
read_snapshot
.table_configuration()
.ensure_operation_supported(Operation::Write)?;
let clustering_columns = read_snapshot.get_physical_clustering_columns(engine)?;
let commit_timestamp = current_time_ms()?;
let span = tracing::info_span!(
"txn",
path = %read_snapshot.table_root(),
read_version = read_snapshot.version(),
);
let effective_table_config = read_snapshot.table_configuration().clone();
Ok(Transaction {
span,
read_snapshot_opt: Some(read_snapshot),
effective_table_config,
should_emit_protocol: false,
should_emit_metadata: false,
committer,
operation: None,
engine_info: None,
add_files_metadata: vec![],
remove_files_metadata: vec![],
set_transactions: vec![],
commit_timestamp,
user_domain_metadata_additions: vec![],
system_domain_metadata_additions: vec![],
user_domain_removals: vec![],
data_change: true,
engine_commit_info: None,
is_blind_append: false,
dv_matched_files: vec![],
physical_clustering_columns: clustering_columns,
shared_write_state: OnceLock::new(),
_state: PhantomData,
})
}
pub fn with_blind_append(mut self) -> Self {
self.is_blind_append = true;
self
}
pub fn with_operation(mut self, operation: String) -> Self {
self.operation = Some(operation);
self
}
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
self.user_domain_removals.push(domain);
self
}
pub fn remove_files(&mut self, remove_metadata: FilteredEngineData) {
self.remove_files_metadata.push(remove_metadata);
}
pub fn scan_metadata_to_engine_data(
scan_metadata: impl Iterator<Item = DeltaResult<crate::scan::ScanMetadata>>,
) -> impl Iterator<Item = DeltaResult<FilteredEngineData>> {
scan_metadata.map(|result| result.map(|metadata| metadata.scan_files))
}
#[internal_api]
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
#[instrument(
name = "txn.update_dvs",
skip_all,
fields(num_dv_updates = new_dv_descriptors.len()),
err
)]
pub(crate) fn update_deletion_vectors(
&mut self,
new_dv_descriptors: HashMap<String, DeletionVectorDescriptor>,
existing_data_files: impl Iterator<Item = DeltaResult<FilteredEngineData>>,
) -> DeltaResult<()> {
if self.is_create_table() {
return Err(Error::generic(
"Deletion vector operations require an existing table",
));
}
self.ensure_deletion_vectors_enabled()?;
let mut matched_dv_files = 0;
let mut matched_files = Vec::new();
let mut visitor = DvMatchVisitor::new(&new_dv_descriptors);
for scan_file_result in existing_data_files {
let scan_file = scan_file_result?;
visitor.new_dv_entries.clear();
visitor.matched_file_indexes.clear();
visitor.visit_rows_of(&scan_file)?;
let (data, mut selection_vector) = scan_file.into_parts();
let mut current_matched_index = 0;
for (i, selected) in selection_vector.iter_mut().enumerate() {
if current_matched_index < visitor.matched_file_indexes.len() {
if visitor.matched_file_indexes[current_matched_index] != i {
*selected = false;
} else {
current_matched_index += 1;
matched_dv_files += 1;
}
} else {
*selected = false;
}
}
let new_columns = vec![ArrayData::try_new(
struct_deletion_vector_schema().clone(),
visitor.new_dv_entries.clone(),
)?];
matched_files.push(FilteredEngineData::try_new(
data.append_columns(new_dv_column_schema().clone(), new_columns)?,
selection_vector,
)?);
}
if matched_dv_files != new_dv_descriptors.len() {
return Err(Error::generic(format!(
"Number of matched DV files does not match number of new DV descriptors: {} != {}",
matched_dv_files,
new_dv_descriptors.len()
)));
}
self.dv_matched_files.extend(matched_files);
Ok(())
}
fn ensure_deletion_vectors_enabled(&self) -> DeltaResult<()> {
if !self
.effective_table_config
.is_feature_enabled(&TableFeature::DeletionVectors)
{
return Err(Error::unsupported(
"Deletion vector writes require reader version 3, writer version 7, the \
'deletionVectors' feature in both reader and writer features, and the \
`delta.enableDeletionVectors` table property set to `true`",
));
}
Ok(())
}
}
/// Name of the temporary column that carries each matched file's replacement deletion vector
/// while DV updates are staged. It is dropped again before remove/add actions are produced.
const NEW_DELETION_VECTOR_NAME: &str = "newDeletionVector";
/// Schema of staged DV batches: every scan-row field, followed by a nullable
/// `newDeletionVector` column of deletion-vector-descriptor structs.
static INTERMEDIATE_DV_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let new_dv_field = StructField::nullable(
        NEW_DELETION_VECTOR_NAME.to_string(),
        DeletionVectorDescriptor::to_schema(),
    );
    let scan_rows = scan_row_schema();
    let all_fields = scan_rows
        .fields()
        .cloned()
        .chain(std::iter::once(new_dv_field));
    Arc::new(StructType::new_unchecked(all_fields))
});

/// Returns the lazily built intermediate (scan row + new DV) schema.
fn intermediate_dv_schema() -> &'static SchemaRef {
    LazyLock::force(&INTERMEDIATE_DV_SCHEMA)
}
static NULLABLE_SCAN_ROWS_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| schema_with_all_fields_nullable(scan_row_schema().as_ref()).into());
fn nullable_scan_rows_schema() -> &'static SchemaRef {
&NULLABLE_SCAN_ROWS_SCHEMA
}
static NULLABLE_RESTORED_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| schema_with_all_fields_nullable(restored_add_schema()).into());
fn nullable_restored_add_schema() -> &'static SchemaRef {
&NULLABLE_RESTORED_ADD_SCHEMA
}
static NULLABLE_ADD_LOG_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| schema_with_all_fields_nullable(get_log_add_schema()).into());
fn nullable_add_log_schema() -> &'static SchemaRef {
&NULLABLE_ADD_LOG_SCHEMA
}
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
static STRUCT_DELETION_VECTOR_SCHEMA: LazyLock<ArrayType> =
LazyLock::new(|| ArrayType::new(DeletionVectorDescriptor::to_schema().into(), true));
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
fn struct_deletion_vector_schema() -> &'static ArrayType {
&STRUCT_DELETION_VECTOR_SCHEMA
}
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
static NEW_DV_COLUMN_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked(vec![StructField::nullable(
NEW_DELETION_VECTOR_NAME,
DeletionVectorDescriptor::to_schema(),
)]))
});
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
fn new_dv_column_schema() -> &'static SchemaRef {
&NEW_DV_COLUMN_SCHEMA
}
impl<S> Transaction<S> {
pub(super) fn generate_dv_update_actions<'a>(
&'a self,
engine: &'a dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
if self.is_create_table() && !self.dv_matched_files.is_empty() {
return Err(crate::error::Error::internal_error(
"CREATE TABLE transaction cannot have DV update actions",
));
}
static COLUMNS_TO_DROP: &[&str] = &[NEW_DELETION_VECTOR_NAME];
let remove_actions =
self.generate_remove_actions(engine, self.dv_matched_files.iter(), COLUMNS_TO_DROP)?;
let add_actions = self.generate_adds_for_dv_update(engine, self.dv_matched_files.iter())?;
Ok(remove_actions.chain(add_actions))
}
fn generate_adds_for_dv_update<'a>(
&'a self,
engine: &'a dyn Engine,
file_metadata_batch: impl Iterator<Item = &'a FilteredEngineData> + Send + 'a,
) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
let evaluation_handler = engine.evaluation_handler();
let with_new_dv_transform = Expression::transform(
Transform::new_top_level()
.with_replaced_field(
"deletionVector",
Expression::column([NEW_DELETION_VECTOR_NAME]).into(),
)
.with_dropped_field(NEW_DELETION_VECTOR_NAME),
);
let with_new_dv_eval = evaluation_handler.new_expression_evaluator(
intermediate_dv_schema().clone(),
Arc::new(with_new_dv_transform),
nullable_scan_rows_schema().clone().into(),
)?;
let restored_add_eval = evaluation_handler.new_expression_evaluator(
nullable_scan_rows_schema().clone(),
get_scan_metadata_transform_expr(),
nullable_restored_add_schema().clone().into(),
)?;
let with_data_change_transform =
Arc::new(Expression::struct_from([Expression::transform(
Transform::new_nested(["add"]).with_inserted_field(
Some("modificationTime"),
Expression::literal(self.data_change).into(),
),
)]));
let with_data_change_eval = evaluation_handler.new_expression_evaluator(
nullable_restored_add_schema().clone(),
with_data_change_transform,
nullable_add_log_schema().clone().into(),
)?;
Ok(file_metadata_batch.map(
move |file_metadata_batch| -> DeltaResult<FilteredEngineData> {
let with_new_dv_data = with_new_dv_eval.evaluate(file_metadata_batch.data())?;
let as_partial_add_data = restored_add_eval.evaluate(with_new_dv_data.as_ref())?;
let with_data_change_data =
with_data_change_eval.evaluate(as_partial_add_data.as_ref())?;
FilteredEngineData::try_new(
with_data_change_data,
file_metadata_batch.selection_vector().to_vec(),
)
},
))
}
}
/// Row visitor that matches scan rows against the requested deletion-vector updates.
///
/// After visiting a batch, `new_dv_entries` holds one scalar per row: a DV struct for matched
/// rows and a null DV otherwise. `matched_file_indexes` holds the matched row indexes in the
/// order they were visited.
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
struct DvMatchVisitor<'a> {
    /// Replacement DV descriptors, keyed by data file path.
    dv_updates: &'a HashMap<String, DeletionVectorDescriptor>,
    /// Per-row new-DV scalars for the most recently visited batch.
    new_dv_entries: Vec<Scalar>,
    /// Row indexes (within the visited batch) whose path had a replacement DV.
    matched_file_indexes: Vec<usize>,
}

impl<'a> DvMatchVisitor<'a> {
    /// Getter position of the `path` column.
    #[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
    const PATH_INDEX: usize = 0;
    /// Getter position of the `stats` column.
    #[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
    const STATS_INDEX: usize = 1;

    /// Creates a visitor with empty per-batch outputs.
    #[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
    fn new(dv_updates: &'a HashMap<String, DeletionVectorDescriptor>) -> Self {
        Self {
            new_dv_entries: vec![],
            matched_file_indexes: vec![],
            dv_updates,
        }
    }
}
impl FilteredRowVisitor for DvMatchVisitor<'_> {
    /// Requests the top-level `path` and `stats` string columns, in the order expected by
    /// `PATH_INDEX` and `STATS_INDEX`.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static COLUMNS: LazyLock<(Vec<ColumnName>, Vec<DataType>)> = LazyLock::new(|| {
            (
                vec![column_name!("path"), column_name!("stats")],
                vec![DataType::STRING, DataType::STRING],
            )
        });
        (COLUMNS.0.as_slice(), COLUMNS.1.as_slice())
    }

    /// Records one new-DV scalar per batch row.
    ///
    /// Unselected rows, rows with a null path, and rows without a requested update get a null
    /// DV. A matched row must have `stats` that parse as JSON and contain a non-negative
    /// integer `numRecords`; otherwise an error is returned.
    fn visit_filtered<'a>(
        &mut self,
        getters: &[&'a dyn GetData<'a>],
        rows: RowIndexIterator<'_>,
    ) -> DeltaResult<()> {
        static NULL_DV: LazyLock<Scalar> =
            LazyLock::new(|| Scalar::Null(DataType::from(DeletionVectorDescriptor::to_schema())));
        static DV_SCHEMA_FIELDS: LazyLock<Vec<StructField>> =
            LazyLock::new(|| DeletionVectorDescriptor::to_schema().into_fields().collect());

        let batch_len = rows.num_rows();
        self.new_dv_entries.reserve(batch_len);

        for row in rows {
            // Pad any unselected rows that precede this one with null placeholders.
            self.new_dv_entries.resize_with(row, || NULL_DV.clone());

            let path: Option<String> = getters[Self::PATH_INDEX].get_opt(row, "path")?;
            let dv_result = match &path {
                Some(path) => self.dv_updates.get(path),
                None => None,
            };
            let (Some(path), Some(dv_result)) = (path, dv_result) else {
                // Null path, or no replacement DV requested for this file.
                self.new_dv_entries.push(NULL_DV.clone());
                continue;
            };

            let stats: Option<String> = getters[Self::STATS_INDEX].get_opt(row, "stats")?;
            let Some(stats) = stats else {
                return Err(Error::generic(format!(
                    "update_deletion_vectors: file {path} has no stats; \
                     deletion vectors require an accurate {NUM_RECORDS}"
                )));
            };
            let stats_json: serde_json::Value = serde_json::from_str(&stats).map_err(|e| {
                Error::generic(format!(
                    "update_deletion_vectors: stats for {path} is not valid JSON: {e}"
                ))
            })?;
            let num_records = stats_json
                .get(NUM_RECORDS)
                .and_then(serde_json::Value::as_u64);
            if num_records.is_none() {
                return Err(Error::generic(format!(
                    "update_deletion_vectors: stats for {path} is missing {NUM_RECORDS} \
                     or it is not a non-negative integer"
                )));
            }

            let dv_values = vec![
                Scalar::from(dv_result.storage_type.to_string()),
                Scalar::from(dv_result.path_or_inline_dv.clone()),
                Scalar::from(dv_result.offset),
                Scalar::from(dv_result.size_in_bytes),
                Scalar::from(dv_result.cardinality),
            ];
            let dv_scalar =
                Scalar::Struct(StructData::try_new(DV_SCHEMA_FIELDS.clone(), dv_values)?);
            self.new_dv_entries.push(dv_scalar);
            self.matched_file_indexes.push(row);
        }

        // Pad trailing unselected rows so there is one entry per batch row.
        self.new_dv_entries
            .resize_with(batch_len, || NULL_DV.clone());
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use super::*;
    use crate::actions::deletion_vector::DeletionVectorStorageType;
    use crate::arrow::array::{ArrayRef, StringArray};
    use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema};
    use crate::arrow::record_batch::RecordBatch;
    use crate::engine::arrow_data::ArrowEngineData;

    const TEST_PATH: &str = "data/file.parquet";

    /// Builds an all-rows-selected scan-metadata batch with one row per `(path, stats)` pair.
    /// Each name in `extra_columns` adds a string column filled with `"ignored"`.
    fn make_scan_metadata_rows(
        rows: &[(Option<&str>, Option<&str>)],
        extra_columns: &[&str],
    ) -> FilteredEngineData {
        let paths: Vec<Option<&str>> = rows.iter().map(|&(path, _)| path).collect();
        let stats: Vec<Option<&str>> = rows.iter().map(|&(_, stats)| stats).collect();
        let mut fields = vec![
            ArrowField::new("path", ArrowDataType::Utf8, true),
            ArrowField::new("stats", ArrowDataType::Utf8, true),
        ];
        let mut columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(paths)),
            Arc::new(StringArray::from(stats)),
        ];
        for name in extra_columns {
            fields.push(ArrowField::new(*name, ArrowDataType::Utf8, true));
            columns.push(Arc::new(StringArray::from(vec![Some("ignored"); rows.len()])));
        }
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();
        FilteredEngineData::with_all_rows_selected(Box::new(ArrowEngineData::new(batch)))
    }

    /// Single-row convenience wrapper around [`make_scan_metadata_rows`].
    fn make_scan_metadata_row(
        path: &str,
        stats: Option<&str>,
        extra_columns: &[&str],
    ) -> FilteredEngineData {
        make_scan_metadata_rows(&[(Some(path), stats)], extra_columns)
    }

    /// One inline DV update keyed by `path`.
    fn dv_updates_for(path: &str) -> HashMap<String, DeletionVectorDescriptor> {
        let descriptor = DeletionVectorDescriptor::try_new(
            DeletionVectorStorageType::Inline,
            "AAAAAAAA".to_string(),
            None,
            8,
            0,
        )
        .unwrap();
        HashMap::from([(path.to_string(), descriptor)])
    }

    #[rstest]
    #[case::null_stats(None, &[], true)]
    #[case::stats_missing_num_records(Some("{}"), &[], true)]
    #[case::stats_with_num_records(Some(r#"{"numRecords":10}"#), &[], false)]
    #[case::extra_columns_dont_shift_indexes(
        Some(r#"{"numRecords":10}"#),
        &["size", "modificationTime"],
        false,
    )]
    fn dv_match_visitor_validates_matched_row_stats(
        #[case] stats: Option<&str>,
        #[case] extra_columns: &[&str],
        #[case] expect_error: bool,
    ) {
        let dv_updates = dv_updates_for(TEST_PATH);
        let mut visitor = DvMatchVisitor::new(&dv_updates);
        let data = make_scan_metadata_row(TEST_PATH, stats, extra_columns);
        let result = visitor.visit_rows_of(&data);
        if expect_error {
            let msg = result.expect_err("expected error").to_string();
            assert!(msg.contains(NUM_RECORDS), "message was: {msg}");
            assert!(msg.contains(TEST_PATH), "message was: {msg}");
        } else {
            result.expect("validation should pass");
            assert_eq!(visitor.matched_file_indexes, vec![0]);
        }
    }

    #[test]
    fn dv_match_visitor_skips_validation_for_unmatched_rows() {
        let dv_updates = dv_updates_for("other/file.parquet");
        let mut visitor = DvMatchVisitor::new(&dv_updates);
        let data = make_scan_metadata_row(TEST_PATH, None, &[]);
        visitor
            .visit_rows_of(&data)
            .expect("unmatched rows must not trigger stats validation");
        assert!(visitor.matched_file_indexes.is_empty());
    }

    #[test]
    fn dv_match_visitor_records_one_entry_per_row() {
        let dv_updates = dv_updates_for(TEST_PATH);
        let mut visitor = DvMatchVisitor::new(&dv_updates);
        let data = make_scan_metadata_rows(
            &[
                (Some("other/file.parquet"), None),
                (Some(TEST_PATH), Some(r#"{"numRecords":10}"#)),
                (None, None),
            ],
            &[],
        );
        visitor
            .visit_rows_of(&data)
            .expect("only the matched row is validated");
        assert_eq!(visitor.matched_file_indexes, vec![1]);
        assert_eq!(visitor.new_dv_entries.len(), 3);
        assert!(matches!(visitor.new_dv_entries[0], Scalar::Null(_)));
        assert!(matches!(visitor.new_dv_entries[1], Scalar::Struct(_)));
        assert!(matches!(visitor.new_dv_entries[2], Scalar::Null(_)));
    }
}