use std::collections::HashMap;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{
Array, ArrayRef, AsArray, Int64Array, RecordBatch, StringArray, StructArray,
};
use delta_kernel::arrow::compute::{concat_batches, sort_to_indices, take};
use delta_kernel::arrow::datatypes::TimestampMicrosecondType;
use delta_kernel::arrow::datatypes::{
DataType as ArrowDataType, Field, Int64Type, Schema as ArrowSchema,
};
use delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor;
use delta_kernel::engine::default::DefaultEngineBuilder;
use delta_kernel::expressions::column_expr;
use delta_kernel::object_store::memory::InMemory;
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::ObjectStoreExt as _;
use delta_kernel::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use delta_kernel::DeltaResult;
use delta_kernel::Expression;
use delta_kernel::Snapshot;
use serde_json::json;
use test_utils::{insert_data, read_scan, write_batch_to_table};
use url::Url;
fn new_in_memory_store() -> (Arc<InMemory>, Url) {
(Arc::new(InMemory::new()), Url::parse("memory:///").unwrap())
}
async fn write_commit(store: &Arc<InMemory>, content: &str, version: u64) -> DeltaResult<()> {
let path = Path::from(format!("_delta_log/{version:020}.json"));
store.put(&path, content.to_string().into()).await?;
Ok(())
}
const NON_PARTITIONED_SCHEMA: &str = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}"#;
const PARTITIONED_SCHEMA: &str = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"created_at","type":"timestamp","nullable":true,"metadata":{}},{"name":"tag","type":"binary","nullable":true,"metadata":{}}]}"#;
fn build_commit(
schema_string: &str,
partition_columns: &[&str],
write_stats_as_json: bool,
write_stats_as_struct: bool,
include_protocol: bool,
) -> String {
let metadata = json!({
"metaData": {
"id": "test-table",
"format": { "provider": "parquet", "options": {} },
"schemaString": schema_string,
"partitionColumns": partition_columns,
"configuration": {
"delta.checkpoint.writeStatsAsJson": write_stats_as_json.to_string(),
"delta.checkpoint.writeStatsAsStruct": write_stats_as_struct.to_string()
},
"createdTime": 1587968585495i64
}
});
if include_protocol {
let protocol = json!({
"protocol": {
"minReaderVersion": 3,
"minWriterVersion": 7,
"readerFeatures": [],
"writerFeatures": []
}
});
format!("{protocol}\n{metadata}")
} else {
metadata.to_string()
}
}
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_stats_config_with_real_data(
#[values(true, false)] json1: bool,
#[values(true, false)] struct1: bool,
#[values(true, false)] json2: bool,
#[values(true, false)] struct2: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let (store, table_root) = new_in_memory_store();
let executor = Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
));
let engine = Arc::new(
DefaultEngineBuilder::new(store.clone())
.with_task_executor(executor)
.build(),
);
write_commit(
&store,
&build_commit(NON_PARTITIONED_SCHEMA, &[], json1, struct1, true),
0,
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let result = insert_data(
snapshot,
&engine,
vec![
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
],
)
.await?;
assert!(result.is_committed());
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
snapshot.checkpoint(engine.as_ref())?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let result = insert_data(
snapshot,
&engine,
vec![
Arc::new(Int64Array::from(vec![4, 5, 6])) as ArrayRef,
Arc::new(StringArray::from(vec!["Diana", "Eve", "Frank"])),
],
)
.await?;
assert!(result.is_committed());
write_commit(
&store,
&build_commit(NON_PARTITIONED_SCHEMA, &[], json2, struct2, false),
3,
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
snapshot.checkpoint(engine.as_ref())?;
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone())?;
let schema = batches[0].schema();
let merged = concat_batches(&schema, &batches)?;
let id_col = merged.column_by_name("id").unwrap();
let sort_indices = sort_to_indices(id_col, None, None)?;
let sorted_columns: Vec<ArrayRef> = merged
.columns()
.iter()
.map(|col| take(col.as_ref(), &sort_indices, None).unwrap())
.collect();
let sorted = RecordBatch::try_new(schema, sorted_columns)?;
assert_eq!(sorted.num_rows(), 6, "All 6 rows should be readable");
let ids: Vec<i64> = sorted
.column_by_name("id")
.unwrap()
.as_primitive::<Int64Type>()
.values()
.iter()
.copied()
.collect();
assert_eq!(ids, vec![1, 2, 3, 4, 5, 6]);
let names: Vec<&str> = (0..6)
.map(|i| {
sorted
.column_by_name("name")
.unwrap()
.as_string::<i32>()
.value(i)
})
.collect();
assert_eq!(
names,
vec!["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"]
);
Ok(())
}
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_partitioned_with_real_data(
#[values(true, false)] json1: bool,
#[values(true, false)] struct1: bool,
#[values(true, false)] json2: bool,
#[values(true, false)] struct2: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let (store, table_root) = new_in_memory_store();
let executor = Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
));
let engine = Arc::new(
DefaultEngineBuilder::new(store.clone())
.with_task_executor(executor)
.build(),
);
write_commit(
&store,
&build_commit(
PARTITIONED_SCHEMA,
&["created_at", "tag"],
json1,
struct1,
true,
),
0,
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
Field::new("id", ArrowDataType::Int64, true),
Field::new("name", ArrowDataType::Utf8, true),
])),
vec![
Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
Arc::new(StringArray::from(vec!["Alice", "Bob"])),
],
)?;
write_batch_to_table(
&snapshot,
engine.as_ref(),
batch,
HashMap::from([
("created_at".to_string(), "2024-01-15 10:30:00".to_string()),
("tag".to_string(), "hello".to_string()),
]),
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
snapshot.checkpoint(engine.as_ref())?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
Field::new("id", ArrowDataType::Int64, true),
Field::new("name", ArrowDataType::Utf8, true),
])),
vec![
Arc::new(Int64Array::from(vec![3, 4])) as ArrayRef,
Arc::new(StringArray::from(vec!["Charlie", "Diana"])),
],
)?;
write_batch_to_table(
&snapshot,
engine.as_ref(),
batch,
HashMap::from([
(
"created_at".to_string(),
"2025-03-01 09:15:30.123456".to_string(),
),
("tag".to_string(), "world".to_string()),
]),
)
.await?;
write_commit(
&store,
&build_commit(
PARTITIONED_SCHEMA,
&["created_at", "tag"],
json2,
struct2,
false,
),
3,
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
snapshot.checkpoint(engine.as_ref())?;
if struct2 {
let checkpoint_path = Path::from("_delta_log/00000000000000000003.checkpoint.parquet");
let checkpoint_bytes = store.get(&checkpoint_path).await?.bytes().await?;
let reader = ParquetRecordBatchReaderBuilder::try_new(checkpoint_bytes)?.build()?;
let ckpt_batches: Vec<RecordBatch> = reader.map(|b| b.unwrap()).collect();
let ckpt_schema = ckpt_batches[0].schema();
let ckpt_merged = concat_batches(&ckpt_schema, &ckpt_batches)?;
let add_col = ckpt_merged
.column_by_name("add")
.unwrap()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let pv_parsed = add_col
.column_by_name("partitionValues_parsed")
.expect("checkpoint should have partitionValues_parsed when writeStatsAsStruct=true")
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let add_rows: Vec<usize> = (0..ckpt_merged.num_rows())
.filter(|&i| add_col.is_valid(i))
.collect();
assert_eq!(add_rows.len(), 2, "should have 2 add actions");
let tag_col = pv_parsed
.column_by_name("tag")
.expect("partitionValues_parsed should have tag");
let mut tag_values: Vec<&[u8]> = add_rows
.iter()
.map(|&i| tag_col.as_binary::<i32>().value(i))
.collect();
tag_values.sort();
assert_eq!(tag_values, vec![b"hello", b"world"]);
let created_at_col = pv_parsed
.column_by_name("created_at")
.expect("partitionValues_parsed should have created_at");
let mut ts_values: Vec<i64> = add_rows
.iter()
.map(|&i| {
created_at_col
.as_primitive::<TimestampMicrosecondType>()
.value(i)
})
.collect();
ts_values.sort();
assert_eq!(ts_values, vec![1705314600000000, 1740820530123456]);
}
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone())?;
let schema = batches[0].schema();
let merged = concat_batches(&schema, &batches)?;
let id_col = merged.column_by_name("id").unwrap();
let sort_indices = sort_to_indices(id_col, None, None)?;
let sorted_columns: Vec<ArrayRef> = merged
.columns()
.iter()
.map(|col| take(col.as_ref(), &sort_indices, None).unwrap())
.collect();
let sorted = RecordBatch::try_new(schema, sorted_columns)?;
assert_eq!(sorted.num_rows(), 4, "All 4 rows should be readable");
let ids: Vec<i64> = sorted
.column_by_name("id")
.unwrap()
.as_primitive::<Int64Type>()
.values()
.iter()
.copied()
.collect();
assert_eq!(ids, vec![1, 2, 3, 4]);
let tags = sorted.column_by_name("tag").unwrap();
let tags: Vec<&[u8]> = (0..4).map(|i| tags.as_binary::<i32>().value(i)).collect();
assert_eq!(tags, vec![b"hello", b"hello", b"world", b"world"]);
Ok(())
}
const COLUMN_MAPPING_SCHEMA: &str = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{"delta.columnMapping.id":1,"delta.columnMapping.physicalName":"col-id-phys"}},{"name":"name","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":2,"delta.columnMapping.physicalName":"col-name-phys"}},{"name":"category","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":3,"delta.columnMapping.physicalName":"col-category-phys"}}]}"#;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_partition_values_parsed_with_column_mapping(
) -> Result<(), Box<dyn std::error::Error>> {
let (store, table_root) = new_in_memory_store();
let executor = Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
));
let engine = Arc::new(
DefaultEngineBuilder::new(store.clone())
.with_task_executor(executor)
.build(),
);
let protocol = json!({
"protocol": {
"minReaderVersion": 3,
"minWriterVersion": 7,
"readerFeatures": ["columnMapping"],
"writerFeatures": ["columnMapping"]
}
});
let metadata = json!({
"metaData": {
"id": "test-table",
"format": { "provider": "parquet", "options": {} },
"schemaString": COLUMN_MAPPING_SCHEMA,
"partitionColumns": ["category"],
"configuration": {
"delta.checkpoint.writeStatsAsJson": "true",
"delta.checkpoint.writeStatsAsStruct": "true",
"delta.columnMapping.mode": "name",
"delta.columnMapping.maxColumnId": "3"
},
"createdTime": 1587968585495i64
}
});
write_commit(&store, &format!("{protocol}\n{metadata}"), 0).await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
Field::new("col-id-phys", ArrowDataType::Int64, true),
Field::new("col-name-phys", ArrowDataType::Utf8, true),
])),
vec![
Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
Arc::new(StringArray::from(vec!["Alice", "Bob"])),
],
)?;
write_batch_to_table(
&snapshot,
engine.as_ref(),
batch,
HashMap::from([("category".to_string(), "books".to_string())]),
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
snapshot.checkpoint(engine.as_ref())?;
let checkpoint_path = Path::from("_delta_log/00000000000000000001.checkpoint.parquet");
let checkpoint_bytes = store.get(&checkpoint_path).await?.bytes().await?;
let reader = ParquetRecordBatchReaderBuilder::try_new(checkpoint_bytes)?.build()?;
let ckpt_batches: Vec<RecordBatch> = reader.map(|b| b.unwrap()).collect();
let ckpt_schema = ckpt_batches[0].schema();
let ckpt_merged = concat_batches(&ckpt_schema, &ckpt_batches)?;
let add_col = ckpt_merged
.column_by_name("add")
.unwrap()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let pv_parsed = add_col
.column_by_name("partitionValues_parsed")
.expect("checkpoint should have partitionValues_parsed")
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
assert!(
pv_parsed.column_by_name("col-category-phys").is_some(),
"partitionValues_parsed should use physical column name"
);
assert!(
pv_parsed.column_by_name("category").is_none(),
"partitionValues_parsed should not use logical column name"
);
let add_row = (0..ckpt_merged.num_rows())
.find(|&i| add_col.is_valid(i))
.expect("should have an add action");
let category_col = pv_parsed.column_by_name("col-category-phys").unwrap();
let category_value = category_col.as_string::<i32>().value(add_row);
assert_eq!(category_value, "books");
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let batches = read_scan(&scan, engine.clone())?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 2);
Ok(())
}
const WIDE_SCHEMA: &str = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}}]}"#;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_scan_schema_evolved_table_with_checkpoint_predicate_on_new_column(
) -> Result<(), Box<dyn std::error::Error>> {
let (store, table_root) = new_in_memory_store();
let executor = Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
));
let engine = Arc::new(
DefaultEngineBuilder::new(store.clone())
.with_task_executor(executor)
.build(),
);
write_commit(
&store,
&build_commit(NON_PARTITIONED_SCHEMA, &[], true, true, true),
0,
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let result = insert_data(
snapshot,
&engine,
vec![
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
],
)
.await?;
assert!(result.is_committed());
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
snapshot.checkpoint(engine.as_ref())?;
write_commit(
&store,
&build_commit(WIDE_SCHEMA, &[], true, true, false),
2,
)
.await?;
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let result = insert_data(
snapshot,
&engine,
vec![
Arc::new(Int64Array::from(vec![4, 5, 6])) as ArrayRef,
Arc::new(StringArray::from(vec!["Diana", "Eve", "Frank"])),
Arc::new(Int64Array::from(vec![25, 35, 45])),
],
)
.await?;
assert!(result.is_committed());
let snapshot = Snapshot::builder_for(table_root.clone()).build(engine.as_ref())?;
let predicate = Arc::new(column_expr!("id").gt(Expression::literal(2i64)));
let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
let batches = read_scan(&scan, engine.clone())?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert!(total_rows > 0, "should return rows matching id > 2");
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let predicate = Arc::new(column_expr!("age").gt(Expression::literal(30i64)));
let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
let batches = read_scan(&scan, engine.clone())?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows, 6,
"all rows returned when data skipping cannot filter"
);
Ok(())
}