Skip to main content

azoth_sqlite/
read_pool.rs

1//! SQLite Read Connection Pool
2//!
3//! Provides a pool of read-only SQLite connections for concurrent reads.
4//! Unlike LMDB, SQLite requires separate connections for true concurrency.
5
6use azoth_core::{
7    error::{AzothError, Result},
8    ReadPoolConfig,
9};
10use parking_lot::Mutex;
11use rusqlite::{Connection, OpenFlags};
12use std::path::{Path, PathBuf};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::time::{Duration, Instant};
15use tokio::sync::{Semaphore, SemaphorePermit};
16
17/// A pooled read-only connection for SQLite
18///
19/// This wraps a SQLite read-only connection with automatic permit release
20/// when the connection is returned to the pool.
21pub struct PooledSqliteConnection<'a> {
22    conn: parking_lot::MutexGuard<'a, Connection>,
23    _permit: SemaphorePermit<'a>,
24}
25
26impl<'a> PooledSqliteConnection<'a> {
27    /// Execute a read-only query
28    ///
29    /// # Example
30    /// ```ignore
31    /// let conn = pool.acquire().await?;
32    /// let count: i64 = conn.query_row("SELECT COUNT(*) FROM users", [], |row| row.get(0))?;
33    /// ```
34    pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
35    where
36        P: rusqlite::Params,
37        F: FnOnce(&rusqlite::Row<'_>) -> rusqlite::Result<T>,
38    {
39        self.conn
40            .query_row(sql, params, f)
41            .map_err(|e| AzothError::Projection(e.to_string()))
42    }
43
44    /// Prepare a statement for execution
45    pub fn prepare(&self, sql: &str) -> Result<rusqlite::Statement<'_>> {
46        self.conn
47            .prepare(sql)
48            .map_err(|e| AzothError::Projection(e.to_string()))
49    }
50
51    /// Get direct access to the underlying connection
52    ///
53    /// Use this for complex queries that need the full rusqlite API.
54    pub fn connection(&self) -> &Connection {
55        &self.conn
56    }
57}
58
59/// SQLite Read Connection Pool
60///
61/// Manages a pool of read-only SQLite connections for concurrent reads.
62/// Each connection is opened with `SQLITE_OPEN_READ_ONLY` flag.
63///
64/// # Example
65///
66/// ```ignore
67/// let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(4))?;
68///
69/// // Acquire a pooled connection
70/// let conn = pool.acquire().await?;
71/// let count: i64 = conn.query_row("SELECT COUNT(*) FROM users", [], |row| row.get(0))?;
72/// // Connection is returned to pool when `conn` is dropped
73/// ```
74pub struct SqliteReadPool {
75    connections: Vec<Mutex<Connection>>,
76    semaphore: Semaphore,
77    acquire_timeout: Duration,
78    enabled: bool,
79    db_path: PathBuf,
80    /// Round-robin index for distributing connection acquisition attempts
81    next_idx: AtomicUsize,
82}
83
84impl SqliteReadPool {
85    /// Create a new read pool with the given configuration
86    ///
87    /// Opens `pool_size` read-only connections to the database.
88    pub fn new(db_path: &Path, config: ReadPoolConfig) -> Result<Self> {
89        let pool_size = if config.enabled { config.pool_size } else { 1 };
90        let mut connections = Vec::with_capacity(pool_size);
91
92        for _ in 0..pool_size {
93            let conn = Connection::open_with_flags(
94                db_path,
95                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
96            )
97            .map_err(|e| AzothError::Projection(e.to_string()))?;
98
99            connections.push(Mutex::new(conn));
100        }
101
102        Ok(Self {
103            connections,
104            semaphore: Semaphore::new(pool_size),
105            acquire_timeout: Duration::from_millis(config.acquire_timeout_ms),
106            enabled: config.enabled,
107            db_path: db_path.to_path_buf(),
108            next_idx: AtomicUsize::new(0),
109        })
110    }
111
112    /// Acquire a pooled read-only connection
113    ///
114    /// Waits up to the configured timeout for a connection to become available.
115    /// Returns an error if the timeout is exceeded.
116    pub async fn acquire(&self) -> Result<PooledSqliteConnection<'_>> {
117        let permit = tokio::time::timeout(self.acquire_timeout, self.semaphore.acquire())
118            .await
119            .map_err(|_| {
120                AzothError::Timeout(format!(
121                    "Read pool acquire timeout after {:?}",
122                    self.acquire_timeout
123                ))
124            })?
125            .map_err(|e| AzothError::Internal(format!("Semaphore closed: {}", e)))?;
126
127        // Round-robin: start from next_idx to distribute lock contention
128        let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
129        for i in 0..self.connections.len() {
130            let idx = (start + i) % self.connections.len();
131            if let Some(guard) = self.connections[idx].try_lock() {
132                return Ok(PooledSqliteConnection {
133                    conn: guard,
134                    _permit: permit,
135                });
136            }
137        }
138
139        // This shouldn't happen if semaphore is working correctly
140        Err(AzothError::Internal(
141            "No available connection despite having permit".into(),
142        ))
143    }
144
145    /// Try to acquire a pooled read-only connection without waiting
146    ///
147    /// Returns `None` if no connection is immediately available.
148    pub fn try_acquire(&self) -> Result<Option<PooledSqliteConnection<'_>>> {
149        match self.semaphore.try_acquire() {
150            Ok(permit) => {
151                // Round-robin: start from next_idx to distribute lock contention
152                let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
153                for i in 0..self.connections.len() {
154                    let idx = (start + i) % self.connections.len();
155                    if let Some(guard) = self.connections[idx].try_lock() {
156                        return Ok(Some(PooledSqliteConnection {
157                            conn: guard,
158                            _permit: permit,
159                        }));
160                    }
161                }
162                // No connection available, permit will be dropped
163                Ok(None)
164            }
165            Err(_) => Ok(None),
166        }
167    }
168
169    /// Get the number of available connections in the pool
170    pub fn available_permits(&self) -> usize {
171        self.semaphore.available_permits()
172    }
173
174    /// Check if pooling is enabled
175    pub fn is_enabled(&self) -> bool {
176        self.enabled
177    }
178
179    /// Get the database path
180    pub fn db_path(&self) -> &Path {
181        &self.db_path
182    }
183
184    /// Get the pool size
185    pub fn pool_size(&self) -> usize {
186        self.connections.len()
187    }
188
189    /// Acquire a pooled read-only connection (blocking)
190    ///
191    /// Blocks up to `acquire_timeout` waiting for an available connection.
192    pub fn acquire_blocking(&self) -> Result<PooledSqliteConnection<'_>> {
193        let deadline = Instant::now() + self.acquire_timeout;
194
195        loop {
196            if let Ok(Some(conn)) = self.try_acquire() {
197                return Ok(conn);
198            }
199
200            if Instant::now() >= deadline {
201                return Err(AzothError::Timeout(format!(
202                    "Read pool acquire timeout after {:?}",
203                    self.acquire_timeout
204                )));
205            }
206
207            std::thread::sleep(Duration::from_millis(1));
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a throwaway database containing a `test` table with two rows.
    ///
    /// The `TempDir` is returned alongside the path so the directory stays
    /// alive for as long as the caller keeps it.
    fn create_test_db() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        {
            // Scoped so the setup connection is closed before the pool opens the file.
            let setup = Connection::open(&db_path).unwrap();
            for statement in [
                "CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)",
                "INSERT INTO test (id, value) VALUES (1, 'hello')",
                "INSERT INTO test (id, value) VALUES (2, 'world')",
            ] {
                setup.execute(statement, []).unwrap();
            }
        }

        (temp_dir, db_path)
    }

    /// Permits go down as connections are checked out and come back on drop.
    #[tokio::test]
    async fn test_pool_acquire_release() {
        let (_temp_dir, db_path) = create_test_db();
        let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(2)).unwrap();
        assert_eq!(pool.available_permits(), 2);

        let first = pool.acquire().await.unwrap();
        assert_eq!(pool.available_permits(), 1);

        let second = pool.acquire().await.unwrap();
        assert_eq!(pool.available_permits(), 0);

        // The pool is exhausted, so a non-waiting acquire yields nothing.
        let exhausted = pool.try_acquire().unwrap();
        assert!(exhausted.is_none());

        // Each drop hands one permit back.
        drop(first);
        assert_eq!(pool.available_permits(), 1);

        drop(second);
        assert_eq!(pool.available_permits(), 2);
    }

    /// Queries issued through a pooled connection see the fixture data.
    #[tokio::test]
    async fn test_pool_query() {
        let (_temp_dir, db_path) = create_test_db();
        let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(2)).unwrap();

        let conn = pool.acquire().await.unwrap();

        let value: String = conn
            .query_row("SELECT value FROM test WHERE id = ?1", [1], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(value, "hello");

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM test", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 2);
    }

    /// A single-connection pool serves one non-waiting acquire at a time.
    #[test]
    fn test_try_acquire() {
        let (_temp_dir, db_path) = create_test_db();
        let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(1)).unwrap();

        // The only connection is free, so the first attempt gets it.
        let held = pool.try_acquire().unwrap();
        assert!(held.is_some());

        // While it is held, a second attempt comes back empty.
        assert!(pool.try_acquire().unwrap().is_none());

        // Once released, the connection can be taken again.
        drop(held);
        assert!(pool.try_acquire().unwrap().is_some());
    }
}
299}