eventuali_core/performance/
connection_pool.rs1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, Semaphore};
9use crate::error::EventualiError;
10
11#[derive(Debug, Clone)]
13pub struct PoolStats {
14 pub total_connections: usize,
15 pub active_connections: usize,
16 pub idle_connections: usize,
17 pub total_requests: u64,
18 pub successful_requests: u64,
19 pub failed_requests: u64,
20 pub avg_wait_time_ms: f64,
21 pub max_wait_time_ms: u64,
22}
23
24impl Default for PoolStats {
25 fn default() -> Self {
26 Self {
27 total_connections: 0,
28 active_connections: 0,
29 idle_connections: 0,
30 total_requests: 0,
31 successful_requests: 0,
32 failed_requests: 0,
33 avg_wait_time_ms: 0.0,
34 max_wait_time_ms: 0,
35 }
36 }
37}
38
39#[derive(Debug, Clone)]
41pub struct PoolConfig {
42 pub min_connections: usize,
43 pub max_connections: usize,
44 pub connection_timeout_ms: u64,
45 pub idle_timeout_ms: u64,
46 pub health_check_interval_ms: u64,
47 pub auto_scaling_enabled: bool,
48 pub scale_up_threshold: f64,
49 pub scale_down_threshold: f64,
50}
51
52impl Default for PoolConfig {
53 fn default() -> Self {
54 Self {
55 min_connections: 5,
56 max_connections: 100,
57 connection_timeout_ms: 5000,
58 idle_timeout_ms: 300000, health_check_interval_ms: 30000, auto_scaling_enabled: true,
61 scale_up_threshold: 0.8, scale_down_threshold: 0.3, }
64 }
65}
66
67impl PoolConfig {
68 pub fn high_performance() -> Self {
70 Self {
71 min_connections: 10,
72 max_connections: 200,
73 connection_timeout_ms: 2000, idle_timeout_ms: 180000, health_check_interval_ms: 15000, auto_scaling_enabled: true,
77 scale_up_threshold: 0.7, scale_down_threshold: 0.2, }
80 }
81}
82
83pub struct ConnectionPool {
85 config: PoolConfig,
86 connection_count: Arc<Mutex<usize>>,
87 active_count: Arc<Mutex<usize>>,
88 semaphore: Arc<Semaphore>,
89 stats: Arc<Mutex<PoolStats>>,
90 database_path: String,
91}
92
93impl ConnectionPool {
94 pub async fn new(database_path: String, config: PoolConfig) -> Result<Self, EventualiError> {
96 let connection_count = Arc::new(Mutex::new(config.min_connections));
97 let active_count = Arc::new(Mutex::new(0));
98 let semaphore = Arc::new(Semaphore::new(config.max_connections));
99 let stats = Arc::new(Mutex::new(PoolStats {
100 total_connections: config.min_connections,
101 idle_connections: config.min_connections,
102 ..Default::default()
103 }));
104
105 let pool = Self {
106 config,
107 connection_count,
108 active_count,
109 semaphore,
110 stats,
111 database_path,
112 };
113
114 Ok(pool)
115 }
116
117 pub async fn get_connection(&self) -> Result<PoolGuard<'_>, EventualiError> {
119 let start_time = Instant::now();
120
121 {
123 let mut stats = self.stats.lock().await;
124 stats.total_requests += 1;
125 }
126
127 let permit = match tokio::time::timeout(
129 Duration::from_millis(self.config.connection_timeout_ms),
130 self.semaphore.acquire()
131 ).await {
132 Ok(Ok(permit)) => permit,
133 Ok(Err(_)) => {
134 self.record_failed_request().await;
135 return Err(EventualiError::Configuration("Failed to acquire connection permit".to_string()));
136 }
137 Err(_) => {
138 self.record_failed_request().await;
139 return Err(EventualiError::Configuration("Connection timeout".to_string()));
140 }
141 };
142
143 {
145 let mut active = self.active_count.lock().await;
146 *active += 1;
147 }
148
149 let wait_time = start_time.elapsed();
150 self.record_successful_request(wait_time).await;
151
152 Ok(PoolGuard {
153 database_path: self.database_path.clone(),
154 pool: self.clone(),
155 permit: Some(permit),
156 })
157 }
158
159 pub async fn get_stats(&self) -> PoolStats {
161 let mut stats = self.stats.lock().await;
162 let active_count = *self.active_count.lock().await;
163 let total_count = *self.connection_count.lock().await;
164
165 stats.active_connections = active_count;
166 stats.total_connections = total_count;
167 stats.idle_connections = total_count.saturating_sub(active_count);
168
169 stats.clone()
170 }
171
172 pub fn get_config(&self) -> &PoolConfig {
174 &self.config
175 }
176
177 async fn record_successful_request(&self, wait_time: Duration) {
178 let mut stats = self.stats.lock().await;
179 stats.successful_requests += 1;
180
181 let wait_time_ms = wait_time.as_millis() as u64;
182 if wait_time_ms > stats.max_wait_time_ms {
183 stats.max_wait_time_ms = wait_time_ms;
184 }
185
186 let total_completed = stats.successful_requests + stats.failed_requests;
188 stats.avg_wait_time_ms = (stats.avg_wait_time_ms * (total_completed - 1) as f64 + wait_time_ms as f64) / total_completed as f64;
189 }
190
191 async fn record_failed_request(&self) {
192 let mut stats = self.stats.lock().await;
193 stats.failed_requests += 1;
194 }
195
196 async fn release_connection(&self) {
197 let mut active = self.active_count.lock().await;
198 if *active > 0 {
199 *active -= 1;
200 }
201 }
202}
203
204impl Clone for ConnectionPool {
205 fn clone(&self) -> Self {
206 Self {
207 config: self.config.clone(),
208 connection_count: self.connection_count.clone(),
209 active_count: self.active_count.clone(),
210 semaphore: self.semaphore.clone(),
211 stats: self.stats.clone(),
212 database_path: self.database_path.clone(),
213 }
214 }
215}
216
217pub struct PoolGuard<'a> {
219 database_path: String,
220 pool: ConnectionPool,
221 #[allow(dead_code)] permit: Option<tokio::sync::SemaphorePermit<'a>>,
223}
224
225impl<'a> PoolGuard<'a> {
226 pub fn database_path(&self) -> &str {
228 &self.database_path
229 }
230
231 pub fn create_connection(&self) -> Result<rusqlite::Connection, EventualiError> {
233 let conn = if self.database_path == ":memory:" {
234 rusqlite::Connection::open_in_memory()
235 } else {
236 rusqlite::Connection::open(&self.database_path)
237 }.map_err(|e| EventualiError::Configuration(format!("Failed to create connection: {e}")))?;
238
239 conn.execute_batch("
241 PRAGMA journal_mode = WAL;
242 PRAGMA synchronous = NORMAL;
243 PRAGMA cache_size = -2000;
244 PRAGMA temp_store = MEMORY;
245 PRAGMA mmap_size = 268435456;
246 ").map_err(|e| EventualiError::Configuration(format!("Failed to optimize connection: {e}")))?;
247
248 Ok(conn)
249 }
250}
251
252impl<'a> Drop for PoolGuard<'a> {
253 fn drop(&mut self) {
254 let pool = self.pool.clone();
255 tokio::spawn(async move {
256 pool.release_connection().await;
257 });
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264
265 #[tokio::test]
266 async fn test_connection_pool_creation() {
267 let config = PoolConfig::default();
268 let pool = ConnectionPool::new(":memory:".to_string(), config).await.unwrap();
269
270 let stats = pool.get_stats().await;
271 assert_eq!(stats.total_connections, 5); }
273
274 #[tokio::test]
275 async fn test_connection_acquisition() {
276 let config = PoolConfig::default();
277 let pool = ConnectionPool::new(":memory:".to_string(), config).await.unwrap();
278
279 let guard = pool.get_connection().await.unwrap();
280
281 let conn = guard.create_connection().unwrap();
283
284 let result = conn.execute("CREATE TABLE test (id INTEGER)", []);
286 assert!(result.is_ok());
287 }
288
289 #[tokio::test]
290 async fn test_pool_stats_tracking() {
291 let config = PoolConfig::default();
292 let pool = ConnectionPool::new(":memory:".to_string(), config).await.unwrap();
293
294 let _guard = pool.get_connection().await.unwrap();
295 let stats = pool.get_stats().await;
296
297 assert_eq!(stats.total_requests, 1);
298 assert_eq!(stats.successful_requests, 1);
299 assert_eq!(stats.active_connections, 1);
300 }
301}