1use std::collections::BTreeMap;
21use std::fs::{self, File, OpenOptions};
22use std::io::{self, BufReader, BufWriter, Read, Write};
23use std::path::{Path, PathBuf};
24
25use crc32fast::Hasher;
26use openraft::{Entry, LogId, SnapshotMeta, Vote};
27use serde::{Deserialize, Serialize};
28use thiserror::Error;
29use tracing::{debug, warn};
30
31use crate::raft::TypeConfig;
32
/// Magic bytes written at the start of the `raft-meta` file.
const META_MAGIC: &[u8; 4] = b"ERMT";
/// Magic bytes written at the start of the `raft-log` file.
const LOG_MAGIC: &[u8; 4] = b"ERLO";
/// Magic bytes written at the start of the `raft-snapshot` file.
const SNAP_MAGIC: &[u8; 4] = b"ERSS";
/// Format version byte written right after the magic; `read_header` rejects
/// any other value.
const FORMAT_VERSION: u8 = 1;

/// Largest record payload (64 MiB) that `read_record` accepts; a bigger length
/// prefix is reported as `RecordTooLarge`.
const MAX_RECORD_SIZE: u32 = 64 * 1024 * 1024;
45
/// Errors produced while reading or writing the on-disk raft files.
#[derive(Debug, Error)]
pub enum RaftDiskError {
    /// An underlying filesystem or stream operation failed.
    #[error("io error: {0}")]
    Io(#[from] io::Error),

    /// The file did not start with the expected 4-byte magic, or ended before
    /// the magic could be read.
    #[error("invalid magic bytes in {file}")]
    InvalidMagic { file: &'static str },

    /// The version byte following the magic is not `FORMAT_VERSION`.
    #[error("unsupported format version {version} in {file}")]
    UnsupportedVersion { file: &'static str, version: u8 },

    /// The CRC32 stored after a record payload (`expected`) differs from the
    /// one computed over the payload that was read (`actual`).
    #[error("crc32 mismatch (expected {expected:#010x}, got {actual:#010x})")]
    ChecksumMismatch { expected: u32, actual: u32 },

    /// Postcard failed to serialize or deserialize a record; carries the
    /// stringified postcard error.
    #[error("postcard error: {0}")]
    Postcard(String),

    /// A record's length prefix exceeds `MAX_RECORD_SIZE`.
    #[error("record payload too large ({size} bytes, max {MAX_RECORD_SIZE})")]
    RecordTooLarge { size: u32 },
}
70
71impl From<RaftDiskError> for openraft::StorageError<u64> {
72 fn from(e: RaftDiskError) -> Self {
73 openraft::StorageIOError::write(&e).into()
74 }
75}
76
/// Payload of the `raft-meta` file, written by `write_meta_file` and read back
/// by `read_meta_file`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct MetaRecord {
    /// Persisted vote, if one has been stored.
    vote: Option<Vote<u64>>,
    /// Persisted last-purged log id, if one has been stored.
    last_purged: Option<LogId<u64>>,
}
87
/// Payload of the `raft-snapshot` file: snapshot metadata together with the
/// raw snapshot bytes, stored as a single record.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotRecord {
    /// Snapshot metadata as supplied to `RaftDisk::write_snapshot`.
    meta: SnapshotMeta<u64, openraft::BasicNode>,
    /// Snapshot contents as supplied to `RaftDisk::write_snapshot`.
    data: Vec<u8>,
}
94
/// Everything `RaftDisk::open` managed to load from the storage directory.
#[derive(Debug, Default)]
pub(crate) struct RecoveredState {
    /// Vote read from `raft-meta`.
    pub vote: Option<Vote<u64>>,
    /// Last-purged log id read from `raft-meta`.
    pub last_purged: Option<LogId<u64>>,
    /// Log entries read from `raft-log`, keyed by `entry.log_id.index`.
    pub log: BTreeMap<u64, Entry<TypeConfig>>,
    /// Snapshot metadata and bytes from `raft-snapshot`; `None` when the file
    /// is absent or could not be read.
    pub snapshot: Option<(SnapshotMeta<u64, openraft::BasicNode>, Vec<u8>)>,
}
107
/// Handle to the raft storage directory and its files (`raft-meta`,
/// `raft-log`, `raft-snapshot`).
pub(crate) struct RaftDisk {
    /// Directory that contains the raft files.
    dir: PathBuf,
    /// `raft-log` opened in append mode; replaced by `rewrite_log`.
    log_file: File,
}
121
122impl std::fmt::Debug for RaftDisk {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 f.debug_struct("RaftDisk").field("dir", &self.dir).finish()
125 }
126}
127
128impl RaftDisk {
129 pub fn open(dir: &Path) -> Result<(Self, RecoveredState), RaftDiskError> {
135 fs::create_dir_all(dir)?;
136
137 let meta_path = dir.join("raft-meta");
138 let log_path = dir.join("raft-log");
139 let snap_path = dir.join("raft-snapshot");
140
141 let meta = if meta_path.exists() {
143 read_meta_file(&meta_path)?
144 } else {
145 write_meta_file(&meta_path, &MetaRecord::default())?;
146 MetaRecord::default()
147 };
148
149 let snapshot = if snap_path.exists() {
151 match read_snapshot_file(&snap_path) {
152 Ok(rec) => Some((rec.meta, rec.data)),
153 Err(e) => {
154 warn!("raft snapshot file corrupt, ignoring: {e}");
155 None
156 }
157 }
158 } else {
159 None
160 };
161
162 let (log, valid_len) = if log_path.exists() {
164 recover_log_file(&log_path)?
165 } else {
166 (BTreeMap::new(), 0)
167 };
168
169 let log_file = if log_path.exists() {
171 let file = OpenOptions::new().write(true).open(&log_path)?;
172 file.set_len(valid_len)?;
173 file.sync_all()?;
174 OpenOptions::new().append(true).open(&log_path)?
176 } else {
177 let mut file = File::create(&log_path)?;
178 write_header(&mut file, LOG_MAGIC)?;
179 file.sync_all()?;
180 OpenOptions::new().append(true).open(&log_path)?
181 };
182
183 let recovered = RecoveredState {
184 vote: meta.vote,
185 last_purged: meta.last_purged,
186 log,
187 snapshot,
188 };
189
190 debug!(
191 "raft disk recovered: vote={:?}, last_purged={:?}, log_entries={}, has_snapshot={}",
192 recovered.vote,
193 recovered.last_purged,
194 recovered.log.len(),
195 recovered.snapshot.is_some(),
196 );
197
198 Ok((
199 Self {
200 dir: dir.to_path_buf(),
201 log_file,
202 },
203 recovered,
204 ))
205 }
206
207 pub fn write_meta(
209 &self,
210 vote: Option<Vote<u64>>,
211 last_purged: Option<LogId<u64>>,
212 ) -> Result<(), RaftDiskError> {
213 let record = MetaRecord { vote, last_purged };
214 write_meta_file(&self.dir.join("raft-meta"), &record)
215 }
216
217 pub fn append_entries(&mut self, entries: &[Entry<TypeConfig>]) -> Result<(), RaftDiskError> {
219 for entry in entries {
220 write_record(&mut self.log_file, entry)?;
221 }
222 self.log_file.flush()?;
223 Ok(())
224 }
225
226 pub fn rewrite_log(
230 &mut self,
231 entries: &BTreeMap<u64, Entry<TypeConfig>>,
232 ) -> Result<(), RaftDiskError> {
233 let log_path = self.dir.join("raft-log");
234 let tmp_path = self.dir.join("raft-log.tmp");
235
236 let mut file = File::create(&tmp_path)?;
237 write_header(&mut file, LOG_MAGIC)?;
238 for entry in entries.values() {
239 write_record(&mut file, entry)?;
240 }
241 file.flush()?;
242 file.sync_all()?;
243
244 fs::rename(&tmp_path, &log_path)?;
245
246 self.log_file = OpenOptions::new().append(true).open(&log_path)?;
248 Ok(())
249 }
250
251 pub fn write_snapshot(
253 &self,
254 meta: &SnapshotMeta<u64, openraft::BasicNode>,
255 data: &[u8],
256 ) -> Result<(), RaftDiskError> {
257 let snap_path = self.dir.join("raft-snapshot");
258 let tmp_path = self.dir.join("raft-snapshot.tmp");
259
260 let record = SnapshotRecord {
261 meta: meta.clone(),
262 data: data.to_vec(),
263 };
264
265 let mut file = File::create(&tmp_path)?;
266 write_header(&mut file, SNAP_MAGIC)?;
267 write_record(&mut file, &record)?;
268 file.flush()?;
269 file.sync_all()?;
270
271 fs::rename(&tmp_path, &snap_path)?;
272 Ok(())
273 }
274}
275
276fn crc32(data: &[u8]) -> u32 {
281 let mut h = Hasher::new();
282 h.update(data);
283 h.finalize()
284}
285
286fn write_header(w: &mut impl Write, magic: &[u8; 4]) -> Result<(), RaftDiskError> {
287 w.write_all(magic)?;
288 w.write_all(&[FORMAT_VERSION])?;
289 Ok(())
290}
291
292fn read_header(
293 r: &mut impl Read,
294 expected_magic: &[u8; 4],
295 file_name: &'static str,
296) -> Result<(), RaftDiskError> {
297 let mut magic = [0u8; 4];
298 r.read_exact(&mut magic).map_err(|e| {
299 if e.kind() == io::ErrorKind::UnexpectedEof {
300 RaftDiskError::InvalidMagic { file: file_name }
301 } else {
302 RaftDiskError::Io(e)
303 }
304 })?;
305 if &magic != expected_magic {
306 return Err(RaftDiskError::InvalidMagic { file: file_name });
307 }
308 let mut ver = [0u8; 1];
309 r.read_exact(&mut ver)?;
310 if ver[0] != FORMAT_VERSION {
311 return Err(RaftDiskError::UnsupportedVersion {
312 file: file_name,
313 version: ver[0],
314 });
315 }
316 Ok(())
317}
318
319fn write_record<T: Serialize>(w: &mut impl Write, value: &T) -> Result<(), RaftDiskError> {
321 let payload =
322 postcard::to_allocvec(value).map_err(|e| RaftDiskError::Postcard(e.to_string()))?;
323 let len = payload.len() as u32;
324 w.write_all(&len.to_le_bytes())?;
325 w.write_all(&payload)?;
326 let checksum = crc32(&payload);
327 w.write_all(&checksum.to_le_bytes())?;
328 Ok(())
329}
330
331fn read_record<T: for<'de> Deserialize<'de>>(
334 r: &mut impl Read,
335) -> Result<Option<T>, RaftDiskError> {
336 let mut len_buf = [0u8; 4];
338 match r.read_exact(&mut len_buf) {
339 Ok(()) => {}
340 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
341 Err(e) => return Err(RaftDiskError::Io(e)),
342 }
343 let len = u32::from_le_bytes(len_buf);
344 if len > MAX_RECORD_SIZE {
345 return Err(RaftDiskError::RecordTooLarge { size: len });
346 }
347
348 let mut payload = vec![0u8; len as usize];
349 r.read_exact(&mut payload)?;
350
351 let mut crc_buf = [0u8; 4];
352 r.read_exact(&mut crc_buf)?;
353 let stored_crc = u32::from_le_bytes(crc_buf);
354
355 let computed_crc = crc32(&payload);
356 if computed_crc != stored_crc {
357 return Err(RaftDiskError::ChecksumMismatch {
358 expected: stored_crc,
359 actual: computed_crc,
360 });
361 }
362
363 let value =
364 postcard::from_bytes(&payload).map_err(|e| RaftDiskError::Postcard(e.to_string()))?;
365 Ok(Some(value))
366}
367
368fn write_meta_file(path: &Path, record: &MetaRecord) -> Result<(), RaftDiskError> {
373 let tmp_path = path.with_extension("tmp");
374 let mut file = BufWriter::new(File::create(&tmp_path)?);
375 write_header(&mut file, META_MAGIC)?;
376 write_record(&mut file, record)?;
377 file.flush()?;
378 file.into_inner()
379 .map_err(|e| RaftDiskError::Io(e.into_error()))?
380 .sync_all()?;
381 fs::rename(&tmp_path, path)?;
382 Ok(())
383}
384
385fn read_meta_file(path: &Path) -> Result<MetaRecord, RaftDiskError> {
386 let mut reader = BufReader::new(File::open(path)?);
387 read_header(&mut reader, META_MAGIC, "raft-meta")?;
388 match read_record(&mut reader)? {
389 Some(record) => Ok(record),
390 None => Ok(MetaRecord::default()),
391 }
392}
393
394fn read_snapshot_file(path: &Path) -> Result<SnapshotRecord, RaftDiskError> {
395 let mut reader = BufReader::new(File::open(path)?);
396 read_header(&mut reader, SNAP_MAGIC, "raft-snapshot")?;
397 match read_record(&mut reader)? {
398 Some(record) => Ok(record),
399 None => Err(RaftDiskError::Io(io::Error::new(
400 io::ErrorKind::UnexpectedEof,
401 "empty snapshot file",
402 ))),
403 }
404}
405
406fn recover_log_file(path: &Path) -> Result<(BTreeMap<u64, Entry<TypeConfig>>, u64), RaftDiskError> {
409 let file = File::open(path)?;
410 let file_len = file.metadata()?.len();
411 let mut reader = BufReader::new(file);
412
413 if file_len < 5 {
415 return Ok((BTreeMap::new(), 0));
417 }
418 read_header(&mut reader, LOG_MAGIC, "raft-log")?;
419
420 let mut entries = BTreeMap::new();
421 let mut valid_pos: u64 = 5; loop {
424 let remaining = file_len.saturating_sub(valid_pos);
426 if remaining == 0 {
427 break;
428 }
429
430 match read_record::<Entry<TypeConfig>>(&mut reader) {
431 Ok(Some(entry)) => {
432 let payload_bytes = postcard::to_allocvec(&entry)
433 .map_err(|e| RaftDiskError::Postcard(e.to_string()))?
434 .len() as u64;
435 valid_pos += 4 + payload_bytes + 4;
437 entries.insert(entry.log_id.index, entry);
438 }
439 Ok(None) => break,
440 Err(e) => {
441 warn!("raft log truncated at offset {valid_pos}: {e}");
443 break;
444 }
445 }
446 }
447
448 Ok((entries, valid_pos))
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raft::ClusterCommand;
    use crate::topology::NodeId;
    use openraft::{CommittedLeaderId, EntryPayload, StoredMembership};

    /// Builds a log id for `term`/`index` with leader node id 0.
    fn log_id(term: u64, index: u64) -> LogId<u64> {
        LogId::new(CommittedLeaderId::new(term, 0), index)
    }

    /// Builds a blank entry at `term`/`index`.
    fn test_entry(term: u64, index: u64) -> Entry<TypeConfig> {
        Entry {
            log_id: log_id(term, index),
            payload: EntryPayload::Blank,
        }
    }

    /// Builds an entry at `term`/`index` that carries an `AddNode` command.
    fn test_entry_with_data(term: u64, index: u64) -> Entry<TypeConfig> {
        Entry {
            log_id: log_id(term, index),
            payload: EntryPayload::Normal(ClusterCommand::AddNode {
                node_id: NodeId::new(),
                raft_id: index,
                addr: "127.0.0.1:6379".to_string(),
                is_primary: true,
            }),
        }
    }

    #[test]
    fn meta_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("raft-meta");

        let record = MetaRecord {
            vote: Some(Vote::new(1, 2)),
            last_purged: Some(log_id(1, 5)),
        };
        write_meta_file(&path, &record).unwrap();

        let recovered = read_meta_file(&path).unwrap();
        assert_eq!(recovered.vote, record.vote);
        assert_eq!(recovered.last_purged, record.last_purged);
    }

    #[test]
    fn meta_default_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("raft-meta");

        write_meta_file(&path, &MetaRecord::default()).unwrap();
        let recovered = read_meta_file(&path).unwrap();
        assert!(recovered.vote.is_none());
        assert!(recovered.last_purged.is_none());
    }

    #[test]
    fn log_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let (mut disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert!(recovered.log.is_empty());

        let entries = vec![
            test_entry(1, 1),
            test_entry(1, 2),
            test_entry_with_data(1, 3),
        ];
        disk.append_entries(&entries).unwrap();

        // Reopen and check that every appended entry comes back.
        let (_disk2, recovered2) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered2.log.len(), 3);
        assert!(recovered2.log.contains_key(&1));
        assert!(recovered2.log.contains_key(&2));
        assert!(recovered2.log.contains_key(&3));
    }

    #[test]
    fn log_truncation_on_corruption() {
        let dir = tempfile::tempdir().unwrap();
        let log_path = dir.path().join("raft-log");

        {
            // Write two valid entries, then drop the handle.
            let (mut disk, _) = RaftDisk::open(dir.path()).unwrap();
            disk.append_entries(&[test_entry(1, 1), test_entry(1, 2)])
                .unwrap();
        }
        let clean_len = fs::metadata(&log_path).unwrap().len();

        {
            // Simulate a torn write by appending junk after the valid records.
            let mut file = OpenOptions::new().append(true).open(&log_path).unwrap();
            file.write_all(b"GARBAGE_BYTES").unwrap();
        }

        // Recovery keeps the valid prefix and cuts the junk off the file.
        let (mut disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.log.len(), 2);
        assert_eq!(fs::metadata(&log_path).unwrap().len(), clean_len);

        // The log must stay usable: a later append is readable on reopen.
        disk.append_entries(&[test_entry(1, 3)]).unwrap();
        drop(disk);
        let (_disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.log.len(), 3);
        assert!(recovered.log.contains_key(&3));
    }

    #[test]
    fn log_rewrite() {
        let dir = tempfile::tempdir().unwrap();
        let (mut disk, _) = RaftDisk::open(dir.path()).unwrap();

        disk.append_entries(&[test_entry(1, 1), test_entry(1, 2), test_entry(1, 3)])
            .unwrap();

        // Keep only entry 3, as after purging entries 1 and 2.
        let mut remaining = BTreeMap::new();
        remaining.insert(3, test_entry(1, 3));
        disk.rewrite_log(&remaining).unwrap();

        let (_disk2, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.log.len(), 1);
        assert!(recovered.log.contains_key(&3));
    }

    #[test]
    fn snapshot_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let (disk, _) = RaftDisk::open(dir.path()).unwrap();

        let meta = SnapshotMeta {
            last_log_id: Some(log_id(1, 5)),
            last_membership: StoredMembership::default(),
            snapshot_id: "1-5".to_string(),
        };
        let data = b"snapshot-payload-here".to_vec();
        disk.write_snapshot(&meta, &data).unwrap();

        let (_disk2, recovered) = RaftDisk::open(dir.path()).unwrap();
        let (rec_meta, rec_data) = recovered.snapshot.unwrap();
        assert_eq!(rec_meta.last_log_id, meta.last_log_id);
        assert_eq!(rec_data, data);
    }

    #[test]
    fn write_meta_persists_vote() {
        let dir = tempfile::tempdir().unwrap();
        let (disk, _) = RaftDisk::open(dir.path()).unwrap();

        let vote = Vote::new(2, 1);
        let purged = log_id(1, 10);
        disk.write_meta(Some(vote), Some(purged)).unwrap();

        let (_disk2, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert_eq!(recovered.vote, Some(vote));
        assert_eq!(recovered.last_purged, Some(purged));
    }

    #[test]
    fn fresh_directory_creates_files() {
        let dir = tempfile::tempdir().unwrap();
        let raft_dir = dir.path().join("raft");

        let (_disk, recovered) = RaftDisk::open(&raft_dir).unwrap();
        assert!(recovered.log.is_empty());
        assert!(recovered.vote.is_none());
        assert!(recovered.snapshot.is_none());

        assert!(raft_dir.join("raft-meta").exists());
        assert!(raft_dir.join("raft-log").exists());
    }

    #[test]
    fn corrupt_meta_magic_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let meta_path = dir.path().join("raft-meta");

        // Four bytes that are not the meta magic.
        fs::write(&meta_path, b"JUNK").unwrap();

        let result = RaftDisk::open(dir.path());
        assert!(result.is_err());
    }

    #[test]
    fn record_crc_mismatch_detected() {
        let dir = tempfile::tempdir().unwrap();

        {
            let (mut disk, _) = RaftDisk::open(dir.path()).unwrap();
            disk.append_entries(&[test_entry(1, 1)]).unwrap();
        }

        let log_path = dir.path().join("raft-log");
        let mut data = fs::read(&log_path).unwrap();
        // Offset 10 lies inside the first record's payload (5-byte header,
        // 4-byte length prefix, then payload). Fail loudly rather than skip
        // the corruption step if the file is unexpectedly short.
        assert!(
            data.len() > 10,
            "log file too short to corrupt: {} bytes",
            data.len()
        );
        data[10] ^= 0xFF;
        fs::write(&log_path, &data).unwrap();

        // The damaged record must be rejected, leaving an empty log.
        let (_disk, recovered) = RaftDisk::open(dir.path()).unwrap();
        assert!(recovered.log.is_empty());
    }
}