DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. DataFusion’s target users are developers building fast and feature rich database and analytic systems, customized to particular workloads. See use cases for examples.
“Out of the box,” DataFusion offers SQL and DataFrame APIs, excellent performance, built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and a great community. Python Bindings are also available.
DataFusion features a full query planner, a columnar, streaming, multi-threaded, vectorized execution engine, and partitioned data sources. You can customize DataFusion at almost all points including additional data sources, query languages, functions, custom operators and more. See the Architecture section below for more details.
§Examples
The main entry point for interacting with DataFusion is the SessionContext. Exprs represent expressions such as a + b.
§DataFrame
To execute a query against data stored in a CSV file using a DataFrame:
// imports needed by this snippet
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::functions_aggregate::expr_fn::min;
use datafusion::prelude::*;
// the statements below run inside an async fn returning datafusion::error::Result<()>
let ctx = SessionContext::new();
// create the dataframe
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .limit(0, Some(100))?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
    .to_string();
let expected = vec![
    "+---+----------------+",
    "| a | min(?table?.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
§SQL
To execute a query against a CSV file using SQL:
// imports needed by this snippet
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
// the statements below run inside an async fn returning datafusion::error::Result<()>
let ctx = SessionContext::new();
// register the CSV file as a table named "example"
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
    .to_string();
let expected = vec![
    "+---+----------------+",
    "| a | min(example.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
§More Examples
There are many additional annotated examples of using DataFusion in the datafusion-examples directory.
§Architecture
You can find a formal description of DataFusion’s architecture in our SIGMOD 2024 Paper.
§Design Goals
DataFusion’s Architecture Goals are:
- Work “out of the box”: Provide a very fast, world class query engine with minimal setup or required configuration.
- Customizable everything: All behavior should be customizable by implementing traits.
- Architecturally boring 🥱: Follow industrial best practice rather than trying cutting edge, but unproven, techniques.
With these principles, users start with a basic, high-performance engine and specialize it over time to suit their needs and available engineering capacity.
§Overview Presentations
The following presentations offer high-level overviews of the different components and how they interact.
- [Apr 2023]: The Apache DataFusion Architecture talks
- [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
- [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
- [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording
§Customization and Extension
DataFusion is designed to be highly extensible, so you can start with a working, full-featured engine, and then specialize any behavior for your use case. For example, some projects may add custom ExecutionPlan operators, or create their own query language that directly creates LogicalPlans rather than using the built-in SQL planner, SqlToRel.
In order to achieve this, DataFusion supports extension at many points:
- read from any datasource (TableProvider)
- define your own catalogs, schemas, and table lists (catalog and CatalogProvider)
- build your own query language or plans (LogicalPlanBuilder)
- declare and use user-defined functions (ScalarUDF, AggregateUDF, and WindowUDF)
- add custom plan rewrite passes (AnalyzerRule, OptimizerRule and PhysicalOptimizerRule)
- extend the planner to use user-defined logical and physical nodes (QueryPlanner)
You can find examples of each of them in the datafusion-examples directory.
§Query Planning and Execution Overview
§SQL
                Parsed with            SqlToRel creates
                sqlparser              initial plan
┌───────────────┐           ┌─────────┐             ┌─────────────┐
│   SELECT *    │           │Query {  │             │Project      │
│   FROM ...    │──────────▶│..       │────────────▶│  TableScan  │
│               │           │}        │             │    ...      │
└───────────────┘           └─────────┘             └─────────────┘
  SQL String                 sqlparser               LogicalPlan
                             AST nodes
- The query string is parsed to an Abstract Syntax Tree (AST) Statement using sqlparser.
- The AST is converted by SqlToRel to a LogicalPlan and logical expressions (Exprs) that compute the desired result. This phase also includes name and type resolution (“binding”).
§DataFrame
When executing plans using the DataFrame API, the process is identical to that for SQL, except the DataFrame API builds the LogicalPlan directly using LogicalPlanBuilder. Systems that have their own custom query languages typically also build LogicalPlans directly.
§Planning
        AnalyzerRules and      PhysicalPlanner          PhysicalOptimizerRules
        OptimizerRules         creates ExecutionPlan    improve performance
        rewrite plan
┌─────────────┐        ┌─────────────┐      ┌─────────────────┐        ┌─────────────────┐
│Project      │        │Project(x, y)│      │ProjectExec      │        │ProjectExec      │
│  TableScan  │──...──▶│  TableScan  │─────▶│  ...            │──...──▶│  ...            │
│    ...      │        │    ...      │      │   DataSourceExec│        │   DataSourceExec│
└─────────────┘        └─────────────┘      └─────────────────┘        └─────────────────┘
 LogicalPlan            LogicalPlan          ExecutionPlan              ExecutionPlan
To process large datasets with many rows as efficiently as possible, significant effort is spent planning and optimizing, in the following manner:
- The LogicalPlan is checked and rewritten to enforce semantic rules, such as type coercion, by AnalyzerRules.
- The LogicalPlan is rewritten by OptimizerRules, such as projection and filter pushdown, to improve its efficiency.
- The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.
- The ExecutionPlan is rewritten by PhysicalOptimizerRules, such as sort and join selection, to improve its efficiency.
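To make the idea of a plan rewrite concrete, here is a minimal sketch of a filter pushdown pass over a toy plan tree. The Plan enum and the push_down_filters function are hypothetical names invented for this illustration; they are not DataFusion's LogicalPlan or OptimizerRule, which carry schemas, expressions, and many more node types.

```rust
// Toy model of a logical plan. These types are hypothetical and are NOT
// DataFusion's LogicalPlan or OptimizerRule.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Read `table`, optionally applying a filter inside the scan itself.
    Scan { table: String, pushed_filter: Option<String> },
    /// Keep only rows matching `predicate`.
    Filter { predicate: String, input: Box<Plan> },
    /// Keep only the named columns.
    Project { columns: Vec<String>, input: Box<Plan> },
}

/// Rewrite the plan bottom-up: a `Filter` sitting directly on top of a `Scan`
/// that has no filter yet is folded into the scan.
fn push_down_filters(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => match push_down_filters(*input) {
            // The scan can evaluate the predicate itself, so the Filter node disappears.
            Plan::Scan { table, pushed_filter: None } => Plan::Scan {
                table,
                pushed_filter: Some(predicate),
            },
            // Anything else keeps the Filter node on top of the rewritten input.
            other => Plan::Filter { predicate, input: Box::new(other) },
        },
        Plan::Project { columns, input } => Plan::Project {
            columns,
            input: Box::new(push_down_filters(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}
```

Applied to Project(Filter(Scan)), the pass returns Project(Scan with the predicate attached), so fewer rows leave the scan. Real rules must also check that the rewrite preserves semantics, which this sketch ignores.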
§Data Sources
Planning       │
requests       │            TableProvider::scan
information    │            creates an
such as schema │            ExecutionPlan
               │
               ▼
  ┌─────────────────────────┐         ┌───────────────┐
  │                         │         │               │
  │impl TableProvider       │────────▶│DataSourceExec │
  │                         │         │               │
  └─────────────────────────┘         └───────────────┘
        TableProvider
        (built in or user provided)    ExecutionPlan
A TableProvider provides information for planning and an ExecutionPlan for execution. DataFusion includes ListingTable, which supports reading several common file formats, and you can support any new file format by implementing the TableProvider trait. See also:
- ListingTable: Reads data from Parquet, JSON, CSV, or AVRO files. Supports single files or multiple files with HIVE style partitioning, optional compression, directly reading from remote object store and more.
- MemTable: Reads data from in memory RecordBatches.
- StreamingTable: Reads data from potentially unbounded inputs.
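The two roles of a table provider (answering planning questions and producing something executable) can be sketched with a toy trait. TableSource and InMemoryTable below are hypothetical names for this illustration only; the real TableProvider trait is async, works with Arrow schemas, and returns an ExecutionPlan rather than an iterator of rows.

```rust
/// Hypothetical stand-in for a table provider (NOT DataFusion's TableProvider):
/// it answers planning questions (the schema) and creates something that can
/// be executed (here, simply an iterator over projected rows).
trait TableSource {
    /// Column names, used during planning.
    fn schema(&self) -> Vec<&'static str>;
    /// Produce the rows, keeping only the columns listed in `projection`.
    fn scan(&self, projection: &[usize]) -> Box<dyn Iterator<Item = Vec<i64>> + '_>;
}

/// A table whose rows already live in memory.
struct InMemoryTable {
    columns: Vec<&'static str>,
    rows: Vec<Vec<i64>>,
}

impl TableSource for InMemoryTable {
    fn schema(&self) -> Vec<&'static str> {
        self.columns.clone()
    }

    fn scan(&self, projection: &[usize]) -> Box<dyn Iterator<Item = Vec<i64>> + '_> {
        // Copy the projection so the returned iterator only borrows `self`.
        let projection = projection.to_vec();
        Box::new(self.rows.iter().map(move |row| {
            projection.iter().map(|&i| row[i]).collect::<Vec<i64>>()
        }))
    }
}
```

Passing the projection into scan mirrors the idea that the planner tells the source which columns it needs, so the source can avoid producing the rest.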
§Plan Representations
§Logical Plans
Logical planning yields LogicalPlan nodes and Expr representing expressions which are Schema aware and represent statements independent of how they are physically executed. A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, each potentially containing embedded Exprs.
LogicalPlans can be rewritten with the TreeNode API; see the tree_node module for more details.
Exprs can also be rewritten with the TreeNode API and simplified using ExprSimplifier. Examples of working with and executing Exprs can be found in the expr_api.rs example.
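The kind of bottom-up expression rewrite described above can be sketched on a toy expression tree. The Expr enum, transform_up, and simplify here are hypothetical, self-contained stand-ins written for this illustration; they only imitate the shape of a tree rewrite and are not the TreeNode or ExprSimplifier APIs.

```rust
/// Toy expression tree (NOT DataFusion's Expr).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Apply `f` to every node, children first (a bottom-up rewrite).
fn transform_up(expr: Expr, f: &dyn Fn(Expr) -> Expr) -> Expr {
    let rewritten = match expr {
        Expr::Add(l, r) => Expr::Add(
            Box::new(transform_up(*l, f)),
            Box::new(transform_up(*r, f)),
        ),
        leaf => leaf,
    };
    f(rewritten)
}

/// Fold `literal + literal` into a single literal and drop `+ 0`.
fn simplify(expr: Expr) -> Expr {
    transform_up(expr, &|e| match e {
        Expr::Add(l, r) => match (*l, *r) {
            (Expr::Literal(a), Expr::Literal(b)) => Expr::Literal(a + b),
            (other, Expr::Literal(0)) | (Expr::Literal(0), other) => other,
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        other => other,
    })
}
```

Because children are rewritten before their parent, a + (1 + 2) first becomes a + 3 at the inner node, and the outer node then sees an already simplified right-hand side.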
§Physical Plans
An ExecutionPlan (sometimes referred to as a “physical plan”) is a plan that can be executed against data. It is a DAG of other ExecutionPlans, each potentially containing expressions that implement the PhysicalExpr trait.
Compared to a LogicalPlan, an ExecutionPlan has additional concrete information about how to perform calculations (e.g. hash vs merge join), and how data flows during execution (e.g. partitioning and sortedness).
cp_solver performs range propagation analysis on PhysicalExprs, and PruningPredicate can prove certain boolean PhysicalExprs used for filtering can never be true using additional statistical information.
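As a rough illustration of proving that a filter can never be true from statistics, the sketch below prunes containers (for example files or row groups) using only per-container min/max values of one column. ColumnStats, Predicate, might_match, and prune are hypothetical names for this example; the real PruningPredicate handles arbitrary boolean expressions, nulls, and multiple columns.

```rust
/// Min/max statistics for one column in one container (e.g. a file or row group).
/// Hypothetical type for illustration only.
#[derive(Debug, Clone, Copy)]
struct ColumnStats {
    min: i64,
    max: i64,
}

/// A tiny predicate language over a single column `x`.
#[derive(Debug, Clone, Copy)]
enum Predicate {
    Eq(i64), // x = v
    Lt(i64), // x < v
    Gt(i64), // x > v
}

/// Returns `true` if some row in the container *might* satisfy the predicate,
/// and `false` only when the statistics prove that no row can.
fn might_match(pred: Predicate, stats: ColumnStats) -> bool {
    match pred {
        Predicate::Eq(v) => stats.min <= v && v <= stats.max,
        Predicate::Lt(v) => stats.min < v,
        Predicate::Gt(v) => stats.max > v,
    }
}

/// Indexes of the containers that still have to be read.
fn prune(pred: Predicate, containers: &[ColumnStats]) -> Vec<usize> {
    containers
        .iter()
        .enumerate()
        .filter(|(_, stats)| might_match(pred, **stats))
        .map(|(i, _)| i)
        .collect()
}
```

The check is conservative: a container is skipped only when the predicate is provably false for every row in it, so pruning never changes query results.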
§Execution
ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌────────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ DataSourceExec│ │ │ │ ┌────────────┐
└────────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─
ExecutionPlans process data using the Apache Arrow memory format, making heavy use of functions from the arrow crate. Values are represented with ColumnarValue, which is either a ScalarValue (a single constant value) or an ArrayRef (an Arrow Array).
Calling execute produces 1 or more partitions of data, as a SendableRecordBatchStream, which implements a pull based execution API. Calling next().await will incrementally compute and return the next RecordBatch. Balanced parallelism is achieved using Volcano style “Exchange” operations implemented by RepartitionExec.
While some recent research such as Morsel-Driven Parallelism describes challenges with the pull style Volcano execution model on NUMA architectures, in practice DataFusion achieves scalability similar to that of systems that use push driven schedulers such as DuckDB. See the DataFusion paper in SIGMOD 2024 for more details.
See the implementors of ExecutionPlan for a list of physical operators available.
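The effect of an “Exchange” style operator, spreading the batches of one input evenly over several output partitions, can be sketched as a round-robin distribution. The function repartition_round_robin is a hypothetical helper for this illustration; unlike RepartitionExec, it materializes all outputs eagerly instead of streaming them to concurrently running consumers.

```rust
/// Distribute the items of one input across `n` output partitions in
/// round-robin order. Hypothetical helper: it collects everything eagerly,
/// whereas a real exchange operator would forward batches as they arrive.
fn repartition_round_robin<T>(input: impl IntoIterator<Item = T>, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one output partition");
    let mut outputs: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, batch) in input.into_iter().enumerate() {
        // Batch 0 goes to partition 0, batch 1 to partition 1, and so on, wrapping around.
        outputs[i % n].push(batch);
    }
    outputs
}
```

With five input batches and two partitions, the first partition receives batches 1, 3, and 5 and the second receives batches 2 and 4, so downstream operators on each partition get a similar amount of work.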
§Streaming Execution
DataFusion is a “streaming” query engine, which means ExecutionPlans incrementally read from their input(s) and compute output one RecordBatch at a time by continually polling SendableRecordBatchStreams. Output and intermediate RecordBatches each have approximately batch_size rows, which amortizes per-batch overhead of execution.
Note that certain operations, sometimes called “pipeline breakers” (for example full sorts or hash aggregations), are fundamentally non streaming and must read their input fully before producing any output. As much as possible, other operators read a single RecordBatch from their input to produce a single RecordBatch as output.
For example, given this SQL query:
SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30);
The diagram below shows the call sequence when a consumer calls next() to get the next RecordBatch of output. While it is possible that some steps run on different threads, typically tokio will use the same thread that called next() to read from the input, apply the filter, and return the results without interleaving any other operations. This results in excellent cache locality as the same CPU core that produces the data often consumes it immediately as well.
Step 3: FilterExec calls next() Step 2: ProjectionExec calls
on input Stream next() on input Stream
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ Step 1: Consumer
▼ ▼ │ calls next()
┏━━━━━━━━━━━━━━━━┓ ┏━━━━━┻━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃ ┃ ┃ ┃ ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┃ DataSource ┃ ┃ ┃ ┃ ┃
┃ (e.g. ┃ ┃ FilterExec ┃ ┃ ProjectionExec ┃
┃ ParquetSource) ┃ ┃id IN (10, 20, 30) ┃ ┃date_bin('month', time) ┃
┃ ┃ ┃ ┃ ┃ ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
┃ ┃ ┃ ┃ ┃ ┃
┗━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━┳━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
│ ▲ ▲ Step 6: ProjectionExec
│ │ │ computes date_trunc into a
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ new RecordBatch returned
┌─────────────────────┐ ┌─────────────┐ from client
│ RecordBatch │ │ RecordBatch │
└─────────────────────┘ └─────────────┘
Step 4: DataSource returns a Step 5: FilterExec returns a new
single RecordBatch RecordBatch with only matching rows
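The difference between a streaming operator and a pipeline breaker can be sketched with plain synchronous iterators, where each call to next() pulls from the operator's input. Batch, Filter, Sort, and even_values_sorted are hypothetical names for this illustration; DataFusion's operators are asynchronous and work on Arrow RecordBatches rather than Vec<i64>.

```rust
/// Toy batch of rows (stands in for a RecordBatch).
type Batch = Vec<i64>;

/// Streaming operator: each call pulls ONE batch from its input and
/// immediately returns one filtered batch.
struct Filter<I: Iterator<Item = Batch>> {
    input: I,
    keep: fn(&i64) -> bool,
}

impl<I: Iterator<Item = Batch>> Iterator for Filter<I> {
    type Item = Batch;
    fn next(&mut self) -> Option<Batch> {
        let batch = self.input.next()?;
        Some(batch.into_iter().filter(self.keep).collect())
    }
}

/// Pipeline breaker: must consume its ENTIRE input before emitting anything.
struct Sort<I: Iterator<Item = Batch>> {
    input: I,
    batch_size: usize,
    sorted: Option<std::vec::IntoIter<i64>>,
}

impl<I: Iterator<Item = Batch>> Iterator for Sort<I> {
    type Item = Batch;
    fn next(&mut self) -> Option<Batch> {
        if self.sorted.is_none() {
            // First call: drain the whole input, then sort all rows at once.
            let mut all: Vec<i64> = self.input.by_ref().flatten().collect();
            all.sort_unstable();
            self.sorted = Some(all.into_iter());
        }
        // Later calls only hand out the already sorted rows, `batch_size` at a time.
        let rows = self.sorted.as_mut()?;
        let out: Batch = rows.by_ref().take(self.batch_size).collect();
        if out.is_empty() { None } else { Some(out) }
    }
}

/// Example pipeline: keep even values (streaming), then sort them (pipeline breaker).
fn even_values_sorted(input: Vec<Batch>, batch_size: usize) -> Vec<Batch> {
    let filter = Filter { input: input.into_iter(), keep: |v| v % 2 == 0 };
    let sort = Sort { input: filter, batch_size, sorted: None };
    sort.collect()
}
```

Calling next() on Filter touches exactly one input batch, while the first next() on Sort drains everything upstream, which is why a sort delays the first output row until the input is exhausted.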
§Thread Scheduling, CPU / IO Thread Pools, and Tokio Runtimes
DataFusion automatically runs each plan with multiple CPU cores using a Tokio Runtime as a thread pool. While tokio is most commonly used for asynchronous network I/O, the combination of an efficient, work-stealing scheduler and first class compiler support for automatic continuation generation (async) also makes it a compelling choice for CPU intensive applications, as explained in the Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks blog.
The number of cores used is determined by the target_partitions configuration setting, which defaults to the number of CPU cores. While preparing for execution, DataFusion tries to create this many distinct async Streams for each ExecutionPlan.
The Streams for certain ExecutionPlans, such as RepartitionExec and CoalescePartitionsExec, spawn Tokio tasks that are run by threads managed by the Runtime.
Many DataFusion Streams perform CPU intensive processing.
Using async for CPU intensive tasks makes it easy for TableProviders to perform network I/O using standard Rust async during execution. However, this design also makes it very easy to mix CPU intensive and latency sensitive I/O work on the same thread pool (Runtime).
Using the same (default) Runtime is convenient, and often works well for initial development and processing local files, but it can lead to problems under load and/or when reading from network sources such as AWS S3.
If your system does not fully utilize either the CPU or network bandwidth during execution, or you see significantly higher tail (e.g. p99) latencies responding to network requests, it is likely you need to use a different Runtime for CPU intensive DataFusion plans. This effect can be especially pronounced when running several queries concurrently.
As shown in the following figure, using the same Runtime for both CPU intensive processing and network requests can introduce significant delays in responding to those network requests. Delays in processing network requests can, and do, lead network flow control to throttle the available bandwidth in response.
Legend
┏━━━━━━┓
Processing network request ┃ ┃ CPU bound work
is delayed due to processing ┗━━━━━━┛
CPU bound work ┌─┐
│ │ Network request
││ └─┘ processing
││
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│ │
▼ ▼
┌─────────────┐ ┌─┐┌─┐┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓┌─┐
│ │thread 1 │ ││ │┃ Decoding ┃┃ Filtering ┃│ │
│ │ └─┘└─┘┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛└─┘
│ │ ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
│Tokio Runtime│thread 2 ┃ Decoding ┃ Filtering ┃ Decoding ┃ ...
│(thread pool)│ ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
│ │ ... ...
│ │ ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓┌─┐ ┏━━━━━━━━━━━━━━┓
│ │thread N ┃ Decoding ┃ Filtering ┃│ │ ┃ Decoding ┃
└─────────────┘ ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┛└─┘ ┗━━━━━━━━━━━━━━┛
─────────────────────────────────────────────────────────────▶
time
The bottleneck resulting from network throttling can be avoided by using separate Runtimes for the different types of work, as shown in the diagram below.
A separate thread pool processes network Legend
requests, reducing the latency for
processing each request ┏━━━━━━┓
┃ ┃ CPU bound work
│ ┗━━━━━━┛
│ ┌─┐
┌ ─ ─ ─ ─ ┘ │ │ Network request
┌ ─ ─ ─ ┘ └─┘ processing
│
▼ ▼
┌─────────────┐ ┌─┐┌─┐┌─┐
│ │thread 1 │ ││ ││ │
│ │ └─┘└─┘└─┘
│Tokio Runtime│ ...
│(thread pool)│thread 2
│ │
│"IO Runtime" │ ...
│ │ ┌─┐
│ │thread N │ │
└─────────────┘ └─┘
─────────────────────────────────────────────────────────────▶
time
┌─────────────┐ ┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓
│ │thread 1 ┃ Decoding ┃┃ Filtering ┃
│ │ ┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛
│Tokio Runtime│ ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
│(thread pool)│thread 2 ┃ Decoding ┃ Filtering ┃ Decoding ┃ ...
│ │ ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
│ CPU Runtime │ ... ...
│ │ ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
│ │thread N ┃ Decoding ┃ Filtering ┃ Decoding ┃
└─────────────┘ ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
─────────────────────────────────────────────────────────────▶
time
Note that DataFusion does not use tokio::task::spawn_blocking for CPU-bound work, because spawn_blocking is designed for blocking I/O, not for CPU-bound tasks. Among other challenges, spawned blocking tasks can’t yield while waiting for input (can’t call await), so they can’t be used to limit the number of concurrent CPU-bound tasks or keep the processing pipeline on the same core.
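The idea of keeping latency sensitive I/O and CPU intensive work on separate pools can be sketched with plain OS threads and a channel. This is only an analogy under stated assumptions: it uses std::thread instead of Tokio Runtimes, a single thread stands in for each pool, and run_with_separate_pools is a hypothetical function for this illustration rather than anything DataFusion provides.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical sketch: one "I/O" thread produces chunks and hands them to a
/// separate "CPU" thread over a channel, so slow computation never delays
/// the thread that services I/O.
fn run_with_separate_pools(chunks: Vec<Vec<u64>>) -> u64 {
    let (tx, rx) = mpsc::channel::<Vec<u64>>();

    // "I/O pool": stands in for latency sensitive network reads.
    let io = thread::spawn(move || {
        for chunk in chunks {
            // A real system would wait for a network response here.
            tx.send(chunk).expect("CPU thread hung up");
        }
        // `tx` is dropped here, which closes the channel and lets the consumer finish.
    });

    // "CPU pool": does the expensive work (here, just summing) on its own thread.
    let cpu = thread::spawn(move || {
        rx.iter()
            .map(|chunk| chunk.iter().sum::<u64>())
            .sum::<u64>()
    });

    io.join().expect("I/O thread panicked");
    cpu.join().expect("CPU thread panicked")
}
```

Because the producer only sends data and never computes, its responsiveness does not depend on how long each chunk takes to process, which is the property the two-Runtime design above aims for.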
§State Management and Configuration
ConfigOptions contain options to control DataFusion’s execution.
The state required to execute queries is managed by the following structures:
- SessionContext: State needed to create LogicalPlans, such as the table definitions and the function registries.
- TaskContext: State needed for execution, such as the MemoryPool, DiskManager, and ObjectStoreRegistry.
- ExecutionProps: Per-execution properties and data (such as starting timestamps, etc).
§Resource Management
The amount of memory and temporary local disk space used by DataFusion when running a plan can be controlled using the MemoryPool and DiskManager. Other runtime options can be found on RuntimeEnv.
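A memory limit of this kind can be pictured as a shared budget that operators must reserve from before allocating. FixedPool below is a hypothetical, single-threaded sketch written for this illustration; it is not the MemoryPool trait, which is shared across threads and tracks reservations per consumer.

```rust
/// Hypothetical fixed-size memory budget (NOT DataFusion's MemoryPool trait).
#[derive(Debug)]
struct FixedPool {
    limit: usize,
    used: usize,
}

impl FixedPool {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    /// Reserve `bytes`, or fail without changing state if the limit would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let new_used = self
            .used
            .checked_add(bytes)
            .ok_or("reservation overflow")?;
        if new_used > self.limit {
            return Err(format!(
                "cannot reserve {} bytes: {} of {} already used",
                bytes, self.used, self.limit
            ));
        }
        self.used = new_used;
        Ok(())
    }

    /// Return `bytes` to the pool once the memory has been freed.
    fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}
```

An operator that receives an error from try_grow can react instead of allocating anyway, for example by writing intermediate data to temporary disk space or by failing the query with a clear message.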
§Crate Organization
Most users interact with DataFusion via this crate (datafusion), which re-exports all functionality needed to build and execute queries.
There are three other crates that provide additional functionality that must be used directly:
- datafusion_proto: Plan serialization and deserialization
- datafusion_substrait: Support for the substrait plan serialization format
- datafusion_sqllogictest: The DataFusion SQL logic test runner
DataFusion is internally split into multiple sub crates to enforce modularity and improve compilation times. See the list of modules for all available sub-crates. Major ones are
- datafusion_common: Common traits and types
- datafusion_catalog: Catalog APIs such as SchemaProvider and CatalogProvider
- datafusion_execution: State and structures needed for execution
- datafusion_expr: LogicalPlan, Expr and related logical planning structure
- datafusion_functions: Scalar function packages
- datafusion_functions_aggregate: Aggregate functions such as MIN, MAX, SUM, etc
- datafusion_functions_nested: Scalar function packages for ARRAYs, MAPs and STRUCTs
- datafusion_functions_table: Table Functions such as GENERATE_SERIES
- datafusion_functions_window: Window functions such as ROW_NUMBER, RANK, etc
- datafusion_optimizer: OptimizerRules and AnalyzerRules
- datafusion_physical_expr: PhysicalExpr and related expressions
- datafusion_physical_plan: ExecutionPlan and related expressions
- datafusion_physical_optimizer: PhysicalOptimizerRules that rewrite ExecutionPlans
- datafusion_sql: SQL planner (SqlToRel)
§Citing DataFusion in Academic Papers
You can use the following citation to reference DataFusion in academic papers:
@inproceedings{lamb2024apache,
title={Apache Arrow DataFusion: A Fast, Embeddable, Modular Analytic Query Engine},
author={Lamb, Andrew and Shen, Yijie and Heres, Dani{\"e}l and Chakraborty, Jayjeet and Kabak, Mehmet Ozan and Hsieh, Liang-Chi and Sun, Chao},
booktitle={Companion of the 2024 International Conference on Management of Data},
pages={5--17},
year={2024}
}
Modules§
- catalog: re-export of datafusion_catalog crate
- catalog_common: Interfaces and default implementations of catalogs and schemas.
- common: re-export of datafusion_common crate
- config: Runtime configuration, via ConfigOptions
- dataframe: DataFrame API for building and executing query plans.
- datasource: DataFusion data sources: TableProvider and ListingTable
- error: DataFusion error type DataFusionError and Result.
- execution: Shared state for query planning and execution.
- functions: re-export of datafusion_functions crate
- functions_aggregate: re-export of datafusion_functions_aggregate crate
- functions_array (Deprecated): re-export of datafusion_functions_nested crate as functions_array for backward compatibility, if “nested_expressions” feature is enabled
- functions_nested: re-export of datafusion_functions_nested crate, if “nested_expressions” feature is enabled
- functions_table: re-export of datafusion_functions_table crate
- functions_window: re-export of datafusion_functions_window crate
- logical_expr: re-export of datafusion_expr crate
- logical_expr_common: re-export of datafusion_expr_common crate
- optimizer: re-export of datafusion_optimizer crate
- physical_expr: re-export of datafusion_physical_expr crate
- physical_expr_common: re-export of datafusion_physical_expr_common crate
- physical_optimizer: re-export of datafusion_physical_optimizer crate
- physical_plan: re-export of datafusion_physical_plan crate
- physical_planner: Planner for LogicalPlan to ExecutionPlan
- prelude: DataFusion “prelude” to simplify importing common types.
- scalar: ScalarValue single value representation.
- sql: re-export of datafusion_sql crate
- test (Non-WebAssembly): Common unit test utility methods
- test_util: Utility functions to make testing DataFusion based crates easier
- variable: re-export of variable provider for @name and @@name style runtime values.
Macros§
- assert_batches_eq: Compares formatted output of a record batch with an expected vector of strings, with the result of pretty formatting record batches. This is a macro so errors appear on the correct line
- assert_batches_sorted_eq: Compares formatted output of a record batch with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line
Constants§
- DATAFUSION_VERSION: DataFusion crate version