mindb 0.1.2

Lightweight embedded key–value store with write-ahead log and zstd compression.
Documentation
#![allow(dead_code)]

use std::collections::{BTreeMap, HashSet};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};

use crate::storage::segment::SegmentMetadata;
use crate::storage::{Manifest, Result, StorageError};

/// Manifest format version: stamped into every freshly created state and
/// required to match exactly when an existing manifest file is opened.
const MANIFEST_VERSION: u32 = 1;

/// Tier under which a segment is tracked in the manifest.
///
/// Serialized with lowercase variant names (`hot`, `warm`, `cold`). The
/// derived ordering follows declaration order (`Hot < Warm < Cold`), which is
/// also the key order of the manifest's per-tier map.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "lowercase")]
pub enum SegmentTier {
    Hot,
    Warm,
    Cold,
}

impl SegmentTier {
    pub fn all() -> [SegmentTier; 3] {
        [SegmentTier::Hot, SegmentTier::Warm, SegmentTier::Cold]
    }
}

/// Record describing a single compaction; `FileManifest::record_compaction`
/// appends these to the manifest's compaction history.
///
/// This module only stores and serializes the fields; it does not validate
/// them or cross-check them against the segments held in the manifest.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompactionMetadata {
    /// Identifier of the compaction job (uniqueness is not enforced here).
    pub job_id: String,
    /// Presumably the ids of the segments that were read — confirm with the producer.
    pub inputs: Vec<String>,
    /// Presumably the id of the segment the job produced — confirm with the producer.
    pub output_id: String,
    /// Digest of the output; algorithm and encoding are not visible in this file.
    pub output_digest: String,
    /// Tier the job targeted.
    pub target_tier: SegmentTier,
    /// Start time; the `_ms` suffix suggests milliseconds — epoch not shown here, TODO confirm.
    pub started_at_ms: u128,
    /// Completion time; same unit assumption as `started_at_ms`.
    pub completed_at_ms: u128,
    /// Byte count reported by the job (how it is measured is not visible here).
    pub bytes_rewritten: u64,
    /// Presumably the ids of segments retired by the job — confirm with the producer.
    pub retired: Vec<String>,
}

/// Serializable manifest contents; this is the value `FileManifest` writes to
/// and reads from its JSON file.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ManifestState {
    /// Format version; compared against `MANIFEST_VERSION` when a file is opened.
    version: u32,
    /// Segment metadata grouped by tier.
    tiers: BTreeMap<SegmentTier, Vec<SegmentMetadata>>,
    /// Compaction records, in the order they were pushed.
    compactions: Vec<CompactionMetadata>,
}

impl ManifestState {
    fn new() -> Self {
        let mut tiers = BTreeMap::new();
        for tier in SegmentTier::all() {
            tiers.insert(tier, Vec::new());
        }
        Self {
            version: MANIFEST_VERSION,
            tiers,
            compactions: Vec::new(),
        }
    }
}

/// Persistent manifest backed by a JSON file on disk.
///
/// The state is held in memory; it is written to `path` only when `persist`
/// is called (which `open` does once when it creates a new manifest).
pub struct FileManifest {
    /// Location of the JSON manifest file.
    path: PathBuf,
    /// In-memory manifest contents that `persist` serializes.
    state: ManifestState,
}

impl FileManifest {
    /// Opens the manifest stored at `path`, creating it when it does not exist.
    ///
    /// When the file is missing, the parent directory is created, an empty
    /// state is built and immediately persisted, and the new manifest is
    /// returned. Otherwise the file is parsed as JSON and its version checked.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened for any reason other than
    /// "not found", if it cannot be parsed, if creating or persisting a new
    /// manifest fails, or `StorageError::InvalidFormat` when the stored
    /// version differs from `MANIFEST_VERSION`.
    pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self> {
        let path = path.into();

        // Attempt the open directly instead of probing with `exists()`:
        // `exists()` reports `false` for *any* probe failure, which would make
        // an unreadable manifest look absent and get it replaced by an empty
        // one. Only a genuine "not found" takes the creation path.
        let file = match File::open(&path) {
            Ok(file) => file,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                if let Some(parent) = path.parent() {
                    fs::create_dir_all(parent)?;
                }
                let manifest = Self {
                    path,
                    state: ManifestState::new(),
                };
                manifest.persist()?;
                return Ok(manifest);
            }
            Err(err) => return Err(err.into()),
        };

        let state: ManifestState = serde_json::from_reader(BufReader::new(file))?;
        if state.version != MANIFEST_VERSION {
            return Err(StorageError::InvalidFormat(
                "unsupported manifest version".into(),
            ));
        }

        Ok(Self { path, state })
    }

    /// Returns the segments registered under `tier`, in registration order.
    ///
    /// Yields an empty slice when the tier has no entry in the manifest.
    pub fn segments_in(&self, tier: &SegmentTier) -> &[SegmentMetadata] {
        self.state
            .tiers
            .get(tier)
            .map(Vec::as_slice)
            .unwrap_or_default()
    }

    /// Returns all recorded compactions, oldest first.
    pub fn compaction_history(&self) -> &[CompactionMetadata] {
        &self.state.compactions
    }

    /// Drops every segment whose id is contained in `ids`, across all tiers.
    fn remove_segments(&mut self, ids: &HashSet<String>) {
        for tier_segments in self.state.tiers.values_mut() {
            tier_segments.retain(|segment| !ids.contains(&segment.id));
        }
    }
}

impl Manifest for FileManifest {
    fn register_segment(&mut self, tier: SegmentTier, metadata: SegmentMetadata) -> Result<()> {
        let entry = self.state.tiers.entry(tier).or_insert_with(Vec::new);
        entry.push(metadata);
        Ok(())
    }

    fn record_compaction(
        &mut self,
        output_tier: SegmentTier,
        output: SegmentMetadata,
        retired: Vec<String>,
        metadata: CompactionMetadata,
    ) -> Result<()> {
        let retired_set: HashSet<String> = retired.into_iter().collect();
        self.remove_segments(&retired_set);
        self.register_segment(output_tier.clone(), output)?;
        self.state.compactions.push(metadata);
        Ok(())
    }

    fn persist(&self) -> Result<()> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent)?;
        }
        let file = File::create(&self.path)?;
        let writer = BufWriter::new(file);
        serde_json::to_writer_pretty(writer, &self.state)?;
        Ok(())
    }
}

impl FileManifest {
    /// Returns the location of the backing manifest file.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}