use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
use crate::log_replay::deduplicator::Deduplicator as _;
use crate::log_replay::{
ActionsBatch, FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor,
};
use crate::scan::data_skipping::DataSkippingFilter;
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
use crate::{DeltaResult, Error};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, LazyLock};
/// Log-replay processor that reconciles a stream of action batches (visited
/// newest-first) down to the winning actions: the latest file action per
/// unique file key, the first protocol/metadata action seen, one txn per
/// app id, and one domainMetadata per domain.
pub(crate) struct ActionReconciliationProcessor {
    /// File-action keys already encountered in earlier (newer) batches.
    seen_file_keys: HashSet<FileActionKey>,
    /// Whether a protocol action has already been selected.
    seen_protocol: bool,
    /// Whether a metaData action has already been selected.
    seen_metadata: bool,
    /// App ids of txn actions already selected.
    seen_txns: HashSet<String>,
    /// Domains of domainMetadata actions already selected.
    seen_domains: HashSet<String>,
    /// Remove tombstones with `deletionTimestamp <=` this value are dropped.
    minimum_file_retention_timestamp: i64,
    /// If set, txn actions with `lastUpdated <=` this value are dropped.
    txn_expiration_timestamp: Option<i64>,
}
/// One processed batch: the filtered engine data plus counts of the rows
/// that were selected in it.
pub(crate) struct ActionReconciliationBatch {
    /// The batch data together with its computed selection vector.
    pub(crate) filtered_data: FilteredEngineData,
    /// Number of selected actions of any kind in this batch.
    pub(crate) actions_count: i64,
    /// Number of selected `add` actions in this batch.
    pub(crate) add_actions_count: i64,
}
impl HasSelectionVector for ActionReconciliationBatch {
    /// Delegates to the underlying filtered data: true if any row survived.
    fn has_selected_rows(&self) -> bool {
        self.filtered_data.has_selected_rows()
    }
}
/// Shared, thread-safe counters accumulated while an
/// `ActionReconciliationIterator` is consumed. Handed out via `Arc` so
/// callers can observe progress/totals from outside the iterator.
#[derive(Debug, Default)]
pub struct ActionReconciliationIteratorState {
    // Running total of selected actions across all yielded batches.
    actions_count: AtomicI64,
    // Running total of selected `add` actions across all yielded batches.
    add_actions_count: AtomicI64,
    // Set once the inner iterator returns `None`.
    is_exhausted: AtomicBool,
}
impl ActionReconciliationIteratorState {
    /// Total number of actions selected so far (all action kinds).
    pub fn actions_count(&self) -> i64 {
        self.actions_count.load(Ordering::Acquire)
    }
    /// Number of selected `add` actions so far.
    pub fn add_actions_count(&self) -> i64 {
        self.add_actions_count.load(Ordering::Acquire)
    }
    /// True once the wrapped iterator has been fully drained.
    pub fn is_exhausted(&self) -> bool {
        self.is_exhausted.load(Ordering::Acquire)
    }
}
/// Iterator adapter that yields each batch's `FilteredEngineData` while
/// folding the per-batch counts into a shared [`ActionReconciliationIteratorState`].
pub struct ActionReconciliationIterator {
    // Source of processed batches; boxed so the concrete iterator type is erased.
    inner: Box<dyn Iterator<Item = DeltaResult<ActionReconciliationBatch>> + Send>,
    // Counters shared with callers via `state()`.
    state: Arc<ActionReconciliationIteratorState>,
}
impl ActionReconciliationIterator {
    /// Wraps a batch iterator, attaching a fresh zeroed counter state.
    pub(crate) fn new(
        inner: Box<dyn Iterator<Item = DeltaResult<ActionReconciliationBatch>> + Send>,
    ) -> Self {
        Self {
            inner,
            state: Arc::new(ActionReconciliationIteratorState::default()),
        }
    }
    /// Returns a shared handle to the running counters; valid to read while
    /// (or after) the iterator is consumed.
    pub fn state(&self) -> Arc<ActionReconciliationIteratorState> {
        Arc::clone(&self.state)
    }
    /// Folds a batch's counts into the shared state and unwraps it to its
    /// `FilteredEngineData`. Marks the state exhausted when the inner
    /// iterator ends.
    fn transform_batch(
        &mut self,
        batch: Option<DeltaResult<ActionReconciliationBatch>>,
    ) -> Option<DeltaResult<FilteredEngineData>> {
        let Some(batch) = batch else {
            self.state.is_exhausted.store(true, Ordering::Release);
            return None;
        };
        Some(batch.map(|batch| {
            // Counters are only updated for Ok batches; errors pass through
            // untouched and do not affect the totals.
            self.state
                .actions_count
                .fetch_add(batch.actions_count, Ordering::Release);
            self.state
                .add_actions_count
                .fetch_add(batch.add_actions_count, Ordering::Release);
            batch.filtered_data
        }))
    }
}
// Manual Debug impl: the boxed `inner` iterator is not Debug, so only the
// counter state is shown.
impl std::fmt::Debug for ActionReconciliationIterator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActionReconciliationIterator")
            .field("state", &self.state)
            .finish()
    }
}
impl Iterator for ActionReconciliationIterator {
    type Item = DeltaResult<FilteredEngineData>;
    fn next(&mut self) -> Option<Self::Item> {
        // Pull from the inner iterator first, then account for the batch
        // (or record exhaustion) in `transform_batch`.
        let batch = self.inner.next();
        self.transform_batch(batch)
    }
}
impl LogReplayProcessor for ActionReconciliationProcessor {
    type Output = ActionReconciliationBatch;
    /// Visits one batch of actions and computes its selection vector,
    /// threading the dedup state (seen files/protocol/metadata/txns/domains)
    /// through to subsequent batches.
    fn process_actions_batch(&mut self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
        let ActionsBatch {
            actions,
            is_log_batch,
        } = actions_batch;
        // Start with every row selected; the visitor clears losing rows.
        let selection_vector = vec![true; actions.len()];
        let mut visitor = ActionReconciliationVisitor::new(
            &mut self.seen_file_keys,
            is_log_batch,
            selection_vector,
            self.minimum_file_retention_timestamp,
            self.seen_protocol,
            self.seen_metadata,
            &mut self.seen_txns,
            &mut self.seen_domains,
            self.txn_expiration_timestamp,
        );
        visitor.visit_rows_of(actions.as_ref())?;
        // Persist the once-only flags the visitor may have flipped so later
        // batches drop duplicate protocol/metadata actions.
        self.seen_protocol = visitor.seen_protocol;
        self.seen_metadata = visitor.seen_metadata;
        let filtered_data = FilteredEngineData::try_new(actions, visitor.selection_vector)?;
        Ok(ActionReconciliationBatch {
            filtered_data,
            actions_count: visitor.actions_count,
            add_actions_count: visitor.add_actions_count,
        })
    }
    /// No data skipping during reconciliation: every action row must be visited.
    fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> {
        None
    }
}
impl ActionReconciliationProcessor {
pub(crate) fn new(
minimum_file_retention_timestamp: i64,
txn_expiration_timestamp: Option<i64>,
) -> Self {
Self {
seen_file_keys: Default::default(),
seen_protocol: false,
seen_metadata: false,
seen_txns: Default::default(),
seen_domains: Default::default(),
minimum_file_retention_timestamp,
txn_expiration_timestamp,
}
}
}
/// Row visitor that decides, per action row, whether the row survives
/// reconciliation. Borrows the processor's dedup state for the lifetime of
/// one batch.
pub(crate) struct ActionReconciliationVisitor<'seen> {
    /// Tracks file-action keys across batches and extracts them from rows.
    deduplicator: FileActionDeduplicator<'seen>,
    /// One flag per row; `visit` overwrites each entry with the verdict.
    selection_vector: Vec<bool>,
    /// Selected actions of any kind in this batch.
    actions_count: i64,
    /// Selected `add` actions in this batch.
    add_actions_count: i64,
    /// Remove tombstones with `deletionTimestamp <=` this are expired.
    minimum_file_retention_timestamp: i64,
    /// Local copies of the once-only flags; written back by the processor.
    seen_protocol: bool,
    seen_metadata: bool,
    /// App ids already selected (shared across batches).
    seen_txns: &'seen mut HashSet<String>,
    /// Domains already selected (shared across batches).
    seen_domains: &'seen mut HashSet<String>,
    /// If set, txn actions with `lastUpdated <=` this are dropped.
    txn_expiration_timestamp: Option<i64>,
}
/// Pairs a getter's position in the getters slice with the column name used
/// for error reporting, so index and name cannot drift apart at call sites.
#[derive(Debug, Copy, Clone)]
struct GetterColumn {
    index: usize,
    name: &'static str,
}

