Skip to main content

apalis_diesel_postgres/
error.rs

1use std::borrow::Cow;
2
3use apalis_core::error::BoxDynError;
4
/// Error type returned by the Diesel PostgreSQL backend.
///
/// The enum is `#[non_exhaustive]`, so downstream `match`es need a wildcard
/// arm. Variants that wrap another error (`Database`, `Pool`, `Blocking`,
/// `Migration`, `Row`, `Decode`, `Json`) expose it through
/// [`std::error::Error::source`] and also embed its message in their
/// `Display` output; the remaining variants carry plain data and have no
/// source.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Diesel query failed while running a named backend operation.
    ///
    /// Displays as `database error while {operation}: {source}` followed by a
    /// remediation suffix computed by `database_hint` (an empty string when no
    /// known failure pattern matches). Usually built with
    /// `.map_err(Error::database("..."))`; Diesel errors that reach `?`
    /// unmapped arrive here through the `From<diesel::result::Error>` impl
    /// with a generic operation label.
    #[error("database error while {operation}: {source}{hint}", hint = database_hint(source))]
    Database {
        /// Backend operation that was running when Diesel returned the error.
        operation: Cow<'static, str>,
        /// Original Diesel error.
        #[source]
        source: diesel::result::Error,
    },

    /// Acquiring a pooled connection failed.
    ///
    /// Created automatically from `diesel::r2d2::PoolError` by `?` (via
    /// `#[from]`).
    #[error(
        "failed to acquire PostgreSQL connection from r2d2 pool: {0}; check that DATABASE_URL points to a reachable PostgreSQL server and that the pool has enough connections"
    )]
    Pool(#[from] diesel::r2d2::PoolError),

    /// A blocking runtime task failed to complete.
    ///
    /// NOTE(review): the wrapped error is presumably the join/panic failure
    /// reported by the runtime's blocking pool — confirm at the call sites.
    #[error("blocking task failed: {0}")]
    Blocking(#[source] BoxDynError),

    /// Database migrations failed.
    #[error("failed to run embedded migrations: {0}")]
    Migration(#[source] BoxDynError),

    /// A task row could not be converted into an Apalis task.
    #[error("failed to convert database row into an Apalis task: {0}")]
    Row(#[source] BoxDynError),

    /// A caller-supplied argument was out of range for the backend.
    ///
    /// Carries only a human-readable message; there is no source error.
    #[error("invalid argument: {0}")]
    InvalidArgument(String),

    /// One or more tasks in an enqueue batch collided with the
    /// `(job_type, idempotency_key)` unique constraint.
    ///
    /// The **whole batch is rolled back, not only the colliding rows**: the
    /// duplicates are dropped by `ON CONFLICT DO NOTHING`, then the post-insert
    /// check rolls the batch's SAVEPOINT back, so every task in the batch is
    /// undone — including the non-conflicting ones. A surrounding (outer)
    /// transaction stays alive, so the caller can decide whether to commit the
    /// rest of its work or roll back. `conflicting_keys` lists exactly which
    /// `idempotency_key`s collided, so a batch caller can drop them and
    /// re-enqueue the rest. Match this variant instead of the
    /// [`Error::InvalidArgument`] message text to tell a benign duplicate apart
    /// from a real failure. No source error is attached.
    #[error(
        "idempotency_key conflict in queue `{job_type}`: keys {conflicting_keys:?} collided with the unique constraint; {total} task(s) in the batch were all rolled back"
    )]
    IdempotencyConflict {
        /// Queue (`job_type`) the conflicting batch targeted.
        job_type: String,
        /// The distinct `idempotency_key`s that collided — either against an
        /// already-stored row or another task in the same batch.
        /// `Error::idempotency_conflict` stores the list exactly as given, so
        /// distinctness and ordering are the caller's responsibility.
        conflicting_keys: Vec<String>,
        /// Total tasks in the batch; all of them were rolled back.
        total: usize,
    },

    /// A task payload or task result could not be decoded.
    #[error("failed to decode task payload or result with the configured codec: {0}")]
    Decode(#[source] BoxDynError),

    /// JSON encoding or decoding failed.
    ///
    /// Created automatically from `serde_json::Error` by `?` (via `#[from]`).
    #[error("json error: {0}")]
    Json(#[from] serde_json::Error),

    /// A required task field was missing.
    ///
    /// The payload is the missing field's name; `Display` renders it in
    /// backticks. There is no source error.
    #[error(
        "task metadata is missing required field `{0}`; this usually means the task did not go through the expected poll/lock/ack lifecycle"
    )]
    MissingField(&'static str),

    /// A worker or queue registration already exists.
    ///
    /// The payload is included verbatim in the message (NOTE(review):
    /// presumably the worker id — confirm at the call sites).
    #[error("worker registration already exists or is being registered concurrently: {0}")]
    AlreadyRegistered(String),

    /// A task required by a backend operation could not be found — for
    /// example it could not be locked because it was absent or not currently
    /// lockable.
    ///
    /// `operation` names the failing operation. `queue` holds the placeholder
    /// `<not constrained>` when `Error::task_not_found` was given `None`.
    #[error("task not found while {operation} (task_id: {task_id}, queue: {queue}); {hint}")]
    TaskNotFound {
        /// Backend operation that failed.
        operation: Cow<'static, str>,
        /// Task id involved in the operation.
        task_id: String,
        /// Queue involved in the operation, or a placeholder when unconstrained.
        queue: String,
        /// Human-readable next step.
        hint: &'static str,
    },

    /// A task acknowledgement no longer matches the stored lock state.
    ///
    /// Per the message, the task is no longer `Running` with the same lock
    /// owner, attempt, and lock timestamp that the acknowledging worker saw.
    #[error(
        "stale acknowledgement for task {task_id} in queue {queue} by worker {worker_id}; the task is no longer Running with the same lock owner, attempt, and lock timestamp"
    )]
    StaleAcknowledgement {
        /// Task id involved in the acknowledgement.
        task_id: String,
        /// Queue involved in the acknowledgement.
        queue: String,
        /// Worker id involved in the acknowledgement.
        worker_id: String,
    },

    /// An operation that needs the worker's registration row (for example
    /// recording a worker heartbeat) found that the worker row is absent.
    #[error(
        "worker not registered while {operation} (worker_id: {worker_id}, queue: {queue}); {hint}"
    )]
    WorkerNotRegistered {
        /// Backend operation that failed.
        operation: Cow<'static, str>,
        /// Worker id involved in the operation.
        worker_id: String,
        /// Queue involved in the operation.
        queue: String,
        /// Human-readable next step.
        hint: &'static str,
    },

    /// PostgreSQL notification listener failed.
    ///
    /// Per the message this is a degradation, not a hard stop: polling can
    /// still fetch jobs, but LISTEN/NOTIFY wakeups stay disabled until the
    /// stream is recreated.
    #[error(
        "PostgreSQL notification listener failed: {0}; polling fallback can still fetch jobs, but LISTEN/NOTIFY wakeups are disabled until the stream is recreated"
    )]
    NotifyListener(String),

    /// A sink producer attempted to send without observing backpressure.
    ///
    /// The payload is the sink buffer's capacity. Producers must call
    /// `poll_ready` before `start_send`.
    #[error("sink buffer is full; call poll_ready before start_send (capacity: {0})")]
    SinkBufferFull(usize),
}
136
137// `diesel::Connection::transaction` requires the closure error type to be
138// `From<diesel::result::Error>` so its transaction_manager can lift begin /
139// commit / rollback failures (`connection/transaction_manager.rs`) into our
140// error type. **Inside the closure**, every Diesel call should still use
141// `.map_err(Error::database("specific op"))` so the operation label reflects
142// the failing statement; this `From` only fires when an unhandled
143// `diesel::result::Error` reaches `transaction()` itself (begin/commit/
144// rollback, or an inner statement whose `?` was not explicitly mapped). The
145// generic label below makes the fallback path unambiguous in logs.
146impl From<diesel::result::Error> for Error {
147    fn from(source: diesel::result::Error) -> Self {
148        Self::Database {
149            operation: Cow::Borrowed(
150                "diesel transaction begin/commit/rollback (unlabeled — use map_err inside the closure)",
151            ),
152            source,
153        }
154    }
155}
156
157impl Error {
158    pub(crate) fn database(
159        operation: impl Into<Cow<'static, str>>,
160    ) -> impl FnOnce(diesel::result::Error) -> Self {
161        let operation = operation.into();
162        move |source| Self::Database { operation, source }
163    }
164
165    pub(crate) fn task_not_found(
166        operation: impl Into<Cow<'static, str>>,
167        task_id: impl Into<String>,
168        queue: Option<String>,
169        hint: &'static str,
170    ) -> Self {
171        Self::TaskNotFound {
172            operation: operation.into(),
173            task_id: task_id.into(),
174            queue: queue.unwrap_or_else(|| "<not constrained>".to_owned()),
175            hint,
176        }
177    }
178
179    pub(crate) fn stale_acknowledgement(
180        task_id: impl Into<String>,
181        queue: impl Into<String>,
182        worker_id: impl Into<String>,
183    ) -> Self {
184        Self::StaleAcknowledgement {
185            task_id: task_id.into(),
186            queue: queue.into(),
187            worker_id: worker_id.into(),
188        }
189    }
190
191    pub(crate) fn worker_not_registered(
192        operation: impl Into<Cow<'static, str>>,
193        worker_id: impl Into<String>,
194        queue: impl Into<String>,
195        hint: &'static str,
196    ) -> Self {
197        Self::WorkerNotRegistered {
198            operation: operation.into(),
199            worker_id: worker_id.into(),
200            queue: queue.into(),
201            hint,
202        }
203    }
204
205    pub(crate) fn idempotency_conflict(
206        job_type: impl Into<String>,
207        conflicting_keys: Vec<String>,
208        total: usize,
209    ) -> Self {
210        Self::IdempotencyConflict {
211            job_type: job_type.into(),
212            conflicting_keys,
213            total,
214        }
215    }
216}
217
218fn database_hint(error: &diesel::result::Error) -> &'static str {
219    use diesel::result::Error as DieselError;
220    match error {
221        // Locale-independent: diesel maps undefined_table errors into NotFound
222        // variants for queries that expect a result; but structured DatabaseError
223        // matches happen here. Prefer `table_name()` and `constraint_name()`
224        // (locale-independent) over `message()` substring matching, which fails
225        // on non-English PostgreSQL servers.
226        DieselError::DatabaseError(_, info) => {
227            if matches!(info.table_name(), Some(name) if name == "jobs")
228                && matches!(
229                    info.constraint_name(),
230                    Some(name)
231                        if name == "jobs_lock_by_worker_type_fkey"
232                            || name == "jobs_lock_by_fkey"
233                )
234            {
235                return "; register the worker for this queue before locking or acknowledging jobs";
236            }
237            // Fallback: message-based detection for installations where neither
238            // table_name nor constraint_name is populated (e.g. when the
239            // relation itself does not yet exist).
240            let message = info.message();
241            if message.contains("apalis.jobs")
242                && (message.contains("does not exist") || message.contains("relation"))
243            {
244                "; run apalis_diesel_postgres::setup(&pool).await before using the storage"
245            } else if message.contains("foreign key")
246                || message.contains("jobs_lock_by_worker_type_fkey")
247                || message.contains("jobs_lock_by_fkey")
248            {
249                "; register the worker for this queue before locking or acknowledging jobs"
250            } else {
251                ""
252            }
253        }
254        _ => "",
255    }
256}
257
#[cfg(test)]
mod tests {
    // Unit tests for `Error`'s Display/source behaviour, its constructors, and
    // the `database_hint` remediation lookup. The `lets_expect!` specs cover
    // the variants one by one. The plain `#[test]` functions at the bottom
    // cover cross-cutting paths: a non-empty hint appended to the `Database`
    // display, the `From<diesel::result::Error>` fallback label, the
    // `InvalidArgument` variant, and the `<not constrained>` queue placeholder.
    use super::*;
    use diesel::result::{DatabaseErrorInformation, DatabaseErrorKind, Error as DieselError};
    use lets_expect::{AssertionError, AssertionResult, *};
    use std::error::Error as StdError;

