use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use delta_kernel_derive::internal_api;
use super::deletion_vector::DeletionVectorDescriptor;
use super::set_transaction::is_set_txn_expired;
use super::*;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::log_segment::DomainMetadataMap;
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType, Schema, StructField};
use crate::utils::require;
use crate::{DeltaResult, Error};
/// Row visitor that extracts the first `metaData` action found in a batch.
///
/// Rows for which [`visit_metadata_at`] yields `None` are skipped. Once a row yields a
/// [`Metadata`], the remaining rows of that batch are not examined. If a later batch also
/// contains metadata, it overwrites the stored value.
#[derive(Default)]
#[internal_api]
pub(crate) struct MetadataVisitor {
    /// The parsed metadata action, if one has been seen.
    pub(crate) metadata: Option<Metadata>,
}

impl RowVisitor for MetadataVisitor {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        // Built once: the leaf columns of the `Metadata` schema nested under `METADATA_NAME`.
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| Metadata::to_schema().leaves(METADATA_NAME));
        NAMES_AND_TYPES.as_ref()
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        // `transpose` turns each `Result<Option<_>>` into `Option<Result<_>>`, so `find_map`
        // stops at the first row that either holds metadata or fails to parse.
        let first_found = (0..row_count)
            .find_map(|row| visit_metadata_at(row, getters).transpose())
            .transpose()?;
        if let Some(metadata) = first_found {
            self.metadata = Some(metadata);
        }
        Ok(())
    }
}
#[derive(Default)]
pub(crate) struct SelectionVectorVisitor {
pub(crate) selection_vector: Vec<bool>,
pub(crate) num_filtered: u64,
}
impl RowVisitor for SelectionVectorVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| (vec![column_name!("output")], vec![DataType::BOOLEAN]).into());
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 1,
Error::InternalError(format!(
"Wrong number of SelectionVectorVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
let selected: bool = getters[0].get(i, "selectionvector.output")?;
if !selected {
self.num_filtered += 1;
}
self.selection_vector.push(selected);
}
Ok(())
}
}
/// Row visitor that extracts the first `protocol` action found in a batch.
///
/// Rows for which [`visit_protocol_at`] yields `None` are skipped. Once a row yields a
/// [`Protocol`], the remaining rows of that batch are not examined. If a later batch also
/// contains a protocol, it overwrites the stored value.
#[derive(Default)]
#[internal_api]
pub(crate) struct ProtocolVisitor {
    /// The parsed protocol action, if one has been seen.
    pub(crate) protocol: Option<Protocol>,
}

impl RowVisitor for ProtocolVisitor {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        // Built once: the leaf columns of the `Protocol` schema nested under `PROTOCOL_NAME`.
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| Protocol::to_schema().leaves(PROTOCOL_NAME));
        NAMES_AND_TYPES.as_ref()
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        // Stops at the first row that either holds a protocol or fails to parse.
        let first_found = (0..row_count)
            .find_map(|row| visit_protocol_at(row, getters).transpose())
            .transpose()?;
        if let Some(protocol) = first_found {
            self.protocol = Some(protocol);
        }
        Ok(())
    }
}
/// Row visitor that collects every `add` action found in the visited batches.
#[allow(unused)]
#[derive(Default)]
#[internal_api]
pub(crate) struct AddVisitor {
    /// Parsed add actions, in visit order.
    pub(crate) adds: Vec<Add>,
}

impl AddVisitor {
    /// Builds an [`Add`] from the row at `row_index`, given its already-extracted `path`.
    ///
    /// `getters` are indexed positionally. There must be exactly 15 of them, matching
    /// [`AddVisitor::names_and_types`].
    /// - Index 6 is never read, and `tags` is always `None`.
    /// - The slice starting at index 7 is passed to [`visit_deletion_vector_at`].
    ///
    /// The string passed to each getter names the field being read, presumably for error
    /// reporting. It uses the `add` action's camelCase column names, matching `RemoveVisitor`.
    ///
    /// # Errors
    /// Returns [`Error::InternalError`] if the getter count is wrong. Propagates any getter or
    /// deletion-vector parse error.
    #[internal_api]
    fn visit_add<'a>(
        row_index: usize,
        path: String,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<Add> {
        require!(
            getters.len() == 15,
            Error::InternalError(format!(
                "Wrong number of AddVisitor getters: {}",
                getters.len()
            ))
        );
        let partition_values: HashMap<_, _> = getters[1].get(row_index, "add.partitionValues")?;
        let size: i64 = getters[2].get(row_index, "add.size")?;
        let modification_time: i64 = getters[3].get(row_index, "add.modificationTime")?;
        let data_change: bool = getters[4].get(row_index, "add.dataChange")?;
        let stats: Option<String> = getters[5].get_opt(row_index, "add.stats")?;
        // getters[6] is intentionally not read; `tags` is reported as `None` below.
        let deletion_vector = visit_deletion_vector_at(row_index, &getters[7..])?;
        let base_row_id: Option<i64> = getters[12].get_opt(row_index, "add.baseRowId")?;
        let default_row_commit_version: Option<i64> =
            getters[13].get_opt(row_index, "add.defaultRowCommitVersion")?;
        let clustering_provider: Option<String> =
            getters[14].get_opt(row_index, "add.clusteringProvider")?;
        Ok(Add {
            path,
            partition_values,
            size,
            modification_time,
            data_change,
            stats,
            tags: None,
            deletion_vector,
            base_row_id,
            default_row_commit_version,
            clustering_provider,
        })
    }

