1use crate::record::RecordPayload;
10use crate::{WalConfig, WalReader, WalRecord};
11use rustlite_core::{Error, Result};
12use std::collections::{HashMap, HashSet};
13
14pub struct RecoveryManager {
16 config: WalConfig,
17}
18
19#[derive(Debug, Clone)]
21struct TransactionState {
22 records: Vec<WalRecord>,
24 committed: bool,
26}
27
28impl RecoveryManager {
29 pub fn new(config: WalConfig) -> Result<Self> {
31 Ok(Self { config })
32 }
33
34 pub fn recover(&self) -> Result<Vec<WalRecord>> {
44 let mut reader = WalReader::new(&self.config.wal_dir)?;
45
46 if reader.segment_count() == 0 {
47 return Ok(Vec::new());
48 }
49
50 let mut transactions: HashMap<u64, TransactionState> = HashMap::new();
52 let mut committed_tx_ids: HashSet<u64> = HashSet::new();
53
54 let mut standalone_records: Vec<WalRecord> = Vec::new();
56
57 let mut current_tx_id: Option<u64> = None;
59
60 loop {
62 match reader.next_record() {
63 Ok(Some(record)) => {
64 match &record.payload {
65 RecordPayload::BeginTx { tx_id } => {
66 transactions.insert(
68 *tx_id,
69 TransactionState {
70 records: Vec::new(),
71 committed: false,
72 },
73 );
74 current_tx_id = Some(*tx_id);
75 }
76 RecordPayload::CommitTx { tx_id } => {
77 if let Some(tx_state) = transactions.get_mut(tx_id) {
79 tx_state.committed = true;
80 committed_tx_ids.insert(*tx_id);
81 }
82 if current_tx_id == Some(*tx_id) {
84 current_tx_id = None;
85 }
86 }
87 RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
88 if let Some(tx_id) = current_tx_id {
90 if let Some(tx_state) = transactions.get_mut(&tx_id) {
91 tx_state.records.push(record);
92 } else {
93 standalone_records.push(record);
95 }
96 } else {
97 standalone_records.push(record);
99 }
100 }
101 RecordPayload::Checkpoint { .. } => {
102 }
105 }
106 }
107 Ok(None) => {
108 break;
110 }
111 Err(e) => {
112 if Self::is_recoverable_error(&e) {
116 break;
117 }
118 return Err(e);
119 }
120 }
121 }
122
123 let mut result = standalone_records;
125
126 let mut committed_txs: Vec<_> = transactions
129 .into_iter()
130 .filter(|(_, state)| state.committed)
131 .collect();
132 committed_txs.sort_by_key(|(tx_id, _)| *tx_id);
133
134 for (_, tx_state) in committed_txs {
135 result.extend(tx_state.records);
136 }
137
138 Ok(result)
139 }
140
141 pub fn recover_with_markers(&self) -> Result<Vec<WalRecord>> {
147 let mut reader = WalReader::new(&self.config.wal_dir)?;
148
149 if reader.segment_count() == 0 {
150 return Ok(Vec::new());
151 }
152
153 let mut committed_tx_ids: HashSet<u64> = HashSet::new();
155 let mut all_records: Vec<WalRecord> = Vec::new();
156
157 loop {
158 match reader.next_record() {
159 Ok(Some(record)) => {
160 if let RecordPayload::CommitTx { tx_id } = &record.payload {
161 committed_tx_ids.insert(*tx_id);
162 }
163 all_records.push(record);
164 }
165 Ok(None) => break,
166 Err(e) => {
167 if Self::is_recoverable_error(&e) {
168 break;
169 }
170 return Err(e);
171 }
172 }
173 }
174
175 let mut result: Vec<WalRecord> = Vec::new();
177 let mut current_tx_id: Option<u64> = None;
178 let mut in_committed_tx = false;
179
180 for record in all_records {
181 let payload = &record.payload;
182 let should_include = match payload {
183 RecordPayload::BeginTx { tx_id } => {
184 in_committed_tx = committed_tx_ids.contains(tx_id);
185 current_tx_id = Some(*tx_id);
186 in_committed_tx
187 }
188 RecordPayload::CommitTx { tx_id } => {
189 let include = committed_tx_ids.contains(tx_id);
190 if current_tx_id == Some(*tx_id) {
191 current_tx_id = None;
192 in_committed_tx = false;
193 }
194 include
195 }
196 RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
197 if current_tx_id.is_some() {
198 in_committed_tx
200 } else {
201 true
203 }
204 }
205 RecordPayload::Checkpoint { .. } => {
206 true
208 }
209 };
210
211 if should_include {
212 result.push(record);
213 }
214 }
215
216 Ok(result)
217 }
218
219 fn is_recoverable_error(err: &Error) -> bool {
221 match err {
222 Error::Storage(msg) => msg.contains("CRC mismatch"),
223 Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
224 _ => false,
225 }
226 }
227
228 pub fn get_stats(&self) -> Result<RecoveryStats> {
230 let mut reader = WalReader::new(&self.config.wal_dir)?;
231
232 let mut stats = RecoveryStats {
233 segment_count: reader.segment_count(),
234 total_records: 0,
235 put_records: 0,
236 delete_records: 0,
237 transactions_started: 0,
238 transactions_committed: 0,
239 transactions_incomplete: 0,
240 checkpoints: 0,
241 };
242
243 let mut active_transactions: HashSet<u64> = HashSet::new();
244
245 loop {
246 match reader.next_record() {
247 Ok(Some(record)) => {
248 stats.total_records += 1;
249 match &record.payload {
250 RecordPayload::Put { .. } => stats.put_records += 1,
251 RecordPayload::Delete { .. } => stats.delete_records += 1,
252 RecordPayload::BeginTx { tx_id } => {
253 stats.transactions_started += 1;
254 active_transactions.insert(*tx_id);
255 }
256 RecordPayload::CommitTx { tx_id } => {
257 stats.transactions_committed += 1;
258 active_transactions.remove(tx_id);
259 }
260 RecordPayload::Checkpoint { .. } => stats.checkpoints += 1,
261 }
262 }
263 Ok(None) => break,
264 Err(_) => break,
265 }
266 }
267
268 stats.transactions_incomplete = active_transactions.len();
269
270 Ok(stats)
271 }
272}
273
/// Counters describing the contents of a WAL, as produced by
/// `RecoveryManager::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Number of segments reported by the WAL reader.
    pub segment_count: usize,
    /// Total number of records read, of every kind.
    pub total_records: usize,
    /// Number of `Put` records read.
    pub put_records: usize,
    /// Number of `Delete` records read.
    pub delete_records: usize,
    /// Number of `BeginTx` records read.
    pub transactions_started: usize,
    /// Number of `CommitTx` records read.
    pub transactions_committed: usize,
    /// Number of transaction ids that were begun but not committed by the
    /// end of the scan.
    pub transactions_incomplete: usize,
    /// Number of `Checkpoint` records read.
    pub checkpoints: usize,
}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{RecordType, SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Creates a temporary WAL directory and a configuration pointing at it.
    ///
    /// The returned `TempDir` must be kept alive for the duration of the test.
    fn setup_test_wal() -> (TempDir, WalConfig) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");

        let config = WalConfig {
            wal_dir: wal_path,
            sync_mode: SyncMode::Sync,
            max_segment_size: 64 * 1024 * 1024,
        };

        (temp_dir, config)
    }

    /// Appends `records` to the WAL in order, syncs, and drops the writer.
    fn write_records(config: &WalConfig, records: Vec<WalRecord>) {
        let mut writer =
            WalWriter::new(&config.wal_dir, config.max_segment_size, config.sync_mode)
                .expect("Failed to create writer");

        for record in records {
            writer.append(record).expect("Failed to append");
        }
        writer.sync().expect("Failed to sync");
    }

    /// Builds a recovery manager over the WAL described by `config`.
    fn manager_for(config: WalConfig) -> RecoveryManager {
        RecoveryManager::new(config).expect("Failed to create recovery manager")
    }

    #[test]
    fn test_recovery_empty_wal() {
        let (_temp_dir, config) = setup_test_wal();

        let records = manager_for(config).recover().expect("Failed to recover");

        assert!(records.is_empty());
    }

    #[test]
    fn test_recovery_standalone_records() {
        let (_temp_dir, config) = setup_test_wal();

        let puts: Vec<WalRecord> = (0..5)
            .map(|i| {
                WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect();
        write_records(&config, puts);

        let records = manager_for(config).recover().expect("Failed to recover");

        assert_eq!(records.len(), 5);
    }

    #[test]
    fn test_recovery_committed_transaction() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = manager_for(config).recover().expect("Failed to recover");

        // Only the two data records come back; the markers are stripped.
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].record_type, RecordType::Put);
        assert_eq!(records[1].record_type, RecordType::Put);
    }

    #[test]
    fn test_recovery_incomplete_transaction_rollback() {
        let (_temp_dir, config) = setup_test_wal();

        // No commit record is written for transaction 1.
        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = manager_for(config).recover().expect("Failed to recover");

        assert_eq!(records.len(), 0);
    }

    #[test]
    fn test_recovery_mixed_committed_and_incomplete() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Transaction 1 commits.
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
                // Transaction 2 is left open.
                WalRecord::begin_tx(2),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = manager_for(config).recover().expect("Failed to recover");

        assert_eq!(records.len(), 1);
    }

    #[test]
    fn test_recovery_with_markers() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = manager_for(config)
            .recover_with_markers()
            .expect("Failed to recover");

        // Begin and commit markers are kept around the data record.
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record_type, RecordType::BeginTx);
        assert_eq!(records[1].record_type, RecordType::Put);
        assert_eq!(records[2].record_type, RecordType::CommitTx);
    }

    #[test]
    fn test_recovery_stats() {
        let (_temp_dir, config) = setup_test_wal();

        write_records(
            &config,
            vec![
                // Transaction 1 commits.
                WalRecord::begin_tx(1),
                WalRecord::put(b"k1".to_vec(), b"v1".to_vec()),
                WalRecord::commit_tx(1),
                // Transaction 2 is left open.
                WalRecord::begin_tx(2),
                WalRecord::delete(b"k2".to_vec()),
            ],
        );

        let stats = manager_for(config)
            .get_stats()
            .expect("Failed to get stats");

        assert_eq!(stats.total_records, 5);
        assert_eq!(stats.put_records, 1);
        assert_eq!(stats.delete_records, 1);
        assert_eq!(stats.transactions_started, 2);
        assert_eq!(stats.transactions_committed, 1);
        assert_eq!(stats.transactions_incomplete, 1);
    }
}