use std::sync::Arc;

use crate::PartitionedFile;
use crate::file_groups::FileGroup;

use arrow::array::RecordBatch;
use arrow::compute::SortColumn;
use arrow::datatypes::SchemaRef;
use arrow::row::{Row, Rows};
use datafusion_common::stats::Precision;
use datafusion_common::{
    DataFusionError, Result, ScalarValue, plan_datafusion_err, plan_err,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::{ColumnStatistics, Statistics};

use futures::{Stream, StreamExt};

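/// Min/max statistics for a set of files, stored as Arrow [`Rows`] encoded
/// against a sort order so that files can be compared and ordered cheaply.
///
/// Row `i` of `min_by_sort_order` (respectively `max_by_sort_order`) holds
/// the minimum (maximum) values of the sort columns for file `i`. For a
/// descending sort column the roles of min and max are swapped during
/// construction, so comparisons on the encoded rows always follow the sort
/// order.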
pub(crate) struct MinMaxStatistics {
    min_by_sort_order: Rows,
    max_by_sort_order: Rows,
    sort_order: LexOrdering,
}

impl MinMaxStatistics {
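    /// Returns the sort order these statistics are encoded against.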
    #[expect(unused)]
    pub fn sort_order(&self) -> &LexOrdering {
        &self.sort_order
    }

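    /// Returns the minimum row (under the sort order) for the file at `idx`.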
    #[expect(unused)]
    pub fn min(&'_ self, idx: usize) -> Row<'_> {
        self.min_by_sort_order.row(idx)
    }

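    /// Returns the maximum row (under the sort order) for the file at `idx`.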
    pub fn max(&'_ self, idx: usize) -> Row<'_> {
        self.max_by_sort_order.row(idx)
    }

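    /// Builds [`MinMaxStatistics`] for `files` from their column statistics,
    /// restricted to the columns named in `projected_sort_order`. Partition
    /// columns use each file's partition value as both min and max.
    /// `projection` maps indices in the projected schema back to the full
    /// table schema. Fails with a plan error if any file lacks statistics or
    /// if a sort expression is not a plain column.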
    pub fn new_from_files<'a>(
        projected_sort_order: &LexOrdering,
        projected_schema: &SchemaRef,
        projection: Option<&[usize]>,
        files: impl IntoIterator<Item = &'a PartitionedFile>,
    ) -> Result<Self> {
        let Some(statistics_and_partition_values) = files
            .into_iter()
            .map(|file| {
                file.statistics
                    .as_ref()
                    .zip(Some(file.partition_values.as_slice()))
            })
            .collect::<Option<Vec<_>>>()
        else {
            return plan_err!("Parquet file missing statistics");
        };

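        // Helper: extract the (min, max) values of column `i` across all
        // files. Indices past the file schema refer to partition columns,
        // whose min and max are both the partition value itself.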
        let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
            Ok(statistics_and_partition_values
                .iter()
                .map(|(s, pv)| {
                    if i < s.column_statistics.len() {
                        s.column_statistics[i]
                            .min_value
                            .get_value()
                            .cloned()
                            .zip(s.column_statistics[i].max_value.get_value().cloned())
                            .ok_or_else(|| plan_datafusion_err!("statistics not found"))
                    } else {
                        let partition_value = &pv[i - s.column_statistics.len()];
                        Ok((partition_value.clone(), partition_value.clone()))
                    }
                })
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .unzip())
        };

        let Some(sort_columns) =
            sort_columns_from_physical_sort_exprs(projected_sort_order)
        else {
            return plan_err!("sort expression must be on column");
        };

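        // Project the schema and sort order down to just the sort columns.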
        let min_max_schema = Arc::new(
            projected_schema
                .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
        );

        let min_max_sort_order = projected_sort_order
            .iter()
            .zip(sort_columns.iter())
            .enumerate()
            .map(|(idx, (sort_expr, col))| {
                let expr = Arc::new(Column::new(col.name(), idx));
                PhysicalSortExpr::new(expr, sort_expr.options)
            });
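        // `projected_sort_order` is a valid (hence non-empty) `LexOrdering`
        // and the mapping above is 1:1, so the constructed ordering is
        // non-empty as well and the `unwrap` cannot fail.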
        let min_max_sort_order = LexOrdering::new(min_max_sort_order).unwrap();

        let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
            .iter()
            .map(|c| {
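                // Map this sort column's index in the projected schema back
                // to its index in the full table schema, since file
                // statistics are reported against the unprojected schema.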
                let i = projection
                    .map(|p| p[c.index()])
                    .unwrap_or_else(|| c.index());

                let (min, max) = get_min_max(i).map_err(|e| {
                    e.context(format!("get min/max for column: '{}'", c.name()))
                })?;
                Ok((
                    ScalarValue::iter_to_array(min)?,
                    ScalarValue::iter_to_array(max)?,
                ))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("collect min/max values"))?
            .into_iter()
            .unzip();

        let min_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), min_values)
            .map_err(|e| {
                DataFusionError::ArrowError(
                    Box::new(e),
                    Some("\ncreate min batch".to_string()),
                )
            })?;
        let max_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), max_values)
            .map_err(|e| {
                DataFusionError::ArrowError(
                    Box::new(e),
                    Some("\ncreate max batch".to_string()),
                )
            })?;

        Self::new(&min_max_sort_order, &min_max_schema, min_batch, max_batch)
    }

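    /// Builds [`MinMaxStatistics`] from a [`RecordBatch`] of per-file
    /// minimum values and one of per-file maximum values (one row per file),
    /// encoding both through an Arrow `RowConverter` so rows compare
    /// according to `sort_order`.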
    #[expect(clippy::needless_pass_by_value)]
    pub fn new(
        sort_order: &LexOrdering,
        schema: &SchemaRef,
        min_values: RecordBatch,
        max_values: RecordBatch,
    ) -> Result<Self> {
        use arrow::row::*;

        let sort_fields = sort_order
            .iter()
            .map(|expr| {
                expr.expr
                    .data_type(schema)
                    .map(|data_type| SortField::new_with_options(data_type, expr.options))
            })
            .collect::<Result<Vec<_>>>()
            .map_err(|e| e.context("create sort fields"))?;
        let converter = RowConverter::new(sort_fields)?;

        let Some(sort_columns) = sort_columns_from_physical_sort_exprs(sort_order) else {
            return plan_err!("sort expression must be on column");
        };

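        // For a descending sort column, the largest value sorts first, so
        // the column's max values become the "min by sort order" and vice
        // versa.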
        let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
            .iter()
            .zip(sort_columns.iter().copied())
            .map(|(sort_expr, column)| {
                let maxes = max_values.column_by_name(column.name());
                let mins = min_values.column_by_name(column.name());
                let opt_value = if sort_expr.options.descending {
                    maxes.zip(mins)
                } else {
                    mins.zip(maxes)
                };
                opt_value.ok_or_else(|| {
                    plan_datafusion_err!(
                        "missing column in MinMaxStatistics::new: '{}'",
                        column.name()
                    )
                })
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .unzip();

        let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
            let values = RecordBatch::try_new(
                min_values.schema(),
                cols.into_iter().cloned().collect(),
            )?;
            let sorting_columns = sort_order
                .iter()
                .zip(sort_columns.iter().copied())
                .map(|(sort_expr, column)| {
                    let schema = values.schema();
                    let idx = schema.index_of(column.name())?;

                    Ok(SortColumn {
                        values: Arc::clone(values.column(idx)),
                        options: Some(sort_expr.options),
                    })
                })
                .collect::<Result<Vec<_>>>()
                .map_err(|e| e.context("create sorting columns"))?;
            converter
                .convert_columns(
                    &sorting_columns
                        .into_iter()
                        .map(|c| c.values)
                        .collect::<Vec<_>>(),
                )
                .map_err(|e| {
                    DataFusionError::ArrowError(
                        Box::new(e),
                        Some("convert columns".to_string()),
                    )
                })
        });

        Ok(Self {
            min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
            max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
            sort_order: sort_order.clone(),
        })
    }

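    /// Returns `(file index, min row)` pairs ordered by min row, i.e. the
    /// files reordered so that their minimum values follow the sort order.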
    pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
        let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
        sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
        sort
    }

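    /// Returns true if each file's maximum row is strictly below the next
    /// file's minimum row, i.e. the files are non-overlapping and already
    /// ordered by the sort order.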
    pub fn is_sorted(&self) -> bool {
        self.max_by_sort_order
            .iter()
            .zip(self.min_by_sort_order.iter().skip(1))
            .all(|(max, next_min)| max < next_min)
    }
}

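/// Downcasts every expression in `sort_order` to a [`Column`], returning
/// `None` if any sort expression is not a plain column reference.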
fn sort_columns_from_physical_sort_exprs(
    sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
    sort_order
        .iter()
        .map(|expr| expr.expr.as_any().downcast_ref::<Column>())
        .collect()
}

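/// Accumulates files and their statistics from `all_files` until the exact
/// accumulated row count exceeds `limit` (if any), merging per-file
/// statistics into table-level statistics along the way. If the stream is
/// not exhausted, the resulting statistics are downgraded to inexact.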
#[deprecated(
    since = "47.0.0",
    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
)]
#[expect(unused)]
pub async fn get_statistics_with_limit(
    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
    file_schema: SchemaRef,
    limit: Option<usize>,
    collect_stats: bool,
) -> Result<(FileGroup, Statistics)> {
    let mut result_files = FileGroup::default();
    let size = file_schema.fields().len();
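    // Statistics can be merged as long as at least one file provides useful
    // information; columns with no information anywhere stay `Absent`.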
    let mut col_stats_set = vec![ColumnStatistics::default(); size];
    let mut num_rows = Precision::<usize>::Absent;
    let mut total_byte_size = Precision::<usize>::Absent;

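    // Fuse the stream so that `next` can be called safely even after it has
    // already returned `None`.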
    let mut all_files = Box::pin(all_files.fuse());

    if let Some(first_file) = all_files.next().await {
        let (mut file, file_stats) = first_file?;
        file.statistics = Some(Arc::clone(&file_stats));
        result_files.push(file);

        num_rows = file_stats.num_rows;
        total_byte_size = file_stats.total_byte_size;
        for (index, file_column) in
            file_stats.column_statistics.clone().into_iter().enumerate()
        {
            col_stats_set[index].null_count = file_column.null_count;
            col_stats_set[index].max_value = file_column.max_value;
            col_stats_set[index].min_value = file_column.min_value;
            col_stats_set[index].sum_value = file_column.sum_value;
        }

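        // Stop fetching files once the row count is known to exceed the
        // limit; an unknown or inexact row count conservatively counts as
        // zero so that fetching continues.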
        let conservative_num_rows = match num_rows {
            Precision::Exact(nr) => nr,
            _ => usize::MIN,
        };
        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
            while let Some(current) = all_files.next().await {
                let (mut file, file_stats) = current?;
                file.statistics = Some(Arc::clone(&file_stats));
                result_files.push(file);
                if !collect_stats {
                    continue;
                }

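                // Accumulate row counts, byte sizes, and per-column
                // statistics; if any file reports an inexact or absent
                // value, the merged precision degrades accordingly.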
                num_rows = num_rows.add(&file_stats.num_rows);

                total_byte_size = total_byte_size.add(&file_stats.total_byte_size);

                for (file_col_stats, col_stats) in file_stats
                    .column_statistics
                    .iter()
                    .zip(col_stats_set.iter_mut())
                {
                    let ColumnStatistics {
                        null_count: file_nc,
                        max_value: file_max,
                        min_value: file_min,
                        sum_value: file_sum,
                        distinct_count: _,
                        byte_size: file_sbs,
                    } = file_col_stats;

                    col_stats.null_count = col_stats.null_count.add(file_nc);
                    col_stats.max_value = col_stats.max_value.max(file_max);
                    col_stats.min_value = col_stats.min_value.min(file_min);
                    col_stats.sum_value = col_stats.sum_value.add(file_sum);
                    col_stats.byte_size = col_stats.byte_size.add(file_sbs);
                }

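                // Stop processing further files once the accumulated row
                // count exceeds the limit.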
                if num_rows.get_value().unwrap_or(&usize::MIN)
                    > &limit.unwrap_or(usize::MAX)
                {
                    break;
                }
            }
        }
    };

    let mut statistics = Statistics {
        num_rows,
        total_byte_size,
        column_statistics: col_stats_set,
    };
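    // If files remain in the stream, the limit kicked in before every file
    // was seen, so the merged statistics may not describe the full table.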
    if all_files.next().await.is_some() {
        statistics = statistics.to_inexact()
    }

    Ok((result_files, statistics))
}

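/// Merges the statistics of all files in `file_group` (skipping files that
/// have none) into a single group-level statistic via
/// [`Statistics::try_merge_iter`] and attaches it to the group. Returns the
/// group unchanged when `collect_stats` is false.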
#[expect(clippy::needless_pass_by_value)]
pub fn compute_file_group_statistics(
    file_group: FileGroup,
    file_schema: SchemaRef,
    collect_stats: bool,
) -> Result<FileGroup> {
    if !collect_stats {
        return Ok(file_group);
    }

    let file_group_stats = file_group.iter().filter_map(|file| {
        let stats = file.statistics.as_ref()?;
        Some(stats.as_ref())
    });
    let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?;

    Ok(file_group.with_statistics(Arc::new(statistics)))
}

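/// Computes statistics for every [`FileGroup`], then merges the per-group
/// statistics into a single table-level [`Statistics`]. If `inexact_stats`
/// is true, the merged statistics are downgraded to inexact (useful when not
/// every file was actually inspected).
///
/// Illustrative sketch (not a doctest): assuming `file_groups: Vec<FileGroup>`
/// and `table_schema: SchemaRef` are already built and each file carries
/// statistics:
///
/// ```ignore
/// let (groups_with_stats, table_stats) =
///     compute_all_files_statistics(file_groups, table_schema, true, false)?;
/// ```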
#[expect(clippy::needless_pass_by_value)]
pub fn compute_all_files_statistics(
    file_groups: Vec<FileGroup>,
    table_schema: SchemaRef,
    collect_stats: bool,
    inexact_stats: bool,
) -> Result<(Vec<FileGroup>, Statistics)> {
    let file_groups_with_stats = file_groups
        .into_iter()
        .map(|file_group| {
            compute_file_group_statistics(
                file_group,
                Arc::clone(&table_schema),
                collect_stats,
            )
        })
        .collect::<Result<Vec<_>>>()?;

    let file_groups_statistics = file_groups_with_stats
        .iter()
        .filter_map(|file_group| file_group.file_statistics(None));

    let mut statistics =
        Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;

    if inexact_stats {
        statistics = statistics.to_inexact()
    }

    Ok((file_groups_with_stats, statistics))
}

#[deprecated(since = "47.0.0", note = "Use Statistics::add")]
pub fn add_row_stats(
    file_num_rows: Precision<usize>,
    num_rows: Precision<usize>,
) -> Precision<usize> {
    file_num_rows.add(&num_rows)
}