pub struct FileScanConfig {
pub object_store_url: ObjectStoreUrl,
pub file_groups: Vec<FileGroup>,
pub constraints: Constraints,
pub limit: Option<usize>,
pub preserve_order: bool,
pub output_ordering: Vec<LexOrdering>,
pub file_compression_type: FileCompressionType,
pub file_source: Arc<dyn FileSource>,
pub batch_size: Option<usize>,
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
pub partitioned_by_file_group: bool,
/* private fields */
}
FileScanConfig represents scanning data from a group of files
FileScanConfig is used to create a DataSourceExec, the physical plan
for scanning files with a particular file format.
The FileSource (e.g. ParquetSource, CsvSource, etc.) is responsible
for creating the actual execution plan to read the files based on a
FileScanConfig. Fields in a FileScanConfig such as Statistics represent
information about the files before any projection or filtering is
applied in the file source.
Use FileScanConfigBuilder to construct a FileScanConfig.
Use DataSourceExec::from_data_source to create a DataSourceExec from
a FileScanConfig.
§Example
// create FileScan config for reading parquet files from file://
let object_store_url = ObjectStoreUrl::local_filesystem();
let file_source = Arc::new(ParquetSource::new(file_schema.clone()));
let config = FileScanConfigBuilder::new(object_store_url, file_source)
.with_limit(Some(1000)) // read only the first 1000 records
.with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
.expect("Failed to push down projection")
// Read /tmp/file1.parquet with known size of 1234 bytes in a single group
.with_file(PartitionedFile::new("file1.parquet", 1234))
// Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
// in a single file group
.with_file_group(FileGroup::new(vec![
PartitionedFile::new("file2.parquet", 56),
PartitionedFile::new("file3.parquet", 78),
])).build();
// create an execution plan from the config
let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
Fields§
§object_store_url: ObjectStoreUrl
Object store URL, used to get an ObjectStore instance from RuntimeEnv::object_store.
This ObjectStoreUrl should be the prefix of the absolute URL for files, such as file:// or s3://my_bucket. It should not include the path to the file itself. The relevant URL prefix must be registered via RuntimeEnv::register_object_store.
§file_groups: Vec<FileGroup>
List of files to be processed, grouped into partitions.
Each file must have a schema of file_schema or a subset. If a particular file has a subset, the missing columns are padded with NULLs.
DataFusion may attempt to read each partition of files concurrently; however, files within a partition will be read sequentially, one after the next.
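The NULL-padding behavior can be illustrated with a standalone sketch (not DataFusion code; the pad_row helper and the use of a plain map for a row are illustrative assumptions):

```rust
use std::collections::HashMap;

// Illustrative sketch only: a file whose schema is a subset of the table's
// file_schema has its missing columns padded with NULL (modeled as `None`).
fn pad_row(file_schema: &[&str], file_row: &HashMap<&str, i64>) -> Vec<Option<i64>> {
    file_schema
        .iter()
        .map(|col| file_row.get(col).copied())
        .collect()
}

fn main() {
    // the file contains only columns "a" and "c"; "b" is padded with NULL
    let row = HashMap::from([("a", 1i64), ("c", 3)]);
    let padded = pad_row(&["a", "b", "c"], &row);
    assert_eq!(padded, vec![Some(1), None, Some(3)]);
}
```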
§constraints: Constraints
Table constraints.
§limit: Option<usize>
The maximum number of records to read from this plan. If None, all records after filtering are returned.
§preserve_order: bool
Whether the scan’s limit is order sensitive.
When true, files must be read in the exact order specified to produce correct results (e.g., for ORDER BY ... LIMIT queries). When false, DataFusion may reorder file processing for optimization without affecting correctness.
§output_ordering: Vec<LexOrdering>
All equivalent lexicographical output orderings of this file scan, in terms of FileSource::table_schema. See FileScanConfigBuilder::with_output_ordering for more details.
Self::eq_properties uses this information along with projection and filtering information to compute the effective EquivalenceProperties.
§file_compression_type: FileCompressionType
File compression type.
§file_source: Arc<dyn FileSource>
File source such as ParquetSource, CsvSource, JsonSource, etc.
§batch_size: Option<usize>
Batch size while creating new batches. Defaults to the batch_size in datafusion_common::config::ExecutionOptions.
§expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>
Expression adapter used to adapt filters and projections that are pushed down into the scan from the logical schema to the physical schema of the file.
§partitioned_by_file_group: bool
When true, file_groups are organized by partition column values and output_partitioning will return Hash partitioning on partition columns. This allows the optimizer to skip hash repartitioning for aggregates and joins on partition columns.
If the number of file partitions > target_partitions, the file partitions will be grouped in a round-robin fashion such that the number of file partitions = target_partitions.
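The round-robin regrouping described for partitioned_by_file_group can be sketched as follows (an illustrative standalone sketch; round_robin_regroup is a hypothetical helper, not a DataFusion API):

```rust
// Hypothetical sketch: fold `partitions.len()` file partitions into at most
// `target` buckets, round-robin, as described for `partitioned_by_file_group`
// (assumes `target > 0`).
fn round_robin_regroup(partitions: Vec<Vec<&str>>, target: usize) -> Vec<Vec<&str>> {
    if partitions.len() <= target {
        // already at or below the target: keep partitions as-is
        return partitions;
    }
    let mut buckets: Vec<Vec<&str>> = vec![Vec::new(); target];
    for (i, files) in partitions.into_iter().enumerate() {
        // partition i lands in bucket i mod target
        buckets[i % target].extend(files);
    }
    buckets
}

fn main() {
    // five Hive-style partitions folded into two buckets
    let parts = vec![vec!["a"], vec!["b"], vec!["c"], vec!["d"], vec!["e"]];
    let grouped = round_robin_regroup(parts, 2);
    assert_eq!(grouped, vec![vec!["a", "c", "e"], vec!["b", "d"]]);
}
```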
Implementations§
impl FileScanConfig
pub fn file_schema(&self) -> &SchemaRef
Get the file schema (schema of the files without partition columns)
pub fn table_partition_cols(&self) -> &Vec<FieldRef>
Get the table partition columns
pub fn statistics(&self) -> Statistics
Returns the unprojected table statistics, marking them as inexact if filters are present.
When filters are pushed down (including pruning predicates and bloom filters), we can’t guarantee the statistics are exact because we don’t know how many rows will be filtered out.
pub fn projected_schema(&self) -> Result<Arc<Schema>>
pub fn newlines_in_values(&self) -> bool
👎Deprecated since 52.0.0: newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first.
Returns whether newlines in values are supported.
This method always returns false. The actual newlines_in_values setting has been moved to CsvSource and should be accessed via CsvSource::csv_options() instead.
pub fn projected_constraints(&self) -> Constraints
This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first.
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>>
This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first.
pub fn split_groups_by_statistics_with_target_partitions(
    table_schema: &SchemaRef,
    file_groups: &[FileGroup],
    sort_order: &LexOrdering,
    target_partitions: usize,
) -> Result<Vec<FileGroup>>
Splits file groups into new groups based on statistics to enable efficient parallel processing.
The method distributes files across a target number of partitions while ensuring files within each partition maintain sort order based on their min/max statistics.
The algorithm works by:
- Takes files sorted by minimum values
- For each file:
- Finds eligible groups (empty or where file’s min > group’s last max)
- Selects the smallest eligible group
- Creates a new group if needed
§Parameters
- table_schema: Schema containing information about the columns
- file_groups: The original file groups to split
- sort_order: The lexicographical ordering to maintain within each group
- target_partitions: The desired number of output partitions
§Returns
A new set of file groups, where files within each group are non-overlapping with respect to their min/max statistics and maintain the specified sort order.
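The algorithm above can be sketched with plain (min, max) integer ranges standing in for per-file column statistics. This is an illustrative sketch under stated assumptions, not the DataFusion implementation; split_by_ranges and its overlap fallback are hypothetical:

```rust
// Sketch of the grouping algorithm described above. `files` must already be
// sorted by `min`. A group is eligible for a file if it is empty or its last
// file's max is below the file's min; among eligible groups, the smallest is
// chosen. Overlapping files that fit nowhere fall back to the smallest group
// (an assumption of this sketch).
fn split_by_ranges(files: &[(i64, i64)], target_partitions: usize) -> Vec<Vec<(i64, i64)>> {
    // start with `target_partitions` empty groups; empty groups are always eligible
    let mut groups: Vec<Vec<(i64, i64)>> = vec![Vec::new(); target_partitions];
    for &(min, max) in files {
        let i = groups
            .iter()
            .enumerate()
            .filter(|(_, g)| g.last().map_or(true, |&(_, last_max)| min > last_max))
            .min_by_key(|(_, g)| g.len())
            .map(|(i, _)| i)
            // no eligible group: overlap is unavoidable, use the smallest group
            .unwrap_or_else(|| (0..groups.len()).min_by_key(|&i| groups[i].len()).unwrap());
        groups[i].push((min, max));
    }
    // drop any groups that received no files
    groups.retain(|g| !g.is_empty());
    groups
}

fn main() {
    // (0,10) overlaps (5,15) and (20,30) overlaps (25,35), so they are
    // separated; non-overlapping files share a group.
    let groups = split_by_ranges(&[(0, 10), (5, 15), (20, 30), (25, 35)], 2);
    assert_eq!(groups, vec![vec![(0, 10), (20, 30)], vec![(5, 15), (25, 35)]]);
}
```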
pub fn split_groups_by_statistics(
    table_schema: &SchemaRef,
    file_groups: &[FileGroup],
    sort_order: &LexOrdering,
) -> Result<Vec<FileGroup>>
Attempts to do a bin-packing on files into file groups, such that any two files in a file group are ordered and non-overlapping with respect to their statistics. It will produce the smallest number of file groups possible.
pub fn file_source(&self) -> &Arc<dyn FileSource>
Returns the file_source
Trait Implementations§
impl Clone for FileScanConfig
fn clone(&self) -> FileScanConfig
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl DataSource for FileScanConfig
fn repartitioned(
    &self,
    target_partitions: usize,
    repartition_file_min_size: usize,
    output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>>
If supported by the underlying FileSource, redistribute files across partitions according to their size.
fn output_partitioning(&self) -> Partitioning
Returns the output partitioning for this file scan.
When partitioned_by_file_group is true, this returns Partitioning::Hash on
the Hive partition columns, allowing the optimizer to skip hash repartitioning
for aggregates and joins on those columns.
Tradeoffs
- Benefit: Eliminates RepartitionExec and SortExec for queries with GROUP BY or ORDER BY on partition columns.
- Cost: Files are grouped by partition values rather than split by byte ranges, which may reduce I/O parallelism when partition sizes are uneven. For simple aggregations without ORDER BY, this cost may outweigh the benefit.
Follow-up Work
- Idea: Could allow byte-range splitting within partition-aware groups, preserving I/O parallelism while maintaining partition semantics.
fn eq_properties(&self) -> EquivalenceProperties
Computes the effective equivalence properties of this file scan, taking into account the file schema, any projections or filters applied by the file source, and the output ordering.
fn open(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream>
fn as_any(&self) -> &dyn Any
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> FmtResult
fn scheduling_type(&self) -> SchedulingType
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>
Returns statistics for the given partition, or for the entire plan when partition is None.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>>
fn fetch(&self) -> Option<usize>
fn metrics(&self) -> ExecutionPlanMetricsSet
fn try_swapping_with_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn DataSource>>>
fn try_pushdown_filters(
    &self,
    filters: Vec<Arc<dyn PhysicalExpr>>,
    config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>>
fn try_pushdown_sort(
    &self,
    order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>>
fn with_preserve_order(
    &self,
    preserve_order: bool,
) -> Option<Arc<dyn DataSource>>
Returns a DataSource that is aware of order-sensitivity.
impl Debug for FileScanConfig
impl DisplayAs for FileScanConfig
impl From<FileScanConfig> for FileScanConfigBuilder
fn from(config: FileScanConfig) -> Self
Auto Trait Implementations§
impl Freeze for FileScanConfig
impl !RefUnwindSafe for FileScanConfig
impl Send for FileScanConfig
impl Sync for FileScanConfig
impl Unpin for FileScanConfig
impl UnsafeUnpin for FileScanConfig
impl !UnwindSafe for FileScanConfig
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> CloneToUninit for T where T: Clone
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true; otherwise converts self into a Right variant.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true; otherwise converts self into a Right variant.