1use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::io;
16use std::ops::RangeBounds;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19
20use openraft::storage::{IOFlushed, RaftLogStorage};
21use openraft::{LogId, LogState, OptionalSend, RaftLogReader, RaftTypeConfig};
22use tokio::sync::RwLock;
23use tracing::{debug, warn};
24
25#[derive(Clone)]
27pub struct FileLogStore<C: RaftTypeConfig> {
28 inner: Arc<RwLock<FileLogStoreInner<C>>>,
29}
30
31struct FileLogStoreInner<C: RaftTypeConfig> {
32 wal_dir: PathBuf,
33 vote_path: PathBuf,
34 committed_path: PathBuf,
35 vote: Option<openraft::vote::Vote<C>>,
36 log: BTreeMap<u64, openraft::Entry<C>>,
37 committed: Option<LogId<C>>,
38 last_purged: Option<LogId<C>>,
39}
40
41impl<C> FileLogStore<C>
42where
43 C: RaftTypeConfig<Entry = openraft::Entry<C>, Vote = openraft::vote::Vote<C>>,
44{
45 pub fn new(data_dir: &Path) -> io::Result<Self> {
50 let raft_dir = data_dir.join("raft");
51 let wal_dir = raft_dir.join("wal");
52 let vote_path = raft_dir.join("vote.json");
53 let committed_path = raft_dir.join("committed.json");
54
55 std::fs::create_dir_all(&wal_dir)?;
56
57 let vote = if vote_path.exists() {
59 let data = std::fs::read_to_string(&vote_path)?;
60 match serde_json::from_str(&data) {
61 Ok(v) => Some(v),
62 Err(e) => {
63 warn!("Failed to parse vote.json, starting fresh: {e}");
64 None
65 }
66 }
67 } else {
68 None
69 };
70
71 let committed = if committed_path.exists() {
73 let data = std::fs::read_to_string(&committed_path)?;
74 match serde_json::from_str(&data) {
75 Ok(c) => Some(c),
76 Err(e) => {
77 warn!("Failed to parse committed.json, starting fresh: {e}");
78 None
79 }
80 }
81 } else {
82 None
83 };
84
85 let (log, last_purged) = Self::load_wal(&wal_dir)?;
87
88 debug!(
89 "FileLogStore loaded: {} entries, vote={:?}, committed={:?}",
90 log.len(),
91 vote,
92 committed
93 );
94
95 Ok(Self {
96 inner: Arc::new(RwLock::new(FileLogStoreInner {
97 wal_dir,
98 vote_path,
99 committed_path,
100 vote,
101 log,
102 committed,
103 last_purged,
104 })),
105 })
106 }
107
108 #[allow(clippy::type_complexity)]
110 fn load_wal(
111 wal_dir: &Path,
112 ) -> io::Result<(BTreeMap<u64, openraft::Entry<C>>, Option<LogId<C>>)> {
113 let mut log = BTreeMap::new();
114 let mut last_purged: Option<LogId<C>> = None;
115
116 let purged_path = wal_dir.join("purged.json");
118 if purged_path.exists() {
119 let data = std::fs::read_to_string(&purged_path)?;
120 match serde_json::from_str(&data) {
121 Ok(p) => last_purged = Some(p),
122 Err(e) => warn!("Failed to parse purged.json: {e}"),
123 }
124 }
125
126 for entry_result in std::fs::read_dir(wal_dir)? {
127 let entry = entry_result?;
128 let name = entry.file_name();
129 let name_str = name.to_string_lossy();
130
131 if !name_str.ends_with(".json") || name_str == "purged.json" {
133 continue;
134 }
135
136 let index_str = name_str.trim_end_matches(".json");
138 let index: u64 = match index_str.parse() {
139 Ok(i) => i,
140 Err(_) => continue,
141 };
142
143 let data = std::fs::read_to_string(entry.path())?;
144 match serde_json::from_str::<openraft::Entry<C>>(&data) {
145 Ok(log_entry) => {
146 log.insert(index, log_entry);
147 }
148 Err(e) => {
149 warn!("Failed to parse WAL entry {}: {e}", name_str);
150 }
151 }
152 }
153
154 Ok((log, last_purged))
155 }
156}
157
158fn write_file_atomic(path: &Path, data: &[u8]) -> io::Result<()> {
159 let tmp = path.with_extension("tmp");
160 std::fs::write(&tmp, data)?;
161 let file = std::fs::File::open(&tmp)?;
163 file.sync_all()?;
164 drop(file);
165 std::fs::rename(&tmp, path)?;
166 if let Some(parent) = path.parent() {
168 if let Ok(dir) = std::fs::File::open(parent) {
169 let _ = dir.sync_all();
170 }
171 }
172 Ok(())
173}
174
175#[derive(Clone)]
178pub struct FileLogReader<C: RaftTypeConfig> {
179 inner: Arc<RwLock<FileLogStoreInner<C>>>,
180}
181
182impl<C> RaftLogReader<C> for FileLogReader<C>
183where
184 C: RaftTypeConfig<Entry = openraft::Entry<C>, Vote = openraft::vote::Vote<C>>,
185 openraft::Entry<C>: Clone,
186{
187 async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
188 &mut self,
189 range: RB,
190 ) -> Result<Vec<C::Entry>, io::Error> {
191 let inner = self.inner.read().await;
192 let entries: Vec<C::Entry> = inner.log.range(range).map(|(_, e)| e.clone()).collect();
193 Ok(entries)
194 }
195
196 async fn read_vote(&mut self) -> Result<Option<C::Vote>, io::Error> {
197 let inner = self.inner.read().await;
198 Ok(inner.vote.clone())
199 }
200}
201
202impl<C> RaftLogStorage<C> for FileLogStore<C>
205where
206 C: RaftTypeConfig<Entry = openraft::Entry<C>, Vote = openraft::vote::Vote<C>>,
207 openraft::Entry<C>: Clone,
208{
209 type LogReader = FileLogReader<C>;
210
211 async fn get_log_state(&mut self) -> Result<LogState<C>, io::Error> {
212 let inner = self.inner.read().await;
213 let last = inner.log.values().last().map(|e| e.log_id.clone());
214 Ok(LogState {
215 last_purged_log_id: inner.last_purged.clone(),
216 last_log_id: last,
217 })
218 }
219
220 async fn get_log_reader(&mut self) -> Self::LogReader {
221 FileLogReader {
222 inner: Arc::clone(&self.inner),
223 }
224 }
225
226 async fn save_vote(&mut self, vote: &C::Vote) -> Result<(), io::Error> {
227 let mut inner = self.inner.write().await;
228 let data = serde_json::to_vec(vote).map_err(io::Error::other)?;
229 write_file_atomic(&inner.vote_path, &data)?;
230 inner.vote = Some(vote.clone());
231 Ok(())
232 }
233
234 async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), io::Error>
235 where
236 I: IntoIterator<Item = C::Entry> + OptionalSend,
237 I::IntoIter: OptionalSend,
238 {
239 let mut inner = self.inner.write().await;
240 for entry in entries {
241 let index = entry.log_id.index;
242 let data = serde_json::to_vec(&entry).map_err(io::Error::other)?;
243 let path = inner.wal_dir.join(format!("{index}.json"));
244 write_file_atomic(&path, &data)?;
245 inner.log.insert(index, entry);
246 }
247 callback.io_completed(Ok(()));
248 Ok(())
249 }
250
251 async fn truncate_after(&mut self, last_log_id: Option<LogId<C>>) -> Result<(), io::Error> {
252 let mut inner = self.inner.write().await;
253 if let Some(id) = last_log_id {
254 let keys_to_remove: Vec<u64> =
255 inner.log.range((id.index + 1)..).map(|(k, _)| *k).collect();
256 for k in keys_to_remove {
257 inner.log.remove(&k);
258 let path = inner.wal_dir.join(format!("{k}.json"));
259 if path.exists() {
260 std::fs::remove_file(&path)?;
261 }
262 }
263 } else {
264 for k in inner.log.keys() {
266 let path = inner.wal_dir.join(format!("{k}.json"));
267 if path.exists() {
268 let _ = std::fs::remove_file(&path);
269 }
270 }
271 inner.log.clear();
272 }
273 Ok(())
274 }
275
276 async fn purge(&mut self, log_id: LogId<C>) -> Result<(), io::Error> {
277 let mut inner = self.inner.write().await;
278 let keys_to_remove: Vec<u64> = inner.log.range(..=log_id.index).map(|(k, _)| *k).collect();
279 for k in keys_to_remove {
280 inner.log.remove(&k);
281 let path = inner.wal_dir.join(format!("{k}.json"));
282 if path.exists() {
283 let _ = std::fs::remove_file(&path);
284 }
285 }
286 let purged_path = inner.wal_dir.join("purged.json");
288 let data = serde_json::to_vec(&log_id).map_err(io::Error::other)?;
289
290 inner.last_purged = Some(log_id);
291 write_file_atomic(&purged_path, &data)?;
292
293 Ok(())
294 }
295
296 async fn save_committed(&mut self, committed: Option<LogId<C>>) -> Result<(), io::Error> {
297 let mut inner = self.inner.write().await;
298 let data = serde_json::to_vec(&committed).map_err(io::Error::other)?;
299 write_file_atomic(&inner.committed_path, &data)?;
300 inner.committed = committed;
301 Ok(())
302 }
303
304 async fn read_committed(&mut self) -> Result<Option<LogId<C>>, io::Error> {
305 let inner = self.inner.read().await;
306 Ok(inner.committed.clone())
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313 use crate::test_types::TestTypeConfig;
314
315 #[tokio::test]
316 async fn initial_state_is_empty() {
317 let dir = tempfile::tempdir().unwrap();
318 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
319 let state = store.get_log_state().await.unwrap();
320 assert!(state.last_log_id.is_none());
321 assert!(state.last_purged_log_id.is_none());
322 }
323
324 #[tokio::test]
325 async fn save_and_read_vote() {
326 let dir = tempfile::tempdir().unwrap();
327 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
328 let vote = openraft::vote::Vote::new(1, 1);
329 store.save_vote(&vote).await.unwrap();
330
331 let mut reader = store.get_log_reader().await;
332 let read = reader.read_vote().await.unwrap();
333 assert_eq!(read.unwrap(), vote);
334
335 let mut store2 = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
337 let mut reader2 = store2.get_log_reader().await;
338 let read2 = reader2.read_vote().await.unwrap();
339 assert_eq!(read2.unwrap(), vote);
340 }
341
342 #[tokio::test]
343 async fn vote_persists_across_restart() {
344 let dir = tempfile::tempdir().unwrap();
345
346 let vote = openraft::vote::Vote::new(3, 2);
347 {
348 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
349 store.save_vote(&vote).await.unwrap();
350 }
351
352 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
354 let mut reader = store.get_log_reader().await;
355 let read = reader.read_vote().await.unwrap();
356 assert_eq!(read.unwrap(), vote);
357 }
358
359 #[tokio::test]
360 async fn committed_persists_across_restart() {
361 use openraft::vote::RaftLeaderId;
362 use openraft::vote::leader_id_adv::CommittedLeaderId;
363
364 let dir = tempfile::tempdir().unwrap();
365 let log_id = LogId::new(CommittedLeaderId::new(1, 1), 42);
366 {
367 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
368 store.save_committed(Some(log_id)).await.unwrap();
369 }
370
371 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
372 let read = store.read_committed().await.unwrap();
373 assert_eq!(read.unwrap(), log_id);
374 }
375
376 #[tokio::test]
377 async fn wal_directory_created() {
378 let dir = tempfile::tempdir().unwrap();
379 let _store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
380 assert!(dir.path().join("raft/wal").exists());
381 }
382
383 use openraft::vote::RaftLeaderId;
384
385 #[tokio::test]
386 async fn truncate_after_none_on_empty() {
387 let dir = tempfile::tempdir().unwrap();
388 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
389 store.truncate_after(None).await.unwrap();
390 let state = store.get_log_state().await.unwrap();
391 assert!(state.last_log_id.is_none());
392 }
393
394 #[tokio::test]
395 async fn truncate_after_some_on_empty() {
396 use openraft::vote::leader_id_adv::CommittedLeaderId;
397 let dir = tempfile::tempdir().unwrap();
398 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
399 let log_id = openraft::LogId::new(CommittedLeaderId::new(1, 1), 5);
400 store.truncate_after(Some(log_id)).await.unwrap();
401 }
402
403 #[tokio::test]
404 async fn purge_sets_last_purged_and_writes_marker() {
405 use openraft::vote::leader_id_adv::CommittedLeaderId;
406 let dir = tempfile::tempdir().unwrap();
407 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
408 let log_id = openraft::LogId::new(CommittedLeaderId::new(1, 1), 3);
409 store.purge(log_id).await.unwrap();
410
411 let state = store.get_log_state().await.unwrap();
412 assert_eq!(state.last_purged_log_id.unwrap().index, 3);
413
414 assert!(dir.path().join("raft/wal/purged.json").exists());
416 }
417
418 #[tokio::test]
419 async fn purge_marker_persists_across_restart() {
420 use openraft::vote::leader_id_adv::CommittedLeaderId;
421 let dir = tempfile::tempdir().unwrap();
422 let log_id = openraft::LogId::new(CommittedLeaderId::new(1, 1), 7);
423 {
424 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
425 store.purge(log_id).await.unwrap();
426 }
427
428 let mut store2 = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
429 let state = store2.get_log_state().await.unwrap();
430 assert_eq!(state.last_purged_log_id.unwrap().index, 7);
431 }
432
433 #[tokio::test]
434 async fn wal_entries_load_on_restart() {
435 use crate::test_types::{TestCommand, TestTypeConfig};
436 use openraft::vote::leader_id_adv::CommittedLeaderId;
437 use openraft::{Entry, EntryPayload, LogId};
438
439 let dir = tempfile::tempdir().unwrap();
440 let wal_dir = dir.path().join("raft/wal");
441 std::fs::create_dir_all(&wal_dir).unwrap();
442
443 for i in 1..=3u64 {
445 let entry = Entry::<TestTypeConfig> {
446 log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
447 payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
448 };
449 let data = serde_json::to_vec(&entry).unwrap();
450 std::fs::write(wal_dir.join(format!("{i}.json")), &data).unwrap();
451 }
452
453 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
455 let state = store.get_log_state().await.unwrap();
456 assert_eq!(state.last_log_id.unwrap().index, 3);
457
458 let mut reader = store.get_log_reader().await;
460 let entries = reader.try_get_log_entries(1..=3).await.unwrap();
461 assert_eq!(entries.len(), 3);
462 }
463
464 #[tokio::test]
465 async fn truncate_after_with_wal_entries() {
466 use crate::test_types::{TestCommand, TestTypeConfig};
467 use openraft::vote::leader_id_adv::CommittedLeaderId;
468 use openraft::{Entry, EntryPayload, LogId};
469
470 let dir = tempfile::tempdir().unwrap();
471 let wal_dir = dir.path().join("raft/wal");
472 std::fs::create_dir_all(&wal_dir).unwrap();
473
474 for i in 1..=5u64 {
476 let entry = Entry::<TestTypeConfig> {
477 log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
478 payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
479 };
480 let data = serde_json::to_vec(&entry).unwrap();
481 std::fs::write(wal_dir.join(format!("{i}.json")), &data).unwrap();
482 }
483
484 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
485
486 let log_id = LogId::new(CommittedLeaderId::new(1, 1), 3);
488 store.truncate_after(Some(log_id)).await.unwrap();
489
490 let state = store.get_log_state().await.unwrap();
491 assert_eq!(state.last_log_id.unwrap().index, 3);
492
493 assert!(!wal_dir.join("4.json").exists());
495 assert!(!wal_dir.join("5.json").exists());
496 assert!(wal_dir.join("3.json").exists());
497 }
498
499 #[tokio::test]
500 async fn purge_removes_wal_entries() {
501 use crate::test_types::{TestCommand, TestTypeConfig};
502 use openraft::vote::leader_id_adv::CommittedLeaderId;
503 use openraft::{Entry, EntryPayload, LogId};
504
505 let dir = tempfile::tempdir().unwrap();
506 let wal_dir = dir.path().join("raft/wal");
507 std::fs::create_dir_all(&wal_dir).unwrap();
508
509 for i in 1..=5u64 {
510 let entry = Entry::<TestTypeConfig> {
511 log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
512 payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
513 };
514 let data = serde_json::to_vec(&entry).unwrap();
515 std::fs::write(wal_dir.join(format!("{i}.json")), &data).unwrap();
516 }
517
518 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
519
520 let log_id = LogId::new(CommittedLeaderId::new(1, 1), 3);
522 store.purge(log_id).await.unwrap();
523
524 assert!(!wal_dir.join("1.json").exists());
526 assert!(!wal_dir.join("2.json").exists());
527 assert!(!wal_dir.join("3.json").exists());
528 assert!(wal_dir.join("4.json").exists());
529 assert!(wal_dir.join("5.json").exists());
530
531 assert!(wal_dir.join("purged.json").exists());
533 }
534
535 #[tokio::test]
536 async fn get_log_reader_reads_entries() {
537 use crate::test_types::TestCommand;
538 use openraft::vote::leader_id_adv::CommittedLeaderId;
539 use openraft::{Entry, EntryPayload, LogId};
540
541 let dir = tempfile::tempdir().unwrap();
542 let wal_dir = dir.path().join("raft/wal");
543 std::fs::create_dir_all(&wal_dir).unwrap();
544
545 let entry = Entry::<TestTypeConfig> {
546 log_id: LogId::new(CommittedLeaderId::new(1, 1), 1),
547 payload: EntryPayload::Normal(TestCommand::Set("a".into(), "b".into())),
548 };
549 std::fs::write(wal_dir.join("1.json"), serde_json::to_vec(&entry).unwrap()).unwrap();
550
551 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
552 let mut reader = store.get_log_reader().await;
553 let entries = reader.try_get_log_entries(1..=1).await.unwrap();
554 assert_eq!(entries.len(), 1);
555 }
556
557 #[tokio::test]
558 async fn corrupt_vote_json_starts_fresh() {
559 let dir = tempfile::tempdir().unwrap();
560 let raft_dir = dir.path().join("raft");
561 std::fs::create_dir_all(raft_dir.join("wal")).unwrap();
562 std::fs::write(raft_dir.join("vote.json"), b"not valid json").unwrap();
563
564 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
565 let mut reader = store.get_log_reader().await;
566 let vote = reader.read_vote().await.unwrap();
567 assert!(vote.is_none());
568 }
569
570 #[tokio::test]
571 async fn corrupt_committed_json_starts_fresh() {
572 let dir = tempfile::tempdir().unwrap();
573 let raft_dir = dir.path().join("raft");
574 std::fs::create_dir_all(raft_dir.join("wal")).unwrap();
575 std::fs::write(raft_dir.join("committed.json"), b"garbage").unwrap();
576
577 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
578 let committed = store.read_committed().await.unwrap();
579 assert!(committed.is_none());
580 }
581
582 #[tokio::test]
583 async fn wal_skips_non_json_and_invalid_files() {
584 use crate::test_types::TestCommand;
585 use openraft::vote::leader_id_adv::CommittedLeaderId;
586 use openraft::{Entry, EntryPayload, LogId};
587
588 let dir = tempfile::tempdir().unwrap();
589 let wal_dir = dir.path().join("raft/wal");
590 std::fs::create_dir_all(&wal_dir).unwrap();
591
592 std::fs::write(wal_dir.join("notes.txt"), b"not a wal entry").unwrap();
594 std::fs::write(wal_dir.join("abc.json"), b"not a number index").unwrap();
596 std::fs::write(wal_dir.join("99.json"), b"not valid entry json").unwrap();
598 let entry = Entry::<TestTypeConfig> {
599 log_id: LogId::new(CommittedLeaderId::new(1, 1), 1),
600 payload: EntryPayload::Normal(TestCommand::Set("a".into(), "b".into())),
601 };
602 std::fs::write(wal_dir.join("1.json"), serde_json::to_vec(&entry).unwrap()).unwrap();
603
604 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
605 let state = store.get_log_state().await.unwrap();
606 assert_eq!(state.last_log_id.unwrap().index, 1);
608 }
609
610 #[tokio::test]
611 async fn truncate_after_none_removes_all_entries() {
612 use crate::test_types::TestCommand;
613 use openraft::vote::leader_id_adv::CommittedLeaderId;
614 use openraft::{Entry, EntryPayload, LogId};
615
616 let dir = tempfile::tempdir().unwrap();
617 let wal_dir = dir.path().join("raft/wal");
618 std::fs::create_dir_all(&wal_dir).unwrap();
619
620 for i in 1..=3u64 {
621 let entry = Entry::<TestTypeConfig> {
622 log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
623 payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
624 };
625 std::fs::write(
626 wal_dir.join(format!("{i}.json")),
627 serde_json::to_vec(&entry).unwrap(),
628 )
629 .unwrap();
630 }
631
632 let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
633 store.truncate_after(None).await.unwrap();
634
635 let state = store.get_log_state().await.unwrap();
636 assert!(state.last_log_id.is_none());
637
638 assert!(!wal_dir.join("1.json").exists());
640 assert!(!wal_dir.join("2.json").exists());
641 assert!(!wal_dir.join("3.json").exists());
642 }
643}