impl GetterColumn {
    /// Const constructor so columns can be declared as associated constants.
    const fn new(index: usize, name: &'static str) -> Self {
        Self { index, name }
    }
}
#[allow(unused)]
impl ActionReconciliationVisitor<'_> {
    // Getter positions below must match the column order declared in
    // `selected_column_names_and_types` (see the RowVisitor impl).
    const ADD_PATH: GetterColumn = GetterColumn::new(0, "add.path");
    const ADD_DV_STORAGE_TYPE: GetterColumn =
        GetterColumn::new(1, "add.deletionVector.storageType");
    const ADD_DV_PATH_OR_INLINE_DV: GetterColumn =
        GetterColumn::new(2, "add.deletionVector.pathOrInlineDv");
    const ADD_DV_OFFSET: GetterColumn = GetterColumn::new(3, "add.deletionVector.offset");
    const REMOVE_PATH: GetterColumn = GetterColumn::new(4, "remove.path");
    const REMOVE_DELETION_TIMESTAMP: GetterColumn =
        GetterColumn::new(5, "remove.deletionTimestamp");
    const REMOVE_DV_STORAGE_TYPE: GetterColumn =
        GetterColumn::new(6, "remove.deletionVector.storageType");
    const REMOVE_DV_PATH_OR_INLINE_DV: GetterColumn =
        GetterColumn::new(7, "remove.deletionVector.pathOrInlineDv");
    const REMOVE_DV_OFFSET: GetterColumn = GetterColumn::new(8, "remove.deletionVector.offset");
    const METADATA_ID: GetterColumn = GetterColumn::new(9, "metaData.id");
    const PROTOCOL_MIN_READER_VERSION: GetterColumn =
        GetterColumn::new(10, "protocol.minReaderVersion");
    const TXN_APP_ID: GetterColumn = GetterColumn::new(11, "txn.appId");
    const TXN_LAST_UPDATED: GetterColumn = GetterColumn::new(12, "txn.lastUpdated");
    const DOMAIN_METADATA_DOMAIN: GetterColumn = GetterColumn::new(13, "domainMetadata.domain");
    const DOMAIN_METADATA_REMOVED: GetterColumn = GetterColumn::new(14, "domainMetadata.removed");

    /// Creates a visitor for one batch. `selection_vector` must have one
    /// entry per row; `visit` overwrites it with per-row verdicts.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new<'seen>(
        seen_file_keys: &'seen mut HashSet<FileActionKey>,
        is_log_batch: bool,
        selection_vector: Vec<bool>,
        minimum_file_retention_timestamp: i64,
        seen_protocol: bool,
        seen_metadata: bool,
        seen_txns: &'seen mut HashSet<String>,
        seen_domains: &'seen mut HashSet<String>,
        txn_expiration_timestamp: Option<i64>,
    ) -> ActionReconciliationVisitor<'seen> {
        ActionReconciliationVisitor {
            deduplicator: FileActionDeduplicator::new(
                seen_file_keys,
                is_log_batch,
                Self::ADD_PATH.index,
                Self::REMOVE_PATH.index,
                Self::ADD_DV_STORAGE_TYPE.index,
                Self::REMOVE_DV_STORAGE_TYPE.index,
            ),
            selection_vector,
            actions_count: 0,
            add_actions_count: 0,
            minimum_file_retention_timestamp,
            seen_protocol,
            seen_metadata,
            seen_txns,
            seen_domains,
            txn_expiration_timestamp,
        }
    }

    /// True if a remove tombstone's deletionTimestamp is at or below the
    /// retention threshold. A missing timestamp is treated as 0, i.e. expired
    /// whenever the threshold is non-negative.
    fn is_expired_tombstone<'a>(&self, i: usize, getter: &'a dyn GetData<'a>) -> DeltaResult<bool> {
        let deletion_timestamp = getter.get_opt(i, Self::REMOVE_DELETION_TIMESTAMP.name)?;
        let deletion_timestamp = deletion_timestamp.unwrap_or(0i64);
        Ok(deletion_timestamp <= self.minimum_file_retention_timestamp)
    }

    /// Returns `None` if row `i` is not a file action; otherwise
    /// `Some(keep)`. A duplicate file key is dropped; a first-seen add is
    /// kept (and counted); a first-seen remove is kept unless its tombstone
    /// has expired.
    fn check_file_action<'a>(
        &mut self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<Option<bool>> {
        let Some((file_key, is_add)) = self.deduplicator.extract_file_action(i, getters, false)?
        else {
            return Ok(None);
        };
        let is_valid = if self.deduplicator.check_and_record_seen(file_key) {
            // Already reconciled by a newer batch.
            false
        } else if is_add {
            self.add_actions_count += 1;
            true
        } else {
            !self.is_expired_tombstone(i, getters[Self::REMOVE_DELETION_TIMESTAMP.index])?
        };
        Ok(Some(is_valid))
    }

    /// Returns `None` if row `i` is not a protocol action; otherwise keeps
    /// only the first protocol action seen across all batches.
    fn check_protocol_action<'a>(
        &mut self,
        i: usize,
        getter: &'a dyn GetData<'a>,
    ) -> DeltaResult<Option<bool>> {
        let result = getter
            .get_int(i, Self::PROTOCOL_MIN_READER_VERSION.name)?
            .is_some()
            // replace() returns the previous flag: keep iff not seen before.
            .then(|| !std::mem::replace(&mut self.seen_protocol, true));
        Ok(result)
    }

    /// Returns `None` if row `i` is not a metaData action; otherwise keeps
    /// only the first metaData action seen across all batches.
    fn check_metadata_action<'a>(
        &mut self,
        i: usize,
        getter: &'a dyn GetData<'a>,
    ) -> DeltaResult<Option<bool>> {
        let result = getter
            .get_str(i, Self::METADATA_ID.name)?
            .is_some()
            .then(|| !std::mem::replace(&mut self.seen_metadata, true));
        Ok(result)
    }

    /// Returns `None` if row `i` is not a txn action; otherwise keeps the
    /// first non-expired txn per appId. Txns without `lastUpdated` never
    /// expire.
    ///
    /// NOTE(review): an expired txn is dropped without recording its appId in
    /// `seen_txns`, so an older entry for the same appId (e.g. one lacking
    /// `lastUpdated`) in a later batch could still be selected — confirm this
    /// is intended.
    fn check_txn_action<'a>(
        &mut self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<Option<bool>> {
        let Some(app_id) = getters[Self::TXN_APP_ID.index].get_str(i, Self::TXN_APP_ID.name)?
        else {
            return Ok(None);
        };
        if let Some(retention_ts) = self.txn_expiration_timestamp {
            if let Some(last_updated) =
                getters[Self::TXN_LAST_UPDATED.index].get_opt(i, Self::TXN_LAST_UPDATED.name)?
            {
                let last_updated: i64 = last_updated;
                if last_updated <= retention_ts {
                    return Ok(Some(false));
                }
            }
        }
        // insert() is true only for the first occurrence of this appId.
        Ok(Some(self.seen_txns.insert(app_id.to_string())))
    }

    /// Returns `None` if row `i` is not a domainMetadata action; drops
    /// removal tombstones and keeps the first occurrence per domain.
    ///
    /// NOTE(review): a removed domain is dropped without being added to
    /// `seen_domains`, so an older create of the same domain in a later
    /// batch would still be selected — verify this matches the intended
    /// reconciliation semantics.
    fn check_domain_metadata_action<'a>(
        &mut self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<Option<bool>> {
        let Some(domain) = getters[Self::DOMAIN_METADATA_DOMAIN.index]
            .get_str(i, Self::DOMAIN_METADATA_DOMAIN.name)?
        else {
            return Ok(None);
        };
        let removed: bool = getters[Self::DOMAIN_METADATA_REMOVED.index]
            .get_opt(i, Self::DOMAIN_METADATA_REMOVED.name)?
            .unwrap_or(false);
        if removed {
            return Ok(Some(false));
        }
        Ok(Some(self.seen_domains.insert(domain.to_string())))
    }

    /// Classifies row `i` by trying each action kind in turn (file, txn,
    /// domainMetadata, protocol, metaData) and returns whether it survives.
    /// A row matching none of the kinds is dropped (`unwrap_or_default`).
    pub(crate) fn is_valid_action<'a>(
        &mut self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<bool> {
        let is_valid = if let Some(result) = self.check_file_action(i, getters)? {
            result
        } else if let Some(result) = self.check_txn_action(i, getters)? {
            result
        } else if let Some(result) = self.check_domain_metadata_action(i, getters)? {
            result
        } else if let Some(result) =
            self.check_protocol_action(i, getters[Self::PROTOCOL_MIN_READER_VERSION.index])?
        {
            result
        } else {
            self.check_metadata_action(i, getters[Self::METADATA_ID.index])?
                .unwrap_or_default()
        };
        if is_valid {
            self.actions_count += 1;
        }
        Ok(is_valid)
    }
}
impl RowVisitor for ActionReconciliationVisitor<'_> {
    /// Leaf columns (and types) this visitor reads. The order here defines
    /// the getter indices and must match the `GetterColumn` constants above.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
            const STRING: DataType = DataType::STRING;
            const INTEGER: DataType = DataType::INTEGER;
            const LONG: DataType = DataType::LONG;
            const BOOLEAN: DataType = DataType::BOOLEAN;
            let types_and_names = vec![
                (STRING, column_name!("add.path")),
                (STRING, column_name!("add.deletionVector.storageType")),
                (STRING, column_name!("add.deletionVector.pathOrInlineDv")),
                (INTEGER, column_name!("add.deletionVector.offset")),
                (STRING, column_name!("remove.path")),
                (LONG, column_name!("remove.deletionTimestamp")),
                (STRING, column_name!("remove.deletionVector.storageType")),
                (STRING, column_name!("remove.deletionVector.pathOrInlineDv")),
                (INTEGER, column_name!("remove.deletionVector.offset")),
                (STRING, column_name!("metaData.id")),
                (INTEGER, column_name!("protocol.minReaderVersion")),
                (STRING, column_name!("txn.appId")),
                (LONG, column_name!("txn.lastUpdated")),
                (STRING, column_name!("domainMetadata.domain")),
                (BOOLEAN, column_name!("domainMetadata.removed")),
            ];
            let (types, names) = types_and_names.into_iter().unzip();
            (names, types).into()
        });
        NAMES_AND_TYPES.as_ref()
    }
    /// Evaluates every row and records the verdicts in `selection_vector`.
    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        // 15 = number of columns declared above; a mismatch means the caller
        // passed getters for a different schema.
        require!(
            getters.len() == 15,
            Error::InternalError(format!(
                "Wrong number of visitor getters: {}",
                getters.len()
            ))
        );
        for i in 0..row_count {
            self.selection_vector[i] = self.is_valid_action(i, getters)?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use crate::arrow::array::StringArray;
use crate::utils::test_utils::{action_batch, parse_json_batch};
use crate::Error;
use itertools::Itertools;
fn create_batch(json_strings: Vec<&str>) -> DeltaResult<ActionsBatch> {
let actions = parse_json_batch(StringArray::from(json_strings));
Ok(ActionsBatch::new(actions, true))
}
    /// Runs the batches through a fresh processor (no tombstone retention,
    /// no txn expiration) and returns (filtered batches, total selected
    /// actions, selected add actions).
    fn run_action_reconciliation_test(
        input_batches: Vec<ActionsBatch>,
    ) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
        let processed_batches: Vec<_> = ActionReconciliationProcessor::new(0, None)
            .process_actions_iter(input_batches.into_iter().map(Ok))
            .try_collect()?;
        let total_count: i64 = processed_batches.iter().map(|b| b.actions_count).sum();
        let add_count: i64 = processed_batches.iter().map(|b| b.add_actions_count).sum();
        let filtered_data = processed_batches
            .into_iter()
            .map(|b| b.filtered_data)
            .collect();
        Ok((filtered_data, total_count, add_count))
    }
