datafusion_datasource/statistics.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Use statistics to optimize physical planning.
//!
//! Currently, this module houses code to sort file groups if they are non-overlapping with
//! respect to the required sort order. See [`MinMaxStatistics`]

use futures::{Stream, StreamExt};
use std::sync::Arc;

use crate::file_groups::FileGroup;
use crate::PartitionedFile;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::{
    compute::SortColumn,
    row::{Row, Rows},
};
use datafusion_common::stats::Precision;
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{ColumnStatistics, Statistics};

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
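///
/// A minimal construction sketch (illustrative only, not a compiled doctest;
/// assumes two files with known min/max values for a single ascending,
/// non-nullable sort column `a`):
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Int32Array, RecordBatch};
/// use arrow::compute::SortOptions;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
/// use datafusion_physical_expr_common::sort_expr::LexOrdering;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let sort_order = LexOrdering::from(vec![PhysicalSortExpr {
///     expr: Arc::new(Column::new("a", 0)),
///     options: SortOptions::default(), // ascending
/// }]);
///
/// // File 0 covers a ∈ [0, 10]; file 1 covers a ∈ [11, 20].
/// let mins = RecordBatch::try_new(
///     Arc::clone(&schema),
///     vec![Arc::new(Int32Array::from(vec![0, 11])) as ArrayRef],
/// )?;
/// let maxes = RecordBatch::try_new(
///     Arc::clone(&schema),
///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
/// )?;
///
/// let stats = MinMaxStatistics::new(&sort_order, &schema, mins, maxes)?;
/// assert!(stats.is_sorted()); // files are non-overlapping and in order
/// ```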
pub(crate) struct MinMaxStatistics {
    min_by_sort_order: Rows,
    max_by_sort_order: Rows,
    sort_order: LexOrdering,
}

impl MinMaxStatistics {
    /// Sort order used to sort the statistics
    #[allow(unused)]
    pub fn sort_order(&self) -> &LexOrdering {
        &self.sort_order
    }

    /// Min value at index
    #[allow(unused)]
    pub fn min(&self, idx: usize) -> Row {
        self.min_by_sort_order.row(idx)
    }

    /// Max value at index
    pub fn max(&self, idx: usize) -> Row {
        self.max_by_sort_order.row(idx)
    }

    pub fn new_from_files<'a>(
        projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
        projected_schema: &SchemaRef,       // Projected schema
        projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
        files: impl IntoIterator<Item = &'a PartitionedFile>,
    ) -> Result<Self> {
        use datafusion_common::ScalarValue;

        let statistics_and_partition_values = files
            .into_iter()
            .map(|file| {
                file.statistics
                    .as_ref()
                    .zip(Some(file.partition_values.as_slice()))
            })
            .collect::<Option<Vec<_>>>()
            .ok_or_else(|| {
                DataFusionError::Plan("Parquet file missing statistics".to_string())
            })?;

        // Helper function to get min/max statistics for a given column of projected_schema
        let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
            Ok(statistics_and_partition_values
                .iter()
                .map(|(s, pv)| {
                    if i < s.column_statistics.len() {
                        s.column_statistics[i]
                            .min_value
                            .get_value()
                            .cloned()
                            .zip(s.column_statistics[i].max_value.get_value().cloned())
                            .ok_or_else(|| {
                                DataFusionError::Plan("statistics not found".to_string())
                            })
                    } else {
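                        // Indices past the file's column statistics refer to
                        // partition columns, whose value is constant within a
                        // file, so min == max == the partition value.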
                        let partition_value = &pv[i - s.column_statistics.len()];
                        Ok((partition_value.clone(), partition_value.clone()))
                    }
                })
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .unzip())
        };

        let sort_columns = sort_columns_from_physical_sort_exprs(projected_sort_order)
            .ok_or(DataFusionError::Plan(
                "sort expression must be on column".to_string(),
            ))?;

        // Project the schema & sort order down to just the relevant columns
        let min_max_schema = Arc::new(
            projected_schema
                .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
        );
        let min_max_sort_order = LexOrdering::from(
            sort_columns
                .iter()
                .zip(projected_sort_order.iter())
                .enumerate()
                .map(|(i, (col, sort))| PhysicalSortExpr {
                    expr: Arc::new(Column::new(col.name(), i)),
                    options: sort.options,
                })
                .collect::<Vec<_>>(),
        );

        let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
            .iter()
            .map(|c| {
                // Reverse the projection to get the index of the column in the full statistics.
                // The file statistics contain _every_ column, but the sort column's index()
                // refers to the index in projected_schema.
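                // For example, with projection = Some(&[2, 0]), a sort column
                // at projected index 0 maps to index 2 in the full table schema.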
                let i = projection
                    .map(|p| p[c.index()])
                    .unwrap_or_else(|| c.index());

                let (min, max) = get_min_max(i).map_err(|e| {
                    e.context(format!("get min/max for column: '{}'", c.name()))
                })?;
                Ok((
                    ScalarValue::iter_to_array(min)?,
                    ScalarValue::iter_to_array(max)?,
                ))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("collect min/max values"))?
            .into_iter()
            .unzip();

        Self::new(
            &min_max_sort_order,
            &min_max_schema,
            RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
                |e| {
                    DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
                },
            )?,
            RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
                |e| {
                    DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
                },
            )?,
        )
    }

    pub fn new(
        sort_order: &LexOrdering,
        schema: &SchemaRef,
        min_values: RecordBatch,
        max_values: RecordBatch,
    ) -> Result<Self> {
        use arrow::row::*;

        let sort_fields = sort_order
            .iter()
            .map(|expr| {
                expr.expr
                    .data_type(schema)
                    .map(|data_type| SortField::new_with_options(data_type, expr.options))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("create sort fields"))?;
        let converter = RowConverter::new(sort_fields)?;

        let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
            DataFusionError::Plan("sort expression must be on column".to_string()),
        )?;

        // swap min/max if they're reversed in the ordering
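        // For a descending column, larger values sort first, so within the row
        // ordering the file's max acts as its minimum (and vice versa).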
        let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
            .iter()
            .zip(sort_columns.iter().copied())
            .map(|(sort_expr, column)| {
                if sort_expr.options.descending {
                    max_values
                        .column_by_name(column.name())
                        .zip(min_values.column_by_name(column.name()))
                } else {
                    min_values
                        .column_by_name(column.name())
                        .zip(max_values.column_by_name(column.name()))
                }
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "missing column in MinMaxStatistics::new: '{}'",
                        column.name()
                    )
                })
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .unzip();

        let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
            let values = RecordBatch::try_new(
                min_values.schema(),
                cols.into_iter().cloned().collect(),
            )?;
            let sorting_columns = sort_order
                .iter()
                .zip(sort_columns.iter().copied())
                .map(|(sort_expr, column)| {
                    let schema = values.schema();

                    let idx = schema.index_of(column.name())?;
                    let field = schema.field(idx);

                    // check that sort columns are non-nullable
                    if field.is_nullable() {
                        return plan_err!("cannot sort by nullable column");
                    }

                    Ok(SortColumn {
                        values: Arc::clone(values.column(idx)),
                        options: Some(sort_expr.options),
                    })
                })
                .collect::<Result<Vec<_>>>()
                .map_err(|e| e.context("create sorting columns"))?;
            converter
                .convert_columns(
                    &sorting_columns
                        .into_iter()
                        .map(|c| c.values)
                        .collect::<Vec<_>>(),
                )
                .map_err(|e| {
                    DataFusionError::ArrowError(e, Some("convert columns".to_string()))
                })
        });

        Ok(Self {
            min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
            max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
            sort_order: sort_order.clone(),
        })
    }

    /// Return a sorted list of the min statistics together with the original indices
    pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
        let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
        sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
        sort
    }

    /// Check that the files are in order and non-overlapping:
    /// each file's max must be strictly less than the next file's min.
    pub fn is_sorted(&self) -> bool {
        self.max_by_sort_order
            .iter()
            .zip(self.min_by_sort_order.iter().skip(1))
            .all(|(max, next_min)| max < next_min)
    }
}

fn sort_columns_from_physical_sort_exprs(
    sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
    sort_order
        .iter()
        .map(|expr| expr.expr.as_any().downcast_ref::<Column>())
        .collect::<Option<Vec<_>>>()
}

/// Get all files as well as the file-level summary statistics (no statistics for partition columns).
/// If the optional `limit` is provided, only as many files as are needed to read up to `limit` rows
/// are included. `collect_stats` is passed down from the configuration parameter on `ListingTable`;
/// if it is false, we only construct bare statistics and skip a potentially expensive call to
/// `multiunzip` for constructing the file-level summary statistics.
#[deprecated(
    since = "47.0.0",
    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
)]
#[allow(unused)]
pub async fn get_statistics_with_limit(
    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
    file_schema: SchemaRef,
    limit: Option<usize>,
    collect_stats: bool,
) -> Result<(FileGroup, Statistics)> {
    let mut result_files = FileGroup::default();
    // These statistics can be calculated as long as at least one file provides
    // useful information. If none of the files provides any information, then
    // they will end up having `Precision::Absent` values. Throughout calculations,
    // missing values will be imputed as:
    // - zero for summations, and
    // - neutral element for extreme points.
    let size = file_schema.fields().len();
    let mut col_stats_set = vec![ColumnStatistics::default(); size];
    let mut num_rows = Precision::<usize>::Absent;
    let mut total_byte_size = Precision::<usize>::Absent;

    // Fusing the stream allows us to call next safely even once it is finished.
    let mut all_files = Box::pin(all_files.fuse());

    if let Some(first_file) = all_files.next().await {
        let (mut file, file_stats) = first_file?;
        file.statistics = Some(Arc::clone(&file_stats));
        result_files.push(file);

        // First file: initialize the accumulators directly from its statistics.
        num_rows = file_stats.num_rows;
        total_byte_size = file_stats.total_byte_size;
        for (index, file_column) in
            file_stats.column_statistics.clone().into_iter().enumerate()
        {
            col_stats_set[index].null_count = file_column.null_count;
            col_stats_set[index].max_value = file_column.max_value;
            col_stats_set[index].min_value = file_column.min_value;
            col_stats_set[index].sum_value = file_column.sum_value;
        }

        // If the number of rows exceeds the limit, we can stop processing
        // files. This only applies when we know the number of rows. It also
        // currently ignores tables that have no statistics regarding the
        // number of rows.
        let conservative_num_rows = match num_rows {
            Precision::Exact(nr) => nr,
            _ => usize::MIN,
        };
        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
            while let Some(current) = all_files.next().await {
                let (mut file, file_stats) = current?;
                file.statistics = Some(Arc::clone(&file_stats));
                result_files.push(file);
                if !collect_stats {
                    continue;
                }

                // We accumulate the number of rows, total byte size and null
                // counts across all the files in question. If any file does not
                // provide any information or provides an inexact value, we demote
                // the statistic precision to inexact.
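                // (`Precision::add` stays `Exact` only when both inputs are
                // `Exact`; it degrades to `Inexact` if either side is
                // `Inexact`, and to `Absent` if either side is `Absent`.)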
                num_rows = num_rows.add(&file_stats.num_rows);

                total_byte_size = total_byte_size.add(&file_stats.total_byte_size);

                for (file_col_stats, col_stats) in file_stats
                    .column_statistics
                    .iter()
                    .zip(col_stats_set.iter_mut())
                {
                    let ColumnStatistics {
                        null_count: file_nc,
                        max_value: file_max,
                        min_value: file_min,
                        sum_value: file_sum,
                        distinct_count: _,
                    } = file_col_stats;

                    col_stats.null_count = col_stats.null_count.add(file_nc);
                    col_stats.max_value = col_stats.max_value.max(file_max);
                    col_stats.min_value = col_stats.min_value.min(file_min);
                    col_stats.sum_value = col_stats.sum_value.add(file_sum);
                }

                // If the number of rows exceeds the limit, we can stop processing
                // files. This only applies when we know the number of rows. It also
                // currently ignores tables that have no statistics regarding the
                // number of rows.
                if num_rows.get_value().unwrap_or(&usize::MIN)
                    > &limit.unwrap_or(usize::MAX)
                {
                    break;
                }
            }
        }
    };

    let mut statistics = Statistics {
        num_rows,
        total_byte_size,
        column_statistics: col_stats_set,
    };
    if all_files.next().await.is_some() {
        // If we still have files in the stream, it means that the limit kicked
        // in, and the statistics could have been different had we processed the
        // files in a different order.
        statistics = statistics.to_inexact()
    }

    Ok((result_files, statistics))
}

/// Computes the summary statistics for a group of files (i.e. `FileGroup`-level statistics).
///
/// This function combines statistics from all files in the file group to create
/// summary statistics. It handles the following aspects:
/// - Merges row counts and byte sizes across files
/// - Computes column-level statistics like min/max values
/// - Maintains appropriate precision information (exact, inexact, absent)
///
/// # Parameters
/// * `file_group` - The group of files to process
/// * `file_schema` - Schema of the files
/// * `collect_stats` - Whether to collect statistics (if false, returns the original file group)
///
/// # Returns
/// A new file group with summary statistics attached
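///
/// A hedged usage sketch (assumes each file in `file_group` already carries
/// its own `Arc<Statistics>`, e.g. populated during listing):
///
/// ```rust,ignore
/// let file_group = compute_file_group_statistics(
///     file_group,               // FileGroup with per-file statistics attached
///     Arc::clone(&file_schema), // SchemaRef of the files
///     true,                     // collect_stats
/// )?;
/// // The merged summary is now attached to the group:
/// let summary = file_group.file_statistics(None);
/// ```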
pub fn compute_file_group_statistics(
    file_group: FileGroup,
    file_schema: SchemaRef,
    collect_stats: bool,
) -> Result<FileGroup> {
    if !collect_stats {
        return Ok(file_group);
    }

    let file_group_stats = file_group.iter().filter_map(|file| {
        let stats = file.statistics.as_ref()?;
        Some(stats.as_ref())
    });
    let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?;

    Ok(file_group.with_statistics(Arc::new(statistics)))
}

/// Computes statistics for all files across multiple file groups.
///
/// This function:
/// 1. Computes statistics for each individual file group
/// 2. Computes summary statistics across all file groups
/// 3. Optionally marks the statistics as inexact
///
/// # Parameters
/// * `file_groups` - Vector of file groups to process
/// * `table_schema` - Schema of the table
/// * `collect_stats` - Whether to collect statistics
/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
///
/// # Returns
/// A tuple containing:
/// * The processed file groups with their individual statistics attached
/// * The summary statistics across all file groups, i.e. the statistics for all files combined
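///
/// A hedged usage sketch (illustrative; `file_groups` and `table_schema` are
/// assumed to come from the surrounding listing code):
///
/// ```rust,ignore
/// let (file_groups_with_stats, table_stats) = compute_all_files_statistics(
///     file_groups,
///     Arc::clone(&table_schema),
///     true,  // collect_stats
///     false, // inexact_stats: keep exact precision where available
/// )?;
/// ```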
pub fn compute_all_files_statistics(
    file_groups: Vec<FileGroup>,
    table_schema: SchemaRef,
    collect_stats: bool,
    inexact_stats: bool,
) -> Result<(Vec<FileGroup>, Statistics)> {
    let file_groups_with_stats = file_groups
        .into_iter()
        .map(|file_group| {
            compute_file_group_statistics(
                file_group,
                Arc::clone(&table_schema),
                collect_stats,
            )
        })
        .collect::<Result<Vec<_>>>()?;

    // Then summary statistics across all file groups
    let file_groups_statistics = file_groups_with_stats
        .iter()
        .filter_map(|file_group| file_group.file_statistics(None));

    let mut statistics =
        Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;

    if inexact_stats {
        statistics = statistics.to_inexact()
    }

    Ok((file_groups_with_stats, statistics))
}

#[deprecated(since = "47.0.0", note = "Use `Precision::add`")]
pub fn add_row_stats(
    file_num_rows: Precision<usize>,
    num_rows: Precision<usize>,
) -> Precision<usize> {
    file_num_rows.add(&num_rows)
}