ckb_shared/
shared_builder.rs

1//! shared_builder provide SharedBuilder and SharedPacakge
2use crate::ChainServicesBuilder;
3use crate::{HeaderMap, Shared};
4use ckb_app_config::{
5    BlockAssemblerConfig, DBConfig, ExitCode, FeeEstimatorAlgo, FeeEstimatorConfig, NotifyConfig,
6    StoreConfig, SyncConfig, TxPoolConfig,
7};
8use ckb_async_runtime::{Handle, new_background_runtime};
9use ckb_chain_spec::SpecError;
10use ckb_chain_spec::consensus::Consensus;
11use ckb_channel::Receiver;
12use ckb_db::RocksDB;
13use ckb_db_schema::COLUMNS;
14use ckb_error::{Error, InternalErrorKind};
15use ckb_fee_estimator::FeeEstimator;
16use ckb_logger::{error, info};
17use ckb_migrate::migrate::Migrate;
18use ckb_notify::{NotifyController, NotifyService};
19use ckb_proposal_table::ProposalTable;
20use ckb_proposal_table::ProposalView;
21use ckb_snapshot::{Snapshot, SnapshotMgr};
22use ckb_store::{ChainDB, ChainStore, Freezer};
23use ckb_tx_pool::{
24    TokioRwLock, TxEntry, TxPool, TxPoolServiceBuilder, service::TxVerificationResult,
25};
26use ckb_types::H256;
27use ckb_types::core::hardfork::HardForks;
28use ckb_types::prelude::Pack;
29use ckb_types::{
30    core::EpochExt, core::HeaderView, core::service::PoolTransactionEntry, core::tx_pool::Reject,
31};
32use ckb_util::Mutex;
33use ckb_verification::cache::init_cache;
34use dashmap::DashMap;
35use std::cmp::Ordering;
36use std::collections::HashSet;
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::sync::atomic::AtomicBool;
40use tempfile::TempDir;
41
42/// Shared builder for construct new shared.
43pub struct SharedBuilder {
44    db: RocksDB,
45    ancient_path: Option<PathBuf>,
46    consensus: Consensus,
47    tx_pool_config: Option<TxPoolConfig>,
48    store_config: Option<StoreConfig>,
49    sync_config: Option<SyncConfig>,
50    block_assembler_config: Option<BlockAssemblerConfig>,
51    notify_config: Option<NotifyConfig>,
52    async_handle: Handle,
53    fee_estimator_config: Option<FeeEstimatorConfig>,
54
55    header_map_tmp_dir: Option<PathBuf>,
56}
57
58/// Open or create a rocksdb
59pub fn open_or_create_db(
60    bin_name: &str,
61    root_dir: &Path,
62    config: &DBConfig,
63    hardforks: HardForks,
64) -> Result<RocksDB, ExitCode> {
65    let migrate = Migrate::new(&config.path, hardforks);
66
67    let read_only_db = migrate.open_read_only_db().map_err(|e| {
68        eprintln!("Migration error {e}");
69        ExitCode::Failure
70    })?;
71
72    if let Some(db) = read_only_db {
73        match migrate.check(&db, true) {
74            Ordering::Greater => {
75                eprintln!(
76                    "The database was created by a higher version CKB executable binary \n\
77                     and cannot be opened by the current binary.\n\
78                     Please download the latest CKB executable binary."
79                );
80                Err(ExitCode::Failure)
81            }
82            Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)),
83            Ordering::Less => {
84                let can_run_in_background = migrate.can_run_in_background(&db);
85                if migrate.require_expensive(&db, false) && !can_run_in_background {
86                    eprintln!(
87                        "For optimal performance, CKB recommends migrating your data into a new format.\n\
88                        If you prefer to stick with the older version, \n\
89                        it's important to note that they may have unfixed vulnerabilities.\n\
90                        Before migrating, we strongly recommend backuping your data directory.\n\
91                        To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".",
92                        bin_name,
93                        root_dir.display()
94                    );
95                    Err(ExitCode::Failure)
96                } else if can_run_in_background {
97                    info!("process migrations in background ...");
98                    let db = RocksDB::open(config, COLUMNS);
99                    migrate.migrate(db.clone(), true).map_err(|err| {
100                        eprintln!("Run error: {err:?}");
101                        ExitCode::Failure
102                    })?;
103                    Ok(db)
104                } else {
105                    info!("Processing fast migrations ...");
106
107                    let bulk_load_db_db = migrate.open_bulk_load_db().map_err(|e| {
108                        eprintln!("Migration error {e}");
109                        ExitCode::Failure
110                    })?;
111
112                    if let Some(db) = bulk_load_db_db {
113                        migrate.migrate(db, false).map_err(|err| {
114                            eprintln!("Run error: {err:?}");
115                            ExitCode::Failure
116                        })?;
117                    }
118
119                    Ok(RocksDB::open(config, COLUMNS))
120                }
121            }
122        }
123    } else {
124        let db = RocksDB::open(config, COLUMNS);
125        migrate.init_db_version(&db).map_err(|e| {
126            eprintln!("Migrate init_db_version error {e}");
127            ExitCode::Failure
128        })?;
129        Ok(db)
130    }
131}
132
133impl SharedBuilder {
134    /// Generates the base SharedBuilder with ancient path and async_handle
135    pub fn new(
136        bin_name: &str,
137        root_dir: &Path,
138        db_config: &DBConfig,
139        ancient: Option<PathBuf>,
140        async_handle: Handle,
141        consensus: Consensus,
142    ) -> Result<SharedBuilder, ExitCode> {
143        let db = open_or_create_db(
144            bin_name,
145            root_dir,
146            db_config,
147            consensus.hardfork_switch.clone(),
148        )?;
149
150        Ok(SharedBuilder {
151            db,
152            ancient_path: ancient,
153            consensus,
154            tx_pool_config: None,
155            notify_config: None,
156            store_config: None,
157            sync_config: None,
158            block_assembler_config: None,
159            async_handle,
160            fee_estimator_config: None,
161            header_map_tmp_dir: None,
162        })
163    }
164
165    /// Generates the SharedBuilder with temp db
166    /// NOTICE: this is only used in testing
167    pub fn with_temp_db() -> Self {
168        use std::{
169            borrow::Borrow,
170            sync::atomic::{AtomicUsize, Ordering},
171        };
172
173        // once #[thread_local] is stable
174        // #[thread_local]
175        // static RUNTIME_HANDLE: unsync::OnceCell<...
176        thread_local! {
177            // NOTICE:we can't put the runtime directly into thread_local here,
178            // on windows the runtime in thread_local will get stuck when dropping
179            static RUNTIME_HANDLE: std::cell::OnceCell<Handle> = const { std::cell::OnceCell::new() };
180        }
181
182        static DB_COUNT: AtomicUsize = AtomicUsize::new(0);
183        static TMP_DIR: std::sync::OnceLock<TempDir> = std::sync::OnceLock::new();
184
185        let db = {
186            let db_id = DB_COUNT.fetch_add(1, Ordering::SeqCst);
187            let db_base_dir = TMP_DIR
188                .borrow()
189                .get_or_init(|| TempDir::new().unwrap())
190                .path()
191                .to_path_buf();
192            let db_dir = db_base_dir.join(format!("db_{db_id}"));
193            RocksDB::open_in(db_dir, COLUMNS)
194        };
195
196        RUNTIME_HANDLE.with(|runtime| SharedBuilder {
197            db,
198            ancient_path: None,
199            consensus: Consensus::default(),
200            tx_pool_config: None,
201            notify_config: None,
202            store_config: None,
203            sync_config: None,
204            block_assembler_config: None,
205            async_handle: runtime.get_or_init(new_background_runtime).clone(),
206            fee_estimator_config: None,
207
208            header_map_tmp_dir: None,
209        })
210    }
211}
212
213impl SharedBuilder {
214    /// TODO(doc): @quake
215    pub fn consensus(mut self, value: Consensus) -> Self {
216        self.consensus = value;
217        self
218    }
219
220    /// TODO(doc): @quake
221    pub fn tx_pool_config(mut self, config: TxPoolConfig) -> Self {
222        self.tx_pool_config = Some(config);
223        self
224    }
225
226    /// TODO(doc): @quake
227    pub fn notify_config(mut self, config: NotifyConfig) -> Self {
228        self.notify_config = Some(config);
229        self
230    }
231
232    /// TODO(doc): @quake
233    pub fn store_config(mut self, config: StoreConfig) -> Self {
234        self.store_config = Some(config);
235        self
236    }
237
238    /// TODO(doc): @eval-exec
239    pub fn sync_config(mut self, config: SyncConfig) -> Self {
240        self.sync_config = Some(config);
241        self
242    }
243
244    /// TODO(doc): @eval-exec
245    pub fn header_map_tmp_dir(mut self, header_map_tmp_dir: Option<PathBuf>) -> Self {
246        self.header_map_tmp_dir = header_map_tmp_dir;
247        self
248    }
249
250    /// TODO(doc): @quake
251    pub fn block_assembler_config(mut self, config: Option<BlockAssemblerConfig>) -> Self {
252        self.block_assembler_config = config;
253        self
254    }
255
256    /// Sets the configuration for the fee estimator.
257    pub fn fee_estimator_config(mut self, config: FeeEstimatorConfig) -> Self {
258        self.fee_estimator_config = Some(config);
259        self
260    }
261
262    /// specifies the async_handle for the shared
263    pub fn async_handle(mut self, async_handle: Handle) -> Self {
264        self.async_handle = async_handle;
265        self
266    }
267
268    fn init_proposal_table(
269        store: &ChainDB,
270        consensus: &Consensus,
271    ) -> (ProposalTable, ProposalView) {
272        let proposal_window = consensus.tx_proposal_window();
273        let tip_number = store.get_tip_header().expect("store inited").number();
274        let mut proposal_ids = ProposalTable::new(proposal_window);
275        let proposal_start = tip_number.saturating_sub(proposal_window.farthest());
276        for bn in proposal_start..=tip_number {
277            if let Some(hash) = store.get_block_hash(bn) {
278                let mut ids_set = HashSet::new();
279                if let Some(ids) = store.get_block_proposal_txs_ids(&hash) {
280                    ids_set.extend(ids)
281                }
282
283                if let Some(us) = store.get_block_uncles(&hash) {
284                    for u in us.data().into_iter() {
285                        ids_set.extend(u.proposals().into_iter());
286                    }
287                }
288                proposal_ids.insert(bn, ids_set);
289            }
290        }
291        let dummy_proposals = ProposalView::default();
292        let (_, proposals) = proposal_ids.finalize(&dummy_proposals, tip_number);
293        (proposal_ids, proposals)
294    }
295
296    fn init_store(store: &ChainDB, consensus: &Consensus) -> Result<(HeaderView, EpochExt), Error> {
297        match store
298            .get_tip_header()
299            .and_then(|header| store.get_current_epoch_ext().map(|epoch| (header, epoch)))
300        {
301            Some((tip_header, epoch)) => {
302                if let Some(genesis_hash) = store.get_block_hash(0) {
303                    let expect_genesis_hash = consensus.genesis_hash();
304                    if genesis_hash == expect_genesis_hash {
305                        Ok((tip_header, epoch))
306                    } else {
307                        Err(SpecError::GenesisMismatch {
308                            expected: expect_genesis_hash,
309                            actual: genesis_hash,
310                        }
311                        .into())
312                    }
313                } else {
314                    Err(InternalErrorKind::Database
315                        .other("genesis does not exist in database")
316                        .into())
317                }
318            }
319            None => store.init(consensus).map(|_| {
320                (
321                    consensus.genesis_block().header(),
322                    consensus.genesis_epoch_ext().to_owned(),
323                )
324            }),
325        }
326    }
327
328    fn init_snapshot(
329        store: &ChainDB,
330        consensus: Arc<Consensus>,
331    ) -> Result<(Snapshot, ProposalTable), Error> {
332        let (tip_header, epoch) = Self::init_store(store, &consensus)?;
333        let total_difficulty = store
334            .get_block_ext(&tip_header.hash())
335            .ok_or_else(|| InternalErrorKind::Database.other("failed to get tip's block_ext"))?
336            .total_difficulty;
337        let (proposal_table, proposal_view) = Self::init_proposal_table(store, &consensus);
338
339        let snapshot = Snapshot::new(
340            tip_header,
341            total_difficulty,
342            epoch,
343            store.get_snapshot(),
344            proposal_view,
345            consensus,
346        );
347
348        Ok((snapshot, proposal_table))
349    }
350
351    /// TODO(doc): @quake
352    pub fn build(self) -> Result<(Shared, SharedPackage), ExitCode> {
353        let SharedBuilder {
354            db,
355            ancient_path,
356            consensus,
357            tx_pool_config,
358            store_config,
359            sync_config,
360            block_assembler_config,
361            notify_config,
362            async_handle,
363            fee_estimator_config,
364            header_map_tmp_dir,
365        } = self;
366
367        let tx_pool_config = tx_pool_config.unwrap_or_default();
368        let notify_config = notify_config.unwrap_or_default();
369        let store_config = store_config.unwrap_or_default();
370        let sync_config = sync_config.unwrap_or_default();
371        let consensus = Arc::new(consensus);
372
373        let header_map_memory_limit = sync_config.header_map.memory_limit.as_u64() as usize;
374
375        let ibd_finished = Arc::new(AtomicBool::new(false));
376
377        let header_map = Arc::new(HeaderMap::new(
378            header_map_tmp_dir,
379            header_map_memory_limit,
380            &async_handle,
381            Arc::clone(&ibd_finished),
382        ));
383
384        let notify_controller = start_notify_service(notify_config, async_handle.clone());
385
386        let store = build_store(db, store_config, ancient_path).map_err(|e| {
387            eprintln!("build_store {e}");
388            ExitCode::Failure
389        })?;
390
391        let txs_verify_cache = Arc::new(TokioRwLock::new(init_cache()));
392
393        let (snapshot, table) =
394            Self::init_snapshot(&store, Arc::clone(&consensus)).map_err(|e| {
395                eprintln!("init_snapshot {e}");
396                ExitCode::Failure
397            })?;
398        let snapshot = Arc::new(snapshot);
399        let snapshot_mgr = Arc::new(SnapshotMgr::new(Arc::clone(&snapshot)));
400
401        let (sender, receiver) = ckb_channel::unbounded();
402
403        let fee_estimator_algo = fee_estimator_config
404            .map(|config| config.algorithm)
405            .unwrap_or(None);
406        let fee_estimator = match fee_estimator_algo {
407            Some(FeeEstimatorAlgo::WeightUnitsFlow) => FeeEstimator::new_weight_units_flow(),
408            Some(FeeEstimatorAlgo::ConfirmationFraction) => {
409                FeeEstimator::new_confirmation_fraction()
410            }
411            None => FeeEstimator::new_dummy(),
412        };
413
414        let (mut tx_pool_builder, tx_pool_controller) = TxPoolServiceBuilder::new(
415            tx_pool_config,
416            Arc::clone(&snapshot),
417            block_assembler_config,
418            Arc::clone(&txs_verify_cache),
419            &async_handle,
420            sender,
421            fee_estimator.clone(),
422        );
423
424        register_tx_pool_callback(
425            &mut tx_pool_builder,
426            notify_controller.clone(),
427            fee_estimator,
428        );
429
430        let block_status_map = Arc::new(DashMap::new());
431
432        let assume_valid_targets = Arc::new(Mutex::new({
433            let not_exists_targets: Option<Vec<H256>> =
434                sync_config.assume_valid_targets.clone().map(|targets| {
435                    targets
436                        .iter()
437                        .filter(|&target_hash| {
438                            let exists = snapshot.block_exists(&target_hash.pack());
439                            if exists {
440                                info!("assume-valid target 0x{} exists in local db", target_hash);
441                            }
442                            !exists
443                        })
444                        .cloned()
445                        .collect::<Vec<H256>>()
446                });
447
448            if not_exists_targets
449                .as_ref()
450                .is_some_and(|targets| targets.is_empty())
451            {
452                info!("all assume-valid targets synchronized, enter full verification mode");
453                None
454            } else {
455                not_exists_targets
456            }
457        }));
458
459        let assume_valid_target_specified: Arc<Option<H256>> = Arc::new(
460            sync_config
461                .assume_valid_targets
462                .and_then(|targets| targets.last().cloned()),
463        );
464
465        let shared = Shared::new(
466            store,
467            tx_pool_controller,
468            notify_controller,
469            txs_verify_cache,
470            consensus,
471            snapshot_mgr,
472            async_handle,
473            ibd_finished,
474            assume_valid_targets,
475            assume_valid_target_specified,
476            header_map,
477            block_status_map,
478        );
479
480        let chain_services_builder = ChainServicesBuilder::new(shared.clone(), table);
481
482        let pack = SharedPackage {
483            chain_services_builder: Some(chain_services_builder),
484            tx_pool_builder: Some(tx_pool_builder),
485            relay_tx_receiver: Some(receiver),
486        };
487
488        Ok((shared, pack))
489    }
490}
491
492/// SharedBuilder build returning the shared/package halves
493/// The package structs used for init other component
494pub struct SharedPackage {
495    chain_services_builder: Option<ChainServicesBuilder>,
496    tx_pool_builder: Option<TxPoolServiceBuilder>,
497    relay_tx_receiver: Option<Receiver<TxVerificationResult>>,
498}
499
500impl SharedPackage {
501    /// Takes the chain_services_builder out of the package, leaving a None in its place.
502    pub fn take_chain_services_builder(&mut self) -> ChainServicesBuilder {
503        self.chain_services_builder
504            .take()
505            .expect("take chain_services_builder")
506    }
507
508    /// Takes the tx_pool_builder out of the package, leaving a None in its place.
509    pub fn take_tx_pool_builder(&mut self) -> TxPoolServiceBuilder {
510        self.tx_pool_builder.take().expect("take tx_pool_builder")
511    }
512
513    /// Takes the relay_tx_receiver out of the package, leaving a None in its place.
514    pub fn take_relay_tx_receiver(&mut self) -> Receiver<TxVerificationResult> {
515        self.relay_tx_receiver
516            .take()
517            .expect("take relay_tx_receiver")
518    }
519}
520
521fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController {
522    NotifyService::new(notify_config, handle).start()
523}
524
525fn build_store(
526    db: RocksDB,
527    store_config: StoreConfig,
528    ancient_path: Option<PathBuf>,
529) -> Result<ChainDB, Error> {
530    let store = if store_config.freezer_enable && ancient_path.is_some() {
531        let freezer = Freezer::open(ancient_path.expect("exist checked"))?;
532        ChainDB::new_with_freezer(db, freezer, store_config)
533    } else {
534        ChainDB::new(db, store_config)
535    };
536    Ok(store)
537}
538
539fn register_tx_pool_callback(
540    tx_pool_builder: &mut TxPoolServiceBuilder,
541    notify: NotifyController,
542    fee_estimator: FeeEstimator,
543) {
544    let notify_pending = notify.clone();
545
546    let tx_relay_sender = tx_pool_builder.tx_relay_sender();
547    let create_notify_entry = |entry: &TxEntry| PoolTransactionEntry {
548        transaction: entry.rtx.transaction.clone(),
549        cycles: entry.cycles,
550        size: entry.size,
551        fee: entry.fee,
552        timestamp: entry.timestamp,
553    };
554
555    let fee_estimator_clone = fee_estimator.clone();
556    tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
557        // notify
558        let notify_tx_entry = create_notify_entry(entry);
559        notify_pending.notify_new_transaction(notify_tx_entry);
560        let tx_hash = entry.transaction().hash();
561        let entry_info = entry.to_info();
562        fee_estimator_clone.accept_tx(tx_hash, entry_info);
563    }));
564
565    let notify_proposed = notify.clone();
566    tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
567        // notify
568        let notify_tx_entry = create_notify_entry(entry);
569        notify_proposed.notify_proposed_transaction(notify_tx_entry);
570    }));
571
572    let notify_reject = notify;
573    tx_pool_builder.register_reject(Box::new(
574        move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
575            let tx_hash = entry.transaction().hash();
576            // record recent reject
577            if reject.should_recorded() {
578                if let Some(ref mut recent_reject) = tx_pool.recent_reject {
579                    if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
580                        error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
581                    }
582                }
583            }
584
585            if reject.is_allowed_relay() {
586                if let Err(e) = tx_relay_sender.send(TxVerificationResult::Reject {
587                    tx_hash: tx_hash.clone(),
588                }) {
589                    error!("tx-pool tx_relay_sender internal error {}", e);
590                }
591            }
592
593            // notify
594            let notify_tx_entry = create_notify_entry(entry);
595            notify_reject.notify_reject_transaction(notify_tx_entry, reject);
596
597            // fee estimator
598            fee_estimator.reject_tx(&tx_hash);
599        },
600    ));
601}