use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use stoolap::{Database, IsolationLevel};
/// Builds a fresh on-disk database for a test and seeds it with `rows`.
///
/// The database file lives under a newly created temporary directory and is
/// named `name`. Table `t (id INTEGER PRIMARY KEY, val TEXT NOT NULL)` is
/// created, every `(id, val)` pair is inserted, and `PRAGMA CHECKPOINT` is
/// issued afterwards (presumably so the seeded rows are served from
/// checkpointed "cold" storage rather than from in-memory state — confirm
/// against the engine docs).
///
/// Returns the temp-dir guard together with the database handle; callers keep
/// the guard bound so the directory outlives the test body.
///
/// # Panics
/// Panics if the temp dir cannot be created or any statement fails.
fn setup_cold_db(name: &str, rows: &[(i64, &str)]) -> (tempfile::TempDir, Database) {
    let dir = tempfile::tempdir().expect("failed to create temp dir");
    let dsn = format!("file://{}/{}", dir.path().display(), name);
    let db = Database::open(&dsn).expect("failed to open database");
    db.execute(
        "CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT NOT NULL)",
        (),
    )
    .expect("failed to create table");
    for &(id, val) in rows {
        // Double any embedded single quote (standard SQL escaping) so a value
        // such as "it's" stays one string literal instead of breaking the
        // statement.
        let quoted = val.replace('\'', "''");
        db.execute(&format!("INSERT INTO t VALUES ({}, '{}')", id, quoted), ())
            .expect("failed to insert row");
    }
    db.execute("PRAGMA CHECKPOINT", ())
        .expect("PRAGMA CHECKPOINT failed");
    (dir, db)
}
/// Runs `sql` through `db` and returns column 0 of the first row as `i64`.
///
/// # Panics
/// Panics (naming the SQL text) if the query fails, yields no row, the first
/// row is an error, or column 0 cannot be read as `i64`. Failing loudly here
/// keeps the real cause visible instead of surfacing later as a confusing
/// "expected N, got -1" assertion.
fn q_i64(db: &Database, sql: &str) -> i64 {
    let mut rows = db.query(sql, ()).expect("query failed");
    let row = rows
        .next()
        .unwrap_or_else(|| panic!("query `{}` returned no rows", sql))
        .unwrap_or_else(|e| panic!("query `{}` failed while reading first row: {:?}", sql, e));
    row.get::<i64>(0)
        .unwrap_or_else(|e| panic!("query `{}`: column 0 is not an i64: {:?}", sql, e))
}
/// Runs `sql` inside `tx` and returns column 0 of the first row as a `String`.
///
/// Returns `None` when there is no first row, when that row is an error, or
/// when column 0 cannot be read as a `String`.
///
/// # Panics
/// Panics if the query itself cannot be executed.
fn tx_q_str(tx: &mut stoolap::api::transaction::Transaction, sql: &str) -> Option<String> {
    let mut cursor = tx.query(sql, ()).expect("tx query failed");
    let first_row = cursor.next()?.ok()?;
    first_row.get::<String>(0).ok()
}
/// Runs `sql` inside `tx` and returns its single `i64` result.
///
/// # Panics
/// Panics with the SQL text and the underlying error if `query_one` fails,
/// rather than returning a `-1` sentinel that would hide the real cause
/// behind a misleading count mismatch.
fn tx_q_i64(tx: &mut stoolap::api::transaction::Transaction, sql: &str) -> i64 {
    tx.query_one::<i64, _>(sql, ())
        .unwrap_or_else(|e| panic!("tx query `{}` failed: {:?}", sql, e))
}
/// Two threads each open a transaction on their own cloned handle and race to
/// UPDATE the same checkpointed row.
///
/// Asserts that both threads report an outcome, that at least one commits,
/// and that row id=1 still exists exactly once afterwards.
#[test]
fn test_cold_update_vs_update_conflict() {
    let (_dir, db) = setup_cold_db("upd_upd", &[(1, "orig")]);
    let success_count = Arc::new(AtomicUsize::new(0));
    let failure_count = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(2));
    let handles: Vec<_> = (0..2)
        .map(|i| {
            let db_clone = db.clone();
            let sc = Arc::clone(&success_count);
            let fc = Arc::clone(&failure_count);
            let bar = Arc::clone(&barrier);
            thread::spawn(move || {
                let new_val = format!("thread{}", i);
                // Reach the barrier before inspecting the BEGIN result:
                // panicking first would leave the other thread blocked in
                // `wait()` forever and turn a failure into a hung test.
                let begun = db_clone.execute("BEGIN", ());
                bar.wait();
                begun.expect("BEGIN failed");
                let result = db_clone.execute(
                    &format!("UPDATE t SET val = '{}' WHERE id = 1", new_val),
                    (),
                );
                // COMMIT is attempted only when the UPDATE itself succeeded.
                let committed = result.is_ok() && db_clone.execute("COMMIT", ()).is_ok();
                if committed {
                    sc.fetch_add(1, Ordering::SeqCst);
                } else {
                    // Best-effort cleanup; the transaction may already be gone.
                    let _ = db_clone.execute("ROLLBACK", ());
                    fc.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().expect("thread panicked");
    }
    let successes = success_count.load(Ordering::SeqCst);
    let failures = failure_count.load(Ordering::SeqCst);
    assert_eq!(
        successes + failures,
        2,
        "expected exactly 2 outcomes, got {} successes + {} failures",
        successes,
        failures
    );
    assert!(
        successes >= 1,
        "at least one thread must succeed, but got {} successes",
        successes
    );
    let count = q_i64(&db, "SELECT COUNT(*) FROM t WHERE id = 1");
    assert_eq!(count, 1, "row id=1 must still exist exactly once");
    db.close().ok();
}
/// Two threads each open a transaction on their own cloned handle and race to
/// DELETE the same checkpointed row.
///
/// Asserts that both threads report an outcome, that at least one commits,
/// that row id=1 is gone, and that the untouched row id=2 survives.
#[test]
fn test_cold_delete_vs_delete_conflict() {
    let (_dir, db) = setup_cold_db("del_del", &[(1, "victim"), (2, "survivor")]);
    let success_count = Arc::new(AtomicUsize::new(0));
    let failure_count = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(2));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let db_clone = db.clone();
            let sc = Arc::clone(&success_count);
            let fc = Arc::clone(&failure_count);
            let bar = Arc::clone(&barrier);
            thread::spawn(move || {
                // Reach the barrier before inspecting the BEGIN result:
                // panicking first would leave the other thread blocked in
                // `wait()` forever and turn a failure into a hung test.
                let begun = db_clone.execute("BEGIN", ());
                bar.wait();
                begun.expect("BEGIN failed");
                let result = db_clone.execute("DELETE FROM t WHERE id = 1", ());
                // COMMIT is attempted only when the DELETE itself succeeded.
                let committed = result.is_ok() && db_clone.execute("COMMIT", ()).is_ok();
                if committed {
                    sc.fetch_add(1, Ordering::SeqCst);
                } else {
                    // Best-effort cleanup; the transaction may already be gone.
                    let _ = db_clone.execute("ROLLBACK", ());
                    fc.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().expect("thread panicked");
    }
    let successes = success_count.load(Ordering::SeqCst);
    let failures = failure_count.load(Ordering::SeqCst);
    assert_eq!(
        successes + failures,
        2,
        "expected 2 outcomes, got {} + {}",
        successes,
        failures
    );
    assert!(
        successes >= 1,
        "at least one delete must succeed, got {} successes",
        successes
    );
    let c1 = q_i64(&db, "SELECT COUNT(*) FROM t WHERE id = 1");
    assert_eq!(c1, 0, "row id=1 should have been deleted");
    let c2 = q_i64(&db, "SELECT COUNT(*) FROM t WHERE id = 2");
    assert_eq!(c2, 1, "row id=2 should survive");
    db.close().ok();
}
/// One thread UPDATEs and another DELETEs the same checkpointed row, each
/// inside its own transaction on a cloned handle.
///
/// Asserts that both threads report an outcome, that at least one commits,
/// and that the final row count for id=1 is either 0 or 1 (either winner is
/// acceptable).
#[test]
fn test_cold_update_vs_delete_conflict() {
    let (_dir, db) = setup_cold_db("upd_del", &[(1, "orig")]);
    let success_count = Arc::new(AtomicUsize::new(0));
    let failure_count = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(2));
    let handles: Vec<_> = (0..2)
        .map(|i| {
            let db_clone = db.clone();
            let sc = Arc::clone(&success_count);
            let fc = Arc::clone(&failure_count);
            let bar = Arc::clone(&barrier);
            thread::spawn(move || {
                // Reach the barrier before inspecting the BEGIN result:
                // panicking first would leave the other thread blocked in
                // `wait()` forever and turn a failure into a hung test.
                let begun = db_clone.execute("BEGIN", ());
                bar.wait();
                begun.expect("BEGIN failed");
                // Thread 0 is the updater, thread 1 the deleter.
                let result = if i == 0 {
                    db_clone.execute("UPDATE t SET val = 'updated' WHERE id = 1", ())
                } else {
                    db_clone.execute("DELETE FROM t WHERE id = 1", ())
                };
                // COMMIT is attempted only when the statement itself succeeded.
                let committed = result.is_ok() && db_clone.execute("COMMIT", ()).is_ok();
                if committed {
                    sc.fetch_add(1, Ordering::SeqCst);
                } else {
                    // Best-effort cleanup; the transaction may already be gone.
                    let _ = db_clone.execute("ROLLBACK", ());
                    fc.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().expect("thread panicked");
    }
    let successes = success_count.load(Ordering::SeqCst);
    let failures = failure_count.load(Ordering::SeqCst);
    assert_eq!(
        successes + failures,
        2,
        "expected 2 outcomes, got {} + {}",
        successes,
        failures
    );
    assert!(
        successes >= 1,
        "at least one operation must succeed, got {} successes",
        successes
    );
    let count = q_i64(&db, "SELECT COUNT(*) FROM t WHERE id = 1");
    assert!(
        count == 0 || count == 1,
        "row count for id=1 must be 0 or 1, got {}",
        count
    );
    db.close().ok();
}
/// A snapshot-isolation transaction opened before an auto-commit DELETE must
/// keep seeing the deleted row, while auto-commit reads see it gone.
#[test]
fn test_snapshot_sees_cold_row_before_delete() {
    let (_dir, db) = setup_cold_db("snap_del", &[(1, "alive"), (2, "also_alive")]);

    // Open the snapshot first, then delete outside of it.
    let mut snapshot = db
        .begin_with_isolation(IsolationLevel::SnapshotIsolation)
        .expect("failed to begin snapshot txn");
    db.execute("DELETE FROM t WHERE id = 1", ())
        .expect("auto-commit DELETE failed");

    // Reads through the snapshot still observe both rows.
    let visible_rows = tx_q_i64(&mut snapshot, "SELECT COUNT(*) FROM t");
    assert_eq!(
        visible_rows, 2,
        "snapshot should see both rows, but got {}",
        visible_rows
    );
    let snapshot_val = tx_q_str(&mut snapshot, "SELECT val FROM t WHERE id = 1");
    assert_eq!(
        snapshot_val.as_deref(),
        Some("alive"),
        "snapshot should see 'alive' for id=1, got {:?}",
        snapshot_val
    );
    snapshot.rollback().expect("rollback failed");

    // Outside the snapshot the delete is visible.
    let remaining = q_i64(&db, "SELECT COUNT(*) FROM t");
    assert_eq!(remaining, 1, "auto-commit should see 1 row after deletion");
    db.close().ok();
}
/// A snapshot-isolation transaction opened before an auto-commit UPDATE must
/// keep reading the old value, while auto-commit reads see the new one.
#[test]
fn test_snapshot_sees_old_cold_value_before_update() {
    let (_dir, db) = setup_cold_db("snap_upd", &[(1, "old_value")]);

    // Open the snapshot first, then update outside of it.
    let mut snapshot = db
        .begin_with_isolation(IsolationLevel::SnapshotIsolation)
        .expect("failed to begin snapshot txn");
    db.execute("UPDATE t SET val = 'new_value' WHERE id = 1", ())
        .expect("auto-commit UPDATE failed");

    let snapshot_val = tx_q_str(&mut snapshot, "SELECT val FROM t WHERE id = 1");
    assert_eq!(
        snapshot_val.as_deref(),
        Some("old_value"),
        "snapshot should see 'old_value', got {:?}",
        snapshot_val
    );
    snapshot.rollback().expect("rollback failed");

    let latest: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("query_one failed");
    assert_eq!(latest, "new_value", "auto-commit should see 'new_value'");
    db.close().ok();
}
/// Two snapshot transactions opened at different points in a sequence of
/// auto-commit changes must each see the table as it was when they began,
/// while auto-commit reads see the latest state.
#[test]
fn test_multiple_snapshots_different_states() {
    let (_dir, db) = setup_cold_db("multi_snap", &[(1, "v1"), (2, "v1"), (3, "v1")]);

    // First snapshot: taken before any modification.
    let mut early = db
        .begin_with_isolation(IsolationLevel::SnapshotIsolation)
        .expect("snap_a begin failed");
    db.execute("DELETE FROM t WHERE id = 3", ())
        .expect("delete id=3 failed");
    db.execute("UPDATE t SET val = 'v2' WHERE id = 1", ())
        .expect("update id=1 failed");

    // Second snapshot: taken after id=3 was deleted and id=1 was updated.
    let mut late = db
        .begin_with_isolation(IsolationLevel::SnapshotIsolation)
        .expect("snap_b begin failed");
    db.execute("DELETE FROM t WHERE id = 2", ())
        .expect("delete id=2 failed");

    // Expectations for the first snapshot.
    let early_rows = tx_q_i64(&mut early, "SELECT COUNT(*) FROM t");
    assert_eq!(early_rows, 3, "snap_a should see 3 rows, got {}", early_rows);
    let early_id1 = tx_q_str(&mut early, "SELECT val FROM t WHERE id = 1");
    assert_eq!(early_id1.as_deref(), Some("v1"), "snap_a id=1 should be 'v1'");
    let early_id3 = tx_q_str(&mut early, "SELECT val FROM t WHERE id = 3");
    assert_eq!(early_id3.as_deref(), Some("v1"), "snap_a id=3 should be 'v1'");

    // Expectations for the second snapshot.
    let late_rows = tx_q_i64(&mut late, "SELECT COUNT(*) FROM t");
    assert_eq!(late_rows, 2, "snap_b should see 2 rows, got {}", late_rows);
    let late_id1 = tx_q_str(&mut late, "SELECT val FROM t WHERE id = 1");
    assert_eq!(late_id1.as_deref(), Some("v2"), "snap_b id=1 should be 'v2'");
    let late_id2 = tx_q_str(&mut late, "SELECT val FROM t WHERE id = 2");
    assert_eq!(late_id2.as_deref(), Some("v1"), "snap_b id=2 should be 'v1'");

    // Expectations for auto-commit reads.
    let current_rows = q_i64(&db, "SELECT COUNT(*) FROM t");
    assert_eq!(
        current_rows, 1,
        "auto-commit should see 1 row, got {}",
        current_rows
    );
    let current_id1: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("query_one failed");
    assert_eq!(current_id1, "v2", "auto-commit id=1 should be 'v2'");

    early.rollback().expect("snap_a rollback failed");
    late.rollback().expect("snap_b rollback failed");
    db.close().ok();
}
/// Auto-commit reads must reflect each preceding auto-commit write: two
/// successive updates and then a delete of the same checkpointed row.
#[test]
fn test_autocommit_always_sees_latest() {
    let (_dir, db) = setup_cold_db("autocommit", &[(1, "orig")]);

    // First update.
    db.execute("UPDATE t SET val = 'step1' WHERE id = 1", ())
        .expect("update step1 failed");
    let after_first: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("query failed");
    assert_eq!(
        after_first, "step1",
        "auto-commit should see 'step1' after update"
    );

    // Second update on top of the first.
    db.execute("UPDATE t SET val = 'step2' WHERE id = 1", ())
        .expect("update step2 failed");
    let after_second: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("query failed");
    assert_eq!(
        after_second, "step2",
        "auto-commit should see 'step2' after second update"
    );

    // Finally remove the row.
    db.execute("DELETE FROM t WHERE id = 1", ())
        .expect("delete failed");
    let remaining = q_i64(&db, "SELECT COUNT(*) FROM t WHERE id = 1");
    assert_eq!(remaining, 0, "auto-commit should see 0 rows after delete");
    db.close().ok();
}
/// An uncommitted DELETE issued through one cloned handle must be visible to
/// that handle, invisible to reads through the original handle, and visible
/// to everyone once committed.
#[test]
fn test_pending_tombstone_invisible_to_others() {
    let (_dir, db) = setup_cold_db("pending_ts", &[(1, "visible"), (2, "also_visible")]);

    // Handle A opens a transaction and deletes id=1 without committing.
    let txn_handle = db.clone();
    txn_handle.execute("BEGIN", ()).expect("BEGIN failed");
    txn_handle
        .execute("DELETE FROM t WHERE id = 1", ())
        .expect("DELETE in txn A failed");

    // Count as seen through handle A; -1 marks "no usable first row".
    let seen_by_a = txn_handle
        .query("SELECT COUNT(*) FROM t", ())
        .expect("query in txn A failed")
        .next()
        .and_then(Result::ok)
        .map_or(-1, |row| row.get::<i64>(0).unwrap_or(-1));
    assert_eq!(
        seen_by_a, 1,
        "txn A should see 1 row (id=2 only), got {}",
        seen_by_a
    );

    // The original handle must not observe the pending delete.
    let seen_by_others = q_i64(&db, "SELECT COUNT(*) FROM t");
    assert_eq!(
        seen_by_others, 2,
        "auto-commit should still see 2 rows while txn A is uncommitted, got {}",
        seen_by_others
    );

    txn_handle.execute("COMMIT", ()).expect("COMMIT failed");
    let seen_after_commit = q_i64(&db, "SELECT COUNT(*) FROM t");
    assert_eq!(
        seen_after_commit, 1,
        "auto-commit should see 1 row after txn A commits, got {}",
        seen_after_commit
    );
    db.close().ok();
}
/// An uncommitted UPDATE issued through one cloned handle must be visible to
/// that handle, invisible to reads through the original handle, and visible
/// to everyone once committed.
#[test]
fn test_pending_update_invisible_to_others() {
    let (_dir, db) = setup_cold_db("pending_upd", &[(1, "original")]);

    // Handle A opens a transaction and updates id=1 without committing.
    let txn_handle = db.clone();
    txn_handle.execute("BEGIN", ()).expect("BEGIN failed");
    txn_handle
        .execute("UPDATE t SET val = 'modified' WHERE id = 1", ())
        .expect("UPDATE in txn A failed");

    let seen_by_a: String = txn_handle
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("query in txn A failed");
    assert_eq!(
        seen_by_a, "modified",
        "txn A should see its own update 'modified', got '{}'",
        seen_by_a
    );

    // The original handle must still read the pre-update value.
    let seen_by_others: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("auto-commit query failed");
    assert_eq!(
        seen_by_others, "original",
        "auto-commit should see 'original' while txn A is uncommitted, got '{}'",
        seen_by_others
    );

    txn_handle.execute("COMMIT", ()).expect("COMMIT failed");
    let seen_after_commit: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("auto-commit query after commit failed");
    assert_eq!(
        seen_after_commit, "modified",
        "auto-commit should see 'modified' after commit, got '{}'",
        seen_after_commit
    );
    db.close().ok();
}
/// A DELETE that is rolled back must leave the checkpointed row intact: the
/// deleting handle sees the row gone while the transaction is open, and the
/// row is readable again after ROLLBACK.
#[test]
fn test_pending_tombstone_rollback_preserves_row() {
    let (_dir, db) = setup_cold_db("pending_rb", &[(1, "keep_me")]);

    let txn_handle = db.clone();
    txn_handle.execute("BEGIN", ()).expect("BEGIN failed");
    txn_handle
        .execute("DELETE FROM t WHERE id = 1", ())
        .expect("DELETE failed");

    // Count as seen through the deleting handle; -1 marks "no usable first row".
    let seen_in_txn = txn_handle
        .query("SELECT COUNT(*) FROM t WHERE id = 1", ())
        .expect("query failed")
        .next()
        .and_then(Result::ok)
        .map_or(-1, |row| row.get::<i64>(0).unwrap_or(-1));
    assert_eq!(seen_in_txn, 0, "txn A should see 0 for deleted row");

    txn_handle.execute("ROLLBACK", ()).expect("ROLLBACK failed");
    let restored: String = db
        .query_one("SELECT val FROM t WHERE id = 1", ())
        .expect("query after rollback failed");
    assert_eq!(
        restored, "keep_me",
        "row should be visible after rollback, got '{}'",
        restored
    );
    db.close().ok();
}
/// Runs an ordered full-table scan on one thread while another thread deletes
/// and updates rows, then checks that the scan result contains no duplicate
/// ids and is strictly ascending.
///
/// The number of rows the scan returns is deliberately not asserted; only
/// uniqueness and ordering are.
#[test]
fn test_scan_consistency_during_concurrent_mods() {
    let seed: Vec<(i64, String)> = (1..=100).map(|i| (i, format!("row_{}", i))).collect();
    let seed_refs: Vec<(i64, &str)> = seed.iter().map(|(id, val)| (*id, val.as_str())).collect();
    let (_dir, db) = setup_cold_db("scan_conc", &seed_refs);
    let initial = q_i64(&db, "SELECT COUNT(*) FROM t");
    assert_eq!(initial, 100, "setup should have 100 rows");

    // Both workers start their work only once the other is ready.
    let start = Arc::new(Barrier::new(2));

    let scan_db = db.clone();
    let scan_start = Arc::clone(&start);
    let scanner = thread::spawn(move || {
        scan_start.wait();
        let cursor = scan_db
            .query("SELECT id, val FROM t ORDER BY id", ())
            .expect("scan query failed");
        let mut seen: Vec<i64> = Vec::new();
        for item in cursor {
            let row = item.expect("row iteration failed");
            let id: i64 = row.get(0).expect("get id failed");
            seen.push(id);
        }
        seen
    });

    let mod_db = db.clone();
    let mod_start = Arc::clone(&start);
    let modifier = thread::spawn(move || {
        mod_start.wait();
        // Statement results are intentionally ignored here.
        for id in [10, 20, 30] {
            let _ = mod_db.execute(&format!("DELETE FROM t WHERE id = {}", id), ());
        }
        for id in [50, 60, 70] {
            let _ = mod_db.execute(
                &format!("UPDATE t SET val = 'changed_{}' WHERE id = {}", id, id),
                (),
            );
        }
    });

    let scanned_ids = scanner.join().expect("scanner thread panicked");
    modifier.join().expect("modifier thread panicked");

    // Duplicate check: a sorted, de-duplicated copy must keep its length.
    let mut unique = scanned_ids.clone();
    unique.sort_unstable();
    unique.dedup();
    assert_eq!(
        unique.len(),
        scanned_ids.len(),
        "scan returned duplicate IDs: {:?}",
        scanned_ids
    );

    // Ordering check: each id must be strictly greater than its predecessor.
    for pair in scanned_ids.windows(2) {
        let (prev, next) = (pair[0], pair[1]);
        assert!(prev < next, "scan result not sorted: {} >= {}", prev, next);
    }
    db.close().ok();
}
/// A snapshot transaction opened before 25 of 50 rows are deleted via
/// auto-commit must still count and scan all 50 rows, while auto-commit reads
/// afterwards see only the 25 survivors.
#[test]
fn test_snapshot_scan_during_concurrent_deletes() {
    let seed: Vec<(i64, String)> = (1..=50).map(|i| (i, format!("val_{}", i))).collect();
    let seed_refs: Vec<(i64, &str)> = seed.iter().map(|(id, val)| (*id, val.as_str())).collect();
    let (_dir, db) = setup_cold_db("snap_scan", &seed_refs);

    let mut snapshot = db
        .begin_with_isolation(IsolationLevel::SnapshotIsolation)
        .expect("snapshot begin failed");

    // Delete the first half of the table outside the snapshot.
    for victim in 1..=25 {
        db.execute(&format!("DELETE FROM t WHERE id = {}", victim), ())
            .expect("concurrent delete failed");
    }

    let snapshot_rows = tx_q_i64(&mut snapshot, "SELECT COUNT(*) FROM t");
    assert_eq!(
        snapshot_rows, 50,
        "snapshot should see all 50 rows despite concurrent deletes, got {}",
        snapshot_rows
    );

    // Full ordered scan through the snapshot.
    let cursor = snapshot
        .query("SELECT id FROM t ORDER BY id", ())
        .expect("snapshot scan query failed");
    let mut ids: Vec<i64> = Vec::new();
    for item in cursor {
        let row = item.expect("row iteration failed");
        let id: i64 = row.get(0).expect("get id failed");
        ids.push(id);
    }
    let scanned = ids.len();
    assert_eq!(
        scanned, 50,
        "snapshot scan should yield 50 rows, got {}",
        scanned
    );
    assert_eq!(ids[0], 1, "first id should be 1");
    assert_eq!(ids[49], 50, "last id should be 50");
    snapshot.rollback().expect("rollback failed");

    let survivors = q_i64(&db, "SELECT COUNT(*) FROM t");
    assert_eq!(
        survivors, 25,
        "auto-commit should see 25 rows after deletes, got {}",
        survivors
    );
    db.close().ok();
}
/// Ten threads, each in its own transaction on a cloned handle, update ten
/// different checkpointed rows at the same time.
///
/// Every UPDATE and COMMIT is required to succeed, and each row must end up
/// holding the value written by its thread.
#[test]
fn test_concurrent_updates_different_cold_rows() {
    let rows: Vec<(i64, String)> = (1..=10).map(|i| (i, format!("orig_{}", i))).collect();
    let row_refs: Vec<(i64, &str)> = rows.iter().map(|(id, val)| (*id, val.as_str())).collect();
    let (_dir, db) = setup_cold_db("diff_rows", &row_refs);
    let barrier = Arc::new(Barrier::new(10));
    let success_count = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (1..=10)
        .map(|id| {
            let db_clone = db.clone();
            let bar = Arc::clone(&barrier);
            let sc = Arc::clone(&success_count);
            thread::spawn(move || {
                // Reach the barrier before inspecting the BEGIN result:
                // panicking first would leave the other nine threads blocked
                // in `wait()` forever and turn a failure into a hung test.
                let begun = db_clone.execute("BEGIN", ());
                bar.wait();
                begun.expect("BEGIN failed");
                db_clone
                    .execute(
                        &format!("UPDATE t SET val = 'updated_{}' WHERE id = {}", id, id),
                        (),
                    )
                    .expect("UPDATE should succeed for different rows");
                db_clone
                    .execute("COMMIT", ())
                    .expect("COMMIT should succeed");
                sc.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();
    for h in handles {
        h.join().expect("thread panicked");
    }
    assert_eq!(
        success_count.load(Ordering::SeqCst),
        10,
        "all 10 updates on different rows should succeed"
    );
    // Each row must carry the value written by its own thread.
    for id in 1..=10 {
        let val: String = db
            .query_one(&format!("SELECT val FROM t WHERE id = {}", id), ())
            .expect("query_one failed");
        assert_eq!(
            val,
            format!("updated_{}", id),
            "row {} should be 'updated_{}', got '{}'",
            id,
            id,
            val
        );
    }
    db.close().ok();
}