#![cfg(any(feature = "var-collections", feature = "typed-tree"))]
use armdb::codec::Codec;
use armdb::{Config, TypedMap, TypedTree, TypedWriteHook};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::time::{Duration, Instant};
use tempfile::tempdir;
use armdb::{ConstMap, ConstTree, MigrateAction, WriteHook};
#[cfg(feature = "typed-tree")]
use armdb::{ZeroMap, ZeroTree};
#[cfg(feature = "var-collections")]
use armdb::{VarMap, VarTree};
#[cfg(any(feature = "var-collections", feature = "typed-tree"))]
use parking_lot::Mutex;
/// Write hook that stalls the writer of one selected key until the test releases it.
///
/// * `block_key` – key whose write should stall; `None` disables blocking.
/// * `entered` – raised by the hook right before it starts waiting.
/// * `proceed` – release signal set by the test.
#[cfg(feature = "var-collections")]
struct BlockingHook {
    block_key: Arc<Mutex<Option<[u8; 8]>>>,
    entered: Arc<AtomicBool>,
    proceed: Arc<AtomicBool>,
}
#[cfg(feature = "var-collections")]
impl WriteHook<[u8; 8]> for BlockingHook {
    const NEEDS_OLD_VALUE: bool = false;

    /// Returns immediately for every key except the selected one; for that key it
    /// signals `entered` and waits for `proceed`.
    fn on_write(&self, key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {
        // Copy the selection out so the mutex guard is gone before we start waiting.
        let block_key = *self.block_key.lock();
        if block_key != Some(*key) {
            return;
        }
        self.entered.store(true, Ordering::Release);
        // Bound the wait: if the test fails before it sets `proceed`, an unbounded
        // spin would keep a core busy for the rest of the test binary. The bound is
        // well above the 2-second timeouts used by the test helpers in this file, so
        // it cannot expire on a passing run.
        let deadline = Instant::now() + Duration::from_secs(10);
        while !self.proceed.load(Ordering::Acquire) {
            if Instant::now() >= deadline {
                return;
            }
            std::hint::spin_loop();
        }
    }
}
/// Finds a key different from `key` that `shard_for` maps to the same shard.
///
/// Candidates are the big-endian encodings of `0..10_000`, tried in ascending order;
/// the first match is returned.
///
/// # Panics
///
/// Panics if none of the candidates lands on the same shard.
fn peer_key_on_same_shard(key: &[u8; 8], shard_for: impl Fn(&[u8; 8]) -> usize) -> [u8; 8] {
    let wanted = shard_for(key);
    let mut found = None;
    for n in 0u64..10_000 {
        let candidate = n.to_be_bytes();
        // Skip the original key before consulting `shard_for`.
        if candidate != *key && shard_for(&candidate) == wanted {
            found = Some(candidate);
            break;
        }
    }
    found.expect("second key on same shard")
}
/// Polls `entered` until it becomes `true`, yielding the thread between polls.
///
/// # Panics
///
/// Panics with a message prefixed by `label` if the flag is still unset after
/// two seconds.
fn wait_for_entered(entered: &AtomicBool, label: &str) {
    let limit = Duration::from_secs(2);
    let began = Instant::now();
    loop {
        if entered.load(Ordering::Acquire) {
            return;
        }
        assert!(
            began.elapsed() < limit,
            "{label}: hook was not entered before timeout"
        );
        std::thread::yield_now();
    }
}
/// Runs `f` on a fresh thread and waits up to two seconds for it to finish.
///
/// A panic raised inside `f` is caught on the worker and re-raised on the calling
/// thread.
///
/// # Panics
///
/// Panics with a message prefixed by `label` if no outcome arrives in time, and
/// re-raises any panic produced by `f`.
fn assert_thread_completes(label: &str, f: impl FnOnce() + Send + 'static) {
    let (outcome_tx, outcome_rx) = mpsc::sync_channel(1);
    std::thread::spawn(move || {
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
        // The receiver may already be gone if the caller gave up; ignore that.
        let _ = outcome_tx.send(outcome);
    });
    let outcome = match outcome_rx.recv_timeout(Duration::from_secs(2)) {
        Ok(outcome) => outcome,
        Err(_) => panic!("{label}: operation did not complete before timeout"),
    };
    if let Err(payload) = outcome {
        std::panic::resume_unwind(payload);
    }
}
/// Waits up to two seconds for a worker thread to report its result on `rx`.
///
/// # Panics
///
/// Panics with a message prefixed by `label` when nothing arrives in time, or when
/// the sending side was dropped without sending (for example because the worker
/// panicked). The two cases carry different messages so a crashed worker is not
/// misreported as a timeout.
fn wait_for_worker<T: Send + 'static>(rx: mpsc::Receiver<T>, label: &str) -> T {
    match rx.recv_timeout(Duration::from_secs(2)) {
        Ok(value) => value,
        Err(mpsc::RecvTimeoutError::Timeout) => {
            panic!("{label}: worker did not complete before timeout")
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            panic!("{label}: worker exited without reporting a result")
        }
    }
}
/// Scenario: while the write hook of one `VarTree` key is stalled, a `put` to a
/// different key on the same shard must still complete.
///
/// The stalled writer runs on its own thread; the peer write is checked through
/// `assert_thread_completes`, and the stalled writer is released afterwards.
#[cfg(feature = "var-collections")]
fn run_blocking_hook_var_tree_put() {
    let dir = tempdir().unwrap();
    let blocked_key = 1u64.to_be_bytes();

    // Shared state through which the test steers the hook.
    let block_key = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let proceed = Arc::new(AtomicBool::new(false));
    let tree = Arc::new(
        VarTree::<[u8; 8], BlockingHook>::open_hooked(
            dir.path(),
            Config::test(),
            BlockingHook {
                block_key: Arc::clone(&block_key),
                entered: Arc::clone(&entered),
                proceed: Arc::clone(&proceed),
            },
        )
        .unwrap(),
    );
    let peer_key = peer_key_on_same_shard(&blocked_key, |k| tree.shard_for(k));
    let payload = b"test";

    // Seed both keys while no key is selected for blocking, then arm the hook.
    tree.put(&blocked_key, b"seed1").unwrap();
    tree.put(&peer_key, b"seed2").unwrap();
    *block_key.lock() = Some(blocked_key);

    // Writer that is expected to stall inside the hook.
    let (done_tx, done_rx) = mpsc::sync_channel(1);
    let stalled_writer = Arc::clone(&tree);
    std::thread::spawn(move || {
        let _ = done_tx.send(stalled_writer.put(&blocked_key, payload));
    });
    wait_for_entered(&entered, "var_tree blocking put");

    // The peer write must get through while the first writer is still in its hook.
    let peer_writer = Arc::clone(&tree);
    assert_thread_completes("var_tree same-shard peer put", move || {
        peer_writer.put(&peer_key, payload).unwrap();
    });

    proceed.store(true, Ordering::Release);
    wait_for_worker(done_rx, "var_tree blocking put")
        .expect("var_tree blocking put should succeed");
}
/// Test entry point; the scenario itself lives in `run_blocking_hook_var_tree_put`.
#[cfg(feature = "var-collections")]
#[test]
fn test_hook_does_not_hold_shard_lock_var_tree() {
run_blocking_hook_var_tree_put();
}
/// While the write hook of one `VarMap` key is stalled, a `put` to a different key
/// on the same shard must still complete.
#[cfg(feature = "var-collections")]
#[test]
fn test_hook_does_not_hold_shard_lock_var_map() {
    let dir = tempdir().unwrap();
    let blocked_key = 1u64.to_be_bytes();

    // Shared state through which the test steers the hook.
    let block_key = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let proceed = Arc::new(AtomicBool::new(false));
    let map = Arc::new(
        VarMap::<[u8; 8], BlockingHook>::open_hooked(
            dir.path(),
            Config::test(),
            BlockingHook {
                block_key: Arc::clone(&block_key),
                entered: Arc::clone(&entered),
                proceed: Arc::clone(&proceed),
            },
        )
        .unwrap(),
    );
    let peer_key = peer_key_on_same_shard(&blocked_key, |k| map.shard_for(k));
    let payload = b"test";

    // Seed both keys while no key is selected for blocking, then arm the hook.
    map.put(&blocked_key, b"seed1").unwrap();
    map.put(&peer_key, b"seed2").unwrap();
    *block_key.lock() = Some(blocked_key);

    // Writer that is expected to stall inside the hook.
    let (done_tx, done_rx) = mpsc::sync_channel(1);
    let stalled_writer = Arc::clone(&map);
    std::thread::spawn(move || {
        let _ = done_tx.send(stalled_writer.put(&blocked_key, payload));
    });
    wait_for_entered(&entered, "var_map blocking put");

    // The peer write must get through while the first writer is still in its hook.
    let peer_writer = Arc::clone(&map);
    assert_thread_completes("var_map same-shard peer put", move || {
        peer_writer.put(&peer_key, payload).unwrap();
    });

    proceed.store(true, Ordering::Release);
    wait_for_worker(done_rx, "var_map blocking put").expect("var_map blocking put should succeed");
}
/// Typed write hook that stalls the writer of one selected key until the test
/// releases it.
///
/// * `block_key` – key whose write should stall; `None` disables blocking.
/// * `entered` – raised by the hook right before it starts waiting.
/// * `proceed` – release signal set by the test.
#[cfg(feature = "typed-tree")]
struct TypedBlockingHook {
    block_key: Arc<Mutex<Option<[u8; 8]>>>,
    entered: Arc<AtomicBool>,
    proceed: Arc<AtomicBool>,
}
#[cfg(feature = "typed-tree")]
impl TypedWriteHook<[u8; 8], u64> for TypedBlockingHook {
    const NEEDS_OLD_VALUE: bool = false;

