use futures::{Stream, StreamExt};
use std::sync::Arc;

use crate::file_groups::FileGroup;
use crate::PartitionedFile;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::{
    compute::SortColumn,
    row::{Row, Rows},
};
use datafusion_common::stats::Precision;
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{ColumnStatistics, Statistics};

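/// Min/max statistics for a set of files, normalized into arrow [`Rows`] so
/// that whole files can be compared and ordered cheaply. The row values are
/// ordered by [`Self::sort_order`], and min/max are swapped for descending
/// sort columns (see [`Self::new`]) so that plain row comparisons always
/// follow the requested ordering.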
pub(crate) struct MinMaxStatistics {
    min_by_sort_order: Rows,
    max_by_sort_order: Rows,
    sort_order: LexOrdering,
}

impl MinMaxStatistics {
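    /// Sort order used to order the min/max row values.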
    #[allow(unused)]
    pub fn sort_order(&self) -> &LexOrdering {
        &self.sort_order
    }

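    /// Min value of the file at index `idx`, encoded as a comparable [`Row`].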
    #[allow(unused)]
    pub fn min(&self, idx: usize) -> Row {
        self.min_by_sort_order.row(idx)
    }

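    /// Max value of the file at index `idx`, encoded as a comparable [`Row`].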
    pub fn max(&self, idx: usize) -> Row {
        self.max_by_sort_order.row(idx)
    }

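    /// Builds [`MinMaxStatistics`] from the statistics attached to a set of
    /// [`PartitionedFile`]s.
    ///
    /// `projected_sort_order` is expressed against `projected_schema`, and
    /// `projection` (if present) maps indices of the projected schema back to
    /// the full table schema. Sort columns beyond a file's column statistics
    /// are treated as partition columns, whose single value serves as both min
    /// and max. Returns a plan error if any file is missing statistics.
    ///
    /// Illustrative sketch, not a doctest; `ordering`, `schema`, and `files`
    /// are hypothetical values assumed to exist in the caller's context:
    ///
    /// ```rust,ignore
    /// let stats = MinMaxStatistics::new_from_files(&ordering, &schema, None, &files)?;
    /// if stats.is_sorted() {
    ///     // The files do not overlap in sort order and can be read back to back.
    /// }
    /// ```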
    pub fn new_from_files<'a>(
        projected_sort_order: &LexOrdering,
        projected_schema: &SchemaRef,
        projection: Option<&[usize]>,
        files: impl IntoIterator<Item = &'a PartitionedFile>,
    ) -> Result<Self> {
        use datafusion_common::ScalarValue;

        let statistics_and_partition_values = files
            .into_iter()
            .map(|file| {
                file.statistics
                    .as_ref()
                    .zip(Some(file.partition_values.as_slice()))
            })
            .collect::<Option<Vec<_>>>()
            .ok_or_else(|| {
                DataFusionError::Plan("Parquet file missing statistics".to_string())
            })?;

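        // Helper to collect (mins, maxes) for column `i` of the projected schema
        // across all files; indices past the column statistics address partition
        // values, which act as both min and max.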
        let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
            Ok(statistics_and_partition_values
                .iter()
                .map(|(s, pv)| {
                    if i < s.column_statistics.len() {
                        s.column_statistics[i]
                            .min_value
                            .get_value()
                            .cloned()
                            .zip(s.column_statistics[i].max_value.get_value().cloned())
                            .ok_or_else(|| {
                                DataFusionError::Plan("statistics not found".to_string())
                            })
                    } else {
                        let partition_value = &pv[i - s.column_statistics.len()];
                        Ok((partition_value.clone(), partition_value.clone()))
                    }
                })
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .unzip())
        };

        let sort_columns = sort_columns_from_physical_sort_exprs(projected_sort_order)
            .ok_or(DataFusionError::Plan(
                "sort expression must be on column".to_string(),
            ))?;

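        // Project the schema and the sort order down to just the sort columns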
        let min_max_schema = Arc::new(
            projected_schema
                .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
        );
        let min_max_sort_order = LexOrdering::from(
            sort_columns
                .iter()
                .zip(projected_sort_order.iter())
                .enumerate()
                .map(|(i, (col, sort))| PhysicalSortExpr {
                    expr: Arc::new(Column::new(col.name(), i)),
                    options: sort.options,
                })
                .collect::<Vec<_>>(),
        );

        let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
            .iter()
            .map(|c| {
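                // Reverse the projection to find the column's index in the full
                // file statistics: the statistics cover every table column, while
                // the sort column's index() refers to the projected schema.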
                let i = projection
                    .map(|p| p[c.index()])
                    .unwrap_or_else(|| c.index());

                let (min, max) = get_min_max(i).map_err(|e| {
                    e.context(format!("get min/max for column: '{}'", c.name()))
                })?;
                Ok((
                    ScalarValue::iter_to_array(min)?,
                    ScalarValue::iter_to_array(max)?,
                ))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("collect min/max values"))?
            .into_iter()
            .unzip();

        Self::new(
            &min_max_sort_order,
            &min_max_schema,
            RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
                |e| {
                    DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
                },
            )?,
            RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
                |e| {
                    DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
                },
            )?,
        )
    }

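    /// Builds [`MinMaxStatistics`] from a [`RecordBatch`] of min values and a
    /// [`RecordBatch`] of max values (one row per file). Every sort expression
    /// must be a bare, non-nullable column; otherwise a plan error is returned.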
    pub fn new(
        sort_order: &LexOrdering,
        schema: &SchemaRef,
        min_values: RecordBatch,
        max_values: RecordBatch,
    ) -> Result<Self> {
        use arrow::row::*;

        let sort_fields = sort_order
            .iter()
            .map(|expr| {
                expr.expr
                    .data_type(schema)
                    .map(|data_type| SortField::new_with_options(data_type, expr.options))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("create sort fields"))?;
        let converter = RowConverter::new(sort_fields)?;

        let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
            DataFusionError::Plan("sort expression must be on column".to_string()),
        )?;

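        // If a sort column is descending, swap its min and max so that plain row
        // comparisons still order files according to the sort order.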
        let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
            .iter()
            .zip(sort_columns.iter().copied())
            .map(|(sort_expr, column)| {
                if sort_expr.options.descending {
                    max_values
                        .column_by_name(column.name())
                        .zip(min_values.column_by_name(column.name()))
                } else {
                    min_values
                        .column_by_name(column.name())
                        .zip(max_values.column_by_name(column.name()))
                }
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "missing column in MinMaxStatistics::new: '{}'",
                        column.name()
                    )
                })
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .unzip();

        let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
            let values = RecordBatch::try_new(
                min_values.schema(),
                cols.into_iter().cloned().collect(),
            )?;
            let sorting_columns = sort_order
                .iter()
                .zip(sort_columns.iter().copied())
                .map(|(sort_expr, column)| {
                    let schema = values.schema();

                    let idx = schema.index_of(column.name())?;
                    let field = schema.field(idx);

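                    // Sort columns being non-nullable is an invariant of this
                    // constructor; reject nullable columns up front.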
                    if field.is_nullable() {
                        return plan_err!("cannot sort by nullable column");
                    }

                    Ok(SortColumn {
                        values: Arc::clone(values.column(idx)),
                        options: Some(sort_expr.options),
                    })
                })
                .collect::<Result<Vec<_>>>()
                .map_err(|e| e.context("create sorting columns"))?;
            converter
                .convert_columns(
                    &sorting_columns
                        .into_iter()
                        .map(|c| c.values)
                        .collect::<Vec<_>>(),
                )
                .map_err(|e| {
                    DataFusionError::ArrowError(e, Some("convert columns".to_string()))
                })
        });

        Ok(Self {
            min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
            max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
            sort_order: sort_order.clone(),
        })
    }

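    /// Min values of all files, sorted ascending, each paired with its file
    /// index. Ties are broken arbitrarily.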
    pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
        let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
        sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
        sort
    }

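    /// Whether the files are non-overlapping in sort order: each file's max
    /// must be strictly below the next file's min.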
    pub fn is_sorted(&self) -> bool {
        self.max_by_sort_order
            .iter()
            .zip(self.min_by_sort_order.iter().skip(1))
            .all(|(max, next_min)| max < next_min)
    }
}

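/// Extracts the [`Column`]s referenced by a [`LexOrdering`], returning `None`
/// if any sort expression is not a bare column reference.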
fn sort_columns_from_physical_sort_exprs(
    sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
    sort_order
        .iter()
        .map(|expr| expr.expr.as_any().downcast_ref::<Column>())
        .collect::<Option<Vec<_>>>()
}

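/// Collects files from the `all_files` stream along with merged file-level
/// summary statistics, stopping early once the exactly-known row count exceeds
/// `limit`. When `collect_stats` is false, statistics of files after the first
/// are not accumulated. If the limit cuts the stream short, the returned
/// statistics are downgraded to inexact.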
#[deprecated(
    since = "47.0.0",
    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
)]
#[allow(unused)]
pub async fn get_statistics_with_limit(
    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
    file_schema: SchemaRef,
    limit: Option<usize>,
    collect_stats: bool,
) -> Result<(FileGroup, Statistics)> {
    let mut result_files = FileGroup::default();
    let size = file_schema.fields().len();
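    // Accumulators for the merged statistics, starting from absent/default
    // values and folded together as files stream in.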
    let mut col_stats_set = vec![ColumnStatistics::default(); size];
    let mut num_rows = Precision::<usize>::Absent;
    let mut total_byte_size = Precision::<usize>::Absent;

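    // Fuse the stream so `next()` can be polled safely even after it has
    // finished; this is used at the end to detect leftover files.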
    let mut all_files = Box::pin(all_files.fuse());

    if let Some(first_file) = all_files.next().await {
        let (mut file, file_stats) = first_file?;
        file.statistics = Some(Arc::clone(&file_stats));
        result_files.push(file);

        // The first file's statistics seed the running totals directly.
        num_rows = file_stats.num_rows;
        total_byte_size = file_stats.total_byte_size;
        for (index, file_column) in
            file_stats.column_statistics.clone().into_iter().enumerate()
        {
            col_stats_set[index].null_count = file_column.null_count;
            col_stats_set[index].max_value = file_column.max_value;
            col_stats_set[index].min_value = file_column.min_value;
            col_stats_set[index].sum_value = file_column.sum_value;
        }

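        // Only keep scanning while the known row count is below the limit; an
        // absent or inexact count conservatively counts as zero rows.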
        let conservative_num_rows = match num_rows {
            Precision::Exact(nr) => nr,
            _ => usize::MIN,
        };
        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
            while let Some(current) = all_files.next().await {
                let (mut file, file_stats) = current?;
                file.statistics = Some(Arc::clone(&file_stats));
                result_files.push(file);
                if !collect_stats {
                    continue;
                }

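                // Accumulate row count, byte size, and per-column statistics;
                // `Precision` arithmetic keeps results exact only when both
                // operands are exact.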
                num_rows = num_rows.add(&file_stats.num_rows);

                total_byte_size = total_byte_size.add(&file_stats.total_byte_size);

                for (file_col_stats, col_stats) in file_stats
                    .column_statistics
                    .iter()
                    .zip(col_stats_set.iter_mut())
                {
                    let ColumnStatistics {
                        null_count: file_nc,
                        max_value: file_max,
                        min_value: file_min,
                        sum_value: file_sum,
                        distinct_count: _,
                    } = file_col_stats;

                    col_stats.null_count = col_stats.null_count.add(file_nc);
                    col_stats.max_value = col_stats.max_value.max(file_max);
                    col_stats.min_value = col_stats.min_value.min(file_min);
                    col_stats.sum_value = col_stats.sum_value.add(file_sum);
                }

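                // Stop reading files once the accumulated row count exceeds the
                // limit.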
                if num_rows.get_value().unwrap_or(&usize::MIN)
                    > &limit.unwrap_or(usize::MAX)
                {
                    break;
                }
            }
        }
    };

    let mut statistics = Statistics {
        num_rows,
        total_byte_size,
        column_statistics: col_stats_set,
    };
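    // If files remain in the stream, the limit cut collection short and the
    // statistics could have differed had the remaining files been processed.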
    if all_files.next().await.is_some() {
        statistics = statistics.to_inexact()
    }

    Ok((result_files, statistics))
}

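/// Computes the summary statistics of a single [`FileGroup`] by merging the
/// statistics of its files; files without statistics are skipped. Returns the
/// group unchanged when `collect_stats` is false, otherwise returns the group
/// with the merged statistics attached.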
pub fn compute_file_group_statistics(
    file_group: FileGroup,
    file_schema: SchemaRef,
    collect_stats: bool,
) -> Result<FileGroup> {
    if !collect_stats {
        return Ok(file_group);
    }

    let file_group_stats = file_group.iter().filter_map(|file| {
        let stats = file.statistics.as_ref()?;
        Some(stats.as_ref())
    });
    let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?;

    Ok(file_group.with_statistics(Arc::new(statistics)))
}

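/// Computes statistics for each [`FileGroup`] (via
/// [`compute_file_group_statistics`]) and then merges them into a single
/// table-level [`Statistics`]. When `inexact_stats` is true, the merged
/// statistics are downgraded to inexact.
///
/// Illustrative sketch, not a doctest; `groups` and `schema` are hypothetical
/// values assumed to exist in the caller's context:
///
/// ```rust,ignore
/// let (groups_with_stats, table_stats) =
///     compute_all_files_statistics(groups, Arc::clone(&schema), true, false)?;
/// ```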
pub fn compute_all_files_statistics(
    file_groups: Vec<FileGroup>,
    table_schema: SchemaRef,
    collect_stats: bool,
    inexact_stats: bool,
) -> Result<(Vec<FileGroup>, Statistics)> {
    // First compute statistics for each file group
    let file_groups_with_stats = file_groups
        .into_iter()
        .map(|file_group| {
            compute_file_group_statistics(
                file_group,
                Arc::clone(&table_schema),
                collect_stats,
            )
        })
        .collect::<Result<Vec<_>>>()?;

    // Then summarize statistics across all file groups
    let file_groups_statistics = file_groups_with_stats
        .iter()
        .filter_map(|file_group| file_group.file_statistics(None));

    let mut statistics =
        Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;

    if inexact_stats {
        statistics = statistics.to_inexact()
    }

    Ok((file_groups_with_stats, statistics))
}

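/// Adds two row-count [`Precision`] values; kept for backwards compatibility.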
#[deprecated(since = "47.0.0", note = "Use Statistics::add")]
pub fn add_row_stats(
    file_num_rows: Precision<usize>,
    num_rows: Precision<usize>,
) -> Precision<usize> {
    file_num_rows.add(&num_rows)
}