mindb 0.1.2

Lightweight embedded key–value store with write-ahead log and zstd compression.
Documentation
#![allow(dead_code)]

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};

use crossbeam_skiplist::SkipMap;
use crossbeam_utils::CachePadded;

const DEFAULT_LIMIT_BYTES: u64 = 3 * 1024 * 1024 * 1024;
const ENTRY_BASE_OVERHEAD: usize = 32;
const BACKPRESSURE_MARGIN: u64 = 64 * 1024 * 1024;

/// Errors returned by the memtable APIs.
#[derive(Debug, thiserror::Error)]
pub enum MemTableError {
    /// Raised when inserts arrive after the memtable has been frozen.
    #[error("memtable is frozen")]
    Frozen,
    /// Triggered when the memtable is close to exhausting its soft capacity.
    #[error("memtable back-pressure active")]
    Backpressure,
}

/// In-memory record representation.
#[derive(Clone, Debug)]
pub struct MemTableEntry {
    pub key: Arc<[u8]>,
    pub value: Arc<[u8]>,
    pub sequence: u64,
    pub tombstone: bool,
}

impl MemTableEntry {
    fn heap_usage(&self) -> usize {
        self.key.len() + self.value.len() + ENTRY_BASE_OVERHEAD
    }
}

#[derive(Clone)]
struct EntryValue {
    value: Arc<[u8]>,
    sequence: u64,
    tombstone: bool,
    heap_bytes: usize,
}

impl EntryValue {
    fn into_entry(self, key: Arc<[u8]>) -> MemTableEntry {
        MemTableEntry {
            key,
            value: self.value,
            sequence: self.sequence,
            tombstone: self.tombstone,
        }
    }
}

struct Arena {
    used: CachePadded<AtomicU64>,
}

/// Lock-free skiplist memtable with per-CPU arenas and back-pressure.
pub struct MemTable {
    inner: SkipMap<Arc<[u8]>, EntryValue>,
    arenas: Arc<Vec<Arena>>,
    bytes_used: AtomicU64,
    backpressure: AtomicBool,
    frozen: AtomicBool,
    sequence: AtomicU64,
    limit_bytes: u64,
}

impl MemTable {
    /// Constructs a new memtable with the default 3 GiB soft limit.
    pub fn new() -> Self {
        Self::with_limit(DEFAULT_LIMIT_BYTES)
    }

    /// Constructs a new memtable with the provided soft limit in bytes.
    pub fn with_limit(limit_bytes: u64) -> Self {
        let cpu_count = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1)
            .max(1);
        let arenas = (0..cpu_count)
            .map(|_| Arena {
                used: CachePadded::new(AtomicU64::new(0)),
            })
            .collect();

