1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//! SQLite implementation of [`CatalogBackend`] backed by `sqlx::SqlitePool`.
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use sqlx::sqlite::{
SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions, SqliteSynchronous,
};
use sqlx::ConnectOptions;
use super::backend::{classify, BackendError, BackendKind, CatalogBackend, Transaction, TxOptions};
/// SQLite-backed catalog. Wraps a connection pool with WAL mode + 5 s busy
/// timeout, matching the original `r2d2_sqlite`-based behaviour.
///
/// Instances are created through [`SqliteBackend::open`], which configures the
/// pool and hands the backend out behind an `Arc`.
pub struct SqliteBackend {
/// Connection pool through which transactions and pings are issued.
pool: SqlitePool,
}
impl SqliteBackend {
    /// Connects to the catalog database stored at `path`, creating the file
    /// when it does not exist yet.
    ///
    /// Connections are configured with WAL journaling, a 5 s busy timeout,
    /// `synchronous = NORMAL` and foreign-key enforcement; statements are
    /// logged at `Trace` level. The pool is capped at 8 connections.
    ///
    /// # Errors
    ///
    /// A failure to establish the pool is passed through `classify` and
    /// returned as a [`BackendError`].
    pub async fn open(path: &Path) -> Result<Arc<Self>, BackendError> {
        // Tuning knobs kept together so the pool setup below reads declaratively.
        const BUSY_TIMEOUT: Duration = Duration::from_secs(5);
        const MAX_CONNECTIONS: u32 = 8;

        let connect_opts = SqliteConnectOptions::new()
            .filename(path)
            .create_if_missing(true)
            .journal_mode(SqliteJournalMode::Wal)
            .busy_timeout(BUSY_TIMEOUT)
            .synchronous(SqliteSynchronous::Normal)
            .foreign_keys(true)
            .log_statements(tracing::log::LevelFilter::Trace);

        let pool_opts = SqlitePoolOptions::new().max_connections(MAX_CONNECTIONS);
        let pool = pool_opts
            .connect_with(connect_opts)
            .await
            .map_err(classify)?;

        let backend = Self { pool };
        Ok(Arc::new(backend))
    }
}
impl CatalogBackend for SqliteBackend {
    /// Executes `f` inside one SQLite transaction: the work is committed when
    /// `f` resolves to `Ok` and rolled back when it resolves to `Err`.
    ///
    /// `opts.read_only` selects the `BEGIN` mode (`DEFERRED` for readers,
    /// `IMMEDIATE` for writers). `opts.isolation` is ignored because SQLite
    /// has no per-transaction isolation setting.
    ///
    /// # Runtime requirement
    ///
    /// **This future must be driven by a live tokio runtime and must never be
    /// blocked on from a runtime worker thread.** The `BEGIN` is issued from a
    /// task created with `tokio::spawn`, and this future waits on that task's
    /// join handle, so a runtime has to stay free to poll the spawned task.
    /// Plain `.await`ing is fine on both multi-thread and single-thread
    /// runtimes, because the executor interleaves the spawned begin with the
    /// awaiting future. What deadlocks is
    /// `Handle::current().block_on(transaction(..))` *from inside* a runtime
    /// worker: the worker is pinned on the join handle and the spawned begin
    /// is never polled. The Postgres backend does not spawn and has no such
    /// restriction.
    ///
    /// # Errors
    ///
    /// * [`BackendError::Unavailable`] when the spawned begin task cannot be
    ///   joined.
    /// * The `classify`-mapped error when `BEGIN` or `COMMIT` fails.
    /// * The error returned by `f`, after a best-effort rollback whose own
    ///   failure is discarded.
    fn transaction<'a, F, R>(
        &'a self,
        opts: TxOptions,
        f: F,
    ) -> Pin<Box<dyn Future<Output = Result<R, BackendError>> + Send + 'a>>
    where
        F: for<'tx> FnOnce(
                &'tx mut Transaction<'tx>,
            )
                -> Pin<Box<dyn Future<Output = Result<R, BackendError>> + Send + 'tx>>
            + Send
            + 'a,
        R: Send + 'a,
    {
        Box::pin(async move {
            // Choosing the BEGIN mode.
            //
            // There is no `SET TRANSACTION ISOLATION LEVEL` in SQLite: the
            // journal mode fixes isolation (WAL provides snapshot reads), so
            // the only thing that separates a reader from a writer is the
            // BEGIN statement. `Pool::begin` in `sqlx` always issues a
            // DEFERRED begin and cannot express that, hence `Pool::begin_with`
            // below: it runs our statement but still returns a sqlx
            // `Transaction` that rolls back on drop/cancel.
            //
            // Writers have to grab the write lock up front with
            // `BEGIN IMMEDIATE`. Under WAL, two DEFERRED transactions that
            // read first and then try to upgrade to a write fail with
            // SQLITE_BUSY_SNAPSHOT, and `busy_timeout` cannot help because
            // waiting never resolves a snapshot-upgrade conflict. With
            // IMMEDIATE, competing writers simply queue on `busy_timeout`.
            // Readers stay DEFERRED so they take a snapshot without
            // serialising against one another or against writers.
            let begin_stmt = if opts.read_only {
                "BEGIN DEFERRED"
            } else {
                "BEGIN IMMEDIATE"
            };
            // `isolation` is intentionally unused on this backend.
            let _ = (opts.isolation, opts.read_only);

            // Making the BEGIN uncancellable.
            //
            // `Pool::begin_with` first sends `BEGIN` and only afterwards
            // builds the sqlx `Transaction` whose drop guard performs the
            // rollback. Were the caller's future dropped in between (the
            // connection has already run `BEGIN IMMEDIATE` and bumped its
            // transaction depth, but no `Transaction` exists yet) nothing
            // would roll back: the connection would go back to the pool still
            // inside a transaction and still holding the WAL write lock. The
            // next checkout of that connection would then fail with
            // `InvalidSavePointStatement` (a custom `BEGIN` is illegal at
            // depth > 0) and all other writers would starve on the leaked
            // lock (`database is locked`).
            //
            // Driving the begin on a detached task removes that window. A
            // cancelled caller only drops the `JoinHandle`; the task still
            // finishes the begin, obtains a complete `Transaction`, and that
            // value is then dropped through its own guard, which rolls back
            // and returns a clean connection.
            let pool = self.pool.clone();
            let begin_task = tokio::spawn(async move { pool.begin_with(begin_stmt).await });
            let began = match begin_task.await {
                Ok(began) => began,
                Err(join) => {
                    return Err(BackendError::Unavailable(format!(
                        "transaction begin task failed: {join}"
                    )));
                }
            };
            let mut tx = began.map_err(classify)?;

            // The HRTB on `f` keeps the wrapper borrowed for the wrapper's
            // whole lifetime, so it lives in its own scope and is gone before
            // `tx` is moved into commit/rollback.
            let result = {
                let mut scoped = Transaction::new_sqlite(&mut tx);
                f(&mut scoped).await
            };

            let value = match result {
                Ok(value) => value,
                Err(err) => {
                    // Best effort: the closure's error is what the caller
                    // needs to see, so a rollback failure is discarded.
                    let _ = tx.rollback().await;
                    return Err(err);
                }
            };
            tx.commit().await.map_err(classify)?;
            Ok(value)
        })
    }

    /// Applies the catalog schema migrations by delegating to
    /// `super::migrations::run` with this backend.
    fn migrate(&self) -> Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + '_>> {
        let migration = async move { super::migrations::run(self).await };
        Box::pin(migration)
    }

    /// Checks that the database is reachable by executing `SELECT 1` against
    /// the pool; a failure is returned after being mapped by `classify`.
    fn ping(&self) -> Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + '_>> {
        Box::pin(async move {
            let outcome = sqlx::query("SELECT 1").execute(&self.pool).await;
            outcome.map_err(classify)?;
            Ok(())
        })
    }

    /// Identifies this backend as [`BackendKind::Sqlite`].
    fn backend_kind(&self) -> BackendKind {
        BackendKind::Sqlite
    }
}