solana_kvstore/lib.rs

use crate::mapper::{Disk, Mapper, Memory};
use crate::sstable::SSTable;
use crate::storage::MemTable;
use crate::writelog::WriteLog;
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::ops::RangeInclusive;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;

mod compactor;
mod error;
mod io_utils;
mod mapper;
mod readtx;
mod sstable;
mod storage;
mod writebatch;
mod writelog;

#[macro_use]
extern crate serde_derive;

pub use self::error::{Error, Result};
pub use self::readtx::ReadTx as Snapshot;
pub use self::sstable::Key;
pub use self::writebatch::{Config as WriteBatchConfig, WriteBatch};
pub use self::writelog::Config as LogConfig;

const TABLES_FILE: &str = "tables.meta";
const LOG_FILE: &str = "mem-log";
const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MAX_PAGES: usize = 10;
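// Relaxed ordering appears sufficient for the commit counter: it only has to
// hand out unique commit IDs, and the put/delete paths additionally serialize
// on the memtable write lock before incrementing it.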
const COMMIT_ORDERING: Ordering = Ordering::Relaxed;

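/// Tuning knobs for a [`KvStore`].
///
/// A minimal construction sketch (the override value is illustrative, and the
/// crate is assumed to be importable as `solana_kvstore`):
///
/// ```no_run
/// use solana_kvstore::Config;
///
/// let config = Config {
///     max_mem: 32 * 1024 * 1024, // illustrative: flush the memtable at 32 MiB
///     ..Config::default()
/// };
/// ```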
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct Config {
    pub max_mem: usize,
    pub max_tables: usize,
    pub page_size: usize,
    pub in_memory: bool,
    pub log_config: LogConfig,
}

#[derive(Debug)]
pub struct KvStore {
    config: Config,
    root: PathBuf,
    commit: AtomicUsize,
    mem: RwLock<MemTable>,
    log: Arc<RwLock<WriteLog>>,
    tables: RwLock<Vec<BTreeMap<Key, SSTable>>>,
    mapper: Arc<dyn Mapper>,
    sender: Mutex<Sender<compactor::Req>>,
    receiver: Mutex<Receiver<compactor::Resp>>,
    compactor_handle: JoinHandle<()>,
}

impl KvStore {
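    /// Opens (or creates) a store rooted at `root` with `Config::default()`,
    /// backed by a single on-disk storage directory.
    ///
    /// A hedged doctest sketch; the path is illustrative:
    ///
    /// ```no_run
    /// use solana_kvstore::KvStore;
    ///
    /// let store = KvStore::open_default("/tmp/kvstore-example").unwrap();
    /// ```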
    pub fn open_default<P>(root: P) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let mapper = Disk::single(root.as_ref());
        open(root.as_ref(), Arc::new(mapper), Config::default())
    }

    pub fn open<P>(root: P, config: Config) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let mapper: Arc<dyn Mapper> = if config.in_memory {
            Arc::new(Memory::new())
        } else {
            Arc::new(Disk::single(root.as_ref()))
        };
        open(root.as_ref(), mapper, config)
    }

    pub fn partitioned<P, P2>(root: P, storage_dirs: &[P2], config: Config) -> Result<Self>
    where
        P: AsRef<Path>,
        P2: AsRef<Path>,
    {
        let mapper = Disk::new(storage_dirs);
        open(root.as_ref(), Arc::new(mapper), config)
    }

    pub fn config(&self) -> &Config {
        &self.config
    }

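    /// Writes a single key/value pair: the write is appended to the write-log,
    /// applied to the in-memory table, and the memtable is flushed if it has
    /// grown past `config.max_mem`.
    ///
    /// A hedged round-trip sketch using this crate's own `test::gen` helpers
    /// (the path is illustrative):
    ///
    /// ```no_run
    /// use solana_kvstore::{test::gen, KvStore};
    ///
    /// let store = KvStore::open_default("/tmp/kvstore-put-example").unwrap();
    /// let key = gen::keys().next().unwrap();
    ///
    /// store.put(&key, b"hello").unwrap();
    /// assert_eq!(store.get(&key).unwrap(), Some(b"hello".to_vec()));
    /// ```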
    pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        log.log_put(key, commit, data).unwrap();
        memtable.put(key, commit, data);

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

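    /// Bulk variant of [`KvStore::put`]: every row shares a single commit ID,
    /// and the memtable is only checked for flushing once, after the loop.
    ///
    /// A hedged sketch fed by the `test::gen` pair generator (path and sizes
    /// are illustrative):
    ///
    /// ```no_run
    /// use solana_kvstore::{test::gen, KvStore};
    ///
    /// let store = KvStore::open_default("/tmp/kvstore-put-many-example").unwrap();
    /// store.put_many(gen::pairs(32).take(100)).unwrap();
    /// ```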
    pub fn put_many<Iter, Tup, K, V>(&self, rows: Iter) -> Result<()>
    where
        Iter: Iterator<Item = Tup>,
        Tup: std::borrow::Borrow<(K, V)>,
        K: std::borrow::Borrow<Key>,
        V: std::borrow::Borrow<[u8]>,
    {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        for pair in rows {
            let (k, d) = pair.borrow();
            let (key, data) = (k.borrow(), d.borrow());

            log.log_put(key, commit, data).unwrap();
            memtable.put(key, commit, data);
        }

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

    pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
        self.query_compactor()?;

        let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());

        storage::get(&memtable.values, &*tables, key)
    }

    pub fn delete(&self, key: &Key) -> Result<()> {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        log.log_delete(key, commit).unwrap();
        memtable.delete(key, commit);

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

    pub fn delete_many<Iter, K>(&self, rows: Iter) -> Result<()>
    where
        Iter: Iterator<Item = K>,
        K: std::borrow::Borrow<Key>,
    {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        for k in rows {
            let key = k.borrow();
            log.log_delete(key, commit).unwrap();
            memtable.delete(key, commit);
        }

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

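    /// Creates a [`WriteBatch`] that buffers writes in its own memtable.
    ///
    /// Note that the commit ID is claimed here, when the batch is created, so
    /// a batch that is committed later still carries this earlier ID.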
    pub fn batch(&self, config: WriteBatchConfig) -> WriteBatch {
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        WriteBatch {
            config,
            commit,
            memtable: MemTable::new(BTreeMap::new()),
            log: Arc::clone(&self.log),
        }
    }

    pub fn commit(&self, mut batch: WriteBatch) -> Result<()> {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();

        memtable.values.append(&mut batch.memtable.values);
        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

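    /// Produces a point-in-time [`Snapshot`] by cloning the current memtable
    /// contents and SSTable metadata; the cost grows with the size of the
    /// in-memory table, and the snapshot does not observe later writes.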
    pub fn snapshot(&self) -> Snapshot {
        let (memtable, tables) = (
            self.mem.read().unwrap().values.clone(),
            self.tables.read().unwrap().clone(),
        );

        Snapshot::new(memtable, tables)
    }

    pub fn range(
        &self,
        range: RangeInclusive<Key>,
    ) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
        self.query_compactor()?;

        let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());

        storage::range(&memtable.values, &*tables, range)
    }

    pub fn destroy<P>(path: P) -> Result<()>
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref();
        if !path.exists() {
            return Ok(());
        }

        fs::remove_dir_all(path)?;
        Ok(())
    }

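    // Polls the compactor without blocking: `try_lock`/`try_write` are used so
    // that reads never stall behind compaction bookkeeping; if any lock is
    // contended, the poll is simply skipped until the next query.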
    fn query_compactor(&self) -> Result<()> {
        if let (Ok(mut sender), Ok(mut receiver), Ok(mut tables)) = (
            self.sender.try_lock(),
            self.receiver.try_lock(),
            self.tables.try_write(),
        ) {
            query_compactor(
                &self.root,
                &*self.mapper,
                &mut *tables,
                &mut *receiver,
                &mut *sender,
            )?;
        }

        Ok(())
    }

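    // Flushes the memtable to a new level-0 SSTable once it exceeds
    // `config.max_mem`, resets the write-log, and kicks off background
    // compaction when level 0 holds more than `config.max_tables` tables.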
    fn ensure_memtable(&self, mem: &mut MemTable, log: &mut WriteLog) -> Result<()> {
        if mem.mem_size < self.config.max_mem {
            return Ok(());
        }

        let mut tables = self.tables.write().unwrap();

        storage::flush_table(&mem.values, &*self.mapper, &mut *tables)?;
        mem.values.clear();
        mem.mem_size = 0;
        log.reset().expect("Write-log rotation failed");

        if is_lvl0_full(&tables, &self.config) {
            let sender = self.sender.lock().unwrap();

            sender.send(compactor::Req::Start(PathBuf::new()))?;
        }

        Ok(())
    }
}

impl Default for Config {
    fn default() -> Config {
        Config {
            max_mem: DEFAULT_MEM_SIZE,
            max_tables: DEFAULT_MAX_PAGES,
            page_size: DEFAULT_TABLE_SIZE,
            in_memory: false,
            log_config: LogConfig::default(),
        }
    }
}

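// Shared constructor: replays the write-log (if one survives from a previous
// run) into a fresh memtable, loads SSTable metadata, and spawns the
// background compactor. The commit counter is seeded from the wall clock so
// that commit IDs keep increasing across restarts.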
fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore> {
    let root = root.to_path_buf();
    let log_path = root.join(LOG_FILE);
    let restore_log = log_path.exists();

    if !root.exists() {
        fs::create_dir(&root)?;
    }

    let commit = chrono::Utc::now().timestamp();
    let mut log = WriteLog::open(&log_path, config.log_config)?;
    let values = if restore_log && !config.in_memory {
        log.materialize()?
    } else {
        BTreeMap::new()
    };
    let mem = MemTable::new(values);

    let tables = load_tables(&root, &*mapper)?;

    let cfg = compactor::Config {
        max_pages: config.max_tables,
        page_size: config.page_size,
    };
    let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    Ok(KvStore {
        config,
        root,
        commit: AtomicUsize::new(commit as usize),
        mem: RwLock::new(mem),
        log: Arc::new(RwLock::new(log)),
        tables: RwLock::new(tables),
        mapper,
        sender: Mutex::new(sender),
        receiver: Mutex::new(receiver),
        compactor_handle,
    })
}

fn load_tables(root: &Path, mapper: &dyn Mapper) -> Result<Vec<BTreeMap<Key, SSTable>>> {
    let mut tables = Vec::new();
    let meta_path = root.join(TABLES_FILE);

    if meta_path.exists() {
        mapper.load_state_from(&meta_path)?;
        tables = SSTable::sorted_tables(&mapper.active_set()?);
    }

    Ok(tables)
}

fn dump_tables(root: &Path, mapper: &dyn Mapper) -> Result<()> {
    mapper.serialize_state_to(&root.join(TABLES_FILE))?;
    Ok(())
}

fn query_compactor(
    root: &Path,
    mapper: &dyn Mapper,
    tables: &mut Vec<BTreeMap<Key, SSTable>>,
    receiver: &mut Receiver<compactor::Resp>,
    sender: &mut Sender<compactor::Req>,
) -> Result<()> {
    match receiver.try_recv() {
        Ok(compactor::Resp::Done(new_tables)) => {
            *tables = new_tables;
            dump_tables(root, mapper)?;
            sender.send(compactor::Req::Gc).unwrap();
        }
        Ok(compactor::Resp::Failed(e)) => {
            return Err(e);
        }
        // Nothing available, do nothing
        _ => {}
    }

    Ok(())
}

#[inline]
fn is_lvl0_full(tables: &[BTreeMap<Key, SSTable>], config: &Config) -> bool {
    if tables.is_empty() {
        false
    } else {
        tables[0].len() > config.max_tables
    }
}

pub mod test {
    pub mod gen {
        use crate::Key;
        use rand::distributions::Uniform;
        use rand::{rngs::SmallRng, FromEntropy, Rng};
        use std::iter;
        use std::ops::Range;

        pub fn keys() -> impl Iterator<Item = Key> {
            let mut rng = SmallRng::from_entropy();
            iter::repeat_with(move || Key(rng.gen()))
        }

        pub fn data(size: usize) -> impl Iterator<Item = Vec<u8>> {
            iter::repeat(vec![0; size])
        }

        pub fn data_vary(range: Range<u64>) -> impl Iterator<Item = Vec<u8>> {
            let dist = Uniform::from(range);
            let mut rng = SmallRng::from_entropy();

            iter::repeat_with(move || {
                let size: u64 = rng.sample(dist);
                vec![0; size as usize]
            })
        }

        pub fn pairs(size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
            keys().zip(data(size))
        }

        pub fn pairs_vary(range: Range<u64>) -> impl Iterator<Item = (Key, Vec<u8>)> {
            keys().zip(data_vary(range))
        }
    }
}
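
#[cfg(test)]
mod tests {
    use super::test::gen;
    use super::{Config, KvStore};

    // A hedged smoke-test sketch (not part of the original file). It assumes
    // the in-memory mapper still tolerates a scratch directory on disk for the
    // write-log; the root path below is illustrative.
    #[test]
    fn put_get_roundtrip() {
        let root = std::env::temp_dir().join("solana-kvstore-smoke");
        let config = Config {
            in_memory: true,
            ..Config::default()
        };
        let store = KvStore::open(&root, config).unwrap();

        // Round-trip a handful of generated pairs through the store.
        for (key, data) in gen::pairs(64).take(10) {
            store.put(&key, &data).unwrap();
            assert_eq!(store.get(&key).unwrap(), Some(data));
        }

        drop(store);
        KvStore::destroy(&root).unwrap();
    }
}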