// ckb_db/db.rs
1//! RocksDB wrapper base on OptimisticTransactionDB
2use crate::snapshot::RocksDBSnapshot;
3use crate::transaction::RocksDBTransaction;
4use crate::write_batch::RocksDBWriteBatch;
5use crate::{Result, internal_error};
6use ckb_app_config::DBConfig;
7use ckb_db_schema::Col;
8use ckb_logger::info;
9use rocksdb::ops::{
10    CompactRangeCF, CreateCF, DropCF, GetColumnFamilys, GetPinned, GetPinnedCF, GetPropertyCF,
11    IterateCF, OpenCF, Put, SetOptions, WriteOps,
12};
13use rocksdb::{
14    BlockBasedIndexType, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor,
15    DBPinnableSlice, FullOptions, IteratorMode, OptimisticTransactionDB,
16    OptimisticTransactionOptions, Options, SliceTransform, WriteBatch, WriteOptions, ffi,
17};
18use std::path::Path;
19use std::sync::Arc;
20
/// RocksDB property name used to ask for the estimated total key count of a
/// column family (active/unflushed memtables plus storage).
const PROPERTY_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
22
/// RocksDB wrapper base on OptimisticTransactionDB
///
/// <https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb>
#[derive(Clone)]
pub struct RocksDB {
    // Shared handle to the underlying DB; cloning a `RocksDB` only bumps the
    // `Arc` refcount, it does not reopen the database.
    pub(crate) inner: Arc<OptimisticTransactionDB>,
}
30
/// Default block-cache capacity when the config does not specify one: 256 MiB.
const DEFAULT_CACHE_SIZE: usize = 256 << 20;
/// Estimated per-entry charge handed to the hyper-clock cache so it can size
/// its internal metadata (see RocksDB `HyperClockCacheOptions`).
const DEFAULT_CACHE_ENTRY_CHARGE_SIZE: usize = 4096;
33
impl RocksDB {
    /// Open (or create) the database at `config.path` with `columns` column
    /// families named `"0".."columns-1"`.
    ///
    /// Options come from the on-disk options file when `config.options_file`
    /// is set (with a shared block cache), otherwise from built-in defaults;
    /// any extra key/value pairs in `config.options` are applied afterwards
    /// via RocksDB's `SetOptions`.
    pub(crate) fn open_with_check(config: &DBConfig, columns: u32) -> Result<Self> {
        // Column families are named by their numeric index.
        let cf_names: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
        let mut cache = None;

        let (mut opts, mut cf_descriptors) = if let Some(ref file) = config.options_file {
            // A shared block cache is only configured on the options-file
            // path; `cache_size == Some(0)` explicitly disables caching.
            cache = match config.cache_size {
                Some(0) => None,
                Some(size) => Some(Cache::new_hyper_clock_cache(
                    size,
                    DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
                )),
                None => Some(Cache::new_hyper_clock_cache(
                    DEFAULT_CACHE_SIZE,
                    DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
                )),
            };

            let mut full_opts = FullOptions::load_from_file_with_cache(file, cache.clone(), false)
                .map_err(|err| internal_error(format!("failed to load the options file: {err}")))?;
            let cf_names_str: Vec<&str> = cf_names.iter().map(|s| s.as_str()).collect();
            // Make sure the options file accounts for every expected column
            // family before opening.
            full_opts
                .complete_column_families(&cf_names_str, false)
                .map_err(|err| {
                    internal_error(format!("failed to check all column families: {err}"))
                })?;
            let FullOptions {
                db_opts,
                cf_descriptors,
            } = full_opts;
            (db_opts, cf_descriptors)
        } else {
            // No options file: plain defaults for the DB and each CF.
            let opts = Options::default();
            let cf_descriptors: Vec<_> = cf_names
                .iter()
                .map(|c| ColumnFamilyDescriptor::new(c, Options::default()))
                .collect();
            (opts, cf_descriptors)
        };

        // Apply a common block-based table layout to every column family:
        // ribbon filter plus a two-level partitioned index/filter with the
        // top level pinned in memory.
        for cf in cf_descriptors.iter_mut() {
            let mut block_opts = BlockBasedOptions::default();
            block_opts.set_ribbon_filter(10.0);
            block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
            block_opts.set_partition_filters(true);
            block_opts.set_metadata_block_size(4096);
            block_opts.set_pin_top_level_index_and_filter(true);
            match cache {
                Some(ref cache) => {
                    block_opts.set_block_cache(cache);
                    block_opts.set_cache_index_and_filter_blocks(true);
                    block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
                }
                None => block_opts.disable_cache(),
            }
            // Only the COLUMN_BLOCK_BODY column family ("2") uses prefix seek.
            if cf.name() == "2" {
                // Filter on a fixed 32-byte key prefix instead of whole keys
                // (presumably a 32-byte hash prefix — TODO confirm against
                // the schema definitions).
                block_opts.set_whole_key_filtering(false);
                cf.options
                    .set_prefix_extractor(SliceTransform::create_fixed_prefix(32));
            }
            cf.options.set_block_based_table_factory(&block_opts);
        }

        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.enable_statistics();

        let db = OptimisticTransactionDB::open_cf_descriptors(&opts, &config.path, cf_descriptors)
            .map_err(|err| internal_error(format!("failed to open database: {err}")))?;

        // Forward any extra string options from the config to RocksDB at
        // runtime (RocksDB's dynamic SetOptions mechanism).
        if !config.options.is_empty() {
            let rocksdb_options: Vec<(&str, &str)> = config
                .options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();
            db.set_options(&rocksdb_options)
                .map_err(|_| internal_error("failed to set database option"))?;
        }

        Ok(RocksDB {
            inner: Arc::new(db),
        })
    }