#[test]
fn test_action_reconciliation_visitor() -> DeltaResult<()> {
let data = action_batch();
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
true,
vec![true; 9],
0, false,
false,
&mut seen_txns,
&mut seen_domains,
None,
);
visitor.visit_rows_of(data.as_ref())?;
let expected = vec![
true, true, false, true, true, false, false, true, false, ];
assert_eq!(visitor.actions_count, 5);
assert_eq!(visitor.add_actions_count, 1);
assert!(visitor.seen_protocol);
assert!(visitor.seen_metadata);
assert_eq!(visitor.seen_txns.len(), 1);
assert_eq!(visitor.selection_vector, expected);
Ok(())
}
#[test]
fn test_action_reconciliation_visitor_boundary_cases_for_tombstone_expiration(
) -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"remove":{"path":"exactly_at_threshold","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
r#"{"remove":{"path":"one_below_threshold","deletionTimestamp":99,"dataChange":true,"partitionValues":{}}}"#,
r#"{"remove":{"path":"one_above_threshold","deletionTimestamp":101,"dataChange":true,"partitionValues":{}}}"#,
r#"{"remove":{"path":"missing_timestamp","dataChange":true,"partitionValues":{}}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
true,
vec![true; 4],
100, false,
false,
&mut seen_txns,
&mut seen_domains,
None,
);
visitor.visit_rows_of(batch.as_ref())?;
let expected = vec![false, false, true, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.actions_count, 1);
assert_eq!(visitor.add_actions_count, 0);
Ok(())
}
#[test]
fn test_action_reconciliation_visitor_file_actions_in_batch() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
false, vec![true; 1],
0,
false,
false,
&mut seen_txns,
&mut seen_domains,
None,
);
visitor.visit_rows_of(batch.as_ref())?;
let expected = vec![true];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.actions_count, 1);
assert_eq!(visitor.add_actions_count, 1);
assert!(seen_file_keys.is_empty());
Ok(())
}
#[test]
fn test_action_reconciliation_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"THREE","pathOrInlineDv":"dv3","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
true,
vec![true; 3],
0,
false,
false,
&mut seen_txns,
&mut seen_domains,
None,
);
visitor.visit_rows_of(batch.as_ref())?;
let expected = vec![true, true, true];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.actions_count, 3);
assert_eq!(visitor.add_actions_count, 1);
Ok(())
}
#[test]
fn test_action_reconciliation_visitor_already_seen_non_file_actions() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
].into();
let batch = parse_json_batch(json_strings);
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
seen_txns.insert("app1".to_string());
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
true,
vec![true; 3],
0,
true, true, &mut seen_txns, &mut seen_domains,
None,
);
visitor.visit_rows_of(batch.as_ref())?;
let expected = vec![false, false, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.actions_count, 0);
Ok(())
}
#[test]
fn test_action_reconciliation_visitor_duplicate_non_file_actions() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#, r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#, r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7}}"#, r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
true, vec![true; 7],
0, false,
false,
&mut seen_txns,
&mut seen_domains,
None,
);
visitor.visit_rows_of(batch.as_ref())?;
let expected = vec![true, false, true, true, false, true, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.seen_txns.len(), 2); assert_eq!(visitor.actions_count, 4);
Ok(())
}
#[test]
fn test_action_reconciliation_actions_iter_non_file_actions() -> DeltaResult<()> {
let batch1 = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
];
let batch2 = vec![
r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#,
r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#,
];
let batch3 = vec![r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#];
let input_batches = vec![
create_batch(batch1)?,
create_batch(batch2)?,
create_batch(batch3)?,
];
let (results, actions_count, add_actions) = run_action_reconciliation_test(input_batches)?;
assert_eq!(results.len(), 2, "Expected two batches in results");
assert_eq!(results[0].selection_vector(), &vec![true, true, true]);
assert_eq!(
results[1].selection_vector(),
&vec![false, false, false, true]
);
assert_eq!(actions_count, 4);
assert_eq!(add_actions, 0);
Ok(())
}
#[test]
fn test_action_reconciliation_actions_iter_file_actions() -> DeltaResult<()> {
let batch1 = vec![
r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
];
let batch2 = vec![
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
];
let batch3 = vec![
r#"{"add":{"path":"file2","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
];
let input_batches = vec![
create_batch(batch1)?,
create_batch(batch2)?,
create_batch(batch3)?,
];
let (results, actions_count, add_actions) = run_action_reconciliation_test(input_batches)?;
assert_eq!(results.len(), 2); assert_eq!(results[0].selection_vector(), &vec![true]);
assert_eq!(results[1].selection_vector(), &vec![false, true]);
assert_eq!(actions_count, 2);
assert_eq!(add_actions, 1);
Ok(())
}
#[test]
fn test_action_reconciliation_actions_iter_file_actions_with_deletion_vectors(
) -> DeltaResult<()> {
let batch1 = vec![
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
];
let batch2 = vec![
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
];
let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?];
let (results, actions_count, add_actions) = run_action_reconciliation_test(input_batches)?;
assert_eq!(results.len(), 2);
assert_eq!(results[0].selection_vector(), &vec![true, true]);
assert_eq!(results[1].selection_vector(), &vec![false, false, true]);
assert_eq!(actions_count, 3);
assert_eq!(add_actions, 2);
Ok(())
}
#[test]
fn test_action_reconciliation_visitor_txn_retention() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":100}}"#,
r#"{"txn":{"appId":"app2","version":2,"lastUpdated":2000}}"#,
r#"{"txn":{"appId":"app3","version":3}}"#,
r#"{"txn":{"appId":"app4","version":4,"lastUpdated":1000}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = ActionReconciliationVisitor::new(
&mut seen_file_keys,
true,
vec![true; 4],
0,
false,
false,
&mut seen_txns,
&mut seen_domains,
Some(1000), );
visitor.visit_rows_of(batch.as_ref())?;
let expected = vec![false, true, true, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.actions_count, 2);
assert_eq!(visitor.seen_txns.len(), 2);
assert!(visitor.seen_txns.contains("app2"));
assert!(visitor.seen_txns.contains("app3"));
Ok(())
}
#[test]
fn test_action_reconciliation_actions_iter_with_txn_retention() -> DeltaResult<()> {
let batch1 = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[]}","partitionColumns":[],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"txn":{"appId":"old_app","version":1,"lastUpdated":100}}"#,
r#"{"txn":{"appId":"new_app","version":2,"lastUpdated":2000}}"#,
];
let batch2 = vec![
r#"{"txn":{"appId":"timeless_app","version":3}}"#,
r#"{"txn":{"appId":"another_old","version":4,"lastUpdated":500}}"#,
];
let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?];
let processor = ActionReconciliationProcessor::new(0, Some(1000));
let results: Vec<_> = processor
.process_actions_iter(input_batches.into_iter().map(Ok))
.try_collect()?;
assert_eq!(results.len(), 2);
assert_eq!(
results[0].filtered_data.selection_vector(),
vec![true, true, false, true]
);
assert_eq!(results[0].actions_count, 3);
assert_eq!(
results[1].filtered_data.selection_vector(),
vec![true, false]
);
assert_eq!(results[1].actions_count, 1);
Ok(())
}
    /// Mock `GetData` implementations used to inject typed-getter errors at
    /// specific fields, for exercising the visitor's error paths.
    mod test_mocks {
        use super::*;
        /// Returns `Ok(None)` for every field except the configured one,
        /// which errors when queried with the configured accessor type.
        pub(super) struct MockErrorGetData {
            // Field name that should produce an error.
            error_on_field: &'static str,
            // Which accessor errors: "str" or "int".
            error_type: &'static str,
        }
        impl MockErrorGetData {
            pub(super) fn new(error_on_field: &'static str, error_type: &'static str) -> Self {
                Self {
                    error_on_field,
                    error_type,
                }
            }
            /// A mock that never errors (empty field/type never match).
            pub(super) fn default() -> Self {
                Self::new("", "")
            }
        }
        impl<'a> GetData<'a> for MockErrorGetData {
            fn get_str(&'a self, _: usize, field_name: &str) -> DeltaResult<Option<&'a str>> {
                if field_name == self.error_on_field && self.error_type == "str" {
                    Err(
                        Error::UnexpectedColumnType(format!("{field_name} is not of type str"))
                            .with_backtrace(),
                    )
                } else {
                    Ok(None)
                }
            }
            fn get_int(&'a self, _: usize, field_name: &str) -> DeltaResult<Option<i32>> {
                if field_name == self.error_on_field && self.error_type == "int" {
                    Err(
                        Error::UnexpectedColumnType(format!("{field_name} is not of type i32"))
                            .with_backtrace(),
                    )
                } else {
                    Ok(None)
                }
            }
        }
        /// Mock that answers fixed strings for txn.appId / remove.path (so the
        /// visitor reaches the long-typed fields) and errors on any long field
        /// whose name contains `error_field`.
        pub(super) struct FlexibleMock {
            pub(super) error_field: &'static str,
        }
        impl<'a> GetData<'a> for FlexibleMock {
            fn get_str(&'a self, _: usize, field_name: &str) -> DeltaResult<Option<&'a str>> {
                if field_name == "txn.appId" {
                    Ok(Some("test_app"))
                } else if field_name == "remove.path" {
                    Ok(Some("test_path"))
                } else {
                    Ok(None)
                }
            }
            fn get_long(&'a self, _: usize, field_name: &str) -> DeltaResult<Option<i64>> {
                if field_name.contains(self.error_field) {
                    Err(
                        Error::UnexpectedColumnType(format!("{field_name} is not of type i64"))
                            .with_backtrace(),
                    )
                } else {
                    Ok(None)
                }
            }
        }
    }
use test_mocks::*;
    /// Builds a single-row visitor over the given (empty) dedup sets with
    /// default retention settings, for error-path tests.
    fn create_test_visitor<'a>(
        seen_file_keys: &'a mut HashSet<FileActionKey>,
        seen_txns: &'a mut HashSet<String>,
        seen_domains: &'a mut HashSet<String>,
        txn_expiration_timestamp: Option<i64>,
    ) -> ActionReconciliationVisitor<'a> {
        ActionReconciliationVisitor::new(
            seen_file_keys,
            true,
            vec![true; 1],
            0,
            false,
            false,
            seen_txns,
            seen_domains,
            txn_expiration_timestamp,
        )
    }
