laminar_storage/per_core_wal/
recovery.rs1use std::path::{Path, PathBuf};
6
7use crate::incremental::{RecoveredState as BaseRecoveredState, RecoveryConfig, RecoveryManager};
8
9use super::entry::{PerCoreWalEntry, WalOperation};
10use super::error::PerCoreWalError;
11use super::manager::PerCoreWalConfig;
12use super::reader::PerCoreWalReader;
13
/// Outcome of a per-core recovery run: the state restored by the base
/// (checkpoint) recovery plus the changes replayed from the per-core WALs.
#[derive(Debug)]
pub struct PerCoreRecoveredState {
    /// State returned by the base `RecoveryManager::recover` call.
    pub base_state: BaseRecoveredState,
    /// `Put`/`Delete` operations replayed from the WAL segments, in merged
    /// replay order. `value == None` marks a delete.
    pub state_changes: Vec<StateChange>,
    /// Reader position of each core's segment after replay, indexed by core id
    /// (0 for cores whose segment file does not exist).
    pub wal_positions: Vec<u64>,
    /// Total number of WAL entries read, including entries that did not
    /// produce a `StateChange`.
    pub entries_replayed: usize,
    /// Largest epoch seen across the base state and every replayed entry.
    pub final_epoch: u64,
}
28
/// A single key/value mutation recovered from a per-core WAL entry.
#[derive(Debug, Clone)]
pub struct StateChange {
    /// Key affected by the operation.
    pub key: Vec<u8>,
    /// New value for `key`; `None` when the WAL entry was a delete.
    pub value: Option<Vec<u8>>,
    /// Epoch recorded on the originating WAL entry.
    pub epoch: u64,
    /// Id of the core whose WAL segment contained the entry.
    pub core_id: u16,
}
41
/// Drives recovery from a base checkpoint followed by replay of every
/// per-core WAL segment.
pub struct PerCoreRecoveryManager {
    /// Per-core WAL layout: number of cores and the path of each segment.
    wal_config: PerCoreWalConfig,
    /// Configuration handed to the base `RecoveryManager` on each recovery.
    recovery_config: RecoveryConfig,
}
49
50impl PerCoreRecoveryManager {
51 #[must_use]
53 pub fn new(wal_config: PerCoreWalConfig, recovery_config: RecoveryConfig) -> Self {
54 Self {
55 wal_config,
56 recovery_config,
57 }
58 }
59
60 pub fn recover(&self) -> Result<PerCoreRecoveredState, PerCoreWalError> {
73 self.recover_from_positions(&[])
74 }
75
76 pub fn recover_from_positions(
85 &self,
86 positions: &[u64],
87 ) -> Result<PerCoreRecoveredState, PerCoreWalError> {
88 let recovery_manager = RecoveryManager::new(self.recovery_config.clone());
90 let base_state = recovery_manager.recover()?;
91
92 let starting_positions = if positions.len() == self.wal_config.num_cores {
94 positions.to_vec()
95 } else {
96 if !positions.is_empty() {
97 tracing::warn!(
98 expected = self.wal_config.num_cores,
99 got = positions.len(),
100 "Per-core WAL position count mismatch, replaying from 0"
101 );
102 }
103 vec![0u64; self.wal_config.num_cores]
104 };
105
106 let (entries, wal_positions) = self.read_all_segments(&starting_positions)?;
108
109 let mut state_changes = Vec::new();
111 let mut final_epoch = base_state.epoch;
112
113 for entry in &entries {
114 final_epoch = final_epoch.max(entry.epoch);
115
116 match &entry.operation {
117 WalOperation::Put { key, value } => {
118 state_changes.push(StateChange {
119 key: key.clone(),
120 value: Some(value.clone()),
121 epoch: entry.epoch,
122 core_id: entry.core_id,
123 });
124 }
125 WalOperation::Delete { key } => {
126 state_changes.push(StateChange {
127 key: key.clone(),
128 value: None,
129 epoch: entry.epoch,
130 core_id: entry.core_id,
131 });
132 }
133 _ => {}
134 }
135 }
136
137 let entries_replayed = entries.len();
138
139 Ok(PerCoreRecoveredState {
140 base_state,
141 state_changes,
142 wal_positions,
143 entries_replayed,
144 final_epoch,
145 })
146 }
147
148 pub fn recover_wal_only(&self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
154 let starting_positions = vec![0u64; self.wal_config.num_cores];
155 let (entries, _) = self.read_all_segments(&starting_positions)?;
156 Ok(entries)
157 }
158
159 fn read_all_segments(
161 &self,
162 starting_positions: &[u64],
163 ) -> Result<(Vec<PerCoreWalEntry>, Vec<u64>), PerCoreWalError> {
164 let mut all_entries = Vec::new();
165 let mut final_positions = Vec::with_capacity(self.wal_config.num_cores);
166
167 for core_id in 0..self.wal_config.num_cores {
168 let path = self.wal_config.segment_path(core_id);
169
170 if path.exists() {
171 let start_pos = starting_positions.get(core_id).copied().unwrap_or(0);
172
173 let mut reader = PerCoreWalReader::open_from(core_id, &path, start_pos)?;
174 let entries = reader.read_all()?;
175 all_entries.extend(entries);
176 final_positions.push(reader.position());
177 } else {
178 final_positions.push(0);
179 }
180 }
181
182 all_entries.sort();
184
185 Ok((all_entries, final_positions))
186 }
187
188 pub fn repair_all_segments(&self) -> Result<Vec<u64>, PerCoreWalError> {
198 let mut valid_positions = Vec::with_capacity(self.wal_config.num_cores);
199
200 for core_id in 0..self.wal_config.num_cores {
201 let path = self.wal_config.segment_path(core_id);
202
203 if path.exists() {
204 let mut reader = PerCoreWalReader::open(core_id, &path)?;
205 let valid_end = reader.find_valid_end()?;
206
207 if valid_end < reader.file_len() {
209 use std::fs::OpenOptions;
210 let file = OpenOptions::new().write(true).open(&path)?;
211 file.set_len(valid_end)?;
212 }
213
214 valid_positions.push(valid_end);
215 } else {
216 valid_positions.push(0);
217 }
218 }
219
220 Ok(valid_positions)
221 }
222
223 pub fn segment_stats(&self) -> Result<Vec<SegmentStats>, PerCoreWalError> {
229 let mut stats = Vec::with_capacity(self.wal_config.num_cores);
230
231 for core_id in 0..self.wal_config.num_cores {
232 let path = self.wal_config.segment_path(core_id);
233
234 if path.exists() {
235 let mut reader = PerCoreWalReader::open(core_id, &path)?;
236 let entries = reader.read_all()?;
237
238 let min_epoch = entries.iter().map(|e| e.epoch).min().unwrap_or(0);
239 let max_epoch = entries.iter().map(|e| e.epoch).max().unwrap_or(0);
240
241 stats.push(SegmentStats {
242 core_id,
243 path: path.clone(),
244 file_size: reader.file_len(),
245 entry_count: entries.len(),
246 min_epoch,
247 max_epoch,
248 });
249 } else {
250 stats.push(SegmentStats {
251 core_id,
252 path,
253 file_size: 0,
254 entry_count: 0,
255 min_epoch: 0,
256 max_epoch: 0,
257 });
258 }
259 }
260
261 Ok(stats)
262 }
263}
264
/// Per-core WAL segment statistics produced by
/// `PerCoreRecoveryManager::segment_stats`.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Core whose segment these statistics describe.
    pub core_id: usize,
    /// Path of the segment file (reported even when the file does not exist).
    pub path: PathBuf,
    /// Segment length as reported by the WAL reader (0 if the file is missing).
    pub file_size: u64,
    /// Number of entries read from the segment.
    pub entry_count: usize,
    /// Smallest entry epoch in the segment (0 if it has no entries).
    pub min_epoch: u64,
    /// Largest entry epoch in the segment (0 if it has no entries).
    pub max_epoch: u64,
}
281
282impl std::fmt::Debug for PerCoreRecoveryManager {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 f.debug_struct("PerCoreRecoveryManager")
285 .field("num_cores", &self.wal_config.num_cores)
286 .field("base_dir", &self.wal_config.base_dir)
287 .finish_non_exhaustive()
288 }
289}
290
291pub fn recover_per_core(
297 wal_dir: &Path,
298 checkpoint_dir: &Path,
299 num_cores: usize,
300) -> Result<PerCoreRecoveredState, PerCoreWalError> {
301 let wal_path = wal_dir.join("wal-0.log");
304 let wal_config = PerCoreWalConfig::new(wal_dir, num_cores);
305 let recovery_config = RecoveryConfig::new(checkpoint_dir, &wal_path);
306
307 let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);
308 manager.recover()
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::CheckpointConfig;
    use crate::per_core_wal::{CheckpointCoordinator, PerCoreWalManager};
    use tempfile::TempDir;

    /// Creates a temporary directory containing empty `wal/` and `checkpoints/`
    /// subdirectories.
    ///
    /// The `TempDir` guard is returned so the caller keeps the directory alive
    /// for the whole test.
    fn setup_recovery_test() -> (TempDir, PathBuf, PathBuf) {
        let temp_dir = TempDir::new().unwrap();
        let wal_dir = temp_dir.path().join("wal");
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        std::fs::create_dir_all(&wal_dir).unwrap();
        std::fs::create_dir_all(&checkpoint_dir).unwrap();
        (temp_dir, wal_dir, checkpoint_dir)
    }

    /// A checkpoint taken before any WAL writes recovers with nothing to replay.
    #[test]
    fn test_recover_empty() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        // Take a checkpoint at epoch 1 without writing any WAL entries.
        let wal_manager = PerCoreWalManager::new(wal_config.clone()).unwrap();
        let checkpoint_config = CheckpointConfig::new(&checkpoint_dir).with_wal_path(&wal_dir);
        let mut coordinator = CheckpointCoordinator::new(wal_manager, checkpoint_config).unwrap();
        coordinator.create_checkpoint(1).unwrap();

        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let state = manager.recover().unwrap();
        assert_eq!(state.entries_replayed, 0);
        assert!(state.state_changes.is_empty());
    }

    /// An entry written after the checkpoint shows up in the recovered state changes.
    #[test]
    fn test_recover_with_data() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Inner scope: the coordinator and its WAL manager are dropped before recovery.
        {
            let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
            let wal_manager = PerCoreWalManager::new(wal_config).unwrap();
            let checkpoint_config = CheckpointConfig::new(&checkpoint_dir).with_wal_path(&wal_dir);
            let mut coordinator =
                CheckpointCoordinator::new(wal_manager, checkpoint_config).unwrap();

            // Epoch 1: one entry per core, then checkpoint.
            coordinator.wal_manager_mut().set_epoch_all(1);
            coordinator
                .wal_manager_mut()
                .writer(0)
                .append_put(b"key1", b"value1")
                .unwrap();
            coordinator
                .wal_manager_mut()
                .writer(1)
                .append_put(b"key2", b"value2")
                .unwrap();

            coordinator.create_checkpoint(1).unwrap();

            // Epoch 2: a post-checkpoint write on core 0, synced to disk.
            coordinator.wal_manager_mut().set_epoch_all(2);
            coordinator
                .wal_manager_mut()
                .writer(0)
                .append_put(b"key3", b"value3")
                .unwrap();
            coordinator.wal_manager_mut().sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let state = manager.recover().unwrap();

        assert!(!state.state_changes.is_empty());
        // The post-checkpoint write must have been replayed.
        assert!(state.state_changes.iter().any(|c| c.key == b"key3"));
    }

    /// WAL-only recovery returns every entry from every core, sorted by epoch.
    #[test]
    fn test_recover_wal_only() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Write two epoch-1 entries (one per core) and one epoch-2 entry on core 0.
        {
            let config = PerCoreWalConfig::new(&wal_dir, 2);
            let mut manager = PerCoreWalManager::new(config).unwrap();

            manager.set_epoch_all(1);
            manager.writer(0).append_put(b"key1", b"value1").unwrap();
            manager.writer(1).append_put(b"key2", b"value2").unwrap();
            manager.set_epoch_all(2);
            manager.writer(0).append_put(b"key3", b"value3").unwrap();
            manager.sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let entries = manager.recover_wal_only().unwrap();
        assert_eq!(entries.len(), 3);

        // Merged output is ordered: both epoch-1 entries precede the epoch-2 entry.
        assert_eq!(entries[0].epoch, 1);
        assert_eq!(entries[1].epoch, 1);
        assert_eq!(entries[2].epoch, 2);
    }

    /// Trailing garbage is truncated so the file ends at the reported valid position.
    #[test]
    fn test_repair_segments() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Write one valid entry to core 0.
        {
            let config = PerCoreWalConfig::new(&wal_dir, 2);
            let mut manager = PerCoreWalManager::new(config).unwrap();
            manager.writer(0).append_put(b"key1", b"value1").unwrap();
            manager.sync_all().unwrap();
        }

        // Append three garbage bytes to core 0's segment to simulate a torn write.
        {
            use std::io::Write;
            let path = wal_dir.join("wal-0.log");
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            file.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            file.sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config.clone(), recovery_config);

        let valid_positions = manager.repair_all_segments().unwrap();

        let path = wal_config.segment_path(0);
        // After repair, the file length equals the valid end reported for core 0.
        let file_size = std::fs::metadata(&path).unwrap().len();
        assert_eq!(file_size, valid_positions[0]);
    }

    /// Segment stats report per-core entry counts and epoch ranges.
    #[test]
    fn test_segment_stats() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Core 0: key1, key2 (epoch 1) and key3 (epoch 2). Core 1: key4 (epoch 2).
        {
            let config = PerCoreWalConfig::new(&wal_dir, 2);
            let mut manager = PerCoreWalManager::new(config).unwrap();
            manager.set_epoch_all(1);
            manager.writer(0).append_put(b"key1", b"value1").unwrap();
            manager.writer(0).append_put(b"key2", b"value2").unwrap();
            manager.set_epoch_all(2);
            manager.writer(0).append_put(b"key3", b"value3").unwrap();
            manager.writer(1).append_put(b"key4", b"value4").unwrap();
            manager.sync_all().unwrap();
        }

        let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_dir.join("wal-0.log"));
        let manager = PerCoreRecoveryManager::new(wal_config, recovery_config);

        let stats = manager.segment_stats().unwrap();

        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].entry_count, 3);
        assert_eq!(stats[1].entry_count, 1);
        assert_eq!(stats[0].min_epoch, 1);
        assert_eq!(stats[0].max_epoch, 2);
    }

    /// `recover_per_core` succeeds on a checkpoint-only directory with nothing to replay.
    #[test]
    fn test_convenience_function() {
        let (_temp_dir, wal_dir, checkpoint_dir) = setup_recovery_test();

        // Take a checkpoint without any WAL writes; the coordinator is dropped at scope end.
        {
            let wal_config = PerCoreWalConfig::new(&wal_dir, 2);
            let wal_manager = PerCoreWalManager::new(wal_config).unwrap();
            let checkpoint_config = CheckpointConfig::new(&checkpoint_dir).with_wal_path(&wal_dir);
            let mut coordinator =
                CheckpointCoordinator::new(wal_manager, checkpoint_config).unwrap();
            coordinator.create_checkpoint(1).unwrap();
        }

        let state = recover_per_core(&wal_dir, &checkpoint_dir, 2).unwrap();
        assert!(state.state_changes.is_empty());
    }
}