// laminar_storage/per_core_wal/recovery.rs
//! Recovery from per-core WAL segments.
//!
//! Provides functionality to recover state from checkpoint + WAL segments.

use std::path::{Path, PathBuf};

use crate::incremental::{RecoveredState as BaseRecoveredState, RecoveryConfig, RecoveryManager};

use super::entry::{PerCoreWalEntry, WalOperation};
use super::error::PerCoreWalError;
use super::manager::PerCoreWalConfig;
use super::reader::PerCoreWalReader;
/// Recovered state from per-core WAL.
///
/// Produced by [`PerCoreRecoveryManager::recover`]: the checkpoint-derived
/// base state plus the ordered changes replayed from all per-core segments.
#[derive(Debug)]
pub struct PerCoreRecoveredState {
    /// Base recovered state from checkpoint.
    pub base_state: BaseRecoveredState,
    /// State changes from WAL replay, in merged sort order
    /// (epoch, timestamp, core, sequence).
    pub state_changes: Vec<StateChange>,
    /// Per-core WAL byte positions after replay, indexed by core ID.
    pub wal_positions: Vec<u64>,
    /// Number of WAL entries replayed (counts every entry read,
    /// including operations that produce no state change).
    pub entries_replayed: usize,
    /// Final epoch after recovery: the maximum of the checkpoint epoch
    /// and every replayed entry's epoch.
    pub final_epoch: u64,
}
28
/// A single state change from WAL replay.
#[derive(Debug, Clone)]
pub struct StateChange {
    /// The key, as raw bytes.
    pub key: Vec<u8>,
    /// The value; `None` indicates a delete.
    pub value: Option<Vec<u8>>,
    /// Epoch when the change was made.
    pub epoch: u64,
    /// ID of the core that made the change.
    pub core_id: u16,
}
41
/// Recovery manager for per-core WAL segments.
///
/// Combines checkpoint recovery (via [`RecoveryManager`]) with replay of
/// every per-core WAL segment described by the WAL configuration.
pub struct PerCoreRecoveryManager {
    /// WAL configuration (number of cores, segment paths).
    wal_config: PerCoreWalConfig,
    /// Checkpoint recovery configuration.
    recovery_config: RecoveryConfig,
}
49
50impl PerCoreRecoveryManager {
51    /// Creates a new per-core recovery manager.
52    #[must_use]
53    pub fn new(wal_config: PerCoreWalConfig, recovery_config: RecoveryConfig) -> Self {
54        Self {
55            wal_config,
56            recovery_config,
57        }
58    }
59
60    /// Recovers state from checkpoint + all WAL segments.
61    ///
62    /// # Process
63    ///
64    /// 1. Load latest checkpoint using `RecoveryManager`
65    /// 2. Read all WAL segments from checkpoint positions
66    /// 3. Merge and sort entries by (epoch, timestamp)
67    /// 4. Apply merged entries to state
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if recovery fails.
72    pub fn recover(&self) -> Result<PerCoreRecoveredState, PerCoreWalError> {
73        self.recover_from_positions(&[])
74    }
75
76    /// Recovers state using checkpoint-stored per-core WAL positions.
77    ///
78    /// If `positions` matches `num_cores`, replay starts from those positions
79    /// instead of the beginning. On mismatch, falls back to full replay.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if checkpoint recovery or WAL replay fails.
84    pub fn recover_from_positions(
85        &self,
86        positions: &[u64],
87    ) -> Result<PerCoreRecoveredState, PerCoreWalError> {
88        // 1. Recover base state from checkpoint
89        let recovery_manager = RecoveryManager::new(self.recovery_config.clone());
90        let base_state = recovery_manager.recover()?;
91
92        // 2. Determine starting positions for each segment.
93        let starting_positions = if positions.len() == self.wal_config.num_cores {
94            positions.to_vec()
95        } else {
96            if !positions.is_empty() {
97                tracing::warn!(
98                    expected = self.wal_config.num_cores,
99                    got = positions.len(),
100                    "Per-core WAL position count mismatch, replaying from 0"
101                );
102            }
103            vec![0u64; self.wal_config.num_cores]
104        };
105
106        // 3. Read and merge WAL segments
107        let (entries, wal_positions) = self.read_all_segments(&starting_positions)?;
108
109        // 4. Process entries into state changes
110        let mut state_changes = Vec::new();
111        let mut final_epoch = base_state.epoch;
112
113        for entry in &entries {
114            final_epoch = final_epoch.max(entry.epoch);
115
116            match &entry.operation {
117                WalOperation::Put { key, value } => {
118                    state_changes.push(StateChange {
119                        key: key.clone(),
120                        value: Some(value.clone()),
121                        epoch: entry.epoch,
122                        core_id: entry.core_id,
123                    });
124                }
125                WalOperation::Delete { key } => {
126                    state_changes.push(StateChange {
127                        key: key.clone(),
128                        value: None,
129                        epoch: entry.epoch,
130                        core_id: entry.core_id,
131                    });
132                }
133                _ => {}
134            }
135        }
136
137        let entries_replayed = entries.len();
138
139        Ok(PerCoreRecoveredState {
140            base_state,
141            state_changes,
142            wal_positions,
143            entries_replayed,
144            final_epoch,
145        })
146    }
147
148    /// Recovers state from WAL segments only (no checkpoint).
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if reading segments fails.
153    pub fn recover_wal_only(&self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
154        let starting_positions = vec![0u64; self.wal_config.num_cores];
155        let (entries, _) = self.read_all_segments(&starting_positions)?;
156        Ok(entries)
157    }
158
159    /// Reads and merges all WAL segments from given positions.
160    fn read_all_segments(
161        &self,
162        starting_positions: &[u64],
163    ) -> Result<(Vec<PerCoreWalEntry>, Vec<u64>), PerCoreWalError> {
164        let mut all_entries = Vec::new();
165        let mut final_positions = Vec::with_capacity(self.wal_config.num_cores);
166
167        for core_id in 0..self.wal_config.num_cores {
168            let path = self.wal_config.segment_path(core_id);
169
170            if path.exists() {
171                let start_pos = starting_positions.get(core_id).copied().unwrap_or(0);
172
173                let mut reader = PerCoreWalReader::open_from(core_id, &path, start_pos)?;
174                let entries = reader.read_all()?;
175                all_entries.extend(entries);
176                final_positions.push(reader.position());
177            } else {
178                final_positions.push(0);
179            }
180        }
181
182        // Sort by (epoch, timestamp_ns, core_id, sequence)
183        all_entries.sort();
184
185        Ok((all_entries, final_positions))
186    }
187
188    /// Repairs all WAL segments by truncating at torn writes.
189    ///
190    /// # Returns
191    ///
192    /// Returns the valid end positions for each segment.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if repair fails.
197    pub fn repair_all_segments(&self) -> Result<Vec<u64>, PerCoreWalError> {
198        let mut valid_positions = Vec::with_capacity(self.wal_config.num_cores);
199
200        for core_id in 0..self.wal_config.num_cores {
201            let path = self.wal_config.segment_path(core_id);
202
203            if path.exists() {
204                let mut reader = PerCoreWalReader::open(core_id, &path)?;
205                let valid_end = reader.find_valid_end()?;
206
207                // Truncate to valid end if needed
208                if valid_end < reader.file_len() {
209                    use std::fs::OpenOptions;
210                    let file = OpenOptions::new().write(true).open(&path)?;
211                    file.set_len(valid_end)?;
212                }
213
214                valid_positions.push(valid_end);
215            } else {
216                valid_positions.push(0);
217            }
218        }
219
220        Ok(valid_positions)
221    }
222
223    /// Gets statistics about all WAL segments.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if reading segments fails.
228    pub fn segment_stats(&self) -> Result<Vec<SegmentStats>, PerCoreWalError> {
229        let mut stats = Vec::with_capacity(self.wal_config.num_cores);
230
231        for core_id in 0..self.wal_config.num_cores {
232            let path = self.wal_config.segment_path(core_id);
233
234            if path.exists() {
235                let mut reader = PerCoreWalReader::open(core_id, &path)?;
236                let entries = reader.read_all()?;
237
238                let min_epoch = entries.iter().map(|e| e.epoch).min().unwrap_or(0);
239                let max_epoch = entries.iter().map(|e| e.epoch).max().unwrap_or(0);
240
241                stats.push(SegmentStats {
242                    core_id,
243                    path: path.clone(),
244                    file_size: reader.file_len(),
245                    entry_count: entries.len(),
246                    min_epoch,
247                    max_epoch,
248                });
249            } else {
250                stats.push(SegmentStats {
251                    core_id,
252                    path,
253                    file_size: 0,
254                    entry_count: 0,
255                    min_epoch: 0,
256                    max_epoch: 0,
257                });
258            }
259        }
260
261        Ok(stats)
262    }
263}
264
/// Statistics for a single WAL segment.
///
/// A missing segment file yields an entry with `file_size`, `entry_count`,
/// `min_epoch`, and `max_epoch` all zero.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Core ID.
    pub core_id: usize,
    /// Path to the segment file (may not exist on disk).
    pub path: PathBuf,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of entries.
    pub entry_count: usize,
    /// Minimum epoch in segment (0 if the segment is empty or missing).
    pub min_epoch: u64,
    /// Maximum epoch in segment (0 if the segment is empty or missing).
    pub max_epoch: u64,
}
281
// Manual `Debug` impl: only the core count and base directory are shown;
// `recovery_config` is elided (presumably not `Debug` or too noisy —
// TODO confirm), which `finish_non_exhaustive` signals with `..`.
impl std::fmt::Debug for PerCoreRecoveryManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PerCoreRecoveryManager")
            .field("num_cores", &self.wal_config.num_cores)
            .field("base_dir", &self.wal_config.base_dir)
            .finish_non_exhaustive()
    }
}
290
291/// Convenience function to recover from per-core WAL.
292///
293/// # Errors
294///
295/// Returns an error if recovery fails.
296pub fn recover_per_core(
297    wal_dir: &Path,
298    checkpoint_dir: &Path,
299    num_cores: usize,
300) -> Result<PerCoreRecoveredState, PerCoreWalError> {
301    // Use the first core's WAL file as the "main" WAL for recovery config
302    // (the per-core recovery will read all segments separately)
303    let wal_path = wal_dir.join("wal-0.log");
304    let wal_config = PerCoreWalConfig::new(wal_dir, num_cores);
305    let recovery_config = RecoveryConfig::new(checkpoint_dir, &wal_path);
306
307    let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);
308    manager.recover()
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::CheckpointConfig;
    use crate::per_core_wal::{CheckpointCoordinator, PerCoreWalManager};
    use tempfile::TempDir;

    /// Creates a temp dir with empty `wal/` and `checkpoints/` subdirectories.
    /// The returned `TempDir` must stay alive for the paths to remain valid.
    fn setup_recovery_test() -> (TempDir, PathBuf, PathBuf) {
        let temp_dir = TempDir::new().unwrap();
        let wal_dir = temp_dir.path().join("wal");
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        std::fs::create_dir_all(&wal_dir).unwrap();
        std::fs::create_dir_all(&checkpoint_dir).unwrap();
        (temp_dir, wal_dir, checkpoint_dir)
    }

    /// Recovery with a checkpoint but no WAL writes yields zero replayed
    /// entries and no state changes.
    #[test]
    fn test_recover_empty() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Create checkpoint first
        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let wal_manager = PerCoreWalManager::new(wal_config.clone()).unwrap();
        let checkpoint_config = CheckpointConfig::new(&checkpoint_dir).with_wal_path(&wal_dir);
        let mut coordinator = CheckpointCoordinator::new(wal_manager, checkpoint_config).unwrap();
        coordinator.create_checkpoint(1).unwrap();

        // Now recover
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let state = manager.recover().unwrap();
        assert_eq!(state.entries_replayed, 0);
        assert!(state.state_changes.is_empty());
    }

    /// Entries written after the checkpoint must show up in the replayed
    /// state changes.
    #[test]
    fn test_recover_with_data() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Create data and checkpoint; the scope drops the coordinator so all
        // files are closed before recovery runs.
        {
            let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
            let wal_manager = PerCoreWalManager::new(wal_config).unwrap();
            let checkpoint_config = CheckpointConfig::new(&checkpoint_dir).with_wal_path(&wal_dir);
            let mut coordinator =
                CheckpointCoordinator::new(wal_manager, checkpoint_config).unwrap();

            coordinator.wal_manager_mut().set_epoch_all(1);
            coordinator
                .wal_manager_mut()
                .writer(0)
                .append_put(b"key1", b"value1")
                .unwrap();
            coordinator
                .wal_manager_mut()
                .writer(1)
                .append_put(b"key2", b"value2")
                .unwrap();

            coordinator.create_checkpoint(1).unwrap();

            // Write more data after checkpoint
            coordinator.wal_manager_mut().set_epoch_all(2);
            coordinator
                .wal_manager_mut()
                .writer(0)
                .append_put(b"key3", b"value3")
                .unwrap();
            coordinator.wal_manager_mut().sync_all().unwrap();
        }

        // Recover
        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let state = manager.recover().unwrap();

        // Should have key3 in state changes (written after checkpoint)
        assert!(!state.state_changes.is_empty());
        assert!(state.state_changes.iter().any(|c| c.key == b"key3"));
    }

    /// WAL-only recovery merges entries from both cores and returns them
    /// sorted by epoch.
    #[test]
    fn test_recover_wal_only() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Write to WAL directly (no checkpoint involved).
        {
            let config = PerCoreWalConfig::new(&wal_dir, 2);
            let mut manager = PerCoreWalManager::new(config).unwrap();

            manager.set_epoch_all(1);
            manager.writer(0).append_put(b"key1", b"value1").unwrap();
            manager.writer(1).append_put(b"key2", b"value2").unwrap();
            manager.set_epoch_all(2);
            manager.writer(0).append_put(b"key3", b"value3").unwrap();
            manager.sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let entries = manager.recover_wal_only().unwrap();
        assert_eq!(entries.len(), 3);

        // Check ordering: epoch 1 first, then epoch 2
        assert_eq!(entries[0].epoch, 1);
        assert_eq!(entries[1].epoch, 1);
        assert_eq!(entries[2].epoch, 2);
    }

    /// Repair truncates a segment with a torn (garbage) tail back to its
    /// last valid entry boundary.
    #[test]
    fn test_repair_segments() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Write valid data
        {
            let config = PerCoreWalConfig::new(&wal_dir, 2);
            let mut manager = PerCoreWalManager::new(config).unwrap();
            manager.writer(0).append_put(b"key1", b"value1").unwrap();
            manager.sync_all().unwrap();
        }

        // Append garbage to simulate torn write
        {
            use std::io::Write;
            let path = wal_dir.join("wal-0.log");
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            file.sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config.clone(), recovery_config);

        let valid_positions = manager.repair_all_segments().unwrap();

        // Segment 0 should be truncated
        let path = wal_config.segment_path(0);
        let file_size = std::fs::metadata(&path).unwrap().len();
        assert_eq!(file_size, valid_positions[0]);
    }

    /// Stats report per-segment entry counts and epoch bounds.
    #[test]
    fn test_segment_stats() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Write data: 3 entries to core 0 (epochs 1,1,2), 1 entry to core 1.
        {
            let config = PerCoreWalConfig::new(&wal_dir, 2);
            let mut manager = PerCoreWalManager::new(config).unwrap();
            manager.set_epoch_all(1);
            manager.writer(0).append_put(b"key1", b"value1").unwrap();
            manager.writer(0).append_put(b"key2", b"value2").unwrap();
            manager.set_epoch_all(2);
            manager.writer(0).append_put(b"key3", b"value3").unwrap();
            manager.writer(1).append_put(b"key4", b"value4").unwrap();
            manager.sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let stats = manager.segment_stats().unwrap();

        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].entry_count, 3); // 3 entries on core 0
        assert_eq!(stats[1].entry_count, 1); // 1 entry on core 1
        assert_eq!(stats[0].min_epoch, 1);
        assert_eq!(stats[0].max_epoch, 2);
    }

    /// The free-function wrapper performs the same recovery as constructing
    /// a `PerCoreRecoveryManager` manually.
    #[test]
    fn test_convenience_function() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Create checkpoint
        {
            let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
            let wal_manager = PerCoreWalManager::new(wal_config).unwrap();
            let checkpoint_config = CheckpointConfig::new(&checkpoint_dir).with_wal_path(&wal_dir);
            let mut coordinator =
                CheckpointCoordinator::new(wal_manager, checkpoint_config).unwrap();
            coordinator.create_checkpoint(1).unwrap();
        }

        let state = recover_per_core(&wal_dir, &checkpoint_dir, 2).unwrap();
        assert!(state.state_changes.is_empty());
    }
}