use std::sync::LazyLock;
use super::FileSizeHistogram;
use crate::engine_data::{FilteredEngineData, GetData, TypedGetData as _};
use crate::schema::{ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error, RowVisitor};
/// File-level statistics for a table: how many files there are, their combined size, and
/// (optionally) how those sizes are distributed across histogram bins.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FileStats {
    /// Total number of files.
    pub num_files: i64,
    /// Combined size of all files, in bytes.
    pub table_size_bytes: i64,
    /// Distribution of file sizes, when one is available.
    pub file_size_histogram: Option<FileSizeHistogram>,
}
/// Signed change in file statistics contributed by one transaction's added and removed files
/// (compare [`FileStats`]). Produced by [`FileStatsDelta::try_compute_for_txn`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct FileStatsDelta {
    /// Number of files added minus number of files removed.
    pub(crate) net_files: i64,
    /// Bytes added minus bytes removed.
    pub(crate) net_bytes: i64,
    /// Per-bin signed histogram delta; individual bins may be negative.
    pub(crate) net_histogram: Option<FileSizeHistogram>,
}
impl FileStatsDelta {
const INCREMENTAL_SAFE_OPS: &[&str] = &[
"WRITE",
"MERGE",
"UPDATE",
"DELETE",
"OPTIMIZE",
"CREATE TABLE",
"REPLACE TABLE",
"CREATE TABLE AS SELECT",
"REPLACE TABLE AS SELECT",
"CREATE OR REPLACE TABLE AS SELECT",
];
pub(crate) fn is_incremental_safe(operation: &str) -> bool {
Self::INCREMENTAL_SAFE_OPS.contains(&operation)
}
pub(crate) fn try_compute_for_txn(
add_files_metadata: &[Box<dyn EngineData>],
remove_files_metadata: &[FilteredEngineData],
bin_boundaries: Option<&[i64]>,
) -> DeltaResult<Self> {
let mut histogram = match bin_boundaries {
Some(b) => FileSizeHistogram::create_empty_with_boundaries(b.to_vec())?,
None => FileSizeHistogram::create_default(),
};
let mut net_files = 0i64;
let mut net_bytes = 0i64;
for batch in add_files_metadata {
let mut visitor = FileStatsVisitor::new(None, false, &mut histogram);
visitor.visit_rows_of(batch.as_ref())?;
net_files += visitor.count;
net_bytes += visitor.total_size;
}
for filtered_batch in remove_files_metadata {
let sv = filtered_batch.selection_vector();
let sv_opt = if sv.is_empty() { None } else { Some(sv) };
let mut visitor = FileStatsVisitor::new(sv_opt, true, &mut histogram);
visitor.visit_rows_of(filtered_batch.data())?;
net_files += visitor.count;
net_bytes += visitor.total_size;
}
Ok(FileStatsDelta {
net_files,
net_bytes,
net_histogram: Some(histogram),
})
}
}
/// Row visitor that reads the `size` column of add or remove file rows and accumulates a signed
/// file count, a signed byte total and in-place histogram updates.
struct FileStatsVisitor<'sv, 'h> {
    /// Optional row filter: rows whose entry is `false` are skipped; rows past its end count as
    /// selected.
    selection_vector: Option<&'sv [bool]>,
    /// When `true`, selected rows are subtracted instead of added.
    is_remove: bool,
    /// Histogram updated in place for every selected row.
    histogram: &'h mut FileSizeHistogram,
    /// Rows already visited, so the selection vector stays aligned across `visit` calls.
    offset: usize,
    /// Signed number of selected files seen so far.
    count: i64,
    /// Signed sum of `size` over selected rows seen so far.
    total_size: i64,
}
impl<'sv, 'h> FileStatsVisitor<'sv, 'h> {
    /// Creates a visitor with all counters at zero that will update `histogram` in place.
    ///
    /// A `selection_vector` of `None` selects every row. `is_remove` chooses whether selected
    /// rows are subtracted (`true`) or added (`false`).
    fn new(
        selection_vector: Option<&'sv [bool]>,
        is_remove: bool,
        histogram: &'h mut FileSizeHistogram,
    ) -> Self {
        let (offset, count, total_size) = (0, 0, 0);
        Self {
            selection_vector,
            is_remove,
            histogram,
            offset,
            count,
            total_size,
        }
    }
}
impl RowVisitor for FileStatsVisitor<'_, '_> {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| (vec![ColumnName::new(["size"])], vec![DataType::LONG]).into());
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 1,
Error::InternalError(format!(
"Wrong number of FileStatsVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
let selected = match self.selection_vector {
Some(sv) => sv.get(self.offset + i).copied().unwrap_or(true),
None => true,
};
if selected {
let size: i64 = getters[0].get(i, "size")?;
if self.is_remove {
self.count -= 1;
self.total_size -= size;
self.histogram.remove(size)?;
} else {
self.count += 1;
self.total_size += size;
self.histogram.insert(size)?;
}
}
}
self.offset += row_count;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::arrow_data::ArrowEngineData;
    use rstest::rstest;
    use test_utils::{generate_batch, IntoArray};

    /// Builds a single-column (`size`) engine batch from the given file sizes.
    fn size_batch(sizes: Vec<i64>) -> Box<dyn EngineData> {
        let batch = generate_batch(vec![("size", sizes.into_array())]).unwrap();
        Box::new(ArrowEngineData::new(batch))
    }

    /// Inputs and expected net totals for `test_try_compute`.
    struct TryComputeCase {
        add_batches: Vec<Vec<i64>>,
        remove_batches: Vec<Vec<i64>>,
        expected_net_files: i64,
        expected_net_bytes: i64,
    }

    #[rstest]
    #[case::empty(TryComputeCase {
        add_batches: vec![],
        remove_batches: vec![],
        expected_net_files: 0,
        expected_net_bytes: 0,
    })]
    #[case::adds_only(TryComputeCase {
        add_batches: vec![vec![100, 200, 300]],
        remove_batches: vec![],
        expected_net_files: 3,
        expected_net_bytes: 600, // 600 = 100 + 200 + 300
    })]
    #[case::multiple_add_batches(TryComputeCase {
        add_batches: vec![vec![100, 200], vec![300, 400, 500]],
        remove_batches: vec![],
        expected_net_files: 5,
        expected_net_bytes: 1500, // 1500 = 100 + 200 + 300 + 400 + 500
    })]
    #[case::removes_only(TryComputeCase {
        add_batches: vec![],
        remove_batches: vec![vec![500, 700]],
        expected_net_files: -2,
        expected_net_bytes: -1200, // -1200 = -(500 + 700)
    })]
    #[case::adds_and_removes(TryComputeCase {
        add_batches: vec![vec![100, 200], vec![300, 400]],
        remove_batches: vec![vec![500], vec![600, 700]],
        expected_net_files: 1,
        expected_net_bytes: -800, // -800 = (100 + 200 + 300 + 400) - (500 + 600 + 700)
    })]
    fn test_try_compute(#[case] case: TryComputeCase) {
        let adds: Vec<_> = case.add_batches.into_iter().map(size_batch).collect();
        let removes: Vec<_> = case
            .remove_batches
            .into_iter()
            .map(|sizes| FilteredEngineData::with_all_rows_selected(size_batch(sizes)))
            .collect();
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        assert_eq!(stats.net_files, case.expected_net_files);
        assert_eq!(stats.net_bytes, case.expected_net_bytes);
    }

    #[test]
    fn test_with_selection_vectors() {
        let adds = vec![size_batch(vec![100, 200]), size_batch(vec![300])];
        let removes = vec![
            FilteredEngineData::with_all_rows_selected(size_batch(vec![400, 500])),
            FilteredEngineData::try_new(size_batch(vec![600, 700, 800]), vec![false, true, true])
                .unwrap(),
        ];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        // 3 adds - (2 + 2 selected) removes = -1
        assert_eq!(stats.net_files, -1);
        // (100 + 200 + 300) - (400 + 500 + 700 + 800) = -1800
        assert_eq!(stats.net_bytes, -1800);
    }

    #[test]
    fn try_compute_builds_delta_histogram_from_add_and_remove_sizes() {
        let adds = vec![size_batch(vec![100, 200, 300])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![500, 700],
        ))];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        let delta = stats.net_histogram.unwrap();
        // All sizes land in the first default bin: 3 - 2 files, 600 - 1200 bytes.
        assert_eq!(delta.file_counts[0], 1);
        assert_eq!(delta.total_bytes[0], -600);
    }

    #[test]
    fn try_compute_empty_batches_produce_zero_histogram() {
        let stats = FileStatsDelta::try_compute_for_txn(&[], &[], None).unwrap();
        let delta = stats.net_histogram.unwrap();
        assert!(delta.file_counts.iter().all(|&c| c == 0));
        assert!(delta.total_bytes.iter().all(|&b| b == 0));
    }

    #[test]
    fn try_compute_histogram_with_selection_vectors() {
        let adds = vec![size_batch(vec![100, 200])];
        let removes = vec![FilteredEngineData::try_new(
            size_batch(vec![300, 400, 500]),
            vec![true, false, true],
        )
        .unwrap()];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        let delta = stats.net_histogram.unwrap();
        // 2 adds - 2 selected removes = 0 files; (100 + 200) - (300 + 500) = -500 bytes.
        assert_eq!(delta.file_counts[0], 0);
        assert_eq!(delta.total_bytes[0], -500);
    }

    #[test]
    fn try_compute_with_custom_boundaries_uses_them() {
        let boundaries: &[i64] = &[0, 200, 1000];
        let adds = vec![size_batch(vec![50, 300, 1500])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![100, 500],
        ))];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, Some(boundaries)).unwrap();
        let delta = stats.net_histogram.unwrap();
        assert_eq!(delta.sorted_bin_boundaries, vec![0, 200, 1000]);
        // bin0: +50 -100, bin1: +300 -500, bin2: +1500
        assert_eq!(delta.file_counts, vec![0, 0, 1]);
        assert_eq!(delta.total_bytes, vec![-50, -200, 1500]);
    }

    #[test]
    fn try_compute_with_custom_boundaries_produces_mergeable_histogram() {
        let boundaries = vec![0, 200, 1000];
        let mut base = FileSizeHistogram::create_empty_with_boundaries(boundaries.clone()).unwrap();
        base.insert(150).unwrap();
        base.insert(500).unwrap();
        let adds = vec![size_batch(vec![100, 300])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![150],
        ))];
        let stats =
            FileStatsDelta::try_compute_for_txn(&adds, &removes, Some(&boundaries)).unwrap();
        let delta = stats.net_histogram.unwrap();
        let merged = base.try_apply_delta(&delta).unwrap();
        // bin0: 150 + 100 - 150 = 100 (1 file); bin1: 500 + 300 = 800 (2 files).
        assert_eq!(merged.file_counts, vec![1, 2, 0]);
        assert_eq!(merged.total_bytes, vec![100, 800, 0]);
    }
}