use mtop_client::{MemcachedClient, MtopError, Timeout};
use rand::Rng;
use rand_distr::Exp;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time;
use tokio::time::Instant;
/// Number of key-value pairs generated for each worker's fixture data.
const NUM_KEYS: usize = 1000;
/// Maximum number of keys requested in a single `get` call.
const GET_BATCH_SIZE: usize = 100;
/// A fraction in the inclusive range `0.0..=1.0`.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq)]
#[repr(transparent)]
pub struct Percent(f64);

impl Percent {
    /// Wrap `v` without returning a `Result`.
    ///
    /// # Panics
    ///
    /// Panics when `v` is below `0.0`, above `1.0`, or NaN (NaN fails the
    /// lower-bound comparison first).
    pub fn unchecked(v: f64) -> Self {
        let at_least_zero = v >= 0.0;
        assert!(at_least_zero, "percent must be >= 0.0");
        let at_most_one = v <= 1.0;
        assert!(at_most_one, "percent must be <= 1.0");
        Percent(v)
    }

    /// The wrapped fraction as a plain `f64`.
    pub fn as_f64(&self) -> f64 {
        let Percent(fraction) = *self;
        fraction
    }
}
/// Parse a `Percent` from the textual form of a float in `0.0..=1.0`.
impl FromStr for Percent {
    type Err = MtopError;

    /// Returns a configuration error when `s` is not a valid float or when
    /// the parsed value lies outside `0.0..=1.0` (NaN included, since it is
    /// contained in no range).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed: f64 = s
            .parse()
            .map_err(|cause| MtopError::configuration_cause(format!("invalid percent {}", s), cause))?;

        let allowed = 0.0..=1.0;
        if allowed.contains(&parsed) {
            Ok(Self(parsed))
        } else {
            Err(MtopError::configuration(format!("invalid percent {}", parsed)))
        }
    }
}
impl fmt::Display for Percent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug)]
pub struct Bencher {
client: Arc<MemcachedClient>,
handle: Handle,
delay: Duration,
timeout: Duration,
ttl: Duration,
write_percent: Percent,
concurrency: usize,
stop: Arc<AtomicBool>,
}
impl Bencher {
#[allow(clippy::too_many_arguments)]
pub fn new(
client: MemcachedClient,
handle: Handle,
delay: Duration,
timeout: Duration,
ttl: Duration,
write_percent: Percent,
concurrency: usize,
stop: Arc<AtomicBool>,
) -> Self {
Self {
client: Arc::new(client),
handle,
delay,
timeout,
ttl,
write_percent,
concurrency,
stop,
}
}
pub async fn run(&self, time: Duration) -> Vec<Summary> {
let mut tasks = Vec::with_capacity(self.concurrency);
for worker in 0..self.concurrency {
let mut interval = time::interval(self.delay);
let stop = self.stop.clone();
let client = self.client.clone();
let write_percent = self.write_percent;
let ttl = self.ttl.as_secs() as u32;
let timeout = self.timeout;
let start = Instant::now();
tasks.push((worker, self.handle.spawn(async move {
let fixture = FixtureData::new(worker, NUM_KEYS);
let mut stats = Summary { worker, ..Default::default() };
while !stop.load(Ordering::Acquire) && start.elapsed() < time {
let _ = interval.tick().await;
let set_start = Instant::now();
for kv in fixture.kvs.iter() {
if rand::rng().random_bool(write_percent.as_f64()) {
if let Err(e) = client.set(kv.key(), 0, ttl, kv.payload()).timeout(timeout, "client.set").await {
tracing::debug!(message = "unable to set item", key = kv.key(), payload_size = kv.len(), err = %e);
} else {
stats.sets += 1;
}
}
}
stats.sets_time += set_start.elapsed();
let get_start = Instant::now();
for batch in fixture.kvs.chunks(GET_BATCH_SIZE) {
let keys = batch.iter().map(|kv| kv.key()).collect::<Vec<_>>();
match client.get(keys).timeout(timeout, "client.get").await {
Ok(v) => {
stats.gets += GET_BATCH_SIZE as u64;
for (id, e) in v.errors {
tracing::debug!(message = "error getting items", first_key = batch.first().map(|kv| kv.key()), server = %id, err = %e);
}
}
Err(e) => {
tracing::debug!(message = "unable to get items", first_key = batch.first().map(|kv| kv.key()), err = %e);
}
}
}
stats.gets_time += get_start.elapsed();
}
stats
})));
}
let mut out = Vec::with_capacity(self.concurrency);
for (worker, t) in tasks {
match t.await {
Ok(m) => out.push(m),
Err(e) => {
tracing::error!(message = "unable to run benchmark task", worker = worker, err = %e);
}
}
}
out
}
}
/// A benchmark key together with the length of the payload stored under it.
///
/// Payload bytes are not stored per pair; every pair borrows a prefix of one
/// shared buffer filled with `b'x'`.
#[derive(Debug)]
struct KVPair {
    key: String,
    len: usize,
}

