use std::sync::Arc;

use arrow_array::{FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema};
use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main};
use futures::{StreamExt, TryStreamExt};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::DatasetIndexExt;
use lance::index::vector::VectorIndexParams;
use lance_arrow::FixedSizeListArrayExt;
use lance_file::version::LanceFileVersion;
use lance_index::{
    IndexType,
    vector::{ivf::IvfBuildParams, pq::PQBuildParams},
};
use lance_linalg::distance::MetricType;
use lance_testing::datagen::generate_random_array;
use log::info;
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};
use rand::Rng;
use tokio::runtime::Runtime;

// Dataset shape.
const NUM_ROWS: usize = 1_000_000;
const DIM: usize = 768;

// Query workload.
const NUM_QUERIES: usize = 100;
const K: usize = 50;
const NPROBES: usize = 20;
const REFINE_FACTOR: u32 = 10;

// IVF_PQ build parameters.
const IVF_PARTITIONS: usize = 256;
const PQ_BITS: usize = 8;
const PQ_SUB_VECTORS: usize = DIM / 16;
const MAX_ITERATIONS: usize = 50;
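
/// A dataset handle plus pre-generated query vectors, built once and reused
/// across benchmark iterations.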
struct CachedDataset {
    dataset: Arc<Dataset>,
    query_vectors: Vec<Arc<Float32Array>>,
}
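
/// On-disk location of the benchmark dataset, keyed by row count, dimension,
/// and file version so incompatible datasets are never reused.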
fn dataset_path(version: LanceFileVersion) -> String {
    format!(
        "/tmp/lance_bench_throughput_{}_{}_{}",
        NUM_ROWS, DIM, version
    )
}
fn get_or_create_dataset(rt: &Runtime, version: LanceFileVersion) -> Arc<CachedDataset> {
    let uri = format!("file://{}", dataset_path(version));
    rt.block_on(async {
        let mut needs_creation = true;
        let mut needs_indexing = true;
        if let Ok(dataset) = Dataset::open(&uri).await {
            let row_count = dataset.count_rows(None).await.unwrap();
            if row_count == NUM_ROWS {
                info!("Reusing existing dataset at {} ({} rows)", uri, row_count);
                needs_creation = false;
                let indices = dataset.load_indices().await.unwrap();
                if !indices.is_empty() {
                    info!(
                        "Dataset already has {} index(es), skipping index creation",
                        indices.len()
                    );
                    needs_indexing = false;
                } else {
                    info!("Dataset exists but has no index, will create index");
                }
            } else {
                info!(
                    "Dataset exists but has wrong row count ({} vs {}), recreating",
                    row_count, NUM_ROWS
                );
                // Remove the filesystem path, not the `file://` URI.
                std::fs::remove_dir_all(dataset_path(version)).ok();
            }
        } else {
            info!(
                "Creating new dataset with {} rows, {} dimensions",
                NUM_ROWS, DIM
            );
        }
        if needs_creation {
            create_dataset(&uri, version).await;
        }
        let mut dataset = Dataset::open(&uri).await.unwrap();
        if needs_indexing {
            create_ivf_pq_index(&mut dataset).await;
        }
        let query_vectors = generate_query_vectors();
        Arc::new(CachedDataset {
            dataset: Arc::new(dataset),
            query_vectors,
        })
    })
}
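
/// Write NUM_ROWS random vectors to `uri` as a single data file, using the
/// Lance file version under test.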
async fn create_dataset(uri: &str, version: LanceFileVersion) {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "vector",
        DataType::FixedSizeList(
            FieldRef::new(Field::new("item", DataType::Float32, true)),
            DIM as i32,
        ),
        false,
    )]));
    let batch_size = 10_000;
    let batches: Vec<RecordBatch> = (0..(NUM_ROWS / batch_size))
        .map(|_| {
            RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(
                    FixedSizeListArray::try_new_from_values(
                        generate_random_array(batch_size * DIM),
                        DIM as i32,
                    )
                    .unwrap(),
                )],
            )
            .unwrap()
        })
        .collect();
    let write_params = WriteParams {
        max_rows_per_file: NUM_ROWS,
        max_rows_per_group: batch_size,
        mode: WriteMode::Create,
        // Write with the file version under test; otherwise every version
        // would silently benchmark the same (default) format.
        data_storage_version: Some(version),
        ..Default::default()
    };
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    Dataset::write(reader, uri, Some(write_params))
        .await
        .unwrap();
    info!("Dataset created at {}", uri);
}
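
/// Build an IVF_PQ vector index on the `vector` column, replacing any
/// existing index of the same name.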
async fn create_ivf_pq_index(dataset: &mut Dataset) {
    info!("Creating IVF_PQ index...");
    let ivf_params = IvfBuildParams {
        num_partitions: Some(IVF_PARTITIONS),
        max_iters: MAX_ITERATIONS,
        ..Default::default()
    };
    let pq_params = PQBuildParams {
        num_bits: PQ_BITS,
        num_sub_vectors: PQ_SUB_VECTORS,
        ..Default::default()
    };
    let params = VectorIndexParams::with_ivf_pq_params(MetricType::L2, ivf_params, pq_params);
    dataset
        .create_index(
            &["vector"],
            IndexType::Vector,
            Some("ivf_pq_index".to_string()),
            &params,
            true,
        )
        .await
        .unwrap();
    info!("IVF_PQ index created");
}
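
/// Generate NUM_QUERIES random query vectors with values drawn uniformly
/// from [0, 1).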
fn generate_query_vectors() -> Vec<Arc<Float32Array>> {
    let mut rng = rand::rng();
    (0..NUM_QUERIES)
        .map(|_| {
            let values: Vec<f32> = (0..DIM).map(|_| rng.random_range(0.0..1.0)).collect();
            Arc::new(Float32Array::from(values))
        })
        .collect()
}
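
/// Evict the dataset's data files from the OS page cache via
/// `posix_fadvise(POSIX_FADV_DONTNEED)` so "nocache" runs measure disk reads.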
#[cfg(target_os = "linux")]
fn drop_dataset_from_cache(dataset_dir: &str) -> std::io::Result<()> {
    use std::fs;
    use std::os::unix::io::AsRawFd;

    let mut num_dropped = 0;
    let entries = fs::read_dir(format!("{}/data", dataset_dir))?;
    for entry in entries.flatten() {
        let path = entry.path();
        if path.is_file()
            && let Ok(file) = fs::File::open(&path)
        {
            let fd = file.as_raw_fd();
            let result = unsafe { libc::posix_fadvise(fd, 0, 0, libc::POSIX_FADV_DONTNEED) };
            if result != 0 {
                panic!(
                    "Failed to drop {:?} from cache: {}",
                    path,
                    std::io::Error::from_raw_os_error(result)
                );
            }
            num_dropped += 1;
        }
    }
    if num_dropped == 0 {
        panic!("No files dropped from cache");
    }
    Ok(())
}
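
/// No-op on other platforms; the "nocache" benchmark variants are skipped
/// there anyway.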
#[cfg(not(target_os = "linux"))]
fn drop_dataset_from_cache(_path: &str) -> std::io::Result<()> {
    Ok(())
}
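
/// Run all query vectors against the dataset, with at most
/// `concurrent_queries` searches in flight at a time.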
async fn run_queries(
    dataset: Arc<Dataset>,
    query_vectors: &[Arc<Float32Array>],
    concurrent_queries: usize,
) {
    futures::stream::iter(query_vectors)
        .map(|q| {
            let dataset = dataset.clone();
            let q = q.clone();
            // Spawn each query as its own task so queries can run on
            // separate runtime worker threads.
            tokio::spawn(async move {
                dataset
                    .scan()
                    .nearest("vector", q.as_ref(), K)
                    .unwrap()
                    // Pin the probe count by setting min == max.
                    .minimum_nprobes(NPROBES)
                    .maximum_nprobes(NPROBES)
                    .refine(REFINE_FACTOR)
                    .project(&["vector", "_distance"])
                    .unwrap()
                    .try_into_stream()
                    .await
                    .unwrap()
                    .try_collect::<Vec<_>>()
                    .await
                    .unwrap()
            })
        })
        // Limit how many queries are in flight at once.
        .buffered(concurrent_queries)
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
}
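
/// Benchmark IVF_PQ search throughput across file versions, concurrency
/// levels, and page-cache states (cached vs. evicted).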
fn bench_ivf_pq_throughput(c: &mut Criterion) {
    env_logger::init();
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut group = c.benchmark_group("ivf_pq_throughput");
    group.throughput(Throughput::Elements(NUM_QUERIES as u64));
    for &version in &[
        LanceFileVersion::V2_0,
        LanceFileVersion::V2_1,
        LanceFileVersion::V2_2,
    ] {
        let cached_dataset = get_or_create_dataset(&rt, version);
        for &concurrent_queries in &[1, 16] {
            for &cached in &[true, false] {
                // Page-cache eviction is only implemented on Linux.
                #[cfg(not(target_os = "linux"))]
                if !cached {
                    continue;
                }
                let cache_label = if cached { "cached" } else { "nocache" };
                // Warm up the index and in-process caches before timing.
                rt.block_on(run_queries(
                    cached_dataset.dataset.clone(),
                    &cached_dataset.query_vectors,
                    concurrent_queries,
                ));
                group.bench_function(
                    format!("{}_{}threads_{}", version, concurrent_queries, cache_label),
                    |b| {
                        b.iter_batched(
                            || {
                                // For "nocache" runs, evict data files from
                                // the page cache before every iteration.
                                if !cached {
                                    drop_dataset_from_cache(&dataset_path(version)).ok();
                                }
                            },
                            |_| {
                                rt.block_on(run_queries(
                                    cached_dataset.dataset.clone(),
                                    &cached_dataset.query_vectors,
                                    concurrent_queries,
                                ));
                            },
                            BatchSize::PerIteration,
                        );
                    },
                );
            }
        }
    }
    group.finish();
}

#[cfg(target_os = "linux")]
criterion_group!(
    name = benches;
    config = Criterion::default()
        .significance_level(0.1)
        .sample_size(10)
        .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = bench_ivf_pq_throughput
);

#[cfg(not(target_os = "linux"))]
criterion_group!(
    name = benches;
    config = Criterion::default().significance_level(0.1).sample_size(10);
    targets = bench_ivf_pq_throughput
);

criterion_main!(benches);