carrot_cake/consumers/builders/
message_handler.rs

1use lapin::options::BasicConsumeOptions;
2
3use crate::consumers::{
4    ConsumerPreStartHook, ConsumerTransientErrorHook, Handler, ProcessingMiddleware,
5    TelemetryMiddleware,
6};
7use std::sync::Arc;
8
9/// A handler processing messages from a RabbitMq queue.
10///
11/// Use [`MessageHandler::builder`] to start composing a `MessageHandler` using a fluent builder
12/// API.
13///
14/// # Learn by doing
15///
16/// Check out the [`consumer` example on GitHub](https://github.com/TrueLayer/carrot-cake/tree/main/carrot-cake/examples)
17/// to see `MessageHandler` in action.
18///
19/// The example showcases most of the available knobs and what they are used for.
20///
21/// # `MessageHandler` vs `ConsumerGroup`
22///
23/// A `MessageHandler` is always part of a [`ConsumerGroup`] - it relies on its context and
24/// inherits its group-level configuration.
25///
26/// It is possible to:
27///
28/// - override some group-level configuration for a specific
29/// message handler (see [`MessageHandlerBuilder::with_pre_start_hook`]
30/// and [`MessageHandlerBuilder::transient_error_hook`])
31/// - add on top of what the group-level configuration provides
32/// (e.g. [`MessageHandlerBuilder::with_processing_middleware`] or [`MessageHandlerBuilder::with_telemetry_middleware`]).
33///
34/// [`ConsumerGroup`]: super::ConsumerGroup
35pub struct MessageHandler<Context, Error>
36where
37    Context: Send + Sync + 'static,
38    Error: Send + Sync + 'static,
39{
40    pub(super) queue_name: String,
41    pub(super) prefetch_count_override: Option<u16>,
42    pub(super) processing_middleware_chain: Vec<Arc<dyn ProcessingMiddleware<Context, Error>>>,
43    pub(super) telemetry_middleware_chain: Vec<Arc<dyn TelemetryMiddleware<Context, Error>>>,
44    pub(super) pre_start_hooks: Vec<Arc<dyn ConsumerPreStartHook>>,
45    pub(super) priority: Option<i32>,
46    pub(super) transient_error_hook: Option<Arc<dyn ConsumerTransientErrorHook>>,
47    pub(super) handler: Arc<dyn Handler<Context = Context, Error = Error>>,
48    pub(super) consume_options: Option<BasicConsumeOptions>,
49}
50
51impl<Context, Error> MessageHandler<Context, Error>
52where
53    Context: Send + Sync + 'static,
54    Error: Send + Sync + 'static,
55{
56    /// Start building a [`MessageHandler`].
57    ///
58    /// You need to provide the name of the queue you want to consume messages from.
59    pub fn builder<T: Into<String>>(queue_name: T) -> MessageHandlerBuilder<Context, Error> {
60        MessageHandlerBuilder::new(queue_name.into())
61    }
62}
63
64/// A builder to compose a [`MessageHandler`] with a fluent API.
65///
66/// Use [`MessageHandler::builder`] as entrypoint.
67pub struct MessageHandlerBuilder<Context, Error>
68where
69    Context: Send + Sync + 'static,
70    Error: Send + Sync + 'static,
71{
72    queue_name: String,
73    prefetch_count_override: Option<u16>,
74    processing_middleware_chain: Vec<Arc<dyn ProcessingMiddleware<Context, Error>>>,
75    telemetry_middleware_chain: Vec<Arc<dyn TelemetryMiddleware<Context, Error>>>,
76    pre_start_hooks: Vec<Arc<dyn ConsumerPreStartHook>>,
77    transient_error_hook: Option<Arc<dyn ConsumerTransientErrorHook>>,
78    priority: Option<i32>,
79    consume_options: Option<BasicConsumeOptions>,
80}
81
82impl<Context, Error> MessageHandlerBuilder<Context, Error>
83where
84    Context: Send + Sync + 'static,
85    Error: Send + Sync + 'static,
86{
87    pub(super) fn new<T: Into<String>>(queue_name: T) -> Self {
88        Self {
89            queue_name: queue_name.into(),
90            prefetch_count_override: None,
91            processing_middleware_chain: vec![],
92            telemetry_middleware_chain: vec![],
93            pre_start_hooks: vec![],
94            priority: None,
95            transient_error_hook: None,
96            consume_options: None,
97        }
98    }
99
100    /// Configure the prefetch count of the handler.
101    /// If not configured, the handler inherits the prefetch count
102    /// configured at the consumer group level.
103    #[must_use]
104    pub fn with_prefetch_count(mut self, prefetch_count: u16) -> Self {
105        self.prefetch_count_override = Some(prefetch_count);
106        self
107    }
108
109    /// To configure the consume options for the handler.
110    /// If not configured, the handler inherits the prefetch count
111    /// configured at the consumer group level.
112    #[must_use]
113    pub fn with_consume_options(mut self, consume_options: BasicConsumeOptions) -> Self {
114        self.consume_options = Some(consume_options);
115        self
116    }
117
118    /// You can add processing middleware to inject logic before and after the handler logic.
119    ///
120    /// Middlewares are executed in the order they are registered: the first registered
121    /// middleware executes first on the way in and last on the way out.
122    ///
123    /// Processing middlewares registered at the [`MessageHandler`] level are executed AFTER
124    /// processing middlewares registered at the [`ConsumerGroup`] level.
125    ///
126    /// Check out [`ProcessingMiddleware`](crate::consumers::ProcessingMiddleware)'s documentation for more details.
127    ///
128    /// [`ConsumerGroup`]: super::ConsumerGroup
129    #[must_use]
130    pub fn with_processing_middleware<M: ProcessingMiddleware<Context, Error>>(
131        self,
132        middleware: M,
133    ) -> Self {
134        self.with_dyn_processing_middleware(Arc::new(middleware))
135    }
136
137    /// Append dynamic processing middleware logic, see [`MessageHandlerBuilder::with_processing_middleware`].
138    #[must_use]
139    pub fn with_dyn_processing_middleware(
140        mut self,
141        middleware: Arc<dyn ProcessingMiddleware<Context, Error>>,
142    ) -> Self {
143        self.processing_middleware_chain.push(middleware);
144        self
145    }
146
147    /// Append multiple dynamic processing middlewares, see [`MessageHandlerBuilder::with_processing_middleware`].
148    #[must_use]
149    pub fn with_processing_middlewares<I>(mut self, middlewares: I) -> Self
150    where
151        I: IntoIterator<Item = Arc<dyn ProcessingMiddleware<Context, Error>>>,
152    {
153        self.processing_middleware_chain.extend(middlewares);
154        self
155    }
156
157    /// You can add telemetry middleware to inject logic before and after the message processing
158    /// has taken place.
159    ///
160    /// Telemetry middlewares are executed before all processing middlewares and the handler.
161    /// Telemetry middlewares are executed in the order they are registered: the first registered
162    /// telemetry middleware executes first on the way in and last on the way out.
163    ///
164    /// Telemetry middlewares registered at the [`MessageHandler`] level are executed AFTER
165    /// telemetry middlewares registered at the [`ConsumerGroup`] level.
166    /// Telemetry middlewares registered at the [`MessageHandler`] level are executed BEFORE
167    /// processing middlewares registered at the [`ConsumerGroup`] level and [`MessageHandler`] level.
168    ///
169    /// Check out [`TelemetryMiddleware`]'s documentation for more details.
170    ///
171    /// [`ConsumerGroup`]: super::ConsumerGroup
172    #[must_use]
173    pub fn with_telemetry_middleware<M: TelemetryMiddleware<Context, Error>>(
174        self,
175        middleware: M,
176    ) -> Self {
177        self.with_dyn_telemetry_middleware(Arc::new(middleware))
178    }
179
180    /// Append dynamic telemetry middleware logic, see [`MessageHandlerBuilder::with_telemetry_middleware`].
181    #[must_use]
182    pub fn with_dyn_telemetry_middleware(
183        mut self,
184        middleware: Arc<dyn TelemetryMiddleware<Context, Error>>,
185    ) -> Self {
186        self.telemetry_middleware_chain.push(middleware);
187        self
188    }
189
190    /// Append multiple dynamic telemetry middlewares, see [`MessageHandlerBuilder::with_telemetry_middleware`].
191    #[must_use]
192    pub fn with_telemetry_middlewares<I>(mut self, middlewares: I) -> Self
193    where
194        I: IntoIterator<Item = Arc<dyn TelemetryMiddleware<Context, Error>>>,
195    {
196        self.telemetry_middleware_chain.extend(middlewares);
197        self
198    }
199
200    /// Pre-start hooks are executed _before_ consumers start pulling messages from queues.
201    /// Pre-start hooks are used to execute setup logic for resources against the message broker -
202    /// e.g. create exchanges, bind queues, etc.
203    ///
204    /// Check out [`ConsumerPreStartHook`](crate::consumers::ConsumerPreStartHook)'s documentation
205    /// for more details.
206    ///
207    /// If no pre-start hook is specified at the [`MessageHandler`] level, the hook
208    /// specified at the [`ConsumerGroup`] level is executed.
209    ///
210    /// [`ConsumerGroup`]: super::ConsumerGroup
211    #[must_use]
212    pub fn with_pre_start_hook<H: ConsumerPreStartHook>(mut self, hook: H) -> Self {
213        self.pre_start_hooks.push(Arc::new(hook));
214        self
215    }
216
217    /// Append multiple pre-start hooks, see [`MessageHandlerBuilder::with_pre_start_hook`].
218    #[must_use]
219    pub fn with_pre_start_hooks<I>(mut self, hooks: I) -> Self
220    where
221        I: IntoIterator<Item = Arc<dyn ConsumerPreStartHook>>,
222    {
223        self.pre_start_hooks.extend(hooks);
224        self
225    }
226
227    /// Consumer priorities allow you to ensure that high priority consumers receive messages
228    /// while they are active, with messages only going to lower priority consumers when the
229    /// high priority consumers block.
230    ///
231    /// Sets the `x-priority` consume argument.
232    /// See <https://www.rabbitmq.com/consumer-priority.html#how-to-use>.
233    ///
234    /// Default not specified, equivalent to `0`.
235    #[must_use]
236    pub fn with_priority(mut self, priority: i32) -> Self {
237        self.priority = Some(priority);
238        self
239    }
240
241    /// A transient error hook allows you to customise the behaviour of the message handlers
242    /// when the processing of an incoming message fails with an error classified as transient -
243    /// e.g. retryable.
244    ///
245    /// Check out [`ConsumerTransientErrorHook`](crate::consumers::ConsumerTransientErrorHook)'s
246    /// documentation for more details.
247    ///
248    /// If no transient error hook is specified at the [`MessageHandler`] level, the hook
249    /// specified at the [`ConsumerGroup`] level is executed.
250    ///
251    /// [`ConsumerGroup`]: super::ConsumerGroup
252    #[must_use]
253    pub fn transient_error_hook<H: ConsumerTransientErrorHook>(self, hook: H) -> Self {
254        self.dyn_transient_error_hook(Arc::new(hook))
255    }
256
257    /// A version of [`MessageHandlerBuilder::transient_error_hook`] for already Arc-ed hooks.
258    ///
259    /// Useful for sharing `!Clone` hooks.
260    ///
261    /// # Example
262    /// ```no_run
263    /// # use std::sync::Arc;
264    /// # use carrot_cake::consumers::{MessageHandlerBuilder, ConsumerTransientErrorHook};
265    /// # let message_handler_builder: MessageHandlerBuilder<(), ()> = unimplemented!();
266    /// # let other_message_handler_builder: MessageHandlerBuilder<(), ()> = unimplemented!();
267    /// let hook: Arc<dyn ConsumerTransientErrorHook>; // some dynamic shared error hook
268    /// # hook = unimplemented!();
269    ///
270    /// message_handler_builder.dyn_transient_error_hook(Arc::clone(&hook));
271    /// other_message_handler_builder.dyn_transient_error_hook(hook);
272    /// ```
273    #[must_use]
274    pub fn dyn_transient_error_hook(mut self, hook: Arc<dyn ConsumerTransientErrorHook>) -> Self {
275        self.transient_error_hook = Some(hook);
276        self
277    }
278
279    /// The handler used to process incoming messages.
280    ///
281    /// Check out [`Handler`]'s documentation for more details.
282    ///
283    /// Passing in the handler finalises the `MessageHandler` construction - you will
284    /// not be able to register additional middlewares or hooks after having specified the handler.
285    pub fn handler<H: Handler<Context = Context, Error = Error>>(
286        self,
287        handler: H,
288    ) -> MessageHandler<Context, Error> {
289        self.arc_handler(Arc::new(handler))
290    }
291
292    /// The `Arc<handler>` used to process incoming messages.
293    ///
294    /// Check out [`Handler`]'s documentation for more details.
295    ///
296    /// Passing in the handler finalises the `MessageHandler` construction - you will
297    /// not be able to register additional middlewares or hooks after having specified the handler.
298    pub fn arc_handler(
299        self,
300        handler: Arc<dyn Handler<Context = Context, Error = Error>>,
301    ) -> MessageHandler<Context, Error> {
302        let Self {
303            queue_name,
304            prefetch_count_override,
305            processing_middleware_chain,
306            telemetry_middleware_chain,
307            pre_start_hooks,
308            priority,
309            transient_error_hook,
310            consume_options,
311        } = self;
312        MessageHandler {
313            queue_name,
314            prefetch_count_override,
315            processing_middleware_chain,
316            telemetry_middleware_chain,
317            pre_start_hooks,
318            priority,
319            transient_error_hook,
320            handler,
321            consume_options,
322        }
323    }
324}