use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::RwLock;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use near_account_id::AccountId;
use near_crypto::{InMemorySigner, PublicKey, Signer};
use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
use near_jsonrpc_client::methods::block::RpcBlockRequest;
use near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest;
use near_jsonrpc_client::methods::query::RpcQueryRequest;
use near_jsonrpc_client::JsonRpcClient;
use near_jsonrpc_primitives::types::query::QueryResponseKind;
use near_jsonrpc_primitives::types::transactions::RpcTransactionError;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{Action, SignedTransaction};
use near_primitives::types::{BlockReference, Finality, Nonce};
use near_primitives::views::{AccessKeyView, BlockView, FinalExecutionOutcomeView, QueryRequest};
pub mod error;
use crate::error::{Error, Result};
/// Key of the nonce cache: the signing account paired with the public key of
/// the access key used to sign.
pub type CacheKey = (AccountId, PublicKey);
/// JSON-RPC client wrapper that additionally caches access-key nonces so that
/// consecutive transactions from the same key do not each need a fresh
/// access-key query.
pub struct Client {
/// Address of the RPC endpoint, as returned by [`Client::rpc_addr`].
rpc_addr: String,
/// Underlying JSON-RPC client every request is sent through.
rpc_client: JsonRpcClient,
/// Most recently handed-out nonce per `(account, public key)` pair.
///
/// The map sits behind an async `RwLock`; each counter is atomic, so an
/// existing entry can be bumped while only holding the read lock.
access_key_nonces: RwLock<HashMap<CacheKey, AtomicU64>>,
}
impl Client {
pub fn new(rpc_addr: &str) -> Self {
let connector = JsonRpcClient::new_client();
let rpc_client = connector.connect(rpc_addr);
Self {
rpc_client,
rpc_addr: rpc_addr.into(),
access_key_nonces: RwLock::new(HashMap::new()),
}
}
pub fn from_client(client: JsonRpcClient) -> Self {
Self {
rpc_addr: client.server_addr().to_string(),
rpc_client: client,
access_key_nonces: RwLock::new(HashMap::new()),
}
}
pub fn rpc_addr(&self) -> String {
self.rpc_addr.clone()
}
pub async fn send_tx(
&self,
signer: &InMemorySigner,
receiver_id: &AccountId,
actions: Vec<Action>,
) -> Result<FinalExecutionOutcomeView> {
let cache_key = (signer.account_id.clone(), signer.public_key());
retry(|| async {
let (block_hash, nonce) = fetch_tx_nonce(self, &cache_key).await?;
let result = self
.rpc_client
.call(&RpcBroadcastTxCommitRequest {
signed_transaction: SignedTransaction::from_actions(
nonce,
signer.account_id.clone(),
receiver_id.clone(),
signer as &dyn Signer,
actions.clone(),
block_hash,
),
})
.await;
if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
RpcTransactionError::InvalidTransaction {
context: InvalidTxError::InvalidNonce { .. },
..
},
))) = &result
{
let mut nonces = self.access_key_nonces.write().await;
nonces.remove(&cache_key);
}
result.map_err(Into::into)
})
.await
}
pub async fn view<T: Serialize, R: DeserializeOwned>(
&self,
receiver_id: &AccountId,
function_name: &str,
args: T,
) -> Result<R> {
let args = match serde_json::to_vec(&args) {
Ok(args) => args,
Err(e) => return Err(Error::SerializeError(e)),
};
let resp = self
.rpc_client
.call(RpcQueryRequest {
block_reference: Finality::Final.into(),
request: QueryRequest::CallFunction {
account_id: receiver_id.clone(),
method_name: function_name.into(),
args: args.into(),
},
})
.await?;
let QueryResponseKind::CallResult(resp) = resp.kind else {
return Err(Error::RpcReturnedInvalidData("while querying account"));
};
Ok(serde_json::from_slice(&resp.result)?)
}
pub async fn access_key(
&self,
account_id: &AccountId,
public_key: &PublicKey,
) -> Result<(AccessKeyView, CryptoHash)> {
let query_resp = self
.rpc_client
.call(&RpcQueryRequest {
block_reference: Finality::None.into(),
request: QueryRequest::ViewAccessKey {
account_id: account_id.clone(),
public_key: public_key.clone(),
},
})
.await?;
match query_resp.kind {
QueryResponseKind::AccessKey(access_key) => Ok((access_key, query_resp.block_hash)),
_ => Err(Error::RpcReturnedInvalidData("while querying access key")),
}
}
pub async fn view_block(&self, block_reference: BlockReference) -> Result<BlockView> {
self.rpc_client
.call(&RpcBlockRequest { block_reference })
.await
.map_err(Into::into)
}
}
/// Reserves the next nonce from an already cached counter.
///
/// The counter is bumped atomically first, then the hash of a block with
/// `Finality::Final` is fetched to accompany the reserved nonce. The bump is
/// not undone if the block lookup fails.
async fn cached_nonce(nonce: &AtomicU64, client: &Client) -> Result<(CryptoHash, Nonce)> {
    // `fetch_add` yields the value *before* the increment.
    let previous = nonce.fetch_add(1, Ordering::SeqCst);
    let final_block = client.view_block(Finality::Final.into()).await?;
    Ok((final_block.header.hash, previous + 1))
}
/// Produces the block hash and nonce to sign the next transaction with for
/// the access key identified by `cache_key`.
///
/// When the key already has a cached counter, the counter is bumped via
/// `cached_nonce`. Otherwise the access key is queried over RPC, its nonce
/// plus one is stored in the cache, and that value is returned together with
/// the block hash from the query response.
async fn fetch_tx_nonce(client: &Client, cache_key: &CacheKey) -> Result<(CryptoHash, Nonce)> {
    // Fast path: an existing counter only needs the shared (read) lock,
    // because the counter itself is atomic.
    {
        let cache = client.access_key_nonces.read().await;
        if let Some(counter) = cache.get(cache_key) {
            return cached_nonce(counter, client).await;
        }
        // The read guard goes out of scope here, before the write lock is
        // requested.
    }

    let mut cache = client.access_key_nonces.write().await;
    match cache.entry(cache_key.clone()) {
        // Another task filled the entry between our read and write lock
        // acquisitions; continue from its counter.
        Entry::Occupied(slot) => cached_nonce(slot.get(), client).await,
        // Nothing cached yet: seed the counter from the access key's nonce.
        Entry::Vacant(slot) => {
            let (account_id, public_key) = slot.key();
            let (access_key, block_hash) = client.access_key(account_id, public_key).await?;
            let next_nonce = access_key.nonce + 1;
            slot.insert(AtomicU64::new(next_nonce));
            Ok((block_hash, next_nonce))
        }
    }
}
/// Runs `task`, retrying it with jittered exponential back-off while it keeps
/// returning `Err`.
///
/// The delay sequence is `ExponentialBackoff::from_millis(5)` passed through
/// `jitter` and cut off after four delays, so `task` is retried at most four
/// times after the initial attempt. Per the tokio-retry documentation the
/// un-jittered delays are successive powers of the base (5 ms, 25 ms,
/// 125 ms, 625 ms).
///
/// Returns the first `Ok` produced by `task`, or the `Err` of the final
/// attempt once the delays are exhausted.
async fn retry<R, E, T, F>(task: F) -> T::Output
where
    F: FnMut() -> T,
    T: core::future::Future<Output = core::result::Result<R, E>>,
{
    let retry_strategy = ExponentialBackoff::from_millis(5).map(jitter).take(4);
    // The strategy is used exactly once, so it is moved rather than cloned.
    Retry::spawn(retry_strategy, task).await
}