1use async_trait::async_trait;
3use futures::future::join_all;
4use near_openapi_client::Client;
5use std::sync::Arc;
6use tracing::{debug, error, info, instrument};
7
8use crate::{
9 config::{retry, NetworkConfig, RetryResponse},
10 errors::{QueryError, SendRequestError},
11};
12
13pub mod block_rpc;
14pub mod handlers;
15pub mod query_request;
16pub mod query_rpc;
17pub mod validator_rpc;
18
19pub use handlers::*;
20
21const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
22
23type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
24
25#[async_trait]
26pub trait RpcType: Send + Sync + std::fmt::Debug {
27 type RpcReference: Send + Sync + Clone;
28 type Response;
29 type Error: std::fmt::Debug + Send + Sync;
30 async fn send_query(
31 &self,
32 client: &Client,
33 network: &NetworkConfig,
34 reference: &Self::RpcReference,
35 ) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
36}
37
38pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
39pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
40
41pub struct MultiRpcBuilder<Query, Handler>
53where
54 Query: RpcType,
55 Query::Response: std::fmt::Debug + Send + Sync,
56 Query::Error: std::fmt::Debug + Send + Sync,
57 Handler: Send + Sync,
58{
59 reference: Query::RpcReference,
60 #[allow(clippy::type_complexity)]
61 requests: Vec<
62 Arc<
63 dyn RpcType<
64 Response = Query::Response,
65 RpcReference = Query::RpcReference,
66 Error = Query::Error,
67 > + Send
68 + Sync,
69 >,
70 >,
71 handler: Handler,
72}
73
74impl<Query, Handler> MultiRpcBuilder<Query, Handler>
75where
76 Handler: Default + Send + Sync,
77 Query: RpcType,
78 Query::Response: std::fmt::Debug + Send + Sync,
79 Query::Error: std::fmt::Debug + Send + Sync,
80{
81 pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
82 Self {
83 reference: reference.into(),
84 requests: vec![],
85 handler: Default::default(),
86 }
87 }
88}
89
90impl<Query, Handler> MultiRpcBuilder<Query, Handler>
91where
92 Handler: ResponseHandler<Query = Query> + Send + Sync,
93 Query: RpcType,
94 Query::Response: std::fmt::Debug + Send + Sync,
95 Query::Error: std::fmt::Debug + Send + Sync,
96{
97 pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
98 Self {
99 reference: reference.into(),
100 requests: vec![],
101 handler,
102 }
103 }
104
105 pub fn map<MappedType>(
138 self,
139 map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
140 ) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
141 MultiRpcBuilder {
142 handler: PostprocessHandler::new(self.handler, map),
143 requests: self.requests,
144 reference: self.reference,
145 }
146 }
147
148 pub fn and_then<MappedType>(
170 self,
171 map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
172 + Send
173 + Sync
174 + 'static,
175 ) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
176 MultiRpcBuilder {
177 handler: AndThenHandler::new(self.handler, map),
178 requests: self.requests,
179 reference: self.reference,
180 }
181 }
182
183 pub fn add_query(
186 mut self,
187 request: Arc<
188 dyn RpcType<
189 Response = Query::Response,
190 RpcReference = Query::RpcReference,
191 Error = Query::Error,
192 > + Send
193 + Sync,
194 >,
195 ) -> Self {
196 self.requests.push(request);
197 self
198 }
199
200 pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
202 where
203 Handler2: ResponseHandler<Query = Query> + Send + Sync,
204 {
205 self.requests.push(query_builder.request);
206 self
207 }
208
209 pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
211 Self {
212 reference: reference.into(),
213 ..self
214 }
215 }
216
217 #[instrument(skip(self, network), fields(request_count = self.requests.len()))]
219 pub async fn fetch_from(
220 self,
221 network: &NetworkConfig,
222 ) -> ResultWithMethod<Handler::Response, Query::Error> {
223 debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
224
225 info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", self.requests.len());
226 let requests = self.requests.into_iter().map(|request| {
227 let reference = &self.reference;
228 async move {
229 retry(network.clone(), |client| {
230 let request = &request;
231
232 async move {
233 let result = request.send_query(&client, network, reference).await;
234
235 tracing::debug!(
236 target: QUERY_EXECUTOR_TARGET,
237 "Querying RPC with {:?} resulted in {:?}",
238 request,
239 result
240 );
241 result
242 }
243 })
244 .await
245 }
246 });
247
248 let requests: Vec<_> = join_all(requests)
249 .await
250 .into_iter()
251 .collect::<Result<_, _>>()?;
252 if requests.is_empty() {
253 error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
254 return Err(QueryError::InternalErrorNoResponse);
255 }
256
257 debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
258 self.handler.process_response(requests)
259 }
260
261 pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
263 let network = NetworkConfig::mainnet();
264 self.fetch_from(&network).await
265 }
266
267 pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
269 let network = NetworkConfig::testnet();
270 self.fetch_from(&network).await
271 }
272}
273
274pub struct RpcBuilder<Query, Handler>
275where
276 Query: RpcType,
277 Query::Response: std::fmt::Debug + Send + Sync,
278 Query::Error: std::fmt::Debug + Send + Sync,
279 Handler: Send + Sync,
280{
281 reference: Query::RpcReference,
282 request: Arc<
283 dyn RpcType<
284 Response = Query::Response,
285 RpcReference = Query::RpcReference,
286 Error = Query::Error,
287 > + Send
288 + Sync,
289 >,
290 handler: Handler,
291}
292
293impl<Query, Handler> RpcBuilder<Query, Handler>
294where
295 Handler: ResponseHandler<Query = Query> + Send + Sync,
296 Query: RpcType + 'static,
297 Query::Response: std::fmt::Debug + Send + Sync,
298 Query::Error: std::fmt::Debug + Send + Sync,
299{
300 pub fn new(
301 request: Query,
302 reference: impl Into<Query::RpcReference>,
303 handler: Handler,
304 ) -> Self {
305 Self {
306 reference: reference.into(),
307 request: Arc::new(request),
308 handler,
309 }
310 }
311
312 pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
314 Self {
315 reference: reference.into(),
316 ..self
317 }
318 }
319
320 pub fn map<MappedType>(
340 self,
341 map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
342 ) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
343 RpcBuilder {
344 handler: PostprocessHandler::new(self.handler, map),
345 request: self.request,
346 reference: self.reference,
347 }
348 }
349
350 pub fn and_then<MappedType>(
372 self,
373 map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
374 + Send
375 + Sync
376 + 'static,
377 ) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
378 RpcBuilder {
379 handler: AndThenHandler::new(self.handler, map),
380 request: self.request,
381 reference: self.reference,
382 }
383 }
384
385 #[instrument(skip(self, network))]
387 pub async fn fetch_from(
388 self,
389 network: &NetworkConfig,
390 ) -> ResultWithMethod<Handler::Response, Query::Error> {
391 debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
392
393 let query_response = retry(network.clone(), |client| {
394 let request = &self.request;
395 let reference = &self.reference;
396 async move {
397 let result = request.send_query(&client, network, reference).await;
398 tracing::debug!(
399 target: QUERY_EXECUTOR_TARGET,
400 "Querying RPC with {:?} resulted in {:?}",
401 request,
402 result
403 );
404 result
405 }
406 })
407 .await?;
408
409 debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
410 self.handler.process_response(vec![query_response])
411 }
412
413 pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
415 let network = NetworkConfig::mainnet();
416 self.fetch_from(&network).await
417 }
418
419 pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
421 let network = NetworkConfig::testnet();
422 self.fetch_from(&network).await
423 }
424}