ckb_indexer_sync/
store.rs

1use crate::error::Error;
2
3use ckb_db_schema::Col;
4use ckb_store::{ChainStore, Freezer, StoreCache};
5use rocksdb::{
6    ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, IteratorMode,
7    SecondaryDB as SecondaryRocksDB, SecondaryOpenDescriptor, ops::OpenCF, prelude::*,
8};
9use std::path::Path;
10use std::sync::Arc;
11
12/// Open DB as secondary instance with specified column families
13//
14// When opening DB in secondary mode, you can specify only a subset of column
15// families in the database that should be opened. However, you always need
16// to specify default column family. The default column family name is
17// 'default' and it's stored in ROCKSDB_NAMESPACE::kDefaultColumnFamilyName
18//
19// Column families created by the primary after the secondary instance starts
20// are currently ignored by the secondary instance.  Column families opened
21// by secondary and dropped by the primary will be dropped by secondary as
22// well (on next invocation of TryCatchUpWithPrimary()). However the user
23// of the secondary instance can still access the data of such dropped column
24// family as long as they do not destroy the corresponding column family
25// handle.
26//
27// The options argument specifies the options to open the secondary instance.
28// Options.max_open_files should be set to -1.
29// The name argument specifies the name of the primary db that you have used
30// to open the primary instance.
31// The secondary_path argument points to a directory where the secondary
32// instance stores its info log.
33// The column_families argument specifies a list of column families to open.
// If the default column family is not specified, or if any specified column
// family does not exist, the function returns a non-OK status.
36
// Note: rust-rocksdb's `OpenRaw` handles the 'default' column family automatically.
38#[derive(Clone)]
39pub struct SecondaryDB {
40    inner: Arc<SecondaryRocksDB>,
41}
42
43impl SecondaryDB {
44    /// Open a SecondaryDB
45    pub fn open_cf<P, I, N>(opts: &Options, path: P, cf_names: I, secondary_path: String) -> Self
46    where
47        P: AsRef<Path>,
48        I: IntoIterator<Item = N>,
49        N: Into<String>,
50    {
51        let cf_descriptors: Vec<_> = cf_names
52            .into_iter()
53            .map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
54            .collect();
55
56        let descriptor = SecondaryOpenDescriptor::new(secondary_path);
57        let inner = SecondaryRocksDB::open_cf_descriptors_with_descriptor(
58            opts,
59            path,
60            cf_descriptors,
61            descriptor,
62        )
63        .expect("Failed to open SecondaryDB");
64        SecondaryDB {
65            inner: Arc::new(inner),
66        }
67    }
68
69    /// Return the value associated with a key using RocksDB's PinnableSlice from the given column
70    /// so as to avoid unnecessary memory copy.
71    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>, Error> {
72        let cf = self
73            .inner
74            .cf_handle(col)
75            .ok_or_else(|| Error::DB(format!("column {col} not found")))?;
76        self.inner.get_pinned_cf(cf, key).map_err(Into::into)
77    }
78
79    /// Make the secondary instance catch up with the primary by tailing and
80    /// replaying the MANIFEST and WAL of the primary.
81    // Column families created by the primary after the secondary instance starts
82    // will be ignored unless the secondary instance closes and restarts with the
83    // newly created column families.
84    // Column families that exist before secondary instance starts and dropped by
85    // the primary afterwards will be marked as dropped. However, as long as the
86    // secondary instance does not delete the corresponding column family
87    // handles, the data of the column family is still accessible to the
88    // secondary.
89    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
90        self.inner.try_catch_up_with_primary().map_err(Into::into)
91    }
92
93    /// This is used when you want to iterate over a specific ColumnFamily
94    fn iter(&self, col: Col, mode: IteratorMode) -> Result<DBIterator, Error> {
95        let opts = ReadOptions::default();
96        let cf = self
97            .inner
98            .cf_handle(col)
99            .ok_or_else(|| Error::DB(format!("column {col} not found")))?;
100        self.inner
101            .iterator_cf_opt(cf, mode, &opts)
102            .map_err(Into::into)
103    }
104}
105
106impl ChainStore for SecondaryDB {
107    fn cache(&self) -> Option<&StoreCache> {
108        None
109    }
110
111    fn freezer(&self) -> Option<&Freezer> {
112        None
113    }
114
115    fn get(&self, col: Col, key: &[u8]) -> Option<DBPinnableSlice> {
116        self.get_pinned(col, key)
117            .expect("db operation should be ok")
118    }
119
120    fn get_iter(&self, col: Col, mode: IteratorMode) -> DBIterator {
121        self.iter(col, mode).expect("db operation should be ok")
122    }
123}