use std::collections::HashMap;
use std::hash::Hash;
use std::mem::size_of;
use crate::Key;
use crate::codec::Codec;
use crate::compaction::{CompactionIndex, compact_shard};
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::engine::Engine;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, TypedWriteHook};
use crate::recovery::recover_typed_map;
use crate::shard::ShardInner;
use crate::skiplist::node::TypedData;
use crate::sync::{self, Mutex, MutexGuard};
use crate::typed_tree::TypedRef;
/// One slot in a shard's in-memory index: an owned, heap-allocated
/// `TypedData<T>` (disk location + decoded value) held as a raw pointer so
/// writers can publish a replacement and defer freeing the old allocation
/// through the seize collector while readers may still hold `TypedRef`s.
pub(crate) struct TypedMapEntry<T> {
    // Owning pointer produced by `Box::into_raw`. Freed either via
    // `seize::Collector::retire` when the entry is replaced/removed, or
    // directly in `TypedMap::drop` for entries still live at shutdown.
    pub(crate) ptr: *mut TypedData<T>,
}
// SAFETY: the raw pointer is just an owned box in disguise. All mutation of
// the entry goes through the per-shard index `Mutex`, and reclamation of the
// pointee is deferred via seize guards, so sending/sharing the entry across
// threads is sound exactly when `T` itself is `Send`/`Sync`.
unsafe impl<T: Send> Send for TypedMapEntry<T> {}
unsafe impl<T: Sync> Sync for TypedMapEntry<T> {}
/// A sharded, persistent key→value map holding decoded `T` values in memory
/// and their encoded form on disk via the append-only `Engine` log.
pub struct TypedMap<
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T>,
    H: TypedWriteHook<K, T> = NoHook,
> {
    // One in-memory index per engine shard; the Vec index is the shard id.
    indexes: Vec<Mutex<HashMap<K, TypedMapEntry<T>>>>,
    // Deferred-reclamation collector protecting `TypedMapEntry::ptr` reads.
    collector: seize::Collector,
    // Underlying append-only storage engine (one log per shard).
    engine: Engine,
    // Encoder/decoder used to serialize `T` into log entries.
    codec: C,
    // Dead-byte ratio above which `compact` rewrites a shard's files.
    compaction_threshold: f64,
    // Number of leading key bits used for shard routing; 0 = hash whole key.
    shard_prefix_bits: usize,
    // User hook invoked on writes (and on init replay when `H::NEEDS_INIT`).
    hook: H,
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync> TypedMap<K, T, C> {
    /// Opens (or creates) a typed map at `path` with no write hook installed.
    pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
        Self::open_inner(path, config, codec, NoHook)
    }
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
    TypedMap<K, T, C, H>
{
    /// Opens (or creates) a typed map at `path` with a caller-supplied write
    /// hook that observes every mutation.
    pub fn open_hooked(
        path: impl AsRef<std::path::Path>,
        config: Config,
        codec: C,
        hook: H,
    ) -> DbResult<Self> {
        Self::open_inner(path, config, codec, hook)
    }
    /// Shared open path: starts the engine, allocates one empty index per
    /// shard, then replays the on-disk logs to rebuild the in-memory indexes.
    fn open_inner(
        path: impl AsRef<std::path::Path>,
        config: Config,
        codec: C,
        hook: H,
    ) -> DbResult<Self> {
        // Copy the tuning knobs out before `config` is moved into the engine.
        let compaction_threshold = config.compaction_threshold;
        let shard_prefix_bits = config.shard_prefix_bits;
        let engine = Engine::open(path, config)?;
        let shard_count = engine.shards().len();
        let mut indexes = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            indexes.push(Mutex::new(HashMap::new()));
        }
        let map = Self {
            indexes,
            collector: seize::Collector::new(),
            engine,
            codec,
            compaction_threshold,
            shard_prefix_bits,
            hook,
        };
        // Replay every shard's log (optionally using hint files) to populate
        // the indexes; returns the highest GSN seen on disk.
        let shard_dirs = map.engine.shard_dirs();
        let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
        let shard_ids = map.engine.shard_ids();
        let hints = map.engine.hints();
        let max_gsn = recover_typed_map::<K, T, C>(
            &shard_dir_refs,
            &shard_ids,
            map.indexes(),
            &map.codec,
            hints,
            #[cfg(feature = "encryption")]
            map.engine.cipher(),
        )?;
        // New writes must get GSNs strictly greater than anything recovered.
        map.engine
            .gsn()
            .fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
        if hints {
            // Hint files are keyed by the fixed key width; record it so the
            // shards can write hints for their active files.
            for shard in map.engine.shards().iter() {
                shard.set_key_len(size_of::<K>());
            }
        }
        tracing::info!(
            key_size = size_of::<K>(),
            entries = map.len(),
            "typed_map recovered"
        );
        Ok(map)
    }
    /// Flushes everything (and hint files, if enabled) and consumes the map.
    pub fn close(self) -> DbResult<()> {
        if self.engine.hints() {
            self.sync_hints()?;
        }
        self.engine.flush()
    }
    /// Flushes in-memory write buffers to the OS without a full sync.
    pub fn flush_buffers(&self) -> DbResult<()> {
        self.engine.flush_buffers()
    }
    /// The configuration the engine was opened with.
    pub fn config(&self) -> &Config {
        self.engine.config()
    }
}
impl<
    K: Key + Send + Sync + Hash + Eq,
    T: Clone + Send + Sync,
    C: Codec<T> + Sync,
    H: TypedWriteHook<K, T>,
