datafusion_datasource_parquet/source.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ParquetSource implementation for reading parquet files
19use std::any::Any;
20use std::fmt::Debug;
21use std::fmt::Formatter;
22use std::sync::Arc;
23
24use crate::DefaultParquetFileReaderFactory;
25use crate::ParquetFileReaderFactory;
26use crate::opener::ParquetOpener;
27use crate::opener::build_pruning_predicates;
28use crate::row_filter::can_expr_be_pushed_down_with_schemas;
29use datafusion_common::config::ConfigOptions;
30#[cfg(feature = "parquet_encryption")]
31use datafusion_common::config::EncryptionFactoryOptions;
32use datafusion_datasource::as_file_source;
33use datafusion_datasource::file_stream::FileOpener;
34
35use arrow::datatypes::TimeUnit;
36use datafusion_common::DataFusionError;
37use datafusion_common::config::TableParquetOptions;
38use datafusion_datasource::TableSchema;
39use datafusion_datasource::file::FileSource;
40use datafusion_datasource::file_scan_config::FileScanConfig;
41use datafusion_physical_expr::projection::ProjectionExprs;
42use datafusion_physical_expr::{EquivalenceProperties, conjunction};
43use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
45use datafusion_physical_expr_common::physical_expr::fmt_sql;
46use datafusion_physical_plan::DisplayFormatType;
47use datafusion_physical_plan::SortOrderPushdownResult;
48use datafusion_physical_plan::filter_pushdown::PushedDown;
49use datafusion_physical_plan::filter_pushdown::{
50 FilterPushdownPropagation, PushedDownPredicate,
51};
52use datafusion_physical_plan::metrics::Count;
53use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
54
55#[cfg(feature = "parquet_encryption")]
56use datafusion_execution::parquet_encryption::EncryptionFactory;
57use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
58use itertools::Itertools;
59use object_store::ObjectStore;
60#[cfg(feature = "parquet_encryption")]
61use parquet::encryption::decrypt::FileDecryptionProperties;
62
63/// Execution plan for reading one or more Parquet files.
64///
/// ```text
///             ▲
///             │
///             │  Produce a stream of
///             │  RecordBatches
///             │
/// ┌───────────────────────┐
/// │                       │
/// │     DataSourceExec    │
/// │                       │
/// └───────────────────────┘
///             ▲
///             │  Asynchronously read from one
///             │  or more parquet files via
///             │  ObjectStore interface
///             │
///             │
///   .───────────────────.
///  │                     )
///  │`───────────────────'│
///  │     ObjectStore     │
///  │.───────────────────.│
///  │                     )
///   `───────────────────'
/// ```
90///
91/// # Example: Create a `DataSourceExec`
92/// ```
93/// # use std::sync::Arc;
94/// # use arrow::datatypes::Schema;
95/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
96/// # use datafusion_datasource_parquet::source::ParquetSource;
97/// # use datafusion_datasource::PartitionedFile;
98/// # use datafusion_execution::object_store::ObjectStoreUrl;
99/// # use datafusion_physical_expr::expressions::lit;
100/// # use datafusion_datasource::source::DataSourceExec;
101/// # use datafusion_common::config::TableParquetOptions;
102///
103/// # let file_schema = Arc::new(Schema::empty());
104/// # let object_store_url = ObjectStoreUrl::local_filesystem();
105/// # let predicate = lit(true);
106/// let source = Arc::new(
107/// ParquetSource::new(Arc::clone(&file_schema))
108/// .with_predicate(predicate)
109/// );
110/// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
111/// let config = FileScanConfigBuilder::new(object_store_url, source)
112/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)).build();
113/// let exec = DataSourceExec::from_data_source(config);
114/// ```
115///
116/// # Features
117///
118/// Supports the following optimizations:
119///
120/// * Concurrent reads: reads from one or more files in parallel as multiple
121/// partitions, including concurrently reading multiple row groups from a single
122/// file.
123///
/// * Predicate push down: skips row groups, pages, and rows based on metadata
125/// and late materialization. See "Predicate Pushdown" below.
126///
127/// * Projection pushdown: reads and decodes only the columns required.
128///
129/// * Limit pushdown: stop execution early after some number of rows are read.
130///
131/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
132/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
133/// details.
134///
135/// * Schema evolution: read parquet files with different schemas into a unified
136/// table schema. See [`DefaultPhysicalExprAdapterFactory`] for more details.
137///
138/// * metadata_size_hint: controls the number of bytes read from the end of the
///   file in the initial I/O when using the default [`ParquetFileReaderFactory`]. If a
140/// custom reader is used, it supplies the metadata directly and this parameter
///   is ignored. See [`ParquetSource::with_metadata_size_hint`] for more details.
142///
143/// * User provided `ParquetAccessPlan`s to skip row groups and/or pages
144/// based on external information. See "Implementing External Indexes" below
145///
146/// # Predicate Pushdown
147///
148/// `DataSourceExec` uses the provided [`PhysicalExpr`] predicate as a filter to
149/// skip reading unnecessary data and improve query performance using several techniques:
150///
151/// * Row group pruning: skips entire row groups based on min/max statistics
152/// found in [`ParquetMetaData`] and any Bloom filters that are present.
153///
154/// * Page pruning: skips individual pages within a ColumnChunk using the
155/// [Parquet PageIndex], if present.
156///
157/// * Row filtering: skips rows within a page using a form of late
158/// materialization. When possible, predicates are applied by the parquet
159/// decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more
/// details). This is only enabled if the `pushdown_filters` option is set to true
/// (see [`ParquetSource::with_pushdown_filters`]).
161///
162/// Note: If the predicate can not be used to accelerate the scan, it is ignored
163/// (no error is raised on predicate evaluation errors).
164///
165/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate
166/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
167/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
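///
/// ## Example: enabling filter pushdown
///
/// A minimal sketch (the settings are illustrative, not recommendations) that
/// enables row filtering during the scan by turning on `pushdown_filters` in
/// [`TableParquetOptions`] before attaching a predicate:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_common::config::TableParquetOptions;
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// # use datafusion_physical_expr::expressions::lit;
/// # let file_schema = Arc::new(Schema::empty());
/// # let predicate = lit(true);
/// let mut options = TableParquetOptions::default();
/// options.global.pushdown_filters = true; // apply the predicate during decode
/// let source = ParquetSource::new(file_schema)
///     .with_table_parquet_options(options)
///     .with_predicate(predicate);
/// ```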
168///
169/// # Example: rewriting `DataSourceExec`
170///
171/// You can modify a `DataSourceExec` using [`ParquetSource`], for example
172/// to change files or add a predicate.
173///
174/// ```no_run
175/// # use std::sync::Arc;
176/// # use arrow::datatypes::Schema;
177/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
178/// # use datafusion_datasource::PartitionedFile;
179/// # use datafusion_datasource::source::DataSourceExec;
180///
181/// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
182/// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
183/// let exec = parquet_exec();
184/// let data_source = exec.data_source();
185/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
186/// let existing_file_groups = &base_config.file_groups;
187/// let new_execs = existing_file_groups
188/// .iter()
189/// .map(|file_group| {
190/// // create a new exec by copying the existing exec's source config
191/// let new_config = FileScanConfigBuilder::from(base_config.clone())
192/// .with_file_groups(vec![file_group.clone()])
193/// .build();
194///
195/// (DataSourceExec::from_data_source(new_config))
196/// })
197/// .collect::<Vec<_>>();
198/// ```
199///
200/// # Implementing External Indexes
201///
202/// It is possible to restrict the row groups and selections within those row
203/// groups that the DataSourceExec will consider by providing an initial
204/// `ParquetAccessPlan` as `extensions` on `PartitionedFile`. This can be
205/// used to implement external indexes on top of parquet files and select only
206/// portions of the files.
207///
/// The `DataSourceExec` will try to reduce any provided `ParquetAccessPlan`
/// further based on the contents of the [`ParquetMetaData`] and other settings.
210///
211/// ## Example of providing a ParquetAccessPlan
212///
213/// ```
214/// # use std::sync::Arc;
215/// # use arrow::datatypes::{Schema, SchemaRef};
216/// # use datafusion_datasource::PartitionedFile;
217/// # use datafusion_datasource_parquet::ParquetAccessPlan;
218/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
219/// # use datafusion_datasource_parquet::source::ParquetSource;
220/// # use datafusion_execution::object_store::ObjectStoreUrl;
221/// # use datafusion_datasource::source::DataSourceExec;
222///
223/// # fn schema() -> SchemaRef {
224/// # Arc::new(Schema::empty())
225/// # }
226/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4
227/// let mut access_plan = ParquetAccessPlan::new_all(5);
228/// access_plan.skip(2);
229/// access_plan.skip(4);
230/// // provide the plan as extension to the FileScanConfig
231/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
232/// .with_extensions(Arc::new(access_plan));
233/// // create a FileScanConfig to scan this file
234/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(ParquetSource::new(schema())))
235/// .with_file(partitioned_file).build();
236/// // this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional
237/// // pruning based on predicates may also happen
238/// let exec = DataSourceExec::from_data_source(config);
239/// ```
240///
/// For a complete example, see the [`advanced_parquet_index` example].
///
/// [`advanced_parquet_index` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/data_io/parquet_advanced_index.rs
244///
245/// # Execution Overview
246///
247/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
248/// configured to open parquet files with a `ParquetOpener`.
249///
250/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
251/// the file.
252///
253/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
254/// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
255/// applying predicates to metadata. The plan and projections are used to
256/// determine what pages must be read.
257///
258/// * Step 4: The stream begins reading data, fetching the required parquet
259/// pages incrementally decoding them, and applying any row filters (see
260/// [`Self::with_pushdown_filters`]).
261///
262/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
263/// [`DefaultPhysicalExprAdapterFactory`] to match the table schema. By default missing columns are
264/// filled with nulls, but this can be customized via [`PhysicalExprAdapterFactory`].
265///
266/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`ParquetMetaData`]: parquet::file::metadata::ParquetMetaData
268/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
269#[derive(Clone, Debug)]
270pub struct ParquetSource {
271 /// Options for reading Parquet files
272 pub(crate) table_parquet_options: TableParquetOptions,
273 /// Optional metrics
274 pub(crate) metrics: ExecutionPlanMetricsSet,
275 /// The schema of the file.
276 /// In particular, this is the schema of the table without partition columns,
277 /// *not* the physical schema of the file.
278 pub(crate) table_schema: TableSchema,
279 /// Optional predicate for row filtering during parquet scan
280 pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
281 /// Optional user defined parquet file reader factory
282 pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
283 /// Batch size configuration
284 pub(crate) batch_size: Option<usize>,
285 /// Optional hint for the size of the parquet metadata
286 pub(crate) metadata_size_hint: Option<usize>,
287 /// Projection to apply to the output.
288 pub(crate) projection: ProjectionExprs,
289 #[cfg(feature = "parquet_encryption")]
290 pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
    /// If true, read files in reverse order and reverse the row group order within each file.
    /// Rows within a row group are not guaranteed to be in reverse order, so results still
    /// need to be sorted after reading; the reverse scan is therefore inexact.
    /// Used to optimize `ORDER BY ... DESC` on sorted data.
295 reverse_row_groups: bool,
296}
297
298impl ParquetSource {
299 /// Create a new ParquetSource to read the data specified in the file scan
300 /// configuration with the provided schema.
301 ///
302 /// Uses default `TableParquetOptions`.
    /// To set custom options, use [`ParquetSource::with_table_parquet_options`].
304 pub fn new(table_schema: impl Into<TableSchema>) -> Self {
305 let table_schema = table_schema.into();
306 // Projection over the full table schema (file columns + partition columns)
307 let full_schema = table_schema.table_schema();
308 let indices: Vec<usize> = (0..full_schema.fields().len()).collect();
309 Self {
310 projection: ProjectionExprs::from_indices(&indices, full_schema),
311 table_schema,
312 table_parquet_options: TableParquetOptions::default(),
313 metrics: ExecutionPlanMetricsSet::new(),
314 predicate: None,
315 parquet_file_reader_factory: None,
316 batch_size: None,
317 metadata_size_hint: None,
318 #[cfg(feature = "parquet_encryption")]
319 encryption_factory: None,
320 reverse_row_groups: false,
321 }
322 }
323
324 /// Set the `TableParquetOptions` for this ParquetSource.
325 pub fn with_table_parquet_options(
326 mut self,
327 table_parquet_options: TableParquetOptions,
328 ) -> Self {
329 self.table_parquet_options = table_parquet_options;
330 self
331 }
332
333 /// Set the metadata size hint
334 ///
335 /// This value determines how many bytes at the end of the file the default
336 /// [`ParquetFileReaderFactory`] will request in the initial IO. If this is
337 /// too small, the ParquetSource will need to make additional IO requests to
338 /// read the footer.
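    ///
    /// A minimal sketch; the 512 KiB value is illustrative, not a tuned
    /// recommendation:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::Schema;
    /// # use datafusion_datasource_parquet::source::ParquetSource;
    /// # let file_schema = Arc::new(Schema::empty());
    /// // Fetch the last 512 KiB of each file in the initial read, which
    /// // usually covers the footer and page index in a single request.
    /// let source = ParquetSource::new(file_schema)
    ///     .with_metadata_size_hint(512 * 1024);
    /// ```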
339 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
340 self.metadata_size_hint = Some(metadata_size_hint);
341 self
342 }
343
344 /// Set predicate information
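    ///
    /// A minimal sketch using a constant predicate; a realistic predicate
    /// would reference columns of the file schema:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::Schema;
    /// # use datafusion_datasource_parquet::source::ParquetSource;
    /// # use datafusion_physical_expr::expressions::lit;
    /// let source = ParquetSource::new(Arc::new(Schema::empty()))
    ///     .with_predicate(lit(true));
    /// ```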
345 #[expect(clippy::needless_pass_by_value)]
346 pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
347 let mut conf = self.clone();
348 conf.predicate = Some(Arc::clone(&predicate));
349 conf
350 }
351
352 /// Set the encryption factory to use to generate file decryption properties
353 #[cfg(feature = "parquet_encryption")]
354 pub fn with_encryption_factory(
355 mut self,
356 encryption_factory: Arc<dyn EncryptionFactory>,
357 ) -> Self {
358 self.encryption_factory = Some(encryption_factory);
359 self
360 }
361
362 /// Options passed to the parquet reader for this scan
363 pub fn table_parquet_options(&self) -> &TableParquetOptions {
364 &self.table_parquet_options
365 }
366
367 /// Optional predicate.
368 #[deprecated(since = "50.2.0", note = "use `filter` instead")]
369 pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
370 self.predicate.as_ref()
371 }
372
373 /// return the optional file reader factory
374 pub fn parquet_file_reader_factory(
375 &self,
376 ) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
377 self.parquet_file_reader_factory.as_ref()
378 }
379
380 /// Optional user defined parquet file reader factory.
381 pub fn with_parquet_file_reader_factory(
382 mut self,
383 parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
384 ) -> Self {
385 self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
386 self
387 }
388
389 /// If true, the predicate will be used during the parquet scan.
390 /// Defaults to false.
391 pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
392 self.table_parquet_options.global.pushdown_filters = pushdown_filters;
393 self
394 }
395
396 /// Return the value described in [`Self::with_pushdown_filters`]
397 pub(crate) fn pushdown_filters(&self) -> bool {
398 self.table_parquet_options.global.pushdown_filters
399 }
400
401 /// If true, the `RowFilter` made by `pushdown_filters` may try to
402 /// minimize the cost of filter evaluation by reordering the
403 /// predicate [`Expr`]s. If false, the predicates are applied in
404 /// the same order as specified in the query. Defaults to false.
405 ///
406 /// [`Expr`]: datafusion_expr::Expr
407 pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
408 self.table_parquet_options.global.reorder_filters = reorder_filters;
409 self
410 }
411
412 /// Return the value described in [`Self::with_reorder_filters`]
413 fn reorder_filters(&self) -> bool {
414 self.table_parquet_options.global.reorder_filters
415 }
416
417 /// Return the value of [`datafusion_common::config::ParquetOptions::force_filter_selections`]
418 fn force_filter_selections(&self) -> bool {
419 self.table_parquet_options.global.force_filter_selections
420 }
421
    /// If enabled, the reader will read the page index.
    /// This is used to optimize filter pushdown
    /// via `RowSelector` and `RowFilter` by
    /// eliminating unnecessary IO and decoding.
426 pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
427 self.table_parquet_options.global.enable_page_index = enable_page_index;
428 self
429 }
430
431 /// Return the value described in [`Self::with_enable_page_index`]
432 fn enable_page_index(&self) -> bool {
433 self.table_parquet_options.global.enable_page_index
434 }
435
    /// If enabled, the reader will use Bloom filters (when present) to skip row groups during the scan
437 pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
438 self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
439 self
440 }
441
    /// If enabled, the writer will write Bloom filters
443 pub fn with_bloom_filter_on_write(
444 mut self,
445 enable_bloom_filter_on_write: bool,
446 ) -> Self {
447 self.table_parquet_options.global.bloom_filter_on_write =
448 enable_bloom_filter_on_write;
449 self
450 }
451
452 /// Return the value described in [`Self::with_bloom_filter_on_read`]
453 fn bloom_filter_on_read(&self) -> bool {
454 self.table_parquet_options.global.bloom_filter_on_read
455 }
456
457 /// Return the maximum predicate cache size, in bytes, used when
    /// `pushdown_filters` is enabled
459 pub fn max_predicate_cache_size(&self) -> Option<usize> {
460 self.table_parquet_options.global.max_predicate_cache_size
461 }
462
463 #[cfg(feature = "parquet_encryption")]
464 fn get_encryption_factory_with_config(
465 &self,
466 ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> {
467 match &self.encryption_factory {
468 None => None,
469 Some(factory) => Some((
470 Arc::clone(factory),
471 self.table_parquet_options.crypto.factory_options.clone(),
472 )),
473 }
474 }
475
    /// If true, read files and the row groups within them in reverse order.
    /// See the `reverse_row_groups` field for the exact guarantees.
    pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self {
477 self.reverse_row_groups = reverse_row_groups;
478 self
479 }

    #[cfg(test)]
481 pub(crate) fn reverse_row_groups(&self) -> bool {
482 self.reverse_row_groups
483 }
484}
485
/// Parses the `datafusion_common::config::ParquetOptions::coerce_int96` setting string into an Arrow [`TimeUnit`].
487pub(crate) fn parse_coerce_int96_string(
488 str_setting: &str,
489) -> datafusion_common::Result<TimeUnit> {
490 let str_setting_lower: &str = &str_setting.to_lowercase();
491
492 match str_setting_lower {
493 "ns" => Ok(TimeUnit::Nanosecond),
494 "us" => Ok(TimeUnit::Microsecond),
495 "ms" => Ok(TimeUnit::Millisecond),
496 "s" => Ok(TimeUnit::Second),
497 _ => Err(DataFusionError::Configuration(format!(
498 "Unknown or unsupported parquet coerce_int96: \
499 {str_setting}. Valid values are: ns, us, ms, and s."
500 ))),
501 }
502}
503
/// Allows easy conversion from [`ParquetSource`] to `Arc<dyn FileSource>`
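///
/// A small sketch of the conversion:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file::FileSource;
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// let source: Arc<dyn FileSource> = ParquetSource::new(Arc::new(Schema::empty())).into();
/// ```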
505impl From<ParquetSource> for Arc<dyn FileSource> {
506 fn from(source: ParquetSource) -> Self {
507 as_file_source(source)
508 }
509}
510
511impl FileSource for ParquetSource {
512 fn create_file_opener(
513 &self,
514 object_store: Arc<dyn ObjectStore>,
515 base_config: &FileScanConfig,
516 partition: usize,
517 ) -> datafusion_common::Result<Arc<dyn FileOpener>> {
518 let expr_adapter_factory = base_config
519 .expr_adapter_factory
520 .clone()
521 .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory) as _);
522
523 let parquet_file_reader_factory =
524 self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
525 Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
526 });
527
528 #[cfg(feature = "parquet_encryption")]
529 let file_decryption_properties = self
530 .table_parquet_options()
531 .crypto
532 .file_decryption
533 .clone()
534 .map(FileDecryptionProperties::from)
535 .map(Arc::new);
536
537 let coerce_int96 = self
538 .table_parquet_options
539 .global
540 .coerce_int96
541 .as_ref()
542 .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
543
544 let opener = Arc::new(ParquetOpener {
545 partition_index: partition,
546 projection: self.projection.clone(),
547 batch_size: self
548 .batch_size
                .expect("Batch size must be set before creating ParquetOpener"),
550 limit: base_config.limit,
551 predicate: self.predicate.clone(),
552 table_schema: self.table_schema.clone(),
553 metadata_size_hint: self.metadata_size_hint,
554 metrics: self.metrics().clone(),
555 parquet_file_reader_factory,
556 pushdown_filters: self.pushdown_filters(),
557 reorder_filters: self.reorder_filters(),
558 force_filter_selections: self.force_filter_selections(),
559 enable_page_index: self.enable_page_index(),
560 enable_bloom_filter: self.bloom_filter_on_read(),
561 enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
562 coerce_int96,
563 #[cfg(feature = "parquet_encryption")]
564 file_decryption_properties,
565 expr_adapter_factory,
566 #[cfg(feature = "parquet_encryption")]
567 encryption_factory: self.get_encryption_factory_with_config(),
568 max_predicate_cache_size: self.max_predicate_cache_size(),
569 reverse_row_groups: self.reverse_row_groups,
570 });
571 Ok(opener)
572 }
573
574 fn as_any(&self) -> &dyn Any {
575 self
576 }
577
578 fn table_schema(&self) -> &TableSchema {
579 &self.table_schema
580 }
581
582 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
583 self.predicate.clone()
584 }
585
586 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
587 let mut conf = self.clone();
588 conf.batch_size = Some(batch_size);
589 Arc::new(conf)
590 }
591
592 fn try_pushdown_projection(
593 &self,
594 projection: &ProjectionExprs,
595 ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
596 let mut source = self.clone();
597 source.projection = self.projection.try_merge(projection)?;
598 Ok(Some(Arc::new(source)))
599 }
600
601 fn projection(&self) -> Option<&ProjectionExprs> {
602 Some(&self.projection)
603 }
604
605 fn metrics(&self) -> &ExecutionPlanMetricsSet {
606 &self.metrics
607 }
608
609 fn file_type(&self) -> &str {
610 "parquet"
611 }
612
613 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
614 match t {
615 DisplayFormatType::Default | DisplayFormatType::Verbose => {
616 let predicate_string = self
617 .filter()
618 .map(|p| format!(", predicate={p}"))
619 .unwrap_or_default();
620
621 write!(f, "{predicate_string}")?;
622
623 // Add reverse_scan info if enabled
624 if self.reverse_row_groups {
625 write!(f, ", reverse_row_groups=true")?;
626 }
627
                // Try to build the pruning predicates.
629 // These are only generated here because it's useful to have *some*
630 // idea of what pushdown is happening when viewing plans.
631 // However it is important to note that these predicates are *not*
632 // necessarily the predicates that are actually evaluated:
633 // the actual predicates are built in reference to the physical schema of
634 // each file, which we do not have at this point and hence cannot use.
635 // Instead we use the logical schema of the file (the table schema without partition columns).
636 if let Some(predicate) = &self.predicate {
637 let predicate_creation_errors = Count::new();
638 if let (Some(pruning_predicate), _) = build_pruning_predicates(
639 Some(predicate),
640 self.table_schema.table_schema(),
641 &predicate_creation_errors,
642 ) {
643 let mut guarantees = pruning_predicate
644 .literal_guarantees()
645 .iter()
646 .map(|item| format!("{item}"))
647 .collect_vec();
648 guarantees.sort();
649 write!(
650 f,
651 ", pruning_predicate={}, required_guarantees=[{}]",
652 pruning_predicate.predicate_expr(),
653 guarantees.join(", ")
654 )?;
655 }
656 };
657 Ok(())
658 }
659 DisplayFormatType::TreeRender => {
660 if let Some(predicate) = self.filter() {
661 writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
662 }
663 Ok(())
664 }
665 }
666 }
667
668 fn try_pushdown_filters(
669 &self,
670 filters: Vec<Arc<dyn PhysicalExpr>>,
671 config: &ConfigOptions,
672 ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
673 let table_schema = self.table_schema.table_schema();
674 // Determine if based on configs we should push filters down.
675 // If either the table / scan itself or the config has pushdown enabled,
676 // we will push down the filters.
677 // If both are disabled, we will not push down the filters.
678 // By default they are both disabled.
679 // Regardless of pushdown, we will update the predicate to include the filters
680 // because even if scan pushdown is disabled we can still use the filters for stats pruning.
681 let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
682 let table_pushdown_enabled = self.pushdown_filters();
683 let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
684
685 let mut source = self.clone();
686 let filters: Vec<PushedDownPredicate> = filters
687 .into_iter()
688 .map(|filter| {
689 if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
690 PushedDownPredicate::supported(filter)
691 } else {
692 PushedDownPredicate::unsupported(filter)
693 }
694 })
695 .collect();
696 if filters
697 .iter()
698 .all(|f| matches!(f.discriminant, PushedDown::No))
699 {
700 // No filters can be pushed down, so we can just return the remaining filters
701 // and avoid replacing the source in the physical plan.
702 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
703 vec![PushedDown::No; filters.len()],
704 ));
705 }
706 let allowed_filters = filters
707 .iter()
708 .filter_map(|f| match f.discriminant {
709 PushedDown::Yes => Some(Arc::clone(&f.predicate)),
710 PushedDown::No => None,
711 })
712 .collect_vec();
713 let predicate = match source.predicate {
714 Some(predicate) => {
715 conjunction(std::iter::once(predicate).chain(allowed_filters))
716 }
717 None => conjunction(allowed_filters),
718 };
719 source.predicate = Some(predicate);
720 source = source.with_pushdown_filters(pushdown_filters);
721 let source = Arc::new(source);
722 // If pushdown_filters is false we tell our parents that they still have to handle the filters,
723 // even if we updated the predicate to include the filters (they will only be used for stats pruning).
724 if !pushdown_filters {
725 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
726 vec![PushedDown::No; filters.len()],
727 )
728 .with_updated_node(source));
729 }
730 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
731 filters.iter().map(|f| f.discriminant).collect(),
732 )
733 .with_updated_node(source))
734 }
735
736 /// Try to optimize the scan to produce data in the requested sort order.
737 ///
738 /// This method receives:
739 /// 1. The query's required ordering (`order` parameter)
    /// 2. The file's natural ordering (via the `eq_properties` parameter, populated from the `FileScanConfig`)
741 ///
742 /// With both pieces of information, ParquetSource can decide what optimizations to apply.
743 ///
744 /// # Phase 1 Behavior (Current)
745 /// Returns `Inexact` when reversing the row group scan order would help satisfy the
746 /// requested ordering. We still need a Sort operator at a higher level because:
747 /// - We only reverse row group read order, not rows within row groups
748 /// - This provides approximate ordering that benefits limit pushdown
749 ///
750 /// # Phase 2 (Future)
751 /// Could return `Exact` when we can guarantee perfect ordering through techniques like:
752 /// - File reordering based on statistics
753 /// - Detecting already-sorted data
754 /// This would allow removing the Sort operator entirely.
755 ///
756 /// # Returns
757 /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
758 /// - `Unsupported`: Cannot optimize for this ordering
759 fn try_reverse_output(
760 &self,
761 order: &[PhysicalSortExpr],
762 eq_properties: &EquivalenceProperties,
763 ) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
764 if order.is_empty() {
765 return Ok(SortOrderPushdownResult::Unsupported);
766 }
767
768 // Build new equivalence properties with the reversed ordering.
769 // This allows us to check if the reversed ordering satisfies the request
770 // by leveraging:
771 // - Function monotonicity (e.g., extract_year_month preserves ordering)
772 // - Constant columns (from filters)
773 // - Other equivalence relationships
774 //
775 // Example flow:
776 // 1. File ordering: [extract_year_month(ws) DESC, ws DESC]
777 // 2. After reversal: [extract_year_month(ws) ASC, ws ASC]
778 // 3. Requested: [ws ASC]
779 // 4. Through extract_year_month's monotonicity property, the reversed
780 // ordering satisfies [ws ASC] even though it has additional prefix
781 let reversed_eq_properties = {
782 let mut new = eq_properties.clone();
783 new.clear_orderings();
784
785 // Reverse each ordering in the equivalence properties
786 let reversed_orderings = eq_properties
787 .oeq_class()
788 .iter()
789 .map(|ordering| {
790 ordering
791 .iter()
792 .map(|expr| expr.reverse())
793 .collect::<Vec<_>>()
794 })
795 .collect::<Vec<_>>();
796
797 new.add_orderings(reversed_orderings);
798 new
799 };
800
801 // Check if the reversed ordering satisfies the requested ordering
802 if !reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
803 return Ok(SortOrderPushdownResult::Unsupported);
804 }
805
806 // Return Inexact because we're only reversing row group order,
807 // not guaranteeing perfect row-level ordering
808 let new_source = self.clone().with_reverse_row_groups(true);
809 Ok(SortOrderPushdownResult::Inexact {
810 inner: Arc::new(new_source) as Arc<dyn FileSource>,
811 })
812
813 // TODO Phase 2: Add support for other optimizations:
814 // - File reordering based on min/max statistics
815 // - Detection of exact ordering (return Exact to remove Sort operator)
816 // - Partial sort pushdown for prefix matches
817 }
818}
819
820#[cfg(test)]
821mod tests {
822 use super::*;
823 use arrow::datatypes::Schema;
824 use datafusion_physical_expr::expressions::lit;
825
826 #[test]
827 #[expect(deprecated)]
828 fn test_parquet_source_predicate_same_as_filter() {
829 let predicate = lit(true);
830
831 let parquet_source =
832 ParquetSource::new(Arc::new(Schema::empty())).with_predicate(predicate);
        // same value, but filter() calls Arc::clone internally
834 assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref());
835 }
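
    // A quick check of parse_coerce_int96_string: the accepted values mirror the
    // match arms above ("ns", "us", "ms", "s") and matching is case-insensitive.
    #[test]
    fn test_parse_coerce_int96_string() {
        use arrow::datatypes::TimeUnit;

        assert_eq!(
            parse_coerce_int96_string("ns").unwrap(),
            TimeUnit::Nanosecond
        );
        assert_eq!(
            parse_coerce_int96_string("US").unwrap(),
            TimeUnit::Microsecond
        );
        assert_eq!(
            parse_coerce_int96_string("ms").unwrap(),
            TimeUnit::Millisecond
        );
        assert_eq!(parse_coerce_int96_string("s").unwrap(), TimeUnit::Second);
        assert!(parse_coerce_int96_string("days").is_err());
    }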
836
837 #[test]
838 fn test_reverse_scan_default_value() {
839 use arrow::datatypes::Schema;
840
841 let schema = Arc::new(Schema::empty());
842 let source = ParquetSource::new(schema);
843
844 assert!(!source.reverse_row_groups());
845 }
846
847 #[test]
848 fn test_reverse_scan_with_setter() {
849 use arrow::datatypes::Schema;
850
851 let schema = Arc::new(Schema::empty());
852
853 let source = ParquetSource::new(schema.clone()).with_reverse_row_groups(true);
854 assert!(source.reverse_row_groups());
855
856 let source = source.with_reverse_row_groups(false);
857 assert!(!source.reverse_row_groups());
858 }
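
    // Sketch of the `with_pushdown_filters` builder flag: it only toggles the
    // `pushdown_filters` option stored in the source's `TableParquetOptions`.
    #[test]
    fn test_with_pushdown_filters_flag() {
        let schema = Arc::new(Schema::empty());

        let source = ParquetSource::new(schema).with_pushdown_filters(true);
        assert!(source.pushdown_filters());

        let source = source.with_pushdown_filters(false);
        assert!(!source.pushdown_filters());
    }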
859
860 #[test]
861 fn test_reverse_scan_clone_preserves_value() {
862 use arrow::datatypes::Schema;
863
864 let schema = Arc::new(Schema::empty());
865
866 let source = ParquetSource::new(schema).with_reverse_row_groups(true);
867 let cloned = source.clone();
868
869 assert!(cloned.reverse_row_groups());
870 assert_eq!(source.reverse_row_groups(), cloned.reverse_row_groups());
871 }
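
    // A batch size must be set via `with_batch_size` before `create_file_opener`
    // is called (the opener `expect`s it); this just checks the builder stores it.
    #[test]
    fn test_with_batch_size_is_stored() {
        let source = ParquetSource::new(Arc::new(Schema::empty()));
        let source = source.with_batch_size(1024);
        let parquet = source.as_any().downcast_ref::<ParquetSource>().unwrap();
        assert_eq!(parquet.batch_size, Some(1024));
    }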
872
873 #[test]
874 fn test_reverse_scan_with_other_options() {
875 use arrow::datatypes::Schema;
876 use datafusion_common::config::TableParquetOptions;
877
878 let schema = Arc::new(Schema::empty());
879 let options = TableParquetOptions::default();
880
881 let source = ParquetSource::new(schema)
882 .with_table_parquet_options(options)
883 .with_metadata_size_hint(8192)
884 .with_reverse_row_groups(true);
885
886 assert!(source.reverse_row_groups());
887 assert_eq!(source.metadata_size_hint, Some(8192));
888 }
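
    // The file type string for this source is simply "parquet".
    #[test]
    fn test_file_type_is_parquet() {
        let source = ParquetSource::new(Arc::new(Schema::empty()));
        assert_eq!(source.file_type(), "parquet");
    }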
889
890 #[test]
891 fn test_reverse_scan_builder_pattern() {
892 use arrow::datatypes::Schema;
893
894 let schema = Arc::new(Schema::empty());
895
896 let source = ParquetSource::new(schema)
897 .with_reverse_row_groups(true)
898 .with_reverse_row_groups(false)
899 .with_reverse_row_groups(true);
900
901 assert!(source.reverse_row_groups());
902 }
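
    // Sketch of the trivial `try_reverse_output` case: with no requested
    // ordering there is nothing to optimize, so the result is `Unsupported`.
    #[test]
    fn test_try_reverse_output_empty_order() {
        use datafusion_physical_expr::EquivalenceProperties;

        let schema = Arc::new(Schema::empty());
        let source = ParquetSource::new(Arc::clone(&schema));
        let eq_properties = EquivalenceProperties::new(schema);

        let result = source.try_reverse_output(&[], &eq_properties).unwrap();
        assert!(matches!(result, SortOrderPushdownResult::Unsupported));
    }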
903
904 #[test]
905 fn test_reverse_scan_independent_of_predicate() {
906 use arrow::datatypes::Schema;
907 use datafusion_physical_expr::expressions::lit;
908
909 let schema = Arc::new(Schema::empty());
910 let predicate = lit(true);
911
912 let source = ParquetSource::new(schema)
913 .with_predicate(predicate)
914 .with_reverse_row_groups(true);
915
916 assert!(source.reverse_row_groups());
917 assert!(source.filter().is_some());
918 }
919}