rust_rabbit/
connection.rs

use crate::{
    circuit_breaker::{CircuitBreaker, CircuitBreakerConfig},
    config::RabbitConfig,
    error::{RabbitError, Result},
};
use lapin::{Channel, Connection as LapinConnection, ConnectionProperties};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{sleep, timeout, Duration, Instant};
use tracing::{debug, error, info, warn};

/// Connection wrapper with metadata
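///
/// Tracks creation and last-use timestamps alongside the underlying lapin
/// connection. A minimal usage sketch (hypothetical: connections are normally
/// obtained via `ConnectionManager::get_connection`, and `conn` is assumed to
/// be such an `Arc<Connection>`):
///
/// ```rust,ignore
/// let channel = conn.create_channel().await?; // also refreshes `last_used`
/// assert!(conn.is_connected());
/// println!("alive for {:?}", conn.created_at().elapsed());
/// ```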
#[derive(Debug)]
pub struct Connection {
    inner: LapinConnection,
    created_at: Instant,
    last_used: Arc<RwLock<Instant>>,
}

impl Connection {
    pub fn new(connection: LapinConnection) -> Self {
        let now = Instant::now();
        Self {
            inner: connection,
            created_at: now,
            last_used: Arc::new(RwLock::new(now)),
        }
    }

    pub fn inner(&self) -> &LapinConnection {
        &self.inner
    }
    pub async fn create_channel(&self) -> Result<Channel> {
        // Refresh the last-used timestamp; the temporary write guard is
        // dropped before the channel-creation await, so readers of
        // `last_used` are never blocked on network I/O.
        *self.last_used.write().await = Instant::now();
        Ok(self.inner.create_channel().await?)
    }

    pub fn is_connected(&self) -> bool {
        self.inner.status().connected()
    }

    pub async fn last_used(&self) -> Instant {
        *self.last_used.read().await
    }

    pub fn created_at(&self) -> Instant {
        self.created_at
    }
}

/// Connection statistics
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
pub struct ConnectionStats {
    pub total_connections: usize,
    pub healthy_connections: usize,
    pub unhealthy_connections: usize,
}

/// Connection manager with pooling and health monitoring
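///
/// A minimal end-to-end sketch (hypothetical: assumes `RabbitConfig`
/// implements `Default` and a broker is reachable at the configured URL; the
/// import paths are assumptions, not guaranteed re-exports):
///
/// ```rust,ignore
/// use rust_rabbit::config::RabbitConfig;
/// use rust_rabbit::connection::ConnectionManager;
///
/// let manager = ConnectionManager::new(RabbitConfig::default()).await?;
/// let conn = manager.get_connection().await?;
/// let channel = conn.create_channel().await?;
/// // ... declare queues / publish on `channel` ...
/// manager.close().await?;
/// ```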
#[derive(Debug, Clone)]
pub struct ConnectionManager {
    pub config: RabbitConfig,
    connections: Arc<RwLock<Vec<Arc<Connection>>>>,
    #[allow(dead_code)] // Will be used for connection tracking in future versions
    connection_counter: Arc<Mutex<usize>>,
    #[allow(dead_code)]
    circuit_breaker: Arc<CircuitBreaker>,
}

impl ConnectionManager {
    /// Create a new connection manager
    pub async fn new(config: RabbitConfig) -> Result<Self> {
        let circuit_breaker_config = CircuitBreakerConfig {
            failure_threshold: 5,
            failure_window: Duration::from_secs(60),
            recovery_timeout: Duration::from_secs(30),
            success_threshold: 3,
            half_open_max_requests: 5,
        };

        let manager = Self {
            config,
            connections: Arc::new(RwLock::new(Vec::new())),
            connection_counter: Arc::new(Mutex::new(0)),
            circuit_breaker: Arc::new(CircuitBreaker::with_config(circuit_breaker_config)),
        };

        // Initialize minimum connections
        manager.ensure_min_connections().await?;

        // Start background health monitoring
        if manager.config.health_check.enabled {
            manager.start_health_monitoring().await;
        }

        Ok(manager)
    }

    /// Get a connection from the pool
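    ///
    /// Returns the first healthy pooled connection, or establishes a new one
    /// (with retry and backoff) when none is healthy. Sketch, assuming an
    /// initialized `manager`:
    ///
    /// ```rust,ignore
    /// let conn = manager.get_connection().await?;
    /// let channel = conn.create_channel().await?;
    /// ```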
    pub async fn get_connection(&self) -> Result<Arc<Connection>> {
        let connections = self.connections.read().await;

        // Find a healthy connection
        for conn in connections.iter() {
            if conn.is_connected() {
                return Ok(conn.clone());
            }
        }

        // No healthy connection found: drop the read lock and create a new
        // one. Concurrent callers may race here, but the pool-capacity check
        // in `create_new_connection` bounds how many connections are retained.
        drop(connections);

        self.create_new_connection().await
    }

    /// Create a new connection with retry and exponential backoff
    async fn create_new_connection(&self) -> Result<Arc<Connection>> {
        let mut retry_count = 0;
        let mut delay = self.config.retry_config.initial_delay;

        loop {
            match self.establish_connection().await {
                Ok(connection) => {
                    let conn = Arc::new(Connection::new(connection));

                    // Add to pool if not at max capacity; otherwise the
                    // connection is still returned to the caller as a
                    // short-lived overflow connection.
                    let mut connections = self.connections.write().await;
                    if connections.len() < self.config.pool_config.max_connections {
                        connections.push(conn.clone());
                    }

                    info!("Successfully established new RabbitMQ connection");
                    return Ok(conn);
                }
                Err(e) => {
                    retry_count += 1;
                    if retry_count > self.config.retry_config.max_retries {
                        error!(
                            "Failed to establish connection after {} retries: {}",
                            self.config.retry_config.max_retries, e
                        );
                        return Err(RabbitError::RetryExhausted(format!(
                            "Connection failed after {} retries",
                            self.config.retry_config.max_retries
                        )));
                    }

                    warn!(
                        "Connection attempt {} failed: {}. Retrying in {:?}",
                        retry_count, e, delay
                    );

                    sleep(delay).await;
                    delay = self.calculate_next_delay(delay);
                }
            }
        }
    }

    /// Establish a raw connection to RabbitMQ
    async fn establish_connection(&self) -> Result<LapinConnection> {
        let connection_future = LapinConnection::connect(
            &self.config.connection_string,
            ConnectionProperties::default(),
        );

        if let Some(timeout_duration) = self.config.connection_timeout {
            timeout(timeout_duration, connection_future)
                .await
                .map_err(|_| RabbitError::Timeout("Connection timeout".to_string()))?
                .map_err(RabbitError::Connection)
        } else {
            connection_future.await.map_err(RabbitError::Connection)
        }
    }

    /// Calculate the next retry delay with exponential backoff and jitter
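    ///
    /// Worked example (illustrative values): with `backoff_multiplier = 2.0`,
    /// `max_delay = 30s`, and `jitter = 0.1`, a current delay of 100ms becomes
    /// `min(100ms * 2.0, 30s) = 200ms`, plus a random 0..=20ms of jitter,
    /// i.e. a next delay in the 200-220ms range.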
    fn calculate_next_delay(&self, current_delay: Duration) -> Duration {
        let base_delay = Duration::from_millis(
            (current_delay.as_millis() as f64 * self.config.retry_config.backoff_multiplier) as u64,
        );

        // Cap the exponential growth at the configured maximum
        let delay = base_delay.min(self.config.retry_config.max_delay);

        // Add jitter to spread out reconnection attempts
        if self.config.retry_config.jitter > 0.0 {
            let jitter_amount = (delay.as_millis() as f64 * self.config.retry_config.jitter) as u64;
            let jitter = fastrand::u64(0..=jitter_amount);
            Duration::from_millis(delay.as_millis() as u64 + jitter)
        } else {
            delay
        }
    }

    /// Ensure the minimum number of connections is available
    async fn ensure_min_connections(&self) -> Result<()> {
        let connections = self.connections.read().await;
        let healthy_count = connections.iter().filter(|c| c.is_connected()).count();

        if healthy_count >= self.config.pool_config.min_connections {
            return Ok(());
        }

        drop(connections);

        let needed = self.config.pool_config.min_connections - healthy_count;
        debug!(
            "Creating {} connections to meet minimum requirement",
            needed
        );

        for _ in 0..needed {
            if let Err(e) = self.create_new_connection().await {
                warn!("Failed to create minimum connection: {}", e);
            }
        }

        Ok(())
    }

    /// Start background health monitoring
    async fn start_health_monitoring(&self) {
        let manager = self.clone();
        // The monitoring task runs for the lifetime of the process; it owns
        // its own clone of the manager (all shared state is behind `Arc`s).
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(manager.config.health_check.check_interval);

            loop {
                interval.tick().await;
                manager.perform_health_check().await;
            }
        });
    }

    /// Perform a health check on all connections
    async fn perform_health_check(&self) {
        let mut connections = self.connections.write().await;
        let mut unhealthy_indices = Vec::new();

        for (i, conn) in connections.iter().enumerate() {
            if !conn.is_connected() {
                debug!("Connection {} is unhealthy, marking for removal", i);
                unhealthy_indices.push(i);
            }
        }

        // Remove unhealthy connections (in reverse order so earlier indices
        // remain valid)
        for &i in unhealthy_indices.iter().rev() {
            connections.remove(i);
        }

        if !unhealthy_indices.is_empty() {
            info!("Removed {} unhealthy connections", unhealthy_indices.len());
        }

        drop(connections);

        // Ensure we still meet the minimum connection count
        if let Err(e) = self.ensure_min_connections().await {
            warn!(
                "Failed to ensure minimum connections during health check: {}",
                e
            );
        }
    }

    /// Get connection statistics
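    ///
    /// Sketch of exposing the counts, e.g. from a health endpoint (assumes an
    /// initialized `manager`):
    ///
    /// ```rust,ignore
    /// let stats = manager.get_stats().await;
    /// tracing::info!(
    ///     total = stats.total_connections,
    ///     healthy = stats.healthy_connections,
    ///     "pool status"
    /// );
    /// ```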
    pub async fn get_stats(&self) -> ConnectionStats {
        let connections = self.connections.read().await;
        let total = connections.len();
        let healthy = connections.iter().filter(|c| c.is_connected()).count();

        ConnectionStats {
            total_connections: total,
            healthy_connections: healthy,
            unhealthy_connections: total - healthy,
        }
    }

    /// Close all connections
    pub async fn close(&self) -> Result<()> {
        let mut connections = self.connections.write().await;

        for conn in connections.drain(..) {
            if let Err(e) = conn.inner().close(0, "Shutdown").await {
                warn!("Error closing connection: {}", e);
            }
        }

        info!("All connections closed");
        Ok(())
    }
}