1use super::record::WalEntry;
4use super::{CheckpointMetadata, WalManager, WalRecord};
5use grafeo_common::utils::error::{Error, Result, StorageError};
6use std::fs::File;
7use std::io::{BufReader, Read};
8use std::path::Path;
9
/// File name, relative to the WAL directory, from which checkpoint metadata is read.
const CHECKPOINT_METADATA_FILE: &str = "checkpoint.meta";
12
/// Reads write-ahead-log files from a directory and returns the records that
/// belong to committed transactions.
pub struct WalRecovery {
    /// Directory scanned for `*.log` files and for the checkpoint metadata file.
    dir: std::path::PathBuf,
}
18
19impl WalRecovery {
20 pub fn new(dir: impl AsRef<Path>) -> Self {
22 Self {
23 dir: dir.as_ref().to_path_buf(),
24 }
25 }
26
27 #[must_use]
29 pub fn from_wal(wal: &WalManager) -> Self {
30 Self {
31 dir: wal.dir().to_path_buf(),
32 }
33 }
34
35 pub fn read_checkpoint_metadata(&self) -> Result<Option<CheckpointMetadata>> {
39 let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
40
41 if !metadata_path.exists() {
42 return Ok(None);
43 }
44
45 let file = File::open(&metadata_path)?;
46 let mut reader = BufReader::new(file);
47 let mut data = Vec::new();
48 reader.read_to_end(&mut data)?;
49
50 let (metadata, _): (CheckpointMetadata, _) =
51 bincode::serde::decode_from_slice(&data, bincode::config::standard())
52 .map_err(|e| Error::Serialization(e.to_string()))?;
53
54 Ok(Some(metadata))
55 }
56
57 #[must_use]
62 pub fn checkpoint(&self) -> Option<CheckpointMetadata> {
63 self.read_checkpoint_metadata().ok().flatten()
64 }
65
66 pub fn recover(&self) -> Result<Vec<WalRecord>> {
76 self.recover_as::<WalRecord>()
77 }
78
79 pub fn recover_as<R: WalEntry>(&self) -> Result<Vec<R>> {
88 let checkpoint = self.read_checkpoint_metadata()?;
89 self.recover_internal_as::<R>(checkpoint)
90 }
91
92 pub fn recover_from_checkpoint(
101 &self,
102 checkpoint: Option<&CheckpointMetadata>,
103 ) -> Result<Vec<WalRecord>> {
104 self.recover_internal_as::<WalRecord>(checkpoint.cloned())
105 }
106
107 fn recover_internal_as<R: WalEntry>(
108 &self,
109 checkpoint: Option<CheckpointMetadata>,
110 ) -> Result<Vec<R>> {
111 let mut current_tx_records = Vec::new();
112 let mut committed_records = Vec::new();
113
114 let log_files = self.get_log_files()?;
116
117 let min_sequence = checkpoint.as_ref().map_or(0, |cp| cp.log_sequence);
119
120 if checkpoint.is_some() {
121 tracing::info!(
122 "Recovering from checkpoint at epoch {:?}, starting from log sequence {}",
123 checkpoint.as_ref().map(|c| c.epoch),
124 min_sequence
125 );
126 }
127
128 for log_file in log_files {
130 let sequence = Self::sequence_from_path(&log_file).unwrap_or(0);
132
133 if sequence < min_sequence {
137 tracing::debug!(
138 "Skipping log file {:?} (sequence {} < checkpoint {})",
139 log_file,
140 sequence,
141 min_sequence
142 );
143 continue;
144 }
145
146 let file = match File::open(&log_file) {
147 Ok(f) => f,
148 Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
149 Err(e) => return Err(e.into()),
150 };
151 let mut reader = BufReader::new(file);
152
153 loop {
155 match self.read_record_as::<R>(&mut reader) {
156 Ok(Some(record)) => {
157 if record.is_commit() {
158 committed_records.append(&mut current_tx_records);
159 committed_records.push(record);
160 } else if record.is_abort() {
161 current_tx_records.clear();
162 } else if record.is_checkpoint() {
163 current_tx_records.clear();
164 committed_records.push(record);
165 } else {
166 current_tx_records.push(record);
167 }
168 }
169 Ok(None) => break, Err(e) => {
171 tracing::warn!("WAL corruption detected in {:?}: {}", log_file, e);
174 break;
175 }
176 }
177 }
178 }
179
180 Ok(committed_records)
183 }
184
185 fn sequence_from_path(path: &Path) -> Option<u64> {
187 path.file_stem()
188 .and_then(|s| s.to_str())
189 .and_then(|s| s.strip_prefix("wal_"))
190 .and_then(|s| s.parse().ok())
191 }
192
193 pub fn recover_file(&self, path: impl AsRef<Path>) -> Result<Vec<WalRecord>> {
199 self.recover_file_as::<WalRecord>(path)
200 }
201
202 pub fn recover_file_as<R: WalEntry>(&self, path: impl AsRef<Path>) -> Result<Vec<R>> {
208 let file = File::open(path.as_ref())?;
209 let mut reader = BufReader::new(file);
210
211 let mut current_tx_records = Vec::new();
212 let mut committed_records = Vec::new();
213
214 loop {
215 match self.read_record_as::<R>(&mut reader) {
216 Ok(Some(record)) => {
217 if record.is_commit() {
218 committed_records.append(&mut current_tx_records);
219 committed_records.push(record);
220 } else if record.is_abort() {
221 current_tx_records.clear();
222 } else {
223 current_tx_records.push(record);
224 }
225 }
226 Ok(None) => break,
227 Err(e) => {
228 tracing::warn!("WAL corruption detected: {}", e);
229 break;
230 }
231 }
232 }
233
234 Ok(committed_records)
235 }
236
237 fn get_log_files(&self) -> Result<Vec<std::path::PathBuf>> {
238 let mut files = Vec::new();
239
240 if !self.dir.exists() {
241 return Ok(files);
242 }
243
244 if let Ok(entries) = std::fs::read_dir(&self.dir) {
245 for entry in entries.flatten() {
246 let path = entry.path();
247 if path.extension().is_some_and(|ext| ext == "log") {
248 files.push(path);
249 }
250 }
251 }
252
253 files.sort();
255
256 Ok(files)
257 }
258
259 fn read_record_as<R: WalEntry>(&self, reader: &mut BufReader<File>) -> Result<Option<R>> {
260 let mut len_buf = [0u8; 4];
262 match reader.read_exact(&mut len_buf) {
263 Ok(()) => {}
264 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
265 Err(e) => return Err(e.into()),
266 }
267 let len = u32::from_le_bytes(len_buf) as usize;
268
269 let mut data = vec![0u8; len];
271 reader.read_exact(&mut data)?;
272
273 let mut checksum_buf = [0u8; 4];
275 reader.read_exact(&mut checksum_buf)?;
276 let stored_checksum = u32::from_le_bytes(checksum_buf);
277 let computed_checksum = crc32fast::hash(&data);
278
279 if stored_checksum != computed_checksum {
280 return Err(Error::Storage(StorageError::Corruption(
281 "WAL checksum mismatch".to_string(),
282 )));
283 }
284
285 let (record, _): (R, _) =
287 bincode::serde::decode_from_slice(&data, bincode::config::standard())
288 .map_err(|e| Error::Serialization(e.to_string()))?;
289
290 Ok(Some(record))
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::{NodeId, TransactionId};
    use tempfile::tempdir;

    /// Appends a `CreateNode` record carrying a single label.
    fn log_node(wal: &WalManager, id: NodeId, label: &str) {
        wal.log(&WalRecord::CreateNode {
            id,
            labels: vec![label.to_string()],
        })
        .unwrap();
    }

    /// Appends a `TransactionCommit` record.
    fn log_commit(wal: &WalManager, transaction_id: TransactionId) {
        wal.log(&WalRecord::TransactionCommit { transaction_id })
            .unwrap();
    }

    /// Collects every `*.log` file directly inside `dir`.
    fn log_files_in(dir: &std::path::Path) -> Vec<std::path::PathBuf> {
        std::fs::read_dir(dir)
            .unwrap()
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| path.extension().is_some_and(|ext| ext == "log"))
            .collect()
    }

    #[test]
    fn test_recovery_committed() {
        let tmp = tempdir().unwrap();

        {
            let wal = WalManager::open(tmp.path()).unwrap();
            log_node(&wal, NodeId::new(1), "Person");
            log_commit(&wal, TransactionId::new(1));
            wal.sync().unwrap();
        }

        let recovered = WalRecovery::new(tmp.path()).recover().unwrap();

        assert_eq!(recovered.len(), 2);
    }

    #[test]
    fn test_recovery_uncommitted() {
        let tmp = tempdir().unwrap();

        {
            let wal = WalManager::open(tmp.path()).unwrap();
            // No commit follows, so this record must not be recovered.
            log_node(&wal, NodeId::new(1), "Person");
            wal.sync().unwrap();
        }

        let recovered = WalRecovery::new(tmp.path()).recover().unwrap();

        assert_eq!(recovered.len(), 0);
    }

    #[test]
    fn test_recovery_multiple_files() {
        let tmp = tempdir().unwrap();

        {
            let small_logs = super::super::WalConfig {
                max_log_size: 100,
                ..Default::default()
            };
            let wal = WalManager::with_config(tmp.path(), small_logs).unwrap();

            for i in 0..5 {
                log_node(&wal, NodeId::new(i), "Test");
            }
            log_commit(&wal, TransactionId::new(1));

            for i in 5..10 {
                log_node(&wal, NodeId::new(i), "Test");
            }
            log_commit(&wal, TransactionId::new(2));

            wal.sync().unwrap();
        }

        let recovered = WalRecovery::new(tmp.path()).recover().unwrap();

        // 10 CreateNode records plus 2 commits.
        assert_eq!(recovered.len(), 12);
    }

    #[test]
    fn test_checkpoint_metadata() {
        use grafeo_common::types::EpochId;

        let tmp = tempdir().unwrap();

        {
            let wal = WalManager::open(tmp.path()).unwrap();

            log_node(&wal, NodeId::new(1), "Test");
            log_commit(&wal, TransactionId::new(1));

            wal.checkpoint(TransactionId::new(1), EpochId::new(10))
                .unwrap();

            log_node(&wal, NodeId::new(2), "Test");
            log_commit(&wal, TransactionId::new(2));

            wal.sync().unwrap();
        }

        let maybe_checkpoint = WalRecovery::new(tmp.path()).checkpoint();
        assert!(
            maybe_checkpoint.is_some(),
            "Checkpoint metadata should exist"
        );

        let meta = maybe_checkpoint.unwrap();
        assert_eq!(meta.epoch.as_u64(), 10);
        assert_eq!(meta.transaction_id.as_u64(), 1);
    }

    #[test]
    fn test_recovery_from_checkpoint() {
        use super::super::WalConfig;
        use grafeo_common::types::EpochId;

        let tmp = tempdir().unwrap();

        {
            let small_logs = WalConfig {
                max_log_size: 100,
                ..Default::default()
            };
            let wal = WalManager::with_config(tmp.path(), small_logs).unwrap();

            for i in 0..5 {
                log_node(&wal, NodeId::new(i), "Before");
            }
            log_commit(&wal, TransactionId::new(1));

            wal.checkpoint(TransactionId::new(1), EpochId::new(100))
                .unwrap();

            for i in 100..103 {
                log_node(&wal, NodeId::new(i), "After");
            }
            log_commit(&wal, TransactionId::new(2));

            wal.sync().unwrap();
        }

        let recovered = WalRecovery::new(tmp.path()).recover().unwrap();

        assert!(!recovered.is_empty(), "Should recover some records");
    }

    #[test]
    fn test_recover_as_generic() {
        let tmp = tempdir().unwrap();

        {
            let wal = WalManager::open(tmp.path()).unwrap();
            log_node(&wal, NodeId::new(1), "Person");
            log_commit(&wal, TransactionId::new(1));
            wal.sync().unwrap();
        }

        let recovered = WalRecovery::new(tmp.path())
            .recover_as::<WalRecord>()
            .unwrap();

        assert_eq!(recovered.len(), 2);

        assert!(!recovered[0].is_commit());
        assert!(recovered[1].is_commit());
    }

    #[test]
    fn test_recovery_truncated_wal_mid_record() {
        use std::io::Write;

        let tmp = tempdir().unwrap();

        {
            let wal = WalManager::open(tmp.path()).unwrap();
            log_node(&wal, NodeId::new(1), "Person");
            log_commit(&wal, TransactionId::new(1));
            wal.sync().unwrap();
        }

        let logs = log_files_in(tmp.path());
        assert!(!logs.is_empty());

        // Append a length prefix that promises 100 bytes which never follow.
        let mut tail = std::fs::OpenOptions::new()
            .append(true)
            .open(&logs[0])
            .unwrap();
        tail.write_all(&100u32.to_le_bytes()).unwrap();

        let recovered = WalRecovery::new(tmp.path()).recover().unwrap();
        assert_eq!(
            recovered.len(),
            2,
            "committed records before truncation should be recovered"
        );
    }

    #[test]
    fn test_recovery_corrupted_checksum() {
        let tmp = tempdir().unwrap();

        {
            let wal = WalManager::open(tmp.path()).unwrap();
            log_node(&wal, NodeId::new(1), "First");
            log_commit(&wal, TransactionId::new(1));
            wal.sync().unwrap();
        }

        let logs = log_files_in(tmp.path());
        assert!(!logs.is_empty());

        // Flip one byte near the start of the file, if the file is long enough.
        let mut bytes = std::fs::read(&logs[0]).unwrap();
        if bytes.len() > 8 {
            bytes[6] ^= 0xFF;
        }
        std::fs::write(&logs[0], &bytes).unwrap();

        // Either outcome is acceptable; recovery just must not yield extra records.
        if let Ok(recovered) = WalRecovery::new(tmp.path()).recover() {
            assert!(recovered.len() <= 2);
        }
    }

    #[test]
    fn test_recovery_empty_wal_file() {
        let tmp = tempdir().unwrap();

        std::fs::write(tmp.path().join("wal_00000000.log"), b"").unwrap();

        let recovered = WalRecovery::new(tmp.path()).recover().unwrap();
        assert_eq!(recovered.len(), 0, "empty WAL should produce no records");
    }
}
661
#[cfg(all(test, feature = "testing-crash-injection"))]
mod crash_tests {
    use super::*;
    use grafeo_common::types::{EpochId, NodeId, TransactionId, Value};
    use grafeo_core::testing::crash::{CrashResult, with_crash_at};
    use tempfile::tempdir;

