use std::time::Duration;
use apalis_diesel_postgres::{
Config, Error as PgError, PgPool, PgTask, PostgresStorage, build_pool_with, setup,
};
use diesel::{Connection, PgConnection, QueryableByName, RunQueryDsl, sql_query, sql_types::Text};
use ulid::Ulid;
const BUSINESS_TABLE: &str = "outbox_example_orders";
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct SendConfirmationEmail {
order_id: String,
to: String,
}
#[derive(QueryableByName, Debug)]
struct CountRow {
#[diesel(sql_type = diesel::sql_types::BigInt)]
n: i64,
}
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[tokio::main]
async fn main() -> Result<(), BoxError> {
let Ok(database_url) = std::env::var("DATABASE_URL") else {
eprintln!(
"set DATABASE_URL to a reachable PostgreSQL server before running this example, e.g.:\n\
DATABASE_URL=postgres://127.0.0.1:5432/apalis_diesel_postgres cargo run --example outbox"
);
return Ok(());
};
let backend_pool = build_pool_with(database_url.clone(), |builder| {
builder
.max_size(8)
.connection_timeout(Duration::from_secs(2))
})?;
let apalis_pool = build_pool_with(database_url, |builder| {
builder
.max_size(4)
.connection_timeout(Duration::from_secs(2))
})?;
setup(&apalis_pool).await?;
let queue = format!("outbox-example-{}", Ulid::new());
let storage = PostgresStorage::<SendConfirmationEmail>::new_with_config(
&apalis_pool,
&Config::new(&queue),
);
ensure_business_table(backend_pool.clone()).await?;
cleanup(&backend_pool, &apalis_pool, &queue).await?;
println!("queue: {queue}\n");
scenario_commit(&backend_pool, &storage, &queue).await?;
cleanup(&backend_pool, &apalis_pool, &queue).await?;
scenario_rollback(&backend_pool, &storage, &queue).await?;
cleanup(&backend_pool, &apalis_pool, &queue).await?;
scenario_idempotency_conflict(&backend_pool, &storage, &queue).await?;
cleanup(&backend_pool, &apalis_pool, &queue).await?;
Ok(())
}
async fn scenario_commit(
backend_pool: &PgPool,
storage: &PostgresStorage<SendConfirmationEmail>,
queue: &str,
) -> Result<(), BoxError> {
println!("scenario 1: commit");
let order_id = Ulid::new().to_string();
let task_id = tokio::task::spawn_blocking({
let storage = storage.clone();
let backend_pool = backend_pool.clone();
let order_id = order_id.clone();
move || -> Result<String, PgError> {
let mut conn = backend_pool.get().map_err(PgError::Pool)?;
let id = conn.transaction(|c| {
insert_order(c, &order_id, "alice@example.com")?;
storage.push_with_conn(
c,
SendConfirmationEmail {
order_id: order_id.clone(),
to: "alice@example.com".to_owned(),
},
)
})?;
Ok(id.to_string())
}
})
.await??;
println!(" enqueued task_id={task_id}");
println!(" order row id={order_id}");
print_counts(backend_pool, queue.to_owned()).await?;
println!();
Ok(())
}
async fn scenario_rollback(
backend_pool: &PgPool,
storage: &PostgresStorage<SendConfirmationEmail>,
queue: &str,
) -> Result<(), BoxError> {
println!("scenario 2: rollback");
let order_id = Ulid::new().to_string();
let outcome = tokio::task::spawn_blocking({
let storage = storage.clone();
let backend_pool = backend_pool.clone();
let order_id = order_id.clone();
move || -> Result<(), diesel::result::Error> {
let mut conn = backend_pool
.get()
.map_err(|_| diesel::result::Error::BrokenTransactionManager)?;
conn.transaction(|c| {
insert_order(c, &order_id, "bob@example.com")
.map_err(|_| diesel::result::Error::RollbackTransaction)?;
storage
.push_with_conn(
c,
SendConfirmationEmail {
order_id: order_id.clone(),
to: "bob@example.com".to_owned(),
},
)
.map_err(|_| diesel::result::Error::RollbackTransaction)?;
Err(diesel::result::Error::RollbackTransaction)
})
}
})
.await?;
match outcome {
Err(diesel::result::Error::RollbackTransaction) => {
println!(" closure returned Err → outer transaction rolled back");
}
other => panic!("unexpected outcome: {other:?}"),
}
print_counts(backend_pool, queue.to_owned()).await?;
println!();
Ok(())
}
async fn scenario_idempotency_conflict(
backend_pool: &PgPool,
storage: &PostgresStorage<SendConfirmationEmail>,
queue: &str,
) -> Result<(), BoxError> {
println!("scenario 3: idempotency conflict");
let order_id = Ulid::new().to_string();
let idempotency_key = format!("order:{order_id}");
{
let storage = storage.clone();
let backend_pool = backend_pool.clone();
let idempotency_key = idempotency_key.clone();
let order_id_for_seed = order_id.clone();
tokio::task::spawn_blocking(move || -> Result<(), PgError> {
let mut conn = backend_pool.get().map_err(PgError::Pool)?;
let mut task = PgTask::<SendConfirmationEmail>::new(SendConfirmationEmail {
order_id: order_id_for_seed,
to: "carol@example.com".to_owned(),
});
task.parts.idempotency_key = Some(idempotency_key);
conn.transaction(|c| storage.push_task_with_conn(c, task).map(|_| ()))
})
.await??;
}
let outcome = tokio::task::spawn_blocking({
let storage = storage.clone();
let backend_pool = backend_pool.clone();
let order_id = order_id.clone();
let idempotency_key = idempotency_key.clone();
move || -> Result<&'static str, PgError> {
let mut conn = backend_pool.get().map_err(PgError::Pool)?;
conn.transaction(|c| {
insert_order(c, &order_id, "carol@example.com")?;
let mut task = PgTask::<SendConfirmationEmail>::new(SendConfirmationEmail {
order_id: order_id.clone(),
to: "carol@example.com".to_owned(),
});
task.parts.idempotency_key = Some(idempotency_key);
match storage.push_task_with_conn(c, task) {
Ok(_) => Ok("no conflict observed"),
Err(PgError::IdempotencyConflict { .. }) => {
Ok("conflict surfaced; business write committed")
}
Err(other) => Err(other),
}
})
}
})
.await??;
println!(" outcome: {outcome}");
print_counts(backend_pool, queue.to_owned()).await?;
println!();
Ok(())
}
fn insert_order(
conn: &mut PgConnection,
id: &str,
email: &str,
) -> Result<(), diesel::result::Error> {
let sql = format!(
"INSERT INTO {BUSINESS_TABLE} (id, recipient) VALUES ($1, $2) \
ON CONFLICT (id) DO NOTHING"
);
sql_query(sql)
.bind::<Text, _>(id)
.bind::<Text, _>(email)
.execute(conn)
.map(|_| ())
}
async fn ensure_business_table(pool: PgPool) -> Result<(), BoxError> {
tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = pool.get()?;
sql_query(format!(
"CREATE TABLE IF NOT EXISTS {BUSINESS_TABLE} (
id TEXT PRIMARY KEY,
recipient TEXT NOT NULL
)"
))
.execute(&mut conn)?;
Ok(())
},
)
.await??;
Ok(())
}
async fn cleanup(backend_pool: &PgPool, apalis_pool: &PgPool, queue: &str) -> Result<(), BoxError> {
let queue = queue.to_owned();
let backend_pool = backend_pool.clone();
let apalis_pool = apalis_pool.clone();
tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = backend_pool.get()?;
sql_query(format!("DELETE FROM {BUSINESS_TABLE}")).execute(&mut conn)?;
let mut conn = apalis_pool.get()?;
sql_query("DELETE FROM apalis.jobs WHERE job_type = $1")
.bind::<Text, _>(&queue)
.execute(&mut conn)?;
Ok(())
},
)
.await??;
Ok(())
}
async fn print_counts(pool: &PgPool, queue: String) -> Result<(), BoxError> {
let pool = pool.clone();
let (jobs, orders) = tokio::task::spawn_blocking(
move || -> Result<(i64, i64), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = pool.get()?;
let jobs =
sql_query("SELECT COUNT(*)::bigint AS n FROM apalis.jobs WHERE job_type = $1")
.bind::<Text, _>(&queue)
.get_result::<CountRow>(&mut conn)?
.n;
let orders = sql_query(format!(
"SELECT COUNT(*)::bigint AS n FROM {BUSINESS_TABLE}"
))
.get_result::<CountRow>(&mut conn)?
.n;
Ok((jobs, orders))
},
)
.await??;
println!(" apalis.jobs rows for this queue: {jobs}");
println!(" {BUSINESS_TABLE} rows total: {orders}");
Ok(())
}