use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock, OnceLock};
use delta_kernel_derive::internal_api;
use tracing::instrument;
use super::Transaction;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::get_log_add_schema;
use crate::committer::Committer;
use crate::engine_data::{
FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, TypedGetData,
};
use crate::error::Error;
use crate::expressions::{column_name, ArrayData, ColumnName, Scalar, StructData, Transform};
use crate::scan::data_skipping::stats_schema::schema_with_all_fields_nullable;
use crate::scan::log_replay::get_scan_metadata_transform_expr;
use crate::scan::{restored_add_schema, scan_row_schema};
use crate::schema::{ArrayType, SchemaRef, StructField, StructType, ToSchema};
use crate::snapshot::SnapshotRef;
use crate::table_features::{Operation, TableFeature};
use crate::utils::current_time_ms;
use crate::{DataType, DeltaResult, Engine, Expression};
impl Transaction {
    /// Builds a write transaction on top of an existing table snapshot.
    ///
    /// Verifies the table configuration permits [`Operation::Write`], resolves the
    /// table's physical clustering columns via `engine`, and records the current
    /// wall-clock time as the commit timestamp. All mutation buffers (adds,
    /// removes, domain metadata, DV updates, ...) start empty.
    ///
    /// # Errors
    /// Returns an error if the table configuration rejects the write operation,
    /// if clustering columns cannot be read, or if the system clock cannot
    /// produce a timestamp.
    pub(crate) fn try_new_existing_table(
        snapshot: impl Into<SnapshotRef>,
        committer: Box<dyn Committer>,
        engine: &dyn Engine,
    ) -> DeltaResult<Self> {
        let read_snapshot = snapshot.into();
        // Fail fast if the table's protocol/configuration forbids writes.
        read_snapshot
            .table_configuration()
            .ensure_operation_supported(Operation::Write)?;
        let clustering_columns = read_snapshot.get_physical_clustering_columns(engine)?;
        let commit_timestamp = current_time_ms()?;
        // Span tags everything emitted during this transaction with the table
        // path and the snapshot version it was built from.
        let span = tracing::info_span!(
            "txn",
            path = %read_snapshot.table_root(),
            read_version = read_snapshot.version(),
        );
        // For an existing table, the effective configuration is the snapshot's.
        let effective_table_config = read_snapshot.table_configuration().clone();
        Ok(Transaction {
            span,
            read_snapshot_opt: Some(read_snapshot),
            effective_table_config,
            // Existing table: protocol/metadata actions are not re-emitted.
            should_emit_protocol: false,
            should_emit_metadata: false,
            committer,
            operation: None,
            engine_info: None,
            add_files_metadata: vec![],
            remove_files_metadata: vec![],
            set_transactions: vec![],
            commit_timestamp,
            user_domain_metadata_additions: vec![],
            system_domain_metadata_additions: vec![],
            user_domain_removals: vec![],
            data_change: true,
            engine_commit_info: None,
            is_blind_append: false,
            dv_matched_files: vec![],
            physical_clustering_columns: clustering_columns,
            shared_write_state: OnceLock::new(),
            _state: PhantomData,
        })
    }

    /// Marks this transaction as a blind append (consuming-builder style).
    pub fn with_blind_append(mut self) -> Self {
        self.is_blind_append = true;
        self
    }

    /// Sets the operation name recorded for this transaction.
    pub fn with_operation(mut self, operation: String) -> Self {
        self.operation = Some(operation);
        self
    }

