Skip to main content

near_api/common/query/
mod.rs

1// TODO: root level doc might be needed here. It's pretty complicated.
2use async_trait::async_trait;
3use futures::future::join_all;
4use near_openapi_client::Client;
5use std::sync::Arc;
6use tracing::{debug, error, info, instrument};
7
8use crate::{
9    config::{NetworkConfig, RetryResponse, retry},
10    errors::{ArgumentValidationError, QueryError, SendRequestError},
11};
12
13pub mod block_rpc;
14pub mod handlers;
15pub mod query_request;
16pub mod query_rpc;
17pub mod tx_rpc;
18pub mod validator_rpc;
19
20pub use handlers::*;
21
/// `target` passed to the `tracing` macros (`debug!`, `info!`, `error!`) that the
/// builders in this module emit while sending queries and processing responses.
22const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
23
/// Shorthand for the result type returned by the `fetch_from*` methods: the success
/// value `T`, or a [`QueryError`] parameterized by the RPC error type `E`.
24type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
25
26#[async_trait]
27pub trait RpcType: Send + Sync + std::fmt::Debug {
28    type RpcReference: Send + Sync + Clone;
29    type Response;
30    type Error: std::fmt::Debug + Send + Sync;
31    async fn send_query(
32        &self,
33        client: &Client,
34        network: &NetworkConfig,
35        reference: &Self::RpcReference,
36    ) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
37}
38
/// An [`RpcBuilder`] whose query type is taken from the handler `T`
/// (its `ResponseHandler::Query` associated type).
39pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
/// A [`MultiRpcBuilder`] whose query type is taken from the handler `T`
/// (its `ResponseHandler::Query` associated type).
40pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
41
42/// A builder for querying multiple items at once.
43///
44/// Sometimes to construct some complex type, you would need to query multiple items at once, and combine them into one.
45/// This is where this builder comes in handy. Almost every time, you would want to use [Self::map] method to combine the responses into your desired type.
46///
47/// Currently, `MultiQueryHandler` supports tuples of sizes 2 and 3.
48/// For single responses, use `RequestBuilder` instead.
49///
50/// Here is a list of examples on how to use this:
51/// - [Tokens::ft_balance](crate::tokens::Tokens::ft_balance)
52/// - [StakingPool::staking_pool_info](crate::stake::Staking::staking_pool_info)
53pub struct MultiRpcBuilder<Query, Handler>
54where
55    Query: RpcType,
56    Query::Response: std::fmt::Debug + Send + Sync,
57    Query::Error: std::fmt::Debug + Send + Sync,
58    Handler: Send + Sync,
59{
60    reference: Query::RpcReference,
61    #[allow(clippy::type_complexity)]
62    requests: Vec<
63        Result<
64            Arc<
65                dyn RpcType<
66                        Response = Query::Response,
67                        RpcReference = Query::RpcReference,
68                        Error = Query::Error,
69                    > + Send
70                    + Sync,
71            >,
72            ArgumentValidationError,
73        >,
74    >,
75    handler: Handler,
76}
77
78impl<Query, Handler> MultiRpcBuilder<Query, Handler>
79where
80    Handler: Default + Send + Sync,
81    Query: RpcType,
82    Query::Response: std::fmt::Debug + Send + Sync,
83    Query::Error: std::fmt::Debug + Send + Sync,
84{
85    pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
86        Self {
87            reference: reference.into(),
88            requests: vec![],
89            handler: Default::default(),
90        }
91    }
92}
93
94impl<Query, Handler> MultiRpcBuilder<Query, Handler>
95where
96    Handler: ResponseHandler<Query = Query> + Send + Sync,
97    Query: RpcType,
98    Query::Response: std::fmt::Debug + Send + Sync,
99    Query::Error: std::fmt::Debug + Send + Sync,
100{
101    pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
102        Self {
103            reference: reference.into(),
104            requests: vec![],
105            handler,
106        }
107    }
108
109    /// Map response of the queries to another type. The `map` function is executed after the queries are fetched.
110    ///
111    /// The `Handler::Response` is the type returned by the handler's `process_response` method.
112    ///
113    /// For single responses, use `RequestBuilder` instead.
114    ///
115    /// ## Example
116    /// ```rust,no_run
117    /// use near_api::advanced::{MultiQueryHandler, CallResultHandler, MultiRpcBuilder};
118    /// use near_api::types::{Data, Reference};
119    /// use std::marker::PhantomData;
120    ///
121    /// // Create a handler for multiple query responses and specify the types of the responses
122    /// let handler = MultiQueryHandler::new((
123    ///     CallResultHandler::<String>::new(),
124    ///     CallResultHandler::<u128>::new(),
125    /// ));
126    ///
127    /// // Create the builder with the handler
128    /// let builder = MultiRpcBuilder::new(handler, Reference::Optimistic);
129    ///
130    /// // Add queries to the builder
131    /// builder.add_query(todo!());
132    ///
133    /// // Map the tuple of responses to a combined type
134    /// let mapped_builder = builder.map(|(response1, response2): (Data<String>, Data<u128>)| {
135    ///     // Process the combined data
136    ///     format!("{}: {}", response1.data, response2.data)
137    /// });
138    /// ```
139    ///
140    /// See [Tokens::ft_balance](crate::tokens::Tokens::ft_balance) implementation for a real-world example.
141    pub fn map<MappedType>(
142        self,
143        map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
144    ) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
145        MultiRpcBuilder {
146            handler: PostprocessHandler::new(self.handler, map),
147            requests: self.requests,
148            reference: self.reference,
149        }
150    }
151
152    /// Post-process the response of the query with error handling
153    ///
154    /// This is useful if you want to convert one type to another but your function might fail.
155    ///
156    /// The error will be wrapped in a `QueryError::ConversionError` and returned to the caller.
157    ///
158    /// ## Example
159    /// ```rust,no_run
160    /// use near_api::*;
161    ///
162    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
163    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
164    ///         .call_function("get_balance", ())
165    ///         .read_only()
166    ///         .and_then(|balance: Data<String>| Ok(NearToken::from_yoctonear(balance.data.parse()?)))
167    ///         .fetch_from_testnet()
168    ///         .await?;
169    /// println!("Balance: {}", balance);
170    /// # Ok(())
171    /// # }
172    /// ```
173    pub fn and_then<MappedType>(
174        self,
175        map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
176        + Send
177        + Sync
178        + 'static,
179    ) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
180        MultiRpcBuilder {
181            handler: AndThenHandler::new(self.handler, map),
182            requests: self.requests,
183            reference: self.reference,
184        }
185    }
186
187    /// Add a query to the queried items. Sometimes you might need to query multiple items at once.
188    /// To combine the result of multiple queries into one.
189    pub fn add_query(
190        mut self,
191        request: Arc<
192            dyn RpcType<
193                    Response = Query::Response,
194                    RpcReference = Query::RpcReference,
195                    Error = Query::Error,
196                > + Send
197                + Sync,
198        >,
199    ) -> Self {
200        self.requests.push(Ok(request));
201        self
202    }
203
204    /// It might be easier to use this method to add a query builder to the queried items.
205    pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
206    where
207        Handler2: ResponseHandler<Query = Query> + Send + Sync,
208    {
209        self.requests.push(query_builder.request);
210        self
211    }
212
213    /// Set the block reference for the queries.
214    pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
215        Self {
216            reference: reference.into(),
217            ..self
218        }
219    }
220
221    /// Fetch the queries from the provided network.
222    #[instrument(skip(self, network), fields(request_count = self.requests.len()))]
223    pub async fn fetch_from(
224        self,
225        network: &NetworkConfig,
226    ) -> ResultWithMethod<Handler::Response, Query::Error> {
227        let (requests, errors) =
228            self.requests
229                .into_iter()
230                .fold((vec![], vec![]), |(mut v, mut e), r| {
231                    match r {
232                        Ok(val) => v.push(val),
233                        Err(err) => e.push(err),
234                    }
235                    (v, e)
236                });
237        if !errors.is_empty() {
238            return Err(QueryError::ArgumentValidationError(
239                ArgumentValidationError::multiple(errors),
240            ));
241        }
242
243        debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
244        info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
245        let requests = requests.into_iter().map(|request| {
246            let reference = &self.reference;
247            async move {
248                retry(network.clone(), |client| {
249                    let request = &request;
250
251                    async move {
252                        let result = request.send_query(&client, network, reference).await;
253
254                        tracing::debug!(
255                            target: QUERY_EXECUTOR_TARGET,
256                            "Querying RPC with {:?} resulted in {:?}",
257                            request,
258                            result
259                        );
260                        result
261                    }
262                })
263                .await
264            }
265        });
266
267        let requests: Vec<_> = join_all(requests)
268            .await
269            .into_iter()
270            .collect::<Result<_, _>>()?;
271        if requests.is_empty() {
272            error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
273            return Err(QueryError::InternalErrorNoResponse);
274        }
275
276        debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
277        self.handler.process_response(requests)
278    }
279
280    /// Fetch the queries from the default mainnet network configuration.
281    pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
282        let network = NetworkConfig::mainnet();
283        self.fetch_from(&network).await
284    }
285
286    /// Fetch the queries from the default mainnet archival network configuration.
287    pub async fn fetch_from_mainnet_archival(
288        self,
289    ) -> ResultWithMethod<Handler::Response, Query::Error> {
290        let network = NetworkConfig::mainnet_archival();
291        self.fetch_from(&network).await
292    }
293
294    /// Fetch the queries from the default testnet network configuration.
295    pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
296        let network = NetworkConfig::testnet();
297        self.fetch_from(&network).await
298    }
299
300    /// Fetch the queries from the default testnet archival network configuration.
301    pub async fn fetch_from_testnet_archival(
302        self,
303    ) -> ResultWithMethod<Handler::Response, Query::Error> {
304        let network = NetworkConfig::testnet_archival();
305        self.fetch_from(&network).await
306    }
307}
308
309pub struct RpcBuilder<Query, Handler>
310where
311    Query: RpcType,
312    Query::Response: std::fmt::Debug + Send + Sync,
313    Query::Error: std::fmt::Debug + Send + Sync,
314    Handler: Send + Sync,
315{
316    reference: Query::RpcReference,
317    #[allow(clippy::type_complexity)]
318    request: Result<
319        Arc<
320            dyn RpcType<
321                    Response = Query::Response,
322                    RpcReference = Query::RpcReference,
323                    Error = Query::Error,
324                > + Send
325                + Sync,
326        >,
327        ArgumentValidationError,
328    >,
329    handler: Handler,
330}
331
332impl<Query, Handler> RpcBuilder<Query, Handler>
333where
334    Handler: ResponseHandler<Query = Query> + Send + Sync,
335    Query: RpcType + 'static,
336    Query::Response: std::fmt::Debug + Send + Sync,
337    Query::Error: std::fmt::Debug + Send + Sync,
338{
339    pub fn new(
340        request: Query,
341        reference: impl Into<Query::RpcReference>,
342        handler: Handler,
343    ) -> Self {
344        Self {
345            reference: reference.into(),
346            request: Ok(Arc::new(request)),
347            handler,
348        }
349    }
350
351    pub fn with_deferred_error(mut self, error: ArgumentValidationError) -> Self {
352        self.request = Err(error);
353        self
354    }
355
356    /// Set the block reference for the query.
357    pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
358        Self {
359            reference: reference.into(),
360            ..self
361        }
362    }
363
364    /// Post-process the response of the query.
365    ///
366    /// This is useful if you want to convert one type to another.
367    ///
368    /// ## Example
369    /// ```rust,no_run
370    /// use near_api::*;
371    ///
372    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
373    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
374    ///         .call_function("get_balance", ())
375    ///         .read_only()
376    ///         .map(|balance: Data<u128>| NearToken::from_yoctonear(balance.data))
377    ///         .fetch_from_testnet()
378    ///         .await?;
379    /// println!("Balance: {}", balance);
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub fn map<MappedType>(
384        self,
385        map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
386    ) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
387        RpcBuilder {
388            handler: PostprocessHandler::new(self.handler, map),
389            request: self.request,
390            reference: self.reference,
391        }
392    }
393
394    /// Post-process the response of the query with error handling
395    ///
396    /// This is useful if you want to convert one type to another but your function might fail.
397    ///
398    /// The error will be wrapped in a `QueryError::ConversionError` and returned to the caller.
399    ///
400    /// ## Example
401    /// ```rust,no_run
402    /// use near_api::*;
403    ///
404    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
405    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
406    ///         .call_function("get_balance", ())
407    ///         .read_only()
408    ///         .and_then(|balance: Data<String>| Ok(NearToken::from_yoctonear(balance.data.parse()?)))
409    ///         .fetch_from_testnet()
410    ///         .await?;
411    /// println!("Balance: {}", balance);
412    /// # Ok(())
413    /// # }
414    /// ```
415    pub fn and_then<MappedType>(
416        self,
417        map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
418        + Send
419        + Sync
420        + 'static,
421    ) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
422        RpcBuilder {
423            handler: AndThenHandler::new(self.handler, map),
424            request: self.request,
425            reference: self.reference,
426        }
427    }
428
429    /// Fetch the query from the provided network.
430    #[instrument(skip(self, network))]
431    pub async fn fetch_from(
432        self,
433        network: &NetworkConfig,
434    ) -> ResultWithMethod<Handler::Response, Query::Error> {
435        let request = self.request?;
436
437        debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
438
439        let query_response = retry(network.clone(), |client| {
440            let request = &request;
441            let reference = &self.reference;
442            async move {
443                let result = request.send_query(&client, network, reference).await;
444                tracing::debug!(
445                    target: QUERY_EXECUTOR_TARGET,
446                    "Querying RPC with {:?} resulted in {:?}",
447                    request,
448                    result
449                );
450                result
451            }
452        })
453        .await?;
454
455        debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
456        self.handler.process_response(vec![query_response])
457    }
458
459    /// Fetch the query from the default mainnet network configuration.
460    pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
461        let network = NetworkConfig::mainnet();
462        self.fetch_from(&network).await
463    }
464
465    /// Fetch the query from the default mainnet archival network configuration.
466    pub async fn fetch_from_mainnet_archival(
467        self,
468    ) -> ResultWithMethod<Handler::Response, Query::Error> {
469        let network = NetworkConfig::mainnet_archival();
470        self.fetch_from(&network).await
471    }
472
473    /// Fetch the query from the default testnet network configuration.
474    pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
475        let network = NetworkConfig::testnet();
476        self.fetch_from(&network).await
477    }
478
479    /// Fetch the query from the default testnet archival network configuration.
480    pub async fn fetch_from_testnet_archival(
481        self,
482    ) -> ResultWithMethod<Handler::Response, Query::Error> {
483        let network = NetworkConfig::testnet_archival();
484        self.fetch_from(&network).await
485    }
486}