mindb 0.1.2

Lightweight embedded key–value store with write-ahead log and zstd compression.
Documentation
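//! Hot index that keeps track of the latest versions per key.
//!
//! Key lookup goes through a lock-free [`SkipMap`]; each key's version chain is
//! guarded by its own `parking_lot` read-write locks.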
#![allow(dead_code)]

use std::sync::Arc;

use crossbeam_skiplist::SkipMap;
use parking_lot::RwLock;
use smallvec::SmallVec;

use super::{SequenceNumber, VersionPointer};

/// Per-key list of version pointers.
///
/// `append` keeps `versions` in ascending `sequence` order, and `last` caches
/// its final (highest-sequence) element so `latest_visible` can often answer
/// without locking `versions` at all.
#[derive(Default)]
struct VersionChain {
    /// Every recorded version of the key, oldest first.
    versions: RwLock<SmallVec<[VersionPointer; 4]>>,
    last: RwLock<Option<VersionPointer>>, // cached copy of `versions.last()`; fast path for latest
}

impl VersionChain {
    /// Records `pointer` in the chain.
    ///
    /// Versions stay ordered by ascending `sequence` with at most one entry per
    /// sequence number: appending a sequence that is already present replaces
    /// the stored pointer, whether or not it is the tail.
    fn append(&self, pointer: VersionPointer) {
        let mut versions = self.versions.write();
        // Commits normally arrive in ascending order, so the common case is a
        // plain push onto the tail.
        let extends_tail = match versions.last() {
            Some(tail) => tail.sequence < pointer.sequence,
            None => true,
        };
        if extends_tail {
            versions.push(pointer);
        } else {
            // Out-of-order or repeated sequence. The chain is sorted and unique by
            // sequence, so a binary search yields either the entry to replace or
            // the slot that preserves the order (O(n) insert, no full re-sort).
            match versions.binary_search_by_key(&pointer.sequence, |p| p.sequence) {
                Ok(index) => versions[index] = pointer,
                Err(index) => versions.insert(index, pointer),
            }
        }
        // Refresh the cached tail while still holding the `versions` write lock so
        // concurrent appends update the cache in the same order they mutate the
        // chain.
        *self.last.write() = versions.last().cloned();
    }

    /// Returns the highest-sequence version visible at `snapshot`, if any.
    fn latest_visible(&self, snapshot: SequenceNumber) -> Option<VersionPointer> {
        {
            // Fast path: the cached tail has the highest sequence in the chain, so
            // if it is visible it is necessarily the latest visible version.
            let cached = self.last.read();
            if let Some(tail) = cached.as_ref().filter(|tail| tail.is_visible_at(snapshot)) {
                return Some(tail.clone());
            }
        } // `last` guard released here, before `versions` is locked below.
        let versions = self.versions.read();
        versions
            .iter()
            .rev()
            .find(|p| p.is_visible_at(snapshot))
            .cloned()
    }

    /// Returns every version visible at `snapshot`, oldest first.
    fn iter_visible(&self, snapshot: SequenceNumber) -> Vec<VersionPointer> {
        let versions = self.versions.read();
        versions
            .iter()
            .filter(|pointer| pointer.is_visible_at(snapshot))
            .cloned()
            .collect()
    }
}

/// Ordered (skip-list) index from key bytes to per-key version chains, backed
/// by a lock-free [`SkipMap`].
///
/// Cloning is cheap: clones share the same underlying map through an `Arc`.
#[derive(Clone, Default)]
pub struct HotIndex {
    inner: Arc<SkipMap<Arc<[u8]>, Arc<VersionChain>>>,
}

impl HotIndex {
    /// Creates an empty hot index.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(SkipMap::new()),
        }
    }

    /// Returns the total number of keys tracked by the hot index.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Inserts or updates the chain for the provided key (Arc-backed).
    pub fn upsert_arc(&self, key: Arc<[u8]>, pointer: VersionPointer) {
        if let Some(entry) = self.inner.get(&*key) {
            entry.value().append(pointer);
        } else {
            let chain = Arc::new(VersionChain::default());
            chain.append(pointer);
            self.inner.insert(key, chain);
        }
    }

    /// Convenience upsert that accepts any key and allocates an Arc.
    pub fn upsert(&self, key: impl AsRef<[u8]>, pointer: VersionPointer) {
        let arc: Arc<[u8]> = Arc::from(key.as_ref().to_vec());
        self.upsert_arc(arc, pointer);
    }

    /// Batched upsert to reduce overhead when committing large transactions.
    pub fn upsert_batch<I>(&self, entries: I)
    where
        I: IntoIterator<Item = (Arc<[u8]>, VersionPointer)>,
    {
        for (key, pointer) in entries {
            self.upsert_arc(key, pointer);
        }
    }

    /// Returns the most recent visible version of the key.
    pub fn get_latest(&self, key: &[u8], snapshot: SequenceNumber) -> Option<VersionPointer> {
        self.inner
            .get(key)
            .and_then(|entry| entry.value().latest_visible(snapshot))
    }

    /// Returns all visible versions whose key starts with the provided prefix.
    pub fn scan_prefix(
        &self,
        prefix: &[u8],
        snapshot: SequenceNumber,
    ) -> Vec<(Vec<u8>, VersionPointer)> {
        self.inner
            .iter()
            .filter_map(|entry| {
                let key = entry.key();
                if key.as_ref().starts_with(prefix) {
                    entry
                        .value()
                        .latest_visible(snapshot)
                        .map(|pointer| (key.as_ref().to_vec(), pointer))
                } else {
                    None
                }
            })
            .collect()
    }

    /// Enumerates all keys visible under the snapshot.
    pub fn snapshot_iter(&self, snapshot: SequenceNumber) -> Vec<(Vec<u8>, Vec<VersionPointer>)> {
        self.inner
            .iter()
            .map(|entry| (entry.key().as_ref().to_vec(), entry.value().iter_visible(snapshot)))
            .collect()
    }
}