1use crate::snapshot::RocksDBSnapshot;
3use crate::transaction::RocksDBTransaction;
4use crate::write_batch::RocksDBWriteBatch;
5use crate::{Result, internal_error};
6use ckb_app_config::DBConfig;
7use ckb_db_schema::Col;
8use ckb_logger::info;
9use rocksdb::ops::{
10 CompactRangeCF, CreateCF, DropCF, GetColumnFamilys, GetPinned, GetPinnedCF, IterateCF, OpenCF,
11 Put, SetOptions, WriteOps,
12};
13use rocksdb::{
14 BlockBasedIndexType, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor,
15 DBPinnableSlice, FullOptions, IteratorMode, OptimisticTransactionDB,
16 OptimisticTransactionOptions, Options, SliceTransform, WriteBatch, WriteOptions, ffi,
17};
18use std::path::Path;
19use std::sync::Arc;
20
/// Handle to a RocksDB database opened as an `OptimisticTransactionDB`.
///
/// Cloning is cheap: every clone shares the same underlying database through an `Arc`.
#[derive(Clone)]
pub struct RocksDB {
    /// The shared database. `create_cf` / `drop_cf` require this `Arc` to be unshared.
    pub(crate) inner: Arc<OptimisticTransactionDB>,
}
28
/// Block cache capacity (256 MiB) used when an options file is given but
/// `cache_size` is not configured.
const DEFAULT_CACHE_SIZE: usize = 256 * 1024 * 1024;
/// Estimated per-entry charge (4 KiB) passed to `Cache::new_hyper_clock_cache`.
const DEFAULT_CACHE_ENTRY_CHARGE_SIZE: usize = 4 * 1024;
31
32impl RocksDB {
33 pub(crate) fn open_with_check(config: &DBConfig, columns: u32) -> Result<Self> {
34 let cf_names: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
35 let mut cache = None;
36
37 let (mut opts, mut cf_descriptors) = if let Some(ref file) = config.options_file {
38 cache = match config.cache_size {
39 Some(0) => None,
40 Some(size) => Some(Cache::new_hyper_clock_cache(
41 size,
42 DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
43 )),
44 None => Some(Cache::new_hyper_clock_cache(
45 DEFAULT_CACHE_SIZE,
46 DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
47 )),
48 };
49
50 let mut full_opts = FullOptions::load_from_file_with_cache(file, cache.clone(), false)
51 .map_err(|err| internal_error(format!("failed to load the options file: {err}")))?;
52 let cf_names_str: Vec<&str> = cf_names.iter().map(|s| s.as_str()).collect();
53 full_opts
54 .complete_column_families(&cf_names_str, false)
55 .map_err(|err| {
56 internal_error(format!("failed to check all column families: {err}"))
57 })?;
58 let FullOptions {
59 db_opts,
60 cf_descriptors,
61 } = full_opts;
62 (db_opts, cf_descriptors)
63 } else {
64 let opts = Options::default();
65 let cf_descriptors: Vec<_> = cf_names
66 .iter()
67 .map(|c| ColumnFamilyDescriptor::new(c, Options::default()))
68 .collect();
69 (opts, cf_descriptors)
70 };
71
72 for cf in cf_descriptors.iter_mut() {
73 let mut block_opts = BlockBasedOptions::default();
74 block_opts.set_ribbon_filter(10.0);
75 block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
76 block_opts.set_partition_filters(true);
77 block_opts.set_metadata_block_size(4096);
78 block_opts.set_pin_top_level_index_and_filter(true);
79 match cache {
80 Some(ref cache) => {
81 block_opts.set_block_cache(cache);
82 block_opts.set_cache_index_and_filter_blocks(true);
83 block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
84 }
85 None => block_opts.disable_cache(),
86 }
87 if cf.name() == "2" {
89 block_opts.set_whole_key_filtering(false);
90 cf.options
91 .set_prefix_extractor(SliceTransform::create_fixed_prefix(32));
92 }
93 cf.options.set_block_based_table_factory(&block_opts);
94 }
95
96 opts.create_if_missing(true);
97 opts.create_missing_column_families(true);
98 opts.enable_statistics();
99
100 let db = OptimisticTransactionDB::open_cf_descriptors(&opts, &config.path, cf_descriptors)
101 .map_err(|err| internal_error(format!("failed to open database: {err}")))?;
102
103 if !config.options.is_empty() {
104 let rocksdb_options: Vec<(&str, &str)> = config
105 .options
106 .iter()
107 .map(|(k, v)| (k.as_str(), v.as_str()))
108 .collect();
109 db.set_options(&rocksdb_options)
110 .map_err(|_| internal_error("failed to set database option"))?;
111 }
112
113 Ok(RocksDB {
114 inner: Arc::new(db),
115 })
116 }
117
118 pub fn open(config: &DBConfig, columns: u32) -> Self {
120 Self::open_with_check(config, columns).unwrap_or_else(|err| panic!("{err}"))
121 }
122
123 pub fn open_in<P: AsRef<Path>>(path: P, columns: u32) -> Self {
125 let config = DBConfig {
126 path: path.as_ref().to_path_buf(),
127 ..Default::default()
128 };
129 Self::open_with_check(&config, columns).unwrap_or_else(|err| panic!("{err}"))
130 }
131
132 pub fn prepare_for_bulk_load_open<P: AsRef<Path>>(
134 path: P,
135 columns: u32,
136 ) -> Result<Option<Self>> {
137 let mut opts = Options::default();
138
139 opts.create_missing_column_families(true);
140 opts.set_prepare_for_bulk_load();
141
142 let cfnames: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
143 let cf_options: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
144
145 OptimisticTransactionDB::open_cf(&opts, path, cf_options).map_or_else(
146 |err| {
147 let err_str = err.as_ref();
148 if err_str.starts_with("Invalid argument:")
149 && err_str.ends_with("does not exist (create_if_missing is false)")
150 {
151 Ok(None)
152 } else if err_str.starts_with("Corruption:") {
153 info!("DB corrupted: {err_str}.");
154 Err(internal_error(err_str))
155 } else {
156 Err(internal_error(format!(
157 "failed to open the database: {err}"
158 )))
159 }
160 },
161 |db| {
162 Ok(Some(RocksDB {
163 inner: Arc::new(db),
164 }))
165 },
166 )
167 }
168
169 pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
172 let cf = cf_handle(&self.inner, col)?;
173 self.inner.get_pinned_cf(cf, key).map_err(internal_error)
174 }
175
176 pub fn get_pinned_default(&self, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
179 self.inner.get_pinned(key).map_err(internal_error)
180 }
181
182 pub fn put_default<K, V>(&self, key: K, value: V) -> Result<()>
184 where
185 K: AsRef<[u8]>,
186 V: AsRef<[u8]>,
187 {
188 self.inner.put(key, value).map_err(internal_error)
189 }
190
191 pub fn full_traverse<F>(&self, col: Col, callback: &mut F) -> Result<()>
193 where
194 F: FnMut(&[u8], &[u8]) -> Result<()>,
195 {
196 let cf = cf_handle(&self.inner, col)?;
197 let iter = self
198 .inner
199 .full_iterator_cf(cf, IteratorMode::Start)
200 .map_err(internal_error)?;
201 for (key, val) in iter {
202 callback(&key, &val)?;
203 }
204 Ok(())
205 }
206
207 pub fn traverse<F>(
209 &self,
210 col: Col,
211 callback: &mut F,
212 mode: IteratorMode,
213 limit: usize,
214 ) -> Result<(usize, Vec<u8>)>
215 where
216 F: FnMut(&[u8], &[u8]) -> Result<()>,
217 {
218 let mut count: usize = 0;
219 let mut next_key: Vec<u8> = vec![];
220 let cf = cf_handle(&self.inner, col)?;
221 let iter = self
222 .inner
223 .full_iterator_cf(cf, mode)
224 .map_err(internal_error)?;
225 for (key, val) in iter {
226 if count > limit {
227 next_key = key.to_vec();
228 break;
229 }
230
231 callback(&key, &val)?;
232 count += 1;
233 }
234 Ok((count, next_key))
235 }
236
237 pub fn transaction(&self) -> RocksDBTransaction {
239 let write_options = WriteOptions::default();
240 let mut transaction_options = OptimisticTransactionOptions::new();
241 transaction_options.set_snapshot(true);
242
243 RocksDBTransaction {
244 db: Arc::clone(&self.inner),
245 inner: self.inner.transaction(&write_options, &transaction_options),
246 }
247 }
248
249 pub fn new_write_batch(&self) -> RocksDBWriteBatch {
251 RocksDBWriteBatch {
252 db: Arc::clone(&self.inner),
253 inner: WriteBatch::default(),
254 }
255 }
256
257 pub fn write(&self, batch: &RocksDBWriteBatch) -> Result<()> {
259 self.inner.write(&batch.inner).map_err(internal_error)
260 }
261
262 pub fn write_sync(&self, batch: &RocksDBWriteBatch) -> Result<()> {
280 let mut wo = WriteOptions::new();
281 wo.set_sync(true);
282 self.inner
283 .write_opt(&batch.inner, &wo)
284 .map_err(internal_error)
285 }
286
287 pub fn compact_range(&self, col: Col, start: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> {
300 let cf = cf_handle(&self.inner, col)?;
301 self.inner.compact_range_cf(cf, start, end);
302 Ok(())
303 }
304
305 pub fn get_snapshot(&self) -> RocksDBSnapshot {
307 unsafe {
308 let snapshot = ffi::rocksdb_create_snapshot(self.inner.base_db_ptr());
309 RocksDBSnapshot::new(&self.inner, snapshot)
310 }
311 }
312
313 pub fn inner(&self) -> Arc<OptimisticTransactionDB> {
315 Arc::clone(&self.inner)
316 }
317
318 pub fn create_cf(&mut self, col: Col) -> Result<()> {
320 let inner = Arc::get_mut(&mut self.inner)
321 .ok_or_else(|| internal_error("create_cf get_mut failed"))?;
322 let opts = Options::default();
323 inner.create_cf(col, &opts).map_err(internal_error)
324 }
325
326 pub fn drop_cf(&mut self, col: Col) -> Result<()> {
328 let inner = Arc::get_mut(&mut self.inner)
329 .ok_or_else(|| internal_error("drop_cf get_mut failed"))?;
330 inner.drop_cf(col).map_err(internal_error)
331 }
332}
333
334#[inline]
335pub(crate) fn cf_handle(db: &OptimisticTransactionDB, col: Col) -> Result<&ColumnFamily> {
336 db.cf_handle(col)
337 .ok_or_else(|| internal_error(format!("column {col} not found")))
338}