use std::hash::Hash;
use std::marker::PhantomData;
use crate::Key;
use crate::byte_view::ByteView;
use crate::codec::Codec;
use crate::compaction::CompactionIndex;
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, TypedWriteHook, VarTypedHookAdapter};
use crate::var_map::{VarMap, VarMapShard};
/// Typed facade over a byte-level [`VarMap`].
///
/// Values of type `T` are converted to and from bytes with the codec `C`.
/// The write hook `H` (default: [`NoHook`]) is handed to the inner map wrapped
/// in a [`VarTypedHookAdapter`].
pub struct VarTypedMap<K, T, C, H = NoHook>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Byte-level map that every operation is forwarded to.
    inner: VarMap<K, VarTypedHookAdapter<K, T, C, H>>,
    /// Codec used by this wrapper to encode and decode values.
    codec: C,
    /// Ties `T` to the type without storing a `T`.
    _marker: PhantomData<fn() -> T>,
}
impl<K, T, C> VarTypedMap<K, T, C>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
{
    /// Opens a map at `path` with no write hook installed ([`NoHook`]).
    ///
    /// # Errors
    /// Returns whatever error opening the inner map reports.
    pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
        let hook = NoHook;
        Self::open_hooked_inner(path, config, codec, hook)
    }
}
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
VarTypedMap<K, T, C, H>
{
/// Opens a map at `path`, installing `hook` as the typed write hook.
///
/// Thin public entry point: every argument is forwarded unchanged to
/// `open_hooked_inner`.
///
/// # Errors
/// Returns whatever error `open_hooked_inner` reports.
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
codec: C,
hook: H,
) -> DbResult<Self> {
Self::open_hooked_inner(path, config, codec, hook)
}
/// Shared constructor behind `open` and `open_hooked`.
///
/// Wraps `hook` in a [`VarTypedHookAdapter`] carrying its own clone of the
/// codec, opens the inner [`VarMap`] with that adapter, and keeps the
/// original codec on the wrapper.
///
/// # Errors
/// Propagates the error returned by `VarMap::open_hooked`.
fn open_hooked_inner(
    path: impl AsRef<std::path::Path>,
    config: Config,
    codec: C,
    hook: H,
) -> DbResult<Self> {
    // The adapter owns a separate codec instance; `codec` itself is moved
    // into the returned wrapper below.
    let adapter_codec = codec.clone();
    let adapter = VarTypedHookAdapter {
        inner: hook,
        codec: adapter_codec,
        _marker: PhantomData,
    };
    Ok(Self {
        inner: VarMap::open_hooked(path, config, adapter)?,
        codec,
        _marker: PhantomData,
    })
}
/// Consumes the wrapper and closes the inner map, returning its result.
pub fn close(self) -> DbResult<()> {
self.inner.close()
}
/// Forwards to the inner map's `flush_buffers` and returns its result.
pub fn flush_buffers(&self) -> DbResult<()> {
self.inner.flush_buffers()
}
/// Returns the configuration exposed by the inner map.
pub fn config(&self) -> &Config {
self.inner.config()
}
/// Runs `compact` on the inner map and returns its result unchanged.
///
/// NOTE(review): the meaning of the returned count is defined by
/// `VarMap::compact`, which is not visible here — confirm before relying on it.
pub fn compact(&self) -> DbResult<usize> {
self.inner.compact()
}
/// Forwards to the inner map's `sync_hints` and returns its result.
pub fn sync_hints(&self) -> DbResult<()> {
self.inner.sync_hints()
}
/// Forwards to the inner map's `warmup` and returns its result.
pub fn warmup(&self) -> DbResult<()> {
self.inner.warmup()
}
/// Borrows the underlying byte-level map, bypassing the codec.
pub fn as_inner(&self) -> &VarMap<K, VarTypedHookAdapter<K, T, C, H>> {
&self.inner
}
/// Borrows the codec this wrapper uses to encode and decode values.
pub fn codec(&self) -> &C {
&self.codec
}
/// Looks up `key` and decodes the stored bytes into a `T`.
///
/// Returns `None` when the inner map has no value for `key`, and also when
/// the stored bytes fail to decode (the decode error is discarded).
pub fn get(&self, key: &K) -> Option<T> {
    self.inner
        .get(key)
        .and_then(|raw| self.codec.decode_from(&raw).ok())
}
/// Like `get`, but reports absence as an error.
///
/// # Errors
/// Returns [`DbError::KeyNotFound`] whenever `get` yields `None`; because
/// `get` discards decode errors, that includes values that fail to decode.
pub fn get_or_err(&self, key: &K) -> DbResult<T> {
    match self.get(key) {
        Some(value) => Ok(value),
        None => Err(DbError::KeyNotFound),
    }
}
/// Reports whether the inner map contains `key`; no value is decoded.
pub fn contains(&self, key: &K) -> bool {
self.inner.contains(key)
}
/// Encodes `value` with the codec and forwards the bytes to the inner map's
/// `put`.
///
/// # Errors
/// Propagates an encode failure (nothing is written in that case) or the
/// error returned by the inner `put`.
pub fn put(&self, key: &K, value: &T) -> DbResult<()> {
    let mut encoded = Vec::new();
    self.codec.encode_to(value, &mut encoded)?;
    self.inner.put(key, &encoded)
}
/// Encodes `value` with the codec and forwards the bytes to the inner map's
/// `insert`.
///
/// NOTE(review): how `insert` differs from `put` is defined by `VarMap` and
/// is not visible here — confirm against that type.
///
/// # Errors
/// Propagates an encode failure (nothing is written in that case) or the
/// error returned by the inner `insert`.
pub fn insert(&self, key: &K, value: &T) -> DbResult<()> {
    let mut encoded = Vec::new();
    self.codec.encode_to(value, &mut encoded)?;
    self.inner.insert(key, &encoded)
}
/// Forwards to the inner map's `delete` and returns its result unchanged.
///
/// NOTE(review): the returned `bool` presumably signals whether an entry was
/// removed — confirm against `VarMap::delete`.
pub fn delete(&self, key: &K) -> DbResult<bool> {
self.inner.delete(key)
}
/// Compare-and-swap on encoded bytes.
///
/// Both `expected` and `new_value` are encoded with the codec and handed to
/// the inner map's `cas`, so the comparison is made on the encoded form,
/// not through any equality on `T`.
///
/// # Errors
/// Propagates an encode failure for either value (expected is encoded
/// first) or the error returned by the inner `cas`.
pub fn cas(&self, key: &K, expected: &T, new_value: &T) -> DbResult<()> {
    let mut expected_bytes = Vec::new();
    self.codec.encode_to(expected, &mut expected_bytes)?;

    let mut replacement_bytes = Vec::new();
    self.codec.encode_to(new_value, &mut replacement_bytes)?;

    self.inner.cas(key, &expected_bytes, &replacement_bytes)
}
/// Applies `f` to the decoded current value of `key` and stores the result.
///
/// Returns `Ok(Some(new_value))` when the inner update reports a result.
/// Returns `Ok(None)` when the inner update reports none. If the stored
/// bytes cannot be decoded, the decode error is discarded, `f` is not
/// called, and the outcome is `Ok(None)` (unless the inner call errors).
///
/// # Errors
/// Propagates an encode failure for the new value and any error returned by
/// the inner map's `try_update_inner`.
pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
    // Filled in by the closure with the freshly computed value. A plain
    // mutable capture is enough: the closure is `FnOnce` (it consumes `f`).
    let mut produced: Option<T> = None;
    let result = self.inner.try_update_inner(
        key,
        |bytes| {
            let Ok(current) = self.codec.decode_from(bytes) else {
                return Ok(None);
            };
            let new_val = f(&current);
            let mut buf = Vec::new();
            self.codec.encode_to(&new_val, &mut buf)?;
            produced = Some(new_val);
            Ok(Some(ByteView::from_vec(buf)))
        },
        // NOTE(review): the meaning of this flag is defined by
        // `try_update_inner`; `fetch_update` passes `true` instead.
        false,
    )?;
    if result.is_none() {
        return Ok(None);
    }
    Ok(produced)
}
/// Applies `f` to the decoded current value of `key`, stores the result,
/// and hands back the value that was there before.
///
/// Returns `Ok(Some(previous_value))` when the inner update reports a
/// result. Returns `Ok(None)` when the inner update reports none. If the
/// stored bytes cannot be decoded, the decode error is discarded, `f` is not
/// called, and the outcome is `Ok(None)` (unless the inner call errors).
///
/// # Errors
/// Propagates an encode failure for the new value and any error returned by
/// the inner map's `try_update_inner`.
pub fn fetch_update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
    // Filled in by the closure with the value decoded before the update. A
    // plain mutable capture is enough: the closure is `FnOnce` (it consumes `f`).
    let mut previous: Option<T> = None;
    let result = self.inner.try_update_inner(
        key,
        |bytes| {
            let Ok(current) = self.codec.decode_from(bytes) else {
                return Ok(None);
            };
            let new_val = f(&current);
            let mut buf = Vec::new();
            self.codec.encode_to(&new_val, &mut buf)?;
            // Only remember the old value once the new one encoded successfully.
            previous = Some(current);
            Ok(Some(ByteView::from_vec(buf)))
        },
        // NOTE(review): the meaning of this flag is defined by
        // `try_update_inner`; `update` passes `false` instead.
        true,
    )?;
    if result.is_none() {
        return Ok(None);
    }
    Ok(previous)
}
/// Runs `f` inside the inner map's `atomic` call for `shard_key`, giving it a
/// typed [`VarTypedMapShard`] view of the shard the inner map provides.
///
/// The result (or error) of `f` is returned through the inner `atomic` call.
pub fn atomic<R>(
&self,
shard_key: &K,
f: impl FnOnce(&mut VarTypedMapShard<'_, K, T, C, H>) -> DbResult<R>,
) -> DbResult<R> {
self.inner.atomic(shard_key, |var_shard| {
// Type-erase the inner shard (including its lifetime parameter) into a
// raw pointer so it can be stored in `VarTypedMapShard`. The pointer is
// cast back and dereferenced by `VarTypedMapShard::inner_mut` /
// `inner_ref`.
// NOTE(review): soundness relies on the typed shard never being used
// after this closure returns; `f` only receives `&mut shard`, and `shard`
// is a local dropped at the end of the closure — confirm no other path
// can retain the pointer.
let inner_ptr: *mut () = (var_shard as *mut VarMapShard<'_, _, _>).cast();
let mut shard = VarTypedMapShard {
tree: self,
inner: inner_ptr,
_marker: PhantomData,
};
f(&mut shard)
})
}
/// Returns the length reported by the inner map.
pub fn len(&self) -> usize {
self.inner.len()
}
/// Returns whether the inner map reports itself as empty.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Returns the shard index the inner map assigns to `key`.
pub fn shard_for(&self, key: &K) -> usize {
self.inner.shard_for(key)
}
/// Forwards to the inner map's `entry_len` for `key`.
///
/// NOTE(review): presumably the stored (encoded) length of the entry, with
/// `None` for a missing key — confirm against `VarMap::entry_len`.
pub fn entry_len(&self, key: &K) -> Option<u32> {
self.inner.entry_len(key)
}
pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
use crate::MigrateAction;
self.inner
.migrate(|key, bytes| match self.codec.decode_from(bytes) {
Ok(current) => match f(key, ¤t) {
MigrateAction::Keep => MigrateAction::Keep,
MigrateAction::Update(new) => {
let mut buf = Vec::new();
match self.codec.encode_to(&new, &mut buf) {
Ok(()) => MigrateAction::Update(ByteView::from_vec(buf)),
Err(_) => {
tracing::warn!(
"var_typed_map migrate: encode error, keeping entry"
);
MigrateAction::Keep
}
}
}
MigrateAction::Delete => MigrateAction::Delete,
},
Err(_) => {
tracing::warn!("var_typed_map migrate: decode error, keeping entry");
MigrateAction::Keep
}
})
}
/// Crate-internal: forwards to the inner map's `replay_init`.
pub(crate) fn replay_init(&self) {
self.inner.replay_init();
}
}
/// Compaction callbacks operate on keys and disk locations only, so each one
/// is forwarded to the inner byte-level map without touching the codec.
impl<K, T, C, H> CompactionIndex<K> for VarTypedMap<K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Forwards to the inner map's `update_if_match`.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
        self.inner.update_if_match(key, old_loc, new_loc)
    }

    /// Forwards to the inner map's `invalidate_blocks`.
    fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
        self.inner.invalidate_blocks(shard_id, file_id, total_bytes);
    }

    /// Answers via the inner map's `contains`.
    fn contains_key(&self, key: &K) -> bool {
        self.inner.contains(key)
    }
}
/// Replication works on raw entries, so every method hands its arguments
/// straight to the inner byte-level map; the codec is not involved.
#[cfg(feature = "replication")]
impl<K, T, C, H> crate::replication::ReplicationTarget for VarTypedMap<K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Forwards an already split (key, value) entry to the inner map's
    /// `apply_entry`.
    fn apply_entry(
        &self,
        shard_inner: &mut crate::shard::ShardInner,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<crate::replication::ApplyOutcome> {
        self.inner.apply_entry(
            shard_inner,
            shard_id,
            file_id,
            entry_offset,
            header,
            key,
            value,
        )
    }

    /// Forwards the raw bytes following the header to the inner map's
    /// `try_apply_entry`.
    fn try_apply_entry(
        &self,
        shard_inner: &mut crate::shard::ShardInner,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        raw_after_header: &[u8],
    ) -> DbResult<crate::replication::ApplyOutcome> {
        self.inner.try_apply_entry(
            shard_inner,
            shard_id,
            file_id,
            entry_offset,
            header,
            raw_after_header,
        )
    }

    /// Returns the key length reported by the inner map.
    fn key_len(&self) -> usize {
        self.inner.key_len()
    }
}
/// Typed view of a single shard, handed to the closure passed to
/// `VarTypedMap::atomic`.
pub struct VarTypedMapShard<'tree, K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Owning map; provides the codec used for encoding and decoding.
    tree: &'tree VarTypedMap<K, T, C, H>,
    /// Type-erased pointer to the inner `VarMapShard`, set by
    /// `VarTypedMap::atomic` and cast back in `inner_mut` / `inner_ref`.
    inner: *mut (),
    /// Marks the shard as carrying a mutable borrow for `'tree`.
    _marker: PhantomData<&'tree mut ()>,
}
// The raw `*mut ()` field suppresses the automatic `Send` implementation, so
// it is asserted manually here.
// SAFETY: NOTE(review) — this is sound only if the pointed-to `VarMapShard`
// (and anything it holds, e.g. lock guards) may be accessed from another
// thread; that type is not visible in this file, so confirm before relying
// on it.
unsafe impl<
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T> + Clone,
H: TypedWriteHook<K, T>,
> Send for VarTypedMapShard<'_, K, T, C, H>
{
}
impl<K, T, C, H> VarTypedMapShard<'_, K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Recovers exclusive access to the inner shard from the erased pointer.
    fn inner_mut(&mut self) -> &mut VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
        let shard = self.inner as *mut VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>;
        // SAFETY: `inner` is created in `VarTypedMap::atomic` from a live
        // `&mut VarMapShard` of exactly this type. `&mut self` keeps this the
        // only reference derived from the pointer for the returned borrow.
        // NOTE(review): assumes the shard wrapper never outlives that
        // `atomic` closure — confirm.
        unsafe { &mut *shard }
    }

    /// Recovers shared access to the inner shard from the erased pointer.
    fn inner_ref(&self) -> &VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
        let shard = self.inner as *const VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>;
        // SAFETY: same provenance as in `inner_mut`; `&self` rules out a
        // concurrent `inner_mut` borrow through this wrapper.
        unsafe { &*shard }
    }

    /// Encodes `value` and forwards the bytes to the shard's `put`.
    ///
    /// # Errors
    /// Propagates an encode failure or the error from the shard's `put`.
    pub fn put(&mut self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.tree.codec.encode_to(value, &mut encoded)?;
        self.inner_mut().put(key, &encoded)
    }

    /// Encodes `value` and forwards the bytes to the shard's `insert`.
    ///
    /// # Errors
    /// Propagates an encode failure or the error from the shard's `insert`.
    pub fn insert(&mut self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.tree.codec.encode_to(value, &mut encoded)?;
        self.inner_mut().insert(key, &encoded)
    }

    /// Forwards to the shard's `delete` and returns its result unchanged.
    pub fn delete(&mut self, key: &K) -> DbResult<bool> {
        self.inner_mut().delete(key)
    }

    /// Looks up `key` in the shard and decodes the stored bytes.
    ///
    /// Returns `None` when the shard has no value for `key` and also when
    /// decoding fails (the decode error is discarded).
    pub fn get(&self, key: &K) -> Option<T> {
        self.inner_ref()
            .get(key)
            .and_then(|raw| self.tree.codec.decode_from(&raw).ok())
    }

    /// Like `get`, but maps `None` to [`DbError::KeyNotFound`].
    pub fn get_or_err(&self, key: &K) -> DbResult<T> {
        match self.get(key) {
            Some(value) => Ok(value),
            None => Err(DbError::KeyNotFound),
        }
    }

    /// Reports whether the shard contains `key`; no value is decoded.
    pub fn contains(&self, key: &K) -> bool {
        self.inner_ref().contains(key)
    }
}
/// Exposes the map as an `armour` collection keyed by `T::SelfId`.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for VarTypedMap<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Send + Sync,
    C: Codec<T> + Clone + 'static,
    H: TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Send + Sync + Hash + Eq,
{
    /// The collection name is the compile-time constant `T::NAME`.
    fn name(&self) -> &str {
        T::NAME
    }

    /// Entry count of the map.
    fn len(&self) -> usize {
        // Not recursive: method lookup prefers the inherent
        // `VarTypedMap::len` over this trait method.
        self.len()
    }

    /// Compacts the map.
    fn compact(&self) -> DbResult<usize> {
        // Not recursive: resolves to the inherent `VarTypedMap::compact`.
        self.compact()
    }

    /// Flushes buffers first, then syncs hints; stops at the first error.
    fn flush(&self) -> DbResult<()> {
        self.flush_buffers().and_then(|()| self.sync_hints())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::RapiraCodec;
    use crate::config::Config;
    use tempfile::tempdir;

    /// Map type shared by the tests below.
    type TestMap = VarTypedMap<[u8; 8], Vec<u8>, RapiraCodec>;

    /// Opens a fresh map inside a new temporary directory.
    ///
    /// The directory guard is returned alongside the map so the caller keeps
    /// it alive for as long as the map is in use.
    fn open_test_map() -> (tempfile::TempDir, TestMap) {
        let dir = tempdir().expect("tmp");
        let map = TestMap::open(dir.path(), Config::test(), RapiraCodec).expect("open");
        (dir, map)
    }

    #[test]
    fn entry_len_returns_some_for_existing_key() {
        let (_dir, map) = open_test_map();
        let key = 1u64.to_be_bytes();
        let payload = vec![10u8, 20, 30, 40];
        map.put(&key, &payload).expect("put");
        // Four payload bytes are stored as an eight-byte entry.
        assert_eq!(map.entry_len(&key), Some(8u32));
    }

    #[test]
    fn entry_len_returns_none_for_missing_key() {
        let (_dir, map) = open_test_map();
        let absent = 99u64.to_be_bytes();
        assert_eq!(map.entry_len(&absent), None);
    }
}