use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use oxirs_core::concurrent::{
    BatchBuilder, BatchBuilderConfig, BatchConfig, BatchOperation, CoalescingStrategy,
    ParallelBatchProcessor,
};
use oxirs_core::graph::Graph;
use oxirs_core::model::{NamedNode, Object, Predicate, Subject, Triple};
use oxirs_core::store::IndexedGraph;
use rayon::prelude::*;
use std::hint::black_box;
use std::sync::Arc;
/// Builds a deterministic test triple for benchmark datasets.
///
/// Subjects and objects are unique per `id`, while predicates cycle
/// through 100 distinct IRIs so pattern queries see repeated predicates.
fn create_test_triple(id: usize) -> Triple {
    let subject = Subject::NamedNode(NamedNode::new(format!("http://s/{id}")).unwrap());
    let predicate = Predicate::NamedNode(NamedNode::new(format!("http://p/{}", id % 100)).unwrap());
    let object = Object::NamedNode(NamedNode::new(format!("http://o/{id}")).unwrap());
    Triple::new(subject, predicate, object)
}
fn create_dataset(size: usize) -> Vec<Triple> {
(0..size).map(create_test_triple).collect()
}
/// Compares three insertion strategies on `IndexedGraph`: sequential
/// per-triple insert, bulk parallel insert, and chunked submission
/// through `ParallelBatchProcessor`.
fn bench_parallel_vs_sequential_insert(c: &mut Criterion) {
    let mut group = c.benchmark_group("insert_comparison");
    for size in [1000, 10000, 100000] {
        let dataset = create_dataset(size);
        // Report throughput as triples/second.
        group.throughput(Throughput::Elements(size as u64));
        group.bench_with_input(
            BenchmarkId::new("sequential", size),
            &dataset,
            |b, dataset| {
                b.iter(|| {
                    let graph = IndexedGraph::new();
                    for triple in dataset {
                        graph.insert(black_box(triple));
                    }
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("parallel", size),
            &dataset,
            |b, dataset| {
                b.iter(|| {
                    let graph = IndexedGraph::new();
                    // The clone is part of the measured cost for this variant.
                    graph.par_insert_batch(black_box(dataset.clone()));
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("batch_processor", size),
            &dataset,
            |b, dataset| {
                b.iter(|| {
                    let graph = Arc::new(IndexedGraph::new());
                    let processor = ParallelBatchProcessor::new(BatchConfig::default());
                    // Queue the dataset in fixed 1000-triple chunks.
                    for chunk in dataset.chunks(1000) {
                        processor
                            .submit(BatchOperation::insert(chunk.to_vec()))
                            .unwrap();
                    }
                    // Move the Arc straight into the worker closure; the
                    // original cloned it into a `graph_clone` local and never
                    // touched the handle again (redundant_clone).
                    processor
                        .process(move |op| match op {
                            BatchOperation::Insert(triples) => {
                                for triple in triples {
                                    graph.insert(&triple);
                                }
                                Ok(())
                            }
                            _ => Ok(()),
                        })
                        .unwrap();
                });
            },
        );
    }
    group.finish();
}
/// Benchmarks `BatchBuilder` under each coalescing strategy on a fixed
/// 10k-triple dataset; auto-flush is disabled so each iteration measures
/// all inserts plus a single final `flush`.
fn bench_batch_builder_strategies(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_builder_strategies");
    let dataset = create_dataset(10000);
    let strategies = [
        CoalescingStrategy::None,
        CoalescingStrategy::Deduplicate,
        CoalescingStrategy::Merge,
        CoalescingStrategy::OptimizeOrder,
    ];
    for strategy in strategies {
        let id = BenchmarkId::new("strategy", format!("{strategy:?}"));
        group.bench_with_input(id, &dataset, |b, data| {
            b.iter(|| {
                let config = BatchBuilderConfig {
                    coalescing_strategy: strategy,
                    auto_flush: false,
                    ..Default::default()
                };
                let mut builder = BatchBuilder::new(config);
                for triple in data {
                    builder.insert(black_box(triple.clone())).unwrap();
                }
                builder.flush().unwrap()
            });
        });
    }
    group.finish();
}
/// Compares sequential vs. parallel evaluation of subject-bound query
/// patterns against a 100k-triple graph (10k subjects x 10 predicates).
fn bench_parallel_query_patterns(c: &mut Criterion) {
    let mut group = c.benchmark_group("parallel_query");
    let graph = Arc::new(IndexedGraph::new());
    // Populate: every subject i carries ten triples, one per predicate j.
    for i in 0..10000 {
        for j in 0..10 {
            graph.insert(&Triple::new(
                Subject::NamedNode(NamedNode::new(format!("http://s/{i}")).unwrap()),
                Predicate::NamedNode(NamedNode::new(format!("http://p/{j}")).unwrap()),
                Object::NamedNode(NamedNode::new(format!("http://o/{i}-{j}")).unwrap()),
            ));
        }
    }
    for num_patterns in [10, 100, 1000] {
        // (Some(subject), None, None) patterns; each matches ten triples.
        let mut patterns = Vec::with_capacity(num_patterns as usize);
        for i in 0..num_patterns {
            let subject = Subject::NamedNode(NamedNode::new(format!("http://s/{i}")).unwrap());
            patterns.push((Some(subject), None, None));
        }
        group.throughput(Throughput::Elements(num_patterns as u64));
        group.bench_with_input(
            BenchmarkId::new("sequential", num_patterns),
            &patterns,
            |b, patterns| {
                b.iter(|| {
                    let mut results = Vec::new();
                    for (s, p, o) in patterns {
                        results.push(graph.query(s.as_ref(), p.as_ref(), o.as_ref()));
                    }
                    black_box(results)
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("parallel", num_patterns),
            &patterns,
            |b, patterns| {
                b.iter(|| black_box(graph.par_query_batch(patterns.clone())));
            },
        );
    }
    group.finish();
}
/// Measures batch-processor scaling across thread counts on a skewed
/// workload: every tenth operation is ten times larger than the rest.
/// Per-operation work is simulated as one microsecond per triple.
fn bench_work_stealing_efficiency(c: &mut Criterion) {
    let mut group = c.benchmark_group("work_stealing");
    let operations: Vec<_> = (0..1000)
        .map(|i| {
            let size = if i % 10 == 0 { 100 } else { 10 };
            BatchOperation::insert(create_dataset(size))
        })
        .collect();
    for num_threads in [1, 2, 4, 8] {
        group.bench_with_input(
            BenchmarkId::new("threads", num_threads),
            &operations,
            |b, ops| {
                b.iter(|| {
                    let config = BatchConfig {
                        num_threads: Some(num_threads),
                        ..Default::default()
                    };
                    let processor = ParallelBatchProcessor::new(config);
                    for op in ops {
                        processor.submit(op.clone()).unwrap();
                    }
                    processor
                        .process(|op| {
                            if let BatchOperation::Insert(triples) = op {
                                // Simulated cost: 1 microsecond per triple.
                                let micros = triples.len() as u64;
                                std::thread::sleep(std::time::Duration::from_micros(micros));
                            }
                            Ok(())
                        })
                        .unwrap()
                });
            },
        );
    }
    group.finish();
}
/// Benchmarks `IndexedGraph::par_transform` (rewrite every predicate)
/// and `par_filter` (keep subjects whose IRI ends in '0') at several
/// graph sizes.
fn bench_parallel_transform(c: &mut Criterion) {
    let mut group = c.benchmark_group("parallel_transform");
    for size in [1000, 10000, 50000] {
        let graph = Arc::new(IndexedGraph::new());
        let dataset = create_dataset(size);
        graph.par_insert_batch(dataset);
        group.throughput(Throughput::Elements(size as u64));
        // Hoisted out of the per-triple closure: the original parsed and
        // validated this IRI for every element, so the benchmark measured
        // IRI construction rather than the transform machinery. A clone
        // per element is far cheaper.
        let new_predicate = Predicate::NamedNode(NamedNode::new("http://new-predicate").unwrap());
        group.bench_with_input(BenchmarkId::new("transform", size), &graph, |b, graph| {
            b.iter(|| {
                black_box(graph.par_transform(|triple| {
                    Some(Triple::new(
                        triple.subject().clone(),
                        new_predicate.clone(),
                        triple.object().clone(),
                    ))
                }))
            });
        });
        group.bench_with_input(BenchmarkId::new("filter", size), &graph, |b, graph| {
            b.iter(|| {
                black_box(graph.par_filter(|triple| match triple.subject() {
                    Subject::NamedNode(node) => node.as_str().ends_with('0'),
                    _ => false,
                }))
            });
        });
    }
    group.finish();
}
/// Measures how batch size (used both for chunking the input and for
/// `BatchConfig::batch_size`) affects rayon-backed processing throughput.
fn bench_batch_size_impact(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_size_impact");
    let dataset = create_dataset(100000);
    for batch_size in [100, 1000, 5000, 10000] {
        group.bench_with_input(
            BenchmarkId::new("batch_size", batch_size),
            &dataset,
            |b, data| {
                b.iter(|| {
                    let processor = ParallelBatchProcessor::new(BatchConfig {
                        batch_size,
                        ..Default::default()
                    });
                    for chunk in data.chunks(batch_size) {
                        let op = BatchOperation::insert(chunk.to_vec());
                        processor.submit(op).unwrap();
                    }
                    // Trivial per-batch work: just count triples.
                    processor
                        .process_rayon(|op| {
                            if let BatchOperation::Insert(triples) = op {
                                Ok(triples.len())
                            } else {
                                Ok(0)
                            }
                        })
                        .unwrap()
                });
            },
        );
    }
    group.finish();
}
fn bench_memory_efficiency(c: &mut Criterion) {
let mut group = c.benchmark_group("memory_efficiency");
let dataset = create_dataset(50000);
group.bench_function("with_deduplication", |b| {
b.iter(|| {
let mut builder = BatchBuilder::new(BatchBuilderConfig {
coalescing_strategy: CoalescingStrategy::Deduplicate,
auto_flush: false,
..Default::default()
});
for triple in &dataset[..10000] {
builder.insert(triple.clone()).unwrap();
builder.insert(triple.clone()).unwrap(); }
let batches = builder.flush().unwrap();
black_box(batches)
});
});
group.bench_function("without_deduplication", |b| {
b.iter(|| {
let mut builder = BatchBuilder::new(BatchBuilderConfig {
coalescing_strategy: CoalescingStrategy::None,
auto_flush: false,
..Default::default()
});
for triple in &dataset[..10000] {
builder.insert(triple.clone()).unwrap();
builder.insert(triple.clone()).unwrap(); }
let batches = builder.flush().unwrap();
black_box(batches)
});
});
group.finish();
}
/// Benchmarks the parallel helpers on the plain `Graph` type: batch
/// insert (parallel vs. sequential), batch query, pattern counting,
/// and unique-term extraction.
fn bench_graph_parallel_methods(c: &mut Criterion) {
    let mut group = c.benchmark_group("graph_parallel_methods");
    for size in [1000, 10000, 50000] {
        let dataset = create_dataset(size);
        group.throughput(Throughput::Elements(size as u64));
        group.bench_with_input(
            BenchmarkId::new("graph_par_insert", size),
            &dataset,
            |b, dataset| {
                b.iter(|| {
                    let mut graph = Graph::new();
                    // dataset.clone() is inside the measured region; the
                    // sequential variant pays a comparable per-triple clone,
                    // so the comparison stays fair.
                    black_box(graph.par_insert_batch(dataset.clone()).unwrap())
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("graph_seq_insert", size),
            &dataset,
            |b, dataset| {
                b.iter(|| {
                    let mut graph = Graph::new();
                    for triple in dataset {
                        graph.add_triple(triple.clone());
                    }
                    black_box(graph.len())
                });
            },
        );
        // Pre-built graph shared (read-only) by the query benchmarks below.
        let mut query_graph = Graph::new();
        query_graph.extend(dataset.clone());
        // 100 subject-bound patterns of the form (Some(subject), None, None).
        let queries: Vec<_> = (0..100)
            .map(|i| {
                (
                    Some(Subject::NamedNode(
                        NamedNode::new(format!("http://s/{i}")).unwrap(),
                    )),
                    None,
                    None,
                )
            })
            .collect();
        group.bench_with_input(
            BenchmarkId::new("graph_par_query", size),
            &queries,
            |b, queries| {
                b.iter(|| black_box(query_graph.par_query_batch(queries.clone()).unwrap()));
            },
        );
        // 50 predicate-bound patterns; dataset predicates cycle mod 100,
        // so each pattern matches roughly size/100 triples.
        let patterns: Vec<_> = (0..50)
            .map(|i| {
                (
                    None,
                    Some(Predicate::NamedNode(
                        NamedNode::new(format!("http://p/{i}")).unwrap(),
                    )),
                    None,
                )
            })
            .collect();
        group.bench_with_input(
            BenchmarkId::new("graph_par_count", size),
            &patterns,
            |b, patterns| {
                b.iter(|| black_box(query_graph.par_count_patterns(patterns.clone())));
            },
        );
        group.bench_with_input(
            BenchmarkId::new("graph_par_unique", size),
            &query_graph,
            |b, graph| {
                b.iter(|| black_box(graph.par_unique_terms()));
            },
        );
    }
    group.finish();
}
fn bench_graph_parallel_transform(c: &mut Criterion) {
let mut group = c.benchmark_group("graph_parallel_transform");
for size in [5000, 20000] {
let mut graph = Graph::new();
graph.extend(create_dataset(size));
group.throughput(Throughput::Elements(size as u64));
group.bench_with_input(
BenchmarkId::new("modify_predicates", size),
&graph,
|b, graph| {
let mut test_graph = graph.clone();
b.iter(|| {
let (transformed, removed) = test_graph
.par_transform(|triple| {
Some(Triple::new(
triple.subject().clone(),
Predicate::NamedNode(NamedNode::new("http://p/modified").unwrap()),
triple.object().clone(),
))
})
.unwrap();
black_box((transformed, removed))
});
},
);
group.bench_with_input(
BenchmarkId::new("filter_evens", size),
&graph,
|b, graph| {
let mut test_graph = graph.clone();
b.iter(|| {
let (transformed, removed) = test_graph
.par_transform(|triple| {
if let Subject::NamedNode(node) = triple.subject() {
if let Some(id_str) = node.as_str().strip_prefix("http://s/") {
if let Ok(id) = id_str.parse::<usize>() {
if id % 2 == 0 {
return None; }
}
}
}
Some(triple.clone())
})
.unwrap();
black_box((transformed, removed))
});
},
);
}
group.finish();
}
/// Compares rayon's `par_iter` against the sequential `iter_triples`
/// for the same subject/object IRI filter.
fn bench_graph_parallel_iterator(c: &mut Criterion) {
    let mut group = c.benchmark_group("graph_parallel_iterator");
    for size in [10000, 50000] {
        let mut graph = Graph::new();
        graph.extend(create_dataset(size));
        group.throughput(Throughput::Elements(size as u64));
        group.bench_with_input(
            BenchmarkId::new("par_iter_filter", size),
            &graph,
            |b, graph| {
                b.iter(|| {
                    let result: Vec<_> = graph
                        .par_iter()
                        .filter(|triple| match (triple.subject(), triple.object()) {
                            (Subject::NamedNode(s), Object::NamedNode(o)) => {
                                // char patterns instead of one-char &str
                                // (clippy::single_char_pattern): same result,
                                // cheaper comparison in this hot filter.
                                s.as_str().ends_with('0') || o.as_str().ends_with('5')
                            }
                            _ => false,
                        })
                        .cloned()
                        .collect();
                    black_box(result.len())
                });
            },
        );
        group.bench_with_input(
            BenchmarkId::new("seq_iter_filter", size),
            &graph,
            |b, graph| {
                b.iter(|| {
                    let result: Vec<_> = graph
                        .iter_triples()
                        .filter(|triple| match (triple.subject(), triple.object()) {
                            (Subject::NamedNode(s), Object::NamedNode(o)) => {
                                s.as_str().ends_with('0') || o.as_str().ends_with('5')
                            }
                            _ => false,
                        })
                        .cloned()
                        .collect();
                    black_box(result.len())
                });
            },
        );
    }
    group.finish();
}
// Register every parallel/batch benchmark in a single criterion group and
// generate the benchmark harness entry point.
criterion_group!(
    benches,
    bench_parallel_vs_sequential_insert,
    bench_batch_builder_strategies,
    bench_parallel_query_patterns,
    bench_work_stealing_efficiency,
    bench_parallel_transform,
    bench_batch_size_impact,
    bench_memory_efficiency,
    bench_graph_parallel_methods,
    bench_graph_parallel_transform,
    bench_graph_parallel_iterator
);
criterion_main!(benches);