    /// WAL configuration with `DurabilityMode::Sync` and defaults otherwise.
    fn sync_config() -> super::super::WalConfig {
        super::super::WalConfig {
            durability: super::super::DurabilityMode::Sync,
            ..Default::default()
        }
    }

    /// Same as [`sync_config`] but with `max_log_size` set to 100.
    fn rotating_sync_config() -> super::super::WalConfig {
        super::super::WalConfig {
            max_log_size: 100,
            ..sync_config()
        }
    }

    /// Appends a `CreateNode` record carrying a single label.
    fn log_node(wal: &WalManager, id: NodeId, label: &str) {
        wal.log(&WalRecord::CreateNode {
            id,
            labels: vec![label.into()],
        })
        .unwrap();
    }

    /// Appends a `TransactionCommit` record.
    fn log_commit(wal: &WalManager, transaction_id: TransactionId) {
        wal.log(&WalRecord::TransactionCommit { transaction_id })
            .unwrap();
    }

    #[test]
    fn test_crash_before_write_discards_record() {
        let tmp = tempdir().unwrap();
        let wal_dir = tmp.path().to_path_buf();

        {
            let wal = WalManager::with_config(&wal_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(1), "Committed");
            log_commit(&wal, TransactionId::new(1));
        }

        let crash_dir = wal_dir.clone();
        let outcome = with_crash_at(1, move || {
            let wal = WalManager::with_config(&crash_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(2), "Lost");
        });
        assert!(matches!(outcome, CrashResult::Crashed));

        let recovered = WalRecovery::new(&wal_dir).recover().unwrap();
        assert_eq!(recovered.len(), 2, "CreateNode(1) + TransactionCommit(1)");
    }

