Skip to main content

sqlrite/mvcc/
transaction.rs

1//! [`ConcurrentTx`] — per-`Connection` `BEGIN CONCURRENT`
2//! transaction state (Phase 11.4).
3//!
4//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
5//!
6//! > `BEGIN CONCURRENT` doesn't acquire any locks; writes go to the
7//! > version chain tagged with the transaction id; reads use
8//! > snapshot-isolation visibility.
9//!
10//! ## How this slice does it
11//!
12//! Each `Connection` owns at most one [`ConcurrentTx`] at a time.
13//! When the user issues `BEGIN CONCURRENT`, the connection deep-
14//! clones the database's `tables` map into `ConcurrentTx::tables`
15//! and stores a [`TxHandle`] (which advances the
16//! [`MvccClock`] to allocate a `begin_ts`). Subsequent `INSERT` /
17//! `UPDATE` / `DELETE` statements run against the cloned `tables`
18//! (the executor thinks it's writing to the live database —
19//! `Connection` swaps the cloned tables in just for the duration
20//! of each statement). The live `Database::tables` stays
21//! unchanged until commit.
22//!
23//! At `COMMIT`:
24//!
25//! 1. Diff `tx.tables_at_begin` (the immutable BEGIN-time clone)
26//!    vs `tx.tables` (post-write) to derive a write-set: every
27//!    `(RowID, payload)` the transaction changed.
28//! 2. For each row in the write-set, walk the
29//!    [`super::MvStore`] chain. If any committed version's
30//!    `begin > tx.begin_ts`, ABORT with
31//!    [`crate::error::SQLRiteError::Busy`] — some other
32//!    transaction touched the row after our snapshot.
33//! 3. On success, allocate a `commit_ts`, push each write into
34//!    the `MvStore` as a committed version (caps the previous
35//!    latest version's `end` at `commit_ts`), apply the writes to
36//!    `db.tables`, and run the legacy `save_database` so changes
37//!    persist via the existing WAL.
38//!
39//! `ROLLBACK` just drops the `ConcurrentTx` — the cloned tables
40//! are released, the `TxHandle` drops (unregistering the
41//! transaction from `ActiveTxRegistry`), and `db.tables` is
42//! unchanged because we never touched it.
43//!
44//! ## What this slice doesn't do (yet)
45//!
46//! - **Snapshot-isolated reads inside the transaction.** Reads
47//!   inside `BEGIN CONCURRENT` see the cloned-at-BEGIN state of
48//!   the tables (because the executor is dispatched against
49//!   `tx.tables`), but they don't consult `MvStore` to filter by
50//!   `begin_ts`. Concurrent writes from outside the tx land on
51//!   `db.tables`, not on our snapshot — so we don't see them
52//!   inside the tx. That's *partial* snapshot isolation: it
53//!   isolates correctly under the current "lock the database
54//!   per statement" mutex, but doesn't survive once the engine
55//!   genuinely supports overlapping in-flight transactions
56//!   reading concurrently.
57//! - **DDL inside `BEGIN CONCURRENT`.** v0 rejects with a typed
58//!   error before the swap, mirroring the plan's stated
59//!   non-goal.
60//! - **`AUTOINCREMENT`.** Same — rejected with a typed error.
61//! - **Persistence of the in-flight write-set across crashes.**
62//!   The write-set lives entirely in memory until commit. A
63//!   crash mid-transaction loses everything — that's correct
64//!   (the transaction never committed), and the legacy WAL
65//!   still owns durability of `Database::tables` for committed
66//!   data. Phase 11.5 adds the MVCC log-record frame format
67//!   that lets writes start landing in the WAL pre-commit.
68
69use std::collections::HashMap;
70
71use crate::sql::db::table::Table;
72
73use super::{ActiveTxRegistry, MvccClock, TxHandle};
74
75/// Per-`Connection` snapshot of `BEGIN CONCURRENT` state.
76///
77/// Lives on [`Connection`](crate::Connection), not on
78/// [`Database`](crate::Database) — multiple sibling connections
79/// each carry their own concurrent transaction without stepping
80/// on each other's snapshots.
81#[derive(Debug)]
82pub struct ConcurrentTx {
83    /// RAII handle into the `ActiveTxRegistry`. Drops when this
84    /// struct drops (commit, rollback, or `Connection` close),
85    /// at which point the transaction is unregistered.
86    pub handle: TxHandle,
87
88    /// Working snapshot of `Database::tables` taken at `BEGIN
89    /// CONCURRENT` via `Table::deep_clone`. Each statement's
90    /// executor pass transparently swaps this in for `db.tables`
91    /// so writes land here, not on the live database.
92    pub tables: HashMap<String, Table>,
93
94    /// Immutable second clone of `Database::tables` taken at
95    /// `BEGIN`. Diffing `tables` against **this** at commit
96    /// produces the write-set. We can't diff against the live
97    /// `Database::tables` directly because between our `BEGIN`
98    /// and our `COMMIT`, *other* concurrent transactions may
99    /// have committed — their writes show up as differences
100    /// against the live state but aren't ours, and treating
101    /// them as our DELETEs would silently undo someone else's
102    /// commit. The doubled memory cost (two full clones per
103    /// transaction) is the price for that correctness in v0;
104    /// the obvious follow-up is a per-touched-row begin-state
105    /// map that captures only the rows we actually read or
106    /// wrote.
107    pub tables_at_begin: HashMap<String, Table>,
108
109    /// Sorted table-name fingerprint of `Database::tables` at
110    /// `BEGIN`. Used at commit to detect that DDL ran on the live
111    /// database under us — v0 rejects DDL inside the tx, but
112    /// nothing prevents another connection from running it
113    /// outside.
114    pub schema_at_begin: Vec<String>,
115}
116
117impl ConcurrentTx {
118    /// Allocates a new transaction. Advances the clock by one
119    /// (the `TxHandle::begin_ts`), records the table-name
120    /// fingerprint, and deep-clones every table.
121    ///
122    /// Caller is expected to have already verified
123    /// `journal_mode == Mvcc` and that no transaction is open.
124    pub fn begin(
125        clock: &MvccClock,
126        registry: &ActiveTxRegistry,
127        live_tables: &HashMap<String, Table>,
128    ) -> Self {
129        let handle = registry.register(clock);
130        let tables: HashMap<String, Table> = live_tables
131            .iter()
132            .map(|(k, v)| (k.clone(), v.deep_clone()))
133            .collect();
134        let tables_at_begin: HashMap<String, Table> = live_tables
135            .iter()
136            .map(|(k, v)| (k.clone(), v.deep_clone()))
137            .collect();
138        let mut schema_at_begin: Vec<String> = live_tables.keys().cloned().collect();
139        schema_at_begin.sort();
140        Self {
141            handle,
142            tables,
143            tables_at_begin,
144            schema_at_begin,
145        }
146    }
147
148    /// Convenience — the `begin_ts` snapshot timestamp this
149    /// transaction took at BEGIN. Used at commit to validate
150    /// against `MvStore` versions that committed after this
151    /// snapshot.
152    pub fn begin_ts(&self) -> u64 {
153        self.handle.begin_ts()
154    }
155
156    /// True if `live_tables` has the same table-name set this
157    /// transaction recorded at BEGIN. Used at commit to surface a
158    /// typed error rather than silently committing onto a
159    /// schema that drifted under us.
160    pub fn schema_unchanged(&self, live_tables: &HashMap<String, Table>) -> bool {
161        let mut current: Vec<&String> = live_tables.keys().collect();
162        current.sort();
163        if current.len() != self.schema_at_begin.len() {
164            return false;
165        }
166        current
167            .iter()
168            .zip(self.schema_at_begin.iter())
169            .all(|(a, b)| **a == *b)
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::db::table::Table;
    use crate::sql::parser::create::CreateQuery;
    use std::collections::HashMap;

    /// Builds an empty two-column table called `name`.
    ///
    /// The table is materialised through the CREATE pipeline
    /// (parse → `CreateQuery` → `Table::new`), mirroring the existing
    /// test helpers, rather than by poking the struct directly.
    fn empty_table(name: &str) -> Table {
        use crate::sql::dialect::SqlriteDialect;
        use sqlparser::parser::Parser;

        let sql = format!("CREATE TABLE {name} (id INTEGER PRIMARY KEY, v TEXT);");
        let dialect = SqlriteDialect::new();
        let mut ast = Parser::parse_sql(&dialect, &sql).expect("CREATE TABLE should parse");
        let stmt = ast.pop().expect("parser should yield one statement");
        let q = CreateQuery::new(&stmt).expect("CREATE TABLE should convert to a CreateQuery");
        Table::new(q)
    }

    /// A live-table map containing a single empty table called `name`.
    fn live_with_one_table(name: &str) -> HashMap<String, Table> {
        let mut m = HashMap::new();
        m.insert(name.to_string(), empty_table(name));
        m
    }

    #[test]
    fn begin_clones_tables_and_advances_clock() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");

        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // Clock advanced by one (begin_ts).
        assert_eq!(clock.now(), 1);
        assert_eq!(tx.begin_ts(), 1);
        // Every table cloned into the working copy...
        assert_eq!(tx.tables.len(), 1);
        assert!(tx.tables.contains_key("t"));
        // ...and into the commit-time baseline.
        assert_eq!(tx.tables_at_begin.len(), 1);
        assert!(tx.tables_at_begin.contains_key("t"));
        // Schema fingerprint matches.
        assert_eq!(tx.schema_at_begin, vec!["t".to_string()]);
        // Registered with the registry.
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn dropping_tx_unregisters() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        assert_eq!(registry.active_count(), 1);
        drop(tx);
        assert_eq!(registry.active_count(), 0);
    }

    /// The snapshot maps are independent of the live map — adding
    /// a table to the live map after `begin` shows up in neither
    /// `tx.tables` nor `tx.tables_at_begin`. The contract every
    /// COMMIT-time diff relies on.
    #[test]
    fn clone_is_independent_of_live_tables() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let mut live = live_with_one_table("t");

        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // Add a new table to live — both snapshots must be unchanged.
        live.insert("u".to_string(), empty_table("u"));
        assert_eq!(tx.tables.len(), 1);
        assert!(tx.tables.contains_key("t"));
        assert!(!tx.tables.contains_key("u"));
        assert_eq!(tx.tables_at_begin.len(), 1);
        assert!(tx.tables_at_begin.contains_key("t"));
        assert!(!tx.tables_at_begin.contains_key("u"));
        // schema_unchanged catches the drift.
        assert!(!tx.schema_unchanged(&live));
    }

    #[test]
    fn schema_unchanged_recognises_identical_set() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");

        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // No drift — same single table.
        assert!(tx.schema_unchanged(&live));
    }

    /// Same number of tables but a different name must count as
    /// drift: the length check alone is not enough.
    #[test]
    fn schema_unchanged_detects_same_size_rename() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");

        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        let renamed = live_with_one_table("u");
        assert!(!tx.schema_unchanged(&renamed));
    }

    /// A table disappearing from the live set must count as drift.
    #[test]
    fn schema_unchanged_detects_dropped_table() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");

        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        let emptied: HashMap<String, Table> = HashMap::new();
        assert!(!tx.schema_unchanged(&emptied));
    }
}