use std::time::Duration;
use crate::core::reactor::Reactor;
use crate::features::storage::api::{self, StorageConfig};
/// Parameters for the deterministic synthetic edge workload produced by
/// [`WorkloadGenerator`].
#[derive(Debug, Clone, Copy)]
pub struct WorkloadConfig {
/// Seed mixed into the generator's initial state.
pub seed: u64,
/// Number of nodes; generated ids are reduced modulo this value. Must be > 0.
pub node_count: u64,
/// Total number of edges the generator yields before it is exhausted.
pub edge_count: u64,
/// Percentage (0..=100) that sets both the chance of drawing a source from
/// the hot set and the hot set's size relative to `node_count`.
pub hot_node_percent: u8,
/// Exclusive bound on the random extra offset added to `src + 1` when a
/// destination is picked. Must be > 0.
pub max_low_degree: u16,
}
impl WorkloadConfig {
    /// Checks that the configuration can drive a [`WorkloadGenerator`].
    ///
    /// # Errors
    ///
    /// Returns `StorageError::InvalidInput` describing the first violated
    /// rule, checked in this order: `node_count` must be non-zero,
    /// `hot_node_percent` must not exceed 100, `max_low_degree` must be
    /// non-zero.
    pub fn validate(&self) -> api::Result<()> {
        // Pick the message of the first failing rule, if any.
        let violation = if self.node_count == 0 {
            Some("node_count must be > 0")
        } else if self.hot_node_percent > 100 {
            Some("hot_node_percent must be <= 100")
        } else if self.max_low_degree == 0 {
            Some("max_low_degree must be > 0")
        } else {
            None
        };

        match violation {
            Some(message) => Err(api::StorageError::InvalidInput(message.to_string())),
            None => Ok(()),
        }
    }
}
/// One edge insertion emitted by [`WorkloadGenerator`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EdgeInsert {
/// Source node id.
pub src: u64,
/// Destination node id.
pub dst: u64,
/// 1-based position of this edge in the generated sequence.
pub version: u64,
}
/// Deterministic iterator over synthetic [`EdgeInsert`]s described by a
/// [`WorkloadConfig`].
#[derive(Debug, Clone)]
pub struct WorkloadGenerator {
// Validated configuration the edges are drawn from.
cfg: WorkloadConfig,
// Current pseudo-random state; advanced on every draw.
state: u64,
// Number of edges yielded so far.
emitted: u64,
}
impl WorkloadGenerator {
    /// Creates a generator for `cfg`.
    ///
    /// The initial state depends only on `cfg.seed`, so two generators built
    /// from equal configurations yield the same edge sequence.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`WorkloadConfig::validate`] when `cfg` is
    /// not usable.
    pub fn new(cfg: WorkloadConfig) -> api::Result<Self> {
        cfg.validate()?;
        // XOR the seed with a fixed constant to form the initial state.
        let state = cfg.seed ^ 0x9E37_79B9_7F4A_7C15_u64;
        Ok(Self {
            cfg,
            state,
            emitted: 0,
        })
    }

    /// Advances the state by one linear congruential step (wrapping
    /// multiply-add) and returns the new state.
    fn next_u64(&mut self) -> u64 {
        self.state = self
            .state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.state
    }

    /// Draws a source node id.
    ///
    /// When `next_u64() % 100 < hot_node_percent` the id is taken from the
    /// hot set `[0, hot_nodes)`, otherwise from the whole range
    /// `[0, node_count)`. The hot set holds `hot_node_percent` percent of the
    /// nodes, but never fewer than one.
    fn pick_src(&mut self) -> u64 {
        let node_count = self.cfg.node_count;
        let percent = u64::from(self.cfg.hot_node_percent);

        // Widen to u128: `node_count * percent` does not fit in u64 once
        // `node_count` exceeds `u64::MAX / 100`, which would panic in debug
        // builds and wrap to a wrong hot-set size in release builds.
        let scaled = u128::from(node_count) * u128::from(percent) / 100;
        // For a validated config (`percent <= 100`) `scaled <= node_count`;
        // the clamp only guards the narrowing cast.
        let hot_nodes = (scaled.min(u128::from(u64::MAX)) as u64).max(1);

        let use_hot = (self.next_u64() % 100) < percent;
        if use_hot {
            self.next_u64() % hot_nodes
        } else {
            self.next_u64() % node_count
        }
    }

