#![cfg(feature = "worker")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use rust_job_queue_api_worker_system::{
connect, migrate,
queue::{enqueue, fetch_next, mark_succeeded},
JobKind, NewJob, PoolConfig,
};
use serde_json::json;
use sqlx::query::query;
use sqlx_postgres::PgPool;
use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt};
use testcontainers_modules::postgres::Postgres;
use tokio::runtime::Builder;
use tokio::sync::Barrier;
const TOTAL_JOBS: usize = 2_000;
const CONCURRENCIES: &[usize] = &[1, 2, 4, 8, 16];
const SAMPLES: usize = 3;
/// The Postgres server configurations exercised by the benchmark matrix.
#[derive(Clone, Copy)]
enum PgConfig {
    Default,
    Tuned,
    AsyncCommit,
}

impl PgConfig {
    /// Short machine-friendly name used to group rows in the results table.
    fn label(&self) -> &'static str {
        match self {
            Self::Default => "default",
            Self::Tuned => "tuned",
            Self::AsyncCommit => "async_commit",
        }
    }

    /// Human-readable summary of the server settings for this variant.
    fn description(&self) -> &'static str {
        match self {
            Self::Default => "postgres:16-alpine, defaults",
            Self::Tuned => "shared_buffers=256MB, work_mem=8MB, max_connections=200",
            Self::AsyncCommit => "synchronous_commit=off, max_connections=200",
        }
    }

    /// Container command line (`postgres -c key=value ...`) applying this
    /// configuration; the Default variant runs the server with no overrides.
    fn cmd(&self) -> Vec<String> {
        let args: &[&str] = match self {
            Self::Default => &["postgres"],
            Self::Tuned => &[
                "postgres",
                "-c",
                "shared_buffers=256MB",
                "-c",
                "work_mem=8MB",
                "-c",
                "max_connections=200",
            ],
            Self::AsyncCommit => &[
                "postgres",
                "-c",
                "synchronous_commit=off",
                "-c",
                "max_connections=200",
            ],
        };
        args.iter().map(|s| (*s).to_string()).collect()
    }
}
/// Timing results for one (Postgres config, worker concurrency) pair.
struct Measurement {
    // Stable label/description copied from `PgConfig` so results outlive
    // the config enum value.
    config: &'static str,
    config_description: &'static str,
    concurrency: usize,
    // One wall-clock duration per benchmark sample.
    elapsed_per_sample: Vec<Duration>,
}

impl Measurement {
    /// Converts each sampled wall-clock duration into a jobs/second rate.
    fn throughputs_jobs_per_sec(&self) -> Vec<f64> {
        let mut rates = Vec::with_capacity(self.elapsed_per_sample.len());
        for elapsed in &self.elapsed_per_sample {
            rates.push(TOTAL_JOBS as f64 / elapsed.as_secs_f64());
        }
        rates
    }

    /// Arithmetic mean throughput across all samples.
    fn mean(&self) -> f64 {
        let rates = self.throughputs_jobs_per_sec();
        rates.iter().sum::<f64>() / rates.len() as f64
    }

