Skip to main content

prax_sqlite/
pool.rs

1//! Connection pool for SQLite.
2//!
3//! This module provides an optimized connection pool for SQLite databases.
4//! Unlike PostgreSQL/MySQL, SQLite has unique characteristics:
5//!
6//! - In-memory databases: Each connection has its own isolated database
7//! - File-based databases: Connections share the same database file
8//!
9//! For file-based databases, this pool reuses connections to avoid the
10//! overhead of opening new connections (~200µs per open).
11
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Duration;
15
16use parking_lot::Mutex;
17use tokio::sync::Semaphore;
18use tokio_rusqlite::Connection;
19use tracing::{debug, info, trace};
20
21use crate::config::SqliteConfig;
22use crate::connection::{PooledConnection, SqliteConnection};
23use crate::error::{SqliteError, SqliteResult};
24
25/// A connection pool for SQLite.
26///
27/// This pool provides connection reuse for file-based SQLite databases,
28/// significantly improving performance by avoiding repeated connection opens.
29///
30/// # Example
31///
32/// ```rust,ignore
33/// use prax_sqlite::{SqlitePool, SqliteConfig};
34///
35/// let pool = SqlitePool::new(SqliteConfig::file("data.db")).await?;
36/// let conn = pool.get().await?;
37/// // Use connection...
38/// // Connection is returned to pool when dropped
39/// ```
40#[derive(Clone)]
41pub struct SqlitePool {
42    config: Arc<SqliteConfig>,
43    /// Semaphore to limit concurrent connections.
44    semaphore: Arc<Semaphore>,
45    /// Pool of idle connections (only for file-based databases).
46    idle_connections: Arc<Mutex<VecDeque<PooledConnection>>>,
47    pool_config: Arc<PoolConfig>,
48    /// Statistics about pool usage.
49    stats: Arc<Mutex<PoolStats>>,
50}
51
52/// Statistics about pool usage.
53#[derive(Debug, Default, Clone)]
54pub struct PoolStats {
55    /// Number of connection reuses.
56    pub reuses: u64,
57    /// Number of new connections opened.
58    pub opens: u64,
59    /// Number of connections closed due to expiration.
60    pub expirations: u64,
61    /// Number of connections currently in use.
62    pub in_use: usize,
63}
64
65impl SqlitePool {
66    /// Create a new connection pool from configuration.
67    pub async fn new(config: SqliteConfig) -> SqliteResult<Self> {
68        Self::with_pool_config(config, PoolConfig::default()).await
69    }
70
71    /// Create a new connection pool with custom pool configuration.
72    pub async fn with_pool_config(
73        config: SqliteConfig,
74        pool_config: PoolConfig,
75    ) -> SqliteResult<Self> {
76        info!(
77            path = %config.path_str(),
78            max_connections = %pool_config.max_connections,
79            "SQLite connection pool created"
80        );
81
82        // Verify we can open at least one connection
83        let test_conn = Self::open_connection(&config).await?;
84        drop(test_conn);
85
86        let pool = Self {
87            config: Arc::new(config),
88            semaphore: Arc::new(Semaphore::new(pool_config.max_connections)),
89            idle_connections: Arc::new(Mutex::new(VecDeque::with_capacity(
90                pool_config.max_connections,
91            ))),
92            pool_config: Arc::new(pool_config),
93            stats: Arc::new(Mutex::new(PoolStats::default())),
94        };
95
96        // Pre-warm the pool with min_connections
97        if !pool.config.path.is_memory() && pool.pool_config.min_connections > 0 {
98            debug!(
99                "Pre-warming pool with {} connections",
100                pool.pool_config.min_connections
101            );
102            for _ in 0..pool.pool_config.min_connections {
103                if let Ok(conn) = Self::open_connection(&pool.config).await {
104                    let mut idle = pool.idle_connections.lock();
105                    idle.push_back(PooledConnection::new(conn));
106                }
107            }
108        }
109
110        Ok(pool)
111    }
112
113    /// Open a new connection with the given configuration.
114    async fn open_connection(config: &SqliteConfig) -> SqliteResult<Connection> {
115        let path = config.path_str().to_string();
116        let init_sql = config.init_sql();
117
118        let conn = if config.path.is_memory() {
119            Connection::open_in_memory().await?
120        } else {
121            Connection::open(&path).await?
122        };
123
124        // Run initialization SQL
125        conn.call(move |conn| {
126            conn.execute_batch(&init_sql)?;
127            Ok(())
128        })
129        .await?;
130
131        Ok(conn)
132    }
133
134    /// Get a connection from the pool.
135    ///
136    /// For file-based databases, this will try to reuse an idle connection
137    /// before opening a new one. For in-memory databases, always opens a new
138    /// connection (since each connection has its own database).
139    pub async fn get(&self) -> SqliteResult<SqliteConnection> {
140        trace!("Acquiring connection from pool");
141
142        // Wait for a permit (limits concurrent connections)
143        let permit = self
144            .semaphore
145            .clone()
146            .acquire_owned()
147            .await
148            .map_err(|e| SqliteError::pool(format!("failed to acquire permit: {}", e)))?;
149
150        // Update stats
151        {
152            let mut stats = self.stats.lock();
153            stats.in_use += 1;
154        }
155
156        // For in-memory databases, always create a new connection
157        // (each connection has its own separate database)
158        if self.config.path.is_memory() {
159            let conn = Self::open_connection(&self.config).await?;
160            {
161                let mut stats = self.stats.lock();
162                stats.opens += 1;
163            }
164            return Ok(SqliteConnection::new_pooled(
165                conn, permit, None, // No return channel for in-memory
166            ));
167        }
168
169        // Try to get an idle connection
170        let conn: Option<Connection> = {
171            let mut idle = self.idle_connections.lock();
172
173            // Clean up expired connections while searching
174            while let Some(pooled) = idle.pop_front() {
175                let is_expired = if let Some(lifetime) = self.pool_config.max_lifetime {
176                    pooled.created_at.elapsed() > lifetime
177                } else {
178                    false
179                };
180                let is_idle_expired = if let Some(timeout) = self.pool_config.idle_timeout {
181                    pooled.last_used.elapsed() > timeout
182                } else {
183                    false
184                };
185
186                if is_expired || is_idle_expired {
187                    let mut stats = self.stats.lock();
188                    stats.expirations += 1;
189                    // Connection will be dropped
190                    continue;
191                }
192                // Found a valid connection
193                let mut stats = self.stats.lock();
194                stats.reuses += 1;
195                return Ok(SqliteConnection::new_pooled(
196                    pooled.conn,
197                    permit,
198                    Some(self.idle_connections.clone()),
199                ));
200            }
201            None
202        };
203
204        // No idle connection available, open a new one
205        if conn.is_none() {
206            debug!("No idle connections, opening new connection");
207            let new_conn = Self::open_connection(&self.config).await?;
208            {
209                let mut stats = self.stats.lock();
210                stats.opens += 1;
211            }
212            return Ok(SqliteConnection::new_pooled(
213                new_conn,
214                permit,
215                Some(self.idle_connections.clone()),
216            ));
217        }
218
219        unreachable!()
220    }
221
222    /// Get the pool configuration.
223    pub fn config(&self) -> &SqliteConfig {
224        &self.config
225    }
226
227    /// Get the pool settings.
228    pub fn pool_config(&self) -> &PoolConfig {
229        &self.pool_config
230    }
231
232    /// Get pool statistics.
233    pub fn stats(&self) -> PoolStats {
234        self.stats.lock().clone()
235    }
236
237    /// Reset pool statistics.
238    pub fn reset_stats(&self) {
239        let mut stats = self.stats.lock();
240        *stats = PoolStats::default();
241    }
242
243    /// Check if the pool is healthy by attempting to get a connection.
244    pub async fn is_healthy(&self) -> bool {
245        match Self::open_connection(&self.config).await {
246            Ok(conn) => {
247                let result = conn
248                    .call(|conn| {
249                        conn.execute("SELECT 1", [])?;
250                        Ok(())
251                    })
252                    .await;
253                result.is_ok()
254            }
255            Err(_) => false,
256        }
257    }
258
259    /// Get the number of available permits (potential concurrent connections).
260    pub fn available_permits(&self) -> usize {
261        self.semaphore.available_permits()
262    }
263
264    /// Get the number of idle connections in the pool.
265    pub fn idle_count(&self) -> usize {
266        self.idle_connections.lock().len()
267    }
268
269    /// Create a builder for configuring the pool.
270    pub fn builder() -> SqlitePoolBuilder {
271        SqlitePoolBuilder::new()
272    }
273}
274
275/// Configuration for the connection pool.
276#[derive(Debug, Clone)]
277pub struct PoolConfig {
278    /// Maximum number of concurrent connections.
279    pub max_connections: usize,
280    /// Minimum number of connections to keep in the pool.
281    pub min_connections: usize,
282    /// Connection timeout.
283    pub connection_timeout: Option<Duration>,
284    /// Maximum idle time before a connection is closed.
285    pub idle_timeout: Option<Duration>,
286    /// Maximum lifetime of a connection before it's recycled.
287    pub max_lifetime: Option<Duration>,
288}
289
290impl Default for PoolConfig {
291    fn default() -> Self {
292        Self {
293            max_connections: 5, // SQLite benefits from fewer connections
294            min_connections: 1,
295            connection_timeout: Some(Duration::from_secs(30)),
296            idle_timeout: Some(Duration::from_secs(300)), // 5 minutes
297            max_lifetime: Some(Duration::from_secs(1800)), // 30 minutes
298        }
299    }
300}
301
302/// Builder for creating a connection pool.
303#[derive(Debug, Default)]
304pub struct SqlitePoolBuilder {
305    config: Option<SqliteConfig>,
306    url: Option<String>,
307    pool_config: PoolConfig,
308}
309
310impl SqlitePoolBuilder {
311    /// Create a new pool builder.
312    pub fn new() -> Self {
313        Self {
314            config: None,
315            url: None,
316            pool_config: PoolConfig::default(),
317        }
318    }
319
320    /// Set the database URL.
321    pub fn url(mut self, url: impl Into<String>) -> Self {
322        self.url = Some(url.into());
323        self
324    }
325
326    /// Set the configuration.
327    pub fn config(mut self, config: SqliteConfig) -> Self {
328        self.config = Some(config);
329        self
330    }
331
332    /// Set the maximum number of connections.
333    pub fn max_connections(mut self, n: usize) -> Self {
334        self.pool_config.max_connections = n;
335        self
336    }
337
338    /// Set the connection timeout.
339    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
340        self.pool_config.connection_timeout = Some(timeout);
341        self
342    }
343
344    /// Set the idle timeout.
345    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
346        self.pool_config.idle_timeout = Some(timeout);
347        self
348    }
349
350    /// Build the connection pool.
351    pub async fn build(self) -> SqliteResult<SqlitePool> {
352        let config = if let Some(config) = self.config {
353            config
354        } else if let Some(url) = self.url {
355            SqliteConfig::from_url(url)?
356        } else {
357            return Err(SqliteError::config("no database URL or config provided"));
358        };
359
360        SqlitePool::with_pool_config(config, self.pool_config).await
361    }
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367
368    #[test]
369    fn test_pool_config_default() {
370        let config = PoolConfig::default();
371        assert_eq!(config.max_connections, 5);
372    }
373
374    #[test]
375    fn test_pool_builder() {
376        let builder = SqlitePoolBuilder::new()
377            .url("sqlite::memory:")
378            .max_connections(10);
379
380        assert!(builder.url.is_some());
381        assert_eq!(builder.pool_config.max_connections, 10);
382    }
383
384    #[tokio::test]
385    async fn test_pool_memory() {
386        let pool = SqlitePool::new(SqliteConfig::memory()).await.unwrap();
387        // For in-memory databases, is_healthy opens a new database
388        // so we just verify the pool was created successfully
389        assert!(pool.available_permits() > 0);
390    }
391
392    #[tokio::test]
393    async fn test_pool_get_connection() {
394        let pool = SqlitePool::new(SqliteConfig::memory()).await.unwrap();
395        let conn = pool.get().await;
396        assert!(conn.is_ok());
397    }
398}