cometbft_rpc/
client.rs

1//! CometBFT RPC client.
2
3mod compat;
4pub use compat::CompatMode;
5
6#[cfg(any(feature = "http-client", feature = "websocket-client"))]
7mod subscription;
8#[cfg(any(feature = "http-client", feature = "websocket-client"))]
9pub use subscription::{Subscription, SubscriptionClient};
10
11#[cfg(any(feature = "http-client", feature = "websocket-client"))]
12pub mod sync;
13
14#[cfg(any(feature = "http-client", feature = "websocket-client"))]
15mod transport;
16
17#[cfg(feature = "http-client")]
18pub use transport::http::{self, HttpClient, HttpClientUrl};
19#[cfg(feature = "websocket-client")]
20pub use transport::websocket::{
21    self, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
22};
23
24#[cfg(any(feature = "http-client", feature = "websocket-client"))]
25pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};
26
27use core::fmt;
28
29use async_trait::async_trait;
30use cometbft::{abci, block::Height, evidence::Evidence, Genesis, Hash};
31use serde::{de::DeserializeOwned, Serialize};
32
33use crate::{
34    endpoint::{validators::DEFAULT_VALIDATORS_PER_PAGE, *},
35    paging::Paging,
36    prelude::*,
37    query::Query,
38    Error, Order, SimpleRequest,
39};
40
41/// Provides lightweight access to the CometBFT RPC. It gives access to all
42/// endpoints with the exception of the event subscription-related ones.
43///
44/// To access event subscription capabilities, use a client that implements the
45/// [`SubscriptionClient`] trait.
46///
47/// [`SubscriptionClient`]: trait.SubscriptionClient.html
48#[async_trait]
49pub trait Client {
50    /// `/abci_info`: get information about the ABCI application.
51    async fn abci_info(&self) -> Result<abci::response::Info, Error> {
52        Ok(self.perform(abci_info::Request).await?.response)
53    }
54
55    /// `/abci_query`: query the ABCI application
56    async fn abci_query<V>(
57        &self,
58        path: Option<String>,
59        data: V,
60        height: Option<Height>,
61        prove: bool,
62    ) -> Result<abci_query::AbciQuery, Error>
63    where
64        V: Into<Vec<u8>> + Send,
65    {
66        Ok(self
67            .perform(abci_query::Request::new(path, data, height, prove))
68            .await?
69            .response)
70    }
71
72    /// `/block`: get block at a given height.
73    async fn block<H>(&self, height: H) -> Result<block::Response, Error>
74    where
75        H: Into<Height> + Send,
76    {
77        self.perform(block::Request::new(height.into())).await
78    }
79
80    /// `/block_by_hash`: get block by hash.
81    async fn block_by_hash(&self, hash: cometbft::Hash) -> Result<block_by_hash::Response, Error> {
82        self.perform(block_by_hash::Request::new(hash)).await
83    }
84
85    /// `/block`: get the latest block.
86    async fn latest_block(&self) -> Result<block::Response, Error> {
87        self.perform(block::Request::default()).await
88    }
89
90    /// `/header`: get block header at a given height.
91    async fn header<H>(&self, height: H) -> Result<header::Response, Error>
92    where
93        H: Into<Height> + Send,
94    {
95        self.perform(header::Request::new(height.into())).await
96    }
97
98    /// `/header_by_hash`: get block by hash.
99    async fn header_by_hash(
100        &self,
101        hash: cometbft::Hash,
102    ) -> Result<header_by_hash::Response, Error> {
103        self.perform(header_by_hash::Request::new(hash)).await
104    }
105
106    /// `/block_results`: get ABCI results for a block at a particular height.
107    async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
108    where
109        H: Into<Height> + Send,
110    {
111        self.perform(block_results::Request::new(height.into()))
112            .await
113    }
114
115    /// `/block_results`: get ABCI results for the latest block.
116    async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
117        self.perform(block_results::Request::default()).await
118    }
119
120    /// `/block_search`: search for blocks by BeginBlock and EndBlock events.
121    async fn block_search(
122        &self,
123        query: Query,
124        page: u32,
125        per_page: u8,
126        order: Order,
127    ) -> Result<block_search::Response, Error> {
128        self.perform(block_search::Request::new(query, page, per_page, order))
129            .await
130    }
131
132    /// `/blockchain`: get block headers for `min` <= `height` <= `max`.
133    ///
134    /// Block headers are returned in descending order (highest first).
135    ///
136    /// Returns at most 20 items.
137    async fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
138    where
139        H: Into<Height> + Send,
140    {
141        // TODO(tarcieri): return errors for invalid params before making request?
142        self.perform(blockchain::Request::new(min.into(), max.into()))
143            .await
144    }
145
146    /// `/broadcast_tx_async`: broadcast a transaction, returning immediately.
147    async fn broadcast_tx_async<T>(&self, tx: T) -> Result<broadcast::tx_async::Response, Error>
148    where
149        T: Into<Vec<u8>> + Send,
150    {
151        self.perform(broadcast::tx_async::Request::new(tx)).await
152    }
153
154    /// `/broadcast_tx_sync`: broadcast a transaction, returning the response
155    /// from `CheckTx`.
156    async fn broadcast_tx_sync<T>(&self, tx: T) -> Result<broadcast::tx_sync::Response, Error>
157    where
158        T: Into<Vec<u8>> + Send,
159    {
160        self.perform(broadcast::tx_sync::Request::new(tx)).await
161    }
162
163    /// `/broadcast_tx_commit`: broadcast a transaction, returning the response
164    /// from `DeliverTx`.
165    async fn broadcast_tx_commit<T>(&self, tx: T) -> Result<broadcast::tx_commit::Response, Error>
166    where
167        T: Into<Vec<u8>> + Send,
168    {
169        self.perform(broadcast::tx_commit::Request::new(tx)).await
170    }
171
172    /// `/commit`: get block commit at a given height.
173    async fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
174    where
175        H: Into<Height> + Send,
176    {
177        self.perform(commit::Request::new(height.into())).await
178    }
179
180    /// `/consensus_params`: get current consensus parameters at the specified
181    /// height.
182    async fn consensus_params<H>(&self, height: H) -> Result<consensus_params::Response, Error>
183    where
184        H: Into<Height> + Send,
185    {
186        self.perform(consensus_params::Request::new(Some(height.into())))
187            .await
188    }
189
190    /// `/consensus_state`: get current consensus state
191    async fn consensus_state(&self) -> Result<consensus_state::Response, Error> {
192        self.perform(consensus_state::Request::new()).await
193    }
194
195    // TODO(thane): Simplify once validators endpoint removes pagination.
196    /// `/validators`: get validators a given height.
197    async fn validators<H>(&self, height: H, paging: Paging) -> Result<validators::Response, Error>
198    where
199        H: Into<Height> + Send,
200    {
201        let height = height.into();
202        match paging {
203            Paging::Default => {
204                self.perform(validators::Request::new(Some(height), None, None))
205                    .await
206            },
207            Paging::Specific {
208                page_number,
209                per_page,
210            } => {
211                self.perform(validators::Request::new(
212                    Some(height),
213                    Some(page_number),
214                    Some(per_page),
215                ))
216                .await
217            },
218            Paging::All => {
219                let mut page_num = 1_usize;
220                let mut validators = Vec::new();
221                let per_page = DEFAULT_VALIDATORS_PER_PAGE.into();
222                loop {
223                    let response = self
224                        .perform(validators::Request::new(
225                            Some(height),
226                            Some(page_num.into()),
227                            Some(per_page),
228                        ))
229                        .await?;
230                    validators.extend(response.validators);
231                    if validators.len() as i32 == response.total {
232                        return Ok(validators::Response::new(
233                            response.block_height,
234                            validators,
235                            response.total,
236                        ));
237                    }
238                    page_num += 1;
239                }
240            },
241        }
242    }
243
244    /// `/consensus_params`: get the latest consensus parameters.
245    async fn latest_consensus_params(&self) -> Result<consensus_params::Response, Error> {
246        self.perform(consensus_params::Request::new(None)).await
247    }
248
249    /// `/commit`: get the latest block commit
250    async fn latest_commit(&self) -> Result<commit::Response, Error> {
251        self.perform(commit::Request::default()).await
252    }
253
254    /// `/health`: get node health.
255    ///
256    /// Returns empty result (200 OK) on success, no response in case of an error.
257    async fn health(&self) -> Result<(), Error> {
258        self.perform(health::Request).await?;
259        Ok(())
260    }
261
262    /// `/genesis`: get genesis file.
263    async fn genesis<AppState>(&self) -> Result<Genesis<AppState>, Error>
264    where
265        AppState: fmt::Debug + Serialize + DeserializeOwned + Send,
266    {
267        Ok(self.perform(genesis::Request::default()).await?.genesis)
268    }
269
270    /// `/net_info`: obtain information about P2P and other network connections.
271    async fn net_info(&self) -> Result<net_info::Response, Error> {
272        self.perform(net_info::Request).await
273    }
274
275    /// `/status`: get CometBFT status including node info, pubkey, latest
276    /// block hash, app hash, block height and time.
277    async fn status(&self) -> Result<status::Response, Error> {
278        self.perform(status::Request).await
279    }
280
281    /// `/broadcast_evidence`: broadcast an evidence.
282    async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
283        self.perform(evidence::Request::new(e)).await
284    }
285
286    /// `/tx`: find transaction by hash.
287    async fn tx(&self, hash: Hash, prove: bool) -> Result<tx::Response, Error> {
288        self.perform(tx::Request::new(hash, prove)).await
289    }
290
291    /// `/tx_search`: search for transactions with their results.
292    async fn tx_search(
293        &self,
294        query: Query,
295        prove: bool,
296        page: u32,
297        per_page: u8,
298        order: Order,
299    ) -> Result<tx_search::Response, Error> {
300        self.perform(tx_search::Request::new(query, prove, page, per_page, order))
301            .await
302    }
303
304    #[cfg(any(feature = "http-client", feature = "websocket-client"))]
305    /// Poll the `/health` endpoint until it returns a successful result or
306    /// the given `timeout` has elapsed.
307    async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
308    where
309        T: Into<core::time::Duration> + Send,
310    {
311        let timeout = timeout.into();
312        let poll_interval = core::time::Duration::from_millis(200);
313        let mut attempts_remaining = timeout.as_millis() / poll_interval.as_millis();
314
315        while self.health().await.is_err() {
316            if attempts_remaining == 0 {
317                return Err(Error::timeout(timeout));
318            }
319
320            attempts_remaining -= 1;
321            tokio::time::sleep(poll_interval).await;
322        }
323
324        Ok(())
325    }
326
327    /// Perform a request against the RPC endpoint.
328    ///
329    /// This method is used by the default implementations of specific
330    /// endpoint methods. The latest protocol dialect is assumed to be invoked.
331    async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
332    where
333        R: SimpleRequest;
334}