ckb_chain/
init.rs

1#![allow(missing_docs)]
2
3//! Bootstrap InitLoadUnverified, PreloadUnverifiedBlock, ChainService and ConsumeUnverified threads.
4use crate::chain_service::ChainService;
5use crate::init_load_unverified::InitLoadUnverified;
6use crate::orphan_broker::OrphanBroker;
7use crate::preload_unverified_blocks_channel::PreloadUnverifiedBlocksChannel;
8use crate::utils::orphan_block_pool::OrphanBlockPool;
9use crate::verify::ConsumeUnverifiedBlocks;
10use crate::{LonelyBlockHash, UnverifiedBlock, chain_controller::ChainController};
11use ckb_channel::{self as channel, SendError};
12use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW;
13use ckb_logger::warn;
14use ckb_shared::ChainServicesBuilder;
15use ckb_stop_handler::register_thread;
16use ckb_types::packed::Byte32;
17use dashmap::DashSet;
18use std::sync::Arc;
19use std::sync::atomic::AtomicBool;
20use std::thread;
21
22const ORPHAN_BLOCK_SIZE: usize = BLOCK_DOWNLOAD_WINDOW as usize;
23
24/// Here we distinguish between build_chain_services and start_chain_services:
25/// * build_chain_services simply initializes ChainController, setting up all relevant
26///   threads, and return join handle for the main chain service thread.
27/// * start_chain_services first builds relevant data just like build_chain_services,
28///   in addition, it register the main chain service thread against CKB's handler. As
29///   a result, start_chain_services only returns ChainController, it is expected that
30///   CKB's stop handler shall be used to terminate the created chain service.
31pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
32    let (chain_service, chain_service_thread) = build_chain_services(builder);
33    register_thread("ChainService", chain_service_thread);
34
35    chain_service
36}
37
38/// Please refer to +start_chain_services+ for difference between build_chain_services
39/// and start_chain_services
40pub fn build_chain_services(
41    builder: ChainServicesBuilder,
42) -> (ChainController, thread::JoinHandle<()>) {
43    let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE));
44
45    let (truncate_block_tx, truncate_block_rx) = channel::bounded(1);
46
47    let (preload_unverified_stop_tx, preload_unverified_stop_rx) = ckb_channel::bounded::<()>(1);
48
49    let (preload_unverified_tx, preload_unverified_rx) =
50        channel::bounded::<LonelyBlockHash>(BLOCK_DOWNLOAD_WINDOW as usize * 10);
51
52    let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1);
53    let (unverified_block_tx, unverified_block_rx) = channel::bounded::<UnverifiedBlock>(128usize);
54
55    let is_pending_verify: Arc<DashSet<Byte32>> = Arc::new(DashSet::new());
56
57    let consumer_unverified_thread = thread::Builder::new()
58        .name("verify_blocks".into())
59        .spawn({
60            let shared = builder.shared.clone();
61            let is_pending_verify = Arc::clone(&is_pending_verify);
62            move || {
63                let consume_unverified = ConsumeUnverifiedBlocks::new(
64                    shared,
65                    unverified_block_rx,
66                    truncate_block_rx,
67                    builder.proposal_table,
68                    is_pending_verify,
69                    unverified_queue_stop_rx,
70                );
71
72                consume_unverified.start();
73            }
74        })
75        .expect("start unverified_queue consumer thread should ok");
76
77    let preload_unverified_block_thread = thread::Builder::new()
78        .name("preload_unverified_block".into())
79        .spawn({
80            let shared = builder.shared.clone();
81            move || {
82                let preload_unverified_block = PreloadUnverifiedBlocksChannel::new(
83                    shared,
84                    preload_unverified_rx,
85                    unverified_block_tx,
86                    preload_unverified_stop_rx,
87                );
88                preload_unverified_block.start()
89            }
90        })
91        .expect("start preload_unverified_block should ok");
92
93    let (process_block_tx, process_block_rx) = channel::bounded(0);
94
95    let is_verifying_unverified_blocks_on_startup = Arc::new(AtomicBool::new(true));
96
97    let chain_controller = ChainController::new(
98        process_block_tx,
99        truncate_block_tx,
100        Arc::clone(&orphan_blocks_broker),
101        Arc::clone(&is_verifying_unverified_blocks_on_startup),
102    );
103
104    let init_load_unverified_thread = thread::Builder::new()
105        .name("init_load_unverified_blocks".into())
106        .spawn({
107            let chain_controller = chain_controller.clone();
108            let shared = builder.shared.clone();
109
110            move || {
111                let init_load_unverified: InitLoadUnverified = InitLoadUnverified::new(
112                    shared,
113                    chain_controller,
114                    is_verifying_unverified_blocks_on_startup,
115                );
116                init_load_unverified.start();
117            }
118        })
119        .expect("start unverified_queue consumer thread should ok");
120
121    let consume_orphan = OrphanBroker::new(
122        builder.shared.clone(),
123        orphan_blocks_broker,
124        preload_unverified_tx,
125        is_pending_verify,
126    );
127
128    let chain_service: ChainService =
129        ChainService::new(builder.shared, process_block_rx, consume_orphan);
130    let chain_service_thread = thread::Builder::new()
131        .name("ChainService".into())
132        .spawn({
133            move || {
134                chain_service.start_process_block();
135
136                let _ = init_load_unverified_thread.join();
137
138                if preload_unverified_stop_tx.send(()).is_err(){
139                    warn!("trying to notify preload unverified thread to stop, but preload_unverified_stop_tx already closed");
140                }
141                let _ = preload_unverified_block_thread.join();
142
143                if let Err(SendError(_)) = unverified_queue_stop_tx.send(()) {
144                    warn!("trying to notify consume unverified thread to stop, but unverified_queue_stop_tx already closed");
145                }
146                let _ = consumer_unverified_thread.join();
147            }
148        })
149        .expect("start chain_service thread should ok");
150
151    (chain_controller, chain_service_thread)
152}
153
154/// This structure restricts the scope of chain service, and forces chain
155/// service threads to terminate before dropping the structure.
156/// The content of this struct will always be present, the reason we
157/// wrap them in an option, is that we will need to consume them in
158/// Drop trait impl of this struct.
159pub struct ChainServiceScope(Option<(ChainController, thread::JoinHandle<()>)>);
160
161impl ChainServiceScope {
162    /// Creates a new ChainServiceScope structure
163    pub fn new(builder: ChainServicesBuilder) -> Self {
164        let (controller, join_handle) = build_chain_services(builder);
165        Self(Some((controller, join_handle)))
166    }
167
168    /// Returns a reference to chain controller
169    pub fn chain_controller(&self) -> &ChainController {
170        &self.0.as_ref().unwrap().0
171    }
172}
173
174impl Drop for ChainServiceScope {
175    fn drop(&mut self) {
176        let (controller, join_handle) = self.0.take().unwrap();
177        drop(controller);
178        let _ = join_handle.join();
179    }
180}