use std::collections::HashMap;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::actions::deletion_vector::{DeletionVectorDescriptor, DeletionVectorStorageType};
use delta_kernel::arrow::array::{
Array, ArrayRef, AsArray, Int32Array, Int64Array, RecordBatch, RecordBatchReader, StringArray,
StructArray,
};
use delta_kernel::arrow::compute::concat_batches;
use delta_kernel::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use delta_kernel::checkpoint::{CheckpointSpec, V2CheckpointConfig};
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::TryFromKernel;
use delta_kernel::engine::default::executor::TaskExecutor;
use delta_kernel::expressions::Scalar;
use delta_kernel::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::transaction::data_layout::DataLayout;
use delta_kernel::transaction::CommitResult;
use delta_kernel::{DeltaResult, Engine, Snapshot};
use itertools::Itertools;
use test_utils::{
begin_transaction, create_add_files_metadata, create_table_and_load_snapshot, insert_data,
load_test_data, read_add_infos, read_scan, test_table_setup_mt, write_batch_to_table,
};
use crate::common::write_utils::{
get_simple_schema, load_existing_single_file_checkpoint_path, resolve_struct_field,
simple_id_batch,
};
/// Loads the named test table from `tests/data` and returns every batch produced by scanning
/// the snapshot built for it.
///
/// # Errors
/// Returns an error if the table path cannot be parsed into a URL, or if creating the engine,
/// building the snapshot or scan, or reading the scan fails.
///
/// # Panics
/// Panics if the test data cannot be loaded or the table path is not valid UTF-8.
fn read_v2_checkpoint_table(test_name: impl AsRef<str>) -> DeltaResult<Vec<RecordBatch>> {
    let test_name = test_name.as_ref();
    let test_dir = load_test_data("tests/data", test_name).expect("failed to load test data");
    let test_path = test_dir.path().join(test_name);
    // Propagate kernel errors with `?` like the rest of this file instead of panicking, so a
    // failing test reports the underlying error through its `DeltaResult` return value.
    let url = delta_kernel::try_parse_uri(test_path.to_str().expect("table path to string"))?;
    let engine = test_utils::create_default_engine(&url)?;
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().build()?;
    let batches = read_scan(&scan, engine)?;
    Ok(batches)
}
/// Scans the test table named `table_name` and asserts that the resulting batches match
/// `expected_table`.
///
/// NOTE(review): `sort_lines!` and `assert_batches_sorted_eq!` are defined outside this view;
/// presumably the comparison is insensitive to row order — confirm against the macro definitions.
fn test_v2_checkpoint_with_table(
table_name: &str,
mut expected_table: Vec<String>,
) -> DeltaResult<()> {
let batches = read_v2_checkpoint_table(table_name)?;
sort_lines!(expected_table);
assert_batches_sorted_eq!(expected_table, &batches);
Ok(())
}
/// Converts a vector of string slices into owned `String`s, preserving order.
fn to_string_vec(string_slice_vec: Vec<&str>) -> Vec<String> {
    let mut owned = Vec::with_capacity(string_slice_vec.len());
    for slice in string_slice_vec {
        owned.push(String::from(slice));
    }
    owned
}
/// Builds the expected pretty-printed `id` table used by the sidecar test tables.
///
/// The output is the three header lines, three literal zero rows, the formatted id ranges
/// `0..30`, `0..100`, `0..100` and `0..1000` in that order, and the closing border line.
fn generate_sidecar_expected_data() -> Vec<String> {
    // Formats the ids `0..count` as left-aligned, width-3 table rows.
    fn id_rows(count: usize) -> impl Iterator<Item = String> {
        (0..count).map(|id| format!("| {:<max_width$} |", id, max_width = 3))
    }

    let mut lines = vec![
        "+-----+".to_string(),
        "| id |".to_string(),
        "+-----+".to_string(),
    ];
    lines.extend(std::iter::repeat("| 0 |".to_string()).take(3));
    for count in [30, 100, 100, 1000] {
        lines.extend(id_rows(count));
    }
    lines.push("+-----+".to_string());
    lines
}
/// Expected pretty-printed scan output for a single `id` column holding the values 0 through 9.
#[rustfmt::skip]
fn get_simple_id_table() -> Vec<String> {
    let rows = vec![
        "+----+",
        "| id |",
        "+----+",
        "| 0 |",
        "| 1 |",
        "| 2 |",
        "| 3 |",
        "| 4 |",
        "| 5 |",
        "| 6 |",
        "| 7 |",
        "| 8 |",
        "| 9 |",
        "+----+",
    ];
    to_string_vec(rows)
}
/// Expected pretty-printed scan output for a single `id` column holding the values 0 through 19.
#[rustfmt::skip]
fn get_classic_checkpoint_table() -> Vec<String> {
    let rows = vec![
        "+----+",
        "| id |",
        "+----+",
        "| 0 |",
        "| 1 |",
        "| 2 |",
        "| 3 |",
        "| 4 |",
        "| 5 |",
        "| 6 |",
        "| 7 |",
        "| 8 |",
        "| 9 |",
        "| 10 |",
        "| 11 |",
        "| 12 |",
        "| 13 |",
        "| 14 |",
        "| 15 |",
        "| 16 |",
        "| 17 |",
        "| 18 |",
        "| 19 |",
        "+----+",
    ];
    to_string_vec(rows)
}
/// Expected pretty-printed scan output for a single `id` column holding the values 0 through 9
/// plus the value 2718.
#[rustfmt::skip]
fn get_without_sidecars_table() -> Vec<String> {
    let rows = vec![
        "+------+",
        "| id |",
        "+------+",
        "| 0 |",
        "| 1 |",
        "| 2 |",
        "| 3 |",
        "| 4 |",
        "| 5 |",
        "| 6 |",
        "| 7 |",
        "| 8 |",
        "| 9 |",
        "| 2718 |",
        "+------+",
    ];
    to_string_vec(rows)
}
/// Scans the `v2-checkpoints-json-with-sidecars` test table and compares the result with the
/// generated sidecar expectation.
#[test]
fn v2_checkpoints_json_with_sidecars() -> DeltaResult<()> {
    let expected = generate_sidecar_expected_data();
    test_v2_checkpoint_with_table("v2-checkpoints-json-with-sidecars", expected)
}
/// Scans the `v2-checkpoints-parquet-with-sidecars` test table and compares the result with the
/// generated sidecar expectation.
#[test]
fn v2_checkpoints_parquet_with_sidecars() -> DeltaResult<()> {
    let expected = generate_sidecar_expected_data();
    test_v2_checkpoint_with_table("v2-checkpoints-parquet-with-sidecars", expected)
}
/// Scans the `v2-checkpoints-json-without-sidecars` test table and compares the result with the
/// without-sidecars expectation.
#[test]
fn v2_checkpoints_json_without_sidecars() -> DeltaResult<()> {
    let expected = get_without_sidecars_table();
    test_v2_checkpoint_with_table("v2-checkpoints-json-without-sidecars", expected)
}
/// Scans the `v2-checkpoints-parquet-without-sidecars` test table and compares the result with
/// the without-sidecars expectation.
#[test]
fn v2_checkpoints_parquet_without_sidecars() -> DeltaResult<()> {
    let expected = get_without_sidecars_table();
    test_v2_checkpoint_with_table("v2-checkpoints-parquet-without-sidecars", expected)
}
/// Scans the `v2-classic-checkpoint-json` test table and compares the result with the classic
/// checkpoint expectation.
#[test]
fn v2_classic_checkpoint_json() -> DeltaResult<()> {
    let expected = get_classic_checkpoint_table();
    test_v2_checkpoint_with_table("v2-classic-checkpoint-json", expected)
}
/// Scans the `v2-classic-checkpoint-parquet` test table and compares the result with the classic
/// checkpoint expectation.
#[test]
fn v2_classic_checkpoint_parquet() -> DeltaResult<()> {
    let expected = get_classic_checkpoint_table();
    test_v2_checkpoint_with_table("v2-classic-checkpoint-parquet", expected)
}
/// Scans the `v2-checkpoints-json-with-last-checkpoint` test table and compares the result with
/// the simple id expectation.
#[test]
fn v2_checkpoints_json_with_last_checkpoint() -> DeltaResult<()> {
    let expected = get_simple_id_table();
    test_v2_checkpoint_with_table("v2-checkpoints-json-with-last-checkpoint", expected)
}
/// Scans the `v2-checkpoints-parquet-with-last-checkpoint` test table and compares the result
/// with the simple id expectation.
#[test]
fn v2_checkpoints_parquet_with_last_checkpoint() -> DeltaResult<()> {
    let expected = get_simple_id_table();
    test_v2_checkpoint_with_table("v2-checkpoints-parquet-with-last-checkpoint", expected)
}
/// Creates a table with the `v2Checkpoint` feature, inserts one row, checkpoints the
/// post-commit snapshot without an explicit spec (`None`), and verifies that a freshly built
/// snapshot is backed by that checkpoint and still scans the inserted row.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_checkpoint_parquet_write() -> DeltaResult<()> {
let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"value",
DataType::INTEGER,
)])?);
// Create the table with the v2Checkpoint table feature marked as supported.
let _ = create_table(&table_path, schema.clone(), "Test/1.0")
.with_table_properties([("delta.feature.v2Checkpoint", "supported")])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot0 = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
// Insert a single row with value 1.
let result = insert_data(
snapshot0,
&engine,
vec![Arc::new(Int32Array::from(vec![1]))],
)
.await?;
let CommitResult::CommittedTransaction(committed) = result else {
panic!("Expected CommittedTransaction");
};
let snapshot = committed
.post_commit_snapshot()
.expect("expected post-commit snapshot");
// Write the checkpoint from the post-commit snapshot.
snapshot.checkpoint(engine.as_ref(), None)?;
// Rebuild the snapshot from storage and inspect its log segment.
let snapshot2 = Snapshot::builder_for(table_url).build(engine.as_ref())?;
assert_eq!(snapshot2.version(), 1);
let log_segment = snapshot2.log_segment();
assert!(
!log_segment.listed.checkpoint_parts.is_empty(),
"expected snapshot to use the written checkpoint, but checkpoint_parts is empty"
);
assert_eq!(
log_segment.checkpoint_version,
Some(1),
"expected checkpoint at version 1"
);
assert!(
log_segment.listed.ascending_commit_files.is_empty(),
"expected no commit files after checkpoint, but found: {:?}",
log_segment.listed.ascending_commit_files
);
// The data must still be readable through the checkpoint-backed snapshot.
let scan = snapshot2.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
assert_batches_sorted_eq!(
vec![
"+-------+",
"| value |",
"+-------+",
"| 1 |",
"+-------+",
],
&batches
);
Ok(())
}
/// End-to-end check of a V2 checkpoint written with sidecars.
///
/// Builds the table via `v2_table_with_domain_metadata_and_txn`, checkpoints it with a hint of
/// 2 file actions per sidecar, adds two post-checkpoint commits, and then verifies
/// `_last_checkpoint`, the main checkpoint file contents, the sidecar files on disk, and that a
/// fresh snapshot reads the expected state and data.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_checkpoint_with_sidecars() -> DeltaResult<()> {
let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let snapshot = v2_table_with_domain_metadata_and_txn(&table_path, &table_url, &engine).await?;
let version = snapshot.version() as i64;
// Checkpoint with sidecars, hinting 2 file actions per sidecar file.
let checkpoint_spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
file_actions_per_sidecar_hint: Some(2),
});
snapshot.checkpoint(engine.as_ref(), Some(&checkpoint_spec))?;
// Two commits after the checkpoint: a data insert (id 17) and a domain metadata update.
let info_field = Arc::new(ArrowField::new("name", ArrowDataType::Utf8, true));
let info_array: ArrayRef = Arc::new(StructArray::from(vec![(
info_field,
Arc::new(StringArray::from(vec![Some("quinn")])) as ArrayRef,
)]));
let post_ckpt_snapshot = insert_data(
Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?,
&engine,
vec![Arc::new(Int32Array::from(vec![17])) as ArrayRef, info_array],
)
.await?
.unwrap_post_commit_snapshot();
let post_ckpt_snapshot = begin_transaction(post_ckpt_snapshot, engine.as_ref())?
.with_domain_metadata("app.settings".to_string(), r#"{"version":3}"#.to_string())
.commit(engine.as_ref())?
.unwrap_post_commit_snapshot();
// `_last_checkpoint` must describe the checkpoint: its version, action count, the combined
// byte size of the main file plus all sidecars, and the number of add files.
let last_ckpt = read_last_checkpoint(&table_path);
let ckpt_file = load_existing_single_file_checkpoint_path(&table_path, version as _);
let ckpt_file_size = std::fs::metadata(&ckpt_file).unwrap().len() as i64;
let sidecars_dir_for_size = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
let sidecars_total_size: i64 = list_sidecar_parquet_files(&sidecars_dir_for_size)
.iter()
.map(|e| e.metadata().unwrap().len() as i64)
.sum();
assert_eq!(last_ckpt["version"], version);
assert_eq!(last_ckpt["size"].as_i64().unwrap(), 29);
assert_eq!(
last_ckpt["sizeInBytes"].as_i64().unwrap(),
ckpt_file_size + sidecars_total_size
);
assert_eq!(last_ckpt["numOfAddFiles"], 8);
// The main checkpoint file must expose every non-file action column plus `sidecar`.
let ckpt_batch = read_parquet_file(&ckpt_file);
let ckpt_schema = ckpt_batch.schema();
for field_name in [
"protocol",
"metaData",
"checkpointMetadata",
"domainMetadata",
"sidecar",
"txn",
] {
assert!(
ckpt_schema.field_with_name(field_name).is_ok(),
"checkpoint should have {field_name}"
);
}
// Sidecar action rows must match the sidecar files actually written to disk.
let sidecar_col = get_struct_column_from_record_batch(&ckpt_batch, "sidecar");
let sidecar_rows = valid_row_indices(sidecar_col, ckpt_batch.num_rows());
assert_eq!(
sidecar_rows.len(),
5,
"checkpoint should contain 5 sidecar action rows"
);
assert_sidecar_actions_match_disk(sidecar_col, &sidecar_rows, &table_path, engine.as_ref());
// File actions belong in the sidecars, never in the main checkpoint file.
for action in ["add", "remove"] {
let col = get_struct_column_from_record_batch(&ckpt_batch, action);
let rows = valid_row_indices(col, ckpt_batch.num_rows());
assert!(
rows.is_empty(),
"main checkpoint should not contain {action} actions when sidecars are used, \
but found {} rows",
rows.len()
);
}
// Check the sorted (adds, removes) counts across the sidecar files.
let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
let per_sidecar = assert_sidecars_contain_only_file_actions(&sidecars_dir);
assert_eq!(
per_sidecar,
vec![(0, 8), (2, 0), (2, 0), (2, 0), (2, 0)],
"per-sidecar (adds, removes) distribution"
);
// Exactly one checkpointMetadata action, carrying the checkpoint version.
let ckpt_meta_col = get_struct_column_from_record_batch(&ckpt_batch, "checkpointMetadata");
let ckpt_meta_rows = valid_row_indices(ckpt_meta_col, ckpt_batch.num_rows());
assert_eq!(
ckpt_meta_rows.len(),
1,
"should have exactly one checkpointMetadata action"
);
let version_col = ckpt_meta_col
.column_by_name("version")
.expect("checkpointMetadata should have version field");
assert_eq!(
version_col
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>()
.value(ckpt_meta_rows[0]),
version,
"checkpointMetadata version should match checkpoint version"
);
// Domain metadata: one action per domain written by the setup helper.
let dm_col = get_struct_column_from_record_batch(&ckpt_batch, "domainMetadata");
let dm_rows = valid_row_indices(dm_col, ckpt_batch.num_rows());
assert_eq!(
dm_rows.len(),
3,
"checkpoint should contain exactly 3 domainMetadata actions"
);
// Txn actions: one per app id, holding the version that survived reconciliation.
let txn_col = get_struct_column_from_record_batch(&ckpt_batch, "txn");
let txn_rows = valid_row_indices(txn_col, ckpt_batch.num_rows());
assert_eq!(
txn_rows.len(),
2,
"checkpoint should contain exactly 2 txn actions after reconciliation"
);
let txn_app_id_col = txn_col
.column_by_name("appId")
.expect("txn should have appId field")
.as_string::<i32>();
let txn_version_col = txn_col
.column_by_name("version")
.expect("txn should have version field")
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>();
let mut txn_pairs: Vec<(String, i64)> = txn_rows
.iter()
.map(|&r| {
(
txn_app_id_col.value(r).to_string(),
txn_version_col.value(r),
)
})
.collect();
// Sort so the comparison does not depend on row order within the checkpoint.
txn_pairs.sort();
assert_eq!(
txn_pairs,
vec![("app1".to_string(), 3), ("app2".to_string(), 5)],
"txn actions should reflect reconciliation (app1 kept at latest version)"
);
// A fresh snapshot must load from the checkpoint and replay the two later commits.
let fresh_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
assert_eq!(fresh_snapshot.version(), post_ckpt_snapshot.version());
let log_segment = fresh_snapshot.log_segment();
assert!(
!log_segment.listed.checkpoint_parts.is_empty(),
"fresh snapshot should load from checkpoint"
);
assert_eq!(log_segment.checkpoint_version, Some(version as u64));
assert_eq!(
log_segment.listed.ascending_commit_files.len(),
2,
"expected 2 commit files after checkpoint (insert + domain metadata update)"
);
assert_eq!(
fresh_snapshot.get_domain_metadata("app.settings", engine.as_ref())?,
Some(r#"{"version":3}"#.to_string()),
"app.settings domain metadata should reflect the post-checkpoint update"
);
assert_eq!(
fresh_snapshot.get_domain_metadata("app.feature_flags", engine.as_ref())?,
Some(r#"{"dark_mode":true}"#.to_string()),
"app.feature_flags domain metadata should be preserved across checkpoint"
);
assert_eq!(
fresh_snapshot.get_domain_metadata("app.analytics", engine.as_ref())?,
Some(r#"{"tracking":false}"#.to_string()),
"app.analytics domain metadata should be preserved across checkpoint"
);
// Finally, the scanned data must contain ids 9 through 17.
let scan = fresh_snapshot.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
assert_batches_sorted_eq!(
vec![
"+----+---------------+",
"| id | info |",
"+----+---------------+",
"| 10 | {name: judy} |",
"| 11 | {name: karl} |",
"| 12 | {name: lena} |",
"| 13 | {name: mike} |",
"| 14 | {name: nina} |",
"| 15 | {name: omar} |",
"| 16 | {name: pat} |",
"| 17 | {name: quinn} |",
"| 9 | {name: ivan} |",
"+----+---------------+",
],
&batches
);
Ok(())
}
/// Checkpoints a partitioned table (built by `create_partitioned_stats_table`) with one file
/// action per sidecar, then verifies that each sidecar `add` action carries JSON `stats`,
/// `stats_parsed` and `partitionValues_parsed` with the expected values, and that the table
/// data is still readable from a fresh snapshot.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_checkpoint_partition_values_parsed_and_stats(
) -> Result<(), Box<dyn std::error::Error>> {
let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let snapshot2 = create_partitioned_stats_table(&table_path, &table_url, &engine).await?;
let checkpoint_spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
file_actions_per_sidecar_hint: Some(1),
});
snapshot2.checkpoint(engine.as_ref(), Some(&checkpoint_spec))?;
let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
let per_sidecar = assert_sidecars_contain_only_file_actions(&sidecars_dir);
assert_eq!(
per_sidecar,
vec![(1, 0), (1, 0)],
"per-sidecar (adds, removes) distribution"
);
// Gather the per-add values from every sidecar; they are sorted before comparison below
// because the order in which sidecar files are listed is not fixed by this test.
let sidecar_files = list_sidecar_parquet_files(&sidecars_dir);
let mut all_record_counts = Vec::new();
let mut all_part_values = Vec::new();
let mut all_min_ids = Vec::new();
let mut all_max_ids = Vec::new();
let mut all_stats_json = Vec::new();
for sidecar_entry in &sidecar_files {
let sidecar_batch = read_parquet_file(&sidecar_entry.path());
let add_col = get_struct_column_from_record_batch(&sidecar_batch, "add");
let add_rows = valid_row_indices(add_col, sidecar_batch.num_rows());
// Skip sidecars that hold no add actions.
if add_rows.is_empty() {
continue;
}
// Raw JSON stats string.
let stats_col = add_col
.column_by_name("stats")
.expect("add should have stats");
for &row in &add_rows {
assert!(stats_col.is_valid(row), "add.stats should be non-null");
all_stats_json.push(stats_col.as_string::<i32>().value(row).to_string());
}
// Struct-typed stats: numRecords, minValues.id, maxValues.id.
let stats_parsed = get_struct_column_from_struct_array(add_col, "stats_parsed");
let num_records_col = stats_parsed
.column_by_name("numRecords")
.expect("stats_parsed should have numRecords");
for &row in &add_rows {
all_record_counts.push(
num_records_col
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>()
.value(row),
);
}
let min_values = get_struct_column_from_struct_array(stats_parsed, "minValues");
let min_id_col = min_values
.column_by_name("id")
.expect("minValues should have id");
for &row in &add_rows {
all_min_ids.push(
min_id_col
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>()
.value(row),
);
}
let max_values = get_struct_column_from_struct_array(stats_parsed, "maxValues");
let max_id_col = max_values
.column_by_name("id")
.expect("maxValues should have id");
for &row in &add_rows {
all_max_ids.push(
max_id_col
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>()
.value(row),
);
}
// Struct-typed partition values.
let pv_parsed = get_struct_column_from_struct_array(add_col, "partitionValues_parsed");
let part_key_col = pv_parsed
.column_by_name("part_key")
.expect("partitionValues_parsed should have part_key field");
for &row in &add_rows {
all_part_values.push(part_key_col.as_string::<i32>().value(row).to_string());
}
}
all_record_counts.sort();
assert_eq!(
all_record_counts,
vec![2, 3],
"stats_parsed.numRecords should be [2, 3] (one per partition)"
);
all_min_ids.sort();
assert_eq!(
all_min_ids,
vec![1, 3],
"stats_parsed.minValues.id should be [1, 3] (min id per partition)"
);
all_max_ids.sort();
assert_eq!(
all_max_ids,
vec![2, 5],
"stats_parsed.maxValues.id should be [2, 5] (max id per partition)"
);
all_part_values.sort();
assert_eq!(
all_part_values,
vec!["a", "b"],
"partitionValues_parsed.part_key should be ['a', 'b']"
);
// Compare the JSON stats as parsed values, ordered by numRecords for a stable comparison.
let mut parsed_stats: Vec<serde_json::Value> = all_stats_json
.iter()
.map(|s| serde_json::from_str(s).expect("add.stats should be valid JSON"))
.collect();
parsed_stats.sort_by_key(|v| v["numRecords"].as_i64().unwrap());
assert_eq!(
parsed_stats,
vec![
serde_json::json!({
"numRecords": 2,
"minValues": { "id": 1, "name": "alice" },
"maxValues": { "id": 2, "name": "bob" },
"nullCount": { "id": 0, "name": 0 },
"tightBounds": true,
}),
serde_json::json!({
"numRecords": 3,
"minValues": { "id": 3, "name": "charlie" },
"maxValues": { "id": 5, "name": "eve" },
"nullCount": { "id": 0, "name": 0 },
"tightBounds": true,
}),
],
"add.stats JSON should match the expected stats"
);
// The data must still be readable from a fresh snapshot after the checkpoint.
let snapshot3 = Snapshot::builder_for(table_url).build(engine.as_ref())?;
let scan = snapshot3.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
assert_batches_sorted_eq!(
vec![
"+----+---------+----------+",
"| id | name | part_key |",
"+----+---------+----------+",
"| 1 | alice | a |",
"| 2 | bob | a |",
"| 3 | charlie | b |",
"| 4 | dave | b |",
"| 5 | eve | b |",
"+----+---------+----------+",
],
&batches
);
Ok(())
}
/// Verifies that `Snapshot::checkpoint` rejects specs that do not fit the table.
///
/// Each case supplies whether the table is created with the `v2Checkpoint` feature, the spec
/// to request, and a substring that the resulting error message must contain.
#[rstest::rstest]
#[case::v2_spec_requires_v2checkpoint_feature(
false,
CheckpointSpec::V2(V2CheckpointConfig::NoSidecar),
"v2Checkpoint"
)]
#[case::v1_rejected_on_v2_table(true, CheckpointSpec::V1, "V1")]
#[case::sidecar_hint_zero_rejected(
true,
CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
file_actions_per_sidecar_hint: Some(0),
}),
"greater than 0"
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_spec_rejected(
#[case] enable_v2checkpoint: bool,
#[case] spec: CheckpointSpec,
#[case] err_substring: &str,
) -> DeltaResult<()> {
let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"value",
DataType::INTEGER,
)])?);
// Only enable the v2Checkpoint feature when the case asks for it.
let mut builder = create_table(&table_path, schema, "Test/1.0");
if enable_v2checkpoint {
builder = builder.with_table_properties([("delta.feature.v2Checkpoint", "supported")]);
}
let _ = builder
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
// The checkpoint call must fail, and the error text must mention the expected substring.
let result = snapshot.checkpoint(engine.as_ref(), Some(&spec));
let err_msg = result.expect_err("spec should be rejected").to_string();
assert!(
err_msg.contains(err_substring),
"error should mention {err_substring:?}, got: {err_msg}"
);
Ok(())
}
/// Checkpoints a freshly created table (no data written) with the sidecar spec and verifies
/// that no `_sidecars` directory is created, the main checkpoint has no sidecar action rows,
/// and `_last_checkpoint` reports zero add files and a size equal to the main checkpoint file.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_sidecar_checkpoint_with_no_file_actions() -> DeltaResult<()> {
let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"value",
DataType::INTEGER,
)])?);
let _ = create_table(&table_path, schema, "Test/1.0")
.with_table_properties([("delta.feature.v2Checkpoint", "supported")])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
let version = snapshot.version();
let spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
file_actions_per_sidecar_hint: Some(2),
});
snapshot.checkpoint(engine.as_ref(), Some(&spec))?;
// With nothing to put in sidecars, the directory itself must not be created.
let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
assert!(
!sidecars_dir.exists(),
"_sidecars directory should not exist when there are no file actions, found: {}",
sidecars_dir.display()
);
// The main checkpoint file must not reference any sidecar.
let ckpt_file = load_existing_single_file_checkpoint_path(&table_path, version);
let ckpt_batch = read_parquet_file(&ckpt_file);
let sidecar_col = get_struct_column_from_record_batch(&ckpt_batch, "sidecar");
let sidecar_rows = valid_row_indices(sidecar_col, ckpt_batch.num_rows());
assert!(
sidecar_rows.is_empty(),
"main checkpoint should contain no sidecar action rows, found {}",
sidecar_rows.len()
);
// `_last_checkpoint` must agree: no add files, and only the main file counted in the size.
let last_ckpt = read_last_checkpoint(&table_path);
assert_eq!(last_ckpt["numOfAddFiles"], 0);
let ckpt_file_size = std::fs::metadata(&ckpt_file).unwrap().len() as i64;
assert_eq!(
last_ckpt["sizeInBytes"].as_i64().unwrap(),
ckpt_file_size,
"sizeInBytes should equal main checkpoint size when there are no sidecars"
);
Ok(())
}
/// Reads an entire parquet file from disk into a single `RecordBatch`.
///
/// The schema is taken from the reader rather than from the first decoded batch, so a file
/// that yields no record batches produces an empty batch instead of an index-out-of-bounds
/// panic on `batches[0]`.
///
/// # Panics
/// Panics if the file cannot be read, the parquet reader cannot be created or built, a batch
/// fails to decode, or the batches cannot be concatenated.
fn read_parquet_file(path: &std::path::Path) -> RecordBatch {
    let bytes = std::fs::read(path).expect("failed to read parquet file");
    let bytes = bytes::Bytes::from(bytes);
    let reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
        .expect("failed to create parquet reader")
        .build()
        .expect("failed to build reader");
    // Capture the schema before the reader is consumed by the iterator below.
    let schema = reader.schema();
    let batches: Vec<RecordBatch> = reader
        .map(|batch| batch.expect("failed to read record batch"))
        .collect();
    concat_batches(&schema, &batches).expect("failed to concat batches")
}
/// Reads `<table_path>/_delta_log/_last_checkpoint` and parses it as JSON.
///
/// # Panics
/// Panics if the file cannot be read or its content is not valid JSON.
fn read_last_checkpoint(table_path: &str) -> serde_json::Value {
    let mut hint_path = std::path::PathBuf::from(table_path);
    hint_path.push("_delta_log/_last_checkpoint");
    let raw = std::fs::read_to_string(&hint_path).expect("failed to read _last_checkpoint");
    let parsed: serde_json::Value =
        serde_json::from_str(&raw).expect("failed to parse _last_checkpoint JSON");
    parsed
}
/// Builds the fixture table used by the sidecar checkpoint test and returns its final snapshot.
///
/// The commits are made in this order:
/// 1. create the table with the `v2Checkpoint` and `domainMetadata` features;
/// 2. eight single-row inserts (ids 1-8);
/// 3. two domain metadata commits, the second overwriting `app.settings`;
/// 4. three transaction-id commits (`app1` twice, `app2` once);
/// 5. one `DELETE` commit removing every file returned by a scan;
/// 6. eight more single-row inserts (ids 9-16).
async fn v2_table_with_domain_metadata_and_txn<E: TaskExecutor>(
table_path: &str,
table_url: &url::Url,
engine: &Arc<delta_kernel::engine::default::DefaultEngine<E>>,
) -> DeltaResult<Arc<Snapshot>> {
// Builds the `info` struct column (a single nullable `name` string field) from `names`.
fn make_info_array(names: &[&str]) -> ArrayRef {
let name_array: ArrayRef = Arc::new(StringArray::from(
names.iter().map(|s| Some(*s)).collect::<Vec<_>>(),
));
let field = Arc::new(ArrowField::new("name", ArrowDataType::Utf8, true));
Arc::new(StructArray::from(vec![(field, name_array)]))
}
// Builds the two columns (`id`, `info`) for a single-row insert.
fn make_columns(id: i32, name: &str) -> Vec<ArrayRef> {
vec![
Arc::new(Int32Array::from(vec![id])) as ArrayRef,
make_info_array(&[name]),
]
}
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable(
"info",
DataType::try_struct_type([StructField::nullable("name", DataType::STRING)])?,
),
])?);
let _ = create_table(table_path, schema.clone(), "Test/1.0")
.with_table_properties([
("delta.feature.v2Checkpoint", "supported"),
("delta.feature.domainMetadata", "supported"),
])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let mut snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
// First wave of inserts: one commit per name, ids 1 through 8.
let names = [
"alice", "bob", "carol", "dave", "eve", "frank", "grace", "heidi",
];
for (i, name) in names.iter().enumerate() {
snapshot = insert_data(snapshot, engine, make_columns(i as i32 + 1, name))
.await?
.unwrap_post_commit_snapshot();
}
// Domain metadata: `app.settings` is written here and overwritten in the next commit.
snapshot = begin_transaction(snapshot, engine.as_ref())?
.with_domain_metadata("app.settings".to_string(), r#"{"version":1}"#.to_string())
.with_domain_metadata(
"app.feature_flags".to_string(),
r#"{"dark_mode":true}"#.to_string(),
)
.commit(engine.as_ref())?
.unwrap_post_commit_snapshot();
snapshot = begin_transaction(snapshot, engine.as_ref())?
.with_domain_metadata(
"app.analytics".to_string(),
r#"{"tracking":false}"#.to_string(),
)
.with_domain_metadata("app.settings".to_string(), r#"{"version":2}"#.to_string())
.commit(engine.as_ref())?
.unwrap_post_commit_snapshot();
// Transaction ids: `app1` is committed twice (versions 1 then 3), `app2` once (version 5).
for (app_id, version) in [("app1", 1i64), ("app2", 5), ("app1", 3)] {
snapshot = begin_transaction(snapshot, engine.as_ref())?
.with_transaction_id(app_id.to_string(), version)
.commit(engine.as_ref())?
.unwrap_post_commit_snapshot();
}
// Remove every file currently visible to a scan in a single DELETE commit.
let scan = snapshot.clone().scan_builder().build()?;
let mut txn = begin_transaction(snapshot, engine.as_ref())?
.with_operation("DELETE".to_string())
.with_data_change(true);
for sm in scan.scan_metadata(engine.as_ref())? {
txn.remove_files(sm?.scan_files);
}
snapshot = txn.commit(engine.as_ref())?.unwrap_post_commit_snapshot();
// Second wave of inserts: one commit per name, ids 9 through 16.
let names = [
"ivan", "judy", "karl", "lena", "mike", "nina", "omar", "pat",
];
for (i, name) in names.iter().enumerate() {
snapshot = insert_data(snapshot, engine, make_columns(i as i32 + 9, name))
.await?
.unwrap_post_commit_snapshot();
}
Ok(snapshot)
}
/// Returns the top-level column `name` of `batch` as a `StructArray`.
///
/// # Panics
/// Panics if the batch has no column called `name`, or if that column is not a `StructArray`.
fn get_struct_column_from_record_batch<'a>(batch: &'a RecordBatch, name: &str) -> &'a StructArray {
    let Some(column) = batch.column_by_name(name) else {
        panic!("batch should have column '{name}'");
    };
    match column.as_any().downcast_ref::<StructArray>() {
        Some(struct_column) => struct_column,
        None => panic!("column '{name}' should be a StructArray"),
    }
}
/// Returns, in ascending order, the indices in `0..num_rows` at which `col` is valid (non-null).
fn valid_row_indices(col: &dyn Array, num_rows: usize) -> Vec<usize> {
    let mut indices = Vec::new();
    for row in 0..num_rows {
        if col.is_valid(row) {
            indices.push(row);
        }
    }
    indices
}
/// Returns the child field `name` of the struct array `parent` as a `StructArray`.
///
/// # Panics
/// Panics if `parent` has no field called `name`, or if that field is not a `StructArray`.
fn get_struct_column_from_struct_array<'a>(parent: &'a StructArray, name: &str) -> &'a StructArray {
    let Some(child) = parent.column_by_name(name) else {
        panic!("struct should have field '{name}'");
    };
    match child.as_any().downcast_ref::<StructArray>() {
        Some(struct_child) => struct_child,
        None => panic!("field '{name}' should be a StructArray"),
    }
}
/// Lists the directory entries in `sidecars_dir` whose file name ends with `.parquet`.
///
/// Entries are returned in the order `read_dir` yields them; callers that need a stable order
/// must sort the result themselves.
///
/// # Panics
/// Panics if the directory cannot be listed or if any individual entry cannot be read. Entry
/// errors are surfaced rather than skipped, so a listing failure cannot silently shrink the
/// set of sidecars the size and count assertions are computed over.
fn list_sidecar_parquet_files(sidecars_dir: &std::path::Path) -> Vec<std::fs::DirEntry> {
    std::fs::read_dir(sidecars_dir)
        .expect("failed to list sidecars dir")
        .map(|entry| entry.expect("failed to read sidecars dir entry"))
        .filter(|entry| entry.file_name().to_string_lossy().ends_with(".parquet"))
        .collect()
}
/// Asserts that each sidecar action row in `sidecar_col` matches the sidecar file on storage.
///
/// For every index in `sidecar_rows` this checks that the recorded `path` ends with
/// `.parquet`, that the file exists under `<table_path>/_delta_log/_sidecars/`, that
/// `sizeInBytes` and `modificationTime` equal the values reported by the engine's storage
/// handler, and that `tags` is null.
///
/// # Panics
/// Panics if a required field is missing from `sidecar_col`, if `table_path` cannot be parsed,
/// or if any of the checks above fails.
fn assert_sidecar_actions_match_disk(
sidecar_col: &StructArray,
sidecar_rows: &[usize],
table_path: &str,
engine: &dyn Engine,
) {
let sidecar_path_col = sidecar_col
.column_by_name("path")
.expect("sidecar should have path field");
let sidecar_size_col = sidecar_col
.column_by_name("sizeInBytes")
.expect("sidecar should have sizeInBytes field");
let sidecar_mtime_col = sidecar_col
.column_by_name("modificationTime")
.expect("sidecar should have modificationTime field");
let sidecar_tags_col = sidecar_col
.column_by_name("tags")
.expect("sidecar should have tags field");
// Recorded sidecar paths are resolved relative to this directory URL.
let sidecars_base = delta_kernel::try_parse_uri(table_path)
.unwrap()
.join("_delta_log/_sidecars/")
.unwrap();
for &row in sidecar_rows {
let path = sidecar_path_col.as_string::<i32>().value(row);
assert!(
path.ends_with(".parquet"),
"sidecar path should be a parquet file, got: {path}"
);
// Ask the engine's storage handler for the file's metadata; this also proves it exists.
let sidecar_url = sidecars_base.join(path).unwrap();
let file_meta = engine
.storage_handler()
.head(&sidecar_url)
.unwrap_or_else(|e| panic!("sidecar file {path} should exist: {e}"));
let recorded_size = sidecar_size_col
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>()
.value(row);
assert_eq!(
recorded_size, file_meta.size as i64,
"sidecar sizeInBytes should match actual file size for {path}"
);
let recorded_mtime = sidecar_mtime_col
.as_primitive::<delta_kernel::arrow::datatypes::Int64Type>()
.value(row);
assert_eq!(
recorded_mtime, file_meta.last_modified,
"sidecar modificationTime should match actual file mtime for {path}"
);
assert!(
sidecar_tags_col.is_null(row),
"sidecar tags should be null for {path}"
);
}
}
/// Asserts that every sidecar parquet file in `sidecars_dir` holds only file actions and
/// returns the sorted `(adds, removes)` counts, one pair per sidecar file.
///
/// Each sidecar must have an `add` and/or `remove` column, and any non-file action column it
/// has must be entirely null.
///
/// # Panics
/// Panics if the directory holds no sidecar parquet files or if any check above fails.
fn assert_sidecars_contain_only_file_actions(
    sidecars_dir: &std::path::Path,
) -> Vec<(usize, usize)> {
    // Action columns that must never carry values inside a sidecar file.
    const FORBIDDEN_COLUMNS: [&str; 6] = [
        "protocol",
        "metaData",
        "checkpointMetadata",
        "domainMetadata",
        "txn",
        "sidecar",
    ];

    let sidecar_files = list_sidecar_parquet_files(sidecars_dir);
    assert!(
        !sidecar_files.is_empty(),
        "should have at least one sidecar parquet file in _delta_log/_sidecars/"
    );

    let mut per_sidecar_counts: Vec<(usize, usize)> = sidecar_files
        .iter()
        .map(|sidecar_entry| {
            let sidecar_batch = read_parquet_file(&sidecar_entry.path());
            let sidecar_schema = sidecar_batch.schema();
            let row_count = sidecar_batch.num_rows();

            let has_add = sidecar_schema.field_with_name("add").is_ok();
            let has_remove = sidecar_schema.field_with_name("remove").is_ok();
            assert!(
                has_add || has_remove,
                "sidecar file should contain add and/or remove columns"
            );

            for forbidden in FORBIDDEN_COLUMNS {
                // A forbidden column may exist in the schema as long as it is all-null.
                let Ok(field) = sidecar_schema.field_with_name(forbidden) else {
                    continue;
                };
                let col = sidecar_batch.column_by_name(forbidden).unwrap();
                let non_null = valid_row_indices(col, row_count).len();
                assert_eq!(
                    non_null, 0,
                    "sidecar should not contain non-null {forbidden} actions, \
                     but found {non_null} (field type: {field:?})"
                );
            }

            // Counts the non-null rows of a column, treating a missing column as zero.
            let count_valid = |column_name: &str| -> usize {
                match sidecar_batch.column_by_name(column_name) {
                    Some(c) => valid_row_indices(c, row_count).len(),
                    None => 0,
                }
            };
            (count_valid("add"), count_valid("remove"))
        })
        .collect();

    per_sidecar_counts.sort();
    per_sidecar_counts
}
async fn create_partitioned_stats_table<E: TaskExecutor>(
table_path: &str,
table_url: &url::Url,
engine: &Arc<delta_kernel::engine::default::DefaultEngine<E>>,
) -> Result<Arc<Snapshot>, Box<dyn std::error::Error>> {
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::LONG),
StructField::nullable("name", DataType::STRING),
StructField::nullable("part_key", DataType::STRING),
])?);
let _ = create_table(table_path, schema.clone(), "Test/1.0")
.with_table_properties([
("delta.feature.v2Checkpoint", "supported"),
("delta.checkpoint.writeStatsAsStruct", "true"),
])
.with_data_layout(DataLayout::partitioned(["part_key"]))
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let data_schema = StructType::try_new(vec![
StructField::nullable("id", DataType::LONG),
StructField::nullable("name", DataType::STRING),
])?;
let arrow_schema = Arc::new(ArrowSchema::try_from_kernel(&data_schema)?);
let batch1 = RecordBatch::try_new(
arrow_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
Arc::new(StringArray::from(vec![Some("alice"), Some("bob")])) as ArrayRef,
],
)
.unwrap();
let snapshot0 = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let snapshot1 = write_batch_to_table(
&snapshot0,
engine.as_ref(),
batch1,
HashMap::from([("part_key".to_string(), Scalar::from("a"))]),
)
.await?;
let batch2 = RecordBatch::try_new(
arrow_schema,
vec![
Arc::new(Int64Array::from(vec![3, 4, 5])) as ArrayRef,
Arc::new(StringArray::from(vec![
Some("charlie"),
Some("dave"),
Some("eve"),
])) as ArrayRef,
],
)
.unwrap();
let snapshot2 = write_batch_to_table(
&snapshot1,
engine.as_ref(),
batch2,
HashMap::from([("part_key".to_string(), Scalar::from("b"))]),
)
.await?;
Ok(snapshot2)
}
/// Checkpoints a `v2Checkpoint` table either with no spec (`None`) or with an explicit
/// no-sidecar V2 spec, and verifies in both cases that no `_sidecars` directory is created and
/// that the checkpoint file has a `checkpointMetadata` column.
#[rstest::rstest]
#[case::none(None)]
#[case::v2_no_sidecar(Some(CheckpointSpec::V2(V2CheckpointConfig::NoSidecar)))]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_snapshot_checkpoint_default_on_v2_table(
#[case] spec: Option<CheckpointSpec>,
) -> Result<(), Box<dyn std::error::Error>> {
let schema = get_simple_schema();
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let mut snapshot = create_table_and_load_snapshot(
&table_path,
schema.clone(),
engine.as_ref(),
&[("delta.feature.v2Checkpoint", "supported")],
)?;
// Write one batch so the checkpoint has data to cover.
snapshot = write_batch_to_table(
&snapshot,
engine.as_ref(),
simple_id_batch(&schema, vec![1, 2]),
HashMap::new(),
)
.await?;
let version = snapshot.version();
snapshot.checkpoint(engine.as_ref(), spec.as_ref())?;
let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
assert!(
!sidecars_dir.exists(),
"_sidecars directory should not exist for a no-sidecar V2 checkpoint, found: {}",
sidecars_dir.display()
);
// Only the parquet schema is inspected here; no record batches are read.
let ckpt_path = load_existing_single_file_checkpoint_path(&table_path, version);
let file = std::fs::File::open(&ckpt_path)?;
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
let schema = reader.schema();
assert!(
schema.field_with_name("checkpointMetadata").is_ok(),
"V2 checkpoint must contain `checkpointMetadata` column, found schema: {schema:?}"
);
Ok(())
}
/// Writes a sidecar V2 checkpoint on a table built with additional features and verifies the
/// sidecar distribution, that a fresh snapshot loads purely from the checkpoint, and that the
/// scanned data is unchanged.
///
/// NOTE(review): `CrossFeature` and `build_v2_table_with_feature` are defined outside this
/// view; the table contents asserted below are presumably produced by that helper — confirm.
#[rstest::rstest]
#[case::clustered(vec![CrossFeature::Clustered])]
#[case::column_mapping_and_partitioned(vec![CrossFeature::ColumnMapping, CrossFeature::Partitioned])]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_sidecar_checkpoint_cross_feature(
#[case] features: Vec<CrossFeature>,
) -> Result<(), Box<dyn std::error::Error>> {
let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let snapshot = build_v2_table_with_feature(&table_path, &table_url, &engine, &features).await?;
let version = snapshot.version();
let spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
file_actions_per_sidecar_hint: Some(2),
});
snapshot.checkpoint(engine.as_ref(), Some(&spec))?;
// Check the sorted (adds, removes) counts across the sidecar files.
let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
let per_sidecar = assert_sidecars_contain_only_file_actions(&sidecars_dir);
assert_eq!(
per_sidecar,
vec![(1, 0), (2, 0)],
"per-sidecar (adds, removes) distribution does not match expected"
);
// A fresh snapshot must sit at the checkpoint version with no commit files listed.
let fresh = Snapshot::builder_for(table_url).build(engine.as_ref())?;
assert_eq!(fresh.version(), version);
let log_segment = fresh.log_segment();
assert!(
!log_segment.listed.checkpoint_parts.is_empty(),
"fresh snapshot should load from the checkpoint"
);
assert_eq!(log_segment.checkpoint_version, Some(version));
assert!(
log_segment.listed.ascending_commit_files.is_empty(),
"no commit files should remain after the checkpoint"
);
// The data must still be readable through the checkpoint-backed snapshot.
let scan = fresh.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
assert_batches_sorted_eq!(
vec![
"+----+-------+--------+",
"| id | value | region |",
"+----+-------+--------+",
"| 1 | a | east |",
"| 2 | b | east |",
"| 3 | c | west |",
"| 4 | d | west |",
"| 5 | e | north |",
"| 6 | f | north |",
"+----+-------+--------+",
],
&batches
);
Ok(())
}
/// Takes two sidecar-based V2 checkpoints in a row on the same table and checks that the
/// second one supersedes the first.
///
/// Two batches are written and checkpointed, then two more batches are written on top of a
/// freshly loaded snapshot and checkpointed again. Afterwards `_last_checkpoint` must point
/// at the second checkpoint and report four add files, and a snapshot built from scratch
/// must load from that checkpoint with no trailing commit files and return all eight rows.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_sidecar_consecutive_checkpoints() -> Result<(), Box<dyn std::error::Error>> {
    let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let schema = get_simple_schema();
    let spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
        file_actions_per_sidecar_hint: Some(1),
    });

    // Round one: create the table, commit two batches, checkpoint.
    let mut snapshot = create_table_and_load_snapshot(
        &table_path,
        schema.clone(),
        engine.as_ref(),
        &[("delta.feature.v2Checkpoint", "supported")],
    )?;
    for ids in [vec![1, 2], vec![3, 4]] {
        snapshot = write_batch_to_table(
            &snapshot,
            engine.as_ref(),
            simple_id_batch(&schema, ids),
            HashMap::new(),
        )
        .await?;
    }
    let v_first = snapshot.version();
    snapshot.checkpoint(engine.as_ref(), Some(&spec))?;
    assert_eq!(
        read_last_checkpoint(&table_path)["version"].as_u64(),
        Some(v_first),
        "_last_checkpoint should point to the first checkpoint"
    );

    // Round two: start from a freshly loaded snapshot so the first checkpoint is what gets
    // read back, commit two more batches, checkpoint again.
    let mut snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    for ids in [vec![5, 6], vec![7, 8]] {
        snapshot = write_batch_to_table(
            &snapshot,
            engine.as_ref(),
            simple_id_batch(&schema, ids),
            HashMap::new(),
        )
        .await?;
    }
    let v_second = snapshot.version();
    snapshot.checkpoint(engine.as_ref(), Some(&spec))?;

    let last = read_last_checkpoint(&table_path);
    assert_eq!(last["version"].as_u64(), Some(v_second));
    assert_eq!(last["numOfAddFiles"].as_i64(), Some(4));

    // A snapshot built from scratch must be served entirely by the second checkpoint.
    let fresh = Snapshot::builder_for(table_url).build(engine.as_ref())?;
    assert_eq!(fresh.version(), v_second);
    let log_segment = fresh.log_segment();
    assert_eq!(log_segment.checkpoint_version, Some(v_second));
    assert!(
        log_segment.listed.ascending_commit_files.is_empty(),
        "no commit files should remain after the second checkpoint"
    );

    let scan = fresh.scan_builder().build()?;
    let batches = read_scan(&scan, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
    // Rows are compared line-by-line against the pretty-printed batches, so each cell must
    // be padded to the column width given by the border line.
    #[rustfmt::skip]
    let expected = vec![
        "+----+",
        "| id |",
        "+----+",
        "| 1  |",
        "| 2  |",
        "| 3  |",
        "| 4  |",
        "| 5  |",
        "| 6  |",
        "| 7  |",
        "| 8  |",
        "+----+",
    ];
    assert_batches_sorted_eq!(expected, &batches);
    Ok(())
}
/// Checks that a sidecar-based V2 checkpoint keeps the deletion vector descriptor and the
/// row-tracking fields of an `add` action.
///
/// The test writes one batch, attaches a hand-built deletion vector descriptor to the
/// resulting file through a transaction, checkpoints with no explicit sidecar hint, and
/// then reads the `add` struct directly out of the single sidecar parquet file.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_sidecar_preserves_dv_and_row_tracking_on_add(
) -> Result<(), Box<dyn std::error::Error>> {
    let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let schema = get_simple_schema();

    // Table with V2 checkpoints, deletion vectors and row tracking all switched on.
    let table_properties = [
        ("delta.feature.v2Checkpoint", "supported"),
        ("delta.feature.deletionVectors", "supported"),
        ("delta.enableRowTracking", "true"),
    ];
    let _ = create_table(&table_path, schema.clone(), "Test/1.0")
        .with_table_properties(table_properties)
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    let empty_table = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    let with_data = write_batch_to_table(
        &empty_table,
        engine.as_ref(),
        simple_id_batch(&schema, vec![1, 2, 3]),
        HashMap::new(),
    )
    .await?;

    // Path of the first add reported for the table; the descriptor below is attached to it.
    let target_path = read_add_infos(with_data.as_ref(), engine.as_ref())?[0]
        .path
        .clone();
    let dv = DeletionVectorDescriptor {
        storage_type: DeletionVectorStorageType::PersistedRelative,
        path_or_inline_dv: "abcdefghijklmnopqrst".to_string(),
        offset: Some(7),
        size_in_bytes: 42,
        cardinality: 1,
    };

    // The DV update needs the current scan files, so gather them before the snapshot is
    // handed over to the transaction.
    let scan = with_data.clone().scan_builder().build()?;
    let scan_files: Vec<_> = scan
        .scan_metadata(engine.as_ref())?
        .map_ok(|sm| sm.scan_files)
        .try_collect()?;

    let mut txn = begin_transaction(with_data, engine.as_ref())?.with_data_change(true);
    let dv_updates = HashMap::from([(target_path, dv.clone())]);
    txn.update_deletion_vectors(dv_updates, scan_files.into_iter().map(Ok))?;
    let committed = txn.commit(engine.as_ref())?.unwrap_post_commit_snapshot();

    let sidecar_spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
        file_actions_per_sidecar_hint: None,
    });
    committed.checkpoint(engine.as_ref(), Some(&sidecar_spec))?;

    let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
    let sidecars = list_sidecar_parquet_files(&sidecars_dir);
    assert_eq!(sidecars.len(), 1, "default hint should produce one sidecar");

    // Locate the single live `add` row inside the sidecar.
    let batch = read_parquet_file(&sidecars[0].path());
    let add = get_struct_column_from_record_batch(&batch, "add");
    let live = valid_row_indices(add, batch.num_rows());
    assert_eq!(live.len(), 1, "expected one live add in sidecar");
    let row = live[0];

    // Typed accessors for nested fields of the `add` struct at `row`. The closure
    // parameters are intentionally left untyped so inference matches the helper's input.
    let owned_segments =
        |segments: &[&str]| segments.iter().map(|s| s.to_string()).collect::<Vec<_>>();
    let read_str =
        |segments| resolve_struct_field::<StringArray>(add, &owned_segments(segments)).value(row);
    let read_i32 =
        |segments| resolve_struct_field::<Int32Array>(add, &owned_segments(segments)).value(row);
    let read_i64 =
        |segments| resolve_struct_field::<Int64Array>(add, &owned_segments(segments)).value(row);

    // Deletion vector descriptor fields.
    assert_eq!(read_str(&["deletionVector", "storageType"]), "u");
    assert_eq!(
        read_str(&["deletionVector", "pathOrInlineDv"]),
        dv.path_or_inline_dv
    );
    assert_eq!(read_i32(&["deletionVector", "offset"]), 7);
    assert_eq!(read_i32(&["deletionVector", "sizeInBytes"]), 42);
    assert_eq!(read_i64(&["deletionVector", "cardinality"]), 1);

    // Row-tracking fields.
    assert_eq!(read_i64(&["baseRowId"]), 0);
    assert_eq!(read_i64(&["defaultRowCommitVersion"]), 1);
    Ok(())
}
/// Checks how file actions are split across sidecars when no sidecar hint is given.
///
/// Sixty commits of 1,000 synthetic add files each are registered (metadata only, no data
/// is written). After checkpointing with `file_actions_per_sidecar_hint: None`, the test
/// expects two sidecars holding 10,000 and 50,000 adds, and `_last_checkpoint` to report
/// all 60,000 add files.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_sidecar_default_hint_splits_at_50k() -> Result<(), Box<dyn std::error::Error>> {
    const PER_COMMIT: usize = 1_000;
    const COMMITS: usize = 60;

    let (_temp_dir, table_path, engine) = test_table_setup_mt()?;
    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let _ = create_table(&table_path, get_simple_schema(), "Test/1.0")
        .with_table_properties([("delta.feature.v2Checkpoint", "supported")])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    let mut snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    for c in 0..COMMITS {
        let mut txn = begin_transaction(snapshot, engine.as_ref())?.with_data_change(true);
        let add_files_schema = txn.add_files_schema().clone();

        // Own the file names first; the metadata tuples below only borrow them.
        let mut file_names: Vec<String> = Vec::with_capacity(PER_COMMIT);
        file_names.extend((0..PER_COMMIT).map(|i| format!("part-{c:03}-{i:04}.parquet")));
        let file_entries: Vec<(&str, i64, i64, Option<i64>)> = file_names
            .iter()
            .map(String::as_str)
            .map(|name| (name, 100, 0, Some(1)))
            .collect();

        let add_metadata = create_add_files_metadata(&add_files_schema, file_entries)?;
        txn.add_files(add_metadata);
        snapshot = txn.commit(engine.as_ref())?.unwrap_post_commit_snapshot();
    }

    let default_hint_spec = CheckpointSpec::V2(V2CheckpointConfig::WithSidecar {
        file_actions_per_sidecar_hint: None,
    });
    snapshot.checkpoint(engine.as_ref(), Some(&default_hint_spec))?;

    let sidecars_dir = std::path::Path::new(&table_path).join("_delta_log/_sidecars");
    let per_sidecar = assert_sidecars_contain_only_file_actions(&sidecars_dir);
    assert_eq!(
        per_sidecar,
        vec![(10_000, 0), (50_000, 0)],
        "per-sidecar (adds, removes) distribution"
    );

    let total_adds = (PER_COMMIT * COMMITS) as i64;
    assert_eq!(
        read_last_checkpoint(&table_path)["numOfAddFiles"].as_i64(),
        Some(total_adds),
    );
    Ok(())
}
/// Table features that `build_v2_table_with_feature` can enable alongside `v2Checkpoint`.
///
/// `Partitioned` and `Clustered` both select the table's data layout, and the builder
/// asserts that at most one of them is requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CrossFeature {
/// Sets the `delta.columnMapping.mode` table property to `name`.
ColumnMapping,
/// Creates the table with a layout partitioned by the `region` column.
Partitioned,
/// Creates the table with a layout clustered by the `id` column.
Clustered,
}
fn cross_feature_schema() -> Arc<StructType> {
Arc::new(
StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("value", DataType::STRING),
StructField::nullable("region", DataType::STRING),
])
.unwrap(),
)
}
async fn build_v2_table_with_feature<E: TaskExecutor>(
table_path: &str,
table_url: &url::Url,
engine: &Arc<delta_kernel::engine::default::DefaultEngine<E>>,
features: &[CrossFeature],
) -> Result<Arc<Snapshot>, Box<dyn std::error::Error>> {
let schema = cross_feature_schema();
let mut props: Vec<(&str, &str)> = vec![("delta.feature.v2Checkpoint", "supported")];
let mut layout: Option<DataLayout> = None;
for feature in features {
match feature {
CrossFeature::ColumnMapping => {
props.push(("delta.columnMapping.mode", "name"));
}
CrossFeature::Partitioned => {
assert!(
layout.is_none(),
"Partitioned and Clustered are mutually exclusive"
);
layout = Some(DataLayout::partitioned(["region"]));
}
CrossFeature::Clustered => {
assert!(
layout.is_none(),
"Partitioned and Clustered are mutually exclusive"
);
layout = Some(DataLayout::clustered(["id"]));
}
}
}
let mut builder =
create_table(table_path, schema.clone(), "Test/1.0").with_table_properties(props);
if let Some(layout) = layout {
builder = builder.with_data_layout(layout);
}
let _ = builder
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let partitioned = features.contains(&CrossFeature::Partitioned);
let data_arrow_schema = if partitioned {
Arc::new(ArrowSchema::try_from_kernel(&StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("value", DataType::STRING),
])?)?)
} else {
Arc::new(ArrowSchema::try_from_kernel(schema.as_ref())?)
};
let writes: [(&str, Vec<i32>, Vec<&str>); 3] = [
("east", vec![1, 2], vec!["a", "b"]),
("west", vec![3, 4], vec!["c", "d"]),
("north", vec![5, 6], vec!["e", "f"]),
];
let mut snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
for (region, ids, values) in writes {
let id_arr: ArrayRef = Arc::new(Int32Array::from(ids.clone()));
let val_arr: ArrayRef = Arc::new(StringArray::from(values));
let columns: Vec<ArrayRef> = if partitioned {
vec![id_arr, val_arr]
} else {
let region_arr: ArrayRef = Arc::new(StringArray::from(vec![region; ids.len()]));
vec![id_arr, val_arr, region_arr]
};
let batch = RecordBatch::try_new(data_arrow_schema.clone(), columns)?;
let pv = if partitioned {
HashMap::from([("region".to_string(), Scalar::from(region))])
} else {
HashMap::new()
};
snapshot = write_batch_to_table(&snapshot, engine.as_ref(), batch, pv).await?;
}
Ok(snapshot)
}