use crate::chain_index::non_finalised_state::ChainIndexSnapshot;
use crate::chain_index::source::GetTransactionLocation;
use crate::chain_index::types::db::metadata::MempoolInfo;
use crate::chain_index::types::helpers::{BlockMetadata, BlockWithMetadata, TreeRootData};
use crate::chain_index::types::BlockIndex;
use crate::chain_index::types::{BestChainLocation, NonBestChainLocation};
use crate::error::{ChainIndexError, ChainIndexErrorKind, FinalisedStateError};
use crate::status::Status;
use crate::{
ChainWork, CompactBlockStream, NamedAtomicStatus, NonFinalizedState, StatusType, SyncError,
TxOutCompact,
};
use crate::{IndexedBlock, Outpoint, TransactionHash};
use std::collections::HashSet;
use std::{sync::Arc, time::Duration};
use arc_swap::ArcSwapOption;
use futures::{FutureExt, Stream};
use hex::FromHex as _;
use non_finalised_state::NonfinalizedBlockCacheSnapshot;
use source::{BlockchainSource, ValidatorConnector};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{info, instrument};
use zaino_fetch::jsonrpsee::response::{
address_deltas::{GetAddressDeltasParams, GetAddressDeltasResponse},
chain_tips::{ChainTip, ChainTipStatus, GetChainTipsResponse},
EmptyTxOutSetInfo, GetTxOutSetInfo, GetTxOutSetInfoResponse,
};
use zaino_proto::proto::utils::{compact_block_with_pool_types, PoolTypeFilter};
use zebra_chain::parameters::ConsensusBranchId;
pub use zebra_chain::parameters::Network as ZebraNetwork;
use zebra_chain::serialization::ZcashSerialize;
use zebra_rpc::{
client::{GetAddressBalanceRequest, GetAddressTxIdsRequest},
methods::{AddressBalance, GetAddressUtxos},
};
use zebra_state::HashOrHeight;
pub mod encoding;
pub mod finalised_state;
pub mod mempool;
pub mod non_finalised_state;
pub mod source;
pub mod types;
#[cfg(test)]
mod tests;
/// Number of blocks below the chain tip that are not treated as finalised:
/// one more than `zebra_state::MAX_BLOCK_REORG_HEIGHT`.
pub(crate) const NON_FINALIZED_DEPTH: u32 = zebra_state::MAX_BLOCK_REORG_HEIGHT + 1;

/// Returns the height lying `NON_FINALIZED_DEPTH` blocks below `chain_tip`.
///
/// The subtraction saturates, so a tip shallower than the depth maps to height 0.
pub(crate) fn finalized_height_floor(chain_tip: u32) -> crate::Height {
    let floor = chain_tip.saturating_sub(NON_FINALIZED_DEPTH);
    crate::Height(floor)
}
/// Builds the chain-tips response from a non-finalised snapshot.
///
/// A block counts as a tip when no other block in the snapshot names it as its
/// parent; the snapshot's best tip is always included. The best tip is reported
/// as `Active` with branch length 0, every other tip as `ValidFork` with the
/// length computed by `branch_len_to_active_chain`. Tips are ordered by
/// descending height, ties broken by ascending hash.
pub(crate) fn chain_tips_from_nonfinalized_snapshot(
    snapshot: &NonfinalizedBlockCacheSnapshot,
) -> GetChainTipsResponse {
    let best_hash = snapshot.best_tip.hash;

    // Every hash that some block in the snapshot points to as its parent.
    let has_child: HashSet<_> = snapshot
        .blocks
        .values()
        .map(|block| *block.context.parent_hash())
        .collect();

    // Blocks nobody builds on are tips; the best tip is added unconditionally.
    let mut candidates: HashSet<_> = snapshot
        .blocks
        .keys()
        .copied()
        .filter(|hash| !has_child.contains(hash))
        .collect();
    candidates.insert(best_hash);

    let mut tips = Vec::with_capacity(candidates.len());
    for hash in candidates {
        // A candidate hash without a block in the snapshot is silently skipped.
        let Some(block) = snapshot.blocks.get(&hash) else {
            continue;
        };
        let (status, branchlen) = if block.hash() == &best_hash {
            (ChainTipStatus::Active, 0)
        } else {
            (
                ChainTipStatus::ValidFork,
                branch_len_to_active_chain(snapshot, block),
            )
        };
        tips.push(ChainTip::new(
            u32::from(block.height()),
            block.hash().to_rpc_hex(),
            branchlen,
            status,
        ));
    }

    // Highest tip first; equal heights are ordered by hash for a stable output.
    tips.sort_by(|a, b| {
        b.height
            .cmp(&a.height)
            .then_with(|| a.hash.cmp(&b.hash))
    });
    tips
}
fn branch_len_to_active_chain(
snapshot: &NonfinalizedBlockCacheSnapshot,
block: &IndexedBlock,
) -> u32 {
let mut branch_len = 0;
let mut current = block;
loop {
if snapshot.heights_to_hashes.get(¤t.height()) == Some(current.hash()) {
return branch_len;
}
branch_len += 1;
let parent_hash = current.context.parent_hash();
let Some(parent) = snapshot.blocks.get(parent_hash) else {
return branch_len;
};
current = parent;
}
}
#[doc = simple_mermaid::mermaid!("chain_index_passthrough.mmd")]
pub trait ChainIndex {
type Snapshot: NonFinalizedSnapshot;
type Error;
fn snapshot_nonfinalized_state(
&self,
) -> impl std::future::Future<Output = Result<Self::Snapshot, Self::Error>>;
fn get_block_height(
&self,
snapshot: &Self::Snapshot,
hash: types::BlockHash,
) -> impl std::future::Future<Output = Result<Option<types::Height>, Self::Error>>;
fn get_block_hash(
&self,
snapshot: &Self::Snapshot,
hash: types::Height,
) -> impl std::future::Future<Output = Result<Option<types::BlockHash>, Self::Error>>;
fn get_indexed_block_by_hash(
&self,
snapshot: &Self::Snapshot,
target_hash: &types::BlockHash,
) -> impl std::future::Future<Output = Result<Option<IndexedBlock>, Self::Error>>;
fn get_indexed_block_by_height(
&self,
snapshot: &Self::Snapshot,
target_height: &types::Height,
) -> impl std::future::Future<Output = Result<Option<IndexedBlock>, Self::Error>>;
#[allow(clippy::type_complexity)]
fn get_block_range(
&self,
snapshot: &Self::Snapshot,
start: types::Height,
end: Option<types::Height>,
) -> Option<impl futures::Stream<Item = Result<Vec<u8>, Self::Error>>>;
#[allow(clippy::type_complexity)]
fn get_compact_block(
&self,
nonfinalized_snapshot: &Self::Snapshot,
height: types::Height,
pool_types: PoolTypeFilter,
) -> impl std::future::Future<
Output = Result<Option<zaino_proto::proto::compact_formats::CompactBlock>, Self::Error>,
>;
#[allow(clippy::type_complexity)]
fn get_compact_block_stream(
&self,
nonfinalized_snapshot: &Self::Snapshot,
start_height: types::Height,
end_height: types::Height,
pool_types: PoolTypeFilter,
) -> impl std::future::Future<Output = Result<Option<CompactBlockStream>, Self::Error>>;
#[allow(clippy::type_complexity)]
fn get_raw_transaction(
&self,
snapshot: &Self::Snapshot,
txid: &types::TransactionHash,
) -> impl std::future::Future<Output = Result<Option<(Vec<u8>, Option<u32>)>, Self::Error>>;
#[allow(clippy::type_complexity)]
fn get_transaction_status(
&self,
snapshot: &Self::Snapshot,
txid: &types::TransactionHash,
) -> impl std::future::Future<
Output = Result<(Option<BestChainLocation>, HashSet<NonBestChainLocation>), Self::Error>,
>;
fn get_mempool_txids(
&self,
) -> impl std::future::Future<Output = Result<Vec<types::TransactionHash>, Self::Error>>;
fn get_mempool_transactions(
&self,
exclude_list: Vec<String>,
) -> impl std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>>;
#[allow(clippy::type_complexity)]
fn get_mempool_stream(
&self,
snapshot: Option<&Self::Snapshot>,
) -> Option<impl futures::Stream<Item = Result<Vec<u8>, Self::Error>>>;
fn best_chaintip(
&self,
nonfinalized_snapshot: &Self::Snapshot,
) -> impl std::future::Future<Output = Result<BlockIndex, Self::Error>>;
fn find_fork_point(
&self,
snapshot: &Self::Snapshot,
hash: &types::BlockHash,
) -> impl std::future::Future<Output = Result<Option<(types::BlockHash, types::Height)>, Self::Error>>;
#[allow(clippy::type_complexity)]
fn get_treestate(
&self,
hash: &types::BlockHash,
) -> impl std::future::Future<Output = Result<(Option<Vec<u8>>, Option<Vec<u8>>), Self::Error>>;
fn get_subtree_roots(
&self,
pool: ShieldedPool,
start_index: u16,
max_entries: Option<u16>,
) -> impl std::future::Future<Output = Result<Vec<([u8; 32], u32)>, Self::Error>>;
fn get_address_deltas(
&self,
params: GetAddressDeltasParams,
) -> impl std::future::Future<Output = Result<GetAddressDeltasResponse, Self::Error>>;
fn get_address_balance(
&self,
address_strings: GetAddressBalanceRequest,
) -> impl std::future::Future<Output = Result<AddressBalance, Self::Error>>;
fn get_address_txids(
&self,
request: GetAddressTxIdsRequest,
) -> impl std::future::Future<Output = Result<Vec<types::TransactionHash>, Self::Error>>;
fn get_address_utxos(
&self,
address_strings: GetAddressBalanceRequest,
) -> impl std::future::Future<Output = Result<Vec<GetAddressUtxos>, Self::Error>>;
fn get_mempool_info(&self) -> impl std::future::Future<Output = MempoolInfo>;
fn get_tx_out_set_info(
&self,
) -> impl std::future::Future<Output = Result<GetTxOutSetInfoResponse, Self::Error>>;
}
#[derive(Debug)]
pub struct NodeBackedChainIndex<Source: BlockchainSource = ValidatorConnector> {
#[allow(dead_code)]
mempool: std::sync::Arc<mempool::Mempool<Source>>,
non_finalized_state: Arc<ArcSwapOption<crate::NonFinalizedState<Source>>>,
finalized_db: std::sync::Arc<finalised_state::ZainoDB>,
sync_loop_handle: Option<tokio::task::JoinHandle<Result<(), SyncError>>>,
status: NamedAtomicStatus,
network: ZebraNetwork,
source: Source,
sync_timings: SyncTimings,
cancel_token: CancellationToken,
}
/// Interval and retry settings for the chain index sync loop.
#[derive(Clone, Copy, Debug)]
pub(crate) struct SyncTimings {
    /// Wait between successful sync iterations.
    pub(crate) interval: Duration,
    /// Delay used after the first failed iteration.
    pub(crate) initial_backoff: Duration,
    /// Upper bound for the doubling backoff delay.
    pub(crate) max_backoff: Duration,
    /// Number of consecutive failures at which the loop gives up.
    pub(crate) max_consecutive_failures: u32,
}

