use std::hash::Hash;
use std::marker::PhantomData;
use crate::Key;
use crate::byte_view::ByteView;
use crate::codec::Codec;
use crate::compaction::CompactionIndex;
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, TypedWriteHook, VarTypedHookAdapter};
use crate::var_map::{VarMap, VarMapShard};
pub struct VarTypedMap<
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T> + Clone,
H: TypedWriteHook<K, T> = NoHook,
> {
inner: VarMap<K, VarTypedHookAdapter<K, T, C, H>>,
codec: C,
_marker: PhantomData<fn() -> T>,
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone> VarTypedMap<K, T, C> {
pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
Self::open_hooked_inner(path, config, codec, NoHook)
}
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
VarTypedMap<K, T, C, H>
{
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
codec: C,
hook: H,
) -> DbResult<Self> {
Self::open_hooked_inner(path, config, codec, hook)
}
fn open_hooked_inner(
path: impl AsRef<std::path::Path>,
config: Config,
codec: C,
hook: H,
) -> DbResult<Self> {
let adapter = VarTypedHookAdapter {
inner: hook,
codec: codec.clone(),
_marker: PhantomData,
};
let inner = VarMap::open_hooked(path, config, adapter)?;
Ok(Self {
inner,
codec,
_marker: PhantomData,
})
}
pub fn close(self) -> DbResult<()> {
self.inner.close()
}
pub fn flush_buffers(&self) -> DbResult<()> {
self.inner.flush_buffers()
}
pub fn config(&self) -> &Config {
self.inner.config()
}
pub fn compact(&self) -> DbResult<usize> {
self.inner.compact()
}
pub fn sync_hints(&self) -> DbResult<()> {
self.inner.sync_hints()
}
pub fn warmup(&self) -> DbResult<()> {
self.inner.warmup()
}
pub fn as_inner(&self) -> &VarMap<K, VarTypedHookAdapter<K, T, C, H>> {
&self.inner
}
pub fn codec(&self) -> &C {
&self.codec
}
pub fn get(&self, key: &K) -> Option<T> {
let bytes = self.inner.get(key)?;
self.codec.decode_from(&bytes).ok()
}
pub fn get_or_err(&self, key: &K) -> DbResult<T> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn contains(&self, key: &K) -> bool {
self.inner.contains(key)
}
pub fn put(&self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.codec.encode_to(value, &mut buf);
self.inner.put(key, &buf)
}
pub fn insert(&self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.codec.encode_to(value, &mut buf);
self.inner.insert(key, &buf)
}
pub fn delete(&self, key: &K) -> DbResult<bool> {
self.inner.delete(key)
}
pub fn cas(&self, key: &K, expected: &T, new_value: &T) -> DbResult<()> {
let mut exp_buf = Vec::new();
self.codec.encode_to(expected, &mut exp_buf);
let mut new_buf = Vec::new();
self.codec.encode_to(new_value, &mut new_buf);
self.inner.cas(key, &exp_buf, &new_buf)
}
pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
use std::cell::Cell;
let out: Cell<Option<T>> = Cell::new(None);
let result = self.inner.update(key, |bytes| {
let Ok(current) = self.codec.decode_from(bytes) else {
return ByteView::new(bytes);
};
let new_val = f(¤t);
let mut buf = Vec::new();
self.codec.encode_to(&new_val, &mut buf);
out.set(Some(new_val));
ByteView::from_vec(buf)
})?;
if result.is_none() {
return Ok(None);
}
Ok(out.into_inner())
}
pub fn fetch_update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
use std::cell::Cell;
let out: Cell<Option<T>> = Cell::new(None);
let result = self.inner.fetch_update(key, |bytes| {
let Ok(current) = self.codec.decode_from(bytes) else {
return ByteView::new(bytes);
};
let new_val = f(¤t);
let mut buf = Vec::new();
self.codec.encode_to(&new_val, &mut buf);
out.set(Some(current));
ByteView::from_vec(buf)
})?;
if result.is_none() {
return Ok(None);
}
Ok(out.into_inner())
}
pub fn atomic<R>(
&self,
shard_key: &K,
f: impl FnOnce(&mut VarTypedMapShard<'_, K, T, C, H>) -> DbResult<R>,
) -> DbResult<R> {
self.inner.atomic(shard_key, |var_shard| {
let inner_ptr: *mut () = (var_shard as *mut VarMapShard<'_, _, _>).cast();
let mut shard = VarTypedMapShard {
tree: self,
inner: inner_ptr,
_marker: PhantomData,
};
f(&mut shard)
})
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn shard_for(&self, key: &K) -> usize {
self.inner.shard_for(key)
}
pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
use crate::MigrateAction;
self.inner
.migrate(|key, bytes| match self.codec.decode_from(bytes) {
Ok(current) => match f(key, ¤t) {
MigrateAction::Keep => MigrateAction::Keep,
MigrateAction::Update(new) => {
let mut buf = Vec::new();
self.codec.encode_to(&new, &mut buf);
MigrateAction::Update(ByteView::from_vec(buf))
}
MigrateAction::Delete => MigrateAction::Delete,
},
Err(_) => {
tracing::warn!("var_typed_map migrate: decode error, keeping entry");
MigrateAction::Keep
}
})
}
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
CompactionIndex<K> for VarTypedMap<K, T, C, H>
{
fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
self.inner.update_if_match(key, old_loc, new_loc)
}
fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
self.inner.invalidate_blocks(shard_id, file_id, total_bytes);
}
fn contains_key(&self, key: &K) -> bool {
self.inner.contains(key)
}
}
#[cfg(feature = "replication")]
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
crate::replication::ReplicationTarget for VarTypedMap<K, T, C, H>
{
fn apply_entry(
&self,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
key: &[u8],
value: &[u8],
) -> DbResult<()> {
self.inner
.apply_entry(shard_id, file_id, entry_offset, header, key, value)
}
fn try_apply_entry(
&self,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
raw_after_header: &[u8],
) -> DbResult<bool> {
self.inner
.try_apply_entry(shard_id, file_id, entry_offset, header, raw_after_header)
}
fn key_len(&self) -> usize {
self.inner.key_len()
}
}
pub struct VarTypedMapShard<
'tree,
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T> + Clone,
H: TypedWriteHook<K, T>,
> {
tree: &'tree VarTypedMap<K, T, C, H>,
inner: *mut (),
_marker: PhantomData<&'tree mut ()>,
}
unsafe impl<
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T> + Clone,
H: TypedWriteHook<K, T>,
> Send for VarTypedMapShard<'_, K, T, C, H>
{
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
VarTypedMapShard<'_, K, T, C, H>
{
fn inner_mut(&mut self) -> &mut VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
unsafe { &mut *(self.inner as *mut VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>) }
}
fn inner_ref(&self) -> &VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
unsafe { &*(self.inner as *const VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>) }
}
pub fn put(&mut self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.tree.codec.encode_to(value, &mut buf);
self.inner_mut().put(key, &buf)
}
pub fn insert(&mut self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.tree.codec.encode_to(value, &mut buf);
self.inner_mut().insert(key, &buf)
}
pub fn delete(&mut self, key: &K) -> DbResult<bool> {
self.inner_mut().delete(key)
}
pub fn get(&self, key: &K) -> Option<T> {
let bytes = self.inner_ref().get(key)?;
self.tree.codec.decode_from(&bytes).ok()
}
pub fn get_or_err(&self, key: &K) -> DbResult<T> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn contains(&self, key: &K) -> bool {
self.inner_ref().contains(key)
}
}
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for VarTypedMap<T::SelfId, T, C, H>
where
T: crate::CollectionMeta + Send + Sync,
C: Codec<T> + Clone + 'static,
H: TypedWriteHook<T::SelfId, T>,
T::SelfId: crate::Key + Send + Sync + Hash + Eq + Ord,
{
fn name(&self) -> &str {
T::NAME
}
fn len(&self) -> usize {
self.len()
}
fn compact(&self) -> DbResult<usize> {
self.compact()
}
}