    /// Leaf column names and types of the `Add` schema nested under `ADD_NAME`, built once.
    pub(crate) fn names_and_types() -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| Add::to_schema().leaves(ADD_NAME));
        NAMES_AND_TYPES.as_ref()
    }
}

impl RowVisitor for AddVisitor {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        Self::names_and_types()
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        for i in 0..row_count {
            // A non-null `add.path` marks a row holding an add action; other rows are skipped.
            if let Some(path) = getters[0].get_opt(i, "add.path")? {
                self.adds.push(Self::visit_add(i, path, getters)?);
            }
        }
        Ok(())
    }
}
/// Row visitor that collects every `remove` action found in the visited batches.
#[allow(unused)]
#[derive(Default)]
#[internal_api]
pub(crate) struct RemoveVisitor {
    /// Parsed remove actions, in visit order.
    pub(crate) removes: Vec<Remove>,
}

impl RemoveVisitor {
    /// Builds a [`Remove`] from the row at `row_index`, given its already-extracted `path`.
    ///
    /// `getters` are indexed positionally. There must be exactly 15 of them, matching
    /// [`RemoveVisitor::names_and_types`].
    /// - Index 7 is never read, and `tags` is always `None`.
    /// - The slice starting at index 8 is passed to [`visit_deletion_vector_at`].
    ///
    /// # Errors
    /// Returns [`Error::InternalError`] if the getter count is wrong. Propagates any getter or
    /// deletion-vector parse error.
    #[internal_api]
    pub(crate) fn visit_remove<'a>(
        row_index: usize,
        path: String,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<Remove> {
        require!(
            getters.len() == 15,
            Error::InternalError(format!(
                "Wrong number of RemoveVisitor getters: {}",
                getters.len()
            ))
        );
        // Fields are listed in getter order, so when several columns are malformed the
        // first failing getter is the one reported.
        Ok(Remove {
            path,
            deletion_timestamp: getters[1].get_opt(row_index, "remove.deletionTimestamp")?,
            data_change: getters[2].get(row_index, "remove.dataChange")?,
            extended_file_metadata: getters[3]
                .get_opt(row_index, "remove.extendedFileMetadata")?,
            partition_values: getters[4].get_opt(row_index, "remove.partitionValues")?,
            size: getters[5].get_opt(row_index, "remove.size")?,
            stats: getters[6].get_opt(row_index, "remove.stats")?,
            tags: None,
            deletion_vector: visit_deletion_vector_at(row_index, &getters[8..])?,
            base_row_id: getters[13].get_opt(row_index, "remove.baseRowId")?,
            default_row_commit_version: getters[14]
                .get_opt(row_index, "remove.defaultRowCommitVersion")?,
        })
    }

    /// Leaf column names and types of the `Remove` schema nested under `REMOVE_NAME`, built once.
    pub(crate) fn names_and_types() -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| Remove::to_schema().leaves(REMOVE_NAME));
        NAMES_AND_TYPES.as_ref()
    }
}

impl RowVisitor for RemoveVisitor {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        Self::names_and_types()
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        for row in 0..row_count {
            // Rows whose `remove.path` is null do not hold a remove action.
            let Some(path) = getters[0].get_opt(row, "remove.path")? else {
                continue;
            };
            let remove = Self::visit_remove(row, path, getters)?;
            self.removes.push(remove);
        }
        Ok(())
    }
}
#[allow(unused)]
#[derive(Default)]
#[internal_api]
pub(crate) struct CdcVisitor {
pub(crate) cdcs: Vec<Cdc>,
}
impl CdcVisitor {
#[internal_api]
pub(crate) fn visit_cdc<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Cdc> {
Ok(Cdc {
path,
partition_values: getters[1].get(row_index, "cdc.partitionValues")?,
size: getters[2].get(row_index, "cdc.size")?,
data_change: getters[3].get(row_index, "cdc.dataChange")?,
tags: getters[4].get_opt(row_index, "cdc.tags")?,
})
}
}
impl RowVisitor for CdcVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| Cdc::to_schema().leaves(CDC_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 5,
Error::InternalError(format!(
"Wrong number of CdcVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
if let Some(path) = getters[0].get_opt(i, "cdc.path")? {
self.cdcs.push(Self::visit_cdc(i, path, getters)?);
}
}
Ok(())
}
}
/// Set transactions keyed by their application id.
pub(crate) type SetTransactionMap = HashMap<String, SetTransaction>;

/// Row visitor that collects `txn` (set transaction) actions.
///
/// For each application id, only the first non-expired transaction encountered is kept.
/// Later rows for an id that is already recorded are ignored.
/// NOTE(review): this presumably relies on batches being visited newest-first, so that the
/// kept entry is the latest one. Confirm with callers.
#[derive(Default, Debug)]
#[internal_api]
pub(crate) struct SetTransactionVisitor {
    /// Collected transactions, keyed by app id.
    pub(crate) set_transactions: SetTransactionMap,
    /// When `Some`, only rows whose app id equals this value are considered.
    pub(crate) application_id: Option<String>,
    /// Passed to `is_set_txn_expired` together with each transaction's `last_updated`.
    expiration_timestamp: Option<i64>,
}

