#[macro_use]
extern crate criterion;
use criterion::Criterion;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::SessionConfig;
use parking_lot::Mutex;
use std::sync::Arc;
extern crate arrow;
extern crate datafusion;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use tokio::runtime::Runtime;
/// Runs `sql` against the shared benchmark context and collects every result
/// batch, blocking the current thread on `rt` for both the `sql` call and the
/// `collect` call.
///
/// The collected batches are discarded; only the work of producing them matters.
///
/// # Panics
///
/// Panics if either `SessionContext::sql` or `DataFrame::collect` returns an error.
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
    // The lock is held only while the SQL text is turned into a DataFrame.
    let sql_result = {
        let session = ctx.lock();
        rt.block_on(session.sql(sql))
    };
    let dataframe = sql_result.unwrap();
    let _batches = rt.block_on(dataframe.collect()).unwrap();
}
fn create_context() -> Arc<Mutex<SessionContext>> {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
Field::new("c5", DataType::Int32, false),
Field::new("c6", DataType::Int64, false),
Field::new("c7", DataType::UInt8, false),
Field::new("c8", DataType::UInt16, false),
Field::new("c9", DataType::UInt32, false),
Field::new("c10", DataType::UInt64, false),
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]));
let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{testdata}/csv/aggregate_test_100.csv");
let table_path = ListingTableUrl::parse(path).unwrap();
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(schema);
let csv = async { ListingTable::try_new(config).unwrap() };
let rt = Runtime::new().unwrap();
let ctx_holder: Arc<Mutex<Vec<Arc<Mutex<SessionContext>>>>> =
Arc::new(Mutex::new(vec![]));
let partitions = 16;
rt.block_on(async {
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(1),
);
let table_provider = Arc::new(csv.await);
let mem_table = MemTable::load(table_provider, Some(partitions), &ctx.state())
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
.unwrap();
ctx_holder.lock().push(Arc::new(Mutex::new(ctx)))
});
let ctx = ctx_holder.lock().first().unwrap().clone();
ctx
}
/// Registers the sort + `LIMIT 10` benchmarks against the `aggregate_test_100`
/// table set up by `create_context`.
///
/// Every iteration calls `query`, which runs `SessionContext::sql` and then
/// `collect` on the result. The measured time therefore covers both steps.
fn criterion_benchmark(c: &mut Criterion) {
    // (benchmark id, SQL) pairs. The sort key of each query must match the
    // type named in its id. Per the schema in `create_context`:
    // c6 is Int64, c10 is UInt64, c12 is Float64, and c1/c13 are Utf8.
    let benchmarks = [
        (
            "sort_and_limit_by_int",
            "SELECT c1, c13, c6, c10 \
             FROM aggregate_test_100 \
             ORDER BY c6 \
             LIMIT 10",
        ),
        (
            // Sorts on c12 (Float64). It previously sorted on c13, a Utf8
            // column, so it measured a string sort.
            "sort_and_limit_by_float",
            "SELECT c1, c13, c12 \
             FROM aggregate_test_100 \
             ORDER BY c12 \
             LIMIT 10",
        ),
        (
            "sort_and_limit_lex_by_int",
            "SELECT c1, c13, c6, c10 \
             FROM aggregate_test_100 \
             ORDER BY c6 DESC, c10 DESC \
             LIMIT 10",
        ),
        (
            "sort_and_limit_lex_by_string",
            "SELECT c1, c13, c6, c10 \
             FROM aggregate_test_100 \
             ORDER BY c1, c13 \
             LIMIT 10",
        ),
    ];

    let ctx = create_context();
    let rt = Runtime::new().unwrap();
    for (id, sql) in benchmarks {
        c.bench_function(id, |b| b.iter(|| query(ctx.clone(), &rt, sql)));
    }
}
// Benchmark group `benches` runs `criterion_benchmark` with the default
// Criterion configuration; `criterion_main!` generates the harness entry point.
criterion_group! {
    name = benches;
    config = Criterion::default();
    targets = criterion_benchmark
}
criterion_main!(benches);