strut_rabbitmq/connector.rs
1use crate::Handle;
2use futures::stream::FuturesUnordered;
3use futures::StreamExt;
4use lapin::{Channel, Connection, ConnectionProperties, Error as LapinError};
5use secure_string::SecureString;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9use strut_core::{AppContext, AppSpindown, AppSpindownToken};
10use strut_sync::{Conduit, Retriever};
11use strut_util::Backoff;
12use thiserror::Error;
13use tokio::select;
14use tokio::sync::{oneshot, Mutex as AsyncMutex};
15use tokio::task::JoinHandle;
16use tracing::{info, warn};
17
/// Runs in the background and maintains no more than one active connection to
/// a RabbitMQ cluster (referred to herein as the **current connection**),
/// identified by the given [`Handle`]. Exposes a cheaply clone-able
/// [`Gateway`], which any number of asynchronous tasks can use to retrieve a
/// fresh [`Channel`] created on the current connection.
///
/// Fully encapsulates reconnection and clean-up logic. Reconnection is
/// triggered whenever a channel is requested and this connector is unable to
/// produce it (most likely because there is no connectivity to the RabbitMQ
/// cluster). Reconnections are performed with an exponential backoff strategy.
/// All connections are properly closed in the background before discarding.
///
/// The clients should keep their copy of [`Gateway`] and re-use it to request
/// a new [`Channel`] whenever the previous channel seems to be no longer
/// working (e.g., the underlying connection was lost). The clients should
/// expect that the gateway may take a long or even indefinite time to respond,
/// depending on the RabbitMQ cluster availability.
///
/// This connector is integrated with [`AppSpindown`]: once the global
/// [`AppContext`] is terminated, this connector will stop serving channels and
/// will attempt to gracefully close the current connection.
pub struct Connector {
    /// The globally unique name of this connector, for logging/debugging
    /// purposes. Also used to register this connector with [`AppSpindown`].
    name: Arc<str>,
    /// The identifier of this connector’s [`Handle`], for logging/debugging
    /// purposes.
    identifier: Arc<str>,
    /// The DSN of the RabbitMQ cluster, to which this connector connects.
    dsn: SecureString,
    /// The current [`Connection`] to the RabbitMQ cluster, if present. Starts
    /// out as [`None`]; a connection is only established when a channel is
    /// requested.
    connection: AsyncMutex<Option<Connection>>,
    /// The collection of previous connections that are being closed in the
    /// background. Each entry is the handle of a spawned task that closes one
    /// discarded connection.
    discarded_connections: AsyncMutex<FuturesUnordered<JoinHandle<()>>>,
    /// The counter for keeping track of how many times we discarded a
    /// connection. Used to decide when to drain `discarded_connections`.
    discarded_count: AtomicUsize,
    /// The backoff algorithm to be used in repeated connection attempts.
    backoff: Backoff,
    /// The conduit for receiving [`Channel`] requests.
    conduit: Conduit<Channel>,
    /// The canary token, which (once it goes out of scope) will inform the
    /// application that this connector gracefully completed.
    ///
    /// NOTE(review): as the last declared field it is dropped after all other
    /// fields — presumably intentional; keep it last when adding fields.
    _spindown_token: AppSpindownToken,
}
63
/// An asynchronous gateway to creating and retrieving fresh [`Channel`]s on an
/// internally maintained [`Connection`].
///
/// A gateway is created by [starting](Connector::start) a [`Connector`].
///
/// NOTE(review): the [`Connector`] documentation describes the gateway as
/// cheaply clone-able, but no `Clone` implementation is visible in this file.
/// Confirm whether a `#[derive(Clone)]` is missing here (that would require
/// `Retriever<Channel>` to be `Clone`) or whether it is provided elsewhere.
pub struct Gateway {
    /// The requesting side of the [`Connector`]’s conduit, through which
    /// [`Channel`] requests are submitted.
    retriever: Retriever<Channel>,
}
71
72impl Connector {
73 /// Creates a new [`Connector`] for the given [`Handle`] and sends it into
74 /// background to lazily serve [`Channel`] requests via the returned
75 /// [`Gateway`], which can be cheaply cloned and shared across
76 /// asynchronous tasks.
77 pub fn start(handle: impl AsRef<Handle>) -> Gateway {
78 let handle = handle.as_ref();
79 let name = Self::compose_name(handle);
80 let identifier = Arc::from(handle.identifier());
81 let dsn = handle.dsn().clone();
82 let connection = AsyncMutex::new(None);
83 let discarded_connections = AsyncMutex::new(FuturesUnordered::new());
84 let discarded_count = AtomicUsize::new(0);
85 let backoff = Backoff::new(handle.backoff());
86 let conduit = Conduit::new();
87 let retriever = conduit.retriever();
88 let _spindown_token = AppSpindown::register(&name);
89
90 let connector = Self {
91 name,
92 identifier,
93 dsn,
94 connection,
95 discarded_connections,
96 discarded_count,
97 backoff,
98 conduit,
99 _spindown_token,
100 };
101
102 tokio::spawn(connector.serve());
103
104 Gateway { retriever }
105 }
106
107 /// Composes a human-readable name for this connector.
108 fn compose_name(handle: &Handle) -> Arc<str> {
109 static COUNTER: AtomicUsize = AtomicUsize::new(0);
110
111 Arc::from(format!(
112 "rabbitmq:connector:{}:{}",
113 handle.name(),
114 COUNTER.fetch_add(1, Ordering::Relaxed),
115 ))
116 }
117}
118
119impl Connector {
120 /// Main, long-running serving function that serves the incoming [`Channel`]
121 /// requests until it hears that the global [`AppContext`] has been
122 /// terminated. After that it falls into the spindown phase, where it cleans
123 /// up before returning.
124 async fn serve(self) {
125 // Listen to incoming requests and serve them in an infinite loop
126 loop {
127 // Repeatedly wait for either the application context to terminate,
128 // or for an incoming request.
129 let state = select! {
130 biased;
131 _ = AppContext::terminated() => ServingState::Interrupted,
132 request = self.conduit.requested() => { // request received
133 // Serving an incoming request is also an asynchronous operation,
134 // so we have to monitor the application context here as well.
135 select! {
136 biased;
137 _ = AppContext::terminated() => ServingState::Interrupted,
138 state = self.receive_request(request) => state,
139 }
140 }
141 };
142
143 // Check whether we can proceed or should break out
144 match state {
145 ServingState::Ongoing => continue,
146 ServingState::Interrupted => break,
147 }
148 }
149
150 // Announce spindown
151 info!(
152 name = self.name.as_ref(),
153 identifier = self.identifier.as_ref(),
154 "Closing the RabbitMQ connection",
155 );
156
157 // Disconnect from RabbitMQ
158 self.disconnect().await;
159
160 // Wait for all previously discarded connections to be closed before returning
161 self.drain_discarded_connections().await;
162 }
163}
164
impl Gateway {
    /// Asynchronously requests the linked [`Connector`] to create a fresh
    /// [`Channel`] on its internally maintained [`Connection`] and returns said
    /// channel when ready.
    ///
    /// Depending on the connectivity to RabbitMQ this method may take
    /// arbitrarily long to return. Use the
    /// [`channel_with_timeout`](Gateway::channel_with_timeout) method to limit
    /// the waiting time.
    pub async fn channel(&self) -> Channel {
        // Delegates entirely to the retriever; no timeout is applied here
        self.retriever.anticipate().await
    }

