carrot_cake/consumers/transient_error_hook.rs
1use super::{incoming_message::Delivery, BrokerAction};
2
3#[async_trait::async_trait]
4/// A hook to determine if failed messages due to a transient error should be requeued.
5///
6/// # Use case
7///
8/// The framework nacks all messages if processing returns an error.
9/// By default, those messages are never requeued.
10///
11/// If the error encountered during processing is marked as [`ErrorType::Transient`]
12/// you can inject your own logic to determine if the failed messages should be requeued using a
13/// transient error hook.
14///
15/// # On fatal errors
16///
17/// A transient error hook, as the name implies, is only invoked on transient errors.
18/// Errors marked as [`ErrorType::Fatal`] are nacked and never requeued.
19///
20/// # Plug and play implementations
21///
22/// You can find two ready-to-go hooks in the [`hooks::transient_error`] module -
23/// [`AlwaysRequeue`] and [`NeverRequeue`].
24///
25///
26/// [`ErrorType::Transient`]: crate::consumers::ErrorType::Transient
27/// [`ErrorType::Fatal`]: crate::consumers::ErrorType::Fatal
28/// [`hooks::transient_error`]: crate::consumers::hooks::transient_error
29/// [`AlwaysRequeue`]: crate::consumers::hooks::transient_error::AlwaysRequeue
30/// [`NeverRequeue`]: crate::consumers::hooks::transient_error::NeverRequeue
31pub trait ConsumerTransientErrorHook: Send + Sync + 'static {
32 /// `ConsumerRetryHook::on_transient_error` determines if the `requeue` flag should be
33 /// set to true by returning a [`ShouldRequeue`] instance.
34 ///
35 /// If [`ShouldRequeue::Requeue`] is returned, the message will be requeued and become
36 /// _immediately_ available again for consumption.
37 /// If [`ShouldRequeue::Discard`] is returned, the message will not be requeued
38 /// and it will not end up in the dead letter exchange if configured.
39 /// If [`ShouldRequeue::DeadLetterOrDiscard`] is returned, the message will be not be requeued
40 /// and it will end up in the dead letter exchange if configured.
41 async fn on_transient_error(&self, delivery: &Delivery) -> ShouldRequeue;
42}
43
44#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45/// Determines if a failed message should be re-queued.
46///
47/// Check out [`ConsumerTransientErrorHook`]'s documentation for more details.
48pub enum ShouldRequeue {
49 Requeue,
50 Discard,
51 DeadLetterOrDiscard,
52}
53
54impl From<ShouldRequeue> for BrokerAction {
55 fn from(value: ShouldRequeue) -> Self {
56 match value {
57 ShouldRequeue::Requeue => Self::Nack,
58 ShouldRequeue::Discard => Self::Ack,
59 ShouldRequeue::DeadLetterOrDiscard => Self::Reject,
60 }
61 }
62}