use std::marker::PhantomData;
use std::ops::Bound;
use crate::Key;
use crate::byte_view::ByteView;
use crate::codec::Codec;
use crate::compaction::CompactionIndex;
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, TypedWriteHook, VarTypedHookAdapter};
use crate::var_tree::{VarIter, VarShard, VarTree};
/// Typed facade over a [`VarTree`].
///
/// Values of type `T` are converted to and from bytes with the codec `C`;
/// the write hook `H` is attached to the inner tree through a
/// [`VarTypedHookAdapter`]. `H` defaults to [`NoHook`].
pub struct VarTypedTree<K, T, C, H = NoHook>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Byte-level tree that every operation is forwarded to.
    inner: VarTree<K, VarTypedHookAdapter<K, T, C, H>>,
    /// Codec used by this wrapper to encode and decode values.
    codec: C,
    /// Ties `T` to the type without storing a value of it.
    _marker: PhantomData<fn() -> T>,
}
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone> VarTypedTree<K, T, C> {
/// Opens a tree at `path` with `config` and `codec`, using [`NoHook`] as the write hook.
///
/// # Errors
/// Returns whatever error `open_hooked_inner` reports.
pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
Self::open_hooked_inner(path, config, codec, NoHook)
}
}
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
VarTypedTree<K, T, C, H>
{
/// Opens a tree at `path` with `config` and `codec`, attaching the typed write hook `hook`.
///
/// # Errors
/// Returns whatever error `open_hooked_inner` reports.
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
codec: C,
hook: H,
) -> DbResult<Self> {
Self::open_hooked_inner(path, config, codec, hook)
}
/// Shared constructor behind `open` and `open_hooked`.
///
/// Wraps `hook` together with a clone of `codec` in a
/// [`VarTypedHookAdapter`], opens the inner [`VarTree`] with that adapter,
/// and keeps the original `codec` on the returned wrapper.
///
/// # Errors
/// Propagates the error from `VarTree::open_hooked`.
fn open_hooked_inner(
    path: impl AsRef<std::path::Path>,
    config: Config,
    codec: C,
    hook: H,
) -> DbResult<Self> {
    // The adapter gets its own codec clone; the wrapper keeps the original.
    let hook_adapter = VarTypedHookAdapter {
        inner: hook,
        codec: codec.clone(),
        _marker: PhantomData,
    };
    let tree = VarTree::open_hooked(path, config, hook_adapter)?;
    let typed = Self {
        inner: tree,
        codec,
        _marker: PhantomData,
    };
    Ok(typed)
}
/// Consumes the tree and forwards to the inner tree's `close`, returning its result.
pub fn close(self) -> DbResult<()> {
self.inner.close()
}
/// Forwards to the inner tree's `flush_buffers` and returns its result.
pub fn flush_buffers(&self) -> DbResult<()> {
self.inner.flush_buffers()
}
/// Returns the [`Config`] reported by the inner tree.
pub fn config(&self) -> &Config {
self.inner.config()
}
/// Forwards to the inner tree's `compact` and returns the count it reports.
pub fn compact(&self) -> DbResult<usize> {
self.inner.compact()
}
/// Forwards to the inner tree's `sync_hints` and returns its result.
pub fn sync_hints(&self) -> DbResult<()> {
self.inner.sync_hints()
}
/// Forwards to the inner tree's `warmup` and returns its result.
pub fn warmup(&self) -> DbResult<()> {
self.inner.warmup()
}
/// Borrows the underlying byte-level [`VarTree`], bypassing the codec.
pub fn as_inner(&self) -> &VarTree<K, VarTypedHookAdapter<K, T, C, H>> {
&self.inner
}
/// Borrows the codec this wrapper uses to encode and decode values.
pub fn codec(&self) -> &C {
&self.codec
}
/// Looks up `key` and decodes the stored bytes into a `T`.
///
/// Returns `None` both when the inner tree has no value for `key` and when
/// the stored bytes fail to decode (the decode error is discarded).
pub fn get(&self, key: &K) -> Option<T> {
    self.inner
        .get(key)
        .and_then(|raw| self.codec.decode_from(&raw).ok())
}
/// Like `get`, but maps an absent result to [`DbError::KeyNotFound`].
///
/// # Errors
/// Returns `DbError::KeyNotFound` whenever `get` yields `None`. Because `get`
/// also returns `None` when the stored bytes fail to decode, a decode failure
/// is reported as `KeyNotFound` as well.
pub fn get_or_err(&self, key: &K) -> DbResult<T> {
self.get(key).ok_or(DbError::KeyNotFound)
}
/// Returns the inner tree's `contains` answer for `key`; no decoding is attempted.
pub fn contains(&self, key: &K) -> bool {
self.inner.contains(key)
}
/// Returns the entry reported by the inner tree's `first`, with its value decoded.
///
/// Yields `None` when the inner tree returns nothing or when the value fails
/// to decode.
pub fn first(&self) -> Option<(K, T)> {
    self.inner.first().and_then(|(key, raw)| {
        let value = self.codec.decode_from(&raw).ok()?;
        Some((key, value))
    })
}
/// Returns the entry reported by the inner tree's `last`, with its value decoded.
///
/// Yields `None` when the inner tree returns nothing or when the value fails
/// to decode.
pub fn last(&self) -> Option<(K, T)> {
    self.inner
        .last()
        .and_then(|(key, raw)| self.codec.decode_from(&raw).ok().map(|value| (key, value)))
}
/// Encodes `value` with the codec and forwards the bytes to the inner tree's `put`.
///
/// # Errors
/// Returns the encode error (converted into the `DbResult` error type) or the
/// error from the inner `put`.
pub fn put(&self, key: &K, value: &T) -> DbResult<()> {
    let encoded = {
        let mut out = Vec::new();
        self.codec.encode_to(value, &mut out)?;
        out
    };
    self.inner.put(key, &encoded)
}
/// Encodes `value` with the codec and forwards the bytes to the inner tree's `insert`.
///
/// # Errors
/// Returns the encode error (converted into the `DbResult` error type) or the
/// error from the inner `insert`.
pub fn insert(&self, key: &K, value: &T) -> DbResult<()> {
    let mut payload = Vec::new();
    match self.codec.encode_to(value, &mut payload) {
        Ok(()) => {}
        Err(err) => return Err(err.into()),
    }
    self.inner.insert(key, &payload)
}
/// Forwards to the inner tree's `delete` for `key` and returns its `bool` result unchanged.
pub fn delete(&self, key: &K) -> DbResult<bool> {
self.inner.delete(key)
}
/// Compare-and-swap on encoded values.
///
/// Both `expected` and `new_value` are encoded with the codec (in that
/// order) and the resulting byte buffers are handed to the inner tree's
/// `cas`, so the comparison is made on encoded bytes.
///
/// # Errors
/// Returns an encode error for either value, or the error from the inner `cas`.
pub fn cas(&self, key: &K, expected: &T, new_value: &T) -> DbResult<()> {
    let encode = |value: &T| -> DbResult<Vec<u8>> {
        let mut out = Vec::new();
        self.codec.encode_to(value, &mut out)?;
        Ok(out)
    };
    let expected_bytes = encode(expected)?;
    let replacement_bytes = encode(new_value)?;
    self.inner.cas(key, &expected_bytes, &replacement_bytes)
}
/// Applies `f` to the decoded current value of `key` and stores the result.
///
/// Returns `Ok(Some(new_value))` when the inner update reports a result.
/// Returns `Ok(None)` when the inner tree's `try_update_inner` yields
/// nothing, or when the stored bytes fail to decode (in which case `f` is
/// never called and nothing is written by this wrapper).
///
/// # Errors
/// Propagates encode errors and errors from the inner tree.
pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
    self.apply_update(key, f, false)
}

