use std::collections::HashMap;
use std::ops::Add;
use std::path::PathBuf;
use std::sync::Arc;
use delta_kernel::actions::deletion_vector_writer::{
KernelDeletionVector, StreamingDeletionVectorWriter,
};
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine_data::FilteredEngineData;
use delta_kernel::object_store::ObjectStoreExt as _;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::transaction::CommitResult;
use delta_kernel::{DeltaResult, EngineData, Snapshot};
use itertools::Itertools;
use tempfile::tempdir;
use test_utils::{
    create_add_files_metadata, create_table, engine_store_setup, generate_batch, into_record_batch,
    record_batch_to_bytes, IntoArray,
};
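/// Writes `data` to the table directory as a Parquet file named after `file_suffix`,
/// returning the relative file path and the serialized byte length.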
async fn write_parquet_file(
store: &Arc<dyn delta_kernel::object_store::ObjectStore>,
table_url: &url::Url,
file_suffix: &str,
data: &delta_kernel::arrow::record_batch::RecordBatch,
) -> Result<(String, usize), Box<dyn std::error::Error>> {
use delta_kernel::object_store::path::Path as ObjectStorePath;
let parquet_data = record_batch_to_bytes(data);
let parquet_data_len = parquet_data.len();
let data_file_path = format!("data_file_{file_suffix}.parquet");
let data_url = table_url.join(&data_file_path)?;
let data_object_path = ObjectStorePath::from_url_path(data_url.path())?;
store.put(&data_object_path, parquet_data.into()).await?;
Ok((data_file_path, parquet_data_len))
}
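/// Sums the row counts across every batch produced by a scan.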
fn count_total_scan_rows(
scan_result_iter: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>>,
) -> DeltaResult<usize> {
scan_result_iter
.map(|result| Ok(result?.len()))
.fold_ok(0, Add::add)
}
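/// Scans a checked-in test table and verifies the total visible row count, once for
/// a table carrying a deletion vector and once for the same data without one.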
#[test_log::test(rstest::rstest)]
#[case::with_dv("./tests/data/table-with-dv-small/", 8)]
#[case::without_dv("./tests/data/table-without-dv-small/", 10)]
fn test_table_scan(
#[case] table_path: &str,
#[case] expected_rows: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let path = std::fs::canonicalize(PathBuf::from(table_path))?;
let url = url::Url::from_directory_path(path).unwrap();
let engine = test_utils::create_default_engine(&url)?;
let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let stream = scan.execute(engine)?;
let total_rows = count_total_scan_rows(stream)?;
assert_eq!(total_rows, expected_rows);
Ok(())
}
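/// Collects the filtered scan-file metadata from every scan-metadata batch of a snapshot.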
fn get_scan_files(
snapshot: Arc<Snapshot>,
engine: &dyn delta_kernel::Engine,
) -> DeltaResult<Vec<FilteredEngineData>> {
let scan = snapshot.scan_builder().build()?;
let all_scan_metadata: Vec<_> = scan.scan_metadata(engine)?.collect::<Result<Vec<_>, _>>()?;
Ok(all_scan_metadata
.into_iter()
.map(|sm| sm.scan_files)
.collect())
}
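/// Opens a transaction on the latest snapshot solely to obtain its write context.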
fn get_write_context(
table_url: &url::Url,
engine: &dyn delta_kernel::Engine,
) -> Result<delta_kernel::transaction::WriteContext, Box<dyn std::error::Error>> {
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), engine)?;
Ok(txn.get_write_context())
}
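/// Serializes a deletion vector with the streaming writer, uploads it to the object
/// store under a freshly generated DV path, and returns the descriptor to commit.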
async fn write_deletion_vector_to_store(
store: &Arc<dyn delta_kernel::object_store::ObjectStore>,
write_context: &delta_kernel::transaction::WriteContext,
dv: KernelDeletionVector,
prefix: &str,
) -> Result<
delta_kernel::actions::deletion_vector::DeletionVectorDescriptor,
Box<dyn std::error::Error>,
> {
use delta_kernel::object_store::path::Path as ObjectStorePath;
let dv_path = write_context.new_deletion_vector_path(String::from(prefix));
let dv_absolute_path = dv_path.absolute_path()?;
let dv_object_path = ObjectStorePath::parse(dv_absolute_path.path())?;
let mut dv_buffer = Vec::new();
let mut dv_writer = StreamingDeletionVectorWriter::new(&mut dv_buffer);
let dv_write_result = dv_writer.write_deletion_vector(dv)?;
dv_writer.finalize()?;
store.put(&dv_object_path, dv_buffer.into()).await?;
Ok(dv_write_result.to_descriptor(&dv_path))
}
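/// Builds a DELETE transaction against the latest snapshot of the table.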
fn create_dv_update_transaction(
table_url: &url::Url,
engine: &dyn delta_kernel::Engine,
) -> Result<delta_kernel::transaction::Transaction, Box<dyn std::error::Error>> {
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine)?;
Ok(snapshot
.transaction(Box::new(FileSystemCommitter::new()), engine)?
.with_engine_info("test engine")
.with_operation("DELETE".to_string()))
}
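/// Sorts the ids and values gathered from the scan batches and compares them
/// against the expected non-deleted rows.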
fn verify_sorted_scan_results(
batches: Vec<delta_kernel::arrow::record_batch::RecordBatch>,
expected_ids: Vec<i32>,
expected_values: &[&str],
) -> Result<(), Box<dyn std::error::Error>> {
use delta_kernel::arrow::array::{Array, Int32Array, StringArray};
let mut actual_ids = Vec::new();
let mut actual_values = Vec::new();
for batch in batches {
let id_col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let val_col = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..batch.num_rows() {
actual_ids.push(id_col.value(i));
actual_values.push(val_col.value(i).to_string());
}
}
actual_ids.sort();
assert_eq!(
actual_ids, expected_ids,
"IDs should match expected non-deleted rows"
);
actual_values.sort();
let mut expected_values_sorted = expected_values
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
expected_values_sorted.sort();
assert_eq!(
actual_values, expected_values_sorted,
"Values should match expected non-deleted rows"
);
Ok(())
}
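/// End-to-end test: create a DV-enabled table, commit two data files, then delete
/// rows in two rounds by writing and committing deletion vectors.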
#[tokio::test]
async fn test_write_deletion_vectors_end_to_end() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
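    // Create a two-column table with the deletionVectors feature enabled for both
    // readers and writers.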
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("value", DataType::STRING),
])?);
let temp_dir = tempdir()?;
let base_url = url::Url::from_directory_path(temp_dir.path()).unwrap();
let (store, engine, table_url) = engine_store_setup("test_table", Some(&base_url));
let engine = Arc::new(engine);
create_table(
store.clone(),
table_url.clone(),
schema.clone(),
&[],
true, vec!["deletionVectors"],
vec!["deletionVectors"],
)
.await?;
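    // Stage two 10-row Parquet files directly in the object store; they are
    // registered with the table in the add-files transaction below.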
let data_batch_1 = generate_batch(vec![
("id", vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_array()),
(
"value",
vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"].into_array(),
),
])?;
let data_batch_2 = generate_batch(vec![
(
"id",
vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19].into_array(),
),
(
"value",
vec!["k", "l", "m", "n", "o", "p", "q", "r", "s", "t"].into_array(),
),
])?;
let (data_file_path_1, parquet_data_len_1) =
write_parquet_file(&store, &table_url, "1", &data_batch_1).await?;
let (data_file_path_2, parquet_data_len_2) =
write_parquet_file(&store, &table_url, "2", &data_batch_2).await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test engine")
.with_operation("WRITE".to_string());
let add_files_schema = txn.add_files_schema();
let add_metadata = create_add_files_metadata(
add_files_schema,
vec![
(&data_file_path_1, parquet_data_len_1 as i64, 1000000, 10),
(&data_file_path_2, parquet_data_len_2 as i64, 1000000, 10),
],
)?;
txn.add_files(add_metadata);
let commit_result = txn.commit(engine.as_ref())?;
assert!(matches!(
commit_result,
CommitResult::CommittedTransaction(_)
));
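    // Before any rows are deleted, a full scan must see all 20 written rows.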
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let stream = scan.execute(engine.clone())?;
let total_rows_before = count_total_scan_rows(stream)?;
assert_eq!(total_rows_before, 20, "Should have 20 rows before deletion");
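    // Row indexes to delete: the first round removes three rows from file 1; the
    // second round deletes one more row in file 1 and two rows in file 2.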
const FILE1_FIRST_DELETE_INDEXES: [u64; 3] = [2, 5, 7];
const FILE1_SECOND_DELETE_INDEX: u64 = 1;
const FILE2_DELETE_INDEXES: [u64; 2] = [2, 5];
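    // First round: build a DV for file 1, persist it, and commit it via a DELETE
    // transaction keyed by the file's relative path.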
let mut dv_file1_first = KernelDeletionVector::new();
dv_file1_first.add_deleted_row_indexes(FILE1_FIRST_DELETE_INDEXES);
let write_context = get_write_context(&table_url, engine.as_ref())?;
let dv_descriptor_1 =
write_deletion_vector_to_store(&store, &write_context, dv_file1_first, "").await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = create_dv_update_transaction(&table_url, engine.as_ref())?;
let scan_files = get_scan_files(snapshot.clone(), engine.as_ref())?;
let mut dv_map = HashMap::new();
dv_map.insert(data_file_path_1.clone(), dv_descriptor_1);
txn.update_deletion_vectors(dv_map, scan_files.into_iter().map(Ok))?;
let commit_result = txn.commit(engine.as_ref())?;
assert!(matches!(
commit_result,
CommitResult::CommittedTransaction(_)
));
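    // Re-read the snapshot so the scan picks up the newly committed deletion vector.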
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let stream = scan.execute(engine.clone())?;
let total_rows_after_first_delete = count_total_scan_rows(stream)?;
assert_eq!(
total_rows_after_first_delete, 17,
"Should have 17 rows after deleting 3 rows from first file"
);
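    // Second round: a new DV replaces a file's previous DV rather than merging with
    // it, so the replacement DV for file 1 must re-include the rows deleted in the
    // first round as well as the new index.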
let mut dv_file1_second = KernelDeletionVector::new();
    dv_file1_second.add_deleted_row_indexes(FILE1_FIRST_DELETE_INDEXES);
    dv_file1_second.add_deleted_row_indexes([FILE1_SECOND_DELETE_INDEX]);
let mut dv_file2 = KernelDeletionVector::new();
dv_file2.add_deleted_row_indexes(FILE2_DELETE_INDEXES);
let dv_descriptor_1_second =
write_deletion_vector_to_store(&store, &write_context, dv_file1_second, "").await?;
let dv_descriptor_2 =
write_deletion_vector_to_store(&store, &write_context, dv_file2, "").await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = create_dv_update_transaction(&table_url, engine.as_ref())?;
let mut dv_map1 = HashMap::new();
dv_map1.insert(data_file_path_1.clone(), dv_descriptor_1_second);
let mut dv_map2 = HashMap::new();
dv_map2.insert(data_file_path_2.clone(), dv_descriptor_2);
txn.update_deletion_vectors(
dv_map1,
get_scan_files(snapshot.clone(), engine.as_ref())?
.into_iter()
.map(Ok),
)?;
txn.update_deletion_vectors(
dv_map2,
get_scan_files(snapshot.clone(), engine.as_ref())?
.into_iter()
.map(Ok),
)?;
let commit_result = txn.commit(engine.as_ref())?;
assert!(matches!(
commit_result,
CommitResult::CommittedTransaction(_)
));
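    // Final scan: 14 rows survive (6 of 10 in file 1, 8 of 10 in file 2).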
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let stream = scan.execute(engine.clone())?;
let batches: Vec<_> = stream
.map(|result| result.map(into_record_batch))
.collect::<Result<Vec<_>, _>>()?;
let expected_ids = vec![0, 3, 4, 6, 8, 9, 10, 11, 13, 14, 16, 17, 18, 19];
let expected_values = [
"a", "d", "e", "g", "i", "j", "k", "l", "n", "o", "q", "r", "s", "t",
];
verify_sorted_scan_results(batches, expected_ids, &expected_values)?;
Ok(())
}