strut_rabbitmq/transport/outbound/
publisher.rs

1use crate::transport::outbound::publisher::api::{BatchPublishingResult, PublishingResult};
2use crate::transport::outbound::publisher::inner::{
3    BatchTransmissionResult, Confirmed, ConfirmedBatch, NotTransmitted, PartlyTransmittedBatch,
4    TransmissionResult, Transmitted, TransmittedBatch,
5};
6use crate::util::RetrievePushMap;
7use crate::{Connector, DeliveryMode, Dispatch, Egress, Gateway, Handle};
8use lapin::options::{BasicPublishOptions, ConfirmSelectOptions};
9use lapin::Channel;
10use nonempty::NonEmpty;
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
15use tracing::error;
16
17pub mod api;
18mod inner;
19
20/// Publishes outgoing [`Dispatch`]es to the RabbitMQ cluster.
21///
22/// Distinguishes single publishing
23/// ([`try_publish`](Publisher::try_publish) and [`publish`](Publisher::publish))
24/// from batch publishing ([`try_publish_many`](Publisher::try_publish_many) and
25/// [`publish_many`](Publisher::publish_many)).
26///
27/// Also, distinguishes fail-fast publishing
28/// ([`try_publish`](Publisher::try_publish) and
29/// [`try_publish_many`](Publisher::try_publish_many)) from error-less
30/// publishing ([`publish`](Publisher::publish) and
31/// [`publish_many`](Publisher::publish_many)).
32///
33/// ## Connection
34///
35/// This publisher delegates establishing connection and creation of [`Channel`]s
36/// to [`Connector`], which must be [started](Connector::start) before creating
37/// a publisher.
38///
39/// No more than one [`Channel`] is being kept by this publisher, and it is
40/// re-fetched whenever a connection issue is suspected.
41///
42/// ## Configuration
43///
44/// All publishing configuration is off-loaded to [`Egress`].
45///
46/// One important part of the egress configuration is
47/// [`ConfirmationLevel`](crate::ConfirmationLevel). This level has significant
48/// consequences for the publishing process, as described in its documentation.
49///
50/// ## Publishing
51///
52/// Publishing a [`Dispatch`] to RabbitMQ is a two-step process:
53///
54/// 1. **Transmit** the dispatch payload over network to the broker.
55/// 2. **Confirm** with the broker the successful reception of the message.
56///
57/// The transmission step plays out the same way regardless of the configuration.
58/// This publisher always transmits one dispatch at a time (there should be no
59/// benefit in transmitting dispatches in parallel in a single channel).
60///
61/// The confirmation step depends a lot on the
62/// [confirmation level](crate::ConfirmationLevel) selected on the
63/// [egress](Egress).
64///
65/// ### Batch benefits
66///
67/// The publishing of a [`Dispatch`] may be a one-step or a two-step process
68/// depending on the [confirmation level](crate::ConfirmationLevel) of this
69/// publisher’s [`Egress`].
70///
71/// The first step (transmission of the message to the broker) does not benefit
72/// from batching since the messages are transmitted one at a time. However, the
73/// second step (confirmation with the broker), if executed, can benefit from
74/// the batch approach.
75///
76/// ### Publishing API
77///
78/// The following four publishing methods are exposed.
79///
80/// All publishing methods return all [`Dispatch`]es that were passed into them
81/// (both in the happy path and in the case of an error). It is up to the caller
82/// to then either drop the dispatches or use them for a different purpose (e.g.,
83/// also publish them via a different [`Publisher`]).
84///
85/// #### [`try_publish`](Publisher::try_publish): single [`Dispatch`], fail-fast
86///
87/// Attempts once to publish a single dispatch and returns an error as soon as
88/// something goes wrong.
89///
90/// #### [`publish`](Publisher::publish): single [`Dispatch`], error-less
91///
92/// Repeatedly attempts to publish a single dispatch and returns only once the
93/// message is confirmed.
94///
95/// #### [`try_publish_many`](Publisher::try_publish_many): batch of [`Dispatch`]es, fail-fast
96///
97/// Attempts once to publish a batch of dispatches and returns an error as soon
98/// as something goes wrong.
99///
100/// #### [`publish_many`](Publisher::publish_many): batch of [`Dispatch`]es, error-less
101///
102/// Repeatedly attempts to publish a batch of dispatches and returns only once
103/// all the messages are confirmed.
104pub struct Publisher {
105    /// The globally unique name of this publisher, for logging/debugging
106    /// purposes.
107    name: Arc<str>,
108    /// The [`Egress`] used by this publisher to transport outgoing dispatches.
109    egress: Egress,
110    /// The [`Gateway`] for the RabbitMQ [`Channel`]s, as returned by
111    /// [`Connector`].
112    gateway: Gateway,
113    /// The current [`Channel`] of this publisher.
114    channel: AsyncMutex<Option<Channel>>,
115}
116
117impl Publisher {
118    /// Creates and returns a new [`Publisher`].
119    pub fn new(gateway: Gateway, egress: Egress) -> Self {
120        let name = Self::compose_name(&egress);
121        let channel = AsyncMutex::new(None);
122
123        Self {
124            name,
125            egress,
126            gateway,
127            channel,
128        }
129    }
130
131    /// Starts a new [`Connector`] with the given [`Handle`] and uses it to create
132    /// and return a new [`Publisher`] for the given [`Egress`].
133    pub fn start(handle: impl AsRef<Handle>, egress: Egress) -> Self {
134        let gateway = Connector::start(handle);
135
136        Self::new(gateway, egress)
137    }
138
139    /// Composes a globally unique, human-readable name for this [`Publisher`].
140    fn compose_name(egress: &Egress) -> Arc<str> {
141        static COUNTER: AtomicUsize = AtomicUsize::new(0);
142
143        Arc::from(format!(
144            "rabbitmq:pub:{}:{}",
145            egress.name(),
146            COUNTER.fetch_add(1, Ordering::Relaxed),
147        ))
148    }
149}
150
151impl Publisher {
152    /// Reports the name of this [`Publisher`].
153    pub fn name(&self) -> &str {
154        &self.name
155    }
156}
157
158impl Publisher {
159    /// Attempts once to publish a single [`Dispatch`] and returns an error as
160    /// soon as something goes wrong.
161    ///
162    /// The provided dispatch is returned back to the caller, both in the happy
163    /// path and in the case of an error.
164    ///
165    /// This is a fail-fast version of the single-dispatch publishing. For the
166    /// error-less approach, use [`publish`](Publisher::publish).
167    pub async fn try_publish(&self, dispatch: impl Into<Dispatch>) -> PublishingResult {
168        // Dive in
169        let dispatch = dispatch.into();
170
171        // Transmit and fail fast
172        let transmitted = self.try_transmit(dispatch).await?;
173
174        // Confirm and fail fast again
175        let confirmed = transmitted.confirm(self.name.as_ref()).await?;
176
177        // Return the confirmed dispatch
178        Ok(Dispatch::from(confirmed))
179    }
180
181    /// Repeatedly attempts to publish a single [`Dispatch`] and returns only
182    /// once the message is confirmed.
183    ///
184    /// The provided dispatch is returned back to the caller.
185    ///
186    /// Note that the [`ConfirmationLevel`](crate::ConfirmationLevel) on this
187    /// publisher’s [`Egress`] will significantly affect the publishing
188    /// semantics.
189    ///
190    /// This is an error-less version of the single-dispatch publishing. For the
191    /// fail-fast approach, use [`try_publish`](Publisher::try_publish).
192    pub async fn publish(&self, dispatch: impl Into<Dispatch>) -> Dispatch {
193        // Dive in
194        let dispatch = dispatch.into();
195
196        // Push the given dispatch until it is confirmed
197        let confirmed = self.push_one_until_confirmed(dispatch).await;
198
199        // Return the confirmed dispatch
200        Dispatch::from(confirmed)
201    }
202
203    /// Attempts once to publish a batch of [`Dispatch`]es and returns an error
204    /// as soon as something goes wrong.
205    ///
206    /// The provided dispatches are all returned back to the caller, both in the
207    /// happy path and in the case of an error.
208    ///
209    /// This is a fail-fast version of the batch publishing. For the error-less
210    /// approach, use [`publish_many`](Publisher::publish_many).
211    pub async fn try_publish_many<I>(&self, dispatches: I) -> BatchPublishingResult
212    where
213        I: IntoIterator,
214        I::Item: Into<Dispatch>,
215    {
216        // Package input into a vector
217        let dispatches = dispatches.into_iter().map(Into::into).collect::<Vec<_>>();
218
219        // Short circuit
220        if dispatches.is_empty() {
221            return Ok(dispatches);
222        }
223
224        // Transmit the batch but don’t fail fast just yet (give a chance to
225        // partially transmitted batches)
226        let batch_transmission_result = self.try_transmit_many(dispatches).await;
227
228        // Attempt to confirm the (partially) transmitted dispatches, and fail
229        // fast here
230        let confirmed_batch = match batch_transmission_result {
231            Ok(transmitted_batch) => transmitted_batch.confirm(self.name.as_ref()).await?,
232            Err(partly_transmitted_batch) => {
233                partly_transmitted_batch.confirm(self.name.as_ref()).await?
234            }
235        };
236
237        // Return the confirmed dispatches
238        Ok(Vec::from(confirmed_batch))
239    }
240
241    /// Repeatedly attempts to publish a batch of [`Dispatch`]es and returns
242    /// only once all the messages are confirmed.
243    ///
244    /// The provided dispatches are all returned back to the caller.
245    ///
246    /// Note that the [`ConfirmationLevel`](crate::ConfirmationLevel) on this
247    /// publisher’s [`Egress`] will significantly affect the publishing
248    /// semantics.
249    ///
250    /// This is an error-less version of the batch publishing. For the fail-fast
251    /// approach, use [`try_publish_many`](Publisher::try_publish_many).
252    pub async fn publish_many<I>(&self, dispatches: I) -> Vec<Dispatch>
253    where
254        I: IntoIterator,
255        I::Item: Into<Dispatch>,
256    {
257        // Package input into a vector
258        let dispatches = dispatches.into_iter().map(Into::into).collect::<Vec<_>>();
259
260        // Short circuit
261        if dispatches.is_empty() {
262            return dispatches;
263        }
264
265        // Push the given dispatches until they are all confirmed
266        let confirmed_batch = self.push_many_until_confirmed(dispatches).await;
267
268        // Return the confirmed dispatches
269        Vec::from(confirmed_batch)
270    }
271}
272
273impl Publisher {
274    /// Repeatedly publishes the given [`Dispatch`]es until all of them are both
275    /// transmitted and confirmed.
276    async fn push_many_until_confirmed(&self, dispatches: Vec<Dispatch>) -> ConfirmedBatch {
277        // Re-structure dispatches into a deque for easy double-sided access
278        let mut pending_dispatches = VecDeque::from(dispatches);
279
280        // Prepare storage for confirmed dispatches
281        let mut confirmed_dispatches = Vec::with_capacity(pending_dispatches.len());
282
283        // In a happy path, we will send everything in one iteration of this loop
284        while !pending_dispatches.is_empty() {
285            // Prepare storage for dispatches transmitted in this iteration
286            let mut transmitted_dispatches = Vec::with_capacity(pending_dispatches.len());
287
288            // Transmit all pending dispatches
289            for dispatch in pending_dispatches.drain(..) {
290                // Transmit the dispatch
291                let transmitted = self.push_one_until_transmitted(dispatch).await;
292
293                // Queue up the transmission result
294                transmitted_dispatches.push(transmitted);
295            }
296
297            // Now, try to confirm all transmitted batches
298            for transmitted in transmitted_dispatches {
299                // Push for confirmation
300                let confirmation_result = transmitted.confirm(self.name.as_ref()).await;
301
302                // Check out the result
303                match confirmation_result {
304                    // Transmitted and confirmed: all good with this one
305                    Ok(confirmed) => {
306                        confirmed_dispatches.push(confirmed);
307                        continue;
308                    }
309
310                    // Transmitted but not confirmed: we’ll have to transmit again
311                    Err(not_confirmed) => {
312                        // Return the failed dispatch back into the pending collection
313                        pending_dispatches.push_back(Dispatch::from(not_confirmed));
314                    }
315                }
316            }
317        }
318
319        ConfirmedBatch {
320            dispatches: confirmed_dispatches,
321        }
322    }
323
324    /// Repeatedly publishes the given [`Dispatch`] until it is both transmitted
325    /// and confirmed.
326    async fn push_one_until_confirmed(&self, mut dispatch: Dispatch) -> Confirmed {
327        // Keep trying until transmission is successful and confirmed
328        loop {
329            // Transmit and confirm
330            let transmitted = self.push_one_until_transmitted(dispatch).await;
331            let confirmation_result = transmitted.confirm(self.name.as_ref()).await;
332
333            // Inspect the outcome
334            match confirmation_result {
335                // Transmitted and confirmed: all good with this one
336                Ok(confirmed) => {
337                    return confirmed;
338                }
339
340                // Transmitted but not confirmed: we’ll have to transmit again
341                Err(not_confirmed) => {
342                    // Put the failed dispatch back into the hot seat
343                    dispatch = Dispatch::from(not_confirmed);
344                }
345            }
346        }
347    }
348
349    /// Repeatedly transmits the given [`Dispatch`] until it is successfully
350    /// transmitted.
351    async fn push_one_until_transmitted(&self, mut dispatch: Dispatch) -> Transmitted {
352        // Keep trying until transmission is successful
353        loop {
354            // Send a message
355            let outcome = self.try_transmit(dispatch).await;
356
357            // Check out the outcome
358            match outcome {
359                // Successfully transmitted: return
360                Ok(transmitted) => {
361                    return transmitted;
362                }
363
364                // Failed to transmit: try again
365                Err(non_transmitted) => {
366                    // Put the non-transmitted dispatch back into hot seat
367                    dispatch = Dispatch::from(non_transmitted);
368                }
369            };
370        }
371    }
372
373    /// Attempts to transmit the given [`Dispatch`]es once, and gives up as soon
374    /// as anything goes wrong.
375    async fn try_transmit_many(&self, dispatches: Vec<Dispatch>) -> BatchTransmissionResult {
376        // Prepare storage for transmitted and non-transmitted dispatches
377        let mut transmitted_dispatches = Vec::new();
378        let mut not_transmitted_dispatches = Vec::new();
379
380        // Create an iterator for the dispatches
381        let mut remaining = dispatches.into_iter();
382
383        // Follow the happy path and iterate until first failure
384        while let Some(dispatch) = remaining.next() {
385            // Attempt to transmit
386            let transmitted_result = self.try_transmit(dispatch).await;
387
388            // Break on first failure
389            match transmitted_result {
390                Ok(transmitted) => transmitted_dispatches.push(transmitted),
391                Err(not_transmitted) => {
392                    not_transmitted_dispatches.push(not_transmitted);
393                    break;
394                }
395            }
396        }
397
398        // If there are still dispatches remaining, last sending attempt failed
399        while let Some(dispatch) = remaining.next() {
400            // Record all remaining dispatches as “not attempted”
401            not_transmitted_dispatches.push(NotTransmitted::NotAttempted(dispatch));
402        }
403
404        // Check whether there were any issues
405        if let Some(not_transmitted_dispatches) = NonEmpty::from_vec(not_transmitted_dispatches) {
406            // Issue detected, return the appropriate error
407            return Err(PartlyTransmittedBatch {
408                transmitted_dispatches,
409                not_transmitted_dispatches,
410            });
411        }
412
413        Ok(TransmittedBatch {
414            dispatches: transmitted_dispatches,
415        })
416    }
417
418    /// Attempts to transmit the given [`Dispatch`] once, and gives up as soon
419    /// as anything goes wrong.
420    async fn try_transmit(&self, dispatch: Dispatch) -> TransmissionResult {
421        // Prepare message properties for publishing, optionally forcing durability
422        let mut properties = dispatch.properties();
423        if self.egress.force_durable() {
424            properties = properties.push_delivery_mode(DeliveryMode::Durable);
425        }
426
427        // Grab the channel
428        let (mut channel_guard, channel) = self.grab_channel().await;
429
430        // Infer the routing key
431        let routing_key = dispatch
432            .routing_key()
433            .unwrap_or_else(|| self.egress.routing_key());
434
435        // Publish the message and store the initial result
436        let result = channel
437            .basic_publish(
438                self.egress.exchange(),
439                routing_key,
440                BasicPublishOptions {
441                    mandatory: self.egress.requires_mandatory_publish(),
442                    immediate: false, // this flag is not supported and ignored by RabbitMQ v3+
443                },
444                dispatch.bytes(),
445                properties,
446            )
447            .await;
448
449        // If all is good, wrap channel in `Some`, otherwise drop it
450        let optional_channel = result.is_ok().then(|| channel);
451
452        // Put the `Option` of channel back
453        *channel_guard = optional_channel;
454        drop(channel_guard);
455
456        // Inspect whether the message was pushed successfully
457        match result {
458            // RabbitMQ received the message
459            Ok(future_confirm) => {
460                // Good hit
461                Ok(Transmitted {
462                    dispatch,
463                    future_confirm,
464                })
465            }
466
467            // RabbitMQ did not receive the message (likely a connectivity issue)
468            Err(error) => {
469                error!(
470                    alert = true,
471                    publisher = self.name.as_ref(),
472                    ?error,
473                    error_message = %error,
474                    byte_preview = String::from_utf8_lossy(dispatch.bytes()).as_ref(),
475                    "Failed to publish a message to RabbitMQ (did not transmit over network)",
476                );
477                Err(NotTransmitted::TransmissionError(dispatch, error))
478            }
479        }
480    }
481
482    /// Encapsulates obtaining a channel either from under the lock, or by
483    /// fetching a fresh one.
484    async fn grab_channel(&self) -> (MutexGuard<'_, Option<Channel>>, Channel) {
485        // Obtain the channel guard
486        let mut channel_guard = self.channel.lock().await;
487
488        // Either take the channel or fetch a fresh one
489        let channel = match channel_guard.take() {
490            Some(channel) => channel,
491            None => self.fetch_channel().await,
492        };
493
494        // Return the pair
495        (channel_guard, channel)
496    }
497
498    /// Fetches a fresh channel. If the egress definition requires publisher
499    /// confirms, this method will call the appropriate method on the channel
500    /// before returning it.
501    ///
502    /// Fetching of a fresh channel may take a long time (depends on connectivity
503    /// to RabbitMQ), but when the channel is returned, it is generally in a
504    /// healthy state.
505    async fn fetch_channel(&self) -> Channel {
506        // Repeat until we manage to both retrieve and configure the channel
507        loop {
508            // Retrieve a channel
509            let channel = self.gateway.channel().await;
510
511            // Check if publisher confirms are required on the channel
512            if self.egress.requires_any_confirmation() {
513                // Enable publisher confirms
514                let result = channel
515                    .confirm_select(ConfirmSelectOptions { nowait: false })
516                    .await;
517
518                // Check the result
519                if let Err(error) = result {
520                    // Report
521                    error!(
522                        alert = true,
523                        publisher = self.name.as_ref(),
524                        ?error,
525                        error_message = %error,
526                        "Failed to enable publisher confirms on a RabbitMQ channel",
527                    );
528
529                    // Try again with a different channel
530                    continue;
531                }
532            }
533
534            return channel;
535        }
536    }
537}