    /// Queues removal of the user domain metadata for `domain`.
    pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
        self.user_domain_removals.push(domain);
        self
    }

    /// Queues a batch of file metadata whose selected rows will be committed
    /// as remove actions.
    pub fn remove_files(&mut self, remove_metadata: FilteredEngineData) {
        self.remove_files_metadata.push(remove_metadata);
    }

    /// Adapts a `ScanMetadata` iterator into the `FilteredEngineData` iterator
    /// shape expected by APIs such as [`Self::update_deletion_vectors`],
    /// keeping only the scan-file data from each result.
    pub fn scan_metadata_to_engine_data(
        scan_metadata: impl Iterator<Item = DeltaResult<crate::scan::ScanMetadata>>,
    ) -> impl Iterator<Item = DeltaResult<FilteredEngineData>> {
        scan_metadata.map(|result| result.map(|metadata| metadata.scan_files))
    }

    /// Associates new deletion vectors with existing data files.
    ///
    /// `new_dv_descriptors` maps file paths to their replacement DV
    /// descriptors; `existing_data_files` supplies scan-row batches of the
    /// table's current files. For each batch, rows whose path matches a
    /// descriptor stay selected and gain the new DV (appended as an extra
    /// `newDeletionVector` column); all other rows are deselected. The
    /// resulting batches are buffered in `dv_matched_files` for commit time.
    ///
    /// # Errors
    /// - the transaction is creating a new table;
    /// - the table does not support the `deletionVectors` feature;
    /// - the number of matched *and already-selected* rows differs from the
    ///   number of supplied descriptors (e.g. a descriptor matched no file, or
    ///   matched a row that was deselected upstream).
    #[internal_api]
    #[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
    #[instrument(
        name = "txn.update_dvs",
        skip_all,
        fields(num_dv_updates = new_dv_descriptors.len()),
        err
    )]
    pub(crate) fn update_deletion_vectors(
        &mut self,
        new_dv_descriptors: HashMap<String, DeletionVectorDescriptor>,
        existing_data_files: impl Iterator<Item = DeltaResult<FilteredEngineData>>,
    ) -> DeltaResult<()> {
        if self.is_create_table() {
            return Err(Error::generic(
                "Deletion vector operations require an existing table",
            ));
        }
        if !self
            .effective_table_config
            .is_feature_supported(&TableFeature::DeletionVectors)
        {
            return Err(Error::unsupported(
                "Deletion vector operations require reader version 3, writer version 7, \
                and the 'deletionVectors' feature in both reader and writer features",
            ));
        }
        let mut matched_dv_files = 0;
        // One visitor reused for every batch; its per-batch state is cleared at
        // the top of each loop iteration.
        let mut visitor = DvMatchVisitor::new(&new_dv_descriptors);
        for scan_file_result in existing_data_files {
            let scan_file = scan_file_result?;
            visitor.new_dv_entries.clear();
            visitor.matched_file_indexes.clear();
            // Fills `matched_file_indexes` (row indexes whose path matched a
            // descriptor, in visit order) and `new_dv_entries` (one scalar per
            // row, null where unmatched).
            visitor.visit_rows_of(&scan_file)?;
            let (data, mut selection_vector) = scan_file.into_parts();
            // Narrow the selection vector so only matched rows remain
            // selected. `matched_file_indexes` ascends with row order, so a
            // single forward cursor suffices.
            let mut current_matched_index = 0;
            for (i, selected) in selection_vector.iter_mut().enumerate() {
                if current_matched_index < visitor.matched_file_indexes.len() {
                    if visitor.matched_file_indexes[current_matched_index] != i {
                        *selected = false;
                    } else {
                        current_matched_index += 1;
                        // Only count rows that were already selected upstream.
                        matched_dv_files += if *selected { 1 } else { 0 };
                    }
                } else {
                    // Past the last match: deselect all remaining rows.
                    *selected = false;
                }
            }
            // Attach the replacement DV descriptors as an extra
            // `newDeletionVector` column alongside the original scan columns.
            let new_columns = vec![ArrayData::try_new(
                struct_deletion_vector_schema().clone(),
                visitor.new_dv_entries.clone(),
            )?];
            self.dv_matched_files.push(FilteredEngineData::try_new(
                data.append_columns(new_dv_column_schema().clone(), new_columns)?,
                selection_vector,
            )?);
        }
        // Every supplied descriptor must have matched exactly one selected file.
        if matched_dv_files != new_dv_descriptors.len() {
            return Err(Error::generic(format!(
                "Number of matched DV files does not match number of new DV descriptors: {} != {}",
                matched_dv_files,
                new_dv_descriptors.len()
            )));
        }
        Ok(())
    }
}
/// Name of the temporary column that carries each file's replacement DV
/// alongside its scan row until it is folded into the add action's
/// `deletionVector` field (and the temporary column dropped).
static NEW_DELETION_VECTOR_NAME: &str = "newDeletionVector";
/// Scan-row schema extended with a trailing nullable `newDeletionVector`
/// column that temporarily carries each file's replacement DV descriptor.
static INTERMEDIATE_DV_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked(
        scan_row_schema()
            .fields()
            .cloned()
            .chain([StructField::nullable(
                // `StructField::nullable` accepts the &'static str directly
                // (as NEW_DV_COLUMN_SCHEMA below demonstrates), so the former
                // `.to_string()` here was a redundant allocation.
                NEW_DELETION_VECTOR_NAME,
                DeletionVectorDescriptor::to_schema(),
            )]),
    ))
});

