Skip to main content

hotmint_storage/
wal.rs

1use std::fs::{File, OpenOptions};
2use std::io::{self, Read, Seek, Write};
3use std::path::{Path, PathBuf};
4
5use hotmint_types::Height;
6use serde::{Deserialize, Serialize};
7use tracing::warn;
8
9/// WAL (Write-Ahead Log) entry types.
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11enum WalEntry {
12    /// Logged before starting a commit batch. If the node crashes after this
13    /// entry but before `CommitDone`, recovery knows a mid-commit crash occurred.
14    CommitIntent {
15        /// The target height (highest block being committed).
16        target_height: Height,
17    },
18    /// Logged after commit + persist_state succeeds.
19    CommitDone { target_height: Height },
20}
21
/// File name of the WAL inside the node's data directory.
const WAL_FILE: &str = "consensus.wal";
/// Marker written in front of every entry so corruption can be detected.
/// The bytes spell ASCII "WAL1" (0x57 0x41 0x4C 0x31).
const ENTRY_MAGIC: [u8; 4] = *b"WAL1";
25
/// Write-Ahead Log for consensus commit operations.
///
/// A commit-intent record is written before blocks are executed and a
/// commit-done record after state has been persisted. If, on restart, the
/// last readable record is a commit intent with no matching commit-done, the
/// node crashed mid-commit and can re-execute from the block store.
#[derive(Debug)]
pub struct ConsensusWal {
    /// Location of the WAL file; kept for diagnostics, not read by the WAL
    /// logic itself.
    _path: PathBuf,
    /// Open read/write handle whose cursor marks where the next entry goes.
    file: File,
}
36
37/// Result of WAL recovery check on startup.
38#[derive(Debug, Clone, PartialEq)]
39pub enum WalRecovery {
40    /// No recovery needed — last commit completed successfully or WAL is empty.
41    Clean,
42    /// A commit was in progress when the node crashed. The application should
43    /// re-execute blocks from `last_committed_height + 1` to `target_height`.
44    NeedsReplay { target_height: Height },
45}
46
47impl ConsensusWal {
48    /// Open or create the WAL file in the given data directory.
49    pub fn open(data_dir: &Path) -> io::Result<Self> {
50        let path = data_dir.join(WAL_FILE);
51        let mut file = OpenOptions::new()
52            .create(true)
53            .truncate(false)
54            .write(true)
55            .read(true)
56            .open(&path)?;
57        // Seek to end so new entries append after any existing content.
58        // Unlike `.append(true)`, this allows truncate+seek(0) to work
59        // correctly — subsequent writes go to position 0, not the old EOF.
60        file.seek(io::SeekFrom::End(0))?;
61        Ok(Self { _path: path, file })
62    }
63
64    /// Check the WAL for incomplete commits (called on startup).
65    pub fn check_recovery(data_dir: &Path) -> io::Result<WalRecovery> {
66        let path = data_dir.join(WAL_FILE);
67        if !path.exists() {
68            return Ok(WalRecovery::Clean);
69        }
70
71        let mut file = File::open(&path)?;
72        let mut buf = Vec::new();
73        file.read_to_end(&mut buf)?;
74
75        let mut last_entry = None;
76        let mut offset = 0;
77
78        while offset < buf.len() {
79            match decode_entry(&buf[offset..]) {
80                Some((entry, consumed)) => {
81                    last_entry = Some(entry);
82                    offset += consumed;
83                }
84                None => break, // corrupt or truncated tail
85            }
86        }
87
88        match last_entry {
89            Some(WalEntry::CommitIntent { target_height }) => {
90                warn!(
91                    target_height = target_height.as_u64(),
92                    "WAL: incomplete commit detected, replay needed"
93                );
94                Ok(WalRecovery::NeedsReplay { target_height })
95            }
96            _ => Ok(WalRecovery::Clean),
97        }
98    }
99
100    /// Record that a commit batch is about to start.
101    pub fn log_commit_intent(&mut self, target_height: Height) -> io::Result<()> {
102        let entry = WalEntry::CommitIntent { target_height };
103        self.write_entry(&entry)?;
104        self.file.sync_all()
105    }
106
107    /// Record that a commit batch completed successfully.
108    pub fn log_commit_done(&mut self, target_height: Height) -> io::Result<()> {
109        let entry = WalEntry::CommitDone { target_height };
110        self.write_entry(&entry)?;
111        // Truncate the WAL after a successful commit to keep it small.
112        // The WAL only needs to survive between intent and done.
113        self.file.sync_all()?;
114        self.truncate()
115    }
116
117    /// Truncate the WAL file (called after successful commit).
118    fn truncate(&mut self) -> io::Result<()> {
119        self.file.set_len(0)?;
120        self.file.seek(io::SeekFrom::Start(0))?;
121        self.file.sync_all()
122    }
123
124    fn write_entry(&mut self, entry: &WalEntry) -> io::Result<()> {
125        let payload = postcard::to_allocvec(entry).map_err(io::Error::other)?;
126        let len = payload.len() as u32;
127        self.file.write_all(&ENTRY_MAGIC)?;
128        self.file.write_all(&len.to_le_bytes())?;
129        self.file.write_all(&payload)?;
130        Ok(())
131    }
132}
133
134/// Decode one WAL entry from a byte slice. Returns the entry and bytes consumed.
135fn decode_entry(buf: &[u8]) -> Option<(WalEntry, usize)> {
136    if buf.len() < 8 {
137        return None;
138    }
139    if buf[..4] != ENTRY_MAGIC {
140        return None;
141    }
142    let len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
143    if buf.len() < 8 + len {
144        return None;
145    }
146    let entry: WalEntry = postcard::from_bytes(&buf[8..8 + len]).ok()?;
147    Some((entry, 8 + len))
148}
149
150/// The WAL struct implements `hotmint_consensus::Wal`.
151impl hotmint_consensus::Wal for ConsensusWal {
152    fn log_commit_intent(&mut self, target_height: Height) -> io::Result<()> {
153        self.log_commit_intent(target_height)
154    }
155    fn log_commit_done(&mut self, target_height: Height) -> io::Result<()> {
156        self.log_commit_done(target_height)
157    }
158}
159
/// No-op WAL for testing or when WAL is disabled.
///
/// Carries no state, so it is cheap to copy and can be built with
/// `NoopWal` or `NoopWal::default()`.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopWal;
162
163impl hotmint_consensus::Wal for NoopWal {
164    fn log_commit_intent(&mut self, _target_height: Height) -> io::Result<()> {
165        Ok(())
166    }
167    fn log_commit_done(&mut self, _target_height: Height) -> io::Result<()> {
168        Ok(())
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a scratch data directory with a WAL opened inside it. The
    /// directory guard is returned so it outlives the WAL handle.
    fn scratch_wal() -> (tempfile::TempDir, ConsensusWal) {
        let data_dir = tempfile::tempdir().unwrap();
        let wal = ConsensusWal::open(data_dir.path()).unwrap();
        (data_dir, wal)
    }

    /// With no WAL file present at all, recovery reports a clean state.
    #[test]
    fn wal_clean_when_empty() {
        let data_dir = tempfile::tempdir().unwrap();
        assert_eq!(
            ConsensusWal::check_recovery(data_dir.path()).unwrap(),
            WalRecovery::Clean
        );
    }

    /// An intent followed by its done marker leaves nothing to replay.
    #[test]
    fn wal_clean_after_done() {
        let (data_dir, mut wal) = scratch_wal();
        let height = Height(5);
        wal.log_commit_intent(height).unwrap();
        wal.log_commit_done(height).unwrap();

        assert_eq!(
            ConsensusWal::check_recovery(data_dir.path()).unwrap(),
            WalRecovery::Clean
        );
    }

    /// An intent without a done marker must surface as a replay request.
    #[test]
    fn wal_needs_replay_after_intent() {
        let (data_dir, mut wal) = scratch_wal();
        wal.log_commit_intent(Height(10)).unwrap();
        // Simulate crash — no log_commit_done
        drop(wal);

        let expected = WalRecovery::NeedsReplay {
            target_height: Height(10),
        };
        assert_eq!(
            ConsensusWal::check_recovery(data_dir.path()).unwrap(),
            expected
        );
    }

    /// After several complete commit cycles the file is back to zero bytes.
    #[test]
    fn wal_truncated_after_done() {
        let (data_dir, mut wal) = scratch_wal();

        // Multiple commit cycles
        for height in (1..=5).map(Height) {
            wal.log_commit_intent(height).unwrap();
            wal.log_commit_done(height).unwrap();
        }

        // WAL should be truncated/clean
        let wal_path = data_dir.path().join(WAL_FILE);
        let on_disk_len = std::fs::metadata(&wal_path).unwrap().len();
        assert_eq!(
            on_disk_len, 0,
            "WAL should be truncated after successful commits"
        );
    }
}
227}