#![allow(unreachable_pub)]
mod delta;
mod file_size_histogram;
mod file_stats;
mod lazy;
mod reader;
mod writer;
use std::collections::HashMap;
#[allow(unused)]
pub(crate) use delta::CrcDelta;
pub use file_size_histogram::FileSizeHistogram;
pub use file_stats::FileStats;
#[allow(unused)]
pub(crate) use file_stats::FileStatsDelta;
pub(crate) use lazy::{CrcLoadResult, LazyCrc};
pub(crate) use reader::try_read_crc_file;
use serde::de::Deserializer;
use serde::ser::Serializer;
use serde::{Deserialize, Serialize};
#[allow(unused)]
pub(crate) use writer::try_write_crc_file;
use crate::actions::{Add, DomainMetadata, Metadata, Protocol, SetTransaction};
/// Whether the file-count / table-size statistics carried by a [`Crc`] can be
/// trusted. Only [`FileStatsValidity::Valid`] lets [`Crc::file_stats`] return
/// `Some`; every other variant suppresses the stats.
// Allowed dead_code: some variants are only constructed by code outside this module.
#[allow(dead_code)] #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FileStatsValidity {
/// Stats are accurate as-is. This is the default, so freshly deserialized
/// CRC files start out `Valid` (the field is `#[serde(skip)]`).
#[default]
Valid,
/// NOTE(review): semantics of the three invalid states are inferred from
/// their names (need a checkpoint read / unknown / cannot be tracked) —
/// confirm against the code that sets them before relying on this.
RequiresCheckpointRead,
Indeterminate,
Untrackable,
}
/// In-memory representation of a table version-checksum (CRC) file.
///
/// Serialized as camelCase JSON. The map-valued fields are stored on disk as
/// JSON lists and re-keyed into `HashMap`s on read via `de_opt_vec_to_opt_map`
/// / `ser_opt_map_to_opt_vec`. Fields marked `#[serde(skip)]` are in-memory
/// only and always deserialize to their `Default` value.
#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Crc {
// Total size in bytes of all tracked files; private — exposed (gated) via
// `Crc::file_stats`.
table_size_bytes: i64,
// Number of data files in the table; private — exposed via `Crc::file_stats`.
num_files: i64,
/// Number of metadata actions contributing to this version.
pub num_metadata: i64,
/// Number of protocol actions contributing to this version.
pub num_protocol: i64,
/// Table metadata as of this version.
pub metadata: Metadata,
/// Table protocol as of this version.
pub protocol: Protocol,
/// Not serialized: deserialized values always start at the default
/// (`Valid`); presumably updated in memory as table state changes —
/// TODO(review): confirm against the tracking code.
#[serde(skip)]
pub file_stats_validity: FileStatsValidity,
/// Not serialized; in-memory transaction identifier only.
#[serde(skip)]
pub txn_id: Option<String>,
/// In-commit timestamp of this version, when available.
pub in_commit_timestamp_opt: Option<i64>,
/// Set transactions keyed by app id in memory; stored on disk as a list.
/// Missing or null field deserializes to `None`.
#[serde(
default,
deserialize_with = "de_opt_vec_to_opt_map",
serialize_with = "ser_opt_map_to_opt_vec"
)]
pub set_transactions: Option<HashMap<String, SetTransaction>>,
/// Domain metadata keyed by domain name in memory; stored on disk as a list.
#[serde(
default,
deserialize_with = "de_opt_vec_to_opt_map",
serialize_with = "ser_opt_map_to_opt_vec"
)]
pub domain_metadata: Option<HashMap<String, DomainMetadata>>,
/// File-size histogram, re-validated through `FileSizeHistogram::try_new`
/// on read so malformed data is rejected at deserialization time.
/// Omitted from output when `None` (rather than written as null).
#[serde(
default,
deserialize_with = "de_validated_file_size_histogram",
skip_serializing_if = "Option::is_none"
)]
pub file_size_histogram: Option<FileSizeHistogram>,
/// Not serialized; in-memory list of all Add actions, when populated.
#[serde(skip)]
pub all_files: Option<Vec<Add>>,
/// Not serialized; in-memory deleted-record count, when tracked.
#[serde(skip)]
pub num_deleted_records_opt: Option<i64>,
/// Not serialized; in-memory deletion-vector count, when tracked.
#[serde(skip)]
pub num_deletion_vectors_opt: Option<i64>,
/// Not serialized; in-memory histogram of deleted-record counts.
#[serde(skip)]
pub deleted_record_counts_histogram_opt: Option<DeletedRecordCountsHistogram>,
}
impl Crc {
    /// Returns the table's aggregate file statistics, but only when the
    /// stats are currently trustworthy ([`FileStatsValidity::Valid`]).
    /// Any other validity state yields `None`.
    pub fn file_stats(&self) -> Option<FileStats> {
        // Guard clause: bail out unless the tracked stats are valid.
        if !matches!(self.file_stats_validity, FileStatsValidity::Valid) {
            return None;
        }
        Some(FileStats {
            num_files: self.num_files,
            table_size_bytes: self.table_size_bytes,
            file_size_histogram: self.file_size_histogram.clone(),
        })
    }
}
/// Key extraction used when converting a serialized list of actions into the
/// in-memory `HashMap` form (see `de_opt_vec_to_opt_map`).
trait MapKey {
/// The string key this item is indexed under.
fn map_key(&self) -> &str;
}
// Domain metadata entries are keyed by their domain name.
impl MapKey for DomainMetadata {
fn map_key(&self) -> &str {
self.domain()
}
}
// Set-transaction entries are keyed by their application id.
impl MapKey for SetTransaction {
fn map_key(&self) -> &str {
&self.app_id
}
}
/// Deserializes an optional on-disk list of items into an optional map keyed
/// by each item's [`MapKey::map_key`]. `null` / missing input stays `None`.
///
/// NOTE: if two items share a key, the later one silently wins.
fn de_opt_vec_to_opt_map<'de, D, T>(deserializer: D) -> Result<Option<HashMap<String, T>>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de> + MapKey,
{
    let items: Option<Vec<T>> = Option::deserialize(deserializer)?;
    match items {
        None => Ok(None),
        Some(items) => {
            // Index every deserialized item under its own key.
            let mut map = HashMap::with_capacity(items.len());
            for item in items {
                map.insert(item.map_key().to_string(), item);
            }
            Ok(Some(map))
        }
    }
}
fn de_validated_file_size_histogram<'de, D>(
deserializer: D,
) -> Result<Option<FileSizeHistogram>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<FileSizeHistogram> = Option::deserialize(deserializer)?;
match opt {
Some(hist) => FileSizeHistogram::try_new(
hist.sorted_bin_boundaries,
hist.file_counts,
hist.total_bytes,
)
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}
/// Serializes an optional in-memory map back to the optional list form used
/// on disk. `None` serializes as an explicit null.
///
/// Entries are emitted in ascending key order: `HashMap` iteration order
/// varies between runs, which previously made the serialized CRC content
/// unstable for identical table state. Sorting makes the output
/// deterministic without changing its meaning (readers re-key the list into
/// a map and ignore order).
fn ser_opt_map_to_opt_vec<S, T>(
    map: &Option<HashMap<String, T>>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: Serializer,
    T: Serialize,
{
    match map {
        None => serializer.serialize_none(),
        Some(m) => {
            // Sort by key for a stable, reproducible output order.
            let mut entries: Vec<(&String, &T)> = m.iter().collect();
            entries.sort_by(|a, b| a.0.cmp(b.0));
            entries
                .into_iter()
                .map(|(_, value)| value)
                .collect::<Vec<_>>()
                .serialize(serializer)
        }
    }
}
/// Histogram of per-file deleted-record counts.
/// In-memory only: it derives no serde traits and the [`Crc`] field that
/// holds it is `#[serde(skip)]`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeletedRecordCountsHistogram {
// Bin counts; bin boundaries are defined by the producing code —
// TODO(review): confirm the binning scheme where this is populated.
pub(crate) deleted_record_counts: Vec<i64>,
}
// Round-trip and validation tests for the Crc serde model. The JSON fixtures
// below are kept byte-for-byte; only comments are added in this block.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::Crc;
use crate::actions::{DomainMetadata, SetTransaction};
// Builds a Crc that differs from Default only in its two map-valued fields.
fn crc_with(
txns: Option<HashMap<String, SetTransaction>>,
domains: Option<HashMap<String, DomainMetadata>>,
) -> Crc {
Crc {
set_transactions: txns,
domain_metadata: domains,
..Default::default()
}
}
// On-disk list form must deserialize into maps keyed by appId / domain,
// preserving each item's payload.
#[test]
fn de_vec_to_map_produces_correct_keys_and_values() {
let json = r#"{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {
"id": "test",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 0
},
"protocol": {"minReaderVersion": 1, "minWriterVersion": 1},
"setTransactions": [
{"appId": "app-1", "version": 3, "lastUpdated": 1000},
{"appId": "app-2", "version": 7}
],
"domainMetadata": [
{"domain": "delta.rowTracking", "configuration": "{\"rowIdHighWaterMark\":1}", "removed": false},
{"domain": "delta.clustering", "configuration": "{}", "removed": false}
]
}"#;
let crc: Crc = serde_json::from_str(json).unwrap();
let txns = crc.set_transactions.as_ref().unwrap();
assert_eq!(txns.len(), 2);
let txn1 = &txns["app-1"];
assert_eq!(txn1.app_id, "app-1");
assert_eq!(txn1.version, 3);
assert_eq!(txn1.last_updated, Some(1000));
let txn2 = &txns["app-2"];
assert_eq!(txn2.app_id, "app-2");
assert_eq!(txn2.version, 7);
assert_eq!(txn2.last_updated, None);
let domains = crc.domain_metadata.as_ref().unwrap();
assert_eq!(domains.len(), 2);
assert!(domains.contains_key("delta.rowTracking"));
assert!(domains.contains_key("delta.clustering"));
}
// Explicit JSON nulls for the map fields must become None, not empty maps.
#[test]
fn de_null_deserializes_to_none() {
let json = r#"{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {
"id": "test",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 0
},
"protocol": {"minReaderVersion": 1, "minWriterVersion": 1},
"setTransactions": null,
"domainMetadata": null
}"#;
let crc: Crc = serde_json::from_str(json).unwrap();
assert!(crc.set_transactions.is_none());
assert!(crc.domain_metadata.is_none());
}
// Absent map fields must also become None (exercises #[serde(default)]).
#[test]
fn de_missing_field_deserializes_to_none() {
let json = r#"{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {
"id": "test",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 0
},
"protocol": {"minReaderVersion": 1, "minWriterVersion": 1}
}"#;
let crc: Crc = serde_json::from_str(json).unwrap();
assert!(crc.set_transactions.is_none());
assert!(crc.domain_metadata.is_none());
}
// None maps serialize as explicit nulls (not omitted fields).
#[test]
fn ser_none_serializes_to_null() {
let crc = crc_with(None, None);
let json = serde_json::to_value(&crc).unwrap();
assert!(json["setTransactions"].is_null());
assert!(json["domainMetadata"].is_null());
}
// Map -> list -> map round trip must be lossless.
#[test]
fn ser_map_round_trips_through_vec() {
let mut txns = HashMap::new();
txns.insert(
"app-1".to_string(),
SetTransaction::new("app-1".to_string(), 5, Some(2000)),
);
txns.insert(
"app-2".to_string(),
SetTransaction::new("app-2".to_string(), 10, None),
);
let mut domains = HashMap::new();
domains.insert(
"delta.rowTracking".to_string(),
DomainMetadata::new("delta.rowTracking".to_string(), "{}".to_string()),
);
let original = crc_with(Some(txns), Some(domains));
let json_str = serde_json::to_string(&original).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(original, deserialized);
}
// Empty maps are Some([]) on disk — distinct from the None/null case above.
#[test]
fn round_trip_empty_maps() {
let original = crc_with(Some(HashMap::new()), Some(HashMap::new()));
let json_str = serde_json::to_string(&original).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(original, deserialized);
let json_value = serde_json::to_value(&original).unwrap();
assert_eq!(json_value["setTransactions"], serde_json::json!([]));
assert_eq!(json_value["domainMetadata"], serde_json::json!([]));
}
// Larger round trip covering several entries in both maps plus the scalar
// count/size fields.
#[test]
fn test_crc_with_multiple_domain_metadatas_and_set_transactions() {
let mut txns = HashMap::new();
txns.insert(
"streaming-app".to_string(),
SetTransaction::new("streaming-app".to_string(), 42, Some(1700000000)),
);
txns.insert(
"batch-job".to_string(),
SetTransaction::new("batch-job".to_string(), 100, None),
);
txns.insert(
"etl-pipeline".to_string(),
SetTransaction::new("etl-pipeline".to_string(), 7, Some(1700001000)),
);
let mut domains = HashMap::new();
domains.insert(
"delta.rowTracking".to_string(),
DomainMetadata::new(
"delta.rowTracking".to_string(),
r#"{"rowIdHighWaterMark":500}"#.to_string(),
),
);
domains.insert(
"delta.clustering".to_string(),
DomainMetadata::new("delta.clustering".to_string(), "{}".to_string()),
);
domains.insert(
"custom.app".to_string(),
DomainMetadata::new("custom.app".to_string(), r#"{"version":"2.0"}"#.to_string()),
);
let crc = Crc {
table_size_bytes: 1024 * 1024,
num_files: 10,
num_metadata: 1,
num_protocol: 1,
set_transactions: Some(txns),
domain_metadata: Some(domains),
..Default::default()
};
let json_str = serde_json::to_string(&crc).unwrap();
let deserialized: Crc = serde_json::from_str(&json_str).unwrap();
assert_eq!(deserialized.table_size_bytes, 1024 * 1024);
assert_eq!(deserialized.num_files, 10);
let txns = deserialized.set_transactions.as_ref().unwrap();
assert_eq!(txns.len(), 3);
assert_eq!(txns["streaming-app"].version, 42);
assert_eq!(txns["streaming-app"].last_updated, Some(1700000000));
assert_eq!(txns["batch-job"].version, 100);
assert_eq!(txns["batch-job"].last_updated, None);
assert_eq!(txns["etl-pipeline"].version, 7);
let domains = deserialized.domain_metadata.as_ref().unwrap();
assert_eq!(domains.len(), 3);
assert!(domains.contains_key("delta.rowTracking"));
assert!(domains.contains_key("delta.clustering"));
assert!(domains.contains_key("custom.app"));
assert_eq!(
domains["custom.app"].configuration(),
r#"{"version":"2.0"}"#
);
assert_eq!(crc, deserialized);
}
// Embeds the given histogram JSON fragment into an otherwise-minimal CRC
// document so each case only varies the histogram.
fn crc_json_with_histogram(histogram_json: &str) -> String {
format!(
r#"{{
"tableSizeBytes": 0,
"numFiles": 0,
"numMetadata": 1,
"numProtocol": 1,
"metadata": {{
"id": "test",
"format": {{"provider": "parquet", "options": {{}}}},
"schemaString": "{{\"type\":\"struct\",\"fields\":[]}}",
"partitionColumns": [],
"configuration": {{}},
"createdTime": 0
}},
"protocol": {{"minReaderVersion": 1, "minWriterVersion": 1}},
"fileSizeHistogram": {histogram_json}
}}"#
)
}
// A well-formed histogram passes the try_new validation during deserialize.
#[test]
fn de_valid_file_size_histogram_succeeds() {
let json = crc_json_with_histogram(
r#"{"sortedBinBoundaries": [0, 100, 200], "fileCounts": [1, 2, 3], "totalBytes": [10, 200, 300]}"#,
);
let crc: Crc = serde_json::from_str(&json).unwrap();
assert!(crc.file_size_histogram.is_some());
}
// A null histogram bypasses validation and stays None.
#[test]
fn de_null_file_size_histogram_deserializes_to_none() {
let json = crc_json_with_histogram("null");
let crc: Crc = serde_json::from_str(&json).unwrap();
assert!(crc.file_size_histogram.is_none());
}
use rstest::rstest;
// Each malformed variant must be rejected by the validating deserializer
// rather than silently accepted.
#[rstest]
#[case::unsorted_boundaries(
r#"{"sortedBinBoundaries": [0, 200, 100], "fileCounts": [0, 0, 0], "totalBytes": [0, 0, 0]}"#
)]
#[case::nonzero_first_boundary(
r#"{"sortedBinBoundaries": [1, 100], "fileCounts": [0, 0], "totalBytes": [0, 0]}"#
)]
#[case::mismatched_lengths(
r#"{"sortedBinBoundaries": [0, 100], "fileCounts": [0], "totalBytes": [0, 0]}"#
)]
#[case::single_boundary(
r#"{"sortedBinBoundaries": [0], "fileCounts": [0], "totalBytes": [0]}"#
)]
fn de_malformed_file_size_histogram_returns_error(#[case] histogram_json: &str) {
let json = crc_json_with_histogram(histogram_json);
assert!(serde_json::from_str::<Crc>(&json).is_err());
}
}