sqlx_sqlite_conn_mgr/
database.rs

1//! SQLite database with connection pooling and optional write access
2
3use crate::Result;
4use crate::config::SqliteDatabaseConfig;
5use crate::error::Error;
6use crate::registry::{get_or_open_database, is_memory_database, uncache_database};
7use crate::write_guard::WriteGuard;
8use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
9use sqlx::{ConnectOptions, Pool, Sqlite};
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use tracing::error;
14
/// Analysis limit handed to `optimize_on_close` for both the read and write connections.
///
/// SQLite recommends a value in the 100-1000 range for older versions;
/// SQLite 3.46.0+ picks a suitable limit automatically.
/// See: https://www.sqlite.org/lang_analyze.html#recommended_usage_pattern
const OPTIMIZE_ANALYSIS_LIMIT: u32 = 400;
19
/// SQLite database with connection pooling for concurrent reads and optional exclusive writes.
///
/// After `connect()` succeeds, read-only queries can be issued through `read_pool()`.
/// Writes go through `acquire_writer()`, which hands out the single read-write
/// connection and lazily switches the database to WAL mode the first time it is used.
///
/// # Example
///
/// ```no_run
/// use sqlx_sqlite_conn_mgr::SqliteDatabase;
///
/// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
/// let db = SqliteDatabase::connect("test.db", None).await?;
///
/// // Use read_pool for SELECT queries (concurrent reads)
/// let rows = sqlx::query("SELECT * FROM users")
///     .fetch_all(db.read_pool()?)
///     .await?;
///
/// // Optionally acquire writer for INSERT/UPDATE/DELETE (exclusive)
/// let mut writer = db.acquire_writer().await?;
/// sqlx::query("INSERT INTO users (name) VALUES (?)")
///     .bind("Alice")
///     .execute(&mut *writer)
///     .await?;
///
/// db.close().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct SqliteDatabase {
   /// Pool of read-only connections for concurrent reads; its size comes from
   /// `SqliteDatabaseConfig::max_read_connections` (documented default: 6)
   read_pool: Pool<Sqlite>,

   /// Read-write pool limited to a single connection (max_connections=1), so
   /// writes are serialized by whoever currently holds that connection
   write_conn: Pool<Sqlite>,

   /// Tracks whether the WAL-mode setup in `acquire_writer()` has been performed;
   /// `close()` consults it to decide whether a WAL checkpoint is worthwhile
   wal_initialized: AtomicBool,

   /// Set by `close()`; once true, `read_pool()` and `acquire_writer()` return
   /// `Error::DatabaseClosed`
   closed: AtomicBool,

   /// Path to the database file (used for registry removal, file deletion in
   /// `remove()`, and `path_str()`)
   path: PathBuf,
}
68
69impl SqliteDatabase {
70   /// Get the database file path as a string
71   ///
72   /// Used internally (crate-private) for ATTACH DATABASE statements
73   pub(crate) fn path_str(&self) -> String {
74      self.path.to_string_lossy().to_string()
75   }
76
77   /// Connect to a SQLite database
78   ///
79   /// If the database is already connected, returns the existing connection.
80   /// Multiple calls with the same path will return the same database instance.
81   ///
82   /// The database is created if it doesn't exist. WAL mode is enabled when
83   /// `acquire_writer()` is first called.
84   ///
85   /// # Arguments
86   ///
87   /// * `path` - Path to the SQLite database file (will be created if missing)
88   /// * `custom_config` - Optional custom configuration for connection pools.
89   ///   Pass `None` to use defaults (6 max read connections, 30 second idle timeout).
90   ///   Specify a custom configuration when the defaults don't meet your requirements.
91   ///
92   /// # Examples
93   ///
94   /// ```no_run
95   /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
96   /// use std::sync::Arc;
97   ///
98   /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
99   /// // Connect with default configuration (recommended for most use cases)
100   /// let db = SqliteDatabase::connect("test.db", None).await?;
101   /// # Ok(())
102   /// # }
103   /// ```
104   ///
105   /// ```no_run
106   /// use sqlx_sqlite_conn_mgr::{SqliteDatabase, SqliteDatabaseConfig};
107   /// use std::sync::Arc;
108   ///
109   /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
110   /// // Customize configuration when defaults don't meet your requirements
111   /// let custom_config = SqliteDatabaseConfig {
112   ///    max_read_connections: 10,
113   ///    idle_timeout_secs: 60,
114   /// };
115   /// let db = SqliteDatabase::connect("test.db", Some(custom_config)).await?;
116   /// # Ok(())
117   /// # }
118   /// ```
119   pub async fn connect(
120      path: impl AsRef<Path>,
121      custom_config: Option<SqliteDatabaseConfig>,
122   ) -> Result<Arc<Self>> {
123      let config = custom_config.unwrap_or_default();
124      let path = path.as_ref();
125
126      // Validate path is not empty
127      if path.as_os_str().is_empty() {
128         return Err(crate::error::Error::Io(std::io::Error::new(
129            std::io::ErrorKind::InvalidInput,
130            "Database path cannot be empty",
131         )));
132      }
133
134      let path = path.to_path_buf();
135
136      get_or_open_database(&path, || async {
137         // Check if database file exists
138         let db_exists = path.exists();
139
140         // If database doesn't exist and not :memory:, create it with a temporary connection
141         // We don't keep this connection - WAL mode will be set later in acquire_writer()
142         //
143         // Why do we need to manually create the database file? We could just let the connection
144         // create it if it doesn't exist, using `create_if_missing(true)`, right? Not if we called
145         // connect and then our very first query was a read-only query, like `PRAGMA user_version;`,
146         // for example. That would fail because the read pool connections are read-only and cannot
147         // create the file
148         if !db_exists && !is_memory_database(&path) {
149            let create_options = SqliteConnectOptions::new()
150               .filename(&path)
151               .create_if_missing(true)
152               .read_only(false);
153
154            // Create database file with a temporary connection
155            let conn = create_options.connect().await?;
156            drop(conn); // Close immediately after creating the file
157         }
158
159         // Create read pool with read-only connections
160         let read_options = SqliteConnectOptions::new()
161            .filename(&path)
162            .read_only(true)
163            .optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT);
164
165         let read_pool = SqlitePoolOptions::new()
166            .max_connections(config.max_read_connections)
167            .min_connections(0)
168            .idle_timeout(Some(std::time::Duration::from_secs(
169               config.idle_timeout_secs,
170            )))
171            .connect_with(read_options)
172            .await?;
173
174         // Create write pool with a single read-write connection
175         let write_options = SqliteConnectOptions::new()
176            .filename(&path)
177            .read_only(false)
178            .optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT);
179
180         let write_conn = SqlitePoolOptions::new()
181            .max_connections(1)
182            .min_connections(0)
183            .idle_timeout(Some(std::time::Duration::from_secs(
184               config.idle_timeout_secs,
185            )))
186            .connect_with(write_options)
187            .await?;
188
189         Ok(Self {
190            read_pool,
191            write_conn,
192            wal_initialized: AtomicBool::new(false),
193            closed: AtomicBool::new(false),
194            path: path.clone(),
195         })
196      })
197      .await
198   }
199
200   /// Get a reference to the connection pool for executing read queries
201   ///
202   /// Use this for concurrent read operations. Multiple readers can access
203   /// the pool simultaneously.
204   ///
205   /// # Example
206   ///
207   /// ```no_run
208   /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
209   /// use sqlx::query;
210   /// use std::sync::Arc;
211   ///
212   /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
213   /// let db = SqliteDatabase::connect("test.db", None).await?;
214   /// let result = query("SELECT * FROM users")
215   ///     .fetch_all(db.read_pool()?)
216   ///     .await?;
217   /// # Ok(())
218   /// # }
219   /// ```
220   pub fn read_pool(&self) -> Result<&Pool<Sqlite>> {
221      if self.closed.load(Ordering::SeqCst) {
222         return Err(Error::DatabaseClosed);
223      }
224      Ok(&self.read_pool)
225   }
226
227   /// Acquire exclusive write access to the database
228   ///
229   /// This method returns a `WriteGuard` that provides exclusive access to
230   /// the single write connection. Only one writer can exist at a time.
231   ///
232   /// On the first call, this method will enable WAL mode on the database.
233   /// Subsequent calls reuse the same write connection.
234   ///
235   /// # Example
236   ///
237   /// ```no_run
238   /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
239   /// use sqlx::query;
240   /// use std::sync::Arc;
241   ///
242   /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
243   /// let db = SqliteDatabase::connect("test.db", None).await?;
244   /// let mut writer = db.acquire_writer().await?;
245   /// query("INSERT INTO users (name) VALUES (?)")
246   ///     .bind("Alice")
247   ///     .execute(&mut *writer)
248   ///     .await?;
249   /// # Ok(())
250   /// # }
251   /// ```
252   pub async fn acquire_writer(&self) -> Result<WriteGuard> {
253      if self.closed.load(Ordering::SeqCst) {
254         return Err(Error::DatabaseClosed);
255      }
256
257      // Acquire connection from pool (max=1 ensures exclusive access)
258      let mut conn = self.write_conn.acquire().await?;
259
260      // Initialize WAL mode on first use (atomic check-and-set)
261      if self
262         .wal_initialized
263         .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
264         .is_ok()
265      {
266         sqlx::query("PRAGMA journal_mode = WAL")
267            .execute(&mut *conn)
268            .await?;
269
270         // https://www.sqlite.org/wal.html#performance_considerations
271         sqlx::query("PRAGMA synchronous = NORMAL")
272            .execute(&mut *conn)
273            .await?;
274      }
275
276      // Return WriteGuard wrapping the pool connection
277      Ok(WriteGuard::new(conn))
278   }
279
280   /// Run database migrations using the provided migrator
281   ///
282   /// This method runs all pending migrations from the provided `Migrator`.
283   /// Migrations are executed using the write connection to ensure exclusive access.
284   /// WAL mode is enabled automatically before running migrations.
285   ///
286   /// SQLx tracks applied migrations in a `_sqlx_migrations` table, so calling
287   /// this method multiple times is safe - already-applied migrations are skipped.
288   ///
289   /// # Arguments
290   ///
291   /// * `migrator` - A reference to a `Migrator` containing the migrations to run.
292   ///   Typically created using `sqlx::migrate!()` macro.
293   ///
294   /// # Example
295   ///
296   /// ```no_run
297   /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
298   ///
299   /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
300   /// // sqlx::migrate! is evaluated at compile time
301   /// static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
302   ///
303   /// let db = SqliteDatabase::connect("test.db", None).await?;
304   /// db.run_migrations(&MIGRATOR).await?;
305   /// # Ok(())
306   /// # }
307   /// ```
308   pub async fn run_migrations(&self, migrator: &sqlx::migrate::Migrator) -> Result<()> {
309      // Ensure WAL mode is initialized via acquire_writer
310      // (WriteGuard dropped immediately, returning connection to pool)
311      {
312         let _writer = self.acquire_writer().await?;
313      }
314
315      // Migrator acquires its own connection from the write pool
316      migrator.run(&self.write_conn).await?;
317
318      Ok(())
319   }
320
321   /// Close the database and clean up resources
322   ///
323   /// This closes all connections in the pool and removes the database from the cache.
324   /// After calling close, any operations on this database will return `Error::DatabaseClosed`.
325   ///
326   /// Note: Takes `Arc<Self>` to consume ownership, preventing use-after-close at compile time.
327   /// The registry stores `Weak` references, so when this Arc is dropped, the database is freed.
328   ///
329   /// # Example
330   ///
331   /// ```no_run
332   /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
333   ///
334   /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
335   /// let db = SqliteDatabase::connect("test.db", None).await?;
336   /// // ... use database ...
337   /// db.close().await?;
338   /// # Ok(())
339   /// # }
340   /// ```
341   pub async fn close(self: Arc<Self>) -> Result<()> {
342      // Mark as closed
343      self.closed.store(true, Ordering::SeqCst);
344
345      // Remove from registry
346      if let Err(e) = uncache_database(&self.path).await {
347         error!("Failed to remove database from cache: {}", e);
348      }
349
350      // This will await all readers to be returned
351      self.read_pool.close().await;
352
353      // Checkpoint WAL before closing the write connection to flush changes and truncate WAL file
354      // Only attempt if WAL was initialized (write connection was used)
355      if self.wal_initialized.load(Ordering::SeqCst)
356         && let Ok(mut conn) = self.write_conn.acquire().await
357      {
358         let _ = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
359            .execute(&mut *conn)
360            .await;
361      }
362
363      self.write_conn.close().await;
364
365      Ok(())
366   }
367
368   /// Close the database and delete all database files
369   ///
370   /// This closes all connections and then deletes the database file,
371   /// WAL file, and SHM file from disk. Use with caution!
372   ///
373   /// Note: Takes `Arc<Self>` to consume ownership, preventing use-after-close at compile time.
374   /// The registry stores `Weak` references, so when this Arc is dropped, the database is freed.
375   ///
376   /// # Example
377   ///
378   /// ```no_run
379   /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
380   ///
381   /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
382   /// let db = SqliteDatabase::connect("temp.db", None).await?;
383   /// // ... use database ...
384   /// db.remove().await?;
385   /// # Ok(())
386   /// # }
387   /// ```
388   pub async fn remove(self: Arc<Self>) -> Result<()> {
389      // Clone path before closing (since close consumes self)
390      let path = self.path.clone();
391
392      // Close all connections and clean up
393      self.close().await?;
394
395      // Remove main database file - propagate errors (file should exist)
396      std::fs::remove_file(&path).map_err(Error::Io)?;
397
398      // Remove WAL and SHM files - ignore "not found" but propagate other errors
399      // (these files may not exist if WAL was never initialized)
400      let wal_path = path.with_extension("db-wal");
401      if let Err(e) = std::fs::remove_file(&wal_path)
402         && e.kind() != std::io::ErrorKind::NotFound
403      {
404         return Err(Error::Io(e));
405      }
406
407      let shm_path = path.with_extension("db-shm");
408      if let Err(e) = std::fs::remove_file(&shm_path)
409         && e.kind() != std::io::ErrorKind::NotFound
410      {
411         return Err(Error::Io(e));
412      }
413
414      Ok(())
415   }
416}