mod subscription;
pub use subscription::{Subscription, SubscriptionClient};
pub mod sync;
mod transport;
use core::{fmt, time::Duration};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use tendermint::{abci, block::Height, evidence::Evidence, Genesis, Hash};
use tokio::time;
#[cfg(feature = "http-client")]
pub use transport::http::{HttpClient, HttpClientUrl};
pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};
#[cfg(feature = "websocket-client")]
pub use transport::websocket::{
WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
};
use crate::{
endpoint::{validators::DEFAULT_VALIDATORS_PER_PAGE, *},
paging::Paging,
prelude::*,
query::Query,
Error, Order, SimpleRequest,
};
#[async_trait]
pub trait Client {
async fn abci_info(&self) -> Result<abci::response::Info, Error> {
Ok(self.perform(abci_info::Request).await?.response)
}
async fn abci_query<V>(
&self,
path: Option<String>,
data: V,
height: Option<Height>,
prove: bool,
) -> Result<abci_query::AbciQuery, Error>
where
V: Into<Vec<u8>> + Send,
{
Ok(self
.perform(abci_query::Request::new(path, data, height, prove))
.await?
.response)
}
async fn block<H>(&self, height: H) -> Result<block::Response, Error>
where
H: Into<Height> + Send,
{
self.perform(block::Request::new(height.into())).await
}
async fn block_by_hash(
&self,
hash: tendermint::Hash,
) -> Result<block_by_hash::Response, Error> {
self.perform(block_by_hash::Request::new(hash)).await
}
async fn latest_block(&self) -> Result<block::Response, Error> {
self.perform(block::Request::default()).await
}
async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
where
H: Into<Height> + Send,
{
self.perform(block_results::Request::new(height.into()))
.await
}
async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
self.perform(block_results::Request::default()).await
}
async fn block_search(
&self,
query: Query,
page: u32,
per_page: u8,
order: Order,
) -> Result<block_search::Response, Error> {
self.perform(block_search::Request::new(query, page, per_page, order))
.await
}
async fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
where
H: Into<Height> + Send,
{
self.perform(blockchain::Request::new(min.into(), max.into()))
.await
}
async fn broadcast_tx_async<T>(&self, tx: T) -> Result<broadcast::tx_async::Response, Error>
where
T: Into<Vec<u8>> + Send,
{
self.perform(broadcast::tx_async::Request::new(tx)).await
}
async fn broadcast_tx_sync<T>(&self, tx: T) -> Result<broadcast::tx_sync::Response, Error>
where
T: Into<Vec<u8>> + Send,
{
self.perform(broadcast::tx_sync::Request::new(tx)).await
}
async fn broadcast_tx_commit<T>(&self, tx: T) -> Result<broadcast::tx_commit::Response, Error>
where
T: Into<Vec<u8>> + Send,
{
self.perform(broadcast::tx_commit::Request::new(tx)).await
}
async fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
where
H: Into<Height> + Send,
{
self.perform(commit::Request::new(height.into())).await
}
async fn consensus_params<H>(&self, height: H) -> Result<consensus_params::Response, Error>
where
H: Into<Height> + Send,
{
self.perform(consensus_params::Request::new(Some(height.into())))
.await
}
async fn consensus_state(&self) -> Result<consensus_state::Response, Error> {
self.perform(consensus_state::Request::new()).await
}
async fn validators<H>(&self, height: H, paging: Paging) -> Result<validators::Response, Error>
where
H: Into<Height> + Send,
{
let height = height.into();
match paging {
Paging::Default => {
self.perform(validators::Request::new(Some(height), None, None))
.await
},
Paging::Specific {
page_number,
per_page,
} => {
self.perform(validators::Request::new(
Some(height),
Some(page_number),
Some(per_page),
))
.await
},
Paging::All => {
let mut page_num = 1_usize;
let mut validators = Vec::new();
let per_page = DEFAULT_VALIDATORS_PER_PAGE.into();
loop {
let response = self
.perform(validators::Request::new(
Some(height),
Some(page_num.into()),
Some(per_page),
))
.await?;
validators.extend(response.validators);
if validators.len() as i32 == response.total {
return Ok(validators::Response::new(
response.block_height,
validators,
response.total,
));
}
page_num += 1;
}
},
}
}
async fn latest_consensus_params(&self) -> Result<consensus_params::Response, Error> {
self.perform(consensus_params::Request::new(None)).await
}
async fn latest_commit(&self) -> Result<commit::Response, Error> {
self.perform(commit::Request::default()).await
}
async fn health(&self) -> Result<(), Error> {
self.perform(health::Request).await?;
Ok(())
}
async fn genesis<AppState>(&self) -> Result<Genesis<AppState>, Error>
where
AppState: fmt::Debug + Serialize + DeserializeOwned + Send,
{
Ok(self.perform(genesis::Request::default()).await?.genesis)
}
async fn net_info(&self) -> Result<net_info::Response, Error> {
self.perform(net_info::Request).await
}
async fn status(&self) -> Result<status::Response, Error> {
self.perform(status::Request).await
}
async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
self.perform(evidence::Request::new(e)).await
}
async fn tx(&self, hash: Hash, prove: bool) -> Result<tx::Response, Error> {
self.perform(tx::Request::new(hash, prove)).await
}
async fn tx_search(
&self,
query: Query,
prove: bool,
page: u32,
per_page: u8,
order: Order,
) -> Result<tx_search::Response, Error> {
self.perform(tx_search::Request::new(query, prove, page, per_page, order))
.await
}
async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
where
T: Into<Duration> + Send,
{
let timeout = timeout.into();
let poll_interval = Duration::from_millis(200);
let mut attempts_remaining = timeout.as_millis() / poll_interval.as_millis();
while self.health().await.is_err() {
if attempts_remaining == 0 {
return Err(Error::timeout(timeout));
}
attempts_remaining -= 1;
time::sleep(poll_interval).await;
}
Ok(())
}
async fn perform<R>(&self, request: R) -> Result<R::Response, Error>
where
R: SimpleRequest;
}