use crate::{
config::{CacheMode, Config, SyncMode, Workload},
error::Result,
filesystem::{drop_page_cache, prepare_blob, prepare_filled_blob, random_write_payload},
report::Report,
runner::{
random_blocks, run_read_loop, run_sync_write_loop, run_write_loop, sequential_blocks,
warm_read_loop,
},
};
use commonware_runtime::{tokio::Context, Blob as _, Storage as _};
use futures::{stream::FuturesUnordered, TryStreamExt};
use rand::{
rngs::{SmallRng, StdRng},
Rng, SeedableRng,
};
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Instant,
};
const PARTITION: &str = "storage-bench";
const BLOB_NAME: &[u8] = b"blob";
type RuntimeBlob = <Context as commonware_runtime::Storage>::Blob;
pub async fn run_benchmark(cfg: &Config, context: Context) -> Result<Report> {
let result = match cfg.workload {
Workload::ReadSeq | Workload::ReadRand => run_read(cfg, &context).await,
Workload::WriteSeq | Workload::WriteRand => run_overwrite(cfg, &context).await,
Workload::WriteAppend => run_write_append(cfg, &context).await,
Workload::WriteSync => run_write_sync(cfg, &context).await,
Workload::ReadWriteAppend => run_read_write_append(cfg, &context).await,
};
let _ = context.remove(PARTITION, None).await;
result
}
async fn run_read(cfg: &Config, context: &Context) -> Result<Report> {
let sequential = cfg.workload == Workload::ReadSeq;
let file_size = cfg.file_size();
let total_blocks = file_size / cfg.io_size as u64;
let inflight = cfg.inflight as u64;
let mut rng = StdRng::seed_from_u64(cfg.seed);
let blob = prepare_filled_blob(
&mut rng, context, &cfg.root, PARTITION, BLOB_NAME, file_size,
)
.await?;
prepare_cache(cfg, &blob, total_blocks).await?;
let start = Instant::now();
let deadline = start + cfg.duration();
let workers = (0..cfg.inflight)
.map(|worker| {
let blob = blob.clone();
async move {
if sequential {
run_read_loop(
blob,
deadline,
cfg.io_size,
sequential_blocks(worker as u64 % total_blocks, inflight, total_blocks),
)
.await
} else {
run_read_loop(
blob,
deadline,
cfg.io_size,
random_blocks(worker_seed(cfg.seed, worker), total_blocks),
)
.await
}
}
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
Ok(Report::new(start.elapsed(), Some(workers), None, file_size))
}
async fn run_overwrite(cfg: &Config, context: &Context) -> Result<Report> {
let file_size = cfg.file_size();
let total_blocks = file_size / cfg.io_size as u64;
let inflight = cfg.inflight as u64;
let sequential = cfg.workload == Workload::WriteSeq;
let blob = prepare_blob(context, &cfg.root, PARTITION, BLOB_NAME, file_size).await?;
let mut rng = StdRng::seed_from_u64(cfg.seed);
let payload = random_write_payload(&mut rng, cfg.io_size, cfg.write_shape);
let start = Instant::now();
let deadline = start + cfg.duration();
let workers = (0..cfg.inflight)
.map(|worker| {
let blob = blob.clone();
let payload = payload.clone();
async move {
if sequential {
run_write_loop(
blob,
deadline,
cfg.io_size,
payload,
cfg.sync_mode,
sequential_blocks(worker as u64 % total_blocks, inflight, total_blocks),
|_| {},
)
.await
} else {
run_write_loop(
blob,
deadline,
cfg.io_size,
payload,
cfg.sync_mode,
random_blocks(worker_seed(cfg.seed, worker), total_blocks),
|_| {},
)
.await
}
}
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
if cfg.sync_mode == SyncMode::End {
blob.sync().await?;
}
Ok(Report::new(start.elapsed(), None, Some(workers), file_size))
}
async fn run_write_append(cfg: &Config, context: &Context) -> Result<Report> {
let blob = prepare_blob(context, &cfg.root, PARTITION, BLOB_NAME, 0).await?;
let mut rng = StdRng::seed_from_u64(cfg.seed);
let payload = random_write_payload(&mut rng, cfg.io_size, cfg.write_shape);
let start = Instant::now();
let deadline = start + cfg.duration();
let stats = run_write_loop(
blob.clone(),
deadline,
cfg.io_size,
payload,
cfg.sync_mode,
sequential_blocks(0, 1, u64::MAX),
|_| {},
)
.await?;
if cfg.sync_mode == SyncMode::End {
blob.sync().await?;
}
let final_file_size = stats.bytes;
Ok(Report::new(
start.elapsed(),
None,
Some(vec![stats]),
final_file_size,
))
}
async fn run_write_sync(cfg: &Config, context: &Context) -> Result<Report> {
let file_size = cfg.file_size();
let total_blocks = file_size / cfg.io_size as u64;
let inflight = cfg.inflight as u64;
let blob = prepare_blob(context, &cfg.root, PARTITION, BLOB_NAME, file_size).await?;
let mut rng = StdRng::seed_from_u64(cfg.seed);
let payload = random_write_payload(&mut rng, cfg.io_size, cfg.write_shape);
let start = Instant::now();
let deadline = start + cfg.duration();
let workers = (0..cfg.inflight)
.map(|worker| {
let blob = blob.clone();
let payload = payload.clone();
async move {
run_sync_write_loop(
blob,
deadline,
cfg.io_size,
payload,
cfg.sync_method,
sequential_blocks(worker as u64 % total_blocks, inflight, total_blocks),
)
.await
}
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
Ok(Report::new(start.elapsed(), None, Some(workers), file_size))
}
async fn run_read_write_append(cfg: &Config, context: &Context) -> Result<Report> {
let initial_size = cfg.file_size();
let total_blocks = initial_size / cfg.io_size as u64;
let io_size = cfg.io_size as u64;
let mut rng = StdRng::seed_from_u64(cfg.seed);
let blob = prepare_filled_blob(
&mut rng,
context,
&cfg.root,
PARTITION,
BLOB_NAME,
initial_size,
)
.await?;
prepare_cache(cfg, &blob, total_blocks).await?;
let payload = random_write_payload(&mut rng, cfg.io_size, cfg.write_shape);
let current_len = Arc::new(AtomicU64::new(initial_size));
let start = Instant::now();
let deadline = start + cfg.duration();
let writer = {
let blob = blob.clone();
let current_len = current_len.clone();
async move {
run_write_loop(
blob,
deadline,
cfg.io_size,
payload,
cfg.sync_mode,
sequential_blocks(total_blocks, 1, u64::MAX),
|end_offset| current_len.store(end_offset, Ordering::Relaxed),
)
.await
}
};
let readers = (0..cfg.inflight)
.map(|worker| {
let blob = blob.clone();
let current_len = current_len.clone();
let mut rng = SmallRng::seed_from_u64(worker_seed(cfg.seed, worker));
async move {
let random_block = || {
let total_blocks = current_len.load(Ordering::Relaxed) / io_size;
rng.gen_range(0..total_blocks)
};
run_read_loop(blob, deadline, cfg.io_size, random_block).await
}
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>();
let (write_stats, read_workers) = futures::try_join!(writer, readers)?;
let final_file_size = initial_size + write_stats.bytes;
if cfg.sync_mode == SyncMode::End {
blob.sync().await?;
}
Ok(Report::new(
start.elapsed(),
Some(read_workers),
Some(vec![write_stats]),
final_file_size,
))
}
async fn prepare_cache(cfg: &Config, blob: &RuntimeBlob, total_blocks: u64) -> Result<()> {
let cache = cfg.cache.expect("validated");
if cache == CacheMode::Cold {
drop_page_cache(&cfg.root, PARTITION, BLOB_NAME)?;
return Ok(());
}
let inflight = cfg.inflight as u64;
let sequential = cfg.workload == Workload::ReadSeq;
(0..cfg.inflight)
.map(|worker| {
let blob = blob.clone();
async move {
if sequential {
let warm_ops = total_blocks.div_ceil(inflight);
warm_read_loop(
blob,
cfg.io_size,
warm_ops,
sequential_blocks(worker as u64 % total_blocks, inflight, total_blocks),
)
.await
} else {
let warm_ops = total_blocks.saturating_mul(3).div_ceil(inflight).max(1);
warm_read_loop(
blob,
cfg.io_size,
warm_ops,
random_blocks(worker_seed(cfg.seed, worker), total_blocks),
)
.await
}
}
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
Ok(())
}
#[inline]
const fn worker_seed(seed: u64, worker: usize) -> u64 {
seed.wrapping_add(worker as u64)
}