    /// Open a database with the given configuration and columns count.
    ///
    /// # Panics
    ///
    /// Panics with the underlying error message if the database cannot be
    /// opened.
    pub fn open(config: &DBConfig, columns: u32) -> Self {
        Self::open_with_check(config, columns).unwrap_or_else(|err| panic!("{err}"))
    }

    /// Open a database in the given directory with the default configuration and columns count.
    ///
    /// # Panics
    ///
    /// Panics with the underlying error message if the database cannot be
    /// opened.
    pub fn open_in<P: AsRef<Path>>(path: P, columns: u32) -> Self {
        let config = DBConfig {
            path: path.as_ref().to_path_buf(),
            ..Default::default()
        };
        Self::open_with_check(&config, columns).unwrap_or_else(|err| panic!("{err}"))
    }

    /// Set appropriate parameters for bulk loading.
    ///
    /// Returns `Ok(None)` when the database does not exist yet — this opener
    /// deliberately leaves `create_if_missing` off and recognizes the
    /// resulting error message. Corruption and all other open failures are
    /// returned as errors.
    pub fn prepare_for_bulk_load_open<P: AsRef<Path>>(
        path: P,
        columns: u32,
    ) -> Result<Option<Self>> {
        let mut opts = Options::default();

        opts.create_missing_column_families(true);
        opts.set_prepare_for_bulk_load();

        let cfnames: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
        let cf_options: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();

        OptimisticTransactionDB::open_cf(&opts, path, cf_options).map_or_else(
            |err| {
                // RocksDB surfaces errors as strings, so classification is
                // done by matching on the message text.
                let err_str = err.as_ref();
                if err_str.starts_with("Invalid argument:")
                    && err_str.ends_with("does not exist (create_if_missing is false)")
                {
                    // A missing database is an expected, non-error outcome
                    // for this caller.
                    Ok(None)
                } else if err_str.starts_with("Corruption:") {
                    info!("DB corrupted: {err_str}.");
                    Err(internal_error(err_str))
                } else {
                    Err(internal_error(format!(
                        "failed to open the database: {err}"
                    )))
                }
            },
            |db| {
                Ok(Some(RocksDB {
                    inner: Arc::new(db),
                }))
            },
        )
    }

