near_api/common/query/
mod.rs

1// TODO: root level doc might be needed here. It's pretty complicated.
2use async_trait::async_trait;
3use futures::future::join_all;
4use near_openapi_client::Client;
5use std::sync::Arc;
6use tracing::{debug, error, info, instrument};
7
8use crate::{
9    config::{retry, NetworkConfig, RetryResponse},
10    errors::{QueryError, SendRequestError},
11};
12
13pub mod block_rpc;
14pub mod handlers;
15pub mod query_request;
16pub mod query_rpc;
17pub mod validator_rpc;
18
19pub use handlers::*;
20
21const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
22
23type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
24
25#[async_trait]
26pub trait RpcType: Send + Sync + std::fmt::Debug {
27    type RpcReference: Send + Sync + Clone;
28    type Response;
29    type Error: std::fmt::Debug + Send + Sync;
30    async fn send_query(
31        &self,
32        client: &Client,
33        network: &NetworkConfig,
34        reference: &Self::RpcReference,
35    ) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
36}
37
38pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
39pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
40
41/// A builder for querying multiple items at once.
42///
43/// Sometimes to construct some complex type, you would need to query multiple items at once, and combine them into one.
44/// This is where this builder comes in handy. Almost every time, you would want to use [Self::map] method to combine the responses into your desired type.
45///
46/// Currently, `MultiQueryHandler` supports tuples of sizes 2 and 3.
47/// For single responses, use `RequestBuilder` instead.
48///
49/// Here is a list of examples on how to use this:
50/// - [Tokens::ft_balance](crate::tokens::Tokens::ft_balance)
51/// - [StakingPool::staking_pool_info](crate::stake::Staking::staking_pool_info)
52pub struct MultiRpcBuilder<Query, Handler>
53where
54    Query: RpcType,
55    Query::Response: std::fmt::Debug + Send + Sync,
56    Query::Error: std::fmt::Debug + Send + Sync,
57    Handler: Send + Sync,
58{
59    reference: Query::RpcReference,
60    #[allow(clippy::type_complexity)]
61    requests: Vec<
62        Arc<
63            dyn RpcType<
64                    Response = Query::Response,
65                    RpcReference = Query::RpcReference,
66                    Error = Query::Error,
67                > + Send
68                + Sync,
69        >,
70    >,
71    handler: Handler,
72}
73
74impl<Query, Handler> MultiRpcBuilder<Query, Handler>
75where
76    Handler: Default + Send + Sync,
77    Query: RpcType,
78    Query::Response: std::fmt::Debug + Send + Sync,
79    Query::Error: std::fmt::Debug + Send + Sync,
80{
81    pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
82        Self {
83            reference: reference.into(),
84            requests: vec![],
85            handler: Default::default(),
86        }
87    }
88}
89
90impl<Query, Handler> MultiRpcBuilder<Query, Handler>
91where
92    Handler: ResponseHandler<Query = Query> + Send + Sync,
93    Query: RpcType,
94    Query::Response: std::fmt::Debug + Send + Sync,
95    Query::Error: std::fmt::Debug + Send + Sync,
96{
97    pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
98        Self {
99            reference: reference.into(),
100            requests: vec![],
101            handler,
102        }
103    }
104
105    /// Map response of the queries to another type. The `map` function is executed after the queries are fetched.
106    ///
107    /// The `Handler::Response` is the type returned by the handler's `process_response` method.
108    ///
109    /// For single responses, use `RequestBuilder` instead.
110    ///
111    /// ## Example
112    /// ```rust,no_run
113    /// use near_api::advanced::{MultiQueryHandler, CallResultHandler, MultiRpcBuilder};
114    /// use near_api::types::{Data, Reference};
115    /// use std::marker::PhantomData;
116    ///
117    /// // Create a handler for multiple query responses and specify the types of the responses
118    /// let handler = MultiQueryHandler::new((
119    ///     CallResultHandler::<String>::new(),
120    ///     CallResultHandler::<u128>::new(),
121    /// ));
122    ///
123    /// // Create the builder with the handler
124    /// let builder = MultiRpcBuilder::new(handler, Reference::Optimistic);
125    ///
126    /// // Add queries to the builder
127    /// builder.add_query(todo!());
128    ///
129    /// // Map the tuple of responses to a combined type
130    /// let mapped_builder = builder.map(|(response1, response2): (Data<String>, Data<u128>)| {
131    ///     // Process the combined data
132    ///     format!("{}: {}", response1.data, response2.data)
133    /// });
134    /// ```
135    ///
136    /// See [Tokens::ft_balance](crate::tokens::Tokens::ft_balance) implementation for a real-world example.
137    pub fn map<MappedType>(
138        self,
139        map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
140    ) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
141        MultiRpcBuilder {
142            handler: PostprocessHandler::new(self.handler, map),
143            requests: self.requests,
144            reference: self.reference,
145        }
146    }
147
148    /// Post-process the response of the query with error handling
149    ///
150    /// This is useful if you want to convert one type to another but your function might fail.
151    ///
152    /// The error will be wrapped in a `QueryError::ConversionError` and returned to the caller.
153    ///
154    /// ## Example
155    /// ```rust,no_run
156    /// use near_api::*;
157    ///
158    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
159    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
160    ///         .call_function("get_balance", ())?
161    ///         .read_only()
162    ///         .and_then(|balance: Data<String>| Ok(NearToken::from_yoctonear(balance.data.parse()?)))
163    ///         .fetch_from_testnet()
164    ///         .await?;
165    /// println!("Balance: {}", balance);
166    /// # Ok(())
167    /// # }
168    /// ```
169    pub fn and_then<MappedType>(
170        self,
171        map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
172            + Send
173            + Sync
174            + 'static,
175    ) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
176        MultiRpcBuilder {
177            handler: AndThenHandler::new(self.handler, map),
178            requests: self.requests,
179            reference: self.reference,
180        }
181    }
182
183    /// Add a query to the queried items. Sometimes you might need to query multiple items at once.
184    /// To combine the result of multiple queries into one.
185    pub fn add_query(
186        mut self,
187        request: Arc<
188            dyn RpcType<
189                    Response = Query::Response,
190                    RpcReference = Query::RpcReference,
191                    Error = Query::Error,
192                > + Send
193                + Sync,
194        >,
195    ) -> Self {
196        self.requests.push(request);
197        self
198    }
199
200    /// It might be easier to use this method to add a query builder to the queried items.
201    pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
202    where
203        Handler2: ResponseHandler<Query = Query> + Send + Sync,
204    {
205        self.requests.push(query_builder.request);
206        self
207    }
208
209    /// Set the block reference for the queries.
210    pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
211        Self {
212            reference: reference.into(),
213            ..self
214        }
215    }
216
217    /// Fetch the queries from the provided network.
218    #[instrument(skip(self, network), fields(request_count = self.requests.len()))]
219    pub async fn fetch_from(
220        self,
221        network: &NetworkConfig,
222    ) -> ResultWithMethod<Handler::Response, Query::Error> {
223        debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
224
225        info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", self.requests.len());
226        let requests = self.requests.into_iter().map(|request| {
227            let reference = &self.reference;
228            async move {
229                retry(network.clone(), |client| {
230                    let request = &request;
231
232                    async move {
233                        let result = request.send_query(&client, network, reference).await;
234
235                        tracing::debug!(
236                            target: QUERY_EXECUTOR_TARGET,
237                            "Querying RPC with {:?} resulted in {:?}",
238                            request,
239                            result
240                        );
241                        result
242                    }
243                })
244                .await
245            }
246        });
247
248        let requests: Vec<_> = join_all(requests)
249            .await
250            .into_iter()
251            .collect::<Result<_, _>>()?;
252        if requests.is_empty() {
253            error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
254            return Err(QueryError::InternalErrorNoResponse);
255        }
256
257        debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
258        self.handler.process_response(requests)
259    }
260
261    /// Fetch the queries from the default mainnet network configuration.
262    pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
263        let network = NetworkConfig::mainnet();
264        self.fetch_from(&network).await
265    }
266
267    /// Fetch the queries from the default testnet network configuration.
268    pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
269        let network = NetworkConfig::testnet();
270        self.fetch_from(&network).await
271    }
272}
273
274pub struct RpcBuilder<Query, Handler>
275where
276    Query: RpcType,
277    Query::Response: std::fmt::Debug + Send + Sync,
278    Query::Error: std::fmt::Debug + Send + Sync,
279    Handler: Send + Sync,
280{
281    reference: Query::RpcReference,
282    request: Arc<
283        dyn RpcType<
284                Response = Query::Response,
285                RpcReference = Query::RpcReference,
286                Error = Query::Error,
287            > + Send
288            + Sync,
289    >,
290    handler: Handler,
291}
292
293impl<Query, Handler> RpcBuilder<Query, Handler>
294where
295    Handler: ResponseHandler<Query = Query> + Send + Sync,
296    Query: RpcType + 'static,
297    Query::Response: std::fmt::Debug + Send + Sync,
298    Query::Error: std::fmt::Debug + Send + Sync,
299{
300    pub fn new(
301        request: Query,
302        reference: impl Into<Query::RpcReference>,
303        handler: Handler,
304    ) -> Self {
305        Self {
306            reference: reference.into(),
307            request: Arc::new(request),
308            handler,
309        }
310    }
311
312    /// Set the block reference for the query.
313    pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
314        Self {
315            reference: reference.into(),
316            ..self
317        }
318    }
319
320    /// Post-process the response of the query.
321    ///
322    /// This is useful if you want to convert one type to another.
323    ///
324    /// ## Example
325    /// ```rust,no_run
326    /// use near_api::*;
327    ///
328    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
329    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
330    ///         .call_function("get_balance", ())?
331    ///         .read_only()
332    ///         .map(|balance: Data<u128>| NearToken::from_yoctonear(balance.data))
333    ///         .fetch_from_testnet()
334    ///         .await?;
335    /// println!("Balance: {}", balance);
336    /// # Ok(())
337    /// # }
338    /// ```
339    pub fn map<MappedType>(
340        self,
341        map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
342    ) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
343        RpcBuilder {
344            handler: PostprocessHandler::new(self.handler, map),
345            request: self.request,
346            reference: self.reference,
347        }
348    }
349
350    /// Post-process the response of the query with error handling
351    ///
352    /// This is useful if you want to convert one type to another but your function might fail.
353    ///
354    /// The error will be wrapped in a `QueryError::ConversionError` and returned to the caller.
355    ///
356    /// ## Example
357    /// ```rust,no_run
358    /// use near_api::*;
359    ///
360    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
361    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
362    ///         .call_function("get_balance", ())?
363    ///         .read_only()
364    ///         .and_then(|balance: Data<String>| Ok(NearToken::from_yoctonear(balance.data.parse()?)))
365    ///         .fetch_from_testnet()
366    ///         .await?;
367    /// println!("Balance: {}", balance);
368    /// # Ok(())
369    /// # }
370    /// ```
371    pub fn and_then<MappedType>(
372        self,
373        map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
374            + Send
375            + Sync
376            + 'static,
377    ) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
378        RpcBuilder {
379            handler: AndThenHandler::new(self.handler, map),
380            request: self.request,
381            reference: self.reference,
382        }
383    }
384
385    /// Fetch the query from the provided network.
386    #[instrument(skip(self, network))]
387    pub async fn fetch_from(
388        self,
389        network: &NetworkConfig,
390    ) -> ResultWithMethod<Handler::Response, Query::Error> {
391        debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
392
393        let query_response = retry(network.clone(), |client| {
394            let request = &self.request;
395            let reference = &self.reference;
396            async move {
397                let result = request.send_query(&client, network, reference).await;
398                tracing::debug!(
399                    target: QUERY_EXECUTOR_TARGET,
400                    "Querying RPC with {:?} resulted in {:?}",
401                    request,
402                    result
403                );
404                result
405            }
406        })
407        .await?;
408
409        debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
410        self.handler.process_response(vec![query_response])
411    }
412
413    /// Fetch the query from the default mainnet network configuration.
414    pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
415        let network = NetworkConfig::mainnet();
416        self.fetch_from(&network).await
417    }
418
419    /// Fetch the query from the default testnet network configuration.
420    pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
421        let network = NetworkConfig::testnet();
422        self.fetch_from(&network).await
423    }
424}