> CompactionIndex<K> for TypedMap<K, T, C, H>
{
    /// Redirects `key`'s entry from `old_loc` to `new_loc`, but only if the
    /// entry still points at `old_loc` (i.e. no writer replaced it since the
    /// compactor read it). Returns whether the swap was performed.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
        let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
        let Some(entry) = index.get_mut(key) else {
            return false;
        };
        // Entry was rewritten concurrently; the compacted copy is stale.
        if unsafe { (*entry.ptr).disk } != old_loc {
            return false;
        }
        // Publish a fresh allocation carrying the same value at the new
        // location, then retire the one it displaced.
        let replacement = Box::into_raw(Box::new(TypedData {
            disk: new_loc,
            value: unsafe { (*entry.ptr).value.clone() },
        }));
        let stale = std::mem::replace(&mut entry.ptr, replacement);
        // SAFETY: `stale` was the published pointer; concurrent readers may
        // still hold guarded references to it, so free it via seize.
        unsafe {
            self.collector
                .retire(stale, seize::reclaim::boxed::<TypedData<T>>);
        }
        true
    }
    fn contains_key(&self, key: &K) -> bool {
        self.contains(key)
    }
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
    TypedMap<K, T, C, H>
{
    /// Compacts every shard whose dead-byte ratio exceeds the configured
    /// threshold; returns the total number of entries rewritten.
    pub fn compact(&self) -> DbResult<usize>
    where
        T: Clone,
    {
        let mut total_compacted = 0;
        for shard in self.engine.shards().iter() {
            total_compacted += compact_shard(shard, self, self.compaction_threshold)?;
        }
        Ok(total_compacted)
    }
    /// Looks up `key`, returning a guarded reference. The `TypedRef` remains
    /// valid even if a concurrent writer replaces the entry, because
    /// reclamation is deferred until the embedded guard drops.
    pub fn get(&self, key: &K) -> Option<TypedRef<'_, T>> {
        metrics::counter!("armdb.ops", "op" => "get", "tree" => "typed_map").increment(1);
        // Enter the guard BEFORE reading the pointer: writers only retire a
        // pointer while holding the index lock, so any pointer observed under
        // the lock cannot be reclaimed while this guard is active.
        let guard = self.collector.enter();
        let data_ptr = {
            let index = sync::lock(&self.indexes[self.shard_for(key)]);
            let entry = index.get(key)?;
            entry.ptr as *const TypedData<T>
        };
        Some(TypedRef::new(guard, data_ptr))
    }
    /// Like [`get`](Self::get) but maps a miss to `DbError::KeyNotFound`.
    pub fn get_or_err(&self, key: &K) -> DbResult<TypedRef<'_, T>> {
        self.get(key).ok_or(DbError::KeyNotFound)
    }
    /// Whether `key` is currently present.
    pub fn contains(&self, key: &K) -> bool {
        let index = sync::lock(&self.indexes[self.shard_for(key)]);
        index.contains_key(key)
    }
    /// Upserts `key`, returning the previous value (guarded) if one existed.
    pub fn put(&self, key: &K, value: T) -> DbResult<Option<TypedRef<'_, T>>> {
        metrics::counter!("armdb.ops", "op" => "put", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        // Lock order is always shard inner first, then index.
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        self.put_locked(shard_id, &mut inner, &mut index, guard, key, value)
    }
    /// Inserts `key`; errors with `DbError::KeyExists` if already present.
    pub fn insert(&self, key: &K, value: T) -> DbResult<()> {
        metrics::counter!("armdb.ops", "op" => "insert", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        self.insert_locked(shard_id, &mut inner, &mut index, &guard, key, value)
    }
    /// Removes `key`, returning the removed value (guarded) if it existed.
    pub fn delete(&self, key: &K) -> DbResult<Option<TypedRef<'_, T>>> {
        metrics::counter!("armdb.ops", "op" => "delete", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        self.delete_locked(shard_id, &mut inner, &mut index, guard, key)
    }
    /// Compare-and-swap: replaces the value for `key` with `new_value` only
    /// if the current value equals `expected`.
    ///
    /// # Errors
    /// `KeyNotFound` if absent; `CasMismatch` if the current value differs.
    pub fn cas(&self, key: &K, expected: &T, new_value: T) -> DbResult<()>
    where
        T: PartialEq,
    {
        metrics::counter!("armdb.ops", "op" => "cas", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let entry = index.get(key).ok_or(DbError::KeyNotFound)?;
        let current_data = unsafe { &*entry.ptr };
        if current_data.value != *expected {
            return Err(DbError::CasMismatch);
        }
        // Persist the new value first, then publish it in the index.
        let mut buf = Vec::new();
        self.codec.encode_to(&new_value, &mut buf);
        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
        let new_data = Box::new(TypedData {
            disk: disk_loc,
            value: new_value,
        });
        // Fixed: this call previously contained a mojibake-garbled reference
        // (`¤current_data`) that could not compile.
        self.hook
            .on_write(key, Some(&current_data.value), Some(&new_data.value));
        let new_data_ptr = Box::into_raw(new_data);
        let old_ptr = entry.ptr;
        index.get_mut(key).expect("key exists").ptr = new_data_ptr;
        // Account the superseded on-disk entry as dead for compaction.
        let old_disk = unsafe { (*old_ptr).disk };
        inner.add_dead_bytes(
            old_disk.file_id as u32,
            crate::entry::entry_size(size_of::<K>(), old_disk.len),
        );
        // SAFETY: `old_ptr` is no longer reachable from the index; readers
        // may still hold guarded references, so reclaim via seize.
        unsafe {
            self.collector
                .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
        }
        Ok(())
    }
    /// Applies `f` to the current value and stores the result; returns the
    /// NEW value (guarded), or `None` if the key is absent.
    pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<TypedRef<'_, T>>> {
        self.update_inner(key, f, false)
    }
    /// Like [`update`](Self::update) but returns the OLD value (guarded).
    pub fn fetch_update(
        &self,
        key: &K,
        f: impl FnOnce(&T) -> T,
    ) -> DbResult<Option<TypedRef<'_, T>>> {
        self.update_inner(key, f, true)
    }
    fn update_inner(
        &self,
        key: &K,
        f: impl FnOnce(&T) -> T,
        return_old: bool,
    ) -> DbResult<Option<TypedRef<'_, T>>> {
        metrics::counter!("armdb.ops", "op" => "update", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let entry = match index.get(key) {
            Some(e) => e,
            None => return Ok(None),
        };
        let old_data = unsafe { &*entry.ptr };
        let new_value = f(&old_data.value);
        let mut buf = Vec::new();
        self.codec.encode_to(&new_value, &mut buf);
        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
        let new_data = Box::new(TypedData {
            disk: disk_loc,
            value: new_value,
        });
        self.hook
            .on_write(key, Some(&old_data.value), Some(&new_data.value));
        let new_data_ptr = Box::into_raw(new_data);
        let old_ptr = entry.ptr;
        index.get_mut(key).expect("key exists").ptr = new_data_ptr;
        let old_disk = unsafe { (*old_ptr).disk };
        inner.add_dead_bytes(
            old_disk.file_id as u32,
            crate::entry::entry_size(size_of::<K>(), old_disk.len),
        );
        // Fixed: enter the guard BEFORE retiring `old_ptr`. Previously the
        // guard was entered only after `retire`, so when `return_old` was
        // true the returned `TypedRef` pointed at an allocation seize was
        // already free to reclaim (no guard active at retire time) —
        // a use-after-free. A guard active at retire time pins the retired
        // allocation until it drops.
        let guard = self.collector.enter();
        // SAFETY: `old_ptr` is unlinked from the index; deferred reclamation
        // keeps it alive for the guard above and any concurrent readers.
        unsafe {
            self.collector
                .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
        }
        let data = if return_old {
            old_ptr as *const TypedData<T>
        } else {
            new_data_ptr as *const TypedData<T>
        };
        Ok(Some(TypedRef::new(guard, data)))
    }
    /// Runs `f` with both the storage shard and the index shard for
    /// `shard_key` locked, so a group of operations on keys in that shard is
    /// atomic with respect to all other writers.
    pub fn atomic<R>(
        &self,
        shard_key: &K,
        f: impl FnOnce(&mut TypedMapShard<'_, K, T, C, H>) -> DbResult<R>,
    ) -> DbResult<R> {
        let shard_id = self.shard_for(shard_key);
        let inner = self.engine.shards()[shard_id].lock();
        let index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        let mut shard = TypedMapShard {
            map: self,
            inner,
            index,
            shard_id,
            guard,
        };
        f(&mut shard)
    }
    /// Total entry count. Locks shards one at a time, so the result is not a
    /// consistent point-in-time snapshot under concurrent writes.
    pub fn len(&self) -> usize {
        self.indexes.iter().map(|m| sync::lock(m).len()).sum()
    }
    /// Whether every shard is empty (same snapshot caveat as `len`).
    pub fn is_empty(&self) -> bool {
        self.indexes.iter().all(|m| sync::lock(m).is_empty())
    }
    /// Writes a hint file for each shard's active log file.
    pub fn sync_hints(&self) -> DbResult<()> {
        for shard in self.engine.shards().iter() {
            shard.write_active_hint(size_of::<K>())?;
        }
        Ok(())
    }
    /// Routes `key` to a shard. With `shard_prefix_bits == 0` (or more bits
    /// than the key has), the whole key is hashed; otherwise only the leading
    /// prefix bits are hashed, so keys sharing a prefix co-locate in a shard
    /// (which is what makes `atomic` usable for related keys).
    pub fn shard_for(&self, key: &K) -> usize {
        if self.shard_prefix_bits == 0 || self.shard_prefix_bits >= size_of::<K>() * 8 {
            let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
            return (hash as usize) % self.engine.shards().len();
        }
        let full_bytes = self.shard_prefix_bits / 8;
        let extra_bits = self.shard_prefix_bits % 8;
        let hash = if extra_bits == 0 {
            xxhash_rust::xxh3::xxh3_64(&key.as_bytes()[..full_bytes])
        } else {
            // Partial byte: copy the whole-byte prefix, then mask the next
            // byte down to its top `extra_bits` bits so differing low bits
            // hash identically.
            let mut buf = K::zeroed();
            buf.as_bytes_mut()[..full_bytes].copy_from_slice(&key.as_bytes()[..full_bytes]);
            let mask = !((1u8 << (8 - extra_bits)) - 1);
            buf.as_bytes_mut()[full_bytes] = key.as_bytes()[full_bytes] & mask;
            xxhash_rust::xxh3::xxh3_64(&buf.as_bytes()[..full_bytes + 1])
        };
        (hash as usize) % self.engine.shards().len()
    }
    /// Applies `f` to every entry, keeping/updating/deleting per its verdict.
    /// Keys are snapshotted per shard first, so entries inserted during the
    /// migration may be skipped. Returns the number of mutations performed.
    pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
        use crate::MigrateAction;
        let mut count = 0;
        for i in 0..self.engine.shards().len() {
            let keys: Vec<K> = {
                let index = sync::lock(&self.indexes[i]);
                index.keys().copied().collect()
            };
            for key in keys {
                // Re-check each key: it may have been deleted concurrently.
                let value_ref = match self.get(&key) {
                    Some(v) => v,
                    None => continue,
                };
                let action = f(&key, &*value_ref);
                drop(value_ref);
                match action {
                    MigrateAction::Keep => {
                        if H::NEEDS_INIT
                            && let Some(v) = self.get(&key)
                        {
                            self.hook.on_init(&key, &*v);
                        }
                    }
                    MigrateAction::Update(value) => {
                        if H::NEEDS_INIT {
                            self.hook.on_init(&key, &value);
                        }
                        let shard_id = self.shard_for(&key);
                        let mut inner = self.engine.shards()[shard_id].lock();
                        let mut index = sync::lock(&self.indexes[shard_id]);
                        let guard = self.collector.enter();
                        // HOOKS=false: on_init above already notified the hook.
                        self.put_locked_inner::<false>(
                            shard_id, &mut inner, &mut index, guard, &key, value,
                        )?;
                        count += 1;
                    }
                    MigrateAction::Delete => {
                        let shard_id = self.shard_for(&key);
                        let mut inner = self.engine.shards()[shard_id].lock();
                        let mut index = sync::lock(&self.indexes[shard_id]);
                        let guard = self.collector.enter();
                        self.delete_locked_inner::<false>(
                            shard_id, &mut inner, &mut index, guard, &key,
                        )?;
                        count += 1;
                    }
                }
            }
        }
        tracing::info!(mutations = count, "typed_map migration complete");
        Ok(count)
    }
    /// Replays `on_init` for every entry; no-op unless the hook opts in.
    pub(crate) fn replay_init(&self) {
        if !H::NEEDS_INIT {
            return;
        }
        for shard in &self.indexes {
            let index = sync::lock(shard);
            for (key, entry) in index.iter() {
                let data = unsafe { &*entry.ptr };
                self.hook.on_init(key, &data.value);
            }
        }
    }
    /// Raw access to the per-shard indexes (used by recovery).
    pub(crate) fn indexes(&self) -> &[Mutex<HashMap<K, TypedMapEntry<T>>>] {
        &self.indexes
    }
    /// `put` with all locks already held; fires the write hook.
    fn put_locked<'g>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
        value: T,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        self.put_locked_inner::<true>(shard_id, inner, index, guard, key, value)
    }
    /// Core upsert. `HOOKS` toggles the write hook (disabled from `migrate`,
    /// which notifies the hook itself). The caller's `guard` must already be
    /// active so the returned old-value `TypedRef` outlives the retire below.
    fn put_locked_inner<'g, const HOOKS: bool>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
        value: T,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        let mut buf = Vec::new();
        self.codec.encode_to(&value, &mut buf);
        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
        let new_data_ptr = Box::into_raw(Box::new(TypedData {
            disk: disk_loc,
            value,
        }));
        if let Some(old_entry) = index.insert(*key, TypedMapEntry { ptr: new_data_ptr }) {
            let old_data = old_entry.ptr as *const TypedData<T>;
            if HOOKS {
                self.hook.on_write(
                    key,
                    Some(unsafe { &(*old_data).value }),
                    Some(unsafe { &(*new_data_ptr).value }),
                );
            }
            // The replaced on-disk entry becomes dead weight for compaction.
            let old_disk = unsafe { (*old_entry.ptr).disk };
            inner.add_dead_bytes(
                old_disk.file_id as u32,
                crate::entry::entry_size(size_of::<K>(), old_disk.len),
            );
            // SAFETY: old pointer is unlinked; `guard` (active before this
            // retire) keeps the allocation alive for the returned TypedRef.
            unsafe {
                self.collector
                    .retire(old_entry.ptr, seize::reclaim::boxed::<TypedData<T>>);
            }
            Ok(Some(TypedRef::new(guard, old_data)))
        } else {
            if HOOKS {
                self.hook
                    .on_write(key, None, Some(unsafe { &(*new_data_ptr).value }));
            }
            Ok(None)
        }
    }
    /// `insert` with all locks already held; rejects existing keys.
    fn insert_locked(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        _guard: &seize::LocalGuard<'_>,
        key: &K,
        value: T,
    ) -> DbResult<()> {
        if index.contains_key(key) {
            return Err(DbError::KeyExists);
        }
        let mut buf = Vec::new();
        self.codec.encode_to(&value, &mut buf);
        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
        let new_data_ptr = Box::into_raw(Box::new(TypedData {
            disk: disk_loc,
            value,
        }));
        index.insert(*key, TypedMapEntry { ptr: new_data_ptr });
        self.hook
            .on_write(key, None, Some(unsafe { &(*new_data_ptr).value }));
        Ok(())
    }
    /// `delete` with all locks already held; fires the write hook.
    fn delete_locked<'g>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        self.delete_locked_inner::<true>(shard_id, inner, index, guard, key)
    }
    /// Core delete. Appends a tombstone, accounts the dead entry, and retires
    /// the removed pointer; the caller's active `guard` keeps the returned
    /// old-value `TypedRef` valid past the retire.
    fn delete_locked_inner<'g, const HOOKS: bool>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        let entry = match index.remove(key) {
            Some(e) => e,
            None => return Ok(None),
        };
        let old_data = entry.ptr as *const TypedData<T>;
        if HOOKS {
            self.hook
                .on_write(key, Some(unsafe { &(*old_data).value }), None);
        }
        // Tombstone record (empty payload, delete flag set).
        inner.append_entry(shard_id as u8, key.as_bytes(), &[], true)?;
        let old_disk = unsafe { (*entry.ptr).disk };
        inner.add_dead_bytes(
            old_disk.file_id as u32,
            crate::entry::entry_size(size_of::<K>(), old_disk.len),
        );
        // SAFETY: pointer removed from the index; reclamation deferred via
        // seize so `guard` and concurrent readers stay valid.
        unsafe {
            self.collector
                .retire(entry.ptr, seize::reclaim::boxed::<TypedData<T>>);
        }
        Ok(Some(TypedRef::new(guard, old_data)))
    }
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T>, H: TypedWriteHook<K, T>> Drop
    for TypedMap<K, T, C, H>
{
    /// Frees every entry still published in the indexes. At drop time there
    /// can be no outstanding borrows of the map, so the boxes are reclaimed
    /// directly rather than through the seize collector.
    fn drop(&mut self) {
        for shard_index in self.indexes.iter() {
            let entries = sync::lock(shard_index);
            for entry in entries.values() {
                // SAFETY: `entry.ptr` came from `Box::into_raw` and this is
                // its sole remaining owner.
                unsafe { drop(Box::from_raw(entry.ptr)) };
            }
        }
    }
}
/// A view over a single shard with both its storage and index locks held,
/// handed to the closure of [`TypedMap::atomic`]. All operations through it
/// are atomic with respect to every other writer on that shard.
pub struct TypedMapShard<
    'a,
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T>,
    H: TypedWriteHook<K, T> = NoHook,
