db/storage/kvs/rocksdb/
mod.rs1pub mod tx;
2pub mod ty;
3
4use async_trait::async_trait;
5pub use tx::*;
6pub use ty::*;
7
8use crate::{
9 constant::CF_NAMES,
10 err::Error,
11 model::{DBTransaction, DatastoreAdapter, StorageAdapter, StorageAdapterName},
12 util::generate_path,
13 StorageVariant,
14};
15use rocksdb::{DBCompactionStyle, OptimisticTransactionDB, Options};
16
17#[derive(Debug)]
18pub struct RocksDBAdapter(StorageAdapter<DBType>);
19
20#[cfg(feature = "test-suite")]
21crate::full_adapter_test_impl!(RocksDBAdapter::default());
22
23impl RocksDBAdapter {
24 impl_new_type_adapter!(DBType);
25
26 pub fn new(path: &str, max_open_files: Option<i32>) -> Result<RocksDBAdapter, Error> {
27 let path = &path["rocksdb:".len()..];
28 let opts = get_options(max_open_files);
29 let cf_names = CF_NAMES.iter();
30 let db_instance = OptimisticTransactionDB::open_cf(&opts, path, cf_names)?;
31 Ok(RocksDBAdapter(StorageAdapter::<DBType>::new(
32 StorageAdapterName::RocksDB,
33 path.to_string(),
34 db_instance,
35 StorageVariant::KeyValueStore,
36 )?))
37 }
38}
39
40#[async_trait]
41impl DatastoreAdapter for RocksDBAdapter {
42 type Transaction = RocksDBTransaction;
43
44 fn default() -> Self {
45 let path = &generate_path("rocksdb", None);
46 RocksDBAdapter::new(path, None).unwrap()
47 }
48
49 fn spawn(&self) -> Self {
50 RocksDBAdapter::default()
51 }
52
53 fn path(&self) -> &str {
54 &self.0.path
55 }
56
57 async fn transaction(&self, rw: bool) -> Result<RocksDBTransaction, Error> {
58 let inner = self.get_initialized_inner().unwrap();
59 let db = &inner.db_instance;
60 let tx = db.transaction();
61
62 let tx = unsafe { extend_tx_lifetime(tx) };
63
64 Ok(DBTransaction::<DBType, TxType>::new(tx, db.clone(), rw).unwrap())
65 }
66}
67
68unsafe fn extend_tx_lifetime(
72 tx: rocksdb::Transaction<'_, OptimisticTransactionDB>,
73) -> rocksdb::Transaction<'static, OptimisticTransactionDB> {
74 std::mem::transmute::<
75 rocksdb::Transaction<'_, OptimisticTransactionDB>,
76 rocksdb::Transaction<'static, OptimisticTransactionDB>,
77 >(tx)
78}
79
80pub fn get_options(max_open_files: Option<i32>) -> Options {
81 let mut opts = Options::default();
85 opts.create_if_missing(true);
86
87 opts.create_missing_column_families(true);
89 opts.set_compaction_style(DBCompactionStyle::Level);
90 opts.set_write_buffer_size(67_108_864); opts.set_max_write_buffer_number(3);
92 opts.set_target_file_size_base(67_108_864); opts.set_level_zero_file_num_compaction_trigger(8);
94 opts.set_level_zero_slowdown_writes_trigger(17);
95 opts.set_level_zero_stop_writes_trigger(24);
96 opts.set_num_levels(4);
97 opts.set_max_bytes_for_level_base(536_870_912); opts.set_max_bytes_for_level_multiplier(8.0);
99
100 if let Some(max_open_files) = max_open_files {
101 opts.set_max_open_files(max_open_files);
102 }
103
104 opts
105}