rust_rabbit/
connection.rs1use crate::{
2 circuit_breaker::{CircuitBreaker, CircuitBreakerConfig},
3 config::RabbitConfig,
4 error::{RabbitError, Result},
5};
6use lapin::{Channel, Connection as LapinConnection, ConnectionProperties};
7use std::sync::Arc;
8use tokio::sync::{Mutex, RwLock};
9use tokio::time::{sleep, timeout, Duration, Instant};
10use tracing::{debug, error, info, warn};
11
12#[derive(Debug)]
14pub struct Connection {
15 inner: LapinConnection,
16 created_at: Instant,
17 last_used: Arc<RwLock<Instant>>,
18}
19
20impl Connection {
21 pub fn new(connection: LapinConnection) -> Self {
22 let now = Instant::now();
23 Self {
24 inner: connection,
25 created_at: now,
26 last_used: Arc::new(RwLock::new(now)),
27 }
28 }
29
30 pub fn inner(&self) -> &LapinConnection {
31 &self.inner
32 }
33
34 pub async fn create_channel(&self) -> Result<Channel> {
35 let mut last_used = self.last_used.write().await;
36 *last_used = Instant::now();
37 Ok(self.inner.create_channel().await?)
38 }
39
40 pub fn is_connected(&self) -> bool {
41 self.inner.status().connected()
42 }
43
44 pub async fn last_used(&self) -> Instant {
45 *self.last_used.read().await
46 }
47
48 pub fn created_at(&self) -> Instant {
49 self.created_at
50 }
51}
52
53#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
55pub struct ConnectionStats {
56 pub total_connections: usize,
57 pub healthy_connections: usize,
58 pub unhealthy_connections: usize,
59}
60
61#[derive(Debug, Clone)]
63pub struct ConnectionManager {
64 pub config: RabbitConfig,
65 connections: Arc<RwLock<Vec<Arc<Connection>>>>,
66 #[allow(dead_code)] connection_counter: Arc<Mutex<usize>>,
68 #[allow(dead_code)]
69 circuit_breaker: Arc<CircuitBreaker>,
70}
71
72impl ConnectionManager {
73 pub async fn new(config: RabbitConfig) -> Result<Self> {
75 let circuit_breaker_config = CircuitBreakerConfig {
76 failure_threshold: 5,
77 failure_window: Duration::from_secs(60),
78 recovery_timeout: Duration::from_secs(30),
79 success_threshold: 3,
80 half_open_max_requests: 5,
81 };
82
83 let manager = Self {
84 config,
85 connections: Arc::new(RwLock::new(Vec::new())),
86 connection_counter: Arc::new(Mutex::new(0)),
87 circuit_breaker: Arc::new(CircuitBreaker::with_config(circuit_breaker_config)),
88 };
89
90 manager.ensure_min_connections().await?;
92
93 if manager.config.health_check.enabled {
95 manager.start_health_monitoring().await;
96 }
97
98 Ok(manager)
99 }
100
101 pub async fn get_connection(&self) -> Result<Arc<Connection>> {
103 let connections = self.connections.read().await;
104
105 for conn in connections.iter() {
107 if conn.is_connected() {
108 return Ok(conn.clone());
109 }
110 }
111
112 drop(connections);
114
115 self.create_new_connection().await
116 }
117
118 async fn create_new_connection(&self) -> Result<Arc<Connection>> {
120 let mut retry_count = 0;
121 let mut delay = self.config.retry_config.initial_delay;
122
123 loop {
124 match self.establish_connection().await {
125 Ok(connection) => {
126 let conn = Arc::new(Connection::new(connection));
127
128 let mut connections = self.connections.write().await;
130 if connections.len() < self.config.pool_config.max_connections {
131 connections.push(conn.clone());
132 }
133
134 info!("Successfully established new RabbitMQ connection");
135 return Ok(conn);
136 }
137 Err(e) => {
138 retry_count += 1;
139 if retry_count > self.config.retry_config.max_retries {
140 error!(
141 "Failed to establish connection after {} retries: {}",
142 retry_count, e
143 );
144 return Err(RabbitError::RetryExhausted(format!(
145 "Connection failed after {} retries",
146 retry_count
147 )));
148 }
149
150 warn!(
151 "Connection attempt {} failed: {}. Retrying in {:?}",
152 retry_count, e, delay
153 );
154
155 sleep(delay).await;
156 delay = self.calculate_next_delay(delay);
157 }
158 }
159 }
160 }
161
162 async fn establish_connection(&self) -> Result<LapinConnection> {
164 let connection_future = LapinConnection::connect(
165 &self.config.connection_string,
166 ConnectionProperties::default(),
167 );
168
169 if let Some(timeout_duration) = self.config.connection_timeout {
170 timeout(timeout_duration, connection_future)
171 .await
172 .map_err(|_| RabbitError::Timeout("Connection timeout".to_string()))?
173 .map_err(RabbitError::Connection)
174 } else {
175 connection_future.await.map_err(RabbitError::Connection)
176 }
177 }
178
179 fn calculate_next_delay(&self, current_delay: Duration) -> Duration {
181 let base_delay = Duration::from_millis(
182 (current_delay.as_millis() as f64 * self.config.retry_config.backoff_multiplier) as u64,
183 );
184
185 let max_delay = self.config.retry_config.max_delay;
186 let delay = if base_delay > max_delay {
187 max_delay
188 } else {
189 base_delay
190 };
191
192 if self.config.retry_config.jitter > 0.0 {
194 let jitter_amount = (delay.as_millis() as f64 * self.config.retry_config.jitter) as u64;
195 let jitter = fastrand::u64(0..=jitter_amount);
196 Duration::from_millis(delay.as_millis() as u64 + jitter)
197 } else {
198 delay
199 }
200 }
201
202 async fn ensure_min_connections(&self) -> Result<()> {
204 let connections = self.connections.read().await;
205 let healthy_count = connections.iter().filter(|c| c.is_connected()).count();
206
207 if healthy_count >= self.config.pool_config.min_connections {
208 return Ok(());
209 }
210
211 drop(connections);
212
213 let needed = self.config.pool_config.min_connections - healthy_count;
214 debug!(
215 "Creating {} connections to meet minimum requirement",
216 needed
217 );
218
219 for _ in 0..needed {
220 if let Err(e) = self.create_new_connection().await {
221 warn!("Failed to create minimum connection: {}", e);
222 }
223 }
224
225 Ok(())
226 }
227
228 async fn start_health_monitoring(&self) {
230 let manager = self.clone();
231 tokio::spawn(async move {
232 let mut interval = tokio::time::interval(manager.config.health_check.check_interval);
233
234 loop {
235 interval.tick().await;
236 manager.perform_health_check().await;
237 }
238 });
239 }
240
241 async fn perform_health_check(&self) {
243 let mut connections = self.connections.write().await;
244 let mut unhealthy_indices = Vec::new();
245
246 for (i, conn) in connections.iter().enumerate() {
247 if !conn.is_connected() {
248 debug!("Connection {} is unhealthy, marking for removal", i);
249 unhealthy_indices.push(i);
250 }
251 }
252
253 for &i in unhealthy_indices.iter().rev() {
255 connections.remove(i);
256 }
257
258 if !unhealthy_indices.is_empty() {
259 info!("Removed {} unhealthy connections", unhealthy_indices.len());
260 }
261
262 drop(connections);
263
264 if let Err(e) = self.ensure_min_connections().await {
266 warn!(
267 "Failed to ensure minimum connections during health check: {}",
268 e
269 );
270 }
271 }
272
273 pub async fn get_stats(&self) -> ConnectionStats {
275 let connections = self.connections.read().await;
276 let total = connections.len();
277 let healthy = connections.iter().filter(|c| c.is_connected()).count();
278
279 ConnectionStats {
280 total_connections: total,
281 healthy_connections: healthy,
282 unhealthy_connections: total - healthy,
283 }
284 }
285
286 pub async fn close(&self) -> Result<()> {
288 let mut connections = self.connections.write().await;
289
290 for conn in connections.drain(..) {
291 if let Err(e) = conn.inner().close(0, "Shutdown").await {
292 warn!("Error closing connection: {}", e);
293 }
294 }
295
296 info!("All connections closed");
297 Ok(())
298 }
299}