Skip to main content

do_memory_storage_turso/pool/
mod.rs

1//! Connection pool for Turso/libSQL database connections
2//!
3//! Provides efficient connection management, concurrency limits, and performance monitoring.
4//!
5//! Note: libSQL's Database is already a connection factory. This pool adds:
6//! - Concurrency limits via semaphore
7//! - Connection health validation
8//! - Performance metrics and monitoring
9//! - Graceful lifecycle management
10//! - Adaptive pool sizing for variable loads
11//!
12//! ## Keep-Alive Pool
13//!
14//! The keep-alive pool (`keepalive.rs`) provides additional features:
15//! - Connection last-used tracking
16//! - Stale connection detection and automatic refresh
17//! - Proactive keep-alive pings
18//! - Reduced connection overhead (45ms -> ~5ms)
19//!
20//! ## Adaptive Pool
21//!
22//! The adaptive pool (`adaptive.rs`) provides automatic scaling:
23//! - Dynamically adjusts pool size based on load
24//! - Scales up when utilization exceeds threshold
25//! - Scales down during low utilization periods
26//! - 20% better performance under variable load
27
28pub mod adaptive;
29pub mod caching_pool;
30mod config;
31pub mod connection_wrapper;
32pub mod keepalive;
33
34pub use adaptive::{
35    AdaptiveConnectionPool, AdaptivePoolConfig, AdaptivePoolMetrics, AdaptivePooledConnection,
36    ConnectionCleanupCallback, ConnectionId,
37};
38pub use caching_pool::{CachingPool, CachingPoolConfig, CachingPoolStats, ConnectionGuard};
39pub use config::{PoolConfig, PoolStatistics, PooledConnection};
40pub use connection_wrapper::PooledConnection as WrappedPooledConnection;
41pub use keepalive::{KeepAliveConfig, KeepAliveConnection, KeepAlivePool, KeepAliveStatistics};
42
43use do_memory_core::{Error, Result};
44use libsql::{Connection, Database};
45use parking_lot::RwLock;
46use std::sync::Arc;
47use std::time::{Duration, Instant};
48use tokio::sync::Semaphore;
49use tracing::{debug, info, warn};
50
/// Connection pool for managing database connections
///
/// This pool provides:
/// - Concurrency limits via semaphore (max_connections)
/// - Connection health validation
/// - Performance metrics
/// - Graceful shutdown
pub struct ConnectionPool {
    /// Shared database handle; each checkout opens a fresh connection from it.
    db: Arc<Database>,
    /// Pool settings (connection limit, timeouts, health-check toggle) supplied at construction.
    config: PoolConfig,
    /// Permit source that caps concurrent checkouts; created with `config.max_connections` permits.
    semaphore: Arc<Semaphore>,
    /// Pool counters behind a lock; a clone of this `Arc` is handed to every `PooledConnection`.
    stats: Arc<RwLock<PoolStatistics>>,
}
64
65impl ConnectionPool {
66    /// Create a new connection pool
67    ///
68    /// # Arguments
69    ///
70    /// * `db` - Database instance to create connections from
71    /// * `config` - Pool configuration
72    ///
73    /// # Example
74    ///
75    /// ```no_run
76    /// use std::sync::Arc;
77    /// use libsql::Builder;
78    /// use do_memory_storage_turso::pool::{ConnectionPool, PoolConfig};
79    ///
80    /// # async fn example() -> anyhow::Result<()> {
81    /// let db = Builder::new_local("test.db").build().await?;
82    /// let config = PoolConfig::default();
83    /// let pool = ConnectionPool::new(Arc::new(db), config).await?;
84    /// # Ok(())
85    /// # }
86    /// ```
87    pub async fn new(db: Arc<Database>, config: PoolConfig) -> Result<Self> {
88        info!(
89            "Creating connection pool with max_connections={}",
90            config.max_connections
91        );
92
93        // Create a semaphore wrapped in Arc for shared ownership
94        let semaphore = Arc::new(Semaphore::new(config.max_connections));
95        let stats = Arc::new(RwLock::new(PoolStatistics::default()));
96
97        let pool = Self {
98            db,
99            config,
100            semaphore,
101            stats,
102        };
103
104        // Validate database connectivity
105        pool.validate_database().await?;
106
107        info!("Connection pool created successfully");
108        Ok(pool)
109    }
110
111    /// Validate database connectivity
112    async fn validate_database(&self) -> Result<()> {
113        let conn = self
114            .db
115            .connect()
116            .map_err(|e| Error::Storage(format!("Failed to connect to database: {}", e)))?;
117
118        conn.query("SELECT 1", ())
119            .await
120            .map_err(|e| Error::Storage(format!("Database validation failed: {}", e)))?;
121
122        Ok(())
123    }
124
125    /// Create a new database connection
126    async fn create_connection(&self) -> Result<Connection> {
127        let conn = self
128            .db
129            .connect()
130            .map_err(|e| Error::Storage(format!("Failed to create connection: {}", e)))?;
131
132        // Update statistics
133        {
134            let mut stats = self.stats.write();
135            stats.total_created += 1;
136        }
137
138        Ok(conn)
139    }
140
141    /// Get a connection from the pool
142    ///
143    /// This will:
144    /// 1. Wait for a semaphore permit (respects max_connections limit)
145    /// 2. Create a new connection from the database
146    /// 3. Optionally validate the connection health
147    /// 4. Return a PooledConnection guard that releases the permit on drop
148    ///
149    /// # Errors
150    ///
151    /// Returns error if:
152    /// - Timeout waiting for available connection slot
153    /// - Failed to create connection
154    /// - Connection health check fails
155    pub async fn get(&self) -> Result<PooledConnection> {
156        let start = Instant::now();
157
158        // Acquire an owned semaphore permit (limits concurrent connections)
159        let owned_permit_fut = Arc::clone(&self.semaphore).acquire_owned();
160        let permit = tokio::time::timeout(self.config.connection_timeout, owned_permit_fut)
161            .await
162            .map_err(|_| {
163                Error::Storage(format!(
164                    "Connection pool timeout after {:?}: max {} connections in use",
165                    self.config.connection_timeout, self.config.max_connections
166                ))
167            })?
168            .map_err(|e| Error::Storage(format!("Failed to acquire connection permit: {}", e)))?;
169
170        let wait_time = start.elapsed();
171
172        // Create a new connection
173        let conn = self.create_connection().await?;
174
175        // Validate connection health if enabled
176        if self.config.enable_health_check {
177            if let Err(e) = self.validate_connection_health(&conn).await {
178                let mut stats = self.stats.write();
179                stats.total_health_checks_failed += 1;
180                return Err(e);
181            }
182
183            let mut stats = self.stats.write();
184            stats.total_health_checks_passed += 1;
185        }
186
187        // Update statistics
188        {
189            let mut stats = self.stats.write();
190            stats.total_checkouts += 1;
191            stats.total_wait_time_ms += wait_time.as_millis() as u64;
192            stats.active_connections += 1;
193            stats.update_averages();
194        }
195
196        debug!(
197            "Connection acquired (wait: {:?}, active: {})",
198            wait_time,
199            self.stats.read().active_connections
200        );
201
202        Ok(PooledConnection {
203            connection: Some(conn),
204            _permit: permit,
205            stats: Arc::clone(&self.stats),
206        })
207    }
208
209    /// Validate a connection is still healthy
210    async fn validate_connection_health(&self, conn: &Connection) -> Result<()> {
211        tokio::time::timeout(self.config.health_check_timeout, conn.query("SELECT 1", ()))
212            .await
213            .map_err(|_| Error::Storage("Connection health check timeout".to_string()))?
214            .map_err(|e| Error::Storage(format!("Connection health check failed: {}", e)))?;
215
216        Ok(())
217    }
218
219    /// Get current pool statistics
220    pub async fn statistics(&self) -> PoolStatistics {
221        self.stats.read().clone()
222    }
223
224    /// Get current pool utilization (0.0 to 1.0)
225    pub async fn utilization(&self) -> f32 {
226        let stats = self.stats.read();
227        if self.config.max_connections == 0 {
228            return 0.0;
229        }
230        stats.active_connections as f32 / self.config.max_connections as f32
231    }
232
233    /// Get number of available connection slots
234    pub async fn available_connections(&self) -> usize {
235        let stats = self.stats.read();
236        self.config
237            .max_connections
238            .saturating_sub(stats.active_connections)
239    }
240
241    /// Check if pool has available capacity
242    pub async fn has_capacity(&self) -> bool {
243        self.available_connections().await > 0
244    }
245
246    /// Gracefully shutdown the pool
247    ///
248    /// Waits for active connections to be returned (up to 30 seconds).
249    pub async fn shutdown(&self) -> Result<()> {
250        info!("Shutting down connection pool");
251
252        let shutdown_timeout = Duration::from_secs(30);
253        let start = Instant::now();
254
255        while start.elapsed() < shutdown_timeout {
256            let active = self.stats.read().active_connections;
257            if active == 0 {
258                break;
259            }
260
261            debug!("Waiting for {} active connections to complete", active);
262            tokio::time::sleep(Duration::from_millis(100)).await;
263        }
264
265        let final_active = self.stats.read().active_connections;
266        if final_active > 0 {
267            warn!(
268                "Shutdown completed with {} active connections still in use",
269                final_active
270            );
271        } else {
272            info!("Connection pool shutdown complete");
273        }
274
275        Ok(())
276    }
277}