    #[test]
    fn test_crash_after_write_uncommitted_discarded() {
        let tmp = tempdir().unwrap();
        let wal_dir = tmp.path().to_path_buf();

        let crash_dir = wal_dir.clone();
        let outcome = with_crash_at(2, move || {
            let wal = WalManager::with_config(&crash_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(1), "Partial");
        });
        assert!(matches!(outcome, CrashResult::Crashed));

        let recovered = WalRecovery::new(&wal_dir).recover().unwrap();
        assert_eq!(recovered.len(), 0, "Uncommitted records must be discarded");
    }

    #[test]
    fn test_crash_preserves_prior_committed_transactions() {
        let tmp = tempdir().unwrap();
        let wal_dir = tmp.path().to_path_buf();

        {
            let wal = WalManager::with_config(&wal_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(1), "T1");
            log_commit(&wal, TransactionId::new(1));
            log_node(&wal, NodeId::new(2), "T2");
            log_commit(&wal, TransactionId::new(2));
        }

        let crash_dir = wal_dir.clone();
        let outcome = with_crash_at(1, move || {
            let wal = WalManager::with_config(&crash_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(3), "T3");
        });
        assert!(matches!(outcome, CrashResult::Crashed));

        let recovered = WalRecovery::new(&wal_dir).recover().unwrap();
        assert_eq!(recovered.len(), 4, "2 CreateNode + 2 TransactionCommit");
    }

