#![allow(clippy::print_stdout, clippy::print_stderr)]
use std::sync::Arc;
use arrow_array::types::Float32Type;
use arrow_array::{
Array, FixedSizeListArray, Float32Array, Int64Array, RecordBatch, RecordBatchIterator,
StringArray,
};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use futures::TryStreamExt;
use lance::dataset::mem_wal::write::{CacheConfig, IndexStore, MemTable};
use lance::dataset::{Dataset, WriteParams};
use lance::index::vector::VectorIndexParams;
use lance_arrow::FixedSizeListArrayExt;
use lance_index::scalar::FullTextSearchQuery;
use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::ivf::storage::IvfModel;
use lance_index::vector::kmeans::{KMeansParams, train_kmeans};
use lance_index::vector::pq::builder::PQBuildParams;
use lance_index::{DatasetIndexExt, IndexType};
use lance_linalg::distance::{DistanceType, MetricType};
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};
use rand::Rng;
use uuid::Uuid;
/// Total row count used when the `NUM_ROWS` environment variable is unset or unparsable.
const DEFAULT_NUM_ROWS: usize = 10000;
/// Rows per generated batch used when the `BATCH_SIZE` environment variable is unset or unparsable.
const DEFAULT_BATCH_SIZE: usize = 100;
/// Vector dimensionality used when the `VECTOR_DIM` environment variable is unset or unparsable.
const DEFAULT_VECTOR_DIM: usize = 128;
/// Number of random ids placed in the `id IN (...)` filter of the point-lookup benchmark.
const DEFAULT_NUM_LOOKUPS: usize = 100;
/// Number of nearest neighbours requested by the vector-search benchmark.
const DEFAULT_K: usize = 10;
fn get_num_rows() -> usize {
std::env::var("NUM_ROWS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_NUM_ROWS)
}
/// Returns the number of rows per generated batch.
///
/// Reads the `BATCH_SIZE` environment variable. [`DEFAULT_BATCH_SIZE`] is returned when the
/// variable is missing, does not parse as a `usize`, or is `0`.
fn get_batch_size() -> usize {
    std::env::var("BATCH_SIZE")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        // The benchmarks compute `num_rows.div_ceil(batch_size)`, which panics on a zero
        // divisor, so a zero batch size is treated like any other invalid value.
        .filter(|&size| size > 0)
        .unwrap_or(DEFAULT_BATCH_SIZE)
}
fn get_vector_dim() -> usize {
std::env::var("VECTOR_DIM")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_VECTOR_DIM)
}
/// Returns the Criterion sample size for every benchmark group in this file.
///
/// Reads the `SAMPLE_SIZE` environment variable, falling back to `100` when it is missing
/// or does not parse as a `usize`. The result is never smaller than `10`.
fn get_sample_size() -> usize {
    let requested = match std::env::var("SAMPLE_SIZE") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(100),
        Err(_) => 100,
    };
    // NOTE(review): the floor of 10 presumably mirrors Criterion's minimum sample size —
    // confirm against the Criterion documentation.
    if requested < 10 { 10 } else { requested }
}
/// Builds the Arrow schema shared by every benchmark dataset.
///
/// Columns, in order:
/// * `id` — non-nullable `Int64`
/// * `text` — nullable `Utf8`
/// * `vector` — non-nullable `FixedSizeList` of `vector_dim` nullable `Float32` items
fn create_schema(vector_dim: usize) -> Arc<ArrowSchema> {
    let id_field = Field::new("id", DataType::Int64, false);
    let text_field = Field::new("text", DataType::Utf8, true);

    let element_field = Arc::new(Field::new("item", DataType::Float32, true));
    let vector_type = DataType::FixedSizeList(element_field, vector_dim as i32);
    let vector_field = Field::new("vector", vector_type, false);

    Arc::new(ArrowSchema::new(vec![id_field, text_field, vector_field]))
}
/// Generates one record batch of synthetic rows.
///
/// * `schema` — schema the batch is built against; cloned into a fresh `Arc`.
/// * `start_id` — value of the `id` column for the first row; ids increase by one per row.
/// * `num_rows` — number of rows to generate.
/// * `vector_dim` — length of every generated vector.
///
/// The `text` column holds three words picked from a fixed vocabulary by row index,
/// followed by `row_<id>`. The `vector` column holds random vectors whose components are
/// drawn as `random::<f32>() - 0.5` and then divided by the vector's Euclidean norm.
///
/// # Panics
///
/// Panics if the generated columns cannot be assembled into a `RecordBatch` for `schema`.
fn create_batch(
    schema: &ArrowSchema,
    start_id: i64,
    num_rows: usize,
    vector_dim: usize,
) -> RecordBatch {
    const WORDS: [&str; 8] = [
        "hello",
        "world",
        "search",
        "benchmark",
        "lance",
        "memory",
        "test",
        "data",
    ];
    let mut rng = rand::rng();

    let id_values: Vec<i64> = (start_id..start_id + num_rows as i64).collect();

    let mut text_values: Vec<String> = Vec::with_capacity(num_rows);
    for row in 0..num_rows {
        let first = WORDS[row % WORDS.len()];
        let second = WORDS[(row + 3) % WORDS.len()];
        let third = WORDS[(row + 5) % WORDS.len()];
        text_values.push(format!(
            "{} {} {} row_{}",
            first,
            second,
            third,
            start_id + row as i64
        ));
    }

    // All vectors live back to back in one flat buffer; each row is generated and then
    // normalised in place before the next one is appended.
    let mut flat_vectors: Vec<f32> = Vec::new();
    for _ in 0..num_rows {
        let row_start = flat_vectors.len();
        flat_vectors.extend((0..vector_dim).map(|_| rng.random::<f32>() - 0.5));
        let row = &mut flat_vectors[row_start..];
        let norm: f32 = row.iter().map(|x| x * x).sum::<f32>().sqrt();
        for component in row.iter_mut() {
            *component = *component / norm;
        }
    }
    let vector_column = FixedSizeListArray::try_new_from_values(
        Float32Array::from(flat_vectors),
        vector_dim as i32,
    )
    .unwrap();

    RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int64Array::from(id_values)),
            Arc::new(StringArray::from(text_values)),
            Arc::new(vector_column),
        ],
    )
    .unwrap()
}
/// Generates a random query vector of length `vector_dim`.
///
/// Components are drawn as `random::<f32>() - 0.5` and the vector is then divided by its
/// Euclidean norm, the same recipe `create_batch` uses for stored vectors.
fn create_query_vector(vector_dim: usize) -> Vec<f32> {
    let mut rng = rand::rng();

    let mut query: Vec<f32> = Vec::with_capacity(vector_dim);
    for _ in 0..vector_dim {
        query.push(rng.random::<f32>() - 0.5);
    }

    let norm: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
    for component in query.iter_mut() {
        *component = *component / norm;
    }
    query
}
/// Draws `count` random ids from the half-open range `0..max_id`.
///
/// Ids are drawn independently, so the result may contain duplicates.
// NOTE(review): `random_range` is expected to panic on an empty range, so `max_id` should be
// positive whenever `count > 0` — confirm against the rand documentation.
fn generate_random_ids(max_id: i64, count: usize) -> Vec<i64> {
    let mut rng = rand::rng();
    let mut ids: Vec<i64> = Vec::with_capacity(count);
    for _ in 0..count {
        ids.push(rng.random_range(0..max_id));
    }
    ids
}
/// Trains the IVF centroids and the product quantizer used by the MemTable vector index.
///
/// The `vector` column of every batch is concatenated into one flat `Float32Array`, which
/// is used both for k-means (producing `num_partitions` centroids) and for building the
/// product quantizer with `num_sub_vectors` sub-vectors.
///
/// # Panics
///
/// Panics if a batch has no `vector` column, if that column is not a `FixedSizeListArray`
/// of `Float32` values, or if k-means / PQ training fails.
fn train_ivf_pq_models(
    batches: &[RecordBatch],
    vector_dim: usize,
    num_partitions: usize,
    num_sub_vectors: usize,
    distance_type: DistanceType,
) -> (IvfModel, lance_index::vector::pq::ProductQuantizer) {
    // Gather every vector component into a single contiguous buffer.
    let mut flat: Vec<f32> = Vec::new();
    for batch in batches {
        let column = batch.column_by_name("vector").unwrap();
        let list = column
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();
        let floats = list
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        flat.extend_from_slice(floats.values());
    }
    let training_values = Float32Array::from(flat);

    // NOTE(review): the literals 50 and 1 (KMeansParams) and 256 (train_kmeans) are tuning
    // knobs whose meaning is not visible in this file — confirm against lance_index.
    let kmeans = train_kmeans::<Float32Type>(
        &training_values,
        KMeansParams::new(None, 50, 1, distance_type),
        vector_dim,
        num_partitions,
        256,
    )
    .unwrap();

    // The centroids come back type-erased; recover the flat floats and regroup them into
    // fixed-size lists of `vector_dim` items.
    let centroid_values = kmeans
        .centroids
        .as_any()
        .downcast_ref::<Float32Array>()
        .expect("Centroids should be Float32Array")
        .clone();
    let ivf_model = IvfModel::new(
        FixedSizeListArray::try_new_from_values(centroid_values, vector_dim as i32).unwrap(),
        None,
    );

    let training_fsl =
        FixedSizeListArray::try_new_from_values(training_values, vector_dim as i32).unwrap();
    let pq = PQBuildParams::new(num_sub_vectors, 8)
        .build(&training_fsl, distance_type)
        .unwrap();

    (ivf_model, pq)
}
/// Builds a `MemTable` holding `batches`, with a BTree index on `id`, an FTS index on
/// `text` and an IVF-PQ (L2) index on `vector`.
///
/// The IVF and PQ models are trained on the supplied batches before any data is inserted;
/// the indexes are registered on the table first and the batches are then inserted one by
/// one.
///
/// # Panics
///
/// Panics if `batches` is empty, or if model training, table construction or any insert
/// fails.
async fn setup_memtable(
    batches: Vec<RecordBatch>,
    vector_dim: usize,
    num_partitions: usize,
    num_sub_vectors: usize,
) -> MemTable {
    let schema = batches[0].schema();

    let (ivf_model, pq) = train_ivf_pq_models(
        &batches,
        vector_dim,
        num_partitions,
        num_sub_vectors,
        DistanceType::L2,
    );

    // NOTE(review): the integer arguments (0, 1, 2) line up with the column positions of
    // `id`, `text` and `vector` in the benchmark schema; presumably field ids — confirm.
    let mut indexes = IndexStore::new();
    indexes.add_btree(String::from("id_idx"), 0, String::from("id"));
    indexes.add_fts(String::from("text_idx"), 1, String::from("text"));
    indexes.add_ivf_pq(
        String::from("vector_idx"),
        2,
        String::from("vector"),
        ivf_model,
        pq,
        DistanceType::L2,
    );

    // Capacity is the batch count plus 10%, truncated to an integer.
    let batch_capacity = ((batches.len() as f64) * 1.1) as usize;
    // NOTE(review): the meaning of the `1` and `vec![0]` arguments is not visible here —
    // check `MemTable::with_capacity`.
    let mut memtable =
        MemTable::with_capacity(schema, 1, vec![0], CacheConfig::default(), batch_capacity)
            .unwrap();
    memtable.set_indexes(indexes);

    for batch in batches {
        memtable.insert(batch).await.unwrap();
    }
    memtable
}
/// Result of one of the `setup_lance*` helpers: a written Lance dataset plus its row count.
struct LanceSetup {
/// The dataset the benchmarks scan; shared via `Arc` so bench closures can clone it cheaply.
dataset: Arc<Dataset>,
/// Sum of `num_rows()` over the batches that were written. Not read by the code in this
/// file, hence the `dead_code` allowance.
#[allow(dead_code)]
total_rows: usize,
}
async fn setup_lance(batches: Vec<RecordBatch>) -> LanceSetup {
let schema = batches[0].schema();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let uri = format!("memory://lance_bench_{}", Uuid::new_v4());
let write_params = WriteParams {
max_rows_per_file: total_rows + 1,
..Default::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let dataset = Dataset::write(reader, &uri, Some(write_params))
.await
.unwrap();
LanceSetup {
dataset: Arc::new(dataset),
total_rows,
}
}
async fn setup_lance_per_batch(batches: Vec<RecordBatch>, batch_size: usize) -> LanceSetup {
let schema = batches[0].schema();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let uri = format!("memory://lance_per_batch_{}", Uuid::new_v4());
let write_params = WriteParams {
max_rows_per_file: batch_size,
..Default::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let dataset = Dataset::write(reader, &uri, Some(write_params))
.await
.unwrap();
LanceSetup {
dataset: Arc::new(dataset),
total_rows,
}
}
/// Writes `batches` to a fresh `memory://` Lance dataset and builds an inverted
/// (full-text) index on the `text` column.
///
/// `max_rows_per_file` is set to one more than the total row count, so the per-file row
/// limit never forces a split (the benchmarks label this the "single fragment" layout).
/// The URI carries a random UUID so each call gets its own dataset location.
///
/// # Panics
///
/// Panics if `batches` is empty, or if the write or the index build fails.
async fn setup_lance_with_fts(batches: Vec<RecordBatch>) -> LanceSetup {
    let schema = batches[0].schema();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let uri = format!("memory://lance_fts_bench_{}", Uuid::new_v4());
    let write_params = WriteParams {
        max_rows_per_file: total_rows + 1,
        ..Default::default()
    };
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    let mut dataset = Dataset::write(reader, &uri, Some(write_params))
        .await
        .unwrap();

    // Default inverted-index settings, passed by reference to `create_index`.
    // NOTE(review): the trailing `true` is presumably the "replace existing index" flag —
    // confirm against `DatasetIndexExt::create_index`.
    let params = InvertedIndexParams::default();
    dataset
        .create_index(&["text"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();

    LanceSetup {
        dataset: Arc::new(dataset),
        total_rows,
    }
}
/// Writes `batches` to a fresh `memory://` Lance dataset, capping every data file at
/// `batch_size` rows, and builds an inverted (full-text) index on the `text` column.
///
/// With batches of `batch_size` rows this is the layout the benchmarks label "per-batch"
/// fragments. The URI carries a random UUID so each call gets its own dataset location.
///
/// # Panics
///
/// Panics if `batches` is empty, or if the write or the index build fails.
async fn setup_lance_per_batch_with_fts(
    batches: Vec<RecordBatch>,
    batch_size: usize,
) -> LanceSetup {
    let schema = batches[0].schema();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let uri = format!("memory://lance_fts_per_batch_{}", Uuid::new_v4());
    let write_params = WriteParams {
        max_rows_per_file: batch_size,
        ..Default::default()
    };
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    let mut dataset = Dataset::write(reader, &uri, Some(write_params))
        .await
        .unwrap();

    // Default inverted-index settings, passed by reference to `create_index`.
    // NOTE(review): the trailing `true` is presumably the "replace existing index" flag —
    // confirm against `DatasetIndexExt::create_index`.
    let params = InvertedIndexParams::default();
    dataset
        .create_index(&["text"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();

    LanceSetup {
        dataset: Arc::new(dataset),
        total_rows,
    }
}
async fn setup_lance_with_vector_index(
batches: Vec<RecordBatch>,
num_partitions: usize,
num_sub_vectors: usize,
) -> LanceSetup {
let schema = batches[0].schema();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let uri = format!("memory://lance_vec_bench_{}", Uuid::new_v4());
let write_params = WriteParams {
max_rows_per_file: total_rows + 1,
..Default::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let mut dataset = Dataset::write(reader, &uri, Some(write_params))
.await
.unwrap();
let ivf_params = IvfBuildParams {
num_partitions: Some(num_partitions),
..Default::default()
};
let pq_params = PQBuildParams {
num_sub_vectors,
num_bits: 8,
..Default::default()
};
let vector_params =
VectorIndexParams::with_ivf_pq_params(MetricType::L2, ivf_params, pq_params);
dataset
.create_index(&["vector"], IndexType::Vector, None, &vector_params, true)
.await
.unwrap();
LanceSetup {
dataset: Arc::new(dataset),
total_rows,
}
}
async fn setup_lance_per_batch_with_vector_index(
batches: Vec<RecordBatch>,
batch_size: usize,
num_partitions: usize,
num_sub_vectors: usize,
) -> LanceSetup {
let schema = batches[0].schema();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let uri = format!("memory://lance_vec_per_batch_{}", Uuid::new_v4());
let write_params = WriteParams {
max_rows_per_file: batch_size,
..Default::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let mut dataset = Dataset::write(reader, &uri, Some(write_params))
.await
.unwrap();
let ivf_params = IvfBuildParams {
num_partitions: Some(num_partitions),
..Default::default()
};
let pq_params = PQBuildParams {
num_sub_vectors,
num_bits: 8,
..Default::default()
};
let vector_params =
VectorIndexParams::with_ivf_pq_params(MetricType::L2, ivf_params, pq_params);
dataset
.create_index(&["vector"], IndexType::Vector, None, &vector_params, true)
.await
.unwrap();
LanceSetup {
dataset: Arc::new(dataset),
total_rows,
}
}
/// Benchmarks a full, unfiltered scan over the same data in three stores: the indexed
/// `MemTable`, a Lance dataset written with the single-fragment layout, and a Lance
/// dataset written with per-batch fragments.
///
/// Workload size comes from the `NUM_ROWS`, `BATCH_SIZE`, `VECTOR_DIM` and `SAMPLE_SIZE`
/// environment variables (via the `get_*` helpers). Throughput is reported in rows.
fn bench_scan(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();

    let num_rows = get_num_rows();
    let batch_size = get_batch_size();
    let vector_dim = get_vector_dim();
    let sample_size = get_sample_size();
    let num_batches = num_rows.div_ceil(batch_size);
    let schema = create_schema(vector_dim);

    println!("=== Scan Benchmark ===");
    println!("Num rows: {}", num_rows);
    println!("Batch size: {}", batch_size);
    println!("Num batches: {}", num_batches);
    println!();

    // Batches cover consecutive id ranges; the last one is shorter when `num_rows` is not
    // a multiple of `batch_size`.
    let mut batches: Vec<RecordBatch> = Vec::with_capacity(num_batches);
    for batch_idx in 0..num_batches {
        let first_row = batch_idx * batch_size;
        let rows_in_batch = batch_size.min(num_rows - first_row);
        batches.push(create_batch(
            &schema,
            first_row as i64,
            rows_in_batch,
            vector_dim,
        ));
    }

    let lance_setup = rt.block_on(setup_lance(batches.clone()));
    println!(
        "Lance (single fragment): {} fragments",
        lance_setup.dataset.get_fragments().len()
    );

    let lance_per_batch_setup = rt.block_on(setup_lance_per_batch(batches.clone(), batch_size));
    println!(
        "Lance (per-batch): {} fragments",
        lance_per_batch_setup.dataset.get_fragments().len()
    );

    // Index sizing scales with the data but stays inside fixed bounds.
    let num_partitions = (num_rows / 100).clamp(4, 256);
    let num_sub_vectors = (vector_dim / 8).clamp(4, 32);
    println!("Creating MemTable with indexes...");
    let memtable = rt.block_on(setup_memtable(
        batches,
        vector_dim,
        num_partitions,
        num_sub_vectors,
    ));
    println!(
        "MemTable created with {} rows",
        memtable.batch_store().total_rows()
    );

    let mut group = c.benchmark_group("Scan");
    group.throughput(Throughput::Elements(num_rows as u64));
    group.sample_size(sample_size);
    let label = format!("{}_rows", num_rows);

    // --- MemTable ---
    group.bench_with_input(BenchmarkId::new("MemTable", &label), &(), |bencher, _| {
        bencher.to_async(&rt).iter(|| async {
            let scanned: Vec<RecordBatch> = memtable
                .scan()
                .try_into_stream()
                .await
                .unwrap()
                .try_collect()
                .await
                .unwrap();
            let row_count: usize = scanned.iter().map(|batch| batch.num_rows()).sum();
            assert!(row_count > 0);
        });
    });

    // --- Lance, single fragment ---
    group.bench_with_input(
        BenchmarkId::new("Lance_SingleFragment", &label),
        &(),
        |bencher, _| {
            let dataset = lance_setup.dataset.clone();
            bencher.to_async(&rt).iter(|| async {
                let scanned: Vec<RecordBatch> = dataset
                    .scan()
                    .try_into_stream()
                    .await
                    .unwrap()
                    .try_collect()
                    .await
                    .unwrap();
                let row_count: usize = scanned.iter().map(|batch| batch.num_rows()).sum();
                assert!(row_count > 0);
            });
        },
    );

    // --- Lance, one fragment per batch ---
    group.bench_with_input(
        BenchmarkId::new("Lance_PerBatchFragment", &label),
        &(),
        |bencher, _| {
            let dataset = lance_per_batch_setup.dataset.clone();
            bencher.to_async(&rt).iter(|| async {
                let scanned: Vec<RecordBatch> = dataset
                    .scan()
                    .try_into_stream()
                    .await
                    .unwrap()
                    .try_collect()
                    .await
                    .unwrap();
                let row_count: usize = scanned.iter().map(|batch| batch.num_rows()).sum();
                assert!(row_count > 0);
            });
        },
    );

    group.finish();
}
/// Benchmarks an `id IN (...)` filter over `DEFAULT_NUM_LOOKUPS` random ids against the
/// indexed `MemTable`, a single-fragment Lance dataset and a per-batch-fragment Lance
/// dataset.
///
/// The same random id list is used for all three stores. Throughput is reported in
/// looked-up ids per iteration.
fn bench_point_lookup(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();

    let num_rows = get_num_rows();
    let batch_size = get_batch_size();
    let vector_dim = get_vector_dim();
    let sample_size = get_sample_size();
    let num_lookups = DEFAULT_NUM_LOOKUPS;
    let num_batches = num_rows.div_ceil(batch_size);
    let schema = create_schema(vector_dim);

    println!("=== Point Lookup Benchmark ===");
    println!("Num rows: {}", num_rows);
    println!("Num lookups: {}", num_lookups);
    println!();

    // Batches cover consecutive id ranges; the last one is shorter when `num_rows` is not
    // a multiple of `batch_size`.
    let mut batches: Vec<RecordBatch> = Vec::with_capacity(num_batches);
    for batch_idx in 0..num_batches {
        let first_row = batch_idx * batch_size;
        let rows_in_batch = batch_size.min(num_rows - first_row);
        batches.push(create_batch(
            &schema,
            first_row as i64,
            rows_in_batch,
            vector_dim,
        ));
    }

    let lance_setup = rt.block_on(setup_lance(batches.clone()));
    println!(
        "Lance (single fragment): {} fragments",
        lance_setup.dataset.get_fragments().len()
    );

    let lance_per_batch_setup = rt.block_on(setup_lance_per_batch(batches.clone(), batch_size));
    println!(
        "Lance (per-batch): {} fragments",
        lance_per_batch_setup.dataset.get_fragments().len()
    );

    // Index sizing scales with the data but stays inside fixed bounds.
    let num_partitions = (num_rows / 100).clamp(4, 256);
    let num_sub_vectors = (vector_dim / 8).clamp(4, 32);
    println!("Creating MemTable with indexes...");
    let memtable = rt.block_on(setup_memtable(
        batches,
        vector_dim,
        num_partitions,
        num_sub_vectors,
    ));
    println!("MemTable created.");

    // Ids are drawn from `0..num_rows`, i.e. the id range the batches were generated with.
    let lookup_ids = generate_random_ids(num_rows as i64, num_lookups);

    let mut group = c.benchmark_group("PointLookup");
    group.throughput(Throughput::Elements(num_lookups as u64));
    group.sample_size(sample_size);
    let label = format!("{}_lookups", num_lookups);

    // --- MemTable ---
    group.bench_with_input(
        BenchmarkId::new("MemTable_Filter", &label),
        &lookup_ids,
        |bencher, ids| {
            let rendered_ids: Vec<String> = ids.iter().map(i64::to_string).collect();
            let filter = format!("id IN ({})", rendered_ids.join(","));
            bencher.to_async(&rt).iter(|| {
                // The scanner is created outside the future so the `async move` block owns
                // it rather than borrowing the memtable through the future.
                let filter = filter.clone();
                let mut scanner = memtable.scan();
                async move {
                    let matched: Vec<RecordBatch> = scanner
                        .filter(&filter)
                        .unwrap()
                        .try_into_stream()
                        .await
                        .unwrap()
                        .try_collect()
                        .await
                        .unwrap();
                    let row_count: usize = matched.iter().map(|batch| batch.num_rows()).sum();
                    assert!(row_count > 0);
                }
            });
        },
    );

    // --- Lance, single fragment ---
    group.bench_with_input(
        BenchmarkId::new("Lance_SingleFragment_Filter", &label),
        &lookup_ids,
        |bencher, ids| {
            let dataset = lance_setup.dataset.clone();
            let rendered_ids: Vec<String> = ids.iter().map(i64::to_string).collect();
            let filter = format!("id IN ({})", rendered_ids.join(","));
            bencher.to_async(&rt).iter(|| {
                let dataset = dataset.clone();
                let filter = filter.clone();
                async move {
                    let matched: Vec<RecordBatch> = dataset
                        .scan()
                        .filter(&filter)
                        .unwrap()
                        .try_into_stream()
                        .await
                        .unwrap()
                        .try_collect()
                        .await
                        .unwrap();
                    let row_count: usize = matched.iter().map(|batch| batch.num_rows()).sum();
                    assert!(row_count > 0);
                }
            });
        },
    );

    // --- Lance, one fragment per batch ---
    group.bench_with_input(
        BenchmarkId::new("Lance_PerBatchFragment_Filter", &label),
        &lookup_ids,
        |bencher, ids| {
            let dataset = lance_per_batch_setup.dataset.clone();
            let rendered_ids: Vec<String> = ids.iter().map(i64::to_string).collect();
            let filter = format!("id IN ({})", rendered_ids.join(","));
            bencher.to_async(&rt).iter(|| {
                let dataset = dataset.clone();
                let filter = filter.clone();
                async move {
                    let matched: Vec<RecordBatch> = dataset
                        .scan()
                        .filter(&filter)
                        .unwrap()
                        .try_into_stream()
                        .await
                        .unwrap()
                        .try_collect()
                        .await
                        .unwrap();
                    let row_count: usize = matched.iter().map(|batch| batch.num_rows()).sum();
                    assert!(row_count > 0);
                }
            });
        },
    );

    group.finish();
}
/// Benchmarks full-text search on the `text` column against the indexed `MemTable`, a
/// single-fragment Lance dataset with an inverted index, and a per-batch-fragment Lance
/// dataset with an inverted index.
///
/// Each iteration runs one search per term in a fixed list of five terms, sequentially,
/// and sums the rows found. Throughput is reported in terms per iteration.
fn bench_fts(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();

    let num_rows = get_num_rows();
    let batch_size = get_batch_size();
    let vector_dim = get_vector_dim();
    let sample_size = get_sample_size();
    let num_batches = num_rows.div_ceil(batch_size);
    let schema = create_schema(vector_dim);

    println!("=== FTS Benchmark ===");
    println!("Num rows: {}", num_rows);
    println!("Batch size: {}", batch_size);
    println!("Num batches: {}", num_batches);
    println!();

    // Batches cover consecutive id ranges; the last one is shorter when `num_rows` is not
    // a multiple of `batch_size`.
    let mut batches: Vec<RecordBatch> = Vec::with_capacity(num_batches);
    for batch_idx in 0..num_batches {
        let first_row = batch_idx * batch_size;
        let rows_in_batch = batch_size.min(num_rows - first_row);
        batches.push(create_batch(
            &schema,
            first_row as i64,
            rows_in_batch,
            vector_dim,
        ));
    }

    println!("Creating Lance dataset with FTS index (single fragment)...");
    let lance_fts_setup = rt.block_on(setup_lance_with_fts(batches.clone()));
    println!(
        "Lance FTS (single fragment): {} fragments",
        lance_fts_setup.dataset.get_fragments().len()
    );

    println!("Creating Lance dataset with FTS index (per-batch fragments)...");
    let lance_fts_per_batch_setup =
        rt.block_on(setup_lance_per_batch_with_fts(batches.clone(), batch_size));
    println!(
        "Lance FTS (per-batch): {} fragments",
        lance_fts_per_batch_setup.dataset.get_fragments().len()
    );

    // Index sizing scales with the data but stays inside fixed bounds.
    let num_partitions = (num_rows / 100).clamp(4, 256);
    let num_sub_vectors = (vector_dim / 8).clamp(4, 32);
    println!("Creating MemTable with indexes...");
    let memtable = rt.block_on(setup_memtable(
        batches,
        vector_dim,
        num_partitions,
        num_sub_vectors,
    ));
    println!("MemTable created.");

    // All five terms belong to the vocabulary `create_batch` builds its text from.
    let search_terms = ["hello", "world", "search", "benchmark", "lance"];

    let mut group = c.benchmark_group("FTS");
    group.throughput(Throughput::Elements(search_terms.len() as u64));
    group.sample_size(sample_size);
    let label = format!("{}_terms", search_terms.len());

    // --- MemTable ---
    group.bench_with_input(
        BenchmarkId::new("MemTable_FTS", &label),
        &search_terms,
        |bencher, terms| {
            bencher.to_async(&rt).iter(|| {
                // One scanner per term is created up front so the `async move` block owns
                // them instead of borrowing the memtable.
                let terms = *terms;
                let scanners: Vec<_> = terms.iter().map(|_| memtable.scan()).collect();
                async move {
                    let mut rows_found = 0usize;
                    for (mut scanner, term) in scanners.into_iter().zip(terms.iter()) {
                        let hits: Vec<RecordBatch> = scanner
                            .full_text_search("text", term)
                            .try_into_stream()
                            .await
                            .unwrap()
                            .try_collect()
                            .await
                            .unwrap();
                        rows_found += hits.iter().map(|batch| batch.num_rows()).sum::<usize>();
                    }
                    assert!(rows_found > 0);
                }
            });
        },
    );

    // --- Lance, single fragment ---
    group.bench_with_input(
        BenchmarkId::new("Lance_SingleFragment_FTS", &label),
        &search_terms,
        |bencher, terms| {
            let dataset = lance_fts_setup.dataset.clone();
            bencher.to_async(&rt).iter(|| {
                let dataset = dataset.clone();
                let terms = terms.to_vec();
                async move {
                    let mut rows_found = 0usize;
                    for term in terms {
                        let query = FullTextSearchQuery::new(term.to_string());
                        let hits: Vec<RecordBatch> = dataset
                            .scan()
                            .full_text_search(query)
                            .unwrap()
                            .try_into_stream()
                            .await
                            .unwrap()
                            .try_collect()
                            .await
                            .unwrap();
                        rows_found += hits.iter().map(|batch| batch.num_rows()).sum::<usize>();
                    }
                    assert!(rows_found > 0);
                }
            });
        },
    );

    // --- Lance, one fragment per batch ---
    group.bench_with_input(
        BenchmarkId::new("Lance_PerBatchFragment_FTS", &label),
        &search_terms,
        |bencher, terms| {
            let dataset = lance_fts_per_batch_setup.dataset.clone();
            bencher.to_async(&rt).iter(|| {
                let dataset = dataset.clone();
                let terms = terms.to_vec();
                async move {
                    let mut rows_found = 0usize;
                    for term in terms {
                        let query = FullTextSearchQuery::new(term.to_string());
                        let hits: Vec<RecordBatch> = dataset
                            .scan()
                            .full_text_search(query)
                            .unwrap()
                            .try_into_stream()
                            .await
                            .unwrap()
                            .try_collect()
                            .await
                            .unwrap();
                        rows_found += hits.iter().map(|batch| batch.num_rows()).sum::<usize>();
                    }
                    assert!(rows_found > 0);
                }
            });
        },
    );

    group.finish();
}
/// Benchmarks a k-nearest-neighbour query on the `vector` column against the indexed
/// `MemTable`, a single-fragment Lance dataset with an IVF-PQ index, and a
/// per-batch-fragment Lance dataset with an IVF-PQ index.
///
/// One random, normalised query vector is generated up front and reused for every
/// iteration and every store. Each iteration issues a single query for `DEFAULT_K`
/// neighbours with `nprobes(8)`, so throughput is reported as one element per iteration.
fn bench_vector_search(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();

    let num_rows = get_num_rows();
    let batch_size = get_batch_size();
    let vector_dim = get_vector_dim();
    let sample_size = get_sample_size();
    let k = DEFAULT_K;
    let num_batches = num_rows.div_ceil(batch_size);
    let schema = create_schema(vector_dim);

    println!("=== Vector Search Benchmark ===");
    println!("Num rows: {}", num_rows);
    println!("Batch size: {}", batch_size);
    println!("Num batches: {}", num_batches);
    println!("Vector dim: {}", vector_dim);
    println!("K: {}", k);
    println!();

    // Batches cover consecutive id ranges; the last one is shorter when `num_rows` is not
    // a multiple of `batch_size`.
    let mut batches: Vec<RecordBatch> = Vec::with_capacity(num_batches);
    for batch_idx in 0..num_batches {
        let first_row = batch_idx * batch_size;
        let rows_in_batch = batch_size.min(num_rows - first_row);
        batches.push(create_batch(
            &schema,
            first_row as i64,
            rows_in_batch,
            vector_dim,
        ));
    }

    // The same index sizing is used for all three stores.
    let num_partitions = (num_rows / 100).clamp(4, 256);
    let num_sub_vectors = (vector_dim / 8).clamp(4, 32);

    println!(
        "Creating Lance dataset with IVF-PQ index (single fragment, partitions={}, sub_vectors={})...",
        num_partitions, num_sub_vectors
    );
    let lance_vec_setup = rt.block_on(setup_lance_with_vector_index(
        batches.clone(),
        num_partitions,
        num_sub_vectors,
    ));
    println!(
        "Lance IVF-PQ (single fragment): {} fragments",
        lance_vec_setup.dataset.get_fragments().len()
    );

    println!(
        "Creating Lance dataset with IVF-PQ index (per-batch fragments, partitions={}, sub_vectors={})...",
        num_partitions, num_sub_vectors
    );
    let lance_vec_per_batch_setup = rt.block_on(setup_lance_per_batch_with_vector_index(
        batches.clone(),
        batch_size,
        num_partitions,
        num_sub_vectors,
    ));
    println!(
        "Lance IVF-PQ (per-batch): {} fragments",
        lance_vec_per_batch_setup.dataset.get_fragments().len()
    );

    println!(
        "Creating MemTable with IVF-PQ index (partitions={}, sub_vectors={})...",
        num_partitions, num_sub_vectors
    );
    let memtable = rt.block_on(setup_memtable(
        batches,
        vector_dim,
        num_partitions,
        num_sub_vectors,
    ));
    println!("MemTable IVF-PQ index created.");

    let query = create_query_vector(vector_dim);

    let mut group = c.benchmark_group("VectorSearch");
    group.throughput(Throughput::Elements(1));
    group.sample_size(sample_size);
    let label = format!("{}_rows_k{}", num_rows, k);

    // --- MemTable ---
    group.bench_with_input(
        BenchmarkId::new("MemTable_IVFPQ", &label),
        &query,
        |bencher, q| {
            // The MemTable scanner takes the query as a type-erased Arrow array.
            let query_array: Arc<dyn arrow_array::Array> = Arc::new(Float32Array::from(q.clone()));
            bencher.to_async(&rt).iter(|| {
                let query_array = query_array.clone();
                async {
                    let mut scanner = memtable.scan();
                    let neighbours: Vec<RecordBatch> = scanner
                        .nearest("vector", query_array, k)
                        .nprobes(8)
                        .try_into_stream()
                        .await
                        .unwrap()
                        .try_collect()
                        .await
                        .unwrap();
                    let row_count: usize = neighbours.iter().map(|batch| batch.num_rows()).sum();
                    assert!(row_count > 0);
                }
            });
        },
    );

    // --- Lance, single fragment ---
    group.bench_with_input(
        BenchmarkId::new("Lance_SingleFragment_IVFPQ", &label),
        &query,
        |bencher, q| {
            let dataset = lance_vec_setup.dataset.clone();
            let query_array = Float32Array::from(q.clone());
            bencher.to_async(&rt).iter(|| {
                let dataset = dataset.clone();
                let query_array = query_array.clone();
                async move {
                    let neighbours: Vec<RecordBatch> = dataset
                        .scan()
                        .nearest("vector", &query_array, k)
                        .unwrap()
                        .nprobes(8)
                        .try_into_stream()
                        .await
                        .unwrap()
                        .try_collect()
                        .await
                        .unwrap();
                    let row_count: usize = neighbours.iter().map(|batch| batch.num_rows()).sum();
                    assert!(row_count > 0);
                }
            });
        },
    );

    // --- Lance, one fragment per batch ---
    group.bench_with_input(
        BenchmarkId::new("Lance_PerBatchFragment_IVFPQ", &label),
        &query,
        |bencher, q| {
            let dataset = lance_vec_per_batch_setup.dataset.clone();
            let query_array = Float32Array::from(q.clone());
            bencher.to_async(&rt).iter(|| {
                let dataset = dataset.clone();
                let query_array = query_array.clone();
                async move {
                    let neighbours: Vec<RecordBatch> = dataset
                        .scan()
                        .nearest("vector", &query_array, k)
                        .unwrap()
                        .nprobes(8)
                        .try_into_stream()
                        .await
                        .unwrap()
                        .try_collect()
                        .await
                        .unwrap();
                    let row_count: usize = neighbours.iter().map(|batch| batch.num_rows()).sum();
                    assert!(row_count > 0);
                }
            });
        },
    );

    group.finish();
}
fn all_benchmarks(c: &mut Criterion) {
bench_scan(c);
bench_point_lookup(c);
bench_fts(c);
bench_vector_search(c);
}
// Criterion entry points. Exactly one of the two `criterion_group!` invocations is compiled,
// selected by target OS; both register `all_benchmarks` under the group name `benches` with a
// significance level of 0.05.
//
// Linux: additionally attaches the pprof profiler with flamegraph output.
// NOTE(review): the `100` passed to `PProfProfiler::new` is presumably the sampling
// frequency — confirm against the pprof documentation.
#[cfg(target_os = "linux")]
criterion_group!(
name = benches;
config = Criterion::default()
.significance_level(0.05)
.with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = all_benchmarks
);
// Other platforms: same configuration without the profiler.
#[cfg(not(target_os = "linux"))]
criterion_group!(
name = benches;
config = Criterion::default().significance_level(0.05);
targets = all_benchmarks
);
// Generates the benchmark binary's `main`, which runs the `benches` group.
criterion_main!(benches);