use super::{finalised_state::ZainoDB, source::BlockchainSource, NON_FINALIZED_DEPTH};
use crate::{
chain_index::types::{
self, BlockHash, BlockIndex, BlockMetadata, BlockWithMetadata, Height, TreeRootData,
},
error::FinalisedStateError,
ChainWork, IndexedBlock,
};
use arc_swap::ArcSwap;
use futures::lock::Mutex;
use primitive_types::U256;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;
use tracing::{info, instrument, warn};
use zebra_chain::{parameters::Network, serialization::BytesInDisplayOrder};
use zebra_state::HashOrHeight;
/// Cache of the not-yet-finalized part of the chain, backed by a
/// [`BlockchainSource`].
///
/// The cache contents are published as an immutable
/// `NonfinalizedBlockCacheSnapshot` that is replaced wholesale through
/// `ArcSwap`; readers obtain it with `get_snapshot`.
#[derive(Debug)]
pub struct NonFinalizedState<Source: BlockchainSource> {
/// Source used to fetch blocks and commitment tree roots.
pub(super) source: Source,
/// The currently published snapshot.
current: ArcSwap<NonfinalizedBlockCacheSnapshot>,
/// Network handed to `BlockMetadata::new` whenever a block is indexed.
pub(crate) network: Network,
/// Receiver of `(hash, block)` pairs obtained from
/// `BlockchainSource::nonfinalized_listener`, if the source provided one.
/// It is drained at the end of every `sync` call.
// The receiver type is spelled out in full, which trips the complexity lint.
#[allow(clippy::type_complexity)]
nfs_change_listener: Option<
Mutex<
tokio::sync::mpsc::Receiver<(zebra_chain::block::Hash, Arc<zebra_chain::block::Block>)>,
>,
>,
}
/// A view of the chain index handed to readers.
///
/// NOTE(review): the precise conditions under which each variant is produced
/// are decided outside this file; the descriptions below follow the variant
/// and field names only.
#[derive(Debug, Clone)]
pub enum ChainIndexSnapshot {
/// A non-finalized snapshot is available.
NonFinalizedStateExists {
/// The shared non-finalized snapshot.
// The snapshot type is `pub(crate)` while this enum is `pub`, hence the
// lint allowance.
#[allow(private_interfaces)]
non_finalized_snapshot: Arc<NonfinalizedBlockCacheSnapshot>,
},
/// No non-finalized snapshot is available (presumably because the
/// finalized state is still syncing -- confirm against the producer).
StillSyncingFinalizedState {
/// Finalized height reported for the validator.
validator_finalized_height: Height,
},
}
impl ChainIndexSnapshot {
pub(crate) fn get_nfs_snapshot(&self) -> Option<&NonfinalizedBlockCacheSnapshot> {
match self {
ChainIndexSnapshot::NonFinalizedStateExists {
non_finalized_snapshot,
} => Some(non_finalized_snapshot),
ChainIndexSnapshot::StillSyncingFinalizedState { .. } => None,
}
}
}
/// An immutable-once-published picture of the cached non-finalized blocks.
#[derive(Debug, Clone)]
pub(crate) struct NonfinalizedBlockCacheSnapshot {
/// Every cached block, keyed by its own hash.
pub blocks: HashMap<BlockHash, IndexedBlock>,
/// Height-to-hash index. Blocks stored through `add_block` are recorded
/// here; blocks stored by `add_nonbest_block` are placed in `blocks` only.
pub heights_to_hashes: HashMap<Height, BlockHash>,
/// Height and hash of the block currently considered the chain tip.
pub best_tip: BlockIndex,
}
/// Failure while talking to the backing node.
#[derive(Debug)]
pub enum NodeConnectionError {
/// Carries a string describing a bad URI.
/// NOTE(review): not constructed anywhere in this file -- confirm usage.
BadUri(String),
/// An HTTP-level failure reported by `reqwest`.
/// NOTE(review): not constructed anywhere in this file -- confirm usage.
ConnectionFailure(reqwest::Error),
/// Any other error; within this file it wraps source errors, missing
/// blocks, invalid block data and a disconnected change listener.
UnrecoverableError(Box<dyn std::error::Error + Send>),
}
/// Error carrying a free-form description of a block that could not be found.
#[derive(Debug)]
struct MissingBlockError(String);

