armdb 0.1.12

sharded bitcask key-value storage optimized for NVMe
Documentation
//! Durability trait — abstracts Bitcask (append-only log) and FixedStore
//! (fixed-slot pwrite) backends behind a common interface.
//!
//! [`DurabilityInner`] covers per-shard write operations (one locked shard).
//! [`Durability`] covers engine-level operations (shard routing, open/close).
//!
//! Concrete types:
//! - [`Bitcask`] / [`ShardInner`](crate::shard::ShardInner) — append-only log
//! - [`Fixed`] / [`FixedShardInner`](crate::fixed::shard::FixedShardInner) — fixed-slot pwrite

use std::path::Path;

use crate::disk_loc::DiskLoc;
use crate::entry::entry_size;
use crate::error::DbResult;
use crate::fixed::config::FixedConfig;
use crate::fixed::slot;
use crate::key::Location;
use crate::sync::{self, MutexGuard};

// ── Trait: per-shard write operations ────────────────────────────────────────

/// Per-shard write operations abstracted over storage backend.
///
/// Implementations operate on a single shard (already locked by the caller
/// via [`Durability::lock_shard`]).
///
/// Two implementations live in this file: the Bitcask shard
/// (`crate::shard::ShardInner`, locations are `DiskLoc`) and the FixedStore
/// shard (`crate::fixed::shard::FixedShardInner`, locations are `u32` slot ids).
pub trait DurabilityInner: Send {
    /// Backend-specific handle describing where an entry is stored
    /// (`DiskLoc` for Bitcask, a `u32` slot id for FixedStore).
    type Loc: Location;

