use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use sqlitegraph::backend::SubscriptionFilter;
use sqlitegraph::{EdgeSpec, GraphConfig, NodeSpec, open_graph, snapshot::SnapshotId};
mod bench_utils;
use bench_utils::{MEASURE, WARM_UP, create_benchmark_temp_dir};
/// Builds a linear chain graph of `size` nodes (node_0 -> node_1 -> ... )
/// in a fresh temp-dir-backed database.
///
/// Returns the `TempDir` guard (keeps the database directory alive), the
/// database path, and the inserted node ids in chain order.
fn create_chain_graph(size: usize) -> (tempfile::TempDir, std::path::PathBuf, Vec<i64>) {
    let temp_dir = create_benchmark_temp_dir();
    let db_path = temp_dir.path().join("benchmark.db");
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to create graph");

    // Insert the nodes first, collecting their ids in insertion order.
    let node_ids: Vec<i64> = (0..size)
        .map(|idx| {
            graph
                .insert_node(NodeSpec {
                    kind: "Node".to_string(),
                    name: format!("node_{}", idx),
                    file_path: None,
                    data: serde_json::json!({"id": idx}),
                })
                .expect("Failed to insert node")
        })
        .collect();

    // Link consecutive nodes; `windows(2)` naturally yields nothing for
    // size 0 or 1, matching the original `saturating_sub(1)` bound.
    for (idx, pair) in node_ids.windows(2).enumerate() {
        graph
            .insert_edge(EdgeSpec {
                from: pair[0],
                to: pair[1],
                edge_type: "chain".to_string(),
                data: serde_json::json!({"order": idx}),
            })
            .expect("Failed to insert edge");
    }

    (temp_dir, db_path, node_ids)
}
/// Baseline memory benchmark: build a chain graph and run a full BFS with
/// zero pub/sub subscribers, for comparison against the subscriber variants.
///
/// The timed region intentionally includes graph construction so all three
/// benchmark groups in this file measure the same end-to-end workload.
fn bench_memory_baseline(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("regression_pubsub_memory_baseline");
    group.measurement_time(MEASURE);
    group.warm_up_time(WARM_UP);
    for &size in &[100, 500, 1000] {
        group.throughput(Throughput::Elements(size as u64));
        group.bench_with_input(BenchmarkId::new("0_subscribers", size), &size, |b, &size| {
            // `iter_with_large_drop` drops the returned TempDir *outside*
            // the timed region, so directory cleanup isn't measured. The
            // previous `std::mem::forget(temp_dir)` achieved the same
            // timing exclusion but leaked one on-disk temp directory per
            // iteration, never removing them.
            b.iter_with_large_drop(|| {
                let (temp_dir, db_path, node_ids) = create_chain_graph(size);
                let graph =
                    open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");
                let start_node = node_ids[0];
                let result = graph
                    .bfs(SnapshotId::current(), start_node, size as u32)
                    .expect("BFS traversal failed");
                // Keep the traversal result observable so the optimizer
                // cannot elide the BFS.
                std::hint::black_box(result);
                temp_dir
            });
        });
    }
    group.finish();
}
/// Memory benchmark with live pub/sub subscribers attached: measures the
/// same build-graph + BFS workload as the baseline, but with 1/5/10
/// catch-all subscribers registered before the traversal.
fn bench_memory_with_subscribers(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("regression_pubsub_memory_with_subs");
    group.measurement_time(MEASURE);
    group.warm_up_time(WARM_UP);
    const SIZE: usize = 500;
    for &subscriber_count in &[1, 5, 10] {
        group.throughput(Throughput::Elements(SIZE as u64));
        group.bench_with_input(
            BenchmarkId::new("with_subscribers", subscriber_count),
            &subscriber_count,
            |b, &subscriber_count| {
                // Return the TempDir so it is dropped (and the directory
                // removed) after timing. The previous
                // `std::mem::forget(temp_dir)` leaked one on-disk temp
                // directory per iteration.
                b.iter_with_large_drop(|| {
                    let (temp_dir, db_path, node_ids) = create_chain_graph(SIZE);
                    let graph =
                        open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");
                    // Hold every receiver for the duration of the BFS so the
                    // publisher actually fans events out to N subscribers.
                    let mut receivers = Vec::with_capacity(subscriber_count);
                    for _ in 0..subscriber_count {
                        let (_id, rx) = graph
                            .subscribe(SubscriptionFilter::all())
                            .expect("Failed to subscribe");
                        receivers.push(rx);
                    }
                    let start_node = node_ids[0];
                    let result = graph
                        .bfs(SnapshotId::current(), start_node, SIZE as u32)
                        .expect("BFS traversal failed");
                    // Prevent the optimizer from discarding the traversal.
                    std::hint::black_box(result);
                    temp_dir
                });
            },
        );
    }
    group.finish();
}
/// Memory benchmark for the event queue: registers 5 catch-all subscribers
/// and performs 10/50/100 node-insert commits without draining any receiver,
/// so the published events accumulate in the subscribers' queues.
fn bench_memory_event_queue(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("regression_pubsub_memory_queue");
    group.measurement_time(MEASURE);
    group.warm_up_time(WARM_UP);
    const SIZE: usize = 100;
    for &commit_count in &[10, 50, 100] {
        group.throughput(Throughput::Elements(commit_count as u64));
        group.bench_with_input(
            BenchmarkId::new("events_in_queue", commit_count),
            &commit_count,
            |b, &commit_count| {
                // Return the TempDir so cleanup happens outside the timed
                // region instead of leaking the directory via
                // `std::mem::forget` as before.
                b.iter_with_large_drop(|| {
                    let temp_dir = create_benchmark_temp_dir();
                    let db_path = temp_dir.path().join("benchmark.db");
                    let graph = open_graph(&db_path, &GraphConfig::native())
                        .expect("Failed to create graph");
                    // Receivers are kept alive but never read, so every
                    // insert below leaves an event queued per subscriber.
                    let mut receivers = Vec::with_capacity(5);
                    for _ in 0..5 {
                        let (_id, rx) = graph
                            .subscribe(SubscriptionFilter::all())
                            .expect("Failed to subscribe");
                        receivers.push(rx);
                    }
                    for i in 0..commit_count {
                        let node_id = graph
                            .insert_node(NodeSpec {
                                kind: "Node".to_string(),
                                name: format!("node_{}", i),
                                file_path: None,
                                data: serde_json::json!({"id": i}),
                            })
                            .expect("Failed to insert node");
                        std::hint::black_box(node_id);
                    }
                    temp_dir
                });
            },
        );
    }
    group.finish();
}
/// Back-of-the-envelope estimate of pub/sub memory overhead, in bytes.
///
/// Returns `(publisher_base, per_subscriber_total, total_overhead)` where
/// `per_subscriber_total` combines fixed subscriber state with the space
/// for its queued events, and `total_overhead` adds the publisher's fixed
/// cost to the sum over all subscribers.
#[allow(dead_code)]
fn estimate_pubsub_overhead(
    subscriber_count: usize,
    events_per_subscriber: usize,
) -> (usize, usize, usize) {
    // Rough per-component byte costs used by the estimate.
    const PUBLISHER_BASE: usize = 200;
    const PER_SUBSCRIBER_STATE: usize = 100;
    const BYTES_PER_QUEUED_EVENT: usize = 40;

    let per_subscriber_total =
        PER_SUBSCRIBER_STATE + events_per_subscriber * BYTES_PER_QUEUED_EVENT;
    let total_overhead = PUBLISHER_BASE + subscriber_count * per_subscriber_total;
    (PUBLISHER_BASE, per_subscriber_total, total_overhead)
}
// Register the three pub/sub memory-regression benchmarks and generate the
// benchmark harness entry point.
criterion_group!(
    benches,
    bench_memory_baseline,
    bench_memory_with_subscribers,
    bench_memory_event_queue
);
criterion_main!(benches);