1use async_trait::async_trait;
3use futures::future::join_all;
4use near_openapi_client::Client;
5use std::sync::Arc;
6use tracing::{debug, error, info, instrument};
7
8use crate::{
9 config::{retry, NetworkConfig, RetryResponse},
10 errors::{ArgumentValidationError, QueryError, SendRequestError},
11};
12
13pub mod block_rpc;
14pub mod handlers;
15pub mod query_request;
16pub mod query_rpc;
17pub mod validator_rpc;
18
19pub use handlers::*;
20
21const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
22
23type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
24
25#[async_trait]
26pub trait RpcType: Send + Sync + std::fmt::Debug {
27 type RpcReference: Send + Sync + Clone;
28 type Response;
29 type Error: std::fmt::Debug + Send + Sync;
30 async fn send_query(
31 &self,
32 client: &Client,
33 network: &NetworkConfig,
34 reference: &Self::RpcReference,
35 ) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
36}
37
38pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
39pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
40
41pub struct MultiRpcBuilder<Query, Handler>
53where
54 Query: RpcType,
55 Query::Response: std::fmt::Debug + Send + Sync,
56 Query::Error: std::fmt::Debug + Send + Sync,
57 Handler: Send + Sync,
58{
59 reference: Query::RpcReference,
60 #[allow(clippy::type_complexity)]
61 requests: Vec<
62 Result<
63 Arc<
64 dyn RpcType<
65 Response = Query::Response,
66 RpcReference = Query::RpcReference,
67 Error = Query::Error,
68 > + Send
69 + Sync,
70 >,
71 ArgumentValidationError,
72 >,
73 >,
74 handler: Handler,
75}
76
77impl<Query, Handler> MultiRpcBuilder<Query, Handler>
78where
79 Handler: Default + Send + Sync,
80 Query: RpcType,
81 Query::Response: std::fmt::Debug + Send + Sync,
82 Query::Error: std::fmt::Debug + Send + Sync,
83{
84 pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
85 Self {
86 reference: reference.into(),
87 requests: vec![],
88 handler: Default::default(),
89 }
90 }
91}
92
93impl<Query, Handler> MultiRpcBuilder<Query, Handler>
94where
95 Handler: ResponseHandler<Query = Query> + Send + Sync,
96 Query: RpcType,
97 Query::Response: std::fmt::Debug + Send + Sync,
98 Query::Error: std::fmt::Debug + Send + Sync,
99{
100 pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
101 Self {
102 reference: reference.into(),
103 requests: vec![],
104 handler,
105 }
106 }
107
108 pub fn map<MappedType>(
141 self,
142 map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
143 ) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
144 MultiRpcBuilder {
145 handler: PostprocessHandler::new(self.handler, map),
146 requests: self.requests,
147 reference: self.reference,
148 }
149 }
150
151 pub fn and_then<MappedType>(
173 self,
174 map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
175 + Send
176 + Sync
177 + 'static,
178 ) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
179 MultiRpcBuilder {
180 handler: AndThenHandler::new(self.handler, map),
181 requests: self.requests,
182 reference: self.reference,
183 }
184 }
185
186 pub fn add_query(
189 mut self,
190 request: Arc<
191 dyn RpcType<
192 Response = Query::Response,
193 RpcReference = Query::RpcReference,
194 Error = Query::Error,
195 > + Send
196 + Sync,
197 >,
198 ) -> Self {
199 self.requests.push(Ok(request));
200 self
201 }
202
203 pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
205 where
206 Handler2: ResponseHandler<Query = Query> + Send + Sync,
207 {
208 self.requests.push(query_builder.request);
209 self
210 }
211
212 pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
214 Self {
215 reference: reference.into(),
216 ..self
217 }
218 }
219
220 #[instrument(skip(self, network), fields(request_count = self.requests.len()))]
222 pub async fn fetch_from(
223 self,
224 network: &NetworkConfig,
225 ) -> ResultWithMethod<Handler::Response, Query::Error> {
226 let (requests, errors) =
227 self.requests
228 .into_iter()
229 .fold((vec![], vec![]), |(mut v, mut e), r| {
230 match r {
231 Ok(val) => v.push(val),
232 Err(err) => e.push(err),
233 }
234 (v, e)
235 });
236 if !errors.is_empty() {
237 return Err(QueryError::ArgumentValidationError(
238 ArgumentValidationError::multiple(errors),
239 ));
240 }
241
242 debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
243 info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
244 let requests = requests.into_iter().map(|request| {
245 let reference = &self.reference;
246 async move {
247 retry(network.clone(), |client| {
248 let request = &request;
249
250 async move {
251 let result = request.send_query(&client, network, reference).await;
252
253 tracing::debug!(
254 target: QUERY_EXECUTOR_TARGET,
255 "Querying RPC with {:?} resulted in {:?}",
256 request,
257 result
258 );
259 result
260 }
261 })
262 .await
263 }
264 });
265
266 let requests: Vec<_> = join_all(requests)
267 .await
268 .into_iter()
269 .collect::<Result<_, _>>()?;
270 if requests.is_empty() {
271 error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
272 return Err(QueryError::InternalErrorNoResponse);
273 }
274
275 debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
276 self.handler.process_response(requests)
277 }
278
279 pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
281 let network = NetworkConfig::mainnet();
282 self.fetch_from(&network).await
283 }
284
285 pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
287 let network = NetworkConfig::testnet();
288 self.fetch_from(&network).await
289 }
290}
291
292pub struct RpcBuilder<Query, Handler>
293where
294 Query: RpcType,
295 Query::Response: std::fmt::Debug + Send + Sync,
296 Query::Error: std::fmt::Debug + Send + Sync,
297 Handler: Send + Sync,
298{
299 reference: Query::RpcReference,
300 #[allow(clippy::type_complexity)]
301 request: Result<
302 Arc<
303 dyn RpcType<
304 Response = Query::Response,
305 RpcReference = Query::RpcReference,
306 Error = Query::Error,
307 > + Send
308 + Sync,
309 >,
310 ArgumentValidationError,
311 >,
312 handler: Handler,
313}
314
315impl<Query, Handler> RpcBuilder<Query, Handler>
316where
317 Handler: ResponseHandler<Query = Query> + Send + Sync,
318 Query: RpcType + 'static,
319 Query::Response: std::fmt::Debug + Send + Sync,
320 Query::Error: std::fmt::Debug + Send + Sync,
321{
322 pub fn new(
323 request: Query,
324 reference: impl Into<Query::RpcReference>,
325 handler: Handler,
326 ) -> Self {
327 Self {
328 reference: reference.into(),
329 request: Ok(Arc::new(request)),
330 handler,
331 }
332 }
333
334 pub fn with_deferred_error(mut self, error: ArgumentValidationError) -> Self {
335 self.request = Err(error);
336 self
337 }
338
339 pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
341 Self {
342 reference: reference.into(),
343 ..self
344 }
345 }
346
347 pub fn map<MappedType>(
367 self,
368 map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
369 ) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
370 RpcBuilder {
371 handler: PostprocessHandler::new(self.handler, map),
372 request: self.request,
373 reference: self.reference,
374 }
375 }
376
377 pub fn and_then<MappedType>(
399 self,
400 map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
401 + Send
402 + Sync
403 + 'static,
404 ) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
405 RpcBuilder {
406 handler: AndThenHandler::new(self.handler, map),
407 request: self.request,
408 reference: self.reference,
409 }
410 }
411
412 #[instrument(skip(self, network))]
414 pub async fn fetch_from(
415 self,
416 network: &NetworkConfig,
417 ) -> ResultWithMethod<Handler::Response, Query::Error> {
418 let request = self.request?;
419
420 debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
421
422 let query_response = retry(network.clone(), |client| {
423 let request = &request;
424 let reference = &self.reference;
425 async move {
426 let result = request.send_query(&client, network, reference).await;
427 tracing::debug!(
428 target: QUERY_EXECUTOR_TARGET,
429 "Querying RPC with {:?} resulted in {:?}",
430 request,
431 result
432 );
433 result
434 }
435 })
436 .await?;
437
438 debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
439 self.handler.process_response(vec![query_response])
440 }
441
442 pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
444 let network = NetworkConfig::mainnet();
445 self.fetch_from(&network).await
446 }
447
448 pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
450 let network = NetworkConfig::testnet();
451 self.fetch_from(&network).await
452 }
453}