1use std::collections::hash_map::Entry;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6use near_jsonrpc_client::methods::tx::RpcTransactionResponse;
7use tokio::sync::RwLock;
8
9use near_account_id::AccountId;
10use near_crypto::PublicKey;
11use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
12use near_jsonrpc_client::methods::query::RpcQueryRequest;
13use near_jsonrpc_client::{methods, JsonRpcClient, MethodCallResult};
14use near_jsonrpc_primitives::types::query::QueryResponseKind;
15use near_jsonrpc_primitives::types::transactions::RpcTransactionError;
16use near_primitives::errors::{ActionError, ActionErrorKind, InvalidTxError, TxExecutionError};
17use near_primitives::hash::CryptoHash;
18use near_primitives::transaction::{Action, SignedTransaction, Transaction};
19use near_primitives::types::{BlockHeight, Finality, Nonce};
20use near_primitives::views::{
21 AccessKeyView, ExecutionStatusView, FinalExecutionOutcomeView, FinalExecutionOutcomeViewEnum,
22 FinalExecutionStatus, QueryRequest, TxExecutionStatus,
23};
24
25pub mod error;
26pub mod ops;
27pub mod query;
28pub mod result;
29pub mod signer;
30
31use crate::error::Result;
32use crate::ops::RetryableTransaction;
33use crate::signer::SignerExt;
34
35pub use crate::error::Error;
36
37pub type CacheKey = (AccountId, PublicKey);
39
40#[derive(Clone, Debug)]
42pub struct Client {
43 rpc_client: JsonRpcClient,
44 access_key_nonces: Arc<RwLock<HashMap<CacheKey, AtomicU64>>>,
46}
47
48impl Client {
49 pub fn new(rpc_addr: &str) -> Self {
51 let connector = JsonRpcClient::new_client();
52 let rpc_client = connector.connect(rpc_addr);
53 Self::from_client(rpc_client)
54 }
55
56 pub fn from_client(client: JsonRpcClient) -> Self {
58 Self {
59 rpc_client: client,
60 access_key_nonces: Arc::new(RwLock::new(HashMap::new())),
61 }
62 }
63
64 pub fn inner(&self) -> &JsonRpcClient {
66 &self.rpc_client
67 }
68
69 pub fn inner_mut(&mut self) -> &mut JsonRpcClient {
71 &mut self.rpc_client
72 }
73
74 pub fn rpc_addr(&self) -> String {
76 self.rpc_client.server_addr().into()
77 }
78
79 pub fn send_tx<'a>(
83 &self,
84 signer: &'a dyn SignerExt,
85 receiver_id: &AccountId,
86 actions: Vec<Action>,
87 ) -> RetryableTransaction<'a> {
88 RetryableTransaction {
89 client: self.clone(),
90 signer,
91 actions: Ok(actions),
92 receiver_id: receiver_id.clone(),
93 strategy: None,
94 wait_until: TxExecutionStatus::default(),
95 }
96 }
97
98 pub(crate) async fn sign_tx(
99 &self,
100 signer: &dyn SignerExt,
101 receiver_id: &AccountId,
102 actions: Vec<Action>,
103 ) -> Result<SignedTransaction> {
104 let pk = signer.public_key();
105 let (nonce, block_hash, _) = self.fetch_nonce(signer.account_id(), &pk).await?;
106
107 let tx = Transaction::V0(near_primitives::transaction::TransactionV0 {
108 nonce,
109 signer_id: signer.account_id().clone(),
110 public_key: pk,
111 receiver_id: receiver_id.clone(),
112 block_hash,
113 actions,
114 });
115
116 let signature = signer.sign(tx.get_hash_and_size().0.as_ref());
117 Ok(SignedTransaction::new(signature, tx))
118 }
119
120 pub(crate) async fn send_tx_once(
122 &self,
123 signer: &dyn SignerExt,
124 receiver_id: &AccountId,
125 actions: Vec<Action>,
126 wait_until: TxExecutionStatus,
127 ) -> Result<FinalExecutionOutcomeView> {
128 let cache_key = (signer.account_id().clone(), signer.public_key());
129 let signed_transaction = self.sign_tx(signer, receiver_id, actions).await?;
130
131 let result = self
132 .rpc_client
133 .call(&methods::send_tx::RpcSendTransactionRequest {
134 signed_transaction,
135 wait_until,
136 })
137 .await;
138
139 self.check_and_invalidate_cache(&cache_key, &result).await;
140
141 let rpc_response = result.map_err(Error::from)?;
142 let outcome = rpc_response.final_execution_outcome.ok_or_else(|| {
143 Error::RpcReturnedInvalidData("Missing final execution outcome".to_string())
144 })?;
145 Ok(outcome.into_outcome())
146 }
147
148 pub async fn send_tx_async(
152 &self,
153 signer: &dyn SignerExt,
154 receiver_id: &AccountId,
155 actions: Vec<Action>,
156 ) -> Result<CryptoHash> {
157 let cache_key = (signer.account_id().clone(), signer.public_key());
160 let signed_transaction = self.sign_tx(signer, receiver_id, actions).await?;
161 let tx_hash = signed_transaction.get_hash();
162
163 let result = self
164 .rpc_client
165 .call(&methods::send_tx::RpcSendTransactionRequest {
166 signed_transaction,
167 wait_until: TxExecutionStatus::None,
168 })
169 .await;
170
171 if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(_err))) = &result {
172 self.invalidate_cache(&cache_key).await;
174 }
175
176 result.map_err(Error::from)?;
177 Ok(tx_hash)
178 }
179
180 pub(crate) async fn send_query<M>(&self, method: &M) -> MethodCallResult<M::Response, M::Error>
182 where
183 M: methods::RpcMethod + Send + Sync,
184 M::Response: Send,
185 M::Error: Send,
186 {
187 self.rpc_client.call(method).await
188 }
189
190 pub async fn fetch_nonce(
194 &self,
195 account_id: &AccountId,
196 public_key: &PublicKey,
197 ) -> Result<(Nonce, CryptoHash, BlockHeight)> {
198 fetch_nonce(self, account_id, public_key).await
199 }
200
201 pub async fn access_key(
203 &self,
204 account_id: &AccountId,
205 public_key: &PublicKey,
206 ) -> Result<(AccessKeyView, CryptoHash, BlockHeight)> {
207 let resp = self
208 .rpc_client
209 .call(&RpcQueryRequest {
210 block_reference: Finality::None.into(),
212 request: QueryRequest::ViewAccessKey {
213 account_id: account_id.clone(),
214 public_key: public_key.clone(),
215 },
216 })
217 .await?;
218
219 match resp.kind {
220 QueryResponseKind::AccessKey(access_key) => {
221 Ok((access_key, resp.block_hash, resp.block_height))
222 }
223 _ => Err(Error::RpcReturnedInvalidData(
224 "while querying access key".into(),
225 )),
226 }
227 }
228
229 pub async fn check_and_invalidate_cache(
230 &self,
231 cache_key: &CacheKey,
232 result: &Result<RpcTransactionResponse, JsonRpcError<RpcTransactionError>>,
233 ) {
234 if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
236 RpcTransactionError::InvalidTransaction {
237 context: InvalidTxError::InvalidNonce { .. },
238 ..
239 },
240 ))) = result
241 {
242 self.invalidate_cache(cache_key).await;
243 }
244
245 let Ok(outcome) = result else {
246 return;
247 };
248 for tx_err in fetch_tx_errs(outcome).await {
249 let invalid_cache = matches!(
250 tx_err,
251 TxExecutionError::ActionError(ActionError {
252 kind: ActionErrorKind::DelegateActionInvalidNonce { .. },
253 ..
254 }) | TxExecutionError::InvalidTxError(InvalidTxError::InvalidNonce { .. })
255 );
256 if invalid_cache {
257 self.invalidate_cache(cache_key).await;
258 }
259 }
260 }
261
262 pub async fn invalidate_cache(&self, cache_key: &CacheKey) {
263 let mut nonces = self.access_key_nonces.write().await;
264 nonces.remove(cache_key);
265 }
266
267 pub async fn status_tx_async(
269 &self,
270 sender_id: &AccountId,
271 tx_hash: CryptoHash,
272 wait_until: TxExecutionStatus,
273 ) -> Result<FinalExecutionOutcomeView, Error> {
274 let response = self
275 .rpc_client
276 .call(methods::tx::RpcTransactionStatusRequest {
277 transaction_info: methods::tx::TransactionInfo::TransactionId {
278 sender_account_id: sender_id.clone(),
279 tx_hash,
280 },
281 wait_until,
282 })
283 .await
284 .map_err(Error::from)?;
285
286 if matches!(
287 response.final_execution_status,
288 TxExecutionStatus::None | TxExecutionStatus::Included
289 ) {
290 return Err(Error::RpcTransactionPending);
291 }
292
293 let outcome = response
294 .final_execution_outcome
295 .ok_or_else(|| {
296 Error::RpcReturnedInvalidData("Missing final execution outcome".to_string())
297 })?
298 .into_outcome();
299 Ok(outcome)
300 }
301}
302
303impl From<Client> for JsonRpcClient {
304 fn from(client: Client) -> Self {
305 client.rpc_client
306 }
307}
308
309async fn fetch_tx_errs(result: &RpcTransactionResponse) -> Vec<&TxExecutionError> {
310 let mut failures = Vec::new();
311 let Some(outcome) = result.final_execution_outcome.as_ref() else {
312 return failures;
313 };
314 let outcome = match outcome {
315 FinalExecutionOutcomeViewEnum::FinalExecutionOutcome(outcome) => outcome,
316 FinalExecutionOutcomeViewEnum::FinalExecutionOutcomeWithReceipt(outcome) => {
317 &outcome.final_outcome
318 }
319 };
320
321 if let FinalExecutionStatus::Failure(tx_err) = &outcome.status {
322 failures.push(tx_err);
323 }
324 if let ExecutionStatusView::Failure(tx_err) = &outcome.transaction_outcome.outcome.status {
325 failures.push(tx_err);
326 }
327 for receipt in &outcome.receipts_outcome {
328 if let ExecutionStatusView::Failure(tx_err) = &receipt.outcome.status {
329 failures.push(tx_err);
330 }
331 }
332 failures
333}
334
335async fn cached_nonce(
336 nonce: &AtomicU64,
337 client: &Client,
338) -> Result<(Nonce, CryptoHash, BlockHeight)> {
339 let nonce = nonce.fetch_add(1, Ordering::SeqCst);
340
341 let block = client.view_block().await?;
343 Ok((nonce + 1, block.header.hash, block.header.height))
344}
345
346async fn fetch_nonce(
350 client: &Client,
351 account_id: &AccountId,
352 public_key: &PublicKey,
353) -> Result<(Nonce, CryptoHash, BlockHeight)> {
354 let cache_key = (account_id.clone(), public_key.clone());
355 let nonces = client.access_key_nonces.read().await;
356 if let Some(nonce) = nonces.get(&cache_key) {
357 cached_nonce(nonce, client).await
358 } else {
359 drop(nonces);
360 let mut nonces = client.access_key_nonces.write().await;
361 match nonces.entry(cache_key) {
362 Entry::Occupied(entry) => cached_nonce(entry.get(), client).await,
365
366 Entry::Vacant(entry) => {
368 let (account_id, public_key) = entry.key();
369 let (access_key, block_hash, block_height) =
370 client.access_key(account_id, public_key).await?;
371 entry.insert(AtomicU64::new(access_key.nonce + 1));
372 Ok((access_key.nonce + 1, block_hash, block_height))
373 }
374 }
375 }
376}