layer_climb_core/
querier.rs

1pub mod abci;
2pub mod basic;
3pub mod contract;
4pub mod fetch;
5pub mod ibc;
6pub mod middleware;
7pub mod stream;
8pub mod tx;
9pub mod validator;
10
11use std::{
12    future::Future,
13    sync::{
14        atomic::{AtomicU8, Ordering},
15        Arc,
16    },
17    time::Duration,
18};
19
20use basic::BlockHeightReq;
21use middleware::{QueryMiddlewareMapReq, QueryMiddlewareMapResp, QueryMiddlewareRun};
22use tracing::instrument;
23
24use crate::{
25    cache::ClimbCache,
26    network::rpc::{RpcClient, RpcTransport},
27    prelude::*,
28};
29
30cfg_if::cfg_if! {
31    if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
        /// Query client for a single chain (variant compiled for `wasm32` with `target_os = "unknown"`).
        ///
        /// `Clone` is derived; the `Arc`-wrapped fields are shared between clones, not deep-copied.
        #[derive(Clone)]
        pub struct QueryClient {
            /// Configuration of the chain this client queries.
            pub chain_config: ChainConfig,
            /// Cache the gRPC channel and RPC client are obtained from in `new_with_cache`.
            pub cache: ClimbCache,
            /// Request-mapping middleware, applied in order by `run_with_middleware` before the request runs.
            pub middleware_map_req: Arc<Vec<QueryMiddlewareMapReq>>,
            /// Response-mapping middleware, applied in order by `run_with_middleware` to the response.
            pub middleware_map_resp: Arc<Vec<QueryMiddlewareMapResp>>,
            /// Middleware that executes the request; when the list is empty `run_with_middleware`
            /// executes the request directly.
            pub middleware_run: Arc<Vec<QueryMiddlewareRun>>,
            /// Initialised to `DEFAULT_BALANCES_PAGINATION_LIMIT`.
            /// NOTE(review): presumably the page size for balance queries; usage is outside this file — confirm.
            pub balances_pagination_limit: u64,
            /// Poll interval used by `wait_until_block_height` when the caller passes no explicit duration.
            pub wait_blocks_poll_sleep_duration: Duration,
            /// Transport settings this client was built with.
            pub connection: Connection,
            // gRPC channel taken from the cache, if any; handed out by `clone_grpc_channel`.
            _grpc_channel: Option<tonic_web_wasm_client::Client>,
            // RPC client taken from the cache, if any; handed out by `rpc_client`.
            _rpc_client: Option<RpcClient>,
            // Currently selected `ConnectionMode`, stored as its `u8` value.
            _connection_mode: Arc<AtomicU8>,
        }
46
47        impl QueryClient {
48            pub fn clone_grpc_channel(&self) -> Result<tonic_web_wasm_client::Client> {
49                match self._grpc_channel.clone() {
50                    Some(channel) => Ok(channel),
51                    None => Err(anyhow!("grpc_channel isn't set")),
52                }
53            }
54        }
55    } else if #[cfg(target_arch = "wasm32")] {
        /// Query client for a single chain (variant compiled for `wasm32` targets whose
        /// `target_os` is not `"unknown"`). This variant has no gRPC channel field.
        ///
        /// `Clone` is derived; the `Arc`-wrapped fields are shared between clones, not deep-copied.
        #[derive(Clone)]
        pub struct QueryClient {
            /// Configuration of the chain this client queries.
            pub chain_config: ChainConfig,
            /// Cache the RPC client is obtained from in `new_with_cache`.
            pub cache: ClimbCache,
            /// Request-mapping middleware, applied in order by `run_with_middleware` before the request runs.
            pub middleware_map_req: Arc<Vec<QueryMiddlewareMapReq>>,
            /// Response-mapping middleware, applied in order by `run_with_middleware` to the response.
            pub middleware_map_resp: Arc<Vec<QueryMiddlewareMapResp>>,
            /// Middleware that executes the request; when the list is empty `run_with_middleware`
            /// executes the request directly.
            pub middleware_run: Arc<Vec<QueryMiddlewareRun>>,
            /// Initialised to `DEFAULT_BALANCES_PAGINATION_LIMIT`.
            /// NOTE(review): presumably the page size for balance queries; usage is outside this file — confirm.
            pub balances_pagination_limit: u64,
            /// Poll interval used by `wait_until_block_height` when the caller passes no explicit duration.
            pub wait_blocks_poll_sleep_duration: Duration,
            /// Transport settings this client was built with.
            pub connection: Connection,
            // RPC client taken from the cache, if any; handed out by `rpc_client`.
            _rpc_client: Option<RpcClient>,
            // Currently selected `ConnectionMode`, stored as its `u8` value.
            _connection_mode: Arc<AtomicU8>,
        }
