use std::{
collections::{HashMap, HashSet},
fmt::{self, Display, Formatter},
};
use datasize::DataSize;
use derive_more::Display;
use tracing::{debug, error, info, trace, warn};
use casper_storage::block_store::types::ApprovalsHashes;
use casper_types::{
execution::ExecutionResult, Block, BlockHash, BlockHeader, Digest, EraId, FinalitySignature,
ProtocolVersion, PublicKey, TransactionHash, TransactionId,
};
use crate::{
components::block_synchronizer::{
block_acquisition_action::BlockAcquisitionAction,
deploy_acquisition::TransactionAcquisition, peer_list::PeerList,
signature_acquisition::SignatureAcquisition, BlockAcquisitionError,
ExecutionResultsAcquisition, ExecutionResultsChecksum,
},
types::{BlockExecutionResultsOrChunk, EraValidatorWeights, ExecutableBlock, SignatureWeight},
NodeRng,
};
use super::deploy_acquisition::TransactionIdentifier;
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Clone, DataSize, Debug)]
/// The stages a single block passes through while it is being acquired.
///
/// Every variant carries the data collected so far. Transitions between variants are made by
/// the `register_*` methods and `switch_to_have_strict_finality`; `next_action` chooses the
/// request to issue for the current variant.
pub(super) enum BlockAcquisitionState {
/// Only the block hash is known; `next_action` requests the block header.
Initialized(BlockHash, SignatureAcquisition),
/// The header matching the hash has been registered; `next_action` requests finality
/// signatures from validators that have not supplied one yet.
HaveBlockHeader(Box<BlockHeader>, SignatureAcquisition),
/// Entered from `HaveBlockHeader` once `has_sufficient_finality(is_historical, false)` holds;
/// `next_action` requests the block body.
HaveWeakFinalitySignatures(Box<BlockHeader>, SignatureAcquisition),
/// The full block has been registered, together with the acquisition tracker for its
/// transactions.
HaveBlock(Box<Block>, SignatureAcquisition, TransactionAcquisition),
/// Entered from `HaveBlock` via `register_global_state` (only when execution state is
/// needed); additionally tracks the acquisition of execution results.
HaveGlobalState(
Box<Block>,
SignatureAcquisition,
TransactionAcquisition,
ExecutionResultsAcquisition,
),
/// Entered from `HaveGlobalState` once completed execution results are reported as stored;
/// keeps the execution results checksum.
HaveAllExecutionResults(
Box<Block>,
SignatureAcquisition,
TransactionAcquisition,
ExecutionResultsChecksum,
),
/// Approvals hashes have been applied to the transaction acquisition tracker.
HaveApprovalsHashes(Box<Block>, SignatureAcquisition, TransactionAcquisition),
/// Entered via `register_deploy` once no further transaction is needed.
HaveAllDeploys(Box<Block>, SignatureAcquisition),
/// Entered once `has_sufficient_finality(is_historical, true)` holds for the block.
HaveStrictFinalitySignatures(Box<Block>, SignatureAcquisition),
/// The executable form of the block has been built; the `bool` records whether it has
/// already been enqueued for execution.
HaveExecutableBlock(Box<Block>, Box<ExecutableBlock>, bool),
/// Acquisition finished; `next_action` needs nothing further.
Complete(Box<Block>),
/// Acquisition failed; holds the block hash and, when known, the block height.
Failed(BlockHash, Option<u64>),
}
impl Display for BlockAcquisitionState {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
BlockAcquisitionState::Initialized(block_hash, _) => {
write!(f, "initialized for: {}", block_hash)
}
BlockAcquisitionState::HaveBlockHeader(block_header, _) => write!(
f,
"have block header({}) for: {}",
block_header.height(),
block_header.block_hash()
),
BlockAcquisitionState::HaveWeakFinalitySignatures(block_header, _) => write!(
f,
"have weak finality({}) for: {}",
block_header.height(),
block_header.block_hash()
),
BlockAcquisitionState::HaveBlock(block, _, _) => write!(
f,
"have block body({}) for: {}",
block.height(),
block.hash()
),
BlockAcquisitionState::HaveGlobalState(block, _, _, _) => write!(
f,
"have global state({}) for: {}",
block.height(),
block.hash()
),
BlockAcquisitionState::HaveAllExecutionResults(block, _, _, _) => write!(
f,
"have execution results({}) for: {}",
block.height(),
block.hash()
),
BlockAcquisitionState::HaveApprovalsHashes(block, _, _) => write!(
f,
"have approvals hashes({}) for: {}",
block.height(),
block.hash()
),
BlockAcquisitionState::HaveAllDeploys(block, _) => {
write!(f, "have deploys({}) for: {}", block.height(), block.hash())
}
BlockAcquisitionState::HaveStrictFinalitySignatures(block, _) => write!(
f,
"have strict finality({}) for: {}",
block.height(),
block.hash()
),
BlockAcquisitionState::HaveExecutableBlock(block, _, _) => write!(
f,
"have finalized block({}) for: {}",
block.height(),
*block.hash()
),
BlockAcquisitionState::Complete(block) => {
write!(
f,
"have complete block({}) for: {}",
block.height(),
*block.hash()
)
}
BlockAcquisitionState::Failed(block_hash, maybe_block_height) => {
write!(f, "fatal({:?}) for: {}", maybe_block_height, block_hash)
}
}
}
}
impl BlockAcquisitionState {
pub(crate) fn block_hash(&self) -> BlockHash {
match self {
BlockAcquisitionState::Initialized(block_hash, _)
| BlockAcquisitionState::Failed(block_hash, _) => *block_hash,
BlockAcquisitionState::HaveBlockHeader(block_header, _)
| BlockAcquisitionState::HaveWeakFinalitySignatures(block_header, _) => {
block_header.block_hash()
}
BlockAcquisitionState::HaveBlock(block, _, _)
| BlockAcquisitionState::HaveGlobalState(block, _, _, _)
| BlockAcquisitionState::HaveAllExecutionResults(block, _, _, _)
| BlockAcquisitionState::HaveApprovalsHashes(block, _, _)
| BlockAcquisitionState::HaveAllDeploys(block, _)
| BlockAcquisitionState::HaveStrictFinalitySignatures(block, _)
| BlockAcquisitionState::HaveExecutableBlock(block, ..)
| BlockAcquisitionState::Complete(block) => *block.hash(),
}
}
pub(crate) fn maybe_block(&self) -> Option<Box<Block>> {
match self {
BlockAcquisitionState::Initialized(..)
| BlockAcquisitionState::Failed(..)
| BlockAcquisitionState::HaveBlockHeader(..)
| BlockAcquisitionState::HaveWeakFinalitySignatures(..) => None,
BlockAcquisitionState::HaveAllDeploys(block, _)
| BlockAcquisitionState::HaveStrictFinalitySignatures(block, _)
| BlockAcquisitionState::HaveBlock(block, _, _)
| BlockAcquisitionState::HaveGlobalState(block, _, _, _)
| BlockAcquisitionState::HaveAllExecutionResults(block, _, _, _)
| BlockAcquisitionState::HaveApprovalsHashes(block, _, _)
| BlockAcquisitionState::HaveExecutableBlock(block, _, _)
| BlockAcquisitionState::Complete(block) => Some(block.clone()),
}
}
}
#[derive(Clone, Copy, Debug, Display, PartialEq)]
#[must_use]
/// Outcome reported by the `register_*` methods when an item is applied to the acquisition
/// state. Marked `#[must_use]`, so callers are expected to inspect it.
pub(super) enum Acceptance {
/// Displayed as "had it"; presumably the item was already held — confirm against the
/// acquisition helpers that produce it.
#[display(fmt = "had it")]
HadIt,
/// Displayed as "needed it"; returned, for example, when a registered header or block
/// advances the state.
#[display(fmt = "needed it")]
NeededIt,
}
/// Result of `register_execution_results_or_chunk`.
pub(super) struct RegisterExecResultsOutcome {
/// The execution results keyed by transaction hash; only `Some` once the execution results
/// acquisition has reached its `Complete` state.
pub(super) exec_results: Option<HashMap<TransactionHash, ExecutionResult>>,
/// How the supplied results or chunk were accepted; `None` when the current state did not
/// take them at all.
pub(super) acceptance: Option<Acceptance>,
}
#[cfg_attr(doc, aquamarine::aquamarine)]
impl BlockAcquisitionState {
/// Chooses the next action to take for the block, based purely on the current state.
///
/// * `peer_list` / `rng` are handed to the action constructors that issue requests.
/// * `validator_weights` and `max_simultaneous_peers` are used when finality signatures still
///   have to be requested.
/// * `is_historical` selects between the historical path (global state, execution results,
///   mark complete) and the forward path (approvals hashes, execution).
///
/// Returns `InvalidStateTransition` when the state cannot occur for the given
/// `is_historical` value, `InvalidTransactionType` when a legacy fetch is asked for something
/// other than a deploy hash, and `MissingApprovalsHashes` when a transaction is still only
/// identified by hash after approvals hashes were registered.
pub(super) fn next_action(
    &mut self,
    peer_list: &PeerList,
    validator_weights: &EraValidatorWeights,
    rng: &mut NodeRng,
    is_historical: bool,
    max_simultaneous_peers: u8,
) -> Result<BlockAcquisitionAction, BlockAcquisitionError> {
    match self {
        BlockAcquisitionState::Initialized(block_hash, ..) => {
            Ok(BlockAcquisitionAction::block_header(peer_list, rng, *block_hash))
        }
        BlockAcquisitionState::HaveBlockHeader(header, acquired_signatures) => {
            Ok(signatures_from_missing_validators(
                validator_weights,
                acquired_signatures,
                max_simultaneous_peers,
                peer_list,
                rng,
                header.era_id(),
                header.block_hash(),
            ))
        }
        BlockAcquisitionState::HaveWeakFinalitySignatures(header, _) => {
            Ok(BlockAcquisitionAction::block_body(peer_list, rng, header.block_hash()))
        }
        BlockAcquisitionState::HaveBlock(block, acquired_signatures, acquired_transactions) => {
            // Historical sync always continues with global state.
            if is_historical {
                return Ok(BlockAcquisitionAction::global_state(
                    peer_list,
                    rng,
                    *block.hash(),
                    *block.state_root_hash(),
                ));
            }
            if acquired_transactions.needs_transaction() {
                return Ok(BlockAcquisitionAction::approvals_hashes(
                    block, peer_list, rng,
                ));
            }
            if acquired_signatures.has_sufficient_finality(is_historical, true) {
                return Ok(BlockAcquisitionAction::switch_to_have_sufficient_finality(
                    *block.hash(),
                    block.height(),
                ));
            }
            Ok(signatures_from_missing_validators(
                validator_weights,
                acquired_signatures,
                max_simultaneous_peers,
                peer_list,
                rng,
                block.era_id(),
                *block.hash(),
            ))
        }
        BlockAcquisitionState::HaveGlobalState(
            block,
            acquired_signatures,
            acquired_transactions,
            exec_results_acquisition,
        ) => {
            // This state is only valid while syncing historically.
            if !is_historical {
                return Err(BlockAcquisitionError::InvalidStateTransition);
            }
            if acquired_transactions.needs_transaction() {
                return BlockAcquisitionAction::maybe_execution_results(
                    block,
                    peer_list,
                    rng,
                    exec_results_acquisition,
                );
            }
            if acquired_signatures.has_sufficient_finality(is_historical, true) {
                return Ok(BlockAcquisitionAction::switch_to_have_sufficient_finality(
                    *block.hash(),
                    block.height(),
                ));
            }
            Ok(signatures_from_missing_validators(
                validator_weights,
                acquired_signatures,
                max_simultaneous_peers,
                peer_list,
                rng,
                block.era_id(),
                *block.hash(),
            ))
        }
        BlockAcquisitionState::HaveAllExecutionResults(
            block,
            acquired_signatures,
            acquired_transactions,
            checksum,
        ) => {
            // This state is only valid while syncing historically.
            if !is_historical {
                return Err(BlockAcquisitionError::InvalidStateTransition);
            }
            // An uncheckable checksum marks the signature acquisition as legacy.
            let is_checkable = checksum.is_checkable();
            acquired_signatures.set_is_legacy(!is_checkable);
            if is_checkable {
                return Ok(BlockAcquisitionAction::approvals_hashes(
                    block, peer_list, rng,
                ));
            }
            match acquired_transactions.next_needed_transaction() {
                Some(TransactionIdentifier::ByHash(TransactionHash::Deploy(deploy_hash))) => {
                    debug!("BlockAcquisition: requesting missing deploy by hash");
                    Ok(BlockAcquisitionAction::legacy_deploy_by_hash(
                        *block.hash(),
                        deploy_hash,
                        peer_list,
                        rng,
                    ))
                }
                // Only deploys identified by hash can be fetched on the legacy path.
                Some(_) => Err(BlockAcquisitionError::InvalidTransactionType),
                None => Ok(
                    BlockAcquisitionAction::next_action_after_deploy_acquisition(
                        *block.hash(),
                        block.height(),
                        block.era_id(),
                        peer_list,
                        rng,
                        validator_weights,
                        acquired_signatures,
                        is_historical,
                        max_simultaneous_peers,
                    ),
                ),
            }
        }
        BlockAcquisitionState::HaveApprovalsHashes(
            block,
            acquired_signatures,
            acquired_transactions,
        ) => match acquired_transactions.next_needed_transaction() {
            // Once approvals hashes are applied, a hash-only identifier is an error.
            Some(TransactionIdentifier::ByHash(txn_hash)) => {
                Err(BlockAcquisitionError::MissingApprovalsHashes(txn_hash))
            }
            Some(TransactionIdentifier::ById(txn_id)) => {
                debug!("BlockAcquisition: requesting missing transaction by ID");
                Ok(BlockAcquisitionAction::transaction_by_id(
                    *block.hash(),
                    txn_id,
                    peer_list,
                    rng,
                ))
            }
            None => Ok(
                BlockAcquisitionAction::next_action_after_deploy_acquisition(
                    *block.hash(),
                    block.height(),
                    block.era_id(),
                    peer_list,
                    rng,
                    validator_weights,
                    acquired_signatures,
                    is_historical,
                    max_simultaneous_peers,
                ),
            ),
        },
        BlockAcquisitionState::HaveAllDeploys(block, acquired_signatures) => {
            if acquired_signatures.has_sufficient_finality(is_historical, true) {
                return Ok(BlockAcquisitionAction::switch_to_have_sufficient_finality(
                    *block.hash(),
                    block.height(),
                ));
            }
            Ok(signatures_from_missing_validators(
                validator_weights,
                acquired_signatures,
                max_simultaneous_peers,
                peer_list,
                rng,
                block.era_id(),
                *block.hash(),
            ))
        }
        BlockAcquisitionState::HaveStrictFinalitySignatures(block, ..) => {
            let action = if is_historical {
                BlockAcquisitionAction::block_marked_complete(*block.hash(), block.height())
            } else {
                BlockAcquisitionAction::make_executable_block(*block.hash(), block.height())
            };
            Ok(action)
        }
        BlockAcquisitionState::HaveExecutableBlock(block, executable_block, enqueued) => {
            // Executable blocks only exist on the forward (non-historical) path.
            if is_historical {
                return Err(BlockAcquisitionError::InvalidStateTransition);
            }
            if !*enqueued {
                return Ok(BlockAcquisitionAction::enqueue_block_for_execution(
                    block.hash(),
                    executable_block.clone(),
                ));
            }
            Ok(BlockAcquisitionAction::need_nothing(*block.hash()))
        }
        BlockAcquisitionState::Complete(block) => {
            Ok(BlockAcquisitionAction::need_nothing(*block.hash()))
        }
        BlockAcquisitionState::Failed(block_hash, ..) => {
            Ok(BlockAcquisitionAction::need_nothing(*block_hash))
        }
    }
}
/// Returns the block height when the current state carries a header, a block or an
/// executable block; `Initialized` and `Failed` yield `None`.
pub(super) fn block_height(&self) -> Option<u64> {
    let height = match self {
        BlockAcquisitionState::Initialized(..) | BlockAcquisitionState::Failed(..) => {
            return None;
        }
        BlockAcquisitionState::HaveExecutableBlock(_, executable_block, _) => {
            executable_block.height
        }
        BlockAcquisitionState::HaveBlockHeader(header, _)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(header, _) => header.height(),
        BlockAcquisitionState::HaveBlock(block, _, _)
        | BlockAcquisitionState::HaveGlobalState(block, ..)
        | BlockAcquisitionState::HaveAllExecutionResults(block, _, _, _)
        | BlockAcquisitionState::HaveApprovalsHashes(block, _, _)
        | BlockAcquisitionState::HaveAllDeploys(block, ..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(block, _)
        | BlockAcquisitionState::Complete(block) => block.height(),
    };
    Some(height)
}
/// Registers the block header, moving `Initialized` to `HaveBlockHeader`.
///
/// The signature acquisition is flagged as legacy when syncing historically and the header's
/// protocol version is below `strict_finality_protocol_version`.
///
/// Returns `Ok(Some(Acceptance::NeededIt))` when the header was taken, `Ok(None)` in every
/// state other than `Initialized`, and `BlockHashMismatch` when the header does not hash to
/// the expected block hash.
pub(super) fn register_block_header(
    &mut self,
    header: BlockHeader,
    strict_finality_protocol_version: ProtocolVersion,
    is_historical: bool,
) -> Result<Option<Acceptance>, BlockAcquisitionError> {
    let (expected_hash, acquired_signatures) = match self {
        BlockAcquisitionState::Initialized(block_hash, signatures) => (*block_hash, signatures),
        BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(None),
    };

    let actual_hash = header.block_hash();
    if actual_hash != expected_hash {
        return Err(BlockAcquisitionError::BlockHashMismatch {
            expected: expected_hash,
            actual: actual_hash,
        });
    }

    info!(
        "BlockAcquisition: registering header for: {:?}, height: {}",
        expected_hash,
        header.height()
    );
    let is_legacy_block =
        is_historical && header.protocol_version() < strict_finality_protocol_version;
    acquired_signatures.set_is_legacy(is_legacy_block);

    let new_state =
        BlockAcquisitionState::HaveBlockHeader(Box::new(header), acquired_signatures.clone());
    self.set_state(new_state);
    Ok(Some(Acceptance::NeededIt))
}
/// Registers the block body, moving `HaveWeakFinalitySignatures` to `HaveBlock`.
///
/// A by-hash transaction acquisition tracker is created from the block's transaction hashes;
/// `need_execution_state` is forwarded to it.
///
/// Returns `Ok(Some(Acceptance::NeededIt))` when the block was taken, `Ok(None)` in any other
/// state, and `BlockHashMismatch` when the block's hash differs from the header's.
pub(super) fn register_block(
    &mut self,
    block: Block,
    need_execution_state: bool,
) -> Result<Option<Acceptance>, BlockAcquisitionError> {
    let (header, acquired_signatures) = match self {
        BlockAcquisitionState::HaveWeakFinalitySignatures(header, signatures) => {
            (header, signatures)
        }
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(None),
    };

    let expected_hash = header.block_hash();
    let actual_hash = *block.hash();
    if actual_hash != expected_hash {
        return Err(BlockAcquisitionError::BlockHashMismatch {
            expected: expected_hash,
            actual: actual_hash,
        });
    }
    info!(
        "BlockAcquisition: registering block for: {}",
        expected_hash
    );

    // V1 blocks list deploy and transfer hashes, which are lifted into transaction hashes.
    let transaction_hashes = match &block {
        Block::V2(v2) => v2.all_transactions().copied().collect(),
        Block::V1(v1) => v1
            .deploy_and_transfer_hashes()
            .map(|deploy_hash| TransactionHash::from(*deploy_hash))
            .collect(),
    };
    let acquired_transactions =
        TransactionAcquisition::new_by_hash(transaction_hashes, need_execution_state);

    let new_state = BlockAcquisitionState::HaveBlock(
        Box::new(block),
        acquired_signatures.clone(),
        acquired_transactions,
    );
    self.set_state(new_state);
    Ok(Some(Acceptance::NeededIt))
}
/// Moves to `HaveStrictFinalitySignatures` when the acquired signatures already suffice.
///
/// Only `HaveBlock`, `HaveGlobalState`, `HaveAllDeploys` and `HaveApprovalsHashes` can make
/// the switch; every other state is left untouched and `Ok(())` is returned.
///
/// Returns `BlockHashMismatch` when `block_hash` is not the hash of this acquisition, and
/// `InvalidStateTransition` when an eligible state lacks sufficient finality.
pub(super) fn switch_to_have_strict_finality(
    &mut self,
    block_hash: BlockHash,
    is_historical: bool,
) -> Result<(), BlockAcquisitionError> {
    let expected_hash = self.block_hash();
    if block_hash != expected_hash {
        return Err(BlockAcquisitionError::BlockHashMismatch {
            expected: expected_hash,
            actual: block_hash,
        });
    }

    let (block, acquired_signatures) = match self {
        BlockAcquisitionState::HaveBlock(block, signatures, ..)
        | BlockAcquisitionState::HaveGlobalState(block, signatures, ..)
        | BlockAcquisitionState::HaveAllDeploys(block, signatures)
        | BlockAcquisitionState::HaveApprovalsHashes(block, signatures, ..) => {
            (block, signatures)
        }
        BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Complete(..) => return Ok(()),
    };

    if !acquired_signatures.has_sufficient_finality(is_historical, true) {
        return Err(BlockAcquisitionError::InvalidStateTransition);
    }
    let new_state = BlockAcquisitionState::HaveStrictFinalitySignatures(
        block.clone(),
        acquired_signatures.clone(),
    );
    self.set_state(new_state);
    Ok(())
}
/// Records that a finality signature from `validator` is pending.
///
/// Does nothing in `Initialized`, `HaveExecutableBlock`, `Failed` and `Complete`; in every
/// other state the validator is registered as pending on the signature acquisition.
pub(super) fn register_finality_signature_pending(&mut self, validator: PublicKey) {
    let acquired_signatures = match self {
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return,
        BlockAcquisitionState::HaveBlockHeader(_, signatures)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(_, signatures)
        | BlockAcquisitionState::HaveBlock(_, signatures, ..)
        | BlockAcquisitionState::HaveGlobalState(_, signatures, ..)
        | BlockAcquisitionState::HaveAllExecutionResults(_, signatures, ..)
        | BlockAcquisitionState::HaveApprovalsHashes(_, signatures, ..)
        | BlockAcquisitionState::HaveAllDeploys(_, signatures)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(_, signatures) => signatures,
    };
    acquired_signatures.register_pending(validator);
}
/// Reports whether the current state is one in which finality signatures are being actively
/// acquired.
///
/// `HaveBlockHeader` always is. States that track transactions are only while no transaction
/// is still needed and the signature weight is not yet `Strict`; `HaveBlock` additionally
/// requires a non-historical sync and `HaveAllExecutionResults` a legacy signature
/// acquisition. All remaining states are not.
pub(super) fn actively_acquiring_signatures(&self, is_historical: bool) -> bool {
    match self {
        BlockAcquisitionState::HaveBlockHeader(..) => true,
        BlockAcquisitionState::HaveAllDeploys(_, acquired_signatures) => {
            acquired_signatures.signature_weight() != SignatureWeight::Strict
        }
        BlockAcquisitionState::HaveBlock(_, acquired_signatures, acquired_transactions) => {
            !is_historical
                && !acquired_transactions.needs_transaction()
                && acquired_signatures.signature_weight() != SignatureWeight::Strict
        }
        BlockAcquisitionState::HaveGlobalState(
            _,
            acquired_signatures,
            acquired_transactions,
            ..,
        )
        | BlockAcquisitionState::HaveApprovalsHashes(
            _,
            acquired_signatures,
            acquired_transactions,
        ) => {
            !acquired_transactions.needs_transaction()
                && acquired_signatures.signature_weight() != SignatureWeight::Strict
        }
        BlockAcquisitionState::HaveAllExecutionResults(
            _,
            acquired_signatures,
            acquired_transactions,
            ..,
        ) => {
            acquired_signatures.is_legacy()
                && !acquired_transactions.needs_transaction()
                && acquired_signatures.signature_weight() != SignatureWeight::Strict
        }
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => false,
    }
}
/// Applies a finality signature to the signature acquisition of the current state and, where
/// the state allows it, advances to the next state.
///
/// The signature is applied in every state except `Initialized`, `HaveExecutableBlock`,
/// `Failed` and `Complete`, for which `Ok(None)` is returned immediately.
///
/// The returned acceptance is `Some` only when `actively_acquiring_signatures` was true
/// *before* the signature was applied; otherwise the signature is still applied but `None`
/// is reported.
///
/// NOTE(review): the second argument of `has_sufficient_finality` (`false` for the header
/// state, `true` elsewhere) presumably selects weak vs. strict finality — confirm against
/// `SignatureAcquisition`.
pub(super) fn register_finality_signature(
&mut self,
signature: FinalitySignature,
validator_weights: &EraValidatorWeights,
is_historical: bool,
) -> Result<Option<Acceptance>, BlockAcquisitionError> {
// Kept for logging, because `signature` itself is moved into `apply_signature` below.
let cloned_sig = signature.clone();
let signer = signature.public_key().clone();
// Both are assigned exactly once in every match arm that does not return early.
let acceptance: Acceptance;
let maybe_block_hash: Option<BlockHash>;
// Evaluated before the signature is applied, so it reflects the state as it was on entry.
let currently_acquiring_sigs = self.actively_acquiring_signatures(is_historical);
let maybe_new_state: Option<BlockAcquisitionState> = match self {
BlockAcquisitionState::HaveBlockHeader(header, acquired_signatures) => {
maybe_block_hash = Some(header.block_hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
// Enough signatures for the header: move on so the block body can be requested.
if acquired_signatures.has_sufficient_finality(is_historical, false) {
Some(BlockAcquisitionState::HaveWeakFinalitySignatures(
header.clone(),
acquired_signatures.clone(),
))
} else {
None
}
}
BlockAcquisitionState::HaveBlock(block, acquired_signatures, acquired_transactions) => {
maybe_block_hash = Some(*block.hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
// Only a non-historical block with no outstanding transactions may advance from here.
if !is_historical
&& acquired_transactions.needs_transaction() == false
&& acquired_signatures.has_sufficient_finality(is_historical, true)
{
Some(BlockAcquisitionState::HaveStrictFinalitySignatures(
block.clone(),
acquired_signatures.clone(),
))
} else {
None
}
}
BlockAcquisitionState::HaveGlobalState(
block,
acquired_signatures,
acquired_deploys,
..,
) => {
maybe_block_hash = Some(*block.hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
if !acquired_deploys.needs_transaction()
&& acquired_signatures.has_sufficient_finality(is_historical, true)
{
Some(BlockAcquisitionState::HaveStrictFinalitySignatures(
block.clone(),
acquired_signatures.clone(),
))
} else {
None
}
}
// The signature is recorded, but this state never advances from here.
BlockAcquisitionState::HaveApprovalsHashes(block, acquired_signatures, ..) => {
maybe_block_hash = Some(*block.hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
None
}
BlockAcquisitionState::HaveAllExecutionResults(
block,
acquired_signatures,
acquired_deploys,
..,
) => {
maybe_block_hash = Some(*block.hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
// Only a legacy signature acquisition may advance directly from this state.
if acquired_signatures.is_legacy()
&& acquired_deploys.needs_transaction() == false
&& acquired_signatures.has_sufficient_finality(is_historical, true)
{
Some(BlockAcquisitionState::HaveStrictFinalitySignatures(
block.clone(),
acquired_signatures.clone(),
))
} else {
None
}
}
BlockAcquisitionState::HaveAllDeploys(block, acquired_signatures) => {
maybe_block_hash = Some(*block.hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
if acquired_signatures.has_sufficient_finality(is_historical, true) {
Some(BlockAcquisitionState::HaveStrictFinalitySignatures(
block.clone(),
acquired_signatures.clone(),
))
} else {
None
}
}
// The two states below record the signature without changing state.
BlockAcquisitionState::HaveStrictFinalitySignatures(block, acquired_signatures) => {
maybe_block_hash = Some(*block.hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
None
}
BlockAcquisitionState::HaveWeakFinalitySignatures(header, acquired_signatures) => {
maybe_block_hash = Some(header.block_hash());
acceptance = acquired_signatures.apply_signature(signature, validator_weights);
None
}
BlockAcquisitionState::Initialized(..)
| BlockAcquisitionState::HaveExecutableBlock(..)
| BlockAcquisitionState::Failed(..)
| BlockAcquisitionState::Complete(..) => return Ok(None),
};
// Report the acceptance only if signatures were being actively acquired on entry.
let ret = currently_acquiring_sigs.then_some(acceptance);
info!(
signature=%cloned_sig,
?ret,
"BlockAcquisition: registering finality signature for: {}",
if let Some(block_hash) = maybe_block_hash {
block_hash.to_string()
} else {
"unknown block".to_string()
}
);
self.log_finality_signature_acceptance(&maybe_block_hash, &signer, ret);
if let Some(new_state) = maybe_new_state {
self.set_state(new_state);
}
Ok(ret)
}
/// Applies approvals hashes to the transaction acquisition and moves to
/// `HaveApprovalsHashes`.
///
/// Accepted from `HaveBlock` when execution state is not needed and from
/// `HaveAllExecutionResults` when it is. Any other combination returns `Ok(None)`; success
/// returns `Ok(Some(Acceptance::NeededIt))`. Errors from applying the approvals hashes are
/// propagated.
pub(super) fn register_approvals_hashes(
    &mut self,
    approvals_hashes: &ApprovalsHashes,
    need_execution_state: bool,
) -> Result<Option<Acceptance>, BlockAcquisitionError> {
    let new_state = match self {
        BlockAcquisitionState::HaveBlock(block, acquired_signatures, acquired_transactions)
            if !need_execution_state =>
        {
            info!(
                "BlockAcquisition: registering approvals hashes for: {}",
                block.hash()
            );
            acquired_transactions.apply_approvals_hashes(approvals_hashes)?;
            BlockAcquisitionState::HaveApprovalsHashes(
                block.clone(),
                acquired_signatures.clone(),
                acquired_transactions.clone(),
            )
        }
        BlockAcquisitionState::HaveAllExecutionResults(
            block,
            acquired_signatures,
            acquired_transactions,
            _,
        ) if need_execution_state => {
            // On this path the hashes are applied first; the log line is only emitted on
            // success.
            acquired_transactions.apply_approvals_hashes(approvals_hashes)?;
            info!(
                "BlockAcquisition: registering approvals hashes for: {}",
                block.hash()
            );
            BlockAcquisitionState::HaveApprovalsHashes(
                block.clone(),
                acquired_signatures.clone(),
                acquired_transactions.clone(),
            )
        }
        BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(None),
    };
    self.set_state(new_state);
    Ok(Some(Acceptance::NeededIt))
}
/// Registers that global state under `root_hash` is available, moving `HaveBlock` to
/// `HaveGlobalState` with execution results marked as needed.
///
/// Has no effect unless execution state is needed and the current state is `HaveBlock`.
/// Returns `RootHashMismatch` when `root_hash` differs from the block's state root hash.
pub(super) fn register_global_state(
    &mut self,
    root_hash: Digest,
    need_execution_state: bool,
) -> Result<(), BlockAcquisitionError> {
    if !need_execution_state {
        return Ok(());
    }
    let (block, acquired_signatures, acquired_transactions) = match self {
        BlockAcquisitionState::HaveBlock(block, signatures, transactions) => {
            (block, signatures, transactions)
        }
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(()),
    };

    info!(
        "BlockAcquisition: registering global state for: {}",
        block.hash()
    );
    let expected_root_hash = *block.state_root_hash();
    if expected_root_hash != root_hash {
        return Err(BlockAcquisitionError::RootHashMismatch {
            expected: expected_root_hash,
            actual: root_hash,
        });
    }

    let block_hash = *block.hash();
    let new_state = BlockAcquisitionState::HaveGlobalState(
        block.clone(),
        acquired_signatures.clone(),
        acquired_transactions.clone(),
        ExecutionResultsAcquisition::Needed { block_hash },
    );
    self.set_state(new_state);
    Ok(())
}
/// Applies the execution results checksum to an execution results acquisition that is still
/// in its `Needed` stage.
///
/// Only acts in `HaveGlobalState` while execution state is needed; otherwise returns
/// `Ok(())` without changes. A failure to apply the checksum is returned as
/// `BlockAcquisitionError::ExecutionResults`.
pub(super) fn register_execution_results_checksum(
    &mut self,
    execution_results_checksum: ExecutionResultsChecksum,
    need_execution_state: bool,
) -> Result<(), BlockAcquisitionError> {
    debug!(state=%self, need_execution_state, "BlockAcquisitionState: register_execution_results_checksum");
    if !need_execution_state {
        return Ok(());
    }
    match self {
        BlockAcquisitionState::HaveGlobalState(
            block,
            _,
            _,
            acquisition @ ExecutionResultsAcquisition::Needed { .. },
        ) => {
            info!(
                "BlockAcquisition: registering execution results hash for: {}",
                block.hash()
            );
            // The acquisition is only replaced if applying the checksum succeeds.
            let with_checksum = acquisition
                .clone()
                .apply_checksum(execution_results_checksum)
                .map_err(BlockAcquisitionError::ExecutionResults)?;
            *acquisition = with_checksum;
        }
        BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => {}
    }
    Ok(())
}
pub(super) fn register_execution_results_or_chunk(
&mut self,
block_execution_results_or_chunk: BlockExecutionResultsOrChunk,
need_execution_state: bool,
) -> Result<RegisterExecResultsOutcome, BlockAcquisitionError> {
debug!(state=%self, need_execution_state,
block_execution_results_or_chunk=%block_execution_results_or_chunk,
"register_execution_results_or_chunk");
let (new_state, maybe_exec_results, acceptance) = match self {
BlockAcquisitionState::HaveGlobalState(
block,
signatures,
deploys,
exec_results_acq,
) if need_execution_state => {
info!(
"BlockAcquisition: registering execution result or chunk for: {}",
block.hash()
);
let transaction_hashes = match block.as_ref() {
Block::V1(v1) => v1
.deploy_and_transfer_hashes()
.copied()
.map(TransactionHash::from)
.collect(),
Block::V2(v2) => v2.all_transactions().copied().collect(),
};
match exec_results_acq
.clone()
.apply_block_execution_results_or_chunk(
block_execution_results_or_chunk,
transaction_hashes,
) {
Ok((new_acquisition, acceptance)) => match new_acquisition {
ExecutionResultsAcquisition::Needed { .. }
| ExecutionResultsAcquisition::Pending { .. } => {
debug!("apply_block_execution_results_or_chunk: Needed | Pending");
return Ok(RegisterExecResultsOutcome {
exec_results: None,
acceptance: Some(acceptance),
});
}
ExecutionResultsAcquisition::Complete { ref results, .. } => {
debug!("apply_block_execution_results_or_chunk: Complete");
let new_state = BlockAcquisitionState::HaveGlobalState(
block.clone(),
signatures.clone(),
deploys.clone(),
new_acquisition.clone(),
);
let maybe_exec_results = Some(results.clone());
(new_state, maybe_exec_results, acceptance)
}
ExecutionResultsAcquisition::Acquiring { .. } => {
debug!("apply_block_execution_results_or_chunk: Acquiring");
let new_state = BlockAcquisitionState::HaveGlobalState(
block.clone(),
signatures.clone(),
deploys.clone(),
new_acquisition,
);
let maybe_exec_results = None;
(new_state, maybe_exec_results, acceptance)
}
},
Err(error) => {
warn!(%error, "failed to apply execution results");
return Err(BlockAcquisitionError::ExecutionResults(error));
}
}
}
BlockAcquisitionState::HaveAllDeploys(..)
| BlockAcquisitionState::HaveGlobalState(..)
| BlockAcquisitionState::HaveBlock(..)
| BlockAcquisitionState::Initialized(..)
| BlockAcquisitionState::HaveBlockHeader(..)
| BlockAcquisitionState::HaveWeakFinalitySignatures(..)
| BlockAcquisitionState::HaveAllExecutionResults(..)
| BlockAcquisitionState::HaveStrictFinalitySignatures(..)
| BlockAcquisitionState::HaveApprovalsHashes(..)
| BlockAcquisitionState::HaveExecutableBlock(..)
| BlockAcquisitionState::Failed(..)
| BlockAcquisitionState::Complete(..) => {
return Ok(RegisterExecResultsOutcome {
exec_results: None,
acceptance: None,
});
}
};
self.set_state(new_state);
Ok(RegisterExecResultsOutcome {
exec_results: maybe_exec_results,
acceptance: Some(acceptance),
})
}
/// Registers that the execution results have been stored, moving `HaveGlobalState` with a
/// `Complete` execution results acquisition to `HaveAllExecutionResults`.
///
/// The checksum from the completed acquisition is carried over. In every other situation
/// (including when execution state is not needed) nothing changes and `Ok(())` is returned.
pub(super) fn register_execution_results_stored_notification(
    &mut self,
    need_execution_state: bool,
) -> Result<(), BlockAcquisitionError> {
    if !need_execution_state {
        return Ok(());
    }
    let new_state = match self {
        BlockAcquisitionState::HaveGlobalState(
            block,
            acquired_signatures,
            acquired_transactions,
            ExecutionResultsAcquisition::Complete { checksum, .. },
        ) => {
            info!(
                "BlockAcquisition: registering execution results stored notification for: {}",
                block.hash()
            );
            BlockAcquisitionState::HaveAllExecutionResults(
                block.clone(),
                acquired_signatures.clone(),
                acquired_transactions.clone(),
                *checksum,
            )
        }
        BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(()),
    };
    self.set_state(new_state);
    Ok(())
}
/// Registers a fetched transaction and moves to `HaveAllDeploys` once none is needed any
/// more.
///
/// Accepted from `HaveBlock` (non-historical only), from `HaveApprovalsHashes`, and from
/// `HaveAllExecutionResults` (historical only, and only with an uncheckable checksum). A
/// checkable checksum in that last state yields `InvalidAttemptToApplyTransaction`. In all
/// other cases the attempt is logged at debug level and `Ok(None)` is returned.
///
/// On acceptance the value returned by the transaction acquisition's `apply_transaction` is
/// passed through.
pub(super) fn register_deploy(
    &mut self,
    txn_id: TransactionId,
    is_historical: bool,
) -> Result<Option<Acceptance>, BlockAcquisitionError> {
    let (block, acquired_signatures, acquired_transactions) = match self {
        BlockAcquisitionState::HaveBlock(block, signatures, transactions) if !is_historical => {
            (block, signatures, transactions)
        }
        BlockAcquisitionState::HaveApprovalsHashes(block, signatures, transactions) => {
            (block, signatures, transactions)
        }
        BlockAcquisitionState::HaveAllExecutionResults(
            block,
            signatures,
            transactions,
            checksum,
        ) if is_historical => match checksum {
            ExecutionResultsChecksum::Checkable(_) => {
                return Err(BlockAcquisitionError::InvalidAttemptToApplyTransaction { txn_id });
            }
            ExecutionResultsChecksum::Uncheckable => (block, signatures, transactions),
        },
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => {
            debug!(
                ?txn_id,
                "BlockAcquisition: invalid attempt to register deploy for: {}",
                self.block_hash()
            );
            return Ok(None);
        }
    };

    info!("BlockAcquisition: registering deploy for: {}", block.hash());
    let maybe_acceptance = acquired_transactions.apply_transaction(txn_id);
    if acquired_transactions.needs_transaction() {
        // Still waiting for further transactions; stay in the current state.
        return Ok(maybe_acceptance);
    }
    let new_state =
        BlockAcquisitionState::HaveAllDeploys(block.clone(), acquired_signatures.clone());
    self.set_state(new_state);
    Ok(maybe_acceptance)
}
/// Stores the executable block, moving `HaveStrictFinalitySignatures` to
/// `HaveExecutableBlock` with the enqueued flag cleared.
///
/// Returns `InvalidAttemptToEnqueueBlockForExecution` when execution state is needed; in any
/// state other than `HaveStrictFinalitySignatures` nothing changes and `Ok(())` is returned.
pub(super) fn register_made_finalized_block(
    &mut self,
    need_execution_state: bool,
    executable_block: ExecutableBlock,
) -> Result<(), BlockAcquisitionError> {
    if need_execution_state {
        return Err(BlockAcquisitionError::InvalidAttemptToEnqueueBlockForExecution);
    }
    let block = match self {
        BlockAcquisitionState::HaveStrictFinalitySignatures(block, _) => block.clone(),
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveExecutableBlock(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(()),
    };
    let new_state =
        BlockAcquisitionState::HaveExecutableBlock(block, Box::new(executable_block), false);
    self.set_state(new_state);
    Ok(())
}
/// Marks the executable block held by this state as having been enqueued for execution.
///
/// Only `HaveExecutableBlock` is affected: an info-level message is logged and the state's
/// trailing flag is set to `true`.  Every other state is left untouched.
///
/// As written, this method always returns `Ok(())`.
pub(super) fn register_block_execution_enqueued(
    &mut self,
) -> Result<(), BlockAcquisitionError> {
    match self {
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => Ok(()),
        BlockAcquisitionState::HaveExecutableBlock(block, _, is_enqueued) => {
            info!(
                "BlockAcquisition: registering block enqueued for execution for: {}",
                block
            );
            *is_enqueued = true;
            Ok(())
        }
    }
}
/// Records that the block being acquired has been executed.
///
/// When the current state is `HaveExecutableBlock`, an info-level message is logged and the
/// state transitions to `Complete`, carrying a clone of the block.  In every other state this
/// is a no-op.
///
/// # Errors
///
/// Returns `BlockAcquisitionError::InvalidAttemptToEnqueueBlockForExecution` when
/// `need_execution_state` is `true`; the state is not inspected in that case.
pub(super) fn register_block_executed(
    &mut self,
    need_execution_state: bool,
) -> Result<(), BlockAcquisitionError> {
    if need_execution_state {
        return Err(BlockAcquisitionError::InvalidAttemptToEnqueueBlockForExecution);
    }

    let executed_block = match self {
        BlockAcquisitionState::HaveExecutableBlock(block, ..) => {
            info!(
                "BlockAcquisition: registering block executed for: {}",
                *block.hash()
            );
            block.clone()
        }
        BlockAcquisitionState::Initialized(..)
        | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
        | BlockAcquisitionState::HaveBlockHeader(..)
        | BlockAcquisitionState::HaveBlock(..)
        | BlockAcquisitionState::HaveGlobalState(..)
        | BlockAcquisitionState::HaveAllExecutionResults(..)
        | BlockAcquisitionState::HaveApprovalsHashes(..)
        | BlockAcquisitionState::HaveAllDeploys(..)
        | BlockAcquisitionState::HaveStrictFinalitySignatures(..)
        | BlockAcquisitionState::Failed(..)
        | BlockAcquisitionState::Complete(..) => return Ok(()),
    };

    self.set_state(BlockAcquisitionState::Complete(executed_block));
    Ok(())
}
/// Records that the block being acquired has been marked complete.
///
/// When the current state is `HaveStrictFinalitySignatures`, an info-level message is logged
/// and the state transitions to `Complete`, carrying a clone of the block.  In every other
/// state this is a no-op.
///
/// # Errors
///
/// Returns `BlockAcquisitionError::InvalidAttemptToMarkComplete` when `need_execution_state`
/// is `false`; the state is not inspected in that case.
pub(super) fn register_marked_complete(
    &mut self,
    need_execution_state: bool,
) -> Result<(), BlockAcquisitionError> {
    if need_execution_state {
        let maybe_completed_block = match self {
            BlockAcquisitionState::HaveStrictFinalitySignatures(block, _) => {
                info!(
                    "BlockAcquisition: registering marked complete for: {}",
                    *block.hash()
                );
                Some(block.clone())
            }
            BlockAcquisitionState::Initialized(..)
            | BlockAcquisitionState::HaveWeakFinalitySignatures(..)
            | BlockAcquisitionState::HaveBlockHeader(..)
            | BlockAcquisitionState::HaveBlock(..)
            | BlockAcquisitionState::HaveGlobalState(..)
            | BlockAcquisitionState::HaveAllExecutionResults(..)
            | BlockAcquisitionState::HaveApprovalsHashes(..)
            | BlockAcquisitionState::HaveAllDeploys(..)
            | BlockAcquisitionState::HaveExecutableBlock(..)
            | BlockAcquisitionState::Failed(..)
            | BlockAcquisitionState::Complete(..) => None,
        };

        if let Some(completed_block) = maybe_completed_block {
            self.set_state(BlockAcquisitionState::Complete(completed_block));
        }
        Ok(())
    } else {
        Err(BlockAcquisitionError::InvalidAttemptToMarkComplete)
    }
}
/// Logs the outcome of handling a finality signature received from `signer`.
///
/// The log level depends on the inputs:
/// * no block hash available: `error`;
/// * `Some(Acceptance::HadIt)`: `trace`;
/// * `Some(Acceptance::NeededIt)`: `debug`;
/// * no acceptance: `debug`, noting that finality signatures were not actively being acquired.
///
/// This method only emits log records; it does not read or modify the acquisition state.
fn log_finality_signature_acceptance(
    &self,
    maybe_block_hash: &Option<BlockHash>,
    signer: &PublicKey,
    acceptance: Option<Acceptance>,
) {
    match (maybe_block_hash, acceptance) {
        // Without a block hash the acceptance is irrelevant.
        (None, _) => {
            error!(
                "BlockAcquisition: unknown block_hash for finality signature from {}",
                signer
            );
        }
        (Some(block_hash), Some(Acceptance::HadIt)) => {
            trace!(
                "BlockAcquisition: existing finality signature for {:?} from {}",
                block_hash,
                signer
            );
        }
        (Some(block_hash), Some(Acceptance::NeededIt)) => {
            debug!(
                "BlockAcquisition: new finality signature for {:?} from {}",
                block_hash, signer
            );
        }
        (Some(block_hash), None) => {
            debug!(
                "BlockAcquisition: finality signature for {:?} from {} while not actively \
                trying to acquire finality signatures",
                block_hash, signer
            );
        }
    }
}
/// Replaces the current state with `new_state` and emits a debug-level record naming both the
/// new state and the state it replaced.
fn set_state(&mut self, new_state: BlockAcquisitionState) {
    // Swap first so the outgoing state can be named in the log; `self` now displays as the
    // new state.
    let previous_state = std::mem::replace(self, new_state);
    debug!(
        "BlockAcquisition: {} (transitioned from: {})",
        self, previous_state
    );
}
}
/// Builds the action that requests finality signatures for `block_hash` in `era_id` from
/// validators whose signatures are still missing.
///
/// The candidate set starts with the validators that `validator_weights` reports as missing
/// relative to `signatures.not_vacant()`.  When that set holds fewer than
/// `max_simultaneous_peers` entries, it is extended with the validators reported as missing
/// relative to `signatures.not_pending()`.
///
/// Candidates are gathered in a `HashSet`, so duplicates collapse and the order in which they
/// are handed to `BlockAcquisitionAction::finality_signatures` is unspecified.
///
/// # Returns
///
/// The `BlockAcquisitionAction` produced by `BlockAcquisitionAction::finality_signatures` for
/// the gathered candidates.
pub(super) fn signatures_from_missing_validators(
    validator_weights: &EraValidatorWeights,
    signatures: &mut SignatureAcquisition,
    max_simultaneous_peers: u8,
    peer_list: &PeerList,
    rng: &mut NodeRng,
    era_id: EraId,
    block_hash: BlockHash,
) -> BlockAcquisitionAction {
    let mut missing_signatures_in_random_order: HashSet<PublicKey> = validator_weights
        .missing_validators(signatures.not_vacant())
        .cloned()
        .collect();

    // Compare in `usize`: narrowing the length with `as u8` wraps modulo 256, which would make
    // a large candidate set (e.g. exactly 256 validators) look smaller than the peer limit.
    if missing_signatures_in_random_order.len() < usize::from(max_simultaneous_peers) {
        missing_signatures_in_random_order.extend(
            validator_weights
                .missing_validators(signatures.not_pending())
                .cloned(),
        );
    }

    BlockAcquisitionAction::finality_signatures(
        peer_list,
        rng,
        era_id,
        block_hash,
        missing_signatures_in_random_order.into_iter().collect(),
    )
}