use tracing::warn;
use super::file_stats::FileStatsDelta;
use super::{Crc, FileStatsValidity};
use crate::actions::{DomainMetadata, Metadata, Protocol, SetTransaction};
/// The changes contributed by a single commit that are relevant to a [`Crc`].
///
/// A delta is either folded into an existing `Crc` with `Crc::apply`, or turned into a
/// brand-new `Crc` with [`CrcDelta::into_crc_for_version_zero`].
#[derive(Debug, Clone, Default)]
pub(crate) struct CrcDelta {
/// Net change in file statistics. `Crc::apply` adds `net_files` / `net_bytes` to the base
/// counters and merges `net_histogram` into the base histogram.
pub(crate) file_stats: FileStatsDelta,
/// Protocol carried by the commit, if any. When `Some`, it replaces the base protocol.
pub(crate) protocol: Option<Protocol>,
/// Metadata carried by the commit, if any. When `Some`, it replaces the base metadata.
pub(crate) metadata: Option<Metadata>,
/// Domain metadata entries from the commit. Entries for which `is_removed()` is true delete
/// the domain from the tracked map; all other entries are upserted by domain name.
pub(crate) domain_metadata_changes: Vec<DomainMetadata>,
/// Set-transaction entries from the commit, upserted into the tracked map by `app_id`.
pub(crate) set_transaction_changes: Vec<SetTransaction>,
/// In-commit timestamp of the commit. `Crc::apply` copies this value unconditionally, so
/// `None` clears any previously stored timestamp.
pub(crate) in_commit_timestamp: Option<i64>,
/// Operation name of the commit. `Crc::apply` passes it to
/// `FileStatsDelta::is_incremental_safe`; `None` is treated as not incremental-safe.
pub(crate) operation: Option<String>,
/// When true, `Crc::apply` marks file stats as `FileStatsValidity::Untrackable` and drops
/// the file size histogram.
/// NOTE(review): presumably set when some file action in the commit lacked a size — confirm
/// against the code that builds the delta.
pub(crate) has_missing_file_size: bool,
}
impl CrcDelta {
    /// Builds a fresh [`Crc`] from this delta alone, for use when there is no base `Crc` to
    /// apply the delta to.
    ///
    /// Returns `None` unless the delta carries both a protocol and a metadata action.
    ///
    /// The resulting `Crc` is populated as follows:
    /// - `num_files` / `table_size_bytes` are the delta's net file count and net bytes.
    /// - `num_metadata` and `num_protocol` are both `1`.
    /// - `domain_metadata` is always `Some`, holding every entry that is not flagged as
    ///   removed, keyed by domain name.
    /// - `set_transactions` is always `Some`, holding every entry keyed by `app_id`.
    /// - `file_size_histogram` is the delta's histogram if it passes `check_non_negative`;
    ///   otherwise a warning is logged and the histogram is left as `None`.
    /// - Every other field takes its `Default` value.
    ///
    /// `operation` and `has_missing_file_size` are not consulted here.
    pub(crate) fn into_crc_for_version_zero(self) -> Option<Crc> {
        let (Some(protocol), Some(metadata)) = (self.protocol, self.metadata) else {
            return None;
        };

        // Entries flagged as removed are left out; the rest are keyed by domain name.
        let live_domains = self
            .domain_metadata_changes
            .into_iter()
            .filter_map(|entry| {
                let keep = !entry.is_removed();
                keep.then(|| (entry.domain().to_string(), entry))
            })
            .collect();

        let transactions_by_app = self
            .set_transaction_changes
            .into_iter()
            .map(|entry| (entry.app_id.clone(), entry))
            .collect();

        // Keep the histogram only if it passes the non-negativity check; a failure is logged
        // and the histogram is dropped rather than failing the whole conversion.
        let file_size_histogram = match self.file_stats.net_histogram {
            None => None,
            Some(candidate) => match candidate.check_non_negative() {
                Ok(checked) => Some(checked),
                Err(e) => {
                    warn!("Non-negative file count check failed, dropping file size histogram for version zero: {e}");
                    None
                }
            },
        };

        Some(Crc {
            protocol,
            metadata,
            num_protocol: 1,
            num_metadata: 1,
            num_files: self.file_stats.net_files,
            table_size_bytes: self.file_stats.net_bytes,
            domain_metadata: Some(live_domains),
            set_transactions: Some(transactions_by_app),
            in_commit_timestamp_opt: self.in_commit_timestamp,
            file_size_histogram,
            ..Default::default()
        })
    }
}
impl Crc {
pub(crate) fn apply(&mut self, delta: CrcDelta) {
if let Some(p) = delta.protocol {
self.protocol = p;
}
if let Some(m) = delta.metadata {
self.metadata = m;
}
if !delta.domain_metadata_changes.is_empty() {
if let Some(map) = &mut self.domain_metadata {
for dm in delta.domain_metadata_changes {
if dm.is_removed() {
map.remove(dm.domain());
} else {
let domain = dm.domain().to_string();
map.insert(domain, dm);
}
}
}
}
if let Some(map) = &mut self.set_transactions {
map.extend(
delta
.set_transaction_changes
.into_iter()
.map(|txn| (txn.app_id.clone(), txn)),
);
}
self.in_commit_timestamp_opt = delta.in_commit_timestamp;
if self.file_stats_validity == FileStatsValidity::Untrackable {
return;
}
if delta.has_missing_file_size {
self.file_stats_validity = FileStatsValidity::Untrackable;
self.file_size_histogram = None;
return;
}
if self.file_stats_validity == FileStatsValidity::Indeterminate {
return;
}
let is_incremental_safe = delta
.operation
.as_deref()
.is_some_and(FileStatsDelta::is_incremental_safe);
if !is_incremental_safe {
self.file_stats_validity = FileStatsValidity::Indeterminate;
self.file_size_histogram = None;
return;
}
self.num_files += delta.file_stats.net_files;
self.table_size_bytes += delta.file_stats.net_bytes;
if let (Some(base_hist), Some(delta_hist)) = (
self.file_size_histogram.as_ref(),
&delta.file_stats.net_histogram,
) {
match base_hist.try_apply_delta(delta_hist) {
Ok(merged) => self.file_size_histogram = Some(merged),
Err(e) => {
warn!("Histogram merge failed, dropping file size histogram: {e}");
self.file_size_histogram = None;
}
}
} else if self.file_size_histogram.is_some() {
self.file_size_histogram = None;
}
}
}
// Unit tests for `Crc::apply` and `CrcDelta::into_crc_for_version_zero`: file-stats validity
// transitions, protocol/metadata replacement, domain metadata and set-transaction tracking,
// in-commit timestamp handling, and file size histogram merging.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use rstest::rstest;
use super::*;
use crate::actions::{DomainMetadata, Metadata, Protocol};
use crate::crc::FileSizeHistogram;
/// Base `Crc` with 10 files totalling 1000 bytes, one protocol and one metadata action, and
/// every other field left at its `Default` value (no histogram, untracked maps).
fn base_crc() -> Crc {
Crc {
table_size_bytes: 1000,
num_files: 10,
num_metadata: 1,
num_protocol: 1,
..Default::default()
}
}
/// Delta for a `"WRITE"` operation with the given net file count and net bytes; no histogram
/// and all other fields at their `Default` values.
fn write_delta(net_files: i64, net_bytes: i64) -> CrcDelta {
CrcDelta {
file_stats: FileStatsDelta {
net_files,
net_bytes,
..Default::default()
},
operation: Some("WRITE".to_string()),
..Default::default()
}
}
/// Every operation name listed here must be reported as incremental-safe.
#[test]
fn test_incremental_safe_operations() {
for op in [
"WRITE",
"MERGE",
"UPDATE",
"DELETE",
"OPTIMIZE",
"CREATE TABLE",
"REPLACE TABLE",
"CREATE TABLE AS SELECT",
"REPLACE TABLE AS SELECT",
"CREATE OR REPLACE TABLE AS SELECT",
] {
assert!(
FileStatsDelta::is_incremental_safe(op),
"{op} should be incremental-safe"
);
}
}
/// `"ANALYZE STATS"` and an unrecognized name must not be reported as incremental-safe.
#[test]
fn test_non_incremental_safe_operations() {
assert!(!FileStatsDelta::is_incremental_safe("ANALYZE STATS"));
assert!(!FileStatsDelta::is_incremental_safe("UNKNOWN"));
}
/// A `Crc` whose `file_stats_validity` comes from `Default` starts out `Valid`.
#[test]
fn test_deserialized_crc_has_valid_stats() {
let crc = base_crc();
assert_eq!(crc.file_stats_validity, FileStatsValidity::Valid);
assert_eq!(crc.num_files, 10);
assert_eq!(crc.table_size_bytes, 1000);
}
/// A safe delta adds its net counts to the base: 10 + 3 files, 1000 + 600 bytes.
#[test]
fn test_apply_updates_file_stats() {
let mut crc = base_crc();
crc.apply(write_delta(3, 600));
assert_eq!(crc.num_files, 13); assert_eq!(crc.table_size_bytes, 1600); assert_eq!(crc.file_stats_validity, FileStatsValidity::Valid);
}
/// Successive deltas accumulate, including negative ones: 10 + 3 - 2 files,
/// 1000 + 600 - 400 bytes.
#[test]
fn test_apply_multiple_deltas() {
let mut crc = base_crc();
crc.apply(write_delta(3, 600));
crc.apply(write_delta(-2, -400));
assert_eq!(crc.num_files, 11); assert_eq!(crc.table_size_bytes, 1200); assert_eq!(crc.file_stats_validity, FileStatsValidity::Valid);
}
/// An operation that is not incremental-safe moves a `Valid` CRC to `Indeterminate`.
#[test]
fn test_apply_unsafe_op_transitions_to_indeterminate() {
let mut crc = base_crc();
let unsafe_change = CrcDelta {
operation: Some("ANALYZE STATS".to_string()),
..write_delta(1, 100)
};
crc.apply(unsafe_change);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Indeterminate);
}
/// A delta with no operation name is treated like an unsafe operation.
#[test]
fn test_apply_none_op_transitions_to_indeterminate() {
let mut crc = base_crc();
let unknown_delta = CrcDelta {
operation: None,
..write_delta(1, 100)
};
crc.apply(unknown_delta);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Indeterminate);
}
/// Once `Indeterminate`, a later safe delta does not restore `Valid`.
#[test]
fn test_indeterminate_stays_indeterminate() {
let mut crc = base_crc();
let unsafe_change = CrcDelta {
operation: Some("ANALYZE STATS".to_string()),
..write_delta(1, 100)
};
crc.apply(unsafe_change);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Indeterminate);
crc.apply(write_delta(5, 500));
assert_eq!(crc.file_stats_validity, FileStatsValidity::Indeterminate);
}
/// A delta flagged with a missing file size moves a `Valid` CRC to `Untrackable`.
#[test]
fn test_missing_file_size_transitions_to_untrackable() {
let mut crc = base_crc();
let delta = CrcDelta {
has_missing_file_size: true,
..write_delta(1, 100)
};
crc.apply(delta);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Untrackable);
}
/// Once `Untrackable`, neither a safe delta nor one without an operation changes the state.
#[test]
fn test_untrackable_stays_untrackable() {
let mut crc = base_crc();
let delta = CrcDelta {
has_missing_file_size: true,
..write_delta(1, 100)
};
crc.apply(delta);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Untrackable);
crc.apply(write_delta(5, 500));
assert_eq!(crc.file_stats_validity, FileStatsValidity::Untrackable);
crc.apply(CrcDelta {
operation: None,
..write_delta(1, 100)
});
assert_eq!(crc.file_stats_validity, FileStatsValidity::Untrackable);
}
/// A missing file size still moves an already-`Indeterminate` CRC to `Untrackable`.
#[test]
fn test_indeterminate_transitions_to_untrackable_on_missing_size() {
let mut crc = base_crc();
let unsafe_change = CrcDelta {
operation: Some("ANALYZE STATS".to_string()),
..write_delta(1, 100)
};
crc.apply(unsafe_change);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Indeterminate);
let delta = CrcDelta {
has_missing_file_size: true,
..write_delta(1, 100)
};
crc.apply(delta);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Untrackable);
}
/// A delta carrying only a protocol replaces the protocol and leaves the metadata untouched.
#[test]
fn test_apply_replaces_protocol() {
let mut crc = base_crc();
let new_protocol = Protocol::try_new(
2,
5,
None::<Vec<crate::table_features::TableFeature>>,
None::<Vec<crate::table_features::TableFeature>>,
)
.unwrap();
let delta = CrcDelta {
protocol: Some(new_protocol.clone()),
..write_delta(0, 0)
};
crc.apply(delta);
assert_eq!(crc.protocol, new_protocol);
assert_eq!(crc.metadata, Metadata::default()); }
/// With a tracked (`Some`) domain metadata map, a new domain is inserted.
#[test]
fn test_apply_adds_domain_metadata_to_tracked_map() {
let mut crc = base_crc();
crc.domain_metadata = Some(HashMap::new());
let dm = DomainMetadata::new("my.domain".to_string(), "config1".to_string());
let delta = CrcDelta {
domain_metadata_changes: vec![dm],
..write_delta(0, 0)
};
crc.apply(delta);
let map = crc.domain_metadata.as_ref().unwrap();
assert_eq!(map.len(), 1);
assert_eq!(map["my.domain"].configuration(), "config1");
}
/// With an untracked (`None`) domain metadata map, changes are ignored and the map stays `None`.
#[test]
fn test_apply_with_untracked_domain_metadata_skips_changes() {
let mut crc = base_crc();
assert!(crc.domain_metadata.is_none()); let dm = DomainMetadata::new("my.domain".to_string(), "config1".to_string());
let delta = CrcDelta {
domain_metadata_changes: vec![dm],
..write_delta(0, 0)
};
crc.apply(delta);
assert!(crc.domain_metadata.is_none());
}
/// A change for an existing domain overwrites its configuration instead of adding an entry.
#[test]
fn test_apply_upserts_domain_metadata() {
let mut crc = base_crc();
crc.domain_metadata = Some(HashMap::from([(
"my.domain".to_string(),
DomainMetadata::new("my.domain".to_string(), "old_config".to_string()),
)]));
let dm = DomainMetadata::new("my.domain".to_string(), "new_config".to_string());
let delta = CrcDelta {
domain_metadata_changes: vec![dm],
..write_delta(0, 0)
};
crc.apply(delta);
let map = crc.domain_metadata.as_ref().unwrap();
assert_eq!(map.len(), 1);
assert_eq!(map["my.domain"].configuration(), "new_config");
}
/// A removal entry deletes the domain from the tracked map.
#[test]
fn test_apply_removes_domain_metadata() {
let mut crc = base_crc();
crc.domain_metadata = Some(HashMap::from([(
"my.domain".to_string(),
DomainMetadata::new("my.domain".to_string(), "config1".to_string()),
)]));
let dm = DomainMetadata::remove("my.domain".to_string(), "config1".to_string());
let delta = CrcDelta {
domain_metadata_changes: vec![dm],
..write_delta(0, 0)
};
crc.apply(delta);
let map = crc.domain_metadata.as_ref().unwrap();
assert!(map.is_empty());
}
/// The delta's in-commit timestamp is copied onto the CRC.
#[test]
fn test_apply_replaces_in_commit_timestamp() {
let mut crc = base_crc();
let delta = CrcDelta {
in_commit_timestamp: Some(9999),
..write_delta(0, 0)
};
crc.apply(delta);
assert_eq!(crc.in_commit_timestamp_opt, Some(9999));
}
/// A delta without an in-commit timestamp clears a previously stored one.
#[test]
fn test_apply_clears_in_commit_timestamp_when_ict_disabled() {
let mut crc = base_crc();
crc.in_commit_timestamp_opt = Some(1000);
let delta = CrcDelta {
in_commit_timestamp: None,
..write_delta(0, 0)
};
crc.apply(delta);
assert_eq!(crc.in_commit_timestamp_opt, None);
}
/// Protocol built with `Protocol::try_new(1, 2, None, None)`, shared by the version-zero tests.
fn test_protocol() -> Protocol {
Protocol::try_new(
1,
2,
None::<Vec<crate::table_features::TableFeature>>,
None::<Vec<crate::table_features::TableFeature>>,
)
.unwrap()
}
/// With protocol and metadata present, the version-zero CRC takes its stats straight from
/// the delta, starts `Valid`, and has an empty (but tracked) domain metadata map.
#[test]
fn test_into_crc_for_version_zero_with_protocol_and_metadata() {
let protocol = test_protocol();
let metadata = Metadata::default();
let delta = CrcDelta {
protocol: Some(protocol.clone()),
metadata: Some(metadata.clone()),
..write_delta(5, 1000)
};
let crc = delta.into_crc_for_version_zero().unwrap();
assert_eq!(crc.protocol, protocol);
assert_eq!(crc.metadata, metadata);
assert_eq!(crc.num_files, 5);
assert_eq!(crc.table_size_bytes, 1000);
assert_eq!(crc.num_metadata, 1);
assert_eq!(crc.num_protocol, 1);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Valid);
assert_eq!(crc.domain_metadata, Some(HashMap::new()));
assert_eq!(crc.in_commit_timestamp_opt, None);
}
/// No protocol in the delta means no version-zero CRC can be built.
#[test]
fn test_into_crc_for_version_zero_returns_none_without_protocol() {
let delta = CrcDelta {
metadata: Some(Metadata::default()),
..write_delta(5, 1000)
};
assert!(delta.into_crc_for_version_zero().is_none());
}
/// No metadata in the delta means no version-zero CRC can be built.
#[test]
fn test_into_crc_for_version_zero_returns_none_without_metadata() {
let delta = CrcDelta {
protocol: Some(test_protocol()),
..write_delta(5, 1000)
};
assert!(delta.into_crc_for_version_zero().is_none());
}
/// Domain metadata in the delta ends up in the version-zero CRC, keyed by domain name.
#[test]
fn test_into_crc_for_version_zero_with_domain_metadata() {
let dm = DomainMetadata::new("my.domain".to_string(), "config1".to_string());
let delta = CrcDelta {
protocol: Some(test_protocol()),
metadata: Some(Metadata::default()),
domain_metadata_changes: vec![dm],
..write_delta(0, 0)
};
let crc = delta.into_crc_for_version_zero().unwrap();
let map = crc.domain_metadata.as_ref().unwrap();
assert_eq!(map.len(), 1);
assert_eq!(map["my.domain"].configuration(), "config1");
}
/// The delta's in-commit timestamp is carried into the version-zero CRC.
#[test]
fn test_into_crc_for_version_zero_with_in_commit_timestamp() {
let delta = CrcDelta {
protocol: Some(test_protocol()),
metadata: Some(Metadata::default()),
in_commit_timestamp: Some(12345),
..write_delta(0, 0)
};
let crc = delta.into_crc_for_version_zero().unwrap();
assert_eq!(crc.in_commit_timestamp_opt, Some(12345));
}
/// With a tracked (`Some`) set-transaction map, a new app id is inserted.
#[test]
fn test_apply_adds_set_transaction_to_tracked_map() {
let mut crc = base_crc();
crc.set_transactions = Some(HashMap::new());
let txn = SetTransaction::new("my-app".to_string(), 1, Some(1000));
let delta = CrcDelta {
set_transaction_changes: vec![txn],
..write_delta(0, 0)
};
crc.apply(delta);
let map = crc.set_transactions.as_ref().unwrap();
assert_eq!(map.len(), 1);
assert_eq!(map["my-app"].version, 1);
assert_eq!(map["my-app"].last_updated, Some(1000));
}
/// With an untracked (`None`) set-transaction map, changes are ignored and the map stays `None`.
#[test]
fn test_apply_with_untracked_set_transactions_skips_changes() {
let mut crc = base_crc();
assert!(crc.set_transactions.is_none()); let txn = SetTransaction::new("my-app".to_string(), 1, Some(1000));
let delta = CrcDelta {
set_transaction_changes: vec![txn],
..write_delta(0, 0)
};
crc.apply(delta);
assert!(crc.set_transactions.is_none());
}
/// A change for an existing app id overwrites its version and timestamp.
#[test]
fn test_apply_upserts_set_transaction() {
let mut crc = base_crc();
crc.set_transactions = Some(HashMap::from([(
"my-app".to_string(),
SetTransaction::new("my-app".to_string(), 1, Some(1000)),
)]));
let txn = SetTransaction::new("my-app".to_string(), 2, Some(2000));
let delta = CrcDelta {
set_transaction_changes: vec![txn],
..write_delta(0, 0)
};
crc.apply(delta);
let map = crc.set_transactions.as_ref().unwrap();
assert_eq!(map.len(), 1);
assert_eq!(map["my-app"].version, 2);
assert_eq!(map["my-app"].last_updated, Some(2000));
}
/// Set transactions in the delta end up in the version-zero CRC, keyed by app id.
#[test]
fn test_into_crc_for_version_zero_with_set_transactions() {
let txn = SetTransaction::new("my-app".to_string(), 5, Some(3000));
let delta = CrcDelta {
protocol: Some(test_protocol()),
metadata: Some(Metadata::default()),
set_transaction_changes: vec![txn],
..write_delta(0, 0)
};
let crc = delta.into_crc_for_version_zero().unwrap();
let map = crc.set_transactions.as_ref().unwrap();
assert_eq!(map.len(), 1);
assert_eq!(map["my-app"].version, 5);
assert_eq!(map["my-app"].last_updated, Some(3000));
}
/// With no set transactions in the delta, the version-zero CRC still tracks an empty map.
#[test]
fn test_into_crc_for_version_zero_with_no_set_transactions() {
let delta = CrcDelta {
protocol: Some(test_protocol()),
metadata: Some(Metadata::default()),
..write_delta(0, 0)
};
let crc = delta.into_crc_for_version_zero().unwrap();
assert_eq!(crc.set_transactions, Some(HashMap::new()));
}
/// Default-boundary histogram with one `insert` per entry of `sizes`.
fn histogram_from_sizes(sizes: &[i64]) -> FileSizeHistogram {
let mut hist = FileSizeHistogram::create_default();
for &size in sizes {
hist.insert(size).unwrap();
}
hist
}
/// Base `Crc` whose file count, byte total and histogram are all derived from `file_sizes`.
fn base_crc_with_histogram(file_sizes: &[i64]) -> Crc {
let hist = histogram_from_sizes(file_sizes);
Crc {
table_size_bytes: file_sizes.iter().sum(),
num_files: file_sizes.len() as i64,
num_metadata: 1,
num_protocol: 1,
file_size_histogram: Some(hist),
..Default::default()
}
}
/// `"WRITE"` delta whose net counts and net histogram are built by inserting `add_sizes` and
/// then removing `remove_sizes` from an initially empty default-boundary histogram.
fn write_delta_with_histograms(add_sizes: &[i64], remove_sizes: &[i64]) -> CrcDelta {
let mut hist = FileSizeHistogram::create_default();
for &s in add_sizes {
hist.insert(s).unwrap();
}
for &s in remove_sizes {
hist.remove(s).unwrap();
}
let net_files = add_sizes.len() as i64 - remove_sizes.len() as i64;
let net_bytes: i64 = add_sizes.iter().sum::<i64>() - remove_sizes.iter().sum::<i64>();
CrcDelta {
file_stats: FileStatsDelta {
net_files,
net_bytes,
net_histogram: Some(hist),
},
operation: Some("WRITE".to_string()),
..Default::default()
}
}
/// Applying a histogram-bearing delta merges it into the base histogram.
///
/// Case arguments are: base file sizes, added sizes, removed sizes, and the expected bins as
/// `(bin index, file count, total bytes)` tuples. Only the listed bins are checked.
#[rstest]
#[case::single_bin(&[100, 200, 300], &[500], &[200], &[(0, 3, 900)])]
#[case::adds_only(&[100], &[200, 300], &[], &[(0, 3, 600)])]
#[case::removes_only(&[100, 200, 300], &[], &[100, 200], &[(0, 1, 300)])]
#[case::empty_delta(&[100, 10_000], &[], &[], &[(0, 1, 100), (1, 1, 10_000)])]
#[case::multi_bin(
&[100, 10_000, 20_000],
&[200, 10_500],
&[100, 20_000],
&[(0, 1, 200), (1, 2, 20_500), (2, 0, 0)]
)]
#[case::large_files(
&[100, 5_000_000],
&[10_000, 5_500_000],
&[100],
&[(0, 0, 0), (1, 1, 10_000), (10, 2, 10_500_000)]
)]
fn apply_merges_histogram(
#[case] base: &[i64],
#[case] add: &[i64],
#[case] remove: &[i64],
#[case] expected_bins: &[(usize, i64, i64)],
) {
let mut crc = base_crc_with_histogram(base);
let delta = write_delta_with_histograms(add, remove);
crc.apply(delta);
let hist = crc.file_size_histogram.as_ref().unwrap();
for &(bin, count, bytes) in expected_bins {
assert_eq!(hist.file_counts[bin], count, "file_counts[{bin}]");
assert_eq!(hist.total_bytes[bin], bytes, "total_bytes[{bin}]");
}
}
/// A safe delta without a histogram leaves the CRC without one, whether or not the base
/// had a histogram to begin with.
#[rstest]
#[case::base_none_delta_none(None)]
#[case::base_some_delta_none(Some(vec![100i64, 200]))]
fn apply_drops_histogram_when_delta_missing_histogram(#[case] base_files: Option<Vec<i64>>) {
let mut crc = match &base_files {
Some(sizes) => base_crc_with_histogram(sizes),
None => base_crc(),
};
let delta = CrcDelta {
file_stats: FileStatsDelta {
net_files: 1,
net_bytes: 100,
net_histogram: None,
},
operation: Some("WRITE".to_string()),
..Default::default()
};
crc.apply(delta);
assert!(
crc.file_size_histogram.is_none(),
"histogram should be None when delta doesn't provide a histogram"
);
}
/// Moving to `Indeterminate` also drops the base histogram.
#[test]
fn apply_drops_histogram_on_indeterminate() {
let mut crc = base_crc_with_histogram(&[100, 200]);
let unsafe_delta = CrcDelta {
operation: Some("ANALYZE STATS".to_string()),
..write_delta(1, 100)
};
crc.apply(unsafe_delta);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Indeterminate);
assert!(crc.file_size_histogram.is_none());
}
/// Moving to `Untrackable` also drops the base histogram.
#[test]
fn apply_drops_histogram_on_untrackable() {
let mut crc = base_crc_with_histogram(&[100, 200]);
let delta = CrcDelta {
has_missing_file_size: true,
..write_delta(1, 100)
};
crc.apply(delta);
assert_eq!(crc.file_stats_validity, FileStatsValidity::Untrackable);
assert!(crc.file_size_histogram.is_none());
}
/// A non-negative delta histogram becomes the version-zero CRC's histogram.
#[test]
fn into_crc_for_version_zero_includes_histogram() {
let delta_hist = histogram_from_sizes(&[500, 1000]);
let delta = CrcDelta {
protocol: Some(test_protocol()),
metadata: Some(Metadata::default()),
file_stats: FileStatsDelta {
net_files: 2,
net_bytes: 1500,
net_histogram: Some(delta_hist),
},
operation: Some("WRITE".to_string()),
..Default::default()
};
let crc = delta.into_crc_for_version_zero().unwrap();
let hist = crc.file_size_histogram.as_ref().unwrap();
assert_eq!(hist.file_counts[0], 2);
assert_eq!(hist.total_bytes[0], 1500);
}
/// A delta without a histogram yields a version-zero CRC without one.
#[test]
fn into_crc_for_version_zero_without_histogram() {
let delta = CrcDelta {
protocol: Some(test_protocol()),
metadata: Some(Metadata::default()),
..write_delta(0, 0)
};
let crc = delta.into_crc_for_version_zero().unwrap();
assert!(crc.file_size_histogram.is_none());
}
/// Merging also works with custom bin boundaries `[0, 200, 1000]`, and the merged histogram
/// keeps those boundaries.
///
/// Base bins hold counts `[2, 1, 0]` and bytes `[300, 500, 0]`. The delta inserts 100 and
/// 1500 and removes 150, so the expected result is counts `[2, 1, 1]` and bytes
/// `[250, 500, 1500]`, with 3 + 1 files and 800 + 1450 bytes overall.
#[test]
fn apply_merges_histogram_with_non_default_boundaries() {
let boundaries = vec![0, 200, 1000];
let base_hist = FileSizeHistogram::try_new(
boundaries.clone(),
vec![2, 1, 0], vec![300, 500, 0],
)
.unwrap();
let mut crc = Crc {
table_size_bytes: 800,
num_files: 3,
num_metadata: 1,
num_protocol: 1,
file_size_histogram: Some(base_hist),
..Default::default()
};
let mut delta_hist = FileSizeHistogram::create_empty_with_boundaries(boundaries).unwrap();
delta_hist.insert(100).unwrap(); delta_hist.insert(1500).unwrap(); delta_hist.remove(150).unwrap();
let delta = CrcDelta {
file_stats: FileStatsDelta {
net_files: 1, net_bytes: 1450, net_histogram: Some(delta_hist),
},
operation: Some("WRITE".to_string()),
..Default::default()
};
crc.apply(delta);
let hist = crc.file_size_histogram.as_ref().unwrap();
assert_eq!(hist.sorted_bin_boundaries, vec![0, 200, 1000]);
assert_eq!(hist.file_counts, vec![2, 1, 1]); assert_eq!(hist.total_bytes, vec![250, 500, 1500]); assert_eq!(crc.num_files, 4);
assert_eq!(crc.table_size_bytes, 2250);
}
}