use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::RwLock;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use near_account_id::AccountId;
use near_crypto::{InMemorySigner, PublicKey, Signer};
use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
use near_jsonrpc_client::methods::block::RpcBlockRequest;
use near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest;
use near_jsonrpc_client::methods::query::RpcQueryRequest;
use near_jsonrpc_client::JsonRpcClient;
use near_jsonrpc_primitives::types::query::QueryResponseKind;
use near_jsonrpc_primitives::types::transactions::RpcTransactionError;
use near_primitives::errors::{ActionError, ActionErrorKind, InvalidTxError, TxExecutionError};
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{Action, SignedTransaction};
use near_primitives::types::{BlockReference, Finality, Nonce};
use near_primitives::views::{
AccessKeyView, BlockView, FinalExecutionOutcomeView, FinalExecutionStatus, QueryRequest,
};
pub mod error;
use crate::error::{Error, Result};
/// Key under which a nonce is cached: the signing account together with the
/// public key of the access key used to sign.
pub type CacheKey = (AccountId, PublicKey);
/// RPC client wrapper that pairs a [`JsonRpcClient`] with a cache of
/// access-key nonces, so consecutive transactions from the same key do not
/// have to query the access key each time.
pub struct Client {
/// Underlying JSON-RPC client; every request in this type goes through it.
rpc_client: JsonRpcClient,
/// Last nonce handed out per `(account, public key)` pair. The map sits
/// behind an async `RwLock`; individual counters are bumped atomically so
/// a read guard is enough to reserve the next nonce for a known key.
access_key_nonces: RwLock<HashMap<CacheKey, AtomicU64>>,
}
impl Client {
pub fn new(rpc_addr: &str) -> Self {
let connector = JsonRpcClient::new_client();
let rpc_client = connector.connect(rpc_addr);
Self::from_client(rpc_client)
}
pub fn from_client(client: JsonRpcClient) -> Self {
Self {
rpc_client: client,
access_key_nonces: RwLock::new(HashMap::new()),
}
}
pub fn rpc_addr(&self) -> String {
self.rpc_client.server_addr().into()
}
pub async fn send_tx(
&self,
signer: &InMemorySigner,
receiver_id: &AccountId,
actions: Vec<Action>,
) -> Result<FinalExecutionOutcomeView> {
let cache_key = (signer.account_id.clone(), signer.public_key());
retry(|| async {
let (block_hash, nonce) = fetch_tx_nonce(self, &cache_key).await?;
let result = self
.rpc_client
.call(&RpcBroadcastTxCommitRequest {
signed_transaction: SignedTransaction::from_actions(
nonce,
signer.account_id.clone(),
receiver_id.clone(),
signer as &dyn Signer,
actions.clone(),
block_hash,
),
})
.await;
self.check_and_invalidate_cache(&cache_key, &result).await;
result.map_err(Into::into)
})
.await
}
pub async fn view<T: Serialize, R: DeserializeOwned>(
&self,
receiver_id: &AccountId,
function_name: &str,
args: T,
) -> Result<R> {
let args = match serde_json::to_vec(&args) {
Ok(args) => args,
Err(e) => return Err(Error::SerializeError(e)),
};
let resp = self
.rpc_client
.call(RpcQueryRequest {
block_reference: Finality::Final.into(),
request: QueryRequest::CallFunction {
account_id: receiver_id.clone(),
method_name: function_name.into(),
args: args.into(),
},
})
.await?;
let QueryResponseKind::CallResult(resp) = resp.kind else {
return Err(Error::RpcReturnedInvalidData("while querying view"));
};
Ok(serde_json::from_slice(&resp.result)?)
}
pub async fn access_key(
&self,
account_id: &AccountId,
public_key: &PublicKey,
) -> Result<(AccessKeyView, CryptoHash)> {
let query_resp = self
.rpc_client
.call(&RpcQueryRequest {
block_reference: Finality::None.into(),
request: QueryRequest::ViewAccessKey {
account_id: account_id.clone(),
public_key: public_key.clone(),
},
})
.await?;
match query_resp.kind {
QueryResponseKind::AccessKey(access_key) => Ok((access_key, query_resp.block_hash)),
_ => Err(Error::RpcReturnedInvalidData("while querying access key")),
}
}
pub async fn view_block(&self, block_reference: BlockReference) -> Result<BlockView> {
self.rpc_client
.call(&RpcBlockRequest { block_reference })
.await
.map_err(Into::into)
}
async fn check_and_invalidate_cache(
&self,
cache_key: &CacheKey,
result: &Result<FinalExecutionOutcomeView, JsonRpcError<RpcTransactionError>>,
) {
if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
RpcTransactionError::InvalidTransaction {
context: InvalidTxError::InvalidNonce { .. },
..
},
))) = result
{
self.invalidate_cache(cache_key).await;
}
let Ok(outcome) = result else {
return;
};
let FinalExecutionStatus::Failure(tx_err) = &outcome.status else {
return;
};
let invalid_cache = matches!(
tx_err,
TxExecutionError::ActionError(ActionError {
kind: ActionErrorKind::DelegateActionInvalidNonce { .. },
..
}) | TxExecutionError::InvalidTxError(InvalidTxError::InvalidNonce { .. })
);
if invalid_cache {
self.invalidate_cache(cache_key).await;
}
}
async fn invalidate_cache(&self, cache_key: &CacheKey) {
let mut nonces = self.access_key_nonces.write().await;
nonces.remove(cache_key);
}
}
/// Reserves the next nonce from an already cached counter and pairs it with
/// the hash of the block returned for `Finality::Final`.
///
/// The counter is incremented before the block lookup, so the reserved nonce
/// stays consumed even when the lookup fails.
///
/// # Errors
///
/// Propagates the error from [`Client::view_block`].
async fn cached_nonce(nonce: &AtomicU64, client: &Client) -> Result<(CryptoHash, Nonce)> {
    // `fetch_add` yields the value *before* the increment.
    let previous = nonce.fetch_add(1, Ordering::SeqCst);

    client
        .view_block(Finality::Final.into())
        .await
        .map(|block| (block.header.hash, previous + 1))
}
/// Produces the `(block_hash, nonce)` pair to sign the next transaction for
/// `cache_key`.
///
/// * Key already cached: the counter is bumped via [`cached_nonce`] while
///   only a read guard is held.
/// * Key not cached: the read guard is released, the write lock is taken and
///   the entry is checked again, since it may have been filled in between the
///   two lock acquisitions. If it is still absent, the access key is queried
///   and `access_key.nonce + 1` is both stored and returned, together with
///   the block hash from that query.
///
/// In every path the guard stays held across the awaited RPC call.
///
/// # Errors
///
/// Propagates errors from [`cached_nonce`] and [`Client::access_key`]. When
/// the access-key query fails, nothing is inserted into the cache.
async fn fetch_tx_nonce(client: &Client, cache_key: &CacheKey) -> Result<(CryptoHash, Nonce)> {
    // Fast path: shared lock only.
    {
        let cached = client.access_key_nonces.read().await;
        if let Some(counter) = cached.get(cache_key) {
            return cached_nonce(counter, client).await;
        }
        // Read guard is released at the end of this scope, before the write
        // lock is requested below.
    }

    let mut cache = client.access_key_nonces.write().await;
    match cache.entry(cache_key.clone()) {
        Entry::Vacant(slot) => {
            let (account_id, public_key) = cache_key;
            let (access_key, block_hash) = client.access_key(account_id, public_key).await?;
            let next_nonce = access_key.nonce + 1;
            slot.insert(AtomicU64::new(next_nonce));
            Ok((block_hash, next_nonce))
        }
        // Someone else populated the entry while we were waiting for the
        // write lock; use their counter instead of overwriting it.
        Entry::Occupied(slot) => cached_nonce(slot.get(), client).await,
    }
}
/// Runs `task` through `tokio_retry`'s [`Retry::spawn`] and returns its final
/// result.
///
/// The delay schedule is an [`ExponentialBackoff`] created with
/// `from_millis(5)`, with `jitter` applied to each delay and truncated to
/// four delays by `take(4)`. `task` is an `FnMut` because it is invoked anew
/// for each attempt.
async fn retry<R, E, T, F>(task: F) -> T::Output
where
    F: FnMut() -> T,
    T: core::future::Future<Output = core::result::Result<R, E>>,
{
    Retry::spawn(
        ExponentialBackoff::from_millis(5).map(jitter).take(4),
        task,
    )
    .await
}