lol_core/storage/
rocksdb.rs1use super::{Ballot, Entry};
2use crate::Index;
3use anyhow::Result;
4use rocksdb::{ColumnFamilyDescriptor, Options, DB};
5use std::cmp::Ordering;
6use std::path::Path;
7
8const CF_ENTRY: &str = "entry";
9const BALLOT_KEY: &str = "ballot";
10const CMP: &str = "index_asc";
11
12#[derive(serde::Serialize, serde::Deserialize, Debug)]
13struct IndexKey(u64);
14fn encode_index(i: Index) -> Vec<u8> {
15 bincode::serialize(&IndexKey(i)).unwrap()
16}
17fn decode_index(s: &[u8]) -> Index {
18 let x: IndexKey = bincode::deserialize(s).unwrap();
19 x.0
20}
21fn comparator_fn(x: &[u8], y: &[u8]) -> Ordering {
22 let x: Index = decode_index(x);
23 let y: Index = decode_index(y);
24 x.cmp(&y)
25}
26
27impl Storage {
28 pub fn destroy(path: &Path) -> Result<()> {
29 let opts = Options::default();
30 DB::destroy(&opts, path)?;
31 Ok(())
32 }
33 pub fn create(path: &Path) -> Result<()> {
36 let mut db_opts = Options::default();
37 db_opts.create_if_missing(true);
38 db_opts.create_missing_column_families(true);
39 let mut opts = Options::default();
40 opts.set_comparator(CMP, comparator_fn);
41 let cf_descs = vec![ColumnFamilyDescriptor::new(CF_ENTRY, opts)];
42 let db = DB::open_cf_descriptors(&db_opts, path, cf_descs)?;
43
44 let initial_ballot = Ballot {
45 cur_term: 0,
46 voted_for: None,
47 };
48 let b: Vec<u8> = initial_ballot.into();
49 db.put(BALLOT_KEY, b)?;
50
51 Ok(())
52 }
53 fn open_db(path: &Path) -> Result<DB> {
54 let db_opts = Options::default();
55 let mut opts = Options::default();
56 opts.set_comparator(CMP, comparator_fn);
57 let cf_descs = vec![ColumnFamilyDescriptor::new(CF_ENTRY, opts)];
58 let db = DB::open_cf_descriptors(&db_opts, path, cf_descs)?;
59 Ok(db)
60 }
61 pub fn open(path: &Path) -> Result<Self> {
62 let db = Self::open_db(path)?;
63 Ok(Self { db })
64 }
65}
66pub struct Storage {
67 db: DB,
68}
69#[async_trait::async_trait]
70impl super::RaftStorage for Storage {
71 async fn delete_entry(&self, i: Index) -> Result<()> {
72 let cf = self.db.cf_handle(CF_ENTRY).unwrap();
73 self.db.delete_cf(cf, encode_index(i))?;
74 Ok(())
75 }
76 async fn get_head_index(&self) -> Result<Index> {
77 let cf = self.db.cf_handle(CF_ENTRY).unwrap();
78 let mut iter = self.db.raw_iterator_cf(cf);
79 iter.seek_to_first();
80 if !iter.valid() {
82 return Ok(0);
83 }
84 let key = iter.key().unwrap();
85 let v = decode_index(key);
86 Ok(v)
87 }
88 async fn get_last_index(&self) -> Result<Index> {
89 let cf = self.db.cf_handle(CF_ENTRY).unwrap();
90 let mut iter = self.db.raw_iterator_cf(cf);
91 iter.seek_to_last();
92 if !iter.valid() {
94 return Ok(0);
95 }
96 let key = iter.key().unwrap();
97 let v = decode_index(key);
98 Ok(v)
99 }
100 async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
101 let cf = self.db.cf_handle(CF_ENTRY).unwrap();
102 let b: Vec<u8> = e.into();
103 self.db.put_cf(&cf, encode_index(i), b)?;
104 Ok(())
105 }
106 async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
107 let cf = self.db.cf_handle(CF_ENTRY).unwrap();
108 let b: Option<Vec<u8>> = self.db.get_cf(&cf, encode_index(i))?;
109 Ok(b.map(|x| x.into()))
110 }
111 async fn save_ballot(&self, v: Ballot) -> Result<()> {
112 let b: Vec<u8> = v.into();
113 self.db.put(BALLOT_KEY, b)?;
114 Ok(())
115 }
116 async fn load_ballot(&self) -> Result<Ballot> {
117 let b = self.db.get(BALLOT_KEY)?.unwrap();
118 let v = b.into();
119 Ok(v)
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use serial_test::serial;
126
127 use super::*;
128 use crate::storage;
129 #[tokio::test]
130 #[serial]
131 async fn test_rocksdb_storage() -> Result<()> {
132 let _ = std::fs::create_dir("/tmp/lol");
133 let path = Path::new("/tmp/lol/disk.db");
134 Storage::destroy(path)?;
135 Storage::create(path)?;
136
137 let s = Storage::open(path)?;
138 storage::test_storage(s).await?;
139
140 Storage::destroy(path)?;
141 Ok(())
142 }
143
144 #[tokio::test]
145 #[serial]
146 async fn test_rocksdb_persistency() -> Result<()> {
147 let _ = std::fs::create_dir("/tmp/lol");
148 let path = Path::new("/tmp/lol/disk.db");
149 Storage::destroy(path)?;
150 Storage::create(path)?;
151
152 let s = Storage::open(path)?;
153 storage::persistency::test_pre_close(s).await?;
154
155 let s = Storage::open(path)?;
156 storage::persistency::test_post_close(s).await?;
157
158 Storage::destroy(path)?;
159
160 Ok(())
161 }
162}