Crate datafusion
DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. DataFusion’s use cases include building very fast database and analytic systems, customized to particular workloads.
“Out of the box,” DataFusion quickly runs complex SQL and DataFrame queries using a sophisticated query planner, a columnar, multi-threaded, vectorized execution engine, and partitioned data sources (Parquet, CSV, JSON, and Avro).
DataFusion is designed for easy customization such as supporting additional data sources, query languages, functions, custom operators and more. See the Architecture section for more details.
Examples
The main entry point for interacting with DataFusion is the SessionContext.
DataFrame
To execute a query against data stored in a CSV file using a DataFrame:
```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // create the dataframe
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // create a plan
    let df = df.filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;
    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;
    // format the results
    let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
        .to_string();
    let expected = vec![
        "+---+----------------+",
        "| a | MIN(?table?.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ];
    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}
```
SQL
To execute a query against a CSV file using SQL:
```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    // create a plan
    let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;
    // format the results
    let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
        .to_string();
    let expected = vec![
        "+---+----------------+",
        "| a | MIN(example.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ];
    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}
```
More Examples
There are many additional annotated examples of using DataFusion in the datafusion-examples directory.
Customization and Extension
DataFusion supports extension at many points:
- read from any datasource (TableProvider)
- define your own catalogs, schemas, and table lists (CatalogProvider)
- build your own query language or plans using the LogicalPlanBuilder
- declare and use user-defined scalar functions (ScalarUDF)
- declare and use user-defined aggregate functions (AggregateUDF)
- add custom optimizer rewrite passes (OptimizerRule and PhysicalOptimizerRule)
- extend the planner to use user-defined logical and physical nodes (QueryPlanner)
You can find examples of each of them in the datafusion-examples directory.
Architecture
Overview Presentations
The following presentations offer high level overviews of the different components and how they interact together.
- [Apr 2023]: The Apache Arrow DataFusion Architecture talks
- [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
- [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
- [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording
Query Planning and Execution Overview
SQL
```text
             Parsed with            SqlToRel creates
              sqlparser               initial plan
┌───────────────┐           ┌─────────┐             ┌─────────────┐
│   SELECT *    │           │Query {  │             │Project      │
│   FROM ...    │──────────▶│..       │────────────▶│  TableScan  │
│               │           │}        │             │    ...      │
└───────────────┘           └─────────┘             └─────────────┘

   SQL String              sqlparser                 LogicalPlan
                           AST nodes
```
1. The query string is parsed to an Abstract Syntax Tree (AST) Statement using sqlparser.
2. The AST is converted to a LogicalPlan and logical expressions (Exprs) to compute the desired result by the SqlToRel planner.
DataFrame
When executing plans using the DataFrame API, the process is identical to SQL, except the DataFrame API builds the LogicalPlan directly using LogicalPlanBuilder. Systems that have their own custom query languages typically also build LogicalPlans directly.
Planning
```text
 AnalyzerRules and       PhysicalPlanner          PhysicalOptimizerRules
 OptimizerRules          creates ExecutionPlan    improve performance
 rewrite plan
┌─────────────┐        ┌─────────────┐        ┌───────────────┐        ┌───────────────┐
│Project      │        │Project(x, y)│        │ProjectExec    │        │ProjectExec    │
│  TableScan  │──...──▶│  TableScan  │─────▶  │  ...          │──...──▶│  ...          │
│   ...       │        │   ...       │        │    ParquetExec│        │    ParquetExec│
└─────────────┘        └─────────────┘        └───────────────┘        └───────────────┘
 LogicalPlan            LogicalPlan             ExecutionPlan            ExecutionPlan
```
To process large datasets with many rows as efficiently as possible, significant effort is spent planning and optimizing, in the following manner:
1. The LogicalPlan is checked and rewritten to enforce semantic rules, such as type coercion, by AnalyzerRules.
2. The LogicalPlan is rewritten by OptimizerRules, such as projection and filter pushdown, to improve its efficiency.
3. The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.
4. The ExecutionPlan is rewritten by PhysicalOptimizerRules, such as sort and join selection, to improve its efficiency.
Data Sources
```text
Planning       │
requests       │            TableProvider::scan
information    │            creates an
such as schema │            ExecutionPlan
               │
               ▼
  ┌─────────────────────────┐          ┌──────────────┐
  │                         │          │              │
  │ impl TableProvider      │─────────▶│ ParquetExec  │
  │                         │          │              │
  └─────────────────────────┘          └──────────────┘
         TableProvider                  ExecutionPlan
   (built in or user provided)
```
DataFusion includes several built-in data sources for common use cases, and can be extended by implementing the TableProvider trait. A TableProvider provides information for planning and an ExecutionPlan for execution.
- ListingTable: Reads data from Parquet, JSON, CSV, or AVRO files. Supports single files or multiple files with HIVE style partitioning, optional compression, directly reading from remote object stores, and more.
- MemTable: Reads data from in-memory RecordBatches.
- StreamingTable: Reads data from potentially unbounded inputs.
Plans
Logical planning yields LogicalPlan nodes and Expr expressions, which are Schema aware and represent statements independent of how they are physically executed.

A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, each potentially containing embedded Exprs.

An ExecutionPlan (sometimes referred to as a “physical plan”) is a plan that can be executed against data. It is a DAG of other ExecutionPlans, each potentially containing expressions of the following types:
- PhysicalExpr: Scalar functions
- AggregateExpr: Aggregate functions
- WindowExpr: Window functions
Compared to a LogicalPlan, an ExecutionPlan has concrete information about how to perform calculations (e.g. hash vs. merge join) and how data flows during execution (e.g. partitioning and sortedness).
Execution
```text
        ExecutionPlan::execute             Calling next() on the
        produces a stream                  stream produces the data
┌───────────────┐      ┌─────────────────────────┐         ┌────────────┐
│ProjectExec    │      │impl                     │    ┌───▶│RecordBatch │
│  ...          │─────▶│SendableRecordBatchStream│────┤    └────────────┘
│  ParquetExec  │      │                         │    │    ┌────────────┐
└───────────────┘      └─────────────────────────┘    ├───▶│RecordBatch │
              ▲                                       │    └────────────┘
 ExecutionPlan│                                       │         ...
              │                                       │
              │                                       │    ┌────────────┐
 PhysicalOptimizerRules                               ├───▶│RecordBatch │
 request information                                  │    └────────────┘
 such as partitioning                                 │    ┌ ─ ─ ─ ─ ─ ─
                                                      └───▶ None        │
                                                           └ ─ ─ ─ ─ ─ ─
```
ExecutionPlans process data using the Apache Arrow memory format, largely with functions from the arrow crate. When execute is called, a SendableRecordBatchStream is returned that produces the desired output as a Stream of RecordBatches.

Values are represented with ColumnarValue, which are either single constant values (ScalarValue) or Arrow Arrays (ArrayRef).

See the implementors of ExecutionPlan for a list of physical operators available.
State Management and Configuration
ConfigOptions contain options to control DataFusion’s execution.
The state required to execute queries is managed by the following structures:
- SessionContext: State needed for creating LogicalPlans, such as the table definitions and the function registries.
- TaskContext: State needed for execution, such as the MemoryPool, DiskManager, and ObjectStoreRegistry.
- ExecutionProps: Per-execution properties and data (such as starting timestamps, etc.).
Resource Management
The amount of memory and temporary local disk space used by DataFusion when running a plan can be controlled using the MemoryPool and DiskManager.
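These are configured through the RuntimeEnv shared by a SessionContext. The sketch below is a hedged configuration fragment, assuming the RuntimeConfig::with_memory_limit and SessionContext::with_config_rt APIs of recent DataFusion versions (both have been renamed over time), with an illustrative 100 MB cap:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

fn main() -> datafusion::error::Result<()> {
    // Cap DataFusion's memory at ~100 MB; the pool may hand out up to
    // 80% of that limit to memory-consuming operators.
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_limit(100 * 1024 * 1024, 0.8),
    )?;

    // Queries run through this context share the configured pool.
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));
    let _ = ctx;
    Ok(())
}
```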
Crate Organization
DataFusion is organized into multiple crates to enforce modularity and improve compilation times. The crates are:
- datafusion_common: Common traits and types
- datafusion_execution: State needed for execution
- datafusion_expr: LogicalPlan, Expr, and related logical planning structures
- datafusion_optimizer: OptimizerRules and AnalyzerRules
- datafusion_physical_expr: PhysicalExpr and related expressions
- datafusion_sql: SqlToRel SQL planner
Re-exports
pub use arrow;
pub use parquet;
pub use datafusion_common as common;
pub use datafusion_expr as logical_expr;
pub use datafusion_optimizer as optimizer;
pub use datafusion_physical_expr as physical_expr;
pub use datafusion_row as row;
pub use datafusion_sql as sql;
Modules
- This module contains utilities to manipulate avro metadata.
- This module contains interfaces and default implementations of table namespacing concepts, including catalogs and schemas.
- DataFusion Configuration Options
- DataFrame API for building and executing query plans.
- DataFusion data sources
- DataFusion error types
- Shared state for query planning and execution.
- A trait to define from_slice functions for arrow types
- This module contains a query optimizer that operates against a physical plan and applies rules to a physical plan, such as “Repartition”.
- Traits for physical query plan, supporting parallel execution for partitioned relations.
- A “prelude” for users of the datafusion crate.
- ScalarValue, re-exported from datafusion-common to ease migration when DataFusion was split into several crates
- Utility functions to make testing DataFusion based crates easier
- Variable provider
Macros
- Compares the pretty-formatted output of record batches with an expected vector of strings. This is a macro so errors appear on the correct line
- Compares formatted output of a record batch with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line
Constants
- DataFusion crate version