#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use data_utils::create_table_provider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;
fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
fn create_context(
partitions_len: usize,
array_len: usize,
batch_size: usize,
) -> Result<Arc<Mutex<ExecutionContext>>> {
let mut ctx = ExecutionContext::new();
let provider = create_table_provider(partitions_len, array_len, batch_size)?;
ctx.register_table("t", provider)?;
Ok(Arc::new(Mutex::new(ctx)))
}
fn criterion_benchmark(c: &mut Criterion) {
let partitions_len = 8;
let array_len = 1024 * 1024;
let batch_size = 8 * 1024;
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
c.bench_function("window empty over, aggregate functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
MAX(f64) OVER (), \
MIN(f32) OVER (), \
SUM(u64_narrow) OVER () \
FROM t",
)
})
});
c.bench_function("window empty over, built-in functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
FIRST_VALUE(f64) OVER (), \
LAST_VALUE(f32) OVER (), \
NTH_VALUE(u64_narrow, 50) OVER () \
FROM t",
)
})
});
c.bench_function("window order by, aggregate functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
MAX(f64) OVER (ORDER BY u64_narrow), \
MIN(f32) OVER (ORDER BY u64_narrow DESC), \
SUM(u64_narrow) OVER (ORDER BY u64_narrow ASC) \
FROM t",
)
})
});
c.bench_function("window order by, built-in functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
FIRST_VALUE(f64) OVER (ORDER BY u64_narrow), \
LAST_VALUE(f32) OVER (ORDER BY u64_narrow DESC), \
NTH_VALUE(u64_narrow, 50) OVER (ORDER BY u64_narrow ASC) \
FROM t",
)
})
});
c.bench_function("window partition by, u64_wide, aggregate functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
MAX(f64) OVER (PARTITION BY u64_wide), \
MIN(f32) OVER (PARTITION BY u64_wide), \
SUM(u64_narrow) OVER (PARTITION BY u64_wide) \
FROM t",
)
})
});
c.bench_function(
"window partition by, u64_narrow, aggregate functions",
|b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
MAX(f64) OVER (PARTITION BY u64_narrow), \
MIN(f32) OVER (PARTITION BY u64_narrow), \
SUM(u64_narrow) OVER (PARTITION BY u64_narrow) \
FROM t",
)
})
},
);
c.bench_function("window partition by, u64_wide, built-in functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_wide), \
LAST_VALUE(f32) OVER (PARTITION BY u64_wide), \
NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_wide) \
FROM t",
)
})
});
c.bench_function("window partition by, u64_narrow, built-in functions", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow), \
LAST_VALUE(f32) OVER (PARTITION BY u64_narrow), \
NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_narrow) \
FROM t",
)
})
});
c.bench_function(
"window partition and order by, u64_wide, aggregate functions",
|b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
MAX(f64) OVER (PARTITION BY u64_wide ORDER by f64), \
MIN(f32) OVER (PARTITION BY u64_wide ORDER by f64), \
SUM(u64_narrow) OVER (PARTITION BY u64_wide ORDER by f64) \
FROM t",
)
})
},
);
c.bench_function(
"window partition and order by, u64_narrow, aggregate functions",
|b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
MAX(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \
MIN(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \
SUM(u64_narrow) OVER (PARTITION BY u64_narrow ORDER by f64) \
FROM t",
)
})
},
);
c.bench_function(
"window partition and order by, u64_wide, built-in functions",
|b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_wide ORDER by f64), \
LAST_VALUE(f32) OVER (PARTITION BY u64_wide ORDER by f64), \
NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_wide ORDER by f64) \
FROM t",
)
})
},
);
c.bench_function(
"window partition and order by, u64_narrow, built-in functions",
|b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT \
FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \
LAST_VALUE(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \
NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_narrow ORDER by f64) \
FROM t",
)
})
},
);
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);