//! # armdb 0.1.12
//!
//! Sharded bitcask key-value storage optimized for NVMe.
//!
//! See the crate documentation for details.
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use crate::config::Config;
use crate::error::DbResult;
use crate::shard::Shard;

#[cfg(feature = "encryption")]
use crate::crypto::PageCipher;

/// On-disk metadata stored in `db.meta`.
///
/// Written once when the database is created and re-validated on every
/// subsequent open (see `write_meta` / `validate_meta`): the shard layout
/// and encryption flag are immutable for the lifetime of a database.
///
/// Format (4 bytes):
///   [0] shard_count (u8, 1..=255)
///   [1] shard_prefix_bits (u8)
///   [2] flags: bit 0 = encrypted
///   [3] reserved (0)
const META_SIZE: usize = 4;

fn write_meta(
    path: &std::path::Path,
    config: &Config,
    #[cfg(feature = "encryption")] encrypted: bool,
) -> DbResult<()> {
    let mut buf = [0u8; META_SIZE];
    buf[0] = config.shard_count as u8;
    buf[1] = config.shard_prefix_bits as u8;
    #[cfg(feature = "encryption")]
    if encrypted {
        buf[2] |= 1;
    }
    std::fs::write(path, buf)?;
    Ok(())
}

fn validate_meta(
    path: &std::path::Path,
    config: &Config,
    #[cfg(feature = "encryption")] encrypted: bool,
) -> DbResult<()> {
    let meta = std::fs::read(path)?;

    if meta.len() != META_SIZE {
        return Err(crate::error::DbError::FormatMismatch(format!(
            "db.meta has unexpected size: expected {META_SIZE}, got {}",
            meta.len()
        )));
    }

    let stored_shards = meta[0] as usize;
    if stored_shards != config.shard_count {
        return Err(crate::error::DbError::FormatMismatch(format!(
            "shard_count mismatch: db has {stored_shards}, config has {}",
            config.shard_count,
        )));
    }

    let stored_prefix = meta[1] as usize;
    if stored_prefix != config.shard_prefix_bits {
        return Err(crate::error::DbError::FormatMismatch(format!(
            "shard_prefix_bits mismatch: db has {stored_prefix}, config has {}",
            config.shard_prefix_bits,
        )));
    }

    #[cfg(feature = "encryption")]
    {
        let was_encrypted = meta[2] & 1 != 0;
        if was_encrypted != encrypted {
            return Err(crate::error::DbError::FormatMismatch(if was_encrypted {
                "database is encrypted but no encryption_key provided".into()
            } else {
                "database is not encrypted but encryption_key was provided".into()
            }));
        }
    }

    Ok(())
}

/// Internal storage engine: owns config, shards, and optional cipher.
/// Each tree type embeds one `Engine` — one tree = one database.
#[allow(dead_code)]
pub(crate) struct Engine {
    // Root directory of the database (holds `db.meta` and the shard dirs).
    path: PathBuf,
    // The validated configuration this database was opened with.
    config: Config,
    // One shard per `config.shard_count`; wrapped in `Arc` so the shard set
    // can be shared outside the engine via `shards()`.
    shards: Arc<Vec<Shard>>,
    // Atomic counter shared with every shard, initialized to 1 in `open`.
    // Presumably a global sequence number — confirm against Shard's usage.
    gsn: Arc<AtomicU64>,
    // Page cipher derived from `config.encryption_key`, if a key was given.
    #[cfg(feature = "encryption")]
    cipher: Option<Arc<PageCipher>>,
}

impl Engine {
    /// Open or create a database at the given path.
    /// Creates shard directories, validates encryption meta, initializes shards.
    ///
    /// # Errors
    /// Fails if the config is invalid, the directory cannot be created,
    /// `db.meta` disagrees with `config`, cipher construction fails, or any
    /// shard fails to open.
    #[tracing::instrument(skip(path, config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: Config) -> DbResult<Self> {
        let path = path.as_ref().to_path_buf();
        config.validate()?;
        tracing::info!(shards = config.shard_count, "opening database");

        std::fs::create_dir_all(&path)?;

