Skip to main content

raft_hpc_core/
persistent_store.rs

1//! File-backed log store for Raft.
2//!
3//! Persists WAL entries as individual JSON files and the vote as a separate file.
4//! Layout:
5//! ```text
6//! {data_dir}/raft/
7//!   vote.json          — persisted vote
8//!   committed.json     — last committed log id
9//!   wal/
10//!     {index}.json     — one file per log entry
11//! ```
12
13use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::io;
16use std::ops::RangeBounds;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19
20use openraft::storage::{IOFlushed, RaftLogStorage};
21use openraft::{LogId, LogState, OptionalSend, RaftLogReader, RaftTypeConfig};
22use tokio::sync::RwLock;
23use tracing::{debug, warn};
24
25/// File-backed log store with in-memory cache.
26#[derive(Clone)]
27pub struct FileLogStore<C: RaftTypeConfig> {
28    inner: Arc<RwLock<FileLogStoreInner<C>>>,
29}
30
31struct FileLogStoreInner<C: RaftTypeConfig> {
32    wal_dir: PathBuf,
33    vote_path: PathBuf,
34    committed_path: PathBuf,
35    vote: Option<openraft::vote::Vote<C>>,
36    log: BTreeMap<u64, openraft::Entry<C>>,
37    committed: Option<LogId<C>>,
38    last_purged: Option<LogId<C>>,
39}
40
41impl<C> FileLogStore<C>
42where
43    C: RaftTypeConfig<Entry = openraft::Entry<C>, Vote = openraft::vote::Vote<C>>,
44{
45    /// Create a new file-backed log store.
46    ///
47    /// Creates the directory structure if it doesn't exist, then loads
48    /// existing state from disk.
49    pub fn new(data_dir: &Path) -> io::Result<Self> {
50        let raft_dir = data_dir.join("raft");
51        let wal_dir = raft_dir.join("wal");
52        let vote_path = raft_dir.join("vote.json");
53        let committed_path = raft_dir.join("committed.json");
54
55        std::fs::create_dir_all(&wal_dir)?;
56
57        // Load existing vote
58        let vote = if vote_path.exists() {
59            let data = std::fs::read_to_string(&vote_path)?;
60            match serde_json::from_str(&data) {
61                Ok(v) => Some(v),
62                Err(e) => {
63                    warn!("Failed to parse vote.json, starting fresh: {e}");
64                    None
65                }
66            }
67        } else {
68            None
69        };
70
71        // Load committed log id
72        let committed = if committed_path.exists() {
73            let data = std::fs::read_to_string(&committed_path)?;
74            match serde_json::from_str(&data) {
75                Ok(c) => Some(c),
76                Err(e) => {
77                    warn!("Failed to parse committed.json, starting fresh: {e}");
78                    None
79                }
80            }
81        } else {
82            None
83        };
84
85        // Scan WAL directory and load entries
86        let (log, last_purged) = Self::load_wal(&wal_dir)?;
87
88        debug!(
89            "FileLogStore loaded: {} entries, vote={:?}, committed={:?}",
90            log.len(),
91            vote,
92            committed
93        );
94
95        Ok(Self {
96            inner: Arc::new(RwLock::new(FileLogStoreInner {
97                wal_dir,
98                vote_path,
99                committed_path,
100                vote,
101                log,
102                committed,
103                last_purged,
104            })),
105        })
106    }
107
108    /// Load all WAL entries from disk.
109    #[allow(clippy::type_complexity)]
110    fn load_wal(
111        wal_dir: &Path,
112    ) -> io::Result<(BTreeMap<u64, openraft::Entry<C>>, Option<LogId<C>>)> {
113        let mut log = BTreeMap::new();
114        let mut last_purged: Option<LogId<C>> = None;
115
116        // Check for purged marker
117        let purged_path = wal_dir.join("purged.json");
118        if purged_path.exists() {
119            let data = std::fs::read_to_string(&purged_path)?;
120            match serde_json::from_str(&data) {
121                Ok(p) => last_purged = Some(p),
122                Err(e) => warn!("Failed to parse purged.json: {e}"),
123            }
124        }
125
126        for entry_result in std::fs::read_dir(wal_dir)? {
127            let entry = entry_result?;
128            let name = entry.file_name();
129            let name_str = name.to_string_lossy();
130
131            // Skip non-JSON files and special files
132            if !name_str.ends_with(".json") || name_str == "purged.json" {
133                continue;
134            }
135
136            // Parse index from filename
137            let index_str = name_str.trim_end_matches(".json");
138            let index: u64 = match index_str.parse() {
139                Ok(i) => i,
140                Err(_) => continue,
141            };
142
143            let data = std::fs::read_to_string(entry.path())?;
144            match serde_json::from_str::<openraft::Entry<C>>(&data) {
145                Ok(log_entry) => {
146                    log.insert(index, log_entry);
147                }
148                Err(e) => {
149                    warn!("Failed to parse WAL entry {}: {e}", name_str);
150                }
151            }
152        }
153
154        Ok((log, last_purged))
155    }
156}
157
158fn write_file_atomic(path: &Path, data: &[u8]) -> io::Result<()> {
159    let tmp = path.with_extension("tmp");
160    std::fs::write(&tmp, data)?;
161    // fsync the file
162    let file = std::fs::File::open(&tmp)?;
163    file.sync_all()?;
164    drop(file);
165    std::fs::rename(&tmp, path)?;
166    // fsync the parent directory
167    if let Some(parent) = path.parent() {
168        if let Ok(dir) = std::fs::File::open(parent) {
169            let _ = dir.sync_all();
170        }
171    }
172    Ok(())
173}
174
175// ── RaftLogReader ──────────────────────────────────────────
176
177#[derive(Clone)]
178pub struct FileLogReader<C: RaftTypeConfig> {
179    inner: Arc<RwLock<FileLogStoreInner<C>>>,
180}
181
182impl<C> RaftLogReader<C> for FileLogReader<C>
183where
184    C: RaftTypeConfig<Entry = openraft::Entry<C>, Vote = openraft::vote::Vote<C>>,
185    openraft::Entry<C>: Clone,
186{
187    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
188        &mut self,
189        range: RB,
190    ) -> Result<Vec<C::Entry>, io::Error> {
191        let inner = self.inner.read().await;
192        let entries: Vec<C::Entry> = inner.log.range(range).map(|(_, e)| e.clone()).collect();
193        Ok(entries)
194    }
195
196    async fn read_vote(&mut self) -> Result<Option<C::Vote>, io::Error> {
197        let inner = self.inner.read().await;
198        Ok(inner.vote.clone())
199    }
200}
201
202// ── RaftLogStorage ─────────────────────────────────────────
203
204impl<C> RaftLogStorage<C> for FileLogStore<C>
205where
206    C: RaftTypeConfig<Entry = openraft::Entry<C>, Vote = openraft::vote::Vote<C>>,
207    openraft::Entry<C>: Clone,
208{
209    type LogReader = FileLogReader<C>;
210
211    async fn get_log_state(&mut self) -> Result<LogState<C>, io::Error> {
212        let inner = self.inner.read().await;
213        let last = inner.log.values().last().map(|e| e.log_id.clone());
214        Ok(LogState {
215            last_purged_log_id: inner.last_purged.clone(),
216            last_log_id: last,
217        })
218    }
219
220    async fn get_log_reader(&mut self) -> Self::LogReader {
221        FileLogReader {
222            inner: Arc::clone(&self.inner),
223        }
224    }
225
226    async fn save_vote(&mut self, vote: &C::Vote) -> Result<(), io::Error> {
227        let mut inner = self.inner.write().await;
228        let data = serde_json::to_vec(vote).map_err(io::Error::other)?;
229        write_file_atomic(&inner.vote_path, &data)?;
230        inner.vote = Some(vote.clone());
231        Ok(())
232    }
233
234    async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), io::Error>
235    where
236        I: IntoIterator<Item = C::Entry> + OptionalSend,
237        I::IntoIter: OptionalSend,
238    {
239        let mut inner = self.inner.write().await;
240        for entry in entries {
241            let index = entry.log_id.index;
242            let data = serde_json::to_vec(&entry).map_err(io::Error::other)?;
243            let path = inner.wal_dir.join(format!("{index}.json"));
244            write_file_atomic(&path, &data)?;
245            inner.log.insert(index, entry);
246        }
247        callback.io_completed(Ok(()));
248        Ok(())
249    }
250
251    async fn truncate_after(&mut self, last_log_id: Option<LogId<C>>) -> Result<(), io::Error> {
252        let mut inner = self.inner.write().await;
253        if let Some(id) = last_log_id {
254            let keys_to_remove: Vec<u64> =
255                inner.log.range((id.index + 1)..).map(|(k, _)| *k).collect();
256            for k in keys_to_remove {
257                inner.log.remove(&k);
258                let path = inner.wal_dir.join(format!("{k}.json"));
259                if path.exists() {
260                    std::fs::remove_file(&path)?;
261                }
262            }
263        } else {
264            // Remove all entries
265            for k in inner.log.keys() {
266                let path = inner.wal_dir.join(format!("{k}.json"));
267                if path.exists() {
268                    let _ = std::fs::remove_file(&path);
269                }
270            }
271            inner.log.clear();
272        }
273        Ok(())
274    }
275
276    async fn purge(&mut self, log_id: LogId<C>) -> Result<(), io::Error> {
277        let mut inner = self.inner.write().await;
278        let keys_to_remove: Vec<u64> = inner.log.range(..=log_id.index).map(|(k, _)| *k).collect();
279        for k in keys_to_remove {
280            inner.log.remove(&k);
281            let path = inner.wal_dir.join(format!("{k}.json"));
282            if path.exists() {
283                let _ = std::fs::remove_file(&path);
284            }
285        }
286        // Persist the purge marker
287        let purged_path = inner.wal_dir.join("purged.json");
288        let data = serde_json::to_vec(&log_id).map_err(io::Error::other)?;
289
290        inner.last_purged = Some(log_id);
291        write_file_atomic(&purged_path, &data)?;
292
293        Ok(())
294    }
295
296    async fn save_committed(&mut self, committed: Option<LogId<C>>) -> Result<(), io::Error> {
297        let mut inner = self.inner.write().await;
298        let data = serde_json::to_vec(&committed).map_err(io::Error::other)?;
299        write_file_atomic(&inner.committed_path, &data)?;
300        inner.committed = committed;
301        Ok(())
302    }
303
304    async fn read_committed(&mut self) -> Result<Option<LogId<C>>, io::Error> {
305        let inner = self.inner.read().await;
306        Ok(inner.committed.clone())
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use crate::test_types::TestTypeConfig;
314
315    #[tokio::test]
316    async fn initial_state_is_empty() {
317        let dir = tempfile::tempdir().unwrap();
318        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
319        let state = store.get_log_state().await.unwrap();
320        assert!(state.last_log_id.is_none());
321        assert!(state.last_purged_log_id.is_none());
322    }
323
324    #[tokio::test]
325    async fn save_and_read_vote() {
326        let dir = tempfile::tempdir().unwrap();
327        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
328        let vote = openraft::vote::Vote::new(1, 1);
329        store.save_vote(&vote).await.unwrap();
330
331        let mut reader = store.get_log_reader().await;
332        let read = reader.read_vote().await.unwrap();
333        assert_eq!(read.unwrap(), vote);
334
335        // Verify persistence: create a new store from the same dir
336        let mut store2 = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
337        let mut reader2 = store2.get_log_reader().await;
338        let read2 = reader2.read_vote().await.unwrap();
339        assert_eq!(read2.unwrap(), vote);
340    }
341
342    #[tokio::test]
343    async fn vote_persists_across_restart() {
344        let dir = tempfile::tempdir().unwrap();
345
346        let vote = openraft::vote::Vote::new(3, 2);
347        {
348            let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
349            store.save_vote(&vote).await.unwrap();
350        }
351
352        // "Restart" by creating new store
353        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
354        let mut reader = store.get_log_reader().await;
355        let read = reader.read_vote().await.unwrap();
356        assert_eq!(read.unwrap(), vote);
357    }
358
359    #[tokio::test]
360    async fn committed_persists_across_restart() {
361        use openraft::vote::RaftLeaderId;
362        use openraft::vote::leader_id_adv::CommittedLeaderId;
363
364        let dir = tempfile::tempdir().unwrap();
365        let log_id = LogId::new(CommittedLeaderId::new(1, 1), 42);
366        {
367            let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
368            store.save_committed(Some(log_id)).await.unwrap();
369        }
370
371        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
372        let read = store.read_committed().await.unwrap();
373        assert_eq!(read.unwrap(), log_id);
374    }
375
376    #[tokio::test]
377    async fn wal_directory_created() {
378        let dir = tempfile::tempdir().unwrap();
379        let _store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
380        assert!(dir.path().join("raft/wal").exists());
381    }
382
383    use openraft::vote::RaftLeaderId;
384
385    #[tokio::test]
386    async fn truncate_after_none_on_empty() {
387        let dir = tempfile::tempdir().unwrap();
388        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
389        store.truncate_after(None).await.unwrap();
390        let state = store.get_log_state().await.unwrap();
391        assert!(state.last_log_id.is_none());
392    }
393
394    #[tokio::test]
395    async fn truncate_after_some_on_empty() {
396        use openraft::vote::leader_id_adv::CommittedLeaderId;
397        let dir = tempfile::tempdir().unwrap();
398        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
399        let log_id = openraft::LogId::new(CommittedLeaderId::new(1, 1), 5);
400        store.truncate_after(Some(log_id)).await.unwrap();
401    }
402
403    #[tokio::test]
404    async fn purge_sets_last_purged_and_writes_marker() {
405        use openraft::vote::leader_id_adv::CommittedLeaderId;
406        let dir = tempfile::tempdir().unwrap();
407        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
408        let log_id = openraft::LogId::new(CommittedLeaderId::new(1, 1), 3);
409        store.purge(log_id).await.unwrap();
410
411        let state = store.get_log_state().await.unwrap();
412        assert_eq!(state.last_purged_log_id.unwrap().index, 3);
413
414        // Verify purged.json written
415        assert!(dir.path().join("raft/wal/purged.json").exists());
416    }
417
418    #[tokio::test]
419    async fn purge_marker_persists_across_restart() {
420        use openraft::vote::leader_id_adv::CommittedLeaderId;
421        let dir = tempfile::tempdir().unwrap();
422        let log_id = openraft::LogId::new(CommittedLeaderId::new(1, 1), 7);
423        {
424            let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
425            store.purge(log_id).await.unwrap();
426        }
427
428        let mut store2 = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
429        let state = store2.get_log_state().await.unwrap();
430        assert_eq!(state.last_purged_log_id.unwrap().index, 7);
431    }
432
433    #[tokio::test]
434    async fn wal_entries_load_on_restart() {
435        use crate::test_types::{TestCommand, TestTypeConfig};
436        use openraft::vote::leader_id_adv::CommittedLeaderId;
437        use openraft::{Entry, EntryPayload, LogId};
438
439        let dir = tempfile::tempdir().unwrap();
440        let wal_dir = dir.path().join("raft/wal");
441        std::fs::create_dir_all(&wal_dir).unwrap();
442
443        // Manually write WAL entry files
444        for i in 1..=3u64 {
445            let entry = Entry::<TestTypeConfig> {
446                log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
447                payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
448            };
449            let data = serde_json::to_vec(&entry).unwrap();
450            std::fs::write(wal_dir.join(format!("{i}.json")), &data).unwrap();
451        }
452
453        // Create store — should load the entries
454        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
455        let state = store.get_log_state().await.unwrap();
456        assert_eq!(state.last_log_id.unwrap().index, 3);
457
458        // Read entries via reader
459        let mut reader = store.get_log_reader().await;
460        let entries = reader.try_get_log_entries(1..=3).await.unwrap();
461        assert_eq!(entries.len(), 3);
462    }
463
464    #[tokio::test]
465    async fn truncate_after_with_wal_entries() {
466        use crate::test_types::{TestCommand, TestTypeConfig};
467        use openraft::vote::leader_id_adv::CommittedLeaderId;
468        use openraft::{Entry, EntryPayload, LogId};
469
470        let dir = tempfile::tempdir().unwrap();
471        let wal_dir = dir.path().join("raft/wal");
472        std::fs::create_dir_all(&wal_dir).unwrap();
473
474        // Write entries 1..=5
475        for i in 1..=5u64 {
476            let entry = Entry::<TestTypeConfig> {
477                log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
478                payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
479            };
480            let data = serde_json::to_vec(&entry).unwrap();
481            std::fs::write(wal_dir.join(format!("{i}.json")), &data).unwrap();
482        }
483
484        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
485
486        // Truncate after index 3 (remove 4, 5)
487        let log_id = LogId::new(CommittedLeaderId::new(1, 1), 3);
488        store.truncate_after(Some(log_id)).await.unwrap();
489
490        let state = store.get_log_state().await.unwrap();
491        assert_eq!(state.last_log_id.unwrap().index, 3);
492
493        // Verify WAL files removed
494        assert!(!wal_dir.join("4.json").exists());
495        assert!(!wal_dir.join("5.json").exists());
496        assert!(wal_dir.join("3.json").exists());
497    }
498
499    #[tokio::test]
500    async fn purge_removes_wal_entries() {
501        use crate::test_types::{TestCommand, TestTypeConfig};
502        use openraft::vote::leader_id_adv::CommittedLeaderId;
503        use openraft::{Entry, EntryPayload, LogId};
504
505        let dir = tempfile::tempdir().unwrap();
506        let wal_dir = dir.path().join("raft/wal");
507        std::fs::create_dir_all(&wal_dir).unwrap();
508
509        for i in 1..=5u64 {
510            let entry = Entry::<TestTypeConfig> {
511                log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
512                payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
513            };
514            let data = serde_json::to_vec(&entry).unwrap();
515            std::fs::write(wal_dir.join(format!("{i}.json")), &data).unwrap();
516        }
517
518        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
519
520        // Purge up to index 3 (remove 1, 2, 3)
521        let log_id = LogId::new(CommittedLeaderId::new(1, 1), 3);
522        store.purge(log_id).await.unwrap();
523
524        // Verify files removed
525        assert!(!wal_dir.join("1.json").exists());
526        assert!(!wal_dir.join("2.json").exists());
527        assert!(!wal_dir.join("3.json").exists());
528        assert!(wal_dir.join("4.json").exists());
529        assert!(wal_dir.join("5.json").exists());
530
531        // Verify purged marker
532        assert!(wal_dir.join("purged.json").exists());
533    }
534
535    #[tokio::test]
536    async fn get_log_reader_reads_entries() {
537        use crate::test_types::TestCommand;
538        use openraft::vote::leader_id_adv::CommittedLeaderId;
539        use openraft::{Entry, EntryPayload, LogId};
540
541        let dir = tempfile::tempdir().unwrap();
542        let wal_dir = dir.path().join("raft/wal");
543        std::fs::create_dir_all(&wal_dir).unwrap();
544
545        let entry = Entry::<TestTypeConfig> {
546            log_id: LogId::new(CommittedLeaderId::new(1, 1), 1),
547            payload: EntryPayload::Normal(TestCommand::Set("a".into(), "b".into())),
548        };
549        std::fs::write(wal_dir.join("1.json"), serde_json::to_vec(&entry).unwrap()).unwrap();
550
551        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
552        let mut reader = store.get_log_reader().await;
553        let entries = reader.try_get_log_entries(1..=1).await.unwrap();
554        assert_eq!(entries.len(), 1);
555    }
556
557    #[tokio::test]
558    async fn corrupt_vote_json_starts_fresh() {
559        let dir = tempfile::tempdir().unwrap();
560        let raft_dir = dir.path().join("raft");
561        std::fs::create_dir_all(raft_dir.join("wal")).unwrap();
562        std::fs::write(raft_dir.join("vote.json"), b"not valid json").unwrap();
563
564        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
565        let mut reader = store.get_log_reader().await;
566        let vote = reader.read_vote().await.unwrap();
567        assert!(vote.is_none());
568    }
569
570    #[tokio::test]
571    async fn corrupt_committed_json_starts_fresh() {
572        let dir = tempfile::tempdir().unwrap();
573        let raft_dir = dir.path().join("raft");
574        std::fs::create_dir_all(raft_dir.join("wal")).unwrap();
575        std::fs::write(raft_dir.join("committed.json"), b"garbage").unwrap();
576
577        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
578        let committed = store.read_committed().await.unwrap();
579        assert!(committed.is_none());
580    }
581
582    #[tokio::test]
583    async fn wal_skips_non_json_and_invalid_files() {
584        use crate::test_types::TestCommand;
585        use openraft::vote::leader_id_adv::CommittedLeaderId;
586        use openraft::{Entry, EntryPayload, LogId};
587
588        let dir = tempfile::tempdir().unwrap();
589        let wal_dir = dir.path().join("raft/wal");
590        std::fs::create_dir_all(&wal_dir).unwrap();
591
592        // Non-JSON file should be skipped
593        std::fs::write(wal_dir.join("notes.txt"), b"not a wal entry").unwrap();
594        // Non-numeric JSON file should be skipped
595        std::fs::write(wal_dir.join("abc.json"), b"not a number index").unwrap();
596        // Corrupt JSON entry should be skipped
597        std::fs::write(wal_dir.join("99.json"), b"not valid entry json").unwrap();
598        let entry = Entry::<TestTypeConfig> {
599            log_id: LogId::new(CommittedLeaderId::new(1, 1), 1),
600            payload: EntryPayload::Normal(TestCommand::Set("a".into(), "b".into())),
601        };
602        std::fs::write(wal_dir.join("1.json"), serde_json::to_vec(&entry).unwrap()).unwrap();
603
604        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
605        let state = store.get_log_state().await.unwrap();
606        // Only the valid entry should be loaded
607        assert_eq!(state.last_log_id.unwrap().index, 1);
608    }
609
610    #[tokio::test]
611    async fn truncate_after_none_removes_all_entries() {
612        use crate::test_types::TestCommand;
613        use openraft::vote::leader_id_adv::CommittedLeaderId;
614        use openraft::{Entry, EntryPayload, LogId};
615
616        let dir = tempfile::tempdir().unwrap();
617        let wal_dir = dir.path().join("raft/wal");
618        std::fs::create_dir_all(&wal_dir).unwrap();
619
620        for i in 1..=3u64 {
621            let entry = Entry::<TestTypeConfig> {
622                log_id: LogId::new(CommittedLeaderId::new(1, 1), i),
623                payload: EntryPayload::Normal(TestCommand::Set(format!("k{i}"), format!("v{i}"))),
624            };
625            std::fs::write(
626                wal_dir.join(format!("{i}.json")),
627                serde_json::to_vec(&entry).unwrap(),
628            )
629            .unwrap();
630        }
631
632        let mut store = FileLogStore::<TestTypeConfig>::new(dir.path()).unwrap();
633        store.truncate_after(None).await.unwrap();
634
635        let state = store.get_log_state().await.unwrap();
636        assert!(state.last_log_id.is_none());
637
638        // Verify WAL files removed
639        assert!(!wal_dir.join("1.json").exists());
640        assert!(!wal_dir.join("2.json").exists());
641        assert!(!wal_dir.join("3.json").exists());
642    }
643}