Skip to main content

grafeo_adapters/storage/wal/
recovery.rs

1//! WAL recovery.
2
3use super::record::WalEntry;
4use super::{CheckpointMetadata, WalManager, WalRecord};
5use grafeo_common::utils::error::{Error, Result, StorageError};
6use std::fs::File;
7use std::io::{BufReader, Read};
8use std::path::Path;
9
/// File name, relative to the WAL directory, under which checkpoint
/// metadata is looked up.
const CHECKPOINT_METADATA_FILE: &str = "checkpoint.meta";
12
/// Recovery handler that reads the WAL files of one directory and returns
/// the records belonging to committed transactions.
pub struct WalRecovery {
    /// Directory that holds the WAL log files and the checkpoint metadata file.
    dir: std::path::PathBuf,
}
18
19impl WalRecovery {
20    /// Creates a new recovery handler for the given WAL directory.
21    pub fn new(dir: impl AsRef<Path>) -> Self {
22        Self {
23            dir: dir.as_ref().to_path_buf(),
24        }
25    }
26
27    /// Creates a recovery handler from a WAL manager.
28    #[must_use]
29    pub fn from_wal(wal: &WalManager) -> Self {
30        Self {
31            dir: wal.dir().to_path_buf(),
32        }
33    }
34
35    /// Reads checkpoint metadata if it exists.
36    ///
37    /// Returns `None` if no checkpoint metadata is found.
38    pub fn read_checkpoint_metadata(&self) -> Result<Option<CheckpointMetadata>> {
39        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
40
41        if !metadata_path.exists() {
42            return Ok(None);
43        }
44
45        let file = File::open(&metadata_path)?;
46        let mut reader = BufReader::new(file);
47        let mut data = Vec::new();
48        reader.read_to_end(&mut data)?;
49
50        let (metadata, _): (CheckpointMetadata, _) =
51            bincode::serde::decode_from_slice(&data, bincode::config::standard())
52                .map_err(|e| Error::Serialization(e.to_string()))?;
53
54        Ok(Some(metadata))
55    }
56
57    /// Returns the checkpoint metadata, if any.
58    ///
59    /// This is useful for determining whether to perform a full or
60    /// incremental recovery.
61    #[must_use]
62    pub fn checkpoint(&self) -> Option<CheckpointMetadata> {
63        self.read_checkpoint_metadata().ok().flatten()
64    }
65
66    /// Recovers committed records from all WAL files.
67    ///
68    /// Returns only records that were part of committed transactions.
69    /// If checkpoint metadata exists, only replays files from the
70    /// checkpoint sequence onwards.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if recovery fails.
75    pub fn recover(&self) -> Result<Vec<WalRecord>> {
76        self.recover_as::<WalRecord>()
77    }
78
79    /// Recovers committed records of a specific type from all WAL files.
80    ///
81    /// This is the generic version of [`recover`](Self::recover). Use it
82    /// when recovering a WAL that stores a custom record type.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if recovery fails.
87    pub fn recover_as<R: WalEntry>(&self) -> Result<Vec<R>> {
88        let checkpoint = self.read_checkpoint_metadata()?;
89        self.recover_internal_as::<R>(checkpoint)
90    }
91
92    /// Recovers committed records, starting from a specific checkpoint.
93    ///
94    /// This can be used for incremental recovery when you want to
95    /// skip WAL files that precede the checkpoint.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if recovery fails.
100    pub fn recover_from_checkpoint(
101        &self,
102        checkpoint: Option<&CheckpointMetadata>,
103    ) -> Result<Vec<WalRecord>> {
104        self.recover_internal_as::<WalRecord>(checkpoint.cloned())
105    }
106
107    fn recover_internal_as<R: WalEntry>(
108        &self,
109        checkpoint: Option<CheckpointMetadata>,
110    ) -> Result<Vec<R>> {
111        let mut current_tx_records = Vec::new();
112        let mut committed_records = Vec::new();
113
114        // Get all log files in order
115        let log_files = self.get_log_files()?;
116
117        // Determine the minimum sequence number to process
118        let min_sequence = checkpoint.as_ref().map_or(0, |cp| cp.log_sequence);
119
120        if checkpoint.is_some() {
121            tracing::info!(
122                "Recovering from checkpoint at epoch {:?}, starting from log sequence {}",
123                checkpoint.as_ref().map(|c| c.epoch),
124                min_sequence
125            );
126        }
127
128        // Read log files in sequence, skipping those before checkpoint
129        for log_file in log_files {
130            // Extract sequence number from filename
131            let sequence = Self::sequence_from_path(&log_file).unwrap_or(0);
132
133            // Skip files that are completely before the checkpoint
134            // We include the checkpoint sequence file because it may contain
135            // records after the checkpoint record itself
136            if sequence < min_sequence {
137                tracing::debug!(
138                    "Skipping log file {:?} (sequence {} < checkpoint {})",
139                    log_file,
140                    sequence,
141                    min_sequence
142                );
143                continue;
144            }
145
146            let file = match File::open(&log_file) {
147                Ok(f) => f,
148                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
149                Err(e) => return Err(e.into()),
150            };
151            let mut reader = BufReader::new(file);
152
153            // Read all records from this file
154            loop {
155                match self.read_record_as::<R>(&mut reader) {
156                    Ok(Some(record)) => {
157                        if record.is_commit() {
158                            committed_records.append(&mut current_tx_records);
159                            committed_records.push(record);
160                        } else if record.is_abort() {
161                            current_tx_records.clear();
162                        } else if record.is_checkpoint() {
163                            current_tx_records.clear();
164                            committed_records.push(record);
165                        } else {
166                            current_tx_records.push(record);
167                        }
168                    }
169                    Ok(None) => break, // EOF
170                    Err(e) => {
171                        // Log corruption - stop reading this file but continue
172                        // with remaining files (best-effort recovery)
173                        tracing::warn!("WAL corruption detected in {:?}: {}", log_file, e);
174                        break;
175                    }
176                }
177            }
178        }
179
180        // Uncommitted records in current_tx_records are discarded
181
182        Ok(committed_records)
183    }
184
185    /// Extracts the sequence number from a WAL log file path.
186    fn sequence_from_path(path: &Path) -> Option<u64> {
187        path.file_stem()
188            .and_then(|s| s.to_str())
189            .and_then(|s| s.strip_prefix("wal_"))
190            .and_then(|s| s.parse().ok())
191    }
192
193    /// Recovers committed records from a single WAL file.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if recovery fails.
198    pub fn recover_file(&self, path: impl AsRef<Path>) -> Result<Vec<WalRecord>> {
199        self.recover_file_as::<WalRecord>(path)
200    }
201
202    /// Recovers committed records of a specific type from a single WAL file.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if recovery fails.
207    pub fn recover_file_as<R: WalEntry>(&self, path: impl AsRef<Path>) -> Result<Vec<R>> {
208        let file = File::open(path.as_ref())?;
209        let mut reader = BufReader::new(file);
210
211        let mut current_tx_records = Vec::new();
212        let mut committed_records = Vec::new();
213
214        loop {
215            match self.read_record_as::<R>(&mut reader) {
216                Ok(Some(record)) => {
217                    if record.is_commit() {
218                        committed_records.append(&mut current_tx_records);
219                        committed_records.push(record);
220                    } else if record.is_abort() {
221                        current_tx_records.clear();
222                    } else {
223                        current_tx_records.push(record);
224                    }
225                }
226                Ok(None) => break,
227                Err(e) => {
228                    tracing::warn!("WAL corruption detected: {}", e);
229                    break;
230                }
231            }
232        }
233
234        Ok(committed_records)
235    }
236
237    fn get_log_files(&self) -> Result<Vec<std::path::PathBuf>> {
238        let mut files = Vec::new();
239
240        if !self.dir.exists() {
241            return Ok(files);
242        }
243
244        if let Ok(entries) = std::fs::read_dir(&self.dir) {
245            for entry in entries.flatten() {
246                let path = entry.path();
247                if path.extension().is_some_and(|ext| ext == "log") {
248                    files.push(path);
249                }
250            }
251        }
252
253        // Sort by filename (which includes sequence number)
254        files.sort();
255
256        Ok(files)
257    }
258
259    fn read_record_as<R: WalEntry>(&self, reader: &mut BufReader<File>) -> Result<Option<R>> {
260        // Read length prefix
261        let mut len_buf = [0u8; 4];
262        match reader.read_exact(&mut len_buf) {
263            Ok(()) => {}
264            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
265            Err(e) => return Err(e.into()),
266        }
267        let len = u32::from_le_bytes(len_buf) as usize;
268
269        // Read data
270        let mut data = vec![0u8; len];
271        reader.read_exact(&mut data)?;
272
273        // Read and verify checksum
274        let mut checksum_buf = [0u8; 4];
275        reader.read_exact(&mut checksum_buf)?;
276        let stored_checksum = u32::from_le_bytes(checksum_buf);
277        let computed_checksum = crc32fast::hash(&data);
278
279        if stored_checksum != computed_checksum {
280            return Err(Error::Storage(StorageError::Corruption(
281                "WAL checksum mismatch".to_string(),
282            )));
283        }
284
285        // Deserialize
286        let (record, _): (R, _) =
287            bincode::serde::decode_from_slice(&data, bincode::config::standard())
288                .map_err(|e| Error::Serialization(e.to_string()))?;
289
290        Ok(Some(record))
291    }
292}
293
294#[cfg(test)]
295mod tests {
296    use super::*;
297    use grafeo_common::types::{NodeId, TransactionId};
298    use tempfile::tempdir;
299
300    #[test]
301    fn test_recovery_committed() {
302        let dir = tempdir().unwrap();
303
304        // Write some records
305        {
306            let wal = WalManager::open(dir.path()).unwrap();
307
308            wal.log(&WalRecord::CreateNode {
309                id: NodeId::new(1),
310                labels: vec!["Person".to_string()],
311            })
312            .unwrap();
313
314            wal.log(&WalRecord::TransactionCommit {
315                transaction_id: TransactionId::new(1),
316            })
317            .unwrap();
318
319            wal.sync().unwrap();
320        }
321
322        // Recover
323        let recovery = WalRecovery::new(dir.path());
324        let records = recovery.recover().unwrap();
325
326        assert_eq!(records.len(), 2);
327    }
328
329    #[test]
330    fn test_recovery_uncommitted() {
331        let dir = tempdir().unwrap();
332
333        // Write some records without commit
334        {
335            let wal = WalManager::open(dir.path()).unwrap();
336
337            wal.log(&WalRecord::CreateNode {
338                id: NodeId::new(1),
339                labels: vec!["Person".to_string()],
340            })
341            .unwrap();
342
343            // No commit!
344            wal.sync().unwrap();
345        }
346
347        // Recover
348        let recovery = WalRecovery::new(dir.path());
349        let records = recovery.recover().unwrap();
350
351        // Uncommitted records should be discarded
352        assert_eq!(records.len(), 0);
353    }
354
355    #[test]
356    fn test_recovery_multiple_files() {
357        let dir = tempdir().unwrap();
358
359        // Write records across multiple files
360        {
361            let config = super::super::WalConfig {
362                max_log_size: 100, // Force rotation
363                ..Default::default()
364            };
365            let wal = WalManager::with_config(dir.path(), config).unwrap();
366
367            // First transaction
368            for i in 0..5 {
369                wal.log(&WalRecord::CreateNode {
370                    id: NodeId::new(i),
371                    labels: vec!["Test".to_string()],
372                })
373                .unwrap();
374            }
375            wal.log(&WalRecord::TransactionCommit {
376                transaction_id: TransactionId::new(1),
377            })
378            .unwrap();
379
380            // Second transaction
381            for i in 5..10 {
382                wal.log(&WalRecord::CreateNode {
383                    id: NodeId::new(i),
384                    labels: vec!["Test".to_string()],
385                })
386                .unwrap();
387            }
388            wal.log(&WalRecord::TransactionCommit {
389                transaction_id: TransactionId::new(2),
390            })
391            .unwrap();
392
393            wal.sync().unwrap();
394        }
395
396        // Recover
397        let recovery = WalRecovery::new(dir.path());
398        let records = recovery.recover().unwrap();
399
400        // Should have 10 CreateNode + 2 TransactionCommit
401        assert_eq!(records.len(), 12);
402    }
403
404    #[test]
405    fn test_checkpoint_metadata() {
406        use grafeo_common::types::EpochId;
407
408        let dir = tempdir().unwrap();
409
410        // Write records and create a checkpoint
411        {
412            let wal = WalManager::open(dir.path()).unwrap();
413
414            // First transaction
415            wal.log(&WalRecord::CreateNode {
416                id: NodeId::new(1),
417                labels: vec!["Test".to_string()],
418            })
419            .unwrap();
420            wal.log(&WalRecord::TransactionCommit {
421                transaction_id: TransactionId::new(1),
422            })
423            .unwrap();
424
425            // Create checkpoint
426            wal.checkpoint(TransactionId::new(1), EpochId::new(10))
427                .unwrap();
428
429            // Second transaction after checkpoint
430            wal.log(&WalRecord::CreateNode {
431                id: NodeId::new(2),
432                labels: vec!["Test".to_string()],
433            })
434            .unwrap();
435            wal.log(&WalRecord::TransactionCommit {
436                transaction_id: TransactionId::new(2),
437            })
438            .unwrap();
439
440            wal.sync().unwrap();
441        }
442
443        // Verify checkpoint metadata was written
444        let recovery = WalRecovery::new(dir.path());
445        let checkpoint = recovery.checkpoint();
446        assert!(checkpoint.is_some(), "Checkpoint metadata should exist");
447
448        let cp = checkpoint.unwrap();
449        assert_eq!(cp.epoch.as_u64(), 10);
450        assert_eq!(cp.transaction_id.as_u64(), 1);
451    }
452
453    #[test]
454    fn test_recovery_from_checkpoint() {
455        use super::super::WalConfig;
456        use grafeo_common::types::EpochId;
457
458        let dir = tempdir().unwrap();
459
460        // Write records across multiple log files with checkpoint
461        {
462            let config = WalConfig {
463                max_log_size: 100, // Force rotation
464                ..Default::default()
465            };
466            let wal = WalManager::with_config(dir.path(), config).unwrap();
467
468            // First batch of records (should end up in early log files)
469            for i in 0..5 {
470                wal.log(&WalRecord::CreateNode {
471                    id: NodeId::new(i),
472                    labels: vec!["Before".to_string()],
473                })
474                .unwrap();
475            }
476            wal.log(&WalRecord::TransactionCommit {
477                transaction_id: TransactionId::new(1),
478            })
479            .unwrap();
480
481            // Create checkpoint
482            wal.checkpoint(TransactionId::new(1), EpochId::new(100))
483                .unwrap();
484
485            // Second batch after checkpoint
486            for i in 100..103 {
487                wal.log(&WalRecord::CreateNode {
488                    id: NodeId::new(i),
489                    labels: vec!["After".to_string()],
490                })
491                .unwrap();
492            }
493            wal.log(&WalRecord::TransactionCommit {
494                transaction_id: TransactionId::new(2),
495            })
496            .unwrap();
497
498            wal.sync().unwrap();
499        }
500
501        // Recovery should use checkpoint metadata to skip old files
502        let recovery = WalRecovery::new(dir.path());
503        let records = recovery.recover().unwrap();
504
505        // We should get all committed records (checkpoint metadata is used for optimization)
506        // The number depends on how many log files were skipped
507        assert!(!records.is_empty(), "Should recover some records");
508    }
509
510    #[test]
511    fn test_recover_as_generic() {
512        let dir = tempdir().unwrap();
513
514        // Write records using WalManager
515        {
516            let wal = WalManager::open(dir.path()).unwrap();
517
518            wal.log(&WalRecord::CreateNode {
519                id: NodeId::new(1),
520                labels: vec!["Person".to_string()],
521            })
522            .unwrap();
523
524            wal.log(&WalRecord::TransactionCommit {
525                transaction_id: TransactionId::new(1),
526            })
527            .unwrap();
528
529            wal.sync().unwrap();
530        }
531
532        // Recover using the generic method
533        let recovery = WalRecovery::new(dir.path());
534        let records: Vec<WalRecord> = recovery.recover_as().unwrap();
535
536        assert_eq!(records.len(), 2);
537
538        // Verify the records are correct via WalEntry trait methods
539        assert!(!records[0].is_commit());
540        assert!(records[1].is_commit());
541    }
542
543    #[test]
544    fn test_recovery_truncated_wal_mid_record() {
545        let dir = tempdir().unwrap();
546
547        // Write valid records first
548        {
549            let wal = WalManager::open(dir.path()).unwrap();
550            wal.log(&WalRecord::CreateNode {
551                id: NodeId::new(1),
552                labels: vec!["Person".to_string()],
553            })
554            .unwrap();
555            wal.log(&WalRecord::TransactionCommit {
556                transaction_id: TransactionId::new(1),
557            })
558            .unwrap();
559            wal.sync().unwrap();
560        }
561
562        // Find the WAL file and append a truncated record (length prefix only, no data)
563        let wal_files: Vec<_> = std::fs::read_dir(dir.path())
564            .unwrap()
565            .filter_map(|e| {
566                let e = e.ok()?;
567                if e.path().extension().is_some_and(|ext| ext == "log") {
568                    Some(e.path())
569                } else {
570                    None
571                }
572            })
573            .collect();
574        assert!(!wal_files.is_empty());
575
576        // Append a partial record: just a length prefix, then truncate
577        use std::io::Write;
578        let mut f = std::fs::OpenOptions::new()
579            .append(true)
580            .open(&wal_files[0])
581            .unwrap();
582        f.write_all(&100u32.to_le_bytes()).unwrap(); // length=100 but no data follows
583
584        // Recovery should still return the committed records (best-effort)
585        let recovery = WalRecovery::new(dir.path());
586        let records = recovery.recover().unwrap();
587        assert_eq!(
588            records.len(),
589            2,
590            "committed records before truncation should be recovered"
591        );
592    }
593
594    #[test]
595    fn test_recovery_corrupted_checksum() {
596        let dir = tempdir().unwrap();
597
598        // Write valid records
599        {
600            let wal = WalManager::open(dir.path()).unwrap();
601            wal.log(&WalRecord::CreateNode {
602                id: NodeId::new(1),
603                labels: vec!["First".to_string()],
604            })
605            .unwrap();
606            wal.log(&WalRecord::TransactionCommit {
607                transaction_id: TransactionId::new(1),
608            })
609            .unwrap();
610            wal.sync().unwrap();
611        }
612
613        // Find the WAL file and corrupt a byte in the data section
614        let wal_files: Vec<_> = std::fs::read_dir(dir.path())
615            .unwrap()
616            .filter_map(|e| {
617                let e = e.ok()?;
618                if e.path().extension().is_some_and(|ext| ext == "log") {
619                    Some(e.path())
620                } else {
621                    None
622                }
623            })
624            .collect();
625        assert!(!wal_files.is_empty());
626
627        let mut data = std::fs::read(&wal_files[0]).unwrap();
628        // Flip a byte in the middle of the data (after the 4-byte length prefix)
629        if data.len() > 8 {
630            data[6] ^= 0xFF;
631        }
632        std::fs::write(&wal_files[0], &data).unwrap();
633
634        // Recovery should handle corruption gracefully (not panic)
635        let recovery = WalRecovery::new(dir.path());
636        let result = recovery.recover();
637        // Should either succeed with fewer records or return an error
638        match result {
639            Ok(records) => {
640                // Best-effort: may have recovered 0 records due to corruption
641                assert!(records.len() <= 2);
642            }
643            Err(_) => {
644                // Also acceptable: corruption detection as error
645            }
646        }
647    }
648
649    #[test]
650    fn test_recovery_empty_wal_file() {
651        let dir = tempdir().unwrap();
652
653        // Create an empty WAL file
654        std::fs::write(dir.path().join("wal_00000000.log"), []).unwrap();
655
656        let recovery = WalRecovery::new(dir.path());
657        let records = recovery.recover().unwrap();
658        assert_eq!(records.len(), 0, "empty WAL should produce no records");
659    }
660}
661
662/// Crash injection tests for WAL recovery.
663///
664/// These tests verify that WAL recovery produces a consistent state after
665/// simulated crashes at every crash point in the write path. The three crash
666/// points are:
667/// - `wal_before_write`: before writing length prefix + data + checksum
668/// - `wal_after_write`: after writing data but before durability handling
669/// - `wal_before_flush`: before fsync on TransactionCommit in Sync mode
670///
671/// Run with:
672/// ```bash
673/// cargo test -p grafeo-adapters --features "wal,testing-crash-injection" -- crash
674/// ```
675#[cfg(all(test, feature = "testing-crash-injection"))]
676mod crash_tests {
677    use super::*;
678    use grafeo_common::types::{EpochId, NodeId, TransactionId, Value};
679    use grafeo_core::testing::crash::{CrashResult, with_crash_at};
680    use tempfile::tempdir;
681
682    /// Helper: Sync durability config so all three crash points are reachable.
683    fn sync_config() -> super::super::WalConfig {
684        super::super::WalConfig {
685            durability: super::super::DurabilityMode::Sync,
686            ..Default::default()
687        }
688    }
689
690    /// Crash at `wal_before_write`: no record bytes reach disk.
691    /// Recovery should only return previously committed data.
692    #[test]
693    fn test_crash_before_write_discards_record() {
694        let dir = tempdir().unwrap();
695        let path = dir.path().to_path_buf();
696
697        // Seed one committed transaction
698        {
699            let wal = WalManager::with_config(&path, sync_config()).unwrap();
700            wal.log(&WalRecord::CreateNode {
701                id: NodeId::new(1),
702                labels: vec!["Committed".into()],
703            })
704            .unwrap();
705            wal.log(&WalRecord::TransactionCommit {
706                transaction_id: TransactionId::new(1),
707            })
708            .unwrap();
709        }
710
711        // Crash at the first crash point (wal_before_write)
712        let p = path.clone();
713        let result = with_crash_at(1, move || {
714            let wal = WalManager::with_config(&p, sync_config()).unwrap();
715            wal.log(&WalRecord::CreateNode {
716                id: NodeId::new(2),
717                labels: vec!["Lost".into()],
718            })
719            .unwrap();
720        });
721        assert!(matches!(result, CrashResult::Crashed));
722
723        // Only the first committed tx should survive
724        let recovery = WalRecovery::new(&path);
725        let records = recovery.recover().unwrap();
726        assert_eq!(records.len(), 2, "CreateNode(1) + TransactionCommit(1)");
727    }
728
729    /// Crash at `wal_after_write`: data may be in BufWriter but no commit
730    /// marker. Recovery should discard the uncommitted record.
731    #[test]
732    fn test_crash_after_write_uncommitted_discarded() {
733        let dir = tempdir().unwrap();
734        let path = dir.path().to_path_buf();
735
736        // For a non-commit record the crash points are:
737        //   1 = wal_before_write, 2 = wal_after_write
738        let p = path.clone();
739        let result = with_crash_at(2, move || {
740            let wal = WalManager::with_config(&p, sync_config()).unwrap();
741            wal.log(&WalRecord::CreateNode {
742                id: NodeId::new(1),
743                labels: vec!["Partial".into()],
744            })
745            .unwrap();
746        });
747        assert!(matches!(result, CrashResult::Crashed));
748
749        // No committed tx ⇒ recovery returns nothing
750        let recovery = WalRecovery::new(&path);
751        let records = recovery.recover().unwrap();
752        assert_eq!(records.len(), 0, "Uncommitted records must be discarded");
753    }
754
755    /// Two committed transactions, then crash during the third.
756    /// Recovery should preserve exactly the first two.
757    #[test]
758    fn test_crash_preserves_prior_committed_transactions() {
759        let dir = tempdir().unwrap();
760        let path = dir.path().to_path_buf();
761
762        // Commit two transactions
763        {
764            let wal = WalManager::with_config(&path, sync_config()).unwrap();
765            wal.log(&WalRecord::CreateNode {
766                id: NodeId::new(1),
767                labels: vec!["T1".into()],
768            })
769            .unwrap();
770            wal.log(&WalRecord::TransactionCommit {
771                transaction_id: TransactionId::new(1),
772            })
773            .unwrap();
774            wal.log(&WalRecord::CreateNode {
775                id: NodeId::new(2),
776                labels: vec!["T2".into()],
777            })
778            .unwrap();
779            wal.log(&WalRecord::TransactionCommit {
780                transaction_id: TransactionId::new(2),
781            })
782            .unwrap();
783        }
784
785        // Third transaction crashes immediately
786        let p = path.clone();
787        let result = with_crash_at(1, move || {
788            let wal = WalManager::with_config(&p, sync_config()).unwrap();
789            wal.log(&WalRecord::CreateNode {
790                id: NodeId::new(3),
791                labels: vec!["T3".into()],
792            })
793            .unwrap();
794        });
795        assert!(matches!(result, CrashResult::Crashed));
796
797        // Both committed txs intact, third discarded
798        let recovery = WalRecovery::new(&path);
799        let records = recovery.recover().unwrap();
800        assert_eq!(records.len(), 4, "2 CreateNode + 2 TransactionCommit");
801    }
802
803    /// Crash during checkpoint: committed data must still be recoverable.
804    #[test]
805    fn test_crash_during_checkpoint_preserves_data() {
806        for crash_at in 1..15 {
807            let dir = tempdir().unwrap();
808            let path = dir.path().to_path_buf();
809
810            // Seed committed data
811            {
812                let wal = WalManager::with_config(&path, sync_config()).unwrap();
813                wal.log(&WalRecord::CreateNode {
814                    id: NodeId::new(1),
815                    labels: vec!["A".into()],
816                })
817                .unwrap();
818                wal.log(&WalRecord::TransactionCommit {
819                    transaction_id: TransactionId::new(1),
820                })
821                .unwrap();
822            }
823
824            // Crash during checkpoint
825            let p = path.clone();
826            let _result = with_crash_at(crash_at, move || {
827                let wal = WalManager::with_config(&p, sync_config()).unwrap();
828                wal.checkpoint(TransactionId::new(1), EpochId::new(10))
829                    .unwrap();
830            });
831
832            // Committed data must survive regardless of checkpoint outcome
833            let recovery = WalRecovery::new(&path);
834            let records = recovery.recover().unwrap();
835            assert!(
836                !records.is_empty(),
837                "crash_at={crash_at}: committed data must survive checkpoint crash"
838            );
839        }
840    }
841
842    /// Crash with rotated log files: recovery should span all files.
843    #[test]
844    fn test_crash_with_log_rotation() {
845        let dir = tempdir().unwrap();
846        let path = dir.path().to_path_buf();
847
848        // Write enough to trigger rotation
849        {
850            let config = super::super::WalConfig {
851                durability: super::super::DurabilityMode::Sync,
852                max_log_size: 100, // force rotation
853                ..Default::default()
854            };
855            let wal = WalManager::with_config(&path, config).unwrap();
856            for i in 0..5 {
857                wal.log(&WalRecord::CreateNode {
858                    id: NodeId::new(i),
859                    labels: vec!["Rotated".into()],
860                })
861                .unwrap();
862            }
863            wal.log(&WalRecord::TransactionCommit {
864                transaction_id: TransactionId::new(1),
865            })
866            .unwrap();
867        }
868
869        // Crash during additional write
870        let p = path.clone();
871        let result = with_crash_at(1, move || {
872            let config = super::super::WalConfig {
873                durability: super::super::DurabilityMode::Sync,
874                max_log_size: 100,
875                ..Default::default()
876            };
877            let wal = WalManager::with_config(&p, config).unwrap();
878            wal.log(&WalRecord::CreateNode {
879                id: NodeId::new(99),
880                labels: vec!["Crash".into()],
881            })
882            .unwrap();
883        });
884        assert!(matches!(result, CrashResult::Crashed));
885
886        // All committed data across rotated files should survive
887        let recovery = WalRecovery::new(&path);
888        let records = recovery.recover().unwrap();
889        assert_eq!(records.len(), 6, "5 CreateNode + 1 TransactionCommit");
890    }
891
892    /// Exhaustive sweep: crash at every possible point during a multi-record
893    /// transaction and verify recovery invariants.
894    ///
895    /// Invariants checked:
896    /// 1. Previously committed transactions always survive
897    /// 2. Recovery output never contains partial (uncommitted) transactions
898    #[test]
899    fn test_crash_sweep_all_points() {
900        for crash_at in 1..20 {
901            let dir = tempdir().unwrap();
902            let path = dir.path().to_path_buf();
903
904            // Seed one committed transaction
905            {
906                let wal = WalManager::with_config(&path, sync_config()).unwrap();
907                wal.log(&WalRecord::CreateNode {
908                    id: NodeId::new(1),
909                    labels: vec!["Base".into()],
910                })
911                .unwrap();
912                wal.log(&WalRecord::TransactionCommit {
913                    transaction_id: TransactionId::new(1),
914                })
915                .unwrap();
916            }
917
918            // Attempt a second transaction with crash injection
919            let p = path.clone();
920            let result = with_crash_at(crash_at, move || {
921                let wal = WalManager::with_config(&p, sync_config()).unwrap();
922                wal.log(&WalRecord::CreateNode {
923                    id: NodeId::new(100),
924                    labels: vec!["New".into()],
925                })
926                .unwrap();
927                wal.log(&WalRecord::SetNodeProperty {
928                    id: NodeId::new(100),
929                    key: "name".into(),
930                    value: Value::String("test".into()),
931                })
932                .unwrap();
933                wal.log(&WalRecord::TransactionCommit {
934                    transaction_id: TransactionId::new(2),
935                })
936                .unwrap();
937            });
938
939            // Verify recovery invariants
940            let recovery = WalRecovery::new(&path);
941            let records = recovery.recover().unwrap();
942
943            // Invariant 1: base committed tx always survives
944            assert!(
945                records.len() >= 2,
946                "crash_at={crash_at}: base tx must survive, got {} records",
947                records.len()
948            );
949
950            // Invariant 2: no partial transactions in output
951            let mut pending = 0usize;
952            for record in &records {
953                match record {
954                    WalRecord::TransactionCommit { .. }
955                    | WalRecord::TransactionAbort { .. }
956                    | WalRecord::Checkpoint { .. } => pending = 0,
957                    _ => pending += 1,
958                }
959            }
960            assert_eq!(
961                pending, 0,
962                "crash_at={crash_at}: recovery must not output partial transactions"
963            );
964
965            // If the operation completed, the second tx should also be present
966            if matches!(result, CrashResult::Completed(())) {
967                assert!(
968                    records.len() >= 5,
969                    "crash_at={crash_at}: completed run should include second tx"
970                );
971            }
972        }
973    }
974
975    /// Aborted transactions are not recovered even without a crash.
976    /// Verifies that TransactionAbort correctly discards pending records.
977    #[test]
978    fn test_abort_then_crash_discards_aborted_tx() {
979        let dir = tempdir().unwrap();
980        let path = dir.path().to_path_buf();
981
982        {
983            let wal = WalManager::with_config(&path, sync_config()).unwrap();
984            // Committed tx
985            wal.log(&WalRecord::CreateNode {
986                id: NodeId::new(1),
987                labels: vec!["Keep".into()],
988            })
989            .unwrap();
990            wal.log(&WalRecord::TransactionCommit {
991                transaction_id: TransactionId::new(1),
992            })
993            .unwrap();
994            // Aborted tx
995            wal.log(&WalRecord::CreateNode {
996                id: NodeId::new(2),
997                labels: vec!["Discard".into()],
998            })
999            .unwrap();
1000            wal.log(&WalRecord::TransactionAbort {
1001                transaction_id: TransactionId::new(2),
1002            })
1003            .unwrap();
1004        }
1005
1006        // Crash during a third transaction
1007        let p = path.clone();
1008        let result = with_crash_at(1, move || {
1009            let wal = WalManager::with_config(&p, sync_config()).unwrap();
1010            wal.log(&WalRecord::CreateNode {
1011                id: NodeId::new(3),
1012                labels: vec!["Also lost".into()],
1013            })
1014            .unwrap();
1015        });
1016        assert!(matches!(result, CrashResult::Crashed));
1017
1018        let recovery = WalRecovery::new(&path);
1019        let records = recovery.recover().unwrap();
1020        // Only the committed tx (2 records)
1021        assert_eq!(
1022            records.len(),
1023            2,
1024            "Aborted + crashed records should both be discarded"
1025        );
1026    }
1027}