1use std::borrow::Cow;
2
3use apalis_core::error::BoxDynError;
4
/// Errors produced by the diesel/PostgreSQL-backed Apalis storage.
///
/// Several variants carry contextual fields (operation label, task/worker ids,
/// queue) and a remediation hint that is rendered as part of the `Display`
/// message, so the text is actionable on its own when logged.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// A diesel query or transaction failed.
    ///
    /// The `Display` output appends a remediation hint computed by
    /// `database_hint` from the underlying diesel error (empty when no hint applies).
    #[error("database error while {operation}: {source}{hint}", hint = database_hint(source))]
    Database {
        /// Human-readable label of what was being attempted (e.g. "fetching jobs").
        operation: Cow<'static, str>,
        /// The underlying diesel error, also exposed via `Error::source`.
        #[source]
        source: diesel::result::Error,
    },

    /// No connection could be obtained from the r2d2 connection pool.
    #[error(
        "failed to acquire PostgreSQL connection from r2d2 pool: {0}; check that DATABASE_URL points to a reachable PostgreSQL server and that the pool has enough connections"
    )]
    Pool(#[from] diesel::r2d2::PoolError),

    /// A blocking task failed; the boxed error is exposed as the source.
    #[error("blocking task failed: {0}")]
    Blocking(#[source] BoxDynError),

    /// Running the embedded migrations failed.
    #[error("failed to run embedded migrations: {0}")]
    Migration(#[source] BoxDynError),

    /// A database row could not be converted into an Apalis task.
    #[error("failed to convert database row into an Apalis task: {0}")]
    Row(#[source] BoxDynError),

    /// A caller supplied an invalid argument; the string describes the problem.
    #[error("invalid argument: {0}")]
    InvalidArgument(String),

    /// One or more idempotency keys in a batch collided with the unique
    /// constraint, and the whole batch was rolled back.
    #[error(
        "idempotency_key conflict in queue `{job_type}`: keys {conflicting_keys:?} collided with the unique constraint; {total} task(s) in the batch were all rolled back"
    )]
    IdempotencyConflict {
        /// Queue (job type) the batch was pushed to.
        job_type: String,
        /// The idempotency keys that collided.
        conflicting_keys: Vec<String>,
        /// Number of tasks in the rolled-back batch.
        total: usize,
    },

    /// The configured codec could not decode a task payload or result.
    #[error("failed to decode task payload or result with the configured codec: {0}")]
    Decode(#[source] BoxDynError),

    /// A `serde_json` error, converted automatically via `#[from]`.
    #[error("json error: {0}")]
    Json(#[from] serde_json::Error),

    /// Task metadata lacks a field required at this point; the payload is the field name.
    #[error(
        "task metadata is missing required field `{0}`; this usually means the task did not go through the expected poll/lock/ack lifecycle"
    )]
    MissingField(&'static str),

    /// Worker registration conflicted with an existing or concurrent registration.
    #[error("worker registration already exists or is being registered concurrently: {0}")]
    AlreadyRegistered(String),

    /// A task lookup matched no row.
    #[error("task not found while {operation} (task_id: {task_id}, queue: {queue}); {hint}")]
    TaskNotFound {
        /// What was being attempted when the task was not found.
        operation: Cow<'static, str>,
        /// Id of the task that was looked up.
        task_id: String,
        /// Queue the lookup was constrained to, or a placeholder when unconstrained.
        queue: String,
        /// Suggested next step, rendered verbatim after the context.
        hint: &'static str,
    },

    /// An acknowledgement no longer matches the task's current lock state.
    #[error(
        "stale acknowledgement for task {task_id} in queue {queue} by worker {worker_id}; the task is no longer Running with the same lock owner, attempt, and lock timestamp"
    )]
    StaleAcknowledgement {
        /// Id of the task being acknowledged.
        task_id: String,
        /// Queue of the task.
        queue: String,
        /// Worker that attempted the acknowledgement.
        worker_id: String,
    },

    /// An operation required a registered worker, but none was found.
    #[error(
        "worker not registered while {operation} (worker_id: {worker_id}, queue: {queue}); {hint}"
    )]
    WorkerNotRegistered {
        /// What was being attempted.
        operation: Cow<'static, str>,
        /// Id of the worker that was expected to be registered.
        worker_id: String,
        /// Queue the worker serves.
        queue: String,
        /// Suggested next step, rendered verbatim after the context.
        hint: &'static str,
    },

    /// The LISTEN/NOTIFY listener failed; per the message, polling still works.
    #[error(
        "PostgreSQL notification listener failed: {0}; polling fallback can still fetch jobs, but LISTEN/NOTIFY wakeups are disabled until the stream is recreated"
    )]
    NotifyListener(String),

    /// `start_send` was called on a full sink buffer; the payload is the capacity.
    #[error("sink buffer is full; call poll_ready before start_send (capacity: {0})")]
    SinkBufferFull(usize),
}
136
137impl From<diesel::result::Error> for Error {
147 fn from(source: diesel::result::Error) -> Self {
148 Self::Database {
149 operation: Cow::Borrowed(
150 "diesel transaction begin/commit/rollback (unlabeled — use map_err inside the closure)",
151 ),
152 source,
153 }
154 }
155}
156
157impl Error {
158 pub(crate) fn database(
159 operation: impl Into<Cow<'static, str>>,
160 ) -> impl FnOnce(diesel::result::Error) -> Self {
161 let operation = operation.into();
162 move |source| Self::Database { operation, source }
163 }
164
165 pub(crate) fn task_not_found(
166 operation: impl Into<Cow<'static, str>>,
167 task_id: impl Into<String>,
168 queue: Option<String>,
169 hint: &'static str,
170 ) -> Self {
171 Self::TaskNotFound {
172 operation: operation.into(),
173 task_id: task_id.into(),
174 queue: queue.unwrap_or_else(|| "<not constrained>".to_owned()),
175 hint,
176 }
177 }
178
179 pub(crate) fn stale_acknowledgement(
180 task_id: impl Into<String>,
181 queue: impl Into<String>,
182 worker_id: impl Into<String>,
183 ) -> Self {
184 Self::StaleAcknowledgement {
185 task_id: task_id.into(),
186 queue: queue.into(),
187 worker_id: worker_id.into(),
188 }
189 }
190
191 pub(crate) fn worker_not_registered(
192 operation: impl Into<Cow<'static, str>>,
193 worker_id: impl Into<String>,
194 queue: impl Into<String>,
195 hint: &'static str,
196 ) -> Self {
197 Self::WorkerNotRegistered {
198 operation: operation.into(),
199 worker_id: worker_id.into(),
200 queue: queue.into(),
201 hint,
202 }
203 }
204
205 pub(crate) fn idempotency_conflict(
206 job_type: impl Into<String>,
207 conflicting_keys: Vec<String>,
208 total: usize,
209 ) -> Self {
210 Self::IdempotencyConflict {
211 job_type: job_type.into(),
212 conflicting_keys,
213 total,
214 }
215 }
216}
217
218fn database_hint(error: &diesel::result::Error) -> &'static str {
219 use diesel::result::Error as DieselError;
220 match error {
221 DieselError::DatabaseError(_, info) => {
227 if matches!(info.table_name(), Some(name) if name == "jobs")
228 && matches!(
229 info.constraint_name(),
230 Some(name)
231 if name == "jobs_lock_by_worker_type_fkey"
232 || name == "jobs_lock_by_fkey"
233 )
234 {
235 return "; register the worker for this queue before locking or acknowledging jobs";
236 }
237 let message = info.message();
241 if message.contains("apalis.jobs")
242 && (message.contains("does not exist") || message.contains("relation"))
243 {
244 "; run apalis_diesel_postgres::setup(&pool).await before using the storage"
245 } else if message.contains("foreign key")
246 || message.contains("jobs_lock_by_worker_type_fkey")
247 || message.contains("jobs_lock_by_fkey")
248 {
249 "; register the worker for this queue before locking or acknowledging jobs"
250 } else {
251 ""
252 }
253 }
254 _ => "",
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use diesel::result::{DatabaseErrorInformation, DatabaseErrorKind, Error as DieselError};
    use lets_expect::{AssertionError, AssertionResult, *};
    use std::error::Error as StdError;

    /// Minimal `DatabaseErrorInformation` whose message, table and constraint are
    /// chosen per test, so `database_hint` can be exercised without PostgreSQL.
    struct StubInfo {
        message: &'static str,
        table_name: Option<&'static str>,
        constraint_name: Option<&'static str>,
    }

    impl DatabaseErrorInformation for StubInfo {
        fn message(&self) -> &str {
            self.message
        }
        fn details(&self) -> Option<&str> {
            None
        }
        fn hint(&self) -> Option<&str> {
            None
        }
        fn table_name(&self) -> Option<&str> {
            self.table_name
        }
        fn column_name(&self) -> Option<&str> {
            None
        }
        fn constraint_name(&self) -> Option<&str> {
            self.constraint_name
        }
        fn statement_position(&self) -> Option<i32> {
            None
        }
    }

    /// Builds a diesel `DatabaseError` (kind `Unknown`) backed by [`StubInfo`].
    fn database_error_with(
        message: &'static str,
        table_name: Option<&'static str>,
        constraint_name: Option<&'static str>,
    ) -> DieselError {
        DieselError::DatabaseError(
            DatabaseErrorKind::Unknown,
            Box::new(StubInfo {
                message,
                table_name,
                constraint_name,
            }),
        )
    }

    /// Hint produced for a stubbed database error.
    fn hint_for(
        message: &'static str,
        table_name: Option<&'static str>,
        constraint_name: Option<&'static str>,
    ) -> &'static str {
        database_hint(&database_error_with(message, table_name, constraint_name))
    }

    /// Hint produced for a diesel error that is not a `DatabaseError`.
    fn non_database_hint() -> &'static str {
        database_hint(&DieselError::NotFound)
    }

    fn json_error() -> serde_json::Error {
        serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
    }

    fn boxed_error(message: &'static str) -> BoxDynError {
        Box::new(std::io::Error::other(message))
    }

    fn database_error() -> Error {
        Error::Database {
            operation: Cow::Borrowed("fetching jobs"),
            source: diesel::result::Error::NotFound,
        }
    }

    /// Database error built through the `map_err` adapter returned by `Error::database`.
    fn labeled_database_error() -> Error {
        Error::database("locking task")(DieselError::NotFound)
    }

    /// Database error built through the `From<diesel::result::Error>` impl (the `?` path).
    fn unlabeled_database_error() -> Error {
        Error::from(DieselError::NotFound)
    }

    /// Asserts that the error's `Display` output equals `expected` exactly.
    fn displays_as(expected: &'static str) -> impl Fn(&Error) -> AssertionResult {
        move |error| {
            let actual = error.to_string();
            if actual == expected {
                Ok(())
            } else {
                Err(AssertionError::new(vec![format!(
                    "expected display {expected:?}, got {actual:?}"
                )]))
            }
        }
    }

    /// Asserts that `Error::source` exists and its `Display` contains `expected`.
    fn has_source_containing(expected: &'static str) -> impl Fn(&Error) -> AssertionResult {
        move |error| match StdError::source(error) {
            Some(source) if source.to_string().contains(expected) => Ok(()),
            Some(source) => Err(AssertionError::new(vec![format!(
                "expected source containing {expected:?}, got {:?}",
                source.to_string()
            )])),
            None => Err(AssertionError::new(vec![format!(
                "expected source containing {expected:?}, got no source"
            )])),
        }
    }

    /// Asserts that the error exposes no `Error::source`.
    fn has_no_source(error: &Error) -> AssertionResult {
        match StdError::source(error) {
            None => Ok(()),
            Some(source) => Err(AssertionError::new(vec![format!(
                "expected no source, got {:?}",
                source.to_string()
            )])),
        }
    }

    fn is_task_not_found(error: &Error) -> AssertionResult {
        match error {
            Error::TaskNotFound {
                operation,
                task_id,
                queue,
                ..
            } if *operation == "locking task" && task_id == "task-1" && queue == "queue-1" => {
                Ok(())
            }
            other => Err(AssertionError::new(vec![format!(
                "expected task not found, got {other:?}"
            )])),
        }
    }

    fn is_idempotency_conflict(error: &Error) -> AssertionResult {
        match error {
            Error::IdempotencyConflict {
                job_type,
                conflicting_keys,
                total,
            } if job_type == "emails"
                && conflicting_keys.len() == 2
                && conflicting_keys[0] == "k-1"
                && conflicting_keys[1] == "k-2"
                && *total == 3 =>
            {
                Ok(())
            }
            other => Err(AssertionError::new(vec![format!(
                "expected idempotency conflict {{emails, [k-1, k-2], 3}}, got {other:?}"
            )])),
        }
    }

    lets_expect! {
        expect(database_error()) {
            to displays_the_operation_context { displays_as("database error while fetching jobs: Record not found") }
            to exposes_the_database_error_as_the_source { has_source_containing("Record not found") }
        }

        // The map_err adapter must carry the caller's label into the message.
        expect(labeled_database_error()) {
            to displays_the_label_passed_to_the_adapter { displays_as("database error while locking task: Record not found") }
            to exposes_the_database_error_as_the_source { has_source_containing("Record not found") }
        }

        // Errors converted via `?` are flagged as unlabeled so they can be traced back.
        expect(unlabeled_database_error()) {
            to marks_the_operation_as_unlabeled { displays_as("database error while diesel transaction begin/commit/rollback (unlabeled — use map_err inside the closure): Record not found") }
            to exposes_the_database_error_as_the_source { has_source_containing("Record not found") }
        }

        expect(Error::Blocking(boxed_error("join cancelled"))) {
            to displays_the_blocking_error { displays_as("blocking task failed: join cancelled") }
            to exposes_the_blocking_error_as_the_source { has_source_containing("join cancelled") }
        }

        expect(Error::Migration(boxed_error("missing migration"))) {
            to displays_the_migration_error { displays_as("failed to run embedded migrations: missing migration") }
            to exposes_the_migration_error_as_the_source { has_source_containing("missing migration") }
        }

        expect(Error::Row(boxed_error("bad task row"))) {
            to displays_the_row_conversion_error { displays_as("failed to convert database row into an Apalis task: bad task row") }
            to exposes_the_row_error_as_the_source { has_source_containing("bad task row") }
        }

        expect(Error::Decode(boxed_error("bad payload"))) {
            to displays_the_decode_error { displays_as("failed to decode task payload or result with the configured codec: bad payload") }
            to exposes_the_decode_error_as_the_source { has_source_containing("bad payload") }
        }

        expect(Error::Json(json_error())) {
            to displays_the_json_error { displays_as("json error: expected ident at line 1 column 2") }
            to exposes_the_json_error_as_the_source { has_source_containing("expected ident") }
        }

        expect(Error::InvalidArgument("batch size must be positive".to_owned())) {
            to displays_the_invalid_argument { displays_as("invalid argument: batch size must be positive") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::MissingField("run_at")) {
            to displays_the_missing_field_error { displays_as("task metadata is missing required field `run_at`; this usually means the task did not go through the expected poll/lock/ack lifecycle") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::AlreadyRegistered("worker-1".to_string())) {
            to displays_the_registration_error { displays_as("worker registration already exists or is being registered concurrently: worker-1") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::idempotency_conflict(
            "emails",
            vec!["k-1".to_owned(), "k-2".to_owned()],
            3,
        )) {
            to identifies_the_conflicting_queue_and_keys { is_idempotency_conflict }
            to displays_the_conflict_and_rollback_semantics { displays_as("idempotency_key conflict in queue `emails`: keys [\"k-1\", \"k-2\"] collided with the unique constraint; 3 task(s) in the batch were all rolled back") }
            to has_no_error_source { has_no_source }
        }

        expect(Error::task_not_found(
            "locking task",
            "task-1",
            Some("queue-1".to_owned()),
            "the task may be delayed, already locked by another worker, completed, or in another queue",
        )) {
            to returns_a_contextual_task_not_found_error { is_task_not_found }
            to displays_the_next_step { displays_as("task not found while locking task (task_id: task-1, queue: queue-1); the task may be delayed, already locked by another worker, completed, or in another queue") }
        }

        expect(Error::stale_acknowledgement("task-1", "queue-1", "worker-1")) {
            to displays_the_ack_conflict { displays_as("stale acknowledgement for task task-1 in queue queue-1 by worker worker-1; the task is no longer Running with the same lock owner, attempt, and lock timestamp") }
        }

        expect(Error::worker_not_registered(
            "updating worker heartbeat",
            "worker-1",
            "queue-1",
            "recreate the worker stream so registration can run again",
        )) {
            to displays_the_worker_registration_problem { displays_as("worker not registered while updating worker heartbeat (worker_id: worker-1, queue: queue-1); recreate the worker stream so registration can run again") }
        }

        expect(Error::NotifyListener("LISTEN failed".to_owned())) {
            to displays_the_notify_degradation { displays_as("PostgreSQL notification listener failed: LISTEN failed; polling fallback can still fetch jobs, but LISTEN/NOTIFY wakeups are disabled until the stream is recreated") }
        }

        expect(Error::SinkBufferFull(1)) {
            to displays_the_sink_buffer_error { displays_as("sink buffer is full; call poll_ready before start_send (capacity: 1)") }
            to has_no_error_source { has_no_source }
        }

        expect(non_database_hint()) {
            when diesel_error_is_not_a_database_variant {
                to returns_no_hint { equal("") }
            }
        }

        // Each `when` overrides some of the defaults below to hit one branch of database_hint.
        expect(hint_for(message, table_name, constraint_name)) {
            let message = "irrelevant";
            let table_name: Option<&'static str> = None;
            let constraint_name: Option<&'static str> = None;

            when structured_info_points_at_the_worker_type_foreign_key {
                let table_name = Some("jobs");
                let constraint_name = Some("jobs_lock_by_worker_type_fkey");
                to recommends_registering_the_worker {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when structured_info_points_at_the_legacy_lock_by_foreign_key {
                let table_name = Some("jobs");
                let constraint_name = Some("jobs_lock_by_fkey");
                to recommends_registering_the_worker_via_the_legacy_constraint {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_indicates_a_missing_apalis_jobs_relation_with_does_not_exist {
                let message = "relation \"apalis.jobs\" does not exist";
                to recommends_running_setup {
                    equal(
                        "; run apalis_diesel_postgres::setup(&pool).await before using the storage",
                    )
                }
            }

            when message_indicates_a_missing_apalis_jobs_relation_via_the_word_relation {
                let message = "missing relation apalis.jobs from schema";
                to recommends_running_setup_via_the_relation_match {
                    equal(
                        "; run apalis_diesel_postgres::setup(&pool).await before using the storage",
                    )
                }
            }

            when message_mentions_a_generic_foreign_key_violation {
                let message = "foreign key constraint violated";
                to recommends_registering_the_worker_via_message {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_mentions_the_worker_foreign_key_by_name {
                let message = "jobs_lock_by_worker_type_fkey conflict";
                to recommends_registering_the_worker_via_named_constraint {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_mentions_the_legacy_foreign_key_by_name {
                let message = "jobs_lock_by_fkey conflict";
                to recommends_registering_the_worker_via_legacy_named_constraint {
                    equal(
                        "; register the worker for this queue before locking or acknowledging jobs",
                    )
                }
            }

            when message_is_unrelated_to_any_known_signal {
                let message = "deadlock detected on update";
                to returns_no_hint { equal("") }
            }

            when structured_constraint_matches_an_fk_name_but_table_is_not_jobs {
                let table_name = Some("custom_mirror");
                let constraint_name = Some("jobs_lock_by_worker_type_fkey");
                let message = "deadlock detected on update";
                to falls_through_to_message_matching_and_returns_no_hint { equal("") }
            }

            when structured_table_is_jobs_but_constraint_is_an_unrelated_name {
                let table_name = Some("jobs");
                let constraint_name = Some("jobs_status_check");
                let message = "violates check constraint";
                to falls_through_to_message_matching_and_returns_no_hint { equal("") }
            }
        }
    }
}