// amaters_cluster/wal.rs
1//! Segment-based Write-Ahead Log (WAL) with CRC32 integrity and fsync.
2//!
3//! Provides crash-safe, durable logging for Raft consensus. Each segment
4//! file contains a header followed by length-prefixed, CRC32-checksummed
5//! entries.  [`WalWriter`] appends entries with configurable fsync
6//! behaviour, while [`WalReader`] iterates over all segments and validates
7//! integrity.
8//!
9//! # Release notes
10//!
11//! ## WAL v2 (current)
12//! - Added 8-byte packed fencing token per log entry.
13//! - WAL magic bumped from `WAL1` (0x57414C31) to `WAL2` (0x57414C32).
14//! - WAL v1 segments are still readable: the missing token field is filled with 0.
15//! - [`CorruptionPolicy`] variants renamed: `SkipCorrupted` → `AlertAndContinue`,
16//!   `TruncateAtCorruption` → `TruncateToLastGood`, `FailHard` → `RefuseStart`.
17//!
18//! # On-disk format
19//!
20//! ## Segment header (12 bytes)
21//!
22//! ```text
23//! [magic: u32 LE][version: u32 LE][segment_id: u32 LE]
24//! ```
25//!
26//! ## Entry format (WAL v2)
27//!
28//! ```text
29//! [entry_len: u32 LE][term: u64 LE][index: u64 LE][cmd_len: u32 LE][cmd: N bytes][fencing_token: u64 LE][crc32: u32 LE]
30//! ```
31//!
32//! ## Entry format (WAL v1, read-only compat)
33//!
34//! ```text
35//! [entry_len: u32 LE][term: u64 LE][index: u64 LE][cmd_len: u32 LE][cmd: N bytes][crc32: u32 LE]
36//! ```
37//!
38//! `entry_len` covers everything after itself (including the trailing CRC).
39//! For v2, the CRC is computed over `term + index + cmd_len + cmd + fencing_token`
40//! (i.e. all payload bytes excluding `entry_len` itself and the CRC).
41//! For v1 the fencing token bytes are absent from the CRC computation.
42
43use crate::error::{RaftError, RaftResult};
44use crate::log::{Command, LogEntry};
45use crate::types::LogIndex;
46
47use std::fs::{self, File, OpenOptions};
48use std::io::{Read, Write};
49use std::path::{Path, PathBuf};
50
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Magic bytes identifying a WAL v2 segment file (`"WAL2"` as little-endian u32).
const WAL_MAGIC: u32 = 0x57414C32;

/// Magic bytes for legacy WAL v1 segments (`"WAL1"` as little-endian u32).
/// v1 segments remain readable; entries decoded from them carry no fencing
/// token, so the field is filled with 0 on read.
const WAL_MAGIC_V1: u32 = 0x57414C31;

/// Current WAL on-disk format version.  Written into every new segment header.
const WAL_VERSION: u32 = 2;

/// Legacy WAL v1 format version (read-compat only; never written).
const WAL_VERSION_V1: u32 = 1;

/// Size of the segment header in bytes: magic(4) + version(4) + segment_id(4).
const SEGMENT_HEADER_SIZE: usize = 12;

