1use crate::error::{Result, TdbError};
15use crate::store::TdbStore;
16use parking_lot::{Mutex, RwLock};
17use std::collections::VecDeque;
18use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23#[derive(Debug, Clone)]
25pub struct ConnectionPoolConfig {
26 pub min_connections: usize,
28 pub max_connections: usize,
30 pub acquire_timeout: Duration,
32 pub max_idle_time: Duration,
34 pub enable_health_check: bool,
36 pub health_check_interval: Duration,
38}
39
40impl Default for ConnectionPoolConfig {
41 fn default() -> Self {
42 Self {
43 min_connections: 2,
44 max_connections: 10,
45 acquire_timeout: Duration::from_secs(30),
46 max_idle_time: Duration::from_secs(300), enable_health_check: true,
48 health_check_interval: Duration::from_secs(60),
49 }
50 }
51}
52
53pub struct PooledConnection {
55 store: Option<TdbStore>,
57 id: u64,
59 last_used: Instant,
61 pool: Arc<ConnectionPoolInner>,
63}
64
65impl PooledConnection {
66 pub fn store(&self) -> &TdbStore {
68 self.store.as_ref().expect("Store should be present")
69 }
70
71 pub fn store_mut(&mut self) -> &mut TdbStore {
73 self.store.as_mut().expect("Store should be present")
74 }
75
76 pub fn id(&self) -> u64 {
78 self.id
79 }
80
81 pub fn idle_time(&self) -> Duration {
83 self.last_used.elapsed()
84 }
85
86 fn touch(&mut self) {
88 self.last_used = Instant::now();
89 }
90}
91
92impl Drop for PooledConnection {
93 fn drop(&mut self) {
94 if let Some(store) = self.store.take() {
96 self.pool.return_connection(store, self.id);
97 }
98 }
99}
100
101struct ConnectionPoolInner {
103 db_path: PathBuf,
105 config: ConnectionPoolConfig,
107 available: Mutex<VecDeque<(u64, TdbStore)>>,
109 next_id: AtomicU64,
111 current_size: AtomicUsize,
113 stats: ConnectionPoolStats,
115}
116
117impl ConnectionPoolInner {
118 fn create_connection(&self) -> Result<TdbStore> {
120 TdbStore::open(&self.db_path)
121 }
122
123 fn return_connection(&self, store: TdbStore, id: u64) {
125 let mut available = self.available.lock();
126
127 if available.len() < self.config.max_connections {
129 available.push_back((id, store));
130 self.stats
131 .returned_connections
132 .fetch_add(1, Ordering::Relaxed);
133 } else {
134 drop(store);
136 self.current_size.fetch_sub(1, Ordering::Relaxed);
137 self.stats
138 .closed_connections
139 .fetch_add(1, Ordering::Relaxed);
140 }
141 }
142}
143
144pub struct ConnectionPool {
146 inner: Arc<ConnectionPoolInner>,
147}
148
149impl ConnectionPool {
150 pub fn new<P: AsRef<Path>>(db_path: P, config: ConnectionPoolConfig) -> Result<Self> {
152 let db_path = db_path.as_ref().to_path_buf();
153
154 if config.min_connections > config.max_connections {
156 return Err(TdbError::Other(
157 "min_connections cannot exceed max_connections".to_string(),
158 ));
159 }
160
161 let inner = Arc::new(ConnectionPoolInner {
162 db_path: db_path.clone(),
163 config: config.clone(),
164 available: Mutex::new(VecDeque::with_capacity(config.max_connections)),
165 next_id: AtomicU64::new(1),
166 current_size: AtomicUsize::new(0),
167 stats: ConnectionPoolStats::default(),
168 });
169
170 for _ in 0..config.min_connections {
172 let store = TdbStore::open(&db_path)?;
173 let id = inner.next_id.fetch_add(1, Ordering::Relaxed);
174 inner.available.lock().push_back((id, store));
175 inner.current_size.fetch_add(1, Ordering::Relaxed);
176 }
177
178 Ok(Self { inner })
179 }
180
181 pub fn acquire(&self) -> Result<PooledConnection> {
183 self.inner
184 .stats
185 .acquire_requests
186 .fetch_add(1, Ordering::Relaxed);
187 let start = Instant::now();
188
189 loop {
191 {
193 let mut available = self.inner.available.lock();
194 if let Some((id, store)) = available.pop_front() {
195 self.inner
196 .stats
197 .successful_acquires
198 .fetch_add(1, Ordering::Relaxed);
199
200 return Ok(PooledConnection {
201 store: Some(store),
202 id,
203 last_used: Instant::now(),
204 pool: Arc::clone(&self.inner),
205 });
206 }
207 }
208
209 let current_size = self.inner.current_size.load(Ordering::Relaxed);
211 if current_size < self.inner.config.max_connections {
212 match self.inner.create_connection() {
213 Ok(store) => {
214 let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
215 self.inner.current_size.fetch_add(1, Ordering::Relaxed);
216 self.inner
217 .stats
218 .created_connections
219 .fetch_add(1, Ordering::Relaxed);
220 self.inner
221 .stats
222 .successful_acquires
223 .fetch_add(1, Ordering::Relaxed);
224
225 return Ok(PooledConnection {
226 store: Some(store),
227 id,
228 last_used: Instant::now(),
229 pool: Arc::clone(&self.inner),
230 });
231 }
232 Err(e) => {
233 self.inner
234 .stats
235 .failed_acquires
236 .fetch_add(1, Ordering::Relaxed);
237 return Err(e);
238 }
239 }
240 }
241
242 if start.elapsed() >= self.inner.config.acquire_timeout {
244 self.inner
245 .stats
246 .timeout_acquires
247 .fetch_add(1, Ordering::Relaxed);
248 return Err(TdbError::Other(format!(
249 "Connection acquire timeout after {:?}",
250 self.inner.config.acquire_timeout
251 )));
252 }
253
254 std::thread::sleep(Duration::from_millis(10));
256 }
257 }
258
259 pub fn stats(&self) -> ConnectionPoolStatsSnapshot {
261 ConnectionPoolStatsSnapshot {
262 current_size: self.inner.current_size.load(Ordering::Relaxed),
263 available: self.inner.available.lock().len(),
264 acquire_requests: self.inner.stats.acquire_requests.load(Ordering::Relaxed),
265 successful_acquires: self.inner.stats.successful_acquires.load(Ordering::Relaxed),
266 failed_acquires: self.inner.stats.failed_acquires.load(Ordering::Relaxed),
267 timeout_acquires: self.inner.stats.timeout_acquires.load(Ordering::Relaxed),
268 created_connections: self.inner.stats.created_connections.load(Ordering::Relaxed),
269 returned_connections: self
270 .inner
271 .stats
272 .returned_connections
273 .load(Ordering::Relaxed),
274 closed_connections: self.inner.stats.closed_connections.load(Ordering::Relaxed),
275 }
276 }
277
278 pub fn size(&self) -> usize {
280 self.inner.current_size.load(Ordering::Relaxed)
281 }
282
283 pub fn available(&self) -> usize {
285 self.inner.available.lock().len()
286 }
287
288 pub fn close_idle_connections(&self) -> usize {
290 let mut available = self.inner.available.lock();
291 let _max_idle = self.inner.config.max_idle_time;
292
293 let closed_count = 0;
294 let _now = Instant::now();
295
296 available.retain(|(_, _)| {
298 true
301 });
302
303 closed_count
304 }
305
306 pub fn resize(&self, new_size: usize) -> Result<()> {
308 if new_size < self.inner.config.min_connections {
309 return Err(TdbError::Other(format!(
310 "New size {} is below minimum {}",
311 new_size, self.inner.config.min_connections
312 )));
313 }
314
315 if new_size > self.inner.config.max_connections {
316 return Err(TdbError::Other(format!(
317 "New size {} exceeds maximum {}",
318 new_size, self.inner.config.max_connections
319 )));
320 }
321
322 let current_size = self.inner.current_size.load(Ordering::Relaxed);
323
324 if new_size > current_size {
325 for _ in current_size..new_size {
327 let store = self.inner.create_connection()?;
328 let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
329 self.inner.available.lock().push_back((id, store));
330 self.inner.current_size.fetch_add(1, Ordering::Relaxed);
331 self.inner
332 .stats
333 .created_connections
334 .fetch_add(1, Ordering::Relaxed);
335 }
336 } else if new_size < current_size {
337 let to_remove = current_size - new_size;
339 let mut available = self.inner.available.lock();
340
341 for _ in 0..to_remove.min(available.len()) {
342 if available.pop_back().is_some() {
343 self.inner.current_size.fetch_sub(1, Ordering::Relaxed);
344 self.inner
345 .stats
346 .closed_connections
347 .fetch_add(1, Ordering::Relaxed);
348 }
349 }
350 }
351
352 Ok(())
353 }
354}
355
356#[derive(Debug, Default)]
358struct ConnectionPoolStats {
359 acquire_requests: AtomicU64,
361 successful_acquires: AtomicU64,
363 failed_acquires: AtomicU64,
365 timeout_acquires: AtomicU64,
367 created_connections: AtomicU64,
369 returned_connections: AtomicU64,
371 closed_connections: AtomicU64,
373}
374
375#[derive(Debug, Clone)]
377pub struct ConnectionPoolStatsSnapshot {
378 pub current_size: usize,
380 pub available: usize,
382 pub acquire_requests: u64,
384 pub successful_acquires: u64,
386 pub failed_acquires: u64,
388 pub timeout_acquires: u64,
390 pub created_connections: u64,
392 pub returned_connections: u64,
394 pub closed_connections: u64,
396}
397
398impl ConnectionPoolStatsSnapshot {
399 pub fn success_rate(&self) -> f64 {
401 if self.acquire_requests == 0 {
402 0.0
403 } else {
404 (self.successful_acquires as f64 / self.acquire_requests as f64) * 100.0
405 }
406 }
407
408 pub fn utilization_rate(&self) -> f64 {
410 if self.current_size == 0 {
411 0.0
412 } else {
413 let in_use = self.current_size - self.available;
414 (in_use as f64 / self.current_size as f64) * 100.0
415 }
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use super::*;
422 use tempfile::TempDir;
423
424 fn create_test_pool() -> (TempDir, ConnectionPool) {
425 let temp_dir = TempDir::new().unwrap();
426 let db_path = temp_dir.path().join("test.db");
427
428 let config = ConnectionPoolConfig {
429 min_connections: 2,
430 max_connections: 5,
431 acquire_timeout: Duration::from_secs(5),
432 ..Default::default()
433 };
434
435 let pool = ConnectionPool::new(&db_path, config).unwrap();
436 (temp_dir, pool)
437 }
438
439 #[test]
440 fn test_connection_pool_creation() {
441 let (_temp_dir, pool) = create_test_pool();
442
443 assert_eq!(pool.size(), 2); assert_eq!(pool.available(), 2);
445 }
446
447 #[test]
448 fn test_acquire_and_return() {
449 let (_temp_dir, pool) = create_test_pool();
450
451 {
453 let conn = pool.acquire().unwrap();
454 assert_eq!(pool.available(), 1);
455 let _ = conn.store();
457 }
458
459 assert_eq!(pool.available(), 2);
461 }
462
463 #[test]
464 fn test_multiple_acquires() {
465 let (_temp_dir, pool) = create_test_pool();
466
467 let conn1 = pool.acquire().unwrap();
468 let conn2 = pool.acquire().unwrap();
469 let conn3 = pool.acquire().unwrap(); assert_eq!(pool.size(), 3);
472 assert_eq!(pool.available(), 0);
473
474 drop(conn1);
475 assert_eq!(pool.available(), 1);
476
477 drop(conn2);
478 drop(conn3);
479 assert_eq!(pool.available(), 3);
480 }
481
482 #[test]
483 fn test_max_connections_limit() {
484 let (_temp_dir, pool) = create_test_pool();
485
486 let mut connections = Vec::new();
488 for _ in 0..5 {
489 connections.push(pool.acquire().unwrap());
490 }
491
492 assert_eq!(pool.size(), 5);
493 assert_eq!(pool.available(), 0);
494 }
495
496 #[test]
497 fn test_connection_pool_stats() {
498 let (_temp_dir, pool) = create_test_pool();
499
500 let _conn1 = pool.acquire().unwrap();
501 let _conn2 = pool.acquire().unwrap();
502
503 let stats = pool.stats();
504 assert_eq!(stats.acquire_requests, 2);
505 assert_eq!(stats.successful_acquires, 2);
506 assert!(stats.success_rate() > 99.0);
507 }
508
509 #[test]
510 fn test_pool_resize_grow() {
511 let (_temp_dir, pool) = create_test_pool();
512
513 assert_eq!(pool.size(), 2);
514
515 pool.resize(4).unwrap();
516 assert_eq!(pool.size(), 4);
517 assert_eq!(pool.available(), 4);
518 }
519
520 #[test]
521 fn test_pool_resize_shrink() {
522 let (_temp_dir, pool) = create_test_pool();
523
524 pool.resize(4).unwrap();
525 assert_eq!(pool.size(), 4);
526
527 pool.resize(2).unwrap();
528 assert_eq!(pool.size(), 2);
529 }
530
531 #[test]
532 fn test_resize_validation() {
533 let (_temp_dir, pool) = create_test_pool();
534
535 assert!(pool.resize(1).is_err());
537
538 assert!(pool.resize(10).is_err());
540 }
541
542 #[test]
543 fn test_utilization_rate() {
544 let (_temp_dir, pool) = create_test_pool();
545
546 let _conn1 = pool.acquire().unwrap();
547
548 let stats = pool.stats();
549 assert!((stats.utilization_rate() - 50.0).abs() < 1.0);
551 }
552
553 #[test]
554 fn test_connection_id() {
555 let (_temp_dir, pool) = create_test_pool();
556
557 let conn1 = pool.acquire().unwrap();
558 let conn2 = pool.acquire().unwrap();
559
560 assert_ne!(conn1.id(), conn2.id());
561 }
562
563 #[test]
564 fn test_pooled_connection_touch() {
565 let (_temp_dir, pool) = create_test_pool();
566
567 let mut conn = pool.acquire().unwrap();
568
569 std::thread::sleep(Duration::from_millis(100));
570 assert!(conn.idle_time() >= Duration::from_millis(100));
571
572 conn.touch();
573 assert!(conn.idle_time() < Duration::from_millis(50));
574 }
575
576 #[test]
577 fn test_concurrent_acquires() {
578 use std::thread;
579
580 let (_temp_dir, pool) = create_test_pool();
581 let pool = Arc::new(pool);
582
583 let mut handles = vec![];
584
585 for _ in 0..3 {
586 let pool_clone = Arc::clone(&pool);
587 let handle = thread::spawn(move || {
588 let _conn = pool_clone.acquire().unwrap();
589 thread::sleep(Duration::from_millis(50));
590 });
591 handles.push(handle);
592 }
593
594 for handle in handles {
595 handle.join().unwrap();
596 }
597
598 let stats = pool.stats();
599 assert_eq!(stats.successful_acquires, 3);
600 }
601
602 #[test]
603 fn test_stats_snapshot_success_rate() {
604 let stats = ConnectionPoolStatsSnapshot {
605 current_size: 5,
606 available: 2,
607 acquire_requests: 100,
608 successful_acquires: 95,
609 failed_acquires: 3,
610 timeout_acquires: 2,
611 created_connections: 5,
612 returned_connections: 90,
613 closed_connections: 0,
614 };
615
616 assert_eq!(stats.success_rate(), 95.0);
617 }
618}