use crate::ChainServicesBuilder;
use crate::{HeaderMap, Shared};
use ckb_app_config::{
    BlockAssemblerConfig, DBConfig, ExitCode, FeeEstimatorAlgo, FeeEstimatorConfig, NotifyConfig,
    StoreConfig, SyncConfig, TxPoolConfig,
};
use ckb_async_runtime::{Handle, new_background_runtime};
use ckb_chain_spec::SpecError;
use ckb_chain_spec::consensus::Consensus;
use ckb_channel::Receiver;
use ckb_db::RocksDB;
use ckb_db_schema::COLUMNS;
use ckb_error::{Error, InternalErrorKind};
use ckb_fee_estimator::FeeEstimator;
use ckb_logger::{error, info};
use ckb_migrate::migrate::Migrate;
use ckb_notify::{NotifyController, NotifyService};
use ckb_proposal_table::ProposalTable;
use ckb_proposal_table::ProposalView;
use ckb_snapshot::{Snapshot, SnapshotMgr};
use ckb_store::{ChainDB, ChainStore, Freezer};
use ckb_tx_pool::{
    TokioRwLock, TxEntry, TxPool, TxPoolServiceBuilder, service::TxVerificationResult,
};
use ckb_types::H256;
use ckb_types::core::hardfork::HardForks;
use ckb_types::prelude::Pack;
use ckb_types::{
    core::EpochExt, core::HeaderView, core::service::PoolTransactionEntry, core::tx_pool::Reject,
};
use ckb_util::Mutex;
use ckb_verification::cache::init_cache;
use dashmap::DashMap;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tempfile::TempDir;

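/// Builder for [`Shared`]: collects the database, consensus, configs, and async runtime
/// handle, then assembles the shared chain state plus a [`SharedPackage`] via
/// [`SharedBuilder::build`].
///
/// A minimal usage sketch; the paths, configs, and spec below are illustrative
/// assumptions, not recommended defaults:
///
/// ```ignore
/// // Sketch only: every concrete value here is an assumption for illustration.
/// let async_handle = new_background_runtime();
/// let builder = SharedBuilder::new(
///     "ckb",
///     Path::new("/tmp/ckb-data"),
///     &DBConfig::default(),
///     None, // no ancient (freezer) path
///     async_handle,
///     Consensus::default(),
/// )?;
/// let (shared, mut pack) = builder.tx_pool_config(TxPoolConfig::default()).build()?;
/// let chain_services_builder = pack.take_chain_services_builder();
/// ```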
pub struct SharedBuilder {
    db: RocksDB,
    ancient_path: Option<PathBuf>,
    consensus: Consensus,
    tx_pool_config: Option<TxPoolConfig>,
    store_config: Option<StoreConfig>,
    sync_config: Option<SyncConfig>,
    block_assembler_config: Option<BlockAssemblerConfig>,
    notify_config: Option<NotifyConfig>,
    async_handle: Handle,
    fee_estimator_config: Option<FeeEstimatorConfig>,

    header_map_tmp_dir: Option<PathBuf>,
}

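/// Opens the RocksDB at `config.path`, creating it and initializing its version record
/// if it does not exist yet. For an existing database the on-disk version is compared
/// against the current binary: a newer version is rejected, an equal version is opened
/// directly, and an older version is migrated first (in the background when supported,
/// in-process for fast migrations, or by asking the user to run `migrate` manually when
/// an expensive migration cannot run in the background).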
pub fn open_or_create_db(
    bin_name: &str,
    root_dir: &Path,
    config: &DBConfig,
    hardforks: HardForks,
) -> Result<RocksDB, ExitCode> {
    let migrate = Migrate::new(&config.path, hardforks);

    let read_only_db = migrate.open_read_only_db().map_err(|e| {
        eprintln!("Migration error {e}");
        ExitCode::Failure
    })?;

    if let Some(db) = read_only_db {
        match migrate.check(&db, true) {
            Ordering::Greater => {
                eprintln!(
                    "The database was created by a newer version of the CKB executable binary\n\
                     and cannot be opened by the current binary.\n\
                     Please download the latest CKB executable binary."
                );
                Err(ExitCode::Failure)
            }
            Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)),
            Ordering::Less => {
                let can_run_in_background = migrate.can_run_in_background(&db);
                if migrate.require_expensive(&db, false) && !can_run_in_background {
                    eprintln!(
                        "For optimal performance, CKB recommends migrating your data into a new format.\n\
                         If you prefer to stick with the older version,\n\
                         it's important to note that it may have unfixed vulnerabilities.\n\
                         Before migrating, we strongly recommend backing up your data directory.\n\
                         To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".",
                        bin_name,
                        root_dir.display()
                    );
                    Err(ExitCode::Failure)
                } else if can_run_in_background {
                    info!("Processing migrations in background ...");
                    let db = RocksDB::open(config, COLUMNS);
                    migrate.migrate(db.clone(), true).map_err(|err| {
                        eprintln!("Run error: {err:?}");
                        ExitCode::Failure
                    })?;
                    Ok(db)
                } else {
                    info!("Processing fast migrations ...");

                    let bulk_load_db = migrate.open_bulk_load_db().map_err(|e| {
                        eprintln!("Migration error {e}");
                        ExitCode::Failure
                    })?;

                    if let Some(db) = bulk_load_db {
                        migrate.migrate(db, false).map_err(|err| {
                            eprintln!("Run error: {err:?}");
                            ExitCode::Failure
                        })?;
                    }

                    Ok(RocksDB::open(config, COLUMNS))
                }
            }
        }
    } else {
        let db = RocksDB::open(config, COLUMNS);
        migrate.init_db_version(&db).map_err(|e| {
            eprintln!("Migrate init_db_version error {e}");
            ExitCode::Failure
        })?;
        Ok(db)
    }
}

impl SharedBuilder {
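    /// Creates a `SharedBuilder` for a real data directory: opens (or creates and
    /// migrates) the RocksDB described by `db_config`, and keeps the consensus,
    /// optional ancient path, and async runtime handle for later use in `build`.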
    pub fn new(
        bin_name: &str,
        root_dir: &Path,
        db_config: &DBConfig,
        ancient: Option<PathBuf>,
        async_handle: Handle,
        consensus: Consensus,
    ) -> Result<SharedBuilder, ExitCode> {
        let db = open_or_create_db(
            bin_name,
            root_dir,
            db_config,
            consensus.hardfork_switch.clone(),
        )?;

        Ok(SharedBuilder {
            db,
            ancient_path: ancient,
            consensus,
            tx_pool_config: None,
            notify_config: None,
            store_config: None,
            sync_config: None,
            block_assembler_config: None,
            async_handle,
            fee_estimator_config: None,
            header_map_tmp_dir: None,
        })
    }

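    /// Creates a `SharedBuilder` backed by a freshly created temporary RocksDB and a
    /// lazily started per-thread background runtime; intended for tests.
    ///
    /// A test-only sketch of the assumed usage:
    ///
    /// ```ignore
    /// let (shared, mut pack) = SharedBuilder::with_temp_db().build().unwrap();
    /// ```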
    pub fn with_temp_db() -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};

        thread_local! {
            static RUNTIME_HANDLE: std::cell::OnceCell<Handle> = const { std::cell::OnceCell::new() };
        }

        static DB_COUNT: AtomicUsize = AtomicUsize::new(0);
        static TMP_DIR: std::sync::OnceLock<TempDir> = std::sync::OnceLock::new();

        let db = {
            let db_id = DB_COUNT.fetch_add(1, Ordering::SeqCst);
            let db_base_dir = TMP_DIR
                .get_or_init(|| TempDir::new().unwrap())
                .path()
                .to_path_buf();
            let db_dir = db_base_dir.join(format!("db_{db_id}"));
            RocksDB::open_in(db_dir, COLUMNS)
        };

        RUNTIME_HANDLE.with(|runtime| SharedBuilder {
            db,
            ancient_path: None,
            consensus: Consensus::default(),
            tx_pool_config: None,
            notify_config: None,
            store_config: None,
            sync_config: None,
            block_assembler_config: None,
            async_handle: runtime.get_or_init(new_background_runtime).clone(),
            fee_estimator_config: None,
            header_map_tmp_dir: None,
        })
    }
}

impl SharedBuilder {
    pub fn consensus(mut self, value: Consensus) -> Self {
        self.consensus = value;
        self
    }

    pub fn tx_pool_config(mut self, config: TxPoolConfig) -> Self {
        self.tx_pool_config = Some(config);
        self
    }

    pub fn notify_config(mut self, config: NotifyConfig) -> Self {
        self.notify_config = Some(config);
        self
    }

    pub fn store_config(mut self, config: StoreConfig) -> Self {
        self.store_config = Some(config);
        self
    }

    pub fn sync_config(mut self, config: SyncConfig) -> Self {
        self.sync_config = Some(config);
        self
    }

    pub fn header_map_tmp_dir(mut self, header_map_tmp_dir: Option<PathBuf>) -> Self {
        self.header_map_tmp_dir = header_map_tmp_dir;
        self
    }

    pub fn block_assembler_config(mut self, config: Option<BlockAssemblerConfig>) -> Self {
        self.block_assembler_config = config;
        self
    }

    pub fn fee_estimator_config(mut self, config: FeeEstimatorConfig) -> Self {
        self.fee_estimator_config = Some(config);
        self
    }

    pub fn async_handle(mut self, async_handle: Handle) -> Self {
        self.async_handle = async_handle;
        self
    }

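    /// Rebuilds the proposal table from the blocks within the tx proposal window below
    /// the current tip (including proposals carried by uncles), and finalizes it into
    /// the current `ProposalView`.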
    fn init_proposal_table(
        store: &ChainDB,
        consensus: &Consensus,
    ) -> (ProposalTable, ProposalView) {
        let proposal_window = consensus.tx_proposal_window();
        let tip_number = store.get_tip_header().expect("store inited").number();
        let mut proposal_ids = ProposalTable::new(proposal_window);
        let proposal_start = tip_number.saturating_sub(proposal_window.farthest());
        for bn in proposal_start..=tip_number {
            if let Some(hash) = store.get_block_hash(bn) {
                let mut ids_set = HashSet::new();
                if let Some(ids) = store.get_block_proposal_txs_ids(&hash) {
                    ids_set.extend(ids)
                }

                if let Some(us) = store.get_block_uncles(&hash) {
                    for u in us.data().into_iter() {
                        ids_set.extend(u.proposals().into_iter());
                    }
                }
                proposal_ids.insert(bn, ids_set);
            }
        }
        let dummy_proposals = ProposalView::default();
        let (_, proposals) = proposal_ids.finalize(&dummy_proposals, tip_number);
        (proposal_ids, proposals)
    }

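    /// Returns the current tip header and epoch from the store, verifying that the
    /// stored genesis hash matches `consensus`; if the store is empty, initializes it
    /// with the genesis block and returns the genesis header and epoch.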
    fn init_store(store: &ChainDB, consensus: &Consensus) -> Result<(HeaderView, EpochExt), Error> {
        match store
            .get_tip_header()
            .and_then(|header| store.get_current_epoch_ext().map(|epoch| (header, epoch)))
        {
            Some((tip_header, epoch)) => {
                if let Some(genesis_hash) = store.get_block_hash(0) {
                    let expect_genesis_hash = consensus.genesis_hash();
                    if genesis_hash == expect_genesis_hash {
                        Ok((tip_header, epoch))
                    } else {
                        Err(SpecError::GenesisMismatch {
                            expected: expect_genesis_hash,
                            actual: genesis_hash,
                        }
                        .into())
                    }
                } else {
                    Err(InternalErrorKind::Database
                        .other("genesis does not exist in database")
                        .into())
                }
            }
            None => store.init(consensus).map(|_| {
                (
                    consensus.genesis_block().header(),
                    consensus.genesis_epoch_ext().to_owned(),
                )
            }),
        }
    }

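    /// Builds the initial chain [`Snapshot`] (tip header, total difficulty, epoch, and
    /// proposal view) from the store, together with the `ProposalTable` that will keep
    /// it up to date.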
    fn init_snapshot(
        store: &ChainDB,
        consensus: Arc<Consensus>,
    ) -> Result<(Snapshot, ProposalTable), Error> {
        let (tip_header, epoch) = Self::init_store(store, &consensus)?;
        let total_difficulty = store
            .get_block_ext(&tip_header.hash())
            .ok_or_else(|| InternalErrorKind::Database.other("failed to get tip's block_ext"))?
            .total_difficulty;
        let (proposal_table, proposal_view) = Self::init_proposal_table(store, &consensus);

        let snapshot = Snapshot::new(
            tip_header,
            total_difficulty,
            epoch,
            store.get_snapshot(),
            proposal_view,
            consensus,
        );

        Ok((snapshot, proposal_table))
    }

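    /// Consumes the builder and assembles the shared state: the chain store (with an
    /// optional freezer), header map, notify service, snapshot manager, fee estimator,
    /// and tx-pool service. Returns the [`Shared`] handle together with a
    /// [`SharedPackage`] holding the pieces the caller must take and start separately.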
    pub fn build(self) -> Result<(Shared, SharedPackage), ExitCode> {
        let SharedBuilder {
            db,
            ancient_path,
            consensus,
            tx_pool_config,
            store_config,
            sync_config,
            block_assembler_config,
            notify_config,
            async_handle,
            fee_estimator_config,
            header_map_tmp_dir,
        } = self;

        let tx_pool_config = tx_pool_config.unwrap_or_default();
        let notify_config = notify_config.unwrap_or_default();
        let store_config = store_config.unwrap_or_default();
        let sync_config = sync_config.unwrap_or_default();
        let consensus = Arc::new(consensus);

        let header_map_memory_limit = sync_config.header_map.memory_limit.as_u64() as usize;

        let ibd_finished = Arc::new(AtomicBool::new(false));

        let header_map = Arc::new(HeaderMap::new(
            header_map_tmp_dir,
            header_map_memory_limit,
            &async_handle,
            Arc::clone(&ibd_finished),
        ));

        let notify_controller = start_notify_service(notify_config, async_handle.clone());

        let store = build_store(db, store_config, ancient_path).map_err(|e| {
            eprintln!("build_store {e}");
            ExitCode::Failure
        })?;

        let txs_verify_cache = Arc::new(TokioRwLock::new(init_cache()));

        let (snapshot, table) =
            Self::init_snapshot(&store, Arc::clone(&consensus)).map_err(|e| {
                eprintln!("init_snapshot {e}");
                ExitCode::Failure
            })?;
        let snapshot = Arc::new(snapshot);
        let snapshot_mgr = Arc::new(SnapshotMgr::new(Arc::clone(&snapshot)));

        let (sender, receiver) = ckb_channel::unbounded();

        let fee_estimator_algo = fee_estimator_config
            .map(|config| config.algorithm)
            .unwrap_or(None);
        let fee_estimator = match fee_estimator_algo {
            Some(FeeEstimatorAlgo::WeightUnitsFlow) => FeeEstimator::new_weight_units_flow(),
            Some(FeeEstimatorAlgo::ConfirmationFraction) => {
                FeeEstimator::new_confirmation_fraction()
            }
            None => FeeEstimator::new_dummy(),
        };

        let (mut tx_pool_builder, tx_pool_controller) = TxPoolServiceBuilder::new(
            tx_pool_config,
            Arc::clone(&snapshot),
            block_assembler_config,
            Arc::clone(&txs_verify_cache),
            &async_handle,
            sender,
            fee_estimator.clone(),
        );

        register_tx_pool_callback(
            &mut tx_pool_builder,
            notify_controller.clone(),
            fee_estimator,
        );

        let block_status_map = Arc::new(DashMap::new());

        let assume_valid_targets = Arc::new(Mutex::new({
            let not_exists_targets: Option<Vec<H256>> =
                sync_config.assume_valid_targets.clone().map(|targets| {
                    targets
                        .iter()
                        .filter(|&target_hash| {
                            let exists = snapshot.block_exists(&target_hash.pack());
                            if exists {
                                info!("assume-valid target 0x{} exists in local db", target_hash);
                            }
                            !exists
                        })
                        .cloned()
                        .collect::<Vec<H256>>()
                });

            if not_exists_targets
                .as_ref()
                .is_some_and(|targets| targets.is_empty())
            {
                info!("All assume-valid targets are already synchronized; entering full verification mode");
                None
            } else {
                not_exists_targets
            }
        }));

        let assume_valid_target_specified: Arc<Option<H256>> = Arc::new(
            sync_config
                .assume_valid_targets
                .and_then(|targets| targets.last().cloned()),
        );

        let shared = Shared::new(
            store,
            tx_pool_controller,
            notify_controller,
            txs_verify_cache,
            consensus,
            snapshot_mgr,
            async_handle,
            ibd_finished,
            assume_valid_targets,
            assume_valid_target_specified,
            header_map,
            block_status_map,
        );

        let chain_services_builder = ChainServicesBuilder::new(shared.clone(), table);

        let pack = SharedPackage {
            chain_services_builder: Some(chain_services_builder),
            tx_pool_builder: Some(tx_pool_builder),
            relay_tx_receiver: Some(receiver),
        };

        Ok((shared, pack))
    }
}

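/// Owned parts produced by [`SharedBuilder::build`] that are handed over to the caller:
/// the chain services builder, the tx-pool service builder, and the receiver for relayed
/// transaction verification results. Each part is intended to be taken exactly once.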
pub struct SharedPackage {
    chain_services_builder: Option<ChainServicesBuilder>,
    tx_pool_builder: Option<TxPoolServiceBuilder>,
    relay_tx_receiver: Option<Receiver<TxVerificationResult>>,
}

impl SharedPackage {
    pub fn take_chain_services_builder(&mut self) -> ChainServicesBuilder {
        self.chain_services_builder
            .take()
            .expect("take chain_services_builder")
    }

    pub fn take_tx_pool_builder(&mut self) -> TxPoolServiceBuilder {
        self.tx_pool_builder.take().expect("take tx_pool_builder")
    }

    pub fn take_relay_tx_receiver(&mut self) -> Receiver<TxVerificationResult> {
        self.relay_tx_receiver
            .take()
            .expect("take relay_tx_receiver")
    }
}

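/// Starts the notify service on the given runtime handle and returns its controller.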
fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController {
    NotifyService::new(notify_config, handle).start()
}

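/// Wraps the opened RocksDB in a `ChainDB`, attaching a `Freezer` for ancient block
/// data when the freezer is enabled and an ancient path is provided.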
fn build_store(
    db: RocksDB,
    store_config: StoreConfig,
    ancient_path: Option<PathBuf>,
) -> Result<ChainDB, Error> {
    let store = if store_config.freezer_enable && ancient_path.is_some() {
        let freezer = Freezer::open(ancient_path.expect("exist checked"))?;
        ChainDB::new_with_freezer(db, freezer, store_config)
    } else {
        ChainDB::new(db, store_config)
    };
    Ok(store)
}

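/// Registers tx-pool lifecycle callbacks: pending and proposed entries are forwarded to
/// the notify service (pending ones also feed the fee estimator); rejected entries are
/// recorded in `recent_reject` when appropriate, relayed as rejections when allowed, and
/// reported to both the notify service and the fee estimator.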
fn register_tx_pool_callback(
    tx_pool_builder: &mut TxPoolServiceBuilder,
    notify: NotifyController,
    fee_estimator: FeeEstimator,
) {
    let notify_pending = notify.clone();

    let tx_relay_sender = tx_pool_builder.tx_relay_sender();
    let create_notify_entry = |entry: &TxEntry| PoolTransactionEntry {
        transaction: entry.rtx.transaction.clone(),
        cycles: entry.cycles,
        size: entry.size,
        fee: entry.fee,
        timestamp: entry.timestamp,
    };

    let fee_estimator_clone = fee_estimator.clone();
    tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
        let notify_tx_entry = create_notify_entry(entry);
        notify_pending.notify_new_transaction(notify_tx_entry);
        let tx_hash = entry.transaction().hash();
        let entry_info = entry.to_info();
        fee_estimator_clone.accept_tx(tx_hash, entry_info);
    }));

    let notify_proposed = notify.clone();
    tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
        let notify_tx_entry = create_notify_entry(entry);
        notify_proposed.notify_proposed_transaction(notify_tx_entry);
    }));

    let notify_reject = notify;
    tx_pool_builder.register_reject(Box::new(
        move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
            let tx_hash = entry.transaction().hash();
            if reject.should_recorded() {
                if let Some(ref mut recent_reject) = tx_pool.recent_reject {
                    if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
                        error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
                    }
                }
            }

            if reject.is_allowed_relay() {
                if let Err(e) = tx_relay_sender.send(TxVerificationResult::Reject {
                    tx_hash: tx_hash.clone(),
                }) {
                    error!("tx-pool tx_relay_sender internal error {}", e);
                }
            }

            let notify_tx_entry = create_notify_entry(entry);
            notify_reject.notify_reject_transaction(notify_tx_entry, reject);

            fee_estimator.reject_tx(&tx_hash);
        },
    ));
}