impl Default for SyncTimings {
    /// Timings used outside of tests: 500 ms interval, backoff from 250 ms up
    /// to 8 s, giving up at 10 consecutive failures.
    fn default() -> Self {
        let interval = Duration::from_millis(500);
        let initial_backoff = Duration::from_millis(250);
        let max_backoff = Duration::from_secs(8);
        Self {
            interval,
            initial_backoff,
            max_backoff,
            max_consecutive_failures: 10,
        }
    }
}

#[cfg(test)]
impl SyncTimings {
    /// Shortened timings (a tenth of the defaults) for tests.
    pub(crate) const fn fast() -> Self {
        let interval = Duration::from_millis(50);
        let initial_backoff = Duration::from_millis(25);
        let max_backoff = Duration::from_millis(800);
        Self {
            interval,
            initial_backoff,
            max_backoff,
            max_consecutive_failures: 10,
        }
    }

    /// Total backoff accumulated over `max_consecutive_failures - 1` retries,
    /// starting at `initial_backoff` and doubling up to `max_backoff`.
    pub(crate) fn max_backoff_window(&self) -> Duration {
        let retries = self.max_consecutive_failures.saturating_sub(1);
        let (window, _next_delay) = (0..retries).fold(
            (Duration::ZERO, self.initial_backoff),
            |(elapsed, delay), _| (elapsed + delay, (delay * 2).min(self.max_backoff)),
        );
        window
    }
}
impl<Source: BlockchainSource> NodeBackedChainIndex<Source> {
/// Creates a chain index over `source` with the default sync timings and
/// starts its sync loop.
pub async fn new(
    source: Source,
    config: crate::config::BlockCacheConfig,
) -> Result<Self, crate::InitError> {
    let timings = SyncTimings::default();
    Self::new_with_sync_timings(source, config, timings).await
}
/// Creates a chain index over `source` using the given sync timings.
///
/// Spawns the finalised database first, then the mempool, and finally starts
/// the sync loop. The non-finalised state starts out empty and the status is
/// `Spawning`.
///
/// # Errors
/// Returns an `InitError` when the database or the mempool fails to spawn.
pub(crate) async fn new_with_sync_timings(
    source: Source,
    config: crate::config::BlockCacheConfig,
    sync_timings: SyncTimings,
) -> Result<Self, crate::InitError> {
    let finalized_db = finalised_state::ZainoDB::spawn(config.clone(), source.clone())
        .await
        .map(Arc::new)?;
    let mempool_state = mempool::Mempool::spawn(source.clone(), None)
        .await
        .map_err(crate::InitError::MempoolInitialzationError)?;

    let mut index = Self {
        mempool: Arc::new(mempool_state),
        non_finalized_state: Arc::new(ArcSwapOption::empty()),
        finalized_db,
        sync_loop_handle: None,
        status: NamedAtomicStatus::new("ChainIndex", StatusType::Spawning),
        network: config.network.to_zebra_network(),
        source,
        sync_timings,
        cancel_token: CancellationToken::new(),
    };
    // The loop needs the fully built index, so the handle is filled in afterwards.
    let handle = index.start_sync_loop();
    index.sync_loop_handle = Some(handle);
    Ok(index)
}
pub fn subscriber(&self) -> NodeBackedChainIndexSubscriber<Source> {
NodeBackedChainIndexSubscriber {
mempool: self.mempool.subscriber(),
non_finalized_state: self.non_finalized_state.clone(),
finalized_state: self.finalized_db.to_reader(),
status: self.status.clone(),
network: self.network.clone(),
source: self.source.clone(),
}
}
/// Stops the sync loop and shuts down the finalised database and the mempool.
///
/// The cancellation token is cancelled and the status set to `Closing` before
/// any component is shut down. The mempool is closed even when the database
/// shutdown fails.
///
/// # Errors
/// Returns the error reported by the finalised database shutdown, if any.
pub async fn shutdown(&self) -> Result<(), FinalisedStateError> {
    self.cancel_token.cancel();
    self.status.store(StatusType::Closing);
    // Keep the result instead of returning early, so a database error does
    // not leave the mempool running.
    let db_shutdown = self.finalized_db.shutdown().await;
    self.mempool.close();
    db_shutdown?;
    Ok(())
}
/// Combines this index's status with the database and mempool statuses.
///
/// The combined value is written back to the shared status before being
/// returned.
pub fn status(&self) -> StatusType {
    let db_status = self.finalized_db.status();
    let pool_status = self.mempool.status();
    let own_status = self.status.load();
    let merged = own_status.combine(db_status).combine(pool_status);
    self.status.store(merged);
    merged
}
/// Spawns the background task that keeps the finalised database and the
/// non-finalised state in sync with the source, and returns its join handle.
///
/// Each iteration sets the status to `Syncing`, asks the source for the best
/// block height, syncs the finalised database up to `finalized_height_floor`
/// of that height, initialises the non-finalised state if it is still empty,
/// and then syncs the non-finalised state.
///
/// On success the status becomes `Ready` and the task waits on
/// `source::wait_or_source_change` with `timings.interval`. On failure the
/// status becomes `RecoverableError` and the task sleeps for the current
/// backoff, which doubles up to `timings.max_backoff`; once
/// `timings.max_consecutive_failures` is reached the status becomes
/// `CriticalError` and the task returns the error.
///
/// The task returns `Ok(())` as soon as the cancellation token is cancelled.
#[instrument(name = "ChainIndex::start_sync_loop", skip(self))]
pub(super) fn start_sync_loop(&self) -> tokio::task::JoinHandle<Result<(), SyncError>> {
info!("Starting ChainIndex sync loop");
// Clone everything the task needs so it does not borrow `self`.
let nfs = self.non_finalized_state.clone();
let fs = self.finalized_db.clone();
let status = self.status.clone();
let source = self.source.clone();
let network = self.network.clone();
let timings = self.sync_timings;
let cancel_token = self.cancel_token.clone();
tokio::task::spawn(async move {
let status = status.clone();
let source = source.clone();
let mut change_rx = source.subscribe_to_blocks_received();
// Failure bookkeeping; both values are reset after a successful iteration.
let mut consecutive_failures: u32 = 0;
let mut current_backoff = timings.initial_backoff;
loop {
let source = source.clone();
let network = network.clone();
if cancel_token.is_cancelled() {
return Ok(());
}
status.store(StatusType::Syncing);
// `biased` makes cancellation win over the sync work when both are ready.
let sync_result: Result<(), SyncError> = tokio::select! {
biased;
_ = cancel_token.cancelled() => return Ok(()),
r = async {
// Wraps any source-side error into `SyncError::ErrorFromSource`.
fn source_error(error: impl std::error::Error + Send + 'static) -> SyncError {
SyncError::ErrorFromSource(Box::new(error))
}
// A source without a best block height is reported as an error.
let chain_height = source
.clone()
.get_best_block_height()
.await
.map_err(source_error)?
.ok_or_else(|| {
source_error(std::io::Error::other(
"node returned no best block height",
))
})?;
let finalised_height = finalized_height_floor(chain_height.0);
fs.sync_to_height(finalised_height, &source)
.await
.map_err(source_error)?;
// The loaded value is kept in a named binding so the reference taken in
// the `Some` arm stays valid until the explicit drop below.
let intermediate_nfs_for_scoping = nfs.load();
let non_finalized_state = match *intermediate_nfs_for_scoping {
Some(ref nfs) => nfs,
None => {
// First successful pass: build the non-finalised state on top of the
// finalised block at `finalised_height` and publish it.
// NOTE(review): both `expect("todo")` calls panic inside the spawned
// task instead of returning a `SyncError` — confirm this is intended.
nfs.store(Some(Arc::new(
NonFinalizedState::initialize(
source,
network,
fs.to_reader()
.get_chain_block_by_height(finalised_height)
.await
.expect("todo"),
)
.await
.expect("todo"),
)));
&nfs.load_full().expect("just set to Some")
}
};
non_finalized_state
.sync(fs.clone(), chain_height.into())
.await?;
std::mem::drop(intermediate_nfs_for_scoping);
Ok(())
} => r,
};
match sync_result {
Ok(()) => {
consecutive_failures = 0;
current_backoff = timings.initial_backoff;
status.store(StatusType::Ready);
// Idle until `wait_or_source_change` completes or the token is cancelled.
tokio::select! {
biased;
_ = cancel_token.cancelled() => return Ok(()),
_ = source::wait_or_source_change(
change_rx.as_mut(),
timings.interval,
) => {}
}
}
Err(e) => {
consecutive_failures += 1;
// Give up for good once the failure budget is used up.
if consecutive_failures >= timings.max_consecutive_failures {
tracing::error!(
"Sync loop failed {consecutive_failures} consecutive times, \
giving up: {e:?}"
);
status.store(StatusType::CriticalError);
return Err(e);
}
tracing::warn!(
"Sync loop iteration failed ({consecutive_failures}/{}), \
retrying in {current_backoff:?}: {e:?}",
timings.max_consecutive_failures
);
status.store(StatusType::RecoverableError);
tokio::select! {
biased;
_ = cancel_token.cancelled() => return Ok(()),
_ = tokio::time::sleep(current_backoff) => {}
}
// Double the delay for the next failure, capped at `max_backoff`.
current_backoff = (current_backoff * 2).min(timings.max_backoff);
}
}
}
})
}
}
impl<Source: BlockchainSource> Drop for NodeBackedChainIndex<Source> {
    /// Cancels the shared token so the sync loop stops once the index is dropped.
    fn drop(&mut self) {
        let Self { cancel_token, .. } = self;
        cancel_token.cancel();
    }
}
#[derive(Clone, Debug)]
pub struct NodeBackedChainIndexSubscriber<Source: BlockchainSource = ValidatorConnector> {
mempool: mempool::MempoolSubscriber,
non_finalized_state: Arc<ArcSwapOption<crate::NonFinalizedState<Source>>>,
finalized_state: finalised_state::reader::DbReader,
status: NamedAtomicStatus,
network: ZebraNetwork,
source: Source,
}
/// Builds the compact block at `height` directly from `source`.
///
/// The block and its commitment tree roots are fetched from the source,
/// converted into an `IndexedBlock` (with a zero chain-work placeholder) and
/// then reduced to the pools selected by `pool_types`.
///
/// Returns `Ok(None)` when the source has no block at `height`.
///
/// # Errors
/// Returns a backing-validator error when a source call fails, when the block
/// carries no coinbase height or a height other than `height`, when a
/// commitment tree size does not fit its target type, or when the block cannot
/// be converted into an `IndexedBlock`.
async fn compact_block_from_source<Source: BlockchainSource>(
    source: &Source,
    network: ZebraNetwork,
    height: types::Height,
    pool_types: &PoolTypeFilter,
) -> Result<Option<zaino_proto::proto::compact_formats::CompactBlock>, ChainIndexError> {
    /// Wraps `reason` as an `InvalidData` I/O error attributed to the validator.
    fn invalid_data(
        reason: impl Into<Box<dyn std::error::Error + Send + Sync>>,
    ) -> ChainIndexError {
        ChainIndexError::backing_validator(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            reason,
        ))
    }

    let requested = HashOrHeight::Height(zebra_chain::block::Height(height.0));
    let fetched = source
        .get_block(requested)
        .await
        .map_err(ChainIndexError::backing_validator)?;
    let Some(block) = fetched else {
        return Ok(None);
    };

    // Cross-check that the validator answered with the block that was asked for.
    let Some(reported) = block.coinbase_height() else {
        return Err(invalid_data("validator returned a block without a height"));
    };
    let block_height = types::Height(reported.0);
    if block_height != height {
        return Err(invalid_data(format!(
            "validator returned block at height {}, expected {}",
            block_height.0, height.0
        )));
    }

    let tree_roots = source
        .get_commitment_tree_roots(types::BlockHash::from(block.hash()))
        .await
        .map_err(ChainIndexError::backing_validator)?;
    let (sapling_root, sapling_size, orchard_root, orchard_size) =
        TreeRootData::new(tree_roots.0, tree_roots.1).extract_with_defaults();

    let sapling_size = sapling_size
        .try_into()
        .map_err(|_| invalid_data("sapling commitment tree size overflow"))?;
    let orchard_size = orchard_size
        .try_into()
        .map_err(|_| invalid_data("orchard commitment tree size overflow"))?;
    let metadata = BlockMetadata::new(
        sapling_root,
        sapling_size,
        orchard_root,
        orchard_size,
        ChainWork::from_u256(0.into()),
        network,
    );

    let indexed_block =
        IndexedBlock::try_from(BlockWithMetadata::new(&block, metadata)).map_err(invalid_data)?;
    let compact = compact_block_with_pool_types(
        indexed_block.to_compact_block(),
        &pool_types.to_pool_types_vector(),
    );
    Ok(Some(compact))
}
impl<Source: BlockchainSource> NodeBackedChainIndexSubscriber<Source> {
fn source(&self) -> &Source {
&self.source
}
/// Combines the shared index status with the database reader and mempool
/// statuses.
///
/// The combined value is written back to the shared status before being
/// returned.
pub fn combined_status(&self) -> StatusType {
    let db_status = self.finalized_state.status();
    let pool_status = self.mempool.status();
    let own_status = self.status.load();
    let merged = own_status.combine(db_status).combine(pool_status);
    self.status.store(merged);
    merged
}
/// Counts the outputs of the finalised transaction `txid` that have no
/// recorded spender.
///
/// Outputs flagged by `is_unspendable_tx_out` are ignored. Returns 0 when the
/// transaction is not in the finalised state, has no transparent data, or has
/// no remaining outputs to check.
///
/// # Errors
/// Database failures are reported as internal errors carrying the database
/// error's message.
async fn count_finalised_unspent_outputs(
    &self,
    txid: TransactionHash,
) -> Result<u64, ChainIndexError> {
    use crate::chain_index::types::db::metadata::is_unspendable_tx_out;

    let db = &self.finalized_state;

    let tx_location = match db.get_tx_location(&txid).await {
        Ok(Some(location)) => location,
        Ok(None) => return Ok(0),
        Err(e) => return Err(ChainIndexError::internal(e.to_string())),
    };
    let transparent = match db.get_transparent(tx_location).await {
        Ok(Some(data)) => data,
        Ok(None) => return Ok(0),
        Err(e) => return Err(ChainIndexError::internal(e.to_string())),
    };

    // Outpoints keep the output's position within the transaction as index.
    let mut outpoints: Vec<Outpoint> = Vec::new();
    for (index, output) in transparent.outputs().iter().enumerate() {
        if !is_unspendable_tx_out(output) {
            outpoints.push(Outpoint::new(txid.0, index as u32));
        }
    }
    if outpoints.is_empty() {
        return Ok(0);
    }

    let spenders = db
        .get_outpoint_spenders(outpoints)
        .await
        .map_err(|e| ChainIndexError::internal(e.to_string()))?;
    let unspent = spenders
        .into_iter()
        .filter(|spender| spender.is_none())
        .count();
    Ok(unspent as u64)
}
/// Fetches the block `id` from the source and serialises it to bytes.
///
/// Returns `Ok(None)` when the source has no such block.
///
/// # Errors
/// Both fetch and serialisation failures are reported as backing-validator
/// errors.
async fn get_fullblock_bytes_from_node(
    &self,
    id: HashOrHeight,
) -> Result<Option<Vec<u8>>, ChainIndexError> {
    let fetched = self
        .source()
        .get_block(id)
        .await
        .map_err(ChainIndexError::backing_validator)?;
    match fetched {
        Some(block) => block
            .zcash_serialize_to_vec()
            .map(Some)
            .map_err(ChainIndexError::backing_validator),
        None => Ok(None),
    }
}
/// Builds the compact block at `height` from this subscriber's source and
/// network; see `compact_block_from_source`.
async fn get_compact_block_from_node(
    &self,
    height: types::Height,
    pool_types: &PoolTypeFilter,
) -> Result<Option<zaino_proto::proto::compact_formats::CompactBlock>, ChainIndexError> {
    let node = self.source();
    let network = self.network.clone();
    compact_block_from_source(node, network, height, pool_types).await
}
/// Resolves the height of the block `hash` using the snapshot first and the
/// finalised state second.
///
/// A block held by the snapshot only yields its height when it is on the
/// snapshot's best chain, i.e. when `heights_to_hashes` maps the block's height
/// to `hash`; otherwise `None` is returned without consulting the database.
/// Hashes unknown to the snapshot are looked up in the finalised state.
///
/// # Errors
/// A failing database lookup is reported as a database-hole error for `hash`.
async fn get_indexed_block_height(
    &self,
    snapshot: &NonfinalizedBlockCacheSnapshot,
    hash: types::BlockHash,
) -> Result<Option<types::Height>, ChainIndexError> {
    // Borrow the block instead of cloning it; only its height is needed.
    if let Some(block) = snapshot.blocks.get(&hash) {
        let height = block.context.index.height;
        // Keyed lookup at the block's own height replaces a scan over every
        // best-chain entry.
        let on_best_chain = snapshot.heights_to_hashes.get(&height) == Some(&hash);
        return Ok(on_best_chain.then_some(height));
    }
    self.finalized_state
        .get_block_height(hash)
        .await
        .map_err(|e| ChainIndexError::database_hole(hash, Some(Box::new(e))))
}
/// Collects every known block that contains the transaction `txid`.
///
/// The iterator yields the finalised block holding the transaction first (if
/// the finalised state knows the transaction), followed by clones of all
/// snapshot blocks that include it.
///
/// # Errors
/// Returns the database error when a finalised-state lookup fails.
async fn blocks_containing_transaction<'snapshot, 'self_lt, 'iter>(
    &'self_lt self,
    snapshot: &'snapshot NonfinalizedBlockCacheSnapshot,
    txid: [u8; 32],
) -> Result<impl Iterator<Item = IndexedBlock> + use<'iter, Source>, FinalisedStateError>
where
    'snapshot: 'iter,
    'self_lt: 'iter,
{
    let location = self
        .finalized_state
        .get_tx_location(&types::TransactionHash(txid))
        .await?;
    let finalized_block = match location {
        Some(tx_location) => {
            self.finalized_state
                .get_chain_block_by_height(crate::Height(tx_location.block_height()))
                .await?
        }
        None => None,
    };

    // Lazily scans the snapshot; blocks are cloned only when they match.
    let non_finalized_blocks = snapshot
        .blocks
        .values()
        .filter(move |block| {
            block
                .transactions()
                .iter()
                .any(|transaction| transaction.txid().0 == txid)
        })
        .cloned();

    Ok(finalized_block.into_iter().chain(non_finalized_blocks))
}
/// Resolves the height of the block `hash` by asking the source directly.
///
/// Returns `None` when the source does not know the block or when the block
/// lies above `max_serviceable_height`.
///
/// # Errors
/// Returns a backing-validator error when the fetch fails, and a
/// validator-data error when the block has no coinbase height.
async fn get_block_height_passthrough(
    &self,
    max_serviceable_height: &types::Height,
    hash: types::BlockHash,
) -> Result<Option<types::Height>, ChainIndexError> {
    let fetched = self
        .source()
        .get_block(HashOrHeight::Hash(hash.into()))
        .await
        .map_err(ChainIndexError::backing_validator)?;
    let Some(block) = fetched else {
        return Ok(None);
    };
    let Some(height) = block.coinbase_height() else {
        return Err(ChainIndexError::validator_data_error_block_coinbase_height_missing());
    };
    // Blocks above the serviceable ceiling are treated as unknown.
    if height <= *max_serviceable_height {
        Ok(Some(types::Height::from(height)))
    } else {
        Ok(None)
    }
}
/// Returns the height of the block the mempool currently reports as its chain
/// tip, as far as the snapshot knows that block.
///
/// Returns `None` when the snapshot holds no non-finalised state or does not
/// contain the mempool's tip block.
fn get_mempool_height(&self, snapshot: &ChainIndexSnapshot) -> Option<types::Height> {
    let ChainIndexSnapshot::NonFinalizedStateExists {
        non_finalized_snapshot,
    } = snapshot
    else {
        return None;
    };
    // Read the tip once so the lookup uses one consistent value, then use the
    // map's keyed lookup instead of scanning every entry.
    let mempool_tip = self.mempool.mempool_chain_tip();
    non_finalized_snapshot
        .blocks
        .get(&mempool_tip)
        .map(|block| block.height())
}
/// Returns the consensus branch id in force one block above the mempool's
/// tip, as a `u32`.
///
/// Returns `None` when the mempool height is unknown or no branch id applies
/// at that height.
fn mempool_branch_id(&self, snapshot: &ChainIndexSnapshot) -> Option<u32> {
    let tip_height = self.get_mempool_height(snapshot)?;
    let next_height = zebra_chain::block::Height::from(tip_height + 1);
    ConsensusBranchId::current(&self.network, next_height).map(u32::from)
}
}
impl<Source: BlockchainSource> Status for NodeBackedChainIndexSubscriber<Source> {
    /// Reports the combined index, database and mempool status.
    fn status(&self) -> StatusType {
        Self::combined_status(self)
    }
}
impl<Source: BlockchainSource> ChainIndex for NodeBackedChainIndexSubscriber<Source> {
type Snapshot = ChainIndexSnapshot;
type Error = ChainIndexError;
/// Captures the view that subsequent queries should be answered against.
///
/// When the non-finalised state has been initialised, its current snapshot is
/// returned. Otherwise the source is asked for its best block height and the
/// snapshot only records the matching finalised height floor.
///
/// # Errors
/// Returns a backing-validator error when the height query fails, and a
/// database-hole error when the source reports no best block.
async fn snapshot_nonfinalized_state(&self) -> Result<Self::Snapshot, Self::Error> {
    // `load_full` hands back an owned `Arc`, so nothing borrowed from the
    // `ArcSwapOption` is held across the `.await` below.
    if let Some(non_finalised_state) = self.non_finalized_state.load_full() {
        return Ok(ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot: non_finalised_state.get_snapshot(),
        });
    }

