ckb_indexer_sync/store.rs
1use crate::error::Error;
2
3use ckb_db_schema::Col;
4use ckb_store::{ChainStore, Freezer, StoreCache};
5use rocksdb::{
6 ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, IteratorMode,
7 SecondaryDB as SecondaryRocksDB, SecondaryOpenDescriptor, ops::OpenCF, prelude::*,
8};
9use std::path::Path;
10use std::sync::Arc;
11
12/// Open DB as secondary instance with specified column families
13//
14// When opening DB in secondary mode, you can specify only a subset of column
15// families in the database that should be opened. However, you always need
16// to specify default column family. The default column family name is
17// 'default' and it's stored in ROCKSDB_NAMESPACE::kDefaultColumnFamilyName
18//
19// Column families created by the primary after the secondary instance starts
20// are currently ignored by the secondary instance. Column families opened
21// by secondary and dropped by the primary will be dropped by secondary as
22// well (on next invocation of TryCatchUpWithPrimary()). However the user
23// of the secondary instance can still access the data of such dropped column
24// family as long as they do not destroy the corresponding column family
25// handle.
26//
27// The options argument specifies the options to open the secondary instance.
28// Options.max_open_files should be set to -1.
29// The name argument specifies the name of the primary db that you have used
30// to open the primary instance.
31// The secondary_path argument points to a directory where the secondary
32// instance stores its info log.
33// The column_families argument specifies a list of column families to open.
// If the default column family is not specified, or if any specified column
// family does not exist, the function returns a non-OK status.
36
// Note: rust-rocksdb's `OpenRaw` handles the 'default' column family automatically.
38#[derive(Clone)]
39pub struct SecondaryDB {
40 inner: Arc<SecondaryRocksDB>,
41}
42
43impl SecondaryDB {
44 /// Open a SecondaryDB
45 pub fn open_cf<P, I, N>(opts: &Options, path: P, cf_names: I, secondary_path: String) -> Self
46 where
47 P: AsRef<Path>,
48 I: IntoIterator<Item = N>,
49 N: Into<String>,
50 {
51 let cf_descriptors: Vec<_> = cf_names
52 .into_iter()
53 .map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
54 .collect();
55
56 let descriptor = SecondaryOpenDescriptor::new(secondary_path);
57 let inner = SecondaryRocksDB::open_cf_descriptors_with_descriptor(
58 opts,
59 path,
60 cf_descriptors,
61 descriptor,
62 )
63 .expect("Failed to open SecondaryDB");
64 SecondaryDB {
65 inner: Arc::new(inner),
66 }
67 }
68
69 /// Return the value associated with a key using RocksDB's PinnableSlice from the given column
70 /// so as to avoid unnecessary memory copy.
71 pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>, Error> {
72 let cf = self
73 .inner
74 .cf_handle(col)
75 .ok_or_else(|| Error::DB(format!("column {col} not found")))?;
76 self.inner.get_pinned_cf(cf, key).map_err(Into::into)
77 }
78
79 /// Make the secondary instance catch up with the primary by tailing and
80 /// replaying the MANIFEST and WAL of the primary.
81 // Column families created by the primary after the secondary instance starts
82 // will be ignored unless the secondary instance closes and restarts with the
83 // newly created column families.
84 // Column families that exist before secondary instance starts and dropped by
85 // the primary afterwards will be marked as dropped. However, as long as the
86 // secondary instance does not delete the corresponding column family
87 // handles, the data of the column family is still accessible to the
88 // secondary.
89 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
90 self.inner.try_catch_up_with_primary().map_err(Into::into)
91 }
92
93 /// This is used when you want to iterate over a specific ColumnFamily
94 fn iter(&self, col: Col, mode: IteratorMode) -> Result<DBIterator, Error> {
95 let opts = ReadOptions::default();
96 let cf = self
97 .inner
98 .cf_handle(col)
99 .ok_or_else(|| Error::DB(format!("column {col} not found")))?;
100 self.inner
101 .iterator_cf_opt(cf, mode, &opts)
102 .map_err(Into::into)
103 }
104}
105
106impl ChainStore for SecondaryDB {
107 fn cache(&self) -> Option<&StoreCache> {
108 None
109 }
110
111 fn freezer(&self) -> Option<&Freezer> {
112 None
113 }
114
115 fn get(&self, col: Col, key: &[u8]) -> Option<DBPinnableSlice> {
116 self.get_pinned(col, key)
117 .expect("db operation should be ok")
118 }
119
120 fn get_iter(&self, col: Col, mode: IteratorMode) -> DBIterator {
121 self.iter(col, mode).expect("db operation should be ok")
122 }
123}