use bytes::Bytes;
use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::black_box;
use criterion::criterion_group;
use criterion::criterion_main;
use d_engine_core::StateMachine;
use d_engine_core::watch::WatchDispatcher;
use d_engine_core::watch::WatchRegistry;
use d_engine_core::watch::WatcherHandle;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::entry_payload::Payload;
use d_engine_server::storage::FileStateMachine;
use prost::Message;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
async fn create_test_state_machine() -> (FileStateMachine, TempDir) {
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let mut sm = FileStateMachine::new(temp_dir.path().to_path_buf())
.await
.expect("Failed to create state machine");
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let lease = Arc::new(d_engine_server::storage::DefaultLease::new(lease_config));
sm.set_lease(lease);
(sm, temp_dir)
}
fn create_watch_system(
event_queue_size: usize,
watcher_buffer_size: usize,
) -> (
Arc<WatchRegistry>,
broadcast::Sender<d_engine_proto::client::WatchResponse>,
) {
let (broadcast_tx, broadcast_rx) = broadcast::channel(event_queue_size);
let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();
let registry = Arc::new(WatchRegistry::new(watcher_buffer_size, unregister_tx));
let dispatcher = WatchDispatcher::new(Arc::clone(®istry), broadcast_rx, unregister_rx);
tokio::spawn(async move {
dispatcher.run().await;
});
(registry, broadcast_tx)
}
fn register_watchers(
registry: &WatchRegistry,
count: usize,
key_prefix: &str,
) -> Vec<WatcherHandle> {
let mut handles = Vec::with_capacity(count);
for i in 0..count {
let key = format!("{key_prefix}{i}");
handles.push(registry.register(key.into()));
}
handles
}
fn create_entries_without_ttl(
count: usize,
start_index: u64,
) -> Vec<Entry> {
(0..count)
.map(|i| {
let key = format!("key_{}", start_index + i as u64);
let value = format!("value_{}", start_index + i as u64);
let insert = Insert {
key: Bytes::from(key),
value: Bytes::from(value),
ttl_secs: 0,
};
let write_cmd = WriteCommand {
operation: Some(Operation::Insert(insert)),
};
let payload = Payload::Command(write_cmd.encode_to_vec().into());
Entry {
index: start_index + i as u64,
term: 1,
payload: Some(EntryPayload {
payload: Some(payload),
}),
}
})
.collect()
}
fn create_entries_with_ttl(
count: usize,
start_index: u64,
ttl_secs: u64,
) -> Vec<Entry> {
(0..count)
.map(|i| {
let key = format!("key_ttl_{}", start_index + i as u64);
let value = format!("value_ttl_{}", start_index + i as u64);
let insert = Insert {
key: Bytes::from(key),
value: Bytes::from(value),
ttl_secs,
};
let write_cmd = WriteCommand {
operation: Some(Operation::Insert(insert)),
};
let payload = Payload::Command(write_cmd.encode_to_vec().into());
Entry {
index: start_index + i as u64,
term: 1,
payload: Some(EntryPayload {
payload: Some(payload),
}),
}
})
.collect()
}
fn bench_apply_without_ttl(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
c.bench_function("apply_without_ttl", |b| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_without_ttl(1, 1);
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
fn bench_apply_with_ttl(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
c.bench_function("apply_with_ttl", |b| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_with_ttl(1, 1, 3600);
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
fn bench_get_without_ttl(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async {
let (sm, temp_dir) = create_test_state_machine().await;
let entries = create_entries_without_ttl(100, 1);
sm.apply_chunk(entries).await.unwrap();
(sm, temp_dir)
});
c.bench_function("get_without_ttl", |b| {
b.iter(|| {
let key = b"key_50";
black_box(sm.get(key).unwrap());
});
});
}
fn bench_get_with_ttl_check(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async {
let (sm, temp_dir) = create_test_state_machine().await;
let entries = create_entries_with_ttl(100, 1, 3600); sm.apply_chunk(entries).await.unwrap();
(sm, temp_dir)
});
c.bench_function("get_with_ttl_check", |b| {
b.iter(|| {
let key = b"key_ttl_50";
black_box(sm.get(key).unwrap());
});
});
}
fn bench_get_expired_ttl(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async {
let (sm, temp_dir) = create_test_state_machine().await;
let entries = create_entries_with_ttl(100, 1, 1); sm.apply_chunk(entries).await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
(sm, temp_dir)
});
c.bench_function("get_expired_ttl", |b| {
b.iter(|| {
let key = b"key_ttl_50";
black_box(sm.get(key).unwrap());
});
});
}
fn bench_batch_apply(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let mut group = c.benchmark_group("batch_apply");
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
for size in [10, 100, 1000].iter() {
group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_without_ttl(size, 1);
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
group.finish();
}
fn bench_batch_apply_with_ttl(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let mut group = c.benchmark_group("batch_apply_with_ttl");
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
for size in [10, 100, 1000].iter() {
group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_with_ttl(size, 1, 3600);
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
group.finish();
}
fn bench_apply_without_watch(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
c.bench_function("apply_without_watch", |b| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_without_ttl(100, 1);
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
fn bench_apply_with_1_watcher(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
c.bench_function("apply_with_1_watcher", |b| {
b.to_async(&runtime).iter(|| async {
let (registry, broadcast_tx) = create_watch_system(1000, 10);
let _watchers = register_watchers(®istry, 1, "key_");
let entries = create_entries_without_ttl(100, 1);
for entry in &entries {
if let Some(payload) = &entry.payload {
if let Some(Payload::Command(cmd_bytes)) = &payload.payload {
if let Ok(write_cmd) = WriteCommand::decode(cmd_bytes.as_ref()) {
if let Some(op) = write_cmd.operation {
match op {
Operation::Insert(insert) => {
let event = d_engine_proto::client::WatchResponse {
key: insert.key.clone(),
value: insert.value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
Operation::Delete(delete) => {
let event = d_engine_proto::client::WatchResponse {
key: delete.key.clone(),
value: bytes::Bytes::new(),
event_type:
d_engine_proto::client::WatchEventType::Delete
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
Operation::CompareAndSwap(cas) => {
let event = d_engine_proto::client::WatchResponse {
key: cas.key.clone(),
value: cas.new_value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
}
}
}
}
}
}
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
fn bench_apply_with_10_watchers(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
c.bench_function("apply_with_10_watchers", |b| {
b.to_async(&runtime).iter(|| async {
let (registry, broadcast_tx) = create_watch_system(1000, 10);
let _watchers = register_watchers(®istry, 10, "key_");
let entries = create_entries_without_ttl(100, 1);
for entry in &entries {
if let Some(payload) = &entry.payload {
if let Some(Payload::Command(cmd_bytes)) = &payload.payload {
if let Ok(write_cmd) = WriteCommand::decode(cmd_bytes.as_ref()) {
if let Some(op) = write_cmd.operation {
match op {
Operation::Insert(insert) => {
let event = d_engine_proto::client::WatchResponse {
key: insert.key.clone(),
value: insert.value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
Operation::Delete(delete) => {
let event = d_engine_proto::client::WatchResponse {
key: delete.key.clone(),
value: bytes::Bytes::new(),
event_type:
d_engine_proto::client::WatchEventType::Delete
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
Operation::CompareAndSwap(cas) => {
let event = d_engine_proto::client::WatchResponse {
key: cas.key.clone(),
value: cas.new_value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
}
}
}
}
}
}
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
fn bench_apply_with_100_watchers(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let (sm, _temp_dir) = runtime.block_on(async { create_test_state_machine().await });
c.bench_function("apply_with_100_watchers", |b| {
b.to_async(&runtime).iter(|| async {
let (registry, broadcast_tx) = create_watch_system(1000, 10);
let _watchers = register_watchers(®istry, 100, "key_");
let entries = create_entries_without_ttl(100, 1);
for entry in &entries {
if let Some(payload) = &entry.payload {
if let Some(Payload::Command(cmd_bytes)) = &payload.payload {
if let Ok(write_cmd) = WriteCommand::decode(cmd_bytes.as_ref()) {
if let Some(op) = write_cmd.operation {
match op {
Operation::Insert(insert) => {
let event = d_engine_proto::client::WatchResponse {
key: insert.key.clone(),
value: insert.value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
Operation::Delete(delete) => {
let event = d_engine_proto::client::WatchResponse {
key: delete.key.clone(),
value: bytes::Bytes::new(),
event_type:
d_engine_proto::client::WatchEventType::Delete
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
Operation::CompareAndSwap(cas) => {
let event = d_engine_proto::client::WatchResponse {
key: cas.key.clone(),
value: cas.new_value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put
as i32,
error: 0,
};
let _ = broadcast_tx.send(event);
}
}
}
}
}
}
}
sm.apply_chunk(entries).await.unwrap();
black_box(());
});
});
}
fn bench_watch_e2e_latency(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
c.bench_function("watch_e2e_latency", |b| {
b.to_async(&runtime).iter(|| async {
let (registry, broadcast_tx) = create_watch_system(1000, 10);
let key = Bytes::from("test_key");
let value = Bytes::from("test_value");
let mut watcher = registry.register(key.clone());
let start = tokio::time::Instant::now();
let event = d_engine_proto::client::WatchResponse {
key: key.clone(),
value: value.clone(),
event_type: d_engine_proto::client::WatchEventType::Put as i32,
error: 0, };
let _ = broadcast_tx.send(event);
if let Some(_event) = watcher.receiver_mut().recv().await {
let latency = start.elapsed();
black_box(latency);
}
});
});
}
criterion_group!(
benches,
bench_apply_without_ttl,
bench_apply_with_ttl,
bench_get_without_ttl,
bench_get_with_ttl_check,
bench_get_expired_ttl,
bench_batch_apply,
bench_batch_apply_with_ttl,
bench_apply_without_watch,
bench_apply_with_1_watcher,
bench_apply_with_10_watchers,
bench_apply_with_100_watchers,
bench_watch_e2e_latency,
);
criterion_main!(benches);