    /// Return the value associated with a key using RocksDB's PinnableSlice from the given column
    /// so as to avoid unnecessary memory copy.
    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
    }

    /// Return the value associated with a key using RocksDB's PinnableSlice from the default column
    /// so as to avoid unnecessary memory copy.
    pub fn get_pinned_default(&self, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> {
        self.inner.get_pinned(key).map_err(internal_error)
    }

    /// Insert a value into the database under the given key.
    ///
    /// Writes to the default column family.
    pub fn put_default<K, V>(&self, key: K, value: V) -> Result<()>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        self.inner.put(key, value).map_err(internal_error)
    }

    /// Traverse database column with the given callback function.
    ///
    /// Visits every key/value pair in `col` from the start; stops early and
    /// returns the callback's error if it fails.
    pub fn full_traverse<F>(&self, col: Col, callback: &mut F) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> Result<()>,
    {
        let cf = cf_handle(&self.inner, col)?;
        let iter = self
            .inner
            .full_iterator_cf(cf, IteratorMode::Start)
            .map_err(internal_error)?;
        for (key, val) in iter {
            callback(&key, &val)?;
        }
        Ok(())
    }

    /// Traverse database column with the given callback function.
    ///
    /// Iterates from `mode`, invoking `callback` on each pair, and returns
    /// `(count, next_key)` where `next_key` is the first unvisited key (empty
    /// when the column was exhausted), usable as a resume cursor.
    pub fn traverse<F>(
        &self,
        col: Col,
        callback: &mut F,
        mode: IteratorMode,
        limit: usize,
    ) -> Result<(usize, Vec<u8>)>
    where
        F: FnMut(&[u8], &[u8]) -> Result<()>,
    {
        let mut count: usize = 0;
        let mut next_key: Vec<u8> = vec![];
        let cf = cf_handle(&self.inner, col)?;
        let iter = self
            .inner
            .full_iterator_cf(cf, mode)
            .map_err(internal_error)?;
        for (key, val) in iter {
            // NOTE(review): the break fires only once `count` EXCEEDS
            // `limit`, so up to `limit + 1` entries are passed to the
            // callback per call — confirm whether `>=` was intended.
            if count > limit {
                next_key = key.to_vec();
                break;
            }

            callback(&key, &val)?;
            count += 1;
        }
        Ok((count, next_key))
    }

    /// Set a snapshot at start of transaction by setting set_snapshot=true
    pub fn transaction(&self) -> RocksDBTransaction {
        let write_options = WriteOptions::default();
        let mut transaction_options = OptimisticTransactionOptions::new();
        transaction_options.set_snapshot(true);

        RocksDBTransaction {
            db: Arc::clone(&self.inner),
            inner: self.inner.transaction(&write_options, &transaction_options),
        }
    }

    /// Construct `RocksDBWriteBatch` with default option.
    pub fn new_write_batch(&self) -> RocksDBWriteBatch {
        RocksDBWriteBatch {
            db: Arc::clone(&self.inner),
            inner: WriteBatch::default(),
        }
    }

    /// Write batch into transaction db.
    pub fn write(&self, batch: &RocksDBWriteBatch) -> Result<()> {
        self.inner.write(&batch.inner).map_err(internal_error)
    }

    /// WriteOptions set_sync true
    /// If true, the write will be flushed from the operating system
    /// buffer cache (by calling WritableFile::Sync()) before the write
    /// is considered complete.  If this flag is true, writes will be
    /// slower.
    ///
    /// If this flag is false, and the machine crashes, some recent
    /// writes may be lost.  Note that if it is just the process that
    /// crashes (i.e., the machine does not reboot), no writes will be
    /// lost even if sync==false.
    ///
    /// In other words, a DB write with sync==false has similar
    /// crash semantics as the "write()" system call.  A DB write
    /// with sync==true has similar crash semantics to a "write()"
    /// system call followed by "fdatasync()".
    ///
    /// Default: false
    pub fn write_sync(&self, batch: &RocksDBWriteBatch) -> Result<()> {
        let mut wo = WriteOptions::new();
        wo.set_sync(true);
        self.inner
            .write_opt(&batch.inner, &wo)
            .map_err(internal_error)
    }

    /// The begin and end arguments define the key range to be compacted.
    /// The behavior varies depending on the compaction style being used by the db.
    /// In case of universal and FIFO compaction styles, the begin and end arguments are ignored and all files are compacted.
    /// Also, files in each level are compacted and left in the same level.
    /// For leveled compaction style, all files containing keys in the given range are compacted to the last level containing files.
    /// If either begin or end are NULL, it is taken to mean the key before all keys in the db or the key after all keys respectively.
    ///
    /// If more than one thread calls manual compaction,
    /// only one will actually schedule it while the other threads will simply wait for
    /// the scheduled manual compaction to complete.
    ///
    /// CompactRange waits while compaction is performed on the background threads and thus is a blocking call.
    pub fn compact_range(&self, col: Col, start: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner.compact_range_cf(cf, start, end);
        Ok(())
    }

    /// Return `RocksDBSnapshot`.
    pub fn get_snapshot(&self) -> RocksDBSnapshot {
        // SAFETY: `base_db_ptr()` yields the live underlying DB pointer held
        // alive by `self.inner`; the raw snapshot is handed straight to
        // `RocksDBSnapshot::new`, which is presumed to own and release it —
        // TODO(review): confirm `RocksDBSnapshot` releases the snapshot on
        // drop.
        unsafe {
            let snapshot = ffi::rocksdb_create_snapshot(self.inner.base_db_ptr());
            RocksDBSnapshot::new(&self.inner, snapshot)
        }
    }

    /// Return rocksdb `OptimisticTransactionDB`.
    pub fn inner(&self) -> Arc<OptimisticTransactionDB> {
        Arc::clone(&self.inner)
    }

    /// Create a new column family for the database.
    ///
    /// Fails unless this handle is the sole owner of the DB (`Arc::get_mut`
    /// requires exclusive access).
    pub fn create_cf(&mut self, col: Col) -> Result<()> {
        let inner = Arc::get_mut(&mut self.inner)
            .ok_or_else(|| internal_error("create_cf get_mut failed"))?;
        let opts = Options::default();
        inner.create_cf(col, &opts).map_err(internal_error)
    }

    /// Delete column family.
    ///
    /// Fails unless this handle is the sole owner of the DB (`Arc::get_mut`
    /// requires exclusive access).
    pub fn drop_cf(&mut self, col: Col) -> Result<()> {
        let inner = Arc::get_mut(&mut self.inner)
            .ok_or_else(|| internal_error("drop_cf get_mut failed"))?;
        inner.drop_cf(col).map_err(internal_error)
    }

    /// "rocksdb.estimate-num-keys" - returns estimated number of total keys in
    /// the active and unflushed immutable memtables and storage.
    pub fn estimate_num_keys_cf(&self, col: Col) -> Result<Option<u64>> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner
            .property_int_value_cf(cf, PROPERTY_NUM_KEYS)
            .map_err(internal_error)
    }
}
344
345#[inline]
346pub(crate) fn cf_handle(db: &OptimisticTransactionDB, col: Col) -> Result<&ColumnFamily> {
347    db.cf_handle(col)
348        .ok_or_else(|| internal_error(format!("column {col} not found")))
349}