sqlrite/mvcc/transaction.rs
1//! [`ConcurrentTx`] — per-`Connection` `BEGIN CONCURRENT`
2//! transaction state (Phase 11.4).
3//!
4//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
5//!
6//! > `BEGIN CONCURRENT` doesn't acquire any locks; writes go to the
7//! > version chain tagged with the transaction id; reads use
8//! > snapshot-isolation visibility.
9//!
10//! ## How this slice does it
11//!
//! Each `Connection` owns at most one [`ConcurrentTx`] at a time.
//! When the user issues `BEGIN CONCURRENT`, the connection deep-
//! clones the database's `tables` map twice — once into the
//! working copy `ConcurrentTx::tables` and once into the
//! BEGIN-time baseline `ConcurrentTx::tables_at_begin` — and
//! stores a [`TxHandle`] (which advances the [`MvccClock`] to
//! allocate a `begin_ts`). Subsequent `INSERT` / `UPDATE` /
//! `DELETE` statements run against the working copy (the
//! executor thinks it's writing to the live database —
//! `Connection` swaps the cloned tables in just for the duration
//! of each statement). The live `Database::tables` stays
//! unchanged until commit.
22//!
23//! At `COMMIT`:
24//!
25//! 1. Diff `tx.tables_at_begin` (the immutable BEGIN-time clone)
26//! vs `tx.tables` (post-write) to derive a write-set: every
27//! `(RowID, payload)` the transaction changed.
28//! 2. For each row in the write-set, walk the
29//! [`super::MvStore`] chain. If any committed version's
30//! `begin > tx.begin_ts`, ABORT with
31//! [`crate::error::SQLRiteError::Busy`] — some other
32//! transaction touched the row after our snapshot.
33//! 3. On success, allocate a `commit_ts`, push each write into
34//! the `MvStore` as a committed version (caps the previous
35//! latest version's `end` at `commit_ts`), apply the writes to
36//! `db.tables`, and run the legacy `save_database` so changes
37//! persist via the existing WAL.
38//!
39//! `ROLLBACK` just drops the `ConcurrentTx` — the cloned tables
40//! are released, the `TxHandle` drops (unregistering the
41//! transaction from `ActiveTxRegistry`), and `db.tables` is
42//! unchanged because we never touched it.
43//!
44//! ## What this slice doesn't do (yet)
45//!
46//! - **Snapshot-isolated reads inside the transaction.** Reads
47//! inside `BEGIN CONCURRENT` see the cloned-at-BEGIN state of
48//! the tables (because the executor is dispatched against
49//! `tx.tables`), but they don't consult `MvStore` to filter by
50//! `begin_ts`. Concurrent writes from outside the tx land on
51//! `db.tables`, not on our snapshot — so we don't see them
52//! inside the tx. That's *partial* snapshot isolation: it
53//! isolates correctly under the current "lock the database
54//! per statement" mutex, but doesn't survive once the engine
55//! genuinely supports overlapping in-flight transactions
56//! reading concurrently.
57//! - **DDL inside `BEGIN CONCURRENT`.** v0 rejects with a typed
58//! error before the swap, mirroring the plan's stated
59//! non-goal.
60//! - **`AUTOINCREMENT`.** Same — rejected with a typed error.
61//! - **Persistence of the in-flight write-set across crashes.**
62//! The write-set lives entirely in memory until commit. A
63//! crash mid-transaction loses everything — that's correct
64//! (the transaction never committed), and the legacy WAL
65//! still owns durability of `Database::tables` for committed
66//! data. Phase 11.5 adds the MVCC log-record frame format
67//! that lets writes start landing in the WAL pre-commit.
68
69use std::collections::HashMap;
70
71use crate::sql::db::table::Table;
72
73use super::{ActiveTxRegistry, MvccClock, TxHandle};
74
75/// Per-`Connection` snapshot of `BEGIN CONCURRENT` state.
76///
77/// Lives on [`Connection`](crate::Connection), not on
78/// [`Database`](crate::Database) — multiple sibling connections
79/// each carry their own concurrent transaction without stepping
80/// on each other's snapshots.
81#[derive(Debug)]
82pub struct ConcurrentTx {
83 /// RAII handle into the `ActiveTxRegistry`. Drops when this
84 /// struct drops (commit, rollback, or `Connection` close),
85 /// at which point the transaction is unregistered.
86 pub handle: TxHandle,
87
88 /// Working snapshot of `Database::tables` taken at `BEGIN
89 /// CONCURRENT` via `Table::deep_clone`. Each statement's
90 /// executor pass transparently swaps this in for `db.tables`
91 /// so writes land here, not on the live database.
92 pub tables: HashMap<String, Table>,
93
94 /// Immutable second clone of `Database::tables` taken at
95 /// `BEGIN`. Diffing `tables` against **this** at commit
96 /// produces the write-set. We can't diff against the live
97 /// `Database::tables` directly because between our `BEGIN`
98 /// and our `COMMIT`, *other* concurrent transactions may
99 /// have committed — their writes show up as differences
100 /// against the live state but aren't ours, and treating
101 /// them as our DELETEs would silently undo someone else's
102 /// commit. The doubled memory cost (two full clones per
103 /// transaction) is the price for that correctness in v0;
104 /// the obvious follow-up is a per-touched-row begin-state
105 /// map that captures only the rows we actually read or
106 /// wrote.
107 pub tables_at_begin: HashMap<String, Table>,
108
109 /// Sorted table-name fingerprint of `Database::tables` at
110 /// `BEGIN`. Used at commit to detect that DDL ran on the live
111 /// database under us — v0 rejects DDL inside the tx, but
112 /// nothing prevents another connection from running it
113 /// outside.
114 pub schema_at_begin: Vec<String>,
115}
116
117impl ConcurrentTx {
118 /// Allocates a new transaction. Advances the clock by one
119 /// (the `TxHandle::begin_ts`), records the table-name
120 /// fingerprint, and deep-clones every table.
121 ///
122 /// Caller is expected to have already verified
123 /// `journal_mode == Mvcc` and that no transaction is open.
124 pub fn begin(
125 clock: &MvccClock,
126 registry: &ActiveTxRegistry,
127 live_tables: &HashMap<String, Table>,
128 ) -> Self {
129 let handle = registry.register(clock);
130 let tables: HashMap<String, Table> = live_tables
131 .iter()
132 .map(|(k, v)| (k.clone(), v.deep_clone()))
133 .collect();
134 let tables_at_begin: HashMap<String, Table> = live_tables
135 .iter()
136 .map(|(k, v)| (k.clone(), v.deep_clone()))
137 .collect();
138 let mut schema_at_begin: Vec<String> = live_tables.keys().cloned().collect();
139 schema_at_begin.sort();
140 Self {
141 handle,
142 tables,
143 tables_at_begin,
144 schema_at_begin,
145 }
146 }
147
148 /// Convenience — the `begin_ts` snapshot timestamp this
149 /// transaction took at BEGIN. Used at commit to validate
150 /// against `MvStore` versions that committed after this
151 /// snapshot.
152 pub fn begin_ts(&self) -> u64 {
153 self.handle.begin_ts()
154 }
155
156 /// True if `live_tables` has the same table-name set this
157 /// transaction recorded at BEGIN. Used at commit to surface a
158 /// typed error rather than silently committing onto a
159 /// schema that drifted under us.
160 pub fn schema_unchanged(&self, live_tables: &HashMap<String, Table>) -> bool {
161 let mut current: Vec<&String> = live_tables.keys().collect();
162 current.sort();
163 if current.len() != self.schema_at_begin.len() {
164 return false;
165 }
166 current
167 .iter()
168 .zip(self.schema_at_begin.iter())
169 .all(|(a, b)| **a == *b)
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use crate::sql::db::table::Table;
177 use crate::sql::parser::create::CreateQuery;
178 use std::collections::HashMap;
179
180 fn empty_table(name: &str) -> Table {
181 let _ = name;
182 // Build a minimal create-table to materialise a Table —
183 // mirror the existing test helpers that construct via the
184 // CREATE pipeline rather than poking the struct directly.
185 use crate::sql::dialect::SqlriteDialect;
186 use sqlparser::parser::Parser;
187 let sql = format!(
188 "CREATE TABLE {name} (id INTEGER PRIMARY KEY, v TEXT);",
189 name = name,
190 );
191 let dialect = SqlriteDialect::new();
192 let mut ast = Parser::parse_sql(&dialect, &sql).unwrap();
193 let stmt = ast.pop().unwrap();
194 let q = CreateQuery::new(&stmt).unwrap();
195 Table::new(q)
196 }
197
198 fn live_with_one_table(name: &str) -> HashMap<String, Table> {
199 let mut m = HashMap::new();
200 m.insert(name.to_string(), empty_table(name));
201 m
202 }
203
204 #[test]
205 fn begin_clones_tables_and_advances_clock() {
206 let clock = MvccClock::new(0);
207 let registry = ActiveTxRegistry::new();
208 let live = live_with_one_table("t");
209
210 let tx = ConcurrentTx::begin(&clock, ®istry, &live);
211 // Clock advanced by one (begin_ts).
212 assert_eq!(clock.now(), 1);
213 assert_eq!(tx.begin_ts(), 1);
214 // Every table cloned.
215 assert!(tx.tables.contains_key("t"));
216 // Schema fingerprint matches.
217 assert_eq!(tx.schema_at_begin, vec!["t".to_string()]);
218 // Registered with the registry.
219 assert_eq!(registry.active_count(), 1);
220 }
221
222 #[test]
223 fn dropping_tx_unregisters() {
224 let clock = MvccClock::new(0);
225 let registry = ActiveTxRegistry::new();
226 let live = live_with_one_table("t");
227 let tx = ConcurrentTx::begin(&clock, ®istry, &live);
228 assert_eq!(registry.active_count(), 1);
229 drop(tx);
230 assert_eq!(registry.active_count(), 0);
231 }
232
233 /// Clones really are deep — mutating the live map after
234 /// `begin` doesn't show up in `tx.tables`. The contract every
235 /// COMMIT-time diff relies on.
236 #[test]
237 fn clone_is_independent_of_live_tables() {
238 let clock = MvccClock::new(0);
239 let registry = ActiveTxRegistry::new();
240 let mut live = live_with_one_table("t");
241
242 let tx = ConcurrentTx::begin(&clock, ®istry, &live);
243 // Add a new table to live — tx's snapshot must be unchanged.
244 live.insert("u".to_string(), empty_table("u"));
245 assert_eq!(tx.tables.len(), 1);
246 assert!(tx.tables.contains_key("t"));
247 assert!(!tx.tables.contains_key("u"));
248 // schema_unchanged catches the drift.
249 assert!(!tx.schema_unchanged(&live));
250 }
251
252 #[test]
253 fn schema_unchanged_recognises_identical_set() {
254 let clock = MvccClock::new(0);
255 let registry = ActiveTxRegistry::new();
256 let live = live_with_one_table("t");
257
258 let tx = ConcurrentTx::begin(&clock, ®istry, &live);
259 // No drift — same single table.
260 assert!(tx.schema_unchanged(&live));
261 }
262}