Skip to main content

khive_db/
pool.rs

1//! Connection pool for SQLite: one exclusive writer, N concurrent readers.
2use crossbeam_queue::ArrayQueue;
3use parking_lot::Mutex;
4use rusqlite::{Connection, OpenFlags};
5use std::ops::{Deref, DerefMut};
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crate::error::SqliteError;
12
13const CACHE_SIZE_KIB: &str = "-65536";
14const MMAP_SIZE_BYTES: &str = "1073741824";
15const WAL_AUTOCHECKPOINT_PAGES: &str = "4000";
16const JOURNAL_SIZE_LIMIT_BYTES: &str = "67108864";
17const DEFAULT_READER_CAP: usize = 8;
18
19/// Configuration for the connection pool.
20#[derive(Clone, Debug)]
21pub struct PoolConfig {
22    /// Database path. None = in-memory (pool degrades to single connection).
23    pub path: Option<PathBuf>,
24    /// Number of reader connections (default: min(num_cpus, 8)).
25    pub max_readers: usize,
26    /// WAL mode (must be true for pooling to work; default: true).
27    pub wal_mode: bool,
28    /// Busy timeout per connection (default: 30s).
29    pub busy_timeout: Duration,
30    /// Time to wait for a reader connection before returning an error (default: 5s).
31    pub checkout_timeout: Duration,
32}
33
34impl Default for PoolConfig {
35    fn default() -> Self {
36        Self {
37            path: None,
38            max_readers: std::thread::available_parallelism()
39                .map(|n| n.get())
40                .unwrap_or(1)
41                .clamp(1, DEFAULT_READER_CAP),
42            wal_mode: true,
43            busy_timeout: Duration::from_secs(30),
44            checkout_timeout: Duration::from_secs(5),
45        }
46    }
47}
48
49/// A read-write connection pool for SQLite.
50///
51/// Architecture:
52/// - 1 writer connection protected by a Mutex (exclusive access)
53/// - N reader connections in a lock-free queue (concurrent access)
54/// - All connections share the same database file in WAL mode
55///
56/// For in-memory databases, or when WAL mode is disabled/unavailable, the pool
57/// degrades to single-connection mode and routes all operations through the
58/// writer connection.
59pub struct ConnectionPool {
60    writer: Arc<Mutex<Connection>>,
61    readers: ArrayQueue<Connection>,
62    max_readers: usize,
63    config: PoolConfig,
64}
65
66enum ReaderLease<'pool> {
67    Pooled(Connection),
68    Shared(parking_lot::MutexGuard<'pool, Connection>),
69}
70
71/// A reader connection checked out from the pool.
72/// Returns the connection to the pool on drop.
73pub struct ReaderGuard<'pool> {
74    lease: Option<ReaderLease<'pool>>,
75    pool: &'pool ConnectionPool,
76}
77
78impl<'pool> ReaderGuard<'pool> {
79    /// Access the connection.
80    pub fn conn(&self) -> &Connection {
81        match self
82            .lease
83            .as_ref()
84            .expect("reader guard missing connection")
85        {
86            ReaderLease::Pooled(conn) => conn,
87            ReaderLease::Shared(guard) => guard,
88        }
89    }
90}
91
92impl<'pool> Deref for ReaderGuard<'pool> {
93    type Target = Connection;
94
95    fn deref(&self) -> &Self::Target {
96        self.conn()
97    }
98}
99
100impl<'pool> Drop for ReaderGuard<'pool> {
101    fn drop(&mut self) {
102        let Some(lease) = self.lease.take() else {
103            return;
104        };
105
106        match lease {
107            ReaderLease::Pooled(conn) => self.pool.return_reader(conn),
108            ReaderLease::Shared(_guard) => {}
109        }
110    }
111}
112
113/// A writer connection checked out from the pool.
114/// The Mutex ensures only one writer at a time.
115pub struct WriterGuard<'pool> {
116    guard: parking_lot::MutexGuard<'pool, Connection>,
117}
118
119impl<'pool> WriterGuard<'pool> {
120    /// Returns a shared reference to the underlying connection.
121    pub fn conn(&self) -> &Connection {
122        &self.guard
123    }
124
125    /// Returns a mutable reference to the underlying connection.
126    pub fn conn_mut(&mut self) -> &mut Connection {
127        &mut self.guard
128    }
129
130    /// Execute a write transaction.
131    /// Wraps the closure in BEGIN IMMEDIATE ... COMMIT.
132    pub fn transaction<F, R>(&self, f: F) -> Result<R, SqliteError>
133    where
134        F: FnOnce(&Connection) -> Result<R, SqliteError>,
135    {
136        self.guard.execute_batch("BEGIN IMMEDIATE")?;
137
138        match f(&self.guard) {
139            Ok(result) => {
140                if let Err(err) = self.guard.execute_batch("COMMIT") {
141                    let _ = self.guard.execute_batch("ROLLBACK");
142                    return Err(err.into());
143                }
144                Ok(result)
145            }
146            Err(err) => {
147                let _ = self.guard.execute_batch("ROLLBACK");
148                Err(err)
149            }
150        }
151    }
152}
153
154impl<'pool> Deref for WriterGuard<'pool> {
155    type Target = Connection;
156
157    fn deref(&self) -> &Self::Target {
158        self.conn()
159    }
160}
161
162impl<'pool> DerefMut for WriterGuard<'pool> {
163    fn deref_mut(&mut self) -> &mut Self::Target {
164        self.conn_mut()
165    }
166}
167
168impl ConnectionPool {
169    /// Create a new connection pool.
170    ///
171    /// Opens 1 writer + N reader connections to the same database when pooling
172    /// is enabled. All connections are configured consistently (busy timeout,
173    /// foreign keys, cache, mmap, temp store). For in-memory databases, or when
174    /// WAL is disabled or unavailable, the pool falls back to single-connection
175    /// mode.
176    pub fn new(config: PoolConfig) -> Result<Self, SqliteError> {
177        let writer = open_writer_connection(&config)?;
178        let wal_enabled = configure_writer_connection(&writer, &config)?;
179        let max_readers = effective_reader_count(&config, wal_enabled);
180
181        let readers = ArrayQueue::new(max_readers.max(1));
182
183        let pool = Self {
184            writer: Arc::new(Mutex::new(writer)),
185            readers,
186            max_readers,
187            config,
188        };
189
190        for _ in 0..pool.max_readers {
191            let conn = pool.open_reader_connection()?;
192            pool.readers
193                .push(conn)
194                .expect("reader queue must have capacity during pool initialization");
195        }
196
197        Ok(pool)
198    }
199
200    /// Check out a reader connection.
201    ///
202    /// Tries to pop from the lock-free queue. If empty, spins briefly then
203    /// waits with exponential backoff up to `checkout_timeout`.
204    ///
205    /// # Deadlock Warning
206    ///
207    /// In degraded mode (WAL unavailable, `max_readers == 0`), this method locks
208    /// the writer mutex. If the calling thread already holds a [`WriterGuard`],
209    /// this will deadlock (parking_lot `Mutex` is not reentrant). Never call
210    /// `reader()` while holding a `WriterGuard` on the same pool.
211    pub fn reader(&self) -> Result<ReaderGuard<'_>, SqliteError> {
212        if self.max_readers == 0 {
213            return Ok(ReaderGuard {
214                lease: Some(ReaderLease::Shared(self.writer.lock())),
215                pool: self,
216            });
217        }
218
219        let started = Instant::now();
220        let mut attempt = 0u32;
221
222        loop {
223            if let Some(conn) = self.readers.pop() {
224                return Ok(ReaderGuard {
225                    lease: Some(ReaderLease::Pooled(conn)),
226                    pool: self,
227                });
228            }
229
230            if started.elapsed() >= self.config.checkout_timeout {
231                return Err(pool_exhausted_error(
232                    self.config.checkout_timeout,
233                    self.max_readers,
234                ));
235            }
236
237            match attempt {
238                0..=7 => {
239                    let spins = 1usize << attempt;
240                    for _ in 0..spins {
241                        std::hint::spin_loop();
242                    }
243                }
244                8..=15 => thread::yield_now(),
245                _ => {
246                    let remaining = self
247                        .config
248                        .checkout_timeout
249                        .saturating_sub(started.elapsed());
250                    let sleep = Duration::from_micros(50 * (1u64 << (attempt - 16).min(6)));
251                    thread::sleep(sleep.min(remaining).min(Duration::from_millis(2)));
252                }
253            }
254
255            attempt = attempt.saturating_add(1);
256        }
257    }
258
259    /// Check out the writer connection.
260    ///
261    /// Waits up to `checkout_timeout` for the writer Mutex and returns
262    /// `Err(SqliteError::InvalidData)` if the timeout is exceeded.
263    pub fn writer(&self) -> Result<WriterGuard<'_>, SqliteError> {
264        let guard = self
265            .writer
266            .try_lock_for(self.config.checkout_timeout)
267            .ok_or_else(|| {
268                SqliteError::InvalidData(format!(
269                    "timed out after {:?} waiting for sqlite writer connection",
270                    self.config.checkout_timeout
271                ))
272            })?;
273        Ok(WriterGuard { guard })
274    }
275
276    /// Non-panicking writer checkout.
277    ///
278    /// Returns `Err` on timeout instead of panicking. Use this in request
279    /// handlers where a 500 is preferable to crashing the process.
280    pub fn try_writer(&self) -> Result<WriterGuard<'_>, SqliteError> {
281        self.writer()
282    }
283
284    /// Get the current number of available reader connections.
285    pub fn available_readers(&self) -> usize {
286        self.readers.len()
287    }
288
289    /// Get the total number of reader connections in the pool.
290    pub fn max_readers(&self) -> usize {
291        self.max_readers
292    }
293
294    /// Return the pool configuration.
295    pub fn config(&self) -> &PoolConfig {
296        &self.config
297    }
298
299    /// Compatibility method: returns the writer connection wrapped in Arc<Mutex>.
300    ///
301    /// WARNING: This exists only for backward compatibility with code that
302    /// calls `store.conn()`. New code should use `reader()` and `writer()`.
303    pub fn legacy_conn(&self) -> Arc<Mutex<Connection>> {
304        Arc::clone(&self.writer)
305    }
306
307    fn open_reader_connection(&self) -> Result<Connection, SqliteError> {
308        let path = self
309            .config
310            .path
311            .as_ref()
312            .expect("reader connections require a file-backed database");
313        open_reader_connection(path, &self.config)
314    }
315
316    fn return_reader(&self, conn: Connection) {
317        if self.max_readers == 0 {
318            return;
319        }
320
321        let conn = if reset_reader_connection(&conn) && reader_connection_is_healthy(&conn) {
322            Some(conn)
323        } else {
324            close_connection_quietly(conn);
325            self.open_reader_connection().ok()
326        };
327
328        if let Some(conn) = conn {
329            if let Err(conn) = self.readers.push(conn) {
330                eprintln!(
331                    "[sqlite-pool] reader pool queue full, discarding replacement connection"
332                );
333                close_connection_quietly(conn);
334            }
335        }
336    }
337}
338
339fn effective_reader_count(config: &PoolConfig, wal_enabled: bool) -> usize {
340    if config.path.is_some() && config.wal_mode && wal_enabled {
341        config.max_readers
342    } else {
343        0
344    }
345}
346
347fn open_writer_connection(config: &PoolConfig) -> Result<Connection, SqliteError> {
348    match config.path.as_ref() {
349        Some(path) => Connection::open_with_flags(path, writer_open_flags()).map_err(Into::into),
350        None => Connection::open_in_memory().map_err(Into::into),
351    }
352}
353
354fn open_reader_connection(path: &Path, config: &PoolConfig) -> Result<Connection, SqliteError> {
355    let conn = Connection::open_with_flags(path, reader_open_flags())?;
356    configure_reader_connection(&conn, config)?;
357    Ok(conn)
358}
359
360fn writer_open_flags() -> OpenFlags {
361    OpenFlags::SQLITE_OPEN_READ_WRITE
362        | OpenFlags::SQLITE_OPEN_CREATE
363        | OpenFlags::SQLITE_OPEN_URI
364        | OpenFlags::SQLITE_OPEN_NO_MUTEX
365}
366
367fn reader_open_flags() -> OpenFlags {
368    OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX
369}
370
371fn configure_writer_connection(
372    conn: &Connection,
373    config: &PoolConfig,
374) -> Result<bool, SqliteError> {
375    let wants_wal = config.path.is_some() && config.wal_mode;
376
377    if wants_wal {
378        conn.pragma_update(None, "journal_mode", "WAL")?;
379    }
380
381    conn.pragma_update(None, "synchronous", "NORMAL")?;
382    conn.pragma_update(None, "foreign_keys", "ON")?;
383    conn.busy_timeout(config.busy_timeout)?;
384    conn.pragma_update(None, "cache_size", CACHE_SIZE_KIB)?;
385    conn.pragma_update(None, "mmap_size", MMAP_SIZE_BYTES)?;
386    conn.pragma_update(None, "temp_store", "MEMORY")?;
387
388    let wal_enabled = wants_wal && current_journal_mode(conn)?.eq_ignore_ascii_case("wal");
389
390    if wal_enabled {
391        conn.pragma_update(None, "wal_autocheckpoint", WAL_AUTOCHECKPOINT_PAGES)?;
392        conn.pragma_update(None, "journal_size_limit", JOURNAL_SIZE_LIMIT_BYTES)?;
393    }
394
395    Ok(wal_enabled)
396}
397
398fn configure_reader_connection(conn: &Connection, config: &PoolConfig) -> Result<(), SqliteError> {
399    conn.pragma_update(None, "foreign_keys", "ON")?;
400    conn.busy_timeout(config.busy_timeout)?;
401    conn.pragma_update(None, "cache_size", CACHE_SIZE_KIB)?;
402    conn.pragma_update(None, "mmap_size", MMAP_SIZE_BYTES)?;
403    conn.pragma_update(None, "temp_store", "MEMORY")?;
404    Ok(())
405}
406
407fn current_journal_mode(conn: &Connection) -> Result<String, SqliteError> {
408    conn.pragma_query_value(None, "journal_mode", |row| row.get::<_, String>(0))
409        .map(|mode| mode.to_ascii_lowercase())
410        .map_err(Into::into)
411}
412
413fn reset_reader_connection(conn: &Connection) -> bool {
414    if conn.is_autocommit() {
415        return true;
416    }
417
418    match conn.execute_batch("ROLLBACK") {
419        Ok(()) => conn.is_autocommit(),
420        Err(rusqlite::Error::SqliteFailure(err, _)) => {
421            if matches!(
422                err.code,
423                rusqlite::ErrorCode::CannotOpen
424                    | rusqlite::ErrorCode::DatabaseCorrupt
425                    | rusqlite::ErrorCode::NotADatabase
426                    | rusqlite::ErrorCode::DiskFull
427            ) {
428                return false;
429            }
430            conn.is_autocommit()
431        }
432        Err(_) => false,
433    }
434}
435
436fn reader_connection_is_healthy(conn: &Connection) -> bool {
437    match conn.query_row("SELECT 1", [], |row| row.get::<_, i64>(0)) {
438        Ok(_) => true,
439        Err(rusqlite::Error::SqliteFailure(err, _)) => !matches!(
440            err.code,
441            rusqlite::ErrorCode::CannotOpen
442                | rusqlite::ErrorCode::NotADatabase
443                | rusqlite::ErrorCode::DatabaseCorrupt
444                | rusqlite::ErrorCode::PermissionDenied
445                | rusqlite::ErrorCode::SystemIoFailure
446        ),
447        Err(_) => true,
448    }
449}
450
451fn close_connection_quietly(conn: Connection) {
452    match conn.close() {
453        Ok(()) => {}
454        Err((conn, _)) => drop(conn),
455    }
456}
457
458fn pool_exhausted_error(timeout: Duration, max_readers: usize) -> SqliteError {
459    rusqlite::Error::SqliteFailure(
460        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
461        Some(format!(
462            "Pool exhausted: no reader available after {timeout:?} (max_readers={max_readers})"
463        )),
464    )
465    .into()
466}