ckb_index/index/
mod.rs

1mod key;
2mod types;
3
4use std::collections::BTreeMap;
5use std::fmt;
6use std::io;
7
8use ckb_sdk::{AddressPayload, NetworkType};
9use ckb_types::{
10    core::{BlockView, HeaderView},
11    packed::{Byte32, Header, OutPoint, Script},
12    prelude::*,
13};
14use rocksdb::{ColumnFamily, DB};
15use thiserror::Error;
16
17use crate::{KVReader, KVTxn, RocksReader, RocksTxn};
18pub use key::{Key, KeyMetrics, KeyType};
19pub use types::{CellIndex, HashType, LiveCellInfo, TxInfo};
20
21use types::{BlockDeltaInfo, KEEP_RECENT_BLOCKS};
22
23// NOTE: You should reopen to increase database size when processed enough blocks
24//  [reference]: https://stackoverflow.com/a/33571804
25pub struct IndexDatabase<'a> {
26    db: &'a DB,
27    cf: &'a ColumnFamily,
28    // network: NetworkType,
29    genesis_header: HeaderView,
30    last_header: Option<HeaderView>,
31    tip_header: HeaderView,
32    init_block_buf: Vec<BlockView>,
33    // Disable record tx info by default
34    enable_explorer: bool,
35}
36
37impl<'a> IndexDatabase<'a> {
38    pub fn from_db(
39        db: &'a DB,
40        cf: &'a ColumnFamily,
41        network: NetworkType,
42        genesis_header: HeaderView,
43        enable_explorer: bool,
44    ) -> Result<IndexDatabase<'a>, IndexError> {
45        if genesis_header.number() != 0 {
46            return Err(IndexError::InvalidBlockNumber(genesis_header.number()));
47        }
48
49        let (genesis_hash_opt, network_opt): (Option<Byte32>, Option<NetworkType>) = {
50            let reader = RocksReader::new(db, cf);
51            let genesis_hash_opt = reader
52                .get(&Key::GenesisHash.to_bytes())
53                .map(|bytes| Byte32::from_slice(&bytes).unwrap());
54            let network_opt = reader
55                .get(&Key::Network.to_bytes())
56                .map(|bytes| match bytes[0] {
57                    0 => NetworkType::Mainnet,
58                    1 => NetworkType::Testnet,
59                    254 => NetworkType::Staging,
60                    255 => NetworkType::Dev,
61                    _ => panic!("Corrupted index database (network field)"),
62                });
63            (genesis_hash_opt, network_opt)
64        };
65        if let Some(genesis_hash) = genesis_hash_opt {
66            if network_opt != Some(network) {
67                return Err(IndexError::InvalidNetworkType(format!(
68                    "expected: {}, found: {:?}",
69                    network, network_opt
70                )));
71            }
72            let hash: Byte32 = genesis_header.hash();
73            if genesis_hash != hash {
74                return Err(IndexError::InvalidGenesis(format!(
75                    "{:#x}, expected: {:#x}",
76                    genesis_hash, hash,
77                )));
78            }
79        } else {
80            log::info!("genesis not found, init db");
81            let mut writer = RocksTxn::new(db, cf);
82            writer.put_pair(Key::pair_network(network));
83            writer.put_pair(Key::pair_genesis_hash(&genesis_header.hash().unpack()));
84            writer.commit();
85        }
86
87        let last_header = RocksReader::new(db, cf)
88            .get(&Key::LastHeader.to_bytes())
89            .map(|bytes| Header::new_unchecked(bytes.into()).into_view());
90        Ok(IndexDatabase {
91            db,
92            cf,
93            // network,
94            last_header,
95            genesis_header: genesis_header.clone(),
96            tip_header: genesis_header,
97            init_block_buf: Vec::new(),
98            enable_explorer,
99        })
100    }
101
102    pub fn apply_next_block(&mut self, block: BlockView) -> Result<(), IndexError> {
103        let number = block.header().number();
104        let block_hash = block.header().hash();
105        if let Some(last_header) = self.last_header.clone() {
106            if number != last_header.number() + 1 {
107                return Err(IndexError::InvalidBlockNumber(number));
108            }
109            if block.header().parent_hash() != last_header.hash() {
110                if number == 1 {
111                    return Err(IndexError::IllegalBlock(block_hash));
112                }
113
114                log::warn!("Rollback because of block: {:#x}", block_hash);
115                self.init_block_buf.clear();
116                // Reload last header
117                let last_block_delta: BlockDeltaInfo = {
118                    let reader = RocksReader::new(self.db, self.cf);
119                    let last_header: HeaderView = reader
120                        .get(&Key::LastHeader.to_bytes())
121                        .map(|bytes| Header::new_unchecked(bytes.into()).into_view())
122                        .unwrap();
123                    reader
124                        .get(&Key::BlockDelta(last_header.number()).to_bytes())
125                        .map(|bytes| bincode::deserialize(&bytes).unwrap())
126                        .ok_or(IndexError::LongFork)?
127                };
128                let mut txn = RocksTxn::new(self.db, self.cf);
129                last_block_delta.rollback(&mut txn);
130                txn.commit();
131                self.last_header = last_block_delta.parent_header();
132                return Ok(());
133            }
134            if number > self.tip_header.number() {
135                return Err(IndexError::BlockImmature(number));
136            }
137            self.apply_block_unchecked(block);
138            Ok(())
139        } else if number == 0 {
140            let genesis_hash = self.genesis_header.hash();
141            if block_hash != genesis_hash {
142                Err(IndexError::InvalidGenesis(format!(
143                    "{:#x}, expected: {:#x}",
144                    block_hash, genesis_hash,
145                )))
146            } else {
147                self.apply_block_unchecked(block);
148                Ok(())
149            }
150        } else {
151            Err(IndexError::NotInit)
152        }
153    }
154
155    pub fn update_tip(&mut self, header: HeaderView) {
156        self.tip_header = header
157    }
158
159    pub fn tip_header(&self) -> &HeaderView {
160        &self.tip_header
161    }
162
163    pub fn last_header(&self) -> Option<&HeaderView> {
164        self.last_header.as_ref()
165    }
166
167    pub fn last_number(&self) -> Option<u64> {
168        self.last_header.as_ref().map(HeaderView::number)
169    }
170
171    pub fn next_number(&self) -> Option<u64> {
172        self.last_number().map(|number| number + 1)
173    }
174
175    fn get_address_inner(&self, reader: &RocksReader, lock_hash: Byte32) -> Option<AddressPayload> {
176        reader
177            .get(&Key::LockScript(lock_hash.unpack()).to_bytes())
178            .map(|bytes| AddressPayload::from(Script::new_unchecked(bytes.into())))
179    }
180
181    pub fn get_capacity(&self, lock_hash: Byte32) -> Option<u64> {
182        let reader = RocksReader::new(self.db, self.cf);
183        reader
184            .get(&Key::LockTotalCapacity(lock_hash.unpack()).to_bytes())
185            .map(|bytes| {
186                let mut data = [0u8; 8];
187                data.copy_from_slice(&bytes[..8]);
188                u64::from_le_bytes(data)
189            })
190    }
191
192    pub fn get_lock_script_by_hash(&self, lock_hash: Byte32) -> Option<Script> {
193        let reader = RocksReader::new(self.db, self.cf);
194        reader
195            .get(&Key::LockScript(lock_hash.unpack()).to_bytes())
196            .map(|bytes| Script::new_unchecked(bytes.into()))
197    }
198
199    pub fn get_live_cells_by_lock<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
200        &self,
201        lock_hash: Byte32,
202        from_number: Option<u64>,
203        terminator: F,
204    ) -> Vec<LiveCellInfo> {
205        let key_prefix = Key::LockLiveCellIndexPrefix(lock_hash.unpack(), None);
206        let key_start = Key::LockLiveCellIndexPrefix(lock_hash.unpack(), from_number);
207        self.get_live_cell_infos(key_prefix, key_start, terminator)
208    }
209
210    pub fn get_live_cells_by_type<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
211        &self,
212        type_hash: Byte32,
213        from_number: Option<u64>,
214        terminator: F,
215    ) -> Vec<LiveCellInfo> {
216        let key_prefix = Key::TypeLiveCellIndexPrefix(type_hash.unpack(), None);
217        let key_start = Key::TypeLiveCellIndexPrefix(type_hash.unpack(), from_number);
218        self.get_live_cell_infos(key_prefix, key_start, terminator)
219    }
220
221    pub fn get_live_cells_by_code<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
222        &self,
223        code_hash: Byte32,
224        from_number: Option<u64>,
225        terminator: F,
226    ) -> Vec<LiveCellInfo> {
227        let key_prefix = Key::CodeLiveCellIndexPrefix(code_hash.unpack(), None);
228        let key_start = Key::CodeLiveCellIndexPrefix(code_hash.unpack(), from_number);
229        self.get_live_cell_infos(key_prefix, key_start, terminator)
230    }
231
232    pub fn get_live_cell_infos<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
233        &self,
234        key_prefix: Key,
235        key_start: Key,
236        mut terminator: F,
237    ) -> Vec<LiveCellInfo> {
238        fn get_live_cell_info(reader: &RocksReader, out_point: OutPoint) -> Option<LiveCellInfo> {
239            reader
240                .get(&Key::LiveCellMap(out_point).to_bytes())
241                .map(|bytes| bincode::deserialize(&bytes).unwrap())
242        }
243
244        let reader = RocksReader::new(self.db, self.cf);
245        let key_prefix = key_prefix.to_bytes();
246        let key_start = key_start.to_bytes();
247
248        let mut infos = Vec::new();
249        for (idx, (key_bytes, value_bytes)) in reader.iter_from(&key_start).enumerate() {
250            if key_bytes[..key_prefix.len()] != key_prefix[..] {
251                log::debug!("Reach the end of this lock");
252                break;
253            }
254            let out_point = OutPoint::new_unchecked(value_bytes.into());
255            let live_cell_info = get_live_cell_info(&reader, out_point).unwrap();
256            let (stop, push_info) = terminator(idx, &live_cell_info);
257            if push_info {
258                infos.push(live_cell_info);
259            }
260            if stop {
261                log::trace!("Stop search");
262                break;
263            }
264        }
265        infos
266    }
267
268    pub fn get_top_n(&self, n: usize) -> Vec<(Byte32, Option<AddressPayload>, u64)> {
269        let reader = RocksReader::new(self.db, self.cf);
270        let key_prefix: Vec<u8> = KeyType::LockTotalCapacityIndex.to_bytes();
271
272        let mut pairs = Vec::new();
273        for (key_bytes, _) in reader.iter_from(&key_prefix) {
274            if key_bytes[..key_prefix.len()] != key_prefix[..] {
275                log::debug!("Reach the end of this type");
276                break;
277            }
278            if let Key::LockTotalCapacityIndex(capacity, lock_hash) = Key::from_bytes(&key_bytes) {
279                let address_opt = self.get_address_inner(&reader, lock_hash.clone().pack());
280                pairs.push((lock_hash.pack(), address_opt, capacity));
281            } else {
282                panic!("Got invalid key: {:?}", key_bytes);
283            }
284            if pairs.len() >= n {
285                break;
286            }
287        }
288        pairs
289    }
290
291    fn apply_block_unchecked(&mut self, block: BlockView) {
292        let header = block.header();
293        let block_hash = header.hash();
294        log::debug!("Block: {} => {:x}", header.number(), block_hash);
295
296        // TODO: should forbid query when Init
297        self.last_header = Some(header);
298        let blocks = if self.last_number().unwrap() < self.tip_header.number().saturating_sub(256) {
299            self.init_block_buf.push(block);
300            if self.init_block_buf.len() >= 200 {
301                self.init_block_buf.split_off(0)
302            } else {
303                Vec::new()
304            }
305        } else {
306            let mut blocks = self.init_block_buf.split_off(0);
307            blocks.push(block);
308            blocks
309        };
310
311        let mut txn = RocksTxn::new(self.db, self.cf);
312        let blocks_len = blocks.len();
313        for (idx, block) in blocks.into_iter().enumerate() {
314            let clear_old = idx + 1 == blocks_len;
315            let block_delta_info = BlockDeltaInfo::from_block(&block, &txn, clear_old);
316            let number = block_delta_info.number();
317            let hash = block_delta_info.hash();
318            let result = block_delta_info.apply(&mut txn, self.enable_explorer);
319            log::info!(
320                "Block: {} => {:x} (chain_capacity={}, delta={}), txs={}, cell-removed={}, cell-added={}",
321                number,
322                hash,
323                result.chain_capacity,
324                result.capacity_delta,
325                result.txs,
326                result.cell_removed,
327                result.cell_added,
328            );
329        }
330        txn.commit();
331    }
332
333    pub fn get_metrics(&self, key_type_opt: Option<KeyType>) -> BTreeMap<KeyType, KeyMetrics> {
334        let mut key_types = BTreeMap::default();
335        if let Some(key_type) = key_type_opt {
336            key_types.insert(key_type, KeyMetrics::default());
337        } else {
338            let mut types = vec![
339                KeyType::GenesisHash,
340                KeyType::Network,
341                KeyType::LastHeader,
342                KeyType::TotalCapacity,
343                KeyType::RecentHeader,
344                KeyType::BlockDelta,
345                KeyType::LiveCellMap,
346                KeyType::LiveCellIndex,
347                KeyType::LockScript,
348                KeyType::LockTotalCapacity,
349                KeyType::LockTotalCapacityIndex,
350                KeyType::LockLiveCellIndex,
351                KeyType::TypeLiveCellIndex,
352                KeyType::CodeLiveCellIndex,
353            ];
354            if self.enable_explorer {
355                types.extend(vec![KeyType::TxMap, KeyType::LockTx, KeyType::GlobalHash]);
356            }
357            for key_type in types {
358                key_types.insert(key_type, KeyMetrics::default());
359            }
360        }
361        let reader = RocksReader::new(self.db, self.cf);
362        for (key_type, metrics) in &mut key_types {
363            let key_prefix = key_type.to_bytes();
364            for (key_bytes, value_bytes) in reader.iter_from(&key_prefix) {
365                if key_bytes[..key_prefix.len()] != key_prefix[..] {
366                    log::debug!("Reach the end of this lock");
367                    break;
368                }
369                metrics.add_pair(&key_bytes, &value_bytes);
370            }
371        }
372        key_types
373    }
374}
375
376#[derive(Debug, Clone, Eq, PartialEq, Error)]
377pub enum IndexError {
378    BlockImmature(u64),
379    IllegalBlock(Byte32),
380    InvalidBlockNumber(u64),
381    NotInit,
382    IoError(String),
383    InvalidGenesis(String),
384    InvalidNetworkType(String),
385    LongFork,
386}
387
388impl From<io::Error> for IndexError {
389    fn from(err: io::Error) -> IndexError {
390        IndexError::IoError(err.to_string())
391    }
392}
393
394impl fmt::Display for IndexError {
395    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
396        match self {
397            IndexError::BlockImmature(number) => {
398                write!(
399                    f,
400                    "Current applied block number {} greater than tip block number",
401                    number
402                )?;
403            }
404            IndexError::IllegalBlock(_) => {
405                write!(f, "Current applied block number is 1, but the parent hash not match genesis block hash")?;
406            }
407            IndexError::InvalidBlockNumber(number) => {
408                write!(
409                    f,
410                    "Current applied block number {} is not the next block of lastest block",
411                    number
412                )?;
413            }
414            IndexError::NotInit => {
415                write!(f, "Apply block before database initialization")?;
416            }
417            IndexError::IoError(msg) => {
418                write!(f, "IO error: {}", msg)?;
419            }
420            IndexError::InvalidGenesis(msg) => {
421                write!(f, "Genesis hash not match with DB, {}", msg)?;
422            }
423            IndexError::InvalidNetworkType(msg) => {
424                write!(f, "NetworkType not match with DB, {}", msg)?;
425            }
426            IndexError::LongFork => {
427                write!(
428                    f,
429                    "Already rollbacked {} blocks, long fork detected",
430                    KEEP_RECENT_BLOCKS
431                )?;
432            }
433        }
434        Ok(())
435    }
436}