use std::collections::HashMap;
use std::hash::Hash;
use std::mem::size_of;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use crate::Key;
use crate::byte_view::ByteView;
use crate::cache::{BlockCache, BlockKey};
use crate::compaction::{CompactionIndex, compact_shard};
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::engine::Engine;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, WriteHook};
use crate::io::aligned_buf::AlignedBuf;
use crate::recovery::recover_var_map;
use crate::shard::ShardInner;
use crate::sync::{self, Mutex, MutexGuard};
const MAX_STALE_RETRIES: usize = 3;
pub struct VarMap<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K> = NoHook> {
indexes: Vec<Mutex<HashMap<K, DiskLoc>>>,
engine: Engine,
cache: BlockCache,
compaction_threshold: f64,
shard_prefix_bits: usize,
hook: H,
entry_count: AtomicUsize,
}
impl<K: Key + Send + Sync + Hash + Eq> VarMap<K> {
pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
Self::open_inner(path, config, NoHook)
}
}
impl<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K>> VarMap<K, H> {
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
hook: H,
) -> DbResult<Self> {
Self::open_inner(path, config, hook)
}
fn open_inner(path: impl AsRef<std::path::Path>, config: Config, hook: H) -> DbResult<Self> {
let compaction_threshold = config.compaction_threshold;
let shard_prefix_bits = config.shard_prefix_bits;
let cache = BlockCache::new(&config.cache);
let engine = Engine::open(path, config)?;
let shard_count = engine.shards().len();
let mut indexes = Vec::with_capacity(shard_count);
for _ in 0..shard_count {
indexes.push(Mutex::new(HashMap::new()));
}
let map = Self {
indexes,
engine,
cache,
compaction_threshold,
shard_prefix_bits,
hook,
entry_count: AtomicUsize::new(0),
};
let shard_dirs = map.engine.shard_dirs();
let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
let shard_ids = map.engine.shard_ids();
let hints = map.engine.hints();
let outcome = recover_var_map::<K>(
&shard_dir_refs,
&shard_ids,
map.indexes(),
hints,
#[cfg(feature = "encryption")]
map.engine.cipher(),
)?;
for tail in &outcome.active_tails {
map.engine.shards()[tail.shard_idx].apply_recovery_tail(tail)?;
}
for (shard_idx, dead) in outcome.shard_dead_bytes {
map.engine.shards()[shard_idx].install_dead_bytes(dead);
}
let max_gsn = outcome.max_gsn;
let initial_len: usize = map.indexes.iter().map(|m| sync::lock(m).len()).sum();
map.entry_count.store(initial_len, AtomicOrdering::Relaxed);
map.engine
.gsn()
.fetch_max(max_gsn + 1, AtomicOrdering::Relaxed);
if hints {
for shard in map.engine.shards().iter() {
shard.set_key_len(size_of::<K>());
}
}
tracing::info!(
key_size = size_of::<K>(),
entries = map.len(),
"var_map recovered"
);
Ok(map)
}
pub fn close(self) -> DbResult<()> {
if self.engine.hints() {
self.sync_hints()?;
}
self.engine.flush()
}
pub fn flush_buffers(&self) -> DbResult<()> {
self.engine.flush_buffers()
}
pub fn config(&self) -> &Config {
self.engine.config()
}
}
impl<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K>> CompactionIndex<K> for VarMap<K, H> {
fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
if let Some(disk) = index.get_mut(key)
&& *disk == old_loc
{
*disk = new_loc;
return true;
}
false
}
fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
self.cache.invalidate_file(shard_id, file_id, total_bytes);
}
fn contains_key(&self, key: &K) -> bool {
self.contains(key)
}
}
impl<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K>> VarMap<K, H> {
pub fn compact(&self) -> DbResult<usize> {
let mut total_compacted = 0;
for shard in self.engine.shards().iter() {
total_compacted += compact_shard(shard, self, self.compaction_threshold)?;
}
Ok(total_compacted)
}
pub fn get(&self, key: &K) -> Option<ByteView> {
metrics::counter!("armdb.ops", "op" => "get", "tree" => "var_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_map.get");
let shard_id = self.shard_for(key);
for _ in 0..MAX_STALE_RETRIES {
let disk = {
let index = sync::lock(&self.indexes[shard_id]);
match index.get(key) {
Some(d) => *d,
None => return None,
}
};
match self.read_value_cached_inner(&disk) {
Ok(v) => return Some(v),
Err(DbError::StaleDiskLoc) => {
metrics::counter!("armdb.read.stale_retry", "tree" => "var_map").increment(1);
continue;
}
Err(_e) => {
#[cfg(feature = "hot-path-tracing")]
tracing::error!("VarMap read_value_cached error: {:?}", _e);
return None;
}
}
}
None
}
pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn put(&self, key: &K, value: &[u8]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "put", "tree" => "var_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_map.put");
let shard_id = self.shard_for(key);
let mut inner = self.engine.shards()[shard_id].lock();
let mut index = sync::lock(&self.indexes[shard_id]);
let old_value = if H::NEEDS_OLD_VALUE {
if let Some(disk) = index.get(key) {
Some(self.read_value_locked_result(disk, &inner)?)
} else {
None
}
} else {
None
};
self.put_locked(shard_id, &mut inner, &mut index, key, value)?;
drop(index);
drop(inner);
self.hook.on_write(key, old_value.as_deref(), Some(value));
Ok(())
}
pub fn insert(&self, key: &K, value: &[u8]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "insert", "tree" => "var_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_map.insert");
let shard_id = self.shard_for(key);
let mut inner = self.engine.shards()[shard_id].lock();
let mut index = sync::lock(&self.indexes[shard_id]);
self.insert_locked(shard_id, &mut inner, &mut index, key, value)?;
drop(index);
drop(inner);
self.hook.on_write(key, None, Some(value));
Ok(())
}
pub fn delete(&self, key: &K) -> DbResult<bool> {
metrics::counter!("armdb.ops", "op" => "delete", "tree" => "var_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_map.delete");
let shard_id = self.shard_for(key);
let mut inner = self.engine.shards()[shard_id].lock();
let mut index = sync::lock(&self.indexes[shard_id]);
let old_value = if H::NEEDS_OLD_VALUE {
if let Some(disk) = index.get(key) {
Some(self.read_value_locked_result(disk, &inner)?)
} else {
None
}
} else {
None
};
let existed = self.delete_locked(shard_id, &mut inner, &mut index, key)?;
drop(index);
drop(inner);
if existed {
self.hook.on_write(key, old_value.as_deref(), None);
}
Ok(existed)
}
pub fn cas(&self, key: &K, expected: &[u8], new_value: &[u8]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "cas", "tree" => "var_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_map.cas");
let shard_id = self.shard_for(key);
let mut inner = self.engine.shards()[shard_id].lock();
let mut index = sync::lock(&self.indexes[shard_id]);
let disk = *index.get(key).ok_or(DbError::KeyNotFound)?;
let current = self
.read_value_locked(&disk, &inner)
.ok_or(DbError::KeyNotFound)?;
if current.as_ref() != expected {
return Err(DbError::CasMismatch);
}
let (new_disk_loc, _gsn) =
inner.append_entry(shard_id as u8, key.as_bytes(), new_value, false)?;
inner.add_dead_bytes(
disk.file_id,
crate::entry::entry_size(size_of::<K>(), disk.len),
);
let _old = index.insert(*key, new_disk_loc);
debug_assert!(_old.is_some(), "cas: key must exist in index");
drop(index);
drop(inner);
self.hook.on_write(
key,
if H::NEEDS_OLD_VALUE {
Some(&*current)
} else {
None
},
Some(new_value),
);
Ok(())
}
pub fn update(&self, key: &K, f: impl FnOnce(&[u8]) -> ByteView) -> DbResult<Option<ByteView>> {
self.update_inner(key, f, false)
}
pub fn fetch_update(
&self,
key: &K,
f: impl FnOnce(&[u8]) -> ByteView,
) -> DbResult<Option<ByteView>> {
self.update_inner(key, f, true)
}
pub(crate) fn try_update_inner(
&self,
key: &K,
f: impl FnOnce(&[u8]) -> DbResult<Option<ByteView>>,
return_old: bool,
) -> DbResult<Option<ByteView>> {
metrics::counter!("armdb.ops", "op" => "update", "tree" => "var_map").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_map.update");
let shard_id = self.shard_for(key);
let mut inner = self.engine.shards()[shard_id].lock();
let mut index = sync::lock(&self.indexes[shard_id]);
let disk = match index.get(key) {
Some(d) => *d,
None => return Ok(None),
};
let current = match self.read_value_locked(&disk, &inner) {
Some(v) => v,
None => return Ok(None),
};
let new_value = match f(¤t)? {
Some(v) => v,
None => return Ok(Some(current)),
};
let (new_disk_loc, _gsn) =
inner.append_entry(shard_id as u8, key.as_bytes(), &new_value, false)?;
inner.add_dead_bytes(
disk.file_id,
crate::entry::entry_size(size_of::<K>(), disk.len),
);
let _old = index.insert(*key, new_disk_loc);
debug_assert!(_old.is_some(), "update: key must exist in index");
drop(index);
drop(inner);
self.hook.on_write(
key,
if H::NEEDS_OLD_VALUE {
Some(&*current)
} else {
None
},
Some(&*new_value),
);
Ok(Some(if return_old { current } else { new_value }))
}
fn update_inner(
&self,
key: &K,
f: impl FnOnce(&[u8]) -> ByteView,
return_old: bool,
) -> DbResult<Option<ByteView>> {
self.try_update_inner(key, |bytes| Ok(Some(f(bytes))), return_old)
}
pub fn contains(&self, key: &K) -> bool {
let index = sync::lock(&self.indexes[self.shard_for(key)]);
index.contains_key(key)
}
pub fn entry_len(&self, key: &K) -> Option<u32> {
let index = sync::lock(&self.indexes[self.shard_for(key)]);
index.get(key).map(|disk| disk.len)
}
pub fn len(&self) -> usize {
self.entry_count.load(AtomicOrdering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn sync_hints(&self) -> DbResult<()> {
for shard in self.engine.shards().iter() {
shard.write_active_hint(size_of::<K>())?;
}
Ok(())
}
pub fn warmup(&self) -> DbResult<()> {
use std::collections::BTreeSet;
let mut blocks: BTreeSet<(u8, u32, u64)> = BTreeSet::new();
for index in self.indexes.iter() {
let map = sync::lock(index);
for disk in map.values() {
let block_offset = disk.offset as u64 & !4095;
blocks.insert((disk.shard_id, disk.file_id, block_offset));
}
}
for (shard_id, file_id, block_offset) in &blocks {
let key = BlockKey {
shard_id: *shard_id,
file_id: *file_id,
block_offset: *block_offset,
};
if self.cache.get(&key).is_some() {
continue;
}
let shard = &self.engine.shards()[*shard_id as usize];
let (buf, is_full_block) = shard.read_block(*file_id, *block_offset)?;
if is_full_block {
self.cache.insert(key, Arc::new(buf));
}
}
Ok(())
}
pub fn migrate(
&self,
f: impl Fn(&K, &[u8]) -> crate::MigrateAction<ByteView>,
) -> DbResult<usize> {
use crate::MigrateAction;
let mut count = 0;
for i in 0..self.engine.shards().len() {
let keys: Vec<K> = {
let index = sync::lock(&self.indexes[i]);
index.keys().copied().collect()
};
for key in keys {
let value = match self.get(&key) {
Some(v) => v,
None => {
tracing::warn!(
key = ?key.as_bytes(),
"var_map migrate: skipping entry — value read failed"
);
continue;
}
};
match f(&key, &value) {
MigrateAction::Keep => {
if H::NEEDS_INIT {
self.hook.on_init(&key, &value);
}
}
MigrateAction::Update(new_value) => {
{
let mut inner = self.engine.shards()[i].lock();
let mut index = sync::lock(&self.indexes[i]);
self.put_locked(i, &mut inner, &mut index, &key, &new_value)?;
}
if H::NEEDS_INIT {
self.hook.on_init(&key, &new_value);
}
count += 1;
}
MigrateAction::Delete => {
let mut inner = self.engine.shards()[i].lock();
let mut index = sync::lock(&self.indexes[i]);
self.delete_locked(i, &mut inner, &mut index, &key)?;
count += 1;
}
}
}
}
tracing::info!(mutations = count, "var_map migration complete");
Ok(count)
}
pub(crate) fn replay_init(&self) {
if !H::NEEDS_INIT {
return;
}
let mut count = 0usize;
for i in 0..self.engine.shards().len() {
let keys: Vec<K> = {
let index = sync::lock(&self.indexes[i]);
index.keys().copied().collect()
};
for key in keys {
let value = match self.get(&key) {
Some(v) => v,
None => {
tracing::warn!(
key = ?key.as_bytes(),
"var_map replay_init: skipping entry — value read failed"
);
continue;
}
};
self.hook.on_init(&key, &value);
count += 1;
}
}
tracing::debug!(replayed = count, "var_map replay_init complete");
}
pub fn atomic<R>(
&self,
shard_key: &K,
f: impl FnOnce(&mut VarMapShard<'_, K, H>) -> DbResult<R>,
) -> DbResult<R> {
let shard_id = self.shard_for(shard_key);
let inner = self.engine.shards()[shard_id].lock();
let index = sync::lock(&self.indexes[shard_id]);
let mut shard = VarMapShard {
tree: self,
inner,
index,
shard_id,
};
f(&mut shard)
}
pub(crate) fn indexes(&self) -> &[Mutex<HashMap<K, DiskLoc>>] {
&self.indexes
}
fn put_locked(
&self,
shard_id: usize,
inner: &mut ShardInner,
index: &mut HashMap<K, DiskLoc>,
key: &K,
value: &[u8],
) -> DbResult<()> {
let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
if let Some(old_disk) = index.insert(*key, disk_loc) {
inner.add_dead_bytes(
old_disk.file_id,
crate::entry::entry_size(size_of::<K>(), old_disk.len),
);
} else {
self.entry_count.fetch_add(1, AtomicOrdering::Relaxed);
}
Ok(())
}
fn insert_locked(
&self,
shard_id: usize,
inner: &mut ShardInner,
index: &mut HashMap<K, DiskLoc>,
key: &K,
value: &[u8],
) -> DbResult<()> {
if index.contains_key(key) {
return Err(DbError::KeyExists);
}
let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
index.insert(*key, disk_loc);
self.entry_count.fetch_add(1, AtomicOrdering::Relaxed);
Ok(())
}
fn delete_locked(
&self,
shard_id: usize,
inner: &mut ShardInner,
index: &mut HashMap<K, DiskLoc>,
key: &K,
) -> DbResult<bool> {
if !index.contains_key(key) {
return Ok(false);
}
inner.append_entry(shard_id as u8, key.as_bytes(), &[], true)?;
if let Some(old_disk) = index.remove(key) {
inner.add_dead_bytes(
old_disk.file_id,
crate::entry::entry_size(size_of::<K>(), old_disk.len),
);
self.entry_count.fetch_sub(1, AtomicOrdering::Relaxed);
Ok(true)
} else {
Ok(false)
}
}
fn read_value_cached_inner(&self, disk: &DiskLoc) -> DbResult<ByteView> {
let len = disk.len as usize;
let start = (disk.offset & 4095) as usize;
if start + len > 8192 {
let shard = &self.engine.shards()[disk.shard_id as usize];
let inner = shard.lock();
return self.read_value_locked_result(disk, &inner);
}
let block_offset = disk.offset as u64 & !4095;
let cache_key = BlockKey {
shard_id: disk.shard_id,
file_id: disk.file_id,
block_offset,
};
if let Some(block) = self.cache.get(&cache_key) {
metrics::counter!("armdb.cache.hit").increment(1);
return Self::extract_from_block(&block, start, len, || {
self.get_or_read_block(disk.shard_id, disk.file_id, block_offset + 4096)
});
}
{
let shard = &self.engine.shards()[disk.shard_id as usize];
let inner = shard.lock();
if inner.active.file_id == disk.file_id
&& let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
{
return Ok(ByteView::new(bytes));
}
}
metrics::counter!("armdb.cache.miss").increment(1);
let block = self.get_or_read_block(disk.shard_id, disk.file_id, block_offset)?;
Self::extract_from_block(&block, start, len, || {
self.get_or_read_block(disk.shard_id, disk.file_id, block_offset + 4096)
})
}
fn extract_from_block(
block: &AlignedBuf,
start: usize,
len: usize,
next_block: impl FnOnce() -> DbResult<Arc<AlignedBuf>>,
) -> DbResult<ByteView> {
debug_assert!(
start + len <= 8192,
"extract_from_block supports at most 2 blocks (8192 bytes)"
);
if start + len <= 4096 {
Ok(ByteView::new(&block[start..start + len]))
} else {
let next = next_block()?;
let first_part = &block[start..];
let second_len = len - first_part.len();
let mut combined = Vec::with_capacity(len);
combined.extend_from_slice(first_part);
combined.extend_from_slice(&next[..second_len]);
Ok(ByteView::from_vec(combined))
}
}
fn get_or_read_block(
&self,
shard_id: u8,
file_id: u32,
block_offset: u64,
) -> DbResult<Arc<AlignedBuf>> {
let key = BlockKey {
shard_id,
file_id,
block_offset,
};
if let Some(cached) = self.cache.get(&key) {
return Ok(cached);
}
let shard = &self.engine.shards()[shard_id as usize];
let (buf, is_full_block) = shard.read_block(file_id, block_offset)?;
let arc = Arc::new(buf);
if is_full_block {
self.cache.insert(key, arc.clone());
}
Ok(arc)
}
fn read_value_locked_result(&self, disk: &DiskLoc, inner: &ShardInner) -> DbResult<ByteView> {
let len = disk.len as usize;
if inner.active.file_id == disk.file_id
&& let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
{
return Ok(ByteView::new(bytes));
}
let block_offset = disk.offset as u64 & !4095;
let start = (disk.offset & 4095) as usize;
if start + len <= 4096 {
let cache_key = BlockKey {
shard_id: disk.shard_id,
file_id: disk.file_id,
block_offset,
};
if let Some(block) = self.cache.get(&cache_key) {
return Ok(ByteView::new(&block[start..start + len]));
}
let (buf, is_full_block) = inner.read_block_locked(disk.file_id, block_offset)?;
let arc = Arc::new(buf);
if is_full_block {
self.cache.insert(cache_key, arc.clone());
}
return Ok(ByteView::new(&arc[start..start + len]));
}
let bytes = inner.read_value_from_disk_locked(disk)?;
Ok(ByteView::new(&bytes))
}
fn read_value_locked(&self, disk: &DiskLoc, inner: &ShardInner) -> Option<ByteView> {
match self.read_value_locked_result(disk, inner) {
Ok(v) => Some(v),
Err(DbError::StaleDiskLoc) => {
tracing::error!(
file_id = disk.file_id,
shard_id = disk.shard_id,
"stale DiskLoc under shard lock - programming bug",
);
None
}
Err(_) => None,
}
}
pub fn shard_for(&self, key: &K) -> usize {
crate::shard_for_key(key, self.shard_prefix_bits, self.engine.shards().len())
}
}
#[cfg(feature = "replication")]
impl<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K>> crate::replication::ReplicationTarget
for VarMap<K, H>
{
fn apply_entry(
&self,
_shard_inner: &mut crate::shard::ShardInner,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
key: &[u8],
_value: &[u8],
) -> DbResult<crate::replication::ApplyOutcome> {
use crate::replication::ApplyOutcome;
let key = K::from_bytes(key);
let disk = DiskLoc::new(
shard_id,
file_id,
(entry_offset + size_of::<crate::entry::EntryHeader>() as u64 + size_of::<K>() as u64)
as u32,
header.value_len,
);
if header.is_tombstone() {
let old = sync::lock(&self.indexes[self.shard_for(&key)]).remove(&key);
match old {
Some(old_disk) => {
self.entry_count.fetch_sub(1, AtomicOrdering::Relaxed);
Ok(ApplyOutcome::TombstoneRemoved(old_disk))
}
None => Ok(ApplyOutcome::Inserted), }
} else {
let old = sync::lock(&self.indexes[self.shard_for(&key)]).insert(key, disk);
match old {
Some(old_disk) => Ok(ApplyOutcome::Replaced(old_disk)),
None => {
self.entry_count.fetch_add(1, AtomicOrdering::Relaxed);
Ok(ApplyOutcome::Inserted)
}
}
}
}
fn try_apply_entry(
&self,
shard_inner: &mut crate::shard::ShardInner,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
raw_after_header: &[u8],
) -> DbResult<crate::replication::ApplyOutcome> {
use crate::replication::ApplyOutcome;
if raw_after_header.len() < size_of::<K>() + header.value_len as usize {
return Ok(ApplyOutcome::NotMatched);
}
let key = &raw_after_header[..size_of::<K>()];
let value = &raw_after_header[size_of::<K>()..size_of::<K>() + header.value_len as usize];
let crc = crate::entry::compute_crc32(header.gsn, header.value_len, key, value);
if crc != header.crc32 {
return Ok(ApplyOutcome::NotMatched);
}
self.apply_entry(
shard_inner,
shard_id,
file_id,
entry_offset,
header,
key,
value,
)
}
fn key_len(&self) -> usize {
size_of::<K>()
}
}
#[cfg(feature = "replication")]
impl<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K>> VarMap<K, H> {
pub fn start_replication_server(
&self,
bind_addr: std::net::SocketAddr,
signal: crate::shutdown::ShutdownSignal,
) -> crate::error::DbResult<crate::replication::ReplicationServer> {
let consumers = self.install_replication_producers()?;
crate::replication::ReplicationServer::start(
bind_addr,
self.engine.shards().clone(),
consumers,
self.engine.config().max_file_size,
signal,
)
}
fn install_replication_producers(
&self,
) -> crate::error::DbResult<Vec<rtrb::Consumer<crate::replication::ReplicationEntry>>> {
const SPSC_CAPACITY: usize = 4096;
let shards = self.engine.shards();
let mut consumers = Vec::with_capacity(shards.len());
for shard in shards.iter() {
let (p, c) = rtrb::RingBuffer::new(SPSC_CAPACITY);
shard.set_replication_producer(p);
consumers.push(c);
}
Ok(consumers)
}
pub fn start_replication_client(
&self,
leader_addr: std::net::SocketAddr,
registry: std::sync::Arc<crate::replication::ReplicationRegistry>,
signal: crate::shutdown::ShutdownSignal,
) -> crate::error::DbResult<crate::replication::ReplicationClient> {
crate::replication::ReplicationClient::start(
leader_addr,
self.engine.shards().clone(),
registry,
size_of::<K>() as u16,
signal,
)
}
}
#[cfg(feature = "replication")]
impl<K, H> VarMap<K, H>
where
K: Key + Send + Sync + Hash + Eq + 'static,
H: WriteHook<K> + Send + Sync + 'static,
{
pub fn as_replication_target(
self: &std::sync::Arc<Self>,
) -> Box<dyn crate::replication::ReplicationTarget> {
Box::new(std::sync::Arc::clone(self))
}
}
pub struct VarMapShard<'a, K: Key + Send + Sync + Hash + Eq, H: WriteHook<K> = NoHook> {
tree: &'a VarMap<K, H>,
inner: MutexGuard<'a, ShardInner>,
index: MutexGuard<'a, HashMap<K, DiskLoc>>,
shard_id: usize,
}
impl<K: Key + Send + Sync + Hash + Eq, H: WriteHook<K>> VarMapShard<'_, K, H> {
pub fn put(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
self.check_shard(key)?;
self.tree
.put_locked(self.shard_id, &mut self.inner, &mut self.index, key, value)
}
pub fn insert(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
self.check_shard(key)?;
self.tree
.insert_locked(self.shard_id, &mut self.inner, &mut self.index, key, value)
}
pub fn delete(&mut self, key: &K) -> DbResult<bool> {
self.check_shard(key)?;
self.tree
.delete_locked(self.shard_id, &mut self.inner, &mut self.index, key)
}
pub fn get(&self, key: &K) -> Option<ByteView> {
let disk = *self.index.get(key)?;
self.tree.read_value_locked(&disk, &self.inner)
}
pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn contains(&self, key: &K) -> bool {
self.index.contains_key(key)
}
fn check_shard(&self, key: &K) -> DbResult<()> {
if self.tree.shard_for(key) != self.shard_id {
return Err(DbError::ShardMismatch);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Config;
use crate::compaction::compact_shard;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use tempfile::tempdir;
#[derive(Default)]
struct CountingHook<const NEEDS_INIT: bool, const NEEDS_OLD: bool> {
writes: AtomicUsize,
writes_with_old: AtomicUsize,
inits: AtomicUsize,
last_write_new: crate::sync::Mutex<Option<Vec<u8>>>,
last_init_value: crate::sync::Mutex<Option<Vec<u8>>>,
}
impl<const NEEDS_INIT: bool, const NEEDS_OLD: bool> WriteHook<[u8; 8]>
for CountingHook<NEEDS_INIT, NEEDS_OLD>
{
const NEEDS_OLD_VALUE: bool = NEEDS_OLD;
const NEEDS_INIT: bool = NEEDS_INIT;
fn on_write(&self, _key: &[u8; 8], old: Option<&[u8]>, new: Option<&[u8]>) {
self.writes.fetch_add(1, AtomicOrdering::Relaxed);
if old.is_some() {
self.writes_with_old.fetch_add(1, AtomicOrdering::Relaxed);
}
*crate::sync::lock(&self.last_write_new) = new.map(<[u8]>::to_vec);
}
fn on_init(&self, _key: &[u8; 8], value: &[u8]) {
self.inits.fetch_add(1, AtomicOrdering::Relaxed);
*crate::sync::lock(&self.last_init_value) = Some(value.to_vec());
}
}
fn open_test_map_hooked<const NEEDS_INIT: bool, const NEEDS_OLD: bool>(
dir: &std::path::Path,
hook: CountingHook<NEEDS_INIT, NEEDS_OLD>,
) -> VarMap<[u8; 8], CountingHook<NEEDS_INIT, NEEDS_OLD>> {
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 8192;
cfg.write_buffer_size = 8192;
cfg.compaction_threshold = 0.0;
VarMap::open_hooked(dir, cfg, hook).expect("open hooked test map")
}
fn open_test_map(dir: &std::path::Path) -> VarMap<[u8; 8]> {
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 8192;
cfg.write_buffer_size = 8192;
cfg.compaction_threshold = 0.0;
VarMap::open(dir, cfg).expect("open test map")
}
fn put_until_compactable(map: &VarMap<[u8; 8]>, key: [u8; 8]) -> DiskLoc {
map.put(&key, &[0u8; 256]).expect("first put");
let snap = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
for i in 1..65u8 {
map.put(&key, &[i; 256]).expect("overwrite");
}
map.put(&key, b"final-value-payload-XX").expect("final put");
snap
}
#[test]
fn var_map_len_tracks_mutations() {
let dir = tempdir().unwrap();
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), Config::default()).unwrap();
assert_eq!(map.len(), 0);
assert!(map.is_empty());
let k1 = [1u8; 8];
let k2 = [2u8; 8];
map.put(&k1, b"a").unwrap();
assert_eq!(map.len(), 1);
map.put(&k2, b"b").unwrap();
assert_eq!(map.len(), 2);
map.put(&k1, b"c").unwrap();
assert_eq!(map.len(), 2);
map.delete(&k1).unwrap();
assert_eq!(map.len(), 1);
map.delete(&k2).unwrap();
assert_eq!(map.len(), 0);
assert!(map.is_empty());
map.delete(&k1).unwrap();
assert_eq!(map.len(), 0);
}
#[test]
fn var_map_len_survives_reopen() {
let dir = tempdir().unwrap();
{
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), Config::default()).unwrap();
for i in 0u64..50 {
map.put(&i.to_le_bytes(), b"val").unwrap();
}
assert_eq!(map.len(), 50);
}
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), Config::default()).unwrap();
assert_eq!(map.len(), 50);
}
#[test]
fn read_value_cached_inner_returns_stale_after_compaction() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 7u64.to_be_bytes();
let snap = put_until_compactable(&map, key);
let shard = &map.engine.shards()[snap.shard_id as usize];
let _ = compact_shard(shard, &map, 0.0).expect("compaction");
match map.read_value_cached_inner(&snap) {
Err(DbError::StaleDiskLoc) => {}
Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
}
}
#[test]
fn get_during_compaction_returns_some() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 11u64.to_be_bytes();
let _snap = put_until_compactable(&map, key);
let shard = &map.engine.shards()[0];
let _ = compact_shard(shard, &map, 0.0).expect("compaction");
let v = map.get(&key).expect("post-compaction get");
assert_eq!(v.as_bytes(), b"final-value-payload-XX");
}
#[test]
fn get_or_read_block_returns_stale_for_unknown_file_id() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
match map.get_or_read_block(0, 9999, 0) {
Err(DbError::StaleDiskLoc) => {}
Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
}
}
#[test]
fn retry_limit_returns_none_on_persistent_stale() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 99u64.to_be_bytes();
map.put(&key, b"payload").expect("put");
let snap = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
{
let shard = &map.engine.shards()[snap.shard_id as usize];
let mut inner = shard.lock();
inner.immutable = Vec::new();
inner.active.file_id = u32::MAX;
}
assert!(
map.get(&key).is_none(),
"MAX_STALE_RETRIES must terminate the retry loop and return None"
);
}
#[test]
fn var_map_replay_init_fires_on_init_per_live_key_raw() {
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<true, false>(dir.path(), CountingHook::default());
for i in 0u64..5 {
map.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
}
map.delete(&3u64.to_be_bytes()).expect("delete");
map.hook.writes.store(0, AtomicOrdering::Relaxed);
map.hook.inits.store(0, AtomicOrdering::Relaxed);
map.replay_init();
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 4);
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
}
#[test]
fn var_map_replay_init_no_hook_is_noop() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
for i in 0u64..3 {
map.put(&i.to_be_bytes(), &[i as u8; 8]).expect("put");
}
map.replay_init();
assert!(map.get(&0u64.to_be_bytes()).is_some());
}
#[test]
fn var_map_migrate_keep_fires_on_init_not_on_write_raw() {
use crate::MigrateAction;
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<true, false>(dir.path(), CountingHook::default());
for i in 0u64..4 {
map.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
}
map.hook.writes.store(0, AtomicOrdering::Relaxed);
map.hook.inits.store(0, AtomicOrdering::Relaxed);
let mutated = map.migrate(|_, _| MigrateAction::Keep).expect("migrate");
assert_eq!(mutated, 0);
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 4);
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
}
#[test]
fn var_map_migrate_update_fires_on_init_with_new_value_raw() {
use crate::MigrateAction;
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<true, false>(dir.path(), CountingHook::default());
let key = 42u64.to_be_bytes();
map.put(&key, b"old-value").expect("put");
map.hook.writes.store(0, AtomicOrdering::Relaxed);
map.hook.inits.store(0, AtomicOrdering::Relaxed);
let new = ByteView::new(b"new-value");
let mutated = map
.migrate(move |_, _| MigrateAction::Update(new.clone()))
.expect("migrate");
assert_eq!(mutated, 1);
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 1);
let g = crate::sync::lock(&map.hook.last_init_value);
assert_eq!(g.as_deref(), Some(b"new-value".as_ref()));
drop(g);
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
assert_eq!(map.get(&key).unwrap().as_bytes(), b"new-value");
}
#[test]
fn var_map_migrate_delete_fires_no_hooks_raw() {
use crate::MigrateAction;
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<true, false>(dir.path(), CountingHook::default());
let key = 7u64.to_be_bytes();
map.put(&key, b"x").expect("put");
map.hook.writes.store(0, AtomicOrdering::Relaxed);
map.hook.inits.store(0, AtomicOrdering::Relaxed);
let mutated = map.migrate(|_, _| MigrateAction::Delete).expect("migrate");
assert_eq!(mutated, 1);
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 0);
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
assert!(map.get(&key).is_none());
}
#[test]
fn var_map_public_put_still_fires_on_write_once() {
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<true, false>(dir.path(), CountingHook::default());
map.put(&1u64.to_be_bytes(), b"v").expect("put");
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 1);
}
#[test]
fn var_map_migrate_no_init_hook_is_silent_for_keep_and_update() {
use crate::MigrateAction;
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<false, false>(dir.path(), CountingHook::default());
for i in 0u64..3 {
map.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
}
map.hook.writes.store(0, AtomicOrdering::Relaxed);
map.hook.inits.store(0, AtomicOrdering::Relaxed);
map.migrate(|_, _| MigrateAction::Keep)
.expect("migrate keep");
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 0);
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
let new = ByteView::new(b"new");
map.migrate(move |_, _| MigrateAction::Update(new.clone()))
.expect("migrate update");
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 0);
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
}
#[test]
fn var_map_atomic_does_not_fire_hooks() {
let dir = tempdir().unwrap();
let map = open_test_map_hooked::<true, false>(dir.path(), CountingHook::default());
let key = 1u64.to_be_bytes();
map.atomic(&key, |shard| {
shard.put(&key, b"a")?;
shard.delete(&key)?;
Ok(())
})
.expect("atomic");
assert_eq!(map.hook.writes.load(AtomicOrdering::Relaxed), 0);
assert_eq!(map.hook.inits.load(AtomicOrdering::Relaxed), 0);
}
#[test]
fn read_value_locked_result_ok_from_write_buf() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 1u64.to_be_bytes();
let payload = b"in-write-buffer-value";
map.put(&key, payload).expect("put");
let disk = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
let shard = &map.engine.shards()[disk.shard_id as usize];
let inner = shard.lock();
let v = map
.read_value_locked_result(&disk, &inner)
.expect("write-buf read must succeed");
assert_eq!(v.as_bytes(), payload);
}
#[test]
fn read_value_locked_result_ok_from_disk_immutable() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 2u64.to_be_bytes();
let payload = b"single-block-immutable";
map.put(&key, payload).expect("put");
for i in 100u64..135 {
map.put(&i.to_be_bytes(), &[i as u8; 256]).expect("rotator");
}
let disk = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
let shard = &map.engine.shards()[disk.shard_id as usize];
let inner = shard.lock();
assert_ne!(
disk.file_id, inner.active.file_id,
"test setup failed: key=2 entry is still in the active file's write buffer",
);
let v = map
.read_value_locked_result(&disk, &inner)
.expect("disk read must succeed");
assert_eq!(v.as_bytes(), payload);
}
#[test]
fn read_value_locked_result_propagates_stale_disk_loc() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 3u64.to_be_bytes();
let snap = put_until_compactable(&map, key);
let shard = &map.engine.shards()[snap.shard_id as usize];
let _ = compact_shard(shard, &map, 0.0).expect("compaction");
let inner = shard.lock();
match map.read_value_locked_result(&snap, &inner) {
Err(DbError::StaleDiskLoc) => {}
Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
}
}
#[test]
fn read_value_locked_returns_none_on_stale() {
let dir = tempdir().unwrap();
let map = open_test_map(dir.path());
let key = 4u64.to_be_bytes();
let snap = put_until_compactable(&map, key);
let shard = &map.engine.shards()[snap.shard_id as usize];
let _ = compact_shard(shard, &map, 0.0).expect("compaction");
let inner = shard.lock();
assert!(map.read_value_locked(&snap, &inner).is_none());
}
#[test]
fn large_value_with_first_block_cached_uses_fallback() {
let dir = tempdir().unwrap();
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 128 * 1024;
cfg.write_buffer_size = 128 * 1024;
cfg.compaction_threshold = 0.0;
cfg.cache.max_size = 1 << 20;
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), cfg).expect("open");
let key = 42u64.to_be_bytes();
let payload: Vec<u8> = (0..20_000u32).map(|i| i as u8).collect();
map.put(&key, &payload).expect("put large");
map.engine.shards()[0]
.rotate_active_for_test(8)
.expect("rotate");
let disk = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
let start = (disk.offset & 4095) as usize;
assert!(
start + disk.len as usize > 8192,
"test precondition: large value must span >2 blocks",
);
let block_offset = disk.offset as u64 & !4095;
let cache_key = BlockKey {
shard_id: disk.shard_id,
file_id: disk.file_id,
block_offset,
};
map.cache
.insert(cache_key, Arc::new(AlignedBuf::zeroed(4096)));
assert!(
map.cache.get(&cache_key).is_some(),
"cache must be enabled for warm-cache scenario",
);
let v = map
.read_value_cached_inner(&disk)
.expect("large value must read via locked fallback");
assert_eq!(v.as_bytes(), payload.as_slice());
}
#[test]
fn extract_from_block_single_block() {
let mut block = AlignedBuf::zeroed(4096);
for (i, byte) in block.iter_mut().enumerate() {
*byte = i as u8;
}
let v = VarMap::<[u8; 8]>::extract_from_block(&block, 100, 50, || {
panic!("next_block must not be called for single-block reads")
})
.expect("ok");
let expected: Vec<u8> = (100u8..150u8).collect();
assert_eq!(v.as_bytes(), expected.as_slice());
}
#[test]
fn extract_from_block_two_blocks_exact() {
let mut first = AlignedBuf::zeroed(4096);
for byte in first.iter_mut() {
*byte = 0xAA;
}
let mut second = AlignedBuf::zeroed(4096);
for byte in second.iter_mut() {
*byte = 0xBB;
}
let v = VarMap::<[u8; 8]>::extract_from_block(&first, 4095, 4097, || Ok(Arc::new(second)))
.expect("ok");
let bytes = v.as_bytes();
assert_eq!(bytes.len(), 4097);
assert_eq!(bytes[0], 0xAA);
assert_eq!(bytes[1], 0xBB);
assert_eq!(bytes[4096], 0xBB);
}
#[test]
fn extract_from_block_two_blocks_partial() {
let mut first = AlignedBuf::zeroed(4096);
for byte in first.iter_mut() {
*byte = 0x11;
}
let mut second = AlignedBuf::zeroed(4096);
for byte in second.iter_mut() {
*byte = 0x22;
}
let v = VarMap::<[u8; 8]>::extract_from_block(&first, 4000, 200, || Ok(Arc::new(second)))
.expect("ok");
let bytes = v.as_bytes();
assert_eq!(bytes.len(), 200);
assert!(bytes[..96].iter().all(|b| *b == 0x11));
assert!(bytes[96..].iter().all(|b| *b == 0x22));
}
fn open_large_value_map(dir: &std::path::Path) -> VarMap<[u8; 8]> {
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 128 * 1024;
cfg.write_buffer_size = 128 * 1024;
cfg.compaction_threshold = 0.0;
VarMap::open(dir, cfg).expect("open large-value test map")
}
fn build_large_payload(seed: u8) -> Vec<u8> {
(0..50_000u32)
.map(|i| (i as u8).wrapping_add(seed))
.collect()
}
#[test]
fn large_value_read_via_locked_fallback() {
let dir = tempdir().unwrap();
let map = open_large_value_map(dir.path());
let key = 100u64.to_be_bytes();
let payload = build_large_payload(0);
map.put(&key, &payload).expect("put large");
map.engine.shards()[0]
.rotate_active_for_test(8)
.expect("rotate");
let v = map
.get(&key)
.expect("read must succeed via locked fallback");
assert_eq!(v.as_bytes(), payload.as_slice());
}
#[cfg(feature = "encryption")]
#[test]
fn large_value_read_encrypted() {
let dir = tempdir().unwrap();
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 128 * 1024;
cfg.write_buffer_size = 128 * 1024;
cfg.compaction_threshold = 0.0;
cfg.encryption_key = Some([7u8; 32]);
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), cfg).expect("open enc");
let key = 101u64.to_be_bytes();
let payload = build_large_payload(0xAB);
map.put(&key, &payload).expect("put encrypted large");
map.engine.shards()[0]
.rotate_active_for_test(8)
.expect("rotate");
let v = map
.get(&key)
.expect("encrypted large read must succeed via pread_value_encrypted");
assert_eq!(v.as_bytes(), payload.as_slice());
}
#[test]
fn large_value_stale_disk_loc_deterministic() {
let dir = tempdir().unwrap();
let map = open_large_value_map(dir.path());
let key = 102u64.to_be_bytes();
let payload = build_large_payload(0x42);
map.put(&key, &payload).expect("first large put");
let snap = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
map.engine.shards()[0]
.rotate_active_for_test(8)
.expect("rotate after large put");
for i in 1..20u8 {
map.put(&key, &[i; 256]).expect("overwrite");
}
map.put(&key, b"live-after-compaction").expect("final put");
let shard = &map.engine.shards()[snap.shard_id as usize];
let _ = compact_shard(shard, &map, 0.0).expect("compaction");
match map.read_value_cached_inner(&snap) {
Err(DbError::StaleDiskLoc) => {}
Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
}
let v = map
.get(&key)
.expect("public path must retry and return live value");
assert_eq!(v.as_bytes(), b"live-after-compaction");
}
#[test]
fn read_value_locked_result_ok_from_cache_single_block() {
let dir = tempdir().unwrap();
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 8192;
cfg.write_buffer_size = 8192;
cfg.compaction_threshold = 0.0;
cfg.cache.max_size = 1 << 20;
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), cfg).expect("open");
let key = 9u64.to_be_bytes();
let payload = b"small-single-block-value";
map.put(&key, payload).expect("put");
for i in 100u64..135 {
map.put(&i.to_be_bytes(), &[i as u8; 256]).expect("rotator");
}
let disk = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
let start = (disk.offset & 4095) as usize;
let len = disk.len as usize;
assert!(
start + len <= 4096,
"test precondition: value must fit in a single block",
);
let block_offset = disk.offset as u64 & !4095;
let cache_key = BlockKey {
shard_id: disk.shard_id,
file_id: disk.file_id,
block_offset,
};
let block = map
.get_or_read_block(disk.shard_id, disk.file_id, block_offset)
.expect("read block");
map.cache.insert(cache_key, block);
assert!(
map.cache.get(&cache_key).is_some(),
"cache must contain the block for step 2 to fire",
);
let shard = &map.engine.shards()[disk.shard_id as usize];
let inner = shard.lock();
assert_ne!(
disk.file_id, inner.active.file_id,
"test setup failed: key entry is still in the active write buffer",
);
let v = map
.read_value_locked_result(&disk, &inner)
.expect("cache-hit read must succeed");
assert_eq!(v.as_bytes(), payload);
}
#[test]
fn var_map_get_with_file_id_above_u16() {
let dir = tempdir().unwrap();
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 128 * 1024;
cfg.write_buffer_size = 128 * 1024;
cfg.compaction_threshold = 0.0;
cfg.cache.max_size = 1 << 20;
let map: VarMap<[u8; 8]> = VarMap::open(dir.path(), cfg).expect("open");
let shard = &map.engine.shards()[0];
shard.set_next_file_id(70_000);
shard.rotate_active_for_test(8).expect("first rotate");
assert!(
shard.active_file_id() >= 70_000,
"active_file_id should be >= 70_000 after rotation"
);
let key = 42u64.to_be_bytes();
let value = vec![0xC3u8; 512];
map.put(&key, &value).expect("put");
{
let disk = *sync::lock(&map.indexes[0]).get(&key).expect("indexed");
assert!(
disk.file_id > u16::MAX as u32,
"DiskLoc.file_id must be above u16::MAX, got {}",
disk.file_id,
);
}
shard.flush().expect("flush");
shard.rotate_active_for_test(8).expect("second rotate");
let got = map.get(&key).expect("get must return Some");
assert_eq!(got.as_bytes(), value.as_slice());
}
}