#![allow(unreachable_pub)]
mod delta;
mod file_size_histogram;
mod file_stats;
mod lazy;
mod reader;
mod state;
mod writer;
use std::collections::HashMap;
#[allow(unused)]
pub(crate) use delta::CrcDelta;
pub use file_size_histogram::FileSizeHistogram;
pub use file_stats::FileStats;
#[allow(unused)]
pub(crate) use file_stats::{is_incremental_safe_operation, FileStatsDelta};
pub(crate) use lazy::{CrcLoadResult, LazyCrc};
pub(crate) use reader::try_read_crc_file;
use serde::de::Deserializer;
use serde::{Deserialize, Serialize};
pub use state::{DomainMetadataState, FileStatsState, SetTransactionState};
#[allow(unused)]
pub(crate) use writer::try_write_crc_file;
use crate::actions::{Add, DomainMetadata, Metadata, Protocol, SetTransaction};
use crate::Error;
/// In-memory form of a CRC (checksum) file.
///
/// Deserialization goes through [`CrcRaw`] (`#[serde(try_from = "CrcRaw")]`), which rejects
/// files whose `numMetadata` / `numProtocol` is not 1. Serialization goes through the manual
/// `Serialize` impl, which converts back to `CrcRaw` and fails unless `file_stats_state` is
/// `FileStatsState::Complete`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
#[serde(try_from = "CrcRaw")]
pub struct Crc {
/// `Metadata` action recorded in the checksum (JSON field `metadata`).
pub metadata: Metadata,
/// `Protocol` action recorded in the checksum (JSON field `protocol`).
pub protocol: Protocol,
/// File count / total size / optional size histogram. Always `Complete` after
/// deserialization; any other state makes serialization fail.
pub(crate) file_stats_state: FileStatsState,
/// JSON field `inCommitTimestampOpt`; omitted from output when `None`.
pub in_commit_timestamp_opt: Option<i64>,
/// `Complete` (keyed by `app_id`) when the JSON had a `setTransactions` array; an empty
/// `Partial` when that field was missing or `null`. A `Partial` state is written as `null`.
pub set_transaction_state: SetTransactionState,
/// `Complete` (keyed by domain name) when the JSON had a `domainMetadata` array; an empty
/// `Partial` when that field was missing or `null`. A `Partial` state is written as `null`.
pub domain_metadata_state: DomainMetadataState,
// The remaining fields are not part of the JSON handled in this module: they are set to
// `None` on deserialization and are not emitted on serialization.
// NOTE(review): their producers/consumers live outside this file — confirm semantics there.
pub(crate) txn_id: Option<String>,
pub(crate) all_files: Option<Vec<Add>>,
pub(crate) num_deleted_records_opt: Option<i64>,
pub(crate) num_deletion_vectors_opt: Option<i64>,
pub(crate) deleted_record_counts_histogram_opt: Option<DeletedRecordCountsHistogram>,
}
impl Crc {
pub fn file_stats(&self) -> Option<&FileStats> {
self.file_stats_state.file_stats()
}
#[cfg(any(test, feature = "test-utils"))]
pub fn file_stats_state(&self) -> &FileStatsState {
&self.file_stats_state
}
}
impl Serialize for Crc {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
CrcRaw::try_from(self)
.map_err(serde::ser::Error::custom)?
.serialize(serializer)
}
}
/// Wire (JSON) shape of a CRC file, with camelCase field names.
///
/// Intermediate form in both directions: `Crc` is built from it via `TryFrom<CrcRaw>` and
/// converted into it via `TryFrom<&Crc>`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct CrcRaw {
/// `tableSizeBytes`.
table_size_bytes: i64,
/// `numFiles`.
num_files: i64,
/// `numMetadata`; must be 1 when reading and is always written as 1.
num_metadata: i64,
/// `numProtocol`; must be 1 when reading and is always written as 1.
num_protocol: i64,
metadata: Metadata,
protocol: Protocol,
/// `inCommitTimestampOpt`; `None` when missing, omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
in_commit_timestamp_opt: Option<i64>,
/// `setTransactions`; `None` when missing or `null`. Always written (as `null` when `None`).
#[serde(default)]
set_transactions: Option<Vec<SetTransaction>>,
/// `domainMetadata`; `None` when missing or `null`. Always written (as `null` when `None`).
#[serde(default)]
domain_metadata: Option<Vec<DomainMetadata>>,
/// Written as `fileSizeHistogram`; the legacy name `histogramOpt` is also accepted on read.
/// Validated on read by `de_validated_file_size_histogram`; omitted from output when `None`.
#[serde(
default,
alias = "histogramOpt",
deserialize_with = "de_validated_file_size_histogram",
skip_serializing_if = "Option::is_none"
)]
file_size_histogram: Option<FileSizeHistogram>,
}
impl TryFrom<CrcRaw> for Crc {
type Error = Error;
fn try_from(raw: CrcRaw) -> Result<Self, Self::Error> {
for (name, value) in [
("numMetadata", raw.num_metadata),
("numProtocol", raw.num_protocol),
] {
if value != 1 {
return Err(Error::generic(format!(
"CRC file has invalid {name}: expected 1, got {value}"
)));
}
}
let file_stats_state = FileStatsState::Complete(FileStats {
num_files: raw.num_files,
table_size_bytes: raw.table_size_bytes,
file_size_histogram: raw.file_size_histogram,
});
Ok(Crc {
metadata: raw.metadata,
protocol: raw.protocol,
file_stats_state,
in_commit_timestamp_opt: raw.in_commit_timestamp_opt,
set_transaction_state: match raw.set_transactions {
Some(v) => SetTransactionState::Complete(
v.into_iter().map(|t| (t.app_id.clone(), t)).collect(),
),
None => SetTransactionState::Partial(HashMap::new()),
},
domain_metadata_state: match raw.domain_metadata {
Some(v) => DomainMetadataState::Complete(
v.into_iter().map(|d| (d.domain().to_string(), d)).collect(),
),
None => DomainMetadataState::Partial(HashMap::new()),
},
txn_id: None,
all_files: None,
num_deleted_records_opt: None,
num_deletion_vectors_opt: None,
deleted_record_counts_histogram_opt: None,
})
}
}
impl TryFrom<&Crc> for CrcRaw {
type Error = Error;
fn try_from(crc: &Crc) -> Result<Self, Self::Error> {
let FileStatsState::Complete(stats) = &crc.file_stats_state else {
return Err(Error::ChecksumWriteUnsupported(format!(
"Cannot serialize CRC with {:?} file stats",
crc.file_stats_state
)));
};
Ok(CrcRaw {
table_size_bytes: stats.table_size_bytes,
num_files: stats.num_files,
num_metadata: 1,
num_protocol: 1,
metadata: crc.metadata.clone(),
protocol: crc.protocol.clone(),
in_commit_timestamp_opt: crc.in_commit_timestamp_opt,
set_transactions: match &crc.set_transaction_state {
SetTransactionState::Complete(m) => Some(m.values().cloned().collect()),
SetTransactionState::Partial(_) => None,
},
domain_metadata: match &crc.domain_metadata_state {
DomainMetadataState::Complete(m) => Some(m.values().cloned().collect()),
DomainMetadataState::Partial(_) => None,
},
file_size_histogram: stats.file_size_histogram.clone(),
})
}
}
fn de_validated_file_size_histogram<'de, D>(
deserializer: D,
) -> Result<Option<FileSizeHistogram>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<FileSizeHistogram> = Option::deserialize(deserializer)?;
match opt {
Some(hist) => FileSizeHistogram::try_new(
hist.sorted_bin_boundaries,
hist.file_counts,
hist.total_bytes,
)
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}
/// Histogram of deleted-record counts, carried in `Crc::deleted_record_counts_histogram_opt`.
///
/// This module neither reads it from nor writes it to CRC JSON (`CrcRaw` has no matching
/// field); deserialization always leaves it `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeletedRecordCountsHistogram {
/// Per-bin counts.
/// NOTE(review): bin boundaries are not defined here — confirm the binning scheme with the
/// code that populates this value.
pub(crate) deleted_record_counts: Vec<i64>,
}
/// Unit tests for CRC JSON (de)serialization: vec<->map conversion of set transactions and
/// domain metadata, Complete/Partial state handling, count validation, file-stats
/// serialization errors, and file-size-histogram field naming and validation.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use rstest::rstest;
use super::{Crc, CrcRaw, DomainMetadataState, FileStats, FileStatsState, SetTransactionState};
use crate::actions::{DomainMetadata, SetTransaction};
/// Builds a default `Crc` with only the set-transaction and domain-metadata states overridden.
fn crc_with(
set_transaction_state: SetTransactionState,
domain_metadata_state: DomainMetadataState,
) -> Crc {
Crc {
set_transaction_state,
domain_metadata_state,
..Default::default()
}
}
/// JSON arrays are keyed by `appId` / `domain` and every entry's fields survive.
#[test]
fn de_vec_to_map_produces_correct_keys_and_values() {
let json = r#"{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {
"id": "test",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 0
},
"protocol": {"minReaderVersion": 1, "minWriterVersion": 1},
"setTransactions": [
{"appId": "app-1", "version": 3, "lastUpdated": 1000},
{"appId": "app-2", "version": 7}
],
"domainMetadata": [
{"domain": "delta.rowTracking", "configuration": "{\"rowIdHighWaterMark\":1}", "removed": false},
{"domain": "delta.clustering", "configuration": "{}", "removed": false}
]
}"#;
let crc: Crc = serde_json::from_str(json).unwrap();
let txns = crc.set_transaction_state.expect_complete();
assert_eq!(txns.len(), 2);
let txn1 = &txns["app-1"];
assert_eq!(txn1.app_id, "app-1");
assert_eq!(txn1.version, 3);
assert_eq!(txn1.last_updated, Some(1000));
let txn2 = &txns["app-2"];
assert_eq!(txn2.app_id, "app-2");
assert_eq!(txn2.version, 7);
assert_eq!(txn2.last_updated, None);
let domains = crc.domain_metadata_state.expect_complete();
assert_eq!(domains.len(), 2);
assert!(domains.contains_key("delta.rowTracking"));
assert!(domains.contains_key("delta.clustering"));
}
/// Explicit `null` arrays deserialize to empty `Partial` states.
#[test]
fn de_null_dm_and_txns_deserialize_to_partial_empty() {
let json = r#"{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {
"id": "test",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 0
},
"protocol": {"minReaderVersion": 1, "minWriterVersion": 1},
"setTransactions": null,
"domainMetadata": null
}"#;
let crc: Crc = serde_json::from_str(json).unwrap();
assert_eq!(
crc.set_transaction_state,
SetTransactionState::Partial(HashMap::new())
);
assert_eq!(
crc.domain_metadata_state,
DomainMetadataState::Partial(HashMap::new())
);
}
/// Missing array fields deserialize to empty `Partial` states.
#[test]
fn de_missing_dm_and_txns_fields_deserialize_to_partial_empty() {
let json = r#"{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {
"id": "test",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 0
},
"protocol": {"minReaderVersion": 1, "minWriterVersion": 1}
}"#;
let crc: Crc = serde_json::from_str(json).unwrap();
assert_eq!(
crc.set_transaction_state,
SetTransactionState::Partial(HashMap::new())
);
assert_eq!(
crc.domain_metadata_state,
DomainMetadataState::Partial(HashMap::new())
);
}
/// Empty `Partial` states serialize as `null`.
#[test]
fn ser_partial_dm_and_partial_txns_serialize_to_null() {
let crc = crc_with(
SetTransactionState::Partial(HashMap::new()),
DomainMetadataState::Partial(HashMap::new()),
);
let json = serde_json::to_value(&crc).unwrap();
assert!(json["setTransactions"].is_null());
assert!(json["domainMetadata"].is_null());
}
/// A non-empty `Partial` domain-metadata state is still written as `null`.
#[test]
fn ser_non_empty_partial_dm_still_serializes_to_null() {
let mut partial = HashMap::new();
partial.insert(
"delta.rowTracking".to_string(),
DomainMetadata::new("delta.rowTracking".to_string(), "{}".to_string()),
);
let crc = crc_with(
SetTransactionState::Partial(HashMap::new()),
DomainMetadataState::Partial(partial),
);
let json = serde_json::to_value(&crc).unwrap();
assert!(json["domainMetadata"].is_null());
}
/// A non-empty `Partial` set-transaction state is still written as `null`.
#[test]
fn ser_non_empty_partial_txns_still_serializes_to_null() {
let mut partial = HashMap::new();
partial.insert(
"my-app".to_string(),
SetTransaction::new("my-app".to_string(), 1, None),
);
let crc = crc_with(
SetTransactionState::Partial(partial),
DomainMetadataState::Partial(HashMap::new()),
);
let json = serde_json::to_value(&crc).unwrap();
assert!(json["setTransactions"].is_null());
}
/// `Complete` maps survive a serialize -> deserialize round trip via JSON arrays.
#[test]
fn ser_map_round_trips_through_vec() {
let mut txns = HashMap::new();
txns.insert(
"app-1".to_string(),
SetTransaction::new("app-1".to_string(), 5, Some(2000)),
);
txns.insert(
"app-2".to_string(),
SetTransaction::new("app-2".to_string(), 10, None),
);
let mut domains = HashMap::new();
domains.insert(
"delta.rowTracking".to_string(),
DomainMetadata::new("delta.rowTracking".to_string(), "{}".to_string()),
);
let original = crc_with(
SetTransactionState::Complete(txns),
DomainMetadataState::Complete(domains),
);
let json_str = serde_json::to_string(&original).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(original, deserialized);
}
/// Empty `Complete` maps are written as `[]` (not `null`) and round-trip as `Complete`.
#[test]
fn round_trip_empty_complete_dm_and_empty_txns() {
let original = crc_with(
SetTransactionState::Complete(HashMap::new()),
DomainMetadataState::Complete(HashMap::new()),
);
let json_str = serde_json::to_string(&original).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(original, deserialized);
let json_value = serde_json::to_value(&original).unwrap();
assert_eq!(json_value["setTransactions"], serde_json::json!([]));
assert_eq!(json_value["domainMetadata"], serde_json::json!([]));
}
/// Entries in a `Partial` domain-metadata state are lost on round trip (read back empty).
#[test]
fn round_trip_partial_dm_becomes_empty_partial() {
let mut partial = HashMap::new();
partial.insert(
"delta.rowTracking".to_string(),
DomainMetadata::new("delta.rowTracking".to_string(), "{}".to_string()),
);
let original = crc_with(
SetTransactionState::Partial(HashMap::new()),
DomainMetadataState::Partial(partial),
);
let json_str = serde_json::to_string(&original).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(
deserialized.domain_metadata_state,
DomainMetadataState::Partial(HashMap::new())
);
}
/// Entries in a `Partial` set-transaction state are lost on round trip (read back empty).
#[test]
fn partial_txns_written_as_null_reads_back_as_empty_partial() {
let mut partial = HashMap::new();
partial.insert(
"my-app".to_string(),
SetTransaction::new("my-app".to_string(), 7, Some(1000)),
);
let original = crc_with(
SetTransactionState::Partial(partial),
DomainMetadataState::Partial(HashMap::new()),
);
let json_str = serde_json::to_string(&original).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(
deserialized.set_transaction_state,
SetTransactionState::Partial(HashMap::new())
);
}
/// Full round trip with file stats plus several set transactions and domain metadatas.
#[test]
fn test_crc_with_multiple_domain_metadatas_and_set_transactions() {
let mut txns = HashMap::new();
txns.insert(
"streaming-app".to_string(),
SetTransaction::new("streaming-app".to_string(), 42, Some(1700000000)),
);
txns.insert(
"batch-job".to_string(),
SetTransaction::new("batch-job".to_string(), 100, None),
);
txns.insert(
"etl-pipeline".to_string(),
SetTransaction::new("etl-pipeline".to_string(), 7, Some(1700001000)),
);
let mut domains = HashMap::new();
domains.insert(
"delta.rowTracking".to_string(),
DomainMetadata::new(
"delta.rowTracking".to_string(),
r#"{"rowIdHighWaterMark":500}"#.to_string(),
),
);
domains.insert(
"delta.clustering".to_string(),
DomainMetadata::new("delta.clustering".to_string(), "{}".to_string()),
);
domains.insert(
"custom.app".to_string(),
DomainMetadata::new("custom.app".to_string(), r#"{"version":"2.0"}"#.to_string()),
);
let crc = Crc {
file_stats_state: FileStatsState::Complete(FileStats {
num_files: 10,
table_size_bytes: 1024 * 1024,
file_size_histogram: None,
}),
set_transaction_state: SetTransactionState::Complete(txns),
domain_metadata_state: DomainMetadataState::Complete(domains),
..Default::default()
};
let json_str = serde_json::to_string(&crc).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
let stats = deserialized.file_stats().unwrap();
assert_eq!(stats.table_size_bytes(), 1024 * 1024);
assert_eq!(stats.num_files(), 10);
let txns = deserialized.set_transaction_state.expect_complete();
assert_eq!(txns.len(), 3);
assert_eq!(txns["streaming-app"].version, 42);
assert_eq!(txns["streaming-app"].last_updated, Some(1700000000));
assert_eq!(txns["batch-job"].version, 100);
assert_eq!(txns["batch-job"].last_updated, None);
assert_eq!(txns["etl-pipeline"].version, 7);
let domains = deserialized.domain_metadata_state.expect_complete();
assert_eq!(domains.len(), 3);
assert!(domains.contains_key("delta.rowTracking"));
assert!(domains.contains_key("delta.clustering"));
assert!(domains.contains_key("custom.app"));
assert_eq!(
domains["custom.app"].configuration(),
r#"{"version":"2.0"}"#
);
assert_eq!(crc, deserialized);
}
/// Minimal CRC JSON with caller-chosen `numMetadata` / `numProtocol` values.
fn crc_json_with_counts(num_metadata: i64, num_protocol: i64) -> String {
format!(
r#"{{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": {num_metadata},
"numProtocol": {num_protocol},
"metadata": {{
"id": "test",
"format": {{"provider": "parquet", "options": {{}}}},
"schemaString": "{{\"type\":\"struct\",\"fields\":[]}}",
"partitionColumns": [],
"configuration": {{}},
"createdTime": 0
}},
"protocol": {{"minReaderVersion": 1, "minWriterVersion": 1}}
}}"#
)
}
/// Any count other than 1 is rejected with an error naming the offending field.
/// `counts` places the bad value in the field under test and 1 in the other.
#[rstest]
#[case::num_metadata("numMetadata", |b| (b, 1))]
#[case::num_protocol("numProtocol", |b| (1, b))]
fn de_invalid_count_is_rejected(
#[case] field: &str,
#[case] counts: fn(i64) -> (i64, i64),
#[values(0i64, 2, 3, -1)] bad: i64,
) {
let (m, p) = counts(bad);
let json = crc_json_with_counts(m, p);
let err = serde_json::from_str::<Crc>(&json).unwrap_err().to_string();
assert!(
err.contains(field),
"expected error to mention {field} for value {bad}, got: {err}"
);
}
/// Serializing a CRC whose file stats are `Indeterminate` fails through serde.
#[test]
fn ser_indeterminate_file_stats_returns_error() {
let crc = Crc {
file_stats_state: FileStatsState::Indeterminate,
..Default::default()
};
let err = serde_json::to_string(&crc).unwrap_err().to_string();
assert!(
err.contains("Cannot serialize CRC"),
"expected serialize-rejection error, got: {err}"
);
}
/// The underlying `CrcRaw` conversion reports `ChecksumWriteUnsupported` for that case.
#[test]
fn try_from_ref_indeterminate_returns_checksum_write_unsupported() {
let crc = Crc {
file_stats_state: FileStatsState::Indeterminate,
..Default::default()
};
let err = CrcRaw::try_from(&crc).unwrap_err();
assert!(
matches!(err, crate::Error::ChecksumWriteUnsupported(_)),
"expected ChecksumWriteUnsupported, got: {err:?}"
);
}
/// Minimal valid CRC JSON plus one extra top-level field `field_name: histogram_json`.
fn crc_json_with_histogram(field_name: &str, histogram_json: &str) -> String {
format!(
r#"{{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {{
"id": "test",
"format": {{"provider": "parquet", "options": {{}}}},
"schemaString": "{{\"type\":\"struct\",\"fields\":[]}}",
"partitionColumns": [],
"configuration": {{}},
"createdTime": 0
}},
"protocol": {{"minReaderVersion": 1, "minWriterVersion": 1}},
"{field_name}": {histogram_json}
}}"#
)
}
/// A valid histogram is accepted under both the spec name and the legacy alias.
#[rstest]
#[case::spec_name("fileSizeHistogram")]
#[case::legacy_name("histogramOpt")]
fn de_valid_file_size_histogram_succeeds(#[case] field_name: &str) {
let json = crc_json_with_histogram(
field_name,
r#"{"sortedBinBoundaries": [0, 100, 200], "fileCounts": [1, 2, 3], "totalBytes": [10, 200, 300]}"#,
);
let crc: Crc = serde_json::from_str(&json).unwrap();
assert!(crc.file_stats().unwrap().file_size_histogram().is_some());
}
/// A `null` histogram under either name deserializes to `None`.
#[rstest]
#[case::spec_name("fileSizeHistogram")]
#[case::legacy_name("histogramOpt")]
fn de_null_file_size_histogram_deserializes_to_none(#[case] field_name: &str) {
let json = crc_json_with_histogram(field_name, "null");
let crc: Crc = serde_json::from_str(&json).unwrap();
assert!(crc.file_stats().unwrap().file_size_histogram().is_none());
}
/// Histograms failing `FileSizeHistogram::try_new` validation are rejected under either name.
#[rstest]
#[case::unsorted_boundaries(
r#"{"sortedBinBoundaries": [0, 200, 100], "fileCounts": [0, 0, 0], "totalBytes": [0, 0, 0]}"#
)]
#[case::nonzero_first_boundary(
r#"{"sortedBinBoundaries": [1, 100], "fileCounts": [0, 0], "totalBytes": [0, 0]}"#
)]
#[case::mismatched_lengths(
r#"{"sortedBinBoundaries": [0, 100], "fileCounts": [0], "totalBytes": [0, 0]}"#
)]
#[case::single_boundary(
r#"{"sortedBinBoundaries": [0], "fileCounts": [0], "totalBytes": [0]}"#
)]
fn de_malformed_file_size_histogram_returns_error(
#[case] histogram_json: &str,
#[values("fileSizeHistogram", "histogramOpt")] field_name: &str,
) {
let json = crc_json_with_histogram(field_name, histogram_json);
assert!(serde_json::from_str::<Crc>(&json).is_err());
}
/// A histogram read via the legacy alias is written back under the spec name only.
#[test]
fn ser_uses_spec_field_name_after_deserializing_legacy_alias() {
let legacy_json = crc_json_with_histogram(
"histogramOpt",
r#"{"sortedBinBoundaries": [0, 100], "fileCounts": [1, 0], "totalBytes": [50, 0]}"#,
);
let crc: Crc = serde_json::from_str(&legacy_json).unwrap();
let serialized = serde_json::to_value(&crc).unwrap();
assert!(serialized.get("fileSizeHistogram").is_some());
assert!(serialized.get("histogramOpt").is_none());
}
/// Supplying both histogram field names is a duplicate-field error, whatever the payloads
/// or the order in which the two names appear.
#[rstest]
#[case::matching_payloads(
r#"{"sortedBinBoundaries": [0, 100], "fileCounts": [1, 0], "totalBytes": [50, 0]}"#,
r#"{"sortedBinBoundaries": [0, 100], "fileCounts": [1, 0], "totalBytes": [50, 0]}"#
)]
#[case::mismatched_payloads(
r#"{"sortedBinBoundaries": [0, 100], "fileCounts": [1, 0], "totalBytes": [50, 0]}"#,
r#"{"sortedBinBoundaries": [0, 200], "fileCounts": [9, 9], "totalBytes": [99, 99]}"#
)]
fn de_both_field_names_present_returns_duplicate_field_error(
#[case] histogram_opt_payload: &str,
#[case] file_size_histogram_payload: &str,
#[values(true, false)] spec_listed_last: bool,
) {
let (first_name, first_payload, second_name, second_payload) = if spec_listed_last {
(
"histogramOpt",
histogram_opt_payload,
"fileSizeHistogram",
file_size_histogram_payload,
)
} else {
(
"fileSizeHistogram",
file_size_histogram_payload,
"histogramOpt",
histogram_opt_payload,
)
};
let json = format!(
r#"{{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {{
"id": "test",
"format": {{"provider": "parquet", "options": {{}}}},
"schemaString": "{{\"type\":\"struct\",\"fields\":[]}}",
"partitionColumns": [],
"configuration": {{}},
"createdTime": 0
}},
"protocol": {{"minReaderVersion": 1, "minWriterVersion": 1}},
"{first_name}": {first_payload},
"{second_name}": {second_payload}
}}"#
);
let err = serde_json::from_str::<Crc>(&json).unwrap_err();
assert!(
err.to_string().contains("duplicate field"),
"expected duplicate-field error, got: {err}"
);
}
}