1#![allow(missing_docs)]
3use crate::block_status::BlockStatus;
4use crate::{HeaderMap, Snapshot, SnapshotMgr};
5use arc_swap::{ArcSwap, Guard};
6use ckb_async_runtime::Handle;
7use ckb_chain_spec::consensus::Consensus;
8use ckb_constant::store::TX_INDEX_UPPER_BOUND;
9use ckb_constant::sync::MAX_TIP_AGE;
10use ckb_db::{Direction, IteratorMode};
11use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_NUMBER_HASH};
12use ckb_error::{AnyError, Error};
13use ckb_logger::debug;
14use ckb_notify::NotifyController;
15use ckb_proposal_table::ProposalView;
16use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
17use ckb_store::{ChainDB, ChainStore};
18use ckb_systemtime::unix_time_as_millis;
19use ckb_tx_pool::{BlockTemplate, TokioRwLock, TxPoolController};
20use ckb_types::{
21 H256, U256,
22 core::{BlockNumber, EpochExt, EpochNumber, HeaderView, Version},
23 packed::{self, Byte32},
24 prelude::*,
25};
26use ckb_util::{Mutex, MutexGuard, shrink_to_fit};
27use ckb_verification::cache::TxVerificationCache;
28use dashmap::DashMap;
29use std::cmp;
30use std::collections::BTreeMap;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::thread;
34use std::time::Duration;
35
/// How long the freezer thread waits for an exit signal before running another freeze pass.
const FREEZER_INTERVAL: Duration = Duration::from_secs(60);
/// Freezing is skipped while the current epoch number is at or below this value; it is also
/// subtracted when picking the epoch whose boundary limits a freeze pass.
const THRESHOLD_EPOCH: EpochNumber = 2;
/// Upper bound on how far past the freezer's current number a single freeze pass may reach.
const MAX_FREEZE_LIMIT: BlockNumber = 30_000;

/// Threshold handed to `shrink_to_fit!` when shrinking the block status map.
pub const SHRINK_THRESHOLD: usize = 300;
41
/// Handle returned by `Shared::spawn_freeze`; dropping it raises the freezer's `stopped` flag.
pub struct FreezerClose {
    /// Flag shared with the freezer; set to `true` when this handle is dropped.
    stopped: Arc<AtomicBool>,
}

impl Drop for FreezerClose {
    /// Marks the shared flag as stopped using sequentially consistent ordering.
    fn drop(&mut self) {
        let Self { stopped } = self;
        stopped.store(true, Ordering::SeqCst);
    }
}
52
/// Cheaply cloneable bundle of handles shared across components.
///
/// Most fields are `Arc`s or controller handles, so `Clone` copies references rather
/// than the underlying state.
#[derive(Clone)]
pub struct Shared {
    /// Chain database; also the source of the optional freezer and of store snapshots.
    pub(crate) store: ChainDB,
    /// Controller used to request block templates from the transaction pool.
    pub(crate) tx_pool_controller: TxPoolController,
    /// Notification controller exposed through `notify_controller()`.
    pub(crate) notify_controller: NotifyController,
    /// Shared transaction verification cache.
    pub(crate) txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
    /// Consensus parameters, including the genesis block.
    pub(crate) consensus: Arc<Consensus>,
    /// Holder of the current `Snapshot`; snapshots are loaded from and stored into it.
    pub(crate) snapshot_mgr: Arc<SnapshotMgr>,
    /// Handle to the async runtime.
    pub(crate) async_handle: Handle,
    /// Set to `true` once `is_initial_block_download` sees a sufficiently recent tip.
    pub(crate) ibd_finished: Arc<AtomicBool>,

    /// Mutex-protected optional list of assume-valid target hashes.
    pub(crate) assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
    /// Assume-valid target given at construction, if any.
    /// NOTE(review): presumably the user-specified target — confirm against the caller.
    pub(crate) assume_valid_target_specified: Arc<Option<H256>>,

