ckb_db/db.rs

//! RocksDB wrapper based on OptimisticTransactionDB
use crate::snapshot::RocksDBSnapshot;
use crate::transaction::RocksDBTransaction;
use crate::write_batch::RocksDBWriteBatch;
use crate::{Result, internal_error};
use ckb_app_config::DBConfig;
use ckb_db_schema::Col;
use ckb_logger::info;
use rocksdb::ops::{
    CompactRangeCF, CreateCF, DropCF, GetColumnFamilys, GetPinned, GetPinnedCF, IterateCF, OpenCF,
    Put, SetOptions, WriteOps,
};
use rocksdb::{
    BlockBasedIndexType, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor,
    DBPinnableSlice, FullOptions, IteratorMode, OptimisticTransactionDB,
    OptimisticTransactionOptions, Options, SliceTransform, WriteBatch, WriteOptions, ffi,
};
use std::path::Path;
use std::sync::Arc;

/// RocksDB wrapper based on OptimisticTransactionDB
///
/// <https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb>
#[derive(Clone)]
pub struct RocksDB {
    pub(crate) inner: Arc<OptimisticTransactionDB>,
}

// Default block cache size: 256 MiB, used when `cache_size` is not configured.
const DEFAULT_CACHE_SIZE: usize = 256 << 20;
// Estimated per-entry charge passed to the hyper clock cache.
const DEFAULT_CACHE_ENTRY_CHARGE_SIZE: usize = 4096;

impl RocksDB {
    pub(crate) fn open_with_check(config: &DBConfig, columns: u32) -> Result<Self> {
        let cf_names: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
        let mut cache = None;

        let (mut opts, mut cf_descriptors) = if let Some(ref file) = config.options_file {
            // A configured `cache_size` of `Some(0)` disables the block cache entirely.
            cache = match config.cache_size {
                Some(0) => None,
                Some(size) => Some(Cache::new_hyper_clock_cache(
                    size,
                    DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
                )),
                None => Some(Cache::new_hyper_clock_cache(
                    DEFAULT_CACHE_SIZE,
                    DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
                )),
            };

            let mut full_opts = FullOptions::load_from_file_with_cache(file, cache.clone(), false)
                .map_err(|err| internal_error(format!("failed to load the options file: {err}")))?;
            let cf_names_str: Vec<&str> = cf_names.iter().map(|s| s.as_str()).collect();
            full_opts
                .complete_column_families(&cf_names_str, false)
                .map_err(|err| {
                    internal_error(format!("failed to check all column families: {err}"))
                })?;
            let FullOptions {
                db_opts,
                cf_descriptors,
            } = full_opts;
            (db_opts, cf_descriptors)
        } else {
            let opts = Options::default();
            let cf_descriptors: Vec<_> = cf_names
                .iter()
                .map(|c| ColumnFamilyDescriptor::new(c, Options::default()))
                .collect();
            (opts, cf_descriptors)
        };

        // Shared block-based table tuning applied to every column family:
        // a ribbon filter plus a partitioned, two-level index.
        for cf in cf_descriptors.iter_mut() {
            let mut block_opts = BlockBasedOptions::default();
            block_opts.set_ribbon_filter(10.0);
            block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
            block_opts.set_partition_filters(true);
            block_opts.set_metadata_block_size(4096);
            block_opts.set_pin_top_level_index_and_filter(true);
            match cache {
                Some(ref cache) => {
                    block_opts.set_block_cache(cache);
                    block_opts.set_cache_index_and_filter_blocks(true);
                    block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
                }
                None => block_opts.disable_cache(),
            }
            // only the COLUMN_BLOCK_BODY column family ("2") uses prefix seek
            if cf.name() == "2" {
                block_opts.set_whole_key_filtering(false);
                cf.options
                    .set_prefix_extractor(SliceTransform::create_fixed_prefix(32));
            }
            cf.options.set_block_based_table_factory(&block_opts);
        }

        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.enable_statistics();

        let db = OptimisticTransactionDB::open_cf_descriptors(&opts, &config.path, cf_descriptors)
            .map_err(|err| internal_error(format!("failed to open database: {err}")))?;

        if !config.options.is_empty() {
            let rocksdb_options: Vec<(&str, &str)> = config
                .options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();
            db.set_options(&rocksdb_options)
                .map_err(|_| internal_error("failed to set database option"))?;
        }

        Ok(RocksDB {
            inner: Arc::new(db),
        })
    }

    /// Open a database with the given configuration and number of columns.
    ///
    /// Panics if the database fails to open.
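    ///
    /// A minimal sketch; `./demo-db` is a placeholder path and `1` a placeholder
    /// column count:
    ///
    /// ```no_run
    /// use ckb_app_config::DBConfig;
    /// use ckb_db::RocksDB;
    ///
    /// let config = DBConfig {
    ///     path: "./demo-db".into(),
    ///     ..Default::default()
    /// };
    /// let _db = RocksDB::open(&config, 1);
    /// ```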
    pub fn open(config: &DBConfig, columns: u32) -> Self {
        Self::open_with_check(config, columns).unwrap_or_else(|err| panic!("{err}"))
    }

    /// Open a database in the given directory with the default configuration and the
    /// given number of columns.
    ///
    /// Panics if the database fails to open.
    pub fn open_in<P: AsRef<Path>>(path: P, columns: u32) -> Self {
        let config = DBConfig {
            path: path.as_ref().to_path_buf(),
            ..Default::default()
        };
        Self::open_with_check(&config, columns).unwrap_or_else(|err| panic!("{err}"))
    }

    /// Open a database with parameters tuned for bulk loading.
    ///
    /// Returns `Ok(None)` if the database does not exist yet, since
    /// `create_if_missing` is left off here.
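    ///
    /// A minimal sketch; `./demo-db` is a placeholder path:
    ///
    /// ```no_run
    /// # use ckb_db::RocksDB;
    /// if let Some(_db) = RocksDB::prepare_for_bulk_load_open("./demo-db", 1).expect("open") {
    ///     // bulk-load data here
    /// }
    /// ```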
    pub fn prepare_for_bulk_load_open<P: AsRef<Path>>(
        path: P,
        columns: u32,
    ) -> Result<Option<Self>> {
        let mut opts = Options::default();

        opts.create_missing_column_families(true);
        opts.set_prepare_for_bulk_load();

        let cf_names: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
        let cf_names_str: Vec<&str> = cf_names.iter().map(|n| n.as_str()).collect();

        OptimisticTransactionDB::open_cf(&opts, path, cf_names_str).map_or_else(
            |err| {
                let err_str = err.as_ref();
                if err_str.starts_with("Invalid argument:")
                    && err_str.ends_with("does not exist (create_if_missing is false)")
                {
                    Ok(None)
                } else if err_str.starts_with("Corruption:") {
                    info!("DB corrupted: {err_str}.");
                    Err(internal_error(err_str))
                } else {
                    Err(internal_error(format!(
                        "failed to open the database: {err}"
                    )))
                }
            },
            |db| {
                Ok(Some(RocksDB {
                    inner: Arc::new(db),
                }))
            },
        )
    }

    /// Return the value associated with a key in the given column, using RocksDB's
    /// `PinnableSlice` to avoid an unnecessary memory copy.
    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
    }

    /// Return the value associated with a key in the default column, using RocksDB's
    /// `PinnableSlice` to avoid an unnecessary memory copy.
    pub fn get_pinned_default(&self, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        self.inner.get_pinned(key).map_err(internal_error)
    }

    /// Insert a value into the default column under the given key.
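    ///
    /// A minimal round-trip sketch with [`Self::get_pinned_default`]; `./demo-db`
    /// is a placeholder path:
    ///
    /// ```no_run
    /// # use ckb_db::RocksDB;
    /// let db = RocksDB::open_in("./demo-db", 1);
    /// db.put_default(b"key", b"value").expect("write");
    /// let value = db.get_pinned_default(b"key").expect("read");
    /// assert_eq!(value.as_deref(), Some(&b"value"[..]));
    /// ```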
    pub fn put_default<K, V>(&self, key: K, value: V) -> Result<()>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        self.inner.put(key, value).map_err(internal_error)
    }

    /// Traverse an entire database column, invoking the callback on every key-value pair.
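    ///
    /// A sketch that counts every entry in column `"0"` (assumed to exist):
    ///
    /// ```no_run
    /// # use ckb_db::RocksDB;
    /// # let db = RocksDB::open_in("./demo-db", 1);
    /// let mut total = 0usize;
    /// db.full_traverse("0", &mut |_key, _value| {
    ///     total += 1;
    ///     Ok(())
    /// })
    /// .expect("traverse");
    /// ```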
    pub fn full_traverse<F>(&self, col: Col, callback: &mut F) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> Result<()>,
    {
        let cf = cf_handle(&self.inner, col)?;
        let iter = self
            .inner
            .full_iterator_cf(cf, IteratorMode::Start)
            .map_err(internal_error)?;
        for (key, val) in iter {
            callback(&key, &val)?;
        }
        Ok(())
    }

    /// Traverse a database column with the given callback function, visiting at most
    /// `limit` entries; returns the number of entries visited and the key at which
    /// to resume (empty if the column was exhausted).
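    ///
    /// A sketch that pages through column `"0"` 100 entries at a time:
    ///
    /// ```no_run
    /// # use ckb_db::RocksDB;
    /// # use rocksdb::IteratorMode;
    /// # let db = RocksDB::open_in("./demo-db", 1);
    /// let (visited, next_key) = db
    ///     .traverse("0", &mut |_k, _v| Ok(()), IteratorMode::Start, 100)
    ///     .expect("traverse");
    /// // A non-empty `next_key` can seed `IteratorMode::From` for the next page.
    /// ```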
    pub fn traverse<F>(
        &self,
        col: Col,
        callback: &mut F,
        mode: IteratorMode,
        limit: usize,
    ) -> Result<(usize, Vec<u8>)>
    where
        F: FnMut(&[u8], &[u8]) -> Result<()>,
    {
        let mut count: usize = 0;
        let mut next_key: Vec<u8> = vec![];
        let cf = cf_handle(&self.inner, col)?;
        let iter = self
            .inner
            .full_iterator_cf(cf, mode)
            .map_err(internal_error)?;
        for (key, val) in iter {
            // Stop once `limit` entries have been visited, remembering where to
            // resume. (The original `count > limit` check visited `limit + 1`
            // entries, one more than documented.)
            if count >= limit {
                next_key = key.to_vec();
                break;
            }

            callback(&key, &val)?;
            count += 1;
        }
        Ok((count, next_key))
    }

    /// Begin a transaction that takes a snapshot at its start (`set_snapshot = true`).
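    ///
    /// A hedged sketch; assumes the returned `RocksDBTransaction` (see
    /// `crate::transaction`) exposes `put` and `commit`:
    ///
    /// ```ignore
    /// let tx = db.transaction();
    /// tx.put("0", b"key", b"value")?;
    /// tx.commit()?;
    /// ```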
    pub fn transaction(&self) -> RocksDBTransaction {
        let write_options = WriteOptions::default();
        let mut transaction_options = OptimisticTransactionOptions::new();
        transaction_options.set_snapshot(true);

        RocksDBTransaction {
            db: Arc::clone(&self.inner),
            inner: self.inner.transaction(&write_options, &transaction_options),
        }
    }

    /// Construct a `RocksDBWriteBatch` with the default options.
    pub fn new_write_batch(&self) -> RocksDBWriteBatch {
        RocksDBWriteBatch {
            db: Arc::clone(&self.inner),
            inner: WriteBatch::default(),
        }
    }

    /// Write a batch into the transaction DB.
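    ///
    /// A hedged sketch; assumes `RocksDBWriteBatch` (see `crate::write_batch`)
    /// exposes a `put` method:
    ///
    /// ```ignore
    /// let mut batch = db.new_write_batch();
    /// batch.put("0", b"k1", b"v1")?;
    /// batch.put("0", b"k2", b"v2")?;
    /// db.write(&batch)?;
    /// ```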
    pub fn write(&self, batch: &RocksDBWriteBatch) -> Result<()> {
        self.inner.write(&batch.inner).map_err(internal_error)
    }

    /// Write a batch with `WriteOptions::set_sync(true)`.
    ///
    /// If true, the write will be flushed from the operating system
    /// buffer cache (by calling WritableFile::Sync()) before the write
    /// is considered complete.  If this flag is true, writes will be
    /// slower.
    ///
    /// If this flag is false, and the machine crashes, some recent
    /// writes may be lost.  Note that if it is just the process that
    /// crashes (i.e., the machine does not reboot), no writes will be
    /// lost even if sync==false.
    ///
    /// In other words, a DB write with sync==false has similar
    /// crash semantics to the "write()" system call.  A DB write
    /// with sync==true has similar crash semantics to a "write()"
    /// system call followed by "fdatasync()".
    ///
    /// Default: false
    pub fn write_sync(&self, batch: &RocksDBWriteBatch) -> Result<()> {
        let mut wo = WriteOptions::new();
        wo.set_sync(true);
        self.inner
            .write_opt(&batch.inner, &wo)
            .map_err(internal_error)
    }

    /// The start and end arguments define the key range to be compacted.
    /// The behavior varies depending on the compaction style being used by the db.
    /// In case of universal and FIFO compaction styles, the start and end arguments are ignored and all files are compacted.
    /// Also, files in each level are compacted and left in the same level.
    /// For leveled compaction style, all files containing keys in the given range are compacted to the last level containing files.
    /// If either start or end is `None`, it is taken to mean the key before all keys or the key after all keys in the db, respectively.
    ///
    /// If more than one thread calls manual compaction,
    /// only one will actually schedule it while the other threads will simply wait for
    /// the scheduled manual compaction to complete.
    ///
    /// CompactRange waits while compaction is performed on the background threads and thus is a blocking call.
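    ///
    /// A sketch that compacts the whole key range of column `"0"`:
    ///
    /// ```no_run
    /// # use ckb_db::RocksDB;
    /// # let db = RocksDB::open_in("./demo-db", 1);
    /// db.compact_range("0", None, None).expect("compact");
    /// ```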
    pub fn compact_range(&self, col: Col, start: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner.compact_range_cf(cf, start, end);
        Ok(())
    }

    /// Return a `RocksDBSnapshot` of the current database state.
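    ///
    /// A hedged sketch; assumes `RocksDBSnapshot` (see `crate::snapshot`) exposes
    /// a column read such as `get_pinned`:
    ///
    /// ```ignore
    /// let snapshot = db.get_snapshot();
    /// let value = snapshot.get_pinned("0", b"key")?;
    /// ```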
    pub fn get_snapshot(&self) -> RocksDBSnapshot {
        unsafe {
            let snapshot = ffi::rocksdb_create_snapshot(self.inner.base_db_ptr());
            RocksDBSnapshot::new(&self.inner, snapshot)
        }
    }

    /// Return the underlying rocksdb `OptimisticTransactionDB`.
    pub fn inner(&self) -> Arc<OptimisticTransactionDB> {
        Arc::clone(&self.inner)
    }

    /// Create a new column family for the database.
    ///
    /// Requires exclusive access to the inner handle: fails if other clones of this
    /// `RocksDB` are still alive.
    pub fn create_cf(&mut self, col: Col) -> Result<()> {
        let inner = Arc::get_mut(&mut self.inner)
            .ok_or_else(|| internal_error("create_cf get_mut failed"))?;
        let opts = Options::default();
        inner.create_cf(col, &opts).map_err(internal_error)
    }

    /// Delete a column family.
    ///
    /// Requires exclusive access to the inner handle: fails if other clones of this
    /// `RocksDB` are still alive.
    pub fn drop_cf(&mut self, col: Col) -> Result<()> {
        let inner = Arc::get_mut(&mut self.inner)
            .ok_or_else(|| internal_error("drop_cf get_mut failed"))?;
        inner.drop_cf(col).map_err(internal_error)
    }
}

#[inline]
pub(crate) fn cf_handle(db: &OptimisticTransactionDB, col: Col) -> Result<&ColumnFamily> {
    db.cf_handle(col)
        .ok_or_else(|| internal_error(format!("column {col} not found")))
}