// armdb 0.1.14
//
// sharded bitcask key-value storage optimized for NVMe
// Documentation
use std::hash::Hash;
use std::marker::PhantomData;

use crate::Key;
use crate::byte_view::ByteView;
use crate::codec::Codec;
use crate::compaction::CompactionIndex;
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, TypedWriteHook, VarTypedHookAdapter};
use crate::var_map::{VarMap, VarMapShard};

/// Hash-indexed map from fixed-size keys `K` to typed values `T`.
///
/// Values are passed through a [`Codec`] and stored on disk as variable-length
/// records, with a `BlockCache` for reads. Lookup uses a per-shard HashMap
/// (O(1)). There is no ordered iteration — use
/// [`VarTypedTree`](crate::VarTypedTree) if you need prefix/range scans.
///
/// Thin wrapper around [`VarMap<K, VarTypedHookAdapter<K, T, C, H>>`]: this
/// type encodes/decodes values and delegates everything else to the inner map.
///
/// # Error handling
///
/// Same convention as [`VarTypedTree`](crate::VarTypedTree): `get` returns
/// `None` on decode errors. `migrate` keeps entries that fail to decode.
pub struct VarTypedMap<K, T, C, H = NoHook>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Byte-level map that every operation is forwarded to.
    inner: VarMap<K, VarTypedHookAdapter<K, T, C, H>>,
    /// Codec used by this wrapper to encode values for writes and decode them on reads.
    codec: C,
    /// Records the value type `T` in the struct without storing a `T`.
    _marker: PhantomData<fn() -> T>,
}

impl<K, T, C> VarTypedMap<K, T, C>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
{
    /// Open or create a `VarTypedMap` at the given path.
    ///
    /// This is the hook-less constructor: the map is opened with [`NoHook`] as
    /// its write hook. `codec` is the codec used to encode and decode values.
    ///
    /// # Errors
    ///
    /// Returns whatever error opening the underlying `VarMap` produces.
    pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
        Self::open_hooked_inner(path, config, codec, NoHook)
    }
}

impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
    VarTypedMap<K, T, C, H>
{
    /// Open or create a `VarTypedMap` with a typed write hook.
    ///
    /// `codec` is the codec the map uses for encoding and decoding values;
    /// `hook` is the typed write hook installed on the underlying `VarMap`.
    ///
    /// # Errors
    ///
    /// Returns whatever error opening the underlying `VarMap` produces.
    pub fn open_hooked(
        path: impl AsRef<std::path::Path>,
        config: Config,
        codec: C,
        hook: H,
    ) -> DbResult<Self> {
        Self::open_hooked_inner(path, config, codec, hook)
    }

    /// Shared constructor behind `open` and `open_hooked`.
    ///
    /// Wraps `hook` in a `VarTypedHookAdapter`, opens the underlying `VarMap`
    /// with that adapter, and keeps `codec` on the wrapper itself.
    fn open_hooked_inner(
        path: impl AsRef<std::path::Path>,
        config: Config,
        codec: C,
        hook: H,
    ) -> DbResult<Self> {
        // The adapter gets its own clone so the wrapper can keep the original codec.
        let adapter_codec = codec.clone();
        let inner = VarMap::open_hooked(
            path,
            config,
            VarTypedHookAdapter {
                inner: hook,
                codec: adapter_codec,
                _marker: PhantomData,
            },
        )?;
        Ok(Self {
            inner,
            codec,
            _marker: PhantomData,
        })
    }

    /// Graceful shutdown: write hint files (if enabled), flush write buffers + fsync.
    ///
    /// Consumes the map and forwards to `close` on the underlying `VarMap`,
    /// returning its result unchanged.
    pub fn close(self) -> DbResult<()> {
        self.inner.close()
    }

    /// Flush all shard write buffers to disk (without fsync).
    ///
    /// Forwards to `flush_buffers` on the underlying `VarMap`.
    pub fn flush_buffers(&self) -> DbResult<()> {
        self.inner.flush_buffers()
    }

    /// Get the database configuration.
    ///
    /// The reference is borrowed from the underlying `VarMap`.
    pub fn config(&self) -> &Config {
        self.inner.config()
    }

    /// Trigger a compaction pass across all shards.
    ///
    /// Forwards to `compact` on the underlying `VarMap` and returns its count
    /// unchanged.
    /// NOTE(review): what the `usize` counts is defined by `VarMap::compact` —
    /// confirm there before relying on it.
    pub fn compact(&self) -> DbResult<usize> {
        self.inner.compact()
    }

    /// Write hint files for all active shard files. Call during graceful shutdown.
    ///
    /// Forwards to `sync_hints` on the underlying `VarMap`.
    pub fn sync_hints(&self) -> DbResult<()> {
        self.inner.sync_hints()
    }

    /// Pre-populate the block cache with blocks containing live values.
    ///
    /// Forwards to `warmup` on the underlying `VarMap`.
    pub fn warmup(&self) -> DbResult<()> {
        self.inner.warmup()
    }

    /// Access the underlying `VarMap`.
    ///
    /// The inner map works on raw bytes; values read or written through it
    /// bypass this wrapper's codec.
    pub fn as_inner(&self) -> &VarMap<K, VarTypedHookAdapter<K, T, C, H>> {
        &self.inner
    }

    /// Access the codec used for encoding / decoding values.
    ///
    /// This is the wrapper's own codec instance (the `codec` field).
    pub fn codec(&self) -> &C {
        &self.codec
    }

    // -- Reads ----------------------------------------------------------------

    /// Look up `key` and decode its stored bytes into a `T`.
    ///
    /// Returns `None` both when the underlying `VarMap` yields no value for
    /// `key` and when the stored bytes fail to decode; the decode error itself
    /// is discarded.
    pub fn get(&self, key: &K) -> Option<T> {
        self.inner
            .get(key)
            .and_then(|raw| self.codec.decode_from(&raw).ok())
    }

    /// Like [`get`](Self::get), but reports a missing value as an error.
    ///
    /// # Errors
    ///
    /// Returns `DbError::KeyNotFound` whenever `get` returns `None`. Because
    /// `get` also returns `None` for values that fail to decode, an
    /// undecodable entry is reported as `KeyNotFound` too.
    pub fn get_or_err(&self, key: &K) -> DbResult<T> {
        match self.get(key) {
            Some(value) => Ok(value),
            None => Err(DbError::KeyNotFound),
        }
    }

    /// Report whether the underlying `VarMap` contains `key`.
    ///
    /// No decoding happens here, so this can return `true` for an entry whose
    /// bytes `get` would fail to decode (and therefore report as `None`).
    pub fn contains(&self, key: &K) -> bool {
        self.inner.contains(key)
    }

    // -- Writes ---------------------------------------------------------------

    /// Encode `value` with the map's codec and hand the bytes to `put` on the
    /// underlying `VarMap`.
    ///
    /// NOTE(review): presumably an upsert (overwrites an existing entry), in
    /// contrast to `insert` — confirm against `VarMap::put`.
    pub fn put(&self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.codec.encode_to(value, &mut encoded);
        self.inner.put(key, &encoded)
    }

    /// Encode `value` with the map's codec and hand the bytes to `insert` on
    /// the underlying `VarMap`.
    ///
    /// NOTE(review): presumably fails when `key` already exists, in contrast to
    /// `put` — confirm against `VarMap::insert`.
    pub fn insert(&self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.codec.encode_to(value, &mut encoded);
        self.inner.insert(key, &encoded)
    }

    /// Delete `key` by forwarding to `delete` on the underlying `VarMap`.
    ///
    /// NOTE(review): the returned `bool` presumably says whether an entry was
    /// actually removed — confirm against `VarMap::delete`.
    pub fn delete(&self, key: &K) -> DbResult<bool> {
        self.inner.delete(key)
    }

    /// Compare-and-swap on typed values.
    ///
    /// Both `expected` and `new_value` are encoded with the map's codec and the
    /// resulting byte strings are passed to `cas` on the underlying `VarMap`.
    ///
    /// NOTE(review): the comparison presumably happens on encoded bytes, so it
    /// relies on the codec encoding equal values identically — confirm.
    pub fn cas(&self, key: &K, expected: &T, new_value: &T) -> DbResult<()> {
        let mut expected_bytes = Vec::new();
        self.codec.encode_to(expected, &mut expected_bytes);

        let mut replacement_bytes = Vec::new();
        self.codec.encode_to(new_value, &mut replacement_bytes);

        self.inner.cas(key, &expected_bytes, &replacement_bytes)
    }

    /// Replace the value at `key` with `f(current)` and return the new value.
    ///
    /// The stored bytes are decoded, `f` is applied, and the re-encoded result
    /// is handed back to `update` on the underlying `VarMap`.
    ///
    /// Returns `Ok(Some(new))` when a value was decoded and replaced. Returns
    /// `Ok(None)` when the underlying `update` reports `None`, and also when the
    /// stored bytes fail to decode — in that case `f` is not called and the
    /// original bytes are handed back unchanged.
    pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
        // Filled in by the closure once a replacement value has been produced.
        let mut produced: Option<T> = None;
        let outcome = self.inner.update(key, |stored| match self.codec.decode_from(stored) {
            Ok(current) => {
                let next = f(&current);
                let mut encoded = Vec::new();
                self.codec.encode_to(&next, &mut encoded);
                produced = Some(next);
                ByteView::from_vec(encoded)
            }
            // Undecodable entry: give the same bytes back and leave `produced` empty.
            Err(_) => ByteView::new(stored),
        })?;
        Ok(if outcome.is_none() { None } else { produced })
    }

    /// Replace the value at `key` with `f(current)` and return the previous value.
    ///
    /// The stored bytes are decoded, `f` is applied, and the re-encoded result
    /// is handed back to `fetch_update` on the underlying `VarMap`.
    ///
    /// Returns `Ok(Some(previous))` when a value was decoded and replaced.
    /// Returns `Ok(None)` when the underlying `fetch_update` reports `None`, and
    /// also when the stored bytes fail to decode — in that case `f` is not
    /// called and the original bytes are handed back unchanged.
    pub fn fetch_update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
        // Filled in by the closure with the value that was stored before the update.
        let mut previous: Option<T> = None;
        let outcome = self
            .inner
            .fetch_update(key, |stored| match self.codec.decode_from(stored) {
                Ok(current) => {
                    let next = f(&current);
                    let mut encoded = Vec::new();
                    self.codec.encode_to(&next, &mut encoded);
                    previous = Some(current);
                    ByteView::from_vec(encoded)
                }
                // Undecodable entry: give the same bytes back and leave `previous` empty.
                Err(_) => ByteView::new(stored),
            })?;
        Ok(if outcome.is_none() { None } else { previous })
    }

    // -- Atomic ---------------------------------------------------------------

    /// Run `f` with a typed handle for the shard selected by `shard_key`.
    ///
    /// Wraps `atomic` on the underlying `VarMap`: the byte-level shard handle it
    /// supplies is re-exposed as a [`VarTypedMapShard`], so `f` can read and
    /// write typed values. The result of `f` is returned from the inner closure
    /// as-is.
    ///
    /// NOTE(review): presumably every key touched inside `f` must map to the
    /// same shard as `shard_key` (see [`shard_for`](Self::shard_for)) — confirm
    /// against `VarMap::atomic`.
    pub fn atomic<R>(
        &self,
        shard_key: &K,
        f: impl FnOnce(&mut VarTypedMapShard<'_, K, T, C, H>) -> DbResult<R>,
    ) -> DbResult<R> {
        self.inner.atomic(shard_key, |var_shard| {
            // SAFETY: erase VarMapShard lifetime via `*mut ()`; see VarTypedMapShard doc.
            // The typed handle below is a local of this closure and `f` only
            // borrows it, so the erased pointer is used while `var_shard` is live.
            let inner_ptr: *mut () = (var_shard as *mut VarMapShard<'_, _, _>).cast();
            let mut shard = VarTypedMapShard {
                tree: self,
                inner: inner_ptr,
                _marker: PhantomData,
            };
            f(&mut shard)
        })
    }

    // -- Info -----------------------------------------------------------------

    /// Number of entries, as reported by `len` on the underlying `VarMap`.
    ///
    /// Entries are counted without decoding, so values that `get` cannot decode
    /// are not filtered out by this wrapper.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Whether the underlying `VarMap` reports itself as empty.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Shard index for `key`, as computed by `shard_for` on the underlying `VarMap`.
    pub fn shard_for(&self, key: &K) -> usize {
        self.inner.shard_for(key)
    }

    // -- Migration ------------------------------------------------------------

    /// Run a typed migration callback over the entries of the underlying `VarMap`.
    ///
    /// For each entry the stored bytes are decoded and `f(key, &value)` decides
    /// what happens:
    ///
    /// * `Keep` and `Delete` are passed through unchanged;
    /// * `Update(new)` re-encodes `new` with the map's codec.
    ///
    /// Entries whose bytes fail to decode are kept as they are (a warning is
    /// logged) and `f` is not called for them.
    ///
    /// Returns the count reported by `migrate` on the underlying `VarMap`.
    /// NOTE(review): presumably the number of entries changed — confirm.
    pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
        use crate::MigrateAction;
        self.inner.migrate(|key, bytes| {
            let Ok(current) = self.codec.decode_from(bytes) else {
                tracing::warn!("var_typed_map migrate: decode error, keeping entry");
                return MigrateAction::Keep;
            };
            match f(key, &current) {
                MigrateAction::Keep => MigrateAction::Keep,
                MigrateAction::Delete => MigrateAction::Delete,
                MigrateAction::Update(new) => {
                    let mut encoded = Vec::new();
                    self.codec.encode_to(&new, &mut encoded);
                    MigrateAction::Update(ByteView::from_vec(encoded))
                }
            }
        })
    }
}

