use async_trait::async_trait;
use futures::future::join_all;
use near_openapi_client::Client;
use std::sync::Arc;
use tracing::{debug, error, info, instrument};
use crate::{
config::{NetworkConfig, RetryResponse, retry},
errors::{ArgumentValidationError, QueryError, SendRequestError},
};
pub mod block_rpc;
pub mod handlers;
pub mod query_request;
pub mod query_rpc;
pub mod tx_rpc;
pub mod validator_rpc;
pub use handlers::*;
/// `tracing` target attached to every log event emitted by the query builders in this module.
const QUERY_EXECUTOR_TARGET: &str = "near_api::query::executor";
/// Result of executing a query: the handler output `T` on success, or a [`QueryError`]
/// parameterised by the RPC method's error type `E`.
type ResultWithMethod<T, E> = core::result::Result<T, QueryError<E>>;
/// A kind of RPC query that the builders in this module can send.
///
/// Implementors must be `Send + Sync` and `Debug`; the builders log the query with `{:?}`
/// after every send attempt.
#[async_trait]
pub trait RpcType: Send + Sync + std::fmt::Debug {
/// Extra value passed by reference to every [`RpcType::send_query`] call. The builders keep
/// one in their `reference` field and let callers replace it through `at`.
type RpcReference: Send + Sync + Clone;
/// Payload type of the [`RetryResponse`] returned by [`RpcType::send_query`].
type Response;
/// Method-specific error type, carried inside [`SendRequestError`].
type Error: std::fmt::Debug + Send + Sync;
/// Sends this query through `client` for the given `network` and `reference`.
///
/// The builders call this from inside [`retry`], so it may be invoked more than once for
/// the same query.
// NOTE(review): the exact retry/stop semantics of `RetryResponse` are defined in
// `crate::config` and are not visible from this file.
async fn send_query(
&self,
client: &Client,
network: &NetworkConfig,
reference: &Self::RpcReference,
) -> RetryResponse<Self::Response, SendRequestError<Self::Error>>;
}
/// [`RpcBuilder`] whose query type is derived from the handler `T` (its `ResponseHandler::Query`).
pub type RequestBuilder<T> = RpcBuilder<<T as ResponseHandler>::Query, T>;
/// [`MultiRpcBuilder`] whose query type is derived from the handler `T` (its
/// `ResponseHandler::Query`).
pub type MultiRequestBuilder<T> = MultiRpcBuilder<<T as ResponseHandler>::Query, T>;
/// Builder that queues several queries of the same [`RpcType`], sends them all with one shared
/// reference, and hands the collected responses to a single handler.
///
/// Each queued entry is either a ready-to-send query or an [`ArgumentValidationError`]
/// recorded earlier; such errors are only reported when the batch is fetched.
pub struct MultiRpcBuilder<Query, Handler>
where
Query: RpcType,
Query::Response: std::fmt::Debug + Send + Sync,
Query::Error: std::fmt::Debug + Send + Sync,
Handler: Send + Sync,
{
/// Reference passed to `send_query` for every queued query.
reference: Query::RpcReference,
/// Queued queries, or the validation error that replaced one of them.
// The trait object has to spell out all three associated types, which trips clippy's
// type-complexity lint.
#[allow(clippy::type_complexity)]
requests: Vec<
Result<
Arc<
dyn RpcType<
Response = Query::Response,
RpcReference = Query::RpcReference,
Error = Query::Error,
> + Send
+ Sync,
>,
ArgumentValidationError,
>,
>,
/// Handler that receives the collected responses once every query has completed.
handler: Handler,
}
impl<Query, Handler> MultiRpcBuilder<Query, Handler>
where
    Handler: Default + Send + Sync,
    Query: RpcType,
    Query::Response: std::fmt::Debug + Send + Sync,
    Query::Error: std::fmt::Debug + Send + Sync,
{
    /// Creates an empty builder that uses `Handler::default()` as its handler and `reference`
    /// (converted with `Into`) as the reference shared by all queries added later.
    pub fn with_reference(reference: impl Into<Query::RpcReference>) -> Self {
        let reference = reference.into();
        let handler = Handler::default();
        Self {
            reference,
            requests: Vec::new(),
            handler,
        }
    }
}
impl<Query, Handler> MultiRpcBuilder<Query, Handler>
where
Handler: ResponseHandler<Query = Query> + Send + Sync,
Query: RpcType,
Query::Response: std::fmt::Debug + Send + Sync,
Query::Error: std::fmt::Debug + Send + Sync,
{
pub fn new(handler: Handler, reference: impl Into<Query::RpcReference>) -> Self {
Self {
reference: reference.into(),
requests: vec![],
handler,
}
}
pub fn map<MappedType>(
self,
map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
) -> MultiRpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
MultiRpcBuilder {
handler: PostprocessHandler::new(self.handler, map),
requests: self.requests,
reference: self.reference,
}
}
pub fn and_then<MappedType>(
self,
map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync
+ 'static,
) -> MultiRpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
MultiRpcBuilder {
handler: AndThenHandler::new(self.handler, map),
requests: self.requests,
reference: self.reference,
}
}
pub fn add_query(
mut self,
request: Arc<
dyn RpcType<
Response = Query::Response,
RpcReference = Query::RpcReference,
Error = Query::Error,
> + Send
+ Sync,
>,
) -> Self {
self.requests.push(Ok(request));
self
}
pub fn add_query_builder<Handler2>(mut self, query_builder: RpcBuilder<Query, Handler2>) -> Self
where
Handler2: ResponseHandler<Query = Query> + Send + Sync,
{
self.requests.push(query_builder.request);
self
}
pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
Self {
reference: reference.into(),
..self
}
}
#[instrument(skip(self, network), fields(request_count = self.requests.len()))]
pub async fn fetch_from(
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Query::Error> {
let (requests, errors) =
self.requests
.into_iter()
.fold((vec![], vec![]), |(mut v, mut e), r| {
match r {
Ok(val) => v.push(val),
Err(err) => e.push(err),
}
(v, e)
});
if !errors.is_empty() {
return Err(QueryError::ArgumentValidationError(
ArgumentValidationError::multiple(errors),
));
}
debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
let requests = requests.into_iter().map(|request| {
let reference = &self.reference;
async move {
retry(network.clone(), |client| {
let request = &request;
async move {
let result = request.send_query(&client, network, reference).await;
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
request,
result
);
result
}
})
.await
}
});
let requests: Vec<_> = join_all(requests)
.await
.into_iter()
.collect::<Result<_, _>>()?;
if requests.is_empty() {
error!(target: QUERY_EXECUTOR_TARGET, "No responses received");
return Err(QueryError::InternalErrorNoResponse);
}
debug!(target: QUERY_EXECUTOR_TARGET, "Processing {} responses", requests.len());
self.handler.process_response(requests)
}
pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::mainnet();
self.fetch_from(&network).await
}
pub async fn fetch_from_mainnet_archival(
self,
) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::mainnet_archival();
self.fetch_from(&network).await
}
pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::testnet();
self.fetch_from(&network).await
}
pub async fn fetch_from_testnet_archival(
self,
) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::testnet_archival();
self.fetch_from(&network).await
}
}
/// Builder for a single query of some [`RpcType`], together with the reference it is sent with
/// and the handler that processes its response.
///
/// The query slot may instead hold an [`ArgumentValidationError`] recorded while building;
/// it is only reported when the query is fetched.
pub struct RpcBuilder<Query, Handler>
where
Query: RpcType,
Query::Response: std::fmt::Debug + Send + Sync,
Query::Error: std::fmt::Debug + Send + Sync,
Handler: Send + Sync,
{
/// Reference passed to `send_query` when the query is sent.
reference: Query::RpcReference,
/// The query to send, or the validation error that replaced it.
// The trait object has to spell out all three associated types, which trips clippy's
// type-complexity lint.
#[allow(clippy::type_complexity)]
request: Result<
Arc<
dyn RpcType<
Response = Query::Response,
RpcReference = Query::RpcReference,
Error = Query::Error,
> + Send
+ Sync,
>,
ArgumentValidationError,
>,
/// Handler that receives the response once the query has completed.
handler: Handler,
}
impl<Query, Handler> RpcBuilder<Query, Handler>
where
Handler: ResponseHandler<Query = Query> + Send + Sync,
Query: RpcType + 'static,
Query::Response: std::fmt::Debug + Send + Sync,
Query::Error: std::fmt::Debug + Send + Sync,
{
pub fn new(
request: Query,
reference: impl Into<Query::RpcReference>,
handler: Handler,
) -> Self {
Self {
reference: reference.into(),
request: Ok(Arc::new(request)),
handler,
}
}
pub fn with_deferred_error(mut self, error: ArgumentValidationError) -> Self {
self.request = Err(error);
self
}
pub fn at(self, reference: impl Into<Query::RpcReference>) -> Self {
Self {
reference: reference.into(),
..self
}
}
pub fn map<MappedType>(
self,
map: impl Fn(Handler::Response) -> MappedType + Send + Sync + 'static,
) -> RpcBuilder<Query, PostprocessHandler<MappedType, Handler>> {
RpcBuilder {
handler: PostprocessHandler::new(self.handler, map),
request: self.request,
reference: self.reference,
}
}
pub fn and_then<MappedType>(
self,
map: impl Fn(Handler::Response) -> Result<MappedType, Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync
+ 'static,
) -> RpcBuilder<Query, AndThenHandler<MappedType, Handler>> {
RpcBuilder {
handler: AndThenHandler::new(self.handler, map),
request: self.request,
reference: self.reference,
}
}
#[instrument(skip(self, network))]
pub async fn fetch_from(
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Query::Error> {
let request = self.request?;
debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
let query_response = retry(network.clone(), |client| {
let request = &request;
let reference = &self.reference;
async move {
let result = request.send_query(&client, network, reference).await;
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
request,
result
);
result
}
})
.await?;
debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
self.handler.process_response(vec![query_response])
}
pub async fn fetch_from_mainnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::mainnet();
self.fetch_from(&network).await
}
pub async fn fetch_from_mainnet_archival(
self,
) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::mainnet_archival();
self.fetch_from(&network).await
}
pub async fn fetch_from_testnet(self) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::testnet();
self.fetch_from(&network).await
}
pub async fn fetch_from_testnet_archival(
self,
) -> ResultWithMethod<Handler::Response, Query::Error> {
let network = NetworkConfig::testnet_archival();
self.fetch_from(&network).await
}
}