use std::time::Duration;
use bytes::Bytes;
use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::black_box;
use criterion::criterion_group;
use criterion::criterion_main;
use d_engine_core::Lease;
use d_engine_core::StateMachine;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::Noop;
use d_engine_proto::common::entry_payload::Payload;
use d_engine_server::storage::FileStateMachine;
use prost::Message;
use tempfile::TempDir;
/// Builds a [`FileStateMachine`] rooted in a fresh temporary directory and attaches a lease.
///
/// The lease is a `DefaultLease` built from a config with `enabled = true`,
/// `interval_ms = 1000` and `max_cleanup_duration_ms = 1`. After the lease is set,
/// `load_lease_data` is awaited before the state machine is handed back.
///
/// The [`TempDir`] handle is returned alongside the state machine so that callers can
/// keep it bound while the state machine is in use (tempfile removes the directory
/// when the handle is dropped).
///
/// # Panics
///
/// Panics if the temporary directory cannot be created, if the state machine cannot be
/// constructed, or if loading the lease data fails.
async fn create_test_state_machine() -> (FileStateMachine, TempDir) {
    use d_engine_server::storage::DefaultLease;

    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let storage_root = temp_dir.path().to_path_buf();

    let mut sm = FileStateMachine::new(storage_root)
        .await
        .expect("Failed to create state machine");

    let lease = std::sync::Arc::new(DefaultLease::new(d_engine_core::config::LeaseConfig {
        enabled: true,
        interval_ms: 1000,
        max_cleanup_duration_ms: 1,
    }));
    sm.set_lease(lease);
    sm.load_lease_data().await.expect("Failed to load lease data");

    (sm, temp_dir)
}
/// Builds `count` log entries, each wrapping an encoded `Insert` write command with a TTL.
///
/// The `i`-th entry (zero-based) gets log index `start_index + i`, term `1`, key
/// `key_ttl_<index>` and value `value_ttl_<index>`. Every insert carries the same
/// `ttl_secs`. Entries are returned in ascending index order.
fn create_entries_with_ttl(
    count: usize,
    start_index: u64,
    ttl_secs: u64,
) -> Vec<Entry> {
    let mut entries = Vec::with_capacity(count);

    for offset in 0..count {
        let index = start_index + offset as u64;

        let command = WriteCommand {
            operation: Some(Operation::Insert(Insert {
                key: Bytes::from(format!("key_ttl_{}", index)),
                value: Bytes::from(format!("value_ttl_{}", index)),
                ttl_secs,
            })),
        };
        // The command travels inside the entry as its protobuf-encoded bytes.
        let encoded = Payload::Command(command.encode_to_vec().into());

        entries.push(Entry {
            index,
            term: 1,
            payload: Some(EntryPayload {
                payload: Some(encoded),
            }),
        });
    }

    entries
}
/// Builds a single no-op log entry at `index` with term `1`.
///
/// The benchmarks apply such an entry to drive `apply_chunk` without writing any key.
fn create_noop_entry(index: u64) -> Entry {
    let noop = Payload::Noop(Noop {});
    let payload = EntryPayload {
        payload: Some(noop),
    };

    Entry {
        index,
        term: 1,
        payload: Some(payload),
    }
}
fn bench_piggyback_cleanup(c: &mut Criterion) {
use d_engine_server::storage::DefaultLease;
let mut group = c.benchmark_group("piggyback_cleanup");
for expired_count in [10, 50, 100, 500].iter() {
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let lease = DefaultLease::new(lease_config);
for i in 0..*expired_count {
let key = format!("key_ttl_{i}");
lease.register(bytes::Bytes::from(key), 1); }
std::thread::sleep(Duration::from_secs(2));
group.bench_with_input(
BenchmarkId::from_parameter(expired_count),
expired_count,
|b, _| {
b.iter(|| {
let expired_keys = lease.on_apply();
black_box(expired_keys);
});
},
);
}
group.finish();
}
fn bench_ttl_registration(c: &mut Criterion) {
use d_engine_server::storage::DefaultLease;
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let lease = DefaultLease::new(lease_config);
c.bench_function("ttl_registration", |b| {
let mut counter = 0u64;
b.iter(|| {
let key = format!("key_{counter}");
lease.register(bytes::Bytes::from(key), 3600);
counter += 1;
black_box(());
});
});
}
/// Benchmarks applying a batch of TTL inserts through `apply_chunk`.
///
/// Runs with batch sizes 10, 100 and 1000 against one shared state machine. Each timed
/// iteration builds the batch (indices starting at `1`, TTL `3600`) and applies it, so
/// entry construction is included in the measurement and the same indices are
/// re-applied on every iteration.
fn bench_batch_ttl_registration(c: &mut Criterion) {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut group = c.benchmark_group("batch_ttl_registration");
    // `_temp_dir` stays bound until the end of the function.
    let (sm, _temp_dir) = runtime.block_on(create_test_state_machine());

    for batch_size in [10usize, 100, 1000] {
        let id = BenchmarkId::from_parameter(batch_size);
        group.bench_with_input(id, &batch_size, |b, &size| {
            b.to_async(&runtime).iter(|| async {
                let batch = create_entries_with_ttl(size, 1, 3600);
                sm.apply_chunk(batch).await.unwrap();
                black_box(());
            });
        });
    }

    group.finish();
}
/// Benchmarks applying a no-op entry to a state machine holding a mix of TTLs.
///
/// Setup (untimed) applies 50 entries at indices 1..=50 with a TTL of `1` and 50 entries
/// at indices 51..=100 with a TTL of `3600`, then sleeps two seconds. The timed routine
/// applies one no-op entry at index `10000`. The group uses a sample size of 10.
///
/// NOTE(review): the short-lived entries are inserted only once, before measurement.
/// If applying the no-op removes expired keys, only the first timed iteration does
/// cleanup work. Confirm against the state machine implementation.
fn bench_mixed_ttl_workload(c: &mut Criterion) {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut group = c.benchmark_group("mixed_ttl_workload");
    group.sample_size(10);

    let (sm, _temp_dir) = runtime.block_on(async {
        let (sm, temp_dir) = create_test_state_machine().await;

        // (count, start_index, ttl_secs): the short-lived half first, then the long-lived half.
        for (count, start_index, ttl_secs) in [(50, 1, 1), (50, 51, 3600)] {
            let batch = create_entries_with_ttl(count, start_index, ttl_secs);
            sm.apply_chunk(batch).await.unwrap();
        }

        tokio::time::sleep(Duration::from_secs(2)).await;
        (sm, temp_dir)
    });

    group.bench_function("cleanup_mixed", |b| {
        b.to_async(&runtime).iter(|| async {
            sm.apply_chunk(vec![create_noop_entry(10000)]).await.unwrap();
            black_box(());
        });
    });

    group.finish();
}
fn bench_piggyback_high_frequency(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let mut group = c.benchmark_group("piggyback_high_frequency");
group.sample_size(10);
let (_temp_dir, sm) = runtime.block_on(async {
let (sm, temp_dir) = create_test_state_machine().await;
(temp_dir, sm)
});
group.bench_function("cleanup_with_expired_entries", |b| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_with_ttl(100, 1, 1);
sm.apply_chunk(entries).await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let noop = create_noop_entry(10000);
sm.apply_chunk(vec![noop]).await.unwrap();
black_box(());
});
});
group.finish();
}
/// Benchmarks applying 100 TTL inserts for several TTL lengths.
///
/// Runs with TTLs of 60, 3600 and 86400 against one shared state machine. Each timed
/// iteration builds 100 entries (indices starting at `1`) with the given TTL and applies
/// them, so entry construction is part of the measurement and the same indices are
/// re-applied on every iteration.
fn bench_varying_ttl_durations(c: &mut Criterion) {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut group = c.benchmark_group("varying_ttl_durations");
    // `_temp_dir` stays bound until the end of the function.
    let (sm, _temp_dir) = runtime.block_on(create_test_state_machine());

    for ttl in [60u64, 3600, 86400] {
        let id = BenchmarkId::from_parameter(ttl);
        group.bench_with_input(id, &ttl, |b, &ttl_secs| {
            b.to_async(&runtime).iter(|| async {
                let batch = create_entries_with_ttl(100, 1, ttl_secs);
                sm.apply_chunk(batch).await.unwrap();
                black_box(());
            });
        });
    }

    group.finish();
}
fn bench_worst_case_all_expired(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let mut group = c.benchmark_group("worst_case_all_expired");
group.sample_size(10);
let (sm, _temp_dir) = runtime.block_on(async {
let (sm, temp_dir) = create_test_state_machine().await;
let entries = create_entries_with_ttl(1000, 1, 1);
sm.apply_chunk(entries).await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
(sm, temp_dir)
});
group.bench_function("cleanup_all_expired", |b| {
b.to_async(&runtime).iter(|| async {
let entries = create_entries_with_ttl(1000, 1, 1);
sm.apply_chunk(entries).await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let noop = create_noop_entry(10000);
sm.apply_chunk(vec![noop]).await.unwrap();
black_box(());
});
});
group.finish();
}
/// Benchmarks applying a no-op entry when none of the stored TTL entries are due.
///
/// Setup (untimed) applies 1000 entries at indices 1..=1000 with a TTL of `86400`; there
/// is no wait before measuring. The timed routine applies one no-op entry at index
/// `10000`. The group uses a sample size of 10.
fn bench_best_case_no_expired(c: &mut Criterion) {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let mut group = c.benchmark_group("best_case_no_expired");
    group.sample_size(10);

    let (sm, _temp_dir) = runtime.block_on(async {
        let (sm, temp_dir) = create_test_state_machine().await;
        let long_lived = create_entries_with_ttl(1000, 1, 86400);
        sm.apply_chunk(long_lived).await.unwrap();
        (sm, temp_dir)
    });

    group.bench_function("cleanup_no_expired", |b| {
        b.to_async(&runtime).iter(|| async {
            sm.apply_chunk(vec![create_noop_entry(10000)]).await.unwrap();
            black_box(());
        });
    });

    group.finish();
}
// Registers every benchmark function in this file under the `benches` group.
// The functions are listed in the same order in which they are defined above.
criterion_group!(
benches,
bench_piggyback_cleanup,
bench_ttl_registration,
bench_batch_ttl_registration,
bench_mixed_ttl_workload,
bench_piggyback_high_frequency,
bench_varying_ttl_durations,
bench_worst_case_all_expired,
bench_best_case_no_expired,
);
// Generates the benchmark binary's entry point for the `benches` group.
criterion_main!(benches);