1#![allow(missing_docs)]
3use crate::block_status::BlockStatus;
4use crate::{HeaderMap, Snapshot, SnapshotMgr};
5use arc_swap::{ArcSwap, Guard};
6use ckb_async_runtime::Handle;
7use ckb_chain_spec::consensus::Consensus;
8use ckb_constant::store::TX_INDEX_UPPER_BOUND;
9use ckb_constant::sync::MAX_TIP_AGE;
10use ckb_db::{Direction, IteratorMode};
11use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_NUMBER_HASH};
12use ckb_error::{AnyError, Error};
13use ckb_logger::debug;
14use ckb_notify::NotifyController;
15use ckb_proposal_table::ProposalView;
16use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
17use ckb_store::{ChainDB, ChainStore};
18use ckb_systemtime::unix_time_as_millis;
19use ckb_tx_pool::{BlockTemplate, TokioRwLock, TxPoolController};
20use ckb_types::{
21 H256, U256,
22 core::{BlockNumber, EpochExt, EpochNumber, HeaderView, Version},
23 packed::{self, Byte32},
24 prelude::*,
25};
26use ckb_util::{Mutex, MutexGuard, shrink_to_fit};
27use ckb_verification::cache::TxVerificationCache;
28use dashmap::DashMap;
29use std::cmp;
30use std::collections::BTreeMap;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::thread;
34use std::time::Duration;
35
36const FREEZER_INTERVAL: Duration = Duration::from_secs(60);
37const THRESHOLD_EPOCH: EpochNumber = 2;
38const MAX_FREEZE_LIMIT: BlockNumber = 30_000;
39
/// Threshold handed to `shrink_to_fit!` when the block status map is shrunk.
pub const SHRINK_THRESHOLD: usize = 300;
41
/// Handle handed out by `Shared::spawn_freeze`.
///
/// Dropping it raises the `stopped` flag it shares with the freezer.
pub struct FreezerClose {
    /// Stop flag shared with the freezer.
    stopped: Arc<AtomicBool>,
}

