use crate::ChainServicesBuilder;
use crate::{HeaderMap, Shared};
use ckb_app_config::{
    BlockAssemblerConfig, DBConfig, ExitCode, FeeEstimatorAlgo, FeeEstimatorConfig, NotifyConfig,
    StoreConfig, SyncConfig, TxPoolConfig,
};
use ckb_async_runtime::{Handle, new_background_runtime};
use ckb_chain_spec::SpecError;
use ckb_chain_spec::consensus::Consensus;
use ckb_channel::Receiver;
use ckb_db::RocksDB;
use ckb_db_schema::COLUMNS;
use ckb_error::{Error, InternalErrorKind};
use ckb_fee_estimator::FeeEstimator;
use ckb_logger::{error, info};
use ckb_migrate::migrate::Migrate;
use ckb_notify::{NotifyController, NotifyService};
use ckb_proposal_table::ProposalTable;
use ckb_proposal_table::ProposalView;
use ckb_snapshot::{Snapshot, SnapshotMgr};
use ckb_store::{ChainDB, ChainStore, Freezer};
use ckb_tx_pool::{
    TokioRwLock, TxEntry, TxPool, TxPoolServiceBuilder, service::TxVerificationResult,
};
use ckb_types::H256;
use ckb_types::core::hardfork::HardForks;
use ckb_types::{
    core::EpochExt,
    core::HeaderView,
    core::tx_pool::{PoolTransactionEntry, Reject},
};
use ckb_util::Mutex;
use ckb_verification::cache::init_cache;
use dashmap::DashMap;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tempfile::TempDir;

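/// Builder for the [`Shared`] state that CKB services share: it owns the
/// RocksDB handle, the [`Consensus`], and the optional tx-pool, store, sync,
/// notify, block-assembler and fee-estimator configurations.
///
/// A minimal usage sketch (test-oriented, assuming default configs; a real
/// node goes through [`SharedBuilder::new`] with on-disk paths):
///
/// ```ignore
/// let (shared, mut pack) = SharedBuilder::with_temp_db()
///     .consensus(Consensus::default())
///     .build()?;
/// let tx_pool_builder = pack.take_tx_pool_builder();
/// ```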
pub struct SharedBuilder {
    db: RocksDB,
    ancient_path: Option<PathBuf>,
    consensus: Consensus,
    tx_pool_config: Option<TxPoolConfig>,
    store_config: Option<StoreConfig>,
    sync_config: Option<SyncConfig>,
    block_assembler_config: Option<BlockAssemblerConfig>,
    notify_config: Option<NotifyConfig>,
    async_handle: Handle,
    fee_estimator_config: Option<FeeEstimatorConfig>,

    header_map_tmp_dir: Option<PathBuf>,
}

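/// Opens the chain database at `config.path`, running any pending schema
/// migrations first, or creates and version-initializes a fresh database if
/// none exists. Expensive migrations that cannot run in the background abort
/// with [`ExitCode::Failure`] and a hint to run `<bin_name> migrate -C <root_dir>`.
///
/// A sketch of a call site (the path and config values are illustrative only):
///
/// ```ignore
/// let db = open_or_create_db("ckb", Path::new("/var/lib/ckb"), &db_config, hardforks)?;
/// ```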
pub fn open_or_create_db(
    bin_name: &str,
    root_dir: &Path,
    config: &DBConfig,
    hardforks: HardForks,
) -> Result<RocksDB, ExitCode> {
    let migrate = Migrate::new(&config.path, hardforks);

    let read_only_db = migrate.open_read_only_db().map_err(|e| {
        eprintln!("Migration error {e}");
        ExitCode::Failure
    })?;

    if let Some(db) = read_only_db {
        match migrate.check(&db, true) {
            // The database was created by a newer binary: refuse to open it.
            Ordering::Greater => {
                eprintln!(
                    "The database was created by a higher version CKB executable binary \n\
                     and cannot be opened by the current binary.\n\
                     Please download the latest CKB executable binary."
                );
                Err(ExitCode::Failure)
            }
            // Schema versions match: open the database directly.
            Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)),
            // The database predates the current schema and needs migration.
            Ordering::Less => {
                let can_run_in_background = migrate.can_run_in_background(&db);
                if migrate.require_expensive(&db, false) && !can_run_in_background {
                    eprintln!(
                        "For optimal performance, CKB recommends migrating your data into a new format.\n\
                        If you prefer to stick with the older version, \n\
                        it's important to note that they may have unfixed vulnerabilities.\n\
                        Before migrating, we strongly recommend backing up your data directory.\n\
                        To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".",
                        bin_name,
                        root_dir.display()
                    );
                    Err(ExitCode::Failure)
                } else if can_run_in_background {
                    info!("Processing migrations in background ...");
                    let db = RocksDB::open(config, COLUMNS);
                    migrate.migrate(db.clone(), true).map_err(|err| {
                        eprintln!("Run error: {err:?}");
                        ExitCode::Failure
                    })?;
                    Ok(db)
                } else {
                    info!("Processing fast migrations ...");

                    let bulk_load_db_db = migrate.open_bulk_load_db().map_err(|e| {
                        eprintln!("Migration error {e}");
                        ExitCode::Failure
                    })?;

                    if let Some(db) = bulk_load_db_db {
                        migrate.migrate(db, false).map_err(|err| {
                            eprintln!("Run error: {err:?}");
                            ExitCode::Failure
                        })?;
                    }

                    Ok(RocksDB::open(config, COLUMNS))
                }
            }
        }
    } else {
        // No database yet: create one and record the current schema version.
        let db = RocksDB::open(config, COLUMNS);
        migrate.init_db_version(&db).map_err(|e| {
            eprintln!("Migrate init_db_version error {e}");
            ExitCode::Failure
        })?;
        Ok(db)
    }
}

impl SharedBuilder {
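    /// Creates a `SharedBuilder` backed by the on-disk database described by
    /// `db_config`, opening it (and migrating it if necessary) via
    /// [`open_or_create_db`]. `ancient` optionally points at the freezer
    /// directory for ancient block data.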
    pub fn new(
        bin_name: &str,
        root_dir: &Path,
        db_config: &DBConfig,
        ancient: Option<PathBuf>,
        async_handle: Handle,
        consensus: Consensus,
    ) -> Result<SharedBuilder, ExitCode> {
        let db = open_or_create_db(
            bin_name,
            root_dir,
            db_config,
            consensus.hardfork_switch.clone(),
        )?;

        Ok(SharedBuilder {
            db,
            ancient_path: ancient,
            consensus,
            tx_pool_config: None,
            notify_config: None,
            store_config: None,
            sync_config: None,
            block_assembler_config: None,
            async_handle,
            fee_estimator_config: None,
            header_map_tmp_dir: None,
        })
    }

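    /// Creates a `SharedBuilder` backed by a freshly created temporary RocksDB
    /// directory and a lazily created background runtime; primarily useful in
    /// tests, where no persistent data directory is wanted.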
    pub fn with_temp_db() -> Self {
        use std::{
            borrow::Borrow,
            sync::atomic::{AtomicUsize, Ordering},
        };

        // Reuse one background runtime per thread so repeated test setups do
        // not spawn a new runtime each time.
        thread_local! {
            static RUNTIME_HANDLE: std::cell::OnceCell<Handle> = const { std::cell::OnceCell::new() };
        }

        // All temporary databases live in a single process-wide TempDir, each
        // in its own numbered sub-directory.
        static DB_COUNT: AtomicUsize = AtomicUsize::new(0);
        static TMP_DIR: std::sync::OnceLock<TempDir> = std::sync::OnceLock::new();

        let db = {
            let db_id = DB_COUNT.fetch_add(1, Ordering::SeqCst);
            let db_base_dir = TMP_DIR
                .borrow()
                .get_or_init(|| TempDir::new().unwrap())
                .path()
                .to_path_buf();
            let db_dir = db_base_dir.join(format!("db_{db_id}"));
            RocksDB::open_in(db_dir, COLUMNS)
        };

        RUNTIME_HANDLE.with(|runtime| SharedBuilder {
            db,
            ancient_path: None,
            consensus: Consensus::default(),
            tx_pool_config: None,
            notify_config: None,
            store_config: None,
            sync_config: None,
            block_assembler_config: None,
            async_handle: runtime.get_or_init(new_background_runtime).clone(),
            fee_estimator_config: None,

            header_map_tmp_dir: None,
        })
    }
}

impl SharedBuilder {
    /// Overrides the consensus used by the shared state.
    pub fn consensus(mut self, value: Consensus) -> Self {
        self.consensus = value;
        self
    }

    /// Sets the tx-pool config.
    pub fn tx_pool_config(mut self, config: TxPoolConfig) -> Self {
        self.tx_pool_config = Some(config);
        self
    }

    /// Sets the notify config.
    pub fn notify_config(mut self, config: NotifyConfig) -> Self {
        self.notify_config = Some(config);
        self
    }

    /// Sets the store config.
    pub fn store_config(mut self, config: StoreConfig) -> Self {
        self.store_config = Some(config);
        self
    }

    /// Sets the sync config.
    pub fn sync_config(mut self, config: SyncConfig) -> Self {
        self.sync_config = Some(config);
        self
    }

    /// Sets the temporary directory used by the header map, if any.
    pub fn header_map_tmp_dir(mut self, header_map_tmp_dir: Option<PathBuf>) -> Self {
        self.header_map_tmp_dir = header_map_tmp_dir;
        self
    }

    /// Sets the block assembler config, if any.
    pub fn block_assembler_config(mut self, config: Option<BlockAssemblerConfig>) -> Self {
        self.block_assembler_config = config;
        self
    }

    /// Sets the fee estimator config.
    pub fn fee_estimator_config(mut self, config: FeeEstimatorConfig) -> Self {
        self.fee_estimator_config = Some(config);
        self
    }

    /// Overrides the async runtime handle.
    pub fn async_handle(mut self, async_handle: Handle) -> Self {
        self.async_handle = async_handle;
        self
    }

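    /// Rebuilds the proposal table by replaying the blocks inside the proposal
    /// window ending at the current tip, collecting proposal ids from both the
    /// block bodies and their uncles.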
    fn init_proposal_table(
        store: &ChainDB,
        consensus: &Consensus,
    ) -> (ProposalTable, ProposalView) {
        let proposal_window = consensus.tx_proposal_window();
        let tip_number = store.get_tip_header().expect("store inited").number();
        let mut proposal_ids = ProposalTable::new(proposal_window);
        let proposal_start = tip_number.saturating_sub(proposal_window.farthest());
        for bn in proposal_start..=tip_number {
            if let Some(hash) = store.get_block_hash(bn) {
                let mut ids_set = HashSet::new();
                if let Some(ids) = store.get_block_proposal_txs_ids(&hash) {
                    ids_set.extend(ids)
                }

                if let Some(us) = store.get_block_uncles(&hash) {
                    for u in us.data().into_iter() {
                        ids_set.extend(u.proposals().into_iter());
                    }
                }
                proposal_ids.insert(bn, ids_set);
            }
        }
        let dummy_proposals = ProposalView::default();
        let (_, proposals) = proposal_ids.finalize(&dummy_proposals, tip_number);
        (proposal_ids, proposals)
    }

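    /// Loads the tip header and current epoch from the store, checking that
    /// the stored genesis hash matches the configured consensus; if the store
    /// is empty, initializes it from the genesis block instead.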
    fn init_store(store: &ChainDB, consensus: &Consensus) -> Result<(HeaderView, EpochExt), Error> {
        match store
            .get_tip_header()
            .and_then(|header| store.get_current_epoch_ext().map(|epoch| (header, epoch)))
        {
            Some((tip_header, epoch)) => {
                if let Some(genesis_hash) = store.get_block_hash(0) {
                    let expect_genesis_hash = consensus.genesis_hash();
                    if genesis_hash == expect_genesis_hash {
                        Ok((tip_header, epoch))
                    } else {
                        Err(SpecError::GenesisMismatch {
                            expected: expect_genesis_hash,
                            actual: genesis_hash,
                        }
                        .into())
                    }
                } else {
                    Err(InternalErrorKind::Database
                        .other("genesis does not exist in database")
                        .into())
                }
            }
            None => store.init(consensus).map(|_| {
                (
                    consensus.genesis_block().header(),
                    consensus.genesis_epoch_ext().to_owned(),
                )
            }),
        }
    }

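    /// Builds the initial [`Snapshot`] (tip header, total difficulty, epoch and
    /// proposal view) together with the proposal table, initializing the store
    /// first if needed.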
    fn init_snapshot(
        store: &ChainDB,
        consensus: Arc<Consensus>,
    ) -> Result<(Snapshot, ProposalTable), Error> {
        let (tip_header, epoch) = Self::init_store(store, &consensus)?;
        let total_difficulty = store
            .get_block_ext(&tip_header.hash())
            .ok_or_else(|| InternalErrorKind::Database.other("failed to get tip's block_ext"))?
            .total_difficulty;
        let (proposal_table, proposal_view) = Self::init_proposal_table(store, &consensus);

        let snapshot = Snapshot::new(
            tip_header,
            total_difficulty,
            epoch,
            store.get_snapshot(),
            proposal_view,
            consensus,
        );

        Ok((snapshot, proposal_table))
    }

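    /// Consumes the builder and constructs the [`Shared`] handle plus a
    /// [`SharedPackage`] holding the parts that the caller starts later: the
    /// chain services builder, the tx-pool service builder and the relay-tx
    /// receiver.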
    pub fn build(self) -> Result<(Shared, SharedPackage), ExitCode> {
        let SharedBuilder {
            db,
            ancient_path,
            consensus,
            tx_pool_config,
            store_config,
            sync_config,
            block_assembler_config,
            notify_config,
            async_handle,
            fee_estimator_config,
            header_map_tmp_dir,
        } = self;

        let tx_pool_config = tx_pool_config.unwrap_or_default();
        let notify_config = notify_config.unwrap_or_default();
        let store_config = store_config.unwrap_or_default();
        let sync_config = sync_config.unwrap_or_default();
        let consensus = Arc::new(consensus);

        let header_map_memory_limit = sync_config.header_map.memory_limit.as_u64() as usize;

        let ibd_finished = Arc::new(AtomicBool::new(false));

        let header_map = Arc::new(HeaderMap::new(
            header_map_tmp_dir,
            header_map_memory_limit,
            &async_handle,
            Arc::clone(&ibd_finished),
        ));

        let notify_controller = start_notify_service(notify_config, async_handle.clone());

        let store = build_store(db, store_config, ancient_path).map_err(|e| {
            eprintln!("build_store {e}");
            ExitCode::Failure
        })?;

        let txs_verify_cache = Arc::new(TokioRwLock::new(init_cache()));

        let (snapshot, table) =
            Self::init_snapshot(&store, Arc::clone(&consensus)).map_err(|e| {
                eprintln!("init_snapshot {e}");
                ExitCode::Failure
            })?;
        let snapshot = Arc::new(snapshot);
        let snapshot_mgr = Arc::new(SnapshotMgr::new(Arc::clone(&snapshot)));

        let (sender, receiver) = ckb_channel::unbounded();

        // Pick the fee estimator backend from the config; fall back to the
        // dummy estimator when no algorithm is configured.
        let fee_estimator_algo = fee_estimator_config
            .map(|config| config.algorithm)
            .unwrap_or(None);
        let fee_estimator = match fee_estimator_algo {
            Some(FeeEstimatorAlgo::WeightUnitsFlow) => FeeEstimator::new_weight_units_flow(),
            Some(FeeEstimatorAlgo::ConfirmationFraction) => {
                FeeEstimator::new_confirmation_fraction()
            }
            None => FeeEstimator::new_dummy(),
        };

        let (mut tx_pool_builder, tx_pool_controller) = TxPoolServiceBuilder::new(
            tx_pool_config,
            Arc::clone(&snapshot),
            block_assembler_config,
            Arc::clone(&txs_verify_cache),
            &async_handle,
            sender,
            fee_estimator.clone(),
        );

        register_tx_pool_callback(
            &mut tx_pool_builder,
            notify_controller.clone(),
            fee_estimator,
        );

        let block_status_map = Arc::new(DashMap::new());

        // Drop assume-valid targets that already exist in the local database;
        // when every target is already stored, full verification is enabled.
        let assume_valid_targets = Arc::new(Mutex::new({
            let not_exists_targets: Option<Vec<H256>> =
                sync_config.assume_valid_targets.clone().map(|targets| {
                    targets
                        .iter()
                        .filter(|&target_hash| {
                            let exists = snapshot.block_exists(&target_hash.into());
                            if exists {
                                info!("assume-valid target 0x{} exists in local db", target_hash);
                            }
                            !exists
                        })
                        .cloned()
                        .collect::<Vec<H256>>()
                });

            if not_exists_targets
                .as_ref()
                .is_some_and(|targets| targets.is_empty())
            {
                info!("all assume-valid targets synchronized, enter full verification mode");
                None
            } else {
                not_exists_targets
            }
        }));

        let assume_valid_target_specified: Arc<Option<H256>> = Arc::new(
            sync_config
                .assume_valid_targets
                .and_then(|targets| targets.last().cloned()),
        );

        let shared = Shared::new(
            store,
            tx_pool_controller,
            notify_controller,
            txs_verify_cache,
            consensus,
            snapshot_mgr,
            async_handle,
            ibd_finished,
            assume_valid_targets,
            assume_valid_target_specified,
            header_map,
            block_status_map,
        );

        let chain_services_builder = ChainServicesBuilder::new(shared.clone(), table);

        let pack = SharedPackage {
            chain_services_builder: Some(chain_services_builder),
            tx_pool_builder: Some(tx_pool_builder),
            relay_tx_receiver: Some(receiver),
        };

        Ok((shared, pack))
    }
}

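/// The parts produced by [`SharedBuilder::build`] that are not started
/// immediately: the chain services builder, the tx-pool service builder, and
/// the receiver for transaction relay results. Each part is taken exactly once
/// by the code that starts the corresponding service.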
pub struct SharedPackage {
    chain_services_builder: Option<ChainServicesBuilder>,
    tx_pool_builder: Option<TxPoolServiceBuilder>,
    relay_tx_receiver: Option<Receiver<TxVerificationResult>>,
}

impl SharedPackage {
    /// Takes the chain services builder; panics if it has already been taken.
    pub fn take_chain_services_builder(&mut self) -> ChainServicesBuilder {
        self.chain_services_builder
            .take()
            .expect("take chain_services_builder")
    }

    /// Takes the tx-pool service builder; panics if it has already been taken.
    pub fn take_tx_pool_builder(&mut self) -> TxPoolServiceBuilder {
        self.tx_pool_builder.take().expect("take tx_pool_builder")
    }

    /// Takes the relay-tx receiver; panics if it has already been taken.
    pub fn take_relay_tx_receiver(&mut self) -> Receiver<TxVerificationResult> {
        self.relay_tx_receiver
            .take()
            .expect("take relay_tx_receiver")
    }
}

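/// Starts the notify service on the given runtime handle and returns its
/// controller.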
fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController {
    NotifyService::new(notify_config, handle).start()
}

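/// Wraps the RocksDB handle in a [`ChainDB`], attaching a [`Freezer`] for
/// ancient block data when the freezer is enabled and an ancient path is
/// provided.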
fn build_store(
    db: RocksDB,
    store_config: StoreConfig,
    ancient_path: Option<PathBuf>,
) -> Result<ChainDB, Error> {
    let store = if store_config.freezer_enable && ancient_path.is_some() {
        let freezer = Freezer::open(ancient_path.expect("exist checked"))?;
        ChainDB::new_with_freezer(db, freezer, store_config)
    } else {
        ChainDB::new(db, store_config)
    };
    Ok(store)
}

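/// Registers the pending, proposed and reject callbacks on the tx-pool service
/// builder, keeping the notify controller, the transaction relay channel, the
/// recent-reject store and the fee estimator in sync with tx-pool changes.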
fn register_tx_pool_callback(
    tx_pool_builder: &mut TxPoolServiceBuilder,
    notify: NotifyController,
    fee_estimator: FeeEstimator,
) {
    let notify_pending = notify.clone();

    let tx_relay_sender = tx_pool_builder.tx_relay_sender();
    let create_notify_entry = |entry: &TxEntry| PoolTransactionEntry {
        transaction: entry.rtx.transaction.clone(),
        cycles: entry.cycles,
        size: entry.size,
        fee: entry.fee,
        timestamp: entry.timestamp,
    };

    let fee_estimator_clone = fee_estimator.clone();
    tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
        // Notify subscribers about the new pending transaction.
        let notify_tx_entry = create_notify_entry(entry);
        notify_pending.notify_new_transaction(notify_tx_entry);

        // Feed the fee estimator with the accepted transaction.
        let tx_hash = entry.transaction().hash();
        let entry_info = entry.to_info();
        fee_estimator_clone.accept_tx(tx_hash, entry_info);
    }));

    let notify_proposed = notify.clone();
    tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
        // Notify subscribers that the transaction has been proposed.
        let notify_tx_entry = create_notify_entry(entry);
        notify_proposed.notify_proposed_transaction(notify_tx_entry);
    }));

    let notify_reject = notify;
    tx_pool_builder.register_reject(Box::new(
        move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
            let tx_hash = entry.transaction().hash();

            // Record the rejection in the recent-reject store when applicable.
            if reject.should_recorded() {
                if let Some(ref mut recent_reject) = tx_pool.recent_reject {
                    if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
                        error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
                    }
                }
            }

            // Tell the relayer about rejections that are still allowed to relay.
            if reject.is_allowed_relay() {
                if let Err(e) = tx_relay_sender.send(TxVerificationResult::Reject {
                    tx_hash: tx_hash.clone(),
                }) {
                    error!("tx-pool tx_relay_sender internal error {}", e);
                }
            }

            // Notify subscribers about the rejected transaction.
            let notify_tx_entry = create_notify_entry(entry);
            notify_reject.notify_reject_transaction(notify_tx_entry, reject);

            // Update the fee estimator.
            fee_estimator.reject_tx(&tx_hash);
        },
    ));
}