iq_cometbft_rpc/
client.rs

1//! CometBFT RPC client.
2
3mod compat;
4pub use compat::CompatMode;
5
6#[cfg(any(
7    feature = "http-client",
8    feature = "websocket-client",
9    feature = "mock-client"
10))]
11mod subscription;
12#[cfg(any(
13    feature = "http-client",
14    feature = "websocket-client",
15    feature = "mock-client"
16))]
17pub use subscription::{Subscription, SubscriptionClient};
18
19#[cfg(any(
20    feature = "http-client",
21    feature = "websocket-client",
22    feature = "mock-client"
23))]
24pub mod sync;
25
26#[cfg(any(
27    feature = "http-client",
28    feature = "websocket-client",
29    feature = "mock-client"
30))]
31mod transport;
32
33#[cfg(feature = "http-client")]
34pub use transport::http::{self, HttpClient, HttpClientUrl};
35#[cfg(feature = "websocket-client")]
36pub use transport::websocket::{
37    self, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
38};
39
40#[cfg(feature = "mock-client")]
41pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};
42
43use core::fmt;
44
45use async_trait::async_trait;
46use cometbft::{abci, block::Height, evidence::Evidence, Genesis, Hash};
47use serde::{de::DeserializeOwned, Serialize};
48
49use crate::{
50    endpoint::{validators::DEFAULT_VALIDATORS_PER_PAGE, *},
51    paging::Paging,
52    prelude::*,
53    query::Query,
54    Error, Order, SimpleRequest,
55};
56
57/// Provides lightweight access to the CometBFT RPC. It gives access to all
58/// endpoints with the exception of the event subscription-related ones.
59///
60/// To access event subscription capabilities, use a client that implements the
61/// [`SubscriptionClient`] trait.
62///
63/// [`SubscriptionClient`]: trait.SubscriptionClient.html
64#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
65#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
66pub trait Client {
67    /// `/abci_info`: get information about the ABCI application.
68    async fn abci_info(&self) -> Result<abci::response::Info, Error> {
69        Ok(self.perform(abci_info::Request).await?.response)
70    }
71
72    /// `/abci_query`: query the ABCI application
73    async fn abci_query<V>(
74        &self,
75        path: Option<String>,
76        data: V,
77        height: Option<Height>,
78        prove: bool,
79    ) -> Result<abci_query::AbciQuery, Error>
80    where
81        V: Into<Vec<u8>> + Send,
82    {
83        Ok(self
84            .perform(abci_query::Request::new(path, data, height, prove))
85            .await?
86            .response)
87    }
88
89    /// `/block`: get block at a given height.
90    async fn block<H>(&self, height: H) -> Result<block::Response, Error>
91    where
92        H: Into<Height> + Send,
93    {
94        self.perform(block::Request::new(height.into())).await
95    }
96
97    /// `/block_by_hash`: get block by hash.
98    async fn block_by_hash(&self, hash: cometbft::Hash) -> Result<block_by_hash::Response, Error> {
99        self.perform(block_by_hash::Request::new(hash)).await
100    }
101
102    /// `/block`: get the latest block.
103    async fn latest_block(&self) -> Result<block::Response, Error> {
104        self.perform(block::Request::default()).await
105    }
106
107    /// `/header`: get block header at a given height.
108    async fn header<H>(&self, height: H) -> Result<header::Response, Error>
109    where
110        H: Into<Height> + Send,
111    {
112        self.perform(header::Request::new(height.into())).await
113    }
114
115    /// `/header_by_hash`: get block by hash.
116    async fn header_by_hash(
117        &self,
118        hash: cometbft::Hash,
119    ) -> Result<header_by_hash::Response, Error> {
120        self.perform(header_by_hash::Request::new(hash)).await
121    }
122
123    /// `/block_results`: get ABCI results for a block at a particular height.
124    async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
125    where
126        H: Into<Height> + Send,
127    {
128        self.perform(block_results::Request::new(height.into()))
129            .await
130    }
131
132    /// `/block_results`: get ABCI results for the latest block.
133    async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
134        self.perform(block_results::Request::default()).await
135    }
136
137    /// `/block_search`: search for blocks by BeginBlock and EndBlock events.
138    async fn block_search(
139        &self,
140        query: Query,
141        page: u32,
142        per_page: u8,
143        order: Order,
144    ) -> Result<block_search::Response, Error> {
145        self.perform(block_search::Request::new(query, page, per_page, order))
146            .await
147    }
148
149    /// `/blockchain`: get block headers for `min` <= `height` <= `max`.
150    ///
151    /// Block headers are returned in descending order (highest first).
152    ///
153    /// Returns at most 20 items.
154    async fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
155    where
156        H: Into<Height> + Send,
157    {
158        // TODO(tarcieri): return errors for invalid params before making request?
159        self.perform(blockchain::Request::new(min.into(), max.into()))
160            .await
161    }
162
163    /// `/broadcast_tx_async`: broadcast a transaction, returning immediately.
164    async fn broadcast_tx_async<T>(&self, tx: T) -> Result<broadcast::tx_async::Response, Error>
165    where
166        T: Into<Vec<u8>> + Send,
167    {
168        self.perform(broadcast::tx_async::Request::new(tx)).await
169    }
170
171    /// `/broadcast_tx_sync`: broadcast a transaction, returning the response
172    /// from `CheckTx`.
173    async fn broadcast_tx_sync<T>(&self, tx: T) -> Result<broadcast::tx_sync::Response, Error>
174    where
175        T: Into<Vec<u8>> + Send,
176    {
177        self.perform(broadcast::tx_sync::Request::new(tx)).await
178    }
179
180    /// `/broadcast_tx_commit`: broadcast a transaction, returning the response
181    /// from `DeliverTx`.
182    async fn broadcast_tx_commit<T>(&self, tx: T) -> Result<broadcast::tx_commit::Response, Error>
183    where
184        T: Into<Vec<u8>> + Send,
185    {
186        self.perform(broadcast::tx_commit::Request::new(tx)).await
187    }
188
189    /// `/commit`: get block commit at a given height.
190    async fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
191    where
192        H: Into<Height> + Send,
193    {
194        self.perform(commit::Request::new(height.into())).await
195    }
196
197    /// `/consensus_params`: get current consensus parameters at the specified
198    /// height.
199    async fn consensus_params<H>(&self, height: H) -> Result<consensus_params::Response, Error>
200    where
201        H: Into<Height> + Send,
202    {
203        self.perform(consensus_params::Request::new(Some(height.into())))
204            .await
205    }
206
207    /// `/consensus_state`: get current consensus state
208    async fn consensus_state(&self) -> Result<consensus_state::Response, Error> {
209        self.perform(consensus_state::Request::new()).await
210    }
211
212    // TODO(thane): Simplify once validators endpoint removes pagination.
213    /// `/validators`: get validators a given height.
214    async fn validators<H>(&self, height: H, paging: Paging) -> Result<validators::Response, Error>
215    where
216        H: Into<Height> + Send,
217    {
218        let height = height.into();
219        match paging {
220            Paging::Default => {
221                self.perform(validators::Request::new(Some(height), None, None))
222                    .await
223            },
224            Paging::Specific {
225                page_number,
226                per_page,
227            } => {
228                self.perform(validators::Request::new(
229                    Some(height),
230                    Some(page_number),
231                    Some(per_page),
232                ))
233                .await
234            },
235            Paging::All => {
236                let mut page_num = 1_usize;
237                let mut validators = Vec::new();
238                let per_page = DEFAULT_VALIDATORS_PER_PAGE.into();
239                loop {
240                    let response = self
241                        .perform(validators::Request::new(
242                            Some(height),
243                            Some(page_num.into()),
244                            Some(per_page),
245                        ))
246                        .await?;
247                    validators.extend(response.validators);
248                    if validators.len() as i32 == response.total {
249                        return Ok(validators::Response::new(
250                            response.block_height,
251                            validators,
252                            response.total,
253                        ));
254                    }
255                    page_num += 1;
256                }
257            },
258        }
259    }
260
261    /// `/consensus_params`: get the latest consensus parameters.
262    async fn latest_consensus_params(&self) -> Result<consensus_params::Response, Error> {
263        self.perform(consensus_params::Request::new(None)).await
264    }
265
266    /// `/commit`: get the latest block commit
267    async fn latest_commit(&self) -> Result<commit::Response, Error> {
268        self.perform(commit::Request::default()).await
269    }
270
271    /// `/health`: get node health.
272    ///
273    /// Returns empty result (200 OK) on success, no response in case of an error.
274    async fn health(&self) -> Result<(), Error> {
275        self.perform(health::Request).await?;
276        Ok(())
277    }
278
279    /// `/genesis`: get genesis file.
280    async fn genesis<AppState>(&self) -> Result<Genesis<AppState>, Error>
281    where
282        AppState: fmt::Debug + Serialize + DeserializeOwned + Send,
283    {
284        Ok(self.perform(genesis::Request::default()).await?.genesis)
285    }
286
287    async fn genesis_chunked(&self, chunk: u64) -> Result<genesis_chunked::Response, Error> {
288        self.perform(genesis_chunked::Request::new(chunk)).await
289    }
290
291    /// `/genesis_chunked`: get genesis file in multiple chunks.
292    #[cfg(any(feature = "http-client", feature = "websocket-client"))]
293    async fn genesis_chunked_stream(
294        &self,
295    ) -> core::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<u8>, Error>> + '_>> {
296        Box::pin(futures::stream::unfold(Some(0), move |chunk| async move {
297            // Verify if there are more chunks to fetch
298            let chunk = chunk?;
299
300            match self.genesis_chunked(chunk).await {
301                Ok(response) => {
302                    if response.chunk + 1 >= response.total {
303                        // No more chunks to fetch
304                        Some((Ok(response.data), None))
305                    } else {
306                        // Emit this chunk and fetch the next chunk
307                        Some((Ok(response.data), Some(response.chunk + 1)))
308                    }
309                },
310                Err(e) => Some((Err(e), None)), // Abort the stream
311            }
312        }))
313    }
314
315    /// `/net_info`: obtain information about P2P and other network connections.
316    async fn net_info(&self) -> Result<net_info::Response, Error> {
317        self.perform(net_info::Request).await
318    }
319
320    /// `/status`: get CometBFT status including node info, pubkey, latest
321    /// block hash, app hash, block height and time.
322    async fn status(&self) -> Result<status::Response, Error> {
323        self.perform(status::Request).await
324    }
325
326    /// `/broadcast_evidence`: broadcast an evidence.
327    async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
328        self.perform(evidence::Request::new(e)).await
329    }
330
331    /// `/tx`: find transaction by hash.
332    async fn tx(&self, hash: Hash, prove: bool) -> Result<tx::Response, Error> {
333        self.perform(tx::Request::new(hash, prove)).await
334    }
335
336    /// `/tx_search`: search for transactions with their results.
337    async fn tx_search(
338        &self,
339        query: Query,
340        prove: bool,
341        page: u32,
342        per_page: u8,
343        order: Order,
344    ) -> Result<tx_search::Response, Error> {
345        self.perform(tx_search::Request::new(query, prove, page, per_page, order))
346            .await
347    }
348
349    #[cfg(any(feature = "http-client", feature = "websocket-client"))]
350    /// Poll the `/health` endpoint until it returns a successful result or
351    /// the given `timeout` has elapsed.
352    async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
353    where
354        T: Into<core::time::Duration> + Send,
355    {
356        let timeout = timeout.into();
357        let poll_interval = core::time::Duration::from_millis(200);
358        let mut attempts_remaining = timeout.as_millis() / poll_interval.as_millis();
359
360        while self.health().await.is_err() {
361            if attempts_remaining == 0 {
362                return Err(Error::timeout(timeout));
363            }
364
365            attempts_remaining -= 1;
366            tokio::time::sleep(poll_interval).await;
367        }
368
369        Ok(())
370    }
371
372    /// Perform a request against the RPC endpoint.
373    ///
374    /// This method is used by the default implementations of specific
375    /// endpoint methods. The latest protocol dialect is assumed to be invoked.
376    async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
377    where
378        R: SimpleRequest;
379}