near_fetch/
lib.rs

1use std::collections::hash_map::Entry;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6use near_jsonrpc_client::methods::tx::RpcTransactionResponse;
7use tokio::sync::RwLock;
8
9use near_account_id::AccountId;
10use near_crypto::PublicKey;
11use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
12use near_jsonrpc_client::methods::query::RpcQueryRequest;
13use near_jsonrpc_client::{methods, JsonRpcClient, MethodCallResult};
14use near_jsonrpc_primitives::types::query::QueryResponseKind;
15use near_jsonrpc_primitives::types::transactions::RpcTransactionError;
16use near_primitives::errors::{ActionError, ActionErrorKind, InvalidTxError, TxExecutionError};
17use near_primitives::hash::CryptoHash;
18use near_primitives::transaction::{Action, SignedTransaction, Transaction};
19use near_primitives::types::{BlockHeight, Finality, Nonce};
20use near_primitives::views::{
21    AccessKeyView, ExecutionStatusView, FinalExecutionOutcomeView, FinalExecutionOutcomeViewEnum,
22    FinalExecutionStatus, QueryRequest, TxExecutionStatus,
23};
24
25pub mod error;
26pub mod ops;
27pub mod query;
28pub mod result;
29pub mod signer;
30
31use crate::error::Result;
32use crate::ops::RetryableTransaction;
33use crate::signer::SignerExt;
34
35pub use crate::error::Error;
36
37/// Cache key for access key nonces.
38pub type CacheKey = (AccountId, PublicKey);
39
40/// Client that implements exponential retrying and caching of access key nonces.
41#[derive(Clone, Debug)]
42pub struct Client {
43    rpc_client: JsonRpcClient,
44    /// AccessKey nonces to reference when sending transactions.
45    access_key_nonces: Arc<RwLock<HashMap<CacheKey, AtomicU64>>>,
46}
47
48impl Client {
49    /// Construct a new [`Client`] with the given RPC address.
50    pub fn new(rpc_addr: &str) -> Self {
51        let connector = JsonRpcClient::new_client();
52        let rpc_client = connector.connect(rpc_addr);
53        Self::from_client(rpc_client)
54    }
55
56    /// Construct a [`Client`] from an existing [`JsonRpcClient`].
57    pub fn from_client(client: JsonRpcClient) -> Self {
58        Self {
59            rpc_client: client,
60            access_key_nonces: Arc::new(RwLock::new(HashMap::new())),
61        }
62    }
63
64    /// Internal reference to the [`JsonRpcClient`] that is utilized for all RPC calls.
65    pub fn inner(&self) -> &JsonRpcClient {
66        &self.rpc_client
67    }
68
69    /// Internal mutable reference to the [`JsonRpcClient`] that is utilized for all RPC calls.
70    pub fn inner_mut(&mut self) -> &mut JsonRpcClient {
71        &mut self.rpc_client
72    }
73
74    /// The RPC address the client is connected to.
75    pub fn rpc_addr(&self) -> String {
76        self.rpc_client.server_addr().into()
77    }
78
79    /// Send a series of [`Action`]s as a [`SignedTransaction`] to the network.
80    /// This gives us a transaction is that retryable. To retry, simply add in a `.retry_*`
81    /// method call to the end of the chain before an `.await` gets invoked.
82    pub fn send_tx<'a>(
83        &self,
84        signer: &'a dyn SignerExt,
85        receiver_id: &AccountId,
86        actions: Vec<Action>,
87    ) -> RetryableTransaction<'a> {
88        RetryableTransaction {
89            client: self.clone(),
90            signer,
91            actions: Ok(actions),
92            receiver_id: receiver_id.clone(),
93            strategy: None,
94            wait_until: TxExecutionStatus::default(),
95        }
96    }
97
98    pub(crate) async fn sign_tx(
99        &self,
100        signer: &dyn SignerExt,
101        receiver_id: &AccountId,
102        actions: Vec<Action>,
103    ) -> Result<SignedTransaction> {
104        let pk = signer.public_key();
105        let (nonce, block_hash, _) = self.fetch_nonce(signer.account_id(), &pk).await?;
106
107        let tx = Transaction::V0(near_primitives::transaction::TransactionV0 {
108            nonce,
109            signer_id: signer.account_id().clone(),
110            public_key: pk,
111            receiver_id: receiver_id.clone(),
112            block_hash,
113            actions,
114        });
115
116        let signature = signer.sign(tx.get_hash_and_size().0.as_ref());
117        Ok(SignedTransaction::new(signature, tx))
118    }
119
120    /// Send the transaction only once. No retrying involved.
121    pub(crate) async fn send_tx_once(
122        &self,
123        signer: &dyn SignerExt,
124        receiver_id: &AccountId,
125        actions: Vec<Action>,
126        wait_until: TxExecutionStatus,
127    ) -> Result<FinalExecutionOutcomeView> {
128        let cache_key = (signer.account_id().clone(), signer.public_key());
129        let signed_transaction = self.sign_tx(signer, receiver_id, actions).await?;
130
131        let result = self
132            .rpc_client
133            .call(&methods::send_tx::RpcSendTransactionRequest {
134                signed_transaction,
135                wait_until,
136            })
137            .await;
138
139        self.check_and_invalidate_cache(&cache_key, &result).await;
140
141        let rpc_response = result.map_err(Error::from)?;
142        let outcome = rpc_response.final_execution_outcome.ok_or_else(|| {
143            Error::RpcReturnedInvalidData("Missing final execution outcome".to_string())
144        })?;
145        Ok(outcome.into_outcome())
146    }
147
148    /// Send a series of [`Action`]s as a [`SignedTransaction`] to the network. This is an async
149    /// operation, where a hash is returned to reference the transaction in the future and check
150    /// its status.
151    pub async fn send_tx_async(
152        &self,
153        signer: &dyn SignerExt,
154        receiver_id: &AccountId,
155        actions: Vec<Action>,
156    ) -> Result<CryptoHash> {
157        // Note, the cache key's public-key part can be different per retry loop. For instance,
158        // KeyRotatingSigner rotates secret_key and public_key after each `Signer::sign` call.
159        let cache_key = (signer.account_id().clone(), signer.public_key());
160        let signed_transaction = self.sign_tx(signer, receiver_id, actions).await?;
161        let tx_hash = signed_transaction.get_hash();
162
163        let result = self
164            .rpc_client
165            .call(&methods::send_tx::RpcSendTransactionRequest {
166                signed_transaction,
167                wait_until: TxExecutionStatus::None,
168            })
169            .await;
170
171        if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(_err))) = &result {
172            // RpcBroadcastTxAsyncError should not be returned. If it does, invalidate the cache just in case.
173            self.invalidate_cache(&cache_key).await;
174        }
175
176        result.map_err(Error::from)?;
177        Ok(tx_hash)
178    }
179
180    /// Send a JsonRpc method to the network.
181    pub(crate) async fn send_query<M>(&self, method: &M) -> MethodCallResult<M::Response, M::Error>
182    where
183        M: methods::RpcMethod + Send + Sync,
184        M::Response: Send,
185        M::Error: Send,
186    {
187        self.rpc_client.call(method).await
188    }
189
190    /// Fetches the nonce associated to the account id and public key, which essentially is the
191    /// access key for the given account ID and public key. Utilize caching underneath to
192    /// prevent querying for the same access key multiple times.
193    pub async fn fetch_nonce(
194        &self,
195        account_id: &AccountId,
196        public_key: &PublicKey,
197    ) -> Result<(Nonce, CryptoHash, BlockHeight)> {
198        fetch_nonce(self, account_id, public_key).await
199    }
200
201    /// Fetches the access key for the given account ID and public key.
202    pub async fn access_key(
203        &self,
204        account_id: &AccountId,
205        public_key: &PublicKey,
206    ) -> Result<(AccessKeyView, CryptoHash, BlockHeight)> {
207        let resp = self
208            .rpc_client
209            .call(&RpcQueryRequest {
210                // Finality::None => Optimistic query for access key
211                block_reference: Finality::None.into(),
212                request: QueryRequest::ViewAccessKey {
213                    account_id: account_id.clone(),
214                    public_key: public_key.clone(),
215                },
216            })
217            .await?;
218
219        match resp.kind {
220            QueryResponseKind::AccessKey(access_key) => {
221                Ok((access_key, resp.block_hash, resp.block_height))
222            }
223            _ => Err(Error::RpcReturnedInvalidData(
224                "while querying access key".into(),
225            )),
226        }
227    }
228
229    pub async fn check_and_invalidate_cache(
230        &self,
231        cache_key: &CacheKey,
232        result: &Result<RpcTransactionResponse, JsonRpcError<RpcTransactionError>>,
233    ) {
234        // InvalidNonce, cached nonce is potentially very far behind, so invalidate it.
235        if let Err(JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
236            RpcTransactionError::InvalidTransaction {
237                context: InvalidTxError::InvalidNonce { .. },
238                ..
239            },
240        ))) = result
241        {
242            self.invalidate_cache(cache_key).await;
243        }
244
245        let Ok(outcome) = result else {
246            return;
247        };
248        for tx_err in fetch_tx_errs(outcome).await {
249            let invalid_cache = matches!(
250                tx_err,
251                TxExecutionError::ActionError(ActionError {
252                    kind: ActionErrorKind::DelegateActionInvalidNonce { .. },
253                    ..
254                }) | TxExecutionError::InvalidTxError(InvalidTxError::InvalidNonce { .. })
255            );
256            if invalid_cache {
257                self.invalidate_cache(cache_key).await;
258            }
259        }
260    }
261
262    pub async fn invalidate_cache(&self, cache_key: &CacheKey) {
263        let mut nonces = self.access_key_nonces.write().await;
264        nonces.remove(cache_key);
265    }
266
267    /// Fetches the status of a transaction given the transaction hash.
268    pub async fn status_tx_async(
269        &self,
270        sender_id: &AccountId,
271        tx_hash: CryptoHash,
272        wait_until: TxExecutionStatus,
273    ) -> Result<FinalExecutionOutcomeView, Error> {
274        let response = self
275            .rpc_client
276            .call(methods::tx::RpcTransactionStatusRequest {
277                transaction_info: methods::tx::TransactionInfo::TransactionId {
278                    sender_account_id: sender_id.clone(),
279                    tx_hash,
280                },
281                wait_until,
282            })
283            .await
284            .map_err(Error::from)?;
285
286        if matches!(
287            response.final_execution_status,
288            TxExecutionStatus::None | TxExecutionStatus::Included
289        ) {
290            return Err(Error::RpcTransactionPending);
291        }
292
293        let outcome = response
294            .final_execution_outcome
295            .ok_or_else(|| {
296                Error::RpcReturnedInvalidData("Missing final execution outcome".to_string())
297            })?
298            .into_outcome();
299        Ok(outcome)
300    }
301}
302
303impl From<Client> for JsonRpcClient {
304    fn from(client: Client) -> Self {
305        client.rpc_client
306    }
307}
308
309async fn fetch_tx_errs(result: &RpcTransactionResponse) -> Vec<&TxExecutionError> {
310    let mut failures = Vec::new();
311    let Some(outcome) = result.final_execution_outcome.as_ref() else {
312        return failures;
313    };
314    let outcome = match outcome {
315        FinalExecutionOutcomeViewEnum::FinalExecutionOutcome(outcome) => outcome,
316        FinalExecutionOutcomeViewEnum::FinalExecutionOutcomeWithReceipt(outcome) => {
317            &outcome.final_outcome
318        }
319    };
320
321    if let FinalExecutionStatus::Failure(tx_err) = &outcome.status {
322        failures.push(tx_err);
323    }
324    if let ExecutionStatusView::Failure(tx_err) = &outcome.transaction_outcome.outcome.status {
325        failures.push(tx_err);
326    }
327    for receipt in &outcome.receipts_outcome {
328        if let ExecutionStatusView::Failure(tx_err) = &receipt.outcome.status {
329            failures.push(tx_err);
330        }
331    }
332    failures
333}
334
335async fn cached_nonce(
336    nonce: &AtomicU64,
337    client: &Client,
338) -> Result<(Nonce, CryptoHash, BlockHeight)> {
339    let nonce = nonce.fetch_add(1, Ordering::SeqCst);
340
341    // Fetch latest block_hash since the previous one is now invalid for new transactions:
342    let block = client.view_block().await?;
343    Ok((nonce + 1, block.header.hash, block.header.height))
344}
345
346/// Fetches the transaction nonce and block hash associated to the access key. Internally
347/// caches the nonce as to not need to query for it every time, and ending up having to run
348/// into contention with others.
349async fn fetch_nonce(
350    client: &Client,
351    account_id: &AccountId,
352    public_key: &PublicKey,
353) -> Result<(Nonce, CryptoHash, BlockHeight)> {
354    let cache_key = (account_id.clone(), public_key.clone());
355    let nonces = client.access_key_nonces.read().await;
356    if let Some(nonce) = nonces.get(&cache_key) {
357        cached_nonce(nonce, client).await
358    } else {
359        drop(nonces);
360        let mut nonces = client.access_key_nonces.write().await;
361        match nonces.entry(cache_key) {
362            // case where multiple writers end up at the same lock acquisition point and tries
363            // to overwrite the cached value that a previous writer already wrote.
364            Entry::Occupied(entry) => cached_nonce(entry.get(), client).await,
365
366            // Write the cached value. This value will get invalidated when an InvalidNonce error is returned.
367            Entry::Vacant(entry) => {
368                let (account_id, public_key) = entry.key();
369                let (access_key, block_hash, block_height) =
370                    client.access_key(account_id, public_key).await?;
371                entry.insert(AtomicU64::new(access_key.nonce + 1));
372                Ok((access_key.nonce + 1, block_hash, block_height))
373            }
374        }
375    }
376}