/// Accessor for [`INTERMEDIATE_DV_SCHEMA`].
fn intermediate_dv_schema() -> &'static SchemaRef {
    &INTERMEDIATE_DV_SCHEMA
}
/// Copy of the scan-row schema with every field marked nullable.
#[allow(clippy::panic)]
static NULLABLE_SCAN_ROWS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    // Initialization failure here is a programming error, hence the panic.
    match schema_with_all_fields_nullable(scan_row_schema().as_ref()) {
        Ok(schema) => schema.into(),
        Err(_) => panic!("Failed to transform scan_row_schema"),
    }
});

/// Accessor for [`NULLABLE_SCAN_ROWS_SCHEMA`].
fn nullable_scan_rows_schema() -> &'static SchemaRef {
    &NULLABLE_SCAN_ROWS_SCHEMA
}
/// Copy of the restored add-file schema with every field marked nullable.
#[allow(clippy::panic)]
static NULLABLE_RESTORED_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    // Initialization failure here is a programming error, hence the panic.
    match schema_with_all_fields_nullable(restored_add_schema()) {
        Ok(schema) => schema.into(),
        Err(_) => panic!("Failed to transform restored_add_schema"),
    }
});

/// Accessor for [`NULLABLE_RESTORED_ADD_SCHEMA`].
fn nullable_restored_add_schema() -> &'static SchemaRef {
    &NULLABLE_RESTORED_ADD_SCHEMA
}
/// Copy of the log `add` action schema ([`get_log_add_schema`]) with every
/// field marked nullable.
#[allow(clippy::panic)]
static NULLABLE_ADD_LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    schema_with_all_fields_nullable(get_log_add_schema())
        // Fixed copy-paste in the panic message: it previously blamed
        // `nullable_restored_add_schema`, but the schema transformed here is
        // the log add schema.
        .unwrap_or_else(|_| panic!("Failed to transform get_log_add_schema"))
        .into()
});

/// Accessor for [`NULLABLE_ADD_LOG_SCHEMA`].
fn nullable_add_log_schema() -> &'static SchemaRef {
    &NULLABLE_ADD_LOG_SCHEMA
}
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
static STRUCT_DELETION_VECTOR_SCHEMA: LazyLock<ArrayType> =
LazyLock::new(|| ArrayType::new(DeletionVectorDescriptor::to_schema().into(), true));
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
fn struct_deletion_vector_schema() -> &'static ArrayType {
&STRUCT_DELETION_VECTOR_SCHEMA
}
/// Single-column schema describing the temporary nullable `newDeletionVector`
/// column appended to matched scan-row batches.
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
static NEW_DV_COLUMN_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let dv_field = StructField::nullable(
        NEW_DELETION_VECTOR_NAME,
        DeletionVectorDescriptor::to_schema(),
    );
    Arc::new(StructType::new_unchecked(vec![dv_field]))
});

