DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. DataFusion’s target users are developers building fast and feature-rich database and analytic systems, customized to particular workloads. Please see the DataFusion website for additional documentation, use cases and examples.
“Out of the box,” DataFusion offers SQL and DataFrame APIs,
excellent performance, built-in support for CSV, Parquet, JSON, and Avro,
extensive customization, and a great community.
Python Bindings are also available.
DataFusion features a full query planner, a columnar, streaming, multi-threaded, vectorized execution engine, and partitioned data sources. You can customize DataFusion at almost all points including additional data sources, query languages, functions, custom operators and more. See the Architecture section below for more details.
§Examples
The main entry point for interacting with DataFusion is the
SessionContext. Exprs represent expressions such as a + b.
§DataFrame
To execute a query against data stored
in a CSV file using a DataFrame:
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::functions_aggregate::expr_fn::min;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // create the dataframe
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // create a plan
    let df = df
        .filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;
    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;
    // format the results
    let pretty_results =
        arrow::util::pretty::pretty_format_batches(&results)?.to_string();
    let expected = vec![
        "+---+----------------+",
        "| a | min(?table?.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ];
    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}
§SQL
To execute a query against a CSV file using SQL:
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // create a plan
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
        .await?;
    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;
    // format the results
    let pretty_results =
        arrow::util::pretty::pretty_format_batches(&results)?.to_string();
    let expected = vec![
        "+---+----------------+",
        "| a | min(example.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ];
    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}
§More Examples
There are many additional annotated examples of using DataFusion in the datafusion-examples directory.
§Architecture
You can find a formal description of DataFusion’s architecture in our SIGMOD 2024 Paper.
§Design Goals
DataFusion’s Architecture Goals are:
- Work “out of the box”: Provide a very fast, world-class query engine with minimal setup or required configuration.
- Customizable everything: All behavior should be customizable by implementing traits.
- Architecturally boring 🥱: Follow industrial best practice rather than trying cutting-edge but unproven techniques.
With these principles, users start with a basic, high-performance engine and specialize it over time to suit their needs and available engineering capacity.
§Overview Presentations
The following presentations offer high level overviews of the different components and how they interact together.
- [April 2023]: The Apache DataFusion Architecture talks
- [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
- [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
- [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording
§Customization and Extension
DataFusion is designed to be highly extensible, so you can
start with a working, full featured engine, and then
specialize any behavior for your use case. For example,
some projects may add custom ExecutionPlan operators, or create their own
query language that directly creates LogicalPlan rather than using the
built-in SQL planner, SqlToRel.
In order to achieve this, DataFusion supports extension at many points:
- read from any data source (TableProvider)
- define your own catalogs, schemas, and table lists (catalog and CatalogProvider)
- build your own query language or plans (LogicalPlanBuilder)
- declare and use user-defined functions (ScalarUDF, AggregateUDF, and WindowUDF)
- add custom plan rewrite passes (AnalyzerRule, OptimizerRule, and PhysicalOptimizerRule)
- extend the planner to use user-defined logical and physical nodes (QueryPlanner)
You can find examples of each of them in the datafusion-examples directory.
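As a rough sketch of extension through traits, the example below registers a user-defined scalar function in a registry and calls it by name. `ScalarFunction`, `AddOne`, and `FunctionRegistry` are hypothetical names invented for illustration; DataFusion's actual ScalarUDF API operates on Arrow arrays and carries type signatures, which this dependency-free version omits.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a user-defined scalar function.
/// DataFusion's real UDFs work on Arrow arrays; this one uses plain i64 slices.
trait ScalarFunction {
    fn name(&self) -> &str;
    fn invoke(&self, input: &[i64]) -> Vec<i64>;
}

/// A user-provided function that adds one to every value.
struct AddOne;

impl ScalarFunction for AddOne {
    fn name(&self) -> &str {
        "add_one"
    }
    fn invoke(&self, input: &[i64]) -> Vec<i64> {
        input.iter().map(|v| v + 1).collect()
    }
}

/// Hypothetical registry mapping function names to implementations.
#[derive(Default)]
struct FunctionRegistry {
    functions: HashMap<String, Box<dyn ScalarFunction>>,
}

impl FunctionRegistry {
    /// Adds (or replaces) a function under its own name.
    fn register(&mut self, f: Box<dyn ScalarFunction>) {
        self.functions.insert(f.name().to_string(), f);
    }

    /// Calls a registered function by name; returns None if it is unknown.
    fn call(&self, name: &str, input: &[i64]) -> Option<Vec<i64>> {
        self.functions.get(name).map(|f| f.invoke(input))
    }
}

fn main() {
    let mut registry = FunctionRegistry::default();
    registry.register(Box::new(AddOne));
    println!("{:?}", registry.call("add_one", &[1, 2, 3])); // Some([2, 3, 4])
    println!("{:?}", registry.call("missing", &[1])); // None
}
```

The engine only depends on the trait, so new behavior plugs in without changing the code that looks functions up.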
§Query Planning and Execution Overview
§SQL
Parsed with SqlToRel creates
sqlparser initial plan
┌───────────────┐ ┌─────────┐ ┌─────────────┐
│ SELECT * │ │Query { │ │Project │
│ FROM ... │──────────▶│.. │────────────▶│ TableScan │
│ │ │} │ │ ... │
└───────────────┘ └─────────┘ └─────────────┘
SQL String sqlparser LogicalPlan
AST nodes
- The query string is parsed to an Abstract Syntax Tree (AST) Statement using sqlparser.
- The AST is converted to a LogicalPlan and logical expressions (Exprs) to compute the desired result by SqlToRel. This phase also includes name and type resolution (“binding”).
§DataFrame
When executing plans using the DataFrame API, the process is
identical to that for SQL, except the DataFrame API builds the
LogicalPlan directly using LogicalPlanBuilder. Systems
that have their own custom query languages typically also build
LogicalPlan directly.
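To make "building the LogicalPlan directly" concrete, here is a toy builder loosely in the spirit of LogicalPlanBuilder. The `Plan` enum, `PlanBuilder`, and `render` are hypothetical simplifications that only record the tree shape, with no schemas, type checking, or expression objects.

```rust
/// Hypothetical, heavily simplified logical plan: each node owns its input.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    Projection { columns: Vec<String>, input: Box<Plan> },
    Limit { fetch: usize, input: Box<Plan> },
}

/// Each method wraps the current plan in a new node, the way DataFrame
/// methods add an operator on top of the plan built so far.
struct PlanBuilder {
    plan: Plan,
}

impl PlanBuilder {
    fn scan(table: &str) -> Self {
        PlanBuilder { plan: Plan::Scan { table: table.to_string() } }
    }
    fn filter(self, predicate: &str) -> Self {
        let input = Box::new(self.plan);
        PlanBuilder { plan: Plan::Filter { predicate: predicate.to_string(), input } }
    }
    fn project(self, columns: &[&str]) -> Self {
        let columns: Vec<String> = columns.iter().map(|c| c.to_string()).collect();
        PlanBuilder { plan: Plan::Projection { columns, input: Box::new(self.plan) } }
    }
    fn limit(self, fetch: usize) -> Self {
        PlanBuilder { plan: Plan::Limit { fetch, input: Box::new(self.plan) } }
    }
    fn build(self) -> Plan {
        self.plan
    }
}

/// Renders the plan top-down with one indented line per node.
fn render(plan: &Plan, depth: usize, out: &mut Vec<String>) {
    let pad = "  ".repeat(depth);
    match plan {
        Plan::Scan { table } => out.push(format!("{pad}Scan: {table}")),
        Plan::Filter { predicate, input } => {
            out.push(format!("{pad}Filter: {predicate}"));
            render(input, depth + 1, out);
        }
        Plan::Projection { columns, input } => {
            out.push(format!("{pad}Projection: {}", columns.join(", ")));
            render(input, depth + 1, out);
        }
        Plan::Limit { fetch, input } => {
            out.push(format!("{pad}Limit: {fetch}"));
            render(input, depth + 1, out);
        }
    }
}

fn main() {
    let plan = PlanBuilder::scan("example")
        .filter("a <= b")
        .project(&["a", "b"])
        .limit(100)
        .build();
    let mut lines = Vec::new();
    render(&plan, 0, &mut lines);
    println!("{}", lines.join("\n"));
}
```

The last call (`limit`) ends up at the root of the tree and the scan at the leaf, which is why plan listings read from the final operator down to the data source.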
§Planning
AnalyzerRules and PhysicalPlanner PhysicalOptimizerRules
OptimizerRules creates ExecutionPlan improve performance
rewrite plan
┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ ┌─────────────────┐
│Project │ │Project(x, y)│ │ProjectExec │ │ProjectExec │
│ TableScan │──...──▶│ TableScan │─────▶│ ... │──...──▶│ ... │
│ ... │ │ ... │ │ DataSourceExec│ │ DataSourceExec│
└─────────────┘ └─────────────┘ └─────────────────┘ └─────────────────┘
LogicalPlan LogicalPlan ExecutionPlan ExecutionPlan
To process large datasets with many rows as efficiently as possible, significant effort is spent planning and optimizing, in the following manner:
- The LogicalPlan is checked and rewritten by AnalyzerRules to enforce semantic rules, such as type coercion.
- The LogicalPlan is rewritten by OptimizerRules, such as projection and filter pushdown, to improve its efficiency.
- The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.
- The ExecutionPlan is rewritten by PhysicalOptimizerRules, such as sort and join selection, to improve its efficiency.
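The rule-based rewriting steps above can be illustrated with two toy rules applied bottom-up in a fixed order. Everything here (the `Rule` trait, the `Plan` enum, `EliminateTrueFilter`, and `MergeLimits`) is hypothetical and only loosely echoes the intent of rules such as eliminate_filter and push_down_limit; it is not how DataFusion implements them.

```rust
/// Hypothetical toy logical plan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter(String, Box<Plan>),
    Limit(usize, Box<Plan>),
}

/// Hypothetical rule interface: rewrite one node whose children were already rewritten.
trait Rule {
    fn rewrite(&self, plan: Plan) -> Plan;
}

/// Removes filters whose predicate is the literal `true`.
struct EliminateTrueFilter;

impl Rule for EliminateTrueFilter {
    fn rewrite(&self, plan: Plan) -> Plan {
        match plan {
            Plan::Filter(predicate, input) if predicate == "true" => *input,
            other => other,
        }
    }
}

/// Merges directly nested limits, keeping the smaller fetch.
struct MergeLimits;

impl Rule for MergeLimits {
    fn rewrite(&self, plan: Plan) -> Plan {
        match plan {
            Plan::Limit(outer, input) => match *input {
                Plan::Limit(inner, child) => Plan::Limit(outer.min(inner), child),
                other => Plan::Limit(outer, Box::new(other)),
            },
            other => other,
        }
    }
}

/// Applies `rule` bottom-up: children first, then the node itself.
fn apply_bottom_up(rule: &dyn Rule, plan: Plan) -> Plan {
    let plan = match plan {
        Plan::Filter(p, input) => Plan::Filter(p, Box::new(apply_bottom_up(rule, *input))),
        Plan::Limit(n, input) => Plan::Limit(n, Box::new(apply_bottom_up(rule, *input))),
        scan @ Plan::Scan(_) => scan,
    };
    rule.rewrite(plan)
}

/// Runs each rule once over the whole plan, in the given order.
fn optimize(plan: Plan, rules: &[Box<dyn Rule>]) -> Plan {
    rules.iter().fold(plan, |p, rule| apply_bottom_up(rule.as_ref(), p))
}

fn main() {
    // Limit 10 -> Filter true -> Limit 5 -> Scan t
    let plan = Plan::Limit(
        10,
        Box::new(Plan::Filter(
            "true".to_string(),
            Box::new(Plan::Limit(5, Box::new(Plan::Scan("t".to_string())))),
        )),
    );
    let rules: Vec<Box<dyn Rule>> = vec![Box::new(EliminateTrueFilter), Box::new(MergeLimits)];
    println!("{:?}", optimize(plan, &rules)); // Limit(5, Scan("t"))
}
```

Running the same two rules in the opposite order leaves `Limit(10, Limit(5, Scan("t")))`, because the filter still separates the two limits when MergeLimits runs. This is one reason rule order matters, and why a logical optimizer may run its rule list for more than one pass.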
§Data Sources
Planning │
requests │ TableProvider::scan
information │ creates an
such as schema │ ExecutionPlan
│
▼
┌─────────────────────────┐ ┌───────────────┐
│ │ │ │
│impl TableProvider │────────▶│DataSourceExec │
│ │ │ │
└─────────────────────────┘ └───────────────┘
TableProvider
(built in or user provided) ExecutionPlan
A TableProvider provides information for planning and
an ExecutionPlan for execution. DataFusion includes two built-in
table providers that support common file formats and require no runtime services,
ListingTable and MemTable. You can add support for any other data
source and/or file formats by implementing the TableProvider trait.
See also:
- ListingTable: Reads data from one or more Parquet, JSON, CSV, or Avro files in one or more local or remote directories. Supports Hive-style partitioning, optional compression, reading directly from remote object stores, file metadata caching, and more.
- MemTable: Reads data from in-memory RecordBatches.
- StreamingTable: Reads data from potentially unbounded inputs.
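The split between planning-time information and execution-time data can be sketched with a small trait. `ToyTableProvider`, `InMemoryTable`, and `resolve` are hypothetical: DataFusion's real TableProvider is an async trait whose `scan` returns an ExecutionPlan, while this synchronous version returns rows directly to stay dependency-free.

```rust
/// A toy row of i64 values (real providers produce Arrow RecordBatches).
type Row = Vec<i64>;

/// Hypothetical, synchronous stand-in for a table provider.
trait ToyTableProvider {
    /// Column names, consulted during planning (for example, to resolve `SELECT c`).
    fn schema(&self) -> Vec<String>;
    /// Produces data during execution, keeping only the requested column indices.
    fn scan(&self, projection: Option<&[usize]>) -> Vec<Row>;
}

/// An in-memory table, loosely in the spirit of MemTable.
struct InMemoryTable {
    columns: Vec<String>,
    rows: Vec<Row>,
}

impl ToyTableProvider for InMemoryTable {
    fn schema(&self) -> Vec<String> {
        self.columns.clone()
    }

    fn scan(&self, projection: Option<&[usize]>) -> Vec<Row> {
        match projection {
            None => self.rows.clone(),
            Some(indices) => self
                .rows
                .iter()
                .map(|row| indices.iter().map(|&i| row[i]).collect())
                .collect(),
        }
    }
}

/// Planner-side step: map column names to indices using only the schema.
/// Returns None if any name is unknown.
fn resolve(provider: &dyn ToyTableProvider, names: &[&str]) -> Option<Vec<usize>> {
    let schema = provider.schema();
    names.iter().map(|name| schema.iter().position(|c| c == name)).collect()
}

fn main() {
    let table = InMemoryTable {
        columns: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        rows: vec![vec![1, 2, 3], vec![4, 5, 6]],
    };
    let projection = resolve(&table, &["c", "a"]).expect("columns exist");
    println!("{:?}", table.scan(Some(projection.as_slice()))); // [[3, 1], [6, 4]]
}
```

Passing the projection into `scan` lets the provider skip columns the query never uses, which is the same motivation behind projection pushdown into real data sources.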
§Plan Representations
§Logical Plans
Logical planning yields LogicalPlan nodes and Exprs
(representing expressions), which are Schema-aware and represent statements
independent of how they are physically executed.
A LogicalPlan is a Directed Acyclic Graph (DAG) of other
LogicalPlans, each potentially containing embedded Exprs.
LogicalPlans can be rewritten with the TreeNode API; see the
tree_node module for more details.
Exprs can also be rewritten with the TreeNode API and simplified using
ExprSimplifier. Examples of working with and executing Exprs can be
found in the expr_api.rs example.
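Bottom-up rewriting of an expression tree is the basic idea behind both TreeNode rewrites and constant folding in ExprSimplifier. The sketch below uses a hypothetical `Expr` enum and `simplify` function, not DataFusion's types, and folds only integer addition, integer comparison, and `AND` with a boolean literal.

```rust
/// Hypothetical expression tree (not DataFusion's Expr).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Int(i64),
    Bool(bool),
    Add(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Post-order rewrite: simplify the children first, then try to fold this node.
fn simplify(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => match (simplify(*l), simplify(*r)) {
            // Fold literal additions, leaving overflowing sums unevaluated.
            (Expr::Int(a), Expr::Int(b)) if a.checked_add(b).is_some() => Expr::Int(a + b),
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        Expr::Gt(l, r) => match (simplify(*l), simplify(*r)) {
            (Expr::Int(a), Expr::Int(b)) => Expr::Bool(a > b),
            (l, r) => Expr::Gt(Box::new(l), Box::new(r)),
        },
        Expr::And(l, r) => match (simplify(*l), simplify(*r)) {
            // `x AND true` is `x` and `x AND false` is `false`; both also hold
            // under SQL's three-valued logic.
            (Expr::Bool(true), other) | (other, Expr::Bool(true)) => other,
            (Expr::Bool(false), _) | (_, Expr::Bool(false)) => Expr::Bool(false),
            (l, r) => Expr::And(Box::new(l), Box::new(r)),
        },
        leaf => leaf,
    }
}

fn col(name: &str) -> Box<Expr> {
    Box::new(Expr::Column(name.to_string()))
}

fn int(v: i64) -> Box<Expr> {
    Box::new(Expr::Int(v))
}

fn main() {
    // (a > 1 + 2) AND (3 > 1)
    let expr = Expr::And(
        Box::new(Expr::Gt(col("a"), Box::new(Expr::Add(int(1), int(2))))),
        Box::new(Expr::Gt(int(3), int(1))),
    );
    println!("{:?}", simplify(expr)); // Gt(Column("a"), Int(3))
}
```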
§Physical Plans
An ExecutionPlan (sometimes referred to as a “physical plan”)
is a plan that can be executed against data. It is a DAG of other
ExecutionPlans each potentially containing expressions that implement the
PhysicalExpr trait.
Compared to a LogicalPlan, an ExecutionPlan has additional concrete
information about how to perform calculations (e.g. hash vs merge
join), and how data flows during execution (e.g. partitioning and
sortedness).
cp_solver performs range propagation analysis on PhysicalExprs, and
PruningPredicate uses additional statistical information to prove that
certain boolean PhysicalExprs used for filtering can never be true.
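The kind of reasoning PruningPredicate performs can be sketched with min/max statistics: if the largest value of `x` in a container (such as a Parquet row group) is 9, the filter `x > 10` can never be true there, so the container can be skipped. `ColumnStats`, `can_skip_gt`, and `can_skip_eq` are hypothetical helpers; the real PruningPredicate rewrites supported predicates into expressions evaluated against container statistics.

```rust
/// Hypothetical min/max statistics for one column in one container
/// (for example, a Parquet row group). `None` means the value is unknown.
#[derive(Debug, Clone, Copy)]
struct ColumnStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// `column > literal` can never be true if the largest value is <= literal.
/// Unknown statistics never allow pruning, so the check stays conservative.
fn can_skip_gt(stats: &ColumnStats, literal: i64) -> bool {
    matches!(stats.max, Some(max) if max <= literal)
}

/// `column = literal` can never be true if literal lies outside [min, max].
fn can_skip_eq(stats: &ColumnStats, literal: i64) -> bool {
    match (stats.min, stats.max) {
        (Some(min), Some(max)) => literal < min || literal > max,
        _ => false,
    }
}

fn main() {
    let row_groups = [
        ColumnStats { min: Some(0), max: Some(9) },
        ColumnStats { min: Some(5), max: Some(20) },
        ColumnStats { min: None, max: None },
    ];
    // Row groups that still have to be read for the filter `x > 10`.
    let to_read: Vec<usize> = row_groups
        .iter()
        .enumerate()
        .filter(|(_, stats)| !can_skip_gt(stats, 10))
        .map(|(i, _)| i)
        .collect();
    println!("{to_read:?}"); // [1, 2]
}
```

The important property is that pruning is only ever a proof that rows cannot match; when statistics are missing, the container is read as usual.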
§Execution
ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌────────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ DataSourceExec│ │ │ │ ┌────────────┐
└────────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─
ExecutionPlans process data using the Apache Arrow memory
format, making heavy use of functions from the arrow
crate. Values are represented with ColumnarValue, which are either
ScalarValue (single constant values) or ArrayRef (Arrow
Arrays).
Calling execute produces 1 or more partitions of data,
as a SendableRecordBatchStream, which implements a pull based execution
API. Calling next().await will incrementally compute and return the next
RecordBatch. Balanced parallelism is achieved using Volcano-style
“Exchange” operations implemented by RepartitionExec.
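The pull model can be sketched with Rust's `Iterator` trait standing in for SendableRecordBatchStream: each operator asks its input for a batch only when its own consumer asks it for one. This is a synchronous simplification; real DataFusion streams are async and yield Arrow RecordBatches, whereas here a batch is just a `Vec<i64>` and `ToyScan`, `ToyFilter`, and `ToyProjection` are hypothetical types.

```rust
/// A toy batch: values from a single column (real batches are Arrow RecordBatches).
type Batch = Vec<i64>;

/// Leaf operator: yields pre-built batches, like a scan over a file.
struct ToyScan {
    batches: std::vec::IntoIter<Batch>,
}

impl Iterator for ToyScan {
    type Item = Batch;
    fn next(&mut self) -> Option<Batch> {
        self.batches.next()
    }
}

/// Pulls one input batch per call and keeps the rows matching `predicate`.
struct ToyFilter<I: Iterator<Item = Batch>> {
    input: I,
    predicate: fn(i64) -> bool,
}

impl<I: Iterator<Item = Batch>> Iterator for ToyFilter<I> {
    type Item = Batch;
    fn next(&mut self) -> Option<Batch> {
        // `?` propagates end-of-stream (None) from the input.
        let batch = self.input.next()?;
        Some(batch.into_iter().filter(|v| (self.predicate)(*v)).collect())
    }
}

/// Applies `func` to every row, producing one output batch per input batch.
struct ToyProjection<I: Iterator<Item = Batch>> {
    input: I,
    func: fn(i64) -> i64,
}

impl<I: Iterator<Item = Batch>> Iterator for ToyProjection<I> {
    type Item = Batch;
    fn next(&mut self) -> Option<Batch> {
        let func = self.func;
        self.input.next().map(|batch| batch.into_iter().map(func).collect())
    }
}

fn main() {
    let scan = ToyScan { batches: vec![vec![5, 12, 30], vec![8, 11]].into_iter() };
    let filter = ToyFilter { input: scan, predicate: |v| v > 10 };
    let mut plan = ToyProjection { input: filter, func: |v| v * 2 };
    // The consumer drives execution: each next() pulls one batch through the pipeline.
    while let Some(batch) = plan.next() {
        println!("{batch:?}"); // [24, 60], then [22]
    }
}
```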
While some recent research such as Morsel-Driven Parallelism describes challenges with the pull-style Volcano execution model on NUMA architectures, in practice DataFusion achieves similar scalability to systems that use push-driven schedulers, such as DuckDB. See the DataFusion paper in SIGMOD 2024 for more details.
See the implementors of ExecutionPlan for a list of physical operators available.
§Streaming Execution
DataFusion is a “streaming” query engine which means ExecutionPlans incrementally
read from their input(s) and compute output one RecordBatch at a time
by continually polling SendableRecordBatchStreams. Output and
intermediate RecordBatches each have approximately batch_size rows,
which amortizes per-batch overhead of execution.
Note that certain operations, sometimes called “pipeline breakers”,
(for example full sorts or hash aggregations) are fundamentally non-streaming and
must read their input fully before producing any output. As much as possible,
other operators read a single RecordBatch from their input to produce a
single RecordBatch as output.
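The difference between a streaming operator and a pipeline breaker shows up in how many input batches each one pulls before producing its first output. The helper functions below (`counted`, `streaming_filter`, `full_sort`) are hypothetical and use plain iterators, not DataFusion operators.

```rust
use std::cell::Cell;
use std::rc::Rc;

type Batch = Vec<i64>;

/// Wraps an input so every batch pulled from it increments `pulled`.
fn counted(batches: Vec<Batch>, pulled: Rc<Cell<usize>>) -> impl Iterator<Item = Batch> {
    batches.into_iter().inspect(move |_| pulled.set(pulled.get() + 1))
}

/// Streaming operator: each output batch needs exactly one input batch.
fn streaming_filter(input: impl Iterator<Item = Batch>) -> impl Iterator<Item = Batch> {
    input.map(|batch| batch.into_iter().filter(|v| *v % 2 == 0).collect::<Batch>())
}

/// Pipeline breaker: a full sort must consume all input before emitting anything.
fn full_sort(input: impl Iterator<Item = Batch>) -> impl Iterator<Item = Batch> {
    std::iter::once_with(move || {
        let mut all: Vec<i64> = input.flatten().collect();
        all.sort_unstable();
        all
    })
}

fn main() {
    let input = vec![vec![4, 1], vec![3, 2], vec![6, 5]];

    let pulled = Rc::new(Cell::new(0));
    let mut filtered = streaming_filter(counted(input.clone(), pulled.clone()));
    let first = filtered.next();
    // filter: Some([4]) after 1 input batch(es)
    println!("filter: {first:?} after {} input batch(es)", pulled.get());

    let pulled = Rc::new(Cell::new(0));
    let mut sorted = full_sort(counted(input, pulled.clone()));
    let first = sorted.next();
    // sort: Some([1, 2, 3, 4, 5, 6]) after 3 input batch(es)
    println!("sort: {first:?} after {} input batch(es)", pulled.get());
}
```

The filter emits its first batch after pulling one input batch, while the sort has to pull all three before it can emit anything.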
For example, given this SQL:
SELECT name FROM 'data.parquet' WHERE id > 10
A simplified DataFusion execution plan is shown below. It first reads
data from the Parquet file, then applies the filter, then the projection,
and finally produces output. Each step processes one RecordBatch at a
time. Multiple batches are processed concurrently on different CPU cores
for plans with multiple partitions.
┌─────────────┐ ┌──────────────┐ ┌────────────────┐ ┌──────────────────┐ ┌──────────┐
│ Parquet │───▶│ DataSource │───▶│ FilterExec │───▶│ ProjectionExec │───▶│ Results │
│ File │ │ │ │ │ │ │ │ │
└─────────────┘ └──────────────┘ └────────────────┘ └──────────────────┘ └──────────┘
(reads data) (id > 10) (keeps "name" col)
RecordBatch ───▶ RecordBatch ────▶ RecordBatch ────▶ RecordBatch
DataFusion uses the classic “pull”-based control flow (described in the Execution section above) to implement streaming execution. As an example, consider the following SQL query:
SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30);
The diagram below shows the call sequence when a consumer calls next() to
get the next RecordBatch of output. While it is possible that some
steps run on different threads, typically tokio will use the same thread
that called next() to read from the input, apply the filter, and
return the results without interleaving any other operations. This results
in excellent cache locality as the same CPU core that produces the data often
consumes it immediately as well.
Step 3: FilterExec calls next() Step 2: ProjectionExec calls
on input Stream next() on input Stream
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ Step 1: Consumer
▼ ▼ │ calls next()
┏━━━━━━━━━━━━━━━━┓ ┏━━━━━┻━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃ ┃ ┃ ┃ ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┃ DataSource ┃ ┃ ┃ ┃ ┃
┃ (e.g. ┃ ┃ FilterExec ┃ ┃ ProjectionExec ┃
┃ ParquetSource) ┃ ┃id IN (10, 20, 30) ┃ ┃date_trunc('month', time) ┃
┃ ┃ ┃ ┃ ┃ ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
┃ ┃ ┃ ┃ ┃ ┃
┗━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━┳━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
│ ▲ ▲ Step 6: ProjectionExec
│ │ │ computes date_trunc into a
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ new RecordBatch returned
┌─────────────────────┐ ┌─────────────┐ from client
│ RecordBatch │ │ RecordBatch │
└─────────────────────┘ └─────────────┘
Step 4: DataSource returns a Step 5: FilterExec returns a new
single RecordBatch RecordBatch with only matching rows
§Thread Scheduling, CPU / IO Thread Pools, and Tokio Runtimes
DataFusion automatically runs each plan with multiple CPU cores using
a Tokio Runtime as a thread pool. While Tokio is most commonly used
for asynchronous network I/O, the combination of an efficient, work-stealing
scheduler and first-class compiler support for automatic continuation
generation (async) also makes it a compelling choice for CPU-intensive
applications, as explained in the Using Rustlang’s Async Tokio
Runtime for CPU-Bound Tasks blog post.
The number of cores used is determined by the target_partitions
configuration setting, which defaults to the number of CPU cores.
While preparing for execution, DataFusion tries to create this many distinct
async Streams for each ExecutionPlan.
The Streams for certain ExecutionPlans, such as RepartitionExec
and CoalescePartitionsExec, spawn Tokio tasks that run on
threads managed by the Runtime.
Many DataFusion Streams perform CPU-intensive processing.
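Partition-level parallelism can be sketched with scoped OS threads: the input is split into at most `target_partitions` pieces, each piece is aggregated independently, and the partial results are merged at the end, loosely like a partial aggregate per partition followed by a final one. This sketch uses `std::thread::scope` instead of a Tokio Runtime, and `parallel_sum` is a hypothetical name.

```rust
use std::thread;

/// Splits `values` into at most `target_partitions` chunks, sums each chunk on
/// its own scoped thread (a "partial" aggregate), then merges the partial sums.
fn parallel_sum(values: &[i64], target_partitions: usize) -> i64 {
    let partitions = target_partitions.max(1);
    // Round up so every value lands in exactly one chunk; never use a chunk size of 0.
    let chunk_size = ((values.len() + partitions - 1) / partitions).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = values
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<i64>()))
            .collect();
        // "Final" step: combine each partition's partial result.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<i64>()
    })
}

fn main() {
    // target_partitions defaults to the number of CPU cores;
    // available_parallelism is the closest std equivalent.
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let values: Vec<i64> = (1..=1_000).collect();
    println!("{}", parallel_sum(&values, cores)); // 500500
}
```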
§Cooperative Scheduling
DataFusion uses cooperative scheduling, which means that each Stream
is responsible for yielding control back to the Runtime after
some amount of work is done. Please see the coop module documentation
for more details.
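Cooperative scheduling can be illustrated without async: each task does at most a fixed budget of work per turn, then returns control to a simple round-robin scheduler, so a long-running task cannot starve a short one. `Task`, `run_cooperatively`, and the budget values are hypothetical; DataFusion's actual mechanism is the one described in the coop module.

```rust
use std::collections::VecDeque;

/// A hypothetical unit of work that needs `remaining` steps to finish.
struct Task {
    name: &'static str,
    remaining: u32,
}

/// Round-robin scheduler: each turn, a task does at most `budget` steps and then
/// yields by moving to the back of the queue. Returns the order in which tasks ran.
fn run_cooperatively(tasks: Vec<Task>, budget: u32) -> Vec<&'static str> {
    let budget = budget.max(1); // a zero budget would never make progress
    let mut queue: VecDeque<Task> = tasks.into();
    let mut turns = Vec::new();
    while let Some(mut task) = queue.pop_front() {
        task.remaining -= task.remaining.min(budget);
        turns.push(task.name);
        if task.remaining > 0 {
            queue.push_back(task); // yield so other tasks get a turn
        }
    }
    turns
}

fn main() {
    let tasks = vec![
        Task { name: "long_scan", remaining: 10 },
        Task { name: "short_query", remaining: 2 },
    ];
    println!("{:?}", run_cooperatively(tasks, 4));
    // ["long_scan", "short_query", "long_scan", "long_scan"]
}
```

With a budget of 4, short_query finishes on the second turn; with a budget of 100, it runs only after long_scan has completed all of its work.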
§Network I/O and CPU intensive tasks
Using async for CPU intensive tasks makes it easy for TableProviders
to perform network I/O using standard Rust async during execution.
However, this design also makes it very easy to mix CPU intensive and latency
sensitive I/O work on the same thread pool (Runtime).
Using the same (default) Runtime is convenient, and often works well for
initial development and processing local files, but it can lead to problems
under load and/or when reading from network sources such as AWS S3.
§Optimizing Latency: Throttled CPU / IO under Highly Concurrent Load
If your system does not fully utilize either the CPU or network bandwidth
during execution, or you see significantly higher tail (e.g. p99) latencies
responding to network requests, it is likely you need to use a different
Runtime for DataFusion plans. The thread_pools example
has an example of how to do so.
As shown below, using the same Runtime for both CPU intensive processing
and network requests can introduce significant delays in responding to
those network requests. Delays in processing network requests can and do
lead network flow control to throttle the available bandwidth in response.
This effect can be especially pronounced when running multiple queries
concurrently.
Legend
┏━━━━━━┓
Processing network request ┃ ┃ CPU bound work
is delayed due to processing ┗━━━━━━┛
CPU bound work ┌─┐
│ │ Network request
││ └─┘ processing
││
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│ │
▼ ▼
┌─────────────┐ ┌─┐┌─┐┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓┌─┐
│ │thread 1 │ ││ │┃ Decoding ┃┃ Filtering ┃│ │
│ │ └─┘└─┘┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛└─┘
│ │ ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
│Tokio Runtime│thread 2 ┃ Decoding ┃ Filtering ┃ Decoding ┃ ...
│(thread pool)│ ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
│ │ ... ...
│ │ ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓┌─┐ ┏━━━━━━━━━━━━━━┓
│ │thread N ┃ Decoding ┃ Filtering ┃│ │ ┃ Decoding ┃
└─────────────┘ ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┛└─┘ ┗━━━━━━━━━━━━━━┛
─────────────────────────────────────────────────────────────▶
time
The bottleneck resulting from network throttling can be avoided
by using separate Runtimes for the different types of work, as shown
in the diagram below.
A separate thread pool processes network Legend
requests, reducing the latency for
processing each request ┏━━━━━━┓
┃ ┃ CPU bound work
│ ┗━━━━━━┛
│ ┌─┐
┌ ─ ─ ─ ─ ┘ │ │ Network request
┌ ─ ─ ─ ┘ └─┘ processing
│
▼ ▼
┌─────────────┐ ┌─┐┌─┐┌─┐
│ │thread 1 │ ││ ││ │
│ │ └─┘└─┘└─┘
│Tokio Runtime│ ...
│(thread pool)│thread 2
│ │
│"IO Runtime" │ ...
│ │ ┌─┐
│ │thread N │ │
└─────────────┘ └─┘
─────────────────────────────────────────────────────────────▶
time
┌─────────────┐ ┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓
│ │thread 1 ┃ Decoding ┃┃ Filtering ┃
│ │ ┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛
│Tokio Runtime│ ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
│(thread pool)│thread 2 ┃ Decoding ┃ Filtering ┃ Decoding ┃ ...
│ │ ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
│ CPU Runtime │ ... ...
│ │ ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
│ │thread N ┃ Decoding ┃ Filtering ┃ Decoding ┃
└─────────────┘ ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
─────────────────────────────────────────────────────────────▶
time
Note that DataFusion does not use tokio::task::spawn_blocking for
CPU-bound work, because spawn_blocking is designed for blocking I/O,
not for CPU-bound tasks. Among other challenges, spawned blocking
tasks can’t yield while waiting for input (they can’t call await), so they
can’t be used to limit the number of concurrent CPU-bound tasks or
keep the processing pipeline on the same core.
§State Management and Configuration
ConfigOptions contain options to control DataFusion’s
execution.
The state required to execute queries is managed by the following structures:
- SessionContext: State needed to create LogicalPlans, such as the table definitions and the function registries.
- TaskContext: State needed for execution, such as the MemoryPool, DiskManager, and ObjectStoreRegistry.
- ExecutionProps: Per-execution properties and data (such as starting timestamps, etc.).
§Resource Management
The amount of memory and temporary local disk space used by
DataFusion when running a plan can be controlled using the
MemoryPool and DiskManager. Other runtime options can be
found on RuntimeEnv.
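The role of a MemoryPool can be sketched as a shared byte counter with a limit: an operator reserves memory before buffering data, and a failed reservation is the point at which a spilling operator would write to disk via the DiskManager (or a non-spilling one would report an error). `ToyMemoryPool`, `try_reserve`, and `release` are hypothetical names; DataFusion's MemoryPool trait and its implementations have a richer interface.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical memory pool shared by all operators running a query.
struct ToyMemoryPool {
    limit: usize,
    reserved: AtomicUsize,
}

impl ToyMemoryPool {
    fn new(limit: usize) -> Self {
        Self { limit, reserved: AtomicUsize::new(0) }
    }

    /// Atomically reserves `bytes` if the total stays within `limit`.
    /// An error is the cue for a spilling operator to write data to disk.
    fn try_reserve(&self, bytes: usize) -> Result<(), String> {
        self.reserved
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                current.checked_add(bytes).filter(|&total| total <= self.limit)
            })
            .map(|_| ())
            .map_err(|current| {
                format!("cannot reserve {bytes} bytes: {current} of {} in use", self.limit)
            })
    }

    /// Returns bytes to the pool (saturating, so over-release cannot underflow).
    fn release(&self, bytes: usize) {
        let _ = self
            .reserved
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::SeqCst)
    }
}

fn main() {
    let pool = ToyMemoryPool::new(1024);
    pool.try_reserve(800).expect("fits within the limit");
    match pool.try_reserve(400) {
        Ok(()) => println!("reserved"),
        // prints: would spill: cannot reserve 400 bytes: 800 of 1024 in use
        Err(e) => println!("would spill: {e}"),
    }
    pool.release(800);
    println!("in use: {}", pool.reserved()); // in use: 0
}
```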
§Crate Organization
Most users interact with DataFusion via this crate (datafusion), which re-exports
all functionality needed to build and execute queries.
There are three other crates that provide additional functionality that must be used directly:
- datafusion_proto: Plan serialization and deserialization
- datafusion_substrait: Support for the Substrait plan serialization format
- datafusion_sqllogictest: The DataFusion SQL logic test runner
DataFusion is internally split into multiple sub-crates to enforce modularity and improve compilation times. See the list of modules for all available sub-crates. Major ones are:
- datafusion_common: Common traits and types
- datafusion_catalog: Catalog APIs such as SchemaProvider and CatalogProvider
- datafusion_datasource: File and data I/O such as FileSource and DataSink
- datafusion_session: Session and related structures
- datafusion_execution: State and structures needed for execution
- datafusion_expr: LogicalPlan, Expr, and related logical planning structures
- datafusion_functions: Scalar function packages
- datafusion_functions_aggregate: Aggregate functions such as MIN, MAX, SUM, etc.
- datafusion_functions_nested: Scalar function packages for ARRAYs, MAPs, and STRUCTs
- datafusion_functions_table: Table functions such as GENERATE_SERIES
- datafusion_functions_window: Window functions such as ROW_NUMBER, RANK, etc.
- datafusion_optimizer: OptimizerRules and AnalyzerRules
- datafusion_physical_expr: PhysicalExpr and related expressions
- datafusion_physical_plan: ExecutionPlan and related expressions
- datafusion_physical_optimizer: PhysicalOptimizerRules
- datafusion_sql: SQL planner (SqlToRel)
§Citing DataFusion in Academic Papers
You can use the following citation to reference DataFusion in academic papers:
@inproceedings{lamb2024apache,
title={Apache Arrow DataFusion: A Fast, Embeddable, Modular Analytic Query Engine},
author={Lamb, Andrew and Shen, Yijie and Heres, Dani{\"e}l and Chakraborty, Jayjeet and Kabak, Mehmet Ozan and Hsieh, Liang-Chi and Sun, Chao},
booktitle={Companion of the 2024 International Conference on Management of Data},
pages={5--17},
year={2024}
}
§Built-in Optimizer Rules
DataFusion applies a default analyzer, logical optimizer, and physical optimizer pipeline.
The rule names listed here match the names shown by EXPLAIN VERBOSE.
Rule order matters. The default pipeline may change between releases.
§Analyzer Rules
| order | rule | summary |
|---|---|---|
| 1 | resolve_grouping_function | Rewrites GROUPING(...) calls into expressions over DataFusion’s internal grouping id. |
| 2 | type_coercion | Adds implicit casts so operators and functions receive valid input types. |
§Logical Optimizer Rules
| order | rule | summary |
|---|---|---|
| 1 | rewrite_set_comparison | Rewrites ANY and ALL set-comparison subqueries into EXISTS-based boolean expressions with correct SQL NULL semantics. |
| 2 | optimize_unions | Flattens nested unions and removes unions with a single input. |
| 3 | unions_to_filter | Merges UNION DISTINCT branches that share the same source into a single filtered branch with a disjunctive predicate. |
| 4 | simplify_expressions | Constant-folds and simplifies expressions while preserving output names. |
| 5 | replace_distinct_aggregate | Rewrites DISTINCT and DISTINCT ON operators into aggregate-based plans that later rules can optimize further. |
| 6 | eliminate_join | Replaces keyless inner joins with a literal false filter by an empty relation. |
| 7 | decorrelate_predicate_subquery | Converts eligible IN and EXISTS predicate subqueries into semi or anti joins. |
| 8 | scalar_subquery_to_join | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. |
| 9 | decorrelate_lateral_join | Rewrites eligible lateral joins into regular joins. |
| 10 | extract_equijoin_predicate | Splits join filters into equijoin keys and residual predicates. |
| 11 | eliminate_duplicated_expr | Removes duplicate expressions from projections, aggregates, and similar operators. |
| 12 | eliminate_filter | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
| 13 | eliminate_cross_join | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
| 14 | eliminate_limit | Removes no-op limits and simplifies trivial limit shapes. |
| 15 | propagate_empty_relation | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
| 16 | filter_null_join_keys | Adds IS NOT NULL filters to nullable equijoin keys that can never match. |
| 17 | eliminate_outer_join | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
| 18 | push_down_limit | Moves literal limits closer to scans and unions and merges adjacent limits. |
| 19 | push_down_filter | Moves filters as early as possible through filter-commutative operators. |
| 20 | single_distinct_aggregation_to_group_by | Rewrites single-column DISTINCT aggregations into two-stage GROUP BY plans. |
| 21 | eliminate_group_by_constant | Removes constant or functionally redundant expressions from GROUP BY. |
| 22 | common_sub_expression_eliminate | Computes repeated subexpressions once and reuses the result. |
| 23 | extract_leaf_expressions | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
| 24 | push_down_leaf_projections | Pushes the helper projections created by leaf extraction toward leaf inputs. |
| 25 | optimize_projections | Prunes unused columns and removes unnecessary logical projections. |
§Physical Optimizer Rules
The same rule name may appear more than once when the default pipeline runs it in multiple phases.
| order | rule | phase | summary |
|---|---|---|---|
| 1 | OutputRequirements | add phase | Adds helper nodes so output requirements survive later physical rewrites. |
| 2 | aggregate_statistics | - | Uses exact source statistics to answer some aggregates without scanning data. |
| 3 | join_selection | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
| 4 | LimitedDistinctAggregation | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
| 5 | FilterPushdown | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
| 6 | EnforceDistribution | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
| 7 | CombinePartialFinalAggregate | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
| 8 | EnforceSorting | - | Adds or removes local sorts to satisfy required input orderings. |
| 9 | OptimizeAggregateOrder | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
| 10 | WindowTopN | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
| 11 | ProjectionPushdown | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
| 12 | OutputRequirements | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
| 13 | LimitAggregation | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
| 14 | LimitPushPastWindows | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
| 15 | HashJoinBuffering | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
| 16 | LimitPushdown | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
| 17 | TopKRepartition | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
| 18 | ProjectionPushdown | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
| 19 | PushdownSort | - | Pushes sort requirements into data sources that can already return sorted output. |
| 20 | EnsureCooperative | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
| 21 | FilterPushdown(Post) | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
| 22 | SanityCheckPlan | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
§Re-exports
- pub use arrow;
- pub use object_store;
- pub use parquet; (requires the parquet feature)
- pub use datafusion_datasource_avro::arrow_avro; (requires the avro feature)
§Modules
- catalog: re-export of datafusion_catalog crate
- common: re-export of datafusion_common crate
- config: Runtime configuration, via ConfigOptions
- dataframe: DataFrame API for building and executing query plans.
- datasource: DataFusion data sources: TableProvider and ListingTable
- error: DataFusion error type DataFusionError and Result.
- execution: Shared state for query planning and execution.
- functions: re-export of datafusion_functions crate
- functions_aggregate: re-export of datafusion_functions_aggregate crate
- functions_nested: re-export of datafusion_functions_nested crate, if “nested_expressions” feature is enabled
- functions_table: re-export of datafusion_functions_table crate
- functions_window: re-export of datafusion_functions_window crate
- logical_expr: re-export of datafusion_expr crate
- logical_expr_common: re-export of datafusion_expr_common crate
- optimizer: re-export of datafusion_optimizer crate
- physical_expr: re-export of datafusion_physical_expr crate
- physical_expr_adapter: re-export of datafusion_physical_expr_adapter crate
- physical_expr_common: re-export of datafusion_physical_expr_common crate
- physical_optimizer: re-export of datafusion_physical_optimizer crate
- physical_plan: re-export of datafusion_physical_plan crate
- physical_planner: Planner for LogicalPlan to ExecutionPlan
- prelude: DataFusion “prelude” to simplify importing common types.
- scalar: ScalarValue single value representation.
- sql (requires the sql feature): re-export of datafusion_sql crate
- test (non-WebAssembly targets only): Common unit test utility methods
- test_util: Utility functions to make testing DataFusion based crates easier
- variable: re-export of variable provider for @name and @@name style runtime values.
§Macros
- assert_batches_eq: Compares formatted output of a record batch with an expected vector of strings, with the result of pretty formatting record batches. This is a macro so errors appear on the correct line.
- assert_batches_sorted_eq: Compares formatted output of a record batch with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line.
- dataframe: Macro for creating DataFrame.
§Constants
- DATAFUSION_VERSION: DataFusion crate version