1use std::sync::Arc;
24
25use crate::PartitionedFile;
26use crate::file_groups::FileGroup;
27
28use arrow::array::RecordBatch;
29use arrow::compute::SortColumn;
30use arrow::datatypes::SchemaRef;
31use arrow::row::{Row, Rows};
32use datafusion_common::stats::{NdvFallback, Precision};
33use datafusion_common::{
34 DataFusionError, Result, ScalarValue, plan_datafusion_err, plan_err,
35};
36use datafusion_physical_expr::expressions::Column;
37use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
38use datafusion_physical_plan::{ColumnStatistics, Statistics};
39
40use futures::{Stream, StreamExt};
41
/// Per-entry min/max statistics encoded in the arrow row format, so that
/// entries can be compared with each other according to `sort_order`.
///
/// Both `Rows` buffers hold one row per entry, at the same index. For sort
/// columns that are descending, [`MinMaxStatistics::new`] swaps the min and
/// max inputs, so `min_by_sort_order` always holds the bound that comes first
/// in the sort order and `max_by_sort_order` the bound that comes last.
pub(crate) struct MinMaxStatistics {
    /// For each entry, the bound that sorts first under `sort_order`.
    min_by_sort_order: Rows,
    /// For each entry, the bound that sorts last under `sort_order`.
    max_by_sort_order: Rows,
    /// The ordering used to encode the two `Rows` buffers.
    sort_order: LexOrdering,
}
50
51impl MinMaxStatistics {
52 #[expect(unused)]
54 pub fn sort_order(&self) -> &LexOrdering {
55 &self.sort_order
56 }
57
58 #[expect(unused)]
60 pub fn min(&'_ self, idx: usize) -> Row<'_> {
61 self.min_by_sort_order.row(idx)
62 }
63
64 pub fn max(&'_ self, idx: usize) -> Row<'_> {
66 self.max_by_sort_order.row(idx)
67 }
68
69 pub fn new_from_files<'a>(
70 projected_sort_order: &LexOrdering, projected_schema: &SchemaRef, projection: Option<&[usize]>, files: impl IntoIterator<Item = &'a PartitionedFile>,
74 ) -> Result<Self> {
75 let Some(statistics_and_partition_values) = files
76 .into_iter()
77 .map(|file| {
78 file.statistics
79 .as_ref()
80 .zip(Some(file.partition_values.as_slice()))
81 })
82 .collect::<Option<Vec<_>>>()
83 else {
84 return plan_err!("Parquet file missing statistics");
85 };
86
87 let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
89 Ok(statistics_and_partition_values
90 .iter()
91 .map(|(s, pv)| {
92 if i < s.column_statistics.len() {
93 s.column_statistics[i]
94 .min_value
95 .get_value()
96 .cloned()
97 .zip(s.column_statistics[i].max_value.get_value().cloned())
98 .ok_or_else(|| plan_datafusion_err!("statistics not found"))
99 } else {
100 let partition_value = &pv[i - s.column_statistics.len()];
101 Ok((partition_value.clone(), partition_value.clone()))
102 }
103 })
104 .collect::<Result<Vec<_>>>()?
105 .into_iter()
106 .unzip())
107 };
108
109 let Some(sort_columns) =
110 sort_columns_from_physical_sort_exprs(projected_sort_order)
111 else {
112 return plan_err!("sort expression must be on column");
113 };
114
115 let min_max_schema = Arc::new(
117 projected_schema
118 .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
119 );
120
121 let min_max_sort_order = projected_sort_order
122 .iter()
123 .zip(sort_columns.iter())
124 .enumerate()
125 .map(|(idx, (sort_expr, col))| {
126 let expr = Arc::new(Column::new(col.name(), idx));
127 PhysicalSortExpr::new(expr, sort_expr.options)
128 });
129 let min_max_sort_order = LexOrdering::new(min_max_sort_order).unwrap();
131
132 let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
133 .iter()
134 .map(|c| {
135 let i = projection
139 .map(|p| p[c.index()])
140 .unwrap_or_else(|| c.index());
141
142 let (min, max) = get_min_max(i).map_err(|e| {
143 e.context(format!("get min/max for column: '{}'", c.name()))
144 })?;
145 Ok((
146 ScalarValue::iter_to_array(min)?,
147 ScalarValue::iter_to_array(max)?,
148 ))
149 })
150 .collect::<Result<Vec<_>>>()
151 .map_err(|e| e.context("collect min/max values"))?
152 .into_iter()
153 .unzip();
154
155 let min_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), min_values)
156 .map_err(|e| {
157 DataFusionError::ArrowError(
158 Box::new(e),
159 Some("\ncreate min batch".to_string()),
160 )
161 })?;
162 let max_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), max_values)
163 .map_err(|e| {
164 DataFusionError::ArrowError(
165 Box::new(e),
166 Some("\ncreate max batch".to_string()),
167 )
168 })?;
169
170 Self::new(&min_max_sort_order, &min_max_schema, min_batch, max_batch)
171 }
172
173 #[expect(clippy::needless_pass_by_value)]
174 pub fn new(
175 sort_order: &LexOrdering,
176 schema: &SchemaRef,
177 min_values: RecordBatch,
178 max_values: RecordBatch,
179 ) -> Result<Self> {
180 use arrow::row::*;
181
182 let sort_fields = sort_order
183 .iter()
184 .map(|expr| {
185 expr.expr
186 .data_type(schema)
187 .map(|data_type| SortField::new_with_options(data_type, expr.options))
188 })
189 .collect::<Result<Vec<_>>>()
190 .map_err(|e| e.context("create sort fields"))?;
191 let converter = RowConverter::new(sort_fields)?;
192
193 let Some(sort_columns) = sort_columns_from_physical_sort_exprs(sort_order) else {
194 return plan_err!("sort expression must be on column");
195 };
196
197 let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
199 .iter()
200 .zip(sort_columns.iter().copied())
201 .map(|(sort_expr, column)| {
202 let maxes = max_values.column_by_name(column.name());
203 let mins = min_values.column_by_name(column.name());
204 let opt_value = if sort_expr.options.descending {
205 maxes.zip(mins)
206 } else {
207 mins.zip(maxes)
208 };
209 opt_value.ok_or_else(|| {
210 plan_datafusion_err!(
211 "missing column in MinMaxStatistics::new: '{}'",
212 column.name()
213 )
214 })
215 })
216 .collect::<Result<Vec<_>>>()?
217 .into_iter()
218 .unzip();
219
220 let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
221 let values = RecordBatch::try_new(
222 min_values.schema(),
223 cols.into_iter().cloned().collect(),
224 )?;
225 let sorting_columns = sort_order
226 .iter()
227 .zip(sort_columns.iter().copied())
228 .map(|(sort_expr, column)| {
229 let schema = values.schema();
230 let idx = schema.index_of(column.name())?;
231
232 Ok(SortColumn {
233 values: Arc::clone(values.column(idx)),
234 options: Some(sort_expr.options),
235 })
236 })
237 .collect::<Result<Vec<_>>>()
238 .map_err(|e| e.context("create sorting columns"))?;
239 converter
240 .convert_columns(
241 &sorting_columns
242 .into_iter()
243 .map(|c| c.values)
244 .collect::<Vec<_>>(),
245 )
246 .map_err(|e| {
247 DataFusionError::ArrowError(
248 Box::new(e),
249 Some("convert columns".to_string()),
250 )
251 })
252 });
253
254 Ok(Self {
255 min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
256 max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
257 sort_order: sort_order.clone(),
258 })
259 }
260
261 pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
263 let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
264 sort.sort_unstable_by_key(|(_, row)| *row);
265 sort
266 }
267
268 pub fn is_sorted(&self) -> bool {
271 self.max_by_sort_order
272 .iter()
273 .zip(self.min_by_sort_order.iter().skip(1))
274 .all(|(max, next_min)| max <= next_min)
275 }
276}
277
278fn sort_columns_from_physical_sort_exprs(
279 sort_order: &LexOrdering,
280) -> Option<Vec<&Column>> {
281 sort_order
282 .iter()
283 .map(|expr| expr.expr.downcast_ref::<Column>())
284 .collect()
285}
286
287fn seed_summary_statistics(summary_statistics: &mut Statistics, file_stats: &Statistics) {
288 summary_statistics.num_rows = file_stats.num_rows;
289 summary_statistics.total_byte_size = file_stats.total_byte_size;
290
291 for (summary_col_stats, file_col_stats) in summary_statistics
292 .column_statistics
293 .iter_mut()
294 .zip(file_stats.column_statistics.iter())
295 {
296 summary_col_stats.null_count = file_col_stats.null_count;
297 summary_col_stats.max_value = file_col_stats.max_value.clone();
298 summary_col_stats.min_value = file_col_stats.min_value.clone();
299 summary_col_stats.sum_value = file_col_stats.sum_value.cast_to_sum_type();
300 summary_col_stats.byte_size = file_col_stats.byte_size;
301 }
302}
303
304fn merge_summary_statistics(
305 summary_statistics: &mut Statistics,
306 file_stats: &Statistics,
307) {
308 summary_statistics.num_rows = summary_statistics.num_rows.add(&file_stats.num_rows);
309 summary_statistics.total_byte_size = summary_statistics
310 .total_byte_size
311 .add(&file_stats.total_byte_size);
312
313 for (summary_col_stats, file_col_stats) in summary_statistics
314 .column_statistics
315 .iter_mut()
316 .zip(file_stats.column_statistics.iter())
317 {
318 let ColumnStatistics {
319 null_count: file_nc,
320 max_value: file_max,
321 min_value: file_min,
322 sum_value: file_sum,
323 distinct_count: _,
324 byte_size: file_sbs,
325 } = file_col_stats;
326
327 summary_col_stats.null_count = summary_col_stats.null_count.add(file_nc);
328 summary_col_stats.max_value = summary_col_stats.max_value.max(file_max);
329 summary_col_stats.min_value = summary_col_stats.min_value.min(file_min);
330 summary_col_stats.sum_value = summary_col_stats.sum_value.add_for_sum(file_sum);
331 summary_col_stats.byte_size = summary_col_stats.byte_size.add(file_sbs);
332 }
333}
334
335fn seed_first_file_statistics(
336 limit_num_rows: &mut Precision<usize>,
337 summary_statistics: &mut Statistics,
338 file_stats: &Statistics,
339 collect_stats: bool,
340) {
341 *limit_num_rows = file_stats.num_rows;
342
343 if collect_stats {
344 seed_summary_statistics(summary_statistics, file_stats);
345 }
346}
347
348fn merge_file_statistics(
349 limit_num_rows: &mut Precision<usize>,
350 summary_statistics: &mut Statistics,
351 file_stats: &Statistics,
352 collect_stats: bool,
353) {
354 *limit_num_rows = limit_num_rows.add(&file_stats.num_rows);
355
356 if collect_stats {
357 merge_summary_statistics(summary_statistics, file_stats);
358 }
359}
360
361#[deprecated(
367 since = "47.0.0",
368 note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
369)]
370#[cfg_attr(not(test), expect(unused))]
371pub async fn get_statistics_with_limit(
372 all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
373 file_schema: SchemaRef,
374 limit: Option<usize>,
375 collect_stats: bool,
376) -> Result<(FileGroup, Statistics)> {
377 let mut result_files = FileGroup::default();
378 let size = file_schema.fields().len();
385 let mut summary_statistics = Statistics {
386 num_rows: Precision::Absent,
387 total_byte_size: Precision::Absent,
388 column_statistics: vec![ColumnStatistics::default(); size],
389 };
390 let mut limit_num_rows = Precision::<usize>::Absent;
393
394 let mut all_files = Box::pin(all_files.fuse());
396
397 if let Some(first_file) = all_files.next().await {
398 let (mut file, file_stats) = first_file?;
399 file.statistics = Some(Arc::clone(&file_stats));
400 result_files.push(file);
401
402 seed_first_file_statistics(
403 &mut limit_num_rows,
404 &mut summary_statistics,
405 &file_stats,
406 collect_stats,
407 );
408
409 let conservative_num_rows = match limit_num_rows {
414 Precision::Exact(nr) => nr,
415 _ => usize::MIN,
416 };
417 if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
418 while let Some(current) = all_files.next().await {
419 let (mut file, file_stats) = current?;
420 file.statistics = Some(Arc::clone(&file_stats));
421 result_files.push(file);
422 merge_file_statistics(
423 &mut limit_num_rows,
424 &mut summary_statistics,
425 &file_stats,
426 collect_stats,
427 );
428
429 if limit_num_rows.get_value().unwrap_or(&usize::MIN)
434 > &limit.unwrap_or(usize::MAX)
435 {
436 break;
437 }
438 }
439 }
440 };
441
442 let mut statistics = summary_statistics;
443 if all_files.next().await.is_some() {
444 statistics = statistics.to_inexact()
448 }
449
450 Ok((result_files, statistics))
451}
452
453#[expect(clippy::needless_pass_by_value)]
469pub fn compute_file_group_statistics(
470 file_group: FileGroup,
471 file_schema: SchemaRef,
472 collect_stats: bool,
473) -> Result<FileGroup> {
474 if !collect_stats {
475 return Ok(file_group);
476 }
477
478 let file_group_stats = file_group.iter().filter_map(|file| {
479 let stats = file.statistics.as_ref()?;
480 Some(stats.as_ref())
481 });
482 let statistics = Statistics::try_merge_iter_with_ndv_fallback(
483 file_group_stats,
484 &file_schema,
485 NdvFallback::Max,
486 )?;
487
488 Ok(file_group.with_statistics(Arc::new(statistics)))
489}
490
491#[expect(clippy::needless_pass_by_value)]
509pub fn compute_all_files_statistics(
510 file_groups: Vec<FileGroup>,
511 table_schema: SchemaRef,
512 collect_stats: bool,
513 inexact_stats: bool,
514) -> Result<(Vec<FileGroup>, Statistics)> {
515 let file_groups_with_stats = file_groups
516 .into_iter()
517 .map(|file_group| {
518 compute_file_group_statistics(
519 file_group,
520 Arc::clone(&table_schema),
521 collect_stats,
522 )
523 })
524 .collect::<Result<Vec<_>>>()?;
525
526 let file_groups_statistics = file_groups_with_stats
528 .iter()
529 .filter_map(|file_group| file_group.file_statistics(None));
530
531 let mut statistics = Statistics::try_merge_iter_with_ndv_fallback(
532 file_groups_statistics,
533 &table_schema,
534 NdvFallback::Max,
535 )?;
536
537 if inexact_stats {
538 statistics = statistics.to_inexact()
539 }
540
541 Ok((file_groups_with_stats, statistics))
542}
543
544#[deprecated(since = "47.0.0", note = "Use Statistics::add")]
545pub fn add_row_stats(
546 file_num_rows: Precision<usize>,
547 num_rows: Precision<usize>,
548) -> Precision<usize> {
549 file_num_rows.add(&num_rows)
550}
551
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PartitionedFile;
    use crate::file_groups::FileGroup;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::{Stream, stream};

    /// A file together with the statistics it is streamed with.
    type FileEntry = (PartitionedFile, Arc<Statistics>);

    /// Schema with a single nullable column.
    fn single_column_schema(name: &str, data_type: DataType) -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(name, data_type, true)]))
    }

    /// Wraps `entries` into a stream that yields each of them as `Ok`.
    fn file_stream(entries: Vec<FileEntry>) -> impl Stream<Item = Result<FileEntry>> {
        stream::iter(entries.into_iter().map(Ok::<_, DataFusionError>))
    }

    /// One-row `UInt32` file statistics whose min, max and sum are all `sum`.
    fn file_stats(sum: u32) -> Statistics {
        let value = || Precision::Exact(ScalarValue::UInt32(Some(sum)));
        let column = ColumnStatistics {
            null_count: Precision::Exact(0),
            max_value: value(),
            min_value: value(),
            sum_value: value(),
            distinct_count: Precision::Exact(1),
            byte_size: Precision::Exact(4),
        };

        Statistics {
            num_rows: Precision::Exact(1),
            total_byte_size: Precision::Exact(4),
            column_statistics: vec![column],
        }
    }

    /// Schema with a single nullable `Int64` column named `a`.
    fn test_schema() -> SchemaRef {
        single_column_schema("a", DataType::Int64)
    }

    /// Exact single-column file statistics.
    fn make_file_stats(
        num_rows: usize,
        total_byte_size: usize,
        col_stats: ColumnStatistics,
    ) -> Arc<Statistics> {
        let statistics = Statistics {
            num_rows: Precision::Exact(num_rows),
            total_byte_size: Precision::Exact(total_byte_size),
            column_statistics: vec![col_stats],
        };
        Arc::new(statistics)
    }

    /// Exact `Int64` column statistics without a distinct count.
    fn rich_col_stats(
        null_count: usize,
        min: i64,
        max: i64,
        sum: i64,
        byte_size: usize,
    ) -> ColumnStatistics {
        let exact = |v: i64| Precision::Exact(ScalarValue::Int64(Some(v)));
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            max_value: exact(max),
            min_value: exact(min),
            distinct_count: Precision::Absent,
            sum_value: exact(sum),
            byte_size: Precision::Exact(byte_size),
        }
    }

    /// One-row `Utf8` file statistics with the given distinct count and range.
    fn utf8_file_stats(ndv: usize, min: &str, max: &str) -> Statistics {
        let exact = |s: &str| Precision::Exact(ScalarValue::Utf8(Some(s.to_string())));
        let column = ColumnStatistics {
            null_count: Precision::Exact(0),
            max_value: exact(max),
            min_value: exact(min),
            sum_value: Precision::Absent,
            distinct_count: Precision::Exact(ndv),
            byte_size: Precision::Exact(16),
        };

        Statistics {
            num_rows: Precision::Exact(1),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![column],
        }
    }

    /// A file of size 1 at `path` carrying `stats`.
    fn file_with_stats(path: &str, stats: Statistics) -> PartitionedFile {
        let statistics = Arc::new(stats);
        PartitionedFile::new(path, 1).with_statistics(statistics)
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
    -> Result<()> {
        let schema = single_column_schema("c1", DataType::UInt32);
        let files = file_stream(vec![(
            PartitionedFile::new("f1.parquet", 1),
            Arc::new(file_stats(100)),
        )]);

        let (_, summary) = get_statistics_with_limit(files, schema, None, true).await?;

        // The UInt32 sum of the only file is reported as UInt64.
        let expected_sum = Precision::Exact(ScalarValue::UInt64(Some(100)));
        assert_eq!(summary.column_statistics[0].sum_value, expected_sum);

        Ok(())
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening()
    -> Result<()> {
        let schema = single_column_schema("c1", DataType::UInt32);
        let files = file_stream(vec![
            (
                PartitionedFile::new("f1.parquet", 1),
                Arc::new(file_stats(100)),
            ),
            (
                PartitionedFile::new("f2.parquet", 1),
                Arc::new(file_stats(200)),
            ),
        ]);

        let (_, summary) = get_statistics_with_limit(files, schema, None, true).await?;

        // 100 + 200, reported as UInt64.
        let expected_sum = Precision::Exact(ScalarValue::UInt64(Some(300)));
        assert_eq!(summary.column_statistics[0].sum_value, expected_sum);

        Ok(())
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
        let files = file_stream(vec![
            (
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
            ),
            (
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
            ),
        ]);

        let (_, summary) = get_statistics_with_limit(files, test_schema(), None, false)
            .await
            .unwrap();

        // Nothing is collected, so every statistic stays absent.
        assert_eq!(summary.num_rows, Precision::Absent);
        assert_eq!(summary.total_byte_size, Precision::Absent);
        assert_eq!(summary.column_statistics.len(), 1);

        let column = &summary.column_statistics[0];
        assert_eq!(column.null_count, Precision::Absent);
        assert_eq!(column.max_value, Precision::Absent);
        assert_eq!(column.min_value, Precision::Absent);
        assert_eq!(column.sum_value, Precision::Absent);
        assert_eq!(column.byte_size, Precision::Absent);
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() {
        let files = file_stream(vec![
            (
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)),
            ),
            (
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)),
            ),
            (
                PartitionedFile::new("third.parquet", 30),
                make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)),
            ),
        ]);

        let (selected, summary) =
            get_statistics_with_limit(files, test_schema(), Some(4), false)
                .await
                .unwrap();

        // Two files (6 rows) exceed the limit of 4; the third is never taken.
        assert_eq!(selected.len(), 2);
        assert_eq!(summary.num_rows, Precision::Absent);
        assert_eq!(summary.total_byte_size, Precision::Absent);
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
        let files = file_stream(vec![
            (
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
            ),
            (
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
            ),
        ]);

        let (_, summary) = get_statistics_with_limit(files, test_schema(), None, true)
            .await
            .unwrap();

        assert_eq!(summary.num_rows, Precision::Exact(15));
        assert_eq!(summary.total_byte_size, Precision::Exact(150));

        let column = &summary.column_statistics[0];
        assert_eq!(column.null_count, Precision::Exact(3));
        assert_eq!(
            column.min_value,
            Precision::Exact(ScalarValue::Int64(Some(1)))
        );
        assert_eq!(
            column.max_value,
            Precision::Exact(ScalarValue::Int64(Some(99)))
        );
        assert_eq!(
            column.sum_value,
            Precision::Exact(ScalarValue::Int64(Some(315)))
        );
        assert_eq!(column.byte_size, Precision::Exact(192));
    }

    #[tokio::test]
    #[expect(deprecated)]
    async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
        let files = file_stream(vec![
            (
                PartitionedFile::new("first.parquet", 10),
                make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
            ),
            (
                PartitionedFile::new("second.parquet", 20),
                make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
            ),
            (
                PartitionedFile::new("third.parquet", 20),
                make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
            ),
        ]);

        let (selected, summary) =
            get_statistics_with_limit(files, test_schema(), Some(8), true)
                .await
                .unwrap();

        // The third file is left in the stream, so everything is inexact.
        assert_eq!(selected.len(), 2);
        assert_eq!(summary.num_rows, Precision::Inexact(10));
        assert_eq!(summary.total_byte_size, Precision::Inexact(100));

        let column = &summary.column_statistics[0];
        assert_eq!(
            column.min_value,
            Precision::Inexact(ScalarValue::Int64(Some(1)))
        );
        assert_eq!(
            column.max_value,
            Precision::Inexact(ScalarValue::Int64(Some(10)))
        );
        assert_eq!(
            column.sum_value,
            Precision::Inexact(ScalarValue::Int64(Some(55)))
        );
        assert_eq!(column.byte_size, Precision::Inexact(128));
    }

    #[test]
    fn test_compute_file_group_statistics_uses_max_ndv_fallback() -> Result<()> {
        let schema = single_column_schema("c1", DataType::Utf8);
        let group = FileGroup::new(vec![
            file_with_stats("f1.parquet", utf8_file_stats(5, "a", "x")),
            file_with_stats("f2.parquet", utf8_file_stats(8, "b", "z")),
        ]);

        let group = compute_file_group_statistics(group, Arc::clone(&schema), true)?;
        let column = &group.file_statistics(None).unwrap().column_statistics[0];

        assert_eq!(column.distinct_count, Precision::Inexact(8));
        assert_eq!(
            column.min_value,
            Precision::Exact(ScalarValue::Utf8(Some("a".to_string())))
        );
        assert_eq!(
            column.max_value,
            Precision::Exact(ScalarValue::Utf8(Some("z".to_string())))
        );

        Ok(())
    }

    #[test]
    fn test_compute_all_files_statistics_uses_max_ndv_fallback() -> Result<()> {
        let schema = single_column_schema("c1", DataType::Utf8);
        let first_group = FileGroup::new(vec![
            file_with_stats("f1.parquet", utf8_file_stats(5, "a", "x")),
            file_with_stats("f2.parquet", utf8_file_stats(8, "b", "z")),
        ]);
        let second_group = FileGroup::new(vec![
            file_with_stats("f3.parquet", utf8_file_stats(3, "c", "w")),
            file_with_stats("f4.parquet", utf8_file_stats(6, "d", "y")),
        ]);

        let (groups, overall) = compute_all_files_statistics(
            vec![first_group, second_group],
            schema,
            true,
            false,
        )?;

        assert_eq!(
            groups[0]
                .file_statistics(None)
                .unwrap()
                .column_statistics[0]
                .distinct_count,
            Precision::Inexact(8)
        );
        assert_eq!(
            groups[1]
                .file_statistics(None)
                .unwrap()
                .column_statistics[0]
                .distinct_count,
            Precision::Inexact(6)
        );
        assert_eq!(
            overall.column_statistics[0].distinct_count,
            Precision::Inexact(8)
        );

        Ok(())
    }
}