1use std::sync::LazyLock;
13
14use super::FileSizeHistogram;
15use crate::engine_data::{FilteredEngineData, GetData, TypedGetData as _};
16use crate::schema::{ColumnName, ColumnNamesAndTypes, DataType};
17use crate::utils::require;
18use crate::{DeltaResult, EngineData, Error, RowVisitor};
19
/// Aggregate file statistics, read through the public getters
/// [`FileStats::num_files`], [`FileStats::table_size_bytes`] and
/// [`FileStats::file_size_histogram`].
///
/// All fields are crate-private; outside the crate the struct is read-only.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FileStats {
    /// Number of files covered by these stats.
    /// NOTE(review): presumably the count of active (live) files in the table — confirm.
    pub(crate) num_files: i64,
    /// Total size in bytes.
    /// NOTE(review): presumably summed over the same files as `num_files` — confirm.
    pub(crate) table_size_bytes: i64,
    /// Optional histogram of file sizes; `None` when no histogram is stored.
    pub(crate) file_size_histogram: Option<FileSizeHistogram>,
}
37
38impl FileStats {
39 pub fn num_files(&self) -> i64 {
42 self.num_files
43 }
44
45 pub fn table_size_bytes(&self) -> i64 {
48 self.table_size_bytes
49 }
50
51 pub fn file_size_histogram(&self) -> Option<&FileSizeHistogram> {
53 self.file_size_histogram.as_ref()
54 }
55}
56
/// Change in file statistics produced by a single transaction.
///
/// Built by [`FileStatsDelta::try_compute_for_txn`]. The `gross_*` fields total the add rows
/// and the selected remove rows separately. `net_histogram` combines both: add sizes are
/// inserted and remove sizes are removed, so per-bin counts and byte totals may be negative.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct FileStatsDelta {
    /// Number of add rows visited.
    pub(crate) gross_add_files: u64,
    /// Number of selected remove rows visited.
    pub(crate) gross_remove_files: u64,
    /// Sum of the `size` column over the visited add rows.
    pub(crate) gross_add_bytes: u64,
    /// Sum of the `size` column over the selected remove rows.
    pub(crate) gross_remove_bytes: u64,
    /// Histogram with add sizes inserted and remove sizes removed.
    /// Always `Some` when produced by `try_compute_for_txn`; `None` for `Default`.
    pub(crate) net_histogram: Option<FileSizeHistogram>,
}
69
/// Operation names that [`is_incremental_safe_operation`] accepts. Matching is exact and
/// case-sensitive.
///
/// NOTE(review): `"CREATE OR REPLACE TABLE"` (without `AS SELECT`) is not listed, while
/// `"REPLACE TABLE"` and `"CREATE OR REPLACE TABLE AS SELECT"` are — confirm the omission is
/// intentional.
const INCREMENTAL_SAFE_OPS: &[&str] = &[
    "WRITE",
    "MERGE",
    "UPDATE",
    "DELETE",
    "OPTIMIZE",
    "CREATE TABLE",
    "REPLACE TABLE",
    "CREATE TABLE AS SELECT",
    "REPLACE TABLE AS SELECT",
    "CREATE OR REPLACE TABLE AS SELECT",
];

