use std::{collections::BTreeMap, sync::Arc};
use chrono::{DateTime, Utc};
use zebra_chain::{
amount::{Amount, NonNegative},
block::{self, Block, ChainHistoryMmrRootHash},
block_info::BlockInfo,
orchard,
parameters::Network,
sapling,
serialization::DateTime32,
subtree::{NoteCommitmentSubtreeData, NoteCommitmentSubtreeIndex},
transaction::{self, Transaction},
transparent,
value_balance::ValueBalance,
};
use zebra_chain::work::difficulty::CompactDifficulty;
#[allow(unused_imports)]
use crate::{ReadRequest, Request};
use crate::{service::read::AddressUtxos, NonFinalizedState, TransactionLocation, WatchReceiver};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Response {
Committed(block::Hash),
Invalidated(block::Hash),
Reconsidered(Vec<block::Hash>),
Depth(Option<u32>),
Tip(Option<(block::Height, block::Hash)>),
BlockLocator(Vec<block::Hash>),
Transaction(Option<Arc<Transaction>>),
AnyChainTransaction(Option<AnyTx>),
UnspentBestChainUtxo(Option<transparent::Utxo>),
Block(Option<Arc<Block>>),
BlockAndSize(Option<(Arc<Block>, usize)>),
BlockHeader {
header: Arc<block::Header>,
hash: block::Hash,
height: block::Height,
next_block_hash: Option<block::Hash>,
},
Utxo(transparent::Utxo),
BlockHashes(Vec<block::Hash>),
BlockHeaders(Vec<block::CountedHeader>),
ValidBestChainTipNullifiersAndAnchors,
BestChainNextMedianTimePast(DateTime32),
BlockHash(Option<block::Hash>),
KnownBlock(Option<KnownBlock>),
ValidBlockProposal,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum KnownBlock {
Finalized,
BestChain,
SideChain,
WriteChannel,
Queue,
}
impl std::fmt::Display for KnownBlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
KnownBlock::Finalized => write!(f, "finalized state"),
KnownBlock::BestChain => write!(f, "best chain"),
KnownBlock::SideChain => write!(f, "side chain"),
KnownBlock::WriteChannel => write!(f, "block write channel"),
KnownBlock::Queue => write!(f, "validation/commit queue"),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AnyTx {
Mined(MinedTx),
Side((Arc<Transaction>, block::Hash)),
}
impl From<AnyTx> for Arc<Transaction> {
fn from(any_tx: AnyTx) -> Self {
match any_tx {
AnyTx::Mined(mined_tx) => mined_tx.tx,
AnyTx::Side((tx, _)) => tx,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MinedTx {
pub tx: Arc<Transaction>,
pub height: block::Height,
pub confirmations: u32,
pub block_time: DateTime<Utc>,
}
impl MinedTx {
pub fn new(
tx: Arc<Transaction>,
height: block::Height,
confirmations: u32,
block_time: DateTime<Utc>,
) -> Self {
Self {
tx,
height,
confirmations,
block_time,
}
}
}
const NON_FINALIZED_STATE_CHANGE_BUFFER_SIZE: usize = 1_000;
#[derive(Clone, Debug)]
pub struct NonFinalizedBlocksListener(
pub Arc<tokio::sync::mpsc::Receiver<(zebra_chain::block::Hash, Arc<zebra_chain::block::Block>)>>,
);
impl NonFinalizedBlocksListener {
pub fn spawn(
network: Network,
mut non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel(NON_FINALIZED_STATE_CHANGE_BUFFER_SIZE);
tokio::spawn(async move {
let mut prev_non_finalized_state = NonFinalizedState::new(&network);
loop {
let latest_non_finalized_state = non_finalized_state_receiver.cloned_watch_data();
let new_blocks = latest_non_finalized_state
.chain_iter()
.flat_map(|chain| {
let mut new_blocks: Vec<_> = chain
.blocks
.values()
.rev()
.take_while(|cv_block| {
!prev_non_finalized_state.any_chain_contains(&cv_block.hash)
})
.collect();
new_blocks.reverse();
new_blocks
})
.map(|cv_block| (cv_block.hash, cv_block.block.clone()));
for new_block_with_hash in new_blocks {
if sender.send(new_block_with_hash).await.is_err() {
tracing::debug!("non-finalized blocks receiver closed, ending task");
return;
}
}
prev_non_finalized_state = latest_non_finalized_state;
if let Err(error) = non_finalized_state_receiver.changed().await {
warn!(
?error,
"non-finalized state receiver closed, is Zebra shutting down?"
);
break;
}
}
});
Self(Arc::new(receiver))
}
pub fn unwrap(
self,
) -> tokio::sync::mpsc::Receiver<(zebra_chain::block::Hash, Arc<zebra_chain::block::Block>)>
{
Arc::try_unwrap(self.0).unwrap()
}
}
impl PartialEq for NonFinalizedBlocksListener {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for NonFinalizedBlocksListener {}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ReadResponse {
UsageInfo(u64),
Tip(Option<(block::Height, block::Hash)>),
TipPoolValues {
tip_height: block::Height,
tip_hash: block::Hash,
value_balance: ValueBalance<NonNegative>,
},
BlockInfo(Option<BlockInfo>),
Depth(Option<u32>),
Block(Option<Arc<Block>>),
BlockAndSize(Option<(Arc<Block>, usize)>),
BlockHeader {
header: Arc<block::Header>,
hash: block::Hash,
height: block::Height,
next_block_hash: Option<block::Hash>,
},
Transaction(Option<MinedTx>),
AnyChainTransaction(Option<AnyTx>),
TransactionIdsForBlock(Option<Arc<[transaction::Hash]>>),
AnyChainTransactionIdsForBlock(Option<(Arc<[transaction::Hash]>, bool)>),
#[cfg(feature = "indexer")]
TransactionId(Option<transaction::Hash>),
BlockLocator(Vec<block::Hash>),
BlockHashes(Vec<block::Hash>),
BlockHeaders(Vec<block::CountedHeader>),
UnspentBestChainUtxo(Option<transparent::Utxo>),
AnyChainUtxo(Option<transparent::Utxo>),
SaplingTree(Option<Arc<sapling::tree::NoteCommitmentTree>>),
OrchardTree(Option<Arc<orchard::tree::NoteCommitmentTree>>),
SaplingSubtrees(
BTreeMap<NoteCommitmentSubtreeIndex, NoteCommitmentSubtreeData<sapling_crypto::Node>>,
),
OrchardSubtrees(
BTreeMap<NoteCommitmentSubtreeIndex, NoteCommitmentSubtreeData<orchard::tree::Node>>,
),
AddressBalance {
balance: Amount<NonNegative>,
received: u64,
},
AddressesTransactionIds(BTreeMap<TransactionLocation, transaction::Hash>),
AddressUtxos(AddressUtxos),
ValidBestChainTipNullifiersAndAnchors,
BestChainNextMedianTimePast(DateTime32),
BlockHash(Option<block::Hash>),
ChainInfo(GetBlockTemplateChainInfo),
SolutionRate(Option<u128>),
ValidBlockProposal,
TipBlockSize(Option<usize>),
NonFinalizedBlocksListener(NonFinalizedBlocksListener),
IsTransparentOutputSpent(bool),
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetBlockTemplateChainInfo {
pub tip_hash: block::Hash,
pub tip_height: block::Height,
pub chain_history_root: Option<ChainHistoryMmrRootHash>,
pub expected_difficulty: CompactDifficulty,
pub cur_time: DateTime32,
pub min_time: DateTime32,
pub max_time: DateTime32,
}
impl TryFrom<ReadResponse> for Response {
type Error = &'static str;
fn try_from(response: ReadResponse) -> Result<Response, Self::Error> {
match response {
ReadResponse::Tip(height_and_hash) => Ok(Response::Tip(height_and_hash)),
ReadResponse::Depth(depth) => Ok(Response::Depth(depth)),
ReadResponse::BestChainNextMedianTimePast(median_time_past) => Ok(Response::BestChainNextMedianTimePast(median_time_past)),
ReadResponse::BlockHash(hash) => Ok(Response::BlockHash(hash)),
ReadResponse::Block(block) => Ok(Response::Block(block)),
ReadResponse::BlockAndSize(block) => Ok(Response::BlockAndSize(block)),
ReadResponse::BlockHeader {
header,
hash,
height,
next_block_hash
} => Ok(Response::BlockHeader {
header,
hash,
height,
next_block_hash
}),
ReadResponse::Transaction(tx_info) => {
Ok(Response::Transaction(tx_info.map(|tx_info| tx_info.tx)))
}
ReadResponse::AnyChainTransaction(tx) => Ok(Response::AnyChainTransaction(tx)),
ReadResponse::UnspentBestChainUtxo(utxo) => Ok(Response::UnspentBestChainUtxo(utxo)),
ReadResponse::AnyChainUtxo(_) => Err("ReadService does not track pending UTXOs. \
Manually unwrap the response, and handle pending UTXOs."),
ReadResponse::BlockLocator(hashes) => Ok(Response::BlockLocator(hashes)),
ReadResponse::BlockHashes(hashes) => Ok(Response::BlockHashes(hashes)),
ReadResponse::BlockHeaders(headers) => Ok(Response::BlockHeaders(headers)),
ReadResponse::ValidBestChainTipNullifiersAndAnchors => Ok(Response::ValidBestChainTipNullifiersAndAnchors),
ReadResponse::UsageInfo(_)
| ReadResponse::TipPoolValues { .. }
| ReadResponse::BlockInfo(_)
| ReadResponse::TransactionIdsForBlock(_)
| ReadResponse::AnyChainTransactionIdsForBlock(_)
| ReadResponse::SaplingTree(_)
| ReadResponse::OrchardTree(_)
| ReadResponse::SaplingSubtrees(_)
| ReadResponse::OrchardSubtrees(_)
| ReadResponse::AddressBalance { .. }
| ReadResponse::AddressesTransactionIds(_)
| ReadResponse::AddressUtxos(_)
| ReadResponse::ChainInfo(_)
| ReadResponse::NonFinalizedBlocksListener(_)
| ReadResponse::IsTransparentOutputSpent(_) => {
Err("there is no corresponding Response for this ReadResponse")
}
#[cfg(feature = "indexer")]
ReadResponse::TransactionId(_) => Err("there is no corresponding Response for this ReadResponse"),
ReadResponse::ValidBlockProposal => Ok(Response::ValidBlockProposal),
ReadResponse::SolutionRate(_) | ReadResponse::TipBlockSize(_) => {
Err("there is no corresponding Response for this ReadResponse")
}
}
}
}