near_api/common/query/
mod.rs

1// TODO: root level doc might be needed here. It's pretty complicated.
2use async_trait::async_trait;
3use futures::future::join_all;
4use near_openapi_client::Client;
5use std::sync::Arc;
6use tracing::{debug, error, info, instrument};
7
8use crate::{
9    config::{retry, NetworkConfig, RetryResponse},
10    errors::{ArgumentValidationError, QueryError, SendRequestError},
11};
12
13pub mod block_rpc;
14pub mod handlers;
15pub mod query_request;
16pub mod query_rpc;
17pub mod validator_rpc;
18
19pub use handlers::*;
20
21const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
22
23type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
24
25#[async_trait]
26pub trait RpcType: Send + Sync + std::fmt::Debug {
27    type RpcReference: Send + Sync + Clone;
28    type Response;
29    type Error: std::fmt::Debug + Send + Sync;
30    async fn send_query(
31        &self,
32        client: &Client,
33        network: &NetworkConfig,
34        reference: &Self::RpcReference,
35    ) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
36}
37
38pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
39pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
40
41/// A builder for querying multiple items at once.
42///
43/// Sometimes to construct some complex type, you would need to query multiple items at once, and combine them into one.
44/// This is where this builder comes in handy. Almost every time, you would want to use [Self::map] method to combine the responses into your desired type.
45///
46/// Currently, `MultiQueryHandler` supports tuples of sizes 2 and 3.
47/// For single responses, use `RequestBuilder` instead.
48///
49/// Here is a list of examples on how to use this:
50/// - [Tokens::ft_balance](crate::tokens::Tokens::ft_balance)
51/// - [StakingPool::staking_pool_info](crate::stake::Staking::staking_pool_info)
52pub struct MultiRpcBuilder<Query, Handler>
53where
54    Query: RpcType,
55    Query::Response: std::fmt::Debug + Send + Sync,
56    Query::Error: std::fmt::Debug + Send + Sync,
57    Handler: Send + Sync,
58{
59    reference: Query::RpcReference,
60    #[allow(clippy::type_complexity)]
61    requests: Vec<
62        Result<
63            Arc<
64                dyn RpcType<
65                        Response = Query::Response,
66                        RpcReference = Query::RpcReference,
67                        Error = Query::Error,
68                    > + Send
69                    + Sync,
70            >,
71            ArgumentValidationError,
72        >,
73    >,
74    handler: Handler,
75}
76
77impl<Query, Handler> MultiRpcBuilder<Query, Handler>
78where
79    Handler: Default + Send + Sync,
80    Query: RpcType,
81    Query::Response: std::fmt::Debug + Send + Sync,
82    Query::Error: std::fmt::Debug + Send + Sync,
83{
84    pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
85        Self {
86            reference: reference.into(),
87            requests: vec![],
88            handler: Default::default(),
89        }
90    }
91}
92
93impl<Query, Handler> MultiRpcBuilder<Query, Handler>
94where
95    Handler: ResponseHandler<Query = Query> + Send + Sync,
96    Query: RpcType,
97    Query::Response: std::fmt::Debug + Send + Sync,
98    Query::Error: std::fmt::Debug + Send + Sync,
99{
100    pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
101        Self {
102            reference: reference.into(),
103            requests: vec![],
104            handler,
105        }
106    }
107
108    /// Map response of the queries to another type. The `map` function is executed after the queries are fetched.
109    ///
110    /// The `Handler::Response` is the type returned by the handler's `process_response` method.
111    ///
112    /// For single responses, use `RequestBuilder` instead.
113    ///
114    /// ## Example
115    /// ```rust,no_run
116    /// use near_api::advanced::{MultiQueryHandler, CallResultHandler, MultiRpcBuilder};
117    /// use near_api::types::{Data, Reference};
118    /// use std::marker::PhantomData;
119    ///
120    /// // Create a handler for multiple query responses and specify the types of the responses
121    /// let handler = MultiQueryHandler::new((
122    ///     CallResultHandler::<String>::new(),
123    ///     CallResultHandler::<u128>::new(),
124    /// ));
125    ///
126    /// // Create the builder with the handler
127    /// let builder = MultiRpcBuilder::new(handler, Reference::Optimistic);
128    ///
129    /// // Add queries to the builder
130    /// builder.add_query(todo!());
131    ///
132    /// // Map the tuple of responses to a combined type
133    /// let mapped_builder = builder.map(|(response1, response2): (Data<String>, Data<u128>)| {
134    ///     // Process the combined data
135    ///     format!("{}: {}", response1.data, response2.data)
136    /// });
137    /// ```
138    ///
139    /// See [Tokens::ft_balance](crate::tokens::Tokens::ft_balance) implementation for a real-world example.
140    pub fn map<MappedType>(
141        self,
142        map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
143    ) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
144        MultiRpcBuilder {
145            handler: PostprocessHandler::new(self.handler, map),
146            requests: self.requests,
147            reference: self.reference,
148        }
149    }
150
151    /// Post-process the response of the query with error handling
152    ///
153    /// This is useful if you want to convert one type to another but your function might fail.
154    ///
155    /// The error will be wrapped in a `QueryError::ConversionError` and returned to the caller.
156    ///
157    /// ## Example
158    /// ```rust,no_run
159    /// use near_api::*;
160    ///
161    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
162    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
163    ///         .call_function("get_balance", ())
164    ///         .read_only()
165    ///         .and_then(|balance: Data<String>| Ok(NearToken::from_yoctonear(balance.data.parse()?)))
166    ///         .fetch_from_testnet()
167    ///         .await?;
168    /// println!("Balance: {}", balance);
169    /// # Ok(())
170    /// # }
171    /// ```
172    pub fn and_then<MappedType>(
173        self,
174        map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
175            + Send
176            + Sync
177            + 'static,
178    ) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
179        MultiRpcBuilder {
180            handler: AndThenHandler::new(self.handler, map),
181            requests: self.requests,
182            reference: self.reference,
183        }
184    }
185
186    /// Add a query to the queried items. Sometimes you might need to query multiple items at once.
187    /// To combine the result of multiple queries into one.
188    pub fn add_query(
189        mut self,
190        request: Arc<
191            dyn RpcType<
192                    Response = Query::Response,
193                    RpcReference = Query::RpcReference,
194                    Error = Query::Error,
195                > + Send
196                + Sync,
197        >,
198    ) -> Self {
199        self.requests.push(Ok(request));
200        self
201    }
202
203    /// It might be easier to use this method to add a query builder to the queried items.
204    pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
205    where
206        Handler2: ResponseHandler<Query = Query> + Send + Sync,
207    {
208        self.requests.push(query_builder.request);
209        self
210    }
211
212    /// Set the block reference for the queries.
213    pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
214        Self {
215            reference: reference.into(),
216            ..self
217        }
218    }
219
220    /// Fetch the queries from the provided network.
221    #[instrument(skip(self, network), fields(request_count = self.requests.len()))]
222    pub async fn fetch_from(
223        self,
224        network: &NetworkConfig,
225    ) -> ResultWithMethod<Handler::Response, Query::Error> {
226        let (requests, errors) =
227            self.requests
228                .into_iter()
229                .fold((vec![], vec![]), |(mut v, mut e), r| {
230                    match r {
231                        Ok(val) => v.push(val),
232                        Err(err) => e.push(err),
233                    }
234                    (v, e)
235                });
236        if !errors.is_empty() {
237            return Err(QueryError::ArgumentValidationError(
238                ArgumentValidationError::multiple(errors),
239            ));
240        }
241
242        debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
243        info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
244        let requests = requests.into_iter().map(|request| {
245            let reference = &self.reference;
246            async move {
247                retry(network.clone(), |client| {
248                    let request = &request;
249
250                    async move {
251                        let result = request.send_query(&client, network, reference).await;
252
253                        tracing::debug!(
254                            target: QUERY_EXECUTOR_TARGET,
255                            "Querying RPC with {:?} resulted in {:?}",
256                            request,
257                            result
258                        );
259                        result
260                    }
261                })
262                .await
263            }
264        });
265
266        let requests: Vec<_> = join_all(requests)
267            .await
268            .into_iter()
269            .collect::<Result<_, _>>()?;
270        if requests.is_empty() {
271            error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
272            return Err(QueryError::InternalErrorNoResponse);
273        }
274
275        debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
276        self.handler.process_response(requests)
277    }
278
279    /// Fetch the queries from the default mainnet network configuration.
280    pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
281        let network = NetworkConfig::mainnet();
282        self.fetch_from(&network).await
283    }
284
285    /// Fetch the queries from the default testnet network configuration.
286    pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
287        let network = NetworkConfig::testnet();
288        self.fetch_from(&network).await
289    }
290}
291
292pub struct RpcBuilder<Query, Handler>
293where
294    Query: RpcType,
295    Query::Response: std::fmt::Debug + Send + Sync,
296    Query::Error: std::fmt::Debug + Send + Sync,
297    Handler: Send + Sync,
298{
299    reference: Query::RpcReference,
300    #[allow(clippy::type_complexity)]
301    request: Result<
302        Arc<
303            dyn RpcType<
304                    Response = Query::Response,
305                    RpcReference = Query::RpcReference,
306                    Error = Query::Error,
307                > + Send
308                + Sync,
309        >,
310        ArgumentValidationError,
311    >,
312    handler: Handler,
313}
314
315impl<Query, Handler> RpcBuilder<Query, Handler>
316where
317    Handler: ResponseHandler<Query = Query> + Send + Sync,
318    Query: RpcType + 'static,
319    Query::Response: std::fmt::Debug + Send + Sync,
320    Query::Error: std::fmt::Debug + Send + Sync,
321{
322    pub fn new(
323        request: Query,
324        reference: impl Into<Query::RpcReference>,
325        handler: Handler,
326    ) -> Self {
327        Self {
328            reference: reference.into(),
329            request: Ok(Arc::new(request)),
330            handler,
331        }
332    }
333
334    pub fn with_deferred_error(mut self, error: ArgumentValidationError) -> Self {
335        self.request = Err(error);
336        self
337    }
338
339    /// Set the block reference for the query.
340    pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
341        Self {
342            reference: reference.into(),
343            ..self
344        }
345    }
346
347    /// Post-process the response of the query.
348    ///
349    /// This is useful if you want to convert one type to another.
350    ///
351    /// ## Example
352    /// ```rust,no_run
353    /// use near_api::*;
354    ///
355    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
356    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
357    ///         .call_function("get_balance", ())
358    ///         .read_only()
359    ///         .map(|balance: Data<u128>| NearToken::from_yoctonear(balance.data))
360    ///         .fetch_from_testnet()
361    ///         .await?;
362    /// println!("Balance: {}", balance);
363    /// # Ok(())
364    /// # }
365    /// ```
366    pub fn map<MappedType>(
367        self,
368        map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
369    ) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
370        RpcBuilder {
371            handler: PostprocessHandler::new(self.handler, map),
372            request: self.request,
373            reference: self.reference,
374        }
375    }
376
377    /// Post-process the response of the query with error handling
378    ///
379    /// This is useful if you want to convert one type to another but your function might fail.
380    ///
381    /// The error will be wrapped in a `QueryError::ConversionError` and returned to the caller.
382    ///
383    /// ## Example
384    /// ```rust,no_run
385    /// use near_api::*;
386    ///
387    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
388    /// let balance: NearToken = Contract("some_contract.testnet".parse()?)
389    ///         .call_function("get_balance", ())
390    ///         .read_only()
391    ///         .and_then(|balance: Data<String>| Ok(NearToken::from_yoctonear(balance.data.parse()?)))
392    ///         .fetch_from_testnet()
393    ///         .await?;
394    /// println!("Balance: {}", balance);
395    /// # Ok(())
396    /// # }
397    /// ```
398    pub fn and_then<MappedType>(
399        self,
400        map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
401            + Send
402            + Sync
403            + 'static,
404    ) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
405        RpcBuilder {
406            handler: AndThenHandler::new(self.handler, map),
407            request: self.request,
408            reference: self.reference,
409        }
410    }
411
412    /// Fetch the query from the provided network.
413    #[instrument(skip(self, network))]
414    pub async fn fetch_from(
415        self,
416        network: &NetworkConfig,
417    ) -> ResultWithMethod<Handler::Response, Query::Error> {
418        let request = self.request?;
419
420        debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
421
422        let query_response = retry(network.clone(), |client| {
423            let request = &request;
424            let reference = &self.reference;
425            async move {
426                let result = request.send_query(&client, network, reference).await;
427                tracing::debug!(
428                    target: QUERY_EXECUTOR_TARGET,
429                    "Querying RPC with {:?} resulted in {:?}",
430                    request,
431                    result
432                );
433                result
434            }
435        })
436        .await?;
437
438        debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
439        self.handler.process_response(vec![query_response])
440    }
441
442    /// Fetch the query from the default mainnet network configuration.
443    pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
444        let network = NetworkConfig::mainnet();
445        self.fetch_from(&network).await
446    }
447
448    /// Fetch the query from the default testnet network configuration.
449    pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
450        let network = NetworkConfig::testnet();
451        self.fetch_from(&network).await
452    }
453}