69
70        impl QueryClient {
71            pub fn clone_grpc_channel(&self) -> Result<crate::network::grpc_wasi::Client> {
72                Err(anyhow!("todo!"))
73            }
74        }
75    } else {
        /// Query client for a single chain (variant compiled for every non-`wasm32` target).
        ///
        /// `Clone` is derived; the `Arc`-wrapped fields are shared between clones, not deep-copied.
        #[derive(Clone)]
        pub struct QueryClient {
            /// Configuration of the chain this client queries.
            pub chain_config: ChainConfig,
            /// Cache the gRPC channel and RPC client are obtained from in `new_with_cache`.
            pub cache: ClimbCache,
            /// Request-mapping middleware, applied in order by `run_with_middleware` before the request runs.
            pub middleware_map_req: Arc<Vec<QueryMiddlewareMapReq>>,
            /// Response-mapping middleware, applied in order by `run_with_middleware` to the response.
            pub middleware_map_resp: Arc<Vec<QueryMiddlewareMapResp>>,
            /// Middleware that executes the request; when the list is empty `run_with_middleware`
            /// executes the request directly.
            pub middleware_run: Arc<Vec<QueryMiddlewareRun>>,
            /// Initialised to `DEFAULT_BALANCES_PAGINATION_LIMIT`.
            /// NOTE(review): presumably the page size for balance queries; usage is outside this file — confirm.
            pub balances_pagination_limit: u64,
            /// Poll interval used by `wait_until_block_height` when the caller passes no explicit duration.
            pub wait_blocks_poll_sleep_duration: Duration,
            /// Transport settings this client was built with.
            pub connection: Connection,
            // gRPC channel taken from the cache, if any; handed out by `clone_grpc_channel`.
            _grpc_channel: Option<tonic::transport::Channel>,
            // RPC client taken from the cache, if any; handed out by `rpc_client`.
            _rpc_client: Option<RpcClient>,
            // Currently selected `ConnectionMode`, stored as its `u8` value.
            _connection_mode: Arc<AtomicU8>,
        }
90
91        impl QueryClient {
92            pub fn clone_grpc_channel(&self) -> Result<tonic::transport::Channel> {
93                match self._grpc_channel.clone() {
94                    Some(channel) => Ok(channel),
95                    None => Err(anyhow!("grpc_channel isn't set")),
96                }
97            }
98        }
99    }
100}
101
102impl std::fmt::Debug for QueryClient {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("QueryClient")
105            .field("chain_id", &self.chain_config.chain_id)
106            .finish()
107    }
108}
109
/// A query that can be executed against a [`QueryClient`].
///
/// Implementors must be `Clone + Debug + Send`; `QueryClient::run_with_middleware` clones the
/// request when handing it to middleware.
pub trait QueryRequest: Clone + std::fmt::Debug + Send {
    /// Value produced by a successful query.
    type QueryResponse: std::fmt::Debug + Send;

