Skip to main content

datafusion_datasource/
statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Use statistics to optimize physical planning.
19//!
20//! Currently, this module houses code to sort file groups if they are non-overlapping with
21//! respect to the required sort order. See [`MinMaxStatistics`]
22
23use std::sync::Arc;
24
25use crate::PartitionedFile;
26use crate::file_groups::FileGroup;
27
28use arrow::array::RecordBatch;
29use arrow::compute::SortColumn;
30use arrow::datatypes::SchemaRef;
31use arrow::row::{Row, Rows};
32use datafusion_common::stats::{NdvFallback, Precision};
33use datafusion_common::{
34    DataFusionError, Result, ScalarValue, plan_datafusion_err, plan_err,
35};
36use datafusion_physical_expr::expressions::Column;
37use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
38use datafusion_physical_plan::{ColumnStatistics, Statistics};
39
40use futures::{Stream, StreamExt};
41
42/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
43/// The min/max values are ordered by [`Self::sort_order`].
44/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
45pub(crate) struct MinMaxStatistics {
46    min_by_sort_order: Rows,
47    max_by_sort_order: Rows,
48    sort_order: LexOrdering,
49}
50
51impl MinMaxStatistics {
52    /// Sort order used to sort the statistics
53    #[expect(unused)]
54    pub fn sort_order(&self) -> &LexOrdering {
55        &self.sort_order
56    }
57
58    /// Min value at index
59    #[expect(unused)]
60    pub fn min(&'_ self, idx: usize) -> Row<'_> {
61        self.min_by_sort_order.row(idx)
62    }
63
64    /// Max value at index
65    pub fn max(&'_ self, idx: usize) -> Row<'_> {
66        self.max_by_sort_order.row(idx)
67    }
68
69    pub fn new_from_files<'a>(
70        projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
71        projected_schema: &SchemaRef,       // Projected schema
72        projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
73        files: impl IntoIterator<Item = &'a PartitionedFile>,
74    ) -> Result<Self> {
75        let Some(statistics_and_partition_values) = files
76            .into_iter()
77            .map(|file| {
78                file.statistics
79                    .as_ref()
80                    .zip(Some(file.partition_values.as_slice()))
81            })
82            .collect::<Option<Vec<_>>>()
83        else {
84            return plan_err!("Parquet file missing statistics");
85        };
86
87        // Helper function to get min/max statistics for a given column of projected_schema
88        let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
89            Ok(statistics_and_partition_values
90                .iter()
91                .map(|(s, pv)| {
92                    if i < s.column_statistics.len() {
93                        s.column_statistics[i]
94                            .min_value
95                            .get_value()
96                            .cloned()
97                            .zip(s.column_statistics[i].max_value.get_value().cloned())
98                            .ok_or_else(|| plan_datafusion_err!("statistics not found"))
99                    } else {
100                        let partition_value = &pv[i - s.column_statistics.len()];
101                        Ok((partition_value.clone(), partition_value.clone()))
102                    }
103                })
104                .collect::<Result<Vec<_>>>()?
105                .into_iter()
106                .unzip())
107        };
108
109        let Some(sort_columns) =
110            sort_columns_from_physical_sort_exprs(projected_sort_order)
111        else {
112            return plan_err!("sort expression must be on column");
113        };
114
115        // Project the schema & sort order down to just the relevant columns
116        let min_max_schema = Arc::new(
117            projected_schema
118                .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
119        );
120
121        let min_max_sort_order = projected_sort_order
122            .iter()
123            .zip(sort_columns.iter())
124            .enumerate()
125            .map(|(idx, (sort_expr, col))| {
126                let expr = Arc::new(Column::new(col.name(), idx));
127                PhysicalSortExpr::new(expr, sort_expr.options)
128            });
129        // Safe to `unwrap` as we know that sort columns are non-empty:
130        let min_max_sort_order = LexOrdering::new(min_max_sort_order).unwrap();
131
132        let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
133            .iter()
134            .map(|c| {
135                // Reverse the projection to get the index of the column in the full statistics
136                // The file statistics contains _every_ column , but the sort column's index()
137                // refers to the index in projected_schema
138                let i = projection
139                    .map(|p| p[c.index()])
140                    .unwrap_or_else(|| c.index());
141
142                let (min, max) = get_min_max(i).map_err(|e| {
143                    e.context(format!("get min/max for column: '{}'", c.name()))
144                })?;
145                Ok((
146                    ScalarValue::iter_to_array(min)?,
147                    ScalarValue::iter_to_array(max)?,
148                ))
149            })
150            .collect::<Result<Vec<_>>>()
151            .map_err(|e| e.context("collect min/max values"))?
152            .into_iter()
153            .unzip();
154
155        let min_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), min_values)
156            .map_err(|e| {
157                DataFusionError::ArrowError(
158                    Box::new(e),
159                    Some("\ncreate min batch".to_string()),
160                )
161            })?;
162        let max_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), max_values)
163            .map_err(|e| {
164                DataFusionError::ArrowError(
165                    Box::new(e),
166                    Some("\ncreate max batch".to_string()),
167                )
168            })?;
169
170        Self::new(&min_max_sort_order, &min_max_schema, min_batch, max_batch)
171    }
172
173    #[expect(clippy::needless_pass_by_value)]
174    pub fn new(
175        sort_order: &LexOrdering,
176        schema: &SchemaRef,
177        min_values: RecordBatch,
178        max_values: RecordBatch,
179    ) -> Result<Self> {
180        use arrow::row::*;
181
182        let sort_fields = sort_order
183            .iter()
184            .map(|expr| {
185                expr.expr
186                    .data_type(schema)
187                    .map(|data_type| SortField::new_with_options(data_type, expr.options))
188            })
189            .collect::<Result<Vec<_>>>()
190            .map_err(|e| e.context("create sort fields"))?;
191        let converter = RowConverter::new(sort_fields)?;
192
193        let Some(sort_columns) = sort_columns_from_physical_sort_exprs(sort_order) else {
194            return plan_err!("sort expression must be on column");
195        };
196
197        // swap min/max if they're reversed in the ordering
198        let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
199            .iter()
200            .zip(sort_columns.iter().copied())
201            .map(|(sort_expr, column)| {
202                let maxes = max_values.column_by_name(column.name());
203                let mins = min_values.column_by_name(column.name());
204                let opt_value = if sort_expr.options.descending {
205                    maxes.zip(mins)
206                } else {
207                    mins.zip(maxes)
208                };
209                opt_value.ok_or_else(|| {
210                    plan_datafusion_err!(
211                        "missing column in MinMaxStatistics::new: '{}'",
212                        column.name()
213                    )
214                })
215            })
216            .collect::<Result<Vec<_>>>()?
217            .into_iter()
218            .unzip();
219
220        let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
221            let values = RecordBatch::try_new(
222                min_values.schema(),
223                cols.into_iter().cloned().collect(),
224            )?;
225            let sorting_columns = sort_order
226                .iter()
227                .zip(sort_columns.iter().copied())
228                .map(|(sort_expr, column)| {
229                    let schema = values.schema();
230                    let idx = schema.index_of(column.name())?;
231
232                    Ok(SortColumn {
233                        values: Arc::clone(values.column(idx)),
234                        options: Some(sort_expr.options),
235                    })
236                })
237                .collect::<Result<Vec<_>>>()
238                .map_err(|e| e.context("create sorting columns"))?;
239            converter
240                .convert_columns(
241                    &sorting_columns
242                        .into_iter()
243                        .map(|c| c.values)
244                        .collect::<Vec<_>>(),
245                )
246                .map_err(|e| {
247                    DataFusionError::ArrowError(
248                        Box::new(e),
249                        Some("convert columns".to_string()),
250                    )
251                })
252        });
253
254        Ok(Self {
255            min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
256            max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
257            sort_order: sort_order.clone(),
258        })
259    }
260
261    /// Return a sorted list of the min statistics together with the original indices
262    pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
263        let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
264        sort.sort_unstable_by_key(|(_, row)| *row);
265        sort
266    }
267
268    /// Check if the min/max statistics are in order and non-overlapping
269    /// (or touching at boundaries)
270    pub fn is_sorted(&self) -> bool {
271        self.max_by_sort_order
272            .iter()
273            .zip(self.min_by_sort_order.iter().skip(1))
274            .all(|(max, next_min)| max <= next_min)
275    }
276}
277
278fn sort_columns_from_physical_sort_exprs(
279    sort_order: &LexOrdering,
280) -> Option<Vec<&Column>> {
281    sort_order
282        .iter()
283        .map(|expr| expr.expr.downcast_ref::<Column>())
284        .collect()
285}
286
287fn seed_summary_statistics(summary_statistics: &mut Statistics, file_stats: &Statistics) {
288    summary_statistics.num_rows = file_stats.num_rows;
289    summary_statistics.total_byte_size = file_stats.total_byte_size;
290
291    for (summary_col_stats, file_col_stats) in summary_statistics
292        .column_statistics
293        .iter_mut()
294        .zip(file_stats.column_statistics.iter())
295    {
296        summary_col_stats.null_count = file_col_stats.null_count;
297        summary_col_stats.max_value = file_col_stats.max_value.clone();
298        summary_col_stats.min_value = file_col_stats.min_value.clone();
299        summary_col_stats.sum_value = file_col_stats.sum_value.cast_to_sum_type();
300        summary_col_stats.byte_size = file_col_stats.byte_size;
301    }
302}
303
304fn merge_summary_statistics(
305    summary_statistics: &mut Statistics,
306    file_stats: &Statistics,
307) {
308    summary_statistics.num_rows = summary_statistics.num_rows.add(&file_stats.num_rows);
309    summary_statistics.total_byte_size = summary_statistics
310        .total_byte_size
311        .add(&file_stats.total_byte_size);
312
313    for (summary_col_stats, file_col_stats) in summary_statistics
314        .column_statistics
315        .iter_mut()
316        .zip(file_stats.column_statistics.iter())
317    {
318        let ColumnStatistics {
319            null_count: file_nc,
320            max_value: file_max,
321            min_value: file_min,
322            sum_value: file_sum,
323            distinct_count: _,
324            byte_size: file_sbs,
325        } = file_col_stats;
326
327        summary_col_stats.null_count = summary_col_stats.null_count.add(file_nc);
328        summary_col_stats.max_value = summary_col_stats.max_value.max(file_max);
329        summary_col_stats.min_value = summary_col_stats.min_value.min(file_min);
330        summary_col_stats.sum_value = summary_col_stats.sum_value.add_for_sum(file_sum);
331        summary_col_stats.byte_size = summary_col_stats.byte_size.add(file_sbs);
332    }
333}
334
335fn seed_first_file_statistics(
336    limit_num_rows: &mut Precision<usize>,
337    summary_statistics: &mut Statistics,
338    file_stats: &Statistics,
339    collect_stats: bool,
340) {
341    *limit_num_rows = file_stats.num_rows;
342
343    if collect_stats {
344        seed_summary_statistics(summary_statistics, file_stats);
345    }
346}
347
348fn merge_file_statistics(
349    limit_num_rows: &mut Precision<usize>,
350    summary_statistics: &mut Statistics,
351    file_stats: &Statistics,
352    collect_stats: bool,
353) {
354    *limit_num_rows = limit_num_rows.add(&file_stats.num_rows);
355
356    if collect_stats {
357        merge_summary_statistics(summary_statistics, file_stats);
358    }
359}
360
361/// Get all files as well as the file level summary statistics (no statistic for partition columns).
362/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
363/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
364/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
365///  call to `multiunzip` for constructing file level summary statistics.
366#[deprecated(
367    since = "47.0.0",
368    note = "Please use `get_files_with_limit` and  `compute_all_files_statistics` instead"
369)]
370#[cfg_attr(not(test), expect(unused))]
371pub async fn get_statistics_with_limit(
372    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
373    file_schema: SchemaRef,
374    limit: Option<usize>,
375    collect_stats: bool,
376) -> Result<(FileGroup, Statistics)> {
377    let mut result_files = FileGroup::default();
378    // These statistics can be calculated as long as at least one file provides
379    // useful information. If none of the files provides any information, then
380    // they will end up having `Precision::Absent` values. Throughout calculations,
381    // missing values will be imputed as:
382    // - zero for summations, and
383    // - neutral element for extreme points.
384    let size = file_schema.fields().len();
385    let mut summary_statistics = Statistics {
386        num_rows: Precision::Absent,
387        total_byte_size: Precision::Absent,
388        column_statistics: vec![ColumnStatistics::default(); size],
389    };
390    // Keep limit pruning separate from the returned summary so `collect_stats=false`
391    // can still stop early using known file row counts.
392    let mut limit_num_rows = Precision::<usize>::Absent;
393
394    // Fusing the stream allows us to call next safely even once it is finished.
395    let mut all_files = Box::pin(all_files.fuse());
396
397    if let Some(first_file) = all_files.next().await {
398        let (mut file, file_stats) = first_file?;
399        file.statistics = Some(Arc::clone(&file_stats));
400        result_files.push(file);
401
402        seed_first_file_statistics(
403            &mut limit_num_rows,
404            &mut summary_statistics,
405            &file_stats,
406            collect_stats,
407        );
408
409        // If the number of rows exceeds the limit, we can stop processing
410        // files. This only applies when we know the number of rows. It also
411        // currently ignores tables that have no statistics regarding the
412        // number of rows.
413        let conservative_num_rows = match limit_num_rows {
414            Precision::Exact(nr) => nr,
415            _ => usize::MIN,
416        };
417        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
418            while let Some(current) = all_files.next().await {
419                let (mut file, file_stats) = current?;
420                file.statistics = Some(Arc::clone(&file_stats));
421                result_files.push(file);
422                merge_file_statistics(
423                    &mut limit_num_rows,
424                    &mut summary_statistics,
425                    &file_stats,
426                    collect_stats,
427                );
428
429                // If the number of rows exceeds the limit, we can stop processing
430                // files. This only applies when we know the number of rows. It also
431                // currently ignores tables that have no statistics regarding the
432                // number of rows.
433                if limit_num_rows.get_value().unwrap_or(&usize::MIN)
434                    > &limit.unwrap_or(usize::MAX)
435                {
436                    break;
437                }
438            }
439        }
440    };
441
442    let mut statistics = summary_statistics;
443    if all_files.next().await.is_some() {
444        // If we still have files in the stream, it means that the limit kicked
445        // in, and the statistic could have been different had we processed the
446        // files in a different order.
447        statistics = statistics.to_inexact()
448    }
449
450    Ok((result_files, statistics))
451}
452
453/// Computes the summary statistics for a group of files(`FileGroup` level's statistics).
454///
455/// This function combines statistics from all files in the file group to create
456/// summary statistics. It handles the following aspects:
457/// - Merges row counts and byte sizes across files
458/// - Computes column-level statistics like min/max values
459/// - Maintains appropriate precision information (exact, inexact, absent)
460///
461/// # Parameters
462/// * `file_group` - The group of files to process
463/// * `file_schema` - Schema of the files
464/// * `collect_stats` - Whether to collect statistics (if false, returns original file group)
465///
466/// # Returns
467/// A new file group with summary statistics attached
468#[expect(clippy::needless_pass_by_value)]
469pub fn compute_file_group_statistics(
470    file_group: FileGroup,
471    file_schema: SchemaRef,
472    collect_stats: bool,
473) -> Result<FileGroup> {
474    if !collect_stats {
475        return Ok(file_group);
476    }
477
478    let file_group_stats = file_group.iter().filter_map(|file| {
479        let stats = file.statistics.as_ref()?;
480        Some(stats.as_ref())
481    });
482    let statistics = Statistics::try_merge_iter_with_ndv_fallback(
483        file_group_stats,
484        &file_schema,
485        NdvFallback::Max,
486    )?;
487
488    Ok(file_group.with_statistics(Arc::new(statistics)))
489}
490
491/// Computes statistics for all files across multiple file groups.
492///
493/// This function:
494/// 1. Computes statistics for each individual file group
495/// 2. Summary statistics across all file groups
496/// 3. Optionally marks statistics as inexact
497///
498/// # Parameters
499/// * `file_groups` - Vector of file groups to process
500/// * `table_schema` - Schema of the table
501/// * `collect_stats` - Whether to collect statistics
502/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
503///
504/// # Returns
505/// A tuple containing:
506/// * The processed file groups with their individual statistics attached
507/// * The summary statistics across all file groups, aka all files summary statistics
508#[expect(clippy::needless_pass_by_value)]
509pub fn compute_all_files_statistics(
510    file_groups: Vec<FileGroup>,
511    table_schema: SchemaRef,
512    collect_stats: bool,
513    inexact_stats: bool,
514) -> Result<(Vec<FileGroup>, Statistics)> {
515    let file_groups_with_stats = file_groups
516        .into_iter()
517        .map(|file_group| {
518            compute_file_group_statistics(
519                file_group,
520                Arc::clone(&table_schema),
521                collect_stats,
522            )
523        })
524        .collect::<Result<Vec<_>>>()?;
525
526    // Then summary statistics across all file groups
527    let file_groups_statistics = file_groups_with_stats
528        .iter()
529        .filter_map(|file_group| file_group.file_statistics(None));
530
531    let mut statistics = Statistics::try_merge_iter_with_ndv_fallback(
532        file_groups_statistics,
533        &table_schema,
534        NdvFallback::Max,
535    )?;
536
537    if inexact_stats {
538        statistics = statistics.to_inexact()
539    }
540
541    Ok((file_groups_with_stats, statistics))
542}
543
544#[deprecated(since = "47.0.0", note = "Use Statistics::add")]
545pub fn add_row_stats(
546    file_num_rows: Precision<usize>,
547    num_rows: Precision<usize>,
548) -> Precision<usize> {
549    file_num_rows.add(&num_rows)
550}
551
552#[cfg(test)]
553mod tests {
554    use super::*;
555    use crate::PartitionedFile;
556    use crate::file_groups::FileGroup;
557    use arrow::datatypes::{DataType, Field, Schema};
558    use futures::stream;
559
560    fn file_stats(sum: u32) -> Statistics {
561        Statistics {
562            num_rows: Precision::Exact(1),
563            total_byte_size: Precision::Exact(4),
564            column_statistics: vec![ColumnStatistics {
565                null_count: Precision::Exact(0),
566                max_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
567                min_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
568                sum_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
569                distinct_count: Precision::Exact(1),
570                byte_size: Precision::Exact(4),
571            }],
572        }
573    }
574
575    fn test_schema() -> SchemaRef {
576        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]))
577    }
578
579    fn make_file_stats(
580        num_rows: usize,
581        total_byte_size: usize,
582        col_stats: ColumnStatistics,
583    ) -> Arc<Statistics> {
584        Arc::new(Statistics {
585            num_rows: Precision::Exact(num_rows),
586            total_byte_size: Precision::Exact(total_byte_size),
587            column_statistics: vec![col_stats],
588        })
589    }
590
591    fn rich_col_stats(
592        null_count: usize,
593        min: i64,
594        max: i64,
595        sum: i64,
596        byte_size: usize,
597    ) -> ColumnStatistics {
598        ColumnStatistics {
599            null_count: Precision::Exact(null_count),
600            max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
601            min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
602            distinct_count: Precision::Absent,
603            sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
604            byte_size: Precision::Exact(byte_size),
605        }
606    }
607
608    fn utf8_file_stats(ndv: usize, min: &str, max: &str) -> Statistics {
609        Statistics {
610            num_rows: Precision::Exact(1),
611            total_byte_size: Precision::Exact(16),
612            column_statistics: vec![ColumnStatistics {
613                null_count: Precision::Exact(0),
614                max_value: Precision::Exact(ScalarValue::Utf8(Some(max.to_string()))),
615                min_value: Precision::Exact(ScalarValue::Utf8(Some(min.to_string()))),
616                sum_value: Precision::Absent,
617                distinct_count: Precision::Exact(ndv),
618                byte_size: Precision::Exact(16),
619            }],
620        }
621    }
622
623    fn file_with_stats(path: &str, stats: Statistics) -> PartitionedFile {
624        PartitionedFile::new(path, 1).with_statistics(Arc::new(stats))
625    }
626    #[tokio::test]
627    #[expect(deprecated)]
628    async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
629    -> Result<()> {
630        let schema =
631            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)]));
632
633        let files = stream::iter(vec![Ok((
634            PartitionedFile::new("f1.parquet", 1),
635            Arc::new(file_stats(100)),
636        ))]);
637
638        let (_group, stats) =
639            get_statistics_with_limit(files, schema, None, true).await?;
640
641        assert_eq!(
642            stats.column_statistics[0].sum_value,
643            Precision::Exact(ScalarValue::UInt64(Some(100)))
644        );
645
646        Ok(())
647    }
648
649    #[tokio::test]
650    #[expect(deprecated)]
651    async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening()
652    -> Result<()> {
653        let schema =
654            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)]));
655
656        let files = stream::iter(vec![
657            Ok((
658                PartitionedFile::new("f1.parquet", 1),
659                Arc::new(file_stats(100)),
660            )),
661            Ok((
662                PartitionedFile::new("f2.parquet", 1),
663                Arc::new(file_stats(200)),
664            )),
665        ]);
666
667        let (_group, stats) =
668            get_statistics_with_limit(files, schema, None, true).await?;
669
670        assert_eq!(
671            stats.column_statistics[0].sum_value,
672            Precision::Exact(ScalarValue::UInt64(Some(300)))
673        );
674
675        Ok(())
676    }
677
678    #[tokio::test]
679    #[expect(deprecated)]
680    async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
681        let all_files = stream::iter(vec![
682            Ok((
683                PartitionedFile::new("first.parquet", 10),
684                make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
685            )),
686            Ok((
687                PartitionedFile::new("second.parquet", 20),
688                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
689            )),
690        ]);
691
692        let (_files, statistics) =
693            get_statistics_with_limit(all_files, test_schema(), None, false)
694                .await
695                .unwrap();
696
697        assert_eq!(statistics.num_rows, Precision::Absent);
698        assert_eq!(statistics.total_byte_size, Precision::Absent);
699        assert_eq!(statistics.column_statistics.len(), 1);
700        assert_eq!(
701            statistics.column_statistics[0].null_count,
702            Precision::Absent
703        );
704        assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
705        assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
706        assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
707        assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
708    }
709
710    #[tokio::test]
711    #[expect(deprecated)]
712    async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() {
713        let all_files = stream::iter(vec![
714            Ok((
715                PartitionedFile::new("first.parquet", 10),
716                make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)),
717            )),
718            Ok((
719                PartitionedFile::new("second.parquet", 20),
720                make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)),
721            )),
722            Ok((
723                PartitionedFile::new("third.parquet", 30),
724                make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)),
725            )),
726        ]);
727
728        let (files, statistics) =
729            get_statistics_with_limit(all_files, test_schema(), Some(4), false)
730                .await
731                .unwrap();
732
733        assert_eq!(files.len(), 2);
734        assert_eq!(statistics.num_rows, Precision::Absent);
735        assert_eq!(statistics.total_byte_size, Precision::Absent);
736    }
737
738    #[tokio::test]
739    #[expect(deprecated)]
740    async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
741        let all_files = stream::iter(vec![
742            Ok((
743                PartitionedFile::new("first.parquet", 10),
744                make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
745            )),
746            Ok((
747                PartitionedFile::new("second.parquet", 20),
748                make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
749            )),
750        ]);
751
752        let (_files, statistics) =
753            get_statistics_with_limit(all_files, test_schema(), None, true)
754                .await
755                .unwrap();
756
757        assert_eq!(statistics.num_rows, Precision::Exact(15));
758        assert_eq!(statistics.total_byte_size, Precision::Exact(150));
759        assert_eq!(
760            statistics.column_statistics[0].null_count,
761            Precision::Exact(3)
762        );
763        assert_eq!(
764            statistics.column_statistics[0].min_value,
765            Precision::Exact(ScalarValue::Int64(Some(1)))
766        );
767        assert_eq!(
768            statistics.column_statistics[0].max_value,
769            Precision::Exact(ScalarValue::Int64(Some(99)))
770        );
771        assert_eq!(
772            statistics.column_statistics[0].sum_value,
773            Precision::Exact(ScalarValue::Int64(Some(315)))
774        );
775        assert_eq!(
776            statistics.column_statistics[0].byte_size,
777            Precision::Exact(192)
778        );
779    }
780
781    #[tokio::test]
782    #[expect(deprecated)]
783    async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
784        let all_files = stream::iter(vec![
785            Ok((
786                PartitionedFile::new("first.parquet", 10),
787                make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
788            )),
789            Ok((
790                PartitionedFile::new("second.parquet", 20),
791                make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
792            )),
793            Ok((
794                PartitionedFile::new("third.parquet", 20),
795                make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
796            )),
797        ]);
798
799        let (files, statistics) =
800            get_statistics_with_limit(all_files, test_schema(), Some(8), true)
801                .await
802                .unwrap();
803
804        assert_eq!(files.len(), 2);
805        assert_eq!(statistics.num_rows, Precision::Inexact(10));
806        assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
807        assert_eq!(
808            statistics.column_statistics[0].min_value,
809            Precision::Inexact(ScalarValue::Int64(Some(1)))
810        );
811        assert_eq!(
812            statistics.column_statistics[0].max_value,
813            Precision::Inexact(ScalarValue::Int64(Some(10)))
814        );
815        assert_eq!(
816            statistics.column_statistics[0].sum_value,
817            Precision::Inexact(ScalarValue::Int64(Some(55)))
818        );
819        assert_eq!(
820            statistics.column_statistics[0].byte_size,
821            Precision::Inexact(128)
822        );
823    }
824
825    #[test]
826    fn test_compute_file_group_statistics_uses_max_ndv_fallback() -> Result<()> {
827        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
828        let file_group = FileGroup::new(vec![
829            file_with_stats("f1.parquet", utf8_file_stats(5, "a", "x")),
830            file_with_stats("f2.parquet", utf8_file_stats(8, "b", "z")),
831        ]);
832
833        let file_group =
834            compute_file_group_statistics(file_group, Arc::clone(&schema), true)?;
835        let stats = file_group.file_statistics(None).unwrap();
836
837        assert_eq!(
838            stats.column_statistics[0].distinct_count,
839            Precision::Inexact(8)
840        );
841        assert_eq!(
842            stats.column_statistics[0].min_value,
843            Precision::Exact(ScalarValue::Utf8(Some("a".to_string())))
844        );
845        assert_eq!(
846            stats.column_statistics[0].max_value,
847            Precision::Exact(ScalarValue::Utf8(Some("z".to_string())))
848        );
849
850        Ok(())
851    }
852
853    #[test]
854    fn test_compute_all_files_statistics_uses_max_ndv_fallback() -> Result<()> {
855        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
856        let file_groups = vec![
857            FileGroup::new(vec![
858                file_with_stats("f1.parquet", utf8_file_stats(5, "a", "x")),
859                file_with_stats("f2.parquet", utf8_file_stats(8, "b", "z")),
860            ]),
861            FileGroup::new(vec![
862                file_with_stats("f3.parquet", utf8_file_stats(3, "c", "w")),
863                file_with_stats("f4.parquet", utf8_file_stats(6, "d", "y")),
864            ]),
865        ];
866
867        let (file_groups, stats) =
868            compute_all_files_statistics(file_groups, schema, true, false)?;
869
870        assert_eq!(
871            file_groups[0]
872                .file_statistics(None)
873                .unwrap()
874                .column_statistics[0]
875                .distinct_count,
876            Precision::Inexact(8)
877        );
878        assert_eq!(
879            file_groups[1]
880                .file_statistics(None)
881                .unwrap()
882                .column_statistics[0]
883                .distinct_count,
884            Precision::Inexact(6)
885        );
886        assert_eq!(
887            stats.column_statistics[0].distinct_count,
888            Precision::Inexact(8)
889        );
890
891        Ok(())
892    }
893}