use std::default::Default;
use std::path::Path;
use std::sync::Arc;

use crate::columns::map_columns;
use crate::config::{Config, BACKGROUND_FLUSHES, WRITE_BUFFER_SIZE};
use crate::database::{DataCategory, Database, Result};
use crate::error::DatabaseError;
use rocksdb::{
    BlockBasedOptions, ColumnFamily, DBCompactionStyle, DBIterator, IteratorMode, Options,
    ReadOptions, WriteBatch, WriteOptions, DB,
};
use std::fs::{metadata, remove_dir_all, rename};

const BACKUP_PATH: &str = "backup_old_db";

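/// Wrapper around the raw `DB` handle so it can be dropped (closed) and
/// replaced while the surrounding `RocksDB` value stays alive.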
#[derive(Debug)]
struct DBInfo {
    db: DB,
}

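/// RocksDB-backed implementation of the `Database` trait.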
pub struct RocksDB {
    db_info: Arc<Option<DBInfo>>,
    pub config: Config,
    pub write_opts: WriteOptions,
    path: String,
}

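// Send/Sync are asserted manually here, presumably because the wrapped FFI
// handles (e.g. `WriteOptions`) are not auto-derived as thread-safe.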
unsafe impl Sync for RocksDB {}
unsafe impl Send for RocksDB {}

impl RocksDB {
    pub fn open_default(path: &str) -> Result<Self> {
        Self::open(path, &Config::default())
    }

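    /// Opens (or creates) a database at `path` with the given `Config`.
    ///
    /// When `config.category_num` is `Some(n)`, column families `col0`..`col{n-1}`
    /// are opened (and created if missing); otherwise only the default column
    /// family is used.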
    pub fn open(path: &str, config: &Config) -> Result<Self> {
        let mut opts = Options::default();
        opts.set_write_buffer_size(WRITE_BUFFER_SIZE);
        opts.set_max_background_jobs(BACKGROUND_FLUSHES);

        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        let block_opts = BlockBasedOptions::default();
        opts.set_block_based_table_factory(&block_opts);

        opts.set_max_open_files(config.max_open_files);
        opts.set_use_fsync(false);
        opts.set_compaction_style(DBCompactionStyle::Level);
        opts.set_target_file_size_base(config.compaction.target_file_size_base);
        if let Some(level_multiplier) = config.compaction.max_bytes_for_level_multiplier {
            opts.set_max_bytes_for_level_multiplier(level_multiplier);
        }
        if let Some(compactions) = config.compaction.max_background_compactions {
            opts.set_max_background_jobs(compactions);
        }

        let mut write_opts = WriteOptions::default();
        if !config.wal {
            write_opts.disable_wal(true);
        }

        let columns: Vec<_> = (0..config.category_num.unwrap_or(0))
            .map(|c| format!("col{}", c))
            .collect();
        let columns: Vec<&str> = columns.iter().map(|n| n as &str).collect();
        debug!("[database] Columns: {:?}", columns);

        let db = match config.category_num {
            Some(_) => DB::open_cf(&opts, path, columns.iter())
                .map_err(|e| DatabaseError::Internal(e.to_string()))?,
            None => DB::open(&opts, path).map_err(|e| DatabaseError::Internal(e.to_string()))?,
        };

        Ok(RocksDB {
            db_info: Arc::new(Some(DBInfo { db })),
            write_opts,
            config: config.clone(),
            path: path.to_owned(),
        })
    }

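    /// Drops the underlying `DB` handle, releasing the database lock so the
    /// directory can be reopened or replaced.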
    pub fn close(&mut self) {
        let new_db = Arc::new(None);
        *Arc::get_mut(&mut self.db_info).unwrap() = Arc::try_unwrap(new_db).unwrap();
    }

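    /// Replaces the current database with the one at `new_db_path`.
    ///
    /// The current directory is first moved to `BACKUP_PATH` (skipped if a
    /// backup directory already exists). If the swap fails, the backup is
    /// moved back and an error is returned; otherwise the backup is removed
    /// and the new database is reopened in place.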
    pub fn restore(&mut self, new_db_path: &str) -> Result<()> {
        self.close();

        let backup = !path_exists(BACKUP_PATH);

        if backup {
            rename(&self.path, &BACKUP_PATH)?;
        }

        match rename(&new_db_path, &self.path) {
            Ok(_) => {
                if backup {
                    remove_dir_all(&BACKUP_PATH)?;
                }
            }
            Err(e) => {
                if backup {
                    rename(&BACKUP_PATH, &self.path)?;
                }
                return Err(DatabaseError::Internal(e.to_string()));
            }
        }

        let new_db = Self::open(&self.path, &self.config)?.db_info;
        *Arc::get_mut(&mut self.db_info).unwrap() = Arc::try_unwrap(new_db).unwrap();
        Ok(())
    }

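    /// Returns an iterator over the given column family (or the default
    /// column family when `category` is `None`). Returns `None` if the
    /// database has been closed.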
    pub fn iterator(&self, category: Option<DataCategory>) -> Option<DBIterator> {
        match *self.db_info {
            Some(DBInfo { ref db }) => {
                let iter = {
                    if let Some(col) = category {
                        db.iterator_cf_opt(
                            get_column(db, col).unwrap(),
                            ReadOptions::default(),
                            IteratorMode::Start,
                        )
                    } else {
                        db.iterator_opt(IteratorMode::Start, ReadOptions::default())
                    }
                };
                Some(iter)
            }
            None => None,
        }
    }

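    /// Test helper: drops every configured column family.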
    #[cfg(test)]
    fn clean_cf(&mut self) {
        let columns: Vec<_> = (0..self.config.category_num.unwrap_or(0))
            .map(|c| format!("col{}", c))
            .collect();
        let columns: Vec<&str> = columns.iter().map(|n| n as &str).collect();
        if let Some(DBInfo { db }) = Arc::get_mut(&mut self.db_info).unwrap() {
            for col in columns.iter() {
                db.drop_cf(col).unwrap();
            }
        }
    }

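    /// Test helper: removes the on-disk database directory.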
    #[cfg(test)]
    fn clean_db(&self) {
        if path_exists(&self.path) {
            remove_dir_all(&self.path).unwrap();
        }
    }
}

impl Database for RocksDB {
    fn get(&self, category: Option<DataCategory>, key: &[u8]) -> Result<Option<Vec<u8>>> {
        match *self.db_info {
            Some(DBInfo { ref db }) => {
                let key = key.to_vec();

                let mut value = db.get(&key)?;
                if let Some(category) = category {
                    let col = get_column(db, category)?;
                    value = db.get_cf(col, &key)?;
                }
                Ok(value.map(|v| v.to_vec()))
            }
            None => Ok(None),
        }
    }

    fn get_batch(
        &self,
        category: Option<DataCategory>,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>> {
        let mut values = Vec::with_capacity(keys.len());
        if let Some(DBInfo { ref db }) = *self.db_info {
            let keys = keys.to_vec();

            for key in keys {
                let mut value = db.get(&key)?;
                if let Some(category) = category.clone() {
                    let col = get_column(db, category)?;
                    value = db.get_cf(col, &key)?;
                }
                values.push(value.map(|v| v.to_vec()));
            }
        }

        Ok(values)
    }

    fn insert(&self, category: Option<DataCategory>, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        if let Some(DBInfo { ref db }) = *self.db_info {
            match category {
                Some(category) => {
                    let col = get_column(db, category)?;
                    db.put_cf(col, key, value)?;
                }
                None => db.put(key, value)?,
            }
        }

        Ok(())
    }

    fn insert_batch(
        &self,
        category: Option<DataCategory>,
        keys: Vec<Vec<u8>>,
        values: Vec<Vec<u8>>,
    ) -> Result<()> {
        if keys.len() != values.len() {
            return Err(DatabaseError::InvalidData);
        }

        if let Some(DBInfo { ref db }) = *self.db_info {
            let mut batch = WriteBatch::default();

            for i in 0..keys.len() {
                match category.clone() {
                    Some(category) => {
                        let col = get_column(db, category)?;
                        batch.put_cf(col, &keys[i], &values[i]);
                    }
                    None => batch.put(&keys[i], &values[i]),
                }
            }
            db.write(batch)?;
        }

        Ok(())
    }

    fn contains(&self, category: Option<DataCategory>, key: &[u8]) -> Result<bool> {
        match *self.db_info {
            Some(DBInfo { ref db }) => {
                let key = key.to_vec();
                let mut value = db.get(&key)?;
                if let Some(category) = category {
                    let col = get_column(db, category)?;
                    value = db.get_cf(col, &key)?;
                }

                Ok(value.is_some())
            }
            None => Ok(false),
        }
    }

    fn remove(&self, category: Option<DataCategory>, key: &[u8]) -> Result<()> {
        if let Some(DBInfo { ref db }) = *self.db_info {
            let key = key.to_vec();
            match category {
                Some(category) => {
                    let col = get_column(db, category)?;
                    db.delete_cf(col, key)?;
                }
                None => db.delete(key)?,
            }
        }

        Ok(())
    }

    fn remove_batch(&self, category: Option<DataCategory>, keys: &[Vec<u8>]) -> Result<()> {
        if let Some(DBInfo { ref db }) = *self.db_info {
            let keys = keys.to_vec();
            let mut batch = WriteBatch::default();

            for key in keys {
                match category.clone() {
                    Some(category) => {
                        let col = get_column(db, category)?;
                        batch.delete_cf(col, key);
                    }
                    // Route the default-column case through the batch as well,
                    // so all deletes are applied in the single write below.
                    None => batch.delete(key),
                }
            }
            db.write(batch)?;
        }

        Ok(())
    }

    fn restore(&mut self, new_db: &str) -> Result<()> {
        RocksDB::restore(self, new_db)
    }

    fn iterator(&self, category: Option<DataCategory>) -> Option<DBIterator> {
        RocksDB::iterator(self, category)
    }

    fn close(&mut self) {
        RocksDB::close(self)
    }

    fn flush(&self) -> Result<()> {
        if let Some(DBInfo { ref db }) = *self.db_info {
            db.flush()?;
        }

        Ok(())
    }
}

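/// Looks up the column family handle for `category`, returning
/// `DatabaseError::NotFound` if the handle is missing.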
fn get_column(db: &DB, category: DataCategory) -> Result<&ColumnFamily> {
    db.cf_handle(map_columns(category))
        .ok_or(DatabaseError::NotFound)
}

fn path_exists(path: &str) -> bool {
    metadata(Path::new(path)).is_ok()
}

#[cfg(test)]
mod tests {
    use super::{Config, RocksDB};
    use crate::database::{DataCategory, Database};
    use crate::error::DatabaseError;
    use crate::rocksdb::{path_exists, BACKUP_PATH};
    use crate::test::{batch_op, insert_get_contains_remove};
    use std::fs::{create_dir, remove_dir_all};

    #[test]
    fn test_insert_get_contains_remove_with_category() {
        let cfg = Config::with_category_num(Some(1));
        let mut db = RocksDB::open(
            "rocksdb_test/get_insert_contains_remove_with_category",
            &cfg,
        )
        .unwrap();

        insert_get_contains_remove(&db, Some(DataCategory::State));
        db.clean_cf();
        db.clean_db();
    }

    #[test]
    fn test_insert_get_contains_remove() {
        let db = RocksDB::open_default("rocksdb_test/get_insert_contains_remove").unwrap();

        insert_get_contains_remove(&db, None);
        db.clean_db();
    }

    #[test]
    fn test_batch_op_with_category() {
        let cfg = Config::with_category_num(Some(1));
        let mut db = RocksDB::open("rocksdb_test/batch_op_with_category", &cfg).unwrap();

        batch_op(&db, Some(DataCategory::State));

        db.clean_cf();
        db.clean_db();
    }

    #[test]
    fn test_batch_op() {
        let db = RocksDB::open_default("rocksdb_test/batch_op").unwrap();

        batch_op(&db, None);
        db.clean_db();
    }

    #[test]
    fn test_insert_batch_error_with_category() {
        let cfg = Config::with_category_num(Some(1));
        let mut db = RocksDB::open("rocksdb_test/insert_batch_error_with_category", &cfg).unwrap();

        let data = b"test".to_vec();

        match db.insert_batch(Some(DataCategory::State), vec![data], vec![]) {
            Err(DatabaseError::InvalidData) => (),
            _ => panic!("should return error DatabaseError::InvalidData"),
        }

        db.clean_cf();
        db.clean_db();
    }

    #[test]
    fn test_insert_batch_error() {
        let db = RocksDB::open_default("rocksdb_test/insert_batch_error").unwrap();

        let data = b"test".to_vec();

        match db.insert_batch(None, vec![data], vec![]) {
            Err(DatabaseError::InvalidData) => (),
            _ => panic!("should return error DatabaseError::InvalidData"),
        }
        db.clean_db();
    }

    #[test]
    fn test_iterator_with_category() {
        let cfg = Config::with_category_num(Some(1));
        let mut db = RocksDB::open("rocksdb_test/iterator_with_category", &cfg).unwrap();

        let data1 = b"test1".to_vec();
        let data2 = b"test2".to_vec();

        db.insert_batch(
            Some(DataCategory::State),
            vec![data1.clone(), data2.clone()],
            vec![data1.clone(), data2.clone()],
        )
        .expect("Insert data ok.");

        let contents: Vec<_> = db
            .iterator(Some(DataCategory::State))
            .into_iter()
            .flat_map(|inner| inner)
            .collect();

        assert_eq!(contents.len(), 2);
        assert_eq!(&*contents[0].clone().unwrap().0, &*data1);
        assert_eq!(&*contents[0].clone().unwrap().1, &*data1);
        assert_eq!(&*contents[1].clone().unwrap().0, &*data2);
        assert_eq!(&*contents[1].clone().unwrap().1, &*data2);

        db.clean_cf();
        db.clean_db();
    }

    #[test]
    fn test_iterator() {
        let db = RocksDB::open_default("rocksdb_test/iterator").unwrap();

        let data1 = b"test1".to_vec();
        let data2 = b"test2".to_vec();

        db.insert_batch(
            None,
            vec![data1.clone(), data2.clone()],
            vec![data1.clone(), data2.clone()],
        )
        .expect("Insert data ok.");

        let contents: Vec<_> = db
            .iterator(None)
            .into_iter()
            .flat_map(|inner| inner)
            .collect();
        assert_eq!(contents.len(), 2);
        assert_eq!(&*contents[0].clone().unwrap().0, &*data1);
        assert_eq!(&*contents[0].clone().unwrap().1, &*data1);
        assert_eq!(&*contents[1].clone().unwrap().0, &*data2);
        assert_eq!(&*contents[1].clone().unwrap().1, &*data2);
        db.clean_db();
    }

    #[test]
    fn test_close_with_category() {
        let cfg = Config::with_category_num(Some(1));
        let mut db = RocksDB::open("rocksdb_test/close_with_category", &cfg).unwrap();
        let data = b"test".to_vec();
        db.insert(Some(DataCategory::State), data.clone(), data.clone())
            .unwrap();
        assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(true));
        match RocksDB::open_default("rocksdb_test/close_with_category") {
            Err(DatabaseError::Internal(_)) => (),
            _ => panic!("should return error DatabaseError::Internal"),
        }
        db.close();
        assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(false));

        let cfg = Config::with_category_num(Some(1));
        let db = RocksDB::open("rocksdb_test/close_with_category", &cfg).unwrap();
        assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(true));
        db.clean_db();
    }

    #[test]
    fn test_close() {
        let mut db = RocksDB::open_default("rocksdb_test/close").unwrap();
        let data = b"test".to_vec();
        db.insert(None, data.clone(), data.clone()).unwrap();
        assert_eq!(db.contains(None, &data), Ok(true));
        match RocksDB::open_default("rocksdb_test/close") {
            Err(DatabaseError::Internal(_)) => (),
            _ => panic!("should return error DatabaseError::Internal"),
        }
        db.close();
        assert_eq!(db.contains(None, &data), Ok(false));

        let db = RocksDB::open_default("rocksdb_test/close").unwrap();
        assert_eq!(db.contains(None, &data), Ok(true));
        db.clean_db();
    }

    #[test]
    fn test_restore() {
        if path_exists(BACKUP_PATH) {
            remove_dir_all(BACKUP_PATH).unwrap();
        }
        let mut db = RocksDB::open_default("rocksdb_test/restore_backup").unwrap();
        let new_path_with_backup = "rocksdb_test/restore_new_db_with_backup";
        let new_db = RocksDB::open_default(new_path_with_backup).unwrap();
        let data = b"test_no_backup".to_vec();
        new_db.insert(None, data.clone(), data.clone()).unwrap();
        assert_eq!(db.contains(None, &data), Ok(false));
        assert_eq!(db.restore(new_path_with_backup), Ok(()));
        assert_eq!(db.contains(None, &data), Ok(true));
        db.clean_db();
        new_db.clean_db();

        let new_path_no_backup = "rocksdb_test/restore_new_db_no_backup";
        let mut db = RocksDB::open_default("rocksdb_test/restore_no_backup").unwrap();
        if !path_exists(BACKUP_PATH) {
            create_dir(BACKUP_PATH).unwrap();
        }

        match db.restore(new_path_no_backup) {
            Err(DatabaseError::Internal(_)) => (),
            _ => panic!("should return error DatabaseError::Internal"),
        }

        remove_dir_all(BACKUP_PATH).unwrap();
        db.clean_db();
    }
}