1use crate::persistence::point_in_time::{CheckpointRef, PointInTimeRestore};
12use crate::vector_store::VectorStore;
13use crate::wal::WalEntry;
14use crate::Vector;
15use anyhow::{anyhow, Result};
16use std::path::Path;
17
18#[derive(Debug, Clone)]
24pub struct RestoreReport {
25 pub entries_replayed: usize,
27 pub base_checkpoint: Option<CheckpointRef>,
29 pub target_timestamp_secs: u64,
31}
32
33pub fn restore_to_timestamp(
49 store: &mut VectorStore,
50 target_timestamp_secs: u64,
51 wal_dir: &Path,
52) -> Result<RestoreReport> {
53 let pit = PointInTimeRestore::new(target_timestamp_secs, wal_dir.to_owned());
54
55 let base = pit.find_base_checkpoint()?;
56 let entries = pit.replay_wal_to_timestamp(base.as_ref())?;
57
58 let count = entries.len();
59 for entry in &entries {
60 apply_wal_entry(store, entry)?;
61 }
62
63 Ok(RestoreReport {
64 entries_replayed: count,
65 base_checkpoint: base,
66 target_timestamp_secs,
67 })
68}
69
70pub fn apply_wal_entry(store: &mut VectorStore, entry: &WalEntry) -> Result<()> {
79 match entry {
80 WalEntry::Insert { id, vector, .. } => {
81 store
82 .index_vector(id.clone(), Vector::new(vector.clone()))
83 .map_err(|e| anyhow!("PIT restore: insert '{}' failed: {}", id, e))?;
84 }
85 WalEntry::Update { id, vector, .. } => {
86 store
88 .index_vector(id.clone(), Vector::new(vector.clone()))
89 .map_err(|e| anyhow!("PIT restore: update '{}' failed: {}", id, e))?;
90 }
91 WalEntry::Delete { id, .. } => {
92 store
95 .remove_vector(id)
96 .map_err(|e| anyhow!("PIT restore: delete '{}' failed: {}", id, e))?;
97 }
98 WalEntry::Batch { entries, .. } => {
99 for inner in entries {
100 apply_wal_entry(store, inner)?;
101 }
102 }
103 WalEntry::Checkpoint { .. }
105 | WalEntry::BeginTransaction { .. }
106 | WalEntry::CommitTransaction { .. }
107 | WalEntry::AbortTransaction { .. } => {}
108 }
109 Ok(())
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vector_store::VectorStore;
    use crate::wal::{WalConfig, WalEntry, WalManager};
    use tempfile::TempDir;

    /// Creates a WAL under `dir`, appends every entry in `entries` (in order),
    /// and flushes. The manager is dropped when this helper returns.
    fn populate_wal(dir: &std::path::Path, entries: &[WalEntry]) -> Result<()> {
        // `checkpoint_interval: u64::MAX` presumably keeps automatic
        // checkpoints from firing during the test — confirm against WalConfig.
        let mgr = WalManager::new(WalConfig {
            wal_directory: dir.to_path_buf(),
            checkpoint_interval: u64::MAX,
            sync_on_write: true,
            ..WalConfig::default()
        })?;
        for record in entries {
            mgr.append(record.clone())?;
        }
        mgr.flush()
    }

    /// Two inserts, both stamped before the target, are both replayed.
    #[test]
    fn test_restore_basic_inserts() -> Result<()> {
        let wal_root = TempDir::new()?;
        populate_wal(
            wal_root.path(),
            &[
                WalEntry::Insert {
                    id: "http://ex.org/a".into(),
                    vector: vec![1.0, 0.0],
                    metadata: None,
                    timestamp: 100,
                },
                WalEntry::Insert {
                    id: "http://ex.org/b".into(),
                    vector: vec![0.0, 1.0],
                    metadata: None,
                    timestamp: 200,
                },
            ],
        )?;

        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 300, wal_root.path())?;

        assert_eq!(report.entries_replayed, 2);
        assert_eq!(report.target_timestamp_secs, 300);
        Ok(())
    }

    /// An entry stamped after the target must not be replayed.
    #[test]
    fn test_restore_excludes_entries_after_target() -> Result<()> {
        let wal_root = TempDir::new()?;
        populate_wal(
            wal_root.path(),
            &[
                WalEntry::Insert {
                    id: "http://ex.org/early".into(),
                    vector: vec![1.0],
                    metadata: None,
                    timestamp: 100,
                },
                WalEntry::Insert {
                    id: "http://ex.org/late".into(),
                    vector: vec![2.0],
                    metadata: None,
                    timestamp: 900,
                },
            ],
        )?;

        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 500, wal_root.path())?;

        assert_eq!(report.entries_replayed, 1);
        Ok(())
    }

    /// A WAL that was created and flushed without any appended entries yields
    /// an empty replay and no base checkpoint.
    #[test]
    fn test_restore_no_wal_entries() -> Result<()> {
        let wal_root = TempDir::new()?;
        // Empty slice: the helper still creates, flushes, and drops the manager.
        populate_wal(wal_root.path(), &[])?;

        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 9999, wal_root.path())?;

        assert_eq!(report.entries_replayed, 0);
        assert!(report.base_checkpoint.is_none());
        Ok(())
    }

    /// A batch holding two inserts is appended as one WAL record.
    ///
    /// Despite the test's name, the expectation pinned here is that the report
    /// counts the batch once: `entries_replayed` is the number of replayed
    /// top-level entries, not the number of inserts inside the batch.
    #[test]
    fn test_restore_batch_entries_counted_individually() -> Result<()> {
        let wal_root = TempDir::new()?;
        let inner = vec![
            WalEntry::Insert {
                id: "http://ex.org/x".into(),
                vector: vec![1.0],
                metadata: None,
                timestamp: 50,
            },
            WalEntry::Insert {
                id: "http://ex.org/y".into(),
                vector: vec![2.0],
                metadata: None,
                timestamp: 50,
            },
        ];
        populate_wal(
            wal_root.path(),
            &[WalEntry::Batch {
                entries: inner,
                timestamp: 50,
            }],
        )?;

        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 200, wal_root.path())?;

        assert_eq!(report.entries_replayed, 1);
        Ok(())
    }
}