use noxu_db::{
DatabaseConfig, DatabaseEntry, EnvironmentConfig, OperationStatus,
TransactionConfig,
};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
/// Builds the fixture shared by most tests: a transactional environment
/// rooted in a fresh temporary directory, plus a database named `"test"`
/// opened with `allow_create`.
///
/// The `TempDir` handle is returned alongside the environment and database
/// so the caller decides how long the directory lives.
fn setup() -> (TempDir, noxu_db::Environment, noxu_db::Database) {
    let tmp = TempDir::new().unwrap();
    let env = noxu_db::Environment::open(
        EnvironmentConfig::new(tmp.path().to_path_buf())
            .with_allow_create(true)
            .with_transactional(true),
    )
    .unwrap();
    let db = env
        .open_database(
            None,
            "test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap();
    (tmp, env, db)
}
/// Stores `key` → `val` in `db` inside a dedicated transaction and commits
/// that transaction before returning.
///
/// Panics if beginning the transaction, the put, or the commit fails.
fn put_committed(
    env: &noxu_db::Environment,
    db: &noxu_db::Database,
    key: &[u8],
    val: &[u8],
) {
    let txn = env.begin_transaction(None).unwrap();
    db.put(
        Some(&txn),
        &DatabaseEntry::from_bytes(key),
        &DatabaseEntry::from_bytes(val),
    )
    .unwrap();
    txn.commit().unwrap();
}
/// Looks up `key` in `db` (optionally under `txn`), writing the value into
/// `buf`, and returns the resulting status.
///
/// Panics if the underlying `get` call returns an error.
fn get_val(
    db: &noxu_db::Database,
    txn: Option<&noxu_db::Transaction>,
    key: &[u8],
    buf: &mut DatabaseEntry,
) -> OperationStatus {
    let lookup_key = DatabaseEntry::from_bytes(key);
    let outcome = db.get(txn, &lookup_key, buf);
    outcome.unwrap()
}
/// Checks that a read-committed reader cannot observe a value written by a
/// transaction that has not committed yet, and that the value becomes
/// visible once the writer commits.
///
/// A writer thread puts `"v2"` over the committed `"v1"` and then parks on
/// two barriers, so the main thread can attempt its read while the write is
/// still uncommitted.
#[test]
fn test_dirty_read_prevented_under_all_isolation_levels() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"key", b"v1");
    let env = Arc::new(env);
    let db = Arc::new(db);
    let barrier_write_done = Arc::new(Barrier::new(2));
    let barrier_reader_done = Arc::new(Barrier::new(2));

    let env_w = Arc::clone(&env);
    let db_w = Arc::clone(&db);
    let bwd = Arc::clone(&barrier_write_done);
    let brd = Arc::clone(&barrier_reader_done);
    let writer = thread::spawn(move || {
        let txn = env_w.begin_transaction(None).unwrap();
        let k = DatabaseEntry::from_bytes(b"key");
        let v = DatabaseEntry::from_bytes(b"v2");
        db_w.put(Some(&txn), &k, &v).unwrap();
        // Signal that the uncommitted write is in place ...
        bwd.wait();
        // ... and hold the transaction open until the reader has tried to read.
        brd.wait();
        txn.commit().unwrap();
    });

    barrier_write_done.wait();
    let rc_config = TransactionConfig::read_committed();
    let reader_txn = env.begin_transaction(Some(&rc_config)).unwrap();
    let key = DatabaseEntry::from_bytes(b"key");
    let mut out = DatabaseEntry::new();
    // The read is allowed to fail (the result is deliberately not unwrapped);
    // only a successful read is inspected below.
    let status = db.get(Some(&reader_txn), &key, &mut out);

    // Release the writer and join it BEFORE asserting, so a failed assertion
    // cannot leave the writer thread parked on the barrier.
    barrier_reader_done.wait();
    writer.join().unwrap();
    drop(reader_txn);

    if matches!(status, Ok(OperationStatus::Success)) {
        assert_eq!(
            out.data(),
            b"v1",
            "dirty read: reader observed a value other than the committed one \
             while the writer was still uncommitted"
        );
    }

    let txn2 = env.begin_transaction(Some(&rc_config)).unwrap();
    let mut out2 = DatabaseEntry::new();
    assert_eq!(
        get_val(&db, Some(&txn2), b"key", &mut out2),
        OperationStatus::Success
    );
    assert_eq!(
        out2.data(),
        b"v2",
        "committed write must be visible after commit"
    );
    txn2.commit().unwrap();
}
/// While a transaction begun with the default configuration (treated by this
/// test as the serializable reader) has read a key and is still open, a
/// `no_wait` writer to that key must fail; once the reader commits, a fresh
/// `no_wait` writer must succeed.
#[test]
fn test_serializable_read_lock_blocks_writer_no_wait() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"k", b"v1");

    // Reader reads "k" and stays open.
    let reader = env.begin_transaction(None).unwrap();
    let mut seen = DatabaseEntry::new();
    let read_status = get_val(&db, Some(&reader), b"k", &mut seen);
    assert_eq!(read_status, OperationStatus::Success);
    assert_eq!(seen.data(), b"v1");

    // First writer attempt: expected to be rejected immediately.
    let no_wait = TransactionConfig::new().with_no_wait(true);
    let blocked_writer = env.begin_transaction(Some(&no_wait)).unwrap();
    let blocked_key = DatabaseEntry::from_bytes(b"k");
    let blocked_val = DatabaseEntry::from_bytes(b"v2");
    let blocked = db.put(Some(&blocked_writer), &blocked_key, &blocked_val);
    assert!(
        blocked.is_err(),
        "no_wait writer should fail while serializable reader holds read lock"
    );
    drop(blocked_writer);
    reader.commit().unwrap();

    // Second writer attempt, after the reader has committed.
    let writer = env.begin_transaction(Some(&no_wait)).unwrap();
    let retry_key = DatabaseEntry::from_bytes(b"k");
    let retry_val = DatabaseEntry::from_bytes(b"v2");
    let retry_status = db.put(Some(&writer), &retry_key, &retry_val).unwrap();
    assert_eq!(
        retry_status,
        OperationStatus::Success,
        "write must succeed after serializable reader commits"
    );
    writer.commit().unwrap();
}
/// A read-committed reader that has read a key and is still open must not
/// prevent a `no_wait` writer from updating that key; the new value is then
/// readable outside any transaction.
#[test]
fn test_read_committed_releases_lock_allowing_concurrent_writer() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"k", b"v1");

    let read_committed = TransactionConfig::read_committed();
    let reader = env.begin_transaction(Some(&read_committed)).unwrap();
    let mut first_read = DatabaseEntry::new();
    let read_status = get_val(&db, Some(&reader), b"k", &mut first_read);
    assert_eq!(read_status, OperationStatus::Success);
    assert_eq!(first_read.data(), b"v1");

    // The writer runs while the reader transaction is still open.
    let no_wait = TransactionConfig::new().with_no_wait(true);
    let writer = env.begin_transaction(Some(&no_wait)).unwrap();
    let key = DatabaseEntry::from_bytes(b"k");
    let new_val = DatabaseEntry::from_bytes(b"v2");
    let write_status = db.put(Some(&writer), &key, &new_val).unwrap();
    assert_eq!(
        write_status,
        OperationStatus::Success,
        "no_wait writer must succeed because read-committed released the read lock"
    );
    writer.commit().unwrap();
    reader.commit().unwrap();

    let mut final_read = DatabaseEntry::new();
    let final_status = get_val(&db, None, b"k", &mut final_read);
    assert_eq!(final_status, OperationStatus::Success);
    assert_eq!(final_read.data(), b"v2");
}
/// Two writers on the same key: while the first holds its uncommitted write,
/// a `no_wait` second writer must fail; after the first commits, a third
/// `no_wait` writer succeeds and its value is the one finally stored.
#[test]
fn test_write_write_conflict_no_wait() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"ww", b"initial");

    // Writer A writes and stays open.
    let first = env.begin_transaction(None).unwrap();
    let first_key = DatabaseEntry::from_bytes(b"ww");
    let first_val = DatabaseEntry::from_bytes(b"from_a");
    db.put(Some(&first), &first_key, &first_val).unwrap();

    // Writer B must be rejected immediately.
    let no_wait = TransactionConfig::new().with_no_wait(true);
    let second = env.begin_transaction(Some(&no_wait)).unwrap();
    let second_key = DatabaseEntry::from_bytes(b"ww");
    let second_val = DatabaseEntry::from_bytes(b"from_b");
    let second_outcome = db.put(Some(&second), &second_key, &second_val);
    assert!(
        second_outcome.is_err(),
        "second writer must fail: first writer holds WRITE lock"
    );
    drop(second);
    first.commit().unwrap();

    // Writer C runs after A has committed.
    let third = env.begin_transaction(Some(&no_wait)).unwrap();
    let third_key = DatabaseEntry::from_bytes(b"ww");
    let third_val = DatabaseEntry::from_bytes(b"from_c");
    let third_status = db.put(Some(&third), &third_key, &third_val).unwrap();
    assert_eq!(third_status, OperationStatus::Success);
    third.commit().unwrap();

    let mut stored = DatabaseEntry::new();
    let stored_status = get_val(&db, None, b"ww", &mut stored);
    assert_eq!(stored_status, OperationStatus::Success);
    assert_eq!(stored.data(), b"from_c");
}
/// Within one read-committed transaction, a second read of the same key must
/// see a value committed by another transaction between the two reads.
#[test]
fn test_read_committed_allows_non_repeatable_read() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"nr", b"v1");

    let read_committed = TransactionConfig::read_committed();
    let reader_txn = env.begin_transaction(Some(&read_committed)).unwrap();

    let mut before = DatabaseEntry::new();
    let before_status = get_val(&db, Some(&reader_txn), b"nr", &mut before);
    assert_eq!(before_status, OperationStatus::Success);
    assert_eq!(before.data(), b"v1");

    // A separate transaction overwrites the key while the reader is open.
    put_committed(&env, &db, b"nr", b"v2");

    let mut after = DatabaseEntry::new();
    let after_status = get_val(&db, Some(&reader_txn), b"nr", &mut after);
    assert_eq!(after_status, OperationStatus::Success);
    assert_eq!(
        after.data(),
        b"v2",
        "read-committed must allow non-repeatable reads (new committed value visible)"
    );
    reader_txn.commit().unwrap();
}
/// A transaction begun with the default configuration (treated by this test
/// as serializable) reads a key twice; in between, a `no_wait` writer to the
/// same key must fail, and the second read must return the original value.
#[test]
fn test_serializable_prevents_non_repeatable_read() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"rr", b"v1");

    let reader = env.begin_transaction(None).unwrap();
    let mut first_read = DatabaseEntry::new();
    let first_status = get_val(&db, Some(&reader), b"rr", &mut first_read);
    assert_eq!(first_status, OperationStatus::Success);
    assert_eq!(first_read.data(), b"v1");

    // Concurrent writer attempt while the reader is still open.
    let no_wait_cfg = TransactionConfig::new().with_no_wait(true);
    let writer = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let key = DatabaseEntry::from_bytes(b"rr");
    let new_val = DatabaseEntry::from_bytes(b"v2");
    assert!(
        db.put(Some(&writer), &key, &new_val).is_err(),
        "write must fail: serializable reader holds read lock"
    );
    drop(writer);

    let mut second_read = DatabaseEntry::new();
    let second_status = get_val(&db, Some(&reader), b"rr", &mut second_read);
    assert_eq!(second_status, OperationStatus::Success);
    assert_eq!(
        second_read.data(),
        b"v1",
        "serializable must provide repeatable reads (same value both times)"
    );
    reader.commit().unwrap();
}
/// Writes `N` keys in a single transaction, commits, and then verifies from
/// a second transaction that every one of those keys can be read.
#[test]
fn test_atomic_commit_all_or_nothing_visibility() {
    const N: u32 = 100;
    let (_dir, env, db) = setup();

    // One transaction carries the whole batch.
    let batch_txn = env.begin_transaction(None).unwrap();
    for idx in 0u32..N {
        let key = DatabaseEntry::from_bytes(&idx.to_be_bytes());
        let val = DatabaseEntry::from_bytes(b"batch");
        db.put(Some(&batch_txn), &key, &val).unwrap();
    }
    batch_txn.commit().unwrap();

    // Count every key whose lookup does not report Success.
    let read_txn = env.begin_transaction(None).unwrap();
    let missing = (0u32..N)
        .filter(|idx| {
            let key = DatabaseEntry::from_bytes(&idx.to_be_bytes());
            let mut val = DatabaseEntry::new();
            let status = db.get(Some(&read_txn), &key, &mut val).unwrap();
            status != OperationStatus::Success
        })
        .count();
    assert_eq!(missing, 0, "{missing} keys missing — partial commit observed");
    read_txn.commit().unwrap();
}
/// An aborted transaction must leave no trace: an overwritten key returns
/// to its previous value and a newly inserted key is absent.
#[test]
fn test_aborted_transaction_full_rollback() {
    let (_dir, env, db) = setup();
    put_committed(&env, &db, b"existing", b"original");

    // Overwrite one key and insert another, then abort.
    let doomed = env.begin_transaction(None).unwrap();
    let overwrite_key = DatabaseEntry::from_bytes(b"existing");
    let overwrite_val = DatabaseEntry::from_bytes(b"modified");
    db.put(Some(&doomed), &overwrite_key, &overwrite_val).unwrap();
    let insert_key = DatabaseEntry::from_bytes(b"new_key");
    let insert_val = DatabaseEntry::from_bytes(b"new_val");
    db.put(Some(&doomed), &insert_key, &insert_val).unwrap();
    doomed.abort().unwrap();

    let mut restored = DatabaseEntry::new();
    let restored_status = get_val(&db, None, b"existing", &mut restored);
    assert_eq!(restored_status, OperationStatus::Success);
    assert_eq!(restored.data(), b"original", "abort must restore before-image");

    let mut absent = DatabaseEntry::new();
    let absent_status = get_val(&db, None, b"new_key", &mut absent);
    assert_eq!(
        absent_status,
        OperationStatus::NotFound,
        "abort must remove newly inserted keys"
    );
}
/// Creates a temporary directory whose name starts with `prefix`.
///
/// If the `NOXU_TEST_SCRATCH` environment variable is set, the directory is
/// created under that path; otherwise the `tempfile` builder's default
/// location is used. Panics if the directory cannot be created.
fn scratch_dir(prefix: &str) -> TempDir {
    let mut builder = tempfile::Builder::new();
    builder.prefix(prefix);

    let Some(scratch_root) = std::env::var_os("NOXU_TEST_SCRATCH") else {
        return builder.tempdir().expect("create temp dir");
    };

    let root_path = std::path::Path::new(&scratch_root);
    match builder.tempdir_in(root_path) {
        Ok(dir) => dir,
        Err(e) => panic!(
            "create temp dir under NOXU_TEST_SCRATCH={}: {e}",
            root_path.display()
        ),
    }
}
/// Seeds `KEYS` committed records, then has `THREADS` threads start together
/// (via a barrier) and each read every key under one read-committed
/// transaction. No lookup may report `NotFound`, and every value must be
/// `"val"`.
#[test]
fn test_32_thread_concurrent_readers() {
    const KEYS: u32 = 200;
    const THREADS: usize = 32;
    let (_dir, env, db) = setup();
    let env = Arc::new(env);
    let db = Arc::new(db);
    (0u32..KEYS).for_each(|i| put_committed(&env, &db, &i.to_be_bytes(), b"val"));

    let barrier = Arc::new(Barrier::new(THREADS));
    let mut handles = Vec::with_capacity(THREADS);
    for _ in 0..THREADS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            // Line all readers up so they start at the same moment.
            barrier.wait();
            let read_committed = TransactionConfig::read_committed();
            let txn = env.begin_transaction(Some(&read_committed)).unwrap();
            let mut not_found = 0u32;
            for i in 0u32..KEYS {
                let key = DatabaseEntry::from_bytes(&i.to_be_bytes());
                let mut val = DatabaseEntry::new();
                let status = db.get(Some(&txn), &key, &mut val).unwrap();
                match status {
                    OperationStatus::Success => {
                        assert_eq!(val.data(), b"val");
                    }
                    OperationStatus::NotFound => not_found += 1,
                    other => panic!("unexpected status {other:?}"),
                }
            }
            txn.commit().unwrap();
            not_found
        }));
    }

    let mut total_missing = 0u32;
    for handle in handles {
        total_missing += handle.join().expect("reader thread panicked");
    }
    assert_eq!(
        total_missing, 0,
        "{total_missing} key reads returned NotFound across 32 concurrent readers"
    );
}
/// Runs 8 writer threads (each committing `KEYS_PER_WRITER` disjoint keys,
/// one transaction per key) alongside 8 read-committed reader threads that
/// repeatedly sweep the whole key range. After all threads finish, every
/// written key must be found.
///
/// Reader lookups ignore their results; readers exist only to generate
/// concurrent read traffic.
#[test]
fn test_8r8w_all_committed_data_visible() {
    const KEYS_PER_WRITER: u32 = 10;
    const WRITERS: u32 = 8;
    const READERS: usize = 8;
    let (_dir, env, db) = setup();
    let env = Arc::new(env);
    let db = Arc::new(db);
    let start_barrier = Arc::new(Barrier::new(WRITERS as usize + READERS));
    let done = Arc::new(std::sync::atomic::AtomicBool::new(false));

    let mut writers = Vec::with_capacity(WRITERS as usize);
    for w in 0..WRITERS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let gate = Arc::clone(&start_barrier);
        writers.push(thread::spawn(move || {
            gate.wait();
            // Each writer owns the contiguous index range starting here.
            let base = w * KEYS_PER_WRITER;
            for j in 0u32..KEYS_PER_WRITER {
                let key_idx = base + j;
                let key = DatabaseEntry::from_bytes(&key_idx.to_be_bytes());
                let val = DatabaseEntry::from_bytes(b"written");
                let txn = env.begin_transaction(None).unwrap();
                db.put(Some(&txn), &key, &val).unwrap();
                txn.commit().unwrap();
            }
        }));
    }

    let mut readers = Vec::with_capacity(READERS);
    for _ in 0..READERS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let gate = Arc::clone(&start_barrier);
        let done = Arc::clone(&done);
        readers.push(thread::spawn(move || {
            gate.wait();
            let read_committed = TransactionConfig::read_committed();
            loop {
                if done.load(std::sync::atomic::Ordering::Relaxed) {
                    break;
                }
                let txn = env.begin_transaction(Some(&read_committed)).unwrap();
                for i in 0u32..WRITERS * KEYS_PER_WRITER {
                    let key = DatabaseEntry::from_bytes(&i.to_be_bytes());
                    let mut val = DatabaseEntry::new();
                    let _ = db.get(Some(&txn), &key, &mut val);
                }
                txn.commit().unwrap();
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    for writer in writers {
        writer.join().expect("writer thread panicked");
    }
    // Writers are finished; tell the readers to stop looping.
    done.store(true, std::sync::atomic::Ordering::Relaxed);
    for reader in readers {
        reader.join().expect("reader thread panicked");
    }

    let total = WRITERS * KEYS_PER_WRITER;
    let mut missing = 0u32;
    for i in 0u32..total {
        let key = DatabaseEntry::from_bytes(&i.to_be_bytes());
        let mut val = DatabaseEntry::new();
        let status = db.get(None, &key, &mut val).unwrap();
        if status == OperationStatus::NotFound {
            missing += 1;
        }
    }
    assert_eq!(
        missing, 0,
        "{missing}/{total} keys missing after all writers committed"
    );
}
/// Stress test (ignored by default): `THREADS` reader threads each run
/// `TXNS_PER_THREAD` read-committed transactions of `LOOKUPS_PER_TXN`
/// lookups against `KEYS` pre-committed records. Prints the measured
/// throughput and requires that every lookup reported `Success`.
#[test]
#[ignore = "stress: 64 concurrent readers × 1000 keys × 1000 txns; run with --ignored"]
fn test_64_thread_concurrent_readers() {
    use std::time::Instant;
    const KEYS: u32 = 1_000;
    const THREADS: usize = 64;
    const TXNS_PER_THREAD: u32 = 1_000;
    const LOOKUPS_PER_TXN: u32 = 10;

    let dir = scratch_dir("noxu_64r_");
    let env = Arc::new(
        noxu_db::Environment::open(
            EnvironmentConfig::new(dir.path().to_path_buf())
                .with_allow_create(true)
                .with_transactional(true),
        )
        .unwrap(),
    );
    let db = Arc::new(
        env.open_database(
            None,
            "test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap(),
    );

    // Seed the data set, one committed transaction per key.
    for i in 0u32..KEYS {
        let key = DatabaseEntry::from_bytes(&i.to_be_bytes());
        let val = DatabaseEntry::from_bytes(b"rval");
        let txn = env.begin_transaction(None).unwrap();
        db.put(Some(&txn), &key, &val).unwrap();
        txn.commit().unwrap();
    }

    let barrier = Arc::new(Barrier::new(THREADS));
    // Set once, by whichever thread gets past the barrier first.
    let start: Arc<std::sync::OnceLock<Instant>> =
        Arc::new(std::sync::OnceLock::new());

    let mut handles = Vec::with_capacity(THREADS);
    for tid in 0..THREADS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let barrier = Arc::clone(&barrier);
        let start = Arc::clone(&start);
        handles.push(thread::spawn(move || {
            barrier.wait();
            start.get_or_init(Instant::now);
            let read_committed = TransactionConfig::read_committed();
            let mut errors = 0u32;
            for _ in 0..TXNS_PER_THREAD {
                let txn = env.begin_transaction(Some(&read_committed)).unwrap();
                for j in 0u32..LOOKUPS_PER_TXN {
                    let idx = (tid as u32 * LOOKUPS_PER_TXN + j) % KEYS;
                    let key = DatabaseEntry::from_bytes(&idx.to_be_bytes());
                    let mut val = DatabaseEntry::new();
                    let status = db.get(Some(&txn), &key, &mut val).unwrap();
                    if status != OperationStatus::Success {
                        errors += 1;
                    }
                }
                txn.commit().unwrap();
            }
            errors
        }));
    }

    let mut total_errors = 0u32;
    for handle in handles {
        total_errors += handle.join().expect("reader thread panicked");
    }

    let elapsed = start.get().map(|t0| t0.elapsed()).unwrap_or_default();
    let total_ops =
        THREADS as u64 * TXNS_PER_THREAD as u64 * LOOKUPS_PER_TXN as u64;
    let ops_per_sec = total_ops as f64 / elapsed.as_secs_f64();
    println!(
        "64-thread readers: {total_ops} lookups in {elapsed:?} ({ops_per_sec:.0} ops/s)"
    );
    assert_eq!(
        total_errors, 0,
        "{total_errors} lookups returned NotFound across 64 concurrent readers"
    );
}
/// Stress test (ignored by default): `WRITERS` threads each commit
/// `OPS_PER_WRITER` uniquely named keys (one transaction per key) while
/// `READERS` threads repeatedly run full cursor scans under read-committed
/// transactions. After all threads finish, every written key must be found.
#[test]
#[ignore = "stress: 32 reader + 32 writer threads × 5000 ops each; run with --ignored"]
fn test_32r32w_concurrent() {
    const WRITERS: usize = 32;
    const READERS: usize = 32;
    const OPS_PER_WRITER: u32 = 5_000;

    /// Key written by writer `wid` for its `j`-th operation.
    fn key_of(wid: usize, j: u32) -> String {
        format!("w{wid:03}:{j:04}")
    }

    let dir = scratch_dir("noxu_32r32w_");
    let env = Arc::new(
        noxu_db::Environment::open(
            EnvironmentConfig::new(dir.path().to_path_buf())
                .with_allow_create(true)
                .with_transactional(true),
        )
        .unwrap(),
    );
    let db = Arc::new(
        env.open_database(
            None,
            "test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap(),
    );
    let done = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let barrier = Arc::new(Barrier::new(WRITERS + READERS));

    let mut writers = Vec::with_capacity(WRITERS);
    for wid in 0..WRITERS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let barrier = Arc::clone(&barrier);
        writers.push(thread::spawn(move || {
            barrier.wait();
            for j in 0..OPS_PER_WRITER {
                let name = key_of(wid, j);
                let key = DatabaseEntry::from_bytes(name.as_bytes());
                let val = DatabaseEntry::from_bytes(b"wval");
                let txn = env.begin_transaction(None).unwrap();
                db.put(Some(&txn), &key, &val).unwrap();
                txn.commit().unwrap();
            }
        }));
    }

    let mut readers = Vec::with_capacity(READERS);
    for _ in 0..READERS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let done = Arc::clone(&done);
        let barrier = Arc::clone(&barrier);
        readers.push(thread::spawn(move || {
            barrier.wait();
            let read_committed = TransactionConfig::read_committed();
            loop {
                if done.load(std::sync::atomic::Ordering::Relaxed) {
                    break;
                }
                let txn = env.begin_transaction(Some(&read_committed)).unwrap();
                let mut cursor = db.open_cursor(Some(&txn), None).unwrap();
                let mut key = DatabaseEntry::new();
                let mut val = DatabaseEntry::new();
                // The result of positioning on the first record is ignored.
                let _ =
                    cursor.get(&mut key, &mut val, noxu_db::Get::First, None);
                // Walk forward until the cursor stops reporting Success.
                loop {
                    let status = cursor
                        .get(&mut key, &mut val, noxu_db::Get::Next, None)
                        .unwrap();
                    if status != OperationStatus::Success {
                        break;
                    }
                }
                cursor.close().unwrap();
                txn.commit().unwrap();
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    for writer in writers {
        writer.join().expect("writer thread panicked");
    }
    // Writers are finished; tell the readers to stop looping.
    done.store(true, std::sync::atomic::Ordering::Relaxed);
    for reader in readers {
        reader.join().expect("reader thread panicked");
    }

    let mut missing = 0u32;
    for wid in 0..WRITERS {
        for j in 0..OPS_PER_WRITER {
            let name = key_of(wid, j);
            let key = DatabaseEntry::from_bytes(name.as_bytes());
            let mut val = DatabaseEntry::new();
            let status = db.get(None, &key, &mut val).unwrap();
            if status == OperationStatus::NotFound {
                missing += 1;
            }
        }
    }
    let total = WRITERS as u32 * OPS_PER_WRITER;
    assert_eq!(
        missing, 0,
        "{missing}/{total} keys missing after 32r32w workload"
    );
}
/// Stress test (ignored by default): `THREADS` threads each commit
/// `KEYS_PER_THREAD` keys from their own disjoint key range, one transaction
/// per key. The test then checks that:
///
/// * the whole run finished in under 120 seconds,
/// * every key can be read back,
/// * a full cursor scan yields keys in non-decreasing order, and
/// * the scan returns exactly `TOTAL_KEYS` entries.
#[test]
#[ignore = "stress: 200 threads × disjoint writers, up to 120 s wall time; run with --ignored"]
fn test_200_thread_disjoint_writers() {
    use std::time::Instant;
    const THREADS: usize = 200;
    const KEYS_PER_THREAD: u32 = 50;
    const TOTAL_KEYS: u32 = THREADS as u32 * KEYS_PER_THREAD;

    /// Key written by thread `tid` for its `i`-th record.
    fn key_of(tid: usize, i: u32) -> String {
        format!("range{tid:03}:{i:04}")
    }

    let dir = scratch_dir("noxu_200w_");
    let env = Arc::new(
        noxu_db::Environment::open(
            EnvironmentConfig::new(dir.path().to_path_buf())
                .with_allow_create(true)
                .with_transactional(true),
        )
        .unwrap(),
    );
    let db = Arc::new(
        env.open_database(
            None,
            "test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap(),
    );

    let barrier = Arc::new(Barrier::new(THREADS));
    // The clock starts before the threads are spawned.
    let start = Instant::now();
    let mut handles = Vec::with_capacity(THREADS);
    for tid in 0..THREADS {
        let env = Arc::clone(&env);
        let db = Arc::clone(&db);
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            barrier.wait();
            for i in 0..KEYS_PER_THREAD {
                let name = key_of(tid, i);
                let key = DatabaseEntry::from_bytes(name.as_bytes());
                let val = DatabaseEntry::from_bytes(b"dval");
                let txn = env.begin_transaction(None).unwrap();
                db.put(Some(&txn), &key, &val).unwrap();
                txn.commit().unwrap();
            }
        }));
    }
    for handle in handles {
        handle.join().expect("writer thread panicked");
    }

    let elapsed = start.elapsed();
    let ops_per_sec = TOTAL_KEYS as f64 / elapsed.as_secs_f64();
    println!(
        "200-thread disjoint writers: {TOTAL_KEYS} keys in {elapsed:?} ({ops_per_sec:.0} ops/s)"
    );
    assert!(
        elapsed.as_secs() < 120,
        "200-thread test took {elapsed:?}, exceeded 120 s budget"
    );

    // Point lookups: every key must be present.
    let mut missing = 0u32;
    for tid in 0..THREADS {
        for i in 0..KEYS_PER_THREAD {
            let name = key_of(tid, i);
            let key = DatabaseEntry::from_bytes(name.as_bytes());
            let mut val = DatabaseEntry::new();
            let status = db.get(None, &key, &mut val).unwrap();
            if status == OperationStatus::NotFound {
                missing += 1;
            }
        }
    }
    assert_eq!(
        missing, 0,
        "{missing}/{TOTAL_KEYS} keys missing after 200-thread write"
    );

    // Full scan: check ordering and entry count.
    let mut cursor = db.open_cursor(None, None).unwrap();
    let mut previous: Option<String> = None;
    let mut key = DatabaseEntry::new();
    let mut val = DatabaseEntry::new();
    let mut order_errors = 0u32;
    let mut checked = 0u32;
    let mut step = noxu_db::Get::First;
    while cursor.get(&mut key, &mut val, step, None).unwrap()
        == OperationStatus::Success
    {
        let current =
            String::from_utf8_lossy(key.get_data().unwrap_or_default())
                .into_owned();
        if previous.as_ref().is_some_and(|p| current < *p) {
            order_errors += 1;
        }
        previous = Some(current);
        checked += 1;
        step = noxu_db::Get::Next;
    }
    cursor.close().unwrap();
    assert_eq!(
        order_errors, 0,
        "{order_errors} out-of-order keys found in cursor scan of {checked} entries"
    );
    assert_eq!(
        checked, TOTAL_KEYS,
        "cursor scan returned {checked} entries, expected {TOTAL_KEYS}"
    );
}
/// With keys `"a"` and `"c"` committed, a serializable transaction T1 reads
/// both. A `no_wait` transaction T2 that then tries to insert `"bb"` (which
/// sorts between them) must fail with `LockNotAvailable`. Once T1 commits,
/// the same insert from a new transaction must succeed.
#[test]
fn test_serializable_prevents_phantom_insert() {
    let dir = TempDir::new().unwrap();
    let env_cfg = EnvironmentConfig::new(dir.path().to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    let env = noxu_db::Environment::open(env_cfg).unwrap();
    let db = env
        .open_database(
            None,
            "phantom_test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap();

    // Seed the two boundary keys, one committed transaction each.
    let seed: [(&[u8], &[u8]); 2] = [(b"a", b"val_a"), (b"c", b"val_c")];
    for (k, v) in seed {
        let txn = env.begin_transaction(None).unwrap();
        db.put(
            Some(&txn),
            &DatabaseEntry::from_bytes(k),
            &DatabaseEntry::from_bytes(v),
        )
        .unwrap();
        txn.commit().unwrap();
    }

    // T1: serializable reader touching both keys.
    let ser_cfg = TransactionConfig::new().with_serializable_isolation(true);
    let t1 = env.begin_transaction(Some(&ser_cfg)).unwrap();
    let mut out = DatabaseEntry::new();
    let read_a = db
        .get(Some(&t1), &DatabaseEntry::from_bytes(b"a"), &mut out)
        .unwrap();
    assert_eq!(read_a, OperationStatus::Success, "T1 should read 'a'");
    let read_c = db
        .get(Some(&t1), &DatabaseEntry::from_bytes(b"c"), &mut out)
        .unwrap();
    assert_eq!(read_c, OperationStatus::Success, "T1 should read 'c'");

    // T2: insert into the gap while T1 is still open.
    let no_wait_cfg = TransactionConfig::new().with_no_wait(true);
    let t2 = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let insert_result = db.put(
        Some(&t2),
        &DatabaseEntry::from_bytes(b"bb"),
        &DatabaseEntry::from_bytes(b"val_bb"),
    );
    // Abort T2 before asserting; the abort result itself is not checked.
    let _ = t2.abort();
    assert!(
        insert_result.is_err(),
        "T2's insert of 'bb' MUST fail (LockNotAvailable) while T1 holds \
         RangeRead on the successor key 'c'. Got: {:?}",
        insert_result
    );
    let err = insert_result.unwrap_err();
    assert!(
        matches!(err, noxu_db::NoxuError::LockNotAvailable),
        "Expected LockNotAvailable (RangeRead⇔RangeInsert conflict), got: {err:?}"
    );

    t1.commit().unwrap();

    // T3: the same insert after T1 has committed.
    let t3 = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let result = db.put(
        Some(&t3),
        &DatabaseEntry::from_bytes(b"bb"),
        &DatabaseEntry::from_bytes(b"val_bb"),
    );
    assert!(
        result.is_ok(),
        "After T1 commits, insert of 'bb' must succeed; got: {result:?}"
    );
    t3.commit().unwrap();
}
/// With keys `"a"` and `"c"` committed, a transaction T1 begun with the
/// default configuration reads both. A `no_wait` transaction T2 inserting
/// `"bb"` while T1 is still open must succeed.
#[test]
fn test_default_isolation_allows_phantom_insert() {
    let dir = TempDir::new().unwrap();
    let env_cfg = EnvironmentConfig::new(dir.path().to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    let env = noxu_db::Environment::open(env_cfg).unwrap();
    let db = env
        .open_database(
            None,
            "phantom_rr_test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap();

    // Seed the two boundary keys, one committed transaction each.
    let seed: [(&[u8], &[u8]); 2] = [(b"a", b"v"), (b"c", b"v")];
    for (k, v) in seed {
        let txn = env.begin_transaction(None).unwrap();
        db.put(
            Some(&txn),
            &DatabaseEntry::from_bytes(k),
            &DatabaseEntry::from_bytes(v),
        )
        .unwrap();
        txn.commit().unwrap();
    }

    // T1 reads both keys; the returned statuses are not inspected.
    let t1 = env.begin_transaction(None).unwrap();
    let mut out = DatabaseEntry::new();
    for probe in [b"a", b"c"] {
        db.get(Some(&t1), &DatabaseEntry::from_bytes(probe), &mut out)
            .unwrap();
    }

    // T2 inserts into the gap while T1 is still open.
    let no_wait_cfg = TransactionConfig::new().with_no_wait(true);
    let t2 = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let result = db.put(
        Some(&t2),
        &DatabaseEntry::from_bytes(b"bb"),
        &DatabaseEntry::from_bytes(b"val_bb"),
    );
    assert!(
        result.is_ok(),
        "Under default (non-serializable) isolation, phantom insert MUST \
         succeed. Got: {result:?}"
    );
    t2.commit().unwrap();
    t1.commit().unwrap();
}
/// With keys `"a"` and `"c"` committed, a read-committed transaction T1
/// reads both. A `no_wait` transaction T2 inserting `"bb"` while T1 is
/// still open must succeed.
#[test]
fn test_read_committed_allows_phantom_insert() {
    let dir = TempDir::new().unwrap();
    let env_cfg = EnvironmentConfig::new(dir.path().to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    let env = noxu_db::Environment::open(env_cfg).unwrap();
    let db = env
        .open_database(
            None,
            "phantom_rc_test",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap();

    // Seed the two boundary keys, one committed transaction each.
    let seed: [(&[u8], &[u8]); 2] = [(b"a", b"v"), (b"c", b"v")];
    for (k, v) in seed {
        let txn = env.begin_transaction(None).unwrap();
        db.put(
            Some(&txn),
            &DatabaseEntry::from_bytes(k),
            &DatabaseEntry::from_bytes(v),
        )
        .unwrap();
        txn.commit().unwrap();
    }

    // T1 reads both keys; the returned statuses are not inspected.
    let rc_cfg = TransactionConfig::read_committed();
    let t1 = env.begin_transaction(Some(&rc_cfg)).unwrap();
    let mut out = DatabaseEntry::new();
    for probe in [b"a", b"c"] {
        db.get(Some(&t1), &DatabaseEntry::from_bytes(probe), &mut out)
            .unwrap();
    }

    // T2 inserts into the gap while T1 is still open.
    let no_wait_cfg = TransactionConfig::new().with_no_wait(true);
    let t2 = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let result = db.put(
        Some(&t2),
        &DatabaseEntry::from_bytes(b"bb"),
        &DatabaseEntry::from_bytes(b"val_bb"),
    );
    assert!(
        result.is_ok(),
        "Under READ_COMMITTED isolation phantom insert must succeed \
         (no RangeRead held after per-op release). Got: {result:?}"
    );
    t2.commit().unwrap();
    t1.commit().unwrap();
}
/// A single serializable transaction reads `"c"` and then inserts `"bb"`
/// (which sorts just before it). The insert must return `Ok`, and the
/// transaction must commit.
#[test]
fn test_serializable_scan_then_insert_same_txn_no_panic() {
    let dir = TempDir::new().unwrap();
    let env_cfg = EnvironmentConfig::new(dir.path().to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    let env = noxu_db::Environment::open(env_cfg).unwrap();
    let db = env
        .open_database(
            None,
            "scan_insert_same_txn",
            &DatabaseConfig::new().with_allow_create(true),
        )
        .unwrap();

    // Seed the two boundary keys, one committed transaction each.
    let seed: [(&[u8], &[u8]); 2] = [(b"a", b"v"), (b"c", b"v")];
    for (k, v) in seed {
        let seed_txn = env.begin_transaction(None).unwrap();
        db.put(
            Some(&seed_txn),
            &DatabaseEntry::from_bytes(k),
            &DatabaseEntry::from_bytes(v),
        )
        .unwrap();
        seed_txn.commit().unwrap();
    }

    let ser_cfg = TransactionConfig::new().with_serializable_isolation(true);
    let txn = env.begin_transaction(Some(&ser_cfg)).unwrap();

    // Read first, then insert from the very same transaction.
    let mut out = DatabaseEntry::new();
    db.get(Some(&txn), &DatabaseEntry::from_bytes(b"c"), &mut out)
        .unwrap();
    let insert_key = DatabaseEntry::from_bytes(b"bb");
    let insert_val = DatabaseEntry::from_bytes(b"val_bb");
    let result = db.put(Some(&txn), &insert_key, &insert_val);
    assert!(
        result.is_ok(),
        "Same-txn scan+insert must not panic (owns_any_lock guard). Got: {result:?}"
    );
    txn.commit().unwrap();
}
/// With a single key `"m"` committed, a serializable transaction T1 scans
/// the database with a cursor until it runs off the end. A `no_wait`
/// transaction T2 that then tries to insert `"z"` (past the last key) must
/// fail with `LockNotAvailable`. Once T1 commits, the same insert from a
/// new transaction must succeed.
#[test]
fn test_serializable_prevents_phantom_eof_insert() {
    let dir = TempDir::new().unwrap();
    let env_cfg = EnvironmentConfig::new(dir.path().to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    let env = noxu_db::Environment::open(env_cfg).unwrap();
    let db = env
        .open_database(
            None,
            "phantom_eof_test",
            &DatabaseConfig::new()
                .with_allow_create(true)
                .with_transactional(true),
        )
        .unwrap();

    // Seed the only record.
    {
        let seed_txn = env.begin_transaction(None).unwrap();
        let seed_key = DatabaseEntry::from_bytes(b"m");
        let seed_val = DatabaseEntry::from_bytes(b"v");
        db.put(Some(&seed_txn), &seed_key, &seed_val).unwrap();
        seed_txn.commit().unwrap();
    }

    // T1: scan the one record, then hit end-of-data.
    let ser_cfg = TransactionConfig::new().with_serializable_isolation(true);
    let t1 = env.begin_transaction(Some(&ser_cfg)).unwrap();
    let mut cursor = db.open_cursor(Some(&t1), None).unwrap();
    let mut k = DatabaseEntry::new();
    let mut v = DatabaseEntry::new();
    let first = cursor
        .get(&mut k, &mut v, noxu_db::Get::First, None)
        .unwrap();
    assert_eq!(first, OperationStatus::Success);
    let past_end = cursor
        .get(&mut k, &mut v, noxu_db::Get::Next, None)
        .unwrap();
    assert_eq!(past_end, OperationStatus::NotFound);
    cursor.close().unwrap();

    // T2: append past the last key while T1 is still open.
    let no_wait_cfg = TransactionConfig::new().with_no_wait(true);
    let t2 = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let insert_result = db.put(
        Some(&t2),
        &DatabaseEntry::from_bytes(b"z"),
        &DatabaseEntry::from_bytes(b"val_z"),
    );
    // Abort T2 before asserting; the abort result itself is not checked.
    let _ = t2.abort();
    assert!(
        insert_result.is_err(),
        "T2's append-past-EOF insert of 'z' MUST fail while T1 holds \
         RangeRead on the EOF sentinel. Got: {:?}",
        insert_result
    );
    assert!(
        matches!(
            insert_result.unwrap_err(),
            noxu_db::NoxuError::LockNotAvailable
        ),
        "Expected LockNotAvailable from EOF sentinel conflict"
    );

    t1.commit().unwrap();

    // T3: the same insert after T1 has committed.
    let t3 = env.begin_transaction(Some(&no_wait_cfg)).unwrap();
    let retry = db.put(
        Some(&t3),
        &DatabaseEntry::from_bytes(b"z"),
        &DatabaseEntry::from_bytes(b"val_z"),
    );
    assert!(retry.is_ok(), "After T1 commits, 'z' insert must succeed");
    t3.commit().unwrap();
}