    let height = self
        .source
        .get_best_block_height()
        .await
        .map_err(ChainIndexError::backing_validator)?
        // Build the error only when it is actually needed.
        .ok_or_else(|| ChainIndexError::database_hole("validator has no best block", None))?;
    let validator_finalized_height = finalized_height_floor(height.0);
    Ok(ChainIndexSnapshot::StillSyncingFinalizedState {
        validator_finalized_height,
    })
}
/// Resolves the height of the block `hash`.
///
/// With a non-finalised snapshot the lookup goes through the snapshot and the
/// finalised state; while the finalised state is still syncing it is passed
/// through to the source, capped at the validator's finalised height.
async fn get_block_height(
    &self,
    snapshot: &Self::Snapshot,
    hash: types::BlockHash,
) -> Result<Option<types::Height>, Self::Error> {
    match snapshot {
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot: indexed,
        } => self.get_indexed_block_height(indexed, hash).await,
        ChainIndexSnapshot::StillSyncingFinalizedState {
            validator_finalized_height: ceiling,
        } => self.get_block_height_passthrough(ceiling, hash).await,
    }
}
/// Resolves the hash of the block at `height`.
///
/// With a non-finalised snapshot the snapshot's best chain is consulted first
/// and the finalised state second. While the finalised state is still syncing,
/// heights up to the validator's finalised height are fetched from the source
/// and anything above yields `None`.
async fn get_block_hash(
    &self,
    snapshot: &Self::Snapshot,
    height: types::Height,
) -> Result<Option<types::BlockHash>, Self::Error> {
    match snapshot {
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot,
        } => {
            if let Some(block_hash) = non_finalized_snapshot.heights_to_hashes.get(&height) {
                return Ok(Some(*block_hash));
            }
            self.finalized_state
                .get_block_hash(height)
                .await
                .map_err(Into::into)
        }
        ChainIndexSnapshot::StillSyncingFinalizedState {
            validator_finalized_height,
        } => {
            if height > *validator_finalized_height {
                return Ok(None);
            }
            let fetched = self
                .source()
                .get_block(HashOrHeight::Height(height.into()))
                .await
                .map_err(ChainIndexError::backing_validator)?;
            Ok(fetched.map(|block| block.hash().into()))
        }
    }
}
/// Fetches the indexed block with hash `target_hash`.
///
/// A block held by the snapshot is cloned and returned directly. Otherwise the
/// hash is resolved to a height and the block is read from the finalised state
/// at that height; `None` is returned when no height can be resolved.
async fn get_indexed_block_by_hash(
    &self,
    snapshot: &Self::Snapshot,
    target_hash: &types::BlockHash,
) -> Result<Option<IndexedBlock>, Self::Error> {
    if let Some(block) = snapshot.get_chainblock_by_hash(target_hash) {
        return Ok(Some(block.clone()));
    }
    let Some(height) = self.get_block_height(snapshot, *target_hash).await? else {
        return Ok(None);
    };
    let stored = self
        .finalized_state
        .get_chain_block_by_height(height)
        .await?;
    Ok(stored)
}
/// Fetches the indexed block at `target_height`, preferring the snapshot and
/// falling back to the finalised state.
async fn get_indexed_block_by_height(
    &self,
    snapshot: &Self::Snapshot,
    target_height: &types::Height,
) -> Result<Option<IndexedBlock>, Self::Error> {
    if let Some(block) = snapshot.get_chainblock_by_height(target_height) {
        return Ok(Some(block.clone()));
    }
    let stored = self
        .finalized_state
        .get_chain_block_by_height(*target_height)
        .await?;
    Ok(stored)
}
/// Streams serialised blocks for the heights `start..=end`.
///
/// `end` defaults to, and is capped at, the snapshot's maximum serviceable
/// height. Returns `None` when `start` lies above the capped end.
///
/// For every height the block hash is taken from the finalised state, then
/// from the snapshot, and as a last resort the block is requested from the
/// source by height. The block bytes always come from the source; a block the
/// source cannot provide is reported as a database hole.
fn get_block_range(
    &self,
    snapshot: &Self::Snapshot,
    start: types::Height,
    end: std::option::Option<types::Height>,
) -> Option<impl Stream<Item = Result<Vec<u8>, Self::Error>>> {
    let ceiling = *snapshot.max_serviceable_height();
    let end = end.unwrap_or(ceiling).min(ceiling);
    if start > ceiling.min(end) {
        return None;
    }

    let blocks = futures::stream::iter((start.0)..=(end.0)).then(move |height| async move {
        let finalized_hash = match self
            .finalized_state
            .get_block_hash(types::Height(height))
            .await
        {
            Ok(found) => found,
            Err(e) => {
                return Err(ChainIndexError {
                    kind: ChainIndexErrorKind::InternalServerError,
                    message: "".to_string(),
                    source: Some(Box::new(e)),
                })
            }
        };

        // Finalised height: fetch by the hash recorded in the database.
        if let Some(hash) = finalized_hash {
            return self
                .get_fullblock_bytes_from_node(HashOrHeight::Hash(hash.into()))
                .await?
                .ok_or(ChainIndexError::database_hole(hash, None));
        }

        match snapshot.get_chainblock_by_height(&types::Height(height)) {
            // Non-finalised height known to the snapshot: fetch by its hash.
            Some(block) => self
                .get_fullblock_bytes_from_node(HashOrHeight::Hash((*block.hash()).into()))
                .await?
                .ok_or(ChainIndexError::database_hole(block.hash(), None)),
            // Unknown to both: let the source resolve the height itself.
            None => self
                .get_fullblock_bytes_from_node(HashOrHeight::Height(
                    zebra_chain::block::Height(height),
                ))
                .await?
                .ok_or(ChainIndexError::database_hole(height, None)),
        }
    });
    Some(blocks)
}
/// Fetches the compact block at `height`, restricted to `pool_types`.
///
/// Returns `None` while the finalised state is still syncing and for heights
/// above the snapshot's best tip. Otherwise the block is taken from the
/// snapshot, then from the finalised state, and if the finalised-state read
/// fails it is rebuilt from the source.
///
/// # Errors
/// Returns the source error when the fallback fetch fails, and a database-hole
/// error when the source has no block at `height` either.
async fn get_compact_block(
    &self,
    snapshot: &Self::Snapshot,
    height: types::Height,
    pool_types: PoolTypeFilter,
) -> Result<Option<zaino_proto::proto::compact_formats::CompactBlock>, Self::Error> {
    match snapshot {
        ChainIndexSnapshot::StillSyncingFinalizedState {
            validator_finalized_height: _,
        } => Ok(None),
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot,
        } => {
            if height > non_finalized_snapshot.best_tip.height {
                return Ok(None);
            }
            if let Some(block) = snapshot.get_chainblock_by_height(&height) {
                return Ok(Some(compact_block_with_pool_types(
                    block.to_compact_block(),
                    &pool_types.to_pool_types_vector(),
                )));
            }
            let from_db = self
                .finalized_state
                .get_compact_block(height, pool_types.clone())
                .await;
            let compact = match from_db {
                Ok(block) => block,
                // The database error is dropped on purpose; the source is the fallback.
                Err(_) => self
                    .get_compact_block_from_node(height, &pool_types)
                    .await?
                    .ok_or(ChainIndexError::database_hole(height, None))?,
            };
            Ok(Some(compact))
        }
    }
}
/// Opens a stream of compact blocks from `start_height` to `end_height`,
/// restricted to `pool_types`.
///
/// The range is ascending when `start_height <= end_height` and descending
/// otherwise. Heights below `chain tip - 99` are served from the finalised
/// state's compact block stream; the remaining heights are served from the
/// snapshot, falling back to `compact_block_from_source` for heights the
/// snapshot does not hold.
///
/// Returns `Ok(None)` for a descending range that starts above the chain tip.
/// An ascending range that ends above the chain tip is served up to the tip
/// and then terminated with an `out_of_range` status.
///
/// Blocks are produced by a spawned task feeding a bounded channel of 128
/// items; the task stops as soon as the receiving side is dropped.
#[allow(clippy::type_complexity)]
async fn get_compact_block_stream(
&self,
nonfinalized_snapshot: &Self::Snapshot,
start_height: types::Height,
end_height: types::Height,
pool_types: PoolTypeFilter,
) -> Result<Option<CompactBlockStream>, Self::Error> {
let chain_tip_height = self.best_chaintip(nonfinalized_snapshot).await?.height;
// NOTE(review): 99 looks like `NON_FINALIZED_DEPTH - 1`; confirm and consider
// deriving it from the constant.
let lowest_nonfinalized_height = types::Height(chain_tip_height.0.saturating_sub(99));
let is_ascending = start_height <= end_height;
if !is_ascending && start_height > chain_tip_height {
return Ok(None);
}
let pool_types_vector = pool_types.to_pool_types_vector();
// Ascending requests past the tip are truncated and flagged at the end.
let needs_out_of_range = is_ascending && end_height > chain_tip_height;
let capped_end_height = if needs_out_of_range {
chain_tip_height
} else {
end_height
};
// Open the finalised part of the range up front so database errors surface
// to the caller instead of inside the spawned task.
let finalized_stream: Option<CompactBlockStream> = if is_ascending {
if start_height < lowest_nonfinalized_height {
let finalized_end_height = types::Height(std::cmp::min(
capped_end_height.0,
lowest_nonfinalized_height.0.saturating_sub(1),
));
if start_height <= finalized_end_height {
Some(
self.finalized_state
.get_compact_block_stream(
start_height,
finalized_end_height,
pool_types.clone(),
)
.await
.map_err(ChainIndexError::from)?,
)
} else {
None
}
} else {
None
}
} else if end_height < lowest_nonfinalized_height {
// Descending: the finalised part starts at `start_height`, or just below
// the non-finalised range when the request starts inside it.
let finalized_start_height = if start_height < lowest_nonfinalized_height {
start_height
} else {
types::Height(lowest_nonfinalized_height.0.saturating_sub(1))
};
Some(
self.finalized_state
.get_compact_block_stream(
finalized_start_height,
end_height,
pool_types.clone(),
)
.await
.map_err(ChainIndexError::from)?,
)
} else {
None
};
// Owned copies for the spawned task, which cannot borrow from `self`.
let nonfinalized_snapshot = nonfinalized_snapshot.clone();
let source = self.source.clone();
let network = self.network.clone();
let pool_types_for_node = pool_types.clone();
let (channel_sender, channel_receiver) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
if is_ascending {
// Ascending: finalised blocks first, then the non-finalised ones.
if let Some(mut finalized_stream) = finalized_stream {
while let Some(stream_item) = finalized_stream.next().await {
// A failed send means the receiver is gone; stop producing.
if channel_sender.send(stream_item).await.is_err() {
return;
}
}
}
let nonfinalized_start_height =
types::Height(std::cmp::max(start_height.0, lowest_nonfinalized_height.0));
for height_value in nonfinalized_start_height.0..=capped_end_height.0 {
let Some(indexed_block) = nonfinalized_snapshot
.get_chainblock_by_height(&types::Height(height_value))
else {
// Not in the snapshot: rebuild the compact block from the source.
match compact_block_from_source(
&source,
network.clone(),
types::Height(height_value),
&pool_types_for_node,
)
.await
{
Ok(Some(compact_block)) => {
if channel_sender.send(Ok(compact_block)).await.is_err() {
return;
}
continue;
}
Ok(None) => {
let _ = channel_sender
.send(Err(tonic::Status::internal(format!(
"Internal error, missing nonfinalized block at height [{height_value}].",
))))
.await;
return;
}
Err(error) => {
let _ = channel_sender
.send(Err(tonic::Status::internal(error.to_string())))
.await;
return;
}
}
};
let compact_block = compact_block_with_pool_types(
indexed_block.to_compact_block(),
&pool_types_vector,
);
if channel_sender.send(Ok(compact_block)).await.is_err() {
return;
}
}
// The requested end lay above the tip: end the stream with an error item.
if needs_out_of_range {
let _ = channel_sender
.send(Err(tonic::Status::out_of_range(format!(
"Error: Height out of range [{}]. Height requested is greater than the best chain tip [{}].",
end_height.0, chain_tip_height.0,
))))
.await;
}
} else {
// Descending: non-finalised blocks first (highest to lowest), then the
// finalised stream.
if start_height >= lowest_nonfinalized_height {
let nonfinalized_end_height =
types::Height(std::cmp::max(end_height.0, lowest_nonfinalized_height.0));
for height_value in (nonfinalized_end_height.0..=start_height.0).rev() {
let Some(indexed_block) = nonfinalized_snapshot
.get_chainblock_by_height(&types::Height(height_value))
else {
// Not in the snapshot: rebuild the compact block from the source.
match compact_block_from_source(
&source,
network.clone(),
types::Height(height_value),
&pool_types_for_node,
)
.await
{
Ok(Some(compact_block)) => {
if channel_sender.send(Ok(compact_block)).await.is_err() {
return;
}
continue;
}
Ok(None) => {
let _ = channel_sender
.send(Err(tonic::Status::internal(format!(
"Internal error, missing nonfinalized block at height [{height_value}].",
))))
.await;
return;
}
Err(error) => {
let _ = channel_sender
.send(Err(tonic::Status::internal(error.to_string())))
.await;
return;
}
}
};
let compact_block = compact_block_with_pool_types(
indexed_block.to_compact_block(),
&pool_types_vector,
);
if channel_sender.send(Ok(compact_block)).await.is_err() {
return;
}
}
}
if let Some(mut finalized_stream) = finalized_stream {
while let Some(stream_item) = finalized_stream.next().await {
if channel_sender.send(stream_item).await.is_err() {
return;
}
}
}
}
});
Ok(Some(CompactBlockStream::new(channel_receiver)))
}
/// Fetches the serialised transaction `txid` together with its consensus
/// branch id.
///
/// The mempool is consulted first; a mempool hit is paired with the branch id
/// in force one block above the mempool's tip. Otherwise the source is asked.
/// A best-chain transaction uses the height reported by the source. A
/// non-best-chain transaction uses the height of the first known block that
/// contains it, or yields `None` when no such block or no non-finalised
/// snapshot is available. A transaction the source reports as being in the
/// mempool yields `None`.
async fn get_raw_transaction(
    &self,
    snapshot: &Self::Snapshot,
    txid: &types::TransactionHash,
) -> Result<Option<(Vec<u8>, Option<u32>)>, Self::Error> {
    let mempool_key = mempool::MempoolKey {
        txid: txid.to_rpc_hex(),
    };
    if let Some(mempool_tx) = self.mempool.get_transaction(&mempool_key).await {
        let bytes = mempool_tx.serialized_tx.as_ref().as_ref().to_vec();
        return Ok(Some((bytes, self.mempool_branch_id(snapshot))));
    }

    let fetched = self
        .source()
        .get_transaction(*txid)
        .await
        .map_err(ChainIndexError::backing_validator)?;
    let Some((transaction, location)) = fetched else {
        return Ok(None);
    };

    let height = match location {
        GetTransactionLocation::BestChain(height) => height,
        GetTransactionLocation::Mempool => return Ok(None),
        GetTransactionLocation::NonbestChain => {
            let Some(non_finalized_snapshot) = snapshot.get_nfs_snapshot() else {
                return Ok(None);
            };
            let mut containing_blocks = self
                .blocks_containing_transaction(non_finalized_snapshot, txid.0)
                .await?;
            match containing_blocks.next() {
                Some(block) => block.context.index.height.into(),
                None => return Ok(None),
            }
        }
    };

    let raw = zebra_chain::transaction::SerializedTransaction::from(transaction)
        .as_ref()
        .to_vec();
    let branch_id = ConsensusBranchId::current(&self.network, height).map(u32::from);
    Ok(Some((raw, branch_id)))
}
/// Reports every location at which transaction `txid` is known, as seen from `snapshot`.
///
/// The first tuple element is the best-chain location: a block, or the mempool when the
/// mempool is built on the snapshot's best tip. The second element collects all other
/// sightings: blocks at or above the lowest non-finalized height that are not the best-chain
/// block for their height, plus a mempool entry when the mempool sits on a different tip.
///
/// While the finalized state is still syncing, only a best-chain block at or below the
/// validator's finalized height is reported.
///
/// # Errors
/// * a database-hole error when the non-finalized snapshot holds no heights;
/// * `ChainIndexErrorKind::InvalidSnapshot` when both the best chain and an up-to-date
///   mempool contain the transaction;
/// * source failures, wrapped with `ChainIndexError::backing_validator`.
async fn get_transaction_status(
    &self,
    snapshot: &Self::Snapshot,
    txid: &types::TransactionHash,
) -> Result<(Option<BestChainLocation>, HashSet<NonBestChainLocation>), ChainIndexError> {
    match snapshot {
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot,
        } => {
            let containing_blocks: Vec<_> = self
                .blocks_containing_transaction(non_finalized_snapshot, txid.0)
                .await?
                .collect();

            let best_chain_hashes = &non_finalized_snapshot.heights_to_hashes;
            let Some(lowest_nonfinalized_height) = best_chain_hashes.keys().min() else {
                return Err(ChainIndexError::database_hole("no blocks", None));
            };

            // A block counts as best-chain when it is the snapshot's block for its height,
            // or when it lies below the non-finalized range.
            let mut best_chain_location = containing_blocks
                .iter()
                .find(|block| {
                    let height = block.height();
                    best_chain_hashes.get(&height) == Some(block.hash())
                        || height < *lowest_nonfinalized_height
                })
                .map(|block| BestChainLocation::Block(*block.hash(), block.height()));

            // Everything else inside the non-finalized range is a side-chain sighting.
            let mut non_best_chain_locations: HashSet<NonBestChainLocation> = containing_blocks
                .iter()
                .filter(|block| {
                    let height = block.height();
                    best_chain_hashes.get(&height) != Some(block.hash())
                        && height >= *lowest_nonfinalized_height
                })
                .map(|block| NonBestChainLocation::Block(*block.hash(), block.height()))
                .collect();

            let mempool_key = mempool::MempoolKey {
                txid: txid.to_rpc_hex(),
            };
            if self.mempool.contains_txid(&mempool_key).await {
                let mempool_tip_hash = self.mempool.mempool_chain_tip();
                if mempool_tip_hash == non_finalized_snapshot.best_tip.hash {
                    // The mempool extends the snapshot's best tip, so a mempool sighting is a
                    // best-chain sighting and must not coexist with a mined one.
                    if best_chain_location.is_some() {
                        return Err(ChainIndexError {
                            kind: ChainIndexErrorKind::InvalidSnapshot,
                            message:
                                "Best chain and up-to-date mempool both contain the same transaction"
                                    .to_string(),
                            source: None,
                        });
                    }
                    best_chain_location = Some(BestChainLocation::Mempool(
                        non_finalized_snapshot.best_tip.height + 1,
                    ));
                } else if let ChainIndexSnapshot::NonFinalizedStateExists {
                    non_finalized_snapshot: fresh_snapshot,
                } = self.snapshot_nonfinalized_state().await?
                {
                    // The mempool sits on another tip; look that tip up in a fresh snapshot
                    // to learn the height the mempool is targeting (if the tip is known).
                    let mempool_height = fresh_snapshot.blocks.iter().find_map(|(hash, block)| {
                        (*hash == mempool_tip_hash).then(|| block.height() + 1)
                    });
                    non_best_chain_locations.insert(NonBestChainLocation::Mempool(mempool_height));
                }
            }

            Ok((best_chain_location, non_best_chain_locations))
        }
        ChainIndexSnapshot::StillSyncingFinalizedState {
            validator_finalized_height,
        } => {
            let located = self
                .source()
                .get_transaction(*txid)
                .await
                .map_err(ChainIndexError::backing_validator)?;
            let Some((_transaction, GetTransactionLocation::BestChain(height))) = located else {
                return Ok((None, HashSet::new()));
            };

            if height <= *validator_finalized_height {
                let finalized_block = self
                    .source()
                    .get_block(HashOrHeight::Height(height))
                    .await
                    .map_err(ChainIndexError::backing_validator)?;
                if let Some(block) = finalized_block {
                    return Ok((
                        Some(BestChainLocation::Block(block.hash().into(), height.into())),
                        HashSet::new(),
                    ));
                }
            }
            Ok((None, HashSet::new()))
        }
    }
}
/// Lists the ids of all transactions currently held by the mempool.
///
/// # Errors
/// Fails on the first mempool key whose `txid` string cannot be parsed as hex; the parse
/// error is wrapped with `ChainIndexError::backing_validator`.
async fn get_mempool_txids(&self) -> Result<Vec<types::TransactionHash>, Self::Error> {
    let mut txids = Vec::new();
    for (txid_key, _) in self.mempool.get_mempool().await {
        let txid = TransactionHash::from_hex(&txid_key.txid)
            .map_err(ChainIndexError::backing_validator)?;
        txids.push(txid);
    }
    Ok(txids)
}
/// Returns the serialized bytes of the mempool transactions that remain after the mempool
/// applies `exclude_list` as its filter.
async fn get_mempool_transactions(
    &self,
    exclude_list: Vec<String>,
) -> Result<Vec<Vec<u8>>, Self::Error> {
    let filtered: Vec<(mempool::MempoolKey, mempool::MempoolValue)> =
        self.mempool.get_filtered_mempool(exclude_list).await;

    let mut raw_transactions: Vec<Vec<u8>> = Vec::with_capacity(filtered.len());
    for (_key, entry) in filtered {
        raw_transactions.push(entry.serialized_tx.as_ref().as_ref().to_vec());
    }
    Ok(raw_transactions)
}
fn get_mempool_stream(
&self,
snapshot: Option<&Self::Snapshot>,
) -> Option<impl futures::Stream<Item = Result<Vec<u8>, Self::Error>>> {
let non_finalized_snapshot = match snapshot {
Some(s) => match s {
ChainIndexSnapshot::NonFinalizedStateExists {
non_finalized_snapshot,
} => Some(non_finalized_snapshot),
ChainIndexSnapshot::StillSyncingFinalizedState { .. } => return None,
},
None => None,
};
let expected_chain_tip = non_finalized_snapshot.map(|snapshot| snapshot.best_tip.hash);
let mut subscriber = self.mempool.clone();
match subscriber
.get_mempool_stream(expected_chain_tip)
.now_or_never()
{
Some(Ok((in_rx, _handle))) => {
let (out_tx, out_rx) =
tokio::sync::mpsc::channel::<Result<Vec<u8>, ChainIndexError>>(32);
tokio::spawn(async move {
let mut in_stream = tokio_stream::wrappers::ReceiverStream::new(in_rx);
while let Some(item) = in_stream.next().await {
match item {
Ok((_key, value)) => {
let _ = out_tx
.send(Ok(value.serialized_tx.as_ref().as_ref().to_vec()))
.await;
}
Err(e) => {
let _ = out_tx
.send(Err(ChainIndexError::child_process_status_error(
"mempool", e,
)))
.await;
break;
}
}
}
});
Some(tokio_stream::wrappers::ReceiverStream::new(out_rx))
}
Some(Err(crate::error::MempoolError::IncorrectChainTip { .. })) => None,
Some(Err(e)) => {
let (out_tx, out_rx) =
tokio::sync::mpsc::channel::<Result<Vec<u8>, ChainIndexError>>(1);
let _ = out_tx.try_send(Err(e.into()));
Some(tokio_stream::wrappers::ReceiverStream::new(out_rx))
}
None => {
let (out_tx, out_rx) =
tokio::sync::mpsc::channel::<Result<Vec<u8>, ChainIndexError>>(1);
let _ = out_tx.try_send(Err(ChainIndexError::child_process_status_error(
"mempool",
crate::error::StatusError {
server_status: crate::StatusType::RecoverableError,
},
)));
Some(tokio_stream::wrappers::ReceiverStream::new(out_rx))
}
}
}
/// Finds the most recent ancestor of `hash` (possibly `hash` itself) that lies on the best
/// chain, returning its hash and height.
///
/// With a non-finalized snapshot, the search walks parent hashes through the snapshot until
/// it reaches a block that is the snapshot's entry for its height. A hash the snapshot does
/// not know is looked up in the finalized state instead.
///
/// While the finalized state is still syncing, the block is fetched from the backing source
/// and reported only if its coinbase height is at or below the validator's finalized height.
///
/// Returns `Ok(None)` when no such block is known.
///
/// # Errors
/// A database-hole error when the finalized-state lookup fails, a validator-data error when
/// a fetched block has no coinbase height, and `ChainIndexError::backing_validator` for
/// source failures.
async fn find_fork_point(
    &self,
    snapshot: &Self::Snapshot,
    hash: &types::BlockHash,
) -> Result<Option<(types::BlockHash, types::Height)>, Self::Error> {
    match snapshot {
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot,
        } => {
            let mut cursor = *hash;
            loop {
                let Some(block) = non_finalized_snapshot.get_chainblock_by_hash(&cursor) else {
                    // Not in the snapshot: the finalized state has the final say.
                    return match self.finalized_state.get_block_height(cursor).await {
                        Ok(Some(height)) => Ok(Some((cursor, height))),
                        Ok(None) => Ok(None),
                        Err(e) => Err(ChainIndexError::database_hole(&cursor, Some(Box::new(e)))),
                    };
                };

                let on_best_chain = non_finalized_snapshot
                    .heights_to_hashes
                    .get(&block.height())
                    == Some(block.hash());
                if on_best_chain {
                    return Ok(Some((*block.hash(), block.height())));
                }
                // Side-chain block: step to its parent and try again.
                cursor = block.context.parent_hash;
            }
        }
        ChainIndexSnapshot::StillSyncingFinalizedState {
            validator_finalized_height,
        } => {
            let fetched = self
                .source()
                .get_block(HashOrHeight::Hash(zebra_chain::block::Hash::from(*hash)))
                .await
                .map_err(ChainIndexError::backing_validator)?;
            let Some(block) = fetched else {
                return Ok(None);
            };
            let Some(height) = block.coinbase_height() else {
                return Err(ChainIndexError::validator_data_error_block_coinbase_height_missing());
            };

            Ok((height <= *validator_finalized_height).then(|| {
                (
                    types::BlockHash::from(block.hash()),
                    types::Height::from(height),
                )
            }))
        }
    }
}
/// Fetches the treestate pair for block `hash` from the backing source.
///
/// The two optional byte vectors are returned exactly as the source provides them.
///
/// # Errors
/// Any source failure is reported as `ChainIndexErrorKind::InternalServerError`, with the
/// original error attached as the source.
async fn get_treestate(
    &self,
    hash: &types::BlockHash,
) -> Result<(Option<Vec<u8>>, Option<Vec<u8>>), Self::Error> {
    self.source()
        .get_treestate(*hash)
        .await
        .map_err(|e| ChainIndexError {
            kind: ChainIndexErrorKind::InternalServerError,
            message: "failed to fetch treestate from validator".to_string(),
            source: Some(Box::new(e)),
        })
}
/// Fetches subtree roots for `pool` from the backing source.
///
/// `start_index` and `max_entries` are forwarded to the source unchanged.
///
/// # Errors
/// Source failures are wrapped with `ChainIndexError::backing_validator`.
async fn get_subtree_roots(
    &self,
    pool: ShieldedPool,
    start_index: u16,
    max_entries: Option<u16>,
) -> Result<Vec<([u8; 32], u32)>, Self::Error> {
    let roots = self
        .source()
        .get_subtree_roots(pool, start_index, max_entries)
        .await;
    roots.map_err(ChainIndexError::backing_validator)
}
/// Forwards an address-deltas query to the backing source.
///
/// # Errors
/// Source failures are wrapped with `ChainIndexError::backing_validator`.
async fn get_address_deltas(
    &self,
    params: GetAddressDeltasParams,
) -> Result<GetAddressDeltasResponse, Self::Error> {
    let deltas = self.source().get_address_deltas(params).await;
    deltas.map_err(ChainIndexError::backing_validator)
}
/// Forwards an address-balance query to the backing source.
///
/// # Errors
/// Source failures are wrapped with `ChainIndexError::backing_validator`.
async fn get_address_balance(
    &self,
    address_strings: GetAddressBalanceRequest,
) -> Result<AddressBalance, Self::Error> {
    let balance = self.source().get_address_balance(address_strings).await;
    balance.map_err(ChainIndexError::backing_validator)
}
/// Forwards an address-transaction-ids query to the backing source.
///
/// # Errors
/// Source failures are wrapped with `ChainIndexError::backing_validator`.
async fn get_address_txids(
    &self,
    request: GetAddressTxIdsRequest,
) -> Result<Vec<types::TransactionHash>, Self::Error> {
    let txids = self.source().get_address_txids(request).await;
    txids.map_err(ChainIndexError::backing_validator)
}
/// Forwards an address-UTXO query to the backing source.
///
/// # Errors
/// Source failures are wrapped with `ChainIndexError::backing_validator`.
async fn get_address_utxos(
    &self,
    address_strings: GetAddressBalanceRequest,
) -> Result<Vec<GetAddressUtxos>, Self::Error> {
    let utxos = self.source().get_address_utxos(address_strings).await;
    utxos.map_err(ChainIndexError::backing_validator)
}
/// Returns the mempool's current `MempoolInfo`, exactly as reported by the mempool handle.
///
/// This call is infallible: it simply awaits `get_mempool_info` on `self.mempool`.
async fn get_mempool_info(&self) -> MempoolInfo {
self.mempool.get_mempool_info().await
}
/// Returns the best chain tip visible through `snapshot`.
///
/// With a non-finalized snapshot this is the snapshot's recorded `best_tip`. While the
/// finalized state is still syncing, the tip is the validator's finalized height paired with
/// the hash of the block the backing source holds at that height.
///
/// # Errors
/// A database-hole error when the source fails to answer, or has no block at the validator's
/// finalized height.
async fn best_chaintip(&self, snapshot: &Self::Snapshot) -> Result<BlockIndex, Self::Error> {
    match snapshot {
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot,
        } => Ok(non_finalized_snapshot.best_tip),
        ChainIndexSnapshot::StillSyncingFinalizedState {
            validator_finalized_height,
        } => {
            let tip_block = self
                .source()
                .get_block(HashOrHeight::Height((*validator_finalized_height).into()))
                .await
                .map_err(|e| {
                    ChainIndexError::database_hole(validator_finalized_height, Some(Box::new(e)))
                })?
                // Build the error lazily so the success path does not pay for it.
                .ok_or_else(|| ChainIndexError::database_hole(validator_finalized_height, None))?;

            Ok(BlockIndex {
                height: *validator_finalized_height,
                hash: tip_block.hash().into(),
            })
        }
    }
}
/// Computes transparent UTXO-set statistics at the current best tip.
///
/// The finalized state supplies a running accumulator. The best-chain blocks of the
/// non-finalized snapshot are then replayed on top of it in ascending height order: every
/// spendable transparent output is added, every non-null transparent input removes the output
/// it spends, and the count of transactions with at least one unspent output is kept in step.
///
/// Returns `GetTxOutSetInfoResponse::Empty` while the finalized state is still syncing. That
/// decision is taken before anything else is fetched, so it neither costs a validator
/// round-trip nor can it fail.
///
/// # Errors
/// An internal error when the finalized accumulator or a spent output cannot be read, when a
/// snapshot height has no block, when a counter would overflow or underflow, or when the
/// final `bytes_serialized` does not equal `transaction_outputs * ZAINO_TXOUTSET_ENTRY_LEN`.
async fn get_tx_out_set_info(&self) -> Result<GetTxOutSetInfoResponse, Self::Error> {
    use crate::chain_index::types::db::metadata::{
        is_unspendable_tx_out, ZAINO_TXOUTSET_ENTRY_LEN,
    };
    use hex::ToHex as _;
    use std::collections::HashMap;

    let snapshot = self.snapshot_nonfinalized_state().await?;
    let non_finalized_snapshot = match &snapshot {
        ChainIndexSnapshot::NonFinalizedStateExists {
            non_finalized_snapshot,
        } => non_finalized_snapshot,
        ChainIndexSnapshot::StillSyncingFinalizedState { .. } => {
            return Ok(GetTxOutSetInfoResponse::Empty(EmptyTxOutSetInfo {}));
        }
    };
    // Same value `best_chaintip` yields for a non-finalized snapshot, read only once the
    // still-syncing case has been ruled out.
    let best_tip = non_finalized_snapshot.best_tip;

    let mut accumulator = self
        .finalized_state
        .get_tx_out_set_info_accumulator()
        .await
        .map_err(|e| {
            ChainIndexError::internal(format!(
                "get_tx_out_set_info: finalised accumulator unavailable: {e}"
            ))
        })?;

    // Outputs created inside the non-finalized range that have not been spent yet.
    let mut nfs_created: HashMap<Outpoint, TxOutCompact> = HashMap::new();
    // Remaining unspent-output count per transaction touched during the replay.
    let mut tx_unspent_count: HashMap<TransactionHash, u64> = HashMap::new();

    let mut heights: Vec<types::Height> = non_finalized_snapshot
        .heights_to_hashes
        .keys()
        .copied()
        .collect();
    // Heights are unique map keys, so an unstable sort yields the same order.
    heights.sort_unstable();

    for height in heights {
        let Some(block) = non_finalized_snapshot.get_chainblock_by_height(&height) else {
            return Err(ChainIndexError::internal(format!(
                "get_tx_out_set_info: non-finalised snapshot height {height:?} has no block"
            )));
        };
        for tx in block.transactions() {
            let txid = *tx.txid();
            let transparent = tx.transparent();

            for (output_index, output) in transparent.outputs().iter().enumerate() {
                if is_unspendable_tx_out(output) {
                    continue;
                }
                let outpoint = Outpoint::new(txid.0, output_index as u32);
                accumulator
                    .apply_added_output(&outpoint, output)
                    .map_err(|e| ChainIndexError::internal(e.to_string()))?;
                nfs_created.insert(outpoint, *output);

                let entry = tx_unspent_count.entry(txid).or_insert(0);
                let prev = *entry;
                *entry += 1;
                // First unspent output of this transaction: it now counts towards the set.
                if prev == 0 {
                    accumulator.transactions =
                        accumulator.transactions.checked_add(1).ok_or_else(|| {
                            ChainIndexError::internal(
                                "get_tx_out_set_info: transactions counter overflow"
                                    .to_string(),
                            )
                        })?;
                }
            }

            for input in transparent.inputs() {
                if input.is_null_prevout() {
                    continue;
                }
                let outpoint = Outpoint::new(*input.prevout_txid(), input.prevout_index());
                let prev_txid = TransactionHash::from(*outpoint.prev_txid());

                // The spent output was created either within the replayed range or in the
                // finalized state.
                let prev_out_from_nfs = nfs_created.remove(&outpoint);
                let prev_out = match prev_out_from_nfs {
                    Some(out) => out,
                    None => self
                        .finalized_state
                        .get_previous_output(outpoint)
                        .await
                        .map_err(|e| {
                            ChainIndexError::internal(format!(
                                "get_tx_out_set_info: finalised prev output for {outpoint:?} not found: {e}"
                            ))
                        })?,
                };
                accumulator
                    .apply_removed_output(&outpoint, &prev_out)
                    .map_err(|e| ChainIndexError::internal(e.to_string()))?;

                // First time this transaction is touched: seed its counter from the
                // finalized state.
                if let std::collections::hash_map::Entry::Vacant(e) =
                    tx_unspent_count.entry(prev_txid)
                {
                    let seed = self
                        .count_finalised_unspent_outputs(prev_txid)
                        .await
                        .map_err(|e| {
                            ChainIndexError::internal(format!(
                                "get_tx_out_set_info: cannot seed unspent counter for {prev_txid:?}: {e}"
                            ))
                        })?;
                    e.insert(seed);
                }
                let entry = tx_unspent_count.get_mut(&prev_txid).expect("seeded above");
                if *entry == 0 {
                    return Err(ChainIndexError::internal(format!(
                        "get_tx_out_set_info: tx {prev_txid:?} unspent counter underflow"
                    )));
                }
                *entry -= 1;
                // Last unspent output gone: the transaction leaves the set.
                if *entry == 0 {
                    accumulator.transactions =
                        accumulator.transactions.checked_sub(1).ok_or_else(|| {
                            ChainIndexError::internal(
                                "get_tx_out_set_info: transactions counter underflow"
                                    .to_string(),
                            )
                        })?;
                }
            }
        }
    }

    // Sanity check: every counted output must account for one fixed-size entry.
    let expected_bytes = accumulator
        .transaction_outputs
        .checked_mul(ZAINO_TXOUTSET_ENTRY_LEN)
        .ok_or_else(|| {
            ChainIndexError::internal(
                "get_tx_out_set_info: bytes_serialized invariant overflow".to_string(),
            )
        })?;
    if accumulator.bytes_serialized != expected_bytes {
        return Err(ChainIndexError::internal(format!(
            "get_tx_out_set_info: bytes_serialized invariant violated (got {}, expected {})",
            accumulator.bytes_serialized, expected_bytes
        )));
    }

    let total_amount = accumulator.total_zatoshis as f64 / 1e8;
    let hash_serialized: String = accumulator.hash_serialized.encode_hex();
    let best_block: String = best_tip.hash.encode_hex();
    Ok(GetTxOutSetInfoResponse::Info(GetTxOutSetInfo {
        height: best_tip.height.0.into(),
        best_block,
        transactions: accumulator.transactions,
        txouts: accumulator.transaction_outputs,
        bytes_serialized: accumulator.bytes_serialized,
        hash_serialized,
        total_amount,
    }))
}
}
/// Identifies one of the shielded pools.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ShieldedPool {
    /// The Sapling pool.
    Sapling,
    /// The Orchard pool.
    Orchard,
}