        // Build the cipher only when a key was supplied; `transpose` turns
        // Option<Result<_>> into Result<Option<_>> so construction errors
        // propagate via `?`.
        #[cfg(feature = "encryption")]
        let cipher = config
            .encryption_key
            .as_ref()
            .map(|key| PageCipher::new(key).map(Arc::new))
            .transpose()?;

        // db.meta: validate immutable parameters (shard_count, shard_prefix_bits, encryption)
        {
            let meta_path = path.join("db.meta");
            if meta_path.exists() {
                // Existing database: reject a config whose immutable
                // parameters differ from what is recorded on disk.
                validate_meta(
                    &meta_path,
                    &config,
                    #[cfg(feature = "encryption")]
                    cipher.is_some(),
                )?;
            } else {
                // Fresh database: record the parameters for future opens.
                write_meta(
                    &meta_path,
                    &config,
                    #[cfg(feature = "encryption")]
                    cipher.is_some(),
                )?;
            }
        }

        // Shared counter handed to every shard; starts at 1.
        let gsn = Arc::new(AtomicU64::new(1));

        let mut shards = Vec::with_capacity(config.shard_count);
        for i in 0..config.shard_count {
            // Shard dirs are zero-padded: "shard_000", "shard_001", ...
            let shard_dir = path.join(format!("shard_{i:03}"));
            #[cfg(feature = "encryption")]
            let shard = Shard::open_encrypted(
                i as u8,
                &shard_dir,
                config.max_file_size,
                config.write_buffer_size,
                config.hints,
                cipher.clone(),
                gsn.clone(),
            )?;
            #[cfg(not(feature = "encryption"))]
            let shard = Shard::open(
                i as u8,
                &shard_dir,
                config.max_file_size,
                config.write_buffer_size,
                config.hints,
                gsn.clone(),
            )?;
            shards.push(shard);
        }

        tracing::info!("database opened");
        Ok(Self {
            path,
            config,
            shards: Arc::new(shards),
            gsn,
            #[cfg(feature = "encryption")]
            cipher,
        })
    }

    /// Root directory this database was opened at.
    #[allow(dead_code)]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Configuration this database was opened with.
    #[allow(dead_code)]
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Shared atomic counter handed to each shard at open (starts at 1).
    pub fn gsn(&self) -> &AtomicU64 {
        &self.gsn
    }

    /// All shards, in id order.
    pub fn shards(&self) -> &Arc<Vec<Shard>> {
        &self.shards
    }

    /// Owned copies of every shard's directory path.
    pub fn shard_dirs(&self) -> Vec<std::path::PathBuf> {
        self.shards.iter().map(|s| s.dir().to_path_buf()).collect()
    }

    /// Borrow a slice of owned paths (e.g. from `shard_dirs`) as `&Path`s.
    pub fn shard_dir_refs(dirs: &[std::path::PathBuf]) -> Vec<&Path> {
        dirs.iter().map(|p| p.as_path()).collect()
    }

    /// Ids of all shards, in the same order as `shards()`.
    pub fn shard_ids(&self) -> Vec<u8> {
        self.shards.iter().map(|s| s.id).collect()
    }

    /// Value of `config.hints`, as passed to each shard at open.
    pub fn hints(&self) -> bool {
        self.config.hints
    }

    /// The page cipher, if the database was opened with an encryption key.
    #[cfg(feature = "encryption")]
    pub fn cipher(&self) -> Option<Arc<PageCipher>> {
        self.cipher.clone()
    }

    /// Flush all shard write buffers to disk (without fsync).
    pub fn flush_buffers(&self) -> DbResult<()> {
        tracing::debug!("flushing all buffers");
        for shard in self.shards.iter() {
            shard.flush_buf()?;
        }
        Ok(())
    }

    /// Flush write buffers + fsync all shard active files.
    // NOTE(review): the log lines below say "closing"/"closed" although this
    // method only flushes — confirm whether `flush` doubles as the close path
    // or whether the messages should read "flushing"/"flushed".
    pub fn flush(&self) -> DbResult<()> {
        tracing::info!("closing database");
        for shard in self.shards.iter() {
            shard.flush()?;
        }
        tracing::info!("database closed");
        Ok(())
    }
}