    /// Returns immediately for every key except the selected one; for that key it
    /// signals `entered` and waits for `proceed`.
    fn on_write(&self, key: &[u8; 8], _old: Option<&u64>, _new: Option<&u64>) {
        // Copy the selection out so the mutex guard is gone before we start waiting.
        let block_key = *self.block_key.lock();
        if block_key != Some(*key) {
            return;
        }
        self.entered.store(true, Ordering::Release);
        // Bound the wait: if the test fails before it sets `proceed`, an unbounded
        // spin would keep a core busy for the rest of the test binary. The bound is
        // well above the 2-second timeouts used by the test helpers in this file, so
        // it cannot expire on a passing run.
        let deadline = Instant::now() + Duration::from_secs(10);
        while !self.proceed.load(Ordering::Acquire) {
            if Instant::now() >= deadline {
                return;
            }
            std::hint::spin_loop();
        }
    }
}
/// Test codec that stores a `u64` as its eight big-endian bytes.
#[cfg(feature = "typed-tree")]
struct U64Codec;
#[cfg(feature = "typed-tree")]
impl Codec<u64> for U64Codec {
    /// Replaces the contents of `buf` with the big-endian encoding of `value`.
    fn encode_to(&self, value: &u64, buf: &mut Vec<u8>) -> armdb::DbResult<()> {
        let encoded = value.to_be_bytes();
        buf.clear();
        buf.extend_from_slice(&encoded);
        Ok(())
    }