> {
    // Parent map (codec, hook, collector, shard routing).
    map: &'a TypedMap<K, T, C, H>,
    // Held lock on the shard's on-disk log state.
    inner: MutexGuard<'a, ShardInner>,
    // Held lock on the shard's in-memory index.
    index: MutexGuard<'a, HashMap<K, TypedMapEntry<T>>>,
    // Which shard this view is pinned to; guarded by `check_shard`.
    shard_id: usize,
    // Seize guard entered for the lifetime of the atomic section.
    guard: seize::LocalGuard<'a>,
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
    TypedMapShard<'_, K, T, C, H>
{
    /// Rejects keys that route to a different shard than this view holds.
    fn check_shard(&self, key: &K) -> DbResult<()> {
        match self.map.shard_for(key) == self.shard_id {
            true => Ok(()),
            false => Err(DbError::ShardMismatch),
        }
    }
    /// Borrowed lookup; valid because the index lock is held for the whole
    /// atomic section, so no writer can retire the entry underneath us.
    pub fn get(&self, key: &K) -> Option<&T> {
        self.index
            .get(key)
            .map(|entry| unsafe { &(*entry.ptr).value })
    }
    /// Like `get` but maps a miss to `DbError::KeyNotFound`.
    pub fn get_or_err(&self, key: &K) -> DbResult<&T> {
        match self.get(key) {
            Some(value) => Ok(value),
            None => Err(DbError::KeyNotFound),
        }
    }
    /// Whether `key` is present in this shard.
    pub fn contains(&self, key: &K) -> bool {
        self.get(key).is_some()
    }
    /// Upsert within the atomic section; returns the prior value if any.
    pub fn put(&mut self, key: &K, value: T) -> DbResult<Option<TypedRef<'_, T>>> {
        self.check_shard(key)?;
        let guard = self.map.collector.enter();
        self.map.put_locked(
            self.shard_id,
            &mut self.inner,
            &mut self.index,
            guard,
            key,
            value,
        )
    }
    /// Insert within the atomic section; errors if the key already exists.
    pub fn insert(&mut self, key: &K, value: T) -> DbResult<()> {
        self.check_shard(key)?;
        self.map.insert_locked(
            self.shard_id,
            &mut self.inner,
            &mut self.index,
            &self.guard,
            key,
            value,
        )
    }
    /// Delete within the atomic section; returns the removed value if any.
    pub fn delete(&mut self, key: &K) -> DbResult<Option<TypedRef<'_, T>>> {
        self.check_shard(key)?;
        let guard = self.map.collector.enter();
        self.map
            .delete_locked(self.shard_id, &mut self.inner, &mut self.index, guard, key)
    }
}
// Registers TypedMap in the armour collection registry by delegating to the
// inherent methods of the same names; `T::NAME` identifies the collection.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for TypedMap<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Clone + Send + Sync,
    C: crate::Codec<T> + Sync,
    H: crate::hook::TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Send + Sync + std::hash::Hash + Eq,
{
    fn name(&self) -> &str {
        T::NAME
    }
    fn len(&self) -> usize {
        self.len()
    }
    fn compact(&self) -> crate::DbResult<usize> {
        self.compact()
    }
}