use std::{
cmp,
cmp::{Ordering, max, min},
collections::VecDeque,
convert::TryFrom,
mem,
ops::{Bound, RangeBounds},
sync::{
Arc,
RwLock,
RwLockReadGuard,
RwLockWriteGuard,
atomic::{self, AtomicBool},
},
time::{Duration, Instant},
};
use blake2::Blake2b;
use digest::consts::U32;
use jmt::{
JellyfishMerkleTree,
KeyHash,
OwnedValue,
Version,
storage::{LeafNode, Node, NodeKey, TreeReader},
};
use log::*;
use primitive_types::U512;
use serde::{Deserialize, Serialize};
use tari_common_types::{
chain_metadata::ChainMetadata,
epoch::VnEpoch,
types::{
BadBlock,
BlockHash,
CompressedCommitment,
CompressedPublicKey,
CompressedSignature,
FixedHash,
HashOutput,
UncompressedCommitment,
},
};
use tari_hashing::TransactionHashDomain;
use tari_mmr::{MerkleProof, pruned_hashset::PrunedHashSet};
use tari_node_components::blocks::{
Block,
BlockHeader,
BlockHeaderAccumulatedData,
BlockHeaderValidationError,
ChainBlock,
ChainHeader,
HistoricalBlock,
NewBlockTemplate,
};
use tari_transaction_components::{
BanPeriod,
consensus::{ConsensusConstants, DomainSeparatedConsensusHasher},
tari_proof_of_work::PowAlgorithm,
transaction_components::{TransactionInput, TransactionKernel, TransactionOutput},
};
use tari_utilities::{ByteArray, epoch_time::EpochTime, hex::Hex};
use super::{
AccumulatedDataRebuildStatus,
BlockchainCheckRequest,
CheckFailure,
MinedInfo,
PayrefRebuildStatus,
TemplateRegistrationEntry,
ValidatorNodeRegistrationInfo,
smt_hasher::SmtHasher,
};
use crate::{
PrunedInputMmr,
PrunedKernelMmr,
PrunedOutputMmr,
block_output_mr_hash_from_pruned_mmr,
blocks::{
BlockAccumulatedData,
BlockHeaderAccumulatedDataBuilder,
UpdateBlockAccumulatedData,
genesis_block::VALIDATOR_MR_EMPTY_PLACEHOLDER_HASH,
},
chain_storage::{
BlockAddResult,
BlockchainBackend,
DbBasicStats,
DbTotalSizeStats,
HorizonData,
InputMinedInfo,
MmrTree,
Optional,
OrNotFound,
Reorg,
TargetDifficulties,
consts::{
BACKGROUND_PRUNING_CHUNK_SIZE,
BACKGROUND_PRUNING_THRESHOLD,
BLOCKCHAIN_DATABASE_ORPHAN_STORAGE_CAPACITY,
BLOCKCHAIN_DATABASE_PRUNED_MODE_PRUNING_INTERVAL,
BLOCKCHAIN_DATABASE_PRUNING_HORIZON,
},
db_transaction::{DbKey, DbTransaction, DbValue, HorizonSyncOutputCheckpoint},
error::ChainStorageError,
kernel_merkle_proof::KernelMerkleProof,
lmdb_db::{BREATHING_TIME_MS_MAX, BREATHING_TIME_MS_MIN, BlockchainCheckStatus},
smt_hasher::ValidatorNodeJmtHasher,
utxo_mined_info::OutputMinedInfo,
},
common::rolling_vec::RollingVec,
consensus::{BaseNodeConsensusManager, chain_strength_comparer::ChainStrengthComparer},
input_mr_hash_from_pruned_mmr,
kernel_mr_hash_from_pruned_mmr,
proof_of_work::{TargetDifficultyWindow, randomx_factory::RandomXFactory},
validation::{
CandidateBlockValidator,
DifficultyCalculator,
HeaderChainLinkedValidator,
InternalConsistencyValidator,
ValidationError,
helpers::calc_median_timestamp,
tari_rx_vm_key_height,
},
};
const LOG_TARGET: &str = "c::cs::database";
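/// Runtime configuration for a [`BlockchainDatabase`].
///
/// `pruning_horizon` is the number of blocks behind the tip that are kept unpruned, and
/// `pruning_interval` controls how often pruned-mode pruning runs. The remaining flags control
/// reorg tracking and orphan/bad-block cleanup at startup.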
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BlockchainDatabaseConfig {
pub orphan_storage_capacity: usize,
pub pruning_horizon: u64,
pub pruning_interval: u64,
pub track_reorgs: bool,
pub cleanup_orphans_at_startup: bool,
pub clear_bad_blocks_at_startup: bool,
}
impl Default for BlockchainDatabaseConfig {
fn default() -> Self {
Self {
orphan_storage_capacity: BLOCKCHAIN_DATABASE_ORPHAN_STORAGE_CAPACITY,
pruning_horizon: BLOCKCHAIN_DATABASE_PRUNING_HORIZON,
pruning_interval: BLOCKCHAIN_DATABASE_PRUNED_MODE_PRUNING_INTERVAL,
track_reorgs: false,
cleanup_orphans_at_startup: true,
clear_bad_blocks_at_startup: true,
}
}
}
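/// The validators the database uses when accepting new blocks: a candidate block validator, a
/// chain-linked header validator, and an internal consistency validator for orphans.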
pub struct Validators<B> {
pub block: Arc<dyn CandidateBlockValidator<B>>,
pub header: Arc<dyn HeaderChainLinkedValidator<B>>,
pub orphan: Arc<dyn InternalConsistencyValidator>,
}
impl<B: BlockchainBackend> Validators<B> {
pub fn new(
block: impl CandidateBlockValidator<B> + 'static,
header: impl HeaderChainLinkedValidator<B> + 'static,
orphan: impl InternalConsistencyValidator + 'static,
) -> Self {
Self {
block: Arc::new(block),
header: Arc::new(header),
orphan: Arc::new(orphan),
}
}
}
impl<B> Clone for Validators<B> {
fn clone(&self) -> Self {
Validators {
block: Arc::clone(&self.block),
header: Arc::clone(&self.header),
orphan: Arc::clone(&self.orphan),
}
}
}
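/// Fetches a required value from the backend. A missing key becomes a "value not found" error,
/// an unexpected `DbValue` variant becomes an `UnexpectedResult` error, and backend errors are
/// logged before being returned.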
macro_rules! fetch {
($db:ident, $key_val:expr, $key_var:ident) => {{
let key = DbKey::$key_var($key_val);
match $db.fetch(&key) {
Ok(None) => Err(key.to_value_not_found_error()),
Ok(Some(DbValue::$key_var(k))) => Ok(*k),
Ok(Some(other)) => unexpected_result(key, other),
Err(e) => log_error(key, e),
}
}};
}
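/// Like `fetch!`, but a missing key yields `Ok(None)` instead of an error.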
macro_rules! try_fetch {
($db:ident, $key_val:expr, $key_var:ident) => {{
let key = DbKey::$key_var($key_val);
match $db.fetch(&key) {
Ok(None) => Ok(None),
Ok(Some(DbValue::$key_var(k))) => Ok(Some(*k)),
Ok(Some(other)) => unexpected_result(key, other),
Err(e) => log_error(key, e),
}
}};
}
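/// A handle to the blockchain database. The backend lives behind an `Arc<RwLock<_>>`, so all
/// reads, writes and background maintenance tasks are serialised through a single reader-writer
/// lock.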
pub struct BlockchainDatabase<B> {
db: Arc<RwLock<B>>,
validators: Validators<B>,
config: BlockchainDatabaseConfig,
consensus_manager: BaseNodeConsensusManager,
difficulty_calculator: Arc<DifficultyCalculator>,
disable_add_block_flag: Arc<AtomicBool>,
is_background_pruning: Arc<AtomicBool>,
}
#[allow(clippy::ptr_arg)]
impl<B> BlockchainDatabase<B>
where B: BlockchainBackend
{
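    /// Creates a handle over the given backend without doing any startup work; call
    /// [`start`](Self::start) (or use [`start_new`](Self::start_new)) to bootstrap the genesis
    /// block and spawn the background maintenance tasks.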
pub fn new(
db: B,
consensus_manager: BaseNodeConsensusManager,
validators: Validators<B>,
config: BlockchainDatabaseConfig,
difficulty_calculator: DifficultyCalculator,
) -> Result<Self, ChainStorageError> {
trace!(target: LOG_TARGET, "BlockchainDatabase config: {config:?}");
let blockchain_db = BlockchainDatabase {
db: Arc::new(RwLock::new(db)),
validators,
config,
consensus_manager,
difficulty_calculator: Arc::new(difficulty_calculator),
disable_add_block_flag: Arc::new(AtomicBool::new(false)),
is_background_pruning: Arc::new(AtomicBool::new(false)),
};
Ok(blockchain_db)
}
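    /// Convenience constructor that builds the database and immediately runs [`start`](Self::start).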
pub fn start_new(
db: B,
consensus_manager: BaseNodeConsensusManager,
validators: Validators<B>,
config: BlockchainDatabaseConfig,
difficulty_calculator: DifficultyCalculator,
) -> Result<Self, ChainStorageError> {
        let blockchain_db = Self::new(db, consensus_manager, validators, config, difficulty_calculator)?;
        blockchain_db.start()?;
        Ok(blockchain_db)
}
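    /// Bootstraps the database: inserts the genesis block into an empty database (or verifies the
    /// stored one matches the genesis block in the code), performs startup housekeeping (orphan
    /// and bad-block cleanup, pruning-horizon and reorg-tracking updates) and spawns the
    /// background payref, accumulated-data, consistency-check and pruning tasks.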
pub fn start(&self) -> Result<(), ChainStorageError> {
let (is_empty, config) = {
let db = self.db_read_access()?;
(db.is_empty()?, &self.config)
};
let genesis_block = Arc::new(self.consensus_manager.get_genesis_block());
if is_empty {
info!(
target: LOG_TARGET,
"Blockchain db is empty. Adding genesis block {}.",
genesis_block.block().body.to_counts_string()
);
            let mut txn = DbTransaction::new();
            self.insert_block(genesis_block.clone())?;
let body = &genesis_block.block().body;
let mut input_sum = UncompressedCommitment::default();
for input in body.inputs() {
input_sum = &input_sum + &input.commitment()?.to_commitment()?;
}
let mut output_sum = UncompressedCommitment::default();
for output in body.outputs() {
output_sum = &output_sum + &output.commitment.to_commitment()?;
}
let total_utxo_sum = CompressedCommitment::from_commitment(&output_sum - &input_sum);
let mut kernel_sum = UncompressedCommitment::default();
for kernel in body.kernels() {
kernel_sum = &kernel_sum + &kernel.excess.to_commitment()?;
}
txn.update_block_accumulated_data(*genesis_block.hash(), UpdateBlockAccumulatedData {
kernel_sum: Some(CompressedCommitment::from_commitment(kernel_sum.clone())),
..Default::default()
});
txn.set_pruned_height(0);
txn.set_horizon_data(CompressedCommitment::from_commitment(kernel_sum), total_utxo_sum);
self.write(txn)?;
self.store_pruning_horizon(config.pruning_horizon)?;
} else if !self.chain_block_or_orphan_block_exists(genesis_block.accumulated_data().hash)? {
            error!(
                target: LOG_TARGET,
                "Genesis block in database does not match the supplied genesis block in the code! Hash in the code \
                 {:?}, hash in the database {:?}",
                genesis_block.accumulated_data().hash,
                self.fetch_chain_header(0)?.hash(),
            );
return Err(ChainStorageError::CorruptedDatabase(
"Genesis block in database does not match the supplied genesis block in the code! Please delete and \
resync your blockchain database."
.into(),
));
} else {
info!(
target: LOG_TARGET,
"Blockchain db is not empty. Genesis block already exists in the database."
);
}
if config.cleanup_orphans_at_startup {
match self.cleanup_all_orphans() {
Ok(_) => info!(target: LOG_TARGET, "Orphan database cleaned out at startup.",),
Err(e) => warn!(
target: LOG_TARGET,
"Orphan database could not be cleaned out at startup: ({e:?})."
),
}
}
if config.clear_bad_blocks_at_startup {
match self.clear_all_bad_blocks() {
Ok(_) => info!(target: LOG_TARGET, "Bad blocks cleaned out at startup.",),
Err(e) => warn!(
target: LOG_TARGET,
"Bad blocks could not be cleaned out at startup: ({e:?})."
),
}
}
let pruning_horizon = self.get_chain_metadata()?.pruning_horizon();
if config.pruning_horizon != pruning_horizon {
debug!(
target: LOG_TARGET,
"Updating pruning horizon from {} to {}.", pruning_horizon, config.pruning_horizon,
);
self.store_pruning_horizon(config.pruning_horizon)?;
}
if !config.track_reorgs {
self.clear_all_reorgs()?;
}
self.rebuild_payref_indexes_background_task()?;
self.rebuild_accumulated_data_background_task()?;
self.initialize_blockchain_check_tasks()?;
self.prune_database_background_task()?;
Ok(())
}
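    /// Spawns a background task that incrementally prunes a pruned-mode database towards
    /// `best_block_height - pruning_horizon`, one `BACKGROUND_PRUNING_CHUNK_SIZE` chunk at a
    /// time, sleeping between chunks so the write lock is not held continuously. A
    /// compare-and-swap on `is_background_pruning` guarantees a single running instance.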
pub fn prune_database_background_task(&self) -> Result<(), ChainStorageError> {
let metadata = {
let db = self.db_read_access()?;
db.fetch_chain_metadata()?
};
if !metadata.is_pruned_node() {
return Ok(());
}
let pruning_horizon = self.config.pruning_horizon;
let prune_to_height_target = metadata.best_block_height().saturating_sub(pruning_horizon);
let blocks_to_prune = prune_to_height_target.saturating_sub(metadata.pruned_height());
if blocks_to_prune <= BACKGROUND_PRUNING_THRESHOLD {
return Ok(());
}
if self
.is_background_pruning
.compare_exchange(false, true, atomic::Ordering::SeqCst, atomic::Ordering::SeqCst)
.is_err()
{
debug!(
target: LOG_TARGET,
"Background pruning task is already running, skipping."
);
return Ok(());
}
info!(
target: LOG_TARGET,
"Starting background database pruning: {} blocks to prune (from height {} to {})",
blocks_to_prune,
metadata.pruned_height(),
prune_to_height_target,
);
let db_rw_lock = self.db.clone();
let is_pruning_flag = self.is_background_pruning.clone();
tokio::task::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(BREATHING_TIME_MS_MIN)).await;
let db = db_rw_lock.clone();
let res = tokio::task::spawn_blocking(move || -> Result<bool, ChainStorageError> {
let mut db = db.write().map_err(|e| {
ChainStorageError::AccessError(format!("Write lock on blockchain backend failed: {e:?}"))
})?;
let metadata = db.fetch_chain_metadata()?;
let target = metadata.best_block_height().saturating_sub(pruning_horizon);
let blocks_remaining = target.saturating_sub(metadata.pruned_height());
if blocks_remaining <= BACKGROUND_PRUNING_THRESHOLD {
return Ok(true);
}
let chunk_end = (metadata.pruned_height() + BACKGROUND_PRUNING_CHUNK_SIZE).min(target);
prune_to_height(&mut *db, chunk_end)?;
info!(
target: LOG_TARGET,
"Background pruning: completed chunk up to height {} (target: {})",
chunk_end, target,
);
Ok(false)
})
.await;
match res {
Ok(Ok(true)) => {
break;
},
                    Ok(Ok(false)) => {
                        // More blocks remain below the prune target; continue with the next chunk.
                    },
Ok(Err(e)) => {
error!(
target: LOG_TARGET,
"Background pruning failed: {e}",
);
break;
},
Err(e) => {
error!(
target: LOG_TARGET,
"Background pruning task panicked: {e}",
);
break;
},
}
}
is_pruning_flag.store(false, atomic::Ordering::SeqCst);
info!(target: LOG_TARGET, "Background pruning task completed.");
});
Ok(())
}
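    /// Spawns a background task that rebuilds block accumulated data height by height, resuming
    /// from the last recorded rebuild height and stopping once the status reports the rebuild as
    /// complete.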
pub fn rebuild_accumulated_data_background_task(&self) -> Result<(), ChainStorageError> {
let initial_status = {
let db = self.db_read_access()?;
db.fetch_accumulated_data_rebuild_status()?
};
debug!(target: LOG_TARGET, "[AccData] Rebuilding accumulated data status: {initial_status:?}");
if initial_status.is_rebuilt {
debug!(target: LOG_TARGET, "[AccData] Accumulated data has already been rebuilt.");
return Ok(());
}
let db_rw_lock = self.db.clone();
let rules = self.consensus_manager.clone();
tokio::task::spawn(async move {
let difficulty_calculator = DifficultyCalculator::new(rules.clone(), RandomXFactory::new(1));
let start_height = initial_status.last_rebuild_height.unwrap_or(1);
let mut last_status = initial_status.clone();
debug!(
target: LOG_TARGET,
"[AccData] Start rebuilding accumulated data from height {start_height}"
);
let mut height = start_height;
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
let db = db_rw_lock.clone();
let difficulty_calculator = difficulty_calculator.clone();
let cc = rules.consensus_constants(height).clone();
let res = tokio::task::spawn_blocking(move || {
process_accumulated_data_for_height(db, difficulty_calculator, height, &cc)
})
.await;
match res {
Ok(Ok(current_status)) => {
last_status = current_status;
},
Ok(Err(e)) => {
error!(
target: LOG_TARGET,
"[AccData] Rebuilding accumulated data failed. Initial status: {initial_status:?}. \
Last updated status: {last_status:?} ({e})"
);
break;
},
Err(e) => {
error!(
target: LOG_TARGET,
"[AccData] Rebuilding accumulated data failed. Initial status: {initial_status:?}. \
Last updated status: {last_status:?} ({e})",
);
break;
},
}
if last_status.is_rebuilt {
debug!(
target: LOG_TARGET,
"[AccData] Rebuilding accumulated data from height {start_height} completed, Final status: {last_status:?}"
);
break;
}
height = height.saturating_add(1);
}
});
Ok(())
}
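    /// Spawns a background task that verifies accumulated data for each height, optionally
    /// autocorrecting discrepancies. It refuses to run while the accumulated-data migration is in
    /// progress, while another check is busy, or once a previous check has concluded; run state
    /// and failures are persisted via `BlockchainCheckRequest`s.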
#[allow(clippy::too_many_lines)]
pub fn check_accumulated_data_background_task(&self) -> Result<(), ChainStorageError> {
let accumulated_data_rebuild_status = {
let db = self.db_read_access()?;
db.fetch_accumulated_data_rebuild_status()?
};
if !accumulated_data_rebuild_status.is_rebuilt {
warn!(target: LOG_TARGET, "[AccData check] Accumulated data migration task in progress, cannot continue.");
return Err(ChainStorageError::AccDataMigrationStillInProgress);
}
let check_status = self.fetch_accumulated_data_check_status()?;
let initial_status = if let Some(status) = check_status {
if status.checked_status().0 {
debug!(target: LOG_TARGET, "[AccData check] Accumulated data check has concluded.");
return Ok(());
}
if status.is_running() {
debug!(target: LOG_TARGET, "[AccData check] Accumulated data check is already busy.");
return Ok(());
}
debug!(target: LOG_TARGET, "[AccData check] Accumulated data check in progress: {status:?}.");
status
} else {
return Ok(());
};
{
let db = self.db_write_access()?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::SetRunState(true))?;
}
let db_rw_lock = self.db.clone();
let rules = self.consensus_manager.clone();
tokio::task::spawn(async move {
            let clear_flags = |db: Arc<RwLock<B>>, has_concluded: bool, last_failure: Option<CheckFailure>| match db
                .write()
            {
                Ok(db) => {
                    if let Err(e) = db.update_accumulated_data_check_status(BlockchainCheckRequest::ClearRunningFlags {
                        has_concluded,
                        last_failure,
                    }) {
                        error!(
                            target: LOG_TARGET,
                            "[AccData check] Failed to clear the accumulated data check status run flags: {e:?}"
                        );
                    }
                },
                Err(e) => {
                    error!(target: LOG_TARGET, "[AccData check] Write lock on blockchain db failed: {e:?}");
                },
            };
let difficulty_calculator = DifficultyCalculator::new(rules.clone(), RandomXFactory::new(1));
let start_height = initial_status.last_check_height.unwrap_or(1);
let mut last_status = initial_status.clone();
debug!(
target: LOG_TARGET,
"[AccData check] Start checking accumulated data from height {start_height}"
);
let mut height = start_height;
let sleep_ms = last_status
.breathing_time_ms
.clamp(BREATHING_TIME_MS_MIN, BREATHING_TIME_MS_MAX);
let autocorrect_enabled = initial_status.autocorrect_enabled();
loop {
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
let db = db_rw_lock.clone();
let difficulty_calculator = difficulty_calculator.clone();
let cc = rules.consensus_constants(height).clone();
let res = tokio::task::spawn_blocking(move || {
verify_accumulated_data_for_height(db, difficulty_calculator, height, &cc, autocorrect_enabled)
})
.await;
match res {
Ok(Ok(current_status)) => {
last_status = current_status;
},
                    Ok(Err(ChainStorageError::CorruptedDatabase(ref e))) => {
error!(
target: LOG_TARGET,
"[AccData check] Accumulated data check found corruption. Initial \
status: {initial_status:?}. Last updated status: {last_status:?}. ({e})"
);
                        if !autocorrect_enabled {
                            info!(
                                target: LOG_TARGET,
                                "[AccData check] Autocorrect flag is disabled - re-run with the autocorrect flag enabled",
                            );
                        }
clear_flags(
db_rw_lock.clone(),
true,
Some(CheckFailure {
corrupt_db: true,
error: e.to_string(),
}),
);
break;
},
Ok(Err(e)) => {
error!(
target: LOG_TARGET,
"[AccData check] Checking accumulated data failed. Initial status: {initial_status:?}. \
Last updated status: {last_status:?} ({e})"
);
clear_flags(
db_rw_lock.clone(),
false,
Some(CheckFailure {
corrupt_db: false,
error: e.to_string(),
}),
);
break;
},
Err(e) => {
error!(
target: LOG_TARGET,
"[AccData check] Checking accumulated data failed. Initial status: {initial_status:?}. \
Last updated status: {last_status:?} ({e})",
);
clear_flags(
db_rw_lock.clone(),
false,
Some(CheckFailure {
corrupt_db: false,
error: e.to_string(),
}),
);
break;
},
}
if last_status.checked_status().0 || last_status.stop_if_running {
clear_flags(db_rw_lock.clone(), true, None);
if last_status.checked_status().0 {
debug!(
target: LOG_TARGET,
"[AccData check] Accumulated data check from height {start_height} completed, Final status: \
{last_status:?}"
);
} else {
debug!(
target: LOG_TARGET,
"[AccData check] Accumulated data check stopped as requested, Final status: {last_status:?}"
);
}
break;
}
height = height.saturating_add(1);
}
});
Ok(())
}
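    /// Spawns a background task that verifies blockchain consistency height by height. On
    /// unrecoverable corruption it rewinds the chain to just below the corrupt height when
    /// autocorrect is enabled, otherwise it logs the height to rewind to manually.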
#[allow(clippy::too_many_lines)]
pub fn check_blockchain_consistency_background_task(&self) -> Result<(), ChainStorageError> {
let accumulated_data_rebuild_status = {
let db = self.db_read_access()?;
db.fetch_accumulated_data_rebuild_status()?
};
if !accumulated_data_rebuild_status.is_rebuilt {
warn!(target: LOG_TARGET, "[Blockchain check] Accumulated data migration task in progress, cannot continue.");
return Err(ChainStorageError::AccDataMigrationStillInProgress);
}
let check_status = self.fetch_blockchain_consistency_check_status()?;
let initial_status = if let Some(status) = check_status {
if status.checked_status().0 {
debug!(target: LOG_TARGET, "[Blockchain check] Blockchain consistency check has concluded.");
return Ok(());
}
if status.is_running() {
debug!(target: LOG_TARGET, "[Blockchain check] Blockchain consistency check is already busy.");
return Ok(());
}
debug!(target: LOG_TARGET, "[Blockchain check] Blockchain consistency check in progress: {status:?}.");
status
} else {
return Ok(());
};
{
let db = self.db_write_access()?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetRunState(true))?;
}
let db_rw_lock = self.db.clone();
let validators = self.validators.clone();
tokio::task::spawn(async move {
let clear_flags =
|db: Arc<RwLock<B>>, has_concluded: bool, last_failure: Option<CheckFailure>| match db.write() {
Ok(db) => {
if let Err(e) =
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::ClearRunningFlags {
has_concluded,
last_failure,
})
{
error!(
target: LOG_TARGET,
"[Blockchain check] Failed to clear the db consistency check status run flags: {e:?}"
);
}
},
Err(e) => {
error!(target: LOG_TARGET, "[Blockchain check] Write lock on blockchain db failed: {e:?}");
},
};
let start_height = initial_status.last_check_height.unwrap_or(1);
let mut last_status = initial_status.clone();
debug!(
target: LOG_TARGET,
"[Blockchain check] Start checking blockchain consistency from height {start_height}"
);
let mut height = start_height;
let sleep_ms = last_status
.breathing_time_ms
.clamp(BREATHING_TIME_MS_MIN, BREATHING_TIME_MS_MAX);
loop {
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
let db = db_rw_lock.clone();
let validators = validators.clone();
let full_validation = initial_status.full_validation_enabled();
let res = tokio::task::spawn_blocking(move || {
verify_blockchain_consistency_for_height(db, &validators, height, full_validation)
})
.await;
match res {
Ok(Ok(current_status)) => {
last_status = current_status;
},
                    Ok(Err(ChainStorageError::CorruptedDatabase(ref e))) => {
error!(
target: LOG_TARGET,
"[Blockchain check] Blockchain consistency check found unrecoverable corruption. Initial \
status: {initial_status:?}. Last updated status: {last_status:?}. ({e})"
);
if initial_status.autocorrect_enabled() {
let db_rw_lock = db_rw_lock.clone();
let res = tokio::task::spawn_blocking(move || {
if let Ok(mut db) = db_rw_lock.write() {
rewind_to_height(&mut *db, height - 1)
} else {
Err(ChainStorageError::AccessError(
"Write lock on blockchain backend failed".into(),
))
}
})
.await;
match res {
Ok(Ok(_)) => {
info!(target: LOG_TARGET,
"[Blockchain check] Rewound the blockchain to height {} after unrecoverable \
corruption at height {}.",
height - 1, height
);
},
Ok(Err(e)) => {
error!(target: LOG_TARGET,
"[Blockchain check] Rewind after unrecoverable corruption at height {height} \
failed: {e}",
);
},
Err(e) => {
error!(target: LOG_TARGET,
"[Blockchain check] Rewind task join error after unrecoverable corruption at \
height {height}: {e}",
);
},
}
} else {
info!(
target: LOG_TARGET,
"[Blockchain check] Autocorrect flag is disabled - manually rewind to height {}",
height - 1,
);
}
clear_flags(
db_rw_lock.clone(),
true,
Some(CheckFailure {
corrupt_db: true,
error: e.to_string(),
}),
);
break;
},
Ok(Err(e)) => {
error!(
target: LOG_TARGET,
"[Blockchain check] Checking blockchain consistency failed. Initial status: \
{initial_status:?}. Last updated status: {last_status:?} ({e})"
);
clear_flags(
db_rw_lock.clone(),
false,
Some(CheckFailure {
corrupt_db: false,
error: e.to_string(),
}),
);
break;
},
Err(e) => {
error!(
target: LOG_TARGET,
"[Blockchain check] Checking blockchain consistency failed. Initial status: \
{initial_status:?}. Last updated status: {last_status:?} ({e})",
);
clear_flags(
db_rw_lock.clone(),
false,
Some(CheckFailure {
corrupt_db: false,
error: e.to_string(),
}),
);
break;
},
}
if last_status.checked_status().0 || last_status.stop_if_running {
clear_flags(db_rw_lock.clone(), true, None);
if last_status.checked_status().0 {
debug!(
target: LOG_TARGET,
"[Blockchain check] Blockchain consistency check from height {start_height} completed, \
Final status: {last_status:?}"
);
} else {
debug!(
target: LOG_TARGET,
"[Blockchain check] Blockchain consistency check stopped as requested, Final status: \
{last_status:?}"
);
}
break;
}
height = height.saturating_add(1);
}
});
Ok(())
}
fn initialize_blockchain_check_tasks(&self) -> Result<(), ChainStorageError> {
let db = self.db_write_access()?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::SetRunState(false))?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetRunState(false))?;
Ok(())
}
pub fn request_accumulated_data_check(
&self,
auto_correct: bool,
breathing_time_ms: u64,
) -> Result<(), ChainStorageError> {
{
if self
.fetch_accumulated_data_check_status()?
.unwrap_or_default()
.is_running()
{
return Err(ChainStorageError::InvalidOperation(
"[Blockchain check] Cannot start a new accumulated data check while one is already running."
.to_string(),
));
}
let db = self.db_write_access()?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::ResumeCheck)?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::SetAutoCorrect(auto_correct))?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::SetBreathingTime(breathing_time_ms))?;
trace!(
target: LOG_TARGET,
"[AccData check] Requested accumulated data check: auto_correct({auto_correct})"
);
}
self.check_accumulated_data_background_task()
}
pub fn request_blockchain_consistency_check(
&self,
full_validation: bool,
auto_correct: bool,
breathing_time_ms: u64,
) -> Result<(), ChainStorageError> {
{
if self
.fetch_blockchain_consistency_check_status()?
.unwrap_or_default()
.is_running()
{
return Err(ChainStorageError::InvalidOperation(
"[Blockchain check] Cannot start a new blockchain consistency check while one is already running."
.to_string(),
));
}
let db = self.db_write_access()?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::ResumeCheck)?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetFullValidation(full_validation))?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetAutoCorrect(auto_correct))?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetBreathingTime(breathing_time_ms))?;
trace!(
target: LOG_TARGET,
"[Blockchain check] Requested blockchain consistency check: auto_correct({auto_correct}), \
full_validation({full_validation})"
);
}
self.check_blockchain_consistency_background_task()
}
pub fn stop_running_accumulated_data_check_task(&self) -> Result<(), ChainStorageError> {
let db = self.db_write_access()?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::SetStopIfRunning(true))?;
trace!(target: LOG_TARGET, "[AccData check] Requested stop");
Ok(())
}
pub fn stop_running_blockchain_consistency_check_task(&self) -> Result<(), ChainStorageError> {
let db = self.db_write_access()?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetStopIfRunning(true))?;
trace!(target: LOG_TARGET, "[Blockchain check] Requested stop");
Ok(())
}
pub fn reset_accumulated_data_check_db_counters(&self) -> Result<(), ChainStorageError> {
let acc_diff_status = self.fetch_accumulated_data_check_status()?;
if let Some(acc_diff) = acc_diff_status &&
acc_diff.is_running()
{
return Err(ChainStorageError::InvalidOperation(
"[AccData check] Cannot reset counters while a check is running.".to_string(),
));
}
let db = self.db_write_access()?;
db.update_accumulated_data_check_status(BlockchainCheckRequest::ResetAllCounters)?;
trace!(target: LOG_TARGET, "[AccData check] Requested reset counters");
Ok(())
}
pub fn reset_blockchain_consistency_check_db_counters(&self) -> Result<(), ChainStorageError> {
let consistency_status = self.fetch_blockchain_consistency_check_status()?;
if let Some(consistency) = consistency_status &&
consistency.is_running()
{
return Err(ChainStorageError::InvalidOperation(
"[Blockchain check] Cannot reset counters while a check is running.".to_string(),
));
}
let db = self.db_write_access()?;
db.update_blockchain_consistency_check_status(BlockchainCheckRequest::ResetAllCounters)?;
trace!(target: LOG_TARGET, "[Blockchain check] Requested reset counters");
Ok(())
}
pub fn fetch_accumulated_data_check_status(&self) -> Result<Option<BlockchainCheckStatus>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_accumulated_data_check_status()
}
pub fn fetch_blockchain_consistency_check_status(
&self,
) -> Result<Option<BlockchainCheckStatus>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_blockchain_consistency_check_status()
}
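    /// Spawns a background task that rebuilds the payment-reference (payref) indexes from the
    /// last recorded rebuild height up to the best block height captured when the rebuild
    /// started.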
pub fn rebuild_payref_indexes_background_task(&self) -> Result<(), ChainStorageError> {
let initial_status = {
let db = self.db_read_access()?;
db.fetch_payref_rebuild_status()?
};
if initial_status.is_rebuilt {
debug!(target: LOG_TARGET, "[PayRef] Payref indexes has already been rebuilt.");
return Ok(());
}
let metadata_at_start = if let Some(metadata) = initial_status.metadata_at_start.clone() {
metadata
} else {
let db = self.db_read_access()?;
db.fetch_chain_metadata()?
};
let db_rw_lock = self.db.clone();
tokio::task::spawn(async move {
let start_height = initial_status.last_rebuild_height.unwrap_or_default();
let mut last_status = initial_status.clone();
debug!(
target: LOG_TARGET,
"[PayRef] Starting index rebuilding for heights {} to {}",
start_height, metadata_at_start.best_block_height()
);
let mut initialize_stats = Some(metadata_at_start.best_block_height());
for height in start_height..=metadata_at_start.best_block_height() {
tokio::time::sleep(Duration::from_millis(100)).await;
let finalize = height == metadata_at_start.best_block_height();
let metadata = metadata_at_start.clone();
let db = db_rw_lock.clone();
let res = tokio::task::spawn_blocking(move || {
process_payref_for_height(db, height, metadata, initialize_stats, finalize)
})
.await;
match res {
Ok(Ok(current_status)) => {
last_status = current_status;
},
Ok(Err(e)) => {
error!(
target: LOG_TARGET,
"[PayRef] Index rebuilding failed. Initial status: {initial_status:?}. Last updated status: {last_status:?} ({e})"
);
break;
},
Err(e) => {
error!(
target: LOG_TARGET,
"[PayRef] Index rebuilding failed. Initial status: {initial_status:?}. Last updated status: {last_status:?} ({e})"
);
break;
},
}
if initialize_stats.is_some() {
initialize_stats = None;
}
if finalize || last_status.is_rebuilt {
                    debug!(
                        target: LOG_TARGET,
                        "[PayRef] Index rebuilding completed, Final status: {last_status:?}",
                    );
break;
}
}
});
Ok(())
}
pub fn fetch_genesis_block(&self) -> ChainBlock {
self.consensus_manager.get_genesis_block()
}
pub fn consensus_constants(&self) -> Result<&ConsensusConstants, ChainStorageError> {
let height = self.get_height()?;
Ok(self.rules().consensus_constants(height))
}
pub fn rules(&self) -> &BaseNodeConsensusManager {
&self.consensus_manager
}
pub fn db_read_access(&self) -> Result<RwLockReadGuard<'_, B>, ChainStorageError> {
self.db.read().map_err(|e| {
error!(
target: LOG_TARGET,
"An attempt to get a read lock on the blockchain backend failed. {e:?}"
);
ChainStorageError::AccessError("Read lock on blockchain backend failed".into())
})
}
#[cfg(test)]
pub fn test_db_write_access(&self) -> Result<RwLockWriteGuard<'_, B>, ChainStorageError> {
self.db.write().map_err(|e| {
error!(
target: LOG_TARGET,
"An attempt to get a write lock on the blockchain backend failed. {e:?}"
);
ChainStorageError::AccessError("Write lock on blockchain backend failed".into())
})
}
fn db_write_access(&self) -> Result<RwLockWriteGuard<'_, B>, ChainStorageError> {
self.db.write().map_err(|e| {
error!(
target: LOG_TARGET,
"An attempt to get a write lock on the blockchain backend failed. {e:?}"
);
ChainStorageError::AccessError("Write lock on blockchain backend failed".into())
})
}
pub(crate) fn is_add_block_disabled(&self) -> bool {
self.disable_add_block_flag.load(atomic::Ordering::SeqCst)
}
pub(crate) fn set_disable_add_block_flag(&self) {
self.disable_add_block_flag.store(true, atomic::Ordering::SeqCst);
}
pub(crate) fn clear_disable_add_block_flag(&self) {
self.disable_add_block_flag.store(false, atomic::Ordering::SeqCst);
}
pub fn write(&self, transaction: DbTransaction) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
db.write(transaction)
}
pub fn get_height(&self) -> Result<u64, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.fetch_chain_metadata()?.best_block_height())
}
pub fn get_accumulated_difficulty(&self) -> Result<U512, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.fetch_chain_metadata()?.accumulated_difficulty())
}
pub fn get_chain_metadata(&self) -> Result<ChainMetadata, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_chain_metadata()
}
pub fn fetch_output(&self, output_hash: HashOutput) -> Result<Option<OutputMinedInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_output(&output_hash)
}
pub fn fetch_input(&self, output_hash: HashOutput) -> Result<Option<InputMinedInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_input(&output_hash)
}
pub fn fetch_mined_info_by_payref(&self, payref: FixedHash) -> Result<MinedInfo, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_mined_info_by_payref(&payref)
}
pub fn fetch_mined_info_by_output_hash(&self, output_hash: HashOutput) -> Result<MinedInfo, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_mined_info_by_output_hash(&output_hash)
}
pub fn fetch_unspent_output_hash_by_commitment(
&self,
commitment: CompressedCommitment,
) -> Result<Option<HashOutput>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_unspent_output_hash_by_commitment(&commitment)
}
pub fn fetch_outputs_with_spend_status_at_tip(
&self,
hashes: Vec<HashOutput>,
) -> Result<Vec<Option<(TransactionOutput, bool)>>, ChainStorageError> {
let db = self.db_read_access()?;
let tip = db.fetch_chain_metadata()?.best_block_height();
let smt_reader = db.create_smt_reader()?;
let smt = JellyfishMerkleTree::<_, SmtHasher>::new(&smt_reader);
let mut result = Vec::with_capacity(hashes.len());
for hash in hashes {
let output = db.fetch_output(&hash)?;
trace!(
target: LOG_TARGET,
"fetch_outputs_with_spend_status_at_tip: hash: {}, output: {:?}",
hash.to_hex(),
output
);
if let Some(mined_info) = output {
let smt_key = KeyHash(
mined_info
.output
.commitment
.as_bytes()
.try_into()
.expect("must be 32 bytes"),
);
let spent = smt
.get(smt_key, tip)
.map_err(ChainStorageError::JellyfishMerkleTreeError)?
.is_none();
trace!(
target: LOG_TARGET,
"fetch_outputs_with_spend_status_at_tip: smt_key: {smt_key:?}, spent: {spent}"
);
result.push(Some((mined_info.output, spent)));
} else {
result.push(None);
}
}
Ok(result)
}
pub fn fetch_outputs_mined_info(
&self,
hashes: Vec<HashOutput>,
) -> Result<Vec<Option<OutputMinedInfo>>, ChainStorageError> {
let db = self.db_read_access()?;
let mut result = Vec::with_capacity(hashes.len());
for hash in hashes {
let output = db.fetch_output(&hash)?;
result.push(output);
}
Ok(result)
}
pub fn fetch_inputs_mined_info(
&self,
hashes: Vec<HashOutput>,
) -> Result<Vec<Option<InputMinedInfo>>, ChainStorageError> {
let db = self.db_read_access()?;
let mut result = Vec::with_capacity(hashes.len());
for hash in hashes {
let input = db.fetch_input(&hash)?;
result.push(input);
}
Ok(result)
}
pub fn fetch_kernel_by_excess_sig(
&self,
excess_sig: CompressedSignature,
) -> Result<Option<(TransactionKernel, HashOutput)>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_kernel_by_excess_sig(&excess_sig)
}
pub fn fetch_kernels_in_block(&self, hash: HashOutput) -> Result<Vec<TransactionKernel>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_kernels_in_block(&hash)
}
pub fn fetch_bad_blocks(&self) -> Result<Vec<BadBlock>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_bad_blocks()
}
pub fn clear_all_bad_blocks(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
db.clear_all_bad_blocks()
}
pub fn fetch_outputs_in_block_with_spend_state(
&self,
header_hash: HashOutput,
spend_status_at_header: Option<HashOutput>,
) -> Result<Vec<(TransactionOutput, bool)>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_outputs_in_block_with_spend_state(&header_hash, spend_status_at_header.as_ref())
}
pub fn fetch_outputs_in_block(&self, header_hash: HashOutput) -> Result<Vec<TransactionOutput>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_outputs_in_block(&header_hash)
}
pub fn fetch_inputs_in_block(&self, header_hash: HashOutput) -> Result<Vec<TransactionInput>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_inputs_in_block(&header_hash)
}
pub fn utxo_count(&self) -> Result<usize, ChainStorageError> {
let db = self.db_read_access()?;
db.utxo_count()
}
pub fn fetch_header(&self, height: u64) -> Result<Option<BlockHeader>, ChainStorageError> {
let db = self.db_read_access()?;
match fetch_header(&*db, height) {
Ok(header) => Ok(Some(header)),
Err(err) if err.is_value_not_found() => Ok(None),
Err(err) => Err(err),
}
}
pub fn fetch_chain_header(&self, height: u64) -> Result<ChainHeader, ChainStorageError> {
let db = self.db_read_access()?;
let chain_header = db.fetch_chain_header_by_height(height)?;
Ok(chain_header)
}
pub fn fetch_header_containing_kernel_mmr(&self, mmr_position: u64) -> Result<ChainHeader, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_header_containing_kernel_mmr(mmr_position)
}
pub fn find_headers_after_hash<I: IntoIterator<Item = HashOutput>>(
&self,
ordered_hashes: I,
count: u64,
) -> Result<Option<(usize, Vec<BlockHeader>)>, ChainStorageError> {
let db = self.db_read_access()?;
for (i, hash) in ordered_hashes.into_iter().enumerate() {
if hash.len() != 32 {
return Err(ChainStorageError::InvalidArguments {
func: "find_headers_after_hash",
arg: "ordered_hashes",
message: format!(
"Hash at index {} was an invalid length. Expected 32 but got {}",
i,
hash.len()
),
});
}
match fetch_header_by_block_hash(&*db, hash)? {
Some(header) => {
if count == 0 {
return Ok(Some((i, Vec::new())));
}
let end_height =
header
.height
.checked_add(count)
.ok_or_else(|| ChainStorageError::InvalidArguments {
func: "find_headers_after_hash",
arg: "count",
message: "count + block height will overflow u64".into(),
})?;
let headers = fetch_headers(&*db, header.height + 1, end_height)?;
return Ok(Some((i, headers)));
},
None => continue,
};
}
Ok(None)
}
pub fn fetch_block_timestamps(&self, start_hash: HashOutput) -> Result<RollingVec<EpochTime>, ChainStorageError> {
let start_header =
self.fetch_header_by_block_hash(start_hash)?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockHeader",
field: "start_hash",
value: start_hash.to_hex(),
})?;
let constants = self.consensus_manager.consensus_constants(start_header.height);
let timestamp_window = constants.median_timestamp_count();
let start_window = start_header.height.saturating_sub(timestamp_window as u64);
let timestamps = self
.fetch_headers(start_window..=start_header.height)?
.iter()
.map(|h| h.timestamp)
.collect::<Vec<_>>();
let mut rolling = RollingVec::new(timestamp_window);
rolling.extend(timestamps);
Ok(rolling)
}
pub fn fetch_header_accumulated_data(
&self,
hash: HashOutput,
) -> Result<Option<BlockHeaderAccumulatedData>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_header_accumulated_data(&hash)
}
pub fn insert_valid_headers(&self, headers: Vec<ChainHeader>) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
insert_headers(&mut *db, headers)
}
pub fn fetch_headers<T: RangeBounds<u64>>(&self, bounds: T) -> Result<Vec<BlockHeader>, ChainStorageError> {
let db = self.db_read_access()?;
let (start, mut end) = convert_to_option_bounds(bounds);
if end.is_none() {
end = Some(db.fetch_last_header()?.height);
}
let (start, end) = (start.unwrap_or(0), end.unwrap());
if start > end {
return Ok(Vec::new());
}
fetch_headers(&*db, start, end)
}
pub fn fetch_chain_headers<T: RangeBounds<u64>>(&self, bounds: T) -> Result<Vec<ChainHeader>, ChainStorageError> {
let db = self.db_read_access()?;
let (start, mut end) = convert_to_option_bounds(bounds);
if end.is_none() {
end = Some(db.fetch_last_header()?.height);
}
let (start, end) = (start.unwrap_or(0), end.unwrap());
fetch_chain_headers(&*db, start, end)
}
pub fn fetch_header_by_block_hash(&self, hash: HashOutput) -> Result<Option<BlockHeader>, ChainStorageError> {
let db = self.db_read_access()?;
fetch_header_by_block_hash(&*db, hash)
}
pub fn fetch_chain_header_by_block_hash(&self, hash: HashOutput) -> Result<Option<ChainHeader>, ChainStorageError> {
let db = self.db_read_access()?;
if let Some(header) = fetch_header_by_block_hash(&*db, hash)? {
let accumulated_data =
db.fetch_header_accumulated_data(&hash)?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockHeaderAccumulatedData",
field: "hash",
value: hash.to_hex(),
})?;
let height = header.height;
let header = ChainHeader::try_construct(header, accumulated_data).ok_or_else(|| {
ChainStorageError::DataInconsistencyDetected {
function: "fetch_chain_header_by_block_hash",
details: format!(
"Mismatch between header and accumulated data for header {hash} ({height}). This indicates an \
inconsistency in the blockchain database"
),
}
})?;
Ok(Some(header))
} else {
Ok(None)
}
}
pub fn fetch_tip_header(&self) -> Result<ChainHeader, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_tip_header()
}
pub fn fetch_last_header(&self) -> Result<BlockHeader, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_last_header()
}
pub fn fetch_last_chain_header(&self) -> Result<ChainHeader, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_last_chain_header()
}
pub fn fetch_kernel_commitment_sum(&self, at_hash: &HashOutput) -> Result<CompressedCommitment, ChainStorageError> {
Ok(self.fetch_block_accumulated_data(*at_hash)?.kernel_sum().clone())
}
pub fn fetch_block_hashes_from_header_tip(
&self,
n: usize,
offset: usize,
) -> Result<Vec<HashOutput>, ChainStorageError> {
if n == 0 {
return Ok(Vec::new());
}
let db = self.db_read_access()?;
let tip_header = db.fetch_last_header()?;
let end_height = match tip_header.height.checked_sub(offset as u64) {
Some(h) => h,
None => {
return Ok(Vec::new());
},
};
let start = end_height.saturating_sub(n as u64 - 1);
let headers = fetch_headers(&*db, start, end_height)?;
Ok(headers.into_iter().map(|h| h.hash()).rev().collect())
}
pub fn fetch_block_accumulated_data(&self, at_hash: HashOutput) -> Result<BlockAccumulatedData, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_block_accumulated_data(&at_hash)?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockAccumulatedData",
field: "at_hash",
value: at_hash.to_hex(),
})
}
pub fn fetch_block_accumulated_data_by_height(
&self,
height: u64,
) -> Result<BlockAccumulatedData, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_block_accumulated_data_by_height(height).or_not_found(
"BlockAccumulatedData",
"height",
height.to_string(),
)
}
pub fn fetch_orphan(&self, hash: HashOutput) -> Result<Block, ChainStorageError> {
let db = self.db_read_access()?;
fetch_orphan(&*db, hash)
}
pub fn orphan_count(&self) -> Result<usize, ChainStorageError> {
let db = self.db_read_access()?;
db.orphan_count()
}
pub fn fetch_target_difficulty_for_next_block(
&self,
pow_algo: PowAlgorithm,
current_block_hash: HashOutput,
) -> Result<TargetDifficultyWindow, ChainStorageError> {
let db = self.db_read_access()?;
        fetch_target_difficulty_for_next_block(&*db, &self.consensus_manager, pow_algo, &current_block_hash)
}
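    /// Walks backwards from `current_block_hash`, collecting each PoW algorithm's
    /// target-difficulty window until every window is full or the genesis block is reached.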
pub fn fetch_target_difficulties_for_next_block(
&self,
current_block_hash: HashOutput,
) -> Result<TargetDifficulties, ChainStorageError> {
let db = self.db_read_access()?;
        let mut current_header = db.fetch_chain_header_in_all_chains(&current_block_hash)?;
let mut targets = TargetDifficulties::new(&self.consensus_manager, current_header.height().saturating_add(1))
.map_err(ChainStorageError::UnexpectedResult)?;
targets
.add_front(
current_header.header(),
current_header.accumulated_data().target_difficulty,
)
.map_err(ChainStorageError::UnexpectedResult)?;
while current_header.height() > 0 && !targets.is_full() {
            current_header = db.fetch_chain_header_in_all_chains(&current_header.header().prev_hash)?;
if !targets
.is_algo_full(current_header.header().pow_algo())
.map_err(ChainStorageError::UnexpectedResult)?
{
targets
.add_front(
current_header.header(),
current_header.accumulated_data().target_difficulty,
)
.map_err(ChainStorageError::UnexpectedResult)?;
}
if targets.is_full() {
break;
}
}
Ok(targets)
}
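    /// Converts a `NewBlockTemplate` into a `Block` that builds on the current tip: sorts the
    /// body, nudges the timestamp past the median of the previous window when necessary, and
    /// fills in the Merkle roots and sizes from [`calculate_mmr_roots`].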
pub fn prepare_new_block(&self, template: NewBlockTemplate) -> Result<Block, ChainStorageError> {
let NewBlockTemplate { header, mut body, .. } = template;
if header.height == 0 {
return Err(ChainStorageError::InvalidArguments {
func: "prepare_new_block",
arg: "template",
message: "Invalid height for NewBlockTemplate: must be greater than 0".to_string(),
});
}
body.sort();
let mut header = BlockHeader::from(header);
let prev_block_height = header.height - 1;
let min_height = header.height.saturating_sub(
self.consensus_manager
.consensus_constants(header.height)
.median_timestamp_count() as u64,
);
let db = self.db_read_access()?;
let tip_header = db.fetch_tip_header()?;
if header.height != tip_header.height() + 1 {
return Err(ChainStorageError::InvalidArguments {
func: "prepare_new_block",
arg: "template",
message: format!(
"Expected new block template height to be {} but was {}",
tip_header.height() + 1,
header.height
),
});
}
if header.prev_hash != *tip_header.hash() {
return Err(ChainStorageError::InvalidArguments {
func: "prepare_new_block",
arg: "template",
message: format!(
"Expected new block template previous hash to be set to the current tip hash ({}) but was {}",
tip_header.hash(),
header.prev_hash,
),
});
}
let timestamps = fetch_headers(&*db, min_height, prev_block_height)?
.iter()
.map(|h| h.timestamp)
.collect::<Vec<_>>();
if timestamps.is_empty() {
return Err(ChainStorageError::DataInconsistencyDetected {
function: "prepare_new_block",
details: format!(
"No timestamps were returned within heights {} - {} by the database despite the tip header height \
being {}",
min_height,
prev_block_height,
tip_header.height()
),
});
}
        let median_timestamp = calc_median_timestamp(&timestamps)?;
        // Runs at most once: bump the timestamp just past the median if it is not already ahead of it.
        while median_timestamp >= header.timestamp {
header.timestamp = median_timestamp
.checked_add(EpochTime::from(1))
.ok_or(ChainStorageError::UnexpectedResult("Timestamp overflowed".to_string()))?;
}
let mut block = Block { header, body };
let roots = calculate_mmr_roots(&*db, self.rules(), &block)?;
block.header.kernel_mr = roots.kernel_mr;
block.header.kernel_mmr_size = roots.kernel_mmr_size;
block.header.input_mr = roots.input_mr;
block.header.output_mr = roots.output_mr;
block.header.block_output_mr = roots.block_output_mr;
block.header.output_smt_size = roots.output_smt_size;
block.header.validator_node_mr = roots.validator_node_mr;
block.header.validator_node_size = roots.validator_node_size;
Ok(block)
}
pub fn calculate_mmr_roots(&self, block: Block) -> Result<(Block, MmrRoots), ChainStorageError> {
let db = self.db_read_access()?;
if !block.body.is_sorted() {
return Err(ChainStorageError::InvalidBlock(
"calculate_mmr_roots expected a sorted block body, however the block body was not sorted".to_string(),
));
};
        let mmr_roots = calculate_mmr_roots(&*db, self.rules(), &block)?;
Ok((block, mmr_roots))
}
pub fn fetch_mmr_size(&self, tree: MmrTree) -> Result<u64, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_mmr_size(tree)
}
pub fn get_validator_node(
&self,
sidechain_pk: Option<CompressedPublicKey>,
public_key: CompressedPublicKey,
) -> Result<Option<ValidatorNodeRegistrationInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.get_validator_node(sidechain_pk.as_ref(), public_key)
}
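    /// Validates and adds a candidate block, returning how the chain was affected. Known bad
    /// blocks are rejected outright, and adding is refused with `AddBlockOperationLocked` while
    /// the node is syncing.
    ///
    /// Illustrative usage (a sketch only; assumes `db` is a started `BlockchainDatabase` and
    /// `block` is a mined block wrapped in an `Arc`):
    ///
    /// ```ignore
    /// match db.add_block(block)? {
    ///     BlockAddResult::Ok(new_tip) => info!("new tip at height {}", new_tip.height()),
    ///     BlockAddResult::BlockExists => debug!("block is already in the chain"),
    ///     other => info!("add_block result: {other}"),
    /// }
    /// ```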
pub fn add_block(&self, candidate_block: Arc<Block>) -> Result<BlockAddResult, ChainStorageError> {
let timer = Instant::now();
let block_hash = candidate_block.hash();
if self.is_add_block_disabled() {
warn!(
target: LOG_TARGET,
"add_block is disabled, node busy syncing. Ignoring candidate block #{} ({})",
candidate_block.header.height,
block_hash,
);
return Err(ChainStorageError::AddBlockOperationLocked);
}
let new_height = candidate_block.header.height;
trace!(
target: LOG_TARGET,
"[add_block] waiting for write access to add block block #{} '{}'",
new_height,
block_hash.to_hex(),
);
let before_lock = timer.elapsed();
let mut db = self.db_write_access()?;
let after_lock = timer.elapsed();
trace!(
target: LOG_TARGET,
"[add_block] acquired write access db lock for block #{} '{}' in {:.2?}",
new_height,
block_hash.to_hex(),
after_lock - before_lock,
);
if db.contains(&DbKey::HeaderHash(block_hash))? {
return Ok(BlockAddResult::BlockExists);
}
let (is_bad_block, reason) = db.bad_block_exists(block_hash)?;
if is_bad_block {
return Err(ChainStorageError::ValidationError {
source: ValidationError::BadBlockFound {
hash: block_hash.to_hex(),
reason,
},
});
}
let block_add_result = add_block(
&mut *db,
&self.config,
&self.consensus_manager,
&*self.validators.block,
&*self.validators.header,
self.consensus_manager.chain_strength_comparer(),
candidate_block,
)?;
if block_add_result.was_chain_modified() {
info!(
target: LOG_TARGET,
"Best chain is now at height: {}",
db.fetch_chain_metadata()?.best_block_height()
);
if self.is_background_pruning.load(atomic::Ordering::SeqCst) {
debug!(
target: LOG_TARGET,
"Background pruning is active, skipping inline prune_database_if_needed."
);
} else {
prune_database_if_needed(&mut *db, self.config.pruning_horizon, self.config.pruning_interval)?;
}
}
if let Err(e) = cleanup_orphans(&mut *db, self.config.orphan_storage_capacity) {
warn!(target: LOG_TARGET, "Failed to clean up orphans: {e}");
}
debug!(
target: LOG_TARGET,
"[add_block] released write access db lock for block #{} in {:.2?}, `add_block` result: {}",
new_height, timer.elapsed() - after_lock, block_add_result
);
Ok(block_add_result)
}
pub fn cleanup_orphans(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
cleanup_orphans(&mut *db, self.config.orphan_storage_capacity)?;
Ok(())
}
pub fn clear_all_pending_headers(&self) -> Result<usize, ChainStorageError> {
let db = self.db_write_access()?;
db.clear_all_pending_headers()
}
pub fn cleanup_all_orphans(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
cleanup_orphans(&mut *db, 0)?;
Ok(())
}
fn insert_block(&self, block: Arc<ChainBlock>) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
let mut txn = DbTransaction::new();
insert_best_block(&mut txn, block)?;
db.write(txn)
}
fn store_pruning_horizon(&self, pruning_horizon: u64) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
store_pruning_horizon(&mut *db, pruning_horizon)
}
pub fn prune_to_height(&self, height: u64) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
prune_to_height(&mut *db, height)
}
pub fn fetch_block(&self, height: u64, compact: bool) -> Result<HistoricalBlock, ChainStorageError> {
let db = self.db_read_access()?;
fetch_block(&*db, height, compact)
}
pub fn fetch_blocks<T: RangeBounds<u64>>(
&self,
bounds: T,
compact: bool,
) -> Result<Vec<HistoricalBlock>, ChainStorageError> {
let db = self.db_read_access()?;
let (mut start, mut end) = convert_to_option_bounds(bounds);
let metadata = db.fetch_chain_metadata()?;
if start.is_none() {
start = Some(metadata.pruned_height());
}
if end.is_none() {
end = Some(metadata.best_block_height());
}
let (start, end) = (start.unwrap(), end.unwrap());
if end > metadata.best_block_height() {
return Err(ChainStorageError::ValueNotFound {
entity: "Block",
field: "end height",
value: end.to_string(),
});
}
trace!(target: LOG_TARGET, "Fetching blocks {start}-{end}");
let blocks = fetch_blocks(&*db, start, end, compact)?;
trace!(target: LOG_TARGET, "Fetched {} block(s)", blocks.len());
Ok(blocks)
}
pub fn fetch_block_by_hash(
&self,
hash: BlockHash,
compact: bool,
) -> Result<Option<HistoricalBlock>, ChainStorageError> {
let db = self.db_read_access()?;
fetch_block_by_hash(&*db, hash, compact)
}
pub fn fetch_orphan_blocks(&self) -> Result<Vec<ChainHeader>, ChainStorageError> {
let db = self.db_read_access()?;
fetch_orphan_blocks(&*db)
}
pub fn fetch_block_with_kernel(
&self,
excess_sig: CompressedSignature,
) -> Result<Option<HistoricalBlock>, ChainStorageError> {
let db = self.db_read_access()?;
fetch_block_by_kernel_signature(&*db, excess_sig)
}
pub fn fetch_block_with_utxo(
&self,
commitment: CompressedCommitment,
) -> Result<Option<HistoricalBlock>, ChainStorageError> {
let db = self.db_read_access()?;
fetch_block_by_utxo_commitment(&*db, &commitment)
}
pub fn chain_block_or_orphan_block_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.fetch_block_accumulated_data(&hash)?.is_some() || db.contains(&DbKey::OrphanBlock(hash))?)
}
pub fn chain_header_or_orphan_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.contains(&DbKey::HeaderHash(hash))? || db.contains(&DbKey::OrphanBlock(hash))?)
}
pub fn bad_block_exists(&self, hash: BlockHash) -> Result<(bool, String), ChainStorageError> {
let db = self.db_read_access()?;
db.bad_block_exists(hash)
}
pub fn commit(&self, txn: DbTransaction) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
db.write(txn)
}
pub fn rewind_to_height(&self, height: u64) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let mut db = self.db_write_access()?;
rewind_to_height(&mut *db, height)
}
pub fn rewind_to_hash(&self, hash: BlockHash) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let mut db = self.db_write_access()?;
rewind_to_hash(&mut *db, hash)
}
pub fn swap_to_highest_pow_chain(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
swap_to_highest_pow_chain(
&mut *db,
&self.config,
&*self.validators.block,
self.consensus_manager.chain_strength_comparer(),
)?;
Ok(())
}
pub fn fetch_horizon_data(&self) -> Result<HorizonData, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.fetch_horizon_data()?.unwrap_or_default())
}
pub fn fetch_horizon_sync_output_checkpoint(
&self,
) -> Result<Option<HorizonSyncOutputCheckpoint>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_horizon_sync_output_checkpoint()
}
pub fn verify_horizon_sync_output_root(
&self,
version: u64,
expected_root: HashOutput,
) -> Result<(), ChainStorageError> {
let db = self.db_read_access()?;
db.verify_horizon_sync_output_root(version, expected_root)
}
pub fn get_stats(&self) -> Result<DbBasicStats, ChainStorageError> {
let lock = self.db_read_access()?;
lock.get_stats()
}
pub fn fetch_total_size_stats(&self) -> Result<DbTotalSizeStats, ChainStorageError> {
let lock = self.db_read_access()?;
lock.fetch_total_size_stats()
}
pub fn fetch_all_reorgs(&self) -> Result<Vec<Reorg>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_all_reorgs()
}
pub fn clear_all_reorgs(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
let mut txn = DbTransaction::new();
txn.clear_all_reorgs();
db.write(txn)
}
pub fn fetch_all_active_validator_nodes(
&self,
height: u64,
) -> Result<Vec<ValidatorNodeRegistrationInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_all_active_validator_nodes(height)
}
pub fn fetch_all_orphans(&self) -> Result<Vec<ChainHeader>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_all_orphans()
}
pub fn fetch_active_validator_nodes(
&self,
height: u64,
sidechain_pk: Option<CompressedPublicKey>,
) -> Result<Vec<ValidatorNodeRegistrationInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_active_validator_nodes(sidechain_pk.as_ref(), height)
}
pub fn fetch_validators_activating_in_epoch(
&self,
sidechain_pk: Option<CompressedPublicKey>,
epoch: VnEpoch,
) -> Result<Vec<ValidatorNodeRegistrationInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_validators_activating_in_epoch(sidechain_pk.as_ref(), epoch)
}
pub fn fetch_validators_exiting_in_epoch(
&self,
sidechain_pk: Option<CompressedPublicKey>,
epoch: VnEpoch,
) -> Result<Vec<ValidatorNodeRegistrationInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_validators_exiting_in_epoch(sidechain_pk.as_ref(), epoch)
}
pub fn fetch_template_registrations<T: RangeBounds<u64>>(
&self,
range: T,
) -> Result<Vec<TemplateRegistrationEntry>, ChainStorageError> {
let db = self.db_read_access()?;
let (start, mut end) = convert_to_option_bounds(range);
if end.is_none() {
end = Some(db.fetch_last_header()?.height);
}
let (start, end) = (start.unwrap_or(0), end.unwrap());
db.fetch_template_registrations(start, end)
}
pub fn generate_kernel_merkle_proof(
&self,
excess_sig: CompressedSignature,
) -> Result<KernelMerkleProof, ChainStorageError> {
const OPERATION: &str = "generate_kernel_merkle_proof";
let db = self.db_read_access()?;
let (kernel, block_hash) =
db.fetch_kernel_by_excess_sig(&excess_sig)?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "TransactionKernel",
field: "excess_sig",
value: excess_sig.get_signature().to_hex(),
})?;
let block = fetch_block_by_hash(&*db, block_hash, true)?.ok_or_else(|| {
ChainStorageError::DataInconsistencyDetected {
function: OPERATION,
details: format!(
"Kernel with excess sig {} found in database, but block not found in block database",
excess_sig.get_signature().reveal()
),
}
})?;
let BlockAccumulatedData { kernels, .. } = db
.fetch_block_accumulated_data(&block.header().prev_hash)?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockAccumulatedData",
field: "block_hash",
value: block_hash.to_hex(),
})?;
info!(
target: LOG_TARGET,
"Generating kernel merkle proof for kernel in block #{} ({}) MMR size: {}",
block.header().height,
block_hash,
block.header().kernel_mmr_size,
);
let mut kernel_mmr = PrunedKernelMmr::new(kernels);
for kernel in block.block().body.kernels() {
let hash = kernel.hash();
kernel_mmr.push(hash.to_vec())?;
}
let kernel_hash = kernel.hash();
let leaf_index = kernel_mmr.find_leaf_index(kernel_hash.as_slice())?.ok_or_else(|| {
ChainStorageError::DataInconsistencyDetected {
function: OPERATION,
details: format!(
"Kernel with hash {} found in database, but not found in MMR",
kernel_hash
),
}
})?;
let merkle_proof = MerkleProof::for_leaf_node(&kernel_mmr, leaf_index)?;
Ok(KernelMerkleProof {
merkle_proof,
leaf_index,
kernel_hash,
block_hash,
})
}
}
fn unexpected_result<T>(request: DbKey, response: DbValue) -> Result<T, ChainStorageError> {
let msg = format!("Unexpected result for database query {request}. Response: {response}");
error!(target: LOG_TARGET, "{msg}");
Err(ChainStorageError::UnexpectedResult(msg))
}
#[derive(Debug, Clone)]
pub struct MmrRoots {
pub kernel_mr: FixedHash,
pub kernel_mmr_size: u64,
pub input_mr: FixedHash,
pub output_mr: FixedHash,
pub block_output_mr: FixedHash,
pub output_smt_size: u64,
pub validator_node_mr: FixedHash,
pub validator_node_size: u64,
}
impl std::fmt::Display for MmrRoots {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "MMR Roots")?;
writeln!(f, "Input MR : {}", self.input_mr)?;
writeln!(f, "Kernel MR : {}", self.kernel_mr)?;
writeln!(f, "Kernel MMR Size : {}", self.kernel_mmr_size)?;
writeln!(f, "Output MR : {}", self.output_mr)?;
writeln!(f, "Block Output MR : {}", self.block_output_mr)?;
writeln!(f, "Output SMT Size : {}", self.output_smt_size)?;
writeln!(f, "Validator MR : {}", self.validator_node_mr)?;
Ok(())
}
}
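/// Calculates the Merkle roots (kernel, input, output, block-output, output SMT and validator
/// node trees) that the given candidate block would produce. The block must build directly on
/// the current tip, otherwise `ChainStorageError::CannotCalculateNonTipMmr` is returned.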
#[allow(clippy::too_many_lines)]
#[allow(clippy::similar_names)]
pub fn calculate_mmr_roots<T: BlockchainBackend>(
db: &T,
rules: &BaseNodeConsensusManager,
block: &Block,
) -> Result<MmrRoots, ChainStorageError> {
let header = &block.header;
let body = &block.body;
let smt_reader = db.create_smt_reader()?;
let metadata = db.fetch_chain_metadata()?;
if header.prev_hash != *metadata.best_block_hash() {
return Err(ChainStorageError::CannotCalculateNonTipMmr(format!(
"Block (#{}) is not building on tip, previous hash is {} but the current tip is #{} {}",
header.height,
header.prev_hash,
metadata.best_block_height(),
metadata.best_block_hash(),
)));
}
let BlockAccumulatedData { kernels, .. } =
db.fetch_block_accumulated_data(&header.prev_hash)?
.ok_or_else(|| ChainStorageError::ValueNotFound {
entity: "BlockAccumulatedData",
field: "header_hash",
value: header.prev_hash.to_hex(),
})?;
let mut kernel_mmr = PrunedKernelMmr::new(kernels);
let mut input_mmr = PrunedInputMmr::new(PrunedHashSet::default());
let mut block_output_mmr = PrunedOutputMmr::new(PrunedHashSet::default());
let mut normal_output_mmr = PrunedOutputMmr::new(PrunedHashSet::default());
let output_smt = JellyfishMerkleTree::<_, SmtHasher>::new(&smt_reader);
for kernel in body.kernels() {
kernel_mmr.push(kernel.hash().to_vec())?;
}
let mut batch = Vec::with_capacity(body.outputs().len() + body.inputs().len());
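// Coinbase outputs go directly into the block-output MMR; all other outputs are rolled up via
// the sub-tree root pushed after this loop. Unburned outputs are inserted into the output SMT,
// and spent inputs are queued for deletion from it below.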
for output in body.outputs() {
if output.features.is_coinbase() {
block_output_mmr.push(output.hash().to_vec())?;
} else {
normal_output_mmr.push(output.hash().to_vec())?;
}
if !output.is_burned() {
let smt_key = KeyHash(output.commitment.as_bytes().try_into().expect("commitment is 32 bytes"));
let smt_value = output.smt_hash(header.height);
batch.push((smt_key, Some(smt_value.to_vec())));
}
}
block_output_mmr.push(normal_output_mmr.get_merkle_root()?.to_vec())?;
for input in body.inputs() {
input_mmr.push(input.canonical_hash().to_vec())?;
let smt_key = KeyHash(
input
.commitment()?
.as_bytes()
.try_into()
.expect("Commitment is 32 bytes"),
);
batch.push((smt_key, None));
}
let block_height = block.header.height;
let epoch_len = rules.consensus_constants(block_height).epoch_length();
let tip_header = fetch_header(db, block_height.saturating_sub(1))?;
let (validator_node_mr, validator_node_size) = if block_height.is_multiple_of(epoch_len) {
let validator_nodes = db.fetch_all_active_validator_nodes(block_height)?;
(calculate_validator_node_mr(&validator_nodes)?, validator_nodes.len())
} else {
(tip_header.validator_node_mr, 0)
};
let block_output_mr = block_output_mr_hash_from_pruned_mmr(&block_output_mmr)?;
let (output_smt_root, changes) = output_smt
.put_value_set(batch, header.height)
.map_err(ChainStorageError::JellyfishMerkleTreeError)?;
let mut size = tip_header.output_smt_size;
size += changes.node_stats.first().map(|s| s.new_leaves).unwrap_or(0) as u64;
size = size.saturating_sub(changes.node_stats.first().map(|s| s.stale_leaves).unwrap_or(0) as u64);
let mmr_roots = MmrRoots {
kernel_mr: kernel_mr_hash_from_pruned_mmr(&kernel_mmr)?,
kernel_mmr_size: kernel_mmr.get_leaf_count()? as u64,
input_mr: input_mr_hash_from_pruned_mmr(&input_mmr)?,
output_mr: FixedHash::from(output_smt_root.0),
block_output_mr,
output_smt_size: size,
validator_node_mr,
validator_node_size: validator_node_size as u64,
};
Ok(mmr_roots)
}
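/// Calculates the validator node Merkle root by building one Jellyfish Merkle tree per
/// sidechain validator set and a root tree over the per-sidechain roots. The input is expected
/// to be grouped by sidechain id, since only adjacent entries with the same id form one set.
/// Returns the empty placeholder hash when no validator nodes are registered.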
pub fn calculate_validator_node_mr(
validator_nodes: &[ValidatorNodeRegistrationInfo],
) -> Result<FixedHash, ChainStorageError> {
if validator_nodes.is_empty() {
return Ok(VALIDATOR_MR_EMPTY_PLACEHOLDER_HASH);
}
struct EmptyJmtStore;
impl TreeReader for EmptyJmtStore {
fn get_node_option(&self, _node_key: &NodeKey) -> anyhow::Result<Option<Node>> {
Ok(None)
}
fn get_value_option(&self, _max_version: Version, _key_hash: KeyHash) -> anyhow::Result<Option<OwnedValue>> {
Ok(None)
}
fn get_rightmost_leaf(&self) -> anyhow::Result<Option<(NodeKey, LeafNode)>> {
Ok(None)
}
}
fn hash_node((pk, s): &(&CompressedPublicKey, &[u8; 32])) -> KeyHash {
KeyHash(
DomainSeparatedConsensusHasher::<TransactionHashDomain, Blake2b<U32>>::new("validator_node")
.chain(pk)
.chain(s)
.finalize()
.into(),
)
}
fn hash_sid(sid: Option<&CompressedPublicKey>) -> KeyHash {
KeyHash(
DomainSeparatedConsensusHasher::<TransactionHashDomain, Blake2b<U32>>::new("validator_node_sid")
.chain(&sid)
.finalize()
.into(),
)
}
let mut validator_sets = Vec::<(Option<&CompressedPublicKey>, Vec<(&CompressedPublicKey, &[u8; 32])>)>::new();
for ValidatorNodeRegistrationInfo {
public_key: pk,
sidechain_id,
shard_key,
..
} in validator_nodes
{
match validator_sets.last_mut() {
Some((sid, set)) if *sid == sidechain_id.as_ref() => {
set.push((pk, shard_key));
},
Some(_) | None => {
validator_sets.push((sidechain_id.as_ref(), vec![(pk, shard_key)]));
},
}
}
let mut roots = Vec::with_capacity(validator_sets.len());
for (sidechain_pk, set) in validator_sets {
let sidechain_mt = JellyfishMerkleTree::<_, ValidatorNodeJmtHasher>::new(&EmptyJmtStore);
let (root, _) = sidechain_mt
.put_value_set(
set.iter()
.map(|pk_and_shard_key| (hash_node(pk_and_shard_key), Some(vec![]))),
1,
)
.map_err(ChainStorageError::JellyfishMerkleTreeError)?;
roots.push((hash_sid(sidechain_pk), root));
}
let root_mt = JellyfishMerkleTree::<_, ValidatorNodeJmtHasher>::new(&EmptyJmtStore);
let (root_hash, _) = root_mt
.put_value_set(roots.into_iter().map(|(sid, root)| (sid, Some(root.0.to_vec()))), 1)
.map_err(ChainStorageError::JellyfishMerkleTreeError)?;
Ok(FixedHash::from(root_hash.0))
}
pub fn fetch_header<T: BlockchainBackend>(db: &T, block_num: u64) -> Result<BlockHeader, ChainStorageError> {
fetch!(db, block_num, HeaderHeight)
}
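/// Fetches headers in the inclusive range `start..=end_inclusive`. If `start > end_inclusive`
/// the bounds are swapped and the headers are returned in descending height order. Iteration
/// stops early, without error, at the first missing header.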
pub fn fetch_headers<T: BlockchainBackend>(
db: &T,
mut start: u64,
mut end_inclusive: u64,
) -> Result<Vec<BlockHeader>, ChainStorageError> {
let is_reversed = start > end_inclusive;
if is_reversed {
mem::swap(&mut end_inclusive, &mut start);
}
#[allow(clippy::cast_possible_truncation)]
let mut headers = Vec::with_capacity((end_inclusive.saturating_sub(start) as usize).saturating_add(1));
for h in start..=end_inclusive {
match db.fetch(&DbKey::HeaderHeight(h))? {
Some(DbValue::HeaderHeight(header)) => {
headers.push(*header);
},
Some(_) => unreachable!(),
None => break,
}
}
if is_reversed {
Ok(headers.into_iter().rev().collect())
} else {
Ok(headers)
}
}
pub fn fetch_chain_headers<T: BlockchainBackend>(
db: &T,
start: u64,
end_inclusive: u64,
) -> Result<Vec<ChainHeader>, ChainStorageError> {
if start > end_inclusive {
return Err(ChainStorageError::InvalidQuery(
"end_inclusive must be greater than start".to_string(),
));
}
#[allow(clippy::cast_possible_truncation)]
let mut headers = Vec::with_capacity(((end_inclusive - start) as usize).saturating_add(1));
for h in start..=end_inclusive {
match db.fetch_chain_header_by_height(h) {
Ok(header) => {
headers.push(header);
},
Err(ChainStorageError::ValueNotFound { .. }) => break,
Err(e) => return Err(e),
}
}
Ok(headers)
}
fn insert_headers<T: BlockchainBackend>(db: &mut T, headers: Vec<ChainHeader>) -> Result<(), ChainStorageError> {
let mut txn = DbTransaction::new();
headers.into_iter().for_each(|chain_header| {
txn.insert_chain_header(chain_header);
});
db.write(txn)
}
fn fetch_header_by_block_hash<T: BlockchainBackend>(
db: &T,
hash: BlockHash,
) -> Result<Option<BlockHeader>, ChainStorageError> {
try_fetch!(db, hash, HeaderHash)
}
fn fetch_orphan<T: BlockchainBackend>(db: &T, hash: BlockHash) -> Result<Block, ChainStorageError> {
fetch!(db, hash, OrphanBlock)
}
fn add_block<T: BlockchainBackend>(
db: &mut T,
config: &BlockchainDatabaseConfig,
consensus_manager: &BaseNodeConsensusManager,
block_validator: &dyn CandidateBlockValidator<T>,
header_validator: &dyn HeaderChainLinkedValidator<T>,
chain_strength_comparer: &dyn ChainStrengthComparer,
candidate_block: Arc<Block>,
) -> Result<BlockAddResult, ChainStorageError> {
handle_possible_reorg(
db,
config,
consensus_manager,
block_validator,
header_validator,
chain_strength_comparer,
candidate_block,
)
}
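/// Appends a block to the active chain within the given transaction: stores its chain header
/// and tip block body, and advances the best-block pointer to this block.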
fn insert_best_block(txn: &mut DbTransaction, block: Arc<ChainBlock>) -> Result<(), ChainStorageError> {
let block_hash = block.accumulated_data().hash;
debug!(
target: LOG_TARGET,
"Storing new block #{} `{}`",
block.header().height,
block_hash,
);
let height = block.height();
let timestamp = block.header().timestamp().as_u64();
let accumulated_difficulty = block.accumulated_data().total_accumulated_difficulty;
let expected_prev_best_block = block.block().header.prev_hash;
txn.insert_chain_header(block.to_chain_header())
.insert_tip_block_body(block)
.set_best_block(
height,
block_hash,
accumulated_difficulty,
expected_prev_best_block,
timestamp,
);
Ok(())
}
fn store_pruning_horizon<T: BlockchainBackend>(db: &mut T, pruning_horizon: u64) -> Result<(), ChainStorageError> {
let mut txn = DbTransaction::new();
txn.set_pruning_horizon(pruning_horizon);
db.write(txn)
}
#[allow(clippy::ptr_arg)]
pub fn fetch_target_difficulty_for_next_block<T: BlockchainBackend>(
db: &T,
consensus_manager: &BaseNodeConsensusManager,
pow_algo: PowAlgorithm,
current_block_hash: &HashOutput,
) -> Result<TargetDifficultyWindow, ChainStorageError> {
let mut header = db.fetch_chain_header_in_all_chains(current_block_hash)?;
let mut target_difficulties = consensus_manager
.new_target_difficulty(pow_algo, header.height() + 1)
.map_err(ChainStorageError::UnexpectedResult)?;
if header.header().pow.pow_algo == pow_algo {
target_difficulties.add_front(header.header().timestamp(), header.accumulated_data().target_difficulty);
}
while header.height() > 0 && !target_difficulties.is_full() {
header = db.fetch_chain_header_in_all_chains(&header.header().prev_hash)?;
if header.header().pow.pow_algo == pow_algo {
target_difficulties.add_front(header.header().timestamp(), header.accumulated_data().target_difficulty);
}
}
Ok(target_difficulties)
}
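/// Reconstructs the historical block at `height` from its stored kernels, outputs and inputs.
/// When `compact` is false, each input is rehydrated with the data of the output it spends.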
fn fetch_block<T: BlockchainBackend>(db: &T, height: u64, compact: bool) -> Result<HistoricalBlock, ChainStorageError> {
let mark = Instant::now();
let (tip_height, _is_pruned) = check_for_valid_height(db, height)?;
let chain_header = db.fetch_chain_header_by_height(height)?;
let (header, accumulated_data) = chain_header.into_parts();
let kernels = db.fetch_kernels_in_block(&accumulated_data.hash)?;
let outputs = db.fetch_outputs_in_block(&accumulated_data.hash)?;
let inputs = db
.fetch_inputs_in_block(&accumulated_data.hash)?
.into_iter()
.map(|mut compact_input| {
if compact {
return Ok(compact_input);
}
let utxo_mined_info = match db.fetch_output(&compact_input.output_hash()) {
Ok(Some(o)) => o,
Ok(None) => {
return Err(ChainStorageError::InvalidBlock(
"An input in the block has no matching output to spend in the database".to_string(),
));
},
Err(e) => return Err(e),
};
compact_input.add_output_data(utxo_mined_info.output);
Ok(compact_input)
})
.collect::<Result<Vec<TransactionInput>, _>>()?;
let block = header
.into_builder()
.add_inputs(inputs)
.add_outputs(outputs)
.add_kernels(kernels)
.build();
trace!(
target: LOG_TARGET,
"Fetched block at height:{} in {:.0?}",
height,
mark.elapsed()
);
Ok(HistoricalBlock::new(block, tip_height - height + 1, accumulated_data))
}
fn fetch_blocks<T: BlockchainBackend>(
db: &T,
start: u64,
end_inclusive: u64,
compact: bool,
) -> Result<Vec<HistoricalBlock>, ChainStorageError> {
(start..=end_inclusive).map(|i| fetch_block(db, i, compact)).collect()
}
fn fetch_block_by_kernel_signature<T: BlockchainBackend>(
db: &T,
excess_sig: CompressedSignature,
) -> Result<Option<HistoricalBlock>, ChainStorageError> {
match db.fetch_kernel_by_excess_sig(&excess_sig) {
Ok(kernel) => match kernel {
Some((_kernel, hash)) => fetch_block_by_hash(db, hash, false),
None => Ok(None),
},
Err(_) => Err(ChainStorageError::ValueNotFound {
entity: "Kernel",
field: "Excess sig",
value: excess_sig.get_signature().to_hex(),
}),
}
}
fn fetch_block_by_utxo_commitment<T: BlockchainBackend>(
db: &T,
commitment: &CompressedCommitment,
) -> Result<Option<HistoricalBlock>, ChainStorageError> {
let output = db.fetch_unspent_output_hash_by_commitment(commitment)?;
match output {
Some(hash) => match db.fetch_output(&hash)? {
Some(mined_info) => fetch_block_by_hash(db, mined_info.header_hash, false),
None => Ok(None),
},
None => Ok(None),
}
}
fn fetch_block_by_hash<T: BlockchainBackend>(
db: &T,
hash: BlockHash,
compact: bool,
) -> Result<Option<HistoricalBlock>, ChainStorageError> {
if let Some(header) = fetch_header_by_block_hash(db, hash)? {
return Ok(Some(fetch_block(db, header.height, compact)?));
}
Ok(None)
}
fn fetch_orphan_blocks<T: BlockchainBackend>(db: &T) -> Result<Vec<ChainHeader>, ChainStorageError> {
db.fetch_all_orphans()
}
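/// Ensures `height` does not exceed the chain tip and returns the tip height together with
/// whether the requested height lies below the pruned height.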
fn check_for_valid_height<T: BlockchainBackend>(db: &T, height: u64) -> Result<(u64, bool), ChainStorageError> {
let metadata = db.fetch_chain_metadata()?;
let tip_height = metadata.best_block_height();
if height > tip_height {
return Err(ChainStorageError::InvalidQuery(format!(
"Cannot get block at height {height}. Chain tip is at {tip_height}"
)));
}
let pruned_height = metadata.pruned_height();
Ok((tip_height, height < pruned_height))
}
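/// Rewinds the chain to `target_height`: headers above the block tip are deleted first, then
/// blocks are removed down to the target. On a pruned node a rewind deeper than the effective
/// pruning horizon is capped and the chain is reset to genesis metadata. Removed blocks are
/// re-inserted as chained orphans (unless pruned past the horizon) and returned in descending
/// height order.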
#[allow(clippy::too_many_lines)]
pub(crate) fn rewind_to_height<T: BlockchainBackend>(
db: &mut T,
target_height: u64,
) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let last_header = db.fetch_last_header()?;
let last_header_height = last_header.height;
let metadata = db.fetch_chain_metadata()?;
let last_block_height = metadata.best_block_height();
let steps_back = last_header_height
.checked_sub(cmp::max(last_block_height, target_height))
.ok_or_else(|| {
ChainStorageError::InvalidQuery(format!(
"Cannot rewind to height ({}) that is greater than the tip header height {}.",
cmp::max(target_height, last_block_height),
last_header_height
))
})?;
if steps_back > 0 {
info!(
target: LOG_TARGET,
"Rewinding headers from height {} to {}",
last_header_height,
last_header_height - steps_back
);
}
let mut txn = DbTransaction::new();
for h in 0..steps_back {
let height = last_header_height - h;
info!(
target: LOG_TARGET,
"Rewinding headers at height {}",
height,
);
if db.fetch_block_accumulated_data_by_height(height)?.is_some() {
let header = fetch_header(db, height)?;
let header_hash = header.hash();
txn.delete_block_accumulated_data(height);
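// NB: `kernerls` (sic) matches the existing `DbTransaction` method name.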
txn.delete_all_kernerls_in_block(header_hash);
txn.delete_all_inputs_in_block(header_hash);
}
txn.delete_header(height);
}
db.write(txn)?;
let mut steps_back = last_block_height.saturating_sub(target_height);
if steps_back == 0 {
return Ok(vec![]);
}
let mut removed_blocks = Vec::with_capacity(usize::try_from(steps_back).unwrap_or(usize::MAX));
info!(
target: LOG_TARGET,
"Rewinding blocks from height {last_block_height} to {target_height}"
);
let effective_pruning_horizon = metadata.best_block_height().saturating_sub(metadata.pruned_height());
let prune_past_horizon = metadata.is_pruned_node() && steps_back > effective_pruning_horizon;
if prune_past_horizon {
warn!(
target: LOG_TARGET,
"WARNING, reorg past pruning horizon (more than {effective_pruning_horizon} blocks back), rewinding back to 0"
);
steps_back = effective_pruning_horizon;
}
db.set_stats_total_height(steps_back);
for h in 0..steps_back {
if h % 50 == 0 {
db.update_stats_progress(h);
}
let mut txn = DbTransaction::new();
info!(target: LOG_TARGET, "Deleting block {}", last_block_height - h,);
let block = fetch_block(db, last_block_height - h, false)?;
let block = Arc::new(block.try_into_chain_block()?);
let block_hash = *block.hash();
txn.delete_tip_block(block_hash);
txn.delete_header(last_block_height - h);
if !prune_past_horizon && !db.contains(&DbKey::OrphanBlock(*block.hash()))? {
txn.insert_chained_orphan(block.clone());
}
removed_blocks.push(block);
let chain_header = db.fetch_chain_header_by_height(if prune_past_horizon && h + 1 == steps_back {
0
} else {
last_block_height - h - 1
})?;
let metadata = db.fetch_chain_metadata()?;
let expected_block_hash = *metadata.best_block_hash();
txn.set_best_block(
chain_header.height(),
chain_header.accumulated_data().hash,
chain_header.accumulated_data().total_accumulated_difficulty,
expected_block_hash,
chain_header.timestamp(),
);
if prune_past_horizon && h + 1 == steps_back {
txn.set_pruned_height(0);
}
if h == 0 {
debug!(target: LOG_TARGET, "Inserting new orphan chain tip: {block_hash}");
txn.insert_orphan_chain_tip(block_hash, chain_header.accumulated_data().total_accumulated_difficulty);
}
debug!(
target: LOG_TARGET,
"Updating best block to height (#{}), total accumulated difficulty: {}",
chain_header.height(),
chain_header.accumulated_data().total_accumulated_difficulty
);
db.write(txn)?;
}
if prune_past_horizon {
for h in 0..(last_block_height - steps_back) {
let height = last_block_height - h - steps_back;
debug!(
target: LOG_TARGET,
"Deleting pruned block data at height {}",
height,
);
let header = fetch_header(db, height)?;
let header_hash = header.hash();
let mut txn = DbTransaction::new();
txn.delete_block_accumulated_data(height);
txn.delete_all_kernerls_in_block(header_hash);
txn.delete_all_inputs_in_block(header_hash);
if height > target_height {
txn.delete_header(height);
}
db.write(txn)?;
}
}
Ok(removed_blocks)
}
fn rewind_to_hash<T: BlockchainBackend>(
db: &mut T,
block_hash: BlockHash,
) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let block_hash_hex = block_hash.to_hex();
let target_header = fetch_header_by_block_hash(&*db, block_hash)?.ok_or(ChainStorageError::ValueNotFound {
entity: "BlockHeader",
field: "block_hash",
value: block_hash_hex,
})?;
rewind_to_height(db, target_header.height)
}
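/// Inserts the candidate block as an orphan, links any orphan descendants into the candidate
/// tip set, and then swaps to the strongest proof-of-work chain if one of those tips is
/// stronger than the current tip.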
fn handle_possible_reorg<T: BlockchainBackend>(
db: &mut T,
config: &BlockchainDatabaseConfig,
consensus_manager: &BaseNodeConsensusManager,
block_validator: &dyn CandidateBlockValidator<T>,
header_validator: &dyn HeaderChainLinkedValidator<T>,
chain_strength_comparer: &dyn ChainStrengthComparer,
candidate_block: Arc<Block>,
) -> Result<BlockAddResult, ChainStorageError> {
let timer = Instant::now();
let height = candidate_block.header.height;
let hash = candidate_block.header.hash();
insert_orphan_and_find_new_tips(db, candidate_block, header_validator, consensus_manager)?;
let after_orphans = timer.elapsed();
let res = swap_to_highest_pow_chain(db, config, block_validator, chain_strength_comparer);
trace!(
target: LOG_TARGET,
"[handle_possible_reorg] block #{}, insert_orphans in {:.2?}, swap_to_highest in {:.2?} '{}'",
height,
after_orphans,
timer.elapsed() - after_orphans,
hash.to_hex(),
);
res
}
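/// Reorganizes the main chain to the fork at `fork_hash`: rewinds to the fork, then validates
/// and adds each block of the fork chain in order. On any failure the previous chain is
/// restored and the failing block's orphaned descendants are discarded. Returns the blocks
/// removed from the previous chain.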
fn reorganize_chain<T: BlockchainBackend>(
backend: &mut T,
block_validator: &dyn CandidateBlockValidator<T>,
fork_hash: HashOutput,
new_chain_from_fork: &VecDeque<Arc<ChainBlock>>,
) -> Result<Vec<Arc<ChainBlock>>, ChainStorageError> {
let removed_blocks = rewind_to_hash(backend, fork_hash)?;
debug!(
target: LOG_TARGET,
"Validate and add {} chain block(s) from block {}. Rewound blocks: [{}]",
new_chain_from_fork.len(),
fork_hash,
removed_blocks
.iter()
.map(|b| b.height().to_string())
.collect::<Vec<_>>()
.join(", ")
);
for (i, block) in new_chain_from_fork.iter().enumerate() {
let mut txn = DbTransaction::new();
let block_hash = *block.hash();
txn.delete_orphan(block_hash);
let chain_metadata = backend.fetch_chain_metadata()?;
if let Err(e) = block_validator.validate_body_with_metadata(backend, block, &chain_metadata) {
warn!(
target: LOG_TARGET,
"Orphan block {} ({}) failed validation during chain reorg: {:?}",
block.header().height,
block_hash,
e
);
if e.get_ban_reason().is_some() && e.get_ban_reason().unwrap().ban_duration != BanPeriod::Short {
txn.insert_bad_block(block.header().hash(), block.header().height, e.to_string());
}
for block in new_chain_from_fork.iter().skip(i + 1) {
txn.delete_orphan(*block.hash());
}
backend.write(txn)?;
info!(target: LOG_TARGET, "Restoring previous chain after failed reorg.");
restore_reorged_chain(backend, fork_hash, removed_blocks)?;
return Err(e.into());
}
insert_best_block(&mut txn, block.clone())?;
if let Err(e) = backend.write(txn) {
warn!(
target: LOG_TARGET,
"Failed to commit reorg chain: {e:?}. Restoring last chain."
);
restore_reorged_chain(backend, fork_hash, removed_blocks)?;
return Err(e);
}
}
Ok(removed_blocks)
}
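/// Compares the strongest orphan chain tip with the current tip and reorganizes onto the
/// orphan chain if it is stronger; otherwise the candidate remains an orphan and any
/// non-canonical headers are trimmed.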
fn swap_to_highest_pow_chain<T: BlockchainBackend>(
db: &mut T,
config: &BlockchainDatabaseConfig,
block_validator: &dyn CandidateBlockValidator<T>,
chain_strength_comparer: &dyn ChainStrengthComparer,
) -> Result<BlockAddResult, ChainStorageError> {
let strongest_orphan_tips = db.fetch_strongest_orphan_chain_tips()?;
if strongest_orphan_tips.is_empty() {
remove_non_canonical_headers(db)?;
return Ok(BlockAddResult::OrphanBlock);
}
let best_fork_header =
find_strongest_orphan_tip(strongest_orphan_tips, chain_strength_comparer).ok_or_else(|| {
warn!(
target: LOG_TARGET,
"Unable to find strongest orphan tip`. This should never happen.",
);
ChainStorageError::InvalidOperation("No chain tips found in orphan pool".to_string())
})?;
let tip_header = db.fetch_tip_header()?;
match chain_strength_comparer.compare(&best_fork_header, &tip_header) {
Ordering::Greater => {
debug!(
target: LOG_TARGET,
"Fork chain (accum_diff:{}, hash:{}) is stronger than the current tip (#{} ({})).",
best_fork_header.accumulated_data().total_accumulated_difficulty,
best_fork_header.accumulated_data().hash,
tip_header.height(),
tip_header.hash(),
);
},
Ordering::Less | Ordering::Equal => {
debug!(
target: LOG_TARGET,
"Fork chain (accum_diff:{}, hash:{}) with block {} ({}) has a weaker difficulty.",
best_fork_header.accumulated_data().total_accumulated_difficulty,
best_fork_header.accumulated_data().hash,
tip_header.header().height,
tip_header.hash(),
);
remove_non_canonical_headers(db)?;
return Ok(BlockAddResult::OrphanBlock);
},
}
let reorg_chain = get_orphan_link_main_chain(db, best_fork_header.hash())?;
let fork_hash = reorg_chain
.front()
.expect("The new orphan block should be in the queue")
.header()
.prev_hash;
let num_added_blocks = reorg_chain.len();
let removed_blocks = reorganize_chain(db, block_validator, fork_hash, &reorg_chain)?;
let num_removed_blocks = removed_blocks.len();
if num_removed_blocks > 0 || num_added_blocks > 1 {
if config.track_reorgs {
let mut txn = DbTransaction::new();
txn.insert_reorg(Reorg::from_reorged_blocks(&reorg_chain, &removed_blocks));
if let Err(e) = db.write(txn) {
error!(target: LOG_TARGET, "Failed to track reorg: {e}");
}
}
log!(
target: LOG_TARGET,
if num_removed_blocks > 1 {
Level::Warn
} else {
Level::Info
}, "Chain reorg required from {} to {} (accum_diff:{}, hash:{}) to (accum_diff:{}, hash:{}). Number of \
blocks to remove: {}, to add: {}.",
tip_header.header().height,
best_fork_header.header().height,
tip_header.accumulated_data().total_accumulated_difficulty,
tip_header.accumulated_data().hash,
best_fork_header.accumulated_data().total_accumulated_difficulty,
best_fork_header.accumulated_data().hash,
num_removed_blocks,
num_added_blocks,
);
Ok(BlockAddResult::ChainReorg {
removed: removed_blocks,
added: reorg_chain.into(),
})
} else {
trace!(
target: LOG_TARGET,
"No reorg required. Number of blocks to remove: {num_removed_blocks}, to add: {num_added_blocks}."
);
Ok(BlockAddResult::Ok(reorg_chain.front().unwrap().clone()))
}
}
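/// Walks the headers above the best block and trims any that no longer form a contiguous
/// chain from the best block hash, returning the number of headers removed.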
fn remove_non_canonical_headers<T: BlockchainBackend>(db: &mut T) -> Result<usize, ChainStorageError> {
let metadata = db.fetch_chain_metadata()?;
let best_block_height = metadata.best_block_height();
let mut expected_prev_hash = *metadata.best_block_hash();
let mut height = best_block_height.saturating_add(1);
loop {
let next_chain_header = match db.fetch_chain_header_by_height(height) {
Ok(hdr) => hdr,
Err(ChainStorageError::ValueNotFound { .. }) => break,
Err(e) => return Err(e),
};
if next_chain_header.header().prev_hash != expected_prev_hash {
let last_chain_header_height = db.fetch_last_chain_header()?.height();
rewind_to_height(db, max(height.saturating_sub(1), best_block_height))?;
let removed =
usize::try_from(last_chain_header_height.saturating_sub(height).saturating_add(1)).unwrap_or(0);
debug!(target: LOG_TARGET, "Trimmed {removed} non-canonical header(s) starting at height {height}");
return Ok(removed);
}
expected_prev_hash = *next_chain_header.hash();
height = height.saturating_add(1);
}
Ok(0)
}
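/// Restores the previously removed chain after a failed reorg: rewinds to `to_hash` and
/// re-applies the removed blocks as the best chain.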
fn restore_reorged_chain<T: BlockchainBackend>(
db: &mut T,
to_hash: HashOutput,
previous_chain: Vec<Arc<ChainBlock>>,
) -> Result<(), ChainStorageError> {
let invalid_chain = rewind_to_hash(db, to_hash)?;
debug!(
target: LOG_TARGET,
"Removed {} blocks during chain restore: {:?}.",
invalid_chain.len(),
invalid_chain
.iter()
.map(|block| block.accumulated_data().hash)
.collect::<Vec<_>>(),
);
let mut txn = DbTransaction::new();
for block in previous_chain.into_iter().rev() {
txn.delete_orphan(block.accumulated_data().hash);
insert_best_block(&mut txn, block)?;
}
db.write(txn)?;
Ok(())
}
fn get_vm_key_for_candidate_block<T: BlockchainBackend>(
db: &mut T,
candidate_block: Arc<Block>,
) -> Result<FixedHash, ChainStorageError> {
get_vm_key_for_candidate_header(db, candidate_block.header.clone())
}
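/// Resolves the RandomX VM key block hash for a header by walking back along `prev_hash` to
/// the VM key height, short-circuiting to the main chain hash at that height as soon as an
/// ancestor is found on the main chain.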
fn get_vm_key_for_candidate_header<T: BlockchainBackend>(
db: &mut T,
header: BlockHeader,
) -> Result<FixedHash, ChainStorageError> {
let vm_height = tari_rx_vm_key_height(header.height);
let mut current_header = header.clone();
while current_header.height != vm_height {
let h = db.fetch_chain_header_in_all_chains(&current_header.prev_hash)?;
let chain_header = db.fetch_chain_header_by_height(h.height())?;
if *h.header() == *chain_header.header() {
return Ok(*db.fetch_chain_header_by_height(vm_height)?.hash());
}
current_header = h.header().clone();
}
Ok(FixedHash::from(*current_header.hash()))
}
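/// Inserts the candidate block into the orphan pool. If it connects to a known header, the
/// header is validated, its accumulated data is stored, and the orphan chain tip set is
/// extended with all newly reachable descendant tips.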
#[allow(clippy::too_many_lines)]
fn insert_orphan_and_find_new_tips<T: BlockchainBackend>(
db: &mut T,
candidate_block: Arc<Block>,
validator: &dyn HeaderChainLinkedValidator<T>,
rules: &BaseNodeConsensusManager,
) -> Result<(), ChainStorageError> {
let hash = candidate_block.hash();
if db.contains(&DbKey::OrphanBlock(hash))? {
return Ok(());
}
let mut txn = DbTransaction::new();
let parent = match db.fetch_orphan_chain_tip_by_hash(&candidate_block.header.prev_hash)? {
Some(curr_parent) => {
txn.remove_orphan_chain_tip(candidate_block.header.prev_hash);
info!(
target: LOG_TARGET,
"New orphan ({hash}) extends a chain in the current candidate tip set"
);
curr_parent
},
None => match db
.fetch_chain_header_in_all_chains(&candidate_block.header.prev_hash)
.optional()?
{
Some(curr_parent) => {
debug!(
target: LOG_TARGET,
"New orphan #{} ({}) does not have a parent in the current tip set. Parent is {}",
candidate_block.header.height,
hash,
curr_parent.hash(),
);
curr_parent
},
None => {
if db.contains(&DbKey::OrphanBlock(hash))? {
info!(
target: LOG_TARGET,
"Orphan #{} ({}) already found in orphan database", candidate_block.header.height, hash
);
} else {
info!(
target: LOG_TARGET,
"Orphan #{} ({}) was not connected to any previous headers. Inserting as true orphan",
candidate_block.header.height,
hash
);
txn.insert_orphan(candidate_block);
}
db.write(txn)?;
return Ok(());
},
},
};
let mut prev_timestamps = get_previous_timestamps(db, &candidate_block.header, rules)?;
let vm_key = get_vm_key_for_candidate_block(db, candidate_block.clone())?;
let result = validator.validate(
db,
&candidate_block.header,
parent.header(),
&prev_timestamps,
None,
vm_key,
);
let achieved_target_diff = match result {
Ok(achieved_target_diff) => achieved_target_diff,
Err(e @ ValidationError::BlockHeaderError(BlockHeaderValidationError::InvalidTimestampFutureTimeLimit)) |
Err(
e @ ValidationError::FatalStorageError(_) | e @ ValidationError::IncorrectNumberOfTimestampsProvided { .. },
) |
Err(e @ ValidationError::BadBlockFound { .. }) => {
db.write(txn)?;
return Err(e.into());
},
Err(e) => {
txn.insert_bad_block(candidate_block.header.hash(), candidate_block.header.height, e.to_string());
db.write(txn)?;
return Err(e.into());
},
};
prev_timestamps.push(candidate_block.header.timestamp);
let accumulated_data = BlockHeaderAccumulatedDataBuilder::from_previous(parent.accumulated_data())
.with_hash(hash)
.with_achieved_target_difficulty(achieved_target_diff)
.with_total_kernel_offset(candidate_block.header.total_kernel_offset.clone())
.build(rules.consensus_constants(candidate_block.header.height))?;
let chain_block = ChainBlock::try_construct(candidate_block, accumulated_data).ok_or(
ChainStorageError::UnexpectedResult("Somehow hash is missing from Chain block".to_string()),
)?;
let chain_header = chain_block.to_chain_header();
txn.insert_orphan(chain_block.to_arc_block());
txn.set_accumulated_data_for_orphan(chain_block.header().version, chain_block.accumulated_data().clone());
db.write(txn)?;
let height = chain_header.height();
let tips = find_orphan_descendant_tips_of(
db,
chain_header,
prev_timestamps,
validator,
rules.consensus_constants(height),
)?;
let mut txn = DbTransaction::new();
debug!(target: LOG_TARGET, "Found {} new orphan tips", tips.len());
for new_tip in &tips {
txn.insert_orphan_chain_tip(
*new_tip.hash(),
chain_block.accumulated_data().total_accumulated_difficulty,
);
}
db.write(txn)?;
Ok(())
}
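/// Recursively validates the orphan children of `prev_chain_header`, storing accumulated data
/// for each valid child and returning the reachable leaf headers as new tips. Children that
/// fail header validation are deleted from the orphan pool.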
fn find_orphan_descendant_tips_of<T: BlockchainBackend>(
db: &mut T,
prev_chain_header: ChainHeader,
prev_timestamps: RollingVec<EpochTime>,
validator: &dyn HeaderChainLinkedValidator<T>,
consensus_constants: &ConsensusConstants,
) -> Result<Vec<ChainHeader>, ChainStorageError> {
let children = db.fetch_orphan_children_of(*prev_chain_header.hash())?;
if children.is_empty() {
debug!(
target: LOG_TARGET,
"Found new orphan tip {} ({})",
&prev_chain_header.height(),
&prev_chain_header.hash(),
);
return Ok(vec![prev_chain_header]);
}
debug!(
target: LOG_TARGET,
"Found {} children of orphan {} ({})",
children.len(),
&prev_chain_header.height(),
&prev_chain_header.hash()
);
let mut res = vec![];
for child in children {
debug!(
target: LOG_TARGET,
"Validating header #{} ({}), descendant of #{} ({})",
child.header.height,
child.hash(),
prev_chain_header.height(),
prev_chain_header.hash(),
);
let vm_key = *db
.fetch_chain_header_by_height(tari_rx_vm_key_height(child.header.height))?
.hash();
match validator.validate(
db,
&child.header,
prev_chain_header.header(),
&prev_timestamps,
None,
vm_key,
) {
Ok(achieved_target) => {
let mut prev_timestamps_for_children = prev_timestamps.clone();
prev_timestamps_for_children.push(child.header.timestamp);
let child_hash = child.hash();
let accum_data = BlockHeaderAccumulatedDataBuilder::from_previous(prev_chain_header.accumulated_data())
.with_hash(child_hash)
.with_achieved_target_difficulty(achieved_target)
.with_total_kernel_offset(child.header.total_kernel_offset.clone())
.build(consensus_constants)?;
let chain_header = ChainHeader::try_construct(child.header, accum_data).ok_or_else(|| {
ChainStorageError::InvalidOperation(format!(
"Attempt to create mismatched ChainHeader with hash {child_hash}"
))
})?;
let mut txn = DbTransaction::new();
txn.set_accumulated_data_for_orphan(
chain_header.header().version,
chain_header.accumulated_data().clone(),
);
db.write(txn)?;
let children = find_orphan_descendant_tips_of(
db,
chain_header,
prev_timestamps_for_children,
validator,
consensus_constants,
)?;
res.extend(children);
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Discarding orphan {} because it has an invalid header: {:?}",
child.hash(),
e
);
let mut txn = DbTransaction::new();
txn.delete_orphan(child.hash());
db.write(txn)?;
},
};
}
Ok(res)
}
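/// Collects up to the median timestamp window of timestamps from the headers preceding
/// `header` (following `prev_hash` links across all chains) and returns them sorted ascending.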
fn get_previous_timestamps<T: BlockchainBackend>(
db: &mut T,
header: &BlockHeader,
rules: &BaseNodeConsensusManager,
) -> Result<RollingVec<EpochTime>, ChainStorageError> {
let median_timestamp_window_size = rules.consensus_constants(header.height).median_timestamp_count();
let prev_height = usize::try_from(header.height)
.map_err(|_| ChainStorageError::ConversionError("Block height overflowed usize".to_string()))?;
let prev_timestamps_count = cmp::min(median_timestamp_window_size, prev_height);
let mut timestamps = RollingVec::new(median_timestamp_window_size);
let mut curr_header = header.prev_hash;
for _ in 0..prev_timestamps_count {
let h = db.fetch_chain_header_in_all_chains(&curr_header)?;
curr_header = h.header().prev_hash;
timestamps.push(EpochTime::from(h.timestamp()));
}
timestamps.sort_unstable();
Ok(timestamps)
}
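/// Walks back from `orphan_tip` through the orphan pool until a block whose parent is on the
/// main chain is found, returning the orphan chain ordered from the fork point to the tip.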
#[allow(clippy::ptr_arg)]
fn get_orphan_link_main_chain<T: BlockchainBackend>(
db: &T,
orphan_tip: &HashOutput,
) -> Result<VecDeque<Arc<ChainBlock>>, ChainStorageError> {
let mut chain: VecDeque<Arc<ChainBlock>> = VecDeque::new();
let mut curr_hash = *orphan_tip;
loop {
let curr_block = db.fetch_orphan_chain_block(curr_hash)?.ok_or_else(|| {
ChainStorageError::InvalidOperation(format!(
"get_orphan_link_main_chain: Failed to fetch orphan chain block by hash {curr_hash}"
))
})?;
curr_hash = curr_block.header().prev_hash;
chain.push_front(Arc::new(curr_block));
if db.contains(&DbKey::HeaderHash(curr_hash))? {
break;
}
}
Ok(chain)
}
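/// Returns the strongest of the given orphan chain tips according to the chain strength
/// comparer, keeping the earlier tip on a tie.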
fn find_strongest_orphan_tip(
orphan_chain_tips: Vec<ChainHeader>,
chain_strength_comparer: &dyn ChainStrengthComparer,
) -> Option<ChainHeader> {
let mut best_block_header: Option<ChainHeader> = None;
for tip in orphan_chain_tips {
best_block_header = match best_block_header {
Some(current_best) => match chain_strength_comparer.compare(&current_best, &tip) {
Ordering::Less => Some(tip),
Ordering::Greater | Ordering::Equal => Some(current_best),
},
None => Some(tip),
};
}
best_block_header
}
fn cleanup_orphans<T: BlockchainBackend>(db: &mut T, orphan_storage_capacity: usize) -> Result<(), ChainStorageError> {
let metadata = db.fetch_chain_metadata()?;
let horizon_height = metadata.pruned_height_at_given_chain_tip(metadata.best_block_height());
db.delete_oldest_orphans(horizon_height, orphan_storage_capacity)
}
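/// Prunes block data on a pruned node once the pruned height lags the prune target by more
/// than `pruning_interval` blocks.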
fn prune_database_if_needed<T: BlockchainBackend>(
db: &mut T,
pruning_horizon: u64,
pruning_interval: u64,
) -> Result<(), ChainStorageError> {
let metadata = db.fetch_chain_metadata()?;
if !metadata.is_pruned_node() {
return Ok(());
}
let prune_to_height_target = metadata.best_block_height().saturating_sub(pruning_horizon);
debug!(
target: LOG_TARGET,
"Blockchain height: {}, pruning horizon: {}, pruned height: {}, prune to height target: {}, pruning interval: {}",
metadata.best_block_height(),
metadata.pruning_horizon(),
metadata.pruned_height(),
prune_to_height_target,
pruning_interval,
);
if metadata.pruned_height() < prune_to_height_target.saturating_sub(pruning_interval) {
prune_to_height(db, prune_to_height_target)?;
}
Ok(())
}
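/// Prunes spent outputs and inputs for every block from the last pruned height up to
/// `target_horizon_height`, committing in batches and advancing the pruned-height marker.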
fn prune_to_height<T: BlockchainBackend>(db: &mut T, target_horizon_height: u64) -> Result<(), ChainStorageError> {
let metadata = db.fetch_chain_metadata()?;
let last_pruned = metadata.pruned_height();
if target_horizon_height < last_pruned {
return Err(ChainStorageError::InvalidArguments {
func: "prune_to_height",
arg: "target_horizon_height",
message: format!(
"Target pruning horizon {target_horizon_height} is less than current pruning horizon {last_pruned}"
),
});
}
if target_horizon_height == last_pruned {
info!(
target: LOG_TARGET,
"Blockchain already pruned to height {target_horizon_height}"
);
return Ok(());
}
if target_horizon_height > metadata.best_block_height() {
return Err(ChainStorageError::InvalidArguments {
func: "prune_to_height",
arg: "target_horizon_height",
message: format!(
"Target pruning horizon {} is greater than current block height {}",
target_horizon_height,
metadata.best_block_height()
),
});
}
info!(
target: LOG_TARGET,
"Pruning blockchain database at height {target_horizon_height} (was={last_pruned})"
);
let mut txn = DbTransaction::new();
for block_to_prune in (last_pruned + 1)..=target_horizon_height {
let header = db.fetch_chain_header_by_height(block_to_prune)?;
txn.prune_outputs_spent_at_hash(*header.hash());
txn.delete_all_inputs_in_block(*header.hash());
if txn.operations().len() >= 100 {
txn.set_pruned_height(block_to_prune);
db.write(mem::take(&mut txn))?;
}
}
txn.set_pruned_height(target_horizon_height);
db.write(txn)?;
Ok(())
}
fn log_error<T>(req: DbKey, err: ChainStorageError) -> Result<T, ChainStorageError> {
error!(
target: LOG_TARGET,
"Database access error on request: {req}: {err}"
);
Err(err)
}
impl<T> Clone for BlockchainDatabase<T> {
fn clone(&self) -> Self {
BlockchainDatabase {
db: self.db.clone(),
validators: self.validators.clone(),
config: self.config,
consensus_manager: self.consensus_manager.clone(),
difficulty_calculator: self.difficulty_calculator.clone(),
disable_add_block_flag: self.disable_add_block_flag.clone(),
is_background_pruning: self.is_background_pruning.clone(),
}
}
}
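/// Converts generic `RangeBounds<u64>` into optional inclusive `(start, end)` bounds.
/// An illustrative sketch of the mapping (marked `ignore` since this private helper is not
/// exposed to doctests):
///
/// ```ignore
/// assert_eq!(convert_to_option_bounds(10..20), (Some(10), Some(19)));
/// assert_eq!(convert_to_option_bounds(..=5), (None, Some(5)));
/// assert_eq!(convert_to_option_bounds(..), (None, None));
/// ```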
fn convert_to_option_bounds<T: RangeBounds<u64>>(bounds: T) -> (Option<u64>, Option<u64>) {
let start = bounds.start_bound();
let end = bounds.end_bound();
use Bound::{Excluded, Included, Unbounded};
let start = match start {
Included(n) => Some(*n),
Excluded(n) => Some(n.saturating_add(1)),
Unbounded => None,
};
let end = match end {
Included(n) => Some(*n),
Excluded(n) => Some(n.saturating_sub(1)),
Unbounded => None,
};
(start, end)
}
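/// Rebuilds the payment-reference indexes for a single height while holding the backend write
/// lock, returning the rebuild status.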
fn process_payref_for_height<B: BlockchainBackend>(
db: Arc<RwLock<B>>,
height: u64,
metadata_at_start: ChainMetadata,
initialize_stats: Option<u64>,
finalize: bool,
) -> Result<PayrefRebuildStatus, ChainStorageError> {
debug!(target: LOG_TARGET, "[PayRef] Processing index rebuilding for height {height}");
let write_lock = db
.write()
.map_err(|_e| ChainStorageError::AccessError("Write lock on blockchain backend failed".into()))?;
let status =
write_lock.build_payref_indexes_for_height(height, metadata_at_start.clone(), initialize_stats, finalize)?;
if finalize || status.is_rebuilt {
debug!(
target: LOG_TARGET,
"[PayRef] Finalized index rebuilding for heights {} to {}",
metadata_at_start.best_block_height(), height
);
}
Ok(status)
}
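/// Recomputes the achieved/target difficulty and accumulated data for the header at `height`
/// (capped at the last chain header) and writes the updated accumulated difficulty.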
fn process_accumulated_data_for_height<B: BlockchainBackend>(
db: Arc<RwLock<B>>,
difficulty_calculator: DifficultyCalculator,
height: u64,
consensus_constants: &ConsensusConstants,
) -> Result<AccumulatedDataRebuildStatus, ChainStorageError> {
debug!(target: LOG_TARGET, "[AccData] Processing accumulated data rebuilding for height {height}");
let write_lock = db
.write()
.map_err(|_e| ChainStorageError::AccessError("Write lock on blockchain backend failed".into()))?;
let last_chain_header = write_lock.fetch_last_chain_header()?;
let height = min(height, last_chain_header.height());
let chain_header = write_lock.fetch_chain_header_by_height(height)?;
let header = chain_header.header().clone();
let prev_chain_header = write_lock.fetch_chain_header_by_height(height.saturating_sub(1))?;
let achieved_difficulty = difficulty_calculator.check_achieved_and_target_difficulty(&*write_lock, &header)?;
let accumulated_data = BlockHeaderAccumulatedDataBuilder::from_previous(prev_chain_header.accumulated_data())
.with_hash(header.hash())
.with_achieved_target_difficulty(achieved_difficulty)
.with_total_kernel_offset(header.total_kernel_offset.clone())
.build(consensus_constants)?;
let status = write_lock.update_accumulated_difficulty(height, accumulated_data, last_chain_header, true)?;
Ok(status)
}
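/// Verifies the stored accumulated data for `height` against a fresh calculation, optionally
/// autocorrecting a mismatch, and records the result of the check.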
fn verify_accumulated_data_for_height<B: BlockchainBackend>(
db: Arc<RwLock<B>>,
difficulty_calculator: DifficultyCalculator,
height: u64,
consensus_constants: &ConsensusConstants,
autocorrect: bool,
) -> Result<BlockchainCheckStatus, ChainStorageError> {
debug!(target: LOG_TARGET, "[AccData check] Checking accumulated data for height {height}");
let read_lock = db
.read()
.map_err(|_e| ChainStorageError::AccessError("Read lock on blockchain backend failed".into()))?;
let last_chain_header = read_lock.fetch_last_chain_header()?;
let height = min(height, last_chain_header.height());
let chain_header = read_lock.fetch_chain_header_by_height(height)?;
let header = chain_header.header().clone();
let prev_chain_header = read_lock.fetch_chain_header_by_height(height.saturating_sub(1))?;
let achieved_difficulty = difficulty_calculator.check_achieved_and_target_difficulty(&*read_lock, &header)?;
drop(read_lock);
let calculated_accumulated_data =
BlockHeaderAccumulatedDataBuilder::from_previous(prev_chain_header.accumulated_data())
.with_hash(header.hash())
.with_achieved_target_difficulty(achieved_difficulty)
.with_total_kernel_offset(header.total_kernel_offset.clone())
.build(consensus_constants)?;
let current_accumulated_data = chain_header.accumulated_data();
if &calculated_accumulated_data == current_accumulated_data {
trace!(
target: LOG_TARGET,
"[AccData check] Accumulated data for height {height} is correct. No update needed."
);
} else if autocorrect {
let write_lock = db
.write()
.map_err(|_e| ChainStorageError::AccessError("Write lock on blockchain backend failed".into()))?;
write_lock.update_accumulated_difficulty(height, calculated_accumulated_data, last_chain_header, false)?;
info!(
target: LOG_TARGET,
"[AccData check] Accumulated data for height {height} was corrupted, but rebuilt."
);
} else {
return Err(ChainStorageError::CorruptedDatabase(format!(
"Accumulated data for height {height} is corrupted."
)));
}
let write_lock = db
.write()
.map_err(|_e| ChainStorageError::AccessError("Write lock on blockchain backend failed".into()))?;
let last_chain_header = write_lock.fetch_last_chain_header()?;
let status = write_lock.update_accumulated_data_check_status(BlockchainCheckRequest::SetCheckResult {
has_concluded: height == last_chain_header.height(),
last_check_height: height,
current_height: last_chain_header.height(),
})?;
Ok(status)
}
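/// Checks header linkage at `height` and, when `full_validation` is set and the block body is
/// available (i.e. not pruned), re-validates the block body and its internal consistency.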
fn verify_blockchain_consistency_for_height<B: BlockchainBackend>(
db: Arc<RwLock<B>>,
validators: &Validators<B>,
height: u64,
full_validation: bool,
) -> Result<BlockchainCheckStatus, ChainStorageError> {
debug!(target: LOG_TARGET, "[Blockchain check] Checking blockchain data for height {height} with full_validation({full_validation})");
let read_lock = db
.read()
.map_err(|_e| ChainStorageError::AccessError("Read lock on blockchain backend failed".into()))?;
let last_chain_header = read_lock.fetch_last_chain_header()?;
let height = min(height, last_chain_header.height());
let block_data = {
let metadata = read_lock.fetch_chain_metadata()?;
let horizon_height = metadata.pruned_height_at_given_chain_tip(height);
if height > horizon_height {
let historical_block = fetch_block(&*read_lock, height, false).map_err(|e| {
ChainStorageError::CorruptedDatabase(format!("Could not fetch block for height {height}: {e}"))
})?;
Some((
historical_block.block().clone(),
historical_block.accumulated_data().clone(),
))
} else {
None
}
};
let prev_chain_header = read_lock.fetch_chain_header_by_height(height.saturating_sub(1))?;
let this_block_header = if let Some((ref block, ref _accumulated_data)) = block_data {
block.header.clone()
} else {
read_lock.fetch_chain_header_by_height(height)?.header().clone()
};
drop(read_lock);
if &this_block_header.prev_hash != prev_chain_header.hash() {
return Err(ChainStorageError::CorruptedDatabase(format!(
"Block at height {height} has invalid previous hash"
)));
}
if this_block_header.height != prev_chain_header.height() + 1 {
return Err(ChainStorageError::CorruptedDatabase(format!(
"Block at height {height} does not follow previous header height"
)));
}
if full_validation && let Some((block, accumulated_data)) = block_data {
let read_lock = db
.read()
.map_err(|_e| ChainStorageError::AccessError("Read lock on blockchain backend failed".into()))?;
let block_hash = block.hash();
let accumulated_data_hash = accumulated_data.hash;
let chain_block = ChainBlock::try_construct(Arc::new(block), accumulated_data).ok_or_else(|| {
ChainStorageError::CorruptedDatabase(format!(
"Inconsistent hash in historical block: block hash {} vs. acc_data hash {}",
block_hash, accumulated_data_hash
))
})?;
let block_validator = validators.block.clone();
block_validator
.validate_body_at_height(&read_lock, &chain_block)
.map_err(|e| {
ChainStorageError::CorruptedDatabase(format!("Block body validation failed for height {height}: {e}"))
})?;
let orphan_validator = validators.orphan.clone();
orphan_validator
.validate_internal_consistency(chain_block.block())
.map_err(|e| {
ChainStorageError::CorruptedDatabase(format!(
"Block internal consistency validation failed for height {height}: {e}"
))
})?;
}
let write_lock = db
.write()
.map_err(|_e| ChainStorageError::AccessError("Write lock on blockchain backend failed".into()))?;
let last_chain_header = write_lock.fetch_last_chain_header()?;
let status = write_lock.update_blockchain_consistency_check_status(BlockchainCheckRequest::SetCheckResult {
has_concluded: height == last_chain_header.height(),
last_check_height: height,
current_height: last_chain_header.height(),
})?;
Ok(status)
}
#[cfg(test)]
mod test {
#![allow(clippy::indexing_slicing)]
use std::{collections::HashMap, sync};
use rand::seq::SliceRandom;
use tari_common::configuration::Network;
use tari_test_utils::unpack_enum;
use tari_transaction_components::{
consensus::{ConsensusConstantsBuilder, consensus_constants::PowAlgorithmConstants},
tari_proof_of_work::Difficulty,
};
use super::*;
use crate::{
block_specs,
consensus::chain_strength_comparer::strongest_chain,
test_helpers::{
BlockSpecs,
blockchain::{
TempDatabase,
create_chained_blocks,
create_main_chain,
create_new_blockchain,
create_orphan_chain,
create_test_blockchain_db,
},
},
validation::{header::HeaderFullValidator, mocks::MockValidator},
};
#[test]
fn lmdb_fetch_monero_seeds() {
let db = create_test_blockchain_db();
let seed = b"test1";
{
let db_read = db.db_read_access().unwrap();
assert_eq!(db_read.fetch_monero_seed_first_seen_height(&seed[..]).unwrap(), 0);
}
{
let mut txn = DbTransaction::new();
txn.insert_monero_seed_height(seed.to_vec(), 5);
let mut db_write = db.test_db_write_access().unwrap();
assert!(db_write.write(txn).is_ok());
}
{
let db_read = db.db_read_access().unwrap();
assert_eq!(db_read.fetch_monero_seed_first_seen_height(&seed[..]).unwrap(), 5);
}
{
let mut txn = DbTransaction::new();
txn.insert_monero_seed_height(seed.to_vec(), 2);
let mut db_write = db.db_write_access().unwrap();
assert!(db_write.write(txn).is_ok());
}
{
let db_read = db.db_read_access().unwrap();
assert_eq!(db_read.fetch_monero_seed_first_seen_height(&seed[..]).unwrap(), 2);
}
}
mod get_orphan_link_main_chain {
use super::*;
#[tokio::test]
async fn it_gets_a_simple_link_to_genesis() {
let db = create_new_blockchain();
let genesis = db
.fetch_block(0, true)
.unwrap()
.try_into_chain_block()
.map(Arc::new)
.unwrap();
let (_, chain) =
create_orphan_chain(&db, &[("A->GB", 1, 120), ("B->A", 1, 120), ("C->B", 1, 120)], genesis);
let access = db.db_read_access().unwrap();
let orphan_chain = get_orphan_link_main_chain(&*access, chain.get("C").unwrap().hash()).unwrap();
assert_eq!(orphan_chain[2].hash(), chain.get("C").unwrap().hash());
assert_eq!(orphan_chain[1].hash(), chain.get("B").unwrap().hash());
assert_eq!(orphan_chain[0].hash(), chain.get("A").unwrap().hash());
assert_eq!(orphan_chain.len(), 3);
}
#[tokio::test]
async fn it_selects_a_large_reorg_chain() {
let db = create_new_blockchain();
let (_, mainchain) = create_main_chain(&db, &[
("A->GB", 1, 120),
("B->A", 1, 120),
("C->B", 1, 120),
("D->C", 1, 120),
]);
let fork_root = mainchain.get("B").unwrap().clone();
let (_, reorg_chain) = create_orphan_chain(
&db,
&[
("C2->GB", 2, 120),
("D2->C2", 1, 120),
("E2->D2", 1, 120),
("F2->E2", 1, 120),
],
fork_root,
);
let access = db.db_read_access().unwrap();
let orphan_chain = get_orphan_link_main_chain(&*access, reorg_chain.get("F2").unwrap().hash()).unwrap();
assert_eq!(orphan_chain[3].hash(), reorg_chain.get("F2").unwrap().hash());
assert_eq!(orphan_chain[2].hash(), reorg_chain.get("E2").unwrap().hash());
assert_eq!(orphan_chain[1].hash(), reorg_chain.get("D2").unwrap().hash());
assert_eq!(orphan_chain[0].hash(), reorg_chain.get("C2").unwrap().hash());
assert_eq!(orphan_chain.len(), 4);
}
#[test]
fn it_errors_if_orphan_not_exist() {
let db = create_new_blockchain();
let access = db.db_read_access().unwrap();
let err = get_orphan_link_main_chain(&*access, &FixedHash::zero()).unwrap_err();
assert!(matches!(err, ChainStorageError::InvalidOperation(_)));
}
}
mod insert_orphan_and_find_new_tips {
use super::*;
#[tokio::test]
async fn it_inserts_new_block_in_orphan_db_as_tip() {
let db = create_new_blockchain();
let validator = MockValidator::new(true);
let genesis_block = db
.fetch_block(0, true)
.unwrap()
.try_into_chain_block()
.map(Arc::new)
.unwrap();
let (_, chain) = create_chained_blocks(&db, &[("A->GB", 1u64, 120u64)], genesis_block);
let block = chain.get("A").unwrap().clone();
let mut access = db.db_write_access().unwrap();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let maybe_block = access.fetch_orphan_chain_tip_by_hash(block.hash()).unwrap();
assert_eq!(maybe_block.unwrap().header(), block.header());
}
#[tokio::test]
async fn it_inserts_true_orphan_chain() {
let db = create_new_blockchain();
let validator = MockValidator::new(true);
let (_, main_chain) = create_main_chain(&db, &[("A->GB", 1, 120), ("B->A", 1, 120)]);
let block_b = main_chain.get("B").unwrap().clone();
let (_, orphan_chain) = create_chained_blocks(
&db,
&[("C2->GB", 1, 120), ("D2->C2", 1, 120), ("E2->D2", 1, 120)],
block_b,
);
let mut access = db.db_write_access().unwrap();
let block_d2 = orphan_chain.get("D2").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block_d2.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let block_e2 = orphan_chain.get("E2").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block_e2.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let maybe_block = access.fetch_orphan_children_of(*block_d2.hash()).unwrap();
assert_eq!(maybe_block[0], *block_e2.to_arc_block());
}
#[tokio::test]
async fn it_correctly_handles_duplicate_blocks() {
let db = create_new_blockchain();
let validator = MockValidator::new(true);
let (_, main_chain) = create_main_chain(&db, &[("A->GB", 1, 120)]);
let fork_root = main_chain.get("A").unwrap().clone();
let (_, orphan_chain) = create_chained_blocks(&db, &[("B2->GB", 1, 120)], fork_root);
let mut access = db.db_write_access().unwrap();
let block = orphan_chain.get("B2").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let fork_tip = access.fetch_orphan_chain_tip_by_hash(block.hash()).unwrap().unwrap();
assert_eq!(fork_tip, block.to_chain_header());
assert_eq!(fork_tip.accumulated_data().total_accumulated_difficulty, 3.into());
let strongest_tips = access.fetch_strongest_orphan_chain_tips().unwrap().len();
assert_eq!(strongest_tips, 1);
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let strongest_tips = access.fetch_strongest_orphan_chain_tips().unwrap().len();
assert_eq!(strongest_tips, 1);
}
#[ignore]
#[tokio::test]
async fn it_correctly_detects_strongest_orphan_tips() {
let db = create_new_blockchain();
let validator = MockValidator::new(true);
let (_, main_chain) = create_main_chain(&db, &[
("A->GB", 1, 120),
("B->A", 2, 120),
("C->B", 1, 120),
("D->C", 1, 120),
("E->D", 1, 120),
("F->E", 1, 120),
("G->F", 1, 120),
]);
let fork_root_1 = main_chain.get("A").unwrap().clone();
let (_, orphan_chain_1) = create_chained_blocks(
&db,
&[("B2->GB", 1, 120), ("C2->B2", 1, 120), ("D2->C2", 1, 120)],
fork_root_1,
);
let fork_root_2 = main_chain.get("GB").unwrap().clone();
let (_, orphan_chain_2) = create_chained_blocks(&db, &[("B3->GB", 1, 120)], fork_root_2);
let fork_root_3 = main_chain.get("B").unwrap().clone();
let (_, orphan_chain_3) = create_chained_blocks(&db, &[("B4->GB", 1, 120)], fork_root_3);
let mut access = db.db_write_access().unwrap();
let block = orphan_chain_1.get("B2").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let block = orphan_chain_1.get("C2").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let block = orphan_chain_1.get("D2").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let fork_tip_1 = access.fetch_orphan_chain_tip_by_hash(block.hash()).unwrap().unwrap();
assert_eq!(fork_tip_1, block.to_chain_header());
assert_eq!(fork_tip_1.accumulated_data().total_accumulated_difficulty, 5.into());
let block = orphan_chain_2.get("B3").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let fork_tip_2 = access.fetch_orphan_chain_tip_by_hash(block.hash()).unwrap().unwrap();
assert_eq!(fork_tip_2, block.to_chain_header());
assert_eq!(fork_tip_2.accumulated_data().total_accumulated_difficulty, 2.into());
let block = orphan_chain_3.get("B4").unwrap().clone();
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let fork_tip_3 = access.fetch_orphan_chain_tip_by_hash(block.hash()).unwrap().unwrap();
assert_eq!(fork_tip_3, block.to_chain_header());
assert_eq!(fork_tip_3.accumulated_data().total_accumulated_difficulty, 5.into());
assert_ne!(fork_tip_1, fork_tip_2);
assert_ne!(fork_tip_1, fork_tip_3);
let strongest_tips = access.fetch_strongest_orphan_chain_tips().unwrap();
assert_eq!(strongest_tips.len(), 2);
let mut found_tip_1 = false;
let mut found_tip_3 = false;
for tip in &strongest_tips {
if tip == &fork_tip_1 {
found_tip_1 = true;
}
if tip == &fork_tip_3 {
found_tip_3 = true;
}
}
assert!(found_tip_1 && found_tip_3);
insert_orphan_and_find_new_tips(&mut *access, block.to_arc_block(), &validator, &db.consensus_manager)
.unwrap();
let strongest_tips = access.fetch_strongest_orphan_chain_tips().unwrap();
assert_eq!(strongest_tips.len(), 2);
}
}
mod handle_possible_reorg {
use super::*;
#[ignore]
#[tokio::test]
async fn it_links_many_orphan_branches_to_main_chain() {
let test = TestHarness::setup();
let (_, main_chain) =
create_main_chain(&test.db, block_specs!(["1a->GB"], ["2a->1a"], ["3a->2a"], ["4a->3a"]));
let genesis = main_chain.get("GB").unwrap().clone();
let fork_root = main_chain.get("1a").unwrap().clone();
let (_, orphan_chain_b) = create_chained_blocks(
&test.db,
block_specs!(["2b->GB"], ["3b->2b"], ["4b->3b"], ["5b->4b"], ["6b->5b"]),
fork_root,
);
for name in ["5b", "3b", "4b", "6b"] {
let block = orphan_chain_b.get(name).unwrap();
let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
assert!(result.is_orphaned());
}
let fork_root = orphan_chain_b.get("3b").unwrap().clone();
let (_, orphan_chain_c) = create_chained_blocks(
&test.db,
block_specs!(["4c->GB"], ["5c->4c"], ["6c->5c"], ["7c->6c"]),
fork_root,
);
for name in ["7c", "5c", "6c", "4c"] {
let block = orphan_chain_c.get(name).unwrap();
let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
assert!(result.is_orphaned());
}
let fork_root = orphan_chain_c.get("6c").unwrap().clone();
let (_, orphan_chain_d) = create_chained_blocks(
&test.db,
block_specs!(["7d->GB", difficulty: Difficulty::from_u64(10).unwrap()]),
fork_root,
);
let block = orphan_chain_d.get("7d").unwrap();
let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
assert!(result.is_orphaned());
let block = orphan_chain_b.get("2b").unwrap();
let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
result.assert_reorg(6, 3);
{
let access = test.db_write_access();
let block = orphan_chain_b.get("2b").unwrap().clone();
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());
let block = orphan_chain_d.get("7d").unwrap().clone();
let tip = access.fetch_tip_header().unwrap();
assert_eq!(tip.hash(), block.hash());
let metadata = access.fetch_chain_metadata().unwrap();
assert_eq!(metadata.best_block_hash(), block.hash());
assert_eq!(metadata.best_block_height(), block.height());
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());
let mut all_blocks = main_chain
.into_iter()
.chain(orphan_chain_b)
.chain(orphan_chain_c)
.chain(orphan_chain_d)
.collect::<HashMap<_, _>>();
all_blocks.insert("GB".to_string(), genesis);
let expected_chain = ["GB", "1a", "2b", "3b", "4c", "5c", "6c", "7d"];
for (height, name) in expected_chain.iter().enumerate() {
let expected_block = all_blocks.get(*name).unwrap();
unpack_enum!(
DbValue::HeaderHeight(found_block) =
access.fetch(&DbKey::HeaderHeight(height as u64)).unwrap().unwrap()
);
assert_eq!(*found_block, *expected_block.header());
}
}
}
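// As above, but the 12-block reorg is deeper than the median timestamp window
// (11 headers), exercising timestamp validation across the rewound range.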
#[ignore]
#[tokio::test]
async fn it_links_many_orphan_branches_to_main_chain_with_greater_reorg_than_median_timestamp_window() {
let test = TestHarness::setup();
assert_eq!(test.consensus.consensus_constants(0).median_timestamp_count(), 11);
let (_, main_chain) = create_main_chain(
&test.db,
block_specs!(
["1a->GB"],
["2a->1a"],
["3a->2a"],
["4a->3a"],
["5a->4a"],
["6a->5a"],
["7a->6a"],
["8a->7a"],
["9a->8a"],
["10a->9a"],
["11a->10a"],
["12a->11a"],
["13a->12a"],
),
);
let genesis = main_chain.get("GB").unwrap().clone();
let fork_root = main_chain.get("1a").unwrap().clone();
let (_, orphan_chain_b) = create_chained_blocks(
&test.db,
block_specs!(
["2b->GB"],
["3b->2b"],
["4b->3b"],
["5b->4b"],
["6b->5b"],
["7b->6b"],
["8b->7b"],
["9b->8b"],
["10b->9b"],
["11b->10b"],
["12b->11b", difficulty: Difficulty::from_u64(5).unwrap()]
),
fork_root,
);
let mut unordered = vec!["3b", "4b", "5b", "6b", "7b", "8b", "9b", "10b", "11b", "12b"];
unordered.shuffle(&mut rand::rng());
for name in unordered {
let block = orphan_chain_b.get(name).unwrap().clone();
let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
assert!(result.is_orphaned());
}
let block = orphan_chain_b.get("2b").unwrap().clone();
let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
result.assert_reorg(11, 12);
{
let access = test.db_write_access();
let block = orphan_chain_b.get("2b").unwrap().clone();
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());
let block = orphan_chain_b.get("12b").unwrap().clone();
let tip = access.fetch_tip_header().unwrap();
assert_eq!(tip.hash(), block.hash());
let metadata = access.fetch_chain_metadata().unwrap();
assert_eq!(metadata.best_block_hash(), block.hash());
assert_eq!(metadata.best_block_height(), block.height());
assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap());
let mut all_blocks = main_chain.into_iter().chain(orphan_chain_b).collect::<HashMap<_, _>>();
all_blocks.insert("GB".to_string(), genesis);
let expected_chain = [
"GB", "1a", "2b", "3b", "4b", "5b", "6b", "7b", "8b", "9b", "10b", "11b", "12b",
];
for (height, name) in expected_chain.iter().enumerate() {
let expected_block = all_blocks.get(*name).unwrap();
unpack_enum!(
DbValue::HeaderHeight(found_block) =
access.fetch(&DbKey::HeaderHeight(height as u64)).unwrap().unwrap()
);
assert_eq!(*found_block, *expected_block.header());
}
}
}
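// A fork block claiming height 10 on a chain of height 4 cannot be connected, so the
// attempted reorg must fail with ValueNotFound.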
#[tokio::test]
async fn it_errors_if_reorging_to_an_invalid_height() {
let test = TestHarness::setup();
let (_, main_chain) =
create_main_chain(&test.db, block_specs!(["1a->GB"], ["2a->1a"], ["3a->2a"], ["4a->3a"]));
let fork_root = main_chain.get("1a").unwrap().clone();
let (_, orphan_chain_b) = create_chained_blocks(
&test.db,
block_specs!(["2b->GB", height: 10, difficulty: Difficulty::from_u64(10).unwrap()]),
fork_root,
);
let block = orphan_chain_b.get("2b").unwrap().clone();
let err = test.handle_possible_reorg(block.to_arc_block()).unwrap_err();
unpack_enum!(ChainStorageError::ValueNotFound { .. } = err);
}
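// Orphan blocks may claim any height while they remain orphaned; the height is only
// checked once the branch connects to the main chain.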
#[tokio::test]
async fn it_allows_orphan_blocks_with_any_height() {
let test = TestHarness::setup();
let (_, main_chain) = create_main_chain(
&test.db,
block_specs!(["1a->GB", difficulty: Difficulty::from_u64(2).unwrap()]),
);
let fork_root = main_chain.get("GB").unwrap().clone();
let (_, orphan_chain_b) = create_orphan_chain(&test.db, block_specs!(["1b->GB", height: 10]), fork_root);
let block = orphan_chain_b.get("1b").unwrap().clone();
test.handle_possible_reorg(block.to_arc_block())
.unwrap()
.assert_orphaned();
}
}
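// Case 1: two blocks extending the tip in order are simply added.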
#[tokio::test]
async fn test_handle_possible_reorg_case1() {
let (result, _blocks) = test_case_handle_possible_reorg(&[("A->GB", 1, 120), ("B->A", 1, 120)]).unwrap();
result[0].assert_added();
result[1].assert_added();
}
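// Case 2: a single stronger block (A2, difficulty 3) reorgs out A and B.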
#[ignore]
#[tokio::test]
async fn test_handle_possible_reorg_case2() {
let (result, blocks) =
test_case_handle_possible_reorg(&[("A->GB", 1, 120), ("B->A", 1, 120), ("A2->GB", 3, 120)]).unwrap();
result[0].assert_added();
result[1].assert_added();
result[2].assert_reorg(1, 2);
assert_added_hashes_eq(&result[2], vec!["A2"], &blocks);
}
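// Case 3: A2 first replaces A, then B (built on A) brings the original branch back as
// the stronger chain.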
#[ignore]
#[tokio::test]
async fn test_handle_possible_reorg_case3() {
let (result, blocks) =
test_case_handle_possible_reorg(&[("A->GB", 1, 120), ("A2->GB", 2, 120), ("B->A", 2, 120)]).unwrap();
result[0].assert_added();
result[1].assert_reorg(1, 1);
result[2].assert_reorg(2, 1);
assert_added_hashes_eq(&result[2], vec!["A", "B"], &blocks);
}
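// Case 4: the tip flips between the A-branch and the single-block A2/A3 forks as each
// new block tips the accumulated difficulty.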
#[ignore]
#[tokio::test]
async fn test_handle_possible_reorg_case4() {
let (result, blocks) = test_case_handle_possible_reorg(&[
("A->GB", 1, 120),
("A2->GB", 2, 120),
("B->A", 2, 120),
("A3->GB", 4, 120),
("C->B", 2, 120),
])
.unwrap();
result[0].assert_added();
result[1].assert_reorg(1, 1);
result[2].assert_reorg(2, 1);
result[3].assert_reorg(1, 2);
result[4].assert_reorg(3, 1);
assert_added_hashes_eq(&result[4], vec!["A", "B", "C"], &blocks);
}
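// Case 5: repeated reorgs between the A/B/C/D branch and competing forks, also
// asserting the accumulated difficulty recorded at each switch.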
#[ignore]
#[tokio::test]
async fn test_handle_possible_reorg_case5() {
let (result, blocks) = test_case_handle_possible_reorg(&[
("A->GB", 1, 120),
("B->A", 1, 120),
("A2->GB", 3, 120),
("C->B", 1, 120),
("D->C", 2, 120),
("B2->A", 5, 120),
("D2->C", 6, 120),
("D3->C", 7, 120),
("D4->C", 8, 120),
])
.unwrap();
result[0].assert_added();
result[1].assert_added();
result[2].assert_reorg(1, 2);
result[3].assert_orphaned();
result[4].assert_reorg(4, 1);
result[5].assert_reorg(1, 3);
result[6].assert_reorg(3, 1);
result[7].assert_reorg(1, 1);
result[8].assert_reorg(1, 1);
assert_added_hashes_eq(&result[5], vec!["B2"], &blocks);
assert_difficulty_eq(&result[5], vec![7.into()]);
assert_added_hashes_eq(&result[6], vec!["B", "C", "D2"], &blocks);
assert_difficulty_eq(&result[6], vec![3.into(), 4.into(), 10.into()]);
assert_added_hashes_eq(&result[7], vec!["D3"], &blocks);
assert_difficulty_eq(&result[7], vec![11.into()]);
assert_added_hashes_eq(&result[8], vec!["D4"], &blocks);
assert_difficulty_eq(&result[8], vec![12.into()]);
}
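// Case 6: orphan blocks arriving tip-first stay orphaned (even when resubmitted) until
// the linking block C2 connects the branch and triggers the reorg.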
#[tokio::test]
async fn test_handle_possible_reorg_case6_orphan_chain_link() {
let db = create_new_blockchain();
let (_, mainchain) = create_main_chain(&db, &[
("A->GB", 1, 120),
("B->A", 1, 120),
("C->B", 1, 120),
("D->C", 1, 120),
]);
let mock_validator = MockValidator::new(true);
let chain_strength_comparer = strongest_chain().by_sha3x_difficulty().build();
let fork_block = mainchain.get("B").unwrap().clone();
let (_, reorg_chain) = create_chained_blocks(
&db,
&[("C2->GB", 1, 120), ("D2->C2", 1, 120), ("E2->D2", 1, 120)],
fork_block,
);
let mut access = db.db_write_access().unwrap();
let result = handle_possible_reorg(
&mut *access,
&Default::default(),
&db.consensus_manager,
&mock_validator,
&mock_validator,
&*chain_strength_comparer,
reorg_chain.get("E2").unwrap().to_arc_block(),
)
.unwrap();
result.assert_orphaned();
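// Submitting the same orphan block a second time must still report it as orphaned.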
let result = handle_possible_reorg(
&mut *access,
&Default::default(),
&db.consensus_manager,
&mock_validator,
&mock_validator,
&*chain_strength_comparer,
reorg_chain.get("E2").unwrap().to_arc_block(),
)
.unwrap();
result.assert_orphaned();
let result = handle_possible_reorg(
&mut *access,
&Default::default(),
&db.consensus_manager,
&mock_validator,
&mock_validator,
&*chain_strength_comparer,
reorg_chain.get("D2").unwrap().to_arc_block(),
)
.unwrap();
result.assert_orphaned();
let tip = access.fetch_last_header().unwrap();
assert_eq!(&tip, mainchain.get("D").unwrap().header());
let result = handle_possible_reorg(
&mut *access,
&Default::default(),
&db.consensus_manager,
&mock_validator,
&mock_validator,
&*chain_strength_comparer,
reorg_chain.get("C2").unwrap().to_arc_block(),
)
.unwrap();
result.assert_reorg(3, 2);
let tip = access.fetch_last_header().unwrap();
assert_eq!(&tip, reorg_chain.get("E2").unwrap().header());
check_whole_chain(&mut access);
}
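// Case 7: if validation fails while reorging to a stronger branch, the original main
// chain must be left intact.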
#[tokio::test]
async fn test_handle_possible_reorg_case7_fail_reorg() {
let db = create_new_blockchain();
let (_, mainchain) = create_main_chain(&db, &[
("A->GB", 1, 120),
("B->A", 1, 120),
("C->B", 1, 120),
("D->C", 1, 120),
]);
let mock_validator = MockValidator::new(true);
let chain_strength_comparer = strongest_chain().by_sha3x_difficulty().build();
let fork_block = mainchain.get("C").unwrap().clone();
let (_, reorg_chain) = create_chained_blocks(&db, &[("D2->GB", 1, 120), ("E2->D2", 2, 120)], fork_block);
let mut access = db.db_write_access().unwrap();
let result = handle_possible_reorg(
&mut *access,
&Default::default(),
&db.consensus_manager,
&mock_validator,
&mock_validator,
&*chain_strength_comparer,
reorg_chain.get("E2").unwrap().to_arc_block(),
)
.unwrap();
result.assert_orphaned();
let _error = handle_possible_reorg(
&mut *access,
&Default::default(),
&db.consensus_manager,
&MockValidator::new(false),
&mock_validator,
&*chain_strength_comparer,
reorg_chain.get("D2").unwrap().to_arc_block(),
)
.unwrap_err();
let tip = access.fetch_last_header().unwrap();
assert_eq!(&tip, mainchain.get("D").unwrap().header());
check_whole_chain(&mut access);
}
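// Target difficulties of blocks accepted via a reorg must match those computed when
// the same blocks are added sequentially.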
#[tokio::test]
async fn test_handle_possible_reorg_target_difficulty_is_correct_case_1() {
let (result, _blocks) = test_case_handle_possible_reorg(&[
("A->GB", 1, 12),
("B->A", 10, 40),
("C2->B", 20, 69),
("D2->C2", 40, 40),
])
.unwrap();
let mut added_blocks = vec![];
added_blocks.extend(result[0].added_blocks());
added_blocks.extend(result[1].added_blocks());
added_blocks.extend(result[2].added_blocks());
added_blocks.extend(result[3].added_blocks());
let target_difficulties: Vec<u64> = added_blocks
.iter()
.map(|b| b.accumulated_data().target_difficulty.as_u64())
.collect();
assert_eq!(target_difficulties, vec![1, 10, 19, 24]);
let (result, blocks) = test_case_handle_possible_reorg(&[
("A->GB", 1, 12),
("B->A", 10, 40),
("C->B", 30, 155),
("C2->B", 20, 69),
("D2->C2", 40, 40),
])
.unwrap();
result[0].assert_added();
result[1].assert_added();
result[2].assert_added();
result[3].assert_orphaned();
result[4].assert_reorg(2, 1);
assert_added_hashes_eq(&result[4], vec!["C2", "D2"], &blocks);
assert_target_difficulties_eq(&result[4], vec![19, 24]);
}
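// Headers "banked" ahead of the local tip are reinserted after a rewind; a newly
// propagated block on the fork root must still be accepted, and the stale banked
// headers must be cleared once it is.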
#[tokio::test]
async fn test_handle_possible_reorg_banked_headers_not_aligned_with_propagated_block() {
let test = TestHarness::setup();
let (_, main_chain) = create_main_chain(
&test.db,
block_specs!(
["B1->GB"],
["B2->B1"],
["B3->B2"],
["H4->B3"],
["H5->H4"],
["H6->H5"],
["H7->H6"]
),
);
let banked_headers: Vec<_> = ["H4", "H5", "H6", "H7"]
.iter()
.map(|n| main_chain.get(*n).unwrap().to_chain_header())
.collect();
let fork_root = main_chain.get("B3").unwrap().clone();
assert!(
banked_headers
.iter()
.all(|h| test.db.fetch_block_by_hash(*h.hash(), false).unwrap().is_some())
);
test.db.rewind_to_height(fork_root.height()).unwrap();
test.db.cleanup_all_orphans().unwrap();
assert!(
banked_headers
.iter()
.all(|h| test.db.fetch_block_by_hash(*h.hash(), false).unwrap().is_none())
);
test.db.insert_valid_headers(banked_headers.clone()).unwrap();
assert!(
banked_headers
.iter()
.all(|h| test.db.fetch_header_by_block_hash(*h.hash()).unwrap().is_some())
);
let (_, reorg_chain) = create_chained_blocks(&test.db, block_specs!(["newB->GB"]), fork_root);
let new_block = reorg_chain.get("newB").unwrap().clone().to_arc_block();
let result = test.handle_possible_reorg(new_block.clone());
assert!(result.is_ok());
assert!(test.db.fetch_block_by_hash(new_block.hash(), false).unwrap().is_some());
assert!(
banked_headers
.iter()
.all(|h| test.db.fetch_header_by_block_hash(*h.hash()).unwrap().is_none())
);
}
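// Same check as case 1, but the reorged-in branch (B2..E2) is longer than the branch
// it replaces, so four target difficulties must be recomputed.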
#[ignore]
#[tokio::test]
async fn test_handle_possible_reorg_target_difficulty_is_correct_case_2() {
let (result, _blocks) = test_case_handle_possible_reorg(&[
("A->GB", 1, 12),
("B2->A", 10, 40),
("C2->B2", 20, 70),
("D2->C2", 25, 70),
("E2->D2", 30, 70),
])
.unwrap();
let mut added_blocks = vec![];
added_blocks.extend(result[0].added_blocks());
added_blocks.extend(result[1].added_blocks());
added_blocks.extend(result[2].added_blocks());
added_blocks.extend(result[3].added_blocks());
added_blocks.extend(result[4].added_blocks());
let target_difficulties: Vec<u64> = added_blocks
.iter()
.map(|b| b.accumulated_data().target_difficulty.as_u64())
.collect();
assert_eq!(target_difficulties, vec![1, 10, 19, 23, 26]);
let (result, blocks) = test_case_handle_possible_reorg(&[
("A->GB", 1, 12),
("B->A", 35, 200),
("C->B", 35, 200),
("B2->A", 10, 40),
("C2->B2", 20, 70),
("D2->C2", 25, 70),
("E2->D2", 30, 70),
])
.unwrap();
result[0].assert_added();
result[1].assert_added();
result[2].assert_added();
result[3].assert_orphaned();
result[4].assert_orphaned();
result[5].assert_orphaned();
result[6].assert_reorg(4, 2);
assert_added_hashes_eq(&result[6], vec!["B2", "C2", "D2", "E2"], &blocks);
assert_target_difficulties_eq(&result[6], vec![10, 19, 23, 26]);
}
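// Total accumulated difficulty must be tracked correctly while reorging back and
// forth between two forks sharing the common ancestor C0.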
#[ignore]
#[tokio::test]
async fn test_handle_possible_reorg_accum_difficulty_is_correct_case_1() {
let (result, _blocks) = test_case_handle_possible_reorg(&[
("A0->GB", 1, 120), ("B0->A0", 1, 120), ("C0->B0", 1, 120), ("A1->C0", 2, 120), ("B1->A1", 2, 120), ("C1->B1", 2, 120), ("A2->C0", 2, 120), ("B2->A2", 2, 120), ("C2->B2", 2, 120), ("D2->C2", 1, 120), ("D1->C1", 1, 120), ("E1->D1", 1, 120), ("E2->D2", 1, 120), ])
.unwrap();
result[0].assert_added();
result[1].assert_added();
result[2].assert_added();
assert_difficulty_eq(&result[0], vec![2.into()]);
assert_difficulty_eq(&result[1], vec![3.into()]);
assert_difficulty_eq(&result[2], vec![4.into()]);
result[3].assert_added();
result[4].assert_added();
result[5].assert_added();
assert_difficulty_eq(&result[3], vec![6.into()]);
assert_difficulty_eq(&result[4], vec![8.into()]);
assert_difficulty_eq(&result[5], vec![10.into()]);
result[6].assert_orphaned();
result[7].assert_orphaned();
result[8].assert_orphaned();
result[9].assert_reorg(4, 3);
assert_difficulty_eq(&result[9], vec![6.into(), 8.into(), 10.into(), 11.into()]);
result[10].assert_orphaned();
result[11].assert_reorg(5, 4);
assert_difficulty_eq(&result[11], vec![6.into(), 8.into(), 10.into(), 11.into(), 12.into()]);
result[12].assert_orphaned();
}
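// Sanity check: every height from the tip down to 1 must resolve to a chain header.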
fn check_whole_chain(db: &mut TempDatabase) {
let mut h = db.fetch_chain_metadata().unwrap().best_block_height();
while h > 0 {
db.fetch_chain_header_by_height(h).unwrap();
h -= 1;
}
}
fn assert_added_hashes_eq(
result: &BlockAddResult,
block_names: Vec<&str>,
blocks: &HashMap<String, Arc<ChainBlock>>,
) {
let added = result.added_blocks();
assert_eq!(
added.iter().map(|b| b.hash()).collect::<Vec<_>>(),
block_names
.iter()
.map(|b| blocks.get(*b).unwrap().hash())
.collect::<Vec<_>>()
);
}
fn assert_difficulty_eq(result: &BlockAddResult, values: Vec<U512>) {
let accum_difficulty: Vec<U512> = result
.added_blocks()
.iter()
.map(|cb| cb.accumulated_data().total_accumulated_difficulty)
.collect();
assert_eq!(accum_difficulty, values);
}
fn assert_target_difficulties_eq(result: &BlockAddResult, values: Vec<u64>) {
let target_difficulties: Vec<u64> = result
.added_blocks()
.iter()
.map(|cb| cb.accumulated_data().target_difficulty.as_u64())
.collect();
assert_eq!(target_difficulties, values);
}
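// Test fixture bundling the database, config, consensus rules, validators and
// chain-strength comparer needed to call handle_possible_reorg directly.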
struct TestHarness {
db: BlockchainDatabase<TempDatabase>,
config: BlockchainDatabaseConfig,
consensus: BaseNodeConsensusManager,
chain_strength_comparer: Box<dyn ChainStrengthComparer>,
post_orphan_body_validator: Box<dyn CandidateBlockValidator<TempDatabase>>,
header_validator: Box<dyn HeaderChainLinkedValidator<TempDatabase>>,
}
impl TestHarness {
pub fn setup() -> Self {
let consensus = create_consensus_rules();
let db = create_new_blockchain();
let difficulty_calculator = DifficultyCalculator::new(consensus.clone(), Default::default());
let header_validator = Box::new(HeaderFullValidator::new(consensus.clone(), difficulty_calculator));
let post_orphan_body_validator = Box::new(MockValidator::new(true));
let chain_strength_comparer = strongest_chain().by_sha3x_difficulty().build();
Self {
db,
config: Default::default(),
consensus,
chain_strength_comparer,
header_validator,
post_orphan_body_validator,
}
}
pub fn db_write_access(&self) -> sync::RwLockWriteGuard<'_, TempDatabase> {
self.db.db_write_access().unwrap()
}
pub fn handle_possible_reorg(&self, block: Arc<Block>) -> Result<BlockAddResult, ChainStorageError> {
let mut access = self.db_write_access();
handle_possible_reorg(
&mut *access,
&self.config,
&self.consensus,
&*self.post_orphan_body_validator,
&*self.header_validator,
&*self.chain_strength_comparer,
block,
)
}
}
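// Creates the given blocks on top of the genesis block and feeds each one to
// handle_possible_reorg in order, returning every result alongside the block map.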
#[allow(clippy::type_complexity)]
fn test_case_handle_possible_reorg<T: Into<BlockSpecs>>(
blocks: T,
) -> Result<(Vec<BlockAddResult>, HashMap<String, Arc<ChainBlock>>), ChainStorageError> {
let test = TestHarness::setup();
let genesis_block = test
.db
.fetch_block(0, true)
.unwrap()
.try_into_chain_block()
.map(Arc::new)
.unwrap();
let (block_names, chain) = create_chained_blocks(&test.db, blocks, genesis_block);
let mut results = vec![];
for name in block_names {
let block = chain.get(&name.to_string()).unwrap();
debug!(
"Testing handle_possible_reorg for block {} ({}, parent = {})",
block.height(),
block.hash(),
block.header().prev_hash,
);
results.push(test.handle_possible_reorg(block.to_arc_block()).unwrap());
}
Ok((results, chain))
}
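// LocalNet consensus rules restricted to Sha3x proof of work with a low maximum
// difficulty, keeping the test block difficulties small and deterministic.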
fn create_consensus_rules() -> BaseNodeConsensusManager {
BaseNodeConsensusManager::builder(Network::LocalNet)
.add_consensus_constants(
ConsensusConstantsBuilder::new(Network::LocalNet)
.clear_proof_of_work()
.add_proof_of_work(PowAlgorithm::Sha3x, PowAlgorithmConstants {
min_difficulty: Difficulty::min(),
max_difficulty: Difficulty::from_u64(100).expect("valid difficulty"),
target_time: 120,
})
.build(),
)
.build()
.unwrap()
}
}