ckb_indexer_sync/
lib.rs

1//! The built-in synchronization service in CKB can provide block synchronization services for indexers.
2
3pub(crate) mod custom_filters;
4pub(crate) mod error;
5pub(crate) mod pool;
6pub(crate) mod store;
7
8pub use crate::custom_filters::CustomFilters;
9pub use crate::error::Error;
10pub use crate::pool::{Pool, PoolService};
11pub use crate::store::SecondaryDB;
12
13use ckb_app_config::{DBConfig, IndexerSyncConfig};
14use ckb_async_runtime::{
15    tokio::{self, time},
16    Handle,
17};
18use ckb_db_schema::{
19    COLUMN_BLOCK_BODY, COLUMN_BLOCK_EXTENSION, COLUMN_BLOCK_HEADER, COLUMN_BLOCK_PROPOSAL_IDS,
20    COLUMN_BLOCK_UNCLE, COLUMN_INDEX, COLUMN_META,
21};
22use ckb_logger::{error, info};
23use ckb_notify::NotifyController;
24use ckb_stop_handler::{has_received_stop_signal, new_tokio_exit_rx, CancellationToken};
25use ckb_store::ChainStore;
26use ckb_types::{
27    core::{self, BlockNumber, BlockView},
28    packed::Byte32,
29    prelude::*,
30    H256,
31};
32use rocksdb::prelude::*;
33
34use std::marker::Send;
35use std::num::NonZeroUsize;
36use std::sync::{Arc, RwLock};
37use std::thread::sleep;
38use std::time::Duration;
39
40const DEFAULT_LOG_KEEP_NUM: usize = 1;
41const INDEXER_NODE_TIP_GAP: u64 = 10;
42
43/// Trait for an indexer's synchronization interface
44pub trait IndexerSync {
45    /// Retrieves the tip of the indexer
46    fn tip(&self) -> Result<Option<(BlockNumber, Byte32)>, Error>;
47    /// Appends a new block to the indexer
48    fn append(&self, block: &BlockView) -> Result<(), Error>;
49    /// Rollback the indexer to a previous state
50    fn rollback(&self) -> Result<(), Error>;
51    /// Get indexer identity
52    fn get_identity(&self) -> &str;
53    /// Set init tip
54    fn set_init_tip(&self, init_tip_number: u64, init_tip_hash: &H256);
55}
56
57/// Construct new secondary db instance
58pub fn new_secondary_db(ckb_db_config: &DBConfig, config: &IndexerSyncConfig) -> SecondaryDB {
59    let cf_names = vec![
60        COLUMN_INDEX,
61        COLUMN_META,
62        COLUMN_BLOCK_HEADER,
63        COLUMN_BLOCK_BODY,
64        COLUMN_BLOCK_UNCLE,
65        COLUMN_BLOCK_PROPOSAL_IDS,
66        COLUMN_BLOCK_EXTENSION,
67    ];
68    let secondary_opts = indexer_secondary_options(config);
69    SecondaryDB::open_cf(
70        &secondary_opts,
71        &ckb_db_config.path,
72        cf_names,
73        config.secondary_path.to_string_lossy().to_string(),
74    )
75}
76
77/// Indexer sync service
78#[derive(Clone)]
79pub struct IndexerSyncService {
80    secondary_db: SecondaryDB,
81    pool_service: PoolService,
82    poll_interval: Duration,
83    async_handle: Handle,
84    init_tip_hash: Option<H256>,
85}
86
87impl IndexerSyncService {
88    /// Construct new Indexer sync service instance
89    pub fn new(
90        secondary_db: SecondaryDB,
91        pool_service: PoolService,
92        config: &IndexerSyncConfig,
93        async_handle: Handle,
94        init_tip_hash: Option<H256>,
95    ) -> Self {
96        Self {
97            secondary_db,
98            pool_service,
99            poll_interval: Duration::from_secs(config.poll_interval),
100            async_handle,
101            init_tip_hash,
102        }
103    }
104
105    /// Apply init tip
106    fn apply_init_tip<I>(&self, indexer_service: I)
107    where
108        I: IndexerSync + Clone + Send + 'static,
109    {
110        if let Some(init_tip_hash) = &self.init_tip_hash {
111            let indexer_tip = indexer_service
112                .tip()
113                .expect("indexer_service tip should be OK");
114            if let Some((indexer_tip, _)) = indexer_tip {
115                if let Some(init_tip) = self.secondary_db.get_block_header(&init_tip_hash.pack()) {
116                    if indexer_tip >= init_tip.number() {
117                        return;
118                    }
119                }
120            }
121            loop {
122                if has_received_stop_signal() {
123                    info!("apply_init_tip received exit signal, exit now");
124                    break;
125                }
126
127                if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
128                    error!("secondary_db try_catch_up_with_primary error {}", e);
129                }
130                if let Some(header) = self.secondary_db.get_block_header(&init_tip_hash.pack()) {
131                    let init_tip_number = header.number();
132                    indexer_service.set_init_tip(init_tip_number, init_tip_hash);
133                    break;
134                }
135                sleep(Duration::from_secs(1));
136            }
137        }
138    }
139
140    fn try_loop_sync<I>(&self, indexer: I)
141    where
142        I: IndexerSync + Clone + Send + 'static,
143    {
144        if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
145            error!("secondary_db try_catch_up_with_primary error {}", e);
146        }
147        loop {
148            if has_received_stop_signal() {
149                info!("try_loop_sync received exit signal, exit now");
150                break;
151            }
152
153            match indexer.tip() {
154                Ok(Some((tip_number, tip_hash))) => {
155                    match self.get_block_by_number(tip_number + 1) {
156                        Some(block) => {
157                            if block.parent_hash() == tip_hash {
158                                info!(
159                                    "{} append {}, {}",
160                                    indexer.get_identity(),
161                                    block.number(),
162                                    block.hash()
163                                );
164                                if let Err(e) = indexer.append(&block) {
165                                    error!("Failed to append block: {}. Will attempt to retry.", e);
166                                }
167                            } else {
168                                info!(
169                                    "{} rollback {}, {}",
170                                    indexer.get_identity(),
171                                    tip_number,
172                                    tip_hash
173                                );
174                                indexer.rollback().expect("rollback block should be OK");
175                            }
176                        }
177                        None => {
178                            break;
179                        }
180                    }
181                }
182                Ok(None) => match self.get_block_by_number(0) {
183                    Some(block) => {
184                        if let Err(e) = indexer.append(&block) {
185                            error!("Failed to append block: {}. Will attempt to retry.", e);
186                        }
187                    }
188                    None => {
189                        error!("CKB node returns an empty genesis block");
190                        break;
191                    }
192                },
193                Err(e) => {
194                    error!("Failed to get tip: {}", e);
195                }
196            }
197        }
198    }
199
200    /// Processes that handle block cell and expect to be spawned to run in tokio runtime
201    pub fn spawn_poll<I>(
202        &self,
203        notify_controller: NotifyController,
204        subscriber_name: String,
205        indexer_service: I,
206    ) where
207        I: IndexerSync + Clone + Send + 'static,
208    {
209        // Initial sync
210        let initial_service = self.clone();
211        let indexer = indexer_service.clone();
212        let initial_syncing = self.async_handle.spawn_blocking(move || {
213            initial_service.apply_init_tip(indexer.clone());
214            initial_service.try_loop_sync(indexer)
215        });
216
217        // Follow-up sync
218        let stop: CancellationToken = new_tokio_exit_rx();
219        let async_handle = self.async_handle.clone();
220        let poll_service = self.clone();
221        self.async_handle.spawn(async move {
222            let _initial_finished = initial_syncing.await;
223            if stop.is_cancelled() {
224                info!("Indexer received exit signal, cancel new_block_watcher task, exit now");
225                return;
226            }
227
228            info!("initial_syncing finished");
229
230            let mut new_block_watcher = notify_controller.watch_new_block(subscriber_name).await;
231            let mut interval = time::interval(poll_service.poll_interval);
232            interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
233            loop {
234                let indexer = indexer_service.clone();
235                tokio::select! {
236                    Ok(_) = new_block_watcher.changed() => {
237                        let service = poll_service.clone();
238                        if let Err(e) = async_handle.spawn_blocking(move || {
239                            service.try_loop_sync(indexer)
240                        }).await {
241                            error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
242                        }
243                        new_block_watcher.borrow_and_update();
244                    },
245                    _ = interval.tick() => {
246                        let service = poll_service.clone();
247                        if let Err(e) = async_handle.spawn_blocking(move || {
248                            service.try_loop_sync(indexer)
249                        }).await {
250                            error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
251                        }
252                    }
253                    _ = stop.cancelled() => {
254                        info!("{} received exit signal, exit now", indexer_service.get_identity());
255                        break
256                    },
257                }
258            }
259        });
260    }
261
262    /// Get index data based on transaction pool synchronization
263    pub fn pool(&self) -> Option<Arc<RwLock<Pool>>> {
264        self.pool_service.pool()
265    }
266
267    /// Index transaction pool
268    pub fn index_tx_pool<I>(&mut self, indexer_service: I, notify_controller: NotifyController)
269    where
270        I: IndexerSync + Clone + Send + 'static,
271    {
272        let secondary_db = self.secondary_db.clone();
273        let check_index_tx_pool_ready = self.async_handle.spawn_blocking(move || loop {
274            if has_received_stop_signal() {
275                info!("check_index_tx_pool_ready received exit signal, exit now");
276                break;
277            }
278
279            if let Err(e) = secondary_db.try_catch_up_with_primary() {
280                error!("secondary_db try_catch_up_with_primary error {}", e);
281            }
282            if let (Some(header), Ok(Some((indexer_tip, _)))) =
283                (secondary_db.get_tip_header(), indexer_service.tip())
284            {
285                let node_tip = header.number();
286                if node_tip - indexer_tip <= INDEXER_NODE_TIP_GAP {
287                    break;
288                }
289            }
290            std::thread::sleep(Duration::from_secs(1));
291        });
292
293        self.pool_service
294            .index_tx_pool(notify_controller, check_index_tx_pool_ready);
295    }
296
297    fn get_block_by_number(&self, block_number: u64) -> Option<core::BlockView> {
298        let block_hash = self.secondary_db.get_block_hash(block_number)?;
299        self.secondary_db.get_block(&block_hash)
300    }
301}
302
303fn indexer_secondary_options(config: &IndexerSyncConfig) -> Options {
304    let mut opts = Options::default();
305    opts.create_if_missing(true);
306    opts.create_missing_column_families(true);
307    opts.set_keep_log_file_num(
308        config
309            .db_keep_log_file_num
310            .map(NonZeroUsize::get)
311            .unwrap_or(DEFAULT_LOG_KEEP_NUM),
312    );
313    opts
314}