ckb_shared/
shared.rs

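//! Provides [`Shared`], a cloneable handle bundling the chain store, the snapshot manager,
//! the tx-pool and notify controllers, and the header / block-status bookkeeping maps.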
2#![allow(missing_docs)]
3use crate::block_status::BlockStatus;
4use crate::{HeaderMap, Snapshot, SnapshotMgr};
5use arc_swap::{ArcSwap, Guard};
6use ckb_async_runtime::Handle;
7use ckb_chain_spec::consensus::Consensus;
8use ckb_constant::store::TX_INDEX_UPPER_BOUND;
9use ckb_constant::sync::MAX_TIP_AGE;
10use ckb_db::{Direction, IteratorMode};
11use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_NUMBER_HASH};
12use ckb_error::{AnyError, Error};
13use ckb_logger::debug;
14use ckb_notify::NotifyController;
15use ckb_proposal_table::ProposalView;
16use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
17use ckb_store::{ChainDB, ChainStore};
18use ckb_systemtime::unix_time_as_millis;
19use ckb_tx_pool::{BlockTemplate, TokioRwLock, TxPoolController};
20use ckb_types::{
21    H256, U256,
22    core::{BlockNumber, EpochExt, EpochNumber, HeaderView, Version},
23    packed::{self, Byte32},
24    prelude::*,
25};
26use ckb_util::{Mutex, MutexGuard, shrink_to_fit};
27use ckb_verification::cache::TxVerificationCache;
28use dashmap::DashMap;
29use std::cmp;
30use std::collections::BTreeMap;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::thread;
34use std::time::Duration;
35
36const FREEZER_INTERVAL: Duration = Duration::from_secs(60);
37const THRESHOLD_EPOCH: EpochNumber = 2;
38const MAX_FREEZE_LIMIT: BlockNumber = 30_000;
39
40pub const SHRINK_THRESHOLD: usize = 300;
41
/// An owned permission to close on a freezer thread.
///
/// The value shares a `stopped` flag with the freezer. Dropping it stores `true` into that
/// flag, so it must be kept alive for as long as the freezer is expected to keep working.
#[derive(Debug)]
#[must_use = "dropping a `FreezerClose` immediately raises the freezer stop flag"]
pub struct FreezerClose {
    /// Stop flag shared with the freezer; set to `true` when this value is dropped.
    stopped: Arc<AtomicBool>,
}

impl Drop for FreezerClose {
    /// Raises the shared stop flag.
    fn drop(&mut self) {
        self.stopped.store(true, Ordering::SeqCst);
    }
}
52
53/// TODO(doc): @quake
54#[derive(Clone)]
55pub struct Shared {
56    pub(crate) store: ChainDB,
57    pub(crate) tx_pool_controller: TxPoolController,
58    pub(crate) notify_controller: NotifyController,
59    pub(crate) txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
60    pub(crate) consensus: Arc<Consensus>,
61    pub(crate) snapshot_mgr: Arc<SnapshotMgr>,
62    pub(crate) async_handle: Handle,
63    pub(crate) ibd_finished: Arc<AtomicBool>,
64
65    pub(crate) assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
66    pub(crate) assume_valid_target_specified: Arc<Option<H256>>,
67
68    pub header_map: Arc<HeaderMap>,
69    pub(crate) block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
70    pub(crate) unverified_tip: Arc<ArcSwap<crate::HeaderIndex>>,
71}
72
73impl Shared {
74    /// Construct new Shared
75    #[allow(clippy::too_many_arguments)]
76    pub fn new(
77        store: ChainDB,
78        tx_pool_controller: TxPoolController,
79        notify_controller: NotifyController,
80        txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
81        consensus: Arc<Consensus>,
82        snapshot_mgr: Arc<SnapshotMgr>,
83        async_handle: Handle,
84        ibd_finished: Arc<AtomicBool>,
85
86        assume_valid_targets: Arc<Mutex<Option<Vec<H256>>>>,
87        assume_valid_target_specified: Arc<Option<H256>>,
88        header_map: Arc<HeaderMap>,
89        block_status_map: Arc<DashMap<Byte32, BlockStatus>>,
90    ) -> Shared {
91        let header = store
92            .get_tip_header()
93            .unwrap_or(consensus.genesis_block().header());
94        let unverified_tip = Arc::new(ArcSwap::new(Arc::new(crate::HeaderIndex::new(
95            header.number(),
96            header.hash(),
97            header.difficulty(),
98        ))));
99
100        Shared {
101            store,
102            tx_pool_controller,
103            notify_controller,
104            txs_verify_cache,
105            consensus,
106            snapshot_mgr,
107            async_handle,
108            ibd_finished,
109            assume_valid_targets,
110            assume_valid_target_specified,
111            header_map,
112            block_status_map,
113            unverified_tip,
114        }
115    }
116    /// Spawn freeze background thread that periodically checks and moves ancient data from the kv database into the freezer.
117    pub fn spawn_freeze(&self) -> Option<FreezerClose> {
118        if let Some(freezer) = self.store.freezer() {
119            ckb_logger::info!("Freezer enabled");
120            let signal_receiver = new_crossbeam_exit_rx();
121            let shared = self.clone();
122            let freeze_jh = thread::Builder::new()
123                .spawn(move || {
124                    loop {
125                        match signal_receiver.recv_timeout(FREEZER_INTERVAL) {
126                            Err(_) => {
127                                if let Err(e) = shared.freeze() {
128                                    ckb_logger::error!("Freezer error {}", e);
129                                    break;
130                                }
131                            }
132                            Ok(_) => {
133                                ckb_logger::info!("Freezer closing");
134                                break;
135                            }
136                        }
137                    }
138                })
139                .expect("Start FreezerService failed");
140
141            register_thread("freeze", freeze_jh);
142
143            return Some(FreezerClose {
144                stopped: Arc::clone(&freezer.stopped),
145            });
146        }
147        None
148    }
149
150    fn freeze(&self) -> Result<(), Error> {
151        let freezer = self.store.freezer().expect("freezer inited");
152        let snapshot = self.snapshot();
153        let current_epoch = snapshot.epoch_ext().number();
154
155        if self.is_initial_block_download() {
156            ckb_logger::trace!("is_initial_block_download freeze skip");
157            return Ok(());
158        }
159
160        if current_epoch <= THRESHOLD_EPOCH {
161            ckb_logger::trace!("Freezer idles");
162            return Ok(());
163        }
164
165        let limit_block_hash = snapshot
166            .get_epoch_index(current_epoch + 1 - THRESHOLD_EPOCH)
167            .and_then(|index| snapshot.get_epoch_ext(&index))
168            .expect("get_epoch_ext")
169            .last_block_hash_in_previous_epoch();
170
171        let frozen_number = freezer.number();
172
173        let threshold = cmp::min(
174            snapshot
175                .get_block_number(&limit_block_hash)
176                .expect("get_block_number"),
177            frozen_number + MAX_FREEZE_LIMIT,
178        );
179
180        ckb_logger::trace!(
181            "Freezer current_epoch {} number {} threshold {}",
182            current_epoch,
183            frozen_number,
184            threshold
185        );
186
187        let store = self.store();
188        let get_unfrozen_block = |number: BlockNumber| {
189            store
190                .get_block_hash(number)
191                .and_then(|hash| store.get_unfrozen_block(&hash))
192        };
193
194        let ret = freezer.freeze(threshold, get_unfrozen_block)?;
195
196        let stopped = freezer.stopped.load(Ordering::SeqCst);
197
198        // Wipe out frozen data
199        self.wipe_out_frozen_data(&snapshot, ret, stopped)?;
200
201        ckb_logger::trace!("Freezer completed");
202
203        Ok(())
204    }
205
206    fn wipe_out_frozen_data(
207        &self,
208        snapshot: &Snapshot,
209        frozen: BTreeMap<packed::Byte32, (BlockNumber, u32)>,
210        stopped: bool,
211    ) -> Result<(), Error> {
212        let mut side = BTreeMap::new();
213        let mut batch = self.store.new_write_batch();
214
215        ckb_logger::trace!("freezer wipe_out_frozen_data {} ", frozen.len());
216
217        if !frozen.is_empty() {
218            // remain header
219            for (hash, (number, txs)) in &frozen {
220                batch.delete_block_body(*number, hash, *txs).map_err(|e| {
221                    ckb_logger::error!("Freezer delete_block_body failed {}", e);
222                    e
223                })?;
224
225                let pack_number: packed::Uint64 = number.pack();
226                let prefix = pack_number.as_slice();
227                for (key, value) in snapshot
228                    .get_iter(
229                        COLUMN_NUMBER_HASH,
230                        IteratorMode::From(prefix, Direction::Forward),
231                    )
232                    .take_while(|(key, _)| key.starts_with(prefix))
233                {
234                    let reader = packed::NumberHashReader::from_slice_should_be_ok(key.as_ref());
235                    let block_hash = reader.block_hash().to_entity();
236                    if &block_hash != hash {
237                        let txs =
238                            packed::Uint32Reader::from_slice_should_be_ok(value.as_ref()).unpack();
239                        side.insert(block_hash, (reader.number().to_entity(), txs));
240                    }
241                }
242            }
243            self.store.write_sync(&batch).map_err(|e| {
244                ckb_logger::error!("Freezer write_batch delete failed {}", e);
245                e
246            })?;
247            batch.clear()?;
248
249            if !stopped {
250                let start = frozen.keys().min().expect("frozen empty checked");
251                let end = frozen.keys().max().expect("frozen empty checked");
252                self.compact_block_body(start, end);
253            }
254        }
255
256        if !side.is_empty() {
257            // Wipe out side chain
258            for (hash, (number, txs)) in &side {
259                batch
260                    .delete_block(number.unpack(), hash, *txs)
261                    .map_err(|e| {
262                        ckb_logger::error!("Freezer delete_block_body failed {}", e);
263                        e
264                    })?;
265            }
266
267            self.store.write(&batch).map_err(|e| {
268                ckb_logger::error!("Freezer write_batch delete failed {}", e);
269                e
270            })?;
271
272            if !stopped {
273                let start = side.keys().min().expect("side empty checked");
274                let end = side.keys().max().expect("side empty checked");
275                self.compact_block_body(start, end);
276            }
277        }
278        Ok(())
279    }
280
281    fn compact_block_body(&self, start: &packed::Byte32, end: &packed::Byte32) {
282        let start_t = packed::TransactionKey::new_builder()
283            .block_hash(start.clone())
284            .index(0u32.pack())
285            .build();
286
287        let end_t = packed::TransactionKey::new_builder()
288            .block_hash(end.clone())
289            .index(TX_INDEX_UPPER_BOUND.pack())
290            .build();
291
292        if let Err(e) = self.store.compact_range(
293            COLUMN_BLOCK_BODY,
294            Some(start_t.as_slice()),
295            Some(end_t.as_slice()),
296        ) {
297            ckb_logger::error!("Freezer compact_range {}-{} error {}", start, end, e);
298        }
299    }
300
301    /// TODO(doc): @quake
302    pub fn tx_pool_controller(&self) -> &TxPoolController {
303        &self.tx_pool_controller
304    }
305
306    /// TODO(doc): @quake
307    pub fn txs_verify_cache(&self) -> Arc<TokioRwLock<TxVerificationCache>> {
308        Arc::clone(&self.txs_verify_cache)
309    }
310
311    /// TODO(doc): @quake
312    pub fn notify_controller(&self) -> &NotifyController {
313        &self.notify_controller
314    }
315
316    /// TODO(doc): @quake
317    pub fn snapshot(&self) -> Guard<Arc<Snapshot>> {
318        self.snapshot_mgr.load()
319    }
320
321    /// Return arc cloned snapshot
322    pub fn cloned_snapshot(&self) -> Arc<Snapshot> {
323        Arc::clone(&self.snapshot())
324    }
325
326    /// TODO(doc): @quake
327    pub fn store_snapshot(&self, snapshot: Arc<Snapshot>) {
328        self.snapshot_mgr.store(snapshot)
329    }
330
331    /// TODO(doc): @quake
332    pub fn refresh_snapshot(&self) {
333        let new = self.snapshot().refresh(self.store.get_snapshot());
334        self.store_snapshot(Arc::new(new));
335    }
336
337    /// TODO(doc): @quake
338    pub fn new_snapshot(
339        &self,
340        tip_header: HeaderView,
341        total_difficulty: U256,
342        epoch_ext: EpochExt,
343        proposals: ProposalView,
344    ) -> Arc<Snapshot> {
345        Arc::new(Snapshot::new(
346            tip_header,
347            total_difficulty,
348            epoch_ext,
349            self.store.get_snapshot(),
350            proposals,
351            Arc::clone(&self.consensus),
352        ))
353    }
354
355    /// TODO(doc): @quake
356    pub fn consensus(&self) -> &Consensus {
357        &self.consensus
358    }
359
360    /// Return arc cloned consensus re
361    pub fn cloned_consensus(&self) -> Arc<Consensus> {
362        Arc::clone(&self.consensus)
363    }
364
365    /// Return async runtime handle
366    pub fn async_handle(&self) -> &Handle {
367        &self.async_handle
368    }
369
370    /// TODO(doc): @quake
371    pub fn genesis_hash(&self) -> Byte32 {
372        self.consensus.genesis_hash()
373    }
374
375    /// TODO(doc): @quake
376    pub fn store(&self) -> &ChainDB {
377        &self.store
378    }
379
380    /// Return whether chain is in initial block download
381    pub fn is_initial_block_download(&self) -> bool {
382        // Once this function has returned false, it must remain false.
383        if self.ibd_finished.load(Ordering::Acquire) {
384            false
385        } else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp())
386            > MAX_TIP_AGE
387        {
388            true
389        } else {
390            self.ibd_finished.store(true, Ordering::Release);
391            false
392        }
393    }
394
395    /// Generate and return block_template
396    pub fn get_block_template(
397        &self,
398        bytes_limit: Option<u64>,
399        proposals_limit: Option<u64>,
400        max_version: Option<Version>,
401    ) -> Result<Result<BlockTemplate, AnyError>, AnyError> {
402        self.tx_pool_controller()
403            .get_block_template(bytes_limit, proposals_limit, max_version)
404    }
405
406    pub fn set_unverified_tip(&self, header: crate::HeaderIndex) {
407        self.unverified_tip.store(Arc::new(header));
408    }
409    pub fn get_unverified_tip(&self) -> crate::HeaderIndex {
410        self.unverified_tip.load().as_ref().clone()
411    }
412
413    pub fn header_map(&self) -> &HeaderMap {
414        &self.header_map
415    }
416    pub fn remove_header_view(&self, hash: &Byte32) {
417        self.header_map.remove(hash);
418    }
419
420    pub fn block_status_map(&self) -> &DashMap<Byte32, BlockStatus> {
421        &self.block_status_map
422    }
423
424    pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
425        match self.block_status_map().get(block_hash) {
426            Some(status_ref) => *status_ref.value(),
427            None => {
428                if self.header_map().contains_key(block_hash) {
429                    BlockStatus::HEADER_VALID
430                } else {
431                    let verified = self
432                        .snapshot()
433                        .get_block_ext(block_hash)
434                        .map(|block_ext| block_ext.verified);
435                    match verified {
436                        None => BlockStatus::UNKNOWN,
437                        Some(None) => BlockStatus::BLOCK_STORED,
438                        Some(Some(true)) => BlockStatus::BLOCK_VALID,
439                        Some(Some(false)) => BlockStatus::BLOCK_INVALID,
440                    }
441                }
442            }
443        }
444    }
445
446    pub fn contains_block_status<T: ChainStore>(
447        &self,
448        block_hash: &Byte32,
449        status: BlockStatus,
450    ) -> bool {
451        self.get_block_status(block_hash).contains(status)
452    }
453
454    pub fn insert_block_status(&self, block_hash: Byte32, status: BlockStatus) {
455        self.block_status_map.insert(block_hash, status);
456    }
457
458    pub fn remove_block_status(&self, block_hash: &Byte32) {
459        let log_now = std::time::Instant::now();
460        self.block_status_map.remove(block_hash);
461        debug!("remove_block_status cost {:?}", log_now.elapsed());
462        shrink_to_fit!(self.block_status_map, SHRINK_THRESHOLD);
463        debug!(
464            "remove_block_status shrink_to_fit cost {:?}",
465            log_now.elapsed()
466        );
467    }
468
469    pub fn assume_valid_targets(&self) -> MutexGuard<Option<Vec<H256>>> {
470        self.assume_valid_targets.lock()
471    }
472
473    pub fn assume_valid_target_specified(&self) -> Arc<Option<H256>> {
474        Arc::clone(&self.assume_valid_target_specified)
475    }
476}