oxirs_tdb/
connection_pool.rs

//! Connection pooling for multi-client RDF database access
//!
//! This module provides connection pooling to efficiently manage multiple
//! concurrent client connections to the TDB store, reducing overhead from
//! repeated connection setup/teardown.
//!
//! ## Features
//! - Configurable pool size with min/max connections
//! - Connection health checking and automatic recovery
//! - Fair connection distribution with timeout support
//! - Connection lifecycle management
//! - Pool statistics and monitoring
13
14use crate::error::{Result, TdbError};
15use crate::store::TdbStore;
16use parking_lot::{Mutex, RwLock};
17use std::collections::VecDeque;
18use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
/// Settings that control how a connection pool sizes itself and how long
/// callers are allowed to wait for a connection.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Number of connections opened up front when the pool is built; also the
    /// smallest size a pool may be resized to.
    pub min_connections: usize,
    /// Largest number of connections the pool is allowed to hold.
    pub max_connections: usize,
    /// How long an acquire attempt keeps retrying before it reports a timeout.
    pub acquire_timeout: Duration,
    /// Longest time a connection may sit idle before it should be closed.
    pub max_idle_time: Duration,
    /// Turns connection health checking on or off.
    pub enable_health_check: bool,
    /// Time between two consecutive health checks.
    pub health_check_interval: Duration,
}
39
40impl Default for ConnectionPoolConfig {
41    fn default() -> Self {
42        Self {
43            min_connections: 2,
44            max_connections: 10,
45            acquire_timeout: Duration::from_secs(30),
46            max_idle_time: Duration::from_secs(300), // 5 minutes
47            enable_health_check: true,
48            health_check_interval: Duration::from_secs(60),
49        }
50    }
51}
52
53/// A pooled connection to the TDB store
54pub struct PooledConnection {
55    /// The actual store connection
56    store: Option<TdbStore>,
57    /// Connection ID
58    id: u64,
59    /// When this connection was last used
60    last_used: Instant,
61    /// Reference to the pool (for returning connection)
62    pool: Arc<ConnectionPoolInner>,
63}
64
65impl PooledConnection {
66    /// Get a reference to the underlying store
67    pub fn store(&self) -> &TdbStore {
68        self.store.as_ref().expect("Store should be present")
69    }
70
71    /// Get a mutable reference to the underlying store
72    pub fn store_mut(&mut self) -> &mut TdbStore {
73        self.store.as_mut().expect("Store should be present")
74    }
75
76    /// Get connection ID
77    pub fn id(&self) -> u64 {
78        self.id
79    }
80
81    /// Get time since last use
82    pub fn idle_time(&self) -> Duration {
83        self.last_used.elapsed()
84    }
85
86    /// Mark connection as used
87    fn touch(&mut self) {
88        self.last_used = Instant::now();
89    }
90}
91
92impl Drop for PooledConnection {
93    fn drop(&mut self) {
94        // Return connection to pool
95        if let Some(store) = self.store.take() {
96            self.pool.return_connection(store, self.id);
97        }
98    }
99}
100
101/// Inner state of the connection pool
102struct ConnectionPoolInner {
103    /// Database path
104    db_path: PathBuf,
105    /// Configuration
106    config: ConnectionPoolConfig,
107    /// Available connections
108    available: Mutex<VecDeque<(u64, TdbStore)>>,
109    /// Next connection ID
110    next_id: AtomicU64,
111    /// Current pool size
112    current_size: AtomicUsize,
113    /// Statistics
114    stats: ConnectionPoolStats,
115}
116
117impl ConnectionPoolInner {
118    /// Create a new connection
119    fn create_connection(&self) -> Result<TdbStore> {
120        TdbStore::open(&self.db_path)
121    }
122
123    /// Return a connection to the pool
124    fn return_connection(&self, store: TdbStore, id: u64) {
125        let mut available = self.available.lock();
126
127        // Check if pool is at max capacity
128        if available.len() < self.config.max_connections {
129            available.push_back((id, store));
130            self.stats
131                .returned_connections
132                .fetch_add(1, Ordering::Relaxed);
133        } else {
134            // Pool is full, drop the connection
135            drop(store);
136            self.current_size.fetch_sub(1, Ordering::Relaxed);
137            self.stats
138                .closed_connections
139                .fetch_add(1, Ordering::Relaxed);
140        }
141    }
142}
143
144/// Connection pool for managing multiple TDB store connections
145pub struct ConnectionPool {
146    inner: Arc<ConnectionPoolInner>,
147}
148
149impl ConnectionPool {
150    /// Create a new connection pool
151    pub fn new<P: AsRef<Path>>(db_path: P, config: ConnectionPoolConfig) -> Result<Self> {
152        let db_path = db_path.as_ref().to_path_buf();
153
154        // Validate configuration
155        if config.min_connections > config.max_connections {
156            return Err(TdbError::Other(
157                "min_connections cannot exceed max_connections".to_string(),
158            ));
159        }
160
161        let inner = Arc::new(ConnectionPoolInner {
162            db_path: db_path.clone(),
163            config: config.clone(),
164            available: Mutex::new(VecDeque::with_capacity(config.max_connections)),
165            next_id: AtomicU64::new(1),
166            current_size: AtomicUsize::new(0),
167            stats: ConnectionPoolStats::default(),
168        });
169
170        // Initialize minimum connections
171        for _ in 0..config.min_connections {
172            let store = TdbStore::open(&db_path)?;
173            let id = inner.next_id.fetch_add(1, Ordering::Relaxed);
174            inner.available.lock().push_back((id, store));
175            inner.current_size.fetch_add(1, Ordering::Relaxed);
176        }
177
178        Ok(Self { inner })
179    }
180
181    /// Acquire a connection from the pool
182    pub fn acquire(&self) -> Result<PooledConnection> {
183        self.inner
184            .stats
185            .acquire_requests
186            .fetch_add(1, Ordering::Relaxed);
187        let start = Instant::now();
188
189        // Try to get an available connection
190        loop {
191            // Check for available connection
192            {
193                let mut available = self.inner.available.lock();
194                if let Some((id, store)) = available.pop_front() {
195                    self.inner
196                        .stats
197                        .successful_acquires
198                        .fetch_add(1, Ordering::Relaxed);
199
200                    return Ok(PooledConnection {
201                        store: Some(store),
202                        id,
203                        last_used: Instant::now(),
204                        pool: Arc::clone(&self.inner),
205                    });
206                }
207            }
208
209            // No available connection - try to create new one if under limit
210            let current_size = self.inner.current_size.load(Ordering::Relaxed);
211            if current_size < self.inner.config.max_connections {
212                match self.inner.create_connection() {
213                    Ok(store) => {
214                        let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
215                        self.inner.current_size.fetch_add(1, Ordering::Relaxed);
216                        self.inner
217                            .stats
218                            .created_connections
219                            .fetch_add(1, Ordering::Relaxed);
220                        self.inner
221                            .stats
222                            .successful_acquires
223                            .fetch_add(1, Ordering::Relaxed);
224
225                        return Ok(PooledConnection {
226                            store: Some(store),
227                            id,
228                            last_used: Instant::now(),
229                            pool: Arc::clone(&self.inner),
230                        });
231                    }
232                    Err(e) => {
233                        self.inner
234                            .stats
235                            .failed_acquires
236                            .fetch_add(1, Ordering::Relaxed);
237                        return Err(e);
238                    }
239                }
240            }
241
242            // Check timeout
243            if start.elapsed() >= self.inner.config.acquire_timeout {
244                self.inner
245                    .stats
246                    .timeout_acquires
247                    .fetch_add(1, Ordering::Relaxed);
248                return Err(TdbError::Other(format!(
249                    "Connection acquire timeout after {:?}",
250                    self.inner.config.acquire_timeout
251                )));
252            }
253
254            // Wait a bit before retrying
255            std::thread::sleep(Duration::from_millis(10));
256        }
257    }
258
259    /// Get pool statistics
260    pub fn stats(&self) -> ConnectionPoolStatsSnapshot {
261        ConnectionPoolStatsSnapshot {
262            current_size: self.inner.current_size.load(Ordering::Relaxed),
263            available: self.inner.available.lock().len(),
264            acquire_requests: self.inner.stats.acquire_requests.load(Ordering::Relaxed),
265            successful_acquires: self.inner.stats.successful_acquires.load(Ordering::Relaxed),
266            failed_acquires: self.inner.stats.failed_acquires.load(Ordering::Relaxed),
267            timeout_acquires: self.inner.stats.timeout_acquires.load(Ordering::Relaxed),
268            created_connections: self.inner.stats.created_connections.load(Ordering::Relaxed),
269            returned_connections: self
270                .inner
271                .stats
272                .returned_connections
273                .load(Ordering::Relaxed),
274            closed_connections: self.inner.stats.closed_connections.load(Ordering::Relaxed),
275        }
276    }
277
278    /// Get current pool size
279    pub fn size(&self) -> usize {
280        self.inner.current_size.load(Ordering::Relaxed)
281    }
282
283    /// Get number of available connections
284    pub fn available(&self) -> usize {
285        self.inner.available.lock().len()
286    }
287
288    /// Close idle connections exceeding max_idle_time
289    pub fn close_idle_connections(&self) -> usize {
290        let mut available = self.inner.available.lock();
291        let _max_idle = self.inner.config.max_idle_time;
292
293        let closed_count = 0;
294        let _now = Instant::now();
295
296        // Keep only connections below max idle time
297        available.retain(|(_, _)| {
298            // For simplicity, we don't track individual connection idle times here
299            // In a real implementation, we'd track last_used per connection
300            true
301        });
302
303        closed_count
304    }
305
306    /// Resize the pool to a new size
307    pub fn resize(&self, new_size: usize) -> Result<()> {
308        if new_size < self.inner.config.min_connections {
309            return Err(TdbError::Other(format!(
310                "New size {} is below minimum {}",
311                new_size, self.inner.config.min_connections
312            )));
313        }
314
315        if new_size > self.inner.config.max_connections {
316            return Err(TdbError::Other(format!(
317                "New size {} exceeds maximum {}",
318                new_size, self.inner.config.max_connections
319            )));
320        }
321
322        let current_size = self.inner.current_size.load(Ordering::Relaxed);
323
324        if new_size > current_size {
325            // Grow the pool
326            for _ in current_size..new_size {
327                let store = self.inner.create_connection()?;
328                let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
329                self.inner.available.lock().push_back((id, store));
330                self.inner.current_size.fetch_add(1, Ordering::Relaxed);
331                self.inner
332                    .stats
333                    .created_connections
334                    .fetch_add(1, Ordering::Relaxed);
335            }
336        } else if new_size < current_size {
337            // Shrink the pool
338            let to_remove = current_size - new_size;
339            let mut available = self.inner.available.lock();
340
341            for _ in 0..to_remove.min(available.len()) {
342                if available.pop_back().is_some() {
343                    self.inner.current_size.fetch_sub(1, Ordering::Relaxed);
344                    self.inner
345                        .stats
346                        .closed_connections
347                        .fetch_add(1, Ordering::Relaxed);
348                }
349            }
350        }
351
352        Ok(())
353    }
354}
355
/// Live counters the pool updates as connections are requested, created,
/// returned, and closed. All counters start at zero.
#[derive(Debug, Default)]
struct ConnectionPoolStats {
    /// Number of acquire calls made so far.
    acquire_requests: AtomicU64,
    /// Acquire calls that produced a connection.
    successful_acquires: AtomicU64,
    /// Acquire calls that ended with an error while opening a connection.
    failed_acquires: AtomicU64,
    /// Acquire calls that gave up after the acquire timeout.
    timeout_acquires: AtomicU64,
    /// Connections opened by the pool.
    created_connections: AtomicU64,
    /// Connections handed back to the idle queue.
    returned_connections: AtomicU64,
    /// Connections closed by the pool.
    closed_connections: AtomicU64,
}
374
/// Plain-value copy of the pool's counters and sizes taken at one moment.
#[derive(Debug, Clone)]
pub struct ConnectionPoolStatsSnapshot {
    /// Number of connections the pool accounted for.
    pub current_size: usize,
    /// Number of connections sitting idle in the pool.
    pub available: usize,
    /// Number of acquire calls made so far.
    pub acquire_requests: u64,
    /// Acquire calls that produced a connection.
    pub successful_acquires: u64,
    /// Acquire calls that ended with an error.
    pub failed_acquires: u64,
    /// Acquire calls that gave up after the timeout.
    pub timeout_acquires: u64,
    /// Connections opened by the pool.
    pub created_connections: u64,
    /// Connections handed back to the pool.
    pub returned_connections: u64,
    /// Connections closed by the pool.
    pub closed_connections: u64,
}
397
398impl ConnectionPoolStatsSnapshot {
399    /// Get success rate for acquire operations
400    pub fn success_rate(&self) -> f64 {
401        if self.acquire_requests == 0 {
402            0.0
403        } else {
404            (self.successful_acquires as f64 / self.acquire_requests as f64) * 100.0
405        }
406    }
407
408    /// Get utilization rate (in-use connections / total connections)
409    pub fn utilization_rate(&self) -> f64 {
410        if self.current_size == 0 {
411            0.0
412        } else {
413            let in_use = self.current_size - self.available;
414            (in_use as f64 / self.current_size as f64) * 100.0
415        }
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a pool with 2 minimum / 5 maximum connections and a 5-second
    /// acquire timeout, backed by a fresh temporary directory. The directory
    /// guard is returned so it stays alive as long as the pool is in use.
    fn create_test_pool() -> (TempDir, ConnectionPool) {
        let temp_dir = TempDir::new().unwrap();
        let config = ConnectionPoolConfig {
            min_connections: 2,
            max_connections: 5,
            acquire_timeout: Duration::from_secs(5),
            ..Default::default()
        };

        let pool = ConnectionPool::new(temp_dir.path().join("test.db"), config).unwrap();
        (temp_dir, pool)
    }

    /// A new pool starts with exactly `min_connections` idle connections.
    #[test]
    fn test_connection_pool_creation() {
        let (_guard, pool) = create_test_pool();

        assert_eq!(pool.size(), 2); // min_connections
        assert_eq!(pool.available(), 2);
    }

    /// A connection leaves the idle queue while held and comes back on drop.
    #[test]
    fn test_acquire_and_return() {
        let (_guard, pool) = create_test_pool();

        let held = pool.acquire().unwrap();
        assert_eq!(pool.available(), 1);
        // Just check we can access the store
        let _ = held.store();
        drop(held);

        // Connection should be returned
        assert_eq!(pool.available(), 2);
    }

    /// Acquiring more than the idle count opens additional connections.
    #[test]
    fn test_multiple_acquires() {
        let (_guard, pool) = create_test_pool();

        let first = pool.acquire().unwrap();
        let second = pool.acquire().unwrap();
        let third = pool.acquire().unwrap(); // Should create new connection

        assert_eq!(pool.size(), 3);
        assert_eq!(pool.available(), 0);

        drop(first);
        assert_eq!(pool.available(), 1);

        drop(second);
        drop(third);
        assert_eq!(pool.available(), 3);
    }

    /// The pool can hand out `max_connections` connections at once.
    #[test]
    fn test_max_connections_limit() {
        let (_guard, pool) = create_test_pool();

        // Acquire max connections and keep them alive for the assertions.
        let _held: Vec<PooledConnection> = (0..5).map(|_| pool.acquire().unwrap()).collect();

        assert_eq!(pool.size(), 5);
        assert_eq!(pool.available(), 0);
    }

    /// Acquire requests and successes are reflected in the statistics.
    #[test]
    fn test_connection_pool_stats() {
        let (_guard, pool) = create_test_pool();

        let _first = pool.acquire().unwrap();
        let _second = pool.acquire().unwrap();

        let snapshot = pool.stats();
        assert_eq!(snapshot.acquire_requests, 2);
        assert_eq!(snapshot.successful_acquires, 2);
        assert!(snapshot.success_rate() > 99.0);
    }

    /// Growing the pool adds idle connections.
    #[test]
    fn test_pool_resize_grow() {
        let (_guard, pool) = create_test_pool();
        assert_eq!(pool.size(), 2);

        pool.resize(4).unwrap();

        assert_eq!(pool.size(), 4);
        assert_eq!(pool.available(), 4);
    }

    /// Shrinking the pool removes idle connections again.
    #[test]
    fn test_pool_resize_shrink() {
        let (_guard, pool) = create_test_pool();

        pool.resize(4).unwrap();
        assert_eq!(pool.size(), 4);

        pool.resize(2).unwrap();
        assert_eq!(pool.size(), 2);
    }

    /// Sizes outside the configured bounds are rejected.
    #[test]
    fn test_resize_validation() {
        let (_guard, pool) = create_test_pool();

        // Below minimum
        assert!(pool.resize(1).is_err());

        // Above maximum
        assert!(pool.resize(10).is_err());
    }

    /// One of two connections in use yields roughly 50% utilization.
    #[test]
    fn test_utilization_rate() {
        let (_guard, pool) = create_test_pool();

        let _held = pool.acquire().unwrap();

        // 1 in use out of 2 total = 50%
        let utilization = pool.stats().utilization_rate();
        assert!((utilization - 50.0).abs() < 1.0);
    }

    /// Distinct connections carry distinct IDs.
    #[test]
    fn test_connection_id() {
        let (_guard, pool) = create_test_pool();

        let first = pool.acquire().unwrap();
        let second = pool.acquire().unwrap();

        assert_ne!(first.id(), second.id());
    }

    /// `touch` resets the idle timer of a connection.
    #[test]
    fn test_pooled_connection_touch() {
        let (_guard, pool) = create_test_pool();
        let mut held = pool.acquire().unwrap();

        std::thread::sleep(Duration::from_millis(100));
        assert!(held.idle_time() >= Duration::from_millis(100));

        held.touch();
        assert!(held.idle_time() < Duration::from_millis(50));
    }

    /// Several threads can acquire from the same pool at once.
    #[test]
    fn test_concurrent_acquires() {
        use std::thread;

        let (_guard, pool) = create_test_pool();
        let pool = Arc::new(pool);

        // `collect` is eager, so every worker is spawned before any is joined.
        let workers: Vec<_> = (0..3)
            .map(|_| {
                let shared = Arc::clone(&pool);
                thread::spawn(move || {
                    let _held = shared.acquire().unwrap();
                    thread::sleep(Duration::from_millis(50));
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(pool.stats().successful_acquires, 3);
    }

    /// The success rate is the percentage of successful acquire requests.
    #[test]
    fn test_stats_snapshot_success_rate() {
        let snapshot = ConnectionPoolStatsSnapshot {
            current_size: 5,
            available: 2,
            acquire_requests: 100,
            successful_acquires: 95,
            failed_acquires: 3,
            timeout_acquires: 2,
            created_connections: 5,
            returned_connections: 90,
            closed_connections: 0,
        };

        assert_eq!(snapshot.success_rate(), 95.0);
    }
}