mindb 0.1.2

Lightweight embedded key–value store with write-ahead log and zstd compression.
Documentation
#![allow(dead_code)]
//! Snapshot tooling for MindB manifests and immutable segments.
//!
//! A snapshot is a point-in-time export of the durable metadata coupled with
//! copies (or hard-links) of the immutable segment files. The helper is used by
//! integration tests and disaster-recovery workflows to ship data to secondary
//! sites.

use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

use crate::storage::manifest::{FileManifest, SegmentTier};
use crate::storage::{Manifest, StorageError};
use libc::EXDEV;

/// Errors produced while attempting to create a snapshot.
///
/// Both variants carry `#[from]`, so the `?` operator converts the wrapped
/// error types into `SnapshotError` automatically.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// Wraps an [`io::Error`], such as one raised by the directory-creation,
    /// copy, hard-link or metadata calls made while exporting a snapshot.
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// Wraps a [`StorageError`] reported by the storage layer.
    #[error("storage error: {0}")]
    Storage(#[from] StorageError),
}

/// Controls how snapshots are materialised on disk.
///
/// Start from [`SnapshotOptions::new`] and refine the result with the chained
/// `with_*` setters. Every setter consumes the options and returns the updated
/// value, so the result must be used.
#[derive(Clone, Debug)]
pub struct SnapshotOptions {
    /// Directory under which the per-snapshot directory is created.
    pub output_dir: PathBuf,
    /// Whether segment files are exported in addition to the manifest.
    pub include_segments: bool,
    /// Whether segments are hard-linked into the snapshot instead of copied.
    pub hard_link_segments: bool,
    /// Explicit name of the snapshot directory. When `None`, a
    /// `snapshot-<timestamp>` name is generated at creation time.
    pub snapshot_name: Option<String>,
}

impl SnapshotOptions {
    /// Creates options that export into `output_dir` with the defaults:
    /// segments included, copied rather than hard-linked, and an
    /// auto-generated snapshot name.
    #[must_use]
    pub fn new<P: Into<PathBuf>>(output_dir: P) -> Self {
        Self {
            output_dir: output_dir.into(),
            include_segments: true,
            hard_link_segments: false,
            snapshot_name: None,
        }
    }

    /// Sets whether segment files are exported alongside the manifest.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_segments(mut self, include: bool) -> Self {
        self.include_segments = include;
        self
    }

    /// Sets whether segments are hard-linked (`true`) or copied (`false`).
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_hard_links(mut self, enable: bool) -> Self {
        self.hard_link_segments = enable;
        self
    }

    /// Fixes the snapshot directory name instead of generating one.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_name<S: Into<String>>(mut self, name: S) -> Self {
        self.snapshot_name = Some(name.into());
        self
    }
}

/// Snapshot metadata returned after a successful export.
#[derive(Clone, Debug)]
pub struct SnapshotHandle {
    /// Directory holding the snapshot: the configured output directory joined
    /// with the snapshot name.
    pub root: PathBuf,
    /// Path of the exported manifest (`MANIFEST.json` inside `root`).
    pub manifest_path: PathBuf,
    /// One entry per exported segment; empty when segments were not included.
    pub segments: Vec<SnapshotSegment>,
    /// Bytes written for the manifest plus the `bytes` value of every
    /// segment. Hard-linked segments contribute their file length even though
    /// no data was duplicated.
    pub bytes_copied: u64,
    /// Milliseconds since the UNIX epoch, sampled when the export started.
    /// Zero if the system clock reported a time before the epoch.
    pub created_at_ms: u128,
}

/// Segment copy represented inside a snapshot.
#[derive(Clone, Debug)]
pub struct SnapshotSegment {
    /// Identifier of the segment as recorded in the manifest.
    pub id: String,
    /// Tier the segment was listed under when the snapshot was taken.
    pub tier: SegmentTier,
    /// Path of the source segment file.
    pub original: PathBuf,
    /// Path of the exported file inside the snapshot's `segments` directory.
    pub snapshot: PathBuf,
    /// Size attributed to this segment: the number of bytes copied, or the
    /// source file length when the segment was hard-linked.
    pub bytes: u64,
}

/// Coordinator responsible for materialising snapshot directories.
///
/// The manager only stores the [`SnapshotOptions`] it was built with; each
/// call to `create` reads them to decide where and how to export.
#[derive(Clone, Debug)]
pub struct SnapshotManager {
    /// Export settings applied to every snapshot this manager creates.
    options: SnapshotOptions,
}

impl SnapshotManager {
    pub fn new(options: SnapshotOptions) -> Self {
        Self { options }
    }

    /// Persists the manifest and copies the immutable segments into the
    /// configured output directory. The manifest is always persisted before the
    /// copy step so that the snapshot captures the most recent view of the
    /// metadata.
    pub fn create(&self, manifest: &FileManifest) -> Result<SnapshotHandle, SnapshotError> {
        manifest.persist()?;

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        let name = self
            .options
            .snapshot_name
            .clone()
            .unwrap_or_else(|| format!("snapshot-{timestamp}"));
        let root = self.options.output_dir.join(&name);
        fs::create_dir_all(&root)?;

        let manifest_target = root.join("MANIFEST.json");
        let bytes_copied = fs::copy(manifest.path(), &manifest_target)?;

        let mut copied_bytes = bytes_copied as u64;
        let mut segments = Vec::new();

        if self.options.include_segments {
            let segment_dir = root.join("segments");
            fs::create_dir_all(&segment_dir)?;

            for tier in SegmentTier::all() {
                for segment in manifest.segments_in(&tier) {
                    let file_name =
                        file_name_for_segment(&segment.path).unwrap_or_else(|| segment.id.clone());
                    let snapshot_path = segment_dir.join(file_name);
                    let bytes = self.copy_segment(&segment.path, &snapshot_path)?;
                    copied_bytes += bytes;
                    segments.push(SnapshotSegment {
                        id: segment.id.clone(),
                        tier,
                        original: segment.path.clone(),
                        snapshot: snapshot_path,
                        bytes,
                    });
                }
            }
        }

        Ok(SnapshotHandle {
            root,
            manifest_path: manifest_target,
            segments,
            bytes_copied: copied_bytes,
            created_at_ms: timestamp,
        })
    }

    fn copy_segment(&self, src: &Path, dst: &Path) -> Result<u64, SnapshotError> {
        if let Some(parent) = dst.parent() {
            fs::create_dir_all(parent)?;
        }

        if self.options.hard_link_segments {
            match fs::hard_link(src, dst) {
                Ok(()) => {
                    let meta = fs::metadata(src)?;
                    return Ok(meta.len());
                }
                Err(err) => {
                    if err.raw_os_error() != Some(EXDEV) {
                        return Err(SnapshotError::Io(err));
                    }
                    // Fall back to copying if the filesystem does not allow links.
                }
            }
        }

        let copied = fs::copy(src, dst)?;
        Ok(copied as u64)
    }
}

/// Returns the final component of `path` as an owned string, or `None` when
/// the path has no file name (for example, a root path or one ending in `..`).
///
/// Bytes that are not valid UTF-8 are replaced lossily.
fn file_name_for_segment(path: &Path) -> Option<String> {
    let last_component = path.file_name()?;
    Some(last_component.to_string_lossy().into_owned())
}