Skip to main content

concurrent_writers/
concurrent_writers.rs

1//! End-to-end `BEGIN CONCURRENT` demo with two sibling handles.
2//!
3//! Run with: `cargo run --example concurrent_writers`
4//!
5//! Phase 11 (SQLR-22) opt-in MVCC. The example:
6//!
7//! 1. Opens a connection, opts the database into `journal_mode = mvcc`.
8//! 2. Mints a sibling handle via `Connection::connect` so two writers
9//!    share the same backing database.
10//! 3. Runs two concurrent transactions:
11//!       - A and B touch *disjoint* rows → both commit.
12//!       - A and B touch the *same* row → the second commit fails
13//!         with `SQLRiteError::Busy`; the retry takes a fresh
14//!         `begin_ts`, observes the post-commit state, and lands.
15//!
16//! The retry loop is the canonical shape every SDK reuses; see
17//! [`docs/concurrent-writes.md`](../../docs/concurrent-writes.md).
18
19use sqlrite::{Connection, Result};
20
21fn main() -> Result<()> {
22    let mut a = Connection::open_in_memory()?;
23    a.execute("PRAGMA journal_mode = mvcc")?;
24    a.execute(
25        "CREATE TABLE accounts (
26             id      INTEGER PRIMARY KEY,
27             holder  TEXT NOT NULL,
28             balance INTEGER NOT NULL
29         )",
30    )?;
31    a.execute("INSERT INTO accounts (id, holder, balance) VALUES (1, 'alice', 100)")?;
32    a.execute("INSERT INTO accounts (id, holder, balance) VALUES (2, 'bob',   100)")?;
33
34    // Sibling handle on the same Arc<Mutex<Database>>. In real apps
35    // you'd hand this to a worker thread; we keep it on the main
36    // thread to keep the demo readable.
37    let mut b = a.connect();
38
39    println!("=== Disjoint-row commits both succeed ===");
40    a.execute("BEGIN CONCURRENT")?;
41    b.execute("BEGIN CONCURRENT")?;
42    a.execute("UPDATE accounts SET balance = balance + 10 WHERE id = 1")?;
43    b.execute("UPDATE accounts SET balance = balance + 20 WHERE id = 2")?;
44    a.execute("COMMIT")?;
45    b.execute("COMMIT")?; // write-sets don't intersect — no conflict.
46    print_balances(&mut a)?;
47
48    println!("\n=== Same-row commits: A wins, B retries ===");
49    // Interleave BEGINs so A.begin_ts < B.begin_ts and both see the
50    // same pre-update value.
51    a.execute("BEGIN CONCURRENT")?;
52    b.execute("BEGIN CONCURRENT")?;
53    a.execute("UPDATE accounts SET balance = balance + 5 WHERE id = 1")?;
54    b.execute("UPDATE accounts SET balance = balance + 50 WHERE id = 1")?;
55    a.execute("COMMIT")?;
56    // B's commit sees a version newer than its own `begin_ts` → Busy.
57    // The transaction is already dropped on the failed COMMIT;
58    // there's no ROLLBACK to run. Start a fresh BEGIN CONCURRENT.
59    match b.execute("COMMIT") {
60        Err(e) if e.is_retryable() => {
61            eprintln!("  B lost the race: {e}");
62            b.execute("BEGIN CONCURRENT")?;
63            b.execute("UPDATE accounts SET balance = balance + 50 WHERE id = 1")?;
64            b.execute("COMMIT")?;
65        }
66        other => {
67            other?;
68        }
69    }
70    print_balances(&mut a)?;
71
72    Ok(())
73}
74
75fn print_balances(conn: &mut Connection) -> Result<()> {
76    let stmt = conn.prepare("SELECT id, holder, balance FROM accounts ORDER BY id")?;
77    let mut rows = stmt.query()?;
78    while let Some(row) = rows.next()? {
79        let id: i64 = row.get_by_name("id")?;
80        let holder: String = row.get_by_name("holder")?;
81        let balance: i64 = row.get_by_name("balance")?;
82        println!("  account {id} ({holder}): {balance}");
83    }
84    Ok(())
85}