Skip to main content

oxirs_vec/persistence/
restore.rs

1//! Point-in-time restore for a [`VectorStore`].
2//!
3//! Provides a high-level function that applies WAL replay results to a
4//! [`VectorStore`], effectively rolling the store back (or forward) to any
5//! historical timestamp recorded in the WAL.
6//!
7//! The implementation purposely avoids rebuilding the index from scratch:
8//! only the delta entries between the nearest checkpoint and the target
9//! timestamp are replayed into the existing (or freshly created) store.
10
11use crate::persistence::point_in_time::{CheckpointRef, PointInTimeRestore};
12use crate::vector_store::VectorStore;
13use crate::wal::WalEntry;
14use crate::Vector;
15use anyhow::{anyhow, Result};
16use std::path::Path;
17
18// ─────────────────────────────────────────────────────────────────────────────
19// RestoreReport
20// ─────────────────────────────────────────────────────────────────────────────
21
22/// Summary produced by [`restore_to_timestamp`].
23#[derive(Debug, Clone)]
24pub struct RestoreReport {
25    /// How many WAL data entries were applied to the store.
26    pub entries_replayed: usize,
27    /// The checkpoint that was used as the recovery base, if any.
28    pub base_checkpoint: Option<CheckpointRef>,
29    /// Target Unix-epoch timestamp (seconds) that was requested.
30    pub target_timestamp_secs: u64,
31}
32
33// ─────────────────────────────────────────────────────────────────────────────
34// Public API
35// ─────────────────────────────────────────────────────────────────────────────
36
37/// Restore a [`VectorStore`] to its state at `target_timestamp_secs` (Unix
38/// epoch, seconds).
39///
40/// The function:
41/// 1. Scans `wal_dir` to find the latest checkpoint ≤ `target_timestamp_secs`.
42/// 2. Replays all data entries between that checkpoint and `target_timestamp_secs`
43///    into `store` (insert / update / delete / batch).
44/// 3. Returns a [`RestoreReport`] with replay statistics.
45///
46/// **Note**: this function does not clear the store before replaying.  Callers
47/// that want a clean restore should pass a fresh `VectorStore::new()`.
48pub fn restore_to_timestamp(
49    store: &mut VectorStore,
50    target_timestamp_secs: u64,
51    wal_dir: &Path,
52) -> Result<RestoreReport> {
53    let pit = PointInTimeRestore::new(target_timestamp_secs, wal_dir.to_owned());
54
55    let base = pit.find_base_checkpoint()?;
56    let entries = pit.replay_wal_to_timestamp(base.as_ref())?;
57
58    let count = entries.len();
59    for entry in &entries {
60        apply_wal_entry(store, entry)?;
61    }
62
63    Ok(RestoreReport {
64        entries_replayed: count,
65        base_checkpoint: base,
66        target_timestamp_secs,
67    })
68}
69
70// ─────────────────────────────────────────────────────────────────────────────
71// apply_wal_entry — maps a WalEntry variant onto VectorStore operations
72// ─────────────────────────────────────────────────────────────────────────────
73
74/// Apply a single [`WalEntry`] to a [`VectorStore`].
75///
76/// Structural markers (Checkpoint, Begin/Commit/AbortTransaction) are silently
77/// ignored — only data-bearing variants mutate the store.
78pub fn apply_wal_entry(store: &mut VectorStore, entry: &WalEntry) -> Result<()> {
79    match entry {
80        WalEntry::Insert { id, vector, .. } => {
81            store
82                .index_vector(id.clone(), Vector::new(vector.clone()))
83                .map_err(|e| anyhow!("PIT restore: insert '{}' failed: {}", id, e))?;
84        }
85        WalEntry::Update { id, vector, .. } => {
86            // VectorStore has no dedicated update; re-inserting replaces the entry
87            store
88                .index_vector(id.clone(), Vector::new(vector.clone()))
89                .map_err(|e| anyhow!("PIT restore: update '{}' failed: {}", id, e))?;
90        }
91        WalEntry::Delete { id, .. } => {
92            // MemoryVectorIndex's remove_vector is a no-op by default — if the
93            // concrete index supports deletion the trait impl will pick it up.
94            store
95                .remove_vector(id)
96                .map_err(|e| anyhow!("PIT restore: delete '{}' failed: {}", id, e))?;
97        }
98        WalEntry::Batch { entries, .. } => {
99            for inner in entries {
100                apply_wal_entry(store, inner)?;
101            }
102        }
103        // Structural markers — skip
104        WalEntry::Checkpoint { .. }
105        | WalEntry::BeginTransaction { .. }
106        | WalEntry::CommitTransaction { .. }
107        | WalEntry::AbortTransaction { .. } => {}
108    }
109    Ok(())
110}
111
112// ─────────────────────────────────────────────────────────────────────────────
113// Tests
114// ─────────────────────────────────────────────────────────────────────────────
115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vector_store::VectorStore;
    use crate::wal::{WalConfig, WalEntry, WalManager};
    use tempfile::TempDir;

    /// WAL configuration shared by all tests in this module.
    ///
    /// Writes go to `dir` with `sync_on_write` enabled; `checkpoint_interval`
    /// is set to `u64::MAX` (presumably so that no automatic checkpoint is
    /// taken during a test — confirm against `WalManager`).
    fn test_wal_config(dir: &std::path::Path) -> WalConfig {
        WalConfig {
            wal_directory: dir.to_path_buf(),
            checkpoint_interval: u64::MAX,
            sync_on_write: true,
            ..WalConfig::default()
        }
    }

    /// Append `entries` to a WAL rooted at `dir`, then flush it.
    fn populate_wal(dir: &std::path::Path, entries: &[WalEntry]) -> Result<()> {
        let mgr = WalManager::new(test_wal_config(dir))?;
        for e in entries {
            mgr.append(e.clone())?;
        }
        mgr.flush()
    }

    /// Two inserts, both at or before the target timestamp, are both replayed.
    #[test]
    fn test_restore_basic_inserts() -> Result<()> {
        let tmp = TempDir::new()?;
        let entries = vec![
            WalEntry::Insert {
                id: "http://ex.org/a".into(),
                vector: vec![1.0, 0.0],
                metadata: None,
                timestamp: 100,
            },
            WalEntry::Insert {
                id: "http://ex.org/b".into(),
                vector: vec![0.0, 1.0],
                metadata: None,
                timestamp: 200,
            },
        ];
        populate_wal(tmp.path(), &entries)?;

        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 300, tmp.path())?;

        assert_eq!(report.entries_replayed, 2);
        assert_eq!(report.target_timestamp_secs, 300);
        Ok(())
    }

    /// Entries stamped later than the target timestamp are left out.
    #[test]
    fn test_restore_excludes_entries_after_target() -> Result<()> {
        let tmp = TempDir::new()?;
        let entries = vec![
            WalEntry::Insert {
                id: "http://ex.org/early".into(),
                vector: vec![1.0],
                metadata: None,
                timestamp: 100,
            },
            WalEntry::Insert {
                id: "http://ex.org/late".into(),
                vector: vec![2.0],
                metadata: None,
                timestamp: 900,
            },
        ];
        populate_wal(tmp.path(), &entries)?;

        let mut store = VectorStore::new();
        // Restore to ts=500 → only the ts=100 entry should be replayed
        let report = restore_to_timestamp(&mut store, 500, tmp.path())?;
        assert_eq!(report.entries_replayed, 1);
        Ok(())
    }

    /// An empty WAL yields an empty report with no base checkpoint.
    #[test]
    fn test_restore_no_wal_entries() -> Result<()> {
        let tmp = TempDir::new()?;
        // Write an empty WAL (just open and close)
        let mgr = WalManager::new(test_wal_config(tmp.path()))?;
        mgr.flush()?;
        drop(mgr);

        let mut store = VectorStore::new();
        let report = restore_to_timestamp(&mut store, 9999, tmp.path())?;
        assert_eq!(report.entries_replayed, 0);
        assert!(report.base_checkpoint.is_none());
        Ok(())
    }

    /// A batch is one WAL entry, so it contributes exactly one to
    /// `entries_replayed` even though it wraps two inserts.
    #[test]
    fn test_restore_batch_counts_as_single_entry() -> Result<()> {
        let tmp = TempDir::new()?;
        let batch = WalEntry::Batch {
            entries: vec![
                WalEntry::Insert {
                    id: "http://ex.org/x".into(),
                    vector: vec![1.0],
                    metadata: None,
                    timestamp: 50,
                },
                WalEntry::Insert {
                    id: "http://ex.org/y".into(),
                    vector: vec![2.0],
                    metadata: None,
                    timestamp: 50,
                },
            ],
            timestamp: 50,
        };
        populate_wal(tmp.path(), &[batch])?;

        let mut store = VectorStore::new();
        // Batch itself is one WAL entry — replay_wal_to_timestamp returns 1 Batch
        let report = restore_to_timestamp(&mut store, 200, tmp.path())?;
        // The single Batch entry is replayed; it internally inserts two vectors
        assert_eq!(report.entries_replayed, 1);
        Ok(())
    }
}