    /// Header map consulted by `get_block_status` for blocks without an explicit status.
    pub header_map: Arc<HeaderMap>,
    /// Explicit per-block statuses keyed by block hash.
    pub(crate) block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
    /// Current unverified tip; initialised from the stored tip header or the genesis header.
    pub(crate) unverified_tip: Arc<ArcSwap<crate::HeaderIndex>>,
}
75
76impl Shared {
77 #[allow(clippy::too_many_arguments)]
79 pub fn new(
80 store: ChainDB,
81 tx_pool_controller: TxPoolController,
82 notify_controller: NotifyController,
83 txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
84 consensus: Arc<Consensus>,
85 snapshot_mgr: Arc<SnapshotMgr>,
86 async_handle: Handle,
87 ibd_finished: Arc<AtomicBool>,
88
89 assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
90 assume_valid_target_specified: Arc<Option<H256>>,
91 header_map: Arc<HeaderMap>,
92 block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
93 ) -> Shared {
94 let header = store
95 .get_tip_header()
96 .unwrap_or(consensus.genesis_block().header());
97 let unverified_tip = Arc::new(ArcSwap::new(Arc::new(crate::HeaderIndex::new(
98 header.number(),
99 header.hash(),
100 header.difficulty(),
101 ))));
102
103 Shared {
104 store,
105 tx_pool_controller,
106 notify_controller,
107 txs_verify_cache,
108 consensus,
109 snapshot_mgr,
110 async_handle,
111 ibd_finished,
112 assume_valid_targets,
113 assume_valid_target_specified,
114 header_map,
115 block_status_map,
116 unverified_tip,
117 }
118 }
119 pub fn spawn_freeze(&self) -> Option<FreezerClose> {
121 if let Some(freezer) = self.store.freezer() {
122 ckb_logger::info!("Freezer enabled");
123 let signal_receiver = new_crossbeam_exit_rx();
124 let shared = self.clone();
125 let freeze_jh = thread::Builder::new()
126 .spawn(move || {
127 loop {
128 match signal_receiver.recv_timeout(FREEZER_INTERVAL) {
129 Err(_) => {
130 if let Err(e) = shared.freeze() {
131 ckb_logger::error!("Freezer error {}", e);
132 break;
133 }
134 }
135 Ok(_) => {
136 ckb_logger::info!("Freezer closing");
137 break;
138 }
139 }
140 }
141 })
142 .expect("Start FreezerService failed");
143
144 register_thread("freeze", freeze_jh);
145
146 return Some(FreezerClose {
147 stopped: Arc::clone(&freezer.stopped),
148 });
149 }
150 None
151 }
152
153 fn freeze(&self) -> Result<(), Error> {
154 let freezer = self.store.freezer().expect("freezer inited");
155 let snapshot = self.snapshot();
156 let current_epoch = snapshot.epoch_ext().number();
157
158 if self.is_initial_block_download() {
159 ckb_logger::trace!("is_initial_block_download freeze skip");
160 return Ok(());
161 }
162
163 if current_epoch <= THRESHOLD_EPOCH {
164 ckb_logger::trace!("Freezer idles");
165 return Ok(());
166 }
167
168 let limit_block_hash = snapshot
169 .get_epoch_index(current_epoch + 1 - THRESHOLD_EPOCH)
170 .and_then(|index| snapshot.get_epoch_ext(&index))
171 .expect("get_epoch_ext")
172 .last_block_hash_in_previous_epoch();
173
174 let frozen_number = freezer.number();
175
176 let threshold = cmp::min(
177 snapshot
178 .get_block_number(&limit_block_hash)
179 .expect("get_block_number"),
180 frozen_number + MAX_FREEZE_LIMIT,
181 );
182
183 ckb_logger::trace!(
184 "Freezer current_epoch {} number {} threshold {}",
185 current_epoch,
186 frozen_number,
187 threshold
188 );
189
190 let store = self.store();
191 let get_unfrozen_block = |number: BlockNumber| {
192 store
193 .get_block_hash(number)
194 .and_then(|hash| store.get_unfrozen_block(&hash))
195 };
196
197 let ret = freezer.freeze(threshold, get_unfrozen_block)?;
198
199 let stopped = freezer.stopped.load(Ordering::SeqCst);
200
201 self.wipe_out_frozen_data(&snapshot, ret, stopped)?;
203
204 ckb_logger::trace!("Freezer completed");
205
206 Ok(())
207 }
208
209 fn wipe_out_frozen_data(
210 &self,
211 snapshot: &Snapshot,
212 frozen: BTreeMap<packed::Byte32, (BlockNumber, u32)>,
213 stopped: bool,
214 ) -> Result<(), Error> {
215 let mut side = BTreeMap::new();
216 let mut batch = self.store.new_write_batch();
217
218 ckb_logger::trace!("freezer wipe_out_frozen_data {} ", frozen.len());
219
220 if !frozen.is_empty() {
221 for (hash, (number, txs)) in &frozen {
223 batch.delete_block_body(*number, hash, *txs).map_err(|e| {
224 ckb_logger::error!("Freezer delete_block_body failed {}", e);
225 e
226 })?;
227
228 let pack_number: packed::Uint64 = number.into();
229 let prefix = pack_number.as_slice();
230 for (key, value) in snapshot
231 .get_iter(
232 COLUMN_NUMBER_HASH,
233 IteratorMode::From(prefix, Direction::Forward),
234 )
235 .take_while(|(key, _)| key.starts_with(prefix))
236 {
237 let reader = packed::NumberHashReader::from_slice_should_be_ok(key.as_ref());
238 let block_hash = reader.block_hash().to_entity();
239 if &block_hash != hash {
240 let txs =
241 packed::Uint32Reader::from_slice_should_be_ok(value.as_ref()).into();
242 side.insert(block_hash, (reader.number().to_entity(), txs));
243 }
244 }
245 }
246 self.store.write_sync(&batch).map_err(|e| {
247 ckb_logger::error!("Freezer write_batch delete failed {}", e);
248 e
249 })?;
250 batch.clear()?;
251
252 if !stopped {
253 let start = frozen.keys().min().expect("frozen empty checked");
254 let end = frozen.keys().max().expect("frozen empty checked");
255 self.compact_block_body(start, end);
256 }
257 }
258
259 if !side.is_empty() {
260 for (hash, (number, txs)) in &side {
262 batch.delete_block(number.into(), hash, *txs).map_err(|e| {
263 ckb_logger::error!("Freezer delete_block_body failed {}", e);
264 e
265 })?;
266 }
267
268 self.store.write(&batch).map_err(|e| {
269 ckb_logger::error!("Freezer write_batch delete failed {}", e);
270 e
271 })?;
272
273 if !stopped {
274 let start = side.keys().min().expect("side empty checked");
275 let end = side.keys().max().expect("side empty checked");
276 self.compact_block_body(start, end);
277 }
278 }
279 Ok(())
280 }
281
282 fn compact_block_body(&self, start: &packed::Byte32, end: &packed::Byte32) {
283 let start_t = packed::TransactionKey::new_builder()
284 .block_hash(start.clone())
285 .index(0u32)
286 .build();
287
288 let end_t = packed::TransactionKey::new_builder()
289 .block_hash(end.clone())
290 .index(TX_INDEX_UPPER_BOUND)
291 .build();
292
293 if let Err(e) = self.store.compact_range(
294 COLUMN_BLOCK_BODY,
295 Some(start_t.as_slice()),
296 Some(end_t.as_slice()),
297 ) {
298 ckb_logger::error!("Freezer compact_range {}-{} error {}", start, end, e);
299 }
300 }
301
    /// Returns a reference to the transaction pool controller.
    pub fn tx_pool_controller(&self) -> &TxPoolController {
        &self.tx_pool_controller
    }
306
307 pub fn txs_verify_cache(&self) -> Arc<TokioRwLock<TxVerificationCache>> {
309 Arc::clone(&self.txs_verify_cache)
310 }
311
    /// Returns a reference to the notification controller.
    pub fn notify_controller(&self) -> &NotifyController {
        &self.notify_controller
    }
316
    /// Loads the current snapshot from the snapshot manager and returns the guard.
    pub fn snapshot(&self) -> Guard<Arc<Snapshot>> {
        self.snapshot_mgr.load()
    }
321
322 pub fn cloned_snapshot(&self) -> Arc<Snapshot> {
324 Arc::clone(&self.snapshot())
325 }
326
    /// Stores `snapshot` in the snapshot manager.
    pub fn store_snapshot(&self, snapshot: Arc<Snapshot>) {
        self.snapshot_mgr.store(snapshot)
    }
331
332 pub fn refresh_snapshot(&self) {
334 let new = self.snapshot().refresh(self.store.get_snapshot());
335 self.store_snapshot(Arc::new(new));
336 }
337
338 pub fn new_snapshot(
340 &self,
341 tip_header: HeaderView,
342 total_difficulty: U256,
343 epoch_ext: EpochExt,
344 proposals: ProposalView,
345 ) -> Arc<Snapshot> {
346 Arc::new(Snapshot::new(
347 tip_header,
348 total_difficulty,
349 epoch_ext,
350 self.store.get_snapshot(),
351 proposals,
352 Arc::clone(&self.consensus),
353 ))
354 }
355
356 pub fn consensus(&self) -> &Consensus {
358 &self.consensus
359 }
360
361 pub fn cloned_consensus(&self) -> Arc<Consensus> {
363 Arc::clone(&self.consensus)
364 }
365
    /// Returns a reference to the async runtime handle.
    pub fn async_handle(&self) -> &Handle {
        &self.async_handle
    }
370
371 pub fn genesis_hash(&self) -> Byte32 {
373 self.consensus.genesis_hash()
374 }
375
    /// Returns a reference to the chain database.
    pub fn store(&self) -> &ChainDB {
        &self.store
    }
380
381 pub fn is_initial_block_download(&self) -> bool {
383 if self.ibd_finished.load(Ordering::Acquire) {
385 false
386 } else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp())
387 > MAX_TIP_AGE
388 {
389 true
390 } else {
391 self.ibd_finished.store(true, Ordering::Release);
392 false
393 }
394 }
395
396 pub fn get_block_template(
398 &self,
399 bytes_limit: Option<u64>,
400 proposals_limit: Option<u64>,
401 max_version: Option<Version>,
402 ) -> Result<Result<BlockTemplate, AnyError>, AnyError> {
403 self.tx_pool_controller()
404 .get_block_template(bytes_limit, proposals_limit, max_version)
405 }
406
407 pub fn set_unverified_tip(&self, header: crate::HeaderIndex) {
408 self.unverified_tip.store(Arc::new(header));
409 }
410 pub fn get_unverified_tip(&self) -> crate::HeaderIndex {
411 self.unverified_tip.load().as_ref().clone()
412 }
413
414 pub fn header_map(&self) -> &HeaderMap {
415 &self.header_map
416 }
417 pub fn remove_header_view(&self, hash: &Byte32) {
418 self.header_map.remove(hash);
419 }
420
421 pub fn block_status_map(&self) -> &DashMap<Byte32, BlockStatus> {
422 &self.block_status_map
423 }
424
425 pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
426 match self.block_status_map().get(block_hash) {
427 Some(status_ref) => *status_ref.value(),
428 None => {
429 if self.header_map().contains_key(block_hash) {
430 BlockStatus::HEADER_VALID
431 } else {
432 let verified = self
433 .snapshot()
434 .get_block_ext(block_hash)
435 .map(|block_ext| block_ext.verified);
436 match verified {
437 None => BlockStatus::UNKNOWN,
438 Some(None) => BlockStatus::BLOCK_STORED,
439 Some(Some(true)) => BlockStatus::BLOCK_VALID,
440 Some(Some(false)) => BlockStatus::BLOCK_INVALID,
441 }
442 }
443 }
444 }
445 }
446
447 pub fn contains_block_status<T: ChainStore>(
448 &self,
449 block_hash: &Byte32,
450 status: BlockStatus,
451 ) -> bool {
452 self.get_block_status(block_hash).contains(status)
453 }
454
455 pub fn insert_block_status(&self, block_hash: Byte32, status: BlockStatus) {
456 self.block_status_map.insert(block_hash, status);
457 }
458
459 pub fn remove_block_status(&self, block_hash: &Byte32) {
460 let log_now = std::time::Instant::now();
461 self.block_status_map.remove(block_hash);
462 debug!("remove_block_status cost {:?}", log_now.elapsed());
463 shrink_to_fit!(self.block_status_map, SHRINK_THRESHOLD);
464 debug!(
465 "remove_block_status shrink_to_fit cost {:?}",
466 log_now.elapsed()
467 );
468 }
469
    /// Locks the assume-valid target list and returns the guard.
    pub fn assume_valid_targets(&self) -> MutexGuard<Option<Vec<H256>>> {
        self.assume_valid_targets.lock()
    }
473
474 pub fn assume_valid_target_specified(&self) -> Arc<Option<H256>> {
475 Arc::clone(&self.assume_valid_target_specified)
476 }
477}