impl ShieldedPool {
    /// Returns the pool's lowercase name (`"sapling"` or `"orchard"`) as an owned string.
    pub fn pool_string(&self) -> String {
        let name = match self {
            Self::Sapling => "sapling",
            Self::Orchard => "orchard",
        };
        name.to_string()
    }
}
/// Lets an `Arc`-wrapped snapshot be used wherever a snapshot is expected by forwarding every
/// method to the wrapped value.
impl<T> NonFinalizedSnapshot for Arc<T>
where
    T: NonFinalizedSnapshot,
{
    /// Forwards to the wrapped snapshot.
    fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> {
        // Dereference explicitly so the call resolves to `T`'s impl, not this one.
        (**self).get_chainblock_by_hash(target_hash)
    }

    /// Forwards to the wrapped snapshot.
    fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> {
        (**self).get_chainblock_by_height(target_height)
    }

    /// Forwards to the wrapped snapshot.
    fn max_serviceable_height(&self) -> &types::Height {
        (**self).max_serviceable_height()
    }
}
/// Read-only lookups shared by the snapshot types in this module.
///
/// Implemented here for `NonfinalizedBlockCacheSnapshot`, for `ChainIndexSnapshot`, and for
/// any `Arc` wrapping an implementor.
pub trait NonFinalizedSnapshot {
/// Returns the block with hash `target_hash`, or `None` if the snapshot does not hold it.
fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock>;
/// Returns the block recorded at `target_height`, or `None` if the snapshot does not hold
/// one.
fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock>;
/// Returns a height associated with the snapshot: the best-tip height for a block-cache
/// snapshot, or the validator's finalized height while still syncing.
// NOTE(review): presumably the highest height this snapshot can answer queries for —
// confirm against callers.
fn max_serviceable_height(&self) -> &types::Height;
}
/// Lookups over the block cache held by a `NonfinalizedBlockCacheSnapshot`.
impl NonFinalizedSnapshot for NonfinalizedBlockCacheSnapshot {
    /// Returns the cached block whose hash equals `target_hash`, if any.
    fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> {
        self.blocks
            .iter()
            .find_map(|(hash, chainblock)| (hash == target_hash).then_some(chainblock))
    }