/// Accessor for [`NEW_DV_COLUMN_SCHEMA`].
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
fn new_dv_column_schema() -> &'static SchemaRef {
    &NEW_DV_COLUMN_SCHEMA
}
impl<S> Transaction<S> {
    /// Generates the log actions for the buffered DV updates: remove actions
    /// for the old file entries, chained with add actions carrying the new
    /// deletion vectors.
    ///
    /// # Errors
    /// Returns an internal error if DV updates were buffered on a CREATE TABLE
    /// transaction, which has no existing files to update.
    pub(super) fn generate_dv_update_actions<'a>(
        &'a self,
        engine: &'a dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
        if self.is_create_table() && !self.dv_matched_files.is_empty() {
            return Err(crate::error::Error::internal_error(
                "CREATE TABLE transaction cannot have DV update actions",
            ));
        }
        // The temporary `newDeletionVector` column is only needed on the add
        // path; drop it when generating the removes.
        static COLUMNS_TO_DROP: &[&str] = &[NEW_DELETION_VECTOR_NAME];
        let remove_actions =
            self.generate_remove_actions(engine, self.dv_matched_files.iter(), COLUMNS_TO_DROP)?;
        let add_actions = self.generate_adds_for_dv_update(engine, self.dv_matched_files.iter())?;
        Ok(remove_actions.chain(add_actions))
    }

    /// Converts buffered scan-row batches (each carrying a `newDeletionVector`
    /// column) into add-action batches via three chained expression
    /// evaluations:
    /// 1. replace the scan row's `deletionVector` field with the new DV and
    ///    drop the temporary column;
    /// 2. reshape the scan-row form into the restored add-file form;
    /// 3. wrap that in an `add` struct, inserting this transaction's
    ///    `data_change` flag.
    ///
    /// Each batch keeps its original selection vector.
    fn generate_adds_for_dv_update<'a>(
        &'a self,
        engine: &'a dyn Engine,
        file_metadata_batch: impl Iterator<Item = &'a FilteredEngineData> + Send + 'a,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
        let evaluation_handler = engine.evaluation_handler();
        // Step 1: overwrite `deletionVector` with the value of the temporary
        // column, then drop the temporary column itself.
        let with_new_dv_transform = Expression::transform(
            Transform::new_top_level()
                .with_replaced_field(
                    "deletionVector",
                    Expression::column([NEW_DELETION_VECTOR_NAME]).into(),
                )
                .with_dropped_field(NEW_DELETION_VECTOR_NAME),
        );
        let with_new_dv_eval = evaluation_handler.new_expression_evaluator(
            intermediate_dv_schema().clone(),
            Arc::new(with_new_dv_transform),
            nullable_scan_rows_schema().clone().into(),
        )?;
        // Step 2: scan-row shape -> restored add-file shape.
        let restored_add_eval = evaluation_handler.new_expression_evaluator(
            nullable_scan_rows_schema().clone(),
            get_scan_metadata_transform_expr(),
            nullable_restored_add_schema().clone().into(),
        )?;
        // Step 3: wrap in the `add` struct, inserting the `data_change`
        // literal. NOTE(review): `with_inserted_field(Some("modificationTime"),
        // ..)` presumably positions the inserted field relative to
        // `modificationTime`, with its name coming from the output schema —
        // confirm against the `Transform` API docs.
        let with_data_change_transform =
            Arc::new(Expression::struct_from([Expression::transform(
                Transform::new_nested(["add"]).with_inserted_field(
                    Some("modificationTime"),
                    Expression::literal(self.data_change).into(),
                ),
            )]));
        let with_data_change_eval = evaluation_handler.new_expression_evaluator(
            nullable_restored_add_schema().clone(),
            with_data_change_transform,
            nullable_add_log_schema().clone().into(),
        )?;
        // Lazily run the three evaluations for each batch as it is consumed.
        Ok(file_metadata_batch.map(
            move |file_metadata_batch| -> DeltaResult<FilteredEngineData> {
                let with_new_dv_data = with_new_dv_eval.evaluate(file_metadata_batch.data())?;
                let as_partial_add_data = restored_add_eval.evaluate(with_new_dv_data.as_ref())?;
                let with_data_change_data =
                    with_data_change_eval.evaluate(as_partial_add_data.as_ref())?;
                // Carry the batch's original selection vector through.
                FilteredEngineData::try_new(
                    with_data_change_data,
                    file_metadata_batch.selection_vector().to_vec(),
                )
            },
        ))
    }
}
/// Row visitor that matches scan rows against a set of path-keyed DV updates,
/// collecting the replacement DV scalars and the indexes of matched rows.
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
struct DvMatchVisitor<'a> {
    /// Replacement DV descriptors keyed by file path.
    dv_updates: &'a HashMap<String, DeletionVectorDescriptor>,
    /// One scalar per batch row: the new DV struct for matched rows, null
    /// otherwise. Cleared by the caller between batches.
    new_dv_entries: Vec<Scalar>,
    /// Row indexes (in visit order) whose path matched an entry in
    /// `dv_updates`. Cleared by the caller between batches.
    matched_file_indexes: Vec<usize>,
}
impl<'a> DvMatchVisitor<'a> {
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
const PATH_INDEX: usize = 0;
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
fn new(dv_updates: &'a HashMap<String, DeletionVectorDescriptor>) -> Self {
Self {
dv_updates,
new_dv_entries: Vec::new(),
matched_file_indexes: Vec::new(),
}
}
}
impl FilteredRowVisitor for DvMatchVisitor<'_> {
    /// Requests only the `path` column (as STRING) from each scan row.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<(Vec<ColumnName>, Vec<DataType>)> = LazyLock::new(|| {
            let names = vec![column_name!("path")];
            let types = vec![DataType::STRING];
            (names, types)
        });
        (&NAMES_AND_TYPES.0, &NAMES_AND_TYPES.1)
    }

    /// Visits the filtered rows of a batch, recording for each row either a
    /// null DV scalar or — when the row's path has an entry in `dv_updates` —
    /// a struct scalar built from that descriptor. Matched row indexes are
    /// appended to `matched_file_indexes` in visit order.
    ///
    /// `new_dv_entries` ends up with exactly `num_rows` entries: gaps for rows
    /// the iterator skipped are back-filled with nulls via `resize_with`, as
    /// is the tail after the last visited row, keeping the array positionally
    /// aligned with the batch.
    fn visit_filtered<'a>(
        &mut self,
        getters: &[&'a dyn GetData<'a>],
        rows: RowIndexIterator<'_>,
    ) -> DeltaResult<()> {
        // Placeholder for rows that receive no new DV.
        static NULL_DV: LazyLock<Scalar> =
            LazyLock::new(|| Scalar::Null(DataType::from(DeletionVectorDescriptor::to_schema())));
        // Field list used to build DV struct scalars; computed once.
        static DV_SCHEMA_FIELDS: LazyLock<Vec<StructField>> = LazyLock::new(|| {
            DeletionVectorDescriptor::to_schema()
                .into_fields()
                .collect()
        });
        let num_rows = rows.num_rows();
        self.new_dv_entries.reserve(num_rows);
        for row_index in rows {
            // Pad any skipped rows with nulls so the entry pushed next lands
            // at position `row_index`.
            self.new_dv_entries
                .resize_with(row_index, || NULL_DV.clone());
            let path_opt: Option<String> = getters[Self::PATH_INDEX].get_opt(row_index, "path")?;
            let Some(path) = path_opt else {
                // No path: nothing to match against; record a null entry.
                self.new_dv_entries.push(NULL_DV.clone());
                continue;
            };
            if let Some(dv_result) = self.dv_updates.get(&path) {
                // Scalar order must mirror DeletionVectorDescriptor's schema
                // field order.
                self.new_dv_entries.push(Scalar::Struct(StructData::try_new(
                    DV_SCHEMA_FIELDS.clone(),
                    vec![
                        Scalar::from(dv_result.storage_type.to_string()),
                        Scalar::from(dv_result.path_or_inline_dv.clone()),
                        Scalar::from(dv_result.offset),
                        Scalar::from(dv_result.size_in_bytes),
                        Scalar::from(dv_result.cardinality),
                    ],
                )?));
                self.matched_file_indexes.push(row_index);
            } else {
                self.new_dv_entries.push(NULL_DV.clone());
            }
        }
        // Back-fill the tail so the entries cover every row in the batch.
        self.new_dv_entries
            .resize_with(num_rows, || NULL_DV.clone());
        Ok(())
    }
}