datafusion_datasource/statistics.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Use statistics to optimize physical planning.
//!
//! Currently, this module houses code to sort file groups if they are non-overlapping with
//! respect to the required sort order. See [`MinMaxStatistics`].

use futures::{Stream, StreamExt};
use std::sync::Arc;

use crate::file_groups::FileGroup;
use crate::PartitionedFile;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::{
    compute::SortColumn,
    row::{Row, Rows},
};
use datafusion_common::stats::Precision;
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{ColumnStatistics, Statistics};

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
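///
/// A minimal usage sketch (hypothetical bindings: `ordering` is the sort order of
/// `schema`, and every [`PartitionedFile`] in `files` already carries statistics):
///
/// ```rust,ignore
/// let stats = MinMaxStatistics::new_from_files(&ordering, &schema, None, &files)?;
/// if stats.is_sorted() {
///     // The files are non-overlapping and already ordered by `ordering`.
/// }
/// ```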
pub(crate) struct MinMaxStatistics {
    min_by_sort_order: Rows,
    max_by_sort_order: Rows,
    sort_order: LexOrdering,
}

impl MinMaxStatistics {
    /// Sort order used to sort the statistics
    #[allow(unused)]
    pub fn sort_order(&self) -> &LexOrdering {
        &self.sort_order
    }

    /// Min value at index
    #[allow(unused)]
    pub fn min(&self, idx: usize) -> Row {
        self.min_by_sort_order.row(idx)
    }

    /// Max value at index
    pub fn max(&self, idx: usize) -> Row {
        self.max_by_sort_order.row(idx)
    }

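    /// Build a [`MinMaxStatistics`] from the statistics attached to a set of
    /// [`PartitionedFile`]s. Sort columns whose index falls past the end of the
    /// file-level column statistics are resolved against the files' partition
    /// values. Returns a plan error if any file is missing statistics or if a
    /// sort expression is not a plain column reference.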
    pub fn new_from_files<'a>(
        projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
        projected_schema: &SchemaRef,       // Projected schema
        projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
        files: impl IntoIterator<Item = &'a PartitionedFile>,
    ) -> Result<Self> {
        use datafusion_common::ScalarValue;

        let statistics_and_partition_values = files
            .into_iter()
            .map(|file| {
                file.statistics
                    .as_ref()
                    .zip(Some(file.partition_values.as_slice()))
            })
            .collect::<Option<Vec<_>>>()
            .ok_or_else(|| {
                DataFusionError::Plan("Parquet file missing statistics".to_string())
            })?;

        // Helper function to get min/max statistics for a given column of projected_schema
        let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
            Ok(statistics_and_partition_values
                .iter()
                .map(|(s, pv)| {
                    if i < s.column_statistics.len() {
                        s.column_statistics[i]
                            .min_value
                            .get_value()
                            .cloned()
                            .zip(s.column_statistics[i].max_value.get_value().cloned())
                            .ok_or_else(|| {
                                DataFusionError::Plan("statistics not found".to_string())
                            })
                    } else {
                        let partition_value = &pv[i - s.column_statistics.len()];
                        Ok((partition_value.clone(), partition_value.clone()))
                    }
                })
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .unzip())
        };

        let sort_columns = sort_columns_from_physical_sort_exprs(projected_sort_order)
            .ok_or(DataFusionError::Plan(
                "sort expression must be on column".to_string(),
            ))?;

        // Project the schema & sort order down to just the relevant columns
        let min_max_schema = Arc::new(
            projected_schema
                .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
        );
        let min_max_sort_order = LexOrdering::from(
            sort_columns
                .iter()
                .zip(projected_sort_order.iter())
                .enumerate()
                .map(|(i, (col, sort))| PhysicalSortExpr {
                    expr: Arc::new(Column::new(col.name(), i)),
                    options: sort.options,
                })
                .collect::<Vec<_>>(),
        );

        let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
            .iter()
            .map(|c| {
                // Reverse the projection to get the index of the column in the full statistics.
                // The file statistics contain _every_ column, but the sort column's index()
                // refers to its index in projected_schema.
                let i = projection.map(|p| p[c.index()]).unwrap_or(c.index());

                let (min, max) = get_min_max(i).map_err(|e| {
                    e.context(format!("get min/max for column: '{}'", c.name()))
                })?;
                Ok((
                    ScalarValue::iter_to_array(min)?,
                    ScalarValue::iter_to_array(max)?,
                ))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("collect min/max values"))?
            .into_iter()
            .unzip();

        Self::new(
            &min_max_sort_order,
            &min_max_schema,
            RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
                |e| {
                    DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
                },
            )?,
            RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
                |e| {
                    DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
                },
            )?,
        )
    }

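    /// Build a [`MinMaxStatistics`] from two [`RecordBatch`]es holding, row by row,
    /// the per-file min and max values of the sort columns. Columns that are sorted
    /// descending have their min/max swapped so that row-wise comparisons still
    /// follow `sort_order`. Returns an error if a sort column is nullable or if a
    /// sort expression is not a plain column reference.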
    pub fn new(
        sort_order: &LexOrdering,
        schema: &SchemaRef,
        min_values: RecordBatch,
        max_values: RecordBatch,
    ) -> Result<Self> {
        use arrow::row::*;

        let sort_fields = sort_order
            .iter()
            .map(|expr| {
                expr.expr
                    .data_type(schema)
                    .map(|data_type| SortField::new_with_options(data_type, expr.options))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("create sort fields"))?;
        let converter = RowConverter::new(sort_fields)?;

        let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
            DataFusionError::Plan("sort expression must be on column".to_string()),
        )?;

        // swap min/max if they're reversed in the ordering
        let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
            .iter()
            .zip(sort_columns.iter().copied())
            .map(|(sort_expr, column)| {
                if sort_expr.options.descending {
                    max_values
                        .column_by_name(column.name())
                        .zip(min_values.column_by_name(column.name()))
                } else {
                    min_values
                        .column_by_name(column.name())
                        .zip(max_values.column_by_name(column.name()))
                }
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "missing column in MinMaxStatistics::new: '{}'",
                        column.name()
                    )
                })
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .unzip();

        let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
            let values = RecordBatch::try_new(
                min_values.schema(),
                cols.into_iter().cloned().collect(),
            )?;
            let sorting_columns = sort_order
                .iter()
                .zip(sort_columns.iter().copied())
                .map(|(sort_expr, column)| {
                    let schema = values.schema();

                    let idx = schema.index_of(column.name())?;
                    let field = schema.field(idx);

                    // check that sort columns are non-nullable
                    if field.is_nullable() {
                        return plan_err!("cannot sort by nullable column");
                    }

                    Ok(SortColumn {
                        values: Arc::clone(values.column(idx)),
                        options: Some(sort_expr.options),
                    })
                })
                .collect::<Result<Vec<_>>>()
                .map_err(|e| e.context("create sorting columns"))?;
            converter
                .convert_columns(
                    &sorting_columns
                        .into_iter()
                        .map(|c| c.values)
                        .collect::<Vec<_>>(),
                )
                .map_err(|e| {
                    DataFusionError::ArrowError(e, Some("convert columns".to_string()))
                })
        });

        Ok(Self {
            min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
            max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
            sort_order: sort_order.clone(),
        })
    }

    /// Return the min statistics sorted in ascending order, each paired with the
    /// index of its file in the original input order
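    ///
    /// A minimal sketch of how the result can be consumed (hypothetical `stats`
    /// value built via [`Self::new_from_files`]):
    ///
    /// ```rust,ignore
    /// for (file_idx, min_key) in stats.min_values_sorted() {
    ///     // `file_idx` is the file's position in the original input order;
    ///     // `min_key` is the row-encoded minimum for that file.
    /// }
    /// ```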
    pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
        let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
        sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
        sort
    }

    /// Check if the min/max statistics are in order and non-overlapping: for every
    /// pair of consecutive files, the max of the earlier file must be strictly less
    /// than the min of the later one (e.g. ranges [1, 3] and [4, 7] qualify, while
    /// [1, 5] and [4, 7] do not)
    pub fn is_sorted(&self) -> bool {
        self.max_by_sort_order
            .iter()
            .zip(self.min_by_sort_order.iter().skip(1))
            .all(|(max, next_min)| max < next_min)
    }
}

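/// Downcast each sort expression in `sort_order` to a [`Column`], returning `None`
/// if any expression is not a plain column reference.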
fn sort_columns_from_physical_sort_exprs(
    sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
    sort_order
        .iter()
        .map(|expr| expr.expr.as_any().downcast_ref::<Column>())
        .collect::<Option<Vec<_>>>()
}

/// Get all files as well as the file-level summary statistics (no statistics for partition columns).
/// If the optional `limit` is provided, include only as many files as are needed to read up to
/// `limit` rows. `collect_stats` is passed down from the configuration parameter on
/// `ListingTable`; if it is false, we only construct bare statistics and skip a potentially
/// expensive call to `multiunzip` for constructing the file-level summary statistics.
#[deprecated(
    since = "47.0.0",
    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
)]
#[allow(unused)]
pub async fn get_statistics_with_limit(
    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
    file_schema: SchemaRef,
    limit: Option<usize>,
    collect_stats: bool,
) -> Result<(FileGroup, Statistics)> {
    let mut result_files = FileGroup::default();
    // These statistics can be calculated as long as at least one file provides
    // useful information. If none of the files provides any information, then
    // they will end up having `Precision::Absent` values. Throughout calculations,
    // missing values will be imputed as:
    // - zero for summations, and
    // - neutral element for extreme points.
    let size = file_schema.fields().len();
    let mut col_stats_set = vec![ColumnStatistics::default(); size];
    let mut num_rows = Precision::<usize>::Absent;
    let mut total_byte_size = Precision::<usize>::Absent;

    // Fusing the stream allows us to call next safely even once it is finished.
    let mut all_files = Box::pin(all_files.fuse());

    if let Some(first_file) = all_files.next().await {
        let (mut file, file_stats) = first_file?;
        file.statistics = Some(Arc::clone(&file_stats));
        result_files.push(file);

        // First file, we set them directly from the file statistics.
        num_rows = file_stats.num_rows;
        total_byte_size = file_stats.total_byte_size;
        for (index, file_column) in
            file_stats.column_statistics.clone().into_iter().enumerate()
        {
            col_stats_set[index].null_count = file_column.null_count;
            col_stats_set[index].max_value = file_column.max_value;
            col_stats_set[index].min_value = file_column.min_value;
            col_stats_set[index].sum_value = file_column.sum_value;
        }

        // If the number of rows exceeds the limit, we can stop processing
        // files. This only applies when we know the number of rows. It also
        // currently ignores tables that have no statistics regarding the
        // number of rows.
        let conservative_num_rows = match num_rows {
            Precision::Exact(nr) => nr,
            _ => usize::MIN,
        };
        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
            while let Some(current) = all_files.next().await {
                let (mut file, file_stats) = current?;
                file.statistics = Some(Arc::clone(&file_stats));
                result_files.push(file);
                if !collect_stats {
                    continue;
                }

                // We accumulate the number of rows, total byte size and null
                // counts across all the files in question. If any file does not
                // provide any information or provides an inexact value, we demote
                // the statistic precision to inexact.
                num_rows = num_rows.add(&file_stats.num_rows);

                total_byte_size = total_byte_size.add(&file_stats.total_byte_size);

                for (file_col_stats, col_stats) in file_stats
                    .column_statistics
                    .iter()
                    .zip(col_stats_set.iter_mut())
                {
                    let ColumnStatistics {
                        null_count: file_nc,
                        max_value: file_max,
                        min_value: file_min,
                        sum_value: file_sum,
                        distinct_count: _,
                    } = file_col_stats;

                    col_stats.null_count = col_stats.null_count.add(file_nc);
                    col_stats.max_value = col_stats.max_value.max(file_max);
                    col_stats.min_value = col_stats.min_value.min(file_min);
                    col_stats.sum_value = col_stats.sum_value.add(file_sum);
                }

                // If the number of rows exceeds the limit, we can stop processing
                // files. This only applies when we know the number of rows. It also
                // currently ignores tables that have no statistics regarding the
                // number of rows.
                if num_rows.get_value().unwrap_or(&usize::MIN)
                    > &limit.unwrap_or(usize::MAX)
                {
                    break;
                }
            }
        }
    };

    let mut statistics = Statistics {
        num_rows,
        total_byte_size,
        column_statistics: col_stats_set,
    };
    if all_files.next().await.is_some() {
        // If we still have files in the stream, it means that the limit kicked
        // in, and the statistic could have been different had we processed the
        // files in a different order.
        statistics = statistics.to_inexact()
    }

    Ok((result_files, statistics))
}

/// Computes the summary statistics for a group of files (i.e., statistics at the `FileGroup` level).
///
/// This function combines statistics from all files in the file group to create
/// summary statistics. It handles the following aspects:
/// - Merges row counts and byte sizes across files
/// - Computes column-level statistics like min/max values
/// - Maintains appropriate precision information (exact, inexact, absent)
///
/// # Parameters
/// * `file_group` - The group of files to process
/// * `file_schema` - Schema of the files
/// * `collect_stats` - Whether to collect statistics (if false, returns the original file group)
///
/// # Returns
/// A new file group with summary statistics attached
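///
/// A minimal sketch, assuming `group` is a [`FileGroup`] whose files already carry
/// per-file statistics and `schema` is the files' schema:
///
/// ```rust,ignore
/// let group = compute_file_group_statistics(group, Arc::clone(&schema), true)?;
/// // `group.statistics()` now returns the merged, group-level statistics.
/// ```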
pub fn compute_file_group_statistics(
    file_group: FileGroup,
    file_schema: SchemaRef,
    collect_stats: bool,
) -> Result<FileGroup> {
    if !collect_stats {
        return Ok(file_group);
    }

    let file_group_stats = file_group.iter().filter_map(|file| {
        let stats = file.statistics.as_ref()?;
        Some(stats.as_ref())
    });
    let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?;

    Ok(file_group.with_statistics(Arc::new(statistics)))
}

/// Computes statistics for all files across multiple file groups.
///
/// This function:
/// 1. Computes statistics for each individual file group
/// 2. Computes summary statistics across all file groups
/// 3. Optionally marks the statistics as inexact
///
/// # Parameters
/// * `file_groups` - Vector of file groups to process
/// * `table_schema` - Schema of the table
/// * `collect_stats` - Whether to collect statistics
/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
///
/// # Returns
/// A tuple containing:
/// * The processed file groups with their individual statistics attached
/// * The summary statistics across all file groups, i.e. the statistics over all files
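///
/// A minimal sketch, assuming `groups` is a `Vec<FileGroup>` with per-file
/// statistics attached and `table_schema` is the table's schema:
///
/// ```rust,ignore
/// let (groups, summary) =
///     compute_all_files_statistics(groups, Arc::clone(&table_schema), true, false)?;
/// // `summary` merges the statistics across every group; each group in `groups`
/// // also carries its own group-level statistics.
/// ```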
pub fn compute_all_files_statistics(
    file_groups: Vec<FileGroup>,
    table_schema: SchemaRef,
    collect_stats: bool,
    inexact_stats: bool,
) -> Result<(Vec<FileGroup>, Statistics)> {
    let file_groups_with_stats = file_groups
        .into_iter()
        .map(|file_group| {
            compute_file_group_statistics(
                file_group,
                Arc::clone(&table_schema),
                collect_stats,
            )
        })
        .collect::<Result<Vec<_>>>()?;

    // Then summary statistics across all file groups
    let file_groups_statistics = file_groups_with_stats
        .iter()
        .filter_map(|file_group| file_group.statistics());

    let mut statistics =
        Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;

    if inexact_stats {
        statistics = statistics.to_inexact()
    }

    Ok((file_groups_with_stats, statistics))
}

#[deprecated(since = "47.0.0", note = "Use Statistics::add")]
pub fn add_row_stats(
    file_num_rows: Precision<usize>,
    num_rows: Precision<usize>,
) -> Precision<usize> {
    file_num_rows.add(&num_rows)
}