fn create_getters_with_error_at_index(
error_index: usize,
error_field: &'static str,
error_type: &'static str,
) -> Vec<MockErrorGetData> {
(0..15)
.map(|i| {
if i == error_index {
MockErrorGetData::new(error_field, error_type)
} else {
MockErrorGetData::default()
}
})
.collect()
}
#[test]
fn test_action_reconciliation_visitor_validation_and_type_errors() {
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor =
create_test_visitor(&mut seen_file_keys, &mut seen_txns, &mut seen_domains, None);
let getter = MockErrorGetData::default();
let getters = vec![&getter as &dyn GetData<'_>; 5]; let result = visitor.visit(1, &getters);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Wrong number of visitor getters"));
let test_cases = [
(0, "add.path", "str", "add.path is not of type str"),
(9, "metaData.id", "str", "metaData.id is not of type str"),
(
10,
"protocol.minReaderVersion",
"int",
"protocol.minReaderVersion is not of type i32",
),
(11, "txn.appId", "str", "txn.appId is not of type str"),
];
for (getter_index, field_name, error_type, expected_error_text) in test_cases {
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor =
create_test_visitor(&mut seen_file_keys, &mut seen_txns, &mut seen_domains, None);
let getters = create_getters_with_error_at_index(getter_index, field_name, error_type);
let getter_refs: Vec<&dyn GetData<'_>> =
getters.iter().map(|g| g as &dyn GetData<'_>).collect();
let result = visitor.visit(1, &getter_refs);
assert!(result.is_err(), "Expected error for {field_name}");
assert!(result
.unwrap_err()
.to_string()
.contains(expected_error_text));
}
}
#[test]
fn test_action_reconciliation_visitor_complex_field_errors() {
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor = create_test_visitor(
&mut seen_file_keys,
&mut seen_txns,
&mut seen_domains,
Some(1000),
);
let defaults = (0..11)
.map(|_| MockErrorGetData::default())
.collect::<Vec<_>>();
let error_mock = FlexibleMock {
error_field: "lastUpdated",
};
let domain_default = MockErrorGetData::default();
let domain_removed_default = MockErrorGetData::default();
let mut getters: Vec<&dyn GetData<'_>> =
defaults.iter().map(|g| g as &dyn GetData<'_>).collect();
getters.push(&error_mock); getters.push(&error_mock);
getters.push(&domain_default); getters.push(&domain_removed_default); let result = visitor.visit(1, &getters);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("lastUpdated is not of type i64"));
let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut seen_domains = HashSet::new();
let mut visitor =
create_test_visitor(&mut seen_file_keys, &mut seen_txns, &mut seen_domains, None);
let defaults = (0..4)
.map(|_| MockErrorGetData::default())
.collect::<Vec<_>>();
let error_mock = FlexibleMock {
error_field: "deletionTimestamp",
};
let defaults2 = (0..9)
.map(|_| MockErrorGetData::default())
.collect::<Vec<_>>();
let mut getters: Vec<&dyn GetData<'_>> =
defaults.iter().map(|g| g as &dyn GetData<'_>).collect();
getters.push(&error_mock); getters.push(&error_mock); getters.extend(defaults2.iter().map(|g| g as &dyn GetData<'_>));
let result = visitor.visit(1, &getters);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("deletionTimestamp is not of type i64"));
}
#[test]
fn test_action_reconciliation_processor_error_propagation() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"add":{"path":"test","partitionValues":{},"size":100,"modificationTime":123,"dataChange":true}}"#,
].into();
let actions = parse_json_batch(json_strings);
let batch = ActionsBatch::new(actions, true);
let mut processor = ActionReconciliationProcessor::new(0, None);
let result = processor.process_actions_batch(batch);
assert!(result.is_ok());
let action_reconciliation_batch = result.unwrap();
assert_eq!(action_reconciliation_batch.actions_count, 1);
assert_eq!(action_reconciliation_batch.add_actions_count, 1);
Ok(())
}
}