1pub(crate) mod custom_filters;
4pub(crate) mod error;
5pub(crate) mod pool;
6pub(crate) mod store;
7
8pub use crate::custom_filters::CustomFilters;
9pub use crate::error::Error;
10pub use crate::pool::{Pool, PoolService};
11pub use crate::store::SecondaryDB;
12
13use ckb_app_config::{DBConfig, IndexerSyncConfig};
14use ckb_async_runtime::{
15 tokio::{self, time},
16 Handle,
17};
18use ckb_db_schema::{
19 COLUMN_BLOCK_BODY, COLUMN_BLOCK_EXTENSION, COLUMN_BLOCK_HEADER, COLUMN_BLOCK_PROPOSAL_IDS,
20 COLUMN_BLOCK_UNCLE, COLUMN_INDEX, COLUMN_META,
21};
22use ckb_logger::{error, info};
23use ckb_notify::NotifyController;
24use ckb_stop_handler::{has_received_stop_signal, new_tokio_exit_rx, CancellationToken};
25use ckb_store::ChainStore;
26use ckb_types::{
27 core::{self, BlockNumber, BlockView},
28 packed::Byte32,
29 prelude::*,
30 H256,
31};
32use rocksdb::prelude::*;
33
34use std::marker::Send;
35use std::num::NonZeroUsize;
36use std::sync::{Arc, RwLock};
37use std::thread::sleep;
38use std::time::Duration;
39
/// Number of RocksDB info log files kept for the secondary instance when the
/// indexer sync config does not specify `db_keep_log_file_num`.
const DEFAULT_LOG_KEEP_NUM: usize = 1;
/// Largest allowed distance, in blocks, between the node tip and the indexer
/// tip for the indexer to be considered ready to index the transaction pool.
const INDEXER_NODE_TIP_GAP: u64 = 10;
42
43pub trait IndexerSync {
45 fn tip(&self) -> Result<Option<(BlockNumber, Byte32)>, Error>;
47 fn append(&self, block: &BlockView) -> Result<(), Error>;
49 fn rollback(&self) -> Result<(), Error>;
51 fn get_identity(&self) -> &str;
53 fn set_init_tip(&self, init_tip_number: u64, init_tip_hash: &H256);
55}
56
57pub fn new_secondary_db(ckb_db_config: &DBConfig, config: &IndexerSyncConfig) -> SecondaryDB {
59 let cf_names = vec![
60 COLUMN_INDEX,
61 COLUMN_META,
62 COLUMN_BLOCK_HEADER,
63 COLUMN_BLOCK_BODY,
64 COLUMN_BLOCK_UNCLE,
65 COLUMN_BLOCK_PROPOSAL_IDS,
66 COLUMN_BLOCK_EXTENSION,
67 ];
68 let secondary_opts = indexer_secondary_options(config);
69 SecondaryDB::open_cf(
70 &secondary_opts,
71 &ckb_db_config.path,
72 cf_names,
73 config.secondary_path.to_string_lossy().to_string(),
74 )
75}
76
77#[derive(Clone)]
79pub struct IndexerSyncService {
80 secondary_db: SecondaryDB,
81 pool_service: PoolService,
82 poll_interval: Duration,
83 async_handle: Handle,
84 init_tip_hash: Option<H256>,
85}
86
87impl IndexerSyncService {
88 pub fn new(
90 secondary_db: SecondaryDB,
91 pool_service: PoolService,
92 config: &IndexerSyncConfig,
93 async_handle: Handle,
94 init_tip_hash: Option<H256>,
95 ) -> Self {
96 Self {
97 secondary_db,
98 pool_service,
99 poll_interval: Duration::from_secs(config.poll_interval),
100 async_handle,
101 init_tip_hash,
102 }
103 }
104
105 fn apply_init_tip<I>(&self, indexer_service: I)
107 where
108 I: IndexerSync + Clone + Send + 'static,
109 {
110 if let Some(init_tip_hash) = &self.init_tip_hash {
111 let indexer_tip = indexer_service
112 .tip()
113 .expect("indexer_service tip should be OK");
114 if let Some((indexer_tip, _)) = indexer_tip {
115 if let Some(init_tip) = self.secondary_db.get_block_header(&init_tip_hash.pack()) {
116 if indexer_tip >= init_tip.number() {
117 return;
118 }
119 }
120 }
121 loop {
122 if has_received_stop_signal() {
123 info!("apply_init_tip received exit signal, exit now");
124 break;
125 }
126
127 if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
128 error!("secondary_db try_catch_up_with_primary error {}", e);
129 }
130 if let Some(header) = self.secondary_db.get_block_header(&init_tip_hash.pack()) {
131 let init_tip_number = header.number();
132 indexer_service.set_init_tip(init_tip_number, init_tip_hash);
133 break;
134 }
135 sleep(Duration::from_secs(1));
136 }
137 }
138 }
139
140 fn try_loop_sync<I>(&self, indexer: I)
141 where
142 I: IndexerSync + Clone + Send + 'static,
143 {
144 if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
145 error!("secondary_db try_catch_up_with_primary error {}", e);
146 }
147 loop {
148 if has_received_stop_signal() {
149 info!("try_loop_sync received exit signal, exit now");
150 break;
151 }
152
153 match indexer.tip() {
154 Ok(Some((tip_number, tip_hash))) => {
155 match self.get_block_by_number(tip_number + 1) {
156 Some(block) => {
157 if block.parent_hash() == tip_hash {
158 info!(
159 "{} append {}, {}",
160 indexer.get_identity(),
161 block.number(),
162 block.hash()
163 );
164 if let Err(e) = indexer.append(&block) {
165 error!("Failed to append block: {}. Will attempt to retry.", e);
166 }
167 } else {
168 info!(
169 "{} rollback {}, {}",
170 indexer.get_identity(),
171 tip_number,
172 tip_hash
173 );
174 indexer.rollback().expect("rollback block should be OK");
175 }
176 }
177 None => {
178 break;
179 }
180 }
181 }
182 Ok(None) => match self.get_block_by_number(0) {
183 Some(block) => {
184 if let Err(e) = indexer.append(&block) {
185 error!("Failed to append block: {}. Will attempt to retry.", e);
186 }
187 }
188 None => {
189 error!("CKB node returns an empty genesis block");
190 break;
191 }
192 },
193 Err(e) => {
194 error!("Failed to get tip: {}", e);
195 }
196 }
197 }
198 }
199
200 pub fn spawn_poll<I>(
202 &self,
203 notify_controller: NotifyController,
204 subscriber_name: String,
205 indexer_service: I,
206 ) where
207 I: IndexerSync + Clone + Send + 'static,
208 {
209 let initial_service = self.clone();
211 let indexer = indexer_service.clone();
212 let initial_syncing = self.async_handle.spawn_blocking(move || {
213 initial_service.apply_init_tip(indexer.clone());
214 initial_service.try_loop_sync(indexer)
215 });
216
217 let stop: CancellationToken = new_tokio_exit_rx();
219 let async_handle = self.async_handle.clone();
220 let poll_service = self.clone();
221 self.async_handle.spawn(async move {
222 let _initial_finished = initial_syncing.await;
223 if stop.is_cancelled() {
224 info!("Indexer received exit signal, cancel new_block_watcher task, exit now");
225 return;
226 }
227
228 info!("initial_syncing finished");
229
230 let mut new_block_watcher = notify_controller.watch_new_block(subscriber_name).await;
231 let mut interval = time::interval(poll_service.poll_interval);
232 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
233 loop {
234 let indexer = indexer_service.clone();
235 tokio::select! {
236 Ok(_) = new_block_watcher.changed() => {
237 let service = poll_service.clone();
238 if let Err(e) = async_handle.spawn_blocking(move || {
239 service.try_loop_sync(indexer)
240 }).await {
241 error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
242 }
243 new_block_watcher.borrow_and_update();
244 },
245 _ = interval.tick() => {
246 let service = poll_service.clone();
247 if let Err(e) = async_handle.spawn_blocking(move || {
248 service.try_loop_sync(indexer)
249 }).await {
250 error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
251 }
252 }
253 _ = stop.cancelled() => {
254 info!("{} received exit signal, exit now", indexer_service.get_identity());
255 break
256 },
257 }
258 }
259 });
260 }
261
262 pub fn pool(&self) -> Option<Arc<RwLock<Pool>>> {
264 self.pool_service.pool()
265 }
266
267 pub fn index_tx_pool<I>(&mut self, indexer_service: I, notify_controller: NotifyController)
269 where
270 I: IndexerSync + Clone + Send + 'static,
271 {
272 let secondary_db = self.secondary_db.clone();
273 let check_index_tx_pool_ready = self.async_handle.spawn_blocking(move || loop {
274 if has_received_stop_signal() {
275 info!("check_index_tx_pool_ready received exit signal, exit now");
276 break;
277 }
278
279 if let Err(e) = secondary_db.try_catch_up_with_primary() {
280 error!("secondary_db try_catch_up_with_primary error {}", e);
281 }
282 if let (Some(header), Ok(Some((indexer_tip, _)))) =
283 (secondary_db.get_tip_header(), indexer_service.tip())
284 {
285 let node_tip = header.number();
286 if node_tip - indexer_tip <= INDEXER_NODE_TIP_GAP {
287 break;
288 }
289 }
290 std::thread::sleep(Duration::from_secs(1));
291 });
292
293 self.pool_service
294 .index_tx_pool(notify_controller, check_index_tx_pool_ready);
295 }
296
297 fn get_block_by_number(&self, block_number: u64) -> Option<core::BlockView> {
298 let block_hash = self.secondary_db.get_block_hash(block_number)?;
299 self.secondary_db.get_block(&block_hash)
300 }
301}
302
303fn indexer_secondary_options(config: &IndexerSyncConfig) -> Options {
304 let mut opts = Options::default();
305 opts.create_if_missing(true);
306 opts.create_missing_column_families(true);
307 opts.set_keep_log_file_num(
308 config
309 .db_keep_log_file_num
310 .map(NonZeroUsize::get)
311 .unwrap_or(DEFAULT_LOG_KEEP_NUM),
312 );
313 opts
314}