use super::TestContext;
use bytes::Bytes;
use d_engine_core::{
BufferedRaftLog, FlushPolicy, PersistenceConfig, PersistenceStrategy, RaftLog,
};
use d_engine_proto::common::{Entry, EntryPayload};
use d_engine_server::{FileStateMachine, FileStorageEngine, node::RaftTypeConfig};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tempfile::tempdir;
use tokio::time::Instant;
mod filter_out_conflicts_and_append_performance_tests {
use super::*;
#[tokio::test]
async fn test_filter_out_conflicts_performance_consistent_across_flush_intervals_fresh_cluster()
{
let test_cases = vec![
(10, 50), (100, 50), (1000, 50), ];
for (interval_ms, max_duration_ms) in test_cases {
let config = PersistenceConfig {
strategy: PersistenceStrategy::MemFirst,
flush_policy: FlushPolicy::Batch {
threshold: 1000,
interval_ms,
},
max_buffered_entries: 1000,
..Default::default()
};
let temp_dir = tempdir().unwrap();
let path = temp_dir.path().to_path_buf();
let (log, receiver) = BufferedRaftLog::<
RaftTypeConfig<FileStorageEngine, FileStateMachine>,
>::new(
1, config, Arc::new(FileStorageEngine::new(path).unwrap())
);
let log = log.start(receiver);
let mut entries = vec![];
for i in 1..=1000 {
entries.push(Entry {
index: i,
term: 1,
payload: Some(EntryPayload::command(Bytes::from(vec![0; 256]))), });
}
log.append_entries(entries.clone()).await.unwrap();
let start = Instant::now();
log.filter_out_conflicts_and_append(
0, 0, vec![Entry {
index: 501,
term: 1,
payload: Some(EntryPayload::command(Bytes::from(vec![1; 256]))),
}],
)
.await
.unwrap();
let duration = start.elapsed().as_millis() as u64;
println!("Interval {interval_ms}ms: Took {duration}ms");
assert!(
duration <= max_duration_ms,
"Duration {duration}ms exceeds max {max_duration_ms}ms for {interval_ms}ms interval"
);
assert!(log.entry(500).unwrap().is_none());
assert!(log.entry(501).unwrap().is_some());
assert!(log.entry(502).unwrap().is_none());
}
}
#[tokio::test]
async fn test_filter_out_conflicts_performance_consistent_across_flush_intervals() {
let test_cases = vec![
(10, 50), (100, 50), (1000, 50), ];
for (interval_ms, max_duration_ms) in test_cases {
let config = PersistenceConfig {
strategy: PersistenceStrategy::MemFirst,
flush_policy: FlushPolicy::Batch {
threshold: 1000,
interval_ms,
},
max_buffered_entries: 1000,
..Default::default()
};
let temp_dir = tempdir().unwrap();
let path = temp_dir.path().to_path_buf();
let (log, receiver) = BufferedRaftLog::<
RaftTypeConfig<FileStorageEngine, FileStateMachine>,
>::new(
1, config, Arc::new(FileStorageEngine::new(path).unwrap())
);
let log = log.start(receiver);
let mut entries = vec![];
for i in 1..=1000 {
entries.push(Entry {
index: i,
term: 1,
payload: Some(EntryPayload::command(Bytes::from(vec![0; 256]))), });
}
log.append_entries(entries.clone()).await.unwrap();
let start = Instant::now();
log.filter_out_conflicts_and_append(
500, 1, vec![Entry {
index: 501,
term: 1,
payload: Some(EntryPayload::command(Bytes::from(vec![1; 256]))),
}],
)
.await
.unwrap();
let duration = start.elapsed().as_millis() as u64;
println!("Interval {interval_ms}ms: Took {duration}ms");
assert!(
duration <= max_duration_ms,
"Duration {duration}ms exceeds max {max_duration_ms}ms for {interval_ms}ms interval"
);
assert!(log.entry(500).unwrap().is_some());
assert!(log.entry(501).unwrap().is_some());
assert!(log.entry(502).unwrap().is_none()); }
}
}
#[tokio::test]
async fn test_last_entry_id_performance() {
let test_context = TestContext::new(
PersistenceStrategy::MemFirst,
FlushPolicy::Batch {
threshold: 1_000_000,
interval_ms: 360_000,
},
"test_last_entry_id_performance",
);
const ENTRY_COUNT: usize = 1_000_000;
let entries: Vec<_> = (0..ENTRY_COUNT)
.map(|index| Entry {
index: index as u64,
term: index as u64,
payload: Some(EntryPayload::command(Bytes::from(vec![1; 256]))),
})
.collect();
let insert_start = Instant::now();
test_context.raft_log.append_entries(entries).await.unwrap();
let insert_duration = insert_start.elapsed();
println!("Insert duration: {insert_duration:?}");
let mut durations = Vec::with_capacity(10);
let mut last_id = 0;
for _ in 1..1_000 {
let start = Instant::now();
last_id = test_context.raft_log.last_entry_id();
let duration = start.elapsed();
durations.push(duration);
}
let avg_duration = durations.iter().sum::<Duration>() / durations.len() as u32;
println!("Average last_entry_id duration: {avg_duration:?}");
assert!(avg_duration < Duration::from_millis(1));
assert_eq!(last_id, ENTRY_COUNT as u64 - 1);
}
#[tokio::test]
async fn test_performance_benchmarks() {
if std::env::var("CARGO_LLVM_COV").is_ok() || std::env::var("CARGO_TARPAULIN").is_ok() {
eprintln!("Skipping performance test during coverage run");
return;
}
let is_ci = std::env::var("CI").is_ok();
let operations = if is_ci {
[
("append_entries", 500, 500.0),
("get_entries_range", 2500, 25000.0),
("entry_lookup", 5000, 100000.0),
("term_queries", 4000, 25000.0),
]
} else {
[
("append_entries", 1000, 1000.0),
("get_entries_range", 5000, 50000.0),
("entry_lookup", 10000, 200000.0),
("term_queries", 8000, 50000.0),
]
};
let ctx = TestContext::new(
PersistenceStrategy::MemFirst,
FlushPolicy::Batch {
threshold: 1000,
interval_ms: 100,
},
"performance_benchmark",
);
let mut entries = Vec::new();
let pre_populate_count = if is_ci { 5000 } else { 10000 };
for i in 1..=pre_populate_count {
entries.push(Entry {
index: i,
term: i / 100 + 1, payload: Some(EntryPayload::command(Bytes::from(vec![0; 256]))), });
}
ctx.raft_log.append_entries(entries).await.unwrap();
let mut results = HashMap::new();
for (op_name, count, min_ops_per_sec) in operations {
let start = Instant::now();
match op_name {
"append_entries" => {
for i in 0..count {
let entry = Entry {
index: 10000 + i as u64 + 1,
term: 101,
payload: Some(EntryPayload::command(Bytes::from(vec![0; 256]))),
};
ctx.raft_log.append_entries(vec![entry]).await.unwrap();
}
}
"get_entries_range" => {
for i in 0..count {
let start_idx = (i % 9000) as u64 + 1;
let end_idx = start_idx + 100;
let _ = ctx.raft_log.get_entries_range(start_idx..=end_idx);
}
}
"entry_lookup" => {
for i in 0..count {
let index = (i % 10000) as u64 + 1;
let _ = ctx.raft_log.entry(index);
}
}
"term_queries" => {
for i in 0..count {
let term = (i % 100) as u64 + 1;
let _ = ctx.raft_log.first_index_for_term(term);
let _ = ctx.raft_log.last_index_for_term(term);
}
}
_ => {}
}
let duration = start.elapsed();
let ops_per_sec = count as f64 / duration.as_secs_f64();
results.insert(op_name, (duration, ops_per_sec));
println!("{op_name}: {count} operations in {duration:?} ({ops_per_sec:.2} ops/sec)");
assert!(
ops_per_sec > min_ops_per_sec,
"{op_name} operations too slow: {ops_per_sec:.2} ops/sec (expected > {min_ops_per_sec:.2})"
);
}
}
#[tokio::test]
async fn test_read_performance_remains_lockfree() {
let expected_min = if std::env::var("CI").is_ok() {
10.0
} else {
10_000.0
};
let ctx = TestContext::new(
PersistenceStrategy::MemFirst,
FlushPolicy::Batch {
threshold: 1000,
interval_ms: 100,
},
"test_lockfree_reads",
);
ctx.append_entries(1, 10000, 1).await;
let log = ctx.raft_log.clone();
let write_handle = tokio::spawn(async move {
for i in 1..=1000 {
log.append_entries(vec![Entry {
index: 10000 + i,
term: 2,
payload: Some(EntryPayload::command(Bytes::from(vec![0; 1024]))),
}])
.await
.unwrap();
tokio::time::sleep(Duration::from_micros(100)).await;
}
});
let start = Instant::now();
let mut read_count: u64 = 0;
let timeout = Duration::from_millis(200);
while !write_handle.is_finished() && start.elapsed() < timeout {
for i in 1..=100 {
let index = (i * 100).min(10000); let _ = ctx.raft_log.entry(index);
read_count = read_count.saturating_add(1);
}
tokio::task::yield_now().await;
}
write_handle.await.unwrap();
let duration = start.elapsed();
let reads_per_sec = read_count as f64 / duration.as_secs_f64();
println!("Performed {read_count} reads in {duration:?} ({reads_per_sec:.2} reads/sec)",);
assert!(
reads_per_sec > expected_min,
"Read performance degraded: {reads_per_sec:.2} reads/sec (expected > {expected_min:.2})",
);
}