#![forbid(unsafe_code)]
use std::ops::Bound;
use std::sync::mpsc;
use std::thread;
use obj::{Db, Document, IndexSpec};
use obj_core::codec::Dynamic;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
/// Minimal document type stored in the database by the tests in this file.
///
/// `seed` writes the same number into both fields, so a row can be identified
/// by either one.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Order {
/// Customer identifier; `seed` sets it to the loop counter.
customer_id: u64,
/// Value covered by the `placed_at` index declared in `Order::indexes`.
/// NOTE(review): presumably a timestamp; the visible tests only use it as a
/// small integer.
placed_at: u64,
}
/// Registers `Order` under the `orders` collection with `VERSION` 1 and a
/// single index named `placed_at`.
impl Document for Order {
    const VERSION: u32 = 1;
    const COLLECTION: &'static str = "orders";

    /// Returns the one index the isolation tests query by name.
    ///
    /// # Panics
    /// Panics with "standard" if `IndexSpec::standard` rejects the spec.
    fn indexes() -> Vec<IndexSpec> {
        // NOTE(review): both arguments are "placed_at" — presumably the index
        // name and the indexed field; confirm against `IndexSpec::standard`.
        let by_placed_at = IndexSpec::standard("placed_at", "placed_at").expect("standard");
        vec![by_placed_at]
    }
}
/// Opens a database at `txn-isolation.obj` inside a newly created temporary
/// directory.
///
/// The `TempDir` handle is returned next to the `Db` so the caller decides how
/// long the directory stays alive.
///
/// # Panics
/// Panics with "tmp" if the temporary directory cannot be created, or with
/// "open" if the database cannot be opened.
fn fresh_db() -> (Db, TempDir) {
    let scratch = TempDir::new().expect("tmp");
    let db_file = scratch.path().join("txn-isolation.obj");
    (Db::open(&db_file).expect("open"), scratch)
}
/// Inserts one `Order` for every value in the half-open range `lo..hi`, using
/// that value for both `customer_id` and `placed_at`.
///
/// # Panics
/// Panics with "seed insert" on the first insert that fails.
fn seed(db: &Db, lo: u64, hi: u64) {
    (lo..hi).for_each(|stamp| {
        let order = Order {
            customer_id: stamp,
            placed_at: stamp,
        };
        // Whatever a successful insert returns is deliberately discarded.
        let _ = db.insert(order).expect("seed insert");
    });
}
/// Asserts that `coll` still shows exactly the ten rows with `placed_at` in
/// `0..10`, and nothing at or above 10.
///
/// Three read paths are checked, in this order: `count_all`, a fully unbounded
/// `index_range` over the `placed_at` index, and `count_index_range` over
/// `[10, 30)`.
///
/// # Errors
/// Propagates any error returned by the collection calls or yielded while
/// iterating the index range.
///
/// # Panics
/// Panics when any of the checks observes rows beyond the first ten.
fn assert_frozen_at_ten(coll: &obj::Collection<'_, Order>) -> obj::Result<()> {
    // Path 1: the plain row count.
    let row_count = coll.count_all()?;
    assert_eq!(
        row_count, 10,
        "count_all leaked post-snapshot rows \
         (#12 — count walked the live B-tree, not the snapshot)",
    );

    // Path 2: a full scan of the index, materialised so both the length and
    // the individual values can be inspected.
    let everything: (Bound<Dynamic>, Bound<Dynamic>) = (Bound::Unbounded, Bound::Unbounded);
    let scan = coll.index_range("placed_at", everything)?;
    let entries: Vec<(Vec<u8>, Order)> = scan.collect::<Result<Vec<_>, _>>()?;
    assert_eq!(
        entries.len(),
        10,
        "index_range leaked post-snapshot entries \
         (#12 — range descent walked the live B-tree)",
    );
    let saw_late_row = entries.iter().any(|(_, order)| order.placed_at >= 10);
    assert!(
        !saw_late_row,
        "index_range surfaced a post-snapshot placed_at",
    );

    // Path 3: counting only the window where later rows would land.
    let late_window = (
        Bound::Included(Dynamic::U64(10)),
        Bound::Excluded(Dynamic::U64(30)),
    );
    let late_count = coll.count_index_range("placed_at", late_window)?;
    assert_eq!(
        late_count, 0,
        "count_index_range observed post-snapshot entries in [10, 30) \
         (#12 — index range/count bypassed the snapshot)",
    );

    Ok(())
}
/// Regression test (tagged #12 in the assertion messages): a read transaction
/// opened while only rows `0..10` exist must keep reporting exactly those rows
/// even though another thread inserts rows `10..30` before the reads run.
///
/// Afterwards a fresh read transaction must see all 30 rows, 20 of them in
/// `[10, 30)`.
#[test]
fn read_txn_range_and_count_ignore_post_snapshot_writes() {
    let (db, _dir) = fresh_db();
    seed(&db, 0, 10);

    // Two handshakes order the threads: the reader signals once it is inside
    // its transaction, and the writer signals once its inserts are done.
    let (snap_tx, snap_rx) = mpsc::channel::<()>();
    let (commit_tx, commit_rx) = mpsc::channel::<()>();

    thread::scope(|s| {
        let shared = &db;

        // Reader: enter the transaction, let the writer run, then verify that
        // none of the new rows are visible.
        let reader = move || {
            let outcome = shared.read_transaction(|tx| {
                let coll = tx.collection::<Order>()?;
                snap_tx.send(()).expect("snap_tx");
                commit_rx.recv().expect("commit_rx");
                assert_frozen_at_ten(&coll)
            });
            outcome.expect("read_transaction");
        };

        // Writer: wait for the reader to be inside its transaction before
        // adding the 20 extra rows.
        let writer = move || {
            snap_rx.recv().expect("snap_rx");
            seed(shared, 10, 30);
            commit_tx.send(()).expect("commit_tx");
        };

        s.spawn(reader);
        s.spawn(writer);
    });

    // A transaction opened after both threads have finished sees everything.
    let (fresh_total, fresh_window) = db
        .read_transaction(|tx| {
            let coll = tx.collection::<Order>()?;
            Ok((
                coll.count_all()?,
                coll.count_index_range(
                    "placed_at",
                    (
                        Bound::Included(Dynamic::U64(10)),
                        Bound::Excluded(Dynamic::U64(30)),
                    ),
                )?,
            ))
        })
        .expect("fresh read");
    assert_eq!(fresh_total, 30, "fresh reader must see all 30 rows");
    assert_eq!(
        fresh_window, 20,
        "fresh reader must see the 20 rows in [10, 30)"
    );
}