use std::path::PathBuf;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{ArrayRef, Int32Array};
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::crc::{Crc, FileStatsValidity};
use delta_kernel::engine::default::DefaultEngineBuilder;
use delta_kernel::object_store::local::LocalFileSystem;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::snapshot::{ChecksumWriteResult, Snapshot, SnapshotRef};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::transaction::data_layout::DataLayout;
use delta_kernel::{DeltaResult, Engine};
use rstest::rstest;
use test_utils::{add_commit, insert_data, test_table_setup};
/// Builds a snapshot over the checked-in `crc-full` fixture table and checks the file
/// statistics it reports: 10 files, 5259 bytes in total, and a file-size histogram.
#[tokio::test]
async fn test_get_file_stats_from_crc() -> DeltaResult<()> {
    let fixture_dir = PathBuf::from("./tests/data/crc-full/")
        .canonicalize()
        .unwrap();
    let table_url = url::Url::from_directory_path(fixture_dir).unwrap();
    let object_store = Arc::new(LocalFileSystem::new());
    let engine = DefaultEngineBuilder::new(object_store).build();

    let snapshot = Snapshot::builder_for(table_url).build(&engine)?;
    assert_eq!(snapshot.version(), 0);

    let stats = snapshot.get_or_load_file_stats(&engine).unwrap();
    assert_eq!(stats.num_files, 10);
    assert_eq!(stats.table_size_bytes, 5259);
    assert!(stats.file_size_histogram.is_some());
    Ok(())
}
/// A table created through `create_table`, for which this test never writes a checksum,
/// yields `None` from `get_or_load_file_stats` at version 0.
#[tokio::test]
async fn test_get_file_stats_no_crc() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let fields = vec![
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("value", DataType::STRING, true),
    ];
    let schema = Arc::new(StructType::try_new(fields)?);
    let _ = create_table(&table_path, schema, "Test/1.0")
        .build(engine_ref, Box::new(FileSystemCommitter::new()))?
        .commit(engine_ref)?;

    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let snapshot = Snapshot::builder_for(table_url).build(engine_ref)?;
    assert_eq!(snapshot.version(), 0);
    assert_eq!(snapshot.get_or_load_file_stats(engine_ref), None);
    Ok(())
}
/// Copies the `crc-full` fixture into a temp directory, confirms file stats are available at
/// version 0, then commits an empty transaction and checks that a fresh snapshot at version 1
/// reports no file stats.
#[tokio::test]
async fn test_get_file_stats_crc_not_at_snapshot_version() -> DeltaResult<()> {
    use test_utils::copy_directory;

    let (temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();
    let fixture_dir = PathBuf::from("./tests/data/crc-full/")
        .canonicalize()
        .unwrap();
    copy_directory(&fixture_dir, temp_dir.path()).unwrap();

    // Version 0 comes straight from the copied fixture.
    let snapshot_v0 = Snapshot::builder_for(table_path.clone()).build(engine_ref)?;
    assert_eq!(snapshot_v0.version(), 0);
    assert!(snapshot_v0.get_or_load_file_stats(engine_ref).is_some());

    // Advance the table to version 1 without writing a checksum for it.
    let txn = snapshot_v0.transaction(Box::new(FileSystemCommitter::new()), engine_ref)?;
    let _ = txn.commit(engine_ref)?;

    let snapshot_v1 = Snapshot::builder_for(table_path).build(engine_ref)?;
    assert_eq!(snapshot_v1.version(), 1);
    assert_eq!(snapshot_v1.get_or_load_file_stats(engine_ref), None);
    Ok(())
}
/// Checks the CRC a snapshot over the `crc-full` fixture has loaded: file stats, action
/// counts, protocol and metadata (compared against the snapshot's table configuration), and
/// the three expected domain-metadata entries.
#[tokio::test]
async fn test_get_current_crc_if_loaded_returns_loaded_crc() -> DeltaResult<()> {
    let fixture_dir = PathBuf::from("./tests/data/crc-full/")
        .canonicalize()
        .unwrap();
    let table_url = url::Url::from_directory_path(fixture_dir).unwrap();
    let object_store = Arc::new(LocalFileSystem::new());
    let engine = DefaultEngineBuilder::new(object_store).build();

    let snapshot = Snapshot::builder_for(table_url).build(&engine)?;
    assert_eq!(snapshot.version(), 0);

    let crc = snapshot.get_current_crc_if_loaded_for_testing().unwrap();

    let stats = crc.file_stats().unwrap();
    assert_eq!(stats.table_size_bytes, 5259);
    assert_eq!(stats.num_files, 10);

    assert_eq!(crc.num_metadata, 1);
    assert_eq!(crc.num_protocol, 1);
    let table_config = snapshot.table_configuration();
    assert_eq!(crc.protocol, *table_config.protocol());
    assert_eq!(crc.metadata, *table_config.metadata());

    let domains = crc.domain_metadata.as_ref().unwrap();
    assert_eq!(domains.len(), 3);
    assert!(domains.contains_key("delta.clustering"));
    assert!(domains.contains_key("delta.rowTracking"));
    assert!(domains.contains_key("myApp.metadata"));
    Ok(())
}
/// A fresh snapshot of a table that was only created via `create_table` (no checksum written
/// by this test) has no CRC loaded.
#[tokio::test]
async fn test_get_current_crc_if_loaded_returns_none_when_no_crc() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let fields = vec![StructField::nullable("id", DataType::INTEGER)];
    let schema = Arc::new(StructType::try_new(fields)?);
    let _ = create_table(&table_path, schema, "Test/1.0")
        .build(engine_ref, Box::new(FileSystemCommitter::new()))?
        .commit(engine_ref)?;

    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let snapshot = Snapshot::builder_for(table_url).build(engine_ref)?;
    assert_eq!(snapshot.version(), 0);
    assert!(snapshot.get_current_crc_if_loaded_for_testing().is_none());
    Ok(())
}
/// Creates a table at `table_path` and commits the create-table transaction.
///
/// The table has a single nullable integer column `id`, a data layout clustered on `id`, and
/// one domain-metadata entry (`zip` -> `zap0`) attached to the creating transaction.
///
/// Returns the committed transaction; panics (via `unwrap_committed`) if the commit result is
/// not a successful commit.
fn create_table_and_commit(
    table_path: &str,
    engine: &dyn delta_kernel::Engine,
) -> DeltaResult<delta_kernel::transaction::CommittedTransaction> {
    let fields = vec![StructField::nullable("id", DataType::INTEGER)];
    let schema = Arc::new(StructType::try_new(fields)?);

    let create_txn = create_table(table_path, schema, "test_engine")
        .with_data_layout(DataLayout::clustered(["id"]))
        .build(engine, Box::new(FileSystemCommitter::new()))?
        .with_domain_metadata("zip".to_string(), "zap0".to_string());

    let commit_result = create_txn.commit(engine)?;
    Ok(commit_result.unwrap_committed())
}
/// The post-commit snapshot of a create-table commit carries an in-memory CRC describing an
/// empty table (0 files, 0 bytes) with the table's protocol, metadata and the `zip` domain.
#[tokio::test]
async fn test_create_table_produces_post_commit_crc() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;

    let created = create_table_and_commit(&table_path, engine.as_ref())?;
    assert_eq!(created.commit_version(), 0);

    let post_commit = created.post_commit_snapshot().unwrap();
    let crc = post_commit.get_current_crc_if_loaded_for_testing().unwrap();

    let stats = crc.file_stats().unwrap();
    assert_eq!(stats.num_files, 0);
    assert_eq!(stats.table_size_bytes, 0);

    assert_eq!(crc.num_metadata, 1);
    assert_eq!(crc.num_protocol, 1);
    let table_config = post_commit.table_configuration();
    assert_eq!(crc.protocol, *table_config.protocol());
    assert_eq!(crc.metadata, *table_config.metadata());

    let domains = crc.domain_metadata.as_ref().unwrap();
    assert_eq!(domains["zip"].configuration(), "zap0");
    Ok(())
}
/// A commit's post-commit snapshot has an in-memory CRC exactly when the snapshot the
/// transaction was started from had one.
///
/// * `with_in_memory_crc`: starts from the create-table post-commit snapshot (CRC present).
/// * `without_crc`: starts from a snapshot freshly built from the table path (CRC absent).
#[rstest]
#[case::with_in_memory_crc(true)]
#[case::without_crc(false)]
#[tokio::test]
async fn test_post_commit_crc_chains_only_if_read_snapshot_has_crc(
    #[case] use_post_commit_snapshot: bool,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();
    let create_committed = create_table_and_commit(&table_path, engine_ref)?;

    let read_snapshot = if use_post_commit_snapshot {
        create_committed.post_commit_snapshot().unwrap().clone()
    } else {
        Snapshot::builder_for(table_path).build(engine_ref)?
    };
    let read_has_crc = read_snapshot
        .get_current_crc_if_loaded_for_testing()
        .is_some();
    assert_eq!(read_has_crc, use_post_commit_snapshot);

    let write_txn = read_snapshot
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("WRITE".to_string())
        .with_domain_metadata("zip".to_string(), "zap1".to_string());
    let committed = write_txn.commit(engine_ref)?.unwrap_committed();
    assert_eq!(committed.commit_version(), 1);

    let post_commit = committed.post_commit_snapshot().unwrap();
    let post_commit_has_crc = post_commit
        .get_current_crc_if_loaded_for_testing()
        .is_some();
    assert_eq!(post_commit_has_crc, use_post_commit_snapshot);
    Ok(())
}
/// Writes `snapshot`'s in-memory CRC to storage and checks that it round-trips.
///
/// Captures the CRC currently loaded on `snapshot`, calls `write_checksum`, builds a fresh
/// snapshot from `table_path`, and asserts that the CRC loaded on the fresh snapshot equals
/// the captured one. Returns a clone of the CRC read back from the fresh snapshot.
///
/// Panics if `snapshot` has no CRC loaded, if writing or rebuilding fails, or if the fresh
/// snapshot has no CRC loaded.
fn write_and_verify_crc(
    snapshot: &SnapshotRef,
    table_path: &str,
    engine: &dyn delta_kernel::Engine,
) -> Crc {
    let expected = snapshot.get_current_crc_if_loaded_for_testing().unwrap();
    snapshot.write_checksum(engine).unwrap();

    let reloaded_snapshot = Snapshot::builder_for(table_path).build(engine).unwrap();
    let reloaded = reloaded_snapshot
        .get_current_crc_if_loaded_for_testing()
        .unwrap();

    assert_eq!(expected, reloaded);
    reloaded.clone()
}
/// Follows the CRC file stats through two inserts and a remove-everything commit:
/// 1 file after the first insert, 2 files (and more bytes) after the second, and
/// 0 files / 0 bytes after all scanned files are removed. Each CRC is also written to storage
/// and compared with what a fresh snapshot loads.
#[tokio::test]
async fn test_post_commit_crc_tracks_file_stats_across_inserts() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let committed_v0 = create_table_and_commit(&table_path, engine_ref)?;
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap().clone();

    // v1: first insert.
    let first_batch: ArrayRef = Arc::new(Int32Array::from((1..=10).collect::<Vec<_>>()));
    let committed_v1 = insert_data(snapshot_v0, &engine, vec![first_batch])
        .await?
        .unwrap_committed();
    assert_eq!(committed_v1.commit_version(), 1);
    let snapshot_v1 = committed_v1.post_commit_snapshot().unwrap();
    let crc_v1 = write_and_verify_crc(snapshot_v1, &table_path, engine_ref);
    let stats_v1 = crc_v1.file_stats().unwrap();
    assert_eq!(stats_v1.num_files, 1);
    assert!(stats_v1.table_size_bytes > 0);

    // v2: second insert on top of v1.
    let second_batch: ArrayRef = Arc::new(Int32Array::from((11..=20).collect::<Vec<_>>()));
    let committed_v2 = insert_data(snapshot_v1.clone(), &engine, vec![second_batch])
        .await?
        .unwrap_committed();
    assert_eq!(committed_v2.commit_version(), 2);
    let snapshot_v2 = committed_v2.post_commit_snapshot().unwrap();
    let crc_v2 = write_and_verify_crc(snapshot_v2, &table_path, engine_ref);
    let stats_v2 = crc_v2.file_stats().unwrap();
    assert_eq!(stats_v2.num_files, 2);
    assert!(stats_v2.table_size_bytes > stats_v1.table_size_bytes);

    // v3: remove every file the v2 scan returns.
    let scan = snapshot_v2.clone().scan_builder().build()?;
    let mut delete_txn = snapshot_v2
        .clone()
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("DELETE".to_string())
        .with_data_change(true);
    for scan_metadata in scan.scan_metadata(engine_ref)? {
        delete_txn.remove_files(scan_metadata?.scan_files);
    }
    let committed_v3 = delete_txn.commit(engine_ref)?.unwrap_committed();
    assert_eq!(committed_v3.commit_version(), 3);
    let snapshot_v3 = committed_v3.post_commit_snapshot().unwrap();
    let crc_v3 = write_and_verify_crc(snapshot_v3, &table_path, engine_ref);
    let stats_v3 = crc_v3.file_stats().unwrap();
    assert_eq!(stats_v3.num_files, 0);
    assert_eq!(stats_v3.table_size_bytes, 0);
    Ok(())
}
/// Follows the CRC's domain metadata across commits: the initial `zip` entry, an update of
/// `zip` plus a new `foo` entry, and finally the removal of `zip` while `foo` stays.
#[tokio::test]
async fn test_post_commit_crc_tracks_domain_metadata_changes() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    // v0: the table is created with zip=zap0.
    let committed_v0 = create_table_and_commit(&table_path, engine_ref)?;
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap();
    let crc_v0 = write_and_verify_crc(snapshot_v0, &table_path, engine_ref);
    let domains_v0 = crc_v0.domain_metadata.as_ref().unwrap();
    assert_eq!(domains_v0["zip"].configuration(), "zap0");

    // v1: overwrite zip and add foo.
    let upsert_txn = snapshot_v0
        .clone()
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("WRITE".to_string())
        .with_domain_metadata("zip".to_string(), "zap1".to_string())
        .with_domain_metadata("foo".to_string(), "bar".to_string());
    let committed_v1 = upsert_txn.commit(engine_ref)?.unwrap_committed();
    let snapshot_v1 = committed_v1.post_commit_snapshot().unwrap();
    let crc_v1 = write_and_verify_crc(snapshot_v1, &table_path, engine_ref);
    let domains_v1 = crc_v1.domain_metadata.as_ref().unwrap();
    assert_eq!(domains_v1["zip"].configuration(), "zap1");
    assert_eq!(domains_v1["foo"].configuration(), "bar");

    // v2: remove zip; foo must survive.
    let removal_txn = snapshot_v1
        .clone()
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("WRITE".to_string())
        .with_domain_metadata_removed("zip".to_string());
    let committed_v2 = removal_txn.commit(engine_ref)?.unwrap_committed();
    let snapshot_v2 = committed_v2.post_commit_snapshot().unwrap();
    let crc_v2 = write_and_verify_crc(snapshot_v2, &table_path, engine_ref);
    let domains_v2 = crc_v2.domain_metadata.as_ref().unwrap();
    assert!(!domains_v2.contains_key("zip"));
    assert_eq!(domains_v2["foo"].configuration(), "bar");
    Ok(())
}
/// After an insert, committing a transaction whose operation is `ANALYZE STATS` produces a
/// post-commit CRC whose `file_stats_validity` is `FileStatsValidity::Indeterminate`.
#[tokio::test]
async fn test_post_commit_crc_non_incremental_op_makes_file_stats_indeterminate() -> DeltaResult<()>
{
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let committed_v0 = create_table_and_commit(&table_path, engine_ref)?;
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap().clone();

    let rows: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let committed_v1 = insert_data(snapshot_v0, &engine, vec![rows])
        .await?
        .unwrap_committed();
    let snapshot_v1 = committed_v1.post_commit_snapshot().unwrap();

    let analyze_txn = snapshot_v1
        .clone()
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("ANALYZE STATS".to_string());
    let committed_v2 = analyze_txn.commit(engine_ref)?.unwrap_committed();
    assert_eq!(committed_v2.commit_version(), 2);

    let snapshot_v2 = committed_v2.post_commit_snapshot().unwrap();
    let crc_v2 = snapshot_v2.get_current_crc_if_loaded_for_testing().unwrap();
    assert_eq!(crc_v2.file_stats_validity, FileStatsValidity::Indeterminate);
    Ok(())
}
/// Writing the checksum from a create-table post-commit snapshot reports `Written`, and a
/// snapshot built afterwards has a CRC loaded.
#[tokio::test]
async fn test_write_checksum_success_simple() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let created = create_table_and_commit(&table_path, engine_ref)?;
    let post_commit = created.post_commit_snapshot().unwrap();

    let (write_result, _updated_snapshot) = post_commit.write_checksum(engine_ref)?;
    assert_eq!(write_result, ChecksumWriteResult::Written);

    let reloaded = Snapshot::builder_for(&table_path).build(engine_ref)?;
    assert!(reloaded.get_current_crc_if_loaded_for_testing().is_some());
    Ok(())
}
/// A second `write_checksum` for the same version reports `AlreadyExists`.
///
/// * `same_snapshot`: the second write goes through the snapshot returned by the first write.
/// * `fresh_snapshot`: the second write goes through a snapshot rebuilt from the table path.
#[rstest]
#[case::same_snapshot(false)]
#[case::fresh_snapshot(true)]
#[tokio::test]
async fn test_write_checksum_double_write_returns_already_exists(
    #[case] reload_snapshot: bool,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let created = create_table_and_commit(&table_path, engine_ref)?;
    let post_commit = created.post_commit_snapshot().unwrap();

    let (first_write, updated) = post_commit.write_checksum(engine_ref)?;
    assert_eq!(first_write, ChecksumWriteResult::Written);

    let second_write = if reload_snapshot {
        let reloaded = Snapshot::builder_for(&table_path).build(engine_ref)?;
        reloaded.write_checksum(engine_ref)?.0
    } else {
        updated.write_checksum(engine_ref)?.0
    };
    assert_eq!(second_write, ChecksumWriteResult::AlreadyExists);
    Ok(())
}
/// `write_checksum` returns an error when called on a snapshot rebuilt from the table path
/// after table creation, with no checksum having been written by this test.
#[tokio::test]
async fn test_write_checksum_with_no_in_memory_crc_returns_error() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();
    let _ = create_table_and_commit(&table_path, engine_ref)?;

    let reloaded = Snapshot::builder_for(&table_path).build(engine_ref)?;
    let write_attempt = reloaded.write_checksum(engine_ref);
    assert!(write_attempt.is_err());
    Ok(())
}
/// Performs five single-row inserts, each starting from the previous post-commit snapshot,
/// checking after every commit that an in-memory CRC is still present. The CRC at version 5
/// is then written, verified against storage, and expected to report 5 files.
#[tokio::test]
async fn test_in_memory_crc_chains_across_multiple_commits_then_writes() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let created = create_table_and_commit(&table_path, engine.as_ref())?;

    let mut latest = created.post_commit_snapshot().unwrap().clone();
    assert!(latest.get_current_crc_if_loaded_for_testing().is_some());

    for value in 0..5 {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![value]));
        let committed = insert_data(latest, &engine, vec![column])
            .await?
            .unwrap_committed();
        latest = committed.post_commit_snapshot().unwrap().clone();
        assert!(
            latest.get_current_crc_if_loaded_for_testing().is_some(),
            "in-memory CRC lost at commit {}",
            committed.commit_version()
        );
    }
    assert_eq!(latest.version(), 5);

    let final_crc = write_and_verify_crc(&latest, &table_path, engine.as_ref());
    let final_stats = final_crc.file_stats().unwrap();
    assert_eq!(final_stats.num_files, 5);
    assert!(final_stats.table_size_bytes > 0);
    Ok(())
}
/// With checksums written for both v0 and v1, a snapshot built incrementally from a fresh v0
/// snapshot lands on v1 with the v1 CRC loaded, and a commit made from it still yields a
/// post-commit snapshot with an in-memory CRC.
#[tokio::test]
async fn test_incremental_snapshot_preserves_loaded_crc() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    // v0 with its checksum on disk.
    let committed_v0 = create_table_and_commit(&table_path, engine_ref)?;
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap();
    snapshot_v0.write_checksum(engine_ref)?;

    // v1 with its checksum on disk.
    let first_rows: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let committed_v1 = insert_data(snapshot_v0.clone(), &engine, vec![first_rows])
        .await?
        .unwrap_committed();
    let snapshot_v1 = committed_v1.post_commit_snapshot().unwrap();
    snapshot_v1.write_checksum(engine_ref)?;

    // Rebuild v0 from storage, then advance it incrementally.
    let fresh_v0 = Snapshot::builder_for(&table_path)
        .at_version(0)
        .build(engine_ref)?;
    assert_eq!(fresh_v0.version(), 0);

    let incremental_v1 = Snapshot::builder_from(fresh_v0).build(engine_ref)?;
    assert_eq!(incremental_v1.version(), 1);
    assert_eq!(incremental_v1.crc_version_for_testing(), Some(1));
    assert!(
        incremental_v1
            .get_current_crc_if_loaded_for_testing()
            .is_some(),
        "CRC should be loaded at v1 after incremental snapshot update"
    );

    // A commit from the incremental snapshot should keep the CRC chain going.
    let second_rows: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let committed_v2 = insert_data(incremental_v1, &engine, vec![second_rows])
        .await?
        .unwrap_committed();
    assert_eq!(committed_v2.commit_version(), 2);

    let snapshot_v2 = committed_v2.post_commit_snapshot().unwrap();
    assert!(
        snapshot_v2
            .get_current_crc_if_loaded_for_testing()
            .is_some(),
        "Post-commit CRC should chain from incremental snapshot's CRC"
    );
    Ok(())
}
/// With a checksum written for v0 only, a snapshot advanced incrementally from a fresh v0
/// snapshot to v1 must not report a CRC as loaded for v1.
#[tokio::test]
async fn test_incremental_snapshot_old_crc_no_new_crc() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    // v0 gets a checksum on disk.
    let committed_v0 = create_table_and_commit(&table_path, engine_ref)?;
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap();
    snapshot_v0.write_checksum(engine_ref)?;

    // v1 is committed, but this test writes no checksum for it.
    let rows: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let committed_v1 = insert_data(snapshot_v0.clone(), &engine, vec![rows])
        .await?
        .unwrap_committed();
    assert_eq!(committed_v1.commit_version(), 1);

    let fresh_v0 = Snapshot::builder_for(&table_path)
        .at_version(0)
        .build(engine_ref)?;
    assert!(
        fresh_v0.get_current_crc_if_loaded_for_testing().is_some(),
        "Fresh v0 snapshot should have CRC loaded from 0.crc"
    );

    let incremental_v1 = Snapshot::builder_from(fresh_v0).build(engine_ref)?;
    assert_eq!(incremental_v1.version(), 1);
    assert!(
        incremental_v1
            .get_current_crc_if_loaded_for_testing()
            .is_none(),
        "CRC at v0 should not be reported as loaded at v1 (version mismatch)"
    );
    Ok(())
}
/// For a table with no domain metadata, the written CRC's `domain_metadata` is
/// `Some(<empty>)` rather than `None`, both with and without the
/// `delta.feature.domainMetadata` table property set to `supported`.
#[rstest]
#[case::dm_feature_supported(true)]
#[case::dm_feature_not_supported(false)]
#[tokio::test]
async fn test_write_checksum_with_no_dms_writes_empty_list(
    #[case] dm_supported: bool,
) -> DeltaResult<()> {
    use std::collections::HashMap;

    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let fields = vec![StructField::nullable("id", DataType::INTEGER)];
    let schema = Arc::new(StructType::try_new(fields)?);

    let base_builder = create_table(&table_path, schema, "test_engine");
    let builder = if dm_supported {
        let properties = HashMap::from([(
            "delta.feature.domainMetadata".to_string(),
            "supported".to_string(),
        )]);
        base_builder.with_table_properties(properties)
    } else {
        base_builder
    };

    let created = builder
        .build(engine_ref, Box::new(FileSystemCommitter::new()))?
        .commit(engine_ref)?
        .unwrap_committed();
    let post_commit = created.post_commit_snapshot().unwrap();
    assert!(post_commit.get_all_domain_metadata(engine_ref)?.is_empty());

    let crc = write_and_verify_crc(post_commit, &table_path, engine_ref);
    assert_eq!(crc.domain_metadata, Some(Default::default()));
    Ok(())
}
/// An [`Engine`] whose every handler accessor panics.
///
/// Tests hand it to snapshot queries that are expected to complete without requesting any
/// engine handler. If such a query does request one, the test fails with a panic message
/// naming the handler that was asked for.
struct FailingEngine;

