#![cfg(feature = "tokio")]
mod support;
use std::{
str::FromStr,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use apalis_core::backend::{ListQueues, ListWorkers};
use apalis_core::{
backend::{Backend, RegisterWorker, TaskSink, TaskSinkError},
error::BoxDynError,
task::{Task, attempt::Attempt, builder::TaskBuilder, status::Status, task_id::TaskId},
worker::{context::WorkerContext, ext::ack::Acknowledge},
};
use apalis_diesel_postgres::{
Config, Error as PgError, PgAck, PgContext, PgPool, PgTask, PgTaskId, PostgresStorage,
refresh_queue_stats_snapshot, setup, verify_schema,
};
use apalis_sql::{DateTime, DateTimeExt, context::SqlContext};
use diesel::{
Connection, PgConnection, QueryableByName, RunQueryDsl, sql_query,
sql_types::{BigInt, Integer, Jsonb, Nullable, Text, Timestamptz},
};
use futures::StreamExt;
use lets_expect::{AssertionError, AssertionResult, *};
use serde_json::Value;
use std::sync::Arc;
use ulid::Ulid;
/// Result of attempting to run one scenario in this file.
#[derive(Debug)]
enum Outcome<T> {
/// The scenario did not run; `observe` treats this as a pass.
Skipped,
/// The scenario ran to completion and produced observations of type `T`.
Completed(T),
}
fn observe<T, F>(
label: &'static str,
body: F,
) -> impl Fn(&Result<Outcome<T>, String>) -> AssertionResult
where
F: Fn(&T) -> Result<(), String>,
{
move |result| match result {
Err(error) => Err(AssertionError::new(vec![format!(
"{label}: scenario failed: {error}"
)])),
Ok(Outcome::Skipped) => Ok(()),
Ok(Outcome::Completed(run)) => {
body(run).map_err(|reason| AssertionError::new(vec![format!("{label}: {reason}")]))
}
}
}
/// Fetches the pool used by the scenarios by delegating to `support::shared_pool`.
///
/// Callers in this file treat `Ok(None)` as "skip the scenario".
async fn test_pool() -> Result<Option<PgPool>, String> {
support::shared_pool().await
}
/// Runs `work` on a blocking thread with a connection checked out of `pool`.
///
/// A failed pool checkout and a failed join of the blocking task are both
/// reported as their `to_string()` text; otherwise the result of `work` is
/// returned unchanged.
async fn with_conn<F, T>(pool: PgPool, work: F) -> Result<T, String>
where
    F: FnOnce(&mut PgConnection) -> Result<T, String> + Send + 'static,
    T: Send + 'static,
{
    let handle = tokio::task::spawn_blocking(move || match pool.get() {
        Ok(mut conn) => work(&mut conn),
        Err(checkout) => Err(checkout.to_string()),
    });
    match handle.await {
        Ok(outcome) => outcome,
        Err(join) => Err(join.to_string()),
    }
}
/// Deletes the `apalis.jobs` rows and then the `apalis.workers` rows whose
/// type column equals `queue`, stopping at the first statement that fails.
async fn cleanup_queue(pool: PgPool, queue: String) -> Result<(), String> {
    // Executed in this order: jobs first, then workers.
    const STATEMENTS: [&str; 2] = [
        "DELETE FROM apalis.jobs WHERE job_type = $1",
        "DELETE FROM apalis.workers WHERE worker_type = $1",
    ];
    with_conn(pool, move |conn| {
        for statement in STATEMENTS {
            let deleted = sql_query(statement).bind::<Text, _>(&queue).execute(conn);
            if let Err(error) = deleted {
                return Err(error.to_string());
            }
        }
        Ok(())
    })
    .await
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reads earlier than the Unix epoch.
fn now_unix() -> u64 {
    let since_epoch = UNIX_EPOCH
        .elapsed()
        .expect("system time before unix epoch");
    since_epoch.as_secs()
}
/// Produces a fresh task id by parsing the text form of a newly generated ULID.
///
/// # Panics
/// Panics if `TaskId::from_str` rejects the generated ULID text.
fn task_id() -> PgTaskId {
    let ulid_text = Ulid::new().to_string();
    let parsed: Result<PgTaskId, _> = TaskId::from_str(&ulid_text);
    parsed.expect("generated ULID parses as task id")
}
/// Builds a `String` task with a freshly generated task id.
///
/// * `payload` – task argument; it is copied into an owned `String`, so any
///   borrowed string is accepted (a `'static` lifetime is not required).
/// * `run_at` – value handed to `run_at_timestamp`; callers in this file pass
///   seconds since the Unix epoch from `now_unix`.
/// * `attempts` – value the task's `Attempt` is created with.
/// * `max_attempts` – value stored on the task context via `with_max_attempts`.
fn task(
    payload: &str,
    run_at: u64,
    attempts: usize,
    max_attempts: i32,
) -> Task<String, PgContext, Ulid> {
    TaskBuilder::new(payload.to_owned())
        .with_task_id(task_id())
        .run_at_timestamp(run_at)
        .with_attempt(Attempt::new_with_value(attempts))
        .with_ctx(SqlContext::new().with_max_attempts(max_attempts))
        .build()
}
/// Waits for the next actual task from a polling stream.
///
/// `Ok(None)` items are skipped. All `stream.next()` calls share one deadline
/// five seconds after entry, so idle `Ok(None)` items do not restart the timer.
///
/// # Errors
/// * `"timed out waiting for a task"` when the deadline elapses while waiting;
/// * `"task stream ended"` when the stream finishes;
/// * the stream error's `to_string()` text when the stream yields an error.
async fn next_task(
    stream: &mut (
             impl futures::Stream<Item = Result<Option<PgTask<String>>, apalis_diesel_postgres::Error>>
             + Unpin
         ),
) -> Result<PgTask<String>, String> {
    // Fixed once up front: a per-call timeout would be reset by every `Ok(None)`.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    loop {
        let item = tokio::time::timeout_at(deadline, stream.next())
            .await
            .map_err(|_| "timed out waiting for a task".to_owned())?
            .ok_or_else(|| "task stream ended".to_owned())?
            .map_err(|e| e.to_string())?;
        if let Some(task) = item {
            return Ok(task);
        }
    }
}
/// Observations recorded by `run_failed_retry`.
#[derive(Debug)]
struct FailedRetryRun {
/// Arguments of the polled task, or `None` when no task was polled.
polled_payload: Option<String>,
/// `attempt.current()` of the polled task; `0` when no task was polled.
polled_attempts: usize,
}
/// Inserts one `Failed` row for `queue` directly into `apalis.jobs`.
///
/// The row carries the JSON encoding of `"retry-me"` as its job, the given
/// `attempts` / `max_attempts`, a `run_at` one second in the past and a
/// `last_result` of `{"Err":"boom"}`. Returns the inserted row's id as a task id.
async fn insert_failed_task(
    pool: PgPool,
    queue: String,
    attempts: i32,
    max_attempts: i32,
) -> Result<PgTaskId, String> {
    let row_id = Ulid::new().to_string();
    let encoded_job = serde_json::to_vec("retry-me").map_err(|e| e.to_string())?;
    let task_id = TaskId::from_str(&row_id).map_err(|e| e.to_string())?;
    with_conn(pool, move |conn| {
        // The SQL literal is runtime text and is kept exactly as written.
        let insert = sql_query(
"INSERT INTO apalis.jobs (
id, job_type, job, status, attempts, max_attempts, run_at, last_result
) VALUES ($1, $2, $3, 'Failed', $4, $5, now() - INTERVAL '1 second', '{\"Err\":\"boom\"}'::jsonb)",
        )
        .bind::<Text, _>(row_id)
        .bind::<Text, _>(queue)
        .bind::<diesel::sql_types::Binary, _>(encoded_job)
        .bind::<Integer, _>(attempts)
        .bind::<Integer, _>(max_attempts);
        match insert.execute(conn) {
            Ok(_) => Ok(()),
            Err(error) => Err(error.to_string()),
        }
    })
    .await?;
    Ok(task_id)
}
/// Seeds one `Failed` row and records whether polling the queue hands it back.
///
/// With `retryable == true` the row is seeded with `attempts = 1, max_attempts = 3`;
/// otherwise with `attempts = 3, max_attempts = 3`. Returns `Outcome::Skipped`
/// when `test_pool` yields no pool.
async fn run_failed_retry(retryable: bool) -> Result<Outcome<FailedRetryRun>, String> {
let Some(pool) = test_pool().await? else {
return Ok(Outcome::Skipped);
};
// A ULID suffix gives this run its own queue name.
let queue = format!("apalis-spec-failed-retry-{}", Ulid::new());
cleanup_queue(pool.clone(), queue.clone()).await?;
let (attempts, max_attempts) = if retryable { (1, 3) } else { (3, 3) };
insert_failed_task(pool.clone(), queue.clone(), attempts, max_attempts).await?;
let storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
let worker = WorkerContext::new::<()>(&format!("spec-failed-retry-worker-{queue}"));
let mut stream = storage.poll(&worker);
// Overall budget of 3 s; each inner attempt waits at most 800 ms.
let polled = tokio::time::timeout(Duration::from_secs(3), async {
let mut polled: Option<PgTask<String>> = None;
for _ in 0..6 {
match tokio::time::timeout(Duration::from_millis(800), next_task(&mut stream)).await {
Ok(Ok(t)) => {
polled = Some(t);
break;
}
// An elapsed inner timeout and an error from `next_task` are both retried.
_ => continue,
}
}
polled
})
.await
// An elapsed outer timeout is recorded as "no task polled".
.unwrap_or(None);
cleanup_queue(pool, queue).await?;
Ok(Outcome::Completed(FailedRetryRun {
// Read the attempt count before `polled` is consumed for its payload.
polled_attempts: polled
.as_ref()
.map(|t| t.parts.attempt.current())
.unwrap_or(0),
polled_payload: polled.map(|t| t.args),
}))
}
/// Assertion: the scenario polled a task whose payload is `"retry-me"`.
fn failed_retry_reclaims_row()
    -> impl Fn(&Result<Outcome<FailedRetryRun>, String>) -> AssertionResult {
    observe::<FailedRetryRun, _>("failed retry reclaim", |run| {
        let Some(payload) = &run.polled_payload else {
            return Err("expected fetch_next to reclaim Failed row below max_attempts".into());
        };
        if payload == "retry-me" {
            Ok(())
        } else {
            Err(format!(
                "expected to reclaim retryable Failed row, got {payload:?}"
            ))
        }
    })
}
/// Assertion: a polled task reports an attempt count of exactly 1.
///
/// Passes when nothing was polled; that case is this check's concern only
/// when a task exists.
fn failed_retry_preserves_attempt_count()
    -> impl Fn(&Result<Outcome<FailedRetryRun>, String>) -> AssertionResult {
    observe::<FailedRetryRun, _>("failed retry attempts", |run| {
        match (&run.polled_payload, run.polled_attempts) {
            (None, _) | (Some(_), 1) => Ok(()),
            (Some(_), observed) => Err(format!(
                "expected reclaimed task to carry attempts=1, got {}",
                observed
            )),
        }
    })
}
/// Assertion: the scenario polled no task at all.
fn failed_exhausted_not_reclaimed()
    -> impl Fn(&Result<Outcome<FailedRetryRun>, String>) -> AssertionResult {
    observe::<FailedRetryRun, _>("failed exhausted skip", |run| {
        match &run.polled_payload {
            None => Ok(()),
            Some(_) => {
                Err("expected exhausted Failed row to remain hidden from fetch_next".into())
            }
        }
    })
}
/// Observations recorded by `run_concurrent_admin_register`.
#[derive(Debug)]
struct ConcurrentRegisterRun {
/// Whether the first `register_worker` call returned `Ok`.
first_ok: bool,
/// Whether the second `register_worker` call returned `Ok`.
second_ok: bool,
/// Number of `apalis.workers` rows found for the scenario's queue.
row_count: i64,
}
/// Row shape for a raw query that selects a single `count` column.
#[derive(Debug, diesel::QueryableByName)]
struct CountRow {
/// Value of the `count` column, read as a SQL `BigInt`.
#[diesel(sql_type = diesel::sql_types::BigInt)]
count: i64,
}
/// Registers one worker id through two storages at once and records the results.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool. Otherwise both
/// `register_worker` calls are driven together with `tokio::join!`, the
/// `apalis.workers` rows for the queue are counted, and the queue is cleaned up.
async fn run_concurrent_admin_register() -> Result<Outcome<ConcurrentRegisterRun>, String> {
    let pool = match test_pool().await? {
        Some(pool) => pool,
        None => return Ok(Outcome::Skipped),
    };
    let queue = format!("apalis-spec-concurrent-register-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;

    let worker_id = format!("spec-concurrent-worker-{queue}");
    let mut first = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
    let mut second = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
    let (first_result, second_result) = tokio::join!(
        first.register_worker(worker_id.clone()),
        second.register_worker(worker_id.clone()),
    );

    let counted_queue = queue.clone();
    let row_count = with_conn(pool.clone(), move |conn| {
        let rows =
            sql_query("SELECT COUNT(*) AS count FROM apalis.workers WHERE worker_type = $1")
                .bind::<Text, _>(counted_queue)
                .load::<CountRow>(conn)
                .map_err(|e| e.to_string())?;
        match rows.into_iter().next() {
            Some(row) => Ok(row.count),
            None => Err("count query returned no rows".to_owned()),
        }
    })
    .await?;

    cleanup_queue(pool, queue).await?;
    Ok(Outcome::Completed(ConcurrentRegisterRun {
        first_ok: first_result.is_ok(),
        second_ok: second_result.is_ok(),
        row_count,
    }))
}
/// Assertion: both concurrent `register_worker` calls returned `Ok`.
fn concurrent_admin_register_both_succeed()
    -> impl Fn(&Result<Outcome<ConcurrentRegisterRun>, String>) -> AssertionResult {
    observe::<ConcurrentRegisterRun, _>("concurrent admin register", |run| {
        match (run.first_ok, run.second_ok) {
            (true, true) => Ok(()),
            (first_ok, second_ok) => Err(format!(
                "expected both admin RegisterWorker calls to succeed (UPSERT semantics), got first_ok={} second_ok={}",
                first_ok, second_ok
            )),
        }
    })
}
/// Assertion: exactly one `apalis.workers` row exists after the concurrent registration.
fn concurrent_admin_register_creates_single_row()
    -> impl Fn(&Result<Outcome<ConcurrentRegisterRun>, String>) -> AssertionResult {
    observe::<ConcurrentRegisterRun, _>("concurrent admin register row count", |run| {
        match run.row_count {
            1 => Ok(()),
            rows => Err(format!(
                "expected ON CONFLICT DO UPDATE to keep exactly one workers row, got {}",
                rows
            )),
        }
    })
}
/// Observations recorded by `run_two_worker_race`.
#[derive(Debug)]
struct TwoWorkerRaceRun {
/// Number of task payloads collected across both workers.
total: usize,
/// Number of adjacent equal payloads once all collected payloads are sorted.
duplicates: usize,
}
/// Pushes eight tasks and lets two workers poll the same queue concurrently.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool. Otherwise it
/// records how many payloads the two workers collected in total and how many
/// of them were delivered more than once.
async fn run_two_worker_race() -> Result<Outcome<TwoWorkerRaceRun>, String> {
    // Static payload table: `task` wants a `'static` payload, and literals
    // provide that without leaking a heap allocation per task.
    const PAYLOADS: [&str; 8] = [
        "race-0", "race-1", "race-2", "race-3", "race-4", "race-5", "race-6", "race-7",
    ];
    let Some(pool) = test_pool().await? else {
        return Ok(Outcome::Skipped);
    };
    let queue = format!("apalis-spec-race-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;
    let mut producer =
        PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue).set_buffer_size(8));
    for payload in PAYLOADS {
        // `run_at` one second in the past makes every task immediately due.
        producer
            .push_task(task(payload, now_unix() - 1, 0, 25))
            .await
            .map_err(|e| e.to_string())?;
    }
    let storage_a =
        PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue).set_buffer_size(4));
    let storage_b =
        PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue).set_buffer_size(4));
    let worker_a = WorkerContext::new::<()>(&format!("spec-race-a-{queue}"));
    let worker_b = WorkerContext::new::<()>(&format!("spec-race-b-{queue}"));
    // Collects payloads for up to 3 s, or until 16 items (twice the number
    // pushed) have been seen; stream errors and stream end stop collection.
    let collect = |storage: PostgresStorage<String>, worker: WorkerContext| async move {
        let mut out = Vec::new();
        let mut stream = storage.poll(&worker);
        let deadline = Duration::from_secs(3);
        let started = std::time::Instant::now();
        while started.elapsed() < deadline && out.len() < 16 {
            match tokio::time::timeout(Duration::from_millis(500), stream.next()).await {
                Ok(Some(Ok(Some(t)))) => out.push(t.args),
                Ok(Some(Ok(None))) => continue,
                Ok(Some(Err(_))) => break,
                Ok(None) => break,
                Err(_) => continue,
            }
        }
        out
    };
    let (a_args, b_args) =
        tokio::join!(collect(storage_a, worker_a), collect(storage_b, worker_b),);
    let mut delivered = a_args;
    delivered.extend(b_args);
    let total = delivered.len();
    // After sorting, every repeated delivery shows up as an adjacent equal pair.
    delivered.sort_unstable();
    let duplicates = delivered
        .windows(2)
        .filter(|pair| pair[0] == pair[1])
        .count();
    cleanup_queue(pool, queue).await?;
    Ok(Outcome::Completed(TwoWorkerRaceRun { total, duplicates }))
}
/// Assertion: at least one task was delivered and none was delivered twice.
fn two_workers_share_set_without_duplicates()
    -> impl Fn(&Result<Outcome<TwoWorkerRaceRun>, String>) -> AssertionResult {
    observe::<TwoWorkerRaceRun, _>("two-worker race", |run| {
        let violated = run.duplicates != 0 || run.total < 1;
        if violated {
            return Err(format!(
                "expected SKIP LOCKED to keep deliveries disjoint, got total={} duplicates={}",
                run.total, run.duplicates
            ));
        }
        Ok(())
    })
}
// Nothing in the visible part of this file calls this function, hence the
// `dead_code` allowance. Judging by its name it presumably exists only so the
// file-level `Status` import stays referenced — confirm before removing.
#[allow(dead_code)]
fn _force_status_import() -> Status {
Status::Pending
}
/// Observations recorded by `run_refresh_unpopulated_snapshot`.
#[derive(Debug)]
struct RefreshSnapshotRun {
/// Result of `refresh_queue_stats_snapshot`, with the error rendered as text.
refresh_result: Result<(), String>,
/// `ispopulated` as reported by `pg_matviews` after the refresh; `false` when no row was found.
populated_after: bool,
}
/// Empties the `apalis.queue_stats_snapshot` materialized view, refreshes it
/// through `refresh_queue_stats_snapshot`, and records what happened.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool. A failing refresh
/// is recorded in the run rather than returned as a scenario error.
async fn run_refresh_unpopulated_snapshot() -> Result<Outcome<RefreshSnapshotRun>, String> {
let Some(pool) = test_pool().await? else {
return Ok(Outcome::Skipped);
};
// Put the view into the unpopulated state before exercising the refresh.
with_conn(pool.clone(), |conn| {
sql_query("REFRESH MATERIALIZED VIEW apalis.queue_stats_snapshot WITH NO DATA")
.execute(conn)
.map_err(|e| e.to_string())?;
Ok(())
})
.await?;
// Deliberately not propagated with `?`: the assertions inspect this result.
let refresh_result = refresh_queue_stats_snapshot(&pool)
.await
.map_err(|e| e.to_string());
// Ask the catalog whether the view is populated now; a missing row counts as `false`.
let populated_after = with_conn(pool.clone(), |conn| {
sql_query(
"SELECT ispopulated AS populated
FROM pg_matviews
WHERE schemaname = 'apalis' AND matviewname = 'queue_stats_snapshot'",
)
.load::<PopulatedRow>(conn)
.map_err(|e| e.to_string())
.map(|rows| rows.first().map(|r| r.populated).unwrap_or(false))
})
.await?;
Ok(Outcome::Completed(RefreshSnapshotRun {
refresh_result,
populated_after,
}))
}
/// Row shape for the `pg_matviews` lookup that selects `ispopulated AS populated`.
#[derive(Debug, diesel::QueryableByName)]
struct PopulatedRow {
/// Value of the `populated` column, read as a SQL `Bool`.
#[diesel(sql_type = diesel::sql_types::Bool)]
populated: bool,
}
/// Assertion: the refresh of the unpopulated materialized view returned `Ok`.
fn refresh_unpopulated_snapshot_succeeds()
    -> impl Fn(&Result<Outcome<RefreshSnapshotRun>, String>) -> AssertionResult {
    observe::<RefreshSnapshotRun, _>("refresh unpopulated snapshot", |run| {
        match &run.refresh_result {
            Ok(()) => Ok(()),
            Err(err) => Err(format!(
                "expected refresh to succeed on unpopulated matview, got {err}"
            )),
        }
    })
}
/// Assertion: after a successful refresh the materialized view is populated.
///
/// Passes when the refresh itself failed; this check only applies to a
/// refresh that returned `Ok`.
fn refresh_unpopulated_snapshot_populates()
    -> impl Fn(&Result<Outcome<RefreshSnapshotRun>, String>) -> AssertionResult {
    observe::<RefreshSnapshotRun, _>("refresh populates matview", |run| {
        let refreshed = run.refresh_result.is_ok();
        if refreshed && !run.populated_after {
            Err("matview should be populated after a successful blocking refresh".into())
        } else {
            Ok(())
        }
    })
}
/// Observations recorded by `run_unlisten_after_drop`.
#[derive(Debug)]
struct UnlistenRun {
/// Channel names returned by `pg_listening_channels()` after the poll stream was dropped.
channels_after_drop: Vec<String>,
}
/// Row shape for a raw query that selects a single text `channel` column.
#[derive(Debug, diesel::QueryableByName)]
struct ChannelRow {
/// Value of the `channel` column.
#[diesel(sql_type = Text)]
channel: String,
}
/// Polls a notify-backed storage once, drops the stream, and records which
/// channels the pool's connection is still listening on afterwards.
///
/// Returns `Outcome::Skipped` when `support::database_url_or_skip` yields no URL.
async fn run_unlisten_after_drop() -> Result<Outcome<UnlistenRun>, String> {
let Some(url) = support::database_url_or_skip()? else {
return Ok(Outcome::Skipped);
};
// Dedicated pool capped at one connection.
// NOTE(review): presumably so the channel query below runs on the same
// connection the listener used — confirm against the storage implementation.
let pool = apalis_diesel_postgres::build_pool_with(&url, |b| b.max_size(1))
.map_err(|e| e.to_string())?;
setup(&pool).await.map_err(|e| e.to_string())?;
let queue = format!("apalis-spec-unlisten-{}", Ulid::new());
cleanup_queue(pool.clone(), queue.clone()).await?;
{
// Scope the storage, worker and stream so all of them are dropped before the check.
let storage = PostgresStorage::<String>::new_with_notify(&pool, &Config::new(&queue));
let worker = WorkerContext::new::<()>(&format!("spec-unlisten-worker-{queue}"));
let mut stream = storage.poll(&worker);
// Drive the stream once (for at most 2 s); the polled item itself is ignored.
let _ = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
drop(stream);
}
// Fixed pause between the drop and the channel query.
tokio::time::sleep(Duration::from_millis(500)).await;
let channels_after_drop = with_conn(pool.clone(), |conn| {
sql_query("SELECT pg_listening_channels()::text AS channel")
.load::<ChannelRow>(conn)
.map(|rows| rows.into_iter().map(|r| r.channel).collect::<Vec<_>>())
.map_err(|e| e.to_string())
})
.await?;
cleanup_queue(pool, queue).await?;
Ok(Outcome::Completed(UnlistenRun {
channels_after_drop,
}))
}
/// Assertion: no recorded channel name contains `apalis::job::insert`.
fn no_stale_listen_subscription_after_drop()
    -> impl Fn(&Result<Outcome<UnlistenRun>, String>) -> AssertionResult {
    observe::<UnlistenRun, _>("UNLISTEN after drop", |run| {
        let has_stale = run
            .channels_after_drop
            .iter()
            .any(|channel| channel.contains("apalis::job::insert"));
        if has_stale {
            return Err(format!(
                "expected pg_listening_channels() to be free of the apalis subscription after NotifyTaskIds drop, got {:?}",
                run.channels_after_drop
            ));
        }
        Ok(())
    })
}
/// Observations recorded by `run_locked_workers_excludes_terminal`.
#[derive(Debug)]
struct LockedWorkersRun {
/// `workers` of the `list_queues` entry whose name equals the scenario queue; empty when no entry matched.
workers_on_active_queue: Vec<String>,
}
/// Seeds four workers, each holding the lock on one job in a different status
/// (`Running`, `Done`, `Failed`, `Killed`), then records which workers
/// `list_queues` reports for the queue.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool.
async fn run_locked_workers_excludes_terminal() -> Result<Outcome<LockedWorkersRun>, String> {
let Some(pool) = test_pool().await? else {
return Ok(Outcome::Skipped);
};
let queue = format!("apalis-spec-locked-workers-{}", Ulid::new());
cleanup_queue(pool.clone(), queue.clone()).await?;
// Worker ids carry a status prefix that the assertion later matches on.
let active_worker = format!("active-{queue}");
let done_worker = format!("done-{queue}");
let failed_worker = format!("failed-{queue}");
let killed_worker = format!("killed-{queue}");
// Owned copies for the `move` closure below; `queue` itself is still needed afterwards.
let q = queue.clone();
let aw = active_worker.clone();
let dw = done_worker.clone();
let fw = failed_worker.clone();
let kw = killed_worker.clone();
with_conn(pool.clone(), move |conn| {
// One `apalis.workers` row per worker id.
for wid in [aw.as_str(), dw.as_str(), fw.as_str(), kw.as_str()] {
sql_query(
"INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen, started_at)
VALUES ($1, $2, 'PostgresStorage', '', now(), now())
ON CONFLICT (id, worker_type) DO NOTHING",
)
.bind::<Text, _>(wid)
.bind::<Text, _>(&q)
.execute(conn)
.map_err(|e| e.to_string())?;
}
// One locked job per (status, locking worker, attempts, last_result SQL fragment).
for (status, lock_by, attempts, last_result_sql) in [
("Running", aw.as_str(), 1, "NULL"),
("Done", dw.as_str(), 1, "'{\"Ok\":\"ok\"}'::jsonb"),
("Failed", fw.as_str(), 3, "'{\"Err\":\"err\"}'::jsonb"),
("Killed", kw.as_str(), 3, "'{\"Err\":\"k\"}'::jsonb"),
] {
let id = Ulid::new().to_string();
// Only values defined in this function (the table above and a fresh ULID)
// are interpolated into the SQL text; queue and worker id are bound parameters.
let sql = format!(
"INSERT INTO apalis.jobs (
id, job_type, job, status, attempts, max_attempts, run_at, lock_by, lock_at, last_result, done_at
) VALUES (
'{id}', $1, '\\x00'::bytea, '{status}', {attempts}, 3,
now() - INTERVAL '5 seconds', $2, now() - INTERVAL '5 seconds',
{last_result_sql},
CASE WHEN '{status}' IN ('Done','Failed','Killed') THEN now() ELSE NULL END
)"
);
sql_query(sql)
.bind::<Text, _>(&q)
.bind::<Text, _>(lock_by)
.execute(conn)
.map_err(|e| e.to_string())?;
}
Ok(())
})
.await?;
let storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
let queues = storage.list_queues().await.map_err(|e| e.to_string())?;
// Keep only this scenario's queue entry; no matching entry yields an empty worker list.
let workers_on_active_queue = queues
.into_iter()
.find(|q| q.name == queue)
.map(|q| q.workers)
.unwrap_or_default();
cleanup_queue(pool, queue).await?;
Ok(Outcome::Completed(LockedWorkersRun {
workers_on_active_queue,
}))
}
/// Assertion: the reported workers include an `active-` worker and no
/// `done-`, `failed-` or `killed-` worker.
fn locked_workers_shows_active_only()
    -> impl Fn(&Result<Outcome<LockedWorkersRun>, String>) -> AssertionResult {
    observe::<LockedWorkersRun, _>("locked workers excludes terminal", |run| {
        const TERMINAL_PREFIXES: [&str; 3] = ["done-", "failed-", "killed-"];
        let workers = &run.workers_on_active_queue;
        let has_active = workers.iter().any(|w| w.starts_with("active-"));
        let has_terminal = workers.iter().any(|w| {
            TERMINAL_PREFIXES
                .iter()
                .any(|prefix| w.starts_with(*prefix))
        });
        // The missing-active failure takes precedence over the terminal one.
        if !has_active {
            Err(format!(
                "expected list_queues.workers to include the Running worker, got {:?}",
                run.workers_on_active_queue
            ))
        } else if has_terminal {
            Err(format!(
                "expected locks on Done/Failed/Killed jobs to be filtered out, got {:?}",
                run.workers_on_active_queue
            ))
        } else {
            Ok(())
        }
    })
}
/// Observations recorded by `run_list_workers_beyond_100`.
#[derive(Debug)]
struct ListWorkersBeyond100Run {
/// Number of workers returned by `list_workers`.
returned: usize,
/// Number of worker rows the scenario inserted.
inserted: usize,
}
/// Inserts 110 worker rows for a fresh queue and records how many
/// `list_workers` returns.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool.
async fn run_list_workers_beyond_100() -> Result<Outcome<ListWorkersBeyond100Run>, String> {
    let pool = match test_pool().await? {
        Some(pool) => pool,
        None => return Ok(Outcome::Skipped),
    };
    let queue = format!("apalis-spec-list-workers-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;

    let inserted = 110usize;
    let worker_type = queue.clone();
    with_conn(pool.clone(), move |conn| {
        for index in 0..inserted {
            let worker_id = format!("w-{index:03}-{worker_type}");
            // The SQL literal is runtime text and is kept exactly as written.
            let insert = sql_query(
"INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen, started_at)
VALUES ($1, $2, 'PostgresStorage', '', now(), now())",
            )
            .bind::<Text, _>(&worker_id)
            .bind::<Text, _>(&worker_type);
            if let Err(error) = insert.execute(conn) {
                return Err(error.to_string());
            }
        }
        Ok(())
    })
    .await?;

    let storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
    let returned = match storage.list_workers().await {
        Ok(workers) => workers.len(),
        Err(error) => return Err(error.to_string()),
    };
    cleanup_queue(pool, queue).await?;
    Ok(Outcome::Completed(ListWorkersBeyond100Run {
        returned,
        inserted,
    }))
}
/// Assertion: `list_workers` returned as many workers as were inserted.
fn list_workers_returns_every_row()
    -> impl Fn(&Result<Outcome<ListWorkersBeyond100Run>, String>) -> AssertionResult {
    observe::<ListWorkersBeyond100Run, _>("list_workers >100", |run| {
        if run.returned != run.inserted {
            return Err(format!(
                "expected list_workers to return all {} workers, got {}",
                run.inserted, run.returned
            ));
        }
        Ok(())
    })
}
/// Observations recorded by `run_registration_gate_blocks_fetcher`.
#[derive(Debug)]
struct RegistrationGateRun {
/// Number of items (successes and errors alike) the poll stream yielded.
items_seen: usize,
/// Whether any yielded item was `Err(PgError::AlreadyRegistered(_))`.
saw_already_registered_error: bool,
/// Whether the stream returned `None` before the observation window closed.
stream_ended: bool,
}
/// Pre-inserts a worker row with a foreign lease token, then polls with the
/// same worker id and records what the stream yields.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool.
async fn run_registration_gate_blocks_fetcher() -> Result<Outcome<RegistrationGateRun>, String> {
let Some(pool) = test_pool().await? else {
return Ok(Outcome::Skipped);
};
let queue = format!("apalis-spec-reggate-{}", Ulid::new());
cleanup_queue(pool.clone(), queue.clone()).await?;
let worker_id = format!("spec-reggate-worker-{queue}");
// Owned copies for the `move` closure; the originals are reused below.
let q = queue.clone();
let wid = worker_id.clone();
// Seed the "incumbent": a workers row for this id carrying its own lease token.
with_conn(pool.clone(), move |conn| {
sql_query(
"INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen, started_at, lease_token)
VALUES ($1, $2, 'PostgresStorage', '', now(), now(), $3)",
)
.bind::<Text, _>(&wid)
.bind::<Text, _>(&q)
.bind::<Text, _>(format!("incumbent-{}", Ulid::new()))
.execute(conn)
.map_err(|e| e.to_string())?;
Ok(())
})
.await?;
let mut producer =
PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue).set_buffer_size(1));
// Function-local import; presumably the trait that provides `push_task` below.
use apalis_core::backend::TaskSink;
// A due task is queued so that a fetch, if one happened, would have something to return.
producer
.push_task(task("must-not-dequeue", now_unix() - 1, 0, 25))
.await
.map_err(|e| e.to_string())?;
let storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
let worker = WorkerContext::new::<()>(&worker_id);
let mut stream = storage.poll(&worker);
let mut items_seen = 0usize;
let mut saw_already_registered_error = false;
let mut stream_ended = false;
// Observe the stream for at most 4 s, waiting up to 800 ms per item.
let deadline = std::time::Instant::now() + Duration::from_secs(4);
while std::time::Instant::now() < deadline {
let next = tokio::time::timeout(Duration::from_millis(800), stream.next()).await;
match next {
// Per-item timeout: nothing arrived, keep waiting until the deadline.
Err(_) => continue,
Ok(None) => {
stream_ended = true;
break;
}
Ok(Some(item)) => {
// Every yielded item counts, whether it is a task or an error.
items_seen += 1;
if let Err(err) = item
&& matches!(err, PgError::AlreadyRegistered(_))
{
saw_already_registered_error = true;
}
}
}
}
cleanup_queue(pool, queue).await?;
Ok(Outcome::Completed(RegistrationGateRun {
items_seen,
saw_already_registered_error,
stream_ended,
}))
}
/// Assertion: the stream surfaced the `AlreadyRegistered` error, then ended,
/// having yielded at most one item. The checks are evaluated in that order.
fn registration_gate_emits_error_then_ends()
    -> impl Fn(&Result<Outcome<RegistrationGateRun>, String>) -> AssertionResult {
    observe::<RegistrationGateRun, _>("registration gate", |run| {
        if !run.saw_already_registered_error {
            Err(format!(
                "expected the AlreadyRegistered error to surface on the stream, items_seen={}",
                run.items_seen
            ))
        } else if !run.stream_ended {
            Err("expected the stream to terminate after the registration error".into())
        } else if run.items_seen > 1 {
            Err(format!(
                "expected exactly one item (the registration error) before stream end, got {}",
                run.items_seen
            ))
        } else {
            Ok(())
        }
    })
}
/// Observations recorded by `run_verify_schema`.
#[derive(Debug)]
struct VerifySchemaRun {
/// Result of `verify_schema` on the shared pool, with the error rendered as text.
applied_result: Result<(), String>,
/// Verdict returned by `verify_pending_branch_on_temp_db`.
pending_result: Result<(), String>,
}
/// Runs `verify_schema` against the shared pool and then exercises the
/// pending-migration branch on a temporary database.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool, and an error
/// when the database URL is no longer available afterwards.
async fn run_verify_schema() -> Result<Outcome<VerifySchemaRun>, String> {
    let pool = match test_pool().await? {
        Some(pool) => pool,
        None => return Ok(Outcome::Skipped),
    };
    let applied_result = match verify_schema(&pool).await {
        Ok(()) => Ok(()),
        Err(error) => Err(error.to_string()),
    };
    let Some(url) = support::database_url_or_skip()? else {
        return Err(
            "DATABASE_URL disappeared between the pool build and the verify branch".into(),
        );
    };
    let pending_result = verify_pending_branch_on_temp_db(&url).await?;
    let run = VerifySchemaRun {
        applied_result,
        pending_result,
    };
    Ok(Outcome::Completed(run))
}
/// Returns `url` with its database name (the path component) replaced by `database`.
///
/// Scheme, authority and the query string (everything from the first `?`
/// after the path) are kept as they are.
///
/// # Errors
/// * `"database URL has no scheme"` when `url` contains no `://`;
/// * `"database URL has no path"` when no `/` follows the authority. This
///   includes URLs whose only `/` sits inside the query string, such as
///   `postgres://host?sslrootcert=/etc/ca.pem`.
fn swap_database_name(url: &str, database: &str) -> Result<String, String> {
    let scheme_end = url.find("://").ok_or("database URL has no scheme")? + 3;
    let (scheme, rest) = url.split_at(scheme_end);
    // The authority ends at the first '/', '?' or '#'. Only a '/' starts a
    // path; a '/' that first appears after '?' or '#' belongs to the query
    // or fragment and must not be mistaken for the database separator.
    let path_start = match rest.find(|c: char| matches!(c, '/' | '?' | '#')) {
        Some(index) if rest[index..].starts_with('/') => index,
        _ => return Err("database URL has no path".to_owned()),
    };
    let authority = &rest[..path_start];
    let after_path = &rest[path_start + 1..];
    let query = after_path.find('?').map_or("", |q| &after_path[q..]);
    Ok(format!("{scheme}{authority}/{database}{query}"))
}
/// Checks that `verify_schema` rejects a database whose newest migration row
/// is missing, using a throwaway database created for the purpose.
///
/// The outer `Result` reports failures of the scenario machinery (connection,
/// SQL, setup). The inner `Result` is the verdict:
/// * `Ok(())` when `verify_schema` returned an error on the tampered database,
///   and also whenever the check is skipped (the role lacks `rolcreatedb`, the
///   URL could not be rewritten, or the pool did not land on the temp database);
/// * `Err(message)` when `verify_schema` returned `Ok`.
///
/// Once the temporary database has been created, a best-effort drop is issued
/// on every path out of this function.
async fn verify_pending_branch_on_temp_db(
maintenance_url: &str,
) -> Result<Result<(), String>, String> {
// Lower-cased ULID suffix gives the temporary database its own name.
let db_name = format!(
"apalis_verify_pending_{}",
Ulid::new().to_string().to_lowercase()
);
// Step 1: create the temporary database; `false` means "not allowed, skip".
let provisioned = {
let maintenance_url = maintenance_url.to_owned();
let db_name = db_name.clone();
tokio::task::spawn_blocking(move || -> Result<bool, String> {
let mut conn = PgConnection::establish(&maintenance_url).map_err(|e| e.to_string())?;
#[derive(diesel::QueryableByName)]
struct Flag {
#[diesel(sql_type = diesel::sql_types::Bool)]
rolcreatedb: bool,
}
// NOTE(review): only `rolcreatedb` is consulted; a role that can create
// databases by other means would be treated as unable — confirm this is intended.
let can_create =
sql_query("SELECT rolcreatedb FROM pg_roles WHERE rolname = current_user")
.load::<Flag>(&mut conn)
.map_err(|e| e.to_string())?
.into_iter()
.next()
.map(|row| row.rolcreatedb)
.unwrap_or(false);
if !can_create {
return Ok(false);
}
// The name is built above from a fixed prefix and a ULID.
sql_query(format!("CREATE DATABASE \"{db_name}\""))
.execute(&mut conn)
.map_err(|e| e.to_string())?;
Ok(true)
})
.await
.map_err(|e| e.to_string())??
};
if !provisioned {
return Ok(Ok(()));
}
// Step 2: point a URL at the temporary database; on failure clean up and skip.
let temp_url = match swap_database_name(maintenance_url, &db_name) {
Ok(url) => url,
Err(_) => {
drop_temp_database(maintenance_url, &db_name).await;
return Ok(Ok(()));
}
};
// Step 3: run the check inside one async block so the drop below executes
// regardless of how the block exits.
let outcome: Result<Result<(), String>, String> = async {
let temp_pool = apalis_diesel_postgres::build_pool_with(&temp_url, |builder| {
builder.max_size(1).min_idle(Some(0))
})
.map_err(|e| e.to_string())?;
// Guard: confirm the pool is connected to the temporary database before
// running setup and deleting a migration row.
let on_temp_db = {
let expected = db_name.clone();
with_conn(temp_pool.clone(), move |conn| {
#[derive(diesel::QueryableByName)]
struct Db {
#[diesel(sql_type = Text)]
db: String,
}
let actual = sql_query("SELECT current_database()::text AS db")
.load::<Db>(conn)
.map_err(|e| e.to_string())?
.into_iter()
.next()
.map(|row| row.db)
.ok_or_else(|| "current_database() returned no row".to_owned())?;
Ok(actual == expected)
})
.await?
};
if !on_temp_db {
return Ok(Ok(()));
}
setup(&temp_pool).await.map_err(|e| e.to_string())?;
// Delete the row with the highest version from the migrations table.
with_conn(temp_pool.clone(), |conn| {
sql_query(
"DELETE FROM __diesel_schema_migrations \
WHERE version = ( \
SELECT version FROM __diesel_schema_migrations \
ORDER BY version DESC LIMIT 1)",
)
.execute(conn)
.map_err(|e| e.to_string())?;
Ok(())
})
.await?;
// An error from `verify_schema` is the expected result here.
Ok(match verify_schema(&temp_pool).await {
Err(_) => Ok(()),
Ok(()) => Err("verify_schema returned Ok despite a missing migration row".into()),
})
}
.await;
drop_temp_database(maintenance_url, &db_name).await;
outcome
}
/// Best-effort removal of the database `db_name` via a fresh connection to
/// `maintenance_url`.
///
/// Connection failures, statement failures and join failures of the blocking
/// task are all ignored.
async fn drop_temp_database(maintenance_url: &str, db_name: &str) {
    let url = maintenance_url.to_owned();
    let statement = format!("DROP DATABASE IF EXISTS \"{db_name}\" WITH (FORCE)");
    let cleanup = tokio::task::spawn_blocking(move || {
        let Ok(mut conn) = PgConnection::establish(&url) else {
            return;
        };
        let _ = sql_query(statement).execute(&mut conn);
    });
    let _ = cleanup.await;
}
/// Assertion: `verify_schema` returned `Ok` on the shared, fully set-up database.
fn verify_schema_accepts_a_fully_applied_database()
    -> impl Fn(&Result<Outcome<VerifySchemaRun>, String>) -> AssertionResult {
    observe::<VerifySchemaRun, _>("verify_schema applied", |run| {
        match &run.applied_result {
            Ok(()) => Ok(()),
            Err(e) => Err(format!("expected Ok on an applied schema, got {e}")),
        }
    })
}
/// Assertion: the pending-migration verdict is `Ok`; a recorded failure
/// message is passed through unchanged.
fn verify_schema_rejects_a_database_with_unrecorded_migrations()
    -> impl Fn(&Result<Outcome<VerifySchemaRun>, String>) -> AssertionResult {
    observe::<VerifySchemaRun, _>("verify_schema pending", |run| {
        match &run.pending_result {
            Ok(()) => Ok(()),
            Err(reason) => Err(reason.clone()),
        }
    })
}
/// Assertion combining the "applied" and "pending" checks; the applied check
/// runs first and the pending check only runs when it passes.
fn verify_schema_records_both_branches()
    -> impl Fn(&Result<Outcome<VerifySchemaRun>, String>) -> AssertionResult {
    let check_applied = verify_schema_accepts_a_fully_applied_database();
    let check_pending = verify_schema_rejects_a_database_with_unrecorded_migrations();
    move |result| match check_applied(result) {
        Ok(()) => check_pending(result),
        Err(failure) => Err(failure),
    }
}
/// Observations recorded by the batch-conflict scenarios in this file.
#[derive(Debug)]
struct PartialBatchRun {
/// `(conflicting_keys, total)` taken from `PgError::IdempotencyConflict`, when `push_all` failed with it.
conflict: Option<(Vec<String>, usize)>,
/// Text of any other `push_all` error.
other_error: Option<String>,
/// Number of `apalis.jobs` rows for the scenario queue after `push_all` returned.
final_count: i64,
}
/// Seeds one task with idempotency key `shared-key`, then pushes a batch of
/// three tasks that all reuse that key, and records how `push_all` responded
/// and how many rows the queue holds afterwards.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool.
async fn run_partial_batch_conflict() -> Result<Outcome<PartialBatchRun>, String> {
let Some(pool) = test_pool().await? else {
return Ok(Outcome::Skipped);
};
let queue = format!("apalis-spec-batch-conflict-{}", Ulid::new());
cleanup_queue(pool.clone(), queue.clone()).await?;
// Seed row: the pre-existing holder of `shared-key`.
let mut seed_storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
let seed = TaskBuilder::new("seed".to_owned())
.with_task_id(task_id())
.run_at_timestamp(now_unix())
.with_attempt(Attempt::new_with_value(0))
.with_ctx(SqlContext::new().with_max_attempts(5))
.with_idempotency_key("shared-key")
.build();
seed_storage
.push_task(seed)
.await
.map_err(|e| e.to_string())?;
let config = Config::new(&queue).set_buffer_size(3);
let mut batch_storage = PostgresStorage::<String>::new_with_config(&pool, &config);
// Three tasks, every one of them carrying the already-used key.
let batch: Vec<Task<String, PgContext, Ulid>> = (0..3)
.map(|i| {
TaskBuilder::new(format!("dup-{i}"))
.with_task_id(task_id())
.run_at_timestamp(now_unix())
.with_attempt(Attempt::new_with_value(0))
.with_ctx(PgContext::new().with_max_attempts(5))
.with_idempotency_key("shared-key")
.build()
})
.collect();
let stream = futures::stream::iter(batch);
// Deliberately not propagated with `?`: the result is classified below.
let push_result = batch_storage.push_all(stream).await;
let q = queue.clone();
// Count the queue's rows after the push attempt.
let final_count: i64 = with_conn(pool.clone(), move |conn| {
#[derive(QueryableByName)]
struct C {
#[diesel(sql_type = BigInt)]
n: i64,
}
sql_query("SELECT COUNT(*) AS n FROM apalis.jobs WHERE job_type = $1")
.bind::<Text, _>(&q)
.get_result::<C>(conn)
.map(|c| c.n)
.map_err(|e| e.to_string())
})
.await?;
// Split the push result into "idempotency conflict" versus "any other error".
let (conflict, other_error) = match push_result {
Ok(()) => (None, None),
Err(TaskSinkError::PushError(PgError::IdempotencyConflict {
conflicting_keys,
total,
..
})) => (Some((conflicting_keys, total)), None),
Err(other) => (None, Some(other.to_string())),
};
cleanup_queue(pool, queue).await?;
Ok(Outcome::Completed(PartialBatchRun {
conflict,
other_error,
final_count,
}))
}
/// Assertion: `push_all` failed with an idempotency conflict that names
/// exactly `shared-key` and reports a total of 3.
fn partial_batch_rejects_with_count()
    -> impl Fn(&Result<Outcome<PartialBatchRun>, String>) -> AssertionResult {
    observe::<PartialBatchRun, _>("partial-batch reject", |run| {
        // A recorded conflict is judged on its own, whatever `other_error` holds.
        if let Some((keys, total)) = &run.conflict {
            let only_shared_key = matches!(keys.as_slice(), [key] if key == "shared-key");
            return if only_shared_key && *total == 3 {
                Ok(())
            } else {
                Err(format!(
                    "expected conflicting_keys=[shared-key], total=3, got keys={keys:?}, total={total}"
                ))
            };
        }
        match run.other_error.as_deref() {
            Some(other) => Err(format!(
                "expected Error::IdempotencyConflict, got {other:?}"
            )),
            None => Err(
                "expected push_all to be rejected when every task in the batch conflicts".into(),
            ),
        }
    })
}
/// Assertion: exactly one row (the seed) remains in the queue.
fn partial_batch_rolls_back_inserts()
    -> impl Fn(&Result<Outcome<PartialBatchRun>, String>) -> AssertionResult {
    observe::<PartialBatchRun, _>("partial-batch rollback", |run| {
        match run.final_count {
            1 => Ok(()),
            rows => Err(format!(
                "expected exactly the seed row to remain (1), got {} rows",
                rows
            )),
        }
    })
}
/// Seeds one task with idempotency key `shared-key`, then pushes a batch whose
/// keys are `fresh-a`, `shared-key`, `fresh-b`, and records how `push_all`
/// responded and how many rows the queue holds afterwards.
///
/// Returns `Outcome::Skipped` when `test_pool` yields no pool.
async fn run_mixed_batch_conflict() -> Result<Outcome<PartialBatchRun>, String> {
let Some(pool) = test_pool().await? else {
return Ok(Outcome::Skipped);
};
let queue = format!("apalis-spec-mixed-conflict-{}", Ulid::new());
cleanup_queue(pool.clone(), queue.clone()).await?;
// Seed row: the pre-existing holder of `shared-key`.
let mut seed_storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
let seed = TaskBuilder::new("seed".to_owned())
.with_task_id(task_id())
.run_at_timestamp(now_unix())
.with_attempt(Attempt::new_with_value(0))
.with_ctx(SqlContext::new().with_max_attempts(5))
.with_idempotency_key("shared-key")
.build();
seed_storage
.push_task(seed)
.await
.map_err(|e| e.to_string())?;
let config = Config::new(&queue).set_buffer_size(3);
let mut batch_storage = PostgresStorage::<String>::new_with_config(&pool, &config);
// Only the middle key collides with the seed; the other two are new.
let keys = ["fresh-a", "shared-key", "fresh-b"];
let batch: Vec<Task<String, PgContext, Ulid>> = keys
.into_iter()
.enumerate()
.map(|(i, key)| {
TaskBuilder::new(format!("mixed-{i}"))
.with_task_id(task_id())
.run_at_timestamp(now_unix())
.with_attempt(Attempt::new_with_value(0))
.with_ctx(PgContext::new().with_max_attempts(5))
.with_idempotency_key(key)
.build()
})
.collect();
let stream = futures::stream::iter(batch);
// Deliberately not propagated with `?`: the result is classified below.
let push_result = batch_storage.push_all(stream).await;
let q = queue.clone();
// Count the queue's rows after the push attempt.
let final_count: i64 = with_conn(pool.clone(), move |conn| {
#[derive(QueryableByName)]
struct C {
#[diesel(sql_type = BigInt)]
n: i64,
}
sql_query("SELECT COUNT(*) AS n FROM apalis.jobs WHERE job_type = $1")
.bind::<Text, _>(&q)
.get_result::<C>(conn)
.map(|c| c.n)
.map_err(|e| e.to_string())
})
.await?;
// Split the push result into "idempotency conflict" versus "any other error".
let (conflict, other_error) = match push_result {
Ok(()) => (None, None),
Err(TaskSinkError::PushError(PgError::IdempotencyConflict {
conflicting_keys,
total,
..
})) => (Some((conflicting_keys, total)), None),
Err(other) => (None, Some(other.to_string())),
};
cleanup_queue(pool, queue).await?;
Ok(Outcome::Completed(PartialBatchRun {
conflict,
other_error,
final_count,
}))
}
/// Assertion: the mixed batch failed with an idempotency conflict that names
/// only `shared-key` and reports a total of 3.
fn mixed_batch_reports_only_the_duplicate()
    -> impl Fn(&Result<Outcome<PartialBatchRun>, String>) -> AssertionResult {
    observe::<PartialBatchRun, _>("mixed-batch reject key", |run| {
        let Some((keys, total)) = run.conflict.as_ref() else {
            // No conflict recorded: report the other error if there is one.
            return Err(match run.other_error.as_deref() {
                Some(other) => format!("expected Error::IdempotencyConflict, got {other:?}"),
                None => "expected the mixed batch to be rejected by the one duplicate".into(),
            });
        };
        let names_only_shared_key = keys.len() == 1 && keys[0] == "shared-key";
        if names_only_shared_key && *total == 3 {
            Ok(())
        } else {
            Err(format!(
                "expected conflicting_keys=[shared-key], total=3, got keys={keys:?}, total={total}"
            ))
        }
    })
}
/// Builds the assertion that exactly one row remains in the queue after the
/// mixed batch was pushed.
fn mixed_batch_rolls_back_the_fresh_rows_too()
    -> impl Fn(&Result<Outcome<PartialBatchRun>, String>) -> AssertionResult {
    observe::<PartialBatchRun, _>("mixed-batch all-or-nothing", |run| {
        match run.final_count {
            1 => Ok(()),
            _ => Err(format!(
                "expected only the seed to survive (1) — the fresh rows must roll back with the batch — got {} rows",
                run.final_count
            )),
        }
    })
}
/// Pushes a four-task batch in which `dup-key` appears twice, one task has no
/// idempotency key, and one task has a unique key, then records how the push
/// ended and how many rows the queue holds afterwards.
///
/// No row is inserted before the batch, so any conflict must come from the
/// batch itself. Returns `Outcome::Skipped` when no test pool is available.
///
/// # Errors
///
/// Returns the stringified error of the pool lookup, the pre-run cleanup, the
/// row-count query, or the post-run cleanup.
async fn run_intrabatch_dup_with_nulls() -> Result<Outcome<PartialBatchRun>, String> {
    let Some(pool) = test_pool().await? else {
        return Ok(Outcome::Skipped);
    };
    let queue = format!("apalis-spec-intrabatch-nulls-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;
    // Buffer size equals the batch length (4).
    let config = Config::new(&queue).set_buffer_size(4);
    let mut batch_storage = PostgresStorage::<String>::new_with_config(&pool, &config);
    let specs: [Option<&str>; 4] = [Some("dup-key"), None, Some("dup-key"), Some("unique-key")];
    let batch: Vec<Task<String, PgContext, Ulid>> = specs
        .into_iter()
        .enumerate()
        .map(|(i, key)| {
            let builder = TaskBuilder::new(format!("nb-{i}"))
                .with_task_id(task_id())
                .run_at_timestamp(now_unix())
                .with_attempt(Attempt::new_with_value(0))
                .with_ctx(PgContext::new().with_max_attempts(5));
            match key {
                Some(k) => builder.with_idempotency_key(k).build(),
                None => builder.build(),
            }
        })
        .collect();
    let stream = futures::stream::iter(batch);
    // The push outcome is inspected below rather than propagated: a rejection
    // is the expected result of this scenario.
    let push_result = batch_storage.push_all(stream).await;
    let q = queue.clone();
    let counted = with_conn(pool.clone(), move |conn| {
        #[derive(QueryableByName)]
        struct C {
            #[diesel(sql_type = BigInt)]
            n: i64,
        }
        sql_query("SELECT COUNT(*) AS n FROM apalis.jobs WHERE job_type = $1")
            .bind::<Text, _>(&q)
            .get_result::<C>(conn)
            .map(|c| c.n)
            .map_err(|e| e.to_string())
    })
    .await;
    // Clean up before propagating a count failure so a broken probe does not
    // strand this run's rows in the shared database.
    let cleaned = cleanup_queue(pool, queue).await;
    let final_count: i64 = counted?;
    cleaned?;
    let (conflict, other_error) = match push_result {
        Ok(()) => (None, None),
        Err(TaskSinkError::PushError(PgError::IdempotencyConflict {
            conflicting_keys,
            total,
            ..
        })) => (Some((conflicting_keys, total)), None),
        Err(other) => (None, Some(other.to_string())),
    };
    Ok(Outcome::Completed(PartialBatchRun {
        conflict,
        other_error,
        final_count,
    }))
}
/// Builds the assertion that the intra-batch duplicate was rejected with an
/// idempotency conflict naming exactly `dup-key` and reporting a total of 4.
fn intrabatch_reports_only_the_repeated_key()
    -> impl Fn(&Result<Outcome<PartialBatchRun>, String>) -> AssertionResult {
    observe::<PartialBatchRun, _>("intra-batch reject key", |run| {
        if let Some((keys, total)) = run.conflict.as_ref() {
            let as_expected = keys.len() == 1 && keys[0] == "dup-key" && *total == 4;
            return if as_expected {
                Ok(())
            } else {
                Err(format!(
                    "expected conflicting_keys=[dup-key] (NULL row excluded), total=4, got keys={keys:?}, total={total}"
                ))
            };
        }
        // No conflict was recorded: report whichever alternative happened.
        match run.other_error.as_deref() {
            Some(other) => Err(format!(
                "expected Error::IdempotencyConflict, got {other:?}"
            )),
            None => Err(
                "expected the intra-batch duplicate to be rejected even with no pre-existing row"
                    .into(),
            ),
        }
    })
}
/// Builds the assertion that the queue holds no rows after the intra-batch
/// duplicate push.
fn intrabatch_dup_rolls_back_the_whole_batch()
    -> impl Fn(&Result<Outcome<PartialBatchRun>, String>) -> AssertionResult {
    observe::<PartialBatchRun, _>("intra-batch all-or-nothing", |run| {
        if run.final_count != 0 {
            return Err(format!(
                "expected an empty queue (0) — every row, even the non-colliding ones, must roll back — got {} rows",
                run.final_count
            ));
        }
        Ok(())
    })
}
/// Observations recorded by one `run_metadata_cap` scenario.
#[derive(Debug)]
struct MetadataCapRun {
/// Display form of the error returned by `push_task`, or `None` when the push succeeded.
push_error: Option<String>,
/// `true` when at least one `apalis.jobs` row existed for the scenario's queue after the push.
row_present: bool,
}
/// Pushes a single task whose context metadata holds one `payload` string
/// sized from `meta_payload_len`, then records whether the push failed and
/// whether a row landed in the scenario's queue.
///
/// The payload string is `meta_payload_len - 16` characters long (at least 1).
/// NOTE(review): the 16-byte offset presumably relates the argument to the
/// library's serialized-metadata length check — confirm against the cap
/// implementation.
///
/// Returns `Outcome::Skipped` when no test pool is available.
///
/// # Errors
///
/// Returns the stringified error of the pool lookup, the pre-run cleanup, the
/// row-count query, or the post-run cleanup.
async fn run_metadata_cap(meta_payload_len: usize) -> Result<Outcome<MetadataCapRun>, String> {
    let Some(pool) = test_pool().await? else {
        return Ok(Outcome::Skipped);
    };
    let queue = format!("apalis-spec-metadata-cap-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;
    let mut meta = serde_json::Map::new();
    let value_len = meta_payload_len.saturating_sub(16).max(1);
    meta.insert(
        "payload".to_owned(),
        serde_json::Value::String("x".repeat(value_len)),
    );
    let ctx = PgContext::new().with_max_attempts(5).with_meta(meta);
    let task = TaskBuilder::new("metadata-cap-target".to_owned())
        .with_task_id(task_id())
        .run_at_timestamp(now_unix())
        .with_attempt(Attempt::new_with_value(0))
        .with_ctx(ctx)
        .build();
    let mut storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
    // The push outcome is recorded, not propagated: rejection is a valid result here.
    let push_result = storage.push_task(task).await;
    let q = queue.clone();
    let counted = with_conn(pool.clone(), move |conn| {
        #[derive(QueryableByName)]
        struct C {
            #[diesel(sql_type = BigInt)]
            n: i64,
        }
        sql_query("SELECT COUNT(*) AS n FROM apalis.jobs WHERE job_type = $1")
            .bind::<Text, _>(&q)
            .get_result::<C>(conn)
            .map(|c| c.n)
            .map_err(|e| e.to_string())
    })
    .await;
    // Clean up before propagating a count failure so a broken probe does not
    // strand this run's rows in the shared database.
    let cleaned = cleanup_queue(pool, queue).await;
    let row_count: i64 = counted?;
    cleaned?;
    Ok(Outcome::Completed(MetadataCapRun {
        push_error: push_result.err().map(|e| e.to_string()),
        row_present: row_count > 0,
    }))
}
/// Builds the assertion that the push returned no error and that a row is
/// present in the scenario's queue.
fn metadata_cap_succeeds() -> impl Fn(&Result<Outcome<MetadataCapRun>, String>) -> AssertionResult {
    observe::<MetadataCapRun, _>("metadata under cap", |run| {
        match (run.push_error.as_ref(), run.row_present) {
            (Some(err), _) => Err(format!(
                "expected push to succeed under the cap, got error: {err}"
            )),
            (None, false) => {
                Err("expected the row to land in apalis.jobs after a successful push".into())
            }
            (None, true) => Ok(()),
        }
    })
}
/// Builds the assertion that the push failed with a message mentioning both
/// `metadata` and `cap`.
fn metadata_cap_rejects() -> impl Fn(&Result<Outcome<MetadataCapRun>, String>) -> AssertionResult {
    observe::<MetadataCapRun, _>("metadata over cap", |run| {
        let Some(other) = run.push_error.as_deref() else {
            return Err("expected push to be rejected for oversize metadata".into());
        };
        if other.contains("metadata") && other.contains("cap") {
            Ok(())
        } else {
            Err(format!(
                "expected InvalidArgument citing the metadata cap, got {other:?}"
            ))
        }
    })
}
/// Builds the assertion that no row exists in the scenario's queue after the push.
fn metadata_cap_persists_nothing()
    -> impl Fn(&Result<Outcome<MetadataCapRun>, String>) -> AssertionResult {
    observe::<MetadataCapRun, _>("metadata cap row absent", |run| {
        if !run.row_present {
            return Ok(());
        }
        Err("expected no apalis.jobs row after a rejected oversize push".into())
    })
}
/// Observations recorded by one `run_idempotency_cap` scenario.
#[derive(Debug)]
struct IdempotencyCapRun {
/// Display form of the error returned by `push_task`, or `None` when the push succeeded.
push_error: Option<String>,
/// `true` when at least one `apalis.jobs` row existed for the scenario's queue after the push.
row_present: bool,
}
/// Pushes a single task whose idempotency key is `key_len` repetitions of
/// `k`, then records whether the push failed and whether a row landed in the
/// scenario's queue.
///
/// Returns `Outcome::Skipped` when no test pool is available.
///
/// # Errors
///
/// Returns the stringified error of the pool lookup, the pre-run cleanup, the
/// row-count query, or the post-run cleanup.
async fn run_idempotency_cap(key_len: usize) -> Result<Outcome<IdempotencyCapRun>, String> {
    let Some(pool) = test_pool().await? else {
        return Ok(Outcome::Skipped);
    };
    let queue = format!("apalis-spec-idem-cap-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;
    let ctx = PgContext::new().with_max_attempts(5);
    let task = TaskBuilder::new("idempotency-cap-target".to_owned())
        .with_task_id(task_id())
        .run_at_timestamp(now_unix())
        .with_attempt(Attempt::new_with_value(0))
        .with_ctx(ctx)
        .with_idempotency_key("k".repeat(key_len))
        .build();
    let mut storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
    // The push outcome is recorded, not propagated: rejection is a valid result here.
    let push_result = storage.push_task(task).await;
    let q = queue.clone();
    let counted = with_conn(pool.clone(), move |conn| {
        #[derive(QueryableByName)]
        struct C {
            #[diesel(sql_type = BigInt)]
            n: i64,
        }
        sql_query("SELECT COUNT(*) AS n FROM apalis.jobs WHERE job_type = $1")
            .bind::<Text, _>(&q)
            .get_result::<C>(conn)
            .map(|c| c.n)
            .map_err(|e| e.to_string())
    })
    .await;
    // Clean up before propagating a count failure so a broken probe does not
    // strand this run's rows in the shared database.
    let cleaned = cleanup_queue(pool, queue).await;
    let row_count: i64 = counted?;
    cleaned?;
    Ok(Outcome::Completed(IdempotencyCapRun {
        push_error: push_result.err().map(|e| e.to_string()),
        row_present: row_count > 0,
    }))
}
/// Builds the assertion that the push returned no error and that a row is
/// present in the scenario's queue.
fn idempotency_cap_succeeds()
    -> impl Fn(&Result<Outcome<IdempotencyCapRun>, String>) -> AssertionResult {
    observe::<IdempotencyCapRun, _>("idempotency under cap", |run| {
        // A push error takes precedence over the missing-row diagnosis.
        if let Some(err) = run.push_error.as_ref() {
            return Err(format!(
                "expected push to succeed under the cap, got error: {err}"
            ));
        }
        if run.row_present {
            Ok(())
        } else {
            Err("expected the row to land in apalis.jobs after a successful push".into())
        }
    })
}
/// Builds the assertion that the push failed with a message mentioning both
/// `idempotency_key` and `cap`.
fn idempotency_cap_rejects()
    -> impl Fn(&Result<Outcome<IdempotencyCapRun>, String>) -> AssertionResult {
    observe::<IdempotencyCapRun, _>("idempotency over cap", |run| {
        let Some(other) = run.push_error.as_deref() else {
            return Err("expected push to be rejected for oversize idempotency_key".into());
        };
        let cites_the_cap = other.contains("idempotency_key") && other.contains("cap");
        if cites_the_cap {
            Ok(())
        } else {
            Err(format!(
                "expected InvalidArgument citing the idempotency_key cap, got {other:?}"
            ))
        }
    })
}
/// Builds the assertion that no row exists in the scenario's queue after the push.
fn idempotency_cap_persists_nothing()
    -> impl Fn(&Result<Outcome<IdempotencyCapRun>, String>) -> AssertionResult {
    observe::<IdempotencyCapRun, _>("idempotency cap row absent", |run| {
        if !run.row_present {
            return Ok(());
        }
        Err("expected no apalis.jobs row after a rejected oversize push".into())
    })
}
/// Observations recorded by one `run_queue_name_cap` scenario.
#[derive(Debug)]
struct QueueNameCapRun {
/// Display form of the error returned by `push_task`, or `None` when the push succeeded.
push_error: Option<String>,
/// `true` when at least one `apalis.jobs` row existed for the scenario's queue after the push.
row_present: bool,
/// The generated queue name the scenario pushed to; assertions report its length.
queue: String,
}
/// Pushes a single task to a queue whose name is padded to `name_len` bytes,
/// then records whether the push failed and whether a row landed in that queue.
///
/// The name is `q-<ulid>-` followed by `x` padding. When `name_len` is shorter
/// than that prefix no padding is added, so the name is the prefix alone.
///
/// Returns `Outcome::Skipped` when no test pool is available.
///
/// # Errors
///
/// Returns the stringified error of the pool lookup, the pre-run cleanup, the
/// row-count query, or the post-run cleanup.
async fn run_queue_name_cap(name_len: usize) -> Result<Outcome<QueueNameCapRun>, String> {
    let Some(pool) = test_pool().await? else {
        return Ok(Outcome::Skipped);
    };
    let prefix = format!("q-{}-", Ulid::new());
    let pad = name_len.saturating_sub(prefix.len());
    let queue = format!("{prefix}{}", "x".repeat(pad));
    cleanup_queue(pool.clone(), queue.clone()).await?;
    let ctx = PgContext::new().with_max_attempts(5);
    let task = TaskBuilder::new("queue-name-cap-target".to_owned())
        .with_task_id(task_id())
        .run_at_timestamp(now_unix())
        .with_attempt(Attempt::new_with_value(0))
        .with_ctx(ctx)
        .build();
    let mut storage = PostgresStorage::<String>::new_with_config(&pool, &Config::new(&queue));
    // The push outcome is recorded, not propagated: rejection is a valid result here.
    let push_result = storage.push_task(task).await;
    let q = queue.clone();
    let counted = with_conn(pool.clone(), move |conn| {
        #[derive(QueryableByName)]
        struct C {
            #[diesel(sql_type = BigInt)]
            n: i64,
        }
        sql_query("SELECT COUNT(*) AS n FROM apalis.jobs WHERE job_type = $1")
            .bind::<Text, _>(&q)
            .get_result::<C>(conn)
            .map(|c| c.n)
            .map_err(|e| e.to_string())
    })
    .await;
    // Clean up before propagating a count failure so a broken probe does not
    // strand this run's rows in the shared database.
    let cleaned = cleanup_queue(pool, queue.clone()).await;
    let row_count: i64 = counted?;
    cleaned?;
    Ok(Outcome::Completed(QueueNameCapRun {
        push_error: push_result.err().map(|e| e.to_string()),
        row_present: row_count > 0,
        queue,
    }))
}
/// Builds the assertion that the push returned no error and that a row is
/// present in the scenario's queue; a push error is reported together with the
/// queue name's length.
fn queue_name_cap_succeeds() -> impl Fn(&Result<Outcome<QueueNameCapRun>, String>) -> AssertionResult
{
    observe::<QueueNameCapRun, _>("queue name under cap", |run| match &run.push_error {
        Some(err) => Err(format!(
            "expected push to succeed under the cap (queue length {}), got error: {err}",
            run.queue.len()
        )),
        None if run.row_present => Ok(()),
        None => Err("expected the row to land in apalis.jobs after a successful push".into()),
    })
}
/// Builds the assertion that the push failed with a message mentioning both
/// `queue name` and `cap`.
fn queue_name_cap_rejects() -> impl Fn(&Result<Outcome<QueueNameCapRun>, String>) -> AssertionResult
{
    observe::<QueueNameCapRun, _>("queue name over cap", |run| {
        let Some(other) = run.push_error.as_deref() else {
            return Err("expected push to be rejected for oversize queue name".into());
        };
        if other.contains("queue name") && other.contains("cap") {
            return Ok(());
        }
        Err(format!(
            "expected InvalidArgument citing the queue name cap, got {other:?}"
        ))
    })
}
/// Builds the assertion that no row exists in the scenario's queue after the push.
fn queue_name_cap_persists_nothing()
    -> impl Fn(&Result<Outcome<QueueNameCapRun>, String>) -> AssertionResult {
    observe::<QueueNameCapRun, _>("queue name cap row absent", |run| {
        if !run.row_present {
            return Ok(());
        }
        Err("expected no apalis.jobs row after a rejected oversize push".into())
    })
}
/// Row shape returned by the `SELECT status, attempts, last_result` query in `job_status_row`.
#[derive(Debug, QueryableByName)]
struct AckStatusRow {
/// The `status` column of the job row.
#[diesel(sql_type = Text)]
status: String,
/// The `attempts` column of the job row.
#[diesel(sql_type = Integer)]
attempts: i32,
/// The `last_result` column of the job row; `None` when the column is NULL.
#[diesel(sql_type = Nullable<Jsonb>)]
last_result: Option<Value>,
}
/// Observations recorded by one `run_ack_predicate` scenario.
#[derive(Debug)]
struct AckPredicateRun {
/// Display form of the error returned by the `ack` call, or `None` when it succeeded.
ack_error: Option<String>,
/// `status` of the inserted job row, read after the `ack` call.
row_status: String,
/// `attempts` of the inserted job row, read after the `ack` call.
row_attempts: i32,
/// `last_result` of the inserted job row, read after the `ack` call.
row_last_result: Option<Value>,
}
/// Reads `status`, `attempts` and `last_result` of the `apalis.jobs` row whose
/// id equals `id`, stringifying any query error.
async fn job_status_row(pool: PgPool, id: PgTaskId) -> Result<AckStatusRow, String> {
    let id_text = id.to_string();
    with_conn(pool, move |conn| {
        let lookup =
            sql_query("SELECT status, attempts, last_result FROM apalis.jobs WHERE id = $1")
                .bind::<Text, _>(&id_text);
        lookup
            .get_result::<AckStatusRow>(conn)
            .map_err(|e| e.to_string())
    })
    .await
}
/// Inserts one job row with status `Running` directly into `apalis.jobs` and
/// returns the id it was stored under.
///
/// The row gets a freshly generated ULID, the JSON-serialized payload
/// `"ack-target"`, a `run_at` one second in the past, and the given queue,
/// attempt counters, lock owner and lock timestamp. Errors are stringified.
async fn insert_running_row(
    pool: PgPool,
    queue: String,
    worker_id: String,
    attempts: i32,
    max_attempts: i32,
    lock_at: DateTime,
) -> Result<PgTaskId, String> {
    // The same textual id is parsed for the return value and bound into the row.
    let id_text = Ulid::new().to_string();
    let task_id = TaskId::from_str(&id_text).map_err(|e| e.to_string())?;
    let payload = serde_json::to_vec("ack-target").map_err(|e| e.to_string())?;
    with_conn(pool, move |conn| {
        let insert = sql_query(
            "INSERT INTO apalis.jobs (
id, job_type, job, status, attempts, max_attempts, run_at, lock_by, lock_at
) VALUES ($1, $2, $3, 'Running', $4, $5, now() - INTERVAL '1 second', $6, $7)",
        )
        .bind::<Text, _>(id_text)
        .bind::<Text, _>(&queue)
        .bind::<diesel::sql_types::Binary, _>(payload)
        .bind::<Integer, _>(attempts)
        .bind::<Integer, _>(max_attempts)
        .bind::<Text, _>(&worker_id)
        .bind::<Timestamptz, _>(lock_at);
        insert
            .execute(conn)
            .map(|_rows| ())
            .map_err(|e| e.to_string())
    })
    .await?;
    Ok(task_id)
}
/// Inserts one row into `apalis.workers` for `worker_id` on `queue`.
///
/// With `Some(token)` the `lease_token` column is written as well; with `None`
/// the column is left out of the insert. Errors are stringified.
async fn insert_worker_row(
    pool: PgPool,
    queue: String,
    worker_id: String,
    lease_token: Option<String>,
) -> Result<(), String> {
    with_conn(pool, move |conn| {
        let inserted = match lease_token {
            Some(token) => sql_query(
                "INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen, started_at, lease_token)
VALUES ($1, $2, 'PostgresStorage', '', now(), now(), $3)",
            )
            .bind::<Text, _>(&worker_id)
            .bind::<Text, _>(&queue)
            .bind::<Text, _>(&token)
            .execute(conn),
            None => sql_query(
                "INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen, started_at)
VALUES ($1, $2, 'PostgresStorage', '', now(), now())",
            )
            .bind::<Text, _>(&worker_id)
            .bind::<Text, _>(&queue)
            .execute(conn),
        };
        // Only success or failure matters; the affected-row count is discarded.
        inserted.map(|_rows| ()).map_err(|e| e.to_string())
    })
    .await
}
/// Knobs that `run_ack_predicate` uses to make the acknowledgement disagree
/// with the stored job/worker rows in one specific way.
#[derive(Debug, Clone, Copy)]
struct AckSetup {
/// Lease token handed to `PgAck::with_lease_token`; `None` builds the ack with `PgAck::new`.
pgack_token: Option<&'static str>,
/// Lease token written into the `apalis.workers` row; `None` inserts the row without one.
workers_token: Option<&'static str>,
/// Seconds added to the stored `lock_at` before it is placed in the ack's context.
lock_at_delta_secs: i64,
/// Queue name placed in the ack's context instead of the scenario's real queue.
override_queue: Option<&'static str>,
/// Added to the baseline attempt value of 1 (result clamped at 0) for the ack's attempt.
attempt_delta: i64,
/// When `true`, the ack carries a freshly generated task id instead of the inserted row's id.
fabricate_unknown_task_id: bool,
}
/// Baseline `AckSetup`: no lease tokens, zero deltas, no queue override, and the real task id.
/// Scenarios in the spec table derive their variants from it with struct-update syntax (`..ACK_OK`).
const ACK_OK: AckSetup = AckSetup {
pgack_token: None,
workers_token: None,
lock_at_delta_secs: 0,
override_queue: None,
attempt_delta: 0,
fabricate_unknown_task_id: false,
};
/// Inserts a worker row and a `Running` job row, acknowledges a successful
/// result with task parts shaped by `setup`, and records the ack outcome plus
/// the job row's state afterwards.
///
/// The job row is stored with `attempts = 0`, `max_attempts = 2` and the
/// current Unix time as `lock_at`. The status is always read from the row that
/// was inserted, even when `setup.fabricate_unknown_task_id` makes the ack
/// target a different id.
///
/// Returns `Outcome::Skipped` when no test pool is available.
///
/// # Errors
///
/// Returns the stringified error of the pool lookup, either cleanup, either
/// insert, the task-id parse, or the status read. The queue cleanup runs even
/// when a scenario step fails; the scenario's error is reported first.
///
/// NOTE(review): the `apalis.workers` row inserted here is only removed if
/// `cleanup_queue` also clears that table — its full body is not visible from
/// this section; confirm.
async fn run_ack_predicate(setup: AckSetup) -> Result<Outcome<AckPredicateRun>, String> {
    let Some(pool) = test_pool().await? else {
        return Ok(Outcome::Skipped);
    };
    let queue = format!("apalis-spec-ack-pred-{}", Ulid::new());
    cleanup_queue(pool.clone(), queue.clone()).await?;
    // Every fallible scenario step lives in this block, so an early `?` exits
    // the block only and the cleanup below still executes.
    let scenario = async {
        let worker_id = format!("spec-ack-pred-worker-{queue}");
        let stored_lock_at_secs = now_unix() as i64;
        let stored_lock_at = <DateTime as DateTimeExt>::from_unix_timestamp(stored_lock_at_secs);
        insert_worker_row(
            pool.clone(),
            queue.clone(),
            worker_id.clone(),
            setup.workers_token.map(str::to_owned),
        )
        .await?;
        let id = insert_running_row(
            pool.clone(),
            queue.clone(),
            worker_id.clone(),
            0,
            2,
            stored_lock_at,
        )
        .await?;
        // Baseline attempt is 1; `attempt_delta` skews it, never below zero.
        let attempt_value = (1i64 + setup.attempt_delta).max(0) as usize;
        let parts_lock_at = stored_lock_at_secs + setup.lock_at_delta_secs;
        let parts_lock_by = worker_id.clone();
        let parts_queue = setup
            .override_queue
            .map(str::to_owned)
            .unwrap_or_else(|| queue.clone());
        let parts_task_id = if setup.fabricate_unknown_task_id {
            TaskId::from_str(&Ulid::new().to_string()).map_err(|e| e.to_string())?
        } else {
            id
        };
        let parts = TaskBuilder::new(())
            .with_task_id(parts_task_id)
            .with_attempt(Attempt::new_with_value(attempt_value))
            .with_ctx(
                PgContext::new()
                    .with_max_attempts(2)
                    .with_queue(parts_queue)
                    .with_lock_at(Some(parts_lock_at))
                    .with_lock_by(Some(parts_lock_by)),
            )
            .build()
            .parts;
        let mut ack = match setup.pgack_token {
            Some(t) => PgAck::with_lease_token(pool.clone(), Arc::<str>::from(t)),
            None => PgAck::new(pool.clone()),
        };
        let result: Result<String, BoxDynError> = Ok("processed".to_owned());
        // The ack outcome is recorded, not propagated: rejection is a valid result here.
        let ack_result = ack.ack(&result, &parts).await;
        let row = job_status_row(pool.clone(), id).await?;
        Ok::<AckPredicateRun, String>(AckPredicateRun {
            ack_error: ack_result.err().map(|e| e.to_string()),
            row_status: row.status,
            row_attempts: row.attempts,
            row_last_result: row.last_result,
        })
    }
    .await;
    let cleaned = cleanup_queue(pool, queue).await;
    let run = scenario?;
    cleaned?;
    Ok(Outcome::Completed(run))
}
/// Builds the assertion that the `ack` call returned no error.
fn ack_succeeds() -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult {
    observe::<AckPredicateRun, _>("ack predicate", |run| match run.ack_error.as_ref() {
        None => Ok(()),
        Some(err) => Err(format!("expected ack to succeed, got error: {err}")),
    })
}
/// Builds the assertion that the job row's status is `Done` after the ack.
fn ack_writes_done() -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult {
    observe::<AckPredicateRun, _>("ack writes Done", |run| {
        if run.row_status != "Done" {
            return Err(format!("expected row Status=Done, got {}", run.row_status));
        }
        Ok(())
    })
}
/// Builds the assertion that the job row's `last_result` is non-NULL after the ack.
fn ack_persists_result() -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult {
    observe::<AckPredicateRun, _>("ack writes last_result", |run| {
        if run.row_last_result.is_some() {
            Ok(())
        } else {
            Err("expected last_result to be populated after successful ack".into())
        }
    })
}
/// Builds the assertion that the `ack` call failed with a message containing
/// `stale acknowledgement`.
fn ack_rejected_as_stale() -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult
{
    observe::<AckPredicateRun, _>("ack rejected", |run| {
        let Some(other) = run.ack_error.as_deref() else {
            return Err("expected ack to be rejected as stale, but it succeeded".into());
        };
        if other.contains("stale acknowledgement") {
            Ok(())
        } else {
            Err(format!(
                "expected stale acknowledgement error, got {other:?}"
            ))
        }
    })
}
/// Builds the assertion that the job row's status is still `Running` after the ack.
fn ack_row_stays_running() -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult
{
    observe::<AckPredicateRun, _>("row stays Running", |run| {
        if run.row_status != "Running" {
            return Err(format!(
                "expected row to remain Running on rejection, got {}",
                run.row_status
            ));
        }
        Ok(())
    })
}
/// Builds the assertion that the job row's `last_result` is still NULL after the ack.
fn ack_row_keeps_null_last_result()
    -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult {
    observe::<AckPredicateRun, _>("row keeps NULL last_result", |run| {
        match run.row_last_result {
            None => Ok(()),
            Some(_) => Err("expected last_result to remain NULL after rejected ack".into()),
        }
    })
}
/// Builds the assertion that the job row's `attempts` equals `expected` after the ack.
fn ack_row_attempts(
    expected: i32,
) -> impl Fn(&Result<Outcome<AckPredicateRun>, String>) -> AssertionResult {
    observe::<AckPredicateRun, _>("row attempts", move |run| {
        if run.row_attempts != expected {
            return Err(format!(
                "expected attempts={expected} after the call, got {}",
                run.row_attempts
            ));
        }
        Ok(())
    })
}
// Spec table for the whole file. Each `expect(...)` names the scenario future
// under test, each `when` block optionally overrides the scenario's inputs with
// `let`, and each `to` clause applies one assertion closure to the result.
// NOTE(review): the exact expansion is defined by the `lets_expect` crate;
// `#tokio_test` presumably makes the generated tests run on tokio — confirm.
lets_expect! { #tokio_test
// Retry of failed rows: `retryable` defaults to true and is overridden for the exhausted case.
expect(run_failed_retry(retryable).await) {
let retryable = true;
when a_failed_row_still_has_attempts_remaining {
to is_reclaimed_by_fetch_next { failed_retry_reclaims_row() }
to preserves_the_persisted_attempt_count { failed_retry_preserves_attempt_count() }
}
when a_failed_row_has_exhausted_its_attempts {
let retryable = false;
to is_not_reclaimed_by_fetch_next { failed_exhausted_not_reclaimed() }
}
}
// Two racing admin registrations of the same worker id.
expect(run_concurrent_admin_register().await) {
when two_admin_register_worker_calls_race_on_the_same_id {
to both_succeed_via_upsert_semantics {
concurrent_admin_register_both_succeed()
}
to leaves_exactly_one_workers_row {
concurrent_admin_register_creates_single_row()
}
}
}
// Two workers polling one queue concurrently.
expect(run_two_worker_race().await) {
when two_workers_poll_the_same_queue_concurrently {
to deliver_disjoint_payloads_thanks_to_for_update_skip_locked {
two_workers_share_set_without_duplicates()
}
}
}
// Refreshing the queue-stats snapshot when it has never been populated.
expect(run_refresh_unpopulated_snapshot().await) {
when refresh_runs_against_a_freshly_created_with_no_data_matview {
to falls_back_to_a_blocking_refresh_and_succeeds {
refresh_unpopulated_snapshot_succeeds()
}
to leaves_the_matview_populated_for_subsequent_callers {
refresh_unpopulated_snapshot_populates()
}
}
}
// Listener teardown when the notification stream is dropped.
expect(run_unlisten_after_drop().await) {
when notify_task_ids_is_dropped_and_the_connection_returns_to_the_pool {
to leaves_no_apalis_subscription_on_the_returned_connection {
no_stale_listen_subscription_after_drop()
}
}
}
// Active-workers reporting in the presence of terminal jobs.
expect(run_locked_workers_excludes_terminal().await) {
when terminal_jobs_still_carry_a_lock_by_value {
to omits_them_from_the_active_workers_column {
locked_workers_shows_active_only()
}
}
}
// Worker listing with more than one hundred registered workers.
expect(run_list_workers_beyond_100().await) {
when more_than_one_hundred_workers_are_registered_for_the_queue {
to returns_every_row_without_a_hidden_limit {
list_workers_returns_every_row()
}
}
}
// Fetcher behaviour when the initial registration fails.
expect(run_registration_gate_blocks_fetcher().await) {
when the_initial_heartbeat_fails_with_already_registered {
to yields_the_registration_error_and_terminates_without_dequeue {
registration_gate_emits_error_then_ends()
}
}
}
// Schema verification against a migrated database.
expect(run_verify_schema().await) {
when verify_schema_is_called_against_a_freshly_migrated_database {
to records_both_branches_of_the_pending_predicate {
verify_schema_records_both_branches()
}
}
}
// Batch push where the buffered tasks collide on one idempotency key.
expect(run_partial_batch_conflict().await) {
when a_buffered_batch_collides_on_a_shared_idempotency_key {
to surfaces_an_idempotency_conflict_with_the_rejected_count {
partial_batch_rejects_with_count()
}
to rolls_back_every_partial_insertion_in_the_batch {
partial_batch_rolls_back_inserts()
}
}
}
// Batch push mixing fresh keys with a single duplicate.
expect(run_mixed_batch_conflict().await) {
when a_batch_mixes_fresh_keys_with_one_duplicate {
to reports_only_the_duplicate_as_rejected {
mixed_batch_reports_only_the_duplicate()
}
to rolls_back_the_fresh_rows_with_the_whole_batch {
mixed_batch_rolls_back_the_fresh_rows_too()
}
}
}
// Batch push repeating a key inside the batch, with a key-less task interleaved.
expect(run_intrabatch_dup_with_nulls().await) {
when a_batch_repeats_a_key_and_interleaves_a_null_key_with_no_seed {
to reports_only_the_repeated_key_excluding_the_null {
intrabatch_reports_only_the_repeated_key()
}
to rolls_back_every_row_leaving_the_queue_empty {
intrabatch_dup_rolls_back_the_whole_batch()
}
}
}
// Metadata size cap. `run_metadata_cap` sizes its payload string as the given
// length minus 16; presumably that offset is why 8194/8195 are used as the
// at-cap/over-cap inputs — confirm against the library's cap check.
expect(run_metadata_cap(meta_payload_len).await) {
let meta_payload_len = 1024usize;
when the_metadata_serialization_length_is_well_below_the_cap {
to accepts_the_push_and_persists_the_row { metadata_cap_succeeds() }
}
when the_metadata_serialization_length_sits_just_below_the_eight_kib_cap {
let meta_payload_len = 8000usize;
to accepts_the_push_and_persists_the_row { metadata_cap_succeeds() }
}
when the_metadata_serialization_length_is_exactly_at_the_eight_kib_cap {
let meta_payload_len = 8194usize;
to accepts_the_push_and_persists_the_row { metadata_cap_succeeds() }
}
when the_metadata_serialization_length_is_one_byte_over_the_eight_kib_cap {
let meta_payload_len = 8195usize;
to rejects_the_push_with_invalid_argument { metadata_cap_rejects() }
to does_not_persist_the_apalis_jobs_row { metadata_cap_persists_nothing() }
}
when the_metadata_serialization_length_exceeds_the_eight_kib_cap {
let meta_payload_len = 16384usize;
to rejects_the_push_with_invalid_argument { metadata_cap_rejects() }
to does_not_persist_the_apalis_jobs_row { metadata_cap_persists_nothing() }
}
}
// Idempotency-key length cap; `key_len` is the key's length in bytes.
expect(run_idempotency_cap(key_len).await) {
let key_len = 36usize;
when the_idempotency_key_is_a_typical_short_uuid {
to accepts_the_push_and_persists_the_row { idempotency_cap_succeeds() }
}
when the_idempotency_key_sits_at_the_one_kib_cap_boundary {
let key_len = 1024usize;
to accepts_the_push_and_persists_the_row { idempotency_cap_succeeds() }
}
when the_idempotency_key_exceeds_the_one_kib_cap {
let key_len = 4096usize;
to rejects_the_push_with_invalid_argument { idempotency_cap_rejects() }
to does_not_persist_the_apalis_jobs_row { idempotency_cap_persists_nothing() }
}
}
// Queue-name length cap; `name_len` is the target length of the generated name.
expect(run_queue_name_cap(name_len).await) {
let name_len = 64usize;
when the_queue_name_is_a_typical_namespaced_identifier {
to accepts_the_push_and_persists_the_row { queue_name_cap_succeeds() }
}
when the_queue_name_sits_at_the_two_hundred_fifty_five_byte_cap {
let name_len = 255usize;
to accepts_the_push_and_persists_the_row { queue_name_cap_succeeds() }
}
when the_queue_name_exceeds_the_two_hundred_fifty_five_byte_cap {
let name_len = 1024usize;
to rejects_the_push_with_invalid_argument { queue_name_cap_rejects() }
to does_not_persist_the_apalis_jobs_row { queue_name_cap_persists_nothing() }
}
}
// Acknowledgement predicate. Every case starts from `ACK_OK` and changes one
// aspect of the setup; the first two cases expect success, the rest rejection.
expect(run_ack_predicate(setup).await) {
let setup = ACK_OK;
when called_without_a_lease_token_on_a_matching_running_row {
to marks_the_row_done { ack_writes_done() }
to persists_the_serialized_result { ack_persists_result() }
to returns_ok { ack_succeeds() }
}
when called_with_a_lease_token_that_matches_the_workers_row {
let setup = AckSetup {
pgack_token: Some("matching-token"),
workers_token: Some("matching-token"),
..ACK_OK
};
to marks_the_row_done { ack_writes_done() }
to persists_the_serialized_result { ack_persists_result() }
to returns_ok { ack_succeeds() }
}
when called_with_a_lease_token_that_does_not_match_the_workers_row {
let setup = AckSetup {
pgack_token: Some("caller-token"),
workers_token: Some("other-token"),
..ACK_OK
};
to is_rejected_as_a_stale_acknowledgement { ack_rejected_as_stale() }
to leaves_the_row_in_running_state { ack_row_stays_running() }
to does_not_write_last_result { ack_row_keeps_null_last_result() }
to does_not_increment_attempts { ack_row_attempts(0) }
}
when called_with_a_lease_token_but_the_workers_row_has_null_lease_token {
let setup = AckSetup {
pgack_token: Some("caller-token"),
workers_token: None,
..ACK_OK
};
to is_rejected_as_a_stale_acknowledgement { ack_rejected_as_stale() }
to leaves_the_row_in_running_state { ack_row_stays_running() }
to does_not_write_last_result { ack_row_keeps_null_last_result() }
}
when the_callers_lock_at_disagrees_with_the_stored_row {
let setup = AckSetup {
lock_at_delta_secs: 1,
..ACK_OK
};
to is_rejected_as_a_stale_acknowledgement { ack_rejected_as_stale() }
to leaves_the_row_in_running_state { ack_row_stays_running() }
to does_not_write_last_result { ack_row_keeps_null_last_result() }
}
when the_callers_started_attempts_disagrees_with_the_stored_row {
let setup = AckSetup {
attempt_delta: 5,
..ACK_OK
};
to is_rejected_as_a_stale_acknowledgement { ack_rejected_as_stale() }
to leaves_the_row_in_running_state { ack_row_stays_running() }
to does_not_write_last_result { ack_row_keeps_null_last_result() }
}
// The status assertions below inspect the row that was inserted, not the fabricated id.
when the_callers_task_id_does_not_exist_in_the_jobs_table {
let setup = AckSetup {
fabricate_unknown_task_id: true,
..ACK_OK
};
to is_rejected_as_a_stale_acknowledgement { ack_rejected_as_stale() }
to leaves_the_original_row_in_running_state { ack_row_stays_running() }
to does_not_write_last_result { ack_row_keeps_null_last_result() }
}
when the_callers_queue_disagrees_with_the_stored_row {
let setup = AckSetup {
override_queue: Some("apalis-spec-ack-pred-wrong-queue"),
..ACK_OK
};
to is_rejected_as_a_stale_acknowledgement { ack_rejected_as_stale() }
to leaves_the_row_in_running_state { ack_row_stays_running() }
to does_not_write_last_result { ack_row_keeps_null_last_result() }
}
}
}