microsandbox_db/connection.rs
1//! Typed wrappers around `sea_orm::DatabaseConnection`.
2//!
3//! Splits a connection into [`DbReadConnection`] and [`DbWriteConnection`]
4//! so the type system enforces which pool a given operation hits. SQLite is
5//! single-writer system-wide; routing every write through a dedicated
6//! single-connection write pool turns intra-process contention into an
7//! in-process queue rather than `SQLITE_BUSY` retries.
8//!
9//! Both types implement [`sea_orm::ConnectionTrait`], so existing query
10//! builders (`Entity::find().all(db)`, `Entity::insert(...).exec(db)`, etc.)
11//! work without source changes — callers just pick the right type for the
12//! operation.
13
14use std::{future::Future, path::Path, time::Duration};
15
16use sea_orm::{
17 ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, ExecResult,
18 QueryResult, Statement, TransactionTrait,
19};
20
21use crate::{pool, retry, retry::IsSqliteBusy};
22
/// Read pool. Multi-connection; concurrent reads enabled by WAL mode.
///
/// `ConnectionTrait` is implemented so SELECTs work transparently. Writes
/// also technically execute (sea-orm has no read-only enforcement at the
/// trait level), but doing so via this type defeats the purpose — write
/// paths must take a [`DbWriteConnection`] argument.
///
/// Unlike [`DbWriteConnection`], the `ConnectionTrait` impl for this type
/// forwards every call straight to the inner connection; no retry-on-busy
/// wrapping is applied on the read side.
///
/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
/// the underlying sqlx pool, so clones share connection state.
#[derive(Debug, Clone)]
pub struct DbReadConnection(DatabaseConnection);
34
/// Write pool. Single connection; serialises in-process writes so the
/// SQLite writer lock is never contested from within one process.
///
/// Cross-process contention with other writers (e.g. the in-VM runtime)
/// still exists and is absorbed by the `busy_timeout` PRAGMA plus
/// retry-on-busy handling: [`DbWriteConnection::transaction`] retries a
/// whole multi-statement transaction, and the `ConnectionTrait` impl for
/// this type retries each auto-commit statement.
///
/// `Clone` is cheap: the inner `DatabaseConnection` holds an `Arc` over
/// the underlying sqlx pool, so clones share the same single connection.
#[derive(Debug, Clone)]
pub struct DbWriteConnection(DatabaseConnection);
46
47impl DbReadConnection {
48 /// Wrap a sea-orm connection as a read pool.
49 pub fn new(inner: DatabaseConnection) -> Self {
50 Self(inner)
51 }
52
53 /// Open a stand-alone read pool at `db_path` with shared PRAGMAs.
54 pub async fn open(
55 db_path: &Path,
56 max_connections: u32,
57 connect_timeout: Duration,
58 busy_timeout: Duration,
59 ) -> Result<Self, sqlx::Error> {
60 let conn =
61 pool::build_pool(db_path, max_connections, connect_timeout, busy_timeout).await?;
62 Ok(Self(conn))
63 }
64
65 /// Borrow the underlying sea-orm connection.
66 pub fn inner(&self) -> &DatabaseConnection {
67 &self.0
68 }
69}
70
71impl DbWriteConnection {
72 /// Wrap a sea-orm connection as a write pool.
73 pub fn new(inner: DatabaseConnection) -> Self {
74 Self(inner)
75 }
76
77 /// Open a stand-alone single-connection write pool at `db_path`.
78 ///
79 /// Used by callers that don't need a paired read pool (e.g. the in-VM
80 /// runtime, which only writes run records).
81 pub async fn open(
82 db_path: &Path,
83 connect_timeout: Duration,
84 busy_timeout: Duration,
85 ) -> Result<Self, sqlx::Error> {
86 let conn = pool::build_pool(db_path, 1, connect_timeout, busy_timeout).await?;
87 Ok(Self(conn))
88 }
89
90 /// Borrow the underlying sea-orm connection.
91 pub fn inner(&self) -> &DatabaseConnection {
92 &self.0
93 }
94
95 /// Run a multi-statement atomic write inside a transaction with
96 /// automatic retry on `SQLITE_BUSY` / `SQLITE_BUSY_SNAPSHOT`. Use this
97 /// when you need several writes to commit (or roll back) as a unit.
98 /// Single-statement writes don't need this — auto-commit `.exec(db)`
99 /// already retries via the `ConnectionTrait` impl below.
100 ///
101 /// `f` is invoked once per attempt with a freshly opened transaction.
102 /// Return `Ok((txn, value))` to commit, or any `Err` to roll back (the
103 /// helper drops the transaction on failure, which sea-orm rolls back).
104 /// The closure must be callable multiple times: clone owned data inside
105 /// the body so retries see fresh values.
106 ///
107 /// Generic over the closure's error type `E` so callers can return
108 /// app-level errors directly (e.g. `MicrosandboxError`) provided
109 /// `E: From<DbErr> + IsSqliteBusy`.
110 pub async fn transaction<F, Fut, T, E>(&self, f: F) -> Result<T, E>
111 where
112 F: Fn(DatabaseTransaction) -> Fut,
113 Fut: Future<Output = Result<(DatabaseTransaction, T), E>> + Send,
114 T: Send,
115 E: From<DbErr> + IsSqliteBusy,
116 {
117 retry::retry_on_busy(|| async {
118 let txn = self.0.begin().await?;
119 let (txn, value) = f(txn).await?;
120 txn.commit().await?;
121 Ok(value)
122 })
123 .await
124 }
125}
126
127#[async_trait::async_trait]
128impl ConnectionTrait for DbReadConnection {
129 fn get_database_backend(&self) -> DbBackend {
130 self.0.get_database_backend()
131 }
132
133 async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
134 self.0.execute(stmt).await
135 }
136
137 async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
138 self.0.execute_unprepared(sql).await
139 }
140
141 async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
142 self.0.query_one(stmt).await
143 }
144
145 async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
146 self.0.query_all(stmt).await
147 }
148
149 fn support_returning(&self) -> bool {
150 self.0.support_returning()
151 }
152
153 fn is_mock_connection(&self) -> bool {
154 self.0.is_mock_connection()
155 }
156}
157
158// Auto-retry every auto-commit operation on the writer pool. Sea-orm
159// callers (`Entity::insert(...).exec(db)` etc.) ultimately funnel through
160// these `ConnectionTrait` methods, so wrapping them in `retry_on_busy`
161// gives every single-statement write inter-process retry semantics
162// without per-call-site code.
163//
164// `Statement` is `Clone`, so the closure can produce a fresh future on
165// each retry. Multi-statement atomic work still uses `transaction()`
166// above (which retries the whole closure body); statements *inside* a
167// transaction call `ConnectionTrait` methods on `DatabaseTransaction`,
168// not on this type, so no double-retry occurs.
169#[async_trait::async_trait]
170impl ConnectionTrait for DbWriteConnection {
171 fn get_database_backend(&self) -> DbBackend {
172 self.0.get_database_backend()
173 }
174
175 async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
176 retry::retry_on_busy(|| async { self.0.execute(stmt.clone()).await }).await
177 }
178
179 async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
180 retry::retry_on_busy(|| async { self.0.execute_unprepared(sql).await }).await
181 }
182
183 async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
184 retry::retry_on_busy(|| async { self.0.query_one(stmt.clone()).await }).await
185 }
186
187 async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
188 retry::retry_on_busy(|| async { self.0.query_all(stmt.clone()).await }).await
189 }
190
191 fn support_returning(&self) -> bool {
192 self.0.support_returning()
193 }
194
195 fn is_mock_connection(&self) -> bool {
196 self.0.is_mock_connection()
197 }
198}