elif_orm/connection/
pool.rs

//! Connection Pool Management
//!
//! This module provides managed connection pools with statistics tracking,
//! health monitoring, and comprehensive error handling.
5
6use super::health::PoolHealthReport;
7use super::statistics::ExtendedPoolStats;
8use crate::backends::{DatabasePool as DatabasePoolTrait, DatabasePoolConfig, DatabasePoolStats};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13/// Database connection pool error types
14#[derive(Debug, thiserror::Error)]
15pub enum PoolError {
16    #[error("Connection acquisition failed: {0}")]
17    AcquisitionFailed(String),
18
19    #[error("Pool is closed")]
20    PoolClosed,
21
22    #[error("Connection timeout after {timeout}s")]
23    ConnectionTimeout { timeout: u64 },
24
25    #[error("Pool exhausted: all {max_connections} connections in use")]
26    PoolExhausted { max_connections: u32 },
27
28    #[error("Health check failed: {reason}")]
29    HealthCheckFailed { reason: String },
30
31    #[error("Configuration error: {message}")]
32    ConfigurationError { message: String },
33}
34
35/// ORM-specific error for database operations
36impl From<PoolError> for crate::error::ModelError {
37    fn from(err: PoolError) -> Self {
38        match err {
39            PoolError::AcquisitionFailed(err_msg) => crate::error::ModelError::Connection(format!(
40                "Database connection failed: {}",
41                err_msg
42            )),
43            PoolError::PoolClosed => {
44                crate::error::ModelError::Connection("Database pool is closed".to_string())
45            }
46            PoolError::ConnectionTimeout { timeout } => crate::error::ModelError::Connection(
47                format!("Database connection timeout after {}s", timeout),
48            ),
49            PoolError::PoolExhausted { max_connections } => {
50                crate::error::ModelError::Connection(format!(
51                    "Database pool exhausted: {} connections in use",
52                    max_connections
53                ))
54            }
55            PoolError::HealthCheckFailed { reason } => crate::error::ModelError::Connection(
56                format!("Database health check failed: {}", reason),
57            ),
58            PoolError::ConfigurationError { message } => crate::error::ModelError::Connection(
59                format!("Database configuration error: {}", message),
60            ),
61        }
62    }
63}
64
65/// Managed connection pool wrapper with statistics and health monitoring
66pub struct ManagedPool {
67    pool: Arc<dyn DatabasePoolTrait>,
68    config: DatabasePoolConfig,
69    acquire_count: AtomicU64,
70    acquire_errors: AtomicU64,
71    created_at: Instant,
72}
73
74impl ManagedPool {
75    pub fn new(pool: Arc<dyn DatabasePoolTrait>, config: DatabasePoolConfig) -> Self {
76        Self {
77            pool,
78            config,
79            acquire_count: AtomicU64::new(0),
80            acquire_errors: AtomicU64::new(0),
81            created_at: Instant::now(),
82        }
83    }
84
85    /// Get the underlying pool
86    pub fn pool(&self) -> &dyn DatabasePoolTrait {
87        &*self.pool
88    }
89
90    /// Acquire a connection from the pool with statistics tracking and enhanced error handling
91    pub async fn acquire(&self) -> Result<Box<dyn crate::backends::DatabaseConnection>, PoolError> {
92        self.acquire_count.fetch_add(1, Ordering::Relaxed);
93
94        match self.pool.acquire().await {
95            Ok(conn) => {
96                let stats = self.pool.stats();
97                tracing::debug!(
98                    "Database connection acquired successfully (total: {}, idle: {})",
99                    stats.total_connections,
100                    stats.idle_connections
101                );
102                Ok(conn)
103            }
104            Err(e) => {
105                self.acquire_errors.fetch_add(1, Ordering::Relaxed);
106                let pool_error = PoolError::AcquisitionFailed(e.to_string());
107                tracing::error!("Failed to acquire database connection: {}", pool_error);
108                Err(pool_error)
109            }
110        }
111    }
112
113    /// Execute a query directly with the pool
114    pub async fn execute(
115        &self,
116        sql: &str,
117        params: &[crate::backends::DatabaseValue],
118    ) -> Result<u64, PoolError> {
119        self.pool
120            .execute(sql, params)
121            .await
122            .map_err(|e| PoolError::AcquisitionFailed(e.to_string()))
123    }
124
125    /// Begin a database transaction with statistics tracking
126    pub async fn begin_transaction(
127        &self,
128    ) -> Result<Box<dyn crate::backends::DatabaseTransaction>, PoolError> {
129        self.acquire_count.fetch_add(1, Ordering::Relaxed);
130
131        match self.pool.begin_transaction().await {
132            Ok(tx) => {
133                tracing::debug!("Database transaction started successfully");
134                Ok(tx)
135            }
136            Err(e) => {
137                self.acquire_errors.fetch_add(1, Ordering::Relaxed);
138                let pool_error = PoolError::AcquisitionFailed(e.to_string());
139                tracing::error!("Failed to begin database transaction: {}", pool_error);
140                Err(pool_error)
141            }
142        }
143    }
144
145    /// Get pool statistics with extended metrics
146    pub fn extended_stats(&self) -> ExtendedPoolStats {
147        ExtendedPoolStats {
148            pool_stats: self.pool.stats(),
149            acquire_count: self.acquire_count.load(Ordering::Relaxed),
150            acquire_errors: self.acquire_errors.load(Ordering::Relaxed),
151            created_at: self.created_at,
152        }
153    }
154
155    /// Get current pool statistics (legacy method for backward compatibility)
156    pub fn stats(&self) -> DatabasePoolStats {
157        self.pool.stats()
158    }
159
160    /// Check pool health with comprehensive error reporting
161    pub async fn health_check(&self) -> Result<Duration, PoolError> {
162        match self.pool.health_check().await {
163            Ok(duration) => {
164                tracing::debug!("Database health check passed in {:?}", duration);
165                Ok(duration)
166            }
167            Err(e) => {
168                let pool_error = PoolError::HealthCheckFailed {
169                    reason: e.to_string(),
170                };
171                tracing::error!("Database health check failed: {}", pool_error);
172                Err(pool_error)
173            }
174        }
175    }
176
177    /// Check pool health and log detailed statistics
178    pub async fn detailed_health_check(&self) -> Result<PoolHealthReport, PoolError> {
179        let start = Instant::now();
180        let _initial_stats = self.extended_stats();
181
182        // Perform the actual health check
183        let check_duration = self.health_check().await?;
184
185        // Get updated statistics
186        let final_stats = self.extended_stats();
187
188        let report = PoolHealthReport {
189            check_duration,
190            total_check_time: start.elapsed(),
191            pool_size: final_stats.pool_stats.total_connections,
192            idle_connections: final_stats.pool_stats.idle_connections,
193            active_connections: final_stats.pool_stats.active_connections,
194            total_acquires: final_stats.acquire_count,
195            total_errors: final_stats.acquire_errors,
196            error_rate: if final_stats.acquire_count > 0 {
197                (final_stats.acquire_errors as f64 / final_stats.acquire_count as f64) * 100.0
198            } else {
199                0.0
200            },
201            created_at: final_stats.created_at,
202        };
203
204        tracing::info!("Database pool health report: {:?}", report);
205        Ok(report)
206    }
207
208    /// Get connection pool configuration
209    pub fn config(&self) -> &DatabasePoolConfig {
210        &self.config
211    }
212
213    /// Close the connection pool
214    pub async fn close(&self) -> Result<(), PoolError> {
215        self.pool
216            .close()
217            .await
218            .map_err(|e| PoolError::ConfigurationError {
219                message: e.to_string(),
220            })
221    }
222}
223
224/// Legacy aliases for backward compatibility
225pub type PoolConfig = DatabasePoolConfig;
226pub type PoolStats = DatabasePoolStats;
227pub type DatabasePool = ManagedPool;