#[macro_use]
extern crate criterion;
use criterion::{BatchSize, Criterion};
extern crate arrow;
extern crate datafusion;
use std::{iter::FromIterator, sync::Arc};
use arrow::{
array::{ArrayRef, Int64Array, StringArray},
record_batch::RecordBatch,
};
use tokio::runtime::Runtime;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{
collect,
expressions::{col, PhysicalSortExpr},
memory::MemoryExec,
};
fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
let schema = batches[0].schema();
let sort = sort
.iter()
.map(|name| PhysicalSortExpr {
expr: col(name, &schema).unwrap(),
options: Default::default(),
})
.collect::<Vec<_>>();
let exec = MemoryExec::try_new(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
schema,
None,
)
.unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
let rt = Runtime::new().unwrap();
let rt_env = Arc::new(RuntimeEnv::default());
rt.block_on(collect(merge, rt_env)).unwrap();
}
/// Generates `n` record batches of `m` rows each. Each batch has string
/// columns `a`, `b`, `c` and an `Int64` column `d`.
///
/// Rows are grouped into consecutive runs of `rows_per_sort_key` rows. Every
/// row in a run gets the key `j`, the index of the run's first row, rendered
/// as `"a-{j}"`, `"b-{j}"` and `"c-{j}"`. When `rows_per_sort_key` is 0, every
/// row gets key 0. Column `d` holds the row index plus `k * batch_offset` for
/// the `k`-th batch. Columns `a`, `b` and `c` are each sorted as strings, so
/// every batch ends up ordered by `(a, b, c, d)`.
fn batches(
    n: usize,
    m: usize,
    rows_per_sort_key: usize,
    batch_offset: usize,
) -> Vec<RecordBatch> {
    // Index of the first row of the run that `row` belongs to.
    let run_start = |row: usize| -> usize {
        if rows_per_sort_key == 0 {
            0
        } else {
            row - row % rows_per_sort_key
        }
    };

    let mut output = Vec::with_capacity(n);
    let mut offset = 0;

    for _ in 0..n {
        let keys: Vec<usize> = (0..m).map(run_start).collect();

        let mut a_values: Vec<Option<String>> =
            keys.iter().map(|j| Some(format!("a-{:?}", j))).collect();
        let mut b_values: Vec<Option<String>> =
            keys.iter().map(|j| Some(format!("b-{:?}", j))).collect();
        let mut c_values: Vec<Option<String>> =
            keys.iter().map(|j| Some(format!("c-{:?}", j))).collect();
        a_values.sort();
        b_values.sort();
        c_values.sort();

        let d_values: Vec<Option<i64>> =
            (0..m).map(|i| Some((i + offset) as i64)).collect();

        let batch = RecordBatch::try_from_iter(vec![
            ("a", Arc::new(StringArray::from_iter(a_values)) as ArrayRef),
            ("b", Arc::new(StringArray::from_iter(b_values)) as ArrayRef),
            ("c", Arc::new(StringArray::from_iter(c_values)) as ArrayRef),
            ("d", Arc::new(Int64Array::from(d_values)) as ArrayRef),
        ])
        .unwrap();

        output.push(batch);
        offset += batch_offset;
    }

    output
}
fn criterion_benchmark(c: &mut Criterion) {
let small_batch = batches(1, 100, 10, 0).remove(0);
let large_batch = batches(1, 1000, 1, 0).remove(0);
let benches = vec![
("interleave_batches", batches(2, 1000, 10, 1)),
("merge_batches_some_overlap_small", batches(2, 1000, 10, 5)),
(
"merge_batches_some_overlap_large",
batches(2, 1000, 250, 125),
),
("merge_batches_no_overlap_small", batches(2, 1000, 10, 12)),
("merge_batches_no_overlap_large", batches(2, 1000, 250, 252)),
(
"merge_batches_small_into_large",
vec![large_batch, small_batch],
),
];
for (name, input) in benches {
c.bench_function(name, move |b| {
b.iter_batched(
|| input.clone(),
|input| {
sort_preserving_merge_operator(input, &["a", "b", "c", "d"]);
},
BatchSize::LargeInput,
)
});
}
}
// Register `criterion_benchmark` in a benchmark group named `benches`, and
// have Criterion generate the `main` function that runs that group.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);