/// Same as `update`, but returns the value that was stored *before* `f` ran.
///
/// Returns `Ok(None)` under the same conditions as `update`.
///
/// # Errors
/// Propagates encode errors and errors from the inner tree.
pub fn fetch_update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
    self.apply_update(key, f, true)
}

/// Shared body of `update` and `fetch_update`.
///
/// `return_previous` selects which value is handed back (the decoded old
/// value when `true`, the freshly computed one when `false`) and is also
/// forwarded as the flag argument of `try_update_inner`, exactly as the two
/// public entry points did before they were merged.
/// NOTE(review): the meaning of that flag inside `try_update_inner` is not
/// visible from this file — confirm it matches "return previous".
fn apply_update(
    &self,
    key: &K,
    f: impl FnOnce(&T) -> T,
    return_previous: bool,
) -> DbResult<Option<T>> {
    use std::cell::Cell;

    // The closure cannot return the typed value directly, so it is smuggled
    // out through a `Cell` that outlives the call.
    let out: Cell<Option<T>> = Cell::new(None);
    let result = self.inner.try_update_inner(
        key,
        |bytes| {
            // Undecodable bytes are treated as "nothing to update".
            let Ok(current) = self.codec.decode_from(bytes) else {
                return Ok(None);
            };
            let new_val = f(&current);
            let mut buf = Vec::new();
            self.codec.encode_to(&new_val, &mut buf)?;
            out.set(Some(if return_previous { current } else { new_val }));
            Ok(Some(ByteView::from_vec(buf)))
        },
        return_previous,
    )?;
    if result.is_none() {
        return Ok(None);
    }
    Ok(out.into_inner())
}
/// Runs `f` against a typed view of the shard that the inner tree's `atomic`
/// hands out for `shard_key`.
///
/// The closure receives a [`VarTypedShard`] whose operations encode and
/// decode values with this tree's codec. The closure's result is returned
/// through the inner `atomic` call.
pub fn atomic<R>(
&self,
shard_key: &K,
f: impl FnOnce(&mut VarTypedShard<'_, K, T, C, H>) -> DbResult<R>,
) -> DbResult<R> {
self.inner.atomic(shard_key, |var_shard| {
// The shard's concrete type (including its lifetime parameter) is erased
// by casting to `*mut ()`; `VarTypedShard::inner_mut` / `inner_ref` cast
// the pointer back before dereferencing it.
let inner_ptr: *mut () = (var_shard as *mut VarShard<'_, _, _>).cast();
// NOTE(review): `shard` is a local that is only lent to `f` by `&mut` and is
// dropped before this closure returns, so the erased pointer should never
// outlive `var_shard` — confirm no path lets it escape.
let mut shard = VarTypedShard {
tree: self,
inner: inner_ptr,
_marker: PhantomData,
};
f(&mut shard)
})
}
/// Returns a decoding iterator over the inner tree's `prefix_iter(prefix)`.
///
/// Entries whose value fails to decode are skipped by the iterator.
pub fn prefix_iter(&self, prefix: &[u8]) -> VarTypedIter<'_, K, T, C, H> {
    let raw_iter = self.inner.prefix_iter(prefix);
    VarTypedIter {
        inner: raw_iter,
        codec: &self.codec,
        _marker: PhantomData,
    }
}
/// Returns a decoding iterator over the inner tree's `iter()`.
///
/// Entries whose value fails to decode are skipped by the iterator.
pub fn iter(&self) -> VarTypedIter<'_, K, T, C, H> {
    let raw_iter = self.inner.iter();
    VarTypedIter {
        inner: raw_iter,
        codec: &self.codec,
        _marker: PhantomData,
    }
}
/// Returns a decoding iterator over the inner tree's `range(start, end)`.
///
/// Bound inclusivity is whatever the inner `range` defines; entries whose
/// value fails to decode are skipped by the iterator.
pub fn range(&self, start: &K, end: &K) -> VarTypedIter<'_, K, T, C, H> {
    let raw_iter = self.inner.range(start, end);
    VarTypedIter {
        inner: raw_iter,
        codec: &self.codec,
        _marker: PhantomData,
    }
}
/// Returns a decoding iterator over the inner tree's `range_bounds(start, end)`.
///
/// Entries whose value fails to decode are skipped by the iterator.
pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> VarTypedIter<'_, K, T, C, H> {
    let raw_iter = self.inner.range_bounds(start, end);
    VarTypedIter {
        inner: raw_iter,
        codec: &self.codec,
        _marker: PhantomData,
    }
}
/// Returns the entry count reported by the inner tree.
pub fn len(&self) -> usize {
self.inner.len()
}
/// Returns the inner tree's `is_empty` answer.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Returns the shard index the inner tree assigns to `key`.
pub fn shard_for(&self, key: &K) -> usize {
self.inner.shard_for(key)
}
/// Returns the inner tree's `entry_len` for `key`.
///
/// The tests in this file expect `None` for a key that was never written.
/// NOTE(review): which bytes the length covers is defined by `VarTree` — confirm there.
pub fn entry_len(&self, key: &K) -> Option<u32> {
self.inner.entry_len(key)
}
pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
use crate::MigrateAction;
self.inner
.migrate(|key, bytes| match self.codec.decode_from(bytes) {
Ok(current) => match f(key, ¤t) {
MigrateAction::Keep => MigrateAction::Keep,
MigrateAction::Update(new) => {
let mut buf = Vec::new();
match self.codec.encode_to(&new, &mut buf) {
Ok(()) => MigrateAction::Update(ByteView::from_vec(buf)),
Err(_) => {
tracing::warn!(
"var_typed_tree migrate: encode error, keeping entry"
);
MigrateAction::Keep
}
}
}
MigrateAction::Delete => MigrateAction::Delete,
},
Err(_) => {
tracing::warn!("var_typed_tree migrate: decode error, keeping entry");
MigrateAction::Keep
}
})
}
/// Crate-internal: forwards to the inner tree's `replay_init`.
pub(crate) fn replay_init(&self) {
self.inner.replay_init();
}
}
/// [`CompactionIndex`] for the typed wrapper: every method forwards to the
/// inner byte-level tree, so no value decoding takes place here.
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>> CompactionIndex<K>
for VarTypedTree<K, T, C, H>
{
/// Forwards to the inner tree's `update_if_match` and returns its answer.
fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
self.inner.update_if_match(key, old_loc, new_loc)
}
/// Forwards to the inner tree's `invalidate_blocks` with the same arguments.
fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
self.inner.invalidate_blocks(shard_id, file_id, total_bytes);
}
/// Answers via the inner tree's `contains`.
fn contains_key(&self, key: &K) -> bool {
self.inner.contains(key)
}
}
/// `ReplicationTarget` for the typed wrapper (only with the `replication`
/// feature): every method forwards its arguments unchanged to the inner tree.
#[cfg(feature = "replication")]
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
crate::replication::ReplicationTarget for VarTypedTree<K, T, C, H>
{
/// Forwards to the inner tree's `apply_entry` with the raw `key` and `value` bytes.
fn apply_entry(
&self,
shard_inner: &mut crate::shard::ShardInner,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
key: &[u8],
value: &[u8],
) -> DbResult<crate::replication::ApplyOutcome> {
self.inner.apply_entry(
shard_inner,
shard_id,
file_id,
entry_offset,
header,
key,
value,
)
}
/// Forwards to the inner tree's `try_apply_entry` with the bytes following the header.
fn try_apply_entry(
&self,
shard_inner: &mut crate::shard::ShardInner,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
raw_after_header: &[u8],
) -> DbResult<crate::replication::ApplyOutcome> {
self.inner.try_apply_entry(
shard_inner,
shard_id,
file_id,
entry_offset,
header,
raw_after_header,
)
}
/// Returns the key length reported by the inner tree.
fn key_len(&self) -> usize {
self.inner.key_len()
}
}
/// Iterator over `(K, T)` pairs that decodes the values yielded by a
/// byte-level [`VarIter`] with a borrowed codec.
pub struct VarTypedIter<'a, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Underlying byte-level iterator.
    inner: VarIter<'a, K, VarTypedHookAdapter<K, T, C, H>>,
    /// Codec borrowed from the owning tree.
    codec: &'a C,
    /// Ties `T` to the type without storing a value of it.
    _marker: PhantomData<fn() -> T>,
}
/// Forward iteration: pulls entries from the inner iterator and yields the
/// first one whose value decodes; undecodable entries are logged at debug
/// level and skipped.
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>> Iterator
    for VarTypedIter<'_, K, T, C, H>
{
    type Item = (K, T);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `?` ends iteration as soon as the inner iterator is exhausted.
            let (key, raw) = self.inner.next()?;
            if let Ok(value) = self.codec.decode_from(&raw) {
                return Some((key, value));
            }
            tracing::debug!(
                value_len = raw.len(),
                "var_typed_iter: decode error, skipping entry"
            );
        }
    }
}
/// Reverse iteration: same skipping behaviour as the forward direction, but
/// pulling from the back of the inner iterator.
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>> DoubleEndedIterator
    for VarTypedIter<'_, K, T, C, H>
{
    fn next_back(&mut self) -> Option<Self::Item> {
        loop {
            // `?` ends iteration as soon as the inner iterator is exhausted.
            let (key, raw) = self.inner.next_back()?;
            if let Ok(value) = self.codec.decode_from(&raw) {
                return Some((key, value));
            }
            tracing::debug!(
                value_len = raw.len(),
                "var_typed_iter: decode error, skipping entry"
            );
        }
    }
}
/// Typed view of a single shard, handed to the closure passed to
/// `VarTypedTree::atomic`.
///
/// The underlying `VarShard` is held as a type-erased raw pointer; the
/// accessor methods cast it back before use.
pub struct VarTypedShard<'tree, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Owning tree; supplies the codec.
    tree: &'tree VarTypedTree<K, T, C, H>,
    /// Type-erased pointer to the `VarShard` lent out by the inner tree.
    inner: *mut (),
    /// Marks the struct as holding an exclusive borrow for `'tree`.
    _marker: PhantomData<&'tree mut ()>,
}
// The raw `*mut ()` field makes `VarTypedShard` `!Send` by default; this impl
// opts back in.
// SAFETY: NOTE(review): this is only sound if the `VarShard` behind the
// pointer (and the borrowed tree) may be used from another thread — that is
// not visible from this file; confirm against `VarShard`.
unsafe impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>> Send
for VarTypedShard<'_, K, T, C, H>
{
}
/// Typed operations on a shard; values are encoded/decoded with the owning
/// tree's codec and the byte-level work is done by the underlying `VarShard`.
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
VarTypedShard<'_, K, T, C, H>
{
/// Recovers exclusive access to the underlying `VarShard` from the erased pointer.
fn inner_mut(&mut self) -> &mut VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
// SAFETY: NOTE(review): the only construction site visible in this file
// (`VarTypedTree::atomic`) sets `inner` from a `&mut VarShard` of this exact
// type, and `&mut self` prevents aliasing through this wrapper — confirm no
// other constructor exists.
unsafe { &mut *(self.inner as *mut VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>>) }
}
/// Recovers shared access to the underlying `VarShard` from the erased pointer.
fn inner_ref(&self) -> &VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
// SAFETY: NOTE(review): same provenance argument as `inner_mut`; only a
// shared reference is produced here.
unsafe { &*(self.inner as *const VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>>) }
}
/// Encodes `value` and forwards the bytes to the shard's `put`.
pub fn put(&mut self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.tree.codec.encode_to(value, &mut buf)?;
self.inner_mut().put(key, &buf)
}
/// Encodes `value` and forwards the bytes to the shard's `insert`.
pub fn insert(&mut self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.tree.codec.encode_to(value, &mut buf)?;
self.inner_mut().insert(key, &buf)
}
/// Forwards to the shard's `delete` and returns its result unchanged.
pub fn delete(&mut self, key: &K) -> DbResult<bool> {
self.inner_mut().delete(key)
}
/// Reads `key` from the shard and decodes it; `None` if the shard has no
/// value or the bytes fail to decode.
pub fn get(&self, key: &K) -> Option<T> {
let bytes = self.inner_ref().get(key)?;
self.tree.codec.decode_from(&bytes).ok()
}
/// Like `get`, but maps `None` (including decode failure) to `DbError::KeyNotFound`.
pub fn get_or_err(&self, key: &K) -> DbResult<T> {
self.get(key).ok_or(DbError::KeyNotFound)
}
/// Returns the shard's `contains` answer for `key`; no decoding is attempted.
pub fn contains(&self, key: &K) -> bool {
self.inner_ref().contains(key)
}
}
/// `Collection` for trees keyed by the value type's `SelfId` (only with the
/// `armour` feature). Each method delegates to the inherent method of the
/// same purpose on `VarTypedTree`.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for VarTypedTree<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Send + Sync,
    C: Codec<T> + Clone + 'static,
    H: TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Ord,
{
    /// Collection name, taken from `T::NAME`.
    fn name(&self) -> &str {
        T::NAME
    }

    /// Entry count. The call resolves to the inherent `VarTypedTree::len`,
    /// which takes priority over this trait method.
    fn len(&self) -> usize {
        self.len()
    }

    /// Compaction. The call resolves to the inherent `VarTypedTree::compact`.
    fn compact(&self) -> DbResult<usize> {
        self.compact()
    }

    /// Flushes buffers, then syncs hints; stops at the first error.
    fn flush(&self) -> DbResult<()> {
        self.flush_buffers().and_then(|()| self.sync_hints())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::RapiraCodec;
    use crate::config::Config;
    use tempfile::tempdir;

    /// Tree type shared by the tests: 8-byte keys, `Vec<u8>` values.
    type BytesTree = VarTypedTree<[u8; 8], Vec<u8>, RapiraCodec>;

    /// Opens a fresh tree in `dir` with the test configuration.
    fn open_tree(dir: &std::path::Path) -> BytesTree {
        VarTypedTree::open(dir, Config::test(), RapiraCodec).expect("open")
    }

    /// A stored 4-byte vector is expected to report an entry length of 8.
    #[test]
    fn entry_len_returns_some_for_existing_key() {
        let tmp = tempdir().expect("tmp");
        let tree = open_tree(tmp.path());
        let key = 1u64.to_be_bytes();
        let value = vec![10u8, 20, 30, 40];
        tree.put(&key, &value).expect("put");
        assert_eq!(tree.entry_len(&key), Some(8u32));
    }

    /// A key that was never written has no entry length.
    #[test]
    fn entry_len_returns_none_for_missing_key() {
        let tmp = tempdir().expect("tmp");
        let tree = open_tree(tmp.path());
        let key = 99u64.to_be_bytes();
        assert_eq!(tree.entry_len(&key), None);
    }
}