use crate::persistence::point_in_time::{CheckpointRef, PointInTimeRestore};
use crate::vector_store::VectorStore;
use crate::wal::WalEntry;
use crate::Vector;
use anyhow::{anyhow, Result};
use std::path::Path;
/// Summary of a completed [`restore_to_timestamp`] call.
#[derive(Clone, Debug)]
pub struct RestoreReport {
    /// Number of top-level entries returned by WAL replay and applied to the
    /// store. A `Batch` entry counts once, regardless of how many entries it wraps.
    pub entries_replayed: usize,
    /// Checkpoint reported by `find_base_checkpoint`, or `None` if it found none.
    pub base_checkpoint: Option<CheckpointRef>,
    /// Target timestamp exactly as passed to `restore_to_timestamp`.
    pub target_timestamp_secs: u64,
}
/// Replays the WAL in `wal_dir` onto `store` up to `target_timestamp_secs`.
///
/// The function works in three steps:
/// 1. Ask `PointInTimeRestore` for a base checkpoint.
/// 2. Fetch the WAL entries it yields for that base.
/// 3. Feed each entry, in order, to [`apply_wal_entry`].
///
/// The base checkpoint is only passed to the replay step and echoed in the
/// report. This function does not load checkpoint contents into `store`.
/// NOTE(review): confirm whether callers must load that state first when
/// `base_checkpoint` is `Some`.
///
/// # Errors
/// Propagates failures from checkpoint discovery, WAL replay, or applying an
/// entry. Entries are applied directly to `store`. If one fails, every entry
/// before it has already been applied and is not rolled back.
pub fn restore_to_timestamp(
    store: &mut VectorStore,
    target_timestamp_secs: u64,
    wal_dir: &Path,
) -> Result<RestoreReport> {
    let restorer = PointInTimeRestore::new(target_timestamp_secs, wal_dir.to_path_buf());
    let base_checkpoint = restorer.find_base_checkpoint()?;
    let replayed = restorer.replay_wal_to_timestamp(base_checkpoint.as_ref())?;

    for wal_entry in &replayed {
        apply_wal_entry(store, wal_entry)?;
    }

    let entries_replayed = replayed.len();
    Ok(RestoreReport {
        entries_replayed,
        base_checkpoint,
        target_timestamp_secs,
    })
}
/// Applies a single WAL entry to `store`.
///
/// * `Insert` and `Update` both (re)index the entry's vector under its id via
///   `VectorStore::index_vector`.
/// * `Delete` removes the id via `VectorStore::remove_vector`.
/// * `Batch` applies its inner entries in order, recursing into nested batches.
///   It stops at the first failure, and inner entries applied before that
///   failure stay applied.
/// * `Checkpoint` and the transaction markers are no-ops here.
///
/// NOTE(review): transaction markers are ignored. Writes inside an aborted
/// transaction would therefore be applied unless the WAL replay step already
/// filters them out. Confirm against `PointInTimeRestore`.
///
/// # Errors
/// If the store rejects an insert, update, or delete, the returned error
/// message names the operation and the id and includes the store's error text.
pub fn apply_wal_entry(store: &mut VectorStore, entry: &WalEntry) -> Result<()> {
    match entry {
        WalEntry::Insert { id, vector, .. } => {
            let restored = Vector::new(vector.clone());
            if let Err(err) = store.index_vector(id.clone(), restored) {
                return Err(anyhow!("PIT restore: insert '{}' failed: {}", id, err));
            }
        }
        WalEntry::Update { id, vector, .. } => {
            let restored = Vector::new(vector.clone());
            if let Err(err) = store.index_vector(id.clone(), restored) {
                return Err(anyhow!("PIT restore: update '{}' failed: {}", id, err));
            }
        }
        WalEntry::Delete { id, .. } => {
            if let Err(err) = store.remove_vector(id) {
                return Err(anyhow!("PIT restore: delete '{}' failed: {}", id, err));
            }
        }
        WalEntry::Batch { entries, .. } => {
            entries
                .iter()
                .try_for_each(|inner| apply_wal_entry(store, inner))?;
        }
        // Marker entries carry no vector data to apply.
        WalEntry::Checkpoint { .. }
        | WalEntry::BeginTransaction { .. }
        | WalEntry::CommitTransaction { .. }
        | WalEntry::AbortTransaction { .. } => {}
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vector_store::VectorStore;
    use crate::wal::{WalConfig, WalEntry, WalManager};
    use tempfile::TempDir;

    /// Creates a WAL in `dir`, appends `entries` in order, and flushes it.
    ///
    /// `sync_on_write` is enabled and `checkpoint_interval` is `u64::MAX`.
    /// Presumably the large interval keeps automatic checkpoints out of the
    /// test WAL; confirm against `WalConfig`. The manager is dropped on return.
    fn populate_wal(dir: &std::path::Path, entries: &[WalEntry]) -> Result<()> {
        let config = WalConfig {
            wal_directory: dir.to_path_buf(),
            checkpoint_interval: u64::MAX,
            sync_on_write: true,
            ..WalConfig::default()
        };
        let mgr = WalManager::new(config)?;
        for e in entries {
            mgr.append(e.clone())?;
        }
        mgr.flush()
    }

    #[test]
    fn test_restore_basic_inserts() -> Result<()> {
        let tmp = TempDir::new()?;
        let entries = vec![
            WalEntry::Insert {
                id: "http://ex.org/a".into(),
                vector: vec![1.0, 0.0],
                metadata: None,
                timestamp: 100,
            },
            WalEntry::Insert {
                id: "http://ex.org/b".into(),
                vector: vec![0.0, 1.0],
                metadata: None,
                timestamp: 200,
            },
        ];
        populate_wal(tmp.path(), &entries)?;
        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 300, tmp.path())?;
        assert_eq!(report.entries_replayed, 2);
        assert_eq!(report.target_timestamp_secs, 300);
        Ok(())
    }

    #[test]
    fn test_restore_excludes_entries_after_target() -> Result<()> {
        let tmp = TempDir::new()?;
        let entries = vec![
            WalEntry::Insert {
                id: "http://ex.org/early".into(),
                vector: vec![1.0],
                metadata: None,
                timestamp: 100,
            },
            WalEntry::Insert {
                id: "http://ex.org/late".into(),
                vector: vec![2.0],
                metadata: None,
                timestamp: 900,
            },
        ];
        populate_wal(tmp.path(), &entries)?;
        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 500, tmp.path())?;
        assert_eq!(report.entries_replayed, 1);
        Ok(())
    }

    #[test]
    fn test_restore_no_wal_entries() -> Result<()> {
        let tmp = TempDir::new()?;
        // An initialised but empty WAL: the manager is created, flushed, and
        // dropped without any appends.
        populate_wal(tmp.path(), &[])?;
        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 9999, tmp.path())?;
        assert_eq!(report.entries_replayed, 0);
        assert!(report.base_checkpoint.is_none());
        Ok(())
    }

    /// `entries_replayed` counts top-level WAL entries. A `Batch` wrapping two
    /// inserts therefore contributes exactly one to the count.
    #[test]
    fn test_restore_batch_counts_as_single_entry() -> Result<()> {
        let tmp = TempDir::new()?;
        let batch = WalEntry::Batch {
            entries: vec![
                WalEntry::Insert {
                    id: "http://ex.org/x".into(),
                    vector: vec![1.0],
                    metadata: None,
                    timestamp: 50,
                },
                WalEntry::Insert {
                    id: "http://ex.org/y".into(),
                    vector: vec![2.0],
                    metadata: None,
                    timestamp: 50,
                },
            ],
            timestamp: 50,
        };
        populate_wal(tmp.path(), &[batch])?;
        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 200, tmp.path())?;
        assert_eq!(report.entries_replayed, 1);
        Ok(())
    }
}