Skip to main content

delta_kernel/crc/
file_stats.rs

//! File statistics and deltas for CRC tracking.
//!
//! [`FileStats`] represents absolute file-level statistics (count, size, histogram) for a table
//! version. [`FileStatsDelta`] captures the changes made by a single commit: gross counts and
//! byte totals of added and removed files, plus an optional net [`FileSizeHistogram`] (adds
//! minus removes).
//!
//! A [`FileStatsDelta`] can be produced from either:
//! 1. In-memory transaction data via [`FileStatsDelta::try_compute_for_txn`]
//! 2. A parsed .json commit file
11
12use std::sync::LazyLock;
13
14use super::FileSizeHistogram;
15use crate::engine_data::{FilteredEngineData, GetData, TypedGetData as _};
16use crate::schema::{ColumnName, ColumnNamesAndTypes, DataType};
17use crate::utils::require;
18use crate::{DeltaResult, EngineData, Error, RowVisitor};
19
20/// File-level statistics for a table version: total file count, size, and histogram.
21///
22/// Obtained via [`Snapshot::get_file_stats_if_present`] or [`Crc::file_stats()`]. Returns
23/// `None` when the source CRC's `file_stats_state` is not `Complete`.
24///
25/// [`Snapshot::get_file_stats_if_present`]: crate::snapshot::Snapshot::get_file_stats_if_present
26/// [`Crc::file_stats()`]: super::Crc::file_stats
27#[derive(Debug, Clone, Default, PartialEq, Eq)]
28pub struct FileStats {
29    /// Number of active [`Add`](crate::actions::Add) file actions in this table version.
30    pub(crate) num_files: i64,
31    /// Total size of the table in bytes (sum of all active
32    /// [`Add`](crate::actions::Add) file sizes).
33    pub(crate) table_size_bytes: i64,
34    /// Size distribution of active files, if available.
35    pub(crate) file_size_histogram: Option<FileSizeHistogram>,
36}
37
38impl FileStats {
39    /// Returns the number of active [`Add`](crate::actions::Add) file actions in this table
40    /// version.
41    pub fn num_files(&self) -> i64 {
42        self.num_files
43    }
44
45    /// Returns the total size of the table in bytes (sum of all active
46    /// [`Add`](crate::actions::Add) file sizes).
47    pub fn table_size_bytes(&self) -> i64 {
48        self.table_size_bytes
49    }
50
51    /// Returns the size distribution of active files, if available.
52    pub fn file_size_histogram(&self) -> Option<&FileSizeHistogram> {
53        self.file_size_histogram.as_ref()
54    }
55}
56
57/// Gross file-change totals from a single commit, plus an optional net file-size histogram.
58#[derive(Debug, Clone, Default, PartialEq, Eq)]
59pub(crate) struct FileStatsDelta {
60    pub(crate) gross_add_files: u64,
61    pub(crate) gross_remove_files: u64,
62    pub(crate) gross_add_bytes: u64,
63    pub(crate) gross_remove_bytes: u64,
64    /// Net change in file size histogram (adds minus removes per bin). May contain negative
65    /// values in bins where more files were removed than added. `None` when the delta source
66    /// does not provide histogram data.
67    pub(crate) net_histogram: Option<FileSizeHistogram>,
68}
69
/// Operation names whose add/remove actions can be folded into file stats incrementally.
///
/// Lookups against this list are exact, case-sensitive string comparisons; any name not listed
/// here is treated as unsafe.
///
/// NOTE(review): `"CREATE OR REPLACE TABLE"` (without `AS SELECT`) is not listed even though
/// `"CREATE OR REPLACE TABLE AS SELECT"` is, so it is currently treated as unsafe — confirm
/// whether that omission is intentional.
const INCREMENTAL_SAFE_OPS: &[&str] = &[
    // Data-modifying and compaction operations.
    "WRITE",
    "MERGE",
    "UPDATE",
    "DELETE",
    "OPTIMIZE",
    // Table creation and replacement operations.
    "CREATE TABLE",
    "REPLACE TABLE",
    "CREATE TABLE AS SELECT",
    "REPLACE TABLE AS SELECT",
    "CREATE OR REPLACE TABLE AS SELECT",
];
82
83/// Returns `true` if the given operation can be safely tracked by incremental file stats.
84///
85/// Incremental-safe operations produce add/remove actions whose net counts give correct file
86/// stats. Unknown or missing operations are treated as unsafe. For example, ANALYZE STATS
87/// re-adds existing files with updated statistics; naively counting those adds would
88/// double-count file stats.
89pub(crate) fn is_incremental_safe_operation(operation: &str) -> bool {
90    INCREMENTAL_SAFE_OPS.contains(&operation)
91}
92
93impl FileStatsDelta {
94    /// Net change in file count (added minus removed).
95    pub(crate) fn net_files(&self) -> i64 {
96        self.gross_add_files as i64 - self.gross_remove_files as i64
97    }
98
99    /// Net change in total bytes (added minus removed).
100    pub(crate) fn net_bytes(&self) -> i64 {
101        self.gross_add_bytes as i64 - self.gross_remove_bytes as i64
102    }
103
104    /// Compute file stats and a delta histogram from a transaction's staged add and remove
105    /// metadata.
106    ///
107    /// A commit writes three kinds of file actions:
108    ///   (1) Add actions (from `add_files_metadata`)
109    ///   (2) Remove actions (from `remove_files_metadata`)
110    ///   (3) DV update actions (which contain both a Remove and an Add for the same file at
111    ///       the same size).
112    ///
113    /// Only the first two need visiting -- DV updates have a net-zero effect on file counts,
114    /// sizes, and histograms.
115    ///
116    /// `bin_boundaries` specifies the histogram bin boundaries to use. When `Some`, the
117    /// delta histogram is built with those boundaries (matching the previous CRC's histogram).
118    /// When `None`, the standard default boundaries are used. Callers should pass the previous
119    /// CRC's boundaries when available so that `try_apply_delta` in [`Crc::apply`] succeeds.
120    pub(crate) fn try_compute_for_txn(
121        add_files_metadata: &[Box<dyn EngineData>],
122        remove_files_metadata: &[FilteredEngineData],
123        bin_boundaries: Option<&[i64]>,
124    ) -> DeltaResult<Self> {
125        let mut histogram = match bin_boundaries {
126            Some(b) => FileSizeHistogram::create_empty_with_boundaries(b.to_vec())?,
127            None => FileSizeHistogram::create_default(),
128        };
129        let mut gross_add_files = 0u64;
130        let mut gross_remove_files = 0u64;
131        let mut gross_add_bytes = 0u64;
132        let mut gross_remove_bytes = 0u64;
133
134        // Visit add files (insert into histogram). Every row is a file being added.
135        for batch in add_files_metadata {
136            let mut visitor = FileStatsVisitor::new(None, false, &mut histogram);
137            visitor.visit_rows_of(batch.as_ref())?;
138            gross_add_files += visitor.count;
139            gross_add_bytes += visitor.total_size;
140        }
141
142        // Visit remove files (remove from histogram). Each FilteredEngineData has its own
143        // selection vector, so we create a visitor per batch.
144        for filtered_batch in remove_files_metadata {
145            let sv = filtered_batch.selection_vector();
146            let sv_opt = if sv.is_empty() { None } else { Some(sv) };
147            let mut visitor = FileStatsVisitor::new(sv_opt, true, &mut histogram);
148            visitor.visit_rows_of(filtered_batch.data())?;
149            gross_remove_files += visitor.count;
150            gross_remove_bytes += visitor.total_size;
151        }
152
153        Ok(FileStatsDelta {
154            gross_add_files,
155            gross_remove_files,
156            gross_add_bytes,
157            gross_remove_bytes,
158            net_histogram: Some(histogram),
159        })
160    }
161}
162
163/// Read a file `size` (a non-negative byte count stored as `i64`) as `u64`, erroring on a
164/// negative size (corrupt input).
165pub(crate) fn size_to_u64(size: i64) -> DeltaResult<u64> {
166    u64::try_from(size)
167        .map_err(|_| Error::internal_error(format!("File size must be non-negative, got {size}")))
168}
169
170/// Visitor that extracts the `size` column from file metadata and updates a shared histogram.
171///
172/// `is_remove` selects whether each visited row is inserted into (add) or removed from (remove)
173/// the shared delta histogram. `count`/`total_size` always accumulate gross magnitude; the
174/// add/remove direction is the caller's to track.
175///
176/// Accepts an optional selection vector to filter which rows are visited. AddFiles pass `None`
177/// (count every row); RemoveFiles may pass `Some(sv)` from [`FilteredEngineData`] to skip rows
178/// that are not actually being removed.
179struct FileStatsVisitor<'sv, 'h> {
180    /// Optional selection vector. When `Some`, only rows marked `true` are counted. Rows beyond
181    /// the SV length are implicitly selected.
182    selection_vector: Option<&'sv [bool]>,
183    /// Offset into the selection vector, tracking position across multiple visit calls.
184    offset: usize,
185    /// Whether visited rows are removed from (vs added to) the histogram.
186    is_remove: bool,
187    /// Count of files visited.
188    count: u64,
189    /// Total bytes of files visited.
190    total_size: u64,
191    /// Shared histogram that all visitors (add and remove) write to.
192    histogram: &'h mut FileSizeHistogram,
193}
194
195impl<'sv, 'h> FileStatsVisitor<'sv, 'h> {
196    fn new(
197        selection_vector: Option<&'sv [bool]>,
198        is_remove: bool,
199        histogram: &'h mut FileSizeHistogram,
200    ) -> Self {
201        Self {
202            selection_vector,
203            offset: 0,
204            is_remove,
205            count: 0,
206            total_size: 0,
207            histogram,
208        }
209    }
210}
211
212impl RowVisitor for FileStatsVisitor<'_, '_> {
213    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
214        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
215            LazyLock::new(|| (vec![ColumnName::new(["size"])], vec![DataType::LONG]).into());
216        NAMES_AND_TYPES.as_ref()
217    }
218
219    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
220        require!(
221            getters.len() == 1,
222            Error::InternalError(format!(
223                "Wrong number of FileStatsVisitor getters: {}",
224                getters.len()
225            ))
226        );
227        for i in 0..row_count {
228            let selected = match self.selection_vector {
229                Some(sv) => sv.get(self.offset + i).copied().unwrap_or(true),
230                None => true,
231            };
232            if selected {
233                let size: i64 = getters[0].get(i, "size")?;
234                self.count += 1;
235                self.total_size += size_to_u64(size)?;
236                if self.is_remove {
237                    self.histogram.remove(size)?;
238                } else {
239                    self.histogram.insert(size)?;
240                }
241            }
242        }
243        self.offset += row_count;
244        Ok(())
245    }
246}
247
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use test_utils::{generate_batch, IntoArray};

    use super::*;
    use crate::engine::arrow_data::ArrowEngineData;

    /// Builds a single-column (`size`) engine batch from the given sizes.
    fn size_batch(sizes: Vec<i64>) -> Box<dyn EngineData> {
        let batch = generate_batch(vec![("size", sizes.into_array())]).unwrap();
        Box::new(ArrowEngineData::new(batch))
    }

    struct TryComputeCase {
        add_batches: Vec<Vec<i64>>,
        remove_batches: Vec<Vec<i64>>,
        expected_net_files: i64,
        expected_net_bytes: i64,
    }

    #[rstest]
    #[case::empty(TryComputeCase {
        add_batches: vec![],
        remove_batches: vec![],
        expected_net_files: 0,
        expected_net_bytes: 0,
    })]
    #[case::adds_only(TryComputeCase {
        add_batches: vec![vec![100, 200, 300]],
        remove_batches: vec![],
        expected_net_files: 3,
        expected_net_bytes: 600, // 600 = 100 + 200 + 300
    })]
    #[case::multiple_add_batches(TryComputeCase {
        add_batches: vec![vec![100, 200], vec![300, 400, 500]],
        remove_batches: vec![],
        expected_net_files: 5,
        expected_net_bytes: 1500, // 1500 = 100 + 200 + 300 + 400 + 500
    })]
    #[case::removes_only(TryComputeCase {
        add_batches: vec![],
        remove_batches: vec![vec![500, 700]],
        expected_net_files: -2,
        expected_net_bytes: -1200, // -1200 = -(500 + 700)
    })]
    #[case::adds_and_removes(TryComputeCase {
        add_batches: vec![vec![100, 200], vec![300, 400]],
        remove_batches: vec![vec![500], vec![600, 700]],
        expected_net_files: 1,
        expected_net_bytes: -800, // -800 = (100 + 200 + 300 + 400) - (500 + 600 + 700)
    })]
    fn test_try_compute(#[case] case: TryComputeCase) {
        let adds: Vec<_> = case.add_batches.into_iter().map(size_batch).collect();
        let removes: Vec<_> = case
            .remove_batches
            .into_iter()
            .map(|sizes| FilteredEngineData::with_all_rows_selected(size_batch(sizes)))
            .collect();
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        assert_eq!(stats.net_files(), case.expected_net_files);
        assert_eq!(stats.net_bytes(), case.expected_net_bytes);
    }

    #[test]
    fn test_with_selection_vectors() {
        // Multiple add batches + multiple remove batches with mixed SV scenarios
        let adds = vec![size_batch(vec![100, 200]), size_batch(vec![300])];
        let removes = vec![
            // First remove batch: all rows selected (no SV)
            FilteredEngineData::with_all_rows_selected(size_batch(vec![400, 500])),
            // Second remove batch: partial selection (600 skipped)
            FilteredEngineData::try_new(size_batch(vec![600, 700, 800]), vec![false, true, true])
                .unwrap(),
        ];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        // adds: 3 files, 600 bytes (100 + 200 + 300)
        // removes: 4 files, 2400 bytes (400 + 500 + 700 + 800)
        assert_eq!(stats.net_files(), -1); // 3 - 4
        assert_eq!(stats.net_bytes(), -1800); // 600 - 2400
        assert_eq!(stats.gross_add_files, 3);
        assert_eq!(stats.gross_remove_files, 4);
        assert_eq!(stats.gross_add_bytes, 600);
        assert_eq!(stats.gross_remove_bytes, 2400);
    }

    #[test]
    fn try_compute_builds_delta_histogram_from_add_and_remove_sizes() {
        let adds = vec![size_batch(vec![100, 200, 300])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![500, 700],
        ))];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();

        // All sizes < 8KB so they all land in bin 0. Net: 3 adds - 2 removes = 1 file,
        // 600 - 1200 = -600 bytes.
        let delta = stats.net_histogram.unwrap();
        assert_eq!(delta.file_counts[0], 1);
        assert_eq!(delta.total_bytes[0], -600);
    }

    #[test]
    fn try_compute_empty_batches_produce_zero_histogram() {
        let stats = FileStatsDelta::try_compute_for_txn(&[], &[], None).unwrap();
        let delta = stats.net_histogram.unwrap();
        assert!(delta.file_counts.iter().all(|&c| c == 0));
        assert!(delta.total_bytes.iter().all(|&b| b == 0));
    }

    #[test]
    fn try_compute_histogram_with_selection_vectors() {
        let adds = vec![size_batch(vec![100, 200])];
        let removes = vec![FilteredEngineData::try_new(
            size_batch(vec![300, 400, 500]),
            vec![true, false, true], // 300 selected, 400 skipped, 500 selected
        )
        .unwrap()];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();

        // Net bin 0: 2 adds - 2 removes = 0 files, 300 - 800 = -500 bytes
        let delta = stats.net_histogram.unwrap();
        assert_eq!(delta.file_counts[0], 0);
        assert_eq!(delta.total_bytes[0], -500);
    }

    #[test]
    fn try_compute_with_custom_boundaries_uses_them() {
        // Custom 3-bin histogram: [0, 200) [200, 1000) [1000, inf)
        let boundaries: &[i64] = &[0, 200, 1000];
        let adds = vec![size_batch(vec![50, 300, 1500])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![100, 500],
        ))];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, Some(boundaries)).unwrap();

        let delta = stats.net_histogram.unwrap();
        assert_eq!(delta.sorted_bin_boundaries, vec![0, 200, 1000]);
        // Net per bin: (1-1, 1-1, 1-0) = (0, 0, 1)
        assert_eq!(delta.file_counts, vec![0, 0, 1]);
        // Net per bin: (50-100, 300-500, 1500-0) = (-50, -200, 1500)
        assert_eq!(delta.total_bytes, vec![-50, -200, 1500]);
    }

    #[test]
    fn try_compute_with_custom_boundaries_produces_mergeable_histogram() {
        // Build a base histogram with custom boundaries, then verify delta merges correctly.
        let boundaries = vec![0, 200, 1000];
        let mut base = FileSizeHistogram::create_empty_with_boundaries(boundaries.clone()).unwrap();
        base.insert(150).unwrap(); // bin 0
        base.insert(500).unwrap(); // bin 1

        let adds = vec![size_batch(vec![100, 300])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![150],
        ))];
        let stats =
            FileStatsDelta::try_compute_for_txn(&adds, &removes, Some(&boundaries)).unwrap();

        let delta = stats.net_histogram.unwrap();
        let merged = base.try_apply_delta(&delta).unwrap();
        assert_eq!(merged.file_counts, vec![1, 2, 0]); // (1+1-1), (1+1-0), (0+0-0)
        assert_eq!(merged.total_bytes, vec![100, 800, 0]); // (150+100-150), (500+300-0)
    }

    #[test]
    fn try_compute_rejects_negative_add_size() {
        let adds = vec![size_batch(vec![100, -1])];
        assert!(FileStatsDelta::try_compute_for_txn(&adds, &[], None).is_err());
    }

    #[test]
    fn try_compute_rejects_negative_remove_size() {
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![-5],
        ))];
        assert!(FileStatsDelta::try_compute_for_txn(&[], &removes, None).is_err());
    }

    #[test]
    fn try_compute_ignores_deselected_rows_entirely() {
        // The negative size is deselected, so it must never be read or validated.
        let removes = vec![
            FilteredEngineData::try_new(size_batch(vec![-1, 100]), vec![false, true]).unwrap(),
        ];
        let stats = FileStatsDelta::try_compute_for_txn(&[], &removes, None).unwrap();
        assert_eq!(stats.gross_remove_files, 1);
        assert_eq!(stats.gross_remove_bytes, 100);
        assert_eq!(stats.net_files(), -1);
        assert_eq!(stats.net_bytes(), -100);
    }

    #[test]
    fn size_to_u64_accepts_non_negative_and_rejects_negative() {
        assert_eq!(size_to_u64(0).unwrap(), 0);
        assert_eq!(size_to_u64(i64::MAX).unwrap(), i64::MAX as u64);
        assert!(size_to_u64(-1).is_err());
    }

    #[test]
    fn incremental_safe_operation_matching_is_exact() {
        assert!(is_incremental_safe_operation("WRITE"));
        assert!(is_incremental_safe_operation("CREATE OR REPLACE TABLE AS SELECT"));
        assert!(!is_incremental_safe_operation("ANALYZE STATS"));
        assert!(!is_incremental_safe_operation("write"));
        assert!(!is_incremental_safe_operation(""));
    }
}