rustlite_wal/
recovery.rs

// WAL recovery module - handles crash recovery logic
//
// Recovery is responsible for:
// 1. Reading all WAL records from disk
// 2. Tracking transaction boundaries (BEGIN/COMMIT)
// 3. Returning only committed records (incomplete transactions are rolled back)
// 4. Handling corrupted or truncated records gracefully
8
9use crate::record::RecordPayload;
10use crate::{WalConfig, WalReader, WalRecord};
11use rustlite_core::{Error, Result};
12use std::collections::{HashMap, HashSet};
13
14/// Manages WAL recovery after crash or restart
15pub struct RecoveryManager {
16    config: WalConfig,
17}
18
19/// Represents a transaction's state during recovery
20#[derive(Debug, Clone)]
21struct TransactionState {
22    /// Records belonging to this transaction
23    records: Vec<WalRecord>,
24    /// Whether the transaction was committed
25    committed: bool,
26}
27
28impl RecoveryManager {
29    /// Create a new recovery manager with the given configuration
30    pub fn new(config: WalConfig) -> Result<Self> {
31        Ok(Self { config })
32    }
33
34    /// Recover records from WAL
35    ///
36    /// This method:
37    /// 1. Reads all records from all WAL segments
38    /// 2. Tracks transaction boundaries
39    /// 3. Only returns records from committed transactions
40    /// 4. For records outside transactions, returns them directly
41    ///
42    /// Returns a vector of recovered records in order
43    pub fn recover(&self) -> Result<Vec<WalRecord>> {
44        let mut reader = WalReader::new(&self.config.wal_dir)?;
45
46        if reader.segment_count() == 0 {
47            return Ok(Vec::new());
48        }
49
50        // Track active transactions
51        let mut transactions: HashMap<u64, TransactionState> = HashMap::new();
52        let mut committed_tx_ids: HashSet<u64> = HashSet::new();
53
54        // Records outside of any transaction
55        let mut standalone_records: Vec<WalRecord> = Vec::new();
56
57        // Current transaction context (for records that don't specify tx_id)
58        let mut current_tx_id: Option<u64> = None;
59
60        // Read all records
61        loop {
62            match reader.next_record() {
63                Ok(Some(record)) => {
64                    match &record.payload {
65                        RecordPayload::BeginTx { tx_id } => {
66                            // Start tracking a new transaction
67                            transactions.insert(
68                                *tx_id,
69                                TransactionState {
70                                    records: Vec::new(),
71                                    committed: false,
72                                },
73                            );
74                            current_tx_id = Some(*tx_id);
75                        }
76                        RecordPayload::CommitTx { tx_id } => {
77                            // Mark transaction as committed
78                            if let Some(tx_state) = transactions.get_mut(tx_id) {
79                                tx_state.committed = true;
80                                committed_tx_ids.insert(*tx_id);
81                            }
82                            // Clear current tx if it matches
83                            if current_tx_id == Some(*tx_id) {
84                                current_tx_id = None;
85                            }
86                        }
87                        RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
88                            // Data records - add to current transaction or standalone
89                            if let Some(tx_id) = current_tx_id {
90                                if let Some(tx_state) = transactions.get_mut(&tx_id) {
91                                    tx_state.records.push(record);
92                                } else {
93                                    // Transaction not found, treat as standalone
94                                    standalone_records.push(record);
95                                }
96                            } else {
97                                // No active transaction
98                                standalone_records.push(record);
99                            }
100                        }
101                        RecordPayload::Checkpoint { .. } => {
102                            // Checkpoint records can be used for optimization
103                            // For now, we just skip them during recovery
104                        }
105                    }
106                }
107                Ok(None) => {
108                    // End of WAL
109                    break;
110                }
111                Err(e) => {
112                    // Handle errors gracefully
113                    // CRC errors or truncation means we stop here
114                    // Records up to this point are still valid
115                    if Self::is_recoverable_error(&e) {
116                        break;
117                    }
118                    return Err(e);
119                }
120            }
121        }
122
123        // Collect results: standalone records + committed transaction records
124        let mut result = standalone_records;
125
126        // Add records from committed transactions in order
127        // Sort by tx_id for deterministic ordering
128        let mut committed_txs: Vec<_> = transactions
129            .into_iter()
130            .filter(|(_, state)| state.committed)
131            .collect();
132        committed_txs.sort_by_key(|(tx_id, _)| *tx_id);
133
134        for (_, tx_state) in committed_txs {
135            result.extend(tx_state.records);
136        }
137
138        Ok(result)
139    }
140
141    /// Recover records with transaction markers included
142    ///
143    /// Unlike `recover()`, this method returns all records including
144    /// BEGIN_TX and COMMIT_TX markers for committed transactions.
145    /// This is useful for replaying the exact WAL state.
146    pub fn recover_with_markers(&self) -> Result<Vec<WalRecord>> {
147        let mut reader = WalReader::new(&self.config.wal_dir)?;
148
149        if reader.segment_count() == 0 {
150            return Ok(Vec::new());
151        }
152
153        // First pass: identify committed transactions
154        let mut committed_tx_ids: HashSet<u64> = HashSet::new();
155        let mut all_records: Vec<WalRecord> = Vec::new();
156
157        loop {
158            match reader.next_record() {
159                Ok(Some(record)) => {
160                    if let RecordPayload::CommitTx { tx_id } = &record.payload {
161                        committed_tx_ids.insert(*tx_id);
162                    }
163                    all_records.push(record);
164                }
165                Ok(None) => break,
166                Err(e) => {
167                    if Self::is_recoverable_error(&e) {
168                        break;
169                    }
170                    return Err(e);
171                }
172            }
173        }
174
175        // Second pass: filter to only include committed transactions and standalone records
176        let mut result: Vec<WalRecord> = Vec::new();
177        let mut current_tx_id: Option<u64> = None;
178        let mut in_committed_tx = false;
179
180        for record in all_records {
181            let payload = &record.payload;
182            let should_include = match payload {
183                RecordPayload::BeginTx { tx_id } => {
184                    in_committed_tx = committed_tx_ids.contains(tx_id);
185                    current_tx_id = Some(*tx_id);
186                    in_committed_tx
187                }
188                RecordPayload::CommitTx { tx_id } => {
189                    let include = committed_tx_ids.contains(tx_id);
190                    if current_tx_id == Some(*tx_id) {
191                        current_tx_id = None;
192                        in_committed_tx = false;
193                    }
194                    include
195                }
196                RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
197                    if current_tx_id.is_some() {
198                        // In a transaction
199                        in_committed_tx
200                    } else {
201                        // Standalone record
202                        true
203                    }
204                }
205                RecordPayload::Checkpoint { .. } => {
206                    // Include checkpoint markers
207                    true
208                }
209            };
210
211            if should_include {
212                result.push(record);
213            }
214        }
215
216        Ok(result)
217    }
218
219    /// Check if an error is recoverable (we can continue without the corrupted data)
220    fn is_recoverable_error(err: &Error) -> bool {
221        match err {
222            Error::Storage(msg) => msg.contains("CRC mismatch"),
223            Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
224            _ => false,
225        }
226    }
227
228    /// Get statistics about the WAL
229    pub fn get_stats(&self) -> Result<RecoveryStats> {
230        let mut reader = WalReader::new(&self.config.wal_dir)?;
231
232        let mut stats = RecoveryStats {
233            segment_count: reader.segment_count(),
234            total_records: 0,
235            put_records: 0,
236            delete_records: 0,
237            transactions_started: 0,
238            transactions_committed: 0,
239            transactions_incomplete: 0,
240            checkpoints: 0,
241        };
242
243        let mut active_transactions: HashSet<u64> = HashSet::new();
244
245        loop {
246            match reader.next_record() {
247                Ok(Some(record)) => {
248                    stats.total_records += 1;
249                    match &record.payload {
250                        RecordPayload::Put { .. } => stats.put_records += 1,
251                        RecordPayload::Delete { .. } => stats.delete_records += 1,
252                        RecordPayload::BeginTx { tx_id } => {
253                            stats.transactions_started += 1;
254                            active_transactions.insert(*tx_id);
255                        }
256                        RecordPayload::CommitTx { tx_id } => {
257                            stats.transactions_committed += 1;
258                            active_transactions.remove(tx_id);
259                        }
260                        RecordPayload::Checkpoint { .. } => stats.checkpoints += 1,
261                    }
262                }
263                Ok(None) => break,
264                Err(_) => break,
265            }
266        }
267
268        stats.transactions_incomplete = active_transactions.len();
269
270        Ok(stats)
271    }
272}
273
/// Statistics about the WAL state, as gathered by `RecoveryManager::get_stats`.
///
/// All counters start at zero (`Default`). The type is comparable so that
/// two snapshots can be checked for equality directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Number of segment files
    pub segment_count: usize,
    /// Total number of records read (all kinds, markers included)
    pub total_records: usize,
    /// Number of PUT records
    pub put_records: usize,
    /// Number of DELETE records
    pub delete_records: usize,
    /// Number of BEGIN_TX markers read
    pub transactions_started: usize,
    /// Number of COMMIT_TX markers read
    pub transactions_committed: usize,
    /// Number of incomplete transactions (started but not committed)
    pub transactions_incomplete: usize,
    /// Number of checkpoint records
    pub checkpoints: usize,
}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{RecordType, SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Create an empty WAL directory inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the
    /// duration of the test.
    fn setup_test_wal() -> (TempDir, WalConfig) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");

        let config = WalConfig {
            wal_dir: wal_path,
            sync_mode: SyncMode::Sync,
            max_segment_size: 64 * 1024 * 1024,
        };

        (temp_dir, config)
    }

    /// Append `records` to the WAL in order, sync, and drop the writer.
    fn write_records(config: &WalConfig, records: Vec<WalRecord>) {
        let mut writer =
            WalWriter::new(&config.wal_dir, config.max_segment_size, config.sync_mode)
                .expect("Failed to create writer");

        for record in records {
            writer.append(record).expect("Failed to append");
        }
        writer.sync().expect("Failed to sync");
    }

    /// Build a recovery manager over the given configuration.
    fn recovery_manager(config: WalConfig) -> RecoveryManager {
        RecoveryManager::new(config).expect("Failed to create recovery manager")
    }

    #[test]
    fn test_recovery_empty_wal() {
        let (_temp_dir, config) = setup_test_wal();

        let records = recovery_manager(config)
            .recover()
            .expect("Failed to recover");

        assert!(records.is_empty());
    }

    #[test]
    fn test_recovery_standalone_records() {
        let (_temp_dir, config) = setup_test_wal();

        // Standalone records (no transaction)
        let puts = (0..5)
            .map(|i| {
                WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect();
        write_records(&config, puts);

        let records = recovery_manager(config)
            .recover()
            .expect("Failed to recover");

        assert_eq!(records.len(), 5);
        assert!(records
            .iter()
            .all(|record| record.record_type == RecordType::Put));
    }

    #[test]
    fn test_recovery_committed_transaction() {
        let (_temp_dir, config) = setup_test_wal();

        // A complete transaction
        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = recovery_manager(config)
            .recover()
            .expect("Failed to recover");

        // Should have 2 PUT records (BEGIN and COMMIT are filtered out by recover())
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].record_type, RecordType::Put);
        assert_eq!(records[1].record_type, RecordType::Put);
    }

    #[test]
    fn test_recovery_incomplete_transaction_rollback() {
        let (_temp_dir, config) = setup_test_wal();

        // An incomplete transaction: no COMMIT, simulating a crash
        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = recovery_manager(config)
            .recover()
            .expect("Failed to recover");

        // Incomplete transaction should be rolled back - no records recovered
        assert!(records.is_empty());
    }

    #[test]
    fn test_recovery_mixed_committed_and_incomplete() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Transaction 1: complete
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
                // Transaction 2: incomplete (no COMMIT)
                WalRecord::begin_tx(2),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = recovery_manager(config)
            .recover()
            .expect("Failed to recover");

        // Only records from the committed transaction
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].record_type, RecordType::Put);
    }

    #[test]
    fn test_recovery_with_markers() {
        let (_temp_dir, config) = setup_test_wal();

        // A complete transaction
        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = recovery_manager(config)
            .recover_with_markers()
            .expect("Failed to recover");

        // Should have all 3 records including markers
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record_type, RecordType::BeginTx);
        assert_eq!(records[1].record_type, RecordType::Put);
        assert_eq!(records[2].record_type, RecordType::CommitTx);
    }

    #[test]
    fn test_recovery_with_markers_drops_incomplete_transaction() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Transaction 1: complete
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
                // Transaction 2: incomplete (no COMMIT)
                WalRecord::begin_tx(2),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = recovery_manager(config)
            .recover_with_markers()
            .expect("Failed to recover");

        // Only the committed transaction survives, markers included
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record_type, RecordType::BeginTx);
        assert_eq!(records[1].record_type, RecordType::Put);
        assert_eq!(records[2].record_type, RecordType::CommitTx);
    }

    #[test]
    fn test_recovery_stats() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Complete transaction
                WalRecord::begin_tx(1),
                WalRecord::put(b"k1".to_vec(), b"v1".to_vec()),
                WalRecord::commit_tx(1),
                // Incomplete transaction
                WalRecord::begin_tx(2),
                WalRecord::delete(b"k2".to_vec()),
            ],
        );

        let stats = recovery_manager(config)
            .get_stats()
            .expect("Failed to get stats");

        assert_eq!(stats.total_records, 5);
        assert_eq!(stats.put_records, 1);
        assert_eq!(stats.delete_records, 1);
        assert_eq!(stats.transactions_started, 2);
        assert_eq!(stats.transactions_committed, 1);
        assert_eq!(stats.transactions_incomplete, 1);
        assert_eq!(stats.checkpoints, 0);
    }
}
528}