    /// Returns the block recorded in `heights_to_hashes` for `target_height`, if any.
    ///
    /// The height is resolved with a direct keyed lookup instead of scanning every entry;
    /// the result is `None` when the height is absent or its hash has no cached block.
    fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> {
        self.heights_to_hashes
            .get(target_height)
            .and_then(|hash| self.get_chainblock_by_hash(hash))
    }

    /// Returns the height of the snapshot's best tip.
    fn max_serviceable_height(&self) -> &types::Height {
        &self.best_tip.height
    }
}
/// Lookups over a `ChainIndexSnapshot`: answered by the inner non-finalized snapshot when one
/// exists, and empty while the finalized state is still syncing.
impl NonFinalizedSnapshot for ChainIndexSnapshot {
    /// Returns the block with hash `target_hash` from the inner snapshot; always `None` while
    /// still syncing.
    fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> {
        match self {
            Self::StillSyncingFinalizedState { .. } => None,
            Self::NonFinalizedStateExists {
                non_finalized_snapshot: inner,
            } => inner.get_chainblock_by_hash(target_hash),
        }
    }

    /// Returns the block at `target_height` from the inner snapshot; always `None` while
    /// still syncing.
    fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> {
        match self {
            Self::StillSyncingFinalizedState { .. } => None,
            Self::NonFinalizedStateExists {
                non_finalized_snapshot: inner,
            } => inner.get_chainblock_by_height(target_height),
        }
    }

    /// Returns the inner snapshot's serviceable height, or the validator's finalized height
    /// while still syncing.
    fn max_serviceable_height(&self) -> &types::Height {
        match self {
            Self::StillSyncingFinalizedState {
                validator_finalized_height,
            } => validator_finalized_height,
            Self::NonFinalizedStateExists {
                non_finalized_snapshot: inner,
            } => inner.max_serviceable_height(),
        }
    }
}