ckb_shared/
shared_builder.rs

1//! shared_builder provide SharedBuilder and SharedPacakge
2use crate::ChainServicesBuilder;
3use crate::{HeaderMap, Shared};
4use ckb_app_config::{
5    BlockAssemblerConfig, DBConfig, ExitCode, FeeEstimatorAlgo, FeeEstimatorConfig, NotifyConfig,
6    StoreConfig, SyncConfig, TxPoolConfig,
7};
8use ckb_async_runtime::{Handle, new_background_runtime};
9use ckb_chain_spec::SpecError;
10use ckb_chain_spec::consensus::Consensus;
11use ckb_channel::Receiver;
12use ckb_db::RocksDB;
13use ckb_db_schema::COLUMNS;
14use ckb_error::{Error, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_logger::{error, info};
17use ckb_migrate::migrate::Migrate;
18use ckb_notify::{NotifyController, NotifyService};
19use ckb_proposal_table::ProposalTable;
20use ckb_proposal_table::ProposalView;
21use ckb_snapshot::{Snapshot, SnapshotMgr};
22use ckb_store::{ChainDB, ChainStore, Freezer};
23use ckb_tx_pool::{
24    TokioRwLock, TxEntry, TxPool, TxPoolServiceBuilder, service::TxVerificationResult,
25};
26use ckb_types::H256;
27use ckb_types::core::hardfork::HardForks;
28use ckb_types::{
29    core::EpochExt,
30    core::HeaderView,
31    core::tx_pool::{PoolTransactionEntry, Reject},
32};
33use ckb_util::Mutex;
34use ckb_verification::cache::init_cache;
35use dashmap::DashMap;
36use std::cmp::Ordering;
37use std::collections::HashSet;
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::AtomicBool;
41use tempfile::TempDir;
42
43/// Shared builder for construct new shared.
44pub struct SharedBuilder {
45    db: RocksDB,
46    ancient_path: Option<PathBuf>,
47    consensus: Consensus,
48    tx_pool_config: Option<TxPoolConfig>,
49    store_config: Option<StoreConfig>,
50    sync_config: Option<SyncConfig>,
51    block_assembler_config: Option<BlockAssemblerConfig>,
52    notify_config: Option<NotifyConfig>,
53    async_handle: Handle,
54    fee_estimator_config: Option<FeeEstimatorConfig>,
55
56    header_map_tmp_dir: Option<PathBuf>,
57}
58
59/// Open or create a rocksdb
60pub fn open_or_create_db(
61    bin_name: &str,
62    root_dir: &Path,
63    config: &DBConfig,
64    hardforks: HardForks,
65) -> Result<RocksDB, ExitCode> {
66    let migrate = Migrate::new(&config.path, hardforks);
67
68    let read_only_db = migrate.open_read_only_db().map_err(|e| {
69        eprintln!("Migration error {e}");
70        ExitCode::Failure
71    })?;
72
73    if let Some(db) = read_only_db {
74        match migrate.check(&db, true) {
75            Ordering::Greater => {
76                eprintln!(
77                    "The database was created by a higher version CKB executable binary \n\
78                     and cannot be opened by the current binary.\n\
79                     Please download the latest CKB executable binary."
80                );
81                Err(ExitCode::Failure)
82            }
83            Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)),
84            Ordering::Less => {
85                let can_run_in_background = migrate.can_run_in_background(&db);
86                if migrate.require_expensive(&db, false) && !can_run_in_background {
87                    eprintln!(
88                        "For optimal performance, CKB recommends migrating your data into a new format.\n\
89                        If you prefer to stick with the older version, \n\
90                        it's important to note that they may have unfixed vulnerabilities.\n\
91                        Before migrating, we strongly recommend backuping your data directory.\n\
92                        To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".",
93                        bin_name,
94                        root_dir.display()
95                    );
96                    Err(ExitCode::Failure)
97                } else if can_run_in_background {
98                    info!("process migrations in background ...");
99                    let db = RocksDB::open(config, COLUMNS);
100                    migrate.migrate(db.clone(), true).map_err(|err| {
101                        eprintln!("Run error: {err:?}");
102                        ExitCode::Failure
103                    })?;
104                    Ok(db)
105                } else {
106                    info!("Processing fast migrations ...");
107
108                    let bulk_load_db_db = migrate.open_bulk_load_db().map_err(|e| {
109                        eprintln!("Migration error {e}");
110                        ExitCode::Failure
111                    })?;
112
113                    if let Some(db) = bulk_load_db_db {
114                        migrate.migrate(db, false).map_err(|err| {
115                            eprintln!("Run error: {err:?}");
116                            ExitCode::Failure
117                        })?;
118                    }
119
120                    Ok(RocksDB::open(config, COLUMNS))
121                }
122            }
123        }
124    } else {
125        let db = RocksDB::open(config, COLUMNS);
126        migrate.init_db_version(&db).map_err(|e| {
127            eprintln!("Migrate init_db_version error {e}");
128            ExitCode::Failure
129        })?;
130        Ok(db)
131    }
132}
133
134impl SharedBuilder {
135    /// Generates the base SharedBuilder with ancient path and async_handle
136    pub fn new(
137        bin_name: &str,
138        root_dir: &Path,
139        db_config: &DBConfig,
140        ancient: Option<PathBuf>,
141        async_handle: Handle,
142        consensus: Consensus,
143    ) -> Result<SharedBuilder, ExitCode> {
144        let db = open_or_create_db(
145            bin_name,
146            root_dir,
147            db_config,
148            consensus.hardfork_switch.clone(),
149        )?;
150
151        Ok(SharedBuilder {
152            db,
153            ancient_path: ancient,
154            consensus,
155            tx_pool_config: None,
156            notify_config: None,
157            store_config: None,
158            sync_config: None,
159            block_assembler_config: None,
160            async_handle,
161            fee_estimator_config: None,
162            header_map_tmp_dir: None,
163        })
164    }
165
166    /// Generates the SharedBuilder with temp db
167    /// NOTICE: this is only used in testing
168    pub fn with_temp_db() -> Self {
169        use std::{
170            borrow::Borrow,
171            sync::atomic::{AtomicUsize, Ordering},
172        };
173
174        // once #[thread_local] is stable
175        // #[thread_local]
176        // static RUNTIME_HANDLE: unsync::OnceCell<...
177        thread_local! {
178            // NOTICE:we can't put the runtime directly into thread_local here,
179            // on windows the runtime in thread_local will get stuck when dropping
180            static RUNTIME_HANDLE: std::cell::OnceCell<Handle> = const { std::cell::OnceCell::new() };
181        }
182
183        static DB_COUNT: AtomicUsize = AtomicUsize::new(0);
184        static TMP_DIR: std::sync::OnceLock<TempDir> = std::sync::OnceLock::new();
185
186        let db = {
187            let db_id = DB_COUNT.fetch_add(1, Ordering::SeqCst);
188            let db_base_dir = TMP_DIR
189                .borrow()
190                .get_or_init(|| TempDir::new().unwrap())
191                .path()
192                .to_path_buf();
193            let db_dir = db_base_dir.join(format!("db_{db_id}"));
194            RocksDB::open_in(db_dir, COLUMNS)
195        };
196
197        RUNTIME_HANDLE.with(|runtime| SharedBuilder {
198            db,
199            ancient_path: None,
200            consensus: Consensus::default(),
201            tx_pool_config: None,
202            notify_config: None,
203            store_config: None,
204            sync_config: None,
205            block_assembler_config: None,
206            async_handle: runtime.get_or_init(new_background_runtime).clone(),
207            fee_estimator_config: None,
208
209            header_map_tmp_dir: None,
210        })
211    }
212}
213
214impl SharedBuilder {
215    /// TODO(doc): @quake
216    pub fn consensus(mut self, value: Consensus) -> Self {
217        self.consensus = value;
218        self
219    }
220
221    /// TODO(doc): @quake
222    pub fn tx_pool_config(mut self, config: TxPoolConfig) -> Self {
223        self.tx_pool_config = Some(config);
224        self
225    }
226
227    /// TODO(doc): @quake
228    pub fn notify_config(mut self, config: NotifyConfig) -> Self {
229        self.notify_config = Some(config);
230        self
231    }
232
233    /// TODO(doc): @quake
234    pub fn store_config(mut self, config: StoreConfig) -> Self {
235        self.store_config = Some(config);
236        self
237    }
238
239    /// TODO(doc): @eval-exec
240    pub fn sync_config(mut self, config: SyncConfig) -> Self {
241        self.sync_config = Some(config);
242        self
243    }
244
245    /// TODO(doc): @eval-exec
246    pub fn header_map_tmp_dir(mut self, header_map_tmp_dir: Option<PathBuf>) -> Self {
247        self.header_map_tmp_dir = header_map_tmp_dir;
248        self
249    }
250
251    /// TODO(doc): @quake
252    pub fn block_assembler_config(mut self, config: Option<BlockAssemblerConfig>) -> Self {
253        self.block_assembler_config = config;
254        self
255    }
256
257    /// Sets the configuration for the fee estimator.
258    pub fn fee_estimator_config(mut self, config: FeeEstimatorConfig) -> Self {
259        self.fee_estimator_config = Some(config);
260        self
261    }
262
263    /// specifies the async_handle for the shared
264    pub fn async_handle(mut self, async_handle: Handle) -> Self {
265        self.async_handle = async_handle;
266        self
267    }
268
269    fn init_proposal_table(
270        store: &ChainDB,
271        consensus: &Consensus,
272    ) -> (ProposalTable, ProposalView) {
273        let proposal_window = consensus.tx_proposal_window();
274        let tip_number = store.get_tip_header().expect("store inited").number();
275        let mut proposal_ids = ProposalTable::new(proposal_window);
276        let proposal_start = tip_number.saturating_sub(proposal_window.farthest());
277        for bn in proposal_start..=tip_number {
278            if let Some(hash) = store.get_block_hash(bn) {
279                let mut ids_set = HashSet::new();
280                if let Some(ids) = store.get_block_proposal_txs_ids(&hash) {
281                    ids_set.extend(ids)
282                }
283
284                if let Some(us) = store.get_block_uncles(&hash) {
285                    for u in us.data().into_iter() {
286                        ids_set.extend(u.proposals().into_iter());
287                    }
288                }
289                proposal_ids.insert(bn, ids_set);
290            }
291        }
292        let dummy_proposals = ProposalView::default();
293        let (_, proposals) = proposal_ids.finalize(&dummy_proposals, tip_number);
294        (proposal_ids, proposals)
295    }
296
297    fn init_store(store: &ChainDB, consensus: &Consensus) -> Result<(HeaderView, EpochExt), Error> {
298        match store
299            .get_tip_header()
300            .and_then(|header| store.get_current_epoch_ext().map(|epoch| (header, epoch)))
301        {
302            Some((tip_header, epoch)) => {
303                if let Some(genesis_hash) = store.get_block_hash(0) {
304                    let expect_genesis_hash = consensus.genesis_hash();
305                    if genesis_hash == expect_genesis_hash {
306                        Ok((tip_header, epoch))
307                    } else {
308                        Err(SpecError::GenesisMismatch {
309                            expected: expect_genesis_hash,
310                            actual: genesis_hash,
311                        }
312                        .into())
313                    }
314                } else {
315                    Err(InternalErrorKind::Database
316                        .other("genesis does not exist in database")
317                        .into())
318                }
319            }
320            None => store.init(consensus).map(|_| {
321                (
322                    consensus.genesis_block().header(),
323                    consensus.genesis_epoch_ext().to_owned(),
324                )
325            }),
326        }
327    }
328
329    fn init_snapshot(
330        store: &ChainDB,
331        consensus: Arc<Consensus>,
332    ) -> Result<(Snapshot, ProposalTable), Error> {
333        let (tip_header, epoch) = Self::init_store(store, &consensus)?;
334        let total_difficulty = store
335            .get_block_ext(&tip_header.hash())
336            .ok_or_else(|| InternalErrorKind::Database.other("failed to get tip's block_ext"))?
337            .total_difficulty;
338        let (proposal_table, proposal_view) = Self::init_proposal_table(store, &consensus);
339
340        let snapshot = Snapshot::new(
341            tip_header,
342            total_difficulty,
343            epoch,
344            store.get_snapshot(),
345            proposal_view,
346            consensus,
347        );
348
349        Ok((snapshot, proposal_table))
350    }
351
352    /// TODO(doc): @quake
353    pub fn build(self) -> Result<(Shared, SharedPackage), ExitCode> {
354        let SharedBuilder {
355            db,
356            ancient_path,
357            consensus,
358            tx_pool_config,
359            store_config,
360            sync_config,
361            block_assembler_config,
362            notify_config,
363            async_handle,
364            fee_estimator_config,
365            header_map_tmp_dir,
366        } = self;
367
368        let tx_pool_config = tx_pool_config.unwrap_or_default();
369        let notify_config = notify_config.unwrap_or_default();
370        let store_config = store_config.unwrap_or_default();
371        let sync_config = sync_config.unwrap_or_default();
372        let consensus = Arc::new(consensus);
373
374        let header_map_memory_limit = sync_config.header_map.memory_limit.as_u64() as usize;
375
376        let ibd_finished = Arc::new(AtomicBool::new(false));
377
378        let header_map = Arc::new(HeaderMap::new(
379            header_map_tmp_dir,
380            header_map_memory_limit,
381            &async_handle,
382            Arc::clone(&ibd_finished),
383        ));
384
385        let notify_controller = start_notify_service(notify_config, async_handle.clone());
386
387        let store = build_store(db, store_config, ancient_path).map_err(|e| {
388            eprintln!("build_store {e}");
389            ExitCode::Failure
390        })?;
391
392        let txs_verify_cache = Arc::new(TokioRwLock::new(init_cache()));
393
394        let (snapshot, table) =
395            Self::init_snapshot(&store, Arc::clone(&consensus)).map_err(|e| {
396                eprintln!("init_snapshot {e}");
397                ExitCode::Failure
398            })?;
399        let snapshot = Arc::new(snapshot);
400        let snapshot_mgr = Arc::new(SnapshotMgr::new(Arc::clone(&snapshot)));
401
402        let (sender, receiver) = ckb_channel::unbounded();
403
404        let fee_estimator_algo = fee_estimator_config
405            .map(|config| config.algorithm)
406            .unwrap_or(None);
407        let fee_estimator = match fee_estimator_algo {
408            Some(FeeEstimatorAlgo::WeightUnitsFlow) => FeeEstimator::new_weight_units_flow(),
409            Some(FeeEstimatorAlgo::ConfirmationFraction) => {
410                FeeEstimator::new_confirmation_fraction()
411            }
412            None => FeeEstimator::new_dummy(),
413        };
414
415        let (mut tx_pool_builder, tx_pool_controller) = TxPoolServiceBuilder::new(
416            tx_pool_config,
417            Arc::clone(&snapshot),
418            block_assembler_config,
419            Arc::clone(&txs_verify_cache),
420            &async_handle,
421            sender,
422            fee_estimator.clone(),
423        );
424
425        register_tx_pool_callback(
426            &mut tx_pool_builder,
427            notify_controller.clone(),
428            fee_estimator,
429        );
430
431        let block_status_map = Arc::new(DashMap::new());
432
433        let assume_valid_targets = Arc::new(Mutex::new({
434            let not_exists_targets: Option<Vec<H256>> =
435                sync_config.assume_valid_targets.clone().map(|targets| {
436                    targets
437                        .iter()
438                        .filter(|&target_hash| {
439                            let exists = snapshot.block_exists(&target_hash.into());
440                            if exists {
441                                info!("assume-valid target 0x{} exists in local db", target_hash);
442                            }
443                            !exists
444                        })
445                        .cloned()
446                        .collect::<Vec<H256>>()
447                });
448
449            if not_exists_targets
450                .as_ref()
451                .is_some_and(|targets| targets.is_empty())
452            {
453                info!("all assume-valid targets synchronized, enter full verification mode");
454                None
455            } else {
456                not_exists_targets
457            }
458        }));
459
460        let assume_valid_target_specified: Arc<Option<H256>> = Arc::new(
461            sync_config
462                .assume_valid_targets
463                .and_then(|targets| targets.last().cloned()),
464        );
465
466        let shared = Shared::new(
467            store,
468            tx_pool_controller,
469            notify_controller,
470            txs_verify_cache,
471            consensus,
472            snapshot_mgr,
473            async_handle,
474            ibd_finished,
475            assume_valid_targets,
476            assume_valid_target_specified,
477            header_map,
478            block_status_map,
479        );
480
481        let chain_services_builder = ChainServicesBuilder::new(shared.clone(), table);
482
483        let pack = SharedPackage {
484            chain_services_builder: Some(chain_services_builder),
485            tx_pool_builder: Some(tx_pool_builder),
486            relay_tx_receiver: Some(receiver),
487        };
488
489        Ok((shared, pack))
490    }
491}
492
493/// SharedBuilder build returning the shared/package halves
494/// The package structs used for init other component
495pub struct SharedPackage {
496    chain_services_builder: Option<ChainServicesBuilder>,
497    tx_pool_builder: Option<TxPoolServiceBuilder>,
498    relay_tx_receiver: Option<Receiver<TxVerificationResult>>,
499}
500
501impl SharedPackage {
502    /// Takes the chain_services_builder out of the package, leaving a None in its place.
503    pub fn take_chain_services_builder(&mut self) -> ChainServicesBuilder {
504        self.chain_services_builder
505            .take()
506            .expect("take chain_services_builder")
507    }
508
509    /// Takes the tx_pool_builder out of the package, leaving a None in its place.
510    pub fn take_tx_pool_builder(&mut self) -> TxPoolServiceBuilder {
511        self.tx_pool_builder.take().expect("take tx_pool_builder")
512    }
513
514    /// Takes the relay_tx_receiver out of the package, leaving a None in its place.
515    pub fn take_relay_tx_receiver(&mut self) -> Receiver<TxVerificationResult> {
516        self.relay_tx_receiver
517            .take()
518            .expect("take relay_tx_receiver")
519    }
520}
521
522fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController {
523    NotifyService::new(notify_config, handle).start()
524}
525
526fn build_store(
527    db: RocksDB,
528    store_config: StoreConfig,
529    ancient_path: Option<PathBuf>,
530) -> Result<ChainDB, Error> {
531    let store = if store_config.freezer_enable && ancient_path.is_some() {
532        let freezer = Freezer::open(ancient_path.expect("exist checked"))?;
533        ChainDB::new_with_freezer(db, freezer, store_config)
534    } else {
535        ChainDB::new(db, store_config)
536    };
537    Ok(store)
538}
539
540fn register_tx_pool_callback(
541    tx_pool_builder: &mut TxPoolServiceBuilder,
542    notify: NotifyController,
543    fee_estimator: FeeEstimator,
544) {
545    let notify_pending = notify.clone();
546
547    let tx_relay_sender = tx_pool_builder.tx_relay_sender();
548    let create_notify_entry = |entry: &TxEntry| PoolTransactionEntry {
549        transaction: entry.rtx.transaction.clone(),
550        cycles: entry.cycles,
551        size: entry.size,
552        fee: entry.fee,
553        timestamp: entry.timestamp,
554    };
555
556    let fee_estimator_clone = fee_estimator.clone();
557    tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
558        // notify
559        let notify_tx_entry = create_notify_entry(entry);
560        notify_pending.notify_new_transaction(notify_tx_entry);
561        let tx_hash = entry.transaction().hash();
562        let entry_info = entry.to_info();
563        fee_estimator_clone.accept_tx(tx_hash, entry_info);
564    }));
565
566    let notify_proposed = notify.clone();
567    tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
568        // notify
569        let notify_tx_entry = create_notify_entry(entry);
570        notify_proposed.notify_proposed_transaction(notify_tx_entry);
571    }));
572
573    let notify_reject = notify;
574    tx_pool_builder.register_reject(Box::new(
575        move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
576            let tx_hash = entry.transaction().hash();
577            // record recent reject
578            if reject.should_recorded() {
579                if let Some(ref mut recent_reject) = tx_pool.recent_reject {
580                    if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
581                        error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
582                    }
583                }
584            }
585
586            if reject.is_allowed_relay() {
587                if let Err(e) = tx_relay_sender.send(TxVerificationResult::Reject {
588                    tx_hash: tx_hash.clone(),
589                }) {
590                    error!("tx-pool tx_relay_sender internal error {}", e);
591                }
592            }
593
594            // notify
595            let notify_tx_entry = create_notify_entry(entry);
596            notify_reject.notify_reject_transaction(notify_tx_entry, reject);
597
598            // fee estimator
599            fee_estimator.reject_tx(&tx_hash);
600        },
601    ));
602}