use std::collections::HashMap;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::actions::deletion_vector::{DeletionVectorDescriptor, DeletionVectorStorageType};
use delta_kernel::arrow::array::{Int32Array, RecordBatch};
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine_data::FilteredEngineData;
use delta_kernel::expressions::{column_expr, Scalar};
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::ObjectStoreExt as _;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::transaction::CommitResult;
use delta_kernel::{Expression as Expr, Predicate as Pred, Snapshot};
use itertools::Itertools;
use serde_json::Deserializer;
use tempfile::tempdir;
use test_utils::{
begin_transaction, copy_directory, create_default_engine, create_default_engine_mt_executor,
load_and_begin_transaction, read_actions_from_commit, setup_test_tables,
};
use url::Url;
use crate::common::write_utils::{
create_dv_table_with_files, get_scan_files, get_simple_int_schema, set_table_properties,
write_data_and_check_result_and_stats,
};
/// Removing the scanned file of the `table-with-dv-small` fixture (at version 1) must commit a
/// single `remove` action that carries the file's metadata — size, stats, tags and deletion
/// vector — alongside a `deletionTimestamp` equal to the commit's `commitInfo.timestamp`.
#[tokio::test]
async fn test_remove_files_adds_expected_entries() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();

    // Operate on a private copy so committing does not touch the checked-in fixture.
    let tmp_dir = tempdir()?;
    let tmp_table_path = tmp_dir.path().join("table-with-dv-small");
    let source_path =
        std::fs::canonicalize(std::path::PathBuf::from("./tests/data/table-with-dv-small/"))?;
    copy_directory(&source_path, &tmp_table_path)?;

    let table_url = url::Url::from_directory_path(&tmp_table_path).unwrap();
    let engine = create_default_engine(&table_url)?;
    let snapshot = Snapshot::builder_for(table_url.clone())
        .at_version(1)
        .build(engine.as_ref())?;

    let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?
        .with_engine_info("test engine")
        .with_data_change(true);

    // Hand the first scan-metadata batch, with its selection vector untouched, to `remove_files`.
    let scan = snapshot.scan_builder().build()?;
    let first_batch = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
    let (files, selection) = first_batch.scan_files.into_parts();
    txn.remove_files(FilteredEngineData::try_new(files, selection)?);

    let CommitResult::CommittedTransaction(committed) = txn.commit(engine.as_ref())? else {
        panic!("Transaction should be committed");
    };

    let commit_path = tmp_table_path.join(format!(
        "_delta_log/{:020}.json",
        committed.commit_version()
    ));
    let commit_content = std::fs::read_to_string(commit_path)?;
    let actions: Vec<_> = Deserializer::from_str(&commit_content)
        .into_iter::<serde_json::Value>()
        .try_collect()?;
    assert!(
        actions.len() >= 2,
        "Expected at least 2 actions (commitInfo + remove)"
    );

    let commit_info = actions
        .iter()
        .find_map(|action| action.get("commitInfo"))
        .expect("Missing commitInfo action");
    let commit_timestamp = commit_info["timestamp"]
        .as_i64()
        .expect("Missing timestamp in commitInfo");

    // Project each action line down to its `remove` payload (if any).
    let removes: Vec<_> = actions
        .iter()
        .filter_map(|action| action.get("remove"))
        .collect();
    assert!(!removes.is_empty(), "Expected at least one remove action");
    assert_eq!(removes.len(), 1);
    let remove = removes[0];

    assert!(remove.get("path").is_some(), "Missing path field");
    assert_eq!(
        remove["path"].as_str().expect("path should be a string"),
        "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
    );
    assert_eq!(remove["dataChange"].as_bool(), Some(true));
    assert_eq!(
        remove["deletionTimestamp"]
            .as_i64()
            .expect("Missing deletionTimestamp"),
        commit_timestamp,
        "deletionTimestamp should match commit timestamp"
    );
    assert_eq!(remove["extendedFileMetadata"].as_bool(), Some(true));
    assert_eq!(
        remove["partitionValues"]
            .as_object()
            .expect("Missing partitionValues")
            .len(),
        0
    );
    assert_eq!(remove["size"].as_i64().expect("Missing size"), 635);

    // `stats` is a JSON document embedded as a string.
    let stats: serde_json::Value =
        serde_json::from_str(remove["stats"].as_str().expect("Missing stats"))?;
    assert_eq!(stats["numRecords"], 10);

    let tags = remove["tags"].as_object().expect("Missing tags");
    for (tag, expected) in [
        ("INSERTION_TIME", "1677811178336000"),
        ("MIN_INSERTION_TIME", "1677811178336000"),
        ("MAX_INSERTION_TIME", "1677811178336000"),
        ("OPTIMIZE_TARGET_SIZE", "268435456"),
    ] {
        assert_eq!(tags.get(tag).and_then(|v| v.as_str()), Some(expected));
    }

    // The file's existing deletion vector must be carried over verbatim.
    let dv = remove["deletionVector"]
        .as_object()
        .expect("Missing deletionVector");
    assert_eq!(dv.get("storageType").and_then(|v| v.as_str()), Some("u"));
    assert_eq!(
        dv.get("pathOrInlineDv").and_then(|v| v.as_str()),
        Some("vBn[lx{q8@P<9BNH/isA")
    );
    for (field, expected) in [("offset", 1), ("sizeInBytes", 36), ("cardinality", 2)] {
        assert_eq!(dv.get(field).and_then(|v| v.as_i64()), Some(expected));
    }

    // The remove must not carry the row-tracking fields `baseRowId` / `defaultRowCommitVersion`.
    assert!(remove.get("baseRowId").is_none());
    assert!(remove.get("defaultRowCommitVersion").is_none());
    Ok(())
}
/// Replacing the deletion vector of one file must commit a `remove` carrying the old DV and an
/// `add` carrying the new DV for the same path. Size, partition values, tags and stats must be
/// copied unchanged, both between the pair and from the file's original `add` in version 1.
#[tokio::test]
async fn test_update_deletion_vectors_adds_expected_entries(
) -> Result<(), Box<dyn std::error::Error>> {
    let target_path = "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
    let _ = tracing_subscriber::fmt::try_init();

    // Operate on a private copy so committing does not touch the checked-in fixture.
    let tmp_dir = tempdir()?;
    let tmp_table_path = tmp_dir.path().join("table-with-dv-small");
    let source_path =
        std::fs::canonicalize(std::path::PathBuf::from("./tests/data/table-with-dv-small/"))?;
    copy_directory(&source_path, &tmp_table_path)?;

    let table_url = url::Url::from_directory_path(&tmp_table_path).unwrap();
    let engine = create_default_engine(&table_url)?;
    let snapshot = Snapshot::builder_for(table_url.clone())
        .at_version(1)
        .build(engine.as_ref())?;
    let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?
        .with_engine_info("test engine")
        .with_operation("UPDATE".to_string())
        .with_data_change(true);

    // Gather every scan-file batch; the first failing batch aborts the test.
    let scan = snapshot.clone().scan_builder().build()?;
    let mut scan_files = Vec::new();
    for scan_metadata in scan.scan_metadata(engine.as_ref())? {
        scan_files.push(scan_metadata?.scan_files);
    }

    let replacement_dv = DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "cd^-aqEH.-t@S}K{vb[*k^".to_string(),
        offset: Some(10),
        size_in_bytes: 40,
        cardinality: 3,
    };
    txn.update_deletion_vectors(
        HashMap::from([(target_path.to_string(), replacement_dv)]),
        scan_files.into_iter().map(Ok),
    )?;

    let CommitResult::CommittedTransaction(committed) = txn.commit(engine.as_ref())? else {
        panic!("Transaction should be committed");
    };

    // Baseline: the target file's `add` action as originally written in version 1.
    let v1_content =
        std::fs::read_to_string(tmp_table_path.join("_delta_log/00000000000000000001.json"))?;
    let v1_actions: Vec<_> = Deserializer::from_str(&v1_content)
        .into_iter::<serde_json::Value>()
        .try_collect()?;
    let original_add = v1_actions
        .iter()
        .find(|action| {
            action
                .get("add")
                .and_then(|add| add.get("path"))
                .and_then(|p| p.as_str())
                == Some(target_path)
        })
        .expect("Missing original add action in version 1")
        .get("add")
        .expect("Should have add field");
    let original_size = original_add["size"]
        .as_i64()
        .expect("Original add action should have size");
    let original_partition_values = original_add["partitionValues"]
        .as_object()
        .expect("Original add action should have partitionValues");
    let original_tags = original_add.get("tags");
    let original_stats = original_add.get("stats");

    let commit_path = tmp_table_path.join(format!(
        "_delta_log/{:020}.json",
        committed.commit_version()
    ));
    let commit_content = std::fs::read_to_string(commit_path)?;
    let actions: Vec<_> = Deserializer::from_str(&commit_content)
        .into_iter::<serde_json::Value>()
        .try_collect()?;
    assert!(
        actions.len() >= 3,
        "Expected at least 3 actions (commitInfo + remove + add), got {}",
        actions.len()
    );

    let commit_info = actions
        .iter()
        .find_map(|action| action.get("commitInfo"))
        .expect("Missing commitInfo action");
    let commit_timestamp = commit_info["timestamp"]
        .as_i64()
        .expect("Missing timestamp in commitInfo");

    // --- remove side: must reference the file with its previous deletion vector ---
    let removes: Vec<_> = actions
        .iter()
        .filter_map(|action| action.get("remove"))
        .collect();
    assert_eq!(removes.len(), 1, "Expected exactly one remove action");
    let remove = removes[0];
    assert_eq!(
        remove["path"].as_str(),
        Some(target_path),
        "Remove path should match"
    );
    assert_eq!(remove["dataChange"].as_bool(), Some(true));
    assert_eq!(
        remove["deletionTimestamp"].as_i64(),
        Some(commit_timestamp),
        "deletionTimestamp should match commit timestamp"
    );

    let old_dv = remove["deletionVector"]
        .as_object()
        .expect("Remove action should have deletionVector");
    for (field, expected, message) in [
        ("storageType", "u", "Old DV storage type should be 'u'"),
        (
            "pathOrInlineDv",
            "vBn[lx{q8@P<9BNH/isA",
            "Old DV path should match original",
        ),
    ] {
        assert_eq!(
            old_dv.get(field).and_then(|v| v.as_str()),
            Some(expected),
            "{message}"
        );
    }
    for (field, expected, message) in [
        ("offset", 1, "Old DV offset should be 1"),
        ("sizeInBytes", 36, "Old DV size should be 36"),
        ("cardinality", 2, "Old DV cardinality should be 2"),
    ] {
        assert_eq!(
            old_dv.get(field).and_then(|v| v.as_i64()),
            Some(expected),
            "{message}"
        );
    }

    let remove_size = remove["size"]
        .as_i64()
        .expect("Remove action should have size");
    let remove_partition_values = remove["partitionValues"]
        .as_object()
        .expect("Remove action should have partitionValues");
    let remove_tags = remove.get("tags");
    let remove_stats = remove.get("stats");

    // --- add side: same file, now carrying the replacement deletion vector ---
    let adds: Vec<_> = actions
        .iter()
        .filter_map(|action| action.get("add"))
        .collect();
    assert_eq!(adds.len(), 1, "Expected exactly one add action");
    let add = adds[0];
    assert_eq!(
        add["path"].as_str(),
        Some(target_path),
        "Add path should match"
    );
    assert_eq!(add["dataChange"].as_bool(), Some(true));

    let new_dv = add["deletionVector"]
        .as_object()
        .expect("Add action should have deletionVector");
    for (field, expected, message) in [
        ("storageType", "u", "New DV storage type should be 'u'"),
        (
            "pathOrInlineDv",
            "cd^-aqEH.-t@S}K{vb[*k^",
            "New DV path should match updated value",
        ),
    ] {
        assert_eq!(
            new_dv.get(field).and_then(|v| v.as_str()),
            Some(expected),
            "{message}"
        );
    }
    for (field, expected, message) in [
        ("offset", 10, "New DV offset should be 10"),
        ("sizeInBytes", 40, "New DV size should be 40"),
        ("cardinality", 3, "New DV cardinality should be 3"),
    ] {
        assert_eq!(
            new_dv.get(field).and_then(|v| v.as_i64()),
            Some(expected),
            "{message}"
        );
    }

    let add_size = add["size"].as_i64().expect("Add action should have size");
    let add_partition_values = add["partitionValues"]
        .as_object()
        .expect("Add action should have partitionValues");
    let add_tags = add.get("tags");
    let add_stats = add.get("stats");

    // --- file metadata must be preserved: remove vs add, then each vs the version-1 add ---
    assert_eq!(
        remove_size, add_size,
        "File size should be preserved between remove and add"
    );
    assert_eq!(
        remove_partition_values, add_partition_values,
        "Partition values should be preserved between remove and add"
    );
    assert_eq!(
        remove_tags, add_tags,
        "Tags should be preserved between remove and add"
    );
    assert_eq!(
        remove_stats, add_stats,
        "Stats should be preserved between remove and add"
    );
    assert_eq!(
        remove_size, original_size,
        "Remove action size should match original file size"
    );
    assert_eq!(
        add_size, original_size,
        "Add action size should match original file size"
    );
    assert_eq!(
        remove_partition_values, original_partition_values,
        "Remove action partition values should match original"
    );
    assert_eq!(
        add_partition_values, original_partition_values,
        "Add action partition values should match original"
    );
    assert_eq!(
        remove_tags, original_tags,
        "Remove action tags should match original"
    );
    assert_eq!(
        add_tags, original_tags,
        "Add action tags should match original"
    );
    assert_eq!(
        remove_stats, original_stats,
        "Remove action stats should match original"
    );
    assert_eq!(
        add_stats, original_stats,
        "Add action stats should match original"
    );
    Ok(())
}
/// Assigning a distinct deletion vector to each of three freshly written files must commit one
/// `remove`/`add` pair per file. Each `remove` has no DV, and each `add` carries exactly the
/// descriptor supplied for its path.
#[tokio::test]
async fn test_update_deletion_vectors_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();
    let schema = Arc::new(StructType::try_new(vec![
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("value", DataType::STRING),
    ])?);
    let file_names = &["file0.parquet", "file1.parquet", "file2.parquet"];
    let (store, engine, table_url, file_paths) =
        create_dv_table_with_files("test_table", schema, file_names).await?;

    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?
        .with_engine_info("test engine")
        .with_operation("UPDATE".to_string())
        .with_data_change(true);
    let scan_files = get_scan_files(snapshot.clone(), engine.as_ref())?;

    // Every descriptor field is derived from the file's index, so each file's DV is unique and
    // can be checked individually after commit.
    let dv_map: HashMap<_, _> = file_paths
        .iter()
        .enumerate()
        .map(|(idx, file_path)| {
            let descriptor = DeletionVectorDescriptor {
                storage_type: DeletionVectorStorageType::PersistedRelative,
                path_or_inline_dv: format!("dv_file_{idx}.bin"),
                offset: Some(idx as i32 * 10),
                size_in_bytes: 40 + idx as i32,
                cardinality: idx as i64 + 1,
            };
            (file_path.to_string(), descriptor)
        })
        .collect();
    txn.update_deletion_vectors(dv_map, scan_files.into_iter().map(Ok))?;

    let CommitResult::CommittedTransaction(committed) = txn.commit(engine.as_ref())? else {
        panic!("Transaction should be committed");
    };

    // Read the new commit back through the object store that backs the table.
    let commit_url = table_url.join(&format!(
        "_delta_log/{:020}.json",
        committed.commit_version()
    ))?;
    let commit_bytes = store
        .get(&Path::from_url_path(commit_url.path())?)
        .await?
        .bytes()
        .await?;
    let actions: Vec<_> = Deserializer::from_slice(&commit_bytes)
        .into_iter::<serde_json::Value>()
        .try_collect()?;

    let removes: Vec<_> = actions
        .iter()
        .filter_map(|action| action.get("remove"))
        .collect();
    let adds: Vec<_> = actions
        .iter()
        .filter_map(|action| action.get("add"))
        .collect();
    assert_eq!(removes.len(), 3, "Expected 3 remove actions for 3 files");
    assert_eq!(adds.len(), 3, "Expected 3 add actions for 3 files");

    for (idx, file_path) in file_paths.iter().enumerate() {
        let remove = removes
            .iter()
            .find(|remove| remove["path"].as_str() == Some(file_path.as_str()))
            .unwrap_or_else(|| panic!("Should find remove action for {file_path}"));
        let add = adds
            .iter()
            .find(|add| add["path"].as_str() == Some(file_path.as_str()))
            .unwrap_or_else(|| panic!("Should find add action for {file_path}"));

        assert!(
            remove["deletionVector"].is_null(),
            "Remove action for newly written file should not have a DV"
        );

        let add_dv = add["deletionVector"]
            .as_object()
            .expect("Add action should have deletionVector");
        let expected_path = format!("dv_file_{idx}.bin");
        assert_eq!(
            add_dv.get("pathOrInlineDv").and_then(|v| v.as_str()),
            Some(expected_path.as_str()),
            "DV path should match for file {file_path}"
        );
        for (field, expected, label) in [
            ("offset", idx as i64 * 10, "offset"),
            ("sizeInBytes", 40 + idx as i64, "size"),
            ("cardinality", idx as i64 + 1, "cardinality"),
        ] {
            assert_eq!(
                add_dv.get(field).and_then(|v| v.as_i64()),
                Some(expected),
                "DV {label} should match for file {file_path}"
            );
        }
    }
    Ok(())
}
#[tokio::test]
async fn test_remove_files_verify_files_excluded_from_scan(
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, _store, _table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let engine = Arc::new(engine);
write_data_and_check_result_and_stats(table_url.clone(), schema.clone(), engine.clone(), 1)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.clone().scan_builder().build()?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (_, selection_vector) = scan_metadata.scan_files.into_parts();
let initial_file_count = selection_vector.iter().filter(|&x| *x).count();
assert!(initial_file_count > 0);
let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?;
let scan2 = snapshot.scan_builder().build()?;
let scan_metadata2 = scan2.scan_metadata(engine.as_ref())?.next().unwrap()?;
let file_remove_count = (scan_metadata2.scan_files.data().len()
- scan_metadata2.scan_files.selection_vector().len())
+ scan_metadata2
.scan_files
.selection_vector()
.iter()
.filter(|&x| *x)
.count();
assert!(file_remove_count > 0);
txn.remove_files(scan_metadata2.scan_files);
let result = txn.commit(engine.as_ref());
match result? {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 2);
let new_snapshot = Snapshot::builder_for(table_url.clone())
.at_version(2)
.build(engine.as_ref())?;
let new_scan = new_snapshot.scan_builder().build()?;
let mut new_file_count = 0;
for new_metadata in new_scan.scan_metadata(engine.as_ref())? {
new_file_count += new_metadata?.scan_files.data().len();
}
assert_eq!(new_file_count, 0);
}
_ => panic!("Transaction did not succeeed."),
}
}
Ok(())
}
#[tokio::test]
async fn test_remove_files_with_modified_selection_vector() -> Result<(), Box<dyn std::error::Error>>
{
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, _store, _table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let engine = Arc::new(engine);
for i in 1..=5 {
write_data_and_check_result_and_stats(
table_url.clone(),
schema.clone(),
engine.clone(),
i,
)
.await?;
}
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.clone().scan_builder().build()?;
let mut initial_file_count = 0;
for metadata in scan.scan_metadata(engine.as_ref())? {
let metadata = metadata?;
initial_file_count += metadata
.scan_files
.selection_vector()
.iter()
.filter(|&x| *x)
.count();
}
assert!(
initial_file_count >= 3,
"Need at least 3 files for this test, got {initial_file_count}"
);
let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?
.with_engine_info("selective remove test")
.with_operation("DELETE".to_string())
.with_data_change(true);
let scan2 = snapshot.clone().scan_builder().build()?;
let scan_metadata2 = scan2.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data, mut selection_vector) = scan_metadata2.scan_files.into_parts();
let mut first_batch_removed = 0;
for selected in selection_vector.iter_mut() {
if *selected && first_batch_removed < 1 {
first_batch_removed += 1;
} else {
*selected = false;
}
}
assert_eq!(
first_batch_removed, 1,
"Should remove exactly 1 file in first batch"
);
txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
let scan3 = snapshot.clone().scan_builder().build()?;
let scan_metadata3 = scan3.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data2, mut selection_vector2) = scan_metadata3.scan_files.into_parts();
let mut last_selected_idx = None;
for (i, &selected) in selection_vector2.iter().enumerate() {
if selected {
last_selected_idx = Some(i);
}
}
for (i, selected) in selection_vector2.iter_mut().enumerate() {
if Some(i) != last_selected_idx {
*selected = false;
}
}
let second_batch_removed = selection_vector2.iter().filter(|&x| *x).count();
assert_eq!(
second_batch_removed, 1,
"Should remove exactly 1 file in second batch"
);
txn.remove_files(FilteredEngineData::try_new(data2, selection_vector2)?);
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 6);
let new_snapshot = Snapshot::builder_for(table_url.clone())
.at_version(6)
.build(engine.as_ref())?;
let new_scan = new_snapshot.scan_builder().build()?;
let mut new_file_count = 0;
for new_metadata in new_scan.scan_metadata(engine.as_ref())? {
let metadata = new_metadata?;
new_file_count += metadata
.scan_files
.selection_vector()
.iter()
.filter(|&x| *x)
.count();
}
let total_removed = first_batch_removed + second_batch_removed;
assert_eq!(total_removed, 2);
assert_eq!(new_file_count, initial_file_count - total_removed);
assert!(new_file_count > 0, "At least one file should remain");
}
_ => panic!("Transaction did not succeed"),
}
}
Ok(())
}
/// Every `remove` emitted from scan metadata must carry a populated JSON `stats` string. This
/// holds with or without a data predicate on the scan, and also when the snapshot is served from
/// a checkpoint that stores stats only in struct form.
#[rstest::rstest]
#[case(false, false)]
#[case(false, true)]
#[case(true, false)]
#[case(true, true)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_remove_files_after_predicate_scan_includes_stats_parsed(
    #[case] use_struct_stats_checkpoint: bool,
    #[case] use_predicate: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();
    let schema = get_simple_int_schema();
    let tmp_dir = tempdir()?;
    let tmp_url = Url::from_directory_path(tmp_dir.path()).unwrap();
    for (table_url, engine, _store, _table_name) in
        setup_test_tables(schema.clone(), &[], Some(&tmp_url), "test_table").await?
    {
        let engine = Arc::new(engine);
        write_data_and_check_result_and_stats(table_url.clone(), schema.clone(), engine.clone(), 1)
            .await?;

        // Optionally switch checkpoints to struct-only stats (`writeStatsAsJson = false`) and
        // write one, so the removes' stats must be produced from the parsed struct column. The
        // property change is an extra commit, which pushes the remove commit from version 2 to 3.
        let expected_commit_version = if use_struct_stats_checkpoint {
            let table_path = table_url.to_file_path().unwrap();
            let snapshot_v2 = set_table_properties(
                table_path.to_str().unwrap(),
                &table_url,
                engine.as_ref(),
                1,
                &[
                    ("delta.checkpoint.writeStatsAsJson", "false"),
                    ("delta.checkpoint.writeStatsAsStruct", "true"),
                ],
            )?;
            let mt_engine = create_default_engine_mt_executor(&table_url)?;
            snapshot_v2.checkpoint(mt_engine.as_ref(), None)?;
            3
        } else {
            2
        };
        let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;

        let mut scan_builder = snapshot.clone().scan_builder().include_all_stats_columns();
        if use_predicate {
            scan_builder = scan_builder.with_predicate(Arc::new(Pred::gt(
                column_expr!("number"),
                Expr::literal(0_i32),
            )));
        }
        let scan = scan_builder.build()?;

        let mut txn = begin_transaction(snapshot, engine.as_ref())?.with_data_change(true);
        for scan_metadata in scan.scan_metadata(engine.as_ref())? {
            txn.remove_files(scan_metadata?.scan_files);
        }
        let committed = txn.commit(engine.as_ref())?.unwrap_committed();
        assert_eq!(committed.commit_version(), expected_commit_version);

        let remove_actions =
            read_actions_from_commit(&table_url, expected_commit_version, "remove")?;
        assert!(
            !remove_actions.is_empty(),
            "expected remove actions in commit"
        );
        for remove in &remove_actions {
            let stats_str = remove["stats"]
                .as_str()
                .expect("stats field should be a non-null JSON string");
            let stats: serde_json::Value = serde_json::from_str(stats_str)?;
            assert!(
                stats["numRecords"].as_i64().unwrap_or(0) > 0,
                "stats.numRecords should be populated, got: {stats}"
            );
        }
    }
    Ok(())
}
#[rstest::rstest]
#[case::no_predicate(None, &["usa", "japan"])]
#[case::data_predicate(
Some(Pred::gt(column_expr!("id"), Expr::literal(0_i32))),
&["usa", "japan"]
)]
#[case::partition_predicate(
Some(Pred::eq(column_expr!("country"), Expr::literal("usa".to_string()))),
&["usa"]
)]
#[tokio::test]
async fn test_remove_files_partitioned_with_parsed_columns(
#[case] predicate: Option<Pred>,
#[case] expected_partitions: &[&str],
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let partition_col = "country";
let table_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("country", DataType::STRING),
])?);
let data_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"id",
DataType::INTEGER,
)])?);
let tmp_dir = tempdir()?;
let tmp_url = Url::from_directory_path(tmp_dir.path()).unwrap();
for (table_url, engine, _store, _table_name) in setup_test_tables(
table_schema.clone(),
&[partition_col],
Some(&tmp_url),
"test_table",
)
.await?
{
let engine = Arc::new(engine);
let mut txn =
load_and_begin_transaction(table_url.clone(), engine.as_ref())?.with_data_change(true);
let append_data = [[1, 2, 3], [10, 20, 30]].map(|data| -> delta_kernel::DeltaResult<_> {
let data = RecordBatch::try_new(
Arc::new(data_schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(data.to_vec()))],
)?;
Ok(Box::new(ArrowEngineData::new(data)))
});
for (data, partition_val) in append_data.into_iter().zip(["usa", "japan"]) {
let ctx = Arc::new(txn.partitioned_write_context(HashMap::from([(
partition_col.to_string(),
Scalar::String(partition_val.into()),
)]))?);
let add_meta = engine.write_parquet(data?.as_ref(), ctx.as_ref()).await?;
txn.add_files(add_meta);
}
txn.commit(engine.as_ref())?.unwrap_committed();
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut scan_builder = snapshot.clone().scan_builder().include_all_stats_columns();
if let Some(pred) = predicate.clone() {
scan_builder = scan_builder.with_predicate(Arc::new(pred));
}
let scan = scan_builder.build()?;
let mut txn = begin_transaction(snapshot, engine.as_ref())?.with_data_change(true);
for scan_metadata in scan.scan_metadata(engine.as_ref())? {
txn.remove_files(scan_metadata?.scan_files);
}
let committed = txn.commit(engine.as_ref())?.unwrap_committed();
assert_eq!(committed.commit_version(), 2);
let remove_actions = read_actions_from_commit(&table_url, 2, "remove")?;
assert_eq!(
remove_actions.len(),
expected_partitions.len(),
"unexpected remove count; got {}: {remove_actions:?}",
remove_actions.len()
);
let mut actual_partitions: Vec<String> = remove_actions
.iter()
.filter_map(|r| {
r["partitionValues"][partition_col]
.as_str()
.map(String::from)
})
.collect();
actual_partitions.sort();
let mut expected_sorted: Vec<String> =
expected_partitions.iter().map(|s| s.to_string()).collect();
expected_sorted.sort();
assert_eq!(
actual_partitions, expected_sorted,
"partitionValues mismatch across removes; got: {remove_actions:?}"
);
for remove in &remove_actions {
let stats_str = remove["stats"]
.as_str()
.expect("stats field should be a non-null JSON string");
let stats: serde_json::Value = serde_json::from_str(stats_str)?;
assert!(
stats["numRecords"].as_i64().unwrap_or(0) > 0,
"stats.numRecords should be populated, got: {stats}"
);
}
}
Ok(())
}