/// Returns `true` when `operation` is exactly one of the names in `INCREMENTAL_SAFE_OPS`.
///
/// The comparison is a plain string equality, so case and surrounding whitespace matter
/// (`"write"` and `"WRITE "` are both rejected).
/// NOTE(review): presumably `operation` is the operation name recorded in a commit — confirm
/// with callers.
pub(crate) fn is_incremental_safe_operation(operation: &str) -> bool {
    INCREMENTAL_SAFE_OPS
        .iter()
        .any(|&safe_op| safe_op == operation)
}
92
93impl FileStatsDelta {
94 pub(crate) fn net_files(&self) -> i64 {
96 self.gross_add_files as i64 - self.gross_remove_files as i64
97 }
98
99 pub(crate) fn net_bytes(&self) -> i64 {
101 self.gross_add_bytes as i64 - self.gross_remove_bytes as i64
102 }
103
104 pub(crate) fn try_compute_for_txn(
121 add_files_metadata: &[Box<dyn EngineData>],
122 remove_files_metadata: &[FilteredEngineData],
123 bin_boundaries: Option<&[i64]>,
124 ) -> DeltaResult<Self> {
125 let mut histogram = match bin_boundaries {
126 Some(b) => FileSizeHistogram::create_empty_with_boundaries(b.to_vec())?,
127 None => FileSizeHistogram::create_default(),
128 };
129 let mut gross_add_files = 0u64;
130 let mut gross_remove_files = 0u64;
131 let mut gross_add_bytes = 0u64;
132 let mut gross_remove_bytes = 0u64;
133
134 for batch in add_files_metadata {
136 let mut visitor = FileStatsVisitor::new(None, false, &mut histogram);
137 visitor.visit_rows_of(batch.as_ref())?;
138 gross_add_files += visitor.count;
139 gross_add_bytes += visitor.total_size;
140 }
141
142 for filtered_batch in remove_files_metadata {
145 let sv = filtered_batch.selection_vector();
146 let sv_opt = if sv.is_empty() { None } else { Some(sv) };
147 let mut visitor = FileStatsVisitor::new(sv_opt, true, &mut histogram);
148 visitor.visit_rows_of(filtered_batch.data())?;
149 gross_remove_files += visitor.count;
150 gross_remove_bytes += visitor.total_size;
151 }
152
153 Ok(FileStatsDelta {
154 gross_add_files,
155 gross_remove_files,
156 gross_add_bytes,
157 gross_remove_bytes,
158 net_histogram: Some(histogram),
159 })
160 }
161}
162
163pub(crate) fn size_to_u64(size: i64) -> DeltaResult<u64> {
166 u64::try_from(size)
167 .map_err(|_| Error::internal_error(format!("File size must be non-negative, got {size}")))
168}
169
/// Row visitor that counts selected rows, sums their `size` column, and applies each selected
/// size to a borrowed [`FileSizeHistogram`] (insert or remove, depending on `is_remove`).
struct FileStatsVisitor<'sv, 'h> {
    /// Optional row mask. `None` selects every row. Rows whose index falls past the end of
    /// the mask are also treated as selected.
    selection_vector: Option<&'sv [bool]>,
    /// Number of rows consumed by earlier `visit` calls. It is added to the per-call row index
    /// so lookups in `selection_vector` stay aligned across calls.
    offset: usize,
    /// `true` removes visited sizes from the histogram; `false` inserts them.
    is_remove: bool,
    /// Number of selected rows visited so far.
    count: u64,
    /// Sum of the sizes of the selected rows visited so far.
    total_size: u64,
    /// Histogram updated in place for every selected row.
    histogram: &'h mut FileSizeHistogram,
}
194
195impl<'sv, 'h> FileStatsVisitor<'sv, 'h> {
196 fn new(
197 selection_vector: Option<&'sv [bool]>,
198 is_remove: bool,
199 histogram: &'h mut FileSizeHistogram,
200 ) -> Self {
201 Self {
202 selection_vector,
203 offset: 0,
204 is_remove,
205 count: 0,
206 total_size: 0,
207 histogram,
208 }
209 }
210}
211
212impl RowVisitor for FileStatsVisitor<'_, '_> {
213 fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
214 static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
215 LazyLock::new(|| (vec![ColumnName::new(["size"])], vec![DataType::LONG]).into());
216 NAMES_AND_TYPES.as_ref()
217 }
218
219 fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
220 require!(
221 getters.len() == 1,
222 Error::InternalError(format!(
223 "Wrong number of FileStatsVisitor getters: {}",
224 getters.len()
225 ))
226 );
227 for i in 0..row_count {
228 let selected = match self.selection_vector {
229 Some(sv) => sv.get(self.offset + i).copied().unwrap_or(true),
230 None => true,
231 };
232 if selected {
233 let size: i64 = getters[0].get(i, "size")?;
234 self.count += 1;
235 self.total_size += size_to_u64(size)?;
236 if self.is_remove {
237 self.histogram.remove(size)?;
238 } else {
239 self.histogram.insert(size)?;
240 }
241 }
242 }
243 self.offset += row_count;
244 Ok(())
245 }
246}
247
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use test_utils::{generate_batch, IntoArray};

    use super::*;
    use crate::engine::arrow_data::ArrowEngineData;

    /// Builds a single-column batch whose `size` column holds `sizes`.
    fn size_batch(sizes: Vec<i64>) -> Box<dyn EngineData> {
        let batch = generate_batch(vec![("size", sizes.into_array())]).unwrap();
        Box::new(ArrowEngineData::new(batch))
    }

    /// One parameterized case for `test_try_compute`. Each inner `Vec` is one batch of file
    /// sizes; remove batches are wrapped with every row selected.
    struct TryComputeCase {
        add_batches: Vec<Vec<i64>>,
        remove_batches: Vec<Vec<i64>>,
        expected_net_files: i64,
        expected_net_bytes: i64,
    }

    #[rstest]
    #[case::empty(TryComputeCase {
        add_batches: vec![],
        remove_batches: vec![],
        expected_net_files: 0,
        expected_net_bytes: 0,
    })]
    #[case::adds_only(TryComputeCase {
        add_batches: vec![vec![100, 200, 300]],
        remove_batches: vec![],
        expected_net_files: 3,
        expected_net_bytes: 600, // 100 + 200 + 300
    })]
    #[case::multiple_add_batches(TryComputeCase {
        add_batches: vec![vec![100, 200], vec![300, 400, 500]],
        remove_batches: vec![],
        expected_net_files: 5,
        expected_net_bytes: 1500, // 100 + 200 + 300 + 400 + 500
    })]
    #[case::removes_only(TryComputeCase {
        add_batches: vec![],
        remove_batches: vec![vec![500, 700]],
        expected_net_files: -2,
        expected_net_bytes: -1200, // -(500 + 700)
    })]
    #[case::adds_and_removes(TryComputeCase {
        add_batches: vec![vec![100, 200], vec![300, 400]],
        remove_batches: vec![vec![500], vec![600, 700]],
        expected_net_files: 1, // 4 adds - 3 removes
        expected_net_bytes: -800, // 1000 added - 1800 removed
    })]
    fn test_try_compute(#[case] case: TryComputeCase) {
        let adds: Vec<_> = case.add_batches.into_iter().map(size_batch).collect();
        let removes: Vec<_> = case
            .remove_batches
            .into_iter()
            .map(|sizes| FilteredEngineData::with_all_rows_selected(size_batch(sizes)))
            .collect();
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        assert_eq!(stats.net_files(), case.expected_net_files);
        assert_eq!(stats.net_bytes(), case.expected_net_bytes);
    }

    #[test]
    fn test_with_selection_vectors() {
        // Two add batches: 3 files, 600 bytes in total.
        let adds = vec![size_batch(vec![100, 200]), size_batch(vec![300])];
        let removes = vec![
            // Every row selected: 400 + 500.
            FilteredEngineData::with_all_rows_selected(size_batch(vec![400, 500])),
            // Row 0 (600) is deselected, so only 700 + 800 are counted.
            FilteredEngineData::try_new(size_batch(vec![600, 700, 800]), vec![false, true, true])
                .unwrap(),
        ];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();
        assert_eq!(stats.net_files(), -1); // 3 added - 4 removed
        assert_eq!(stats.net_bytes(), -1800); // 600 added - 2400 removed
        assert_eq!(stats.gross_add_files, 3);
        assert_eq!(stats.gross_remove_files, 4);
        assert_eq!(stats.gross_add_bytes, 600);
        assert_eq!(stats.gross_remove_bytes, 2400);
    }

    #[test]
    fn try_compute_builds_delta_histogram_from_add_and_remove_sizes() {
        let adds = vec![size_batch(vec![100, 200, 300])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![500, 700],
        ))];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();

        let delta = stats.net_histogram.unwrap();
        // These assertions expect all five sizes to fall in bin 0 of the default boundaries:
        // 3 adds - 2 removes = 1 file, and 600 - 1200 = -600 bytes.
        assert_eq!(delta.file_counts[0], 1);
        assert_eq!(delta.total_bytes[0], -600);
    }

    #[test]
    fn try_compute_empty_batches_produce_zero_histogram() {
        // With no input, every bin of the (still present) histogram must be zero.
        let stats = FileStatsDelta::try_compute_for_txn(&[], &[], None).unwrap();
        let delta = stats.net_histogram.unwrap();
        assert!(delta.file_counts.iter().all(|&c| c == 0));
        assert!(delta.total_bytes.iter().all(|&b| b == 0));
    }

    #[test]
    fn try_compute_histogram_with_selection_vectors() {
        let adds = vec![size_batch(vec![100, 200])];
        let removes = vec![FilteredEngineData::try_new(
            size_batch(vec![300, 400, 500]),
            vec![true, false, true], // 400 is deselected
        )
        .unwrap()];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, None).unwrap();

        let delta = stats.net_histogram.unwrap();
        // 2 adds - 2 selected removes = 0 files; 300 - 800 = -500 bytes, all expected in bin 0.
        assert_eq!(delta.file_counts[0], 0);
        assert_eq!(delta.total_bytes[0], -500);
    }

    #[test]
    fn try_compute_with_custom_boundaries_uses_them() {
        let boundaries: &[i64] = &[0, 200, 1000];
        // Expected placement: 50 and 100 in bin 0, 300 and 500 in bin 1, 1500 in bin 2.
        let adds = vec![size_batch(vec![50, 300, 1500])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![100, 500],
        ))];
        let stats = FileStatsDelta::try_compute_for_txn(&adds, &removes, Some(boundaries)).unwrap();

        let delta = stats.net_histogram.unwrap();
        assert_eq!(delta.sorted_bin_boundaries, vec![0, 200, 1000]);
        // Bins 0 and 1 each get one add and one remove; bin 2 gets one add.
        assert_eq!(delta.file_counts, vec![0, 0, 1]);
        // 50 - 100, 300 - 500, 1500.
        assert_eq!(delta.total_bytes, vec![-50, -200, 1500]);
    }

    #[test]
    fn try_compute_with_custom_boundaries_produces_mergeable_histogram() {
        let boundaries = vec![0, 200, 1000];
        // Base histogram with the same boundaries, holding sizes 150 and 500.
        let mut base = FileSizeHistogram::create_empty_with_boundaries(boundaries.clone()).unwrap();
        base.insert(150).unwrap();
        base.insert(500).unwrap();
        let adds = vec![size_batch(vec![100, 300])];
        let removes = vec![FilteredEngineData::with_all_rows_selected(size_batch(
            vec![150],
        ))];
        let stats =
            FileStatsDelta::try_compute_for_txn(&adds, &removes, Some(&boundaries)).unwrap();

        let delta = stats.net_histogram.unwrap();
        let merged = base.try_apply_delta(&delta).unwrap();
        // Bin 0: {150} + 100 - 150 -> one file of 100 bytes.
        // Bin 1: {500} + 300 -> two files, 800 bytes.
        // Bin 2: untouched.
        assert_eq!(merged.file_counts, vec![1, 2, 0]);
        assert_eq!(merged.total_bytes, vec![100, 800, 0]);
    }
}