use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::action_reconciliation::{
deleted_file_retention_timestamp_with_time, DEFAULT_RETENTION_SECS,
};
use crate::actions::{Add, Metadata, Protocol, Remove};
use crate::arrow::datatypes::DataType;
use crate::arrow::{
array::{create_array, Array, AsArray, RecordBatch, StructArray},
datatypes::{Field, Schema},
};
use crate::checkpoint::{create_last_checkpoint_data, CHECKPOINT_ACTIONS_SCHEMA_V2};
use crate::committer::FileSystemCommitter;
use crate::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt};
use crate::engine::default::executor::tokio::TokioMultiThreadExecutor;
use crate::engine::default::DefaultEngineBuilder;
use crate::log_replay::HasSelectionVector;
use crate::object_store::local::LocalFileSystem;
use crate::object_store::{memory::InMemory, path::Path, ObjectStoreExt as _};
use crate::schema::{DataType as KernelDataType, StructField, StructType};
use crate::table_features::TableFeature;
use crate::utils::test_utils::Action;
use crate::{DeltaResult, FileMeta, LogPath, Snapshot};
use serde_json::{from_slice, json, Value};
use tempfile::tempdir;
use test_utils::delta_path_for_version;
use url::Url;
/// Verifies that `deleted_file_retention_timestamp_with_time` converts a
/// retention duration into the correct millisecond cutoff timestamp relative
/// to a fixed reference time (10,000 s => expected values in the 10_000_000 ms range).
#[rstest::rstest]
#[case::default_retention(
    None,
    10_000_000 - (DEFAULT_RETENTION_SECS as i64 * 1_000)
)]
#[case::zero_retention(Some(Duration::from_secs(0)), 10_000_000)]
#[case::custom_retention(Some(Duration::from_secs(2_000)), 10_000_000 - 2_000_000)]
fn test_deleted_file_retention_timestamp(
    #[case] retention: Option<Duration>,
    #[case] expected_timestamp: i64,
) -> DeltaResult<()> {
    // Fixed "now" used by all cases; keeps the expected values deterministic.
    let now = Duration::from_secs(10_000);
    let cutoff = deleted_file_retention_timestamp_with_time(retention, now)?;
    assert_eq!(cutoff, expected_timestamp);
    Ok(())
}
/// Verifies that a checkpoint writer for a v2Checkpoint-enabled table produces
/// a `checkpointMetadata` action batch with the expected schema, row count,
/// and action counters.
#[tokio::test]
async fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: protocol advertising the v2Checkpoint feature, plus metadata.
    write_commit_to_store(
        &store,
        vec![
            create_v2_checkpoint_protocol_action(),
            create_metadata_action(),
        ],
        0,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    let writer = snapshot.create_checkpoint_writer()?;
    // Build only the checkpointMetadata batch against the v2 actions schema.
    let checkpoint_batch =
        writer.create_checkpoint_metadata_batch(&engine, &CHECKPOINT_ACTIONS_SCHEMA_V2)?;
    assert!(checkpoint_batch.filtered_data.has_selected_rows());
    // Unwrap the engine data back into an Arrow record batch for inspection.
    let (underlying_data, _) = checkpoint_batch.filtered_data.into_parts();
    let arrow_engine_data = ArrowEngineData::try_from_engine_data(underlying_data)?;
    let record_batch = arrow_engine_data.record_batch();
    // The batch uses the full checkpoint actions schema, so action columns
    // like `add`/`remove` are present alongside `checkpointMetadata`.
    let schema = record_batch.schema();
    assert!(
        schema.field_with_name("checkpointMetadata").is_ok(),
        "Schema should have checkpointMetadata field"
    );
    assert!(
        schema.field_with_name("add").is_ok(),
        "Schema should have add field"
    );
    assert!(
        schema.field_with_name("remove").is_ok(),
        "Schema should have remove field"
    );
    // Exactly one row — the checkpointMetadata action. It counts as an action
    // but not as an add file.
    assert_eq!(record_batch.num_rows(), 1);
    assert_eq!(checkpoint_batch.actions_count, 1);
    assert_eq!(checkpoint_batch.add_actions_count, 0);
    Ok(())
}
/// Checks that `create_last_checkpoint_data` produces a single-row batch whose
/// schema and values match the `_last_checkpoint` file layout.
#[test]
fn test_create_last_checkpoint_data() -> DeltaResult<()> {
    let version = 10;
    let total_actions_counter = 100;
    let add_actions_counter = 75;
    let size_in_bytes: i64 = 1024 * 1024;
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();

    let batch = create_last_checkpoint_data(
        &engine,
        version,
        total_actions_counter,
        add_actions_counter,
        size_in_bytes,
    )?;
    let actual = ArrowEngineData::try_from_engine_data(batch)?;

    // Expected single-row batch; `parts` stays null for a single-part checkpoint.
    let fields = vec![
        Field::new("version", DataType::Int64, false),
        Field::new("size", DataType::Int64, false),
        Field::new("parts", DataType::Int64, true),
        Field::new("sizeInBytes", DataType::Int64, true),
        Field::new("numOfAddFiles", DataType::Int64, true),
    ];
    let expected = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            create_array!(Int64, [version]),
            create_array!(Int64, [total_actions_counter]),
            create_array!(Int64, [None]),
            create_array!(Int64, [size_in_bytes]),
            create_array!(Int64, [add_actions_counter]),
        ],
    )
    .unwrap();
    assert_eq!(*actual.record_batch(), expected);
    Ok(())
}
/// Creates a fresh in-memory object store and the URL of its `_delta_log/`
/// directory.
fn new_in_memory_store() -> (Arc<InMemory>, Url) {
    let store = Arc::new(InMemory::new());
    let log_root = Url::parse("memory:///")
        .unwrap()
        .join("_delta_log/")
        .unwrap();
    (store, log_root)
}
/// Serializes `actions` as newline-delimited JSON and writes them to the store
/// as the commit file for `version`.
async fn write_commit_to_store(
    store: &Arc<InMemory>,
    actions: Vec<Action>,
    version: u64,
) -> DeltaResult<()> {
    let mut lines = Vec::with_capacity(actions.len());
    for action in &actions {
        lines.push(serde_json::to_string(action).expect("action to string"));
    }
    let commit_path = delta_path_for_version(version, "json");
    store.put(&commit_path, lines.join("\n").into()).await?;
    Ok(())
}
/// Builds a modern (table-features) protocol action with no reader or writer
/// features enabled.
fn create_basic_protocol_action() -> Action {
    let protocol =
        Protocol::try_new_modern(TableFeature::EMPTY_LIST, TableFeature::EMPTY_LIST).unwrap();
    Action::Protocol(protocol)
}
/// Builds a protocol action that enables the `v2Checkpoint` feature for both
/// readers and writers.
fn create_v2_checkpoint_protocol_action() -> Action {
    let features = vec!["v2Checkpoint"];
    Action::Protocol(Protocol::try_new_modern(features.clone(), features).unwrap())
}
/// Builds a metadata action for a table named `test-table` with a single
/// nullable integer column `value` and no partition columns.
fn create_metadata_action() -> Action {
    let schema = StructType::new_unchecked([StructField::nullable(
        "value",
        KernelDataType::INTEGER,
    )]);
    let metadata = Metadata::try_new(
        Some("test-table".into()),
        None,
        Arc::new(schema),
        vec![],
        0,
        HashMap::new(),
    )
    .unwrap();
    Action::Metadata(metadata)
}
fn create_add_action(path: &str) -> Action {
Action::Add(Add {
path: path.into(),
data_change: true,
..Default::default()
})
}
fn create_remove_action(path: &str) -> Action {
Action::Remove(Remove {
path: path.into(),
data_change: true,
deletion_timestamp: Some(i64::MAX), ..Default::default()
})
}
/// Reads `_last_checkpoint` from the store and asserts its JSON contents match
/// the expected version, action count, add-file count, and byte size.
async fn assert_last_checkpoint_contents(
    store: &Arc<InMemory>,
    expected_version: u64,
    expected_size: u64,
    expected_num_add_files: u64,
    expected_size_in_bytes: u64,
) -> DeltaResult<()> {
    let actual = read_last_checkpoint_file(store).await?;
    assert_eq!(
        actual,
        json!({
            "version": expected_version,
            "size": expected_size,
            "sizeInBytes": expected_size_in_bytes,
            "numOfAddFiles": expected_num_add_files,
        })
    );
    Ok(())
}
/// Fetches `_delta_log/_last_checkpoint` from the store and deserializes it
/// as a JSON value.
async fn read_last_checkpoint_file(store: &Arc<InMemory>) -> DeltaResult<Value> {
    let bytes = store
        .get(&Path::from("_delta_log/_last_checkpoint"))
        .await?
        .bytes()
        .await?;
    Ok(from_slice(&bytes)?)
}
/// By default the checkpoint writer targets the snapshot's latest version.
/// Builds three commits, streams the checkpoint data, and validates the
/// resulting `_last_checkpoint` file.
#[tokio::test]
async fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: a single add.
    write_commit_to_store(
        &store,
        vec![create_add_action_with_stats("fake_path_1", 10)],
        0,
    )
    .await?;
    // Commit 1: adds a second file and removes the first.
    write_commit_to_store(
        &store,
        vec![
            create_add_action_with_stats("fake_path_2", 20),
            create_remove_action("fake_path_1"),
        ],
        1,
    )
    .await?;
    // Commit 2: table metadata and protocol.
    write_commit_to_store(
        &store,
        vec![create_metadata_action(), create_basic_protocol_action()],
        2,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    let writer = snapshot.create_checkpoint_writer()?;
    // No version was requested, so the checkpoint path is for version 2.
    assert_eq!(
        writer.checkpoint_path()?,
        Url::parse("memory:///_delta_log/00000000000000000002.checkpoint.parquet")?
    );
    let result = writer.checkpoint_data(&engine)?;
    let mut data_iter = result;
    // Two batches, each with both rows selected — 4 checkpoint actions total.
    let batch = data_iter.next().unwrap()?;
    assert_eq!(batch.selection_vector(), &[true, true]);
    let batch = data_iter.next().unwrap()?;
    assert_eq!(batch.selection_vector(), &[true, true]);
    assert!(data_iter.next().is_none());
    // Simulate the engine having written the checkpoint file, then finalize.
    let size_in_bytes = 10;
    let metadata = FileMeta {
        location: Url::parse("memory:///fake_path_2")?,
        last_modified: 0,
        size: size_in_bytes,
    };
    writer.finalize(&engine, &metadata, &data_iter.state())?;
    // _last_checkpoint should record: version 2, 4 actions, 1 live add file.
    assert_last_checkpoint_contents(&store, 2, 4, 1, size_in_bytes).await?;
    Ok(())
}
/// A checkpoint can target an explicitly requested (older) snapshot version;
/// actions from later commits must not leak into it.
#[tokio::test]
async fn test_v1_checkpoint_specific_version() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: protocol and metadata only.
    write_commit_to_store(
        &store,
        vec![create_basic_protocol_action(), create_metadata_action()],
        0,
    )
    .await?;
    // Commit 1: two adds — these are beyond the checkpointed version.
    write_commit_to_store(
        &store,
        vec![
            create_add_action_with_stats("file1.parquet", 100),
            create_add_action_with_stats("file2.parquet", 200),
        ],
        1,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    // Pin the snapshot (and therefore the checkpoint) to version 0.
    let snapshot = Snapshot::builder_for(table_root)
        .at_version(0)
        .build(&engine)?;
    let writer = snapshot.create_checkpoint_writer()?;
    assert_eq!(
        writer.checkpoint_path()?,
        Url::parse("memory:///_delta_log/00000000000000000000.checkpoint.parquet")?
    );
    let result = writer.checkpoint_data(&engine)?;
    let mut data_iter = result;
    // Only commit 0's two actions appear; commit 1 is excluded.
    let batch = data_iter.next().unwrap()?;
    assert_eq!(batch.selection_vector(), &[true, true]);
    assert!(data_iter.next().is_none());
    // Simulate the written checkpoint file, then finalize.
    let size_in_bytes = 10;
    let metadata = FileMeta {
        location: Url::parse("memory:///fake_path_2")?,
        last_modified: 0,
        size: size_in_bytes,
    };
    writer.finalize(&engine, &metadata, &data_iter.state())?;
    // _last_checkpoint: version 0, 2 actions, no add files.
    assert_last_checkpoint_contents(&store, 0, 2, 0, size_in_bytes).await?;
    Ok(())
}
/// `finalize` must fail if the checkpoint data iterator was never drained,
/// since the checkpoint data has not actually been written to storage.
#[tokio::test]
async fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    write_commit_to_store(
        &store,
        vec![create_basic_protocol_action(), create_metadata_action()],
        0,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    let snapshot = Snapshot::builder_for(table_root)
        .at_version(0)
        .build(&engine)?;
    let writer = snapshot.create_checkpoint_writer()?;
    // Deliberately do NOT consume the iterator before finalizing.
    let data_iter = writer.checkpoint_data(&engine)?;
    let size_in_bytes = 10;
    let metadata = FileMeta {
        location: Url::parse("memory:///fake_path_2")?,
        last_modified: 0,
        size: size_in_bytes,
    };
    let err = writer
        .finalize(&engine, &metadata, &data_iter.state())
        .expect_err("finalize should fail");
    assert!(
        err.to_string().contains("Error writing checkpoint: The checkpoint data iterator must be fully consumed and written to storage before calling finalize")
    );
    Ok(())
}
/// On a table with the v2Checkpoint feature, the checkpoint data stream
/// carries an extra checkpointMetadata batch after the log-action batches.
#[tokio::test]
async fn test_v2_checkpoint_supported_table() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: an add plus a remove tombstone.
    write_commit_to_store(
        &store,
        vec![
            create_add_action_with_stats("fake_path_2", 50),
            create_remove_action("fake_path_1"),
        ],
        0,
    )
    .await?;
    // Commit 1: metadata plus a protocol enabling v2Checkpoint.
    write_commit_to_store(
        &store,
        vec![
            create_metadata_action(),
            create_v2_checkpoint_protocol_action(),
        ],
        1,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    let writer = snapshot.create_checkpoint_writer()?;
    assert_eq!(
        writer.checkpoint_path()?,
        Url::parse("memory:///_delta_log/00000000000000000001.checkpoint.parquet")?
    );
    let result = writer.checkpoint_data(&engine)?;
    let mut data_iter = result;
    // Two log-action batches with all rows selected...
    let batch = data_iter.next().unwrap()?;
    assert_eq!(batch.selection_vector(), &[true, true]);
    let batch = data_iter.next().unwrap()?;
    assert_eq!(batch.selection_vector(), &[true, true]);
    // ...followed by the checkpointMetadata batch, which has an empty
    // selection vector yet still reports selected rows.
    let batch = data_iter.next().unwrap()?;
    assert_eq!(batch.selection_vector(), &[] as &[bool]);
    assert!(batch.has_selected_rows());
    assert!(data_iter.next().is_none());
    // Simulate the written checkpoint file, then finalize.
    let size_in_bytes = 10;
    let metadata = FileMeta {
        location: Url::parse("memory:///fake_path_2")?,
        last_modified: 0,
        size: size_in_bytes,
    };
    writer.finalize(&engine, &metadata, &data_iter.state())?;
    // 5 actions total (4 log actions + checkpointMetadata), 1 live add file.
    assert_last_checkpoint_contents(&store, 1, 5, 1, size_in_bytes).await?;
    Ok(())
}
/// A snapshot whose log tail contains a staged (unpublished) commit must not
/// be checkpointable: creating the checkpoint writer should fail.
#[tokio::test]
async fn test_no_checkpoint_on_unpublished_snapshot() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let engine = DefaultEngineBuilder::new(store.clone()).build();
    // Commit 0: a normal, published commit.
    write_commit_to_store(
        &store,
        vec![create_metadata_action(), create_basic_protocol_action()],
        0,
    )
    .await?;
    // Write version 1 as a staged commit (under `_staged_commits/`), i.e. not
    // yet published into the main log.
    let staged_commit_path = Path::from(
        "_delta_log/_staged_commits/00000000000000000001.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
    );
    let add_action = Action::Add(Add::default());
    store
        .put(
            &staged_commit_path,
            serde_json::to_string(&add_action).unwrap().into(),
        )
        .await
        .unwrap();
    let table_root = Url::parse("memory:///")?;
    let staged_commit = FileMeta {
        location: Url::parse("memory:///_delta_log/_staged_commits/00000000000000000001.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")?,
        last_modified: 0,
        size: 100,
    };
    // Build a snapshot whose tip is the staged commit via the log tail.
    let snapshot = Snapshot::builder_for(table_root.clone())
        .with_log_tail(vec![LogPath::try_new(staged_commit).unwrap()])
        .build(&engine)?;
    // Checkpoint creation must be rejected for unpublished log segments.
    assert!(matches!(
        snapshot.create_checkpoint_writer().unwrap_err(),
        crate::Error::Generic(e) if e == "Log segment is not published"
    ));
    Ok(())
}
fn create_add_action_with_stats(path: &str, num_records: i64) -> Action {
let stats = format!(
r#"{{"numRecords":{num_records},"minValues":{{"id":1,"name":"alice"}},"maxValues":{{"id":100,"name":"zoe"}},"nullCount":{{"id":0,"name":5}}}}"#
);
Action::Add(Add {
path: path.into(),
data_change: true,
stats: Some(stats),
..Default::default()
})
}
/// End-to-end: `Snapshot::checkpoint` writes the checkpoint parquet file and
/// the `_last_checkpoint` pointer, and incremental commits after the first
/// checkpoint are reflected in a second checkpoint.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_snapshot_checkpoint() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let executor = Arc::new(TokioMultiThreadExecutor::new(
        tokio::runtime::Handle::current(),
    ));
    let engine = DefaultEngineBuilder::new(store.clone())
        .with_task_executor(executor)
        .build();
    // Commit 0: protocol + metadata.
    write_commit_to_store(
        &store,
        vec![create_metadata_action(), create_basic_protocol_action()],
        0,
    )
    .await?;
    // Commits 1-4: a mix of adds and removes (files 1-3 end up removed).
    write_commit_to_store(
        &store,
        vec![
            create_add_action("file1.parquet"),
            create_add_action("file2.parquet"),
            create_add_action("file3.parquet"),
        ],
        1,
    )
    .await?;
    write_commit_to_store(
        &store,
        vec![
            create_add_action("file4.parquet"),
            create_add_action("file5.parquet"),
            create_remove_action("file1.parquet"),
        ],
        2,
    )
    .await?;
    write_commit_to_store(
        &store,
        vec![
            create_add_action("file6.parquet"),
            create_remove_action("file2.parquet"),
            create_remove_action("file3.parquet"),
        ],
        3,
    )
    .await?;
    write_commit_to_store(
        &store,
        vec![
            create_add_action("file7.parquet"),
            create_add_action("file8.parquet"),
        ],
        4,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    let snapshot = Snapshot::builder_for(table_root.clone()).build(&engine)?;
    // First checkpoint at version 4.
    snapshot.checkpoint(&engine)?;
    let checkpoint_path = Path::from("_delta_log/00000000000000000004.checkpoint.parquet");
    let checkpoint_size = store.head(&checkpoint_path).await?.size;
    // 10 actions = protocol + metadata + 5 live adds (files 4-8) + 3 removes.
    assert_last_checkpoint_contents(&store, 4, 10, 5, checkpoint_size).await?;
    // Commits 5-6 on top of the first checkpoint.
    write_commit_to_store(
        &store,
        vec![
            create_add_action("file9.parquet"),
            create_add_action("file10.parquet"),
            create_remove_action("file4.parquet"),
        ],
        5,
    )
    .await?;
    write_commit_to_store(&store, vec![create_add_action("file11.parquet")], 6).await?;
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    // Second checkpoint at version 6.
    snapshot.checkpoint(&engine)?;
    let checkpoint_path = Path::from("_delta_log/00000000000000000006.checkpoint.parquet");
    let checkpoint_size = store.head(&checkpoint_path).await?.size;
    // 13 actions = protocol + metadata + 7 live adds (files 5-11) + 4 removes.
    assert_last_checkpoint_contents(&store, 6, 13, 7, checkpoint_size).await?;
    Ok(())
}
/// Domain metadata committed before a checkpoint must survive the checkpoint:
/// reading the table at the checkpointed version should still return the
/// latest domain value.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_preserves_domain_metadata() -> DeltaResult<()> {
    // Use a real on-disk table because commits go through the transaction API.
    let tmp_dir = tempdir().unwrap();
    let table_path = tmp_dir.path();
    let table_url = Url::from_directory_path(table_path).unwrap();
    std::fs::create_dir_all(table_path.join("_delta_log")).unwrap();
    // Commit 0 is written by hand: protocol with the domainMetadata writer
    // feature, plus minimal table metadata.
    let commit0 = [
        json!({
            "protocol": {
                "minReaderVersion": 3,
                "minWriterVersion": 7,
                "readerFeatures": [],
                "writerFeatures": ["domainMetadata"]
            }
        }),
        json!({
            "metaData": {
                "id": "test-table-id",
                "format": { "provider": "parquet", "options": {} },
                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
                "partitionColumns": [],
                "configuration": {},
                "createdTime": 1587968585495i64
            }
        }),
    ]
    .map(|j| j.to_string())
    .join("\n");
    std::fs::write(
        table_path.join("_delta_log/00000000000000000000.json"),
        commit0,
    )
    .unwrap();
    let store = Arc::new(LocalFileSystem::new());
    let executor = Arc::new(TokioMultiThreadExecutor::new(
        tokio::runtime::Handle::current(),
    ));
    let engine = DefaultEngineBuilder::new(store.clone())
        .with_task_executor(executor)
        .build();
    // Helper: commit a domain-metadata (domain, value) pair via a transaction.
    let commit_domain_metadata = |domain: &str, value: &str| -> DeltaResult<()> {
        let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
        let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
        let result = txn
            .with_domain_metadata(domain.to_string(), value.to_string())
            .commit(&engine)?;
        assert!(result.is_committed());
        Ok(())
    };
    // Two commits for the same domain; the second value should win.
    commit_domain_metadata("foo", "bar1")?;
    commit_domain_metadata("foo", "bar2")?;
    let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
    assert_eq!(snapshot.version(), 2);
    let domain_value = snapshot.get_domain_metadata("foo", &engine)?;
    assert_eq!(domain_value, Some("bar2".to_string()));
    // Checkpoint, then re-read at the checkpointed version: the domain
    // metadata must still be resolvable (now from the checkpoint).
    snapshot.checkpoint(&engine)?;
    let snapshot = Snapshot::builder_for(table_url)
        .at_version(2)
        .build(&engine)?;
    let domain_value = snapshot.get_domain_metadata("foo", &engine)?;
    assert_eq!(domain_value, Some("bar2".to_string()));
    Ok(())
}
/// Convenience wrapper: metadata action with the given stats-writing table
/// properties and no partition columns.
fn create_metadata_with_stats_config(
    write_stats_as_json: bool,
    write_stats_as_struct: bool,
) -> Action {
    create_metadata_with_stats_config_and_partitions(
        write_stats_as_json,
        write_stats_as_struct,
        Vec::new(),
    )
}
/// Builds a metadata action for a three-column table (`id`, `name`,
/// `category`) with the `delta.checkpoint.writeStatsAsJson` and
/// `delta.checkpoint.writeStatsAsStruct` properties set from the arguments,
/// plus the given partition columns.
fn create_metadata_with_stats_config_and_partitions(
    write_stats_as_json: bool,
    write_stats_as_struct: bool,
    partition_columns: Vec<String>,
) -> Action {
    let mut config = HashMap::new();
    config.insert(
        "delta.checkpoint.writeStatsAsJson".to_string(),
        write_stats_as_json.to_string(),
    );
    config.insert(
        "delta.checkpoint.writeStatsAsStruct".to_string(),
        write_stats_as_struct.to_string(),
    );
    let schema = StructType::new_unchecked([
        StructField::nullable("id", KernelDataType::LONG),
        StructField::nullable("name", KernelDataType::STRING),
        StructField::nullable("category", KernelDataType::STRING),
    ]);
    let metadata = Metadata::try_new(
        Some("test-table".into()),
        None,
        schema.into(),
        partition_columns,
        0,
        config,
    )
    .unwrap();
    Action::Metadata(metadata)
}
/// Checks the stats-related fields of the checkpoint `add` struct, expecting
/// no `partitionValues_parsed` column.
fn verify_checkpoint_schema(
    schema: &Schema,
    expect_stats: bool,
    expect_stats_parsed: bool,
) -> DeltaResult<()> {
    verify_checkpoint_schema_with_partitions(schema, expect_stats, expect_stats_parsed, false)
}
/// Asserts the presence or absence of `stats`, `stats_parsed`, and
/// `partitionValues_parsed` inside the checkpoint schema's `add` struct field.
fn verify_checkpoint_schema_with_partitions(
    schema: &Schema,
    expect_stats: bool,
    expect_stats_parsed: bool,
    expect_partition_values_parsed: bool,
) -> DeltaResult<()> {
    let add_field = schema
        .field_with_name("add")
        .expect("schema should have 'add' field");
    match add_field.data_type() {
        DataType::Struct(add_fields) => {
            let contains = |name: &str| add_fields.iter().any(|f| f.name() == name);
            let has_stats = contains("stats");
            let has_stats_parsed = contains("stats_parsed");
            let has_pv_parsed = contains("partitionValues_parsed");
            assert_eq!(
                has_stats, expect_stats,
                "stats field: expected={expect_stats}, actual={has_stats}"
            );
            assert_eq!(
                has_stats_parsed, expect_stats_parsed,
                "stats_parsed field: expected={expect_stats_parsed}, actual={has_stats_parsed}"
            );
            assert_eq!(
                has_pv_parsed, expect_partition_values_parsed,
                "partitionValues_parsed field: expected={expect_partition_values_parsed}, actual={has_pv_parsed}"
            );
        }
        _ => panic!("add field should be a struct"),
    }
    Ok(())
}
/// Exercises every combination of `writeStatsAsJson`/`writeStatsAsStruct`
/// before (json1/struct1) and after (json2/struct2) an initial checkpoint,
/// and verifies a second checkpoint honors the *current* config.
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_stats_config_round_trip(
    #[values(true, false)] json1: bool,
    #[values(true, false)] struct1: bool,
    #[values(true, false)] json2: bool,
    #[values(true, false)] struct2: bool,
) -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let executor = Arc::new(TokioMultiThreadExecutor::new(
        tokio::runtime::Handle::current(),
    ));
    let engine = DefaultEngineBuilder::new(store.clone())
        .with_task_executor(executor)
        .build();
    let table_root = Url::parse("memory:///")?;
    // Commit 0: protocol + metadata with the first stats config.
    write_commit_to_store(
        &store,
        vec![
            create_basic_protocol_action(),
            create_metadata_with_stats_config(json1, struct1),
        ],
        0,
    )
    .await?;
    // Commit 1: an add carrying JSON stats.
    write_commit_to_store(
        &store,
        vec![create_add_action_with_stats("file1.parquet", 100)],
        1,
    )
    .await?;
    // First checkpoint, written under the first stats config.
    let snapshot1 = Snapshot::builder_for(table_root.clone()).build(&engine)?;
    snapshot1.checkpoint(&engine)?;
    // Commit 2: switch the table to the second stats config.
    write_commit_to_store(
        &store,
        vec![create_metadata_with_stats_config(json2, struct2)],
        2,
    )
    .await?;
    let snapshot2 = Snapshot::builder_for(table_root).build(&engine)?;
    let writer2 = snapshot2.create_checkpoint_writer()?;
    let mut result2 = writer2.checkpoint_data(&engine)?;
    // The second checkpoint's schema must follow the second config, no matter
    // what the first checkpoint was written with.
    let first_batch = result2.next().expect("should have at least one batch")?;
    let data = first_batch.apply_selection_vector()?;
    let record_batch = data.try_into_record_batch()?;
    verify_checkpoint_schema(&record_batch.schema(), json2, struct2)?;
    // Drain the remaining batches so the iterator completes cleanly.
    for batch in result2 {
        let _ = batch?;
    }
    Ok(())
}
/// Same config round trip as `test_stats_config_round_trip`, but on a table
/// partitioned by `category`: additionally checks `partitionValues_parsed`
/// presence and, when present, its contents.
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_stats_config_round_trip_partitioned(
    #[values(true, false)] json1: bool,
    #[values(true, false)] struct1: bool,
    #[values(true, false)] json2: bool,
    #[values(true, false)] struct2: bool,
) -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let executor = Arc::new(TokioMultiThreadExecutor::new(
        tokio::runtime::Handle::current(),
    ));
    let engine = DefaultEngineBuilder::new(store.clone())
        .with_task_executor(executor)
        .build();
    let table_root = Url::parse("memory:///")?;
    // Commit 0: protocol + metadata with the first config, partitioned by
    // `category`.
    write_commit_to_store(
        &store,
        vec![
            create_basic_protocol_action(),
            create_metadata_with_stats_config_and_partitions(
                json1,
                struct1,
                vec!["category".into()],
            ),
        ],
        0,
    )
    .await?;
    // Commit 1: a partitioned add with JSON stats and a partition value.
    let mut add = Add {
        path: "category=books/file1.parquet".into(),
        data_change: true,
        stats: Some(
            r#"{"numRecords":100,"minValues":{"id":1,"name":"alice"},"maxValues":{"id":100,"name":"zoe"},"nullCount":{"id":0,"name":5}}"#.into(),
        ),
        ..Default::default()
    };
    add.partition_values
        .insert("category".into(), "books".into());
    write_commit_to_store(&store, vec![Action::Add(add)], 1).await?;
    // First checkpoint under the first config.
    let snapshot1 = Snapshot::builder_for(table_root.clone()).build(&engine)?;
    snapshot1.checkpoint(&engine)?;
    // Commit 2: switch to the second config.
    write_commit_to_store(
        &store,
        vec![create_metadata_with_stats_config_and_partitions(
            json2,
            struct2,
            vec!["category".into()],
        )],
        2,
    )
    .await?;
    let snapshot2 = Snapshot::builder_for(table_root).build(&engine)?;
    let writer2 = snapshot2.create_checkpoint_writer()?;
    let result2 = writer2.checkpoint_data(&engine)?;
    // Materialize every batch of the second checkpoint's data.
    let mut all_batches = Vec::new();
    for batch_result in result2 {
        let batch = batch_result?;
        let data = batch.apply_selection_vector()?;
        all_batches.push(data.try_into_record_batch()?);
    }
    // `partitionValues_parsed` presence follows writeStatsAsStruct (struct2).
    verify_checkpoint_schema_with_partitions(
        &all_batches[0].schema(),
        json2,
        struct2,
        struct2,
    )?;
    if struct2 {
        // When parsed partition values are written, every non-null add row
        // must carry category="books".
        let mut found_add = false;
        for record_batch in &all_batches {
            let add_col = record_batch
                .column_by_name("add")
                .unwrap()
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap();
            for row in 0..record_batch.num_rows() {
                // Skip rows where the `add` struct itself is null (other
                // action types occupy those rows).
                if !add_col.is_valid(row) {
                    continue;
                }
                found_add = true;
                let pv_parsed = add_col
                    .column_by_name("partitionValues_parsed")
                    .unwrap()
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                let category_col = pv_parsed
                    .column_by_name("category")
                    .expect("partitionValues_parsed should have category field");
                assert_eq!(category_col.as_string::<i32>().value(row), "books");
            }
        }
        assert!(found_add, "should have found an add action");
    }
    Ok(())
}
/// Checkpointing must succeed even when a schema field carries the Spark
/// CHAR/VARCHAR type-string metadata (`__CHAR_VARCHAR_TYPE_STRING`); the test
/// checkpoints before and after that metadata is introduced.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_checkpoint_with_varchar_metadata_on_field() -> DeltaResult<()> {
    let (store, _) = new_in_memory_store();
    let executor = Arc::new(TokioMultiThreadExecutor::new(
        tokio::runtime::Handle::current(),
    ));
    let engine = DefaultEngineBuilder::new(store.clone())
        .with_task_executor(executor)
        .build();
    // Enable both stats-writing modes so the stats_parsed path is exercised.
    let config = HashMap::from([
        ("delta.checkpoint.writeStatsAsJson".into(), "true".into()),
        ("delta.checkpoint.writeStatsAsStruct".into(), "true".into()),
    ]);
    // Version 0 schema: plain `id`/`name`, no field metadata yet.
    let schema_v0 = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", KernelDataType::LONG),
        StructField::nullable("name", KernelDataType::STRING),
    ]));
    // Commit 0: protocol, metadata, and an add with stats on both columns.
    write_commit_to_store(
        &store,
        vec![
            create_basic_protocol_action(),
            Action::Metadata(
                Metadata::try_new(
                    Some("test".into()),
                    None,
                    schema_v0,
                    vec![],
                    0,
                    config.clone(),
                )
                .unwrap(),
            ),
            Action::Add(Add {
                path: "file1.parquet".into(),
                data_change: true,
                stats: Some(
                    r#"{"numRecords":10,"minValues":{"id":1,"name":"alice"},"maxValues":{"id":100,"name":"zoe"},"nullCount":{"id":0,"name":2}}"#.into(),
                ),
                ..Default::default()
            }),
        ],
        0,
    )
    .await?;
    let table_root = Url::parse("memory:///")?;
    // First checkpoint with the plain schema must succeed.
    Snapshot::builder_for(table_root.clone())
        .build(&engine)?
        .checkpoint(&engine)?;
    // Version 1 schema: `name` now carries the CHAR/VARCHAR type-string
    // metadata used by Spark for varchar columns.
    let schema_v1 = Arc::new(StructType::new_unchecked([
        StructField::nullable("id", KernelDataType::LONG),
        StructField::nullable("name", KernelDataType::STRING).with_metadata([(
            "__CHAR_VARCHAR_TYPE_STRING",
            crate::schema::MetadataValue::String("varchar(255)".to_string()),
        )]),
    ]));
    write_commit_to_store(
        &store,
        vec![Action::Metadata(
            Metadata::try_new(Some("test".into()), None, schema_v1, vec![], 0, config).unwrap(),
        )],
        1,
    )
    .await?;
    // Second checkpoint with the varchar-annotated schema must also succeed.
    Snapshot::builder_for(table_root)
        .build(&engine)?
        .checkpoint(&engine)?;
    Ok(())
}