Struct ParquetSource

Source
pub struct ParquetSource { /* private fields */ }

Execution plan for reading one or more Parquet files.

            ▲
            │
            │  Produce a stream of
            │  RecordBatches
            │
┌───────────────────────┐
│                       │
│     DataSourceExec    │
│                       │
└───────────────────────┘
            ▲
            │  Asynchronously read from one
            │  or more parquet files via
            │  ObjectStore interface
            │
            │
  .───────────────────.
 │                     )
 │`───────────────────'│
 │    ObjectStore      │
 │.───────────────────.│
 │                     )
  `───────────────────'

§Example: Create a DataSourceExec


let source = Arc::new(
    ParquetSource::new(Arc::clone(&file_schema))
        .with_predicate(predicate)
);
// Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
let config = FileScanConfigBuilder::new(object_store_url, source)
    .with_file(PartitionedFile::new("file1.parquet", 100 * 1024 * 1024))
    .build();
let exec = DataSourceExec::from_data_source(config);

§Features

Supports the following optimizations:

  • Concurrent reads: reads from one or more files in parallel as multiple partitions, including concurrently reading multiple row groups from a single file.

  • Predicate push down: skips row groups, pages, and rows based on metadata and late materialization. See “Predicate Pushdown” below.

  • Projection pushdown: reads and decodes only the columns required.

  • Limit pushdown: stops execution early after some number of rows are read.

  • Custom readers: customize reading parquet files, e.g. to cache metadata, coalesce I/O operations, etc. See ParquetFileReaderFactory for more details.

  • Schema evolution: read parquet files with different schemas into a unified table schema. See DefaultPhysicalExprAdapterFactory for more details.

  • metadata_size_hint: controls the number of bytes read from the end of the file in the initial I/O when the default ParquetFileReaderFactory is used. If a custom reader is used, it supplies the metadata directly and this parameter is ignored. See ParquetSource::with_metadata_size_hint for more details, and the sketch after this list.

  • User provided ParquetAccessPlans to skip row groups and/or pages based on external information. See “Implementing External Indexes” below.
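
For example, the metadata size hint can be set directly on the source. A minimal sketch, assuming file_schema is defined as in the example above:

// Ask the default ParquetFileReaderFactory to fetch the last 512 KiB of each
// file in its initial request, which usually covers the footer and metadata.
let source = ParquetSource::new(Arc::clone(&file_schema))
    .with_metadata_size_hint(512 * 1024);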

§Predicate Pushdown

DataSourceExec uses the provided PhysicalExpr predicate as a filter to skip reading unnecessary data and improve query performance using several techniques:

  • Row group pruning: skips entire row groups based on min/max statistics found in ParquetMetaData and any Bloom filters that are present.

  • Page pruning: skips individual pages within a ColumnChunk using the Parquet PageIndex, if present.

  • Row filtering: skips rows within a page using a form of late materialization. When possible, predicates are applied by the parquet decoder during decode (see ArrowPredicate and RowFilter for more details). This is only enabled if ParquetScanOptions::pushdown_filters is set to true; see the sketch below.

Note: If the predicate cannot be used to accelerate the scan, it is ignored (no error is raised on predicate evaluation errors).
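
The builder methods listed under “Implementations” below control which of these techniques are applied. A minimal sketch, assuming file_schema and predicate are defined as in the earlier example:

// Enable row filtering (late materialization), filter reordering, page index
// pruning, and Bloom filter pruning for this scan.
let source = ParquetSource::new(Arc::clone(&file_schema))
    .with_predicate(predicate)
    .with_pushdown_filters(true)      // apply the predicate while decoding rows
    .with_reorder_filters(true)       // allow the reader to reorder predicate exprs
    .with_enable_page_index(true)     // prune pages using the Parquet PageIndex
    .with_bloom_filter_on_read(true); // prune row groups using Bloom filters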

§Example: rewriting DataSourceExec

You can modify a DataSourceExec using ParquetSource, for example to change files or add a predicate.


// Split a single DataSourceExec into multiple DataSourceExecs, one for each file
let exec = parquet_exec();
let data_source = exec.data_source();
let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let existing_file_groups = &base_config.file_groups;
let new_execs = existing_file_groups
  .iter()
  .map(|file_group| {
    // create a new exec by copying the existing exec's source config
    let new_config = FileScanConfigBuilder::from(base_config.clone())
      .with_file_groups(vec![file_group.clone()])
      .build();

    DataSourceExec::from_data_source(new_config)
  })
  .collect::<Vec<_>>();

§Implementing External Indexes

It is possible to restrict the row groups and selections within those row groups that the DataSourceExec will consider by providing an initial ParquetAccessPlan as extensions on PartitionedFile. This can be used to implement external indexes on top of parquet files and select only portions of the files.

The DataSourceExec will try to reduce any provided ParquetAccessPlan further based on the contents of ParquetMetaData and other settings.

§Example of providing a ParquetAccessPlan


// create an access plan to scan row groups 0, 1, and 3 and skip row groups 2 and 4
let mut access_plan = ParquetAccessPlan::new_all(5);
access_plan.skip(2);
access_plan.skip(4);
// provide the plan as extension to the FileScanConfig
let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
  .with_extensions(Arc::new(access_plan));
// create a FileScanConfig to scan this file
let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(ParquetSource::new(schema())))
    .with_file(partitioned_file).build();
// this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional
// pruning based on predicates may also happen
let exec = DataSourceExec::from_data_source(config);

For a complete example, see the advanced_parquet_index example.

§Execution Overview

  • Step 1: DataSourceExec::execute is called, returning a FileStream configured to open parquet files with a ParquetOpener (see the sketch after these steps).

  • Step 2: When the stream is polled, the ParquetOpener is called to open the file.

  • Step 3: The ParquetOpener gets the ParquetMetaData (file metadata) via ParquetFileReaderFactory, creating a ParquetAccessPlan by applying predicates to metadata. The plan and projections are used to determine what pages must be read.

  • Step 4: The stream begins reading data, fetching the required parquet pages, incrementally decoding them, and applying any row filters (see Self::with_pushdown_filters).

  • Step 5: As each RecordBatch is read, it may be adapted by a DefaultPhysicalExprAdapterFactory to match the table schema. By default missing columns are filled with nulls, but this can be customized via PhysicalExprAdapterFactory.
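
A minimal sketch of driving this pipeline, assuming exec was built as in the examples above, this code runs inside an async context with the ExecutionPlan trait in scope, and TaskContext comes from datafusion_execution:

use futures::StreamExt;

// Step 1: `execute` returns a stream for one partition; each poll of the
// stream drives Steps 2-5 (opening files, pruning, decoding, adapting).
let task_ctx = Arc::new(TaskContext::default());
let mut stream = exec.execute(0, task_ctx)?;
while let Some(batch) = stream.next().await {
    let batch = batch?;
    println!("read a RecordBatch with {} rows", batch.num_rows());
}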

Implementations§

Source§

impl ParquetSource

Source

pub fn new(table_schema: impl Into<TableSchema>) -> Self

Create a new ParquetSource to read the data specified in the file scan configuration with the provided schema.

Uses default TableParquetOptions. To set custom options, use ParquetSource::with_table_parquet_options.

Source

pub fn with_table_parquet_options( self, table_parquet_options: TableParquetOptions, ) -> Self

Set the TableParquetOptions for this ParquetSource.
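
For example, a minimal sketch (field names under TableParquetOptions::global may differ between DataFusion versions; file_schema is assumed as in the earlier examples):

// Enable filter pushdown via the options struct rather than the individual
// builder methods.
let mut parquet_options = TableParquetOptions::default();
parquet_options.global.pushdown_filters = true;
parquet_options.global.reorder_filters = true;
let source = ParquetSource::new(Arc::clone(&file_schema))
    .with_table_parquet_options(parquet_options);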

Source

pub fn with_metadata_size_hint(self, metadata_size_hint: usize) -> Self

Set the metadata size hint

This value determines how many bytes at the end of the file the default ParquetFileReaderFactory will request in the initial IO. If this is too small, the ParquetSource will need to make additional IO requests to read the footer.

Source

pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self

Set predicate information

Source

pub fn with_encryption_factory( self, encryption_factory: Arc<dyn EncryptionFactory>, ) -> Self

Set the encryption factory to use to generate file decryption properties

Source

pub fn table_parquet_options(&self) -> &TableParquetOptions

Options passed to the parquet reader for this scan

Source

pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>>

👎Deprecated since 50.2.0: use filter instead

Optional predicate.

Source

pub fn parquet_file_reader_factory( &self, ) -> Option<&Arc<dyn ParquetFileReaderFactory>>

Return the optional file reader factory

Source

pub fn with_parquet_file_reader_factory( self, parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, ) -> Self

Optional user defined parquet file reader factory.

Source

pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self

If true, the predicate will be used during the parquet scan. Defaults to false.

Source

pub fn with_reorder_filters(self, reorder_filters: bool) -> Self

If true, the RowFilter made by pushdown_filters may try to minimize the cost of filter evaluation by reordering the predicate Exprs. If false, the predicates are applied in the same order as specified in the query. Defaults to false.

Source

pub fn with_enable_page_index(self, enable_page_index: bool) -> Self

If enabled, the reader will read the page index. This is used to optimize filter pushdown via RowSelector and RowFilter by eliminating unnecessary IO and decoding.

Source

pub fn with_bloom_filter_on_read(self, bloom_filter_on_read: bool) -> Self

If enabled, the reader will use Bloom filters to prune row groups during the scan.

Source

pub fn with_bloom_filter_on_write( self, enable_bloom_filter_on_write: bool, ) -> Self

If enabled, the writer will write Bloom filters when writing parquet files.

Source

pub fn max_predicate_cache_size(&self) -> Option<usize>

Return the maximum predicate cache size, in bytes, used when pushdown_filters is enabled.

Trait Implementations§

Source§

impl Clone for ParquetSource

Source§

fn clone(&self) -> ParquetSource

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for ParquetSource

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl FileSource for ParquetSource

Source§

fn try_reverse_output( &self, order: &[PhysicalSortExpr], eq_properties: &EquivalenceProperties, ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>>

Try to optimize the scan to produce data in the requested sort order.

This method receives:

  1. The query’s required ordering (order parameter)
  2. The file’s natural ordering (via self.file_ordering, set by FileScanConfig)

With both pieces of information, ParquetSource can decide what optimizations to apply.

§Phase 1 Behavior (Current)

Returns Inexact when reversing the row group scan order would help satisfy the requested ordering. We still need a Sort operator at a higher level because:

  • We only reverse row group read order, not rows within row groups
  • This provides approximate ordering that benefits limit pushdown

§Phase 2 (Future)

Could return Exact when we can guarantee perfect ordering through techniques like:

  • File reordering based on statistics
  • Detecting already-sorted data

This would allow removing the Sort operator entirely.

§Returns

  • Inexact: Created an optimized source (e.g., reversed scan) that approximates the order
  • Unsupported: Cannot optimize for this ordering
Source§

fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, base_config: &FileScanConfig, partition: usize, ) -> Result<Arc<dyn FileOpener>>

Creates a dyn FileOpener based on given parameters
Source§

fn as_any(&self) -> &dyn Any

Return this FileSource as Any so it can be downcast to its concrete type
Source§

fn table_schema(&self) -> &TableSchema

Returns the table schema for this file source. Read more
Source§

fn filter(&self) -> Option<Arc<dyn PhysicalExpr>>

Returns the filter expression that will be applied during the file scan.
Source§

fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>

Initialize new type with batch size configuration
Source§

fn try_pushdown_projection( &self, projection: &ProjectionExprs, ) -> Result<Option<Arc<dyn FileSource>>>

Try to push down a projection into this FileSource. Read more
Source§

fn projection(&self) -> Option<&ProjectionExprs>

Return the projection that will be applied to the output stream on top of the table schema.
Source§

fn metrics(&self) -> &ExecutionPlanMetricsSet

Return execution plan metrics
Source§

fn file_type(&self) -> &str

String representation of file source such as “csv”, “json”, “parquet”
Source§

fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> Result

Format FileType specific information
Source§

fn try_pushdown_filters( &self, filters: Vec<Arc<dyn PhysicalExpr>>, config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>>

Try to push down filters into this FileSource. See ExecutionPlan::handle_child_pushdown_result for more details.
Source§

fn supports_repartitioning(&self) -> bool

Returns whether this file source supports repartitioning files by byte ranges. Read more
Source§

fn repartitioned( &self, target_partitions: usize, repartition_file_min_size: usize, output_ordering: Option<LexOrdering>, config: &FileScanConfig, ) -> Result<Option<FileScanConfig>, DataFusionError>

If supported by the FileSource, redistribute files across partitions according to their size. Allows custom file formats to implement their own repartitioning logic. Read more
Source§

fn with_schema_adapter_factory( &self, _factory: Arc<dyn SchemaAdapterFactory>, ) -> Result<Arc<dyn FileSource>, DataFusionError>

👎Deprecated since 52.0.0: SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details.
Deprecated: Set optional schema adapter factory. Read more
Source§

fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>

👎Deprecated since 52.0.0: SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details.
Deprecated: Returns the current schema adapter factory if set. Read more
Source§

impl From<ParquetSource> for Arc<dyn FileSource>

Allows easy conversion from ParquetSource to Arc<dyn FileSource>

Source§

fn from(source: ParquetSource) -> Self

Converts to this type from the input type.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> ErasedDestructor for T
where T: 'static,