tendermint_rpc/
client.rs

1//! Tendermint RPC client.
2
3mod compat;
4pub use compat::CompatMode;
5
6#[cfg(any(
7    feature = "http-client",
8    feature = "websocket-client",
9    feature = "mock-client"
10))]
11mod subscription;
12#[cfg(any(
13    feature = "http-client",
14    feature = "websocket-client",
15    feature = "mock-client"
16))]
17pub use subscription::{Subscription, SubscriptionClient};
18
19#[cfg(any(
20    feature = "http-client",
21    feature = "websocket-client",
22    feature = "mock-client"
23))]
24pub mod sync;
25
26#[cfg(any(
27    feature = "http-client",
28    feature = "websocket-client",
29    feature = "mock-client"
30))]
31mod transport;
32
33#[cfg(feature = "http-client")]
34pub use transport::http::{self, HttpClient, HttpClientUrl};
35#[cfg(feature = "websocket-client")]
36pub use transport::websocket::{
37    self, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
38};
39
40#[cfg(feature = "mock-client")]
41pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};
42
43use core::fmt;
44
45use async_trait::async_trait;
46use serde::{de::DeserializeOwned, Serialize};
47use tendermint::{abci, block::Height, evidence::Evidence, Genesis, Hash};
48
49use crate::{
50    endpoint::{validators::DEFAULT_VALIDATORS_PER_PAGE, *},
51    paging::Paging,
52    prelude::*,
53    query::Query,
54    Error, Order, SimpleRequest,
55};
56
57/// Provides lightweight access to the Tendermint RPC. It gives access to all
58/// endpoints with the exception of the event subscription-related ones.
59///
60/// To access event subscription capabilities, use a client that implements the
61/// [`SubscriptionClient`] trait.
62///
63/// [`SubscriptionClient`]: trait.SubscriptionClient.html
64#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
65#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
66pub trait Client {
67    /// `/abci_info`: get information about the ABCI application.
68    async fn abci_info(&self) -> Result<abci::response::Info, Error> {
69        Ok(self.perform(abci_info::Request).await?.response)
70    }
71
72    /// `/abci_query`: query the ABCI application
73    async fn abci_query<V>(
74        &self,
75        path: Option<String>,
76        data: V,
77        height: Option<Height>,
78        prove: bool,
79    ) -> Result<abci_query::AbciQuery, Error>
80    where
81        V: Into<Vec<u8>> + Send,
82    {
83        Ok(self
84            .perform(abci_query::Request::new(path, data, height, prove))
85            .await?
86            .response)
87    }
88
89    /// `/block`: get block at a given height.
90    async fn block<H>(&self, height: H) -> Result<block::Response, Error>
91    where
92        H: Into<Height> + Send,
93    {
94        self.perform(block::Request::new(height.into())).await
95    }
96
97    /// `/block_by_hash`: get block by hash.
98    async fn block_by_hash(
99        &self,
100        hash: tendermint::Hash,
101    ) -> Result<block_by_hash::Response, Error> {
102        self.perform(block_by_hash::Request::new(hash)).await
103    }
104
105    /// `/block`: get the latest block.
106    async fn latest_block(&self) -> Result<block::Response, Error> {
107        self.perform(block::Request::default()).await
108    }
109
110    /// `/header`: get block header at a given height.
111    async fn header<H>(&self, height: H) -> Result<header::Response, Error>
112    where
113        H: Into<Height> + Send,
114    {
115        self.perform(header::Request::new(height.into())).await
116    }
117
118    /// `/header_by_hash`: get block by hash.
119    async fn header_by_hash(
120        &self,
121        hash: tendermint::Hash,
122    ) -> Result<header_by_hash::Response, Error> {
123        self.perform(header_by_hash::Request::new(hash)).await
124    }
125
126    /// `/block_results`: get ABCI results for a block at a particular height.
127    async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
128    where
129        H: Into<Height> + Send,
130    {
131        self.perform(block_results::Request::new(height.into()))
132            .await
133    }
134
135    /// `/block_results`: get ABCI results for the latest block.
136    async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
137        self.perform(block_results::Request::default()).await
138    }
139
140    /// `/block_search`: search for blocks by BeginBlock and EndBlock events.
141    async fn block_search(
142        &self,
143        query: Query,
144        page: u32,
145        per_page: u8,
146        order: Order,
147    ) -> Result<block_search::Response, Error> {
148        self.perform(block_search::Request::new(query, page, per_page, order))
149            .await
150    }
151
152    /// `/blockchain`: get block headers for `min` <= `height` <= `max`.
153    ///
154    /// Block headers are returned in descending order (highest first).
155    ///
156    /// Returns at most 20 items.
157    async fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
158    where
159        H: Into<Height> + Send,
160    {
161        // TODO(tarcieri): return errors for invalid params before making request?
162        self.perform(blockchain::Request::new(min.into(), max.into()))
163            .await
164    }
165
166    /// `/broadcast_tx_async`: broadcast a transaction, returning immediately.
167    async fn broadcast_tx_async<T>(&self, tx: T) -> Result<broadcast::tx_async::Response, Error>
168    where
169        T: Into<Vec<u8>> + Send,
170    {
171        self.perform(broadcast::tx_async::Request::new(tx)).await
172    }
173
174    /// `/broadcast_tx_sync`: broadcast a transaction, returning the response
175    /// from `CheckTx`.
176    async fn broadcast_tx_sync<T>(&self, tx: T) -> Result<broadcast::tx_sync::Response, Error>
177    where
178        T: Into<Vec<u8>> + Send,
179    {
180        self.perform(broadcast::tx_sync::Request::new(tx)).await
181    }
182
183    /// `/broadcast_tx_commit`: broadcast a transaction, returning the response
184    /// from `DeliverTx`.
185    async fn broadcast_tx_commit<T>(&self, tx: T) -> Result<broadcast::tx_commit::Response, Error>
186    where
187        T: Into<Vec<u8>> + Send,
188    {
189        self.perform(broadcast::tx_commit::Request::new(tx)).await
190    }
191
192    /// `/commit`: get block commit at a given height.
193    async fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
194    where
195        H: Into<Height> + Send,
196    {
197        self.perform(commit::Request::new(height.into())).await
198    }
199
200    /// `/consensus_params`: get current consensus parameters at the specified
201    /// height.
202    async fn consensus_params<H>(&self, height: H) -> Result<consensus_params::Response, Error>
203    where
204        H: Into<Height> + Send,
205    {
206        self.perform(consensus_params::Request::new(Some(height.into())))
207            .await
208    }
209
210    /// `/consensus_state`: get current consensus state
211    async fn consensus_state(&self) -> Result<consensus_state::Response, Error> {
212        self.perform(consensus_state::Request::new()).await
213    }
214
215    // TODO(thane): Simplify once validators endpoint removes pagination.
216    /// `/validators`: get validators a given height.
217    async fn validators<H>(&self, height: H, paging: Paging) -> Result<validators::Response, Error>
218    where
219        H: Into<Height> + Send,
220    {
221        let height = height.into();
222        match paging {
223            Paging::Default => {
224                self.perform(validators::Request::new(Some(height), None, None))
225                    .await
226            },
227            Paging::Specific {
228                page_number,
229                per_page,
230            } => {
231                self.perform(validators::Request::new(
232                    Some(height),
233                    Some(page_number),
234                    Some(per_page),
235                ))
236                .await
237            },
238            Paging::All => {
239                let mut page_num = 1_usize;
240                let mut validators = Vec::new();
241                let per_page = DEFAULT_VALIDATORS_PER_PAGE.into();
242                loop {
243                    let response = self
244                        .perform(validators::Request::new(
245                            Some(height),
246                            Some(page_num.into()),
247                            Some(per_page),
248                        ))
249                        .await?;
250                    validators.extend(response.validators);
251                    if validators.len() as i32 == response.total {
252                        return Ok(validators::Response::new(
253                            response.block_height,
254                            validators,
255                            response.total,
256                        ));
257                    }
258                    page_num += 1;
259                }
260            },
261        }
262    }
263
264    /// `/consensus_params`: get the latest consensus parameters.
265    async fn latest_consensus_params(&self) -> Result<consensus_params::Response, Error> {
266        self.perform(consensus_params::Request::new(None)).await
267    }
268
269    /// `/commit`: get the latest block commit
270    async fn latest_commit(&self) -> Result<commit::Response, Error> {
271        self.perform(commit::Request::default()).await
272    }
273
274    /// `/health`: get node health.
275    ///
276    /// Returns empty result (200 OK) on success, no response in case of an error.
277    async fn health(&self) -> Result<(), Error> {
278        self.perform(health::Request).await?;
279        Ok(())
280    }
281
282    /// `/genesis`: get genesis file.
283    async fn genesis<AppState>(&self) -> Result<Genesis<AppState>, Error>
284    where
285        AppState: fmt::Debug + Serialize + DeserializeOwned + Send,
286    {
287        Ok(self.perform(genesis::Request::default()).await?.genesis)
288    }
289
290    async fn genesis_chunked(&self, chunk: u64) -> Result<genesis_chunked::Response, Error> {
291        self.perform(genesis_chunked::Request::new(chunk)).await
292    }
293
294    /// `/genesis_chunked`: get genesis file in multiple chunks.
295    #[cfg(any(feature = "http-client", feature = "websocket-client"))]
296    async fn genesis_chunked_stream(
297        &self,
298    ) -> core::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<u8>, Error>> + '_>> {
299        Box::pin(futures::stream::unfold(Some(0), move |chunk| async move {
300            // Verify if there are more chunks to fetch
301            let chunk = chunk?;
302
303            match self.genesis_chunked(chunk).await {
304                Ok(response) => {
305                    if response.chunk + 1 >= response.total {
306                        // No more chunks to fetch
307                        Some((Ok(response.data), None))
308                    } else {
309                        // Emit this chunk and fetch the next chunk
310                        Some((Ok(response.data), Some(response.chunk + 1)))
311                    }
312                },
313                Err(e) => Some((Err(e), None)), // Abort the stream
314            }
315        }))
316    }
317
318    /// `/net_info`: obtain information about P2P and other network connections.
319    async fn net_info(&self) -> Result<net_info::Response, Error> {
320        self.perform(net_info::Request).await
321    }
322
323    /// `/status`: get Tendermint status including node info, pubkey, latest
324    /// block hash, app hash, block height and time.
325    async fn status(&self) -> Result<status::Response, Error> {
326        self.perform(status::Request).await
327    }
328
329    /// `/broadcast_evidence`: broadcast an evidence.
330    async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
331        self.perform(evidence::Request::new(e)).await
332    }
333
334    /// `/tx`: find transaction by hash.
335    async fn tx(&self, hash: Hash, prove: bool) -> Result<tx::Response, Error> {
336        self.perform(tx::Request::new(hash, prove)).await
337    }
338
339    /// `/tx_search`: search for transactions with their results.
340    async fn tx_search(
341        &self,
342        query: Query,
343        prove: bool,
344        page: u32,
345        per_page: u8,
346        order: Order,
347    ) -> Result<tx_search::Response, Error> {
348        self.perform(tx_search::Request::new(query, prove, page, per_page, order))
349            .await
350    }
351
352    #[cfg(any(feature = "http-client", feature = "websocket-client"))]
353    /// Poll the `/health` endpoint until it returns a successful result or
354    /// the given `timeout` has elapsed.
355    async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
356    where
357        T: Into<core::time::Duration> + Send,
358    {
359        let timeout = timeout.into();
360        let poll_interval = core::time::Duration::from_millis(200);
361        let mut attempts_remaining = timeout.as_millis() / poll_interval.as_millis();
362
363        while self.health().await.is_err() {
364            if attempts_remaining == 0 {
365                return Err(Error::timeout(timeout));
366            }
367
368            attempts_remaining -= 1;
369            tokio::time::sleep(poll_interval).await;
370        }
371
372        Ok(())
373    }
374
375    /// Perform a request against the RPC endpoint.
376    ///
377    /// This method is used by the default implementations of specific
378    /// endpoint methods. The latest protocol dialect is assumed to be invoked.
379    async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
380    where
381        R: SimpleRequest;
382}