lol_core/storage/
file.rs

1use super::{Ballot, Entry};
2use crate::Index;
3use anyhow::Result;
4use std::collections::BTreeSet;
5use std::path::{Path, PathBuf};
6use tokio::fs;
7
8fn extract_entry_index(path: &Path) -> Index {
9    let name = path.file_name().unwrap();
10    let name = name.to_str().unwrap();
11    name.parse().unwrap()
12}
13
14pub struct Storage {
15    root_dir: PathBuf,
16}
17impl Storage {
18    fn ballot_path(&self) -> PathBuf {
19        self.root_dir.join("ballot")
20    }
21    fn entry_path(&self, i: Index) -> PathBuf {
22        self.root_dir.join("entry").join(format!("{i}"))
23    }
24    async fn entries(&self) -> Result<BTreeSet<Index>> {
25        let root_dir = self.root_dir.join("entry");
26        let mut dir_iter = tokio::fs::read_dir(root_dir).await?;
27        let mut out = BTreeSet::new();
28        while let Some(entry) = dir_iter.next_entry().await? {
29            let idx = extract_entry_index(&entry.path());
30            out.insert(idx);
31        }
32        Ok(out)
33    }
34    pub fn destory(root_dir: &Path) -> Result<()> {
35        std::fs::remove_dir_all(root_dir).ok();
36        Ok(())
37    }
38    /// Create the initial state.
39    /// You should call `destory` before calling this function.
40    pub fn create(root_dir: &Path) -> Result<()> {
41        std::fs::create_dir(root_dir)?;
42        std::fs::create_dir(root_dir.join("entry"))?;
43        let init_ballot = Ballot::new();
44        let init_ballot: Vec<u8> = init_ballot.into();
45        let ballot_path = root_dir.join("ballot");
46        std::fs::write(ballot_path, init_ballot)?;
47        Ok(())
48    }
49    pub fn open(root_dir: &Path) -> Result<Self> {
50        Ok(Self {
51            root_dir: root_dir.to_owned(),
52        })
53    }
54}
55#[async_trait::async_trait]
56impl super::RaftStorage for Storage {
57    async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
58        let path = self.entry_path(i);
59        let bin: Vec<u8> = e.into();
60        tokio::fs::write(path, bin).await?;
61        Ok(())
62    }
63    async fn delete_entry(&self, i: Index) -> Result<()> {
64        let path = self.entry_path(i);
65        fs::remove_file(&path).await?;
66        Ok(())
67    }
68    async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
69        let path = self.entry_path(i);
70        if !path.exists() {
71            return Ok(None);
72        }
73        let bin = tokio::fs::read(&path).await?;
74        let entry = Entry::from(bin);
75        Ok(Some(entry))
76    }
77    async fn get_head_index(&self) -> Result<Index> {
78        let entries = self.entries().await?;
79        let r = match entries.iter().next() {
80            Some(k) => *k,
81            None => 0,
82        };
83        Ok(r)
84    }
85    async fn get_last_index(&self) -> Result<Index> {
86        let entries = self.entries().await?;
87        let r = match entries.iter().next_back() {
88            Some(k) => *k,
89            None => 0,
90        };
91        Ok(r)
92    }
93    async fn save_ballot(&self, v: Ballot) -> Result<()> {
94        let path = self.ballot_path();
95        let bin: Vec<u8> = v.into();
96        tokio::fs::write(path, bin).await?;
97        Ok(())
98    }
99    async fn load_ballot(&self) -> Result<Ballot> {
100        let path = self.ballot_path();
101        let bin = tokio::fs::read(path).await?;
102        Ok(Ballot::from(bin))
103    }
104}
105
#[cfg(test)]
mod tests {
    use serial_test::serial;

    use super::*;
    use crate::storage;

    /// Storage root shared by the file-storage tests below; they are marked
    /// `#[serial]` because they all use this same path.
    const ROOT: &str = "/tmp/lol/file.db";

    /// Makes sure the parent directory exists, wipes any previous storage at
    /// `ROOT`, and creates a fresh one. Returns the root path.
    fn fresh_root() -> &'static Path {
        // The parent may already exist from an earlier run; ignore that error.
        let _ = std::fs::create_dir("/tmp/lol");
        let root = Path::new(ROOT);
        Storage::destory(root).unwrap();
        Storage::create(root).unwrap();
        root
    }

    /// The index is taken from the last path component.
    #[test]
    fn test_extract_entry_index() {
        assert_eq!(extract_entry_index(Path::new("/root/entry/10")), 10);
        assert_eq!(
            extract_entry_index(Path::new("/root/entry/10000000000000")),
            10000000000000
        );
    }

    /// Runs the shared storage test suite against a fresh file storage.
    #[tokio::test]
    #[serial]
    async fn test_file_storage() -> Result<()> {
        let root = fresh_root();
        let s = Storage::open(root).unwrap();

        storage::test_storage(s).await?;

        Storage::destory(root).unwrap();
        Ok(())
    }

    /// Runs the pre-close checks, reopens the same directory with a new
    /// handle, and runs the post-close checks.
    #[tokio::test]
    #[serial]
    async fn test_file_storage_persistency() -> Result<()> {
        let root = fresh_root();

        let before = Storage::open(root).unwrap();
        storage::persistency::test_pre_close(before).await?;

        let after = Storage::open(root).unwrap();
        storage::persistency::test_post_close(after).await?;

        Storage::destory(root).unwrap();
        Ok(())
    }
}