1mod compat;
4pub use compat::CompatMode;
5
6#[cfg(any(feature = "http-client", feature = "websocket-client"))]
7mod subscription;
8#[cfg(any(feature = "http-client", feature = "websocket-client"))]
9pub use subscription::{Subscription, SubscriptionClient};
10
11#[cfg(any(feature = "http-client", feature = "websocket-client"))]
12pub mod sync;
13
14#[cfg(any(feature = "http-client", feature = "websocket-client"))]
15mod transport;
16
17#[cfg(feature = "http-client")]
18pub use transport::http::{self, HttpClient, HttpClientUrl};
19#[cfg(feature = "websocket-client")]
20pub use transport::websocket::{
21 self, WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
22};
23
24#[cfg(any(feature = "http-client", feature = "websocket-client"))]
25pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher};
26
27use core::fmt;
28
29use async_trait::async_trait;
30use cometbft::{abci, block::Height, evidence::Evidence, Genesis, Hash};
31use serde::{de::DeserializeOwned, Serialize};
32
33use crate::{
34 endpoint::{validators::DEFAULT_VALIDATORS_PER_PAGE, *},
35 paging::Paging,
36 prelude::*,
37 query::Query,
38 Error, Order, SimpleRequest,
39};
40
41#[async_trait]
49pub trait Client {
50 async fn abci_info(&self) -> Result<abci::response::Info, Error> {
52 Ok(self.perform(abci_info::Request).await?.response)
53 }
54
55 async fn abci_query<V>(
57 &self,
58 path: Option<String>,
59 data: V,
60 height: Option<Height>,
61 prove: bool,
62 ) -> Result<abci_query::AbciQuery, Error>
63 where
64 V: Into<Vec<u8>> + Send,
65 {
66 Ok(self
67 .perform(abci_query::Request::new(path, data, height, prove))
68 .await?
69 .response)
70 }
71
72 async fn block<H>(&self, height: H) -> Result<block::Response, Error>
74 where
75 H: Into<Height> + Send,
76 {
77 self.perform(block::Request::new(height.into())).await
78 }
79
80 async fn block_by_hash(&self, hash: cometbft::Hash) -> Result<block_by_hash::Response, Error> {
82 self.perform(block_by_hash::Request::new(hash)).await
83 }
84
85 async fn latest_block(&self) -> Result<block::Response, Error> {
87 self.perform(block::Request::default()).await
88 }
89
90 async fn header<H>(&self, height: H) -> Result<header::Response, Error>
92 where
93 H: Into<Height> + Send,
94 {
95 self.perform(header::Request::new(height.into())).await
96 }
97
98 async fn header_by_hash(
100 &self,
101 hash: cometbft::Hash,
102 ) -> Result<header_by_hash::Response, Error> {
103 self.perform(header_by_hash::Request::new(hash)).await
104 }
105
106 async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
108 where
109 H: Into<Height> + Send,
110 {
111 self.perform(block_results::Request::new(height.into()))
112 .await
113 }
114
115 async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
117 self.perform(block_results::Request::default()).await
118 }
119
120 async fn block_search(
122 &self,
123 query: Query,
124 page: u32,
125 per_page: u8,
126 order: Order,
127 ) -> Result<block_search::Response, Error> {
128 self.perform(block_search::Request::new(query, page, per_page, order))
129 .await
130 }
131
132 async fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
138 where
139 H: Into<Height> + Send,
140 {
141 self.perform(blockchain::Request::new(min.into(), max.into()))
143 .await
144 }
145
146 async fn broadcast_tx_async<T>(&self, tx: T) -> Result<broadcast::tx_async::Response, Error>
148 where
149 T: Into<Vec<u8>> + Send,
150 {
151 self.perform(broadcast::tx_async::Request::new(tx)).await
152 }
153
154 async fn broadcast_tx_sync<T>(&self, tx: T) -> Result<broadcast::tx_sync::Response, Error>
157 where
158 T: Into<Vec<u8>> + Send,
159 {
160 self.perform(broadcast::tx_sync::Request::new(tx)).await
161 }
162
163 async fn broadcast_tx_commit<T>(&self, tx: T) -> Result<broadcast::tx_commit::Response, Error>
166 where
167 T: Into<Vec<u8>> + Send,
168 {
169 self.perform(broadcast::tx_commit::Request::new(tx)).await
170 }
171
172 async fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
174 where
175 H: Into<Height> + Send,
176 {
177 self.perform(commit::Request::new(height.into())).await
178 }
179
180 async fn consensus_params<H>(&self, height: H) -> Result<consensus_params::Response, Error>
183 where
184 H: Into<Height> + Send,
185 {
186 self.perform(consensus_params::Request::new(Some(height.into())))
187 .await
188 }
189
190 async fn consensus_state(&self) -> Result<consensus_state::Response, Error> {
192 self.perform(consensus_state::Request::new()).await
193 }
194
195 async fn validators<H>(&self, height: H, paging: Paging) -> Result<validators::Response, Error>
198 where
199 H: Into<Height> + Send,
200 {
201 let height = height.into();
202 match paging {
203 Paging::Default => {
204 self.perform(validators::Request::new(Some(height), None, None))
205 .await
206 },
207 Paging::Specific {
208 page_number,
209 per_page,
210 } => {
211 self.perform(validators::Request::new(
212 Some(height),
213 Some(page_number),
214 Some(per_page),
215 ))
216 .await
217 },
218 Paging::All => {
219 let mut page_num = 1_usize;
220 let mut validators = Vec::new();
221 let per_page = DEFAULT_VALIDATORS_PER_PAGE.into();
222 loop {
223 let response = self
224 .perform(validators::Request::new(
225 Some(height),
226 Some(page_num.into()),
227 Some(per_page),
228 ))
229 .await?;
230 validators.extend(response.validators);
231 if validators.len() as i32 == response.total {
232 return Ok(validators::Response::new(
233 response.block_height,
234 validators,
235 response.total,
236 ));
237 }
238 page_num += 1;
239 }
240 },
241 }
242 }
243
244 async fn latest_consensus_params(&self) -> Result<consensus_params::Response, Error> {
246 self.perform(consensus_params::Request::new(None)).await
247 }
248
249 async fn latest_commit(&self) -> Result<commit::Response, Error> {
251 self.perform(commit::Request::default()).await
252 }
253
254 async fn health(&self) -> Result<(), Error> {
258 self.perform(health::Request).await?;
259 Ok(())
260 }
261
262 async fn genesis<AppState>(&self) -> Result<Genesis<AppState>, Error>
264 where
265 AppState: fmt::Debug + Serialize + DeserializeOwned + Send,
266 {
267 Ok(self.perform(genesis::Request::default()).await?.genesis)
268 }
269
270 async fn net_info(&self) -> Result<net_info::Response, Error> {
272 self.perform(net_info::Request).await
273 }
274
275 async fn status(&self) -> Result<status::Response, Error> {
278 self.perform(status::Request).await
279 }
280
281 async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
283 self.perform(evidence::Request::new(e)).await
284 }
285
286 async fn tx(&self, hash: Hash, prove: bool) -> Result<tx::Response, Error> {
288 self.perform(tx::Request::new(hash, prove)).await
289 }
290
291 async fn tx_search(
293 &self,
294 query: Query,
295 prove: bool,
296 page: u32,
297 per_page: u8,
298 order: Order,
299 ) -> Result<tx_search::Response, Error> {
300 self.perform(tx_search::Request::new(query, prove, page, per_page, order))
301 .await
302 }
303
304 #[cfg(any(feature = "http-client", feature = "websocket-client"))]
305 async fn wait_until_healthy<T>(&self, timeout: T) -> Result<(), Error>
308 where
309 T: Into<core::time::Duration> + Send,
310 {
311 let timeout = timeout.into();
312 let poll_interval = core::time::Duration::from_millis(200);
313 let mut attempts_remaining = timeout.as_millis() / poll_interval.as_millis();
314
315 while self.health().await.is_err() {
316 if attempts_remaining == 0 {
317 return Err(Error::timeout(timeout));
318 }
319
320 attempts_remaining -= 1;
321 tokio::time::sleep(poll_interval).await;
322 }
323
324 Ok(())
325 }
326
327 async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
332 where
333 R: SimpleRequest;
334}