Skip to main content

ember_cluster/
raft_log.rs

1//! Persistent storage for Raft state.
2//!
3//! Provides disk-backed persistence for vote metadata, log entries, and
4//! snapshots so that a cluster node can recover its Raft state after a
5//! restart without re-bootstrapping.
6//!
7//! # File layout
8//!
9//! Three files live inside a `raft/` directory under the configured data dir:
10//!
11//! | file | purpose | write pattern |
12//! |------|---------|---------------|
13//! | `raft-meta` | vote + last_purged LogId | atomic rewrite |
14//! | `raft-log` | append-only log entries | append + fsync |
15//! | `raft-snapshot` | latest snapshot blob | atomic rewrite |
16//!
17//! Each file starts with a 5-byte header (`[magic:4B][version:1B]`) followed
18//! by length-prefixed, CRC32-checksummed records encoded with postcard.
19
20use std::collections::BTreeMap;
21use std::fs::{self, File, OpenOptions};
22use std::io::{self, BufReader, BufWriter, Read, Write};
23use std::path::{Path, PathBuf};
24
25use crc32fast::Hasher;
26use openraft::{Entry, LogId, SnapshotMeta, Vote};
27use serde::{Deserialize, Serialize};
28use thiserror::Error;
29use tracing::{debug, warn};
30
31use crate::raft::TypeConfig;
32
33// ---------------------------------------------------------------------------
34// constants
35// ---------------------------------------------------------------------------
36
37const META_MAGIC: &[u8; 4] = b"ERMT";
38const LOG_MAGIC: &[u8; 4] = b"ERLO";
39const SNAP_MAGIC: &[u8; 4] = b"ERSS";
40const FORMAT_VERSION: u8 = 1;
41
42/// Maximum record payload size (64 MB). Prevents corrupt length fields from
43/// causing unbounded allocations.
44const MAX_RECORD_SIZE: u32 = 64 * 1024 * 1024;
45
46// ---------------------------------------------------------------------------
47// error type
48// ---------------------------------------------------------------------------
49
/// Errors produced while reading or writing the on-disk Raft files.
#[derive(Debug, Error)]
pub enum RaftDiskError {
    /// An underlying I/O operation failed.
    #[error("io error: {0}")]
    Io(#[from] io::Error),

    /// The file's 4-byte magic was missing (file ended early) or did not
    /// match the magic expected for `file`.
    #[error("invalid magic bytes in {file}")]
    InvalidMagic { file: &'static str },

    /// The header's version byte differs from `FORMAT_VERSION`.
    #[error("unsupported format version {version} in {file}")]
    UnsupportedVersion { file: &'static str, version: u8 },

    /// A record's payload does not hash to the CRC stored after it.
    /// `expected` is the stored checksum, `actual` the recomputed one.
    #[error("crc32 mismatch (expected {expected:#010x}, got {actual:#010x})")]
    ChecksumMismatch { expected: u32, actual: u32 },

    /// postcard failed to encode or decode a payload; carries the
    /// stringified postcard error.
    #[error("postcard error: {0}")]
    Postcard(String),

    /// A record's length prefix exceeds `MAX_RECORD_SIZE`.
    #[error("record payload too large ({size} bytes, max {MAX_RECORD_SIZE})")]
    RecordTooLarge { size: u32 },
}
70
71impl From<RaftDiskError> for openraft::StorageError<u64> {
72    fn from(e: RaftDiskError) -> Self {
73        openraft::StorageIOError::write(&e).into()
74    }
75}
76
77// ---------------------------------------------------------------------------
78// on-disk record types
79// ---------------------------------------------------------------------------
80
/// Metadata persisted in the `raft-meta` file.
///
/// The whole record is rewritten on every update (see `write_meta_file`).
// NOTE(review): postcard presumably encodes fields positionally, so
// reordering or inserting fields would change the on-disk format — confirm
// before editing this struct.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct MetaRecord {
    /// Last persisted vote, if any.
    vote: Option<Vote<u64>>,
    /// Last purged log ID, if any.
    last_purged: Option<LogId<u64>>,
}
87
/// Snapshot envelope persisted in the `raft-snapshot` file.
///
/// Stored as a single record after the file header.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotRecord {
    /// openraft metadata describing the snapshot.
    meta: SnapshotMeta<u64, openraft::BasicNode>,
    /// Snapshot bytes exactly as handed to `RaftDisk::write_snapshot`.
    data: Vec<u8>,
}
94
95// ---------------------------------------------------------------------------
96// recovered state (returned to Storage::open)
97// ---------------------------------------------------------------------------
98
/// Everything recovered from disk, ready to populate the in-memory Storage.
///
/// Produced by `RaftDisk::open`.
#[derive(Debug, Default)]
pub(crate) struct RecoveredState {
    /// Vote read from `raft-meta`.
    pub vote: Option<Vote<u64>>,
    /// Last purged log ID read from `raft-meta`.
    pub last_purged: Option<LogId<u64>>,
    /// Entries read from `raft-log`, keyed by `entry.log_id.index`.
    pub log: BTreeMap<u64, Entry<TypeConfig>>,
    /// Snapshot metadata and bytes; `None` when the snapshot file is
    /// missing or could not be read.
    pub snapshot: Option<(SnapshotMeta<u64, openraft::BasicNode>, Vec<u8>)>,
}
107
108// ---------------------------------------------------------------------------
109// RaftDisk
110// ---------------------------------------------------------------------------
111
/// Disk-backed persistence for Raft state.
///
/// All writes go through `std::sync::Mutex` in the caller (Storage).
/// Methods here are intentionally synchronous — they do blocking I/O
/// but are only called on the Raft consensus path, not per client request.
pub(crate) struct RaftDisk {
    /// Directory that holds `raft-meta`, `raft-log` and `raft-snapshot`.
    dir: PathBuf,
    /// Handle on `raft-log`, opened in append mode.
    log_file: File,
}
121
122impl std::fmt::Debug for RaftDisk {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_struct("RaftDisk").field("dir", &self.dir).finish()
125    }
126}
127
128impl RaftDisk {
129    /// Opens (or creates) the raft persistence directory and recovers state.
130    ///
131    /// On a fresh start all three files are created with just their headers.
132    /// On recovery, existing files are read and any trailing corruption in
133    /// the log file is silently truncated (incomplete last write).
134    pub fn open(dir: &Path) -> Result<(Self, RecoveredState), RaftDiskError> {
135        fs::create_dir_all(dir)?;
136
137        let meta_path = dir.join("raft-meta");
138        let log_path = dir.join("raft-log");
139        let snap_path = dir.join("raft-snapshot");
140
141        // recover meta
142        let meta = if meta_path.exists() {
143            read_meta_file(&meta_path)?
144        } else {
145            write_meta_file(&meta_path, &MetaRecord::default())?;
146            MetaRecord::default()
147        };
148
149        // recover snapshot
150        let snapshot = if snap_path.exists() {
151            match read_snapshot_file(&snap_path) {
152                Ok(rec) => Some((rec.meta, rec.data)),
153                Err(e) => {
154                    warn!("raft snapshot file corrupt, ignoring: {e}");
155                    None
156                }
157            }
158        } else {
159            None
160        };
161
162        // recover log entries
163        let (log, valid_len) = if log_path.exists() {
164            recover_log_file(&log_path)?
165        } else {
166            (BTreeMap::new(), 0)
167        };
168
169        // open log file for appending (truncate any trailing corruption)
170        let log_file = if log_path.exists() {
171            let file = OpenOptions::new().write(true).open(&log_path)?;
172            file.set_len(valid_len)?;
173            file.sync_all()?;
174            // reopen in append mode
175            OpenOptions::new().append(true).open(&log_path)?
176        } else {
177            let mut file = File::create(&log_path)?;
178            write_header(&mut file, LOG_MAGIC)?;
179            file.sync_all()?;
180            OpenOptions::new().append(true).open(&log_path)?
181        };
182
183        let recovered = RecoveredState {
184            vote: meta.vote,
185            last_purged: meta.last_purged,
186            log,
187            snapshot,
188        };
189
190        debug!(
191            "raft disk recovered: vote={:?}, last_purged={:?}, log_entries={}, has_snapshot={}",
192            recovered.vote,
193            recovered.last_purged,
194            recovered.log.len(),
195            recovered.snapshot.is_some(),
196        );
197
198        Ok((
199            Self {
200                dir: dir.to_path_buf(),
201                log_file,
202            },
203            recovered,
204        ))
205    }
206
207    /// Persists the current vote and last_purged log ID.
208    pub fn write_meta(
209        &self,
210        vote: Option<Vote<u64>>,
211        last_purged: Option<LogId<u64>>,
212    ) -> Result<(), RaftDiskError> {
213        let record = MetaRecord { vote, last_purged };
214        write_meta_file(&self.dir.join("raft-meta"), &record)
215    }
216
217    /// Appends log entries to the log file.
218    pub fn append_entries(&mut self, entries: &[Entry<TypeConfig>]) -> Result<(), RaftDiskError> {
219        for entry in entries {
220            write_record(&mut self.log_file, entry)?;
221        }
222        self.log_file.flush()?;
223        Ok(())
224    }
225
226    /// Rewrites the log file with only the given entries.
227    ///
228    /// Used after purge or truncation to compact the on-disk log.
229    pub fn rewrite_log(
230        &mut self,
231        entries: &BTreeMap<u64, Entry<TypeConfig>>,
232    ) -> Result<(), RaftDiskError> {
233        let log_path = self.dir.join("raft-log");
234        let tmp_path = self.dir.join("raft-log.tmp");
235
236        let mut file = File::create(&tmp_path)?;
237        write_header(&mut file, LOG_MAGIC)?;
238        for entry in entries.values() {
239            write_record(&mut file, entry)?;
240        }
241        file.flush()?;
242        file.sync_all()?;
243
244        fs::rename(&tmp_path, &log_path)?;
245
246        // reopen for appending
247        self.log_file = OpenOptions::new().append(true).open(&log_path)?;
248        Ok(())
249    }
250
251    /// Atomically writes the latest snapshot to disk.
252    pub fn write_snapshot(
253        &self,
254        meta: &SnapshotMeta<u64, openraft::BasicNode>,
255        data: &[u8],
256    ) -> Result<(), RaftDiskError> {
257        let snap_path = self.dir.join("raft-snapshot");
258        let tmp_path = self.dir.join("raft-snapshot.tmp");
259
260        let record = SnapshotRecord {
261            meta: meta.clone(),
262            data: data.to_vec(),
263        };
264
265        let mut file = File::create(&tmp_path)?;
266        write_header(&mut file, SNAP_MAGIC)?;
267        write_record(&mut file, &record)?;
268        file.flush()?;
269        file.sync_all()?;
270
271        fs::rename(&tmp_path, &snap_path)?;
272        Ok(())
273    }
274}
275
276// ---------------------------------------------------------------------------
277// binary format helpers
278// ---------------------------------------------------------------------------
279
280fn crc32(data: &[u8]) -> u32 {
281    let mut h = Hasher::new();
282    h.update(data);
283    h.finalize()
284}
285
286fn write_header(w: &mut impl Write, magic: &[u8; 4]) -> Result<(), RaftDiskError> {
287    w.write_all(magic)?;
288    w.write_all(&[FORMAT_VERSION])?;
289    Ok(())
290}
291
292fn read_header(
293    r: &mut impl Read,
294    expected_magic: &[u8; 4],
295    file_name: &'static str,
296) -> Result<(), RaftDiskError> {
297    let mut magic = [0u8; 4];
298    r.read_exact(&mut magic).map_err(|e| {
299        if e.kind() == io::ErrorKind::UnexpectedEof {
300            RaftDiskError::InvalidMagic { file: file_name }
301        } else {
302            RaftDiskError::Io(e)
303        }
304    })?;
305    if &magic != expected_magic {
306        return Err(RaftDiskError::InvalidMagic { file: file_name });
307    }
308    let mut ver = [0u8; 1];
309    r.read_exact(&mut ver)?;
310    if ver[0] != FORMAT_VERSION {
311        return Err(RaftDiskError::UnsupportedVersion {
312            file: file_name,
313            version: ver[0],
314        });
315    }
316    Ok(())
317}
318
319/// Writes a single record: `[payload_len:4B LE][payload:postcard][crc32:4B LE]`
320fn write_record<T: Serialize>(w: &mut impl Write, value: &T) -> Result<(), RaftDiskError> {
321    let payload =
322        postcard::to_allocvec(value).map_err(|e| RaftDiskError::Postcard(e.to_string()))?;
323    let len = payload.len() as u32;
324    w.write_all(&len.to_le_bytes())?;
325    w.write_all(&payload)?;
326    let checksum = crc32(&payload);
327    w.write_all(&checksum.to_le_bytes())?;
328    Ok(())
329}
330
331/// Reads a single record, returning the deserialized value.
332/// Returns `None` on clean EOF (no bytes available).
333fn read_record<T: for<'de> Deserialize<'de>>(
334    r: &mut impl Read,
335) -> Result<Option<T>, RaftDiskError> {
336    // try to read the length prefix — EOF here is clean
337    let mut len_buf = [0u8; 4];
338    match r.read_exact(&mut len_buf) {
339        Ok(()) => {}
340        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
341        Err(e) => return Err(RaftDiskError::Io(e)),
342    }
343    let len = u32::from_le_bytes(len_buf);
344    if len > MAX_RECORD_SIZE {
345        return Err(RaftDiskError::RecordTooLarge { size: len });
346    }
347
348    let mut payload = vec![0u8; len as usize];
349    r.read_exact(&mut payload)?;
350
351    let mut crc_buf = [0u8; 4];
352    r.read_exact(&mut crc_buf)?;
353    let stored_crc = u32::from_le_bytes(crc_buf);
354
355    let computed_crc = crc32(&payload);
356    if computed_crc != stored_crc {
357        return Err(RaftDiskError::ChecksumMismatch {
358            expected: stored_crc,
359            actual: computed_crc,
360        });
361    }
362
363    let value =
364        postcard::from_bytes(&payload).map_err(|e| RaftDiskError::Postcard(e.to_string()))?;
365    Ok(Some(value))
366}
367
368// ---------------------------------------------------------------------------
369// file-level read/write
370// ---------------------------------------------------------------------------
371
372fn write_meta_file(path: &Path, record: &MetaRecord) -> Result<(), RaftDiskError> {
373    let tmp_path = path.with_extension("tmp");
374    let mut file = BufWriter::new(File::create(&tmp_path)?);
375    write_header(&mut file, META_MAGIC)?;
376    write_record(&mut file, record)?;
377    file.flush()?;
378    file.into_inner()
379        .map_err(|e| RaftDiskError::Io(e.into_error()))?
380        .sync_all()?;
381    fs::rename(&tmp_path, path)?;
382    Ok(())
383}
384
385fn read_meta_file(path: &Path) -> Result<MetaRecord, RaftDiskError> {
386    let mut reader = BufReader::new(File::open(path)?);
387    read_header(&mut reader, META_MAGIC, "raft-meta")?;
388    match read_record(&mut reader)? {
389        Some(record) => Ok(record),
390        None => Ok(MetaRecord::default()),
391    }
392}
393
394fn read_snapshot_file(path: &Path) -> Result<SnapshotRecord, RaftDiskError> {
395    let mut reader = BufReader::new(File::open(path)?);
396    read_header(&mut reader, SNAP_MAGIC, "raft-snapshot")?;
397    match read_record(&mut reader)? {
398        Some(record) => Ok(record),
399        None => Err(RaftDiskError::Io(io::Error::new(
400            io::ErrorKind::UnexpectedEof,
401            "empty snapshot file",
402        ))),
403    }
404}
405
406/// Recovers log entries from the log file. Returns the entries and the byte
407/// offset of the last valid record (used to truncate trailing corruption).
408fn recover_log_file(path: &Path) -> Result<(BTreeMap<u64, Entry<TypeConfig>>, u64), RaftDiskError> {
409    let file = File::open(path)?;
410    let file_len = file.metadata()?.len();
411    let mut reader = BufReader::new(file);
412
413    // read header (5 bytes)
414    if file_len < 5 {
415        // file too short — treat as empty
416        return Ok((BTreeMap::new(), 0));
417    }
418    read_header(&mut reader, LOG_MAGIC, "raft-log")?;
419
420    let mut entries = BTreeMap::new();
421    let mut valid_pos: u64 = 5; // after header
422
423    loop {
424        // try reading the next record — EOF is clean
425        let remaining = file_len.saturating_sub(valid_pos);
426        if remaining == 0 {
427            break;
428        }
429
430        match read_record::<Entry<TypeConfig>>(&mut reader) {
431            Ok(Some(entry)) => {
432                let payload_bytes = postcard::to_allocvec(&entry)
433                    .map_err(|e| RaftDiskError::Postcard(e.to_string()))?
434                    .len() as u64;
435                // record size: 4 (len) + payload + 4 (crc)
436                valid_pos += 4 + payload_bytes + 4;
437                entries.insert(entry.log_id.index, entry);
438            }
439            Ok(None) => break,
440            Err(e) => {
441                // corrupt or incomplete record — truncate here
442                warn!("raft log truncated at offset {valid_pos}: {e}");
443                break;
444            }
445        }
446    }
447
448    Ok((entries, valid_pos))
449}
450
451// ---------------------------------------------------------------------------
452// tests
453// ---------------------------------------------------------------------------
454
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raft::ClusterCommand;
    use crate::topology::NodeId;
    use openraft::{CommittedLeaderId, EntryPayload, StoredMembership};

    /// Builds a log ID for `term`/`index` with leader node 0.
    fn log_id(term: u64, index: u64) -> LogId<u64> {
        LogId::new(CommittedLeaderId::new(term, 0), index)
    }

    /// Builds a blank entry at `term`/`index`.
    fn test_entry(term: u64, index: u64) -> Entry<TypeConfig> {
        Entry {
            log_id: log_id(term, index),
            payload: EntryPayload::Blank,
        }
    }

    /// Builds an entry carrying an `AddNode` command, so the payload is
    /// larger than a blank entry's.
    fn test_entry_with_data(term: u64, index: u64) -> Entry<TypeConfig> {
        Entry {
            log_id: log_id(term, index),
            payload: EntryPayload::Normal(ClusterCommand::AddNode {
                node_id: NodeId::new(),
                raft_id: index,
                addr: "127.0.0.1:6379".to_string(),
                is_primary: true,
            }),
        }
    }

    /// A populated meta record survives a write/read cycle.
    #[test]
    fn meta_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("raft-meta");

        let record = MetaRecord {
            vote: Some(Vote::new(1, 2)),
            last_purged: Some(log_id(1, 5)),
        };
        write_meta_file(&path, &record).unwrap();

        let recovered = read_meta_file(&path).unwrap();
        assert_eq!(recovered.vote, record.vote);
        assert_eq!(recovered.last_purged, record.last_purged);
    }

    /// A default (all-`None`) meta record survives a write/read cycle.
    #[test]
    fn meta_default_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("raft-meta");

        write_meta_file(&path, &MetaRecord::default()).unwrap();
        let recovered = read_meta_file(&path).unwrap();
        assert!(recovered.vote.is_none());
        assert!(recovered.last_purged.is_none());
    }

    /// Appended entries are visible after reopening the directory.
    #[test]
    fn log_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let (mut disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert!(recovered.log.is_empty());

        let entries = vec![
            test_entry(1, 1),
            test_entry(1, 2),
            test_entry_with_data(1, 3),
        ];
        disk.append_entries(&entries).unwrap();

        // reopen and recover
        let (_disk2, recovered2) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered2.log.len(), 3);
        assert!(recovered2.log.contains_key(&1));
        assert!(recovered2.log.contains_key(&2));
        assert!(recovered2.log.contains_key(&3));
    }

    /// Garbage after the last valid record does not prevent recovery of the
    /// records before it.
    #[test]
    fn log_truncation_on_corruption() {
        let dir = tempfile::tempdir().unwrap();
        let log_path = dir.path().join("raft-log");

        // write two valid entries
        {
            let (mut disk, _) = RaftDisk::open(dir.path()).unwrap();
            disk.append_entries(&[test_entry(1, 1), test_entry(1, 2)])
                .unwrap();
        }

        // append garbage to the log file
        {
            let mut file = OpenOptions::new().append(true).open(&log_path).unwrap();
            file.write_all(b"GARBAGE_BYTES").unwrap();
        }

        // recover — should get the two valid entries and truncate the garbage
        let (_disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.log.len(), 2);
    }

    /// After `rewrite_log`, only the entries passed to it are recovered.
    #[test]
    fn log_rewrite() {
        let dir = tempfile::tempdir().unwrap();
        let (mut disk, _) = RaftDisk::open(dir.path()).unwrap();

        disk.append_entries(&[test_entry(1, 1), test_entry(1, 2), test_entry(1, 3)])
            .unwrap();

        // rewrite with only entry 3
        let mut remaining = BTreeMap::new();
        remaining.insert(3, test_entry(1, 3));
        disk.rewrite_log(&remaining).unwrap();

        // recover
        let (_disk2, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.log.len(), 1);
        assert!(recovered.log.contains_key(&3));
    }

    /// A written snapshot is returned by the next `open`.
    #[test]
    fn snapshot_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let (disk, _) = RaftDisk::open(dir.path()).unwrap();

        let meta = SnapshotMeta {
            last_log_id: Some(log_id(1, 5)),
            last_membership: StoredMembership::default(),
            snapshot_id: "1-5".to_string(),
        };
        let data = b"snapshot-payload-here".to_vec();
        disk.write_snapshot(&meta, &data).unwrap();

        // reopen and recover
        let (_disk2, recovered) = RaftDisk::open(dir.path()).unwrap();
        let (rec_meta, rec_data) = recovered.snapshot.unwrap();
        assert_eq!(rec_meta.last_log_id, meta.last_log_id);
        assert_eq!(rec_data, data);
    }

    /// `RaftDisk::write_meta` persists both the vote and the purge marker.
    #[test]
    fn write_meta_persists_vote() {
        let dir = tempfile::tempdir().unwrap();
        let (disk, _) = RaftDisk::open(dir.path()).unwrap();

        let vote = Vote::new(2, 1);
        let purged = log_id(1, 10);
        disk.write_meta(Some(vote), Some(purged)).unwrap();

        // reopen
        let (_disk2, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.vote, Some(vote));
        assert_eq!(recovered.last_purged, Some(purged));
    }

    /// Opening a directory that does not exist yet creates it along with the
    /// meta and log files, and recovers empty state.
    #[test]
    fn fresh_directory_creates_files() {
        let dir = tempfile::tempdir().unwrap();
        let raft_dir = dir.path().join("raft");

        let (_disk, recovered) = RaftDisk::open(&raft_dir).unwrap();
        assert!(recovered.log.is_empty());
        assert!(recovered.vote.is_none());
        assert!(recovered.snapshot.is_none());

        assert!(raft_dir.join("raft-meta").exists());
        assert!(raft_dir.join("raft-log").exists());
    }

    /// A meta file with the wrong magic makes `open` fail rather than
    /// falling back to defaults.
    #[test]
    fn corrupt_meta_magic_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("raft-meta");

        // write garbage
        fs::write(&meta_path, b"JUNK").unwrap();

        let result = RaftDisk::open(dir.path());
        assert!(result.is_err());
    }

    /// A flipped payload byte is caught during recovery and the damaged
    /// record is dropped.
    #[test]
    fn record_crc_mismatch_detected() {
        let dir = tempfile::tempdir().unwrap();

        // write one valid entry then corrupt its CRC
        {
            let (mut disk, _) = RaftDisk::open(dir.path()).unwrap();
            disk.append_entries(&[test_entry(1, 1)]).unwrap();
        }

        // flip a byte in the log file payload
        let log_path = dir.path().join("raft-log");
        let mut data = fs::read(&log_path).unwrap();
        // payload starts at offset 5 (header) + 4 (length) = 9
        if data.len() > 10 {
            data[10] ^= 0xFF;
        }
        fs::write(&log_path, &data).unwrap();

        // recover — corrupt entry should be truncated
        let (_disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert!(recovered.log.is_empty());
    }
}
659}