DataFusion supports both an SQL API and a DataFrame API for building logical query plans, as well as a query optimizer and an execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.
Below is an example of how to execute a query against data stored in a CSV file using a DataFrame:

```rust
let mut ctx = ExecutionContext::new();

// create the dataframe
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .limit(100)?;

// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;

let expected = vec![
    "+---+--------+",
    "| a | MIN(b) |",
    "+---+--------+",
    "| 1 | 2      |",
    "+---+--------+",
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
```
and how to execute a query against a CSV file using SQL:

```rust
let mut ctx = ExecutionContext::new();

ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;

let expected = vec![
    "+---+--------+",
    "| a | MIN(b) |",
    "+---+--------+",
    "| 1 | 2      |",
    "+---+--------+",
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
```
DataFusion is a fully fledged query engine capable of performing complex operations. Specifically, when DataFusion receives an SQL query, it passes through a series of steps before a result is obtained. Broadly, they are:
1. The string is parsed into an abstract syntax tree (AST) using sqlparser.
2. The planner SqlToRel converts logical expressions on the AST into logical expressions (Expr).
3. The planner SqlToRel converts logical nodes on the AST into a LogicalPlan.
4. OptimizerRules are applied to the LogicalPlan to optimize it.
5. The LogicalPlan is converted into an ExecutionPlan.
6. The ExecutionPlan is executed against data through the ExecutionContext.
Phases 1-5 are typically cheap compared to phase 6, so DataFusion puts significant effort into ensuring that phase 6 runs efficiently and without errors.
DataFusion’s planning is divided into two main parts: logical planning and physical planning.
A physical plan (ExecutionPlan) is a plan that can be executed against data.
In contrast to a logical plan, the physical plan carries concrete information about how the calculation
should be performed (e.g. which Rust functions are used) and how data should be loaded into memory.
ExecutionPlan uses the Arrow format as its in-memory representation of data, through the arrow crate.
We recommend going through its documentation for details on how the data is physically represented.
An ExecutionPlan is composed of nodes (structs that implement the ExecutionPlan trait), and each node is composed of physical expressions (PhysicalExpr) or aggregate expressions (AggregateExpr). All of these are located in the physical_plan module.

Broadly speaking:

- an ExecutionPlan receives a partition number and asynchronously returns an iterator over RecordBatch (a node-specific struct that implements RecordBatchReader)
- a PhysicalExpr receives a RecordBatch and returns an Array
- an AggregateExpr receives a series of RecordBatches and returns a RecordBatch of a single row (*)
(*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
The following physical nodes are currently implemented:

- Hash and grouped aggregations
- Merge (of partitions)
- Scan of a CSV file
- Scan of a Parquet file
- Scan from memory
- Explain the plan
DataFusion allows users to:

- extend the planner with user-defined logical and physical nodes
- declare and use user-defined scalar functions (UDFs)
- declare and use user-defined aggregate functions (UDAFs)

You can find examples of each of these in the examples section.
This module contains interfaces and default implementations of table namespacing concepts, including catalogs and schemas.
DataFrame API for building and executing query plans.
DataFusion data sources
DataFusion error types
DataFusion query execution
This module provides a logical query plan enum that can describe queries. Logical query plans can be created from a SQL statement or built programmatically via the Table API.
This module contains a query optimizer that operates against a logical plan and applies simple rules such as “Projection Push Down” and “Type Coercion”.
This module contains a query optimizer that operates against a physical plan and applies rules such as “Repartition”.
Traits for physical query plan, supporting parallel execution for partitioned relations.
A “prelude” for users of the datafusion crate.
This module provides ScalarValue, an enum that can be used for storage of single elements.
This module provides a SQL parser that translates SQL queries into an abstract syntax tree (AST), and a SQL query planner that creates a logical plan from the AST.
The binary_array_op macro supports types beyond the primitives, such as Utf8 strings.
The binary_array_op_scalar macro supports types beyond the primitives, such as Utf8 strings.