nodedb 0.0.0-beta.1

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
//! Snapshot restore execution: loads a snapshot and replays WAL to a target LSN.
//!
//! ## Restore flow
//!
//! 1. Validate the restore plan via `dry_run_restore()`.
//! 2. For each core, load its `CoreSnapshot` from the snapshot directory.
//! 3. Apply the snapshot to the core's engines (redb, HNSW, CRDT).
//! 4. Schedule WAL replay for records after `snapshot_lsn`: a `restore_from_lsn`
//!    marker is written, and the normal startup path performs the replay.
//!
//! ## Offline restore
//!
//! Restore is an **offline operation** — the server must not be serving traffic.
//! The restore process opens engine storage directly (not via SPSC bridge).
//! After restore, the server is restarted normally and WAL replay covers
//! any records between the snapshot and the crash point.

use std::path::Path;

use tracing::{info, warn};

use crate::data::snapshot::{CoreSnapshot, KvPair};
use crate::storage::snapshot_writer::{load_core_snapshot, load_manifest};
use crate::types::Lsn;

/// Summary of a completed snapshot restore, as returned by `execute_restore`.
#[derive(Debug, Clone)]
pub struct RestoreResult {
    /// Identifier of the snapshot that was restored (taken from the manifest).
    pub snapshot_id: u64,
    /// LSN at which the restore completed (the snapshot's `end_lsn`).
    pub restored_lsn: Lsn,
    /// Number of cores restored (the manifest's `num_cores`).
    pub cores_restored: usize,
    /// Total sparse documents restored, summed across all cores.
    pub documents_restored: u64,
    /// Vector restore count, summed across all cores. The restore currently
    /// counts one per HNSW index checkpoint written, not individual vectors.
    pub vectors_restored: u64,
    /// Number of supplied WAL records with an LSN past the snapshot LSN.
    /// These are left for the startup path to replay; the restore itself
    /// only writes a marker for them.
    pub wal_records_replayed: u64,
}

/// Execute a snapshot restore.
///
/// Restores engine state for every core listed in the snapshot manifest, then
/// schedules WAL replay for any records newer than the snapshot. The replay
/// itself is not performed here: when such records exist, a restore marker is
/// written into `data_dir` so the normal startup path replays from that LSN.
///
/// * `data_dir` is the server's data directory (where engine files live).
/// * `snap_dir` is the snapshot directory to restore from.
/// * `wal_records` are all available WAL records (already loaded by the caller).
///
/// Returns a [`RestoreResult`] summarising what was restored.
///
/// # Errors
///
/// Fails if the manifest or any core snapshot cannot be loaded, if any engine
/// restore step fails, or if the restore marker cannot be written.
///
/// This is an **offline operation** — must not be called while serving traffic.
pub fn execute_restore(
    data_dir: &Path,
    snap_dir: &Path,
    wal_records: &[nodedb_wal::WalRecord],
) -> crate::Result<RestoreResult> {
    let manifest = load_manifest(snap_dir)?;
    let snapshot_lsn = manifest.meta.end_lsn.as_u64();

    info!(
        snapshot_id = manifest.meta.snapshot_id,
        snapshot_lsn,
        cores = manifest.num_cores,
        "starting snapshot restore"
    );

    let mut total_docs = 0u64;
    let mut total_vectors = 0u64;

    // Restore each core's state.
    for core_id in 0..manifest.num_cores {
        let core_snap = load_core_snapshot(snap_dir, core_id)?;
        let (docs, vectors) = restore_core_state(data_dir, core_id, &core_snap)?;
        total_docs += docs;
        total_vectors += vectors;

        info!(
            core_id,
            documents = docs,
            vectors,
            watermark = core_snap.watermark,
            "core state restored"
        );
    }

    // Only the number of post-snapshot records is needed here, so count them
    // directly instead of materialising an intermediate Vec.
    let wal_count = wal_records
        .iter()
        .filter(|r| r.header.lsn > snapshot_lsn)
        .count() as u64;

    if wal_count > 0 {
        info!(
            records = wal_count,
            from_lsn = snapshot_lsn + 1,
            "replaying WAL records after snapshot"
        );
        // WAL replay is handled by the normal startup path (CoreLoop::replay_vector_wal
        // and the per-record replay in the event loop). We write a marker file so the
        // startup code knows to replay from this LSN.
        write_restore_marker(data_dir, snapshot_lsn)?;
    }

    let result = RestoreResult {
        snapshot_id: manifest.meta.snapshot_id,
        restored_lsn: manifest.meta.end_lsn,
        cores_restored: manifest.num_cores,
        documents_restored: total_docs,
        vectors_restored: total_vectors,
        wal_records_replayed: wal_count,
    };

    info!(
        snapshot_id = result.snapshot_id,
        restored_lsn = result.restored_lsn.as_u64(),
        documents = result.documents_restored,
        vectors = result.vectors_restored,
        wal_replayed = result.wal_records_replayed,
        "snapshot restore complete"
    );

    Ok(result)
}

