sqlx_sqlite_conn_mgr/database.rs
1//! SQLite database with connection pooling and optional write access
2
3use crate::Result;
4use crate::config::SqliteDatabaseConfig;
5use crate::error::Error;
6use crate::registry::{get_or_open_database, is_memory_database, uncache_database};
7use crate::write_guard::WriteGuard;
8use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
9use sqlx::{ConnectOptions, Pool, Sqlite};
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use tracing::error;
14
15/// Analysis limit for PRAGMA optimize on close.
16/// SQLite recommends 100-1000 for older versions; 3.46.0+ handles automatically.
17/// See: https://www.sqlite.org/lang_analyze.html#recommended_usage_pattern
18const OPTIMIZE_ANALYSIS_LIMIT: u32 = 400;
19
20/// SQLite database with connection pooling for concurrent reads and optional exclusive writes.
21///
22/// Once the database is opened it can be used for read-only operations by calling `read_pool()`.
23/// Write operations are available by calling `acquire_writer()` which lazily initializes WAL mode
24/// on first use.
25///
26/// # Example
27///
28/// ```no_run
29/// use sqlx_sqlite_conn_mgr::SqliteDatabase;
30/// use std::sync::Arc;
31///
32/// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
33/// let db = SqliteDatabase::connect("test.db", None).await?;
34///
35/// // Use read_pool for SELECT queries (concurrent reads)
36/// let rows = sqlx::query("SELECT * FROM users")
37/// .fetch_all(db.read_pool()?)
38/// .await?;
39///
40/// // Optionally acquire writer for INSERT/UPDATE/DELETE (exclusive)
41/// let mut writer = db.acquire_writer().await?;
42/// sqlx::query("INSERT INTO users (name) VALUES (?)")
43/// .bind("Alice")
44/// .execute(&mut *writer)
45/// .await?;
46///
47/// db.close().await?;
48/// # Ok(())
49/// # }
50/// ```
51#[derive(Debug)]
52pub struct SqliteDatabase {
53 /// Pool of read-only connections (defaults to max_connections=6) for concurrent reads
54 read_pool: Pool<Sqlite>,
55
56 /// Single read-write connection pool (max_connections=1) for serialized writes
57 write_conn: Pool<Sqlite>,
58
59 /// Tracks if WAL mode has been initialized (set on first write)
60 wal_initialized: AtomicBool,
61
62 /// Marks database as closed to prevent further operations
63 closed: AtomicBool,
64
65 /// Path to database file (used for cleanup and registry lookups)
66 path: PathBuf,
67}
68
69impl SqliteDatabase {
70 /// Get the database file path as a string
71 ///
72 /// Used internally (crate-private) for ATTACH DATABASE statements
73 pub(crate) fn path_str(&self) -> String {
74 self.path.to_string_lossy().to_string()
75 }
76
77 /// Connect to a SQLite database
78 ///
79 /// If the database is already connected, returns the existing connection.
80 /// Multiple calls with the same path will return the same database instance.
81 ///
82 /// The database is created if it doesn't exist. WAL mode is enabled when
83 /// `acquire_writer()` is first called.
84 ///
85 /// # Arguments
86 ///
87 /// * `path` - Path to the SQLite database file (will be created if missing)
88 /// * `custom_config` - Optional custom configuration for connection pools.
89 /// Pass `None` to use defaults (6 max read connections, 30 second idle timeout).
90 /// Specify a custom configuration when the defaults don't meet your requirements.
91 ///
92 /// # Examples
93 ///
94 /// ```no_run
95 /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
96 /// use std::sync::Arc;
97 ///
98 /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
99 /// // Connect with default configuration (recommended for most use cases)
100 /// let db = SqliteDatabase::connect("test.db", None).await?;
101 /// # Ok(())
102 /// # }
103 /// ```
104 ///
105 /// ```no_run
106 /// use sqlx_sqlite_conn_mgr::{SqliteDatabase, SqliteDatabaseConfig};
107 /// use std::sync::Arc;
108 ///
109 /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
110 /// // Customize configuration when defaults don't meet your requirements
111 /// let custom_config = SqliteDatabaseConfig {
112 /// max_read_connections: 10,
113 /// idle_timeout_secs: 60,
114 /// };
115 /// let db = SqliteDatabase::connect("test.db", Some(custom_config)).await?;
116 /// # Ok(())
117 /// # }
118 /// ```
119 pub async fn connect(
120 path: impl AsRef<Path>,
121 custom_config: Option<SqliteDatabaseConfig>,
122 ) -> Result<Arc<Self>> {
123 let config = custom_config.unwrap_or_default();
124 let path = path.as_ref();
125
126 // Validate path is not empty
127 if path.as_os_str().is_empty() {
128 return Err(crate::error::Error::Io(std::io::Error::new(
129 std::io::ErrorKind::InvalidInput,
130 "Database path cannot be empty",
131 )));
132 }
133
134 let path = path.to_path_buf();
135
136 get_or_open_database(&path, || async {
137 // Check if database file exists
138 let db_exists = path.exists();
139
140 // If database doesn't exist and not :memory:, create it with a temporary connection
141 // We don't keep this connection - WAL mode will be set later in acquire_writer()
142 //
143 // Why do we need to manually create the database file? We could just let the connection
144 // create it if it doesn't exist, using `create_if_missing(true)`, right? Not if we called
145 // connect and then our very first query was a read-only query, like `PRAGMA user_version;`,
146 // for example. That would fail because the read pool connections are read-only and cannot
147 // create the file
148 if !db_exists && !is_memory_database(&path) {
149 let create_options = SqliteConnectOptions::new()
150 .filename(&path)
151 .create_if_missing(true)
152 .read_only(false);
153
154 // Create database file with a temporary connection
155 let conn = create_options.connect().await?;
156 drop(conn); // Close immediately after creating the file
157 }
158
159 // Create read pool with read-only connections
160 let read_options = SqliteConnectOptions::new()
161 .filename(&path)
162 .read_only(true)
163 .optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT);
164
165 let read_pool = SqlitePoolOptions::new()
166 .max_connections(config.max_read_connections)
167 .min_connections(0)
168 .idle_timeout(Some(std::time::Duration::from_secs(
169 config.idle_timeout_secs,
170 )))
171 .connect_with(read_options)
172 .await?;
173
174 // Create write pool with a single read-write connection
175 let write_options = SqliteConnectOptions::new()
176 .filename(&path)
177 .read_only(false)
178 .optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT);
179
180 let write_conn = SqlitePoolOptions::new()
181 .max_connections(1)
182 .min_connections(0)
183 .idle_timeout(Some(std::time::Duration::from_secs(
184 config.idle_timeout_secs,
185 )))
186 .connect_with(write_options)
187 .await?;
188
189 Ok(Self {
190 read_pool,
191 write_conn,
192 wal_initialized: AtomicBool::new(false),
193 closed: AtomicBool::new(false),
194 path: path.clone(),
195 })
196 })
197 .await
198 }
199
200 /// Get a reference to the connection pool for executing read queries
201 ///
202 /// Use this for concurrent read operations. Multiple readers can access
203 /// the pool simultaneously.
204 ///
205 /// # Example
206 ///
207 /// ```no_run
208 /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
209 /// use sqlx::query;
210 /// use std::sync::Arc;
211 ///
212 /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
213 /// let db = SqliteDatabase::connect("test.db", None).await?;
214 /// let result = query("SELECT * FROM users")
215 /// .fetch_all(db.read_pool()?)
216 /// .await?;
217 /// # Ok(())
218 /// # }
219 /// ```
220 pub fn read_pool(&self) -> Result<&Pool<Sqlite>> {
221 if self.closed.load(Ordering::SeqCst) {
222 return Err(Error::DatabaseClosed);
223 }
224 Ok(&self.read_pool)
225 }
226
227 /// Acquire exclusive write access to the database
228 ///
229 /// This method returns a `WriteGuard` that provides exclusive access to
230 /// the single write connection. Only one writer can exist at a time.
231 ///
232 /// On the first call, this method will enable WAL mode on the database.
233 /// Subsequent calls reuse the same write connection.
234 ///
235 /// # Example
236 ///
237 /// ```no_run
238 /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
239 /// use sqlx::query;
240 /// use std::sync::Arc;
241 ///
242 /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
243 /// let db = SqliteDatabase::connect("test.db", None).await?;
244 /// let mut writer = db.acquire_writer().await?;
245 /// query("INSERT INTO users (name) VALUES (?)")
246 /// .bind("Alice")
247 /// .execute(&mut *writer)
248 /// .await?;
249 /// # Ok(())
250 /// # }
251 /// ```
252 pub async fn acquire_writer(&self) -> Result<WriteGuard> {
253 if self.closed.load(Ordering::SeqCst) {
254 return Err(Error::DatabaseClosed);
255 }
256
257 // Acquire connection from pool (max=1 ensures exclusive access)
258 let mut conn = self.write_conn.acquire().await?;
259
260 // Initialize WAL mode on first use (atomic check-and-set)
261 if self
262 .wal_initialized
263 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
264 .is_ok()
265 {
266 sqlx::query("PRAGMA journal_mode = WAL")
267 .execute(&mut *conn)
268 .await?;
269
270 // https://www.sqlite.org/wal.html#performance_considerations
271 sqlx::query("PRAGMA synchronous = NORMAL")
272 .execute(&mut *conn)
273 .await?;
274 }
275
276 // Return WriteGuard wrapping the pool connection
277 Ok(WriteGuard::new(conn))
278 }
279
280 /// Run database migrations using the provided migrator
281 ///
282 /// This method runs all pending migrations from the provided `Migrator`.
283 /// Migrations are executed using the write connection to ensure exclusive access.
284 /// WAL mode is enabled automatically before running migrations.
285 ///
286 /// SQLx tracks applied migrations in a `_sqlx_migrations` table, so calling
287 /// this method multiple times is safe - already-applied migrations are skipped.
288 ///
289 /// # Arguments
290 ///
291 /// * `migrator` - A reference to a `Migrator` containing the migrations to run.
292 /// Typically created using `sqlx::migrate!()` macro.
293 ///
294 /// # Example
295 ///
296 /// ```no_run
297 /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
298 ///
299 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
300 /// // sqlx::migrate! is evaluated at compile time
301 /// static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
302 ///
303 /// let db = SqliteDatabase::connect("test.db", None).await?;
304 /// db.run_migrations(&MIGRATOR).await?;
305 /// # Ok(())
306 /// # }
307 /// ```
308 pub async fn run_migrations(&self, migrator: &sqlx::migrate::Migrator) -> Result<()> {
309 // Ensure WAL mode is initialized via acquire_writer
310 // (WriteGuard dropped immediately, returning connection to pool)
311 {
312 let _writer = self.acquire_writer().await?;
313 }
314
315 // Migrator acquires its own connection from the write pool
316 migrator.run(&self.write_conn).await?;
317
318 Ok(())
319 }
320
321 /// Close the database and clean up resources
322 ///
323 /// This closes all connections in the pool and removes the database from the cache.
324 /// After calling close, any operations on this database will return `Error::DatabaseClosed`.
325 ///
326 /// Note: Takes `Arc<Self>` to consume ownership, preventing use-after-close at compile time.
327 /// The registry stores `Weak` references, so when this Arc is dropped, the database is freed.
328 ///
329 /// # Example
330 ///
331 /// ```no_run
332 /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
333 ///
334 /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
335 /// let db = SqliteDatabase::connect("test.db", None).await?;
336 /// // ... use database ...
337 /// db.close().await?;
338 /// # Ok(())
339 /// # }
340 /// ```
341 pub async fn close(self: Arc<Self>) -> Result<()> {
342 // Mark as closed
343 self.closed.store(true, Ordering::SeqCst);
344
345 // Remove from registry
346 if let Err(e) = uncache_database(&self.path).await {
347 error!("Failed to remove database from cache: {}", e);
348 }
349
350 // This will await all readers to be returned
351 self.read_pool.close().await;
352
353 // Checkpoint WAL before closing the write connection to flush changes and truncate WAL file
354 // Only attempt if WAL was initialized (write connection was used)
355 if self.wal_initialized.load(Ordering::SeqCst)
356 && let Ok(mut conn) = self.write_conn.acquire().await
357 {
358 let _ = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
359 .execute(&mut *conn)
360 .await;
361 }
362
363 self.write_conn.close().await;
364
365 Ok(())
366 }
367
368 /// Close the database and delete all database files
369 ///
370 /// This closes all connections and then deletes the database file,
371 /// WAL file, and SHM file from disk. Use with caution!
372 ///
373 /// Note: Takes `Arc<Self>` to consume ownership, preventing use-after-close at compile time.
374 /// The registry stores `Weak` references, so when this Arc is dropped, the database is freed.
375 ///
376 /// # Example
377 ///
378 /// ```no_run
379 /// use sqlx_sqlite_conn_mgr::SqliteDatabase;
380 ///
381 /// # async fn example() -> Result<(), sqlx_sqlite_conn_mgr::Error> {
382 /// let db = SqliteDatabase::connect("temp.db", None).await?;
383 /// // ... use database ...
384 /// db.remove().await?;
385 /// # Ok(())
386 /// # }
387 /// ```
388 pub async fn remove(self: Arc<Self>) -> Result<()> {
389 // Clone path before closing (since close consumes self)
390 let path = self.path.clone();
391
392 // Close all connections and clean up
393 self.close().await?;
394
395 // Remove main database file - propagate errors (file should exist)
396 std::fs::remove_file(&path).map_err(Error::Io)?;
397
398 // Remove WAL and SHM files - ignore "not found" but propagate other errors
399 // (these files may not exist if WAL was never initialized)
400 let wal_path = path.with_extension("db-wal");
401 if let Err(e) = std::fs::remove_file(&wal_path)
402 && e.kind() != std::io::ErrorKind::NotFound
403 {
404 return Err(Error::Io(e));
405 }
406
407 let shm_path = path.with_extension("db-shm");
408 if let Err(e) = std::fs::remove_file(&shm_path)
409 && e.kind() != std::io::ErrorKind::NotFound
410 {
411 return Err(Error::Io(e));
412 }
413
414 Ok(())
415 }
416}