db/storage/kvs/rocksdb/
mod.rs

1pub mod tx;
2pub mod ty;
3
4use async_trait::async_trait;
5pub use tx::*;
6pub use ty::*;
7
8use crate::{
9	constant::CF_NAMES,
10	err::Error,
11	model::{DBTransaction, DatastoreAdapter, StorageAdapter, StorageAdapterName},
12	util::generate_path,
13	StorageVariant,
14};
15use rocksdb::{DBCompactionStyle, OptimisticTransactionDB, Options};
16
17#[derive(Debug)]
18pub struct RocksDBAdapter(StorageAdapter<DBType>);
19
20#[cfg(feature = "test-suite")]
21crate::full_adapter_test_impl!(RocksDBAdapter::default());
22
23impl RocksDBAdapter {
24	impl_new_type_adapter!(DBType);
25
26	pub fn new(path: &str, max_open_files: Option<i32>) -> Result<RocksDBAdapter, Error> {
27		let path = &path["rocksdb:".len()..];
28		let opts = get_options(max_open_files);
29		let cf_names = CF_NAMES.iter();
30		let db_instance = OptimisticTransactionDB::open_cf(&opts, path, cf_names)?;
31		Ok(RocksDBAdapter(StorageAdapter::<DBType>::new(
32			StorageAdapterName::RocksDB,
33			path.to_string(),
34			db_instance,
35			StorageVariant::KeyValueStore,
36		)?))
37	}
38}
39
40#[async_trait]
41impl DatastoreAdapter for RocksDBAdapter {
42	type Transaction = RocksDBTransaction;
43
44	fn default() -> Self {
45		let path = &generate_path("rocksdb", None);
46		RocksDBAdapter::new(path, None).unwrap()
47	}
48
49	fn spawn(&self) -> Self {
50		RocksDBAdapter::default()
51	}
52
53	fn path(&self) -> &str {
54		&self.0.path
55	}
56
57	async fn transaction(&self, rw: bool) -> Result<RocksDBTransaction, Error> {
58		let inner = self.get_initialized_inner().unwrap();
59		let db = &inner.db_instance;
60		let tx = db.transaction();
61
62		let tx = unsafe { extend_tx_lifetime(tx) };
63
64		Ok(DBTransaction::<DBType, TxType>::new(tx, db.clone(), rw).unwrap())
65	}
66}
67
68// The database reference must always outlive the transaction. If it doesn't then this
69// is undefined behavior. This unsafe block ensures that the transaction reference is
70// static, but will cause a crash if the datastore is dropped prematurely.
71unsafe fn extend_tx_lifetime(
72	tx: rocksdb::Transaction<'_, OptimisticTransactionDB>,
73) -> rocksdb::Transaction<'static, OptimisticTransactionDB> {
74	std::mem::transmute::<
75		rocksdb::Transaction<'_, OptimisticTransactionDB>,
76		rocksdb::Transaction<'static, OptimisticTransactionDB>,
77	>(tx)
78}
79
80pub fn get_options(max_open_files: Option<i32>) -> Options {
81	// Current tuning based off of the total ordered example, flash
82	// storage example on
83	// https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
84	let mut opts = Options::default();
85	opts.create_if_missing(true);
86
87	// If there is no column missing, create a new column family
88	opts.create_missing_column_families(true);
89	opts.set_compaction_style(DBCompactionStyle::Level);
90	opts.set_write_buffer_size(67_108_864); // 64mb
91	opts.set_max_write_buffer_number(3);
92	opts.set_target_file_size_base(67_108_864); // 64mb
93	opts.set_level_zero_file_num_compaction_trigger(8);
94	opts.set_level_zero_slowdown_writes_trigger(17);
95	opts.set_level_zero_stop_writes_trigger(24);
96	opts.set_num_levels(4);
97	opts.set_max_bytes_for_level_base(536_870_912); // 512mb
98	opts.set_max_bytes_for_level_multiplier(8.0);
99
100	if let Some(max_open_files) = max_open_files {
101		opts.set_max_open_files(max_open_files);
102	}
103
104	opts
105}