impl SetTransactionVisitor {
    /// Creates a visitor, optionally restricted to a single `application_id`.
    ///
    /// The visitor skips transactions that `is_set_txn_expired` reports as expired relative
    /// to `expiration_timestamp`.
    pub(crate) fn new(application_id: Option<String>, expiration_timestamp: Option<i64>) -> Self {
        Self {
            set_transactions: SetTransactionMap::new(),
            application_id,
            expiration_timestamp,
        }
    }

    /// Builds a [`SetTransaction`] from the row at `row_index`, given its already-extracted
    /// `app_id`.
    ///
    /// # Errors
    /// Returns [`Error::InternalError`] unless exactly 3 getters are supplied. Propagates
    /// getter errors.
    #[internal_api]
    pub(crate) fn visit_txn<'a>(
        row_index: usize,
        app_id: String,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<SetTransaction> {
        require!(
            getters.len() == 3,
            Error::InternalError(format!(
                "Wrong number of SetTransactionVisitor getters: {}",
                getters.len()
            ))
        );
        Ok(SetTransaction {
            app_id,
            version: getters[1].get(row_index, "txn.version")?,
            last_updated: getters[2].get_opt(row_index, "txn.lastUpdated")?,
        })
    }
}

impl RowVisitor for SetTransactionVisitor {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| SetTransaction::to_schema().leaves(SET_TRANSACTION_NAME));
        NAMES_AND_TYPES.as_ref()
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        for row in 0..row_count {
            let app_id: Option<String> = getters[0].get_opt(row, "txn.appId")?;
            let Some(app_id) = app_id else {
                continue;
            };
            // When a specific application id was requested, ignore every other id.
            let requested = match &self.application_id {
                Some(wanted) => wanted == &app_id,
                None => true,
            };
            if !requested {
                continue;
            }
            let txn = Self::visit_txn(row, app_id, getters)?;
            let expired = is_set_txn_expired(self.expiration_timestamp, txn.last_updated);
            // Keep only the first non-expired transaction seen for each app id.
            if expired || self.set_transactions.contains_key(&txn.app_id) {
                continue;
            }
            self.set_transactions.insert(txn.app_id.clone(), txn);
        }
        Ok(())
    }
}
#[derive(Default)]
#[internal_api]
pub(crate) struct SidecarVisitor {
pub(crate) sidecars: Vec<Sidecar>,
}
impl SidecarVisitor {
#[internal_api]
fn visit_sidecar<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Sidecar> {
Ok(Sidecar {
path,
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?,
modification_time: getters[2].get(row_index, "sidecar.modificationTime")?,
tags: getters[3].get_opt(row_index, "sidecar.tags")?,
})
}
}
impl RowVisitor for SidecarVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of SidecarVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
}
Ok(())
}
}
#[derive(Debug, Default)]
pub(crate) struct DomainMetadataVisitor {
domain_metadatas: DomainMetadataMap,
domain_filter: Option<HashSet<String>>,
}
impl DomainMetadataVisitor {
pub(crate) fn new(domain_filter: Option<HashSet<String>>) -> Self {
DomainMetadataVisitor {
domain_filter,
..Default::default()
}
}
pub(crate) fn visit_domain_metadata<'a>(
row_index: usize,
domain: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<DomainMetadata> {
require!(
getters.len() == 3,
Error::InternalError(format!(
"Wrong number of DomainMetadataVisitor getters: {}",
getters.len()
))
);
let configuration: String = getters[1].get(row_index, "domainMetadata.configuration")?;
let removed: bool = getters[2].get(row_index, "domainMetadata.removed")?;
Ok(DomainMetadata {
domain,
configuration,
removed,
})
}
pub(crate) fn filter_found(&self) -> bool {
self.domain_filter
.as_ref()
.is_some_and(|filter| self.domain_metadatas.len() == filter.len())
}
pub(crate) fn into_domain_metadatas(mut self) -> DomainMetadataMap {
self.domain_metadatas.retain(|_, dm| !dm.removed);
self.domain_metadatas
}
}
impl RowVisitor for DomainMetadataVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| DomainMetadata::to_schema().leaves(DOMAIN_METADATA_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
let domain: Option<String> = getters[0].get_opt(i, "domainMetadata.domain")?;
if let Some(domain) = domain {
let filter = self.domain_filter.as_ref();
if filter.is_none_or(|requested| requested.contains(&domain)) {
if let Entry::Vacant(entry) = self.domain_metadatas.entry(domain.clone()) {
let domain_metadata =
DomainMetadataVisitor::visit_domain_metadata(i, domain, getters)?;
entry.insert(domain_metadata);
}
}
}
}
Ok(())
}
}
pub(crate) fn visit_deletion_vector_at<'a>(
row_index: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Option<DeletionVectorDescriptor>> {
let storage_type_opt: Option<String> =
getters[0].get_opt(row_index, "remove.deletionVector.storageType")?;
if let Some(storage_type_str) = storage_type_opt {
let storage_type = storage_type_str.parse()?;
let path_or_inline_dv: String =
getters[1].get(row_index, "deletionVector.pathOrInlineDv")?;
let offset: Option<i32> = getters[2].get_opt(row_index, "deletionVector.offset")?;
let size_in_bytes: i32 = getters[3].get(row_index, "deletionVector.sizeInBytes")?;
let cardinality: i64 = getters[4].get(row_index, "deletionVector.cardinality")?;
Ok(Some(DeletionVectorDescriptor {
storage_type,
path_or_inline_dv,
offset,
size_in_bytes,
cardinality,
}))
} else {
Ok(None)
}
}
/// Parses a `metaData` action from the row at `row_index`.
///
/// `getters` are indexed positionally, and there must be exactly 9 of them. Returns `Ok(None)`
/// when `metadata.id` is null, meaning the row does not hold a metadata action.
/// - Index 4 (presumably `format.options`) is never read, and the returned `Format::options`
///   is always empty.
/// - A null `configuration` is returned as an empty map.
///
/// The string passed to each getter names the field being read, presumably for error
/// reporting. It uses the action's camelCase column names.
///
/// # Errors
/// Returns [`Error::InternalError`] if the getter count is wrong. Propagates getter errors.
#[internal_api]
pub(crate) fn visit_metadata_at<'a>(
    row_index: usize,
    getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Option<Metadata>> {
    require!(
        getters.len() == 9,
        Error::InternalError(format!(
            "Wrong number of MetadataVisitor getters: {}",
            getters.len()
        ))
    );
    let Some(id) = getters[0].get_opt(row_index, "metadata.id")? else {
        return Ok(None);
    };
    let name: Option<String> = getters[1].get_opt(row_index, "metadata.name")?;
    let description: Option<String> = getters[2].get_opt(row_index, "metadata.description")?;
    let format_provider: String = getters[3].get(row_index, "metadata.format.provider")?;
    // getters[4] is intentionally skipped; format options are returned empty below.
    let schema_string: String = getters[5].get(row_index, "metadata.schemaString")?;
    let partition_columns: Vec<_> = getters[6].get(row_index, "metadata.partitionColumns")?;
    let created_time: Option<i64> = getters[7].get_opt(row_index, "metadata.createdTime")?;
    let configuration: HashMap<_, _> = getters[8]
        .get_opt(row_index, "metadata.configuration")?
        .unwrap_or_default();
    Ok(Some(Metadata {
        id,
        name,
        description,
        format: Format {
            provider: format_provider,
            options: HashMap::new(),
        },
        schema_string,
        partition_columns,
        created_time,
        configuration,
    }))
}
#[internal_api]
pub(crate) fn visit_protocol_at<'a>(
row_index: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Option<Protocol>> {
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of ProtocolVisitor getters: {}",
getters.len()
))
);
let Some(min_reader_version) = getters[0].get_opt(row_index, "protocol.min_reader_version")?
else {
return Ok(None);
};
let min_writer_version: i32 = getters[1].get(row_index, "protocol.min_writer_version")?;
let reader_features: Option<Vec<_>> =
getters[2].get_opt(row_index, "protocol.reader_features")?;
let writer_features: Option<Vec<_>> =
getters[3].get_opt(row_index, "protocol.writer_features")?;
let protocol = Protocol::try_new(
min_reader_version,
min_writer_version,
reader_features,
writer_features,
)?;
Ok(Some(protocol))
}
#[allow(unused)]
#[derive(Default)]
pub(crate) struct InCommitTimestampVisitor {
pub(crate) in_commit_timestamp: Option<i64>,
}
impl InCommitTimestampVisitor {
#[allow(unused)]
pub(crate) fn schema() -> Arc<Schema> {
static SCHEMA: LazyLock<Arc<Schema>> = LazyLock::new(|| {
let ict_type = StructField::new("inCommitTimestamp", DataType::LONG, true);
Arc::new(StructType::new_unchecked(vec![StructField::new(
COMMIT_INFO_NAME,
StructType::new_unchecked([ict_type]),
true,
)]))
});
SCHEMA.clone()
}
}
impl RowVisitor for InCommitTimestampVisitor {
fn selected_column_names_and_types(
&self,
) -> (&'static [crate::schema::ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
let names = vec![column_name!("commitInfo.inCommitTimestamp")];
let types = vec![DataType::LONG];
(names, types).into()
});
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(
&mut self,
row_count: usize,
getters: &[&'a dyn crate::engine_data::GetData<'a>],
) -> DeltaResult<()> {
require!(
getters.len() == 1,
Error::InternalError(format!(
"Wrong number of InCommitTimestampVisitor getters: {}",
getters.len()
))
);
if row_count == 0 {
return Ok(());
}
if let Some(in_commit_timestamp) = getters[0].get_long(0, "commitInfo.inCommitTimestamp")? {
self.in_commit_timestamp = Some(in_commit_timestamp);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array::{BooleanArray, StringArray};
use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use crate::arrow::record_batch::RecordBatch;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::sync::SyncEngine;
use crate::expressions::{column_expr_ref, Expression};
use crate::table_features::TableFeature;
use crate::utils::test_utils::{action_batch, parse_json_batch};
use crate::Engine;
#[test]
fn test_parse_protocol() -> DeltaResult<()> {
let data = action_batch();
let parsed = Protocol::try_new_from_data(data.as_ref())?.unwrap();
let expected = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec![TableFeature::DeletionVectors]),
writer_features: Some(vec![TableFeature::DeletionVectors]),
};
assert_eq!(parsed, expected);
Ok(())
}
#[test]
fn test_parse_cdc() -> DeltaResult<()> {
let data = action_batch();
let mut visitor = CdcVisitor::default();
visitor.visit_rows_of(data.as_ref())?;
let expected = Cdc {
path: "_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("age".to_string(), "21".to_string()),
]),
size: 1033,
data_change: false,
tags: None
};
assert_eq!(&visitor.cdcs, &[expected]);
Ok(())
}
#[test]
fn test_parse_sidecar() -> DeltaResult<()> {
let data = action_batch();
let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(data.as_ref())?;
let sidecar1 = Sidecar {
path: "016ae953-37a9-438e-8683-9a9a4a79a395.parquet".into(),
size_in_bytes: 9268,
modification_time: 1714496113961,
tags: Some(HashMap::from([(
"tag_foo".to_string(),
"tag_bar".to_string(),
)])),
};
assert_eq!(visitor.sidecars.len(), 1);
assert_eq!(visitor.sidecars[0], sidecar1);
Ok(())
}
#[test]
fn test_parse_metadata() -> DeltaResult<()> {
let data = action_batch();
let parsed = Metadata::try_new_from_data(data.as_ref())?.unwrap();
use crate::table_properties::{
COLUMN_MAPPING_MODE, ENABLE_CHANGE_DATA_FEED, ENABLE_DELETION_VECTORS,
};
let configuration = HashMap::from_iter([
(ENABLE_DELETION_VECTORS.to_string(), "true".to_string()),
(COLUMN_MAPPING_MODE.to_string(), "none".to_string()),
(ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string()),
]);
let expected = Metadata {
id: "testId".into(),
name: None,
description: None,
format: Format {
provider: "parquet".into(),
options: Default::default(),
},
schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(),
partition_columns: Vec::new(),
created_time: Some(1677811175819),
configuration,
};
assert_eq!(parsed, expected);
Ok(())
}
#[test]
fn test_parse_add_partitioned() {
let json_strings: StringArray = vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998136,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut add_visitor = AddVisitor::default();
add_visitor.visit_rows_of(batch.as_ref()).unwrap();
let add1 = Add {
path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("c1".to_string(), "4".to_string()),
("c2".to_string(), "c".to_string()),
]),
size: 452,
modification_time: 1670892998135,
data_change: true,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()),
..Default::default()
};
let add2 = Add {
path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("c1".to_string(), "5".to_string()),
("c2".to_string(), "b".to_string()),
]),
modification_time: 1670892998136,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}".into()),
..add1.clone()
};
let add3 = Add {
path: "c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("c1".to_string(), "6".to_string()),
("c2".to_string(), "a".to_string()),
]),
modification_time: 1670892998137,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}".into()),
..add1.clone()
};
let expected = vec![add1, add2, add3];
assert_eq!(add_visitor.adds.len(), expected.len());
for (add, expected) in add_visitor.adds.into_iter().zip(expected) {
assert_eq!(add, expected);
}
}
#[test]
fn test_parse_remove_partitioned() {
let json_strings: StringArray = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452,"stats":"{\"numRecords\":1}"}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut remove_visitor = RemoveVisitor::default();
remove_visitor.visit_rows_of(batch.as_ref()).unwrap();
let expected_remove = Remove {
path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet"
.into(),
deletion_timestamp: Some(1670892998135),
data_change: true,
partition_values: Some(HashMap::from([
("c1".to_string(), "4".to_string()),
("c2".to_string(), "c".to_string()),
])),
size: Some(452),
stats: Some(r#"{"numRecords":1}"#.to_string()),
..Default::default()
};
assert_eq!(
remove_visitor.removes.len(),
1,
"Unexpected number of remove actions"
);
assert_eq!(
remove_visitor.removes[0], expected_remove,
"Unexpected remove action"
);
}
#[test]
fn test_parse_remove_all_fields_unique() {
let json_strings: StringArray = vec![
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"remove":{"path":"test-path.parquet","deletionTimestamp":1234567890,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"part":"value"},"size":9999,"stats":"{\"numRecords\":42}","deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":3},"baseRowId":100,"defaultRowCommitVersion":5}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut remove_visitor = RemoveVisitor::default();
remove_visitor.visit_rows_of(batch.as_ref()).unwrap();
assert_eq!(
remove_visitor.removes.len(),
1,
"Expected exactly one remove action"
);
let remove = &remove_visitor.removes[0];
assert_eq!(remove.path, "test-path.parquet", "path mismatch");
assert_eq!(
remove.deletion_timestamp,
Some(1234567890),
"deletion_timestamp mismatch"
);
assert!(!remove.data_change, "data_change mismatch");
assert_eq!(
remove.extended_file_metadata,
Some(true),
"extended_file_metadata mismatch"
);
assert_eq!(
remove.partition_values,
Some(HashMap::from([("part".to_string(), "value".to_string())])),
"partition_values mismatch"
);
assert_eq!(remove.size, Some(9999), "size mismatch");
assert_eq!(
remove.stats,
Some(r#"{"numRecords":42}"#.to_string()),
"stats mismatch"
);
let dv = remove
.deletion_vector
.as_ref()
.expect("deletion_vector should be present");
assert_eq!(
dv.path_or_inline_dv, "vBn[lx{q8@P<9BNH/isA",
"deletion_vector.path_or_inline_dv mismatch"
);
assert_eq!(dv.offset, Some(1), "deletion_vector.offset mismatch");
assert_eq!(
dv.size_in_bytes, 36,
"deletion_vector.size_in_bytes mismatch"
);
assert_eq!(dv.cardinality, 3, "deletion_vector.cardinality mismatch");
assert_eq!(
remove.base_row_id,
Some(100),
"base_row_id mismatch - check getter index"
);
assert_eq!(
remove.default_row_commit_version,
Some(5),
"default_row_commit_version mismatch - check getter index"
);
}
#[test]
fn test_parse_txn() {
let json_strings: StringArray = vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"txn":{"appId":"myApp","version": 3}}"#,
r#"{"txn":{"appId":"myApp2","version": 4, "lastUpdated": 1670892998177}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut txn_visitor = SetTransactionVisitor::default();
txn_visitor.visit_rows_of(batch.as_ref()).unwrap();
let mut actual = txn_visitor.set_transactions;
assert_eq!(
actual.remove("myApp2"),
Some(SetTransaction {
app_id: "myApp2".to_string(),
version: 4,
last_updated: Some(1670892998177),
})
);
assert_eq!(
actual.remove("myApp"),
Some(SetTransaction {
app_id: "myApp".to_string(),
version: 3,
last_updated: None,
})
);
}
#[test]
fn test_parse_domain_metadata() {
let json_strings: StringArray = vec![
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"domainMetadata":{"domain": "zach1","configuration":"cfg1","removed": true}}"#,
r#"{"domainMetadata":{"domain": "zach2","configuration":"cfg2","removed": false}}"#,
r#"{"domainMetadata":{"domain": "zach3","configuration":"cfg3","removed": true}}"#,
r#"{"domainMetadata":{"domain": "zach4","configuration":"cfg4","removed": false}}"#,
r#"{"domainMetadata":{"domain": "zach5","configuration":"cfg5","removed": true}}"#,
r#"{"domainMetadata":{"domain": "zach6","configuration":"cfg6","removed": false}}"#,
]
.into();
let commit_1 = parse_json_batch(json_strings);
let json_strings: StringArray = vec![
r#"{"domainMetadata":{"domain": "zach1","configuration":"old_cfg1","removed": true}}"#,
r#"{"domainMetadata":{"domain": "zach2","configuration":"old_cfg2","removed": false}}"#,
r#"{"domainMetadata":{"domain": "zach3","configuration":"old_cfg3","removed": false}}"#,
r#"{"domainMetadata":{"domain": "zach4","configuration":"old_cfg4","removed": true}}"#,
r#"{"domainMetadata":{"domain": "zach7","configuration":"cfg7","removed": true}}"#,
r#"{"domainMetadata":{"domain": "zach8","configuration":"cfg8","removed": false}}"#,
]
.into();
let commit_0 = parse_json_batch(json_strings);
let mut domain_metadata_visitor = DomainMetadataVisitor::default();
domain_metadata_visitor
.visit_rows_of(commit_1.as_ref())
.unwrap();
domain_metadata_visitor
.visit_rows_of(commit_0.as_ref())
.unwrap();
let actual = domain_metadata_visitor.domain_metadatas.clone();
let expected = DomainMetadataMap::from([
(
"zach1".to_string(),
DomainMetadata {
domain: "zach1".to_string(),
configuration: "cfg1".to_string(),
removed: true,
},
),
(
"zach2".to_string(),
DomainMetadata {
domain: "zach2".to_string(),
configuration: "cfg2".to_string(),
removed: false,
},
),
(
"zach3".to_string(),
DomainMetadata {
domain: "zach3".to_string(),
configuration: "cfg3".to_string(),
removed: true,
},
),
(
"zach4".to_string(),
DomainMetadata {
domain: "zach4".to_string(),
configuration: "cfg4".to_string(),
removed: false,
},
),
(
"zach5".to_string(),
DomainMetadata {
domain: "zach5".to_string(),
configuration: "cfg5".to_string(),
removed: true,
},
),
(
"zach6".to_string(),
DomainMetadata {
domain: "zach6".to_string(),
configuration: "cfg6".to_string(),
removed: false,
},
),
(
"zach7".to_string(),
DomainMetadata {
domain: "zach7".to_string(),
configuration: "cfg7".to_string(),
removed: true,
},
),
(
"zach8".to_string(),
DomainMetadata {
domain: "zach8".to_string(),
configuration: "cfg8".to_string(),
removed: false,
},
),
]);
assert_eq!(actual, expected);
let expected = DomainMetadataMap::from([
(
"zach2".to_string(),
DomainMetadata {
domain: "zach2".to_string(),
configuration: "cfg2".to_string(),
removed: false,
},
),
(
"zach4".to_string(),
DomainMetadata {
domain: "zach4".to_string(),
configuration: "cfg4".to_string(),
removed: false,
},
),
(
"zach6".to_string(),
DomainMetadata {
domain: "zach6".to_string(),
configuration: "cfg6".to_string(),
removed: false,
},
),
(
"zach8".to_string(),
DomainMetadata {
domain: "zach8".to_string(),
configuration: "cfg8".to_string(),
removed: false,
},
),
]);
assert_eq!(domain_metadata_visitor.into_domain_metadatas(), expected);
let mut domain_metadata_visitor =
DomainMetadataVisitor::new(Some(HashSet::from(["zach3".to_string()])));
domain_metadata_visitor
.visit_rows_of(commit_1.as_ref())
.unwrap();
domain_metadata_visitor
.visit_rows_of(commit_0.as_ref())
.unwrap();
let actual = domain_metadata_visitor.domain_metadatas.clone();
let expected = DomainMetadataMap::from([(
"zach3".to_string(),
DomainMetadata {
domain: "zach3".to_string(),
configuration: "cfg3".to_string(),
removed: true,
},
)]);
assert_eq!(actual, expected);
let expected = DomainMetadataMap::from([]);
assert_eq!(domain_metadata_visitor.into_domain_metadatas(), expected);
let mut domain_metadata_visitor =
DomainMetadataVisitor::new(Some(HashSet::from(["notexist".to_string()])));
domain_metadata_visitor
.visit_rows_of(commit_1.as_ref())
.unwrap();
domain_metadata_visitor
.visit_rows_of(commit_0.as_ref())
.unwrap();
assert!(domain_metadata_visitor.domain_metadatas.is_empty());
}
/// Exercises `DomainMetadataVisitor` with filters that name more than one domain.
///
/// `commit_1` is always visited before `commit_0`, so when a domain appears in both
/// batches the `commit_1` entry is the one kept (e.g. `zach2` resolves to `cfg2`,
/// not `old_cfg2`). Each case checks when `filter_found` becomes true and which
/// entries survive `into_domain_metadatas`.
#[test]
fn test_domain_metadata_visitor_multi_domain_filter() {
    let commit_1: Box<dyn EngineData> = parse_json_batch(
        vec![
            r#"{"domainMetadata":{"domain":"zach1","configuration":"cfg1","removed":true}}"#,
            r#"{"domainMetadata":{"domain":"zach2","configuration":"cfg2","removed":false}}"#,
            r#"{"domainMetadata":{"domain":"zach3","configuration":"cfg3","removed":true}}"#,
            r#"{"domainMetadata":{"domain":"zach4","configuration":"cfg4","removed":false}}"#,
            r#"{"domainMetadata":{"domain":"zach5","configuration":"cfg5","removed":true}}"#,
            r#"{"domainMetadata":{"domain":"zach6","configuration":"cfg6","removed":false}}"#,
        ]
        .into(),
    );
    let commit_0: Box<dyn EngineData> = parse_json_batch(
        vec![
            r#"{"domainMetadata":{"domain":"zach1","configuration":"old_cfg1","removed":true}}"#,
            r#"{"domainMetadata":{"domain":"zach2","configuration":"old_cfg2","removed":false}}"#,
            r#"{"domainMetadata":{"domain":"zach3","configuration":"old_cfg3","removed":false}}"#,
            r#"{"domainMetadata":{"domain":"zach4","configuration":"old_cfg4","removed":true}}"#,
            r#"{"domainMetadata":{"domain":"zach7","configuration":"cfg7","removed":true}}"#,
            r#"{"domainMetadata":{"domain":"zach8","configuration":"cfg8","removed":false}}"#,
        ]
        .into(),
    );

    // Both requested domains are present in `commit_1`, so that batch alone satisfies
    // the filter.
    let mut visitor = DomainMetadataVisitor::new(Some(HashSet::from([
        "zach2".to_string(),
        "zach4".to_string(),
    ])));
    assert!(!visitor.filter_found());
    visitor.visit_rows_of(commit_1.as_ref()).unwrap();
    assert!(visitor.filter_found());
    let result = visitor.into_domain_metadatas();
    assert_eq!(result.len(), 2);
    assert_eq!(result["zach2"].configuration, "cfg2");
    assert_eq!(result["zach4"].configuration, "cfg4");

    // `zach8` only exists in `commit_0`: the filter stays unsatisfied after `commit_1`,
    // and `zach2` keeps its `commit_1` value even after `commit_0` is visited.
    let mut visitor = DomainMetadataVisitor::new(Some(HashSet::from([
        "zach2".to_string(),
        "zach8".to_string(),
    ])));
    visitor.visit_rows_of(commit_1.as_ref()).unwrap();
    assert!(!visitor.filter_found());
    visitor.visit_rows_of(commit_0.as_ref()).unwrap();
    assert!(visitor.filter_found());
    let result = visitor.into_domain_metadatas();
    assert_eq!(result.len(), 2);
    assert_eq!(result["zach2"].configuration, "cfg2");
    assert_eq!(result["zach8"].configuration, "cfg8");

    // `zach3` is `removed: true` in `commit_1`. It still counts toward satisfying the
    // filter, but `into_domain_metadatas` drops it, leaving only `zach6`.
    let mut visitor = DomainMetadataVisitor::new(Some(HashSet::from([
        "zach3".to_string(),
        "zach6".to_string(),
    ])));
    visitor.visit_rows_of(commit_1.as_ref()).unwrap();
    assert!(visitor.filter_found());
    let result = visitor.into_domain_metadatas();
    assert_eq!(result.len(), 1);
    assert!(!result.contains_key("zach3"));
    assert_eq!(result["zach6"].configuration, "cfg6");

    // Domains that appear in neither commit never satisfy the filter.
    let mut visitor = DomainMetadataVisitor::new(Some(HashSet::from([
        "ghost1".to_string(),
        "ghost2".to_string(),
    ])));
    visitor.visit_rows_of(commit_1.as_ref()).unwrap();
    visitor.visit_rows_of(commit_0.as_ref()).unwrap();
    assert!(!visitor.filter_found());
    assert!(visitor.into_domain_metadatas().is_empty());
}
/// JSON for a single `add` action on `file1` with partition values `c1=6`, `c2=a`.
fn add_action() -> &'static str {
    concat!(
        r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"#,
        r#""size":452,"modificationTime":1670892998137,"dataChange":true}}"#
    )
}
/// JSON for a `commitInfo` action whose `inCommitTimestamp` is `1677811178585`.
fn commit_info_action() -> &'static str {
    concat!(
        r#"{"commitInfo":{"inCommitTimestamp":1677811178585, "timestamp":1677811178585,"#,
        r#""operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"#,
        r#""isolationLevel":"WriteSerializable","isBlindAppend":true,"#,
        r#""operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"#,
        r#""engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#
    )
}
/// Projects a batch parsed with the commit schema down to the nested
/// `commitInfo.inCommitTimestamp` column, typed as `InCommitTimestampVisitor::schema()`.
///
/// Panics if the evaluator cannot be built or evaluation fails (test helper).
fn transform_batch(batch: Box<dyn EngineData>) -> Box<dyn EngineData> {
    let engine = SyncEngine::new();
    // Wrap the single column twice: the inner struct stands in for `commitInfo`,
    // the outer one for the top-level row.
    let ict_column = column_expr_ref!("commitInfo.inCommitTimestamp");
    let commit_info_struct = Expression::struct_from([ict_column]);
    let projection = Expression::struct_from([Arc::new(commit_info_struct)]);
    let evaluator = engine
        .evaluation_handler()
        .new_expression_evaluator(
            get_commit_schema().clone(),
            projection.into(),
            InCommitTimestampVisitor::schema().into(),
        )
        .unwrap();
    evaluator.evaluate(batch.as_ref()).unwrap()
}
fn run_timestamp_visitor_test(json_strings: Vec<&str>, expected_timestamp: Option<i64>) {
let json_strings: StringArray = json_strings.into();
let batch = parse_json_batch(json_strings);
let batch = transform_batch(batch);
let mut visitor = InCommitTimestampVisitor::default();
visitor.visit_rows_of(batch.as_ref()).unwrap();
assert_eq!(visitor.in_commit_timestamp, expected_timestamp);
}
/// A `commitInfo` that follows another action yields no timestamp.
/// NOTE(review): presumably the visitor only inspects the first row — confirm.
#[test]
fn commit_info_not_first() {
    let actions = vec![add_action(), commit_info_action()];
    run_timestamp_visitor_test(actions, None);
}
/// A batch without any `commitInfo` action yields no timestamp.
#[test]
fn commit_info_not_present() {
    let actions = vec![add_action()];
    run_timestamp_visitor_test(actions, None);
}
/// A leading `commitInfo` action yields its `inCommitTimestamp`.
#[test]
fn commit_info_get() {
    let actions = vec![commit_info_action(), add_action()];
    let expected_ict = Some(1677811178585);
    run_timestamp_visitor_test(actions, expected_ict);
}
/// Wraps `values` in a one-column Arrow batch whose non-nullable boolean column is
/// named `output`, which is the column `SelectionVectorVisitor` selects.
fn create_boolean_batch(values: Vec<bool>) -> Box<dyn EngineData> {
    let output_field = Field::new("output", DataType::Boolean, false);
    let schema = Arc::new(ArrowSchema::new(vec![output_field]));
    let column = BooleanArray::from(values);
    let record_batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();
    Box::new(ArrowEngineData::new(record_batch))
}
/// Checks that `SelectionVectorVisitor` copies every selection flag through unchanged
/// and counts exactly the `false` (filtered-out) rows.
///
/// The case description is included in every assertion message, so a failure
/// identifies which case broke.
#[rstest::rstest]
#[case::empty_batch(vec![], 0, "empty batch should have no filtered rows")]
#[case::all_selected(vec![true, true, true, true], 0, "all selected should have no filtered rows")]
#[case::all_filtered(vec![false, false, false, false, false], 5, "all filtered should count all rows")]
#[case::mixed_selection(vec![true, false, true, false, false, true], 3, "mixed selection should count false values")]
fn selection_vector_visitor_counter_accuracy(
    #[case] input: Vec<bool>,
    #[case] expected_filtered: u64,
    #[case] description: &str,
) {
    // Guard the case table itself: the expected count must equal the number of `false`s.
    let false_count = input.iter().filter(|&&selected| !selected).count();
    assert_eq!(
        u64::try_from(false_count).unwrap(),
        expected_filtered,
        "inconsistent test case: {description}"
    );

    let batch = create_boolean_batch(input.clone());
    let mut visitor = SelectionVectorVisitor::default();
    visitor.visit_rows_of(batch.as_ref()).unwrap();
    assert_eq!(visitor.selection_vector, input, "{description}");
    assert_eq!(visitor.num_filtered, expected_filtered, "{description}");
}
}