carrot_cake/consumers/builders/message_handler.rs
1use lapin::options::BasicConsumeOptions;
2
3use crate::consumers::{
4 ConsumerPreStartHook, ConsumerTransientErrorHook, Handler, ProcessingMiddleware,
5 TelemetryMiddleware,
6};
7use std::sync::Arc;
8
9/// A handler processing messages from a RabbitMq queue.
10///
11/// Use [`MessageHandler::builder`] to start composing a `MessageHandler` using a fluent builder
12/// API.
13///
14/// # Learn by doing
15///
16/// Check out the [`consumer` example on GitHub](https://github.com/TrueLayer/carrot-cake/tree/main/carrot-cake/examples)
17/// to see `MessageHandler` in action.
18///
19/// The example showcases most of the available knobs and what they are used for.
20///
21/// # `MessageHandler` vs `ConsumerGroup`
22///
23/// A `MessageHandler` is always part of a [`ConsumerGroup`] - it relies on its context and
24/// inherits its group-level configuration.
25///
26/// It is possible to:
27///
28/// - override some group-level configuration for a specific
29/// message handler (see [`MessageHandlerBuilder::with_pre_start_hook`]
30/// and [`MessageHandlerBuilder::transient_error_hook`])
31/// - add on top of what the group-level configuration provides
32/// (e.g. [`MessageHandlerBuilder::with_processing_middleware`] or [`MessageHandlerBuilder::with_telemetry_middleware`]).
33///
34/// [`ConsumerGroup`]: super::ConsumerGroup
35pub struct MessageHandler<Context, Error>
36where
37 Context: Send + Sync + 'static,
38 Error: Send + Sync + 'static,
39{
40 pub(super) queue_name: String,
41 pub(super) prefetch_count_override: Option<u16>,
42 pub(super) processing_middleware_chain: Vec<Arc<dyn ProcessingMiddleware<Context, Error>>>,
43 pub(super) telemetry_middleware_chain: Vec<Arc<dyn TelemetryMiddleware<Context, Error>>>,
44 pub(super) pre_start_hooks: Vec<Arc<dyn ConsumerPreStartHook>>,
45 pub(super) priority: Option<i32>,
46 pub(super) transient_error_hook: Option<Arc<dyn ConsumerTransientErrorHook>>,
47 pub(super) handler: Arc<dyn Handler<Context = Context, Error = Error>>,
48 pub(super) consume_options: Option<BasicConsumeOptions>,
49}
50
51impl<Context, Error> MessageHandler<Context, Error>
52where
53 Context: Send + Sync + 'static,
54 Error: Send + Sync + 'static,
55{
56 /// Start building a [`MessageHandler`].
57 ///
58 /// You need to provide the name of the queue you want to consume messages from.
59 pub fn builder<T: Into<String>>(queue_name: T) -> MessageHandlerBuilder<Context, Error> {
60 MessageHandlerBuilder::new(queue_name.into())
61 }
62}
63
64/// A builder to compose a [`MessageHandler`] with a fluent API.
65///
66/// Use [`MessageHandler::builder`] as entrypoint.
67pub struct MessageHandlerBuilder<Context, Error>
68where
69 Context: Send + Sync + 'static,
70 Error: Send + Sync + 'static,
71{
72 queue_name: String,
73 prefetch_count_override: Option<u16>,
74 processing_middleware_chain: Vec<Arc<dyn ProcessingMiddleware<Context, Error>>>,
75 telemetry_middleware_chain: Vec<Arc<dyn TelemetryMiddleware<Context, Error>>>,
76 pre_start_hooks: Vec<Arc<dyn ConsumerPreStartHook>>,
77 transient_error_hook: Option<Arc<dyn ConsumerTransientErrorHook>>,
78 priority: Option<i32>,
79 consume_options: Option<BasicConsumeOptions>,
80}
81
82impl<Context, Error> MessageHandlerBuilder<Context, Error>
83where
84 Context: Send + Sync + 'static,
85 Error: Send + Sync + 'static,
86{
87 pub(super) fn new<T: Into<String>>(queue_name: T) -> Self {
88 Self {
89 queue_name: queue_name.into(),
90 prefetch_count_override: None,
91 processing_middleware_chain: vec![],
92 telemetry_middleware_chain: vec![],
93 pre_start_hooks: vec![],
94 priority: None,
95 transient_error_hook: None,
96 consume_options: None,
97 }
98 }
99
100 /// Configure the prefetch count of the handler.
101 /// If not configured, the handler inherits the prefetch count
102 /// configured at the consumer group level.
103 #[must_use]
104 pub fn with_prefetch_count(mut self, prefetch_count: u16) -> Self {
105 self.prefetch_count_override = Some(prefetch_count);
106 self
107 }
108
109 /// To configure the consume options for the handler.
110 /// If not configured, the handler inherits the prefetch count
111 /// configured at the consumer group level.
112 #[must_use]
113 pub fn with_consume_options(mut self, consume_options: BasicConsumeOptions) -> Self {
114 self.consume_options = Some(consume_options);
115 self
116 }
117
118 /// You can add processing middleware to inject logic before and after the handler logic.
119 ///
120 /// Middlewares are executed in the order they are registered: the first registered
121 /// middleware executes first on the way in and last on the way out.
122 ///
123 /// Processing middlewares registered at the [`MessageHandler`] level are executed AFTER
124 /// processing middlewares registered at the [`ConsumerGroup`] level.
125 ///
126 /// Check out [`ProcessingMiddleware`](crate::consumers::ProcessingMiddleware)'s documentation for more details.
127 ///
128 /// [`ConsumerGroup`]: super::ConsumerGroup
129 #[must_use]
130 pub fn with_processing_middleware<M: ProcessingMiddleware<Context, Error>>(
131 self,
132 middleware: M,
133 ) -> Self {
134 self.with_dyn_processing_middleware(Arc::new(middleware))
135 }
136
137 /// Append dynamic processing middleware logic, see [`MessageHandlerBuilder::with_processing_middleware`].
138 #[must_use]
139 pub fn with_dyn_processing_middleware(
140 mut self,
141 middleware: Arc<dyn ProcessingMiddleware<Context, Error>>,
142 ) -> Self {
143 self.processing_middleware_chain.push(middleware);
144 self
145 }
146
147 /// Append multiple dynamic processing middlewares, see [`MessageHandlerBuilder::with_processing_middleware`].
148 #[must_use]
149 pub fn with_processing_middlewares<I>(mut self, middlewares: I) -> Self
150 where
151 I: IntoIterator<Item = Arc<dyn ProcessingMiddleware<Context, Error>>>,
152 {
153 self.processing_middleware_chain.extend(middlewares);
154 self
155 }
156
157 /// You can add telemetry middleware to inject logic before and after the message processing
158 /// has taken place.
159 ///
160 /// Telemetry middlewares are executed before all processing middlewares and the handler.
161 /// Telemetry middlewares are executed in the order they are registered: the first registered
162 /// telemetry middleware executes first on the way in and last on the way out.
163 ///
164 /// Telemetry middlewares registered at the [`MessageHandler`] level are executed AFTER
165 /// telemetry middlewares registered at the [`ConsumerGroup`] level.
166 /// Telemetry middlewares registered at the [`MessageHandler`] level are executed BEFORE
167 /// processing middlewares registered at the [`ConsumerGroup`] level and [`MessageHandler`] level.
168 ///
169 /// Check out [`TelemetryMiddleware`]'s documentation for more details.
170 ///
171 /// [`ConsumerGroup`]: super::ConsumerGroup
172 #[must_use]
173 pub fn with_telemetry_middleware<M: TelemetryMiddleware<Context, Error>>(
174 self,
175 middleware: M,
176 ) -> Self {
177 self.with_dyn_telemetry_middleware(Arc::new(middleware))
178 }
179
180 /// Append dynamic telemetry middleware logic, see [`MessageHandlerBuilder::with_telemetry_middleware`].
181 #[must_use]
182 pub fn with_dyn_telemetry_middleware(
183 mut self,
184 middleware: Arc<dyn TelemetryMiddleware<Context, Error>>,
185 ) -> Self {
186 self.telemetry_middleware_chain.push(middleware);
187 self
188 }
189
190 /// Append multiple dynamic telemetry middlewares, see [`MessageHandlerBuilder::with_telemetry_middleware`].
191 #[must_use]
192 pub fn with_telemetry_middlewares<I>(mut self, middlewares: I) -> Self
193 where
194 I: IntoIterator<Item = Arc<dyn TelemetryMiddleware<Context, Error>>>,
195 {
196 self.telemetry_middleware_chain.extend(middlewares);
197 self
198 }
199
200 /// Pre-start hooks are executed _before_ consumers start pulling messages from queues.
201 /// Pre-start hooks are used to execute setup logic for resources against the message broker -
202 /// e.g. create exchanges, bind queues, etc.
203 ///
204 /// Check out [`ConsumerPreStartHook`](crate::consumers::ConsumerPreStartHook)'s documentation
205 /// for more details.
206 ///
207 /// If no pre-start hook is specified at the [`MessageHandler`] level, the hook
208 /// specified at the [`ConsumerGroup`] level is executed.
209 ///
210 /// [`ConsumerGroup`]: super::ConsumerGroup
211 #[must_use]
212 pub fn with_pre_start_hook<H: ConsumerPreStartHook>(mut self, hook: H) -> Self {
213 self.pre_start_hooks.push(Arc::new(hook));
214 self
215 }
216
217 /// Append multiple pre-start hooks, see [`MessageHandlerBuilder::with_pre_start_hook`].
218 #[must_use]
219 pub fn with_pre_start_hooks<I>(mut self, hooks: I) -> Self
220 where
221 I: IntoIterator<Item = Arc<dyn ConsumerPreStartHook>>,
222 {
223 self.pre_start_hooks.extend(hooks);
224 self
225 }
226
227 /// Consumer priorities allow you to ensure that high priority consumers receive messages
228 /// while they are active, with messages only going to lower priority consumers when the
229 /// high priority consumers block.
230 ///
231 /// Sets the `x-priority` consume argument.
232 /// See <https://www.rabbitmq.com/consumer-priority.html#how-to-use>.
233 ///
234 /// Default not specified, equivalent to `0`.
235 #[must_use]
236 pub fn with_priority(mut self, priority: i32) -> Self {
237 self.priority = Some(priority);
238 self
239 }
240
241 /// A transient error hook allows you to customise the behaviour of the message handlers
242 /// when the processing of an incoming message fails with an error classified as transient -
243 /// e.g. retryable.
244 ///
245 /// Check out [`ConsumerTransientErrorHook`](crate::consumers::ConsumerTransientErrorHook)'s
246 /// documentation for more details.
247 ///
248 /// If no transient error hook is specified at the [`MessageHandler`] level, the hook
249 /// specified at the [`ConsumerGroup`] level is executed.
250 ///
251 /// [`ConsumerGroup`]: super::ConsumerGroup
252 #[must_use]
253 pub fn transient_error_hook<H: ConsumerTransientErrorHook>(self, hook: H) -> Self {
254 self.dyn_transient_error_hook(Arc::new(hook))
255 }
256
257 /// A version of [`MessageHandlerBuilder::transient_error_hook`] for already Arc-ed hooks.
258 ///
259 /// Useful for sharing `!Clone` hooks.
260 ///
261 /// # Example
262 /// ```no_run
263 /// # use std::sync::Arc;
264 /// # use carrot_cake::consumers::{MessageHandlerBuilder, ConsumerTransientErrorHook};
265 /// # let message_handler_builder: MessageHandlerBuilder<(), ()> = unimplemented!();
266 /// # let other_message_handler_builder: MessageHandlerBuilder<(), ()> = unimplemented!();
267 /// let hook: Arc<dyn ConsumerTransientErrorHook>; // some dynamic shared error hook
268 /// # hook = unimplemented!();
269 ///
270 /// message_handler_builder.dyn_transient_error_hook(Arc::clone(&hook));
271 /// other_message_handler_builder.dyn_transient_error_hook(hook);
272 /// ```
273 #[must_use]
274 pub fn dyn_transient_error_hook(mut self, hook: Arc<dyn ConsumerTransientErrorHook>) -> Self {
275 self.transient_error_hook = Some(hook);
276 self
277 }
278
279 /// The handler used to process incoming messages.
280 ///
281 /// Check out [`Handler`]'s documentation for more details.
282 ///
283 /// Passing in the handler finalises the `MessageHandler` construction - you will
284 /// not be able to register additional middlewares or hooks after having specified the handler.
285 pub fn handler<H: Handler<Context = Context, Error = Error>>(
286 self,
287 handler: H,
288 ) -> MessageHandler<Context, Error> {
289 self.arc_handler(Arc::new(handler))
290 }
291
292 /// The `Arc<handler>` used to process incoming messages.
293 ///
294 /// Check out [`Handler`]'s documentation for more details.
295 ///
296 /// Passing in the handler finalises the `MessageHandler` construction - you will
297 /// not be able to register additional middlewares or hooks after having specified the handler.
298 pub fn arc_handler(
299 self,
300 handler: Arc<dyn Handler<Context = Context, Error = Error>>,
301 ) -> MessageHandler<Context, Error> {
302 let Self {
303 queue_name,
304 prefetch_count_override,
305 processing_middleware_chain,
306 telemetry_middleware_chain,
307 pre_start_hooks,
308 priority,
309 transient_error_hook,
310 consume_options,
311 } = self;
312 MessageHandler {
313 queue_name,
314 prefetch_count_override,
315 processing_middleware_chain,
316 telemetry_middleware_chain,
317 pre_start_hooks,
318 priority,
319 transient_error_hook,
320 handler,
321 consume_options,
322 }
323 }
324}