1use crate::error::{RaftError, RaftResult};
44use crate::log::{Command, LogEntry};
45use crate::types::LogIndex;
46
47use std::fs::{self, File, OpenOptions};
48use std::io::{Read, Write};
49use std::path::{Path, PathBuf};
50
/// Magic number opening every v2 segment header: `0x57414C32`, i.e. the ASCII bytes
/// `WAL2` read big-endian (headers store it little-endian, so the on-disk bytes are `2LAW`).
const WAL_MAGIC: u32 = u32::from_be_bytes(*b"WAL2");

/// Magic number of legacy v1 segments (`0x57414C31`, ASCII `WAL1` read big-endian).
const WAL_MAGIC_V1: u32 = u32::from_be_bytes(*b"WAL1");

/// Record format version written into new segment headers.
const WAL_VERSION: u32 = 2;

/// Legacy record format version (records carry no fencing token).
const WAL_VERSION_V1: u32 = 1;

/// Size of a segment header: magic + version + segment id, each a `u32`.
const SEGMENT_HEADER_SIZE: usize = 4 + 4 + 4;

/// Rotation threshold used when the caller passes `0` (64 MiB).
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 64 << 20;
72
/// How often `WalWriter::append` forces written entries to stable storage.
///
/// Regardless of mode, `WalWriter` also syncs the outgoing segment when it rotates to a
/// new one, and callers may invoke `WalWriter::sync` explicitly at any time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncMode {
    /// Call `sync` after every append.
    EveryWrite,
    /// Call `sync` once this many appends have accumulated since the last sync
    /// (a value of `0` therefore syncs after every append).
    Batched(usize),
    /// Never sync from `append`; durability is left to the OS unless the caller
    /// syncs explicitly.
    OsManaged,
}
87
/// What `WalReader::recover_with_policy` does when a record's stored CRC does not match.
///
/// The policy only governs CRC mismatches. Structural damage (a record too short to hold
/// its CRC, or one that extends past the end of its segment) is still an error, except
/// when it is the torn tail of the last segment, which is always discarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CorruptionPolicy {
    /// Log a warning, skip the bad record, and keep decoding the records after it.
    AlertAndContinue,
    /// Log a warning and stop decoding the segment at the first bad record, keeping
    /// only the records before it.
    TruncateToLastGood,
    /// Fail recovery with a `StorageError` on the first bad record.
    RefuseStart,
}
106
/// Counters collected while recovering the WAL with `WalReader::recover_with_policy`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalDiagnostics {
    /// Records that passed the CRC check and were decoded into entries.
    pub valid_entries: u64,
    /// Records whose stored CRC did not match the computed one.
    pub corrupt_entries: u64,
    /// Times a torn tail was discarded: an incomplete final segment header, or a partial
    /// or undersized trailing record in the last segment.
    pub truncated_segments: u64,
    /// Total framed size (4-byte length prefix plus record) of the valid records;
    /// segment headers and discarded bytes are not counted.
    pub recovered_bytes: u64,
}
123
/// Fixed-size header written at the start of every segment file.
///
/// Encoded as three little-endian `u32`s (`SEGMENT_HEADER_SIZE` bytes in total).
struct SegmentHeader {
    /// `WAL_MAGIC` for v2 segments, `WAL_MAGIC_V1` for legacy v1 segments.
    magic: u32,
    /// Record format version: `WAL_VERSION` or `WAL_VERSION_V1`.
    version: u32,
    /// Id of the segment as written by `SegmentHeader::new`; decoding does not check it
    /// against the file name.
    segment_id: u32,
}
134
135impl SegmentHeader {
136 fn new(segment_id: u32) -> Self {
137 Self {
138 magic: WAL_MAGIC,
139 version: WAL_VERSION,
140 segment_id,
141 }
142 }
143
144 fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
145 let mut buf = [0u8; SEGMENT_HEADER_SIZE];
146 buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
147 buf[4..8].copy_from_slice(&self.version.to_le_bytes());
148 buf[8..12].copy_from_slice(&self.segment_id.to_le_bytes());
149 buf
150 }
151
152 fn decode(data: &[u8]) -> RaftResult<Self> {
153 if data.len() < SEGMENT_HEADER_SIZE {
154 return Err(RaftError::StorageError {
155 message: "segment header too short".to_string(),
156 });
157 }
158 let magic = u32::from_le_bytes(read_4(data, 0)?);
159 let version = u32::from_le_bytes(read_4(data, 4)?);
160 let segment_id = u32::from_le_bytes(read_4(data, 8)?);
161
162 let accepted = (magic == WAL_MAGIC && version == WAL_VERSION)
164 || (magic == WAL_MAGIC_V1 && version == WAL_VERSION_V1);
165 if !accepted {
166 return Err(RaftError::StorageError {
167 message: format!(
168 "bad WAL header: magic={magic:#010x}, version={version} \
169 (expected WAL2/v2 or WAL1/v1)"
170 ),
171 });
172 }
173 Ok(Self {
174 magic,
175 version,
176 segment_id,
177 })
178 }
179
180 fn is_v1(&self) -> bool {
182 self.version == WAL_VERSION_V1
183 }
184}
185
/// Append-only writer for a WAL split across numbered segment files in one directory.
pub struct WalWriter {
    /// Directory holding the `wal-XXXXXXXX.seg` segment files.
    dir: PathBuf,
    /// Handle to the segment currently being appended to; `None` until one is opened.
    current_segment: Option<File>,
    /// Id of the active segment (also the id used when the next segment file is created).
    current_segment_id: u32,
    /// Bytes in the active segment, including its header; compared against
    /// `max_segment_size` to decide when to rotate.
    current_segment_size: u64,
    /// Rotation threshold in bytes.
    max_segment_size: u64,
    /// When appends are fsynced.
    sync_mode: SyncMode,
    /// Appends since the last successful `sync`; consulted by `SyncMode::Batched`.
    writes_since_sync: usize,
}
205
206impl WalWriter {
207 pub fn new(dir: &Path, sync_mode: SyncMode, max_segment_size: u64) -> RaftResult<Self> {
212 fs::create_dir_all(dir).map_err(|e| RaftError::StorageError {
213 message: format!("failed to create WAL dir {}: {e}", dir.display()),
214 })?;
215
216 let max_segment_size = if max_segment_size == 0 {
217 DEFAULT_MAX_SEGMENT_SIZE
218 } else {
219 max_segment_size
220 };
221
222 let mut writer = Self {
223 dir: dir.to_path_buf(),
224 current_segment: None,
225 current_segment_id: 0,
226 current_segment_size: 0,
227 max_segment_size,
228 sync_mode,
229 writes_since_sync: 0,
230 };
231
232 let existing = list_segments(dir)?;
234 if let Some(&last_id) = existing.last() {
235 writer.current_segment_id = last_id;
236 let path = segment_path(dir, last_id);
237 let meta = fs::metadata(&path).map_err(|e| RaftError::StorageError {
238 message: format!("failed to stat segment {}: {e}", path.display()),
239 })?;
240 writer.current_segment_size = meta.len();
241 let file = OpenOptions::new().append(true).open(&path).map_err(|e| {
242 RaftError::StorageError {
243 message: format!("failed to open segment {}: {e}", path.display()),
244 }
245 })?;
246 writer.current_segment = Some(file);
247 }
248
249 Ok(writer)
250 }
251
252 pub fn append(&mut self, entry: &LogEntry) -> RaftResult<()> {
258 let encoded = encode_entry(entry);
259 let encoded_len = encoded.len() as u64;
260
261 if self.current_segment.is_some()
263 && self.current_segment_size + encoded_len > self.max_segment_size
264 && self.current_segment_size > SEGMENT_HEADER_SIZE as u64
265 {
266 self.rotate_segment()?;
267 }
268
269 if self.current_segment.is_none() {
271 self.open_new_segment()?;
272 }
273
274 let file = self
275 .current_segment
276 .as_mut()
277 .ok_or_else(|| RaftError::StorageError {
278 message: "no open segment after rotation".to_string(),
279 })?;
280
281 file.write_all(&encoded)
282 .map_err(|e| RaftError::StorageError {
283 message: format!("failed to write WAL entry: {e}"),
284 })?;
285
286 self.current_segment_size += encoded_len;
287 self.writes_since_sync += 1;
288
289 self.maybe_sync()?;
290
291 Ok(())
292 }
293
294 pub fn sync(&mut self) -> RaftResult<()> {
296 if let Some(ref file) = self.current_segment {
297 file.sync_data().map_err(|e| RaftError::StorageError {
298 message: format!("failed to fsync WAL: {e}"),
299 })?;
300 self.writes_since_sync = 0;
301 }
302 Ok(())
303 }
304
305 pub fn truncate_from(&mut self, from_index: LogIndex) -> RaftResult<()> {
311 self.current_segment = None;
313
314 let reader = WalReader::new(&self.dir);
315 let all_entries = reader.recover()?;
316 let kept: Vec<&LogEntry> = all_entries
317 .iter()
318 .filter(|e| e.index < from_index)
319 .collect();
320
321 let segments = list_segments(&self.dir)?;
323 for seg_id in &segments {
324 let path = segment_path(&self.dir, *seg_id);
325 let _ = fs::remove_file(&path);
326 }
327
328 self.current_segment_id = 0;
330 self.current_segment_size = 0;
331 self.writes_since_sync = 0;
332
333 for entry in kept {
335 self.append(entry)?;
336 }
337
338 self.sync()?;
339
340 Ok(())
341 }
342
343 fn open_new_segment(&mut self) -> RaftResult<()> {
346 let path = segment_path(&self.dir, self.current_segment_id);
347 let mut file = File::create(&path).map_err(|e| RaftError::StorageError {
348 message: format!("failed to create segment {}: {e}", path.display()),
349 })?;
350
351 let header = SegmentHeader::new(self.current_segment_id);
352 file.write_all(&header.encode())
353 .map_err(|e| RaftError::StorageError {
354 message: format!("failed to write segment header: {e}"),
355 })?;
356
357 self.current_segment_size = SEGMENT_HEADER_SIZE as u64;
358 self.current_segment = Some(file);
359 Ok(())
360 }
361
362 fn rotate_segment(&mut self) -> RaftResult<()> {
363 self.sync()?;
365 self.current_segment = None;
366 self.current_segment_id += 1;
367 self.open_new_segment()?;
368 Ok(())
369 }
370
371 fn maybe_sync(&mut self) -> RaftResult<()> {
372 match &self.sync_mode {
373 SyncMode::EveryWrite => self.sync(),
374 SyncMode::Batched(n) => {
375 if self.writes_since_sync >= *n {
376 self.sync()
377 } else {
378 Ok(())
379 }
380 }
381 SyncMode::OsManaged => Ok(()),
382 }
383 }
384}
385
/// Reads entries back from the segment files in a WAL directory.
pub struct WalReader {
    /// Directory scanned for `wal-XXXXXXXX.seg` segment files.
    dir: PathBuf,
}
397
398impl WalReader {
399 pub fn new(dir: &Path) -> Self {
401 Self {
402 dir: dir.to_path_buf(),
403 }
404 }
405
406 pub fn read_all(&self) -> RaftResult<Vec<LogEntry>> {
411 let segments = list_segments(&self.dir)?;
412 let mut all_entries = Vec::new();
413
414 for seg_id in segments {
415 let path = segment_path(&self.dir, seg_id);
416 let data = read_segment_file(&path)?;
417
418 if data.len() < SEGMENT_HEADER_SIZE {
419 return Err(RaftError::StorageError {
420 message: format!(
421 "segment {} too small ({} bytes)",
422 path.display(),
423 data.len()
424 ),
425 });
426 }
427
428 let header = SegmentHeader::decode(&data[..SEGMENT_HEADER_SIZE])?;
430
431 let entries = decode_entries(&data[SEGMENT_HEADER_SIZE..], false, header.is_v1())?;
433 all_entries.extend(entries);
434 }
435
436 Ok(all_entries)
437 }
438
439 pub fn recover(&self) -> RaftResult<Vec<LogEntry>> {
450 let (entries, _diag) = self.recover_with_policy(CorruptionPolicy::TruncateToLastGood)?;
451 Ok(entries)
452 }
453
454 pub fn recover_with_policy(
459 &self,
460 policy: CorruptionPolicy,
461 ) -> RaftResult<(Vec<LogEntry>, WalDiagnostics)> {
462 let segments = list_segments(&self.dir)?;
463 let seg_count = segments.len();
464 let mut all_entries = Vec::new();
465 let mut diag = WalDiagnostics::default();
466
467 for (i, seg_id) in segments.into_iter().enumerate() {
468 let path = segment_path(&self.dir, seg_id);
469 let data = read_segment_file(&path)?;
470
471 if data.len() < SEGMENT_HEADER_SIZE {
472 if i == seg_count - 1 {
473 diag.truncated_segments += 1;
474 tracing::warn!(
475 segment_id = seg_id,
476 bytes = data.len(),
477 "skipping incomplete final segment header"
478 );
479 break;
480 }
481 return Err(RaftError::StorageError {
482 message: format!(
483 "segment {} too small ({} bytes)",
484 path.display(),
485 data.len()
486 ),
487 });
488 }
489
490 let header = SegmentHeader::decode(&data[..SEGMENT_HEADER_SIZE])?;
492
493 let is_last = i == seg_count - 1;
494 let (entries, seg_diag) = decode_entries_with_policy(
495 &data[SEGMENT_HEADER_SIZE..],
496 is_last,
497 policy,
498 seg_id,
499 header.is_v1(),
500 )?;
501 diag.valid_entries += seg_diag.valid_entries;
502 diag.corrupt_entries += seg_diag.corrupt_entries;
503 diag.truncated_segments += seg_diag.truncated_segments;
504 diag.recovered_bytes += seg_diag.recovered_bytes;
505 all_entries.extend(entries);
506 }
507
508 Ok((all_entries, diag))
509 }
510}
511
512fn encode_entry(entry: &LogEntry) -> Vec<u8> {
524 let cmd_bytes = &entry.command.data;
525 let payload_len = 8 + 8 + 4 + cmd_bytes.len() + 8 + 4;
527
528 let mut buf = Vec::with_capacity(4 + payload_len);
529
530 buf.extend_from_slice(&(payload_len as u32).to_le_bytes());
532 buf.extend_from_slice(&entry.term.to_le_bytes());
534 buf.extend_from_slice(&entry.index.to_le_bytes());
536 buf.extend_from_slice(&(cmd_bytes.len() as u32).to_le_bytes());
538 buf.extend_from_slice(cmd_bytes);
540 buf.extend_from_slice(&entry.fencing_token.to_le_bytes());
542 let crc = crc32fast::hash(&buf[4..]);
544 buf.extend_from_slice(&crc.to_le_bytes());
545
546 buf
547}
548
549fn decode_entries(data: &[u8], lenient_tail: bool, is_v1: bool) -> RaftResult<Vec<LogEntry>> {
556 let mut entries = Vec::new();
557 let mut pos = 0;
558
559 while pos + 4 <= data.len() {
560 let entry_len = u32::from_le_bytes(read_4(data, pos)?) as usize;
561
562 if pos + 4 + entry_len > data.len() {
564 if lenient_tail {
565 break; }
567 return Err(RaftError::StorageError {
568 message: format!(
569 "truncated entry at offset {pos}: need {} more bytes",
570 (pos + 4 + entry_len) - data.len()
571 ),
572 });
573 }
574
575 let record_start = pos + 4;
576 let record_end = record_start + entry_len;
577 let record = &data[record_start..record_end];
578
579 if entry_len < 4 {
580 if lenient_tail && record_end >= data.len() {
581 break;
582 }
583 return Err(RaftError::StorageError {
584 message: format!("entry_len too small ({entry_len}) at offset {pos}"),
585 });
586 }
587
588 let payload = &record[..entry_len - 4];
589 let stored_crc = u32::from_le_bytes(read_4(record, entry_len - 4)?);
590 let computed_crc = crc32fast::hash(payload);
591
592 if stored_crc != computed_crc {
593 if lenient_tail && record_end >= data.len() {
594 break; }
596 return Err(RaftError::StorageError {
597 message: format!(
598 "CRC mismatch at offset {pos}: stored={stored_crc:#010x}, computed={computed_crc:#010x}"
599 ),
600 });
601 }
602
603 let entry = parse_payload(payload, is_v1, pos)?;
604 entries.push(entry);
605
606 pos = record_end;
607 }
608
609 Ok(entries)
610}
611
612fn parse_payload(payload: &[u8], is_v1: bool, offset: usize) -> RaftResult<LogEntry> {
617 let min_len = if is_v1 { 20 } else { 28 };
618 if payload.len() < min_len {
619 return Err(RaftError::StorageError {
620 message: format!("record payload too short at offset {offset}"),
621 });
622 }
623
624 let term = u64::from_le_bytes(read_8(payload, 0)?);
625 let index = u64::from_le_bytes(read_8(payload, 8)?);
626 let cmd_len = u32::from_le_bytes(read_4(payload, 16)?) as usize;
627
628 let cmd_end = 20 + cmd_len;
629 if payload.len() < cmd_end {
630 return Err(RaftError::StorageError {
631 message: format!("cmd_len exceeds record at offset {offset}"),
632 });
633 }
634
635 let cmd_data = payload[20..cmd_end].to_vec();
636
637 let fencing_token = if is_v1 {
638 0u64
639 } else {
640 if payload.len() < cmd_end + 8 {
641 return Err(RaftError::StorageError {
642 message: format!("missing fencing_token bytes at offset {offset}"),
643 });
644 }
645 u64::from_le_bytes(read_8(payload, cmd_end)?)
646 };
647
648 Ok(LogEntry::with_fencing_token(
649 term,
650 index,
651 Command::new(cmd_data),
652 fencing_token,
653 ))
654}
655
656fn decode_entries_with_policy(
662 data: &[u8],
663 lenient_tail: bool,
664 policy: CorruptionPolicy,
665 segment_id: u32,
666 is_v1: bool,
667) -> RaftResult<(Vec<LogEntry>, WalDiagnostics)> {
668 let mut entries = Vec::new();
669 let mut diag = WalDiagnostics::default();
670 let mut pos = 0;
671 let mut entry_idx: u64 = 0;
672
673 while pos + 4 <= data.len() {
674 let entry_len = u32::from_le_bytes(read_4(data, pos)?) as usize;
675
676 if pos + 4 + entry_len > data.len() {
678 if lenient_tail {
679 diag.truncated_segments += 1;
680 tracing::warn!(
681 segment_id,
682 entry_idx,
683 offset = pos,
684 "partial trailing entry discarded"
685 );
686 break;
687 }
688 return Err(RaftError::StorageError {
689 message: format!(
690 "truncated entry at offset {pos}: need {} more bytes",
691 (pos + 4 + entry_len) - data.len()
692 ),
693 });
694 }
695
696 let record_start = pos + 4;
697 let record_end = record_start + entry_len;
698 let record = &data[record_start..record_end];
699 let record_total_bytes = 4 + entry_len;
700
701 if entry_len < 4 {
702 if lenient_tail && record_end >= data.len() {
703 diag.truncated_segments += 1;
704 break;
705 }
706 return Err(RaftError::StorageError {
707 message: format!("entry_len too small ({entry_len}) at offset {pos}"),
708 });
709 }
710
711 let payload = &record[..entry_len - 4];
712 let stored_crc = u32::from_le_bytes(read_4(record, entry_len - 4)?);
713 let computed_crc = crc32fast::hash(payload);
714
715 if stored_crc != computed_crc {
716 tracing::warn!(
717 segment_id,
718 entry_idx,
719 offset = pos,
720 stored_crc = format_args!("{stored_crc:#010x}"),
721 computed_crc = format_args!("{computed_crc:#010x}"),
722 policy = ?policy,
723 "CRC mismatch detected"
724 );
725 diag.corrupt_entries += 1;
726
727 match policy {
728 CorruptionPolicy::RefuseStart => {
729 return Err(RaftError::StorageError {
730 message: format!(
731 "CRC mismatch at segment {segment_id}, offset {pos}: \
732 stored={stored_crc:#010x}, computed={computed_crc:#010x}"
733 ),
734 });
735 }
736 CorruptionPolicy::TruncateToLastGood => {
737 tracing::warn!(
738 segment_id,
739 entry_idx,
740 offset = pos,
741 "truncating WAL at corruption point"
742 );
743 break;
744 }
745 CorruptionPolicy::AlertAndContinue => {
746 tracing::warn!(
747 segment_id,
748 entry_idx,
749 offset = pos,
750 "skipping corrupted entry (AlertAndContinue)"
751 );
752 pos = record_end;
753 entry_idx += 1;
754 continue;
755 }
756 }
757 }
758
759 let entry = parse_payload(payload, is_v1, pos)?;
760 entries.push(entry);
761
762 diag.valid_entries += 1;
763 diag.recovered_bytes += record_total_bytes as u64;
764 pos = record_end;
765 entry_idx += 1;
766 }
767
768 Ok((entries, diag))
769}
770
/// Builds the path of segment `segment_id` inside `dir`: `wal-` + the id zero-padded to
/// at least eight digits + `.seg`.
fn segment_path(dir: &Path, segment_id: u32) -> PathBuf {
    let mut path = dir.to_path_buf();
    path.push(format!("wal-{:08}.seg", segment_id));
    path
}
779
780fn list_segments(dir: &Path) -> RaftResult<Vec<u32>> {
782 if !dir.exists() {
783 return Ok(Vec::new());
784 }
785
786 let mut ids: Vec<u32> = Vec::new();
787 let read_dir = fs::read_dir(dir).map_err(|e| RaftError::StorageError {
788 message: format!("failed to read WAL dir {}: {e}", dir.display()),
789 })?;
790
791 for entry in read_dir {
792 let entry = entry.map_err(|e| RaftError::StorageError {
793 message: format!("failed to read dir entry: {e}"),
794 })?;
795 let name = entry.file_name();
796 let name_str = name.to_string_lossy();
797 if let Some(id) = parse_segment_name(&name_str) {
798 ids.push(id);
799 }
800 }
801
802 ids.sort_unstable();
803 Ok(ids)
804}
805
/// Extracts the segment id from a file name of the exact form `segment_path` produces
/// (`wal-` + id zero-padded to at least eight digits + `.seg`).
///
/// Non-canonical spellings that `u32::from_str` would otherwise accept are rejected:
/// - a leading `+` (`wal-+0000005.seg`);
/// - too little padding (`wal-5.seg`);
/// - extra padding (`wal-000000005.seg`).
///
/// Such names would map to a path other than the file itself (the open fails), or
/// duplicate an existing id (the segment is read twice).
fn parse_segment_name(name: &str) -> Option<u32> {
    let digits = name.strip_prefix("wal-")?.strip_suffix(".seg")?;
    let id = digits.parse::<u32>().ok()?;
    (format!("{id:08}") == digits).then_some(id)
}
812
813fn read_segment_file(path: &Path) -> RaftResult<Vec<u8>> {
815 let mut file = File::open(path).map_err(|e| RaftError::StorageError {
816 message: format!("failed to open segment {}: {e}", path.display()),
817 })?;
818 let mut data = Vec::new();
819 file.read_to_end(&mut data)
820 .map_err(|e| RaftError::StorageError {
821 message: format!("failed to read segment {}: {e}", path.display()),
822 })?;
823 Ok(data)
824}
825
826fn read_4(data: &[u8], offset: usize) -> RaftResult<[u8; 4]> {
831 data.get(offset..offset + 4)
832 .and_then(|s| s.try_into().ok())
833 .ok_or_else(|| RaftError::StorageError {
834 message: format!("unexpected EOF reading 4 bytes at offset {offset}"),
835 })
836}
837
838fn read_8(data: &[u8], offset: usize) -> RaftResult<[u8; 8]> {
839 data.get(offset..offset + 8)
840 .and_then(|s| s.try_into().ok())
841 .ok_or_else(|| RaftError::StorageError {
842 message: format!("unexpected EOF reading 8 bytes at offset {offset}"),
843 })
844}
845
#[cfg(test)]
mod tests {
    use super::*;
    use crate::log::Command;

