1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use lapin::options::ConfirmSelectOptions;
use lapin::{Channel, Connection, ConnectionProperties};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use crate::SHUTDOWN_GRACE;
use crate::error::{Result, ShoveError};
/// RabbitMQ connection configuration.
#[derive(Clone)]
pub struct RabbitMqConfig {
/// AMQP connection string (e.g., "amqp://guest:guest@localhost:5672/%2f")
pub uri: String,
}
impl Debug for RabbitMqConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let redacted_uri = if let Some(at_idx) = self.uri.find('@') {
if let Some(proto_idx) = self.uri.find("://") {
let prefix = &self.uri[..proto_idx + 3];
let creds = &self.uri[proto_idx + 3..at_idx];
let suffix = &self.uri[at_idx..];
if let Some(colon_idx) = creds.find(':') {
let user = &creds[..colon_idx];
format!("{prefix}{user}:<redacted>{suffix}")
} else {
format!("{prefix}<redacted>{suffix}")
}
} else {
"<redacted>".to_string()
}
} else {
self.uri.clone()
};
f.debug_struct("RabbitMqConfig")
.field("uri", &redacted_uri)
.finish()
}
}
impl RabbitMqConfig {
pub fn new(uri: impl Into<String>) -> Self {
Self { uri: uri.into() }
}
}
/// RabbitMQ client with connection management and graceful shutdown.
#[derive(Clone)]
pub struct RabbitMqClient {
connection: Arc<Connection>,
shutdown_token: CancellationToken,
}
impl RabbitMqClient {
/// Establish a connection to RabbitMQ using the provided configuration.
///
/// The connection is named `shove-rs-{pid}` and a fresh [`CancellationToken`]
/// is created to coordinate shutdown across clones of this client.
pub async fn connect(config: &RabbitMqConfig) -> Result<Self> {
let pid = std::process::id();
let connection_name = format!("shove-rs-{pid}");
let properties =
ConnectionProperties::default().with_connection_name(connection_name.into());
let connection = Connection::connect(&config.uri, properties)
.await
.map_err(|e| ShoveError::Connection(e.to_string()))?;
Ok(Self {
connection: Arc::new(connection),
shutdown_token: CancellationToken::new(),
})
}
/// Like [`connect`](Self::connect), but retries up to `max_attempts` times
/// with exponential backoff on connection failure.
///
/// Useful for services that start alongside their broker (e.g. in Docker
/// Compose or Kubernetes) where the broker may not be ready immediately.
pub async fn connect_with_retry(config: &RabbitMqConfig, max_attempts: u32) -> Result<Self> {
let mut backoff = crate::retry::Backoff::default();
let mut last_err = None;
for attempt in 0..max_attempts {
match Self::connect(config).await {
Ok(client) => return Ok(client),
Err(e) => {
if attempt + 1 < max_attempts {
let delay = backoff.next().expect("backoff is infinite");
tracing::warn!(
attempt = attempt + 1,
max_attempts,
error = %e,
"RabbitMQ connection failed, retrying in {delay:?}"
);
tokio::time::sleep(delay).await;
}
last_err = Some(e);
}
}
}
Err(last_err.expect("loop ran at least once"))
}
/// Open a basic channel on the underlying connection.
///
/// Returns [`ShoveError::Connection`] if shutdown has already been requested
/// or if the channel cannot be created.
pub async fn create_channel(&self) -> Result<Channel> {
if self.shutdown_token.is_cancelled() {
return Err(ShoveError::Connection(
"cannot create channel: client is shutting down".into(),
));
}
self.connection
.create_channel()
.await
.map_err(|e| ShoveError::Connection(e.to_string()))
}
/// Open a channel with publisher confirms enabled.
///
/// Returns [`ShoveError::Connection`] if shutdown has already been requested,
/// if the channel cannot be created, or if confirms cannot be enabled.
pub async fn create_confirm_channel(&self) -> Result<Channel> {
if self.shutdown_token.is_cancelled() {
return Err(ShoveError::Connection(
"cannot create confirm channel: client is shutting down".into(),
));
}
let channel = self
.connection
.create_channel()
.await
.map_err(|e| ShoveError::Connection(e.to_string()))?;
channel
.confirm_select(ConfirmSelectOptions::default())
.await
.map_err(|e| ShoveError::Connection(e.to_string()))?;
Ok(channel)
}
/// Open a channel with AMQP transaction mode enabled (`tx_select`).
///
/// Used by consumers with [`ConsumerOptions::with_exactly_once`] to make
/// publish-to-hold-queue and ack/nack of the original delivery atomic.
/// Transaction mode is mutually exclusive with publisher confirms — do not
/// mix with [`create_confirm_channel`](Self::create_confirm_channel) on the
/// same connection.
///
/// Returns [`ShoveError::Connection`] if shutdown has already been requested,
/// if the channel cannot be created, or if `tx_select` cannot be enabled.
#[cfg(feature = "rabbitmq-transactional")]
pub async fn create_tx_channel(&self) -> Result<Channel> {
if self.shutdown_token.is_cancelled() {
return Err(ShoveError::Connection(
"cannot create tx channel: client is shutting down".into(),
));
}
let channel = self
.connection
.create_channel()
.await
.map_err(|e| ShoveError::Connection(e.to_string()))?;
channel
.tx_select()
.await
.map_err(|e| ShoveError::Connection(e.to_string()))?;
Ok(channel)
}
/// Return a clone of the shutdown [`CancellationToken`].
///
/// Callers can use this token to coordinate their own teardown with the
/// client's shutdown sequence.
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown_token.clone()
}
/// Return `true` if the underlying AMQP connection is still open.
pub fn is_connected(&self) -> bool {
self.connection.status().connected()
}
/// Initiate a graceful shutdown.
///
/// Cancels the shutdown token so that dependent tasks can begin winding
/// down, waits for [`SHUTDOWN_GRACE`] to allow in-flight operations to
/// complete, and then closes the underlying AMQP connection.
pub async fn shutdown(&self) {
self.shutdown_token.cancel();
sleep(SHUTDOWN_GRACE).await;
if let Err(e) = self.connection.close(0, "shutdown".into()).await {
tracing::warn!("error while closing RabbitMQ connection: {e}");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_debug_redacts_password_only() {
let config = RabbitMqConfig::new("amqp://admin:s3cret!@localhost:5672/%2F");
let debug_output = format!("{config:?}");
assert!(!debug_output.contains("s3cret!"));
assert!(debug_output.contains("amqp://admin:<redacted>@localhost:5672/%2F"));
}
#[test]
fn config_debug_no_creds_remains_clear() {
let config = RabbitMqConfig::new("amqp://localhost:5672/%2F");
let debug_output = format!("{config:?}");
assert!(debug_output.contains("amqp://localhost:5672/%2F"));
}
#[test]
fn config_new_stores_uri() {
let config = RabbitMqConfig::new("amqp://host:1234/%2F");
assert_eq!(config.uri, "amqp://host:1234/%2F");
}
}