use std::fmt::Debug;
use std::str::FromStr;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt};
use http::Uri;
use tokio::time::timeout;
use tokio::time::Duration;
use tracing::{debug, info};
use ibc_proto::cosmos::bank::v1beta1::query_client::QueryClient;
use tendermint_rpc::{Client, SubscriptionClient, Url, WebSocketClient};
use crate::error::RegistryError;
use crate::formatter::{SimpleWebSocketFormatter, UriFormatter};
pub trait QueryTypes {
type QueryInput: Debug + Send;
type QueryOutput;
type QueryError;
}
#[async_trait]
pub trait QueryContext: QueryTypes {
fn query_error(chain_name: String) -> Self::QueryError;
async fn query(url: Self::QueryInput) -> Result<Self::QueryOutput, Self::QueryError>;
async fn query_healthy(
chain_name: String,
urls: Vec<Self::QueryInput>,
) -> Result<Self::QueryOutput, Self::QueryError> {
info!("Trying to find a healthy RPC endpoint for chain {chain_name}");
debug!("Trying the following RPC endpoints: {urls:?}");
let mut futures: FuturesUnordered<_> =
urls.into_iter().map(|url| Self::query(url)).collect();
while let Some(result) = futures.next().await {
if result.is_ok() {
return result;
}
}
Err(Self::query_error(chain_name))
}
}
pub struct SimpleHermesRpcQuerier;
#[derive(Clone, Debug)]
pub struct HermesConfigData {
pub rpc_address: Url,
pub max_block_size: u64,
pub websocket: Url,
}
impl QueryTypes for SimpleHermesRpcQuerier {
type QueryInput = String;
type QueryOutput = HermesConfigData;
type QueryError = RegistryError;
}
#[async_trait]
impl QueryContext for SimpleHermesRpcQuerier {
fn query_error(chain_name: String) -> RegistryError {
RegistryError::no_healthy_rpc(chain_name)
}
async fn query(rpc: Self::QueryInput) -> Result<Self::QueryOutput, Self::QueryError> {
let websocket_addr = SimpleWebSocketFormatter::parse_or_build_address(rpc.as_str())?;
info!("Querying WebSocket server at {websocket_addr}");
let (client, driver) = timeout(
Duration::from_secs(5),
WebSocketClient::new(websocket_addr.clone()),
)
.await
.map_err(|e| RegistryError::websocket_time_out_error(websocket_addr.to_string(), e))?
.map_err(|e| RegistryError::websocket_connect_error(websocket_addr.to_string(), e))?;
let driver_handle = tokio::spawn(driver.run());
let latest_consensus_params = match client.latest_consensus_params().await {
Ok(response) => response.consensus_params.block.max_bytes,
Err(e) => {
return Err(RegistryError::rpc_consensus_params_error(
websocket_addr.to_string(),
e,
))
}
};
client.close().map_err(|e| {
RegistryError::websocket_conn_close_error(websocket_addr.to_string(), e)
})?;
driver_handle
.await
.map_err(|e| RegistryError::join_error("chain_data_join".to_string(), e))?
.map_err(|e| RegistryError::websocket_driver_error(websocket_addr.to_string(), e))?;
Ok(HermesConfigData {
rpc_address: Url::from_str(&rpc)
.map_err(|e| RegistryError::tendermint_url_parse_error(rpc, e))?,
max_block_size: latest_consensus_params,
websocket: websocket_addr,
})
}
}
pub struct GrpcHealthCheckQuerier;
impl QueryTypes for GrpcHealthCheckQuerier {
type QueryInput = Uri;
type QueryOutput = Url;
type QueryError = RegistryError;
}
#[async_trait]
impl QueryContext for GrpcHealthCheckQuerier {
fn query_error(chain_name: String) -> Self::QueryError {
RegistryError::no_healthy_grpc(chain_name)
}
async fn query(uri: Self::QueryInput) -> Result<Self::QueryOutput, Self::QueryError> {
let tendermint_url = uri
.to_string()
.parse()
.map_err(|e| RegistryError::tendermint_url_parse_error(uri.to_string(), e))?;
info!("Querying gRPC server at {tendermint_url}");
QueryClient::connect(uri)
.await
.map_err(|_| RegistryError::unable_to_connect_with_grpc())?;
Ok(tendermint_url)
}
}