        Self {
            inner: SkipMap::new(),
            arenas: Arc::new(arenas),
            bytes_used: AtomicU64::new(0),
            backpressure: AtomicBool::new(false),
            frozen: AtomicBool::new(false),
            sequence: AtomicU64::new(0),
            limit_bytes: limit_bytes.max(1),
        }
    }

    fn threshold(&self) -> u64 {
        self.limit_bytes
            .saturating_sub(BACKPRESSURE_MARGIN.min(self.limit_bytes / 8))
    }

    fn arena_for_thread(&self) -> &Arena {
        let mut hasher = DefaultHasher::new();
        std::thread::current().id().hash(&mut hasher);
        let id = hasher.finish() as usize;
        let index = id % self.arenas.len().max(1);
        &self.arenas[index]
    }

    fn reserve_bytes(&self, bytes: usize) -> Result<(), MemTableError> {
        if bytes == 0 {
            return Ok(());
        }

        let mut current = self.bytes_used.load(AtomicOrdering::Relaxed);
        loop {
            let proposed = current + bytes as u64;
            if proposed > self.limit_bytes {
                self.backpressure.store(true, AtomicOrdering::Release);
                return Err(MemTableError::Backpressure);
            }

            match self.bytes_used.compare_exchange_weak(
                current,
                proposed,
                AtomicOrdering::SeqCst,
                AtomicOrdering::Relaxed,
            ) {
                Ok(_) => {
                    self.arena_for_thread()
                        .used
                        .fetch_add(bytes as u64, AtomicOrdering::Relaxed);
                    if proposed >= self.threshold() {
                        self.backpressure.store(true, AtomicOrdering::Release);
                    }
                    return Ok(());
                }
                Err(actual) => current = actual,
            }
        }
    }

    fn release_bytes(&self, bytes: usize) {
        if bytes == 0 {
            return;
        }

        self.bytes_used
            .fetch_sub(bytes as u64, AtomicOrdering::Relaxed);
        self.arena_for_thread()
            .used
            .fetch_sub(bytes as u64, AtomicOrdering::Relaxed);

        if self.bytes_used.load(AtomicOrdering::Relaxed) < self.threshold() {
            self.backpressure.store(false, AtomicOrdering::Release);
        }
    }

    /// Inserts or replaces a key/value pair in the memtable.
    pub fn put(&self, key: &[u8], value: Vec<u8>) -> Result<MemTableEntry, MemTableError> {
        if self.frozen.load(AtomicOrdering::Acquire) {
            return Err(MemTableError::Frozen);
        }

        let key_arc: Arc<[u8]> = Arc::from(key.to_owned());
        let value_arc: Arc<[u8]> = Arc::from(value);
        let entry = EntryValue {
            value: value_arc.clone(),
            sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed) + 1,
            tombstone: false,
            heap_bytes: key_arc.len() + value_arc.len() + ENTRY_BASE_OVERHEAD,
        };

        let previous = self
            .inner
            .get(&key_arc)
            .map(|existing| existing.value().heap_bytes);

        self.reserve_bytes(entry.heap_bytes)?;
        self.inner.insert(key_arc.clone(), entry.clone());
        if let Some(bytes) = previous {
            self.release_bytes(bytes);
        }

        Ok(entry.into_entry(key_arc))
    }

    /// Inserts or replaces a key/value pair using owned key/value without re-cloning bytes.
    pub fn put_owned(&self, key: Vec<u8>, value: Vec<u8>) -> Result<MemTableEntry, MemTableError> {
        if self.frozen.load(AtomicOrdering::Acquire) {
            return Err(MemTableError::Frozen);
        }

        let key_arc: Arc<[u8]> = Arc::from(key);
        let value_arc: Arc<[u8]> = Arc::from(value);
        let entry = EntryValue {
            value: value_arc.clone(),
            sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed) + 1,
            tombstone: false,
            heap_bytes: key_arc.len() + value_arc.len() + ENTRY_BASE_OVERHEAD,
        };

        let previous = self
            .inner
            .get(&key_arc)
            .map(|existing| existing.value().heap_bytes);

        self.reserve_bytes(entry.heap_bytes)?;
        self.inner.insert(key_arc.clone(), entry.clone());
        if let Some(bytes) = previous {
            self.release_bytes(bytes);
        }

        Ok(entry.into_entry(key_arc))
    }

    /// Inserts or replaces a key/value pair using Arc-backed key/value, allowing cheap retries.
    pub fn put_arc(
        &self,
        key_arc: Arc<[u8]>,
        value_arc: Arc<[u8]>,
    ) -> Result<MemTableEntry, MemTableError> {
        if self.frozen.load(AtomicOrdering::Acquire) {
            return Err(MemTableError::Frozen);
        }

        let entry = EntryValue {
            value: value_arc.clone(),
            sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed) + 1,
            tombstone: false,
            heap_bytes: key_arc.len() + value_arc.len() + ENTRY_BASE_OVERHEAD,
        };

        let previous = self
            .inner
            .get(&key_arc)
            .map(|existing| existing.value().heap_bytes);

        self.reserve_bytes(entry.heap_bytes)?;
        self.inner.insert(key_arc.clone(), entry.clone());
        if let Some(bytes) = previous {
            self.release_bytes(bytes);
        }

        Ok(entry.into_entry(key_arc))
    }

    /// Inserts a tombstone for the provided key.
    pub fn delete(&self, key: &[u8]) -> Result<MemTableEntry, MemTableError> {
        if self.frozen.load(AtomicOrdering::Acquire) {
            return Err(MemTableError::Frozen);
        }

        let key_arc: Arc<[u8]> = Arc::from(key.to_owned());
        let value_arc: Arc<[u8]> = Arc::from(Vec::new());
        let entry = EntryValue {
            value: value_arc.clone(),
            sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed) + 1,
            tombstone: true,
            heap_bytes: key_arc.len() + ENTRY_BASE_OVERHEAD,
        };

        let previous = self
            .inner
            .get(&key_arc)
            .map(|existing| existing.value().heap_bytes);

        self.reserve_bytes(entry.heap_bytes)?;
        self.inner.insert(key_arc.clone(), entry.clone());
        if let Some(bytes) = previous {
            self.release_bytes(bytes);
        }

        Ok(entry.into_entry(key_arc))
    }

    /// Retrieves a key from the memtable if present.
    pub fn get(&self, key: &[u8]) -> Option<MemTableEntry> {
        self.inner
            .get(key)
            .map(|entry| entry.value().clone().into_entry(entry.key().clone()))
    }

    /// Returns the approximate heap usage of the memtable.
    pub fn approximate_size(&self) -> u64 {
        self.bytes_used.load(AtomicOrdering::Relaxed)
    }

    /// Returns the total bytes tracked for the current thread's arena.
    pub fn arena_usage(&self) -> u64 {
        self.arena_for_thread().used.load(AtomicOrdering::Relaxed)
    }

    /// Indicates whether the memtable is currently applying back-pressure.
    pub fn is_backpressured(&self) -> bool {
        self.backpressure.load(AtomicOrdering::Acquire)
    }

    /// Returns the number of live entries tracked by the memtable.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Marks the memtable as frozen and captures a sorted snapshot of the entries.
    pub fn freeze(&self) -> FrozenMemTable {
        self.frozen.store(true, AtomicOrdering::SeqCst);

        let mut entries: Vec<MemTableEntry> = self
            .inner
            .iter()
            .map(|entry| entry.value().clone().into_entry(entry.key().clone()))
            .collect();

        entries.sort_by(|a, b| a.key.cmp(&b.key).then_with(|| a.sequence.cmp(&b.sequence)));

        FrozenMemTable {
            entries,
            total_bytes: self.approximate_size() as usize,
        }
    }

    /// Returns true if the memtable has been frozen and is read-only.
    pub fn is_frozen(&self) -> bool {
        self.frozen.load(AtomicOrdering::Acquire)
    }
}

/// Immutable memtable snapshot used by the flush coordinator.
pub struct FrozenMemTable {
    entries: Vec<MemTableEntry>,
    total_bytes: usize,
}

impl FrozenMemTable {
    pub fn entries(&self) -> &[MemTableEntry] {
        &self.entries
    }

    pub fn into_entries(self) -> Vec<MemTableEntry> {
        self.entries
    }

    pub fn total_bytes(&self) -> usize {
        self.total_bytes
    }
}