    /// Draws a destination for `src`.
    ///
    /// The destination is `src + 1 + r` reduced modulo `node_count`, where
    /// `r` is a random offset in `[0, max_low_degree)`. If that lands on
    /// `src` the next id is used instead; with `node_count == 1` a self-loop
    /// is unavoidable and `src` is returned.
    fn pick_dst(&mut self, src: u64) -> u64 {
        let degree_span = u128::from(self.cfg.max_low_degree);
        let node_count = u128::from(self.cfg.node_count);
        let src_wide = u128::from(src);

        // Sum in u128 so `src + 1 + offset` cannot overflow when `src` is
        // close to `u64::MAX`.
        let offset = u128::from(self.next_u64()) % degree_span;
        let mut dst = (src_wide + 1 + offset) % node_count;
        if dst == src_wide {
            dst = (dst + 1) % node_count;
        }
        // `dst < node_count <= u64::MAX`, so the cast is lossless.
        dst as u64
    }
}
impl Iterator for WorkloadGenerator {
    type Item = EdgeInsert;

    /// Yields the next edge, or `None` once `edge_count` edges have been
    /// produced. Versions count up from 1 in emission order.
    fn next(&mut self) -> Option<Self::Item> {
        if self.emitted < self.cfg.edge_count {
            let src = self.pick_src();
            let dst = self.pick_dst(src);
            // After the increment `emitted` equals this edge's 1-based index.
            self.emitted += 1;
            Some(EdgeInsert {
                src,
                dst,
                version: self.emitted,
            })
        } else {
            None
        }
    }
}
/// Builds a [`WorkloadGenerator`] for `cfg`.
///
/// Thin wrapper over [`WorkloadGenerator::new`]; its result, including any
/// validation error, is returned unchanged.
pub fn edge_workload_generator(cfg: WorkloadConfig) -> api::Result<WorkloadGenerator> {
WorkloadGenerator::new(cfg)
}
/// Returns the default hot-graph workload for `seed`: 100 million nodes,
/// 200 million edges, a 10% hot set and a destination offset span of 16.
pub fn default_hot_graph_workload(seed: u64) -> WorkloadConfig {
    const NODE_COUNT: u64 = 100_000_000;
    const EDGE_COUNT: u64 = 200_000_000;
    const HOT_NODE_PERCENT: u8 = 10;
    const MAX_LOW_DEGREE: u16 = 16;

    WorkloadConfig {
        seed,
        node_count: NODE_COUNT,
        edge_count: EDGE_COUNT,
        hot_node_percent: HOT_NODE_PERCENT,
        max_low_degree: MAX_LOW_DEGREE,
    }
}
/// Outcome of the basic put/get benchmark.
#[derive(Debug, Clone)]
pub struct BenchResult {
/// Number of put+get iterations that were executed.
pub iterations: u64,
/// Time spent in the put/get loop.
pub duration: Duration,
/// `iterations` divided by `duration` in seconds; `0.0` when the duration is zero.
pub ops_per_sec: f64,
}
/// Outcome of the batched edge-ingest benchmark.
#[derive(Debug, Clone)]
pub struct IngestBenchResult {
/// Number of edges requested for ingestion.
pub edges: u64,
/// Time spent generating and writing the edge batches.
pub duration: Duration,
/// `edges` divided by `duration` in seconds; `0.0` when the duration is zero.
pub edges_per_sec: f64,
}
/// Latency summary of the multi-hop traversal benchmark.
#[derive(Debug, Clone)]
pub struct TraversalBenchResult {
/// Number of traversals that were timed.
pub samples: u64,
/// Maximum number of hops followed per traversal.
pub hops: u32,
/// 50th-percentile traversal latency in microseconds.
pub p50_micros: f64,
/// 95th-percentile traversal latency in microseconds.
pub p95_micros: f64,
/// 99th-percentile traversal latency in microseconds.
pub p99_micros: f64,
}
/// Amplification figures taken from the storage metrics report after the
/// amplification workload has run.
#[derive(Debug, Clone)]
pub struct AmplificationBenchResult {
/// Number of edge writes the workload was asked to perform.
pub iterations: u64,
/// Write amplification as reported by the store, if it reported one.
pub write_amp: Option<f64>,
/// Read amplification as reported by the store, if it reported one.
pub read_amp: Option<f64>,
/// Space amplification as reported by the store, if it reported one.
pub space_amp: Option<f64>,
}
/// Combined results of one storage benchmark suite run.
#[derive(Debug, Clone)]
pub struct BenchSuiteResult {
/// Seed the suite was run with.
pub seed: u64,
/// Iteration count passed to every benchmark in the suite.
pub iterations: u64,
/// Result of the basic put/get benchmark.
pub basic_put_get: BenchResult,
/// Result of the batched ingest benchmark.
pub ingest: IngestBenchResult,
/// Result of the traversal latency benchmark.
pub traversal: TraversalBenchResult,
/// Result of the amplification benchmark.
pub amplification: AmplificationBenchResult,
}
impl BenchSuiteResult {
    /// Renders the suite result as a multi-line JSON object terminated by a
    /// newline.
    ///
    /// Durations are written as whole milliseconds, rates and latencies with
    /// four decimals, and the amplification figures through `json_opt_f64`.
    pub fn to_json_pretty(&self) -> String {
        let basic = &self.basic_put_get;
        let ingest = &self.ingest;
        let traversal = &self.traversal;
        let amplification = &self.amplification;

        // Each section is rendered on its own; all but the last carry the
        // trailing comma that separates them inside the outer object.
        let basic_section = format!(
            concat!(
                " \"basic_put_get\": {{\n",
                " \"iterations\": {},\n",
                " \"duration_ms\": {},\n",
                " \"ops_per_sec\": {:.4}\n",
                " }},\n"
            ),
            basic.iterations,
            basic.duration.as_millis(),
            basic.ops_per_sec,
        );
        let ingest_section = format!(
            concat!(
                " \"ingest\": {{\n",
                " \"edges\": {},\n",
                " \"duration_ms\": {},\n",
                " \"edges_per_sec\": {:.4}\n",
                " }},\n"
            ),
            ingest.edges,
            ingest.duration.as_millis(),
            ingest.edges_per_sec,
        );
        let traversal_section = format!(
            concat!(
                " \"traversal\": {{\n",
                " \"samples\": {},\n",
                " \"hops\": {},\n",
                " \"p50_micros\": {:.4},\n",
                " \"p95_micros\": {:.4},\n",
                " \"p99_micros\": {:.4}\n",
                " }},\n"
            ),
            traversal.samples,
            traversal.hops,
            traversal.p50_micros,
            traversal.p95_micros,
            traversal.p99_micros,
        );
        let amplification_section = format!(
            concat!(
                " \"amplification\": {{\n",
                " \"iterations\": {},\n",
                " \"write_amp\": {},\n",
                " \"read_amp\": {},\n",
                " \"space_amp\": {}\n",
                " }}\n"
            ),
            amplification.iterations,
            json_opt_f64(amplification.write_amp),
            json_opt_f64(amplification.read_amp),
            json_opt_f64(amplification.space_amp),
        );

        format!(
            concat!(
                "{{\n",
                " \"seed\": {},\n",
                " \"iterations\": {},\n",
                "{}{}{}{}",
                "}}\n"
            ),
            self.seed,
            self.iterations,
            basic_section,
            ingest_section,
            traversal_section,
            amplification_section,
        )
    }
}
/// Stateless namespace type whose associated functions run the storage benchmarks.
pub struct BenchRunner;
impl BenchRunner {
pub fn run_basic_put_get(
iterations: u64,
base_dir: &std::path::Path,
) -> api::Result<BenchResult> {
Self::run_basic_put_get_with_reactor(
iterations,
base_dir,
std::sync::Arc::new(crate::core::reactor::SystemReactor),
)
}
pub fn run_basic_put_get_with_reactor(
iterations: u64,
base_dir: &std::path::Path,
reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
) -> api::Result<BenchResult> {
let wal_dir = base_dir.join("wal");
let manifest_path = base_dir.join("ir.manifest");
let sstable_dir = base_dir.join("sst");
let mut handle = api::open_store_with_reactor(
StorageConfig {
buffer_pool_pages: 128,
wal_dir,
wal_segment_max_bytes: 1 << 20,
manifest_path,
sstable_dir,
},
reactor.clone(),
)?;
let start = reactor.now();
for i in 0..iterations {
let payload = format!("payload-{}", i);
let delta = api::encode_delta(i, i, payload.as_bytes());
api::put_edge_delta(&mut handle, &delta)?;
let _ = api::get_logical_node(&mut handle, i)?;
}
let duration = reactor
.now()
.duration_since(start)
.unwrap_or(Duration::from_secs(0));
let ops_per_sec = if duration.as_secs_f64() > 0.0 {
iterations as f64 / duration.as_secs_f64()
} else {
0.0
};
Ok(BenchResult {
iterations,
duration,
ops_per_sec,
})
}
pub fn run_storage_suite(
iterations: u64,
seed: u64,
base_dir: &std::path::Path,
) -> api::Result<BenchSuiteResult> {
let basic_dir = base_dir.join("basic");
let ingest_dir = base_dir.join("ingest");
let traversal_dir = base_dir.join("traversal");
let amp_dir = base_dir.join("amplification");
std::fs::create_dir_all(&basic_dir)?;
std::fs::create_dir_all(&ingest_dir)?;
std::fs::create_dir_all(&traversal_dir)?;
std::fs::create_dir_all(&_dir)?;
let basic_put_get = Self::run_basic_put_get(iterations, &basic_dir)?;
let ingest = Self::run_ingest_bench(iterations, seed, &ingest_dir)?;
let traversal = Self::run_traversal_bench(iterations, seed, &traversal_dir)?;
let amplification = Self::run_amplification_bench(iterations, seed, &_dir)?;
Ok(BenchSuiteResult {
seed,
iterations,
basic_put_get,
ingest,
traversal,
amplification,
})
}
pub fn run_ingest_bench(
iterations: u64,
seed: u64,
base_dir: &std::path::Path,
) -> api::Result<IngestBenchResult> {
let reactor = std::sync::Arc::new(crate::core::reactor::SystemReactor);
let mut handle = open_bench_store(base_dir, reactor.clone())?;
let node_count = (iterations.saturating_mul(2)).max(1024);
let cfg = WorkloadConfig {
seed,
node_count,
edge_count: iterations,
hot_node_percent: 10,
max_low_degree: 16,
};
const BATCH_SIZE: usize = 1024;
let mut batch: Vec<Vec<u8>> = Vec::with_capacity(BATCH_SIZE);
let start = std::time::Instant::now();
for edge in edge_workload_generator(cfg)? {
let payload = edge.dst.to_le_bytes();
let delta = api::encode_delta(edge.src, edge.version, &payload);
batch.push(delta);
if batch.len() >= BATCH_SIZE {
api::put_edge_deltas_batch(&mut handle, &batch)?;
batch.clear();
}
}
if !batch.is_empty() {
api::put_edge_deltas_batch(&mut handle, &batch)?;
}
let duration = start.elapsed();
api::flush(&mut handle)?;
let edges_per_sec = if duration.as_secs_f64() > 0.0 {
iterations as f64 / duration.as_secs_f64()
} else {
0.0
};
Ok(IngestBenchResult {
edges: iterations,
duration,
edges_per_sec,
})
}
pub fn run_traversal_bench(
iterations: u64,
seed: u64,
base_dir: &std::path::Path,
) -> api::Result<TraversalBenchResult> {
let reactor = std::sync::Arc::new(crate::core::reactor::SystemReactor);
let mut handle = open_bench_store(base_dir, reactor.clone())?;
let graph_nodes = iterations.max(2048);
for node in 0..graph_nodes {
let next = (node + 1) % graph_nodes;
let alt = (node + 7) % graph_nodes;
api::put_full_node(&mut handle, node, 1, &[next, alt])?;
}
api::flush(&mut handle)?;
let samples = iterations.max(1000);
let hops = 10u32;
let mut latencies = Vec::with_capacity(samples as usize);
let hot_span = (graph_nodes / 10).max(1);
let mut state = seed ^ 0xA24B_AED4_0FBF_1234_u64;
for _ in 0..samples {
state = state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
let mut node = state % hot_span;
let start = std::time::Instant::now();
for _ in 0..hops {
let logical = api::get_logical_node(&mut handle, node)?;
let adjacency = logical.adjacency();
if adjacency.is_empty() {
break;
}
node = adjacency[0];
}
latencies.push(start.elapsed().as_secs_f64() * 1_000_000.0);
}
let p50 = percentile(&latencies, 50.0);
let p95 = percentile(&latencies, 95.0);
let p99 = percentile(&latencies, 99.0);
Ok(TraversalBenchResult {
samples,
hops,
p50_micros: p50,
p95_micros: p95,
p99_micros: p99,
})
}
pub fn run_amplification_bench(
iterations: u64,
seed: u64,
base_dir: &std::path::Path,
) -> api::Result<AmplificationBenchResult> {
let reactor = std::sync::Arc::new(crate::core::reactor::SystemReactor);
let mut handle = open_bench_store(base_dir, reactor.clone())?;
let node_count = (iterations / 2).max(1024);
let cfg = WorkloadConfig {
seed: seed ^ 0x5DEECE66D,
node_count,
edge_count: iterations,
hot_node_percent: 10,
max_low_degree: 16,
};
for edge in edge_workload_generator(cfg)? {
let payload = edge.dst.to_le_bytes();
let delta = api::encode_delta(edge.src, edge.version, &payload);
api::put_edge_delta(&mut handle, &delta)?;
}
api::flush(&mut handle)?;
let read_samples = node_count.min(1024);
for node_id in 0..read_samples {
let _ = api::get_logical_node(&mut handle, node_id)?;
}
let report = api::report_metrics(&handle);
Ok(AmplificationBenchResult {
iterations,
write_amp: report.write_amp,
read_amp: report.read_amp,
space_amp: report.space_amp,
})
}
}
/// Opens a benchmark store rooted at `base_dir`.
///
/// The WAL lives in `base_dir/wal`, the manifest at `base_dir/ir.manifest`
/// and the SSTables in `base_dir/sst`. The store uses a 128-page buffer pool
/// and 1 MiB WAL segments; `reactor` is handed to the store unchanged.
fn open_bench_store(
    base_dir: &std::path::Path,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
) -> api::Result<api::StorageHandle> {
    let config = StorageConfig {
        buffer_pool_pages: 128,
        wal_dir: base_dir.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base_dir.join("ir.manifest"),
        sstable_dir: base_dir.join("sst"),
    };
    api::open_store_with_reactor(config, reactor)
}
/// Returns the `p`-th percentile of `values` using nearest-rank selection on
/// a sorted copy.
///
/// The rank is `round(p / 100 * (len - 1))`, clamped to the last index, so
/// `p` above 100 yields the maximum and `p` at or below 0 yields the minimum.
/// An empty slice yields `0.0`.
///
/// Values are ordered with `f64::total_cmp`, which is a total order even
/// when NaN is present: positive NaN sorts after every number and negative
/// NaN before every number.
fn percentile(values: &[f64], p: f64) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let mut sorted = values.to_vec();
    // `partial_cmp` is not a total order once NaN appears; a comparator that
    // breaks the total-order contract gives an unspecified order and may
    // panic. Elements equal under `total_cmp` are bit-identical, so an
    // unstable sort cannot change the result.
    sorted.sort_unstable_by(|a, b| a.total_cmp(b));

    let last = sorted.len() - 1;
    // The float-to-usize cast saturates, so out-of-range `p` cannot wrap.
    let rank = ((p / 100.0) * (last as f64)).round() as usize;
    sorted[rank.min(last)]
}
/// Formats an optional float as a JSON value.
///
/// Finite values are written with six decimals. `None`, NaN and the
/// infinities are written as `null`, because JSON has no literal for
/// non-finite numbers.
fn json_opt_f64(value: Option<f64>) -> String {
    match value {
        Some(v) if v.is_finite() => format!("{:.6}", v),
        _ => "null".to_string(),
    }
}
#[cfg(test)]
mod tests;