    /// Same as the [`channel`](Gateway::channel) method, but returns [`None`]
    /// if waiting for the [`Channel`] exceeds the given `timeout`.
    pub async fn channel_with_timeout(&self, timeout: Duration) -> Option<Channel> {
        // Delegates entirely to the retriever, which enforces the timeout
        self.retriever.request_with_timeout(timeout).await
    }
}
184
/// Internal marker that indicates the state of this connector, as evaluated
/// after every iteration of the serving loop.
enum ServingState {
    /// A request has been handled; the serving loop proceeds to the next one.
    Ongoing,
    /// The global [`AppContext`] has been terminated; the serving loop breaks
    /// out and the connector proceeds to clean up.
    Interrupted,
}
190
191impl Connector {
192 /// Serves a single incoming request asynchronously.
193 async fn receive_request(&self, request: oneshot::Sender<Channel>) -> ServingState {
194 // Retrieve the channel, which may take any amount of time depending on connectivity
195 let channel = self.anticipate_channel().await;
196
197 // Send the channel back to the requester
198 let result = request.send(channel);
199
200 // Check the result
201 if result.is_err() {
202 // An error likely indicates that the requester didn’t wait long enough for the result
203 warn!(
204 name = self.name.as_ref(),
205 identifier = self.identifier.as_ref(),
206 "Too late to send the requested RabbitMQ channel",
207 );
208 }
209
210 // Error or not, we continue serving other requests
211 ServingState::Ongoing
212 }
213
214 /// Takes and discards the current connection to RabbitMQ, if any.
215 async fn disconnect(&self) {
216 // Grab the connection
217 let mut connection_guard = self.connection.lock().await;
218 let optional_connection = connection_guard.take();
219
220 // Discard the connection
221 if let Some(connection) = optional_connection {
222 self.discard_connection(connection).await;
223 }
224 }
225
226 /// Sequentially waits for and pops off all futures that are busy closing
227 /// discarded connections in the background. Returns when the collection of
228 /// futures is empty.
229 ///
230 /// The idea of this method is to be called periodically during repeated
231 /// reconnection attempts, in order to avoid infinitely accumulating clean-up
232 /// futures.
233 async fn drain_discarded_connections(&self) {
234 let mut discarded_connections = self.discarded_connections.lock().await;
235
236 while discarded_connections.next().await.is_some() {}
237 }
238}
239
240impl Connector {
241 /// Repeatedly attempts to create a channel out of an active connection,
242 /// infinitely re-connects if necessary (with a backoff strategy), returns a
243 /// channel upon first success.
244 async fn anticipate_channel(&self) -> Channel {
245 // Grab the connection
246 let mut connection_guard = self.connection.lock().await;
247 let mut optional_connection = connection_guard.take();
248
249 // Repeat until success
250 loop {
251 // Try to make a channel on the given connection
252 match self.try_create_channel(optional_connection).await {
253 // Success: we have a good connection and a fresh channel
254 Ok(CreatedChannel {
255 connection,
256 channel,
257 }) => {
258 // Put the connection back under lock
259 *connection_guard = Some(connection);
260
261 // Return the fresh channel
262 return channel;
263 }
264
265 // Error: either we didn’t have a connection to begin with, or it has gone bad
266 Err(_) => {
267 // Attempt to establish a fresh connection before continuing
268 optional_connection = self.establish_connection().await;
269 }
270 };
271 }
272 }
273
274 /// Tries to create a connection with the given channel. If successful,
275 /// returns both the channel and the connection. If not successful, performs
276 /// reporting and clean-up.
277 async fn try_create_channel(
278 &self,
279 optional_connection: Option<Connection>,
280 ) -> Result<CreatedChannel, ConnectorError> {
281 // Unwrap the connection
282 let connection = match optional_connection {
283 Some(connection) => connection,
284 None => return Err(ConnectorError::NoConnection),
285 };
286
287 // Try to create a channel
288 let channel_result = connection.create_channel().await;
289
290 // Inspect the result
291 match channel_result {
292 // Failed to create a channel
293 Err(error) => {
294 // Log the connection error
295 warn!(
296 name = self.name.as_ref(),
297 identifier = self.identifier.as_ref(),
298 ?error,
299 error_message = %error,
300 "Failed to create a RabbitMQ channel",
301 );
302
303 // Discard the obviously bad connection
304 self.discard_connection(connection).await;
305
306 // Wait a bit
307 self.backoff.sleep_next().await;
308
309 Err(ConnectorError::ChannelCreationError)
310 }
311
312 // Successfully created a channel
313 Ok(channel) => {
314 // Reset backoff
315 self.backoff.reset();
316
317 Ok(CreatedChannel {
318 channel,
319 connection,
320 })
321 }
322 }
323 }
324
325 /// Attempts to establish a fresh connection to the RabbitMQ cluster behind
326 /// this connector’s [`Handle`].
327 async fn establish_connection(&self) -> Option<Connection> {
328 // Set up the connection properties to use the current Tokio context
329 let connection_properties = ConnectionProperties::default()
330 .with_executor(tokio_executor_trait::Tokio::current())
331 .with_reactor(tokio_reactor_trait::Tokio);
332
333 // Establish a connection
334 let connection_result =
335 Connection::connect(self.dsn.unsecure(), connection_properties).await;
336
337 // Check the result
338 match connection_result {
339 // Success: return the connection
340 Ok(connection) => Some(connection),
341
342 // Error: likely no connectivity with RabbitMQ
343 Err(error) => {
344 // Log the connection error
345 warn!(
346 name = self.name.as_ref(),
347 identifier = self.identifier.as_ref(),
348 ?error,
349 error_message = %error,
350 "Failed to establish a RabbitMQ connection",
351 );
352
353 // Wait a bit
354 self.backoff.sleep_next().await;
355
356 // Return
357 None
358 }
359 }
360 }
361
362 /// Initiates discarding of the given channel. Every once in a while, this
363 /// method will also drain discarded connections, so they don’t accumulate
364 /// indefinitely.
365 async fn discard_connection(&self, connection: Connection) {
366 // Connection existed: create a clean-up future, then send it to the background
367 let future = Self::close_connection(self.name.clone(), self.identifier.clone(), connection);
368 let handle = tokio::spawn(future);
369
370 // Grab the collection of discarded connections; push a new one onto it
371 self.discarded_connections.lock().await.push(handle);
372
373 // Periodically drain discarded connections
374 const DISCARDED_COUNT_BETWEEN_CLEANUPS: usize = 10;
375 let discarded_count = self.discarded_count.fetch_add(1, Ordering::Relaxed);
376 if discarded_count % DISCARDED_COUNT_BETWEEN_CLEANUPS == 0 {
377 self.drain_discarded_connections().await;
378 }
379 }
380
381 /// Works on closing the given connection, uses the given name and identifier
382 /// for logging the outcome.
383 async fn close_connection(name: Arc<str>, identifier: Arc<str>, connection: Connection) {
384 // Close the given connection
385 let result = connection.close(0, "Discarded connection").await;
386
387 // Check and report the outcome
388 match result {
389 Ok(_) => info!(
390 name = name.as_ref(),
391 identifier = identifier.as_ref(),
392 "Closed a discarded RabbitMQ connection",
393 ),
394 Err(LapinError::InvalidConnectionState(_)) => info!(
395 name = name.as_ref(),
396 identifier = identifier.as_ref(),
397 "Discarded a previously lost RabbitMQ connection",
398 ),
399 Err(LapinError::InvalidChannelState(state)) => info!(
400 name = name.as_ref(),
401 identifier = identifier.as_ref(),
402 "Ignored a channel in the invalid state '{:?}' within a discarded RabbitMQ connection",
403 state,
404 ),
405 Err(error) => warn!(
406 name = name.as_ref(),
407 identifier = identifier.as_ref(),
408 ?error,
409 error_message = %error,
410 "Failed to cleanly close a discarded RabbitMQ connection",
411 ),
412 }
413 }
414}
415
/// A little wrapper to conveniently pass both the fresh channel and the connection
/// from which it originated.
struct CreatedChannel {
    /// The connection on which the channel was created; handed back so it can
    /// be retained as the current connection.
    connection: Connection,
    /// The freshly created channel.
    channel: Channel,
}
422
/// Internal error representing the reasons why creating a channel may fail.
#[derive(Error, Debug)]
enum ConnectorError {
    /// There was no connection to create a channel on.
    #[error("failed to create a channel: no connection provided")]
    NoConnection,
    /// A connection was given, but it failed to produce a channel.
    #[error("failed to create a channel on the given connection")]
    ChannelCreationError,
}