use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use alloc_counter::{count_alloc_future, AllocCounterSystem};
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use ethereum_types::H256;
use keyvaluedb::{DBKeyValue, KeyValueDB};
use keyvaluedb_sqlite::{Database, DatabaseConfig};
use once_cell::sync::Lazy;
use rand::{distributions::Uniform, seq::SliceRandom, Rng, RngCore, SeedableRng};
use rand_chacha::ChaCha20Rng;
static NEEDLES: Lazy<usize> = Lazy::new(|| {
    // Number of keys remembered for lookup during the benchmark.
    // Override with the `NEEDLES` env var; defaults to 10_000.
    match std::env::var("NEEDLES") {
        Ok(raw) => raw.parse::<usize>().expect("invalid NEEDLES env var"),
        Err(_) => 10_000,
    }
});
static NEEDLES_TO_HAYSTACK_RATIO: Lazy<usize> = Lazy::new(|| {
    // One key in every RATIO inserted keys becomes a needle; the database
    // therefore holds NEEDLES * RATIO keys in total.
    // Override with the `NEEDLES_TO_HAYSTACK_RATIO` env var; defaults to 100.
    match std::env::var("NEEDLES_TO_HAYSTACK_RATIO") {
        Ok(raw) => raw
            .parse::<usize>()
            .expect("invalid NEEDLES_TO_HAYSTACK_RATIO env var"),
        Err(_) => 100,
    }
});
static TEMPDIR: Lazy<String> = Lazy::new(|| {
    // Location of the benchmark database file; override with `TEMPDIR`.
    // `unwrap_or_else` avoids allocating the default String when the env var
    // is set (the previous `unwrap_or(... .into())` built it eagerly).
    std::env::var("TEMPDIR").unwrap_or_else(|_| "./benches/_sqlite_bench_get".into())
});
static SEED: Lazy<u64> = Lazy::new(|| {
    // Seed for the deterministic ChaCha20 RNG so runs are reproducible.
    // Override with the `SEED` env var.
    if let Ok(raw) = std::env::var("SEED") {
        raw.parse::<u64>().expect("invalid SEED env var")
    } else {
        12345678901234567890
    }
});
// Install the counting allocator globally so `count_alloc_future` (used in the
// benchmark bodies below) can observe every heap allocation the futures make.
#[global_allocator]
static A: AllocCounterSystem = AllocCounterSystem;
// Criterion entry points: register the `get` and `iter` benchmark functions.
criterion_group!(benches, get, iter);
criterion_main!(benches);
/// Open (or create) the benchmark database at `TEMPDIR`, configured with a
/// single column family.
fn open_db() -> Database {
    Database::open(&*TEMPDIR, DatabaseConfig::new().with_columns(1)).expect("sqlite works")
}
/// Produce roughly `n` random bytes: the actual length drifts from `n` by a
/// random amount of up to `n / 5` (±20%), so stored values vary in size.
///
/// Fix: the original called `rng.gen_range(0..0)` for `n < 5`, which panics on
/// an empty range; such inputs now yield exactly `n` bytes. Behavior for
/// `n >= 5` is unchanged (same RNG draws in the same order).
fn n_random_bytes<R: RngCore>(rng: &mut R, n: usize) -> Vec<u8> {
    // Magnitude of the length drift; zero when `n / 5 == 0` to avoid the
    // empty-range panic.
    let variability: i64 = if n >= 5 {
        rng.gen_range(0..(n / 5) as i64)
    } else {
        0
    };
    // Direction of the drift is derived from the parity of the drift itself.
    let plus_or_minus: i64 = if variability % 2 == 0 { 1 } else { -1 };
    // NOTE(review): `0..u8::MAX` is half-open, so the byte 255 is never
    // generated — harmless for benchmark payloads, but confirm it's intended.
    let range = Uniform::from(0..u8::MAX);
    rng.sample_iter(&range)
        .take((n as i64 + plus_or_minus * variability) as usize)
        .collect()
}
/// Fill `db` with `NEEDLES * NEEDLES_TO_HAYSTACK_RATIO` random keys (140-byte
/// ±20% values) in a single write batch, returning every RATIO-th key as a
/// "needle" for the lookup benchmarks.
///
/// Fix: the 100k-keys progress line was nested inside the needle branch, so it
/// only printed when the ratio happened to divide 100_000; it is now checked
/// for every key.
async fn populate<R: RngCore>(rng: &mut R, db: &Database) -> io::Result<Vec<H256>> {
    let total = (*NEEDLES) * (*NEEDLES_TO_HAYSTACK_RATIO);
    let mut needles = Vec::with_capacity(*NEEDLES);
    let mut batch = db.transaction();
    for i in 0..total {
        let key = H256::random_using(rng);
        // Every RATIO-th key is remembered as a lookup target.
        if i % (*NEEDLES_TO_HAYSTACK_RATIO) == 0 {
            needles.push(key);
        }
        // Progress report every 100k inserted keys.
        if i % 100_000 == 0 && i > 0 {
            println!("[populate] {} keys", i);
        }
        batch.put(0, key.as_bytes(), n_random_bytes(rng, 140));
    }
    // Commit the whole batch at once; unwrap the transaction error payload.
    db.write(batch).await.map_err(|e| e.error)?;
    let db_size = db.path().metadata()?.len();
    println!(
        "[populate] {} keys total, file size {} MiB",
        total,
        db_size >> 20
    );
    Ok(needles)
}
/// Benchmark point lookups against a freshly populated database: `get` by
/// exact key, and `first_with_prefix` by an 8-byte key prefix. Also reports
/// total heap allocations per benchmark iteration (counted by the global
/// `AllocCounterSystem`).
fn get(c: &mut Criterion) {
    // Print the allocation totals gathered across all criterion iterations.
    // Extracted: this epilogue was previously duplicated verbatim per section.
    fn report(label: &str, iterations: &AtomicU64, allocs: &AtomicU64) {
        let iterations = iterations.load(Ordering::Relaxed);
        let allocs = allocs.load(Ordering::Relaxed);
        if iterations > 0 {
            println!(
                "[{}] total: iterations={}, allocations={}; allocations per iter={:.2}\n",
                label,
                iterations,
                allocs,
                allocs as f64 / iterations as f64
            );
        }
    }

    let rng = ChaCha20Rng::seed_from_u64(*SEED);
    let db = open_db();
    let rt = tokio::runtime::Runtime::new().unwrap();
    let needles = rt.block_on(async {
        let mut rng = rng.clone();
        populate(&mut rng, &db).await.expect("sqlite works")
    });
    {
        let total_iterations = Arc::new(AtomicU64::new(0));
        let total_allocs = Arc::new(AtomicU64::new(0));
        c.bench_function("get key", |b| {
            b.to_async(FuturesExecutor).iter_custom(|iterations| {
                // Re-clone the seeded RNG per measurement batch so the key
                // sequence is reproducible across runs.
                let mut rng = rng.clone();
                let needles = &needles;
                let db = &db;
                let total_iterations = total_iterations.clone();
                let total_allocs = total_allocs.clone();
                async move {
                    total_iterations.fetch_add(iterations, Ordering::Relaxed);
                    let mut elapsed = Duration::new(0, 0);
                    // Count allocations made by the timed section only.
                    let (alloc_stats, _) = count_alloc_future(async {
                        let start = Instant::now();
                        for _ in 0..iterations {
                            let needle = needles.choose(&mut rng).expect("needles is not empty");
                            black_box(db.get(0, needle.as_bytes()).await.unwrap());
                        }
                        elapsed = start.elapsed();
                    })
                    .await;
                    total_allocs.fetch_add(alloc_stats.0 as u64, Ordering::Relaxed);
                    elapsed
                }
            });
        });
        report("get key", &total_iterations, &total_allocs);
    }
    {
        let total_iterations = Arc::new(AtomicU64::new(0));
        let total_allocs = Arc::new(AtomicU64::new(0));
        c.bench_function("get key by prefix", |b| {
            b.to_async(FuturesExecutor).iter_custom(|iterations| {
                let mut rng = rng.clone();
                let needles = &needles;
                let db = &db;
                let total_iterations = total_iterations.clone();
                let total_allocs = total_allocs.clone();
                async move {
                    total_iterations.fetch_add(iterations, Ordering::Relaxed);
                    let mut elapsed = Duration::new(0, 0);
                    let (alloc_stats, _) = count_alloc_future(async {
                        let start = Instant::now();
                        for _ in 0..iterations {
                            let needle = needles.choose(&mut rng).expect("needles is not empty");
                            // Look up by the first 8 bytes of the key only.
                            black_box(
                                db.first_with_prefix(0, &needle.as_bytes()[..8])
                                    .await
                                    .unwrap(),
                            );
                        }
                        elapsed = start.elapsed();
                    })
                    .await;
                    total_allocs.fetch_add(alloc_stats.0 as u64, Ordering::Relaxed);
                    elapsed
                }
            });
        });
        report("get key by prefix", &total_iterations, &total_allocs);
    }
}
/// Benchmark iteration: folding the first 1000 key/value pairs into a Vec, and
/// fetching just the first entry via the iterator API. Also reports total heap
/// allocations per benchmark iteration.
fn iter(c: &mut Criterion) {
    // Print the allocation totals gathered across all criterion iterations.
    // Extracted: this epilogue was previously duplicated per section.
    fn report(label: &str, iterations: &AtomicU64, allocs: &AtomicU64) {
        let iterations = iterations.load(Ordering::Relaxed);
        let allocs = allocs.load(Ordering::Relaxed);
        if iterations > 0 {
            println!(
                "[{}] total: iterations={}, allocations={}; allocations per iter={:.2}\n",
                label,
                iterations,
                allocs,
                allocs as f64 / iterations as f64
            );
        }
    }

    let db = open_db();
    {
        let total_iterations = Arc::new(AtomicU64::new(0));
        let total_allocs = Arc::new(AtomicU64::new(0));
        c.bench_function("iterate over 1k keys", |b| {
            b.to_async(FuturesExecutor).iter_custom(|iterations| {
                let db = &db;
                let total_iterations = total_iterations.clone();
                let total_allocs = total_allocs.clone();
                async move {
                    total_iterations.fetch_add(iterations, Ordering::Relaxed);
                    let mut elapsed = Duration::new(0, 0);
                    let (alloc_stats, _) = count_alloc_future(async {
                        let start = Instant::now();
                        for _ in 0..iterations {
                            let contents: Vec<DBKeyValue> = Vec::new();
                            // Fold pairs into `contents`; returning Some(())
                            // stops the scan once 1000 entries are collected.
                            let (contents, _) = db.iter(0, None, contents, |contents, kv| {
                                contents.push((kv.0.clone(), kv.1.clone()));
                                Ok(if contents.len() != 1000 {
                                    None
                                } else {
                                    Some(())
                                })
                            })
                            .await
                            .unwrap();
                            black_box(contents);
                        }
                        elapsed = start.elapsed();
                    })
                    .await;
                    total_allocs.fetch_add(alloc_stats.0 as u64, Ordering::Relaxed);
                    elapsed
                }
            });
        });
        // Fix: this report previously lived INSIDE the `bench_function`
        // closure (unlike the other three benchmark sections); criterion may
        // invoke that closure more than once, printing partial totals
        // repeatedly. It now runs once, after the benchmark completes.
        report("iterate over 1k keys", &total_iterations, &total_allocs);
    }
    {
        let total_iterations = Arc::new(AtomicU64::new(0));
        let total_allocs = Arc::new(AtomicU64::new(0));
        c.bench_function("single key from iterator", |b| {
            b.to_async(FuturesExecutor).iter_custom(|iterations| {
                let db = &db;
                let total_iterations = total_iterations.clone();
                let total_allocs = total_allocs.clone();
                async move {
                    total_iterations.fetch_add(iterations, Ordering::Relaxed);
                    let mut elapsed = Duration::new(0, 0);
                    let (alloc_stats, _) = count_alloc_future(async {
                        let start = Instant::now();
                        for _ in 0..iterations {
                            // Returning Some(()) immediately stops the scan
                            // after the very first key/value pair.
                            let out = db.iter(0, None, (), |_, _| Ok(Some(()))).await;
                            let _ = black_box(out);
                        }
                        elapsed = start.elapsed();
                    })
                    .await;
                    total_allocs.fetch_add(alloc_stats.0 as u64, Ordering::Relaxed);
                    elapsed
                }
            });
        });
        report("single key from iterator", &total_iterations, &total_allocs);
    }
}