#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use data_utils::{create_table_provider, make_data};
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::{datasource::MemTable, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use parking_lot::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Runtime;
/// Plans `sql` against the shared session, executes it, and feeds the result
/// batches to `black_box` so the work cannot be optimized away.
///
/// The session mutex is held only while the SQL is planned into a `DataFrame`;
/// the guard is released before the frame is collected.
///
/// # Panics
/// Panics if planning or execution returns an error.
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
    let frame = {
        let session = ctx.lock();
        let planned = rt.block_on(session.sql(sql));
        planned.unwrap()
    };
    let rows = rt.block_on(frame.collect()).unwrap();
    criterion::black_box(rows);
}
/// Builds a `SessionContext` with the generated table registered as `t` and
/// wraps it in `Arc<Mutex<_>>` so benchmark closures can share it.
///
/// # Errors
/// Returns any error from building the table provider or registering it.
fn create_context(
    partitions_len: usize,
    array_len: usize,
    batch_size: usize,
) -> Result<Arc<Mutex<SessionContext>>> {
    let session = SessionContext::new();
    let table = create_table_provider(partitions_len, array_len, batch_size)?;
    session.register_table("t", table)?;
    let shared = Mutex::new(session);
    Ok(Arc::new(shared))
}
/// Benchmarks `SELECT DISTINCT ... GROUP BY ... LIMIT n` over table `t` for
/// several limits, plus a multi-column GROUP BY with LIMIT 10.
///
/// All cases share one session and run inside a single Criterion group with a
/// 40-second measurement time.
fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
    let partitions_len = 10;
    let array_len = 1 << 26;
    let batch_size = 8192;
    let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
    let rt = Runtime::new().unwrap();

    // (benchmark id, SQL) pairs, benchmarked in this order.
    let cases = [
        (
            "distinct_group_by_u64_narrow_limit_10",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10",
        ),
        (
            "distinct_group_by_u64_narrow_limit_100",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100",
        ),
        (
            "distinct_group_by_u64_narrow_limit_1000",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000",
        ),
        (
            "distinct_group_by_u64_narrow_limit_10000",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000",
        ),
        (
            "group_by_multiple_columns_limit_10",
            "SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10",
        ),
    ];

    let mut group = c.benchmark_group("custom-measurement-time");
    group.measurement_time(Duration::from_secs(40));
    for (id, sql) in cases {
        group.bench_function(id, |b| b.iter(|| query(ctx.clone(), &rt, sql)));
    }
    group.finish();
}
/// Executes `plan` to completion and checks the shape of its output.
///
/// The expected output is exactly one record batch containing exactly 10 rows.
/// The row count is hard-coded, so this only fits plans whose SQL uses `LIMIT 10`.
///
/// # Errors
/// Returns any error produced while executing the plan.
///
/// # Panics
/// Panics if the batch count or row count differs from the expectation above.
async fn distinct_with_limit(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> Result<()> {
    let output = collect(plan, ctx).await?;
    assert_eq!(output.len(), 1);
    let only_batch = &output[0];
    assert_eq!(only_batch.num_rows(), 10);
    Ok(())
}
/// Synchronously executes `plan` on `rt` via `distinct_with_limit`.
///
/// The result is passed through `black_box` before it is unwrapped.
///
/// # Panics
/// Panics if execution fails or the output shape check in
/// `distinct_with_limit` fails.
fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
    // `plan` and `ctx` are owned here, so they are moved rather than cloned.
    criterion::black_box(rt.block_on(distinct_with_limit(plan, ctx))).unwrap();
}
/// Generates sample data, registers it as the in-memory table `traces`, and
/// plans `sql` against it.
///
/// Returns the physical plan together with the session's task context, so the
/// caller can execute the plan.
///
/// # Errors
/// Returns an error if data generation, `MemTable` construction, table
/// registration, SQL planning, or physical planning fails.
pub async fn create_context_sampled_data(
    sql: &str,
    partition_cnt: i32,
    sample_cnt: i32,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
    // NOTE(review): the two `false` flags are `make_data` options; their meaning
    // is defined in `data_utils` — confirm there before changing them.
    let (schema, parts) = make_data(partition_cnt, sample_cnt, false, false)?;
    let mem_table = Arc::new(MemTable::try_new(schema, parts)?);

    let ctx = SessionContext::new_with_config(SessionConfig::new());
    ctx.register_table("traces", mem_table)?;

    let df = ctx.sql(sql).await?;
    let physical_plan = df.create_physical_plan().await?;
    Ok((physical_plan, ctx.task_ctx()))
}
/// Benchmarks `SELECT DISTINCT trace_id ... LIMIT 10` over generated trace data.
///
/// The benchmark runs three partition/sample splits. Each configuration's data
/// and physical plan are built once, before its benchmark starts. The timed loop
/// therefore measures query execution only, not data generation or planning.
fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let limit = 10;
    // The SQL does not depend on the partition layout, so it is formatted once.
    let sql =
        format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");

    // (partitions, samples per partition)
    let configs = [(100, 100_000), (10, 1_000_000), (1, 10_000_000)];
    for (partitions, samples) in configs {
        // Setup happens outside `b.iter`. Otherwise every timed iteration would
        // regenerate up to 10M rows and re-plan the query.
        let (plan, ctx) = rt
            .block_on(create_context_sampled_data(sql.as_str(), partitions, samples))
            .unwrap();
        let name = format!(
            "distinct query with {partitions} partitions and {samples} samples per partition with limit {limit}"
        );
        // Only cheap `Arc` clones happen per iteration; `run` consumes them.
        c.bench_function(&name, |b| b.iter(|| run(&rt, plan.clone(), ctx.clone())));
        // `plan`/`ctx` (and the dataset they reference) are dropped here,
        // before the next configuration is built.
    }
}
// Registers both benchmark functions in the `benches` group.
// `criterion_main!` then generates the binary's `main`, which runs that group.
criterion_group!(
    benches,
    criterion_benchmark_limited_distinct,
    criterion_benchmark_limited_distinct_sampled
);
criterion_main!(benches);