armdb 0.1.10

sharded bitcask key-value storage optimized for NVMe
Documentation
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::sync::Mutex;

use crate::error::{DbError, DbResult};
use crate::fixed::config::FixedConfig;
use crate::fixed::shard::FixedShardInner;

// ── db.meta format (16 bytes) ─────────────────────────────────────
// [0..4]   magic: b"ARMD"
// [4]      version: u8 = 2
// [5]      backend: u8 = 1 (FixedStore)
// [6]      shard_count: u8
// [7]      shard_prefix_bits: u8
// [8]      flags: u8 = 0 (reserved for encryption)
// [9..16]  reserved zeros

/// Exact length in bytes of a valid `db.meta` file.
const META_SIZE: usize = 16;
/// Magic bytes stored at offset 0 of `db.meta`.
const META_MAGIC: &[u8; 4] = b"ARMD";
/// Format version stored at offset 4; files with any other value are rejected.
const META_VERSION: u8 = 2;
/// Backend identifier stored at offset 5.
const META_BACKEND: u8 = 1; // FixedStore

/// Wrapper that pairs a shard id with a mutex-protected `FixedShardInner`.
///
/// One instance is created per shard by `FixedEngine::open`.
#[allow(dead_code)]
pub(crate) struct FixedShard {
    /// Zero-based index of the shard; matches the numeric suffix of its
    /// `shard_NNN` directory.
    pub id: u8,
    /// Shard state; callers must take the lock before touching it.
    pub inner: Mutex<FixedShardInner>,
}

/// Multi-shard engine for the fixed-slot storage backend.
///
/// Manages the lifecycle of all shards, the `db.meta` file, and
/// open/close coordination.
pub(crate) struct FixedEngine {
    /// Root directory of the database (holds `db.meta` and the shard directories).
    path: PathBuf,
    /// All opened shards, ordered by shard id.
    shards: Arc<Vec<FixedShard>>,
    /// Configuration the engine was opened with.
    config: FixedConfig,
}

// ── db.meta I/O ───────────────────────────────────────────────────

fn write_meta(path: &Path, config: &FixedConfig) -> DbResult<()> {
    let mut buf = [0u8; META_SIZE];
    buf[0..4].copy_from_slice(META_MAGIC);
    buf[4] = META_VERSION;
    buf[5] = META_BACKEND;
    buf[6] = config.shard_count as u8;
    buf[7] = config.shard_prefix_bits as u8;
    buf[8] = 0; // flags (reserved)
    // [9..16] reserved zeros — already zero
    std::fs::write(path, buf)?;
    Ok(())
}

/// Check an existing `db.meta` file against the requested configuration.
///
/// The checks run in on-disk order: file size, magic, version, backend,
/// `shard_count`, `shard_prefix_bits`. The first one that fails is reported
/// as `DbError::FormatMismatch`. The flags byte and the reserved tail are
/// not inspected. I/O errors from reading the file are propagated.
fn validate_meta(path: &Path, config: &FixedConfig) -> DbResult<()> {
    // Every rejection uses the same error variant; only the text differs.
    let reject = |reason: String| -> DbResult<()> { Err(DbError::FormatMismatch(reason)) };

    let meta = std::fs::read(path)?;
    if meta.len() != META_SIZE {
        return reject(format!(
            "db.meta: unexpected size: expected {META_SIZE}, got {}",
            meta.len()
        ));
    }

    // The length is now known to be exactly META_SIZE, so the split and the
    // fixed-offset reads below cannot go out of bounds.
    let (magic, fields) = meta.split_at(META_MAGIC.len());
    if magic != META_MAGIC {
        return reject("db.meta: bad magic (expected ARMD)".into());
    }

    let stored_version = fields[0];
    if stored_version != META_VERSION {
        return reject(format!(
            "db.meta: version mismatch: stored {}, expected {META_VERSION}",
            stored_version
        ));
    }

    let stored_backend = fields[1];
    if stored_backend != META_BACKEND {
        return reject(format!(
            "db.meta: backend mismatch: stored {}, expected {META_BACKEND} (FixedStore)",
            stored_backend
        ));
    }

    let stored_shards = usize::from(fields[2]);
    if stored_shards != config.shard_count {
        return reject(format!(
            "db.meta: shard_count mismatch: stored {stored_shards}, config {}",
            config.shard_count
        ));
    }

    let stored_prefix = usize::from(fields[3]);
    if stored_prefix != config.shard_prefix_bits {
        return reject(format!(
            "db.meta: shard_prefix_bits mismatch: stored {stored_prefix}, config {}",
            config.shard_prefix_bits
        ));
    }

    Ok(())
}

// ── FixedEngine ───────────────────────────────────────────────────

