1use crossbeam_queue::ArrayQueue;
3use parking_lot::Mutex;
4use rusqlite::{Connection, OpenFlags};
5use std::ops::{Deref, DerefMut};
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use crate::error::SqliteError;
12
13const CACHE_SIZE_KIB: &str = "-65536";
14const MMAP_SIZE_BYTES: &str = "1073741824";
15const WAL_AUTOCHECKPOINT_PAGES: &str = "4000";
16const JOURNAL_SIZE_LIMIT_BYTES: &str = "67108864";
17const DEFAULT_READER_CAP: usize = 8;
18
19#[derive(Clone, Debug)]
21pub struct PoolConfig {
22 pub path: Option<PathBuf>,
24 pub max_readers: usize,
26 pub wal_mode: bool,
28 pub busy_timeout: Duration,
30 pub checkout_timeout: Duration,
32}
33
34impl Default for PoolConfig {
35 fn default() -> Self {
36 Self {
37 path: None,
38 max_readers: std::thread::available_parallelism()
39 .map(|n| n.get())
40 .unwrap_or(1)
41 .clamp(1, DEFAULT_READER_CAP),
42 wal_mode: true,
43 busy_timeout: Duration::from_secs(30),
44 checkout_timeout: Duration::from_secs(5),
45 }
46 }
47}
48
49pub struct ConnectionPool {
60 writer: Arc<Mutex<Connection>>,
61 readers: ArrayQueue<Connection>,
62 max_readers: usize,
63 config: PoolConfig,
64}
65
66enum ReaderLease<'pool> {
67 Pooled(Connection),
68 Shared(parking_lot::MutexGuard<'pool, Connection>),
69}
70
71pub struct ReaderGuard<'pool> {
74 lease: Option<ReaderLease<'pool>>,
75 pool: &'pool ConnectionPool,
76}
77
78impl<'pool> ReaderGuard<'pool> {
79 pub fn conn(&self) -> &Connection {
81 match self
82 .lease
83 .as_ref()
84 .expect("reader guard missing connection")
85 {
86 ReaderLease::Pooled(conn) => conn,
87 ReaderLease::Shared(guard) => guard,
88 }
89 }
90}
91
92impl<'pool> Deref for ReaderGuard<'pool> {
93 type Target = Connection;
94
95 fn deref(&self) -> &Self::Target {
96 self.conn()
97 }
98}
99
100impl<'pool> Drop for ReaderGuard<'pool> {
101 fn drop(&mut self) {
102 let Some(lease) = self.lease.take() else {
103 return;
104 };
105
106 match lease {
107 ReaderLease::Pooled(conn) => self.pool.return_reader(conn),
108 ReaderLease::Shared(_guard) => {}
109 }
110 }
111}
112
113pub struct WriterGuard<'pool> {
116 guard: parking_lot::MutexGuard<'pool, Connection>,
117}
118
119impl<'pool> WriterGuard<'pool> {
120 pub fn conn(&self) -> &Connection {
122 &self.guard
123 }
124
125 pub fn conn_mut(&mut self) -> &mut Connection {
127 &mut self.guard
128 }
129
130 pub fn transaction<F, R>(&self, f: F) -> Result<R, SqliteError>
133 where
134 F: FnOnce(&Connection) -> Result<R, SqliteError>,
135 {
136 self.guard.execute_batch("BEGIN IMMEDIATE")?;
137
138 match f(&self.guard) {
139 Ok(result) => {
140 if let Err(err) = self.guard.execute_batch("COMMIT") {
141 let _ = self.guard.execute_batch("ROLLBACK");
142 return Err(err.into());
143 }
144 Ok(result)
145 }
146 Err(err) => {
147 let _ = self.guard.execute_batch("ROLLBACK");
148 Err(err)
149 }
150 }
151 }
152}
153
154impl<'pool> Deref for WriterGuard<'pool> {
155 type Target = Connection;
156
157 fn deref(&self) -> &Self::Target {
158 self.conn()
159 }
160}
161
162impl<'pool> DerefMut for WriterGuard<'pool> {
163 fn deref_mut(&mut self) -> &mut Self::Target {
164 self.conn_mut()
165 }
166}
167
168impl ConnectionPool {
169 pub fn new(config: PoolConfig) -> Result<Self, SqliteError> {
177 let writer = open_writer_connection(&config)?;
178 let wal_enabled = configure_writer_connection(&writer, &config)?;
179 let max_readers = effective_reader_count(&config, wal_enabled);
180
181 let readers = ArrayQueue::new(max_readers.max(1));
182
183 let pool = Self {
184 writer: Arc::new(Mutex::new(writer)),
185 readers,
186 max_readers,
187 config,
188 };
189
190 for _ in 0..pool.max_readers {
191 let conn = pool.open_reader_connection()?;
192 pool.readers
193 .push(conn)
194 .expect("reader queue must have capacity during pool initialization");
195 }
196
197 Ok(pool)
198 }
199
200 pub fn reader(&self) -> Result<ReaderGuard<'_>, SqliteError> {
212 if self.max_readers == 0 {
213 return Ok(ReaderGuard {
214 lease: Some(ReaderLease::Shared(self.writer.lock())),
215 pool: self,
216 });
217 }
218
219 let started = Instant::now();
220 let mut attempt = 0u32;
221
222 loop {
223 if let Some(conn) = self.readers.pop() {
224 return Ok(ReaderGuard {
225 lease: Some(ReaderLease::Pooled(conn)),
226 pool: self,
227 });
228 }
229
230 if started.elapsed() >= self.config.checkout_timeout {
231 return Err(pool_exhausted_error(
232 self.config.checkout_timeout,
233 self.max_readers,
234 ));
235 }
236
237 match attempt {
238 0..=7 => {
239 let spins = 1usize << attempt;
240 for _ in 0..spins {
241 std::hint::spin_loop();
242 }
243 }
244 8..=15 => thread::yield_now(),
245 _ => {
246 let remaining = self
247 .config
248 .checkout_timeout
249 .saturating_sub(started.elapsed());
250 let sleep = Duration::from_micros(50 * (1u64 << (attempt - 16).min(6)));
251 thread::sleep(sleep.min(remaining).min(Duration::from_millis(2)));
252 }
253 }
254
255 attempt = attempt.saturating_add(1);
256 }
257 }
258
259 pub fn writer(&self) -> Result<WriterGuard<'_>, SqliteError> {
264 let guard = self
265 .writer
266 .try_lock_for(self.config.checkout_timeout)
267 .ok_or_else(|| {
268 SqliteError::InvalidData(format!(
269 "timed out after {:?} waiting for sqlite writer connection",
270 self.config.checkout_timeout
271 ))
272 })?;
273 Ok(WriterGuard { guard })
274 }
275
276 pub fn try_writer(&self) -> Result<WriterGuard<'_>, SqliteError> {
281 self.writer()
282 }
283
284 pub fn available_readers(&self) -> usize {
286 self.readers.len()
287 }
288
289 pub fn max_readers(&self) -> usize {
291 self.max_readers
292 }
293
294 pub fn config(&self) -> &PoolConfig {
296 &self.config
297 }
298
299 pub fn legacy_conn(&self) -> Arc<Mutex<Connection>> {
304 Arc::clone(&self.writer)
305 }
306
307 fn open_reader_connection(&self) -> Result<Connection, SqliteError> {
308 let path = self
309 .config
310 .path
311 .as_ref()
312 .expect("reader connections require a file-backed database");
313 open_reader_connection(path, &self.config)
314 }
315
316 fn return_reader(&self, conn: Connection) {
317 if self.max_readers == 0 {
318 return;
319 }
320
321 let conn = if reset_reader_connection(&conn) && reader_connection_is_healthy(&conn) {
322 Some(conn)
323 } else {
324 close_connection_quietly(conn);
325 self.open_reader_connection().ok()
326 };
327
328 if let Some(conn) = conn {
329 if let Err(conn) = self.readers.push(conn) {
330 eprintln!(
331 "[sqlite-pool] reader pool queue full, discarding replacement connection"
332 );
333 close_connection_quietly(conn);
334 }
335 }
336 }
337}
338
339fn effective_reader_count(config: &PoolConfig, wal_enabled: bool) -> usize {
340 if config.path.is_some() && config.wal_mode && wal_enabled {
341 config.max_readers
342 } else {
343 0
344 }
345}
346
347fn open_writer_connection(config: &PoolConfig) -> Result<Connection, SqliteError> {
348 match config.path.as_ref() {
349 Some(path) => Connection::open_with_flags(path, writer_open_flags()).map_err(Into::into),
350 None => Connection::open_in_memory().map_err(Into::into),
351 }
352}
353
354fn open_reader_connection(path: &Path, config: &PoolConfig) -> Result<Connection, SqliteError> {
355 let conn = Connection::open_with_flags(path, reader_open_flags())?;
356 configure_reader_connection(&conn, config)?;
357 Ok(conn)
358}
359
360fn writer_open_flags() -> OpenFlags {
361 OpenFlags::SQLITE_OPEN_READ_WRITE
362 | OpenFlags::SQLITE_OPEN_CREATE
363 | OpenFlags::SQLITE_OPEN_URI
364 | OpenFlags::SQLITE_OPEN_NO_MUTEX
365}
366
367fn reader_open_flags() -> OpenFlags {
368 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX
369}
370
371fn configure_writer_connection(
372 conn: &Connection,
373 config: &PoolConfig,
374) -> Result<bool, SqliteError> {
375 let wants_wal = config.path.is_some() && config.wal_mode;
376
377 if wants_wal {
378 conn.pragma_update(None, "journal_mode", "WAL")?;
379 }
380
381 conn.pragma_update(None, "synchronous", "NORMAL")?;
382 conn.pragma_update(None, "foreign_keys", "ON")?;
383 conn.busy_timeout(config.busy_timeout)?;
384 conn.pragma_update(None, "cache_size", CACHE_SIZE_KIB)?;
385 conn.pragma_update(None, "mmap_size", MMAP_SIZE_BYTES)?;
386 conn.pragma_update(None, "temp_store", "MEMORY")?;
387
388 let wal_enabled = wants_wal && current_journal_mode(conn)?.eq_ignore_ascii_case("wal");
389
390 if wal_enabled {
391 conn.pragma_update(None, "wal_autocheckpoint", WAL_AUTOCHECKPOINT_PAGES)?;
392 conn.pragma_update(None, "journal_size_limit", JOURNAL_SIZE_LIMIT_BYTES)?;
393 }
394
395 Ok(wal_enabled)
396}
397
398fn configure_reader_connection(conn: &Connection, config: &PoolConfig) -> Result<(), SqliteError> {
399 conn.pragma_update(None, "foreign_keys", "ON")?;
400 conn.busy_timeout(config.busy_timeout)?;
401 conn.pragma_update(None, "cache_size", CACHE_SIZE_KIB)?;
402 conn.pragma_update(None, "mmap_size", MMAP_SIZE_BYTES)?;
403 conn.pragma_update(None, "temp_store", "MEMORY")?;
404 Ok(())
405}
406
407fn current_journal_mode(conn: &Connection) -> Result<String, SqliteError> {
408 conn.pragma_query_value(None, "journal_mode", |row| row.get::<_, String>(0))
409 .map(|mode| mode.to_ascii_lowercase())
410 .map_err(Into::into)
411}
412
413fn reset_reader_connection(conn: &Connection) -> bool {
414 if conn.is_autocommit() {
415 return true;
416 }
417
418 match conn.execute_batch("ROLLBACK") {
419 Ok(()) => conn.is_autocommit(),
420 Err(rusqlite::Error::SqliteFailure(err, _)) => {
421 if matches!(
422 err.code,
423 rusqlite::ErrorCode::CannotOpen
424 | rusqlite::ErrorCode::DatabaseCorrupt
425 | rusqlite::ErrorCode::NotADatabase
426 | rusqlite::ErrorCode::DiskFull
427 ) {
428 return false;
429 }
430 conn.is_autocommit()
431 }
432 Err(_) => false,
433 }
434}
435
436fn reader_connection_is_healthy(conn: &Connection) -> bool {
437 match conn.query_row("SELECT 1", [], |row| row.get::<_, i64>(0)) {
438 Ok(_) => true,
439 Err(rusqlite::Error::SqliteFailure(err, _)) => !matches!(
440 err.code,
441 rusqlite::ErrorCode::CannotOpen
442 | rusqlite::ErrorCode::NotADatabase
443 | rusqlite::ErrorCode::DatabaseCorrupt
444 | rusqlite::ErrorCode::PermissionDenied
445 | rusqlite::ErrorCode::SystemIoFailure
446 ),
447 Err(_) => true,
448 }
449}
450
451fn close_connection_quietly(conn: Connection) {
452 match conn.close() {
453 Ok(()) => {}
454 Err((conn, _)) => drop(conn),
455 }
456}
457
458fn pool_exhausted_error(timeout: Duration, max_readers: usize) -> SqliteError {
459 rusqlite::Error::SqliteFailure(
460 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
461 Some(format!(
462 "Pool exhausted: no reader available after {timeout:?} (max_readers={max_readers})"
463 )),
464 )
465 .into()
466}