/// Compaction callbacks; each one forwards to the wrapped `VarMap`.
impl<K, T, C, H> CompactionIndex<K> for VarTypedMap<K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Forwards to `update_if_match` on the inner map and returns its answer.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
        self.inner.update_if_match(key, old_loc, new_loc)
    }

    /// Forwards to `invalidate_blocks` on the inner map.
    fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
        self.inner.invalidate_blocks(shard_id, file_id, total_bytes);
    }

    /// Answers via `contains` on the inner map; values are not decoded.
    fn contains_key(&self, key: &K) -> bool {
        self.inner.contains(key)
    }
}

/// Replication entry points; each one forwards to the wrapped `VarMap`.
/// Entries arrive as raw bytes and are not run through the codec here.
#[cfg(feature = "replication")]
impl<K, T, C, H> crate::replication::ReplicationTarget for VarTypedMap<K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Forwards all arguments to `apply_entry` on the inner map.
    fn apply_entry(
        &self,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<()> {
        self.inner
            .apply_entry(shard_id, file_id, entry_offset, header, key, value)
    }

    /// Forwards all arguments to `try_apply_entry` on the inner map.
    fn try_apply_entry(
        &self,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        raw_after_header: &[u8],
    ) -> DbResult<bool> {
        self.inner
            .try_apply_entry(shard_id, file_id, entry_offset, header, raw_after_header)
    }

    /// Key length as reported by the inner map.
    fn key_len(&self) -> usize {
        self.inner.key_len()
    }
}

