1use crate::snapshot::RocksDBSnapshot;
3use crate::transaction::RocksDBTransaction;
4use crate::write_batch::RocksDBWriteBatch;
5use crate::{Result, internal_error};
6use ckb_app_config::DBConfig;
7use ckb_db_schema::Col;
8use ckb_logger::info;
9use rocksdb::ops::{
10 CompactRangeCF, CreateCF, DropCF, GetColumnFamilys, GetPinned, GetPinnedCF, GetPropertyCF,
11 IterateCF, OpenCF, Put, SetOptions, WriteOps,
12};
13use rocksdb::{
14 BlockBasedIndexType, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor,
15 DBPinnableSlice, FullOptions, IteratorMode, OptimisticTransactionDB,
16 OptimisticTransactionOptions, Options, SliceTransform, WriteBatch, WriteOptions, ffi,
17};
18use std::path::Path;
19use std::sync::Arc;
20
/// Name of the RocksDB integer property read by `RocksDB::estimate_num_keys_cf`.
const PROPERTY_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
22
/// Handle to an optimistic-transaction RocksDB database.
///
/// The underlying database is held behind an [`Arc`], so cloning this handle
/// only bumps a reference count and every clone refers to the same database.
#[derive(Clone)]
pub struct RocksDB {
    /// Shared reference to the underlying database.
    pub(crate) inner: Arc<OptimisticTransactionDB>,
}
30
/// Block cache capacity in bytes (256 MiB) used when an options file is
/// configured but no explicit cache size is given.
const DEFAULT_CACHE_SIZE: usize = 256 * 1024 * 1024;
/// Second argument handed to `Cache::new_hyper_clock_cache` (4 KiB).
/// NOTE(review): presumably the estimated per-entry charge in bytes — confirm
/// against the rocksdb crate documentation.
const DEFAULT_CACHE_ENTRY_CHARGE_SIZE: usize = 4 * 1024;
33
34impl RocksDB {
35 pub(crate) fn open_with_check(config: &DBConfig, columns: u32) -> Result<Self> {
36 let cf_names: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
37 let mut cache = None;
38
39 let (mut opts, mut cf_descriptors) = if let Some(ref file) = config.options_file {
40 cache = match config.cache_size {
41 Some(0) => None,
42 Some(size) => Some(Cache::new_hyper_clock_cache(
43 size,
44 DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
45 )),
46 None => Some(Cache::new_hyper_clock_cache(
47 DEFAULT_CACHE_SIZE,
48 DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
49 )),
50 };
51
52 let mut full_opts = FullOptions::load_from_file_with_cache(file, cache.clone(), false)
53 .map_err(|err| internal_error(format!("failed to load the options file: {err}")))?;
54 let cf_names_str: Vec<&str> = cf_names.iter().map(|s| s.as_str()).collect();
55 full_opts
56 .complete_column_families(&cf_names_str, false)
57 .map_err(|err| {
58 internal_error(format!("failed to check all column families: {err}"))
59 })?;
60 let FullOptions {
61 db_opts,
62 cf_descriptors,
63 } = full_opts;
64 (db_opts, cf_descriptors)
65 } else {
66 let opts = Options::default();
67 let cf_descriptors: Vec<_> = cf_names
68 .iter()
69 .map(|c| ColumnFamilyDescriptor::new(c, Options::default()))
70 .collect();
71 (opts, cf_descriptors)
72 };
73
74 for cf in cf_descriptors.iter_mut() {
75 let mut block_opts = BlockBasedOptions::default();
76 block_opts.set_ribbon_filter(10.0);
77 block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
78 block_opts.set_partition_filters(true);
79 block_opts.set_metadata_block_size(4096);
80 block_opts.set_pin_top_level_index_and_filter(true);
81 match cache {
82 Some(ref cache) => {
83 block_opts.set_block_cache(cache);
84 block_opts.set_cache_index_and_filter_blocks(true);
85 block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
86 }
87 None => block_opts.disable_cache(),
88 }
89 if cf.name() == "2" {
91 block_opts.set_whole_key_filtering(false);
92 cf.options
93 .set_prefix_extractor(SliceTransform::create_fixed_prefix(32));
94 }
95 cf.options.set_block_based_table_factory(&block_opts);
96 }
97
98 opts.create_if_missing(true);
99 opts.create_missing_column_families(true);
100 opts.enable_statistics();
101
102 let db = OptimisticTransactionDB::open_cf_descriptors(&opts, &config.path, cf_descriptors)
103 .map_err(|err| internal_error(format!("failed to open database: {err}")))?;
104
105 if !config.options.is_empty() {
106 let rocksdb_options: Vec<(&str, &str)> = config
107 .options
108 .iter()
109 .map(|(k, v)| (k.as_str(), v.as_str()))
110 .collect();
111 db.set_options(&rocksdb_options)
112 .map_err(|_| internal_error("failed to set database option"))?;
113 }
114
115 Ok(RocksDB {
116 inner: Arc::new(db),
117 })
118 }
119
120 pub fn open(config: &DBConfig, columns: u32) -> Self {
122 Self::open_with_check(config, columns).unwrap_or_else(|err| panic!("{err}"))
123 }
124
125 pub fn open_in<P: AsRef<Path>>(path: P, columns: u32) -> Self {
127 let config = DBConfig {
128 path: path.as_ref().to_path_buf(),
129 ..Default::default()
130 };
131 Self::open_with_check(&config, columns).unwrap_or_else(|err| panic!("{err}"))
132 }
133
134 pub fn prepare_for_bulk_load_open<P: AsRef<Path>>(
136 path: P,
137 columns: u32,
138 ) -> Result<Option<Self>> {
139 let mut opts = Options::default();
140
141 opts.create_missing_column_families(true);
142 opts.set_prepare_for_bulk_load();
143
144 let cfnames: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
145 let cf_options: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
146
147 OptimisticTransactionDB::open_cf(&opts, path, cf_options).map_or_else(
148 |err| {
149 let err_str = err.as_ref();
150 if err_str.starts_with("Invalid argument:")
151 && err_str.ends_with("does not exist (create_if_missing is false)")
152 {
153 Ok(None)
154 } else if err_str.starts_with("Corruption:") {
155 info!("DB corrupted: {err_str}.");
156 Err(internal_error(err_str))
157 } else {
158 Err(internal_error(format!(
159 "failed to open the database: {err}"
160 )))
161 }
162 },
163 |db| {
164 Ok(Some(RocksDB {
165 inner: Arc::new(db),
166 }))
167 },
168 )
169 }
170
171 pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> {
174 let cf = cf_handle(&self.inner, col)?;
175 self.inner.get_pinned_cf(cf, key).map_err(internal_error)
176 }
177
178 pub fn get_pinned_default(&self, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> {
181 self.inner.get_pinned(key).map_err(internal_error)
182 }
183
184 pub fn put_default<K, V>(&self, key: K, value: V) -> Result<()>
186 where
187 K: AsRef<[u8]>,
188 V: AsRef<[u8]>,
189 {
190 self.inner.put(key, value).map_err(internal_error)
191 }
192
193 pub fn full_traverse<F>(&self, col: Col, callback: &mut F) -> Result<()>
195 where
196 F: FnMut(&[u8], &[u8]) -> Result<()>,
197 {
198 let cf = cf_handle(&self.inner, col)?;
199 let iter = self
200 .inner
201 .full_iterator_cf(cf, IteratorMode::Start)
202 .map_err(internal_error)?;
203 for (key, val) in iter {
204 callback(&key, &val)?;
205 }
206 Ok(())
207 }
208
209 pub fn traverse<F>(
211 &self,
212 col: Col,
213 callback: &mut F,
214 mode: IteratorMode,
215 limit: usize,
216 ) -> Result<(usize, Vec<u8>)>
217 where
218 F: FnMut(&[u8], &[u8]) -> Result<()>,
219 {
220 let mut count: usize = 0;
221 let mut next_key: Vec<u8> = vec![];
222 let cf = cf_handle(&self.inner, col)?;
223 let iter = self
224 .inner
225 .full_iterator_cf(cf, mode)
226 .map_err(internal_error)?;
227 for (key, val) in iter {
228 if count > limit {
229 next_key = key.to_vec();
230 break;
231 }
232
233 callback(&key, &val)?;
234 count += 1;
235 }
236 Ok((count, next_key))
237 }
238
239 pub fn transaction(&self) -> RocksDBTransaction {
241 let write_options = WriteOptions::default();
242 let mut transaction_options = OptimisticTransactionOptions::new();
243 transaction_options.set_snapshot(true);
244
245 RocksDBTransaction {
246 db: Arc::clone(&self.inner),
247 inner: self.inner.transaction(&write_options, &transaction_options),
248 }
249 }
250
251 pub fn new_write_batch(&self) -> RocksDBWriteBatch {
253 RocksDBWriteBatch {
254 db: Arc::clone(&self.inner),
255 inner: WriteBatch::default(),
256 }
257 }
258
259 pub fn write(&self, batch: &RocksDBWriteBatch) -> Result<()> {
261 self.inner.write(&batch.inner).map_err(internal_error)
262 }
263
264 pub fn write_sync(&self, batch: &RocksDBWriteBatch) -> Result<()> {
282 let mut wo = WriteOptions::new();
283 wo.set_sync(true);
284 self.inner
285 .write_opt(&batch.inner, &wo)
286 .map_err(internal_error)
287 }
288
289 pub fn compact_range(&self, col: Col, start: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> {
302 let cf = cf_handle(&self.inner, col)?;
303 self.inner.compact_range_cf(cf, start, end);
304 Ok(())
305 }
306
307 pub fn get_snapshot(&self) -> RocksDBSnapshot {
309 unsafe {
310 let snapshot = ffi::rocksdb_create_snapshot(self.inner.base_db_ptr());
311 RocksDBSnapshot::new(&self.inner, snapshot)
312 }
313 }
314
315 pub fn inner(&self) -> Arc<OptimisticTransactionDB> {
317 Arc::clone(&self.inner)
318 }
319
320 pub fn create_cf(&mut self, col: Col) -> Result<()> {
322 let inner = Arc::get_mut(&mut self.inner)
323 .ok_or_else(|| internal_error("create_cf get_mut failed"))?;
324 let opts = Options::default();
325 inner.create_cf(col, &opts).map_err(internal_error)
326 }
327
328 pub fn drop_cf(&mut self, col: Col) -> Result<()> {
330 let inner = Arc::get_mut(&mut self.inner)
331 .ok_or_else(|| internal_error("drop_cf get_mut failed"))?;
332 inner.drop_cf(col).map_err(internal_error)
333 }
334
335 pub fn estimate_num_keys_cf(&self, col: Col) -> Result<Option<u64>> {
338 let cf = cf_handle(&self.inner, col)?;
339 self.inner
340 .property_int_value_cf(cf, PROPERTY_NUM_KEYS)
341 .map_err(internal_error)
342 }
343}
344
345#[inline]
346pub(crate) fn cf_handle(db: &OptimisticTransactionDB, col: Col) -> Result<&ColumnFamily> {
347 db.cf_handle(col)
348 .ok_or_else(|| internal_error(format!("column {col} not found")))
349}