strut_rabbitmq/transport/outbound/publisher.rs
1use crate::transport::outbound::publisher::api::{BatchPublishingResult, PublishingResult};
2use crate::transport::outbound::publisher::inner::{
3 BatchTransmissionResult, Confirmed, ConfirmedBatch, NotTransmitted, PartlyTransmittedBatch,
4 TransmissionResult, Transmitted, TransmittedBatch,
5};
6use crate::util::RetrievePushMap;
7use crate::{Connector, DeliveryMode, Dispatch, Egress, Gateway, Handle};
8use lapin::options::{BasicPublishOptions, ConfirmSelectOptions};
9use lapin::Channel;
10use nonempty::NonEmpty;
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
15use tracing::error;
16
17pub mod api;
18mod inner;
19
20/// Publishes outgoing [`Dispatch`]es to the RabbitMQ cluster.
21///
22/// Distinguishes single publishing
23/// ([`try_publish`](Publisher::try_publish) and [`publish`](Publisher::publish))
24/// from batch publishing ([`try_publish_many`](Publisher::try_publish_many) and
25/// [`publish_many`](Publisher::publish_many)).
26///
27/// Also, distinguishes fail-fast publishing
28/// ([`try_publish`](Publisher::try_publish) and
29/// [`try_publish_many`](Publisher::try_publish_many)) from error-less
30/// publishing ([`publish`](Publisher::publish) and
31/// [`publish_many`](Publisher::publish_many)).
32///
33/// ## Connection
34///
35/// This publisher delegates establishing connection and creation of [`Channel`]s
36/// to [`Connector`], which must be [started](Connector::start) before creating
37/// a publisher.
38///
39/// No more than one [`Channel`] is being kept by this publisher, and it is
40/// re-fetched whenever a connection issue is suspected.
41///
42/// ## Configuration
43///
44/// All publishing configuration is off-loaded to [`Egress`].
45///
46/// One important part of the egress configuration is
47/// [`ConfirmationLevel`](crate::ConfirmationLevel). This level has significant
48/// consequences for the publishing process, as described in its documentation.
49///
50/// ## Publishing
51///
52/// Publishing a [`Dispatch`] to RabbitMQ is a two-step process:
53///
54/// 1. **Transmit** the dispatch payload over network to the broker.
55/// 2. **Confirm** with the broker the successful reception of the message.
56///
57/// The transmission step plays out the same way regardless of the configuration.
58/// This publisher always transmits one dispatch at a time (there should be no
59/// benefit in transmitting dispatches in parallel in a single channel).
60///
61/// The confirmation step depends a lot on the
62/// [confirmation level](crate::ConfirmationLevel) selected on the
63/// [egress](Egress).
64///
65/// ### Batch benefits
66///
67/// The publishing of a [`Dispatch`] may be a one-step or a two-step process
68/// depending on the [confirmation level](crate::ConfirmationLevel) of this
69/// publisher’s [`Egress`].
70///
71/// The first step (transmission of the message to the broker) does not benefit
72/// from batching since the messages are transmitted one at a time. However, the
73/// second step (confirmation with the broker), if executed, can benefit from
74/// the batch approach.
75///
76/// ### Publishing API
77///
78/// The following four publishing methods are exposed.
79///
80/// All publishing methods return all [`Dispatch`]es that were passed into them
81/// (both in the happy path and in the case of an error). It is up to the caller
82/// to then either drop the dispatches or use them for a different purpose (e.g.,
83/// also publish them via a different [`Publisher`]).
84///
85/// #### [`try_publish`](Publisher::try_publish): single [`Dispatch`], fail-fast
86///
87/// Attempts once to publish a single dispatch and returns an error as soon as
88/// something goes wrong.
89///
90/// #### [`publish`](Publisher::publish): single [`Dispatch`], error-less
91///
92/// Repeatedly attempts to publish a single dispatch and returns only once the
93/// message is confirmed.
94///
95/// #### [`try_publish_many`](Publisher::try_publish_many): batch of [`Dispatch`]es, fail-fast
96///
97/// Attempts once to publish a batch of dispatches and returns an error as soon
98/// as something goes wrong.
99///
100/// #### [`publish_many`](Publisher::publish_many): batch of [`Dispatch`]es, error-less
101///
102/// Repeatedly attempts to publish a batch of dispatches and returns only once
103/// all the messages are confirmed.
104pub struct Publisher {
105 /// The globally unique name of this publisher, for logging/debugging
106 /// purposes.
107 name: Arc<str>,
108 /// The [`Egress`] used by this publisher to transport outgoing dispatches.
109 egress: Egress,
110 /// The [`Gateway`] for the RabbitMQ [`Channel`]s, as returned by
111 /// [`Connector`].
112 gateway: Gateway,
113 /// The current [`Channel`] of this publisher.
114 channel: AsyncMutex<Option<Channel>>,
115}
116
117impl Publisher {
118 /// Creates and returns a new [`Publisher`].
119 pub fn new(gateway: Gateway, egress: Egress) -> Self {
120 let name = Self::compose_name(&egress);
121 let channel = AsyncMutex::new(None);
122
123 Self {
124 name,
125 egress,
126 gateway,
127 channel,
128 }
129 }
130
131 /// Starts a new [`Connector`] with the given [`Handle`] and uses it to create
132 /// and return a new [`Publisher`] for the given [`Egress`].
133 pub fn start(handle: impl AsRef<Handle>, egress: Egress) -> Self {
134 let gateway = Connector::start(handle);
135
136 Self::new(gateway, egress)
137 }
138
139 /// Composes a globally unique, human-readable name for this [`Publisher`].
140 fn compose_name(egress: &Egress) -> Arc<str> {
141 static COUNTER: AtomicUsize = AtomicUsize::new(0);
142
143 Arc::from(format!(
144 "rabbitmq:pub:{}:{}",
145 egress.name(),
146 COUNTER.fetch_add(1, Ordering::Relaxed),
147 ))
148 }
149}
150
151impl Publisher {
152 /// Reports the name of this [`Publisher`].
153 pub fn name(&self) -> &str {
154 &self.name
155 }
156}
157
158impl Publisher {
159 /// Attempts once to publish a single [`Dispatch`] and returns an error as
160 /// soon as something goes wrong.
161 ///
162 /// The provided dispatch is returned back to the caller, both in the happy
163 /// path and in the case of an error.
164 ///
165 /// This is a fail-fast version of the single-dispatch publishing. For the
166 /// error-less approach, use [`publish`](Publisher::publish).
167 pub async fn try_publish(&self, dispatch: impl Into<Dispatch>) -> PublishingResult {
168 // Dive in
169 let dispatch = dispatch.into();
170
171 // Transmit and fail fast
172 let transmitted = self.try_transmit(dispatch).await?;
173
174 // Confirm and fail fast again
175 let confirmed = transmitted.confirm(self.name.as_ref()).await?;
176
177 // Return the confirmed dispatch
178 Ok(Dispatch::from(confirmed))
179 }
180
181 /// Repeatedly attempts to publish a single [`Dispatch`] and returns only
182 /// once the message is confirmed.
183 ///
184 /// The provided dispatch is returned back to the caller.
185 ///
186 /// Note that the [`ConfirmationLevel`](crate::ConfirmationLevel) on this
187 /// publisher’s [`Egress`] will significantly affect the publishing
188 /// semantics.
189 ///
190 /// This is an error-less version of the single-dispatch publishing. For the
191 /// fail-fast approach, use [`try_publish`](Publisher::try_publish).
192 pub async fn publish(&self, dispatch: impl Into<Dispatch>) -> Dispatch {
193 // Dive in
194 let dispatch = dispatch.into();
195
196 // Push the given dispatch until it is confirmed
197 let confirmed = self.push_one_until_confirmed(dispatch).await;
198
199 // Return the confirmed dispatch
200 Dispatch::from(confirmed)
201 }
202
203 /// Attempts once to publish a batch of [`Dispatch`]es and returns an error
204 /// as soon as something goes wrong.
205 ///
206 /// The provided dispatches are all returned back to the caller, both in the
207 /// happy path and in the case of an error.
208 ///
209 /// This is a fail-fast version of the batch publishing. For the error-less
210 /// approach, use [`publish_many`](Publisher::publish_many).
211 pub async fn try_publish_many<I>(&self, dispatches: I) -> BatchPublishingResult
212 where
213 I: IntoIterator,
214 I::Item: Into<Dispatch>,
215 {
216 // Package input into a vector
217 let dispatches = dispatches.into_iter().map(Into::into).collect::<Vec<_>>();
218
219 // Short circuit
220 if dispatches.is_empty() {
221 return Ok(dispatches);
222 }
223
224 // Transmit the batch but don’t fail fast just yet (give a chance to
225 // partially transmitted batches)
226 let batch_transmission_result = self.try_transmit_many(dispatches).await;
227
228 // Attempt to confirm the (partially) transmitted dispatches, and fail
229 // fast here
230 let confirmed_batch = match batch_transmission_result {
231 Ok(transmitted_batch) => transmitted_batch.confirm(self.name.as_ref()).await?,
232 Err(partly_transmitted_batch) => {
233 partly_transmitted_batch.confirm(self.name.as_ref()).await?
234 }
235 };
236
237 // Return the confirmed dispatches
238 Ok(Vec::from(confirmed_batch))
239 }
240
241 /// Repeatedly attempts to publish a batch of [`Dispatch`]es and returns
242 /// only once all the messages are confirmed.
243 ///
244 /// The provided dispatches are all returned back to the caller.
245 ///
246 /// Note that the [`ConfirmationLevel`](crate::ConfirmationLevel) on this
247 /// publisher’s [`Egress`] will significantly affect the publishing
248 /// semantics.
249 ///
250 /// This is an error-less version of the batch publishing. For the fail-fast
251 /// approach, use [`try_publish_many`](Publisher::try_publish_many).
252 pub async fn publish_many<I>(&self, dispatches: I) -> Vec<Dispatch>
253 where
254 I: IntoIterator,
255 I::Item: Into<Dispatch>,
256 {
257 // Package input into a vector
258 let dispatches = dispatches.into_iter().map(Into::into).collect::<Vec<_>>();
259
260 // Short circuit
261 if dispatches.is_empty() {
262 return dispatches;
263 }
264
265 // Push the given dispatches until they are all confirmed
266 let confirmed_batch = self.push_many_until_confirmed(dispatches).await;
267
268 // Return the confirmed dispatches
269 Vec::from(confirmed_batch)
270 }
271}
272
273impl Publisher {
274 /// Repeatedly publishes the given [`Dispatch`]es until all of them are both
275 /// transmitted and confirmed.
276 async fn push_many_until_confirmed(&self, dispatches: Vec<Dispatch>) -> ConfirmedBatch {
277 // Re-structure dispatches into a deque for easy double-sided access
278 let mut pending_dispatches = VecDeque::from(dispatches);
279
280 // Prepare storage for confirmed dispatches
281 let mut confirmed_dispatches = Vec::with_capacity(pending_dispatches.len());
282
283 // In a happy path, we will send everything in one iteration of this loop
284 while !pending_dispatches.is_empty() {
285 // Prepare storage for dispatches transmitted in this iteration
286 let mut transmitted_dispatches = Vec::with_capacity(pending_dispatches.len());
287
288 // Transmit all pending dispatches
289 for dispatch in pending_dispatches.drain(..) {
290 // Transmit the dispatch
291 let transmitted = self.push_one_until_transmitted(dispatch).await;
292
293 // Queue up the transmission result
294 transmitted_dispatches.push(transmitted);
295 }
296
297 // Now, try to confirm all transmitted batches
298 for transmitted in transmitted_dispatches {
299 // Push for confirmation
300 let confirmation_result = transmitted.confirm(self.name.as_ref()).await;
301
302 // Check out the result
303 match confirmation_result {
304 // Transmitted and confirmed: all good with this one
305 Ok(confirmed) => {
306 confirmed_dispatches.push(confirmed);
307 continue;
308 }
309
310 // Transmitted but not confirmed: we’ll have to transmit again
311 Err(not_confirmed) => {
312 // Return the failed dispatch back into the pending collection
313 pending_dispatches.push_back(Dispatch::from(not_confirmed));
314 }
315 }
316 }
317 }
318
319 ConfirmedBatch {
320 dispatches: confirmed_dispatches,
321 }
322 }
323
324 /// Repeatedly publishes the given [`Dispatch`] until it is both transmitted
325 /// and confirmed.
326 async fn push_one_until_confirmed(&self, mut dispatch: Dispatch) -> Confirmed {
327 // Keep trying until transmission is successful and confirmed
328 loop {
329 // Transmit and confirm
330 let transmitted = self.push_one_until_transmitted(dispatch).await;
331 let confirmation_result = transmitted.confirm(self.name.as_ref()).await;
332
333 // Inspect the outcome
334 match confirmation_result {
335 // Transmitted and confirmed: all good with this one
336 Ok(confirmed) => {
337 return confirmed;
338 }
339
340 // Transmitted but not confirmed: we’ll have to transmit again
341 Err(not_confirmed) => {
342 // Put the failed dispatch back into the hot seat
343 dispatch = Dispatch::from(not_confirmed);
344 }
345 }
346 }
347 }
348
349 /// Repeatedly transmits the given [`Dispatch`] until it is successfully
350 /// transmitted.
351 async fn push_one_until_transmitted(&self, mut dispatch: Dispatch) -> Transmitted {
352 // Keep trying until transmission is successful
353 loop {
354 // Send a message
355 let outcome = self.try_transmit(dispatch).await;
356
357 // Check out the outcome
358 match outcome {
359 // Successfully transmitted: return
360 Ok(transmitted) => {
361 return transmitted;
362 }
363
364 // Failed to transmit: try again
365 Err(non_transmitted) => {
366 // Put the non-transmitted dispatch back into hot seat
367 dispatch = Dispatch::from(non_transmitted);
368 }
369 };
370 }
371 }
372
373 /// Attempts to transmit the given [`Dispatch`]es once, and gives up as soon
374 /// as anything goes wrong.
375 async fn try_transmit_many(&self, dispatches: Vec<Dispatch>) -> BatchTransmissionResult {
376 // Prepare storage for transmitted and non-transmitted dispatches
377 let mut transmitted_dispatches = Vec::new();
378 let mut not_transmitted_dispatches = Vec::new();
379
380 // Create an iterator for the dispatches
381 let mut remaining = dispatches.into_iter();
382
383 // Follow the happy path and iterate until first failure
384 while let Some(dispatch) = remaining.next() {
385 // Attempt to transmit
386 let transmitted_result = self.try_transmit(dispatch).await;
387
388 // Break on first failure
389 match transmitted_result {
390 Ok(transmitted) => transmitted_dispatches.push(transmitted),
391 Err(not_transmitted) => {
392 not_transmitted_dispatches.push(not_transmitted);
393 break;
394 }
395 }
396 }
397
398 // If there are still dispatches remaining, last sending attempt failed
399 while let Some(dispatch) = remaining.next() {
400 // Record all remaining dispatches as “not attempted”
401 not_transmitted_dispatches.push(NotTransmitted::NotAttempted(dispatch));
402 }
403
404 // Check whether there were any issues
405 if let Some(not_transmitted_dispatches) = NonEmpty::from_vec(not_transmitted_dispatches) {
406 // Issue detected, return the appropriate error
407 return Err(PartlyTransmittedBatch {
408 transmitted_dispatches,
409 not_transmitted_dispatches,
410 });
411 }
412
413 Ok(TransmittedBatch {
414 dispatches: transmitted_dispatches,
415 })
416 }
417
418 /// Attempts to transmit the given [`Dispatch`] once, and gives up as soon
419 /// as anything goes wrong.
420 async fn try_transmit(&self, dispatch: Dispatch) -> TransmissionResult {
421 // Prepare message properties for publishing, optionally forcing durability
422 let mut properties = dispatch.properties();
423 if self.egress.force_durable() {
424 properties = properties.push_delivery_mode(DeliveryMode::Durable);
425 }
426
427 // Grab the channel
428 let (mut channel_guard, channel) = self.grab_channel().await;
429
430 // Infer the routing key
431 let routing_key = dispatch
432 .routing_key()
433 .unwrap_or_else(|| self.egress.routing_key());
434
435 // Publish the message and store the initial result
436 let result = channel
437 .basic_publish(
438 self.egress.exchange(),
439 routing_key,
440 BasicPublishOptions {
441 mandatory: self.egress.requires_mandatory_publish(),
442 immediate: false, // this flag is not supported and ignored by RabbitMQ v3+
443 },
444 dispatch.bytes(),
445 properties,
446 )
447 .await;
448
449 // If all is good, wrap channel in `Some`, otherwise drop it
450 let optional_channel = result.is_ok().then(|| channel);
451
452 // Put the `Option` of channel back
453 *channel_guard = optional_channel;
454 drop(channel_guard);
455
456 // Inspect whether the message was pushed successfully
457 match result {
458 // RabbitMQ received the message
459 Ok(future_confirm) => {
460 // Good hit
461 Ok(Transmitted {
462 dispatch,
463 future_confirm,
464 })
465 }
466
467 // RabbitMQ did not receive the message (likely a connectivity issue)
468 Err(error) => {
469 error!(
470 alert = true,
471 publisher = self.name.as_ref(),
472 ?error,
473 error_message = %error,
474 byte_preview = String::from_utf8_lossy(dispatch.bytes()).as_ref(),
475 "Failed to publish a message to RabbitMQ (did not transmit over network)",
476 );
477 Err(NotTransmitted::TransmissionError(dispatch, error))
478 }
479 }
480 }
481
482 /// Encapsulates obtaining a channel either from under the lock, or by
483 /// fetching a fresh one.
484 async fn grab_channel(&self) -> (MutexGuard<'_, Option<Channel>>, Channel) {
485 // Obtain the channel guard
486 let mut channel_guard = self.channel.lock().await;
487
488 // Either take the channel or fetch a fresh one
489 let channel = match channel_guard.take() {
490 Some(channel) => channel,
491 None => self.fetch_channel().await,
492 };
493
494 // Return the pair
495 (channel_guard, channel)
496 }
497
498 /// Fetches a fresh channel. If the egress definition requires publisher
499 /// confirms, this method will call the appropriate method on the channel
500 /// before returning it.
501 ///
502 /// Fetching of a fresh channel may take a long time (depends on connectivity
503 /// to RabbitMQ), but when the channel is returned, it is generally in a
504 /// healthy state.
505 async fn fetch_channel(&self) -> Channel {
506 // Repeat until we manage to both retrieve and configure the channel
507 loop {
508 // Retrieve a channel
509 let channel = self.gateway.channel().await;
510
511 // Check if publisher confirms are required on the channel
512 if self.egress.requires_any_confirmation() {
513 // Enable publisher confirms
514 let result = channel
515 .confirm_select(ConfirmSelectOptions { nowait: false })
516 .await;
517
518 // Check the result
519 if let Err(error) = result {
520 // Report
521 error!(
522 alert = true,
523 publisher = self.name.as_ref(),
524 ?error,
525 error_message = %error,
526 "Failed to enable publisher confirms on a RabbitMQ channel",
527 );
528
529 // Try again with a different channel
530 continue;
531 }
532 }
533
534 return channel;
535 }
536 }
537}