use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use lance::dataset::write::merge_insert::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::DatasetIndexExt;
use lance_core::utils::tempfile::TempStrDir;
use lance_index::IndexType;
use lance_index::scalar::ScalarIndexParams;
#[cfg(target_os = "linux")]
use lance_testing::pprof::{Output, PProfProfiler};
/// Maximum rows per data file and per row group when writing the base fixtures;
/// also the size of each batch produced by `make_batches`.
const ROWS_PER_FRAG: u64 = 100;
/// Number of `ROWS_PER_FRAG`-sized chunks in each base fixture, which therefore
/// holds `ROWS_PER_FRAG * NUM_FRAGS` rows.
const NUM_FRAGS: u64 = 200;
/// Schema of the single-key fixtures: one non-nullable `Int64` column named `id`.
fn schema() -> Arc<ArrowSchema> {
    let id = Field::new("id", DataType::Int64, false);
    Arc::new(ArrowSchema::new(vec![id]))
}
/// Builds a `schema()` batch of `n` consecutive ids beginning at `start_id`.
fn make_batch(start_id: i64, n: usize) -> RecordBatch {
    let end = start_id + n as i64;
    RecordBatch::try_new(
        schema(),
        vec![Arc::new(Int64Array::from_iter_values(start_id..end))],
    )
    .unwrap()
}
/// Splits `total_rows` consecutive ids starting at `start_id` into batches of at most
/// `ROWS_PER_FRAG` rows each; only the final batch may be shorter.
fn make_batches(start_id: i64, total_rows: u64) -> Vec<RecordBatch> {
    (0..total_rows)
        .step_by(ROWS_PER_FRAG as usize)
        .map(|offset| {
            let len = (total_rows - offset).min(ROWS_PER_FRAG) as usize;
            make_batch(start_id + offset as i64, len)
        })
        .collect()
}
/// Creates the base fixture at `path` and returns it with a BTree index on `id`.
///
/// The dataset holds ids `0..ROWS_PER_FRAG * NUM_FRAGS`, written with at most
/// `ROWS_PER_FRAG` rows per file and per row group.
async fn build_indexed_base(path: &str) -> Dataset {
    let frag_rows = ROWS_PER_FRAG as usize;
    let write_params = WriteParams {
        mode: WriteMode::Create,
        max_rows_per_file: frag_rows,
        max_rows_per_group: frag_rows,
        ..Default::default()
    };
    let source = RecordBatchIterator::new(
        make_batches(0, ROWS_PER_FRAG * NUM_FRAGS).into_iter().map(Ok),
        schema(),
    );
    Dataset::write(source, path, Some(write_params)).await.unwrap();

    // Reopen so the index is built against the freshly committed version.
    let mut dataset = Dataset::open(path).await.unwrap();
    let index_params = ScalarIndexParams::default();
    dataset
        .create_index(&["id"], IndexType::BTree, None, &index_params, true)
        .await
        .unwrap();
    dataset
}
/// Appends `n` rows with ids `base_id..base_id + n` to the dataset at `path`, written as a
/// single file / row group, and returns the reopened dataset.
async fn append_rows(path: &str, base_id: i64, n: usize) -> Dataset {
    let write_params = WriteParams {
        mode: WriteMode::Append,
        max_rows_per_file: n,
        max_rows_per_group: n,
        ..Default::default()
    };
    let new_rows = make_batch(base_id, n);
    let source = RecordBatchIterator::new(std::iter::once(Ok(new_rows)), schema());
    Dataset::write(source, path, Some(write_params)).await.unwrap();
    Dataset::open(path).await.unwrap()
}
/// Deletes every row whose `id` is a multiple of 100.
///
/// Despite the name, the predicate applies to all rows in the dataset, including any
/// rows appended after the index was built.
async fn delete_some_indexed_rows(ds: &mut Dataset) {
    let predicate = "id % 100 == 0";
    ds.delete(predicate).await.unwrap();
}
/// Runs one 100-row upsert keyed on `id`.
///
/// The source contains 30 ids starting at `base_existing` (intended to hit existing rows)
/// followed by 70 ids starting at `base_new` (intended to be new). Matched rows are
/// updated in full; unmatched rows are inserted.
async fn one_merge_insert(ds: Arc<Dataset>, base_existing: i64, base_new: i64) {
    let keys: Vec<i64> = (base_existing..base_existing + 30)
        .chain(base_new..base_new + 70)
        .collect();
    let source_batch =
        RecordBatch::try_new(schema(), vec![Arc::new(Int64Array::from(keys))]).unwrap();
    let source = RecordBatchIterator::new(std::iter::once(Ok(source_batch)), schema());

    let mut merge = MergeInsertBuilder::try_new(ds, vec![String::from("id")]).unwrap();
    merge
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll);
    let job = merge.try_build().unwrap();
    let _ = job.execute_reader(source).await.unwrap();
}
fn composite_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]))
}
/// Derives column `b` from column `a` (`7a + 3`), so every `(a, b)` pair is fully
/// determined by `a`.
fn composite_b(a: i64) -> i64 {
    3 + 7 * a
}
/// Builds a `composite_schema()` batch with `a = start..start + n` and
/// `b = composite_b(a)`.
fn make_composite_batch(start: i64, n: usize) -> RecordBatch {
    let keys = start..start + n as i64;
    let b_col = Int64Array::from_iter_values(keys.clone().map(composite_b));
    let a_col = Int64Array::from_iter_values(keys);
    RecordBatch::try_new(composite_schema(), vec![Arc::new(a_col), Arc::new(b_col)]).unwrap()
}
async fn build_composite_key_indexed() -> (TempStrDir, Arc<Dataset>) {
let dir = TempStrDir::default();
let path = dir.as_str().to_string();
let total = ROWS_PER_FRAG * NUM_FRAGS;
let mut batches = Vec::new();
let mut start = 0i64;
while (start as u64) < total {
let n = (total - start as u64).min(ROWS_PER_FRAG) as usize;
batches.push(make_composite_batch(start, n));
start += n as i64;
}
let params = WriteParams {
max_rows_per_file: ROWS_PER_FRAG as usize,
max_rows_per_group: ROWS_PER_FRAG as usize,
mode: WriteMode::Create,
..Default::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), composite_schema());
Dataset::write(reader, &path, Some(params)).await.unwrap();
let mut ds = Dataset::open(&path).await.unwrap();
for col in ["a", "b"] {
ds.create_index(
&[col],
IndexType::BTree,
None,
&ScalarIndexParams::default(),
true,
)
.await
.unwrap();
}
(dir, Arc::new(ds))
}
async fn one_composite_merge_insert(ds: Arc<Dataset>, base_existing: i64, base_new: i64) {
let mut a_vals: Vec<i64> = (0..30).map(|i| base_existing + i).collect();
a_vals.extend(base_new..base_new + 70);
let b_vals: Vec<i64> = a_vals.iter().copied().map(composite_b).collect();
let batch = RecordBatch::try_new(
composite_schema(),
vec![
Arc::new(Int64Array::from(a_vals)),
Arc::new(Int64Array::from(b_vals)),
],
)
.unwrap();
let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), composite_schema());
let mut builder =
MergeInsertBuilder::try_new(ds, vec!["a".to_string(), "b".to_string()]).unwrap();
builder
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::InsertAll);
let job = builder.try_build().unwrap();
let _ = job.execute_reader(reader).await.unwrap();
}
/// Fixture: the indexed base dataset with no appends or deletions.
async fn build_clean() -> (TempStrDir, Arc<Dataset>) {
    let tmp = TempStrDir::default();
    let uri = tmp.as_str().to_string();
    let dataset = build_indexed_base(&uri).await;
    (tmp, Arc::new(dataset))
}
/// Fixture: the indexed base plus 500 rows appended after the index was built.
///
/// The appended rows use ids just past the base range. No index update follows the
/// append.
async fn build_with_new_rows_only() -> (TempStrDir, Arc<Dataset>) {
    let tmp = TempStrDir::default();
    let uri = tmp.as_str().to_string();
    build_indexed_base(&uri).await;

    let first_new_id = (ROWS_PER_FRAG * NUM_FRAGS) as i64;
    let dataset = append_rows(&uri, first_new_id, 500).await;
    (tmp, Arc::new(dataset))
}
/// Fixture: the indexed base with every id divisible by 100 deleted after indexing.
async fn build_with_deletions_only() -> (TempStrDir, Arc<Dataset>) {
    let tmp = TempStrDir::default();
    let uri = tmp.as_str().to_string();
    let mut dataset = build_indexed_base(&uri).await;
    delete_some_indexed_rows(&mut dataset).await;
    (tmp, Arc::new(dataset))
}
/// Fixture: the indexed base plus 500 appended rows, followed by deletion of every id
/// divisible by 100. The deletion covers both base and appended rows.
async fn build_with_new_rows_and_deletions() -> (TempStrDir, Arc<Dataset>) {
    let tmp = TempStrDir::default();
    let uri = tmp.as_str().to_string();
    build_indexed_base(&uri).await;

    let first_new_id = (ROWS_PER_FRAG * NUM_FRAGS) as i64;
    let mut dataset = append_rows(&uri, first_new_id, 500).await;
    delete_some_indexed_rows(&mut dataset).await;
    (tmp, Arc::new(dataset))
}
fn bench_one_shape<F, Fut, M, MFut>(c: &mut Criterion, name: &str, builder: F, merge: M)
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = (TempStrDir, Arc<Dataset>)>,
M: Fn(Arc<Dataset>, i64, i64) -> MFut + Copy,
MFut: std::future::Future<Output = ()>,
{
let rt = tokio::runtime::Runtime::new().unwrap();
let (_dir, ds) = rt.block_on(builder());
let base_version = ds.version().version;
let path = ds.uri().to_string();
let total = rt.block_on(ds.count_rows(None)).unwrap() as i64;
c.bench_function(name, |b| {
b.iter(|| {
rt.block_on(async {
let bench_ds = Dataset::open(&path).await.unwrap();
let mut bench_ds = bench_ds.checkout_version(base_version).await.unwrap();
bench_ds.restore().await.unwrap();
let arc = Arc::new(bench_ds);
merge(arc, 100, total + 1_000_000).await;
})
})
});
}
/// Registers every merge-insert scenario with criterion.
///
/// The first four scenarios share the single-column `id` key and differ only in how
/// the fixture was prepared (clean, appended rows, deletions, or both). The last one
/// uses the two-column `(a, b)` key.
fn bench_merge_insert(c: &mut Criterion) {
    let single_key = one_merge_insert;

    bench_one_shape(c, "merge_insert/clean", build_clean, single_key);
    bench_one_shape(c, "merge_insert/with_new_rows_only", build_with_new_rows_only, single_key);
    bench_one_shape(c, "merge_insert/with_deletions_only", build_with_deletions_only, single_key);
    bench_one_shape(
        c,
        "merge_insert/with_new_rows_and_deletions",
        build_with_new_rows_and_deletions,
        single_key,
    );
    bench_one_shape(
        c,
        "merge_insert/composite_key_indexed",
        build_composite_key_indexed,
        one_composite_merge_insert,
    );
}
// Criterion harness configuration. Both variants take 10 samples per benchmark and use
// a 0.1 significance level for change detection.
// On Linux a pprof-based profiler is attached that emits a flamegraph. Criterion only
// drives custom profilers when run with `--profile-time`. The first argument (100) is
// presumably the sampling frequency — confirm against `lance_testing::pprof`.
#[cfg(target_os = "linux")]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10)
.with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_merge_insert);
// Non-Linux targets use the same settings without the profiler; `PProfProfiler` is
// only imported on Linux.
#[cfg(not(target_os = "linux"))]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10);
targets = bench_merge_insert);
// Generates `main`, which runs the `benches` group defined above for the current target.
criterion_main!(benches);