cita_database/
rocksdb.rs

1use std::default::Default;
2use std::path::Path;
3use std::sync::Arc;
4
5use crate::columns::map_columns;
6use crate::config::{Config, BACKGROUND_FLUSHES, WRITE_BUFFER_SIZE};
7use crate::database::{DataCategory, Database, Result};
8use crate::error::DatabaseError;
9use rocksdb::{
10    BlockBasedOptions, ColumnFamily, DBCompactionStyle, DBIterator, IteratorMode, Options,
11    ReadOptions, WriteBatch, WriteOptions, DB,
12};
13use std::fs::{metadata, remove_dir_all, rename};
14
15// The backup db path.
16const BACKUP_PATH: &str = "backup_old_db";
17
18// For the future: Add more info about db.
19#[derive(Debug)]
20struct DBInfo {
21    db: DB,
22}
23
24pub struct RocksDB {
25    db_info: Arc<Option<DBInfo>>,
26    pub config: Config,
27    pub write_opts: WriteOptions,
28    path: String,
29}
30
31// RocksDB guarantees synchronization
32unsafe impl Sync for RocksDB {}
33unsafe impl Send for RocksDB {}
34
35impl RocksDB {
36    /// Open a rocksDB with default config.
37    pub fn open_default(path: &str) -> Result<Self> {
38        Self::open(path, &Config::default())
39    }
40
41    /// Open rocksDB with config.
42    pub fn open(path: &str, config: &Config) -> Result<Self> {
43        let mut opts = Options::default();
44        opts.set_write_buffer_size(WRITE_BUFFER_SIZE);
45        opts.set_max_background_jobs(BACKGROUND_FLUSHES);
46
47        opts.create_if_missing(true);
48        // If true, any column families that didn't exist when opening the database will be created.
49        opts.create_missing_column_families(true);
50
51        let block_opts = BlockBasedOptions::default();
52        opts.set_block_based_table_factory(&block_opts);
53
54        opts.set_max_open_files(config.max_open_files);
55        opts.set_use_fsync(false);
56        opts.set_compaction_style(DBCompactionStyle::Level);
57        opts.set_target_file_size_base(config.compaction.target_file_size_base);
58        if let Some(level_multiplier) = config.compaction.max_bytes_for_level_multiplier {
59            opts.set_max_bytes_for_level_multiplier(level_multiplier);
60        }
61        if let Some(compactions) = config.compaction.max_background_compactions {
62            opts.set_max_background_jobs(compactions);
63        }
64
65        let mut write_opts = WriteOptions::default();
66        if !config.wal {
67            write_opts.disable_wal(true);
68        }
69
70        let columns: Vec<_> = (0..config.category_num.unwrap_or(0))
71            .map(|c| format!("col{}", c))
72            .collect();
73        let columns: Vec<&str> = columns.iter().map(|n| n as &str).collect();
74        debug!("[database] Columns: {:?}", columns);
75
76        let db = match config.category_num {
77            Some(_) => DB::open_cf(&opts, path, columns.iter())
78                .map_err(|e| DatabaseError::Internal(e.to_string()))?,
79            None => DB::open(&opts, path).map_err(|e| DatabaseError::Internal(e.to_string()))?,
80        };
81
82        Ok(RocksDB {
83            db_info: Arc::new(Some(DBInfo { db })),
84            write_opts,
85            config: config.clone(),
86            path: path.to_owned(),
87        })
88    }
89
90    pub fn close(&mut self) {
91        let new_db = Arc::new(None);
92        *Arc::get_mut(&mut self.db_info).unwrap() = Arc::try_unwrap(new_db).unwrap();
93    }
94
95    /// Restore the database from a copy at given path.
96    pub fn restore(&mut self, new_db_path: &str) -> Result<()> {
97        // Close it first
98        // https://github.com/facebook/rocksdb/wiki/Basic-Operations#closing-a-database
99        self.close();
100
101        // Backup if the backup_path does not exist.
102        let backup = !path_exists(BACKUP_PATH);
103
104        // Backup the old db
105        if backup {
106            rename(&self.path, &BACKUP_PATH)?;
107        }
108
109        // Restore the new db.
110        match rename(&new_db_path, &self.path) {
111            Ok(_) => {
112                // Clean up the backup db.
113                if backup {
114                    remove_dir_all(&BACKUP_PATH)?;
115                }
116            }
117            Err(e) => {
118                // Restore the backup db.
119                if backup {
120                    rename(&BACKUP_PATH, &self.path)?;
121                }
122                return Err(DatabaseError::Internal(e.to_string()));
123            }
124        }
125
126        // Reopen the database.
127        let new_db = Self::open(&self.path, &self.config).unwrap().db_info;
128        *Arc::get_mut(&mut self.db_info).unwrap() = Arc::try_unwrap(new_db).unwrap();
129        Ok(())
130    }
131
132    pub fn iterator(&self, category: Option<DataCategory>) -> Option<DBIterator> {
133        match *self.db_info {
134            Some(DBInfo { ref db }) => {
135                let iter = {
136                    if let Some(col) = category {
137                        db.iterator_cf_opt(
138                            get_column(db, col).unwrap(),
139                            ReadOptions::default(),
140                            IteratorMode::Start,
141                        )
142                    } else {
143                        db.iterator_opt(IteratorMode::Start, ReadOptions::default())
144                    }
145                };
146                Some(iter)
147            }
148            None => None,
149        }
150    }
151
152    #[cfg(test)]
153    fn clean_cf(&mut self) {
154        let columns: Vec<_> = (0..self.config.category_num.unwrap_or(0))
155            .map(|c| format!("col{}", c))
156            .collect();
157        let columns: Vec<&str> = columns.iter().map(|n| n as &str).collect();
158        if let Some(DBInfo { db }) = Arc::get_mut(&mut self.db_info).unwrap() {
159            for col in columns.iter() {
160                    db.drop_cf(col).unwrap();
161            }
162        }
163    }
164
165    #[cfg(test)]
166    fn clean_db(&self) {
167        if path_exists(&self.path) {
168            remove_dir_all(&self.path).unwrap();
169        }
170    }
171}
172
173impl Database for RocksDB {
174    fn get(&self, category: Option<DataCategory>, key: &[u8]) -> Result<Option<Vec<u8>>> {
175        match *self.db_info {
176            Some(DBInfo { ref db }) => {
177                // let db = Arc::clone(&self.db);
178                let key = key.to_vec();
179
180                let mut value = db.get(&key)?;
181                if let Some(category) = category {
182                    let col = get_column(db, category)?;
183                    value = db.get_cf(col, &key)?;
184                }
185                Ok(value.map(|v| v.to_vec()))
186            }
187            None => Ok(None),
188        }
189    }
190
191    fn get_batch(
192        &self,
193        category: Option<DataCategory>,
194        keys: &[Vec<u8>],
195    ) -> Result<Vec<Option<Vec<u8>>>> {
196        let mut values = Vec::with_capacity(keys.len());
197        if let Some(DBInfo { ref db }) = *self.db_info {
198            let keys = keys.to_vec();
199
200            for key in keys {
201                let mut value = db.get(&key)?;
202                if let Some(category) = category.clone() {
203                    let col = get_column(db, category)?;
204                    value = db.get_cf(col, &key)?;
205                }
206                values.push(value.map(|v| v.to_vec()));
207            }
208        }
209
210        Ok(values)
211    }
212
213    fn insert(&self, category: Option<DataCategory>, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
214        if let Some(DBInfo { ref db }) = *self.db_info {
215            match category {
216                Some(category) => {
217                    let col = get_column(db, category)?;
218                    db.put_cf(col, key, value)?;
219                }
220                None => db.put(key, value)?,
221            }
222        }
223
224        Ok(())
225    }
226
227    fn insert_batch(
228        &self,
229        category: Option<DataCategory>,
230        keys: Vec<Vec<u8>>,
231        values: Vec<Vec<u8>>,
232    ) -> Result<()> {
233        if keys.len() != values.len() {
234            return Err(DatabaseError::InvalidData);
235        }
236
237        if let Some(DBInfo { ref db }) = *self.db_info {
238            let mut batch = WriteBatch::default();
239
240            for i in 0..keys.len() {
241                match category.clone() {
242                    Some(category) => {
243                        let col = get_column(db, category)?;
244                        batch.put_cf(col, &keys[i], &values[i]);
245                    }
246                    None => batch.put(&keys[i], &values[i]),
247                }
248            }
249            db.write(batch)?;
250        }
251
252        Ok(())
253    }
254
255    fn contains(&self, category: Option<DataCategory>, key: &[u8]) -> Result<bool> {
256        match *self.db_info {
257            Some(DBInfo { ref db }) => {
258                let key = key.to_vec();
259                let mut value = db.get(&key)?;
260                if let Some(category) = category {
261                    let col = get_column(db, category)?;
262                    value = db.get_cf(col, &key)?;
263                }
264
265                Ok(value.is_some())
266            }
267            None => Ok(false),
268        }
269    }
270
271    fn remove(&self, category: Option<DataCategory>, key: &[u8]) -> Result<()> {
272        if let Some(DBInfo { ref db }) = *self.db_info {
273            let key = key.to_vec();
274            match category {
275                Some(category) => {
276                    let col = get_column(db, category)?;
277                    db.delete_cf(col, key)?;
278                }
279                None => db.delete(key)?,
280            }
281        }
282
283        Ok(())
284    }
285
286    fn remove_batch(&self, category: Option<DataCategory>, keys: &[Vec<u8>]) -> Result<()> {
287        if let Some(DBInfo { ref db }) = *self.db_info {
288            let keys = keys.to_vec();
289            let mut batch = WriteBatch::default();
290
291            for key in keys {
292                match category.clone() {
293                    Some(category) => {
294                        let col = get_column(db, category)?;
295                        batch.delete_cf(col, key);
296                    }
297                    None => db.delete(key)?,
298                }
299            }
300            db.write(batch)?;
301        }
302
303        Ok(())
304    }
305
306    fn restore(&mut self, new_db: &str) -> Result<()> {
307        RocksDB::restore(self, new_db)
308    }
309
310    fn iterator(&self, category: Option<DataCategory>) -> Option<DBIterator> {
311        RocksDB::iterator(self, category)
312    }
313
314    fn close(&mut self) {
315        RocksDB::close(self)
316    }
317
318    fn flush(&self) -> Result<()> {
319        if let Some(DBInfo { ref db }) = *self.db_info {
320            db.flush()?;
321        }
322
323        Ok(())
324    }
325}
326
327// Get the column from the data category.
328fn get_column(db: &DB, category: DataCategory) -> Result<&ColumnFamily> {
329    db.cf_handle(map_columns(category))
330        .ok_or(DatabaseError::NotFound)
331}
332
333// Check the path exists.
334fn path_exists(path: &str) -> bool {
335    metadata(Path::new(path)).is_ok()
336}
337
338#[cfg(test)]
339mod tests {
340    use super::{Config, RocksDB};
341    use crate::database::{DataCategory, Database};
342    use crate::error::DatabaseError;
343    use crate::rocksdb::{path_exists, BACKUP_PATH};
344    use crate::test::{batch_op, insert_get_contains_remove};
345    use std::fs::{create_dir, remove_dir_all};
346
347    #[test]
348    fn test_insert_get_contains_remove_with_category() {
349        let cfg = Config::with_category_num(Some(1));
350        let mut db = RocksDB::open(
351            "rocksdb_test/get_insert_contains_remove_with_category",
352            &cfg,
353        )
354        .unwrap();
355
356        insert_get_contains_remove(&db, Some(DataCategory::State));
357        db.clean_cf();
358        db.clean_db();
359    }
360
361    #[test]
362    fn test_insert_get_contains_remove() {
363        let db = RocksDB::open_default("rocksdb_test/get_insert_contains_remove").unwrap();
364
365        insert_get_contains_remove(&db, None);
366        db.clean_db();
367    }
368
369    #[test]
370    fn test_batch_op_with_category() {
371        let cfg = Config::with_category_num(Some(1));
372        let mut db = RocksDB::open("rocksdb_test/batch_op_with_category", &cfg).unwrap();
373
374        batch_op(&db, Some(DataCategory::State));
375
376        db.clean_cf();
377        db.clean_db();
378    }
379
380    #[test]
381    fn test_batch_op() {
382        let db = RocksDB::open_default("rocksdb_test/batch_op").unwrap();
383
384        batch_op(&db, None);
385        db.clean_db();
386    }
387
388    #[test]
389    fn test_insert_batch_error_with_category() {
390        let cfg = Config::with_category_num(Some(1));
391        let mut db = RocksDB::open("rocksdb_test/insert_batch_error_with_category", &cfg).unwrap();
392
393        let data = b"test".to_vec();
394
395        match db.insert_batch(Some(DataCategory::State), vec![data], vec![]) {
396            Err(DatabaseError::InvalidData) => (), // pass
397            _ => panic!("should return error DatabaseError::InvalidData"),
398        }
399
400        db.clean_cf();
401        db.clean_db();
402    }
403
404    #[test]
405    fn test_insert_batch_error() {
406        let db = RocksDB::open_default("rocksdb_test/insert_batch_error").unwrap();
407
408        let data = b"test".to_vec();
409
410        match db.insert_batch(None, vec![data], vec![]) {
411            Err(DatabaseError::InvalidData) => (), // pass
412            _ => panic!("should return error DatabaseError::InvalidData"),
413        }
414        db.clean_db();
415    }
416
417    #[test]
418    fn test_iterator_with_category() {
419        let cfg = Config::with_category_num(Some(1));
420        let mut db = RocksDB::open("rocksdb_test/iterator_with_category", &cfg).unwrap();
421
422        let data1 = b"test1".to_vec();
423        let data2 = b"test2".to_vec();
424
425        db.insert_batch(
426            Some(DataCategory::State),
427            vec![data1.clone(), data2.clone()],
428            vec![data1.clone(), data2.clone()],
429        )
430        .expect("Insert data ok.");
431
432        let contents: Vec<_> = db
433            .iterator(Some(DataCategory::State))
434            .into_iter()
435            .flat_map(|inner| inner)
436            .collect();
437
438        assert_eq!(contents.len(), 2);
439        assert_eq!(&*contents[0].clone().unwrap().0, &*data1);
440        assert_eq!(&*contents[0].clone().unwrap().1, &*data1);
441        assert_eq!(&*contents[1].clone().unwrap().0, &*data2);
442        assert_eq!(&*contents[1].clone().unwrap().1, &*data2);
443
444        db.clean_cf();
445        db.clean_db();
446    }
447
448    #[test]
449    fn test_iterator() {
450        let db = RocksDB::open_default("rocksdb_test/iterator").unwrap();
451
452        let data1 = b"test1".to_vec();
453        let data2 = b"test2".to_vec();
454
455        db.insert_batch(
456            None,
457            vec![data1.clone(), data2.clone()],
458            vec![data1.clone(), data2.clone()],
459        )
460        .expect("Insert data ok.");
461
462        let contents: Vec<_> = db
463            .iterator(None)
464            .into_iter()
465            .flat_map(|inner| inner)
466            .collect();
467        assert_eq!(contents.len(), 2);
468        assert_eq!(&*contents[0].clone().unwrap().0, &*data1);
469        assert_eq!(&*contents[0].clone().unwrap().1, &*data1);
470        assert_eq!(&*contents[1].clone().unwrap().0, &*data2);
471        assert_eq!(&*contents[1].clone().unwrap().1, &*data2);
472        db.clean_db();
473    }
474
475    #[test]
476    fn test_close_with_category() {
477        let cfg = Config::with_category_num(Some(1));
478        let mut db = RocksDB::open("rocksdb_test/close_with_category", &cfg).unwrap();
479        let data = b"test".to_vec();
480        db.insert(Some(DataCategory::State), data.clone(), data.clone())
481            .unwrap();
482        assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(true));
483        // Can not open it again
484        match RocksDB::open_default("rocksdb_test/close_with_category") {
485            // "IO error: lock : rocksdb/test_close/LOCK: No locks available"
486            Err(DatabaseError::Internal(_)) => (), // pass
487            _ => panic!("should return error DatabaseError::Intrnal"),
488        }
489        db.close();
490        // Can not query
491        assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(false));
492
493        // Can open it again and query
494        let cfg = Config::with_category_num(Some(1));
495        let db = RocksDB::open("rocksdb_test/close_with_category", &cfg).unwrap();
496        assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(true));
497        db.clean_db();
498    }
499
500    #[test]
501    fn test_close() {
502        let mut db = RocksDB::open_default("rocksdb_test/close").unwrap();
503        let data = b"test".to_vec();
504        db.insert(None, data.clone(), data.clone()).unwrap();
505        assert_eq!(db.contains(None, &data), Ok(true));
506        // Can not open it again
507        match RocksDB::open_default("rocksdb_test/close") {
508            // "IO error: lock : rocksdb/test_close/LOCK: No locks available"
509            Err(DatabaseError::Internal(_)) => (), // pass
510            _ => panic!("should return error DatabaseError::Intrnal"),
511        }
512        db.close();
513        // Can not query
514        assert_eq!(db.contains(None, &data), Ok(false));
515
516        // Can open it again and query
517        let db = RocksDB::open_default("rocksdb_test/close").unwrap();
518        assert_eq!(db.contains(None, &data), Ok(true));
519        db.clean_db();
520    }
521
522    #[test]
523    fn test_restore() {
524        // No backup
525        if path_exists(BACKUP_PATH) {
526            remove_dir_all(BACKUP_PATH).unwrap();
527        }
528        let mut db = RocksDB::open_default("rocksdb_test/restore_backup").unwrap();
529        let new_path_with_backup = "rocksdb_test/restore_new_db_with_backup";
530        let new_db = RocksDB::open_default(new_path_with_backup).unwrap();
531        let data = b"test_no_backup".to_vec();
532        new_db.insert(None, data.clone(), data.clone()).unwrap();
533        assert_eq!(db.contains(None, &data), Ok(false));
534        assert_eq!(db.restore(new_path_with_backup), Ok(()));
535        assert_eq!(db.contains(None, &data), Ok(true));
536        // Clean the data
537        db.clean_db();
538        new_db.clean_db();
539
540        // Backup
541        let new_path_no_backup = "rocksdb_test/restore_new_db_no_backup";
542        let mut db = RocksDB::open_default("rocksdb_test/restore_no_backup").unwrap();
543        if !path_exists(BACKUP_PATH) {
544            create_dir(BACKUP_PATH).unwrap();
545        }
546
547        match db.restore(new_path_no_backup) {
548            //`Err(Internal("Directory not empty (os error 39)"))`
549            Err(DatabaseError::Internal(_)) => (), // pass
550            _ => panic!("should return error DatabaseError::Intrnal"),
551        }
552
553        // Clean the data
554        remove_dir_all(BACKUP_PATH).unwrap();
555        db.clean_db();
556    }
557}