1#![allow(missing_docs)]
2
3use crate::chain_service::ChainService;
5use crate::init_load_unverified::InitLoadUnverified;
6use crate::orphan_broker::OrphanBroker;
7use crate::preload_unverified_blocks_channel::PreloadUnverifiedBlocksChannel;
8use crate::utils::orphan_block_pool::OrphanBlockPool;
9use crate::verify::ConsumeUnverifiedBlocks;
10use crate::{LonelyBlockHash, UnverifiedBlock, chain_controller::ChainController};
11use ckb_channel::{self as channel, SendError};
12use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW;
13use ckb_logger::warn;
14use ckb_shared::ChainServicesBuilder;
15use ckb_stop_handler::register_thread;
16use ckb_types::packed::Byte32;
17use dashmap::DashSet;
18use std::sync::Arc;
19use std::sync::atomic::AtomicBool;
20use std::thread;
21
22const ORPHAN_BLOCK_SIZE: usize = BLOCK_DOWNLOAD_WINDOW as usize;
23
24pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
32 let (chain_service, chain_service_thread) = build_chain_services(builder);
33 register_thread("ChainService", chain_service_thread);
34
35 chain_service
36}
37
38pub fn build_chain_services(
41 builder: ChainServicesBuilder,
42) -> (ChainController, thread::JoinHandle<()>) {
43 let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE));
44
45 let (truncate_block_tx, truncate_block_rx) = channel::bounded(1);
46
47 let (preload_unverified_stop_tx, preload_unverified_stop_rx) = ckb_channel::bounded::<()>(1);
48
49 let (preload_unverified_tx, preload_unverified_rx) =
50 channel::bounded::<LonelyBlockHash>(BLOCK_DOWNLOAD_WINDOW as usize * 10);
51
52 let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1);
53 let (unverified_block_tx, unverified_block_rx) = channel::bounded::<UnverifiedBlock>(128usize);
54
55 let is_pending_verify: Arc<DashSet<Byte32>> = Arc::new(DashSet::new());
56
57 let consumer_unverified_thread = thread::Builder::new()
58 .name("verify_blocks".into())
59 .spawn({
60 let shared = builder.shared.clone();
61 let is_pending_verify = Arc::clone(&is_pending_verify);
62 move || {
63 let consume_unverified = ConsumeUnverifiedBlocks::new(
64 shared,
65 unverified_block_rx,
66 truncate_block_rx,
67 builder.proposal_table,
68 is_pending_verify,
69 unverified_queue_stop_rx,
70 );
71
72 consume_unverified.start();
73 }
74 })
75 .expect("start unverified_queue consumer thread should ok");
76
77 let preload_unverified_block_thread = thread::Builder::new()
78 .name("preload_unverified_block".into())
79 .spawn({
80 let shared = builder.shared.clone();
81 move || {
82 let preload_unverified_block = PreloadUnverifiedBlocksChannel::new(
83 shared,
84 preload_unverified_rx,
85 unverified_block_tx,
86 preload_unverified_stop_rx,
87 );
88 preload_unverified_block.start()
89 }
90 })
91 .expect("start preload_unverified_block should ok");
92
93 let (process_block_tx, process_block_rx) = channel::bounded(0);
94
95 let is_verifying_unverified_blocks_on_startup = Arc::new(AtomicBool::new(true));
96
97 let chain_controller = ChainController::new(
98 process_block_tx,
99 truncate_block_tx,
100 Arc::clone(&orphan_blocks_broker),
101 Arc::clone(&is_verifying_unverified_blocks_on_startup),
102 );
103
104 let init_load_unverified_thread = thread::Builder::new()
105 .name("init_load_unverified_blocks".into())
106 .spawn({
107 let chain_controller = chain_controller.clone();
108 let shared = builder.shared.clone();
109
110 move || {
111 let init_load_unverified: InitLoadUnverified = InitLoadUnverified::new(
112 shared,
113 chain_controller,
114 is_verifying_unverified_blocks_on_startup,
115 );
116 init_load_unverified.start();
117 }
118 })
119 .expect("start unverified_queue consumer thread should ok");
120
121 let consume_orphan = OrphanBroker::new(
122 builder.shared.clone(),
123 orphan_blocks_broker,
124 preload_unverified_tx,
125 is_pending_verify,
126 );
127
128 let chain_service: ChainService =
129 ChainService::new(builder.shared, process_block_rx, consume_orphan);
130 let chain_service_thread = thread::Builder::new()
131 .name("ChainService".into())
132 .spawn({
133 move || {
134 chain_service.start_process_block();
135
136 let _ = init_load_unverified_thread.join();
137
138 if preload_unverified_stop_tx.send(()).is_err(){
139 warn!("trying to notify preload unverified thread to stop, but preload_unverified_stop_tx already closed");
140 }
141 let _ = preload_unverified_block_thread.join();
142
143 if let Err(SendError(_)) = unverified_queue_stop_tx.send(()) {
144 warn!("trying to notify consume unverified thread to stop, but unverified_queue_stop_tx already closed");
145 }
146 let _ = consumer_unverified_thread.join();
147 }
148 })
149 .expect("start chain_service thread should ok");
150
151 (chain_controller, chain_service_thread)
152}
153
154pub struct ChainServiceScope(Option<(ChainController, thread::JoinHandle<()>)>);
160
161impl ChainServiceScope {
162 pub fn new(builder: ChainServicesBuilder) -> Self {
164 let (controller, join_handle) = build_chain_services(builder);
165 Self(Some((controller, join_handle)))
166 }
167
168 pub fn chain_controller(&self) -> &ChainController {
170 &self.0.as_ref().unwrap().0
171 }
172}
173
174impl Drop for ChainServiceScope {
175 fn drop(&mut self) {
176 let (controller, join_handle) = self.0.take().unwrap();
177 drop(controller);
178 let _ = join_handle.join();
179 }
180}