/// Default maximum segment size before rotation (64 MiB).
/// Selected by `WalWriter::new` when the caller passes `max_segment_size == 0`.
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;
72
73// ---------------------------------------------------------------------------
74// SyncMode
75// ---------------------------------------------------------------------------
76
/// Controls when `fsync` is called on the WAL segment file.
///
/// Trades durability against throughput: [`SyncMode::EveryWrite`] gives the
/// strongest guarantee, [`SyncMode::OsManaged`] the fastest appends.
///
/// Derives `Copy` (all variants are trivially copyable), consistent with
/// [`CorruptionPolicy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Call `fsync` after every write.
    EveryWrite,
    /// Call `fsync` after every `n` writes.
    Batched(usize),
    /// Let the OS decide when to flush — no explicit `fsync`.
    OsManaged,
}
87
88// ---------------------------------------------------------------------------
89// CorruptionPolicy
90// ---------------------------------------------------------------------------
91
/// Strategy for handling mid-segment corruption during WAL recovery.
///
/// Passed to `WalReader::recover_with_policy`; the plain `WalReader::recover`
/// entry point defaults to [`CorruptionPolicy::TruncateToLastGood`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CorruptionPolicy {
    /// Skip corrupted entries and continue scanning for valid ones.
    /// Previously called `SkipCorrupted`.
    AlertAndContinue,
    /// Truncate the log at the first encountered corruption (discard all
    /// entries from that point onward).
    /// Previously called `TruncateAtCorruption`.
    TruncateToLastGood,
    /// Fail immediately on any corruption (return an error).
    /// Previously called `FailHard`.
    RefuseStart,
}
106
107// ---------------------------------------------------------------------------
108// WalDiagnostics
109// ---------------------------------------------------------------------------
110
/// Diagnostic information collected during corruption-aware WAL recovery.
///
/// Counters are aggregated across all segments scanned by a single
/// `recover_with_policy` call.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WalDiagnostics {
    /// Number of entries that were valid and recovered.
    pub valid_entries: u64,
    /// Number of entries that had CRC mismatches or structural damage.
    pub corrupt_entries: u64,
    /// Number of segment tails that were truncated due to partial writes
    /// (including a final segment whose header itself was incomplete).
    pub truncated_segments: u64,
    /// Total bytes occupied by recovered (valid) entries, including each
    /// entry's 4-byte length prefix.
    pub recovered_bytes: u64,
}
123
124// ---------------------------------------------------------------------------
125// Segment header
126// ---------------------------------------------------------------------------
127
128/// Header written at the start of every WAL segment file.
129struct SegmentHeader {
130    magic: u32,
131    version: u32,
132    segment_id: u32,
133}
134
135impl SegmentHeader {
136    fn new(segment_id: u32) -> Self {
137        Self {
138            magic: WAL_MAGIC,
139            version: WAL_VERSION,
140            segment_id,
141        }
142    }
143
144    fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
145        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
146        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
147        buf[4..8].copy_from_slice(&self.version.to_le_bytes());
148        buf[8..12].copy_from_slice(&self.segment_id.to_le_bytes());
149        buf
150    }
151
152    fn decode(data: &[u8]) -> RaftResult<Self> {
153        if data.len() < SEGMENT_HEADER_SIZE {
154            return Err(RaftError::StorageError {
155                message: "segment header too short".to_string(),
156            });
157        }
158        let magic = u32::from_le_bytes(read_4(data, 0)?);
159        let version = u32::from_le_bytes(read_4(data, 4)?);
160        let segment_id = u32::from_le_bytes(read_4(data, 8)?);
161
162        // Accept both WAL v1 (legacy, read-only compat) and WAL v2 (current).
163        let accepted = (magic == WAL_MAGIC && version == WAL_VERSION)
164            || (magic == WAL_MAGIC_V1 && version == WAL_VERSION_V1);
165        if !accepted {
166            return Err(RaftError::StorageError {
167                message: format!(
168                    "bad WAL header: magic={magic:#010x}, version={version} \
169                     (expected WAL2/v2 or WAL1/v1)"
170                ),
171            });
172        }
173        Ok(Self {
174            magic,
175            version,
176            segment_id,
177        })
178    }
179
180    /// Returns `true` if this is a legacy v1 segment.
181    fn is_v1(&self) -> bool {
182        self.version == WAL_VERSION_V1
183    }
184}
185
186// ---------------------------------------------------------------------------
187// WalWriter
188// ---------------------------------------------------------------------------
189
/// Appends [`LogEntry`] values to a segment-based WAL on disk.
///
/// Segment files are named `wal-{segment_id:08}.seg` inside the configured
/// directory.  When the current segment exceeds
/// `max_segment_size` a new segment is
/// created automatically.
pub struct WalWriter {
    /// Directory holding all segment files.
    dir: PathBuf,
    /// Handle to the segment currently being appended to; `None` until the
    /// first append (segments are created lazily) or after truncation.
    current_segment: Option<File>,
    /// ID of the current segment — embedded in both filename and header.
    current_segment_id: u32,
    /// Bytes written to the current segment so far (header included).
    current_segment_size: u64,
    /// Rotation threshold in bytes; see `append`.
    max_segment_size: u64,
    /// When to call `fsync` (see [`SyncMode`]).
    sync_mode: SyncMode,
    /// Appends since the last `fsync`; drives [`SyncMode::Batched`].
    writes_since_sync: usize,
}
205
206impl WalWriter {
207    /// Create a new writer rooted at `dir`.
208    ///
209    /// The directory is created if it does not exist.  Any existing segments
210    /// are discovered so that new writes continue from the latest segment.
211    pub fn new(dir: &Path, sync_mode: SyncMode, max_segment_size: u64) -> RaftResult<Self> {
212        fs::create_dir_all(dir).map_err(|e| RaftError::StorageError {
213            message: format!("failed to create WAL dir {}: {e}", dir.display()),
214        })?;
215
216        let max_segment_size = if max_segment_size == 0 {
217            DEFAULT_MAX_SEGMENT_SIZE
218        } else {
219            max_segment_size
220        };
221
222        let mut writer = Self {
223            dir: dir.to_path_buf(),
224            current_segment: None,
225            current_segment_id: 0,
226            current_segment_size: 0,
227            max_segment_size,
228            sync_mode,
229            writes_since_sync: 0,
230        };
231
232        // Discover existing segments
233        let existing = list_segments(dir)?;
234        if let Some(&last_id) = existing.last() {
235            writer.current_segment_id = last_id;
236            let path = segment_path(dir, last_id);
237            let meta = fs::metadata(&path).map_err(|e| RaftError::StorageError {
238                message: format!("failed to stat segment {}: {e}", path.display()),
239            })?;
240            writer.current_segment_size = meta.len();
241            let file = OpenOptions::new().append(true).open(&path).map_err(|e| {
242                RaftError::StorageError {
243                    message: format!("failed to open segment {}: {e}", path.display()),
244                }
245            })?;
246            writer.current_segment = Some(file);
247        }
248
249        Ok(writer)
250    }
251
252    /// Append a single log entry to the WAL.
253    ///
254    /// Rotates to a new segment when the current one exceeds the configured
255    /// maximum size.  Calls `fsync` according to the configured
256    /// [`SyncMode`].
257    pub fn append(&mut self, entry: &LogEntry) -> RaftResult<()> {
258        let encoded = encode_entry(entry);
259        let encoded_len = encoded.len() as u64;
260
261        // Rotate if necessary (but always allow at least one entry per segment)
262        if self.current_segment.is_some()
263            && self.current_segment_size + encoded_len > self.max_segment_size
264            && self.current_segment_size > SEGMENT_HEADER_SIZE as u64
265        {
266            self.rotate_segment()?;
267        }
268
269        // Ensure we have an open segment
270        if self.current_segment.is_none() {
271            self.open_new_segment()?;
272        }
273
274        let file = self
275            .current_segment
276            .as_mut()
277            .ok_or_else(|| RaftError::StorageError {
278                message: "no open segment after rotation".to_string(),
279            })?;
280
281        file.write_all(&encoded)
282            .map_err(|e| RaftError::StorageError {
283                message: format!("failed to write WAL entry: {e}"),
284            })?;
285
286        self.current_segment_size += encoded_len;
287        self.writes_since_sync += 1;
288
289        self.maybe_sync()?;
290
291        Ok(())
292    }
293
294    /// Force an `fsync` on the current segment, regardless of [`SyncMode`].
295    pub fn sync(&mut self) -> RaftResult<()> {
296        if let Some(ref file) = self.current_segment {
297            file.sync_data().map_err(|e| RaftError::StorageError {
298                message: format!("failed to fsync WAL: {e}"),
299            })?;
300            self.writes_since_sync = 0;
301        }
302        Ok(())
303    }
304
305    /// Truncate all entries with index >= `from_index`.
306    ///
307    /// This rewrites segment files, removing entries at or beyond the given
308    /// index.  After truncation the writer re-opens the last remaining
309    /// segment (or creates a fresh one if all entries were removed).
310    pub fn truncate_from(&mut self, from_index: LogIndex) -> RaftResult<()> {
311        // Close the current fd
312        self.current_segment = None;
313
314        let reader = WalReader::new(&self.dir);
315        let all_entries = reader.recover()?;
316        let kept: Vec<&LogEntry> = all_entries
317            .iter()
318            .filter(|e| e.index < from_index)
319            .collect();
320
321        // Remove all existing segment files
322        let segments = list_segments(&self.dir)?;
323        for seg_id in &segments {
324            let path = segment_path(&self.dir, *seg_id);
325            let _ = fs::remove_file(&path);
326        }
327
328        // Reset state
329        self.current_segment_id = 0;
330        self.current_segment_size = 0;
331        self.writes_since_sync = 0;
332
333        // Re-write kept entries
334        for entry in kept {
335            self.append(entry)?;
336        }
337
338        self.sync()?;
339
340        Ok(())
341    }
342
343    // -- private helpers --
344
345    fn open_new_segment(&mut self) -> RaftResult<()> {
346        let path = segment_path(&self.dir, self.current_segment_id);
347        let mut file = File::create(&path).map_err(|e| RaftError::StorageError {
348            message: format!("failed to create segment {}: {e}", path.display()),
349        })?;
350
351        let header = SegmentHeader::new(self.current_segment_id);
352        file.write_all(&header.encode())
353            .map_err(|e| RaftError::StorageError {
354                message: format!("failed to write segment header: {e}"),
355            })?;
356
357        self.current_segment_size = SEGMENT_HEADER_SIZE as u64;
358        self.current_segment = Some(file);
359        Ok(())
360    }
361
362    fn rotate_segment(&mut self) -> RaftResult<()> {
363        // Sync before closing
364        self.sync()?;
365        self.current_segment = None;
366        self.current_segment_id += 1;
367        self.open_new_segment()?;
368        Ok(())
369    }
370
371    fn maybe_sync(&mut self) -> RaftResult<()> {
372        match &self.sync_mode {
373            SyncMode::EveryWrite => self.sync(),
374            SyncMode::Batched(n) => {
375                if self.writes_since_sync >= *n {
376                    self.sync()
377                } else {
378                    Ok(())
379                }
380            }
381            SyncMode::OsManaged => Ok(()),
382        }
383    }
384}
385
386// ---------------------------------------------------------------------------
387// WalReader
388// ---------------------------------------------------------------------------
389
/// Reads entries from all WAL segments in a directory.
///
/// Segments are processed in ascending `segment_id` order.  CRC32
/// checksums are validated for every entry.
pub struct WalReader {
    /// Directory scanned for `wal-*.seg` segment files.
    dir: PathBuf,
}
397
398impl WalReader {
399    /// Create a new reader for the WAL directory.
400    pub fn new(dir: &Path) -> Self {
401        Self {
402            dir: dir.to_path_buf(),
403        }
404    }
405
406    /// Read all entries from all segments, validating CRC for every entry.
407    ///
408    /// Returns an error if any corruption or CRC mismatch is detected.
409    /// Accepts both WAL v1 and WAL v2 segments transparently.
410    pub fn read_all(&self) -> RaftResult<Vec<LogEntry>> {
411        let segments = list_segments(&self.dir)?;
412        let mut all_entries = Vec::new();
413
414        for seg_id in segments {
415            let path = segment_path(&self.dir, seg_id);
416            let data = read_segment_file(&path)?;
417
418            if data.len() < SEGMENT_HEADER_SIZE {
419                return Err(RaftError::StorageError {
420                    message: format!(
421                        "segment {} too small ({} bytes)",
422                        path.display(),
423                        data.len()
424                    ),
425                });
426            }
427
428            // Validate header
429            let header = SegmentHeader::decode(&data[..SEGMENT_HEADER_SIZE])?;
430
431            // Decode entries — strict mode (errors on CRC mismatch)
432            let entries = decode_entries(&data[SEGMENT_HEADER_SIZE..], false, header.is_v1())?;
433            all_entries.extend(entries);
434        }
435
436        Ok(all_entries)
437    }
438
439    /// Recover entries from all segments, tolerating a partial or corrupted
440    /// final entry.
441    ///
442    /// Complete, CRC-valid entries are returned.  If the very last entry of
443    /// the very last segment is incomplete or has a bad CRC it is silently
444    /// discarded (crash recovery).  Corruption in the middle of a segment
445    /// still returns an error.
446    ///
447    /// Uses [`CorruptionPolicy::TruncateToLastGood`] for backward
448    /// compatibility.
449    pub fn recover(&self) -> RaftResult<Vec<LogEntry>> {
450        let (entries, _diag) = self.recover_with_policy(CorruptionPolicy::TruncateToLastGood)?;
451        Ok(entries)
452    }
453
454    /// Recover entries with an explicit [`CorruptionPolicy`].
455    ///
456    /// Returns the recovered entries together with [`WalDiagnostics`]
457    /// describing what was found.
458    pub fn recover_with_policy(
459        &self,
460        policy: CorruptionPolicy,
461    ) -> RaftResult<(Vec<LogEntry>, WalDiagnostics)> {
462        let segments = list_segments(&self.dir)?;
463        let seg_count = segments.len();
464        let mut all_entries = Vec::new();
465        let mut diag = WalDiagnostics::default();
466
467        for (i, seg_id) in segments.into_iter().enumerate() {
468            let path = segment_path(&self.dir, seg_id);
469            let data = read_segment_file(&path)?;
470
471            if data.len() < SEGMENT_HEADER_SIZE {
472                if i == seg_count - 1 {
473                    diag.truncated_segments += 1;
474                    tracing::warn!(
475                        segment_id = seg_id,
476                        bytes = data.len(),
477                        "skipping incomplete final segment header"
478                    );
479                    break;
480                }
481                return Err(RaftError::StorageError {
482                    message: format!(
483                        "segment {} too small ({} bytes)",
484                        path.display(),
485                        data.len()
486                    ),
487                });
488            }
489
490            // Validate header
491            let header = SegmentHeader::decode(&data[..SEGMENT_HEADER_SIZE])?;
492
493            let is_last = i == seg_count - 1;
494            let (entries, seg_diag) = decode_entries_with_policy(
495                &data[SEGMENT_HEADER_SIZE..],
496                is_last,
497                policy,
498                seg_id,
499                header.is_v1(),
500            )?;
501            diag.valid_entries += seg_diag.valid_entries;
502            diag.corrupt_entries += seg_diag.corrupt_entries;
503            diag.truncated_segments += seg_diag.truncated_segments;
504            diag.recovered_bytes += seg_diag.recovered_bytes;
505            all_entries.extend(entries);
506        }
507
508        Ok((all_entries, diag))
509    }
510}
511
512// ---------------------------------------------------------------------------
513// Encoding / decoding helpers
514// ---------------------------------------------------------------------------
515
516/// Encode a [`LogEntry`] to the WAL v2 on-disk binary format.
517///
518/// Format: `[entry_len:4 LE][term:8 LE][index:8 LE][cmd_len:4 LE][cmd:N][fencing_token:8 LE][crc32:4 LE]`
519///
520/// `entry_len` covers everything after the first 4 bytes up to and including
521/// the CRC.  The CRC is computed over all payload bytes (term, index, cmd_len,
522/// cmd, fencing_token) but *excludes* `entry_len` and the CRC itself.
523fn encode_entry(entry: &LogEntry) -> Vec<u8> {
524    let cmd_bytes = &entry.command.data;
525    // payload = term(8) + index(8) + cmd_len(4) + cmd(N) + fencing_token(8) + crc(4)
526    let payload_len = 8 + 8 + 4 + cmd_bytes.len() + 8 + 4;
527
528    let mut buf = Vec::with_capacity(4 + payload_len);
529
530    // entry_len (u32 LE) — everything after these 4 bytes
531    buf.extend_from_slice(&(payload_len as u32).to_le_bytes());
532    // term (u64 LE)
533    buf.extend_from_slice(&entry.term.to_le_bytes());
534    // index (u64 LE)
535    buf.extend_from_slice(&entry.index.to_le_bytes());
536    // cmd_len (u32 LE)
537    buf.extend_from_slice(&(cmd_bytes.len() as u32).to_le_bytes());
538    // cmd bytes
539    buf.extend_from_slice(cmd_bytes);
540    // fencing_token (u64 LE)
541    buf.extend_from_slice(&entry.fencing_token.to_le_bytes());
542    // crc32 over payload (everything between entry_len and here)
543    let crc = crc32fast::hash(&buf[4..]);
544    buf.extend_from_slice(&crc.to_le_bytes());
545
546    buf
547}
548
/// Decode entries from raw bytes after the segment header.
///
/// When `lenient_tail` is `true`, a partial or CRC-bad final entry is
/// silently discarded (crash recovery mode).  When `false`, any corruption
/// returns an error.  Pass `is_v1 = true` for legacy WAL v1 segments that
/// do not carry the 8-byte fencing token per entry.
///
/// NOTE: "final entry" is detected via `record_end >= data.len()` — i.e. the
/// record ends exactly at (or claims to extend past) the end of the buffer,
/// so leniency never applies to mid-segment corruption.
fn decode_entries(data: &[u8], lenient_tail: bool, is_v1: bool) -> RaftResult<Vec<LogEntry>> {
    let mut entries = Vec::new();
    let mut pos = 0;

    // Fewer than 4 trailing bytes cannot even hold a length prefix; such a
    // tail is ignored by the loop condition in both strict and lenient mode.
    while pos + 4 <= data.len() {
        let entry_len = u32::from_le_bytes(read_4(data, pos)?) as usize;

        // Do we have enough bytes for the full record?
        if pos + 4 + entry_len > data.len() {
            if lenient_tail {
                break; // partial trailing entry — discard
            }
            return Err(RaftError::StorageError {
                message: format!(
                    "truncated entry at offset {pos}: need {} more bytes",
                    (pos + 4 + entry_len) - data.len()
                ),
            });
        }

        let record_start = pos + 4;
        let record_end = record_start + entry_len;
        let record = &data[record_start..record_end];

        // A record must at least hold its 4-byte CRC trailer.
        if entry_len < 4 {
            if lenient_tail && record_end >= data.len() {
                break;
            }
            return Err(RaftError::StorageError {
                message: format!("entry_len too small ({entry_len}) at offset {pos}"),
            });
        }

        // CRC covers everything in the record except the trailing CRC itself.
        let payload = &record[..entry_len - 4];
        let stored_crc = u32::from_le_bytes(read_4(record, entry_len - 4)?);
        let computed_crc = crc32fast::hash(payload);

        if stored_crc != computed_crc {
            if lenient_tail && record_end >= data.len() {
                break; // corrupted last entry — discard in recovery mode
            }
            return Err(RaftError::StorageError {
                message: format!(
                    "CRC mismatch at offset {pos}: stored={stored_crc:#010x}, computed={computed_crc:#010x}"
                ),
            });
        }

        // CRC is valid; structural errors inside the payload still fail hard.
        let entry = parse_payload(payload, is_v1, pos)?;
        entries.push(entry);

        pos = record_end;
    }

    Ok(entries)
}
611
612/// Parse a single entry payload (after the outer CRC has already been validated).
613///
614/// v1 layout: term(8) + index(8) + cmd_len(4) + cmd(N)  [no token]
615/// v2 layout: term(8) + index(8) + cmd_len(4) + cmd(N) + fencing_token(8)
616fn parse_payload(payload: &[u8], is_v1: bool, offset: usize) -> RaftResult<LogEntry> {
617    let min_len = if is_v1 { 20 } else { 28 };
618    if payload.len() < min_len {
619        return Err(RaftError::StorageError {
620            message: format!("record payload too short at offset {offset}"),
621        });
622    }
623
624    let term = u64::from_le_bytes(read_8(payload, 0)?);
625    let index = u64::from_le_bytes(read_8(payload, 8)?);
626    let cmd_len = u32::from_le_bytes(read_4(payload, 16)?) as usize;
627
628    let cmd_end = 20 + cmd_len;
629    if payload.len() < cmd_end {
630        return Err(RaftError::StorageError {
631            message: format!("cmd_len exceeds record at offset {offset}"),
632        });
633    }
634
635    let cmd_data = payload[20..cmd_end].to_vec();
636
637    let fencing_token = if is_v1 {
638        0u64
639    } else {
640        if payload.len() < cmd_end + 8 {
641            return Err(RaftError::StorageError {
642                message: format!("missing fencing_token bytes at offset {offset}"),
643            });
644        }
645        u64::from_le_bytes(read_8(payload, cmd_end)?)
646    };
647
648    Ok(LogEntry::with_fencing_token(
649        term,
650        index,
651        Command::new(cmd_data),
652        fencing_token,
653    ))
654}
655
/// Decode entries from raw bytes with a configurable [`CorruptionPolicy`].
///
/// When `lenient_tail` is `true`, a partial trailing entry is silently
/// discarded regardless of policy.  `is_v1` controls whether the legacy v1
/// entry format (no fencing token) is expected.
///
/// Unlike `decode_entries`, a CRC-bad entry is always routed through the
/// policy (even at the tail); only a *length-truncated* tail record is
/// handled by `lenient_tail`.
fn decode_entries_with_policy(
    data: &[u8],
    lenient_tail: bool,
    policy: CorruptionPolicy,
    segment_id: u32,
    is_v1: bool,
) -> RaftResult<(Vec<LogEntry>, WalDiagnostics)> {
    let mut entries = Vec::new();
    let mut diag = WalDiagnostics::default();
    let mut pos = 0;
    // Ordinal of the record within this segment; used only for logging.
    let mut entry_idx: u64 = 0;

    while pos + 4 <= data.len() {
        let entry_len = u32::from_le_bytes(read_4(data, pos)?) as usize;

        // Do we have enough bytes for the full record?
        if pos + 4 + entry_len > data.len() {
            if lenient_tail {
                diag.truncated_segments += 1;
                tracing::warn!(
                    segment_id,
                    entry_idx,
                    offset = pos,
                    "partial trailing entry discarded"
                );
                break;
            }
            return Err(RaftError::StorageError {
                message: format!(
                    "truncated entry at offset {pos}: need {} more bytes",
                    (pos + 4 + entry_len) - data.len()
                ),
            });
        }

        let record_start = pos + 4;
        let record_end = record_start + entry_len;
        let record = &data[record_start..record_end];
        // Full on-disk footprint of this record (length prefix included);
        // only counted into `recovered_bytes` for valid entries.
        let record_total_bytes = 4 + entry_len;

        // A record must at least hold its 4-byte CRC trailer.  Structural
        // damage like this is only tolerated as the final record of the
        // final segment; AlertAndContinue cannot skip it reliably.
        if entry_len < 4 {
            if lenient_tail && record_end >= data.len() {
                diag.truncated_segments += 1;
                break;
            }
            return Err(RaftError::StorageError {
                message: format!("entry_len too small ({entry_len}) at offset {pos}"),
            });
        }

        // CRC covers everything in the record except the trailing CRC itself.
        let payload = &record[..entry_len - 4];
        let stored_crc = u32::from_le_bytes(read_4(record, entry_len - 4)?);
        let computed_crc = crc32fast::hash(payload);

        if stored_crc != computed_crc {
            tracing::warn!(
                segment_id,
                entry_idx,
                offset = pos,
                stored_crc = format_args!("{stored_crc:#010x}"),
                computed_crc = format_args!("{computed_crc:#010x}"),
                policy = ?policy,
                "CRC mismatch detected"
            );
            diag.corrupt_entries += 1;

            match policy {
                CorruptionPolicy::RefuseStart => {
                    // Fail hard: surface the corruption to the caller.
                    return Err(RaftError::StorageError {
                        message: format!(
                            "CRC mismatch at segment {segment_id}, offset {pos}: \
                             stored={stored_crc:#010x}, computed={computed_crc:#010x}"
                        ),
                    });
                }
                CorruptionPolicy::TruncateToLastGood => {
                    // Discard this entry and everything after it in the segment.
                    tracing::warn!(
                        segment_id,
                        entry_idx,
                        offset = pos,
                        "truncating WAL at corruption point"
                    );
                    break;
                }
                CorruptionPolicy::AlertAndContinue => {
                    // Skip just this record (its length prefix still tells us
                    // where the next one starts) and keep scanning.
                    tracing::warn!(
                        segment_id,
                        entry_idx,
                        offset = pos,
                        "skipping corrupted entry (AlertAndContinue)"
                    );
                    pos = record_end;
                    entry_idx += 1;
                    continue;
                }
            }
        }

        // CRC is valid; structural errors inside the payload still fail hard.
        let entry = parse_payload(payload, is_v1, pos)?;
        entries.push(entry);

        diag.valid_entries += 1;
        diag.recovered_bytes += record_total_bytes as u64;
        pos = record_end;
        entry_idx += 1;
    }

    Ok((entries, diag))
}
770
771// ---------------------------------------------------------------------------
772// File / path helpers
773// ---------------------------------------------------------------------------
774
/// Build the path for a segment file:  `<dir>/wal-<segment_id:08>.seg`
fn segment_path(dir: &Path, segment_id: u32) -> PathBuf {
    let file_name = format!("wal-{segment_id:08}.seg");
    dir.join(file_name)
}
779
780/// List segment IDs present in `dir`, sorted ascending.
781fn list_segments(dir: &Path) -> RaftResult<Vec<u32>> {
782    if !dir.exists() {
783        return Ok(Vec::new());
784    }
785
786    let mut ids: Vec<u32> = Vec::new();
787    let read_dir = fs::read_dir(dir).map_err(|e| RaftError::StorageError {
788        message: format!("failed to read WAL dir {}: {e}", dir.display()),
789    })?;
790
791    for entry in read_dir {
792        let entry = entry.map_err(|e| RaftError::StorageError {
793            message: format!("failed to read dir entry: {e}"),
794        })?;
795        let name = entry.file_name();
796        let name_str = name.to_string_lossy();
797        if let Some(id) = parse_segment_name(&name_str) {
798            ids.push(id);
799        }
800    }
801
802    ids.sort_unstable();
803    Ok(ids)
804}
805
/// Parse a segment filename like `wal-00000003.seg` into the id `3`.
fn parse_segment_name(name: &str) -> Option<u32> {
    name.strip_prefix("wal-")
        .and_then(|rest| rest.strip_suffix(".seg"))
        .and_then(|digits| digits.parse::<u32>().ok())
}
812
813/// Read an entire segment file into memory.
814fn read_segment_file(path: &Path) -> RaftResult<Vec<u8>> {
815    let mut file = File::open(path).map_err(|e| RaftError::StorageError {
816        message: format!("failed to open segment {}: {e}", path.display()),
817    })?;
818    let mut data = Vec::new();
819    file.read_to_end(&mut data)
820        .map_err(|e| RaftError::StorageError {
821            message: format!("failed to read segment {}: {e}", path.display()),
822        })?;
823    Ok(data)
824}
825
826// ---------------------------------------------------------------------------
827// Byte-reading helpers (same contract as persistence.rs)
828// ---------------------------------------------------------------------------
829
830fn read_4(data: &[u8], offset: usize) -> RaftResult<[u8; 4]> {
831    data.get(offset..offset + 4)
832        .and_then(|s| s.try_into().ok())
833        .ok_or_else(|| RaftError::StorageError {
834            message: format!("unexpected EOF reading 4 bytes at offset {offset}"),
835        })
836}
837
838fn read_8(data: &[u8], offset: usize) -> RaftResult<[u8; 8]> {
839    data.get(offset..offset + 8)
840        .and_then(|s| s.try_into().ok())
841        .ok_or_else(|| RaftError::StorageError {
842            message: format!("unexpected EOF reading 8 bytes at offset {offset}"),
843        })
844}
845
846// ===========================================================================
847// Tests
848// ===========================================================================
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use crate::log::Command;
854
855    /// Helper: create a unique temp directory for a test.
856    fn test_wal_dir(name: &str) -> PathBuf {
857        let dir = std::env::temp_dir().join(format!(
858            "amaters_wal_test_{name}_{}",
859            std::time::SystemTime::now()
860                .duration_since(std::time::UNIX_EPOCH)
861                .map(|d| d.as_nanos())
862                .unwrap_or(0)
863        ));
864        let _ = fs::remove_dir_all(&dir);
865        dir
866    }
867
868    /// Helper: build a simple log entry.
869    fn make_entry(term: u64, index: u64, payload: &str) -> LogEntry {
870        LogEntry::new(term, index, Command::new(payload.as_bytes().to_vec()))
871    }
872
873    #[test]
874    fn test_wal_append_and_read_back() {
875        let dir = test_wal_dir("append_read");
876        let mut writer =
877            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
878
879        for i in 1..=10 {
880            let entry = make_entry(1, i, &format!("cmd-{i}"));
881            writer.append(&entry).expect("append");
882        }
883
884        let reader = WalReader::new(&dir);
885        let entries = reader.read_all().expect("read_all");
886        assert_eq!(entries.len(), 10);
887
888        for (i, entry) in entries.iter().enumerate() {
889            let idx = (i + 1) as u64;
890            assert_eq!(entry.term, 1);
891            assert_eq!(entry.index, idx);
892            assert_eq!(entry.command.data, format!("cmd-{idx}").as_bytes().to_vec());
893        }
894
895        let _ = fs::remove_dir_all(&dir);
896    }
897
898    #[test]
899    fn test_wal_crc_corruption_detection() {
900        let dir = test_wal_dir("crc_corrupt");
901        let mut writer =
902            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
903
904        for i in 1..=5 {
905            writer
906                .append(&make_entry(1, i, &format!("data-{i}")))
907                .expect("append");
908        }
909
910        // Corrupt a byte inside the first segment (after header, inside an entry payload)
911        let segments = list_segments(&dir).expect("list");
912        assert!(!segments.is_empty());
913        let seg_path = segment_path(&dir, segments[0]);
914
915        let mut data = fs::read(&seg_path).expect("read seg");
916        // Flip a byte in the middle of the segment data
917        let corrupt_offset = SEGMENT_HEADER_SIZE + 10;
918        if corrupt_offset < data.len() {
919            data[corrupt_offset] ^= 0xFF;
920        }
921        fs::write(&seg_path, &data).expect("write corrupted");
922
923        // read_all should fail
924        let reader = WalReader::new(&dir);
925        assert!(reader.read_all().is_err());
926
927        // recover should silently truncate or skip the corrupted entry
928        // (since it's not necessarily the last entry, recover may also error
929        // — but if corruption is in the only segment's first entry the
930        // recovered log will simply be empty.)
931        // For a robust test, we accept either Ok([]) or Err.
932        let result = reader.recover();
933        // With corruption in the middle, recover returns Err unless
934        // the corruption is in the tail entry.  Both outcomes are valid.
935        assert!(result.is_ok() || result.is_err());
936
937        let _ = fs::remove_dir_all(&dir);
938    }
939
940    #[test]
941    fn test_wal_segment_rotation() {
942        let dir = test_wal_dir("rotation");
943        // Use a tiny segment size to force frequent rotation
944        let mut writer = WalWriter::new(&dir, SyncMode::EveryWrite, 256).expect("writer");
945
946        for i in 1..=20 {
947            writer
948                .append(&make_entry(1, i, &format!("rot-{i}")))
949                .expect("append");
950        }
951
952        let segments = list_segments(&dir).expect("list");
953        assert!(
954            segments.len() > 1,
955            "expected multiple segments, got {}",
956            segments.len()
957        );
958
959        let reader = WalReader::new(&dir);
960        let entries = reader.read_all().expect("read_all");
961        assert_eq!(entries.len(), 20);
962        for (i, entry) in entries.iter().enumerate() {
963            assert_eq!(entry.index, (i + 1) as u64);
964        }
965
966        let _ = fs::remove_dir_all(&dir);
967    }
968
969    #[test]
970    fn test_wal_crash_recovery() {
971        let dir = test_wal_dir("crash_recovery");
972        let mut writer =
973            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
974
975        for i in 1..=5 {
976            writer
977                .append(&make_entry(1, i, &format!("ok-{i}")))
978                .expect("append");
979        }
980
981        // Simulate a crash: append a partial entry (truncated bytes) to
982        // the segment file.
983        let segments = list_segments(&dir).expect("list");
984        let seg_path = segment_path(&dir, *segments.last().expect("last seg"));
985
986        {
987            let mut f = OpenOptions::new()
988                .append(true)
989                .open(&seg_path)
990                .expect("open for partial write");
991            // Write a partial entry header (entry_len says 100 but we only
992            // write 6 bytes of payload — incomplete).
993            let fake_len: u32 = 100;
994            f.write_all(&fake_len.to_le_bytes())
995                .expect("write partial len");
996            f.write_all(&[0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE])
997                .expect("write partial data");
998        }
999
1000        // read_all should fail (strict mode sees truncated entry)
1001        let reader = WalReader::new(&dir);
1002        assert!(reader.read_all().is_err());
1003
1004        // recover should return only the 5 complete entries
1005        let recovered = reader.recover().expect("recover");
1006        assert_eq!(recovered.len(), 5);
1007        for (i, entry) in recovered.iter().enumerate() {
1008            assert_eq!(entry.index, (i + 1) as u64);
1009        }
1010
1011        let _ = fs::remove_dir_all(&dir);
1012    }
1013
1014    #[test]
1015    fn test_wal_empty_startup() {
1016        let dir = test_wal_dir("empty");
1017        let _writer =
1018            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
1019
1020        let reader = WalReader::new(&dir);
1021        let entries = reader.read_all().expect("read_all");
1022        assert!(entries.is_empty());
1023
1024        // recover also returns empty
1025        let recovered = reader.recover().expect("recover");
1026        assert!(recovered.is_empty());
1027
1028        let _ = fs::remove_dir_all(&dir);
1029    }
1030
1031    #[test]
1032    fn test_wal_truncate_from() {
1033        let dir = test_wal_dir("truncate");
1034        let mut writer =
1035            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
1036
1037        for i in 1..=10 {
1038            writer
1039                .append(&make_entry(1, i, &format!("entry-{i}")))
1040                .expect("append");
1041        }
1042
1043        writer.truncate_from(6).expect("truncate_from(6)");
1044
1045        let reader = WalReader::new(&dir);
1046        let entries = reader.read_all().expect("read_all");
1047        assert_eq!(entries.len(), 5);
1048        for (i, entry) in entries.iter().enumerate() {
1049            let idx = (i + 1) as u64;
1050            assert_eq!(entry.index, idx);
1051            assert_eq!(
1052                entry.command.data,
1053                format!("entry-{idx}").as_bytes().to_vec()
1054            );
1055        }
1056
1057        let _ = fs::remove_dir_all(&dir);
1058    }
1059
1060    // -----------------------------------------------------------------------
1061    // Corruption-policy tests
1062    // -----------------------------------------------------------------------
1063
1064    /// Helper: write `count` entries and return the segment path.
1065    fn write_entries(dir: &Path, count: u64) -> PathBuf {
1066        let mut writer =
1067            WalWriter::new(dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
1068        for i in 1..=count {
1069            writer
1070                .append(&make_entry(1, i, &format!("payload-{i}")))
1071                .expect("append");
1072        }
1073        let segs = list_segments(dir).expect("list");
1074        segment_path(dir, *segs.last().expect("segment"))
1075    }
1076
1077    /// Corrupt a single byte inside entry `entry_number` (1-based) of the
1078    /// first segment.  This targets the payload region so the CRC will not
1079    /// match.
1080    fn corrupt_entry_n(seg_path: &Path, entry_number: usize) {
1081        let mut data = fs::read(seg_path).expect("read segment");
1082        let mut pos = SEGMENT_HEADER_SIZE;
1083        for n in 1..=entry_number {
1084            let entry_len =
1085                u32::from_le_bytes(data[pos..pos + 4].try_into().expect("4 bytes")) as usize;
1086            if n == entry_number {
1087                // Flip a byte inside the payload of this entry
1088                let payload_start = pos + 4;
1089                let flip_offset = payload_start + 2;
1090                data[flip_offset] ^= 0xFF;
1091                break;
1092            }
1093            pos += 4 + entry_len;
1094        }
1095        fs::write(seg_path, &data).expect("write corrupted");
1096    }
1097
1098    // --- Spec-named B2 tests ---
1099
1100    #[test]
1101    fn test_wal_corrupted_refuse_start() {
1102        let dir = test_wal_dir("wal_corrupted_refuse_start");
1103        let seg_path = write_entries(&dir, 5);
1104        corrupt_entry_n(&seg_path, 3);
1105
1106        let reader = WalReader::new(&dir);
1107        let result = reader.recover_with_policy(CorruptionPolicy::RefuseStart);
1108        assert!(
1109            result.is_err(),
1110            "RefuseStart should return error on corruption"
1111        );
1112
1113        let _ = fs::remove_dir_all(&dir);
1114    }
1115
1116    #[test]
1117    fn test_wal_corrupted_truncate() {
1118        let dir = test_wal_dir("wal_corrupted_truncate");
1119        let seg_path = write_entries(&dir, 5);
1120        // Corrupt entry 3 (entries 1,2 should survive)
1121        corrupt_entry_n(&seg_path, 3);
1122
1123        let reader = WalReader::new(&dir);
1124        let (entries, diag) = reader
1125            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
1126            .expect("recover");
1127
1128        assert_eq!(
1129            entries.len(),
1130            2,
1131            "TruncateToLastGood: keep entries before corruption"
1132        );
1133        assert_eq!(diag.valid_entries, 2);
1134        assert_eq!(diag.corrupt_entries, 1);
1135        assert!(diag.recovered_bytes > 0);
1136
1137        let _ = fs::remove_dir_all(&dir);
1138    }
1139
1140    #[test]
1141    fn test_wal_corrupted_alert_continue() {
1142        let dir = test_wal_dir("wal_corrupted_alert_continue");
1143        let seg_path = write_entries(&dir, 5);
1144        // Corrupt entry 2 — entries 1, 3, 4, 5 should survive
1145        corrupt_entry_n(&seg_path, 2);
1146
1147        let reader = WalReader::new(&dir);
1148        let (entries, diag) = reader
1149            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
1150            .expect("recover");
1151
1152        assert_eq!(
1153            entries.len(),
1154            4,
1155            "AlertAndContinue: skip only the corrupted entry"
1156        );
1157        assert_eq!(diag.corrupt_entries, 1);
1158        assert_eq!(diag.valid_entries, 4);
1159
1160        // Verify the indices are 1, 3, 4, 5
1161        let indices: Vec<u64> = entries.iter().map(|e| e.index).collect();
1162        assert_eq!(indices, vec![1, 3, 4, 5]);
1163
1164        let _ = fs::remove_dir_all(&dir);
1165    }
1166
1167    // --- Extended corruption-policy tests ---
1168
1169    #[test]
1170    fn test_corruption_policy_refuse_start_inner() {
1171        let dir = test_wal_dir("corruption_refuse_start_inner");
1172        let seg_path = write_entries(&dir, 5);
1173        corrupt_entry_n(&seg_path, 3);
1174
1175        let reader = WalReader::new(&dir);
1176        let result = reader.recover_with_policy(CorruptionPolicy::RefuseStart);
1177        assert!(
1178            result.is_err(),
1179            "RefuseStart should return error on corruption"
1180        );
1181
1182        let _ = fs::remove_dir_all(&dir);
1183    }
1184
1185    #[test]
1186    fn test_corruption_policy_truncate_to_last_good() {
1187        let dir = test_wal_dir("corruption_truncate_last_good");
1188        let seg_path = write_entries(&dir, 5);
1189        // Corrupt entry 3 (entries 1,2 should survive)
1190        corrupt_entry_n(&seg_path, 3);
1191
1192        let reader = WalReader::new(&dir);
1193        let (entries, diag) = reader
1194            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
1195            .expect("recover");
1196
1197        assert_eq!(entries.len(), 2, "should keep entries before corruption");
1198        assert_eq!(diag.valid_entries, 2);
1199        assert_eq!(diag.corrupt_entries, 1);
1200        assert!(diag.recovered_bytes > 0);
1201
1202        let _ = fs::remove_dir_all(&dir);
1203    }
1204
1205    #[test]
1206    fn test_corruption_policy_alert_and_continue() {
1207        let dir = test_wal_dir("corruption_alert_continue");
1208        let seg_path = write_entries(&dir, 5);
1209        // Corrupt entry 2 — entries 1, 3, 4, 5 should survive
1210        corrupt_entry_n(&seg_path, 2);
1211
1212        let reader = WalReader::new(&dir);
1213        let (entries, diag) = reader
1214            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
1215            .expect("recover");
1216
1217        assert_eq!(entries.len(), 4, "should skip only the corrupted entry");
1218        assert_eq!(diag.corrupt_entries, 1);
1219        assert_eq!(diag.valid_entries, 4);
1220
1221        // Verify the indices are 1, 3, 4, 5
1222        let indices: Vec<u64> = entries.iter().map(|e| e.index).collect();
1223        assert_eq!(indices, vec![1, 3, 4, 5]);
1224
1225        let _ = fs::remove_dir_all(&dir);
1226    }
1227
1228    #[test]
1229    fn test_corruption_policy_first_entry() {
1230        let dir = test_wal_dir("corruption_first");
1231        let seg_path = write_entries(&dir, 5);
1232        corrupt_entry_n(&seg_path, 1);
1233
1234        let reader = WalReader::new(&dir);
1235
1236        // AlertAndContinue: entries 2..5 survive
1237        let (entries, diag) = reader
1238            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
1239            .expect("recover");
1240        assert_eq!(entries.len(), 4);
1241        assert_eq!(diag.corrupt_entries, 1);
1242
1243        // TruncateToLastGood: nothing survives
1244        let (entries, diag) = reader
1245            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
1246            .expect("recover");
1247        assert_eq!(entries.len(), 0);
1248        assert_eq!(diag.corrupt_entries, 1);
1249
1250        let _ = fs::remove_dir_all(&dir);
1251    }
1252
1253    #[test]
1254    fn test_corruption_policy_last_entry() {
1255        let dir = test_wal_dir("corruption_last");
1256        let seg_path = write_entries(&dir, 5);
1257        corrupt_entry_n(&seg_path, 5);
1258
1259        let reader = WalReader::new(&dir);
1260
1261        // TruncateToLastGood: entries 1..4 survive
1262        let (entries, diag) = reader
1263            .recover_with_policy(CorruptionPolicy::TruncateToLastGood)
1264            .expect("recover");
1265        assert_eq!(entries.len(), 4);
1266        assert_eq!(diag.corrupt_entries, 1);
1267
1268        // AlertAndContinue: entries 1..4 survive
1269        let (entries, diag) = reader
1270            .recover_with_policy(CorruptionPolicy::AlertAndContinue)
1271            .expect("recover");
1272        assert_eq!(entries.len(), 4);
1273        assert_eq!(diag.corrupt_entries, 1);
1274
1275        let _ = fs::remove_dir_all(&dir);
1276    }
1277
1278    #[test]
1279    fn test_corruption_diagnostics_no_corruption() {
1280        let dir = test_wal_dir("diag_clean");
1281        write_entries(&dir, 10);
1282
1283        let reader = WalReader::new(&dir);
1284        let (entries, diag) = reader
1285            .recover_with_policy(CorruptionPolicy::RefuseStart)
1286            .expect("recover");
1287        assert_eq!(entries.len(), 10);
1288        assert_eq!(diag.valid_entries, 10);
1289        assert_eq!(diag.corrupt_entries, 0);
1290        assert_eq!(diag.truncated_segments, 0);
1291        assert!(diag.recovered_bytes > 0);
1292
1293        let _ = fs::remove_dir_all(&dir);
1294    }
1295
1296    #[test]
1297    fn test_corruption_recover_backward_compat() {
1298        // The old recover() should still work and use TruncateToLastGood
1299        let dir = test_wal_dir("corruption_compat");
1300        let seg_path = write_entries(&dir, 5);
1301        corrupt_entry_n(&seg_path, 3);
1302
1303        let reader = WalReader::new(&dir);
1304        let entries = reader.recover().expect("recover");
1305        // TruncateToLastGood: entries 1, 2
1306        assert_eq!(entries.len(), 2);
1307
1308        let _ = fs::remove_dir_all(&dir);
1309    }
1310
1311    #[test]
1312    fn test_wal_sync_modes() {
1313        // EveryWrite
1314        {
1315            let dir = test_wal_dir("sync_every");
1316            let mut writer = WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE)
1317                .expect("writer");
1318            for i in 1..=5 {
1319                writer.append(&make_entry(1, i, "a")).expect("append");
1320            }
1321            let reader = WalReader::new(&dir);
1322            assert_eq!(reader.read_all().expect("read").len(), 5);
1323            let _ = fs::remove_dir_all(&dir);
1324        }
1325
1326        // OsManaged
1327        {
1328            let dir = test_wal_dir("sync_os");
1329            let mut writer = WalWriter::new(&dir, SyncMode::OsManaged, DEFAULT_MAX_SEGMENT_SIZE)
1330                .expect("writer");
1331            for i in 1..=5 {
1332                writer.append(&make_entry(1, i, "b")).expect("append");
1333            }
1334            writer.sync().expect("manual sync");
1335            let reader = WalReader::new(&dir);
1336            assert_eq!(reader.read_all().expect("read").len(), 5);
1337            let _ = fs::remove_dir_all(&dir);
1338        }
1339
1340        // Batched
1341        {
1342            let dir = test_wal_dir("sync_batched");
1343            let mut writer = WalWriter::new(&dir, SyncMode::Batched(3), DEFAULT_MAX_SEGMENT_SIZE)
1344                .expect("writer");
1345            for i in 1..=7 {
1346                writer.append(&make_entry(1, i, "c")).expect("append");
1347            }
1348            writer.sync().expect("final sync");
1349            let reader = WalReader::new(&dir);
1350            assert_eq!(reader.read_all().expect("read").len(), 7);
1351            let _ = fs::remove_dir_all(&dir);
1352        }
1353    }
1354
1355    // -----------------------------------------------------------------------
1356    // B3 – Fencing token WAL v2 tests
1357    // -----------------------------------------------------------------------
1358
1359    /// Write an entry carrying a specific fencing token, then read it back.
1360    #[test]
1361    fn test_wal_v2_fencing_token_roundtrip() {
1362        use crate::log::Command;
1363        let dir = test_wal_dir("v2_token_roundtrip");
1364        let mut writer =
1365            WalWriter::new(&dir, SyncMode::EveryWrite, DEFAULT_MAX_SEGMENT_SIZE).expect("writer");
1366
1367        let token_raw: u64 = ((3u64) << 32) | 7u64; // term=3, seq=7
1368        let entry = LogEntry::with_fencing_token(1, 1, Command::new(b"hello".to_vec()), token_raw);
1369        writer.append(&entry).expect("append");
1370
1371        let reader = WalReader::new(&dir);
1372        let entries = reader.read_all().expect("read_all");
1373        assert_eq!(entries.len(), 1);
1374        assert_eq!(entries[0].fencing_token, token_raw);
1375        assert_eq!(entries[0].command.data, b"hello");
1376
1377        let _ = fs::remove_dir_all(&dir);
1378    }
1379
1380    /// WAL v1 compat: hand-craft a v1 segment (no token) and verify it loads.
1381    #[test]
1382    fn test_wal_v1_backward_compat_read() {
1383        let dir = test_wal_dir("v1_compat");
1384        fs::create_dir_all(&dir).expect("mkdir");
1385        let seg_path = dir.join("wal-00000000.seg");
1386
1387        // Build a minimal v1 segment
1388        let mut buf: Vec<u8> = Vec::new();
1389
1390        // Segment header: magic=WAL1, version=1, segment_id=0
1391        buf.extend_from_slice(&WAL_MAGIC_V1.to_le_bytes()); // magic
1392        buf.extend_from_slice(&WAL_VERSION_V1.to_le_bytes()); // version
1393        buf.extend_from_slice(&0u32.to_le_bytes()); // segment_id
1394
1395        // One v1 entry: [entry_len:4][term:8][index:8][cmd_len:4][cmd:N][crc32:4]
1396        let cmd = b"v1cmd";
1397        let term: u64 = 1;
1398        let index: u64 = 1;
1399
1400        let mut payload: Vec<u8> = Vec::new();
1401        payload.extend_from_slice(&term.to_le_bytes());
1402        payload.extend_from_slice(&index.to_le_bytes());
1403        payload.extend_from_slice(&(cmd.len() as u32).to_le_bytes());
1404        payload.extend_from_slice(cmd);
1405        let crc = crc32fast::hash(&payload);
1406
1407        let entry_len = (payload.len() + 4) as u32; // +4 for CRC
1408        buf.extend_from_slice(&entry_len.to_le_bytes());
1409        buf.extend_from_slice(&payload);
1410        buf.extend_from_slice(&crc.to_le_bytes());
1411
1412        fs::write(&seg_path, &buf).expect("write v1 segment");
1413
1414        let reader = WalReader::new(&dir);
1415        let entries = reader.read_all().expect("read v1 segment");
1416        assert_eq!(entries.len(), 1);
1417        assert_eq!(entries[0].term, 1);
1418        assert_eq!(entries[0].index, 1);
1419        assert_eq!(entries[0].command.data, b"v1cmd");
1420        // Token should be 0 for v1 entries
1421        assert_eq!(entries[0].fencing_token, 0);
1422
1423        let _ = fs::remove_dir_all(&dir);
1424    }
1425}