use std::{
ops::Range,
sync::{
Arc,
atomic::{self, AtomicBool},
},
time::Duration,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use zcash_client_backend::{
data_api::chain::ChainState,
proto::{
compact_formats::CompactBlock,
service::{
BlockId, GetAddressUtxosReply, RawTransaction, TreeState,
compact_tx_streamer_client::CompactTxStreamerClient,
},
},
};
use zcash_primitives::transaction::{Transaction, TxId};
use zcash_protocol::consensus::{self, BlockHeight};
#[cfg(not(feature = "darkside_test"))]
use zcash_client_backend::proto::service::SubtreeRoot;
use crate::error::{MempoolError, ServerError};
pub(crate) mod fetch;
/// Request message sent through an `UnboundedSender<FetchRequest>` by the
/// helper functions in this module.
///
/// Every variant carries a [`oneshot::Sender`] as its first field; the helper
/// that created the request awaits the matching receiver for a `Result` whose
/// error type is [`tonic::Status`]. Any remaining fields are the request
/// parameters.
#[derive(Debug)]
pub enum FetchRequest {
/// Chain tip request; the reply is a [`BlockId`].
ChainTip(oneshot::Sender<Result<BlockId, tonic::Status>>),
/// Request for the [`CompactBlock`] at the given block height.
CompactBlock(
oneshot::Sender<Result<CompactBlock, tonic::Status>>,
BlockHeight,
),
/// Request for a stream of [`CompactBlock`]s over the given range of block
/// heights.
CompactBlockRange(
oneshot::Sender<Result<tonic::Streaming<CompactBlock>, tonic::Status>>,
Range<BlockHeight>,
),
/// Request for the [`TreeState`] at the given block height.
TreeState(
oneshot::Sender<Result<TreeState, tonic::Status>>,
BlockHeight,
),
/// Request for the [`RawTransaction`] identified by the given [`TxId`].
Transaction(oneshot::Sender<Result<RawTransaction, tonic::Status>>, TxId),
/// UTXO metadata request. The tuple holds the transparent addresses and a
/// start height, in that order.
// NOTE(review): presumably allowed as dead code because the only constructor
// visible in this module, `get_utxo_metadata`, is itself marked dead code - confirm.
#[allow(dead_code)]
UtxoMetadata(
oneshot::Sender<Result<Vec<GetAddressUtxosReply>, tonic::Status>>,
(Vec<String>, BlockHeight),
),
/// Request for a stream of [`RawTransaction`]s for one transparent address.
/// The tuple holds the address and the block range, in that order.
TransparentAddressTxs(
oneshot::Sender<Result<tonic::Streaming<RawTransaction>, tonic::Status>>,
(String, Range<BlockHeight>),
),
/// Request for a stream of [`SubtreeRoot`]s. The parameters are, in order:
/// start index, shielded protocol and maximum number of entries.
///
/// Only compiled when the `darkside_test` feature is disabled.
#[cfg(not(feature = "darkside_test"))]
SubtreeRoots(
oneshot::Sender<Result<tonic::Streaming<SubtreeRoot>, tonic::Status>>,
u32,
i32,
u32,
),
}
/// Gets the height of the chain tip.
///
/// Sends a [`FetchRequest::ChainTip`] through `fetch_request_sender`, awaits the
/// reply and converts the height of the returned [`BlockId`] into a
/// [`BlockHeight`].
///
/// # Errors
///
/// Returns a [`ServerError`] converted from the [`tonic::Status`] if the reply
/// is an error.
///
/// # Panics
///
/// Panics if the request receiver or the reply sender has been dropped, or if
/// the reported height does not fit in a `u32`.
pub(crate) async fn get_chain_height(
    fetch_request_sender: UnboundedSender<FetchRequest>,
) -> Result<BlockHeight, ServerError> {
    let (reply_sender, reply_receiver) = oneshot::channel();
    fetch_request_sender
        .send(FetchRequest::ChainTip(reply_sender))
        .expect("receiver should never be dropped");
    let chain_tip = reply_receiver
        .await
        .expect("sender should never be dropped")?;

    // Checked conversion, matching the other height conversions in this module:
    // an `as u32` cast would silently truncate an out-of-range height into a
    // plausible-looking but wrong one.
    let height = u32::try_from(chain_tip.height).expect("should be valid u32");

    Ok(BlockHeight::from_u32(height))
}
/// Gets the compact block at `block_height`.
///
/// Sends a [`FetchRequest::CompactBlock`] through `fetch_request_sender` and
/// awaits the reply.
///
/// # Errors
///
/// Returns a [`ServerError`] converted from the [`tonic::Status`] if the reply
/// is an error.
///
/// # Panics
///
/// Panics if the request receiver or the reply sender has been dropped.
pub(crate) async fn get_compact_block(
    fetch_request_sender: UnboundedSender<FetchRequest>,
    block_height: BlockHeight,
) -> Result<CompactBlock, ServerError> {
    let (tx, rx) = oneshot::channel();
    let request = FetchRequest::CompactBlock(tx, block_height);
    fetch_request_sender
        .send(request)
        .expect("receiver should never be dropped");

    let compact_block = rx.await.expect("sender should never be dropped")?;
    Ok(compact_block)
}
/// Gets a stream of compact blocks covering `block_range`.
///
/// Sends a [`FetchRequest::CompactBlockRange`] through `fetch_request_sender`
/// and hands back the stream from the reply without reading from it.
///
/// # Errors
///
/// Returns a [`ServerError`] converted from the [`tonic::Status`] if the reply
/// is an error.
///
/// # Panics
///
/// Panics if the request receiver or the reply sender has been dropped.
pub(crate) async fn get_compact_block_range(
    fetch_request_sender: UnboundedSender<FetchRequest>,
    block_range: Range<BlockHeight>,
) -> Result<tonic::Streaming<CompactBlock>, ServerError> {
    let (tx, rx) = oneshot::channel();
    let request = FetchRequest::CompactBlockRange(tx, block_range);
    fetch_request_sender
        .send(request)
        .expect("receiver should never be dropped");

    Ok(rx.await.expect("sender should never be dropped")?)
}
/// Gets subtree roots and collects the whole reply stream into a `Vec`.
///
/// Sends a [`FetchRequest::SubtreeRoots`] carrying `start_index`,
/// `shielded_protocol` and `max_entries`, then reads messages from the returned
/// stream until it ends.
///
/// # Errors
///
/// Returns a [`ServerError`] converted from the [`tonic::Status`] if the reply
/// is an error or if reading a message from the stream fails.
///
/// # Panics
///
/// Panics if the request receiver or the reply sender has been dropped.
#[cfg(not(feature = "darkside_test"))]
pub(crate) async fn get_subtree_roots(
    fetch_request_sender: UnboundedSender<FetchRequest>,
    start_index: u32,
    shielded_protocol: i32,
    max_entries: u32,
) -> Result<Vec<SubtreeRoot>, ServerError> {
    let (tx, rx) = oneshot::channel();
    let request = FetchRequest::SubtreeRoots(tx, start_index, shielded_protocol, max_entries);
    fetch_request_sender
        .send(request)
        .expect("receiver should never be dropped");

    let mut stream = rx.await.expect("sender should never be dropped")?;

    let mut roots = Vec::new();
    loop {
        match stream.message().await? {
            Some(root) => roots.push(root),
            // `None` marks the end of the stream.
            None => break,
        }
    }

    Ok(roots)
}
pub(crate) async fn get_frontiers(
fetch_request_sender: UnboundedSender<FetchRequest>,
block_height: BlockHeight,
) -> Result<ChainState, ServerError> {
let (reply_sender, reply_receiver) = oneshot::channel();
fetch_request_sender
.send(FetchRequest::TreeState(reply_sender, block_height))
.expect("receiver should never be dropped");
let tree_state = reply_receiver
.await
.expect("sender should never be dropped")?;
tree_state
.to_chain_state()
.map_err(ServerError::InvalidFrontier)
}
pub(crate) async fn get_transaction_and_block_height(
fetch_request_sender: UnboundedSender<FetchRequest>,
consensus_parameters: &impl consensus::Parameters,
txid: TxId,
) -> Result<(Transaction, BlockHeight), ServerError> {
let (reply_sender, reply_receiver) = oneshot::channel();
fetch_request_sender
.send(FetchRequest::Transaction(reply_sender, txid))
.expect("receiver should never be dropped");
let raw_transaction = reply_receiver
.await
.expect("sender should never be dropped")?;
let block_height =
BlockHeight::from_u32(u32::try_from(raw_transaction.height).expect("should be valid u32"));
let transaction = Transaction::read(
&raw_transaction.data[..],
consensus::BranchId::for_height(consensus_parameters, block_height),
)
.map_err(ServerError::InvalidTransaction)?;
Ok((transaction, block_height))
}
/// Gets UTXO metadata for `transparent_addresses` from `start_height`.
///
/// Sends a [`FetchRequest::UtxoMetadata`] through `fetch_request_sender` and
/// awaits the reply.
///
/// # Errors
///
/// Returns a [`ServerError`] converted from the [`tonic::Status`] if the reply
/// is an error.
///
/// # Panics
///
/// Panics if `transparent_addresses` is empty, or if the request receiver or
/// the reply sender has been dropped.
// NOTE(review): presumably not called anywhere at present, hence the lint allowance - confirm.
#[allow(dead_code)]
pub(crate) async fn get_utxo_metadata(
    fetch_request_sender: UnboundedSender<FetchRequest>,
    transparent_addresses: Vec<String>,
    start_height: BlockHeight,
) -> Result<Vec<GetAddressUtxosReply>, ServerError> {
    assert!(
        !transparent_addresses.is_empty(),
        "addresses must be non-empty!"
    );

    let (tx, rx) = oneshot::channel();
    let request = FetchRequest::UtxoMetadata(tx, (transparent_addresses, start_height));
    fetch_request_sender
        .send(request)
        .expect("receiver should never be dropped");

    Ok(rx.await.expect("sender should never be dropped")?)
}
/// Gets the transactions for `transparent_address` within `block_range`, each
/// paired with the block height reported alongside it.
///
/// Sends a [`FetchRequest::TransparentAddressTxs`] through
/// `fetch_request_sender`, reads the returned stream to its end, and only then
/// parses the collected raw transactions in the order they were received.
///
/// # Errors
///
/// Returns a [`ServerError`] converted from the [`tonic::Status`] if the reply
/// is an error or if reading from the stream fails, or
/// [`ServerError::InvalidTransaction`] for the first raw transaction that
/// cannot be parsed.
///
/// # Panics
///
/// Panics if the request receiver or the reply sender has been dropped, or if
/// a reported height does not fit in a `u32`.
pub(crate) async fn get_transparent_address_transactions(
    fetch_request_sender: UnboundedSender<FetchRequest>,
    consensus_parameters: &impl consensus::Parameters,
    transparent_address: String,
    block_range: Range<BlockHeight>,
) -> Result<Vec<(BlockHeight, Transaction)>, ServerError> {
    let (tx, rx) = oneshot::channel();
    let request = FetchRequest::TransparentAddressTxs(tx, (transparent_address, block_range));
    fetch_request_sender
        .send(request)
        .expect("receiver should never be dropped");

    let mut stream = rx.await.expect("sender should never be dropped")?;

    // Phase 1: read the stream to its end, so a stream error is reported before
    // any parse error.
    let mut received: Vec<RawTransaction> = Vec::new();
    loop {
        match stream.message().await? {
            Some(raw) => received.push(raw),
            None => break,
        }
    }

    // Phase 2: parse in arrival order, stopping at the first invalid transaction.
    let mut parsed: Vec<(BlockHeight, Transaction)> = Vec::with_capacity(received.len());
    for raw in received {
        let height = u32::try_from(raw.height).expect("should be valid u32");
        let block_height = BlockHeight::from_u32(height);
        let branch_id = consensus::BranchId::for_height(consensus_parameters, block_height);
        let transaction = Transaction::read(&raw.data[..], branch_id)
            .map_err(ServerError::InvalidTransaction)?;
        parsed.push((block_height, transaction));
    }

    Ok(parsed)
}
pub(crate) async fn get_mempool_transaction_stream(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
shutdown_mempool: Arc<AtomicBool>,
) -> Result<tonic::Streaming<RawTransaction>, MempoolError> {
tracing::debug!("Fetching mempool stream");
let mut interval = tokio::time::interval(Duration::from_secs(3));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await;
loop {
tokio::select! {
mempool_stream_response = fetch::get_mempool_stream(client) => {
return mempool_stream_response.map_err(|e| MempoolError::ServerError(ServerError::RequestFailed(e)));
}
_ = interval.tick() => {
if shutdown_mempool.load(atomic::Ordering::Acquire) {
return Err(MempoolError::ShutdownWithoutStream);
}
}
}
}
}