/// Restore a single core's engine state from a `CoreSnapshot`.
///
/// Opens the core's redb databases, clears existing data, and loads
/// the snapshot data. Returns `(documents_count, vectors_count)`.
fn restore_core_state(
    data_dir: &Path,
    core_id: usize,
    snap: &CoreSnapshot,
) -> crate::Result<(u64, u64)> {
    // Restore sparse engine (documents + indexes).
    let sparse_path = data_dir.join(format!("sparse/core-{core_id}.redb"));
    if let Some(parent) = sparse_path.parent() {
        std::fs::create_dir_all(parent).map_err(crate::Error::Io)?;
    }
    let sparse = crate::engine::sparse::btree::SparseEngine::open(&sparse_path)?;
    restore_sparse_data(&sparse, &snap.sparse_documents, &snap.sparse_indexes)?;

    // Restore edge store.
    let graph_path = data_dir.join(format!("graph/core-{core_id}.redb"));
    if let Some(parent) = graph_path.parent() {
        std::fs::create_dir_all(parent).map_err(crate::Error::Io)?;
    }
    let edge_store = crate::engine::graph::edge_store::store::EdgeStore::open(&graph_path)?;
    restore_edge_data(&edge_store, &snap.edges, &snap.reverse_edges)?;

    // Restore vector indexes by writing checkpoint files.
    // The normal startup path (load_vector_checkpoints) will load these.
    let vectors = restore_vector_checkpoints(data_dir, core_id, &snap.hnsw_indexes)?;

    // Restore CRDT snapshots by writing checkpoint files.
    // The normal startup path will import these.
    restore_crdt_checkpoints(data_dir, core_id, &snap.crdt_snapshots)?;

    Ok((snap.sparse_documents.len() as u64, vectors))
}

/// Write snapshot documents and index entries into the sparse engine.
///
/// Documents are written first, then index entries; both use the raw
/// key/value pairs stored in the snapshot (keys already carry the
/// tenant:collection:id prefix). The first failed write aborts the restore.
fn restore_sparse_data(
    sparse: &crate::engine::sparse::btree::SparseEngine,
    documents: &[KvPair],
    indexes: &[KvPair],
) -> crate::Result<()> {
    // Documents and index entries go through the same raw write, in that order.
    for entry in documents.iter().chain(indexes) {
        sparse.put_raw(&entry.key, &entry.value)?;
    }

    // The summary line is only logged when at least one document was restored.
    if documents.is_empty() {
        return Ok(());
    }

    info!(
        documents = documents.len(),
        indexes = indexes.len(),
        "sparse engine data restored"
    );
    Ok(())
}

/// Restore edge store data from snapshot.
fn restore_edge_data(
    edge_store: &crate::engine::graph::edge_store::store::EdgeStore,
    edges: &[KvPair],
    reverse_edges: &[KvPair],
) -> crate::Result<()> {
    for kv in edges {
        edge_store.put_raw(&kv.key, &kv.value)?;
    }
    for kv in reverse_edges {
        edge_store.put_raw_reverse(&kv.key, &kv.value)?;
    }

    if !edges.is_empty() {
        info!(
            edges = edges.len(),
            reverse = reverse_edges.len(),
            "edge store data restored"
        );
    }

    Ok(())
}

/// Write vector checkpoint files from snapshot data.
///
/// Writes `.ckpt` files that `load_vector_checkpoints()` will pick up on startup.
fn restore_vector_checkpoints(
    data_dir: &Path,
    _core_id: usize,
    hnsw_indexes: &[crate::data::snapshot::HnswSnapshot],
) -> crate::Result<u64> {
    if hnsw_indexes.is_empty() {
        return Ok(0);
    }

    let ckpt_dir = data_dir.join("vector-ckpt");
    std::fs::create_dir_all(&ckpt_dir).map_err(crate::Error::Io)?;

    let mut total_vectors = 0u64;
    for idx in hnsw_indexes {
        let key = format!("{}:{}:emb", idx.tenant_id, idx.collection);
        let ckpt_path = ckpt_dir.join(format!("{key}.ckpt"));
        let tmp_path = ckpt_dir.join(format!("{key}.ckpt.tmp"));

        std::fs::write(&tmp_path, &idx.checkpoint_bytes).map_err(crate::Error::Io)?;
        std::fs::rename(&tmp_path, &ckpt_path).map_err(crate::Error::Io)?;

        // Count vectors from checkpoint size (rough estimate: each vector
        // is ~3 KiB at 768-dim FP32, but we don't parse the checkpoint here).
        total_vectors += 1; // At minimum, one collection restored.
    }

    info!(
        collections = hnsw_indexes.len(),
        "vector checkpoints restored"
    );

    Ok(total_vectors)
}

