1use crate::{WalConfig, WalReader, WalRecord};
10use crate::record::RecordPayload;
11use rustlite_core::{Error, Result};
12use std::collections::{HashMap, HashSet};
13
14pub struct RecoveryManager {
16 config: WalConfig,
17}
18
19#[derive(Debug, Clone)]
21struct TransactionState {
22 records: Vec<WalRecord>,
24 committed: bool,
26}
27
28impl RecoveryManager {
29 pub fn new(config: WalConfig) -> Result<Self> {
31 Ok(Self { config })
32 }
33
34 pub fn recover(&self) -> Result<Vec<WalRecord>> {
44 let mut reader = WalReader::new(&self.config.wal_dir)?;
45
46 if reader.segment_count() == 0 {
47 return Ok(Vec::new());
48 }
49
50 let mut transactions: HashMap<u64, TransactionState> = HashMap::new();
52 let mut committed_tx_ids: HashSet<u64> = HashSet::new();
53
54 let mut standalone_records: Vec<WalRecord> = Vec::new();
56
57 let mut current_tx_id: Option<u64> = None;
59
60 loop {
62 match reader.next_record() {
63 Ok(Some(record)) => {
64 match &record.payload {
65 RecordPayload::BeginTx { tx_id } => {
66 transactions.insert(
68 *tx_id,
69 TransactionState {
70 records: Vec::new(),
71 committed: false,
72 },
73 );
74 current_tx_id = Some(*tx_id);
75 }
76 RecordPayload::CommitTx { tx_id } => {
77 if let Some(tx_state) = transactions.get_mut(tx_id) {
79 tx_state.committed = true;
80 committed_tx_ids.insert(*tx_id);
81 }
82 if current_tx_id == Some(*tx_id) {
84 current_tx_id = None;
85 }
86 }
87 RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
88 if let Some(tx_id) = current_tx_id {
90 if let Some(tx_state) = transactions.get_mut(&tx_id) {
91 tx_state.records.push(record);
92 } else {
93 standalone_records.push(record);
95 }
96 } else {
97 standalone_records.push(record);
99 }
100 }
101 RecordPayload::Checkpoint { .. } => {
102 }
105 }
106 }
107 Ok(None) => {
108 break;
110 }
111 Err(e) => {
112 if Self::is_recoverable_error(&e) {
116 break;
117 }
118 return Err(e);
119 }
120 }
121 }
122
123 let mut result = standalone_records;
125
126 let mut committed_txs: Vec<_> = transactions
129 .into_iter()
130 .filter(|(_, state)| state.committed)
131 .collect();
132 committed_txs.sort_by_key(|(tx_id, _)| *tx_id);
133
134 for (_, tx_state) in committed_txs {
135 result.extend(tx_state.records);
136 }
137
138 Ok(result)
139 }
140
141 pub fn recover_with_markers(&self) -> Result<Vec<WalRecord>> {
147 let mut reader = WalReader::new(&self.config.wal_dir)?;
148
149 if reader.segment_count() == 0 {
150 return Ok(Vec::new());
151 }
152
153 let mut committed_tx_ids: HashSet<u64> = HashSet::new();
155 let mut all_records: Vec<WalRecord> = Vec::new();
156
157 loop {
158 match reader.next_record() {
159 Ok(Some(record)) => {
160 if let RecordPayload::CommitTx { tx_id } = &record.payload {
161 committed_tx_ids.insert(*tx_id);
162 }
163 all_records.push(record);
164 }
165 Ok(None) => break,
166 Err(e) => {
167 if Self::is_recoverable_error(&e) {
168 break;
169 }
170 return Err(e);
171 }
172 }
173 }
174
175 let mut result: Vec<WalRecord> = Vec::new();
177 let mut current_tx_id: Option<u64> = None;
178 let mut in_committed_tx = false;
179
180 for record in all_records {
181 let payload = &record.payload;
182 let should_include = match payload {
183 RecordPayload::BeginTx { tx_id } => {
184 in_committed_tx = committed_tx_ids.contains(tx_id);
185 current_tx_id = Some(*tx_id);
186 in_committed_tx
187 }
188 RecordPayload::CommitTx { tx_id } => {
189 let include = committed_tx_ids.contains(tx_id);
190 if current_tx_id == Some(*tx_id) {
191 current_tx_id = None;
192 in_committed_tx = false;
193 }
194 include
195 }
196 RecordPayload::Put { .. } | RecordPayload::Delete { .. } => {
197 if current_tx_id.is_some() {
198 in_committed_tx
200 } else {
201 true
203 }
204 }
205 RecordPayload::Checkpoint { .. } => {
206 true
208 }
209 };
210
211 if should_include {
212 result.push(record);
213 }
214 }
215
216 Ok(result)
217 }
218
219 fn is_recoverable_error(err: &Error) -> bool {
221 match err {
222 Error::Storage(msg) => msg.contains("CRC mismatch"),
223 Error::Serialization(msg) => {
224 msg.contains("Incomplete") || msg.contains("truncated")
225 }
226 _ => false,
227 }
228 }
229
230 pub fn get_stats(&self) -> Result<RecoveryStats> {
232 let mut reader = WalReader::new(&self.config.wal_dir)?;
233
234 let mut stats = RecoveryStats {
235 segment_count: reader.segment_count(),
236 total_records: 0,
237 put_records: 0,
238 delete_records: 0,
239 transactions_started: 0,
240 transactions_committed: 0,
241 transactions_incomplete: 0,
242 checkpoints: 0,
243 };
244
245 let mut active_transactions: HashSet<u64> = HashSet::new();
246
247 loop {
248 match reader.next_record() {
249 Ok(Some(record)) => {
250 stats.total_records += 1;
251 match &record.payload {
252 RecordPayload::Put { .. } => stats.put_records += 1,
253 RecordPayload::Delete { .. } => stats.delete_records += 1,
254 RecordPayload::BeginTx { tx_id } => {
255 stats.transactions_started += 1;
256 active_transactions.insert(*tx_id);
257 }
258 RecordPayload::CommitTx { tx_id } => {
259 stats.transactions_committed += 1;
260 active_transactions.remove(tx_id);
261 }
262 RecordPayload::Checkpoint { .. } => stats.checkpoints += 1,
263 }
264 }
265 Ok(None) => break,
266 Err(_) => break,
267 }
268 }
269
270 stats.transactions_incomplete = active_transactions.len();
271
272 Ok(stats)
273 }
274}
275
/// Counters describing the contents of a write-ahead log.
///
/// All counters start at zero via [`Default`]. The type is a plain value and
/// can be compared with `==`, which makes whole-snapshot assertions possible.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Number of segments reported by the WAL reader.
    pub segment_count: usize,
    /// Total number of records read, of every kind.
    pub total_records: usize,
    /// Number of `Put` records read.
    pub put_records: usize,
    /// Number of `Delete` records read.
    pub delete_records: usize,
    /// Number of `BeginTx` markers read.
    pub transactions_started: usize,
    /// Number of `CommitTx` markers read.
    pub transactions_committed: usize,
    /// Number of transaction ids that were begun but had no commit marker.
    pub transactions_incomplete: usize,
    /// Number of `Checkpoint` records read.
    pub checkpoints: usize,
}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{RecordType, SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Creates a temporary directory with an empty `wal` subdirectory and a
    /// config pointing at it. The `TempDir` guard is returned so the caller
    /// keeps the directory alive for the duration of the test.
    fn setup_test_wal() -> (TempDir, WalConfig) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_dir = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).expect("Failed to create WAL dir");

        let config = WalConfig {
            wal_dir,
            sync_mode: SyncMode::Sync,
            max_segment_size: 64 * 1024 * 1024,
        };

        (temp_dir, config)
    }

    /// Appends `records` in order through a fresh writer, syncs, and drops the
    /// writer before returning.
    fn write_wal(config: &WalConfig, records: Vec<WalRecord>) {
        let mut writer =
            WalWriter::new(&config.wal_dir, config.max_segment_size, config.sync_mode)
                .expect("Failed to create writer");
        for record in records {
            writer.append(record).expect("Failed to append");
        }
        writer.sync().expect("Failed to sync");
    }

    /// Builds the recovery manager under test.
    fn manager_for(config: WalConfig) -> RecoveryManager {
        RecoveryManager::new(config).expect("Failed to create recovery manager")
    }

    #[test]
    fn test_recovery_empty_wal() {
        let (_temp_dir, config) = setup_test_wal();

        let records = manager_for(config).recover().expect("Failed to recover");

        assert!(records.is_empty());
    }

    #[test]
    fn test_recovery_standalone_records() {
        let (_temp_dir, config) = setup_test_wal();

        let puts: Vec<WalRecord> = (0..5)
            .map(|i| {
                WalRecord::put(
                    format!("key{}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect();
        write_wal(&config, puts);

        let records = manager_for(config).recover().expect("Failed to recover");

        assert_eq!(records.len(), 5);
    }

    #[test]
    fn test_recovery_committed_transaction() {
        let (_temp_dir, config) = setup_test_wal();

        write_wal(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = manager_for(config).recover().expect("Failed to recover");

        // Only the two data records come back; the markers are consumed.
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].record_type, RecordType::Put);
        assert_eq!(records[1].record_type, RecordType::Put);
    }

    #[test]
    fn test_recovery_incomplete_transaction_rollback() {
        let (_temp_dir, config) = setup_test_wal();

        // No commit marker is written for transaction 1.
        write_wal(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = manager_for(config).recover().expect("Failed to recover");

        assert_eq!(records.len(), 0);
    }

    #[test]
    fn test_recovery_mixed_committed_and_incomplete() {
        let (_temp_dir, config) = setup_test_wal();

        // Transaction 1 commits; transaction 2 is left open.
        write_wal(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
                WalRecord::begin_tx(2),
                WalRecord::put(b"key2".to_vec(), b"val2".to_vec()),
            ],
        );

        let records = manager_for(config).recover().expect("Failed to recover");

        assert_eq!(records.len(), 1);
    }

    #[test]
    fn test_recovery_with_markers() {
        let (_temp_dir, config) = setup_test_wal();

        write_wal(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"key1".to_vec(), b"val1".to_vec()),
                WalRecord::commit_tx(1),
            ],
        );

        let records = manager_for(config)
            .recover_with_markers()
            .expect("Failed to recover");

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record_type, RecordType::BeginTx);
        assert_eq!(records[1].record_type, RecordType::Put);
        assert_eq!(records[2].record_type, RecordType::CommitTx);
    }

    #[test]
    fn test_recovery_stats() {
        let (_temp_dir, config) = setup_test_wal();

        // Transaction 1 commits; transaction 2 is left open.
        write_wal(
            &config,
            vec![
                WalRecord::begin_tx(1),
                WalRecord::put(b"k1".to_vec(), b"v1".to_vec()),
                WalRecord::commit_tx(1),
                WalRecord::begin_tx(2),
                WalRecord::delete(b"k2".to_vec()),
            ],
        );

        let stats = manager_for(config).get_stats().expect("Failed to get stats");

        assert_eq!(stats.total_records, 5);
        assert_eq!(stats.put_records, 1);
        assert_eq!(stats.delete_records, 1);
        assert_eq!(stats.transactions_started, 2);
        assert_eq!(stats.transactions_committed, 1);
        assert_eq!(stats.transactions_incomplete, 1);
    }
}