    /// Decodes exactly eight big-endian bytes; any other length is reported as
    /// `DbError::CorruptedEntry` with offset 0.
    fn decode_from(&self, bytes: &[u8]) -> armdb::DbResult<u64> {
        let fixed: Result<[u8; 8], _> = bytes.try_into();
        let raw = fixed.map_err(|_| armdb::DbError::CorruptedEntry { offset: 0 })?;
        Ok(u64::from_be_bytes(raw))
    }
}
/// While the write hook of one `TypedTree` key is stalled, a `put` to a different
/// key on the same shard must still complete.
#[cfg(feature = "typed-tree")]
#[test]
fn test_hook_does_not_hold_shard_lock_typed_tree() {
    let dir = tempdir().unwrap();
    let blocked_key = 1u64.to_be_bytes();

    // Shared state through which the test steers the hook.
    let block_key = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let proceed = Arc::new(AtomicBool::new(false));
    let tree = Arc::new(
        TypedTree::<[u8; 8], u64, U64Codec, TypedBlockingHook>::open_hooked(
            dir.path(),
            Config::test(),
            U64Codec,
            TypedBlockingHook {
                block_key: Arc::clone(&block_key),
                entered: Arc::clone(&entered),
                proceed: Arc::clone(&proceed),
            },
        )
        .unwrap(),
    );
    let peer_key = peer_key_on_same_shard(&blocked_key, |k| tree.shard_for(k));

    // Seed both keys while no key is selected for blocking, then arm the hook.
    tree.put(&blocked_key, 10).unwrap();
    tree.put(&peer_key, 20).unwrap();
    *block_key.lock() = Some(blocked_key);

    // Writer that is expected to stall inside the hook.
    let (done_tx, done_rx) = mpsc::sync_channel(1);
    let stalled_writer = Arc::clone(&tree);
    std::thread::spawn(move || {
        let _ = done_tx.send(stalled_writer.put(&blocked_key, 100).map(|_| ()));
    });
    wait_for_entered(&entered, "typed_tree blocking put");

    // The peer write must get through while the first writer is still in its hook.
    let peer_writer = Arc::clone(&tree);
    assert_thread_completes("typed_tree same-shard peer put", move || {
        peer_writer.put(&peer_key, 200).unwrap();
    });

    proceed.store(true, Ordering::Release);
    wait_for_worker(done_rx, "typed_tree blocking put")
        .expect("typed_tree blocking put should succeed");
}
/// While the write hook of one `TypedMap` key is stalled, a `put` to a different
/// key on the same shard must still complete.
#[cfg(feature = "typed-tree")]
#[test]
fn test_hook_does_not_hold_shard_lock_typed_map() {
    let dir = tempdir().unwrap();
    let blocked_key = 1u64.to_be_bytes();

    // Shared state through which the test steers the hook.
    let block_key = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let proceed = Arc::new(AtomicBool::new(false));
    let map = Arc::new(
        TypedMap::<[u8; 8], u64, U64Codec, TypedBlockingHook>::open_hooked(
            dir.path(),
            Config::test(),
            U64Codec,
            TypedBlockingHook {
                block_key: Arc::clone(&block_key),
                entered: Arc::clone(&entered),
                proceed: Arc::clone(&proceed),
            },
        )
        .unwrap(),
    );
    let peer_key = peer_key_on_same_shard(&blocked_key, |k| map.shard_for(k));

    // Seed both keys while no key is selected for blocking, then arm the hook.
    map.put(&blocked_key, 10).unwrap();
    map.put(&peer_key, 20).unwrap();
    *block_key.lock() = Some(blocked_key);

    // Writer that is expected to stall inside the hook.
    let (done_tx, done_rx) = mpsc::sync_channel(1);
    let stalled_writer = Arc::clone(&map);
    std::thread::spawn(move || {
        let _ = done_tx.send(stalled_writer.put(&blocked_key, 100).map(|_| ()));
    });
    wait_for_entered(&entered, "typed_map blocking put");

    // The peer write must get through while the first writer is still in its hook.
    let peer_writer = Arc::clone(&map);
    assert_thread_completes("typed_map same-shard peer put", move || {
        peer_writer.put(&peer_key, 200).unwrap();
    });

    proceed.store(true, Ordering::Release);
    wait_for_worker(done_rx, "typed_map blocking put")
        .expect("typed_map blocking put should succeed");
}
/// Hook whose `on_init` reads back from the `VarTree` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
#[cfg(feature = "var-collections")]
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct VarTreeInitReenterHook {
    target: Arc<Mutex<Option<Arc<VarTree<[u8; 8], Self>>>>>,
    entered: Arc<AtomicBool>,
}
#[cfg(feature = "var-collections")]
impl WriteHook<[u8; 8]> for VarTreeInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {}

    /// Marks the hook as entered, then performs a `get` on the target tree.
    fn on_init(&self, key: &[u8; 8], _value: &[u8]) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let tree = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let _ = tree.get(key);
    }
}
/// `VarTree::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook reads back from the same tree.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[cfg(feature = "var-collections")]
#[test]
fn test_migrate_update_on_init_can_reenter_var_tree() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = VarTreeInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let tree = Arc::new(
        VarTree::<[u8; 8], VarTreeInitReenterHook>::open_hooked(dir.path(), Config::test(), hook)
            .unwrap(),
    );
    // Give the hook the handle it uses to call back into the tree.
    *target.lock() = Some(Arc::clone(&tree));

    let key = 1u64.to_be_bytes();
    tree.put(&key, b"old").unwrap();

    let tree2 = Arc::clone(&tree);
    assert_thread_completes("var_tree migrate update on_init reentry", move || {
        let count = tree2
            .migrate(|_, _| MigrateAction::Update(armdb::ByteView::new(b"new")))
            .unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(tree.get(&key).unwrap().as_ref(), b"new");

    // The tree owns the hook, the hook owns `target`, and `target` holds a handle to
    // the tree. Clear the slot so this reference cycle does not keep the tree alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` reads back from the `VarMap` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
#[cfg(feature = "var-collections")]
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct VarMapInitReenterHook {
    target: Arc<Mutex<Option<Arc<VarMap<[u8; 8], Self>>>>>,
    entered: Arc<AtomicBool>,
}
#[cfg(feature = "var-collections")]
impl WriteHook<[u8; 8]> for VarMapInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {}

    /// Marks the hook as entered, then performs a `get` on the target map.
    fn on_init(&self, key: &[u8; 8], _value: &[u8]) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let map = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let _ = map.get(key);
    }
}
/// `VarMap::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook reads back from the same map.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[cfg(feature = "var-collections")]
#[test]
fn test_migrate_update_on_init_can_reenter_var_map() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = VarMapInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let map = Arc::new(
        VarMap::<[u8; 8], VarMapInitReenterHook>::open_hooked(dir.path(), Config::test(), hook)
            .unwrap(),
    );
    // Give the hook the handle it uses to call back into the map.
    *target.lock() = Some(Arc::clone(&map));

    let key = 1u64.to_be_bytes();
    map.put(&key, b"old").unwrap();

    let map2 = Arc::clone(&map);
    assert_thread_completes("var_map migrate update on_init reentry", move || {
        let count = map2
            .migrate(|_, _| MigrateAction::Update(armdb::ByteView::new(b"new")))
            .unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(map.get(&key).unwrap().as_ref(), b"new");

    // The map owns the hook, the hook owns `target`, and `target` holds a handle to
    // the map. Clear the slot so this reference cycle does not keep the map alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` writes back into the `TypedTree` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
#[cfg(feature = "typed-tree")]
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct TypedTreeInitReenterHook {
    target: Arc<Mutex<Option<Arc<TypedTree<[u8; 8], u64, U64Codec, Self>>>>>,
    entered: Arc<AtomicBool>,
}
#[cfg(feature = "typed-tree")]
impl TypedWriteHook<[u8; 8], u64> for TypedTreeInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&u64>, _new: Option<&u64>) {}

    /// Marks the hook as entered, then stores `value + 1` under `key` in the
    /// target tree.
    fn on_init(&self, key: &[u8; 8], value: &u64) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let tree = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let bumped = *value + 1;
        tree.put(key, bumped).unwrap();
    }
}
/// `TypedTree::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook writes back into the same tree.
///
/// The migration updates the value to 100 and the hook stores `value + 1`, so the
/// test expects to read 101 afterwards. The migration runs through
/// `assert_thread_completes`, so a deadlock surfaces as a timeout failure.
#[cfg(feature = "typed-tree")]
#[test]
fn test_migrate_update_on_init_can_reenter_typed_tree() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = TypedTreeInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let tree = Arc::new(
        TypedTree::<[u8; 8], u64, U64Codec, TypedTreeInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            U64Codec,
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the tree.
    *target.lock() = Some(Arc::clone(&tree));

    let key = 1u64.to_be_bytes();
    tree.put(&key, 10).unwrap();

    let tree2 = Arc::clone(&tree);
    assert_thread_completes("typed_tree migrate update on_init reentry", move || {
        let count = tree2
            .migrate(|_, _| armdb::MigrateAction::Update(100))
            .unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(*tree.get(&key).unwrap(), 101);

    // The tree owns the hook, the hook owns `target`, and `target` holds a handle to
    // the tree. Clear the slot so this reference cycle does not keep the tree alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` reads back from the `ConstMap` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct ConstMapInitReenterHook {
    target: Arc<Mutex<Option<Arc<ConstMap<[u8; 8], 8, Self>>>>>,
    entered: Arc<AtomicBool>,
}
impl WriteHook<[u8; 8]> for ConstMapInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {}

    /// Marks the hook as entered, then performs a `get` on the target map.
    fn on_init(&self, key: &[u8; 8], _value: &[u8]) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let map = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let _ = map.get(key);
    }
}
/// `ConstMap::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook reads back from the same map.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[test]
fn hook_lifecycle_migrate_update_reentry_const_map() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = ConstMapInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let map = Arc::new(
        ConstMap::<[u8; 8], 8, ConstMapInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the map.
    *target.lock() = Some(Arc::clone(&map));

    let key = 1u64.to_be_bytes();
    map.put(&key, &[0u8; 8]).unwrap();

    let map2 = Arc::clone(&map);
    assert_thread_completes("const_map migrate update on_init reentry", move || {
        let count = map2
            .migrate(|_, _| MigrateAction::Update([1u8; 8]))
            .unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(map.get(&key), Some([1u8; 8]));

    // The map owns the hook, the hook owns `target`, and `target` holds a handle to
    // the map. Clear the slot so this reference cycle does not keep the map alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// `ConstMap::migrate` with `MigrateAction::Keep` must complete, report zero
/// migrated entries, and still run the reentrant `on_init` hook.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[test]
fn hook_lifecycle_migrate_keep_reentry_const_map() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = ConstMapInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let map = Arc::new(
        ConstMap::<[u8; 8], 8, ConstMapInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the map.
    *target.lock() = Some(Arc::clone(&map));

    let key = 1u64.to_be_bytes();
    map.put(&key, &[0u8; 8]).unwrap();

    let map2 = Arc::clone(&map);
    assert_thread_completes("const_map migrate keep on_init reentry", move || {
        let count = map2.migrate(|_, _| MigrateAction::Keep).unwrap();
        assert_eq!(count, 0);
    });

    assert!(entered.load(Ordering::Acquire));

    // The map owns the hook, the hook owns `target`, and `target` holds a handle to
    // the map. Clear the slot so this reference cycle does not keep the map alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` reads back from the `ConstTree` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct ConstTreeInitReenterHook {
    target: Arc<Mutex<Option<Arc<ConstTree<[u8; 8], 8, Self>>>>>,
    entered: Arc<AtomicBool>,
}
impl WriteHook<[u8; 8]> for ConstTreeInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {}

    /// Marks the hook as entered, then performs a `get` on the target tree.
    fn on_init(&self, key: &[u8; 8], _value: &[u8]) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let tree = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let _ = tree.get(key);
    }
}
/// `ConstTree::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook reads back from the same tree.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[test]
fn hook_lifecycle_migrate_update_reentry_const_tree() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = ConstTreeInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let tree = Arc::new(
        ConstTree::<[u8; 8], 8, ConstTreeInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the tree.
    *target.lock() = Some(Arc::clone(&tree));

    let key = 1u64.to_be_bytes();
    tree.put(&key, &[0u8; 8]).unwrap();

    let tree2 = Arc::clone(&tree);
    assert_thread_completes("const_tree migrate update on_init reentry", move || {
        let count = tree2
            .migrate(|_, _| MigrateAction::Update([1u8; 8]))
            .unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(tree.get(&key), Some([1u8; 8]));

    // The tree owns the hook, the hook owns `target`, and `target` holds a handle to
    // the tree. Clear the slot so this reference cycle does not keep the tree alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` reads back from the `ZeroMap` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
#[cfg(feature = "typed-tree")]
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct ZeroMapInitReenterHook {
    target: Arc<Mutex<Option<Arc<ZeroMap<[u8; 8], 8, u64, Self>>>>>,
    entered: Arc<AtomicBool>,
}
#[cfg(feature = "typed-tree")]
impl TypedWriteHook<[u8; 8], u64> for ZeroMapInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&u64>, _new: Option<&u64>) {}

    /// Marks the hook as entered, then performs a `get` on the target map.
    fn on_init(&self, key: &[u8; 8], _value: &u64) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let map = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let _ = map.get(key);
    }
}
/// `ZeroMap::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook reads back from the same map.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[cfg(feature = "typed-tree")]
#[test]
fn hook_lifecycle_migrate_update_reentry_zero_map() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = ZeroMapInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let map = Arc::new(
        ZeroMap::<[u8; 8], 8, u64, ZeroMapInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the map.
    *target.lock() = Some(Arc::clone(&map));

    let key = 1u64.to_be_bytes();
    map.put(&key, &42u64).unwrap();

    let map2 = Arc::clone(&map);
    assert_thread_completes("zero_map migrate update on_init reentry", move || {
        let count = map2.migrate(|_, _| MigrateAction::Update(100u64)).unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(map.get(&key).unwrap(), 100u64);

    // The map owns the hook, the hook owns `target`, and `target` holds a handle to
    // the map. Clear the slot so this reference cycle does not keep the map alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` reads back from the `ZeroTree` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
#[cfg(feature = "typed-tree")]
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct ZeroTreeInitReenterHook {
    target: Arc<Mutex<Option<Arc<ZeroTree<[u8; 8], 8, u64, Self>>>>>,
    entered: Arc<AtomicBool>,
}
#[cfg(feature = "typed-tree")]
impl TypedWriteHook<[u8; 8], u64> for ZeroTreeInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&u64>, _new: Option<&u64>) {}

    /// Marks the hook as entered, then performs a `get` on the target tree.
    fn on_init(&self, key: &[u8; 8], _value: &u64) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let tree = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let _ = tree.get(key);
    }
}
/// `ZeroTree::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook reads back from the same tree.
///
/// The migration runs through `assert_thread_completes`, so a deadlock surfaces as
/// a timeout failure rather than a hung test.
#[cfg(feature = "typed-tree")]
#[test]
fn hook_lifecycle_migrate_update_reentry_zero_tree() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = ZeroTreeInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let tree = Arc::new(
        ZeroTree::<[u8; 8], 8, u64, ZeroTreeInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the tree.
    *target.lock() = Some(Arc::clone(&tree));

    let key = 1u64.to_be_bytes();
    tree.put(&key, &42u64).unwrap();

    let tree2 = Arc::clone(&tree);
    assert_thread_completes("zero_tree migrate update on_init reentry", move || {
        let count = tree2.migrate(|_, _| MigrateAction::Update(100u64)).unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(tree.get(&key).unwrap(), 100u64);

    // The tree owns the hook, the hook owns `target`, and `target` holds a handle to
    // the tree. Clear the slot so this reference cycle does not keep the tree alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
/// Hook whose `on_init` writes back into the `TypedMap` it is attached to.
///
/// `target` holds the handle used for the reentrant call; `entered` records that
/// `on_init` ran.
#[cfg(feature = "typed-tree")]
// The nested handle type comes from a hook that refers to its own collection.
#[allow(clippy::type_complexity)]
struct TypedMapInitReenterHook {
    target: Arc<Mutex<Option<Arc<TypedMap<[u8; 8], u64, U64Codec, Self>>>>>,
    entered: Arc<AtomicBool>,
}
#[cfg(feature = "typed-tree")]
impl TypedWriteHook<[u8; 8], u64> for TypedMapInitReenterHook {
    const NEEDS_INIT: bool = true;
    const NEEDS_WRITE: bool = false;

    fn on_write(&self, _key: &[u8; 8], _old: Option<&u64>, _new: Option<&u64>) {}

    /// Marks the hook as entered, then stores `value + 1` under `key` in the
    /// target map.
    fn on_init(&self, key: &[u8; 8], value: &u64) {
        self.entered.store(true, Ordering::Release);
        // Take a handle and release the slot's mutex before calling back in.
        let slot = self.target.lock();
        let map = Arc::clone(slot.as_ref().expect("hook target must be installed"));
        drop(slot);
        let bumped = *value + 1;
        map.put(key, bumped).unwrap();
    }
}
/// `TypedMap::migrate` with `MigrateAction::Update` must complete even though the
/// `on_init` hook writes back into the same map.
///
/// The migration updates the value to 100 and the hook stores `value + 1`, so the
/// test expects to read 101 afterwards. The migration runs through
/// `assert_thread_completes`, so a deadlock surfaces as a timeout failure.
#[cfg(feature = "typed-tree")]
#[test]
fn hook_lifecycle_migrate_update_reentry_typed_map() {
    let dir = tempdir().unwrap();
    let target = Arc::new(Mutex::new(None));
    let entered = Arc::new(AtomicBool::new(false));
    let hook = TypedMapInitReenterHook {
        target: Arc::clone(&target),
        entered: Arc::clone(&entered),
    };
    let map = Arc::new(
        TypedMap::<[u8; 8], u64, U64Codec, TypedMapInitReenterHook>::open_hooked(
            dir.path(),
            Config::test(),
            U64Codec,
            hook,
        )
        .unwrap(),
    );
    // Give the hook the handle it uses to call back into the map.
    *target.lock() = Some(Arc::clone(&map));

    let key = 1u64.to_be_bytes();
    map.put(&key, 10).unwrap();

    let map2 = Arc::clone(&map);
    assert_thread_completes("typed_map migrate update on_init reentry", move || {
        let count = map2
            .migrate(|_, _| armdb::MigrateAction::Update(100))
            .unwrap();
        assert_eq!(count, 1);
    });

    assert!(entered.load(Ordering::Acquire));
    assert_eq!(*map.get(&key).unwrap(), 101);

    // The map owns the hook, the hook owns `target`, and `target` holds a handle to
    // the map. Clear the slot so this reference cycle does not keep the map alive
    // after the test (and past the removal of `dir`).
    *target.lock() = None;
}
struct AtomicNoHooksWriteHook {
writes: Arc<std::sync::atomic::AtomicUsize>,
}
impl WriteHook<[u8; 8]> for AtomicNoHooksWriteHook {
const NEEDS_OLD_VALUE: bool = false;
const NEEDS_WRITE: bool = true;
fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {
self.writes
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
/// A `put` issued inside `ConstMap::atomic` must not invoke the write hook: the
/// counter stays at the single call made by the plain `put`, while the value
/// written inside the closure is visible afterwards.
#[test]
fn hook_lifecycle_atomic_no_hooks_const_map() {
    let dir = tempdir().unwrap();
    let writes = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let map = ConstMap::<[u8; 8], 8, AtomicNoHooksWriteHook>::open_hooked(
        dir.path(),
        Config::test(),
        AtomicNoHooksWriteHook {
            writes: Arc::clone(&writes),
        },
    )
    .unwrap();
    let hook_calls = || writes.load(Ordering::Relaxed);
    let key = 1u64.to_be_bytes();

    // A plain put goes through the hook once.
    map.put(&key, &[0u8; 8]).unwrap();
    assert_eq!(hook_calls(), 1);

    // The put inside the atomic closure leaves the counter untouched.
    map.atomic(&key, |shard| {
        shard.put(&key, &[1u8; 8])?;
        Ok(())
    })
    .unwrap();
    assert_eq!(hook_calls(), 1);
    assert_eq!(map.get(&key), Some([1u8; 8]));
}
/// A `put` issued inside `ConstTree::atomic` must not invoke the write hook: the
/// counter stays at the single call made by the plain `put`, while the value
/// written inside the closure is visible afterwards.
#[test]
fn hook_lifecycle_atomic_no_hooks_const_tree() {
    let dir = tempdir().unwrap();
    let writes = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let tree = ConstTree::<[u8; 8], 8, AtomicNoHooksWriteHook>::open_hooked(
        dir.path(),
        Config::test(),
        AtomicNoHooksWriteHook {
            writes: Arc::clone(&writes),
        },
    )
    .unwrap();
    let hook_calls = || writes.load(Ordering::Relaxed);
    let key = 1u64.to_be_bytes();

    // A plain put goes through the hook once.
    tree.put(&key, &[0u8; 8]).unwrap();
    assert_eq!(hook_calls(), 1);

    // The put inside the atomic closure leaves the counter untouched.
    tree.atomic(&key, |shard| {
        shard.put(&key, &[1u8; 8])?;
        Ok(())
    })
    .unwrap();
    assert_eq!(hook_calls(), 1);
    assert_eq!(tree.get(&key), Some([1u8; 8]));
}
/// Typed write hook that counts how many times `on_write` is invoked.
#[cfg(feature = "typed-tree")]
struct AtomicNoHooksTypedHook {
    writes: Arc<std::sync::atomic::AtomicUsize>,
}
#[cfg(feature = "typed-tree")]
impl TypedWriteHook<[u8; 8], u64> for AtomicNoHooksTypedHook {
    const NEEDS_OLD_VALUE: bool = false;
    const NEEDS_WRITE: bool = true;

    /// Increments the shared counter; the key and values are ignored.
    fn on_write(&self, _key: &[u8; 8], _old: Option<&u64>, _new: Option<&u64>) {
        let counter = &self.writes;
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
/// A `put` issued inside `TypedMap::atomic` must not invoke the write hook: the
/// counter stays at the single call made by the plain `put`, while the value
/// written inside the closure is visible afterwards.
#[cfg(feature = "typed-tree")]
#[test]
fn hook_lifecycle_atomic_no_hooks_typed_map() {
    let dir = tempdir().unwrap();
    let writes = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let map = TypedMap::<[u8; 8], u64, U64Codec, AtomicNoHooksTypedHook>::open_hooked(
        dir.path(),
        Config::test(),
        U64Codec,
        AtomicNoHooksTypedHook {
            writes: Arc::clone(&writes),
        },
    )
    .unwrap();
    let hook_calls = || writes.load(Ordering::Relaxed);
    let key = 1u64.to_be_bytes();

    // A plain put goes through the hook once.
    map.put(&key, 10).unwrap();
    assert_eq!(hook_calls(), 1);

    // The put inside the atomic closure leaves the counter untouched.
    map.atomic(&key, |shard| {
        shard.put(&key, 20)?;
        Ok(())
    })
    .unwrap();
    assert_eq!(hook_calls(), 1);
    assert_eq!(*map.get(&key).unwrap(), 20);
}
/// Write hook for the variable-length collections that counts `on_write` calls.
#[cfg(feature = "var-collections")]
struct AtomicNoHooksVarHook {
    writes: Arc<std::sync::atomic::AtomicUsize>,
}
#[cfg(feature = "var-collections")]
impl WriteHook<[u8; 8]> for AtomicNoHooksVarHook {
    const NEEDS_OLD_VALUE: bool = false;
    const NEEDS_WRITE: bool = true;

    /// Increments the shared counter; the key and values are ignored.
    fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {
        let counter = &self.writes;
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
/// A `put` issued inside `VarTree::atomic` must not invoke the write hook: the
/// counter stays at the single call made by the plain `put`, while the value
/// written inside the closure is visible afterwards.
#[cfg(feature = "var-collections")]
#[test]
fn hook_lifecycle_atomic_no_hooks_var_tree() {
    let dir = tempdir().unwrap();
    let writes = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let tree = VarTree::<[u8; 8], AtomicNoHooksVarHook>::open_hooked(
        dir.path(),
        Config::test(),
        AtomicNoHooksVarHook {
            writes: Arc::clone(&writes),
        },
    )
    .unwrap();
    let hook_calls = || writes.load(Ordering::Relaxed);
    let key = 1u64.to_be_bytes();

    // A plain put goes through the hook once.
    tree.put(&key, b"hello").unwrap();
    assert_eq!(hook_calls(), 1);

    // The put inside the atomic closure leaves the counter untouched.
    tree.atomic(&key, |shard| {
        shard.put(&key, b"world")?;
        Ok(())
    })
    .unwrap();
    assert_eq!(hook_calls(), 1);
    assert_eq!(tree.get(&key).unwrap().as_ref(), b"world");
}
/// Write hook that records every `(old, new)` pair it is called with, copying the
/// byte slices into owned vectors.
#[cfg(feature = "var-collections")]
// The event list is a vector of optional owned byte buffers; the type is verbose
// but only used here.
#[allow(clippy::type_complexity)]
struct NoOldCasHook {
    events: Arc<Mutex<Vec<(Option<Vec<u8>>, Option<Vec<u8>>)>>>,
}
#[cfg(feature = "var-collections")]
impl WriteHook<[u8; 8]> for NoOldCasHook {
    const NEEDS_OLD_VALUE: bool = false;

    /// Appends the observed old and new values to `events`.
    fn on_write(&self, _key: &[u8; 8], old: Option<&[u8]>, new: Option<&[u8]>) {
        let event = (old.map(<[u8]>::to_vec), new.map(<[u8]>::to_vec));
        self.events.lock().push(event);
    }
}
/// Scenario: with `NEEDS_OLD_VALUE = false`, both `VarTree::cas` and
/// `VarTree::update` must call the hook exactly once with `old == None`.
#[cfg(feature = "var-collections")]
fn run_cas_respects_needs_old_value_false_var_tree() {
    let dir = tempdir().unwrap();
    let events = Arc::new(Mutex::new(Vec::new()));
    let tree = VarTree::<[u8; 8], NoOldCasHook>::open_hooked(
        dir.path(),
        Config::test(),
        NoOldCasHook {
            events: Arc::clone(&events),
        },
    )
    .unwrap();
    let key = 1u64.to_be_bytes();

    // Seed the key, then discard the event produced by the seeding put.
    tree.put(&key, b"v1").unwrap();
    events.lock().clear();

    tree.cas(&key, b"v1", b"v2").unwrap();
    {
        let recorded = events.lock();
        assert_eq!(recorded.len(), 1);
        let (old, new) = &recorded[0];
        assert_eq!(
            *old, None,
            "old must be None when NEEDS_OLD_VALUE=false"
        );
        assert_eq!(*new, Some(b"v2".to_vec()));
    }
    events.lock().clear();

    tree.update(&key, |_| armdb::ByteView::new(b"v3")).unwrap();
    let recorded = events.lock();
    assert_eq!(recorded.len(), 1);
    let (old, _) = &recorded[0];
    assert_eq!(
        *old, None,
        "old must be None when NEEDS_OLD_VALUE=false"
    );
}
/// Test entry point; the scenario itself lives in
/// `run_cas_respects_needs_old_value_false_var_tree`.
#[cfg(feature = "var-collections")]
#[test]
fn test_cas_respects_needs_old_value_false_var_tree() {
run_cas_respects_needs_old_value_false_var_tree();
}
/// With `NEEDS_OLD_VALUE = false`, both `VarMap::cas` and `VarMap::update` must
/// call the hook exactly once with `old == None`.
#[cfg(feature = "var-collections")]
#[test]
fn test_cas_respects_needs_old_value_false_var_map() {
    let dir = tempdir().unwrap();
    let events = Arc::new(Mutex::new(Vec::new()));
    let map = VarMap::<[u8; 8], NoOldCasHook>::open_hooked(
        dir.path(),
        Config::test(),
        NoOldCasHook {
            events: Arc::clone(&events),
        },
    )
    .unwrap();
    let key = 1u64.to_be_bytes();

    // Seed the key, then discard the event produced by the seeding put.
    map.put(&key, b"v1").unwrap();
    events.lock().clear();

    map.cas(&key, b"v1", b"v2").unwrap();
    {
        let recorded = events.lock();
        assert_eq!(recorded.len(), 1);
        let (old, new) = &recorded[0];
        assert_eq!(
            *old, None,
            "old must be None when NEEDS_OLD_VALUE=false"
        );
        assert_eq!(*new, Some(b"v2".to_vec()));
    }
    events.lock().clear();

    map.update(&key, |_| armdb::ByteView::new(b"v3")).unwrap();
    let recorded = events.lock();
    assert_eq!(recorded.len(), 1);
    let (old, _) = &recorded[0];
    assert_eq!(
        *old, None,
        "old must be None when NEEDS_OLD_VALUE=false"
    );
}