impl Drop for FreezerClose {
    /// Marks the shared flag as stopped.
    fn drop(&mut self) {
        let flag = &self.stopped;
        flag.store(true, Ordering::SeqCst);
    }
}
52
53#[derive(Clone)]
55pub struct Shared {
56 pub(crate) store: ChainDB,
57 pub(crate) tx_pool_controller: TxPoolController,
58 pub(crate) notify_controller: NotifyController,
59 pub(crate) txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
60 pub(crate) consensus: Arc<Consensus>,
61 pub(crate) snapshot_mgr: Arc<SnapshotMgr>,
62 pub(crate) async_handle: Handle,
63 pub(crate) ibd_finished: Arc<AtomicBool>,
64
65 pub(crate) assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
66 pub(crate) assume_valid_target_specified: Arc<Option<H256>>,
67
68 pub header_map: Arc<HeaderMap>,
69 pub(crate) block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
70 pub(crate) unverified_tip: Arc<ArcSwap<crate::HeaderIndex>>,
71}
72
73impl Shared {
74 #[allow(clippy::too_many_arguments)]
76 pub fn new(
77 store: ChainDB,
78 tx_pool_controller: TxPoolController,
79 notify_controller: NotifyController,
80 txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
81 consensus: Arc<Consensus>,
82 snapshot_mgr: Arc<SnapshotMgr>,
83 async_handle: Handle,
84 ibd_finished: Arc<AtomicBool>,
85
86 assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
87 assume_valid_target_specified: Arc<Option<H256>>,
88 header_map: Arc<HeaderMap>,
89 block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
90 ) -> Shared {
91 let header = store
92 .get_tip_header()
93 .unwrap_or(consensus.genesis_block().header());
94 let unverified_tip = Arc::new(ArcSwap::new(Arc::new(crate::HeaderIndex::new(
95 header.number(),
96 header.hash(),
97 header.difficulty(),
98 ))));
99
100 Shared {
101 store,
102 tx_pool_controller,
103 notify_controller,
104 txs_verify_cache,
105 consensus,
106 snapshot_mgr,
107 async_handle,
108 ibd_finished,
109 assume_valid_targets,
110 assume_valid_target_specified,
111 header_map,
112 block_status_map,
113 unverified_tip,
114 }
115 }
116 pub fn spawn_freeze(&self) -> Option<FreezerClose> {
118 if let Some(freezer) = self.store.freezer() {
119 ckb_logger::info!("Freezer enabled");
120 let signal_receiver = new_crossbeam_exit_rx();
121 let shared = self.clone();
122 let freeze_jh = thread::Builder::new()
123 .spawn(move || {
124 loop {
125 match signal_receiver.recv_timeout(FREEZER_INTERVAL) {
126 Err(_) => {
127 if let Err(e) = shared.freeze() {
128 ckb_logger::error!("Freezer error {}", e);
129 break;
130 }
131 }
132 Ok(_) => {
133 ckb_logger::info!("Freezer closing");
134 break;
135 }
136 }
137 }
138 })
139 .expect("Start FreezerService failed");
140
141 register_thread("freeze", freeze_jh);
142
143 return Some(FreezerClose {
144 stopped: Arc::clone(&freezer.stopped),
145 });
146 }
147 None
148 }
149
150 fn freeze(&self) -> Result<(), Error> {
151 let freezer = self.store.freezer().expect("freezer inited");
152 let snapshot = self.snapshot();
153 let current_epoch = snapshot.epoch_ext().number();
154
155 if self.is_initial_block_download() {
156 ckb_logger::trace!("is_initial_block_download freeze skip");
157 return Ok(());
158 }
159
160 if current_epoch <= THRESHOLD_EPOCH {
161 ckb_logger::trace!("Freezer idles");
162 return Ok(());
163 }
164
165 let limit_block_hash = snapshot
166 .get_epoch_index(current_epoch + 1 - THRESHOLD_EPOCH)
167 .and_then(|index| snapshot.get_epoch_ext(&index))
168 .expect("get_epoch_ext")
169 .last_block_hash_in_previous_epoch();
170
171 let frozen_number = freezer.number();
172
173 let threshold = cmp::min(
174 snapshot
175 .get_block_number(&limit_block_hash)
176 .expect("get_block_number"),
177 frozen_number + MAX_FREEZE_LIMIT,
178 );
179
180 ckb_logger::trace!(
181 "Freezer current_epoch {} number {} threshold {}",
182 current_epoch,
183 frozen_number,
184 threshold
185 );
186
187 let store = self.store();
188 let get_unfrozen_block = |number: BlockNumber| {
189 store
190 .get_block_hash(number)
191 .and_then(|hash| store.get_unfrozen_block(&hash))
192 };
193
194 let ret = freezer.freeze(threshold, get_unfrozen_block)?;
195
196 let stopped = freezer.stopped.load(Ordering::SeqCst);
197
198 self.wipe_out_frozen_data(&snapshot, ret, stopped)?;
200
201 ckb_logger::trace!("Freezer completed");
202
203 Ok(())
204 }
205
206 fn wipe_out_frozen_data(
207 &self,
208 snapshot: &Snapshot,
209 frozen: BTreeMap<packed::Byte32, (BlockNumber, u32)>,
210 stopped: bool,
211 ) -> Result<(), Error> {
212 let mut side = BTreeMap::new();
213 let mut batch = self.store.new_write_batch();
214
215 ckb_logger::trace!("freezer wipe_out_frozen_data {} ", frozen.len());
216
217 if !frozen.is_empty() {
218 for (hash, (number, txs)) in &frozen {
220 batch.delete_block_body(*number, hash, *txs).map_err(|e| {
221 ckb_logger::error!("Freezer delete_block_body failed {}", e);
222 e
223 })?;
224
225 let pack_number: packed::Uint64 = number.pack();
226 let prefix = pack_number.as_slice();
227 for (key, value) in snapshot
228 .get_iter(
229 COLUMN_NUMBER_HASH,
230 IteratorMode::From(prefix, Direction::Forward),
231 )
232 .take_while(|(key, _)| key.starts_with(prefix))
233 {
234 let reader = packed::NumberHashReader::from_slice_should_be_ok(key.as_ref());
235 let block_hash = reader.block_hash().to_entity();
236 if &block_hash != hash {
237 let txs =
238 packed::Uint32Reader::from_slice_should_be_ok(value.as_ref()).unpack();
239 side.insert(block_hash, (reader.number().to_entity(), txs));
240 }
241 }
242 }
243 self.store.write_sync(&batch).map_err(|e| {
244 ckb_logger::error!("Freezer write_batch delete failed {}", e);
245 e
246 })?;
247 batch.clear()?;
248
249 if !stopped {
250 let start = frozen.keys().min().expect("frozen empty checked");
251 let end = frozen.keys().max().expect("frozen empty checked");
252 self.compact_block_body(start, end);
253 }
254 }
255
256 if !side.is_empty() {
257 for (hash, (number, txs)) in &side {
259 batch
260 .delete_block(number.unpack(), hash, *txs)
261 .map_err(|e| {
262 ckb_logger::error!("Freezer delete_block_body failed {}", e);
263 e
264 })?;
265 }
266
267 self.store.write(&batch).map_err(|e| {
268 ckb_logger::error!("Freezer write_batch delete failed {}", e);
269 e
270 })?;
271
272 if !stopped {
273 let start = side.keys().min().expect("side empty checked");
274 let end = side.keys().max().expect("side empty checked");
275 self.compact_block_body(start, end);
276 }
277 }
278 Ok(())
279 }
280
281 fn compact_block_body(&self, start: &packed::Byte32, end: &packed::Byte32) {
282 let start_t = packed::TransactionKey::new_builder()
283 .block_hash(start.clone())
284 .index(0u32.pack())
285 .build();
286
287 let end_t = packed::TransactionKey::new_builder()
288 .block_hash(end.clone())
289 .index(TX_INDEX_UPPER_BOUND.pack())
290 .build();
291
292 if let Err(e) = self.store.compact_range(
293 COLUMN_BLOCK_BODY,
294 Some(start_t.as_slice()),
295 Some(end_t.as_slice()),
296 ) {
297 ckb_logger::error!("Freezer compact_range {}-{} error {}", start, end, e);
298 }
299 }
300
301 pub fn tx_pool_controller(&self) -> &TxPoolController {
303 &self.tx_pool_controller
304 }
305
306 pub fn txs_verify_cache(&self) -> Arc<TokioRwLock<TxVerificationCache>> {
308 Arc::clone(&self.txs_verify_cache)
309 }
310
311 pub fn notify_controller(&self) -> &NotifyController {
313 &self.notify_controller
314 }
315
316 pub fn snapshot(&self) -> Guard<Arc<Snapshot>> {
318 self.snapshot_mgr.load()
319 }
320
321 pub fn cloned_snapshot(&self) -> Arc<Snapshot> {
323 Arc::clone(&self.snapshot())
324 }
325
326 pub fn store_snapshot(&self, snapshot: Arc<Snapshot>) {
328 self.snapshot_mgr.store(snapshot)
329 }
330
331 pub fn refresh_snapshot(&self) {
333 let new = self.snapshot().refresh(self.store.get_snapshot());
334 self.store_snapshot(Arc::new(new));
335 }
336
337 pub fn new_snapshot(
339 &self,
340 tip_header: HeaderView,
341 total_difficulty: U256,
342 epoch_ext: EpochExt,
343 proposals: ProposalView,
344 ) -> Arc<Snapshot> {
345 Arc::new(Snapshot::new(
346 tip_header,
347 total_difficulty,
348 epoch_ext,
349 self.store.get_snapshot(),
350 proposals,
351 Arc::clone(&self.consensus),
352 ))
353 }
354
355 pub fn consensus(&self) -> &Consensus {
357 &self.consensus
358 }
359
360 pub fn cloned_consensus(&self) -> Arc<Consensus> {
362 Arc::clone(&self.consensus)
363 }
364
365 pub fn async_handle(&self) -> &Handle {
367 &self.async_handle
368 }
369
370 pub fn genesis_hash(&self) -> Byte32 {
372 self.consensus.genesis_hash()
373 }
374
375 pub fn store(&self) -> &ChainDB {
377 &self.store
378 }
379
380 pub fn is_initial_block_download(&self) -> bool {
382 if self.ibd_finished.load(Ordering::Acquire) {
384 false
385 } else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp())
386 > MAX_TIP_AGE
387 {
388 true
389 } else {
390 self.ibd_finished.store(true, Ordering::Release);
391 false
392 }
393 }
394
395 pub fn get_block_template(
397 &self,
398 bytes_limit: Option<u64>,
399 proposals_limit: Option<u64>,
400 max_version: Option<Version>,
401 ) -> Result<Result<BlockTemplate, AnyError>, AnyError> {
402 self.tx_pool_controller()
403 .get_block_template(bytes_limit, proposals_limit, max_version)
404 }
405
406 pub fn set_unverified_tip(&self, header: crate::HeaderIndex) {
407 self.unverified_tip.store(Arc::new(header));
408 }
409 pub fn get_unverified_tip(&self) -> crate::HeaderIndex {
410 self.unverified_tip.load().as_ref().clone()
411 }
412
413 pub fn header_map(&self) -> &HeaderMap {
414 &self.header_map
415 }
416 pub fn remove_header_view(&self, hash: &Byte32) {
417 self.header_map.remove(hash);
418 }
419
420 pub fn block_status_map(&self) -> &DashMap<Byte32, BlockStatus> {
421 &self.block_status_map
422 }
423
424 pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
425 match self.block_status_map().get(block_hash) {
426 Some(status_ref) => *status_ref.value(),
427 None => {
428 if self.header_map().contains_key(block_hash) {
429 BlockStatus::HEADER_VALID
430 } else {
431 let verified = self
432 .snapshot()
433 .get_block_ext(block_hash)
434 .map(|block_ext| block_ext.verified);
435 match verified {
436 None => BlockStatus::UNKNOWN,
437 Some(None) => BlockStatus::BLOCK_STORED,
438 Some(Some(true)) => BlockStatus::BLOCK_VALID,
439 Some(Some(false)) => BlockStatus::BLOCK_INVALID,
440 }
441 }
442 }
443 }
444 }
445
446 pub fn contains_block_status<T: ChainStore>(
447 &self,
448 block_hash: &Byte32,
449 status: BlockStatus,
450 ) -> bool {
451 self.get_block_status(block_hash).contains(status)
452 }
453
454 pub fn insert_block_status(&self, block_hash: Byte32, status: BlockStatus) {
455 self.block_status_map.insert(block_hash, status);
456 }
457
458 pub fn remove_block_status(&self, block_hash: &Byte32) {
459 let log_now = std::time::Instant::now();
460 self.block_status_map.remove(block_hash);
461 debug!("remove_block_status cost {:?}", log_now.elapsed());
462 shrink_to_fit!(self.block_status_map, SHRINK_THRESHOLD);
463 debug!(
464 "remove_block_status shrink_to_fit cost {:?}",
465 log_now.elapsed()
466 );
467 }
468
469 pub fn assume_valid_targets(&self) -> MutexGuard<Option<Vec<H256>>> {
470 self.assume_valid_targets.lock()
471 }
472
473 pub fn assume_valid_target_specified(&self) -> Arc<Option<H256>> {
474 Arc::clone(&self.assume_valid_target_specified)
475 }
476}