1pub(crate) mod custom_filters;
4pub(crate) mod error;
5pub(crate) mod pool;
6pub(crate) mod store;
7
8pub use crate::custom_filters::CustomFilters;
9pub use crate::error::Error;
10pub use crate::pool::{Pool, PoolService};
11pub use crate::store::SecondaryDB;
12
13use ckb_app_config::{DBConfig, IndexerSyncConfig};
14use ckb_async_runtime::{
15 Handle,
16 tokio::{self, time},
17};
18use ckb_db_schema::{
19 COLUMN_BLOCK_BODY, COLUMN_BLOCK_EXTENSION, COLUMN_BLOCK_HEADER, COLUMN_BLOCK_PROPOSAL_IDS,
20 COLUMN_BLOCK_UNCLE, COLUMN_INDEX, COLUMN_META,
21};
22use ckb_logger::{error, info};
23use ckb_notify::NotifyController;
24use ckb_stop_handler::{CancellationToken, has_received_stop_signal, new_tokio_exit_rx};
25use ckb_store::ChainStore;
26use ckb_types::{
27 H256,
28 core::{self, BlockNumber, BlockView},
29 packed::Byte32,
30};
31use rocksdb::{LogLevel, prelude::*};
32
33use std::marker::Send;
34use std::sync::{Arc, RwLock};
35use std::thread::sleep;
36use std::time::Duration;
37
38const INDEXER_NODE_TIP_GAP: u64 = 10;
39
40pub trait IndexerSync {
42 fn tip(&self) -> Result<Option<(BlockNumber, Byte32)>, Error>;
44 fn append(&self, block: &BlockView) -> Result<(), Error>;
46 fn rollback(&self) -> Result<(), Error>;
48 fn get_identity(&self) -> &str;
50 fn set_init_tip(&self, init_tip_number: u64, init_tip_hash: &H256);
52}
53
54pub fn new_secondary_db(ckb_db_config: &DBConfig, config: &IndexerSyncConfig) -> SecondaryDB {
56 let cf_names = vec![
57 COLUMN_INDEX,
58 COLUMN_META,
59 COLUMN_BLOCK_HEADER,
60 COLUMN_BLOCK_BODY,
61 COLUMN_BLOCK_UNCLE,
62 COLUMN_BLOCK_PROPOSAL_IDS,
63 COLUMN_BLOCK_EXTENSION,
64 ];
65 let secondary_opts = indexer_secondary_options();
66 SecondaryDB::open_cf(
67 &secondary_opts,
68 &ckb_db_config.path,
69 cf_names,
70 config.secondary_path.to_string_lossy().to_string(),
71 )
72}
73
74#[derive(Clone)]
76pub struct IndexerSyncService {
77 secondary_db: SecondaryDB,
78 pool_service: PoolService,
79 poll_interval: Duration,
80 async_handle: Handle,
81 init_tip_hash: Option<H256>,
82}
83
84impl IndexerSyncService {
85 pub fn new(
87 secondary_db: SecondaryDB,
88 pool_service: PoolService,
89 config: &IndexerSyncConfig,
90 async_handle: Handle,
91 init_tip_hash: Option<H256>,
92 ) -> Self {
93 Self {
94 secondary_db,
95 pool_service,
96 poll_interval: Duration::from_secs(config.poll_interval),
97 async_handle,
98 init_tip_hash,
99 }
100 }
101
102 fn apply_init_tip<I>(&self, indexer_service: I)
104 where
105 I: IndexerSync + Clone + Send + 'static,
106 {
107 if let Some(init_tip_hash) = &self.init_tip_hash {
108 let indexer_tip = indexer_service
109 .tip()
110 .expect("indexer_service tip should be OK");
111 if let Some((indexer_tip, _)) = indexer_tip {
112 if let Some(init_tip) = self.secondary_db.get_block_header(&init_tip_hash.into()) {
113 if indexer_tip >= init_tip.number() {
114 return;
115 }
116 }
117 }
118 loop {
119 if has_received_stop_signal() {
120 info!("apply_init_tip received exit signal, exit now");
121 break;
122 }
123
124 if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
125 error!("secondary_db try_catch_up_with_primary error {}", e);
126 }
127 if let Some(header) = self.secondary_db.get_block_header(&init_tip_hash.into()) {
128 let init_tip_number = header.number();
129 indexer_service.set_init_tip(init_tip_number, init_tip_hash);
130 break;
131 }
132 sleep(Duration::from_secs(1));
133 }
134 }
135 }
136
137 fn try_loop_sync<I>(&self, indexer: I)
138 where
139 I: IndexerSync + Clone + Send + 'static,
140 {
141 if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
142 error!("secondary_db try_catch_up_with_primary error {}", e);
143 }
144 loop {
145 if has_received_stop_signal() {
146 info!("try_loop_sync received exit signal, exit now");
147 break;
148 }
149
150 match indexer.tip() {
151 Ok(Some((tip_number, tip_hash))) => {
152 match self.get_block_by_number(tip_number + 1) {
153 Some(block) => {
154 if block.parent_hash() == tip_hash {
155 info!(
156 "{} append {}, {}",
157 indexer.get_identity(),
158 block.number(),
159 block.hash()
160 );
161 if let Err(e) = indexer.append(&block) {
162 error!("Failed to append block: {}. Will attempt to retry.", e);
163 }
164 } else {
165 info!(
166 "{} rollback {}, {}",
167 indexer.get_identity(),
168 tip_number,
169 tip_hash
170 );
171 indexer.rollback().expect("rollback block should be OK");
172 if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
173 error!(
174 "after rollback, secondary_db try_catch_up_with_primary error {}",
175 e
176 );
177 }
178 }
179 }
180 None => {
181 break;
182 }
183 }
184 }
185 Ok(None) => match self.get_block_by_number(0) {
186 Some(block) => {
187 if let Err(e) = indexer.append(&block) {
188 error!("Failed to append block: {}. Will attempt to retry.", e);
189 }
190 }
191 None => {
192 error!("CKB node returns an empty genesis block");
193 break;
194 }
195 },
196 Err(e) => {
197 error!("Failed to get tip: {}", e);
198 }
199 }
200 }
201 }
202
203 pub fn spawn_poll<I>(
205 &self,
206 notify_controller: NotifyController,
207 subscriber_name: String,
208 indexer_service: I,
209 ) where
210 I: IndexerSync + Clone + Send + 'static,
211 {
212 let initial_service = self.clone();
214 let indexer = indexer_service.clone();
215 let initial_syncing = self.async_handle.spawn_blocking(move || {
216 initial_service.apply_init_tip(indexer.clone());
217 initial_service.try_loop_sync(indexer)
218 });
219
220 let stop: CancellationToken = new_tokio_exit_rx();
222 let async_handle = self.async_handle.clone();
223 let poll_service = self.clone();
224 self.async_handle.spawn(async move {
225 let _initial_finished = initial_syncing.await;
226 if stop.is_cancelled() {
227 info!("Indexer received exit signal, cancel new_block_watcher task, exit now");
228 return;
229 }
230
231 info!("initial_syncing finished");
232
233 let mut new_block_watcher = notify_controller.watch_new_block(subscriber_name).await;
234 let mut interval = time::interval(poll_service.poll_interval);
235 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
236 loop {
237 let indexer = indexer_service.clone();
238 tokio::select! {
239 Ok(_) = new_block_watcher.changed() => {
240 let service = poll_service.clone();
241 if let Err(e) = async_handle.spawn_blocking(move || {
242 service.try_loop_sync(indexer)
243 }).await {
244 error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
245 }
246 new_block_watcher.borrow_and_update();
247 },
248 _ = interval.tick() => {
249 let service = poll_service.clone();
250 if let Err(e) = async_handle.spawn_blocking(move || {
251 service.try_loop_sync(indexer)
252 }).await {
253 error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
254 }
255 }
256 _ = stop.cancelled() => {
257 info!("{} received exit signal, exit now", indexer_service.get_identity());
258 break
259 },
260 }
261 }
262 });
263 }
264
265 pub fn pool(&self) -> Option<Arc<RwLock<Pool>>> {
267 self.pool_service.pool()
268 }
269
270 pub fn index_tx_pool<I>(&mut self, indexer_service: I, notify_controller: NotifyController)
272 where
273 I: IndexerSync + Clone + Send + 'static,
274 {
275 let secondary_db = self.secondary_db.clone();
276 let check_index_tx_pool_ready = self.async_handle.spawn_blocking(move || {
277 loop {
278 if has_received_stop_signal() {
279 info!("check_index_tx_pool_ready received exit signal, exit now");
280 break;
281 }
282
283 if let Err(e) = secondary_db.try_catch_up_with_primary() {
284 error!("secondary_db try_catch_up_with_primary error {}", e);
285 }
286 if let (Some(header), Ok(Some((indexer_tip, _)))) =
287 (secondary_db.get_tip_header(), indexer_service.tip())
288 {
289 let node_tip = header.number();
290 if node_tip - indexer_tip <= INDEXER_NODE_TIP_GAP {
291 break;
292 }
293 }
294 std::thread::sleep(Duration::from_secs(1));
295 }
296 });
297
298 self.pool_service
299 .index_tx_pool(notify_controller, check_index_tx_pool_ready);
300 }
301
302 fn get_block_by_number(&self, block_number: u64) -> Option<core::BlockView> {
303 let block_hash = self.secondary_db.get_block_hash(block_number)?;
304 self.secondary_db.get_block(&block_hash)
305 }
306}
307
308fn indexer_secondary_options() -> Options {
309 let mut opts = Options::default();
310 opts.create_if_missing(true);
311 opts.create_missing_column_families(true);
312 opts.set_log_level(LogLevel::Warn);
313 opts
314}