microsandbox_db/connection.rs
1//! Typed wrappers around `sea_orm::DatabaseConnection`.
2//!
3//! Splits a connection into [`DbReadConnection`] and [`DbWriteConnection`]
4//! so the type system enforces which pool a given operation hits. SQLite is
5//! single-writer system-wide; routing every write through a dedicated
6//! single-connection write pool turns intra-process contention into an
7//! in-process queue rather than `SQLITE_BUSY` retries.
8//!
9//! Both types implement [`sea_orm::ConnectionTrait`], so existing query
10//! builders (`Entity::find().all(db)`, `Entity::insert(...).exec(db)`, etc.)
11//! work without source changes — callers just pick the right type for the
12//! operation.
13
14use std::{future::Future, path::Path, time::Duration};
15
16use sea_orm::{
17 ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, ExecResult,
18 QueryResult, Statement, TransactionTrait,
19};
20
21use crate::{pool, retry, retry::IsSqliteBusy};
22
23/// Read pool. Multi-connection; concurrent reads enabled by WAL mode.
24///
25/// `ConnectionTrait` is implemented so SELECTs work transparently. Writes
26/// also technically execute (sea-orm has no read-only enforcement at the
27/// trait level), but doing so via this type defeats the purpose — write
28/// paths must take a [`DbWriteConnection`] argument.
29///
30/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
31/// the underlying sqlx pool, so clones share connection state.
32#[derive(Debug, Clone)]
33pub struct DbReadConnection(DatabaseConnection);
34
35/// Write pool. Single connection; serialises in-process writes so the
36/// SQLite writer lock is never contested from within one process.
37///
38/// Cross-process contention with other writers (e.g. the in-VM runtime)
39/// still exists and is absorbed by the `busy_timeout` PRAGMA + the
40/// retry-on-busy transaction helpers (added in a follow-up step).
41///
42/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
43/// the underlying sqlx pool, so clones share the same single connection.
44#[derive(Debug, Clone)]
45pub struct DbWriteConnection(DatabaseConnection);
46
47impl DbReadConnection {
48 /// Wrap a sea-orm connection as a read pool.
49 pub fn new(inner: DatabaseConnection) -> Self {
50 Self(inner)
51 }
52
53 /// Open a stand-alone read pool at `db_path` with shared PRAGMAs.
54 ///
55 /// Read-only: opening a non-existent DB fails rather than creating it, so a
56 /// read consumer never authors or pre-empts the catalog owned by `msb`.
57 pub async fn open(
58 db_path: &Path,
59 max_connections: u32,
60 connect_timeout: Duration,
61 busy_timeout: Duration,
62 ) -> Result<Self, sqlx::Error> {
63 let conn = pool::build_pool(
64 db_path,
65 max_connections,
66 connect_timeout,
67 busy_timeout,
68 false,
69 )
70 .await?;
71 Ok(Self(conn))
72 }
73
74 /// Borrow the underlying sea-orm connection.
75 pub fn inner(&self) -> &DatabaseConnection {
76 &self.0
77 }
78}
79
80impl DbWriteConnection {
81 /// Wrap a sea-orm connection as a write pool.
82 pub fn new(inner: DatabaseConnection) -> Self {
83 Self(inner)
84 }
85
86 /// Open a stand-alone single-connection write pool at `db_path`.
87 ///
88 /// Used by callers that don't need a paired read pool (e.g. the in-VM
89 /// runtime, which only writes run records).
90 pub async fn open(
91 db_path: &Path,
92 connect_timeout: Duration,
93 busy_timeout: Duration,
94 ) -> Result<Self, sqlx::Error> {
95 let conn = pool::build_pool(db_path, 1, connect_timeout, busy_timeout, true).await?;
96 Ok(Self(conn))
97 }
98
99 /// Borrow the underlying sea-orm connection.
100 pub fn inner(&self) -> &DatabaseConnection {
101 &self.0
102 }
103
104 /// Run a multi-statement atomic write inside a transaction with
105 /// automatic retry on `SQLITE_BUSY` / `SQLITE_BUSY_SNAPSHOT`. Use this
106 /// when you need several writes to commit (or roll back) as a unit.
107 /// Single-statement writes don't need this — auto-commit `.exec(db)`
108 /// already retries via the `ConnectionTrait` impl below.
109 ///
110 /// `f` is invoked once per attempt with a freshly opened transaction.
111 /// Return `Ok((txn, value))` to commit, or any `Err` to roll back (the
112 /// helper drops the transaction on failure, which sea-orm rolls back).
113 /// The closure must be callable multiple times: clone owned data inside
114 /// the body so retries see fresh values.
115 ///
116 /// Generic over the closure's error type `E` so callers can return
117 /// app-level errors directly (e.g. `MicrosandboxError`) provided
118 /// `E: From<DbErr> + IsSqliteBusy`.
119 pub async fn transaction<F, Fut, T, E>(&self, f: F) -> Result<T, E>
120 where
121 F: Fn(DatabaseTransaction) -> Fut,
122 Fut: Future<Output = Result<(DatabaseTransaction, T), E>> + Send,
123 T: Send,
124 E: From<DbErr> + IsSqliteBusy,
125 {
126 retry::retry_on_busy(|| async {
127 let txn = self.0.begin().await?;
128 let (txn, value) = f(txn).await?;
129 txn.commit().await?;
130 Ok(value)
131 })
132 .await
133 }
134}
135
136#[async_trait::async_trait]
137impl ConnectionTrait for DbReadConnection {
138 fn get_database_backend(&self) -> DbBackend {
139 self.0.get_database_backend()
140 }
141
142 async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
143 self.0.execute(stmt).await
144 }
145
146 async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
147 self.0.execute_unprepared(sql).await
148 }
149
150 async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
151 self.0.query_one(stmt).await
152 }
153
154 async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
155 self.0.query_all(stmt).await
156 }
157
158 fn support_returning(&self) -> bool {
159 self.0.support_returning()
160 }
161
162 fn is_mock_connection(&self) -> bool {
163 self.0.is_mock_connection()
164 }
165}
166
167// Auto-retry every auto-commit operation on the writer pool. Sea-orm
168// callers (`Entity::insert(...).exec(db)` etc.) ultimately funnel through
169// these `ConnectionTrait` methods, so wrapping them in `retry_on_busy`
170// gives every single-statement write inter-process retry semantics
171// without per-call-site code.
172//
173// `Statement` is `Clone`, so the closure can produce a fresh future on
174// each retry. Multi-statement atomic work still uses `transaction()`
175// above (which retries the whole closure body); statements *inside* a
176// transaction call `ConnectionTrait` methods on `DatabaseTransaction`,
177// not on this type, so no double-retry occurs.
178#[async_trait::async_trait]
179impl ConnectionTrait for DbWriteConnection {
180 fn get_database_backend(&self) -> DbBackend {
181 self.0.get_database_backend()
182 }
183
184 async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
185 retry::retry_on_busy(|| async { self.0.execute(stmt.clone()).await }).await
186 }
187
188 async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
189 retry::retry_on_busy(|| async { self.0.execute_unprepared(sql).await }).await
190 }
191
192 async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
193 retry::retry_on_busy(|| async { self.0.query_one(stmt.clone()).await }).await
194 }
195
196 async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
197 retry::retry_on_busy(|| async { self.0.query_all(stmt.clone()).await }).await
198 }
199
200 fn support_returning(&self) -> bool {
201 self.0.support_returning()
202 }
203
204 fn is_mock_connection(&self) -> bool {
205 self.0.is_mock_connection()
206 }
207}
208
209//--------------------------------------------------------------------------------------------------
210// Tests
211//--------------------------------------------------------------------------------------------------
212
#[cfg(test)]
mod tests {
    use super::*;

