1use async_trait::async_trait;
3use futures::future::join_all;
4use near_openapi_client::Client;
5use std::sync::Arc;
6use tracing::{debug, error, info, instrument};
7
8use crate::{
9 config::{NetworkConfig, RetryResponse, retry},
10 errors::{ArgumentValidationError, QueryError, SendRequestError},
11};
12
13pub mod block_rpc;
14pub mod handlers;
15pub mod query_request;
16pub mod query_rpc;
17pub mod tx_rpc;
18pub mod validator_rpc;
19
20pub use handlers::*;
21
22const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
23
24type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
25
26#[async_trait]
27pub trait RpcType: Send + Sync + std::fmt::Debug {
28 type RpcReference: Send + Sync + Clone;
29 type Response;
30 type Error: std::fmt::Debug + Send + Sync;
31 async fn send_query(
32 &self,
33 client: &Client,
34 network: &NetworkConfig,
35 reference: &Self::RpcReference,
36 ) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
37}
38
39pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
40pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
41
42pub struct MultiRpcBuilder<Query, Handler>
54where
55 Query: RpcType,
56 Query::Response: std::fmt::Debug + Send + Sync,
57 Query::Error: std::fmt::Debug + Send + Sync,
58 Handler: Send + Sync,
59{
60 reference: Query::RpcReference,
61 #[allow(clippy::type_complexity)]
62 requests: Vec<
63 Result<
64 Arc<
65 dyn RpcType<
66 Response = Query::Response,
67 RpcReference = Query::RpcReference,
68 Error = Query::Error,
69 > + Send
70 + Sync,
71 >,
72 ArgumentValidationError,
73 >,
74 >,
75 handler: Handler,
76}
77
78impl<Query, Handler> MultiRpcBuilder<Query, Handler>
79where
80 Handler: Default + Send + Sync,
81 Query: RpcType,
82 Query::Response: std::fmt::Debug + Send + Sync,
83 Query::Error: std::fmt::Debug + Send + Sync,
84{
85 pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
86 Self {
87 reference: reference.into(),
88 requests: vec![],
89 handler: Default::default(),
90 }
91 }
92}
93
94impl<Query, Handler> MultiRpcBuilder<Query, Handler>
95where
96 Handler: ResponseHandler<Query = Query> + Send + Sync,
97 Query: RpcType,
98 Query::Response: std::fmt::Debug + Send + Sync,
99 Query::Error: std::fmt::Debug + Send + Sync,
100{
101 pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
102 Self {
103 reference: reference.into(),
104 requests: vec![],
105 handler,
106 }
107 }
108
109 pub fn map<MappedType>(
142 self,
143 map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
144 ) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
145 MultiRpcBuilder {
146 handler: PostprocessHandler::new(self.handler, map),
147 requests: self.requests,
148 reference: self.reference,
149 }
150 }
151
152 pub fn and_then<MappedType>(
174 self,
175 map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
176 + Send
177 + Sync
178 + 'static,
179 ) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
180 MultiRpcBuilder {
181 handler: AndThenHandler::new(self.handler, map),
182 requests: self.requests,
183 reference: self.reference,
184 }
185 }
186
187 pub fn add_query(
190 mut self,
191 request: Arc<
192 dyn RpcType<
193 Response = Query::Response,
194 RpcReference = Query::RpcReference,
195 Error = Query::Error,
196 > + Send
197 + Sync,
198 >,
199 ) -> Self {
200 self.requests.push(Ok(request));
201 self
202 }
203
204 pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
206 where
207 Handler2: ResponseHandler<Query = Query> + Send + Sync,
208 {
209 self.requests.push(query_builder.request);
210 self
211 }
212
213 pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
215 Self {
216 reference: reference.into(),
217 ..self
218 }
219 }
220
221 #[instrument(skip(self, network), fields(request_count = self.requests.len()))]
223 pub async fn fetch_from(
224 self,
225 network: &NetworkConfig,
226 ) -> ResultWithMethod<Handler::Response, Query::Error> {
227 let (requests, errors) =
228 self.requests
229 .into_iter()
230 .fold((vec![], vec![]), |(mut v, mut e), r| {
231 match r {
232 Ok(val) => v.push(val),
233 Err(err) => e.push(err),
234 }
235 (v, e)
236 });
237 if !errors.is_empty() {
238 return Err(QueryError::ArgumentValidationError(
239 ArgumentValidationError::multiple(errors),
240 ));
241 }
242
243 debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
244 info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
245 let requests = requests.into_iter().map(|request| {
246 let reference = &self.reference;
247 async move {
248 retry(network.clone(), |client| {
249 let request = &request;
250
251 async move {
252 let result = request.send_query(&client, network, reference).await;
253
254 tracing::debug!(
255 target: QUERY_EXECUTOR_TARGET,
256 "Querying RPC with {:?} resulted in {:?}",
257 request,
258 result
259 );
260 result
261 }
262 })
263 .await
264 }
265 });
266
267 let requests: Vec<_> = join_all(requests)
268 .await
269 .into_iter()
270 .collect::<Result<_, _>>()?;
271 if requests.is_empty() {
272 error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
273 return Err(QueryError::InternalErrorNoResponse);
274 }
275
276 debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
277 self.handler.process_response(requests)
278 }
279
280 pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
282 let network = NetworkConfig::mainnet();
283 self.fetch_from(&network).await
284 }
285
286 pub async fn fetch_from_mainnet_archival(
288 self,
289 ) -> ResultWithMethod<Handler::Response, Query::Error> {
290 let network = NetworkConfig::mainnet_archival();
291 self.fetch_from(&network).await
292 }
293
294 pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
296 let network = NetworkConfig::testnet();
297 self.fetch_from(&network).await
298 }
299
300 pub async fn fetch_from_testnet_archival(
302 self,
303 ) -> ResultWithMethod<Handler::Response, Query::Error> {
304 let network = NetworkConfig::testnet_archival();
305 self.fetch_from(&network).await
306 }
307}
308
309pub struct RpcBuilder<Query, Handler>
310where
311 Query: RpcType,
312 Query::Response: std::fmt::Debug + Send + Sync,
313 Query::Error: std::fmt::Debug + Send + Sync,
314 Handler: Send + Sync,
315{
316 reference: Query::RpcReference,
317 #[allow(clippy::type_complexity)]
318 request: Result<
319 Arc<
320 dyn RpcType<
321 Response = Query::Response,
322 RpcReference = Query::RpcReference,
323 Error = Query::Error,
324 > + Send
325 + Sync,
326 >,
327 ArgumentValidationError,
328 >,
329 handler: Handler,
330}
331
332impl<Query, Handler> RpcBuilder<Query, Handler>
333where
334 Handler: ResponseHandler<Query = Query> + Send + Sync,
335 Query: RpcType + 'static,
336 Query::Response: std::fmt::Debug + Send + Sync,
337 Query::Error: std::fmt::Debug + Send + Sync,
338{
339 pub fn new(
340 request: Query,
341 reference: impl Into<Query::RpcReference>,
342 handler: Handler,
343 ) -> Self {
344 Self {
345 reference: reference.into(),
346 request: Ok(Arc::new(request)),
347 handler,
348 }
349 }
350
351 pub fn with_deferred_error(mut self, error: ArgumentValidationError) -> Self {
352 self.request = Err(error);
353 self
354 }
355
356 pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
358 Self {
359 reference: reference.into(),
360 ..self
361 }
362 }
363
364 pub fn map<MappedType>(
384 self,
385 map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
386 ) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
387 RpcBuilder {
388 handler: PostprocessHandler::new(self.handler, map),
389 request: self.request,
390 reference: self.reference,
391 }
392 }
393
394 pub fn and_then<MappedType>(
416 self,
417 map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
418 + Send
419 + Sync
420 + 'static,
421 ) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
422 RpcBuilder {
423 handler: AndThenHandler::new(self.handler, map),
424 request: self.request,
425 reference: self.reference,
426 }
427 }
428
429 #[instrument(skip(self, network))]
431 pub async fn fetch_from(
432 self,
433 network: &NetworkConfig,
434 ) -> ResultWithMethod<Handler::Response, Query::Error> {
435 let request = self.request?;
436
437 debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
438
439 let query_response = retry(network.clone(), |client| {
440 let request = &request;
441 let reference = &self.reference;
442 async move {
443 let result = request.send_query(&client, network, reference).await;
444 tracing::debug!(
445 target: QUERY_EXECUTOR_TARGET,
446 "Querying RPC with {:?} resulted in {:?}",
447 request,
448 result
449 );
450 result
451 }
452 })
453 .await?;
454
455 debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
456 self.handler.process_response(vec![query_response])
457 }
458
459 pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
461 let network = NetworkConfig::mainnet();
462 self.fetch_from(&network).await
463 }
464
465 pub async fn fetch_from_mainnet_archival(
467 self,
468 ) -> ResultWithMethod<Handler::Response, Query::Error> {
469 let network = NetworkConfig::mainnet_archival();
470 self.fetch_from(&network).await
471 }
472
473 pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
475 let network = NetworkConfig::testnet();
476 self.fetch_from(&network).await
477 }
478
479 pub async fn fetch_from_testnet_archival(
481 self,
482 ) -> ResultWithMethod<Handler::Response, Query::Error> {
483 let network = NetworkConfig::testnet_archival();
484 self.fetch_from(&network).await
485 }
486}