use std::path::Path;
use tracing::{info, warn};
use crate::data::snapshot::{CoreSnapshot, KvPair};
use crate::storage::snapshot_writer::{load_core_snapshot, load_manifest};
use crate::types::Lsn;
/// Summary of a completed [`execute_restore`] run.
#[derive(Debug, Clone)]
pub struct RestoreResult {
    /// Id of the restored snapshot, taken from the snapshot manifest.
    pub snapshot_id: u64,
    /// End LSN recorded in the snapshot manifest (`manifest.meta.end_lsn`).
    pub restored_lsn: Lsn,
    /// Number of per-core snapshots that were restored (`manifest.num_cores`).
    pub cores_restored: usize,
    /// Total number of sparse documents written across all cores.
    pub documents_restored: u64,
    /// Sum of the per-core counts reported by vector checkpoint restoration.
    /// NOTE(review): that count is the number of HNSW index checkpoints written,
    /// not the number of individual vectors.
    pub vectors_restored: u64,
    /// Number of supplied WAL records whose LSN is newer than the snapshot.
    /// `execute_restore` does not apply these records itself; it only counts
    /// them and writes a restore marker when the count is non-zero.
    pub wal_records_replayed: u64,
}
/// Restores node state from the base snapshot in `snap_dir` into `data_dir`.
///
/// For every core listed in the snapshot manifest, the core snapshot is loaded and its
/// sparse documents/indexes, graph edges, HNSW checkpoints and CRDT checkpoints are
/// written under `data_dir`.
///
/// WAL records are **not** applied here. If any record in `wal_records` has an LSN
/// greater than the snapshot's end LSN, a restore marker containing the snapshot LSN is
/// written (consumed later by [`read_restore_marker`]) so that replay resumes after the
/// snapshot.
///
/// # Errors
/// Returns an error if the manifest or any core snapshot cannot be loaded, if a storage
/// engine fails to open or accept a write, or if a checkpoint or marker file cannot be
/// written.
pub fn execute_restore(
    data_dir: &Path,
    snap_dir: &Path,
    wal_records: &[nodedb_wal::WalRecord],
) -> crate::Result<RestoreResult> {
    let manifest = load_manifest(snap_dir)?;
    let snapshot_lsn = manifest.meta.end_lsn.as_u64();
    info!(
        snapshot_id = manifest.meta.snapshot_id,
        snapshot_lsn,
        cores = manifest.num_cores,
        "starting snapshot restore"
    );

    // NOTE(review): as far as this file shows, engines are opened on whatever already
    // exists in `data_dir` and snapshot pairs are written on top, so keys that are absent
    // from the snapshot are not removed. Confirm callers restore into an empty directory.
    let mut total_docs = 0u64;
    let mut total_vectors = 0u64;
    for core_id in 0..manifest.num_cores {
        let core_snap = load_core_snapshot(snap_dir, core_id)?;
        let (docs, vectors) = restore_core_state(data_dir, core_id, &core_snap)?;
        total_docs += docs;
        total_vectors += vectors;
        info!(
            core_id,
            documents = docs,
            vectors,
            watermark = core_snap.watermark,
            "core state restored"
        );
    }

    // Records newer than the snapshot are only counted here. The marker tells the
    // startup path which LSN WAL replay should start from. No intermediate Vec is
    // needed just to learn how many there are.
    let wal_count = wal_records
        .iter()
        .filter(|r| r.header.lsn > snapshot_lsn)
        .count() as u64;
    if wal_count > 0 {
        info!(
            records = wal_count,
            from_lsn = snapshot_lsn + 1,
            "replaying WAL records after snapshot"
        );
        write_restore_marker(data_dir, snapshot_lsn)?;
    }

    let result = RestoreResult {
        snapshot_id: manifest.meta.snapshot_id,
        restored_lsn: manifest.meta.end_lsn,
        cores_restored: manifest.num_cores,
        documents_restored: total_docs,
        vectors_restored: total_vectors,
        wal_records_replayed: wal_count,
    };
    info!(
        snapshot_id = result.snapshot_id,
        restored_lsn = result.restored_lsn.as_u64(),
        documents = result.documents_restored,
        vectors = result.vectors_restored,
        wal_replayed = result.wal_records_replayed,
        "snapshot restore complete"
    );
    Ok(result)
}
fn restore_core_state(
data_dir: &Path,
core_id: usize,
snap: &CoreSnapshot,
) -> crate::Result<(u64, u64)> {
let sparse_path = data_dir.join(format!("sparse/core-{core_id}.redb"));
if let Some(parent) = sparse_path.parent() {
std::fs::create_dir_all(parent).map_err(crate::Error::Io)?;
}
let sparse = crate::engine::sparse::btree::SparseEngine::open(&sparse_path)?;
restore_sparse_data(&sparse, &snap.sparse_documents, &snap.sparse_indexes)?;
let graph_path = data_dir.join(format!("graph/core-{core_id}.redb"));
if let Some(parent) = graph_path.parent() {
std::fs::create_dir_all(parent).map_err(crate::Error::Io)?;
}
let edge_store = crate::engine::graph::edge_store::store::EdgeStore::open(&graph_path)?;
restore_edge_data(&edge_store, &snap.edges, &snap.reverse_edges)?;
let vectors = restore_vector_checkpoints(data_dir, core_id, &snap.hnsw_indexes)?;
restore_crdt_checkpoints(data_dir, core_id, &snap.crdt_snapshots)?;
Ok((snap.sparse_documents.len() as u64, vectors))
}
fn restore_sparse_data(
sparse: &crate::engine::sparse::btree::SparseEngine,
documents: &[KvPair],
indexes: &[KvPair],
) -> crate::Result<()> {
for kv in documents {
sparse.put_raw(&kv.key, &kv.value)?;
}
for kv in indexes {
sparse.put_raw(&kv.key, &kv.value)?;
}
if !documents.is_empty() {
info!(
documents = documents.len(),
indexes = indexes.len(),
"sparse engine data restored"
);
}
Ok(())
}
fn restore_edge_data(
edge_store: &crate::engine::graph::edge_store::store::EdgeStore,
edges: &[KvPair],
reverse_edges: &[KvPair],
) -> crate::Result<()> {
for kv in edges {
edge_store.put_raw(&kv.key, &kv.value)?;
}
for kv in reverse_edges {
edge_store.put_raw_reverse(&kv.key, &kv.value)?;
}
if !edges.is_empty() {
info!(
edges = edges.len(),
reverse = reverse_edges.len(),
"edge store data restored"
);
}
Ok(())
}
fn restore_vector_checkpoints(
data_dir: &Path,
_core_id: usize,
hnsw_indexes: &[crate::data::snapshot::HnswSnapshot],
) -> crate::Result<u64> {
if hnsw_indexes.is_empty() {
return Ok(0);
}
let ckpt_dir = data_dir.join("vector-ckpt");
std::fs::create_dir_all(&ckpt_dir).map_err(crate::Error::Io)?;
let mut total_vectors = 0u64;
for idx in hnsw_indexes {
let key = format!("{}:{}:emb", idx.tenant_id, idx.collection);
let ckpt_path = ckpt_dir.join(format!("{key}.ckpt"));
let tmp_path = ckpt_dir.join(format!("{key}.ckpt.tmp"));
std::fs::write(&tmp_path, &idx.checkpoint_bytes).map_err(crate::Error::Io)?;
std::fs::rename(&tmp_path, &ckpt_path).map_err(crate::Error::Io)?;
total_vectors += 1; }
info!(
collections = hnsw_indexes.len(),
"vector checkpoints restored"
);
Ok(total_vectors)
}
fn restore_crdt_checkpoints(
data_dir: &Path,
_core_id: usize,
crdt_snapshots: &[crate::data::snapshot::CrdtSnapshot],
) -> crate::Result<()> {
if crdt_snapshots.is_empty() {
return Ok(());
}
let ckpt_dir = data_dir.join("crdt-ckpt");
std::fs::create_dir_all(&ckpt_dir).map_err(crate::Error::Io)?;
for snap in crdt_snapshots {
let ckpt_path = ckpt_dir.join(format!("tenant-{}.ckpt", snap.tenant_id));
let tmp_path = ckpt_dir.join(format!("tenant-{}.ckpt.tmp", snap.tenant_id));
std::fs::write(&tmp_path, &snap.snapshot_bytes).map_err(crate::Error::Io)?;
std::fs::rename(&tmp_path, &ckpt_path).map_err(crate::Error::Io)?;
}
info!(tenants = crdt_snapshots.len(), "CRDT checkpoints restored");
Ok(())
}
fn write_restore_marker(data_dir: &Path, snapshot_lsn: u64) -> crate::Result<()> {
let marker_path = data_dir.join("restore_from_lsn");
std::fs::write(&marker_path, snapshot_lsn.to_string().as_bytes()).map_err(crate::Error::Io)?;
info!(snapshot_lsn, path = %marker_path.display(), "restore marker written");
Ok(())
}
/// Reads and consumes the restore marker in `data_dir/restore_from_lsn`.
///
/// Outcomes:
/// - The file contains a decimal `u64` (surrounding whitespace allowed): returns
///   `Some(lsn)` and deletes the marker. A failed delete is only logged.
/// - The marker is absent: returns `None`.
/// - The marker cannot be read or does not parse: logs a warning, leaves the file on
///   disk for inspection, and returns `None`.
pub fn read_restore_marker(data_dir: &Path) -> Option<u64> {
    let marker_path = data_dir.join("restore_from_lsn");
    // Read directly rather than checking `exists()` first. This avoids a
    // check-then-use race and separates "absent" from real I/O failures.
    let content = match std::fs::read_to_string(&marker_path) {
        Ok(content) => content,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
        Err(e) => {
            warn!(
                error = %e,
                path = %marker_path.display(),
                "failed to read restore marker; ignoring it"
            );
            return None;
        }
    };
    let lsn = match content.trim().parse::<u64>() {
        Ok(lsn) => lsn,
        Err(e) => {
            warn!(
                error = %e,
                path = %marker_path.display(),
                "restore marker does not contain a valid LSN; ignoring it"
            );
            return None;
        }
    };
    if let Err(e) = std::fs::remove_file(&marker_path) {
        warn!(error = %e, "failed to delete restore marker");
    }
    info!(
        restore_from_lsn = lsn,
        "restore marker found — WAL replay will start from this LSN"
    );
    Some(lsn)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::snapshot::CoreSnapshot;

    /// A written marker is read back exactly once and then consumed.
    #[test]
    fn restore_marker_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        assert!(read_restore_marker(dir.path()).is_none());
        write_restore_marker(dir.path(), 42).unwrap();
        assert_eq!(read_restore_marker(dir.path()), Some(42));
        assert!(read_restore_marker(dir.path()).is_none());
    }

    /// Writing a second marker replaces the first one.
    #[test]
    fn restore_marker_overwrite_keeps_latest() {
        let dir = tempfile::tempdir().unwrap();
        write_restore_marker(dir.path(), 1).unwrap();
        write_restore_marker(dir.path(), 2).unwrap();
        assert_eq!(read_restore_marker(dir.path()), Some(2));
    }

    /// Whitespace around the LSN (e.g. a hand-edited file) is tolerated.
    #[test]
    fn restore_marker_tolerates_surrounding_whitespace() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("restore_from_lsn"), b"  7\n").unwrap();
        assert_eq!(read_restore_marker(dir.path()), Some(7));
    }

    /// A marker that is not a valid LSN yields `None` instead of panicking.
    #[test]
    fn corrupt_restore_marker_is_ignored() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("restore_from_lsn"), b"not-an-lsn").unwrap();
        assert_eq!(read_restore_marker(dir.path()), None);
    }

    /// Full path: build a base snapshot, restore it, and read the documents back.
    #[test]
    fn end_to_end_snapshot_restore() {
        let dir = tempfile::tempdir().unwrap();
        let data_dir = dir.path().join("data");
        std::fs::create_dir_all(&data_dir).unwrap();
        let snap = CoreSnapshot {
            watermark: 50,
            sparse_documents: vec![
                KvPair {
                    key: "1:docs:d1".into(),
                    value: b"hello".to_vec(),
                },
                KvPair {
                    key: "1:docs:d2".into(),
                    value: b"world".to_vec(),
                },
            ],
            sparse_indexes: vec![],
            edges: vec![],
            reverse_edges: vec![],
            hnsw_indexes: vec![],
            crdt_snapshots: vec![],
        };
        let snap_bytes = snap.to_bytes().unwrap();
        let core_snaps = vec![(0, snap_bytes)];
        let (meta, snap_dir) =
            crate::storage::snapshot_writer::create_base_snapshot(&data_dir, core_snaps, "test")
                .unwrap();
        let result = execute_restore(&data_dir, &snap_dir, &[]).unwrap();
        assert_eq!(result.snapshot_id, meta.snapshot_id);
        assert_eq!(result.cores_restored, 1);
        assert_eq!(result.documents_restored, 2);
        assert_eq!(result.wal_records_replayed, 0);
        let sparse_path = data_dir.join("sparse/core-0.redb");
        let sparse = crate::engine::sparse::btree::SparseEngine::open(&sparse_path).unwrap();
        assert!(sparse.get_raw("1:docs:d1").unwrap().is_some());
        assert!(sparse.get_raw("1:docs:d2").unwrap().is_some());
    }
}