use crate::{append_fixed_random_data, get_fixed_journal};
use commonware_runtime::{
benchmarks::{context, tokio},
tokio::{Config, Context, Runner},
Runner as _,
};
use commonware_storage::journal::contiguous::{fixed::Journal, Reader as _};
use commonware_utils::{sequence::FixedBytes, NZU64};
use criterion::{criterion_group, Criterion};
use futures::future::try_join_all;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{
hint::black_box,
num::NonZeroU64,
time::{Duration, Instant},
};
/// Partition name handed to `get_fixed_journal` by the setup, measurement, and cleanup phases,
/// so all three refer to the same journal.
const PARTITION: &str = "test-partition";
/// Passed to `get_fixed_journal` as the journal's items-per-blob setting.
const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10_000);
/// Number of items appended during setup; also the exclusive upper bound of the random read
/// positions generated by the benchmark helpers.
const ITEMS_TO_WRITE: u64 = 5_000_000;
/// Const-generic size of each `FixedBytes` journal item (reported as `size=` in benchmark names).
const ITEM_SIZE: usize = 32;
/// Reads `items_to_read` items from `journal` one at a time, awaiting each read before issuing
/// the next.
///
/// Positions are drawn from `0..ITEMS_TO_WRITE` by a `StdRng` seeded with `0`, so every call
/// requests the same sequence of positions.
///
/// # Panics
///
/// Panics with "failed to read data" if any read returns an error.
async fn bench_run_serial(
    journal: &Journal<Context, FixedBytes<ITEM_SIZE>>,
    items_to_read: usize,
) {
    let reader = journal.reader().await;
    // Fixed seed keeps the access pattern identical across calls.
    let mut positions = StdRng::seed_from_u64(0);
    for _ in 0..items_to_read {
        let position = positions.gen_range(0..ITEMS_TO_WRITE);
        let item = reader
            .read(position)
            .await
            .expect("failed to read data");
        // Keep the optimizer from discarding the value that was read.
        black_box(item);
    }
}
async fn bench_run_concurrent(
journal: &Journal<Context, FixedBytes<ITEM_SIZE>>,
items_to_read: usize,
) {
let reader = journal.reader().await;
let mut rng = StdRng::seed_from_u64(0);
let mut futures = Vec::with_capacity(items_to_read);
for _ in 0..items_to_read {
let pos = rng.gen_range(0..ITEMS_TO_WRITE);
futures.push(reader.read(pos));
}
try_join_all(futures).await.expect("failed to read data");
}
fn bench_fixed_read_random(c: &mut Criterion) {
let cfg = Config::default();
let runner = Runner::new(cfg.clone());
runner.start(|ctx| async move {
let mut j = get_fixed_journal(ctx, PARTITION, ITEMS_PER_BLOB).await;
append_fixed_random_data::<_, ITEM_SIZE>(&mut j, ITEMS_TO_WRITE).await;
j.sync().await.unwrap();
});
let runner = tokio::Runner::new(cfg.clone());
for mode in ["serial", "concurrent"] {
for items_to_read in [100, 1_000, 10_000, 100_000] {
c.bench_function(
&format!(
"{}/mode={} items={} size={}",
module_path!(),
mode,
items_to_read,
ITEM_SIZE
),
|b| {
b.to_async(&runner).iter_custom(|iters| async move {
let ctx = context::get::<commonware_runtime::tokio::Context>();
let j = get_fixed_journal(ctx.clone(), PARTITION, ITEMS_PER_BLOB).await;
let mut duration = Duration::ZERO;
for _ in 0..iters {
let start = Instant::now();
match mode {
"serial" => bench_run_serial(&j, items_to_read).await,
"concurrent" => bench_run_concurrent(&j, items_to_read).await,
_ => unreachable!(),
}
duration += start.elapsed();
}
duration
});
},
);
}
}
let runner = Runner::new(cfg);
runner.start(|context| async move {
let j = get_fixed_journal::<ITEM_SIZE>(context, PARTITION, ITEMS_PER_BLOB).await;
j.destroy().await.unwrap();
});
}
// Defines the `benches` Criterion group with `bench_fixed_read_random` as its only target,
// using a Criterion configuration whose sample size is set to 10.
criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = bench_fixed_read_random
}