1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use lapin::options::ConfirmSelectOptions;
use lapin::{Channel, Connection, ConnectionProperties};
use tokio::time::{Duration, sleep, timeout};
use tokio_util::sync::CancellationToken;
use crate::SHUTDOWN_GRACE;
use crate::backends::rabbitmq::map_lapin_error;
use crate::error::{Result, ShoveError};
use crate::metrics;
use crate::retry::Backoff;
/// Settings used to dial a RabbitMQ broker.
///
/// `Debug` is implemented by hand for this type so that the password embedded
/// in `uri` is redacted from debug output.
#[derive(Clone)]
pub struct RabbitMqConfig {
    /// AMQP connection string, for example
    /// `amqp://guest:guest@localhost:5672/%2f`.
    pub uri: String,
    /// Management-API settings, if any.
    ///
    /// With `Some(..)`, the broker's `queue_stats_provider()` hands out a
    /// working `ManagementClient` and `RabbitMqAutoscalerBackend` can poll
    /// real backlog. With `None`, `snapshot` reports a clear
    /// "management not configured" error instead.
    pub management: Option<super::management::ManagementConfig>,
}
impl Debug for RabbitMqConfig {
    /// Formats the config without exposing credentials.
    ///
    /// The URI's password is stripped before printing. If the `url` crate
    /// refuses to strip it (it rejects `set_password` for host-less URLs,
    /// among others), the URI is replaced by `<redacted>` instead of being
    /// echoed verbatim. A URI that does not parse at all prints as
    /// `<unparseable>`. Management settings are never printed, only whether
    /// they are configured.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let redacted_uri = match url::Url::parse(&self.uri) {
            Ok(mut url) => match url.set_password(None) {
                Ok(()) => url.to_string(),
                // Never fall back to the raw URL here: the password was not
                // removed, so printing it could leak credentials into logs.
                Err(()) => "<redacted>".to_string(),
            },
            Err(_) => "<unparseable>".to_string(),
        };
        let management = if self.management.is_some() {
            "<configured>"
        } else {
            "<none>"
        };
        f.debug_struct("RabbitMqConfig")
            .field("uri", &redacted_uri)
            .field("management", &management)
            .finish()
    }
}
impl RabbitMqConfig {
    /// Create a config that dials `uri`, with no management API attached.
    pub fn new(uri: impl Into<String>) -> Self {
        let uri: String = uri.into();
        Self {
            management: None,
            uri,
        }
    }

    /// The AMQP connection URI this config was built with.
    pub fn uri(&self) -> &str {
        self.uri.as_str()
    }

    /// Builder-style setter that attaches management-API credentials.
    ///
    /// `Broker<RabbitMq>::queue_stats_provider().snapshot(...)` needs this to
    /// return real data, and `RabbitMqAutoscalerBackend` needs it to poll
    /// queue depth. Without it, those code paths surface a `Topology` error
    /// that tells the caller to set this.
    pub fn with_management(self, management: super::management::ManagementConfig) -> Self {
        Self {
            management: Some(management),
            ..self
        }
    }

