use noxu_db::{
DatabaseConfig, DatabaseEntry, DiskOrderedCursorConfig, Environment,
EnvironmentConfig, OperationStatus, open_disk_ordered_cursor_multi,
};
use std::collections::{HashMap, HashSet};
use tempfile::TempDir;
/// Opens an [`Environment`] rooted at the temporary directory `dir`, with
/// `allow_create` and `transactional` both switched on.
///
/// Panics if the environment cannot be opened.
fn open_env(dir: &TempDir) -> Environment {
    let home = dir.path().to_path_buf();
    let config = EnvironmentConfig::new(home)
        .with_allow_create(true)
        .with_transactional(true);
    Environment::open(config).unwrap()
}
/// Opens the database called `name` inside `env`, passing no transaction and
/// a config with `allow_create` enabled.
///
/// Panics if the database cannot be opened.
fn open_db(env: &Environment, name: &str) -> noxu_db::Database {
    let config = DatabaseConfig::new().with_allow_create(true);
    let opened = env.open_database(None, name, &config);
    opened.unwrap()
}
/// Stores `data` under `key` in `db` without an explicit transaction.
///
/// Panics if the write fails.
fn put(db: &noxu_db::Database, key: &[u8], data: &[u8]) {
    let key_entry = DatabaseEntry::from_data(key);
    let data_entry = DatabaseEntry::from_data(data);
    db.put(None, &key_entry, &data_entry).unwrap();
}
/// Deletes `key` from `db` without an explicit transaction.
///
/// Panics if the delete call returns an error.
fn delete(db: &noxu_db::Database, key: &[u8]) {
    let key_entry = DatabaseEntry::from_data(key);
    db.delete(None, &key_entry).unwrap();
}
/// Reads `cursor` until `next` stops reporting `Success` and returns every
/// `(key, data)` pair seen, in the order the cursor produced them.
///
/// Panics if `next` returns an error.
fn drain(
    cursor: &mut noxu_db::DiskOrderedCursor<'_>,
) -> Vec<(Vec<u8>, Vec<u8>)> {
    // The same two entries are reused for every call; their contents are
    // copied out before the next call overwrites them.
    let mut key = DatabaseEntry::new();
    let mut value = DatabaseEntry::new();
    let mut records = Vec::new();
    loop {
        if cursor.next(&mut key, &mut value).unwrap() != OperationStatus::Success {
            break records;
        }
        records.push((key.data().to_vec(), value.data().to_vec()));
    }
}
/// Inserts 1000 distinct records, checkpoints, and checks that a default
/// disk-ordered scan yields exactly that key -> value mapping.
#[test]
fn walks_all_inserted_records() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_walk");

    let n = 1000;
    let mut expected: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();
    for i in 0..n {
        let key = format!("key-{i:06}").into_bytes();
        let value = format!("value-{i}").into_bytes();
        put(&db, &key, &value);
        expected.insert(key, value);
    }
    env.checkpoint(None).unwrap();

    let mut cursor = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    // Collecting into a map means a key returned more than once keeps only
    // the value from its last occurrence in the scan.
    let got_map: HashMap<Vec<u8>, Vec<u8>> = drain(&mut cursor).into_iter().collect();

    assert_eq!(got_map, expected, "bulk scan must return every live record");
}
/// Inserts `k0..k49`, deletes the even-numbered keys, and checks that every
/// odd-numbered (still live) key shows up in a default scan.
///
/// NOTE(review): despite the name, nothing here asserts that the deleted
/// keys are absent from the scan; confirm whether that is intentional.
#[test]
fn skips_deleted_records() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_delete");

    for i in 0..50 {
        let key = format!("k{i}");
        let value = format!("v{i}");
        put(&db, key.as_bytes(), value.as_bytes());
    }
    for i in (0..50).step_by(2) {
        let key = format!("k{i}");
        delete(&db, key.as_bytes());
    }
    env.checkpoint(None).unwrap();

    let mut cursor = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    let keys_seen: HashSet<Vec<u8>> = drain(&mut cursor)
        .into_iter()
        .map(|(key, _)| key)
        .collect();

    let live: HashSet<Vec<u8>> = (1..50)
        .step_by(2)
        .map(|i| format!("k{i}").into_bytes())
        .collect();
    for k in &live {
        assert!(keys_seen.contains(k), "live key {k:?} missing from scan");
    }
}
/// Inserts `k0..k49`, deletes the even-numbered keys, then scans with
/// `dedup_keys` enabled and checks that no key is returned twice.
///
/// NOTE(review): the final loop requires *every* key `k0..k49` to be present,
/// including the even-numbered keys deleted above. That contradicts the test
/// name; confirm whether the dedup scan is really expected to surface
/// deleted keys.
#[test]
fn skips_deleted_records_with_dedup() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_delete_dedup");

    for i in 0..50 {
        let key = format!("k{i}");
        let value = format!("v{i}");
        put(&db, key.as_bytes(), value.as_bytes());
    }
    for i in (0..50).step_by(2) {
        let key = format!("k{i}");
        delete(&db, key.as_bytes());
    }
    env.checkpoint(None).unwrap();

    let config = DiskOrderedCursorConfig::new().with_dedup_keys(true);
    let mut cursor = db.open_disk_ordered_cursor(config).unwrap();
    let got = drain(&mut cursor);

    // If the set of keys is as large as the record list, no key repeated.
    let keys_seen: HashSet<Vec<u8>> = got.iter().map(|(key, _)| key).cloned().collect();
    assert_eq!(keys_seen.len(), got.len(), "dedup must yield each key once");

    for i in 0..50 {
        let wanted = format!("k{i}").into_bytes();
        assert!(keys_seen.contains(&wanted), "key {i} missing from dedup scan");
    }
}
/// Fills two databases with 100 records each (keys prefixed `a-` and `b-`),
/// scans both through one multi-database cursor, and checks that at least
/// 100 keys of each prefix come back and that no other prefix appears.
#[test]
fn multi_db_scan_returns_all_dbs() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db_a = open_db(&env, "doc_multi_a");
    let db_b = open_db(&env, "doc_multi_b");

    // Writes are interleaved: one record to each database per iteration.
    for i in 0..100 {
        let (key_a, value_a) = (format!("a-{i}"), format!("va-{i}"));
        put(&db_a, key_a.as_bytes(), value_a.as_bytes());
        let (key_b, value_b) = (format!("b-{i}"), format!("vb-{i}"));
        put(&db_b, key_b.as_bytes(), value_b.as_bytes());
    }
    env.checkpoint(None).unwrap();

    let dbs: [&noxu_db::Database; 2] = [&db_a, &db_b];
    let mut cursor =
        open_disk_ordered_cursor_multi(&dbs, DiskOrderedCursorConfig::new()).unwrap();
    let got = drain(&mut cursor);

    let (mut a_count, mut b_count) = (0, 0);
    for (k, _) in &got {
        match (k.starts_with(b"a-"), k.starts_with(b"b-")) {
            (true, _) => a_count += 1,
            (false, true) => b_count += 1,
            (false, false) => panic!("unexpected key prefix: {k:?}"),
        }
    }

    // Lower bounds only: the counts are allowed to exceed 100.
    assert!(a_count >= 100, "got {a_count} a-* keys; expected >= 100");
    assert!(b_count >= 100, "got {b_count} b-* keys; expected >= 100");
}
/// Scans 200 records with a cursor configured with `queue_size(2)` and
/// `internal_memory_limit(64)` and checks that the scan still finishes and
/// contains every inserted key.
#[test]
fn bounded_queue_completes() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_bounded");

    for i in 0..200 {
        let key = format!("k{i:04}");
        let value = format!("v{i}");
        put(&db, key.as_bytes(), value.as_bytes());
    }
    env.checkpoint(None).unwrap();

    let cfg = DiskOrderedCursorConfig::new()
        .with_queue_size(2)
        .with_internal_memory_limit(64);
    let mut cursor = db.open_disk_ordered_cursor(cfg).unwrap();
    let keys_seen: HashSet<Vec<u8>> = drain(&mut cursor)
        .into_iter()
        .map(|(key, _)| key)
        .collect();

    for i in 0..200 {
        let k = format!("k{i:04}").into_bytes();
        assert!(keys_seen.contains(&k), "key {k:?} missing from bounded scan");
    }
}
/// Reads only the first five of 500 records, then drops the cursor, the
/// database and closes the environment, expecting the close to succeed.
///
/// NOTE(review): going by the test name, the cursor presumably owns a
/// background producer that must be joined on drop; the tiny queue size and
/// memory limit look intended to keep that producer unfinished when the drop
/// happens. Confirm against the cursor implementation.
#[test]
fn drop_mid_iteration_joins_producer() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_drop_mid");

    for i in 0..500 {
        let key = format!("k{i:04}");
        let value = format!("v{i}");
        put(&db, key.as_bytes(), value.as_bytes());
    }
    env.checkpoint(None).unwrap();

    let cfg = DiskOrderedCursorConfig::new()
        .with_queue_size(1)
        .with_internal_memory_limit(8);
    let mut cursor = db.open_disk_ordered_cursor(cfg).unwrap();

    let mut key = DatabaseEntry::new();
    let mut value = DatabaseEntry::new();
    for _ in 0..5 {
        assert_eq!(
            cursor.next(&mut key, &mut value).unwrap(),
            OperationStatus::Success
        );
    }

    // Tear down explicitly and in this order: cursor, database, environment.
    drop(cursor);
    drop(db);
    env.close().unwrap();
}
/// Opens a cursor and closes it explicitly, then opens a second cursor and
/// releases it with `drop`; neither path may panic or return an error.
///
/// NOTE(review): `close` is only called once per cursor here, so idempotence
/// is exercised at most indirectly (explicit close followed by whatever the
/// cursor's drop does). Confirm that this is the intended coverage.
#[test]
fn close_is_idempotent() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_close_idem");
    put(&db, b"only", b"one");
    env.checkpoint(None).unwrap();

    let closed_explicitly = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    closed_explicitly.close().unwrap();

    let dropped_implicitly = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    drop(dropped_implicitly);
}
/// Writes three successive values to one key and checks that a default
/// (non-dedup) scan returns all three of them for that key.
#[test]
fn stale_versions_visible_by_default() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_stale");

    let key = b"the-key";
    // Overwrite the same key twice so three versions have been written.
    put(&db, key, b"v1");
    put(&db, key, b"v2");
    put(&db, key, b"v3");
    env.checkpoint(None).unwrap();

    let mut cursor = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    let got = drain(&mut cursor);

    let mut values_seen: HashSet<Vec<u8>> = HashSet::new();
    for (k, v) in &got {
        if k == key {
            values_seen.insert(v.clone());
        }
    }

    assert!(values_seen.contains(b"v1".as_slice()));
    assert!(values_seen.contains(b"v2".as_slice()));
    assert!(values_seen.contains(b"v3".as_slice()));
}
/// Writes three successive values to one key and checks that a scan with
/// `dedup_keys` enabled returns that key exactly once.
#[test]
fn dedup_keys_filters_repeated_keys() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_dedup");

    let key = b"the-key";
    put(&db, key, b"v1");
    put(&db, key, b"v2");
    put(&db, key, b"v3");
    env.checkpoint(None).unwrap();

    let config = DiskOrderedCursorConfig::new().with_dedup_keys(true);
    let mut cursor = db.open_disk_ordered_cursor(config).unwrap();
    let got = drain(&mut cursor);

    let mut count = 0usize;
    for (k, _) in &got {
        if k == key {
            count += 1;
        }
    }
    assert_eq!(count, 1, "dedup must yield each key exactly once");
}
/// Checks that `current` reports `NotFound` before the first `next`, and
/// after one successful `next` returns the same key and data that `next`
/// just produced.
#[test]
fn current_returns_last_record() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_current");
    put(&db, b"a", b"1");
    put(&db, b"b", b"2");
    env.checkpoint(None).unwrap();

    let mut cursor = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    let (mut k, mut v) = (DatabaseEntry::new(), DatabaseEntry::new());

    // Nothing has been read yet, so there is no current record.
    let before_first = cursor.current(&mut k, &mut v).unwrap();
    assert_eq!(before_first, OperationStatus::NotFound);

    let advanced = cursor.next(&mut k, &mut v).unwrap();
    assert_eq!(advanced, OperationStatus::Success);
    let (first_k, first_v) = (k.data().to_vec(), v.data().to_vec());

    // Fresh entries, so a match cannot come from leftover contents of `k`/`v`.
    let (mut k2, mut v2) = (DatabaseEntry::new(), DatabaseEntry::new());
    let after_first = cursor.current(&mut k2, &mut v2).unwrap();
    assert_eq!(after_first, OperationStatus::Success);
    assert_eq!(k2.data(), first_k.as_slice());
    assert_eq!(v2.data(), first_v.as_slice());
}
/// A scan over a database that never received a write must report
/// `NotFound` on the very first `next`.
#[test]
fn empty_db_yields_no_records() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_empty");
    env.checkpoint(None).unwrap();

    let mut cursor = db
        .open_disk_ordered_cursor(DiskOrderedCursorConfig::new())
        .unwrap();
    let (mut key, mut value) = (DatabaseEntry::new(), DatabaseEntry::new());
    let status = cursor.next(&mut key, &mut value).unwrap();
    assert_eq!(status, OperationStatus::NotFound);
}
/// Scans 20 records with `keys_only` enabled and checks that records are
/// still returned but every data payload is empty.
#[test]
fn keys_only_returns_empty_data() {
    let dir = TempDir::new().unwrap();
    let env = open_env(&dir);
    let db = open_db(&env, "doc_keys_only");

    for i in 0..20 {
        let key = format!("k{i}");
        let value = format!("longish-value-{i}");
        put(&db, key.as_bytes(), value.as_bytes());
    }
    env.checkpoint(None).unwrap();

    let config = DiskOrderedCursorConfig::new().with_keys_only(true);
    let mut cursor = db.open_disk_ordered_cursor(config).unwrap();
    let got = drain(&mut cursor);

    assert!(!got.is_empty(), "should still return keys");
    assert!(
        got.iter().all(|(_, data)| data.is_empty()),
        "keys_only mode must elide data"
    );
}
/// Opening a multi-database cursor over an empty list of databases must
/// return an error.
#[test]
fn empty_db_list_is_rejected() {
    let dir = TempDir::new().unwrap();
    // Bound to a name so the environment stays open for the whole test.
    let _env = open_env(&dir);

    let no_dbs: [&noxu_db::Database; 0] = [];
    let outcome = open_disk_ordered_cursor_multi(&no_dbs, DiskOrderedCursorConfig::new());
    assert!(outcome.is_err(), "empty db list must be rejected");
}