use crate::{Snapshot, SnapshotMgr};
use arc_swap::Guard;
use ckb_async_runtime::Handle;
use ckb_chain_spec::consensus::Consensus;
use ckb_constant::store::TX_INDEX_UPPER_BOUND;
use ckb_constant::sync::MAX_TIP_AGE;
use ckb_db::{Direction, IteratorMode};
use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_NUMBER_HASH};
use ckb_error::{AnyError, Error};
use ckb_notify::NotifyController;
use ckb_proposal_table::ProposalView;
use ckb_stop_handler::{SignalSender, StopHandler};
use ckb_store::{ChainDB, ChainStore};
use ckb_tx_pool::{BlockTemplate, TokioRwLock, TxPoolController};
use ckb_types::{
core::{service, BlockNumber, EpochExt, EpochNumber, HeaderView, Version},
packed::{self, Byte32},
prelude::*,
U256,
};
use ckb_verification::cache::TxVerificationCache;
use faketime::unix_time_as_millis;
use std::cmp;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
const FREEZER_INTERVAL: Duration = Duration::from_secs(60);
const THRESHOLD_EPOCH: EpochNumber = 2;
const MAX_FREEZE_LIMIT: BlockNumber = 30_000;
pub struct FreezerClose {
stopped: Arc<AtomicBool>,
stop: StopHandler<()>,
}
impl Drop for FreezerClose {
fn drop(&mut self) {
self.stopped.store(true, Ordering::SeqCst);
self.stop.try_send(());
}
}
#[derive(Clone)]
pub struct Shared {
pub(crate) store: ChainDB,
pub(crate) tx_pool_controller: TxPoolController,
pub(crate) notify_controller: NotifyController,
pub(crate) txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
pub(crate) consensus: Arc<Consensus>,
pub(crate) snapshot_mgr: Arc<SnapshotMgr>,
pub(crate) async_handle: Handle,
pub(crate) ibd_finished: Arc<AtomicBool>,
}
impl Shared {
#[allow(clippy::too_many_arguments)]
pub fn new(
store: ChainDB,
tx_pool_controller: TxPoolController,
notify_controller: NotifyController,
txs_verify_cache: Arc<TokioRwLock<TxVerificationCache>>,
consensus: Arc<Consensus>,
snapshot_mgr: Arc<SnapshotMgr>,
async_handle: Handle,
ibd_finished: Arc<AtomicBool>,
) -> Shared {
Shared {
store,
tx_pool_controller,
notify_controller,
txs_verify_cache,
consensus,
snapshot_mgr,
async_handle,
ibd_finished,
}
}
pub fn spawn_freeze(&self) -> Option<FreezerClose> {
if let Some(freezer) = self.store.freezer() {
ckb_logger::info!("Freezer enable");
let (signal_sender, signal_receiver) =
ckb_channel::bounded::<()>(service::SIGNAL_CHANNEL_SIZE);
let shared = self.clone();
let thread = thread::Builder::new()
.spawn(move || loop {
match signal_receiver.recv_timeout(FREEZER_INTERVAL) {
Err(_) => {
if let Err(e) = shared.freeze() {
ckb_logger::error!("Freezer error {}", e);
break;
}
}
Ok(_) => {
ckb_logger::info!("Freezer closing");
break;
}
}
})
.expect("Start FreezerService failed");
let stop = StopHandler::new(
SignalSender::Crossbeam(signal_sender),
Some(thread),
"freezer".to_string(),
);
return Some(FreezerClose {
stopped: Arc::clone(&freezer.stopped),
stop,
});
}
None
}
fn freeze(&self) -> Result<(), Error> {
let freezer = self.store.freezer().expect("freezer inited");
let snapshot = self.snapshot();
let current_epoch = snapshot.epoch_ext().number();
if self.is_initial_block_download() {
ckb_logger::trace!("is_initial_block_download freeze skip");
return Ok(());
}
if current_epoch <= THRESHOLD_EPOCH {
ckb_logger::trace!("freezer loaf");
return Ok(());
}
let limit_block_hash = snapshot
.get_epoch_index(current_epoch + 1 - THRESHOLD_EPOCH)
.and_then(|index| snapshot.get_epoch_ext(&index))
.expect("get_epoch_ext")
.last_block_hash_in_previous_epoch();
let frozen_number = freezer.number();
let threshold = cmp::min(
snapshot
.get_block_number(&limit_block_hash)
.expect("get_block_number"),
frozen_number + MAX_FREEZE_LIMIT,
);
ckb_logger::trace!(
"freezer current_epoch {} number {} threshold {}",
current_epoch,
frozen_number,
threshold
);
let store = self.store();
let get_unfrozen_block = |number: BlockNumber| {
store
.get_block_hash(number)
.and_then(|hash| store.get_unfrozen_block(&hash))
};
let ret = freezer.freeze(threshold, get_unfrozen_block)?;
let stopped = freezer.stopped.load(Ordering::SeqCst);
self.wipe_out_frozen_data(&snapshot, ret, stopped)?;
ckb_logger::trace!("freezer finish");
Ok(())
}
fn wipe_out_frozen_data(
&self,
snapshot: &Snapshot,
frozen: BTreeMap<packed::Byte32, (BlockNumber, u32)>,
stopped: bool,
) -> Result<(), Error> {
let mut side = BTreeMap::new();
let mut batch = self.store.new_write_batch();
ckb_logger::trace!("freezer wipe_out_frozen_data {} ", frozen.len());
if !frozen.is_empty() {
for (hash, (number, txs)) in &frozen {
batch.delete_block_body(*number, hash, *txs).map_err(|e| {
ckb_logger::error!("freezer delete_block_body failed {}", e);
e
})?;
let pack_number: packed::Uint64 = number.pack();
let prefix = pack_number.as_slice();
for (key, value) in snapshot
.get_iter(
COLUMN_NUMBER_HASH,
IteratorMode::From(prefix, Direction::Forward),
)
.take_while(|(key, _)| key.starts_with(prefix))
{
let reader = packed::NumberHashReader::from_slice_should_be_ok(key.as_ref());
let block_hash = reader.block_hash().to_entity();
if &block_hash != hash {
let txs =
packed::Uint32Reader::from_slice_should_be_ok(value.as_ref()).unpack();
side.insert(block_hash, (reader.number().to_entity(), txs));
}
}
}
self.store.write_sync(&batch).map_err(|e| {
ckb_logger::error!("freezer write_batch delete failed {}", e);
e
})?;
batch.clear()?;
if !stopped {
let start = frozen.keys().min().expect("frozen empty checked");
let end = frozen.keys().max().expect("frozen empty checked");
self.compact_block_body(start, end);
}
}
if !side.is_empty() {
for (hash, (number, txs)) in &side {
batch
.delete_block(number.unpack(), hash, *txs)
.map_err(|e| {
ckb_logger::error!("freezer delete_block_body failed {}", e);
e
})?;
}
self.store.write(&batch).map_err(|e| {
ckb_logger::error!("freezer write_batch delete failed {}", e);
e
})?;
if !stopped {
let start = side.keys().min().expect("side empty checked");
let end = side.keys().max().expect("side empty checked");
self.compact_block_body(start, end);
}
}
Ok(())
}
fn compact_block_body(&self, start: &packed::Byte32, end: &packed::Byte32) {
let start_t = packed::TransactionKey::new_builder()
.block_hash(start.clone())
.index(0u32.pack())
.build();
let end_t = packed::TransactionKey::new_builder()
.block_hash(end.clone())
.index(TX_INDEX_UPPER_BOUND.pack())
.build();
if let Err(e) = self.store.compact_range(
COLUMN_BLOCK_BODY,
Some(start_t.as_slice()),
Some(end_t.as_slice()),
) {
ckb_logger::error!("freezer compact_range {}-{} error {}", start, end, e);
}
}
pub fn tx_pool_controller(&self) -> &TxPoolController {
&self.tx_pool_controller
}
pub fn txs_verify_cache(&self) -> Arc<TokioRwLock<TxVerificationCache>> {
Arc::clone(&self.txs_verify_cache)
}
pub fn notify_controller(&self) -> &NotifyController {
&self.notify_controller
}
pub fn snapshot(&self) -> Guard<Arc<Snapshot>> {
self.snapshot_mgr.load()
}
pub fn store_snapshot(&self, snapshot: Arc<Snapshot>) {
self.snapshot_mgr.store(snapshot)
}
pub fn refresh_snapshot(&self) {
let new = self.snapshot().refresh(self.store.get_snapshot());
self.store_snapshot(Arc::new(new));
}
pub fn new_snapshot(
&self,
tip_header: HeaderView,
total_difficulty: U256,
epoch_ext: EpochExt,
proposals: ProposalView,
) -> Arc<Snapshot> {
Arc::new(Snapshot::new(
tip_header,
total_difficulty,
epoch_ext,
self.store.get_snapshot(),
proposals,
Arc::clone(&self.consensus),
))
}
pub fn consensus(&self) -> &Consensus {
&self.consensus
}
pub fn async_handle(&self) -> &Handle {
&self.async_handle
}
pub fn genesis_hash(&self) -> Byte32 {
self.consensus.genesis_hash()
}
pub fn store(&self) -> &ChainDB {
&self.store
}
pub fn is_initial_block_download(&self) -> bool {
if self.ibd_finished.load(Ordering::Relaxed) {
false
} else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp())
> MAX_TIP_AGE
{
true
} else {
self.ibd_finished.store(true, Ordering::Relaxed);
false
}
}
pub fn get_block_template(
&self,
bytes_limit: Option<u64>,
proposals_limit: Option<u64>,
max_version: Option<Version>,
) -> Result<Result<BlockTemplate, AnyError>, AnyError> {
self.tx_pool_controller().get_block_template(
bytes_limit,
proposals_limit,
max_version.map(Into::into),
)
}
}