// ---------------------------------------------------------------------------
// VarTypedMapShard
// ---------------------------------------------------------------------------

/// Typed handle onto a single shard of a [`VarTypedMap`], used for atomic
/// multi-key operations.
///
/// Obtained via [`VarTypedMap::atomic`]. Hook is **not** fired for operations
/// performed through this handle.
pub struct VarTypedMapShard<'tree, K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Owning map; supplies the codec used to encode/decode values.
    tree: &'tree VarTypedMap<K, T, C, H>,
    /// Type-erased raw pointer to a
    /// `VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>`.
    ///
    /// Invariance of the inner lifetime would force it to `'static` if the
    /// field were typed, so the pointer is erased to `*mut ()` and the typed
    /// pointer is reconstructed on demand. Valid only within the enclosing
    /// `atomic()` closure.
    inner: *mut (),
    /// Zero-sized marker typed as a `&'tree mut ()` borrow.
    _marker: PhantomData<&'tree mut ()>,
}

// SAFETY: see VarTypedShard for the same reasoning.
// The impl must be written by hand because the `*mut ()` field makes the
// struct `!Send` by default.
// NOTE(review): `C` and `H` carry no explicit `Send`/`Sync` bound here, while
// the handle also holds `&VarTypedMap<K, T, C, H>`; presumably `Codec` and
// `TypedWriteHook` require them as supertraits — confirm.
unsafe impl<
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
> Send for VarTypedMapShard<'_, K, T, C, H>
{
}