    /// The attached management-API credentials, borrowed, or `None` if none
    /// were configured.
    pub fn management(&self) -> Option<&super::management::ManagementConfig> {
        Option::as_ref(&self.management)
    }
}
impl Default for RabbitMqConfig {
/// Default RabbitMQ endpoint for local development.
fn default() -> Self {
Self::new("amqp://guest:guest@localhost:5672")
}
}
/// Returns true for lapin errors that indicate the underlying AMQP connection
/// is permanently dead and the only recovery is to dial a new one. Matches
/// hard AMQP errors (codes 3xx/5xx that close the connection, such as
/// `CONNECTION-FORCED`, `FRAME-ERROR`, `COMMAND-INVALID`) as well as TCP/IO
/// failures and heartbeat timeouts. Channel-level soft errors (closed channel,
/// channels limit) are excluded — those don't mean the connection itself is
/// gone.
fn is_connection_dead(e: &lapin::Error) -> bool {
e.is_amqp_hard_error()
|| matches!(
e.kind(),
lapin::ErrorKind::InvalidConnectionState(_)
| lapin::ErrorKind::IOError(_)
| lapin::ErrorKind::MissingHeartbeatError
)
}
/// Cheaply clonable handle to a RabbitMQ connection with reconnect and
/// graceful-shutdown support.
///
/// All clones share one `Arc<ClientInner>`. The AMQP connection inside it is
/// held in an `ArcSwap<Connection>`, so after a broker disconnect a fresh
/// connection can be swapped in without invalidating existing
/// `RabbitMqClient` clones, publishers, or consumers.
#[derive(Clone)]
pub struct RabbitMqClient {
    inner: Arc<ClientInner>,
}
/// State shared by every clone of a `RabbitMqClient`.
struct ClientInner {
    /// Current connection. Read via `load_full()` to snapshot, replaced via
    /// `store()` after a successful reconnect.
    connection: arc_swap::ArcSwap<Connection>,
    /// Stored so `reconnect()` can dial a fresh `Connection` with the same URI.
    config: RabbitMqConfig,
    /// Single-flight guard so concurrent failures don't dial-storm the broker.
    /// Held across the async dial.
    reconnect_lock: tokio::sync::Mutex<()>,
    /// Cancelled to request shutdown. Clones share it, and so do siblings
    /// created with `dial_sibling`, which receive a clone of this token.
    shutdown_token: CancellationToken,
}
impl RabbitMqClient {
    /// Upper bound on a single dial (TCP connect plus AMQP handshake).
    ///
    /// Bounds the wait when a broker accepts the TCP connection but never
    /// completes the AMQP handshake (e.g. during `stop_app`).
    const DIAL_TIMEOUT: Duration = Duration::from_secs(5);

    /// Establish a connection to RabbitMQ using the provided configuration.
    ///
    /// The connection is named `shove-rs-{pid}` and a fresh [`CancellationToken`]
    /// is created to coordinate shutdown across clones of this client.
    ///
    /// # Errors
    /// Returns [`ShoveError::Connection`] if the dial fails or times out.
    pub async fn connect(config: &RabbitMqConfig) -> Result<Self> {
        let connection = Self::dial(config).await?;
        Ok(Self {
            inner: Arc::new(ClientInner {
                connection: arc_swap::ArcSwap::from_pointee(connection),
                config: config.clone(),
                reconnect_lock: tokio::sync::Mutex::new(()),
                shutdown_token: CancellationToken::new(),
            }),
        })
    }

    /// Like [`connect`](Self::connect), but retries up to `max_attempts` times
    /// with exponential backoff on connection failure.
    ///
    /// At least one attempt is always made, so `max_attempts == 0` behaves
    /// like `1` instead of panicking.
    ///
    /// Useful for services that start alongside their broker (e.g. in Docker
    /// Compose or Kubernetes) where the broker may not be ready immediately.
    ///
    /// # Errors
    /// Returns the error from the final failed attempt.
    pub async fn connect_with_retry(config: &RabbitMqConfig, max_attempts: u32) -> Result<Self> {
        let mut backoff = Backoff::default();
        // 1-based attempt counter. It only increments while
        // `attempt < max_attempts`, so it cannot overflow.
        let mut attempt: u32 = 1;
        loop {
            match Self::connect(config).await {
                Ok(client) => return Ok(client),
                // Out of attempts: surface the most recent error as-is.
                Err(e) if attempt >= max_attempts => return Err(e),
                Err(e) => {
                    let delay = backoff.next().expect("backoff is infinite");
                    tracing::warn!(
                        attempt,
                        max_attempts,
                        error = %e,
                        "RabbitMQ connection failed, retrying in {delay:?}"
                    );
                    tokio::time::sleep(delay).await;
                    attempt += 1;
                }
            }
        }
    }

    /// Dial an independent sibling `RabbitMqClient` that reuses this client's
    /// stored [`RabbitMqConfig`] but owns its own `Connection`, its own
    /// `reconnect_lock`, and shares this client's `shutdown_token` so the
    /// parent's shutdown propagates.
    ///
    /// Used by `RabbitMqConsumerGroup` to fan a single user-supplied client
    /// out into a small pool when `max_consumers > 50` — lapin serialises
    /// all channels of one connection through one reader task, so multiple
    /// connections are needed to scale past ~50 high-throughput channels.
    ///
    /// Returns [`ShoveError::Connection`] if the new dial fails.
    pub async fn dial_sibling(&self) -> Result<Self> {
        let connection = Self::dial(&self.inner.config).await?;
        Ok(Self {
            inner: Arc::new(ClientInner {
                connection: arc_swap::ArcSwap::from_pointee(connection),
                config: self.inner.config.clone(),
                reconnect_lock: tokio::sync::Mutex::new(()),
                shutdown_token: self.inner.shutdown_token.clone(),
            }),
        })
    }

