use std::hint::black_box;
use arrow::datatypes::{DataType, Field, Schema};
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use datafusion_expr::{
JoinType, LogicalPlan, LogicalPlanBuilder, col, lit, logical_plan::table_scan,
};
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_optimizer::optimize_projections::OptimizeProjections;
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
/// Builds an unfiltered table scan named `name` whose schema has `cols`
/// nullable `Int32` columns called `c0`, `c1`, ..., `c{cols - 1}`.
///
/// Panics if the scan cannot be constructed (acceptable in benchmark setup).
fn table(name: &str, cols: usize) -> LogicalPlan {
    let schema = Schema::new(
        (0..cols)
            .map(|idx| Field::new(format!("c{idx}"), DataType::Int32, true))
            .collect::<Vec<Field>>(),
    );
    let builder = table_scan(Some(name), &schema, None).unwrap();
    builder.build().unwrap()
}
/// Builds `table(name, cols)` topped with a filter keeping rows where the
/// qualified column `{name}.c{filter_col}` is greater than zero.
fn scan_with_filter(name: &str, cols: usize, filter_col: usize) -> LogicalPlan {
    let scan = table(name, cols);
    let predicate = col(format!("{name}.c{filter_col}")).gt(lit(0i32));
    let filtered = LogicalPlanBuilder::from(scan).filter(predicate).unwrap();
    filtered.build().unwrap()
}
/// Builds a three-way join plan shaped after TPC-H Q3 (per the case name).
///
/// Filtered `customer`, `orders` and `lineitem` scans are inner-joined, then
/// grouped by three columns with a single `SUM`. The columns are positional
/// `cN` placeholders, so most scan columns are never referenced above the
/// scans.
fn plan_tpch_q3() -> LogicalPlan {
    let customer = scan_with_filter("customer", 8, 6);
    let orders = scan_with_filter("orders", 9, 4);
    let lineitem = scan_with_filter("lineitem", 16, 10);

    let customer_orders_on = vec![col("customer.c0").eq(col("orders.c1"))];
    let orders_lineitem_on = vec![col("lineitem.c0").eq(col("orders.c0"))];
    let group_by = vec![col("lineitem.c0"), col("orders.c4"), col("orders.c7")];
    let aggregates = vec![sum(col("lineitem.c5") - col("lineitem.c6"))];

    let joined = LogicalPlanBuilder::from(customer)
        .join_on(orders, JoinType::Inner, customer_orders_on)
        .unwrap()
        .join_on(lineitem, JoinType::Inner, orders_lineitem_on)
        .unwrap();

    joined
        .aggregate(group_by, aggregates)
        .unwrap()
        .build()
        .unwrap()
}
/// Builds a six-table join chain loosely shaped after TPC-H Q5 (per the case
/// name), grouped by `nation.c1` with a single `SUM`.
///
/// Only `region` is filtered; every other input is a bare scan. Each table is
/// inner-joined onto the accumulated plan with one equality predicate, in the
/// order listed below. NOTE(review): this is a simplified shape (e.g. no
/// customer/supplier nation-key predicate), kept as-is so results stay
/// comparable with earlier runs.
fn plan_tpch_q5() -> LogicalPlan {
    let region = scan_with_filter("region", 3, 1);
    let join_steps = [
        (table("nation", 4), col("region.c0").eq(col("nation.c2"))),
        (table("customer", 8), col("nation.c0").eq(col("customer.c3"))),
        (table("orders", 9), col("customer.c0").eq(col("orders.c1"))),
        (table("lineitem", 16), col("lineitem.c0").eq(col("orders.c0"))),
        (table("supplier", 7), col("lineitem.c2").eq(col("supplier.c0"))),
    ];

    let joined = join_steps.into_iter().fold(
        LogicalPlanBuilder::from(region),
        |builder, (right, predicate)| {
            builder
                .join_on(right, JoinType::Inner, vec![predicate])
                .unwrap()
        },
    );

    joined
        .aggregate(
            vec![col("nation.c1")],
            vec![sum(col("lineitem.c5") - col("lineitem.c6"))],
        )
        .unwrap()
        .build()
        .unwrap()
}
/// Builds a wide-table aggregation: a 100-column `hits` scan filtered on two
/// columns, then grouped by two columns with two `SUM`s.
///
/// Only six of the 100 columns are referenced anywhere in the plan.
fn plan_clickbench_groupby() -> LogicalPlan {
    let lower_bound = col("hits.c5").gt(lit(100i32));
    let upper_bound = col("hits.c12").lt(lit(1000i32));
    let group_keys = vec![col("hits.c3"), col("hits.c7")];
    let sums: Vec<_> = ["hits.c42", "hits.c60"]
        .into_iter()
        .map(|column| sum(col(column)))
        .collect();

    let filtered = LogicalPlanBuilder::from(table("hits", 100))
        .filter(lower_bound.and(upper_bound))
        .unwrap();

    filtered.aggregate(group_keys, sums).unwrap().build().unwrap()
}
fn plan_tpcds_subquery() -> LogicalPlan {
let store_sales = table("store_sales", 23);
let customer = table("customer", 18);
let item = table("item", 22);
let sub = LogicalPlanBuilder::from(store_sales)
.filter(col("store_sales.c5").gt(lit(0i32)))
.unwrap()
.project(vec![
col("store_sales.c0"),
col("store_sales.c3"),
col("store_sales.c13"),
])
.unwrap()
.alias("sub")
.unwrap()
.build()
.unwrap();
LogicalPlanBuilder::from(customer)
.join_on(
sub,
JoinType::Inner,
vec![col("customer.c0").eq(col("sub.c3"))],
)
.unwrap()
.join_on(
item,
JoinType::Inner,
vec![col("item.c0").eq(col("sub.c0"))],
)
.unwrap()
.aggregate(vec![col("customer.c2")], vec![sum(col("sub.c13"))])
.unwrap()
.build()
.unwrap()
}
fn plan_small_schema() -> LogicalPlan {
LogicalPlanBuilder::from(table("t", 10))
.filter(col("t.c3").gt(lit(0i32)))
.unwrap()
.project(vec![col("t.c0"), col("t.c1"), col("t.c5")])
.unwrap()
.build()
.unwrap()
}
/// One benchmark case: the Criterion benchmark id, plus a constructor for the
/// plan the rule is run against.
type BenchCase = (&'static str, fn() -> LogicalPlan);

/// Benchmarks a single `OptimizeProjections::rewrite` call on each case plan.
///
/// The rule and optimizer context are created once and shared by every case.
/// Each iteration gets a freshly built plan from the `iter_batched` setup
/// closure. The setup is not timed, so the measurement covers the rewrite
/// rather than plan construction.
///
/// # Panics
///
/// Panics if the rule returns an error for any case. The panic message names
/// the failing case and includes the error.
fn bench_optimize_projections(c: &mut Criterion) {
    let rule = OptimizeProjections::new();
    let config = OptimizerContext::new();
    let mut group = c.benchmark_group("optimize_projections");
    let cases: &[BenchCase] = &[
        ("tpch_q3", plan_tpch_q3),
        ("tpch_q5", plan_tpch_q5),
        ("clickbench_groupby", plan_clickbench_groupby),
        ("tpcds_subquery", plan_tpcds_subquery),
        ("small_schema", plan_small_schema),
    ];
    // Both tuple fields are `Copy`, so destructure by value instead of
    // carrying `&&str` / `&fn` references into the closures.
    for &(name, build) in cases {
        group.bench_function(name, |b| {
            b.iter_batched(
                build,
                |plan| {
                    // `unwrap_or_else` only formats on failure, so the
                    // timed success path adds no allocation.
                    let rewritten = rule.rewrite(plan, &config).unwrap_or_else(|e| {
                        panic!("optimize_projections rewrite failed for case {name}: {e}")
                    });
                    black_box(rewritten)
                },
                BatchSize::SmallInput,
            );
        });
    }
    group.finish();
}
// Register the benchmark group using the explicit long form of
// `criterion_group!`. The short form expands to the same thing, with
// `Criterion::default()` as its configuration.
criterion_group! {
    name = benches;
    config = Criterion::default();
    targets = bench_optimize_projections
}
criterion_main!(benches);