lol_core/storage/
rocksdb.rs

1use super::{Ballot, Entry};
2use crate::Index;
3use anyhow::Result;
4use rocksdb::{ColumnFamilyDescriptor, Options, DB};
5use std::cmp::Ordering;
6use std::path::Path;
7
/// Key under which the ballot is written with `db.put` (i.e. outside the
/// `CF_ENTRY` column family).
const BALLOT_KEY: &str = "ballot";
/// Name of the column family that holds log entries keyed by encoded index.
const CF_ENTRY: &str = "entry";
/// Name under which `comparator_fn` is registered via `set_comparator`.
const CMP: &str = "index_asc";
11
12#[derive(serde::Serialize, serde::Deserialize, Debug)]
13struct IndexKey(u64);
14fn encode_index(i: Index) -> Vec<u8> {
15    bincode::serialize(&IndexKey(i)).unwrap()
16}
17fn decode_index(s: &[u8]) -> Index {
18    let x: IndexKey = bincode::deserialize(s).unwrap();
19    x.0
20}
21fn comparator_fn(x: &[u8], y: &[u8]) -> Ordering {
22    let x: Index = decode_index(x);
23    let y: Index = decode_index(y);
24    x.cmp(&y)
25}
26
27impl Storage {
28    pub fn destroy(path: &Path) -> Result<()> {
29        let opts = Options::default();
30        DB::destroy(&opts, path)?;
31        Ok(())
32    }
33    /// Create the initial state.
34    /// You should call `destory` before calling this function.
35    pub fn create(path: &Path) -> Result<()> {
36        let mut db_opts = Options::default();
37        db_opts.create_if_missing(true);
38        db_opts.create_missing_column_families(true);
39        let mut opts = Options::default();
40        opts.set_comparator(CMP, comparator_fn);
41        let cf_descs = vec![ColumnFamilyDescriptor::new(CF_ENTRY, opts)];
42        let db = DB::open_cf_descriptors(&db_opts, path, cf_descs)?;
43
44        let initial_ballot = Ballot {
45            cur_term: 0,
46            voted_for: None,
47        };
48        let b: Vec<u8> = initial_ballot.into();
49        db.put(BALLOT_KEY, b)?;
50
51        Ok(())
52    }
53    fn open_db(path: &Path) -> Result<DB> {
54        let db_opts = Options::default();
55        let mut opts = Options::default();
56        opts.set_comparator(CMP, comparator_fn);
57        let cf_descs = vec![ColumnFamilyDescriptor::new(CF_ENTRY, opts)];
58        let db = DB::open_cf_descriptors(&db_opts, path, cf_descs)?;
59        Ok(db)
60    }
61    pub fn open(path: &Path) -> Result<Self> {
62        let db = Self::open_db(path)?;
63        Ok(Self { db })
64    }
65}
66pub struct Storage {
67    db: DB,
68}
69#[async_trait::async_trait]
70impl super::RaftStorage for Storage {
71    async fn delete_entry(&self, i: Index) -> Result<()> {
72        let cf = self.db.cf_handle(CF_ENTRY).unwrap();
73        self.db.delete_cf(cf, encode_index(i))?;
74        Ok(())
75    }
76    async fn get_head_index(&self) -> Result<Index> {
77        let cf = self.db.cf_handle(CF_ENTRY).unwrap();
78        let mut iter = self.db.raw_iterator_cf(cf);
79        iter.seek_to_first();
80        // The iterator is empty
81        if !iter.valid() {
82            return Ok(0);
83        }
84        let key = iter.key().unwrap();
85        let v = decode_index(key);
86        Ok(v)
87    }
88    async fn get_last_index(&self) -> Result<Index> {
89        let cf = self.db.cf_handle(CF_ENTRY).unwrap();
90        let mut iter = self.db.raw_iterator_cf(cf);
91        iter.seek_to_last();
92        // The iterator is empty
93        if !iter.valid() {
94            return Ok(0);
95        }
96        let key = iter.key().unwrap();
97        let v = decode_index(key);
98        Ok(v)
99    }
100    async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
101        let cf = self.db.cf_handle(CF_ENTRY).unwrap();
102        let b: Vec<u8> = e.into();
103        self.db.put_cf(&cf, encode_index(i), b)?;
104        Ok(())
105    }
106    async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
107        let cf = self.db.cf_handle(CF_ENTRY).unwrap();
108        let b: Option<Vec<u8>> = self.db.get_cf(&cf, encode_index(i))?;
109        Ok(b.map(|x| x.into()))
110    }
111    async fn save_ballot(&self, v: Ballot) -> Result<()> {
112        let b: Vec<u8> = v.into();
113        self.db.put(BALLOT_KEY, b)?;
114        Ok(())
115    }
116    async fn load_ballot(&self) -> Result<Ballot> {
117        let b = self.db.get(BALLOT_KEY)?.unwrap();
118        let v = b.into();
119        Ok(v)
120    }
121}
122
#[cfg(test)]
mod tests {
    use serial_test::serial;
    use std::path::PathBuf;

    use super::*;
    use crate::storage;

    /// Prepares a freshly created database under the platform's temporary
    /// directory and returns its path.
    ///
    /// Any database left over at that path is destroyed first. Failing to
    /// create the parent directory is reported instead of being ignored.
    fn fresh_db_path() -> Result<PathBuf> {
        let dir = std::env::temp_dir().join("lol");
        // Succeeds when the directory already exists, so only real failures surface.
        std::fs::create_dir_all(&dir)?;
        let path = dir.join("disk.db");
        Storage::destroy(&path)?;
        Storage::create(&path)?;
        Ok(path)
    }

    // Both tests use the same on-disk location and are marked `#[serial]`.
    #[tokio::test]
    #[serial]
    async fn test_rocksdb_storage() -> Result<()> {
        let path = fresh_db_path()?;

        let s = Storage::open(&path)?;
        storage::test_storage(s).await?;

        Storage::destroy(&path)?;
        Ok(())
    }

    #[tokio::test]
    #[serial]
    async fn test_rocksdb_persistency() -> Result<()> {
        let path = fresh_db_path()?;

        let s = Storage::open(&path)?;
        storage::persistency::test_pre_close(s).await?;

        // The first handle was moved into `test_pre_close`; open the same
        // database again to check what was persisted.
        let s = Storage::open(&path)?;
        storage::persistency::test_post_close(s).await?;

        Storage::destroy(&path)?;

        Ok(())
    }
}