impl KVPair {
    /// Upper bound on the payload length, in bytes.
    const MAX_ITEM_SIZE: usize = 64 * 1024;
    /// Backing bytes that all payload slices are taken from.
    const ITEM_PAYLOAD: [u8; Self::MAX_ITEM_SIZE] = [b'x'; Self::MAX_ITEM_SIZE];

    /// Build a pair whose payload length is `unit * MAX_ITEM_SIZE`, capped at
    /// `MAX_ITEM_SIZE`. The float-to-integer cast saturates, so negative or
    /// NaN `unit` values produce an empty payload.
    fn new(key: String, unit: f64) -> Self {
        let scaled = Self::MAX_ITEM_SIZE as f64 * unit;
        let len = std::cmp::min(scaled as usize, Self::MAX_ITEM_SIZE);
        Self { key, len }
    }

    /// The key this pair is stored under.
    fn key(&self) -> &str {
        self.key.as_str()
    }

    /// Payload length in bytes.
    fn len(&self) -> usize {
        self.len
    }

    /// The payload: the first `len` bytes of the shared buffer.
    fn payload(&self) -> &[u8] {
        &Self::ITEM_PAYLOAD[..self.len]
    }
}
/// The set of key-value pairs a single worker writes and reads.
#[derive(Debug)]
struct FixtureData {
    kvs: Vec<KVPair>,
}

impl FixtureData {
    /// Generate `num` pairs for `worker`.
    ///
    /// Keys have the form `mc-bench-{worker}-{index}`. Each pair's size factor
    /// is drawn from `Exp::new(10.0)` and handed to `KVPair::new`, which turns
    /// it into a payload length.
    fn new(worker: usize, num: usize) -> FixtureData {
        let sizes = Exp::new(10.0).unwrap();
        let mut rng = rand::rng();

        let kvs = (0..num)
            .map(|i| {
                let unit: f64 = rng.sample(sizes);
                KVPair::new(format!("mc-bench-{}-{}", worker, i), unit)
            })
            .collect();

        Self { kvs }
    }
}
/// Totals collected by a single benchmark worker.
#[derive(Debug, Default)]
pub struct Summary {
    /// Total time spent in the read phase of all rounds.
    pub gets_time: Duration,
    /// Number of keys counted as read.
    pub gets: u64,
    /// Total time spent in the write phase of all rounds.
    pub sets_time: Duration,
    /// Number of keys counted as written.
    pub sets: u64,
    /// Index of the worker that produced this summary.
    pub worker: usize,
}

impl Summary {
    /// Keys read per second of read time, or `0.0` if no read time was recorded.
    pub fn gets_per_sec(&self) -> f64 {
        Self::rate(self.gets, self.gets_time)
    }

    /// Keys written per second of write time, or `0.0` if no write time was recorded.
    pub fn sets_per_sec(&self) -> f64 {
        Self::rate(self.sets, self.sets_time)
    }

    /// Divide `count` by `elapsed` in seconds, treating a zero duration as a
    /// rate of zero so a worker that never ran a round does not report NaN.
    fn rate(count: u64, elapsed: Duration) -> f64 {
        let secs = elapsed.as_secs_f64();
        if secs > 0.0 {
            count as f64 / secs
        } else {
            0.0
        }
    }
}