use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use near_account_id::AccountId;
use near_crypto::{PublicKey, Signer};
use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
use near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest;
use near_jsonrpc_client::methods::query::RpcQueryRequest;
use near_jsonrpc_client::{methods, JsonRpcClient, MethodCallResult};
use near_jsonrpc_primitives::types::query::QueryResponseKind;
use near_jsonrpc_primitives::types::transactions::RpcTransactionError;
use near_primitives::errors::{ActionError, ActionErrorKind, InvalidTxError, TxExecutionError};
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{Action, Transaction};
use near_primitives::types::{BlockHeight, Finality, Nonce};
use near_primitives::views::{
AccessKeyView, ExecutionStatusView, FinalExecutionOutcomeView, FinalExecutionStatus,
QueryRequest,
};
pub mod error;
pub mod ops;
pub mod query;
pub mod signer;
use crate::error::Result;
use crate::signer::ExposeAccountId;
pub use crate::error::Error;
pub type CacheKey = (AccountId, PublicKey);
#[derive(Clone, Debug)]
pub struct Client {
rpc_client: JsonRpcClient,
access_key_nonces: Arc<RwLock<HashMap<CacheKey, AtomicU64>>>,
}
impl Client {
pub fn new(rpc_addr: &str) -> Self {
let connector = JsonRpcClient::new_client();
let rpc_client = connector.connect(rpc_addr);
Self::from_client(rpc_client)
}
pub fn from_client(client: JsonRpcClient) -> Self {
Self {
rpc_client: client,
access_key_nonces: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn inner(&self) -> &JsonRpcClient {
&self.rpc_client
}
pub fn inner_mut(&mut self) -> &mut JsonRpcClient {
&mut self.rpc_client
}
pub fn rpc_addr(&self) -> String {
self.rpc_client.server_addr().into()
}
pub async fn send_tx<T: Signer + ExposeAccountId>(
&self,
signer: &T,
receiver_id: &AccountId,
actions: Vec<Action>,
) -> Result<FinalExecutionOutcomeView> {
retry(|| async {
let cache_key = (signer.account_id().clone(), signer.public_key());
let (nonce, block_hash, _) = self.fetch_nonce(&cache_key.0, &cache_key.1).await?;
let result = self
.rpc_client
.call(&RpcBroadcastTxCommitRequest {
signed_transaction: Transaction {
nonce,
block_hash,
signer_id: signer.account_id().clone(),
public_key: signer.public_key(),
receiver_id: receiver_id.clone(),
actions: actions.clone(),
}
.sign(signer),
})
.await;
self.check_and_invalidate_cache(&cache_key, &result).await;
result.map_err(Into::into)
})
.await
}
pub async fn send_tx_async<T: Signer + ExposeAccountId>(
&self,
signer: &T,
receiver_id: &AccountId,
actions: Vec<Action>,
) -> Result<CryptoHash> {
retry(|| async {
let cache_key = (signer.account_id().clone(), signer.public_key());
let (nonce, block_hash, _) = self.fetch_nonce(&cache_key.0, &cache_key.1).await?;
let result = self
.rpc_client
.call(&methods::broadcast_tx_async::RpcBroadcastTxAsyncRequest {
signed_transaction: Transaction {
nonce,
block_hash,
signer_id: signer.account_id().clone(),
public_key: signer.public_key(),
receiver_id: receiver_id.clone(),
actions: actions.clone(),
}
.sign(signer),
})
.await;
if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(_err))) = &result
{
self.invalidate_cache(&cache_key).await;
}
result.map_err(Into::into)
})
.await
}
pub(crate) async fn send<M>(&self, method: M) -> MethodCallResult<M::Response, M::Error>
where
M: methods::RpcMethod + Send + Sync,
M::Response: Send,
M::Error: Send,
{
retry(|| async { self.rpc_client.call(&method).await }).await
}
pub async fn fetch_nonce(
&self,
account_id: &AccountId,
public_key: &PublicKey,
) -> Result<(Nonce, CryptoHash, BlockHeight)> {
fetch_nonce(self, account_id, public_key).await
}
pub async fn access_key(
&self,
account_id: &AccountId,
public_key: &PublicKey,
) -> Result<(AccessKeyView, CryptoHash, BlockHeight)> {
let resp = self
.rpc_client
.call(&RpcQueryRequest {
block_reference: Finality::None.into(),
request: QueryRequest::ViewAccessKey {
account_id: account_id.clone(),
public_key: public_key.clone(),
},
})
.await?;
match resp.kind {
QueryResponseKind::AccessKey(access_key) => {
Ok((access_key, resp.block_hash, resp.block_height))
}
_ => Err(Error::RpcReturnedInvalidData(
"while querying access key".into(),
)),
}
}
pub async fn check_and_invalidate_cache(
&self,
cache_key: &CacheKey,
result: &Result<FinalExecutionOutcomeView, JsonRpcError<RpcTransactionError>>,
) {
if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
RpcTransactionError::InvalidTransaction {
context: InvalidTxError::InvalidNonce { .. },
..
},
))) = result
{
self.invalidate_cache(cache_key).await;
}
let Ok(outcome) = result else {
return;
};
for tx_err in fetch_tx_errs(outcome).await {
let invalid_cache = matches!(
tx_err,
TxExecutionError::ActionError(ActionError {
kind: ActionErrorKind::DelegateActionInvalidNonce { .. },
..
}) | TxExecutionError::InvalidTxError(InvalidTxError::InvalidNonce { .. })
);
if invalid_cache {
self.invalidate_cache(cache_key).await;
}
}
}
pub async fn invalidate_cache(&self, cache_key: &CacheKey) {
let mut nonces = self.access_key_nonces.write().await;
nonces.remove(cache_key);
}
}
impl From<Client> for JsonRpcClient {
fn from(client: Client) -> Self {
client.rpc_client
}
}
async fn fetch_tx_errs(result: &FinalExecutionOutcomeView) -> Vec<&TxExecutionError> {
let mut failures = Vec::new();
if let FinalExecutionStatus::Failure(tx_err) = &result.status {
failures.push(tx_err);
}
if let ExecutionStatusView::Failure(tx_err) = &result.transaction_outcome.outcome.status {
failures.push(tx_err);
}
for receipt in &result.receipts_outcome {
if let ExecutionStatusView::Failure(tx_err) = &receipt.outcome.status {
failures.push(tx_err);
}
}
failures
}
async fn cached_nonce(
nonce: &AtomicU64,
client: &Client,
) -> Result<(Nonce, CryptoHash, BlockHeight)> {
let nonce = nonce.fetch_add(1, Ordering::SeqCst);
let block = client.view_block().await?;
Ok((nonce + 1, block.header.hash, block.header.height))
}
async fn fetch_nonce(
client: &Client,
account_id: &AccountId,
public_key: &PublicKey,
) -> Result<(Nonce, CryptoHash, BlockHeight)> {
let cache_key = (account_id.clone(), public_key.clone());
let nonces = client.access_key_nonces.read().await;
if let Some(nonce) = nonces.get(&cache_key) {
cached_nonce(nonce, client).await
} else {
drop(nonces);
let mut nonces = client.access_key_nonces.write().await;
match nonces.entry(cache_key) {
Entry::Occupied(entry) => cached_nonce(entry.get(), client).await,
Entry::Vacant(entry) => {
let (account_id, public_key) = entry.key();
let (access_key, block_hash, block_height) =
client.access_key(account_id, public_key).await?;
entry.insert(AtomicU64::new(access_key.nonce + 1));
Ok((access_key.nonce + 1, block_hash, block_height))
}
}
}
}
async fn retry<R, E, T, F>(task: F) -> T::Output
where
F: FnMut() -> T,
T: core::future::Future<Output = core::result::Result<R, E>>,
{
let retry_strategy = ExponentialBackoff::from_millis(5).map(jitter).take(4);
Retry::spawn(retry_strategy, task).await
}