    const TIMEOUT: Duration = Duration::from_secs(5);

    /// Create a fresh temporary directory and the path of a `catalog.db`
    /// inside it. The file itself is NOT created. The directory guard is
    /// returned so the directory outlives the test body.
    fn scratch_catalog() -> (tempfile::TempDir, std::path::PathBuf) {
        let guard = tempfile::tempdir().unwrap();
        let catalog = guard.path().join("catalog.db");
        (guard, catalog)
    }

    /// Opening the read pool on a missing file must fail and must leave the
    /// filesystem untouched.
    #[tokio::test]
    async fn read_open_does_not_create_db() {
        // The directory exists; the DB file inside it does not.
        let (_guard, catalog) = scratch_catalog();

        let outcome = DbReadConnection::open(&catalog, 1, TIMEOUT, TIMEOUT).await;

        assert!(outcome.is_err(), "read open should fail on a missing db");
        assert!(
            !catalog.exists(),
            "read open must not create the catalog db file"
        );
    }

    /// Opening the write pool on a missing file must succeed and create it.
    #[tokio::test]
    async fn write_open_creates_db() {
        let (_guard, catalog) = scratch_catalog();

        let outcome = DbWriteConnection::open(&catalog, TIMEOUT, TIMEOUT).await;

        assert!(outcome.is_ok(), "write open should succeed");
        assert!(
            catalog.exists(),
            "write open should create the catalog db file"
        );
    }
}