use std::collections::HashMap;
use std::hash::Hash;
use std::mem::size_of;
use crate::Key;
use crate::compaction::{CompactionIndex, compact_shard};
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::durability::{Bitcask, Durability, DurabilityInner};
use crate::engine::Engine;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, WriteHook};
use crate::key::Location;
use crate::recovery::recover_const_map;
use crate::sync::{self, Mutex, MutexGuard};
pub(crate) struct MapEntry<const V: usize, L: Location> {
pub(crate) loc: L,
pub(crate) value: [u8; V],
}
pub struct ConstMap<
K: Key + Send + Sync + Hash + Eq,
const V: usize,
H: WriteHook<K> = NoHook,
D: Durability = Bitcask,
> {
indexes: Vec<Mutex<HashMap<K, MapEntry<V, D::Loc>>>>,
durability: D,
shard_prefix_bits: usize,
hook: H,
}
impl<K: Key + Send + Sync + Hash + Eq, const V: usize> ConstMap<K, V, NoHook, Bitcask> {
pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
Self::open_inner(path, config, NoHook)
}
}
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>> ConstMap<K, V, H, Bitcask> {
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
hook: H,
) -> DbResult<Self> {
Self::open_inner(path, config, hook)
}
fn open_inner(path: impl AsRef<std::path::Path>, config: Config, hook: H) -> DbResult<Self> {
let compaction_threshold = config.compaction_threshold;
let shard_prefix_bits = config.shard_prefix_bits;
let engine = Engine::open(path, config)?;
let shard_count = engine.shards().len();
let mut indexes = Vec::with_capacity(shard_count);
for _ in 0..shard_count {
indexes.push(Mutex::new(HashMap::new()));
}
let durability = Bitcask {
engine,
compaction_threshold,
};
let map = Self {
indexes,
durability,
shard_prefix_bits,
hook,
};
let shard_dirs = map.durability.engine.shard_dirs();
let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
let shard_ids = map.durability.engine.shard_ids();
let hints = map.durability.engine.hints();
let outcome = recover_const_map::<K, V>(
&shard_dir_refs,
&shard_ids,
map.indexes(),
hints,
#[cfg(feature = "encryption")]
map.durability.engine.cipher(),
)?;
for tail in &outcome.active_tails {
map.durability.engine.shards()[tail.shard_idx].apply_recovery_tail(tail)?;
}
for (shard_idx, dead) in outcome.shard_dead_bytes {
map.durability.engine.shards()[shard_idx].install_dead_bytes(dead);
}
let max_gsn = outcome.max_gsn;
map.durability
.engine
.gsn()
.fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
if hints {
for shard in map.durability.engine.shards().iter() {
shard.set_key_len(size_of::<K>());
}
}
tracing::info!(
key_size = size_of::<K>(),
V,
entries = map.len(),
"const_map recovered"
);
Ok(map)
}
pub fn close(self) -> DbResult<()> {
if self.durability.engine.hints() {
self.sync_hints()?;
}
self.durability.engine.flush()
}
pub fn flush_buffers(&self) -> DbResult<()> {
self.durability.engine.flush_buffers()
}
pub fn config(&self) -> &Config {
self.durability.engine.config()
}
pub fn compact(&self) -> DbResult<usize> {
let mut total_compacted = 0;
for shard in self.durability.engine.shards().iter() {
total_compacted += compact_shard(shard, self, self.durability.compaction_threshold)?;
}
Ok(total_compacted)
}
pub fn sync_hints(&self) -> DbResult<()> {
for shard in self.durability.engine.shards().iter() {
shard.write_active_hint(size_of::<K>())?;
}
Ok(())
}
pub(crate) fn indexes(&self) -> &[Mutex<HashMap<K, MapEntry<V, DiskLoc>>>] {
&self.indexes
}
}
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>> CompactionIndex<K>
for ConstMap<K, V, H, Bitcask>
{
fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
if let Some(entry) = index.get_mut(key)
&& entry.loc == old_loc
{
entry.loc = new_loc;
return true;
}
false
}
fn contains_key(&self, key: &K) -> bool {
self.contains(key)
}
}
use crate::durability::Fixed;
use crate::fixed::config::FixedConfig;
impl<K: Key + Send + Sync + Hash + Eq, const V: usize> ConstMap<K, V, NoHook, Fixed> {
pub fn open(path: impl AsRef<std::path::Path>, config: FixedConfig) -> DbResult<Self> {
Self::open_fixed_inner(path, config, NoHook)
}
}
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>> ConstMap<K, V, H, Fixed> {
pub fn open_with_hook(
path: impl AsRef<std::path::Path>,
config: FixedConfig,
hook: H,
) -> DbResult<Self> {
Self::open_fixed_inner(path, config, hook)
}
fn open_fixed_inner(
path: impl AsRef<std::path::Path>,
config: FixedConfig,
hook: H,
) -> DbResult<Self> {
let shard_prefix_bits = config.shard_prefix_bits;
let dur = Fixed::open(path, config, std::mem::size_of::<K>(), V)?;
let shard_count = dur.shard_count();
let mut indexes = Vec::with_capacity(shard_count);
for _ in 0..shard_count {
indexes.push(Mutex::new(HashMap::new()));
}
let total_recovered =
dur.recover_entries(|shard_idx, key_bytes, value_bytes, slot_id| {
let key = K::from_bytes(key_bytes);
let mut value = [0u8; V];
value.copy_from_slice(value_bytes);
sync::lock(&indexes[shard_idx]).insert(
key,
MapEntry {
loc: slot_id,
value,
},
);
})?;
tracing::info!(
key_size = std::mem::size_of::<K>(),
V,
entries = total_recovered,
"fixed_map recovered"
);
Ok(Self {
indexes,
durability: dur,
shard_prefix_bits,
hook,
})
}
pub fn close(self) -> DbResult<()> {
self.durability.close()
}
}
#[cfg(feature = "replication")]
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>> ConstMap<K, V, H, Fixed> {
pub(crate) fn fixed_durability(&self) -> &Fixed {
&self.durability
}
pub fn fixed_engine_access(
&self,
) -> std::sync::Arc<dyn crate::fixed_replication::FixedEngineAccess> {
self.durability.engine.clone()
}
pub(crate) fn get_slot_id(&self, key: &K) -> Option<u32> {
let index = sync::lock(&self.indexes[self.shard_for(key)]);
index.get(key).map(|e| e.loc)
}
pub(crate) fn remove_key_if_slot_matches(&self, key: &K, slot_id: u32) -> bool {
let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
match index.get(key) {
Some(entry) if entry.loc == slot_id => {
index.remove(key);
true
}
_ => false,
}
}
pub(crate) fn upsert_replicated(&self, key: &K, value: [u8; V], slot_id: u32) {
let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
index.insert(
*key,
MapEntry {
loc: slot_id,
value,
},
);
}
}
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>, D: Durability>
ConstMap<K, V, H, D>
{
pub fn get(&self, key: &K) -> Option<[u8; V]> {
metrics::counter!("armdb.ops", "op" => "get", "tree" => "const_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("const_map.get");
let index = sync::lock(&self.indexes[self.shard_for(key)]);
index.get(key).map(|e| e.value)
}
pub fn get_or_err(&self, key: &K) -> DbResult<[u8; V]> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn put(&self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
metrics::counter!("armdb.ops", "op" => "put", "tree" => "const_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("const_map.put");
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
let old = self.put_locked(shard_id, &mut *inner, &mut index, key, value)?;
let needs_sync = inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
self.hook
.on_write(key, old.as_ref().map(|v| &v[..]), Some(&value[..]));
Ok(old)
}
pub fn insert(&self, key: &K, value: &[u8; V]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "insert", "tree" => "const_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("const_map.insert");
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
self.insert_locked(shard_id, &mut *inner, &mut index, key, value)?;
let needs_sync = inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
self.hook.on_write(key, None, Some(&value[..]));
Ok(())
}
pub fn delete(&self, key: &K) -> DbResult<Option<[u8; V]>> {
metrics::counter!("armdb.ops", "op" => "delete", "tree" => "const_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("const_map.delete");
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
let old = self.delete_locked(shard_id, &mut *inner, &mut index, key)?;
let needs_sync = inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
if let Some(ref old_val) = old {
self.hook.on_write(key, Some(&old_val[..]), None);
}
Ok(old)
}
pub fn cas(&self, key: &K, expected: &[u8; V], new_value: &[u8; V]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "cas", "tree" => "const_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("const_map.cas");
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
let entry = index.get(key).ok_or(DbError::KeyNotFound)?;
if entry.value != *expected {
return Err(DbError::CasMismatch);
}
let old_loc = entry.loc;
let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), new_value)?;
index.insert(
*key,
MapEntry {
loc: new_loc,
value: *new_value,
},
);
let needs_sync = inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
self.hook
.on_write(key, Some(&expected[..]), Some(&new_value[..]));
Ok(())
}
pub fn update(
&self,
key: &K,
f: impl FnOnce(&[u8; V]) -> [u8; V],
) -> DbResult<Option<[u8; V]>> {
self.update_inner(key, f, false)
}
pub fn fetch_update(
&self,
key: &K,
f: impl FnOnce(&[u8; V]) -> [u8; V],
) -> DbResult<Option<[u8; V]>> {
self.update_inner(key, f, true)
}
fn update_inner(
&self,
key: &K,
f: impl FnOnce(&[u8; V]) -> [u8; V],
return_old: bool,
) -> DbResult<Option<[u8; V]>> {
metrics::counter!("armdb.ops", "op" => "update", "tree" => "const_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("const_map.update");
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
let entry = match index.get(key) {
Some(e) => e,
None => return Ok(None),
};
let old_value = entry.value;
let old_loc = entry.loc;
let new_value = f(&old_value);
let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), &new_value)?;
index.insert(
*key,
MapEntry {
loc: new_loc,
value: new_value,
},
);
let needs_sync = inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
self.hook
.on_write(key, Some(&old_value[..]), Some(&new_value[..]));
Ok(Some(if return_old { old_value } else { new_value }))
}
pub fn contains(&self, key: &K) -> bool {
let index = sync::lock(&self.indexes[self.shard_for(key)]);
index.contains_key(key)
}
pub fn len(&self) -> usize {
self.indexes.iter().map(|m| sync::lock(m).len()).sum()
}
pub fn is_empty(&self) -> bool {
self.indexes.iter().all(|m| sync::lock(m).is_empty())
}
fn put_no_hook(&self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
let result = self.put_locked(shard_id, &mut *inner, &mut index, key, value);
let needs_sync = result.is_ok() && inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
result
}
fn delete_no_hook(&self, key: &K) -> DbResult<Option<[u8; V]>> {
let shard_id = self.shard_for(key);
let mut inner = self.durability.lock_shard(shard_id);
let mut index = sync::lock(&self.indexes[shard_id]);
let result = self.delete_locked(shard_id, &mut *inner, &mut index, key);
let needs_sync = result.is_ok() && inner.should_sync();
drop(index);
drop(inner);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
result
}
pub fn atomic<R>(
&self,
shard_key: &K,
f: impl FnOnce(&mut ConstMapShard<'_, K, V, H, D>) -> DbResult<R>,
) -> DbResult<R> {
let shard_id = self.shard_for(shard_key);
let inner = self.durability.lock_shard(shard_id);
let index = sync::lock(&self.indexes[shard_id]);
let mut shard = ConstMapShard {
tree: self,
inner,
index,
shard_id,
};
let result = f(&mut shard);
let needs_sync = shard.inner.should_sync();
drop(shard);
if needs_sync {
self.durability.lock_shard(shard_id).sync()?;
}
result
}
fn put_locked(
&self,
shard_id: usize,
inner: &mut D::Inner,
index: &mut HashMap<K, MapEntry<V, D::Loc>>,
key: &K,
value: &[u8; V],
) -> DbResult<Option<[u8; V]>> {
if let Some(existing) = index.get(key) {
let old_value = existing.value;
let old_loc = existing.loc;
let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), value)?;
index.insert(
*key,
MapEntry {
loc: new_loc,
value: *value,
},
);
return Ok(Some(old_value));
}
let loc = inner.write_new(shard_id as u8, key.as_bytes(), value)?;
index.insert(*key, MapEntry { loc, value: *value });
Ok(None)
}
fn insert_locked(
&self,
shard_id: usize,
inner: &mut D::Inner,
index: &mut HashMap<K, MapEntry<V, D::Loc>>,
key: &K,
value: &[u8; V],
) -> DbResult<()> {
if index.contains_key(key) {
return Err(DbError::KeyExists);
}
let loc = inner.write_new(shard_id as u8, key.as_bytes(), value)?;
index.insert(*key, MapEntry { loc, value: *value });
Ok(())
}
fn delete_locked(
&self,
shard_id: usize,
inner: &mut D::Inner,
index: &mut HashMap<K, MapEntry<V, D::Loc>>,
key: &K,
) -> DbResult<Option<[u8; V]>> {
let old = match index.get(key) {
Some(old) => old,
None => return Ok(None),
};
inner.write_tombstone(shard_id as u8, old.loc, key.as_bytes())?;
let value = old.value;
index.remove(key);
Ok(Some(value))
}
pub fn shard_for(&self, key: &K) -> usize {
if self.shard_prefix_bits == 0 || self.shard_prefix_bits >= size_of::<K>() * 8 {
let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
return (hash as usize) % self.durability.shard_count();
}
let full_bytes = self.shard_prefix_bits / 8;
let extra_bits = self.shard_prefix_bits % 8;
let hash = if extra_bits == 0 {
xxhash_rust::xxh3::xxh3_64(&key.as_bytes()[..full_bytes])
} else {
let mut buf = K::zeroed();
buf.as_bytes_mut()[..full_bytes].copy_from_slice(&key.as_bytes()[..full_bytes]);
let mask = !((1u8 << (8 - extra_bits)) - 1);
buf.as_bytes_mut()[full_bytes] = key.as_bytes()[full_bytes] & mask;
xxhash_rust::xxh3::xxh3_64(&buf.as_bytes()[..full_bytes + 1])
};
(hash as usize) % self.durability.shard_count()
}
pub fn flush(&self) -> DbResult<()> {
self.durability.flush()
}
pub fn migrate(
&self,
f: impl Fn(&K, &[u8; V]) -> crate::MigrateAction<[u8; V]>,
) -> DbResult<usize> {
use crate::MigrateAction;
let mut count = 0;
for i in 0..self.durability.shard_count() {
let entries: Vec<(K, [u8; V])> = {
let index = sync::lock(&self.indexes[i]);
index.iter().map(|(k, e)| (*k, e.value)).collect()
};
for (key, value) in entries {
match f(&key, &value) {
MigrateAction::Keep => {
if H::NEEDS_INIT {
self.hook.on_init(&key, &value[..]);
}
}
MigrateAction::Update(new_value) => {
self.put_no_hook(&key, &new_value)?;
if H::NEEDS_INIT {
self.hook.on_init(&key, &new_value[..]);
}
count += 1;
}
MigrateAction::Delete => {
self.delete_no_hook(&key)?;
count += 1;
}
}
}
}
tracing::info!(mutations = count, "const_map migration complete");
Ok(count)
}
pub(crate) fn replay_init(&self) {
if !H::NEEDS_INIT {
return;
}
for shard in &self.indexes {
let entries: Vec<(K, [u8; V])> = {
let index = sync::lock(shard);
index.iter().map(|(k, e)| (*k, e.value)).collect()
};
for (key, value) in entries {
self.hook.on_init(&key, &value[..]);
}
}
}
}
#[cfg(feature = "replication")]
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>>
crate::replication::ReplicationTarget for ConstMap<K, V, H, Bitcask>
{
fn apply_entry(
&self,
_shard_inner: &mut crate::shard::ShardInner,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
key: &[u8],
value: &[u8],
) -> DbResult<crate::replication::ApplyOutcome> {
use crate::replication::ApplyOutcome;
let key = K::from_bytes(key);
let value_offset =
entry_offset + size_of::<crate::entry::EntryHeader>() as u64 + size_of::<K>() as u64;
let disk = DiskLoc::new(shard_id, file_id, value_offset as u32, header.value_len);
if header.is_tombstone() {
let old = sync::lock(&self.indexes[self.shard_for(&key)]).remove(&key);
match old {
Some(old_entry) => Ok(ApplyOutcome::TombstoneRemoved(old_entry.loc)),
None => Ok(ApplyOutcome::Inserted), }
} else {
let value: [u8; V] = value.try_into().map_err(|_| DbError::CorruptedEntry {
offset: entry_offset,
})?;
let old = sync::lock(&self.indexes[self.shard_for(&key)])
.insert(key, MapEntry { loc: disk, value });
match old {
Some(old_entry) => Ok(ApplyOutcome::Replaced(old_entry.loc)),
None => Ok(ApplyOutcome::Inserted),
}
}
}
fn try_apply_entry(
&self,
shard_inner: &mut crate::shard::ShardInner,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
raw_after_header: &[u8],
) -> DbResult<crate::replication::ApplyOutcome> {
use crate::replication::ApplyOutcome;
let value_len = header.value_len as usize;
if raw_after_header.len() < size_of::<K>() + value_len {
return Ok(ApplyOutcome::NotMatched);
}
let key = &raw_after_header[..size_of::<K>()];
let value = &raw_after_header[size_of::<K>()..size_of::<K>() + value_len];
let crc = crate::entry::compute_crc32(header.gsn, header.value_len, key, value);
if crc != header.crc32 {
return Ok(ApplyOutcome::NotMatched);
}
self.apply_entry(
shard_inner,
shard_id,
file_id,
entry_offset,
header,
key,
value,
)
}
fn key_len(&self) -> usize {
size_of::<K>()
}
}
pub struct ConstMapShard<
'a,
K: Key + Send + Sync + Hash + Eq,
const V: usize,
H: WriteHook<K> = NoHook,
D: Durability = Bitcask,
> {
tree: &'a ConstMap<K, V, H, D>,
inner: MutexGuard<'a, D::Inner>,
index: MutexGuard<'a, HashMap<K, MapEntry<V, D::Loc>>>,
shard_id: usize,
}
impl<K: Key + Send + Sync + Hash + Eq, const V: usize, H: WriteHook<K>, D: Durability>
ConstMapShard<'_, K, V, H, D>
{
pub fn put(&mut self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
self.check_shard(key)?;
self.tree
.put_locked(self.shard_id, &mut *self.inner, &mut self.index, key, value)
}
pub fn insert(&mut self, key: &K, value: &[u8; V]) -> DbResult<()> {
self.check_shard(key)?;
self.tree
.insert_locked(self.shard_id, &mut *self.inner, &mut self.index, key, value)
}
pub fn delete(&mut self, key: &K) -> DbResult<Option<[u8; V]>> {
self.check_shard(key)?;
self.tree
.delete_locked(self.shard_id, &mut *self.inner, &mut self.index, key)
}
pub fn get(&self, key: &K) -> Option<[u8; V]> {
self.index.get(key).map(|e| e.value)
}
pub fn get_or_err(&self, key: &K) -> DbResult<[u8; V]> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn contains(&self, key: &K) -> bool {
self.index.contains_key(key)
}
fn check_shard(&self, key: &K) -> DbResult<()> {
if self.tree.shard_for(key) != self.shard_id {
return Err(DbError::ShardMismatch);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Config;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::time::Duration;
use tempfile::tempdir;
#[allow(clippy::type_complexity)]
struct ReplayInitReenterHook<const V: usize> {
target: sync::Mutex<Option<std::sync::Arc<ConstMap<[u8; 8], V, Self>>>>,
entered: AtomicBool,
}
impl<const V: usize> WriteHook<[u8; 8]> for ReplayInitReenterHook<V> {
const NEEDS_OLD_VALUE: bool = false;
const NEEDS_INIT: bool = true;
const NEEDS_WRITE: bool = false;
fn on_write(&self, _key: &[u8; 8], _old: Option<&[u8]>, _new: Option<&[u8]>) {}
fn on_init(&self, key: &[u8; 8], _value: &[u8]) {
self.entered.store(true, Ordering::Release);
let map = sync::lock(&self.target)
.as_ref()
.expect("hook target must be installed")
.clone();
let _ = map.get(key);
}
}
#[test]
fn hook_lifecycle_replay_init_reentry_const_map() {
let dir = tempdir().unwrap();
let hook = ReplayInitReenterHook::<8> {
target: sync::Mutex::new(None),
entered: AtomicBool::new(false),
};
let map = std::sync::Arc::new(
ConstMap::<[u8; 8], 8, ReplayInitReenterHook<8>>::open_hooked(
dir.path(),
Config::test(),
hook,
)
.unwrap(),
);
*sync::lock(&map.hook.target) = Some(std::sync::Arc::clone(&map));
let key = 1u64.to_be_bytes();
map.put(&key, &[0u8; 8]).unwrap();
let (tx, rx) = mpsc::sync_channel(1);
let map2 = std::sync::Arc::clone(&map);
std::thread::spawn(move || {
map2.replay_init();
let _ = tx.send(());
});
rx.recv_timeout(Duration::from_secs(2))
.expect("replay_init deadlocked — on_init called with index lock held");
assert!(map.hook.entered.load(Ordering::Acquire));
}
}