use crate::block_processor::types::{
BlockToProcess, FullBlockInformation, ParsedTransactionResponse,
};
use crate::error::ScraperError;
use crate::helpers::tx_hash;
use crate::{Any, MessageRegistry, default_message_registry};
use futures::StreamExt;
use futures::future::join3;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tendermint::Hash;
use tendermint_rpc::endpoint::{block, block_results, tx, validators};
use tendermint_rpc::{Client, HttpClient, Paging};
use tokio::sync::Mutex;
use tracing::{debug, instrument, warn};
use url::Url;
/// Shared JSON-RPC client for a Tendermint node, plus the registry used to
/// decode protobuf messages found inside transactions.
///
/// `Clone` is cheap: the underlying HTTP client is behind an [`Arc`].
#[derive(Clone)]
pub struct RpcClient {
    /// Underlying Tendermint HTTP client; wrapped in `Arc` so clones share one connection pool.
    inner: Arc<HttpClient>,
    /// Registry used by [`Self::decode_or_skip`] to turn raw `Any` messages into JSON.
    pub(crate) message_registry: MessageRegistry,
}
impl RpcClient {
pub fn new(url: &Url) -> Result<Self, ScraperError> {
let http_client = HttpClient::new(url.as_str()).map_err(|source| {
ScraperError::HttpConnectionFailure {
url: url.to_string(),
source: Box::new(source),
}
})?;
Ok(RpcClient {
inner: Arc::new(http_client),
message_registry: default_message_registry(),
})
}
/// Try to decode a raw protobuf [`Any`] message through the registry.
///
/// On failure a warning is logged and `None` is returned, letting callers
/// silently skip messages the registry does not understand.
fn decode_or_skip(&self, msg: &Any) -> Option<serde_json::Value> {
    self.message_registry
        .try_decode(msg)
        .map_err(|err| warn!("Failed to decode raw message: {err}"))
        .ok()
}
/// Assemble the full picture of a block: block results, validator set and
/// fully parsed transactions, issuing the three queries concurrently.
///
/// Messages that the registry cannot decode are skipped (with a warning)
/// rather than failing the whole block.
///
/// # Errors
/// Fails if the transaction query fails, any transaction's bytes cannot be
/// parsed, or the block-results / validators queries fail.
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
    &self,
    block: BlockToProcess,
) -> Result<FullBlockInformation, ScraperError> {
    debug!("getting complete block details");
    let height = block.height;

    // The three queries are independent — run them concurrently.
    let (results, validators, raw_transactions) = join3(
        self.get_block_results(height),
        self.get_validators_details(height),
        self.get_transaction_results(&block.block.data),
    )
    .await;

    let raw_transactions = raw_transactions?;
    let mut transactions = Vec::with_capacity(raw_transactions.len());
    for raw_tx in raw_transactions {
        // Parse the raw bytes back into a cosmos transaction.
        let tx = cosmrs::Tx::from_bytes(&raw_tx.tx).map_err(|source| {
            ScraperError::TxParseFailure {
                hash: raw_tx.hash,
                source,
            }
        })?;

        // Decode whatever messages the registry recognises, keyed by their
        // position inside the transaction body.
        let mut parsed_messages = HashMap::new();
        let mut parsed_message_urls = HashMap::new();
        for (index, msg) in tx.body.messages.iter().enumerate() {
            let Some(value) = self.decode_or_skip(msg) else {
                continue;
            };
            parsed_message_urls.insert(index, msg.type_url.clone());
            parsed_messages.insert(index, value);
        }

        transactions.push(ParsedTransactionResponse {
            hash: raw_tx.hash,
            height: raw_tx.height,
            index: raw_tx.index,
            tx_result: raw_tx.tx_result,
            tx,
            proof: raw_tx.proof,
            parsed_messages,
            parsed_message_urls,
            block: block.block.clone(),
        })
    }

    Ok(FullBlockInformation {
        block: block.block,
        results: results?,
        validators: validators?,
        transactions,
    })
}
/// Fetch the block at `height` from the node.
///
/// # Errors
/// Returns [`ScraperError::BlockQueryFailure`] when the RPC call fails.
#[instrument(skip(self), err(Display))]
pub async fn get_basic_block_details(
    &self,
    height: u32,
) -> Result<block::Response, ScraperError> {
    debug!("getting basic block details");
    match self.inner.block(height).await {
        Ok(response) => Ok(response),
        Err(source) => Err(ScraperError::BlockQueryFailure {
            height,
            source: Box::new(source),
        }),
    }
}
/// Fetch the ABCI block results (tx execution results, events, …) at `height`.
///
/// # Errors
/// Returns [`ScraperError::BlockResultsQueryFailure`] when the RPC call fails.
#[instrument(skip(self), err(Display))]
pub async fn get_block_results(
    &self,
    height: u32,
) -> Result<block_results::Response, ScraperError> {
    debug!("getting block results");
    match self.inner.block_results(height).await {
        Ok(response) => Ok(response),
        Err(source) => Err(ScraperError::BlockResultsQueryFailure {
            height,
            source: Box::new(source),
        }),
    }
}
pub(crate) async fn current_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting current block height");
let info =
self.inner
.abci_info()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure {
source: Box::new(source),
})?;
Ok(info.last_block_height.value())
}
pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting earliest available block height");
let status =
self.inner
.status()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure {
source: Box::new(source),
})?;
Ok(status.sync_info.earliest_block_height.value())
}
/// Fetch the full tx result for every raw transaction in `raw`.
///
/// Up to 4 queries are kept in flight at a time, and the returned vector
/// preserves the order of `raw`.
///
/// # Errors
/// Returns the first (by input position) [`ScraperError::TxResultsQueryFailure`]
/// encountered; as before, all queries are driven to completion regardless.
async fn get_transaction_results(
    &self,
    raw: &[Vec<u8>],
) -> Result<Vec<tx::Response>, ScraperError> {
    // `buffered(4)` bounds concurrency at 4 while yielding results in input
    // order, which replaces the previous Arc<Mutex<BTreeMap>> bookkeeping
    // (and its `Arc::into_inner(..).unwrap()`) with lock-free stream code.
    futures::stream::iter(raw.iter().map(tx_hash))
        .map(|hash| self.get_transaction_result(hash))
        .buffered(4)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect()
}
/// Fetch the full result of the transaction identified by `tx_hash`
/// (without a Merkle proof).
///
/// # Errors
/// Returns [`ScraperError::TxResultsQueryFailure`] when the RPC call fails.
#[instrument(skip(self, tx_hash), fields(tx_hash = %tx_hash), err(Display))]
async fn get_transaction_result(&self, tx_hash: Hash) -> Result<tx::Response, ScraperError> {
    debug!("getting tx results");
    match self.inner.tx(tx_hash, false).await {
        Ok(response) => Ok(response),
        Err(source) => Err(ScraperError::TxResultsQueryFailure {
            hash: tx_hash,
            source: Box::new(source),
        }),
    }
}
/// Fetch the complete validator set at `height` (all pages).
///
/// # Errors
/// Returns [`ScraperError::ValidatorsQueryFailure`] when the RPC call fails.
// Consistency fix: the other public query methods log failures via
// `err(Display)` on their span; this one was missing it.
#[instrument(skip(self), err(Display))]
pub async fn get_validators_details(
    &self,
    height: u32,
) -> Result<validators::Response, ScraperError> {
    debug!("getting validators set");
    self.inner
        .validators(height, Paging::All)
        .await
        .map_err(|source| ScraperError::ValidatorsQueryFailure {
            height,
            source: Box::new(source),
        })
}
}