1use std::fs::{File, OpenOptions};
2use std::io::{self, Read, Seek, Write};
3use std::path::{Path, PathBuf};
4
5use hotmint_types::Height;
6use serde::{Deserialize, Serialize};
7use tracing::warn;
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11enum WalEntry {
12 CommitIntent {
15 target_height: Height,
17 },
18 CommitDone { target_height: Height },
20}
21
22const WAL_FILE: &str = "consensus.wal";
23const ENTRY_MAGIC: [u8; 4] = [0x57, 0x41, 0x4C, 0x31]; pub struct ConsensusWal {
33 _path: PathBuf,
34 file: File,
35}
36
37#[derive(Debug, Clone, PartialEq)]
39pub enum WalRecovery {
40 Clean,
42 NeedsReplay { target_height: Height },
45}
46
47impl ConsensusWal {
48 pub fn open(data_dir: &Path) -> io::Result<Self> {
50 let path = data_dir.join(WAL_FILE);
51 let mut file = OpenOptions::new()
52 .create(true)
53 .truncate(false)
54 .write(true)
55 .read(true)
56 .open(&path)?;
57 file.seek(io::SeekFrom::End(0))?;
61 Ok(Self { _path: path, file })
62 }
63
64 pub fn check_recovery(data_dir: &Path) -> io::Result<WalRecovery> {
66 let path = data_dir.join(WAL_FILE);
67 if !path.exists() {
68 return Ok(WalRecovery::Clean);
69 }
70
71 let mut file = File::open(&path)?;
72 let mut buf = Vec::new();
73 file.read_to_end(&mut buf)?;
74
75 let mut last_entry = None;
76 let mut offset = 0;
77
78 while offset < buf.len() {
79 match decode_entry(&buf[offset..]) {
80 Some((entry, consumed)) => {
81 last_entry = Some(entry);
82 offset += consumed;
83 }
84 None => break, }
86 }
87
88 match last_entry {
89 Some(WalEntry::CommitIntent { target_height }) => {
90 warn!(
91 target_height = target_height.as_u64(),
92 "WAL: incomplete commit detected, replay needed"
93 );
94 Ok(WalRecovery::NeedsReplay { target_height })
95 }
96 _ => Ok(WalRecovery::Clean),
97 }
98 }
99
100 pub fn log_commit_intent(&mut self, target_height: Height) -> io::Result<()> {
102 let entry = WalEntry::CommitIntent { target_height };
103 self.write_entry(&entry)?;
104 self.file.sync_all()
105 }
106
107 pub fn log_commit_done(&mut self, target_height: Height) -> io::Result<()> {
109 let entry = WalEntry::CommitDone { target_height };
110 self.write_entry(&entry)?;
111 self.file.sync_all()?;
114 self.truncate()
115 }
116
117 fn truncate(&mut self) -> io::Result<()> {
119 self.file.set_len(0)?;
120 self.file.seek(io::SeekFrom::Start(0))?;
121 self.file.sync_all()
122 }
123
124 fn write_entry(&mut self, entry: &WalEntry) -> io::Result<()> {
125 let payload = postcard::to_allocvec(entry).map_err(io::Error::other)?;
126 let len = payload.len() as u32;
127 self.file.write_all(&ENTRY_MAGIC)?;
128 self.file.write_all(&len.to_le_bytes())?;
129 self.file.write_all(&payload)?;
130 Ok(())
131 }
132}
133
134fn decode_entry(buf: &[u8]) -> Option<(WalEntry, usize)> {
136 if buf.len() < 8 {
137 return None;
138 }
139 if buf[..4] != ENTRY_MAGIC {
140 return None;
141 }
142 let len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
143 if buf.len() < 8 + len {
144 return None;
145 }
146 let entry: WalEntry = postcard::from_bytes(&buf[8..8 + len]).ok()?;
147 Some((entry, 8 + len))
148}
149
150impl hotmint_consensus::Wal for ConsensusWal {
152 fn log_commit_intent(&mut self, target_height: Height) -> io::Result<()> {
153 self.log_commit_intent(target_height)
154 }
155 fn log_commit_done(&mut self, target_height: Height) -> io::Result<()> {
156 self.log_commit_done(target_height)
157 }
158}
159
160pub struct NoopWal;
162
163impl hotmint_consensus::Wal for NoopWal {
164 fn log_commit_intent(&mut self, _target_height: Height) -> io::Result<()> {
165 Ok(())
166 }
167 fn log_commit_done(&mut self, _target_height: Height) -> io::Result<()> {
168 Ok(())
169 }
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn wal_clean_when_empty() {
178 let dir = tempfile::tempdir().unwrap();
179 let recovery = ConsensusWal::check_recovery(dir.path()).unwrap();
180 assert_eq!(recovery, WalRecovery::Clean);
181 }
182
183 #[test]
184 fn wal_clean_after_done() {
185 let dir = tempfile::tempdir().unwrap();
186 let mut wal = ConsensusWal::open(dir.path()).unwrap();
187 wal.log_commit_intent(Height(5)).unwrap();
188 wal.log_commit_done(Height(5)).unwrap();
189
190 let recovery = ConsensusWal::check_recovery(dir.path()).unwrap();
191 assert_eq!(recovery, WalRecovery::Clean);
192 }
193
194 #[test]
195 fn wal_needs_replay_after_intent() {
196 let dir = tempfile::tempdir().unwrap();
197 let mut wal = ConsensusWal::open(dir.path()).unwrap();
198 wal.log_commit_intent(Height(10)).unwrap();
199 drop(wal);
201
202 let recovery = ConsensusWal::check_recovery(dir.path()).unwrap();
203 assert_eq!(
204 recovery,
205 WalRecovery::NeedsReplay {
206 target_height: Height(10),
207 }
208 );
209 }
210
211 #[test]
212 fn wal_truncated_after_done() {
213 let dir = tempfile::tempdir().unwrap();
214 let mut wal = ConsensusWal::open(dir.path()).unwrap();
215
216 for h in 1..=5 {
218 wal.log_commit_intent(Height(h)).unwrap();
219 wal.log_commit_done(Height(h)).unwrap();
220 }
221
222 let path = dir.path().join(WAL_FILE);
224 let size = std::fs::metadata(&path).unwrap().len();
225 assert_eq!(size, 0, "WAL should be truncated after successful commits");
226 }
227}