1mod compat;
4pub use compat::CompatMode;
5
6#[cfg(any(
7 feature = "http-client",
8 feature = "websocket-client",
9 feature = "mock-client"
10))]
11mod subscription;
12#[cfg(any(
13 feature = "http-client",
14 feature = "websocket-client",
15 feature = "mock-client"
16))]
17pub use subscription::{Subscription, SubscriptionClient};
18
19#[cfg(any(
20 feature = "http-client",
21 feature = "websocket-client",
22 feature = "mock-client"
23))]
24pub mod sync;
25
26#[cfg(any(
27 feature = "http-client",
28 feature = "websocket-client",
29 feature = "mock-client"
30))]
31mod transport;
32
33#[cfg(feature = "http-client")]
34pub use transport::http::{self, HttpClient, HttpClientUrl};
35#[cfg(feature = "websocket-client")]
36pub use transport::websocket::{
37 self, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
38};
39
40#[cfg(feature = "mock-client")]
41pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};
42
43use core::fmt;
44
45use async_trait::async_trait;
46use cometbft::{abci, block::Height, evidence::Evidence, Genesis, Hash};
47use serde::{de::DeserializeOwned, Serialize};
48
49use crate::{
50 endpoint::{validators::DEFAULT_VALIDATORS_PER_PAGE, *},
51 paging::Paging,
52 prelude::*,
53 query::Query,
54 Error, Order, SimpleRequest,
55};
56
57#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
65#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
66pub trait Client {
67 async fn abci_info(&self) -> Result<abci::response::Info, Error> {
69 Ok(self.perform(abci_info::Request).await?.response)
70 }
71
72 async fn abci_query<V>(
74 &self,
75 path: Option<String>,
76 data: V,
77 height: Option<Height>,
78 prove: bool,
79 ) -> Result<abci_query::AbciQuery, Error>
80 where
81 V: Into<Vec<u8>> + Send,
82 {
83 Ok(self
84 .perform(abci_query::Request::new(path, data, height, prove))
85 .await?
86 .response)
87 }
88
89 async fn block<H>(&self, height: H) -> Result<block::Response, Error>
91 where
92 H: Into<Height> + Send,
93 {
94 self.perform(block::Request::new(height.into())).await
95 }
96
97 async fn block_by_hash(&self, hash: cometbft::Hash) -> Result<block_by_hash::Response, Error> {
99 self.perform(block_by_hash::Request::new(hash)).await
100 }
101
102 async fn latest_block(&self) -> Result<block::Response, Error> {
104 self.perform(block::Request::default()).await
105 }
106
107 async fn header<H>(&self, height: H) -> Result<header::Response, Error>
109 where
110 H: Into<Height> + Send,
111 {
112 self.perform(header::Request::new(height.into())).await
113 }
114
115 async fn header_by_hash(
117 &self,
118 hash: cometbft::Hash,
119 ) -> Result<header_by_hash::Response, Error> {
120 self.perform(header_by_hash::Request::new(hash)).await
121 }
122
123 async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
125 where
126 H: Into<Height> + Send,
127 {
128 self.perform(block_results::Request::new(height.into()))
129 .await
130 }
131
132 async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
134 self.perform(block_results::Request::default()).await
135 }
136
137 async fn block_search(
139 &self,
140 query: Query,
141 page: u32,
142 per_page: u8,
143 order: Order,
144 ) -> Result<block_search::Response, Error> {
145 self.perform(block_search::Request::new(query, page, per_page, order))
146 .await
147 }
148
149 async fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
155 where
156 H: Into<Height> + Send,
157 {
158 self.perform(blockchain::Request::new(min.into(), max.into()))
160 .await
161 }
162
163 async fn broadcast_tx_async<T>(&self, tx: T) -> Result<broadcast::tx_async::Response, Error>
165 where
166 T: Into<Vec<u8>> + Send,
167 {
168 self.perform(broadcast::tx_async::Request::new(tx)).await
169 }
170
171 async fn broadcast_tx_sync<T>(&self, tx: T) -> Result<broadcast::tx_sync::Response, Error>
174 where
175 T: Into<Vec<u8>> + Send,
176 {
177 self.perform(broadcast::tx_sync::Request::new(tx)).await
178 }
179
180 async fn broadcast_tx_commit<T>(&self, tx: T) -> Result<broadcast::tx_commit::Response, Error>
183 where
184 T: Into<Vec<u8>> + Send,
185 {
186 self.perform(broadcast::tx_commit::Request::new(tx)).await
187 }
188
189 async fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
191 where
192 H: Into<Height> + Send,
193 {
194 self.perform(commit::Request::new(height.into())).await
195 }
196
197 async fn consensus_params<H>(&self, height: H) -> Result<consensus_params::Response, Error>
200 where
201 H: Into<Height> + Send,
202 {
203 self.perform(consensus_params::Request::new(Some(height.into())))
204 .await
205 }
206
207 async fn consensus_state(&self) -> Result<consensus_state::Response, Error> {
209 self.perform(consensus_state::Request::new()).await
210 }
211
212 async fn validators<H>(&self, height: H, paging: Paging) -> Result<validators::Response, Error>
215 where
216 H: Into<Height> + Send,
217 {
218 let height = height.into();
219 match paging {
220 Paging::Default => {
221 self.perform(validators::Request::new(Some(height), None, None))
222 .await
223 },
224 Paging::Specific {
225 page_number,
226 per_page,
227 } => {
228 self.perform(validators::Request::new(
229 Some(height),
230 Some(page_number),
231 Some(per_page),
232 ))
233 .await
234 },
235 Paging::All => {
236 let mut page_num = 1_usize;
237 let mut validators = Vec::new();
238 let per_page = DEFAULT_VALIDATORS_PER_PAGE.into();
239 loop {
240 let response = self
241 .perform(validators::Request::new(
242 Some(height),
243 Some(page_num.into()),
244 Some(per_page),
245 ))
246 .await?;
247 validators.extend(response.validators);
248 if validators.len() as i32 == response.total {
249 return Ok(validators::Response::new(
250 response.block_height,
251 validators,
252 response.total,
253 ));
254 }
255 page_num += 1;
256 }
257 },
258 }
259 }
260
261 async fn latest_consensus_params(&self) -> Result<consensus_params::Response, Error> {
263 self.perform(consensus_params::Request::new(None)).await
264 }
265
266 async fn latest_commit(&self) -> Result<commit::Response, Error> {
268 self.perform(commit::Request::default()).await
269 }
270
271 async fn health(&self) -> Result<(), Error> {
275 self.perform(health::Request).await?;
276 Ok(())
277 }
278
279 async fn genesis<AppState>(&self) -> Result<Genesis<AppState>, Error>
281 where
282 AppState: fmt::Debug + Serialize + DeserializeOwned + Send,
283 {
284 Ok(self.perform(genesis::Request::default()).await?.genesis)
285 }
286
287 async fn genesis_chunked(&self, chunk: u64) -> Result<genesis_chunked::Response, Error> {
288 self.perform(genesis_chunked::Request::new(chunk)).await
289 }
290
291 #[cfg(any(feature = "http-client", feature = "websocket-client"))]
293 async fn genesis_chunked_stream(
294 &self,
295 ) -> core::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<u8>, Error>> + '_>> {
296 Box::pin(futures::stream::unfold(Some(0), move |chunk| async move {
297 let chunk = chunk?;
299
300 match self.genesis_chunked(chunk).await {
301 Ok(response) => {
302 if response.chunk + 1 >= response.total {
303 Some((Ok(response.data), None))
305 } else {
306 Some((Ok(response.data), Some(response.chunk + 1)))
308 }
309 },
310 Err(e) => Some((Err(e), None)), }
312 }))
313 }
314
315 async fn net_info(&self) -> Result<net_info::Response, Error> {
317 self.perform(net_info::Request).await
318 }
319
320 async fn status(&self) -> Result<status::Response, Error> {
323 self.perform(status::Request).await
324 }
325
326 async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
328 self.perform(evidence::Request::new(e)).await
329 }
330
331 async fn tx(&self, hash: Hash, prove: bool) -> Result<tx::Response, Error> {
333 self.perform(tx::Request::new(hash, prove)).await
334 }
335
336 async fn tx_search(
338 &self,
339 query: Query,
340 prove: bool,
341 page: u32,
342 per_page: u8,
343 order: Order,
344 ) -> Result<tx_search::Response, Error> {
345 self.perform(tx_search::Request::new(query, prove, page, per_page, order))
346 .await
347 }
348
349 #[cfg(any(feature = "http-client", feature = "websocket-client"))]
350 async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
353 where
354 T: Into<core::time::Duration> + Send,
355 {
356 let timeout = timeout.into();
357 let poll_interval = core::time::Duration::from_millis(200);
358 let mut attempts_remaining = timeout.as_millis() / poll_interval.as_millis();
359
360 while self.health().await.is_err() {
361 if attempts_remaining == 0 {
362 return Err(Error::timeout(timeout));
363 }
364
365 attempts_remaining -= 1;
366 tokio::time::sleep(poll_interval).await;
367 }
368
369 Ok(())
370 }
371
372 async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
377 where
378 R: SimpleRequest;
379}