    /// Write a new entry (key not previously present). Returns location.
    ///
    /// `shard_id` is forwarded to the log append by the Bitcask backend and
    /// ignored by FixedStore, which allocates a slot and writes into it.
    ///
    /// # Errors
    /// Propagates any failure reported by the backend write.
    fn write_new(&mut self, shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<Self::Loc>;

    /// Update an existing entry. `old_loc` is used for dead-bytes tracking
    /// (Bitcask) or in-place overwrite (FixedStore).
    /// Returns the new location (Bitcask: new DiskLoc; FixedStore: same slot_id).
    ///
    /// # Errors
    /// Propagates any failure reported by the backend write. In the Bitcask
    /// implementation the dead-bytes accounting only runs after the append
    /// has succeeded.
    fn write_update(
        &mut self,
        shard_id: u8,
        old_loc: Self::Loc,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<Self::Loc>;

    /// Delete an entry. Writes a tombstone (Bitcask) or marks the slot deleted
    /// (FixedStore).
    ///
    /// `key` is used by Bitcask for the tombstone record and is ignored by
    /// FixedStore, which deletes the slot at `old_loc` and clears its bitmap
    /// bit.
    ///
    /// # Errors
    /// Propagates any failure reported by the backend write.
    fn write_tombstone(&mut self, shard_id: u8, old_loc: Self::Loc, key: &[u8]) -> DbResult<()>;

    /// Discard a written location (race-condition cleanup).
    /// Bitcask: no-op (compaction handles stale entries).
    /// FixedStore: free the allocated slot.
    ///
    /// # Errors
    /// The Bitcask implementation always returns `Ok(())`; FixedStore
    /// propagates a failure from deleting the slot.
    fn write_discard(&mut self, loc: Self::Loc) -> DbResult<()>;

    /// Whether sync is needed after recent writes.
    ///
    /// The Bitcask implementation in this file always answers `false`;
    /// FixedStore defers to the shard's own `should_sync`.
    fn should_sync(&self) -> bool;

    /// Perform sync (fdatasync).
    ///
    /// The Bitcask implementation in this file is a no-op that returns
    /// `Ok(())`; FixedStore defers to the shard's own `sync`.
    fn sync(&mut self) -> DbResult<()>;
}

// ── Trait: engine-level operations ───────────────────────────────────────────

/// Engine-level durability operations.
///
/// One `Durability` instance owns the full set of shards for a single
/// database directory. Collections (ConstTree, ConstMap, etc.) are
/// generic over `D: Durability` and delegate all disk I/O through this
/// trait.
pub trait Durability: Send + Sync + Sized {
    /// Location handle produced by this backend; must match `Inner::Loc`.
    type Loc: Location;
    /// Per-shard state that is exposed behind the guard returned by
    /// [`Durability::lock_shard`].
    type Inner: DurabilityInner<Loc = Self::Loc>;

    /// Number of shards managed by the engine.
    fn shard_count(&self) -> usize;

    /// Lock the shard at index `shard_id` and return the guard.
    ///
    /// Both implementations in this file index the engine's shard list
    /// directly, so `shard_id` is expected to be `< shard_count()`
    /// (an out-of-range index presumably panics — confirm against the
    /// container type returned by `shards()`).
    fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, Self::Inner>;

    /// The `shard_prefix_bits` value from the engine configuration.
    // NOTE(review): presumably the number of leading key bits used for shard
    // routing — the routing code is not in this file, verify there.
    fn shard_prefix_bits(&self) -> usize;

    /// Flush the engine (both implementations delegate to `engine.flush()`).
    fn flush(&self) -> DbResult<()>;

    /// Close the backend.
    ///
    /// In this file the Bitcask implementation performs an engine flush,
    /// while the Fixed implementation calls `engine.close()`.
    fn close(&self) -> DbResult<()>;
}

// ══════════════════════════════════════════════════════════════════════════════
// Bitcask backend
// ══════════════════════════════════════════════════════════════════════════════

/// Engine-level wrapper for the Bitcask (append-only log) backend.
///
/// Implements [`Durability`] by delegating to the wrapped engine.
// NOTE(review): `dead_code` is presumably allowed because not every field is
// read yet (nothing in this file reads `compaction_threshold`) — confirm and
// drop the attribute once it is no longer needed.
#[allow(dead_code)]
pub struct Bitcask {
    /// Wrapped append-only engine; shards and configuration are reached
    /// through its `shards()` and `config()` accessors.
    pub(crate) engine: crate::engine::Engine,
    /// Compaction threshold.
    // NOTE(review): presumably a dead-bytes ratio that triggers compaction;
    // it is not used in this file — verify against the compaction code.
    pub(crate) compaction_threshold: f64,
}

// ── DurabilityInner for ShardInner (Bitcask) ─────────────────────────────────

impl DurabilityInner for crate::shard::ShardInner {
    type Loc = DiskLoc;

    /// Appends a live (non-tombstone) record for `key` and reports where it
    /// was written. The sequence number returned by the append is dropped.
    fn write_new(&mut self, shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<Self::Loc> {
        let appended = self.append_entry(shard_id, key, value, false)?;
        Ok(appended.0)
    }

    /// Appends the replacement record, then books the superseded record at
    /// `old_loc` as dead bytes against the file it lives in.
    fn write_update(
        &mut self,
        shard_id: u8,
        old_loc: Self::Loc,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<Self::Loc> {
        let (fresh_loc, _) = self.append_entry(shard_id, key, value, false)?;

        // Only reached when the append succeeded: the old record is now stale.
        let stale_bytes = entry_size(key.len(), old_loc.len);
        self.add_dead_bytes(old_loc.file_id as u32, stale_bytes);

        Ok(fresh_loc)
    }

    /// Appends a tombstone (empty value, tombstone flag set) for `key`, then
    /// books the record at `old_loc` as dead bytes.
    fn write_tombstone(
        &mut self,
        shard_id: u8,
        old_loc: Self::Loc,
        key: &[u8],
    ) -> DbResult<()> {
        let no_value: &[u8] = &[];
        self.append_entry(shard_id, key, no_value, true)?;

        let stale_bytes = entry_size(key.len(), old_loc.len);
        self.add_dead_bytes(old_loc.file_id as u32, stale_bytes);

        Ok(())
    }

    /// Nothing to undo for the append-only log: stale entries are left for
    /// compaction to reclaim.
    fn write_discard(&mut self, _loc: Self::Loc) -> DbResult<()> {
        Ok(())
    }

    /// Always `false`: this backend syncs through write-buffer flushes rather
    /// than after individual entries.
    fn should_sync(&self) -> bool {
        false
    }

    /// No-op: syncing for this backend happens via `Shard::flush()` at the
    /// engine level.
    fn sync(&mut self) -> DbResult<()> {
        Ok(())
    }
}

// ── Durability for Bitcask ───────────────────────────────────────────────────

impl Durability for Bitcask {
    type Loc = DiskLoc;
    type Inner = crate::shard::ShardInner;

    /// Number of shards held by the wrapped engine.
    fn shard_count(&self) -> usize {
        let shards = self.engine.shards();
        shards.len()
    }

    /// Locks the shard at index `shard_id`.
    ///
    /// The shard list is indexed directly, so `shard_id` must be a valid
    /// index into `engine.shards()`.
    fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, Self::Inner> {
        let shard = &self.engine.shards()[shard_id];
        shard.lock()
    }

    /// `shard_prefix_bits` as stored in the engine configuration.
    fn shard_prefix_bits(&self) -> usize {
        let cfg = self.engine.config();
        cfg.shard_prefix_bits
    }

    /// Delegates to the engine's flush.
    fn flush(&self) -> DbResult<()> {
        self.engine.flush()
    }

    /// Closing this backend is a final engine flush.
    // NOTE(review): the Fixed backend calls `engine.close()` here instead —
    // confirm that a flush is all the Bitcask engine needs on shutdown.
    fn close(&self) -> DbResult<()> {
        self.engine.flush()
    }
}

// ══════════════════════════════════════════════════════════════════════════════
// Fixed backend
// ══════════════════════════════════════════════════════════════════════════════

/// Engine-level wrapper for the FixedStore (fixed-slot pwrite) backend.
///
/// Implements [`Durability`] by delegating to the wrapped engine and adds
/// `open` / `recover_entries` for start-up.
// NOTE(review): the reason for allowing `dead_code` is not visible in this
// file (`engine` is used throughout) — confirm whether the attribute is
// still needed.
#[allow(dead_code)]
pub struct Fixed {
    /// Wrapped fixed-slot engine; shards and configuration are reached
    /// through its `shards()` and `config()` accessors.
    pub(crate) engine: crate::fixed::engine::FixedEngine,
}

impl Fixed {
    /// Open or create a fixed-slot database at the given path.
    pub fn open(
        path: impl AsRef<Path>,
        config: FixedConfig,
        key_len: usize,
        value_len: usize,
    ) -> DbResult<Self> {
        let engine = crate::fixed::engine::FixedEngine::open(
            path,
            config,
            key_len as u16,
            value_len as u16,
        )?;
        Ok(Self { engine })
    }

    /// Recover entries from all shards. For each valid entry, calls `visitor`
    /// with `(shard_id, key_bytes, value_bytes, slot_id)`.
    ///
    /// Handles both clean shutdown (bitmap-guided) and dirty (full scan)
    /// recovery paths per shard.
    pub fn recover_entries(
        &self,
        mut visitor: impl FnMut(usize, &[u8], &[u8], u32),
    ) -> DbResult<u32> {
        let shards = self.engine.shards();
        let mut total_recovered = 0u32;

        for (shard_idx, shard) in shards.iter().enumerate() {
            let mut inner = shard.inner.lock();

            let key_len = inner.key_len() as usize;
            let value_len = inner.value_len() as usize;

            if inner.has_clean_shutdown() {
                // ── Clean shutdown path ──────────────────────────────────
                inner.load_bitmap_sidecar()?;
                let slot_count = inner.slot_count();

                for slot_id in 0..slot_count {
                    if !inner.bitmap.is_set(slot_id) {
                        continue;
                    }
                    let buf = inner.read_slot(slot_id)?;
                    if let Some((key_bytes, value_bytes)) =
                        slot::validate_slot(&buf, key_len, value_len)
                    {
                        visitor(shard_idx, key_bytes, value_bytes, slot_id);
                        total_recovered += 1;
                    } else {
                        // CRC mismatch or not occupied — clear the corrupted bit.
                        inner.bitmap.clear(slot_id);
                    }
                }
            } else {
                // ── Dirty path (full scan) ───────────────────────────────
                let slot_count = inner.slot_count();

                for slot_id in 0..slot_count {
                    let buf = inner.read_slot(slot_id)?;
                    if slot::slot_status(&buf) != slot::SLOT_OCCUPIED {
                        continue;
                    }
                    if let Some((key_bytes, value_bytes)) =
                        slot::validate_slot(&buf, key_len, value_len)
                    {
                        inner.bitmap.set(slot_id);
                        visitor(shard_idx, key_bytes, value_bytes, slot_id);
                        total_recovered += 1;
                    }
                    // Invalid CRC — bitmap stays 0 for this slot.
                }
            }

            inner.clear_clean_shutdown()?;
        }

        Ok(total_recovered)
    }
}

// ── DurabilityInner for FixedShardInner ──────────────────────────────────────

impl DurabilityInner for crate::fixed::shard::FixedShardInner {
    type Loc = u32;

    /// Allocates a slot, writes the entry into it and returns the slot id.
    /// The shard id is not needed by this backend.
    fn write_new(&mut self, _shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<Self::Loc> {
        let slot_id = self.alloc_slot()?;
        self.write_slot(slot_id, key, value)?;
        Ok(slot_id)
    }

    /// Overwrites the slot at `old_loc` in place; the location is unchanged.
    fn write_update(
        &mut self,
        _shard_id: u8,
        old_loc: Self::Loc,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<Self::Loc> {
        let slot_id = old_loc;
        self.write_slot(slot_id, key, value)?;
        Ok(slot_id)
    }

    /// Deletes the slot at `old_loc` and clears its bitmap bit.
    ///
    /// For this backend a delete is the same pair of steps as a discard, so
    /// the work is shared with `write_discard`.
    fn write_tombstone(
        &mut self,
        _shard_id: u8,
        old_loc: Self::Loc,
        key: &[u8],
    ) -> DbResult<()> {
        let _ = key; // key unused for FixedStore delete
        <Self as DurabilityInner>::write_discard(self, old_loc)
    }

    /// Deletes the slot at `loc`, then clears its bitmap bit. The bit is only
    /// cleared when the slot delete succeeded.
    fn write_discard(&mut self, loc: Self::Loc) -> DbResult<()> {
        self.delete_slot(loc)?;
        self.bitmap.clear(loc);
        Ok(())
    }

    /// Defers to the shard's own `should_sync`.
    // NOTE(review): this relies on an inherent `FixedShardInner::should_sync`
    // taking precedence over this trait method in method lookup; confirm the
    // inherent method exists, otherwise this call would recurse.
    fn should_sync(&self) -> bool {
        self.should_sync()
    }

    /// Defers to the shard's own `sync`.
    // NOTE(review): same inherent-method assumption as `should_sync` above.
    fn sync(&mut self) -> DbResult<()> {
        self.sync()
    }
}

// ── Durability for Fixed ─────────────────────────────────────────────────────

impl Durability for Fixed {
    type Loc = u32;
    type Inner = crate::fixed::shard::FixedShardInner;

    /// Number of shards held by the wrapped engine.
    fn shard_count(&self) -> usize {
        let shards = self.engine.shards();
        shards.len()
    }

    /// Locks the inner state of the shard at index `shard_id` through the
    /// crate's `sync::lock` helper.
    ///
    /// The shard list is indexed directly, so `shard_id` must be a valid
    /// index into `engine.shards()`.
    fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, Self::Inner> {
        let shard = &self.engine.shards()[shard_id];
        sync::lock(&shard.inner)
    }

    /// `shard_prefix_bits` as stored in the engine configuration.
    fn shard_prefix_bits(&self) -> usize {
        let cfg = self.engine.config();
        cfg.shard_prefix_bits
    }

    /// Delegates to the engine's flush.
    fn flush(&self) -> DbResult<()> {
        self.engine.flush()
    }

    /// Delegates to the engine's close.
    fn close(&self) -> DbResult<()> {
        self.engine.close()
    }
}