Skip to main content

microsandbox_db/
connection.rs

1//! Typed wrappers around `sea_orm::DatabaseConnection`.
2//!
3//! Splits a connection into [`DbReadConnection`] and [`DbWriteConnection`]
4//! so the type system enforces which pool a given operation hits. SQLite is
5//! single-writer system-wide; routing every write through a dedicated
6//! single-connection write pool turns intra-process contention into an
7//! in-process queue rather than `SQLITE_BUSY` retries.
8//!
9//! Both types implement [`sea_orm::ConnectionTrait`], so existing query
10//! builders (`Entity::find().all(db)`, `Entity::insert(...).exec(db)`, etc.)
11//! work without source changes — callers just pick the right type for the
12//! operation.
13
14use std::{future::Future, path::Path, time::Duration};
15
16use sea_orm::{
17    ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, ExecResult,
18    QueryResult, Statement, TransactionTrait,
19};
20
21use crate::{pool, retry, retry::IsSqliteBusy};
22
23/// Read pool. Multi-connection; concurrent reads enabled by WAL mode.
24///
25/// `ConnectionTrait` is implemented so SELECTs work transparently. Writes
26/// also technically execute (sea-orm has no read-only enforcement at the
27/// trait level), but doing so via this type defeats the purpose — write
28/// paths must take a [`DbWriteConnection`] argument.
29///
30/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
31/// the underlying sqlx pool, so clones share connection state.
32#[derive(Debug, Clone)]
33pub struct DbReadConnection(DatabaseConnection);
34
35/// Write pool. Single connection; serialises in-process writes so the
36/// SQLite writer lock is never contested from within one process.
37///
38/// Cross-process contention with other writers (e.g. the in-VM runtime)
39/// still exists and is absorbed by the `busy_timeout` PRAGMA + the
40/// retry-on-busy transaction helpers (added in a follow-up step).
41///
42/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
43/// the underlying sqlx pool, so clones share the same single connection.
44#[derive(Debug, Clone)]
45pub struct DbWriteConnection(DatabaseConnection);
46
47impl DbReadConnection {
48    /// Wrap a sea-orm connection as a read pool.
49    pub fn new(inner: DatabaseConnection) -> Self {
50        Self(inner)
51    }
52
53    /// Open a stand-alone read pool at `db_path` with shared PRAGMAs.
54    ///
55    /// Read-only: opening a non-existent DB fails rather than creating it, so a
56    /// read consumer never authors or pre-empts the catalog owned by `msb`.
57    pub async fn open(
58        db_path: &Path,
59        max_connections: u32,
60        connect_timeout: Duration,
61        busy_timeout: Duration,
62    ) -> Result<Self, sqlx::Error> {
63        let conn = pool::build_pool(
64            db_path,
65            max_connections,
66            connect_timeout,
67            busy_timeout,
68            false,
69        )
70        .await?;
71        Ok(Self(conn))
72    }
73
74    /// Borrow the underlying sea-orm connection.
75    pub fn inner(&self) -> &DatabaseConnection {
76        &self.0
77    }
78}
79
80impl DbWriteConnection {
81    /// Wrap a sea-orm connection as a write pool.
82    pub fn new(inner: DatabaseConnection) -> Self {
83        Self(inner)
84    }
85
86    /// Open a stand-alone single-connection write pool at `db_path`.
87    ///
88    /// Used by callers that don't need a paired read pool (e.g. the in-VM
89    /// runtime, which only writes run records).
90    pub async fn open(
91        db_path: &Path,
92        connect_timeout: Duration,
93        busy_timeout: Duration,
94    ) -> Result<Self, sqlx::Error> {
95        let conn = pool::build_pool(db_path, 1, connect_timeout, busy_timeout, true).await?;
96        Ok(Self(conn))
97    }
98
99    /// Borrow the underlying sea-orm connection.
100    pub fn inner(&self) -> &DatabaseConnection {
101        &self.0
102    }
103
104    /// Run a multi-statement atomic write inside a transaction with
105    /// automatic retry on `SQLITE_BUSY` / `SQLITE_BUSY_SNAPSHOT`. Use this
106    /// when you need several writes to commit (or roll back) as a unit.
107    /// Single-statement writes don't need this — auto-commit `.exec(db)`
108    /// already retries via the `ConnectionTrait` impl below.
109    ///
110    /// `f` is invoked once per attempt with a freshly opened transaction.
111    /// Return `Ok((txn, value))` to commit, or any `Err` to roll back (the
112    /// helper drops the transaction on failure, which sea-orm rolls back).
113    /// The closure must be callable multiple times: clone owned data inside
114    /// the body so retries see fresh values.
115    ///
116    /// Generic over the closure's error type `E` so callers can return
117    /// app-level errors directly (e.g. `MicrosandboxError`) provided
118    /// `E: From<DbErr> + IsSqliteBusy`.
119    pub async fn transaction<F, Fut, T, E>(&self, f: F) -> Result<T, E>
120    where
121        F: Fn(DatabaseTransaction) -> Fut,
122        Fut: Future<Output = Result<(DatabaseTransaction, T), E>> + Send,
123        T: Send,
124        E: From<DbErr> + IsSqliteBusy,
125    {
126        retry::retry_on_busy(|| async {
127            let txn = self.0.begin().await?;
128            let (txn, value) = f(txn).await?;
129            txn.commit().await?;
130            Ok(value)
131        })
132        .await
133    }
134}
135
136#[async_trait::async_trait]
137impl ConnectionTrait for DbReadConnection {
138    fn get_database_backend(&self) -> DbBackend {
139        self.0.get_database_backend()
140    }
141
142    async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
143        self.0.execute(stmt).await
144    }
145
146    async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
147        self.0.execute_unprepared(sql).await
148    }
149
150    async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
151        self.0.query_one(stmt).await
152    }
153
154    async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
155        self.0.query_all(stmt).await
156    }
157
158    fn support_returning(&self) -> bool {
159        self.0.support_returning()
160    }
161
162    fn is_mock_connection(&self) -> bool {
163        self.0.is_mock_connection()
164    }
165}
166
167// Auto-retry every auto-commit operation on the writer pool. Sea-orm
168// callers (`Entity::insert(...).exec(db)` etc.) ultimately funnel through
169// these `ConnectionTrait` methods, so wrapping them in `retry_on_busy`
170// gives every single-statement write inter-process retry semantics
171// without per-call-site code.
172//
173// `Statement` is `Clone`, so the closure can produce a fresh future on
174// each retry. Multi-statement atomic work still uses `transaction()`
175// above (which retries the whole closure body); statements *inside* a
176// transaction call `ConnectionTrait` methods on `DatabaseTransaction`,
177// not on this type, so no double-retry occurs.
178#[async_trait::async_trait]
179impl ConnectionTrait for DbWriteConnection {
180    fn get_database_backend(&self) -> DbBackend {
181        self.0.get_database_backend()
182    }
183
184    async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
185        retry::retry_on_busy(|| async { self.0.execute(stmt.clone()).await }).await
186    }
187
188    async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
189        retry::retry_on_busy(|| async { self.0.execute_unprepared(sql).await }).await
190    }
191
192    async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
193        retry::retry_on_busy(|| async { self.0.query_one(stmt.clone()).await }).await
194    }
195
196    async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
197        retry::retry_on_busy(|| async { self.0.query_all(stmt.clone()).await }).await
198    }
199
200    fn support_returning(&self) -> bool {
201        self.0.support_returning()
202    }
203
204    fn is_mock_connection(&self) -> bool {
205        self.0.is_mock_connection()
206    }
207}
208
209//--------------------------------------------------------------------------------------------------
210// Tests
211//--------------------------------------------------------------------------------------------------
212
213#[cfg(test)]
214mod tests {
215    use super::*;
216
217    const TIMEOUT: Duration = Duration::from_secs(5);
218
219    #[tokio::test]
220    async fn read_open_does_not_create_db() {
221        // Existing directory, missing DB file.
222        let dir = tempfile::tempdir().unwrap();
223        let db_path = dir.path().join("catalog.db");
224
225        let result = DbReadConnection::open(&db_path, 1, TIMEOUT, TIMEOUT).await;
226
227        assert!(result.is_err(), "read open should fail on a missing db");
228        assert!(
229            !db_path.exists(),
230            "read open must not create the catalog db file"
231        );
232    }
233
234    #[tokio::test]
235    async fn write_open_creates_db() {
236        let dir = tempfile::tempdir().unwrap();
237        let db_path = dir.path().join("catalog.db");
238
239        let conn = DbWriteConnection::open(&db_path, TIMEOUT, TIMEOUT).await;
240
241        assert!(conn.is_ok(), "write open should succeed");
242        assert!(
243            db_path.exists(),
244            "write open should create the catalog db file"
245        );
246    }
247}