use crate::{
    ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution,
};
use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_execution::cache::cache_manager::{
    CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
};
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
    SortingColumn,
};
use parquet::schema::types::SchemaDescriptor;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

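/// Handle to a Parquet file in object storage that fetches file metadata,
/// schema, and statistics, with optional metadata caching, decryption, and
/// INT96 timestamp coercion.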
#[derive(Debug)]
pub struct DFParquetMetadata<'a> {
    store: &'a dyn ObjectStore,
    object_meta: &'a ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<Arc<FileDecryptionProperties>>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    pub coerce_int96: Option<TimeUnit>,
}

impl<'a> DFParquetMetadata<'a> {
    pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self {
        Self {
            store,
            object_meta,
            metadata_size_hint: None,
            decryption_properties: None,
            file_metadata_cache: None,
            coerce_int96: None,
        }
    }

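    /// Set the prefetch size hint used when reading the metadata footer.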
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
        self.metadata_size_hint = metadata_size_hint;
        self
    }

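    /// Set the decryption properties for reading encrypted Parquet files.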
    pub fn with_decryption_properties(
        mut self,
        decryption_properties: Option<Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.decryption_properties = decryption_properties;
        self
    }

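    /// Set the cache used to avoid re-reading [`ParquetMetaData`] from object
    /// storage.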
    pub fn with_file_metadata_cache(
        mut self,
        file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        self.file_metadata_cache = file_metadata_cache;
        self
    }

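    /// Set the [`TimeUnit`] to which INT96 timestamps are coerced.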
    pub fn with_coerce_int96(mut self, time_unit: Option<TimeUnit>) -> Self {
        self.coerce_int96 = time_unit;
        self
    }

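    /// Fetch the [`ParquetMetaData`] for this file, consulting the file
    /// metadata cache first when one is configured. Metadata for files read
    /// with decryption properties is never cached, and cached entries are
    /// revalidated against the current [`ObjectMeta`] before use. When the
    /// result will be cached, the page index is also loaded (if present) so
    /// the cached entry is complete.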
    pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
        let Self {
            store,
            object_meta,
            metadata_size_hint,
            decryption_properties,
            file_metadata_cache,
            coerce_int96: _,
        } = self;

        let fetch = ObjectStoreFetch::new(*store, object_meta);

        let cache_metadata =
            !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();

        if cache_metadata
            && let Some(file_metadata_cache) = file_metadata_cache.as_ref()
            && let Some(cached) = file_metadata_cache.get(&object_meta.location)
            && cached.is_valid_for(object_meta)
            && let Some(cached_parquet) = cached
                .file_metadata
                .as_any()
                .downcast_ref::<CachedParquetMetaData>()
        {
            return Ok(Arc::clone(cached_parquet.parquet_metadata()));
        }

        let mut reader =
            ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint);

        #[cfg(feature = "parquet_encryption")]
        if let Some(decryption_properties) = decryption_properties {
            reader = reader
                .with_decryption_properties(Some(Arc::clone(decryption_properties)));
        }

        if cache_metadata && file_metadata_cache.is_some() {
            reader = reader.with_page_index_policy(PageIndexPolicy::Optional);
        }

        let metadata = Arc::new(
            reader
                .load_and_finish(fetch, object_meta.size)
                .await
                .map_err(DataFusionError::from)?,
        );

        if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
            file_metadata_cache.put(
                &object_meta.location,
                CachedFileMetadataEntry::new(
                    (*object_meta).clone(),
                    Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
                ),
            );
        }

        Ok(metadata)
    }

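    /// Fetch the Arrow [`Schema`] of this file, applying INT96 coercion when
    /// `coerce_int96` is set.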
    pub async fn fetch_schema(&self) -> Result<Schema> {
        let metadata = self.fetch_metadata().await?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;
        let schema = self
            .coerce_int96
            .as_ref()
            .and_then(|time_unit| {
                coerce_int96_to_resolution(
                    file_metadata.schema_descr(),
                    &schema,
                    time_unit,
                )
            })
            .unwrap_or(schema);
        Ok(schema)
    }

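    /// Fetch the Arrow schema along with the object store [`Path`] of the
    /// file.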
    pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> {
        let loc_path = self.object_meta.location.clone();
        let schema = self.fetch_schema().await?;
        Ok((loc_path, schema))
    }

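    /// Fetch [`Statistics`] for this file, resolved against `table_schema`.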
    pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result<Statistics> {
        let metadata = self.fetch_metadata().await?;
        Self::statistics_from_parquet_metadata(&metadata, table_schema)
    }

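    /// Compute file-level [`Statistics`] from already-decoded
    /// [`ParquetMetaData`].
    ///
    /// Row counts are always exact. When at least one column chunk carries
    /// statistics, per-column min/max values, null counts, and byte sizes are
    /// aggregated across row groups; otherwise only byte sizes (where
    /// derivable) are reported and the remaining column statistics are
    /// unknown.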
    pub fn statistics_from_parquet_metadata(
        metadata: &ParquetMetaData,
        logical_file_schema: &SchemaRef,
    ) -> Result<Statistics> {
        let row_groups_metadata = metadata.row_groups();

        let mut statistics = Statistics::default();
        let mut has_statistics = false;
        let mut num_rows = 0_usize;
        for row_group_meta in row_groups_metadata {
            num_rows += row_group_meta.num_rows() as usize;

            if !has_statistics {
                has_statistics = row_group_meta
                    .columns()
                    .iter()
                    .any(|column| column.statistics().is_some());
            }
        }
        statistics.num_rows = Precision::Exact(num_rows);

        let file_metadata = metadata.file_metadata();
        let mut physical_file_schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;

        if let Some(merged) =
            apply_file_schema_type_coercions(logical_file_schema, &physical_file_schema)
        {
            physical_file_schema = merged;
        }

        statistics.column_statistics = if has_statistics {
            let (mut max_accs, mut min_accs) = create_max_min_accs(logical_file_schema);
            let mut null_counts_array =
                vec![Precision::Absent; logical_file_schema.fields().len()];
            let mut column_byte_sizes =
                vec![Precision::Absent; logical_file_schema.fields().len()];
            let mut is_max_value_exact =
                vec![Some(true); logical_file_schema.fields().len()];
            let mut is_min_value_exact =
                vec![Some(true); logical_file_schema.fields().len()];
            logical_file_schema.fields().iter().enumerate().for_each(
                |(idx, field)| match StatisticsConverter::try_new(
                    field.name(),
                    &physical_file_schema,
                    file_metadata.schema_descr(),
                ) {
                    Ok(stats_converter) => {
                        let mut accumulators = StatisticsAccumulators {
                            min_accs: &mut min_accs,
                            max_accs: &mut max_accs,
                            null_counts_array: &mut null_counts_array,
                            is_min_value_exact: &mut is_min_value_exact,
                            is_max_value_exact: &mut is_max_value_exact,
                            column_byte_sizes: &mut column_byte_sizes,
                        };
                        summarize_min_max_null_counts(
                            file_metadata.schema_descr(),
                            logical_file_schema,
                            &physical_file_schema,
                            &mut accumulators,
                            idx,
                            &stats_converter,
                            row_groups_metadata,
                        )
                        .ok();
                    }
                    Err(e) => {
                        debug!("Failed to create statistics converter: {e}");
                        null_counts_array[idx] = Precision::Exact(num_rows);
                    }
                },
            );

            get_col_stats(
                logical_file_schema,
                &null_counts_array,
                &mut max_accs,
                &mut min_accs,
                &mut is_max_value_exact,
                &mut is_min_value_exact,
                &column_byte_sizes,
            )
        } else {
            logical_file_schema
                .fields()
                .iter()
                .enumerate()
                .map(|(logical_file_schema_index, field)| {
                    let arrow_field =
                        logical_file_schema.field(logical_file_schema_index);
                    let parquet_idx = parquet_column(
                        file_metadata.schema_descr(),
                        &physical_file_schema,
                        arrow_field.name(),
                    )
                    .map(|(idx, _)| idx);
                    let byte_size = compute_arrow_column_size(
                        field.data_type(),
                        row_groups_metadata,
                        parquet_idx,
                        num_rows,
                    );
                    ColumnStatistics::new_unknown().with_byte_size(byte_size)
                })
                .collect()
        };

        #[cfg(debug_assertions)]
        {
            assert_eq!(
                statistics.column_statistics.len(),
                logical_file_schema.fields().len(),
                "Column statistics length does not match table schema fields length"
            );
        }

        Ok(statistics)
    }
}

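/// Returns the type over which min/max should be accumulated: dictionary
/// types are unwrapped to their value type; all other types are returned
/// as-is.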
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

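/// Creates one optional [`MaxAccumulator`] and [`MinAccumulator`] per field
/// in `schema`; fields whose types do not support min/max get `None`.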
fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let max_values: Vec<Option<MaxAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    let min_values: Vec<Option<MinAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    (max_values, min_values)
}

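/// Assembles per-column [`ColumnStatistics`] from the accumulated min/max
/// values, null counts, and byte sizes, marking min/max as exact or inexact
/// according to the exactness flags.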
fn get_col_stats(
    schema: &Schema,
    null_counts: &[Precision<usize>],
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
    is_max_value_exact: &mut [Option<bool>],
    is_min_value_exact: &mut [Option<bool>],
    column_byte_sizes: &[Precision<usize>],
) -> Vec<ColumnStatistics> {
    (0..schema.fields().len())
        .map(|i| {
            let max_value = match (
                max_values.get_mut(i).unwrap(),
                is_max_value_exact.get(i).unwrap(),
            ) {
                (Some(max_value), Some(true)) => {
                    max_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(max_value), Some(false)) | (Some(max_value), None) => {
                    max_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            let min_value = match (
                min_values.get_mut(i).unwrap(),
                is_min_value_exact.get(i).unwrap(),
            ) {
                (Some(min_value), Some(true)) => {
                    min_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(min_value), Some(false)) | (Some(min_value), None) => {
                    min_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            ColumnStatistics {
                null_count: null_counts[i],
                max_value: max_value.unwrap_or(Precision::Absent),
                min_value: min_value.unwrap_or(Precision::Absent),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: column_byte_sizes[i],
            }
        })
        .collect()
}

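/// Mutable state threaded through [`summarize_min_max_null_counts`] while
/// aggregating statistics for a single column across row groups.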
struct StatisticsAccumulators<'a> {
    min_accs: &'a mut [Option<MinAccumulator>],
    max_accs: &'a mut [Option<MaxAccumulator>],
    null_counts_array: &'a mut [Precision<usize>],
    is_min_value_exact: &'a mut [Option<bool>],
    is_max_value_exact: &'a mut [Option<bool>],
    column_byte_sizes: &'a mut [Precision<usize>],
}

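/// Aggregates min/max values, null counts, and byte sizes for the column at
/// `logical_schema_index` across all row groups, recording whether the
/// resulting min/max are exact. A min/max is exact when every row group
/// reports an exact value, inexact when none does, and otherwise exact only
/// if the aggregated value matches at least one row group value flagged as
/// exact (see [`has_any_exact_match`]).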
fn summarize_min_max_null_counts(
    parquet_schema: &SchemaDescriptor,
    logical_file_schema: &Schema,
    physical_file_schema: &Schema,
    accumulators: &mut StatisticsAccumulators,
    logical_schema_index: usize,
    stats_converter: &StatisticsConverter,
    row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
    let is_max_value_exact_stat =
        stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
    let is_min_value_exact_stat =
        stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;

    if let Some(max_acc) = &mut accumulators.max_accs[logical_schema_index] {
        max_acc.update_batch(&[Arc::clone(&max_values)])?;

        let exactness = &is_max_value_exact_stat;
        if !exactness.is_empty()
            && exactness.null_count() == 0
            && exactness.true_count() == exactness.len()
        {
            accumulators.is_max_value_exact[logical_schema_index] = Some(true);
        } else if exactness.true_count() == 0 {
            accumulators.is_max_value_exact[logical_schema_index] = Some(false);
        } else {
            let val = max_acc.evaluate()?;
            accumulators.is_max_value_exact[logical_schema_index] =
                has_any_exact_match(&val, &max_values, exactness);
        }
    }

    if let Some(min_acc) = &mut accumulators.min_accs[logical_schema_index] {
        min_acc.update_batch(&[Arc::clone(&min_values)])?;

        let exactness = &is_min_value_exact_stat;
        if !exactness.is_empty()
            && exactness.null_count() == 0
            && exactness.true_count() == exactness.len()
        {
            accumulators.is_min_value_exact[logical_schema_index] = Some(true);
        } else if exactness.true_count() == 0 {
            accumulators.is_min_value_exact[logical_schema_index] = Some(false);
        } else {
            let val = min_acc.evaluate()?;
            accumulators.is_min_value_exact[logical_schema_index] =
                has_any_exact_match(&val, &min_values, exactness);
        }
    }

    accumulators.null_counts_array[logical_schema_index] = match sum(&null_counts) {
        Some(null_count) => Precision::Exact(null_count as usize),
        None => match null_counts.len() {
            0 => Precision::Exact(0),
            _ => Precision::Absent,
        },
    };

    let parquet_index = parquet_column(
        parquet_schema,
        physical_file_schema,
        logical_file_schema.field(logical_schema_index).name(),
    )
    .map(|(idx, _)| idx);

    let arrow_field = logical_file_schema.field(logical_schema_index);
    accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size(
        arrow_field.data_type(),
        row_groups_metadata,
        parquet_index,
        row_groups_metadata
            .iter()
            .map(|rg| rg.num_rows() as usize)
            .sum(),
    );

    Ok(())
}

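/// Estimates the in-memory Arrow size of a column: exact
/// (`byte_width * num_rows`) for fixed-width primitive types, inexact (the
/// summed uncompressed Parquet size) when the column can be located in the
/// file, and absent otherwise.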
fn compute_arrow_column_size(
    data_type: &DataType,
    row_groups_metadata: &[RowGroupMetaData],
    parquet_idx: Option<usize>,
    num_rows: usize,
) -> Precision<usize> {
    if let Some(byte_width) = data_type.primitive_width() {
        return Precision::Exact(byte_width * num_rows);
    }

    if let Some(parquet_idx) = parquet_idx {
        let uncompressed_bytes: i64 = row_groups_metadata
            .iter()
            .filter_map(|rg| rg.columns().get(parquet_idx))
            .map(|col| col.uncompressed_size())
            .sum();
        return Precision::Inexact(uncompressed_bytes as usize);
    }

    Precision::Absent
}

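/// Returns whether the aggregated `value` matches at least one entry of
/// `array` whose corresponding `exactness` flag is true. Null values are
/// never considered exact, and a single-element array short-circuits to its
/// own exactness flag.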
fn has_any_exact_match(
    value: &ScalarValue,
    array: &ArrayRef,
    exactness: &BooleanArray,
) -> Option<bool> {
    if value.is_null() {
        return Some(false);
    }

    if array.len() == 1 {
        return Some(exactness.is_valid(0) && exactness.value(0));
    }

    let scalar_array = value.to_scalar().ok()?;
    let eq_mask = eq(&scalar_array, &array).ok()?;
    let combined_mask = and(&eq_mask, exactness).ok()?;
    Some(combined_mask.true_count() > 0)
}

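/// Newtype wrapper that lets decoded [`ParquetMetaData`] be stored in a
/// [`FileMetadataCache`].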
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}

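/// Converts a [`PhysicalSortExpr`] into a Parquet [`SortingColumn`]. Returns
/// a plan error if the expression is not a simple column reference or if the
/// column index does not fit in an `i32`.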
pub(crate) fn sort_expr_to_sorting_column(
    sort_expr: &PhysicalSortExpr,
) -> Result<SortingColumn> {
    let column = sort_expr
        .expr
        .as_any()
        .downcast_ref::<Column>()
        .ok_or_else(|| {
            DataFusionError::Plan(format!(
                "Parquet sorting_columns only supports simple column references, \
                 but got expression: {}",
                sort_expr.expr
            ))
        })?;

    let column_idx: i32 = column.index().try_into().map_err(|_| {
        DataFusionError::Plan(format!(
            "Column index {} is too large to be represented as i32",
            column.index()
        ))
    })?;

    Ok(SortingColumn {
        column_idx,
        descending: sort_expr.options.descending,
        nulls_first: sort_expr.options.nulls_first,
    })
}

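/// Converts every expression in a [`LexOrdering`] into a [`SortingColumn`],
/// failing if any expression is not a simple column reference.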
pub(crate) fn lex_ordering_to_sorting_columns(
    ordering: &LexOrdering,
) -> Result<Vec<SortingColumn>> {
    ordering.iter().map(sort_expr_to_sorting_column).collect()
}

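/// Reconstructs a [`LexOrdering`] from the sorting columns recorded in the
/// first row group of `metadata`, if any. Returns `Ok(None)` when no sorting
/// columns are recorded or none of them can be resolved against `schema`.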
pub fn ordering_from_parquet_metadata(
    metadata: &ParquetMetaData,
    schema: &SchemaRef,
) -> Result<Option<LexOrdering>> {
    let sorting_columns = metadata
        .row_groups()
        .first()
        .and_then(|rg| rg.sorting_columns())
        .filter(|cols| !cols.is_empty());

    let Some(sorting_columns) = sorting_columns else {
        return Ok(None);
    };

    let parquet_schema = metadata.file_metadata().schema_descr();

    let sort_exprs =
        sorting_columns_to_physical_exprs(sorting_columns, parquet_schema, schema);

    if sort_exprs.is_empty() {
        return Ok(None);
    }

    Ok(LexOrdering::new(sort_exprs))
}

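/// Maps Parquet [`SortingColumn`]s back to [`PhysicalSortExpr`]s by resolving
/// each column name against `arrow_schema`; columns that cannot be resolved
/// are skipped.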
fn sorting_columns_to_physical_exprs(
    sorting_columns: &[SortingColumn],
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &SchemaRef,
) -> Vec<PhysicalSortExpr> {
    use arrow::compute::SortOptions;

    sorting_columns
        .iter()
        .filter_map(|sc| {
            let parquet_column = parquet_schema.column(sc.column_idx as usize);
            let name = parquet_column.name();

            let (index, _) = arrow_schema.column_with_name(name)?;

            let expr = Arc::new(Column::new(name, index));
            let options = SortOptions {
                descending: sc.descending,
                nulls_first: sc.nulls_first,
            };
            Some(PhysicalSortExpr::new(expr, options))
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, BooleanArray, Int32Array};
    use datafusion_common::ScalarValue;
    use std::sync::Arc;

    #[test]
    fn test_has_any_exact_match() {
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![true, false, false, false, false, false]);

            let result = has_any_exact_match(&computed_min, &row_group_mins, &exactness);
            assert_eq!(result, Some(true));
        }
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, false, false, false, false, false]);

            let result = has_any_exact_match(&computed_min, &row_group_mins, &exactness);
            assert_eq!(result, Some(false));
        }
        {
            let computed_max = ScalarValue::Int32(Some(5));
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, true, true, true, false, true]);

            let result = has_any_exact_match(&computed_max, &row_group_maxes, &exactness);
            assert_eq!(result, Some(true));
        }
        {
            let computed_max = ScalarValue::Int32(None);
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef;
            let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]);

            let result = has_any_exact_match(&computed_max, &row_group_maxes, &exactness);
            assert_eq!(result, Some(false));
        }
    }
}