use url::Url;
use super::Crc;
use crate::table_properties::ENABLE_IN_COMMIT_TIMESTAMPS;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error};
/// Serializes `crc` to JSON and writes it to `path` as a new CRC (checksum) file.
///
/// Before any I/O, the write is refused when the checksum would not be trustworthy:
/// - the file statistics are not in the complete state, or
/// - In-Commit Timestamps are enabled in the table configuration
///   (`delta.enableInCommitTimestamps` equal to `"true"`) but `in_commit_timestamp_opt` is
///   `None`.
///
/// # Errors
///
/// - [`Error::ChecksumWriteUnsupported`] if either precondition above does not hold.
/// - Any error produced while serializing `crc` to JSON.
/// - Any error returned by the storage handler's `put`. The put is issued with overwrite
///   disabled, so an existing file at `path` is not replaced (the tests expect
///   `Error::FileAlreadyExists` in that case).
pub(crate) fn try_write_crc_file(engine: &dyn Engine, path: &Url, crc: &Crc) -> DeltaResult<()> {
    // Only fully-known file statistics may be persisted.
    require!(
        crc.file_stats_state.is_complete(),
        Error::ChecksumWriteUnsupported(format!(
            "Cannot write CRC file with {:?} file stats",
            crc.file_stats_state
        ))
    );

    // When ICT is turned on for the table, the checksum must carry the commit timestamp.
    let config = crc.metadata.configuration();
    let ict_enabled = config
        .get(ENABLE_IN_COMMIT_TIMESTAMPS)
        .is_some_and(|flag| flag == "true");
    let ict_missing = ict_enabled && crc.in_commit_timestamp_opt.is_none();
    require!(
        !ict_missing,
        Error::ChecksumWriteUnsupported(
            "Cannot write CRC file: In-Commit Timestamps enabled but inCommitTimestampOpt is absent"
                .to_string()
        )
    );

    let payload = serde_json::to_vec(crc)?;
    let overwrite = false;
    engine
        .storage_handler()
        .put(path, payload.into(), overwrite)
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use rstest::rstest;

    use super::*;
    use crate::actions::{DomainMetadata, Metadata, Protocol, SetTransaction};
    use crate::crc::reader::try_read_crc_file;
    use crate::crc::{
        DomainMetadataState, FileSizeHistogram, FileStats, FileStatsState, SetTransactionState,
    };
    use crate::engine::sync::SyncEngine;
    use crate::object_store::memory::InMemory;
    use crate::path::{AsUrl, ParsedLogPath};
    use crate::table_features::TableFeature;

    /// Returns a `SyncEngine` backed by a fresh in-memory object store, together with the
    /// parsed path of the version-0 CRC file for a table rooted at `memory:///test_table/`.
    fn writer_test_env() -> (SyncEngine, ParsedLogPath) {
        let engine = SyncEngine::new_with_store(Arc::new(InMemory::new()));
        let table_root = Url::parse("memory:///test_table/").unwrap();
        let crc_path = ParsedLogPath::create_parsed_crc(&table_root, 0);
        (engine, crc_path)
    }

    /// Builds a CRC fixture with complete file stats (5 files totalling 1024 bytes), one
    /// `delta.rowTracking` domain metadata entry, and one set transaction.
    ///
    /// - `ict_supported`: adds the `inCommitTimestamp` writer feature to the protocol.
    /// - `ict_enabled`: sets `delta.enableInCommitTimestamps=true` in the metadata configuration.
    ///
    /// `in_commit_timestamp_opt` is always populated; tests clear it when they need it absent.
    ///
    /// # Panics
    ///
    /// Panics if `ict_enabled` is set without `ict_supported`, which would be an inconsistent
    /// fixture (a table property enabling a feature the protocol does not list).
    fn test_crc(ict_supported: bool, ict_enabled: bool) -> Crc {
        assert!(
            ict_supported || !ict_enabled,
            "test_crc: ICT cannot be enabled unless it is also supported"
        );
        let mut writer_features = vec![
            TableFeature::ColumnMapping,
            TableFeature::RowTracking,
            TableFeature::DomainMetadata,
        ];
        if ict_supported {
            writer_features.push(TableFeature::InCommitTimestamp);
        }
        let protocol =
            Protocol::try_new_modern([TableFeature::ColumnMapping], writer_features).unwrap();
        let metadata = if ict_enabled {
            Metadata::default().with_configuration_entry(ENABLE_IN_COMMIT_TIMESTAMPS, "true")
        } else {
            Metadata::default()
        };
        let domain_metadata = HashMap::from([(
            "delta.rowTracking".to_string(),
            DomainMetadata::new(
                "delta.rowTracking".to_string(),
                r#"{"rowIdHighWaterMark":1048576}"#.to_string(),
            ),
        )]);
        let ict = 1234567890;
        let app_id = "testAppId".to_string();
        let set_transactions =
            HashMap::from([(app_id.clone(), SetTransaction::new(app_id, 1, Some(ict)))]);
        // These five sizes sum to 1024, matching `num_files` and `table_size_bytes` below.
        let mut histogram = FileSizeHistogram::create_default();
        for size in [100, 200, 300, 150, 274] {
            histogram.insert(size).unwrap();
        }
        Crc {
            file_stats_state: FileStatsState::Complete(FileStats {
                num_files: 5,
                table_size_bytes: 1024,
                file_size_histogram: Some(histogram),
            }),
            protocol,
            metadata,
            txn_id: None,
            in_commit_timestamp_opt: Some(ict),
            set_transaction_state: SetTransactionState::Complete(set_transactions),
            domain_metadata_state: DomainMetadataState::Complete(domain_metadata),
            ..Default::default()
        }
    }

    /// JSON serialization followed by deserialization yields an equal `Crc`.
    #[test]
    fn test_serde_round_trip() {
        let crc = test_crc(/* ict_supported */ true, /* ict_enabled */ true);
        let json_bytes = serde_json::to_vec(&crc).unwrap();
        let round_tripped: Crc = serde_json::from_slice(&json_bytes).unwrap();
        assert_eq!(round_tripped, crc);
    }

    /// A CRC written through the engine reads back unchanged via `try_read_crc_file`.
    #[test]
    fn test_write_then_read_crc_file() {
        let (engine, crc_path) = writer_test_env();
        let crc = test_crc(/* ict_supported */ true, /* ict_enabled */ true);
        try_write_crc_file(&engine, crc_path.location.as_url(), &crc).unwrap();
        let read_back = try_read_crc_file(&engine, &crc_path).unwrap();
        assert_eq!(read_back, crc);
    }

    /// Pins the serialized JSON field names and values, and the histogram layout.
    #[test]
    fn test_crc_serialized_json_content() {
        let crc = test_crc(/* ict_supported */ true, /* ict_enabled */ true);
        let actual: serde_json::Value = serde_json::to_value(&crc).unwrap();
        let actual_obj = actual.as_object().unwrap();
        let expected_non_hist = serde_json::json!({
            "tableSizeBytes": 1024,
            "numFiles": 5,
            "numMetadata": 1,
            "numProtocol": 1,
            "metadata": {
                "id": "",
                "name": null,
                "description": null,
                "format": {
                    "provider": "parquet",
                    "options": {}
                },
                "schemaString": "",
                "partitionColumns": [],
                "createdTime": null,
                "configuration": {
                    "delta.enableInCommitTimestamps": "true"
                }
            },
            "protocol": {
                "minReaderVersion": 3,
                "minWriterVersion": 7,
                "readerFeatures": ["columnMapping"],
                "writerFeatures": [
                    "columnMapping",
                    "rowTracking",
                    "domainMetadata",
                    "inCommitTimestamp"
                ]
            },
            "inCommitTimestampOpt": 1234567890,
            "domainMetadata": [
                {
                    "domain": "delta.rowTracking",
                    "configuration": "{\"rowIdHighWaterMark\":1048576}",
                    "removed": false
                }
            ],
            "setTransactions": [
                {
                    "appId": "testAppId",
                    "version": 1,
                    "lastUpdated": 1234567890
                }
            ]
        });
        for (key, expected_val) in expected_non_hist.as_object().unwrap() {
            // Name the missing key instead of panicking on a bare `unwrap()` of `None`.
            let actual_val = actual_obj
                .get(key)
                .unwrap_or_else(|| panic!("Missing key in serialized CRC: {key}"));
            assert_eq!(actual_val, expected_val, "Mismatch for key: {key}");
        }

        let hist = actual_obj.get("fileSizeHistogram").unwrap();
        let boundaries = hist.get("sortedBinBoundaries").unwrap().as_array().unwrap();
        let counts = hist.get("fileCounts").unwrap().as_array().unwrap();
        let bytes = hist.get("totalBytes").unwrap().as_array().unwrap();
        assert_eq!(boundaries.len(), 95);
        assert_eq!(counts.len(), 95);
        assert_eq!(bytes.len(), 95);
        // All five files (1024 bytes total) land in the first bin...
        assert_eq!(counts[0].as_i64().unwrap(), 5);
        assert_eq!(bytes[0].as_i64().unwrap(), 1024);
        // ...so every remaining bin must be empty; anything else means misbinned or
        // double-counted entries.
        assert!(
            counts[1..].iter().all(|c| c.as_i64() == Some(0)),
            "unexpected non-zero file counts beyond the first bin: {counts:?}"
        );
        assert!(
            bytes[1..].iter().all(|b| b.as_i64() == Some(0)),
            "unexpected non-zero byte totals beyond the first bin: {bytes:?}"
        );
    }

    /// Writing the same CRC path twice fails because the writer never overwrites.
    #[test]
    fn test_write_crc_file_already_exists() {
        let (engine, crc_path) = writer_test_env();
        let crc = test_crc(/* ict_supported */ true, /* ict_enabled */ true);
        try_write_crc_file(&engine, crc_path.location.as_url(), &crc).unwrap();
        let result = try_write_crc_file(&engine, crc_path.location.as_url(), &crc);
        assert!(matches!(result, Err(Error::FileAlreadyExists(_))));
    }

    /// Non-complete (indeterminate) file stats are rejected before anything is written.
    #[test]
    fn test_write_rejects_indeterminate_file_stats_with_checksum_write_unsupported() {
        let (engine, crc_path) = writer_test_env();
        let mut crc = test_crc(/* ict_supported */ true, /* ict_enabled */ true);
        crc.file_stats_state = FileStatsState::Indeterminate;
        let result = try_write_crc_file(&engine, crc_path.location.as_url(), &crc);
        assert!(matches!(result, Err(Error::ChecksumWriteUnsupported(_))));
    }

    /// The write succeeds unless ICT is enabled while the ICT value is absent.
    ///
    /// Runs every (supported, enabled) case against both presence values of the timestamp.
    #[rstest]
    #[case::not_supported(false, false)]
    #[case::supported_not_enabled(true, false)]
    #[case::supported_and_enabled(true, true)]
    fn test_write_enforces_ict_enablement_value_consistency(
        #[case] ict_supported: bool,
        #[case] ict_enabled: bool,
        #[values(false, true)] ict_value_present: bool,
    ) {
        let (engine, crc_path) = writer_test_env();
        let mut crc = test_crc(ict_supported, ict_enabled);
        if !ict_value_present {
            crc.in_commit_timestamp_opt = None;
        }
        let should_succeed = !ict_enabled || ict_value_present;
        let result = try_write_crc_file(&engine, crc_path.location.as_url(), &crc);
        if should_succeed {
            result.unwrap();
        } else {
            let err = result.unwrap_err();
            assert!(
                matches!(err, Error::ChecksumWriteUnsupported(_)),
                "expected ChecksumWriteUnsupported, got: {err:?}"
            );
        }
    }
}