use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use futures_util::Future;
use std::time::Duration;
pub fn current_thread_runtime() -> tokio::runtime::Runtime {
let mut builder = tokio::runtime::Builder::new_current_thread();
builder.enable_io();
builder.enable_time();
builder.build().unwrap()
}
pub fn block_on_all<F>(f: F) -> F::Output
where
F: Future,
{
current_thread_runtime().block_on(f)
}
fn get_redis_host() -> String {
std::env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string())
}
fn get_redis_client() -> redis::Client {
let redis_host = get_redis_host();
redis::Client::open(format!("redis://{redis_host}:6379")).unwrap()
}
async fn get_rustis_client() -> rustis::client::Client {
let redis_host = get_redis_host();
rustis::client::Client::connect(redis_host).await.unwrap()
}
async fn get_fred_client() -> fred::clients::Client {
use fred::prelude::*;
let redis_host = get_redis_host();
let config = Config::from_url(&format!("redis://{redis_host}:6379/0")).unwrap();
let client = Client::new(config, None, None, None);
client.connect();
client.wait_for_connect().await.unwrap();
client
}
fn bench_redis_simple_getsetdel_pipeline(b: &mut Bencher) {
let client = get_redis_client();
let mut con = client.get_connection().unwrap();
b.iter(|| {
let key = "test_key";
let _result: ((), i64, usize) = redis::pipe()
.set(key, 42)
.get(key)
.del(key)
.query(&mut con)
.unwrap();
});
}
#[allow(dead_code)]
fn bench_fred_simple_getsetdel_pipeline(b: &mut Bencher) {
use fred::prelude::*;
let runtime = current_thread_runtime();
let client = runtime.block_on(get_fred_client());
b.iter(|| {
runtime
.block_on(async {
let key = "test_key";
let pipeline = client.pipeline();
pipeline.set::<(), _, _>(key, 42, None, None, false).await?;
pipeline.get::<i64, _>(key).await?;
pipeline.del::<u32, _>(key).await?;
let _result: ((), i64, usize) = pipeline.all().await?;
Ok::<_, Error>(())
})
.unwrap()
});
}
fn bench_rustis_simple_getsetdel_pipeline(b: &mut Bencher) {
use rustis::{
Error,
client::BatchPreparedCommand,
commands::{GenericCommands, StringCommands},
};
let runtime = current_thread_runtime();
let client = runtime.block_on(get_rustis_client());
b.iter(|| {
runtime
.block_on(async {
let key = "test_key";
let mut pipeline = client.create_pipeline();
pipeline.set(key, 42).queue();
pipeline.get::<i64>(key).queue();
pipeline.del(key).queue();
let _result: ((), i64, usize) = pipeline.execute().await?;
Ok::<_, Error>(())
})
.unwrap()
});
}
const PIPELINE_QUERIES: usize = 1_000;
fn bench_redis_async_long_pipeline(b: &mut Bencher) {
use redis::RedisError;
let client = get_redis_client();
let runtime = current_thread_runtime();
let mut con = runtime
.block_on(client.get_multiplexed_async_connection())
.unwrap();
b.iter(|| {
runtime
.block_on(async {
let mut pipe = redis::Pipeline::with_capacity(PIPELINE_QUERIES);
for i in 0..PIPELINE_QUERIES {
pipe.set(format!("foo{i}"), "bar");
}
let _result: Vec<String> = pipe.query_async(&mut con).await?;
Ok::<_, RedisError>(())
})
.unwrap();
});
}
fn bench_redis_multiplexed_async_long_pipeline(b: &mut Bencher) {
use redis::RedisError;
let client = get_redis_client();
let runtime = current_thread_runtime();
let mut con = runtime
.block_on(client.get_multiplexed_async_connection())
.unwrap();
b.iter(|| {
runtime
.block_on(async {
let mut pipe = redis::Pipeline::with_capacity(PIPELINE_QUERIES);
for i in 0..PIPELINE_QUERIES {
pipe.set(format!("foo{i}"), "bar");
}
let _result: Vec<String> = pipe.query_async(&mut con).await?;
Ok::<_, RedisError>(())
})
.unwrap();
});
}
fn bench_fred_long_pipeline(b: &mut Bencher) {
use fred::prelude::*;
let runtime = current_thread_runtime();
let client = runtime.block_on(get_fred_client());
b.iter(|| {
runtime
.block_on(async {
let pipeline = client.pipeline();
for i in 0..PIPELINE_QUERIES {
pipeline
.set::<String, _, _>(format!("foo{i}"), "bar", None, None, false)
.await?;
}
let _result: Vec<String> = pipeline.all().await?;
Ok::<_, Error>(())
})
.unwrap()
});
}
fn bench_rustis_long_pipeline(b: &mut Bencher) {
use rustis::{Error, client::BatchPreparedCommand, commands::StringCommands};
let runtime = current_thread_runtime();
let client = runtime.block_on(get_rustis_client());
b.iter(|| {
runtime
.block_on(async {
let mut pipeline = client.create_pipeline();
pipeline.reserve(PIPELINE_QUERIES);
for i in 0..PIPELINE_QUERIES {
pipeline.set(format!("foo{i}"), "bar").queue();
}
let _result: Vec<String> = pipeline.execute().await?;
Ok::<_, Error>(())
})
.unwrap()
});
}
fn bench_simple(c: &mut Criterion) {
let mut group = c.benchmark_group("simple_pipeline");
group
.measurement_time(Duration::from_secs(10))
.bench_function(
"redis_simple_getsetdel_pipeline",
bench_redis_simple_getsetdel_pipeline,
)
.bench_function(
"rustis_simple_getsetdel_pipeline",
bench_rustis_simple_getsetdel_pipeline,
);
group.finish();
}
fn bench_long(c: &mut Criterion) {
let mut group = c.benchmark_group("long_pipeline");
group
.measurement_time(Duration::from_secs(10))
.bench_function("redis_async_long_pipeline", bench_redis_async_long_pipeline)
.bench_function(
"redis_multiplexed_async_long_pipeline",
bench_redis_multiplexed_async_long_pipeline,
)
.bench_function("fred_long_pipeline", bench_fred_long_pipeline)
.bench_function("rustis_long_pipeline", bench_rustis_long_pipeline);
group.finish();
}
criterion_group!(bench, bench_simple, bench_long);
criterion_main!(bench);