ckb_indexer_sync/
lib.rs

1//! The built-in synchronization service in CKB can provide block synchronization services for indexers.
2
3pub(crate) mod custom_filters;
4pub(crate) mod error;
5pub(crate) mod pool;
6pub(crate) mod store;
7
8pub use crate::custom_filters::CustomFilters;
9pub use crate::error::Error;
10pub use crate::pool::{Pool, PoolService};
11pub use crate::store::SecondaryDB;
12
13use ckb_app_config::{DBConfig, IndexerSyncConfig};
14use ckb_async_runtime::{
15    Handle,
16    tokio::{self, time},
17};
18use ckb_db_schema::{
19    COLUMN_BLOCK_BODY, COLUMN_BLOCK_EXTENSION, COLUMN_BLOCK_HEADER, COLUMN_BLOCK_PROPOSAL_IDS,
20    COLUMN_BLOCK_UNCLE, COLUMN_INDEX, COLUMN_META,
21};
22use ckb_logger::{error, info};
23use ckb_notify::NotifyController;
24use ckb_stop_handler::{CancellationToken, has_received_stop_signal, new_tokio_exit_rx};
25use ckb_store::ChainStore;
26use ckb_types::{
27    H256,
28    core::{self, BlockNumber, BlockView},
29    packed::Byte32,
30};
31use rocksdb::{LogLevel, prelude::*};
32
33use std::marker::Send;
34use std::sync::{Arc, RwLock};
35use std::thread::sleep;
36use std::time::Duration;
37
38const INDEXER_NODE_TIP_GAP: u64 = 10;
39
40/// Trait for an indexer's synchronization interface
41pub trait IndexerSync {
42    /// Retrieves the tip of the indexer
43    fn tip(&self) -> Result<Option<(BlockNumber, Byte32)>, Error>;
44    /// Appends a new block to the indexer
45    fn append(&self, block: &BlockView) -> Result<(), Error>;
46    /// Rollback the indexer to a previous state
47    fn rollback(&self) -> Result<(), Error>;
48    /// Get indexer identity
49    fn get_identity(&self) -> &str;
50    /// Set init tip
51    fn set_init_tip(&self, init_tip_number: u64, init_tip_hash: &H256);
52}
53
54/// Construct new secondary db instance
55pub fn new_secondary_db(ckb_db_config: &DBConfig, config: &IndexerSyncConfig) -> SecondaryDB {
56    let cf_names = vec![
57        COLUMN_INDEX,
58        COLUMN_META,
59        COLUMN_BLOCK_HEADER,
60        COLUMN_BLOCK_BODY,
61        COLUMN_BLOCK_UNCLE,
62        COLUMN_BLOCK_PROPOSAL_IDS,
63        COLUMN_BLOCK_EXTENSION,
64    ];
65    let secondary_opts = indexer_secondary_options();
66    SecondaryDB::open_cf(
67        &secondary_opts,
68        &ckb_db_config.path,
69        cf_names,
70        config.secondary_path.to_string_lossy().to_string(),
71    )
72}
73
74/// Indexer sync service
75#[derive(Clone)]
76pub struct IndexerSyncService {
77    secondary_db: SecondaryDB,
78    pool_service: PoolService,
79    poll_interval: Duration,
80    async_handle: Handle,
81    init_tip_hash: Option<H256>,
82}
83
84impl IndexerSyncService {
85    /// Construct new Indexer sync service instance
86    pub fn new(
87        secondary_db: SecondaryDB,
88        pool_service: PoolService,
89        config: &IndexerSyncConfig,
90        async_handle: Handle,
91        init_tip_hash: Option<H256>,
92    ) -> Self {
93        Self {
94            secondary_db,
95            pool_service,
96            poll_interval: Duration::from_secs(config.poll_interval),
97            async_handle,
98            init_tip_hash,
99        }
100    }
101
102    /// Apply init tip
103    fn apply_init_tip<I>(&self, indexer_service: I)
104    where
105        I: IndexerSync + Clone + Send + 'static,
106    {
107        if let Some(init_tip_hash) = &self.init_tip_hash {
108            let indexer_tip = indexer_service
109                .tip()
110                .expect("indexer_service tip should be OK");
111            if let Some((indexer_tip, _)) = indexer_tip {
112                if let Some(init_tip) = self.secondary_db.get_block_header(&init_tip_hash.into()) {
113                    if indexer_tip >= init_tip.number() {
114                        return;
115                    }
116                }
117            }
118            loop {
119                if has_received_stop_signal() {
120                    info!("apply_init_tip received exit signal, exit now");
121                    break;
122                }
123
124                if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
125                    error!("secondary_db try_catch_up_with_primary error {}", e);
126                }
127                if let Some(header) = self.secondary_db.get_block_header(&init_tip_hash.into()) {
128                    let init_tip_number = header.number();
129                    indexer_service.set_init_tip(init_tip_number, init_tip_hash);
130                    break;
131                }
132                sleep(Duration::from_secs(1));
133            }
134        }
135    }
136
137    fn try_loop_sync<I>(&self, indexer: I)
138    where
139        I: IndexerSync + Clone + Send + 'static,
140    {
141        if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
142            error!("secondary_db try_catch_up_with_primary error {}", e);
143        }
144        loop {
145            if has_received_stop_signal() {
146                info!("try_loop_sync received exit signal, exit now");
147                break;
148            }
149
150            match indexer.tip() {
151                Ok(Some((tip_number, tip_hash))) => {
152                    match self.get_block_by_number(tip_number + 1) {
153                        Some(block) => {
154                            if block.parent_hash() == tip_hash {
155                                info!(
156                                    "{} append {}, {}",
157                                    indexer.get_identity(),
158                                    block.number(),
159                                    block.hash()
160                                );
161                                if let Err(e) = indexer.append(&block) {
162                                    error!("Failed to append block: {}. Will attempt to retry.", e);
163                                }
164                            } else {
165                                info!(
166                                    "{} rollback {}, {}",
167                                    indexer.get_identity(),
168                                    tip_number,
169                                    tip_hash
170                                );
171                                indexer.rollback().expect("rollback block should be OK");
172                                if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
173                                    error!(
174                                        "after rollback, secondary_db try_catch_up_with_primary error {}",
175                                        e
176                                    );
177                                }
178                            }
179                        }
180                        None => {
181                            break;
182                        }
183                    }
184                }
185                Ok(None) => match self.get_block_by_number(0) {
186                    Some(block) => {
187                        if let Err(e) = indexer.append(&block) {
188                            error!("Failed to append block: {}. Will attempt to retry.", e);
189                        }
190                    }
191                    None => {
192                        error!("CKB node returns an empty genesis block");
193                        break;
194                    }
195                },
196                Err(e) => {
197                    error!("Failed to get tip: {}", e);
198                }
199            }
200        }
201    }
202
203    /// Processes that handle block cell and expect to be spawned to run in tokio runtime
204    pub fn spawn_poll<I>(
205        &self,
206        notify_controller: NotifyController,
207        subscriber_name: String,
208        indexer_service: I,
209    ) where
210        I: IndexerSync + Clone + Send + 'static,
211    {
212        // Initial sync
213        let initial_service = self.clone();
214        let indexer = indexer_service.clone();
215        let initial_syncing = self.async_handle.spawn_blocking(move || {
216            initial_service.apply_init_tip(indexer.clone());
217            initial_service.try_loop_sync(indexer)
218        });
219
220        // Follow-up sync
221        let stop: CancellationToken = new_tokio_exit_rx();
222        let async_handle = self.async_handle.clone();
223        let poll_service = self.clone();
224        self.async_handle.spawn(async move {
225            let _initial_finished = initial_syncing.await;
226            if stop.is_cancelled() {
227                info!("Indexer received exit signal, cancel new_block_watcher task, exit now");
228                return;
229            }
230
231            info!("initial_syncing finished");
232
233            let mut new_block_watcher = notify_controller.watch_new_block(subscriber_name).await;
234            let mut interval = time::interval(poll_service.poll_interval);
235            interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
236            loop {
237                let indexer = indexer_service.clone();
238                tokio::select! {
239                    Ok(_) = new_block_watcher.changed() => {
240                        let service = poll_service.clone();
241                        if let Err(e) = async_handle.spawn_blocking(move || {
242                            service.try_loop_sync(indexer)
243                        }).await {
244                            error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
245                        }
246                        new_block_watcher.borrow_and_update();
247                    },
248                    _ = interval.tick() => {
249                        let service = poll_service.clone();
250                        if let Err(e) = async_handle.spawn_blocking(move || {
251                            service.try_loop_sync(indexer)
252                        }).await {
253                            error!("{} syncing join error {:?}", indexer_service.get_identity(), e);
254                        }
255                    }
256                    _ = stop.cancelled() => {
257                        info!("{} received exit signal, exit now", indexer_service.get_identity());
258                        break
259                    },
260                }
261            }
262        });
263    }
264
265    /// Get index data based on transaction pool synchronization
266    pub fn pool(&self) -> Option<Arc<RwLock<Pool>>> {
267        self.pool_service.pool()
268    }
269
270    /// Index transaction pool
271    pub fn index_tx_pool<I>(&mut self, indexer_service: I, notify_controller: NotifyController)
272    where
273        I: IndexerSync + Clone + Send + 'static,
274    {
275        let secondary_db = self.secondary_db.clone();
276        let check_index_tx_pool_ready = self.async_handle.spawn_blocking(move || {
277            loop {
278                if has_received_stop_signal() {
279                    info!("check_index_tx_pool_ready received exit signal, exit now");
280                    break;
281                }
282
283                if let Err(e) = secondary_db.try_catch_up_with_primary() {
284                    error!("secondary_db try_catch_up_with_primary error {}", e);
285                }
286                if let (Some(header), Ok(Some((indexer_tip, _)))) =
287                    (secondary_db.get_tip_header(), indexer_service.tip())
288                {
289                    let node_tip = header.number();
290                    if node_tip - indexer_tip <= INDEXER_NODE_TIP_GAP {
291                        break;
292                    }
293                }
294                std::thread::sleep(Duration::from_secs(1));
295            }
296        });
297
298        self.pool_service
299            .index_tx_pool(notify_controller, check_index_tx_pool_ready);
300    }
301
302    fn get_block_by_number(&self, block_number: u64) -> Option<core::BlockView> {
303        let block_hash = self.secondary_db.get_block_hash(block_number)?;
304        self.secondary_db.get_block(&block_hash)
305    }
306}
307
308fn indexer_secondary_options() -> Options {
309    let mut opts = Options::default();
310    opts.create_if_missing(true);
311    opts.create_missing_column_families(true);
312    opts.set_log_level(LogLevel::Warn);
313    opts
314}