Skip to main content

microsandbox_db/
connection.rs

//! Typed wrappers around `sea_orm::DatabaseConnection`.
//!
//! Splits a connection into [`DbReadConnection`] and [`DbWriteConnection`]
//! so the type system enforces which pool a given operation hits. SQLite is
//! single-writer system-wide; routing every write through a dedicated
//! single-connection write pool turns intra-process contention into an
//! in-process queue rather than `SQLITE_BUSY` retries.
//!
//! Both types implement [`sea_orm::ConnectionTrait`], so existing query
//! builders (`Entity::find().all(db)`, `Entity::insert(...).exec(db)`, etc.)
//! work without source changes — callers just pick the right type for the
//! operation.
13
14use std::{future::Future, path::Path, time::Duration};
15
16use sea_orm::{
17    ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, ExecResult,
18    QueryResult, Statement, TransactionTrait,
19};
20
21use crate::{pool, retry, retry::IsSqliteBusy};
22
23/// Read pool. Multi-connection; concurrent reads enabled by WAL mode.
24///
25/// `ConnectionTrait` is implemented so SELECTs work transparently. Writes
26/// also technically execute (sea-orm has no read-only enforcement at the
27/// trait level), but doing so via this type defeats the purpose — write
28/// paths must take a [`DbWriteConnection`] argument.
29///
30/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
31/// the underlying sqlx pool, so clones share connection state.
32#[derive(Debug, Clone)]
33pub struct DbReadConnection(DatabaseConnection);
34
35/// Write pool. Single connection; serialises in-process writes so the
36/// SQLite writer lock is never contested from within one process.
37///
38/// Cross-process contention with other writers (e.g. the in-VM runtime)
39/// still exists and is absorbed by the `busy_timeout` PRAGMA + the
40/// retry-on-busy transaction helpers (added in a follow-up step).
41///
42/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
43/// the underlying sqlx pool, so clones share the same single connection.
44#[derive(Debug, Clone)]
45pub struct DbWriteConnection(DatabaseConnection);
46
47impl DbReadConnection {
48    /// Wrap a sea-orm connection as a read pool.
49    pub fn new(inner: DatabaseConnection) -> Self {
50        Self(inner)
51    }
52
53    /// Open a stand-alone read pool at `db_path` with shared PRAGMAs.
54    pub async fn open(
55        db_path: &Path,
56        max_connections: u32,
57        connect_timeout: Duration,
58        busy_timeout: Duration,
59    ) -> Result<Self, sqlx::Error> {
60        let conn =
61            pool::build_pool(db_path, max_connections, connect_timeout, busy_timeout).await?;
62        Ok(Self(conn))
63    }
64
65    /// Borrow the underlying sea-orm connection.
66    pub fn inner(&self) -> &DatabaseConnection {
67        &self.0
68    }
69}
70
71impl DbWriteConnection {
72    /// Wrap a sea-orm connection as a write pool.
73    pub fn new(inner: DatabaseConnection) -> Self {
74        Self(inner)
75    }
76
77    /// Open a stand-alone single-connection write pool at `db_path`.
78    ///
79    /// Used by callers that don't need a paired read pool (e.g. the in-VM
80    /// runtime, which only writes run records).
81    pub async fn open(
82        db_path: &Path,
83        connect_timeout: Duration,
84        busy_timeout: Duration,
85    ) -> Result<Self, sqlx::Error> {
86        let conn = pool::build_pool(db_path, 1, connect_timeout, busy_timeout).await?;
87        Ok(Self(conn))
88    }
89
90    /// Borrow the underlying sea-orm connection.
91    pub fn inner(&self) -> &DatabaseConnection {
92        &self.0
93    }
94
95    /// Run a multi-statement atomic write inside a transaction with
96    /// automatic retry on `SQLITE_BUSY` / `SQLITE_BUSY_SNAPSHOT`. Use this
97    /// when you need several writes to commit (or roll back) as a unit.
98    /// Single-statement writes don't need this — auto-commit `.exec(db)`
99    /// already retries via the `ConnectionTrait` impl below.
100    ///
101    /// `f` is invoked once per attempt with a freshly opened transaction.
102    /// Return `Ok((txn, value))` to commit, or any `Err` to roll back (the
103    /// helper drops the transaction on failure, which sea-orm rolls back).
104    /// The closure must be callable multiple times: clone owned data inside
105    /// the body so retries see fresh values.
106    ///
107    /// Generic over the closure's error type `E` so callers can return
108    /// app-level errors directly (e.g. `MicrosandboxError`) provided
109    /// `E: From<DbErr> + IsSqliteBusy`.
110    pub async fn transaction<F, Fut, T, E>(&self, f: F) -> Result<T, E>
111    where
112        F: Fn(DatabaseTransaction) -> Fut,
113        Fut: Future<Output = Result<(DatabaseTransaction, T), E>> + Send,
114        T: Send,
115        E: From<DbErr> + IsSqliteBusy,
116    {
117        retry::retry_on_busy(|| async {
118            let txn = self.0.begin().await?;
119            let (txn, value) = f(txn).await?;
120            txn.commit().await?;
121            Ok(value)
122        })
123        .await
124    }
125}
126
127#[async_trait::async_trait]
128impl ConnectionTrait for DbReadConnection {
129    fn get_database_backend(&self) -> DbBackend {
130        self.0.get_database_backend()
131    }
132
133    async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
134        self.0.execute(stmt).await
135    }
136
137    async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
138        self.0.execute_unprepared(sql).await
139    }
140
141    async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
142        self.0.query_one(stmt).await
143    }
144
145    async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
146        self.0.query_all(stmt).await
147    }
148
149    fn support_returning(&self) -> bool {
150        self.0.support_returning()
151    }
152
153    fn is_mock_connection(&self) -> bool {
154        self.0.is_mock_connection()
155    }
156}
157
158// Auto-retry every auto-commit operation on the writer pool. Sea-orm
159// callers (`Entity::insert(...).exec(db)` etc.) ultimately funnel through
160// these `ConnectionTrait` methods, so wrapping them in `retry_on_busy`
161// gives every single-statement write inter-process retry semantics
162// without per-call-site code.
163//
164// `Statement` is `Clone`, so the closure can produce a fresh future on
165// each retry. Multi-statement atomic work still uses `transaction()`
166// above (which retries the whole closure body); statements *inside* a
167// transaction call `ConnectionTrait` methods on `DatabaseTransaction`,
168// not on this type, so no double-retry occurs.
169#[async_trait::async_trait]
170impl ConnectionTrait for DbWriteConnection {
171    fn get_database_backend(&self) -> DbBackend {
172        self.0.get_database_backend()
173    }
174
175    async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
176        retry::retry_on_busy(|| async { self.0.execute(stmt.clone()).await }).await
177    }
178
179    async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
180        retry::retry_on_busy(|| async { self.0.execute_unprepared(sql).await }).await
181    }
182
183    async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
184        retry::retry_on_busy(|| async { self.0.query_one(stmt.clone()).await }).await
185    }
186
187    async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
188        retry::retry_on_busy(|| async { self.0.query_all(stmt.clone()).await }).await
189    }
190
191    fn support_returning(&self) -> bool {
192        self.0.support_returning()
193    }
194
195    fn is_mock_connection(&self) -> bool {
196        self.0.is_mock_connection()
197    }
198}