use std::collections::HashMap;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{Array, Int64Array, StringArray, StructArray};
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::ColumnName;
use delta_kernel::object_store::local::LocalFileSystem;
use delta_kernel::object_store::DynObjectStore;
use delta_kernel::schema::StructType;
use delta_kernel::table_features::{get_any_level_column_physical_name, ColumnMappingMode};
use delta_kernel::transaction::create_table::create_table as create_table_txn;
use delta_kernel::Snapshot;
use itertools::Itertools;
use tempfile::TempDir;
use test_utils::{
create_default_engine_mt_executor, nested_batches, nested_schema, read_actions_from_commit,
test_table_setup, write_batch_to_table,
};
use url::Url;
use crate::common::write_utils::{
assert_column_mapping_mode, assert_min_max_stats, resolve_json_path, resolve_struct_field,
set_table_properties,
};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_non_kernel_written_table() {
let source_path = std::path::Path::new("./tests/data/external-table-different-nullability");
let temp_dir = tempfile::tempdir().unwrap();
let table_path = temp_dir.path().join("test-checkpoint-table");
test_utils::copy_directory(source_path, &table_path).unwrap();
let url = Url::from_directory_path(&table_path).unwrap();
let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
let executor = Arc::new(
delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
),
);
let engine: Arc<delta_kernel::engine::default::DefaultEngine<_>> = Arc::new(
delta_kernel::engine::default::DefaultEngineBuilder::new(store)
.with_task_executor(executor)
.build(),
);
let snapshot = Snapshot::builder_for(url.clone())
.build(engine.as_ref())
.unwrap();
let scan_before = Arc::clone(&snapshot).scan_builder().build().unwrap();
let batches_before = test_utils::read_scan(&scan_before, engine.clone()).unwrap();
snapshot.checkpoint(engine.as_ref(), None).unwrap();
let snapshot_after = Snapshot::builder_for(url.clone())
.build(engine.as_ref())
.unwrap();
let scan_after = snapshot_after.scan_builder().build().unwrap();
let batches_after = test_utils::read_scan(&scan_after, engine.clone()).unwrap();
let formatted_before =
delta_kernel::arrow::util::pretty::pretty_format_batches(&batches_before)
.unwrap()
.to_string();
let formatted_after = delta_kernel::arrow::util::pretty::pretty_format_batches(&batches_after)
.unwrap()
.to_string();
assert_eq!(
formatted_before, formatted_after,
"Row data changed after checkpoint creation!"
);
let delta_log_path = table_path.join("_delta_log");
let has_checkpoint = std::fs::read_dir(&delta_log_path)
.unwrap()
.filter_map(|e| e.ok())
.any(|e| {
e.file_name()
.to_str()
.is_some_and(|n| n.contains(".checkpoint.parquet"))
});
assert!(has_checkpoint, "Expected at least one checkpoint file");
}
struct ClusteredTableSetup {
_tmp_dir: TempDir,
table_path: String,
table_url: Url,
engine: Arc<DefaultEngine<TokioMultiThreadExecutor>>,
snapshot: Arc<Snapshot>,
}
fn setup_clustered_table(
cm_mode: &str,
schema: Arc<StructType>,
clustering_cols: Vec<ColumnName>,
table_properties: &[(&str, &str)],
) -> Result<ClusteredTableSetup, Box<dyn std::error::Error>> {
use delta_kernel::transaction::data_layout::DataLayout;
let (_tmp_dir, table_path, _) = test_table_setup()?;
let table_url = Url::from_directory_path(&table_path).unwrap();
let engine = create_default_engine_mt_executor(&table_url)?;
let _ = create_table_txn(table_url.as_str(), schema, "Test/1.0")
.with_table_properties([("delta.columnMapping.mode", cm_mode)])
.with_data_layout(DataLayout::Clustered {
columns: clustering_cols,
})
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot = set_table_properties(
&table_path,
&table_url,
engine.as_ref(),
0,
table_properties,
)?;
Ok(ClusteredTableSetup {
_tmp_dir,
table_path,
table_url,
engine,
snapshot,
})
}
#[rstest::rstest]
#[case::cm_none("none")]
#[case::cm_name("name")]
#[case::cm_id("id")]
#[tokio::test(flavor = "multi_thread")]
async fn test_clustered_table_write_has_stats(
#[case] cm_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let clustering_cols = vec![
ColumnName::new(["row_number"]),
ColumnName::new(["address", "street"]),
];
let setup = setup_clustered_table(
cm_mode,
nested_schema()?,
clustering_cols.clone(),
&[("delta.dataSkippingNumIndexedCols", "0")],
)?;
let engine = &setup.engine;
let mut snapshot = setup.snapshot;
for batch in nested_batches()? {
snapshot = write_batch_to_table(&snapshot, engine.as_ref(), batch, HashMap::new()).await?;
}
let cm = assert_column_mapping_mode(&snapshot, cm_mode);
let physical_paths: Vec<Vec<String>> = clustering_cols
.iter()
.map(|c| {
get_any_level_column_physical_name(snapshot.schema().as_ref(), c, cm)
.unwrap()
.into_inner()
})
.collect();
if cm != ColumnMappingMode::None {
let logical_paths: Vec<Vec<&str>> = vec![vec!["row_number"], vec!["address", "street"]];
for (phys, logical) in physical_paths.iter().zip(&logical_paths) {
assert_ne!(
phys.iter().map(String::as_str).collect_vec(),
*logical,
"physical path should differ from logical when cm={cm:?}"
);
}
}
let non_clustering_physical = get_any_level_column_physical_name(
snapshot.schema().as_ref(),
&ColumnName::new(["name"]),
cm,
)?
.into_inner();
let expected: [(i64, i64, &str, &str); 2] = [(1, 3, "st1", "st3"), (4, 6, "st4", "st6")];
for (version, (min_rn, max_rn, min_st, max_st)) in expected.iter().enumerate() {
let version = (version + 2) as u64;
let add_actions = read_actions_from_commit(&setup.table_url, version, "add")?;
assert!(
!add_actions.is_empty(),
"v{version}: should have add actions"
);
for add in &add_actions {
let stats: serde_json::Value = serde_json::from_str(
add.get("stats")
.and_then(|s| s.as_str())
.expect("add action should have stats"),
)?;
assert_min_max_stats(&stats, &physical_paths[0], *min_rn, *max_rn);
assert_min_max_stats(&stats, &physical_paths[1], *min_st, *max_st);
let non_cluster_min = resolve_json_path(&stats["minValues"], &non_clustering_physical);
assert!(
non_cluster_min.is_null(),
"v{version}: non-clustering column 'name' should not have stats"
);
}
}
Ok(())
}
#[rstest::rstest]
#[case::cm_none("none")]
#[case::cm_name("name")]
#[case::cm_id("id")]
#[tokio::test(flavor = "multi_thread")]
async fn test_clustered_table_write_has_stats_parsed(
#[case] cm_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let clustering_cols = vec![
ColumnName::new(["row_number"]),
ColumnName::new(["address", "street"]),
];
let setup = setup_clustered_table(
cm_mode,
nested_schema()?,
clustering_cols.clone(),
&[
("delta.checkpoint.writeStatsAsStruct", "true"),
("delta.dataSkippingNumIndexedCols", "0"),
],
)?;
let engine = &setup.engine;
let mut snapshot = setup.snapshot;
for batch in nested_batches()? {
snapshot = write_batch_to_table(&snapshot, engine.as_ref(), batch, HashMap::new()).await?;
}
let cm = assert_column_mapping_mode(&snapshot, cm_mode);
let physical_paths: Vec<Vec<String>> = clustering_cols
.iter()
.map(|c| {
get_any_level_column_physical_name(snapshot.schema().as_ref(), c, cm)
.unwrap()
.into_inner()
})
.collect();
if cm != ColumnMappingMode::None {
let logical_paths: Vec<Vec<&str>> = vec![vec!["row_number"], vec!["address", "street"]];
for (phys, logical) in physical_paths.iter().zip(&logical_paths) {
assert_ne!(
phys.iter().map(String::as_str).collect_vec(),
*logical,
"physical path should differ from logical when cm={cm:?}"
);
}
}
let non_clustering_physical = get_any_level_column_physical_name(
snapshot.schema().as_ref(),
&ColumnName::new(["name"]),
cm,
)?
.into_inner();
snapshot.checkpoint(engine.as_ref(), None)?;
use delta_kernel::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
let delta_log = std::path::Path::new(&setup.table_path).join("_delta_log");
let ckpt_path = std::fs::read_dir(&delta_log)?
.filter_map(|e| e.ok())
.find(|e| {
e.file_name()
.to_str()
.is_some_and(|n| n.contains(".checkpoint.parquet"))
})
.expect("checkpoint parquet should exist")
.path();
let file = std::fs::File::open(&ckpt_path)?;
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
let min_path = |field: &[String]| -> Vec<String> { [&["minValues".into()], field].concat() };
let max_path = |field: &[String]| -> Vec<String> { [&["maxValues".into()], field].concat() };
let mut stats_rows: Vec<(i64, i64, String, String)> = Vec::new();
for batch in reader {
let batch = batch?;
let batch_struct = StructArray::from(batch);
let add: &StructArray = resolve_struct_field(&batch_struct, &["add".into()]);
let stats_parsed: &StructArray = resolve_struct_field(add, &["stats_parsed".into()]);
let min_values: &StructArray = resolve_struct_field(stats_parsed, &["minValues".into()]);
assert!(
min_values
.column_by_name(&non_clustering_physical[0])
.is_none(),
"non-clustering column '{}' should not have stats_parsed",
non_clustering_physical[0]
);
let min_row_num: &Int64Array =
resolve_struct_field(stats_parsed, &min_path(&physical_paths[0]));
let max_row_num: &Int64Array =
resolve_struct_field(stats_parsed, &max_path(&physical_paths[0]));
let min_st: &StringArray =
resolve_struct_field(stats_parsed, &min_path(&physical_paths[1]));
let max_st: &StringArray =
resolve_struct_field(stats_parsed, &max_path(&physical_paths[1]));
for i in 0..stats_parsed.len() {
if !stats_parsed.is_null(i) {
stats_rows.push((
min_row_num.value(i),
max_row_num.value(i),
min_st.value(i).to_string(),
max_st.value(i).to_string(),
));
}
}
}
stats_rows.sort_by_key(|r| r.0);
assert_eq!(stats_rows.len(), 2, "should have stats_parsed for 2 files");
assert_eq!(stats_rows[0], (1, 3, "st1".to_string(), "st3".to_string()));
assert_eq!(stats_rows[1], (4, 6, "st4".to_string(), "st6".to_string()));
Ok(())
}