    /// Returns a not-yet-created temp directory path unique to `name` and the current time.
    fn test_wal_dir(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "amaters_wal_test_{name}_{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0)
        ));
        let _ = fs::remove_dir_all(&dir);
        dir
    }

    /// Builds an entry (via `LogEntry::new`) whose command is the UTF-8 bytes of `payload`.
    fn make_entry(term: u64, index: u64, payload: &str) -> LogEntry {
        LogEntry::new(term, index, Command::new(payload.as_bytes().to_vec()))
    }

    #[test]
    fn test_wal_append_and_read_back() {
        let dir = test_wal_dir("append_read");
        let mut writer =
            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");

        for i in 1..=10 {
            let entry = make_entry(1, i, &format!("cmd-{i}"));
            writer.append(&entry).expect("append");
        }

        let reader = WalReader::new(&dir);
        let entries = reader.read_all().expect("read_all");
        assert_eq!(entries.len(), 10);

        for (i, entry) in entries.iter().enumerate() {
            let idx = (i + 1) as u64;
            assert_eq!(entry.term, 1);
            assert_eq!(entry.index, idx);
            assert_eq!(entry.command.data, format!("cmd-{idx}").as_bytes().to_vec());
        }

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_crc_corruption_detection() {
        let dir = test_wal_dir("crc_corrupt");
        let mut writer =
            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");

        for i in 1..=5 {
            writer
                .append(&make_entry(1, i, &format!("data-{i}")))
                .expect("append");
        }

        let segments = list_segments(&dir).expect("list");
        assert!(!segments.is_empty());
        let seg_path = segment_path(&dir, segments[0]);

        let mut data = fs::read(&seg_path).expect("read seg");
        // Byte 10 of the body lies inside the first record's `term` field.
        let corrupt_offset = SEGMENT_HEADER_SIZE + 10;
        assert!(
            corrupt_offset < data.len(),
            "segment shorter than expected ({} bytes)",
            data.len()
        );
        data[corrupt_offset] ^= 0xFF;
        fs::write(&seg_path, &data).expect("write corrupted");

        let reader = WalReader::new(&dir);
        // Strict reads must reject the CRC mismatch.
        assert!(reader.read_all().is_err());

        // recover() truncates at the first bad record, which here is the very first one.
        let recovered = reader.recover().expect("recover");
        assert!(
            recovered.is_empty(),
            "expected no entries before the corrupted first record, got {}",
            recovered.len()
        );

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_segment_rotation() {
        let dir = test_wal_dir("rotation");
        let mut writer = WalWriter::new(&dir, SyncMode::EveryWrite, 256).expect("writer");

        for i in 1..=20 {
            writer
                .append(&make_entry(1, i, &format!("rot-{i}")))
                .expect("append");
        }

        let segments = list_segments(&dir).expect("list");
        assert!(
            segments.len() > 1,
            "expected multiple segments, got {}",
            segments.len()
        );

        let reader = WalReader::new(&dir);
        let entries = reader.read_all().expect("read_all");
        assert_eq!(entries.len(), 20);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.index, (i + 1) as u64);
        }

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_crash_recovery() {
        let dir = test_wal_dir("crash_recovery");
        let mut writer =
            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");

        for i in 1..=5 {
            writer
                .append(&make_entry(1, i, &format!("ok-{i}")))
                .expect("append");
        }

        let segments = list_segments(&dir).expect("list");
        let seg_path = segment_path(&dir, *segments.last().expect("last seg"));

        // Simulate a torn write: a length prefix promising 100 bytes, followed by only 6.
        {
            let mut f = OpenOptions::new()
                .append(true)
                .open(&seg_path)
                .expect("open for partial write");
            let fake_len: u32 = 100;
            f.write_all(&fake_len.to_le_bytes())
                .expect("write partial len");
            f.write_all(&[0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE])
                .expect("write partial data");
        }

        let reader = WalReader::new(&dir);
        assert!(reader.read_all().is_err());

        let recovered = reader.recover().expect("recover");
        assert_eq!(recovered.len(), 5);
        for (i, entry) in recovered.iter().enumerate() {
            assert_eq!(entry.index, (i + 1) as u64);
        }

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_empty_startup() {
        let dir = test_wal_dir("empty");
        let _writer =
            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");

        let reader = WalReader::new(&dir);
        let entries = reader.read_all().expect("read_all");
        assert!(entries.is_empty());

        let recovered = reader.recover().expect("recover");
        assert!(recovered.is_empty());

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_truncate_from() {
        let dir = test_wal_dir("truncate");
        let mut writer =
            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");

        for i in 1..=10 {
            writer
                .append(&make_entry(1, i, &format!("entry-{i}")))
                .expect("append");
        }

        writer.truncate_from(6).expect("truncate_from(6)");

        let reader = WalReader::new(&dir);
        let entries = reader.read_all().expect("read_all");
        assert_eq!(entries.len(), 5);
        for (i, entry) in entries.iter().enumerate() {
            let idx = (i + 1) as u64;
            assert_eq!(entry.index, idx);
            assert_eq!(
                entry.command.data,
                format!("entry-{idx}").as_bytes().to_vec()
            );
        }

        let _ = fs::remove_dir_all(&dir);
    }

    /// Writes `count` entries (indices `1..=count`) into `dir` and returns the path of the
    /// last segment.
    fn write_entries(dir: &Path, count: u64) -> PathBuf {
        let mut writer =
            WalWriter::new(dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
        for i in 1..=count {
            writer
                .append(&make_entry(1, i, &format!("payload-{i}")))
                .expect("append");
        }
        let segs = list_segments(dir).expect("list");
        segment_path(dir, *segs.last().expect("segment"))
    }

    /// Flips one byte inside the payload of the `entry_number`-th (1-based) record of the
    /// segment at `seg_path`, so that its CRC no longer matches.
    fn corrupt_entry_n(seg_path: &Path, entry_number: usize) {
        let mut data = fs::read(seg_path).expect("read segment");
        let mut pos = SEGMENT_HEADER_SIZE;
        for n in 1..=entry_number {
            let entry_len =
                u32::from_le_bytes(data[pos..pos + 4].try_into().expect("4 bytes")) as usize;
            if n == entry_number {
                let payload_start = pos + 4;
                let flip_offset = payload_start + 2;
                data[flip_offset] ^= 0xFF;
                break;
            }
            pos += 4 + entry_len;
        }
        fs::write(seg_path, &data).expect("write corrupted");
    }

    #[test]
    fn test_wal_corrupted_refuse_start() {
        let dir = test_wal_dir("wal_corrupted_refuse_start");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 3);

        let reader = WalReader::new(&dir);
        let result = reader.recover_with_policy(CorruptionPolicy::RefuseStart);
        assert!(
            result.is_err(),
            "RefuseStart should return error on corruption"
        );

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_corrupted_truncate() {
        let dir = test_wal_dir("wal_corrupted_truncate");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 3);

        let reader = WalReader::new(&dir);
        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
            .expect("recover");

        assert_eq!(
            entries.len(),
            2,
            "TruncateToLastGood: keep entries before corruption"
        );
        assert_eq!(diag.valid_entries, 2);
        assert_eq!(diag.corrupt_entries, 1);
        assert!(diag.recovered_bytes > 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_corrupted_alert_continue() {
        let dir = test_wal_dir("wal_corrupted_alert_continue");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 2);

        let reader = WalReader::new(&dir);
        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
            .expect("recover");

        assert_eq!(
            entries.len(),
            4,
            "AlertAndContinue: skip only the corrupted entry"
        );
        assert_eq!(diag.corrupt_entries, 1);
        assert_eq!(diag.valid_entries, 4);

        let indices: Vec<u64> = entries.iter().map(|e| e.index).collect();
        assert_eq!(indices, vec![1, 3, 4, 5]);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_policy_refuse_start_inner() {
        let dir = test_wal_dir("corruption_refuse_start_inner");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 3);

        let reader = WalReader::new(&dir);
        let result = reader.recover_with_policy(CorruptionPolicy::RefuseStart);
        assert!(
            result.is_err(),
            "RefuseStart should return error on corruption"
        );

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_policy_truncate_to_last_good() {
        let dir = test_wal_dir("corruption_truncate_last_good");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 3);

        let reader = WalReader::new(&dir);
        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
            .expect("recover");

        assert_eq!(entries.len(), 2, "should keep entries before corruption");
        assert_eq!(diag.valid_entries, 2);
        assert_eq!(diag.corrupt_entries, 1);
        assert!(diag.recovered_bytes > 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_policy_alert_and_continue() {
        let dir = test_wal_dir("corruption_alert_continue");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 2);

        let reader = WalReader::new(&dir);
        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
            .expect("recover");

        assert_eq!(entries.len(), 4, "should skip only the corrupted entry");
        assert_eq!(diag.corrupt_entries, 1);
        assert_eq!(diag.valid_entries, 4);

        let indices: Vec<u64> = entries.iter().map(|e| e.index).collect();
        assert_eq!(indices, vec![1, 3, 4, 5]);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_policy_first_entry() {
        let dir = test_wal_dir("corruption_first");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 1);

        let reader = WalReader::new(&dir);

        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
            .expect("recover");
        assert_eq!(entries.len(), 4);
        assert_eq!(diag.corrupt_entries, 1);

        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
            .expect("recover");
        assert_eq!(entries.len(), 0);
        assert_eq!(diag.corrupt_entries, 1);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_policy_last_entry() {
        let dir = test_wal_dir("corruption_last");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 5);

        let reader = WalReader::new(&dir);

        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
            .expect("recover");
        assert_eq!(entries.len(), 4);
        assert_eq!(diag.corrupt_entries, 1);

        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
            .expect("recover");
        assert_eq!(entries.len(), 4);
        assert_eq!(diag.corrupt_entries, 1);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_diagnostics_no_corruption() {
        let dir = test_wal_dir("diag_clean");
        write_entries(&dir, 10);

        let reader = WalReader::new(&dir);
        let (entries, diag) = reader
            .recover_with_policy(CorruptionPolicy::RefuseStart)
            .expect("recover");
        assert_eq!(entries.len(), 10);
        assert_eq!(diag.valid_entries, 10);
        assert_eq!(diag.corrupt_entries, 0);
        assert_eq!(diag.truncated_segments, 0);
        assert!(diag.recovered_bytes > 0);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_corruption_recover_backward_compat() {
        let dir = test_wal_dir("corruption_compat");
        let seg_path = write_entries(&dir, 5);
        corrupt_entry_n(&seg_path, 3);

        let reader = WalReader::new(&dir);
        let entries = reader.recover().expect("recover");
        assert_eq!(entries.len(), 2);

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_sync_modes() {
        {
            let dir = test_wal_dir("sync_every");
            let mut writer = WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE)
                .expect("writer");
            for i in 1..=5 {
                writer.append(&make_entry(1, i, "a")).expect("append");
            }
            let reader = WalReader::new(&dir);
            assert_eq!(reader.read_all().expect("read").len(), 5);
            let _ = fs::remove_dir_all(&dir);
        }

        {
            let dir = test_wal_dir("sync_os");
            let mut writer = WalWriter::new(&dir, SyncMode::OsManaged, DEFAULT_MAX_SEGMENT_SIZE)
                .expect("writer");
            for i in 1..=5 {
                writer.append(&make_entry(1, i, "b")).expect("append");
            }
            writer.sync().expect("manual sync");
            let reader = WalReader::new(&dir);
            assert_eq!(reader.read_all().expect("read").len(), 5);
            let _ = fs::remove_dir_all(&dir);
        }

        {
            let dir = test_wal_dir("sync_batched");
            let mut writer = WalWriter::new(&dir, SyncMode::Batched(3), DEFAULT_MAX_SEGMENT_SIZE)
                .expect("writer");
            for i in 1..=7 {
                writer.append(&make_entry(1, i, "c")).expect("append");
            }
            writer.sync().expect("final sync");
            let reader = WalReader::new(&dir);
            assert_eq!(reader.read_all().expect("read").len(), 7);
            let _ = fs::remove_dir_all(&dir);
        }
    }

    #[test]
    fn test_wal_v2_fencing_token_roundtrip() {
        use crate::log::Command;
        let dir = test_wal_dir("v2_token_roundtrip");
        let mut writer =
            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");

        let token_raw: u64 = ((3u64) << 32) | 7u64;
        let entry = LogEntry::with_fencing_token(1, 1, Command::new(b"hello".to_vec()), token_raw);
        writer.append(&entry).expect("append");

        let reader = WalReader::new(&dir);
        let entries = reader.read_all().expect("read_all");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].fencing_token, token_raw);
        assert_eq!(entries[0].command.data, b"hello");

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_wal_v1_backward_compat_read() {
        let dir = test_wal_dir("v1_compat");
        fs::create_dir_all(&dir).expect("mkdir");
        let seg_path = dir.join("wal-00000000.seg");

        let mut buf: Vec<u8> = Vec::new();

        // v1 segment header: magic, version, segment id.
        buf.extend_from_slice(&WAL_MAGIC_V1.to_le_bytes());
        buf.extend_from_slice(&WAL_VERSION_V1.to_le_bytes());
        buf.extend_from_slice(&0u32.to_le_bytes());

        let cmd = b"v1cmd";
        let term: u64 = 1;
        let index: u64 = 1;

        // v1 payload: term, index, cmd_len, cmd (no fencing token).
        let mut payload: Vec<u8> = Vec::new();
        payload.extend_from_slice(&term.to_le_bytes());
        payload.extend_from_slice(&index.to_le_bytes());
        payload.extend_from_slice(&(cmd.len() as u32).to_le_bytes());
        payload.extend_from_slice(cmd);
        let crc = crc32fast::hash(&payload);

        // The length prefix counts the payload plus the trailing CRC.
        let entry_len = (payload.len() + 4) as u32;
        buf.extend_from_slice(&entry_len.to_le_bytes());
        buf.extend_from_slice(&payload);
        buf.extend_from_slice(&crc.to_le_bytes());

        fs::write(&seg_path, &buf).expect("write v1 segment");

        let reader = WalReader::new(&dir);
        let entries = reader.read_all().expect("read v1 segment");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].term, 1);
        assert_eq!(entries[0].index, 1);
        assert_eq!(entries[0].command.data, b"v1cmd");
        assert_eq!(entries[0].fencing_token, 0);

        let _ = fs::remove_dir_all(&dir);
    }
}