strut_rabbitmq/
connector.rs

1use crate::Handle;
2use futures::stream::FuturesUnordered;
3use futures::StreamExt;
4use lapin::{Channel, Connection, ConnectionProperties, Error as LapinError};
5use secure_string::SecureString;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9use strut_core::{AppContext, AppSpindown, AppSpindownToken};
10use strut_sync::{Conduit, Retriever};
11use strut_util::Backoff;
12use thiserror::Error;
13use tokio::select;
14use tokio::sync::{oneshot, Mutex as AsyncMutex};
15use tokio::task::JoinHandle;
16use tracing::{info, warn};
17
18/// Runs in the background, maintains no more than one active connection to a
19/// RabbitMQ cluster (referred to herein as **current connection**) identified
20/// by the given [`Handle`]. Exposes a cheaply clone-able [`Gateway`], which
21/// any number of asynchronous tasks can use to retrieve a fresh [`Channel`]
22/// created in the current connection.
23///
24/// Fully encapsulates reconnection and clean-up logic. Reconnection is
25/// triggered whenever a channel is requested and this connector is unable to
26/// produce it (likely, because there is no connectivity to the RabbitMQ
27/// cluster). Reconnections are performed with an exponential backoff strategy.
28/// All connections are properly closed in the background before discarding.
29///
30/// The clients should keep their copy of [`Gateway`] and re-use it to request
31/// a new [`Channel`] whenever the previous channel seems to be no longer
32/// working (e.g., the underlying connection was lost). The clients should
33/// expect that the gateway may take a long or even indefinite time, depending
34/// on the RabbitMQ cluster availability.
35///
36/// This connector is integrated with [`AppSpindown`]: once the global
37/// [`AppContext`] is terminated, this connector will stop serving channels and
38/// will attempt to gracefully close the current connection.
39pub struct Connector {
40    /// The globally unique name of this connector, for logging/debugging
41    /// purposes.
42    name: Arc<str>,
43    /// The identifier of this connector’s [`Handle`], for logging/debugging
44    /// purposes.
45    identifier: Arc<str>,
46    /// The DSN of the RabbitMQ cluster, to which this connector connects.
47    dsn: SecureString,
48    /// The current [`Connection`] to the RabbitMQ cluster, if present.
49    connection: AsyncMutex<Option<Connection>>,
50    /// The collection of previous connections that are being closed in the
51    /// background.
52    discarded_connections: AsyncMutex<FuturesUnordered<JoinHandle<()>>>,
53    /// The counter for keeping track how many times we discarded a connection.
54    discarded_count: AtomicUsize,
55    /// The backoff algorithm to be used in repeated connection attempts.
56    backoff: Backoff,
57    /// The conduit for receiving [`Channel`] requests.
58    conduit: Conduit<Channel>,
59    /// The canary token, which (once it goes out of scope) will inform the
60    /// application that this connector gracefully completed.
61    _spindown_token: AppSpindownToken,
62}
63
64/// An asynchronous gateway to creating and retrieving fresh [`Channel`]s on an
65/// internally maintained [`Connection`].
66///
67/// A gateway is created by [starting](Connector::start) a [`Connector`].
68pub struct Gateway {
69    retriever: Retriever<Channel>,
70}
71
72impl Connector {
73    /// Creates a new [`Connector`] for the given [`Handle`] and sends it into
74    /// background to lazily serve [`Channel`] requests via the returned
75    /// [`Gateway`], which can be cheaply cloned and shared across
76    /// asynchronous tasks.
77    pub fn start(handle: impl AsRef<Handle>) -> Gateway {
78        let handle = handle.as_ref();
79        let name = Self::compose_name(handle);
80        let identifier = Arc::from(handle.identifier());
81        let dsn = handle.dsn().clone();
82        let connection = AsyncMutex::new(None);
83        let discarded_connections = AsyncMutex::new(FuturesUnordered::new());
84        let discarded_count = AtomicUsize::new(0);
85        let backoff = Backoff::new(handle.backoff());
86        let conduit = Conduit::new();
87        let retriever = conduit.retriever();
88        let _spindown_token = AppSpindown::register(&name);
89
90        let connector = Self {
91            name,
92            identifier,
93            dsn,
94            connection,
95            discarded_connections,
96            discarded_count,
97            backoff,
98            conduit,
99            _spindown_token,
100        };
101
102        tokio::spawn(connector.serve());
103
104        Gateway { retriever }
105    }
106
107    /// Composes a human-readable name for this connector.
108    fn compose_name(handle: &Handle) -> Arc<str> {
109        static COUNTER: AtomicUsize = AtomicUsize::new(0);
110
111        Arc::from(format!(
112            "rabbitmq:connector:{}:{}",
113            handle.name(),
114            COUNTER.fetch_add(1, Ordering::Relaxed),
115        ))
116    }
117}
118
119impl Connector {
120    /// Main, long-running serving function that serves the incoming [`Channel`]
121    /// requests until it hears that the global [`AppContext`] has been
122    /// terminated. After that it falls into the spindown phase, where it cleans
123    /// up before returning.
124    async fn serve(self) {
125        // Listen to incoming requests and serve them in an infinite loop
126        loop {
127            // Repeatedly wait for either the application context to terminate,
128            // or for an incoming request.
129            let state = select! {
130                biased;
131                _ = AppContext::terminated() => ServingState::Interrupted,
132                request = self.conduit.requested() => { // request received
133                    // Serving an incoming request is also an asynchronous operation,
134                    // so we have to monitor the application context here as well.
135                    select! {
136                        biased;
137                        _ = AppContext::terminated() => ServingState::Interrupted,
138                        state = self.receive_request(request) => state,
139                    }
140                }
141            };
142
143            // Check whether we can proceed or should break out
144            match state {
145                ServingState::Ongoing => continue,
146                ServingState::Interrupted => break,
147            }
148        }
149
150        // Announce spindown
151        info!(
152            name = self.name.as_ref(),
153            identifier = self.identifier.as_ref(),
154            "Closing the RabbitMQ connection",
155        );
156
157        // Disconnect from RabbitMQ
158        self.disconnect().await;
159
160        // Wait for all previously discarded connections to be closed before returning
161        self.drain_discarded_connections().await;
162    }
163}
164
165impl Gateway {
166    /// Asynchronously requests the linked [`Connector`] to create a fresh
167    /// [`Channel`] on its internally maintained [`Connection`] and return said
168    /// channel when ready.
169    ///
170    /// Depending on the connectivity to RabbitMQ this method may take
171    /// arbitrarily long to return. Use the
172    /// [`channel_with_timeout`](Gateway::channel_with_timeout) method to limit
173    /// the waiting time.
174    pub async fn channel(&self) -> Channel {
175        self.retriever.anticipate().await
176    }
177
178    /// Same as the [`channel`](Gateway::channel) method, but returns [`None`]
179    /// if waiting for the [`Channel`] exceeds the given `timeout`.
180    pub async fn channel_with_timeout(&self, timeout: Duration) -> Option<Channel> {
181        self.retriever.request_with_timeout(timeout).await
182    }
183}
184
/// Internal marker that indicates whether the connector should keep serving
/// requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServingState {
    /// Keep serving incoming requests.
    Ongoing,
    /// The global application context was terminated: stop serving and spin
    /// down.
    Interrupted,
}
190
191impl Connector {
192    /// Serves a single incoming request asynchronously.
193    async fn receive_request(&self, request: oneshot::Sender<Channel>) -> ServingState {
194        // Retrieve the channel, which may take any amount of time depending on connectivity
195        let channel = self.anticipate_channel().await;
196
197        // Send the channel back to the requester
198        let result = request.send(channel);
199
200        // Check the result
201        if result.is_err() {
202            // An error likely indicates that the requester didn’t wait long enough for the result
203            warn!(
204                name = self.name.as_ref(),
205                identifier = self.identifier.as_ref(),
206                "Too late to send the requested RabbitMQ channel",
207            );
208        }
209
210        // Error or not, we continue serving other requests
211        ServingState::Ongoing
212    }
213
214    /// Takes and discards the current connection to RabbitMQ, if any.
215    async fn disconnect(&self) {
216        // Grab the connection
217        let mut connection_guard = self.connection.lock().await;
218        let optional_connection = connection_guard.take();
219
220        // Discard the connection
221        if let Some(connection) = optional_connection {
222            self.discard_connection(connection).await;
223        }
224    }
225
226    /// Sequentially waits for and pops off all futures that are busy closing
227    /// discarded connections in the background. Returns when the collection of
228    /// futures is empty.
229    ///
230    /// The idea of this method is to be called periodically during repeated
231    /// reconnection attempts, in order to avoid infinitely accumulating clean-up
232    /// futures.
233    async fn drain_discarded_connections(&self) {
234        let mut discarded_connections = self.discarded_connections.lock().await;
235
236        while discarded_connections.next().await.is_some() {}
237    }
238}
239
240impl Connector {
241    /// Repeatedly attempts to create a channel out of an active connection,
242    /// infinitely re-connects if necessary (with a backoff strategy), returns a
243    /// channel upon first success.
244    async fn anticipate_channel(&self) -> Channel {
245        // Grab the connection
246        let mut connection_guard = self.connection.lock().await;
247        let mut optional_connection = connection_guard.take();
248
249        // Repeat until success
250        loop {
251            // Try to make a channel on the given connection
252            match self.try_create_channel(optional_connection).await {
253                // Success: we have a good connection and a fresh channel
254                Ok(CreatedChannel {
255                    connection,
256                    channel,
257                }) => {
258                    // Put the connection back under lock
259                    *connection_guard = Some(connection);
260
261                    // Return the fresh channel
262                    return channel;
263                }
264
265                // Error: either we didn’t have a connection to begin with, or it has gone bad
266                Err(_) => {
267                    // Attempt to establish a fresh connection before continuing
268                    optional_connection = self.establish_connection().await;
269                }
270            };
271        }
272    }
273
274    /// Tries to create a connection with the given channel. If successful,
275    /// returns both the channel and the connection. If not successful, performs
276    /// reporting and clean-up.
277    async fn try_create_channel(
278        &self,
279        optional_connection: Option<Connection>,
280    ) -> Result<CreatedChannel, ConnectorError> {
281        // Unwrap the connection
282        let connection = match optional_connection {
283            Some(connection) => connection,
284            None => return Err(ConnectorError::NoConnection),
285        };
286
287        // Try to create a channel
288        let channel_result = connection.create_channel().await;
289
290        // Inspect the result
291        match channel_result {
292            // Failed to create a channel
293            Err(error) => {
294                // Log the connection error
295                warn!(
296                    name = self.name.as_ref(),
297                    identifier = self.identifier.as_ref(),
298                    ?error,
299                    error_message = %error,
300                    "Failed to create a RabbitMQ channel",
301                );
302
303                // Discard the obviously bad connection
304                self.discard_connection(connection).await;
305
306                // Wait a bit
307                self.backoff.sleep_next().await;
308
309                Err(ConnectorError::ChannelCreationError)
310            }
311
312            // Successfully created a channel
313            Ok(channel) => {
314                // Reset backoff
315                self.backoff.reset();
316
317                Ok(CreatedChannel {
318                    channel,
319                    connection,
320                })
321            }
322        }
323    }
324
325    /// Attempts to establish a fresh connection to the RabbitMQ cluster behind
326    /// this connector’s [`Handle`].
327    async fn establish_connection(&self) -> Option<Connection> {
328        // Set up the connection properties to use the current Tokio context
329        let connection_properties = ConnectionProperties::default()
330            .with_executor(tokio_executor_trait::Tokio::current())
331            .with_reactor(tokio_reactor_trait::Tokio);
332
333        // Establish a connection
334        let connection_result =
335            Connection::connect(self.dsn.unsecure(), connection_properties).await;
336
337        // Check the result
338        match connection_result {
339            // Success: return the connection
340            Ok(connection) => Some(connection),
341
342            // Error: likely no connectivity with RabbitMQ
343            Err(error) => {
344                // Log the connection error
345                warn!(
346                    name = self.name.as_ref(),
347                    identifier = self.identifier.as_ref(),
348                    ?error,
349                    error_message = %error,
350                    "Failed to establish a RabbitMQ connection",
351                );
352
353                // Wait a bit
354                self.backoff.sleep_next().await;
355
356                // Return
357                None
358            }
359        }
360    }
361
362    /// Initiates discarding of the given channel. Every once in a while, this
363    /// method will also drain discarded connections, so they don’t accumulate
364    /// indefinitely.
365    async fn discard_connection(&self, connection: Connection) {
366        // Connection existed: create a clean-up future, then send it to the background
367        let future = Self::close_connection(self.name.clone(), self.identifier.clone(), connection);
368        let handle = tokio::spawn(future);
369
370        // Grab the collection of discarded connections; push a new one onto it
371        self.discarded_connections.lock().await.push(handle);
372
373        // Periodically drain discarded connections
374        const DISCARDED_COUNT_BETWEEN_CLEANUPS: usize = 10;
375        let discarded_count = self.discarded_count.fetch_add(1, Ordering::Relaxed);
376        if discarded_count % DISCARDED_COUNT_BETWEEN_CLEANUPS == 0 {
377            self.drain_discarded_connections().await;
378        }
379    }
380
381    /// Works on closing the given connection, uses the given name and identifier
382    /// for logging the outcome.
383    async fn close_connection(name: Arc<str>, identifier: Arc<str>, connection: Connection) {
384        // Close the given connection
385        let result = connection.close(0, "Discarded connection").await;
386
387        // Check and report the outcome
388        match result {
389            Ok(_) => info!(
390                name = name.as_ref(),
391                identifier = identifier.as_ref(),
392                "Closed a discarded RabbitMQ connection",
393            ),
394            Err(LapinError::InvalidConnectionState(_)) => info!(
395                name = name.as_ref(),
396                identifier = identifier.as_ref(),
397                "Discarded a previously lost RabbitMQ connection",
398            ),
399            Err(LapinError::InvalidChannelState(state)) => info!(
400                name = name.as_ref(),
401                identifier = identifier.as_ref(),
402                "Ignored a channel in the invalid state '{:?}' within a discarded RabbitMQ connection",
403                state,
404            ),
405            Err(error) => warn!(
406                name = name.as_ref(),
407                identifier = identifier.as_ref(),
408                ?error,
409                error_message = %error,
410                "Failed to cleanly close a discarded RabbitMQ connection",
411            ),
412        }
413    }
414}
415
416/// A little wrapper to conveniently pass both the fresh channel and the connection
417/// from which it originated.
418struct CreatedChannel {
419    connection: Connection,
420    channel: Channel,
421}
422
423/// Internal error representing the reasons while creating a channel may fail.
424#[derive(Error, Debug)]
425enum ConnectorError {
426    #[error("failed to create a channel: no connection provided")]
427    NoConnection,
428    #[error("failed to create a channel on the given connection")]
429    ChannelCreationError,
430}