1mod key;
2mod types;
3
4use std::collections::BTreeMap;
5use std::fmt;
6use std::io;
7
8use ckb_sdk::{AddressPayload, NetworkType};
9use ckb_types::{
10 core::{BlockView, HeaderView},
11 packed::{Byte32, Header, OutPoint, Script},
12 prelude::*,
13};
14use rocksdb::{ColumnFamily, DB};
15use thiserror::Error;
16
17use crate::{KVReader, KVTxn, RocksReader, RocksTxn};
18pub use key::{Key, KeyMetrics, KeyType};
19pub use types::{CellIndex, HashType, LiveCellInfo, TxInfo};
20
21use types::{BlockDeltaInfo, KEEP_RECENT_BLOCKS};
22
23pub struct IndexDatabase<'a> {
26 db: &'a DB,
27 cf: &'a ColumnFamily,
28 genesis_header: HeaderView,
30 last_header: Option<HeaderView>,
31 tip_header: HeaderView,
32 init_block_buf: Vec<BlockView>,
33 enable_explorer: bool,
35}
36
37impl<'a> IndexDatabase<'a> {
38 pub fn from_db(
39 db: &'a DB,
40 cf: &'a ColumnFamily,
41 network: NetworkType,
42 genesis_header: HeaderView,
43 enable_explorer: bool,
44 ) -> Result<IndexDatabase<'a>, IndexError> {
45 if genesis_header.number() != 0 {
46 return Err(IndexError::InvalidBlockNumber(genesis_header.number()));
47 }
48
49 let (genesis_hash_opt, network_opt): (Option<Byte32>, Option<NetworkType>) = {
50 let reader = RocksReader::new(db, cf);
51 let genesis_hash_opt = reader
52 .get(&Key::GenesisHash.to_bytes())
53 .map(|bytes| Byte32::from_slice(&bytes).unwrap());
54 let network_opt = reader
55 .get(&Key::Network.to_bytes())
56 .map(|bytes| match bytes[0] {
57 0 => NetworkType::Mainnet,
58 1 => NetworkType::Testnet,
59 254 => NetworkType::Staging,
60 255 => NetworkType::Dev,
61 _ => panic!("Corrupted index database (network field)"),
62 });
63 (genesis_hash_opt, network_opt)
64 };
65 if let Some(genesis_hash) = genesis_hash_opt {
66 if network_opt != Some(network) {
67 return Err(IndexError::InvalidNetworkType(format!(
68 "expected: {}, found: {:?}",
69 network, network_opt
70 )));
71 }
72 let hash: Byte32 = genesis_header.hash();
73 if genesis_hash != hash {
74 return Err(IndexError::InvalidGenesis(format!(
75 "{:#x}, expected: {:#x}",
76 genesis_hash, hash,
77 )));
78 }
79 } else {
80 log::info!("genesis not found, init db");
81 let mut writer = RocksTxn::new(db, cf);
82 writer.put_pair(Key::pair_network(network));
83 writer.put_pair(Key::pair_genesis_hash(&genesis_header.hash().unpack()));
84 writer.commit();
85 }
86
87 let last_header = RocksReader::new(db, cf)
88 .get(&Key::LastHeader.to_bytes())
89 .map(|bytes| Header::new_unchecked(bytes.into()).into_view());
90 Ok(IndexDatabase {
91 db,
92 cf,
93 last_header,
95 genesis_header: genesis_header.clone(),
96 tip_header: genesis_header,
97 init_block_buf: Vec::new(),
98 enable_explorer,
99 })
100 }
101
102 pub fn apply_next_block(&mut self, block: BlockView) -> Result<(), IndexError> {
103 let number = block.header().number();
104 let block_hash = block.header().hash();
105 if let Some(last_header) = self.last_header.clone() {
106 if number != last_header.number() + 1 {
107 return Err(IndexError::InvalidBlockNumber(number));
108 }
109 if block.header().parent_hash() != last_header.hash() {
110 if number == 1 {
111 return Err(IndexError::IllegalBlock(block_hash));
112 }
113
114 log::warn!("Rollback because of block: {:#x}", block_hash);
115 self.init_block_buf.clear();
116 let last_block_delta: BlockDeltaInfo = {
118 let reader = RocksReader::new(self.db, self.cf);
119 let last_header: HeaderView = reader
120 .get(&Key::LastHeader.to_bytes())
121 .map(|bytes| Header::new_unchecked(bytes.into()).into_view())
122 .unwrap();
123 reader
124 .get(&Key::BlockDelta(last_header.number()).to_bytes())
125 .map(|bytes| bincode::deserialize(&bytes).unwrap())
126 .ok_or(IndexError::LongFork)?
127 };
128 let mut txn = RocksTxn::new(self.db, self.cf);
129 last_block_delta.rollback(&mut txn);
130 txn.commit();
131 self.last_header = last_block_delta.parent_header();
132 return Ok(());
133 }
134 if number > self.tip_header.number() {
135 return Err(IndexError::BlockImmature(number));
136 }
137 self.apply_block_unchecked(block);
138 Ok(())
139 } else if number == 0 {
140 let genesis_hash = self.genesis_header.hash();
141 if block_hash != genesis_hash {
142 Err(IndexError::InvalidGenesis(format!(
143 "{:#x}, expected: {:#x}",
144 block_hash, genesis_hash,
145 )))
146 } else {
147 self.apply_block_unchecked(block);
148 Ok(())
149 }
150 } else {
151 Err(IndexError::NotInit)
152 }
153 }
154
155 pub fn update_tip(&mut self, header: HeaderView) {
156 self.tip_header = header
157 }
158
159 pub fn tip_header(&self) -> &HeaderView {
160 &self.tip_header
161 }
162
163 pub fn last_header(&self) -> Option<&HeaderView> {
164 self.last_header.as_ref()
165 }
166
167 pub fn last_number(&self) -> Option<u64> {
168 self.last_header.as_ref().map(HeaderView::number)
169 }
170
171 pub fn next_number(&self) -> Option<u64> {
172 self.last_number().map(|number| number + 1)
173 }
174
175 fn get_address_inner(&self, reader: &RocksReader, lock_hash: Byte32) -> Option<AddressPayload> {
176 reader
177 .get(&Key::LockScript(lock_hash.unpack()).to_bytes())
178 .map(|bytes| AddressPayload::from(Script::new_unchecked(bytes.into())))
179 }
180
181 pub fn get_capacity(&self, lock_hash: Byte32) -> Option<u64> {
182 let reader = RocksReader::new(self.db, self.cf);
183 reader
184 .get(&Key::LockTotalCapacity(lock_hash.unpack()).to_bytes())
185 .map(|bytes| {
186 let mut data = [0u8; 8];
187 data.copy_from_slice(&bytes[..8]);
188 u64::from_le_bytes(data)
189 })
190 }
191
192 pub fn get_lock_script_by_hash(&self, lock_hash: Byte32) -> Option<Script> {
193 let reader = RocksReader::new(self.db, self.cf);
194 reader
195 .get(&Key::LockScript(lock_hash.unpack()).to_bytes())
196 .map(|bytes| Script::new_unchecked(bytes.into()))
197 }
198
199 pub fn get_live_cells_by_lock<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
200 &self,
201 lock_hash: Byte32,
202 from_number: Option<u64>,
203 terminator: F,
204 ) -> Vec<LiveCellInfo> {
205 let key_prefix = Key::LockLiveCellIndexPrefix(lock_hash.unpack(), None);
206 let key_start = Key::LockLiveCellIndexPrefix(lock_hash.unpack(), from_number);
207 self.get_live_cell_infos(key_prefix, key_start, terminator)
208 }
209
210 pub fn get_live_cells_by_type<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
211 &self,
212 type_hash: Byte32,
213 from_number: Option<u64>,
214 terminator: F,
215 ) -> Vec<LiveCellInfo> {
216 let key_prefix = Key::TypeLiveCellIndexPrefix(type_hash.unpack(), None);
217 let key_start = Key::TypeLiveCellIndexPrefix(type_hash.unpack(), from_number);
218 self.get_live_cell_infos(key_prefix, key_start, terminator)
219 }
220
221 pub fn get_live_cells_by_code<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
222 &self,
223 code_hash: Byte32,
224 from_number: Option<u64>,
225 terminator: F,
226 ) -> Vec<LiveCellInfo> {
227 let key_prefix = Key::CodeLiveCellIndexPrefix(code_hash.unpack(), None);
228 let key_start = Key::CodeLiveCellIndexPrefix(code_hash.unpack(), from_number);
229 self.get_live_cell_infos(key_prefix, key_start, terminator)
230 }
231
232 pub fn get_live_cell_infos<F: FnMut(usize, &LiveCellInfo) -> (bool, bool)>(
233 &self,
234 key_prefix: Key,
235 key_start: Key,
236 mut terminator: F,
237 ) -> Vec<LiveCellInfo> {
238 fn get_live_cell_info(reader: &RocksReader, out_point: OutPoint) -> Option<LiveCellInfo> {
239 reader
240 .get(&Key::LiveCellMap(out_point).to_bytes())
241 .map(|bytes| bincode::deserialize(&bytes).unwrap())
242 }
243
244 let reader = RocksReader::new(self.db, self.cf);
245 let key_prefix = key_prefix.to_bytes();
246 let key_start = key_start.to_bytes();
247
248 let mut infos = Vec::new();
249 for (idx, (key_bytes, value_bytes)) in reader.iter_from(&key_start).enumerate() {
250 if key_bytes[..key_prefix.len()] != key_prefix[..] {
251 log::debug!("Reach the end of this lock");
252 break;
253 }
254 let out_point = OutPoint::new_unchecked(value_bytes.into());
255 let live_cell_info = get_live_cell_info(&reader, out_point).unwrap();
256 let (stop, push_info) = terminator(idx, &live_cell_info);
257 if push_info {
258 infos.push(live_cell_info);
259 }
260 if stop {
261 log::trace!("Stop search");
262 break;
263 }
264 }
265 infos
266 }
267
268 pub fn get_top_n(&self, n: usize) -> Vec<(Byte32, Option<AddressPayload>, u64)> {
269 let reader = RocksReader::new(self.db, self.cf);
270 let key_prefix: Vec<u8> = KeyType::LockTotalCapacityIndex.to_bytes();
271
272 let mut pairs = Vec::new();
273 for (key_bytes, _) in reader.iter_from(&key_prefix) {
274 if key_bytes[..key_prefix.len()] != key_prefix[..] {
275 log::debug!("Reach the end of this type");
276 break;
277 }
278 if let Key::LockTotalCapacityIndex(capacity, lock_hash) = Key::from_bytes(&key_bytes) {
279 let address_opt = self.get_address_inner(&reader, lock_hash.clone().pack());
280 pairs.push((lock_hash.pack(), address_opt, capacity));
281 } else {
282 panic!("Got invalid key: {:?}", key_bytes);
283 }
284 if pairs.len() >= n {
285 break;
286 }
287 }
288 pairs
289 }
290
291 fn apply_block_unchecked(&mut self, block: BlockView) {
292 let header = block.header();
293 let block_hash = header.hash();
294 log::debug!("Block: {} => {:x}", header.number(), block_hash);
295
296 self.last_header = Some(header);
298 let blocks = if self.last_number().unwrap() < self.tip_header.number().saturating_sub(256) {
299 self.init_block_buf.push(block);
300 if self.init_block_buf.len() >= 200 {
301 self.init_block_buf.split_off(0)
302 } else {
303 Vec::new()
304 }
305 } else {
306 let mut blocks = self.init_block_buf.split_off(0);
307 blocks.push(block);
308 blocks
309 };
310
311 let mut txn = RocksTxn::new(self.db, self.cf);
312 let blocks_len = blocks.len();
313 for (idx, block) in blocks.into_iter().enumerate() {
314 let clear_old = idx + 1 == blocks_len;
315 let block_delta_info = BlockDeltaInfo::from_block(&block, &txn, clear_old);
316 let number = block_delta_info.number();
317 let hash = block_delta_info.hash();
318 let result = block_delta_info.apply(&mut txn, self.enable_explorer);
319 log::info!(
320 "Block: {} => {:x} (chain_capacity={}, delta={}), txs={}, cell-removed={}, cell-added={}",
321 number,
322 hash,
323 result.chain_capacity,
324 result.capacity_delta,
325 result.txs,
326 result.cell_removed,
327 result.cell_added,
328 );
329 }
330 txn.commit();
331 }
332
333 pub fn get_metrics(&self, key_type_opt: Option<KeyType>) -> BTreeMap<KeyType, KeyMetrics> {
334 let mut key_types = BTreeMap::default();
335 if let Some(key_type) = key_type_opt {
336 key_types.insert(key_type, KeyMetrics::default());
337 } else {
338 let mut types = vec![
339 KeyType::GenesisHash,
340 KeyType::Network,
341 KeyType::LastHeader,
342 KeyType::TotalCapacity,
343 KeyType::RecentHeader,
344 KeyType::BlockDelta,
345 KeyType::LiveCellMap,
346 KeyType::LiveCellIndex,
347 KeyType::LockScript,
348 KeyType::LockTotalCapacity,
349 KeyType::LockTotalCapacityIndex,
350 KeyType::LockLiveCellIndex,
351 KeyType::TypeLiveCellIndex,
352 KeyType::CodeLiveCellIndex,
353 ];
354 if self.enable_explorer {
355 types.extend(vec![KeyType::TxMap, KeyType::LockTx, KeyType::GlobalHash]);
356 }
357 for key_type in types {
358 key_types.insert(key_type, KeyMetrics::default());
359 }
360 }
361 let reader = RocksReader::new(self.db, self.cf);
362 for (key_type, metrics) in &mut key_types {
363 let key_prefix = key_type.to_bytes();
364 for (key_bytes, value_bytes) in reader.iter_from(&key_prefix) {
365 if key_bytes[..key_prefix.len()] != key_prefix[..] {
366 log::debug!("Reach the end of this lock");
367 break;
368 }
369 metrics.add_pair(&key_bytes, &value_bytes);
370 }
371 }
372 key_types
373 }
374}
375
376#[derive(Debug, Clone, Eq, PartialEq, Error)]
377pub enum IndexError {
378 BlockImmature(u64),
379 IllegalBlock(Byte32),
380 InvalidBlockNumber(u64),
381 NotInit,
382 IoError(String),
383 InvalidGenesis(String),
384 InvalidNetworkType(String),
385 LongFork,
386}
387
388impl From<io::Error> for IndexError {
389 fn from(err: io::Error) -> IndexError {
390 IndexError::IoError(err.to_string())
391 }
392}
393
394impl fmt::Display for IndexError {
395 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
396 match self {
397 IndexError::BlockImmature(number) => {
398 write!(
399 f,
400 "Current applied block number {} greater than tip block number",
401 number
402 )?;
403 }
404 IndexError::IllegalBlock(_) => {
405 write!(f, "Current applied block number is 1, but the parent hash not match genesis block hash")?;
406 }
407 IndexError::InvalidBlockNumber(number) => {
408 write!(
409 f,
410 "Current applied block number {} is not the next block of lastest block",
411 number
412 )?;
413 }
414 IndexError::NotInit => {
415 write!(f, "Apply block before database initialization")?;
416 }
417 IndexError::IoError(msg) => {
418 write!(f, "IO error: {}", msg)?;
419 }
420 IndexError::InvalidGenesis(msg) => {
421 write!(f, "Genesis hash not match with DB, {}", msg)?;
422 }
423 IndexError::InvalidNetworkType(msg) => {
424 write!(f, "NetworkType not match with DB, {}", msg)?;
425 }
426 IndexError::LongFork => {
427 write!(
428 f,
429 "Already rollbacked {} blocks, long fork detected",
430 KEEP_RECENT_BLOCKS
431 )?;
432 }
433 }
434 Ok(())
435 }
436}