use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use delta_kernel::actions::deletion_vector::{DeletionVectorDescriptor, DeletionVectorStorageType};
use delta_kernel::arrow::array::{
    Array, ArrayRef, BinaryArray, Int32Array, Int64Array, StringArray, StructArray,
    TimestampMicrosecondArray,
};
use delta_kernel::arrow::buffer::NullBuffer;
use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field};
use delta_kernel::arrow::error::ArrowError;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::{TryFromKernel, TryIntoArrow as _};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::{
    TokioBackgroundExecutor, TokioMultiThreadExecutor,
};
use delta_kernel::engine::default::parquet::DefaultParquetHandler;
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
use delta_kernel::engine_data::FilteredEngineData;
use delta_kernel::expressions::ColumnName;
use delta_kernel::object_store::local::LocalFileSystem;
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::{DynObjectStore, ObjectStoreExt as _};
use delta_kernel::parquet::file::reader::{FileReader, SerializedFileReader};
use delta_kernel::schema::{
    ColumnMetadataKey, DataType, MetadataValue, SchemaRef, StructField, StructType,
};
use delta_kernel::table_features::{get_any_level_column_physical_name, ColumnMappingMode};
use delta_kernel::transaction::create_table::create_table as create_table_txn;
use delta_kernel::transaction::CommitResult;
use delta_kernel::{DeltaResult, Engine, Error as KernelError, FileMeta, Snapshot, Version};
use itertools::Itertools;
use serde_json::{json, Deserializer};
use tempfile::{tempdir, TempDir};
use url::Url;
use uuid::Uuid;

use test_utils::{
    assert_partition_values, assert_result_error_with_message, assert_schema_has_field,
    copy_directory, create_add_files_metadata, create_default_engine,
    create_default_engine_mt_executor, create_table, create_table_and_load_snapshot,
    engine_store_setup, nested_batches, nested_schema, read_actions_from_commit, read_add_infos,
    remove_all_and_get_remove_actions, resolve_field, set_json_value, setup_test_tables, test_read,
    test_table_setup, write_batch_to_table,
};

mod common;
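
/// Walks the parquet schema of `parquet_file` along `physical_path` and returns the leaf field's
/// field id, if one is set.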
fn get_parquet_field_id(parquet_file: &std::path::Path, physical_path: &[String]) -> Option<i32> {
let file = std::fs::File::open(parquet_file).unwrap();
let reader = SerializedFileReader::new(file).unwrap();
let root = reader
.metadata()
.file_metadata()
.schema_descr()
.root_schema()
.clone();
let mut current = &root;
for name in physical_path {
current = current
.get_fields()
.iter()
.find(|f| f.name() == name)
.unwrap_or_else(|| panic!("parquet schema missing field '{name}'"));
}
let info = current.get_basic_info();
info.has_id().then(|| info.id())
}
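
/// Asserts that `commitInfo.txnId` is present and is a well-formed UUID.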
fn validate_txn_id(commit_info: &serde_json::Value) {
let txn_id = commit_info["txnId"]
.as_str()
.expect("txnId should be present in commitInfo");
Uuid::parse_str(txn_id).expect("txnId should be valid UUID format");
}
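
/// Asserts that `commitInfo.timestamp` is present, not in the future, and at most two days behind
/// the current system time.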
fn validate_timestamp(commit_info: &serde_json::Value) {
let timestamp = commit_info["timestamp"]
.as_i64()
.expect("timestamp should be present in commitInfo");
let current_ts: i64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap();
let two_days_ms = Duration::from_secs(2 * 24 * 60 * 60).as_millis() as i64;
assert!(
(timestamp <= current_ts && timestamp > current_ts - two_days_ms),
"commit timestamp should be at most 2 days behind current system time: got {timestamp}, now {current_ts}"
);
}
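
/// Placeholder UUID substituted for the real `txnId` when normalizing commits for comparison.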
const ZERO_UUID: &str = "00000000-0000-0000-0000-000000000000";
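
/// Creates a table with the `deletionVectors` feature enabled and commits `file_paths` as add
/// actions. Returns the object store, engine, table URL, and the added file paths.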
async fn create_dv_table_with_files(
table_name: &str,
schema: Arc<StructType>,
file_paths: &[&str],
) -> Result<
(
Arc<DynObjectStore>,
Arc<dyn delta_kernel::Engine>,
Url,
Vec<String>,
),
Box<dyn std::error::Error>,
> {
let (store, engine, table_url) = engine_store_setup(table_name, None);
let engine = Arc::new(engine);
create_table(
store.clone(),
table_url.clone(),
schema.clone(),
&[],
true, vec!["deletionVectors"],
vec!["deletionVectors"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test engine")
.with_operation("WRITE".to_string())
.with_data_change(true);
let add_files_schema = txn.add_files_schema();
let files: Vec<(&str, i64, i64, i64)> = file_paths
.iter()
.enumerate()
.map(|(i, &path)| {
(
path,
                1024 + i as i64 * 100,
                1000000 + i as i64,
                3,
            )
})
.collect();
let metadata = create_add_files_metadata(add_files_schema, files)?;
txn.add_files(metadata);
let _ = txn.commit(engine.as_ref())?;
let paths: Vec<String> = file_paths.iter().map(|&s| s.to_string()).collect();
Ok((store, engine, table_url, paths))
}
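
/// Scans `snapshot` and returns the filtered scan-file data from each scan metadata batch.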
fn get_scan_files(
snapshot: Arc<Snapshot>,
engine: &dyn delta_kernel::Engine,
) -> DeltaResult<Vec<FilteredEngineData>> {
let scan = snapshot.scan_builder().build()?;
let all_scan_metadata: Vec<_> = scan.scan_metadata(engine)?.collect::<Result<Vec<_>, _>>()?;
Ok(all_scan_metadata
.into_iter()
.map(|sm| sm.scan_files)
.collect())
}
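
/// Schema with a single nullable INTEGER column named `number`.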
fn get_simple_int_schema() -> Arc<StructType> {
Arc::new(StructType::try_new(vec![StructField::nullable("number", DataType::INTEGER)]).unwrap())
}
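
/// Re-commits the table's version-0 `metaData` action with `properties` merged into its
/// configuration as version `current_version + 1`, then returns a fresh snapshot.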
fn set_table_properties(
table_path: &str,
table_url: &Url,
engine: &dyn Engine,
current_version: Version,
properties: &[(&str, &str)],
) -> Result<Arc<Snapshot>, Box<dyn std::error::Error>> {
let v0_path = std::path::Path::new(table_path).join("_delta_log/00000000000000000000.json");
let mut meta: serde_json::Value = std::fs::read_to_string(&v0_path)?
.lines()
.find_map(|line| {
serde_json::from_str::<serde_json::Value>(line)
.ok()
.filter(|v| v.get("metaData").is_some())
})
.expect("version 0 should contain a metaData action");
for &(key, value) in properties {
meta["metaData"]["configuration"][key] = json!(value);
}
let new_commit = std::path::Path::new(table_path)
.join(format!("_delta_log/{:020}.json", current_version + 1));
std::fs::write(&new_commit, serde_json::to_string(&meta)?)?;
Ok(Snapshot::builder_for(table_url.clone()).build(engine)?)
}
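
/// Asserts that the snapshot's column mapping mode matches `cm_mode` and returns it.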
fn assert_column_mapping_mode(snapshot: &Snapshot, cm_mode: &str) -> ColumnMappingMode {
let expected = match cm_mode {
"none" => ColumnMappingMode::None,
"name" => ColumnMappingMode::Name,
"id" => ColumnMappingMode::Id,
_ => panic!("unexpected cm_mode: {cm_mode}"),
};
let actual = snapshot
.table_properties()
.column_mapping_mode
.expect("column mapping mode should be set");
assert_eq!(actual, expected);
actual
}
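
/// Descends `root` along `path`, downcasting intermediate columns to `StructArray`, and returns
/// the leaf column downcast to `T`. Panics if a segment is missing or a type does not match.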
fn resolve_struct_field<'a, T: 'static>(root: &'a StructArray, path: &[String]) -> &'a T {
assert!(!path.is_empty(), "path must be non-empty");
let mut current: &StructArray = root;
for (i, name) in path.iter().enumerate() {
let col = current
.column_by_name(name)
.unwrap_or_else(|| panic!("missing field: {name}"));
if i == path.len() - 1 {
return col
.as_any()
.downcast_ref::<T>()
.expect("leaf array type mismatch");
}
current = col
.as_any()
.downcast_ref::<StructArray>()
.unwrap_or_else(|| panic!("expected StructArray at field: {name}"));
}
unreachable!()
}
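
/// Indexes into `root` with each key in `path` and returns the resulting JSON value.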
fn resolve_json_path<'a>(root: &'a serde_json::Value, path: &[String]) -> &'a serde_json::Value {
path.iter().fold(root, |v, key| &v[key])
}
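
/// Asserts that the `minValues`/`maxValues` entries at `physical_path` in a stats JSON object
/// match the expected values.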
fn assert_min_max_stats(
stats: &serde_json::Value,
physical_path: &[String],
expected_min: impl Into<serde_json::Value>,
expected_max: impl Into<serde_json::Value>,
) {
assert_eq!(
*resolve_json_path(&stats["minValues"], physical_path),
expected_min.into()
);
assert_eq!(
*resolve_json_path(&stats["maxValues"], physical_path),
expected_max.into()
);
}
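
/// A commit with no data changes should write exactly one `commitInfo` action with the expected
/// operation, engine info, and kernel version.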
#[tokio::test]
async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, store, table_name) in
setup_test_tables(schema, &[], None, "test_table").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let committer = Box::new(FileSystemCommitter::new());
let txn = snapshot
.transaction(committer, &engine)?
.with_engine_info("default engine");
let _ = txn.commit(&engine)?;
let commit1 = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let mut parsed_commit: serde_json::Value = serde_json::from_slice(&commit1.bytes().await?)?;
validate_txn_id(&parsed_commit["commitInfo"]);
set_json_value(&mut parsed_commit, "commitInfo.timestamp", json!(0))?;
set_json_value(&mut parsed_commit, "commitInfo.txnId", json!(ZERO_UUID))?;
let expected_commit = json!({
"commitInfo": {
"timestamp": 0,
"operation": "UNKNOWN",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},
"engineInfo": "default engine",
"txnId": ZERO_UUID,
}
});
assert_eq!(parsed_commit, expected_commit);
}
Ok(())
}
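
/// Checks that any `commitInfo.timestamp` and `add.modificationTime` values in the parsed commits
/// are within ten seconds of the current system time.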
fn check_action_timestamps<'a>(
parsed_commits: impl Iterator<Item = &'a serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error>> {
let now: i64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis()
.try_into()
.unwrap();
parsed_commits.for_each(|commit| {
if let Some(commit_info_ts) = &commit.pointer("/commitInfo/timestamp") {
assert!((now - commit_info_ts.as_i64().unwrap()).abs() < 10_000);
}
if let Some(add_ts) = &commit.pointer("/add/modificationTime") {
assert!((now - add_ts.as_i64().unwrap()).abs() < 10_000);
}
});
Ok(())
}
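
/// Lists the parquet files under `path`, asserts there are exactly two with identical sizes, and
/// returns that size.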
async fn get_and_check_all_parquet_sizes(store: Arc<DynObjectStore>, path: &str) -> u64 {
use futures::stream::StreamExt;
let files: Vec<_> = store.list(Some(&Path::from(path))).collect().await;
let parquet_files = files
.into_iter()
.filter(|f| match f {
Ok(f) => f.location.extension() == Some("parquet"),
Err(_) => false,
})
.collect::<Vec<_>>();
assert_eq!(parquet_files.len(), 2);
let size = parquet_files.first().unwrap().as_ref().unwrap().size;
assert!(parquet_files
.iter()
.all(|f| f.as_ref().unwrap().size == size));
size
}
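
/// Writes `[1, 2, 3]` and `[4, 5, 6]` as two parquet files, commits them, and asserts the commit
/// version and post-commit checkpoint/compaction counters equal `expected_since_commit`.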
async fn write_data_and_check_result_and_stats(
table_url: Url,
schema: SchemaRef,
engine: Arc<DefaultEngine<TokioBackgroundExecutor>>,
expected_since_commit: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let committer = Box::new(FileSystemCommitter::new());
let mut txn = snapshot
.transaction(committer, engine.as_ref())?
.with_data_change(true);
let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> {
let data = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(data.to_vec()))],
)?;
Ok(Box::new(ArrowEngineData::new(data)))
});
let write_context = Arc::new(txn.get_write_context());
let tasks = append_data.into_iter().map(|data| {
let engine = engine.clone();
let write_context = write_context.clone();
tokio::task::spawn(async move {
engine
.write_parquet(
data.as_ref().unwrap(),
write_context.as_ref(),
HashMap::new(),
)
.await
})
});
let add_files_metadata = futures::future::join_all(tasks).await.into_iter().flatten();
for meta in add_files_metadata {
txn.add_files(meta?);
}
match txn.commit(engine.as_ref())? {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), expected_since_commit as Version);
assert_eq!(
committed.post_commit_stats().commits_since_checkpoint,
expected_since_commit
);
assert_eq!(
committed.post_commit_stats().commits_since_log_compaction,
expected_since_commit
);
}
_ => panic!("Commit should have succeeded"),
};
Ok(())
}
#[tokio::test]
async fn test_commit_info_action() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, store, table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let _ = txn.commit(&engine)?;
let commit = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
validate_txn_id(&parsed_commits[0]["commitInfo"]);
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
let expected_commit = vec![json!({
"commitInfo": {
"timestamp": 0,
"operation": "UNKNOWN",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},
"engineInfo": "default engine",
"txnId": ZERO_UUID
}
})];
assert_eq!(parsed_commits, expected_commit);
}
Ok(())
}
#[tokio::test]
async fn test_commit_info_with_engine_commit_info() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, store, table_name) in
setup_test_tables(schema, &[], None, "test_table").await?
{
let arrow_schema = Arc::new(delta_kernel::arrow::datatypes::Schema::new(vec![
Field::new("myApp", ArrowDataType::Utf8, false),
Field::new("myVersion", ArrowDataType::Utf8, false),
Field::new("operation", ArrowDataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
arrow_schema,
vec![
Arc::new(StringArray::from(vec!["spark"])) as ArrayRef,
Arc::new(StringArray::from(vec!["3.5.0"])) as ArrayRef,
Arc::new(StringArray::from(vec!["STALE_OP"])) as ArrayRef,
],
)?;
let engine_schema = Arc::new(StructType::new_unchecked(vec![
StructField::not_null("myApp", DataType::STRING),
StructField::not_null("myVersion", DataType::STRING),
StructField::nullable("operation", DataType::STRING),
]));
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_operation("WRITE".to_string())
.with_commit_info(Box::new(ArrowEngineData::new(batch)), engine_schema);
let _ = txn.commit(&engine)?;
let commit = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
validate_txn_id(&parsed_commits[0]["commitInfo"]);
validate_timestamp(&parsed_commits[0]["commitInfo"]);
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
let expected_commits = vec![json!({
"commitInfo": {
"myApp": "spark",
"myVersion": "3.5.0",
"operation": "WRITE",
"operationParameters": {},
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"txnId": ZERO_UUID,
"timestamp": 0,
}
})];
assert_eq!(parsed_commits, expected_commits);
}
Ok(())
}
#[tokio::test]
async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, store, table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let engine = Arc::new(engine);
write_data_and_check_result_and_stats(table_url.clone(), schema.clone(), engine.clone(), 1)
.await?;
let commit1 = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
let size =
get_and_check_all_parquet_sizes(store.clone(), format!("/{table_name}/").as_str())
.await;
check_action_timestamps(parsed_commits.iter())?;
validate_txn_id(&parsed_commits[0]["commitInfo"]);
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
set_json_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?;
set_json_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?;
set_json_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?;
set_json_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?;
let expected_commit = vec![
json!({
"commitInfo": {
"timestamp": 0,
"operation": "UNKNOWN",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},
"txnId": ZERO_UUID
}
}),
json!({
"add": {
"path": "first.parquet",
"partitionValues": {},
"size": size,
"modificationTime": 0,
"dataChange": true,
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":1},\"maxValues\":{\"number\":3},\"tightBounds\":true}"
}
}),
json!({
"add": {
"path": "second.parquet",
"partitionValues": {},
"size": size,
"modificationTime": 0,
"dataChange": true,
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":4},\"maxValues\":{\"number\":6},\"tightBounds\":true}"
}
}),
];
assert_eq!(parsed_commits, expected_commit);
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]))],
)?),
&table_url,
engine,
)?;
}
Ok(())
}
#[tokio::test]
async fn test_no_add_actions() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, store, table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
assert!(txn.commit(&engine)?.is_committed());
let commit1 = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let parsed_actions: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
assert_eq!(parsed_actions.len(), 1, "Expected only one action");
assert!(parsed_actions[0].get("commitInfo").is_some());
}
Ok(())
}
#[tokio::test]
async fn test_append_twice() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, _, _) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let engine = Arc::new(engine);
write_data_and_check_result_and_stats(table_url.clone(), schema.clone(), engine.clone(), 1)
.await?;
write_data_and_check_result_and_stats(table_url.clone(), schema.clone(), engine.clone(), 2)
.await?;
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![
1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6,
]))],
)?),
&table_url,
engine,
)?;
}
Ok(())
}
#[tokio::test]
async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let partition_col = "partition";
let table_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("number", DataType::INTEGER),
StructField::nullable("partition", DataType::STRING),
])?);
let data_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);
for (table_url, engine, store, table_name) in
setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine")
.with_data_change(false);
let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> {
let data = RecordBatch::try_new(
Arc::new(data_schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(data.to_vec()))],
)?;
Ok(Box::new(ArrowEngineData::new(data)))
});
let partition_vals = vec!["a", "b"];
let engine = Arc::new(engine);
let write_context = Arc::new(txn.get_write_context());
let tasks = append_data
.into_iter()
.zip(partition_vals)
.map(|(data, partition_val)| {
let engine = engine.clone();
let write_context = write_context.clone();
tokio::task::spawn(async move {
engine
.write_parquet(
data.as_ref().unwrap(),
write_context.as_ref(),
HashMap::from([(partition_col.to_string(), partition_val.to_string())]),
)
.await
})
});
let add_files_metadata = futures::future::join_all(tasks).await.into_iter().flatten();
for meta in add_files_metadata {
txn.add_files(meta?);
}
assert!(txn.commit(engine.as_ref())?.is_committed());
let commit1 = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
let size =
get_and_check_all_parquet_sizes(store.clone(), format!("/{table_name}/").as_str())
.await;
check_action_timestamps(parsed_commits.iter())?;
validate_txn_id(&parsed_commits[0]["commitInfo"]);
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
set_json_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?;
set_json_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?;
set_json_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?;
set_json_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?;
let expected_commit = vec![
json!({
"commitInfo": {
"timestamp": 0,
"operation": "UNKNOWN",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},
"engineInfo": "default engine",
"txnId": ZERO_UUID
}
}),
json!({
"add": {
"path": "first.parquet",
"partitionValues": {
"partition": "a"
},
"size": size,
"modificationTime": 0,
"dataChange": false,
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":1},\"maxValues\":{\"number\":3},\"tightBounds\":true}"
}
}),
json!({
"add": {
"path": "second.parquet",
"partitionValues": {
"partition": "b"
},
"size": size,
"modificationTime": 0,
"dataChange": false,
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":4},\"maxValues\":{\"number\":6},\"tightBounds\":true}"
}
}),
];
assert_eq!(parsed_commits, expected_commit);
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(table_schema.as_ref().try_into_arrow()?),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])),
Arc::new(StringArray::from(vec!["a", "a", "a", "b", "b", "b"])),
],
)?),
&table_url,
engine,
)?;
}
Ok(())
}
#[tokio::test]
async fn test_append_invalid_schema() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let table_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);
let data_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"string",
DataType::STRING,
)])?);
for (table_url, engine, _store, _table_name) in
setup_test_tables(table_schema, &[], None, "test_table").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let append_data = [["a", "b"], ["c", "d"]].map(|data| -> DeltaResult<_> {
let data = RecordBatch::try_new(
Arc::new(data_schema.as_ref().try_into_arrow()?),
vec![Arc::new(StringArray::from(data.to_vec()))],
)?;
Ok(Box::new(ArrowEngineData::new(data)))
});
let engine = Arc::new(engine);
let write_context = Arc::new(txn.get_write_context());
let tasks = append_data.into_iter().map(|data| {
let engine = engine.clone();
let write_context = write_context.clone();
tokio::task::spawn(async move {
engine
.write_parquet(
data.as_ref().unwrap(),
write_context.as_ref(),
HashMap::new(),
)
.await
})
});
let mut add_files_metadata = futures::future::join_all(tasks).await.into_iter().flatten();
assert!(add_files_metadata.all(|res| match res {
Err(KernelError::Arrow(ArrowError::InvalidArgumentError(_))) => true,
Err(KernelError::Backtraced { source, .. })
if matches!(
&*source,
KernelError::Arrow(ArrowError::InvalidArgumentError(_))
) =>
true,
_ => false,
}));
}
Ok(())
}
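
/// Setting the same app id twice in one transaction must fail; committed `txn` actions must be
/// readable back via `get_app_id_version` and appear in the commit file.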
#[tokio::test]
async fn test_write_txn_actions() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, store, table_name) in
setup_test_tables(schema, &[], None, "test_table").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
assert!(matches!(
snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_transaction_id("app_id1".to_string(), 0)
.with_transaction_id("app_id1".to_string(), 1)
.commit(&engine),
Err(KernelError::Generic(msg)) if msg == "app_id app_id1 already exists in transaction"
));
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine")
.with_transaction_id("app_id1".to_string(), 1)
.with_transaction_id("app_id2".to_string(), 2);
assert!(txn.commit(&engine)?.is_committed());
let snapshot = Snapshot::builder_for(table_url.clone())
.at_version(1)
.build(&engine)?;
assert_eq!(snapshot.get_app_id_version("app_id1", &engine)?, Some(1));
assert_eq!(snapshot.get_app_id_version("app_id2", &engine)?, Some(2));
assert_eq!(snapshot.get_app_id_version("app_id3", &engine)?, None);
let commit1 = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?;
let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0)).unwrap();
let time_ms: i64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis()
.try_into()
.unwrap();
let last_updated1 = parsed_commits[1]
.get("txn")
.unwrap()
.get("lastUpdated")
.unwrap();
let last_updated2 = parsed_commits[2]
.get("txn")
.unwrap()
.get("lastUpdated")
.unwrap();
assert_eq!(last_updated1, last_updated2);
let last_updated = parsed_commits[1]
.get_mut("txn")
.unwrap()
.get_mut("lastUpdated")
.unwrap();
assert!((last_updated.as_i64().unwrap() - time_ms).abs() < 10_000);
*last_updated = serde_json::Value::Number(1.into());
let last_updated = parsed_commits[2]
.get_mut("txn")
.unwrap()
.get_mut("lastUpdated")
.unwrap();
assert!((last_updated.as_i64().unwrap() - time_ms).abs() < 10_000);
*last_updated = serde_json::Value::Number(2.into());
validate_txn_id(&parsed_commits[0]["commitInfo"]);
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
let expected_commit = vec![
json!({
"commitInfo": {
"timestamp": 0,
"operation": "UNKNOWN",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},
"engineInfo": "default engine",
"txnId": ZERO_UUID
}
}),
json!({
"txn": {
"appId": "app_id1",
"version": 1,
"lastUpdated": 1
}
}),
json!({
"txn": {
"appId": "app_id2",
"version": 2,
"lastUpdated": 2
}
}),
];
assert_eq!(parsed_commits, expected_commit);
}
Ok(())
}
#[tokio::test]
async fn test_append_timestamp_ntz() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"ts_ntz",
DataType::TIMESTAMP_NTZ,
)])?);
let (store, engine, table_location) = engine_store_setup("test_table_timestamp_ntz", None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec!["timestampNtz"],
vec!["timestampNtz"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
    let timestamp_values = vec![
        0i64,                   // 1970-01-01T00:00:00.000000 (epoch)
        1634567890123456i64,    // 2021-10-18T14:38:10.123456
        1634567950654321i64,    // 2021-10-18T14:39:10.654321
        1672531200000000i64,    // 2023-01-01T00:00:00.000000
        253402300799999999i64,  // 9999-12-31T23:59:59.999999
        -62135596800000000i64,  // 0001-01-01T00:00:00.000000
    ];
let data = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(TimestampMicrosecondArray::from(timestamp_values))],
)?;
let engine = Arc::new(engine);
let write_context = Arc::new(txn.get_write_context());
let add_files_metadata = engine
.write_parquet(
&ArrowEngineData::new(data.clone()),
write_context.as_ref(),
HashMap::new(),
)
.await?;
txn.add_files(add_files_metadata);
assert!(txn.commit(engine.as_ref())?.is_committed());
let commit1 = store
.get(&Path::from(
"/test_table_timestamp_ntz/_delta_log/00000000000000000001.json",
))
.await?;
let parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
assert_eq!(parsed_commits.len(), 2);
assert!(parsed_commits[1].get("add").is_some());
assert!(parsed_commits[1]
.get("add")
.unwrap()
.get("dataChange")
.unwrap()
.as_bool()
.unwrap());
test_read(&ArrowEngineData::new(data), &table_url, engine)?;
Ok(())
}
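
/// Writes unshredded variant data, including a nested variant whose `value`/`metadata` struct
/// fields are deliberately flipped, and verifies the table reads back with the canonical field
/// order.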
#[tokio::test]
async fn test_append_variant() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
fn unshredded_variant_schema_flipped() -> DataType {
DataType::variant_type([
StructField::not_null("value", DataType::BINARY),
StructField::not_null("metadata", DataType::BINARY),
])
.unwrap()
}
fn variant_arrow_type_flipped() -> ArrowDataType {
let metadata_field = Field::new("metadata", ArrowDataType::Binary, false);
let value_field = Field::new("value", ArrowDataType::Binary, false);
let fields = vec![value_field, metadata_field];
ArrowDataType::Struct(fields.into())
}
let table_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("v", DataType::unshredded_variant()),
StructField::nullable("i", DataType::INTEGER),
StructField::nullable(
"nested",
StructType::try_new(vec![StructField::nullable(
"nested_v",
unshredded_variant_schema_flipped(),
)])?,
),
])?);
let write_schema = table_schema.clone();
let tmp_test_dir = tempdir()?;
let tmp_test_dir_url = Url::from_directory_path(tmp_test_dir.path()).unwrap();
let (store, engine, table_location) =
engine_store_setup("test_table_variant", Some(&tmp_test_dir_url));
let table_url = create_table(
store.clone(),
table_location,
table_schema.clone(),
&[],
true,
vec!["variantType", "variantShredding-preview"],
vec!["variantType", "variantShredding-preview"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_data_change(true);
let metadata_v = vec![
Some(&[0x01, 0x00, 0x00][..]),
None,
Some(&[0x01, 0x01, 0x00, 0x01, 0x61][..]),
];
let value_v = vec![
Some(&[0x0C, 0x01][..]),
None,
Some(&[0x02, 0x01, 0x00, 0x00, 0x01, 0x02][..]),
];
let metadata_v_array = Arc::new(BinaryArray::from(metadata_v)) as ArrayRef;
let value_v_array = Arc::new(BinaryArray::from(value_v)) as ArrayRef;
let metadata_nested_v = vec![
Some(&[0x01, 0x00, 0x00][..]),
None,
Some(&[0x01, 0x01, 0x00, 0x01, 0x62][..]),
];
let value_nested_v = vec![
Some(&[0x0C, 0x02][..]),
None,
Some(&[0x02, 0x01, 0x00, 0x00, 0x01, 0x03][..]),
];
let value_nested_v_array = Arc::new(BinaryArray::from(value_nested_v)) as ArrayRef;
let metadata_nested_v_array = Arc::new(BinaryArray::from(metadata_nested_v)) as ArrayRef;
let variant_arrow = ArrowDataType::try_from_kernel(&DataType::unshredded_variant()).unwrap();
let variant_arrow_flipped = variant_arrow_type_flipped();
let i_values = vec![31, 32, 33];
let fields = match variant_arrow {
ArrowDataType::Struct(fields) => Ok(fields),
_ => Err(KernelError::Generic(
"Variant arrow data type is not struct.".to_string(),
)),
}?;
let fields_flipped = match variant_arrow_flipped {
ArrowDataType::Struct(fields) => Ok(fields),
_ => Err(KernelError::Generic(
"Variant arrow data type is not struct.".to_string(),
)),
}?;
let null_bitmap = NullBuffer::from_iter([true, false, true]);
let variant_v_array = StructArray::try_new(
fields.clone(),
vec![metadata_v_array, value_v_array],
Some(null_bitmap.clone()),
)?;
let variant_nested_v_array = Arc::new(StructArray::try_new(
fields_flipped.clone(),
vec![
value_nested_v_array.clone(),
metadata_nested_v_array.clone(),
],
Some(null_bitmap.clone()),
)?);
let data = RecordBatch::try_new(
Arc::new(write_schema.as_ref().try_into_arrow()?),
vec![
Arc::new(variant_v_array.clone()),
Arc::new(Int32Array::from(i_values.clone())),
Arc::new(StructArray::try_new(
vec![Field::new("nested_v", variant_arrow_type_flipped(), true)].into(),
vec![variant_nested_v_array.clone()],
None,
)?),
],
)
.unwrap();
let engine = Arc::new(engine);
let write_context = Arc::new(txn.get_write_context());
let add_files_metadata = (*engine)
.parquet_handler()
.as_any()
.downcast_ref::<DefaultParquetHandler<TokioBackgroundExecutor>>()
.unwrap()
.write_parquet_file(
write_context.target_dir(),
Box::new(ArrowEngineData::new(data.clone())),
HashMap::new(),
Some(write_context.stats_columns()),
)
.await?;
txn.add_files(add_files_metadata);
assert!(txn.commit(engine.as_ref())?.is_committed());
let commit1_url = tmp_test_dir_url
.join("test_table_variant/_delta_log/00000000000000000001.json")
.unwrap();
let commit1 = store
.get(&Path::from_url_path(commit1_url.path()).unwrap())
.await?;
let parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
assert_eq!(parsed_commits.len(), 2);
assert!(parsed_commits[1].get("add").is_some());
let expected_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("v", DataType::unshredded_variant()),
StructField::nullable("i", DataType::INTEGER),
StructField::nullable(
"nested",
StructType::try_new(vec![StructField::nullable(
"nested_v",
DataType::unshredded_variant(),
)])
.unwrap(),
),
])?);
let variant_nested_v_array_expected = Arc::new(StructArray::try_new(
fields,
vec![metadata_nested_v_array, value_nested_v_array],
Some(null_bitmap),
)?);
let variant_arrow_type: ArrowDataType =
ArrowDataType::try_from_kernel(&DataType::unshredded_variant()).unwrap();
let expected_data = RecordBatch::try_new(
Arc::new(expected_schema.as_ref().try_into_arrow()?),
vec![
Arc::new(variant_v_array),
Arc::new(Int32Array::from(i_values)),
Arc::new(StructArray::try_new(
vec![Field::new("nested_v", variant_arrow_type, true)].into(),
vec![variant_nested_v_array_expected],
None,
)?),
],
)
.unwrap();
test_read(&ArrowEngineData::new(expected_data), &table_url, engine)?;
Ok(())
}
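
/// Writes a shredded variant column (with a `typed_value` field) and verifies that the default
/// engine refuses to read it back.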
#[tokio::test]
async fn test_shredded_variant_read_rejection() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let table_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"v",
DataType::unshredded_variant(),
)])?);
let shredded_write_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"v",
DataType::try_struct_type([
StructField::new("metadata", DataType::BINARY, true),
StructField::new("value", DataType::BINARY, true),
StructField::new("typed_value", DataType::INTEGER, true),
])?,
)])?);
let tmp_test_dir = tempdir()?;
let tmp_test_dir_url = Url::from_directory_path(tmp_test_dir.path()).unwrap();
let (store, engine, table_location) =
engine_store_setup("test_table_variant_2", Some(&tmp_test_dir_url));
let table_url = create_table(
store.clone(),
table_location,
table_schema.clone(),
&[],
true,
vec!["variantType", "variantShredding-preview"],
vec!["variantType", "variantShredding-preview"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_data_change(true);
let metadata_v = vec![
Some(&[0x01, 0x00, 0x00][..]),
Some(&[0x01, 0x01, 0x00, 0x01, 0x61][..]),
];
let value_v = vec![
Some(&[0x0C, 0x01][..]),
Some(&[0x02, 0x01, 0x00, 0x00, 0x01, 0x02][..]),
];
let typed_value_v = vec![Some(21), Some(3)];
let metadata_v_array = Arc::new(BinaryArray::from(metadata_v)) as ArrayRef;
let value_v_array = Arc::new(BinaryArray::from(value_v)) as ArrayRef;
let typed_value_v_array = Arc::new(Int32Array::from(typed_value_v)) as ArrayRef;
let variant_arrow = ArrowDataType::Struct(
vec![
Field::new("metadata", ArrowDataType::Binary, true),
Field::new("value", ArrowDataType::Binary, true),
Field::new("typed_value", ArrowDataType::Int32, true),
]
.into(),
);
let fields = match variant_arrow {
ArrowDataType::Struct(fields) => Ok(fields),
_ => Err(KernelError::Generic(
"Variant arrow data type is not struct.".to_string(),
)),
}?;
let variant_v_array = StructArray::try_new(
fields.clone(),
vec![metadata_v_array, value_v_array, typed_value_v_array],
None,
)?;
let data = RecordBatch::try_new(
Arc::new(shredded_write_schema.as_ref().try_into_arrow()?),
        vec![Arc::new(variant_v_array.clone())],
)
.unwrap();
let engine = Arc::new(engine);
let write_context = Arc::new(txn.get_write_context());
let add_files_metadata = (*engine)
.parquet_handler()
.as_any()
.downcast_ref::<DefaultParquetHandler<TokioBackgroundExecutor>>()
.unwrap()
.write_parquet_file(
write_context.target_dir(),
Box::new(ArrowEngineData::new(data.clone())),
HashMap::new(),
Some(write_context.stats_columns()),
)
.await?;
txn.add_files(add_files_metadata);
assert!(txn.commit(engine.as_ref())?.is_committed());
let commit1_url = tmp_test_dir_url
.join("test_table_variant_2/_delta_log/00000000000000000001.json")
.unwrap();
let commit1 = store
.get(&Path::from_url_path(commit1_url.path()).unwrap())
.await?;
let parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?)
.into_iter::<serde_json::Value>()
.try_collect()?;
assert_eq!(parsed_commits.len(), 2);
assert!(parsed_commits[1].get("add").is_some());
let res = test_read(&ArrowEngineData::new(data), &table_url, engine);
assert!(matches!(res,
Err(e) if e.to_string().contains("The default engine does not support shredded reads")));
Ok(())
}
#[tokio::test]
async fn test_set_domain_metadata_basic() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_domain_metadata_basic";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let _write_context = txn.get_write_context();
let domain1 = "app.config";
let config1 = r#"{"version": 1}"#;
let domain2 = "spark.settings";
let config2 = r#"{"cores": 4}"#;
assert!(txn
.with_domain_metadata(domain1.to_string(), config1.to_string())
.with_domain_metadata(domain2.to_string(), config2.to_string())
.commit(&engine)?
.is_committed());
let commit_data = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?
.bytes()
.await?;
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
.into_iter()
.try_collect()?;
let domain_actions: Vec<_> = actions
.iter()
.filter(|v| v.get("domainMetadata").is_some())
.collect();
for action in &domain_actions {
let domain = action["domainMetadata"]["domain"].as_str().unwrap();
let config = action["domainMetadata"]["configuration"].as_str().unwrap();
assert!(!action["domainMetadata"]["removed"].as_bool().unwrap());
match domain {
d if d == domain1 => assert_eq!(config, config1),
d if d == domain2 => assert_eq!(config, config2),
_ => panic!("Unexpected domain: {domain}"),
}
}
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let domain1_config = final_snapshot.get_domain_metadata(domain1, &engine)?;
assert_eq!(domain1_config, Some(config1.to_string()));
let domain2_config = final_snapshot.get_domain_metadata(domain2, &engine)?;
assert_eq!(domain2_config, Some(config2.to_string()));
Ok(())
}
#[tokio::test]
async fn test_set_domain_metadata_errors() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_domain_metadata_errors";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let res = txn
.with_domain_metadata("delta.system".to_string(), "config".to_string())
.commit(&engine);
assert_result_error_with_message(
res,
"Cannot modify domains that start with 'delta.' as those are system controlled",
);
let txn2 = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let res = txn2
.with_domain_metadata("app.config".to_string(), "v1".to_string())
.with_domain_metadata("app.config".to_string(), "v2".to_string())
.commit(&engine);
assert_result_error_with_message(
res,
"Metadata for domain app.config already specified in this transaction",
);
Ok(())
}
#[tokio::test]
async fn test_set_domain_metadata_unsupported_writer_feature(
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_domain_metadata_unsupported";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec![],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let res = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_domain_metadata("app.config".to_string(), "test_config".to_string())
.commit(&engine);
assert_result_error_with_message(res, "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature");
Ok(())
}
#[tokio::test]
async fn test_remove_domain_metadata_unsupported_writer_feature(
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_remove_domain_metadata_unsupported";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec![],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let res = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_domain_metadata_removed("app.config".to_string())
.commit(&engine);
assert_result_error_with_message(res, "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature");
Ok(())
}
#[tokio::test]
async fn test_remove_domain_metadata_non_existent_domain() -> Result<(), Box<dyn std::error::Error>>
{
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_domain_metadata_unsupported";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let domain = "app.deprecated";
let _ = txn
.with_domain_metadata_removed(domain.to_string())
.commit(&engine)?;
let commit_data = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?
.bytes()
.await?;
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
.into_iter()
.try_collect()?;
let domain_action = actions.iter().find(|v| v.get("domainMetadata").is_some());
assert!(
domain_action.is_none(),
"No tombstone should be written for non-existent domain"
);
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let config = final_snapshot.get_domain_metadata(domain, &engine)?;
assert_eq!(config, None);
Ok(())
}
#[tokio::test]
async fn test_domain_metadata_set_remove_conflicts() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_domain_metadata_unsupported";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let err = txn
.with_domain_metadata("app.config".to_string(), "v1".to_string())
.with_domain_metadata_removed("app.config".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("already specified in this transaction"));
let txn2 = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let err = txn2
.with_domain_metadata_removed("test.domain".to_string())
.with_domain_metadata("test.domain".to_string(), "v1".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("already specified in this transaction"));
let txn3 = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let err = txn3
.with_domain_metadata_removed("another.domain".to_string())
.with_domain_metadata_removed("another.domain".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("already specified in this transaction"));
let txn4 = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let err = txn4
.with_domain_metadata_removed("delta.system".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("Cannot modify domains that start with 'delta.' as those are system controlled"));
Ok(())
}
#[tokio::test]
async fn test_domain_metadata_set_then_remove() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let table_name = "test_domain_metadata_unsupported";
let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;
let domain = "app.config";
let configuration = r#"{"version": 1}"#;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let _ = txn
.with_domain_metadata(domain.to_string(), configuration.to_string())
.commit(&engine)?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let _ = txn
.with_domain_metadata_removed(domain.to_string())
.commit(&engine)?;
let commit_data = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000002.json"
)))
.await?
.bytes()
.await?;
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
.into_iter()
.try_collect()?;
let domain_action = actions
.iter()
.find(|v| v.get("domainMetadata").is_some())
.unwrap();
assert_eq!(domain_action["domainMetadata"]["domain"], domain);
assert_eq!(
domain_action["domainMetadata"]["configuration"],
configuration
);
assert_eq!(domain_action["domainMetadata"]["removed"], true);
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let domain_config = final_snapshot.get_domain_metadata(domain, &engine)?;
assert_eq!(domain_config, None);
Ok(())
}
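
/// Reads the commit file at `version` and returns its `commitInfo.inCommitTimestamp`.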
async fn get_ict_at_version(
store: Arc<DynObjectStore>,
table_url: &Url,
version: u64,
) -> Result<i64, Box<dyn std::error::Error>> {
let commit_path = table_url.join(&format!("_delta_log/{version:020}.json"))?;
let commit = store.get(&Path::from_url_path(commit_path.path())?).await?;
let commit_content = String::from_utf8(commit.bytes().await?.to_vec())?;
let lines: Vec<_> = commit_content
.lines()
.filter(|line| !line.trim().is_empty())
.collect();
assert!(
!lines.is_empty(),
"Commit log at version {version} should not be empty"
);
let first_action: serde_json::Value = serde_json::from_str(lines[0])?;
let commit_info = first_action
.get("commitInfo")
.expect("First action must be commitInfo when ICT is enabled");
let ict = commit_info
.get("inCommitTimestamp")
.expect("commitInfo must have inCommitTimestamp when ICT is enabled")
.as_i64()
.unwrap();
Ok(ict)
}
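
/// Writes `values` as a single parquet file through the engine and registers the resulting file
/// metadata on `txn`.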
async fn generate_and_add_data_file(
txn: &mut delta_kernel::transaction::Transaction,
engine: &DefaultEngine<TokioBackgroundExecutor>,
schema: SchemaRef,
values: Vec<i32>,
) -> Result<(), Box<dyn std::error::Error>> {
let data = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(values))],
)?;
let write_context = Arc::new(txn.get_write_context());
let file_meta = engine
.write_parquet(
&ArrowEngineData::new(data),
write_context.as_ref(),
HashMap::new(),
)
.await?;
txn.add_files(file_meta);
Ok(())
}
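
/// End-to-end check that commits on a table with `inCommitTimestamp` enabled record monotonically
/// increasing in-commit timestamps.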
#[tokio::test]
async fn test_ict_commit_e2e() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let tmp_dir = TempDir::new()?;
let tmp_test_dir_url = Url::from_file_path(&tmp_dir).unwrap();
let (store, engine, table_location) =
engine_store_setup("test_ict_first_commit", Some(&tmp_test_dir_url));
let table_url = test_utils::create_table(
store.clone(),
table_location,
schema.clone(),
&[], true, vec![], vec!["inCommitTimestamp"], )
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
assert_eq!(
snapshot.version(),
0,
"Initial snapshot should be version 0"
);
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("ict test");
generate_and_add_data_file(&mut txn, &engine, schema.clone(), vec![1, 2, 3]).await?;
let commit_result = txn.commit(&engine)?;
match commit_result {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(
committed.commit_version(),
1,
"First commit should result in version 1"
);
}
CommitResult::ConflictedTransaction(conflicted) => {
panic!(
"First commit should not conflict, got conflict at version {}",
conflicted.conflict_version()
);
}
CommitResult::RetryableTransaction(_) => {
panic!("First commit should not be retryable error");
}
}
let first_ict = get_ict_at_version(store.clone(), &table_url, 1).await?;
assert!(
first_ict > 1612345678,
"First commit ICT ({first_ict}) should be greater than enablement timestamp (1612345678)"
);
let snapshot2 = Snapshot::builder_for(table_url.clone()).build(&engine)?;
assert_eq!(
snapshot2.version(),
1,
"Second snapshot should be version 1"
);
let mut txn2 = snapshot2
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("ict test 2");
generate_and_add_data_file(&mut txn2, &engine, schema, vec![4, 5, 6]).await?;
let commit_result2 = txn2.commit(&engine)?;
match commit_result2 {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(
committed.commit_version(),
2,
"Second commit should result in version 2"
);
}
CommitResult::ConflictedTransaction(conflicted) => {
panic!(
"Second commit should not conflict, got conflict at version {}",
conflicted.conflict_version()
);
}
CommitResult::RetryableTransaction(_) => {
panic!("Second commit should not be retryable error");
}
}
let second_ict = get_ict_at_version(store, &table_url, 2).await?;
assert!(
second_ict > first_ict,
"Second ICT ({second_ict}) should be greater than first ICT ({first_ict})"
);
Ok(())
}
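
/// Removes all files from the `table-with-dv-small` test table and verifies that the resulting
/// remove action preserves the expected path, stats, tags, and deletion vector fields.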
#[tokio::test]
async fn test_remove_files_adds_expected_entries() -> Result<(), Box<dyn std::error::Error>> {
use std::path::PathBuf;
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let tmp_table_path = tmp_dir.path().join("table-with-dv-small");
let source_path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?;
copy_directory(&source_path, &tmp_table_path)?;
let table_url = url::Url::from_directory_path(&tmp_table_path).unwrap();
let engine = create_default_engine(&table_url)?;
let snapshot = Snapshot::builder_for(table_url.clone())
.at_version(1)
.build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test engine")
.with_data_change(true);
let scan = snapshot.scan_builder().build()?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data, selection_vector) = scan_metadata.scan_files.into_parts();
let remove_metadata = FilteredEngineData::try_new(data, selection_vector)?;
txn.remove_files(remove_metadata);
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
let commit_version = committed.commit_version();
let commit_path = tmp_table_path.join(format!("_delta_log/{commit_version:020}.json"));
let commit_content = std::fs::read_to_string(commit_path)?;
let parsed_commits: Vec<_> = Deserializer::from_str(&commit_content)
.into_iter::<serde_json::Value>()
.try_collect()?;
assert!(
parsed_commits.len() >= 2,
"Expected at least 2 actions (commitInfo + remove)"
);
let commit_info_action = parsed_commits
.iter()
.find(|action| action.get("commitInfo").is_some())
.expect("Missing commitInfo action");
let commit_info = &commit_info_action["commitInfo"];
let commit_timestamp = commit_info["timestamp"]
.as_i64()
.expect("Missing timestamp in commitInfo");
let remove_actions: Vec<_> = parsed_commits
.iter()
.filter(|action| action.get("remove").is_some())
.collect();
assert!(
!remove_actions.is_empty(),
"Expected at least one remove action"
);
assert_eq!(remove_actions.len(), 1);
let remove_action = remove_actions[0];
let remove = &remove_action["remove"];
assert!(remove.get("path").is_some(), "Missing path field");
let path = remove["path"].as_str().expect("path should be a string");
assert_eq!(
path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(remove["dataChange"].as_bool(), Some(true));
let deletion_timestamp = remove["deletionTimestamp"]
.as_i64()
.expect("Missing deletionTimestamp");
assert_eq!(
deletion_timestamp, commit_timestamp,
"deletionTimestamp should match commit timestamp"
);
assert_eq!(remove["extendedFileMetadata"].as_bool(), Some(true));
let partition_vals = remove["partitionValues"]
.as_object()
.expect("Missing partitionValues");
assert_eq!(partition_vals.len(), 0);
let size = remove["size"].as_i64().expect("Missing size");
assert_eq!(size, 635);
let stats = remove["stats"].as_str().expect("Missing stats");
let stats_json: serde_json::Value = serde_json::from_str(stats)?;
assert_eq!(stats_json["numRecords"], 10);
let tags = remove["tags"].as_object().expect("Missing tags");
assert_eq!(
tags.get("INSERTION_TIME").and_then(|v| v.as_str()),
Some("1677811178336000")
);
assert_eq!(
tags.get("MIN_INSERTION_TIME").and_then(|v| v.as_str()),
Some("1677811178336000")
);
assert_eq!(
tags.get("MAX_INSERTION_TIME").and_then(|v| v.as_str()),
Some("1677811178336000")
);
assert_eq!(
tags.get("OPTIMIZE_TARGET_SIZE").and_then(|v| v.as_str()),
Some("268435456")
);
let dv = remove["deletionVector"]
.as_object()
.expect("Missing deletionVector");
assert_eq!(dv.get("storageType").and_then(|v| v.as_str()), Some("u"));
assert_eq!(
dv.get("pathOrInlineDv").and_then(|v| v.as_str()),
Some("vBn[lx{q8@P<9BNH/isA")
);
assert_eq!(dv.get("offset").and_then(|v| v.as_i64()), Some(1));
assert_eq!(dv.get("sizeInBytes").and_then(|v| v.as_i64()), Some(36));
assert_eq!(dv.get("cardinality").and_then(|v| v.as_i64()), Some(2));
assert!(remove.get("baseRowId").is_none());
assert!(remove.get("defaultRowCommitVersion").is_none());
}
_ => panic!("Transaction should be committed"),
}
Ok(())
}
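
/// Replaces a file's deletion vector in `table-with-dv-small` and verifies that the commit
/// contains a remove/add pair whose remove action carries the file's original deletion vector.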
#[tokio::test]
async fn test_update_deletion_vectors_adds_expected_entries(
) -> Result<(), Box<dyn std::error::Error>> {
use std::path::PathBuf;
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let tmp_table_path = tmp_dir.path().join("table-with-dv-small");
let source_path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?;
copy_directory(&source_path, &tmp_table_path)?;
let table_url = url::Url::from_directory_path(&tmp_table_path).unwrap();
let engine = create_default_engine(&table_url)?;
let snapshot = Snapshot::builder_for(table_url.clone())
.at_version(1)
.build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test engine")
.with_operation("UPDATE".to_string())
.with_data_change(true);
let scan = snapshot.clone().scan_builder().build()?;
let all_scan_metadata: Vec<_> = scan
.scan_metadata(engine.as_ref())?
.collect::<Result<Vec<_>, _>>()?;
let scan_files: Vec<_> = all_scan_metadata
.into_iter()
.map(|sm| sm.scan_files)
.collect();
let file_path = "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
let mut dv_map = HashMap::new();
let new_dv = DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: "cd^-aqEH.-t@S}K{vb[*k^".to_string(),
offset: Some(10),
size_in_bytes: 40,
cardinality: 3,
};
dv_map.insert(file_path.to_string(), new_dv);
txn.update_deletion_vectors(dv_map, scan_files.into_iter().map(Ok))?;
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
let commit_version = committed.commit_version();
let original_log_path = tmp_table_path.join("_delta_log/00000000000000000001.json");
let original_log_content = std::fs::read_to_string(original_log_path)?;
let original_commits: Vec<_> = Deserializer::from_str(&original_log_content)
.into_iter::<serde_json::Value>()
.try_collect()?;
let file_path = "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
let original_add = original_commits
.iter()
.find(|action| {
action
.get("add")
.and_then(|add| add.get("path").and_then(|p| p.as_str()))
== Some(file_path)
})
.expect("Missing original add action in version 1")
.get("add")
.expect("Should have add field");
let original_size = original_add["size"]
.as_i64()
.expect("Original add action should have size");
let original_partition_values = original_add["partitionValues"]
.as_object()
.expect("Original add action should have partitionValues");
let original_tags = original_add.get("tags");
let original_stats = original_add.get("stats");
let commit_path = tmp_table_path.join(format!("_delta_log/{commit_version:020}.json"));
let commit_content = std::fs::read_to_string(commit_path)?;
let parsed_commits: Vec<_> = Deserializer::from_str(&commit_content)
.into_iter::<serde_json::Value>()
.try_collect()?;
assert!(
parsed_commits.len() >= 3,
"Expected at least 3 actions (commitInfo + remove + add), got {}",
parsed_commits.len()
);
let commit_info_action = parsed_commits
.iter()
.find(|action| action.get("commitInfo").is_some())
.expect("Missing commitInfo action");
let commit_info = &commit_info_action["commitInfo"];
let commit_timestamp = commit_info["timestamp"]
.as_i64()
.expect("Missing timestamp in commitInfo");
let remove_actions: Vec<_> = parsed_commits
.iter()
.filter(|action| action.get("remove").is_some())
.collect();
assert_eq!(
remove_actions.len(),
1,
"Expected exactly one remove action"
);
let remove_action = remove_actions[0];
let remove = &remove_action["remove"];
assert_eq!(
remove["path"].as_str(),
Some(file_path),
"Remove path should match"
);
assert_eq!(remove["dataChange"].as_bool(), Some(true));
assert_eq!(
remove["deletionTimestamp"].as_i64(),
Some(commit_timestamp),
"deletionTimestamp should match commit timestamp"
);
let old_dv = remove["deletionVector"]
.as_object()
.expect("Remove action should have deletionVector");
assert_eq!(
old_dv.get("storageType").and_then(|v| v.as_str()),
Some("u"),
"Old DV storage type should be 'u'"
);
assert_eq!(
old_dv.get("pathOrInlineDv").and_then(|v| v.as_str()),
Some("vBn[lx{q8@P<9BNH/isA"),
"Old DV path should match original"
);
assert_eq!(
old_dv.get("offset").and_then(|v| v.as_i64()),
Some(1),
"Old DV offset should be 1"
);
assert_eq!(
old_dv.get("sizeInBytes").and_then(|v| v.as_i64()),
Some(36),
"Old DV size should be 36"
);
assert_eq!(
old_dv.get("cardinality").and_then(|v| v.as_i64()),
Some(2),
"Old DV cardinality should be 2"
);
let remove_size = remove["size"]
.as_i64()
.expect("Remove action should have size");
let remove_partition_values = remove["partitionValues"]
.as_object()
.expect("Remove action should have partitionValues");
let remove_tags = remove.get("tags");
let remove_stats = remove.get("stats");
let add_actions: Vec<_> = parsed_commits
.iter()
.filter(|action| action.get("add").is_some())
.collect();
assert_eq!(add_actions.len(), 1, "Expected exactly one add action");
let add_action = add_actions[0];
let add = &add_action["add"];
assert_eq!(
add["path"].as_str(),
Some(file_path),
"Add path should match"
);
assert_eq!(add["dataChange"].as_bool(), Some(true));
let new_dv = add["deletionVector"]
.as_object()
.expect("Add action should have deletionVector");
assert_eq!(
new_dv.get("storageType").and_then(|v| v.as_str()),
Some("u"),
"New DV storage type should be 'u'"
);
assert_eq!(
new_dv.get("pathOrInlineDv").and_then(|v| v.as_str()),
Some("cd^-aqEH.-t@S}K{vb[*k^"),
"New DV path should match updated value"
);
assert_eq!(
new_dv.get("offset").and_then(|v| v.as_i64()),
Some(10),
"New DV offset should be 10"
);
assert_eq!(
new_dv.get("sizeInBytes").and_then(|v| v.as_i64()),
Some(40),
"New DV size should be 40"
);
assert_eq!(
new_dv.get("cardinality").and_then(|v| v.as_i64()),
Some(3),
"New DV cardinality should be 3"
);
let add_size = add["size"].as_i64().expect("Add action should have size");
let add_partition_values = add["partitionValues"]
.as_object()
.expect("Add action should have partitionValues");
let add_tags = add.get("tags");
let add_stats = add.get("stats");
assert_eq!(
remove_size, add_size,
"File size should be preserved between remove and add"
);
assert_eq!(
remove_partition_values, add_partition_values,
"Partition values should be preserved between remove and add"
);
assert_eq!(
remove_tags, add_tags,
"Tags should be preserved between remove and add"
);
assert_eq!(
remove_stats, add_stats,
"Stats should be preserved between remove and add"
);
assert_eq!(
remove_size, original_size,
"Remove action size should match original file size"
);
assert_eq!(
add_size, original_size,
"Add action size should match original file size"
);
assert_eq!(
remove_partition_values, original_partition_values,
"Remove action partition values should match original"
);
assert_eq!(
add_partition_values, original_partition_values,
"Add action partition values should match original"
);
assert_eq!(
remove_tags, original_tags,
"Remove action tags should match original"
);
assert_eq!(
add_tags, original_tags,
"Add action tags should match original"
);
assert_eq!(
remove_stats, original_stats,
"Remove action stats should match original"
);
assert_eq!(
add_stats, original_stats,
"Add action stats should match original"
);
}
_ => panic!("Transaction should be committed"),
}
Ok(())
}
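/// Updating deletion vectors for several files in one transaction should emit one remove and
/// one add per file, with each add carrying the DV descriptor supplied for that file.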
#[tokio::test]
async fn test_update_deletion_vectors_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("value", DataType::STRING),
])?);
let file_names = &["file0.parquet", "file1.parquet", "file2.parquet"];
let (store, engine, table_url, file_paths) =
create_dv_table_with_files("test_table", schema, file_names).await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test engine")
.with_operation("UPDATE".to_string())
.with_data_change(true);
let mut scan_files = get_scan_files(snapshot.clone(), engine.as_ref())?;
let mut dv_map = HashMap::new();
for (idx, file_path) in file_paths.iter().enumerate() {
let descriptor = DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: format!("dv_file_{idx}.bin"),
offset: Some(idx as i32 * 10),
size_in_bytes: 40 + idx as i32,
cardinality: idx as i64 + 1,
};
dv_map.insert(file_path.to_string(), descriptor);
}
txn.update_deletion_vectors(dv_map, scan_files.drain(..).map(Ok))?;
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
let commit_version = committed.commit_version();
let final_commit_path =
table_url.join(&format!("_delta_log/{commit_version:020}.json"))?;
let commit_content = store
.get(&Path::from_url_path(final_commit_path.path())?)
.await?
.bytes()
.await?;
let parsed_commits: Vec<_> = Deserializer::from_slice(&commit_content)
.into_iter::<serde_json::Value>()
.try_collect()?;
let remove_actions: Vec<_> = parsed_commits
.iter()
.filter(|action| action.get("remove").is_some())
.collect();
let add_actions: Vec<_> = parsed_commits
.iter()
.filter(|action| action.get("add").is_some())
.collect();
assert_eq!(
remove_actions.len(),
3,
"Expected 3 remove actions for 3 files"
);
assert_eq!(add_actions.len(), 3, "Expected 3 add actions for 3 files");
for (idx, file_path) in file_paths.iter().enumerate() {
let remove_action = remove_actions
.iter()
.find(|action| action["remove"]["path"].as_str() == Some(file_path.as_str()))
.unwrap_or_else(|| panic!("Should find remove action for {file_path}"));
let add_action = add_actions
.iter()
.find(|action| action["add"]["path"].as_str() == Some(file_path.as_str()))
.unwrap_or_else(|| panic!("Should find add action for {file_path}"));
assert!(
remove_action["remove"]["deletionVector"].is_null(),
"Remove action for newly written file should not have a DV"
);
let add_dv = add_action["add"]["deletionVector"]
.as_object()
.expect("Add action should have deletionVector");
let expected_path = format!("dv_file_{idx}.bin");
assert_eq!(
add_dv.get("pathOrInlineDv").and_then(|v| v.as_str()),
Some(expected_path.as_str()),
"DV path should match for file {file_path}"
);
assert_eq!(
add_dv.get("offset").and_then(|v| v.as_i64()),
Some(idx as i64 * 10),
"DV offset should match for file {file_path}"
);
assert_eq!(
add_dv.get("sizeInBytes").and_then(|v| v.as_i64()),
Some(40 + idx as i64),
"DV size should match for file {file_path}"
);
assert_eq!(
add_dv.get("cardinality").and_then(|v| v.as_i64()),
Some(idx as i64 + 1),
"DV cardinality should match for file {file_path}"
);
}
}
_ => panic!("Transaction should be committed"),
}
Ok(())
}
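/// Files removed via `remove_files` should no longer show up when scanning the new snapshot.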
#[tokio::test]
async fn test_remove_files_verify_files_excluded_from_scan(
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, _store, _table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let engine = Arc::new(engine);
write_data_and_check_result_and_stats(table_url.clone(), schema.clone(), engine.clone(), 1)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.clone().scan_builder().build()?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (_, selection_vector) = scan_metadata.scan_files.into_parts();
let initial_file_count = selection_vector.iter().filter(|&x| *x).count();
assert!(initial_file_count > 0);
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?;
let scan2 = snapshot.scan_builder().build()?;
let scan_metadata2 = scan2.scan_metadata(engine.as_ref())?.next().unwrap()?;
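// Rows past the end of the selection vector are treated as selected, so the number of files
// being removed is the implicitly selected tail plus the explicitly selected rows.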
let file_remove_count = (scan_metadata2.scan_files.data().len()
- scan_metadata2.scan_files.selection_vector().len())
+ scan_metadata2
.scan_files
.selection_vector()
.iter()
.filter(|&x| *x)
.count();
assert!(file_remove_count > 0);
txn.remove_files(scan_metadata2.scan_files);
let result = txn.commit(engine.as_ref());
match result? {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 2);
let new_snapshot = Snapshot::builder_for(table_url.clone())
.at_version(2)
.build(engine.as_ref())?;
let new_scan = new_snapshot.scan_builder().build()?;
let mut new_file_count = 0;
for new_metadata in new_scan.scan_metadata(engine.as_ref())? {
new_file_count += new_metadata?.scan_files.data().len();
}
assert_eq!(new_file_count, 0);
}
_ => panic!("Transaction did not succeeed."),
}
}
Ok(())
}
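/// `remove_files` should honor a caller-modified selection vector: only the selected files are
/// removed and the rest remain visible to later scans.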
#[tokio::test]
async fn test_remove_files_with_modified_selection_vector() -> Result<(), Box<dyn std::error::Error>>
{
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
for (table_url, engine, _store, _table_name) in
setup_test_tables(schema.clone(), &[], None, "test_table").await?
{
let engine = Arc::new(engine);
for i in 1..=5 {
write_data_and_check_result_and_stats(
table_url.clone(),
schema.clone(),
engine.clone(),
i,
)
.await?;
}
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = snapshot.clone().scan_builder().build()?;
let mut initial_file_count = 0;
for metadata in scan.scan_metadata(engine.as_ref())? {
let metadata = metadata?;
initial_file_count += metadata
.scan_files
.selection_vector()
.iter()
.filter(|&x| *x)
.count();
}
assert!(
initial_file_count >= 3,
"Need at least 3 files for this test, got {initial_file_count}"
);
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("selective remove test")
.with_operation("DELETE".to_string())
.with_data_change(true);
let scan2 = snapshot.clone().scan_builder().build()?;
let scan_metadata2 = scan2.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data, mut selection_vector) = scan_metadata2.scan_files.into_parts();
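// Keep only the first selected file in this batch; deselect everything else.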
let mut first_batch_removed = 0;
for selected in selection_vector.iter_mut() {
if *selected && first_batch_removed < 1 {
first_batch_removed += 1;
} else {
*selected = false;
}
}
assert_eq!(
first_batch_removed, 1,
"Should remove exactly 1 file in first batch"
);
txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
let scan3 = snapshot.clone().scan_builder().build()?;
let scan_metadata3 = scan3.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data2, mut selection_vector2) = scan_metadata3.scan_files.into_parts();
// Keep only the last file that is still selected in this batch.
let last_selected_idx = selection_vector2.iter().rposition(|&x| x);
for (i, selected) in selection_vector2.iter_mut().enumerate() {
if Some(i) != last_selected_idx {
*selected = false;
}
}
let second_batch_removed = selection_vector2.iter().filter(|&x| *x).count();
assert_eq!(
second_batch_removed, 1,
"Should remove exactly 1 file in second batch"
);
txn.remove_files(FilteredEngineData::try_new(data2, selection_vector2)?);
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 6);
let new_snapshot = Snapshot::builder_for(table_url.clone())
.at_version(6)
.build(engine.as_ref())?;
let new_scan = new_snapshot.scan_builder().build()?;
let mut new_file_count = 0;
for new_metadata in new_scan.scan_metadata(engine.as_ref())? {
let metadata = new_metadata?;
new_file_count += metadata
.scan_files
.selection_vector()
.iter()
.filter(|&x| *x)
.count();
}
let total_removed = first_batch_removed + second_batch_removed;
assert_eq!(total_removed, 2);
assert_eq!(new_file_count, initial_file_count - total_removed);
assert!(new_file_count > 0, "At least one file should remain");
}
_ => panic!("Transaction did not succeed"),
}
}
Ok(())
}
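/// Creates a table with change data feed enabled and returns its URL, an engine, and the
/// TempDir that keeps the backing directory alive.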
async fn create_cdf_table(
table_name: &str,
schema: SchemaRef,
) -> Result<(Url, Arc<DefaultEngine<TokioBackgroundExecutor>>, TempDir), Box<dyn std::error::Error>>
{
let tmp_dir = tempdir()?;
let tmp_test_dir_url = Url::from_directory_path(tmp_dir.path()).unwrap();
let (store, engine, table_location) = engine_store_setup(table_name, Some(&tmp_test_dir_url));
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["changeDataFeed"],
)
.await?;
Ok((table_url, Arc::new(engine), tmp_dir))
}
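/// Writes `values` to the table in a single transaction and returns the committed version.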
async fn write_data_to_table(
table_url: &Url,
engine: &Arc<DefaultEngine<TokioBackgroundExecutor>>,
schema: SchemaRef,
values: Vec<i32>,
) -> Result<Version, Box<dyn std::error::Error>> {
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test");
add_files_to_transaction(&mut txn, engine, schema, values).await?;
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => Ok(committed.commit_version()),
_ => panic!("Transaction should be committed"),
}
}
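/// Writes `values` as a parquet data file through the engine and stages the resulting
/// add-file metadata on the transaction.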
async fn add_files_to_transaction(
txn: &mut delta_kernel::transaction::Transaction,
engine: &Arc<DefaultEngine<TokioBackgroundExecutor>>,
schema: SchemaRef,
values: Vec<i32>,
) -> Result<(), Box<dyn std::error::Error>> {
let data = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(values))],
)?;
let write_context = Arc::new(txn.get_write_context());
let add_files_metadata = engine
.write_parquet(
&ArrowEngineData::new(data),
write_context.as_ref(),
HashMap::new(),
)
.await?;
txn.add_files(add_files_metadata);
Ok(())
}
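/// With CDF enabled, a transaction containing only add actions commits successfully.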
#[tokio::test]
async fn test_cdf_write_all_adds_succeeds() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let (table_url, engine, _tmp_dir) =
create_cdf_table("test_cdf_all_adds", schema.clone()).await?;
let version = write_data_to_table(&table_url, &engine, schema, vec![1, 2, 3]).await?;
assert_eq!(version, 1);
Ok(())
}
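/// With CDF enabled, a transaction containing only remove actions commits successfully.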
#[tokio::test]
async fn test_cdf_write_all_removes_succeeds() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let (table_url, engine, _tmp_dir) =
create_cdf_table("test_cdf_all_removes", schema.clone()).await?;
write_data_to_table(&table_url, &engine, schema, vec![1, 2, 3]).await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("cdf remove test")
.with_data_change(true);
let scan = snapshot.scan_builder().build()?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data, selection_vector) = scan_metadata.scan_files.into_parts();
txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 2);
}
_ => panic!("Transaction should be committed"),
}
Ok(())
}
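/// With CDF enabled, mixing adds and removes is allowed when the transaction is marked with
/// `with_data_change(false)`.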
#[tokio::test]
async fn test_cdf_write_mixed_no_data_change_succeeds() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let (table_url, engine, _tmp_dir) =
create_cdf_table("test_cdf_mixed_no_data_change", schema.clone()).await?;
write_data_to_table(&table_url, &engine, schema.clone(), vec![1, 2, 3]).await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("cdf mixed test")
.with_data_change(false);
add_files_to_transaction(&mut txn, &engine, schema, vec![4, 5, 6]).await?;
let scan = snapshot.scan_builder().build()?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data, selection_vector) = scan_metadata.scan_files.into_parts();
txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
let result = txn.commit(engine.as_ref())?;
match result {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 2);
}
_ => panic!("Transaction should be committed"),
}
Ok(())
}
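/// With CDF enabled, mixing adds and removes in a data-changing transaction is rejected,
/// since it would require writing CDC files.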
#[tokio::test]
async fn test_cdf_write_mixed_with_data_change_fails() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = get_simple_int_schema();
let (table_url, engine, _tmp_dir) =
create_cdf_table("test_cdf_mixed_with_data_change", schema.clone()).await?;
write_data_to_table(&table_url, &engine, schema.clone(), vec![1, 2, 3]).await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("cdf mixed fail test")
.with_data_change(true);
add_files_to_transaction(&mut txn, &engine, schema, vec![4, 5, 6]).await?;
let scan = snapshot.scan_builder().build()?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
let (data, selection_vector) = scan_metadata.scan_files.into_parts();
txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
assert_result_error_with_message(
txn.commit(engine.as_ref()),
"Cannot add and remove data in the same transaction when Change Data Feed is enabled (delta.enableChangeDataFeed = true). \
This would require writing CDC files for DML operations, which is not yet supported. \
Consider using separate transactions: one to add files, another to remove files."
);
Ok(())
}
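/// Every committed transaction should expose a post-commit snapshot whose version, schema, and
/// table root match the commit, allowing snapshots to be chained across consecutive commits.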
#[tokio::test]
async fn test_post_commit_snapshot_create_then_insert() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let temp_dir = tempdir().unwrap();
let table_url = Url::from_directory_path(temp_dir.path()).unwrap();
let engine = create_default_engine(&table_url)?;
let schema = get_simple_int_schema();
let create_result = create_table_txn(table_url.as_str(), schema, env!("CARGO_PKG_VERSION"))
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let mut current_snapshot = match create_result {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), 0);
let post_snapshot = committed
.post_commit_snapshot()
.expect("should have post_commit_snapshot");
assert_eq!(post_snapshot.version(), 0);
post_snapshot.clone()
}
_ => panic!("Create should succeed"),
};
for i in 1..11 {
let base_version = current_snapshot.version();
let txn = current_snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("test");
match txn.commit(engine.as_ref())? {
CommitResult::CommittedTransaction(committed) => {
let post_snapshot = committed
.post_commit_snapshot()
.expect("should have post_commit_snapshot");
assert_eq!(post_snapshot.version(), base_version + 1);
assert_eq!(post_snapshot.version(), committed.commit_version());
assert_eq!(post_snapshot.schema(), current_snapshot.schema());
assert_eq!(post_snapshot.table_root(), current_snapshot.table_root());
current_snapshot = post_snapshot.clone();
}
_ => panic!("Commit {i} should succeed"),
}
}
Ok(())
}
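/// Partition values supplied under logical column names are accepted when writing to a
/// partitioned table.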
#[tokio::test]
async fn test_write_parquet_succeed_with_logical_partition_names(
) -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::INTEGER),
StructField::nullable("letter", DataType::STRING),
])?);
for (table_url, engine, _store, _table_name) in setup_test_tables(
schema.clone(),
&["letter"],
None,
"test_partition_translate",
)
.await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let data_schema = Arc::new(
StructType::try_new(vec![StructField::nullable("id", DataType::INTEGER)]).unwrap(),
);
let batch = RecordBatch::try_new(
Arc::new(data_schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2]))],
)?;
let result = write_batch_to_table(
&snapshot,
&engine,
batch,
HashMap::from([("letter".to_string(), "a".to_string())]),
)
.await;
assert!(
result.is_ok(),
"write_parquet should succeed with valid logical partition name"
);
}
Ok(())
}
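/// Supplying a partition value for a column that is not in the table schema is rejected with a
/// descriptive error message.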
#[tokio::test]
async fn test_write_parquet_rejects_unknown_partition_column(
) -> Result<(), Box<dyn std::error::Error>> {
let schema = get_simple_int_schema();
for (table_url, engine, _store, _table_name) in
setup_test_tables(schema.clone(), &[], None, "test_partition_reject").await?
{
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let batch = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2]))],
)?;
let result = write_batch_to_table(
&snapshot,
&engine,
batch,
HashMap::from([("nonexistent".to_string(), "val".to_string())]),
)
.await;
let err = result.expect_err("write_parquet should fail with unknown partition column");
let err_msg = err.to_string();
assert!(
err_msg.contains("Partition column 'nonexistent' not found in table schema"),
"Error should mention the unknown column name, got: {err_msg}"
);
}
Ok(())
}
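/// End-to-end write/checkpoint/read round trip under each column mapping mode: stats and
/// stats_parsed use physical column names, parquet field ids are written only when column
/// mapping is enabled, scans still return logical data, and remove actions preserve the
/// original add stats.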
#[rstest::rstest]
#[case::cm_none(ColumnMappingMode::None)]
#[case::cm_id(ColumnMappingMode::Id)]
#[case::cm_name(ColumnMappingMode::Name)]
#[tokio::test(flavor = "multi_thread")]
async fn test_column_mapping_write(
#[case] cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = nested_schema()?;
let (_tmp_dir, table_path, _) = test_table_setup()?;
let table_url = Url::from_directory_path(&table_path).unwrap();
let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
let engine = Arc::new(
DefaultEngineBuilder::new(store.clone())
.with_task_executor(Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
)))
.build(),
);
let mode_str = match cm_mode {
ColumnMappingMode::None => "none",
ColumnMappingMode::Id => "id",
ColumnMappingMode::Name => "name",
};
let mut latest_snapshot = create_table_and_load_snapshot(
&table_path,
schema.clone(),
engine.as_ref(),
&[("delta.columnMapping.mode", mode_str)],
)?;
let cm = latest_snapshot
.table_properties()
.column_mapping_mode
.unwrap_or(ColumnMappingMode::None);
let row_number_physical = get_any_level_column_physical_name(
latest_snapshot.schema().as_ref(),
&ColumnName::new(["row_number"]),
cm,
)?
.into_inner();
let street_physical = get_any_level_column_physical_name(
latest_snapshot.schema().as_ref(),
&ColumnName::new(["address", "street"]),
cm,
)?
.into_inner();
for data in nested_batches()? {
latest_snapshot =
write_batch_to_table(&latest_snapshot, engine.as_ref(), data, HashMap::new()).await?;
}
latest_snapshot = set_table_properties(
&table_path,
&table_url,
engine.as_ref(),
latest_snapshot.version(),
&[("delta.checkpoint.writeStatsAsStruct", "true")],
)?;
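// Checkpoint after enabling writeStatsAsStruct so the checkpoint also carries stats_parsed.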
let snapshot_for_checkpoint = latest_snapshot.clone();
snapshot_for_checkpoint.checkpoint(engine.as_ref())?;
let ckpt_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let add_actions = read_add_infos(&ckpt_snapshot, engine.as_ref())?;
let mut all_stats: Vec<_> = add_actions
.iter()
.filter_map(|a| a.stats.as_ref())
.filter(|s| s.get("minValues").is_some())
.collect();
assert_eq!(all_stats.len(), 2, "should have stats for 2 files");
all_stats.sort_by_key(|s| s["minValues"][&row_number_physical[0]].as_i64().unwrap());
assert_min_max_stats(all_stats[0], &row_number_physical, 1, 3);
assert_min_max_stats(all_stats[0], &street_physical, "st1", "st3");
assert_min_max_stats(all_stats[1], &row_number_physical, 4, 6);
assert_min_max_stats(all_stats[1], &street_physical, "st4", "st6");
{
let scan = ckpt_snapshot
.scan_builder()
.include_all_stats_columns()
.build()?;
let scan_metadata_results: Vec<_> = scan
.scan_metadata(engine.as_ref())?
.collect::<Result<Vec<_>, _>>()?;
let mut stats_rows: Vec<(i64, i64, String, String)> = Vec::new();
for sm in scan_metadata_results {
let (data, sel) = sm.scan_files.into_parts();
let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into();
let batch_struct = StructArray::from(batch.clone());
let stats_parsed: &StructArray =
resolve_struct_field(&batch_struct, &["stats_parsed".into()]);
let min_path = |field: &[String]| -> Vec<String> {
[&["stats_parsed".into(), "minValues".into()], field].concat()
};
let max_path = |field: &[String]| -> Vec<String> {
[&["stats_parsed".into(), "maxValues".into()], field].concat()
};
let min_row_num: &Int64Array =
resolve_struct_field(&batch_struct, &min_path(&row_number_physical));
let max_row_num: &Int64Array =
resolve_struct_field(&batch_struct, &max_path(&row_number_physical));
let min_st: &StringArray =
resolve_struct_field(&batch_struct, &min_path(&street_physical));
let max_st: &StringArray =
resolve_struct_field(&batch_struct, &max_path(&street_physical));
for (i, &selected) in sel.iter().enumerate().take(batch.num_rows()) {
if selected && !stats_parsed.is_null(i) {
stats_rows.push((
min_row_num.value(i),
max_row_num.value(i),
min_st.value(i).to_string(),
max_st.value(i).to_string(),
));
}
}
}
stats_rows.sort_by_key(|r| r.0);
assert_eq!(stats_rows.len(), 2, "should have stats_parsed for 2 files");
assert_eq!(stats_rows[0], (1, 3, "st1".to_string(), "st3".to_string()));
assert_eq!(stats_rows[1], (4, 6, "st4".to_string(), "st6".to_string()));
}
{
let parquet_path = &add_actions
.first()
.expect("should have at least one add file")
.path;
let parquet_url = table_url.join(parquet_path)?;
let local_path = parquet_url.to_file_path().unwrap();
let obj_meta = store
.head(&Path::from_url_path(parquet_url.path())?)
.await?;
let file_meta = FileMeta::new(parquet_url, 0, obj_meta.size as u64);
let footer = engine.parquet_handler().read_parquet_footer(&file_meta)?;
let footer_schema = footer.schema;
let logical_schema = latest_snapshot.schema();
for logical_path in [&["row_number"][..], &["address", "street"]] {
let col = ColumnName::new(logical_path.iter().copied());
let physical =
get_any_level_column_physical_name(logical_schema.as_ref(), &col, cm)?.into_inner();
assert_schema_has_field(&footer_schema, &physical);
let field_id = get_parquet_field_id(&local_path, &physical);
let logical_field = resolve_field(logical_schema.as_ref(), logical_path).unwrap();
match cm_mode {
ColumnMappingMode::Id | ColumnMappingMode::Name => {
let expected_id =
match logical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
Some(MetadataValue::Number(n)) => *n as i32,
other => panic!("expected ColumnMappingId number, got {other:?}"),
};
assert_eq!(
field_id,
Some(expected_id),
"parquet field_id mismatch for {logical_path:?}"
);
}
ColumnMappingMode::None => {
assert_eq!(
field_id, None,
"parquet field_id should not be set in None column mapping mode"
);
}
}
}
}
{
let post_ckpt_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan = post_ckpt_snapshot.scan_builder().build()?;
let batches: Vec<RecordBatch> = scan
.execute(engine.clone())?
.map(|r| {
let data = r.unwrap();
let arrow = ArrowEngineData::try_from_engine_data(data).unwrap();
arrow.record_batch().clone()
})
.collect();
let result_schema = batches[0].schema();
let combined = delta_kernel::arrow::compute::concat_batches(&result_schema, &batches)?;
assert_eq!(
combined.num_rows(),
6,
"Should have 6 rows from two written batches"
);
let combined_struct = StructArray::from(combined);
let row_numbers: &Int64Array =
resolve_struct_field(&combined_struct, &["row_number".into()]);
let mut vals: Vec<i64> = (0..row_numbers.len())
.map(|i| row_numbers.value(i))
.collect();
vals.sort();
assert_eq!(vals, vec![1, 2, 3, 4, 5, 6]);
let streets: &StringArray =
resolve_struct_field(&combined_struct, &["address".into(), "street".into()]);
let mut street_vals: Vec<&str> = (0..streets.len()).map(|i| streets.value(i)).collect();
street_vals.sort();
assert_eq!(street_vals, vec!["st1", "st2", "st3", "st4", "st5", "st6"]);
}
{
let original_add_stats: Vec<serde_json::Value> =
add_actions.iter().filter_map(|a| a.stats.clone()).collect();
assert!(
!original_add_stats.is_empty(),
"should have at least one add with stats"
);
let remove_actions =
remove_all_and_get_remove_actions(&latest_snapshot, &table_url, engine.as_ref())?;
assert!(
!remove_actions.is_empty(),
"Expected at least one remove action"
);
let remove_stats: Vec<serde_json::Value> = remove_actions
.iter()
.filter_map(|r| {
r["stats"]
.as_str()
.map(|s| serde_json::from_str(s).unwrap())
})
.collect();
assert_eq!(
remove_stats, original_add_stats,
"remove.stats should match original add.stats"
);
}
Ok(())
}
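/// Writing to a partitioned table under column mapping should record partition values under the
/// physical column name in both add and remove actions.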
#[rstest::rstest]
#[case::cm_none("./tests/data/partition_cm/none")]
#[case::cm_id("./tests/data/partition_cm/id")]
#[case::cm_name("./tests/data/partition_cm/name")]
#[tokio::test(flavor = "multi_thread")]
async fn test_column_mapping_partitioned_write(
#[case] table_dir: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
copy_directory(std::path::Path::new(table_dir), tmp_dir.path())?;
let table_url = Url::from_directory_path(tmp_dir.path()).unwrap();
let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
let engine = Arc::new(
DefaultEngineBuilder::new(store.clone())
.with_task_executor(Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
)))
.build(),
);
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let cm = snapshot
.table_properties()
.column_mapping_mode
.unwrap_or(ColumnMappingMode::None);
let physical_name = get_any_level_column_physical_name(
snapshot.schema().as_ref(),
&ColumnName::new(["category"]),
cm,
)?
.into_inner()
.remove(0);
if table_dir.ends_with("none") {
assert_eq!(physical_name, "category");
} else {
assert_ne!(
physical_name, "category",
"physical name should differ from logical name under column mapping"
);
}
let data_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"value",
DataType::INTEGER,
)])?);
let batch = RecordBatch::try_new(
Arc::new(data_schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2]))],
)?;
let partition_values = HashMap::from([("category".to_string(), "A".to_string())]);
write_batch_to_table(&snapshot, engine.as_ref(), batch, partition_values).await?;
let add_actions = read_actions_from_commit(&table_url, 1, "add")?;
assert!(!add_actions.is_empty(), "no add action found in commit log");
for add in &add_actions {
assert_partition_values(add, &physical_name, "A");
}
let post_write_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let remove_actions =
remove_all_and_get_remove_actions(&post_write_snapshot, &table_url, engine.as_ref())?;
assert!(
!remove_actions.is_empty(),
"no remove action found in commit log"
);
for remove in &remove_actions {
assert_partition_values(remove, &physical_name, "A");
}
Ok(())
}
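/// Checkpointing a table written by another engine must not change the data returned by a scan
/// and must leave a checkpoint parquet file in the log.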
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_non_kernel_written_table() {
let source_path = std::path::Path::new("./tests/data/external-table-different-nullability");
let temp_dir = tempfile::tempdir().unwrap();
let table_path = temp_dir.path().join("test-checkpoint-table");
test_utils::copy_directory(source_path, &table_path).unwrap();
let url = Url::from_directory_path(&table_path).unwrap();
let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
let executor = Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
));
let engine: Arc<DefaultEngine<_>> = Arc::new(
DefaultEngineBuilder::new(store)
.with_task_executor(executor)
.build(),
);
let snapshot = Snapshot::builder_for(url.clone())
.build(engine.as_ref())
.unwrap();
let scan_before = Arc::clone(&snapshot).scan_builder().build().unwrap();
let batches_before = test_utils::read_scan(&scan_before, engine.clone()).unwrap();
snapshot.checkpoint(engine.as_ref()).unwrap();
let snapshot_after = Snapshot::builder_for(url.clone())
.build(engine.as_ref())
.unwrap();
let scan_after = snapshot_after.scan_builder().build().unwrap();
let batches_after = test_utils::read_scan(&scan_after, engine.clone()).unwrap();
let formatted_before =
delta_kernel::arrow::util::pretty::pretty_format_batches(&batches_before)
.unwrap()
.to_string();
let formatted_after = delta_kernel::arrow::util::pretty::pretty_format_batches(&batches_after)
.unwrap()
.to_string();
assert_eq!(
formatted_before, formatted_after,
"Row data changed after checkpoint creation!"
);
let delta_log_path = table_path.join("_delta_log");
let has_checkpoint = std::fs::read_dir(&delta_log_path)
.unwrap()
.filter_map(|e| e.ok())
.any(|e| {
e.file_name()
.to_str()
.is_some_and(|n| n.contains(".checkpoint.parquet"))
});
assert!(has_checkpoint, "Expected at least one checkpoint file");
}
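/// Handles for a freshly created clustered table; `_tmp_dir` is held so the backing directory
/// outlives the test.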
struct ClusteredTableSetup {
_tmp_dir: TempDir,
table_path: String,
table_url: Url,
engine: Arc<DefaultEngine<TokioMultiThreadExecutor>>,
snapshot: Arc<Snapshot>,
}
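/// Creates a clustered table with the given column mapping mode and clustering columns, applies
/// `table_properties`, and returns the resulting snapshot.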
fn setup_clustered_table(
cm_mode: &str,
schema: Arc<StructType>,
clustering_cols: Vec<ColumnName>,
table_properties: &[(&str, &str)],
) -> Result<ClusteredTableSetup, Box<dyn std::error::Error>> {
use delta_kernel::transaction::data_layout::DataLayout;
let (_tmp_dir, table_path, _) = test_table_setup()?;
let table_url = Url::from_directory_path(&table_path).unwrap();
let engine = create_default_engine_mt_executor(&table_url)?;
let _ = create_table_txn(table_url.as_str(), schema, "Test/1.0")
.with_table_properties([("delta.columnMapping.mode", cm_mode)])
.with_data_layout(DataLayout::Clustered {
columns: clustering_cols,
})
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot = set_table_properties(
&table_path,
&table_url,
engine.as_ref(),
0,
table_properties,
)?;
Ok(ClusteredTableSetup {
_tmp_dir,
table_path,
table_url,
engine,
snapshot,
})
}
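/// Writes to a clustered table should record min/max stats for the clustering columns (under
/// their physical names) even with delta.dataSkippingNumIndexedCols = 0, and no stats for
/// non-clustering columns.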
#[rstest::rstest]
#[case::cm_none("none")]
#[case::cm_name("name")]
#[case::cm_id("id")]
#[tokio::test(flavor = "multi_thread")]
async fn test_clustered_table_write_has_stats(
#[case] cm_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let clustering_cols = vec![
ColumnName::new(["row_number"]),
ColumnName::new(["address", "street"]),
];
let setup = setup_clustered_table(
cm_mode,
nested_schema()?,
clustering_cols.clone(),
&[("delta.dataSkippingNumIndexedCols", "0")],
)?;
let engine = &setup.engine;
let mut snapshot = setup.snapshot;
for batch in nested_batches()? {
snapshot = write_batch_to_table(&snapshot, engine.as_ref(), batch, HashMap::new()).await?;
}
let cm = assert_column_mapping_mode(&snapshot, cm_mode);
let physical_paths: Vec<Vec<String>> = clustering_cols
.iter()
.map(|c| {
get_any_level_column_physical_name(snapshot.schema().as_ref(), c, cm)
.unwrap()
.into_inner()
})
.collect();
if cm != ColumnMappingMode::None {
let logical_paths: Vec<Vec<&str>> = vec![vec!["row_number"], vec!["address", "street"]];
for (phys, logical) in physical_paths.iter().zip(&logical_paths) {
assert_ne!(
phys.iter().map(String::as_str).collect_vec(),
*logical,
"physical path should differ from logical when cm={cm:?}"
);
}
}
let non_clustering_physical = get_any_level_column_physical_name(
snapshot.schema().as_ref(),
&ColumnName::new(["name"]),
cm,
)?
.into_inner();
let expected: [(i64, i64, &str, &str); 2] = [(1, 3, "st1", "st3"), (4, 6, "st4", "st6")];
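// The two data batches land at versions 2 and 3 (version 0 created the table, version 1 set
// the table properties).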
for (version, (min_rn, max_rn, min_st, max_st)) in expected.iter().enumerate() {
let version = (version + 2) as u64;
let add_actions = read_actions_from_commit(&setup.table_url, version, "add")?;
assert!(
!add_actions.is_empty(),
"v{version}: should have add actions"
);
for add in &add_actions {
let stats: serde_json::Value = serde_json::from_str(
add.get("stats")
.and_then(|s| s.as_str())
.expect("add action should have stats"),
)?;
assert_min_max_stats(&stats, &physical_paths[0], *min_rn, *max_rn);
assert_min_max_stats(&stats, &physical_paths[1], *min_st, *max_st);
let non_cluster_min = resolve_json_path(&stats["minValues"], &non_clustering_physical);
assert!(
non_cluster_min.is_null(),
"v{version}: non-clustering column 'name' should not have stats"
);
}
}
Ok(())
}
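/// Checkpointing a clustered table with writeStatsAsStruct enabled should produce stats_parsed
/// entries for the clustering columns only.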
#[rstest::rstest]
#[case::cm_none("none")]
#[case::cm_name("name")]
#[case::cm_id("id")]
#[tokio::test(flavor = "multi_thread")]
async fn test_clustered_table_write_has_stats_parsed(
#[case] cm_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let clustering_cols = vec![
ColumnName::new(["row_number"]),
ColumnName::new(["address", "street"]),
];
let setup = setup_clustered_table(
cm_mode,
nested_schema()?,
clustering_cols.clone(),
&[
("delta.checkpoint.writeStatsAsStruct", "true"),
("delta.dataSkippingNumIndexedCols", "0"),
],
)?;
let engine = &setup.engine;
let mut snapshot = setup.snapshot;
for batch in nested_batches()? {
snapshot = write_batch_to_table(&snapshot, engine.as_ref(), batch, HashMap::new()).await?;
}
let cm = assert_column_mapping_mode(&snapshot, cm_mode);
let physical_paths: Vec<Vec<String>> = clustering_cols
.iter()
.map(|c| {
get_any_level_column_physical_name(snapshot.schema().as_ref(), c, cm)
.unwrap()
.into_inner()
})
.collect();
if cm != ColumnMappingMode::None {
let logical_paths: Vec<Vec<&str>> = vec![vec!["row_number"], vec!["address", "street"]];
for (phys, logical) in physical_paths.iter().zip(&logical_paths) {
assert_ne!(
phys.iter().map(String::as_str).collect_vec(),
*logical,
"physical path should differ from logical when cm={cm:?}"
);
}
}
let non_clustering_physical = get_any_level_column_physical_name(
snapshot.schema().as_ref(),
&ColumnName::new(["name"]),
cm,
)?
.into_inner();
snapshot.checkpoint(engine.as_ref())?;
use delta_kernel::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
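// Locate the checkpoint parquet in the log and read it back directly so stats_parsed can be
// inspected as Arrow structs.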
let delta_log = std::path::Path::new(&setup.table_path).join("_delta_log");
let ckpt_path = std::fs::read_dir(&delta_log)?
.filter_map(|e| e.ok())
.find(|e| {
e.file_name()
.to_str()
.is_some_and(|n| n.contains(".checkpoint.parquet"))
})
.expect("checkpoint parquet should exist")
.path();
let file = std::fs::File::open(&ckpt_path)?;
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
let min_path = |field: &[String]| -> Vec<String> { [&["minValues".into()], field].concat() };
let max_path = |field: &[String]| -> Vec<String> { [&["maxValues".into()], field].concat() };
let mut stats_rows: Vec<(i64, i64, String, String)> = Vec::new();
for batch in reader {
let batch = batch?;
let batch_struct = StructArray::from(batch);
let add: &StructArray = resolve_struct_field(&batch_struct, &["add".into()]);
let stats_parsed: &StructArray = resolve_struct_field(add, &["stats_parsed".into()]);
let min_values: &StructArray = resolve_struct_field(stats_parsed, &["minValues".into()]);
assert!(
min_values
.column_by_name(&non_clustering_physical[0])
.is_none(),
"non-clustering column '{}' should not have stats_parsed",
non_clustering_physical[0]
);
let min_row_num: &Int64Array =
resolve_struct_field(stats_parsed, &min_path(&physical_paths[0]));
let max_row_num: &Int64Array =
resolve_struct_field(stats_parsed, &max_path(&physical_paths[0]));
let min_st: &StringArray =
resolve_struct_field(stats_parsed, &min_path(&physical_paths[1]));
let max_st: &StringArray =
resolve_struct_field(stats_parsed, &max_path(&physical_paths[1]));
for i in 0..stats_parsed.len() {
if !stats_parsed.is_null(i) {
stats_rows.push((
min_row_num.value(i),
max_row_num.value(i),
min_st.value(i).to_string(),
max_st.value(i).to_string(),
));
}
}
}
stats_rows.sort_by_key(|r| r.0);
assert_eq!(stats_rows.len(), 2, "should have stats_parsed for 2 files");
assert_eq!(stats_rows[0], (1, 3, "st1".to_string(), "st3".to_string()));
assert_eq!(stats_rows[1], (4, 6, "st4".to_string(), "st6".to_string()));
Ok(())
}