/// Write CRDT checkpoint files from snapshot data.
fn restore_crdt_checkpoints(
    data_dir: &Path,
    _core_id: usize,
    crdt_snapshots: &[crate::data::snapshot::CrdtSnapshot],
) -> crate::Result<()> {
    if crdt_snapshots.is_empty() {
        return Ok(());
    }

    let ckpt_dir = data_dir.join("crdt-ckpt");
    std::fs::create_dir_all(&ckpt_dir).map_err(crate::Error::Io)?;

    for snap in crdt_snapshots {
        let ckpt_path = ckpt_dir.join(format!("tenant-{}.ckpt", snap.tenant_id));
        let tmp_path = ckpt_dir.join(format!("tenant-{}.ckpt.tmp", snap.tenant_id));

        std::fs::write(&tmp_path, &snap.snapshot_bytes).map_err(crate::Error::Io)?;
        std::fs::rename(&tmp_path, &ckpt_path).map_err(crate::Error::Io)?;
    }

    info!(tenants = crdt_snapshots.len(), "CRDT checkpoints restored");

    Ok(())
}

/// Write a restore marker file that tells the startup code to replay
/// WAL records only from the given LSN forward.
fn write_restore_marker(data_dir: &Path, snapshot_lsn: u64) -> crate::Result<()> {
    let marker_path = data_dir.join("restore_from_lsn");
    std::fs::write(&marker_path, snapshot_lsn.to_string().as_bytes()).map_err(crate::Error::Io)?;
    info!(snapshot_lsn, path = %marker_path.display(), "restore marker written");
    Ok(())
}

/// Read and consume the restore marker (if present).
///
/// Called during startup. Returns the LSN to replay from, or `None` if
/// no restore marker exists (normal startup).
///
/// A marker that exists but cannot be read, or whose contents are not a
/// valid `u64`, is treated as absent: a warning is logged, `None` is
/// returned, and the file is left in place for inspection. A successfully
/// parsed marker is deleted, because it is a one-time signal; failure to
/// delete it is logged but does not change the returned value.
pub fn read_restore_marker(data_dir: &Path) -> Option<u64> {
    let marker_path = data_dir.join("restore_from_lsn");

    // Read directly instead of checking `exists()` first: a missing file is
    // the normal-startup case, any other failure deserves a warning.
    let content = match std::fs::read_to_string(&marker_path) {
        Ok(content) => content,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
        Err(e) => {
            warn!(
                error = %e,
                path = %marker_path.display(),
                "failed to read restore marker — ignoring it"
            );
            return None;
        }
    };

    let lsn = match content.trim().parse::<u64>() {
        Ok(lsn) => lsn,
        Err(e) => {
            warn!(
                error = %e,
                path = %marker_path.display(),
                "restore marker does not contain a valid LSN — ignoring it"
            );
            return None;
        }
    };

    // Delete the marker after reading — it's a one-time signal.
    if let Err(e) = std::fs::remove_file(&marker_path) {
        warn!(error = %e, "failed to delete restore marker");
    }

    info!(
        restore_from_lsn = lsn,
        "restore marker found — WAL replay will start from this LSN"
    );
    Some(lsn)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::snapshot::CoreSnapshot;

    /// A written marker is returned exactly once and then disappears.
    #[test]
    fn restore_marker_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Fresh directory: nothing to read.
        assert!(read_restore_marker(root).is_none());

        // Write, then read back (which also consumes the marker).
        write_restore_marker(root, 42).unwrap();
        assert_eq!(read_restore_marker(root), Some(42));

        // A second read finds nothing.
        assert!(read_restore_marker(root).is_none());
    }

    /// Creating a base snapshot and restoring it brings the sparse documents back.
    #[test]
    fn end_to_end_snapshot_restore() {
        let tmp = tempfile::tempdir().unwrap();
        let data_dir = tmp.path().join("data");
        std::fs::create_dir_all(&data_dir).unwrap();

        // Snapshot for a single core holding two sparse documents.
        let document = |key: &str, value: &[u8]| KvPair {
            key: key.into(),
            value: value.to_vec(),
        };
        let core_snapshot = CoreSnapshot {
            watermark: 50,
            sparse_documents: vec![
                document("1:docs:d1", b"hello"),
                document("1:docs:d2", b"world"),
            ],
            sparse_indexes: vec![],
            edges: vec![],
            reverse_edges: vec![],
            hnsw_indexes: vec![],
            crdt_snapshots: vec![],
        };

        let encoded = core_snapshot.to_bytes().unwrap();
        let per_core = vec![(0, encoded)];
        let (meta, snap_dir) =
            crate::storage::snapshot_writer::create_base_snapshot(&data_dir, per_core, "test")
                .unwrap();

        // Restore with an empty WAL: nothing is scheduled for replay.
        let outcome = execute_restore(&data_dir, &snap_dir, &[]).unwrap();
        assert_eq!(outcome.snapshot_id, meta.snapshot_id);
        assert_eq!(outcome.cores_restored, 1);
        assert_eq!(outcome.documents_restored, 2);
        assert_eq!(outcome.wal_records_replayed, 0);

        // Both documents must be readable from the restored sparse engine.
        let restored = crate::engine::sparse::btree::SparseEngine::open(
            &data_dir.join("sparse/core-0.redb"),
        )
        .unwrap();
        for key in ["1:docs:d1", "1:docs:d2"].iter().copied() {
            assert!(restored.get_raw(key).unwrap().is_some());
        }
    }
}