use indexmap::IndexMap;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
watch,
};
use zebra_chain::{
block::{self, Height},
transparent::EXTRA_ZEBRA_COINBASE_DATA,
};
use crate::{
constants::MAX_BLOCK_REORG_HEIGHT,
service::{
check,
finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::NonFinalizedState,
queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
BoxError, ChainTipBlock, ChainTipSender, CloneError,
},
CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
};
#[allow(unused_imports)]
use crate::service::{
chain_tip::{ChainTipChange, LatestChainTip},
non_finalized_state::Chain,
};
const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
#[tracing::instrument(
level = "debug",
skip(finalized_state, non_finalized_state, prepared),
fields(
height = ?prepared.height,
hash = %prepared.hash,
chains = non_finalized_state.chain_count()
)
)]
pub(crate) fn validate_and_commit_non_finalized(
finalized_state: &ZebraDb,
non_finalized_state: &mut NonFinalizedState,
prepared: SemanticallyVerifiedBlock,
) -> Result<(), CommitSemanticallyVerifiedError> {
check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
let parent_hash = prepared.block.header.previous_block_hash;
if finalized_state.finalized_tip_hash() == parent_hash {
non_finalized_state.commit_new_chain(prepared, finalized_state)?;
} else {
non_finalized_state.commit_block(prepared, finalized_state)?;
}
Ok(())
}
#[instrument(
level = "debug",
skip(
non_finalized_state,
chain_tip_sender,
non_finalized_state_sender,
last_zebra_mined_log_height
),
fields(chains = non_finalized_state.chain_count())
)]
fn update_latest_chain_channels(
non_finalized_state: &NonFinalizedState,
chain_tip_sender: &mut ChainTipSender,
non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
last_zebra_mined_log_height: &mut Option<Height>,
) -> block::Height {
let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
let tip_block = best_chain
.tip_block()
.expect("unexpected empty chain: must commit at least one block before updating channels")
.clone();
let tip_block = ChainTipBlock::from(tip_block);
log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
let tip_block_height = tip_block.height;
let _ = non_finalized_state_sender.send(non_finalized_state.clone());
chain_tip_sender.set_best_non_finalized_tip(tip_block);
tip_block_height
}
#[allow(clippy::too_many_arguments)]
#[instrument(
level = "debug",
skip(
finalized_block_write_receiver,
non_finalized_block_write_receiver,
finalized_state,
non_finalized_state,
invalid_block_reset_sender,
chain_tip_sender,
non_finalized_state_sender,
),
fields(
network = %non_finalized_state.network
)
)]
pub fn write_blocks_from_channels(
mut finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
mut non_finalized_block_write_receiver: UnboundedReceiver<QueuedSemanticallyVerified>,
mut finalized_state: FinalizedState,
mut non_finalized_state: NonFinalizedState,
invalid_block_reset_sender: UnboundedSender<block::Hash>,
mut chain_tip_sender: ChainTipSender,
non_finalized_state_sender: watch::Sender<NonFinalizedState>,
) {
let mut last_zebra_mined_log_height = None;
let mut prev_finalized_note_commitment_trees = None;
while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
if invalid_block_reset_sender.is_closed() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
let next_valid_height = finalized_state
.db
.finalized_tip_height()
.map(|height| (height + 1).expect("committed heights are valid"))
.unwrap_or(Height(0));
if ordered_block.0.height != next_valid_height {
debug!(
?next_valid_height,
invalid_height = ?ordered_block.0.height,
invalid_hash = ?ordered_block.0.hash,
"got a block that was the wrong height. \
Assuming a parent block failed, and dropping this block",
);
std::mem::drop(ordered_block);
continue;
}
match finalized_state
.commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
{
Ok((finalized, note_commitment_trees)) => {
let tip_block = ChainTipBlock::from(finalized);
prev_finalized_note_commitment_trees = Some(note_commitment_trees);
log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
chain_tip_sender.set_finalized_tip(tip_block);
}
Err(error) => {
let finalized_tip = finalized_state.db.tip();
info!(
?error,
last_valid_height = ?finalized_tip.map(|tip| tip.0),
last_valid_hash = ?finalized_tip.map(|tip| tip.1),
"committing a block to the finalized state failed, resetting state queue",
);
let send_result =
invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
if send_result.is_err() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
}
}
}
if invalid_block_reset_sender.is_closed() {
info!("StateService closed the block reset channel. Is Zebra shutting down?");
return;
}
let mut parent_error_map: IndexMap<block::Hash, CloneError> = IndexMap::new();
while let Some((queued_child, rsp_tx)) = non_finalized_block_write_receiver.blocking_recv() {
let child_hash = queued_child.hash;
let parent_hash = queued_child.block.header.previous_block_hash;
let parent_error = parent_error_map.get(&parent_hash);
let result;
if let Some(parent_error) = parent_error {
tracing::trace!(
?child_hash,
?parent_error,
"rejecting queued child due to parent error"
);
result = Err(parent_error.clone());
} else {
tracing::trace!(?child_hash, "validating queued child");
result = validate_and_commit_non_finalized(
&finalized_state.db,
&mut non_finalized_state,
queued_child,
)
.map_err(CloneError::from);
}
if let Err(ref error) = result {
let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
parent_error_map.insert(child_hash, error.clone());
if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
parent_error_map.shift_remove_index(0);
}
continue;
}
let tip_block_height = update_latest_chain_channels(
&non_finalized_state,
&mut chain_tip_sender,
&non_finalized_state_sender,
&mut last_zebra_mined_log_height,
);
let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
while non_finalized_state
.best_chain_len()
.expect("just successfully inserted a non-finalized block above")
> MAX_BLOCK_REORG_HEIGHT
{
tracing::trace!("finalizing block past the reorg limit");
let contextually_verified_with_trees = non_finalized_state.finalize();
prev_finalized_note_commitment_trees = finalized_state
.commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
.expect(
"unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
).1.into();
}
metrics::counter!("state.full_verifier.committed.block.count").increment(1);
metrics::counter!("zcash.chain.verified.block.total").increment(1);
metrics::gauge!("state.full_verifier.committed.block.height")
.set(tip_block_height.0 as f64);
metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
tracing::trace!("finished processing queued block");
}
finalized_state.db.shutdown(true);
std::mem::drop(finalized_state);
}
fn log_if_mined_by_zebra(
tip_block: &ChainTipBlock,
last_zebra_mined_log_height: &mut Option<Height>,
) {
const LOG_RATE_LIMIT: u32 = 1000;
let height = tip_block.height.0;
if let Some(last_height) = last_zebra_mined_log_height {
if height < last_height.0 + LOG_RATE_LIMIT {
return;
}
};
let coinbase_data = tip_block.transactions[0].inputs()[0]
.extra_coinbase_data()
.expect("valid blocks must start with a coinbase input")
.clone();
if coinbase_data
.as_ref()
.starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
{
let text = String::from_utf8_lossy(coinbase_data.as_ref());
*last_zebra_mined_log_height = Some(Height(height));
if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
info!(
%text,
%height,
hash = %tip_block.hash,
"looks like this block was mined by Zebra!"
);
} else {
let text = text.replace(
|c: char| {
!EXTRA_ZEBRA_COINBASE_DATA
.to_ascii_lowercase()
.contains(c.to_ascii_lowercase())
},
"?",
);
let data = hex::encode(coinbase_data.as_ref());
info!(
%text,
%data,
%height,
hash = %tip_block.hash,
"looks like this block was mined by Zebra!"
);
}
}
}