rustlite_wal/
recovery.rs

1// WAL recovery module - handles crash recovery logic
2//
3// Recovery is responsible for:
4// 1. Reading all WAL records from disk
5// 2. Tracking transaction boundaries (BEGIN/COMMIT)
6// 3. Only returning committed records (incomplete transactions are rolled back)
7// 4. Handling corrupted or truncated records gracefully
8
9use crate::{WalConfig, WalReader, WalRecord};
10use crate::record::RecordPayload;
11use rustlite_core::{Error, Result};
12use std::collections::{HashMap, HashSet};
13
14/// Manages WAL recovery after crash or restart
15pub struct RecoveryManager {
16    config: WalConfig,
17}
18
19/// Represents a transaction's state during recovery
20#[derive(Debug, Clone)]
21struct TransactionState {
22    /// Records belonging to this transaction
23    records: Vec<WalRecord>,
24    /// Whether the transaction was committed
25    committed: bool,
26}
27
28impl RecoveryManager {
29    /// Create a new recovery manager with the given configuration
30    pub fn new(config: WalConfig) -> Result<Self> {
31        Ok(Self { config })
32    }
33
34    /// Recover records from WAL
35    ///
36    /// This method:
37    /// 1. Reads all records from all WAL segments
38    /// 2. Tracks transaction boundaries
39    /// 3. Only returns records from committed transactions
40    /// 4. For records outside transactions, returns them directly
41    ///
42    /// Returns a vector of recovered records in order
43    pub fn recover(&self) -> Result<Vec<WalRecord>> {
44        let mut reader = WalReader::new(&self.config.wal_dir)?;
45
46        if reader.segment_count() == 0 {
47            return Ok(Vec::new());
48        }
49
50        // Track active transactions
51        let mut transactions: HashMap<u64, TransactionState> = HashMap::new();
52        let mut committed_tx_ids: HashSet<u64> = HashSet::new();
53
54        // Records outside of any transaction
55        let mut standalone_records: Vec<WalRecord> = Vec::new();
56
57        // Current transaction context (for records that don't specify tx_id)
58        let mut current_tx_id: Option<u64> = None;
59
60        // Read all records
61        loop {
62            match reader.next_record() {
63                Ok(Some(record)) => {
64                    match &record.payload {
65                        RecordPayload::BeginTx { tx_id } => {
66                            // Start tracking a new transaction
67                            transactions.insert(
68                                *tx_id,
69                                TransactionState {
70                                    records: Vec::new(),
71                                    committed: false,
72                                },
73                            );
74                            current_tx_id = Some(*tx_id);
75                        }
76                        RecordPayload::CommitTx { tx_id } => {
77                            // Mark transaction as committed
78                            if let Some(tx_state) = transactions.get_mut(tx_id) {
79                                tx_state.committed = true;
80                                committed_tx_ids.insert(*tx_id);
81                            }
82                            // Clear current tx if it matches
83                            if current_tx_id == Some(*tx_id) {
84                                current_tx_id = None;
85                            }
86                        }
87                        RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
88                            // Data records - add to current transaction or standalone
89                            if let Some(tx_id) = current_tx_id {
90                                if let Some(tx_state) = transactions.get_mut(&tx_id) {
91                                    tx_state.records.push(record);
92                                } else {
93                                    // Transaction not found, treat as standalone
94                                    standalone_records.push(record);
95                                }
96                            } else {
97                                // No active transaction
98                                standalone_records.push(record);
99                            }
100                        }
101                        RecordPayload::Checkpoint { .. } => {
102                            // Checkpoint records can be used for optimization
103                            // For now, we just skip them during recovery
104                        }
105                    }
106                }
107                Ok(None) => {
108                    // End of WAL
109                    break;
110                }
111                Err(e) => {
112                    // Handle errors gracefully
113                    // CRC errors or truncation means we stop here
114                    // Records up to this point are still valid
115                    if Self::is_recoverable_error(&e) {
116                        break;
117                    }
118                    return Err(e);
119                }
120            }
121        }
122
123        // Collect results: standalone records + committed transaction records
124        let mut result = standalone_records;
125
126        // Add records from committed transactions in order
127        // Sort by tx_id for deterministic ordering
128        let mut committed_txs: Vec<_> = transactions
129            .into_iter()
130            .filter(|(_, state)| state.committed)
131            .collect();
132        committed_txs.sort_by_key(|(tx_id, _)| *tx_id);
133
134        for (_, tx_state) in committed_txs {
135            result.extend(tx_state.records);
136        }
137
138        Ok(result)
139    }
140
141    /// Recover records with transaction markers included
142    ///
143    /// Unlike `recover()`, this method returns all records including
144    /// BEGIN_TX and COMMIT_TX markers for committed transactions.
145    /// This is useful for replaying the exact WAL state.
146    pub fn recover_with_markers(&self) -> Result<Vec<WalRecord>> {
147        let mut reader = WalReader::new(&self.config.wal_dir)?;
148
149        if reader.segment_count() == 0 {
150            return Ok(Vec::new());
151        }
152
153        // First pass: identify committed transactions
154        let mut committed_tx_ids: HashSet<u64> = HashSet::new();
155        let mut all_records: Vec<WalRecord> = Vec::new();
156
157        loop {
158            match reader.next_record() {
159                Ok(Some(record)) => {
160                    if let RecordPayload::CommitTx { tx_id } = &record.payload {
161                        committed_tx_ids.insert(*tx_id);
162                    }
163                    all_records.push(record);
164                }
165                Ok(None) => break,
166                Err(e) => {
167                    if Self::is_recoverable_error(&e) {
168                        break;
169                    }
170                    return Err(e);
171                }
172            }
173        }
174
175        // Second pass: filter to only include committed transactions and standalone records
176        let mut result: Vec<WalRecord> = Vec::new();
177        let mut current_tx_id: Option<u64> = None;
178        let mut in_committed_tx = false;
179
180        for record in all_records {
181            let payload = &record.payload;
182            let should_include = match payload {
183                RecordPayload::BeginTx { tx_id } => {
184                    in_committed_tx = committed_tx_ids.contains(tx_id);
185                    current_tx_id = Some(*tx_id);
186                    in_committed_tx
187                }
188                RecordPayload::CommitTx { tx_id } => {
189                    let include = committed_tx_ids.contains(tx_id);
190                    if current_tx_id == Some(*tx_id) {
191                        current_tx_id = None;
192                        in_committed_tx = false;
193                    }
194                    include
195                }
196                RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
197                    if current_tx_id.is_some() {
198                        // In a transaction
199                        in_committed_tx
200                    } else {
201                        // Standalone record
202                        true
203                    }
204                }
205                RecordPayload::Checkpoint { .. } => {
206                    // Include checkpoint markers
207                    true
208                }
209            };
210
211            if should_include {
212                result.push(record);
213            }
214        }
215
216        Ok(result)
217    }
218
219    /// Check if an error is recoverable (we can continue without the corrupted data)
220    fn is_recoverable_error(err: &Error) -> bool {
221        match err {
222            Error::Storage(msg) => msg.contains("CRC mismatch"),
223            Error::Serialization(msg) => {
224                msg.contains("Incomplete") || msg.contains("truncated")
225            }
226            _ => false,
227        }
228    }
229
230    /// Get statistics about the WAL
231    pub fn get_stats(&self) -> Result<RecoveryStats> {
232        let mut reader = WalReader::new(&self.config.wal_dir)?;
233
234        let mut stats = RecoveryStats {
235            segment_count: reader.segment_count(),
236            total_records: 0,
237            put_records: 0,
238            delete_records: 0,
239            transactions_started: 0,
240            transactions_committed: 0,
241            transactions_incomplete: 0,
242            checkpoints: 0,
243        };
244
245        let mut active_transactions: HashSet<u64> = HashSet::new();
246
247        loop {
248            match reader.next_record() {
249                Ok(Some(record)) => {
250                    stats.total_records += 1;
251                    match &record.payload {
252                        RecordPayload::Put { .. } => stats.put_records += 1,
253                        RecordPayload::Delete { .. } => stats.delete_records += 1,
254                        RecordPayload::BeginTx { tx_id } => {
255                            stats.transactions_started += 1;
256                            active_transactions.insert(*tx_id);
257                        }
258                        RecordPayload::CommitTx { tx_id } => {
259                            stats.transactions_committed += 1;
260                            active_transactions.remove(tx_id);
261                        }
262                        RecordPayload::Checkpoint { .. } => stats.checkpoints += 1,
263                    }
264                }
265                Ok(None) => break,
266                Err(_) => break,
267            }
268        }
269
270        stats.transactions_incomplete = active_transactions.len();
271
272        Ok(stats)
273    }
274}
275
/// Record counts gathered by scanning the WAL.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// How many segment files the reader found
    pub segment_count: usize,
    /// How many records were read in total, of any kind
    pub total_records: usize,
    /// How many of the records were PUT operations
    pub put_records: usize,
    /// How many of the records were DELETE operations
    pub delete_records: usize,
    /// How many BEGIN markers were seen
    pub transactions_started: usize,
    /// How many COMMIT markers were seen
    pub transactions_committed: usize,
    /// How many transactions were begun but never committed
    pub transactions_incomplete: usize,
    /// How many checkpoint records were seen
    pub checkpoints: usize,
}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{RecordType, SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Creates a temp directory with an empty `wal` sub-directory and a config
    /// pointing at it. The `TempDir` is returned so the caller keeps it bound
    /// for the duration of the test.
    fn setup_test_wal() -> (TempDir, WalConfig) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_dir = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).expect("Failed to create WAL dir");

        (
            temp_dir,
            WalConfig {
                wal_dir,
                sync_mode: SyncMode::Sync,
                max_segment_size: 64 * 1024 * 1024,
            },
        )
    }

    /// Appends `records` through a fresh writer, syncs, and drops the writer
    /// before returning so the reader opens the WAL afterwards.
    fn write_records(config: &WalConfig, records: impl IntoIterator<Item = WalRecord>) {
        let mut writer = WalWriter::new(&config.wal_dir, config.max_segment_size, config.sync_mode)
            .expect("Failed to create writer");

        for record in records {
            writer.append(record).expect("Failed to append");
        }
        writer.sync().expect("Failed to sync");
    }

    /// Builds the manager under test.
    fn manager_for(config: WalConfig) -> RecoveryManager {
        RecoveryManager::new(config).expect("Failed to create recovery manager")
    }

    /// Runs `recover()` on the WAL described by `config`.
    fn recover_all(config: WalConfig) -> Vec<WalRecord> {
        manager_for(config).recover().expect("Failed to recover")
    }

    #[test]
    fn test_recovery_empty_wal() {
        let (_temp_dir, config) = setup_test_wal();

        assert!(recover_all(config).is_empty());
    }

    #[test]
    fn test_recovery_standalone_records() {
        let (_temp_dir, config) = setup_test_wal();

        // Five puts, none of them inside a transaction.
        write_records(
            &config,
            (0..5).map(|i| {
                WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            }),
        );

        assert_eq!(recover_all(config).len(), 5);
    }

    #[test]
    fn test_recovery_committed_transaction() {
        let (_temp_dir, config) = setup_test_wal();

        // One transaction, fully committed.
        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = recover_all(config);

        // recover() strips BEGIN/COMMIT, leaving just the two PUTs.
        assert_eq!(records.len(), 2);
        let kinds: Vec<&RecordType> = records.iter().map(|r| &r.record_type).collect();
        assert_eq!(kinds, [&RecordType::Put, &RecordType::Put]);
    }

    #[test]
    fn test_recovery_incomplete_transaction_rollback() {
        let (_temp_dir, config) = setup_test_wal();

        // BEGIN plus data but no COMMIT, as if the process crashed mid-transaction.
        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        // The uncommitted transaction is rolled back, so nothing comes out.
        assert_eq!(recover_all(config).len(), 0);
    }

    #[test]
    fn test_recovery_mixed_committed_and_incomplete() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Transaction 1 commits.
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
                // Transaction 2 never commits.
                WalRecord::begin_tx(2),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        // Only the committed transaction's single PUT survives.
        assert_eq!(recover_all(config).len(), 1);
    }

    #[test]
    fn test_recovery_with_markers() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = manager_for(config)
            .recover_with_markers()
            .expect("Failed to recover");

        // Markers are preserved: BEGIN, PUT, COMMIT.
        assert_eq!(records.len(), 3);
        let kinds: Vec<&RecordType> = records.iter().map(|r| &r.record_type).collect();
        assert_eq!(
            kinds,
            [&RecordType::BeginTx, &RecordType::Put, &RecordType::CommitTx]
        );
    }

    #[test]
    fn test_recovery_stats() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Committed transaction.
                WalRecord::begin_tx(1),
                WalRecord::put(b"k1".to_vec(), b"v1".to_vec()),
                WalRecord::commit_tx(1),
                // Transaction left open.
                WalRecord::begin_tx(2),
                WalRecord::delete(b"k2".to_vec()),
            ],
        );

        let stats = manager_for(config).get_stats().expect("Failed to get stats");

        assert_eq!(stats.total_records, 5);
        assert_eq!(stats.put_records, 1);
        assert_eq!(stats.delete_records, 1);
        assert_eq!(stats.transactions_started, 2);
        assert_eq!(stats.transactions_committed, 1);
        assert_eq!(stats.transactions_incomplete, 1);
    }
}