    /// Minimal `DatabaseErrorInformation` whose message, table name and
    /// constraint name are chosen by the test; every other field is `None`.
    struct StubInfo {
        message: &'static str,
        table_name: Option<&'static str>,
        constraint_name: Option<&'static str>,
    }

    impl DatabaseErrorInformation for StubInfo {
        fn message(&self) -> &str {
            self.message
        }
        fn details(&self) -> Option<&str> {
            None
        }
        fn hint(&self) -> Option<&str> {
            None
        }
        fn table_name(&self) -> Option<&str> {
            self.table_name
        }
        fn column_name(&self) -> Option<&str> {
            None
        }
        fn constraint_name(&self) -> Option<&str> {
            self.constraint_name
        }
        fn statement_position(&self) -> Option<i32> {
            None
        }
    }

    /// Wraps a `StubInfo` in `DieselError::DatabaseError` with kind `Unknown`.
    fn database_error_with(
        message: &'static str,
        table_name: Option<&'static str>,
        constraint_name: Option<&'static str>,
    ) -> DieselError {
        DieselError::DatabaseError(
            DatabaseErrorKind::Unknown,
            Box::new(StubInfo {
                message,
                table_name,
                constraint_name,
            }),
        )
    }

    /// Runs `database_hint` against a stubbed database error.
    fn hint_for(
        message: &'static str,
        table_name: Option<&'static str>,
        constraint_name: Option<&'static str>,
    ) -> &'static str {
        database_hint(&database_error_with(message, table_name, constraint_name))
    }

    /// Runs `database_hint` against a Diesel error that is not `DatabaseError`.
    fn non_database_hint() -> &'static str {
        database_hint(&DieselError::NotFound)
    }

    fn json_error() -> serde_json::Error {
        serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
    }

    fn boxed_error(message: &'static str) -> BoxDynError {
        Box::new(std::io::Error::other(message))
    }

    fn database_error() -> Error {
        Error::Database {
            operation: Cow::Borrowed("fetching jobs"),
            source: diesel::result::Error::NotFound,
        }
    }

    /// Assertion: `error.to_string()` equals `expected` exactly.
    fn displays_as(expected: &'static str) -> impl Fn(&Error) -> AssertionResult {
        move |error| {
            let actual = error.to_string();
            if actual == expected {
                Ok(())
            } else {
                Err(AssertionError::new(vec![format!(
                    "expected display {expected:?}, got {actual:?}"
                )]))
            }
        }
    }

    /// Assertion: the error has a `source()` whose display contains `expected`.
    fn has_source_containing(expected: &'static str) -> impl Fn(&Error) -> AssertionResult {
        move |error| match StdError::source(error) {
            Some(source) if source.to_string().contains(expected) => Ok(()),
            Some(source) => Err(AssertionError::new(vec![format!(
                "expected source containing {expected:?}, got {:?}",
                source.to_string()
            )])),
            None => Err(AssertionError::new(vec![format!(
                "expected source containing {expected:?}, got no source"
            )])),
        }
    }

    /// Assertion: the error has no `source()`.
    fn has_no_source(error: &Error) -> AssertionResult {
        match StdError::source(error) {
            None => Ok(()),
            Some(source) => Err(AssertionError::new(vec![format!(
                "expected no source, got {:?}",
                source.to_string()
            )])),
        }
    }

    fn is_task_not_found(error: &Error) -> AssertionResult {
        match error {
            Error::TaskNotFound {
                operation,
                task_id,
                queue,
                ..
            } if *operation == "locking task" && task_id == "task-1" && queue == "queue-1" => {
                Ok(())
            }
            other => Err(AssertionError::new(vec![format!(
                "expected task not found, got {other:?}"
            )])),
        }
    }

    fn is_idempotency_conflict(error: &Error) -> AssertionResult {
        match error {
            Error::IdempotencyConflict {
                job_type,
                conflicting_keys,
                total,
            } if job_type == "emails"
                && conflicting_keys.len() == 2
                && conflicting_keys[0] == "k-1"
                && conflicting_keys[1] == "k-2"
                && *total == 3 =>
            {
                Ok(())
            }
            other => Err(AssertionError::new(vec![format!(
                "expected idempotency conflict {{emails, [k-1, k-2], 3}}, got {other:?}"
            )])),
        }
    }

    lets_expect! {
        expect(database_error()) {
            to displays_the_operation_context { displays_as("database error while fetching jobs: Record not found") }
            to exposes_the_database_error_as_the_source { has_source_containing("Record not found") }
        }

        expect(Error::Blocking(boxed_error("join cancelled"))) {
            to displays_the_blocking_error { displays_as("blocking task failed: join cancelled") }
            to exposes_the_blocking_error_as_the_source { has_source_containing("join cancelled") }
        }

        expect(Error::Migration(boxed_error("missing migration"))) {
            to displays_the_migration_error { displays_as("failed to run embedded migrations: missing migration") }
            to exposes_the_migration_error_as_the_source { has_source_containing("missing migration") }
        }

        expect(Error::Row(boxed_error("bad task row"))) {
            to displays_the_row_conversion_error { displays_as("failed to convert database row into an Apalis task: bad task row") }
            to exposes_the_row_error_as_the_source { has_source_containing("bad task row") }
        }

        expect(Error::Decode(boxed_error("bad payload"))) {
            to displays_the_decode_error { displays_as("failed to decode task payload or result with the configured codec: bad payload") }
            to exposes_the_decode_error_as_the_source { has_source_containing("bad payload") }
        }

        expect(Error::Json(json_error())) {
            to displays_the_json_error { displays_as("json error: expected ident at line 1 column 2") }
            to exposes_the_json_error_as_the_source { has_source_containing("expected ident") }
        }

        expect(Error::MissingField("run_at")) {
            to displays_the_missing_field_error { displays_as("task metadata is missing required field `run_at`; this usually means the task did not go through the expected poll/lock/ack lifecycle") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::AlreadyRegistered("worker-1".to_string())) {
            to displays_the_registration_error { displays_as("worker registration already exists or is being registered concurrently: worker-1") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::idempotency_conflict(
            "emails",
            vec!["k-1".to_owned(), "k-2".to_owned()],
            3,
        )) {
            to identifies_the_conflicting_queue_and_keys { is_idempotency_conflict }
            to displays_the_conflict_and_rollback_semantics { displays_as("idempotency_key conflict in queue `emails`: keys [\"k-1\", \"k-2\"] collided with the unique constraint; 3 task(s) in the batch were all rolled back") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::task_not_found(
            "locking task",
            "task-1",
            Some("queue-1".to_owned()),
            "the task may be delayed, already locked by another worker, completed, or in another queue",
        )) {
            to returns_a_contextual_task_not_found_error { is_task_not_found }
            to displays_the_next_step { displays_as("task not found while locking task (task_id: task-1, queue: queue-1); the task may be delayed, already locked by another worker, completed, or in another queue") }
        }

        expect(Error::stale_acknowledgement("task-1", "queue-1", "worker-1")) {
            to displays_the_ack_conflict { displays_as("stale acknowledgement for task task-1 in queue queue-1 by worker worker-1; the task is no longer Running with the same lock owner, attempt, and lock timestamp") }
        }

        expect(Error::worker_not_registered(
            "updating worker heartbeat",
            "worker-1",
            "queue-1",
            "recreate the worker stream so registration can run again",
        )) {
            to displays_the_worker_registration_problem { displays_as("worker not registered while updating worker heartbeat (worker_id: worker-1, queue: queue-1); recreate the worker stream so registration can run again") }
        }

        expect(Error::NotifyListener("LISTEN failed".to_owned())) {
            to displays_the_notify_degradation { displays_as("PostgreSQL notification listener failed: LISTEN failed; polling fallback can still fetch jobs, but LISTEN/NOTIFY wakeups are disabled until the stream is recreated") }
        }

        expect(Error::SinkBufferFull(1)) {
            to displays_the_sink_buffer_error { displays_as("sink buffer is full; call poll_ready before start_send (capacity: 1)") }
            to has_no_error_source { has_no_source }
        }

        expect(non_database_hint()) {
            when diesel_error_is_not_a_database_variant {
                to returns_no_hint { equal("") }
            }
        }

        expect(hint_for(message, table_name, constraint_name)) {
            let message = "irrelevant";
            let table_name: Option<&'static str> = None;
            let constraint_name: Option<&'static str> = None;

            when structured_info_points_at_the_worker_type_foreign_key {
                let table_name = Some("jobs");
                let constraint_name = Some("jobs_lock_by_worker_type_fkey");
                to recommends_registering_the_worker {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when structured_info_points_at_the_legacy_lock_by_foreign_key {
                let table_name = Some("jobs");
                let constraint_name = Some("jobs_lock_by_fkey");
                to recommends_registering_the_worker_via_the_legacy_constraint {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_indicates_a_missing_apalis_jobs_relation_with_does_not_exist {
                let message = "relation \"apalis.jobs\" does not exist";
                to recommends_running_setup {
                    equal(
                        "; run apalis_diesel_postgres::setup(&pool).await before using the storage",
                    )
                }
            }

            when message_indicates_a_missing_apalis_jobs_relation_via_the_word_relation {
                let message = "missing relation apalis.jobs from schema";
                to recommends_running_setup_via_the_relation_match {
                    equal(
                        "; run apalis_diesel_postgres::setup(&pool).await before using the storage",
                    )
                }
            }

            when message_mentions_a_generic_foreign_key_violation {
                let message = "foreign key constraint violated";
                to recommends_registering_the_worker_via_message {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_mentions_the_worker_foreign_key_by_name {
                let message = "jobs_lock_by_worker_type_fkey conflict";
                to recommends_registering_the_worker_via_named_constraint {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_mentions_the_legacy_foreign_key_by_name {
                let message = "jobs_lock_by_fkey conflict";
                to recommends_registering_the_worker_via_legacy_named_constraint {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_is_unrelated_to_any_known_signal {
                let message = "deadlock detected on update";
                to returns_no_hint { equal("") }
            }

            when structured_constraint_matches_an_fk_name_but_table_is_not_jobs {
                // The structured arm of `database_hint` requires BOTH
                // table_name == "jobs" AND a constraint matching a known FK.
                // When the table differs (e.g. a custom mirror table that
                // happens to reuse the FK name) the structured arm must NOT
                // fire; the message-based fallback then decides.
                let table_name = Some("custom_mirror");
                let constraint_name = Some("jobs_lock_by_worker_type_fkey");
                let message = "deadlock detected on update";
                to falls_through_to_message_matching_and_returns_no_hint { equal("") }
            }

            when structured_table_is_jobs_but_constraint_is_an_unrelated_name {
                // Sibling to the two FK-matching `when`s above: table is
                // "jobs" but the constraint is something else (e.g. a check
                // constraint). The structured arm must NOT fire.
                let table_name = Some("jobs");
                let constraint_name = Some("jobs_status_check");
                let message = "violates check constraint";
                to falls_through_to_message_matching_and_returns_no_hint { equal("") }
            }
        }
    }

    /// The `Database` display must append the `database_hint` suffix, not
    /// only the (empty) no-hint case exercised above.
    #[test]
    fn database_display_appends_the_structured_hint() {
        let error = Error::database("locking task")(database_error_with(
            "fk violation",
            Some("jobs"),
            Some("jobs_lock_by_fkey"),
        ));
        assert_eq!(
            error.to_string(),
            "database error while locking task: fk violation; register the worker for this queue before locking or acknowledging jobs"
        );
        assert!(StdError::source(&error).is_some());
    }

    /// Unmapped Diesel errors converted via `From` carry the generic label.
    #[test]
    fn from_diesel_error_uses_the_unlabeled_fallback_operation() {
        let error = Error::from(DieselError::NotFound);
        assert_eq!(
            error.to_string(),
            "database error while diesel transaction begin/commit/rollback (unlabeled — use map_err inside the closure): Record not found"
        );
        assert!(StdError::source(&error).is_some());
    }

    #[test]
    fn invalid_argument_displays_its_message_without_a_source() {
        let error = Error::InvalidArgument("batch size must be positive".to_owned());
        assert_eq!(
            error.to_string(),
            "invalid argument: batch size must be positive"
        );
        assert!(StdError::source(&error).is_none());
    }

    /// `task_not_found(.., None, ..)` renders the `<not constrained>` placeholder.
    #[test]
    fn task_not_found_without_a_queue_uses_the_placeholder() {
        let error = Error::task_not_found("acknowledging task", "task-9", None, "retry later");
        assert_eq!(
            error.to_string(),
            "task not found while acknowledging task (task_id: task-9, queue: <not constrained>); retry later"
        );
        assert!(StdError::source(&error).is_none());
    }
}