elif_orm/connection/
pool.rs1use super::health::PoolHealthReport;
7use super::statistics::ExtendedPoolStats;
8use crate::backends::{DatabasePool as DatabasePoolTrait, DatabasePoolConfig, DatabasePoolStats};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13#[derive(Debug, thiserror::Error)]
15pub enum PoolError {
16 #[error("Connection acquisition failed: {0}")]
17 AcquisitionFailed(String),
18
19 #[error("Pool is closed")]
20 PoolClosed,
21
22 #[error("Connection timeout after {timeout}s")]
23 ConnectionTimeout { timeout: u64 },
24
25 #[error("Pool exhausted: all {max_connections} connections in use")]
26 PoolExhausted { max_connections: u32 },
27
28 #[error("Health check failed: {reason}")]
29 HealthCheckFailed { reason: String },
30
31 #[error("Configuration error: {message}")]
32 ConfigurationError { message: String },
33}
34
35impl From<PoolError> for crate::error::ModelError {
37 fn from(err: PoolError) -> Self {
38 match err {
39 PoolError::AcquisitionFailed(err_msg) => crate::error::ModelError::Connection(format!(
40 "Database connection failed: {}",
41 err_msg
42 )),
43 PoolError::PoolClosed => {
44 crate::error::ModelError::Connection("Database pool is closed".to_string())
45 }
46 PoolError::ConnectionTimeout { timeout } => crate::error::ModelError::Connection(
47 format!("Database connection timeout after {}s", timeout),
48 ),
49 PoolError::PoolExhausted { max_connections } => {
50 crate::error::ModelError::Connection(format!(
51 "Database pool exhausted: {} connections in use",
52 max_connections
53 ))
54 }
55 PoolError::HealthCheckFailed { reason } => crate::error::ModelError::Connection(
56 format!("Database health check failed: {}", reason),
57 ),
58 PoolError::ConfigurationError { message } => crate::error::ModelError::Connection(
59 format!("Database configuration error: {}", message),
60 ),
61 }
62 }
63}
64
65pub struct ManagedPool {
67 pool: Arc<dyn DatabasePoolTrait>,
68 config: DatabasePoolConfig,
69 acquire_count: AtomicU64,
70 acquire_errors: AtomicU64,
71 created_at: Instant,
72}
73
74impl ManagedPool {
75 pub fn new(pool: Arc<dyn DatabasePoolTrait>, config: DatabasePoolConfig) -> Self {
76 Self {
77 pool,
78 config,
79 acquire_count: AtomicU64::new(0),
80 acquire_errors: AtomicU64::new(0),
81 created_at: Instant::now(),
82 }
83 }
84
85 pub fn pool(&self) -> &dyn DatabasePoolTrait {
87 &*self.pool
88 }
89
90 pub async fn acquire(&self) -> Result<Box<dyn crate::backends::DatabaseConnection>, PoolError> {
92 self.acquire_count.fetch_add(1, Ordering::Relaxed);
93
94 match self.pool.acquire().await {
95 Ok(conn) => {
96 let stats = self.pool.stats();
97 tracing::debug!(
98 "Database connection acquired successfully (total: {}, idle: {})",
99 stats.total_connections,
100 stats.idle_connections
101 );
102 Ok(conn)
103 }
104 Err(e) => {
105 self.acquire_errors.fetch_add(1, Ordering::Relaxed);
106 let pool_error = PoolError::AcquisitionFailed(e.to_string());
107 tracing::error!("Failed to acquire database connection: {}", pool_error);
108 Err(pool_error)
109 }
110 }
111 }
112
113 pub async fn execute(
115 &self,
116 sql: &str,
117 params: &[crate::backends::DatabaseValue],
118 ) -> Result<u64, PoolError> {
119 self.pool
120 .execute(sql, params)
121 .await
122 .map_err(|e| PoolError::AcquisitionFailed(e.to_string()))
123 }
124
125 pub async fn begin_transaction(
127 &self,
128 ) -> Result<Box<dyn crate::backends::DatabaseTransaction>, PoolError> {
129 self.acquire_count.fetch_add(1, Ordering::Relaxed);
130
131 match self.pool.begin_transaction().await {
132 Ok(tx) => {
133 tracing::debug!("Database transaction started successfully");
134 Ok(tx)
135 }
136 Err(e) => {
137 self.acquire_errors.fetch_add(1, Ordering::Relaxed);
138 let pool_error = PoolError::AcquisitionFailed(e.to_string());
139 tracing::error!("Failed to begin database transaction: {}", pool_error);
140 Err(pool_error)
141 }
142 }
143 }
144
145 pub fn extended_stats(&self) -> ExtendedPoolStats {
147 ExtendedPoolStats {
148 pool_stats: self.pool.stats(),
149 acquire_count: self.acquire_count.load(Ordering::Relaxed),
150 acquire_errors: self.acquire_errors.load(Ordering::Relaxed),
151 created_at: self.created_at,
152 }
153 }
154
155 pub fn stats(&self) -> DatabasePoolStats {
157 self.pool.stats()
158 }
159
160 pub async fn health_check(&self) -> Result<Duration, PoolError> {
162 match self.pool.health_check().await {
163 Ok(duration) => {
164 tracing::debug!("Database health check passed in {:?}", duration);
165 Ok(duration)
166 }
167 Err(e) => {
168 let pool_error = PoolError::HealthCheckFailed {
169 reason: e.to_string(),
170 };
171 tracing::error!("Database health check failed: {}", pool_error);
172 Err(pool_error)
173 }
174 }
175 }
176
177 pub async fn detailed_health_check(&self) -> Result<PoolHealthReport, PoolError> {
179 let start = Instant::now();
180 let _initial_stats = self.extended_stats();
181
182 let check_duration = self.health_check().await?;
184
185 let final_stats = self.extended_stats();
187
188 let report = PoolHealthReport {
189 check_duration,
190 total_check_time: start.elapsed(),
191 pool_size: final_stats.pool_stats.total_connections,
192 idle_connections: final_stats.pool_stats.idle_connections,
193 active_connections: final_stats.pool_stats.active_connections,
194 total_acquires: final_stats.acquire_count,
195 total_errors: final_stats.acquire_errors,
196 error_rate: if final_stats.acquire_count > 0 {
197 (final_stats.acquire_errors as f64 / final_stats.acquire_count as f64) * 100.0
198 } else {
199 0.0
200 },
201 created_at: final_stats.created_at,
202 };
203
204 tracing::info!("Database pool health report: {:?}", report);
205 Ok(report)
206 }
207
208 pub fn config(&self) -> &DatabasePoolConfig {
210 &self.config
211 }
212
213 pub async fn close(&self) -> Result<(), PoolError> {
215 self.pool
216 .close()
217 .await
218 .map_err(|e| PoolError::ConfigurationError {
219 message: e.to_string(),
220 })
221 }
222}
223
224pub type PoolConfig = DatabasePoolConfig;
226pub type PoolStats = DatabasePoolStats;
227pub type DatabasePool = ManagedPool;