use std::time::Duration;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rust_job_queue_api_worker_system::{
connect, migrate,
queue::{enqueue, fetch_next, mark_succeeded},
JobKind, NewJob, PoolConfig,
};
use serde_json::json;
use sqlx_postgres::PgPool;
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres;
use tokio::runtime::Runtime;
/// Starts a throwaway Postgres container, opens a connection pool against it
/// and applies the crate's migrations.
///
/// Returns the pool together with the container handle; the caller should
/// keep the handle bound for as long as the pool is in use (presumably the
/// container is torn down when the handle is dropped — confirm against the
/// testcontainers version in use).
///
/// # Panics
///
/// Panics if the container cannot be started, its mapped port cannot be
/// resolved, the pool cannot connect, or the migrations fail.
async fn bootstrap_pool() -> (PgPool, ContainerAsync<Postgres>) {
    let pg = Postgres::default().start().await.expect("start pg");

    // The container's 5432 is mapped to an ephemeral host port; look it up.
    let port = pg.get_host_port_ipv4(5432).await.expect("port");

    let config = PoolConfig::from_url(format!(
        "postgres://postgres:postgres@127.0.0.1:{port}/postgres"
    ))
    .with_max_connections(8);

    let pool = connect(&config).await.expect("connect");
    migrate(&pool).await.expect("migrate");

    (pool, pg)
}
/// Enqueues `n` identical `SummarizeText` jobs, one at a time.
///
/// Every job carries the same small JSON payload, allows a single attempt
/// and has no idempotency key.
///
/// # Panics
///
/// Panics on the first `enqueue` call that returns an error.
async fn seed_jobs(pool: &PgPool, n: usize) {
    for _ in 0..n {
        let job = NewJob {
            kind: JobKind::SummarizeText,
            payload: json!({ "text": "lorem" }),
            max_attempts: Some(1),
            idempotency_key: None,
        };
        enqueue(pool, job).await.expect("enqueue");
    }
}
/// Benchmarks draining the queue: one `fetch_next` followed by one
/// `mark_succeeded` per job, for batches of 100 and 500 jobs.
///
/// The batch is seeded before every iteration, but seeding is kept outside
/// the timed region by using `iter_custom`. The reported time and the
/// `Throughput::Elements(n)` figure therefore cover only the dequeue + ack
/// round trips that the group name advertises, not the `n` enqueue calls
/// needed to set the iteration up.
///
/// # Panics
///
/// Panics if the tokio runtime or the database cannot be set up, if a queue
/// call fails, or if `fetch_next` finds no job while seeded jobs should
/// still be pending.
fn bench_dequeue(c: &mut Criterion) {
    let rt = Runtime::new().expect("tokio runtime");
    // Keep the container handle bound until the end of the function so the
    // database outlives every benchmark run.
    let (pool, _container) = rt.block_on(bootstrap_pool());

    let mut group = c.benchmark_group("dequeue_and_ack");
    // Each iteration performs 2 * n database round trips, so use a longer
    // measurement window and fewer samples than Criterion's defaults.
    group.measurement_time(Duration::from_secs(15));
    group.sample_size(10);

    for &n in &[100usize, 500] {
        group.throughput(Throughput::Elements(n as u64));
        group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, &n| {
            // Shared reference is `Copy`, so it can be moved into every future.
            let pool = &pool;
            b.to_async(&rt).iter_custom(move |iters| async move {
                let mut measured = Duration::ZERO;
                for _ in 0..iters {
                    // Untimed setup: make exactly `n` jobs available.
                    seed_jobs(pool, n).await;

                    let started = std::time::Instant::now();
                    for _ in 0..n {
                        let job = fetch_next(pool, "bench-worker")
                            .await
                            .expect("fetch_next")
                            .expect("a job available");
                        mark_succeeded(pool, job.id).await.expect("mark_succeeded");
                    }
                    measured += started.elapsed();
                }
                measured
            });
        });
    }

    group.finish();
}
// Register `bench_dequeue` in a Criterion group named `benches`.
criterion_group!(benches, bench_dequeue);
// Generate the benchmark binary's `main`, which runs the `benches` group.
criterion_main!(benches);