datafusion 9.0.0

DataFusion is an in-memory query engine that uses Apache Arrow as the memory model.

DataFusion is an extensible query execution framework that uses Apache Arrow as its in-memory format.

DataFusion supports both a SQL and a DataFrame API for building logical query plans, as well as a query optimizer and an execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.

Below is an example of how to execute a query against data stored in a CSV file using a DataFrame:

# use datafusion::prelude::*;
# use datafusion::error::Result;
# use datafusion::arrow::record_batch::RecordBatch;

# #[tokio::main]
# async fn main() -> Result<()> {
let ctx = SessionContext::new();

// create the dataframe
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;

// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .limit(None, Some(100))?;

// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
    .to_string();

let expected = vec![
    "+---+----------------+",
    "| a | MIN(?table?.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+",
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
# Ok(())
# }

and how to execute a query against a CSV using SQL:

# use datafusion::prelude::*;
# use datafusion::error::Result;
# use datafusion::arrow::record_batch::RecordBatch;

# #[tokio::main]
# async fn main() -> Result<()> {
let ctx = SessionContext::new();

ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;

// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
    .to_string();

let expected = vec![
    "+---+----------------+",
    "| a | MIN(example.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+",
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
# Ok(())
# }

Parse, Plan, Optimize, Execute

DataFusion is a fully fledged query engine capable of performing complex operations. Specifically, when DataFusion receives a SQL query, it passes through several distinct phases before a result is obtained. Broadly, they are:

  1. The string is parsed into an abstract syntax tree (AST) using sqlparser.
  2. The planner SqlToRel converts SQL expressions in the AST into logical expressions (Exprs).
  3. The planner SqlToRel converts SQL statements in the AST into a LogicalPlan.
  4. OptimizerRules are applied to the LogicalPlan to optimize it.
  5. The LogicalPlan is converted into an ExecutionPlan by a PhysicalPlanner.
  6. The ExecutionPlan is executed against data through the SessionContext.

With the DataFrame API, phases 1-3 are skipped: the DataFrame builds the LogicalPlan directly.

Phases 1-5 are typically cheap compared to phase 6, so DataFusion puts significant effort into ensuring that phase 6 runs efficiently and without errors.
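
To make the phases concrete, here is a minimal sketch that drives them by hand instead of calling ctx.sql directly; it assumes the SessionContext helpers create_logical_plan, optimize, create_physical_plan, and task_ctx, plus physical_plan::collect, behave as in this version:

# use datafusion::prelude::*;
# use datafusion::error::Result;

# #[tokio::main]
# async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

// phases 1-3: parse the SQL string and plan it into a LogicalPlan
let logical_plan = ctx.create_logical_plan("SELECT a, MIN(b) FROM example GROUP BY a")?;

// phase 4: apply the OptimizerRules registered with the SessionContext
let optimized_plan = ctx.optimize(&logical_plan)?;

// phase 5: convert the optimized LogicalPlan into an ExecutionPlan
let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;

// phase 6: execute the ExecutionPlan and collect the results
let results = datafusion::physical_plan::collect(physical_plan, ctx.task_ctx()).await?;
println!("got {} record batches", results.len());
# Ok(())
# }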

DataFusion's planning is divided into two main parts: logical planning and physical planning.

Logical plan

Logical planning yields logical plans and logical expressions. These are schema-aware representations of statements whose result is independent of how it should be physically executed.

A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, and each node contains logical expressions (Exprs). All of these are located in the logical_plan module.
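
For illustration, a logical expression can be built directly with the expression helpers re-exported in the prelude; a minimal sketch (the column names are made up):

# use datafusion::prelude::*;
// build the expression tree for `a <= b AND a > 1` without parsing any SQL
let expr = col("a").lt_eq(col("b")).and(col("a").gt(lit(1)));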

Physical plan

A physical plan (ExecutionPlan) is a plan that can be executed against data. Unlike a logical plan, the physical plan carries concrete information about how the calculation should be performed (e.g. which Rust functions are used) and how data should be loaded into memory.

ExecutionPlan uses the Arrow format as its in-memory representation of data, through the arrow crate. We recommend reading the arrow documentation for details on how the data is physically represented.
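
To illustrate the memory model, a RecordBatch can be assembled by hand and queried through a MemTable; a small sketch with made-up data:

# use std::sync::Arc;
# use datafusion::arrow::array::Int32Array;
# use datafusion::arrow::datatypes::{DataType, Field, Schema};
# use datafusion::arrow::record_batch::RecordBatch;
# use datafusion::datasource::MemTable;
# use datafusion::error::Result;
# use datafusion::prelude::*;

# #[tokio::main]
# async fn main() -> Result<()> {
// a RecordBatch is a set of columnar Arrow arrays conforming to a schema
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
    schema.clone(),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;

// expose the batch as a table with a single partition and query it
let table = MemTable::try_new(schema, vec![vec![batch]])?;
let ctx = SessionContext::new();
ctx.register_table("t", Arc::new(table))?;
let results = ctx.sql("SELECT a FROM t").await?.collect().await?;
# Ok(())
# }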

An ExecutionPlan is composed of nodes (which implement the ExecutionPlan trait), and each node is composed of physical expressions (PhysicalExpr) or aggregate expressions (AggregateExpr). All of these are located in the physical_plan module.
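
To see which physical nodes a given query maps to, the plan can be rendered with physical_plan::displayable; a sketch, assuming DataFrame::create_physical_plan is available as in this version:

# use datafusion::prelude::*;
# use datafusion::error::Result;

# #[tokio::main]
# async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

// plan a query and print the tree of physical nodes it maps to
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
let physical_plan = df.create_physical_plan().await?;
println!("{}", datafusion::physical_plan::displayable(physical_plan.as_ref()).indent());
# Ok(())
# }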

Broadly speaking,

  • an ExecutionPlan receives a partition number and asynchronously returns a stream of RecordBatches,
  • a PhysicalExpr receives a RecordBatch and returns an Array, and
  • an AggregateExpr receives a series of RecordBatches and returns a RecordBatch of a single row (*).

(*) Technically, it aggregates the results on each partition and then merges the results into a single partition.

Physical nodes currently implemented include projections (ProjectionExec), filters (FilterExec), sorts (SortExec), limits (GlobalLimitExec and LocalLimitExec), and scans over CSV, Parquet, and in-memory data (CsvExec, ParquetExec, MemoryExec); the full list is in the physical_plan module.

Customize

DataFusion allows users to

  • extend the planner to use user-defined logical and physical nodes (QueryPlanner)
  • declare and use user-defined scalar functions (ScalarUDF)
  • declare and use user-defined aggregate functions (AggregateUDF)

You can find examples of each of these in the examples section.
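
For instance, a user-defined scalar function can be declared and registered in a few lines. The sketch below is modeled on the simple_udf example; the exact import paths for make_scalar_function and Volatility are assumptions for this version:

# use std::sync::Arc;
# use datafusion::arrow::array::{ArrayRef, Float64Array};
# use datafusion::arrow::datatypes::DataType;
# use datafusion::logical_expr::Volatility;
# use datafusion::physical_plan::functions::make_scalar_function;
# use datafusion::prelude::*;

# fn main() {
let mut ctx = SessionContext::new();

// kernel: double every value of a Float64 column, preserving nulls
let double = make_scalar_function(|args: &[ArrayRef]| {
    let input = args[0].as_any().downcast_ref::<Float64Array>().expect("f64 input");
    Ok(Arc::new(input.iter().map(|v| v.map(|x| x * 2.0)).collect::<Float64Array>()) as ArrayRef)
});

// declare the signature and register it; `double` is then callable from both SQL and the DataFrame API
let udf = create_udf(
    "double",
    vec![DataType::Float64],
    Arc::new(DataType::Float64),
    Volatility::Immutable,
    double,
);
ctx.register_udf(udf);
# }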

Examples

Examples are located in the datafusion-examples directory.

Here's how to run them:

git clone https://github.com/apache/arrow-datafusion
cd arrow-datafusion
# Download test data
git submodule update --init

cargo run --example csv_sql

cargo run --example parquet_sql

cargo run --example dataframe

cargo run --example dataframe_in_memory

cargo run --example simple_udaf

cargo run --example simple_udf