    /// Executes the query with `client` (taken by value) and resolves to the response or an error.
    fn request(&self, client: QueryClient) -> impl Future<Output = Result<Self::QueryResponse>>;
}
115
/// Initial value of `QueryClient::balances_pagination_limit`.
const DEFAULT_BALANCES_PAGINATION_LIMIT: u64 = 10;
/// Initial value of `QueryClient::wait_blocks_poll_sleep_duration` (one second).
const DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION: Duration = Duration::from_secs(1);
119
120impl QueryClient {
121    pub async fn new(chain_config: ChainConfig, connection: Option<Connection>) -> Result<Self> {
122        let connection = connection.unwrap_or_default();
123        let cache = ClimbCache::new(connection.rpc.clone());
124        Self::new_with_cache(chain_config, cache, Some(connection)).await
125    }
126
127    // if None, will make a best-guess attempt via block query
128    #[instrument]
129    pub async fn set_connection_mode(&self, mode: Option<ConnectionMode>) -> Result<()> {
130        match mode {
131            Some(mode) => {
132                self._connection_mode.store(mode.into(), Ordering::SeqCst);
133            }
134            None => {
135                for mode in ConnectionMode::modes_to_try() {
136                    self._connection_mode.store(mode.into(), Ordering::SeqCst);
137
138                    let block_height = BlockHeightReq {}.request(self.clone()).await;
139
140                    if let Ok(block_height) = block_height {
141                        if block_height > 0 {
142                            break;
143                        }
144                    }
145                }
146            }
147        };
148
149        Ok(())
150    }
151
152    pub fn get_connection_mode(&self) -> ConnectionMode {
153        self._connection_mode.load(Ordering::SeqCst).into()
154    }
155
156    pub fn rpc_client(&self) -> Result<&RpcClient> {
157        match self._rpc_client.as_ref() {
158            Some(client) => Ok(client),
159            None => Err(anyhow!("rpc_client isn't set")),
160        }
161    }
162
163    cfg_if::cfg_if! {
164        if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
165            pub async fn new_with_cache(chain_config: ChainConfig, cache: ClimbCache, connection: Option<Connection>) -> Result<Self> {
166                let _grpc_channel = cache.get_web_grpc(&chain_config).await?;
167                let _rpc_client = cache.get_rpc_client(&chain_config);
168
169                let connection = connection.unwrap_or_default();
170
171                let _self = Self {
172                    // if None, this will be overriden, just set _something_
173                    _connection_mode: Arc::new(AtomicU8::new(connection.preferred_mode.unwrap_or(ConnectionMode::Grpc) as u8)),
174                    chain_config,
175                    cache,
176                    middleware_map_req: Arc::new(QueryMiddlewareMapReq::default_list()),
177                    middleware_map_resp: Arc::new(QueryMiddlewareMapResp::default_list()),
178                    middleware_run: Arc::new(QueryMiddlewareRun::default_list()),
179                    balances_pagination_limit: DEFAULT_BALANCES_PAGINATION_LIMIT,
180                    wait_blocks_poll_sleep_duration: DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION,
181                    _grpc_channel,
182                    _rpc_client,
183                    connection,
184                };
185
186                if _self.connection.preferred_mode.is_none() {
187                    _self.set_connection_mode(None).await?;
188                }
189
190                Ok(_self)
191            }
192        } else if #[cfg(target_arch = "wasm32")] {
193            pub async fn new_with_cache(chain_config: ChainConfig, cache: ClimbCache, connection: Option<Connection>) -> Result<Self> {
194                let _rpc_client = cache.get_rpc_client(&chain_config);
195
196                let connection = connection.unwrap_or_default();
197
198                let _self = Self {
199                    // if None, this will be overriden, just set _something_
200                    _connection_mode: Arc::new(AtomicU8::new(connection.preferred_mode.unwrap_or(ConnectionMode::Rpc) as u8)),
201                    chain_config,
202                    cache,
203                    middleware_map_req: Arc::new(QueryMiddlewareMapReq::default_list()),
204                    middleware_map_resp: Arc::new(QueryMiddlewareMapResp::default_list()),
205                    middleware_run: Arc::new(QueryMiddlewareRun::default_list()),
206                    balances_pagination_limit: DEFAULT_BALANCES_PAGINATION_LIMIT,
207                    wait_blocks_poll_sleep_duration: DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION,
208                    _rpc_client,
209                    connection,
210                };
211
212                if _self.connection.preferred_mode.is_none() {
213                    _self.set_connection_mode(None).await?;
214                }
215
216                Ok(_self)
217            }
218        } else {
219            pub async fn new_with_cache(chain_config: ChainConfig, cache: ClimbCache, connection: Option<Connection>) -> Result<Self> {
220                let _grpc_channel = cache.get_grpc(&chain_config).await?;
221                let _rpc_client = cache.get_rpc_client(&chain_config);
222
223                let connection = connection.unwrap_or_default();
224
225                let _self = Self {
226                    // if None, this will be overriden, just set _something_
227                    _connection_mode: Arc::new(AtomicU8::new(connection.preferred_mode.unwrap_or(ConnectionMode::Rpc) as u8)),
228                    chain_config,
229                    cache,
230                    middleware_map_req: Arc::new(QueryMiddlewareMapReq::default_list()),
231                    middleware_map_resp: Arc::new(QueryMiddlewareMapResp::default_list()),
232                    middleware_run: Arc::new(QueryMiddlewareRun::default_list()),
233                    balances_pagination_limit: DEFAULT_BALANCES_PAGINATION_LIMIT,
234                    wait_blocks_poll_sleep_duration: DEFAULT_WAIT_BLOCKS_POLL_SLEEP_DURATION,
235                    _grpc_channel,
236                    _rpc_client,
237                    connection,
238                };
239
240                if _self.connection.preferred_mode.is_none() {
241                    _self.set_connection_mode(None).await?;
242                }
243
244
245                Ok(_self)
246            }
247        }
248    }
249
250    pub async fn run_with_middleware<REQ: QueryRequest>(
251        &self,
252        mut req: REQ,
253    ) -> Result<REQ::QueryResponse> {
254        for middleware in self.middleware_map_req.iter() {
255            req = match middleware.map_req(req.clone()).await {
256                Ok(req) => req,
257                Err(e) => return Err(e),
258            }
259        }
260
261        let mut response = None;
262
263        for middleware in self.middleware_run.iter() {
264            response = match middleware.run(req.clone(), self.clone()).await {
265                Ok(resp) => Some(resp),
266                Err(e) => return Err(e),
267            }
268        }
269
270        if response.is_none() {
271            response = Some(req.request(self.clone()).await?);
272        }
273
274        let mut response = response.unwrap();
275
276        for middleware in self.middleware_map_resp.iter() {
277            response = match middleware.map_resp(response).await {
278                Ok(resp) => resp,
279                Err(e) => return Err(e),
280            }
281        }
282
283        Ok(response)
284    }
285
286    // these do not call middleware, but their inner calls do
287    pub async fn wait_until_block_height(
288        &self,
289        target_block_height: u64,
290        sleep_duration: Option<Duration>,
291    ) -> Result<()> {
292        let sleep_duration = sleep_duration.unwrap_or(self.wait_blocks_poll_sleep_duration);
293        loop {
294            let current_block_height = self.block_height().await?;
295
296            if current_block_height >= target_block_height {
297                break Ok(());
298            }
299
300            futures_timer::Delay::new(sleep_duration).await;
301        }
302    }
303
304    pub async fn wait_blocks(&self, n_blocks: u64, sleep_duration: Option<Duration>) -> Result<()> {
305        let target_block_height = self.block_height().await? + n_blocks;
306        self.wait_until_block_height(target_block_height, sleep_duration)
307            .await
308    }
309}
310
/// Transport configuration handed to the `QueryClient` constructors.
#[derive(Clone)]
pub struct Connection {
    // TODO: expand for gRPC, get rid of feature-gating
    /// RPC transport; `QueryClient::new` passes a clone of it to `ClimbCache::new`.
    pub rpc: Arc<dyn RpcTransport>,
    /// Mode the client should start in. When `None`, `QueryClient::new_with_cache` runs
    /// `set_connection_mode(None)` to detect a working mode.
    pub preferred_mode: Option<ConnectionMode>,
}
317
318cfg_if::cfg_if! {
319    // WASI
320    if #[cfg(all(target_arch = "wasm32", not(target_os = "unknown")))] {
321        impl Default for Connection {
322            fn default() -> Self {
323                Self {
324                    rpc: Arc::new(crate::network::rpc::WasiRpcTransport{}),
325                    preferred_mode: Some(ConnectionMode::Rpc),
326                }
327            }
328        }
329    } else {
330        impl Default for Connection {
331            fn default() -> Self {
332                Self {
333                    rpc: Arc::new(reqwest::Client::new()),
334                    preferred_mode: None,
335                }
336            }
337        }
338    }
339}
340
// Currently only used via automatic fallback in very specific cases.
// TODO: make this more general
/// Transport a `QueryClient` uses for its queries.
///
/// The numeric values are what gets stored in the client's `AtomicU8` and what
/// `From<u8> for ConnectionMode` decodes, so they are spelled out explicitly.
#[derive(Clone, Copy, Debug)]
pub enum ConnectionMode {
    Grpc = 0,
    Rpc = 1,
}
348
349cfg_if::cfg_if! {
350    if #[cfg(all(target_arch = "wasm32", not(target_os = "unknown")))] {
351        impl ConnectionMode {
352            pub fn modes_to_try() -> Vec<Self> {
353                // WASI only supports RPC for now, don't even try anything else
354                vec![Self::Rpc]
355            }
356        }
357    } else {
358        impl ConnectionMode {
359            pub fn modes_to_try() -> Vec<Self> {
360                vec![Self::Grpc, Self::Rpc]
361            }
362        }
363    }
364}
365
366impl From<ConnectionMode> for u8 {
367    fn from(mode: ConnectionMode) -> u8 {
368        mode as u8
369    }
370}
371
372impl From<u8> for ConnectionMode {
373    fn from(mode: u8) -> ConnectionMode {
374        match mode {
375            0 => ConnectionMode::Grpc,
376            1 => ConnectionMode::Rpc,
377            _ => panic!("invalid ConnectionMode"),
378        }
379    }
380}
381
382impl std::fmt::Display for ConnectionMode {
383    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384        match self {
385            ConnectionMode::Grpc => write!(f, "grpc"),
386            ConnectionMode::Rpc => write!(f, "rpc"),
387        }
388    }
389}