use crate::mapper::{Disk, Mapper, Memory};
use crate::sstable::SSTable;
use crate::storage::MemTable;
use crate::writelog::WriteLog;
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::ops::RangeInclusive;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;

mod compactor;
mod error;
mod io_utils;
mod mapper;
mod readtx;
mod sstable;
mod storage;
mod writebatch;
mod writelog;

#[macro_use]
extern crate serde_derive;

pub use self::error::{Error, Result};
pub use self::readtx::ReadTx as Snapshot;
pub use self::sstable::Key;
pub use self::writebatch::{Config as WriteBatchConfig, WriteBatch};
pub use self::writelog::Config as LogConfig;

const TABLES_FILE: &str = "tables.meta";
const LOG_FILE: &str = "mem-log";
const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MAX_PAGES: usize = 10;
const COMMIT_ORDERING: Ordering = Ordering::Relaxed;

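/// Tuning parameters for a [`KvStore`].
///
/// * `max_mem` - byte threshold at which the in-memory table is
///   flushed to an on-disk table.
/// * `max_tables` - maximum number of level-0 tables before compaction
///   is requested.
/// * `page_size` - table page size handed through to the compactor.
/// * `in_memory` - back the store entirely with RAM instead of the
///   filesystem.
/// * `log_config` - configuration forwarded to the write-ahead log.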
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct Config {
    pub max_mem: usize,
    pub max_tables: usize,
    pub page_size: usize,
    pub in_memory: bool,
    pub log_config: LogConfig,
}

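/// An embedded, log-structured key-value store.
///
/// Writes land in a write-ahead log and an in-memory table; once the
/// in-memory table exceeds [`Config::max_mem`] it is flushed to an
/// on-disk table, and a background compactor merges on-disk tables as
/// level-0 fills up.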
#[derive(Debug)]
pub struct KvStore {
    config: Config,
    root: PathBuf,
    commit: AtomicUsize,
    mem: RwLock<MemTable>,
    log: Arc<RwLock<WriteLog>>,
    tables: RwLock<Vec<BTreeMap<Key, SSTable>>>,
    mapper: Arc<dyn Mapper>,
    sender: Mutex<Sender<compactor::Req>>,
    receiver: Mutex<Receiver<compactor::Resp>>,
    compactor_handle: JoinHandle<()>,
}

impl KvStore {
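    /// Opens (or creates) a store rooted at `root` with the default
    /// [`Config`] and a single on-disk storage location.
    ///
    /// A minimal usage sketch; `kvstore` stands in for whatever this
    /// crate is actually named:
    ///
    /// ```ignore
    /// let store = kvstore::KvStore::open_default("/tmp/db")?;
    /// ```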
    pub fn open_default<P>(root: P) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let mapper = Disk::single(root.as_ref());
        open(root.as_ref(), Arc::new(mapper), Config::default())
    }

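    /// Opens (or creates) a store rooted at `root` with an explicit
    /// [`Config`]. With `config.in_memory` set, data lives entirely in
    /// RAM and nothing persists across restarts.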
    pub fn open<P>(root: P, config: Config) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let mapper: Arc<dyn Mapper> = if config.in_memory {
            Arc::new(Memory::new())
        } else {
            Arc::new(Disk::single(root.as_ref()))
        };
        open(root.as_ref(), mapper, config)
    }

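    /// Like [`KvStore::open`], but spreads table storage across
    /// `storage_dirs`, e.g. to place tables on several disks.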
    pub fn partitioned<P, P2>(root: P, storage_dirs: &[P2], config: Config) -> Result<Self>
    where
        P: AsRef<Path>,
        P2: AsRef<Path>,
    {
        let mapper = Disk::new(storage_dirs);
        open(root.as_ref(), Arc::new(mapper), config)
    }

    pub fn config(&self) -> &Config {
        &self.config
    }

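    /// Writes a single key-value pair: first to the write-ahead log,
    /// then to the in-memory table, which is flushed to disk if it has
    /// outgrown [`Config::max_mem`].
    ///
    /// ```ignore
    /// store.put(&key, b"value")?;
    /// ```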
    pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        log.log_put(key, commit, data)?;
        memtable.put(key, commit, data);

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

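    /// Writes an iterator of key-value pairs under a single commit
    /// number. The `Borrow` bounds accept owned pairs, references, or
    /// a mix of the two.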
    pub fn put_many<Iter, Tup, K, V>(&self, rows: Iter) -> Result<()>
    where
        Iter: Iterator<Item = Tup>,
        Tup: std::borrow::Borrow<(K, V)>,
        K: std::borrow::Borrow<Key>,
        V: std::borrow::Borrow<[u8]>,
    {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        for pair in rows {
            let (k, d) = pair.borrow();
            let (key, data) = (k.borrow(), d.borrow());

            log.log_put(key, commit, data)?;
            memtable.put(key, commit, data);
        }

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

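    /// Returns the most recent value written for `key`, or `None` if
    /// the key is absent or deleted. The in-memory table is consulted
    /// first, then the on-disk tables, after any finished compaction
    /// has been applied.
    ///
    /// ```ignore
    /// if let Some(bytes) = store.get(&key)? {
    ///     // ...
    /// }
    /// ```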
    pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
        self.query_compactor()?;

        let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());

        storage::get(&memtable.values, &*tables, key)
    }

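    /// Deletes `key` by logging the deletion and recording it in the
    /// in-memory table.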
    pub fn delete(&self, key: &Key) -> Result<()> {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        log.log_delete(key, commit)?;
        memtable.delete(key, commit);

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

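    /// Deletes every key yielded by `rows` under a single commit
    /// number.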
    pub fn delete_many<Iter, K>(&self, rows: Iter) -> Result<()>
    where
        Iter: Iterator<Item = K>,
        K: std::borrow::Borrow<Key>,
    {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        for k in rows {
            let key = k.borrow();
            log.log_delete(key, commit)?;
            memtable.delete(key, commit);
        }

        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

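    /// Starts a [`WriteBatch`] that stages writes in its own in-memory
    /// table; nothing becomes visible until the batch is passed to
    /// [`KvStore::commit`].
    ///
    /// A minimal sketch; how writes are staged is defined by
    /// `WriteBatch` itself and is not shown here:
    ///
    /// ```ignore
    /// let batch = store.batch(WriteBatchConfig::default());
    /// // ... stage writes on `batch` ...
    /// store.commit(batch)?;
    /// ```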
    pub fn batch(&self, config: WriteBatchConfig) -> WriteBatch {
        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;

        WriteBatch {
            config,
            commit,
            memtable: MemTable::new(BTreeMap::new()),
            log: Arc::clone(&self.log),
        }
    }

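    /// Merges a staged [`WriteBatch`] into the store's in-memory
    /// table, flushing it to disk if it has outgrown
    /// [`Config::max_mem`].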
    pub fn commit(&self, mut batch: WriteBatch) -> Result<()> {
        let mut memtable = self.mem.write().unwrap();
        let mut log = self.log.write().unwrap();

        memtable.values.append(&mut batch.memtable.values);
        self.ensure_memtable(&mut *memtable, &mut *log)?;

        Ok(())
    }

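    /// Takes a point-in-time [`Snapshot`] by cloning the current
    /// in-memory table and table set; writes made afterwards do not
    /// affect it.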
    pub fn snapshot(&self) -> Snapshot {
        let (memtable, tables) = (
            self.mem.read().unwrap().values.clone(),
            self.tables.read().unwrap().clone(),
        );

        Snapshot::new(memtable, tables)
    }

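    /// Iterates over all live key-value pairs whose keys fall within
    /// `range`, drawing from both the in-memory and on-disk tables.
    ///
    /// ```ignore
    /// for (key, value) in store.range(lo..=hi)? {
    ///     // ...
    /// }
    /// ```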
    pub fn range(
        &self,
        range: RangeInclusive<Key>,
    ) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
        self.query_compactor()?;

        let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());

        storage::range(&memtable.values, &*tables, range)
    }

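    /// Removes the store's directory tree at `path`. A no-op if `path`
    /// does not exist.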
    pub fn destroy<P>(path: P) -> Result<()>
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref();
        if !path.exists() {
            return Ok(());
        }

        fs::remove_dir_all(path)?;
        Ok(())
    }

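    /// Non-blocking poll of the compactor: if any of the required
    /// locks is currently contended, this does nothing rather than
    /// stall the calling read.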
    fn query_compactor(&self) -> Result<()> {
        if let (Ok(mut sender), Ok(mut receiver), Ok(mut tables)) = (
            self.sender.try_lock(),
            self.receiver.try_lock(),
            self.tables.try_write(),
        ) {
            query_compactor(
                &self.root,
                &*self.mapper,
                &mut *tables,
                &mut *receiver,
                &mut *sender,
            )?;
        }

        Ok(())
    }

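    /// Flushes the in-memory table to an on-disk table once it exceeds
    /// the configured size, rotates the write-ahead log, and requests
    /// compaction when level-0 is full.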
    fn ensure_memtable(&self, mem: &mut MemTable, log: &mut WriteLog) -> Result<()> {
        if mem.mem_size < self.config.max_mem {
            return Ok(());
        }

        let mut tables = self.tables.write().unwrap();

        storage::flush_table(&mem.values, &*self.mapper, &mut *tables)?;
        mem.values.clear();
        mem.mem_size = 0;
        // The memtable contents are now in a table, so the log can be rotated.
        log.reset().expect("Write-log rotation failed");

        if is_lvl0_full(&tables, &self.config) {
            let sender = self.sender.lock().unwrap();

            sender.send(compactor::Req::Start(PathBuf::new()))?;
        }

        Ok(())
    }
}

impl Default for Config {
    fn default() -> Config {
        Config {
            max_mem: DEFAULT_MEM_SIZE,
            max_tables: DEFAULT_MAX_PAGES,
            page_size: DEFAULT_TABLE_SIZE,
            in_memory: false,
            log_config: LogConfig::default(),
        }
    }
}

fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore> {
    let root = root.to_path_buf();
    let log_path = root.join(LOG_FILE);
    let restore_log = log_path.exists();

    if !root.exists() {
        fs::create_dir(&root)?;
    }

    // Seed the commit counter from the current wall-clock timestamp.
    let commit = chrono::Utc::now().timestamp();
    let mut log = WriteLog::open(&log_path, config.log_config)?;
    // Replay any write-ahead log left behind by a previous run.
    let values = if restore_log && !config.in_memory {
        log.materialize()?
    } else {
        BTreeMap::new()
    };
    let mem = MemTable::new(values);

    let tables = load_tables(&root, &*mapper)?;

    let cfg = compactor::Config {
        max_pages: config.max_tables,
        page_size: config.page_size,
    };
    let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    Ok(KvStore {
        config,
        root,
        commit: AtomicUsize::new(commit as usize),
        mem: RwLock::new(mem),
        log: Arc::new(RwLock::new(log)),
        tables: RwLock::new(tables),
        mapper,
        sender: Mutex::new(sender),
        receiver: Mutex::new(receiver),
        compactor_handle,
    })
}

fn load_tables(root: &Path, mapper: &dyn Mapper) -> Result<Vec<BTreeMap<Key, SSTable>>> {
    let mut tables = Vec::new();
    let meta_path = root.join(TABLES_FILE);

    if meta_path.exists() {
        mapper.load_state_from(&meta_path)?;
        tables = SSTable::sorted_tables(&mapper.active_set()?);
    }

    Ok(tables)
}

fn dump_tables(root: &Path, mapper: &dyn Mapper) -> Result<()> {
    mapper.serialize_state_to(&root.join(TABLES_FILE))?;
    Ok(())
}

fn query_compactor(
    root: &Path,
    mapper: &dyn Mapper,
    tables: &mut Vec<BTreeMap<Key, SSTable>>,
    receiver: &mut Receiver<compactor::Resp>,
    sender: &mut Sender<compactor::Req>,
) -> Result<()> {
    match receiver.try_recv() {
        Ok(compactor::Resp::Done(new_tables)) => {
            *tables = new_tables;
            dump_tables(root, mapper)?;
            sender.send(compactor::Req::Gc)?;
        }
        Ok(compactor::Resp::Failed(e)) => {
            return Err(e);
        }
        _ => {}
    }

    Ok(())
}

#[inline]
fn is_lvl0_full(tables: &[BTreeMap<Key, SSTable>], config: &Config) -> bool {
    if tables.is_empty() {
        false
    } else {
        tables[0].len() > config.max_tables
    }
}

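/// Random key and value generators, exported so integration tests and
/// benchmarks can share them.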
pub mod test {
    pub mod gen {
        use crate::Key;
        use rand::distributions::Uniform;
        use rand::{rngs::SmallRng, FromEntropy, Rng};
        use std::iter;
        use std::ops::Range;

        pub fn keys() -> impl Iterator<Item = Key> {
            let mut rng = SmallRng::from_entropy();
            iter::repeat_with(move || Key(rng.gen()))
        }

        pub fn data(size: usize) -> impl Iterator<Item = Vec<u8>> {
            iter::repeat(vec![0; size])
        }

        pub fn data_vary(range: Range<u64>) -> impl Iterator<Item = Vec<u8>> {
            let dist = Uniform::from(range);
            let mut rng = SmallRng::from_entropy();

            iter::repeat_with(move || {
                let size: u64 = rng.sample(dist);
                vec![0; size as usize]
            })
        }

        pub fn pairs(size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
            keys().zip(data(size))
        }

        pub fn pairs_vary(range: Range<u64>) -> impl Iterator<Item = (Key, Vec<u8>)> {
            keys().zip(data_vary(range))
        }
    }
}