use std::time::{Duration, Instant};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
Domain, DomainKey, DomainTaskContext, Priority, Scheduler, SchedulerEvent, TaskError,
TaskStore, TaskSubmission, TypedExecutor, TypedTask,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;
/// Marker type identifying the benchmark domain; all bench tasks are
/// registered under the "bench" namespace.
struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}
/// Empty payload for the no-op benchmark task ("bench::test").
#[derive(Serialize, Deserialize)]
struct BenchTask;
impl TypedTask for BenchTask {
type Domain = BenchDomain;
const TASK_TYPE: &'static str = "test";
}
/// Empty payload for the byte-progress benchmark task ("bench::byte-test").
#[derive(Serialize, Deserialize)]
struct ByteTestTask;
impl TypedTask for ByteTestTask {
type Domain = BenchDomain;
const TASK_TYPE: &'static str = "byte-test";
}
/// Executor that completes immediately, so dispatch benchmarks measure pure
/// scheduler overhead rather than task work.
struct NoopExecutor;
impl TypedExecutor<BenchTask> for NoopExecutor {
async fn execute<'a>(
&'a self,
_payload: BenchTask,
_ctx: DomainTaskContext<'a, BenchDomain>,
) -> Result<(), TaskError> {
Ok(())
}
}
/// Executor that simulates byte-granular progress reporting: it announces a
/// total byte count up front, then reports progress in fixed-size chunks
/// until the total is consumed.
struct ByteProgressExecutor {
    // Total number of bytes the simulated task "transfers".
    total: u64,
    // Bytes reported per `add_bytes` call. A configured value of 0 is
    // clamped to 1 at execution time (see below).
    chunk_size: u64,
}
impl TypedExecutor<ByteTestTask> for ByteProgressExecutor {
    async fn execute<'a>(
        &'a self,
        _payload: ByteTestTask,
        ctx: DomainTaskContext<'a, BenchDomain>,
    ) -> Result<(), TaskError> {
        ctx.set_bytes_total(self.total);
        // Clamp the step to at least 1: with chunk_size == 0 and total > 0 the
        // original loop computed `remaining.min(0) == 0` forever, hanging the
        // benchmark in an infinite loop.
        let step = self.chunk_size.max(1);
        let mut remaining = self.total;
        while remaining > 0 {
            let chunk = remaining.min(step);
            ctx.add_bytes(chunk);
            remaining -= chunk;
        }
        Ok(())
    }
}
async fn build_scheduler(max_concurrency: usize) -> Scheduler {
Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<BenchDomain>::new().task::<BenchTask>(NoopExecutor))
.max_concurrency(max_concurrency)
.poll_interval(std::time::Duration::from_millis(10))
.build()
.await
.unwrap()
}
/// Measures raw submission throughput: 1000 unique-key submissions against a
/// fresh in-memory scheduler per criterion iteration.
fn bench_submit(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("submit_tasks");
    group.throughput(Throughput::Elements(1000));
    group.bench_function("1000", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut elapsed = Duration::ZERO;
            for _ in 0..iters {
                // Scheduler construction stays outside the timed region.
                let sched = build_scheduler(4).await;
                let t0 = Instant::now();
                for i in 0..1000 {
                    let submission = TaskSubmission::new("bench::test").key(format!("s-{i}"));
                    sched.submit(&submission).await.unwrap();
                }
                elapsed += t0.elapsed();
            }
            elapsed
        });
    });
    group.finish();
}
/// Measures submission cost when every timed submission is a dedup hit:
/// the key is seeded once (untimed), then resubmitted 999 times.
fn bench_submit_dedup_hit(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("submit_dedup_hit");
    group.throughput(Throughput::Elements(999));
    group.bench_function("1000", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut elapsed = Duration::ZERO;
            for _ in 0..iters {
                let sched = build_scheduler(4).await;
                // Seed the key once (a dedup miss) before timing begins.
                let submission = TaskSubmission::new("bench::test").key("same-key");
                sched.submit(&submission).await.unwrap();
                let t0 = Instant::now();
                for _ in 0..999 {
                    sched.submit(&submission).await.unwrap();
                }
                elapsed += t0.elapsed();
            }
            elapsed
        });
    });
    group.finish();
}
/// Measures end-to-end dispatch + completion of 1000 pre-submitted no-op
/// tasks at concurrency 8.
fn bench_dispatch_and_complete(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("dispatch_and_complete");
    group.throughput(Throughput::Elements(1000));
    group.bench_function("1000", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut total = Duration::ZERO;
            for _ in 0..iters {
                let sched = build_scheduler(8).await;
                // Submit outside the timed region: this benchmark is about
                // dispatch/completion, and `submit_tasks` already measures
                // submission cost on its own. Previously the timer started
                // before these 1000 awaits, conflating the two.
                for i in 0..1000 {
                    sched
                        .submit(&TaskSubmission::new("bench::test").key(format!("d-{i}")))
                        .await
                        .unwrap();
                }
                // Subscribe before the run loop starts so no Completed event
                // can be emitted before we are listening.
                let mut rx = sched.subscribe();
                let token = CancellationToken::new();
                let sched_clone = sched.clone();
                let token_clone = token.clone();
                let start = Instant::now();
                let handle = tokio::spawn(async move {
                    sched_clone.run(token_clone).await;
                });
                let mut completed = 0;
                while completed < 1000 {
                    // NOTE(review): a lagged broadcast receiver would drop
                    // Completed events and hang this loop — assumes the event
                    // channel capacity exceeds in-flight events; confirm.
                    if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await {
                        completed += 1;
                    }
                }
                total += start.elapsed();
                token.cancel();
                let _ = handle.await;
            }
            total
        });
    });
    group.finish();
}
/// Measures `peek_next` latency against stores pre-filled with varying
/// numbers of pending tasks (100 / 1000 / 5000).
fn bench_peek_next_varying_depth(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("peek_next");
    group.throughput(Throughput::Elements(1));
    for size in [100, 1000, 5000] {
        // Populate each store once per depth, outside the measured region.
        let store = rt.block_on(async {
            let store = TaskStore::open_memory().await.unwrap();
            for i in 0..size {
                let submission = TaskSubmission::new("test").key(format!("pk-{i}"));
                store.submit(&submission).await.unwrap();
            }
            store
        });
        group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| {
            let store = store.clone();
            b.to_async(&rt).iter_custom(|iters| {
                let store = store.clone();
                async move {
                    let t0 = Instant::now();
                    for _ in 0..iters {
                        let _ = store.peek_next(None).await.unwrap();
                    }
                    t0.elapsed()
                }
            });
        });
    }
    group.finish();
}
/// Measures how dispatch + completion of 500 no-op tasks scales with worker
/// concurrency (1 / 2 / 4 / 8).
fn bench_concurrency_scaling(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("concurrency_scaling");
    group.throughput(Throughput::Elements(500));
    for concurrency in [1, 2, 4, 8] {
        group.bench_with_input(
            BenchmarkId::from_parameter(concurrency),
            &concurrency,
            |b, &concurrency| {
                b.to_async(&rt).iter_custom(|iters| async move {
                    let mut total = Duration::ZERO;
                    for _ in 0..iters {
                        let sched = build_scheduler(concurrency).await;
                        // Submission cost is identical at every concurrency
                        // level; keep it out of the timed region so the
                        // measurement isolates scaling behavior. Previously
                        // the timer started before these awaits, diluting
                        // the concurrency signal with a fixed cost.
                        for i in 0..500 {
                            sched
                                .submit(&TaskSubmission::new("bench::test").key(format!("cs-{i}")))
                                .await
                                .unwrap();
                        }
                        let mut rx = sched.subscribe();
                        let token = CancellationToken::new();
                        let sched_clone = sched.clone();
                        let token_clone = token.clone();
                        let start = Instant::now();
                        let handle = tokio::spawn(async move {
                            sched_clone.run(token_clone).await;
                        });
                        let mut completed = 0;
                        while completed < 500 {
                            if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await {
                                completed += 1;
                            }
                        }
                        total += start.elapsed();
                        token.cancel();
                        let _ = handle.await;
                    }
                    total
                });
            },
        );
    }
    group.finish();
}
/// Measures `submit_batch` throughput for one batch of 1000 submissions.
fn bench_batch_submit(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("batch_submit");
    group.throughput(Throughput::Elements(1000));
    group.bench_function("1000", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut elapsed = Duration::ZERO;
            for _ in 0..iters {
                let sched = build_scheduler(4).await;
                // Batch construction stays outside the timed region; only the
                // single submit_batch round-trip is measured.
                let mut submissions = Vec::with_capacity(1000);
                for i in 0..1000 {
                    submissions.push(TaskSubmission::new("bench::test").key(format!("b-{i}")));
                }
                let t0 = Instant::now();
                sched.submit_batch(&submissions).await.unwrap();
                elapsed += t0.elapsed();
            }
            elapsed
        });
    });
    group.finish();
}
/// Measures dispatch + completion of 500 tasks submitted with a rotating mix
/// of all five priority levels at concurrency 4.
fn bench_mixed_priority_dispatch(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("mixed_priority_dispatch");
    group.throughput(Throughput::Elements(500));
    group.bench_function("500", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            // Hoisted: the priority table is loop-invariant and was
            // previously rebuilt inside the timed region every iteration.
            let priorities = [
                Priority::IDLE,
                Priority::BACKGROUND,
                Priority::NORMAL,
                Priority::HIGH,
                Priority::REALTIME,
            ];
            let mut total = Duration::ZERO;
            for _ in 0..iters {
                let sched = build_scheduler(4).await;
                // Submit outside the timed region so the figure isolates the
                // priority-aware dispatch path rather than submission cost
                // (previously the timer started before these awaits).
                for i in 0..500 {
                    let priority = priorities[i % priorities.len()];
                    sched
                        .submit(
                            &TaskSubmission::new("bench::test")
                                .key(format!("mp-{i}"))
                                .priority(priority),
                        )
                        .await
                        .unwrap();
                }
                let mut rx = sched.subscribe();
                let token = CancellationToken::new();
                let sched_clone = sched.clone();
                let token_clone = token.clone();
                let start = Instant::now();
                let handle = tokio::spawn(async move {
                    sched_clone.run(token_clone).await;
                });
                let mut completed = 0;
                while completed < 500 {
                    if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await {
                        completed += 1;
                    }
                }
                total += start.elapsed();
                token.cancel();
                let _ = handle.await;
            }
            total
        });
    });
    group.finish();
}
/// Compares dispatch + completion of plain no-op tasks against tasks that
/// report byte-level progress (1 MiB in 1 KiB chunks), isolating the
/// overhead of the byte-progress machinery.
fn bench_byte_progress_overhead(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("byte_progress");
    group.throughput(Throughput::Elements(500));
    group.bench_function("noop_500", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut total = Duration::ZERO;
            for _ in 0..iters {
                let sched = build_scheduler(8).await;
                // Submit outside the timed region so both variants measure
                // only dispatch + completion and remain directly comparable
                // (previously the timer also covered 500 submit awaits).
                for i in 0..500 {
                    sched
                        .submit(&TaskSubmission::new("bench::test").key(format!("bp-noop-{i}")))
                        .await
                        .unwrap();
                }
                let mut rx = sched.subscribe();
                let token = CancellationToken::new();
                let sched_clone = sched.clone();
                let token_clone = token.clone();
                let start = Instant::now();
                let handle = tokio::spawn(async move {
                    sched_clone.run(token_clone).await;
                });
                let mut completed = 0;
                while completed < 500 {
                    if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await {
                        completed += 1;
                    }
                }
                total += start.elapsed();
                token.cancel();
                let _ = handle.await;
            }
            total
        });
    });
    group.bench_function("byte_reporting_500", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut total = Duration::ZERO;
            for _ in 0..iters {
                // Custom scheduler: byte-reporting executor plus a 100 ms
                // progress flush interval.
                let sched = Scheduler::builder()
                    .store(TaskStore::open_memory().await.unwrap())
                    .domain(Domain::<BenchDomain>::new().task::<ByteTestTask>(
                        ByteProgressExecutor {
                            total: 1_048_576,
                            chunk_size: 1024,
                        },
                    ))
                    .max_concurrency(8)
                    .poll_interval(Duration::from_millis(10))
                    .progress_interval(Duration::from_millis(100))
                    .build()
                    .await
                    .unwrap();
                // Submit untimed, as in the noop variant above.
                for i in 0..500 {
                    sched
                        .submit(&TaskSubmission::new("bench::byte-test").key(format!("bp-{i}")))
                        .await
                        .unwrap();
                }
                let mut rx = sched.subscribe();
                let token = CancellationToken::new();
                let sched_clone = sched.clone();
                let token_clone = token.clone();
                let start = Instant::now();
                let handle = tokio::spawn(async move {
                    sched_clone.run(token_clone).await;
                });
                let mut completed = 0;
                while completed < 500 {
                    if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await {
                        completed += 1;
                    }
                }
                total += start.elapsed();
                token.cancel();
                let _ = handle.await;
            }
            total
        });
    });
    group.finish();
}
/// Measures `snapshot()` latency while 100 byte-reporting tasks (10 MiB in
/// 64 KiB chunks) are being driven by the scheduler.
fn bench_byte_progress_snapshot(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("byte_progress_snapshot");
    group.throughput(Throughput::Elements(100));
    group.bench_function("100_tasks", |b| {
        b.to_async(&rt).iter_custom(|iters| async move {
            let mut elapsed = Duration::ZERO;
            for _ in 0..iters {
                let executor = ByteProgressExecutor {
                    total: 10_485_760,
                    chunk_size: 65_536,
                };
                let sched = Scheduler::builder()
                    .store(TaskStore::open_memory().await.unwrap())
                    .domain(Domain::<BenchDomain>::new().task::<ByteTestTask>(executor))
                    .max_concurrency(100)
                    .poll_interval(Duration::from_millis(10))
                    .build()
                    .await
                    .unwrap();
                for i in 0..100 {
                    let submission =
                        TaskSubmission::new("bench::byte-test").key(format!("snap-{i}"));
                    sched.submit(&submission).await.unwrap();
                }
                let token = CancellationToken::new();
                let sched_clone = sched.clone();
                let token_clone = token.clone();
                let handle = tokio::spawn(async move {
                    sched_clone.run(token_clone).await;
                });
                // Give the run loop a moment to pick tasks up. NOTE(review):
                // fast tasks may already have finished by now, so snapshots
                // are not guaranteed to observe in-flight work — confirm
                // whether measuring against mostly-finished tasks is intended.
                tokio::time::sleep(Duration::from_millis(5)).await;
                let t0 = Instant::now();
                for _ in 0..100 {
                    let _ = sched.snapshot().await;
                }
                elapsed += t0.elapsed();
                token.cancel();
                let _ = handle.await;
            }
            elapsed
        });
    });
    group.finish();
}
// Register every benchmark function in one criterion group and emit the
// standard criterion `main` entry point for the bench harness.
criterion_group!(
benches,
bench_submit,
bench_submit_dedup_hit,
bench_dispatch_and_complete,
bench_peek_next_varying_depth,
bench_concurrency_scaling,
bench_batch_submit,
bench_mixed_priority_dispatch,
bench_byte_progress_overhead,
bench_byte_progress_snapshot,
);
criterion_main!(benches);