ckb_shared/
shared_builder.rs

1//! shared_builder provide SharedBuilder and SharedPacakge
2use crate::ChainServicesBuilder;
3use crate::{HeaderMap, Shared};
4use ckb_app_config::{
5    BlockAssemblerConfig, DBConfig, ExitCode, FeeEstimatorAlgo, FeeEstimatorConfig, NotifyConfig,
6    StoreConfig, SyncConfig, TxPoolConfig,
7};
8use ckb_async_runtime::{Handle, new_background_runtime};
9use ckb_chain_spec::SpecError;
10use ckb_chain_spec::consensus::Consensus;
11use ckb_channel::Receiver;
12use ckb_db::RocksDB;
13use ckb_db_schema::COLUMNS;
14use ckb_error::{Error, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_logger::{error, info};
17use ckb_migrate::migrate::Migrate;
18use ckb_notify::{NotifyController, NotifyService};
19use ckb_proposal_table::ProposalTable;
20use ckb_proposal_table::ProposalView;
21use ckb_snapshot::{Snapshot, SnapshotMgr};
22use ckb_store::{ChainDB, ChainStore, Freezer};
23use ckb_tx_pool::{
24    TokioRwLock, TxEntry, TxPool, TxPoolServiceBuilder, service::TxVerificationResult,
25};
26use ckb_types::H256;
27use ckb_types::core::hardfork::HardForks;
28use ckb_types::{
29    core::EpochExt,
30    core::HeaderView,
31    core::tx_pool::{PoolTransactionEntry, Reject},
32};
33use ckb_util::Mutex;
34use ckb_verification::cache::init_cache;
35use dashmap::DashMap;
36use std::cmp::Ordering;
37use std::collections::HashSet;
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::AtomicBool;
41use tempfile::TempDir;
42
43/// Shared builder for construct new shared.
44pub struct SharedBuilder {
45    db: RocksDB,
46    ancient_path: Option<PathBuf>,
47    consensus: Consensus,
48    tx_pool_config: Option<TxPoolConfig>,
49    store_config: Option<StoreConfig>,
50    sync_config: Option<SyncConfig>,
51    block_assembler_config: Option<BlockAssemblerConfig>,
52    notify_config: Option<NotifyConfig>,
53    async_handle: Handle,
54    fee_estimator_config: Option<FeeEstimatorConfig>,
55
56    header_map_tmp_dir: Option<PathBuf>,
57}
58
59/// Open or create a rocksdb
60pub fn open_or_create_db(
61    bin_name: &str,
62    root_dir: &Path,
63    config: &DBConfig,
64    hardforks: HardForks,
65) -> Result<RocksDB, ExitCode> {
66    let migrate = Migrate::new(&config.path, hardforks);
67
68    let read_only_db = migrate.open_read_only_db().map_err(|e| {
69        eprintln!("Migration error {e}");
70        ExitCode::Failure
71    })?;
72
73    if let Some(db) = read_only_db {
74        match migrate.check(&db, true) {
75            Ordering::Greater => {
76                eprintln!(
77                    "The database was created by a higher version CKB executable binary \n\
78                     and cannot be opened by the current binary.\n\
79                     Please download the latest CKB executable binary."
80                );
81                Err(ExitCode::Failure)
82            }
83            Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)),
84            Ordering::Less => {
85                let can_run_in_background = migrate.can_run_in_background(&db);
86                if migrate.require_expensive(&db, false) && !can_run_in_background {
87                    eprintln!(
88                        "For optimal performance, CKB recommends migrating your data into a new format.\n\
89                        If you prefer to stick with the older version, \n\
90                        it's important to note that they may have unfixed vulnerabilities.\n\
91                        Before migrating, we strongly recommend backuping your data directory.\n\
92                        To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".",
93                        bin_name,
94                        root_dir.display()
95                    );
96                    Err(ExitCode::Failure)
97                } else if can_run_in_background {
98                    info!("process migrations in background ...");
99                    let db = RocksDB::open(config, COLUMNS);
100                    migrate.migrate(db.clone(), true).map_err(|err| {
101                        eprintln!("Run error: {err:?}");
102                        ExitCode::Failure
103                    })?;
104                    Ok(db)
105                } else {
106                    info!("Processing fast migrations ...");
107
108                    let bulk_load_db_db = migrate.open_bulk_load_db().map_err(|e| {
109                        eprintln!("Migration error {e}");
110                        ExitCode::Failure
111                    })?;
112
113                    if let Some(db) = bulk_load_db_db {
114                        migrate.migrate(db, false).map_err(|err| {
115                            eprintln!("Run error: {err:?}");
116                            ExitCode::Failure
117                        })?;
118                    }
119
120                    Ok(RocksDB::open(config, COLUMNS))
121                }
122            }
123        }
124    } else {
125        let db = RocksDB::open(config, COLUMNS);
126        migrate.init_db_version(&db).map_err(|e| {
127            eprintln!("Migrate init_db_version error {e}");
128            ExitCode::Failure
129        })?;
130        Ok(db)
131    }
132}
133
134impl SharedBuilder {
135    /// Generates the base SharedBuilder with ancient path and async_handle
136    pub fn new(
137        bin_name: &str,
138        root_dir: &Path,
139        db_config: &DBConfig,
140        ancient: Option<PathBuf>,
141        async_handle: Handle,
142        consensus: Consensus,
143    ) -> Result<SharedBuilder, ExitCode> {
144        let db = open_or_create_db(
145            bin_name,
146            root_dir,
147            db_config,
148            consensus.hardfork_switch.clone(),
149        )?;
150
151        Ok(SharedBuilder {
152            db,
153            ancient_path: ancient,
154            consensus,
155            tx_pool_config: None,
156            notify_config: None,
157            store_config: None,
158            sync_config: None,
159            block_assembler_config: None,
160            async_handle,
161            fee_estimator_config: None,
162            header_map_tmp_dir: None,
163        })
164    }
165
166    /// Generates the SharedBuilder with temp db
167    /// NOTICE: this is only used in testing
168    pub fn with_temp_db() -> Self {
169        use std::{
170            borrow::Borrow,
171            sync::atomic::{AtomicUsize, Ordering},
172        };
173
174        // once #[thread_local] is stable
175        // #[thread_local]
176        // static RUNTIME_HANDLE: unsync::OnceCell<...
177        thread_local! {
178            // NOTICE:we can't put the runtime directly into thread_local here,
179            // on windows the runtime in thread_local will get stuck when dropping
180            static RUNTIME_HANDLE: std::cell::OnceCell<Handle> = const { std::cell::OnceCell::new() };
181        }
182
183        static DB_COUNT: AtomicUsize = AtomicUsize::new(0);
184        static TMP_DIR: std::sync::OnceLock<TempDir> = std::sync::OnceLock::new();
185
186        let db = {
187            let db_id = DB_COUNT.fetch_add(1, Ordering::SeqCst);
188            let db_base_dir = TMP_DIR
189                .borrow()
190                .get_or_init(|| TempDir::new().unwrap())
191                .path()
192                .to_path_buf();
193            let db_dir = db_base_dir.join(format!("db_{db_id}"));
194            RocksDB::open_in(db_dir, COLUMNS)
195        };
196
197        RUNTIME_HANDLE.with(|runtime| SharedBuilder {
198            db,
199            ancient_path: None,
200            consensus: Consensus::default(),
201            tx_pool_config: None,
202            notify_config: None,
203            store_config: None,
204            sync_config: None,
205            block_assembler_config: None,
206            async_handle: runtime.get_or_init(new_background_runtime).clone(),
207            fee_estimator_config: None,
208
209            header_map_tmp_dir: None,
210        })
211    }
212}
213
214impl SharedBuilder {
215    /// Sets the consensus configuration for the shared state.
216    pub fn consensus(mut self, value: Consensus) -> Self {
217        self.consensus = value;
218        self
219    }
220
221    /// Sets the transaction pool configuration.
222    pub fn tx_pool_config(mut self, config: TxPoolConfig) -> Self {
223        self.tx_pool_config = Some(config);
224        self
225    }
226
227    /// Sets the notification service configuration.
228    pub fn notify_config(mut self, config: NotifyConfig) -> Self {
229        self.notify_config = Some(config);
230        self
231    }
232
233    /// Sets the store configuration.
234    pub fn store_config(mut self, config: StoreConfig) -> Self {
235        self.store_config = Some(config);
236        self
237    }
238
239    /// Sets the sync configuration.
240    pub fn sync_config(mut self, config: SyncConfig) -> Self {
241        self.sync_config = Some(config);
242        self
243    }
244
245    /// Sets the temporary directory for header map storage.
246    pub fn header_map_tmp_dir(mut self, header_map_tmp_dir: Option<PathBuf>) -> Self {
247        self.header_map_tmp_dir = header_map_tmp_dir;
248        self
249    }
250
251    /// Sets the block assembler configuration for mining.
252    pub fn block_assembler_config(mut self, config: Option<BlockAssemblerConfig>) -> Self {
253        self.block_assembler_config = config;
254        self
255    }
256
257    /// Sets the configuration for the fee estimator.
258    pub fn fee_estimator_config(mut self, config: FeeEstimatorConfig) -> Self {
259        self.fee_estimator_config = Some(config);
260        self
261    }
262
263    /// specifies the async_handle for the shared
264    pub fn async_handle(mut self, async_handle: Handle) -> Self {
265        self.async_handle = async_handle;
266        self
267    }
268
269    fn init_proposal_table(
270        store: &ChainDB,
271        consensus: &Consensus,
272    ) -> (ProposalTable, ProposalView) {
273        let proposal_window = consensus.tx_proposal_window();
274        let tip_number = store.get_tip_header().expect("store inited").number();
275        let mut proposal_ids = ProposalTable::new(proposal_window);
276        let proposal_start = tip_number.saturating_sub(proposal_window.farthest());
277        for bn in proposal_start..=tip_number {
278            if let Some(hash) = store.get_block_hash(bn) {
279                let mut ids_set = HashSet::new();
280                if let Some(ids) = store.get_block_proposal_txs_ids(&hash) {
281                    ids_set.extend(ids)
282                }
283
284                if let Some(us) = store.get_block_uncles(&hash) {
285                    for u in us.data().into_iter() {
286                        ids_set.extend(u.proposals().into_iter());
287                    }
288                }
289                proposal_ids.insert(bn, ids_set);
290            }
291        }
292        let dummy_proposals = ProposalView::default();
293        let (_, proposals) = proposal_ids.finalize(&dummy_proposals, tip_number);
294        (proposal_ids, proposals)
295    }
296
297    fn init_store(store: &ChainDB, consensus: &Consensus) -> Result<(HeaderView, EpochExt), Error> {
298        match store
299            .get_tip_header()
300            .and_then(|header| store.get_current_epoch_ext().map(|epoch| (header, epoch)))
301        {
302            Some((tip_header, epoch)) => {
303                if let Some(genesis_hash) = store.get_block_hash(0) {
304                    let expect_genesis_hash = consensus.genesis_hash();
305                    if genesis_hash == expect_genesis_hash {
306                        Ok((tip_header, epoch))
307                    } else {
308                        Err(SpecError::GenesisMismatch {
309                            expected: expect_genesis_hash,
310                            actual: genesis_hash,
311                        }
312                        .into())
313                    }
314                } else {
315                    Err(InternalErrorKind::Database
316                        .other("genesis does not exist in database")
317                        .into())
318                }
319            }
320            None => store.init(consensus).map(|_| {
321                (
322                    consensus.genesis_block().header(),
323                    consensus.genesis_epoch_ext().to_owned(),
324                )
325            }),
326        }
327    }
328
329    fn init_snapshot(
330        store: &ChainDB,
331        consensus: Arc<Consensus>,
332    ) -> Result<(Snapshot, ProposalTable), Error> {
333        let (tip_header, epoch) = Self::init_store(store, &consensus)?;
334        let total_difficulty = store
335            .get_block_ext(&tip_header.hash())
336            .ok_or_else(|| InternalErrorKind::Database.other("failed to get tip's block_ext"))?
337            .total_difficulty;
338        let (proposal_table, proposal_view) = Self::init_proposal_table(store, &consensus);
339
340        let snapshot = Snapshot::new(
341            tip_header,
342            total_difficulty,
343            epoch,
344            store.get_snapshot(),
345            proposal_view,
346            consensus,
347        );
348
349        Ok((snapshot, proposal_table))
350    }
351
352    /// Builds the shared state and related components.
353    ///
354    /// Returns the shared state and a package containing the transaction pool and block assembler.
355    pub fn build(self) -> Result<(Shared, SharedPackage), ExitCode> {
356        let SharedBuilder {
357            db,
358            ancient_path,
359            consensus,
360            tx_pool_config,
361            store_config,
362            sync_config,
363            block_assembler_config,
364            notify_config,
365            async_handle,
366            fee_estimator_config,
367            header_map_tmp_dir,
368        } = self;
369
370        let tx_pool_config = tx_pool_config.unwrap_or_default();
371        let notify_config = notify_config.unwrap_or_default();
372        let store_config = store_config.unwrap_or_default();
373        let sync_config = sync_config.unwrap_or_default();
374        let consensus = Arc::new(consensus);
375
376        let header_map_memory_limit = sync_config.header_map.memory_limit.as_u64() as usize;
377
378        let ibd_finished = Arc::new(AtomicBool::new(false));
379
380        let header_map = Arc::new(HeaderMap::new(
381            header_map_tmp_dir,
382            header_map_memory_limit,
383            &async_handle,
384            Arc::clone(&ibd_finished),
385        ));
386
387        let notify_controller = start_notify_service(notify_config, async_handle.clone());
388
389        let store = build_store(db, store_config, ancient_path).map_err(|e| {
390            eprintln!("build_store {e}");
391            ExitCode::Failure
392        })?;
393
394        let txs_verify_cache = Arc::new(TokioRwLock::new(init_cache()));
395
396        let (snapshot, table) =
397            Self::init_snapshot(&store, Arc::clone(&consensus)).map_err(|e| {
398                eprintln!("init_snapshot {e}");
399                ExitCode::Failure
400            })?;
401        let snapshot = Arc::new(snapshot);
402        let snapshot_mgr = Arc::new(SnapshotMgr::new(Arc::clone(&snapshot)));
403
404        let (sender, receiver) = ckb_channel::unbounded();
405
406        let fee_estimator_algo = fee_estimator_config
407            .map(|config| config.algorithm)
408            .unwrap_or(None);
409        let fee_estimator = match fee_estimator_algo {
410            Some(FeeEstimatorAlgo::WeightUnitsFlow) => FeeEstimator::new_weight_units_flow(),
411            Some(FeeEstimatorAlgo::ConfirmationFraction) => {
412                FeeEstimator::new_confirmation_fraction()
413            }
414            None => FeeEstimator::new_dummy(),
415        };
416
417        let (mut tx_pool_builder, tx_pool_controller) = TxPoolServiceBuilder::new(
418            tx_pool_config,
419            Arc::clone(&snapshot),
420            block_assembler_config,
421            Arc::clone(&txs_verify_cache),
422            &async_handle,
423            sender,
424            fee_estimator.clone(),
425        );
426
427        register_tx_pool_callback(
428            &mut tx_pool_builder,
429            notify_controller.clone(),
430            fee_estimator,
431        );
432
433        let block_status_map = Arc::new(DashMap::new());
434
435        let assume_valid_targets = Arc::new(Mutex::new({
436            let not_exists_targets: Option<Vec<H256>> =
437                sync_config.assume_valid_targets.clone().map(|targets| {
438                    targets
439                        .iter()
440                        .filter(|&target_hash| {
441                            let exists = snapshot.block_exists(&target_hash.into());
442                            if exists {
443                                info!("assume-valid target 0x{} exists in local db", target_hash);
444                            }
445                            !exists
446                        })
447                        .cloned()
448                        .collect::<Vec<H256>>()
449                });
450
451            if not_exists_targets
452                .as_ref()
453                .is_some_and(|targets| targets.is_empty())
454            {
455                info!("all assume-valid targets synchronized, enter full verification mode");
456                None
457            } else {
458                not_exists_targets
459            }
460        }));
461
462        let assume_valid_target_specified: Arc<Option<H256>> = Arc::new(
463            sync_config
464                .assume_valid_targets
465                .and_then(|targets| targets.last().cloned()),
466        );
467
468        let shared = Shared::new(
469            store,
470            tx_pool_controller,
471            notify_controller,
472            txs_verify_cache,
473            consensus,
474            snapshot_mgr,
475            async_handle,
476            ibd_finished,
477            assume_valid_targets,
478            assume_valid_target_specified,
479            header_map,
480            block_status_map,
481        );
482
483        let chain_services_builder = ChainServicesBuilder::new(shared.clone(), table);
484
485        let pack = SharedPackage {
486            chain_services_builder: Some(chain_services_builder),
487            tx_pool_builder: Some(tx_pool_builder),
488            relay_tx_receiver: Some(receiver),
489        };
490
491        Ok((shared, pack))
492    }
493}
494
495/// SharedBuilder build returning the shared/package halves
496/// The package structs used for init other component
497pub struct SharedPackage {
498    chain_services_builder: Option<ChainServicesBuilder>,
499    tx_pool_builder: Option<TxPoolServiceBuilder>,
500    relay_tx_receiver: Option<Receiver<TxVerificationResult>>,
501}
502
503impl SharedPackage {
504    /// Takes the chain_services_builder out of the package, leaving a None in its place.
505    pub fn take_chain_services_builder(&mut self) -> ChainServicesBuilder {
506        self.chain_services_builder
507            .take()
508            .expect("take chain_services_builder")
509    }
510
511    /// Takes the tx_pool_builder out of the package, leaving a None in its place.
512    pub fn take_tx_pool_builder(&mut self) -> TxPoolServiceBuilder {
513        self.tx_pool_builder.take().expect("take tx_pool_builder")
514    }
515
516    /// Takes the relay_tx_receiver out of the package, leaving a None in its place.
517    pub fn take_relay_tx_receiver(&mut self) -> Receiver<TxVerificationResult> {
518        self.relay_tx_receiver
519            .take()
520            .expect("take relay_tx_receiver")
521    }
522}
523
524fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController {
525    NotifyService::new(notify_config, handle).start()
526}
527
528fn build_store(
529    db: RocksDB,
530    store_config: StoreConfig,
531    ancient_path: Option<PathBuf>,
532) -> Result<ChainDB, Error> {
533    let store = if store_config.freezer_enable && ancient_path.is_some() {
534        let freezer = Freezer::open(ancient_path.expect("exist checked"))?;
535        ChainDB::new_with_freezer(db, freezer, store_config)
536    } else {
537        ChainDB::new(db, store_config)
538    };
539    Ok(store)
540}
541
542fn register_tx_pool_callback(
543    tx_pool_builder: &mut TxPoolServiceBuilder,
544    notify: NotifyController,
545    fee_estimator: FeeEstimator,
546) {
547    let notify_pending = notify.clone();
548
549    let tx_relay_sender = tx_pool_builder.tx_relay_sender();
550    let create_notify_entry = |entry: &TxEntry| PoolTransactionEntry {
551        transaction: entry.rtx.transaction.clone(),
552        cycles: entry.cycles,
553        size: entry.size,
554        fee: entry.fee,
555        timestamp: entry.timestamp,
556    };
557
558    let fee_estimator_clone = fee_estimator.clone();
559    tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
560        // notify
561        let notify_tx_entry = create_notify_entry(entry);
562        notify_pending.notify_new_transaction(notify_tx_entry);
563        let tx_hash = entry.transaction().hash();
564        let entry_info = entry.to_info();
565        fee_estimator_clone.accept_tx(tx_hash, entry_info);
566    }));
567
568    let notify_proposed = notify.clone();
569    tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
570        // notify
571        let notify_tx_entry = create_notify_entry(entry);
572        notify_proposed.notify_proposed_transaction(notify_tx_entry);
573    }));
574
575    let notify_reject = notify;
576    tx_pool_builder.register_reject(Box::new(
577        move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
578            let tx_hash = entry.transaction().hash();
579            // record recent reject
580            if reject.should_recorded() {
581                if let Some(ref mut recent_reject) = tx_pool.recent_reject {
582                    if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
583                        error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
584                    }
585                }
586            }
587
588            if reject.is_allowed_relay() {
589                if let Err(e) = tx_relay_sender.send(TxVerificationResult::Reject {
590                    tx_hash: tx_hash.clone(),
591                }) {
592                    error!("tx-pool tx_relay_sender internal error {}", e);
593                }
594            }
595
596            // notify
597            let notify_tx_entry = create_notify_entry(entry);
598            notify_reject.notify_reject_transaction(notify_tx_entry, reject);
599
600            // fee estimator
601            fee_estimator.reject_tx(&tx_hash);
602        },
603    ));
604}