impl<K, T, C, H> VarTypedMapShard<'_, K, T, C, H>
where
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Rebuild the typed, mutable shard handle from the erased pointer.
    fn inner_mut(&mut self) -> &mut VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
        let shard = self.inner as *mut VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>;
        // SAFETY: pointer was set from a live `&mut VarMapShard` and is only
        // dereferenced inside the enclosing `atomic()` closure.
        unsafe { &mut *shard }
    }

    /// Rebuild the typed, shared shard handle from the erased pointer.
    fn inner_ref(&self) -> &VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
        let shard = self.inner as *const VarMapShard<'_, K, VarTypedHookAdapter<K, T, C, H>>;
        // SAFETY: see `inner_mut`.
        unsafe { &*shard }
    }

    /// Encode `value` with the owning map's codec and `put` the bytes into the shard.
    pub fn put(&mut self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.tree.codec.encode_to(value, &mut encoded);
        self.inner_mut().put(key, &encoded)
    }

    /// Encode `value` with the owning map's codec and `insert` the bytes into the shard.
    pub fn insert(&mut self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.tree.codec.encode_to(value, &mut encoded);
        self.inner_mut().insert(key, &encoded)
    }

    /// Delete `key` through the shard handle, returning the shard's result unchanged.
    pub fn delete(&mut self, key: &K) -> DbResult<bool> {
        self.inner_mut().delete(key)
    }

    /// Look up `key` in the shard and decode it.
    ///
    /// Returns `None` when the shard yields no value and also when the stored
    /// bytes fail to decode.
    pub fn get(&self, key: &K) -> Option<T> {
        self.inner_ref()
            .get(key)
            .and_then(|raw| self.tree.codec.decode_from(&raw).ok())
    }

    /// Like [`get`](Self::get), but maps `None` to `DbError::KeyNotFound`
    /// (which therefore also covers undecodable values).
    pub fn get_or_err(&self, key: &K) -> DbResult<T> {
        match self.get(key) {
            Some(value) => Ok(value),
            None => Err(DbError::KeyNotFound),
        }
    }

    /// Whether the shard reports `key` as present; the value is not decoded.
    pub fn contains(&self, key: &K) -> bool {
        self.inner_ref().contains(key)
    }
}

/// `armour` collection adapter, keyed by the value type's `SelfId`.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for VarTypedMap<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Send + Sync,
    C: Codec<T> + Clone + 'static,
    H: TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Send + Sync + Hash + Eq + Ord,
{
    /// Collection name, taken from `T::NAME`.
    fn name(&self) -> &str {
        T::NAME
    }

    /// Entry count from the inherent `VarTypedMap::len`.
    fn len(&self) -> usize {
        // Path form makes it explicit that this is the inherent method
        // (inherent items take precedence over trait items), not a recursive call.
        Self::len(self)
    }

    /// Compaction via the inherent `VarTypedMap::compact`.
    fn compact(&self) -> DbResult<usize> {
        // Same as above: resolves to the inherent method.
        Self::compact(self)
    }
}