carrot_cake/consumers/
telemetry_middleware.rs

1//! Middleware types are heavily inspired by `tide`'s approach to middleware.
2use crate::consumers::processing_middleware::Next;
3use crate::consumers::{
4    ConsumerTransientErrorHook, ErrorType, HandlerError, Incoming, ShouldRequeue,
5};
6use lapin::options::{BasicAckOptions, BasicNackOptions, BasicRejectOptions};
7use std::sync::Arc;
8
9use super::incoming_message::Delivery;
10
/// Middlewares to collect and emit telemetry data based on the outcome of message processing.
///
/// # Use case
///
/// `TelemetryMiddleware`s get **read-only** access to the input and outputs of message processing.
///
/// `TelemetryMiddleware`s execute after **all** the message processing has taken place,
/// including ack/nacking with the broker. They are therefore the best place to emit logs,
/// collect metrics, etc. - telemetry!
///
/// Before the handler is executed, telemetry middlewares can:
///
/// - Extract information from the incoming message and [record it in the message extensions];
///
/// After the handler has been executed, middlewares can:
///
/// - [Extract information recorded in the message extensions] to perform a task;
/// - Perform actions based on the handler's outcome (e.g. log errors).
///
/// # What middleware should I use?
///
/// Does the processing outcome (success/failure) change based on the logic executed in the middleware?
///
/// If yes, use a [`ProcessingMiddleware`].
/// If no, use a `TelemetryMiddleware`.
///
/// [record it in the message extensions]: crate::consumers::set_message_local_item
/// [Extract information recorded in the message extensions]: crate::consumers::get_message_local_item
/// [`ProcessingMiddleware`]: crate::consumers::ProcessingMiddleware
#[async_trait::async_trait]
pub trait TelemetryMiddleware<Context, Error>: 'static + Send + Sync {
    /// Asynchronously observe the processing of `incoming`.
    ///
    /// `next` is the remainder of the middleware chain (further telemetry middlewares,
    /// the processing middlewares and the message handler). Implementations are expected
    /// to call [`MessageProcessing::run`] on it and return the resulting
    /// [`ProcessingOutcome`] unaltered: the outcome cannot be constructed from here.
    async fn handle<'a>(
        &'a self,
        incoming: &'a Incoming<Context>,
        next: MessageProcessing<'a, Context, Error>,
    ) -> ProcessingOutcome<Error>;
}
49
/// The remainder of the middleware chain (telemetry + processing), including the final message handler.
///
/// The chain is executed by consuming this value via [`MessageProcessing::run`].
// `Debug` is deliberately not derived here; presumably the trait-object fields do not
// implement it - TODO confirm.
#[allow(missing_debug_implementations)]
pub struct MessageProcessing<'a, Context, Error> {
    /// Logic to handle transient failures returned by the processing chain.
    pub(super) transient_error_hook: Arc<dyn ConsumerTransientErrorHook>,
    /// The chain of processing middlewares, including the final message handler.
    pub(super) processing_chain: Next<'a, Context, Error>,
    /// The remainder of the telemetry middleware chain.
    /// An empty slice means every telemetry middleware has already been invoked.
    pub(super) next_telemetry_middleware: &'a [Arc<dyn TelemetryMiddleware<Context, Error>>],
}
60
/// The outcome of message processing:
/// - processing middleware chain;
/// - message handler;
/// - ack/nack against the AMQP broker.
///
/// [`ProcessingOutcome`] is what [`TelemetryMiddleware`]s work with on the way out in
/// the middleware execution pipeline.
///
/// You can convert into a `Result` using [`ProcessingOutcome::result`].
///
/// # Why a struct?
///
/// [`TelemetryMiddleware`] should pass the message processing outcome unaltered along
/// the telemetry middleware chain.
/// To ensure no tampering (mostly by mistake), we encapsulate `Result<(), ProcessingError>` into
/// a struct, [`ProcessingOutcome`].
/// [`ProcessingOutcome`] does not expose any constructor: it is impossible to build a new
/// [`ProcessingOutcome`] in [`TelemetryMiddleware::handle`]. The telemetry middleware is forced
/// to propagate the outcome returned by [`MessageProcessing`].
#[derive(Debug)]
pub struct ProcessingOutcome<Error> {
    /// `Ok(())` when both the handler chain and the broker interaction succeeded,
    /// otherwise the error raised by either of them.
    outcome: Result<(), ProcessingError<Error>>,
    /// The action we asked (or tried to ask) the broker to perform for this message.
    /// It is recorded even when dispatching the instruction to the broker failed.
    broker_action: BrokerAction,
}
85
86impl<Error> ProcessingOutcome<Error> {
87    pub fn result(&self) -> &Result<(), ProcessingError<Error>> {
88        &self.outcome
89    }
90
91    /// Returns `true` if we instructed the broker to requeue the message after a transient failure
92    /// in processing. Returns `false` otherwise.
93    ///
94    /// It returns `true` even if we experienced an issue when dispatching the nack instruction to
95    /// the AMQP broker (e.g. network timeout).
96    pub fn was_requeued(&self) -> bool {
97        match &self.broker_action {
98            BrokerAction::Ack => false,
99            BrokerAction::Nack => true,
100            BrokerAction::Reject => false,
101        }
102    }
103}
104
/// Everything that can go wrong while processing a message: either the handler chain
/// failed, or we failed to communicate the resulting ack/nack to the AMQP broker.
#[derive(thiserror::Error, Debug)]
pub enum ProcessingError<Error> {
    /// An error was encountered while processing the message.
    #[error("An error was encountered while processing of the message.")]
    HandlerError(HandlerError<Error>),
    /// Failed to ack message.
    #[error("Failed to ack message.")]
    AckError(#[source] anyhow::Error),
    /// Failed to nack message.
    #[error("Failed to nack message.")]
    NackError {
        /// The failure reported when dispatching the nack/reject to the broker.
        #[source]
        error: anyhow::Error,

        /// The processing error that led us to try to tell the AMQP broker to nack the message.
        /// `None` when the handler chain itself returned `Ok(_)` and only the nack failed.
        handler_error: Option<Error>,

        /// The error type reported by the handler chain, or `ErrorType::Transient`
        /// when the handler chain succeeded and only the nack failed.
        error_type: ErrorType,
    },
}
125
126impl<'a, Context: 'static, Error: 'static> MessageProcessing<'a, Context, Error> {
127    /// Asynchronously execute the remaining middleware chain.
128    pub async fn run(mut self, incoming: &Incoming<Context>) -> ProcessingOutcome<Error> {
129        // If there is at least one middleware in the chain, get a reference to it and store
130        // the remaining ones in `next_middleware`.
131        // Then call the middleware passing `self` in the handler, recursively.
132        if let Some((current, next)) = self.next_telemetry_middleware.split_first() {
133            self.next_telemetry_middleware = next;
134            current.handle(incoming, self).await
135        } else {
136            // We have now executed all telemetry middlewares (or simply there were none).
137            // Time to kick-off the processing chain: processing middlewares + handler.
138            let Self {
139                transient_error_hook,
140                processing_chain,
141                ..
142            } = self;
143
144            let message = &incoming.message;
145            let outcome = processing_chain.run(incoming).await;
146
147            match ack_or_nack(transient_error_hook, message, &outcome).await {
148                Ok(broker_action) => ProcessingOutcome {
149                    outcome: outcome.map_err(ProcessingError::HandlerError).map(|_| ()),
150                    broker_action,
151                },
152                Err(inner_error) => match inner_error {
153                    InnerBrokerError::AckError(e) => ProcessingOutcome {
154                        outcome: Err(ProcessingError::AckError(e)),
155                        broker_action: BrokerAction::Ack,
156                    },
157                    InnerBrokerError::NackError { error, requeue } => {
158                        // We can get a Nack failure independently of the value returned by the handler
159                        // because it can return both `Ok(BrokerAction::Nack)` and `Err(HandlerError { error_type: Transient })`.
160                        // If we failed to Nack a message while processing a success (e.g., an `Ok(_)` return),
161                        // the only sensible thing to do is map it to a transient error.
162                        let (error_type, handler_error) = match outcome {
163                            Ok(_) => (ErrorType::Transient, None),
164                            Err(e) => (e.error_type, Some(e.inner_error)),
165                        };
166                        ProcessingOutcome {
167                            outcome: Err(ProcessingError::NackError {
168                                error,
169                                handler_error,
170                                error_type,
171                            }),
172                            broker_action: requeue.into(),
173                        }
174                    }
175                },
176            }
177        }
178    }
179}
180
/// Internal error describing a failed attempt to settle a message with the broker.
///
/// It is translated into a public [`ProcessingError`] by [`MessageProcessing::run`].
enum InnerBrokerError {
    /// The ack could not be dispatched to the broker.
    AckError(anyhow::Error),
    /// The nack or the reject could not be dispatched to the broker.
    NackError {
        /// The underlying failure reported by the broker client.
        error: anyhow::Error,
        /// Whether we were asking the broker to requeue the message (nack)
        /// or to dead-letter/discard it (reject).
        requeue: ShouldRequeue,
    },
}
188
/// The action we asked the broker to take when finalising the processing of
/// the current message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrokerAction {
    /// Positive acknowledgement - the message can be removed from the queue.
    /// This happens if the message was processed successfully or when,
    /// as a consequence of a transient error, the message will be retried later.
    Ack,
    /// Negative acknowledgement - the message was not processed successfully and should be requeued
    /// to retry processing.
    Nack,
    /// Rejection. The message was not processed successfully and should NOT be requeued.
    /// The message will be sent to the dead letter exchange if configured.
    Reject,
}
204
205impl BrokerAction {
206    async fn execute(&self, acker: &lapin::acker::Acker) -> Result<(), InnerBrokerError> {
207        match self {
208            // Acknowledge the message
209            Self::Ack => {
210                let ack_options = BasicAckOptions { multiple: false };
211
212                acker
213                    .ack(ack_options)
214                    .await
215                    .map_err(anyhow::Error::from)
216                    .map_err(InnerBrokerError::AckError)
217            }
218
219            // Put the message back to the queue
220            Self::Nack => {
221                let nack_options = BasicNackOptions {
222                    multiple: false,
223                    requeue: true,
224                };
225
226                acker
227                    .nack(nack_options)
228                    .await
229                    .map_err(anyhow::Error::from)
230                    .map_err(|e| InnerBrokerError::NackError {
231                        error: e,
232                        requeue: ShouldRequeue::Requeue,
233                    })
234            }
235
236            // Remove the message from the queue and send it to the DLQ (if configured)
237            Self::Reject => {
238                let reject_options = BasicRejectOptions { requeue: false };
239
240                acker
241                    .reject(reject_options)
242                    .await
243                    .map_err(anyhow::Error::from)
244                    .map_err(|e| InnerBrokerError::NackError {
245                        error: e,
246                        requeue: ShouldRequeue::DeadLetterOrDiscard,
247                    })
248            }
249        }
250    }
251}
252
253/// Based on the outcome of processing communicate with the AMQP broker to ack/nack/reject the message.
254/// If processing failed, it takes care to determine (via the transient error hook) if the message
255/// should be requeued (nack), requeued with ack (ack) or not (reject).
256async fn ack_or_nack<Error>(
257    transient_error_hook: Arc<dyn ConsumerTransientErrorHook>,
258    message: &Delivery,
259    outcome: &Result<BrokerAction, HandlerError<Error>>,
260) -> Result<BrokerAction, InnerBrokerError> {
261    match outcome {
262        Ok(action) => {
263            action.execute(&message.acker).await?;
264            Ok(*action)
265        }
266        Err(e) => match e.error_type {
267            ErrorType::Fatal => handle_fatal_error(message).await,
268            ErrorType::Transient => handle_transient_error(message, transient_error_hook).await,
269        },
270    }
271}
272
273/// Removes the message from the queue, rejecting it.
274/// If a dead letter exchange has been configured, the rejected message will be delivered to it.
275async fn handle_fatal_error(message: &Delivery) -> Result<BrokerAction, InnerBrokerError> {
276    BrokerAction::Reject.execute(&message.acker).await?;
277    Ok(BrokerAction::Reject)
278}
279
280/// Determines how to handle a transient error on the basis of transient error hook:
281/// - `ShouldRequeue::Requeue`: the message is nacked and re-delivered to the queue.
282/// - `ShouldRequeue::DoNotRequeue`: the message is rejected
283/// and sent to the dead letter exchange if configured.
284/// - `ShouldRequeue::DoNotRequeueAck`: the message is acked.
285async fn handle_transient_error(
286    message: &Delivery,
287    transient_error_hook: Arc<dyn ConsumerTransientErrorHook>,
288) -> Result<BrokerAction, InnerBrokerError> {
289    let action: BrokerAction = transient_error_hook
290        .on_transient_error(message)
291        .await
292        .into();
293
294    action.execute(&message.acker).await?;
295    Ok(action)
296}