use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
use dynamo_runtime::compute::ComputePool;
use std::sync::Arc;
/// Returns the sum of every prime in the inclusive range `2..=n`.
///
/// The result is 0 when `n` is below 2, because the range is then empty.
/// This is the CPU-bound workload used by all benchmarks in this file.
fn compute_primes_sum(n: u64) -> u64 {
    (2..=n).filter(|&value| is_prime(value)).sum()
}
/// Reports whether `n` is prime, using trial division.
///
/// 0 and 1 are rejected, 2 and 3 are accepted, and any other multiple of 2 or
/// 3 is rejected immediately. Remaining values are tested against divisor
/// pairs `(k, k + 2)` for `k = 5, 11, 17, ...` up to the truncated
/// floating-point square root of `n`.
fn is_prime(n: u64) -> bool {
    match n {
        0 | 1 => false,
        2 | 3 => true,
        _ if n.is_multiple_of(2) || n.is_multiple_of(3) => false,
        _ => {
            // Only divisors up to sqrt(n) need to be checked.
            let limit = (n as f64).sqrt() as u64;
            // `all` stops at the first divisor pair that divides `n`.
            (5..=limit)
                .step_by(6)
                .all(|k| !n.is_multiple_of(k) && !n.is_multiple_of(k + 2))
        }
    }
}
fn bench_compute_overhead(c: &mut Criterion) {
let test_sizes = [10, 1_000, 100_000];
let mut group = c.benchmark_group("compute_overhead");
group.sample_size(10);
let tokio_4thread = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
let tokio_1thread = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
let compute_config = dynamo_runtime::compute::ComputeConfig {
num_threads: Some(4),
stack_size: Some(2 * 1024 * 1024),
thread_prefix: "bench".to_string(),
pin_threads: false,
};
let compute_pool = Arc::new(ComputePool::new(compute_config).unwrap());
for n in test_sizes {
group.bench_with_input(BenchmarkId::new("tokio_direct", n), &n, |b, &n| {
b.to_async(&tokio_4thread)
.iter(|| async move { black_box(compute_primes_sum(black_box(n))) });
});
let pool = compute_pool.clone();
group.bench_with_input(BenchmarkId::new("rayon_offload", n), &n, |b, &n| {
b.to_async(&tokio_1thread).iter(|| {
let pool = pool.clone();
async move {
pool.execute(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
}
});
});
group.bench_with_input(BenchmarkId::new("spawn_blocking", n), &n, |b, &n| {
b.to_async(&tokio_4thread).iter(|| async move {
tokio::task::spawn_blocking(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
});
});
}
group.finish();
}
/// Benchmarks a batch of identical prime-sum jobs launched concurrently,
/// comparing three ways of running the CPU-bound work.
///
/// For each batch size (10 and 100) every variant spawns `batch_size` Tokio
/// tasks that each compute `compute_primes_sum(10_000)`, then awaits the join
/// handles in spawn order:
///
/// * `tokio_direct_parallel` – the computation runs inline in the spawned task.
/// * `rayon_parallel` – each task submits the computation to the `ComputePool`.
/// * `spawn_blocking_parallel` – each task forwards the computation to
///   `tokio::task::spawn_blocking`.
///
/// The workload size is passed through `black_box` so the optimizer cannot
/// specialise the computation on the constant, matching how the other
/// benchmark groups in this file feed their inputs.
fn bench_parallel_tasks(c: &mut Criterion) {
    let mut group = c.benchmark_group("parallel_tasks");
    group.sample_size(10);

    // NOTE(review): the runtime allows only one blocking thread, so the
    // `spawn_blocking_parallel` jobs presumably execute one at a time while
    // the other variants have four threads available — confirm that this
    // asymmetry is intended.
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let compute_config = dynamo_runtime::compute::ComputeConfig {
        num_threads: Some(4),
        stack_size: Some(2 * 1024 * 1024),
        thread_prefix: "bench".to_string(),
        pin_threads: false,
    };
    let compute_pool = Arc::new(ComputePool::new(compute_config).unwrap());

    for batch_size in [10, 100] {
        // Upper bound handed to `compute_primes_sum` by every task.
        let n = 10_000;

        group.bench_with_input(
            BenchmarkId::new("tokio_direct_parallel", batch_size),
            &batch_size,
            |b, &batch_size| {
                b.to_async(&tokio_runtime).iter(|| async move {
                    // Spawn the whole batch before awaiting anything so the
                    // tasks can run concurrently.
                    let tasks = (0..batch_size)
                        .map(|_| tokio::spawn(async move { compute_primes_sum(black_box(n)) }))
                        .collect::<Vec<_>>();
                    for task in tasks {
                        black_box(task.await.unwrap());
                    }
                });
            },
        );

        let pool = compute_pool.clone();
        group.bench_with_input(
            BenchmarkId::new("rayon_parallel", batch_size),
            &batch_size,
            |b, &batch_size| {
                b.to_async(&tokio_runtime).iter(|| {
                    let pool = pool.clone();
                    async move {
                        let tasks = (0..batch_size)
                            .map(|_| {
                                // Each spawned task needs its own handle to
                                // the shared pool.
                                let pool = pool.clone();
                                tokio::spawn(async move {
                                    pool.execute(move || compute_primes_sum(black_box(n)))
                                        .await
                                        .unwrap()
                                })
                            })
                            .collect::<Vec<_>>();
                        for task in tasks {
                            black_box(task.await.unwrap());
                        }
                    }
                });
            },
        );

        group.bench_with_input(
            BenchmarkId::new("spawn_blocking_parallel", batch_size),
            &batch_size,
            |b, &batch_size| {
                b.to_async(&tokio_runtime).iter(|| async move {
                    let tasks = (0..batch_size)
                        .map(|_| {
                            tokio::spawn(async move {
                                tokio::task::spawn_blocking(move || {
                                    compute_primes_sum(black_box(n))
                                })
                                .await
                                .unwrap()
                            })
                        })
                        .collect::<Vec<_>>();
                    for task in tasks {
                        black_box(task.await.unwrap());
                    }
                });
            },
        );
    }

    group.finish();
}
fn bench_block_in_place_overhead(c: &mut Criterion) {
let test_sizes = [10, 1_000, 100_000];
let mut group = c.benchmark_group("block_in_place_overhead");
group.sample_size(10);
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap();
let compute_config = dynamo_runtime::compute::ComputeConfig {
num_threads: Some(4),
stack_size: Some(2 * 1024 * 1024),
thread_prefix: "bench".to_string(),
pin_threads: false,
};
let compute_pool = Arc::new(ComputePool::new(compute_config).unwrap());
for n in test_sizes {
group.bench_with_input(BenchmarkId::new("direct", n), &n, |b, &n| {
b.to_async(&tokio_runtime)
.iter(|| async move { black_box(compute_primes_sum(black_box(n))) });
});
group.bench_with_input(BenchmarkId::new("block_in_place", n), &n, |b, &n| {
b.to_async(&tokio_runtime).iter(|| async move {
tokio::task::block_in_place(|| black_box(compute_primes_sum(black_box(n))))
});
});
group.bench_with_input(BenchmarkId::new("spawn_blocking", n), &n, |b, &n| {
b.to_async(&tokio_runtime).iter(|| async move {
tokio::task::spawn_blocking(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
});
});
let pool = compute_pool.clone();
group.bench_with_input(BenchmarkId::new("rayon_offload", n), &n, |b, &n| {
b.to_async(&tokio_runtime).iter(|| {
let pool = pool.clone();
async move {
pool.execute(move || black_box(compute_primes_sum(black_box(n))))
.await
.unwrap()
}
});
});
}
group.finish();
}
// Collect the three benchmark functions into the `benches` group and hand
// that group to `criterion_main!`, which provides the benchmark binary's
// entry point.
criterion_group!(
benches,
bench_compute_overhead,
bench_parallel_tasks,
bench_block_in_place_overhead
);
criterion_main!(benches);