Crate datafusion
source · [−]Expand description
DataFusion is an extensible query execution framework that uses Apache Arrow as its in-memory format.
DataFusion supports both an SQL and a DataFrame API for building logical query plans as well as a query optimizer and execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.
Below is an example of how to execute a query against data stored
in a CSV file using a DataFrame
:
let ctx = SessionContext::new();
// create the dataframe
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(None, Some(100))?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
.to_string();
let expected = vec![
"+---+----------------+",
"| a | MIN(?table?.b) |",
"+---+----------------+",
"| 1 | 2 |",
"+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
and how to execute a query against a CSV using SQL:
let ctx = SessionContext::new();
ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
.to_string();
let expected = vec![
"+---+----------------+",
"| a | MIN(example.b) |",
"+---+----------------+",
"| 1 | 2 |",
"+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
Parse, Plan, Optimize, Execute
DataFusion is a fully fledged query engine capable of performing complex operations. Specifically, when DataFusion receives an SQL query, there are different steps that it passes through until a result is obtained. Broadly, they are:
- The string is parsed to an Abstract syntax tree (AST) using sqlparser.
- The planner
SqlToRel
converts logical expressions on the AST to logical expressionsExpr
s. - The planner
SqlToRel
converts logical nodes on the AST to aLogicalPlan
. OptimizerRules
are applied to theLogicalPlan
to optimize it.- The
LogicalPlan
is converted to anExecutionPlan
by aPhysicalPlanner
- The
ExecutionPlan
is executed against data through theSessionContext
With a DataFrame
API, steps 1-3 are not used as the DataFrame builds the LogicalPlan
directly.
Phases 1-5 are typically cheap when compared to phase 6, and thus DataFusion puts a lot of effort to ensure that phase 6 runs efficiently and without errors.
DataFusion’s planning is divided in two main parts: logical planning and physical planning.
Logical plan
Logical planning yields logical plans
and logical expressions
.
These are Schema
-aware traits that represent statements whose result is independent of how it should physically be executed.
A LogicalPlan
is a Directed Acyclic Graph (DAG) of other LogicalPlan
s and each node contains logical expressions (Expr
s).
All of these are located in logical_plan
.
Physical plan
A Physical plan (ExecutionPlan
) is a plan that can be executed against data.
Contrarily to a logical plan, the physical plan has concrete information about how the calculation
should be performed (e.g. what Rust functions are used) and how data should be loaded into memory.
ExecutionPlan
uses the Arrow format as its in-memory representation of data, through the arrow crate.
We recommend going through its documentation for details on how the data is physically represented.
A ExecutionPlan
is composed by nodes (implement the trait ExecutionPlan
),
and each node is composed by physical expressions (PhysicalExpr
)
or aggreagate expressions (AggregateExpr
).
All of these are located in the module physical_plan
.
Broadly speaking,
- an
ExecutionPlan
receives a partition number and asyncronosly returns an iterator overRecordBatch
(a node-specific struct that implementsRecordBatchReader
) - a
PhysicalExpr
receives aRecordBatch
and returns anArray
- an
AggregateExpr
receivesRecordBatch
es and returns aRecordBatch
of a single row(*)
(*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
The following physical nodes are currently implemented:
- Projection:
ProjectionExec
- Filter:
FilterExec
- Grouped and non-grouped aggregations:
AggregateExec
- Sort:
SortExec
- Coalesce partitions:
CoalescePartitionsExec
- Limit:
LocalLimitExec
andGlobalLimitExec
- Scan a CSV:
CsvExec
- Scan a Parquet:
ParquetExec
- Scan from memory:
MemoryExec
- Explain the plan:
ExplainExec
Customize
DataFusion allows users to
- extend the planner to use user-defined logical and physical nodes (
QueryPlanner
) - declare and use user-defined scalar functions (
ScalarUDF
) - declare and use user-defined aggregate functions (
AggregateUDF
)
you can find examples of each of them in examples section.
Examples
Examples are located in datafusion-examples directory
Here’s how to run them
git clone https://github.com/apache/arrow-datafusion
cd arrow-datafusion
git submodule update --init
cargo run --example csv_sql
cargo run --example parquet_sql
cargo run --example dataframe
cargo run --example dataframe_in_memory
cargo run --example simple_udaf
cargo run --example simple_udf
Re-exports
pub use arrow;
pub use parquet;
pub use datafusion_common as common;
pub use datafusion_expr as logical_expr;
pub use datafusion_optimizer as optimizer;
pub use datafusion_physical_expr as physical_expr;
pub use datafusion_row as row;
pub use datafusion_sql as sql;
Modules
This module contains utilities to manipulate avro metadata.
This module contains interfaces and default implementations of table namespacing concepts, including catalogs and schemas.
DataFusion Configuration Options
DataFrame API for building and executing query plans.
DataFusion data sources
DataFusion error types
This module contains the shared state available at different parts of query planning and execution
A trait to define from_slice functions for arrow types
This is a legacy module that only contains re-exports of other modules
This module contains a query optimizer that operates against a physical plan and applies rules to a physical plan, such as “Repartition”.
Traits for physical query plan, supporting parallel execution for partitioned relations.
A “prelude” for users of the datafusion crate.
ScalarValue reimported from datafusion-common
Utility functions to make testing DataFusion based crates easier
Variable provider
Macros
Compares formatted output of a record batch with an expected vector of strings, with the result of pretty formatting record batches. This is a macro so errors appear on the correct line
Compares formatted output of a record batch with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line
A macro to assert that one string is contained within another with a nice error message if they are not.
A macro to assert that one string is NOT contained within another with a nice error message if they are are.
Constants
DataFusion crate version