impl FixedEngine {
    /// Open or create a fixed-slot database at the given path.
    ///
    /// 1. Validates configuration.
    /// 2. Creates the root directory if needed.
    /// 3. Reads/writes `db.meta`.
    /// 4. Opens every shard (`shard_000/`, `shard_001/`, ...).
    #[tracing::instrument(skip(path,config), fields(path = %path.as_ref().display()))]
    pub fn open(
        path: impl AsRef<Path>,
        config: FixedConfig,
        key_len: u16,
        value_len: u16,
    ) -> DbResult<Self> {
        let path = path.as_ref().to_path_buf();
        config.validate()?;

        tracing::info!(shards = config.shard_count, "opening fixed-slot database");

        std::fs::create_dir_all(&path)?;

        // db.meta: validate or create
        let meta_path = path.join("db.meta");
        if meta_path.exists() {
            validate_meta(&meta_path, &config)?;
        } else {
            write_meta(&meta_path, &config)?;
        }

        // Open each shard
        let mut shards = Vec::with_capacity(config.shard_count);
        for i in 0..config.shard_count {
            let shard_dir = path.join(format!("shard_{i:03}"));
            let inner = FixedShardInner::open(&shard_dir, i as u8, key_len, value_len, &config)?;
            shards.push(FixedShard {
                id: i as u8,
                inner: Mutex::new(inner),
            });
        }

        tracing::info!("fixed-slot database opened");
        Ok(Self {
            path,
            shards: Arc::new(shards),
            config,
        })
    }

    /// Reference to the shard list.
    #[allow(dead_code)]
    pub fn shards(&self) -> &Arc<Vec<FixedShard>> {
        &self.shards
    }

    /// Reference to the engine configuration.
    #[allow(dead_code)]
    pub fn config(&self) -> &FixedConfig {
        &self.config
    }

    /// Root path of the database.
    #[allow(dead_code)]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Sync all shard data files to disk.
    pub fn flush(&self) -> DbResult<()> {
        tracing::debug!("flushing all fixed-slot shards");
        for shard in self.shards.iter() {
            shard.inner.lock().sync()?;
        }
        Ok(())
    }

    /// Perform a clean shutdown of every shard.
    pub fn close(&self) -> DbResult<()> {
        tracing::info!("closing fixed-slot database");
        for shard in self.shards.iter() {
            shard.inner.lock().clean_shutdown()?;
        }
        tracing::info!("fixed-slot database closed");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Key width used by every test in this module.
    const KEY_LEN: u16 = 8;
    /// Value width used by every test in this module.
    const VALUE_LEN: u16 = 32;

    /// Two-shard configuration with a small grow step.
    fn test_config() -> FixedConfig {
        FixedConfig {
            shard_count: 2,
            grow_step: 64,
            ..FixedConfig::test()
        }
    }

    /// Open an engine at `root` with the module-wide key/value widths.
    fn open_engine(root: &Path, config: FixedConfig) -> DbResult<FixedEngine> {
        FixedEngine::open(root, config, KEY_LEN, VALUE_LEN)
    }

    #[test]
    fn test_engine_open_creates_shards() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().join("testdb");
        let expected_shards = test_config().shard_count;

        let engine = open_engine(&root, test_config()).unwrap();

        // The meta file is written on first open.
        assert!(root.join("db.meta").exists());

        // One directory per shard.
        for i in 0..expected_shards {
            let shard_dir = root.join(format!("shard_{i:03}"));
            assert!(shard_dir.exists(), "shard_{i:03} directory should exist");
        }

        // The in-memory shard list matches the configuration...
        assert_eq!(engine.shards().len(), expected_shards);

        // ...and ids follow list order.
        for (idx, shard) in engine.shards().iter().enumerate() {
            assert_eq!(shard.id, idx as u8);
        }
    }

    #[test]
    fn test_engine_reopen() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().join("testdb");
        let expected_shards = test_config().shard_count;

        // First session: create, then shut down cleanly.
        {
            let first = open_engine(&root, test_config()).unwrap();
            first.close().unwrap();
        }

        // Second session: the same config must be accepted.
        {
            let second = open_engine(&root, test_config()).unwrap();
            assert_eq!(second.shards().len(), expected_shards);
            second.close().unwrap();
        }
    }

    #[test]
    fn test_engine_shard_count_mismatch() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().join("testdb");

        // Create the database with 2 shards.
        {
            let engine = open_engine(&root, test_config()).unwrap();
            engine.close().unwrap();
        }

        // Reopening with 4 shards must be refused.
        {
            let four_shards = FixedConfig {
                shard_count: 4,
                ..test_config()
            };
            let result = open_engine(&root, four_shards);
            assert!(result.is_err());
            let msg = result.err().unwrap().to_string();
            assert!(
                msg.contains("shard_count mismatch"),
                "expected shard_count mismatch error, got: {msg}"
            );
        }
    }
}