    /// Dial a fresh `Connection` using the stored config. Used by `connect`
    /// and by `reconnect` after a broker disconnect.
    ///
    /// Times out after [`Self::DIAL_TIMEOUT`] so that a broker that accepts
    /// the TCP connection but never completes the AMQP handshake does not
    /// block the caller indefinitely.
    async fn dial(config: &RabbitMqConfig) -> Result<Connection> {
        let pid = std::process::id();
        let connection_name = format!("shove-rs-{pid}");
        let properties =
            ConnectionProperties::default().with_connection_name(connection_name.into());
        timeout(
            Self::DIAL_TIMEOUT,
            Connection::connect(&config.uri, properties),
        )
        .await
        .map_err(|_| ShoveError::Connection("timed out connecting to RabbitMQ".into()))?
        .map_err(|e| map_lapin_error("failed to connect to RabbitMQ", e))
    }

    /// Snapshot the current connection. The returned `Arc<Connection>` may
    /// be replaced under us at any point; this is fine — operations on the
    /// snapshot will simply fail with a connection-class error and the
    /// caller can trigger a retry via `with_reconnect`.
    fn snapshot(&self) -> Arc<Connection> {
        self.inner.connection.load_full()
    }

    /// Run `op` against a snapshot of the current connection. If it fails with
    /// a connection-class error, dial a fresh `Connection`, swap it in via the
    /// single-flight reconnect path, and retry `op` exactly once against the
    /// new snapshot. All other lapin errors are mapped and returned directly.
    async fn with_reconnect<F, Fut, T>(&self, op_name: &'static str, op: F) -> Result<T>
    where
        F: Fn(Arc<Connection>) -> Fut,
        Fut: Future<Output = std::result::Result<T, lapin::Error>>,
    {
        if self.inner.shutdown_token.is_cancelled() {
            metrics::record_backend_error(
                metrics::BackendLabel::RabbitMq,
                metrics::BackendErrorKind::Connection,
            );
            return Err(ShoveError::Connection(format!(
                "cannot {op_name}: client is shutting down"
            )));
        }
        let observed = self.snapshot();
        match op(observed.clone()).await {
            Ok(v) => Ok(v),
            Err(e) if is_connection_dead(&e) => {
                tracing::warn!(error = %e, op = op_name, "RabbitMQ connection appears dead, reconnecting");
                self.reconnect(&observed).await?;
                let fresh = self.snapshot();
                op(fresh)
                    .await
                    .map_err(|e| map_lapin_error(&format!("{op_name} failed after reconnect"), e))
            }
            Err(e) => Err(map_lapin_error(&format!("{op_name} failed"), e)),
        }
    }

    /// Dial a fresh `Connection` and swap it into `inner.connection`, but only
    /// if the current connection is still the one the caller observed failing.
    /// Single-flight via `reconnect_lock` so concurrent failures don't dial-storm.
    ///
    /// # Errors
    /// Returns [`ShoveError::Connection`] if shutdown has been requested
    /// (checked both before and after acquiring the lock) or if the dial fails.
    async fn reconnect(&self, observed: &Arc<Connection>) -> Result<()> {
        if self.inner.shutdown_token.is_cancelled() {
            return Err(ShoveError::Connection(
                "cannot reconnect: client is shutting down".into(),
            ));
        }
        let _guard = self.inner.reconnect_lock.lock().await;
        // Shutdown may have started while we were queued on the lock. Dialling
        // now would install a connection that `shutdown` has already closed
        // past, leaking it.
        if self.inner.shutdown_token.is_cancelled() {
            return Err(ShoveError::Connection(
                "cannot reconnect: client is shutting down".into(),
            ));
        }
        // Re-check after taking the lock — another caller may already have
        // swapped in a fresh connection while we were waiting.
        let current = self.inner.connection.load_full();
        if !Arc::ptr_eq(&current, observed) {
            return Ok(());
        }
        let new_conn = Self::dial(&self.inner.config).await?;
        self.inner.connection.store(Arc::new(new_conn));
        tracing::info!("RabbitMQ connection re-established");
        Ok(())
    }

    /// Open a basic channel on the underlying connection.
    ///
    /// Returns [`ShoveError::Connection`] if shutdown has already been requested
    /// or if the channel cannot be created.
    pub async fn create_channel(&self) -> Result<Channel> {
        self.with_reconnect("create channel", |conn| async move {
            conn.create_channel().await
        })
        .await
    }

    /// Open a channel with publisher confirms enabled.
    ///
    /// Returns [`ShoveError::Connection`] if shutdown has already been requested,
    /// if the channel cannot be created, or if confirms cannot be enabled.
    pub async fn create_confirm_channel(&self) -> Result<Channel> {
        self.with_reconnect("create confirm channel", |conn| async move {
            let channel = conn.create_channel().await?;
            channel
                .confirm_select(ConfirmSelectOptions::default())
                .await?;
            Ok(channel)
        })
        .await
    }

    /// Open a channel with AMQP transaction mode enabled (`tx_select`).
    ///
    /// Used by consumers with [`ConsumerOptions::with_exactly_once`] to make
    /// publish-to-hold-queue and ack/nack of the original delivery atomic.
    /// Transaction mode is mutually exclusive with publisher confirms — do not
    /// mix with [`create_confirm_channel`](Self::create_confirm_channel) on the
    /// same connection.
    ///
    /// Returns [`ShoveError::Connection`] if shutdown has already been requested,
    /// if the channel cannot be created, or if `tx_select` cannot be enabled.
    #[cfg(feature = "rabbitmq-transactional")]
    pub async fn create_tx_channel(&self) -> Result<Channel> {
        self.with_reconnect("create tx channel", |conn| async move {
            let channel = conn.create_channel().await?;
            channel.tx_select().await?;
            Ok(channel)
        })
        .await
    }

    /// Return a clone of the shutdown [`CancellationToken`].
    ///
    /// Callers can use this token to coordinate their own teardown with the
    /// client's shutdown sequence.
    pub fn shutdown_token(&self) -> CancellationToken {
        self.inner.shutdown_token.clone()
    }

    /// Snapshot the management-API credentials this client was constructed
    /// with, if any. Returned by value so the caller can move it into a
    /// `ManagementClient`.
    pub fn management_config(&self) -> Option<super::management::ManagementConfig> {
        self.inner.config.management.clone()
    }

    /// Return `true` if the underlying AMQP connection is still open.
    pub fn is_connected(&self) -> bool {
        self.snapshot().status().connected()
    }

    /// Liveness check. Opens a transient channel and closes it immediately.
    ///
    /// Both operations require a real AMQP round-trip, proving the connection
    /// is alive. `create_channel` already checks the shutdown token and
    /// triggers a single reconnect on dead-connection errors — that behavior
    /// is desirable for a liveness probe (broker reachable, our cached
    /// connection went stale).
    pub async fn ping(&self, timeout: Duration) -> Result<()> {
        let fut = async {
            let chan = self.create_channel().await?;
            chan.close(200, "ping".into()).await.map_err(|e| {
                ShoveError::Connection(format!("rabbitmq ping channel close failed: {e}"))
            })?;
            Ok::<(), ShoveError>(())
        };
        tokio::time::timeout(timeout, fut).await.map_err(|_| {
            ShoveError::Connection(format!("rabbitmq ping timed out after {timeout:?}"))
        })?
    }

    /// Initiate a graceful shutdown.
    ///
    /// Cancels the shutdown token so that dependent tasks can begin winding
    /// down, waits for [`SHUTDOWN_GRACE`] to allow in-flight operations to
    /// complete, and then closes the underlying AMQP connection.
    ///
    /// The close happens while holding `reconnect_lock`, so an in-flight
    /// reconnect finishes first and its connection is the one closed. Any
    /// reconnect that acquires the lock afterwards sees the cancelled token
    /// and refuses to dial.
    pub async fn shutdown(&self) {
        self.inner.shutdown_token.cancel();
        sleep(SHUTDOWN_GRACE).await;
        let _guard = self.inner.reconnect_lock.lock().await;
        if let Err(e) = self.snapshot().close(0, "shutdown".into()).await {
            tracing::warn!("error while closing RabbitMQ connection: {e}");
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for config redaction/accessors and for the
    //! `is_connection_dead` classifier. None of these need a live broker.
    use super::*;
    use lapin::ErrorKind;
    use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError, AMQPSoftError};

    #[test]
    fn config_debug_redacts_password_only() {
        let config = RabbitMqConfig::new("amqp://admin:s3cret!@localhost:5672/%2F");
        let debug_output = format!("{config:?}");
        assert!(!debug_output.contains("s3cret!"));
        assert!(debug_output.contains("admin@localhost"));
    }

    #[test]
    fn config_debug_no_creds_remains_clear() {
        let config = RabbitMqConfig::new("amqp://localhost:5672/%2F");
        let debug_output = format!("{config:?}");
        assert!(debug_output.contains("amqp://localhost:5672/%2F"));
    }

    #[test]
    fn config_debug_unparseable_uri_is_hidden() {
        // A string that is not a URL at all must not be echoed verbatim.
        let config = RabbitMqConfig::new("not a uri with s3cret");
        let debug_output = format!("{config:?}");
        assert!(debug_output.contains("<unparseable>"));
        assert!(!debug_output.contains("s3cret"));
    }

    #[test]
    fn config_new_stores_uri() {
        let config = RabbitMqConfig::new("amqp://host:1234/%2F");
        assert_eq!(config.uri, "amqp://host:1234/%2F");
    }

    #[test]
    fn config_new_has_no_management() {
        let config = RabbitMqConfig::new("amqp://host:1234/%2F");
        assert_eq!(config.uri(), "amqp://host:1234/%2F");
        assert!(config.management().is_none());
        assert!(format!("{config:?}").contains("<none>"));
    }

    #[test]
    fn default_config_is_localhost() {
        let cfg = RabbitMqConfig::default();
        assert!(cfg.uri().contains("localhost:5672"));
    }

    #[test]
    fn invalid_connection_state_is_dead() {
        let err = lapin::Error::from(ErrorKind::InvalidConnectionState(
            lapin::ConnectionState::Closed,
        ));
        assert!(is_connection_dead(&err));
    }

    #[test]
    fn missing_heartbeat_is_dead() {
        let err = lapin::Error::from(ErrorKind::MissingHeartbeatError);
        assert!(is_connection_dead(&err));
    }

    #[test]
    fn invalid_channel_state_is_not_connection_dead() {
        let err = lapin::Error::from(ErrorKind::InvalidChannelState(
            lapin::ChannelState::Closed,
            "test",
        ));
        assert!(!is_connection_dead(&err));
    }

    #[test]
    fn channels_limit_is_not_connection_dead() {
        let err = lapin::Error::from(ErrorKind::ChannelsLimitReached);
        assert!(!is_connection_dead(&err));
    }

    #[test]
    fn io_error_is_dead() {
        let io_err = std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "broken pipe");
        let err = lapin::Error::from(ErrorKind::IOError(Arc::new(io_err)));
        assert!(is_connection_dead(&err));
    }

    #[test]
    fn amqp_hard_error_is_dead() {
        // CONNECTION_FORCED (320) is a hard error — broker initiated the close.
        let amqp_err = AMQPError::new(
            AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
            "broker closed".into(),
        );
        let err = lapin::Error::from(ErrorKind::ProtocolError(amqp_err));
        assert!(is_connection_dead(&err));
    }

    #[test]
    fn amqp_soft_error_is_not_connection_dead() {
        // ACCESS_REFUSED (403) is a soft (channel-class) error — connection lives.
        let amqp_err = AMQPError::new(
            AMQPErrorKind::Soft(AMQPSoftError::ACCESSREFUSED),
            "denied".into(),
        );
        let err = lapin::Error::from(ErrorKind::ProtocolError(amqp_err));
        assert!(!is_connection_dead(&err));
    }
}