    #[test]
    fn test_crash_during_checkpoint_preserves_data() {
        for crash_at in 1..15 {
            let tmp = tempdir().unwrap();
            let wal_dir = tmp.path().to_path_buf();

            {
                let wal = WalManager::with_config(&wal_dir, sync_config()).unwrap();
                log_node(&wal, NodeId::new(1), "A");
                log_commit(&wal, TransactionId::new(1));
            }

            // Whether the checkpoint crashed or completed is irrelevant here.
            let crash_dir = wal_dir.clone();
            let _outcome = with_crash_at(crash_at, move || {
                let wal = WalManager::with_config(&crash_dir, sync_config()).unwrap();
                wal.checkpoint(TransactionId::new(1), EpochId::new(10))
                    .unwrap();
            });

            let recovered = WalRecovery::new(&wal_dir).recover().unwrap();
            assert!(
                !recovered.is_empty(),
                "crash_at={crash_at}: committed data must survive checkpoint crash"
            );
        }
    }

    #[test]
    fn test_crash_with_log_rotation() {
        let tmp = tempdir().unwrap();
        let wal_dir = tmp.path().to_path_buf();

        {
            let wal = WalManager::with_config(&wal_dir, rotating_sync_config()).unwrap();
            for i in 0..5 {
                log_node(&wal, NodeId::new(i), "Rotated");
            }
            log_commit(&wal, TransactionId::new(1));
        }

        let crash_dir = wal_dir.clone();
        let outcome = with_crash_at(1, move || {
            let wal = WalManager::with_config(&crash_dir, rotating_sync_config()).unwrap();
            log_node(&wal, NodeId::new(99), "Crash");
        });
        assert!(matches!(outcome, CrashResult::Crashed));

        let recovered = WalRecovery::new(&wal_dir).recover().unwrap();
        assert_eq!(recovered.len(), 6, "5 CreateNode + 1 TransactionCommit");
    }

