ckb_shared/
shared.rs

1//! Provide Shared
2#![allow(missing_docs)]
3use crate::block_status::BlockStatus;
4use crate::{HeaderMap, Snapshot, SnapshotMgr};
5use arc_swap::{ArcSwap, Guard};
6use ckb_async_runtime::Handle;
7use ckb_chain_spec::consensus::Consensus;
8use ckb_constant::store::TX_INDEX_UPPER_BOUND;
9use ckb_constant::sync::MAX_TIP_AGE;
10use ckb_db::{Direction, IteratorMode};
11use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_NUMBER_HASH};
12use ckb_error::{AnyError, Error};
13use ckb_logger::debug;
14use ckb_notify::NotifyController;
15use ckb_proposal_table::ProposalView;
16use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
17use ckb_store::{ChainDB, ChainStore};
18use ckb_systemtime::unix_time_as_millis;
19use ckb_tx_pool::{BlockTemplate, TokioRwLock, TxPoolController};
20use ckb_types::{
21    H256, U256,
22    core::{BlockNumber, EpochExt, EpochNumber, HeaderView, Version},
23    packed::{self, Byte32},
24    prelude::*,
25};
26use ckb_util::{Mutex, MutexGuard, shrink_to_fit};
27use ckb_verification::cache::TxVerificationCache;
28use dashmap::DashMap;
29use std::cmp;
30use std::collections::BTreeMap;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::thread;
34use std::time::Duration;
35
36const FREEZER_INTERVAL: Duration = Duration::from_secs(60);
37const THRESHOLD_EPOCH: EpochNumber = 2;
38const MAX_FREEZE_LIMIT: BlockNumber = 30_000;
39
40pub const SHRINK_THRESHOLD: usize = 300;
41
42/// An owned permission to close on a freezer thread
43pub struct FreezerClose {
44    stopped: Arc<AtomicBool>,
45}
46
47impl Drop for FreezerClose {
48    fn drop(&mut self) {
49        self.stopped.store(true, Ordering::SeqCst);
50    }
51}
52
53/// Shared blockchain state accessible across the CKB node.
54///
55/// Provides thread-safe access to the chain store, transaction pool, consensus parameters,
56/// and other core components needed throughout the node.
57#[derive(Clone)]
58pub struct Shared {
59    pub(crate) store: ChainDB,
60    pub(crate) tx_pool_controller: TxPoolController,
61    pub(crate) notify_controller: NotifyController,
62    pub(crate) txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
63    pub(crate) consensus: Arc<Consensus>,
64    pub(crate) snapshot_mgr: Arc<SnapshotMgr>,
65    pub(crate) async_handle: Handle,
66    pub(crate) ibd_finished: Arc<AtomicBool>,
67
68    pub(crate) assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
69    pub(crate) assume_valid_target_specified: Arc<Option<H256>>,
70
71    pub header_map: Arc<HeaderMap>,
72    pub(crate) block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
73    pub(crate) unverified_tip: Arc<ArcSwap<crate::HeaderIndex>>,
74}
75
76impl Shared {
77    /// Construct new Shared
78    #[allow(clippy::too_many_arguments)]
79    pub fn new(
80        store: ChainDB,
81        tx_pool_controller: TxPoolController,
82        notify_controller: NotifyController,
83        txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
84        consensus: Arc<Consensus>,
85        snapshot_mgr: Arc<SnapshotMgr>,
86        async_handle: Handle,
87        ibd_finished: Arc<AtomicBool>,
88
89        assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
90        assume_valid_target_specified: Arc<Option<H256>>,
91        header_map: Arc<HeaderMap>,
92        block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
93    ) -> Shared {
94        let header = store
95            .get_tip_header()
96            .unwrap_or(consensus.genesis_block().header());
97        let unverified_tip = Arc::new(ArcSwap::new(Arc::new(crate::HeaderIndex::new(
98            header.number(),
99            header.hash(),
100            header.difficulty(),
101        ))));
102
103        Shared {
104            store,
105            tx_pool_controller,
106            notify_controller,
107            txs_verify_cache,
108            consensus,
109            snapshot_mgr,
110            async_handle,
111            ibd_finished,
112            assume_valid_targets,
113            assume_valid_target_specified,
114            header_map,
115            block_status_map,
116            unverified_tip,
117        }
118    }
119    /// Spawn freeze background thread that periodically checks and moves ancient data from the kv database into the freezer.
120    pub fn spawn_freeze(&self) -> Option<FreezerClose> {
121        if let Some(freezer) = self.store.freezer() {
122            ckb_logger::info!("Freezer enabled");
123            let signal_receiver = new_crossbeam_exit_rx();
124            let shared = self.clone();
125            let freeze_jh = thread::Builder::new()
126                .spawn(move || {
127                    loop {
128                        match signal_receiver.recv_timeout(FREEZER_INTERVAL) {
129                            Err(_) => {
130                                if let Err(e) = shared.freeze() {
131                                    ckb_logger::error!("Freezer error {}", e);
132                                    break;
133                                }
134                            }
135                            Ok(_) => {
136                                ckb_logger::info!("Freezer closing");
137                                break;
138                            }
139                        }
140                    }
141                })
142                .expect("Start FreezerService failed");
143
144            register_thread("freeze", freeze_jh);
145
146            return Some(FreezerClose {
147                stopped: Arc::clone(&freezer.stopped),
148            });
149        }
150        None
151    }
152
153    fn freeze(&self) -> Result<(), Error> {
154        let freezer = self.store.freezer().expect("freezer inited");
155        let snapshot = self.snapshot();
156        let current_epoch = snapshot.epoch_ext().number();
157
158        if self.is_initial_block_download() {
159            ckb_logger::trace!("is_initial_block_download freeze skip");
160            return Ok(());
161        }
162
163        if current_epoch <= THRESHOLD_EPOCH {
164            ckb_logger::trace!("Freezer idles");
165            return Ok(());
166        }
167
168        let limit_block_hash = snapshot
169            .get_epoch_index(current_epoch + 1 - THRESHOLD_EPOCH)
170            .and_then(|index| snapshot.get_epoch_ext(&index))
171            .expect("get_epoch_ext")
172            .last_block_hash_in_previous_epoch();
173
174        let frozen_number = freezer.number();
175
176        let threshold = cmp::min(
177            snapshot
178                .get_block_number(&limit_block_hash)
179                .expect("get_block_number"),
180            frozen_number + MAX_FREEZE_LIMIT,
181        );
182
183        ckb_logger::trace!(
184            "Freezer current_epoch {} number {} threshold {}",
185            current_epoch,
186            frozen_number,
187            threshold
188        );
189
190        let store = self.store();
191        let get_unfrozen_block = |number: BlockNumber| {
192            store
193                .get_block_hash(number)
194                .and_then(|hash| store.get_unfrozen_block(&hash))
195        };
196
197        let ret = freezer.freeze(threshold, get_unfrozen_block)?;
198
199        let stopped = freezer.stopped.load(Ordering::SeqCst);
200
201        // Wipe out frozen data
202        self.wipe_out_frozen_data(&snapshot, ret, stopped)?;
203
204        ckb_logger::trace!("Freezer completed");
205
206        Ok(())
207    }
208
209    fn wipe_out_frozen_data(
210        &self,
211        snapshot: &Snapshot,
212        frozen: BTreeMap<packed::Byte32, (BlockNumber, u32)>,
213        stopped: bool,
214    ) -> Result<(), Error> {
215        let mut side = BTreeMap::new();
216        let mut batch = self.store.new_write_batch();
217
218        ckb_logger::trace!("freezer wipe_out_frozen_data {} ", frozen.len());
219
220        if !frozen.is_empty() {
221            // remain header
222            for (hash, (number, txs)) in &frozen {
223                batch.delete_block_body(*number, hash, *txs).map_err(|e| {
224                    ckb_logger::error!("Freezer delete_block_body failed {}", e);
225                    e
226                })?;
227
228                let pack_number: packed::Uint64 = number.into();
229                let prefix = pack_number.as_slice();
230                for (key, value) in snapshot
231                    .get_iter(
232                        COLUMN_NUMBER_HASH,
233                        IteratorMode::From(prefix, Direction::Forward),
234                    )
235                    .take_while(|(key, _)| key.starts_with(prefix))
236                {
237                    let reader = packed::NumberHashReader::from_slice_should_be_ok(key.as_ref());
238                    let block_hash = reader.block_hash().to_entity();
239                    if &block_hash != hash {
240                        let txs =
241                            packed::Uint32Reader::from_slice_should_be_ok(value.as_ref()).into();
242                        side.insert(block_hash, (reader.number().to_entity(), txs));
243                    }
244                }
245            }
246            self.store.write_sync(&batch).map_err(|e| {
247                ckb_logger::error!("Freezer write_batch delete failed {}", e);
248                e
249            })?;
250            batch.clear()?;
251
252            if !stopped {
253                let start = frozen.keys().min().expect("frozen empty checked");
254                let end = frozen.keys().max().expect("frozen empty checked");
255                self.compact_block_body(start, end);
256            }
257        }
258
259        if !side.is_empty() {
260            // Wipe out side chain
261            for (hash, (number, txs)) in &side {
262                batch.delete_block(number.into(), hash, *txs).map_err(|e| {
263                    ckb_logger::error!("Freezer delete_block_body failed {}", e);
264                    e
265                })?;
266            }
267
268            self.store.write(&batch).map_err(|e| {
269                ckb_logger::error!("Freezer write_batch delete failed {}", e);
270                e
271            })?;
272
273            if !stopped {
274                let start = side.keys().min().expect("side empty checked");
275                let end = side.keys().max().expect("side empty checked");
276                self.compact_block_body(start, end);
277            }
278        }
279        Ok(())
280    }
281
282    fn compact_block_body(&self, start: &packed::Byte32, end: &packed::Byte32) {
283        let start_t = packed::TransactionKey::new_builder()
284            .block_hash(start.clone())
285            .index(0u32)
286            .build();
287
288        let end_t = packed::TransactionKey::new_builder()
289            .block_hash(end.clone())
290            .index(TX_INDEX_UPPER_BOUND)
291            .build();
292
293        if let Err(e) = self.store.compact_range(
294            COLUMN_BLOCK_BODY,
295            Some(start_t.as_slice()),
296            Some(end_t.as_slice()),
297        ) {
298            ckb_logger::error!("Freezer compact_range {}-{} error {}", start, end, e);
299        }
300    }
301
302    /// Returns a reference to the transaction pool controller.
303    pub fn tx_pool_controller(&self) -> &TxPoolController {
304        &self.tx_pool_controller
305    }
306
307    /// Returns the transaction verification cache.
308    pub fn txs_verify_cache(&self) -> Arc<TokioRwLock<TxVerificationCache>> {
309        Arc::clone(&self.txs_verify_cache)
310    }
311
312    /// Returns a reference to the notification controller.
313    pub fn notify_controller(&self) -> &NotifyController {
314        &self.notify_controller
315    }
316
317    /// Returns a guard to the current snapshot of the blockchain state.
318    pub fn snapshot(&self) -> Guard<Arc<Snapshot>> {
319        self.snapshot_mgr.load()
320    }
321
322    /// Return arc cloned snapshot
323    pub fn cloned_snapshot(&self) -> Arc<Snapshot> {
324        Arc::clone(&self.snapshot())
325    }
326
327    /// Stores a new snapshot of the blockchain state.
328    pub fn store_snapshot(&self, snapshot: Arc<Snapshot>) {
329        self.snapshot_mgr.store(snapshot)
330    }
331
332    /// Refreshes the current snapshot with the latest store data.
333    pub fn refresh_snapshot(&self) {
334        let new = self.snapshot().refresh(self.store.get_snapshot());
335        self.store_snapshot(Arc::new(new));
336    }
337
338    /// Creates a new snapshot with the given tip and metadata.
339    pub fn new_snapshot(
340        &self,
341        tip_header: HeaderView,
342        total_difficulty: U256,
343        epoch_ext: EpochExt,
344        proposals: ProposalView,
345    ) -> Arc<Snapshot> {
346        Arc::new(Snapshot::new(
347            tip_header,
348            total_difficulty,
349            epoch_ext,
350            self.store.get_snapshot(),
351            proposals,
352            Arc::clone(&self.consensus),
353        ))
354    }
355
356    /// Returns a reference to the consensus parameters.
357    pub fn consensus(&self) -> &Consensus {
358        &self.consensus
359    }
360
361    /// Return arc cloned consensus re
362    pub fn cloned_consensus(&self) -> Arc<Consensus> {
363        Arc::clone(&self.consensus)
364    }
365
366    /// Return async runtime handle
367    pub fn async_handle(&self) -> &Handle {
368        &self.async_handle
369    }
370
371    /// Returns the hash of the genesis block.
372    pub fn genesis_hash(&self) -> Byte32 {
373        self.consensus.genesis_hash()
374    }
375
376    /// Returns a reference to the chain store.
377    pub fn store(&self) -> &ChainDB {
378        &self.store
379    }
380
381    /// Return whether chain is in initial block download
382    pub fn is_initial_block_download(&self) -> bool {
383        // Once this function has returned false, it must remain false.
384        if self.ibd_finished.load(Ordering::Acquire) {
385            false
386        } else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp())
387            > MAX_TIP_AGE
388        {
389            true
390        } else {
391            self.ibd_finished.store(true, Ordering::Release);
392            false
393        }
394    }
395
396    /// Generate and return block_template
397    pub fn get_block_template(
398        &self,
399        bytes_limit: Option<u64>,
400        proposals_limit: Option<u64>,
401        max_version: Option<Version>,
402    ) -> Result<Result<BlockTemplate, AnyError>, AnyError> {
403        self.tx_pool_controller()
404            .get_block_template(bytes_limit, proposals_limit, max_version)
405    }
406
407    pub fn set_unverified_tip(&self, header: crate::HeaderIndex) {
408        self.unverified_tip.store(Arc::new(header));
409    }
410    pub fn get_unverified_tip(&self) -> crate::HeaderIndex {
411        self.unverified_tip.load().as_ref().clone()
412    }
413
414    pub fn header_map(&self) -> &HeaderMap {
415        &self.header_map
416    }
417    pub fn remove_header_view(&self, hash: &Byte32) {
418        self.header_map.remove(hash);
419    }
420
421    pub fn block_status_map(&self) -> &DashMap<Byte32, BlockStatus> {
422        &self.block_status_map
423    }
424
425    pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
426        match self.block_status_map().get(block_hash) {
427            Some(status_ref) => *status_ref.value(),
428            None => {
429                if self.header_map().contains_key(block_hash) {
430                    BlockStatus::HEADER_VALID
431                } else {
432                    let verified = self
433                        .snapshot()
434                        .get_block_ext(block_hash)
435                        .map(|block_ext| block_ext.verified);
436                    match verified {
437                        None => BlockStatus::UNKNOWN,
438                        Some(None) => BlockStatus::BLOCK_STORED,
439                        Some(Some(true)) => BlockStatus::BLOCK_VALID,
440                        Some(Some(false)) => BlockStatus::BLOCK_INVALID,
441                    }
442                }
443            }
444        }
445    }
446
447    pub fn contains_block_status<T: ChainStore>(
448        &self,
449        block_hash: &Byte32,
450        status: BlockStatus,
451    ) -> bool {
452        self.get_block_status(block_hash).contains(status)
453    }
454
455    pub fn insert_block_status(&self, block_hash: Byte32, status: BlockStatus) {
456        self.block_status_map.insert(block_hash, status);
457    }
458
459    pub fn remove_block_status(&self, block_hash: &Byte32) {
460        let log_now = std::time::Instant::now();
461        self.block_status_map.remove(block_hash);
462        debug!("remove_block_status cost {:?}", log_now.elapsed());
463        shrink_to_fit!(self.block_status_map, SHRINK_THRESHOLD);
464        debug!(
465            "remove_block_status shrink_to_fit cost {:?}",
466            log_now.elapsed()
467        );
468    }
469
470    pub fn assume_valid_targets(&self) -> MutexGuard<Option<Vec<H256>>> {
471        self.assume_valid_targets.lock()
472    }
473
474    pub fn assume_valid_target_specified(&self) -> Arc<Option<H256>> {
475        Arc::clone(&self.assume_valid_target_specified)
476    }
477}