1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use std::sync::atomic::{AtomicU64, Ordering};
9use elif_core::{ServiceProvider, Container, ContainerBuilder};
10use crate::error::ModelError;
11use crate::backends::{
12 DatabasePool as DatabasePoolTrait, DatabaseBackend, DatabaseBackendRegistry,
13 DatabasePoolConfig, DatabasePoolStats, DatabaseBackendType
14};
15
16#[derive(Debug, thiserror::Error)]
18pub enum PoolError {
19 #[error("Connection acquisition failed: {0}")]
20 AcquisitionFailed(String),
21
22 #[error("Pool is closed")]
23 PoolClosed,
24
25 #[error("Connection timeout after {timeout}s")]
26 ConnectionTimeout { timeout: u64 },
27
28 #[error("Pool exhausted: all {max_connections} connections in use")]
29 PoolExhausted { max_connections: u32 },
30
31 #[error("Health check failed: {reason}")]
32 HealthCheckFailed { reason: String },
33
34 #[error("Configuration error: {message}")]
35 ConfigurationError { message: String },
36}
37
38impl From<PoolError> for ModelError {
40 fn from(err: PoolError) -> Self {
41 match err {
42 PoolError::AcquisitionFailed(err_msg) => {
43 ModelError::Connection(format!("Database connection failed: {}", err_msg))
44 },
45 PoolError::PoolClosed => {
46 ModelError::Connection("Database pool is closed".to_string())
47 },
48 PoolError::ConnectionTimeout { timeout } => {
49 ModelError::Connection(format!("Database connection timeout after {}s", timeout))
50 },
51 PoolError::PoolExhausted { max_connections } => {
52 ModelError::Connection(format!("Database pool exhausted: {} connections in use", max_connections))
53 },
54 PoolError::HealthCheckFailed { reason } => {
55 ModelError::Connection(format!("Database health check failed: {}", reason))
56 },
57 PoolError::ConfigurationError { message } => {
58 ModelError::Connection(format!("Database configuration error: {}", message))
59 },
60 }
61 }
62}
63
64pub type PoolConfig = DatabasePoolConfig;
66
67pub type PoolStats = DatabasePoolStats;
69
70#[derive(Debug, Clone)]
72pub struct ExtendedPoolStats {
73 pub pool_stats: DatabasePoolStats,
74 pub acquire_count: u64,
75 pub acquire_errors: u64,
76 pub created_at: Instant,
77}
78
79#[derive(Debug, Clone)]
81pub struct PoolHealthReport {
82 pub check_duration: Duration,
83 pub total_check_time: Duration,
84 pub pool_size: u32,
85 pub idle_connections: u32,
86 pub active_connections: u32,
87 pub total_acquires: u64,
88 pub total_errors: u64,
89 pub error_rate: f64,
90 pub created_at: Instant,
91}
92
93pub struct ManagedPool {
95 pool: Arc<dyn DatabasePoolTrait>,
96 config: DatabasePoolConfig,
97 acquire_count: AtomicU64,
98 acquire_errors: AtomicU64,
99 created_at: Instant,
100}
101
102impl ManagedPool {
103 pub fn new(pool: Arc<dyn DatabasePoolTrait>, config: DatabasePoolConfig) -> Self {
104 Self {
105 pool,
106 config,
107 acquire_count: AtomicU64::new(0),
108 acquire_errors: AtomicU64::new(0),
109 created_at: Instant::now(),
110 }
111 }
112
113 pub fn pool(&self) -> &dyn DatabasePoolTrait {
115 &*self.pool
116 }
117
118 pub async fn acquire(&self) -> Result<Box<dyn crate::backends::DatabaseConnection>, PoolError> {
120 self.acquire_count.fetch_add(1, Ordering::Relaxed);
121
122 match self.pool.acquire().await {
123 Ok(conn) => {
124 let stats = self.pool.stats();
125 tracing::debug!("Database connection acquired successfully (total: {}, idle: {})",
126 stats.total_connections, stats.idle_connections);
127 Ok(conn)
128 },
129 Err(e) => {
130 self.acquire_errors.fetch_add(1, Ordering::Relaxed);
131 let pool_error = PoolError::AcquisitionFailed(e.to_string());
132 tracing::error!("Failed to acquire database connection: {}", pool_error);
133 Err(pool_error)
134 }
135 }
136 }
137
138 pub async fn execute(&self, sql: &str, params: &[crate::backends::DatabaseValue]) -> Result<u64, PoolError> {
140 self.pool.execute(sql, params).await
141 .map_err(|e| PoolError::AcquisitionFailed(e.to_string()))
142 }
143
144 pub async fn begin_transaction(&self) -> Result<Box<dyn crate::backends::DatabaseTransaction>, PoolError> {
146 self.acquire_count.fetch_add(1, Ordering::Relaxed);
147
148 match self.pool.begin_transaction().await {
149 Ok(tx) => {
150 tracing::debug!("Database transaction started successfully");
151 Ok(tx)
152 },
153 Err(e) => {
154 self.acquire_errors.fetch_add(1, Ordering::Relaxed);
155 let pool_error = PoolError::AcquisitionFailed(e.to_string());
156 tracing::error!("Failed to begin database transaction: {}", pool_error);
157 Err(pool_error)
158 }
159 }
160 }
161
162 pub fn extended_stats(&self) -> ExtendedPoolStats {
164 ExtendedPoolStats {
165 pool_stats: self.pool.stats(),
166 acquire_count: self.acquire_count.load(Ordering::Relaxed),
167 acquire_errors: self.acquire_errors.load(Ordering::Relaxed),
168 created_at: self.created_at,
169 }
170 }
171
172 pub fn stats(&self) -> DatabasePoolStats {
174 self.pool.stats()
175 }
176
177 pub async fn health_check(&self) -> Result<Duration, PoolError> {
179 match self.pool.health_check().await {
180 Ok(duration) => {
181 tracing::debug!("Database health check passed in {:?}", duration);
182 Ok(duration)
183 },
184 Err(e) => {
185 let pool_error = PoolError::HealthCheckFailed {
186 reason: e.to_string()
187 };
188 tracing::error!("Database health check failed: {}", pool_error);
189 Err(pool_error)
190 }
191 }
192 }
193
194 pub async fn detailed_health_check(&self) -> Result<PoolHealthReport, PoolError> {
196 let start = Instant::now();
197 let initial_stats = self.extended_stats();
198
199 let check_duration = self.health_check().await?;
201
202 let final_stats = self.extended_stats();
204
205 let report = PoolHealthReport {
206 check_duration,
207 total_check_time: start.elapsed(),
208 pool_size: final_stats.pool_stats.total_connections,
209 idle_connections: final_stats.pool_stats.idle_connections,
210 active_connections: final_stats.pool_stats.active_connections,
211 total_acquires: final_stats.acquire_count,
212 total_errors: final_stats.acquire_errors,
213 error_rate: if final_stats.acquire_count > 0 {
214 (final_stats.acquire_errors as f64 / final_stats.acquire_count as f64) * 100.0
215 } else {
216 0.0
217 },
218 created_at: final_stats.created_at,
219 };
220
221 tracing::info!("Database pool health report: {:?}", report);
222 Ok(report)
223 }
224
225 pub fn config(&self) -> &DatabasePoolConfig {
227 &self.config
228 }
229
230 pub async fn close(&self) -> Result<(), PoolError> {
232 self.pool.close().await
233 .map_err(|e| PoolError::ConfigurationError { message: e.to_string() })
234 }
235}
236
237pub struct DatabaseServiceProvider {
239 database_url: String,
240 config: DatabasePoolConfig,
241 service_name: String,
242 backend_registry: Arc<DatabaseBackendRegistry>,
243}
244
245impl DatabaseServiceProvider {
246 pub fn new(database_url: String) -> Self {
247 let mut registry = DatabaseBackendRegistry::new();
248 registry.register(
249 DatabaseBackendType::PostgreSQL,
250 Arc::new(crate::backends::PostgresBackend::new())
251 );
252
253 Self {
254 database_url,
255 config: DatabasePoolConfig::default(),
256 service_name: "database_pool".to_string(),
257 backend_registry: Arc::new(registry),
258 }
259 }
260
261 pub fn with_registry(mut self, registry: Arc<DatabaseBackendRegistry>) -> Self {
262 self.backend_registry = registry;
263 self
264 }
265
266 pub fn with_config(mut self, config: DatabasePoolConfig) -> Self {
267 self.config = config;
268 self
269 }
270
271 pub fn with_max_connections(mut self, max_connections: u32) -> Self {
272 self.config.max_connections = max_connections;
273 self
274 }
275
276 pub fn with_min_connections(mut self, min_connections: u32) -> Self {
277 self.config.min_connections = min_connections;
278 self
279 }
280
281 pub fn with_acquire_timeout(mut self, timeout_seconds: u64) -> Self {
282 self.config.acquire_timeout_seconds = timeout_seconds;
283 self
284 }
285
286 pub fn with_idle_timeout(mut self, timeout_seconds: Option<u64>) -> Self {
287 self.config.idle_timeout_seconds = timeout_seconds;
288 self
289 }
290
291 pub fn with_max_lifetime(mut self, lifetime_seconds: Option<u64>) -> Self {
292 self.config.max_lifetime_seconds = lifetime_seconds;
293 self
294 }
295
296 pub fn with_test_before_acquire(mut self, enabled: bool) -> Self {
297 self.config.test_before_acquire = enabled;
298 self
299 }
300
301 pub fn with_service_name(mut self, service_name: String) -> Self {
302 self.service_name = service_name;
303 self
304 }
305
306 pub async fn create_pool(&self) -> Result<Arc<dyn DatabasePoolTrait>, ModelError> {
308 self.backend_registry.create_pool(&self.database_url, self.config.clone())
309 .await
310 .map_err(|e| ModelError::Connection(e.to_string()))
311 }
312
313 pub async fn create_managed_pool(&self) -> Result<ManagedPool, ModelError> {
315 let pool = self.create_pool().await?;
316 Ok(ManagedPool::new(pool, self.config.clone()))
317 }
318
319 pub fn database_url(&self) -> &str {
321 &self.database_url
322 }
323
324 pub fn service_name(&self) -> &str {
326 &self.service_name
327 }
328
329 pub fn config(&self) -> &DatabasePoolConfig {
331 &self.config
332 }
333}
334
335impl ServiceProvider for DatabaseServiceProvider {
336 fn name(&self) -> &'static str {
337 "DatabaseServiceProvider"
338 }
339
340 fn register(&self, builder: ContainerBuilder) -> Result<ContainerBuilder, elif_core::ProviderError> {
341 tracing::debug!("Registering database service with URL: {}",
344 self.database_url.split('@').last().unwrap_or("unknown"));
345 Ok(builder)
346 }
347
348 fn boot(&self, _container: &Container) -> Result<(), elif_core::ProviderError> {
349 tracing::info!("✅ Database service provider booted successfully");
350 tracing::debug!("Database pool configuration: max_connections={}, min_connections={}, acquire_timeout={}s, idle_timeout={:?}s, max_lifetime={:?}s, test_before_acquire={}",
351 self.config.max_connections, self.config.min_connections, self.config.acquire_timeout_seconds,
352 self.config.idle_timeout_seconds, self.config.max_lifetime_seconds, self.config.test_before_acquire);
353 Ok(())
354 }
355}
356
357pub async fn create_database_pool(database_url: &str) -> Result<Arc<dyn DatabasePoolTrait>, ModelError> {
359 create_database_pool_with_config(database_url, &DatabasePoolConfig::default()).await
360}
361
362pub async fn create_database_pool_with_config(
364 database_url: &str,
365 config: &DatabasePoolConfig
366) -> Result<Arc<dyn DatabasePoolTrait>, ModelError> {
367 tracing::debug!("Creating database pool with config: max={}, min={}, timeout={}s, idle_timeout={:?}s, max_lifetime={:?}s, test_before_acquire={}",
368 config.max_connections, config.min_connections, config.acquire_timeout_seconds,
369 config.idle_timeout_seconds, config.max_lifetime_seconds, config.test_before_acquire);
370
371 let mut registry = DatabaseBackendRegistry::new();
372 registry.register(
373 DatabaseBackendType::PostgreSQL,
374 Arc::new(crate::backends::PostgresBackend::new())
375 );
376
377 let pool = registry.create_pool(database_url, config.clone())
378 .await
379 .map_err(|e| {
380 tracing::error!("Failed to create database pool: {}", e);
381 ModelError::Connection(format!("Failed to create database pool: {}", e))
382 })?;
383
384 tracing::info!("✅ Database pool created successfully with {} max connections", config.max_connections);
385 Ok(pool)
386}
387
388pub struct PoolRegistry {
390 pools: std::collections::HashMap<String, Arc<ManagedPool>>,
391}
392
393pub type DatabasePool = ManagedPool;
395
396impl PoolRegistry {
397 pub fn new() -> Self {
398 Self {
399 pools: std::collections::HashMap::new(),
400 }
401 }
402
403 pub fn register(&mut self, name: String, pool: Arc<ManagedPool>) {
405 tracing::info!("Registering database pool: {}", name);
406 self.pools.insert(name, pool);
407 }
408
409 pub fn get(&self, name: &str) -> Option<Arc<ManagedPool>> {
411 self.pools.get(name).cloned()
412 }
413
414 pub fn get_default(&self) -> Option<Arc<ManagedPool>> {
416 self.get("database_pool")
417 }
418
419 pub fn pool_names(&self) -> Vec<&String> {
421 self.pools.keys().collect()
422 }
423
424 pub fn get_all_stats(&self) -> std::collections::HashMap<String, DatabasePoolStats> {
426 self.pools
427 .iter()
428 .map(|(name, pool)| (name.clone(), pool.stats()))
429 .collect()
430 }
431
432 pub async fn health_check_all(&self) -> std::collections::HashMap<String, Result<Duration, PoolError>> {
434 let mut results = std::collections::HashMap::new();
435
436 for (name, pool) in &self.pools {
437 let result = pool.health_check().await;
438 results.insert(name.clone(), result);
439 }
440
441 results
442 }
443}
444
445impl Default for PoolRegistry {
446 fn default() -> Self {
447 Self::new()
448 }
449}
450
451pub async fn get_database_pool(_container: &Container) -> Result<Arc<dyn DatabasePoolTrait>, String> {
453 Err("Database pool not yet integrated with current Container implementation - use PoolRegistry for now".to_string())
456}
457
458pub async fn get_named_database_pool(
460 _container: &Container,
461 service_name: &str
462) -> Result<Arc<dyn DatabasePoolTrait>, String> {
463 Err(format!("Database pool '{}' not yet integrated with current Container implementation - use PoolRegistry for now", service_name))
466}
467
468pub async fn create_default_pool_registry(database_url: &str) -> Result<PoolRegistry, ModelError> {
470 let mut registry = PoolRegistry::new();
471
472 let provider = DatabaseServiceProvider::new(database_url.to_string());
473 let managed_pool = provider.create_managed_pool().await?;
474
475 registry.register("database_pool".to_string(), Arc::new(managed_pool));
476
477 tracing::info!("Created default pool registry with database_pool");
478 Ok(registry)
479}
480
481pub async fn create_custom_pool_registry(
483 pools: Vec<(String, String, DatabasePoolConfig)>
484) -> Result<PoolRegistry, ModelError> {
485 let mut registry = PoolRegistry::new();
486
487 for (name, database_url, config) in pools {
488 let provider = DatabaseServiceProvider::new(database_url)
489 .with_config(config);
490 let managed_pool = provider.create_managed_pool().await?;
491
492 registry.register(name, Arc::new(managed_pool));
493 }
494
495 tracing::info!("Created custom pool registry with {} pools", registry.pool_names().len());
496 Ok(registry)
497}
498
499#[cfg(test)]
500mod tests {
501 use super::*;
502
503 #[test]
504 fn test_pool_config_defaults() {
505 let config = DatabasePoolConfig::default();
506 assert_eq!(config.max_connections, 10);
507 assert_eq!(config.min_connections, 1);
508 assert_eq!(config.acquire_timeout_seconds, 30);
509 assert_eq!(config.idle_timeout_seconds, Some(600));
510 assert_eq!(config.max_lifetime_seconds, Some(1800));
511 assert!(config.test_before_acquire);
512 }
513
514 #[test]
515 fn test_database_service_provider_creation() {
516 let provider = DatabaseServiceProvider::new("postgresql://test".to_string());
517 assert_eq!(provider.database_url(), "postgresql://test");
518 assert_eq!(provider.config().max_connections, 10);
519 assert_eq!(provider.config().min_connections, 1);
520 assert_eq!(provider.config().acquire_timeout_seconds, 30);
521 assert_eq!(provider.service_name(), "database_pool");
522 }
523
524 #[test]
525 fn test_database_service_provider_configuration() {
526 let provider = DatabaseServiceProvider::new("postgresql://test".to_string())
527 .with_max_connections(20)
528 .with_min_connections(5)
529 .with_acquire_timeout(60)
530 .with_idle_timeout(Some(300))
531 .with_max_lifetime(Some(900))
532 .with_test_before_acquire(false)
533 .with_service_name("custom_db".to_string());
534
535 assert_eq!(provider.config().max_connections, 20);
536 assert_eq!(provider.config().min_connections, 5);
537 assert_eq!(provider.config().acquire_timeout_seconds, 60);
538 assert_eq!(provider.config().idle_timeout_seconds, Some(300));
539 assert_eq!(provider.config().max_lifetime_seconds, Some(900));
540 assert!(!provider.config().test_before_acquire);
541 assert_eq!(provider.service_name(), "custom_db");
542 }
543
544 #[test]
545 fn test_provider_name() {
546 let provider = DatabaseServiceProvider::new("postgresql://test".to_string());
547 assert_eq!(provider.name(), "DatabaseServiceProvider");
548 }
549
550 #[test]
551 fn test_database_service_provider_accessors() {
552 let provider = DatabaseServiceProvider::new("postgresql://test_db".to_string())
553 .with_service_name("custom_service".to_string());
554
555 assert_eq!(provider.database_url(), "postgresql://test_db");
556 assert_eq!(provider.service_name(), "custom_service");
557 }
558
559 #[test]
560 fn test_database_service_provider_defaults() {
561 let provider = DatabaseServiceProvider::new("postgresql://test".to_string());
562
563 assert_eq!(provider.config().max_connections, 10);
564 assert_eq!(provider.config().min_connections, 1);
565 assert_eq!(provider.config().acquire_timeout_seconds, 30);
566 assert_eq!(provider.config().idle_timeout_seconds, Some(600));
567 assert_eq!(provider.config().max_lifetime_seconds, Some(1800));
568 assert!(provider.config().test_before_acquire);
569 assert_eq!(provider.service_name(), "database_pool");
570 }
571
572 #[test]
573 fn test_database_service_provider_fluent_configuration() {
574 let provider = DatabaseServiceProvider::new("postgresql://test".to_string())
575 .with_max_connections(50)
576 .with_min_connections(10)
577 .with_acquire_timeout(120)
578 .with_idle_timeout(None)
579 .with_max_lifetime(Some(3600))
580 .with_service_name("production_db".to_string());
581
582 assert_eq!(provider.config().max_connections, 50);
583 assert_eq!(provider.config().min_connections, 10);
584 assert_eq!(provider.config().acquire_timeout_seconds, 120);
585 assert_eq!(provider.config().idle_timeout_seconds, None);
586 assert_eq!(provider.config().max_lifetime_seconds, Some(3600));
587 assert_eq!(provider.service_name(), "production_db");
588 assert_eq!(provider.database_url(), "postgresql://test");
589 }
590
591 #[test]
592 fn test_pool_config_creation() {
593 let config = PoolConfig::default();
594 assert_eq!(config.max_connections, 10);
595 assert_eq!(config.min_connections, 1);
596 assert_eq!(config.acquire_timeout_seconds, 30);
597 assert_eq!(config.idle_timeout_seconds, Some(600));
598 assert_eq!(config.max_lifetime_seconds, Some(1800));
599 assert!(config.test_before_acquire);
600 }
601
602 #[test]
603 fn test_managed_pool_config_access() {
604 let config = DatabasePoolConfig {
605 max_connections: 5,
606 min_connections: 2,
607 acquire_timeout_seconds: 60,
608 idle_timeout_seconds: None,
609 max_lifetime_seconds: Some(3600),
610 test_before_acquire: false,
611 };
612
613 assert_eq!(config.max_connections, 5);
615 assert_eq!(config.min_connections, 2);
616 assert_eq!(config.acquire_timeout_seconds, 60);
617 assert_eq!(config.idle_timeout_seconds, None);
618 assert_eq!(config.max_lifetime_seconds, Some(3600));
619 assert!(!config.test_before_acquire);
620 }
621
622 #[test]
623 fn test_pool_config_builder() {
624 let config = DatabasePoolConfig {
625 max_connections: 20,
626 min_connections: 2,
627 acquire_timeout_seconds: 45,
628 idle_timeout_seconds: Some(300),
629 max_lifetime_seconds: Some(1200),
630 test_before_acquire: false,
631 };
632
633 let provider = DatabaseServiceProvider::new("postgresql://test".to_string())
634 .with_config(config.clone());
635
636 assert_eq!(provider.config().max_connections, 20);
637 assert_eq!(provider.config().min_connections, 2);
638 assert_eq!(provider.config().acquire_timeout_seconds, 45);
639 assert_eq!(provider.config().idle_timeout_seconds, Some(300));
640 assert_eq!(provider.config().max_lifetime_seconds, Some(1200));
641 assert!(!provider.config().test_before_acquire);
642 }
643
644 #[test]
645 fn test_pool_registry_creation() {
646 let registry = PoolRegistry::new();
647 assert!(registry.get_default().is_none());
648 assert!(registry.pool_names().is_empty());
649
650 let stats = registry.get_all_stats();
652 assert!(stats.is_empty());
653 }
654
655 #[test]
656 fn test_pool_error_types() {
657 let timeout_error = PoolError::ConnectionTimeout { timeout: 30 };
658 let pool_closed_error = PoolError::PoolClosed;
659 let exhausted_error = PoolError::PoolExhausted { max_connections: 10 };
660
661 assert!(timeout_error.to_string().contains("timeout"));
663 assert!(pool_closed_error.to_string().contains("closed"));
664 assert!(exhausted_error.to_string().contains("exhausted"));
665 }
666
667 #[test]
668 fn test_pool_error_model_conversion() {
669 let pool_error = PoolError::PoolExhausted { max_connections: 5 };
670 let model_error: ModelError = pool_error.into();
671
672 assert!(matches!(model_error, ModelError::Connection(_)));
674 }
675}