impl std::fmt::Display for MissingBlockError {
    /// Renders as `missing block: <description>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(description) = self;
        f.write_str("missing block: ")?;
        f.write_str(description)
    }
}

impl std::error::Error for MissingBlockError {}
#[derive(Debug, thiserror::Error)]
pub enum SyncError {
#[error("failed to connect to validator: {0:?}")]
ValidatorConnectionError(NodeConnectionError),
#[error("transient source error: {0}")]
ErrorFromSource(Box<dyn std::error::Error + Send>),
#[error("staging channel closed. Shutdown in progress")]
StagingChannelClosed,
#[error("multiple sync processes running")]
CompetingSyncProcess,
#[error("reorg failed: {0}")]
ReorgFailure(String),
#[error("error reading nonfinalized state")]
CannotReadFinalizedState(#[from] FinalisedStateError),
}
/// Lifts a snapshot-update failure into the error type returned by `sync`.
impl From<UpdateError> for SyncError {
    fn from(value: UpdateError) -> Self {
        match value {
            UpdateError::StaleSnapshot => Self::CompetingSyncProcess,
            UpdateError::ReceiverDisconnected => Self::StagingChannelClosed,
            UpdateError::DatabaseHole => {
                Self::ReorgFailure("could not determine best chain".to_owned())
            }
            UpdateError::FinalizedStateCorruption => {
                let cause = FinalisedStateError::Custom(String::from("mystery update failure"));
                Self::CannotReadFinalizedState(cause)
            }
            UpdateError::ValidatorConnectionError(e) => {
                // The boxed error is flattened to its display text and
                // re-wrapped in `MissingBlockError`.
                let wrapped = MissingBlockError(e.to_string());
                Self::ValidatorConnectionError(NodeConnectionError::UnrecoverableError(
                    Box::new(wrapped),
                ))
            }
        }
    }
}
/// Returned (boxed inside `InitError::InvalidNodeData`) when the source has no
/// block at height 0.
#[derive(thiserror::Error, Debug)]
#[error("Genesis block missing in validator")]
struct MissingGenesisBlock;
/// Wraps a textual description of data from the validator that could not be
/// used, so it can be boxed as a `std::error::Error`.
#[derive(thiserror::Error, Debug)]
#[error("data from validator invalid: {0}")]
struct InvalidData(String);
/// Errors that can occur while initializing the chain index components.
#[derive(Debug, thiserror::Error)]
pub enum InitError {
/// The node returned data that could not be used; in this file it is raised
/// while building the genesis block (fetch failure, missing block, invalid
/// work field, or a failed conversion to `IndexedBlock`).
#[error("zebra returned invalid data: {0}")]
InvalidNodeData(Box<dyn std::error::Error + Send + Sync + 'static>),
/// A mempool error, converted via `From<crate::error::MempoolError>`.
#[error(transparent)]
MempoolInitialzationError(#[from] crate::error::MempoolError),
/// A finalized-state error, converted via `From<FinalisedStateError>`.
#[error(transparent)]
FinalisedStateInitialzationError(#[from] FinalisedStateError),
/// NOTE(review): not constructed in this file; per its message it reports
/// an initial block that is not on the best chain -- confirm with callers.
#[error("initial block not on best chain")]
InitalBlockMissingHeight,
}
impl BlockIndex {
    /// Builds the `(height, hash)` index entry describing `block`.
    fn from_block(block: &IndexedBlock) -> Self {
        Self {
            height: block.height(),
            hash: *block.hash(),
        }
    }
}
impl NonfinalizedBlockCacheSnapshot {
    /// Creates a snapshot containing only `block`, which is also its best tip.
    fn from_initial_block(block: IndexedBlock) -> Self {
        let best_tip = BlockIndex::from_block(&block);
        let heights_to_hashes = HashMap::from([(best_tip.height, best_tip.hash)]);
        let blocks = HashMap::from([(best_tip.hash, block)]);
        Self {
            blocks,
            heights_to_hashes,
            best_tip,
        }
    }

    /// Stores `block` and marks it as the new best tip.
    fn add_block_new_chaintip(&mut self, block: IndexedBlock) {
        self.best_tip = BlockIndex::from_block(&block);
        self.add_block(block);
    }

    /// Finds a cached block whose hash bytes (serialized order) equal `hash`.
    ///
    /// This walks every cached block until one matches.
    fn get_block_by_hash_bytes_in_serialized_order(&self, hash: [u8; 32]) -> Option<&IndexedBlock> {
        for candidate in self.blocks.values() {
            if candidate.hash_bytes_serialized_order() == hash {
                return Some(candidate);
            }
        }
        None
    }

    /// Drops every entry strictly below `finalized_height`; the block at
    /// `finalized_height` itself is kept.
    fn remove_finalized_blocks(&mut self, finalized_height: Height) {
        self.blocks
            .retain(|_, cached| cached.height() >= finalized_height);
        self.heights_to_hashes
            .retain(|height, _| *height >= finalized_height);
    }

    /// Records `block` in both the height index and the block map, replacing
    /// any previous entry at the same height or hash.
    fn add_block(&mut self, block: IndexedBlock) {
        let (height, hash) = (block.height(), *block.hash());
        self.heights_to_hashes.insert(height, hash);
        self.blocks.insert(hash, block);
    }
}
impl<Source: BlockchainSource> NonFinalizedState<Source> {
/// Creates the non-finalized state for `network`.
///
/// The initial snapshot is seeded with `start_block` when supplied, otherwise
/// with the genesis block fetched from `source`. A change listener is
/// requested from the source; its absence is not an error.
///
/// # Errors
/// Returns [`InitError`] when the genesis block cannot be fetched or indexed.
#[instrument(name = "NonFinalizedState::initialize", skip(source, start_block), fields(network = %network))]
pub async fn initialize(
    source: Source,
    network: Network,
    start_block: Option<IndexedBlock>,
) -> Result<Self, InitError> {
    info!(network = %network, "Initializing non-finalized state");

    let seed_block = Self::resolve_initial_block(&source, &network, start_block).await?;
    let current = ArcSwap::new(Arc::new(
        NonfinalizedBlockCacheSnapshot::from_initial_block(seed_block),
    ));
    let nfs_change_listener = Self::setup_listener(&source).await;

    Ok(Self {
        source,
        current,
        network,
        nfs_change_listener,
    })
}
/// Fetches the block at height 0 from `source` and converts it into an
/// [`IndexedBlock`].
///
/// # Errors
/// Every failure is reported as [`InitError::InvalidNodeData`]: a source
/// error, a missing genesis block, an invalid difficulty threshold, or a
/// failed conversion to `IndexedBlock`.
async fn get_genesis_indexed_block(
    source: &Source,
    network: &Network,
) -> Result<IndexedBlock, InitError> {
    let genesis_height = HashOrHeight::Height(zebra_chain::block::Height(0));
    let Some(genesis_block) = source
        .get_block(genesis_height)
        .await
        .map_err(|e| InitError::InvalidNodeData(Box::new(e)))?
    else {
        return Err(InitError::InvalidNodeData(Box::new(MissingGenesisBlock)));
    };

    let (sapling, orchard) = source
        .get_commitment_tree_roots(genesis_block.hash().into())
        .await
        .map_err(|e| InitError::InvalidNodeData(Box::new(e)))?;
    let tree_roots = TreeRootData { sapling, orchard };

    let Some(work) = genesis_block.header.difficulty_threshold.to_work() else {
        return Err(InitError::InvalidNodeData(Box::new(InvalidData(
            "Invalid work field of genesis block".to_string(),
        ))));
    };
    let genesis_work = ChainWork::from(U256::from(work.as_u128()));

    Self::create_indexed_block_with_optional_roots(
        genesis_block.as_ref(),
        &tree_roots,
        genesis_work,
        network.clone(),
    )
    .map_err(|e| InitError::InvalidNodeData(Box::new(InvalidData(e))))
}
/// Picks the block that seeds the first snapshot: the caller-supplied
/// `start_block` if there is one, otherwise the genesis block from `source`.
async fn resolve_initial_block(
    source: &Source,
    network: &Network,
    start_block: Option<IndexedBlock>,
) -> Result<IndexedBlock, InitError> {
    if let Some(block) = start_block {
        return Ok(block);
    }
    Self::get_genesis_indexed_block(source, network).await
}
/// Asks `source` for its non-finalized change listener and wraps it in a
/// mutex.
///
/// Returns `None` both when the source offers no listener and when the
/// request fails; the error is discarded.
async fn setup_listener(
    source: &Source,
) -> Option<
    Mutex<
        tokio::sync::mpsc::Receiver<(zebra_chain::block::Hash, Arc<zebra_chain::block::Block>)>,
    >,
> {
    match source.nonfinalized_listener().await {
        Ok(Some(receiver)) => Some(Mutex::new(receiver)),
        Ok(None) | Err(_) => None,
    }
}
/// Brings the published snapshot up to `chain_height` using blocks from the
/// source.
///
/// Steps:
/// 1. If the finalized database tip is above the snapshot's best tip, the
///    snapshot is replaced by one seeded with the finalized tip block.
/// 2. Blocks are fetched one height at a time. A block extending the current
///    tip is appended; any other block is handed to `handle_reorg`.
/// 3. Whenever the working copy is more than `NON_FINALIZED_DEPTH` ahead of
///    the snapshot it started from, it is published via `update` and work
///    continues from the freshly published snapshot.
/// 4. Pending entries of the change listener are applied and the result is
///    published.
///
/// # Errors
/// Returns [`SyncError`] when the finalized database cannot be read, the
/// source fails, a reorg cannot be resolved, or publishing the snapshot fails.
#[instrument(name = "NonFinalizedState::sync", skip(self, finalized_db))]
pub(super) async fn sync(
    &self,
    finalized_db: Arc<ZainoDB>,
    chain_height: Height,
) -> Result<(), SyncError> {
    let mut initial_state = self.get_snapshot();
    let local_finalized_tip = finalized_db.to_reader().db_height().await?;

    // Binding the tip through a pattern keeps the `Some` proof local instead
    // of re-asserting it with `expect`/`unwrap` further down.
    if let Some(finalized_tip) = local_finalized_tip {
        if initial_state.best_tip.height < finalized_tip {
            let finalized_tip_block = finalized_db
                .to_reader()
                .get_chain_block_by_height(finalized_tip)
                .await?
                .ok_or_else(|| {
                    FinalisedStateError::DataUnavailable(format!(
                        "Missing block {}",
                        finalized_tip.0
                    ))
                })?;
            self.current.swap(Arc::new(
                NonfinalizedBlockCacheSnapshot::from_initial_block(finalized_tip_block),
            ));
            initial_state = self.get_snapshot();
        }
    }

    let mut working_snapshot = initial_state.as_ref().clone();
    loop {
        let next_height = u32::from(working_snapshot.best_tip.height) + 1;
        // Check the bound before fetching: a block above `chain_height` would
        // be discarded anyway, and a failure of that needless request must
        // not abort the sync.
        if next_height > u32::from(chain_height) {
            break;
        }
        let Some(block) = self
            .source
            .get_block(HashOrHeight::Height(zebra_chain::block::Height(
                next_height,
            )))
            .await
            .map_err(|e| {
                SyncError::ValidatorConnectionError(NodeConnectionError::UnrecoverableError(
                    Box::new(e),
                ))
            })?
        else {
            // The source has no block at this height yet.
            break;
        };

        let parent_hash = BlockHash::from(block.header.previous_block_hash);
        if parent_hash == working_snapshot.best_tip.hash {
            // Normal case: the block extends the current tip.
            let prev_block = working_snapshot
                .blocks
                .get(&working_snapshot.best_tip.hash)
                .ok_or_else(|| {
                    SyncError::ReorgFailure(format!(
                        "found blocks {:?}, expected block {:?}",
                        working_snapshot
                            .blocks
                            .values()
                            .map(|block| (block.context.index.hash, block.context.index.height))
                            .collect::<Vec<_>>(),
                        working_snapshot.best_tip
                    ))
                })?;
            let chainblock = self.block_to_chainblock(prev_block, &block).await?;
            info!(
                height = (working_snapshot.best_tip.height + 1).0,
                hash = %chainblock.context.index.hash,
                "Syncing block"
            );
            working_snapshot.add_block_new_chaintip(chainblock);
        } else {
            // The block does not build on our tip.
            self.handle_reorg(&mut working_snapshot, block.as_ref())
                .await?;
        }

        // Publish intermediate progress so a long catch-up does not keep all
        // of its work in an unpublished copy.
        if initial_state.best_tip.height + NON_FINALIZED_DEPTH
            < working_snapshot.best_tip.height
        {
            self.update(finalized_db.clone(), initial_state, working_snapshot)
                .await?;
            initial_state = self.current.load_full();
            working_snapshot = initial_state.as_ref().clone();
        }
    }

    self.handle_nfs_change_listener(&mut working_snapshot)
        .await?;
    self.update(finalized_db.clone(), initial_state, working_snapshot)
        .await?;
    Ok(())
}
/// Makes `block` the tip of `working_snapshot`, first making sure its parent
/// is itself recorded in the snapshot's height index.
///
/// The parent is looked up in the snapshot by `block`'s previous-block hash:
/// * found and its hash appears in `heights_to_hashes`: it is used as is;
/// * found but absent from `heights_to_hashes`: this function recurses on the
///   parent first;
/// * not in the snapshot: the parent is fetched from the source by hash and
///   this function recurses on it.
///
/// Afterwards `block` is converted with `Block::to_indexed_block` and stored
/// via `add_block_new_chaintip`, which also sets `best_tip` to it. The
/// indexed block is returned.
///
/// # Errors
/// Returns `SyncError::ValidatorConnectionError` when the source fails or
/// does not have the requested parent, and propagates conversion errors.
async fn handle_reorg(
&self,
working_snapshot: &mut NonfinalizedBlockCacheSnapshot,
block: &impl Block,
) -> Result<IndexedBlock, SyncError> {
// `.cloned()` ends the borrow of the snapshot so it can be mutated below.
let prev_block = match working_snapshot
.get_block_by_hash_bytes_in_serialized_order(block.prev_hash_bytes_serialized_order())
.cloned()
{
Some(prev_block) => {
// The parent is cached; check whether the height index refers to it.
if !working_snapshot
.heights_to_hashes
.values()
.any(|hash| hash == prev_block.hash())
{
// `Box::pin` is required because an async fn cannot recurse directly.
Box::pin(self.handle_reorg(working_snapshot, &prev_block)).await?
} else {
prev_block
}
}
None => {
// Parent unknown locally: fetch it from the source by hash.
let prev_block = self
.source
.get_block(HashOrHeight::Hash(
zebra_chain::block::Hash::from_bytes_in_serialized_order(
block.prev_hash_bytes_serialized_order(),
),
))
.await
.map_err(|e| {
SyncError::ValidatorConnectionError(
NodeConnectionError::UnrecoverableError(Box::new(e)),
)
})?
.ok_or(SyncError::ValidatorConnectionError(
NodeConnectionError::UnrecoverableError(Box::new(MissingBlockError(
"zebrad missing block in best chain".to_string(),
))),
))?;
Box::pin(self.handle_reorg(working_snapshot, &*prev_block)).await?
}
};
let indexed_block = block.to_indexed_block(&prev_block, self).await?;
// Overwrites the height index entry at this height and moves `best_tip`.
working_snapshot.add_block_new_chaintip(indexed_block.clone());
Ok(indexed_block)
}
async fn handle_nfs_change_listener(
&self,
working_snapshot: &mut NonfinalizedBlockCacheSnapshot,
) -> Result<(), SyncError> {
let Some(ref listener) = self.nfs_change_listener else {
return Ok(());
};
let Some(mut listener) = listener.try_lock() else {
warn!("Error fetching non-finalized change listener");
return Err(SyncError::CompetingSyncProcess);
};
loop {
match listener.try_recv() {
Ok((hash, block)) => {
if !self
.current
.load()
.blocks
.contains_key(&types::BlockHash(hash.0))
{
self.add_nonbest_block(working_snapshot, &*block).await?;
}
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(e @ mpsc::error::TryRecvError::Disconnected) => {
return Err(SyncError::ValidatorConnectionError(
NodeConnectionError::UnrecoverableError(Box::new(e)),
))
}
}
}
Ok(())
}
/// Publishes `new_snapshot`, provided the published snapshot is still
/// `initial_state`.
///
/// Before publishing, blocks below the finalized database height are dropped
/// and the cached block with the most chain work is re-established as the tip
/// through `handle_reorg`. A change of tip is logged after a successful swap.
///
/// # Errors
/// * `UpdateError::FinalizedStateCorruption` if the finalized height cannot
///   be read.
/// * `UpdateError::DatabaseHole` if re-establishing the best chain fails.
/// * `UpdateError::StaleSnapshot` if another writer replaced the published
///   snapshot since `initial_state` was loaded.
///
/// # Panics
/// Panics if `new_snapshot` holds no blocks after pruning.
pub(super) async fn update(
    &self,
    finalized_db: Arc<ZainoDB>,
    initial_state: Arc<NonfinalizedBlockCacheSnapshot>,
    mut new_snapshot: NonfinalizedBlockCacheSnapshot,
) -> Result<(), UpdateError> {
    let finalized_height = finalized_db
        .to_reader()
        .db_height()
        .await
        .map_err(|_e| UpdateError::FinalizedStateCorruption)?
        .unwrap_or(Height(0));
    new_snapshot.remove_finalized_blocks(finalized_height);

    let best_block = &new_snapshot
        .blocks
        .values()
        .max_by_key(|block| block.chainwork())
        .cloned()
        .expect("empty snapshot impossible");
    self.handle_reorg(&mut new_snapshot, best_block)
        .await
        .map_err(|_e| UpdateError::DatabaseHole)?;

    // Capture both tips *before* the swap: `compare_and_swap` hands back the
    // previously stored value, so on success it is `initial_state` itself and
    // cannot be used to learn the new tip.
    let stale_best_tip = initial_state.best_tip;
    let new_best_tip = new_snapshot.best_tip;

    let stored = self
        .current
        .compare_and_swap(&initial_state, Arc::new(new_snapshot));
    if !Arc::ptr_eq(&stored, &initial_state) {
        // Someone else published in the meantime; our snapshot was not stored.
        return Err(UpdateError::StaleSnapshot);
    }

    if new_best_tip != stale_best_tip {
        if new_best_tip.height > stale_best_tip.height {
            info!(
                old_height = stale_best_tip.height.0,
                new_height = new_best_tip.height.0,
                old_hash = %stale_best_tip.hash,
                new_hash = %new_best_tip.hash,
                "Non-finalized tip advanced"
            );
        } else if new_best_tip.height == stale_best_tip.height
            && new_best_tip.hash != stale_best_tip.hash
        {
            info!(
                height = new_best_tip.height.0,
                old_hash = %stale_best_tip.hash,
                new_hash = %new_best_tip.hash,
                "Non-finalized tip reorg"
            );
        } else if new_best_tip.height < stale_best_tip.height {
            info!(
                old_height = stale_best_tip.height.0,
                new_height = new_best_tip.height.0,
                old_hash = %stale_best_tip.hash,
                new_hash = %new_best_tip.hash,
                "Non-finalized tip rollback"
            );
        }
    }
    Ok(())
}
/// Returns a shared handle to the currently published snapshot.
pub(super) fn get_snapshot(&self) -> Arc<NonfinalizedBlockCacheSnapshot> {
self.current.load_full()
}
/// Converts a raw `block` into an [`IndexedBlock`], using `prev_block`'s
/// chain work as the parent chain work and fetching the block's commitment
/// tree roots from the source.
///
/// # Errors
/// Both a failed tree-root fetch and a failed conversion are reported as
/// `SyncError::ValidatorConnectionError` wrapping an `InvalidData` message.
async fn block_to_chainblock(
    &self,
    prev_block: &IndexedBlock,
    block: &zebra_chain::block::Block,
) -> Result<IndexedBlock, SyncError> {
    // Shared wrapper for the two failure paths below.
    let invalid_data = |message: String| {
        SyncError::ValidatorConnectionError(NodeConnectionError::UnrecoverableError(Box::new(
            InvalidData(message),
        )))
    };

    let tree_roots = self
        .get_tree_roots_from_source(block.hash().into())
        .await
        .map_err(|e| invalid_data(e.to_string()))?;

    Self::create_indexed_block_with_optional_roots(
        block,
        &tree_roots,
        *prev_block.chainwork(),
        self.network.clone(),
    )
    .map_err(invalid_data)
}
/// Fetches the sapling and orchard commitment tree data for `block_hash`
/// from the source and packs it into a [`TreeRootData`].
async fn get_tree_roots_from_source(
    &self,
    block_hash: BlockHash,
) -> Result<TreeRootData, super::source::BlockchainSourceError> {
    let roots = self.source.get_commitment_tree_roots(block_hash).await?;
    let (sapling, orchard) = roots;
    Ok(TreeRootData { sapling, orchard })
}
fn create_indexed_block_with_optional_roots(
block: &zebra_chain::block::Block,
tree_roots: &TreeRootData,
parent_chainwork: ChainWork,
network: Network,
) -> Result<IndexedBlock, String> {
let (sapling_root, sapling_size, orchard_root, orchard_size) =
tree_roots.clone().extract_with_defaults();
let metadata = BlockMetadata::new(
sapling_root,
sapling_size as u32,
orchard_root,
orchard_size as u32,
parent_chainwork,
network,
);
let block_with_metadata = BlockWithMetadata::new(block, metadata);
IndexedBlock::try_from(block_with_metadata)
}
/// Stores `block` in `working_snapshot.blocks` without touching
/// `heights_to_hashes` or `best_tip`.
///
/// The parent is taken from the snapshot when present; otherwise it is
/// fetched from the source by hash and stored first through a recursive call.
/// Returns the indexed form of `block`.
///
/// # Errors
/// Returns `SyncError::ValidatorConnectionError` when the source fails or
/// does not have the parent, and propagates conversion errors.
async fn add_nonbest_block(
&self,
working_snapshot: &mut NonfinalizedBlockCacheSnapshot,
block: &impl Block,
) -> Result<IndexedBlock, SyncError> {
// `.cloned()` ends the borrow of the snapshot so it can be mutated below.
let prev_block = match working_snapshot
.get_block_by_hash_bytes_in_serialized_order(block.prev_hash_bytes_serialized_order())
.cloned()
{
Some(block) => block,
None => {
// Parent unknown locally: fetch it from the source by hash.
let prev_block = self
.source
.get_block(HashOrHeight::Hash(
zebra_chain::block::Hash::from_bytes_in_serialized_order(
block.prev_hash_bytes_serialized_order(),
),
))
.await
.map_err(|e| {
SyncError::ValidatorConnectionError(
NodeConnectionError::UnrecoverableError(Box::new(e)),
)
})?
.ok_or(SyncError::ValidatorConnectionError(
NodeConnectionError::UnrecoverableError(Box::new(MissingBlockError(
"zebrad missing block".to_string(),
))),
))?;
// `Box::pin` is required because an async fn cannot recurse directly.
Box::pin(self.add_nonbest_block(working_snapshot, &*prev_block)).await?
}
};
let indexed_block = block.to_indexed_block(&prev_block, self).await?;
// Only the block map is updated; the height index is left alone.
working_snapshot
.blocks
.insert(*indexed_block.hash(), indexed_block.clone());
Ok(indexed_block)
}
}
/// Errors produced while publishing a new non-finalized snapshot.
// `Debug` is derived so results carrying this error can be unwrapped and
// logged, matching every other error type in this file.
#[derive(Debug)]
pub enum UpdateError {
    /// Converted to `SyncError::StagingChannelClosed`.
    /// NOTE(review): not constructed anywhere in this file -- confirm usage.
    ReceiverDisconnected,
    /// The published snapshot was replaced by another writer before this
    /// update could be stored.
    StaleSnapshot,
    /// The finalized database height could not be read.
    FinalizedStateCorruption,
    /// The best chain could not be re-established from the cached blocks.
    DatabaseHole,
    /// An error reported while talking to the validator.
    ValidatorConnectionError(Box<dyn std::error::Error>),
}
/// Common view over the two block representations handled during sync
/// (`IndexedBlock` and `zebra_chain::block::Block`), so the reorg helpers can
/// walk parents regardless of where a block came from.
trait Block {
/// This block's hash as bytes in serialized order.
fn hash_bytes_serialized_order(&self) -> [u8; 32];
/// The parent block's hash as bytes in serialized order.
fn prev_hash_bytes_serialized_order(&self) -> [u8; 32];
/// Produces the `IndexedBlock` form of this block. `prev_block` is its
/// parent and `nfs` gives access to the source and network when the
/// conversion needs them.
async fn to_indexed_block<Source: BlockchainSource>(
&self,
prev_block: &IndexedBlock,
nfs: &NonFinalizedState<Source>,
) -> Result<IndexedBlock, SyncError>;
}
/// An already indexed block only has to expose its stored hashes.
impl Block for IndexedBlock {
    fn hash_bytes_serialized_order(&self) -> [u8; 32] {
        let own_hash = self.hash();
        own_hash.0
    }

    fn prev_hash_bytes_serialized_order(&self) -> [u8; 32] {
        let context = &self.context;
        context.parent_hash.0
    }

    /// No conversion is needed: returns a clone of `self`, ignoring both
    /// arguments.
    async fn to_indexed_block<Source: BlockchainSource>(
        &self,
        _prev_block: &IndexedBlock,
        _nfs: &NonFinalizedState<Source>,
    ) -> Result<IndexedBlock, SyncError> {
        let already_indexed = self.clone();
        Ok(already_indexed)
    }
}
/// A raw block from the validator must be indexed before it can be cached.
impl Block for zebra_chain::block::Block {
    fn hash_bytes_serialized_order(&self) -> [u8; 32] {
        let own_hash = self.hash();
        own_hash.bytes_in_serialized_order()
    }

    fn prev_hash_bytes_serialized_order(&self) -> [u8; 32] {
        let parent_hash = self.header.previous_block_hash;
        parent_hash.bytes_in_serialized_order()
    }

    /// Delegates to `NonFinalizedState::block_to_chainblock`.
    async fn to_indexed_block<Source: BlockchainSource>(
        &self,
        prev_block: &IndexedBlock,
        nfs: &NonFinalizedState<Source>,
    ) -> Result<IndexedBlock, SyncError> {
        let indexed = nfs.block_to_chainblock(prev_block, self).await;
        indexed
    }
}