use crate::{
config::{SyncMethod, SyncMode},
error::Result,
report::Stats,
};
use commonware_runtime::{Blob, IoBufMut, IoBufs};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::time::Instant;
const DEADLINE_CHECK_STRIDE: u64 = 8;
const LATENCY_SAMPLE_STRIDE: u64 = 16;
#[inline]
pub fn sequential_blocks(start: u64, stride: u64, total_blocks: u64) -> impl FnMut() -> u64 {
let mut block = start;
move || {
let cur = block;
block = (block + stride) % total_blocks;
cur
}
}
#[inline]
pub fn random_blocks(seed: u64, total_blocks: u64) -> impl FnMut() -> u64 {
let mut rng = SmallRng::seed_from_u64(seed);
move || rng.gen_range(0..total_blocks)
}
#[inline]
pub async fn warm_read_loop(
blob: impl Blob,
io_size: usize,
ops: u64,
mut next_block: impl FnMut() -> u64,
) -> Result<()> {
let mut buffer = IoBufMut::with_capacity(io_size).into();
for _ in 0..ops {
let offset = next_block() * io_size as u64;
buffer = blob.read_at_buf(offset, io_size, buffer).await?;
}
Ok(())
}
#[inline]
pub async fn run_read_loop(
blob: impl Blob,
deadline: Instant,
io_size: usize,
mut next_block: impl FnMut() -> u64,
) -> Result<Stats> {
let mut stats = Stats::default();
let mut buffer = IoBufMut::with_capacity(io_size).into();
while should_continue(deadline, stats.ops) {
let offset = next_block() * io_size as u64;
let started = should_sample_latency(stats.ops).then(Instant::now);
buffer = blob.read_at_buf(offset, io_size, buffer).await?;
stats.record(io_size as u64, started.map(|s| s.elapsed()));
}
Ok(stats)
}
#[inline]
pub async fn run_write_loop(
blob: impl Blob,
deadline: Instant,
io_size: usize,
payload: IoBufs,
sync_mode: SyncMode,
mut next_block: impl FnMut() -> u64,
mut after_write: impl FnMut(u64),
) -> Result<Stats> {
let mut stats = Stats::default();
let mut writes_since_sync = 0u64;
let io_size = io_size as u64;
while should_continue(deadline, stats.ops) {
let offset = next_block() * io_size;
let started = should_sample_latency(stats.ops).then(Instant::now);
blob.write_at(offset, payload.clone()).await?;
stats.record(io_size, started.map(|s| s.elapsed()));
after_write(offset + io_size);
writes_since_sync += 1;
if let SyncMode::Every(every) = sync_mode {
if writes_since_sync == every {
blob.sync().await?;
writes_since_sync = 0;
}
}
}
if matches!(sync_mode, SyncMode::Every(_)) && writes_since_sync != 0 {
blob.sync().await?;
}
Ok(stats)
}
#[inline]
pub async fn run_sync_write_loop(
blob: impl Blob,
deadline: Instant,
io_size: usize,
payload: IoBufs,
sync_method: SyncMethod,
mut next_block: impl FnMut() -> u64,
) -> Result<Stats> {
let mut stats = Stats::default();
let io_size = io_size as u64;
while should_continue(deadline, stats.ops) {
let offset = next_block() * io_size;
let started = should_sample_latency(stats.ops).then(Instant::now);
match sync_method {
SyncMethod::WriteThenSync => {
blob.write_at(offset, payload.clone()).await?;
blob.sync().await?;
}
SyncMethod::WriteAtSync => {
blob.write_at_sync(offset, payload.clone()).await?;
}
}
stats.record(io_size, started.map(|s| s.elapsed()));
}
Ok(stats)
}
#[inline(always)]
fn should_continue(deadline: Instant, completed_ops: u64) -> bool {
if completed_ops.is_multiple_of(DEADLINE_CHECK_STRIDE) {
Instant::now() < deadline
} else {
true
}
}
#[inline(always)]
const fn should_sample_latency(completed_ops: u64) -> bool {
completed_ops.is_multiple_of(LATENCY_SAMPLE_STRIDE)
}