eventuali_core/performance/
connection_pool.rs

1//! High-performance database connection pooling
2//!
3//! Provides optimized connection pool management with automatic sizing,
4//! health monitoring, and load balancing capabilities.
5
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, Semaphore};
9use crate::error::EventualiError;
10
/// Connection pool statistics for monitoring and optimization.
///
/// A freshly created value (via [`Default`]) has every counter at zero.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PoolStats {
    /// Number of connection slots the pool currently accounts for.
    pub total_connections: usize,
    /// Number of slots currently handed out to callers.
    pub active_connections: usize,
    /// Slots that are accounted for but not handed out
    /// (`total_connections` minus `active_connections`, never below zero).
    pub idle_connections: usize,
    /// Number of acquisition attempts made so far, successful or not.
    pub total_requests: u64,
    /// Number of acquisition attempts that produced a guard.
    pub successful_requests: u64,
    /// Number of acquisition attempts that ended in an error (e.g. a timeout).
    pub failed_requests: u64,
    /// Running average of the time callers waited for a slot, in milliseconds.
    pub avg_wait_time_ms: f64,
    /// Longest wait recorded for a successful acquisition, in whole milliseconds.
    pub max_wait_time_ms: u64,
}
38
/// Tunable settings for a connection pool.
///
/// All durations are expressed in milliseconds; thresholds are fractions of
/// the connections in use (e.g. `0.8` for 80%).
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Number of connections the pool starts out accounting for.
    pub min_connections: usize,
    /// Upper bound on the number of slots that may be handed out at once.
    pub max_connections: usize,
    /// How long an acquisition may wait for a free slot before failing.
    pub connection_timeout_ms: u64,
    /// Idle timeout for a connection.
    /// NOTE(review): not read by any code visible in this file.
    pub idle_timeout_ms: u64,
    /// Interval between health checks.
    /// NOTE(review): not read by any code visible in this file.
    pub health_check_interval_ms: u64,
    /// Whether automatic scaling is requested.
    /// NOTE(review): not read by any code visible in this file.
    pub auto_scaling_enabled: bool,
    /// Usage fraction above which the pool is meant to scale up.
    /// NOTE(review): not read by any code visible in this file.
    pub scale_up_threshold: f64,
    /// Usage fraction below which the pool is meant to scale down.
    /// NOTE(review): not read by any code visible in this file.
    pub scale_down_threshold: f64,
}
51
52impl Default for PoolConfig {
53    fn default() -> Self {
54        Self {
55            min_connections: 5,
56            max_connections: 100,
57            connection_timeout_ms: 5000,
58            idle_timeout_ms: 300000, // 5 minutes
59            health_check_interval_ms: 30000, // 30 seconds
60            auto_scaling_enabled: true,
61            scale_up_threshold: 0.8, // Scale up when 80% connections are in use
62            scale_down_threshold: 0.3, // Scale down when less than 30% are in use
63        }
64    }
65}
66
67impl PoolConfig {
68    /// High-performance configuration for maximum throughput
69    pub fn high_performance() -> Self {
70        Self {
71            min_connections: 10,
72            max_connections: 200,
73            connection_timeout_ms: 2000,  // 2 seconds
74            idle_timeout_ms: 180000,      // 3 minutes
75            health_check_interval_ms: 15000, // 15 seconds
76            auto_scaling_enabled: true,
77            scale_up_threshold: 0.7, // Scale up when 70% connections are in use
78            scale_down_threshold: 0.2, // Scale down when less than 20% are in use
79        }
80    }
81}
82
83/// High-performance connection pool with automatic optimization
84pub struct ConnectionPool {
85    config: PoolConfig,
86    connection_count: Arc<Mutex<usize>>,
87    active_count: Arc<Mutex<usize>>,
88    semaphore: Arc<Semaphore>,
89    stats: Arc<Mutex<PoolStats>>,
90    database_path: String,
91}
92
93impl ConnectionPool {
94    /// Create a new connection pool with the specified configuration
95    pub async fn new(database_path: String, config: PoolConfig) -> Result<Self, EventualiError> {
96        let connection_count = Arc::new(Mutex::new(config.min_connections));
97        let active_count = Arc::new(Mutex::new(0));
98        let semaphore = Arc::new(Semaphore::new(config.max_connections));
99        let stats = Arc::new(Mutex::new(PoolStats {
100            total_connections: config.min_connections,
101            idle_connections: config.min_connections,
102            ..Default::default()
103        }));
104
105        let pool = Self {
106            config,
107            connection_count,
108            active_count,
109            semaphore,
110            stats,
111            database_path,
112        };
113
114        Ok(pool)
115    }
116
117    /// Get a connection from the pool with performance tracking
118    pub async fn get_connection(&self) -> Result<PoolGuard<'_>, EventualiError> {
119        let start_time = Instant::now();
120        
121        // Update stats
122        {
123            let mut stats = self.stats.lock().await;
124            stats.total_requests += 1;
125        }
126
127        // Acquire semaphore permit
128        let permit = match tokio::time::timeout(
129            Duration::from_millis(self.config.connection_timeout_ms),
130            self.semaphore.acquire()
131        ).await {
132            Ok(Ok(permit)) => permit,
133            Ok(Err(_)) => {
134                self.record_failed_request().await;
135                return Err(EventualiError::Configuration("Failed to acquire connection permit".to_string()));
136            }
137            Err(_) => {
138                self.record_failed_request().await;
139                return Err(EventualiError::Configuration("Connection timeout".to_string()));
140            }
141        };
142
143        // Increment active connection count
144        {
145            let mut active = self.active_count.lock().await;
146            *active += 1;
147        }
148
149        let wait_time = start_time.elapsed();
150        self.record_successful_request(wait_time).await;
151
152        Ok(PoolGuard {
153            database_path: self.database_path.clone(),
154            pool: self.clone(),
155            permit: Some(permit),
156        })
157    }
158
159    /// Get current pool statistics
160    pub async fn get_stats(&self) -> PoolStats {
161        let mut stats = self.stats.lock().await;
162        let active_count = *self.active_count.lock().await;
163        let total_count = *self.connection_count.lock().await;
164        
165        stats.active_connections = active_count;
166        stats.total_connections = total_count;
167        stats.idle_connections = total_count.saturating_sub(active_count);
168        
169        stats.clone()
170    }
171
172    /// Get pool configuration
173    pub fn get_config(&self) -> &PoolConfig {
174        &self.config
175    }
176
177    async fn record_successful_request(&self, wait_time: Duration) {
178        let mut stats = self.stats.lock().await;
179        stats.successful_requests += 1;
180        
181        let wait_time_ms = wait_time.as_millis() as u64;
182        if wait_time_ms > stats.max_wait_time_ms {
183            stats.max_wait_time_ms = wait_time_ms;
184        }
185        
186        // Update average wait time (simple moving average)
187        let total_completed = stats.successful_requests + stats.failed_requests;
188        stats.avg_wait_time_ms = (stats.avg_wait_time_ms * (total_completed - 1) as f64 + wait_time_ms as f64) / total_completed as f64;
189    }
190
191    async fn record_failed_request(&self) {
192        let mut stats = self.stats.lock().await;
193        stats.failed_requests += 1;
194    }
195
196    async fn release_connection(&self) {
197        let mut active = self.active_count.lock().await;
198        if *active > 0 {
199            *active -= 1;
200        }
201    }
202}
203
204impl Clone for ConnectionPool {
205    fn clone(&self) -> Self {
206        Self {
207            config: self.config.clone(),
208            connection_count: self.connection_count.clone(),
209            active_count: self.active_count.clone(),
210            semaphore: self.semaphore.clone(),
211            stats: self.stats.clone(),
212            database_path: self.database_path.clone(),
213        }
214    }
215}
216
217/// A guard that represents a connection slot in the pool
218pub struct PoolGuard<'a> {
219    database_path: String,
220    pool: ConnectionPool,
221    #[allow(dead_code)] // Semaphore permit for connection limiting (held but not directly accessed in current implementation)
222    permit: Option<tokio::sync::SemaphorePermit<'a>>,
223}
224
225impl<'a> PoolGuard<'a> {
226    /// Get the database path for creating connections
227    pub fn database_path(&self) -> &str {
228        &self.database_path
229    }
230
231    /// Create a new database connection optimized for performance
232    pub fn create_connection(&self) -> Result<rusqlite::Connection, EventualiError> {
233        let conn = if self.database_path == ":memory:" {
234            rusqlite::Connection::open_in_memory()
235        } else {
236            rusqlite::Connection::open(&self.database_path)
237        }.map_err(|e| EventualiError::Configuration(format!("Failed to create connection: {e}")))?;
238
239        // Optimize connection settings for performance
240        conn.execute_batch("
241            PRAGMA journal_mode = WAL;
242            PRAGMA synchronous = NORMAL;
243            PRAGMA cache_size = -2000;
244            PRAGMA temp_store = MEMORY;
245            PRAGMA mmap_size = 268435456;
246        ").map_err(|e| EventualiError::Configuration(format!("Failed to optimize connection: {e}")))?;
247
248        Ok(conn)
249    }
250}
251
252impl<'a> Drop for PoolGuard<'a> {
253    fn drop(&mut self) {
254        let pool = self.pool.clone();
255        tokio::spawn(async move {
256            pool.release_connection().await;
257        });
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool over an in-memory database with the default settings.
    async fn default_in_memory_pool() -> ConnectionPool {
        ConnectionPool::new(":memory:".to_string(), PoolConfig::default())
            .await
            .unwrap()
    }

    /// A new pool reports the default minimum as its total connection count.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = default_in_memory_pool().await;

        let snapshot = pool.get_stats().await;
        assert_eq!(snapshot.total_connections, 5); // Default min_connections
    }

    /// A guard can open a connection that accepts statements.
    #[tokio::test]
    async fn test_connection_acquisition() {
        let pool = default_in_memory_pool().await;

        let slot = pool.get_connection().await.unwrap();

        // The slot must be able to open a working connection ...
        let db = slot.create_connection().unwrap();

        // ... that can execute a statement.
        let outcome = db.execute("CREATE TABLE test (id INTEGER)", []);
        assert!(outcome.is_ok());
    }

    /// One held guard shows up as one request, one success, one active slot.
    #[tokio::test]
    async fn test_pool_stats_tracking() {
        let pool = default_in_memory_pool().await;

        // Keep the guard alive while the statistics are read.
        let _slot = pool.get_connection().await.unwrap();
        let snapshot = pool.get_stats().await;

        assert_eq!(snapshot.total_requests, 1);
        assert_eq!(snapshot.successful_requests, 1);
        assert_eq!(snapshot.active_connections, 1);
    }
}