    /// Population standard deviation (divides by n) of the sampled rates.
    fn stddev(&self) -> f64 {
        let rates = self.throughputs_jobs_per_sec();
        let mean = rates.iter().sum::<f64>() / rates.len() as f64;
        let variance = rates
            .iter()
            .map(|rate| (rate - mean) * (rate - mean))
            .sum::<f64>()
            / rates.len() as f64;
        variance.sqrt()
    }
}
/// Starts a disposable Postgres container configured with the given server
/// flags and returns the running container handle.
async fn boot_pg(config: PgConfig) -> ContainerAsync<Postgres> {
    let image = Postgres::default().with_cmd(config.cmd());
    image.start().await.expect("start pg container")
}
/// Creates a brand-new database inside the running container and returns a
/// migrated connection pool pointing at it, so each sample starts clean.
async fn fresh_pool(container: &ContainerAsync<Postgres>) -> PgPool {
    let port = container.get_host_port_ipv4(5432).await.expect("port");
    let admin_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let admin = sqlx_postgres::PgPool::connect(&admin_url)
        .await
        .expect("admin connect");
    // UUIDv7 keeps names unique across samples; `simple()` drops the dashes
    // so the identifier is valid without quoting.
    let db = format!("t{}", uuid::Uuid::now_v7().simple());
    query(&format!("CREATE DATABASE {db}"))
        .execute(&admin)
        .await
        .expect("create db");
    admin.close().await;
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/{db}");
    let config = PoolConfig::from_url(url).with_max_connections(32);
    let pool = connect(&config).await.expect("pool connect");
    migrate(&pool).await.expect("migrate");
    pool
}
/// Inserts `n` identical SummarizeText jobs for the workers to drain.
async fn seed_jobs(pool: &PgPool, n: usize) {
    for _ in 0..n {
        let job = NewJob {
            kind: JobKind::SummarizeText,
            payload: json!({ "text": "lorem" }),
            // Single attempt: a failed job must not be retried mid-benchmark.
            max_attempts: Some(1),
            idempotency_key: None,
        };
        enqueue(pool, job).await.expect("enqueue");
    }
}
/// Spawns `concurrency` worker tasks that drain the queue until `target`
/// jobs have been completed, returning the wall-clock time taken.
///
/// All workers plus this coordinator rendezvous on a barrier so the clock
/// starts only once every task is ready, not while tasks are still spawning.
async fn run_workers(pool: &PgPool, concurrency: usize, target: usize) -> Duration {
// Shared count of jobs completed across all workers.
let processed = Arc::new(AtomicUsize::new(0));
// `+ 1` so the coordinator participates in the rendezvous below.
let barrier = Arc::new(Barrier::new(concurrency + 1));
let mut tasks = Vec::with_capacity(concurrency);
for i in 0..concurrency {
let pool = pool.clone();
let processed = processed.clone();
let barrier = barrier.clone();
let worker_id = format!("bench-worker-{i}");
tasks.push(tokio::spawn(async move {
// Block until every worker (and the coordinator) is ready.
barrier.wait().await;
loop {
// Stop as soon as the collective target has been reached.
if processed.load(Ordering::Relaxed) >= target {
return;
}
match fetch_next(&pool, &worker_id).await {
Ok(Some(job)) => {
// Best-effort completion: a failed status update is ignored
// and the job still counts toward the target.
let _ = mark_succeeded(&pool, job.id).await;
processed.fetch_add(1, Ordering::Relaxed);
}
Ok(None) => {
// Queue looked empty: either we are done, or other workers
// still hold in-flight jobs — re-check, then yield and retry.
if processed.load(Ordering::Relaxed) >= target {
return;
}
tokio::task::yield_now().await;
}
Err(e) => {
// A DB error retires this worker; any remaining workers
// keep draining whatever is left.
eprintln!("bench: fetch_next error: {e}");
return;
}
}
}
}));
}
// Release every worker at once, then start timing.
barrier.wait().await;
let started = Instant::now();
for t in tasks {
let _ = t.await;
}
started.elapsed()
}
/// Benchmark entry point: for every Postgres configuration, measures job
/// throughput at several worker concurrencies (SAMPLES runs each), logging
/// progress to stderr and finishing with a markdown table on stdout.
fn main() {
    let rt = Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("tokio runtime");
    let mut measurements: Vec<Measurement> = Vec::new();
    for cfg in [PgConfig::Default, PgConfig::Tuned, PgConfig::AsyncCommit] {
        eprintln!(
            "\n=== Postgres config: {} ({}) ===",
            cfg.label(),
            cfg.description()
        );
        rt.block_on(async {
            // One container per config; every sample gets a fresh database.
            let container = boot_pg(cfg).await;
            for &concurrency in CONCURRENCIES {
                let mut samples = Vec::with_capacity(SAMPLES);
                for sample_i in 0..SAMPLES {
                    let pool = fresh_pool(&container).await;
                    seed_jobs(&pool, TOTAL_JOBS).await;
                    let elapsed = run_workers(&pool, concurrency, TOTAL_JOBS).await;
                    let throughput = TOTAL_JOBS as f64 / elapsed.as_secs_f64();
                    eprintln!(
                        " concurrency={concurrency:>2} sample={} elapsed={:>7.3}s throughput={:>8.1} jobs/s",
                        sample_i + 1,
                        elapsed.as_secs_f64(),
                        throughput
                    );
                    samples.push(elapsed);
                    pool.close().await;
                }
                measurements.push(Measurement {
                    config: cfg.label(),
                    config_description: cfg.description(),
                    concurrency,
                    elapsed_per_sample: samples,
                });
            }
        });
    }
    print_markdown_results(&measurements);
}
/// Renders all measurements as a markdown table on stdout, delimited by
/// HTML comment markers so a script can splice it into documentation.
fn print_markdown_results(measurements: &[Measurement]) {
    println!();
    println!("<!-- BEGIN: bench results table -->");
    println!();
    println!("| Postgres config | Concurrency | Throughput (jobs/s) | Stddev (jobs/s) | Samples |");
    println!("|---|---:|---:|---:|---:|");
    let mut last_cfg = "";
    for m in measurements {
        // Print the config label only on the first row of each group so
        // repeated rows read as a merged cell.
        let cfg_label = if m.config == last_cfg {
            String::new()
        } else {
            format!("**{}** _({})_", m.config, m.config_description)
        };
        last_cfg = m.config;
        println!(
            "| {} | {} | {:.1} | {:.1} | {} |",
            cfg_label,
            m.concurrency,
            m.mean(),
            m.stddev(),
            m.elapsed_per_sample.len()
        );
    }
    println!();
    println!("<!-- END: bench results table -->");
}