carrot_cake/consumers/telemetry_middleware.rs
1//! Middleware types are heavily inspired by `tide`'s approach to middleware.
2use crate::consumers::processing_middleware::Next;
3use crate::consumers::{
4 ConsumerTransientErrorHook, ErrorType, HandlerError, Incoming, ShouldRequeue,
5};
6use lapin::options::{BasicAckOptions, BasicNackOptions, BasicRejectOptions};
7use std::sync::Arc;
8
9use super::incoming_message::Delivery;
10
11/// Middlewares to collect and emit telemetry data based on the outcome of message processing.
12///
13/// # Use case
14///
15/// `TelemetryMiddleware`s get **read-only** access to the input and outputs of message processing.
16///
17/// `TelemetryMiddleware`s execute after **all** the message processing has taken place,
18/// including ack/nacking with the broker. They are therefore best places to emit logs,
19/// collect metrics, etc.. - telemetry!
20///
21/// Before the handler is executed, telemetry middlewares can:
22///
23/// - Extract information from the incoming message and [record it in the message extensions];
24///
25/// After the handler has been executed, middlewares can:
26///
27/// - [Extract information recorded in the message extensions] to perform a task;
28/// - Perform actions based on the handler's outcome (e.g. log errors).
29///
30/// # What middleware should I use?
31///
32/// Does the processing outcome (success/failure) change based on the logic executed in the middleware?
33///
34/// If yes, use a [`ProcessingMiddleware`].
35/// If no, use a `TelemetryMiddleware`.
36///
37/// [record it in the message extensions]: crate::consumers::get_message_local_item
38/// [Extract information recorded in the message extensions]: crate::consumers::set_message_local_item
39/// [`ProcessingMiddleware`]: crate::consumers::ProcessingMiddleware
40#[async_trait::async_trait]
41pub trait TelemetryMiddleware<Context, Error>: 'static + Send + Sync {
42 /// Asynchronously handle the request, and return a response.
43 async fn handle<'a>(
44 &'a self,
45 incoming: &'a Incoming<Context>,
46 next: MessageProcessing<'a, Context, Error>,
47 ) -> ProcessingOutcome<Error>;
48}
49
50/// The remainder of the middleware chain (telemetry + processing), including the final message handler.
51#[allow(missing_debug_implementations)]
52pub struct MessageProcessing<'a, Context, Error> {
53 /// Logic to handle transient failures returned by the processing chain.
54 pub(super) transient_error_hook: Arc<dyn ConsumerTransientErrorHook>,
55 /// The chain of processing middlewares, including the final message handler.
56 pub(super) processing_chain: Next<'a, Context, Error>,
57 /// The remainder of the telemetry middleware chain.
58 pub(super) next_telemetry_middleware: &'a [Arc<dyn TelemetryMiddleware<Context, Error>>],
59}
60
61/// The outcome of message processing:
62/// - processing middleware chain;
63/// - message handler;
64/// - ack/nack against the AMQP broker.
65///
66/// [`ProcessingOutcome`] is what [`TelemetryMiddleware`]s work with on the way out in
67/// the middleware execution pipeline.
68///
69/// You can convert into a `Result` using [`ProcessingOutcome::result`].
70///
71/// # Why a struct?
72///
73/// [`TelemetryMiddleware`] should pass the message processing outcome unaltered along
74/// the telemetry middleware chain.
75/// To ensure no tampering (mostly by mistake), we encapsulate `Result<(), ProcessingError>` into
76/// a struct, [`ProcessingOutcome`].
77/// [`ProcessingOutcome`] does not expose any constructor: it is impossible to build a new
78/// [`ProcessingOutcome`] in [`TelemetryMiddleware::handle`]. The telemetry middleware is forced
79/// to propagate the outcome returned by [`MessageProcessing`]
80#[derive(Debug)]
81pub struct ProcessingOutcome<Error> {
82 outcome: Result<(), ProcessingError<Error>>,
83 broker_action: BrokerAction,
84}
85
86impl<Error> ProcessingOutcome<Error> {
87 pub fn result(&self) -> &Result<(), ProcessingError<Error>> {
88 &self.outcome
89 }
90
91 /// Returns `true` if we instructed the broker to requeue the message after a transient failure
92 /// in processing. Returns `false` otherwise.
93 ///
94 /// It returns `true` even if we experienced an issue when dispatching the nack instruction to
95 /// the AMQP broker (e.g. network timeout).
96 pub fn was_requeued(&self) -> bool {
97 match &self.broker_action {
98 BrokerAction::Ack => false,
99 BrokerAction::Nack => true,
100 BrokerAction::Reject => false,
101 }
102 }
103}
104
105#[derive(thiserror::Error, Debug)]
106pub enum ProcessingError<Error> {
107 /// An error was encountered while processing the message.
108 #[error("An error was encountered while processing of the message.")]
109 HandlerError(HandlerError<Error>),
110 /// Failed to ack message.
111 #[error("Failed to ack message.")]
112 AckError(#[source] anyhow::Error),
113 /// Failed to nack message.
114 #[error("Failed to nack message.")]
115 NackError {
116 #[source]
117 error: anyhow::Error,
118
119 /// The processing error that led us to try to tell the AMQP broker to nack the message.
120 handler_error: Option<Error>,
121
122 error_type: ErrorType,
123 },
124}
125
126impl<'a, Context: 'static, Error: 'static> MessageProcessing<'a, Context, Error> {
127 /// Asynchronously execute the remaining middleware chain.
128 pub async fn run(mut self, incoming: &Incoming<Context>) -> ProcessingOutcome<Error> {
129 // If there is at least one middleware in the chain, get a reference to it and store
130 // the remaining ones in `next_middleware`.
131 // Then call the middleware passing `self` in the handler, recursively.
132 if let Some((current, next)) = self.next_telemetry_middleware.split_first() {
133 self.next_telemetry_middleware = next;
134 current.handle(incoming, self).await
135 } else {
136 // We have now executed all telemetry middlewares (or simply there were none).
137 // Time to kick-off the processing chain: processing middlewares + handler.
138 let Self {
139 transient_error_hook,
140 processing_chain,
141 ..
142 } = self;
143
144 let message = &incoming.message;
145 let outcome = processing_chain.run(incoming).await;
146
147 match ack_or_nack(transient_error_hook, message, &outcome).await {
148 Ok(broker_action) => ProcessingOutcome {
149 outcome: outcome.map_err(ProcessingError::HandlerError).map(|_| ()),
150 broker_action,
151 },
152 Err(inner_error) => match inner_error {
153 InnerBrokerError::AckError(e) => ProcessingOutcome {
154 outcome: Err(ProcessingError::AckError(e)),
155 broker_action: BrokerAction::Ack,
156 },
157 InnerBrokerError::NackError { error, requeue } => {
158 // We can get a Nack failure independently of the value returned by the handler
159 // because it can return both `Ok(BrokerAction::Nack)` and `Err(HandlerError { error_type: Transient })`.
160 // If we failed to Nack a message while processing a success (e.g., an `Ok(_)` return),
161 // the only sensible thing to do is map it to a transient error.
162 let (error_type, handler_error) = match outcome {
163 Ok(_) => (ErrorType::Transient, None),
164 Err(e) => (e.error_type, Some(e.inner_error)),
165 };
166 ProcessingOutcome {
167 outcome: Err(ProcessingError::NackError {
168 error,
169 handler_error,
170 error_type,
171 }),
172 broker_action: requeue.into(),
173 }
174 }
175 },
176 }
177 }
178 }
179}
180
181enum InnerBrokerError {
182 AckError(anyhow::Error),
183 NackError {
184 error: anyhow::Error,
185 requeue: ShouldRequeue,
186 },
187}
188
/// The action we asked the broker to take when finalising the processing of
/// the current message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrokerAction {
    /// Positive acknowledgement - the message can be removed from the queue.
    ///
    /// Sent when the message was processed successfully. It can also be the action
    /// chosen after a transient error, when the broker should not requeue the message
    /// itself (presumably because a retry has been arranged by other means - confirm
    /// against the transient error hook in use).
    Ack,
    /// Negative acknowledgement - the message was not processed successfully and
    /// should be requeued to retry processing.
    Nack,
    /// Rejection - the message was not processed successfully and should NOT be
    /// requeued. The message will be sent to the dead letter exchange if configured.
    Reject,
}
204
205impl BrokerAction {
206 async fn execute(&self, acker: &lapin::acker::Acker) -> Result<(), InnerBrokerError> {
207 match self {
208 // Acknowledge the message
209 Self::Ack => {
210 let ack_options = BasicAckOptions { multiple: false };
211
212 acker
213 .ack(ack_options)
214 .await
215 .map_err(anyhow::Error::from)
216 .map_err(InnerBrokerError::AckError)
217 }
218
219 // Put the message back to the queue
220 Self::Nack => {
221 let nack_options = BasicNackOptions {
222 multiple: false,
223 requeue: true,
224 };
225
226 acker
227 .nack(nack_options)
228 .await
229 .map_err(anyhow::Error::from)
230 .map_err(|e| InnerBrokerError::NackError {
231 error: e,
232 requeue: ShouldRequeue::Requeue,
233 })
234 }
235
236 // Remove the message from the queue and send it to the DLQ (if configured)
237 Self::Reject => {
238 let reject_options = BasicRejectOptions { requeue: false };
239
240 acker
241 .reject(reject_options)
242 .await
243 .map_err(anyhow::Error::from)
244 .map_err(|e| InnerBrokerError::NackError {
245 error: e,
246 requeue: ShouldRequeue::DeadLetterOrDiscard,
247 })
248 }
249 }
250 }
251}
252
253/// Based on the outcome of processing communicate with the AMQP broker to ack/nack/reject the message.
254/// If processing failed, it takes care to determine (via the transient error hook) if the message
255/// should be requeued (nack), requeued with ack (ack) or not (reject).
256async fn ack_or_nack<Error>(
257 transient_error_hook: Arc<dyn ConsumerTransientErrorHook>,
258 message: &Delivery,
259 outcome: &Result<BrokerAction, HandlerError<Error>>,
260) -> Result<BrokerAction, InnerBrokerError> {
261 match outcome {
262 Ok(action) => {
263 action.execute(&message.acker).await?;
264 Ok(*action)
265 }
266 Err(e) => match e.error_type {
267 ErrorType::Fatal => handle_fatal_error(message).await,
268 ErrorType::Transient => handle_transient_error(message, transient_error_hook).await,
269 },
270 }
271}
272
273/// Removes the message from the queue, rejecting it.
274/// If a dead letter exchange has been configured, the rejected message will be delivered to it.
275async fn handle_fatal_error(message: &Delivery) -> Result<BrokerAction, InnerBrokerError> {
276 BrokerAction::Reject.execute(&message.acker).await?;
277 Ok(BrokerAction::Reject)
278}
279
280/// Determines how to handle a transient error on the basis of transient error hook:
281/// - `ShouldRequeue::Requeue`: the message is nacked and re-delivered to the queue.
282/// - `ShouldRequeue::DoNotRequeue`: the message is rejected
283/// and sent to the dead letter exchange if configured.
284/// - `ShouldRequeue::DoNotRequeueAck`: the message is acked.
285async fn handle_transient_error(
286 message: &Delivery,
287 transient_error_hook: Arc<dyn ConsumerTransientErrorHook>,
288) -> Result<BrokerAction, InnerBrokerError> {
289 let action: BrokerAction = transient_error_hook
290 .on_transient_error(message)
291 .await
292 .into();
293
294 action.execute(&message.acker).await?;
295 Ok(action)
296}