1pub mod abci;
2pub mod basic;
3pub mod contract;
4pub mod fetch;
5pub mod ibc;
6pub mod middleware;
7pub mod stream;
8pub mod tx;
9pub mod validator;
10
11use std::{
12 future::Future,
13 sync::{
14 atomic::{AtomicU8, Ordering},
15 Arc,
16 },
17 time::Duration,
18};
19
20use basic::BlockHeightReq;
21use middleware::{QueryMiddlewareMapReq, QueryMiddlewareMapResp, QueryMiddlewareRun};
22use tracing::instrument;
23
24use crate::{
25 cache::ClimbCache,
26 network::rpc::{RpcClient, RpcTransport},
27 prelude::*,
28};
29
30cfg_if::cfg_if! {
31 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
32 #[derive(Clone)]
33 pub struct QueryClient {
34 pub chain_config: ChainConfig,
35 pub cache: ClimbCache,
36 pub middleware_map_req: Arc<Vec<QueryMiddlewareMapReq>>,
37 pub middleware_map_resp: Arc<Vec<QueryMiddlewareMapResp>>,
38 pub middleware_run: Arc<Vec<QueryMiddlewareRun>>,
39 pub balances_pagination_limit: u64,
40 pub wait_blocks_poll_sleep_duration: Duration,
41 pub connection: Connection,
42 _grpc_channel: Option<tonic_web_wasm_client::Client>,
43 _rpc_client: Option<RpcClient>,
44 _connection_mode: Arc<AtomicU8>,
45 }
46
47 impl QueryClient {
48 pub fn clone_grpc_channel(&self) -> Result<tonic_web_wasm_client::Client> {
49 match self._grpc_channel.clone() {
50 Some(channel) => Ok(channel),
51 None => Err(anyhow!("grpc_channel isn't set")),
52 }
53 }
54 }
55 } else if #[cfg(target_arch = "wasm32")] {
56 #[derive(Clone)]
57 pub struct QueryClient {
58 pub chain_config: ChainConfig,
59 pub cache: ClimbCache,
60 pub middleware_map_req: Arc<Vec<QueryMiddlewareMapReq>>,
61 pub middleware_map_resp: Arc<Vec<QueryMiddlewareMapResp>>,
62 pub middleware_run: Arc<Vec<QueryMiddlewareRun>>,
63 pub balances_pagination_limit: u64,
64 pub wait_blocks_poll_sleep_duration: Duration,
65 pub connection: Connection,
66 _rpc_client: Option<RpcClient>,
67 _connection_mode: Arc<AtomicU8>,
68 }
69
70 impl QueryClient {
71 pub fn clone_grpc_channel(&self) -> Result<crate::network::grpc_wasi::Client> {
72 Err(anyhow!("todo!"))
73 }
74 }
75 } else {
76 #[derive(Clone)]
77 pub struct QueryClient {
78 pub chain_config: ChainConfig,
79 pub cache: ClimbCache,
80 pub middleware_map_req: Arc<Vec<QueryMiddlewareMapReq>>,
81 pub middleware_map_resp: Arc<Vec<QueryMiddlewareMapResp>>,
82 pub middleware_run: Arc<Vec<QueryMiddlewareRun>>,
83 pub balances_pagination_limit: u64,
84 pub wait_blocks_poll_sleep_duration: Duration,
85 pub connection: Connection,
86 _grpc_channel: Option<tonic::transport::Channel>,
87 _rpc_client: Option<RpcClient>,
88 _connection_mode: Arc<AtomicU8>,
89 }
90
91 impl QueryClient {
92 pub fn clone_grpc_channel(&self) -> Result<tonic::transport::Channel> {
93 match self._grpc_channel.clone() {
94 Some(channel) => Ok(channel),
95 None => Err(anyhow!("grpc_channel isn't set")),
96 }
97 }
98 }
99 }
100}
101
102impl std::fmt::Debug for QueryClient {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("QueryClient")
105 .field("chain_id", &self.chain_config.chain_id)
106 .finish()
107 }
108}
109
110pub trait QueryRequest: Clone + std::fmt::Debug + Send {
111 type QueryResponse: std::fmt::Debug + Send;
112
113 fn request(&self, client: QueryClient) -> impl Future<Output = Result<Self::QueryResponse>>;
114}
115
116const DEFAULT_BALANCES_PAGINATION_LIMIT: u64 = 10;
117const DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION: std::time::Duration =
118 std::time::Duration::from_secs(1);
119
120impl QueryClient {
121 pub async fn new(chain_config: ChainConfig, connection: Option<Connection>) -> Result<Self> {
122 let connection = connection.unwrap_or_default();
123 let cache = ClimbCache::new(connection.rpc.clone());
124 Self::new_with_cache(chain_config, cache, Some(connection)).await
125 }
126
127 #[instrument]
129 pub async fn set_connection_mode(&self, mode: Option<ConnectionMode>) -> Result<()> {
130 match mode {
131 Some(mode) => {
132 self._connection_mode.store(mode.into(), Ordering::SeqCst);
133 }
134 None => {
135 for mode in ConnectionMode::modes_to_try() {
136 self._connection_mode.store(mode.into(), Ordering::SeqCst);
137
138 let block_height = BlockHeightReq {}.request(self.clone()).await;
139
140 if let Ok(block_height) = block_height {
141 if block_height > 0 {
142 break;
143 }
144 }
145 }
146 }
147 };
148
149 Ok(())
150 }
151
152 pub fn get_connection_mode(&self) -> ConnectionMode {
153 self._connection_mode.load(Ordering::SeqCst).into()
154 }
155
156 pub fn rpc_client(&self) -> Result<&RpcClient> {
157 match self._rpc_client.as_ref() {
158 Some(client) => Ok(client),
159 None => Err(anyhow!("rpc_client isn't set")),
160 }
161 }
162
163 cfg_if::cfg_if! {
164 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
165 pub async fn new_with_cache(chain_config: ChainConfig, cache: ClimbCache, connection: Option<Connection>) -> Result<Self> {
166 let _grpc_channel = cache.get_web_grpc(&chain_config).await?;
167 let _rpc_client = cache.get_rpc_client(&chain_config);
168
169 let connection = connection.unwrap_or_default();
170
171 let _self = Self {
172 _connection_mode: Arc::new(AtomicU8::new(connection.preferred_mode.unwrap_or(ConnectionMode::Grpc) as u8)),
174 chain_config,
175 cache,
176 middleware_map_req: Arc::new(QueryMiddlewareMapReq::default_list()),
177 middleware_map_resp: Arc::new(QueryMiddlewareMapResp::default_list()),
178 middleware_run: Arc::new(QueryMiddlewareRun::default_list()),
179 balances_pagination_limit: DEFAULT_BALANCES_PAGINATION_LIMIT,
180 wait_blocks_poll_sleep_duration: DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION,
181 _grpc_channel,
182 _rpc_client,
183 connection,
184 };
185
186 if _self.connection.preferred_mode.is_none() {
187 _self.set_connection_mode(None).await?;
188 }
189
190 Ok(_self)
191 }
192 } else if #[cfg(target_arch = "wasm32")] {
193 pub async fn new_with_cache(chain_config: ChainConfig, cache: ClimbCache, connection: Option<Connection>) -> Result<Self> {
194 let _rpc_client = cache.get_rpc_client(&chain_config);
195
196 let connection = connection.unwrap_or_default();
197
198 let _self = Self {
199 _connection_mode: Arc::new(AtomicU8::new(connection.preferred_mode.unwrap_or(ConnectionMode::Rpc) as u8)),
201 chain_config,
202 cache,
203 middleware_map_req: Arc::new(QueryMiddlewareMapReq::default_list()),
204 middleware_map_resp: Arc::new(QueryMiddlewareMapResp::default_list()),
205 middleware_run: Arc::new(QueryMiddlewareRun::default_list()),
206 balances_pagination_limit: DEFAULT_BALANCES_PAGINATION_LIMIT,
207 wait_blocks_poll_sleep_duration: DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION,
208 _rpc_client,
209 connection,
210 };
211
212 if _self.connection.preferred_mode.is_none() {
213 _self.set_connection_mode(None).await?;
214 }
215
216 Ok(_self)
217 }
218 } else {
219 pub async fn new_with_cache(chain_config: ChainConfig, cache: ClimbCache, connection: Option<Connection>) -> Result<Self> {
220 let _grpc_channel = cache.get_grpc(&chain_config).await?;
221 let _rpc_client = cache.get_rpc_client(&chain_config);
222
223 let connection = connection.unwrap_or_default();
224
225 let _self = Self {
226 _connection_mode: Arc::new(AtomicU8::new(connection.preferred_mode.unwrap_or(ConnectionMode::Rpc) as u8)),
228 chain_config,
229 cache,
230 middleware_map_req: Arc::new(QueryMiddlewareMapReq::default_list()),
231 middleware_map_resp: Arc::new(QueryMiddlewareMapResp::default_list()),
232 middleware_run: Arc::new(QueryMiddlewareRun::default_list()),
233 balances_pagination_limit: DEFAULT_BALANCES_PAGINATION_LIMIT,
234 wait_blocks_poll_sleep_duration: DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION,
235 _grpc_channel,
236 _rpc_client,
237 connection,
238 };
239
240 if _self.connection.preferred_mode.is_none() {
241 _self.set_connection_mode(None).await?;
242 }
243
244
245 Ok(_self)
246 }
247 }
248 }
249
250 pub async fn run_with_middleware<REQ: QueryRequest>(
251 &self,
252 mut req: REQ,
253 ) -> Result<REQ::QueryResponse> {
254 for middleware in self.middleware_map_req.iter() {
255 req = match middleware.map_req(req.clone()).await {
256 Ok(req) => req,
257 Err(e) => return Err(e),
258 }
259 }
260
261 let mut response = None;
262
263 for middleware in self.middleware_run.iter() {
264 response = match middleware.run(req.clone(), self.clone()).await {
265 Ok(resp) => Some(resp),
266 Err(e) => return Err(e),
267 }
268 }
269
270 if response.is_none() {
271 response = Some(req.request(self.clone()).await?);
272 }
273
274 let mut response = response.unwrap();
275
276 for middleware in self.middleware_map_resp.iter() {
277 response = match middleware.map_resp(response).await {
278 Ok(resp) => resp,
279 Err(e) => return Err(e),
280 }
281 }
282
283 Ok(response)
284 }
285
286 pub async fn wait_until_block_height(
288 &self,
289 target_block_height: u64,
290 sleep_duration: Option<Duration>,
291 ) -> Result<()> {
292 let sleep_duration = sleep_duration.unwrap_or(self.wait_blocks_poll_sleep_duration);
293 loop {
294 let current_block_height = self.block_height().await?;
295
296 if current_block_height >= target_block_height {
297 break Ok(());
298 }
299
300 futures_timer::Delay::new(sleep_duration).await;
301 }
302 }
303
304 pub async fn wait_blocks(&self, n_blocks: u64, sleep_duration: Option<Duration>) -> Result<()> {
305 let target_block_height = self.block_height().await? + n_blocks;
306 self.wait_until_block_height(target_block_height, sleep_duration)
307 .await
308 }
309}
310
311#[derive(Clone)]
312pub struct Connection {
313 pub rpc: Arc<dyn RpcTransport>,
315 pub preferred_mode: Option<ConnectionMode>,
316}
317
318cfg_if::cfg_if! {
319 if #[cfg(all(target_arch = "wasm32", not(target_os = "unknown")))] {
321 impl Default for Connection {
322 fn default() -> Self {
323 Self {
324 rpc: Arc::new(crate::network::rpc::WasiRpcTransport{}),
325 preferred_mode: Some(ConnectionMode::Rpc),
326 }
327 }
328 }
329 } else {
330 impl Default for Connection {
331 fn default() -> Self {
332 Self {
333 rpc: Arc::new(reqwest::Client::new()),
334 preferred_mode: None,
335 }
336 }
337 }
338 }
339}
340
341#[derive(Clone, Copy, Debug)]
344pub enum ConnectionMode {
345 Grpc,
346 Rpc,
347}
348
349cfg_if::cfg_if! {
350 if #[cfg(all(target_arch = "wasm32", not(target_os = "unknown")))] {
351 impl ConnectionMode {
352 pub fn modes_to_try() -> Vec<Self> {
353 vec![Self::Rpc]
355 }
356 }
357 } else {
358 impl ConnectionMode {
359 pub fn modes_to_try() -> Vec<Self> {
360 vec![Self::Grpc, Self::Rpc]
361 }
362 }
363 }
364}
365
366impl From<ConnectionMode> for u8 {
367 fn from(mode: ConnectionMode) -> u8 {
368 mode as u8
369 }
370}
371
372impl From<u8> for ConnectionMode {
373 fn from(mode: u8) -> ConnectionMode {
374 match mode {
375 0 => ConnectionMode::Grpc,
376 1 => ConnectionMode::Rpc,
377 _ => panic!("invalid ConnectionMode"),
378 }
379 }
380}
381
382impl std::fmt::Display for ConnectionMode {
383 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384 match self {
385 ConnectionMode::Grpc => write!(f, "grpc"),
386 ConnectionMode::Rpc => write!(f, "rpc"),
387 }
388 }
389}