impl Engine for FailingEngine {
    fn evaluation_handler(&self) -> Arc<dyn delta_kernel::EvaluationHandler> {
        unimplemented!("FailingEngine::evaluation_handler was requested; no engine access was expected")
    }

    fn storage_handler(&self) -> Arc<dyn delta_kernel::StorageHandler> {
        unimplemented!("FailingEngine::storage_handler was requested; no engine access was expected")
    }

    fn json_handler(&self) -> Arc<dyn delta_kernel::JsonHandler> {
        unimplemented!("FailingEngine::json_handler was requested; no engine access was expected")
    }

    fn parquet_handler(&self) -> Arc<dyn delta_kernel::ParquetHandler> {
        unimplemented!("FailingEngine::parquet_handler was requested; no engine access was expected")
    }
}
/// Domain-metadata lookups succeed with `FailingEngine` (i.e. without requesting any engine
/// handler) whenever the snapshot has a CRC loaded, whether the in-memory post-commit CRC or
/// one read back after `write_checksum`. A fresh snapshot without a CRC returns the same
/// answers when given the real engine.
#[tokio::test]
async fn test_get_domain_metadata_with_crc_skips_log_replay() -> DeltaResult<()> {
    /// Asserts the expected domain metadata: `zip` -> `zap1`, `foo` -> `bar`, a
    /// `delta.clustering` entry, and three entries in total.
    fn assert_domain_metadata(snapshot: &Snapshot, engine: &dyn delta_kernel::Engine) {
        assert_eq!(
            snapshot.get_domain_metadata("zip", engine).unwrap(),
            Some("zap1".to_string())
        );
        assert_eq!(
            snapshot.get_domain_metadata("foo", engine).unwrap(),
            Some("bar".to_string())
        );
        assert!(snapshot
            .get_domain_metadata_internal("delta.clustering", engine)
            .unwrap()
            .is_some());
        assert_eq!(
            snapshot
                .get_domain_metadatas_internal(engine, None)
                .unwrap()
                .len(),
            3
        );
    }

    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let committed_v0 = create_table_and_commit(&table_path, engine_ref)?;
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap();
    let write_txn = snapshot_v0
        .clone()
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("WRITE".to_string())
        .with_domain_metadata("zip".to_string(), "zap1".to_string())
        .with_domain_metadata("foo".to_string(), "bar".to_string());
    let committed_v1 = write_txn.commit(engine_ref)?.unwrap_committed();

    // In-memory CRC on the post-commit snapshot: no engine handler may be touched.
    let post_commit_snapshot = committed_v1.post_commit_snapshot().unwrap();
    assert!(post_commit_snapshot
        .get_current_crc_if_loaded_for_testing()
        .is_some());
    assert_domain_metadata(post_commit_snapshot, &FailingEngine);

    // No CRC available: the lookups are given the real engine.
    let fresh_snapshot_no_crc = Snapshot::builder_for(&table_path).build(engine_ref)?;
    assert!(fresh_snapshot_no_crc
        .get_current_crc_if_loaded_for_testing()
        .is_none());
    assert_domain_metadata(&fresh_snapshot_no_crc, engine_ref);

    // CRC written and reloaded: again no engine handler may be touched.
    let _ = post_commit_snapshot.write_checksum(engine_ref)?;
    let fresh_snapshot_with_crc = Snapshot::builder_for(&table_path).build(engine_ref)?;
    assert!(fresh_snapshot_with_crc
        .get_current_crc_if_loaded_for_testing()
        .is_some());
    assert_domain_metadata(&fresh_snapshot_with_crc, &FailingEngine);
    Ok(())
}
#[tokio::test]
async fn test_set_transaction_crc_tracking_and_fast_path() -> DeltaResult<()> {
let (_temp_dir, table_path, engine) = test_table_setup()?;
let committed = create_table_and_commit(&table_path, engine.as_ref())?;
let snapshot_v0 = committed.post_commit_snapshot().unwrap();
let crc_v0 = write_and_verify_crc(snapshot_v0, &table_path, engine.as_ref());
assert_eq!(crc_v0.set_transactions, Some(Default::default()));
let fresh_v0 = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
assert!(fresh_v0.get_current_crc_if_loaded_for_testing().is_some());
assert_eq!(
fresh_v0
.get_app_id_version("my-app", &FailingEngine)
.unwrap(),
None
);
let committed = snapshot_v0
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_operation("WRITE".to_string())
.with_transaction_id("my-app".to_string(), 1)
.commit(engine.as_ref())?
.unwrap_committed();
let snapshot_v1 = committed.post_commit_snapshot().unwrap();
assert_eq!(
snapshot_v1
.get_app_id_version("my-app", &FailingEngine)
.unwrap(),
Some(1)
);
assert_eq!(
snapshot_v1
.get_app_id_version("nonexistent", &FailingEngine)
.unwrap(),
None
);
let crc_v1 = write_and_verify_crc(snapshot_v1, &table_path, engine.as_ref());
let txns_v1 = crc_v1.set_transactions.as_ref().unwrap();
assert_eq!(txns_v1.len(), 1);
assert!(txns_v1.contains_key("my-app"));
let fresh_v1 = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
assert!(fresh_v1.get_current_crc_if_loaded_for_testing().is_some());
assert_eq!(
fresh_v1
.get_app_id_version("my-app", &FailingEngine)
.unwrap(),
Some(1)
);
assert_eq!(
fresh_v1
.get_app_id_version("nonexistent", &FailingEngine)
.unwrap(),
None
);
let committed = snapshot_v1
.clone()
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_operation("WRITE".to_string())
.with_transaction_id("my-app".to_string(), 2)
.with_transaction_id("other-app".to_string(), 1)
.commit(engine.as_ref())?
.unwrap_committed();
let snapshot_v2 = committed.post_commit_snapshot().unwrap();
assert_eq!(
snapshot_v2
.get_app_id_version("my-app", &FailingEngine)
.unwrap(),
Some(2)
);
assert_eq!(
snapshot_v2
.get_app_id_version("other-app", &FailingEngine)
.unwrap(),
Some(1)
);
let crc_v2 = write_and_verify_crc(snapshot_v2, &table_path, engine.as_ref());
let txns_v2 = crc_v2.set_transactions.as_ref().unwrap();
assert_eq!(txns_v2.len(), 2);
let fresh_v2 = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
assert!(fresh_v2.get_current_crc_if_loaded_for_testing().is_some());
assert_eq!(
fresh_v2
.get_app_id_version("my-app", &FailingEngine)
.unwrap(),
Some(2)
);
assert_eq!(
fresh_v2
.get_app_id_version("other-app", &FailingEngine)
.unwrap(),
Some(1)
);
Ok(())
}
/// Checks how `delta.setTransactionRetentionDuration` affects `get_app_id_version` when the
/// snapshot has a CRC loaded and the lookup is made with `FailingEngine`.
///
/// * `interval 0 seconds`: the transaction recorded at v1 is reported as `None`.
/// * `interval 365 days`: the transaction is reported as `Some(1)`.
/// * property not set: the transaction is reported as `Some(1)`.
#[rstest]
#[case::zero_retention_expires(Some("interval 0 seconds"), None)]
#[case::large_retention_not_expired(Some("interval 365 days"), Some(1))]
#[case::no_retention_no_filtering(None, Some(1))]
#[tokio::test]
async fn test_set_txn_expiration_via_crc_fast_path(
    #[case] retention: Option<&str>,
    #[case] expected: Option<i64>,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let fields = vec![StructField::nullable("id", DataType::INTEGER)];
    let schema = Arc::new(StructType::try_new(fields)?);

    let base_builder = create_table(&table_path, schema, "test_engine");
    let builder = match retention {
        Some(r) => base_builder
            .with_table_properties([("delta.setTransactionRetentionDuration", r)]),
        None => base_builder,
    };
    let committed_v0 = builder
        .build(engine_ref, Box::new(FileSystemCommitter::new()))?
        .commit(engine_ref)?
        .unwrap_committed();
    let snapshot_v0 = committed_v0.post_commit_snapshot().unwrap().clone();

    // v1 records my-app -> 1, then its checksum is written to storage.
    let committed_v1 = snapshot_v0
        .transaction(Box::new(FileSystemCommitter::new()), engine_ref)?
        .with_operation("WRITE".to_string())
        .with_transaction_id("my-app".to_string(), 1)
        .commit(engine_ref)?
        .unwrap_committed();
    let snapshot_v1 = committed_v1.post_commit_snapshot().unwrap();
    snapshot_v1.write_checksum(engine_ref)?;

    let reloaded = Snapshot::builder_for(&table_path).build(engine_ref)?;
    assert_eq!(reloaded.version(), 1);
    assert!(reloaded.get_current_crc_if_loaded_for_testing().is_some());

    let observed = reloaded
        .get_app_id_version("my-app", &FailingEngine)
        .unwrap();
    assert_eq!(observed, expected);
    Ok(())
}
/// A `txn` action written without a `lastUpdated` field is still returned by
/// `get_app_id_version` even though the table's set-transaction retention is
/// `interval 0 seconds`. The action is appended as a raw commit at version 1 and the lookup
/// uses the real engine.
#[tokio::test]
async fn test_set_txn_null_last_updated_never_expires_via_log_replay() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let engine_ref = engine.as_ref();

    let fields = vec![StructField::nullable("id", DataType::INTEGER)];
    let schema = Arc::new(StructType::try_new(fields)?);
    create_table(&table_path, schema, "test_engine")
        .with_table_properties([(
            "delta.setTransactionRetentionDuration",
            "interval 0 seconds",
        )])
        .build(engine_ref, Box::new(FileSystemCommitter::new()))?
        .commit(engine_ref)?
        .unwrap_committed();

    // Hand-write commit 1 containing only a txn action with no lastUpdated.
    let object_store = Arc::new(LocalFileSystem::new());
    let txn_action = r#"{"txn":{"appId":"null-app","version":42}}"#.to_string();
    add_commit(&table_path, object_store.as_ref(), 1, txn_action)
        .await
        .unwrap();

    let reloaded = Snapshot::builder_for(&table_path).build(engine_ref)?;
    assert_eq!(reloaded.version(), 1);
    assert_eq!(
        reloaded.get_app_id_version("null-app", engine_ref)?,
        Some(42)
    );
    Ok(())
}