    #[test]
    fn test_crash_sweep_all_points() {
        for crash_at in 1..20 {
            let tmp = tempdir().unwrap();
            let wal_dir = tmp.path().to_path_buf();

            {
                let wal = WalManager::with_config(&wal_dir, sync_config()).unwrap();
                log_node(&wal, NodeId::new(1), "Base");
                log_commit(&wal, TransactionId::new(1));
            }

            let crash_dir = wal_dir.clone();
            let outcome = with_crash_at(crash_at, move || {
                let wal = WalManager::with_config(&crash_dir, sync_config()).unwrap();
                log_node(&wal, NodeId::new(100), "New");
                wal.log(&WalRecord::SetNodeProperty {
                    id: NodeId::new(100),
                    key: "name".into(),
                    value: Value::String("test".into()),
                })
                .unwrap();
                log_commit(&wal, TransactionId::new(2));
            });

            let recovered = WalRecovery::new(&wal_dir).recover().unwrap();

            assert!(
                recovered.len() >= 2,
                "crash_at={crash_at}: base tx must survive, got {} records",
                recovered.len()
            );

            // Count the records that trail the last commit/abort/checkpoint.
            let pending = recovered.iter().fold(0usize, |open, record| match record {
                WalRecord::TransactionCommit { .. }
                | WalRecord::TransactionAbort { .. }
                | WalRecord::Checkpoint { .. } => 0,
                _ => open + 1,
            });
            assert_eq!(
                pending, 0,
                "crash_at={crash_at}: recovery must not output partial transactions"
            );

            if matches!(outcome, CrashResult::Completed(())) {
                assert!(
                    recovered.len() >= 5,
                    "crash_at={crash_at}: completed run should include second tx"
                );
            }
        }
    }

    #[test]
    fn test_abort_then_crash_discards_aborted_tx() {
        let tmp = tempdir().unwrap();
        let wal_dir = tmp.path().to_path_buf();

        {
            let wal = WalManager::with_config(&wal_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(1), "Keep");
            log_commit(&wal, TransactionId::new(1));
            log_node(&wal, NodeId::new(2), "Discard");
            wal.log(&WalRecord::TransactionAbort {
                transaction_id: TransactionId::new(2),
            })
            .unwrap();
        }

        let crash_dir = wal_dir.clone();
        let outcome = with_crash_at(1, move || {
            let wal = WalManager::with_config(&crash_dir, sync_config()).unwrap();
            log_node(&wal, NodeId::new(3), "Also lost");
        });
        assert!(matches!(outcome, CrashResult::Crashed));

        let recovered = WalRecovery::new(&wal_dir).recover().unwrap();
        assert_eq!(
            recovered.len(),
            2,
            "Aborted + crashed records should both be discarded"
        );
    }
}