//! Builders for the shared node state: [`SharedBuilder`] assembles a [`Shared`]
//! handle together with its companion [`SharedPackage`].

use crate::ChainServicesBuilder;
use crate::{HeaderMap, Shared};
use ckb_app_config::{
    BlockAssemblerConfig, DBConfig, ExitCode, FeeEstimatorAlgo, FeeEstimatorConfig, NotifyConfig,
    StoreConfig, SyncConfig, TxPoolConfig,
};
use ckb_async_runtime::{Handle, new_background_runtime};
use ckb_chain_spec::SpecError;
use ckb_chain_spec::consensus::Consensus;
use ckb_channel::Receiver;
use ckb_db::RocksDB;
use ckb_db_schema::COLUMNS;
use ckb_error::{Error, InternalErrorKind};
use ckb_fee_estimator::FeeEstimator;
use ckb_logger::{error, info};
use ckb_migrate::migrate::Migrate;
use ckb_notify::{NotifyController, NotifyService};
use ckb_proposal_table::ProposalTable;
use ckb_proposal_table::ProposalView;
use ckb_snapshot::{Snapshot, SnapshotMgr};
use ckb_store::{ChainDB, ChainStore, Freezer};
use ckb_tx_pool::{
    TokioRwLock, TxEntry, TxPool, TxPoolServiceBuilder, service::TxVerificationResult,
};
use ckb_types::H256;
use ckb_types::core::hardfork::HardForks;
use ckb_types::{
    core::EpochExt,
    core::HeaderView,
    core::tx_pool::{PoolTransactionEntry, Reject},
};
use ckb_util::Mutex;
use ckb_verification::cache::init_cache;
use dashmap::DashMap;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tempfile::TempDir;

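/// Builder for assembling the [`Shared`] state (store, snapshot manager, tx-pool,
/// notify service, fee estimator) together with its companion [`SharedPackage`].
///
/// A minimal usage sketch (test-style, backed by a temporary database); the
/// configuration shown is illustrative, not prescriptive:
///
/// ```ignore
/// let (shared, mut pack) = SharedBuilder::with_temp_db()
///     .consensus(Consensus::default())
///     .build()?;
/// let tx_pool_builder = pack.take_tx_pool_builder();
/// ```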
pub struct SharedBuilder {
    db: RocksDB,
    ancient_path: Option<PathBuf>,
    consensus: Consensus,
    tx_pool_config: Option<TxPoolConfig>,
    store_config: Option<StoreConfig>,
    sync_config: Option<SyncConfig>,
    block_assembler_config: Option<BlockAssemblerConfig>,
    notify_config: Option<NotifyConfig>,
    async_handle: Handle,
    fee_estimator_config: Option<FeeEstimatorConfig>,

    header_map_tmp_dir: Option<PathBuf>,
}

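/// Opens an existing CKB database, or creates and version-initializes a new one.
///
/// When the on-disk schema is older than this binary expects, migrations are run
/// in the foreground or background as appropriate, or the user is asked to run
/// `migrate` manually for expensive migrations; a database written by a newer
/// binary is refused.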
pub fn open_or_create_db(
    bin_name: &str,
    root_dir: &Path,
    config: &DBConfig,
    hardforks: HardForks,
) -> Result<RocksDB, ExitCode> {
    let migrate = Migrate::new(&config.path, hardforks);

    let read_only_db = migrate.open_read_only_db().map_err(|e| {
        eprintln!("Migration error {e}");
        ExitCode::Failure
    })?;

    if let Some(db) = read_only_db {
        match migrate.check(&db, true) {
            Ordering::Greater => {
                eprintln!(
                    "The database was created by a newer version of the CKB executable\n\
                     and cannot be opened by the current one.\n\
                     Please download the latest CKB executable."
                );
                Err(ExitCode::Failure)
            }
            Ordering::Equal => Ok(RocksDB::open(config, COLUMNS)),
            Ordering::Less => {
                let can_run_in_background = migrate.can_run_in_background(&db);
                if migrate.require_expensive(&db, false) && !can_run_in_background {
                    eprintln!(
                        "For optimal performance, CKB recommends migrating your data into a new format.\n\
                         If you prefer to stick with the older version,\n\
                         note that it may have unfixed vulnerabilities.\n\
                         Before migrating, we strongly recommend backing up your data directory.\n\
                         To migrate, run `\"{}\" migrate -C \"{}\"` and confirm by typing \"YES\".",
                        bin_name,
                        root_dir.display()
                    );
                    Err(ExitCode::Failure)
                } else if can_run_in_background {
                    info!("Processing migrations in background ...");
                    let db = RocksDB::open(config, COLUMNS);
                    migrate.migrate(db.clone(), true).map_err(|err| {
                        eprintln!("Run error: {err:?}");
                        ExitCode::Failure
                    })?;
                    Ok(db)
                } else {
                    info!("Processing fast migrations ...");

                    let bulk_load_db = migrate.open_bulk_load_db().map_err(|e| {
                        eprintln!("Migration error {e}");
                        ExitCode::Failure
                    })?;

                    if let Some(db) = bulk_load_db {
                        migrate.migrate(db, false).map_err(|err| {
                            eprintln!("Run error: {err:?}");
                            ExitCode::Failure
                        })?;
                    }

                    Ok(RocksDB::open(config, COLUMNS))
                }
            }
        }
    } else {
        let db = RocksDB::open(config, COLUMNS);
        migrate.init_db_version(&db).map_err(|e| {
            eprintln!("Migrate init_db_version error {e}");
            ExitCode::Failure
        })?;
        Ok(db)
    }
}

impl SharedBuilder {
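    /// Creates a `SharedBuilder` backed by the database described in `db_config`,
    /// opening or creating it (and handling migrations) via [`open_or_create_db`].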
    pub fn new(
        bin_name: &str,
        root_dir: &Path,
        db_config: &DBConfig,
        ancient: Option<PathBuf>,
        async_handle: Handle,
        consensus: Consensus,
    ) -> Result<SharedBuilder, ExitCode> {
        let db = open_or_create_db(
            bin_name,
            root_dir,
            db_config,
            consensus.hardfork_switch.clone(),
        )?;

        Ok(SharedBuilder {
            db,
            ancient_path: ancient,
            consensus,
            tx_pool_config: None,
            notify_config: None,
            store_config: None,
            sync_config: None,
            block_assembler_config: None,
            async_handle,
            fee_estimator_config: None,
            header_map_tmp_dir: None,
        })
    }

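    /// Creates a `SharedBuilder` backed by a fresh temporary database and a
    /// per-thread background runtime; primarily useful in tests.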
    pub fn with_temp_db() -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};

        // One background runtime handle per thread, created lazily on first use.
        thread_local! {
            static RUNTIME_HANDLE: std::cell::OnceCell<Handle> = const { std::cell::OnceCell::new() };
        }

        // Every temporary database gets its own subdirectory under a shared TempDir.
        static DB_COUNT: AtomicUsize = AtomicUsize::new(0);
        static TMP_DIR: std::sync::OnceLock<TempDir> = std::sync::OnceLock::new();

        let db = {
            let db_id = DB_COUNT.fetch_add(1, Ordering::SeqCst);
            let db_base_dir = TMP_DIR
                .get_or_init(|| TempDir::new().unwrap())
                .path()
                .to_path_buf();
            let db_dir = db_base_dir.join(format!("db_{db_id}"));
            RocksDB::open_in(db_dir, COLUMNS)
        };

        RUNTIME_HANDLE.with(|runtime| SharedBuilder {
            db,
            ancient_path: None,
            consensus: Consensus::default(),
            tx_pool_config: None,
            notify_config: None,
            store_config: None,
            sync_config: None,
            block_assembler_config: None,
            async_handle: runtime.get_or_init(new_background_runtime).clone(),
            fee_estimator_config: None,
            header_map_tmp_dir: None,
        })
    }
}

impl SharedBuilder {
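    /// Sets the consensus parameters.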
    pub fn consensus(mut self, value: Consensus) -> Self {
        self.consensus = value;
        self
    }

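    /// Sets the transaction-pool configuration.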
    pub fn tx_pool_config(mut self, config: TxPoolConfig) -> Self {
        self.tx_pool_config = Some(config);
        self
    }

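    /// Sets the notify-service configuration.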
    pub fn notify_config(mut self, config: NotifyConfig) -> Self {
        self.notify_config = Some(config);
        self
    }

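    /// Sets the store configuration.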
    pub fn store_config(mut self, config: StoreConfig) -> Self {
        self.store_config = Some(config);
        self
    }

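    /// Sets the sync configuration.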
    pub fn sync_config(mut self, config: SyncConfig) -> Self {
        self.sync_config = Some(config);
        self
    }

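    /// Sets the temporary directory used by the header map.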
    pub fn header_map_tmp_dir(mut self, header_map_tmp_dir: Option<PathBuf>) -> Self {
        self.header_map_tmp_dir = header_map_tmp_dir;
        self
    }

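    /// Sets the block-assembler configuration, if any.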
    pub fn block_assembler_config(mut self, config: Option<BlockAssemblerConfig>) -> Self {
        self.block_assembler_config = config;
        self
    }

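    /// Sets the fee-estimator configuration.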
    pub fn fee_estimator_config(mut self, config: FeeEstimatorConfig) -> Self {
        self.fee_estimator_config = Some(config);
        self
    }

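    /// Sets the async runtime handle.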
    pub fn async_handle(mut self, async_handle: Handle) -> Self {
        self.async_handle = async_handle;
        self
    }

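    /// Rebuilds the proposal table from the proposal ids of the blocks (and their
    /// uncles) within the proposal window ending at the current tip.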
    fn init_proposal_table(
        store: &ChainDB,
        consensus: &Consensus,
    ) -> (ProposalTable, ProposalView) {
        let proposal_window = consensus.tx_proposal_window();
        let tip_number = store.get_tip_header().expect("store inited").number();
        let mut proposal_ids = ProposalTable::new(proposal_window);
        let proposal_start = tip_number.saturating_sub(proposal_window.farthest());
        for bn in proposal_start..=tip_number {
            if let Some(hash) = store.get_block_hash(bn) {
                let mut ids_set = HashSet::new();
                if let Some(ids) = store.get_block_proposal_txs_ids(&hash) {
                    ids_set.extend(ids)
                }

                if let Some(us) = store.get_block_uncles(&hash) {
                    for u in us.data().into_iter() {
                        ids_set.extend(u.proposals().into_iter());
                    }
                }
                proposal_ids.insert(bn, ids_set);
            }
        }
        let dummy_proposals = ProposalView::default();
        let (_, proposals) = proposal_ids.finalize(&dummy_proposals, tip_number);
        (proposal_ids, proposals)
    }

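    /// Loads the tip header and current epoch from the store, checking that the
    /// stored genesis hash matches the configured consensus; an empty database is
    /// initialized from the genesis block instead.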
    fn init_store(store: &ChainDB, consensus: &Consensus) -> Result<(HeaderView, EpochExt), Error> {
        match store
            .get_tip_header()
            .and_then(|header| store.get_current_epoch_ext().map(|epoch| (header, epoch)))
        {
            Some((tip_header, epoch)) => {
                if let Some(genesis_hash) = store.get_block_hash(0) {
                    let expect_genesis_hash = consensus.genesis_hash();
                    if genesis_hash == expect_genesis_hash {
                        Ok((tip_header, epoch))
                    } else {
                        Err(SpecError::GenesisMismatch {
                            expected: expect_genesis_hash,
                            actual: genesis_hash,
                        }
                        .into())
                    }
                } else {
                    Err(InternalErrorKind::Database
                        .other("genesis does not exist in database")
                        .into())
                }
            }
            None => store.init(consensus).map(|_| {
                (
                    consensus.genesis_block().header(),
                    consensus.genesis_epoch_ext().to_owned(),
                )
            }),
        }
    }

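    /// Builds the initial [`Snapshot`] from the store's tip header, total
    /// difficulty, current epoch and proposal view, returning the proposal table
    /// alongside it.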
    fn init_snapshot(
        store: &ChainDB,
        consensus: Arc<Consensus>,
    ) -> Result<(Snapshot, ProposalTable), Error> {
        let (tip_header, epoch) = Self::init_store(store, &consensus)?;
        let total_difficulty = store
            .get_block_ext(&tip_header.hash())
            .ok_or_else(|| InternalErrorKind::Database.other("failed to get tip's block_ext"))?
            .total_difficulty;
        let (proposal_table, proposal_view) = Self::init_proposal_table(store, &consensus);

        let snapshot = Snapshot::new(
            tip_header,
            total_difficulty,
            epoch,
            store.get_snapshot(),
            proposal_view,
            consensus,
        );

        Ok((snapshot, proposal_table))
    }

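    /// Consumes the builder and produces the [`Shared`] handle plus a
    /// [`SharedPackage`] holding the not-yet-started parts (chain-services
    /// builder, tx-pool service builder, relay receiver).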
    pub fn build(self) -> Result<(Shared, SharedPackage), ExitCode> {
        let SharedBuilder {
            db,
            ancient_path,
            consensus,
            tx_pool_config,
            store_config,
            sync_config,
            block_assembler_config,
            notify_config,
            async_handle,
            fee_estimator_config,
            header_map_tmp_dir,
        } = self;

        let tx_pool_config = tx_pool_config.unwrap_or_default();
        let notify_config = notify_config.unwrap_or_default();
        let store_config = store_config.unwrap_or_default();
        let sync_config = sync_config.unwrap_or_default();
        let consensus = Arc::new(consensus);

        let header_map_memory_limit = sync_config.header_map.memory_limit.as_u64() as usize;

        let ibd_finished = Arc::new(AtomicBool::new(false));

        let header_map = Arc::new(HeaderMap::new(
            header_map_tmp_dir,
            header_map_memory_limit,
            &async_handle,
            Arc::clone(&ibd_finished),
        ));

        let notify_controller = start_notify_service(notify_config, async_handle.clone());

        let store = build_store(db, store_config, ancient_path).map_err(|e| {
            eprintln!("build_store {e}");
            ExitCode::Failure
        })?;

        let txs_verify_cache = Arc::new(TokioRwLock::new(init_cache()));

        let (snapshot, table) =
            Self::init_snapshot(&store, Arc::clone(&consensus)).map_err(|e| {
                eprintln!("init_snapshot {e}");
                ExitCode::Failure
            })?;
        let snapshot = Arc::new(snapshot);
        let snapshot_mgr = Arc::new(SnapshotMgr::new(Arc::clone(&snapshot)));

        let (sender, receiver) = ckb_channel::unbounded();

        let fee_estimator_algo = fee_estimator_config.and_then(|config| config.algorithm);
        let fee_estimator = match fee_estimator_algo {
            Some(FeeEstimatorAlgo::WeightUnitsFlow) => FeeEstimator::new_weight_units_flow(),
            Some(FeeEstimatorAlgo::ConfirmationFraction) => {
                FeeEstimator::new_confirmation_fraction()
            }
            None => FeeEstimator::new_dummy(),
        };

        let (mut tx_pool_builder, tx_pool_controller) = TxPoolServiceBuilder::new(
            tx_pool_config,
            Arc::clone(&snapshot),
            block_assembler_config,
            Arc::clone(&txs_verify_cache),
            &async_handle,
            sender,
            fee_estimator.clone(),
        );

        register_tx_pool_callback(
            &mut tx_pool_builder,
            notify_controller.clone(),
            fee_estimator,
        );

        let block_status_map = Arc::new(DashMap::new());

        let assume_valid_targets = Arc::new(Mutex::new({
            let not_exists_targets: Option<Vec<H256>> =
                sync_config.assume_valid_targets.clone().map(|targets| {
                    targets
                        .iter()
                        .filter(|&target_hash| {
                            let exists = snapshot.block_exists(&target_hash.into());
                            if exists {
                                info!("assume-valid target 0x{} exists in local db", target_hash);
                            }
                            !exists
                        })
                        .cloned()
                        .collect::<Vec<H256>>()
                });

            if not_exists_targets
                .as_ref()
                .is_some_and(|targets| targets.is_empty())
            {
                info!("all assume-valid targets synchronized, enter full verification mode");
                None
            } else {
                not_exists_targets
            }
        }));

        let assume_valid_target_specified: Arc<Option<H256>> = Arc::new(
            sync_config
                .assume_valid_targets
                .and_then(|targets| targets.last().cloned()),
        );

        let shared = Shared::new(
            store,
            tx_pool_controller,
            notify_controller,
            txs_verify_cache,
            consensus,
            snapshot_mgr,
            async_handle,
            ibd_finished,
            assume_valid_targets,
            assume_valid_target_specified,
            header_map,
            block_status_map,
        );

        let chain_services_builder = ChainServicesBuilder::new(shared.clone(), table);

        let pack = SharedPackage {
            chain_services_builder: Some(chain_services_builder),
            tx_pool_builder: Some(tx_pool_builder),
            relay_tx_receiver: Some(receiver),
        };

        Ok((shared, pack))
    }
}

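/// Holds the not-yet-started components produced by [`SharedBuilder::build`];
/// each one is taken out exactly once via the `take_*` methods below.
///
/// A sketch of the intended hand-off (variable names are illustrative):
///
/// ```ignore
/// let chain_services_builder = pack.take_chain_services_builder();
/// let tx_pool_builder = pack.take_tx_pool_builder();
/// let relay_rx = pack.take_relay_tx_receiver();
/// ```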
pub struct SharedPackage {
    chain_services_builder: Option<ChainServicesBuilder>,
    tx_pool_builder: Option<TxPoolServiceBuilder>,
    relay_tx_receiver: Option<Receiver<TxVerificationResult>>,
}

impl SharedPackage {
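    /// Takes the chain-services builder; panics if it was already taken.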
    pub fn take_chain_services_builder(&mut self) -> ChainServicesBuilder {
        self.chain_services_builder
            .take()
            .expect("take chain_services_builder")
    }

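    /// Takes the tx-pool service builder; panics if it was already taken.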
    pub fn take_tx_pool_builder(&mut self) -> TxPoolServiceBuilder {
        self.tx_pool_builder.take().expect("take tx_pool_builder")
    }

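    /// Takes the relay transaction-verification receiver; panics if it was already taken.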
    pub fn take_relay_tx_receiver(&mut self) -> Receiver<TxVerificationResult> {
        self.relay_tx_receiver
            .take()
            .expect("take relay_tx_receiver")
    }
}

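/// Starts the notify service and returns its controller.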
fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController {
    NotifyService::new(notify_config, handle).start()
}

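/// Constructs the [`ChainDB`], attaching a freezer for ancient block data when the
/// freezer is enabled and an ancient path is configured.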
fn build_store(
    db: RocksDB,
    store_config: StoreConfig,
    ancient_path: Option<PathBuf>,
) -> Result<ChainDB, Error> {
    let store = if store_config.freezer_enable && ancient_path.is_some() {
        let freezer = Freezer::open(ancient_path.expect("exist checked"))?;
        ChainDB::new_with_freezer(db, freezer, store_config)
    } else {
        ChainDB::new(db, store_config)
    };
    Ok(store)
}

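/// Wires the tx-pool callbacks: pending transactions are reported to the notify
/// service and the fee estimator, proposed transactions to the notify service, and
/// rejected transactions are recorded, optionally relayed as rejections, and
/// reported to both.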
fn register_tx_pool_callback(
    tx_pool_builder: &mut TxPoolServiceBuilder,
    notify: NotifyController,
    fee_estimator: FeeEstimator,
) {
    let notify_pending = notify.clone();

    let tx_relay_sender = tx_pool_builder.tx_relay_sender();
    let create_notify_entry = |entry: &TxEntry| PoolTransactionEntry {
        transaction: entry.rtx.transaction.clone(),
        cycles: entry.cycles,
        size: entry.size,
        fee: entry.fee,
        timestamp: entry.timestamp,
    };

    let fee_estimator_clone = fee_estimator.clone();
    tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
        let notify_tx_entry = create_notify_entry(entry);
        notify_pending.notify_new_transaction(notify_tx_entry);
        let tx_hash = entry.transaction().hash();
        let entry_info = entry.to_info();
        fee_estimator_clone.accept_tx(tx_hash, entry_info);
    }));

    let notify_proposed = notify.clone();
    tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
        let notify_tx_entry = create_notify_entry(entry);
        notify_proposed.notify_proposed_transaction(notify_tx_entry);
    }));

    let notify_reject = notify;
    tx_pool_builder.register_reject(Box::new(
        move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
            let tx_hash = entry.transaction().hash();
            if reject.should_recorded() {
                if let Some(ref mut recent_reject) = tx_pool.recent_reject {
                    if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
                        error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
                    }
                }
            }

            if reject.is_allowed_relay() {
                if let Err(e) = tx_relay_sender.send(TxVerificationResult::Reject {
                    tx_hash: tx_hash.clone(),
                }) {
                    error!("tx-pool tx_relay_sender internal error {}", e);
                }
            }

            let notify_tx_entry = create_notify_entry(entry);
            notify_reject.notify_reject_transaction(notify_tx_entry, reject);

            fee_estimator.reject_tx(&tx_hash);
        },
    ));
}