datafusion_datasource_parquet/metadata.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics
//! and schema information.

use crate::{
    ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution,
};
use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_execution::cache::cache_manager::{
    CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
};
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
    SortingColumn,
};
use parquet::schema::types::SchemaDescriptor;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Handles fetching Parquet file schema, metadata and statistics
/// from an object store.
///
/// This component is exposed for low-level integrations through
/// [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
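///
/// # Example
///
/// A minimal usage sketch (not a compiled doctest; it assumes an
/// `ObjectStore` and a matching `ObjectMeta` are already in scope):
///
/// ```ignore
/// let metadata = DFParquetMetadata::new(store.as_ref(), &object_meta)
///     // hint how many trailing bytes to prefetch for the footer
///     .with_metadata_size_hint(Some(64 * 1024))
///     .fetch_metadata()
///     .await?;
/// println!("row groups: {}", metadata.num_row_groups());
/// ```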
#[derive(Debug)]
pub struct DFParquetMetadata<'a> {
    store: &'a dyn ObjectStore,
    object_meta: &'a ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<Arc<FileDecryptionProperties>>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Time unit to coerce INT96 timestamps to
    pub coerce_int96: Option<TimeUnit>,
}

impl<'a> DFParquetMetadata<'a> {
    pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self {
        Self {
            store,
            object_meta,
            metadata_size_hint: None,
            decryption_properties: None,
            file_metadata_cache: None,
            coerce_int96: None,
        }
    }

    /// Set the metadata size hint
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
        self.metadata_size_hint = metadata_size_hint;
        self
    }

    /// Set the decryption properties
    pub fn with_decryption_properties(
        mut self,
        decryption_properties: Option<Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.decryption_properties = decryption_properties;
        self
    }

    /// Set the file metadata cache
    pub fn with_file_metadata_cache(
        mut self,
        file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        self.file_metadata_cache = file_metadata_cache;
        self
    }

    /// Set the time unit to coerce INT96 timestamps to
    pub fn with_coerce_int96(mut self, time_unit: Option<TimeUnit>) -> Self {
        self.coerce_int96 = time_unit;
        self
    }

    /// Fetch Parquet metadata from the remote object store
    pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
        let Self {
            store,
            object_meta,
            metadata_size_hint,
            decryption_properties,
            file_metadata_cache,
            coerce_int96: _,
        } = self;

        let fetch = ObjectStoreFetch::new(*store, object_meta);

        // Only cache metadata for unencrypted files: when the
        // `parquet_encryption` feature is enabled and decryption properties
        // are set, the cache is bypassed.
        let cache_metadata =
            !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();

        if cache_metadata
            && let Some(file_metadata_cache) = file_metadata_cache.as_ref()
            && let Some(cached) = file_metadata_cache.get(&object_meta.location)
            && cached.is_valid_for(object_meta)
            && let Some(cached_parquet) = cached
                .file_metadata
                .as_any()
                .downcast_ref::<CachedParquetMetaData>()
        {
            return Ok(Arc::clone(cached_parquet.parquet_metadata()));
        }

        let mut reader =
            ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint);

        #[cfg(feature = "parquet_encryption")]
        if let Some(decryption_properties) = decryption_properties {
            reader = reader
                .with_decryption_properties(Some(Arc::clone(decryption_properties)));
        }

        if cache_metadata && file_metadata_cache.is_some() {
            // Need to retrieve the entire metadata for the caching to be effective.
            reader = reader.with_page_index_policy(PageIndexPolicy::Optional);
        }

        let metadata = Arc::new(
            reader
                .load_and_finish(fetch, object_meta.size)
                .await
                .map_err(DataFusionError::from)?,
        );

        if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
            file_metadata_cache.put(
                &object_meta.location,
                CachedFileMetadataEntry::new(
                    (*object_meta).clone(),
                    Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
                ),
            );
        }

        Ok(metadata)
    }

    /// Read and parse the schema of the Parquet file
    pub async fn fetch_schema(&self) -> Result<Schema> {
        let metadata = self.fetch_metadata().await?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;
        let schema = self
            .coerce_int96
            .as_ref()
            .and_then(|time_unit| {
                coerce_int96_to_resolution(
                    file_metadata.schema_descr(),
                    &schema,
                    time_unit,
                )
            })
            .unwrap_or(schema);
        Ok(schema)
    }

    /// Return a (path, schema) tuple by fetching the schema from the Parquet file
    pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> {
        let loc_path = self.object_meta.location.clone();
        let schema = self.fetch_schema().await?;
        Ok((loc_path, schema))
    }

    /// Fetch the metadata from the Parquet file via [`Self::fetch_metadata`] and convert
    /// the statistics in the metadata using [`Self::statistics_from_parquet_metadata`]
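    ///
    /// # Example
    ///
    /// A minimal sketch (not a compiled doctest; `store`, `object_meta` and
    /// `table_schema` are assumed to exist):
    ///
    /// ```ignore
    /// let stats = DFParquetMetadata::new(store.as_ref(), &object_meta)
    ///     .fetch_statistics(&table_schema)
    ///     .await?;
    /// assert!(matches!(stats.num_rows, Precision::Exact(_)));
    /// ```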
    pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result<Statistics> {
        let metadata = self.fetch_metadata().await?;
        Self::statistics_from_parquet_metadata(&metadata, table_schema)
    }

    /// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
    ///
    /// The statistics are calculated for each column in the table schema
    /// using the row group statistics in the parquet metadata.
    ///
    /// # Key behaviors:
    ///
    /// 1. Extracts row counts and byte sizes from all row groups
    /// 2. Applies schema type coercions to align the file schema with the table schema
    /// 3. Collects and aggregates statistics across row groups when available
    ///
    /// # When there are no statistics:
    ///
    /// If the Parquet file doesn't contain any statistics (`has_statistics` is false),
    /// the function returns a [`Statistics`] object with:
    /// - Exact row count
    /// - Exact byte size
    /// - All column statistics marked as unknown via `Statistics::unknown_column(&table_schema)`
    /// - Column byte sizes are still calculated and recorded
    ///
    /// # When only some columns have statistics:
    ///
    /// For columns with statistics:
    /// - Min/max values are properly extracted and represented as `Precision::Exact`
    /// - Null counts are calculated by summing across row groups
    /// - Byte sizes are calculated and recorded
    ///
    /// For columns without statistics:
    /// - For min/max, there are two situations:
    ///     1. The column isn't in the arrow schema: min/max values are set to `Precision::Absent`
    ///     2. The column is in the arrow schema, but not in the parquet schema due to
    ///        schema evolution: min/max values are set to `Precision::Exact(null)`
    /// - Null counts are set to `Precision::Exact(num_rows)` (conservatively assuming all values could be null)
    ///
    /// # Byte Size Calculation:
    ///
    /// - For primitive types with a known fixed size, the exact byte size is calculated as
    ///   (byte width * number of rows)
    /// - For other types, the uncompressed Parquet size is used as an estimate for the in-memory size
    /// - If neither method is applicable, the byte size is marked as `Precision::Absent`
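    ///
    /// # Example
    ///
    /// A minimal sketch (not a compiled doctest; `metadata` and `table_schema`
    /// are assumed to exist):
    ///
    /// ```ignore
    /// let stats = DFParquetMetadata::statistics_from_parquet_metadata(
    ///     &metadata,
    ///     &table_schema,
    /// )?;
    /// // one entry per field in the table schema
    /// assert_eq!(stats.column_statistics.len(), table_schema.fields().len());
    /// ```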
    pub fn statistics_from_parquet_metadata(
        metadata: &ParquetMetaData,
        logical_file_schema: &SchemaRef,
    ) -> Result<Statistics> {
        let row_groups_metadata = metadata.row_groups();

        // Use Statistics::default() as opposed to Statistics::new_unknown()
        // because we are going to replace the column statistics below
        // and we don't want to initialize them twice.
        let mut statistics = Statistics::default();
        let mut has_statistics = false;
        let mut num_rows = 0_usize;
        for row_group_meta in row_groups_metadata {
            num_rows += row_group_meta.num_rows() as usize;

            if !has_statistics {
                has_statistics = row_group_meta
                    .columns()
                    .iter()
                    .any(|column| column.statistics().is_some());
            }
        }
        statistics.num_rows = Precision::Exact(num_rows);

        let file_metadata = metadata.file_metadata();
        let mut physical_file_schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;

        if let Some(merged) =
            apply_file_schema_type_coercions(logical_file_schema, &physical_file_schema)
        {
            physical_file_schema = merged;
        }

        statistics.column_statistics =
            if has_statistics {
                let (mut max_accs, mut min_accs) =
                    create_max_min_accs(logical_file_schema);
                let mut null_counts_array =
                    vec![Precision::Absent; logical_file_schema.fields().len()];
                let mut column_byte_sizes =
                    vec![Precision::Absent; logical_file_schema.fields().len()];
                let mut is_max_value_exact =
                    vec![Some(true); logical_file_schema.fields().len()];
                let mut is_min_value_exact =
                    vec![Some(true); logical_file_schema.fields().len()];
                logical_file_schema.fields().iter().enumerate().for_each(
                    |(idx, field)| match StatisticsConverter::try_new(
                        field.name(),
                        &physical_file_schema,
                        file_metadata.schema_descr(),
                    ) {
                        Ok(stats_converter) => {
                            let mut accumulators = StatisticsAccumulators {
                                min_accs: &mut min_accs,
                                max_accs: &mut max_accs,
                                null_counts_array: &mut null_counts_array,
                                is_min_value_exact: &mut is_min_value_exact,
                                is_max_value_exact: &mut is_max_value_exact,
                                column_byte_sizes: &mut column_byte_sizes,
                            };
                            summarize_min_max_null_counts(
                                file_metadata.schema_descr(),
                                logical_file_schema,
                                &physical_file_schema,
                                &mut accumulators,
                                idx,
                                &stats_converter,
                                row_groups_metadata,
                            )
                            .ok();
                        }
                        Err(e) => {
                            debug!("Failed to create statistics converter: {e}");
                            null_counts_array[idx] = Precision::Exact(num_rows);
                        }
                    },
                );

                get_col_stats(
                    logical_file_schema,
                    &null_counts_array,
                    &mut max_accs,
                    &mut min_accs,
                    &mut is_max_value_exact,
                    &mut is_min_value_exact,
                    &column_byte_sizes,
                )
            } else {
                // The file has no statistics: mark all column statistics as
                // unknown, but still record the column byte sizes.
                logical_file_schema
                    .fields()
                    .iter()
                    .enumerate()
                    .map(|(logical_file_schema_index, field)| {
                        let arrow_field =
                            logical_file_schema.field(logical_file_schema_index);
                        let parquet_idx = parquet_column(
                            file_metadata.schema_descr(),
                            &physical_file_schema,
                            arrow_field.name(),
                        )
                        .map(|(idx, _)| idx);
                        let byte_size = compute_arrow_column_size(
                            field.data_type(),
                            row_groups_metadata,
                            parquet_idx,
                            num_rows,
                        );
                        ColumnStatistics::new_unknown().with_byte_size(byte_size)
                    })
                    .collect()
            };

        #[cfg(debug_assertions)]
        {
            // Check that the column statistics length matches the table schema fields length
            assert_eq!(
                statistics.column_statistics.len(),
                logical_file_schema.fields().len(),
                "Column statistics length does not match table schema fields length"
            );
        }

        Ok(statistics)
    }
}

/// Min/max aggregation can take Dictionary-encoded input but always produces
/// unpacked (i.e. non-Dictionary) output, so we need to adjust the output data
/// type to reflect this. Min/max aggregates produce unpacked output because
/// there is only one min/max value per group; there is no need to keep it
/// Dictionary encoded.
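///
/// # Example
///
/// A sketch of the mapping (not a compiled doctest):
///
/// ```ignore
/// use arrow::datatypes::DataType;
///
/// // Dictionary-encoded input unpacks to its value type...
/// let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
/// assert_eq!(min_max_aggregate_data_type(&dict), &DataType::Utf8);
/// // ...while any other type passes through unchanged.
/// assert_eq!(min_max_aggregate_data_type(&DataType::Int64), &DataType::Int64);
/// ```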
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let max_values: Vec<Option<MaxAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    let min_values: Vec<Option<MinAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    (max_values, min_values)
}

fn get_col_stats(
    schema: &Schema,
    null_counts: &[Precision<usize>],
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
    is_max_value_exact: &mut [Option<bool>],
    is_min_value_exact: &mut [Option<bool>],
    column_byte_sizes: &[Precision<usize>],
) -> Vec<ColumnStatistics> {
    (0..schema.fields().len())
        .map(|i| {
            let max_value = match (
                max_values.get_mut(i).unwrap(),
                is_max_value_exact.get(i).unwrap(),
            ) {
                (Some(max_value), Some(true)) => {
                    max_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(max_value), Some(false)) | (Some(max_value), None) => {
                    max_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            let min_value = match (
                min_values.get_mut(i).unwrap(),
                is_min_value_exact.get(i).unwrap(),
            ) {
                (Some(min_value), Some(true)) => {
                    min_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(min_value), Some(false)) | (Some(min_value), None) => {
                    min_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            ColumnStatistics {
                null_count: null_counts[i],
                max_value: max_value.unwrap_or(Precision::Absent),
                min_value: min_value.unwrap_or(Precision::Absent),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: column_byte_sizes[i],
            }
        })
        .collect()
}

/// Holds the accumulator state for collecting statistics from row groups
struct StatisticsAccumulators<'a> {
    min_accs: &'a mut [Option<MinAccumulator>],
    max_accs: &'a mut [Option<MaxAccumulator>],
    null_counts_array: &'a mut [Precision<usize>],
    is_min_value_exact: &'a mut [Option<bool>],
    is_max_value_exact: &'a mut [Option<bool>],
    column_byte_sizes: &'a mut [Precision<usize>],
}

fn summarize_min_max_null_counts(
    parquet_schema: &SchemaDescriptor,
    logical_file_schema: &Schema,
    physical_file_schema: &Schema,
    accumulators: &mut StatisticsAccumulators,
    logical_schema_index: usize,
    stats_converter: &StatisticsConverter,
    row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
    let is_max_value_exact_stat =
        stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
    let is_min_value_exact_stat =
        stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;

    if let Some(max_acc) = &mut accumulators.max_accs[logical_schema_index] {
        max_acc.update_batch(&[Arc::clone(&max_values)])?;

        // handle the common special case when all row groups have exact statistics
        let exactness = &is_max_value_exact_stat;
        if !exactness.is_empty()
            && exactness.null_count() == 0
            && exactness.true_count() == exactness.len()
        {
            accumulators.is_max_value_exact[logical_schema_index] = Some(true);
        } else if exactness.true_count() == 0 {
            accumulators.is_max_value_exact[logical_schema_index] = Some(false);
        } else {
            let val = max_acc.evaluate()?;
            accumulators.is_max_value_exact[logical_schema_index] =
                has_any_exact_match(&val, &max_values, exactness);
        }
    }

    if let Some(min_acc) = &mut accumulators.min_accs[logical_schema_index] {
        min_acc.update_batch(&[Arc::clone(&min_values)])?;

        // handle the common special case when all row groups have exact statistics
        let exactness = &is_min_value_exact_stat;
        if !exactness.is_empty()
            && exactness.null_count() == 0
            && exactness.true_count() == exactness.len()
        {
            accumulators.is_min_value_exact[logical_schema_index] = Some(true);
        } else if exactness.true_count() == 0 {
            accumulators.is_min_value_exact[logical_schema_index] = Some(false);
        } else {
            let val = min_acc.evaluate()?;
            accumulators.is_min_value_exact[logical_schema_index] =
                has_any_exact_match(&val, &min_values, exactness);
        }
    }

    accumulators.null_counts_array[logical_schema_index] = match sum(&null_counts) {
        Some(null_count) => Precision::Exact(null_count as usize),
        None => match null_counts.len() {
            // If sum() returned None we either have no rows or all values are null
            0 => Precision::Exact(0),
            _ => Precision::Absent,
        },
    };

    // Locate the corresponding parquet column for this logical field by
    // looking it up by name.
    let parquet_index = parquet_column(
        parquet_schema,
        physical_file_schema,
        logical_file_schema.field(logical_schema_index).name(),
    )
    .map(|(idx, _)| idx);

    let arrow_field = logical_file_schema.field(logical_schema_index);
    accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size(
        arrow_field.data_type(),
        row_groups_metadata,
        parquet_index,
        row_groups_metadata
            .iter()
            .map(|rg| rg.num_rows() as usize)
            .sum(),
    );

    Ok(())
}

/// Compute the Arrow in-memory size for a single column
fn compute_arrow_column_size(
    data_type: &DataType,
    row_groups_metadata: &[RowGroupMetaData],
    parquet_idx: Option<usize>,
    num_rows: usize,
) -> Precision<usize> {
    // For primitive types with known fixed size, compute exact size
    if let Some(byte_width) = data_type.primitive_width() {
        return Precision::Exact(byte_width * num_rows);
    }

    // Use the uncompressed Parquet size as an estimate for other types
    if let Some(parquet_idx) = parquet_idx {
        let uncompressed_bytes: i64 = row_groups_metadata
            .iter()
            .filter_map(|rg| rg.columns().get(parquet_idx))
            .map(|col| col.uncompressed_size())
            .sum();
        return Precision::Inexact(uncompressed_bytes as usize);
    }

    // Otherwise, we cannot determine the size
    Precision::Absent
}

/// Checks if any occurrence of `value` in `array` corresponds to a `true`
/// entry in the `exactness` array.
///
/// This is used to determine if a calculated statistic (e.g., min or max)
/// is exact, by checking if at least one of its source values was exact.
///
/// # Example
/// - `value`: `0`
/// - `array`: `[0, 1, 0, 3, 0, 5]`
/// - `exactness`: `[true, false, false, false, false, false]`
///
/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness
/// values are `[true, false, false]`. Since at least one is `true`, the
/// function returns `Some(true)`.
fn has_any_exact_match(
    value: &ScalarValue,
    array: &ArrayRef,
    exactness: &BooleanArray,
) -> Option<bool> {
    if value.is_null() {
        return Some(false);
    }

    // Shortcut for single row group
    if array.len() == 1 {
        return Some(exactness.is_valid(0) && exactness.value(0));
    }

    let scalar_array = value.to_scalar().ok()?;
    let eq_mask = eq(&scalar_array, &array).ok()?;
    let combined_mask = and(&eq_mask, exactness).ok()?;
    Some(combined_mask.true_count() > 0)
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}

/// Convert a [`PhysicalSortExpr`] to a Parquet [`SortingColumn`].
///
/// Returns `Err` if the expression is not a simple column reference.
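///
/// # Example
///
/// A minimal sketch (not a compiled doctest; assumes a column `a` at index 0
/// with default sort options):
///
/// ```ignore
/// let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
/// let sorting_column = sort_expr_to_sorting_column(&sort_expr)?;
/// assert_eq!(sorting_column.column_idx, 0);
/// ```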
pub(crate) fn sort_expr_to_sorting_column(
    sort_expr: &PhysicalSortExpr,
) -> Result<SortingColumn> {
    let column = sort_expr
        .expr
        .as_any()
        .downcast_ref::<Column>()
        .ok_or_else(|| {
            DataFusionError::Plan(format!(
                "Parquet sorting_columns only supports simple column references, \
                 but got expression: {}",
                sort_expr.expr
            ))
        })?;

    let column_idx: i32 = column.index().try_into().map_err(|_| {
        DataFusionError::Plan(format!(
            "Column index {} is too large to be represented as i32",
            column.index()
        ))
    })?;

    Ok(SortingColumn {
        column_idx,
        descending: sort_expr.options.descending,
        nulls_first: sort_expr.options.nulls_first,
    })
}

/// Convert a [`LexOrdering`] to `Vec<SortingColumn>` for Parquet.
///
/// Returns `Err` if any expression is not a simple column reference.
pub(crate) fn lex_ordering_to_sorting_columns(
    ordering: &LexOrdering,
) -> Result<Vec<SortingColumn>> {
    ordering.iter().map(sort_expr_to_sorting_column).collect()
}

/// Extracts ordering information from Parquet metadata.
///
/// This function reads the `sorting_columns` from the first row group's metadata
/// and converts them into a [`LexOrdering`] that can be used by the query engine.
///
/// # Arguments
/// * `metadata` - The Parquet metadata containing `sorting_columns` information
/// * `schema` - The Arrow schema to use for column lookup
///
/// # Returns
/// * `Ok(Some(ordering))` if valid ordering information was found
/// * `Ok(None)` if no sorting columns were specified or they couldn't be resolved
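///
/// # Example
///
/// A minimal sketch (not a compiled doctest; `metadata` and `schema` are
/// assumed to exist):
///
/// ```ignore
/// if let Some(ordering) = ordering_from_parquet_metadata(&metadata, &schema)? {
///     for sort_expr in ordering.iter() {
///         println!("file sorted by: {sort_expr}");
///     }
/// }
/// ```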
pub fn ordering_from_parquet_metadata(
    metadata: &ParquetMetaData,
    schema: &SchemaRef,
) -> Result<Option<LexOrdering>> {
    // Get the sorting columns from the first row group metadata.
    // If no row groups exist or no sorting columns are specified, return None.
    let sorting_columns = metadata
        .row_groups()
        .first()
        .and_then(|rg| rg.sorting_columns())
        .filter(|cols| !cols.is_empty());

    let Some(sorting_columns) = sorting_columns else {
        return Ok(None);
    };

    let parquet_schema = metadata.file_metadata().schema_descr();

    let sort_exprs =
        sorting_columns_to_physical_exprs(sorting_columns, parquet_schema, schema);

    if sort_exprs.is_empty() {
        return Ok(None);
    }

    Ok(LexOrdering::new(sort_exprs))
}

/// Converts Parquet sorting columns to physical sort expressions.
fn sorting_columns_to_physical_exprs(
    sorting_columns: &[SortingColumn],
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &SchemaRef,
) -> Vec<PhysicalSortExpr> {
    use arrow::compute::SortOptions;

    sorting_columns
        .iter()
        .filter_map(|sc| {
            let parquet_column = parquet_schema.column(sc.column_idx as usize);
            let name = parquet_column.name();

            // Find the column in the arrow schema
            let (index, _) = arrow_schema.column_with_name(name)?;

            let expr = Arc::new(Column::new(name, index));
            let options = SortOptions {
                descending: sc.descending,
                nulls_first: sc.nulls_first,
            };
            Some(PhysicalSortExpr::new(expr, options))
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, BooleanArray, Int32Array};
    use datafusion_common::ScalarValue;
    use std::sync::Arc;

    #[test]
    fn test_has_any_exact_match() {
        // Case 1: Mixed exact and inexact matches
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![true, false, false, false, false, false]);

            let result = has_any_exact_match(&computed_min, &row_group_mins, &exactness);
            assert_eq!(result, Some(true));
        }
        // Case 2: All inexact matches
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, false, false, false, false, false]);

            let result = has_any_exact_match(&computed_min, &row_group_mins, &exactness);
            assert_eq!(result, Some(false));
        }
        // Case 3: All occurrences of the max value are exact
        {
            let computed_max = ScalarValue::Int32(Some(5));
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, true, true, true, false, true]);

            let result = has_any_exact_match(&computed_max, &row_group_maxes, &exactness);
            assert_eq!(result, Some(true));
        }
        // Case 4: All maxes are null values
        {
            let computed_max = ScalarValue::Int32(None);
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef;
            let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]);

            let result = has_any_exact_match(&computed_max, &row_group_maxes, &exactness);
            assert_eq!(result, Some(false));
        }
    }
}